Reorganized boot.
[sam.git] / sam.scm
1 ;; Simple Actor Machine
2 ;;
3 ;; Houses a population of actors which can communicate using messages
4 ;; with actors on the same machine or other machines via the network.
5
6 (module sam
7     (boot-sam
8      make-actor
9      send-message
10      send-message-later)
11
12   (import scheme
13           (chicken base)
14           (chicken io)
15           (chicken string)
16           (chicken port)
17           (chicken process-context)
18           matchable
19           srfi-18 ; threads
20           srfi-69 ; hash-table
21           uuid ; ids for actors
22           uri-generic
23           udp
24           fifo)
25
26   (define trace #f)
27
28   ;; Actors
29
30   (define sam-host "localhost")
31   (define sam-port 8000)
32
33   (define (make-address host port id)
34     (uri->string
35      (make-uri #:scheme "actor"
36                #:host host
37                #:port port
38                #:path (list '/ id))))
39
40   (define (make-local-address . args)
41     (make-address sam-host
42                   sam-port
43                   (if (null? args)
44                       (uuid)
45                       (car args))))
46   
47   (define (address-id address)
48     (cadr (uri-path (uri-reference address))))
49
50   (define address->uri uri-reference)
51
52   (define (address-local? address)
53     (let ((uri (address->uri address)))
54       (and (equal? (uri-host uri) sam-host)
55            (equal? (uri-port uri) sam-port))))
56
57   (define actor-table (make-hash-table))
58
59   (define (make-actor beh)
60     (let* ((address (make-local-address))
61            (id (address-id address)))
62       (hash-table-set! actor-table id beh)
63       address))
64   
65   (define (deliver-message address . message)
66     (if trace (print "Delivering to " address ": " message))
67     (let ((id (address-id address)))
68       (let ((behaviour (hash-table-ref/default actor-table id '())))
69         (if (null? behaviour)
70             (print "Warning: discarded message" message " to unknown actor " address)
71             (match (apply (hash-table-ref actor-table id) (cons address message))
72               ('done (hash-table-delete! actor-table id))
73               ('sleep 'do-nothing)
74               (new-beh (hash-table-set! actor-table id new-beh)))))))
75
76   ;; Scheduler
77
78   (define local-queue-mutex (make-mutex "message queue"))
79   (define message-available-mutex (make-mutex "message available"))
80   (mutex-lock! message-available-mutex #f #f)
81   (define local-queue (make-fifo))
82
83   (define (send-message address . message)
84     (apply (if (address-local? address)
85                send-local-message
86                send-network-message)
87            (cons address message)))
88
89   (define (send-local-message address . message)
90     (mutex-lock! local-queue-mutex)
91     (fifo-push local-queue (cons address message))
92     (mutex-unlock! message-available-mutex)
93     (mutex-unlock! local-queue-mutex))
94
95   (define (send-network-message address . message)
96     (let ((s (udp-open-socket))
97           (uri (address->uri address))
98           (packet (with-output-to-string
99                     (lambda ()
100                       (write (cons address message))))))
101       (udp-bind! s #f 0)
102       (udp-connect! s
103                     (uri-host uri)
104                     (uri-port uri))
105       (udp-send s packet)
106       (udp-close-socket s)))
107
108   (define (send-message-later address time . message)
109     (thread-start!
110      (lambda ()
111        (thread-sleep! time)
112        (apply send-message (cons address message)))))
113
114   (define (next-local-message)
115     (let ((res #f))
116       (mutex-lock! message-available-mutex #f #f)
117       (mutex-lock! local-queue-mutex)
118       (set! res (fifo-pop local-queue))
119       (if (not (fifo-empty? local-queue))
120           (mutex-unlock! message-available-mutex))
121       (mutex-unlock! local-queue-mutex)
122       res))
123
124   (define (start-scheduler)
125     (let loop ()
126       (apply deliver-message (next-local-message))
127       (loop)))
128
129
130   ;; Network
131
132   (define (start-network-listener)
133     (thread-start!
134      (lambda ()
135        (let ((s (udp-open-socket*)))
136          (udp-bind! s #f sam-port)
137          (let loop ()
138            (let-values (((n str) (udp-recv s 1024)))
139              (match (with-input-from-string str read)
140                ((address message ...)
141                 (apply send-message (cons address message)))
142                (else
143                 (print "Warning: received badly formatted message string '" str "'"))))
144            (loop))))))
145
146   ;; System interface
147
148   (define reader-queue-mutex (make-mutex "reader queue"))
149   (define reader-available-mutex (make-mutex "reader available"))
150   (mutex-lock! reader-available-mutex #f #f)
151   (define reader-queue (make-fifo))
152
153   (define (next-reader)
154     (let ((res #f))
155       (mutex-lock! reader-available-mutex #f #f)
156       (mutex-lock! reader-queue-mutex)
157       (set! res (fifo-pop reader-queue))
158       (if (not (fifo-empty? reader-queue))
159           (mutex-unlock! reader-available-mutex))
160       (mutex-unlock! reader-queue-mutex)
161       res))
162
163   (define (start-console)
164     (thread-start!
165      (lambda ()
166        (let loop ()
167          (let ((reader (next-reader)))
168            (##sys#thread-block-for-i/o! (current-thread) 0 #t)
169            (thread-yield!)
170            (send-message reader (read-line)))
171          (loop)))))
172
173   ;; System initialization
174
175   (define (system-beh self . message)
176     (match message
177
178       (('shutdown)
179        (print "## System actor received shutdown message.")
180        (exit 0)
181        'done)
182
183       (('print strings ...)
184        (apply print strings)
185        'sleep)
186
187       (('read reader)
188        (mutex-lock! reader-queue-mutex)
189        (fifo-push reader-queue reader)
190        (mutex-unlock! reader-available-mutex)
191        (mutex-unlock! reader-queue-mutex)
192        'sleep)))
193
194   (define (boot-sam host port main-beh)
195     (set! sam-host host)
196     (set! sam-port port)
197     (start-console)
198     (start-network-listener)
199     (send-message (make-actor main-beh) (make-actor system-beh))
200     (start-scheduler)))