Fix exception sampling logic #30319
113 fail, 606 skipped, 1 319 pass in 6m 23s
Annotations
github-actions / Test Results
All 2 runs failed: test_pardo_side_outputs (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-cloud.xml [took 1s]
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312.xml [took 2s]
Raw output
RuntimeError: Subprocess terminated unexpectedly with exit code 1.
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_pardo_side_outputs>
def test_pardo_side_outputs(self):
def tee(elem, *tags):
for tag in tags:
if tag in elem:
yield beam.pvalue.TaggedOutput(tag, elem)
> with self.create_pipeline() as p:
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:396:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:169: in create_pipeline
return beam.Pipeline(self.get_runner(), self.create_options())
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:272: in create_options
options = super().create_options()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:157: in create_options
options.view_as(PortableOptions).job_endpoint = self._get_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:122: in _get_job_endpoint
cls._job_endpoint = cls._create_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:128: in _create_job_endpoint
return cls._start_local_runner_subprocess_job_service()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cls = <class 'apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses'>
@classmethod
def _start_local_runner_subprocess_job_service(cls):
cls._maybe_kill_subprocess()
# TODO(robertwb): Consider letting the subprocess pick one and
# communicate it back...
# pylint: disable=unbalanced-tuple-unpacking
job_port, expansion_port = cls._pick_unused_ports(num_ports=2)
_LOGGER.info('Starting server on port %d.', job_port)
cls._subprocess = subprocess.Popen(
cls._subprocess_command(job_port, expansion_port))
address = 'localhost:%d' % job_port
job_service = beam_job_api_pb2_grpc.JobServiceStub(
GRPCChannelFactory.insecure_channel(address))
_LOGGER.info('Waiting for server to be ready...')
start = time.time()
# Long timeout based previously flaky test. See issue #22115.
timeout = 300
while True:
time.sleep(0.1)
if cls._subprocess.poll() is not None:
> raise RuntimeError(
'Subprocess terminated unexpectedly with exit code %d.' %
E RuntimeError: Subprocess terminated unexpectedly with exit code 1.
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:101: RuntimeError
github-actions / Test Results
All 2 runs failed: test_pardo_state_only (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-cloud.xml [took 1s]
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312.xml [took 1s]
Raw output
RuntimeError: Subprocess terminated unexpectedly with exit code 1.
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_pardo_state_only>
def test_pardo_state_only(self):
index_state_spec = userstate.CombiningValueStateSpec('index', sum)
value_and_index_state_spec = userstate.ReadModifyWriteStateSpec(
'value:index', StrUtf8Coder())
# TODO(ccy): State isn't detected with Map/FlatMap.
class AddIndex(beam.DoFn):
def process(
self,
kv,
index=beam.DoFn.StateParam(index_state_spec),
value_and_index=beam.DoFn.StateParam(value_and_index_state_spec)):
k, v = kv
index.add(1)
value_and_index.write('%s:%s' % (v, index.read()))
yield k, v, index.read(), value_and_index.read()
inputs = [('A', 'a')] * 2 + [('B', 'b')] * 3
expected = [('A', 'a', 1, 'a:1'), ('A', 'a', 2, 'a:2'),
('B', 'b', 1, 'b:1'), ('B', 'b', 2, 'b:2'),
('B', 'b', 3, 'b:3')]
> with self.create_pipeline() as p:
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:643:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:169: in create_pipeline
return beam.Pipeline(self.get_runner(), self.create_options())
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:272: in create_options
options = super().create_options()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:157: in create_options
options.view_as(PortableOptions).job_endpoint = self._get_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:122: in _get_job_endpoint
cls._job_endpoint = cls._create_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:128: in _create_job_endpoint
return cls._start_local_runner_subprocess_job_service()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cls = <class 'apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses'>
@classmethod
def _start_local_runner_subprocess_job_service(cls):
cls._maybe_kill_subprocess()
# TODO(robertwb): Consider letting the subprocess pick one and
# communicate it back...
# pylint: disable=unbalanced-tuple-unpacking
job_port, expansion_port = cls._pick_unused_ports(num_ports=2)
_LOGGER.info('Starting server on port %d.', job_port)
cls._subprocess = subprocess.Popen(
cls._subprocess_command(job_port, expansion_port))
address = 'localhost:%d' % job_port
job_service = beam_job_api_pb2_grpc.JobServiceStub(
GRPCChannelFactory.insecure_channel(address))
_LOGGER.info('Waiting for server to be ready...')
start = time.time()
# Long timeout based previously flaky test. See issue #22115.
timeout = 300
while True:
time.sleep(0.1)
if cls._subprocess.poll() is not None:
> raise RuntimeError(
'Subprocess terminated unexpectedly with exit code %d.' %
E RuntimeError: Subprocess terminated unexpectedly with exit code 1.
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:101: RuntimeError
github-actions / Test Results
All 2 runs failed: test_pardo_state_timers (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-cloud.xml [took 1s]
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312.xml [took 1s]
Raw output
RuntimeError: Subprocess terminated unexpectedly with exit code 1.
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_pardo_state_timers>
def test_pardo_state_timers(self):
> self._run_pardo_state_timers(windowed=False)
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:744:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:803: in _run_pardo_state_timers
with self.create_pipeline() as p:
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:169: in create_pipeline
return beam.Pipeline(self.get_runner(), self.create_options())
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:272: in create_options
options = super().create_options()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:157: in create_options
options.view_as(PortableOptions).job_endpoint = self._get_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:122: in _get_job_endpoint
cls._job_endpoint = cls._create_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:128: in _create_job_endpoint
return cls._start_local_runner_subprocess_job_service()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cls = <class 'apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses'>
@classmethod
def _start_local_runner_subprocess_job_service(cls):
cls._maybe_kill_subprocess()
# TODO(robertwb): Consider letting the subprocess pick one and
# communicate it back...
# pylint: disable=unbalanced-tuple-unpacking
job_port, expansion_port = cls._pick_unused_ports(num_ports=2)
_LOGGER.info('Starting server on port %d.', job_port)
cls._subprocess = subprocess.Popen(
cls._subprocess_command(job_port, expansion_port))
address = 'localhost:%d' % job_port
job_service = beam_job_api_pb2_grpc.JobServiceStub(
GRPCChannelFactory.insecure_channel(address))
_LOGGER.info('Waiting for server to be ready...')
start = time.time()
# Long timeout based previously flaky test. See issue #22115.
timeout = 300
while True:
time.sleep(0.1)
if cls._subprocess.poll() is not None:
> raise RuntimeError(
'Subprocess terminated unexpectedly with exit code %d.' %
E RuntimeError: Subprocess terminated unexpectedly with exit code 1.
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:101: RuntimeError
github-actions / Test Results
All 2 runs failed: test_pardo_state_timers_non_standard_coder (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-cloud.xml [took 1s]
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312.xml [took 1s]
Raw output
RuntimeError: Subprocess terminated unexpectedly with exit code 1.
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_pardo_state_timers_non_standard_coder>
def test_pardo_state_timers_non_standard_coder(self):
> self._run_pardo_state_timers(windowed=False, key_type=Any)
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:747:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:803: in _run_pardo_state_timers
with self.create_pipeline() as p:
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:169: in create_pipeline
return beam.Pipeline(self.get_runner(), self.create_options())
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:272: in create_options
options = super().create_options()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:157: in create_options
options.view_as(PortableOptions).job_endpoint = self._get_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:122: in _get_job_endpoint
cls._job_endpoint = cls._create_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:128: in _create_job_endpoint
return cls._start_local_runner_subprocess_job_service()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cls = <class 'apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses'>
@classmethod
def _start_local_runner_subprocess_job_service(cls):
cls._maybe_kill_subprocess()
# TODO(robertwb): Consider letting the subprocess pick one and
# communicate it back...
# pylint: disable=unbalanced-tuple-unpacking
job_port, expansion_port = cls._pick_unused_ports(num_ports=2)
_LOGGER.info('Starting server on port %d.', job_port)
cls._subprocess = subprocess.Popen(
cls._subprocess_command(job_port, expansion_port))
address = 'localhost:%d' % job_port
job_service = beam_job_api_pb2_grpc.JobServiceStub(
GRPCChannelFactory.insecure_channel(address))
_LOGGER.info('Waiting for server to be ready...')
start = time.time()
# Long timeout based previously flaky test. See issue #22115.
timeout = 300
while True:
time.sleep(0.1)
if cls._subprocess.poll() is not None:
> raise RuntimeError(
'Subprocess terminated unexpectedly with exit code %d.' %
E RuntimeError: Subprocess terminated unexpectedly with exit code 1.
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:101: RuntimeError
github-actions / Test Results
All 2 runs failed: test_pardo_state_with_custom_key_coder (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-cloud.xml [took 1s]
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312.xml [took 1s]
Raw output
RuntimeError: Subprocess terminated unexpectedly with exit code 1.
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_pardo_state_with_custom_key_coder>
def test_pardo_state_with_custom_key_coder(self):
"""Tests that state requests work correctly when the key coder is an
SDK-specific coder, i.e. non standard coder. This is additionally enforced
by Java's ProcessBundleDescriptorsTest and by Flink's
ExecutableStageDoFnOperator which detects invalid encoding by checking for
the correct key group of the encoded key."""
index_state_spec = userstate.CombiningValueStateSpec('index', sum)
# Test params
# Ensure decent amount of elements to serve all partitions
n = 200
duplicates = 1
split = n // (duplicates + 1)
inputs = [(i % split, str(i % split)) for i in range(0, n)]
# Use a DoFn which has to use FastPrimitivesCoder because the type cannot
# be inferred
class Input(beam.DoFn):
def process(self, impulse):
for i in inputs:
yield i
class AddIndex(beam.DoFn):
def process(self, kv, index=beam.DoFn.StateParam(index_state_spec)):
k, v = kv
index.add(1)
yield k, v, index.read()
expected = [(i % split, str(i % split), i // split + 1)
for i in range(0, n)]
> with self.create_pipeline() as p:
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:203:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:169: in create_pipeline
return beam.Pipeline(self.get_runner(), self.create_options())
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:272: in create_options
options = super().create_options()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:157: in create_options
options.view_as(PortableOptions).job_endpoint = self._get_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:122: in _get_job_endpoint
cls._job_endpoint = cls._create_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:128: in _create_job_endpoint
return cls._start_local_runner_subprocess_job_service()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cls = <class 'apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses'>
@classmethod
def _start_local_runner_subprocess_job_service(cls):
cls._maybe_kill_subprocess()
# TODO(robertwb): Consider letting the subprocess pick one and
# communicate it back...
# pylint: disable=unbalanced-tuple-unpacking
job_port, expansion_port = cls._pick_unused_ports(num_ports=2)
_LOGGER.info('Starting server on port %d.', job_port)
cls._subprocess = subprocess.Popen(
cls._subprocess_command(job_port, expansion_port))
address = 'localhost:%d' % job_port
job_service = beam_job_api_pb2_grpc.JobServiceStub(
GRPCChannelFactory.insecure_channel(address))
_LOGGER.info('Waiting for server to be ready...')
start = time.time()
# Long timeout based previously flaky test. See issue #22115.
timeout = 300
while True:
time.sleep(0.1)
if cls._subprocess.poll() is not None:
> raise RuntimeError(
'Subprocess terminated unexpectedly with exit code %d.' %
E RuntimeError: Subprocess terminated unexpectedly with exit code 1.
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:101: RuntimeError
github-actions / Test Results
All 2 runs failed: test_pardo_timers (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-cloud.xml [took 1s]
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312.xml [took 1s]
Raw output
RuntimeError: Subprocess terminated unexpectedly with exit code 1.
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_pardo_timers>
def test_pardo_timers(self):
timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
state_spec = userstate.CombiningValueStateSpec('num_called', sum)
class TimerDoFn(beam.DoFn):
def process(self, element, timer=beam.DoFn.TimerParam(timer_spec)):
unused_key, ts = element
timer.set(ts)
timer.set(2 * ts)
@userstate.on_timer(timer_spec)
def process_timer(
self,
ts=beam.DoFn.TimestampParam,
timer=beam.DoFn.TimerParam(timer_spec),
state=beam.DoFn.StateParam(state_spec)):
if state.read() == 0:
state.add(1)
timer.set(timestamp.Timestamp(micros=2 * ts.micros))
yield 'fired'
> with self.create_pipeline() as p:
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:698:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:169: in create_pipeline
return beam.Pipeline(self.get_runner(), self.create_options())
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:272: in create_options
options = super().create_options()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:157: in create_options
options.view_as(PortableOptions).job_endpoint = self._get_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:122: in _get_job_endpoint
cls._job_endpoint = cls._create_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:128: in _create_job_endpoint
return cls._start_local_runner_subprocess_job_service()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cls = <class 'apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses'>
@classmethod
def _start_local_runner_subprocess_job_service(cls):
cls._maybe_kill_subprocess()
# TODO(robertwb): Consider letting the subprocess pick one and
# communicate it back...
# pylint: disable=unbalanced-tuple-unpacking
job_port, expansion_port = cls._pick_unused_ports(num_ports=2)
_LOGGER.info('Starting server on port %d.', job_port)
cls._subprocess = subprocess.Popen(
cls._subprocess_command(job_port, expansion_port))
address = 'localhost:%d' % job_port
job_service = beam_job_api_pb2_grpc.JobServiceStub(
GRPCChannelFactory.insecure_channel(address))
_LOGGER.info('Waiting for server to be ready...')
start = time.time()
# Long timeout based previously flaky test. See issue #22115.
timeout = 300
while True:
time.sleep(0.1)
if cls._subprocess.poll() is not None:
> raise RuntimeError(
'Subprocess terminated unexpectedly with exit code %d.' %
E RuntimeError: Subprocess terminated unexpectedly with exit code 1.
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:101: RuntimeError
github-actions / Test Results
All 2 runs failed: test_pardo_timers_clear (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-cloud.xml [took 1s]
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312.xml [took 1s]
Raw output
RuntimeError: Subprocess terminated unexpectedly with exit code 1.
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_pardo_timers_clear>
def test_pardo_timers_clear(self):
timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
clear_timer_spec = userstate.TimerSpec(
'clear_timer', userstate.TimeDomain.WATERMARK)
class TimerDoFn(beam.DoFn):
def process(
self,
element,
timer=beam.DoFn.TimerParam(timer_spec),
clear_timer=beam.DoFn.TimerParam(clear_timer_spec)):
unused_key, ts = element
timer.set(ts)
timer.set(2 * ts)
clear_timer.set(ts)
clear_timer.clear()
@userstate.on_timer(timer_spec)
def process_timer(self):
yield 'fired'
@userstate.on_timer(clear_timer_spec)
def process_clear_timer(self):
yield 'should not fire'
> with self.create_pipeline() as p:
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:733:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:169: in create_pipeline
return beam.Pipeline(self.get_runner(), self.create_options())
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:272: in create_options
options = super().create_options()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:157: in create_options
options.view_as(PortableOptions).job_endpoint = self._get_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:122: in _get_job_endpoint
cls._job_endpoint = cls._create_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:128: in _create_job_endpoint
return cls._start_local_runner_subprocess_job_service()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cls = <class 'apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses'>
@classmethod
def _start_local_runner_subprocess_job_service(cls):
cls._maybe_kill_subprocess()
# TODO(robertwb): Consider letting the subprocess pick one and
# communicate it back...
# pylint: disable=unbalanced-tuple-unpacking
job_port, expansion_port = cls._pick_unused_ports(num_ports=2)
_LOGGER.info('Starting server on port %d.', job_port)
cls._subprocess = subprocess.Popen(
cls._subprocess_command(job_port, expansion_port))
address = 'localhost:%d' % job_port
job_service = beam_job_api_pb2_grpc.JobServiceStub(
GRPCChannelFactory.insecure_channel(address))
_LOGGER.info('Waiting for server to be ready...')
start = time.time()
# Long timeout based previously flaky test. See issue #22115.
timeout = 300
while True:
time.sleep(0.1)
if cls._subprocess.poll() is not None:
> raise RuntimeError(
'Subprocess terminated unexpectedly with exit code %d.' %
E RuntimeError: Subprocess terminated unexpectedly with exit code 1.
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:101: RuntimeError
github-actions / Test Results
All 2 runs failed: test_pardo_unfusable_side_inputs (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-cloud.xml [took 1s]
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312.xml [took 1s]
Raw output
RuntimeError: Subprocess terminated unexpectedly with exit code 1.
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_pardo_unfusable_side_inputs>
def test_pardo_unfusable_side_inputs(self):
def cross_product(elem, sides):
for side in sides:
yield elem, side
> with self.create_pipeline() as p:
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:600:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:169: in create_pipeline
return beam.Pipeline(self.get_runner(), self.create_options())
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:272: in create_options
options = super().create_options()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:157: in create_options
options.view_as(PortableOptions).job_endpoint = self._get_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:122: in _get_job_endpoint
cls._job_endpoint = cls._create_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:128: in _create_job_endpoint
return cls._start_local_runner_subprocess_job_service()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cls = <class 'apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses'>
@classmethod
def _start_local_runner_subprocess_job_service(cls):
cls._maybe_kill_subprocess()
# TODO(robertwb): Consider letting the subprocess pick one and
# communicate it back...
# pylint: disable=unbalanced-tuple-unpacking
job_port, expansion_port = cls._pick_unused_ports(num_ports=2)
_LOGGER.info('Starting server on port %d.', job_port)
cls._subprocess = subprocess.Popen(
cls._subprocess_command(job_port, expansion_port))
address = 'localhost:%d' % job_port
job_service = beam_job_api_pb2_grpc.JobServiceStub(
GRPCChannelFactory.insecure_channel(address))
_LOGGER.info('Waiting for server to be ready...')
start = time.time()
# Long timeout based previously flaky test. See issue #22115.
timeout = 300
while True:
time.sleep(0.1)
if cls._subprocess.poll() is not None:
> raise RuntimeError(
'Subprocess terminated unexpectedly with exit code %d.' %
E RuntimeError: Subprocess terminated unexpectedly with exit code 1.
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:101: RuntimeError
github-actions / Test Results
All 2 runs failed: test_pardo_unfusable_side_inputs_with_separation (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-cloud.xml [took 1s]
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312.xml [took 1s]
Raw output
RuntimeError: Subprocess terminated unexpectedly with exit code 1.
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_pardo_unfusable_side_inputs_with_separation>
def test_pardo_unfusable_side_inputs_with_separation(self):
def cross_product(elem, sides):
for side in sides:
yield elem, side
> with self.create_pipeline() as p:
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:611:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:169: in create_pipeline
return beam.Pipeline(self.get_runner(), self.create_options())
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:272: in create_options
options = super().create_options()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:157: in create_options
options.view_as(PortableOptions).job_endpoint = self._get_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:122: in _get_job_endpoint
cls._job_endpoint = cls._create_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:128: in _create_job_endpoint
return cls._start_local_runner_subprocess_job_service()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cls = <class 'apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses'>
@classmethod
def _start_local_runner_subprocess_job_service(cls):
cls._maybe_kill_subprocess()
# TODO(robertwb): Consider letting the subprocess pick one and
# communicate it back...
# pylint: disable=unbalanced-tuple-unpacking
job_port, expansion_port = cls._pick_unused_ports(num_ports=2)
_LOGGER.info('Starting server on port %d.', job_port)
cls._subprocess = subprocess.Popen(
cls._subprocess_command(job_port, expansion_port))
address = 'localhost:%d' % job_port
job_service = beam_job_api_pb2_grpc.JobServiceStub(
GRPCChannelFactory.insecure_channel(address))
_LOGGER.info('Waiting for server to be ready...')
start = time.time()
# Long timeout based previously flaky test. See issue #22115.
timeout = 300
while True:
time.sleep(0.1)
if cls._subprocess.poll() is not None:
> raise RuntimeError(
'Subprocess terminated unexpectedly with exit code %d.' %
E RuntimeError: Subprocess terminated unexpectedly with exit code 1.
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:101: RuntimeError
github-actions / Test Results
All 2 runs failed: test_pardo_windowed_side_inputs (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-cloud.xml [took 1s]
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312.xml [took 1s]
Raw output
RuntimeError: Subprocess terminated unexpectedly with exit code 1.
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_pardo_windowed_side_inputs>
def test_pardo_windowed_side_inputs(self):
> with self.create_pipeline() as p:
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:492:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:169: in create_pipeline
return beam.Pipeline(self.get_runner(), self.create_options())
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:272: in create_options
options = super().create_options()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:157: in create_options
options.view_as(PortableOptions).job_endpoint = self._get_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:122: in _get_job_endpoint
cls._job_endpoint = cls._create_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:128: in _create_job_endpoint
return cls._start_local_runner_subprocess_job_service()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cls = <class 'apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses'>
@classmethod
def _start_local_runner_subprocess_job_service(cls):
cls._maybe_kill_subprocess()
# TODO(robertwb): Consider letting the subprocess pick one and
# communicate it back...
# pylint: disable=unbalanced-tuple-unpacking
job_port, expansion_port = cls._pick_unused_ports(num_ports=2)
_LOGGER.info('Starting server on port %d.', job_port)
cls._subprocess = subprocess.Popen(
cls._subprocess_command(job_port, expansion_port))
address = 'localhost:%d' % job_port
job_service = beam_job_api_pb2_grpc.JobServiceStub(
GRPCChannelFactory.insecure_channel(address))
_LOGGER.info('Waiting for server to be ready...')
start = time.time()
# Long timeout based previously flaky test. See issue #22115.
timeout = 300
while True:
time.sleep(0.1)
if cls._subprocess.poll() is not None:
> raise RuntimeError(
'Subprocess terminated unexpectedly with exit code %d.' %
E RuntimeError: Subprocess terminated unexpectedly with exit code 1.
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:101: RuntimeError
github-actions / Test Results
All 2 runs failed: test_read (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-cloud.xml [took 1s]
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312.xml [took 1s]
Raw output
RuntimeError: Subprocess terminated unexpectedly with exit code 1.
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_read>
def test_read(self):
# Can't use NamedTemporaryFile as a context
# due to https://bugs.python.org/issue14243
temp_file = tempfile.NamedTemporaryFile(delete=False)
try:
temp_file.write(b'a\nb\nc')
temp_file.close()
> with self.create_pipeline() as p:
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1088:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:169: in create_pipeline
return beam.Pipeline(self.get_runner(), self.create_options())
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:272: in create_options
options = super().create_options()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:157: in create_options
options.view_as(PortableOptions).job_endpoint = self._get_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:122: in _get_job_endpoint
cls._job_endpoint = cls._create_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:128: in _create_job_endpoint
return cls._start_local_runner_subprocess_job_service()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cls = <class 'apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses'>
@classmethod
def _start_local_runner_subprocess_job_service(cls):
cls._maybe_kill_subprocess()
# TODO(robertwb): Consider letting the subprocess pick one and
# communicate it back...
# pylint: disable=unbalanced-tuple-unpacking
job_port, expansion_port = cls._pick_unused_ports(num_ports=2)
_LOGGER.info('Starting server on port %d.', job_port)
cls._subprocess = subprocess.Popen(
cls._subprocess_command(job_port, expansion_port))
address = 'localhost:%d' % job_port
job_service = beam_job_api_pb2_grpc.JobServiceStub(
GRPCChannelFactory.insecure_channel(address))
_LOGGER.info('Waiting for server to be ready...')
start = time.time()
# Long timeout based previously flaky test. See issue #22115.
timeout = 300
while True:
time.sleep(0.1)
if cls._subprocess.poll() is not None:
> raise RuntimeError(
'Subprocess terminated unexpectedly with exit code %d.' %
E RuntimeError: Subprocess terminated unexpectedly with exit code 1.
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:101: RuntimeError
github-actions / Test Results
All 2 runs failed: test_register_finalizations (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-cloud.xml [took 1s]
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312.xml [took 1s]
Raw output
RuntimeError: Subprocess terminated unexpectedly with exit code 1.
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_register_finalizations>
def test_register_finalizations(self):
event_recorder = EventRecorder(tempfile.gettempdir())
class FinalizableSplittableDoFn(beam.DoFn):
def process(
self,
element,
bundle_finalizer=beam.DoFn.BundleFinalizerParam,
restriction_tracker=beam.DoFn.RestrictionParam(
OffsetRangeProvider(
use_bounded_offset_range=True, checkpoint_only=True))):
# We use SDF to enforce finalization call happens by using
# self-initiated checkpoint.
if 'finalized' in event_recorder.events():
restriction_tracker.try_claim(
restriction_tracker.current_restriction().start)
yield element
restriction_tracker.try_claim(element)
return
if restriction_tracker.try_claim(
restriction_tracker.current_restriction().start):
bundle_finalizer.register(lambda: event_recorder.record('finalized'))
# We sleep here instead of setting a resume time since the resume time
# doesn't need to be honored.
time.sleep(1)
restriction_tracker.defer_remainder()
> with self.create_pipeline() as p:
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1294:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:169: in create_pipeline
return beam.Pipeline(self.get_runner(), self.create_options())
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:272: in create_options
options = super().create_options()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:157: in create_options
options.view_as(PortableOptions).job_endpoint = self._get_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:122: in _get_job_endpoint
cls._job_endpoint = cls._create_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:128: in _create_job_endpoint
return cls._start_local_runner_subprocess_job_service()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cls = <class 'apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses'>
@classmethod
def _start_local_runner_subprocess_job_service(cls):
cls._maybe_kill_subprocess()
# TODO(robertwb): Consider letting the subprocess pick one and
# communicate it back...
# pylint: disable=unbalanced-tuple-unpacking
job_port, expansion_port = cls._pick_unused_ports(num_ports=2)
_LOGGER.info('Starting server on port %d.', job_port)
cls._subprocess = subprocess.Popen(
cls._subprocess_command(job_port, expansion_port))
address = 'localhost:%d' % job_port
job_service = beam_job_api_pb2_grpc.JobServiceStub(
GRPCChannelFactory.insecure_channel(address))
_LOGGER.info('Waiting for server to be ready...')
start = time.time()
# Long timeout based previously flaky test. See issue #22115.
timeout = 300
while True:
time.sleep(0.1)
if cls._subprocess.poll() is not None:
> raise RuntimeError(
'Subprocess terminated unexpectedly with exit code %d.' %
E RuntimeError: Subprocess terminated unexpectedly with exit code 1.
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:101: RuntimeError
github-actions / Test Results
All 2 runs failed: test_reshuffle (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-cloud.xml [took 1s]
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312.xml [took 1s]
Raw output
RuntimeError: Subprocess terminated unexpectedly with exit code 1.
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_reshuffle>
def test_reshuffle(self):
> with self.create_pipeline() as p:
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1051:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:169: in create_pipeline
return beam.Pipeline(self.get_runner(), self.create_options())
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:272: in create_options
options = super().create_options()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:157: in create_options
options.view_as(PortableOptions).job_endpoint = self._get_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:122: in _get_job_endpoint
cls._job_endpoint = cls._create_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:128: in _create_job_endpoint
return cls._start_local_runner_subprocess_job_service()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cls = <class 'apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses'>
@classmethod
def _start_local_runner_subprocess_job_service(cls):
cls._maybe_kill_subprocess()
# TODO(robertwb): Consider letting the subprocess pick one and
# communicate it back...
# pylint: disable=unbalanced-tuple-unpacking
job_port, expansion_port = cls._pick_unused_ports(num_ports=2)
_LOGGER.info('Starting server on port %d.', job_port)
cls._subprocess = subprocess.Popen(
cls._subprocess_command(job_port, expansion_port))
address = 'localhost:%d' % job_port
job_service = beam_job_api_pb2_grpc.JobServiceStub(
GRPCChannelFactory.insecure_channel(address))
_LOGGER.info('Waiting for server to be ready...')
start = time.time()
# Long timeout based previously flaky test. See issue #22115.
timeout = 300
while True:
time.sleep(0.1)
if cls._subprocess.poll() is not None:
> raise RuntimeError(
'Subprocess terminated unexpectedly with exit code %d.' %
E RuntimeError: Subprocess terminated unexpectedly with exit code 1.
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:101: RuntimeError
github-actions / Test Results
All 2 runs failed: test_sdf (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-cloud.xml [took 1s]
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312.xml [took 1s]
Raw output
RuntimeError: Subprocess terminated unexpectedly with exit code 1.
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_sdf>
def test_sdf(self):
class ExpandingStringsDoFn(beam.DoFn):
def process(
self,
element,
restriction_tracker=beam.DoFn.RestrictionParam(
ExpandStringsProvider())):
assert isinstance(restriction_tracker, RestrictionTrackerView)
cur = restriction_tracker.current_restriction().start
while restriction_tracker.try_claim(cur):
yield element[cur]
cur += 1
> with self.create_pipeline() as p:
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:854:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:169: in create_pipeline
return beam.Pipeline(self.get_runner(), self.create_options())
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:272: in create_options
options = super().create_options()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:157: in create_options
options.view_as(PortableOptions).job_endpoint = self._get_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:122: in _get_job_endpoint
cls._job_endpoint = cls._create_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:128: in _create_job_endpoint
return cls._start_local_runner_subprocess_job_service()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cls = <class 'apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses'>
@classmethod
def _start_local_runner_subprocess_job_service(cls):
cls._maybe_kill_subprocess()
# TODO(robertwb): Consider letting the subprocess pick one and
# communicate it back...
# pylint: disable=unbalanced-tuple-unpacking
job_port, expansion_port = cls._pick_unused_ports(num_ports=2)
_LOGGER.info('Starting server on port %d.', job_port)
cls._subprocess = subprocess.Popen(
cls._subprocess_command(job_port, expansion_port))
address = 'localhost:%d' % job_port
job_service = beam_job_api_pb2_grpc.JobServiceStub(
GRPCChannelFactory.insecure_channel(address))
_LOGGER.info('Waiting for server to be ready...')
start = time.time()
# Long timeout based previously flaky test. See issue #22115.
timeout = 300
while True:
time.sleep(0.1)
if cls._subprocess.poll() is not None:
> raise RuntimeError(
'Subprocess terminated unexpectedly with exit code %d.' %
E RuntimeError: Subprocess terminated unexpectedly with exit code 1.
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:101: RuntimeError
github-actions / Test Results
All 2 runs failed: test_sdf_synthetic_source (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-cloud.xml [took 1s]
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312.xml [took 1s]
Raw output
RuntimeError: Subprocess terminated unexpectedly with exit code 1.
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_sdf_synthetic_source>
def test_sdf_synthetic_source(self):
common_attrs = {
'key_size': 1,
'value_size': 1,
'initial_splitting_num_bundles': 2,
'initial_splitting_desired_bundle_size': 2,
'sleep_per_input_record_sec': 0,
'initial_splitting': 'const'
}
num_source_description = 5
min_num_record = 10
max_num_record = 20
# pylint: disable=unused-variable
source_descriptions = ([
dict({'num_records': random.randint(min_num_record, max_num_record)},
**common_attrs) for i in range(0, num_source_description)
])
total_num_records = 0
for source in source_descriptions:
total_num_records += source['num_records']
> with self.create_pipeline() as p:
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1326:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:169: in create_pipeline
return beam.Pipeline(self.get_runner(), self.create_options())
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:272: in create_options
options = super().create_options()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:157: in create_options
options.view_as(PortableOptions).job_endpoint = self._get_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:122: in _get_job_endpoint
cls._job_endpoint = cls._create_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:128: in _create_job_endpoint
return cls._start_local_runner_subprocess_job_service()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cls = <class 'apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses'>
@classmethod
def _start_local_runner_subprocess_job_service(cls):
cls._maybe_kill_subprocess()
# TODO(robertwb): Consider letting the subprocess pick one and
# communicate it back...
# pylint: disable=unbalanced-tuple-unpacking
job_port, expansion_port = cls._pick_unused_ports(num_ports=2)
_LOGGER.info('Starting server on port %d.', job_port)
cls._subprocess = subprocess.Popen(
cls._subprocess_command(job_port, expansion_port))
address = 'localhost:%d' % job_port
job_service = beam_job_api_pb2_grpc.JobServiceStub(
GRPCChannelFactory.insecure_channel(address))
_LOGGER.info('Waiting for server to be ready...')
start = time.time()
# Long timeout based previously flaky test. See issue #22115.
timeout = 300
while True:
time.sleep(0.1)
if cls._subprocess.poll() is not None:
> raise RuntimeError(
'Subprocess terminated unexpectedly with exit code %d.' %
E RuntimeError: Subprocess terminated unexpectedly with exit code 1.
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:101: RuntimeError
github-actions / Test Results
All 2 runs failed: test_sdf_with_dofn_as_restriction_provider (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-cloud.xml [took 1s]
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312.xml [took 1s]
Raw output
RuntimeError: Subprocess terminated unexpectedly with exit code 1.
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_sdf_with_dofn_as_restriction_provider>
def test_sdf_with_dofn_as_restriction_provider(self):
class ExpandingStringsDoFn(beam.DoFn, ExpandStringsProvider):
def process(
self, element, restriction_tracker=beam.DoFn.RestrictionParam()):
assert isinstance(restriction_tracker, RestrictionTrackerView)
cur = restriction_tracker.current_restriction().start
while restriction_tracker.try_claim(cur):
yield element[cur]
cur += 1
> with self.create_pipeline() as p:
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:869:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:169: in create_pipeline
return beam.Pipeline(self.get_runner(), self.create_options())
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:272: in create_options
options = super().create_options()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:157: in create_options
options.view_as(PortableOptions).job_endpoint = self._get_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:122: in _get_job_endpoint
cls._job_endpoint = cls._create_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:128: in _create_job_endpoint
return cls._start_local_runner_subprocess_job_service()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cls = <class 'apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses'>
@classmethod
def _start_local_runner_subprocess_job_service(cls):
cls._maybe_kill_subprocess()
# TODO(robertwb): Consider letting the subprocess pick one and
# communicate it back...
# pylint: disable=unbalanced-tuple-unpacking
job_port, expansion_port = cls._pick_unused_ports(num_ports=2)
_LOGGER.info('Starting server on port %d.', job_port)
cls._subprocess = subprocess.Popen(
cls._subprocess_command(job_port, expansion_port))
address = 'localhost:%d' % job_port
job_service = beam_job_api_pb2_grpc.JobServiceStub(
GRPCChannelFactory.insecure_channel(address))
_LOGGER.info('Waiting for server to be ready...')
start = time.time()
# Long timeout based previously flaky test. See issue #22115.
timeout = 300
while True:
time.sleep(0.1)
if cls._subprocess.poll() is not None:
> raise RuntimeError(
'Subprocess terminated unexpectedly with exit code %d.' %
E RuntimeError: Subprocess terminated unexpectedly with exit code 1.
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:101: RuntimeError
github-actions / Test Results
All 2 runs failed: test_assert_that (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-cloud.xml [took 1s]
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312.xml [took 1s]
Raw output
AssertionError: "Failed assert" does not match "Subprocess terminated unexpectedly with exit code 1."
RuntimeError: Subprocess terminated unexpectedly with exit code 1.
During handling of the above exception, another exception occurred:
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_assert_that>
def test_assert_that(self):
# TODO: figure out a way for fn_api_runner to parse and raise the
# underlying exception.
> with self.assertRaisesRegex(Exception, 'Failed assert'):
E AssertionError: "Failed assert" does not match "Subprocess terminated unexpectedly with exit code 1."
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:112: AssertionError
github-actions / Test Results
All 2 runs failed: test_sdf_with_dofn_as_watermark_estimator (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-cloud.xml [took 1s]
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312.xml [took 1s]
Raw output
RuntimeError: Subprocess terminated unexpectedly with exit code 1.
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_sdf_with_dofn_as_watermark_estimator>
def test_sdf_with_dofn_as_watermark_estimator(self):
class ExpandingStringsDoFn(beam.DoFn, beam.WatermarkEstimatorProvider):
def initial_estimator_state(self, element, restriction):
return None
def create_watermark_estimator(self, state):
return beam.io.watermark_estimators.ManualWatermarkEstimator(state)
def process(
self,
element,
restriction_tracker=beam.DoFn.RestrictionParam(
ExpandStringsProvider()),
watermark_estimator=beam.DoFn.WatermarkEstimatorParam(
ManualWatermarkEstimator.default_provider())):
cur = restriction_tracker.current_restriction().start
while restriction_tracker.try_claim(cur):
watermark_estimator.set_watermark(timestamp.Timestamp(cur))
assert (
watermark_estimator.current_watermark() == timestamp.Timestamp(
cur))
yield element[cur]
if cur % 2 == 1:
restriction_tracker.defer_remainder(timestamp.Duration(micros=5))
return
cur += 1
> with self.create_pipeline() as p:
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:946:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:169: in create_pipeline
return beam.Pipeline(self.get_runner(), self.create_options())
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:272: in create_options
options = super().create_options()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:157: in create_options
options.view_as(PortableOptions).job_endpoint = self._get_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:122: in _get_job_endpoint
cls._job_endpoint = cls._create_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:128: in _create_job_endpoint
return cls._start_local_runner_subprocess_job_service()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cls = <class 'apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses'>
@classmethod
def _start_local_runner_subprocess_job_service(cls):
cls._maybe_kill_subprocess()
# TODO(robertwb): Consider letting the subprocess pick one and
# communicate it back...
# pylint: disable=unbalanced-tuple-unpacking
job_port, expansion_port = cls._pick_unused_ports(num_ports=2)
_LOGGER.info('Starting server on port %d.', job_port)
cls._subprocess = subprocess.Popen(
cls._subprocess_command(job_port, expansion_port))
address = 'localhost:%d' % job_port
job_service = beam_job_api_pb2_grpc.JobServiceStub(
GRPCChannelFactory.insecure_channel(address))
_LOGGER.info('Waiting for server to be ready...')
start = time.time()
# Long timeout based previously flaky test. See issue #22115.
timeout = 300
while True:
time.sleep(0.1)
if cls._subprocess.poll() is not None:
> raise RuntimeError(
'Subprocess terminated unexpectedly with exit code %d.' %
E RuntimeError: Subprocess terminated unexpectedly with exit code 1.
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:101: RuntimeError
github-actions / Test Results
All 2 runs failed: test_batch_pardo (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-cloud.xml [took 1s]
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312.xml [took 1s]
Raw output
RuntimeError: Subprocess terminated unexpectedly with exit code 1.
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_batch_pardo>
def test_batch_pardo(self):
> with self.create_pipeline() as p:
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:130:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:169: in create_pipeline
return beam.Pipeline(self.get_runner(), self.create_options())
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:272: in create_options
options = super().create_options()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:157: in create_options
options.view_as(PortableOptions).job_endpoint = self._get_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:122: in _get_job_endpoint
cls._job_endpoint = cls._create_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:128: in _create_job_endpoint
return cls._start_local_runner_subprocess_job_service()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cls = <class 'apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses'>
@classmethod
def _start_local_runner_subprocess_job_service(cls):
cls._maybe_kill_subprocess()
# TODO(robertwb): Consider letting the subprocess pick one and
# communicate it back...
# pylint: disable=unbalanced-tuple-unpacking
job_port, expansion_port = cls._pick_unused_ports(num_ports=2)
_LOGGER.info('Starting server on port %d.', job_port)
cls._subprocess = subprocess.Popen(
cls._subprocess_command(job_port, expansion_port))
address = 'localhost:%d' % job_port
job_service = beam_job_api_pb2_grpc.JobServiceStub(
GRPCChannelFactory.insecure_channel(address))
_LOGGER.info('Waiting for server to be ready...')
start = time.time()
# Long timeout based previously flaky test. See issue #22115.
timeout = 300
while True:
time.sleep(0.1)
if cls._subprocess.poll() is not None:
> raise RuntimeError(
'Subprocess terminated unexpectedly with exit code %d.' %
E RuntimeError: Subprocess terminated unexpectedly with exit code 1.
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:101: RuntimeError
github-actions / Test Results
All 2 runs failed: test_sdf_with_sdf_initiated_checkpointing (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-cloud.xml [took 1s]
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312.xml [took 1s]
Raw output
RuntimeError: Subprocess terminated unexpectedly with exit code 1.
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_sdf_with_sdf_initiated_checkpointing>
def test_sdf_with_sdf_initiated_checkpointing(self):
> self.run_sdf_initiated_checkpointing(is_drain=False)
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:984:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:970: in run_sdf_initiated_checkpointing
with self.create_pipeline(is_drain=is_drain) as p:
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:169: in create_pipeline
return beam.Pipeline(self.get_runner(), self.create_options())
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:272: in create_options
options = super().create_options()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:157: in create_options
options.view_as(PortableOptions).job_endpoint = self._get_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:122: in _get_job_endpoint
cls._job_endpoint = cls._create_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:128: in _create_job_endpoint
return cls._start_local_runner_subprocess_job_service()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cls = <class 'apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses'>
@classmethod
def _start_local_runner_subprocess_job_service(cls):
cls._maybe_kill_subprocess()
# TODO(robertwb): Consider letting the subprocess pick one and
# communicate it back...
# pylint: disable=unbalanced-tuple-unpacking
job_port, expansion_port = cls._pick_unused_ports(num_ports=2)
_LOGGER.info('Starting server on port %d.', job_port)
cls._subprocess = subprocess.Popen(
cls._subprocess_command(job_port, expansion_port))
address = 'localhost:%d' % job_port
job_service = beam_job_api_pb2_grpc.JobServiceStub(
GRPCChannelFactory.insecure_channel(address))
_LOGGER.info('Waiting for server to be ready...')
start = time.time()
# Long timeout based previously flaky test. See issue #22115.
timeout = 300
while True:
time.sleep(0.1)
if cls._subprocess.poll() is not None:
> raise RuntimeError(
'Subprocess terminated unexpectedly with exit code %d.' %
E RuntimeError: Subprocess terminated unexpectedly with exit code 1.
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:101: RuntimeError
github-actions / Test Results
All 2 runs failed: test_batch_pardo_dofn_params (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-cloud.xml [took 1s]
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312.xml [took 1s]
Raw output
RuntimeError: Subprocess terminated unexpectedly with exit code 1.
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_batch_pardo_dofn_params>
def test_batch_pardo_dofn_params(self):
class ConsumeParamsDoFn(beam.DoFn):
@no_type_check
def process_batch(
self,
batch: np.ndarray,
ts=beam.DoFn.TimestampParam,
pane_info=beam.DoFn.PaneInfoParam,
) -> Iterator[np.ndarray]:
assert isinstance(batch, np.ndarray)
assert isinstance(ts, timestamp.Timestamp)
assert isinstance(pane_info, windowed_value.PaneInfo)
yield batch * ts.seconds()
# infer_output_type must be defined (when there's no process method),
# otherwise we don't know the input type is the same as output type.
def infer_output_type(self, input_type):
return input_type
> with self.create_pipeline() as p:
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:260:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:169: in create_pipeline
return beam.Pipeline(self.get_runner(), self.create_options())
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:272: in create_options
options = super().create_options()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:157: in create_options
options.view_as(PortableOptions).job_endpoint = self._get_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:122: in _get_job_endpoint
cls._job_endpoint = cls._create_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:128: in _create_job_endpoint
return cls._start_local_runner_subprocess_job_service()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cls = <class 'apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses'>
@classmethod
def _start_local_runner_subprocess_job_service(cls):
cls._maybe_kill_subprocess()
# TODO(robertwb): Consider letting the subprocess pick one and
# communicate it back...
# pylint: disable=unbalanced-tuple-unpacking
job_port, expansion_port = cls._pick_unused_ports(num_ports=2)
_LOGGER.info('Starting server on port %d.', job_port)
cls._subprocess = subprocess.Popen(
cls._subprocess_command(job_port, expansion_port))
address = 'localhost:%d' % job_port
job_service = beam_job_api_pb2_grpc.JobServiceStub(
GRPCChannelFactory.insecure_channel(address))
_LOGGER.info('Waiting for server to be ready...')
start = time.time()
# Long timeout based previously flaky test. See issue #22115.
timeout = 300
while True:
time.sleep(0.1)
if cls._subprocess.poll() is not None:
> raise RuntimeError(
'Subprocess terminated unexpectedly with exit code %d.' %
E RuntimeError: Subprocess terminated unexpectedly with exit code 1.
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:101: RuntimeError
github-actions / Test Results
All 2 runs failed: test_sdf_with_watermark_tracking (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-cloud.xml [took 1s]
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312.xml [took 1s]
Raw output
RuntimeError: Subprocess terminated unexpectedly with exit code 1.
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_sdf_with_watermark_tracking>
def test_sdf_with_watermark_tracking(self):
class ExpandingStringsDoFn(beam.DoFn):
def process(
self,
element,
restriction_tracker=beam.DoFn.RestrictionParam(
ExpandStringsProvider()),
watermark_estimator=beam.DoFn.WatermarkEstimatorParam(
ManualWatermarkEstimator.default_provider())):
cur = restriction_tracker.current_restriction().start
while restriction_tracker.try_claim(cur):
watermark_estimator.set_watermark(timestamp.Timestamp(cur))
assert (
watermark_estimator.current_watermark() == timestamp.Timestamp(
cur))
yield element[cur]
if cur % 2 == 1:
restriction_tracker.defer_remainder(timestamp.Duration(micros=5))
return
cur += 1
> with self.create_pipeline() as p:
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:914:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:169: in create_pipeline
return beam.Pipeline(self.get_runner(), self.create_options())
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:272: in create_options
options = super().create_options()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:157: in create_options
options.view_as(PortableOptions).job_endpoint = self._get_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:122: in _get_job_endpoint
cls._job_endpoint = cls._create_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:128: in _create_job_endpoint
return cls._start_local_runner_subprocess_job_service()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cls = <class 'apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses'>
@classmethod
def _start_local_runner_subprocess_job_service(cls):
cls._maybe_kill_subprocess()
# TODO(robertwb): Consider letting the subprocess pick one and
# communicate it back...
# pylint: disable=unbalanced-tuple-unpacking
job_port, expansion_port = cls._pick_unused_ports(num_ports=2)
_LOGGER.info('Starting server on port %d.', job_port)
cls._subprocess = subprocess.Popen(
cls._subprocess_command(job_port, expansion_port))
address = 'localhost:%d' % job_port
job_service = beam_job_api_pb2_grpc.JobServiceStub(
GRPCChannelFactory.insecure_channel(address))
_LOGGER.info('Waiting for server to be ready...')
start = time.time()
# Long timeout based previously flaky test. See issue #22115.
timeout = 300
while True:
time.sleep(0.1)
if cls._subprocess.poll() is not None:
> raise RuntimeError(
'Subprocess terminated unexpectedly with exit code %d.' %
E RuntimeError: Subprocess terminated unexpectedly with exit code 1.
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:101: RuntimeError
github-actions / Test Results
All 2 runs failed: test_batch_pardo_fusion_break (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-cloud.xml [took 1s]
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312.xml [took 1s]
Raw output
RuntimeError: Subprocess terminated unexpectedly with exit code 1.
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_batch_pardo_fusion_break>
def test_batch_pardo_fusion_break(self):
class NormalizeDoFn(beam.DoFn):
@no_type_check
def process_batch(
self,
batch: np.ndarray,
mean: np.float64,
) -> Iterator[np.ndarray]:
assert isinstance(batch, np.ndarray)
yield batch - mean
# infer_output_type must be defined (when there's no process method),
# otherwise we don't know the input type is the same as output type.
def infer_output_type(self, input_type):
return np.float64
> with self.create_pipeline() as p:
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:225:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:169: in create_pipeline
return beam.Pipeline(self.get_runner(), self.create_options())
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:272: in create_options
options = super().create_options()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:157: in create_options
options.view_as(PortableOptions).job_endpoint = self._get_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:122: in _get_job_endpoint
cls._job_endpoint = cls._create_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:128: in _create_job_endpoint
return cls._start_local_runner_subprocess_job_service()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cls = <class 'apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses'>
@classmethod
def _start_local_runner_subprocess_job_service(cls):
cls._maybe_kill_subprocess()
# TODO(robertwb): Consider letting the subprocess pick one and
# communicate it back...
# pylint: disable=unbalanced-tuple-unpacking
job_port, expansion_port = cls._pick_unused_ports(num_ports=2)
_LOGGER.info('Starting server on port %d.', job_port)
cls._subprocess = subprocess.Popen(
cls._subprocess_command(job_port, expansion_port))
address = 'localhost:%d' % job_port
job_service = beam_job_api_pb2_grpc.JobServiceStub(
GRPCChannelFactory.insecure_channel(address))
_LOGGER.info('Waiting for server to be ready...')
start = time.time()
# Long timeout based previously flaky test. See issue #22115.
timeout = 300
while True:
time.sleep(0.1)
if cls._subprocess.poll() is not None:
> raise RuntimeError(
'Subprocess terminated unexpectedly with exit code %d.' %
E RuntimeError: Subprocess terminated unexpectedly with exit code 1.
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:101: RuntimeError
github-actions / Test Results
All 2 runs failed: test_windowed_pardo_state_timers (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-cloud.xml [took 1s]
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312.xml [took 1s]
Raw output
RuntimeError: Subprocess terminated unexpectedly with exit code 1.
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_windowed_pardo_state_timers>
def test_windowed_pardo_state_timers(self):
> self._run_pardo_state_timers(windowed=True)
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:750:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:803: in _run_pardo_state_timers
with self.create_pipeline() as p:
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:169: in create_pipeline
return beam.Pipeline(self.get_runner(), self.create_options())
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:272: in create_options
options = super().create_options()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:157: in create_options
options.view_as(PortableOptions).job_endpoint = self._get_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:122: in _get_job_endpoint
cls._job_endpoint = cls._create_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:128: in _create_job_endpoint
return cls._start_local_runner_subprocess_job_service()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cls = <class 'apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses'>
@classmethod
def _start_local_runner_subprocess_job_service(cls):
cls._maybe_kill_subprocess()
# TODO(robertwb): Consider letting the subprocess pick one and
# communicate it back...
# pylint: disable=unbalanced-tuple-unpacking
job_port, expansion_port = cls._pick_unused_ports(num_ports=2)
_LOGGER.info('Starting server on port %d.', job_port)
cls._subprocess = subprocess.Popen(
cls._subprocess_command(job_port, expansion_port))
address = 'localhost:%d' % job_port
job_service = beam_job_api_pb2_grpc.JobServiceStub(
GRPCChannelFactory.insecure_channel(address))
_LOGGER.info('Waiting for server to be ready...')
start = time.time()
# Long timeout based previously flaky test. See issue #22115.
timeout = 300
while True:
time.sleep(0.1)
if cls._subprocess.poll() is not None:
> raise RuntimeError(
'Subprocess terminated unexpectedly with exit code %d.' %
E RuntimeError: Subprocess terminated unexpectedly with exit code 1.
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:101: RuntimeError
github-actions / Test Results
All 2 runs failed: test_batch_pardo_overlapping_windows (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312-cloud.xml [took 1s]
sdks/python/test-suites/tox/py312/build/srcs/sdks/python/pytest_py312.xml [took 1s]
Raw output
RuntimeError: Subprocess terminated unexpectedly with exit code 1.
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_batch_pardo_overlapping_windows>
def test_batch_pardo_overlapping_windows(self):
class PerWindowDoFn(beam.DoFn):
@no_type_check
def process_batch(self,
batch: np.ndarray,
window=beam.DoFn.WindowParam) -> Iterator[np.ndarray]:
yield batch * window.start.seconds()
# infer_output_type must be defined (when there's no process method),
# otherwise we don't know the input type is the same as output type.
def infer_output_type(self, input_type):
return input_type
> with self.create_pipeline() as p:
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:311:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:169: in create_pipeline
return beam.Pipeline(self.get_runner(), self.create_options())
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:272: in create_options
options = super().create_options()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:157: in create_options
options.view_as(PortableOptions).job_endpoint = self._get_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:122: in _get_job_endpoint
cls._job_endpoint = cls._create_job_endpoint()
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:128: in _create_job_endpoint
return cls._start_local_runner_subprocess_job_service()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
cls = <class 'apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses'>
@classmethod
def _start_local_runner_subprocess_job_service(cls):
cls._maybe_kill_subprocess()
# TODO(robertwb): Consider letting the subprocess pick one and
# communicate it back...
# pylint: disable=unbalanced-tuple-unpacking
job_port, expansion_port = cls._pick_unused_ports(num_ports=2)
_LOGGER.info('Starting server on port %d.', job_port)
cls._subprocess = subprocess.Popen(
cls._subprocess_command(job_port, expansion_port))
address = 'localhost:%d' % job_port
job_service = beam_job_api_pb2_grpc.JobServiceStub(
GRPCChannelFactory.insecure_channel(address))
_LOGGER.info('Waiting for server to be ready...')
start = time.time()
# Long timeout based previously flaky test. See issue #22115.
timeout = 300
while True:
time.sleep(0.1)
if cls._subprocess.poll() is not None:
> raise RuntimeError(
'Subprocess terminated unexpectedly with exit code %d.' %
E RuntimeError: Subprocess terminated unexpectedly with exit code 1.
target/.tox-py312/py312/lib/python3.12/site-packages/apache_beam/runners/portability/portable_runner_test.py:101: RuntimeError