Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Python]Enable state cache to 100 MB #28781

Merged
merged 13 commits into from
Oct 31, 2023
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ should handle this. ([#25252](https://github.com/apache/beam/issues/25252)).
jobs using the DataStream API. By default the option is set to false, so the batch jobs are still executed
using the DataSet API.
* `upload_graph` as one of the Experiments options for DataflowRunner is no longer required when the graph is larger than 10MB for Java SDK ([PR#28621](https://github.com/apache/beam/pull/28621).
* state cache has been enabled to a default of 100 MB. Use `--max_cache_memory_usage_mb=X` to provide cache size for the user state API and side inputs. (Python) ([#28770](https://github.com/apache/beam/issues/28770)).

## Breaking Changes

Expand Down
14 changes: 14 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -1135,6 +1135,20 @@ def _add_argparse_args(cls, parser):
dest='min_cpu_platform',
type=str,
help='GCE minimum CPU platform. Default is determined by GCP.')
parser.add_argument(
'--max_cache_memory_usage_mb',
dest='max_cache_memory_usage_mb',
type=int,
default=100,
help=(
'Size of the SdkHarness cache to store user state and side inputs '
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: consider following wording

            'Size of the SDK Harness cache to store user state and side inputs '
            'in MB. Default is 100MB. If the cache is full, least recently '
            'used elements will be evicted.  This cache is per '
            'each SDK Harness instance. SDK Harness  is a component  responsible '
            'for executing the user code and communicating with the runner. ' 
            'Depending on the runner, '
            'there may be more than one SDK Harness process running on the same worker node. '
            'Increasing cache size  might improve performance of some pipelines, but can lead to an increase '
            'in memory consumption and OOM errors if workers are not appropriately provisioned.'

'in MB. Default is 100MB. If the cache is full, least recently '
'used elements will be evicted. This cache will be per '
'SdkHarness. SDKHarness is a python process,'
'responsible for executing the user code and communicating with '
'the runner through the Fn API. Depending on the runner, '
'there may be more than 1 process running on the same worker node.'
))

def validate(self, validator):
errors = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,8 @@ def start_and_replace_loopback_environments(pipeline, options):
portable_options.environment_config, server = (
worker_pool_main.BeamFnExternalWorkerPoolServicer.start(
state_cache_size=
sdk_worker_main._get_state_cache_size(experiments),
sdk_worker_main._get_state_cache_size_bytes(
options=options),
data_buffer_time_limit_ms=
sdk_worker_main._get_data_buffer_time_limit_ms(experiments),
use_process=use_loopback_process_worker))
Expand Down
25 changes: 16 additions & 9 deletions sdks/python/apache_beam/runners/worker/sdk_worker_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import ProfilingOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.options.value_provider import RuntimeValueProvider
from apache_beam.portability.api import endpoints_pb2
from apache_beam.runners.internal import names
Expand Down Expand Up @@ -159,7 +160,8 @@ def create_harness(environment, dry_run=False):
control_address=control_service_descriptor.url,
status_address=status_service_descriptor.url,
worker_id=_worker_id,
state_cache_size=_get_state_cache_size(experiments),
state_cache_size=_get_state_cache_size_bytes(
options=sdk_pipeline_options),
data_buffer_time_limit_ms=_get_data_buffer_time_limit_ms(experiments),
profiler_factory=profiler.Profile.factory_from_options(
sdk_pipeline_options.view_as(ProfilingOptions)),
Expand Down Expand Up @@ -239,24 +241,29 @@ def _parse_pipeline_options(options_json):
return PipelineOptions.from_dictionary(_load_pipeline_options(options_json))


def _get_state_cache_size(experiments):
"""Defines the upper number of state items to cache.

Note: state_cache_size is an experimental flag and might not be available in
future releases.
def _get_state_cache_size_bytes(options):
"""Return the maximum size of state cache in bytes.

Returns:
an int indicating the maximum number of megabytes to cache.
an int indicating the maximum number of bytes to cache.
Default is 100 MB
AnandInguva marked this conversation as resolved.
Show resolved Hide resolved
"""

max_cache_memory_usage_mb = options.view_as(
WorkerOptions).max_cache_memory_usage_mb
# to maintain backward compatibility
experiments = options.view_as(DebugOptions).experiments or []
for experiment in experiments:
# There should only be 1 match so returning from the loop
if re.match(r'state_cache_size=', experiment):
damccorm marked this conversation as resolved.
Show resolved Hide resolved
_LOGGER.warning(
'--experiments=state_cache_size=X is deprecated and will be removed '
'in future releases.'
'Please use --max_cache_memory_usage_mb=X to set the cache size for '
'user state API and side inputs.')
return int(
re.match(r'state_cache_size=(?P<state_cache_size>.*)',
experiment).group('state_cache_size')) << 20
return 100 << 20
return max_cache_memory_usage_mb << 20


def _get_data_buffer_time_limit_ms(experiments):
Expand Down
13 changes: 13 additions & 0 deletions sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,19 @@ def test_gcp_profiler_uses_job_name_when_enabled_as_experiment(self):
sdk_worker_main._start_profiler(gcp_profiler_name, "version")
sdk_worker_main._start_profiler.assert_called_with("sample_job", "version")

@unittest.mock.patch.dict(os.environ, {"JOB_NAME": "sample_job"}, clear=True)
def test_pipeline_option_max_cache_memory_usage_mb(self):
options = PipelineOptions(flags=['--max_cache_memory_usage_mb=50'])

cache_size = sdk_worker_main._get_state_cache_size_bytes(options)
self.assertEqual(cache_size, 50 << 20)

@unittest.mock.patch.dict(os.environ, {"JOB_NAME": "sample_job"}, clear=True)
def test_pipeline_option_max_cache_memory_usage_mb_with_experiments(self):
options = PipelineOptions(flags=['--experiments=state_cache_size=50'])
cache_size = sdk_worker_main._get_state_cache_size_bytes(options)
self.assertEqual(cache_size, 50 << 20)


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
Expand Down
Loading