[Python] Enable state cache to 100 MB #28781

Merged: 13 commits on Oct 31, 2023
34 changes: 34 additions & 0 deletions CHANGES.md
@@ -53,6 +53,40 @@
* ([#X](https://github.com/apache/beam/issues/X)).
-->

# [2.52.0] - Unreleased

## Highlights

* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).

## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## New Features / Improvements

* The state cache is now enabled by default with a size of 100 MB (Python) ([#28770](https://github.com/apache/beam/issues/28770)).

## Breaking Changes

* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).

## Deprecations

* X behavior is deprecated and will be removed in X versions ([#X](https://github.com/apache/beam/issues/X)).

## Bugfixes

* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## Security Fixes
* Fixed [CVE-YYYY-NNNN](https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN) (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).

## Known Issues

* ([#X](https://github.com/apache/beam/issues/X)).

# [2.51.0] - Unreleased

## Highlights
10 changes: 10 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options.py
@@ -1127,6 +1127,16 @@ def _add_argparse_args(cls, parser):
dest='min_cpu_platform',
type=str,
help='GCE minimum CPU platform. Default is determined by GCP.')
parser.add_argument(
'--state_cache_size',
'--state_cache_size_mb',
dest='state_cache_size',
type=int,
default=None,
Contributor

Any concerns about defining the 100 MB default here?

Contributor Author

Current flow: if it is None here, we get the chance to look in --experiments for state_cache_size.

If the default were defined here as 100 MB and the user passed --experiments=state_cache_size, we would have to override that 100 MB with the --experiments=state_cache_size value.

I don't see any concerns with setting the default here, though it might need some code changes.
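
A minimal sketch of that resolution order, treating None as the "not set" sentinel (the helper name and structure here are illustrative, not the actual implementation):

```python
def _resolve_state_cache_size_mb(flag_value, experiments):
    # None means --state_cache_size_mb was not passed, so the legacy
    # --experiments=state_cache_size=N value (if present) still applies.
    if flag_value is not None:
        return flag_value
    for experiment in experiments:
        if experiment.startswith('state_cache_size='):
            return int(experiment.split('=', 1)[1])
    return 100  # new default, in MB
```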

help=(
'Size of the state cache in MB. Default is 100 MB. '
'State cache is per process and is shared between all threads '
'within the process.'))

def validate(self, validator):
errors = []
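For context, a short usage sketch of the option defined above, assuming it is registered on WorkerOptions as the worker code below implies (the 256 MB value is just an example):

```python
from apache_beam.options.pipeline_options import PipelineOptions, WorkerOptions

# Override the default state cache size via the new flag (example value).
options = PipelineOptions(['--state_cache_size_mb=256'])
assert options.view_as(WorkerOptions).state_cache_size == 256

# With the flag omitted the option stays None, and the worker falls back to the
# --experiments value or the new 100 MB default (see sdk_worker_main below).
defaults = PipelineOptions([])
assert defaults.view_as(WorkerOptions).state_cache_size is None
```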
@@ -415,7 +415,9 @@ def start_and_replace_loopback_environments(pipeline, options):
portable_options.environment_config, server = (
worker_pool_main.BeamFnExternalWorkerPoolServicer.start(
state_cache_size=
sdk_worker_main._get_state_cache_size(experiments),
sdk_worker_main._get_state_cache_size(
options=options,
experiments=experiments),
data_buffer_time_limit_ms=
sdk_worker_main._get_data_buffer_time_limit_ms(experiments),
use_process=use_loopback_process_worker))
31 changes: 18 additions & 13 deletions sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -36,6 +36,7 @@
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import ProfilingOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.options.value_provider import RuntimeValueProvider
from apache_beam.portability.api import endpoints_pb2
from apache_beam.runners.internal import names
@@ -159,7 +160,8 @@ def create_harness(environment, dry_run=False):
control_address=control_service_descriptor.url,
status_address=status_service_descriptor.url,
worker_id=_worker_id,
state_cache_size=_get_state_cache_size(experiments),
state_cache_size=_get_state_cache_size(
options=sdk_pipeline_options, experiments=experiments),
data_buffer_time_limit_ms=_get_data_buffer_time_limit_ms(experiments),
profiler_factory=profiler.Profile.factory_from_options(
sdk_pipeline_options.view_as(ProfilingOptions)),
@@ -239,24 +241,27 @@ def _parse_pipeline_options(options_json):
return PipelineOptions.from_dictionary(_load_pipeline_options(options_json))


def _get_state_cache_size(experiments):
"""Defines the upper number of state items to cache.

Note: state_cache_size is an experimental flag and might not be available in
future releases.
def _get_state_cache_size(options, experiments):
"""Defines the maximum size of the cache in megabytes.

Returns:
an int indicating the maximum number of bytes to cache.
Default is 100 MB.
"""
state_cache_size = options.view_as(WorkerOptions).state_cache_size

for experiment in experiments:
# There should only be 1 match so returning from the loop
if re.match(r'state_cache_size=', experiment):
return int(
re.match(r'state_cache_size=(?P<state_cache_size>.*)',
experiment).group('state_cache_size')) << 20
return 0
if not state_cache_size:
# To maintain backward compatibility, fall back to --experiments=state_cache_size.
for experiment in experiments:
Contributor


Pretty sure there is already a helper that does this parsing.

# There should only be 1 match so returning from the loop
if re.match(r'state_cache_size=', experiment):
return int(
re.match(r'state_cache_size=(?P<state_cache_size>.*)',
experiment).group('state_cache_size')) << 20

state_cache_size = 100

return state_cache_size << 20


def _get_data_buffer_time_limit_ms(experiments):
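To illustrate the resolution order implemented by the updated _get_state_cache_size, here is a short sketch against the code as shown in this diff; note the function returns bytes because the megabyte count is shifted left by 20 bits:

```python
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.worker import sdk_worker_main

# No flag and no experiment: the new 100 MB default applies (returned in bytes).
opts = PipelineOptions([])
assert sdk_worker_main._get_state_cache_size(
    options=opts, experiments=[]) == 100 << 20

# The legacy --experiments=state_cache_size=N value is still honored
# when the flag is unset.
assert sdk_worker_main._get_state_cache_size(
    options=opts, experiments=['state_cache_size=50']) == 50 << 20

# The new --state_cache_size_mb flag takes precedence over the experiment.
opts = PipelineOptions(['--state_cache_size_mb=256'])
assert sdk_worker_main._get_state_cache_size(
    options=opts, experiments=['state_cache_size=50']) == 256 << 20
```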