X-Git-Url: https://thelambdalab.xyz/gitweb/index.cgi?a=blobdiff_plain;f=sam.scm;h=656338df60930975ae05e64a567969a2105707b8;hb=6406acb5e3365ee9d47d5766393ac260dab3d019;hp=431cd1c41ce9b9cac0e68843cc39ab80ab6aa3c8;hpb=2ddb195dc4dace1129e50e10c6992e2ae95fae83;p=sam.git diff --git a/sam.scm b/sam.scm index 431cd1c..656338d 100644 --- a/sam.scm +++ b/sam.scm @@ -7,9 +7,9 @@ (chicken string) matchable srfi-18 ; threads - srfi-69 ; hashtable - udp6 - uri-generic) + srfi-69 ; hash-table + udp + fifo) ;; Actors @@ -20,10 +20,21 @@ (define this-machine (make-machine "localhost" 1234)) -(define next-actor-id 1) +(define next-actor-id + (let ((mutex (make-mutex "actor id mutex")) + (next-id 1)) + (lambda () + (let ((res #f)) + (mutex-lock! mutex) + (set! res next-id) + (set! next-id (+ next-id 1)) + (mutex-unlock! mutex) + res)))) (define (address-id address) (car address)) (define (address-machine address) (cdr address)) +(define (make-address id machine) + (cons id machine)) (define (address-local? address) (equal? (address-machine address) @@ -32,35 +43,37 @@ (define actor-table (make-hash-table)) (define (make-actor beh) - (let* ((id next-actor-id)) - (hash-table-put! id beh) - (cons id this-machine))) + (let* ((id (next-actor-id))) + (hash-table-set! actor-table id beh) + (make-address id this-machine))) (define (deliver-message address . message) (let ((id (address-id address))) - (let ((behaviour (hash-table-ref/default actor-table id '())))) - (if (null? behaviour) - (print "Warning: discarded message" message " to unknown actor " address) - (match (apply (hash-table-ref actor-table id) (cons address message)) - ('done (hash-table-delete! actor-table actor)) - ('sleep 'do-nothing) - (new-beh (hash-table-put! actor new-beh)))))) + (let ((behaviour (hash-table-ref/default actor-table id '()))) + (if (null? behaviour) + (print "Warning: discarded message" message " to unknown actor " address) + (match (apply (hash-table-ref actor-table id) (cons address message)) + ('done (hash-table-delete! actor-table id)) + ('sleep 'do-nothing) + (new-beh (hash-table-set! actor-table id new-beh))))))) ;; Scheduler (define local-queue-mutex (make-mutex "message queue")) (define message-available-mutex (make-mutex "message available")) +(mutex-lock! message-available-mutex #f #f) (define local-queue (make-fifo)) (define (send-message address . message) (apply (if (address-local? address) send-local-message send-network-message) - message)) + (cons address message))) (define (send-local-message address . message) (mutex-lock! local-queue-mutex) (fifo-push local-queue (cons address message)) + (mutex-unlock! message-available-mutex) (mutex-unlock! local-queue-mutex)) (define (send-network-message address . message) @@ -75,20 +88,85 @@ (define (next-local-message) (let ((res #f)) + (mutex-lock! message-available-mutex #f #f) (mutex-lock! local-queue-mutex) - (set! res (if (fifo-empty? local-queue) - #f - (fifo-pop local-queue))) + (set! res (fifo-pop local-queue)) + (if (not (fifo-empty? local-queue)) + (mutex-unlock! message-available-mutex)) (mutex-unlock! local-queue-mutex) res)) -(define scheduler-thread - (make-thread +(define (start-scheduler) + (thread-start! (lambda () - (let loop ((next-addressed-message (next-local-message))) - (if next-addressed-message - (apply deliver-message next-addressed-message) - (begin - (lo)))))) + (let loop () + (apply deliver-message (next-local-message)) + (loop))))) + +;; Console + +(define reader-queue-mutex (make-mutex "reader queue")) +(define reader-available-mutex (make-mutex "reader available")) +(mutex-lock! reader-available-mutex #f #f) +(define reader-queue (make-fifo)) + +(define console + (make-actor (lambda (self . message) + (mutex-lock! reader-queue-mutex) + (fifo-push reader-queue (car message)) + (mutex-unlock! reader-available-mutex) + (mutex-unlock! reader-queue-mutex) + 'sleep))) + +(define (next-reader) + (let ((res #f)) + (mutex-lock! reader-available-mutex #f #f) + (mutex-lock! reader-queue-mutex) + (set! res (fifo-pop reader-queue)) + (if (not (fifo-empty? reader-queue)) + (mutex-unlock! reader-available-mutex)) + (mutex-unlock! reader-queue-mutex) + res)) + +(define (start-console) + (let loop () + (let ((reader (next-reader))) + (##sys#thread-block-for-i/o! (current-thread) 0 #t) + (thread-yield!) + (send-message reader (read-line))) + (loop))) + + +;; System interface + +(define system + (make-actor (lambda (self . message) + (match message + (('shutdown) + (print "## System actor received shutdown message.") + (exit 0) + 'done) + (('println strings ...) + (apply print strings) + 'sleep))))) + +;; Testing + + +(send-message system 'println "Hello, what is your name?") +(send-message console + (make-actor (lambda (self . message) + (match message + ((name) + (send-message system 'println "Hello, " name "!") + 'done))))) + +(thread-start! + (lambda () + (thread-sleep! 10) + (send-message system 'shutdown))) + +(start-scheduler) +(start-console) - (thread-start!)) +;; (thread-join! scheduler-thread)