X-Git-Url: https://thelambdalab.xyz/gitweb/index.cgi?a=blobdiff_plain;f=sam.scm;h=93906b958bd0472bfe4ff5583acdc7266ab8cca9;hb=3001c497e5536bb767303d96a6b65e2ad040e754;hp=8e8a730e4a22b75084be5bc9867baf4c48208e24;hpb=fa56c151c2551ad22577766c67ed52832066bf33;p=sam.git diff --git a/sam.scm b/sam.scm index 8e8a730..93906b9 100644 --- a/sam.scm +++ b/sam.scm @@ -1,11 +1,16 @@ ;; Simple Actor Machine ;; -;; Houses a population of actors which can communicate using messages -;; with actors on the same machine or other machines via the network. +;; A virtual machine which houses a population of actors which can +;; communicate using messages with actors on the same machine or other +;; machines via the network. -(import (chicken io) +(import scheme + (chicken base) + (chicken io) (chicken string) (chicken port) + (chicken process-context) + (chicken file) matchable srfi-18 ; threads srfi-69 ; hash-table @@ -14,29 +19,49 @@ udp fifo) +;; Global variables + +(define trace #f) + +(define sam-host "localhost") +(define sam-port 8000) + +(define sam-version "0.1") + ;; Actors -(define this-host "localhost") -(define this-port 8000) (define (make-address host port id) - (make-uri #:scheme "actor" - #:host host - #:port port - #:path (list '/ id))) + (list id host port)) (define (make-local-address . args) - (make-address this-host - this-port + (make-address sam-host + sam-port (if (null? args) (uuid) (car args)))) - -(define (address-id address) (cadr (uri-path address))) + +(define (address-id address) + (car address)) +(define (address-host address) + (cadr address)) +(define (address-port address) + (caddr address)) +(define (address->string address) + (uri->string + (make-uri #:scheme "actor" + #:host (address-host address) + #:port (address-port address) + #:path (list '/ (address-id address))))) +(define (string->address str) + (let ((uri (uri-reference str))) + (make-address (uri-host uri) + (uri-port uri) + (cadr (uri-path uri))))) (define (address-local? address) - (and (equal? (uri-host address) this-host) - (equal? (uri-port address) this-port))) + (and (equal? (address-host address) sam-host) + (equal? (address-port address) sam-port))) (define actor-table (make-hash-table)) @@ -45,12 +70,14 @@ (id (address-id address))) (hash-table-set! actor-table id beh) address)) - + (define (deliver-message address . message) + (if trace (print "Delivering to " address ": " message)) (let ((id (address-id address))) (let ((behaviour (hash-table-ref/default actor-table id '()))) (if (null? behaviour) - (print "Warning: discarded message" message " to unknown actor " address) + (print "Warning: discarded message " message + " to unknown actor id " id) (match (apply (hash-table-ref actor-table id) (cons address message)) ('done (hash-table-delete! actor-table id)) ('sleep 'do-nothing) @@ -77,46 +104,61 @@ (define (send-network-message address . message) (let ((s (udp-open-socket)) - (machine (address-machine address))) + (packet (with-output-to-string + (lambda () + (write (cons address message)))))) (udp-bind! s #f 0) (udp-connect! s - (machine-host machine) - (machine-port machine)) - (udp-send s message) + (address-host address) + (address-port address)) + (udp-send s packet) (udp-close-socket s))) +(define (send-message-later address time . message) + (thread-start! + (lambda () + (thread-sleep! time) + (apply send-message (cons address message))))) + (define (next-local-message) (let ((res #f)) (mutex-lock! message-available-mutex #f #f) (mutex-lock! local-queue-mutex) (set! res (fifo-pop local-queue)) (if (not (fifo-empty? local-queue)) - (mutex-unlock! message-available-mutex)) + (mutex-unlock! message-available-mutex)) (mutex-unlock! local-queue-mutex) res)) (define (start-scheduler) + (let loop () + (apply deliver-message (next-local-message)) + (loop))) + + +;; Network + +(define (start-network-listener) (thread-start! (lambda () - (let loop () - (apply deliver-message (next-local-message)) - (loop))))) + (let ((s (udp-open-socket*))) + (udp-bind! s #f sam-port) + (let loop () + (let-values (((n str) (udp-recv s 65536))) + (match (with-input-from-string str read) + ((address message ...) + (apply send-message (cons address message))) + (else + (print "Warning: received badly formatted message string '" str "'")))) + (loop)))))) -;; Console +;; System interface (define reader-queue-mutex (make-mutex "reader queue")) (define reader-available-mutex (make-mutex "reader available")) (mutex-lock! reader-available-mutex #f #f) (define reader-queue (make-fifo)) -(define console - (make-actor (lambda (self . message) - (mutex-lock! reader-queue-mutex) - (fifo-push reader-queue (car message)) - (mutex-unlock! reader-available-mutex) - (mutex-unlock! reader-queue-mutex) - 'sleep))) - (define (next-reader) (let ((res #f)) (mutex-lock! reader-available-mutex #f #f) @@ -128,64 +170,67 @@ res)) (define (start-console) - (let loop () - (let ((reader (next-reader))) - (##sys#thread-block-for-i/o! (current-thread) 0 #t) - (thread-yield!) - (send-message reader (read-line))) - (loop))) - - -;; Network - -(define (start-network-listener) (thread-start! (lambda () - (let ((s (udp-open-socket*))) - (udp-bind! s #f this-port) - (let loop () - (let-values (((n str) (udp-recv s 1024))) - (print "Received " n " bytes over network: " str) - (match (with-input-from-string str read) - ((uri-str message ...) - (apply send-message (cons (uri-reference uri-str) message))) - (else - (print "Warning: received badly formatted message string '" str "'")))) - (loop)))))) - -;; System interface - -(define system - (make-actor (lambda (self . message) - (match message - (('shutdown) - (print "## System actor received shutdown message.") - (exit 0) - 'done) - (('println strings ...) - (apply print strings) - 'sleep))))) - -;; Testing - - -(send-message system 'println "Hello, what is your name?") -(send-message console - (make-actor (lambda (self . message) - (match message - ((name) - (send-message system 'println "Hello, " name "!") - 'done))))) - -(thread-start! - (lambda () - (thread-sleep! 120) - (send-message system 'shutdown))) - -(print (uri->string system)) + (let loop () + (let ((reader (next-reader))) + (##sys#thread-block-for-i/o! (current-thread) 0 #t) + (thread-yield!) + (send-message reader (read-line))) + (loop))))) -(start-scheduler) -(start-network-listener) -(start-console) +;; System initialization + +(define (system-beh self . message) + (match message + + (('shutdown) + (print "## System actor received shutdown message.") + (exit 0) + 'done) + + (('print strings ...) + (apply print strings) + 'sleep) + + (('read reader) + (mutex-lock! reader-queue-mutex) + (fifo-push reader-queue reader) + (mutex-unlock! reader-available-mutex) + (mutex-unlock! reader-queue-mutex) + 'sleep))) + +(define (boot-sam) + (start-console) + (start-network-listener) + (send-message (make-actor main-beh) (make-actor system-beh)) + (start-scheduler)) + +(define (print-usage) + (print "Simple Actor Machine v" sam-version "\n" + "\n" + "Usage: sam -h|--help\n" + " sam [-n hostname] [-p port] source-file-1 [source-file-2 [...]] ")) + + +(let loop ((args (cdr (argv)))) + (match args + (((or "-h" "--help")) + (print-usage)) + (((or "-p" "--port") pstr rest ...) + (set! sam-port (string->number pstr)) + (loop rest)) + (((or "-n" "--hostname") hstr rest ...) + (set! sam-host hstr) + (loop rest) + (((? file-exists? filename) rest ...)) + (print* "Loading " filename "...") + (load filename) + (print " done.") + (loop rest)) + (() + (boot-sam host port main-beh)) + (else + (print "Unrecognised argument '" (car args) "'.\n") + (print-usage)))) -;; (thread-join! scheduler-thread)