;; Simple Actor Machine
;;
-;; Houses a population of actors which can communicate using messages
-;; with actors on the same machine or other machines via the network.
+;; A virtual machine which houses a population of actors which can
+;; communicate using messages with actors on the same machine or other
+;; machines via the network.
(module sam
- (init-sam
- start-console
- send-message
+ (boot-sam
make-actor
- system)
+ send-message
+ send-message-later
+ address->string
+ string->address)
(import scheme
(chicken base)
udp
fifo)
+ (define trace #f)
+
;; Actors
(define sam-host "localhost")
(define sam-port 8000)
(define (make-address host port id)
- (uri->string
- (make-uri #:scheme "actor"
- #:host host
- #:port port
- #:path (list '/ id))))
+ (list id host port))
(define (make-local-address . args)
(make-address sam-host
(car args))))
(define (address-id address)
- (cadr (uri-path (uri-reference address))))
-
- (define address->uri uri-reference)
+ (car address))
+ (define (address-host address)
+ (cadr address))
+ (define (address-port address)
+ (caddr address))
+ (define (address->string address)
+ (uri->string
+ (make-uri #:scheme "actor"
+ #:host (address-host address)
+ #:port (address-port address)
+ #:path (list '/ (address-id address)))))
+ (define (string->address str)
+ (let ((uri (uri-reference str)))
+ (make-address (uri-host uri)
+ (uri-port uri)
+ (cadr (uri-path uri)))))
(define (address-local? address)
- (let ((uri (address->uri address)))
- (and (equal? (uri-host uri) sam-host)
- (equal? (uri-port uri) sam-port))))
+ (and (equal? (address-host address) sam-host)
+ (equal? (address-port address) sam-port)))
(define actor-table (make-hash-table))
address))
(define (deliver-message address . message)
+ (if trace (print "Delivering to " address ": " message))
(let ((id (address-id address)))
(let ((behaviour (hash-table-ref/default actor-table id '())))
(if (null? behaviour)
- (print "Warning: discarded message" message " to unknown actor " address)
+ (print "Warning: discarded message " message
+ " to unknown actor id " id)
(match (apply (hash-table-ref actor-table id) (cons address message))
('done (hash-table-delete! actor-table id))
('sleep 'do-nothing)
(define local-queue (make-fifo))
(define (send-message address . message)
- (print "send-message: Sending " message " to " address)
(apply (if (address-local? address)
send-local-message
send-network-message)
(define (send-network-message address . message)
(let ((s (udp-open-socket))
- (uri (address->uri address))
(packet (with-output-to-string
(lambda ()
- (print (cons address message))))))
+ (write (cons address message))))))
(udp-bind! s #f 0)
(udp-connect! s
- (uri-host uri)
- (uri-port uri))
+ (address-host address)
+ (address-port address))
(udp-send s packet)
(udp-close-socket s)))
+ (define (send-message-later address time . message)
+ (thread-start!
+ (lambda ()
+ (thread-sleep! time)
+ (apply send-message (cons address message)))))
+
(define (next-local-message)
(let ((res #f))
(mutex-lock! message-available-mutex #f #f)
res))
(define (start-scheduler)
- (thread-start!
- (lambda ()
- (let loop ()
- (apply deliver-message (next-local-message))
- (loop)))))
+ (let loop ()
+ (apply deliver-message (next-local-message))
+ (loop)))
;; Network
(let ((s (udp-open-socket*)))
(udp-bind! s #f sam-port)
(let loop ()
- (let-values (((n str) (udp-recv s 1024)))
- (print "network-listener: Received " n " bytes over network: " str)
+ (let-values (((n str) (udp-recv s 65536)))
(match (with-input-from-string str read)
((address message ...)
(apply send-message (cons address message)))
res))
(define (start-console)
- (let loop ()
- (let ((reader (next-reader)))
- (print "console: received next reader: " reader)
- (##sys#thread-block-for-i/o! (current-thread) 0 #t)
- (thread-yield!)
- (send-message reader (read-line)))
- (loop)))
+ (thread-start!
+ (lambda ()
+ (let loop ()
+ (let ((reader (next-reader)))
+ (##sys#thread-block-for-i/o! (current-thread) 0 #t)
+ (thread-yield!)
+ (send-message reader (read-line)))
+ (loop)))))
;; System initialization
- (define (make-system-actor)
- (make-actor (lambda (self . message)
- (match message
-
- (('shutdown)
- (print "## System actor received shutdown message.")
- (exit 0)
- 'done)
+ (define (system-beh self . message)
+ (match message
- (('print strings ...)
- (apply print strings)
- 'sleep)
+ (('shutdown)
+ (print "## System actor received shutdown message.")
+ (exit 0)
+ 'done)
- (('read reader)
- (mutex-lock! reader-queue-mutex)
- (fifo-push reader-queue reader)
- (mutex-unlock! reader-available-mutex)
- (mutex-unlock! reader-queue-mutex)
- 'sleep)))))
+ (('print strings ...)
+ (apply print strings)
+ 'sleep)
- (define system #f)
+ (('read reader)
+ (mutex-lock! reader-queue-mutex)
+ (fifo-push reader-queue reader)
+ (mutex-unlock! reader-available-mutex)
+ (mutex-unlock! reader-queue-mutex)
+ 'sleep)))
- (define (init-sam host port)
+ (define (boot-sam host port main-beh)
(set! sam-host host)
(set! sam-port port)
- (set! system (make-system-actor))
- (start-scheduler)
- (start-network-listener)))
+ (start-console)
+ (start-network-listener)
+ (send-message (make-actor main-beh) (make-actor system-beh))
+ (start-scheduler)))