From 9201f8e605d4704d0b4b780b4fd7a557511b39c9 Mon Sep 17 00:00:00 2001 From: Tim Vaughan Date: Wed, 28 Apr 2021 18:02:05 +0200 Subject: [PATCH] SAM is now a module. --- sam.scm | 395 +++++++++++++++++++++++++++----------------------------- 1 file changed, 189 insertions(+), 206 deletions(-) diff --git a/sam.scm b/sam.scm index 511b953..c3dcbe6 100644 --- a/sam.scm +++ b/sam.scm @@ -3,212 +3,195 @@ ;; Houses a population of actors which can communicate using messages ;; with actors on the same machine or other machines via the network. -(import (chicken io) - (chicken string) - (chicken port) - matchable - srfi-18 ; threads - srfi-69 ; hash-table - uuid ; ids for actors - uri-generic - udp - fifo) - -;; Actors - -(define this-host "localhost") -(define this-port 8000) - -(define (make-address host port id) - (make-uri #:scheme "actor" - #:host host - #:port port - #:path (list '/ id))) - -(define address->string uri->string) -(define string->address uri-reference) - -(define (make-local-address . args) - (make-address this-host - this-port - (if (null? args) - (uuid) - (car args)))) - -(define (address-id address) (cadr (uri-path address))) - -(define (address-local? address) - (and (equal? (uri-host address) this-host) - (equal? (uri-port address) this-port))) - -(define actor-table (make-hash-table)) - -(define (make-actor beh) - (let* ((address (make-local-address)) - (id (address-id address))) - (hash-table-set! actor-table id beh) - address)) +(module sam + (init-sam + start-console + send-message + make-actor + system) + + (import scheme + (chicken base) + (chicken io) + (chicken string) + (chicken port) + (chicken process-context) + matchable + srfi-18 ; threads + srfi-69 ; hash-table + uuid ; ids for actors + uri-generic + udp + fifo) + + ;; Actors + + (define sam-host "localhost") + (define sam-port 8000) + + (define (make-address host port id) + (uri->string + (make-uri #:scheme "actor" + #:host host + #:port port + #:path (list '/ id)))) + + (define (make-local-address . args) + (make-address sam-host + sam-port + (if (null? args) + (uuid) + (car args)))) -(define (deliver-message address . message) - (let ((id (address-id address))) - (let ((behaviour (hash-table-ref/default actor-table id '()))) - (if (null? behaviour) - (print "Warning: discarded message" message " to unknown actor " address) - (match (apply (hash-table-ref actor-table id) (cons address message)) - ('done (hash-table-delete! actor-table id)) - ('sleep 'do-nothing) - (new-beh (hash-table-set! actor-table id new-beh))))))) - -;; Scheduler - -(define local-queue-mutex (make-mutex "message queue")) -(define message-available-mutex (make-mutex "message available")) -(mutex-lock! message-available-mutex #f #f) -(define local-queue (make-fifo)) - -(define (send-message address . message) - (apply (if (address-local? address) - send-local-message - send-network-message) - (cons address message))) - -(define (send-local-message address . message) - (mutex-lock! local-queue-mutex) - (fifo-push local-queue (cons address message)) - (mutex-unlock! message-available-mutex) - (mutex-unlock! local-queue-mutex)) - -(define (send-network-message address . message) - (let ((s (udp-open-socket)) - (machine (address-machine address)) - (packet (with-output-to-string) - (lambda () - (print (cons (address->string address) message))))) - (udp-bind! s #f 0) - (udp-connect! s - (machine-host machine) - (machine-port machine)) - (udp-send s packet) - (udp-close-socket s))) - -(define (next-local-message) - (let ((res #f)) - (mutex-lock! message-available-mutex #f #f) + (define (address-id address) + (cadr (uri-path (uri-reference address)))) + + (define address->uri uri-reference) + + (define (address-local? address) + (let ((uri (address->uri address))) + (and (equal? (uri-host uri) sam-host) + (equal? (uri-port uri) sam-port)))) + + (define actor-table (make-hash-table)) + + (define (make-actor beh) + (let* ((address (make-local-address)) + (id (address-id address))) + (hash-table-set! actor-table id beh) + address)) + + (define (deliver-message address . message) + (let ((id (address-id address))) + (let ((behaviour (hash-table-ref/default actor-table id '()))) + (if (null? behaviour) + (print "Warning: discarded message" message " to unknown actor " address) + (match (apply (hash-table-ref actor-table id) (cons address message)) + ('done (hash-table-delete! actor-table id)) + ('sleep 'do-nothing) + (new-beh (hash-table-set! actor-table id new-beh))))))) + + ;; Scheduler + + (define local-queue-mutex (make-mutex "message queue")) + (define message-available-mutex (make-mutex "message available")) + (mutex-lock! message-available-mutex #f #f) + (define local-queue (make-fifo)) + + (define (send-message address . message) + (print "send-message: Sending " message " to " address) + (apply (if (address-local? address) + send-local-message + send-network-message) + (cons address message))) + + (define (send-local-message address . message) (mutex-lock! local-queue-mutex) - (set! res (fifo-pop local-queue)) - (if (not (fifo-empty? local-queue)) - (mutex-unlock! message-available-mutex)) - (mutex-unlock! local-queue-mutex) - res)) - -(define (start-scheduler) - (thread-start! - (lambda () - (let loop () - (apply deliver-message (next-local-message)) - (loop))))) - -;; Console - -(define reader-queue-mutex (make-mutex "reader queue")) -(define reader-available-mutex (make-mutex "reader available")) -(mutex-lock! reader-available-mutex #f #f) -(define reader-queue (make-fifo)) - -(define console - (make-actor (lambda (self . message) - (mutex-lock! reader-queue-mutex) - (fifo-push reader-queue (car message)) - (mutex-unlock! reader-available-mutex) - (mutex-unlock! reader-queue-mutex) - 'sleep))) - -(define (next-reader) - (let ((res #f)) - (mutex-lock! reader-available-mutex #f #f) - (mutex-lock! reader-queue-mutex) - (set! res (fifo-pop reader-queue)) - (if (not (fifo-empty? reader-queue)) - (mutex-unlock! reader-available-mutex)) - (mutex-unlock! reader-queue-mutex) - res)) - -(define (start-console) - (let loop () - (let ((reader (next-reader))) - (##sys#thread-block-for-i/o! (current-thread) 0 #t) - (thread-yield!) - (send-message reader (read-line))) - (loop))) - - -;; Network - -(define (start-network-listener) - (thread-start! - (lambda () - (let ((s (udp-open-socket*))) - (udp-bind! s #f this-port) + (fifo-push local-queue (cons address message)) + (mutex-unlock! message-available-mutex) + (mutex-unlock! local-queue-mutex)) + + (define (send-network-message address . message) + (let ((s (udp-open-socket)) + (uri (address->uri address)) + (packet (with-output-to-string + (lambda () + (print (cons address message)))))) + (udp-bind! s #f 0) + (udp-connect! s + (uri-host uri) + (uri-port uri)) + (udp-send s packet) + (udp-close-socket s))) + + (define (next-local-message) + (let ((res #f)) + (mutex-lock! message-available-mutex #f #f) + (mutex-lock! local-queue-mutex) + (set! res (fifo-pop local-queue)) + (if (not (fifo-empty? local-queue)) + (mutex-unlock! message-available-mutex)) + (mutex-unlock! local-queue-mutex) + res)) + + (define (start-scheduler) + (thread-start! + (lambda () (let loop () - (let-values (((n str) (udp-recv s 1024))) - (print "Received " n " bytes over network: " str) - (match (with-input-from-string str read) - ((addr-str message ...) - (apply send-message (cons (string->address addr-str) message))) - (else - (print "Warning: received badly formatted message string '" str "'")))) - (loop)))))) - -;; System interface - -(define system - (make-actor (lambda (self . message) - (match message - (('shutdown) - (print "## System actor received shutdown message.") - (exit 0) - 'done) - (('println strings ...) - (apply print strings) - 'sleep))))) - -;; Testing - - -(send-message system 'println "Hello, what is your name?") -(send-message console - (make-actor (lambda (self . message) - (match message - ((name) - (send-message system 'println "Hello, " name "!") - 'done))))) - -(thread-start! - (lambda () - (thread-sleep! 120) - (send-message system 'shutdown))) - -(print (uri->string system)) - -(start-scheduler) -(start-network-listener) -(start-console) - -(define (boot-sam host port) - (start-scheduler)) - -;; (thread-join! scheduler-thread) - -(define (main) - (let loop ((args (cdr (argv))) - (host "localhost") - (port 8000)) - (match args - ((or "-h" "--help") - (print-usage)) - (((or "-p" "--port") pstr rest ...) - (loop rest host (string->number pstr))) - (("--hostname" hstr rest ...) - (loop rest hstr port)) - (() - (boot-sam host port))))) + (apply deliver-message (next-local-message)) + (loop))))) + + + ;; Network + + (define (start-network-listener) + (thread-start! + (lambda () + (let ((s (udp-open-socket*))) + (udp-bind! s #f sam-port) + (let loop () + (let-values (((n str) (udp-recv s 1024))) + (print "network-listener: Received " n " bytes over network: " str) + (match (with-input-from-string str read) + ((address message ...) + (apply send-message (cons address message))) + (else + (print "Warning: received badly formatted message string '" str "'")))) + (loop)))))) + + ;; System interface + + (define reader-queue-mutex (make-mutex "reader queue")) + (define reader-available-mutex (make-mutex "reader available")) + (mutex-lock! reader-available-mutex #f #f) + (define reader-queue (make-fifo)) + + (define (next-reader) + (let ((res #f)) + (mutex-lock! reader-available-mutex #f #f) + (mutex-lock! reader-queue-mutex) + (set! res (fifo-pop reader-queue)) + (if (not (fifo-empty? reader-queue)) + (mutex-unlock! reader-available-mutex)) + (mutex-unlock! reader-queue-mutex) + res)) + + (define (start-console) + (let loop () + (let ((reader (next-reader))) + (print "console: received next reader: " reader) + (##sys#thread-block-for-i/o! (current-thread) 0 #t) + (thread-yield!) + (send-message reader (read-line))) + (loop))) + + ;; System initialization + + (define (make-system-actor) + (make-actor (lambda (self . message) + (match message + + (('shutdown) + (print "## System actor received shutdown message.") + (exit 0) + 'done) + + (('print strings ...) + (apply print strings) + 'sleep) + + (('read reader) + (mutex-lock! reader-queue-mutex) + (fifo-push reader-queue reader) + (mutex-unlock! reader-available-mutex) + (mutex-unlock! reader-queue-mutex) + 'sleep))))) + + (define system #f) + + (define (init-sam host port) + (set! sam-host host) + (set! sam-port port) + (set! system (make-system-actor)) + (start-scheduler) + (start-network-listener))) -- 2.20.1