1 ;; Simple Actor Machine
3 ;; Houses a population of actors which can communicate using messages
4 ;; with actors on the same machine or other machines via the network.
18 (chicken process-context)
;; Host name and port identifying this machine. address-local? compares an
;; address's host and port against these two values, and the network
;; listener binds its UDP socket to sam-port.
29 (define sam-host "localhost")
30 (define sam-port 8000)
;; Build an actor address from a host, a port and an actor id.
;; The visible part calls make-uri with scheme "actor" and a path of the
;; form (/ id).
;; NOTE(review): the embedded numbering skips lines 33 and 35-36, so how
;; `host` and `port` are passed on, and what wraps the make-uri call, is not
;; visible here; the parentheses do not balance on the visible lines alone.
32 (define (make-address host port id)
34 (make-uri #:scheme "actor"
37 #:path (list '/ id))))
;; Build an address on this machine: calls make-address with sam-host as the
;; host. Takes an optional argument list `args`.
;; NOTE(review): the remaining arguments to make-address (port and id) and
;; the use of `args` are on lines 41-44, which are not visible here.
39 (define (make-local-address . args)
40 (make-address sam-host
;; Extract the actor id from an address.
;; The address is parsed with uri-reference and the id is taken as the
;; second element of the URI path (make-address builds the path as (/ id)).
(define (address-id address)
  (let* ((parsed (uri-reference address))
         (segments (uri-path parsed)))
    (cadr segments)))
;; Convert an address to a URI object; this is simply another name for
;; uri-reference.
49 (define address->uri uri-reference)
;; Return #t when `address` refers to this machine, i.e. when both its host
;; and its port equal sam-host and sam-port; #f otherwise.
;; The port is only inspected once the host has matched.
(define (address-local? address)
  (let ((parsed (address->uri address)))
    (if (equal? (uri-host parsed) sam-host)
        (equal? (uri-port parsed) sam-port)
        #f)))
;; Table of the actors living on this machine: maps an actor id to the
;; actor's current behaviour procedure.
56 (define actor-table (make-hash-table))
;; Create a new actor with behaviour `beh`.
;; A fresh local address is generated, its id is extracted, and `beh` is
;; stored in actor-table under that id.
;; NOTE(review): the final line of the body (line 62) is not visible here;
;; presumably it returns the new address -- confirm.
58 (define (make-actor beh)
59 (let* ((address (make-local-address))
60 (id (address-id address)))
61 (hash-table-set! actor-table id beh)
;; Deliver `message` to the local actor identified by `address`.
;; The actor's behaviour is looked up in actor-table by id, with '() as the
;; default when no such actor exists. The visible branches are:
;;   - a warning printed when the message goes to an unknown actor;
;;   - otherwise the behaviour is applied to (address . message) and its
;;     result is matched: 'done removes the actor from the table, and any
;;     other value reaching the last clause becomes the actor's new
;;     behaviour.
;; Note that the behaviour is fetched from the table a second time for the
;; call instead of reusing the `behaviour` binding.
;; NOTE(review): lines 67 and 71 are not visible here -- the test selecting
;; the warning branch (presumably a null check on `behaviour`) and one match
;; clause between 'done and new-beh are missing.
64 (define (deliver-message address . message)
65 (let ((id (address-id address)))
66 (let ((behaviour (hash-table-ref/default actor-table id '())))
68 (print "Warning: discarded message" message " to unknown actor " address)
69 (match (apply (hash-table-ref actor-table id) (cons address message))
70 ('done (hash-table-delete! actor-table id))
72 (new-beh (hash-table-set! actor-table id new-beh)))))))
;; Queue of messages addressed to local actors, plus its synchronisation.
;; local-queue-mutex guards access to local-queue.
;; message-available-mutex is used as a signal rather than as a lock: it is
;; locked here with no timeout and no owning thread (the two #f arguments),
;; unlocked by send-local-message after a push, and locked again by
;; next-local-message, which therefore waits until a message has been queued.
76 (define local-queue-mutex (make-mutex "message queue"))
77 (define message-available-mutex (make-mutex "message available"))
78 (mutex-lock! message-available-mutex #f #f)
79 (define local-queue (make-fifo))
;; Send `message` to the actor at `address`.
;; Logs the send, then applies a sending procedure -- chosen according to
;; (address-local? address) -- to (address . message).
;; NOTE(review): the two branches of the `if` (lines 84-85) are not visible
;; here; presumably send-local-message and send-network-message -- confirm.
81 (define (send-message address . message)
82 (print "send-message: Sending " message " to " address)
83 (apply (if (address-local? address)
86 (cons address message)))
;; Queue a message for an actor on this machine.
;; The entry (address . message) is appended to local-queue while holding
;; local-queue-mutex; message-available-mutex is unlocked before the queue
;; lock is released, signalling that a message is waiting.
(define (send-local-message address . message)
  (let ((envelope (cons address message)))
    (mutex-lock! local-queue-mutex)
    (fifo-push local-queue envelope)
    (mutex-unlock! message-available-mutex)
    (mutex-unlock! local-queue-mutex)))
;; Send `message` to an actor on another machine over UDP.
;; Opens a UDP socket, converts the address to a URI, and serialises
;; (address . message) into a string by printing it; the socket is closed at
;; the end.
;; NOTE(review): lines 98 and 100-104 are not visible here, so the actual
;; connect/send calls and how `uri` and `packet` are used cannot be seen.
94 (define (send-network-message address . message)
95 (let ((s (udp-open-socket))
96 (uri (address->uri address))
97 (packet (with-output-to-string
99 (print (cons address message))))))
105 (udp-close-socket s)))
;; Take the next message off local-queue, waiting until one is available.
;; Locking message-available-mutex (no timeout, no owner) blocks until
;; send-local-message has unlocked it. The queue is then popped into `res`
;; under local-queue-mutex, and if more messages remain the availability
;; mutex is unlocked again so the next call does not block.
;; NOTE(review): lines 108 and 115 are not visible here -- the binding that
;; introduces `res` and the return expression (presumably `res`) are missing.
107 (define (next-local-message)
109 (mutex-lock! message-available-mutex #f #f)
110 (mutex-lock! local-queue-mutex)
111 (set! res (fifo-pop local-queue))
112 (if (not (fifo-empty? local-queue))
113 (mutex-unlock! message-available-mutex))
114 (mutex-unlock! local-queue-mutex)
;; Start the scheduler. The visible step takes the next queued local
;; message, an (address . message) list, and applies deliver-message to it.
;; NOTE(review): lines 118-120 and 122 onward are not visible here;
;; presumably this step runs repeatedly in its own thread -- confirm.
117 (define (start-scheduler)
121 (apply deliver-message (next-local-message))
;; Start the listener for messages arriving over the network.
;; Opens a UDP socket and binds it to sam-port with #f as the host. A
;; receive requests at most 1024 bytes; the received string is logged and
;; parsed with `read`. Data of the form (address message ...) is forwarded
;; through send-message; otherwise a warning about a badly formatted message
;; is printed.
;; NOTE(review): lines 128-129, 132, 138 and 140 onward are not visible
;; here, so the surrounding loop/thread structure and the fallback match
;; pattern cannot be seen.
;; NOTE(review): the payload comes from the network and is passed straight
;; to `read`; consider validating it before dispatch.
127 (define (start-network-listener)
130 (let ((s (udp-open-socket*)))
131 (udp-bind! s #f sam-port)
133 (let-values (((n str) (udp-recv s 1024)))
134 (print "network-listener: Received " n " bytes over network: " str)
135 (match (with-input-from-string str read)
136 ((address message ...)
137 (apply send-message (cons address message)))
139 (print "Warning: received badly formatted message string '" str "'"))))
;; Queue of actors waiting for a line of console input, plus its
;; synchronisation.
;; reader-queue-mutex guards access to reader-queue.
;; reader-available-mutex is used as a signal rather than as a lock: it is
;; locked here with no timeout and no owning thread (the two #f arguments),
;; unlocked when a reader is pushed onto the queue, and locked again by
;; next-reader, which therefore waits until a reader has been queued.
144 (define reader-queue-mutex (make-mutex "reader queue"))
145 (define reader-available-mutex (make-mutex "reader available"))
146 (mutex-lock! reader-available-mutex #f #f)
147 (define reader-queue (make-fifo))
;; Take the next waiting reader off reader-queue, waiting until one exists.
;; Locking reader-available-mutex (no timeout, no owner) blocks until a
;; reader has been queued. The queue is then popped into `res` under
;; reader-queue-mutex, and if more readers remain the availability mutex is
;; unlocked again so the next call does not block.
;; NOTE(review): lines 150 and 157 are not visible here -- the binding that
;; introduces `res` and the return expression (presumably `res`) are missing.
149 (define (next-reader)
151 (mutex-lock! reader-available-mutex #f #f)
152 (mutex-lock! reader-queue-mutex)
153 (set! res (fifo-pop reader-queue))
154 (if (not (fifo-empty? reader-queue))
155 (mutex-unlock! reader-available-mutex))
156 (mutex-unlock! reader-queue-mutex)
;; Start the console. The visible steps wait for the next queued reader, log
;; it, block the current thread for I/O on file descriptor 0 via the
;; internal ##sys#thread-block-for-i/o! procedure, and then send a line read
;; with read-line to that reader.
;; NOTE(review): lines 160, 164 and 166-167 are not visible here; presumably
;; the steps run in a loop in their own thread, with a yield after the
;; block-for-I/O call -- confirm.
159 (define (start-console)
161 (let ((reader (next-reader)))
162 (print "console: received next reader: " reader)
163 (##sys#thread-block-for-i/o! (current-thread) 0 #t)
165 (send-message reader (read-line)))
168 ;; System initialization
;; Create the system actor. Its behaviour takes its own address (`self`)
;; followed by the message. The visible fragments show three reactions:
;;   - printing a notice when a shutdown message is received;
;;   - for ('print strings ...), printing the given strings;
;;   - pushing `reader` onto reader-queue under reader-queue-mutex and
;;     unlocking reader-available-mutex to signal that a reader is waiting.
;; NOTE(review): most lines between 172 and 191 are not visible here,
;; including the match form itself, the patterns for the shutdown and reader
;; messages, and the value each clause returns as the next behaviour.
170 (define (make-system-actor)
171 (make-actor (lambda (self . message)
175 (print "## System actor received shutdown message.")
179 (('print strings ...)
180 (apply print strings)
184 (mutex-lock! reader-queue-mutex)
185 (fifo-push reader-queue reader)
186 (mutex-unlock! reader-available-mutex)
187 (mutex-unlock! reader-queue-mutex)
;; Initialise the actor machine. The visible steps store a newly created
;; system actor in `system` and start the network listener.
;; NOTE(review): lines 193-194 and 196 are not visible here; presumably
;; `host` and `port` are stored in sam-host and sam-port and other services
;; are started -- confirm.
192 (define (init-sam host port)
195 (set! system (make-system-actor))
197 (start-network-listener)))