Skip to content

Commit

Permalink
change state_cache_mb to 100 MB as default
Browse files Browse the repository at this point in the history
  • Loading branch information
AnandInguva committed Oct 4, 2023
1 parent c2666e1 commit b1ab7a3
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 14 deletions.
34 changes: 34 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,40 @@
* ([#X](https://github.com/apache/beam/issues/X)).
-->

# [2.52.0] - Unreleased

## Highlights

* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).

## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## New Features / Improvements

* The Python SDK state cache is now enabled by default with a size of 100 MB ([#28770](https://github.com/apache/beam/issues/28770)).

## Breaking Changes

* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).

## Deprecations

* X behavior is deprecated and will be removed in X versions ([#X](https://github.com/apache/beam/issues/X)).

## Bugfixes

* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## Security Fixes
* Fixed [CVE-YYYY-NNNN](https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN) (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).

## Known Issues

* ([#X](https://github.com/apache/beam/issues/X)).

# [2.51.0] - Unreleased

## Highlights
Expand Down
10 changes: 10 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -1127,6 +1127,16 @@ def _add_argparse_args(cls, parser):
dest='min_cpu_platform',
type=str,
help='GCE minimum CPU platform. Default is determined by GCP.')
parser.add_argument(
'--state_cache_size',
'--state_cache_size_mb',
dest='state_cache_size',
type=int,
default=None,
help=(
'Size of the state cache in MB. Default is 100MB.'
'State cache is per process and is shared between all threads '
'within the process.'))

def validate(self, validator):
errors = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,9 @@ def start_and_replace_loopback_environments(pipeline, options):
portable_options.environment_config, server = (
worker_pool_main.BeamFnExternalWorkerPoolServicer.start(
state_cache_size=
sdk_worker_main._get_state_cache_size(experiments),
sdk_worker_main._get_state_cache_size(
options=options,
experiments=experiments),
data_buffer_time_limit_ms=
sdk_worker_main._get_data_buffer_time_limit_ms(experiments),
use_process=use_loopback_process_worker))
Expand Down
31 changes: 18 additions & 13 deletions sdks/python/apache_beam/runners/worker/sdk_worker_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import ProfilingOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.options.value_provider import RuntimeValueProvider
from apache_beam.portability.api import endpoints_pb2
from apache_beam.runners.internal import names
Expand Down Expand Up @@ -159,7 +160,8 @@ def create_harness(environment, dry_run=False):
control_address=control_service_descriptor.url,
status_address=status_service_descriptor.url,
worker_id=_worker_id,
state_cache_size=_get_state_cache_size(experiments),
state_cache_size=_get_state_cache_size(
options=sdk_pipeline_options, experiments=experiments),
data_buffer_time_limit_ms=_get_data_buffer_time_limit_ms(experiments),
profiler_factory=profiler.Profile.factory_from_options(
sdk_pipeline_options.view_as(ProfilingOptions)),
Expand Down Expand Up @@ -239,24 +241,27 @@ def _parse_pipeline_options(options_json):
return PipelineOptions.from_dictionary(_load_pipeline_options(options_json))


def _get_state_cache_size(options, experiments):
  """Returns the maximum size of the state cache, in bytes.

  The size is resolved in order of precedence:

  1. The ``--state_cache_size`` pipeline option (value in megabytes), if set.
  2. The legacy ``state_cache_size=N`` experiment flag (value in megabytes),
     kept for backward compatibility.
  3. A default of 100 MB.

  Args:
    options: PipelineOptions; ``WorkerOptions.state_cache_size`` is read.
    experiments: iterable of experiment strings, e.g. ``state_cache_size=50``.

  Returns:
    An int giving the maximum number of bytes to cache (megabytes << 20).
    Default is 100 MB.
  """
  state_cache_size = options.view_as(WorkerOptions).state_cache_size

  if not state_cache_size:
    # Fall back to the legacy experiment flag to maintain backward
    # compatibility. Match once and reuse the result rather than running
    # the same regex twice.
    for experiment in experiments:
      match = re.match(
          r'state_cache_size=(?P<state_cache_size>.*)', experiment)
      if match:
        # There should only be one such experiment, so return from the loop.
        return int(match.group('state_cache_size')) << 20

    # Neither the pipeline option nor the experiment was set: default to
    # 100 MB.
    state_cache_size = 100

  return state_cache_size << 20


def _get_data_buffer_time_limit_ms(experiments):
Expand Down

0 comments on commit b1ab7a3

Please sign in to comment.