Minor changes to behaviour macros.
[sam.git] / sam.scm
1 ;; Simple Actor Machine
2 ;;
3 ;; A virtual machine which houses a population of actors which can
4 ;; communicate using messages with actors on the same host or other
5 ;; hosts via the network.
6
7 (import scheme
8         (chicken base)
9         (chicken io)
10         (chicken string)
11         (chicken port)
12         (chicken process-context)
13         (chicken file)
14         (chicken condition)
15         matchable
16         srfi-18 ; threads
17         srfi-69 ; hash-table
18         uuid ; ids for actors
19         uri-generic
20         udp
21         fifo
22         sam-macros)
23
24 ;; Global variables
25
26 (define trace #f)
27
28 (define sam-host "localhost")
29 (define sam-port 8000)
30
31 (define sam-version "0.1")
32
33 ;; Logging
34
35 (define (log-msg . args)
36   (with-output-to-port (current-error-port)
37     (lambda ()
38       (apply print (cons "## " args)))))
39
40 (define (log-trace . args)
41   (with-output-to-port (current-error-port)
42     (lambda ()
43       (if trace (apply log-msg args)))))
44
45 (define (->stringrep arg)
46   (with-output-to-string
47     (lambda ()
48       (write arg))))
49
50 ;; Behaviours
51 ;; (See also macros defined in sam-macros.scm.)
52
53 (define (beh-proc beh)
54   (car beh))
55 (define (beh-parent beh)
56   (cdr beh))  
57
58 (define root-beh
59   (make-beh : #f (self)
60             (('ping recipient) =>
61              (send-message recipient 'pong)
62              'sleep)))
63
64 ;; Actors
65
66 (define (make-address host port id)
67   (list id host port))
68
69 (define (make-local-address . args)
70   (make-address sam-host
71                 sam-port
72                 (if (null? args)
73                     (uuid)
74                     (car args))))
75
76 (define (address-id address)
77   (car address))
78 (define (address-host address)
79   (cadr address))
80 (define (address-port address)
81   (caddr address))
82 (define (address->string address)
83   (uri->string
84    (make-uri #:scheme "actor"
85              #:host (address-host address)
86              #:port (address-port address)
87              #:path (list '/ (address-id address)))))
88 (define (string->address str)
89   (let ((uri (uri-reference str)))
90     (make-address (uri-host uri)
91                   (uri-port uri)
92                   (cadr (uri-path uri)))))
93
94 (define (address-local? address)
95   (and (equal? (address-host address) sam-host)
96        (equal? (address-port address) sam-port)))
97
98 (define actor-table (make-hash-table))
99
100 (define (make-actor beh)
101   (let* ((address (make-local-address))
102          (id (address-id address)))
103     (hash-table-set! actor-table id beh)
104     address))
105
106 (define (deliver-message address . message)
107   (let ((id (address-id address)))
108     (log-trace "DELIVERING to " id ": " (->stringrep message))
109     (let loop ((beh (hash-table-ref/default actor-table id #f)))
110       (if beh
111           (condition-case
112               (match (apply (beh-proc beh) (cons address message))
113                 ('done (hash-table-delete! actor-table id))
114                 ('sleep 'do-nothing)
115                 ('pass
116                  (log-trace "Passing to parent behaviour...")
117                  (loop (beh-parent beh)))
118                 ((? procedure? new-beh) (hash-table-set! actor-table id new-beh))
119                 (else
120                  (log-msg "Warning: behaviour of actor " id " returned invalid value.")))
121             (o (exn)
122              (log-msg "Warning: actor " id " crashed evaluating message " (->stringrep message))
123              (print-error-message o)))
124           (log-msg "Warning: DISCARDING message to unknown actor " id ": " (->stringrep message))))))
125
126 ;; Scheduler
127
128 (define local-queue-mutex (make-mutex "message queue"))
129 (define message-available-mutex (make-mutex "message available"))
130 (mutex-lock! message-available-mutex #f #f)
131 (define local-queue (make-fifo))
132
133 (define (send-message address . message)
134   (log-trace "SENDING to " address ": " (->stringrep message))
135   (apply (if (address-local? address)
136              send-local-message
137              send-network-message)
138          (cons address message)))
139
140 (define (send-local-message address . message)
141   (mutex-lock! local-queue-mutex)
142   (fifo-push local-queue (cons address message))
143   (mutex-unlock! message-available-mutex)
144   (mutex-unlock! local-queue-mutex))
145
146 (define (send-network-message address . message)
147   (let ((s (udp-open-socket))
148         (packet (with-output-to-string
149                   (lambda ()
150                     (write (cons address message))))))
151     (udp-bind! s #f 0)
152     (udp-connect! s
153                   (address-host address)
154                   (address-port address))
155     (udp-send s packet)
156     (udp-close-socket s)))
157
158 (define (send-message-later address time . message)
159   (thread-start!
160    (lambda ()
161      (thread-sleep! time)
162      (apply send-message (cons address message)))))
163
164 (define (next-local-message)
165   (let ((res #f))
166     (mutex-lock! message-available-mutex #f #f)
167     (mutex-lock! local-queue-mutex)
168     (set! res (fifo-pop local-queue))
169     (if (not (fifo-empty? local-queue))
170         (mutex-unlock! message-available-mutex))
171     (mutex-unlock! local-queue-mutex)
172     res))
173
174 (define (start-scheduler)
175   (let loop ()
176     (apply deliver-message (next-local-message))
177     (loop)))
178
179
180 ;; Network
181
182 (define (start-network-listener)
183   (thread-start!
184    (lambda ()
185      (let ((s (udp-open-socket*)))
186        (udp-bind! s #f sam-port)
187        (let loop ()
188          (let-values (((n str) (udp-recv s 65536)))
189            (match (with-input-from-string str read)
190              ((address message ...)
191               (apply send-message (cons address message)))
192              (else
193               (log-msg "Warning: received badly formatted message string '" str "'"))))
194          (loop))))))
195
196 ;; System interface
197
198 (define reader-queue-mutex (make-mutex "reader queue"))
199 (define reader-available-mutex (make-mutex "reader available"))
200 (mutex-lock! reader-available-mutex #f #f)
201 (define reader-queue (make-fifo))
202
203 (define (next-reader)
204   (let ((res #f))
205     (mutex-lock! reader-available-mutex #f #f)
206     (mutex-lock! reader-queue-mutex)
207     (set! res (fifo-pop reader-queue))
208     (if (not (fifo-empty? reader-queue))
209         (mutex-unlock! reader-available-mutex))
210     (mutex-unlock! reader-queue-mutex)
211     res))
212
213 (define (start-console)
214   (thread-start!
215    (lambda ()
216      (let loop ()
217        (let ((reader (next-reader)))
218          (##sys#thread-block-for-i/o! (current-thread) 0 #t)
219          (thread-yield!)
220          (send-message reader (read-line)))
221        (loop)))))
222
223 ;; System initialization
224
225 (define-beh system-beh
226   (self)
227   (('shutdown) =>
228    (log-msg "System actor received shutdown message.")
229    (exit 0)
230    'done)
231
232   (('print strings ...) =>
233    (apply print strings)
234    'sleep)
235
236   (('read reader) =>
237    (mutex-lock! reader-queue-mutex)
238    (fifo-push reader-queue reader)
239    (mutex-unlock! reader-available-mutex)
240    (mutex-unlock! reader-queue-mutex)
241    'sleep))
242
243 (define (boot-sam)
244   (start-console)
245   (start-network-listener)
246   (let ((system (make-actor system-beh))
247         (main #f))
248     (condition-case
249         (begin
250           (set! main (make-actor main-beh)))
251       ((exn)
252        (log-msg "Error starting main actor. Is main-beh defined?")
253        (exit 1)))
254     (send-message main system))
255   (start-scheduler))
256
257 (define (print-usage)
258   (print "Simple Actor Machine v" sam-version "\n"
259          "\n"
260          "Usage: sam -h|--help\n"
261          "       sam [-n hostname] [-p port] source-file-1 [source-file-2 [...]] "))
262
263 (let loop ((args (cdr (argv))))
264   (match args
265     (((or "-h" "--help"))
266      (print-usage))
267     (((or "-p" "--port") pstr rest ...)
268      (set! sam-port (string->number pstr))
269      (loop rest))
270     (((or "-n" "--hostname") hstr rest ...)
271      (set! sam-host hstr)
272      (loop rest))
273     (((or "-t" "--trace") rest ...)
274      (log-msg "Enabling trace debugging")
275      (set! trace #t)
276      (loop rest))
277     (((? file-exists? filename) rest ...)
278      (log-msg "Loading " filename)
279      (load filename)
280      (loop rest))
281     (()
282      (log-msg "Booting SAM\n")
283      (boot-sam))
284     (else
285      (print "Unrecognised argument '" (car args) "'.\n")
286      (print-usage))))
287