Skip to content

Commit

Permalink
Merge pull request #454 from MattBelle/main
Browse files Browse the repository at this point in the history
Added stream support to Container.exec_run().
  • Loading branch information
openshift-merge-bot[bot] authored Oct 29, 2024
2 parents 8a4cbf3 + 4f5cbdb commit 7a67491
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 13 deletions.
2 changes: 1 addition & 1 deletion podman/api/output_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,4 @@ def demux_output(data_bytes):
# Update data for next frame
data_bytes = data_bytes[payload_size:]

return stdout, stderr
return stdout or None, stderr or None
13 changes: 10 additions & 3 deletions podman/api/parse_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import Any, Dict, Iterator, Optional, Tuple, Union

from requests import Response
from .output_utils import demux_output


def parse_repository(name: str) -> Tuple[str, Optional[str]]:
Expand Down Expand Up @@ -79,11 +80,13 @@ def frames(response: Response) -> Iterator[bytes]:
yield response.content[frame_begin:frame_end]


def stream_frames(response: Response) -> Iterator[bytes]:
def stream_frames(
response: Response, demux: bool = False
) -> Iterator[Union[bytes, Tuple[bytes, bytes]]]:
"""Returns each frame from multiplexed streamed payload.
Notes:
The stdout and stderr frames are undifferentiated as they are returned.
If ``demux`` then output will be tuples where the first position is ``STDOUT`` and the second
is ``STDERR``.
"""
while True:
header = response.raw.read(8)
Expand All @@ -95,6 +98,10 @@ def stream_frames(response: Response) -> Iterator[bytes]:
continue

data = response.raw.read(frame_length)

if demux:
data = demux_output(header + data)

if not data:
return
yield data
Expand Down
27 changes: 19 additions & 8 deletions podman/domain/containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,14 @@ def exec_run(
privileged: bool = False,
user=None,
detach: bool = False,
stream: bool = False, # pylint: disable=unused-argument
stream: bool = False,
socket: bool = False, # pylint: disable=unused-argument
environment: Union[Mapping[str, str], List[str]] = None,
workdir: str = None,
demux: bool = False,
) -> Tuple[Optional[int], Union[Iterator[bytes], Any, Tuple[bytes, bytes]]]:
) -> Tuple[
Optional[int], Union[Iterator[Union[bytes, Tuple[bytes, bytes]]], Any, Tuple[bytes, bytes]]
]:
"""Run given command inside container and return results.
Args:
Expand All @@ -156,7 +158,7 @@ def exec_run(
user: User to execute command as.
detach: If true, detach from the exec command.
Default: False
stream: Stream response data. Default: False
stream: Stream response data. Ignored if ``detach`` is ``True``. Default: False
socket: Return the connection socket to allow custom
read/write operations. Default: False
environment: A dictionary or a List[str] in
Expand All @@ -166,10 +168,13 @@ def exec_run(
demux: Return stdout and stderr separately
Returns:
First item is the command response code.
Second item is the requests response content.
If demux is True, the second item is a tuple of
(stdout, stderr).
A tuple of (``response_code``, ``output``).
``response_code``:
The exit code of the provided command. ``None`` if ``stream``.
``output``:
If ``stream``, then a generator yeilding response chunks.
If ``demux``, then a tuple of (``stdout``, ``stderr``).
Else the response content.
Raises:
NotImplementedError: method not implemented.
Expand All @@ -192,15 +197,21 @@ def exec_run(
if user:
data["User"] = user

stream = stream and not detach

# create the exec instance
response = self.client.post(f"/containers/{self.name}/exec", data=json.dumps(data))
response.raise_for_status()
exec_id = response.json()['Id']
# start the exec instance, this will store command output
start_resp = self.client.post(
f"/exec/{exec_id}/start", data=json.dumps({"Detach": detach, "Tty": tty})
f"/exec/{exec_id}/start", data=json.dumps({"Detach": detach, "Tty": tty}), stream=stream
)
start_resp.raise_for_status()

if stream:
return None, api.stream_frames(start_resp, demux=demux)

# get and return exec information
response = self.client.get(f"/exec/{exec_id}/json")
response.raise_for_status()
Expand Down
72 changes: 71 additions & 1 deletion podman/tests/integration/test_container_exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,75 @@ def test_container_exec_run_demux(self):
error_code, output = container.exec_run("ls nonexistent", demux=True)

self.assertEqual(error_code, 1)
self.assertEqual(output[0], b'')
self.assertEqual(output[0], None)
self.assertEqual(output[1], b"ls: nonexistent: No such file or directory\n")

def test_container_exec_run_stream(self):
"""Test streaming the output from a long running command."""
container = self.client.containers.create(self.alpine_image, command=["top"], detach=True)
container.start()

command = [
'/bin/sh',
'-c',
'echo 0 ; sleep .1 ; echo 1 ; sleep .1 ; echo 2 ; sleep .1 ;',
]
error_code, output = container.exec_run(command, stream=True)

self.assertEqual(error_code, None)
self.assertEqual(
list(output),
[
b'0\n',
b'1\n',
b'2\n',
],
)

def test_container_exec_run_stream_demux(self):
"""Test streaming the output from a long running command with demux enabled."""
container = self.client.containers.create(self.alpine_image, command=["top"], detach=True)
container.start()

command = [
'/bin/sh',
'-c',
'echo 0 ; >&2 echo 1 ; sleep .1 ; '
+ 'echo 2 ; >&2 echo 3 ; sleep .1 ; '
+ 'echo 4 ; >&2 echo 5 ; sleep .1 ;',
]
error_code, output = container.exec_run(command, stream=True, demux=True)

self.assertEqual(error_code, None)
self.assertEqual(
list(output),
[
(b'0\n', None),
(None, b'1\n'),
(b'2\n', None),
(None, b'3\n'),
(b'4\n', None),
(None, b'5\n'),
],
)

def test_container_exec_run_stream_detach(self):
"""Test streaming the output from a long running command with detach enabled."""
container = self.client.containers.create(self.alpine_image, command=["top"], detach=True)
container.start()

command = [
'/bin/sh',
'-c',
'echo 0 ; sleep .1 ; echo 1 ; sleep .1 ; echo 2 ; sleep .1 ;',
]
error_code, output = container.exec_run(command, stream=True, detach=True)

# Detach should make the ``exec_run`` ignore the ``stream`` flag so we will assert against the standard,
# non-streaming behavior.
self.assertEqual(error_code, 0)
# The endpoint should return immediately, before we are able to actually get any of the output.
self.assertEqual(
output,
b'\n',
)

0 comments on commit 7a67491

Please sign in to comment.