X-Git-Url: https://thelambdalab.xyz/gitweb/index.cgi?a=blobdiff_plain;f=sam.scm;h=88ae320571b278c59e00879782f51518cecef13e;hb=d9768084b4cde1dd4ac4edcbb432b7df4101bfc2;hp=29da1692bccadf63ebd47ebeeb730de45847acfb;hpb=0fb9daea3d2da0258a787e5918bf5622a2f70d1e;p=sam.git diff --git a/sam.scm b/sam.scm index 29da169..88ae320 100644 --- a/sam.scm +++ b/sam.scm @@ -1,14 +1,16 @@ ;; Simple Actor Machine ;; -;; Houses a population of actors which can communicate using messages -;; with actors on the same machine or other machines via the network. +;; A virtual machine which houses a population of actors which can +;; communicate using messages with actors on the same machine or other +;; machines via the network. (module sam - (init-sam - start-console - send-message + (boot-sam make-actor - system) + send-message + send-message-later + address->string + string->address) (import scheme (chicken base) @@ -24,17 +26,15 @@ udp fifo) + (define trace #f) + ;; Actors (define sam-host "localhost") (define sam-port 8000) (define (make-address host port id) - (uri->string - (make-uri #:scheme "actor" - #:host host - #:port port - #:path (list '/ id)))) + (list id host port)) (define (make-local-address . args) (make-address sam-host @@ -44,14 +44,26 @@ (car args)))) (define (address-id address) - (cadr (uri-path (uri-reference address)))) - - (define address->uri uri-reference) + (car address)) + (define (address-host address) + (cadr address)) + (define (address-port address) + (caddr address)) + (define (address->string address) + (uri->string + (make-uri #:scheme "actor" + #:host (address-host address) + #:port (address-port address) + #:path (list '/ (address-id address))))) + (define (string->address str) + (let ((uri (uri-reference str))) + (make-address (uri-host uri) + (uri-port uri) + (cadr (uri-path uri))))) (define (address-local? address) - (let ((uri (address->uri address))) - (and (equal? (uri-host uri) sam-host) - (equal? (uri-port uri) sam-port)))) + (and (equal? (address-host address) sam-host) + (equal? (address-port address) sam-port))) (define actor-table (make-hash-table)) @@ -62,10 +74,12 @@ address)) (define (deliver-message address . message) + (if trace (print "Delivering to " address ": " message)) (let ((id (address-id address))) (let ((behaviour (hash-table-ref/default actor-table id '()))) (if (null? behaviour) - (print "Warning: discarded message" message " to unknown actor " address) + (print "Warning: discarded message " message + " to unknown actor id " id) (match (apply (hash-table-ref actor-table id) (cons address message)) ('done (hash-table-delete! actor-table id)) ('sleep 'do-nothing) @@ -92,17 +106,22 @@ (define (send-network-message address . message) (let ((s (udp-open-socket)) - (uri (address->uri address)) (packet (with-output-to-string (lambda () (write (cons address message)))))) (udp-bind! s #f 0) (udp-connect! s - (uri-host uri) - (uri-port uri)) + (address-host address) + (address-port address)) (udp-send s packet) (udp-close-socket s))) + (define (send-message-later address time . message) + (thread-start! + (lambda () + (thread-sleep! time) + (apply send-message (cons address message))))) + (define (next-local-message) (let ((res #f)) (mutex-lock! message-available-mutex #f #f) @@ -114,11 +133,9 @@ res)) (define (start-scheduler) - (thread-start! - (lambda () - (let loop () - (apply deliver-message (next-local-message)) - (loop))))) + (let loop () + (apply deliver-message (next-local-message)) + (loop))) ;; Network @@ -129,7 +146,7 @@ (let ((s (udp-open-socket*))) (udp-bind! s #f sam-port) (let loop () - (let-values (((n str) (udp-recv s 1024))) + (let-values (((n str) (udp-recv s 65536))) (match (with-input-from-string str read) ((address message ...) (apply send-message (cons address message))) @@ -155,40 +172,40 @@ res)) (define (start-console) - (let loop () - (let ((reader (next-reader))) - (##sys#thread-block-for-i/o! (current-thread) 0 #t) - (thread-yield!) - (send-message reader (read-line))) - (loop))) + (thread-start! + (lambda () + (let loop () + (let ((reader (next-reader))) + (##sys#thread-block-for-i/o! (current-thread) 0 #t) + (thread-yield!) + (send-message reader (read-line))) + (loop))))) ;; System initialization - (define (make-system-actor) - (make-actor (lambda (self . message) - (match message - - (('shutdown) - (print "## System actor received shutdown message.") - (exit 0) - 'done) + (define (system-beh self . message) + (match message - (('print strings ...) - (apply print strings) - 'sleep) + (('shutdown) + (print "## System actor received shutdown message.") + (exit 0) + 'done) - (('read reader) - (mutex-lock! reader-queue-mutex) - (fifo-push reader-queue reader) - (mutex-unlock! reader-available-mutex) - (mutex-unlock! reader-queue-mutex) - 'sleep))))) + (('print strings ...) + (apply print strings) + 'sleep) - (define system #f) + (('read reader) + (mutex-lock! reader-queue-mutex) + (fifo-push reader-queue reader) + (mutex-unlock! reader-available-mutex) + (mutex-unlock! reader-queue-mutex) + 'sleep))) - (define (init-sam host port) + (define (boot-sam host port main-beh) (set! sam-host host) (set! sam-port port) - (set! system (make-system-actor)) - (start-scheduler) - (start-network-listener))) + (start-console) + (start-network-listener) + (send-message (make-actor main-beh) (make-actor system-beh)) + (start-scheduler)))