Updated examples to be compatible with new architecture.
[sam.git] / sam.scm
1 ;; Simple Actor Machine
2 ;;
3 ;; A virtual machine which houses a population of actors which can
4 ;; communicate using messages with actors on the same machine or other
5 ;; machines via the network.
6
7 (import scheme
8         (chicken base)
9         (chicken io)
10         (chicken string)
11         (chicken port)
12         (chicken process-context)
13         (chicken file)
14         (chicken condition)
15         matchable
16         srfi-18 ; threads
17         srfi-69 ; hash-table
18         uuid ; ids for actors
19         uri-generic
20         udp
21         fifo)
22
23 ;; Global variables
24
25 (define trace #f)
26
27 (define sam-host "localhost")
28 (define sam-port 8000)
29
30 (define sam-version "0.1")
31
32 ;; Actors
33
34
35 (define (make-address host port id)
36   (list id host port))
37
38 (define (make-local-address . args)
39   (make-address sam-host
40                 sam-port
41                 (if (null? args)
42                     (uuid)
43                     (car args))))
44
45 (define (address-id address)
46   (car address))
47 (define (address-host address)
48   (cadr address))
49 (define (address-port address)
50   (caddr address))
51 (define (address->string address)
52   (uri->string
53    (make-uri #:scheme "actor"
54              #:host (address-host address)
55              #:port (address-port address)
56              #:path (list '/ (address-id address)))))
57 (define (string->address str)
58   (let ((uri (uri-reference str)))
59     (make-address (uri-host uri)
60                   (uri-port uri)
61                   (cadr (uri-path uri)))))
62
63 (define (address-local? address)
64   (and (equal? (address-host address) sam-host)
65        (equal? (address-port address) sam-port)))
66
67 (define actor-table (make-hash-table))
68
69 (define (make-actor beh)
70   (let* ((address (make-local-address))
71          (id (address-id address)))
72     (hash-table-set! actor-table id beh)
73     address))
74
75 (define (deliver-message address . message)
76   (if trace (print "Delivering to " address ": " message))
77   (let ((id (address-id address)))
78     (let ((behaviour (hash-table-ref/default actor-table id '())))
79       (if (null? behaviour)
80           (print "Warning: discarded message " message
81                  " to unknown actor id " id)
82           (match (apply (hash-table-ref actor-table id) (cons address message))
83             ('done (hash-table-delete! actor-table id))
84             ('sleep 'do-nothing)
85             (new-beh (hash-table-set! actor-table id new-beh)))))))
86
87 ;; Scheduler
88
89 (define local-queue-mutex (make-mutex "message queue"))
90 (define message-available-mutex (make-mutex "message available"))
91 (mutex-lock! message-available-mutex #f #f)
92 (define local-queue (make-fifo))
93
94 (define (send-message address . message)
95   (apply (if (address-local? address)
96              send-local-message
97              send-network-message)
98          (cons address message)))
99
100 (define (send-local-message address . message)
101   (mutex-lock! local-queue-mutex)
102   (fifo-push local-queue (cons address message))
103   (mutex-unlock! message-available-mutex)
104   (mutex-unlock! local-queue-mutex))
105
106 (define (send-network-message address . message)
107   (let ((s (udp-open-socket))
108         (packet (with-output-to-string
109                   (lambda ()
110                     (write (cons address message))))))
111     (udp-bind! s #f 0)
112     (udp-connect! s
113                   (address-host address)
114                   (address-port address))
115     (udp-send s packet)
116     (udp-close-socket s)))
117
118 (define (send-message-later address time . message)
119   (thread-start!
120    (lambda ()
121      (thread-sleep! time)
122      (apply send-message (cons address message)))))
123
124 (define (next-local-message)
125   (let ((res #f))
126     (mutex-lock! message-available-mutex #f #f)
127     (mutex-lock! local-queue-mutex)
128     (set! res (fifo-pop local-queue))
129     (if (not (fifo-empty? local-queue))
130         (mutex-unlock! message-available-mutex))
131     (mutex-unlock! local-queue-mutex)
132     res))
133
134 (define (start-scheduler)
135   (let loop ()
136     (apply deliver-message (next-local-message))
137     (loop)))
138
139
140 ;; Network
141
142 (define (start-network-listener)
143   (thread-start!
144    (lambda ()
145      (let ((s (udp-open-socket*)))
146        (udp-bind! s #f sam-port)
147        (let loop ()
148          (let-values (((n str) (udp-recv s 65536)))
149            (match (with-input-from-string str read)
150              ((address message ...)
151               (apply send-message (cons address message)))
152              (else
153               (print "Warning: received badly formatted message string '" str "'"))))
154          (loop))))))
155
156 ;; System interface
157
158 (define reader-queue-mutex (make-mutex "reader queue"))
159 (define reader-available-mutex (make-mutex "reader available"))
160 (mutex-lock! reader-available-mutex #f #f)
161 (define reader-queue (make-fifo))
162
163 (define (next-reader)
164   (let ((res #f))
165     (mutex-lock! reader-available-mutex #f #f)
166     (mutex-lock! reader-queue-mutex)
167     (set! res (fifo-pop reader-queue))
168     (if (not (fifo-empty? reader-queue))
169         (mutex-unlock! reader-available-mutex))
170     (mutex-unlock! reader-queue-mutex)
171     res))
172
173 (define (start-console)
174   (thread-start!
175    (lambda ()
176      (let loop ()
177        (let ((reader (next-reader)))
178          (##sys#thread-block-for-i/o! (current-thread) 0 #t)
179          (thread-yield!)
180          (send-message reader (read-line)))
181        (loop)))))
182
183 ;; System initialization
184
185 (define (system-beh self . message)
186   (match message
187
188     (('shutdown)
189      (print "## System actor received shutdown message.")
190      (exit 0)
191      'done)
192
193     (('print strings ...)
194      (apply print strings)
195      'sleep)
196
197     (('read reader)
198      (mutex-lock! reader-queue-mutex)
199      (fifo-push reader-queue reader)
200      (mutex-unlock! reader-available-mutex)
201      (mutex-unlock! reader-queue-mutex)
202      'sleep)))
203
204 (define (boot-sam)
205   (start-console)
206   (start-network-listener)
207   (let ((system (make-actor system-beh))
208         (main #f))
209     (condition-case
210         (begin
211           (set! main (make-actor main-beh)))
212       ((exn)
213        (print "Error starting main actor. Is main-beh defined?")
214        (exit 1)))
215     (send-message main system))
216   (start-scheduler))
217
218 (define (print-usage)
219   (print "Simple Actor Machine v" sam-version "\n"
220          "\n"
221          "Usage: sam -h|--help\n"
222          "       sam [-n hostname] [-p port] source-file-1 [source-file-2 [...]] "))
223
224
225 (let loop ((args (cdr (argv))))
226   (match args
227     (((or "-h" "--help"))
228      (print-usage))
229     (((or "-p" "--port") pstr rest ...)
230      (set! sam-port (string->number pstr))
231      (loop rest))
232     (((or "-n" "--hostname") hstr rest ...)
233      (set! sam-host hstr)
234      (loop rest))
235     (((? file-exists? filename) rest ...)
236      (print* "Loading " filename "...")
237      (load filename)
238      (print " done.")
239      (loop rest))
240     (()
241      (boot-sam))
242     (else
243      (print "Unrecognised argument '" (car args) "'.\n")
244      (print-usage))))
245