-;; communicate using messages with actors on the same machine or other
-;; machines via the network.
-
-(module sam
- (boot-sam
- make-actor
- send-message
- send-message-later
- address->string
- string->address)
-
- (import scheme
- (chicken base)
- (chicken io)
- (chicken string)
- (chicken port)
- (chicken process-context)
- matchable
- srfi-18 ; threads
- srfi-69 ; hash-table
- uuid ; ids for actors
- uri-generic
- udp
- fifo)
-
- (define trace #f)
-
- ;; Actors
-
- (define sam-host "localhost")
- (define sam-port 8000)
-
- (define (make-address host port id)
- (list id host port))
-
- (define (make-local-address . args)
- (make-address sam-host
- sam-port
- (if (null? args)
- (uuid)
- (car args))))
-
- (define (address-id address)
- (car address))
- (define (address-host address)
- (cadr address))
- (define (address-port address)
- (caddr address))
- (define (address->string address)
- (uri->string
- (make-uri #:scheme "actor"
- #:host (address-host address)
- #:port (address-port address)
- #:path (list '/ (address-id address)))))
- (define (string->address str)
- (let ((uri (uri-reference str)))
- (make-address (uri-host uri)
- (uri-port uri)
- (cadr (uri-path uri)))))
-
- (define (address-local? address)
- (and (equal? (address-host address) sam-host)
- (equal? (address-port address) sam-port)))
-
- (define actor-table (make-hash-table))
-
- (define (make-actor beh)
- (let* ((address (make-local-address))
- (id (address-id address)))
- (hash-table-set! actor-table id beh)
- address))
-
- (define (deliver-message address . message)
- (if trace (print "Delivering to " address ": " message))
- (let ((id (address-id address)))
- (let ((behaviour (hash-table-ref/default actor-table id '())))
- (if (null? behaviour)
- (print "Warning: discarded message " message
- " to unknown actor id " id)
- (match (apply (hash-table-ref actor-table id) (cons address message))
- ('done (hash-table-delete! actor-table id))
- ('sleep 'do-nothing)
- (new-beh (hash-table-set! actor-table id new-beh)))))))
-
- ;; Scheduler
-
- (define local-queue-mutex (make-mutex "message queue"))
- (define message-available-mutex (make-mutex "message available"))
- (mutex-lock! message-available-mutex #f #f)
- (define local-queue (make-fifo))
-
- (define (send-message address . message)
- (apply (if (address-local? address)
- send-local-message
- send-network-message)
- (cons address message)))
-
- (define (send-local-message address . message)
+;; communicate using messages with actors on the same host or other
+;; hosts via the network.
+
+(import scheme
+ (chicken base)
+ (chicken io)
+ (chicken string)
+ (chicken port)
+ (chicken process-context)
+ (chicken file)
+ (chicken condition)
+ matchable
+ srfi-18 ; threads
+ srfi-69 ; hash-table
+ uuid ; ids for actors
+ uri-generic
+ udp
+ fifo
+ sam-macros)
+
+;; Global variables
+
+(define trace #f)
+
+(define sam-host "localhost")
+(define sam-port 8000)
+
+(define sam-version "0.1")
+
+;; Logging
+
+(define (log-msg . args)
+ (with-output-to-port (current-error-port)
+ (lambda ()
+ (apply print (cons "## " args)))))
+
+(define (log-trace . args)
+ (with-output-to-port (current-error-port)
+ (lambda ()
+ (if trace (apply log-msg args)))))
+
+(define (->stringrep arg)
+ (with-output-to-string
+ (lambda ()
+ (write arg))))
+
+;; Behaviours
+;; (See also macros defined in sam-macros.scm.)
+
+(define (beh-proc beh)
+ (car beh))
+(define (beh-parent beh)
+ (cdr beh))
+
+(define root-beh
+ (make-beh : #f (self)
+ (('ping recipient) =>
+ (send-message recipient 'pong))))
+
+;; Actors
+
+(define (make-address host port id)
+ (list id host port))
+
+(define (make-local-address . args)
+ (make-address sam-host
+ sam-port
+ (if (null? args)
+ (uuid)
+ (car args))))
+
+(define (address-id address)
+ (car address))
+(define (address-host address)
+ (cadr address))
+(define (address-port address)
+ (caddr address))
+(define (address->string address)
+ (uri->string
+ (make-uri #:scheme "actor"
+ #:host (address-host address)
+ #:port (address-port address)
+ #:path (list '/ (address-id address)))))
+(define (string->address str)
+ (let ((uri (uri-reference str)))
+ (make-address (uri-host uri)
+ (uri-port uri)
+ (cadr (uri-path uri)))))
+
+(define (address-local? address)
+ (and (equal? (address-host address) sam-host)
+ (equal? (address-port address) sam-port)))
+
+(define actor-table (make-hash-table))
+
+(define (make-actor beh)
+ (let* ((address (make-local-address))
+ (id (address-id address)))
+ (hash-table-set! actor-table id beh)
+ address))
+
+(define (deliver-message address . message)
+ (let ((id (address-id address)))
+ (log-trace "DELIVERING to " id ": " (->stringrep message))
+ (let loop ((beh (hash-table-ref/default actor-table id #f)))
+ (if beh
+ (condition-case
+ (match (apply (beh-proc beh) (cons address message))
+ ('done (hash-table-delete! actor-table id))
+ ('pass
+ (log-trace "Passing to parent behaviour...")
+ (loop (beh-parent beh)))
+ ((? procedure? new-beh) (hash-table-set! actor-table id new-beh))
+ (else
+ 'do-nothing)) ; sleep is now the default
+ (o (exn)
+ (log-msg "Warning: actor " id " crashed evaluating message " (->stringrep message))
+ (print-error-message o)))
+ (log-msg "Warning: DISCARDING message to unknown actor " id ": " (->stringrep message))))))
+
+;; Scheduler
+
+(define local-queue-mutex (make-mutex "message queue"))
+(define message-available-mutex (make-mutex "message available"))
+(mutex-lock! message-available-mutex #f #f)
+(define local-queue (make-fifo))
+
+(define (send-message address . message)
+ (log-trace "SENDING to " address ": " (->stringrep message))
+ (apply (if (address-local? address)
+ send-local-message
+ send-network-message)
+ (cons address message)))
+
+(define (send-local-message address . message)
+ (mutex-lock! local-queue-mutex)
+ (fifo-push local-queue (cons address message))
+ (mutex-unlock! message-available-mutex)
+ (mutex-unlock! local-queue-mutex))
+
+(define (send-network-message address . message)
+ (let ((s (udp-open-socket))
+ (packet (with-output-to-string
+ (lambda ()
+ (write (cons address message))))))
+ (udp-bind! s #f 0)
+ (udp-connect! s
+ (address-host address)
+ (address-port address))
+ (udp-send s packet)
+ (udp-close-socket s)))
+
+(define (send-message-later address time . message)
+ (thread-start!
+ (lambda ()
+ (thread-sleep! time)
+ (apply send-message (cons address message)))))
+
+(define (next-local-message)
+ (let ((res #f))
+ (mutex-lock! message-available-mutex #f #f)