X-Git-Url: https://thelambdalab.xyz/gitweb/index.cgi?a=blobdiff_plain;f=sam.scm;h=29da1692bccadf63ebd47ebeeb730de45847acfb;hb=0fb9daea3d2da0258a787e5918bf5622a2f70d1e;hp=656338df60930975ae05e64a567969a2105707b8;hpb=6406acb5e3365ee9d47d5766393ac260dab3d019;p=sam.git diff --git a/sam.scm b/sam.scm index 656338d..29da169 100644 --- a/sam.scm +++ b/sam.scm @@ -3,170 +3,192 @@ ;; Houses a population of actors which can communicate using messages ;; with actors on the same machine or other machines via the network. -(import (chicken io) - (chicken string) - matchable - srfi-18 ; threads - srfi-69 ; hash-table - udp - fifo) - -;; Actors - -(define (make-machine host port) - (cons host port)) -(define (machine-host m) (car m)) -(define (machine-port m) (cdr m)) - -(define this-machine (make-machine "localhost" 1234)) - -(define next-actor-id - (let ((mutex (make-mutex "actor id mutex")) - (next-id 1)) - (lambda () - (let ((res #f)) - (mutex-lock! mutex) - (set! res next-id) - (set! next-id (+ next-id 1)) - (mutex-unlock! mutex) - res)))) - -(define (address-id address) (car address)) -(define (address-machine address) (cdr address)) -(define (make-address id machine) - (cons id machine)) - -(define (address-local? address) - (equal? (address-machine address) - this-machine)) - -(define actor-table (make-hash-table)) - -(define (make-actor beh) - (let* ((id (next-actor-id))) - (hash-table-set! actor-table id beh) - (make-address id this-machine))) +(module sam + (init-sam + start-console + send-message + make-actor + system) + + (import scheme + (chicken base) + (chicken io) + (chicken string) + (chicken port) + (chicken process-context) + matchable + srfi-18 ; threads + srfi-69 ; hash-table + uuid ; ids for actors + uri-generic + udp + fifo) + + ;; Actors + + (define sam-host "localhost") + (define sam-port 8000) + + (define (make-address host port id) + (uri->string + (make-uri #:scheme "actor" + #:host host + #:port port + #:path (list '/ id)))) + + (define (make-local-address . args) + (make-address sam-host + sam-port + (if (null? args) + (uuid) + (car args)))) -(define (deliver-message address . message) - (let ((id (address-id address))) - (let ((behaviour (hash-table-ref/default actor-table id '()))) - (if (null? behaviour) - (print "Warning: discarded message" message " to unknown actor " address) - (match (apply (hash-table-ref actor-table id) (cons address message)) - ('done (hash-table-delete! actor-table id)) - ('sleep 'do-nothing) - (new-beh (hash-table-set! actor-table id new-beh))))))) - -;; Scheduler - -(define local-queue-mutex (make-mutex "message queue")) -(define message-available-mutex (make-mutex "message available")) -(mutex-lock! message-available-mutex #f #f) -(define local-queue (make-fifo)) - -(define (send-message address . message) - (apply (if (address-local? address) - send-local-message - send-network-message) - (cons address message))) - -(define (send-local-message address . message) - (mutex-lock! local-queue-mutex) - (fifo-push local-queue (cons address message)) - (mutex-unlock! message-available-mutex) - (mutex-unlock! local-queue-mutex)) - -(define (send-network-message address . message) - (let ((s (udp-open-socket)) - (machine (address-machine address))) - (udp-bind! s #f 0) - (udp-connect! s - (machine-host machine) - (machine-port machine)) - (udp-send s message) - (udp-close-socket s))) - -(define (next-local-message) - (let ((res #f)) - (mutex-lock! message-available-mutex #f #f) + (define (address-id address) + (cadr (uri-path (uri-reference address)))) + + (define address->uri uri-reference) + + (define (address-local? address) + (let ((uri (address->uri address))) + (and (equal? (uri-host uri) sam-host) + (equal? (uri-port uri) sam-port)))) + + (define actor-table (make-hash-table)) + + (define (make-actor beh) + (let* ((address (make-local-address)) + (id (address-id address))) + (hash-table-set! actor-table id beh) + address)) + + (define (deliver-message address . message) + (let ((id (address-id address))) + (let ((behaviour (hash-table-ref/default actor-table id '()))) + (if (null? behaviour) + (print "Warning: discarded message" message " to unknown actor " address) + (match (apply (hash-table-ref actor-table id) (cons address message)) + ('done (hash-table-delete! actor-table id)) + ('sleep 'do-nothing) + (new-beh (hash-table-set! actor-table id new-beh))))))) + + ;; Scheduler + + (define local-queue-mutex (make-mutex "message queue")) + (define message-available-mutex (make-mutex "message available")) + (mutex-lock! message-available-mutex #f #f) + (define local-queue (make-fifo)) + + (define (send-message address . message) + (apply (if (address-local? address) + send-local-message + send-network-message) + (cons address message))) + + (define (send-local-message address . message) (mutex-lock! local-queue-mutex) - (set! res (fifo-pop local-queue)) - (if (not (fifo-empty? local-queue)) - (mutex-unlock! message-available-mutex)) - (mutex-unlock! local-queue-mutex) - res)) - -(define (start-scheduler) - (thread-start! - (lambda () - (let loop () - (apply deliver-message (next-local-message)) - (loop))))) - -;; Console - -(define reader-queue-mutex (make-mutex "reader queue")) -(define reader-available-mutex (make-mutex "reader available")) -(mutex-lock! reader-available-mutex #f #f) -(define reader-queue (make-fifo)) - -(define console - (make-actor (lambda (self . message) - (mutex-lock! reader-queue-mutex) - (fifo-push reader-queue (car message)) - (mutex-unlock! reader-available-mutex) - (mutex-unlock! reader-queue-mutex) - 'sleep))) - -(define (next-reader) - (let ((res #f)) - (mutex-lock! reader-available-mutex #f #f) - (mutex-lock! reader-queue-mutex) - (set! res (fifo-pop reader-queue)) - (if (not (fifo-empty? reader-queue)) - (mutex-unlock! reader-available-mutex)) - (mutex-unlock! reader-queue-mutex) - res)) - -(define (start-console) - (let loop () - (let ((reader (next-reader))) - (##sys#thread-block-for-i/o! (current-thread) 0 #t) - (thread-yield!) - (send-message reader (read-line))) - (loop))) - - -;; System interface - -(define system - (make-actor (lambda (self . message) - (match message - (('shutdown) - (print "## System actor received shutdown message.") - (exit 0) - 'done) - (('println strings ...) - (apply print strings) - 'sleep))))) - -;; Testing - - -(send-message system 'println "Hello, what is your name?") -(send-message console - (make-actor (lambda (self . message) - (match message - ((name) - (send-message system 'println "Hello, " name "!") - 'done))))) - -(thread-start! - (lambda () - (thread-sleep! 10) - (send-message system 'shutdown))) - -(start-scheduler) -(start-console) - -;; (thread-join! scheduler-thread) + (fifo-push local-queue (cons address message)) + (mutex-unlock! message-available-mutex) + (mutex-unlock! local-queue-mutex)) + + (define (send-network-message address . message) + (let ((s (udp-open-socket)) + (uri (address->uri address)) + (packet (with-output-to-string + (lambda () + (write (cons address message)))))) + (udp-bind! s #f 0) + (udp-connect! s + (uri-host uri) + (uri-port uri)) + (udp-send s packet) + (udp-close-socket s))) + + (define (next-local-message) + (let ((res #f)) + (mutex-lock! message-available-mutex #f #f) + (mutex-lock! local-queue-mutex) + (set! res (fifo-pop local-queue)) + (if (not (fifo-empty? local-queue)) + (mutex-unlock! message-available-mutex)) + (mutex-unlock! local-queue-mutex) + res)) + + (define (start-scheduler) + (thread-start! + (lambda () + (let loop () + (apply deliver-message (next-local-message)) + (loop))))) + + + ;; Network + + (define (start-network-listener) + (thread-start! + (lambda () + (let ((s (udp-open-socket*))) + (udp-bind! s #f sam-port) + (let loop () + (let-values (((n str) (udp-recv s 1024))) + (match (with-input-from-string str read) + ((address message ...) + (apply send-message (cons address message))) + (else + (print "Warning: received badly formatted message string '" str "'")))) + (loop)))))) + + ;; System interface + + (define reader-queue-mutex (make-mutex "reader queue")) + (define reader-available-mutex (make-mutex "reader available")) + (mutex-lock! reader-available-mutex #f #f) + (define reader-queue (make-fifo)) + + (define (next-reader) + (let ((res #f)) + (mutex-lock! reader-available-mutex #f #f) + (mutex-lock! reader-queue-mutex) + (set! res (fifo-pop reader-queue)) + (if (not (fifo-empty? reader-queue)) + (mutex-unlock! reader-available-mutex)) + (mutex-unlock! reader-queue-mutex) + res)) + + (define (start-console) + (let loop () + (let ((reader (next-reader))) + (##sys#thread-block-for-i/o! (current-thread) 0 #t) + (thread-yield!) + (send-message reader (read-line))) + (loop))) + + ;; System initialization + + (define (make-system-actor) + (make-actor (lambda (self . message) + (match message + + (('shutdown) + (print "## System actor received shutdown message.") + (exit 0) + 'done) + + (('print strings ...) + (apply print strings) + 'sleep) + + (('read reader) + (mutex-lock! reader-queue-mutex) + (fifo-push reader-queue reader) + (mutex-unlock! reader-available-mutex) + (mutex-unlock! reader-queue-mutex) + 'sleep))))) + + (define system #f) + + (define (init-sam host port) + (set! sam-host host) + (set! sam-port port) + (set! system (make-system-actor)) + (start-scheduler) + (start-network-listener)))