Skip to content

Commit

Permalink
Add computing approximate cost for dataflow jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
AnandInguva committed Dec 19, 2023
1 parent 5ec9e7e commit a985c24
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 3 deletions.
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/testing/load_tests/load_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from apache_beam.metrics import MetricsFilter
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.runners.runner import PipelineState
from apache_beam.testing.load_tests.load_test_metrics_utils import InfluxDBMetricsPublisherOptions
from apache_beam.testing.load_tests.load_test_metrics_utils import MetricsReader
Expand Down Expand Up @@ -133,7 +134,8 @@ def __init__(self, metrics_namespace=None):
os.getenv('INFLUXDB_USER'),
os.getenv('INFLUXDB_USER_PASSWORD')),
# Apply filter to prevent system metrics from being published
filters=MetricsFilter().with_namespace(self.metrics_namespace))
filters=MetricsFilter().with_namespace(self.metrics_namespace),
is_streaming_pipeline=options.view_as(StandardOptions).streaming)

def test(self):
"""An abstract method where the pipeline definition should be put."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

import apache_beam as beam
from apache_beam.metrics import Metrics
from apache_beam.metrics.execution import MetricResult
from apache_beam.metrics.metric import MetricResults
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.runners.dataflow.dataflow_runner import DataflowPipelineResult
Expand Down Expand Up @@ -91,6 +92,15 @@
}]

_LOGGER = logging.getLogger(__name__)

# Approximate per-unit Dataflow prices (USD) used by compute_dataflow_cost.
# Taken from https://cloud.google.com/dataflow/pricing — these are point-in-time
# list prices and should be re-checked periodically.
# NOTE: the original block defined VCPU_PER_HR_STREAMING twice with the same
# value; the redundant duplicate has been removed.
VCPU_PER_HR_BATCH = 0.056  # per vCPU-hour, batch jobs
VCPU_PER_HR_STREAMING = 0.069  # per vCPU-hour, streaming jobs
MEM_PER_GB_HR_BATCH = 0.003557  # per GB-hour of memory, batch
MEM_PER_GB_HR_STREAMING = 0.0035557  # per GB-hour of memory, streaming
PD_PER_GB_HR = 0.000054  # per GB-hour of standard persistent disk
PD_SSD_PER_GB_HR = 0.000298  # per GB-hour of SSD persistent disk
SHUFFLE_PER_GB_BATCH = 0.011  # per GB of batch shuffle data processed
SHUFFLE_PER_GB_STREAMING = 0.018  # per GB of streaming data processed


def parse_step(step_name):
Expand Down Expand Up @@ -203,7 +213,9 @@ def __init__(
publish_to_bq=False,
influxdb_options=None, # type: Optional[InfluxDBMetricsPublisherOptions]
namespace=None,
filters=None):
filters=None,
is_streaming_pipeline=False,
):
"""Initializes :class:`MetricsReader` .
Args:
Expand All @@ -217,6 +229,7 @@ def __init__(
self.publishers: List[MetricsPublisher] = []
# publish to console output
self.publishers.append(ConsoleMetricsPublisher())
self.is_streaming_pipeline = is_streaming_pipeline

bq_check = project_name and bq_table and bq_dataset and publish_to_bq
if bq_check:
Expand Down Expand Up @@ -271,6 +284,16 @@ def publish_metrics(
job_id = None
if isinstance(result, DataflowPipelineResult):
job_id = result.job_id()
total_cost = compute_dataflow_cost(
result=result, is_streaming_pipeline=self.is_streaming_pipeline)
# we might need time.sleep here.
insert_dicts.append([
Metric(
submit_timestamp=time.time(),
metric_id=metric_id,
value=total_cost,
label='cost')
])
self._add_job_id_to_metrics(insert_dicts, job_id)

if len(insert_dicts) > 0:
Expand Down Expand Up @@ -309,7 +332,6 @@ def publish_values(self, labeled_values):
publisher.publish(metric_dicts)

def _prepare_all_metrics(self, metrics, metric_id):
  """Flatten counter and distribution metrics into publishable rows.

  Args:
    metrics: dict holding 'counters' and 'distributions' metric lists.
    metric_id: id attached to every row so one test run can be grouped.

  Returns:
    A single list combining the prepared counter and distribution rows.
  """
  counter_rows = self._get_counters(metrics['counters'], metric_id)
  distribution_rows = self._get_distributions(
      metrics['distributions'], metric_id)
  return counter_rows + distribution_rows
Expand Down Expand Up @@ -680,3 +702,57 @@ def __init__(self):
def process(self, element):
  """Emit the element stamped with the current wall-clock time.

  Converts the configured clock's seconds to microseconds, builds a
  timestamp via timestamp_fn, and pairs it with the element through
  timestamp_val_fn (hooks presumably injected in __init__ — confirm).
  """
  now_micros = int(self.time_fn() * 1000000)
  stamped = self.timestamp_val_fn(
      element, self.timestamp_fn(micros=now_micros))
  yield stamped


def compute_dataflow_cost(
    result: DataflowPipelineResult,
    is_streaming_pipeline: bool = False,
) -> float:
  """
  Calculates the approximate cost of a Dataflow pipeline in USD.

  Combines the Dataflow service metrics reported by the finished job
  (vCPU time, memory, shuffle/streaming data, persistent-disk usage)
  with the module-level per-unit prices. Metrics the service did not
  report are treated as 0 instead of raising a TypeError.

  Args:
    result: The DataflowPipelineResult object containing job metrics.
    is_streaming_pipeline: Whether the pipeline is streaming or batch.
  Returns:
    The approximate total cost of the pipeline in USD.
  """

  metrics: List[MetricResult] = result.metrics().all_metrics()
  target_metrics = (
      "TotalVcpuTime",
      "TotalMemoryUsage",
      "TotalStreamingDataProcessed",
      "TotalShuffleDataProcessed",
      "TotalPdUsage",
      "TotalSsdUsage")
  dataflow_service_metrics = {}
  for metric_result in metrics:
    metric_name = metric_result.key.metric.name
    if metric_name in target_metrics:
      dataflow_service_metrics[metric_name] = metric_result.result

  def _metric(name):
    # A job that e.g. never shuffles will not report that metric; count it
    # as 0 rather than letting None propagate into the arithmetic below.
    return dataflow_service_metrics.get(name) or 0

  # Select the streaming or batch rate once. The previous inline
  # conditionals had a precedence bug: for streaming pipelines they added
  # the bare rate constant instead of usage / 3600 * rate.
  vcpu_rate = (
      VCPU_PER_HR_STREAMING if is_streaming_pipeline else VCPU_PER_HR_BATCH)
  mem_rate = (
      MEM_PER_GB_HR_STREAMING
      if is_streaming_pipeline else MEM_PER_GB_HR_BATCH)

  cost = 0.0
  # TotalVcpuTime is in vCPU-seconds; rates are per vCPU-hour.
  cost += _metric("TotalVcpuTime") / 3600 * vcpu_rate
  # TotalMemoryUsage divided by 1000 (to GB) then 3600 (to GB-hours) —
  # assumes the metric is reported in MB-seconds; TODO confirm the unit.
  cost += _metric("TotalMemoryUsage") / 1000 / 3600 * mem_rate

  # Data processed is billed under streaming or batch shuffle pricing.
  if is_streaming_pipeline:
    cost += _metric("TotalStreamingDataProcessed") * SHUFFLE_PER_GB_STREAMING
  else:
    cost += _metric("TotalShuffleDataProcessed") * SHUFFLE_PER_GB_BATCH

  # Persistent-disk usage metrics are in GB-seconds; prices per GB-hour.
  cost += _metric("TotalPdUsage") / 3600 * PD_PER_GB_HR
  cost += _metric("TotalSsdUsage") / 3600 * PD_SSD_PER_GB_HR

  return cost

0 comments on commit a985c24

Please sign in to comment.