From 8f6392508fbfdaed3a7e5cc68e49dd6d0a22e94e Mon Sep 17 00:00:00 2001 From: David Brochart Date: Mon, 28 Feb 2022 20:23:35 +0100 Subject: [PATCH] Add session async recv --- jupyter_client/session.py | 40 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/jupyter_client/session.py b/jupyter_client/session.py index 44f0a799d..68f296540 100644 --- a/jupyter_client/session.py +++ b/jupyter_client/session.py @@ -936,6 +936,46 @@ def recv( # TODO: handle it raise e + async def async_recv( + self, + socket: zmq.sugar.socket.Socket, + mode: int = zmq.NOBLOCK, + content: bool = True, + copy: bool = True, + ) -> t.Tuple[t.Optional[t.List[bytes]], t.Optional[t.Dict[str, t.Any]]]: + """Receive asynchronously and unpack a message. + + Parameters + ---------- + socket : ZMQStream or Socket + The socket or stream to use in receiving. + + Returns + ------- + [idents], msg + [idents] is a list of idents and msg is a nested message dict of + same format as self.msg returns. + """ + if isinstance(socket, ZMQStream): + socket = socket.socket + try: + msg_list = await socket.recv_multipart(mode, copy=copy) + except zmq.ZMQError as e: + if e.errno == zmq.EAGAIN: + # We can convert EAGAIN to None as we know in this case + # recv_multipart won't return None. + return None, None + else: + raise + # split multipart message into identity list and message dict + # invalid large messages can cause very expensive string comparisons + idents, msg_list = self.feed_identities(msg_list, copy) + try: + return idents, self.deserialize(msg_list, content=content, copy=copy) + except Exception as e: + # TODO: handle it + raise e + def feed_identities( self, msg_list: t.Union[t.List[bytes], t.List[zmq.Message]], copy: bool = True ) -> t.Tuple[t.List[bytes], t.Union[t.List[bytes], t.List[zmq.Message]]]: