Commit 88e4b3e0 authored by Ludovic Courtès's avatar Ludovic Courtès
Browse files

servers: Add 'unmonitor-client'.

* jupyter/servers.scm (unmonitor-client): New procedure.
(serve-kernels)[handle-abort]: Change to simply accumulate requests.
[handle-requests]: New procedure.
Adjust call to 'handle-message' accordingly.
* tests/servers.scm ("ping pong, many clients leaving dynamically,
shell"): New test.
parent f985eaac
......@@ -25,11 +25,13 @@
#:use-module (ice-9 vlist)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-26)
#:use-module (srfi srfi-71)
#:export (connection->kernel
connection-file->kernel
serve-kernels
monitor-client
unmonitor-client
leave-server-loop))
;;; Commentary:
......@@ -95,12 +97,19 @@
(make-prompt-tag "kernel-exit"))
(define (monitor-client kernel)
"Add KERNEL to the list of client monitored by the server loop.
"Add KERNEL to the list of clients monitored by the server loop.
This procedure must be called from the dynamic extent of a 'server-kernels'
call."
(abort-to-prompt %server-scheduler-prompt 'monitor-kernel kernel))
(define (unmonitor-client kernel)
"Remove KERNEL from the list of clients monitored by the server loop.
This procedure must be called from the dynamic extent of a 'server-kernels'
call."
(abort-to-prompt %server-scheduler-prompt 'unmonitor-kernel kernel))
(define (leave-server-loop state)
"Leave the current server loop and have it return STATE.
......@@ -148,15 +157,28 @@ each message received."
;; Handle aborts to the prompt. We need an "+F+" kind of operator
;; (info "(guile) Shift and Reset") where we reinstate a prompt
;; before invoking the continuation, hence the self-reference.
(match args
(('monitor-kernel kernel)
(let ((kernels (cons kernel kernels)))
(format log-port "now monitoring ~a clients~%"
(length kernels))
(serve kernels
(kernels->socket-lookup kernels)
(call-with-prompt %server-scheduler-prompt
k handle-abort))))))
(call-with-prompt %server-scheduler-prompt
(lambda ()
(let ((state requests (k)))
;; Memorize the request ARGS so we can process it later.
(values state (cons args requests))))
handle-abort))
(define (handle-requests requests state)
;; Handle the 'monitor-kernel' and 'unmonitor-kernel' REQUESTS.
(let ((kernels (fold (lambda (request kernels)
(match request
(('monitor-kernel kernel)
(cons kernel kernels))
(('unmonitor-kernel kernel)
(delq kernel kernels))))
kernels
requests)))
(format log-port "[~a] now monitoring ~a kernels~%"
(getpid) (length kernels))
(serve kernels
(kernels->socket-lookup kernels)
state)))
(let loop ((items items)
(state state))
......@@ -175,11 +197,21 @@ each message received."
(zmq-send-msg-parts-bytevector (kernel-heartbeat kernel)
parts)
(loop rest state))
(let ((message (parts->message parts)))
(loop rest
(call-with-prompt %server-scheduler-prompt
(lambda ()
(handle-message kernel kind message state))
handle-abort))))))))))
(let* ((message (parts->message parts))
(state requests
(call-with-prompt %server-scheduler-prompt
(lambda ()
(values (handle-message kernel kind
message state)
'()))
handle-abort)))
(match requests
(()
(loop rest state))
(_
;; We got a series of requests as aborts to
;; %SERVER-SCHEDULER-PROMPT while calling
;; HANDLE-MESSAGE; process them now.
(handle-requests requests state)))))))))))
(lambda (_ state)
state)))
......@@ -232,6 +232,51 @@
(for-each close-kernel servers)
counts))))
(test-equal "ping pong, many clients leaving dynamically, shell"
10
;; Start KERNELS clients, each of which sends 1 message to the server,
;; which replies and then "unmonitors" the sender.
(let* ((kernels 10) ;number of kernels
(connections
clients
(unzip2
(map (lambda (i)
(define-values (connection client)
(allocate-connection %client-context
"tcp" "127.0.0.1"
%kernel-key
#:first-port
(+ %first-port (* i 10))))
(list connection client))
(iota kernels))))
(servers (map (lambda (connection)
(connection->kernel connection
#:context
%server-context))
connections)))
(define (handle-message kernel kind message count)
(send-message kernel (reply message "pong" "{}"))
(unmonitor-client kernel)
(close-kernel kernel)
(let ((count (+ count 1)))
(if (>= count kernels)
(leave-server-loop count)
count)))
(define (client-proc client)
(let ((request (message (header "ping" "foo" "12345") "{}")))
(send-message client request)
(read-message client)
(close-kernel client)))
(for-each (lambda (client)
(call-with-new-thread (cut client-proc client)))
clients)
;; The server state is a message counter.
(serve-kernels servers handle-message 0)))
(test-equal "heartbeat, one kernel"
"success"
;; Ensure that heartbeat messages are automatically echoed back.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment