1 ;; Simple Actor Machine
3 ;; Houses a population of actors which can communicate using messages
4 ;; with actors on the same machine or other machines via the network.
19 (define this-host "localhost")
20 (define this-port 8000)
22 (define (make-address host port id)
23 (make-uri #:scheme "actor"
28 (define address->string uri->string)
29 (define string->address uri-reference)
31 (define (make-local-address . args)
32 (make-address this-host
38 (define (address-id address) (cadr (uri-path address)))
40 (define (address-local? address)
41 (and (equal? (uri-host address) this-host)
42 (equal? (uri-port address) this-port)))
44 (define actor-table (make-hash-table))
46 (define (make-actor beh)
47 (let* ((address (make-local-address))
48 (id (address-id address)))
49 (hash-table-set! actor-table id beh)
52 (define (deliver-message address . message)
53 (let ((id (address-id address)))
54 (let ((behaviour (hash-table-ref/default actor-table id '())))
56 (print "Warning: discarded message" message " to unknown actor " address)
57 (match (apply (hash-table-ref actor-table id) (cons address message))
58 ('done (hash-table-delete! actor-table id))
60 (new-beh (hash-table-set! actor-table id new-beh)))))))
64 (define local-queue-mutex (make-mutex "message queue"))
65 (define message-available-mutex (make-mutex "message available"))
66 (mutex-lock! message-available-mutex #f #f)
67 (define local-queue (make-fifo))
69 (define (send-message address . message)
70 (apply (if (address-local? address)
73 (cons address message)))
75 (define (send-local-message address . message)
76 (mutex-lock! local-queue-mutex)
77 (fifo-push local-queue (cons address message))
78 (mutex-unlock! message-available-mutex)
79 (mutex-unlock! local-queue-mutex))
81 (define (send-network-message address . message)
82 (let ((s (udp-open-socket))
83 (machine (address-machine address))
84 (packet (with-output-to-string)
86 (print (cons (address->string address) message)))))
89 (machine-host machine)
90 (machine-port machine))
92 (udp-close-socket s)))
94 (define (next-local-message)
96 (mutex-lock! message-available-mutex #f #f)
97 (mutex-lock! local-queue-mutex)
98 (set! res (fifo-pop local-queue))
99 (if (not (fifo-empty? local-queue))
100 (mutex-unlock! message-available-mutex))
101 (mutex-unlock! local-queue-mutex)
104 (define (start-scheduler)
108 (apply deliver-message (next-local-message))
113 (define reader-queue-mutex (make-mutex "reader queue"))
114 (define reader-available-mutex (make-mutex "reader available"))
115 (mutex-lock! reader-available-mutex #f #f)
116 (define reader-queue (make-fifo))
119 (make-actor (lambda (self . message)
120 (mutex-lock! reader-queue-mutex)
121 (fifo-push reader-queue (car message))
122 (mutex-unlock! reader-available-mutex)
123 (mutex-unlock! reader-queue-mutex)
126 (define (next-reader)
128 (mutex-lock! reader-available-mutex #f #f)
129 (mutex-lock! reader-queue-mutex)
130 (set! res (fifo-pop reader-queue))
131 (if (not (fifo-empty? reader-queue))
132 (mutex-unlock! reader-available-mutex))
133 (mutex-unlock! reader-queue-mutex)
136 (define (start-console)
138 (let ((reader (next-reader)))
139 (##sys#thread-block-for-i/o! (current-thread) 0 #t)
141 (send-message reader (read-line)))
147 (define (start-network-listener)
150 (let ((s (udp-open-socket*)))
151 (udp-bind! s #f this-port)
153 (let-values (((n str) (udp-recv s 1024)))
154 (print "Received " n " bytes over network: " str)
155 (match (with-input-from-string str read)
156 ((addr-str message ...)
157 (apply send-message (cons (string->address addr-str) message)))
159 (print "Warning: received badly formatted message string '" str "'"))))
165 (make-actor (lambda (self . message)
168 (print "## System actor received shutdown message.")
171 (('println strings ...)
172 (apply print strings)
178 (send-message system 'println "Hello, what is your name?")
179 (send-message console
180 (make-actor (lambda (self . message)
183 (send-message system 'println "Hello, " name "!")
189 (send-message system 'shutdown)))
191 (print (uri->string system))
194 (start-network-listener)
197 (define (boot-sam host port)
200 ;; (thread-join! scheduler-thread)
203 (let loop ((args (cdr (argv)))
209 (((or "-p" "--port") pstr rest ...)
210 (loop rest host (string->number pstr)))
211 (("--hostname" hstr rest ...)
212 (loop rest hstr port))
214 (boot-sam host port)))))