Commit fe70f6da authored by Ludovic Courtès's avatar Ludovic Courtès
Browse files

servers: Add 'serve-kernels'.

* jupyter/kernels.scm (kernel-socket-kind): New procedure.
(zmq-poll*, allocate-connection): Export.
* jupyter/servers.scm (%server-scheduler-prompt, %server-exit-prompt):
New variables.
* jupyter/servers.scm (monitor-client, leave-server-loop)
(serve-kernels): New procedures.
* tests/servers.scm: New file.
parent 5093a6ea
......@@ -95,3 +95,5 @@ anaconda-mode/
/build-aux/install-sh
/build-aux/missing
/configure
/tests/servers.log
/tests/servers.trs
......@@ -43,6 +43,7 @@ SCM_TESTS = \
tests/magic.scm \
tests/hmac.scm \
tests/kernels.scm \
tests/servers.scm \
tests/environ.scm
TESTS = $(SCM_TESTS)
......
......@@ -51,6 +51,7 @@
kernel-iosub
kernel-iopub ;alias
kernel-sockets
kernel-socket-kind
connection?
connection-transport
......@@ -63,7 +64,9 @@
connection-heartbeat-port
connection-iopub-port
json->connection
allocate-connection
zmq-poll* ;XXX: temporary hack
read-message
send-message
relay-message
......@@ -125,6 +128,20 @@
kernel-heartbeat
kernel-iopub)))
(define (kernel-socket-kind kernel socket)
"Return the procedure (e.g., 'kernel-shell', 'kernel-control') that, when
applied on KERNEL, returns SOCKET. This allows the caller to determine the
purpose of SOCKET for KERNEL.
Return #f if SOCKET is not one of KERNEL's sockets."
(find (lambda (kernel-socket)
(eq? (kernel-socket kernel) socket))
(list kernel-shell
kernel-control
kernel-standard-input
kernel-heartbeat
kernel-iopub)))
(define (close-kernel kernel)
"Close all the open connections of KERNEL."
(for-each zmq-close-socket (kernel-sockets kernel)))
......
......@@ -22,8 +22,15 @@
#:use-module (json)
#:use-module (rnrs bytevectors)
#:use-module (ice-9 match)
#:use-module (ice-9 vlist)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-26)
#:export (connection->kernel
connection-file->kernel))
connection-file->kernel
serve-kernels
monitor-client
leave-server-loop))
;;; Commentary:
;;;
......@@ -68,3 +75,111 @@
;; See <https://jupyter-client.readthedocs.io/en/stable/kernels.html#connection-files>.
(compose connection->kernel json->connection json->scm))
;;;
;;; Server loop.
;;;
;; We would happily use Fibers instead of rolling our own half-baked
;; scheduler. Alas, zmq uses its own I/O threads by default; it's possible
;; to set it to 0 threads with ZMQ_IO_THREADS but that voids your warranty
;; (and it doesn't work, too.)
(define %server-scheduler-prompt
;; Prompt to abort to to communicate with the server loop.
(make-prompt-tag "kernel-scheduler"))
(define %server-exit-prompt
;; Prompt to abort to to exit the server loop.
(make-prompt-tag "kernel-exit"))
(define (monitor-client kernel)
"Add KERNEL to the list of client monitored by the server loop.
This procedure must be called from the dynamic extent of a 'server-kernels'
call."
(abort-to-prompt %server-scheduler-prompt 'monitor-kernel kernel))
(define (leave-server-loop state)
"Leave the current server loop and have it return STATE.
This procedure must be called from the dynamic extent of a 'server-kernels'
call."
(abort-to-prompt %server-exit-prompt state))
(define* (serve-kernels kernels handle-message seed
#:key (log-port (current-error-port)))
"Wait for messages sent to KERNELS, a list of <kernel> records as returned
by 'connection->kernel', and pass each of them to HANDLE-MESSAGE. SEED is
the initial state of the server; it is passed to HANDLE-MESSAGE along with
each message received."
(define timeout -1) ;wait forever
(define (kernels->socket-lookup kernels)
;; Return a procedure that, given a zmq socket, return the <kernel> among
;; KERNELS it belongs to.
(define table
(fold (lambda (kernel table)
(fold (lambda (socket table)
(vhash-consq socket kernel table))
table
(kernel-sockets kernel)))
vlist-null
kernels))
(lambda (socket)
(match (vhash-assq socket table)
((_ . kernel) kernel)
(#f #f))))
(call-with-prompt %server-exit-prompt
(lambda ()
(let serve ((kernels kernels)
(socket->kernel (kernels->socket-lookup kernels))
(state seed))
(define items
(zmq-poll* (map (lambda (socket)
(poll-item socket (logior ZMQ_POLLIN ZMQ_POLLERR)))
(append-map kernel-sockets kernels))
timeout))
(define (handle-abort k . args)
;; Handle aborts to the prompt. We need an "+F+" kind of operator
;; (info "(guile) Shift and Reset") where we reinstate a prompt
;; before invoking the continuation, hence the self-reference.
(match args
(('monitor-kernel kernel)
(let ((kernels (cons kernel kernels)))
(format log-port "now monitoring ~a clients~%"
(length kernels))
(serve kernels
(kernels->socket-lookup kernels)
(call-with-prompt %server-scheduler-prompt
k handle-abort))))))
(let loop ((items items)
(state state))
(match items
(()
(serve kernels socket->kernel state))
((item rest ...)
(let* ((socket (poll-item-socket item))
(kernel (socket->kernel socket))
(kind (kernel-socket-kind kernel socket))
(parts (zmq-get-msg-parts-bytevector socket)))
;; Heartbeat messages are raw "bytestrings" that should be echoed
;; back right away.
(if (eq? kind kernel-heartbeat)
(begin
(zmq-send-msg-parts-bytevector (kernel-heartbeat kernel)
parts)
(loop rest state))
(let ((message (parts->message parts)))
(loop rest
(call-with-prompt %server-scheduler-prompt
(lambda ()
(handle-message kernel kind message state))
handle-abort))))))))))
(lambda (_ state)
state)))
;;; Guix-kernel -- Guix kernel for Jupyter
;;; Copyright (C) 2019 Inria
;;;
;;; This program is free software: you can redistribute it and/or modify
;;; it under the terms of the GNU General Public License as published by
;;; the Free Software Foundation, either version 3 of the License, or
;;; (at your option) any later version.
;;;
;;; This program is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;;; GNU General Public License for more details.
;;;
;;; You should have received a copy of the GNU General Public License
;;; along with this program. If not, see <https://www.gnu.org/licenses/>.
(define-module (tests kernels)
#:use-module (jupyter kernels)
#:use-module (jupyter messages)
#:use-module (jupyter servers)
#:use-module (simple-zmq)
#:use-module (json)
#:use-module (rnrs bytevectors)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-26)
#:use-module (srfi srfi-64)
#:use-module (srfi srfi-71)
#:use-module (ice-9 threads)
#:use-module (ice-9 match))
(define %client-context
(zmq-create-context))
(define %server-context
(zmq-create-context))
(define %kernel-key "secretkey")
(define %first-port
;; First TCP port to use. This is meant to avoid collisions with other
;; tests that may be running in parallel.
1500)
(test-begin "servers")
(test-equal "ping pong, one kernel, shell"
42
(let* ((connection client (allocate-connection %client-context
"tcp" "127.0.0.1"
%kernel-key
#:first-port %first-port))
(server (connection->kernel connection
#:context %server-context)))
(define (handle-message kernel kind message state)
(pk 'handle kernel message)
(let ((last? (= state 42)))
(send-message kernel
(if last?
(reply message "quit" "{}")
(reply message "pong" (scm->json-string state))))
(if last?
(leave-server-loop state)
(+ 1 state))))
(define (client-thunk)
(let ((request (message (header "ping" "foo" "12345") "{}")))
(send-message client request)
(let ((message (read-message client)))
(if (string=? "quit" (message-type message))
(close-kernel client)
(client-thunk)))))
(call-with-new-thread client-thunk)
(let ((result (serve-kernels (list server) handle-message 0)))
(close-kernel server)
result)))
(test-equal "ping pong, one kernel, shell + control + stdin"
(make-list 3 1) ;1 message received on each of the 3 sockets
(let* ((connection client (allocate-connection %client-context
"tcp" "127.0.0.1"
%kernel-key
#:first-port %first-port))
(server (connection->kernel connection
#:context %server-context)))
(define (handle-message kernel kind message state)
(send-message kernel (reply (pk 'handle kind message) "pong" "{}")
#:kernel-socket kind)
(let* ((n (or (assq-ref state kind) 0))
(state (alist-cons kind (+ n 1)
(alist-delete kind state eq?)))
(total (reduce + 0 (map cdr state))))
(if (>= total 3)
(leave-server-loop state)
state)))
(define (client-thunk)
(for-each (lambda (socket)
(let ((request (message (header "ping" "foo" "12345") "{}")))
(send-message client request
#:kernel-socket socket)
(read-message client)))
(list kernel-shell kernel-standard-input kernel-control))
(close-kernel client))
(call-with-new-thread client-thunk)
;; The server's state is an alist mapping each kernel socket procedure to
;; the number of messages received on that socket.
(match (serve-kernels (list server) handle-message '())
((((? procedure?) . counts) ...)
(close-kernel server)
counts))))
(test-equal "ping pong, many clients, shell"
(make-list 10 77) ;number of messages received from each kernel
;; Start KERNELS clients, each of which sends ROUNDS messages to the
;; server, which replies.
(let* ((kernels 10) ;number of kernels
(rounds 77) ;number of messages
(connections
clients
(unzip2
(map (lambda (i)
(define-values (connection client)
(allocate-connection %client-context
"tcp" "127.0.0.1"
%kernel-key
#:first-port
(+ %first-port (* i 10))))
(list connection client))
(iota kernels))))
(servers (map (lambda (connection)
(connection->kernel connection
#:context
%server-context))
connections)))
(define (handle-message kernel kind message state)
(send-message kernel (reply message "pong" "{}"))
(let* ((n (or (assq-ref state kernel) 0))
(state (alist-cons kernel (+ n 1)
(alist-delete kernel state eq?)))
(total (reduce + 0 (map cdr state))))
(if (>= total (* kernels rounds))
(leave-server-loop state)
state)))
(define (client-proc client)
(let loop ((n 1))
(let ((request (message (header "ping" "foo" "12345") "{}")))
(send-message client request)
(let ((message (read-message client)))
(if (< n rounds)
(loop (+ 1 n))
(close-kernel client))))))
(for-each (lambda (client)
(call-with-new-thread (cut client-proc client)))
clients)
;; The server's state is an alist mapping each kernel to the number of
;; messages received from that kernel. At the end, return the number of
;; messages received from each kernel.
(match (serve-kernels servers handle-message '())
((((? kernel?) . counts) ...)
(for-each close-kernel servers)
counts))))
(test-equal "ping pong, many clients coming dynamically, shell"
(make-list 10 77) ;number of messages received from each kernel
;; Start KERNELS clients, each of which sends ROUNDS messages to the
;; server, which replies.
(let* ((kernels 10) ;number of clients
(rounds 77) ;number of messages
(connections
clients
(unzip2
(map (lambda (i)
(define-values (connection client)
(allocate-connection %client-context
"tcp" "127.0.0.1"
%kernel-key
#:first-port
(+ %first-port (* i 10))))
(list connection client))
(iota kernels))))
(servers (map (lambda (connection)
(connection->kernel connection
#:context
%server-context))
connections)))
(define (increment-counter kernel state)
(let ((n (or (assq-ref state kernel) 0)))
(alist-cons kernel (+ n 1)
(alist-delete kernel state eq?))))
(define (handle-message kernel kind message state)
(send-message kernel (reply message "pong" "{}"))
(match state
(((next rest ...) state)
(let ((state (increment-counter kernel state)))
(monitor-client next) ;add a new client to serve
(list rest state)))
((() state)
(let* ((state (increment-counter kernel state))
(total (reduce + 0 (map cdr state))))
(if (>= total (* kernels rounds))
(leave-server-loop state)
(list '() state))))))
(define (client-proc client)
(let loop ((n 1))
(let ((request (message (header "ping" "foo" "12345") "{}")))
(send-message client request)
(let ((message (read-message client)))
(when (< n rounds)
(loop (+ 1 n)))))))
(for-each (lambda (client)
(call-with-new-thread (cut client-proc client)))
clients)
;; The server's state is an alist mapping each kernel to the number of
;; messages received from that kernel, plus the list of additional
;; clients to monitor. At the end, return the number of messages
;; received from each kernel.
(match (serve-kernels (list (first servers)) handle-message
(list (cdr servers) '()))
((((? kernel?) . counts) ...)
(for-each close-kernel servers)
counts))))
(test-equal "heartbeat, one kernel"
"success"
;; Ensure that heartbeat messages are automatically echoed back.
(let* ((connection client (allocate-connection %client-context
"tcp" "127.0.0.1"
%kernel-key
#:first-port %first-port))
(server (connection->kernel connection
#:context %server-context)))
(define (handle-message kernel kind message state)
(pk 'handle message)
(leave-server-loop (message-type message)))
(define (client-thunk)
(let ((bv (string->utf8 "Hello, kernel!")))
(zmq-send-bytevector (kernel-heartbeat client) bv)
(let ((bv* (zmq-receive-bytevector (kernel-heartbeat client)
(bytevector-length bv))))
(send-message client
(message (header (if (equal? bv* bv)
"success"
"failure")
"foo" "123")
"{}")))))
(call-with-new-thread client-thunk)
(let ((result (serve-kernels (list server) handle-message 0)))
(close-kernel server)
result)))
(test-end "servers")
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment