Message delivery now catches exceptions and prints warning.
[sam.git] / sam.scm
1 ;; Simple Actor Machine
2 ;;
3 ;; A virtual machine which houses a population of actors which can
4 ;; communicate using messages with actors on the same host or other
5 ;; hosts via the network.
6
7 (import scheme
8         (chicken base)
9         (chicken io)
10         (chicken string)
11         (chicken port)
12         (chicken process-context)
13         (chicken file)
14         (chicken condition)
15         matchable
16         srfi-18 ; threads
17         srfi-69 ; hash-table
18         uuid ; ids for actors
19         uri-generic
20         udp
21         fifo)
22
23 ;; Global variables
24
25 (define trace #f)
26
27 (define sam-host "localhost")
28 (define sam-port 8000)
29
30 (define sam-version "0.1")
31
32 ;; Actors
33
34
35 (define (make-address host port id)
36   (list id host port))
37
38 (define (make-local-address . args)
39   (make-address sam-host
40                 sam-port
41                 (if (null? args)
42                     (uuid)
43                     (car args))))
44
45 (define (address-id address)
46   (car address))
47 (define (address-host address)
48   (cadr address))
49 (define (address-port address)
50   (caddr address))
51 (define (address->string address)
52   (uri->string
53    (make-uri #:scheme "actor"
54              #:host (address-host address)
55              #:port (address-port address)
56              #:path (list '/ (address-id address)))))
57 (define (string->address str)
58   (let ((uri (uri-reference str)))
59     (make-address (uri-host uri)
60                   (uri-port uri)
61                   (cadr (uri-path uri)))))
62
63 (define (address-local? address)
64   (and (equal? (address-host address) sam-host)
65        (equal? (address-port address) sam-port)))
66
67 (define actor-table (make-hash-table))
68
69 (define (make-actor beh)
70   (let* ((address (make-local-address))
71          (id (address-id address)))
72     (hash-table-set! actor-table id beh)
73     address))
74
75 (define (deliver-message address . message)
76   (if trace (print "Delivering to " address ": " message))
77   (let ((id (address-id address)))
78     (let ((behaviour (hash-table-ref/default actor-table id '())))
79       (if (null? behaviour)
80           (print "## Warning: discarded message " message
81                  " to unknown actor id " id)
82           (condition-case
83               (match (apply behaviour (cons address message))
84                 ('done (hash-table-delete! actor-table id))
85                 ('sleep 'do-nothing)
86                 (new-beh (hash-table-set! actor-table id new-beh)))
87             ((exn)
88              (print "## Warning: actor id " id " crashed evaluating message " message)))))))
89
90 ;; Scheduler
91
92 (define local-queue-mutex (make-mutex "message queue"))
93 (define message-available-mutex (make-mutex "message available"))
94 (mutex-lock! message-available-mutex #f #f)
95 (define local-queue (make-fifo))
96
97 (define (send-message address . message)
98   (apply (if (address-local? address)
99              send-local-message
100              send-network-message)
101          (cons address message)))
102
103 (define (send-local-message address . message)
104   (mutex-lock! local-queue-mutex)
105   (fifo-push local-queue (cons address message))
106   (mutex-unlock! message-available-mutex)
107   (mutex-unlock! local-queue-mutex))
108
109 (define (send-network-message address . message)
110   (let ((s (udp-open-socket))
111         (packet (with-output-to-string
112                   (lambda ()
113                     (write (cons address message))))))
114     (udp-bind! s #f 0)
115     (udp-connect! s
116                   (address-host address)
117                   (address-port address))
118     (udp-send s packet)
119     (udp-close-socket s)))
120
121 (define (send-message-later address time . message)
122   (thread-start!
123    (lambda ()
124      (thread-sleep! time)
125      (apply send-message (cons address message)))))
126
127 (define (next-local-message)
128   (let ((res #f))
129     (mutex-lock! message-available-mutex #f #f)
130     (mutex-lock! local-queue-mutex)
131     (set! res (fifo-pop local-queue))
132     (if (not (fifo-empty? local-queue))
133         (mutex-unlock! message-available-mutex))
134     (mutex-unlock! local-queue-mutex)
135     res))
136
137 (define (start-scheduler)
138   (let loop ()
139     (apply deliver-message (next-local-message))
140     (loop)))
141
142
143 ;; Network
144
145 (define (start-network-listener)
146   (thread-start!
147    (lambda ()
148      (let ((s (udp-open-socket*)))
149        (udp-bind! s #f sam-port)
150        (let loop ()
151          (let-values (((n str) (udp-recv s 65536)))
152            (match (with-input-from-string str read)
153              ((address message ...)
154               (apply send-message (cons address message)))
155              (else
156               (print "Warning: received badly formatted message string '" str "'"))))
157          (loop))))))
158
159 ;; System interface
160
161 (define reader-queue-mutex (make-mutex "reader queue"))
162 (define reader-available-mutex (make-mutex "reader available"))
163 (mutex-lock! reader-available-mutex #f #f)
164 (define reader-queue (make-fifo))
165
166 (define (next-reader)
167   (let ((res #f))
168     (mutex-lock! reader-available-mutex #f #f)
169     (mutex-lock! reader-queue-mutex)
170     (set! res (fifo-pop reader-queue))
171     (if (not (fifo-empty? reader-queue))
172         (mutex-unlock! reader-available-mutex))
173     (mutex-unlock! reader-queue-mutex)
174     res))
175
176 (define (start-console)
177   (thread-start!
178    (lambda ()
179      (let loop ()
180        (let ((reader (next-reader)))
181          (##sys#thread-block-for-i/o! (current-thread) 0 #t)
182          (thread-yield!)
183          (send-message reader (read-line)))
184        (loop)))))
185
186 ;; System initialization
187
188 (define (system-beh self . message)
189   (match message
190
191     (('shutdown)
192      (print "## System actor received shutdown message.")
193      (exit 0)
194      'done)
195
196     (('print strings ...)
197      (apply print strings)
198      'sleep)
199
200     (('read reader)
201      (mutex-lock! reader-queue-mutex)
202      (fifo-push reader-queue reader)
203      (mutex-unlock! reader-available-mutex)
204      (mutex-unlock! reader-queue-mutex)
205      'sleep)))
206
207 (define (boot-sam)
208   (start-console)
209   (start-network-listener)
210   (let ((system (make-actor system-beh))
211         (main #f))
212     (condition-case
213         (begin
214           (set! main (make-actor main-beh)))
215       ((exn)
216        (print "## Error starting main actor. Is main-beh defined?")
217        (exit 1)))
218     (send-message main system))
219   (start-scheduler))
220
221 (define (print-usage)
222   (print "Simple Actor Machine v" sam-version "\n"
223          "\n"
224          "Usage: sam -h|--help\n"
225          "       sam [-n hostname] [-p port] source-file-1 [source-file-2 [...]] "))
226
227
228 (let loop ((args (cdr (argv))))
229   (match args
230     (((or "-h" "--help"))
231      (print-usage))
232     (((or "-p" "--port") pstr rest ...)
233      (set! sam-port (string->number pstr))
234      (loop rest))
235     (((or "-n" "--hostname") hstr rest ...)
236      (set! sam-host hstr)
237      (loop rest))
238     (((? file-exists? filename) rest ...)
239      (print* "## Loading " filename "...")
240      (load filename)
241      (print " done.")
242      (loop rest))
243     (()
244      (print "## Booting SAM\n")
245      (boot-sam))
246     (else
247      (print "Unrecognised argument '" (car args) "'.\n")
248      (print-usage))))
249