Commit d88fb653 authored by Ludovic Courtès's avatar Ludovic Courtès
Browse files

kernels: 'read-message' reads from any of the kernel's sockets.

* jupyter/kernels.scm (read-message): Rewrite to use 'zmq-poll'.  Add
optional 'timeout' parameter.
* tests/kernels.scm ("execute_request"): Call 'read-message' thrice and
check all three replies.
parent 22e7d33f
......@@ -259,23 +259,29 @@ return its PID."
;; Communicating with a kernel.
(define (read-message kernel)
"Read a message for KERNEL--i.e., a message sent by Jupyter--and return
it or #f if nothing is available."
;; FIXME Use 'zmq_poll'.
(define (waiting-data socket next)
(catch 'zmq-error
(λ _
(zmq-get-msg-parts-bytevector socket))
(λ stuff
((equal? EAGAIN (cadr stuff))
(and next (waiting-data next socket)))
(else #f)))))
(and=> (waiting-data (kernel-shell kernel)
(kernel-control kernel))
(define* (read-message kernel #:optional (timeout -1))
"Read one message from one of the sockets of KERNEL and return it. If
TIMEOUT is -1, wait indefinitely; otherwise wait that number of milliseconds.
If TIMEOUT expires before a message has been received, return #f."
(define shell (kernel-shell kernel))
(define iopub (kernel-iopub kernel))
(define items
(zmq-poll (map (lambda (socket)
(poll-item socket (logior ZMQ_POLLIN ZMQ_POLLERR)))
(kernel-sockets kernel))
(let loop ((items items))
(match items
((item rest ...)
(let ((socket (poll-item-socket item)))
(cond ((eq? socket (kernel-heartbeat kernel))
(loop rest))
(zmq-get-msg-parts-bytevector socket)))))))))
(define* (send-message kernel message
......@@ -20,6 +20,7 @@
#:use-module (simple-zmq)
#:use-module (json)
#:use-module (rnrs bytevectors)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-64))
(define %context
......@@ -41,19 +42,41 @@
(kill (kernel-pid kernel) 0))))
(unless %kernel (test-skip 1))
(test-assert "execute_request"
(test-equal "execute_request"
(let ((request (message (header "execute_request" "luser" "12345")
'((code . "40 + 2\n")
(silent . #f))))))
(send-message %kernel request)
(let ((reply (pk 'reply (read-message %kernel))))
(and (message? reply)
(equal? (message-parent-header reply)
(message-header request))
(string=? (message-type reply) "execute_reply")
(let ((content (json-string->scm (message-content reply))))
(string=? (hash-ref content "status") "ok"))))))
;; As noted at
;; <>,
;; in the request-reply pattern, we first receive on the iopub socket a
;; "status" message, then our input is broadcast on iopub, and then we
;; get the result.
(let* ((reply1 (read-message %kernel))
(reply2 (read-message %kernel))
(reply3 (read-message %kernel))
(replies (pk 'replies (list reply1 reply2 reply3))))
(and (every message? replies)
(every (compose bytevector? message-sender) replies)
(string=? "status" (message-type reply1))
(string=? "busy"
(hash-ref (json-string->scm (message-content reply1))
(equal? (message-parent-header reply1) (message-header request))
(string=? "execute_input" (message-type reply2))
(equal? (message-parent-header reply2) (message-header request))
(string=? (message-type reply3) "execute_result")
(equal? (message-parent-header reply3) (message-header request))
(let* ((content (json-string->scm (message-content reply3)))
(data (hash-ref content "data"))
(text (hash-ref data "text/plain")))
(string->number text))))))
(when %kernel
(close-kernel %kernel)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment