1 ;; Simple Actor Machine
3 ;; Houses a population of actors which can communicate using messages
4 ;; with actors on the same machine or other machines via the network.
18 (chicken process-context)
29 (define sam-host "localhost")
30 (define sam-port 8000)
32 (define (make-address host port id)
34 (make-uri #:scheme "actor"
37 #:path (list '/ id))))
39 (define (make-local-address . args)
40 (make-address sam-host
46 (define (address-id address)
47 (cadr (uri-path (uri-reference address))))
49 (define address->uri uri-reference)
51 (define (address-local? address)
52 (let ((uri (address->uri address)))
53 (and (equal? (uri-host uri) sam-host)
54 (equal? (uri-port uri) sam-port))))
56 (define actor-table (make-hash-table))
58 (define (make-actor beh)
59 (let* ((address (make-local-address))
60 (id (address-id address)))
61 (hash-table-set! actor-table id beh)
64 (define (deliver-message address . message)
65 (let ((id (address-id address)))
66 (let ((behaviour (hash-table-ref/default actor-table id '())))
68 (print "Warning: discarded message" message " to unknown actor " address)
69 (match (apply (hash-table-ref actor-table id) (cons address message))
70 ('done (hash-table-delete! actor-table id))
72 (new-beh (hash-table-set! actor-table id new-beh)))))))
76 (define local-queue-mutex (make-mutex "message queue"))
77 (define message-available-mutex (make-mutex "message available"))
78 (mutex-lock! message-available-mutex #f #f)
79 (define local-queue (make-fifo))
81 (define (send-message address . message)
82 (apply (if (address-local? address)
85 (cons address message)))
87 (define (send-local-message address . message)
88 (mutex-lock! local-queue-mutex)
89 (fifo-push local-queue (cons address message))
90 (mutex-unlock! message-available-mutex)
91 (mutex-unlock! local-queue-mutex))
93 (define (send-network-message address . message)
94 (let ((s (udp-open-socket))
95 (uri (address->uri address))
96 (packet (with-output-to-string
98 (write (cons address message))))))
104 (udp-close-socket s)))
106 (define (next-local-message)
108 (mutex-lock! message-available-mutex #f #f)
109 (mutex-lock! local-queue-mutex)
110 (set! res (fifo-pop local-queue))
111 (if (not (fifo-empty? local-queue))
112 (mutex-unlock! message-available-mutex))
113 (mutex-unlock! local-queue-mutex)
116 (define (start-scheduler)
120 (apply deliver-message (next-local-message))
126 (define (start-network-listener)
129 (let ((s (udp-open-socket*)))
130 (udp-bind! s #f sam-port)
132 (let-values (((n str) (udp-recv s 1024)))
133 (match (with-input-from-string str read)
134 ((address message ...)
135 (apply send-message (cons address message)))
137 (print "Warning: received badly formatted message string '" str "'"))))
142 (define reader-queue-mutex (make-mutex "reader queue"))
143 (define reader-available-mutex (make-mutex "reader available"))
144 (mutex-lock! reader-available-mutex #f #f)
145 (define reader-queue (make-fifo))
147 (define (next-reader)
149 (mutex-lock! reader-available-mutex #f #f)
150 (mutex-lock! reader-queue-mutex)
151 (set! res (fifo-pop reader-queue))
152 (if (not (fifo-empty? reader-queue))
153 (mutex-unlock! reader-available-mutex))
154 (mutex-unlock! reader-queue-mutex)
157 (define (start-console)
159 (let ((reader (next-reader)))
160 (##sys#thread-block-for-i/o! (current-thread) 0 #t)
162 (send-message reader (read-line)))
165 ;; System initialization
167 (define (make-system-actor)
168 (make-actor (lambda (self . message)
172 (print "## System actor received shutdown message.")
176 (('print strings ...)
177 (apply print strings)
181 (mutex-lock! reader-queue-mutex)
182 (fifo-push reader-queue reader)
183 (mutex-unlock! reader-available-mutex)
184 (mutex-unlock! reader-queue-mutex)
189 (define (init-sam host port)
192 (set! system (make-system-actor))
194 (start-network-listener)))