Replaced URIs as optional string representation of addresses.
[sam.git] / sam.scm
diff --git a/sam.scm b/sam.scm
index 656338d..88ae320 100644 (file)
--- a/sam.scm
+++ b/sam.scm
 ;; Simple Actor Machine
 ;;
-;; Houses a population of actors which can communicate using messages
-;; with actors on the same machine or other machines via the network.
-
-(import (chicken io)
-        (chicken string)
-        matchable
-        srfi-18 ; threads
-        srfi-69 ; hash-table
-        udp
-        fifo)
-
-;; Actors
-
-(define (make-machine host port)
-  (cons host port))
-(define (machine-host m) (car m))
-(define (machine-port m) (cdr m))
-
-(define this-machine (make-machine "localhost" 1234))
-
-(define next-actor-id
-  (let ((mutex (make-mutex "actor id mutex"))
-        (next-id 1))
-    (lambda ()
-      (let ((res #f))
-        (mutex-lock! mutex)
-        (set! res next-id)
-        (set! next-id (+ next-id 1))
-        (mutex-unlock! mutex)
-        res))))
-
-(define (address-id address) (car address))
-(define (address-machine address) (cdr address))
-(define (make-address id machine)
-  (cons id machine))
-
-(define (address-local? address)
-  (equal? (address-machine address)
-          this-machine))
-
-(define actor-table (make-hash-table))
-
-(define (make-actor beh)
-  (let* ((id (next-actor-id)))
-    (hash-table-set! actor-table id beh)
-    (make-address id this-machine)))
+;; A virtual machine which houses a population of actors which can
+;; communicate using messages with actors on the same machine or other
+;; machines via the network.
+
+(module sam
+    (boot-sam
+     make-actor
+     send-message
+     send-message-later
+     address->string
+     string->address)
+
+  (import scheme
+          (chicken base)
+          (chicken io)
+          (chicken string)
+          (chicken port)
+          (chicken process-context)
+          matchable
+          srfi-18 ; threads
+          srfi-69 ; hash-table
+          uuid ; ids for actors
+          uri-generic
+          udp
+          fifo)
+
+  (define trace #f)
+
+  ;; Actors
+
+  (define sam-host "localhost")
+  (define sam-port 8000)
+
+  (define (make-address host port id)
+    (list id host port))
+
+  (define (make-local-address . args)
+    (make-address sam-host
+                  sam-port
+                  (if (null? args)
+                      (uuid)
+                      (car args))))
   
-(define (deliver-message address . message)
-  (let ((id (address-id address)))
-    (let ((behaviour (hash-table-ref/default actor-table id '())))
-      (if (null? behaviour)
-          (print "Warning: discarded message" message " to unknown actor " address)
-          (match (apply (hash-table-ref actor-table id) (cons address message))
-            ('done (hash-table-delete! actor-table id))
-            ('sleep 'do-nothing)
-            (new-beh (hash-table-set! actor-table id new-beh)))))))
-
-;; Scheduler
-
-(define local-queue-mutex (make-mutex "message queue"))
-(define message-available-mutex (make-mutex "message available"))
-(mutex-lock! message-available-mutex #f #f)
-(define local-queue (make-fifo))
-
-(define (send-message address . message)
-  (apply (if (address-local? address)
-             send-local-message
-             send-network-message)
-         (cons address message)))
-
-(define (send-local-message address . message)
-  (mutex-lock! local-queue-mutex)
-  (fifo-push local-queue (cons address message))
-  (mutex-unlock! message-available-mutex)
-  (mutex-unlock! local-queue-mutex))
-
-(define (send-network-message address . message)
-  (let ((s (udp-open-socket))
-        (machine (address-machine address)))
-    (udp-bind! s #f 0)
-    (udp-connect! s
-                  (machine-host machine)
-                  (machine-port machine))
-    (udp-send s message)
-    (udp-close-socket s)))
-
-(define (next-local-message)
-  (let ((res #f))
-    (mutex-lock! message-available-mutex #f #f)
+  (define (address-id address)
+    (car address))
+  (define (address-host address)
+    (cadr address))
+  (define (address-port address)
+    (caddr address))
+  (define (address->string address)
+    (uri->string
+     (make-uri #:scheme "actor"
+               #:host (address-host address)
+               #:port (address-port address)
+               #:path (list '/ (address-id address)))))
+  (define (string->address str)
+    (let ((uri (uri-reference str)))
+      (make-address (uri-host uri)
+                    (uri-port uri)
+                    (cadr (uri-path uri)))))
+
+  (define (address-local? address)
+    (and (equal? (address-host address) sam-host)
+         (equal? (address-port address) sam-port)))
+
+  (define actor-table (make-hash-table))
+
+  (define (make-actor beh)
+    (let* ((address (make-local-address))
+           (id (address-id address)))
+      (hash-table-set! actor-table id beh)
+      address))
+  
+  (define (deliver-message address . message)
+    (if trace (print "Delivering to " address ": " message))
+    (let ((id (address-id address)))
+      (let ((behaviour (hash-table-ref/default actor-table id '())))
+        (if (null? behaviour)
+            (print "Warning: discarded message " message
+                   " to unknown actor id " id)
+            (match (apply (hash-table-ref actor-table id) (cons address message))
+              ('done (hash-table-delete! actor-table id))
+              ('sleep 'do-nothing)
+              (new-beh (hash-table-set! actor-table id new-beh)))))))
+
+  ;; Scheduler
+
+  (define local-queue-mutex (make-mutex "message queue"))
+  (define message-available-mutex (make-mutex "message available"))
+  (mutex-lock! message-available-mutex #f #f)
+  (define local-queue (make-fifo))
+
+  (define (send-message address . message)
+    (apply (if (address-local? address)
+               send-local-message
+               send-network-message)
+           (cons address message)))
+
+  (define (send-local-message address . message)
     (mutex-lock! local-queue-mutex)
-    (set! res (fifo-pop local-queue))
-    (if (not (fifo-empty? local-queue))
-      (mutex-unlock! message-available-mutex))
-    (mutex-unlock! local-queue-mutex)
-    res))
-
-(define (start-scheduler)
-  (thread-start!
-   (lambda ()
-     (let loop ()
-       (apply deliver-message (next-local-message))
-       (loop)))))
-
-;; Console
-
-(define reader-queue-mutex (make-mutex "reader queue"))
-(define reader-available-mutex (make-mutex "reader available"))
-(mutex-lock! reader-available-mutex #f #f)
-(define reader-queue (make-fifo))
-
-(define console
-  (make-actor (lambda (self . message)
-                (mutex-lock! reader-queue-mutex)
-                (fifo-push reader-queue (car message))
-                (mutex-unlock! reader-available-mutex)
-                (mutex-unlock! reader-queue-mutex)
-                'sleep)))
-
-(define (next-reader)
-  (let ((res #f))
-    (mutex-lock! reader-available-mutex #f #f)
-    (mutex-lock! reader-queue-mutex)
-    (set! res (fifo-pop reader-queue))
-    (if (not (fifo-empty? reader-queue))
-        (mutex-unlock! reader-available-mutex))
-    (mutex-unlock! reader-queue-mutex)
-    res))
-
-(define (start-console)
-  (let loop ()
-    (let ((reader (next-reader)))
-      (##sys#thread-block-for-i/o! (current-thread) 0 #t)
-      (thread-yield!)
-      (send-message reader (read-line)))
-    (loop)))
-
-
-;; System interface
-
-(define system
-  (make-actor (lambda (self . message)
-                (match message
-                  (('shutdown)
-                   (print "## System actor received shutdown message.")
-                   (exit 0)
-                   'done)
-                  (('println strings ...)
-                   (apply print strings)
-                   'sleep)))))
-
-;; Testing
-
-
-(send-message system 'println "Hello, what is your name?")
-(send-message console
-              (make-actor (lambda (self . message)
-                            (match message
-                              ((name)
-                               (send-message system 'println "Hello, " name "!")
-                               'done)))))
-
-(thread-start!
- (lambda ()
-   (thread-sleep! 10)
-   (send-message system 'shutdown)))
-
-(start-scheduler)
-(start-console)
-
-;; (thread-join! scheduler-thread)
+    (fifo-push local-queue (cons address message))
+    (mutex-unlock! message-available-mutex)
+    (mutex-unlock! local-queue-mutex))
+
+  (define (send-network-message address . message)
+    (let ((s (udp-open-socket))
+          (packet (with-output-to-string
+                    (lambda ()
+                      (write (cons address message))))))
+      (udp-bind! s #f 0)
+      (udp-connect! s
+                    (address-host address)
+                    (address-port address))
+      (udp-send s packet)
+      (udp-close-socket s)))
+
+  (define (send-message-later address time . message)
+    (thread-start!
+     (lambda ()
+       (thread-sleep! time)
+       (apply send-message (cons address message)))))
+
+  (define (next-local-message)
+    (let ((res #f))
+      (mutex-lock! message-available-mutex #f #f)
+      (mutex-lock! local-queue-mutex)
+      (set! res (fifo-pop local-queue))
+      (if (not (fifo-empty? local-queue))
+          (mutex-unlock! message-available-mutex))
+      (mutex-unlock! local-queue-mutex)
+      res))
+
+  (define (start-scheduler)
+    (let loop ()
+      (apply deliver-message (next-local-message))
+      (loop)))
+
+
+  ;; Network
+
+  (define (start-network-listener)
+    (thread-start!
+     (lambda ()
+       (let ((s (udp-open-socket*)))
+         (udp-bind! s #f sam-port)
+         (let loop ()
+           (let-values (((n str) (udp-recv s 65536)))
+             (match (with-input-from-string str read)
+               ((address message ...)
+                (apply send-message (cons address message)))
+               (else
+                (print "Warning: received badly formatted message string '" str "'"))))
+           (loop))))))
+
+  ;; System interface
+
+  (define reader-queue-mutex (make-mutex "reader queue"))
+  (define reader-available-mutex (make-mutex "reader available"))
+  (mutex-lock! reader-available-mutex #f #f)
+  (define reader-queue (make-fifo))
+
+  (define (next-reader)
+    (let ((res #f))
+      (mutex-lock! reader-available-mutex #f #f)
+      (mutex-lock! reader-queue-mutex)
+      (set! res (fifo-pop reader-queue))
+      (if (not (fifo-empty? reader-queue))
+          (mutex-unlock! reader-available-mutex))
+      (mutex-unlock! reader-queue-mutex)
+      res))
+
+  (define (start-console)
+    (thread-start!
+     (lambda ()
+       (let loop ()
+         (let ((reader (next-reader)))
+           (##sys#thread-block-for-i/o! (current-thread) 0 #t)
+           (thread-yield!)
+           (send-message reader (read-line)))
+         (loop)))))
+
+  ;; System initialization
+
+  (define (system-beh self . message)
+    (match message
+
+      (('shutdown)
+       (print "## System actor received shutdown message.")
+       (exit 0)
+       'done)
+
+      (('print strings ...)
+       (apply print strings)
+       'sleep)
+
+      (('read reader)
+       (mutex-lock! reader-queue-mutex)
+       (fifo-push reader-queue reader)
+       (mutex-unlock! reader-available-mutex)
+       (mutex-unlock! reader-queue-mutex)
+       'sleep)))
+
+  (define (boot-sam host port main-beh)
+    (set! sam-host host)
+    (set! sam-port port)
+    (start-console)
+    (start-network-listener)
+    (send-message (make-actor main-beh) (make-actor system-beh))
+    (start-scheduler)))