(* CTM Chapter #11 Examples in Alice ML *)
import structure Remote from "x-alice:/lib/distribution/Remote"
import structure Channel from "x-alice:/lib/data/Channel"
(* Shorthand operators used by the promise/future examples in this file *)
open Promise
open Future
infix 3 ?=
(* p ?= v : fulfill promise p with value v *)
fun p ?= v = fulfill (p, v)
(* ? p : the future associated with promise p *)
fun ? p = future p
(* Functions defined in previous chapters *)
(* known x: create a promise, fulfill it with x right away, and return
   the promise itself (not its future). *)
fun known value =
    let
        val cell = promise ()
        val _ = fulfill (cell, value)
    in
        cell
    end
(* forall xs f: apply f to every element of xs from left to right,
   discarding each result; returns unit. *)
fun forall items f =
    case items of
        nil => ()
      | first :: rest => (f first; forall rest f)
(* for a b s f: call f on a, a+s, a+2s, ... while the value stays within
   the inclusive bound b. A positive step counts up (c <= b), a negative
   step counts down (c >= b), and a zero step does nothing. *)
fun for a b s f =
    let
        (* bound test depends on the direction of travel *)
        fun withinBound c = if s > 0 then c <= b else c >= b
        fun loop c =
            if withinBound c
            then (f c; loop (c + s))
            else ()
    in
        if s = 0 then () else loop a
    end
(* 11.3.1 Distribution of declarative data - Open distribution and global naming *)
(* Alice's Remote.offer corresponds to Oz's offerUnlimited *)
(* Package the string under the field name x and offer it; the result is bound as ticket. *)
val ticket = Remote.offer (pack (val x="hello") : (val x:string))
(* Take the package back through the ticket and unpack it against the same signature. *)
structure X = (unpack (Remote.take ticket) : (val x:string))
(* Bind the unpacked field at top level. *)
val x = X.x
(* 11.3.2 Distribution of declarative data - Sharing declarative data *)
(* Sharing records *)
type novel = {text:string, author:string, year:int}
val x = {text="It was a dark and stormy night. ...", author="E.G.E. Bulwer-Lytton", year=1803}
(* Package the record under the field name r, typed as novel, and offer it. *)
val p = pack (val r=x) : (val r:novel)
val ticket = Remote.offer p;
(* From here on p and x are rebound to what comes back through the ticket. *)
val p = Remote.take ticket
structure X = unpack p : (val r:novel)
val x = X.r
(* Sharing functions *)
(* Variant 1: copy the function *)
(* myEncoder x = (x*4449+1234) mod 33667 *)
fun myEncoder x = (x*4449+1234) mod 33667
(* The function value itself is packaged, with no proxy involved. *)
val p = pack (val f=myEncoder) : (val f:int->int)
val ticket = Remote.offer p
val p = Remote.take ticket
structure X = unpack p : (val f:int->int)
(* Shadow myEncoder with the function obtained through the ticket. *)
val myEncoder = X.f;
(* (1*4449+1234) mod 33667 = 5683 *)
inspect (myEncoder 1);
(* Variant 2: proxy the function *)
fun myEncoder x = (x*4449+1234) mod 33667
(* Same as the copy variant, except the function is wrapped with Remote.proxy before packaging. *)
val p = pack (val f=Remote.proxy(myEncoder)) : (val f:int->int)
val ticket = Remote.offer p
val p = Remote.take ticket
structure X = unpack p : (val f:int->int)
(* Shadow myEncoder with the proxied function obtained through the ticket. *)
val myEncoder = X.f;
inspect (myEncoder 1);
(* Variant 3: proxy the whole structure containing the function *)
fun myEncoder x = (x*4449+1234) mod 33667
signature ENCODER = (val f:int->int)
structure Encoder = (val f=myEncoder)
(* Remote.Proxy is applied to the signature/structure pair instead of proxying f by hand. *)
structure ProxyEncoder = Remote.Proxy(signature S = ENCODER structure X=Encoder)
val p = pack (ProxyEncoder) : ENCODER
val ticket = Remote.offer p
val p = Remote.take ticket
(* Unpacked against the inline signature, which has the same content as ENCODER. *)
structure X = unpack p : (val f:int->int)
val myEncoder = X.f;
inspect (myEncoder 1);
(* End of sharing functions *)
(* Sharing dataflow variables *)
val x = promise()
(* The package is built while x is still unfulfilled. *)
val p = pack (val r=x) : (val r:int promise)
(* Note: Alice requires futures to be fulfilled prior to being offered (Hole.Hole exception), *)
(* so x is fulfilled here, after packing but before the offer. *)
val _ = fulfill(x, 1111)
val ticket = Remote.offer p
val p = Remote.take ticket
structure X = unpack p : (val r:int promise)
(* x is rebound to the promise obtained through the ticket. *)
val x = X.r
(* Promises can be offered: here the promise px is not packaged itself; *)
(* a proxied function that fulfills px is packaged and offered instead. *)
val px = promise()
val p = pack (val pfulfill = Remote.proxy(fn n => fulfill(px, n))) : (val pfulfill : int -> unit)
val ticket = Remote.offer p;
(* Inspect px before anything has fulfilled it. *)
inspect px;
val p = Remote.take ticket
structure X = unpack p : (val pfulfill : int -> unit);
(* Calls the proxied function, which runs fulfill(px, 1234). *)
X.pfulfill(1234);
(* 11.3.3 Distribution of declarative data - Ticket Distribution *)
(* This example goes through a pickle file rather than a Remote ticket. *)
fun myEncoder x = (x*4449+1234) mod 33667;
(* Save the packaged function to the file "encoder." followed by Pickle.extension. *)
Pickle.save("encoder."^Pickle.extension, pack (val f=myEncoder) : (val f:int->int));
(* Load the same file and unpack it against the same signature. *)
structure X = unpack Pickle.load("encoder."^Pickle.extension) : (val f:int->int)
val myEncoder = X.f;
inspect (myEncoder 1);
(* It is unclear how to pass the signature across as a parameter, so these
   helpers hard-code it to (val f:int->int). *)
(* offer (x, fname): package the int->int function x and save it to file fname. *)
fun offer (x, fname) =
    let
        val pkg = pack (val f=x) : (val f:int->int)
    in
        Pickle.save (fname, pkg)
    end;
(* take fname: load the package stored in file fname and return its function f. *)
fun take fname =
    let
        structure Loaded = unpack Pickle.load(fname) : (val f:int->int)
    in
        Loaded.f
    end;
(* Round trip: save the current myEncoder, then rebind it to the loaded one. *)
offer(myEncoder, "encoder."^Pickle.extension);
val myEncoder = take("encoder."^Pickle.extension);
inspect (myEncoder 1);
(* 11.3.4 Distribution of declarative data - Stream communication *)
(* Eager stream communication *)
(* sum (xs, a): add every element of xs to the accumulator a, left to right. *)
fun sum (xs, a) = List.foldl (fn (x, acc) => acc + x) a xs;
(* generate (n, limit, p): fulfill promise p with the list n, n+1, ..., limit-1,
   built cell by cell through fresh promises, and return the future of p.
   If n >= limit, p is fulfilled with nil. *)
fun generate (n, limit, p) =
    if n >= limit
    then ( p ?= nil; future p )
    else
        let
            (* promise standing for the rest of the list after n *)
            val tail = promise()
        in
            p ?= (n :: future tail);
            generate(n+1, limit, tail);
            future p
        end
(* Offer sum wrapped in Remote.proxy, then take it back as X.f. *)
val ticket = Remote.offer(pack (val f=Remote.proxy(sum)) : (val f:int list * int->int))
structure X = unpack (Remote.take ticket) : (val f:int list * int->int)
val xs = promise()
(* generate builds the list 0..1499 through xs; X.f sums it starting from accumulator 0. *)
val s = X.f(generate(0, 1500, xs), 0)
(* Lazy stream communication *)
(* The sum function ought to be proxied, but doing so makes the program loop forever on the lazy generator *)
(* sum (xs, a, limit): add the first limit elements of xs to a.
   The list is not inspected once limit <= 0; raises Empty if xs runs out first. *)
fun sum (items, a, limit) =
    if limit <= 0
    then a
    else
        case items of
            x :: xs => sum(xs, a+x, limit-1)
          | nil => raise Empty
(* sum is offered here without Remote.proxy. *)
val ticket = Remote.offer(pack (val f=sum) : (val f:int list * int * int->int))
(* Lazy unbounded list n, n+1, n+2, ... *)
fun lazy generate n = n::generate(n+1)
structure X = unpack (Remote.take ticket) : (val f:int list * int * int->int)
(* Sum the first 1500 elements of the lazy list starting at 0, with accumulator 0. *)
val s = X.f(generate 0, 0, 1500)
(* Solution by Andreas Rossberg: model the stream explicitly as a head plus a thunk for the tail *)
datatype 'a stream = Stream of 'a * (unit -> 'a stream)
(* sum (s, a, limit): add the first limit heads of stream s to a.
   The stream is not inspected once limit <= 0. *)
fun sum (s, a, limit) =
    if limit <= 0
    then a
    else
        let
            val Stream (head, next) = s
        in
            sum(next(), a+head, limit-1)
        end
(* With the explicit stream type, sum is offered wrapped in Remote.proxy. *)
val ticket = Remote.offer(pack (val f=Remote.proxy(sum)) : (val f:int stream * int * int->int))
(* Unbounded stream n, n+1, n+2, ...; each tail is produced only when its thunk is called. *)
fun generate n = Stream(n, fn () => generate(n+1))
structure X = unpack (Remote.take ticket) : (val f:int stream * int * int->int)
(* Sum the first 1500 elements of the stream starting at 0, with accumulator 0. *)
val s = X.f(generate 0, 0, 1500)
(* End of lazy stream communication *)
(* Ports and servers *)
(* A channel plays the role of the port. *)
val c = Channel.channel();
(* Spawned consumer: inspect each element of the channel's list view. *)
spawn forall (Channel.toList(c)) inspect;
(* Offer a proxied function that puts a string on the channel. *)
val p = pack (val rput=Remote.proxy(fn x => Channel.put(c, x))) : (val rput:string->unit)
val ticket = Remote.offer(p)
structure X = unpack (Remote.take ticket) : (val rput:string->unit);
(* Send two messages through the proxied function. *)
X.rput("hello");
X.rput("keep in touch");
(* 11.4.1 Distribution of state - Simple state sharing *)
(* Alice has no distributed state, so the state is reached through proxied accessor functions *)
signature RSTATE =
sig
    type t                  (* type of the stored value *)
    val set : t->unit       (* overwrite the stored value *)
    val get : unit->t       (* read the stored value *)
end
(* A single int cell, initially 0, reachable only through set and get. *)
structure RState :> (RSTATE where type t=int) =
struct
    type t = int
    val cell = ref 0
    (* Ref.exchange returns the old content, which is dropped here. *)
    fun set x = ignore (Ref.exchange(cell, x))
    fun get () = !cell
end
(* Offer a Remote.Proxy of RState. *)
val p = pack (Remote.Proxy(signature S=RSTATE structure X=RState)) : RSTATE
val ticket = Remote.offer p;
(* Increment once directly on RState... *)
RState.set(RState.get() + 1);
structure X = unpack (Remote.take ticket) : (RSTATE where type t = int);
(* ...and once through the structure obtained from the ticket. *)
X.set(X.get() + 1);
(* NOTE(review): presumably both components show the same value, since X is a proxy of RState - confirm. *)
inspect (RState.get(), X.get());
(* Distributed locking *)
(* correctSimpleLock (): create a fresh lock and return it as a proxied function.
   The returned function takes a thunk f, runs f() once the previous holder has
   released the lock, and returns unit (the result of f is discarded). If f raises,
   the lock is released and the exception is re-raised.
   token always holds the promise of the most recent holder; it starts out
   already fulfilled (via known) so the first caller does not wait. *)
fun correctSimpleLock () =
    let
        val token = ref (known(()))
        fun lock f =
            let
                (* our own release signal, swapped in for the previous holder's *)
                val new = promise()
                val old = Ref.exchange(token, new)
            in
                (* Wait on the FUTURE of the previous promise. The promise value
                   itself is not a future, so awaiting it directly would return
                   immediately and the lock would not exclude anyone. *)
                await (future old);
                (* release on failure too, then propagate the exception *)
                f() handle e => ( new ?= (); raise e );
                new ?= ()
            end
    in
        Remote.proxy lock
    end
(* Sharing objects and other data types *)
signature CODER =
sig
    val seed : int ref          (* current generator state *)
    val init : int -> unit      (* reset the state *)
    val get : unit -> int       (* advance the state and return the new value *)
end
(* Coder: generator whose state starts at the functor argument seed.
   Each get replaces the state s by (s * (1234*4449)) mod 33667, computed
   in IntInf so the intermediate product cannot overflow, and returns it. *)
functor Coder (val seed:int) :> CODER =
struct
    val seed = ref seed
    fun init x = seed := x
    fun get () =
        let
            val factor = IntInf.fromInt(1234 * 4449)
            val modulus = IntInf.fromInt(33667)
            val scaled = IntInf.fromInt(!seed) * factor
        in
            seed := IntInf.toInt(scaled mod modulus);
            !seed
        end
end
(* Instantiate a coder with seed 100 and offer a Remote.Proxy of it. *)
structure C = Coder(val seed=100)
val p = pack (Remote.Proxy(signature S=CODER structure X=C)) : CODER
val ticket = Remote.offer p
structure C2 = unpack (Remote.take ticket) : CODER;
(* Two successive values drawn through the structure obtained from the ticket. *)
inspect (C2.get(), C2.get());
(* 11.4.2 Distribution of state - Distributed lexical scoping *)
(* RSTATE is redefined here: a counter exposing only inc. *)
signature RSTATE =
sig
    val inc : unit->int     (* bump the counter and return its new value *)
end
(* Counter starting at 0; the cell itself is hidden by the opaque signature. *)
structure RState :> RSTATE =
struct
    val counter = ref 0
    fun inc () =
        let
            val _ = Ref.exchange(counter, !counter+1)
        in
            !counter
        end
end
(* First offer of a proxy of RState, followed by a local increment. *)
val p = pack (Remote.Proxy(signature S=RSTATE structure X=RState)) : RSTATE
val ticket = Remote.offer p;
RState.inc();
(* Second, independent offer of a proxy of the same RState, followed by another local increment. *)
val p2 = pack (Remote.Proxy(signature S=RSTATE structure X=RState)) : RSTATE
val ticket2 = Remote.offer p2;
RState.inc();
(* Increment through each ticket in turn. *)
(* NOTE(review): presumably X, Y and Z all reach the same counter, so the three inspects continue one sequence - confirm. *)
structure X = unpack (Remote.take ticket) : RSTATE;
inspect (X.inc());
structure Y = unpack (Remote.take ticket2) : RSTATE;
inspect (Y.inc());
(* Third offer: a proxy built from Y, which was itself obtained through a ticket. *)
val p3 = pack (Remote.Proxy(signature S=RSTATE structure X=Y)) : RSTATE
val ticket3 = Remote.offer p3;
structure Z = unpack (Remote.take ticket3) : RSTATE;
inspect (Z.inc());
(* 11.6.1 Common distributed programming patterns - Stationary and mobile objects *)
(* Note: NewState is not applicable to statically typed languages - skipped for now (also 7.8.2) *)
(* A coder with seed 100 is offered as a Remote.Proxy and used through the ticket. *)
structure C = Coder(val seed=100)
val p = pack (Remote.Proxy(signature S=CODER structure X=C)) : CODER
val ticket = Remote.offer p
structure C2 = unpack (Remote.take ticket) : CODER
(* Draw one value through the structure obtained from the ticket. *)
val a = C2.get();
inspect a;
(* 11.6.1 Common distributed programming patterns - Asynchronous objects and dataflow *)
(* Same pattern as the stationary example: a coder with seed 100 offered as a Remote.Proxy. *)
structure R = Coder(val seed=100)
val p = pack (Remote.Proxy(signature S=CODER structure X=R)) : CODER
val ticket = Remote.offer p
structure X = unpack (Remote.take ticket) : CODER;
(* Two successive values drawn through the structure obtained from the ticket. *)
inspect (X.get(), X.get());
|