-
Notifications
You must be signed in to change notification settings - Fork 98
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #410 from inknos/demux-stdout-stderr
Enable demux option in `exec_run`
- Loading branch information
Showing
3 changed files
with
110 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
"""Utility functions for dealing with stdout and stderr.""" | ||
|
||
HEADER_SIZE = 8 | ||
STDOUT = 1 | ||
STDERR = 2 | ||
|
||
|
||
# pylint: disable=line-too-long | ||
def demux_output(data_bytes): | ||
"""Demuxes the output of a container stream into stdout and stderr streams. | ||
Stream data is expected to be in the following format: | ||
- 1 byte: stream type (1=stdout, 2=stderr) | ||
- 3 bytes: padding | ||
- 4 bytes: payload size (big-endian) | ||
- N bytes: payload data | ||
ref: https://docs.podman.io/en/latest/_static/api.html?version=v5.0#tag/containers/operation/ContainerAttachLibpod | ||
Args: | ||
data_bytes: Bytes object containing the combined stream data. | ||
Returns: | ||
A tuple containing two bytes objects: (stdout, stderr). | ||
""" | ||
stdout = b"" | ||
stderr = b"" | ||
while len(data_bytes) >= HEADER_SIZE: | ||
# Extract header information | ||
header, data_bytes = data_bytes[:HEADER_SIZE], data_bytes[HEADER_SIZE:] | ||
stream_type = header[0] | ||
payload_size = int.from_bytes(header[4:HEADER_SIZE], "big") | ||
# Check if data is sufficient for payload | ||
if len(data_bytes) < payload_size: | ||
break # Incomplete frame, wait for more data | ||
|
||
# Extract and process payload | ||
payload = data_bytes[:payload_size] | ||
if stream_type == STDOUT: | ||
stdout += payload | ||
elif stream_type == STDERR: | ||
stderr += payload | ||
else: | ||
# todo: Handle unexpected stream types | ||
pass | ||
|
||
# Update data for next frame | ||
data_bytes = data_bytes[payload_size:] | ||
|
||
return stdout, stderr |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
import unittest | ||
|
||
import podman.tests.integration.base as base | ||
from podman import PodmanClient | ||
|
||
# @unittest.skipIf(os.geteuid() != 0, 'Skipping, not running as root') | ||
|
||
|
||
class ContainersExecIntegrationTests(base.IntegrationTest): | ||
"""Containers integration tests for exec""" | ||
|
||
def setUp(self): | ||
super().setUp() | ||
|
||
self.client = PodmanClient(base_url=self.socket_uri) | ||
self.addCleanup(self.client.close) | ||
|
||
self.alpine_image = self.client.images.pull("quay.io/libpod/alpine", tag="latest") | ||
self.containers = [] | ||
|
||
def tearDown(self): | ||
for container in self.containers: | ||
container.remove(force=True) | ||
|
||
def test_container_exec_run(self): | ||
"""Test any command that will return code 0 and no output""" | ||
container = self.client.containers.create(self.alpine_image, command=["top"], detach=True) | ||
container.start() | ||
error_code, stdout = container.exec_run("echo hello") | ||
|
||
self.assertEqual(error_code, 0) | ||
self.assertEqual(stdout, b'\x01\x00\x00\x00\x00\x00\x00\x06hello\n') | ||
|
||
def test_container_exec_run_errorcode(self): | ||
"""Test a failing command with stdout and stderr in a single bytestring""" | ||
container = self.client.containers.create(self.alpine_image, command=["top"], detach=True) | ||
container.start() | ||
error_code, output = container.exec_run("ls nonexistent") | ||
|
||
self.assertEqual(error_code, 1) | ||
self.assertEqual( | ||
output, b"\x02\x00\x00\x00\x00\x00\x00+ls: nonexistent: No such file or directory\n" | ||
) | ||
|
||
def test_container_exec_run_demux(self): | ||
"""Test a failing command with stdout and stderr in a bytestring tuple""" | ||
container = self.client.containers.create(self.alpine_image, command=["top"], detach=True) | ||
container.start() | ||
error_code, output = container.exec_run("ls nonexistent", demux=True) | ||
|
||
self.assertEqual(error_code, 1) | ||
self.assertEqual(output[0], b'') | ||
self.assertEqual(output[1], b"ls: nonexistent: No such file or directory\n") |