Debugging scheduler.
[sam.git] / sam.scm
1 ;; Simple Actor Machine
2 ;;
3 ;; Houses a population of actors which can communicate using messages
4 ;; with actors on the same machine or other machines via the network.
5
6 (import (chicken io)
7         (chicken string)
8         matchable
9         srfi-18 ; threads
10         srfi-69 ; hash-table
11         udp
12         fifo)
13
14 ;; Actors
15
16 (define (make-machine host port)
17   (cons host port))
18 (define (machine-host m) (car m))
19 (define (machine-port m) (cdr m))
20
21 (define this-machine (make-machine "localhost" 1234))
22
23 (define next-actor-id 1)
24
25 (define (address-id address) (car address))
26 (define (address-machine address) (cdr address))
27 (define (make-address id machine)
28   (cons id machine))
29
30 (define (address-local? address)
31   (equal? (address-machine address)
32           this-machine))
33
34 (define actor-table (make-hash-table))
35
36 (define (make-actor beh)
37   (let* ((id next-actor-id))
38     (hash-table-set! actor-table id beh)
39     (make-address id this-machine)))
40   
41 (define (deliver-message address . message)
42   (let ((id (address-id address)))
43     (let ((behaviour (hash-table-ref/default actor-table id '())))
44       (if (null? behaviour)
45           (print "Warning: discarded message" message " to unknown actor " address)
46           (match (apply (hash-table-ref actor-table id) (cons address message))
47             ('done (hash-table-delete! actor-table actor))
48             ('sleep 'do-nothing)
49             (new-beh (hash-table-set! actor-table actor new-beh)))))))
50
51 ;; Scheduler
52
53 (define local-queue-mutex (make-mutex "message queue"))
54 (define message-available-mutex (make-mutex "message available"))
55 (mutex-lock! message-available-mutex #f #f)
56 (define local-queue (make-fifo))
57
58 (define (send-message address . message)
59   (apply (if (address-local? address)
60              send-local-message
61              send-network-message)
62          message))
63
64 (define (send-local-message address . message)
65   (mutex-lock! local-queue-mutex)
66   (fifo-push local-queue (cons address message))
67   (mutex-unlock! message-available-mutex)
68   (mutex-unlock! local-queue-mutex))
69
70 (define (send-network-message address . message)
71   (let ((s (udp-open-socket))
72         (machine (address-machine address)))
73     (udp-bind! s #f 0)
74     (udp-connect! s
75                   (machine-host machine)
76                   (machine-port machine))
77     (udp-send s message)
78     (udp-close-socket s)))
79
80 (define (next-local-message)
81   (let ((res #f))
82     (mutex-lock! message-available-mutex #f #f)
83     (mutex-lock! local-queue-mutex)
84     (set! res (fifo-pop local-queue))
85     (if (not (fifo-empty? local-queue))
86       (mutex-unlock! message-available-mutex))
87     (mutex-unlock! local-queue-mutex)
88     res))
89
90 (define scheduler-thread
91   (make-thread
92    (lambda ()
93      (let loop ((next-addressed-message (next-local-message)))
94        (apply deliver-message next-addressed-message)
95        (loop (next-local-message))))))
96
97
98 ;; Testing
99
100 (thread-start! scheduler-thread)
101
102 (define println
103   (make-actor (lambda (self . message)
104                 (apply print message)
105                 'sleep)))
106
107 (print println)
108 (send-message println "Hello, world!")
109
110 (thread-join! scheduler-thread)