17c6f41b101187c5eeb054b5c62c268442feec32
[sam.git] / sam.scm
1 ;; Simple Actor Machine
2 ;;
3 ;; A virtual machine which houses a population of actors which can
4 ;; communicate using messages with actors on the same machine or other
5 ;; machines via the network.
6
7 (module sam
8     (boot-sam
9      make-actor
10      send-message
11      send-message-later
12      address->string
13      string->address)
14
15   (import scheme
16           (chicken base)
17           (chicken io)
18           (chicken string)
19           (chicken port)
20           (chicken process-context)
21           matchable
22           srfi-18 ; threads
23           srfi-69 ; hash-table
24           uuid ; ids for actors
25           udp
26           fifo)
27
28   (define trace #f)
29
30   ;; Actors
31
32   (define sam-host "localhost")
33   (define sam-port 8000)
34
35   (define (make-address host port id)
36     (list id host port))
37
38   (define (make-local-address . args)
39     (make-address sam-host
40                   sam-port
41                   (if (null? args)
42                       (uuid)
43                       (car args))))
44   
45   (define (address-id address)
46     (car address))
47   (define (address-host address)
48     (cadr address))
49   (define (address-port address)
50     (caddr address))
51   (define (address->string address)
52     (with-output-to-string
53       (lambda () (write address))))
54   (define (string->address str)
55     (with-input-from-string str read))
56
57   (define (address-local? address)
58     (and (equal? (address-host address) sam-host)
59          (equal? (address-port address) sam-port)))
60
61   (define actor-table (make-hash-table))
62
63   (define (make-actor beh)
64     (let* ((address (make-local-address))
65            (id (address-id address)))
66       (hash-table-set! actor-table id beh)
67       address))
68   
69   (define (deliver-message address . message)
70     (if trace (print "Delivering to " address ": " message))
71     (let ((id (address-id address)))
72       (let ((behaviour (hash-table-ref/default actor-table id '())))
73         (if (null? behaviour)
74             (print "Warning: discarded message" message " to unknown actor " address)
75             (match (apply (hash-table-ref actor-table id) (cons address message))
76               ('done (hash-table-delete! actor-table id))
77               ('sleep 'do-nothing)
78               (new-beh (hash-table-set! actor-table id new-beh)))))))
79
80   ;; Scheduler
81
82   (define local-queue-mutex (make-mutex "message queue"))
83   (define message-available-mutex (make-mutex "message available"))
84   (mutex-lock! message-available-mutex #f #f)
85   (define local-queue (make-fifo))
86
87   (define (send-message address . message)
88     (apply (if (address-local? address)
89                send-local-message
90                send-network-message)
91            (cons address message)))
92
93   (define (send-local-message address . message)
94     (mutex-lock! local-queue-mutex)
95     (fifo-push local-queue (cons address message))
96     (mutex-unlock! message-available-mutex)
97     (mutex-unlock! local-queue-mutex))
98
99   (define (send-network-message address . message)
100     (let ((s (udp-open-socket))
101           (packet (with-output-to-string
102                     (lambda ()
103                       (write (cons address message))))))
104       (udp-bind! s #f 0)
105       (udp-connect! s
106                     (address-host address)
107                     (address-port address))
108       (udp-send s packet)
109       (udp-close-socket s)))
110
111   (define (send-message-later address time . message)
112     (thread-start!
113      (lambda ()
114        (thread-sleep! time)
115        (apply send-message (cons address message)))))
116
117   (define (next-local-message)
118     (let ((res #f))
119       (mutex-lock! message-available-mutex #f #f)
120       (mutex-lock! local-queue-mutex)
121       (set! res (fifo-pop local-queue))
122       (if (not (fifo-empty? local-queue))
123           (mutex-unlock! message-available-mutex))
124       (mutex-unlock! local-queue-mutex)
125       res))
126
127   (define (start-scheduler)
128     (let loop ()
129       (apply deliver-message (next-local-message))
130       (loop)))
131
132
133   ;; Network
134
135   (define (start-network-listener)
136     (thread-start!
137      (lambda ()
138        (let ((s (udp-open-socket*)))
139          (udp-bind! s #f sam-port)
140          (let loop ()
141            (let-values (((n str) (udp-recv s 65536)))
142              (match (with-input-from-string str read)
143                ((address message ...)
144                 (apply send-message (cons address message)))
145                (else
146                 (print "Warning: received badly formatted message string '" str "'"))))
147            (loop))))))
148
149   ;; System interface
150
151   (define reader-queue-mutex (make-mutex "reader queue"))
152   (define reader-available-mutex (make-mutex "reader available"))
153   (mutex-lock! reader-available-mutex #f #f)
154   (define reader-queue (make-fifo))
155
156   (define (next-reader)
157     (let ((res #f))
158       (mutex-lock! reader-available-mutex #f #f)
159       (mutex-lock! reader-queue-mutex)
160       (set! res (fifo-pop reader-queue))
161       (if (not (fifo-empty? reader-queue))
162           (mutex-unlock! reader-available-mutex))
163       (mutex-unlock! reader-queue-mutex)
164       res))
165
166   (define (start-console)
167     (thread-start!
168      (lambda ()
169        (let loop ()
170          (let ((reader (next-reader)))
171            (##sys#thread-block-for-i/o! (current-thread) 0 #t)
172            (thread-yield!)
173            (send-message reader (read-line)))
174          (loop)))))
175
176   ;; System initialization
177
178   (define (system-beh self . message)
179     (match message
180
181       (('shutdown)
182        (print "## System actor received shutdown message.")
183        (exit 0)
184        'done)
185
186       (('print strings ...)
187        (apply print strings)
188        'sleep)
189
190       (('read reader)
191        (mutex-lock! reader-queue-mutex)
192        (fifo-push reader-queue reader)
193        (mutex-unlock! reader-available-mutex)
194        (mutex-unlock! reader-queue-mutex)
195        'sleep)))
196
197   (define (boot-sam host port main-beh)
198     (set! sam-host host)
199     (set! sam-port port)
200     (start-console)
201     (start-network-listener)
202     (send-message (make-actor main-beh) (make-actor system-beh))
203     (start-scheduler)))