;; Simple Actor Machine
;;
;; A virtual machine which houses a population of actors which can
;; communicate using messages with actors on the same host or other
;; hosts via the network.

(import scheme
        (chicken base)
        (chicken io)
        (chicken string)
        (chicken port)
        (chicken process-context)
        (chicken file)
        (chicken condition)
        matchable
        srfi-18 ; threads
        srfi-69 ; hash-table
        uuid ; ids for actors
        uri-generic
        udp
        fifo
        sam-macros)

;; Global variables

;; When true, log-trace emits its messages; enabled by -t/--trace.
(define trace #f)
;; Host and port that identify this machine in actor addresses; the
;; port is also the UDP port the network listener binds to.
(define sam-host "localhost")
(define sam-port 8000)
(define sam-version "0.1")

;; Logging

;; Print ARGS to the current error port, prefixed with "## ".
(define (log-msg . args)
  (with-output-to-port (current-error-port)
    (lambda ()
      (apply print "## " args))))

;; Like log-msg, but only when the global `trace` flag is set.
(define (log-trace . args)
  (if trace (apply log-msg args)))

;; Return the `write` representation of ARG as a string.
(define (->stringrep arg)
  (with-output-to-string
    (lambda ()
      (write arg))))

;; Behaviours
;; (See also macros defined in sam-macros.scm.)
;;
;; As far as this file is concerned, a behaviour is a list whose car
;; is the symbol `beh`, whose second element is the procedure applied
;; to incoming messages and whose third element is the parent
;; behaviour (or #f).

;; The message-handling procedure of behaviour BEH.
(define (beh-proc beh) (cadr beh))

;; The parent behaviour of BEH, consulted when BEH returns 'pass.
(define (beh-parent beh) (caddr beh))

;; Behaviour with no parent (#f).  Its single clause answers
;; ('ping recipient) by sending 'pong to RECIPIENT.
(define root-beh
  (make-beh : #f (self)
            (('ping recipient) =>
             (send-message recipient 'pong))))

;; True when X is a pair tagged with the symbol `beh`.
(define (beh? x)
  (and (pair? x)
       (eq? (car x) 'beh)))

;; Actors
;;
;; An address is the list (id host port).

;; Build an address from its components.  Note the argument order
;; (host port id) differs from the stored order (id host port).
(define (make-address host port id)
  (list id host port))

;; Build an address on this machine.  The optional first argument is
;; the id to use; when omitted a fresh uuid is generated.
(define (make-local-address . args)
  (make-address sam-host
                sam-port
                (if (null? args)
                    (uuid)
                    (car args))))

(define (address-id address) (car address))
(define (address-host address) (cadr address))
(define (address-port address) (caddr address))

;; Render ADDRESS as a URI string with scheme "actor".
(define (address->string address)
  (uri->string
   (make-uri #:scheme "actor"
             #:host (address-host address)
             #:port (address-port address)
             #:path (list '/ (address-id address)))))

;; Parse a URI string into an address; the id is taken from the
;; second element of the URI path.
(define (string->address str)
  (let ((uri (uri-reference str)))
    (make-address (uri-host uri)
                  (uri-port uri)
                  (cadr (uri-path uri)))))

;; True when ADDRESS names this machine's host and port.
(define (address-local? address)
  (and (equal? (address-host address) sam-host)
       (equal? (address-port address) sam-port)))

;; Maps actor id -> current behaviour.
(define actor-table (make-hash-table))

;; Register a new local actor with behaviour BEH and return its
;; freshly generated address.
(define (make-actor beh)
  (let* ((address (make-local-address))
         (id (address-id address)))
    (hash-table-set! actor-table id beh)
    address))

;; Apply the behaviour of the actor at ADDRESS to MESSAGE.
;;
;; The behaviour procedure is called with ADDRESS followed by the
;; elements of MESSAGE.  Its result decides what happens next:
;;   'done        - the actor is removed from the actor table
;;   'pass        - the message is retried with the parent behaviour
;;   a behaviour  - it replaces the actor's current behaviour
;;   anything else - the actor keeps its current behaviour
;; An exception raised while evaluating the message is logged and the
;; actor is left as it was.  If there is no behaviour to try (the id
;; is not in the table, or the parent chain ran out) the message is
;; discarded with a warning.
(define (deliver-message address . message)
  (let ((id (address-id address)))
    (log-trace "DELIVERING to " id ": " (->stringrep message))
    (let loop ((beh (hash-table-ref/default actor-table id #f)))
      (if beh
          (condition-case
              (match (apply (beh-proc beh) address message)
                ('done
                 (hash-table-delete! actor-table id))
                ('pass
                 (log-trace "Passing to parent behaviour...")
                 (loop (beh-parent beh)))
                ((? beh? new-beh)
                 (hash-table-set! actor-table id new-beh))
                (else 'do-nothing)) ; sleep is now the default
            (o (exn)
               (log-msg "Warning: actor " id " crashed evaluating message "
                        (->stringrep message))
               (print-error-message o)))
          (log-msg "Warning: DISCARDING message to unknown actor " id ": "
                   (->stringrep message))))))

;; Blocking queues
;;
;; A blocking queue is a fifo guarded by two mutexes: QUEUE-MUTEX
;; protects the fifo itself, while AVAILABLE-MUTEX is used as a
;; signal - it is held (locked with no owner) while the fifo is empty
;; and released whenever there is something to pop.

;; Append ITEM to QUEUE and wake any consumer waiting in
;; blocking-queue-pop!.
(define (blocking-queue-push! queue queue-mutex available-mutex item)
  (mutex-lock! queue-mutex)
  (fifo-push queue item)
  (mutex-unlock! available-mutex)
  (mutex-unlock! queue-mutex))

;; Remove and return the next item of QUEUE, blocking the calling
;; thread while the queue is empty.
(define (blocking-queue-pop! queue queue-mutex available-mutex)
  ;; Wait for the "available" signal; locking with thread #f leaves
  ;; the mutex locked but unowned so any thread may release it.
  (mutex-lock! available-mutex #f #f)
  (mutex-lock! queue-mutex)
  (let ((item (fifo-pop queue)))
    ;; Re-arm the signal if more items remain.
    (if (not (fifo-empty? queue))
        (mutex-unlock! available-mutex))
    (mutex-unlock! queue-mutex)
    item))

;; Scheduler

(define local-queue-mutex (make-mutex "message queue"))
(define message-available-mutex (make-mutex "message available"))
;; The queue starts empty, so start with the "available" signal held.
(mutex-lock! message-available-mutex #f #f)
(define local-queue (make-fifo))

;; Send MESSAGE to the actor at ADDRESS, via the local queue when the
;; address is on this machine and via UDP otherwise.
(define (send-message address . message)
  (log-trace "SENDING to " address ": " (->stringrep message))
  (apply (if (address-local? address)
             send-local-message
             send-network-message)
         address message))

;; Enqueue (ADDRESS . MESSAGE) for delivery by the scheduler.
(define (send-local-message address . message)
  (blocking-queue-push! local-queue
                        local-queue-mutex
                        message-available-mutex
                        (cons address message)))

;; Send (ADDRESS . MESSAGE), in `write` syntax, as a single UDP
;; datagram to the host and port named in ADDRESS.
;;
;; The socket is closed even when binding, connecting or sending
;; raises; the exception itself still propagates to the caller.
(define (send-network-message address . message)
  (let ((s (udp-open-socket))
        (packet (->stringrep (cons address message))))
    (dynamic-wind
      (lambda () #f)
      (lambda ()
        (udp-bind! s #f 0)
        (udp-connect! s (address-host address) (address-port address))
        (udp-send s packet))
      (lambda ()
        (udp-close-socket s)))))

;; Send MESSAGE to ADDRESS after TIME has elapsed, using a helper
;; thread that sleeps via thread-sleep!.  Returns the started thread.
(define (send-message-later address time . message)
  (thread-start!
   (lambda ()
     (thread-sleep! time)
     (apply send-message address message))))

;; Block until a local message is queued, then return it as a list
;; (address . message).
(define (next-local-message)
  (blocking-queue-pop! local-queue
                       local-queue-mutex
                       message-available-mutex))

;; Deliver queued local messages forever.  Never returns.
(define (start-scheduler)
  (let loop ()
    (apply deliver-message (next-local-message))
    (loop)))

;; Network

;; Start a thread that listens for UDP datagrams on sam-port and
;; forwards every well-formed (address message ...) datum through
;; send-message.
;;
;; A datagram that cannot be read, or whose handling raises, is logged
;; and skipped so that one bad packet cannot stop the listener.
;; NOTE(review): datagrams are parsed with `read` and come from the
;; network unauthenticated - confirm this is acceptable for the
;; deployment.
(define (start-network-listener)
  (thread-start!
   (lambda ()
     (let ((s (udp-open-socket*)))
       (udp-bind! s #f sam-port)
       (let loop ()
         (let-values (((n str) (udp-recv s 65536)))
           (condition-case
               (match (with-input-from-string str read)
                 ((address message ...)
                  (apply send-message address message))
                 (else
                  (log-msg "Warning: received badly formatted message string '"
                           str "'")))
             (o (exn)
                (log-msg "Warning: error handling network message string '"
                         str "'")
                (print-error-message o))))
         (loop))))))

;; System interface

(define reader-queue-mutex (make-mutex "reader queue"))
(define reader-available-mutex (make-mutex "reader available"))
;; The queue starts empty, so start with the "available" signal held.
(mutex-lock! reader-available-mutex #f #f)
(define reader-queue (make-fifo))

;; Block until an actor has asked for console input, then return its
;; address.
(define (next-reader)
  (blocking-queue-pop! reader-queue
                       reader-queue-mutex
                       reader-available-mutex))

;; Start a thread that serves console read requests: for each queued
;; reader it reads one line from standard input and sends it to that
;; reader.
(define (start-console)
  (thread-start!
   (lambda ()
     (let loop ()
       (let ((reader (next-reader)))
         ;; Presumably parks this thread until fd 0 (stdin) is
         ;; readable, so read-line does not stall the other threads.
         (##sys#thread-block-for-i/o! (current-thread) 0 #t)
         (thread-yield!)
         (send-message reader (read-line)))
       (loop)))))

;; System initialization

;; Behaviour of the system actor:
;;   ('shutdown)          - log and exit the process with status 0
;;   ('print strings ...) - print STRINGS
;;   ('read reader)       - queue READER to receive the next console line
(define-beh system-beh
  (self)
  (('shutdown) =>
   (log-msg "System actor received shutdown message.")
   (exit 0)
   'done)
  (('print strings ...) =>
   (apply print strings))
  (('read reader) =>
   (blocking-queue-push! reader-queue
                         reader-queue-mutex
                         reader-available-mutex
                         reader)))

;; Start the console and network threads, create the system actor and
;; the main actor (from `main-beh`, which this file does not define -
;; it is expected to come from a loaded source file), send the system
;; actor's address to the main actor and run the scheduler.  Exits
;; with status 1 if the main actor cannot be created.
(define (boot-sam)
  (start-console)
  (start-network-listener)
  (let ((system (make-actor system-beh))
        (main #f))
    (condition-case
        (begin
          (set! main (make-actor main-beh)))
      ((exn)
       (log-msg "Error starting main actor. Is main-beh defined?")
       (exit 1)))
    (send-message main system))
  (start-scheduler))

;; Print the version banner and command-line synopsis.
(define (print-usage)
  (print "Simple Actor Machine v" sam-version "\n"
         "\n"
         "Usage: sam -h|--help\n"
         " sam [-t|--trace] [-n hostname] [-p port] source-file-1 [source-file-2 [...]] "))

;; Command-line processing: options are consumed left to right, each
;; existing file named is loaded, and once the arguments are exhausted
;; the machine is booted.
(let loop ((args (cdr (argv))))
  (match args
    (((or "-h" "--help"))
     (print-usage))
    (((or "-p" "--port") pstr rest ...)
     (let ((port (string->number pstr)))
       ;; Reject values that are not usable port numbers instead of
       ;; silently storing #f (or a fraction) in sam-port.
       (if (and port (integer? port) (exact? port) (<= 0 port 65535))
           (begin
             (set! sam-port port)
             (loop rest))
           (begin
             (print "Invalid port '" pstr "'.\n")
             (print-usage)))))
    (((or "-n" "--hostname") hstr rest ...)
     (set! sam-host hstr)
     (loop rest))
    (((or "-t" "--trace") rest ...)
     (log-msg "Enabling trace debugging")
     (set! trace #t)
     (loop rest))
    (((? file-exists? filename) rest ...)
     (log-msg "Loading " filename)
     (load filename)
     (loop rest))
    (()
     (log-msg "Booting SAM\n")
     (boot-sam))
    (else
     (print "Unrecognised argument '" (car args) "'.\n")
     (print-usage))))