(* CTM Chapter #05 Examples in Alice ML *)
import structure Channel from "x-alice:/lib/data/Channel"
import structure Random from "x-alice:/lib/utility/Random"
(* syntactic sugar for solutions using promises/futures *)
open Promise
open Future
(* `p ?= v` is infix notation (precedence 3) for fulfill (p, v). *)
infix 3 ?=
val op?= = fulfill
(* `?` is a short alias for `future`. *)
val ? = future
(* Functions defined in previous chapters *)
(* Apply f to every element of a list, left to right, discarding each
   result.  Always returns unit. *)
fun forall items f =
    case items
      of nil => ()
       | item::rest => ( f item; forall rest f );
(* Build a promise that is already fulfilled with x and return the promise. *)
fun known x =
    let
        val filled = promise()
        val () = fulfill(filled, x)
    in
        filled
    end
(* Map f over a list, delivering the result through promise p one cell at a
   time: p is fulfilled with the first mapped element consed onto the future
   of a fresh promise, which the recursive call fulfills in turn.
   Returns the future of p. *)
fun mapPromise f xs p =
    case xs
      of nil => ( p ?= nil; future p )
       | x::rest =>
           let
               val tail = promise()
           in
               p ?= (f x)::(future tail);
               mapPromise f rest tail;
               future p
           end;
(* Run every procedure in ps in its own thread and return only after all of
   them have finished.

   Each thread fulfills its own promise with the result of the previous one
   once its procedure returns, so the last future in the chain becomes
   determined only when every procedure has completed.

   ps      : list of unit -> 'a procedures
   returns : the promise s, as before, so callers that used the result keep
             type-checking. *)
fun barrier ps =
    let
        fun barrierLoop (nil, n) = n
          | barrierLoop (f::fs, n) =
              let
                  val m = promise()
              in
                  spawn ( f(); m ?= n );
                  barrierLoop(fs, future m)
              end
        val s = promise()
    in
        spawn s ?= barrierLoop(ps, ());
        (* Wait on the future of s.  s itself is a promise handle rather than
           a future, so awaiting it directly would return without waiting for
           the spawned threads. *)
        await (future s);
        s
    end
(* 5.1 The message-passing concurrent model *)
(* Note: This is what the Oz functions NewPort & Send look like in Alice *)
(* Create a channel, fulfill promise s with the channel's list view, and
   return the channel. *)
fun newPort s =
    let
        val chan = Channel.channel()
        val () = s ?= Channel.toList chan
    in
        chan
    end
(* Sending a message is simply Channel.put (channel, message). *)
val send = Channel.put
(* Note: Will be using Alice Channel library the rest of the chapter *)
(* Create a channel, take its list view, put two messages and inspect the list. *)
val c = Channel.channel()
val s = Channel.toList(c);
Channel.put(c, "a");
Channel.put(c, "b");
(* The result of List.take is not bound to a name here. *)
List.take(s, 2);
inspect s;
(* 5.2 Port objects *)
(* A minimal port object: a thread inspects every message that arrives on
   the channel; then one message is sent. *)
let
    val chan = Channel.channel()
    val msgs = Channel.toList chan
in
    spawn (forall msgs inspect);
    Channel.put(chan, "hi")
end;
(* 5.2.1 Port objects - The newPortObject abstraction *)
(* Port object with state: a spawned thread folds f over the channel's
   messages starting from init (f receives (message, state)).
   Returns the channel used to send messages to the object. *)
fun newChannelObject (f, init) =
    let
        val chan = Channel.channel()
        val msgs = Channel.toList chan
    in
        spawn (foldl f init msgs);
        chan
    end
(* Stateless port object: a spawned thread applies f to every message put on
   the channel.  Returns the channel. *)
fun newChannelObject2 f =
    let
        val chan = Channel.channel()
        val msgs = Channel.toList chan
    in
        spawn (forall msgs f);
        chan
    end
(* 5.2.2 Port objects - An example *)
(* A ball-game player: a port object that, for each message received,
   reports its name, sleeps for one second and then forwards the message to
   a randomly chosen entry of others.  Returns the player's channel. *)
fun player others name =
    newChannelObject2 (fn msg =>
        let
            val pick = Random.int (List.length others)
        in
            inspect ("Player #" ^ name);
            Thread.sleep(Time.fromMilliseconds(Int.toLarge(1000)));
            Channel.put(List.nth(others, pick), msg)
        end)
(* One promise per player, so each player can be given the futures of the
   other two before any of them has been created. *)
val p1 = promise()
val p2 = promise()
val p3 = promise();
p1 ?= player [future p2, future p3] "1";
p2 ?= player [future p1, future p3] "2";
p3 ?= player [future p1, future p2] "3";
(* Channel.put(future p1, "ball"); *) (* uncomment this to play game *)
(* 5.3.1 Simple message protocols - RMI (Remote Method Invocation) *)
(* Messages: Calc (argument, promise for the result) is handled by the server;
   Work (promise for the result) is handled by the client. *)
datatype 'a procs = Calc of 'a * 'a promise
| Work of 'a promise
(* Server: answer Calc (x, y) by fulfilling y with x*x + 2x + 2 and returning
   its future; any other message raises Domain. *)
fun serverProc msg =
    case msg
      of Calc(x, y) => ( y ?= x*x + 2.0*x + 2.0; future y )
       | _ => raise Domain
(* Server port object: serverProc is applied to every message put on this channel. *)
val server = newChannelObject2 serverProc
(* Client for the synchronous RMI example: on Work y, call the server twice,
   waiting for each answer before continuing, and fulfill y with the sum of
   the two results.  Any other message raises Domain. *)
fun clientProc (Work(y)) =
    let
        val y1 = promise()
        val y2 = promise()
    in
        Channel.put(server, Calc(10.0, y1));
        (* Await the future, not the promise handle: awaiting the promise
           itself would not wait for the server's reply. *)
        await (future y1);
        Channel.put(server, Calc(20.0, y2));
        await (future y2);
        y ?= (future y1) + (future y2)
    end
  | clientProc _ = raise Domain
(* Start the client port object, send it a Work request and inspect the
   future of the result. *)
val client = newChannelObject2 clientProc
val y = promise();
Channel.put(client, Work(y));
inspect (future y);
(* 5.3.2 Simple message protocols - Asynchronous RMI *)
(* Client for the asynchronous RMI example: on Work y, send both Calc
   requests without waiting in between, then fulfill y with the sum of the
   two results.  Any other message raises Domain. *)
fun clientProc msg =
    case msg
      of Work y =>
           let
               val first = promise()
               val second = promise()
           in
               Channel.put(server, Calc(10.0, first));
               Channel.put(server, Calc(20.0, second));
               y ?= (future first) + (future second)
           end
       | _ => raise Domain
(* Start the asynchronous client, send it a Work request and inspect the
   future of the result. *)
val client = newChannelObject2 clientProc
val y = promise();
Channel.put(client, Work(y));
inspect (future y);
(* 5.3.3 Simple message protocols - RMI with callback (using thread) *)
(* Messages for the callback example.  Calc and Work carry the client's
   channel so the receiver can send messages back; Delta asks the client to
   fulfill the enclosed promise. *)
datatype 'a procs = Calc of 'a * 'a promise * 'a procs Channel.channel
| Work of 'a promise * 'a procs Channel.channel
| Delta of 'a promise
(* Server with callback: on Calc (x, y, client), ask the client for a delta,
   add it to x and fulfill y with the polynomial of the shifted value.
   Any other message raises Domain. *)
fun serverProc msg =
    case msg
      of Calc(x, y, client) =>
           let
               val shifted = promise()
               val delta = promise()
           in
               Channel.put(client, Delta delta);
               shifted ?= x + (future delta);
               y ?= (future shifted)*(future shifted) + 2.0*(future shifted) + 2.0
           end
       | _ => raise Domain
(* Server port object for the callback example. *)
val server = newChannelObject2 serverProc
(* deadlock version
fun clientProc (Work(z, client)) =
let
val y = promise()
in
Channel.put(server, Calc(10.0, y, client));
z ?= (future y) + 100.0
end
| clientProc (Delta(d)) = ( d ?= 1.0 )
| clientProc _ = raise Domain
val client = newChannelObject2 clientProc
val z = promise();
Channel.put(client, Work(z, client));
inspect (future z);
*)
(* non-deadlock version *)
(* Client, non-deadlocking variant: the addition that needs the server's
   answer runs in a spawned thread, so the client thread remains free to
   handle the Delta callback.  Any other message raises Domain. *)
fun clientProc msg =
    case msg
      of Work(z, client) =>
           let
               val answer = promise()
           in
               Channel.put(server, Calc(10.0, answer, client));
               z ?= (spawn (future answer) + 100.0)
           end
       | Delta d => d ?= 1.0
       | _ => raise Domain
(* Start the client, send it a Work request, wait for the result and
   inspect it. *)
val client = newChannelObject2 clientProc
val z = promise();
Channel.put(client, Work(z, client));
(* Wait on the future of z; z itself is a promise handle, so awaiting it
   directly would not wait for the result. *)
await (future z);
inspect (future z);
(* 5.3.4 Simple message protocols - RMI with callback (using record continuation) *)
(* Messages for the record-continuation example.  Cont (value, promise) is
   sent by the server back to the client carrying the computed value and the
   promise the client should eventually fulfill. *)
datatype 'a procs = Calc of 'a * 'a promise * 'a procs Channel.channel
| Work of 'a promise * 'a procs Channel.channel
| Delta of 'a promise
| Cont of 'a * 'a promise
(* Server with record continuation: request a delta from the client, compute
   the polynomial of x + delta, and send the result back to the client in a
   Cont message together with the client's promise z.
   Any other message raises Domain. *)
fun serverProc msg =
    case msg
      of Calc(x, z, client) =>
           let
               val shifted = promise()
               val delta = promise()
               val result = promise()
           in
               Channel.put(client, Delta delta);
               shifted ?= x + (future delta);
               result ?= (future shifted)*(future shifted) + 2.0*(future shifted) + 2.0;
               Channel.put(client, Cont(future result, z))
           end
       | _ => raise Domain
(* Server port object for the record-continuation example. *)
val server = newChannelObject2 serverProc
(* Client with record continuation: Work forwards the request to the server,
   Cont finishes the computation by fulfilling z with y + 100, and Delta
   answers the server's callback.  Any other message raises Domain. *)
fun clientProc msg =
    case msg
      of Work(z, client) => Channel.put(server, Calc(10.0, z, client))
       | Cont(y, z) => z ?= y + 100.0
       | Delta d => d ?= 1.0
       | _ => raise Domain
(* Start the client, send it a Work request, wait for the result and
   inspect it. *)
val client = newChannelObject2 clientProc
val z = promise();
Channel.put(client, Work(z, client));
(* Wait on the future of z; z itself is a promise handle, so awaiting it
   directly would not wait for the result. *)
await (future z);
inspect (future z);
(* 5.3.5 Simple message protocols - RMI with callback (using procedure continuation) *)
(* Messages for the procedure-continuation example: Calc and Cont carry a
   continuation function ('a -> unit) instead of a promise. *)
datatype 'a procs = Calc of 'a * ('a -> unit) * 'a procs Channel.channel
| Work of 'a promise * 'a procs Channel.channel
| Delta of 'a promise
| Cont of 'a * ('a -> unit)
(* Server with procedure continuation: request a delta from the client,
   compute the polynomial of x + delta, and send the result back to the
   client in a Cont message together with the continuation c.
   Any other message raises Domain. *)
fun serverProc msg =
    case msg
      of Calc(x, c, client) =>
           let
               val shifted = promise()
               val delta = promise()
               val result = promise()
           in
               Channel.put(client, Delta delta);
               shifted ?= x + (future delta);
               result ?= (future shifted)*(future shifted) + 2.0*(future shifted) + 2.0;
               Channel.put(client, Cont(future result, c))
           end
       | _ => raise Domain
(* Server port object for the procedure-continuation example. *)
val server = newChannelObject2 serverProc
(* Client with procedure continuation: Work sends the server a Calc request
   carrying a closure that fulfills z with y + 100; Cont runs the
   continuation it receives; Delta answers the server's callback.
   Any other message raises Domain. *)
fun clientProc msg =
    case msg
      of Work(z, client) =>
           Channel.put(server, Calc(10.0, fn y => z ?= y + 100.0, client))
       | Cont(y, c) => c y
       | Delta d => d ?= 1.0
       | _ => raise Domain
(* Start the client, send it a Work request, wait for the result and
   inspect it. *)
val client = newChannelObject2 clientProc
val z = promise();
Channel.put(client, Work(z, client));
(* Wait on the future of z; z itself is a promise handle, so awaiting it
   directly would not wait for the result. *)
await (future z);
inspect (future z);
(* 5.3.6 Simple message protocols - Error reporting *)
(* Outcome reported by the server: Normal, or Failure with the exception
   that was raised. *)
datatype results = Normal
| Failure of exn
(* Request: argument, promise for the square root, promise for the outcome. *)
datatype 'a procs = Sqrt of 'a * 'a promise * results promise
(* Square-root server with error reporting: fulfill y with Math.sqrt x, then
   fulfill e with Normal, or with Failure exn when the result is NaN (Domain
   is raised and caught here) or any other exception occurs. *)
fun serverProc (Sqrt(x, y, e)) =
    ( y ?= Math.sqrt x;
      if Real.isNan (future y)
      then raise Domain
      else e ?= Normal )
    handle failure => e ?= Failure failure
(* Square-root server port object. *)
val server = newChannelObject2 serverProc;
(* Ask for sqrt 25.0; re-raise in the caller whatever exception the server
   reported through e. *)
let
val x = 25.0
val y = promise()
val e = promise()
in
inspect (future y, future e);
Channel.put(server, Sqrt(x, y, e));
case (future e) of
Normal => ()
| Failure(exc) => raise exc
end;
(* 5.3.7 Simple message protocols - Asynchronous RMI with callback *)
(* Messages for the asynchronous RMI-with-callback example. *)
datatype 'a procs = Calc of 'a * 'a promise * 'a procs Channel.channel
| Work of 'a promise * 'a procs Channel.channel
| Delta of 'a promise
(* Asynchronous server with callback: send the Delta request to the client,
   then do the dependent computation in a spawned thread so the server
   thread can go on to the next message.  Any other message raises Domain. *)
fun serverProc msg =
    case msg
      of Calc(x, y, client) =>
           let
               val shifted = promise()
               val delta = promise()
           in
               Channel.put(client, Delta delta);
               spawn (
                   shifted ?= x + (future delta);
                   y ?= (future shifted)*(future shifted) + 2.0*(future shifted) + 2.0)
           end
       | _ => raise Domain
(* Server port object for the asynchronous callback example. *)
val server = newChannelObject2 serverProc
(* Asynchronous client with callback: issue both Calc requests, then add
   their results in a spawned thread so the client thread stays free to
   answer Delta callbacks.  Any other message raises Domain. *)
fun clientProc msg =
    case msg
      of Work(y, client) =>
           let
               val first = promise()
               val second = promise()
           in
               Channel.put(server, Calc(10.0, first, client));
               Channel.put(server, Calc(20.0, second, client));
               spawn (y ?= (future first) + (future second))
           end
       | Delta d => d ?= 1.0
       | _ => raise Domain
(* Start the client, send it a Work request and inspect the future of the
   result. *)
val client = newChannelObject2 clientProc
val y = promise();
Channel.put(client, Work(y, client));
inspect (future y);
(* 5.3.8 Simple message protocols - Double callbacks *)
(* Messages for the double-callback example: ServerDelta is the client's
   callback into the server, made while the client answers Delta. *)
datatype 'a procs = Calc of 'a * 'a promise * 'a procs Channel.channel
| Work of 'a promise * 'a procs Channel.channel
| Delta of 'a promise
| ServerDelta of 'a promise
(* Server for the double-callback example: Calc sends a Delta request to the
   client and finishes the computation in a spawned thread; ServerDelta is
   answered with 0.01.  Any other message raises Domain. *)
fun serverProc msg =
    case msg
      of Calc(x, y, client) =>
           let
               val shifted = promise()
               val delta = promise()
           in
               Channel.put(client, Delta delta);
               spawn (
                   shifted ?= x + (future delta);
                   y ?= (future shifted)*(future shifted) + 2.0*(future shifted) + 2.0)
           end
       | ServerDelta s => s ?= 0.01
       | _ => raise Domain
(* Server port object for the double-callback example. *)
val server = newChannelObject2 serverProc
(* Client for the double-callback example: Work sends a Calc request and
   computes the final sum in a spawned thread; Delta calls back into the
   server with ServerDelta and answers with 1.0 plus the server's reply.
   Any other message raises Domain. *)
fun clientProc msg =
    case msg
      of Work(z, client) =>
           let
               val answer = promise()
           in
               Channel.put(server, Calc(10.0, answer, client));
               spawn (z ?= (future answer) + 100.0)
           end
       | Delta d =>
           let
               val serverPart = promise()
           in
               Channel.put(server, ServerDelta serverPart);
               d ?= 1.0 + (future serverPart)
           end
       | _ => raise Domain
(* Start the client, send it a Work request and inspect the future of the
   result. *)
val client = newChannelObject2 clientProc
val y = promise();
Channel.put(client, Work(y, client));
inspect (future y);
(* 5.4.1 Program design for concurrency - Programming with concurrent components *)
(* NOT gate: in a spawned thread, put 1 - x on channel c for every x of the
   input list xs. *)
fun notG (xs, c) =
    let
        fun negate bits =
            case bits
              of nil => ()
               | b::rest => ( Channel.put(c, (1-b)); negate rest )
    in
        spawn negate xs
    end
(* Build a two-input gate from a binary function f.  The gate takes two input
   lists and an output channel and, in a spawned thread, puts f (x, y) on the
   channel for each pair of corresponding elements, stopping when either
   list ends. *)
fun gateMaker f (xs, ys, c) =
    let
        (* The first list is examined before the second. *)
        fun pump (lefts, rights) =
            case lefts
              of nil => ()
               | x::xr =>
                   (case rights
                      of nil => ()
                       | y::yr => ( Channel.put(c, f(x, y)); pump(xr, yr) ))
    in
        spawn pump(xs, ys)
    end
(* Two-input gates over 0/1 integers, each defined by its arithmetic formula. *)
val andG = gateMaker (fn (x, y) => x*y)
val orG = gateMaker (fn (x, y) => x+y-x*y)
val nandG = gateMaker (fn (x, y) => 1-x*y)
val norG = gateMaker (fn (x, y) => 1-x-y+x*y)
val xorG = gateMaker (fn (x, y) => x+y-2*x*y)
(* Delay gate: the output channel c carries 0 followed by every element of
   the input list xs, i.e. the input shifted by one step.

   The previous version only pushed the initial 0 and never forwarded xs, so
   anything reading c (such as the latch feedback loop) stalled after one
   value.  Forwarding runs in a spawned thread, like the other gates. *)
fun delayG (xs, c) =
    ( Channel.push(c, 0);
      spawn forall xs (fn x => Channel.put(c, x)) )
(* Latch built from gates.  c, di and d0 are channels; d0 is both the output
   and, through the delay gate, an input:
       d0 = (delay d0 AND c) OR (NOT c AND di) *)
fun latch (c, di, d0) =
    let
        val held = Channel.channel()      (* delayed output AND c *)
        val loaded = Channel.channel()    (* NOT c AND di *)
        val notC = Channel.channel()      (* NOT c *)
        val delayed = Channel.channel()   (* delayed output *)
    in
        delayG(Channel.toList d0, delayed);
        andG(Channel.toList delayed, Channel.toList c, held);
        notG(Channel.toList c, notC);
        andG(Channel.toList notC, Channel.toList di, loaded);
        orG(Channel.toList held, Channel.toList loaded, d0)
    end
(* 5.4.2 Program design for concurrency - Functional building blocks as concurrency patterns *)
(* A query carries a value and a promise for the answer. *)
datatype 'a query = Query of 'a * 'a promise
(* cl is the list of channels to query; it is empty in this example. *)
val cl = []
val foo = 1.0
(* Send Query (foo, ans) to every channel in cl and collect the futures of
   the answers. *)
val al = map
(fn c =>
let
val ans = promise()
in
Channel.put(c, Query(foo, ans));
future ans
end)
cl
(* Larger of two reals; returns y whenever x >= y does not hold. *)
fun max(x, y) : real =
    case x >= y
      of true => x
       | false => y
(* Largest answer, starting the fold from 0.0. *)
val m = foldl max 0.0 al
(* 5.5.2 Lift control system - Implementation *)
(* Floor objects: the floor's messages (FloorCall, FloorArrive, FloorStopTimer)
   and its states, the latter wrapped in FloorState when used as the object's
   state. *)
datatype statefloor = FloorCalled
| FloorNotCalled
| FloorCall
| FloorDoorsOpen of unit promise
| FloorArrive of unit promise
| FloorStopTimer
| FloorState of statefloor
(* Lift objects: messages LiftCall / LiftAt carry a floor number; LiftState is
   (position, schedule, moving).  LiftRunning / LiftStopped are used as the
   motor state inside ControllerState. *)
datatype statelift = LiftRunning
| LiftStopped
| LiftCall of int
| LiftAt of int
| LiftState of int * int list * bool
(* Controller objects: messages ControllerStep (destination floor) and
   ControllerStopTimer; ControllerState is (motor state, floor, promise of the
   lift's channel). *)
datatype statecontroller = ControllerStep of int
| ControllerStopTimer
| ControllerState of statelift * int * statelift Channel.channel promise
(* Timer requests: a delay in milliseconds and a promise of the channel to
   notify when it expires. *)
datatype statetimer = TimerStartController of int * statecontroller Channel.channel promise
| TimerStartFloor of int * statefloor Channel.channel promise
(* Timer port object: each request spawns a thread that sleeps for t
   milliseconds and then sends the matching stop-timer message to the
   requesting controller or floor. *)
val timer =
    let
        fun pause t = Thread.sleep(Time.fromMilliseconds(Int.toLarge t))
        fun serve (TimerStartController(t, pid)) =
              spawn ( pause t; Channel.put(future pid, ControllerStopTimer) )
          | serve (TimerStartFloor(t, pid)) =
              spawn ( pause t; Channel.put(future pid, FloorStopTimer) )
    in
        newChannelObject2 serve
    end
(* Lift controller port object.  State: ControllerState (motor, floor, lid).
     ControllerStopTimer while running : report LiftAt floor to the lift, stop.
     ControllerStep dest while stopped : do nothing if already at dest;
                                         otherwise start a 5000 ms timer and
                                         move one floor towards dest.
   Any other (message, state) combination raises Domain.
   Returns the controller's channel. *)
fun controller init =
    let
        val tid = timer
        val cid = promise()
        fun step (ControllerStopTimer, ControllerState(LiftRunning, f, lid)) =
              ( Channel.put(future lid, LiftAt(f));
                ControllerState(LiftStopped, f, lid) )
          | step (ControllerStep(dest), ControllerState(LiftStopped, f, lid)) =
              if f = dest
              then ControllerState(LiftStopped, f, lid)
              else
                  ( Channel.put(tid, TimerStartController(5000, cid));
                    ControllerState(LiftRunning, if f < dest then f+1 else f-1, lid) )
          | step _ = raise Domain
    in
        cid ?= newChannelObject(step, init);
        future cid
    end
(* Floor port object for floor number num.  State is FloorState of one of
   FloorNotCalled, FloorCalled or FloorDoorsOpen ack.
     FloorArrive (doors closed) : open the doors, start a 5000 ms timer.
     FloorArrive (doors open)   : tie the new acknowledgement to the pending one.
     FloorCall (not called)     : call a randomly chosen lift.
     FloorCall (otherwise)      : ignored.
     FloorStopTimer (doors open): close the doors and fulfill the acknowledgement.
   Any other (message, state) combination raises Domain.
   Returns the floor's channel. *)
fun floor (num, init, lifts) =
    let
        val tid = timer
        val fid = promise()
        fun openDoors ack =
            ( inspect ("Lift at floor " ^ Int.toString(num) ^ ": open doors");
              Channel.put(tid, TimerStartFloor(5000, fid));
              FloorState(FloorDoorsOpen(ack)) )
        fun react (FloorArrive(ack), FloorState(FloorNotCalled)) = openDoors ack
          | react (FloorArrive(ack), FloorState(FloorCalled)) = openDoors ack
          | react (FloorArrive(a), FloorState(FloorDoorsOpen(ack))) =
              ( a ?= future ack;
                FloorState(FloorDoorsOpen(ack)) )
          | react (FloorCall, FloorState(FloorNotCalled)) =
              let
                  val lnum = Random.int (List.length (future lifts))
              in
                  inspect ("Floor " ^ Int.toString(num) ^ " calls a lift " ^ Int.toString(lnum+1) ^ "!");
                  Channel.put(List.nth(future lifts, lnum), LiftCall(num));
                  FloorState(FloorCalled)
              end
          | react (FloorCall, FloorState(FloorCalled)) = FloorState(FloorCalled)
          | react (FloorCall, FloorState(FloorDoorsOpen(ack))) = FloorState(FloorDoorsOpen(ack))
          | react (FloorStopTimer, FloorState(FloorDoorsOpen(ack))) =
              ( inspect ("Lift at floor " ^ Int.toString(num) ^ ": close doors");
                ack ?= ();
                FloorState(FloorNotCalled) )
          | react _ = raise Domain
    in
        fid ?= newChannelObject(react, init);
        future fid
    end
(* Append floor n to the end of schedule xs unless n is already the last
   scheduled entry.

   The previous test, null xs andalso length xs - 1 = n, could only hold for
   an empty schedule with n = ~1, so a floor already at the end of the
   schedule was appended a second time.

   xs      : current schedule
   n       : floor to schedule
   returns : xs itself when its last element equals n, otherwise xs @ [n] *)
fun scheduleLast (xs, n) =
    if not (null xs) andalso List.last xs = n
    then xs
    else xs @ [n]
(* Lift port object number num.  State: LiftState (position, schedule, moving).
     LiftCall n, stopped at n    : open the doors at n and wait until they close.
     LiftCall n, stopped elsewhere: schedule n, tell the controller to step, move.
     LiftCall n, moving          : schedule n.
     LiftAt p, p heads schedule  : stop at p, wait for the doors, then continue
                                   with the remaining schedule if there is any.
     LiftAt p, otherwise         : keep stepping towards the head of the schedule.
   Any other (message, state) combination raises Domain.

   cid    : promise of the controller's channel
   floors : promise of the list of floor channels
   Returns the lift's channel. *)
fun lift (num, init, cid, floors) =
    let
        val lid = promise()
        (* Announce arrival at floor n and block until the floor fulfills the
           acknowledgement.  The wait is on the future of ack: awaiting the
           promise handle itself would not block, and the lift would move on
           while the doors are still open. *)
        fun arrive n =
            let
                val ack = promise()
            in
                Channel.put(List.nth(future floors, n-1), FloorArrive(ack));
                await (future ack)
            end
        fun needed n =
            inspect ("Lift " ^ Int.toString(num) ^ " needed at floor " ^ Int.toString(n))
        fun reached pos =
            inspect ("Lift " ^ Int.toString(num) ^ " at floor " ^ Int.toString(pos))
    in
        lid ?=
            newChannelObject(
                fn (LiftCall(n), LiftState(pos, sched, false)) if (n = pos) =>
                     ( needed n;
                       arrive pos;
                       LiftState(pos, sched, false) )
                 | (LiftCall(n), LiftState(pos, sched, false)) =>
                     let
                         val sched2 = scheduleLast(sched, n)
                     in
                         needed n;
                         Channel.put(future cid, ControllerStep(n));
                         LiftState(pos, sched2, true)
                     end
                 | (LiftCall(n), LiftState(pos, sched, true)) =>
                     let
                         val sched2 = scheduleLast(sched, n)
                     in
                         needed n;
                         LiftState(pos, sched2, true)
                     end
                 | (LiftAt(newpos), LiftState(pos, s::nil, moving)) if (newpos = s) =>
                     ( reached newpos;
                       arrive s;
                       LiftState(newpos, nil, false) )
                 | (LiftAt(newpos), LiftState(pos, s::sched2, moving)) if (newpos = s) =>
                     ( reached newpos;
                       arrive s;
                       Channel.put(future cid, ControllerStep(hd sched2));
                       LiftState(newpos, sched2, true) )
                 | (LiftAt(newpos), LiftState(pos, sched, moving)) =>
                     ( reached newpos;
                       Channel.put(future cid, ControllerStep(hd sched));
                       LiftState(newpos, sched, moving) )
                 | _ => raise Domain,
                init);
        future lid
    end
(* Create a building with fnum floors and lnum lifts.  Each lift gets its own
   controller; lift and controller refer to each other through promises.
   The promises floors and lifts are fulfilled with the lists of floor and
   lift channels (lifts first). *)
fun building (fnum, lnum, floors, lifts) =
    let
        fun newLifts (i, m) =
            if i > m
            then nil
            else
                let
                    val lid = promise()
                    val cid = promise()
                in
                    cid ?= controller(ControllerState(LiftStopped, 1, lid));
                    lid ?= lift(i, LiftState(1, nil, false), cid, floors);
                    (future lid)::newLifts(i+1, m)
                end
        fun newFloors (i, m) =
            if i > m
            then nil
            else
                let
                    val fid = floor(i, FloorState(FloorNotCalled), lifts)
                in
                    fid::newFloors(i+1, m)
                end
    in
        lifts ?= newLifts(1, lnum);
        floors ?= newFloors(1, fnum)
    end;
(* Demo: a building with 10 floors and 2 lifts.  Floors 9 and 10 call a lift,
   then after two seconds lift 1 is sent to floor 4 and lift 2 to floor 5. *)
let
    val floors = promise()
    val lifts = promise()
    (* Channels are looked up by 1-based number. *)
    fun floorNo n = List.nth(future floors, n-1)
    fun liftNo n = List.nth(future lifts, n-1)
in
    building(10, 2, floors, lifts);
    Channel.put(floorNo 9, FloorCall);
    Channel.put(floorNo 10, FloorCall);
    (* delay here to keep sequence in order *)
    Thread.sleep(Time.fromMilliseconds(Int.toLarge 2000));
    Channel.put(liftNo 1, LiftCall(4));
    Channel.put(liftNo 2, LiftCall(5))
end;
(* 5.5.3 Lift control system - Improvements to the lift control system *)
(* Bundle a lift with its own controller.  The initial state must be a
   LiftState (floor, schedule, moving); anything else raises Domain.
   Returns the lift's channel. *)
fun liftShaft (i, state, floors) =
    case state
      of LiftState(f, s, m) =>
           let
               val cid = promise()
               val lid = promise()
           in
               cid ?= controller(ControllerState(LiftStopped, f, lid));
               lid ?= lift(i, LiftState(f, s, m), cid, floors);
               future lid
           end
       | _ => raise Domain
(* Create a building with fnum floors and lnum lifts, using liftShaft to pair
   each lift with its controller.  The promises floors and lifts are
   fulfilled with the lists of floor and lift channels (lifts first).

   Compared with the previous version, the unused promise cid and the
   intermediate promise lid, which was fulfilled and read back immediately,
   are gone; liftShaft already returns the lift's channel. *)
fun building (fnum, lnum, floors, lifts) =
    let
        fun newLifts (i, m) =
            if i > m
            then nil
            else liftShaft(i, LiftState(1, nil, false), floors) :: newLifts(i+1, m)
        fun newFloors (i, m) =
            if i > m
            then nil
            else floor(i, FloorState(FloorNotCalled), lifts) :: newFloors(i+1, m)
    in
        lifts ?= newLifts(1, lnum);
        floors ?= newFloors(1, fnum)
    end
(* 5.6.1 Using the message-passing model directly - Port objects that share one thread *)
(* Messages for port objects sharing one thread:
   Add (id, procedure, sync promise) registers an object;
   Msg (id, message) delivers a message to the object with that id. *)
datatype ('a, 'b, 'c) sharechannel = Add of 'a * 'b * unit promise
| Msg of 'a * 'c
(* Create a group of port objects that all run in one shared thread.

   Returns the pair (addChannelObject, call):
     addChannelObject (i, proc) registers proc under id i and returns once
                                the shared thread has processed the
                                registration;
     call (i, m)                sends message m to the object with id i.

   Fixes over the previous version:
     - lookup compared the requested id with itself (i = i), so every message
       went to the most recently registered object; it now compares against
       the registered id;
     - addChannelObject awaited the promise handle instead of its future, so
       it did not wait for the registration. *)
fun newChannelObjects () =
    let
        val c = Channel.channel()
        val sin = Channel.toList(c)
        (* Newest registration first, so re-adding an id replaces the old proc. *)
        fun lookup (i, (m, proc)::xs) = if i = m then proc else lookup(i, xs)
          | lookup (i, nil) = raise Empty
        fun msgLoop (Add(i, proc, sync)::s2, procs) =
              ( sync ?= ();
                msgLoop(s2, (i, proc)::procs) )
          | msgLoop (Msg(i, m)::s2, procs) =
              ( (* Best effort by design: an unknown id or a failing handler
                   must not stop the shared thread. *)
                (lookup(i, procs)(m) handle _ => ());
                msgLoop(s2, procs) )
          | msgLoop (nil, procs) = ()
        fun addChannelObject (i, proc) =
            let
                val sync = promise()
            in
                Channel.put(c, Add(i, proc, sync));
                await (future sync);
                (* The promise is still returned, as before, so existing
                   callers keep type-checking. *)
                sync
            end
        fun call (i, m) = Channel.put(c, Msg(i, m))
    in
        spawn msgLoop(sin, nil);
        (addChannelObject, call)
    end
(* Messages exchanged by the two objects; each carries a counter. *)
datatype pingpong = Ping of int
| Pong of int
(* Ids under which the two objects are registered. *)
datatype object = PingObj
| PongObj
(* One shared-thread group for the ping-pong example. *)
val (addChannelObject, call) = newChannelObjects()
(* Ping-pong handler: report the message received, sleep for two seconds,
   then send the opposite message with the counter incremented to the
   object registered under id other. *)
fun pingpongProc other msg =
    let
        fun bounce (prefix, n, reply) =
            ( inspect (prefix ^ Int.toString(n) ^ ")");
              Thread.sleep(Time.fromMilliseconds(Int.toLarge 2000));
              call(other, reply(n+1)) )
    in
        case msg
          of Ping n => bounce ("ping(", n, Pong)
           | Pong n => bounce ("pong(", n, Ping)
    end;
(* Register both objects; each one replies to the other. *)
addChannelObject(PingObj, pingpongProc(PongObj));
addChannelObject(PongObj, pingpongProc(PingObj));
(* call(PingObj, Ping 0); *) (* uncomment this to play game *)
(* 5.6.2 Using the message-passing model directly - A concurrent queue with ports *)
(* skipping over the naive implementation *)
(* A queue is a record of its two operations; both fields have the same type. *)
datatype 'a queue = Queue of { put:'a, get:'a }
(* Concurrent queue built from two channels.  put and get each take a promise,
   place it on the give or take channel respectively, and return its future.
   A spawned thread pairs the n-th given promise with the n-th taken promise
   and fulfills whichever of the two is still unfulfilled with the future of
   the other. *)
fun newQueue () =
    let
        val giveChannel = Channel.channel()
        val given = Channel.toList giveChannel
        val takeChannel = Channel.channel()
        val taken = Channel.toList takeChannel
        (* The given stream is examined before the taken stream. *)
        fun pair (gs, ts) =
            case gs
              of nil => nil
               | g::gr =>
                   (case ts
                      of nil => nil
                       | t::tr =>
                           ( awaitEither(g, t);
                             if isFulfilled g
                             then t ?= future g
                             else g ?= future t;
                             pair(gr, tr) ))
        fun give x = ( Channel.put(giveChannel, x); future x )
        fun take x = ( Channel.put(takeChannel, x); future x )
    in
        spawn pair(given, taken);
        Queue{ put = give, get = take }
    end;
(* Exercise the queue: values are put as already-fulfilled promises, and gets
   pass a fresh promise whose future receives the matched value. *)
val q = newQueue()
val Queue{put, get} = q;
put(known(1));
inspect (get(promise()));
inspect (get(promise()));
inspect (get(promise()));
put(known(2));
put(known(3));
val x = promise();
put(known(4));
(* NOTE(review): here and below `await x` is applied to the promise itself,
   not to `future x`.  In both places x can only be fulfilled by a later
   statement (`get x` here, `put(known(5))` below), so a blocking wait at
   this point would never return - confirm what was intended. *)
await x;
get x;
val x = promise();
get(x);
await x;
put(known(5));
(* 5.6.3 Using the message-passing model directly - A thread abstraction with termination detection *)
(* Run procedure p in a new thread and return only when p and every thread it
   started through subThread have terminated.

   p         : unit -> 'a, the procedure to run
   subThread : promise, fulfilled here with the thread-creation procedure
               that p should use to start further threads

   Each thread creation puts +1 on a channel and each termination puts ~1;
   zeroExit returns once the running sum gets back to zero.

   Fix: the creation procedure used to ignore its argument and always ran
   the outer p, so starting any other procedure through it re-ran p. *)
fun newThread (p, subThread) =
    let
        val c = Channel.channel()
        val pis = Channel.toList c
        fun zeroExit (n, i::ir) =
              if n+i <> 0 then zeroExit(n+i, ir) else ()
          | zeroExit _ = ()
    in
        subThread ?= (fn q =>
            ( Channel.put(c, 1);
              spawn ( q(); Channel.put(c, ~1) ) ));
        (future subThread)(p);
        zeroExit(0, pis)
    end
(* Synchronous channel: a send returns only after the message has been taken
   from the stream.

   s     : promise, fulfilled incrementally with the stream of messages
   ssend : promise, fulfilled with the send procedure; sending m blocks until
           the stream reader has acknowledged m

   Fixes over the previous version:
     - the send procedure was defined but never handed out, so ssend was
       never fulfilled;
     - mapPromise already fulfills s, so fulfilling s again with its result
       would be a second fulfillment if the stream ever ended;
     - the send waited on the promise handle rather than on its future. *)
fun newSChannel (s, ssend) =
    let
        val c = Channel.channel()
        val s1 = Channel.toList c
        fun sSend m =
            let
                val x = promise()
            in
                Channel.put(c, (m, x));
                await (future x)
            end
    in
        ssend ?= sSend;
        spawn ignore (mapPromise (fn (m, x) => ( x ?= (); m)) s1 s)
    end
(* 5.6.4 Using the message-passing model directly - Eliminating sequential dependencies *)
(* Filter a list, delivering the result through promise p one cell at a time.
   For each x with f x true, the output receives `future x` consed onto the
   future of a fresh promise for the rest.  Returns the future of the promise
   that receives the first kept element (p).
   NOTE(review): because the output uses `future x`, the list elements must
   be promises and f is applied to the promise itself - confirm whether
   plain `x` was intended. *)
fun filterPromise f xs p =
    case xs
      of nil => ( p ?= nil; future p )
       | x::rest =>
           if not (f x)
           then filterPromise f rest p
           else
               let
                   val tail = promise()
               in
                   p ?= (future x)::(future tail);
                   filterPromise f rest tail;
                   future p
               end
(* Promises for the filterPromise example that is commented out below. *)
val p = promise()
val a = promise()
val b = promise();
(* filterPromise (fn x => x > 2) [future a, 5, 1, future b, 4, 0, 6] p; *)
(* Unable to get correct translation for concFilter *)
(*
fun newPortClose (s, send, close) =
let
val pc = ref s
in
send ?= (fn m =>
let
val s = promise()
in
Ref.exchange(pc, m::(future s))
end);
close ?= (fn () => pc := ref nil)
end
fun concFilter f xs ys p =
let
val send = promise()
val close = promise()
in
newPortClose(ys, send, close);
barrier (mapPromise (fn x => if (f x) then ((future send) x) else nil) xs p);
close()
end
*)
(* 5.7.2 The Erlang language - Introduction to Erlang programming *)
(* Lazy factorial: 1 for 0, n * factorial (n-1) for positive n, and Domain
   for negative n. *)
fun lazy factorial n =
    if n = 0 then 1
    else if n > 0 then n * factorial(n-1)
    else raise Domain
(* Shapes: square (side), rectangle (two sides), circle (radius) and triangle
   (three sides). *)
datatype 'a shape = Square of 'a
| Rectangle of 'a * 'a
| Circle of 'a
| Triangle of 'a * 'a * 'a
(* Area of a shape with real dimensions.  The triangle case computes
   sqrt (s (s-a) (s-b) (s-c)) with s half the perimeter. *)
fun area shape =
    case shape
      of Square side => side * side
       | Rectangle (w, h) => w * h
       | Circle radius => 3.14159 * radius * radius
       | Triangle (a, b, c) =>
           let
               val half = (a + b + c) / 2.0
           in
               Math.sqrt(half * (half-a)*(half-b)*(half-c))
           end
(* Request to the area server: promise for the answer, and the shape. *)
datatype 'a msg = Msg of 'a promise * 'a shape
(* Start the area server: a thread answers every Msg (ans, shape) put on the
   returned channel by fulfilling ans with the shape's area. *)
fun start () =
    let
        val areaServer = Channel.channel()
        val requests = Channel.toList areaServer
        fun reply (Msg(ans, shape)) = ans ?= area(shape)
    in
        spawn (forall requests reply);
        areaServer
    end
(* Start the area server, ask for the area of a square and wait for the answer. *)
val pid = start()
val ans = promise();
Channel.put(pid, Msg(ans, Square(3.4)));
(* Wait on the future of ans; ans itself is a promise handle, so awaiting it
   directly would not wait for the server. *)
await (future ans);
(* 5.7.3 The Erlang language - The receive operation *)
(* Section does not use complete examples. May return here later. *)
(* 5.8.1 Advanced topic - The non-deterministic concurrent model *)
(* Sequential stream merge: copy the elements of the first list to the
   promise chain starting at p, then the elements of the second list, then
   terminate the chain with nil.  Clauses are tried in order, so the first
   list is always drained before the second.

   The two trailing clauses of the previous version, (s1, nil, p) and
   (nil, nil, p), could never be selected and have been folded into the
   final clause; behaviour is unchanged. *)
fun streamMerger (m::s1, s2, p) =
      let
          val px = promise()
      in
          p ?= m::(future px);
          streamMerger (s1, s2, px)
      end
  | streamMerger (nil, m::s2, p) =
      let
          val px = promise()
      in
          p ?= m::(future px);
          streamMerger (nil, s2, px)
      end
  | streamMerger (nil, nil, p) = p ?= nil
(* Non-deterministic stream merge: wait until either input list is
   determined, emit its head to the promise chain starting at p and continue
   with the rest; when one list ends, the remainder of the other becomes the
   rest of the output. *)
fun streamMerger (xs, ys, p) =
    let
        val rest = promise()
        fun emit (item, xs', ys') =
            ( p ?= item::(future rest); streamMerger(xs', ys', rest) )
    in
        case awaitEither(xs, ys)
          of Alt.FST nil => p ?= ys
           | Alt.SND nil => p ?= xs
           | Alt.FST (x::xr) => emit (x, xr, ys)
           | Alt.SND (y::yr) => emit (y, xs, yr)
    end;
(* Demo: merge the streams of two channels into promise `merged` while
   messages are put alternately on each channel. *)
let
    val left = Channel.channel()
    val right = Channel.channel()
    val leftStream = Channel.toList left
    val rightStream = Channel.toList right
    val merged = promise()
in
    inspect merged;
    spawn streamMerger(leftStream, rightStream, merged);
    Channel.put(left, "a");
    Channel.put(right, "b");
    Channel.put(left, "c");
    Channel.put(right, "d")
end;
(* Future.awaitEither is the Alice equivalent of the WaitTwo function *)
|