93906b958bd0472bfe4ff5583acdc7266ab8cca9
[sam.git] / sam.scm
1 ;; Simple Actor Machine
2 ;;
3 ;; A virtual machine which houses a population of actors which can
4 ;; communicate using messages with actors on the same machine or other
5 ;; machines via the network.
6
7 (import scheme
8         (chicken base)
9         (chicken io)
10         (chicken string)
11         (chicken port)
12         (chicken process-context)
13         (chicken file)
14         matchable
15         srfi-18 ; threads
16         srfi-69 ; hash-table
17         uuid ; ids for actors
18         uri-generic
19         udp
20         fifo)
21
22 ;; Global variables
23
24 (define trace #f)
25
26 (define sam-host "localhost")
27 (define sam-port 8000)
28
29 (define sam-version "0.1")
30
31 ;; Actors
32
33
34 (define (make-address host port id)
35   (list id host port))
36
37 (define (make-local-address . args)
38   (make-address sam-host
39                 sam-port
40                 (if (null? args)
41                     (uuid)
42                     (car args))))
43
44 (define (address-id address)
45   (car address))
46 (define (address-host address)
47   (cadr address))
48 (define (address-port address)
49   (caddr address))
50 (define (address->string address)
51   (uri->string
52    (make-uri #:scheme "actor"
53              #:host (address-host address)
54              #:port (address-port address)
55              #:path (list '/ (address-id address)))))
56 (define (string->address str)
57   (let ((uri (uri-reference str)))
58     (make-address (uri-host uri)
59                   (uri-port uri)
60                   (cadr (uri-path uri)))))
61
62 (define (address-local? address)
63   (and (equal? (address-host address) sam-host)
64        (equal? (address-port address) sam-port)))
65
66 (define actor-table (make-hash-table))
67
68 (define (make-actor beh)
69   (let* ((address (make-local-address))
70          (id (address-id address)))
71     (hash-table-set! actor-table id beh)
72     address))
73
74 (define (deliver-message address . message)
75   (if trace (print "Delivering to " address ": " message))
76   (let ((id (address-id address)))
77     (let ((behaviour (hash-table-ref/default actor-table id '())))
78       (if (null? behaviour)
79           (print "Warning: discarded message " message
80                  " to unknown actor id " id)
81           (match (apply (hash-table-ref actor-table id) (cons address message))
82             ('done (hash-table-delete! actor-table id))
83             ('sleep 'do-nothing)
84             (new-beh (hash-table-set! actor-table id new-beh)))))))
85
86 ;; Scheduler
87
88 (define local-queue-mutex (make-mutex "message queue"))
89 (define message-available-mutex (make-mutex "message available"))
90 (mutex-lock! message-available-mutex #f #f)
91 (define local-queue (make-fifo))
92
93 (define (send-message address . message)
94   (apply (if (address-local? address)
95              send-local-message
96              send-network-message)
97          (cons address message)))
98
99 (define (send-local-message address . message)
100   (mutex-lock! local-queue-mutex)
101   (fifo-push local-queue (cons address message))
102   (mutex-unlock! message-available-mutex)
103   (mutex-unlock! local-queue-mutex))
104
105 (define (send-network-message address . message)
106   (let ((s (udp-open-socket))
107         (packet (with-output-to-string
108                   (lambda ()
109                     (write (cons address message))))))
110     (udp-bind! s #f 0)
111     (udp-connect! s
112                   (address-host address)
113                   (address-port address))
114     (udp-send s packet)
115     (udp-close-socket s)))
116
117 (define (send-message-later address time . message)
118   (thread-start!
119    (lambda ()
120      (thread-sleep! time)
121      (apply send-message (cons address message)))))
122
123 (define (next-local-message)
124   (let ((res #f))
125     (mutex-lock! message-available-mutex #f #f)
126     (mutex-lock! local-queue-mutex)
127     (set! res (fifo-pop local-queue))
128     (if (not (fifo-empty? local-queue))
129         (mutex-unlock! message-available-mutex))
130     (mutex-unlock! local-queue-mutex)
131     res))
132
133 (define (start-scheduler)
134   (let loop ()
135     (apply deliver-message (next-local-message))
136     (loop)))
137
138
139 ;; Network
140
141 (define (start-network-listener)
142   (thread-start!
143    (lambda ()
144      (let ((s (udp-open-socket*)))
145        (udp-bind! s #f sam-port)
146        (let loop ()
147          (let-values (((n str) (udp-recv s 65536)))
148            (match (with-input-from-string str read)
149              ((address message ...)
150               (apply send-message (cons address message)))
151              (else
152               (print "Warning: received badly formatted message string '" str "'"))))
153          (loop))))))
154
155 ;; System interface
156
157 (define reader-queue-mutex (make-mutex "reader queue"))
158 (define reader-available-mutex (make-mutex "reader available"))
159 (mutex-lock! reader-available-mutex #f #f)
160 (define reader-queue (make-fifo))
161
162 (define (next-reader)
163   (let ((res #f))
164     (mutex-lock! reader-available-mutex #f #f)
165     (mutex-lock! reader-queue-mutex)
166     (set! res (fifo-pop reader-queue))
167     (if (not (fifo-empty? reader-queue))
168         (mutex-unlock! reader-available-mutex))
169     (mutex-unlock! reader-queue-mutex)
170     res))
171
172 (define (start-console)
173   (thread-start!
174    (lambda ()
175      (let loop ()
176        (let ((reader (next-reader)))
177          (##sys#thread-block-for-i/o! (current-thread) 0 #t)
178          (thread-yield!)
179          (send-message reader (read-line)))
180        (loop)))))
181
182 ;; System initialization
183
184 (define (system-beh self . message)
185   (match message
186
187     (('shutdown)
188      (print "## System actor received shutdown message.")
189      (exit 0)
190      'done)
191
192     (('print strings ...)
193      (apply print strings)
194      'sleep)
195
196     (('read reader)
197      (mutex-lock! reader-queue-mutex)
198      (fifo-push reader-queue reader)
199      (mutex-unlock! reader-available-mutex)
200      (mutex-unlock! reader-queue-mutex)
201      'sleep)))
202
203 (define (boot-sam)
204   (start-console)
205   (start-network-listener)
206   (send-message (make-actor main-beh) (make-actor system-beh))
207   (start-scheduler))
208
209 (define (print-usage)
210   (print "Simple Actor Machine v" sam-version "\n"
211          "\n"
212          "Usage: sam -h|--help\n"
213          "       sam [-n hostname] [-p port] source-file-1 [source-file-2 [...]] "))
214
215
216 (let loop ((args (cdr (argv))))
217   (match args
218     (((or "-h" "--help"))
219      (print-usage))
220     (((or "-p" "--port") pstr rest ...)
221      (set! sam-port (string->number pstr))
222      (loop rest))
223     (((or "-n" "--hostname") hstr rest ...)
224      (set! sam-host hstr)
225      (loop rest)
226     (((? file-exists? filename) rest ...))
227      (print* "Loading " filename "...")
228      (load filename)
229      (print " done.")
230      (loop rest))
231     (()
232      (boot-sam host port main-beh))
233     (else
234      (print "Unrecognised argument '" (car args) "'.\n")
235      (print-usage))))
236