c3dcbe6bb833d31dab70cc19cd9c0cb6ce12d3e2
[sam.git] / sam.scm
1 ;; Simple Actor Machine
2 ;;
3 ;; Houses a population of actors which can communicate using messages
4 ;; with actors on the same machine or other machines via the network.
5
6 (module sam
7     (init-sam
8      start-console
9      send-message
10      make-actor
11      system)
12
13   (import scheme
14           (chicken base)
15           (chicken io)
16           (chicken string)
17           (chicken port)
18           (chicken process-context)
19           matchable
20           srfi-18 ; threads
21           srfi-69 ; hash-table
22           uuid ; ids for actors
23           uri-generic
24           udp
25           fifo)
26
27   ;; Actors
28
29   (define sam-host "localhost")
30   (define sam-port 8000)
31
32   (define (make-address host port id)
33     (uri->string
34      (make-uri #:scheme "actor"
35                #:host host
36                #:port port
37                #:path (list '/ id))))
38
39   (define (make-local-address . args)
40     (make-address sam-host
41                   sam-port
42                   (if (null? args)
43                       (uuid)
44                       (car args))))
45   
46   (define (address-id address)
47     (cadr (uri-path (uri-reference address))))
48
49   (define address->uri uri-reference)
50
51   (define (address-local? address)
52     (let ((uri (address->uri address)))
53       (and (equal? (uri-host uri) sam-host)
54            (equal? (uri-port uri) sam-port))))
55
56   (define actor-table (make-hash-table))
57
58   (define (make-actor beh)
59     (let* ((address (make-local-address))
60            (id (address-id address)))
61       (hash-table-set! actor-table id beh)
62       address))
63   
64   (define (deliver-message address . message)
65     (let ((id (address-id address)))
66       (let ((behaviour (hash-table-ref/default actor-table id '())))
67         (if (null? behaviour)
68             (print "Warning: discarded message" message " to unknown actor " address)
69             (match (apply (hash-table-ref actor-table id) (cons address message))
70               ('done (hash-table-delete! actor-table id))
71               ('sleep 'do-nothing)
72               (new-beh (hash-table-set! actor-table id new-beh)))))))
73
74   ;; Scheduler
75
76   (define local-queue-mutex (make-mutex "message queue"))
77   (define message-available-mutex (make-mutex "message available"))
78   (mutex-lock! message-available-mutex #f #f)
79   (define local-queue (make-fifo))
80
81   (define (send-message address . message)
82     (print "send-message: Sending " message " to " address)
83     (apply (if (address-local? address)
84                send-local-message
85                send-network-message)
86            (cons address message)))
87
88   (define (send-local-message address . message)
89     (mutex-lock! local-queue-mutex)
90     (fifo-push local-queue (cons address message))
91     (mutex-unlock! message-available-mutex)
92     (mutex-unlock! local-queue-mutex))
93
94   (define (send-network-message address . message)
95     (let ((s (udp-open-socket))
96           (uri (address->uri address))
97           (packet (with-output-to-string
98                     (lambda ()
99                       (print (cons address message))))))
100       (udp-bind! s #f 0)
101       (udp-connect! s
102                     (uri-host uri)
103                     (uri-port uri))
104       (udp-send s packet)
105       (udp-close-socket s)))
106
107   (define (next-local-message)
108     (let ((res #f))
109       (mutex-lock! message-available-mutex #f #f)
110       (mutex-lock! local-queue-mutex)
111       (set! res (fifo-pop local-queue))
112       (if (not (fifo-empty? local-queue))
113           (mutex-unlock! message-available-mutex))
114       (mutex-unlock! local-queue-mutex)
115       res))
116
117   (define (start-scheduler)
118     (thread-start!
119      (lambda ()
120        (let loop ()
121          (apply deliver-message (next-local-message))
122          (loop)))))
123
124
125   ;; Network
126
127   (define (start-network-listener)
128     (thread-start!
129      (lambda ()
130        (let ((s (udp-open-socket*)))
131          (udp-bind! s #f sam-port)
132          (let loop ()
133            (let-values (((n str) (udp-recv s 1024)))
134              (print "network-listener: Received " n " bytes over network: " str)
135              (match (with-input-from-string str read)
136                ((address message ...)
137                 (apply send-message (cons address message)))
138                (else
139                 (print "Warning: received badly formatted message string '" str "'"))))
140            (loop))))))
141
142   ;; System interface
143
144   (define reader-queue-mutex (make-mutex "reader queue"))
145   (define reader-available-mutex (make-mutex "reader available"))
146   (mutex-lock! reader-available-mutex #f #f)
147   (define reader-queue (make-fifo))
148
149   (define (next-reader)
150     (let ((res #f))
151       (mutex-lock! reader-available-mutex #f #f)
152       (mutex-lock! reader-queue-mutex)
153       (set! res (fifo-pop reader-queue))
154       (if (not (fifo-empty? reader-queue))
155           (mutex-unlock! reader-available-mutex))
156       (mutex-unlock! reader-queue-mutex)
157       res))
158
159   (define (start-console)
160     (let loop ()
161       (let ((reader (next-reader)))
162         (print "console: received next reader: " reader)
163         (##sys#thread-block-for-i/o! (current-thread) 0 #t)
164         (thread-yield!)
165         (send-message reader (read-line)))
166       (loop)))
167
168   ;; System initialization
169
170   (define (make-system-actor)
171     (make-actor (lambda (self . message)
172                   (match message
173
174                     (('shutdown)
175                      (print "## System actor received shutdown message.")
176                      (exit 0)
177                      'done)
178
179                     (('print strings ...)
180                      (apply print strings)
181                      'sleep)
182
183                     (('read reader)
184                      (mutex-lock! reader-queue-mutex)
185                      (fifo-push reader-queue reader)
186                      (mutex-unlock! reader-available-mutex)
187                      (mutex-unlock! reader-queue-mutex)
188                      'sleep)))))
189
190   (define system #f)
191
192   (define (init-sam host port)
193     (set! sam-host host)
194     (set! sam-port port)
195     (set! system (make-system-actor))
196     (start-scheduler)
197     (start-network-listener)))