Implemented beh hierarchy and behaviour macros.
[sam.git] / sam.scm
1 ;; Simple Actor Machine
2 ;;
3 ;; A virtual machine which houses a population of actors which can
4 ;; communicate using messages with actors on the same host or other
5 ;; hosts via the network.
6
7 (import scheme
8         (chicken base)
9         (chicken io)
10         (chicken string)
11         (chicken port)
12         (chicken process-context)
13         (chicken file)
14         (chicken condition)
15         matchable
16         srfi-18 ; threads
17         srfi-69 ; hash-table
18         uuid ; ids for actors
19         uri-generic
20         udp
21         fifo
22         sam-macros)
23
24 ;; Global variables
25
26 (define trace #f)
27
28 (define sam-host "localhost")
29 (define sam-port 8000)
30
31 (define sam-version "0.1")
32
33 ;; Logging
34
35 (define (log-msg . args)
36   (with-output-to-port (current-error-port)
37     (lambda ()
38       (apply print (cons "## " args)))))
39
40 (define (log-trace . args)
41   (with-output-to-port (current-error-port)
42     (lambda ()
43       (if trace (apply log-msg args)))))
44
45 ;; Behaviours
46
47 (define (beh-proc beh)
48   (car beh))
49 (define (beh-parent beh)
50   (cdr beh))  
51
52 (define root-beh
53   (make-beh : #f (self)
54             (('ping recipient) =>
55              (send-message recipient 'pong)
56              'sleep)))
57
58 ;; Actors
59
60 (define (make-address host port id)
61   (list id host port))
62
63 (define (make-local-address . args)
64   (make-address sam-host
65                 sam-port
66                 (if (null? args)
67                     (uuid)
68                     (car args))))
69
70 (define (address-id address)
71   (car address))
72 (define (address-host address)
73   (cadr address))
74 (define (address-port address)
75   (caddr address))
76 (define (address->string address)
77   (uri->string
78    (make-uri #:scheme "actor"
79              #:host (address-host address)
80              #:port (address-port address)
81              #:path (list '/ (address-id address)))))
82 (define (string->address str)
83   (let ((uri (uri-reference str)))
84     (make-address (uri-host uri)
85                   (uri-port uri)
86                   (cadr (uri-path uri)))))
87
88 (define (address-local? address)
89   (and (equal? (address-host address) sam-host)
90        (equal? (address-port address) sam-port)))
91
92 (define actor-table (make-hash-table))
93
94 (define (make-actor beh)
95   (let* ((address (make-local-address))
96          (id (address-id address)))
97     (hash-table-set! actor-table id beh)
98     address))
99
100 (define (deliver-message address . message)
101   (let ((id (address-id address)))
102     (log-trace "DELIVERING to " id ": " message)
103     (let loop ((beh (hash-table-ref/default actor-table id #f)))
104       (if beh
105           (condition-case
106               (match (apply (beh-proc beh) (cons address message))
107                 ('done (hash-table-delete! actor-table id))
108                 ('sleep 'do-nothing)
109                 ('pass
110                  (log-trace "Passing to parent behaviour...")
111                  (loop (beh-parent beh)))
112                 ((? procedure? new-beh) (hash-table-set! actor-table id new-beh))
113                 (else
114                  (log-msg "Warning: behaviour of actor " id " returned invalid value.")))
115             (o (exn)
116              (log-msg "Warning: actor " id " crashed evaluating message " message)
117              (print-error-message o)))
118           (log-msg "Warning: DISCARDING message to unknown actor " id ": " message)))))
119
120 ;; Scheduler
121
122 (define local-queue-mutex (make-mutex "message queue"))
123 (define message-available-mutex (make-mutex "message available"))
124 (mutex-lock! message-available-mutex #f #f)
125 (define local-queue (make-fifo))
126
127 (define (send-message address . message)
128   (log-trace "SENDING to " address ": " message)
129   (apply (if (address-local? address)
130              send-local-message
131              send-network-message)
132          (cons address message)))
133
134 (define (send-local-message address . message)
135   (mutex-lock! local-queue-mutex)
136   (fifo-push local-queue (cons address message))
137   (mutex-unlock! message-available-mutex)
138   (mutex-unlock! local-queue-mutex))
139
140 (define (send-network-message address . message)
141   (let ((s (udp-open-socket))
142         (packet (with-output-to-string
143                   (lambda ()
144                     (write (cons address message))))))
145     (udp-bind! s #f 0)
146     (udp-connect! s
147                   (address-host address)
148                   (address-port address))
149     (udp-send s packet)
150     (udp-close-socket s)))
151
152 (define (send-message-later address time . message)
153   (thread-start!
154    (lambda ()
155      (thread-sleep! time)
156      (apply send-message (cons address message)))))
157
158 (define (next-local-message)
159   (let ((res #f))
160     (mutex-lock! message-available-mutex #f #f)
161     (mutex-lock! local-queue-mutex)
162     (set! res (fifo-pop local-queue))
163     (if (not (fifo-empty? local-queue))
164         (mutex-unlock! message-available-mutex))
165     (mutex-unlock! local-queue-mutex)
166     res))
167
168 (define (start-scheduler)
169   (let loop ()
170     (apply deliver-message (next-local-message))
171     (loop)))
172
173
174 ;; Network
175
176 (define (start-network-listener)
177   (thread-start!
178    (lambda ()
179      (let ((s (udp-open-socket*)))
180        (udp-bind! s #f sam-port)
181        (let loop ()
182          (let-values (((n str) (udp-recv s 65536)))
183            (match (with-input-from-string str read)
184              ((address message ...)
185               (apply send-message (cons address message)))
186              (else
187               (log-msg "Warning: received badly formatted message string '" str "'"))))
188          (loop))))))
189
190 ;; System interface
191
192 (define reader-queue-mutex (make-mutex "reader queue"))
193 (define reader-available-mutex (make-mutex "reader available"))
194 (mutex-lock! reader-available-mutex #f #f)
195 (define reader-queue (make-fifo))
196
197 (define (next-reader)
198   (let ((res #f))
199     (mutex-lock! reader-available-mutex #f #f)
200     (mutex-lock! reader-queue-mutex)
201     (set! res (fifo-pop reader-queue))
202     (if (not (fifo-empty? reader-queue))
203         (mutex-unlock! reader-available-mutex))
204     (mutex-unlock! reader-queue-mutex)
205     res))
206
207 (define (start-console)
208   (thread-start!
209    (lambda ()
210      (let loop ()
211        (let ((reader (next-reader)))
212          (##sys#thread-block-for-i/o! (current-thread) 0 #t)
213          (thread-yield!)
214          (send-message reader (read-line)))
215        (loop)))))
216
217 ;; System initialization
218
219 (define system-beh
220   (make-beh (self)
221             (('shutdown) =>
222              (log-msg "System actor received shutdown message.")
223              (exit 0)
224              'done)
225
226             (('print strings ...) =>
227              (apply print strings)
228              'sleep)
229
230             (('read reader) =>
231              (mutex-lock! reader-queue-mutex)
232              (fifo-push reader-queue reader)
233              (mutex-unlock! reader-available-mutex)
234              (mutex-unlock! reader-queue-mutex)
235              'sleep)))
236
237 (define (boot-sam)
238   (start-console)
239   (start-network-listener)
240   (let ((system (make-actor system-beh))
241         (main #f))
242     (condition-case
243         (begin
244           (set! main (make-actor main-beh)))
245       ((exn)
246        (log-msg "Error starting main actor. Is main-beh defined?")
247        (exit 1)))
248     (send-message main system))
249   (start-scheduler))
250
251 (define (print-usage)
252   (print "Simple Actor Machine v" sam-version "\n"
253          "\n"
254          "Usage: sam -h|--help\n"
255          "       sam [-n hostname] [-p port] source-file-1 [source-file-2 [...]] "))
256
257 (let loop ((args (cdr (argv))))
258   (match args
259     (((or "-h" "--help"))
260      (print-usage))
261     (((or "-p" "--port") pstr rest ...)
262      (set! sam-port (string->number pstr))
263      (loop rest))
264     (((or "-n" "--hostname") hstr rest ...)
265      (set! sam-host hstr)
266      (loop rest))
267     (((or "-t" "--trace") rest ...)
268      (log-msg "Enabling trace debugging")
269      (set! trace #t)
270      (loop rest))
271     (((? file-exists? filename) rest ...)
272      (log-msg "Loading " filename)
273      (load filename)
274      (loop rest))
275     (()
276      (log-msg "Booting SAM\n")
277      (boot-sam))
278     (else
279      (print "Unrecognised argument '" (car args) "'.\n")
280      (print-usage))))
281