(* About CTM: The following Alice ML code is derived from the examples provided in the book
      "Concepts, Techniques, and Models of Computer Programming" by Peter Van Roy and Seif Haridi.
      http://www2.info.ucl.ac.be/people/PVR/book.html *)

(* CTM Chapter #05 Examples in Alice ML *)
import structure Channel from "x-alice:/lib/data/Channel"
import structure Random from "x-alice:/lib/utility/Random"

(* syntactic sugar for solutions using promises/futures *)
(* Make the members of Promise and Future available unqualified. *)
open Promise
open Future
(* p ?= v is infix shorthand for fulfill(p, v).  It is declared at precedence 3
   so that the arithmetic and :: expressions used on its right-hand side in
   this file bind tighter than ?= itself. *)
infix 3 ?=
val op?= = fulfill
(* ? is a short alias for future. *)
val ? = future

(* Functions defined in previous chapters *)
(* Apply f to every element of the list in order, discarding each result. *)
fun forall xs f =
   case xs
     of nil     => ()
      | y::rest => (f y; forall rest f);

(* Create a promise that is already fulfilled with v and return the promise
   itself (not its future). *)
fun known v =
   let
      val filled = promise()
      val () = fulfill(filled, v)
   in
      filled
   end

(* Map f over the list, publishing the result incrementally through promise p:
   each processed element fulfills the current promise with a cons cell whose
   tail is the future of a fresh promise.  Returns the future of p. *)
fun mapPromise f xs p =
   case xs
     of nil => ( p ?= nil; future p )
      | y::ys =>
          let
             val rest = promise()
             val () = p ?= (f y)::(future rest)
             val _ = mapPromise f ys rest
          in
             future p
          end;

(* Run every procedure in ps in its own thread and block the caller until all
   of them have finished.

   barrierLoop chains the threads: each thread, once its procedure returns,
   fulfills its own promise m with the token n it received from its
   predecessor, and hands the future of m to its successor.  The token that
   comes out of the end of the chain is therefore only determined after every
   procedure has completed.

   Returns the promise s (fulfilled with the final token), as the original
   definition did. *)
fun barrier ps =
   let
      fun barrierLoop (nil, n) = n
        | barrierLoop (f::fs, n) =
            let
               val m = promise()
            in
               spawn ( f(); m ?= n );
               barrierLoop(fs, future m)
            end
      val s = promise()
   in
      spawn s ?= barrierLoop(ps, ());
      (* Wait on the future of s.  A promise is itself a determined value, so
         awaiting the promise would return immediately and the barrier would
         not block at all. *)
      await (future s);
      s
   end

(* 5.1 The message-passing concurrent model *)
(* Note: This is what the Oz functions NewPort & Send look like in Alice *)
(* Create a channel, fulfill promise s with the channel's list view, and
   return the channel. *)
fun newPort s =
   let
      val port = Channel.channel()
      val () = s ?= Channel.toList port
   in
      port
   end
(* send is another name for Channel.put. *)
val send = Channel.put

(* Note: Will be using Alice Channel library the rest of the chapter *)
(* Create a channel and obtain its contents as the list s. *)
val c = Channel.channel()
val s = Channel.toList(c);

(* Put two messages on the channel. *)
Channel.put(c, "a");
Channel.put(c, "b");

(* Take the first two elements of s (the result is not bound to a name),
   then show s in the inspector. *)
List.take(s, 2);
inspect s;

(* 5.2 Port objects *)
(* Minimal port object: a spawned thread applies inspect to each element of the
   channel's list while the main thread puts one message on the channel. *)
let
   val c = Channel.channel()
   val s = Channel.toList(c)
in
   spawn forall (s) inspect;
   Channel.put(c, "hi")
end;

(* 5.2.1 Port objects - The newPortObject abstraction *)
(* Stateful port object: returns a new channel whose messages are folded, in a
   separate thread, with f starting from state init.  f receives
   (message, state) and returns the next state. *)
fun newChannelObject (f, init) =
   let
      val mailbox = Channel.channel()
      val inbox = Channel.toList mailbox
      val _ = spawn (foldl f init inbox)
   in
      mailbox
   end

(* Stateless port object: returns a new channel and spawns a thread that
   applies f to every message put on it. *)
fun newChannelObject2 f =
   let
      val mailbox = Channel.channel()
      val inbox = Channel.toList mailbox
      val _ = spawn (forall inbox f)
   in
      mailbox
   end

(* 5.2.2 Port objects - An example *)
(* A ball-game player: a port object that, for each message received, picks
   one of the other players at random, announces itself, pauses for a second
   and passes the message on to the chosen player. *)
fun player others name =
   let
      val opponents = List.length others
      fun swing msg =
         let
            val target = Random.int opponents
            val () = inspect ("Player #" ^ name)
            val () = Thread.sleep(Time.fromMilliseconds(Int.toLarge(1000)))
         in
            Channel.put(List.nth(others, target), msg)
         end
   in
      newChannelObject2 swing
   end

(* Each player refers to the channels of the other two, so the three channels
   are first created as promises and fulfilled afterwards; players hold the
   corresponding futures. *)
val p1 = promise()
val p2 = promise()
val p3 = promise();
p1 ?= player [future p2, future p3] "1";
p2 ?= player [future p1, future p3] "2";
p3 ?= player [future p1, future p2] "3";
(* Channel.put(future p1, "ball"); *)      (* uncomment this to play game *)

(* 5.3.1 Simple message protocols - RMI (Remote Method Invocation) *)
(* Messages of the RMI example.  Calc(x, y) carries an argument and a promise
   for the result; Work(y) carries a promise for the client's overall result. *)
datatype 'a procs = Calc of 'a * 'a promise
                  | Work of 'a promise

(* Server handler: answer Calc(x, reply) by fulfilling reply with
   x*x + 2.0*x + 2.0; any other message raises Domain. *)
fun serverProc msg =
   case msg
     of Calc(x, reply) =>
          let
             val fx = x*x + 2.0*x + 2.0
          in
             reply ?= fx;
             future reply
          end
      | _ => raise Domain

(* The server port object. *)
val server = newChannelObject2 serverProc

(* Synchronous RMI client: on Work(y), issue two Calc requests to the server
   one after the other, waiting for each answer before continuing, and fulfill
   y with the sum of the two results.  Any other message raises Domain. *)
fun clientProc (Work(y)) =
      let
         val y1 = promise()
         val y2 = promise()
      in
         Channel.put(server, Calc(10.0, y1));
         (* Await the future, not the promise: a promise is already a
            determined value, so awaiting it would not block. *)
         await (future y1);
         Channel.put(server, Calc(20.0, y2));
         await (future y2);
         y ?= (future y1) + (future y2)
      end
  | clientProc _ = raise Domain

(* The client port object. *)
val client = newChannelObject2 clientProc

(* Ask the client to do its work and show the (future) result. *)
val y = promise();
Channel.put(client, Work(y));
inspect (future y);

(* 5.3.2 Simple message protocols - Asynchronous RMI *)
(* Asynchronous RMI client: on Work(result), send both Calc requests without
   waiting in between, then fulfill result with the sum of the two answers.
   Any other message raises Domain. *)
fun clientProc msg =
   case msg
     of Work(result) =>
          let
             val first = promise()
             val second = promise()
          in
             Channel.put(server, Calc(10.0, first));
             Channel.put(server, Calc(20.0, second));
             result ?= (future first) + (future second)
          end
      | _ => raise Domain

(* The client port object for the asynchronous variant. *)
val client = newChannelObject2 clientProc

(* Ask the client to do its work and show the (future) result. *)
val y = promise();
Channel.put(client, Work(y));
inspect (future y);

(* 5.3.3 Simple message protocols - RMI with callback (using thread) *)
(* Messages of the callback example.  Calc and Work additionally carry the
   client's channel so the server can call back; Delta(d) is the callback
   request, carrying a promise for the delta value. *)
datatype 'a procs = Calc of 'a * 'a promise * 'a procs Channel.channel
                  | Work of 'a promise * 'a procs Channel.channel
                  | Delta of 'a promise

(* Server handler with callback: on Calc(x, y, client), request a delta from
   the client, add it to x (this blocks until the client answers) and fulfill
   y with the polynomial of the shifted value.  Other messages raise Domain. *)
fun serverProc msg =
   case msg
     of Calc(x, y, client) =>
          let
             val delta = promise()
             val () = Channel.put(client, Delta(delta))
             val shifted = x + (future delta)
          in
             y ?= shifted*shifted + 2.0*shifted + 2.0
          end
      | _ => raise Domain

(* The server port object. *)
val server = newChannelObject2 serverProc

(* deadlock version
   fun clientProc (Work(z, client)) =
         let
            val y = promise()
         in
            Channel.put(server, Calc(10.0, y, client));
            z ?= (future y) + 100.0
         end
     | clientProc (Delta(d)) = ( d ?= 1.0 )
     | clientProc _ = raise Domain

   val client = newChannelObject2 clientProc

   val z = promise();
   Channel.put(client, Work(z, client));
   inspect (future z);
*)

(* non-deadlock version *)
(* Client handler that avoids the deadlock: the wait for the server's answer
   runs in a spawned thread, so the client stays free to serve the server's
   Delta callback.  Other messages raise Domain. *)
fun clientProc msg =
   case msg
     of Work(z, client) =>
          let
             val y = promise()
             val () = Channel.put(server, Calc(10.0, y, client))
          in
             z ?= (spawn (future y) + 100.0)
          end
      | Delta(d) => d ?= 1.0
      | _ => raise Domain

(* The client port object. *)
val client = newChannelObject2 clientProc

(* Start the work, wait for the result and show it. *)
val z = promise();
Channel.put(client, Work(z, client));
(* Await the future of z; awaiting the promise itself would return at once. *)
await (future z);
inspect (future z);

(* 5.3.4 Simple message protocols - RMI with callback (using record continuation) *)
(* Messages of the record-continuation example.  Cont(y, z) is sent by the
   server back to the client: y is the computed value and z the promise the
   client must fulfill with the final result. *)
datatype 'a procs = Calc of 'a * 'a promise * 'a procs Channel.channel
                  | Work of 'a promise * 'a procs Channel.channel
                  | Delta of 'a promise
                  | Cont of 'a * 'a promise

(* Server handler: on Calc(x, z, client), obtain the delta from the client,
   compute the polynomial of x + delta and send the value back to the client
   in a Cont message together with z.  Other messages raise Domain. *)
fun serverProc msg =
   case msg
     of Calc(x, z, client) =>
          let
             val delta = promise()
             val () = Channel.put(client, Delta(delta))
             val shifted = x + (future delta)
             val y = shifted*shifted + 2.0*shifted + 2.0
          in
             Channel.put(client, Cont(y, z))
          end
      | _ => raise Domain

(* The server port object. *)
val server = newChannelObject2 serverProc

(* Client handler: Work forwards the request to the server, Cont finishes the
   computation by fulfilling z with y + 100.0, Delta answers the callback with
   1.0.  Other messages raise Domain. *)
fun clientProc msg =
   case msg
     of Work(z, client) => Channel.put(server, Calc(10.0, z, client))
      | Cont(y, z)      => z ?= y + 100.0
      | Delta(d)        => d ?= 1.0
      | _               => raise Domain

(* The client port object. *)
val client = newChannelObject2 clientProc

(* Start the work, wait for the result and show it. *)
val z = promise();
Channel.put(client, Work(z, client));
(* Await the future of z; awaiting the promise itself would return at once. *)
await (future z);
inspect (future z);

(* 5.3.5 Simple message protocols - RMI with callback (using procedure continuation) *)
(* Messages of the procedure-continuation example.  Calc and Cont carry a
   continuation function of type 'a -> unit in place of a result promise. *)
datatype 'a procs = Calc of 'a * ('a -> unit) * 'a procs Channel.channel
                  | Work of 'a promise * 'a procs Channel.channel
                  | Delta of 'a promise
                  | Cont of 'a * ('a -> unit)

(* Server handler: on Calc(x, c, client), obtain the delta from the client,
   compute the polynomial of x + delta and send the value back to the client
   in a Cont message together with the continuation c.  Other messages raise
   Domain. *)
fun serverProc msg =
   case msg
     of Calc(x, c, client) =>
          let
             val delta = promise()
             val () = Channel.put(client, Delta(delta))
             val shifted = x + (future delta)
             val y = shifted*shifted + 2.0*shifted + 2.0
          in
             Channel.put(client, Cont(y, c))
          end
      | _ => raise Domain

(* The server port object. *)
val server = newChannelObject2 serverProc

(* Client handler: Work sends the request along with a continuation that
   fulfills z with y + 100.0; Cont runs the continuation on the returned value;
   Delta answers the callback with 1.0.  Other messages raise Domain. *)
fun clientProc msg =
   case msg
     of Work(z, client) =>
          let
             fun finish y = z ?= y + 100.0
          in
             Channel.put(server, Calc(10.0, finish, client))
          end
      | Cont(y, k) => k y
      | Delta(d)   => d ?= 1.0
      | _          => raise Domain

(* The client port object. *)
val client = newChannelObject2 clientProc

(* Start the work, wait for the result and show it. *)
val z = promise();
Channel.put(client, Work(z, client));
(* Await the future of z; awaiting the promise itself would return at once. *)
await (future z);
inspect (future z);

(* 5.3.6 Simple message protocols - Error reporting *)
(* Outcome reported by the server: Normal on success, Failure carrying the
   exception otherwise. *)
datatype results = Normal
                 | Failure of exn
(* Sqrt(x, y, e): argument, promise for the result, promise for the outcome. *)
datatype 'a procs = Sqrt of 'a * 'a promise * results promise

(* Server handler with error reporting: fulfill y with the square root of x;
   if the result is NaN raise Domain, otherwise report Normal through e.  Any
   exception raised in the body is reported to the caller as Failure via e. *)
fun serverProc (Sqrt(x, y, e)) =
   ( y ?= Math.sqrt x;
     case Real.isNan(future y)
       of true  => raise Domain
        | false => e ?= Normal )
   handle failure => e ?= Failure(failure)

(* The server port object. *)
val server = newChannelObject2 serverProc;

(* Client side of the error-reporting protocol: send a Sqrt request, then
   inspect the outcome and re-raise the server's exception locally if the
   request failed. *)
let
   val x = 25.0
   val y = promise()
   val e = promise()
in
   inspect (future y, future e);
   Channel.put(server, Sqrt(x, y, e));
   case (future e) of
       Normal => ()
     | Failure(exc) => raise exc
end;

(* 5.3.7 Simple message protocols - Asynchronous RMI with callback *)
(* Messages of the asynchronous-RMI-with-callback example: Calc and Work carry
   the client's channel for the callback; Delta(d) is the callback request. *)
datatype 'a procs = Calc of 'a * 'a promise * 'a procs Channel.channel
                  | Work of 'a promise * 'a procs Channel.channel
                  | Delta of 'a promise

(* Server handler: on Calc(x, y, client), request the delta from the client
   and perform the computation that depends on it in a spawned thread, so the
   handler itself returns without waiting.  Other messages raise Domain. *)
fun serverProc msg =
   case msg
     of Calc(x, y, client) =>
          let
             val delta = promise()
             val () = Channel.put(client, Delta(delta))
          in
             spawn
                let
                   val shifted = x + (future delta)
                in
                   y ?= shifted*shifted + 2.0*shifted + 2.0
                end
          end
      | _ => raise Domain

(* The server port object. *)
val server = newChannelObject2 serverProc

(* Client handler: on Work, send both Calc requests and combine the answers in
   a spawned thread; on Delta, answer the callback with 1.0.  Other messages
   raise Domain. *)
fun clientProc msg =
   case msg
     of Work(y, client) =>
          let
             val first = promise()
             val second = promise()
          in
             Channel.put(server, Calc(10.0, first, client));
             Channel.put(server, Calc(20.0, second, client));
             spawn y ?= (future first) + (future second)
          end
      | Delta(d) => d ?= 1.0
      | _ => raise Domain

(* The client port object. *)
val client = newChannelObject2 clientProc

(* Start the work and show the (future) result. *)
val y = promise();
Channel.put(client, Work(y, client));
inspect (future y);

(* 5.3.8 Simple message protocols - Double callbacks *)
(* Messages of the double-callback example: in addition to the client callback
   Delta, ServerDelta(s) is a callback from the client to the server. *)
datatype 'a procs = Calc of 'a * 'a promise * 'a procs Channel.channel
                  | Work of 'a promise * 'a procs Channel.channel
                  | Delta of 'a promise
                  | ServerDelta of 'a promise

(* Server handler: Calc requests the client's delta and computes the result in
   a spawned thread; ServerDelta answers the client's own callback with 0.01.
   Other messages raise Domain. *)
fun serverProc msg =
   case msg
     of Calc(x, y, client) =>
          let
             val delta = promise()
             val () = Channel.put(client, Delta(delta))
          in
             spawn
                let
                   val shifted = x + (future delta)
                in
                   y ?= shifted*shifted + 2.0*shifted + 2.0
                end
          end
      | ServerDelta(s) => s ?= 0.01
      | _ => raise Domain

(* The server port object. *)
val server = newChannelObject2 serverProc

(* Client handler: Work sends the request and finishes the result in a spawned
   thread; Delta calls back into the server for its delta and answers with
   1.0 plus that value.  Other messages raise Domain. *)
fun clientProc msg =
   case msg
     of Work(z, client) =>
          let
             val y = promise()
             val () = Channel.put(server, Calc(10.0, y, client))
          in
             spawn z ?= (future y) + 100.0
          end
      | Delta(d) =>
          let
             val s = promise()
             val () = Channel.put(server, ServerDelta(s))
          in
             d ?= 1.0 + (future s)
          end
      | _ => raise Domain

(* The client port object. *)
val client = newChannelObject2 clientProc

(* Start the work and show the (future) result. *)
val y = promise();
Channel.put(client, Work(y, client));
inspect (future y);

(* 5.4.1 Program design for concurrency - Programming with concurrent components *)
(* NOT gate: in a spawned thread, put 1-x on channel c for every element x of
   the input stream xs. *)
fun notG (xs, c) =
   spawn List.app (fn x => Channel.put(c, (1-x))) xs

(* Build a two-input gate from the binary function f.  The resulting gate
   takes two input streams and an output channel and, in a spawned thread,
   puts f(x, y) on the channel for each pair of corresponding inputs; it stops
   when either input ends. *)
fun gateMaker f =
   let
      fun gate (xs, ys, c) =
         let
            fun loop (nil, _) = ()
              | loop (_, nil) = ()
              | loop (x::xr, y::yr) = ( Channel.put(c, f(x, y)); loop(xr, yr) )
         in
            spawn loop(xs, ys)
         end
   in
      gate
   end

(* Two-input gates over 0/1 signals, each expressed arithmetically. *)
val andG    = gateMaker (fn (x, y) => x*y)
val orG     = gateMaker (fn (x, y) => x+y-x*y)
val nandG   = gateMaker (fn (x, y) => 1-x*y)
val norG    = gateMaker (fn (x, y) => 1-x-y+x*y)
val xorG    = gateMaker (fn (x, y) => x+y-2*x*y)

(* Delay gate: the output on channel c is a 0 followed by the input stream xs,
   i.e. the input delayed by one step.  The initial 0 is pushed immediately;
   the input is then forwarded element by element in a spawned thread so the
   caller is not blocked on an unfinished stream.  (Previously only the 0 was
   emitted and xs was ignored, so consumers stalled after the first value.) *)
fun delayG (xs, c) =
   ( Channel.push(c, 0);
     spawn List.app (fn x => Channel.put(c, x)) xs )

(* Latch circuit built from the gates above.  c, di and d0 are channels; the
   output d0 is fed back through a delay gate.  The wiring is:
     fb      = delay(d0)
     hold    = fb AND c
     notC    = NOT c
     load    = notC AND di
     d0      = hold OR load *)
fun latch (c, di, d0) =
   let
      val hold = Channel.channel()
      val load = Channel.channel()
      val notC = Channel.channel()
      val fb   = Channel.channel()
   in
      delayG(Channel.toList d0, fb);
      andG(Channel.toList fb, Channel.toList c, hold);
      notG(Channel.toList c, notC);
      andG(Channel.toList notC, Channel.toList di, load);
      orG(Channel.toList hold, Channel.toList load, d0)
   end

(* 5.4.2 Program design for concurrency - Functional building blocks as concurrency patterns *)
(* A query message: a value together with a promise for the answer. *)
datatype 'a query = Query of 'a * 'a promise

(* Placeholder inputs for the example: an empty list of channels and a value
   to send. *)
val cl = []
val foo = 1.0

(* Send a Query to every channel in cl and collect the futures of the
   answers. *)
val al = map
   (fn c =>
      let
         val ans = promise()
      in
         Channel.put(c, Query(foo, ans));
         future ans
      end)
   cl

(* Larger of two reals; returns x when x >= y holds and y otherwise. *)
fun max(x, y) : real =
   case x >= y
     of true  => x
      | false => y

(* Largest of the collected answers, starting from 0.0. *)
val m = foldl max 0.0 al

(* 5.5.2 Lift control system - Implementation *)
(* Floor messages and states share one datatype.  As used by floor below:
   FloorCall, FloorArrive and FloorStopTimer are messages; FloorCalled,
   FloorNotCalled and FloorDoorsOpen are states, wrapped in FloorState. *)
datatype statefloor        = FloorCalled
                           | FloorNotCalled
                           | FloorCall
                           | FloorDoorsOpen of unit promise
                           | FloorArrive of unit promise
                           | FloorStopTimer
                           | FloorState of statefloor

(* Lift messages and states.  As used by lift and controller below: LiftCall
   and LiftAt are messages, LiftRunning/LiftStopped are the controller's motor
   states, and LiftState(pos, sched, moving) is the lift's own state. *)
datatype statelift         = LiftRunning
                           | LiftStopped
                           | LiftCall of int
                           | LiftAt of int
                           | LiftState of int * int list * bool

(* Controller messages (ControllerStep, ControllerStopTimer) and state
   (ControllerState of motor state, floor and a promise for the lift's
   channel). *)
datatype statecontroller   = ControllerStep of int
                           | ControllerStopTimer
                           | ControllerState of statelift * int * statelift Channel.channel promise

(* Timer requests: a delay in milliseconds and a promise for the channel to
   notify when the delay has elapsed. *)
datatype statetimer        = TimerStartController of int * statecontroller Channel.channel promise
                           | TimerStartFloor of int * statefloor Channel.channel promise


(* Timer port object.  Each request spawns a thread that sleeps for t
   milliseconds and then sends the matching stop-timer message to the
   requesting controller or floor. *)
val timer =
   let
      fun wake (t, notify) =
         spawn ( Thread.sleep(Time.fromMilliseconds(Int.toLarge t)); notify() )
   in
      newChannelObject2 (
         fn (TimerStartController(t, pid)) =>
               wake(t, fn () => Channel.put(future pid, ControllerStopTimer))
          | (TimerStartFloor(t, pid)) =>
               wake(t, fn () => Channel.put(future pid, FloorStopTimer)))
   end

(* Lift controller port object, started in state init; returns its channel.
   - While running, a stop-timer message reports the current floor to the lift
     and stops the motor.
   - While stopped, a step request towards dest does nothing if already there;
     otherwise it starts the 5000 ms timer and moves one floor up or down.
   Any other message/state combination raises Domain. *)
fun controller init =
   let
      val tid = timer
      val cid = promise()

      (* Start the travel timer and enter the running state at nextFloor. *)
      fun startMove (nextFloor, lid) =
         ( Channel.put(tid, TimerStartController(5000, cid));
           ControllerState(LiftRunning, nextFloor, lid) )

      fun step (ControllerStopTimer, ControllerState(LiftRunning, f, lid)) =
            ( Channel.put(future lid, LiftAt(f));
              ControllerState(LiftStopped, f, lid) )
        | step (ControllerStep(dest), ControllerState(LiftStopped, f, lid)) =
            if f = dest then ControllerState(LiftStopped, f, lid)
            else if f < dest then startMove(f+1, lid)
            else startMove(f-1, lid)
        | step _ = raise Domain
   in
      cid ?= newChannelObject(step, init);
      future cid
   end

(* Floor port object number num, started in state init; lifts is a promise for
   the list of lift channels.  Returns the floor's channel.
   - A call on an uncalled floor picks a lift at random and calls it.
   - A lift arriving (called or not) opens the doors and starts a 5000 ms
     timer; when the timer fires the doors close and the arrival's ack promise
     is fulfilled.
   - A second lift arriving while the doors are open has its ack tied to the
     pending one; further calls in the called/doors-open states are ignored.
   Any other message/state combination raises Domain. *)
fun floor (num, init, lifts) =
   let
      val tid = timer
      val fid = promise()
      val label = "Lift at floor " ^ Int.toString(num)

      (* Announce, start the door timer and remember whom to acknowledge. *)
      fun openDoors ack =
         ( inspect (label ^ ": open doors");
           Channel.put(tid, TimerStartFloor(5000, fid));
           FloorState(FloorDoorsOpen(ack)) )

      (* Pick a random lift and send it a call for this floor. *)
      fun callLift () =
         let
            val lnum = Random.int (List.length (future lifts))
         in
            inspect ("Floor " ^ Int.toString(num) ^ " calls a lift " ^ Int.toString(lnum+1) ^ "!");
            Channel.put(List.nth(future lifts, lnum), LiftCall(num));
            FloorState(FloorCalled)
         end

      fun handleMsg (FloorArrive(ack), FloorState(FloorNotCalled)) = openDoors ack
        | handleMsg (FloorCall, FloorState(FloorNotCalled)) = callLift()
        | handleMsg (FloorArrive(ack), FloorState(FloorCalled)) = openDoors ack
        | handleMsg (FloorCall, FloorState(FloorCalled)) = FloorState(FloorCalled)
        | handleMsg (FloorStopTimer, FloorState(FloorDoorsOpen(ack))) =
            ( inspect (label ^ ": close doors");
              ack ?= ();
              FloorState(FloorNotCalled) )
        | handleMsg (FloorArrive(a), FloorState(FloorDoorsOpen(ack))) =
            ( a ?= future ack;
              FloorState(FloorDoorsOpen(ack)) )
        | handleMsg (FloorCall, FloorState(FloorDoorsOpen(ack))) =
            FloorState(FloorDoorsOpen(ack))
        | handleMsg _ = raise Domain
   in
      fid ?= newChannelObject(handleMsg, init);
      future fid
   end

(* Append floor n to the schedule xs unless it is already the last entry.
   The previous condition (null xs andalso length xs - 1 = n) could only hold
   for n = ~1, so every call appended and repeated calls for the same floor
   produced duplicate schedule entries. *)
fun scheduleLast (xs, n) =
   if not (null xs) andalso List.last xs = n
      then xs
      else xs @ [n]

(* Lift port object number num, started in state init.  cid is a promise for
   the controller's channel and floors a promise for the list of floor
   channels.  Returns the lift's channel.

   State is LiftState(pos, sched, moving): current floor, scheduled floors and
   whether the lift is moving.
   - LiftCall(n) while idle at floor n: notify the floor and wait until it
     acknowledges (doors closed).
   - LiftCall(n) otherwise: append n to the schedule; if idle, also ask the
     controller to start stepping towards n.
   - LiftAt(newpos) at the next scheduled floor: notify the floor, wait for
     its acknowledgement, then either stop (schedule empty) or step towards
     the next scheduled floor.
   - LiftAt(newpos) elsewhere: keep stepping towards the head of the schedule.
   Any other message/state combination raises Domain.

   The waits use await (future ack): a promise is itself a determined value,
   so awaiting the promise would return immediately and the lift would move
   on without waiting for the floor's doors. *)
fun lift (num, init, cid, floors) =
   let
      val lid = promise()
   in
      lid ?=
         newChannelObject(
            fn (LiftCall(n), LiftState(pos, sched, false)) if (n = pos) =>
               let
                  val ack = promise()
               in
                  inspect ("Lift " ^ Int.toString(num) ^ " needed at floor " ^ Int.toString(n));
                  Channel.put(List.nth(future floors, pos-1), FloorArrive(ack));
                  await (future ack);
                  LiftState(pos, sched, false)
               end
             | (LiftCall(n), LiftState(pos, sched, false)) =>
               let
                  val sched2 = scheduleLast(sched, n)
               in
                  inspect ("Lift " ^ Int.toString(num) ^ " needed at floor " ^ Int.toString(n));
                  Channel.put(future cid, ControllerStep(n));
                  LiftState(pos, sched2, true)
               end
             | (LiftCall(n), LiftState(pos, sched, true)) =>
               let
                  val sched2 = scheduleLast(sched, n)
               in
                  inspect ("Lift " ^ Int.toString(num) ^ " needed at floor " ^ Int.toString(n));
                  LiftState(pos, sched2, true)
               end

             | (LiftAt(newpos), LiftState(pos, s::nil, moving)) if (newpos = s) =>
               let
                  val ack = promise()
               in
                  inspect ("Lift " ^ Int.toString(num) ^ " at floor " ^ Int.toString(newpos));
                  Channel.put(List.nth(future floors, s-1), FloorArrive(ack));
                  await (future ack);
                  LiftState(newpos, nil, false)
               end
             | (LiftAt(newpos), LiftState(pos, s::sched2, moving)) if (newpos = s) =>
               let
                  val ack = promise()
               in
                  inspect ("Lift " ^ Int.toString(num) ^ " at floor " ^ Int.toString(newpos));
                  Channel.put(List.nth(future floors, s-1), FloorArrive(ack));
                  await (future ack);
                  Channel.put(future cid, ControllerStep(hd sched2));
                  LiftState(newpos, sched2, true)
               end
             | (LiftAt(newpos), LiftState(pos, sched, moving)) =>
               let in
                  inspect ("Lift " ^ Int.toString(num) ^ " at floor " ^ Int.toString(newpos));
                  Channel.put(future cid, ControllerStep(hd sched));
                  LiftState(newpos, sched, moving)
               end
             | _ => raise Domain,
            init);
      future lid
   end

(* Create a building with fnum floors and lnum lifts.  floors and lifts are
   promises that are fulfilled with the lists of floor and lift channels; each
   lift gets its own controller, and lifts and floors refer to each other
   through these promises. *)
fun building (fnum, lnum, floors, lifts) =
   let
      fun newLifts (i, m) =
         if i > m then nil
         else
            let
               val lid = promise()
               val cid = promise()
            in
               cid ?= controller(ControllerState(LiftStopped, 1, lid));
               lid ?= lift(i, LiftState(1, nil, false), cid, floors);
               (future lid)::newLifts(i+1, m)
            end
      fun newFloors (i, m) =
         if i > m then nil
         else floor(i, FloorState(FloorNotCalled), lifts)::newFloors(i+1, m)
   in
      lifts ?= newLifts(1, lnum);
      floors ?= newFloors(1, fnum)
   end;

(* Build a 10-floor, 2-lift building, call lifts from floors 9 and 10, and
   after a pause send calls for floors 4 and 5 directly to the two lifts. *)
let
   val floors = promise()
   val lifts = promise()
in
   building(10, 2, floors, lifts);
   Channel.put(List.nth(future floors,  9-1), FloorCall);
   Channel.put(List.nth(future floors, 10-1), FloorCall);
   (* delay here to keep sequence in order *)
   Thread.sleep(Time.fromMilliseconds(Int.toLarge 2000));
   Channel.put(List.nth(future lifts, 1-1), LiftCall(4));
   Channel.put(List.nth(future lifts, 2-1), LiftCall(5))
end;

(* 5.5.3 Lift control system - Improvements to the lift control system *)
(* A lift shaft bundles a lift with its controller: given the lift number, an
   initial LiftState and the floors promise, it creates both components wired
   to each other and returns the lift's channel.  A non-LiftState initial value
   raises Domain. *)
fun liftShaft (i, init, floors) =
   case init
     of LiftState(f, _, _) =>
          let
             val cid = promise()
             val lid = promise()
             val () = cid ?= controller(ControllerState(LiftStopped, f, lid))
             val () = lid ?= lift(i, init, cid, floors)
          in
             future lid
          end
      | _ => raise Domain

(* Create a building with fnum floors and lnum lifts, using liftShaft to build
   each lift together with its controller.  floors and lifts are promises that
   are fulfilled with the lists of floor and lift channels.

   The unused controller promise and the extra promise that merely re-wrapped
   liftShaft's result have been removed; liftShaft already returns the lift's
   channel. *)
fun building (fnum, lnum, floors, lifts) =
   let
      fun newLifts (i, m) if (i > m) = nil
        | newLifts (i, m) =
            liftShaft(i, LiftState(1, nil, false), floors)::newLifts(i+1, m)
      fun newFloors (i, m) if (i > m) = nil
        | newFloors (i, m) =
         let
            val fid = floor(i, FloorState(FloorNotCalled), lifts)
         in
            fid::newFloors(i+1, m)
         end
   in
      lifts ?= newLifts(1, lnum);
      floors ?= newFloors(1, fnum)
   end

(* 5.6.1 Using the message-passing model directly - Port objects that share one thread *)
(* Messages for port objects sharing one thread: Add(id, proc, sync) registers
   a procedure under an id (sync is fulfilled once registered); Msg(id, m)
   delivers message m to the procedure registered under id. *)
datatype ('a, 'b, 'c) sharechannel = Add of 'a * 'b * unit promise
                                   | Msg of 'a * 'c

(* Create a family of port objects that all run on one shared thread.
   Returns the pair (addChannelObject, call):
   - addChannelObject(i, proc) registers proc under identifier i and blocks
     until the registration has been processed; it returns the (fulfilled)
     sync promise, as the original definition did.
   - call(i, m) asynchronously delivers message m to the procedure registered
     under i.  Exceptions raised while handling a message, including an
     unknown identifier, are deliberately ignored. *)
fun newChannelObjects () =
   let
      val c = Channel.channel()
      val sin = Channel.toList(c)

      (* Find the procedure registered under i.  The guard compares the
         requested id with the entry's id m; it previously read (i = i),
         which is always true and returned the first entry for every id. *)
      fun lookup (i, (m, proc)::xs) if (i = m) = proc
        | lookup (i, (m, proc)::xs) = lookup(i, xs)
        | lookup (i, nil) = raise Empty

      (* The single thread's loop over the shared message stream. *)
      fun msgLoop (Add(i, proc, sync)::s2, procs) =
            let
            in
               sync ?= ();
               msgLoop(s2, (i, proc)::procs)
            end
        | msgLoop (Msg(i, m)::s2, procs) =
            let
            in
               lookup(i, procs)(m) handle _ => ();
               msgLoop(s2, procs)
            end
        | msgLoop (nil, procs) = ()

      fun addChannelObject (i, proc) =
         let
            val sync = promise()
         in
            Channel.put(c, Add(i, proc, sync));
            (* Await the future: awaiting the promise itself would not block. *)
            await (future sync);
            sync
         end

      fun call (i, m) =
         let in
            Channel.put(c, Msg(i, m))
         end
   in
      spawn msgLoop(sin, nil);
      (addChannelObject, call)
   end

   (* Messages exchanged by the two objects, each carrying a counter. *)
   datatype pingpong = Ping of int
                     | Pong of int

   (* Identifiers under which the two objects are registered. *)
   datatype object = PingObj
                   | PongObj

   (* One shared-thread family for the ping-pong example. *)
   val (addChannelObject, call) = newChannelObjects()

   (* Handler for a ping-pong object: print the received message, pause for
      two seconds and send the opposite message, with the counter incremented,
      to the object registered as other. *)
   fun pingpongProc other msg =
      let
         fun relay (prefix, n, reply) =
            ( inspect (prefix ^ Int.toString(n) ^ ")");
              Thread.sleep(Time.fromMilliseconds(Int.toLarge 2000));
              call(other, reply) )
      in
         case msg
           of Ping n => relay("ping(", n, Pong(n+1))
            | Pong n => relay("pong(", n, Ping(n+1))
      end;

   (* Register both objects; each one replies to the other. *)
   addChannelObject(PingObj, pingpongProc(PongObj));
   addChannelObject(PongObj, pingpongProc(PingObj));
   (* call(PingObj, Ping 0); *)        (* uncomment this to play game *)

(* 5.6.2 Using the message-passing model directly - A concurrent queue with ports *)

(* skipping over the naive implementation *)

(* A queue is a record of its two operations; both fields share the type 'a,
   which newQueue below instantiates to a function type. *)
datatype 'a queue = Queue of { put:'a, get:'a }

(* Concurrent queue built from two channels.  put and get each take a promise,
   enqueue it on their own channel and return its future.  A spawned matcher
   pairs the n-th put with the n-th get: whichever promise of the pair is
   already fulfilled supplies the value for the other one. *)
fun newQueue () =
   let
      val giveChannel = Channel.channel()
      val takeChannel = Channel.channel()
      val given = Channel.toList giveChannel
      val taken = Channel.toList takeChannel

      fun match (nil, _) = nil
        | match (_, nil) = nil
        | match (x::xs, y::ys) =
            ( awaitEither(x, y);
              if (isFulfilled x)
                 then y ?= future x
                 else x ?= future y;
              match(xs, ys) )

      (* Put the promise on the given channel and hand back its future. *)
      fun enqueue chan item = ( Channel.put(chan, item); future item )
   in
      spawn match(given, taken);
      Queue{
         put = enqueue giveChannel,
         get = enqueue takeChannel
      }
   end;

(* Create a queue and bring its two operations into scope. *)
val q = newQueue()
val Queue{put, get} = q;

(* One put, three gets (two of them ahead of their puts), then two more puts. *)
put(known(1));
inspect (get(promise()));
inspect (get(promise()));
inspect (get(promise()));
put(known(2));
put(known(3));

(* NOTE(review): x is a promise, not a future, so `await x` presumably returns
   immediately rather than blocking; awaiting `future x` here would block
   before the matching get/put below is issued - confirm the intent. *)
val x = promise();
put(known(4));
await x;
get x;

val x = promise();
get(x);
await x;
put(known(5));

(* 5.6.3 Using the message-passing model directly - A thread abstraction with termination detection *)

(* Thread abstraction with termination detection.  Runs p in a new thread and
   returns only when p and every thread created through subThread have
   terminated.

   subThread is a promise that is fulfilled with the thread-creation function:
   it puts +1 on an internal channel, runs its argument in a spawned thread and
   puts -1 when that thread finishes.  zeroExit sums the stream and returns as
   soon as the running total gets back to zero. *)
fun newThread (p, subThread) =
   let
      val c = Channel.channel()
      val pis = Channel.toList c
      fun zeroExit (n, i::ir) if (n+i <> 0) = zeroExit(n+i, ir)
        | zeroExit _ = ()
   in
      (* The created thread must run the procedure passed to subThread; the
         previous version ignored its argument and always re-ran p. *)
      subThread ?= (fn q =>
         let
            val _ = Channel.put(c, 1)
         in
            spawn ( q(); Channel.put(c, ~1) )
         end);

      (future subThread)(p);
      zeroExit(0, pis)
   end

(* Port with a synchronous send.  s is a promise through which the stream of
   received messages is published; ssend is a promise that is fulfilled with
   the send function.  The send function puts the message together with a
   fresh acknowledgement promise on the channel and blocks until the receiving
   side has taken the message and fulfilled the acknowledgement.

   Fixes relative to the previous version:
   - ssend is now actually fulfilled with the send function (it was defined
     locally but never handed out);
   - the sender waits on the future of the acknowledgement rather than on the
     promise, which would not block;
   - s is fulfilled only by mapPromise; it is no longer fulfilled a second
     time with mapPromise's result. *)
fun newSChannel (s, ssend) =
   let
      val c = Channel.channel()
      val s1 = Channel.toList c
      fun sSend m =
         let
            val x = promise()
         in
            Channel.put(c, (m, x));
            await (future x)
         end
   in
      ssend ?= sSend;
      spawn ( mapPromise (fn (m, x) => ( x ?= (); m)) s1 s; () )
   end

(* 5.6.4 Using the message-passing model directly - Eliminating sequential dependencies *)
(* Filter the list with predicate f, publishing the result incrementally
   through promise p in the same way as mapPromise: each accepted element
   fulfills the current promise with a cons cell whose tail is the future of a
   fresh promise.  Returns the future of p.

   Accepted elements are emitted as they are.  The previous version emitted
   `future x`, which treated every element as a promise even though f is
   applied to the element directly, and so did not fit the value lists this
   function is meant for (see the commented-out call below the definition). *)
fun filterPromise f nil p = ( p ?= nil; future p )
  | filterPromise f (x::xs) p =
      if (f x)
         then
            let
               val px = promise()
            in
               p ?= x::(future px);
               filterPromise f xs px;
               future p
            end
         else (filterPromise f xs p)
(* Promises for the filter example: p for the result, a and b for two list
   elements that are not yet known. *)
val p = promise()
val a = promise()
val b = promise();
(* filterPromise (fn x => x > 2) [future a, 5, 1, future b, 4, 0, 6] p; *)

(* Unable to get correct translation for concFilter *)
(*
fun newPortClose (s, send, close) =
   let
      val pc = ref s
   in
      send ?= (fn m =>
         let
            val s = promise()
         in
            Ref.exchange(pc, m::(future s))
         end);
      close ?= (fn () => pc := ref nil)
   end
fun concFilter f xs ys p =
   let
      val send = promise()
      val close = promise()
   in
      newPortClose(ys, send, close);
      barrier (mapPromise (fn x => if (f x) then ((future send) x) else nil) xs p);
      close()
   end
*)

(* 5.7.2 The Erlang language - Introduction to Erlang programming *)

(* Lazily evaluated factorial; raises Domain for negative arguments. *)
fun lazy factorial n =
   if n = 0 then 1
   else if n > 0 then n * factorial(n-1)
   else raise Domain

(* Geometric shapes: a square by its side, a rectangle by two sides, a circle
   by its radius and a triangle by its three sides. *)
datatype 'a shape = Square of 'a
                  | Rectangle of 'a * 'a
                  | Circle of 'a
                  | Triangle of 'a * 'a * 'a

(* Area of a shape.  The triangle case uses Heron's formula on the
   semi-perimeter s. *)
fun area shape =
   case shape
     of Square side      => side * side
      | Rectangle (w, h) => w * h
      | Circle r         => 3.14159 * r * r
      | Triangle (a, b, c) =>
          let
             val s = (a + b + c) / 2.0
          in
             Math.sqrt(s * (s-a)*(s-b)*(s-c))
          end

(* Request to the area server: a promise for the answer and the shape. *)
datatype 'a msg = Msg of 'a promise * 'a shape
(* Start the area server: returns a channel and spawns a thread that answers
   every Msg put on it by fulfilling the request's promise with the shape's
   area. *)
fun start () =
   let
      val areaServer = Channel.channel()
      val requests = Channel.toList areaServer
      fun serve (Msg(ans, shape)) = ans ?= area(shape)
      val _ = spawn (forall requests serve)
   in
      areaServer
   end

(* Start the server, ask for the area of a square and wait for the answer. *)
val pid = start()
val ans = promise();
Channel.put(pid, Msg(ans, Square(3.4)));
(* Await the future of ans; awaiting the promise itself would return at once. *)
await (future ans);

(* 5.7.3 The Erlang language - The receive operation *)

(* Section does not use complete examples.  May return here later. *)

(* 5.8.1 Advanced topic - The non-deterministic concurrent model *)

(* Pattern-matching stream merger (superseded by the awaitEither version that
   follows).  Forwards elements of the two input lists to the output promise
   p: an element from the first list if it has one, otherwise one from the
   second; when one list ends the other becomes the rest of the output. *)
fun streamMerger (xs, ys, p) =
   let
      (* Publish m as the next output element and continue with the rest. *)
      fun emit (m, xs', ys') =
         let
            val px = promise()
         in
            p ?= m::(future px);
            streamMerger (xs', ys', px)
         end
   in
      case (xs, ys)
        of (m::s1, s2) => emit(m, s1, s2)
         | (s1, m::s2) => emit(m, s1, s2)
         | (nil, s2)   => p ?= s2
         | (s1, nil)   => p ?= s1
         | (nil, nil)  => p ?= nil
   end

(* Non-deterministic stream merger.  Waits until either input list is
   determined, forwards that list's head to the output promise p and continues
   with the remaining input; when one list ends, the other becomes the rest of
   the output. *)
fun streamMerger (xs, ys, p) =
   let
      (* Publish item as the next output element and continue merging. *)
      fun emit (item, xs', ys') =
         let
            val rest = promise()
         in
            p ?= item::(future rest);
            streamMerger(xs', ys', rest)
         end
   in
      case awaitEither(xs, ys)
         of Alt.FST (x::xr) => emit(x, xr, ys)
         |  Alt.SND (y::yr) => emit(y, xs, yr)
         |  Alt.FST nil     => p ?= ys
         |  Alt.SND nil     => p ?= xs
   end;

(* Merge the streams of two channels in a spawned thread while the main thread
   puts messages on both channels alternately. *)
let
   val xc = Channel.channel()
   val yc = Channel.channel()
   val xs = Channel.toList xc
   val ys = Channel.toList yc
   val p = promise()
in
   inspect p;
   spawn streamMerger(xs, ys, p);
   Channel.put(xc, "a");
   Channel.put(yc, "b");
   Channel.put(xc, "c");
   Channel.put(yc, "d")
end;

(* Future.awaitEither is the Alice equivalent of the WaitTwo function *)




(* Chris Rathman / Chris.Rathman@tx.rr.com *)