;; Simple Actor Machine
;;
-;; Houses a population of actors which can communicate using messages
-;; with actors on the same machine or other machines via the network.
+;; A virtual machine which houses a population of actors which can
+;; communicate using messages with actors on the same host or other
+;; hosts via the network.
-(import (chicken io)
+(import scheme
+ (chicken base)
+ (chicken io)
(chicken string)
(chicken port)
+ (chicken process-context)
+ (chicken file)
+ (chicken condition)
matchable
srfi-18 ; threads
srfi-69 ; hash-table
uuid ; ids for actors
uri-generic
udp
- fifo)
+ fifo
+ sam-macros)
-;; Actors
+;; Global variables
-(define this-host "localhost")
-(define this-port 8000)
+(define trace #f)
-(define (make-address host port id)
- (make-uri #:scheme "actor"
- #:host host
- #:port port
- #:path (list '/ id)))
+(define sam-host "localhost")
+(define sam-port 8000)
+
+(define sam-version "0.1")
+
+;; Logging
+
+(define (log-msg . args)
+ (with-output-to-port (current-error-port)
+ (lambda ()
+ (apply print (cons "## " args)))))
+
+(define (log-trace . args)
+ (with-output-to-port (current-error-port)
+ (lambda ()
+ (if trace (apply log-msg args)))))
+
+(define (->stringrep arg)
+ (with-output-to-string
+ (lambda ()
+ (write arg))))
+
+;; Behaviours
+;; (See also macros defined in sam-macros.scm.)
-(define address->string uri->string)
-(define string->address uri-reference)
+(define (beh-proc beh)
+ (cadr beh))
+(define (beh-parent beh)
+ (caddr beh))
+
+(define root-beh
+ (make-beh : #f (self)
+ (('ping recipient) =>
+ (send-message recipient 'pong))))
+
+(define (beh? x)
+ (and (pair? x)
+ (not (null? x))
+ (eq? (car x) 'beh)))
+
+;; Actors
+
+(define (make-address host port id)
+ (list id host port))
(define (make-local-address . args)
- (make-address this-host
- this-port
+ (make-address sam-host
+ sam-port
(if (null? args)
(uuid)
(car args))))
-
-(define (address-id address) (cadr (uri-path address)))
+
+(define (address-id address)
+ (car address))
+(define (address-host address)
+ (cadr address))
+(define (address-port address)
+ (caddr address))
+(define (address->string address)
+ (uri->string
+ (make-uri #:scheme "actor"
+ #:host (address-host address)
+ #:port (address-port address)
+ #:path (list '/ (address-id address)))))
+(define (string->address str)
+ (let ((uri (uri-reference str)))
+ (make-address (uri-host uri)
+ (uri-port uri)
+ (cadr (uri-path uri)))))
(define (address-local? address)
- (and (equal? (uri-host address) this-host)
- (equal? (uri-port address) this-port)))
+ (and (equal? (address-host address) sam-host)
+ (equal? (address-port address) sam-port)))
(define actor-table (make-hash-table))
(id (address-id address)))
(hash-table-set! actor-table id beh)
address))
-
+
(define (deliver-message address . message)
(let ((id (address-id address)))
- (let ((behaviour (hash-table-ref/default actor-table id '())))
- (if (null? behaviour)
- (print "Warning: discarded message" message " to unknown actor " address)
- (match (apply (hash-table-ref actor-table id) (cons address message))
- ('done (hash-table-delete! actor-table id))
- ('sleep 'do-nothing)
- (new-beh (hash-table-set! actor-table id new-beh)))))))
+ (log-trace "DELIVERING to " id ": " (->stringrep message))
+ (let loop ((beh (hash-table-ref/default actor-table id #f)))
+ (if beh
+ (condition-case
+ (match (apply (beh-proc beh) (cons address message))
+ ('done (hash-table-delete! actor-table id))
+ ('pass
+ (log-trace "Passing to parent behaviour...")
+ (loop (beh-parent beh)))
+ ((? beh? new-beh) (hash-table-set! actor-table id new-beh))
+ (else
+ 'do-nothing)) ; sleep is now the default
+ (o (exn)
+ (log-msg "Warning: actor " id " crashed evaluating message " (->stringrep message))
+ (print-error-message o)))
+ (log-msg "Warning: DISCARDING message to unknown actor " id ": " (->stringrep message))))))
;; Scheduler
(define local-queue (make-fifo))
(define (send-message address . message)
+ (log-trace "SENDING to " address ": " (->stringrep message))
(apply (if (address-local? address)
send-local-message
send-network-message)
(define (send-network-message address . message)
(let ((s (udp-open-socket))
- (machine (address-machine address))
- (packet (with-output-to-string)
- (lambda ()
- (print (cons (address->string address) message)))))
+ (packet (with-output-to-string
+ (lambda ()
+ (write (cons address message))))))
(udp-bind! s #f 0)
(udp-connect! s
- (machine-host machine)
- (machine-port machine))
+ (address-host address)
+ (address-port address))
(udp-send s packet)
(udp-close-socket s)))
+(define (send-message-later address time . message)
+ (thread-start!
+ (lambda ()
+ (thread-sleep! time)
+ (apply send-message (cons address message)))))
+
(define (next-local-message)
(let ((res #f))
(mutex-lock! message-available-mutex #f #f)
(mutex-lock! local-queue-mutex)
(set! res (fifo-pop local-queue))
(if (not (fifo-empty? local-queue))
- (mutex-unlock! message-available-mutex))
+ (mutex-unlock! message-available-mutex))
(mutex-unlock! local-queue-mutex)
res))
(define (start-scheduler)
+ (let loop ()
+ (apply deliver-message (next-local-message))
+ (loop)))
+
+
+;; Network
+
+(define (start-network-listener)
(thread-start!
(lambda ()
- (let loop ()
- (apply deliver-message (next-local-message))
- (loop)))))
+ (let ((s (udp-open-socket*)))
+ (udp-bind! s #f sam-port)
+ (let loop ()
+ (let-values (((n str) (udp-recv s 65536)))
+ (match (with-input-from-string str read)
+ ((address message ...)
+ (apply send-message (cons address message)))
+ (else
+ (log-msg "Warning: received badly formatted message string '" str "'"))))
+ (loop))))))
-;; Console
+;; System interface
(define reader-queue-mutex (make-mutex "reader queue"))
(define reader-available-mutex (make-mutex "reader available"))
(mutex-lock! reader-available-mutex #f #f)
(define reader-queue (make-fifo))
-(define console
- (make-actor (lambda (self . message)
- (mutex-lock! reader-queue-mutex)
- (fifo-push reader-queue (car message))
- (mutex-unlock! reader-available-mutex)
- (mutex-unlock! reader-queue-mutex)
- 'sleep)))
-
(define (next-reader)
(let ((res #f))
(mutex-lock! reader-available-mutex #f #f)
res))
(define (start-console)
- (let loop ()
- (let ((reader (next-reader)))
- (##sys#thread-block-for-i/o! (current-thread) 0 #t)
- (thread-yield!)
- (send-message reader (read-line)))
- (loop)))
-
-
-;; Network
-
-(define (start-network-listener)
(thread-start!
(lambda ()
- (let ((s (udp-open-socket*)))
- (udp-bind! s #f this-port)
- (let loop ()
- (let-values (((n str) (udp-recv s 1024)))
- (print "Received " n " bytes over network: " str)
- (match (with-input-from-string str read)
- ((addr-str message ...)
- (apply send-message (cons (string->address addr-str) message)))
- (else
- (print "Warning: received badly formatted message string '" str "'"))))
- (loop))))))
-
-;; System interface
-
-(define system
- (make-actor (lambda (self . message)
- (match message
- (('shutdown)
- (print "## System actor received shutdown message.")
- (exit 0)
- 'done)
- (('println strings ...)
- (apply print strings)
- 'sleep)))))
-
-;; Testing
-
-
-(send-message system 'println "Hello, what is your name?")
-(send-message console
- (make-actor (lambda (self . message)
- (match message
- ((name)
- (send-message system 'println "Hello, " name "!")
- 'done)))))
-
-(thread-start!
- (lambda ()
- (thread-sleep! 120)
- (send-message system 'shutdown)))
-
-(print (uri->string system))
-
-(start-scheduler)
-(start-network-listener)
-(start-console)
+ (let loop ()
+ (let ((reader (next-reader)))
+ (##sys#thread-block-for-i/o! (current-thread) 0 #t)
+ (thread-yield!)
+ (send-message reader (read-line)))
+ (loop)))))
-(define (boot-sam host port)
+;; System initialization
+
+(define-beh system-beh
+ (self)
+
+ (('shutdown) =>
+ (log-msg "System actor received shutdown message.")
+ (exit 0)
+ 'done)
+
+ (('print strings ...) =>
+ (apply print strings))
+
+ (('read reader) =>
+ (mutex-lock! reader-queue-mutex)
+ (fifo-push reader-queue reader)
+ (mutex-unlock! reader-available-mutex)
+ (mutex-unlock! reader-queue-mutex)))
+
+(define (boot-sam)
+ (start-console)
+ (start-network-listener)
+ (let ((system (make-actor system-beh))
+ (main #f))
+ (condition-case
+ (begin
+ (set! main (make-actor main-beh)))
+ ((exn)
+ (log-msg "Error starting main actor. Is main-beh defined?")
+ (exit 1)))
+ (send-message main system))
(start-scheduler))
-;; (thread-join! scheduler-thread)
-
-(define (main)
- (let loop ((args (cdr (argv)))
- (host "localhost")
- (port 8000))
- (match args
- ((or "-h" "--help")
- (print-usage))
- (((or "-p" "--port") pstr rest ...)
- (loop rest host (string->number pstr)))
- (("--hostname" hstr rest ...)
- (loop rest hstr port))
- (()
- (boot-sam host port)))))
+(define (print-usage)
+ (print "Simple Actor Machine v" sam-version "\n"
+ "\n"
+ "Usage: sam -h|--help\n"
+ " sam [-n hostname] [-p port] source-file-1 [source-file-2 [...]] "))
+
+(let loop ((args (cdr (argv))))
+ (match args
+ (((or "-h" "--help"))
+ (print-usage))
+ (((or "-p" "--port") pstr rest ...)
+ (set! sam-port (string->number pstr))
+ (loop rest))
+ (((or "-n" "--hostname") hstr rest ...)
+ (set! sam-host hstr)
+ (loop rest))
+ (((or "-t" "--trace") rest ...)
+ (log-msg "Enabling trace debugging")
+ (set! trace #t)
+ (loop rest))
+ (((? file-exists? filename) rest ...)
+ (log-msg "Loading " filename)
+ (load filename)
+ (loop rest))
+ (()
+ (log-msg "Booting SAM\n")
+ (boot-sam))
+ (else
+ (print "Unrecognised argument '" (car args) "'.\n")
+ (print-usage))))
+