
[Python] Enable state cache to 100 MB #28781

Merged: 13 commits, Oct 31, 2023
34 changes: 34 additions & 0 deletions CHANGES.md
@@ -53,6 +53,40 @@
* ([#X](https://github.com/apache/beam/issues/X)).
-->

# [2.52.0] - Unreleased

## Highlights

* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).

## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## New Features / Improvements

* The state cache is now enabled with a default size of 100 MB (Python) ([#28770](https://github.com/apache/beam/issues/28770)).

## Breaking Changes

* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).

## Deprecations

* X behavior is deprecated and will be removed in X versions ([#X](https://github.com/apache/beam/issues/X)).

## Bugfixes

* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## Security Fixes

* Fixed [CVE-YYYY-NNNN](https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN) (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).

## Known Issues

* ([#X](https://github.com/apache/beam/issues/X)).

# [2.51.0] - Unreleased

## Highlights
11 changes: 11 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options.py
@@ -1127,6 +1127,17 @@ def _add_argparse_args(cls, parser):
dest='min_cpu_platform',
type=str,
help='GCE minimum CPU platform. Default is determined by GCP.')
parser.add_argument(
'--max_cache_memory_usage_mb',
dest='max_cache_memory_usage_mb',
type=int,
default=None,
Contributor:

Any concerns with defining the 100 MB default here?

Contributor Author:

Current flow: if it is None here, it gives us an opportunity to look in --experiments for state_cache_size.

If the value were defined here as 100 MB and the user passed --experiments=state_cache_size, we would have to override the 100 MB default with the --experiments=state_cache_size value.

I don't see any concerns with setting the default here; it might need some code changes though.

help=(
'Size of the SdkHarness/Sdk Process cache in MB. Default is 100 MB. '
'This cache is used to store the user state and side input '
'elements. If the cache is full, the least recently used '
'elements will be evicted. This cache will be per SdkHarness/Sdk '
'Process. SDKHarness is a Python process that runs the user code.'))
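The precedence the author describes (explicit flag first, then the legacy --experiments=state_cache_size, then the default) can be sketched with a stdlib-only replica; the helper name and default here are illustrative, not Beam's actual API:

```python
import re

_DEFAULT_CACHE_MB = 100  # assumed 100 MB default under discussion

def resolve_cache_size_mb(max_cache_memory_usage_mb, experiments):
    """Resolve the cache size in MB: the explicit flag wins; otherwise
    fall back to the legacy state_cache_size experiment, then the default."""
    if max_cache_memory_usage_mb:
        return max_cache_memory_usage_mb
    for experiment in experiments:
        m = re.match(r'state_cache_size=(?P<mb>\d+)', experiment)
        if m:
            return int(m.group('mb'))
    return _DEFAULT_CACHE_MB

# The flag takes precedence; the experiment is only a fallback.
assert resolve_cache_size_mb(200, ['state_cache_size=64']) == 200
assert resolve_cache_size_mb(None, ['state_cache_size=64']) == 64
assert resolve_cache_size_mb(None, []) == 100
```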

def validate(self, validator):
errors = []
@@ -415,7 +415,8 @@ def start_and_replace_loopback_environments(pipeline, options):
portable_options.environment_config, server = (
worker_pool_main.BeamFnExternalWorkerPoolServicer.start(
state_cache_size=
sdk_worker_main._get_state_cache_size(experiments),
sdk_worker_main._get_state_cache_size_bytes(
options=options),
data_buffer_time_limit_ms=
sdk_worker_main._get_data_buffer_time_limit_ms(experiments),
use_process=use_loopback_process_worker))
33 changes: 19 additions & 14 deletions sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -36,6 +36,7 @@
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import ProfilingOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.options.value_provider import RuntimeValueProvider
from apache_beam.portability.api import endpoints_pb2
from apache_beam.runners.internal import names
@@ -46,6 +47,7 @@

_LOGGER = logging.getLogger(__name__)
_ENABLE_GOOGLE_CLOUD_PROFILER = 'enable_google_cloud_profiler'
_STATE_CACHE_SIZE_BYTES = 100 << 20
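The constant uses a left shift to convert megabytes to bytes; a quick sanity check of the arithmetic:

```python
# 1 MiB is 2**20 bytes, so `x << 20` converts MB to bytes.
MB = 1 << 20
print(100 << 20)  # 104857600 bytes, i.e. 100 * MB
```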


def _import_beam_plugins(plugins):
@@ -159,7 +161,8 @@ def create_harness(environment, dry_run=False):
control_address=control_service_descriptor.url,
status_address=status_service_descriptor.url,
worker_id=_worker_id,
state_cache_size=_get_state_cache_size(experiments),
state_cache_size=_get_state_cache_size_bytes(
options=sdk_pipeline_options),
data_buffer_time_limit_ms=_get_data_buffer_time_limit_ms(experiments),
profiler_factory=profiler.Profile.factory_from_options(
sdk_pipeline_options.view_as(ProfilingOptions)),
@@ -239,24 +242,26 @@ def _parse_pipeline_options(options_json):
return PipelineOptions.from_dictionary(_load_pipeline_options(options_json))


def _get_state_cache_size(experiments):
"""Defines the upper number of state items to cache.

Note: state_cache_size is an experimental flag and might not be available in
future releases.
def _get_state_cache_size_bytes(options):
"""Return the maximum size of the state cache in bytes.

Returns:
an int indicating the maximum number of bytes to cache.
Default is 100 MB.
"""

for experiment in experiments:
# There should only be 1 match so returning from the loop
if re.match(r'state_cache_size=', experiment):
return int(
re.match(r'state_cache_size=(?P<state_cache_size>.*)',
experiment).group('state_cache_size')) << 20
return 0
max_cache_memory_usage_mb = options.view_as(
WorkerOptions).max_cache_memory_usage_mb
if not max_cache_memory_usage_mb:
# to maintain backward compatibility
experiments = options.view_as(DebugOptions).experiments or []
for experiment in experiments:
Contributor:

Pretty sure there is already a helper that does this parsing.

# There should only be one match, so return from inside the loop
if re.match(r'state_cache_size=', experiment):
return int(
re.match(r'state_cache_size=(?P<state_cache_size>.*)',
experiment).group('state_cache_size')) << 20
return _STATE_CACHE_SIZE_BYTES
return max_cache_memory_usage_mb << 20


def _get_data_buffer_time_limit_ms(experiments):
13 changes: 13 additions & 0 deletions sdks/python/apache_beam/runners/worker/sdk_worker_main_test.py
@@ -234,6 +234,19 @@ def test_gcp_profiler_uses_job_name_when_enabled_as_experiment(self):
sdk_worker_main._start_profiler(gcp_profiler_name, "version")
sdk_worker_main._start_profiler.assert_called_with("sample_job", "version")

@unittest.mock.patch.dict(os.environ, {"JOB_NAME": "sample_job"}, clear=True)
def test_pipeline_option_max_cache_memory_usage_mb(self):
options = PipelineOptions(flags=['--max_cache_memory_usage_mb=100'])

cache_size = sdk_worker_main._get_state_cache_size_bytes(options)
self.assertEqual(cache_size, sdk_worker_main._STATE_CACHE_SIZE_BYTES)

@unittest.mock.patch.dict(os.environ, {"JOB_NAME": "sample_job"}, clear=True)
def test_pipeline_option_max_cache_memory_usage_mb_with_experiments(self):
options = PipelineOptions(flags=['--experiments=state_cache_size=100'])
cache_size = sdk_worker_main._get_state_cache_size_bytes(options)
self.assertEqual(cache_size, sdk_worker_main._STATE_CACHE_SIZE_BYTES)


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)