Working on scheduler.
[sam.git] / sam.scm
1 ;; Simple Actor Machine
2 ;;
3 ;; Houses a population of actors which can communicate using messages
4 ;; with actors on the same machine or other machines via the network.
5
6 (import (chicken io)
7         (chicken string)
8         matchable
9         srfi-18 ; threads
10         srfi-69 ; hashtable
11         udp
12         fifo)
13
14 ;; Actors
15
16 (define (make-machine host port)
17   (cons host port))
18 (define (machine-host m) (car m))
19 (define (machine-port m) (cdr m))
20
21 (define this-machine (make-machine "localhost" 1234))
22
23 (define next-actor-id 1)
24
25 (define (address-id address) (car address))
26 (define (address-machine address) (cdr address))
27
28 (define (address-local? address)
29   (equal? (address-machine address)
30           this-machine))
31
32 (define actor-table (make-hash-table))
33
34 (define (make-actor beh)
35   (let* ((id next-actor-id))
36     (hash-table-put! id beh)
37     (cons id this-machine)))
38   
39 (define (deliver-message address . message)
40   (let ((id (address-id address)))
41     (let ((behaviour (hash-table-ref/default actor-table id '())))
42       (if (null? behaviour)
43           (print "Warning: discarded message" message " to unknown actor " address)
44           (match (apply (hash-table-ref actor-table id) (cons address message))
45             ('done (hash-table-delete! actor-table actor))
46             ('sleep 'do-nothing)
47             (new-beh (hash-table-put! actor new-beh)))))))
48
49 ;; Scheduler
50
51 (define local-queue-mutex (make-mutex "message queue"))
52 (define message-available-mutex (make-mutex "message available"))
53 (mutex-lock! message-available-mutex #f #f)
54 (define local-queue (make-fifo))
55
56 (define (send-message address . message)
57   (apply (if (address-local? address)
58              send-local-message
59              send-network-message)
60          message))
61
62 (define (send-local-message address . message)
63   (mutex-lock! local-queue-mutex)
64   (fifo-push local-queue (cons address message))
65   (mutex-unlock! message-available-mutex)
66   (mutex-unlock! local-queue-mutex))
67
68 (define (send-network-message address . message)
69   (let ((s (udp-open-socket))
70         (machine (address-machine address)))
71     (udp-bind! s #f 0)
72     (udp-connect! s
73                   (machine-host machine)
74                   (machine-port machine))
75     (udp-send s message)
76     (udp-close-socket s)))
77
78 (define (next-local-message)
79   (let ((res #f))
80     (mutex-lock! message-available-mutex)
81     (mutex-lock! local-queue-mutex)
82     (set! res (fifo-pop local-queue))
83     (mutex-unlock! message-available-mutex)
84     (mutex-unlock! local-queue-mutex)
85     res))
86
87 (define scheduler-thread
88   (make-thread
89    (lambda ()
90      (let loop ((next-addressed-message (next-local-message)))
91        (apply deliver-message next-addressed-message)
92        (loop (next-local-message))))))
93
94 (thread-start! scheduler-thread)
95
96 (thread-join! scheduler-thread)