Behaviours are now tagged lists.
[sam.git] / sam.scm
1 ;; Simple Actor Machine
2 ;;
3 ;; A virtual machine which houses a population of actors which can
4 ;; communicate using messages with actors on the same host or other
5 ;; hosts via the network.
6
7 (import scheme
8         (chicken base)
9         (chicken io)
10         (chicken string)
11         (chicken port)
12         (chicken process-context)
13         (chicken file)
14         (chicken condition)
15         matchable
16         srfi-18 ; threads
17         srfi-69 ; hash-table
18         uuid ; ids for actors
19         uri-generic
20         udp
21         fifo
22         sam-macros)
23
24 ;; Global variables
25
26 (define trace #f)
27
28 (define sam-host "localhost")
29 (define sam-port 8000)
30
31 (define sam-version "0.1")
32
33 ;; Logging
34
35 (define (log-msg . args)
36   (with-output-to-port (current-error-port)
37     (lambda ()
38       (apply print (cons "## " args)))))
39
40 (define (log-trace . args)
41   (with-output-to-port (current-error-port)
42     (lambda ()
43       (if trace (apply log-msg args)))))
44
45 (define (->stringrep arg)
46   (with-output-to-string
47     (lambda ()
48       (write arg))))
49
50 ;; Behaviours
51 ;; (See also macros defined in sam-macros.scm.)
52
53 (define (beh-proc beh)
54   (cadr beh))
55 (define (beh-parent beh)
56   (caddr beh))  
57
58 (define root-beh
59   (make-beh : #f (self)
60             (('ping recipient) =>
61              (send-message recipient 'pong))))
62
63 (define (beh? x)
64   (and (pair? x)
65        (not (null? x))
66        (eq? (car x) 'beh)))
67
68 ;; Actors
69
70 (define (make-address host port id)
71   (list id host port))
72
73 (define (make-local-address . args)
74   (make-address sam-host
75                 sam-port
76                 (if (null? args)
77                     (uuid)
78                     (car args))))
79
80 (define (address-id address)
81   (car address))
82 (define (address-host address)
83   (cadr address))
84 (define (address-port address)
85   (caddr address))
86 (define (address->string address)
87   (uri->string
88    (make-uri #:scheme "actor"
89              #:host (address-host address)
90              #:port (address-port address)
91              #:path (list '/ (address-id address)))))
92 (define (string->address str)
93   (let ((uri (uri-reference str)))
94     (make-address (uri-host uri)
95                   (uri-port uri)
96                   (cadr (uri-path uri)))))
97
98 (define (address-local? address)
99   (and (equal? (address-host address) sam-host)
100        (equal? (address-port address) sam-port)))
101
102 (define actor-table (make-hash-table))
103
104 (define (make-actor beh)
105   (let* ((address (make-local-address))
106          (id (address-id address)))
107     (hash-table-set! actor-table id beh)
108     address))
109
110 (define (deliver-message address . message)
111   (let ((id (address-id address)))
112     (log-trace "DELIVERING to " id ": " (->stringrep message))
113     (let loop ((beh (hash-table-ref/default actor-table id #f)))
114       (if beh
115           (condition-case
116               (match (apply (beh-proc beh) (cons address message))
117                 ('done (hash-table-delete! actor-table id))
118                 ('pass
119                  (log-trace "Passing to parent behaviour...")
120                  (loop (beh-parent beh)))
121                 ((? beh? new-beh) (hash-table-set! actor-table id new-beh))
122                 (else
123                  'do-nothing)) ; sleep is now the default
124             (o (exn)
125              (log-msg "Warning: actor " id " crashed evaluating message " (->stringrep message))
126              (print-error-message o)))
127           (log-msg "Warning: DISCARDING message to unknown actor " id ": " (->stringrep message))))))
128
129 ;; Scheduler
130
131 (define local-queue-mutex (make-mutex "message queue"))
132 (define message-available-mutex (make-mutex "message available"))
133 (mutex-lock! message-available-mutex #f #f)
134 (define local-queue (make-fifo))
135
136 (define (send-message address . message)
137   (log-trace "SENDING to " address ": " (->stringrep message))
138   (apply (if (address-local? address)
139              send-local-message
140              send-network-message)
141          (cons address message)))
142
143 (define (send-local-message address . message)
144   (mutex-lock! local-queue-mutex)
145   (fifo-push local-queue (cons address message))
146   (mutex-unlock! message-available-mutex)
147   (mutex-unlock! local-queue-mutex))
148
149 (define (send-network-message address . message)
150   (let ((s (udp-open-socket))
151         (packet (with-output-to-string
152                   (lambda ()
153                     (write (cons address message))))))
154     (udp-bind! s #f 0)
155     (udp-connect! s
156                   (address-host address)
157                   (address-port address))
158     (udp-send s packet)
159     (udp-close-socket s)))
160
161 (define (send-message-later address time . message)
162   (thread-start!
163    (lambda ()
164      (thread-sleep! time)
165      (apply send-message (cons address message)))))
166
167 (define (next-local-message)
168   (let ((res #f))
169     (mutex-lock! message-available-mutex #f #f)
170     (mutex-lock! local-queue-mutex)
171     (set! res (fifo-pop local-queue))
172     (if (not (fifo-empty? local-queue))
173         (mutex-unlock! message-available-mutex))
174     (mutex-unlock! local-queue-mutex)
175     res))
176
177 (define (start-scheduler)
178   (let loop ()
179     (apply deliver-message (next-local-message))
180     (loop)))
181
182
183 ;; Network
184
185 (define (start-network-listener)
186   (thread-start!
187    (lambda ()
188      (let ((s (udp-open-socket*)))
189        (udp-bind! s #f sam-port)
190        (let loop ()
191          (let-values (((n str) (udp-recv s 65536)))
192            (match (with-input-from-string str read)
193              ((address message ...)
194               (apply send-message (cons address message)))
195              (else
196               (log-msg "Warning: received badly formatted message string '" str "'"))))
197          (loop))))))
198
199 ;; System interface
200
201 (define reader-queue-mutex (make-mutex "reader queue"))
202 (define reader-available-mutex (make-mutex "reader available"))
203 (mutex-lock! reader-available-mutex #f #f)
204 (define reader-queue (make-fifo))
205
206 (define (next-reader)
207   (let ((res #f))
208     (mutex-lock! reader-available-mutex #f #f)
209     (mutex-lock! reader-queue-mutex)
210     (set! res (fifo-pop reader-queue))
211     (if (not (fifo-empty? reader-queue))
212         (mutex-unlock! reader-available-mutex))
213     (mutex-unlock! reader-queue-mutex)
214     res))
215
216 (define (start-console)
217   (thread-start!
218    (lambda ()
219      (let loop ()
220        (let ((reader (next-reader)))
221          (##sys#thread-block-for-i/o! (current-thread) 0 #t)
222          (thread-yield!)
223          (send-message reader (read-line)))
224        (loop)))))
225
226 ;; System initialization
227
228 (define-beh system-beh
229   (self)
230
231   (('shutdown) =>
232    (log-msg "System actor received shutdown message.")
233    (exit 0)
234    'done)
235
236   (('print strings ...) =>
237    (apply print strings))
238
239   (('read reader) =>
240    (mutex-lock! reader-queue-mutex)
241    (fifo-push reader-queue reader)
242    (mutex-unlock! reader-available-mutex)
243    (mutex-unlock! reader-queue-mutex)))
244
245 (define (boot-sam)
246   (start-console)
247   (start-network-listener)
248   (let ((system (make-actor system-beh))
249         (main #f))
250     (condition-case
251         (begin
252           (set! main (make-actor main-beh)))
253       ((exn)
254        (log-msg "Error starting main actor. Is main-beh defined?")
255        (exit 1)))
256     (send-message main system))
257   (start-scheduler))
258
259 (define (print-usage)
260   (print "Simple Actor Machine v" sam-version "\n"
261          "\n"
262          "Usage: sam -h|--help\n"
263          "       sam [-n hostname] [-p port] source-file-1 [source-file-2 [...]] "))
264
265 (let loop ((args (cdr (argv))))
266   (match args
267     (((or "-h" "--help"))
268      (print-usage))
269     (((or "-p" "--port") pstr rest ...)
270      (set! sam-port (string->number pstr))
271      (loop rest))
272     (((or "-n" "--hostname") hstr rest ...)
273      (set! sam-host hstr)
274      (loop rest))
275     (((or "-t" "--trace") rest ...)
276      (log-msg "Enabling trace debugging")
277      (set! trace #t)
278      (loop rest))
279     (((? file-exists? filename) rest ...)
280      (log-msg "Loading " filename)
281      (load filename)
282      (loop rest))
283     (()
284      (log-msg "Booting SAM\n")
285      (boot-sam))
286     (else
287      (print "Unrecognised argument '" (car args) "'.\n")
288      (print-usage))))
289