Refactored chat client.
[sam.git] / sam.scm
1 ;; Simple Actor Machine
2 ;;
3 ;; A virtual machine which houses a population of actors which can
4 ;; communicate using messages with actors on the same host or other
5 ;; hosts via the network.
6
7 (import scheme
8         (chicken base)
9         (chicken io)
10         (chicken string)
11         (chicken port)
12         (chicken process-context)
13         (chicken file)
14         (chicken condition)
15         matchable
16         srfi-18 ; threads
17         srfi-69 ; hash-table
18         uuid ; ids for actors
19         uri-generic
20         udp
21         fifo
22         sam-macros)
23
24 ;; Global variables
25
26 (define trace #f)
27
28 (define sam-host "localhost")
29 (define sam-port 8000)
30
31 (define sam-version "0.1")
32
33 ;; Logging
34
35 (define (log-msg . args)
36   (with-output-to-port (current-error-port)
37     (lambda ()
38       (apply print (cons "## " args)))))
39
40 (define (log-trace . args)
41   (with-output-to-port (current-error-port)
42     (lambda ()
43       (if trace (apply log-msg args)))))
44
45 ;; Behaviours
46 ;; (See also macros defined in sam-macros.scm.)
47
48 (define (beh-proc beh)
49   (car beh))
50 (define (beh-parent beh)
51   (cdr beh))  
52
53 (define root-beh
54   (make-beh : #f (self)
55             (('ping recipient) =>
56              (send-message recipient 'pong)
57              'sleep)))
58
59 ;; Actors
60
61 (define (make-address host port id)
62   (list id host port))
63
64 (define (make-local-address . args)
65   (make-address sam-host
66                 sam-port
67                 (if (null? args)
68                     (uuid)
69                     (car args))))
70
71 (define (address-id address)
72   (car address))
73 (define (address-host address)
74   (cadr address))
75 (define (address-port address)
76   (caddr address))
77 (define (address->string address)
78   (uri->string
79    (make-uri #:scheme "actor"
80              #:host (address-host address)
81              #:port (address-port address)
82              #:path (list '/ (address-id address)))))
83 (define (string->address str)
84   (let ((uri (uri-reference str)))
85     (make-address (uri-host uri)
86                   (uri-port uri)
87                   (cadr (uri-path uri)))))
88
89 (define (address-local? address)
90   (and (equal? (address-host address) sam-host)
91        (equal? (address-port address) sam-port)))
92
93 (define actor-table (make-hash-table))
94
95 (define (make-actor beh)
96   (let* ((address (make-local-address))
97          (id (address-id address)))
98     (hash-table-set! actor-table id beh)
99     address))
100
101 (define (deliver-message address . message)
102   (let ((id (address-id address)))
103     (log-trace "DELIVERING to " id ": " message)
104     (let loop ((beh (hash-table-ref/default actor-table id #f)))
105       (if beh
106           (condition-case
107               (match (apply (beh-proc beh) (cons address message))
108                 ('done (hash-table-delete! actor-table id))
109                 ('sleep 'do-nothing)
110                 ('pass
111                  (log-trace "Passing to parent behaviour...")
112                  (loop (beh-parent beh)))
113                 ((? procedure? new-beh) (hash-table-set! actor-table id new-beh))
114                 (else
115                  (log-msg "Warning: behaviour of actor " id " returned invalid value.")))
116             (o (exn)
117              (log-msg "Warning: actor " id " crashed evaluating message " message)
118              (print-error-message o)))
119           (log-msg "Warning: DISCARDING message to unknown actor " id ": " message)))))
120
121 ;; Scheduler
122
123 (define local-queue-mutex (make-mutex "message queue"))
124 (define message-available-mutex (make-mutex "message available"))
125 (mutex-lock! message-available-mutex #f #f)
126 (define local-queue (make-fifo))
127
128 (define (send-message address . message)
129   (log-trace "SENDING to " address ": " message)
130   (apply (if (address-local? address)
131              send-local-message
132              send-network-message)
133          (cons address message)))
134
135 (define (send-local-message address . message)
136   (mutex-lock! local-queue-mutex)
137   (fifo-push local-queue (cons address message))
138   (mutex-unlock! message-available-mutex)
139   (mutex-unlock! local-queue-mutex))
140
141 (define (send-network-message address . message)
142   (let ((s (udp-open-socket))
143         (packet (with-output-to-string
144                   (lambda ()
145                     (write (cons address message))))))
146     (udp-bind! s #f 0)
147     (udp-connect! s
148                   (address-host address)
149                   (address-port address))
150     (udp-send s packet)
151     (udp-close-socket s)))
152
153 (define (send-message-later address time . message)
154   (thread-start!
155    (lambda ()
156      (thread-sleep! time)
157      (apply send-message (cons address message)))))
158
159 (define (next-local-message)
160   (let ((res #f))
161     (mutex-lock! message-available-mutex #f #f)
162     (mutex-lock! local-queue-mutex)
163     (set! res (fifo-pop local-queue))
164     (if (not (fifo-empty? local-queue))
165         (mutex-unlock! message-available-mutex))
166     (mutex-unlock! local-queue-mutex)
167     res))
168
169 (define (start-scheduler)
170   (let loop ()
171     (apply deliver-message (next-local-message))
172     (loop)))
173
174
175 ;; Network
176
177 (define (start-network-listener)
178   (thread-start!
179    (lambda ()
180      (let ((s (udp-open-socket*)))
181        (udp-bind! s #f sam-port)
182        (let loop ()
183          (let-values (((n str) (udp-recv s 65536)))
184            (match (with-input-from-string str read)
185              ((address message ...)
186               (apply send-message (cons address message)))
187              (else
188               (log-msg "Warning: received badly formatted message string '" str "'"))))
189          (loop))))))
190
191 ;; System interface
192
193 (define reader-queue-mutex (make-mutex "reader queue"))
194 (define reader-available-mutex (make-mutex "reader available"))
195 (mutex-lock! reader-available-mutex #f #f)
196 (define reader-queue (make-fifo))
197
198 (define (next-reader)
199   (let ((res #f))
200     (mutex-lock! reader-available-mutex #f #f)
201     (mutex-lock! reader-queue-mutex)
202     (set! res (fifo-pop reader-queue))
203     (if (not (fifo-empty? reader-queue))
204         (mutex-unlock! reader-available-mutex))
205     (mutex-unlock! reader-queue-mutex)
206     res))
207
208 (define (start-console)
209   (thread-start!
210    (lambda ()
211      (let loop ()
212        (let ((reader (next-reader)))
213          (##sys#thread-block-for-i/o! (current-thread) 0 #t)
214          (thread-yield!)
215          (send-message reader (read-line)))
216        (loop)))))
217
218 ;; System initialization
219
220 (define system-beh
221   (make-beh (self)
222             (('shutdown) =>
223              (log-msg "System actor received shutdown message.")
224              (exit 0)
225              'done)
226
227             (('print strings ...) =>
228              (apply print strings)
229              'sleep)
230
231             (('read reader) =>
232              (mutex-lock! reader-queue-mutex)
233              (fifo-push reader-queue reader)
234              (mutex-unlock! reader-available-mutex)
235              (mutex-unlock! reader-queue-mutex)
236              'sleep)))
237
238 (define (boot-sam)
239   (start-console)
240   (start-network-listener)
241   (let ((system (make-actor system-beh))
242         (main #f))
243     (condition-case
244         (begin
245           (set! main (make-actor main-beh)))
246       ((exn)
247        (log-msg "Error starting main actor. Is main-beh defined?")
248        (exit 1)))
249     (send-message main system))
250   (start-scheduler))
251
252 (define (print-usage)
253   (print "Simple Actor Machine v" sam-version "\n"
254          "\n"
255          "Usage: sam -h|--help\n"
256          "       sam [-n hostname] [-p port] source-file-1 [source-file-2 [...]] "))
257
258 (let loop ((args (cdr (argv))))
259   (match args
260     (((or "-h" "--help"))
261      (print-usage))
262     (((or "-p" "--port") pstr rest ...)
263      (set! sam-port (string->number pstr))
264      (loop rest))
265     (((or "-n" "--hostname") hstr rest ...)
266      (set! sam-host hstr)
267      (loop rest))
268     (((or "-t" "--trace") rest ...)
269      (log-msg "Enabling trace debugging")
270      (set! trace #t)
271      (loop rest))
272     (((? file-exists? filename) rest ...)
273      (log-msg "Loading " filename)
274      (load filename)
275      (loop rest))
276     (()
277      (log-msg "Booting SAM\n")
278      (boot-sam))
279     (else
280      (print "Unrecognised argument '" (car args) "'.\n")
281      (print-usage))))
282