(* CTM Chapter #08 Examples in Alice ML *)
import functor MkRedBlackImpMap from "x-alice:/lib/data/MkRedBlackImpMap"
import structure GlobalStamp from "x-alice:/lib/data/GlobalStamp"
import structure Random from "x-alice:/lib/utility/Random"
import structure Gtk from "x-alice:/lib/gtk/Gtk"
import structure Canvas from "x-alice:/lib/gtk/Canvas"
(* syntactic sugar for solutions using promises/futures *)
(* Open Promise and Future so their members (promise, fulfill, future,
   await, isFulfilled, ...) can be used unqualified below. *)
open Promise
open Future
(* `p ?= v` is infix shorthand for fulfill(p, v). *)
infix 3 ?=
val op?= = fulfill
(* `?` is a short alias for `future`. *)
val ? = future
(* `a ::= b` is infix shorthand for Gtk.Prop.prop(a, b). *)
infix 3 ::=
val op ::= = Gtk.Prop.prop
(* Functions defined in previous chapters *)
(* known x: create a fresh promise, fulfil it with x right away and
   return the promise itself (not its future). *)
fun known x =
    let
        val filled = promise()
        val _ = fulfill(filled, x)
    in
        filled
    end
(* forall xs f: apply f to every element of xs from left to right,
   discarding the results. *)
fun forall xs f = List.app f xs;
(* for a b s f: call f on a, a+s, a+2s, ... while the counter stays
   within b (counter <= b for a positive step, counter >= b for a
   negative step).  A zero step performs no calls at all. *)
fun for a b s f =
    let
        (* Generic walker: keeps stepping while `within` accepts the counter. *)
        fun walk within c =
            if within c
            then (f c; walk within (c + s))
            else ()
    in
        if s > 0 then walk (fn c => c <= b) a
        else if s < 0 then walk (fn c => c >= b) a
        else ()
    end;
(* 8.2.2 Programming with concurrency - Using the shared-state model directly *)
(* newStack (): create a stack shared through a single ref cell and return
   its {push, pop} operations.

   Each operation first swaps the cell's content for the future of a fresh
   promise and only then computes the new content from the old one.  The
   exchange is the only access to the cell, so concurrent callers are
   serialised through the chain of futures.  (The previous version read
   `!stack` and then exchanged, which lets two threads observe the same
   old list and lose an update.)

   pop raises Empty on an empty stack and leaves the stack empty. *)
fun newStack () =
    let
        val stack = ref nil
        fun push x =
            let
                val next = promise()
                val old = Ref.exchange(stack, future next)
            in
                fulfill(next, x :: old)
            end
        fun pop () =
            let
                val next = promise()
                val old = Ref.exchange(stack, future next)
            in
                (* Matching on `old` waits for the previous operation to finish. *)
                case old of
                    top :: below => (fulfill(next, below); top)
                  | nil => (fulfill(next, nil); raise Empty)
            end
    in
        {push=push, pop=pop}
    end
(* Demo: push 123 then 456, then pop twice and display each popped value;
   the value pushed last comes out first. *)
val {push, pop} = newStack()
val _ = push(123)
val _ = push(456)
val _ = inspect (pop())
val _ = inspect (pop())
(* Alternative implementation using functors *)
(* Interface of a single stack instance holding elements of type t. *)
signature STACK =
    sig
        type t                    (* element type *)
        val push : t -> unit      (* add an element on top *)
        val pop  : unit -> t      (* remove and return the top element *)
    end
(* Stack: functor version of the stack; every application creates one
   stack instance with element type t.

   Each operation swaps the cell's content for the future of a fresh
   promise and then computes the new content from the old one, so the
   exchange is the only access to the cell and concurrent callers are
   serialised.  (The previous version read `!stack` before exchanging,
   which could lose an update under concurrency.)

   pop raises Empty on an empty stack and leaves the stack empty. *)
functor Stack (type t) :> (STACK where type t=t) =
struct
    type t = t
    val stack = ref (nil : t list)
    fun push x =
        let
            val next = promise()
            val old = Ref.exchange(stack, future next)
        in
            fulfill(next, x :: old)
        end
    fun pop () =
        let
            val next = promise()
            val old = Ref.exchange(stack, future next)
        in
            (* Matching on `old` waits for the previous operation to finish. *)
            case old of
                top :: below => (fulfill(next, below); top)
              | nil => (fulfill(next, nil); raise Empty)
        end
end
(* Demo: instantiate an int stack, push 123 and 456, then pop and display
   both values. *)
structure S = Stack(type t = int)
val _ = S.push(123)
val _ = S.push(456)
val _ = inspect (S.pop())
val _ = inspect (S.pop())
(* End Alternative implementation using functors *)
(* slowNet1 (obj, d): wrap obj so that every call runs in its own spawned
   thread, which first sleeps d milliseconds and then applies obj to the
   message.  The wrapper returns the spawned computation's future.
   Nothing orders the spawned threads relative to each other. *)
fun slowNet1 (obj, d) =
    let
        fun delayed m =
            spawn (Thread.sleep(Time.fromMilliseconds(Int.toLarge(d)));
                   obj(m))
    in
        delayed
    end
(* Demo: wrap a fresh stack's operations with a 1000 ms slowNet1 delay.
   Each call below runs in its own thread, so the pushes and pops are not
   ordered with respect to each other. *)
val {push, pop} = newStack()
val push = slowNet1(push, 1000)
val pop = slowNet1(pop, 1000)
val _ = push(123)
val _ = push(456)
val _ = inspect (pop())
val _ = inspect (pop())
(* slowNet2 (obj, d): like slowNet1, but calls made through the same
   wrapper are executed in the order in which they were issued.

   A ref cell holds a unit "token" future.  Every call swaps in the future
   of its own fresh token and keeps the previous one; the spawned thread
   sleeps d milliseconds, waits for the previous token, runs obj, and only
   then fulfils its own token.  The wrapper returns the spawned
   computation's future, which carries obj's result. *)
fun slowNet2 (obj, d) =
    let
        val token = ref ()
        fun ordered m =
            let
                val mine = promise()
                val previous = Ref.exchange(token, future mine)
            in
                spawn
                    (Thread.sleep(Time.fromMilliseconds(Int.toLarge(d)));
                     await previous;
                     let
                         val result = obj(m)
                     in
                         mine ?= ();
                         result
                     end)
            end
    in
        ordered
    end
(* Demo: the same experiment as above, with slowNet2 wrappers.  Note that
   push and pop are wrapped separately, so each has its own token cell. *)
val {push, pop} = newStack()
val push = slowNet2(push, 1000)
val pop = slowNet2(pop, 1000)
val _ = push(123)
val _ = push(456)
val _ = inspect (pop())
val _ = inspect (pop())
(* 8.3.1 Locks - Building stateful concurrent data abstractions *)
(* Queue - Declarative version *)
(* Queue(n, s, e): n is the element count (the code below drives it negative
   when deletes outnumber inserts), s is the promise for the list at the
   front of the queue and e the promise for the still-open tail of the list. *)
datatype 'a queue = Queue of int * 'a promise list promise * 'a promise list promise
(* newQueue (): an empty queue; front and tail are the same unfulfilled
   promise and the count is 0. *)
fun newQueue () =
    let val shared = promise() in Queue(0, shared, shared) end
(* insertQ (q, x): declarative insert; returns the new queue value.
   x is a promise holding the element.

   n >= 0 : no delete is waiting.  Append a fresh cell (bound to x's
            future) to the open tail e and open a new tail.
   n < 0  : earlier deletes already put their result promises in the list;
            e points at the oldest one still waiting.  Fulfil it with x's
            future.  If it was the last waiting delete (n = ~1) the queue
            becomes empty again (front = tail = s), otherwise the tail
            moves on to the next waiting promise.

   Change from the previous version: the third clause bound `px` without
   using it and recomputed hd(future e); the waiting promise is now
   fetched once and used. *)
fun insertQ (Queue(n, s, e), x) =
    if n >= 0 then
        let
            val e1 = promise()
            val px = promise()
        in
            px ?= future x;
            e ?= px::(future e1);
            Queue(n+1, s, e1)
        end
    else
        let
            val px = hd(future e)
        in
            px ?= future x;
            if n = ~1
            then Queue(n+1, s, s)
            else
                let
                    val e1 = promise()
                in
                    e1 ?= tl(future e);
                    Queue(n+1, s, e1)
                end
        end
(* deleteQ (q, px): declarative delete; returns the new queue value and
   delivers the removed element through the caller's promise px.

   n <= 0 : nothing is available.  px itself is placed in the list at the
            front so that a later insert can fulfil it.
   n > 0  : px is fulfilled with the future of the first element.  When
            that was the only element (n = 1) front and tail are both set
            to e, otherwise the front advances by one cell. *)
fun deleteQ (Queue(n, s, e), px) =
    if n <= 0 then
        let
            val front = promise()
        in
            s ?= px::(future front);
            Queue(n-1, front, e)
        end
    else
        (px ?= future (hd(future s));
         if n = 1
         then Queue(n-1, e, e)
         else
             let
                 val front = promise()
             in
                 front ?= tl(future s);
                 Queue(n-1, front, e)
             end)
(* Demo of the declarative queue.  The result promises are inspected before
   any queue operation runs.  Two deletes follow a single insert, so the
   second delete (p3) has to wait for the insert of "paul". *)
val p2 = promise()
val p3 = promise()
val p6 = promise();
inspect (future p2);
inspect (future p3);
inspect (future p6);
val q0 = newQueue();
val q1 = insertQ(q0, known("peter"));
val q2 = deleteQ(q1, p2);
val q3 = deleteQ(q2, p3);
val q4 = insertQ(q3, known("paul"));
val q5 = insertQ(q4, known("mary"));
val q6 = deleteQ(q5, p6);
(* Queue - Sequential stateful version *)
(* newQueue (): sequential stateful queue.  The current Queue value lives in
   a ref cell; insertQ and deleteQ compute the successor value with the same
   case analysis as the declarative version and store it back.
   No synchronisation is done here.

   insertQ x  : x is a promise holding the element.
   deleteQ () : returns a promise for the removed element. *)
fun newQueue () =
    let
        val start = promise()
        val c = ref (Queue(0, start, start))

        fun insertQ x =
            let
                val Queue(n, s, e) = !c
                val next =
                    if n >= 0 then
                        let
                            val px = promise()
                            val e1 = promise()
                        in
                            px ?= future x;
                            e ?= px::(future e1);
                            Queue(n+1, s, e1)
                        end
                    else if n = ~1 then
                        (hd(future e) ?= future x;
                         Queue(n+1, s, s))
                    else
                        let
                            val e1 = promise()
                        in
                            hd(future e) ?= future x;
                            e1 ?= tl(future e);
                            Queue(n+1, s, e1)
                        end
            in
                c := next
            end

        fun deleteQ () =
            let
                val Queue(n, s, e) = !c
                val px = promise()
                val next =
                    if n <= 0 then
                        let
                            val s1 = promise()
                        in
                            s ?= px::(future s1);
                            Queue(n-1, s1, e)
                        end
                    else if n = 1 then
                        (px ?= future (hd(future s));
                         Queue(n-1, e, e))
                    else
                        let
                            val s1 = promise()
                        in
                            px ?= future (hd(future s));
                            s1 ?= tl(future s);
                            Queue(n-1, s1, e)
                        end
            in
                c := next;
                px
            end
    in
        {insertQ=insertQ, deleteQ=deleteQ}
    end
(* Demo: two inserts, three deletes (the third has nothing to take yet),
   then a final insert. *)
val {insertQ, deleteQ} = newQueue();
insertQ(known("peter"));
insertQ(known("paul"));
inspect (deleteQ());
inspect (deleteQ());
inspect (deleteQ());
insertQ(known("mary"));
(* Queue - Concurrent stateful version with lock *)
(* newQueue (): concurrent stateful queue.  Same algorithm as the sequential
   version, but every read-modify-write of the state cell runs under
   Lock.sync on one lock shared by insertQ and deleteQ.

   insertQ x  : x is a promise holding the element.
   deleteQ () : returns a promise for the removed element.

   Change from the previous version: insertQ allocated an extra promise
   (`e1`) outside the critical section that was never used; it is gone. *)
fun newQueue () =
    let
        val lock = Lock.lock()
        val p = promise()
        val c = ref (Queue(0, p, p))

        fun insertQ x =
            Lock.sync lock (fn () =>
                let
                    val Queue(n, s, e) = !c
                in
                    if n >= 0 then
                        let
                            val px = promise()
                            val e1 = promise()
                        in
                            px ?= future x;
                            e ?= px::(future e1);
                            c := Queue(n+1, s, e1)
                        end
                    else if n = ~1 then
                        (hd(future e) ?= future x;
                         c := Queue(n+1, s, s))
                    else
                        let
                            val e1 = promise()
                        in
                            hd(future e) ?= future x;
                            e1 ?= tl(future e);
                            c := Queue(n+1, s, e1)
                        end
                end) ()

        fun deleteQ () =
            let
                val px = promise()
            in
                Lock.sync lock (fn () =>
                    let
                        val Queue(n, s, e) = !c
                    in
                        if n <= 0 then
                            let
                                val s1 = promise()
                            in
                                s ?= px::(future s1);
                                c := Queue(n-1, s1, e)
                            end
                        else if n = 1 then
                            (px ?= future (hd(future s));
                             c := Queue(n-1, e, e))
                        else
                            let
                                val s1 = promise()
                            in
                                px ?= future (hd(future s));
                                s1 ?= tl(future s);
                                c := Queue(n-1, s1, e)
                            end
                    end) ();
                px
            end
    in
        {insertQ=insertQ, deleteQ=deleteQ}
    end
(* Demo: the same sequence as for the sequential queue, run against the
   lock-protected queue. *)
val {insertQ, deleteQ} = newQueue();
insertQ(known("peter"));
insertQ(known("paul"));
inspect (deleteQ());
inspect (deleteQ());
inspect (deleteQ());
insertQ(known("mary"));
(* Queue - Concurrent object-oriented version with lock *)
(* Interface of a single queue instance with element type t.  Elements go
   in and come out wrapped in promises. *)
signature QUEUE =
    sig
        type t
        val insertQ : t promise -> unit      (* enqueue the promised element *)
        val deleteQ : unit -> t promise      (* dequeue; promise for the element *)
    end
(* NewQueue: functor version of the lock-protected queue.  Every application
   creates one queue instance with element type t; its state cell is only
   touched under Lock.sync on the instance's lock.

   Change from the previous version: insertQ allocated an extra promise
   (`e1`) outside the critical section that was never used; it is gone. *)
functor NewQueue (type t) :> (QUEUE where type t = t) =
struct
    type t = t
    val lock = Lock.lock()
    val p = promise()
    val c = ref (Queue(0, p, p))

    (* insertQ x: x is a promise holding the element. *)
    fun insertQ x =
        Lock.sync lock (fn () =>
            let
                val Queue(n, s, e) = !c
            in
                if n >= 0 then
                    let
                        val px = promise()
                        val e1 = promise()
                    in
                        px ?= future x;
                        e ?= px::(future e1);
                        c := Queue(n+1, s, e1)
                    end
                else if n = ~1 then
                    (hd(future e) ?= future x;
                     c := Queue(n+1, s, s))
                else
                    let
                        val e1 = promise()
                    in
                        hd(future e) ?= future x;
                        e1 ?= tl(future e);
                        c := Queue(n+1, s, e1)
                    end
            end) ()

    (* deleteQ (): returns a promise for the removed element. *)
    fun deleteQ () =
        let
            val px = promise()
        in
            Lock.sync lock (fn () =>
                let
                    val Queue(n, s, e) = !c
                in
                    if n <= 0 then
                        let
                            val s1 = promise()
                        in
                            s ?= px::(future s1);
                            c := Queue(n-1, s1, e)
                        end
                    else if n = 1 then
                        (px ?= future (hd(future s));
                         c := Queue(n-1, e, e))
                    else
                        let
                            val s1 = promise()
                        in
                            px ?= future (hd(future s));
                            s1 ?= tl(future s);
                            c := Queue(n-1, s1, e)
                        end
                end) ();
            px
        end
end
(* Demo: instantiate a string queue and run the usual insert/delete
   sequence against it. *)
structure Q = NewQueue(type t = string);
Q.insertQ(known("peter"));
Q.insertQ(known("paul"));
inspect (Q.deleteQ());
inspect (Q.deleteQ());
inspect (Q.deleteQ());
Q.insertQ(known("mary"));
(* Queue - Concurrent stateful version with exchange *)
(* Note: the translation still has a problem when the buffer becomes empty *)
(* newQueue (): lock-free variant.  Each operation atomically swaps the state
   cell for a Queue whose three components are not yet determined (a future
   count plus two fresh promises) and afterwards fulfils those components
   from the old state.  A concurrent operation that picks up the new state
   therefore waits on the futures until this operation has finished. *)
fun newQueue () =
let
val x = promise()
val c = ref (Queue(0, x, x))
(* insertQ x: x is a promise holding the element (it shadows the outer x). *)
fun insertQ x =
let
val n1 = promise()
val s1 = promise()
val e1 = promise()
(* Publish the placeholders first; n, s, e are the previous state. *)
val Queue(n, s, e) = Ref.exchange(c, Queue(future n1, s1, e1))
in
(* An insert never moves the front: the new front is the old front's future. *)
s1 ?= future s;
if (n >= 0)
then
let
val px = promise()
in
px ?= future x;
e ?= px::(future e1)
end
else
(* A delete is waiting: hand x to the promise it left in the list. *)
let
val px = hd(future e)
in
px ?= future x;
e1 ?= tl(future e)
end;
n1 ?= n + 1
end
(* deleteQ (): returns a promise for the removed element. *)
fun deleteQ () =
let
val px = promise()
val n1 = promise()
val s1 = promise()
val e1 = promise()
val Queue(n, s, e) = Ref.exchange(c, Queue(future n1, s1, e1))
in
(* A delete never moves the tail: the new tail is the old tail's future. *)
e1 ?= future e;
if (n <= 0)
then
(* Nothing available: leave px in the list for a later insert. *)
let in
s ?= px::(future s1)
end
else
let in
px ?= future (hd(future s));
s1 ?= tl(future s)
end;
n1 ?= n - 1;
px
end
in
{insertQ=insertQ, deleteQ=deleteQ}
end
(* Demo: two inserts followed by two deletes, so the queue never has a
   delete waiting on an empty queue. *)
val {insertQ, deleteQ} = newQueue();
insertQ(known("peter"));
insertQ(known("paul"));
inspect (deleteQ());
inspect (deleteQ());
(* inspect (deleteQ());
insertQ(known("mary")); *)
(* 8.3.2 Locks - Tuple spaces ("Linda") *)
(* Interface of a tuple space whose tuples are stored under string labels. *)
signature TUPLESPACE =
    sig
        type key = string
        type tuple
        val write          : key * tuple -> unit     (* add a tuple under a label *)
        val read           : key -> tuple            (* take a tuple, waiting if needed *)
        val readNonBlock   : key -> tuple * bool     (* take a tuple if present; flag tells *)
        val sizeTupleSpace : key -> int
    end
(* TupleSpace: a small Linda-style tuple space.

   For every label the dictionary holds a ref to a list of promises.  The
   list is kept in one of two states:
     - stored tuples: every promise is fulfilled (written, not yet read);
     - waiting readers: every promise is unfulfilled (a blocked read).
   All access to the dictionary and the lists runs under one lock.

   Fixes relative to the previous version:
     - write fulfilled a waiting reader's promise but left it in the list,
       so the same tuple could be handed out a second time and later
       writes queued up behind it.  The served reader is now removed.
     - read and readNonBlock treated an unfulfilled promise at the head
       (another waiting reader) as if it were a stored tuple.  read now
       queues up behind the other readers; readNonBlock reports "absent". *)
functor TupleSpace (type tuple) :> (TUPLESPACE where type tuple=tuple) =
struct
    type key = string
    type tuple = tuple
    structure Dictionary = MkRedBlackImpMap String
    val tupleDict = Dictionary.map()
    val lock = Lock.lock()

    (* Make sure lbl has an entry (an empty list if it is new). *)
    fun ensurePresent lbl =
        if not(Dictionary.member(tupleDict, lbl))
        then Dictionary.insert(tupleDict, lbl, ref nil)
        else ()

    (* Drop lbl's entry again once its list is empty. *)
    fun cleanup lbl =
        if (Dictionary.member(tupleDict, lbl))
        then
            let
                val ps = case Dictionary.lookup(tupleDict, lbl) of
                             NONE => raise Empty
                           | SOME px => px
            in
                if (null(!ps))
                then Dictionary.remove(tupleDict, lbl)
                else ()
            end
        else ()

    (* The list cell for lbl, created on demand. *)
    fun entries lbl =
        (ensurePresent lbl;
         case Dictionary.lookup(tupleDict, lbl) of
             NONE => raise Empty
           | SOME px => px)

    (* True when the head of the list is a stored (fulfilled) tuple. *)
    fun hasStoredTuple ps = not(null(!ps)) andalso isFulfilled(hd(!ps))

    (* True when the head of the list is a blocked reader's promise. *)
    fun hasWaitingReader ps = not(null(!ps)) andalso not(isFulfilled(hd(!ps)))

    (* write (lbl, tup): hand tup to the oldest waiting reader if there is
       one, otherwise store it behind the tuples already present. *)
    fun write (lbl, tup:tuple) =
        Lock.sync lock (fn () =>
            let
                val ps = entries lbl
            in
                if hasWaitingReader ps
                then
                    let
                        val reader = hd(!ps)
                    in
                        reader ?= tup;
                        ps := tl(!ps);
                        cleanup lbl
                    end
                else ps := !ps @ [known tup]
            end)()

    (* read lbl: remove and return the oldest stored tuple; if none is
       stored, wait until a write delivers one. *)
    fun read lbl =
        let
            val tup = promise()
        in
            Lock.sync lock (fn () =>
                let
                    val ps = entries lbl
                in
                    if hasStoredTuple ps
                    then
                        let in
                            tup ?= future(hd(!ps));
                            ps := tl(!ps)
                        end
                    else ps := !ps @ [tup];
                    cleanup lbl
                end)();
            (* Block outside the lock so that writers can get in. *)
            await (future tup);
            future tup
        end

    (* readNonBlock lbl: (tuple, true) if a tuple was stored, otherwise
       (undetermined future, false).  Never waits. *)
    fun readNonBlock lbl =
        let
            val tup = promise()
            val b = promise()
        in
            Lock.sync lock (fn () =>
                let
                    val ps = entries lbl
                in
                    if hasStoredTuple ps
                    then
                        let
                            val _ = b ?= true
                        in
                            tup ?= future(hd(!ps));
                            ps := tl(!ps)
                        end
                    else b ?= false;
                    cleanup lbl
                end)();
            (future tup, future b)
        end

    (* NOTE(review): lbl is ignored; the result is the number of labels that
       currently have an entry, not the number of tuples under lbl.
       Confirm which of the two is intended. *)
    fun sizeTupleSpace lbl = Lock.sync lock (fn () => Dictionary.size(tupleDict))()
end
(* Demo: a tuple space of int triples.  The spawned read runs in its own
   thread, so whether it is served by the (4, 5, 6) write depends on
   scheduling. *)
structure Ts = TupleSpace(type tuple = int*int*int);
Ts.write("foo", (1, 2, 3));
inspect (Ts.read("foo"));
spawn inspect (Ts.read("foo"));
Ts.write("foo", (4, 5, 6));
Ts.write("foo", (7, 8, 9));
inspect (Ts.readNonBlock("foo"));
(* newQueue (): concurrent queue whose state lives in a private tuple space
   under the label "Queue".  Ts.read takes the state out of the space (and
   waits while another operation holds it); Ts.write puts the successor
   state back, so the tuple doubles as the lock.

   insertQ x  : x is a promise holding the (string) element.
   deleteQ () : returns a promise for the removed element.

   Change from the previous version: insertQ allocated an unused promise
   (`e1`) and both operations had a redundant nested `let`; both removed. *)
fun newQueue () =
    let
        val p = promise()
        structure Ts = TupleSpace(type tuple = string queue)
        val _ = Ts.write("Queue", Queue(0, p, p));

        fun insertQ x =
            let
                val Queue(n, s, e) = Ts.read("Queue")
            in
                if n >= 0 then
                    let
                        val px = promise()
                        val e1 = promise()
                    in
                        px ?= future x;
                        e ?= px::(future e1);
                        Ts.write("Queue", Queue(n+1, s, e1))
                    end
                else if n = ~1 then
                    (hd(future e) ?= future x;
                     Ts.write("Queue", Queue(n+1, s, s)))
                else
                    let
                        val e1 = promise()
                    in
                        hd(future e) ?= future x;
                        e1 ?= tl(future e);
                        Ts.write("Queue", Queue(n+1, s, e1))
                    end
            end

        fun deleteQ () =
            let
                val px = promise()
                val Queue(n, s, e) = Ts.read("Queue")
            in
                if n <= 0 then
                    let
                        val s1 = promise()
                    in
                        s ?= px::(future s1);
                        Ts.write("Queue", Queue(n-1, s1, e))
                    end
                else if n = 1 then
                    (px ?= future (hd(future s));
                     Ts.write("Queue", Queue(n-1, e, e)))
                else
                    let
                        val s1 = promise()
                    in
                        px ?= future (hd(future s));
                        s1 ?= tl(future s);
                        Ts.write("Queue", Queue(n-1, s1, e))
                    end;
                px
            end
    in
        {insertQ=insertQ, deleteQ=deleteQ}
    end
(* Demo: the usual insert/delete sequence against the tuple-space queue. *)
val {insertQ, deleteQ} = newQueue();
insertQ(known("peter"));
insertQ(known("paul"));
inspect (deleteQ());
inspect (deleteQ());
inspect (deleteQ());
insertQ(known("mary"));
(* 8.3.3 Locks - Implementing locks *)
(* simpleLock (): returns a function `lock f` that runs f() in mutual
   exclusion with other calls made through the same lock.

   The token cell holds the promise of the most recent holder.  A caller
   swaps in its own promise, waits until the previous holder's promise is
   fulfilled, runs f and then fulfils its own.

   Fix: the wait used `await old`, but old is a promise, which is an
   ordinary value, so await returned immediately and nothing was ever
   excluded.  It now waits on the promise's future.

   As in the original, there is deliberately no exception handling here:
   if f raises, the token is never passed on.  See correctSimpleLock. *)
fun simpleLock () =
    let
        val token = ref (known(()))
        fun lock f =
            let
                val new = promise()
                val old = Ref.exchange(token, new)
            in
                await (future old);
                f();
                new ?= ()
            end
    in
        lock
    end
(* correctSimpleLock (): like simpleLock, but the lock is also released when
   f raises; the exception is then re-raised to the caller.

   Fix: the wait used `await old`, but old is a promise, which is an
   ordinary value, so await returned immediately and nothing was ever
   excluded.  It now waits on the promise's future. *)
fun correctSimpleLock () =
    let
        val token = ref (known(()))
        fun lock f =
            let
                val new = promise()
                val old = Ref.exchange(token, new)
            in
                await (future old);
                f() handle e => ( new ?= (); raise e );
                new ?= ()
            end
    in
        lock
    end
(* newLock (): reentrant lock.  `lock f` runs f() in mutual exclusion; a
   thread that already holds the lock may call `lock` again without
   blocking.

   curThr holds a promise that is fulfilled with the holder's thread while
   the lock is held and is a fresh unfulfilled promise otherwise.

   Fixes:
     - curThr was never cleared on release.  The last holder could then
       re-enter through the reentrant branch after releasing, even while
       another thread held the lock.  It is now reset before the token is
       passed on, on both the normal and the exceptional path.
     - the wait used `await old` on a promise (an ordinary value), which
       never blocks; it now waits on the promise's future. *)
fun newLock () =
    let
        val token = ref (known(()))
        val curThr = ref (promise())
        fun lock f =
            if (isFulfilled(!curThr)) andalso (Thread.current() = future(!curThr))
            then f()
            else
                let
                    val new = promise()
                    val old = Ref.exchange(token, new)
                    (* Give up ownership first, then wake the next waiter. *)
                    fun unlock () = ( curThr := promise(); new ?= () )
                in
                    await (future old);
                    curThr := known(Thread.current());
                    f() handle e => ( unlock(); raise e );
                    unlock()
                end
    in
        lock
    end
(* 8.4.4 Monitors - Implementing Monitors *)
(* Note: Changed the order since I need the definition of Monitor for 8.4.1 thru 8.4.3 *)
(* newQueue (): the lock-protected queue extended with the operations used
   by the monitor implementation.

   insertQ x         : x is a promise holding the element.
   deleteQ ()        : returns a promise for the removed element.
   sizeQ ()          : current count (negative while deletes are waiting).
   deleteAll ()      : empties the queue and returns the list that was in
                       it (closed with nil).
   deleteNonBlock () : deleteQ () if the queue is non-empty, otherwise an
                       already-fulfilled unit promise.  This fixes the
                       element type of the queue to unit.

   deleteNonBlock calls sizeQ and deleteQ while already inside
   Lock.sync on the same lock, so it relies on the lock being reentrant.

   Change from the previous version: insertQ allocated an extra promise
   (`e1`) outside the critical section that was never used; it is gone. *)
fun newQueue () =
    let
        val lock = Lock.lock()
        val p = promise()
        val c = ref (Queue(0, p, p))

        fun insertQ x =
            Lock.sync lock (fn () =>
                let
                    val Queue(n, s, e) = !c
                in
                    if n >= 0 then
                        let
                            val px = promise()
                            val e1 = promise()
                        in
                            px ?= future x;
                            e ?= px::(future e1);
                            c := Queue(n+1, s, e1)
                        end
                    else if n = ~1 then
                        (hd(future e) ?= future x;
                         c := Queue(n+1, s, s))
                    else
                        let
                            val e1 = promise()
                        in
                            hd(future e) ?= future x;
                            e1 ?= tl(future e);
                            c := Queue(n+1, s, e1)
                        end
                end) ()

        fun deleteQ () =
            let
                val px = promise()
            in
                Lock.sync lock (fn () =>
                    let
                        val Queue(n, s, e) = !c
                    in
                        if n <= 0 then
                            let
                                val s1 = promise()
                            in
                                s ?= px::(future s1);
                                c := Queue(n-1, s1, e)
                            end
                        else if n = 1 then
                            (px ?= future (hd(future s));
                             c := Queue(n-1, e, e))
                        else
                            let
                                val s1 = promise()
                            in
                                px ?= future (hd(future s));
                                s1 ?= tl(future s);
                                c := Queue(n-1, s1, e)
                            end
                    end) ();
                px
            end

        fun sizeQ () =
            Lock.sync lock (fn () =>
                let
                    val Queue(n, _, _) = !c
                in
                    n
                end) ()

        fun deleteAll () =
            Lock.sync lock (fn () =>
                let
                    val x = promise()
                    val Queue(_, s, e) = !c
                in
                    c := Queue(0, x, x);
                    e ?= nil;
                    future s
                end) ()

        fun deleteNonBlock () =
            Lock.sync lock (fn () =>
                if (sizeQ() > 0)
                then deleteQ()
                else known(())) ()
    in
        {insertQ=insertQ, deleteQ=deleteQ, sizeQ=sizeQ, deleteAll=deleteAll, deleteNonBlock=deleteNonBlock}
    end
(* newGRLock (): a lock with separate get and release operations, as needed
   by the monitor (wait has to release the lock and get it again later).

   token1 is the chain of holder promises, as in the simple lock.  token2
   remembers the current holder's promise so that release can fulfil it.
   curThr holds a promise fulfilled with the holder's thread, or a fresh
   unfulfilled promise while nobody owns the lock.

   Fixes:
     - releaseLock was written with `=`, i.e. it compared `curThr` with a
       new ref and `!token2` with a new promise and threw both results
       away.  The lock was therefore never released.  It now assigns
       curThr and fulfils the holder's promise.
     - release is idempotent: an already fulfilled token is left alone, so
       a nested get/release pair in one thread does not fail when the
       outer release runs.
     - getLock waited with `await old` on a promise (an ordinary value),
       which never blocks; it now waits on the promise's future. *)
fun newGRLock () =
    let
        val token1 = ref (known(()))
        val token2 = ref (known(()))
        val curThr = ref (promise())
        fun getLock () =
            if (not(isFulfilled(!curThr))) orelse (Thread.current() <> future(!curThr))
            then
                let
                    val new = promise()
                    val old = Ref.exchange(token1, new)
                in
                    await (future old);
                    token2 := new;
                    curThr := known(Thread.current())
                end
            else ()
        fun releaseLock () =
            let
                val held = !token2
            in
                curThr := promise();
                if isFulfilled held then () else held ?= ()
            end
    in
        {get=getLock, release=releaseLock}
    end
(* newMonitor (): build a monitor from a get/release lock and a wait queue.

   lockM f       : run f() while holding the monitor lock; the lock is
                   released again even when f raises.
   waitM ()      : to be called inside lockM.  Registers the caller as a
                   waiter, releases the lock, sleeps until notified and
                   then gets the lock back.
   notifyM ()    : wake one waiter, if there is one.
   notifyAllM () : wake every waiter.

   The wait queue is used "in reverse": a waiter performs a deleteQ, which
   on an empty queue returns an unfulfilled promise and drives the count
   negative; a notifier performs an insertQ, which fulfils the oldest such
   promise.  -sizeQ() is therefore the number of sleeping waiters.

   Fixes relative to the previous version, which inserted the waiter's
   promise into the queue and tried to fulfil what came back out:
     - waitM used `await x` on a promise (an ordinary value), so it never
       actually slept;
     - the queue hands back a new promise that is already bound to the
       stored element's future, so notifyM always found it fulfilled and
       did nothing, and notifyAllM tried to fulfil promises that were
       already fulfilled.
   Waiters should re-check their condition after waitM returns. *)
fun newMonitor () =
    let
        val {insertQ, deleteQ, sizeQ, ...} = newQueue()
        val {get, release} = newGRLock()
        fun lockM f =
            let in
                get();
                f() handle e => ( release(); raise e );
                release()
            end
        fun waitM () =
            let
                (* Unfulfilled until a notifier's insertQ reaches it. *)
                val ticket = deleteQ()
            in
                release();
                await (future ticket);
                get()
            end
        fun notifyM () =
            if sizeQ() < 0
            then insertQ(known(()))
            else ()
        fun notifyAllM () =
            if sizeQ() < 0
            then ( insertQ(known(())); notifyAllM() )
            else ()
    in
        {lockM=lockM, waitM=waitM, notifyM=notifyM, notifyAllM=notifyAllM}
    end
(* 8.4.1 Monitors - Definition *)
(* Create one monitor and bind its four operations at top level. *)
val {lockM, waitM, notifyM, notifyAllM} = newMonitor()
(* 8.4.2 Monitors - Bounded buffer *)
(* Buffer: bounded buffer with n slots, protected by its own monitor.

   buf is used as a ring: `first` is the slot of the oldest element, `last`
   the slot for the next put, `i` the number of occupied slots.

   put x : store x, waiting while the buffer is full.
   get x : fulfil the promise x with the oldest element, waiting while the
           buffer is empty.

   Fix: put advanced `last` without wrapping it around, so the n+1-th put
   indexed past the end of the array.  It now advances modulo n, the same
   way get advances `first`. *)
functor Buffer (val n:int) =
struct
    val {lockM, waitM, notifyM, notifyAllM} = newMonitor()
    val buf = Array.array(n, promise())
    val n = n
    val i = ref 0
    val first = ref 0
    val last = ref 0
    fun put x =
        lockM (fn () =>
            if (!i >= n)
            then
                let in
                    (* Full: sleep, then retry from the top. *)
                    waitM();
                    put x;
                    ()
                end
            else
                let in
                    Array.update(buf, !last, x);
                    last := (!last + 1) mod n;
                    i := !i + 1;
                    notifyAllM();
                    ()
                end)
    fun get x =
        lockM (fn () =>
            if (!i = 0)
            then
                let in
                    (* Empty: sleep, then retry from the top. *)
                    waitM();
                    get x;
                    ()
                end
            else
                let in
                    x ?= Array.sub(buf, !first);
                    first := (!first+1) mod n;
                    i := !i - 1;
                    notifyAllM();
                    ()
                end)
end
(* 8.5.4 Transactions - Implementing transactions on cells *)
(* Note: Changed the order since I need the definition of Trans for 8.5.1 thru 8.5.3 *)
(* Cell names are global stamps; newKeyName creates a new one. *)
type keytype = GlobalStamp.t
fun newKeyName () = GlobalStamp.stamp()
(* Imperative map keyed by global stamps (used for a transaction's save set). *)
structure Dictionary = MkRedBlackImpMap GlobalStamp
(* Universal value type used for cell contents and transaction results. *)
datatype unitype = UTstring of string
| UTint of int
| UTchar of char
| UTword of word
| UTreal of real
| UTlist of unitype list
| UTpair of unitype*unitype
| UTfun of unitype->unitype
| UTunit of unit;
type outtype = unitype
type intype = unitype
(* Answers delivered through sync/result promises: Ok and Halt answer a lock
   request, the remaining three are final transaction outcomes. *)
datatype statustype = Ok
| Halt
| Abort
| AbortExc
| Commit of outtype
(* statetype serves both as a transaction's state (Running, Probation,
   WaitingOn) and as a cell's content (Reference). *)
datatype statetype = Running
| Probation
| WaitingOn of cellrec
| Reference of intype
(* Priority queue of waiting (sync promise, transaction) pairs, as a record
   of operations; the int is the priority / key. *)
and queuerec = QueueRec of { enqueue : (statustype promise * transrec) * int -> unit,
dequeue : unit -> (statustype promise * transrec),
delete : int -> (statustype promise * transrec),
isEmpty : unit -> bool }
(* A transactional cell: its name, current owner, wait queue and content. *)
and cellrec = CellRec of { name : keytype,
owner : transrec ref,
queue : queuerec,
stateC : statetype ref }
(* Saved content of a cell, kept so that an abort can restore it. *)
and saverec = SaveRec of { cell : cellrec,
stateS : statetype ref }
(* The operations handed to a transaction body. *)
and bodyrec = BodyRec of { access : cellrec * statetype ref -> statetype ref,
assign : cellrec * statetype ref -> statetype ref,
exchange : cellrec * statetype ref * statetype ref -> statetype ref,
abort : unit -> unit }
(* A transaction; NullRec stands for "no transaction" (e.g. an unowned cell). *)
and transrec = NullRec
| TransRec of { stamp : int,
save : saverec Dictionary.map,
body : bodyrec -> outtype,
stateT : statetype ref,
result : statustype promise }
(* Note: I'm not sure whether I've captured the essence of the transaction processing here.
Did the best I could and it has the appearance of working, but I had a hard time
following the exact nature of the example. Also, some of the Constructor labels could have
been dropped above to make the code simpler to work with, but I have a bias for naming
the data structures even when there is no intersection involved. Also, there's probably
a simpler way to translate the example in Alice, but I stuck to a fairly literal
translation of the Oz code *)
(* Interface of the transaction manager. *)
signature TMCLASS =
    sig
        (* start a new transaction running the given body *)
        val newtrans     : (bodyrec -> outtype) * statustype promise -> unit
        (* request a cell's lock on behalf of a transaction *)
        val getlock      : transrec * cellrec * statustype promise -> unit
        (* remember a cell's content in the transaction's save set *)
        val savestate    : transrec * cellrec * statustype promise -> unit
        val commit       : transrec -> unit
        val abort        : transrec -> unit
        (* create an empty priority queue of waiting transactions *)
        val newPrioQueue : unit -> queuerec
    end
(* TMClass: the transaction manager.  Transactions get increasing timestamps;
   a smaller stamp means higher priority.  Every cell has an owner and a
   priority queue of transactions waiting for it.

   Fixes relative to the previous version:
     - newPrioQueue/enqueue rebuilt the entries it skipped over as (x, p),
       i.e. it replaced every queued item in front of the insertion point
       by the new item.  The existing item y is now kept.
     - getlock, when it restarts a lower-priority owner t2 that is itself
       waiting on a cell, answered Halt on `sync` (the requesting
       transaction's promise, which had just been queued) instead of
       `sync2` (t2's pending request).  The requester was halted and its
       queued promise could be fulfilled a second time later.
   Idiom changes with the same behaviour: the thread's exception handler
   matches Halted by pattern instead of comparing exception values, and
   isEmpty uses `null` instead of comparing the list with nil. *)
functor TMClass () :> TMCLASS =
struct
    (* Source of transaction timestamps. *)
    val timestamp = ref 0

    (* unlockall (t, restoreflag): release every cell in t's save set.  With
       restoreflag the saved content is put back first.  If a transaction
       is waiting for the cell, the first one in the queue becomes the owner
       and is told Ok. *)
    fun unlockall (TransRec{save, ...}, restoreflag) =
            forall (Dictionary.toList(save)) (
                fn (key, SaveRec{cell, stateS}) =>
                    let
                        val CellRec{name, owner, queue, stateC} = cell
                        val QueueRec{dequeue, isEmpty, ...} = queue
                    in
                        owner := NullRec;
                        if restoreflag
                        then stateC := !stateS
                        else ();
                        if not(isEmpty())
                        then
                            let
                                val (sync2, t2) = dequeue()
                                val TransRec{stateT, ...} = t2
                            in
                                stateT := Running;
                                owner := t2;
                                sync2 ?= Ok
                            end
                        else ()
                    end)
      | unlockall _ = raise Domain

    (* savestate (t, c, sync): record c's current content in t's save set
       unless it is already there, then answer Ok. *)
    fun savestate (TransRec{save, ...}, CellRec{name, stateC, ...} as c, sync) = (
            if not(Dictionary.member(save, name))
            then Dictionary.insert(save, name, SaveRec{cell=c, stateS=ref (!stateC)})
            else ();
            sync ?= Ok)
      | savestate _ = raise Domain

    fun commit t = unlockall(t, false)
    fun abort t = unlockall(t, true)

    (* trans (p, r, ts): run body p as a transaction with timestamp ts in a
       new thread; the outcome is delivered through the promise r. *)
    fun trans(p, r, ts) =
            let
                (* Raised inside the body's thread to stop it quietly. *)
                exception Halted
                val t = TransRec{ stamp = ts,
                                  save = Dictionary.map(),
                                  body = p,
                                  stateT = ref Running,
                                  result = r }
                (* exchange: lock c, save it, store !y and put the old
                   content into x. *)
                fun excT (c, x, y) =
                    let
                        val s1 = promise()
                        val s2 = promise()
                        val CellRec{stateC, ...} = c
                    in
                        getlock(t, c, s1);
                        if ((future s1) = Halt)
                        then raise Halted
                        else ();
                        savestate(t, c, s2);
                        await s2;
                        x := Ref.exchange(stateC, !y);
                        x
                    end
                (* access: lock c, save it and return its content cell. *)
                fun accT (c, x) =
                    let
                        val s1 = promise()
                        val s2 = promise()
                        val CellRec{stateC, ...} = c
                    in
                        getlock(t, c, s1);
                        if ((future s1) = Halt)
                        then raise Halted
                        else ();
                        savestate(t, c, s2);
                        await s2;
                        stateC
                    end
                fun assT (c, x) = excT(c, ref Running, x)
                fun aboT () = ( abort t; r ?= Abort; raise Halted )
            in
                spawn
                    let
                        val TransRec{body, ...} = t
                        val b = BodyRec{access=accT, assign=assT, exchange=excT, abort=aboT}
                        val res = body(b)
                    in
                        commit t;
                        r ?= Commit(res)
                    end
                    handle Halted => ()
                         | e => ( abort t; r ?= AbortExc )
            end

    (* getlock (t, c, sync): t asks for cell c; the answer (Ok or Halt) is
       delivered through sync, possibly later. *)
    and getlock (TransRec{stamp, stateT, body, result, ...} as t, CellRec{owner, queue, ...} as c, sync) =
            let
            in
                if (!stateT = Probation)
                (* t was marked for restart: undo it, start it again with
                   the same stamp and stop this incarnation. *)
                then ( unlockall(t, true); trans(body, result, stamp); sync ?= Halt )
                else
                    if (!owner = NullRec)
                    then ( owner := t; sync ?= Ok )
                    else
                        let
                            val TransRec{stamp=ownerstamp, ...} = !owner
                        in
                            if (stamp = ownerstamp)
                            then sync ?= Ok
                            else
                                let
                                    val TransRec{stamp=stampT2,
                                                 body=bodyT2,
                                                 stateT=stateT2,
                                                 result=resultT2, ...} as t2 = !owner
                                    val QueueRec{enqueue, ...} = queue
                                in
                                    (* t waits for c; sync is answered when
                                       c is handed over. *)
                                    enqueue((sync, t), stamp);
                                    stateT := WaitingOn(c);
                                    if (stamp < stampT2)
                                    then
                                        (* t has priority over the owner t2. *)
                                        (case !stateT2 of
                                             WaitingOn(c2) =>
                                                 let
                                                     val CellRec{queue=q2, ...} = c2
                                                     val QueueRec{delete, ...} = q2
                                                     val (sync2, _) = delete(stampT2)
                                                 in
                                                     (* t2 is blocked itself:
                                                        restart it now and halt
                                                        its pending request. *)
                                                     unlockall(t2, true);
                                                     trans(bodyT2, resultT2, stampT2);
                                                     sync2 ?= Halt
                                                 end
                                           | Running => stateT2 := Probation
                                           | Probation => ())
                                    else ()
                                end
                        end
            end
      | getlock _ = raise Domain

    (* newtrans (p, r): start body p under the next timestamp. *)
    fun newtrans (p, r) = (
            timestamp := !timestamp + 1;
            trans(p, r, !timestamp))

    (* newPrioQueue (): queue of (item, priority) pairs kept in ascending
       priority order. *)
    fun newPrioQueue () =
            let
                val q = ref nil
                (* Insert before the first entry with a larger priority. *)
                fun penqueue(x, prio) =
                    let
                        fun insertloop (((y, p)::l2) as l) =
                                if (prio < p)
                                then (x, prio)::l
                                else (y, p)::insertloop l2
                          | insertloop nil =
                                [(x, prio)]
                    in
                        q := insertloop(!q)
                    end
                (* Remove and return the first item. *)
                fun pdequeue () =
                    let
                        val (y, _) = hd(!q)
                    in
                        q := tl(!q);
                        y
                    end
                (* Remove and return the first item with the given priority;
                   a dummy (fresh promise, NullRec) if there is none. *)
                fun pdelete prio =
                    let
                        val x = ref (promise(), NullRec)
                        fun deleteloop ((y, p)::l2) =
                                if (p = prio)
                                then ( x := y; l2 )
                                else (y, p)::deleteloop l2
                          | deleteloop nil = nil
                    in
                        q := deleteloop(!q);
                        !x
                    end
                fun pisEmpty () = null(!q)
            in
                QueueRec{enqueue=penqueue, dequeue=pdequeue, delete=pdelete, isEmpty=pisEmpty}
            end
end
(* newTrans (ptrans, pnewCellT): create a transaction manager and deliver
   its two user-level operations through the given promises.

   trans (body, report) : run body as a transaction, wait for its outcome
                          and pass it on through the promise `report`.  An
                          abort caused by an exception is reported as Abort
                          and then raises the local exception Halted.
   newCellT init        : a new, unowned cell with content Reference(init). *)
fun newTrans (ptrans, pnewCellT) =
    let
        exception Halted
        structure Tm = TMClass()

        fun trans (body, report) =
            let
                val outcome = promise()
                val _ = Tm.newtrans(body, outcome)
            in
                case (future outcome) of
                    Abort => report ?= Abort
                  | AbortExc => ( report ?= Abort; raise Halted )
                  | Commit(value) => report ?= Commit(value)
            end

        fun newCellT init =
            CellRec{ name = newKeyName(),
                     owner = ref NullRec,
                     queue = Tm.newPrioQueue(),
                     stateC = ref (Reference(init)) }
    in
        ptrans ?= trans;
        pnewCellT ?= newCellT
    end
(* 8.5.3 Transactions - An example *)
(* help function to trundle through constructors *)
(* getInt (access, c): read cell c through the transaction's access
   operation and unwrap Reference(UTint n) to n.  Any other content fails
   the pattern binding. *)
fun getInt (access, c) =
    let
        val Reference (UTint n) = !(access(c, ref Running))
    in
        n
    end
(* Obtain the transaction operations: newTrans delivers them through two
   promises, which are then rebound to the functions themselves. *)
val trans = promise()
val newCellT = promise()
val _ = newTrans(trans, newCellT)
val trans = future trans
val newCellT = future newCellT
(* Two integer cells, both starting at 0. *)
val c1 = newCellT (UTint 0)
val c2 = newCellT (UTint 0)
(* First transaction: c1 := c1 + 1, c2 := c2 - 1; its outcome is discarded. *)
val _ = trans((
fn (BodyRec{access, assign, exchange, abort}) =>
let in
assign(c1, ref (Reference(UTint (getInt(access, c1) + 1))));
assign(c2, ref (Reference(UTint (getInt(access, c2) - 1))));
UTunit()
end
), promise())
(* Second transaction: same updates, and it returns the pair (c1, c2),
   which is delivered through r. *)
val r = promise()
val _ = inspect r
val _ = trans((
fn (BodyRec{access, assign, exchange, abort}) =>
let in
assign(c1, ref (Reference(UTint(getInt(access, c1) + 1))));
assign(c2, ref (Reference(UTint(getInt(access, c2) - 1))));
UTpair(UTint(getInt(access, c1)), UTint(getInt(access, c2)))
end
), r)
(* another example *)
(* d: 100 cells.  The array is first filled with one placeholder cell and
   then every slot i gets its own cell holding i+1. *)
val d = Array.array(100, newCellT (UTint 0))
val _ = Array.modifyi (fn (i, x) => newCellT (UTint (i+1))) d
(* Index generator for d, built on Random.int 100. *)
fun rand () = Random.int 100
(* mix (): one transaction that picks three random cells of d, reads their
   values a, b, c and writes a+b-c, a-b+c and -a+b+c back.  If two of the
   three indices coincide the transaction aborts itself after the first two
   writes.  The outcome promise is thrown away. *)
fun mix () =
    let
        fun body (BodyRec{access, assign, exchange, abort}) =
            let
                val i = rand()
                val j = rand()
                val k = rand()
                val a = getInt(access, Array.sub(d, i))
                val b = getInt(access, Array.sub(d, j))
                val c = getInt(access, Array.sub(d, k))
                val clash = (i = j) orelse (i = k) orelse (j = k)
            in
                assign(Array.sub(d, i), ref (Reference(UTint(a+b-c))));
                assign(Array.sub(d, j), ref (Reference(UTint(a-b+c))));
                if clash then abort() else ();
                assign(Array.sub(d, k), ref (Reference(UTint(~a+b+c))));
                UTunit()
            end
    in
        trans(body, promise())
    end
(* Accumulator cell used by sum. *)
val s = newCellT (UTint 0)
(* sum (): one transaction that resets the accumulator cell s to 0, adds the
   values of d[0] .. d[99] to it one by one and returns the total as
   UTint.  The function returns the future of the outcome promise. *)
fun sum () =
    let
        val outcome = promise()
        fun body (BodyRec{access, assign, exchange, abort}) =
            let
                fun store v = assign(s, ref (Reference(UTint v)))
                fun addCell n =
                    store (getInt(access, s) + getInt(access, Array.sub(d, n)))
            in
                store 0;
                for 0 (100-1) 1 addCell;
                UTint(getInt(access, s))
            end
    in
        trans(body, outcome);
        future outcome
    end;
(* Note: doing this 1000 times will result in integer overflow in the array.
Switch to IntInf if you need that magnitude.
Also, the sum function that follows the for loop does not wait
for the individual transactions in the for loop to complete.
So even though it is wrapped in a transaction, the sum function
may be interleaved at an arbitrary point in the threaded mixes.
I put in an arbitrary delay to make the sum and pair a bit more
indicative of the mix. *)
(* Demo: show the initial sum, start 500 mix transactions, sleep 2000 ms,
   then show the sum again (computed in a spawned thread). *)
inspect (sum());
for 1 500 1 (fn n => mix());
Thread.sleep(Time.fromMilliseconds(Int.toLarge(2000)));
inspect (spawn sum());
(* Finally read the first two cells of d in one transaction; the pair is
   delivered through r. *)
val r = promise()
val _ = inspect r
val _ = trans((
fn (BodyRec{access, assign, exchange, abort}) =>
let in
UTpair(UTint(getInt(access, Array.sub(d, 0))), UTint(getInt(access, Array.sub(d, 1))))
end
), r)
(* 8.6.1 The Java language (concurrent part) - Locks *)
(* Interface of a mutable 2-D point object. *)
signature POINT =
    sig
        val getX   : unit -> real
        val getY   : unit -> real
        val origin : unit -> unit          (* move to (0, 0) *)
        val scale  : real -> unit          (* multiply both coordinates *)
        val add    : package -> unit       (* add another point, passed as a package *)
        val draw   : Gtk.object -> unit    (* draw into a canvas group *)
    end
(* Point: a mutable 2-D point.  All updates of the coordinates run under
   Lock.sync on the point's own lock; getX and getY read without locking.

   Fix: add used P.getX() for both coordinates, so the other point's x was
   added to this point's y.  The y coordinate now uses P.getY(). *)
functor Point (val x:real val y:real) :> POINT =
struct
    val lock = Lock.lock()
    val x = ref x
    val y = ref y
    fun getX () = !x
    fun getY () = !y

    (* Move the point to (0, 0). *)
    fun origin () =
        Lock.sync lock (fn () =>
            let in
                x := 0.0;
                y := 0.0
            end)()

    (* Multiply both coordinates by s. *)
    fun scale s =
        Lock.sync lock (fn () =>
            let in
                x := !x * s;
                y := !y * s
            end)()

    (* Add the coordinates of the point packaged in p; the package must
       unpack against POINT. *)
    fun add p =
        let
            structure P = unpack p : POINT
        in
            Lock.sync lock (fn () =>
                let in
                    x := !x + P.getX();
                    y := !y + P.getY()
                end)()
        end

    (* Create a canvas line item in `group` at the point's position.  The
       coordinates are copied under the lock; the item is created outside it. *)
    fun draw (group) =
        let
            val lx = promise()
            val ly = promise()
        in
            Lock.sync lock (fn () =>
                let in
                    lx ?= !x;
                    ly ?= !y
                end)();
            let
                val line = Canvas.Group.newItem(group, Canvas.Line.getType())
            in
                Canvas.Prop.setL line
                    [Canvas.Line.points ::= [(future lx, future ly)],
                     Canvas.Line.widthPixels ::= 1]
            end;
            ()
        end
end
(* Demo: a point at (10.0, 20.0); read back both coordinates. *)
structure P = Point(val x=10.0 val y=20.0);
P.getX();
P.getY();
(* 8.6.2 The Java language (concurrent part) - Monitors *)
(* Note: This is just a repeat of the translation of 8.4.2 *)
(* Buffer: bounded buffer with n slots, protected by its own monitor.

   buf is used as a ring: `first` is the slot of the oldest element, `last`
   the slot for the next put, `i` the number of occupied slots.

   put x : store x, waiting while the buffer is full.
   get x : fulfil the promise x with the oldest element, waiting while the
           buffer is empty.

   Fix: put advanced `last` without wrapping it around, so the n+1-th put
   indexed past the end of the array.  It now advances modulo n, the same
   way get advances `first`. *)
functor Buffer (val n:int) =
struct
    val {lockM, waitM, notifyM, notifyAllM} = newMonitor()
    val buf = Array.array(n, promise())
    val n = n
    val i = ref 0
    val first = ref 0
    val last = ref 0
    fun put x =
        lockM (fn () =>
            if (!i >= n)
            then
                let in
                    (* Full: sleep, then retry from the top. *)
                    waitM();
                    put x;
                    ()
                end
            else
                let in
                    Array.update(buf, !last, x);
                    last := (!last + 1) mod n;
                    i := !i + 1;
                    notifyAllM();
                    ()
                end)
    fun get x =
        lockM (fn () =>
            if (!i = 0)
            then
                let in
                    (* Empty: sleep, then retry from the top. *)
                    waitM();
                    get x;
                    ()
                end
            else
                let in
                    x ?= Array.sub(buf, !first);
                    first := (!first+1) mod n;
                    i := !i - 1;
                    notifyAllM();
                    ()
                end)
end
|