Behaviours are now tagged lists.
[sam.git] / sam.scm
diff --git a/sam.scm b/sam.scm
index 29da169..fbe800e 100644 (file)
--- a/sam.scm
+++ b/sam.scm
 ;; Simple Actor Machine
 ;;
-;; Houses a population of actors which can communicate using messages
-;; with actors on the same machine or other machines via the network.
-
-(module sam
-    (init-sam
-     start-console
-     send-message
-     make-actor
-     system)
-
-  (import scheme
-          (chicken base)
-          (chicken io)
-          (chicken string)
-          (chicken port)
-          (chicken process-context)
-          matchable
-          srfi-18 ; threads
-          srfi-69 ; hash-table
-          uuid ; ids for actors
-          uri-generic
-          udp
-          fifo)
-
-  ;; Actors
-
-  (define sam-host "localhost")
-  (define sam-port 8000)
-
-  (define (make-address host port id)
-    (uri->string
-     (make-uri #:scheme "actor"
-               #:host host
-               #:port port
-               #:path (list '/ id))))
-
-  (define (make-local-address . args)
-    (make-address sam-host
-                  sam-port
-                  (if (null? args)
-                      (uuid)
-                      (car args))))
-  
-  (define (address-id address)
-    (cadr (uri-path (uri-reference address))))
-
-  (define address->uri uri-reference)
-
-  (define (address-local? address)
-    (let ((uri (address->uri address)))
-      (and (equal? (uri-host uri) sam-host)
-           (equal? (uri-port uri) sam-port))))
-
-  (define actor-table (make-hash-table))
-
-  (define (make-actor beh)
-    (let* ((address (make-local-address))
-           (id (address-id address)))
-      (hash-table-set! actor-table id beh)
-      address))
-  
-  (define (deliver-message address . message)
-    (let ((id (address-id address)))
-      (let ((behaviour (hash-table-ref/default actor-table id '())))
-        (if (null? behaviour)
-            (print "Warning: discarded message" message " to unknown actor " address)
-            (match (apply (hash-table-ref actor-table id) (cons address message))
-              ('done (hash-table-delete! actor-table id))
-              ('sleep 'do-nothing)
-              (new-beh (hash-table-set! actor-table id new-beh)))))))
-
-  ;; Scheduler
-
-  (define local-queue-mutex (make-mutex "message queue"))
-  (define message-available-mutex (make-mutex "message available"))
-  (mutex-lock! message-available-mutex #f #f)
-  (define local-queue (make-fifo))
-
-  (define (send-message address . message)
-    (apply (if (address-local? address)
-               send-local-message
-               send-network-message)
-           (cons address message)))
-
-  (define (send-local-message address . message)
+;; A virtual machine which houses a population of actors which can
+;; communicate using messages with actors on the same host or other
+;; hosts via the network.
+
+(import scheme
+        (chicken base)
+        (chicken io)
+        (chicken string)
+        (chicken port)
+        (chicken process-context)
+        (chicken file)
+        (chicken condition)
+        matchable
+        srfi-18 ; threads
+        srfi-69 ; hash-table
+        uuid ; ids for actors
+        uri-generic
+        udp
+        fifo
+        sam-macros)
+
+;; Global variables
+
+(define trace #f)
+
+(define sam-host "localhost")
+(define sam-port 8000)
+
+(define sam-version "0.1")
+
+;; Logging
+
+(define (log-msg . args)
+  (with-output-to-port (current-error-port)
+    (lambda ()
+      (apply print (cons "## " args)))))
+
+(define (log-trace . args)
+  (with-output-to-port (current-error-port)
+    (lambda ()
+      (if trace (apply log-msg args)))))
+
+(define (->stringrep arg)
+  (with-output-to-string
+    (lambda ()
+      (write arg))))
+
+;; Behaviours
+;; (See also macros defined in sam-macros.scm.)
+
+(define (beh-proc beh)
+  (cadr beh))
+(define (beh-parent beh)
+  (caddr beh))  
+
+(define root-beh
+  (make-beh : #f (self)
+            (('ping recipient) =>
+             (send-message recipient 'pong))))
+
+(define (beh? x)
+  (and (pair? x)
+       (not (null? x))
+       (eq? (car x) 'beh)))
+
+;; Actors
+
+(define (make-address host port id)
+  (list id host port))
+
+(define (make-local-address . args)
+  (make-address sam-host
+                sam-port
+                (if (null? args)
+                    (uuid)
+                    (car args))))
+
+(define (address-id address)
+  (car address))
+(define (address-host address)
+  (cadr address))
+(define (address-port address)
+  (caddr address))
+(define (address->string address)
+  (uri->string
+   (make-uri #:scheme "actor"
+             #:host (address-host address)
+             #:port (address-port address)
+             #:path (list '/ (address-id address)))))
+(define (string->address str)
+  (let ((uri (uri-reference str)))
+    (make-address (uri-host uri)
+                  (uri-port uri)
+                  (cadr (uri-path uri)))))
+
+(define (address-local? address)
+  (and (equal? (address-host address) sam-host)
+       (equal? (address-port address) sam-port)))
+
+(define actor-table (make-hash-table))
+
+(define (make-actor beh)
+  (let* ((address (make-local-address))
+         (id (address-id address)))
+    (hash-table-set! actor-table id beh)
+    address))
+
+(define (deliver-message address . message)
+  (let ((id (address-id address)))
+    (log-trace "DELIVERING to " id ": " (->stringrep message))
+    (let loop ((beh (hash-table-ref/default actor-table id #f)))
+      (if beh
+          (condition-case
+              (match (apply (beh-proc beh) (cons address message))
+                ('done (hash-table-delete! actor-table id))
+                ('pass
+                 (log-trace "Passing to parent behaviour...")
+                 (loop (beh-parent beh)))
+                ((? beh? new-beh) (hash-table-set! actor-table id new-beh))
+                (else
+                 'do-nothing)) ; sleep is now the default
+            (o (exn)
+             (log-msg "Warning: actor " id " crashed evaluating message " (->stringrep message))
+             (print-error-message o)))
+          (log-msg "Warning: DISCARDING message to unknown actor " id ": " (->stringrep message))))))
+
+;; Scheduler
+
+(define local-queue-mutex (make-mutex "message queue"))
+(define message-available-mutex (make-mutex "message available"))
+(mutex-lock! message-available-mutex #f #f)
+(define local-queue (make-fifo))
+
+(define (send-message address . message)
+  (log-trace "SENDING to " address ": " (->stringrep message))
+  (apply (if (address-local? address)
+             send-local-message
+             send-network-message)
+         (cons address message)))
+
+(define (send-local-message address . message)
+  (mutex-lock! local-queue-mutex)
+  (fifo-push local-queue (cons address message))
+  (mutex-unlock! message-available-mutex)
+  (mutex-unlock! local-queue-mutex))
+
+(define (send-network-message address . message)
+  (let ((s (udp-open-socket))
+        (packet (with-output-to-string
+                  (lambda ()
+                    (write (cons address message))))))
+    (udp-bind! s #f 0)
+    (udp-connect! s
+                  (address-host address)
+                  (address-port address))
+    (udp-send s packet)
+    (udp-close-socket s)))
+
+(define (send-message-later address time . message)
+  (thread-start!
+   (lambda ()
+     (thread-sleep! time)
+     (apply send-message (cons address message)))))
+
+(define (next-local-message)
+  (let ((res #f))
+    (mutex-lock! message-available-mutex #f #f)
     (mutex-lock! local-queue-mutex)
-    (fifo-push local-queue (cons address message))
-    (mutex-unlock! message-available-mutex)
-    (mutex-unlock! local-queue-mutex))
-
-  (define (send-network-message address . message)
-    (let ((s (udp-open-socket))
-          (uri (address->uri address))
-          (packet (with-output-to-string
-                    (lambda ()
-                      (write (cons address message))))))
-      (udp-bind! s #f 0)
-      (udp-connect! s
-                    (uri-host uri)
-                    (uri-port uri))
-      (udp-send s packet)
-      (udp-close-socket s)))
-
-  (define (next-local-message)
-    (let ((res #f))
-      (mutex-lock! message-available-mutex #f #f)
-      (mutex-lock! local-queue-mutex)
-      (set! res (fifo-pop local-queue))
-      (if (not (fifo-empty? local-queue))
-          (mutex-unlock! message-available-mutex))
-      (mutex-unlock! local-queue-mutex)
-      res))
-
-  (define (start-scheduler)
-    (thread-start!
-     (lambda ()
+    (set! res (fifo-pop local-queue))
+    (if (not (fifo-empty? local-queue))
+        (mutex-unlock! message-available-mutex))
+    (mutex-unlock! local-queue-mutex)
+    res))
+
+(define (start-scheduler)
+  (let loop ()
+    (apply deliver-message (next-local-message))
+    (loop)))
+
+
+;; Network
+
+(define (start-network-listener)
+  (thread-start!
+   (lambda ()
+     (let ((s (udp-open-socket*)))
+       (udp-bind! s #f sam-port)
        (let loop ()
-         (apply deliver-message (next-local-message))
-         (loop)))))
-
-
-  ;; Network
-
-  (define (start-network-listener)
-    (thread-start!
-     (lambda ()
-       (let ((s (udp-open-socket*)))
-         (udp-bind! s #f sam-port)
-         (let loop ()
-           (let-values (((n str) (udp-recv s 1024)))
-             (match (with-input-from-string str read)
-               ((address message ...)
-                (apply send-message (cons address message)))
-               (else
-                (print "Warning: received badly formatted message string '" str "'"))))
-           (loop))))))
-
-  ;; System interface
-
-  (define reader-queue-mutex (make-mutex "reader queue"))
-  (define reader-available-mutex (make-mutex "reader available"))
-  (mutex-lock! reader-available-mutex #f #f)
-  (define reader-queue (make-fifo))
-
-  (define (next-reader)
-    (let ((res #f))
-      (mutex-lock! reader-available-mutex #f #f)
-      (mutex-lock! reader-queue-mutex)
-      (set! res (fifo-pop reader-queue))
-      (if (not (fifo-empty? reader-queue))
-          (mutex-unlock! reader-available-mutex))
-      (mutex-unlock! reader-queue-mutex)
-      res))
-
-  (define (start-console)
-    (let loop ()
-      (let ((reader (next-reader)))
-        (##sys#thread-block-for-i/o! (current-thread) 0 #t)
-        (thread-yield!)
-        (send-message reader (read-line)))
-      (loop)))
-
-  ;; System initialization
-
-  (define (make-system-actor)
-    (make-actor (lambda (self . message)
-                  (match message
-
-                    (('shutdown)
-                     (print "## System actor received shutdown message.")
-                     (exit 0)
-                     'done)
-
-                    (('print strings ...)
-                     (apply print strings)
-                     'sleep)
-
-                    (('read reader)
-                     (mutex-lock! reader-queue-mutex)
-                     (fifo-push reader-queue reader)
-                     (mutex-unlock! reader-available-mutex)
-                     (mutex-unlock! reader-queue-mutex)
-                     'sleep)))))
-
-  (define system #f)
-
-  (define (init-sam host port)
-    (set! sam-host host)
-    (set! sam-port port)
-    (set! system (make-system-actor))
-    (start-scheduler)
-    (start-network-listener)))
+         (let-values (((n str) (udp-recv s 65536)))
+           (match (with-input-from-string str read)
+             ((address message ...)
+              (apply send-message (cons address message)))
+             (else
+              (log-msg "Warning: received badly formatted message string '" str "'"))))
+         (loop))))))
+
+;; System interface
+
+(define reader-queue-mutex (make-mutex "reader queue"))
+(define reader-available-mutex (make-mutex "reader available"))
+(mutex-lock! reader-available-mutex #f #f)
+(define reader-queue (make-fifo))
+
+(define (next-reader)
+  (let ((res #f))
+    (mutex-lock! reader-available-mutex #f #f)
+    (mutex-lock! reader-queue-mutex)
+    (set! res (fifo-pop reader-queue))
+    (if (not (fifo-empty? reader-queue))
+        (mutex-unlock! reader-available-mutex))
+    (mutex-unlock! reader-queue-mutex)
+    res))
+
+(define (start-console)
+  (thread-start!
+   (lambda ()
+     (let loop ()
+       (let ((reader (next-reader)))
+         (##sys#thread-block-for-i/o! (current-thread) 0 #t)
+         (thread-yield!)
+         (send-message reader (read-line)))
+       (loop)))))
+
+;; System initialization
+
+(define-beh system-beh
+  (self)
+
+  (('shutdown) =>
+   (log-msg "System actor received shutdown message.")
+   (exit 0)
+   'done)
+
+  (('print strings ...) =>
+   (apply print strings))
+
+  (('read reader) =>
+   (mutex-lock! reader-queue-mutex)
+   (fifo-push reader-queue reader)
+   (mutex-unlock! reader-available-mutex)
+   (mutex-unlock! reader-queue-mutex)))
+
+(define (boot-sam)
+  (start-console)
+  (start-network-listener)
+  (let ((system (make-actor system-beh))
+        (main #f))
+    (condition-case
+        (begin
+          (set! main (make-actor main-beh)))
+      ((exn)
+       (log-msg "Error starting main actor. Is main-beh defined?")
+       (exit 1)))
+    (send-message main system))
+  (start-scheduler))
+
+(define (print-usage)
+  (print "Simple Actor Machine v" sam-version "\n"
+         "\n"
+         "Usage: sam -h|--help\n"
+         "       sam [-n hostname] [-p port] source-file-1 [source-file-2 [...]] "))
+
+(let loop ((args (cdr (argv))))
+  (match args
+    (((or "-h" "--help"))
+     (print-usage))
+    (((or "-p" "--port") pstr rest ...)
+     (set! sam-port (string->number pstr))
+     (loop rest))
+    (((or "-n" "--hostname") hstr rest ...)
+     (set! sam-host hstr)
+     (loop rest))
+    (((or "-t" "--trace") rest ...)
+     (log-msg "Enabling trace debugging")
+     (set! trace #t)
+     (loop rest))
+    (((? file-exists? filename) rest ...)
+     (log-msg "Loading " filename)
+     (load filename)
+     (loop rest))
+    (()
+     (log-msg "Booting SAM\n")
+     (boot-sam))
+    (else
+     (print "Unrecognised argument '" (car args) "'.\n")
+     (print-usage))))
+