diff --git a/sdks/python/apache_beam/runners/worker/data_sampler.py b/sdks/python/apache_beam/runners/worker/data_sampler.py index 303648738f3d..5ca307ca1b37 100644 --- a/sdks/python/apache_beam/runners/worker/data_sampler.py +++ b/sdks/python/apache_beam/runners/worker/data_sampler.py @@ -40,6 +40,8 @@ from apache_beam.coders.coder_impl import CoderImpl from apache_beam.coders.coder_impl import WindowedValueCoderImpl from apache_beam.coders.coders import Coder +from apache_beam.options.pipeline_options import DebugOptions +from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.utils.windowed_value import WindowedValue @@ -216,6 +218,7 @@ def __init__( self, max_samples: int = 10, sample_every_sec: float = 30, + sample_only_exceptions: bool = False, clock=None) -> None: # Key is PCollection id. Is guarded by the _samplers_lock. self._samplers: Dict[str, OutputSampler] = {} @@ -223,10 +226,34 @@ def __init__( # runner queries for samples. self._samplers_lock: threading.Lock = threading.Lock() self._max_samples = max_samples - self._sample_every_sec = sample_every_sec + self._sample_every_sec = 0.0 if sample_only_exceptions else sample_every_sec self._samplers_by_output: Dict[str, List[OutputSampler]] = {} self._clock = clock + _ENABLE_DATA_SAMPLING = 'enable_data_sampling' + _ENABLE_ALWAYS_ON_EXCEPTION_SAMPLING = 'enable_always_on_exception_sampling' + _DISABLE_ALWAYS_ON_EXCEPTION_SAMPLING = 'disable_always_on_exception_sampling' + + @staticmethod + def create(sdk_pipeline_options: PipelineOptions, **kwargs): + experiments = sdk_pipeline_options.view_as(DebugOptions).experiments or [] + + # When true, enables only the sampling of exceptions. + always_on_exception_sampling = ( + DataSampler._ENABLE_ALWAYS_ON_EXCEPTION_SAMPLING in experiments and + DataSampler._DISABLE_ALWAYS_ON_EXCEPTION_SAMPLING not in experiments) + + # When true, enables the sampling of all PCollections and exceptions. + enable_data_sampling = DataSampler._ENABLE_DATA_SAMPLING in experiments + + if enable_data_sampling or always_on_exception_sampling: + sample_only_exceptions = ( + always_on_exception_sampling and not enable_data_sampling) + return DataSampler( + sample_only_exceptions=sample_only_exceptions, **kwargs) + else: + return None + def stop(self) -> None: """Stops all sampling, does not clear samplers in case there are outstanding samples. diff --git a/sdks/python/apache_beam/runners/worker/data_sampler_test.py b/sdks/python/apache_beam/runners/worker/data_sampler_test.py index 8d063fdb49d6..8c47315b7a9e 100644 --- a/sdks/python/apache_beam/runners/worker/data_sampler_test.py +++ b/sdks/python/apache_beam/runners/worker/data_sampler_test.py @@ -27,6 +27,7 @@ from apache_beam.coders import FastPrimitivesCoder from apache_beam.coders import WindowedValueCoder +from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.runners.worker.data_sampler import DataSampler from apache_beam.runners.worker.data_sampler import OutputSampler @@ -56,7 +57,9 @@ def make_test_descriptor( return descriptor def setUp(self): - self.data_sampler = DataSampler(sample_every_sec=0.1) + self.data_sampler = DataSampler.create( + PipelineOptions(experiments=[DataSampler._ENABLE_DATA_SAMPLING]), + sample_every_sec=0.1) def tearDown(self): self.data_sampler.stop() @@ -341,6 +344,103 @@ def test_can_sample_exceptions(self): samples = self.data_sampler.wait_for_samples([MAIN_PCOLLECTION_ID]) self.assertGreater(len(samples.element_samples), 0) + def test_create_experiments(self): + """Tests that the experiments correctly make the DataSampler.""" + enable_exception_exp = DataSampler._ENABLE_ALWAYS_ON_EXCEPTION_SAMPLING + disable_exception_exp = DataSampler._DISABLE_ALWAYS_ON_EXCEPTION_SAMPLING + enable_sampling_exp = DataSampler._ENABLE_DATA_SAMPLING + + self.assertIsNone(DataSampler.create(PipelineOptions())) + + exp = [disable_exception_exp] + self.assertIsNone(DataSampler.create(PipelineOptions(experiments=exp))) + + exp = [enable_exception_exp, disable_exception_exp] + self.assertIsNone(DataSampler.create(PipelineOptions(experiments=exp))) + + exp = [enable_exception_exp] + self.assertIsNotNone(DataSampler.create(PipelineOptions(experiments=exp))) + + exp = [enable_sampling_exp] + self.assertIsNotNone(DataSampler.create(PipelineOptions(experiments=exp))) + + exp = [enable_sampling_exp, enable_exception_exp, disable_exception_exp] + self.assertIsNotNone(DataSampler.create(PipelineOptions(experiments=exp))) + + def test_samples_all_with_both_experiments(self): + """Tests that the using both sampling experiments samples everything.""" + self.data_sampler = DataSampler.create( + PipelineOptions( + experiments=[ + DataSampler._ENABLE_DATA_SAMPLING, + DataSampler._ENABLE_ALWAYS_ON_EXCEPTION_SAMPLING + ]), + sample_every_sec=0.1) + + # Create a descriptor with one transform with two outputs, 'a' and 'b'. + descriptor = self.make_test_descriptor(outputs=['a', 'b']) + self.data_sampler.initialize_samplers( + MAIN_TRANSFORM_ID, descriptor, self.primitives_coder_factory) + + # Get the samples for the two outputs. + a_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 0) + b_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 1) + + # Sample an exception for the output 'a', this will show up in the final + # samples response. + exc_info = None + try: + raise Exception('test') + except Exception: + exc_info = sys.exc_info() + a_sampler.sample_exception('a', exc_info, MAIN_TRANSFORM_ID, 'instid') + + # Sample a normal element for the output 'b', this will not show up in the + # final samples response. + b_sampler.element_sampler.el = 'b' + b_sampler.element_sampler.has_element = True + + samples = self.data_sampler.wait_for_samples(['a', 'b']) + self.assertEqual(len(samples.element_samples), 2) + self.assertTrue( + samples.element_samples['a'].elements[0].HasField('exception')) + self.assertFalse( + samples.element_samples['b'].elements[0].HasField('exception')) + + def test_only_sample_exceptions(self): + """Tests that the exception sampling experiment only samples exceptions.""" + self.data_sampler = DataSampler.create( + PipelineOptions( + experiments=[DataSampler._ENABLE_ALWAYS_ON_EXCEPTION_SAMPLING]), + sample_every_sec=0.1) + + # Create a descriptor with one transform with two outputs, 'a' and 'b'. + descriptor = self.make_test_descriptor(outputs=['a', 'b']) + self.data_sampler.initialize_samplers( + MAIN_TRANSFORM_ID, descriptor, self.primitives_coder_factory) + + # Get the samples for the two outputs. + a_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 0) + b_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 1) + + # Sample an exception for the output 'a', this will show up in the final + # samples response. + exc_info = None + try: + raise Exception('test') + except Exception: + exc_info = sys.exc_info() + a_sampler.sample_exception('a', exc_info, MAIN_TRANSFORM_ID, 'instid') + + # Sample a normal element for the output 'b', this will not show up in the + # final samples response. + b_sampler.element_sampler.el = 'b' + b_sampler.element_sampler.has_element = True + + samples = self.data_sampler.wait_for_samples([]) + self.assertEqual(len(samples.element_samples), 1) + self.assertIsNotNone(samples.element_samples['a'].elements[0].exception) + class OutputSamplerTest(unittest.TestCase): def tearDown(self): diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py index 4b49afb14cd2..bdb16a46aeea 100644 --- a/sdks/python/apache_beam/runners/worker/operations.py +++ b/sdks/python/apache_beam/runners/worker/operations.py @@ -207,14 +207,16 @@ def update_counters_start(self, windowed_value): # type: (WindowedValue) -> None self.opcounter.update_from(windowed_value) + if self.execution_context is not None: + self.execution_context.output_sampler = self.output_sampler + # The following code is optimized by inlining a function call. Because this # is called for every element, a function call is too expensive (order of # 100s of nanoseconds). Furthermore, a lock was purposefully not used # between here and the DataSampler as an additional operation. The tradeoff # is that some samples might be dropped, but it is better than the # alternative which is double sampling the same element. - if self.element_sampler is not None and self.execution_context is not None: - self.execution_context.output_sampler = self.output_sampler + if self.element_sampler is not None: if not self.element_sampler.has_element: self.element_sampler.el = windowed_value self.element_sampler.has_element = True diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index 87cf06e862ab..d3442fcb5987 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -153,9 +153,7 @@ def create_harness(environment, dry_run=False): if dry_run: return - data_sampler = None - if 'enable_data_sampling' in experiments: - data_sampler = DataSampler() + data_sampler = DataSampler.create(sdk_pipeline_options) sdk_harness = SdkHarness( control_address=control_service_descriptor.url,