X-Git-Url: https://thelambdalab.xyz/gitweb/index.cgi?p=sam.git;a=blobdiff_plain;f=sam.scm;h=fbe800e53436138590f3674c7900b7e86e851e39;hp=c3dcbe6bb833d31dab70cc19cd9c0cb6ce12d3e2;hb=HEAD;hpb=9201f8e605d4704d0b4b780b4fd7a557511b39c9 diff --git a/sam.scm b/sam.scm index c3dcbe6..fbe800e 100644 --- a/sam.scm +++ b/sam.scm @@ -1,197 +1,289 @@ ;; Simple Actor Machine ;; -;; Houses a population of actors which can communicate using messages -;; with actors on the same machine or other machines via the network. - -(module sam - (init-sam - start-console - send-message - make-actor - system) - - (import scheme - (chicken base) - (chicken io) - (chicken string) - (chicken port) - (chicken process-context) - matchable - srfi-18 ; threads - srfi-69 ; hash-table - uuid ; ids for actors - uri-generic - udp - fifo) - - ;; Actors - - (define sam-host "localhost") - (define sam-port 8000) - - (define (make-address host port id) - (uri->string - (make-uri #:scheme "actor" - #:host host - #:port port - #:path (list '/ id)))) - - (define (make-local-address . args) - (make-address sam-host - sam-port - (if (null? args) - (uuid) - (car args)))) - - (define (address-id address) - (cadr (uri-path (uri-reference address)))) - - (define address->uri uri-reference) - - (define (address-local? address) - (let ((uri (address->uri address))) - (and (equal? (uri-host uri) sam-host) - (equal? (uri-port uri) sam-port)))) - - (define actor-table (make-hash-table)) - - (define (make-actor beh) - (let* ((address (make-local-address)) - (id (address-id address))) - (hash-table-set! actor-table id beh) - address)) - - (define (deliver-message address . message) - (let ((id (address-id address))) - (let ((behaviour (hash-table-ref/default actor-table id '()))) - (if (null? behaviour) - (print "Warning: discarded message" message " to unknown actor " address) - (match (apply (hash-table-ref actor-table id) (cons address message)) - ('done (hash-table-delete! actor-table id)) - ('sleep 'do-nothing) - (new-beh (hash-table-set! actor-table id new-beh))))))) - - ;; Scheduler - - (define local-queue-mutex (make-mutex "message queue")) - (define message-available-mutex (make-mutex "message available")) - (mutex-lock! message-available-mutex #f #f) - (define local-queue (make-fifo)) - - (define (send-message address . message) - (print "send-message: Sending " message " to " address) - (apply (if (address-local? address) - send-local-message - send-network-message) - (cons address message))) - - (define (send-local-message address . message) +;; A virtual machine which houses a population of actors which can +;; communicate using messages with actors on the same host or other +;; hosts via the network. + +(import scheme + (chicken base) + (chicken io) + (chicken string) + (chicken port) + (chicken process-context) + (chicken file) + (chicken condition) + matchable + srfi-18 ; threads + srfi-69 ; hash-table + uuid ; ids for actors + uri-generic + udp + fifo + sam-macros) + +;; Global variables + +(define trace #f) + +(define sam-host "localhost") +(define sam-port 8000) + +(define sam-version "0.1") + +;; Logging + +(define (log-msg . args) + (with-output-to-port (current-error-port) + (lambda () + (apply print (cons "## " args))))) + +(define (log-trace . args) + (with-output-to-port (current-error-port) + (lambda () + (if trace (apply log-msg args))))) + +(define (->stringrep arg) + (with-output-to-string + (lambda () + (write arg)))) + +;; Behaviours +;; (See also macros defined in sam-macros.scm.) + +(define (beh-proc beh) + (cadr beh)) +(define (beh-parent beh) + (caddr beh)) + +(define root-beh + (make-beh : #f (self) + (('ping recipient) => + (send-message recipient 'pong)))) + +(define (beh? x) + (and (pair? x) + (not (null? x)) + (eq? (car x) 'beh))) + +;; Actors + +(define (make-address host port id) + (list id host port)) + +(define (make-local-address . args) + (make-address sam-host + sam-port + (if (null? args) + (uuid) + (car args)))) + +(define (address-id address) + (car address)) +(define (address-host address) + (cadr address)) +(define (address-port address) + (caddr address)) +(define (address->string address) + (uri->string + (make-uri #:scheme "actor" + #:host (address-host address) + #:port (address-port address) + #:path (list '/ (address-id address))))) +(define (string->address str) + (let ((uri (uri-reference str))) + (make-address (uri-host uri) + (uri-port uri) + (cadr (uri-path uri))))) + +(define (address-local? address) + (and (equal? (address-host address) sam-host) + (equal? (address-port address) sam-port))) + +(define actor-table (make-hash-table)) + +(define (make-actor beh) + (let* ((address (make-local-address)) + (id (address-id address))) + (hash-table-set! actor-table id beh) + address)) + +(define (deliver-message address . message) + (let ((id (address-id address))) + (log-trace "DELIVERING to " id ": " (->stringrep message)) + (let loop ((beh (hash-table-ref/default actor-table id #f))) + (if beh + (condition-case + (match (apply (beh-proc beh) (cons address message)) + ('done (hash-table-delete! actor-table id)) + ('pass + (log-trace "Passing to parent behaviour...") + (loop (beh-parent beh))) + ((? beh? new-beh) (hash-table-set! actor-table id new-beh)) + (else + 'do-nothing)) ; sleep is now the default + (o (exn) + (log-msg "Warning: actor " id " crashed evaluating message " (->stringrep message)) + (print-error-message o))) + (log-msg "Warning: DISCARDING message to unknown actor " id ": " (->stringrep message)))))) + +;; Scheduler + +(define local-queue-mutex (make-mutex "message queue")) +(define message-available-mutex (make-mutex "message available")) +(mutex-lock! message-available-mutex #f #f) +(define local-queue (make-fifo)) + +(define (send-message address . message) + (log-trace "SENDING to " address ": " (->stringrep message)) + (apply (if (address-local? address) + send-local-message + send-network-message) + (cons address message))) + +(define (send-local-message address . message) + (mutex-lock! local-queue-mutex) + (fifo-push local-queue (cons address message)) + (mutex-unlock! message-available-mutex) + (mutex-unlock! local-queue-mutex)) + +(define (send-network-message address . message) + (let ((s (udp-open-socket)) + (packet (with-output-to-string + (lambda () + (write (cons address message)))))) + (udp-bind! s #f 0) + (udp-connect! s + (address-host address) + (address-port address)) + (udp-send s packet) + (udp-close-socket s))) + +(define (send-message-later address time . message) + (thread-start! + (lambda () + (thread-sleep! time) + (apply send-message (cons address message))))) + +(define (next-local-message) + (let ((res #f)) + (mutex-lock! message-available-mutex #f #f) (mutex-lock! local-queue-mutex) - (fifo-push local-queue (cons address message)) - (mutex-unlock! message-available-mutex) - (mutex-unlock! local-queue-mutex)) - - (define (send-network-message address . message) - (let ((s (udp-open-socket)) - (uri (address->uri address)) - (packet (with-output-to-string - (lambda () - (print (cons address message)))))) - (udp-bind! s #f 0) - (udp-connect! s - (uri-host uri) - (uri-port uri)) - (udp-send s packet) - (udp-close-socket s))) - - (define (next-local-message) - (let ((res #f)) - (mutex-lock! message-available-mutex #f #f) - (mutex-lock! local-queue-mutex) - (set! res (fifo-pop local-queue)) - (if (not (fifo-empty? local-queue)) - (mutex-unlock! message-available-mutex)) - (mutex-unlock! local-queue-mutex) - res)) - - (define (start-scheduler) - (thread-start! - (lambda () + (set! res (fifo-pop local-queue)) + (if (not (fifo-empty? local-queue)) + (mutex-unlock! message-available-mutex)) + (mutex-unlock! local-queue-mutex) + res)) + +(define (start-scheduler) + (let loop () + (apply deliver-message (next-local-message)) + (loop))) + + +;; Network + +(define (start-network-listener) + (thread-start! + (lambda () + (let ((s (udp-open-socket*))) + (udp-bind! s #f sam-port) (let loop () - (apply deliver-message (next-local-message)) - (loop))))) - - - ;; Network - - (define (start-network-listener) - (thread-start! - (lambda () - (let ((s (udp-open-socket*))) - (udp-bind! s #f sam-port) - (let loop () - (let-values (((n str) (udp-recv s 1024))) - (print "network-listener: Received " n " bytes over network: " str) - (match (with-input-from-string str read) - ((address message ...) - (apply send-message (cons address message))) - (else - (print "Warning: received badly formatted message string '" str "'")))) - (loop)))))) - - ;; System interface - - (define reader-queue-mutex (make-mutex "reader queue")) - (define reader-available-mutex (make-mutex "reader available")) - (mutex-lock! reader-available-mutex #f #f) - (define reader-queue (make-fifo)) - - (define (next-reader) - (let ((res #f)) - (mutex-lock! reader-available-mutex #f #f) - (mutex-lock! reader-queue-mutex) - (set! res (fifo-pop reader-queue)) - (if (not (fifo-empty? reader-queue)) - (mutex-unlock! reader-available-mutex)) - (mutex-unlock! reader-queue-mutex) - res)) - - (define (start-console) - (let loop () - (let ((reader (next-reader))) - (print "console: received next reader: " reader) - (##sys#thread-block-for-i/o! (current-thread) 0 #t) - (thread-yield!) - (send-message reader (read-line))) - (loop))) - - ;; System initialization - - (define (make-system-actor) - (make-actor (lambda (self . message) - (match message - - (('shutdown) - (print "## System actor received shutdown message.") - (exit 0) - 'done) - - (('print strings ...) - (apply print strings) - 'sleep) - - (('read reader) - (mutex-lock! reader-queue-mutex) - (fifo-push reader-queue reader) - (mutex-unlock! reader-available-mutex) - (mutex-unlock! reader-queue-mutex) - 'sleep))))) - - (define system #f) - - (define (init-sam host port) - (set! sam-host host) - (set! sam-port port) - (set! system (make-system-actor)) - (start-scheduler) - (start-network-listener))) + (let-values (((n str) (udp-recv s 65536))) + (match (with-input-from-string str read) + ((address message ...) + (apply send-message (cons address message))) + (else + (log-msg "Warning: received badly formatted message string '" str "'")))) + (loop)))))) + +;; System interface + +(define reader-queue-mutex (make-mutex "reader queue")) +(define reader-available-mutex (make-mutex "reader available")) +(mutex-lock! reader-available-mutex #f #f) +(define reader-queue (make-fifo)) + +(define (next-reader) + (let ((res #f)) + (mutex-lock! reader-available-mutex #f #f) + (mutex-lock! reader-queue-mutex) + (set! res (fifo-pop reader-queue)) + (if (not (fifo-empty? reader-queue)) + (mutex-unlock! reader-available-mutex)) + (mutex-unlock! reader-queue-mutex) + res)) + +(define (start-console) + (thread-start! + (lambda () + (let loop () + (let ((reader (next-reader))) + (##sys#thread-block-for-i/o! (current-thread) 0 #t) + (thread-yield!) + (send-message reader (read-line))) + (loop))))) + +;; System initialization + +(define-beh system-beh + (self) + + (('shutdown) => + (log-msg "System actor received shutdown message.") + (exit 0) + 'done) + + (('print strings ...) => + (apply print strings)) + + (('read reader) => + (mutex-lock! reader-queue-mutex) + (fifo-push reader-queue reader) + (mutex-unlock! reader-available-mutex) + (mutex-unlock! reader-queue-mutex))) + +(define (boot-sam) + (start-console) + (start-network-listener) + (let ((system (make-actor system-beh)) + (main #f)) + (condition-case + (begin + (set! main (make-actor main-beh))) + ((exn) + (log-msg "Error starting main actor. Is main-beh defined?") + (exit 1))) + (send-message main system)) + (start-scheduler)) + +(define (print-usage) + (print "Simple Actor Machine v" sam-version "\n" + "\n" + "Usage: sam -h|--help\n" + " sam [-n hostname] [-p port] source-file-1 [source-file-2 [...]] ")) + +(let loop ((args (cdr (argv)))) + (match args + (((or "-h" "--help")) + (print-usage)) + (((or "-p" "--port") pstr rest ...) + (set! sam-port (string->number pstr)) + (loop rest)) + (((or "-n" "--hostname") hstr rest ...) + (set! sam-host hstr) + (loop rest)) + (((or "-t" "--trace") rest ...) + (log-msg "Enabling trace debugging") + (set! trace #t) + (loop rest)) + (((? file-exists? filename) rest ...) + (log-msg "Loading " filename) + (load filename) + (loop rest)) + (() + (log-msg "Booting SAM\n") + (boot-sam)) + (else + (print "Unrecognised argument '" (car args) "'.\n") + (print-usage)))) +