Now using URLs and UUIDs for actor addressing.
[sam.git] / sam.scm
1 ;; Simple Actor Machine
2 ;;
3 ;; Houses a population of actors which can communicate using messages
4 ;; with actors on the same machine or other machines via the network.
5
6 (import (chicken io)
7         (chicken string)
8         matchable
9         srfi-18 ; threads
10         srfi-69 ; hash-table
11         uuid ; ids for actors
12         uri-generic
13         udp
14         fifo)
15
16 ;; Actors
17
18 (define this-host "localhost")
19 (define this-port 1234)
20
21 (define (make-address host port id)
22   (make-uri #:scheme "actor"
23             #:host host
24             #:port port
25             #:path (list '/ id)))
26
27 (define next-local-address
28   (let ((mutex (make-mutex "actor address mutex")))
29     (lambda ()
30       (let ((res #f))
31         (mutex-lock! mutex)
32         (set! res (make-address this-host this-port (uuid)))
33         (mutex-unlock! mutex)
34         res))))
35
36 (define (address-id address) (cadr (uri-path address)))
37
38 (define (address-local? address)
39   (and (equal? (uri-host address) this-host)
40        (equal? (uri-port address) this-port)))
41
42 (define actor-table (make-hash-table))
43
44 (define (make-actor beh)
45   (let* ((address (next-local-address))
46          (id (address-id address)))
47     (hash-table-set! actor-table id beh)
48     address))
49   
50 (define (deliver-message address . message)
51   (let ((id (address-id address)))
52     (let ((behaviour (hash-table-ref/default actor-table id '())))
53       (if (null? behaviour)
54           (print "Warning: discarded message" message " to unknown actor " address)
55           (match (apply (hash-table-ref actor-table id) (cons address message))
56             ('done (hash-table-delete! actor-table id))
57             ('sleep 'do-nothing)
58             (new-beh (hash-table-set! actor-table id new-beh)))))))
59
60 ;; Scheduler
61
62 (define local-queue-mutex (make-mutex "message queue"))
63 (define message-available-mutex (make-mutex "message available"))
64 (mutex-lock! message-available-mutex #f #f)
65 (define local-queue (make-fifo))
66
67 (define (send-message address . message)
68   (apply (if (address-local? address)
69              send-local-message
70              send-network-message)
71          (cons address message)))
72
73 (define (send-local-message address . message)
74   (mutex-lock! local-queue-mutex)
75   (fifo-push local-queue (cons address message))
76   (mutex-unlock! message-available-mutex)
77   (mutex-unlock! local-queue-mutex))
78
79 (define (send-network-message address . message)
80   (let ((s (udp-open-socket))
81         (machine (address-machine address)))
82     (udp-bind! s #f 0)
83     (udp-connect! s
84                   (machine-host machine)
85                   (machine-port machine))
86     (udp-send s message)
87     (udp-close-socket s)))
88
89 (define (next-local-message)
90   (let ((res #f))
91     (mutex-lock! message-available-mutex #f #f)
92     (mutex-lock! local-queue-mutex)
93     (set! res (fifo-pop local-queue))
94     (if (not (fifo-empty? local-queue))
95       (mutex-unlock! message-available-mutex))
96     (mutex-unlock! local-queue-mutex)
97     res))
98
99 (define (start-scheduler)
100   (thread-start!
101    (lambda ()
102      (let loop ()
103        (apply deliver-message (next-local-message))
104        (loop)))))
105
106 ;; Console
107
108 (define reader-queue-mutex (make-mutex "reader queue"))
109 (define reader-available-mutex (make-mutex "reader available"))
110 (mutex-lock! reader-available-mutex #f #f)
111 (define reader-queue (make-fifo))
112
113 (define console
114   (make-actor (lambda (self . message)
115                 (mutex-lock! reader-queue-mutex)
116                 (fifo-push reader-queue (car message))
117                 (mutex-unlock! reader-available-mutex)
118                 (mutex-unlock! reader-queue-mutex)
119                 'sleep)))
120
121 (define (next-reader)
122   (let ((res #f))
123     (mutex-lock! reader-available-mutex #f #f)
124     (mutex-lock! reader-queue-mutex)
125     (set! res (fifo-pop reader-queue))
126     (if (not (fifo-empty? reader-queue))
127         (mutex-unlock! reader-available-mutex))
128     (mutex-unlock! reader-queue-mutex)
129     res))
130
131 (define (start-console)
132   (let loop ()
133     (let ((reader (next-reader)))
134       (##sys#thread-block-for-i/o! (current-thread) 0 #t)
135       (thread-yield!)
136       (send-message reader (read-line)))
137     (loop)))
138
139
140 ;; System interface
141
142 (define system
143   (make-actor (lambda (self . message)
144                 (match message
145                   (('shutdown)
146                    (print "## System actor received shutdown message.")
147                    (exit 0)
148                    'done)
149                   (('println strings ...)
150                    (apply print strings)
151                    'sleep)))))
152
153 ;; Testing
154
155
156 (send-message system 'println "Hello, what is your name?")
157 (send-message console
158               (make-actor (lambda (self . message)
159                             (match message
160                               ((name)
161                                (send-message system 'println "Hello, " name "!")
162                                'done)))))
163
164 (thread-start!
165  (lambda ()
166    (thread-sleep! 10)
167    (send-message system 'shutdown)))
168
169 (start-scheduler)
170 (start-console)
171
172 ;; (thread-join! scheduler-thread)