Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add flag to control automatic exception sampling in Python (disabled) #28418

Merged
merged 4 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion sdks/python/apache_beam/runners/worker/data_sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
from apache_beam.coders.coder_impl import CoderImpl
from apache_beam.coders.coder_impl import WindowedValueCoderImpl
from apache_beam.coders.coders import Coder
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.utils.windowed_value import WindowedValue

Expand Down Expand Up @@ -216,17 +218,42 @@ def __init__(
self,
max_samples: int = 10,
sample_every_sec: float = 30,
sample_only_exceptions: bool = False,
clock=None) -> None:
# Key is PCollection id. Is guarded by the _samplers_lock.
self._samplers: Dict[str, OutputSampler] = {}
# Bundles are processed in parallel, so new samplers may be added when the
# runner queries for samples.
self._samplers_lock: threading.Lock = threading.Lock()
self._max_samples = max_samples
self._sample_every_sec = sample_every_sec
self._sample_every_sec = 0.0 if sample_only_exceptions else sample_every_sec
self._samplers_by_output: Dict[str, List[OutputSampler]] = {}
self._clock = clock

_ENABLE_DATA_SAMPLING = 'enable_data_sampling'
_ENABLE_ALWAYS_ON_EXCEPTION_SAMPLING = 'enable_always_on_exception_sampling'
_DISABLE_ALWAYS_ON_EXCEPTION_SAMPLING = 'disable_always_on_exception_sampling'

@staticmethod
def create(sdk_pipeline_options: PipelineOptions, **kwargs):
experiments = sdk_pipeline_options.view_as(DebugOptions).experiments or []

# When true, enables only the sampling of exceptions.
always_on_exception_sampling = (
DataSampler._ENABLE_ALWAYS_ON_EXCEPTION_SAMPLING in experiments and
DataSampler._DISABLE_ALWAYS_ON_EXCEPTION_SAMPLING not in experiments)

# When true, enables the sampling of all PCollections and exceptions.
enable_data_sampling = DataSampler._ENABLE_DATA_SAMPLING in experiments

if enable_data_sampling or always_on_exception_sampling:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If both of these flags are true, only exception sampling is enabled

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah gotcha, looks like Java has the logic but not here. Added it here and added a test.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sample_only_exceptions = (
always_on_exception_sampling and not enable_data_sampling)
return DataSampler(
sample_only_exceptions=sample_only_exceptions, **kwargs)
else:
return None

def stop(self) -> None:
"""Stops all sampling, does not clear samplers in case there are outstanding
samples.
Expand Down
102 changes: 101 additions & 1 deletion sdks/python/apache_beam/runners/worker/data_sampler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

from apache_beam.coders import FastPrimitivesCoder
from apache_beam.coders import WindowedValueCoder
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.runners.worker.data_sampler import DataSampler
from apache_beam.runners.worker.data_sampler import OutputSampler
Expand Down Expand Up @@ -56,7 +57,9 @@ def make_test_descriptor(
return descriptor

def setUp(self):
self.data_sampler = DataSampler(sample_every_sec=0.1)
self.data_sampler = DataSampler.create(
PipelineOptions(experiments=[DataSampler._ENABLE_DATA_SAMPLING]),
sample_every_sec=0.1)

def tearDown(self):
self.data_sampler.stop()
Expand Down Expand Up @@ -341,6 +344,103 @@ def test_can_sample_exceptions(self):
samples = self.data_sampler.wait_for_samples([MAIN_PCOLLECTION_ID])
self.assertGreater(len(samples.element_samples), 0)

def test_create_experiments(self):
"""Tests that the experiments correctly make the DataSampler."""
enable_exception_exp = DataSampler._ENABLE_ALWAYS_ON_EXCEPTION_SAMPLING
disable_exception_exp = DataSampler._DISABLE_ALWAYS_ON_EXCEPTION_SAMPLING
enable_sampling_exp = DataSampler._ENABLE_DATA_SAMPLING

self.assertIsNone(DataSampler.create(PipelineOptions()))

exp = [disable_exception_exp]
self.assertIsNone(DataSampler.create(PipelineOptions(experiments=exp)))

exp = [enable_exception_exp, disable_exception_exp]
self.assertIsNone(DataSampler.create(PipelineOptions(experiments=exp)))

exp = [enable_exception_exp]
self.assertIsNotNone(DataSampler.create(PipelineOptions(experiments=exp)))

exp = [enable_sampling_exp]
self.assertIsNotNone(DataSampler.create(PipelineOptions(experiments=exp)))

exp = [enable_sampling_exp, enable_exception_exp, disable_exception_exp]
self.assertIsNotNone(DataSampler.create(PipelineOptions(experiments=exp)))

def test_samples_all_with_both_experiments(self):
"""Tests that the using both sampling experiments samples everything."""
self.data_sampler = DataSampler.create(
PipelineOptions(
experiments=[
DataSampler._ENABLE_DATA_SAMPLING,
DataSampler._ENABLE_ALWAYS_ON_EXCEPTION_SAMPLING
]),
sample_every_sec=0.1)

# Create a descriptor with one transform with two outputs, 'a' and 'b'.
descriptor = self.make_test_descriptor(outputs=['a', 'b'])
self.data_sampler.initialize_samplers(
MAIN_TRANSFORM_ID, descriptor, self.primitives_coder_factory)

# Get the samples for the two outputs.
a_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 0)
b_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 1)

# Sample an exception for the output 'a', this will show up in the final
# samples response.
exc_info = None
try:
raise Exception('test')
except Exception:
exc_info = sys.exc_info()
a_sampler.sample_exception('a', exc_info, MAIN_TRANSFORM_ID, 'instid')

# Sample a normal element for the output 'b', this will not show up in the
# final samples response.
b_sampler.element_sampler.el = 'b'
b_sampler.element_sampler.has_element = True

samples = self.data_sampler.wait_for_samples([])
self.assertEqual(len(samples.element_samples), 2)
self.assertTrue(
samples.element_samples['a'].elements[0].HasField('exception'))
self.assertFalse(
samples.element_samples['b'].elements[0].HasField('exception'))

def test_only_sample_exceptions(self):
"""Tests that the exception sampling experiment only samples exceptions."""
self.data_sampler = DataSampler.create(
PipelineOptions(
experiments=[DataSampler._ENABLE_ALWAYS_ON_EXCEPTION_SAMPLING]),
sample_every_sec=0.1)

# Create a descriptor with one transform with two outputs, 'a' and 'b'.
descriptor = self.make_test_descriptor(outputs=['a', 'b'])
self.data_sampler.initialize_samplers(
MAIN_TRANSFORM_ID, descriptor, self.primitives_coder_factory)

# Get the samples for the two outputs.
a_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 0)
b_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 1)

# Sample an exception for the output 'a', this will show up in the final
# samples response.
exc_info = None
try:
raise Exception('test')
except Exception:
exc_info = sys.exc_info()
a_sampler.sample_exception('a', exc_info, MAIN_TRANSFORM_ID, 'instid')

# Sample a normal element for the output 'b', this will not show up in the
# final samples response.
b_sampler.element_sampler.el = 'b'
b_sampler.element_sampler.has_element = True

samples = self.data_sampler.wait_for_samples([])
self.assertEqual(len(samples.element_samples), 1)
self.assertIsNotNone(samples.element_samples['a'].elements[0].exception)


class OutputSamplerTest(unittest.TestCase):
def tearDown(self):
Expand Down
6 changes: 4 additions & 2 deletions sdks/python/apache_beam/runners/worker/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,14 +207,16 @@ def update_counters_start(self, windowed_value):
# type: (WindowedValue) -> None
self.opcounter.update_from(windowed_value)

if self.execution_context is not None:
self.execution_context.output_sampler = self.output_sampler

# The following code is optimized by inlining a function call. Because this
# is called for every element, a function call is too expensive (order of
# 100s of nanoseconds). Furthermore, a lock was purposefully not used
# between here and the DataSampler as an additional operation. The tradeoff
# is that some samples might be dropped, but it is better than the
# alternative which is double sampling the same element.
if self.element_sampler is not None and self.execution_context is not None:
self.execution_context.output_sampler = self.output_sampler
if self.element_sampler is not None:
if not self.element_sampler.has_element:
self.element_sampler.el = windowed_value
self.element_sampler.has_element = True
Expand Down
4 changes: 1 addition & 3 deletions sdks/python/apache_beam/runners/worker/sdk_worker_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,7 @@ def create_harness(environment, dry_run=False):
if dry_run:
return

data_sampler = None
if 'enable_data_sampling' in experiments:
data_sampler = DataSampler()
data_sampler = DataSampler.create(sdk_pipeline_options)

sdk_harness = SdkHarness(
control_address=control_service_descriptor.url,
Expand Down