Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#44 mllp client: read full response from the server #49

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 101 additions & 47 deletions hl7/client.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import io
import logging
import os.path
import socket
import sys
from optparse import OptionParser
import time
import typing
from argparse import ArgumentParser

import hl7
from hl7.exceptions import CLIException, MLLPException

SB = b"\x0b" # <SB>, vertical tab
EB = b"\x1c" # <EB>, file separator
Expand All @@ -14,9 +18,7 @@

RECV_BUFFER = 4096


class MLLPException(Exception):
pass
log = logging.getLogger(__name__)


class MLLPClient(object):
Expand All @@ -36,16 +38,22 @@ class MLLPClient(object):
with MLLPClient(host, port) as client:
client.send_message('MSH|...')

MLLPClient takes an optional ``encoding`` parameter, defaults to UTF-8,
for encoding unicode messages [#]_.
MLLPClient takes optional parameters:

* ``encoding``, defaults to UTF-8, for encoding unicode messages [#]_.
* ``timeout`` in seconds, timeout for socket operations [_t]_.
* ``deadline`` in seconds, will be used by the client to determine how long it should wait for full response.

.. [#] http://wiki.hl7.org/index.php?title=Character_Set_used_in_v2_messages
.. [_t] https://docs.python.org/3/library/socket.html#socket.socket.settimeout
"""

def __init__(self, host, port, encoding="utf-8"):
def __init__(self, host, port, encoding="utf-8", timeout=10, deadline=3):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((host, port))
self.encoding = encoding
self.timeout = timeout # seconds for socket timeout
self.deadline = deadline # seconds for max time client will wait for a response

def __enter__(self):
return self
Expand All @@ -57,7 +65,7 @@ def close(self):
"""Release the socket connection"""
self.socket.close()

def send_message(self, message):
def send_message(self, message: typing.Union[bytes, str, hl7.Message]) -> bytes:
"""Wraps a byte string, unicode string, or :py:class:`hl7.Message`
in a MLLP container and send the message to the server

Expand All @@ -79,14 +87,41 @@ def send_message(self, message):
data = SB + binary + EB + CR
return self.send(data)

def send(self, data):
def send(self, data: bytes) -> bytes:
"""Low-level, direct access to the socket.send (data must be already
wrapped in an MLLP container). Blocks until the server returns.
"""
# upload the data
log.debug(f"sending {(data,)}")
self.socket.send(data)
# wait for the ACK/NACK
return self.socket.recv(RECV_BUFFER)
self.socket.settimeout(self.timeout)
buff = b""
# the whole message should be received within this deadline
deadline = time.time() + self.deadline

# This will read data until deadline is reached.
# some HL7 counterparts may send responses in chunks, so we should wait some
# and read from the socket until we probably receive full message.
# timeout/deadline are configurable on client creation.
while deadline > time.time():
try:
data = self.socket.recv(RECV_BUFFER)
except TimeoutError:
data = None
if data is not None:
buff += data
# received LLP end markers
if buff.endswith(EB + CR):
break
log.debug(f"received {(buff,)}")
return self.clean(buff)

def clean(self, data: bytes) -> bytes:
"""Removes LLP bytes from data"""
data = data.lstrip(SB)
data = data.rstrip(EB + CR)
return data


# wrappers to make testing easier
Expand Down Expand Up @@ -117,8 +152,10 @@ def read_stream(stream):

while True:
data = stream.read(RECV_BUFFER)
if data == b"":
if not data:
break
if isinstance(data, str):
data = data.encode("utf-8")
# usually should be broken up by EB, but I have seen FF separating
# messages
messages = (_buffer + data).split(EB if FF not in data else FF)
Expand Down Expand Up @@ -165,96 +202,113 @@ def read_loose(stream):
yield START_BLOCK + m


def mllp_send():
def mllp_send(in_args=None):
"""Command line tool to send messages to an MLLP server"""
# set up the command line options
script_name = os.path.basename(sys.argv[0])
parser = OptionParser(usage=script_name + " [options] <server>")
parser.add_option(
in_args = in_args or sys.argv
script_name = os.path.basename(in_args[0])
parser = ArgumentParser(usage=script_name + " [options] <server>")
parser.add_argument("host", action="store", nargs=1, help="Host to connect to")
parser.add_argument(
"--version",
action="store_true",
dest="version",
default=False,
help="print current version and exit",
)
parser.add_option(
parser.add_argument(
"-p",
"--port",
action="store",
type="int",
type=int,
required=False,
dest="port",
default=6661,
help="port to connect to",
)
parser.add_option(
parser.add_argument(
"-f",
"--file",
required=False,
dest="filename",
default=None,
help="read from FILE instead of stdin",
metavar="FILE",
)
parser.add_option(
parser.add_argument(
"-q",
"--quiet",
action="store_true",
action="store_false",
dest="verbose",
default=True,
help="do not print status messages to stdout",
)
parser.add_option(
parser.add_argument(
"--loose",
action="store_true",
dest="loose",
default=False,
help=(
"allow file to be a HL7-like object (\\r\\n instead "
"of \\r). Requires that messages start with "
'"MSH|^~\\&|". Requires --file option (no stdin)'
),
)
parser.add_argument(
"--timeout",
action="store",
dest="timeout",
required=False,
default=10,
type=float,
help="number of seconds for socket operations to timeout",
)
parser.add_argument(
"--deadline",
action="store",
required=False,
dest="deadline",
default=3,
type=float,
help="number of seconds for the client to receive full response",
)

(options, args) = parser.parse_args()

if options.version:
args = parser.parse_args(in_args[1:])
if args.version:
import hl7

stdout(hl7.__version__)
return

if len(args) == 1:
host = args[0]
else:
# server not present
parser.print_usage()
stderr().write("server required\n")
sys.exit(1)
return # for testing when sys.exit mocked
host = args.host[0]

if options.filename is not None:
log.setLevel(logging.INFO)
if args.verbose:
log.setLevel(logging.DEBUG)

if args.filename is not None:
# Previously set stream to the open() handle, but then we did not
# close the open file handle. This new approach consumes the entire
# file into memory before starting to process, which is not required
# or ideal, since we can handle a stream
with open(options.filename, "rb") as f:
with open(args.filename, "rb") as f:
stream = io.BytesIO(f.read())
else:
if options.loose:
if args.loose:
stderr().write("--loose requires --file\n")
sys.exit(1)
return # for testing when sys.exit mocked
raise CLIException(1)

stream = stdin()

with MLLPClient(host, options.port) as client:
message_stream = (
read_stream(stream) if not options.loose else read_loose(stream)
)

with MLLPClient(
host, args.port, deadline=args.deadline, timeout=args.timeout
) as client:
message_stream = read_stream(stream) if not args.loose else read_loose(stream)
for message in message_stream:
result = client.send_message(message)
if options.verbose:
if args.verbose:
stdout(result)


if __name__ == "__main__":
mllp_send()
try:
mllp_send(sys.argv)
except CLIException as err:
sys.exit(err.exit_code)
Comment on lines +311 to +314
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is going to hide the actual stack trace of what went wrong, isn't it. E.g. it won't be clear if it was a TimeoutError or a parsing exception?

10 changes: 10 additions & 0 deletions hl7/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,13 @@ class MalformedFileException(HL7Exception):

class ParseException(HL7Exception):
pass


class MLLPException(HL7Exception):
pass


class CLIException(HL7Exception):
""" An exception to propagate expected exit code from cli script"""
def __init__(self, exit_code):
self.exit_code = exit_code
Loading