diff --git a/sdks/python/apache_beam/testing/load_tests/load_test.py b/sdks/python/apache_beam/testing/load_tests/load_test.py index 20dea3932b49..c9c20f82800e 100644 --- a/sdks/python/apache_beam/testing/load_tests/load_test.py +++ b/sdks/python/apache_beam/testing/load_tests/load_test.py @@ -25,6 +25,7 @@ from apache_beam.metrics import MetricsFilter from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import StandardOptions from apache_beam.runners.runner import PipelineState from apache_beam.testing.load_tests.load_test_metrics_utils import InfluxDBMetricsPublisherOptions from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsReader @@ -133,7 +134,8 @@ def __init__(self, metrics_namespace=None): os.getenv('INFLUXDB_USER'), os.getenv('INFLUXDB_USER_PASSWORD')), # Apply filter to prevent system metrics from being published - filters=MetricsFilter().with_namespace(self.metrics_namespace)) + filters=MetricsFilter().with_namespace(self.metrics_namespace), + is_streaming_pipeline=options.view_as(StandardOptions).streaming) def test(self): """An abstract method where the pipeline definition should be put.""" diff --git a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py index 7e38be79e995..27f3bf874948 100644 --- a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py +++ b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py @@ -45,6 +45,7 @@ import apache_beam as beam from apache_beam.metrics import Metrics +from apache_beam.metrics.execution import MetricResult from apache_beam.metrics.metric import MetricResults from apache_beam.metrics.metric import MetricsFilter from apache_beam.runners.dataflow.dataflow_runner import DataflowPipelineResult @@ -91,6 +92,15 @@ }] _LOGGER = logging.getLogger(__name__) +VCPU_PER_HR_STREAMING = 0.069 +VCPU_PER_HR_BATCH = 0.056 +VCPU_PER_HR_STREAMING = 0.069 +MEM_PER_GB_HR_BATCH = 0.003557 +MEM_PER_GB_HR_STREAMING = 0.0035557 +PD_PER_GB_HR = 0.000054 +PD_SSD_PER_GB_HR = 0.000298 +SHUFFLE_PER_GB_BATCH = 0.011 +SHUFFLE_PER_GB_STREAMING = 0.018 def parse_step(step_name): @@ -203,7 +213,9 @@ def __init__( publish_to_bq=False, influxdb_options=None, # type: Optional[InfluxDBMetricsPublisherOptions] namespace=None, - filters=None): + filters=None, + is_streaming_pipeline=False, + ): """Initializes :class:`MetricsReader` . Args: @@ -217,6 +229,7 @@ def __init__( self.publishers: List[MetricsPublisher] = [] # publish to console output self.publishers.append(ConsoleMetricsPublisher()) + self.is_streaming_pipeline = is_streaming_pipeline bq_check = project_name and bq_table and bq_dataset and publish_to_bq if bq_check: @@ -271,6 +284,16 @@ def publish_metrics( job_id = None if isinstance(result, DataflowPipelineResult): job_id = result.job_id() + total_cost = compute_dataflow_cost( + result=result, is_streaming_pipeline=self.is_streaming_pipeline) + # we might need time.sleep here. + insert_dicts.append([ + Metric( + submit_timestamp=time.time(), + metric_id=metric_id, + value=total_cost, + label='cost') + ]) self._add_job_id_to_metrics(insert_dicts, job_id) if len(insert_dicts) > 0: @@ -309,7 +332,6 @@ def publish_values(self, labeled_values): publisher.publish(metric_dicts) def _prepare_all_metrics(self, metrics, metric_id): - insert_rows = self._get_counters(metrics['counters'], metric_id) insert_rows += self._get_distributions(metrics['distributions'], metric_id) return insert_rows @@ -680,3 +702,57 @@ def __init__(self): def process(self, element): yield self.timestamp_val_fn( element, self.timestamp_fn(micros=int(self.time_fn() * 1000000))) + + +def compute_dataflow_cost( + result: DataflowPipelineResult, + is_streaming_pipeline: bool = False, +) -> float: + """ + Calculates the approximate cost of a Dataflow pipeline. + + Args: + result: The DataflowPipelineResult object containing job metrics. + is_streaming_pipeline: Whether the pipeline is streaming or batch. + + Returns: + The total cost of the pipeline in USD. + """ + + metrics: List[MetricResult] = result.metrics().all_metrics() + target_metrics = ( + "TotalVcpuTime", + "TotalMemoryUsage", + "TotalStreamingDataProcessed", + "TotalShuffleDataProcessed", + "TotalPdUsage", + "TotalSsdUsage") + dataflow_service_metrics = {} + for metric_result in metrics: + metric_name = metric_result.key.metric.name + if metric_name in target_metrics: + dataflow_service_metrics[metric_name] = metric_result.result + + cost = 0 + cost += dataflow_service_metrics.get( + "TotalVcpuTime") / 3600 * VCPU_PER_HR_BATCH if ( + not is_streaming_pipeline) else VCPU_PER_HR_STREAMING + cost += (dataflow_service_metrics.get("TotalMemoryUsage") / + 1000) / 3600 * MEM_PER_GB_HR_BATCH if ( + not is_streaming_pipeline) else MEM_PER_GB_HR_STREAMING + + # Additional costs for streaming or shuffle data + if is_streaming_pipeline: + cost += dataflow_service_metrics.get( + "TotalStreamingDataProcessed") * SHUFFLE_PER_GB_STREAMING + + cost += dataflow_service_metrics.get( + "TotalShuffleDataProcessed" + ) * SHUFFLE_PER_GB_BATCH if not is_streaming_pipeline else 0 + + # Add persistent disk costs + cost += dataflow_service_metrics.get("TotalPdUsage") / 3600 * PD_PER_GB_HR + cost += dataflow_service_metrics.get( + "TotalSsdUsage") / 3600 * PD_SSD_PER_GB_HR + + return cost