From 2ddb195dc4dace1129e50e10c6992e2ae95fae83 Mon Sep 17 00:00:00 2001 From: Tim Vaughan Date: Fri, 23 Apr 2021 09:39:43 +0200 Subject: [PATCH] Initial commit. --- fifo.scm | 71 ++++++++++++++++++++++++++++++++++++++++++ sam.scm | 94 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 165 insertions(+) create mode 100644 fifo.scm create mode 100644 sam.scm diff --git a/fifo.scm b/fifo.scm new file mode 100644 index 0000000..d2cea22 --- /dev/null +++ b/fifo.scm @@ -0,0 +1,71 @@ +(module fifo + (make-fifo + fifo-push + fifo-pop + fifo-empty? + fifo->list) + + (import scheme + (chicken base) + srfi-18) + + (define (make-fifo) + (define (cell val prev next) + (list val prev next)) + (define cell-val car) + (define cell-prev cadr) + (define cell-next caddr) + (define (set-cell-prev! cell prev-cell) + (set-car! (cdr cell) prev-cell)) + (define (set-cell-next! cell next-cell) + (set-car! (cddr cell) next-cell)) + + (let ((head '()) + (tail '()) + (pop-mutex (make-mutex))) + (mutex-lock! pop-mutex #f #f) + (lambda (cmd . args) + (case cmd + ((empty?) (null? head)) + ((push) + (if (not (null? head)) + (let ((old-head head)) + (set! head (cell (car args) '() old-head)) + (set-cell-prev! old-head head)) + (begin + ;; Initialize list + (set! head (cell (car args) '() '())) + (set! tail head) + (mutex-unlock! pop-mutex)))) + ((pop) + (mutex-lock! pop-mutex #f #f) + (if (null? head) + (error "FIFO empty.") + (let ((old-tail tail)) + (set! tail (cell-prev old-tail)) + (if (null? tail) + (set! head '()) + (begin + (set-cell-next! tail '()) + (mutex-unlock! pop-mutex #f #f))) + (cell-val old-tail))))) + ((->list) (if (not (= (length args) 0)) + (error "Wrong number of arguments to ->list.") + (let loop ((this-cell head)) + (if (null? this-cell) + '() + (cons (cell-val this-cell) + (loop (cell-next this-cell)))))))))) + + + (define (fifo-push fifo x) + (fifo 'push x)) + + (define (fifo-pop fifo) + (fifo 'pop)) + + (define (fifo-empty? fifo) + (fifo 'empty?)) + + (define (fifo->list fifo) + (fifo '->list))) diff --git a/sam.scm b/sam.scm new file mode 100644 index 0000000..431cd1c --- /dev/null +++ b/sam.scm @@ -0,0 +1,94 @@ +;; Simple Actor Machine +;; +;; Houses a population of actors which can communicate using messages +;; with actors on the same machine or other machines via the network. + +(import (chicken io) + (chicken string) + matchable + srfi-18 ; threads + srfi-69 ; hashtable + udp6 + uri-generic) + +;; Actors + +(define (make-machine host port) + (cons host port)) +(define (machine-host m) (car m)) +(define (machine-port m) (cdr m)) + +(define this-machine (make-machine "localhost" 1234)) + +(define next-actor-id 1) + +(define (address-id address) (car address)) +(define (address-machine address) (cdr address)) + +(define (address-local? address) + (equal? (address-machine address) + this-machine)) + +(define actor-table (make-hash-table)) + +(define (make-actor beh) + (let* ((id next-actor-id)) + (hash-table-put! id beh) + (cons id this-machine))) + +(define (deliver-message address . message) + (let ((id (address-id address))) + (let ((behaviour (hash-table-ref/default actor-table id '())))) + (if (null? behaviour) + (print "Warning: discarded message" message " to unknown actor " address) + (match (apply (hash-table-ref actor-table id) (cons address message)) + ('done (hash-table-delete! actor-table actor)) + ('sleep 'do-nothing) + (new-beh (hash-table-put! actor new-beh)))))) + +;; Scheduler + +(define local-queue-mutex (make-mutex "message queue")) +(define message-available-mutex (make-mutex "message available")) +(define local-queue (make-fifo)) + +(define (send-message address . message) + (apply (if (address-local? address) + send-local-message + send-network-message) + message)) + +(define (send-local-message address . message) + (mutex-lock! local-queue-mutex) + (fifo-push local-queue (cons address message)) + (mutex-unlock! local-queue-mutex)) + +(define (send-network-message address . message) + (let ((s (udp-open-socket)) + (machine (address-machine address))) + (udp-bind! s #f 0) + (udp-connect! s + (machine-host machine) + (machine-port machine)) + (udp-send s message) + (udp-close-socket s))) + +(define (next-local-message) + (let ((res #f)) + (mutex-lock! local-queue-mutex) + (set! res (if (fifo-empty? local-queue) + #f + (fifo-pop local-queue))) + (mutex-unlock! local-queue-mutex) + res)) + +(define scheduler-thread + (make-thread + (lambda () + (let loop ((next-addressed-message (next-local-message))) + (if next-addressed-message + (apply deliver-message next-addressed-message) + (begin + (lo)))))) + + (thread-start!)) -- 2.20.1