Minor fix to send-network-message.
[sam.git] / sam.scm
1 ;; Simple Actor Machine
2 ;;
3 ;; Houses a population of actors which can communicate using messages
4 ;; with actors on the same machine or other machines via the network.
5
6 (import (chicken io)
7         (chicken string)
8         (chicken port)
9         matchable
10         srfi-18 ; threads
11         srfi-69 ; hash-table
12         uuid ; ids for actors
13         uri-generic
14         udp
15         fifo)
16
17 ;; Actors
18
19 (define this-host "localhost")
20 (define this-port 8000)
21
22 (define (make-address host port id)
23   (make-uri #:scheme "actor"
24             #:host host
25             #:port port
26             #:path (list '/ id)))
27
28 (define address->string uri->string)
29 (define string->address uri-reference)
30
31 (define (make-local-address . args)
32   (make-address this-host
33                 this-port
34                 (if (null? args)
35                     (uuid)
36                     (car args))))
37                 
38 (define (address-id address) (cadr (uri-path address)))
39
40 (define (address-local? address)
41   (and (equal? (uri-host address) this-host)
42        (equal? (uri-port address) this-port)))
43
44 (define actor-table (make-hash-table))
45
46 (define (make-actor beh)
47   (let* ((address (make-local-address))
48          (id (address-id address)))
49     (hash-table-set! actor-table id beh)
50     address))
51   
52 (define (deliver-message address . message)
53   (let ((id (address-id address)))
54     (let ((behaviour (hash-table-ref/default actor-table id '())))
55       (if (null? behaviour)
56           (print "Warning: discarded message" message " to unknown actor " address)
57           (match (apply (hash-table-ref actor-table id) (cons address message))
58             ('done (hash-table-delete! actor-table id))
59             ('sleep 'do-nothing)
60             (new-beh (hash-table-set! actor-table id new-beh)))))))
61
62 ;; Scheduler
63
64 (define local-queue-mutex (make-mutex "message queue"))
65 (define message-available-mutex (make-mutex "message available"))
66 (mutex-lock! message-available-mutex #f #f)
67 (define local-queue (make-fifo))
68
69 (define (send-message address . message)
70   (apply (if (address-local? address)
71              send-local-message
72              send-network-message)
73          (cons address message)))
74
75 (define (send-local-message address . message)
76   (mutex-lock! local-queue-mutex)
77   (fifo-push local-queue (cons address message))
78   (mutex-unlock! message-available-mutex)
79   (mutex-unlock! local-queue-mutex))
80
81 (define (send-network-message address . message)
82   (let ((s (udp-open-socket))
83         (machine (address-machine address))
84         (packet (with-output-to-string)
85                (lambda ()
86                  (print (cons (address->string address) message)))))
87     (udp-bind! s #f 0)
88     (udp-connect! s
89                   (machine-host machine)
90                   (machine-port machine))
91     (udp-send s packet)
92     (udp-close-socket s)))
93
94 (define (next-local-message)
95   (let ((res #f))
96     (mutex-lock! message-available-mutex #f #f)
97     (mutex-lock! local-queue-mutex)
98     (set! res (fifo-pop local-queue))
99     (if (not (fifo-empty? local-queue))
100       (mutex-unlock! message-available-mutex))
101     (mutex-unlock! local-queue-mutex)
102     res))
103
104 (define (start-scheduler)
105   (thread-start!
106    (lambda ()
107      (let loop ()
108        (apply deliver-message (next-local-message))
109        (loop)))))
110
111 ;; Console
112
113 (define reader-queue-mutex (make-mutex "reader queue"))
114 (define reader-available-mutex (make-mutex "reader available"))
115 (mutex-lock! reader-available-mutex #f #f)
116 (define reader-queue (make-fifo))
117
118 (define console
119   (make-actor (lambda (self . message)
120                 (mutex-lock! reader-queue-mutex)
121                 (fifo-push reader-queue (car message))
122                 (mutex-unlock! reader-available-mutex)
123                 (mutex-unlock! reader-queue-mutex)
124                 'sleep)))
125
126 (define (next-reader)
127   (let ((res #f))
128     (mutex-lock! reader-available-mutex #f #f)
129     (mutex-lock! reader-queue-mutex)
130     (set! res (fifo-pop reader-queue))
131     (if (not (fifo-empty? reader-queue))
132         (mutex-unlock! reader-available-mutex))
133     (mutex-unlock! reader-queue-mutex)
134     res))
135
136 (define (start-console)
137   (let loop ()
138     (let ((reader (next-reader)))
139       (##sys#thread-block-for-i/o! (current-thread) 0 #t)
140       (thread-yield!)
141       (send-message reader (read-line)))
142     (loop)))
143
144
145 ;; Network
146
147 (define (start-network-listener)
148   (thread-start!
149    (lambda ()
150      (let ((s (udp-open-socket*)))
151        (udp-bind! s #f this-port)
152        (let loop ()
153          (let-values (((n str) (udp-recv s 1024)))
154            (print "Received " n " bytes over network: " str)
155            (match (with-input-from-string str read)
156              ((addr-str message ...)
157               (apply send-message (cons (string->address addr-str) message)))
158              (else
159               (print "Warning: received badly formatted message string '" str "'"))))
160          (loop))))))
161
162 ;; System interface
163
164 (define system
165   (make-actor (lambda (self . message)
166                 (match message
167                   (('shutdown)
168                    (print "## System actor received shutdown message.")
169                    (exit 0)
170                    'done)
171                   (('println strings ...)
172                    (apply print strings)
173                    'sleep)))))
174
175 ;; Testing
176
177
178 (send-message system 'println "Hello, what is your name?")
179 (send-message console
180               (make-actor (lambda (self . message)
181                             (match message
182                               ((name)
183                                (send-message system 'println "Hello, " name "!")
184                                'done)))))
185
186 (thread-start!
187  (lambda ()
188    (thread-sleep! 120)
189    (send-message system 'shutdown)))
190
191 (print (uri->string system))
192
193 (start-scheduler)
194 (start-network-listener)
195 (start-console)
196
197 (define (boot-sam host port)
198   (start-scheduler))
199
200 ;; (thread-join! scheduler-thread)
201
202 (define (main)
203   (let loop ((args (cdr (argv)))
204              (host "localhost")
205              (port 8000))
206     (match args
207       ((or "-h" "--help")
208        (print-usage))
209       (((or "-p" "--port") pstr rest ...)
210        (loop rest host (string->number pstr)))
211       (("--hostname" hstr rest ...)
212        (loop rest hstr port))
213       (()
214        (boot-sam host port)))))