P2P chat works.
[sam.git] / sam.scm
1 ;; Simple Actor Machine
2 ;;
3 ;; Houses a population of actors which can communicate using messages
4 ;; with actors on the same machine or other machines via the network.
5
6 (module sam
7     (init-sam
8      start-console
9      send-message
10      make-actor
11      system)
12
13   (import scheme
14           (chicken base)
15           (chicken io)
16           (chicken string)
17           (chicken port)
18           (chicken process-context)
19           matchable
20           srfi-18 ; threads
21           srfi-69 ; hash-table
22           uuid ; ids for actors
23           uri-generic
24           udp
25           fifo)
26
27   ;; Actors
28
29   (define sam-host "localhost")
30   (define sam-port 8000)
31
32   (define (make-address host port id)
33     (uri->string
34      (make-uri #:scheme "actor"
35                #:host host
36                #:port port
37                #:path (list '/ id))))
38
39   (define (make-local-address . args)
40     (make-address sam-host
41                   sam-port
42                   (if (null? args)
43                       (uuid)
44                       (car args))))
45   
46   (define (address-id address)
47     (cadr (uri-path (uri-reference address))))
48
49   (define address->uri uri-reference)
50
51   (define (address-local? address)
52     (let ((uri (address->uri address)))
53       (and (equal? (uri-host uri) sam-host)
54            (equal? (uri-port uri) sam-port))))
55
56   (define actor-table (make-hash-table))
57
58   (define (make-actor beh)
59     (let* ((address (make-local-address))
60            (id (address-id address)))
61       (hash-table-set! actor-table id beh)
62       address))
63   
64   (define (deliver-message address . message)
65     (let ((id (address-id address)))
66       (let ((behaviour (hash-table-ref/default actor-table id '())))
67         (if (null? behaviour)
68             (print "Warning: discarded message" message " to unknown actor " address)
69             (match (apply (hash-table-ref actor-table id) (cons address message))
70               ('done (hash-table-delete! actor-table id))
71               ('sleep 'do-nothing)
72               (new-beh (hash-table-set! actor-table id new-beh)))))))
73
74   ;; Scheduler
75
76   (define local-queue-mutex (make-mutex "message queue"))
77   (define message-available-mutex (make-mutex "message available"))
78   (mutex-lock! message-available-mutex #f #f)
79   (define local-queue (make-fifo))
80
81   (define (send-message address . message)
82     (apply (if (address-local? address)
83                send-local-message
84                send-network-message)
85            (cons address message)))
86
87   (define (send-local-message address . message)
88     (mutex-lock! local-queue-mutex)
89     (fifo-push local-queue (cons address message))
90     (mutex-unlock! message-available-mutex)
91     (mutex-unlock! local-queue-mutex))
92
93   (define (send-network-message address . message)
94     (let ((s (udp-open-socket))
95           (uri (address->uri address))
96           (packet (with-output-to-string
97                     (lambda ()
98                       (write (cons address message))))))
99       (udp-bind! s #f 0)
100       (udp-connect! s
101                     (uri-host uri)
102                     (uri-port uri))
103       (udp-send s packet)
104       (udp-close-socket s)))
105
106   (define (next-local-message)
107     (let ((res #f))
108       (mutex-lock! message-available-mutex #f #f)
109       (mutex-lock! local-queue-mutex)
110       (set! res (fifo-pop local-queue))
111       (if (not (fifo-empty? local-queue))
112           (mutex-unlock! message-available-mutex))
113       (mutex-unlock! local-queue-mutex)
114       res))
115
116   (define (start-scheduler)
117     (thread-start!
118      (lambda ()
119        (let loop ()
120          (apply deliver-message (next-local-message))
121          (loop)))))
122
123
124   ;; Network
125
126   (define (start-network-listener)
127     (thread-start!
128      (lambda ()
129        (let ((s (udp-open-socket*)))
130          (udp-bind! s #f sam-port)
131          (let loop ()
132            (let-values (((n str) (udp-recv s 1024)))
133              (match (with-input-from-string str read)
134                ((address message ...)
135                 (apply send-message (cons address message)))
136                (else
137                 (print "Warning: received badly formatted message string '" str "'"))))
138            (loop))))))
139
140   ;; System interface
141
142   (define reader-queue-mutex (make-mutex "reader queue"))
143   (define reader-available-mutex (make-mutex "reader available"))
144   (mutex-lock! reader-available-mutex #f #f)
145   (define reader-queue (make-fifo))
146
147   (define (next-reader)
148     (let ((res #f))
149       (mutex-lock! reader-available-mutex #f #f)
150       (mutex-lock! reader-queue-mutex)
151       (set! res (fifo-pop reader-queue))
152       (if (not (fifo-empty? reader-queue))
153           (mutex-unlock! reader-available-mutex))
154       (mutex-unlock! reader-queue-mutex)
155       res))
156
157   (define (start-console)
158     (let loop ()
159       (let ((reader (next-reader)))
160         (##sys#thread-block-for-i/o! (current-thread) 0 #t)
161         (thread-yield!)
162         (send-message reader (read-line)))
163       (loop)))
164
165   ;; System initialization
166
167   (define (make-system-actor)
168     (make-actor (lambda (self . message)
169                   (match message
170
171                     (('shutdown)
172                      (print "## System actor received shutdown message.")
173                      (exit 0)
174                      'done)
175
176                     (('print strings ...)
177                      (apply print strings)
178                      'sleep)
179
180                     (('read reader)
181                      (mutex-lock! reader-queue-mutex)
182                      (fifo-push reader-queue reader)
183                      (mutex-unlock! reader-available-mutex)
184                      (mutex-unlock! reader-queue-mutex)
185                      'sleep)))))
186
187   (define system #f)
188
189   (define (init-sam host port)
190     (set! sam-host host)
191     (set! sam-port port)
192     (set! system (make-system-actor))
193     (start-scheduler)
194     (start-network-listener)))