Replaced URIs as optional string representation of addresses.
[sam.git] / sam.scm
1 ;; Simple Actor Machine
2 ;;
3 ;; A virtual machine which houses a population of actors which can
4 ;; communicate using messages with actors on the same machine or other
5 ;; machines via the network.
6
7 (module sam
8     (boot-sam
9      make-actor
10      send-message
11      send-message-later
12      address->string
13      string->address)
14
15   (import scheme
16           (chicken base)
17           (chicken io)
18           (chicken string)
19           (chicken port)
20           (chicken process-context)
21           matchable
22           srfi-18 ; threads
23           srfi-69 ; hash-table
24           uuid ; ids for actors
25           uri-generic
26           udp
27           fifo)
28
29   (define trace #f)
30
31   ;; Actors
32
33   (define sam-host "localhost")
34   (define sam-port 8000)
35
36   (define (make-address host port id)
37     (list id host port))
38
39   (define (make-local-address . args)
40     (make-address sam-host
41                   sam-port
42                   (if (null? args)
43                       (uuid)
44                       (car args))))
45   
46   (define (address-id address)
47     (car address))
48   (define (address-host address)
49     (cadr address))
50   (define (address-port address)
51     (caddr address))
52   (define (address->string address)
53     (uri->string
54      (make-uri #:scheme "actor"
55                #:host (address-host address)
56                #:port (address-port address)
57                #:path (list '/ (address-id address)))))
58   (define (string->address str)
59     (let ((uri (uri-reference str)))
60       (make-address (uri-host uri)
61                     (uri-port uri)
62                     (cadr (uri-path uri)))))
63
64   (define (address-local? address)
65     (and (equal? (address-host address) sam-host)
66          (equal? (address-port address) sam-port)))
67
68   (define actor-table (make-hash-table))
69
70   (define (make-actor beh)
71     (let* ((address (make-local-address))
72            (id (address-id address)))
73       (hash-table-set! actor-table id beh)
74       address))
75   
76   (define (deliver-message address . message)
77     (if trace (print "Delivering to " address ": " message))
78     (let ((id (address-id address)))
79       (let ((behaviour (hash-table-ref/default actor-table id '())))
80         (if (null? behaviour)
81             (print "Warning: discarded message " message
82                    " to unknown actor id " id)
83             (match (apply (hash-table-ref actor-table id) (cons address message))
84               ('done (hash-table-delete! actor-table id))
85               ('sleep 'do-nothing)
86               (new-beh (hash-table-set! actor-table id new-beh)))))))
87
88   ;; Scheduler
89
90   (define local-queue-mutex (make-mutex "message queue"))
91   (define message-available-mutex (make-mutex "message available"))
92   (mutex-lock! message-available-mutex #f #f)
93   (define local-queue (make-fifo))
94
95   (define (send-message address . message)
96     (apply (if (address-local? address)
97                send-local-message
98                send-network-message)
99            (cons address message)))
100
101   (define (send-local-message address . message)
102     (mutex-lock! local-queue-mutex)
103     (fifo-push local-queue (cons address message))
104     (mutex-unlock! message-available-mutex)
105     (mutex-unlock! local-queue-mutex))
106
107   (define (send-network-message address . message)
108     (let ((s (udp-open-socket))
109           (packet (with-output-to-string
110                     (lambda ()
111                       (write (cons address message))))))
112       (udp-bind! s #f 0)
113       (udp-connect! s
114                     (address-host address)
115                     (address-port address))
116       (udp-send s packet)
117       (udp-close-socket s)))
118
119   (define (send-message-later address time . message)
120     (thread-start!
121      (lambda ()
122        (thread-sleep! time)
123        (apply send-message (cons address message)))))
124
125   (define (next-local-message)
126     (let ((res #f))
127       (mutex-lock! message-available-mutex #f #f)
128       (mutex-lock! local-queue-mutex)
129       (set! res (fifo-pop local-queue))
130       (if (not (fifo-empty? local-queue))
131           (mutex-unlock! message-available-mutex))
132       (mutex-unlock! local-queue-mutex)
133       res))
134
135   (define (start-scheduler)
136     (let loop ()
137       (apply deliver-message (next-local-message))
138       (loop)))
139
140
141   ;; Network
142
143   (define (start-network-listener)
144     (thread-start!
145      (lambda ()
146        (let ((s (udp-open-socket*)))
147          (udp-bind! s #f sam-port)
148          (let loop ()
149            (let-values (((n str) (udp-recv s 65536)))
150              (match (with-input-from-string str read)
151                ((address message ...)
152                 (apply send-message (cons address message)))
153                (else
154                 (print "Warning: received badly formatted message string '" str "'"))))
155            (loop))))))
156
157   ;; System interface
158
159   (define reader-queue-mutex (make-mutex "reader queue"))
160   (define reader-available-mutex (make-mutex "reader available"))
161   (mutex-lock! reader-available-mutex #f #f)
162   (define reader-queue (make-fifo))
163
164   (define (next-reader)
165     (let ((res #f))
166       (mutex-lock! reader-available-mutex #f #f)
167       (mutex-lock! reader-queue-mutex)
168       (set! res (fifo-pop reader-queue))
169       (if (not (fifo-empty? reader-queue))
170           (mutex-unlock! reader-available-mutex))
171       (mutex-unlock! reader-queue-mutex)
172       res))
173
174   (define (start-console)
175     (thread-start!
176      (lambda ()
177        (let loop ()
178          (let ((reader (next-reader)))
179            (##sys#thread-block-for-i/o! (current-thread) 0 #t)
180            (thread-yield!)
181            (send-message reader (read-line)))
182          (loop)))))
183
184   ;; System initialization
185
186   (define (system-beh self . message)
187     (match message
188
189       (('shutdown)
190        (print "## System actor received shutdown message.")
191        (exit 0)
192        'done)
193
194       (('print strings ...)
195        (apply print strings)
196        'sleep)
197
198       (('read reader)
199        (mutex-lock! reader-queue-mutex)
200        (fifo-push reader-queue reader)
201        (mutex-unlock! reader-available-mutex)
202        (mutex-unlock! reader-queue-mutex)
203        'sleep)))
204
205   (define (boot-sam host port main-beh)
206     (set! sam-host host)
207     (set! sam-port port)
208     (start-console)
209     (start-network-listener)
210     (send-message (make-actor main-beh) (make-actor system-beh))
211     (start-scheduler)))