From 03b87c0037b1a72cff744b715231ede60a68a5b1 Mon Sep 17 00:00:00 2001 From: Matt Belle Date: Tue, 22 Oct 2024 19:41:52 -0400 Subject: [PATCH 1/4] Added stream support to Container.exec_run(). Signed-off-by: Matt Belle --- podman/api/parse_utils.py | 13 ++++++++----- podman/domain/containers.py | 19 ++++++++++++------- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/podman/api/parse_utils.py b/podman/api/parse_utils.py index b5bed75f..94b897e7 100644 --- a/podman/api/parse_utils.py +++ b/podman/api/parse_utils.py @@ -8,6 +8,7 @@ from typing import Any, Dict, Iterator, Optional, Tuple, Union from requests import Response +from .output_utils import demux_output def parse_repository(name: str) -> Tuple[str, Optional[str]]: @@ -79,11 +80,9 @@ def frames(response: Response) -> Iterator[bytes]: yield response.content[frame_begin:frame_end] -def stream_frames(response: Response) -> Iterator[bytes]: - """Returns each frame from multiplexed streamed payload. - - Notes: - The stdout and stderr frames are undifferentiated as they are returned. +def stream_frames(response: Response, demux: bool = False) -> Iterator[Union[bytes, Tuple[bytes, bytes]]]: + """Returns each frame from multiplexed streamed payload. If ``demux`` then output will be + tuples where the first position is ``STDOUT`` and the second is ``STDERR``. """ while True: header = response.raw.read(8) @@ -95,6 +94,10 @@ def stream_frames(response: Response) -> Iterator[bytes]: continue data = response.raw.read(frame_length) + + if demux: + data = demux_output(header + data) + if not data: return yield data diff --git a/podman/domain/containers.py b/podman/domain/containers.py index a23896f3..9de4b4a1 100644 --- a/podman/domain/containers.py +++ b/podman/domain/containers.py @@ -138,12 +138,12 @@ def exec_run( privileged: bool = False, user=None, detach: bool = False, - stream: bool = False, # pylint: disable=unused-argument + stream: bool = False, socket: bool = False, # pylint: disable=unused-argument environment: Union[Mapping[str, str], List[str]] = None, workdir: str = None, demux: bool = False, - ) -> Tuple[Optional[int], Union[Iterator[bytes], Any, Tuple[bytes, bytes]]]: + ) -> Tuple[Optional[int], Union[Iterator[Union[bytes, Tuple[bytes, bytes]]], Any, Tuple[bytes, bytes]]]: """Run given command inside container and return results. Args: @@ -166,10 +166,11 @@ def exec_run( demux: Return stdout and stderr separately Returns: - First item is the command response code. - Second item is the requests response content. - If demux is True, the second item is a tuple of - (stdout, stderr). + A tuple of (``response_code``, ``output``). ``response_code`` + is ``None`` if ``stream``. If neither ``stream`` nor ``demux``, + then ``output`` is the response content. If ``stream`` output + is a generator yeilding response chunks. If ``demux`` output is + a tuple of (``stdout``, ``stderr``). Raises: NotImplementedError: method not implemented. @@ -198,9 +199,13 @@ def exec_run( exec_id = response.json()['Id'] # start the exec instance, this will store command output start_resp = self.client.post( - f"/exec/{exec_id}/start", data=json.dumps({"Detach": detach, "Tty": tty}) + f"/exec/{exec_id}/start", data=json.dumps({"Detach": detach, "Tty": tty}), stream=stream ) start_resp.raise_for_status() + + if stream: + return None, api.stream_frames(start_resp, demux=demux) + # get and return exec information response = self.client.get(f"/exec/{exec_id}/json") response.raise_for_status() From e948cfac5ae2adab7654d6450082419bb34e46bd Mon Sep 17 00:00:00 2001 From: Matt Belle Date: Tue, 22 Oct 2024 20:18:02 -0400 Subject: [PATCH 2/4] Formatted code and cleaned up docstrings to pass the automated checks. Signed-off-by: Matt Belle --- podman/api/parse_utils.py | 10 +++++++--- podman/domain/containers.py | 18 +++++++++++------- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/podman/api/parse_utils.py b/podman/api/parse_utils.py index 94b897e7..c07762ea 100644 --- a/podman/api/parse_utils.py +++ b/podman/api/parse_utils.py @@ -80,9 +80,13 @@ def frames(response: Response) -> Iterator[bytes]: yield response.content[frame_begin:frame_end] -def stream_frames(response: Response, demux: bool = False) -> Iterator[Union[bytes, Tuple[bytes, bytes]]]: - """Returns each frame from multiplexed streamed payload. If ``demux`` then output will be - tuples where the first position is ``STDOUT`` and the second is ``STDERR``. +def stream_frames( + response: Response, demux: bool = False +) -> Iterator[Union[bytes, Tuple[bytes, bytes]]]: + """Returns each frame from multiplexed streamed payload. + + If ``demux`` then output will be tuples where the first position is ``STDOUT`` and the second + is ``STDERR``. """ while True: header = response.raw.read(8) diff --git a/podman/domain/containers.py b/podman/domain/containers.py index 9de4b4a1..c4d20152 100644 --- a/podman/domain/containers.py +++ b/podman/domain/containers.py @@ -143,7 +143,9 @@ def exec_run( environment: Union[Mapping[str, str], List[str]] = None, workdir: str = None, demux: bool = False, - ) -> Tuple[Optional[int], Union[Iterator[Union[bytes, Tuple[bytes, bytes]]], Any, Tuple[bytes, bytes]]]: + ) -> Tuple[ + Optional[int], Union[Iterator[Union[bytes, Tuple[bytes, bytes]]], Any, Tuple[bytes, bytes]] + ]: """Run given command inside container and return results. Args: @@ -166,11 +168,13 @@ def exec_run( demux: Return stdout and stderr separately Returns: - A tuple of (``response_code``, ``output``). ``response_code`` - is ``None`` if ``stream``. If neither ``stream`` nor ``demux``, - then ``output`` is the response content. If ``stream`` output - is a generator yeilding response chunks. If ``demux`` output is - a tuple of (``stdout``, ``stderr``). + A tuple of (``response_code``, ``output``). + ``response_code``: + The exit code of the provided command. ``None`` if ``stream``. + ``output``: + If ``stream``, then a generator yeilding response chunks. + If ``demux``, then a tuple of (``stdout``, ``stderr``). + Else the response content. Raises: NotImplementedError: method not implemented. @@ -199,7 +203,7 @@ def exec_run( exec_id = response.json()['Id'] # start the exec instance, this will store command output start_resp = self.client.post( - f"/exec/{exec_id}/start", data=json.dumps({"Detach": detach, "Tty": tty}), stream=stream + f"/exec/{exec_id}/start", data=json.dumps({"Detach": detach, "Tty": tty}), stream=stream ) start_resp.raise_for_status() From 524087d80470cdbe3c2dbe5686f4cdc67ed84fe8 Mon Sep 17 00:00:00 2001 From: Matt Belle Date: Wed, 23 Oct 2024 11:56:08 -0400 Subject: [PATCH 3/4] Added integration tests. Signed-off-by: Matt Belle --- podman/api/output_utils.py | 2 +- .../tests/integration/test_container_exec.py | 39 ++++++++++++++++++- 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/podman/api/output_utils.py b/podman/api/output_utils.py index 2c349ad5..e4b28191 100644 --- a/podman/api/output_utils.py +++ b/podman/api/output_utils.py @@ -46,4 +46,4 @@ def demux_output(data_bytes): # Update data for next frame data_bytes = data_bytes[payload_size:] - return stdout, stderr + return stdout or None, stderr or None diff --git a/podman/tests/integration/test_container_exec.py b/podman/tests/integration/test_container_exec.py index 0221bef0..3e22aec7 100644 --- a/podman/tests/integration/test_container_exec.py +++ b/podman/tests/integration/test_container_exec.py @@ -49,5 +49,42 @@ def test_container_exec_run_demux(self): error_code, output = container.exec_run("ls nonexistent", demux=True) self.assertEqual(error_code, 1) - self.assertEqual(output[0], b'') + self.assertEqual(output[0], None) self.assertEqual(output[1], b"ls: nonexistent: No such file or directory\n") + + def test_container_exec_run_stream(self): + """Test streaming the output from a long running command.""" + container = self.client.containers.create(self.alpine_image, command=["top"], detach=True) + container.start() + + command = [] + for i in range(3): + # We want to sleep so that the lines get processed seperately + command.extend(['echo', str(i), ';', 'sleep', '.1', ';']) + command = ['/bin/sh', '-c', ' '.join(command)] + error_code, output = container.exec_run(command, stream=True) + + self.assertEqual(error_code, None) + for index, data in enumerate(output): + self.assertEqual(data, f'{index}\n'.encode()) + + def test_container_exec_run_stream_demux(self): + """Test streaming the output from a long running command with demux enabled.""" + container = self.client.containers.create(self.alpine_image, command=["top"], detach=True) + container.start() + + command = [] + for i in range(3): + # We want to sleep so that the lines get processed seperately + command.extend( + ['echo', str(i * 2), ';', '>&2', 'echo', str(i * 2 + 1), ';', 'sleep', '.1', ';'] + ) + command = ['/bin/sh', '-c', ' '.join(command)] + error_code, output = container.exec_run(command, stream=True, demux=True) + + self.assertEqual(error_code, None) + for index, data in enumerate(output): + if index % 2 == 0: + self.assertEqual(data, (f'{index}\n'.encode(), None)) + else: + self.assertEqual(data, (None, f'{index}\n'.encode())) From 4f5cbdbf1bb815511940159a3208499e2b668cd5 Mon Sep 17 00:00:00 2001 From: Matt Belle Date: Tue, 29 Oct 2024 11:33:52 -0400 Subject: [PATCH 4/4] Fixed behavior if detach is set. Flattened test logic. Signed-off-by: Matt Belle --- podman/domain/containers.py | 4 +- .../tests/integration/test_container_exec.py | 71 ++++++++++++++----- 2 files changed, 55 insertions(+), 20 deletions(-) diff --git a/podman/domain/containers.py b/podman/domain/containers.py index c4d20152..d2ef2e3e 100644 --- a/podman/domain/containers.py +++ b/podman/domain/containers.py @@ -158,7 +158,7 @@ def exec_run( user: User to execute command as. detach: If true, detach from the exec command. Default: False - stream: Stream response data. Default: False + stream: Stream response data. Ignored if ``detach`` is ``True``. Default: False socket: Return the connection socket to allow custom read/write operations. Default: False environment: A dictionary or a List[str] in @@ -197,6 +197,8 @@ def exec_run( if user: data["User"] = user + stream = stream and not detach + # create the exec instance response = self.client.post(f"/containers/{self.name}/exec", data=json.dumps(data)) response.raise_for_status() diff --git a/podman/tests/integration/test_container_exec.py b/podman/tests/integration/test_container_exec.py index 3e22aec7..43190797 100644 --- a/podman/tests/integration/test_container_exec.py +++ b/podman/tests/integration/test_container_exec.py @@ -57,34 +57,67 @@ def test_container_exec_run_stream(self): container = self.client.containers.create(self.alpine_image, command=["top"], detach=True) container.start() - command = [] - for i in range(3): - # We want to sleep so that the lines get processed seperately - command.extend(['echo', str(i), ';', 'sleep', '.1', ';']) - command = ['/bin/sh', '-c', ' '.join(command)] + command = [ + '/bin/sh', + '-c', + 'echo 0 ; sleep .1 ; echo 1 ; sleep .1 ; echo 2 ; sleep .1 ;', + ] error_code, output = container.exec_run(command, stream=True) self.assertEqual(error_code, None) - for index, data in enumerate(output): - self.assertEqual(data, f'{index}\n'.encode()) + self.assertEqual( + list(output), + [ + b'0\n', + b'1\n', + b'2\n', + ], + ) def test_container_exec_run_stream_demux(self): """Test streaming the output from a long running command with demux enabled.""" container = self.client.containers.create(self.alpine_image, command=["top"], detach=True) container.start() - command = [] - for i in range(3): - # We want to sleep so that the lines get processed seperately - command.extend( - ['echo', str(i * 2), ';', '>&2', 'echo', str(i * 2 + 1), ';', 'sleep', '.1', ';'] - ) - command = ['/bin/sh', '-c', ' '.join(command)] + command = [ + '/bin/sh', + '-c', + 'echo 0 ; >&2 echo 1 ; sleep .1 ; ' + + 'echo 2 ; >&2 echo 3 ; sleep .1 ; ' + + 'echo 4 ; >&2 echo 5 ; sleep .1 ;', + ] error_code, output = container.exec_run(command, stream=True, demux=True) self.assertEqual(error_code, None) - for index, data in enumerate(output): - if index % 2 == 0: - self.assertEqual(data, (f'{index}\n'.encode(), None)) - else: - self.assertEqual(data, (None, f'{index}\n'.encode())) + self.assertEqual( + list(output), + [ + (b'0\n', None), + (None, b'1\n'), + (b'2\n', None), + (None, b'3\n'), + (b'4\n', None), + (None, b'5\n'), + ], + ) + + def test_container_exec_run_stream_detach(self): + """Test streaming the output from a long running command with detach enabled.""" + container = self.client.containers.create(self.alpine_image, command=["top"], detach=True) + container.start() + + command = [ + '/bin/sh', + '-c', + 'echo 0 ; sleep .1 ; echo 1 ; sleep .1 ; echo 2 ; sleep .1 ;', + ] + error_code, output = container.exec_run(command, stream=True, detach=True) + + # Detach should make the ``exec_run`` ignore the ``stream`` flag so we will assert against the standard, + # non-streaming behavior. + self.assertEqual(error_code, 0) + # The endpoint should return immediately, before we are able to actually get any of the output. + self.assertEqual( + output, + b'\n', + )