;; Simple Actor Machine
;;
;; Houses a population of actors which can communicate using messages
;; with actors on the same machine or other machines via the network.

(import (chicken io)
        (chicken port)   ; with-output-to-string
        (chicken string)
        matchable
        srfi-18 ; threads
        srfi-69 ; hashtable
        udp
        fifo)

;; Actors

;; A machine is a (host . port) pair.
(define (make-machine host port) (cons host port))
(define (machine-host m) (car m))
(define (machine-port m) (cdr m))

(define this-machine (make-machine "localhost" 1234))

;; Id handed to the next actor created by make-actor.
(define next-actor-id 1)

;; An address is an (actor-id . machine) pair.
(define (address-id address) (car address))
(define (address-machine address) (cdr address))
(define (address-local? address)
  (equal? (address-machine address) this-machine))

;; Maps actor id -> current behaviour procedure.
(define actor-table (make-hash-table))

;; Register BEH as the behaviour of a new actor on this machine and
;; return the new actor's address.
(define (make-actor beh)
  (let ((id next-actor-id))
    ;; Advance the counter so every actor gets a distinct id.
    (set! next-actor-id (+ next-actor-id 1))
    (hash-table-set! actor-table id beh)
    (cons id this-machine)))

;; Invoke the behaviour of the actor at ADDRESS with ADDRESS followed by
;; the elements of MESSAGE.  The behaviour's return value selects what
;; happens to the actor:
;;   'done   -> the actor is removed from the table
;;   'sleep  -> the actor keeps its current behaviour
;;   other   -> the returned value becomes the actor's new behaviour
;; Messages for ids not present in the table are discarded with a warning.
(define (deliver-message address . message)
  (let* ((id (address-id address))
         (behaviour (hash-table-ref/default actor-table id '())))
    (if (null? behaviour)
        (print "Warning: discarded message" message " to unknown actor " address)
        (match (apply behaviour address message)
          ('done (hash-table-delete! actor-table id))
          ('sleep 'do-nothing)
          (new-beh (hash-table-set! actor-table id new-beh))))))

;; Scheduler

;; Guards local-queue and local-queue-pending.
(define local-queue-mutex (make-mutex "message queue"))

;; Used as a signal: it is left locked while the queue is empty and is
;; unlocked whenever at least one message is waiting.
(define message-available-mutex (make-mutex "message available"))
(mutex-lock! message-available-mutex #f #f)

(define local-queue (make-fifo))

;; Number of messages currently in local-queue.
(define local-queue-pending 0)

;; Send MESSAGE to the actor at ADDRESS, choosing local or network
;; transport according to the machine part of the address.
(define (send-message address . message)
  (apply (if (address-local? address)
             send-local-message
             send-network-message)
         address
         message))

;; Queue (ADDRESS . MESSAGE) for delivery by the scheduler thread and
;; signal that a message is available.
(define (send-local-message address . message)
  (mutex-lock! local-queue-mutex)
  (fifo-push local-queue (cons address message))
  (set! local-queue-pending (+ local-queue-pending 1))
  (mutex-unlock! message-available-mutex)
  (mutex-unlock! local-queue-mutex))

;; Send (ADDRESS . MESSAGE) to the machine named in ADDRESS as a single
;; UDP datagram containing the `write` representation of that pair.
;; The socket is closed even if binding, connecting or sending fails.
(define (send-network-message address . message)
  (let ((s (udp-open-socket))
        (machine (address-machine address)))
    (dynamic-wind
      (lambda () #f)
      (lambda ()
        (udp-bind! s #f 0)
        (udp-connect! s (machine-host machine) (machine-port machine))
        ;; udp-send transmits a string, so serialise the datum first and
        ;; include the address so the receiver can route the message.
        (udp-send s (with-output-to-string
                      (lambda () (write (cons address message))))))
      (lambda () (udp-close-socket s)))))

;; Block until a local message is available, then remove and return it
;; as an (address . message) pair.
(define (next-local-message)
  (mutex-lock! message-available-mutex)
  (mutex-lock! local-queue-mutex)
  (let ((res (fifo-pop local-queue)))
    (set! local-queue-pending (- local-queue-pending 1))
    ;; Re-arm the signal only while messages remain; otherwise leave it
    ;; locked so the next call blocks until send-local-message unlocks it.
    (if (> local-queue-pending 0)
        (mutex-unlock! message-available-mutex))
    (mutex-unlock! local-queue-mutex)
    res))

;; Repeatedly takes the next queued message and delivers it.
(define scheduler-thread
  (make-thread
   (lambda ()
     (let loop ((next-addressed-message (next-local-message)))
       (apply deliver-message next-addressed-message)
       (loop (next-local-message))))))

(thread-start! scheduler-thread)
(thread-join! scheduler-thread)