From b1ab7a3e02bd2fe94ffe549d129ae221d06da1b0 Mon Sep 17 00:00:00 2001
From: Anand Inguva
Date: Wed, 4 Oct 2023 17:06:26 -0400
Subject: [PATCH] change state_cache_mb to 100 MB as default

---
 CHANGES.md                                          | 34 +++++++++++++++++++
 .../apache_beam/options/pipeline_options.py         | 10 ++++++
 .../runners/portability/portable_runner.py          |  4 ++-
 .../runners/worker/sdk_worker_main.py               | 35 ++++++++++++-------
 4 files changed, 67 insertions(+), 16 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 650b33c12407..df415f653fea 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -53,6 +53,40 @@
 * ([#X](https://github.com/apache/beam/issues/X)).
 -->
 
+# [2.52.0] - Unreleased
+
+## Highlights
+
+* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
+* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).
+
+## I/Os
+
+* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+
+## New Features / Improvements
+
+* The SDK worker state cache is now enabled by default with a size of 100 MB (Python) ([#28770](https://github.com/apache/beam/issues/28770)).
+
+## Breaking Changes
+
+* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
+
+## Deprecations
+
+* X behavior is deprecated and will be removed in X versions ([#X](https://github.com/apache/beam/issues/X)).
+
+## Bugfixes
+
+* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+
+## Security Fixes
+* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
+
+## Known Issues
+
+* ([#X](https://github.com/apache/beam/issues/X)).
+
 # [2.51.0] - Unreleased
 
 ## Highlights
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 3fbf7eff7dd6..9a7c4a79daae 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -1127,6 +1127,16 @@ def _add_argparse_args(cls, parser):
         dest='min_cpu_platform',
         type=str,
         help='GCE minimum CPU platform. Default is determined by GCP.')
+    parser.add_argument(
+        '--state_cache_size',
+        '--state_cache_size_mb',
+        dest='state_cache_size',
+        type=int,
+        default=None,
+        help=(
+            'Size of the state cache in MB. Default is 100 MB. '
+            'State cache is per process and is shared between all threads '
+            'within the process.'))
 
   def validate(self, validator):
     errors = []
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py
index 9ff03ec1d061..266d7aa127f3 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -415,7 +415,9 @@ def start_and_replace_loopback_environments(pipeline, options):
       portable_options.environment_config, server = (
           worker_pool_main.BeamFnExternalWorkerPoolServicer.start(
               state_cache_size=
-              sdk_worker_main._get_state_cache_size(experiments),
+              sdk_worker_main._get_state_cache_size(
+                  options=options,
+                  experiments=experiments),
               data_buffer_time_limit_ms=
               sdk_worker_main._get_data_buffer_time_limit_ms(experiments),
               use_process=use_loopback_process_worker))
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
index d3442fcb5987..9e75075f823a 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -36,6 +36,7 @@
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import ProfilingOptions
 from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import WorkerOptions
 from apache_beam.options.value_provider import RuntimeValueProvider
 from apache_beam.portability.api import endpoints_pb2
 from apache_beam.runners.internal import names
@@ -159,7 +160,8 @@ def create_harness(environment, dry_run=False):
       control_address=control_service_descriptor.url,
       status_address=status_service_descriptor.url,
       worker_id=_worker_id,
-      state_cache_size=_get_state_cache_size(experiments),
+      state_cache_size=_get_state_cache_size(
+          options=sdk_pipeline_options, experiments=experiments),
       data_buffer_time_limit_ms=_get_data_buffer_time_limit_ms(experiments),
       profiler_factory=profiler.Profile.factory_from_options(
           sdk_pipeline_options.view_as(ProfilingOptions)),
@@ -239,24 +241,27 @@ def _parse_pipeline_options(options_json):
   return PipelineOptions.from_dictionary(_load_pipeline_options(options_json))
 
 
-def _get_state_cache_size(experiments):
-  """Defines the upper number of state items to cache.
-
-  Note: state_cache_size is an experimental flag and might not be available in
-  future releases.
+def _get_state_cache_size(options, experiments):
+  """Computes the maximum size of the SDK worker state cache.
 
   Returns:
-    an int indicating the maximum number of megabytes to cache.
-      Default is 0 MB
+    an int indicating the maximum size of the state cache in bytes.
+      Default is 100 MB.
   """
+  state_cache_size = options.view_as(WorkerOptions).state_cache_size
 
-  for experiment in experiments:
-    # There should only be 1 match so returning from the loop
-    if re.match(r'state_cache_size=', experiment):
-      return int(
-          re.match(r'state_cache_size=(?P<state_cache_size>.*)',
-                   experiment).group('state_cache_size')) << 20
-  return 0
+  if not state_cache_size:
+    # Fall back to the legacy state_cache_size experiment for compatibility.
+    for experiment in experiments:
+      # There should only be 1 match so returning from the loop
+      if re.match(r'state_cache_size=', experiment):
+        return int(
+            re.match(r'state_cache_size=(?P<state_cache_size>.*)',
+                     experiment).group('state_cache_size')) << 20
+
+    state_cache_size = 100
+
+  return state_cache_size << 20
 
 
 def _get_data_buffer_time_limit_ms(experiments):
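
A minimal standalone sketch of the precedence the reworked `_get_state_cache_size` implements: the explicit `--state_cache_size`/`--state_cache_size_mb` option wins, then the legacy `state_cache_size=<MB>` experiment, then the new 100 MB default, with the result converted to bytes via `<< 20`. The helper name and the sample values below are illustrative and not part of the patch:

```python
import re


def resolve_state_cache_size_bytes(state_cache_size, experiments):
  """Mirrors the lookup order of _get_state_cache_size in the patch above.

  state_cache_size: value of --state_cache_size in MB, or None if unset.
  experiments: list of experiment strings, e.g. ['state_cache_size=64'].
  """
  if not state_cache_size:
    # Legacy path: honor the old experiment flag for backward compatibility.
    for experiment in experiments:
      match = re.match(r'state_cache_size=(?P<mb>.*)', experiment)
      if match:
        return int(match.group('mb')) << 20
    # Neither the option nor the experiment was given: use the 100 MB default.
    state_cache_size = 100
  return state_cache_size << 20


assert resolve_state_cache_size_bytes(None, []) == 100 << 20
assert resolve_state_cache_size_bytes(None, ['state_cache_size=64']) == 64 << 20
assert resolve_state_cache_size_bytes(256, ['state_cache_size=64']) == 256 << 20
```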
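
Assuming the new `WorkerOptions` flag lands as written above, a pipeline author could size the per-process cache like this; the 256 MB value is a placeholder, and the legacy spelling `--experiments=state_cache_size=256` keeps working through the fallback:

```python
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import WorkerOptions

# New flag introduced by this patch; leave it unset to get the 100 MB default.
options = PipelineOptions(['--state_cache_size=256'])
print(options.view_as(WorkerOptions).state_cache_size)  # 256 (MB, per process)
```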