;; Simple Actor Machine ;; ;; Houses a population of actors which can communicate using messages ;; with actors on the same machine or other machines via the network. (module sam (init-sam start-console send-message make-actor system) (import scheme (chicken base) (chicken io) (chicken string) (chicken port) (chicken process-context) matchable srfi-18 ; threads srfi-69 ; hash-table uuid ; ids for actors uri-generic udp fifo) ;; Actors (define sam-host "localhost") (define sam-port 8000) (define (make-address host port id) (uri->string (make-uri #:scheme "actor" #:host host #:port port #:path (list '/ id)))) (define (make-local-address . args) (make-address sam-host sam-port (if (null? args) (uuid) (car args)))) (define (address-id address) (cadr (uri-path (uri-reference address)))) (define address->uri uri-reference) (define (address-local? address) (let ((uri (address->uri address))) (and (equal? (uri-host uri) sam-host) (equal? (uri-port uri) sam-port)))) (define actor-table (make-hash-table)) (define (make-actor beh) (let* ((address (make-local-address)) (id (address-id address))) (hash-table-set! actor-table id beh) address)) (define (deliver-message address . message) (let ((id (address-id address))) (let ((behaviour (hash-table-ref/default actor-table id '()))) (if (null? behaviour) (print "Warning: discarded message" message " to unknown actor " address) (match (apply (hash-table-ref actor-table id) (cons address message)) ('done (hash-table-delete! actor-table id)) ('sleep 'do-nothing) (new-beh (hash-table-set! actor-table id new-beh))))))) ;; Scheduler (define local-queue-mutex (make-mutex "message queue")) (define message-available-mutex (make-mutex "message available")) (mutex-lock! message-available-mutex #f #f) (define local-queue (make-fifo)) (define (send-message address . message) (print "send-message: Sending " message " to " address) (apply (if (address-local? address) send-local-message send-network-message) (cons address message))) (define (send-local-message address . message) (mutex-lock! local-queue-mutex) (fifo-push local-queue (cons address message)) (mutex-unlock! message-available-mutex) (mutex-unlock! local-queue-mutex)) (define (send-network-message address . message) (let ((s (udp-open-socket)) (uri (address->uri address)) (packet (with-output-to-string (lambda () (print (cons address message)))))) (udp-bind! s #f 0) (udp-connect! s (uri-host uri) (uri-port uri)) (udp-send s packet) (udp-close-socket s))) (define (next-local-message) (let ((res #f)) (mutex-lock! message-available-mutex #f #f) (mutex-lock! local-queue-mutex) (set! res (fifo-pop local-queue)) (if (not (fifo-empty? local-queue)) (mutex-unlock! message-available-mutex)) (mutex-unlock! local-queue-mutex) res)) (define (start-scheduler) (thread-start! (lambda () (let loop () (apply deliver-message (next-local-message)) (loop))))) ;; Network (define (start-network-listener) (thread-start! (lambda () (let ((s (udp-open-socket*))) (udp-bind! s #f sam-port) (let loop () (let-values (((n str) (udp-recv s 1024))) (print "network-listener: Received " n " bytes over network: " str) (match (with-input-from-string str read) ((address message ...) (apply send-message (cons address message))) (else (print "Warning: received badly formatted message string '" str "'")))) (loop)))))) ;; System interface (define reader-queue-mutex (make-mutex "reader queue")) (define reader-available-mutex (make-mutex "reader available")) (mutex-lock! reader-available-mutex #f #f) (define reader-queue (make-fifo)) (define (next-reader) (let ((res #f)) (mutex-lock! reader-available-mutex #f #f) (mutex-lock! reader-queue-mutex) (set! res (fifo-pop reader-queue)) (if (not (fifo-empty? reader-queue)) (mutex-unlock! reader-available-mutex)) (mutex-unlock! reader-queue-mutex) res)) (define (start-console) (let loop () (let ((reader (next-reader))) (print "console: received next reader: " reader) (##sys#thread-block-for-i/o! (current-thread) 0 #t) (thread-yield!) (send-message reader (read-line))) (loop))) ;; System initialization (define (make-system-actor) (make-actor (lambda (self . message) (match message (('shutdown) (print "## System actor received shutdown message.") (exit 0) 'done) (('print strings ...) (apply print strings) 'sleep) (('read reader) (mutex-lock! reader-queue-mutex) (fifo-push reader-queue reader) (mutex-unlock! reader-available-mutex) (mutex-unlock! reader-queue-mutex) 'sleep))))) (define system #f) (define (init-sam host port) (set! sam-host host) (set! sam-port port) (set! system (make-system-actor)) (start-scheduler) (start-network-listener)))