431cd1c41ce9b9cac0e68843cc39ab80ab6aa3c8
[sam.git] / sam.scm
1 ;; Simple Actor Machine
2 ;;
3 ;; Houses a population of actors which can communicate using messages
4 ;; with actors on the same machine or other machines via the network.
5
6 (import (chicken io)
7         (chicken string)
8         matchable
9         srfi-18 ; threads
10         srfi-69 ; hashtable
11         udp6
12         uri-generic)
13
14 ;; Actors
15
16 (define (make-machine host port)
17   (cons host port))
18 (define (machine-host m) (car m))
19 (define (machine-port m) (cdr m))
20
21 (define this-machine (make-machine "localhost" 1234))
22
23 (define next-actor-id 1)
24
25 (define (address-id address) (car address))
26 (define (address-machine address) (cdr address))
27
28 (define (address-local? address)
29   (equal? (address-machine address)
30           this-machine))
31
32 (define actor-table (make-hash-table))
33
34 (define (make-actor beh)
35   (let* ((id next-actor-id))
36     (hash-table-put! id beh)
37     (cons id this-machine)))
38   
39 (define (deliver-message address . message)
40   (let ((id (address-id address)))
41     (let ((behaviour (hash-table-ref/default actor-table id '()))))
42     (if (null? behaviour)
43         (print "Warning: discarded message" message " to unknown actor " address)
44         (match (apply (hash-table-ref actor-table id) (cons address message))
45           ('done (hash-table-delete! actor-table actor))
46           ('sleep 'do-nothing)
47           (new-beh (hash-table-put! actor new-beh))))))
48
49 ;; Scheduler
50
51 (define local-queue-mutex (make-mutex "message queue"))
52 (define message-available-mutex (make-mutex "message available"))
53 (define local-queue (make-fifo))
54
55 (define (send-message address . message)
56   (apply (if (address-local? address)
57              send-local-message
58              send-network-message)
59          message))
60
61 (define (send-local-message address . message)
62   (mutex-lock! local-queue-mutex)
63   (fifo-push local-queue (cons address message))
64   (mutex-unlock! local-queue-mutex))
65
66 (define (send-network-message address . message)
67   (let ((s (udp-open-socket))
68         (machine (address-machine address)))
69     (udp-bind! s #f 0)
70     (udp-connect! s
71                   (machine-host machine)
72                   (machine-port machine))
73     (udp-send s message)
74     (udp-close-socket s)))
75
76 (define (next-local-message)
77   (let ((res #f))
78     (mutex-lock! local-queue-mutex)
79     (set! res (if (fifo-empty? local-queue)
80                   #f
81                   (fifo-pop local-queue)))
82     (mutex-unlock! local-queue-mutex)
83     res))
84
85 (define scheduler-thread
86   (make-thread
87    (lambda ()
88      (let loop ((next-addressed-message (next-local-message)))
89        (if next-addressed-message
90            (apply deliver-message next-addressed-message)
91            (begin
92              (lo))))))
93
94   (thread-start!))