X-Git-Url: https://thelambdalab.xyz/gitweb/index.cgi?a=blobdiff_plain;f=sam.scm;h=c3dcbe6bb833d31dab70cc19cd9c0cb6ce12d3e2;hb=9201f8e605d4704d0b4b780b4fd7a557511b39c9;hp=82f3d7f3eac319fe15c95c96efb3baa8f06feaff;hpb=ae2621e0018bd23c301f4da4ee1990bb6e02d71f;p=sam.git diff --git a/sam.scm b/sam.scm index 82f3d7f..c3dcbe6 100644 --- a/sam.scm +++ b/sam.scm @@ -3,108 +3,195 @@ ;; Houses a population of actors which can communicate using messages ;; with actors on the same machine or other machines via the network. -(import (chicken io) - (chicken string) - matchable - srfi-18 ; threads - srfi-69 ; hash-table - udp - fifo) - -;; Actors - -(define (make-machine host port) - (cons host port)) -(define (machine-host m) (car m)) -(define (machine-port m) (cdr m)) - -(define this-machine (make-machine "localhost" 1234)) - -(define next-actor-id 1) +(module sam + (init-sam + start-console + send-message + make-actor + system) + + (import scheme + (chicken base) + (chicken io) + (chicken string) + (chicken port) + (chicken process-context) + matchable + srfi-18 ; threads + srfi-69 ; hash-table + uuid ; ids for actors + uri-generic + udp + fifo) + + ;; Actors + + (define sam-host "localhost") + (define sam-port 8000) + + (define (make-address host port id) + (uri->string + (make-uri #:scheme "actor" + #:host host + #:port port + #:path (list '/ id)))) + + (define (make-local-address . args) + (make-address sam-host + sam-port + (if (null? args) + (uuid) + (car args)))) + + (define (address-id address) + (cadr (uri-path (uri-reference address)))) -(define (address-id address) (car address)) -(define (address-machine address) (cdr address)) -(define (make-address id machine) - (cons id machine)) + (define address->uri uri-reference) -(define (address-local? address) - (equal? (address-machine address) - this-machine)) + (define (address-local? address) + (let ((uri (address->uri address))) + (and (equal? (uri-host uri) sam-host) + (equal? (uri-port uri) sam-port)))) -(define actor-table (make-hash-table)) + (define actor-table (make-hash-table)) -(define (make-actor beh) - (let* ((id next-actor-id)) - (hash-table-set! actor-table id beh) - (make-address id this-machine))) + (define (make-actor beh) + (let* ((address (make-local-address)) + (id (address-id address))) + (hash-table-set! actor-table id beh) + address)) -(define (deliver-message address . message) - (let ((id (address-id address))) - (let ((behaviour (hash-table-ref/default actor-table id '()))) - (if (null? behaviour) - (print "Warning: discarded message" message " to unknown actor " address) - (match (apply (hash-table-ref actor-table id) (cons address message)) - ('done (hash-table-delete! actor-table actor)) - ('sleep 'do-nothing) - (new-beh (hash-table-set! actor-table actor new-beh))))))) - -;; Scheduler - -(define local-queue-mutex (make-mutex "message queue")) -(define message-available-mutex (make-mutex "message available")) -(mutex-lock! message-available-mutex #f #f) -(define local-queue (make-fifo)) - -(define (send-message address . message) - (apply (if (address-local? address) - send-local-message - send-network-message) - message)) - -(define (send-local-message address . message) - (mutex-lock! local-queue-mutex) - (fifo-push local-queue (cons address message)) - (mutex-unlock! message-available-mutex) - (mutex-unlock! local-queue-mutex)) - -(define (send-network-message address . message) - (let ((s (udp-open-socket)) - (machine (address-machine address))) - (udp-bind! s #f 0) - (udp-connect! s - (machine-host machine) - (machine-port machine)) - (udp-send s message) - (udp-close-socket s))) - -(define (next-local-message) - (let ((res #f)) - (mutex-lock! message-available-mutex #f #f) + (define (deliver-message address . message) + (let ((id (address-id address))) + (let ((behaviour (hash-table-ref/default actor-table id '()))) + (if (null? behaviour) + (print "Warning: discarded message" message " to unknown actor " address) + (match (apply (hash-table-ref actor-table id) (cons address message)) + ('done (hash-table-delete! actor-table id)) + ('sleep 'do-nothing) + (new-beh (hash-table-set! actor-table id new-beh))))))) + + ;; Scheduler + + (define local-queue-mutex (make-mutex "message queue")) + (define message-available-mutex (make-mutex "message available")) + (mutex-lock! message-available-mutex #f #f) + (define local-queue (make-fifo)) + + (define (send-message address . message) + (print "send-message: Sending " message " to " address) + (apply (if (address-local? address) + send-local-message + send-network-message) + (cons address message))) + + (define (send-local-message address . message) (mutex-lock! local-queue-mutex) - (set! res (fifo-pop local-queue)) - (if (not (fifo-empty? local-queue)) - (mutex-unlock! message-available-mutex)) - (mutex-unlock! local-queue-mutex) - res)) - -(define scheduler-thread - (make-thread - (lambda () - (let loop ((next-addressed-message (next-local-message))) - (apply deliver-message next-addressed-message) - (loop (next-local-message)))))) - - -;; Testing - -(thread-start! scheduler-thread) - -(define println - (make-actor (lambda (self . message) - (apply print message) - 'sleep))) - -(print println) -(send-message println "Hello, world!") - -(thread-join! scheduler-thread) + (fifo-push local-queue (cons address message)) + (mutex-unlock! message-available-mutex) + (mutex-unlock! local-queue-mutex)) + + (define (send-network-message address . message) + (let ((s (udp-open-socket)) + (uri (address->uri address)) + (packet (with-output-to-string + (lambda () + (print (cons address message)))))) + (udp-bind! s #f 0) + (udp-connect! s + (uri-host uri) + (uri-port uri)) + (udp-send s packet) + (udp-close-socket s))) + + (define (next-local-message) + (let ((res #f)) + (mutex-lock! message-available-mutex #f #f) + (mutex-lock! local-queue-mutex) + (set! res (fifo-pop local-queue)) + (if (not (fifo-empty? local-queue)) + (mutex-unlock! message-available-mutex)) + (mutex-unlock! local-queue-mutex) + res)) + + (define (start-scheduler) + (thread-start! + (lambda () + (let loop () + (apply deliver-message (next-local-message)) + (loop))))) + + + ;; Network + + (define (start-network-listener) + (thread-start! + (lambda () + (let ((s (udp-open-socket*))) + (udp-bind! s #f sam-port) + (let loop () + (let-values (((n str) (udp-recv s 1024))) + (print "network-listener: Received " n " bytes over network: " str) + (match (with-input-from-string str read) + ((address message ...) + (apply send-message (cons address message))) + (else + (print "Warning: received badly formatted message string '" str "'")))) + (loop)))))) + + ;; System interface + + (define reader-queue-mutex (make-mutex "reader queue")) + (define reader-available-mutex (make-mutex "reader available")) + (mutex-lock! reader-available-mutex #f #f) + (define reader-queue (make-fifo)) + + (define (next-reader) + (let ((res #f)) + (mutex-lock! reader-available-mutex #f #f) + (mutex-lock! reader-queue-mutex) + (set! res (fifo-pop reader-queue)) + (if (not (fifo-empty? reader-queue)) + (mutex-unlock! reader-available-mutex)) + (mutex-unlock! reader-queue-mutex) + res)) + + (define (start-console) + (let loop () + (let ((reader (next-reader))) + (print "console: received next reader: " reader) + (##sys#thread-block-for-i/o! (current-thread) 0 #t) + (thread-yield!) + (send-message reader (read-line))) + (loop))) + + ;; System initialization + + (define (make-system-actor) + (make-actor (lambda (self . message) + (match message + + (('shutdown) + (print "## System actor received shutdown message.") + (exit 0) + 'done) + + (('print strings ...) + (apply print strings) + 'sleep) + + (('read reader) + (mutex-lock! reader-queue-mutex) + (fifo-push reader-queue reader) + (mutex-unlock! reader-available-mutex) + (mutex-unlock! reader-queue-mutex) + 'sleep))))) + + (define system #f) + + (define (init-sam host port) + (set! sam-host host) + (set! sam-port port) + (set! system (make-system-actor)) + (start-scheduler) + (start-network-listener)))