;; Simple Actor Machine
;;
-;; Houses a population of actors which can communicate using messages
-;; with actors on the same machine or other machines via the network.
-
-(import (chicken io)
- (chicken string)
- matchable
- srfi-18 ; threads
- srfi-69 ; hash-table
- udp
- fifo)
-
-;; Actors
-
-(define (make-machine host port)
- (cons host port))
-(define (machine-host m) (car m))
-(define (machine-port m) (cdr m))
-
-(define this-machine (make-machine "localhost" 1234))
-
-(define next-actor-id
- (let ((mutex (make-mutex "actor id mutex"))
- (next-id 1))
- (lambda ()
- (let ((res #f))
- (mutex-lock! mutex)
- (set! res next-id)
- (set! next-id (+ next-id 1))
- (mutex-unlock! mutex)
- res))))
-
-(define (address-id address) (car address))
-(define (address-machine address) (cdr address))
-(define (make-address id machine)
- (cons id machine))
-
-(define (address-local? address)
- (equal? (address-machine address)
- this-machine))
-
-(define actor-table (make-hash-table))
-
-(define (make-actor beh)
- (let* ((id (next-actor-id)))
- (hash-table-set! actor-table id beh)
- (make-address id this-machine)))
+;; A virtual machine which houses a population of actors which can
+;; communicate using messages with actors on the same machine or other
+;; machines via the network.
+
+(module sam
+ (boot-sam
+ make-actor
+ send-message
+ send-message-later
+ address->string
+ string->address)
+
+ (import scheme
+ (chicken base)
+ (chicken io)
+ (chicken string)
+ (chicken port)
+ (chicken process-context)
+ matchable
+ srfi-18 ; threads
+ srfi-69 ; hash-table
+ uuid ; ids for actors
+ udp
+ fifo)
+
+ (define trace #f)
+
+ ;; Actors
+
+ (define sam-host "localhost")
+ (define sam-port 8000)
+
+ (define (make-address host port id)
+ (list id host port))
+
+ (define (make-local-address . args)
+ (make-address sam-host
+ sam-port
+ (if (null? args)
+ (uuid)
+ (car args))))
-(define (deliver-message address . message)
- (let ((id (address-id address)))
- (let ((behaviour (hash-table-ref/default actor-table id '())))
- (if (null? behaviour)
- (print "Warning: discarded message" message " to unknown actor " address)
- (match (apply (hash-table-ref actor-table id) (cons address message))
- ('done (hash-table-delete! actor-table id))
- ('sleep 'do-nothing)
- (new-beh (hash-table-set! actor-table id new-beh)))))))
-
-;; Scheduler
-
-(define local-queue-mutex (make-mutex "message queue"))
-(define message-available-mutex (make-mutex "message available"))
-(mutex-lock! message-available-mutex #f #f)
-(define local-queue (make-fifo))
-
-(define (send-message address . message)
- (apply (if (address-local? address)
- send-local-message
- send-network-message)
- (cons address message)))
-
-(define (send-local-message address . message)
- (mutex-lock! local-queue-mutex)
- (fifo-push local-queue (cons address message))
- (mutex-unlock! message-available-mutex)
- (mutex-unlock! local-queue-mutex))
-
-(define (send-network-message address . message)
- (let ((s (udp-open-socket))
- (machine (address-machine address)))
- (udp-bind! s #f 0)
- (udp-connect! s
- (machine-host machine)
- (machine-port machine))
- (udp-send s message)
- (udp-close-socket s)))
-
-(define (next-local-message)
- (let ((res #f))
- (mutex-lock! message-available-mutex #f #f)
+ (define (address-id address)
+ (car address))
+ (define (address-host address)
+ (cadr address))
+ (define (address-port address)
+ (caddr address))
+ (define (address->string address)
+ (with-output-to-string
+ (lambda () (write address))))
+ (define (string->address str)
+ (with-input-from-string str read))
+
+ (define (address-local? address)
+ (and (equal? (address-host address) sam-host)
+ (equal? (address-port address) sam-port)))
+
+ (define actor-table (make-hash-table))
+
+ (define (make-actor beh)
+ (let* ((address (make-local-address))
+ (id (address-id address)))
+ (hash-table-set! actor-table id beh)
+ address))
+
+ (define (deliver-message address . message)
+ (if trace (print "Delivering to " address ": " message))
+ (let ((id (address-id address)))
+ (let ((behaviour (hash-table-ref/default actor-table id '())))
+ (if (null? behaviour)
+ (print "Warning: discarded message" message " to unknown actor " address)
+ (match (apply (hash-table-ref actor-table id) (cons address message))
+ ('done (hash-table-delete! actor-table id))
+ ('sleep 'do-nothing)
+ (new-beh (hash-table-set! actor-table id new-beh)))))))
+
+ ;; Scheduler
+
+ (define local-queue-mutex (make-mutex "message queue"))
+ (define message-available-mutex (make-mutex "message available"))
+ (mutex-lock! message-available-mutex #f #f)
+ (define local-queue (make-fifo))
+
+ (define (send-message address . message)
+ (apply (if (address-local? address)
+ send-local-message
+ send-network-message)
+ (cons address message)))
+
+ (define (send-local-message address . message)
(mutex-lock! local-queue-mutex)
- (set! res (fifo-pop local-queue))
- (if (not (fifo-empty? local-queue))
- (mutex-unlock! message-available-mutex))
- (mutex-unlock! local-queue-mutex)
- res))
-
-(define (start-scheduler)
- (thread-start!
- (lambda ()
- (let loop ()
- (apply deliver-message (next-local-message))
- (loop)))))
-
-;; Console
-
-(define reader-queue-mutex (make-mutex "reader queue"))
-(define reader-available-mutex (make-mutex "reader available"))
-(mutex-lock! reader-available-mutex #f #f)
-(define reader-queue (make-fifo))
-
-(define console
- (make-actor (lambda (self . message)
- (mutex-lock! reader-queue-mutex)
- (fifo-push reader-queue (car message))
- (mutex-unlock! reader-available-mutex)
- (mutex-unlock! reader-queue-mutex)
- 'sleep)))
-
-(define (next-reader)
- (let ((res #f))
- (mutex-lock! reader-available-mutex #f #f)
- (mutex-lock! reader-queue-mutex)
- (set! res (fifo-pop reader-queue))
- (if (not (fifo-empty? reader-queue))
- (mutex-unlock! reader-available-mutex))
- (mutex-unlock! reader-queue-mutex)
- res))
-
-(define (start-console)
- (let loop ()
- (let ((reader (next-reader)))
- (##sys#thread-block-for-i/o! (current-thread) 0 #t)
- (thread-yield!)
- (send-message reader (read-line)))
- (loop)))
-
-
-;; System interface
-
-(define system
- (make-actor (lambda (self . message)
- (match message
- (('shutdown)
- (print "## System actor received shutdown message.")
- (exit 0)
- 'done)
- (('println strings ...)
- (apply print strings)
- 'sleep)))))
-
-;; Testing
-
-
-(send-message system 'println "Hello, what is your name?")
-(send-message console
- (make-actor (lambda (self . message)
- (match message
- ((name)
- (send-message system 'println "Hello, " name "!")
- 'done)))))
-
-(thread-start!
- (lambda ()
- (thread-sleep! 10)
- (send-message system 'shutdown)))
-
-(start-scheduler)
-(start-console)
-
-;; (thread-join! scheduler-thread)
+ (fifo-push local-queue (cons address message))
+ (mutex-unlock! message-available-mutex)
+ (mutex-unlock! local-queue-mutex))
+
+ (define (send-network-message address . message)
+ (let ((s (udp-open-socket))
+ (packet (with-output-to-string
+ (lambda ()
+ (write (cons address message))))))
+ (udp-bind! s #f 0)
+ (udp-connect! s
+ (address-host address)
+ (address-port address))
+ (udp-send s packet)
+ (udp-close-socket s)))
+
+ (define (send-message-later address time . message)
+ (thread-start!
+ (lambda ()
+ (thread-sleep! time)
+ (apply send-message (cons address message)))))
+
+ (define (next-local-message)
+ (let ((res #f))
+ (mutex-lock! message-available-mutex #f #f)
+ (mutex-lock! local-queue-mutex)
+ (set! res (fifo-pop local-queue))
+ (if (not (fifo-empty? local-queue))
+ (mutex-unlock! message-available-mutex))
+ (mutex-unlock! local-queue-mutex)
+ res))
+
+ (define (start-scheduler)
+ (let loop ()
+ (apply deliver-message (next-local-message))
+ (loop)))
+
+
+ ;; Network
+
+ (define (start-network-listener)
+ (thread-start!
+ (lambda ()
+ (let ((s (udp-open-socket*)))
+ (udp-bind! s #f sam-port)
+ (let loop ()
+ (let-values (((n str) (udp-recv s 65536)))
+ (match (with-input-from-string str read)
+ ((address message ...)
+ (apply send-message (cons address message)))
+ (else
+ (print "Warning: received badly formatted message string '" str "'"))))
+ (loop))))))
+
+ ;; System interface
+
+ (define reader-queue-mutex (make-mutex "reader queue"))
+ (define reader-available-mutex (make-mutex "reader available"))
+ (mutex-lock! reader-available-mutex #f #f)
+ (define reader-queue (make-fifo))
+
+ (define (next-reader)
+ (let ((res #f))
+ (mutex-lock! reader-available-mutex #f #f)
+ (mutex-lock! reader-queue-mutex)
+ (set! res (fifo-pop reader-queue))
+ (if (not (fifo-empty? reader-queue))
+ (mutex-unlock! reader-available-mutex))
+ (mutex-unlock! reader-queue-mutex)
+ res))
+
+ (define (start-console)
+ (thread-start!
+ (lambda ()
+ (let loop ()
+ (let ((reader (next-reader)))
+ (##sys#thread-block-for-i/o! (current-thread) 0 #t)
+ (thread-yield!)
+ (send-message reader (read-line)))
+ (loop)))))
+
+ ;; System initialization
+
+ (define (system-beh self . message)
+ (match message
+
+ (('shutdown)
+ (print "## System actor received shutdown message.")
+ (exit 0)
+ 'done)
+
+ (('print strings ...)
+ (apply print strings)
+ 'sleep)
+
+ (('read reader)
+ (mutex-lock! reader-queue-mutex)
+ (fifo-push reader-queue reader)
+ (mutex-unlock! reader-available-mutex)
+ (mutex-unlock! reader-queue-mutex)
+ 'sleep)))
+
+ (define (boot-sam host port main-beh)
+ (set! sam-host host)
+ (set! sam-port port)
+ (start-console)
+ (start-network-listener)
+ (send-message (make-actor main-beh) (make-actor system-beh))
+ (start-scheduler)))