X-Git-Url: https://thelambdalab.xyz/gitweb/index.cgi?a=blobdiff_plain;f=sam.scm;h=511b95346c89156213fa71860e89f62a9ea6fbac;hb=b44160ac2c32648bf4728b10f83b2853300c5735;hp=5c5ec886a760d2e65394379e202b8e32d9512892;hpb=bf8a20a40dd9e5d963498a0fe43ef5fea52d07c6;p=sam.git diff --git a/sam.scm b/sam.scm index 5c5ec88..511b953 100644 --- a/sam.scm +++ b/sam.scm @@ -5,36 +5,49 @@ (import (chicken io) (chicken string) + (chicken port) matchable srfi-18 ; threads - srfi-69 ; hashtable + srfi-69 ; hash-table + uuid ; ids for actors + uri-generic udp fifo) ;; Actors -(define (make-machine host port) - (cons host port)) -(define (machine-host m) (car m)) -(define (machine-port m) (cdr m)) +(define this-host "localhost") +(define this-port 8000) -(define this-machine (make-machine "localhost" 1234)) +(define (make-address host port id) + (make-uri #:scheme "actor" + #:host host + #:port port + #:path (list '/ id))) -(define next-actor-id 1) +(define address->string uri->string) +(define string->address uri-reference) -(define (address-id address) (car address)) -(define (address-machine address) (cdr address)) +(define (make-local-address . args) + (make-address this-host + this-port + (if (null? args) + (uuid) + (car args)))) + +(define (address-id address) (cadr (uri-path address))) (define (address-local? address) - (equal? (address-machine address) - this-machine)) + (and (equal? (uri-host address) this-host) + (equal? (uri-port address) this-port))) (define actor-table (make-hash-table)) (define (make-actor beh) - (let* ((id next-actor-id)) - (hash-table-put! id beh) - (cons id this-machine))) + (let* ((address (make-local-address)) + (id (address-id address))) + (hash-table-set! actor-table id beh) + address)) (define (deliver-message address . message) (let ((id (address-id address))) @@ -42,9 +55,9 @@ (if (null? behaviour) (print "Warning: discarded message" message " to unknown actor " address) (match (apply (hash-table-ref actor-table id) (cons address message)) - ('done (hash-table-delete! actor-table actor)) + ('done (hash-table-delete! actor-table id)) ('sleep 'do-nothing) - (new-beh (hash-table-put! actor new-beh))))))) + (new-beh (hash-table-set! actor-table id new-beh))))))) ;; Scheduler @@ -57,7 +70,7 @@ (apply (if (address-local? address) send-local-message send-network-message) - message)) + (cons address message))) (define (send-local-message address . message) (mutex-lock! local-queue-mutex) @@ -67,30 +80,135 @@ (define (send-network-message address . message) (let ((s (udp-open-socket)) - (machine (address-machine address))) + (machine (address-machine address)) + (packet (with-output-to-string) + (lambda () + (print (cons (address->string address) message))))) (udp-bind! s #f 0) (udp-connect! s (machine-host machine) (machine-port machine)) - (udp-send s message) + (udp-send s packet) (udp-close-socket s))) (define (next-local-message) (let ((res #f)) - (mutex-lock! message-available-mutex) + (mutex-lock! message-available-mutex #f #f) (mutex-lock! local-queue-mutex) (set! res (fifo-pop local-queue)) - (mutex-unlock! message-available-mutex) + (if (not (fifo-empty? local-queue)) + (mutex-unlock! message-available-mutex)) (mutex-unlock! local-queue-mutex) res)) -(define scheduler-thread - (make-thread +(define (start-scheduler) + (thread-start! (lambda () - (let loop ((next-addressed-message (next-local-message))) - (apply deliver-message next-addressed-message) - (loop (next-local-message)))))) + (let loop () + (apply deliver-message (next-local-message)) + (loop))))) + +;; Console + +(define reader-queue-mutex (make-mutex "reader queue")) +(define reader-available-mutex (make-mutex "reader available")) +(mutex-lock! reader-available-mutex #f #f) +(define reader-queue (make-fifo)) + +(define console + (make-actor (lambda (self . message) + (mutex-lock! reader-queue-mutex) + (fifo-push reader-queue (car message)) + (mutex-unlock! reader-available-mutex) + (mutex-unlock! reader-queue-mutex) + 'sleep))) + +(define (next-reader) + (let ((res #f)) + (mutex-lock! reader-available-mutex #f #f) + (mutex-lock! reader-queue-mutex) + (set! res (fifo-pop reader-queue)) + (if (not (fifo-empty? reader-queue)) + (mutex-unlock! reader-available-mutex)) + (mutex-unlock! reader-queue-mutex) + res)) + +(define (start-console) + (let loop () + (let ((reader (next-reader))) + (##sys#thread-block-for-i/o! (current-thread) 0 #t) + (thread-yield!) + (send-message reader (read-line))) + (loop))) -(thread-start! scheduler-thread) -(thread-join! scheduler-thread) +;; Network + +(define (start-network-listener) + (thread-start! + (lambda () + (let ((s (udp-open-socket*))) + (udp-bind! s #f this-port) + (let loop () + (let-values (((n str) (udp-recv s 1024))) + (print "Received " n " bytes over network: " str) + (match (with-input-from-string str read) + ((addr-str message ...) + (apply send-message (cons (string->address addr-str) message))) + (else + (print "Warning: received badly formatted message string '" str "'")))) + (loop)))))) + +;; System interface + +(define system + (make-actor (lambda (self . message) + (match message + (('shutdown) + (print "## System actor received shutdown message.") + (exit 0) + 'done) + (('println strings ...) + (apply print strings) + 'sleep))))) + +;; Testing + + +(send-message system 'println "Hello, what is your name?") +(send-message console + (make-actor (lambda (self . message) + (match message + ((name) + (send-message system 'println "Hello, " name "!") + 'done))))) + +(thread-start! + (lambda () + (thread-sleep! 120) + (send-message system 'shutdown))) + +(print (uri->string system)) + +(start-scheduler) +(start-network-listener) +(start-console) + +(define (boot-sam host port) + (start-scheduler)) + +;; (thread-join! scheduler-thread) + +(define (main) + (let loop ((args (cdr (argv))) + (host "localhost") + (port 8000)) + (match args + ((or "-h" "--help") + (print-usage)) + (((or "-p" "--port") pstr rest ...) + (loop rest host (string->number pstr))) + (("--hostname" hstr rest ...) + (loop rest hstr port)) + (() + (boot-sam host port)))))