From a806c0ed3a9ead3b7738ae94bf323865abbd5e73 Mon Sep 17 00:00:00 2001
From: Jack McCluskey <34928439+jrmccluskey@users.noreply.github.com>
Date: Mon, 9 Dec 2024 13:20:12 -0500
Subject: [PATCH] Add Dataflow Cost Benchmark framework to Beam Python (#33297)

* initial benchmark framework code

* Implement Dataflow cost benchmark framework + add wordcount example

* formatting

* move to base wordcount instead

* add comment for pipeline execution in wordcount
---
 ...rdcount_Python_Cost_Benchmark_Dataflow.yml |  91 ++++++++++++++
 .../python_wordcount.txt                      |  28 +++++
 sdks/python/apache_beam/examples/wordcount.py |  39 +++---
 .../testing/benchmarks/wordcount/__init__.py  |  16 +++
 .../testing/benchmarks/wordcount/wordcount.py |  39 ++++++
 .../load_tests/dataflow_cost_benchmark.py     | 113 ++++++++++++++++++
 .../load_tests/dataflow_cost_consts.py        |  59 +++++++++
 7 files changed, 368 insertions(+), 17 deletions(-)
 create mode 100644 .github/workflows/beam_Wordcount_Python_Cost_Benchmark_Dataflow.yml
 create mode 100644 .github/workflows/cost-benchmarks-pipeline-options/python_wordcount.txt
 create mode 100644 sdks/python/apache_beam/testing/benchmarks/wordcount/__init__.py
 create mode 100644 sdks/python/apache_beam/testing/benchmarks/wordcount/wordcount.py
 create mode 100644 sdks/python/apache_beam/testing/load_tests/dataflow_cost_benchmark.py
 create mode 100644 sdks/python/apache_beam/testing/load_tests/dataflow_cost_consts.py

diff --git a/.github/workflows/beam_Wordcount_Python_Cost_Benchmark_Dataflow.yml b/.github/workflows/beam_Wordcount_Python_Cost_Benchmark_Dataflow.yml
new file mode 100644
index 000000000000..51d1005affbc
--- /dev/null
+++ b/.github/workflows/beam_Wordcount_Python_Cost_Benchmark_Dataflow.yml
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: Wordcount Python Cost Benchmarks Dataflow
+
+on:
+  workflow_dispatch:
+
+#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event
+permissions:
+  actions: write
+  pull-requests: read
+  checks: read
+  contents: read
+  deployments: read
+  id-token: none
+  issues: read
+  discussions: read
+  packages: read
+  pages: read
+  repository-projects: read
+  security-events: read
+  statuses: read
+
+# This allows a subsequently queued workflow run to interrupt previous runs
+concurrency:
+  group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}'
+  cancel-in-progress: true
+
+env:
+  DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
+  GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
+  GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}
+  INFLUXDB_USER: ${{ secrets.INFLUXDB_USER }}
+  INFLUXDB_USER_PASSWORD: ${{ secrets.INFLUXDB_USER_PASSWORD }}
+
+jobs:
+  beam_Inference_Python_Benchmarks_Dataflow:
+    if: |
+      github.event_name == 'workflow_dispatch'
+    runs-on: [self-hosted, ubuntu-20.04, main]
+    timeout-minutes: 900
+    name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
+    strategy:
+      matrix:
+        job_name: ["beam_Wordcount_Python_Cost_Benchmarks_Dataflow"]
+        job_phrase: ["Run Wordcount Cost Benchmark"]
+    steps:
+      - uses: actions/checkout@v4
+      - name: Setup repository
+        uses: ./.github/actions/setup-action
+        with:
+          comment_phrase: ${{ matrix.job_phrase }}
+          github_token: ${{ secrets.GITHUB_TOKEN }}
+          github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
+      - name: Setup Python environment
+        uses: ./.github/actions/setup-environment-action
+        with:
+          python-version: '3.10'
+      - name: Prepare test arguments
+        uses: ./.github/actions/test-arguments-action
+        with:
+          test-type: load
+          test-language: python
+          argument-file-paths: |
+            ${{ github.workspace }}/.github/workflows/cost-benchmarks-pipeline-options/python_wordcount.txt
+      # The env variables are created and populated in the test-arguments-action as "<github.job>_test_arguments_<argument_file_paths_index>"
+      - name: get current time
+        run: echo "NOW_UTC=$(date '+%m%d%H%M%S' --utc)" >> $GITHUB_ENV
+      - name: run wordcount on Dataflow Python
+        uses: ./.github/actions/gradle-command-self-hosted-action
+        timeout-minutes: 30
+        with:
+          gradle-command: :sdks:python:apache_beam:testing:load_tests:run
+          arguments: |
+            -PloadTest.mainClass=apache_beam.testing.benchmarks.wordcount.wordcount \
+            -Prunner=DataflowRunner \
+            -PpythonVersion=3.10 \
+            '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_1 }} --job_name=benchmark-tests-wordcount-python-${{env.NOW_UTC}} --output=gs://temp-storage-for-end-to-end-tests/wordcount/result_wordcount-${{env.NOW_UTC}}.txt' \
\ No newline at end of file
diff --git a/.github/workflows/cost-benchmarks-pipeline-options/python_wordcount.txt b/.github/workflows/cost-benchmarks-pipeline-options/python_wordcount.txt
new file mode 100644
index 000000000000..424936ddad97
--- /dev/null
+++ b/.github/workflows/cost-benchmarks-pipeline-options/python_wordcount.txt
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+--region=us-central1
+--machine_type=n1-standard-2
+--num_workers=1
+--disk_size_gb=50
+--autoscaling_algorithm=NONE
+--input_options={}
+--staging_location=gs://temp-storage-for-perf-tests/loadtests
+--temp_location=gs://temp-storage-for-perf-tests/loadtests
+--publish_to_big_query=true
+--metrics_dataset=beam_run_inference
+--metrics_table=python_wordcount
+--runner=DataflowRunner
\ No newline at end of file
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index 31407aec6c40..a9138647581c 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -45,6 +45,7 @@
 from apache_beam.io import WriteToText
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.runner import PipelineResult
 
 
 class WordExtractingDoFn(beam.DoFn):
@@ -63,7 +64,7 @@ def process(self, element):
     return re.findall(r'[\w\']+', element, re.UNICODE)
 
 
-def run(argv=None, save_main_session=True):
+def run(argv=None, save_main_session=True) -> PipelineResult:
   """Main entry point; defines and runs the wordcount pipeline."""
   parser = argparse.ArgumentParser()
   parser.add_argument(
@@ -83,27 +84,31 @@ def run(argv=None, save_main_session=True):
   pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
 
-  # The pipeline will be run on exiting the with block.
-  with beam.Pipeline(options=pipeline_options) as p:
+  pipeline = beam.Pipeline(options=pipeline_options)
 
-    # Read the text file[pattern] into a PCollection.
-    lines = p | 'Read' >> ReadFromText(known_args.input)
+  # Read the text file[pattern] into a PCollection.
+  lines = pipeline | 'Read' >> ReadFromText(known_args.input)
 
-    counts = (
-        lines
-        | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
-        | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
-        | 'GroupAndSum' >> beam.CombinePerKey(sum))
+  counts = (
+      lines
+      | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
+      | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
+      | 'GroupAndSum' >> beam.CombinePerKey(sum))
 
-    # Format the counts into a PCollection of strings.
-    def format_result(word, count):
-      return '%s: %d' % (word, count)
+  # Format the counts into a PCollection of strings.
+  def format_result(word, count):
+    return '%s: %d' % (word, count)
 
-    output = counts | 'Format' >> beam.MapTuple(format_result)
+  output = counts | 'Format' >> beam.MapTuple(format_result)
 
-    # Write the output using a "Write" transform that has side effects.
-    # pylint: disable=expression-not-assigned
-    output | 'Write' >> WriteToText(known_args.output)
+  # Write the output using a "Write" transform that has side effects.
+  # pylint: disable=expression-not-assigned
+  output | 'Write' >> WriteToText(known_args.output)
+
+  # Execute the pipeline and return the result.
+  result = pipeline.run()
+  result.wait_until_finish()
+  return result
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/testing/benchmarks/wordcount/__init__.py b/sdks/python/apache_beam/testing/benchmarks/wordcount/__init__.py
new file mode 100644
index 000000000000..cce3acad34a4
--- /dev/null
+++ b/sdks/python/apache_beam/testing/benchmarks/wordcount/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/sdks/python/apache_beam/testing/benchmarks/wordcount/wordcount.py b/sdks/python/apache_beam/testing/benchmarks/wordcount/wordcount.py
new file mode 100644
index 000000000000..513ede47e80a
--- /dev/null
+++ b/sdks/python/apache_beam/testing/benchmarks/wordcount/wordcount.py
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# pytype: skip-file
+
+import logging
+
+from apache_beam.examples import wordcount
+from apache_beam.testing.load_tests.dataflow_cost_benchmark import DataflowCostBenchmark
+
+
+class WordcountCostBenchmark(DataflowCostBenchmark):
+  def __init__(self):
+    super().__init__()
+
+  def test(self):
+    extra_opts = {}
+    extra_opts['output'] = self.pipeline.get_option('output_file')
+    self.result = wordcount.run(
+        self.pipeline.get_full_options_as_args(**extra_opts),
+        save_main_session=False)
+
+
+if __name__ == '__main__':
+  logging.basicConfig(level=logging.INFO)
+  WordcountCostBenchmark().run()
diff --git a/sdks/python/apache_beam/testing/load_tests/dataflow_cost_benchmark.py b/sdks/python/apache_beam/testing/load_tests/dataflow_cost_benchmark.py
new file mode 100644
index 000000000000..b60af1249756
--- /dev/null
+++ b/sdks/python/apache_beam/testing/load_tests/dataflow_cost_benchmark.py
@@ -0,0 +1,113 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# pytype: skip-file
+
+import logging
+import time
+from typing import Any
+from typing import Optional
+
+import apache_beam.testing.load_tests.dataflow_cost_consts as costs
+from apache_beam.metrics.execution import MetricResult
+from apache_beam.runners.dataflow.dataflow_runner import DataflowPipelineResult
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing.load_tests.load_test import LoadTest
+
+
+class DataflowCostBenchmark(LoadTest):
+  """Base class for Dataflow performance tests which export metrics to
+  external databases: BigQuery and/or InfluxDB. Calculates the expected cost
+  of running the job on Dataflow in region us-central1.
+
+  Refer to :class:`~apache_beam.testing.load_tests.LoadTestOptions` for more
+  information on the required pipeline options.
+
+  If using InfluxDB with Basic HTTP authentication enabled, provide the
+  following environment options: `INFLUXDB_USER` and `INFLUXDB_USER_PASSWORD`.
+
+  If the hardware configuration for the job includes use of a GPU, please
+  specify the version in use with the Accelerator enumeration. This is used to
+  calculate the cost of the job later, as different accelerators have different
+  billing rates per hour of use.
+ """ + def __init__( + self, + metrics_namespace: Optional[str] = None, + is_streaming: bool = False, + gpu: Optional[costs.Accelerator] = None): + self.is_streaming = is_streaming + self.gpu = gpu + super().__init__(metrics_namespace=metrics_namespace) + + def run(self): + try: + self.test() + if not hasattr(self, 'result'): + self.result = self.pipeline.run() + # Defaults to waiting forever unless timeout has been set + state = self.result.wait_until_finish(duration=self.timeout_ms) + assert state != PipelineState.FAILED + logging.info( + 'Pipeline complete, sleeping for 4 minutes to allow resource ' + 'metrics to populate.') + time.sleep(240) + self.extra_metrics = self._retrieve_cost_metrics(self.result) + self._metrics_monitor.publish_metrics(self.result, self.extra_metrics) + finally: + self.cleanup() + + def _retrieve_cost_metrics(self, + result: DataflowPipelineResult) -> dict[str, Any]: + job_id = result.job_id() + metrics = result.metrics().all_metrics(job_id) + metrics_dict = self._process_metrics_list(metrics) + logging.info(metrics_dict) + cost = 0.0 + if (self.is_streaming): + cost += metrics_dict.get( + "TotalVcpuTime", 0.0) / 3600 * costs.VCPU_PER_HR_STREAMING + cost += ( + metrics_dict.get("TotalMemoryUsage", 0.0) / + 1000) / 3600 * costs.MEM_PER_GB_HR_STREAMING + cost += metrics_dict.get( + "TotalStreamingDataProcessed", 0.0) * costs.SHUFFLE_PER_GB_STREAMING + else: + cost += metrics_dict.get( + "TotalVcpuTime", 0.0) / 3600 * costs.VCPU_PER_HR_BATCH + cost += ( + metrics_dict.get("TotalMemoryUsage", 0.0) / + 1000) / 3600 * costs.MEM_PER_GB_HR_BATCH + cost += metrics_dict.get( + "TotalStreamingDataProcessed", 0.0) * costs.SHUFFLE_PER_GB_BATCH + if (self.gpu): + rate = costs.ACCELERATOR_TO_COST[self.gpu] + cost += metrics_dict.get("TotalGpuTime", 0.0) / 3600 * rate + cost += metrics_dict.get("TotalPdUsage", 0.0) / 3600 * costs.PD_PER_GB_HR + cost += metrics_dict.get( + "TotalSsdUsage", 0.0) / 3600 * costs.PD_SSD_PER_GB_HR + metrics_dict["EstimatedCost"] = cost + return metrics_dict + + def _process_metrics_list(self, + metrics: list[MetricResult]) -> dict[str, Any]: + system_metrics = {} + for entry in metrics: + metric_key = entry.key + metric = metric_key.metric + if metric_key.step == '' and metric.namespace == 'dataflow/v1b3': + system_metrics[metric.name] = entry.committed + return system_metrics diff --git a/sdks/python/apache_beam/testing/load_tests/dataflow_cost_consts.py b/sdks/python/apache_beam/testing/load_tests/dataflow_cost_consts.py new file mode 100644 index 000000000000..f291991b48bb --- /dev/null +++ b/sdks/python/apache_beam/testing/load_tests/dataflow_cost_consts.py @@ -0,0 +1,59 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# These values are Dataflow costs for running jobs in us-central1. 
+# The cost values are found at https://cloud.google.com/dataflow/pricing
+
+from enum import Enum
+
+VCPU_PER_HR_BATCH = 0.056
+VCPU_PER_HR_STREAMING = 0.069
+MEM_PER_GB_HR_BATCH = 0.003557
+MEM_PER_GB_HR_STREAMING = 0.0035557
+PD_PER_GB_HR = 0.000054
+PD_SSD_PER_GB_HR = 0.000298
+SHUFFLE_PER_GB_BATCH = 0.011
+SHUFFLE_PER_GB_STREAMING = 0.018
+
+# GPU Resource Pricing
+P100_PER_GPU_PER_HOUR = 1.752
+V100_PER_GPU_PER_HOUR = 2.976
+T4_PER_GPU_PER_HOUR = 0.42
+P4_PER_GPU_PER_HOUR = 0.72
+L4_PER_GPU_PER_HOUR = 0.672
+A100_40GB_PER_GPU_PER_HOUR = 3.72
+A100_80GB_PER_GPU_PER_HOUR = 4.7137
+
+
+class Accelerator(Enum):
+  P100 = 1
+  V100 = 2
+  T4 = 3
+  P4 = 4
+  L4 = 5
+  A100_40GB = 6
+  A100_80GB = 7
+
+
+ACCELERATOR_TO_COST: dict[Accelerator, float] = {
+    Accelerator.P100: P100_PER_GPU_PER_HOUR,
+    Accelerator.V100: V100_PER_GPU_PER_HOUR,
+    Accelerator.T4: T4_PER_GPU_PER_HOUR,
+    Accelerator.P4: P4_PER_GPU_PER_HOUR,
+    Accelerator.L4: L4_PER_GPU_PER_HOUR,
+    Accelerator.A100_40GB: A100_40GB_PER_GPU_PER_HOUR,
+    Accelerator.A100_80GB: A100_80GB_PER_GPU_PER_HOUR,
+}
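
Illustrative aside (not part of the patch above): the batch-mode arithmetic that _retrieve_cost_metrics performs with the constants from dataflow_cost_consts.py can be sanity-checked in isolation. The sketch below repeats that formula with made-up inputs; the metric values are hypothetical sample numbers, not output from a real Dataflow job, and the script is a standalone illustration rather than framework code.

# Standalone sanity check of the batch cost formula used by
# DataflowCostBenchmark._retrieve_cost_metrics. All metric values below are
# hypothetical sample numbers chosen for illustration.

VCPU_PER_HR_BATCH = 0.056        # USD per vCPU hour (us-central1)
MEM_PER_GB_HR_BATCH = 0.003557   # USD per GB hour of worker memory
SHUFFLE_PER_GB_BATCH = 0.011     # USD per GB of shuffle data processed
PD_PER_GB_HR = 0.000054          # USD per GB hour of standard persistent disk
PD_SSD_PER_GB_HR = 0.000298      # USD per GB hour of SSD persistent disk

# Hypothetical job metrics; the benchmark divides vCPU, disk, and GPU usage
# by 3600 and memory usage by 1000 and then 3600 before applying the rates.
sample_metrics = {
    "TotalVcpuTime": 7200.0,             # i.e. 2 vCPU hours
    "TotalMemoryUsage": 27_000_000.0,    # divided by 1000, then by 3600
    "TotalStreamingDataProcessed": 0.5,  # GB of shuffle data
    "TotalPdUsage": 180_000.0,           # i.e. 50 GB hours of standard PD
    "TotalSsdUsage": 0.0,
}

cost = 0.0
cost += sample_metrics.get("TotalVcpuTime", 0.0) / 3600 * VCPU_PER_HR_BATCH
cost += (sample_metrics.get("TotalMemoryUsage", 0.0) /
         1000) / 3600 * MEM_PER_GB_HR_BATCH
cost += sample_metrics.get(
    "TotalStreamingDataProcessed", 0.0) * SHUFFLE_PER_GB_BATCH
cost += sample_metrics.get("TotalPdUsage", 0.0) / 3600 * PD_PER_GB_HR
cost += sample_metrics.get("TotalSsdUsage", 0.0) / 3600 * PD_SSD_PER_GB_HR

# 0.112 (vCPU) + ~0.0267 (memory) + 0.0055 (shuffle) + 0.0027 (PD) ~= $0.147
print(f"EstimatedCost: {cost:.4f} USD")

For these sample inputs the estimate works out to roughly $0.15, dominated by the vCPU term, which matches the expectation that small batch jobs are priced mostly by worker CPU time.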