656338df60930975ae05e64a567969a2105707b8
[sam.git] / sam.scm
1 ;; Simple Actor Machine
2 ;;
3 ;; Houses a population of actors which can communicate using messages
4 ;; with actors on the same machine or other machines via the network.
5
6 (import (chicken io)
7         (chicken string)
8         matchable
9         srfi-18 ; threads
10         srfi-69 ; hash-table
11         udp
12         fifo)
13
14 ;; Actors
15
16 (define (make-machine host port)
17   (cons host port))
18 (define (machine-host m) (car m))
19 (define (machine-port m) (cdr m))
20
21 (define this-machine (make-machine "localhost" 1234))
22
23 (define next-actor-id
24   (let ((mutex (make-mutex "actor id mutex"))
25         (next-id 1))
26     (lambda ()
27       (let ((res #f))
28         (mutex-lock! mutex)
29         (set! res next-id)
30         (set! next-id (+ next-id 1))
31         (mutex-unlock! mutex)
32         res))))
33
34 (define (address-id address) (car address))
35 (define (address-machine address) (cdr address))
36 (define (make-address id machine)
37   (cons id machine))
38
39 (define (address-local? address)
40   (equal? (address-machine address)
41           this-machine))
42
43 (define actor-table (make-hash-table))
44
45 (define (make-actor beh)
46   (let* ((id (next-actor-id)))
47     (hash-table-set! actor-table id beh)
48     (make-address id this-machine)))
49   
50 (define (deliver-message address . message)
51   (let ((id (address-id address)))
52     (let ((behaviour (hash-table-ref/default actor-table id '())))
53       (if (null? behaviour)
54           (print "Warning: discarded message" message " to unknown actor " address)
55           (match (apply (hash-table-ref actor-table id) (cons address message))
56             ('done (hash-table-delete! actor-table id))
57             ('sleep 'do-nothing)
58             (new-beh (hash-table-set! actor-table id new-beh)))))))
59
60 ;; Scheduler
61
62 (define local-queue-mutex (make-mutex "message queue"))
63 (define message-available-mutex (make-mutex "message available"))
64 (mutex-lock! message-available-mutex #f #f)
65 (define local-queue (make-fifo))
66
67 (define (send-message address . message)
68   (apply (if (address-local? address)
69              send-local-message
70              send-network-message)
71          (cons address message)))
72
73 (define (send-local-message address . message)
74   (mutex-lock! local-queue-mutex)
75   (fifo-push local-queue (cons address message))
76   (mutex-unlock! message-available-mutex)
77   (mutex-unlock! local-queue-mutex))
78
79 (define (send-network-message address . message)
80   (let ((s (udp-open-socket))
81         (machine (address-machine address)))
82     (udp-bind! s #f 0)
83     (udp-connect! s
84                   (machine-host machine)
85                   (machine-port machine))
86     (udp-send s message)
87     (udp-close-socket s)))
88
89 (define (next-local-message)
90   (let ((res #f))
91     (mutex-lock! message-available-mutex #f #f)
92     (mutex-lock! local-queue-mutex)
93     (set! res (fifo-pop local-queue))
94     (if (not (fifo-empty? local-queue))
95       (mutex-unlock! message-available-mutex))
96     (mutex-unlock! local-queue-mutex)
97     res))
98
99 (define (start-scheduler)
100   (thread-start!
101    (lambda ()
102      (let loop ()
103        (apply deliver-message (next-local-message))
104        (loop)))))
105
106 ;; Console
107
108 (define reader-queue-mutex (make-mutex "reader queue"))
109 (define reader-available-mutex (make-mutex "reader available"))
110 (mutex-lock! reader-available-mutex #f #f)
111 (define reader-queue (make-fifo))
112
113 (define console
114   (make-actor (lambda (self . message)
115                 (mutex-lock! reader-queue-mutex)
116                 (fifo-push reader-queue (car message))
117                 (mutex-unlock! reader-available-mutex)
118                 (mutex-unlock! reader-queue-mutex)
119                 'sleep)))
120
121 (define (next-reader)
122   (let ((res #f))
123     (mutex-lock! reader-available-mutex #f #f)
124     (mutex-lock! reader-queue-mutex)
125     (set! res (fifo-pop reader-queue))
126     (if (not (fifo-empty? reader-queue))
127         (mutex-unlock! reader-available-mutex))
128     (mutex-unlock! reader-queue-mutex)
129     res))
130
131 (define (start-console)
132   (let loop ()
133     (let ((reader (next-reader)))
134       (##sys#thread-block-for-i/o! (current-thread) 0 #t)
135       (thread-yield!)
136       (send-message reader (read-line)))
137     (loop)))
138
139
140 ;; System interface
141
142 (define system
143   (make-actor (lambda (self . message)
144                 (match message
145                   (('shutdown)
146                    (print "## System actor received shutdown message.")
147                    (exit 0)
148                    'done)
149                   (('println strings ...)
150                    (apply print strings)
151                    'sleep)))))
152
153 ;; Testing
154
155
156 (send-message system 'println "Hello, what is your name?")
157 (send-message console
158               (make-actor (lambda (self . message)
159                             (match message
160                               ((name)
161                                (send-message system 'println "Hello, " name "!")
162                                'done)))))
163
164 (thread-start!
165  (lambda ()
166    (thread-sleep! 10)
167    (send-message system 'shutdown)))
168
169 (start-scheduler)
170 (start-console)
171
172 ;; (thread-join! scheduler-thread)