P2P chat works.
[sam.git] / sam.scm
diff --git a/sam.scm b/sam.scm
index 431cd1c..29da169 100644 (file)
--- a/sam.scm
+++ b/sam.scm
 ;; Houses a population of actors which can communicate using messages
 ;; with actors on the same machine or other machines via the network.
 
-(import (chicken io)
-        (chicken string)
-        matchable
-        srfi-18 ; threads
-        srfi-69 ; hashtable
-        udp6
-        uri-generic)
-
-;; Actors
-
-(define (make-machine host port)
-  (cons host port))
-(define (machine-host m) (car m))
-(define (machine-port m) (cdr m))
-
-(define this-machine (make-machine "localhost" 1234))
-
-(define next-actor-id 1)
+(module sam
+    (init-sam
+     start-console
+     send-message
+     make-actor
+     system)
+
+  (import scheme
+          (chicken base)
+          (chicken io)
+          (chicken string)
+          (chicken port)
+          (chicken process-context)
+          matchable
+          srfi-18 ; threads
+          srfi-69 ; hash-table
+          uuid ; ids for actors
+          uri-generic
+          udp
+          fifo)
+
+  ;; Actors
+
+  (define sam-host "localhost")
+  (define sam-port 8000)
+
+  (define (make-address host port id)
+    (uri->string
+     (make-uri #:scheme "actor"
+               #:host host
+               #:port port
+               #:path (list '/ id))))
+
+  (define (make-local-address . args)
+    (make-address sam-host
+                  sam-port
+                  (if (null? args)
+                      (uuid)
+                      (car args))))
+  
+  (define (address-id address)
+    (cadr (uri-path (uri-reference address))))
 
-(define (address-id address) (car address))
-(define (address-machine address) (cdr address))
+  (define address->uri uri-reference)
 
-(define (address-local? address)
-  (equal? (address-machine address)
-          this-machine))
+  (define (address-local? address)
+    (let ((uri (address->uri address)))
+      (and (equal? (uri-host uri) sam-host)
+           (equal? (uri-port uri) sam-port))))
 
-(define actor-table (make-hash-table))
+  (define actor-table (make-hash-table))
 
-(define (make-actor beh)
-  (let* ((id next-actor-id))
-    (hash-table-put! id beh)
-    (cons id this-machine)))
+  (define (make-actor beh)
+    (let* ((address (make-local-address))
+           (id (address-id address)))
+      (hash-table-set! actor-table id beh)
+      address))
   
-(define (deliver-message address . message)
-  (let ((id (address-id address)))
-    (let ((behaviour (hash-table-ref/default actor-table id '()))))
-    (if (null? behaviour)
-        (print "Warning: discarded message" message " to unknown actor " address)
-        (match (apply (hash-table-ref actor-table id) (cons address message))
-          ('done (hash-table-delete! actor-table actor))
-          ('sleep 'do-nothing)
-          (new-beh (hash-table-put! actor new-beh))))))
-
-;; Scheduler
-
-(define local-queue-mutex (make-mutex "message queue"))
-(define message-available-mutex (make-mutex "message available"))
-(define local-queue (make-fifo))
-
-(define (send-message address . message)
-  (apply (if (address-local? address)
-             send-local-message
-             send-network-message)
-         message))
-
-(define (send-local-message address . message)
-  (mutex-lock! local-queue-mutex)
-  (fifo-push local-queue (cons address message))
-  (mutex-unlock! local-queue-mutex))
-
-(define (send-network-message address . message)
-  (let ((s (udp-open-socket))
-        (machine (address-machine address)))
-    (udp-bind! s #f 0)
-    (udp-connect! s
-                  (machine-host machine)
-                  (machine-port machine))
-    (udp-send s message)
-    (udp-close-socket s)))
-
-(define (next-local-message)
-  (let ((res #f))
+  (define (deliver-message address . message)
+    (let ((id (address-id address)))
+      (let ((behaviour (hash-table-ref/default actor-table id '())))
+        (if (null? behaviour)
+            (print "Warning: discarded message" message " to unknown actor " address)
+            (match (apply (hash-table-ref actor-table id) (cons address message))
+              ('done (hash-table-delete! actor-table id))
+              ('sleep 'do-nothing)
+              (new-beh (hash-table-set! actor-table id new-beh)))))))
+
+  ;; Scheduler
+
+  (define local-queue-mutex (make-mutex "message queue"))
+  (define message-available-mutex (make-mutex "message available"))
+  (mutex-lock! message-available-mutex #f #f)
+  (define local-queue (make-fifo))
+
+  (define (send-message address . message)
+    (apply (if (address-local? address)
+               send-local-message
+               send-network-message)
+           (cons address message)))
+
+  (define (send-local-message address . message)
     (mutex-lock! local-queue-mutex)
-    (set! res (if (fifo-empty? local-queue)
-                  #f
-                  (fifo-pop local-queue)))
-    (mutex-unlock! local-queue-mutex)
-    res))
-
-(define scheduler-thread
-  (make-thread
-   (lambda ()
-     (let loop ((next-addressed-message (next-local-message)))
-       (if next-addressed-message
-           (apply deliver-message next-addressed-message)
-           (begin
-             (lo))))))
-
-  (thread-start!))
+    (fifo-push local-queue (cons address message))
+    (mutex-unlock! message-available-mutex)
+    (mutex-unlock! local-queue-mutex))
+
+  (define (send-network-message address . message)
+    (let ((s (udp-open-socket))
+          (uri (address->uri address))
+          (packet (with-output-to-string
+                    (lambda ()
+                      (write (cons address message))))))
+      (udp-bind! s #f 0)
+      (udp-connect! s
+                    (uri-host uri)
+                    (uri-port uri))
+      (udp-send s packet)
+      (udp-close-socket s)))
+
+  (define (next-local-message)
+    (let ((res #f))
+      (mutex-lock! message-available-mutex #f #f)
+      (mutex-lock! local-queue-mutex)
+      (set! res (fifo-pop local-queue))
+      (if (not (fifo-empty? local-queue))
+          (mutex-unlock! message-available-mutex))
+      (mutex-unlock! local-queue-mutex)
+      res))
+
+  (define (start-scheduler)
+    (thread-start!
+     (lambda ()
+       (let loop ()
+         (apply deliver-message (next-local-message))
+         (loop)))))
+
+
+  ;; Network
+
+  (define (start-network-listener)
+    (thread-start!
+     (lambda ()
+       (let ((s (udp-open-socket*)))
+         (udp-bind! s #f sam-port)
+         (let loop ()
+           (let-values (((n str) (udp-recv s 1024)))
+             (match (with-input-from-string str read)
+               ((address message ...)
+                (apply send-message (cons address message)))
+               (else
+                (print "Warning: received badly formatted message string '" str "'"))))
+           (loop))))))
+
+  ;; System interface
+
+  (define reader-queue-mutex (make-mutex "reader queue"))
+  (define reader-available-mutex (make-mutex "reader available"))
+  (mutex-lock! reader-available-mutex #f #f)
+  (define reader-queue (make-fifo))
+
+  (define (next-reader)
+    (let ((res #f))
+      (mutex-lock! reader-available-mutex #f #f)
+      (mutex-lock! reader-queue-mutex)
+      (set! res (fifo-pop reader-queue))
+      (if (not (fifo-empty? reader-queue))
+          (mutex-unlock! reader-available-mutex))
+      (mutex-unlock! reader-queue-mutex)
+      res))
+
+  (define (start-console)
+    (let loop ()
+      (let ((reader (next-reader)))
+        (##sys#thread-block-for-i/o! (current-thread) 0 #t)
+        (thread-yield!)
+        (send-message reader (read-line)))
+      (loop)))
+
+  ;; System initialization
+
+  (define (make-system-actor)
+    (make-actor (lambda (self . message)
+                  (match message
+
+                    (('shutdown)
+                     (print "## System actor received shutdown message.")
+                     (exit 0)
+                     'done)
+
+                    (('print strings ...)
+                     (apply print strings)
+                     'sleep)
+
+                    (('read reader)
+                     (mutex-lock! reader-queue-mutex)
+                     (fifo-push reader-queue reader)
+                     (mutex-unlock! reader-available-mutex)
+                     (mutex-unlock! reader-queue-mutex)
+                     'sleep)))))
+
+  (define system #f)
+
+  (define (init-sam host port)
+    (set! sam-host host)
+    (set! sam-port port)
+    (set! system (make-system-actor))
+    (start-scheduler)
+    (start-network-listener)))