Skip to content

Commit

Permalink
Compute cluster duration as create time + sum of query run times in d…
Browse files Browse the repository at this point in the history
…pb_sparksql_benchmark.

PiperOrigin-RevId: 565744564
  • Loading branch information
dorellang authored and copybara-github committed Sep 15, 2023
1 parent 7e74db9 commit da82c21
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 21 deletions.
21 changes: 20 additions & 1 deletion perfkitbenchmarker/dpb_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ def __init__(self, dpb_service_spec):
'Dynamic allocation off is not supported for the current DPB '
f'Service: {type(self).__name__}.'
)
self.cluster_duration = None
self._InitializeMetadata()

def GetDpbVersion(self) -> Optional[str]:
Expand Down Expand Up @@ -549,7 +550,7 @@ def GetClusterDuration(self) -> Optional[float]:
A float representing the number of seconds the cluster has been running or
None if it cannot be obtained.
"""
return None
return self.cluster_duration

def GetClusterCost(self) -> Optional[float]:
"""Gets the cost of running the cluster if applicable.
Expand Down Expand Up @@ -602,15 +603,33 @@ def GetSamples(self) -> list[sample.Sample]:
"""Gets samples with service statistics."""
samples = []
metrics: dict[str, tuple[Optional[float], str]] = {
# Cluster creation time as reported by the DPB service
# (non-Serverless DPB services only).
'dpb_cluster_create_time': (self.GetClusterCreateTime(), 'seconds'),

# Cluster duration as computed by the underlying benchmark.
# (non-Serverless DPB services only).
'dpb_cluster_duration': (self.GetClusterDuration(), 'seconds'),

# Cluster hardware cost computed from cluster duration and
# hourly costs passed in flags (non-Serverless DPB services only).
'dpb_cluster_hardware_cost': (self.GetClusterHardwareCost(), '$'),

# Cluster DPB service premium cost computed from cluster duration and
# hourly costs passed in flags (non-Serverless DPB services only).
'dpb_cluster_premium_cost': (self.GetClusterPremiumCost(), '$'),

# Total cluster cost (hardware plus DPB service premium) computed from
# cluster duration and hourly costs passed in flags
# (non-Serverless DPB services only).
'dpb_cluster_total_cost': (self.GetClusterCost(), '$'),

# Cluster hardware cost per hour as specified in PKB flags.
'dpb_cluster_hardware_hourly_cost': (
_HARDWARE_HOURLY_COST.value,
'$/hour',
),

# DPB Service premium cost per hour as specified in PKB flags.
'dpb_cluster_premium_hourly_cost': (
_SERVICE_PREMIUM_HOURLY_COST.value,
'$/hour',
Expand Down
26 changes: 26 additions & 0 deletions perfkitbenchmarker/linux_benchmarks/dpb_sparksql_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,15 +414,25 @@ def _GetGlobalSamples(
sorted(set(FLAGS.dpb_sparksql_order) - all_passing_queries)
)

cumulative_run_time = sum(run_times.values())
cluster.cluster_duration = (
cumulative_run_time + cluster.GetClusterCreateTime()
)
cluster.metadata.update(metadata)

# TODO(user): Compute aggregated time for each query across streams and
# iterations.

# Wall time of the DPB service job submitted as reported by the DPB service.
# Should include sparksql_cumulative_run_time. Doesn't include
# dpb_sparksql_job_pending.
samples.append(
sample.Sample(
'sparksql_total_wall_time', job_result.wall_time, 'seconds', metadata
)
)

# Geomean of all the passing queries' run time.
samples.append(
sample.Sample(
'sparksql_geomean_run_time',
Expand All @@ -431,6 +441,19 @@ def _GetGlobalSamples(
metadata,
)
)

# Sum of all the passing queries' run time.
samples.append(
sample.Sample(
'sparksql_cumulative_run_time',
cumulative_run_time,
'seconds',
metadata,
)
)

# Time the DPB service job (AKA Spark application) was queued before running,
# as reported by the DPB service.
samples.append(
sample.Sample(
'dpb_sparksql_job_pending',
Expand All @@ -439,9 +462,12 @@ def _GetGlobalSamples(
metadata,
)
)

if FLAGS.dpb_export_job_stats:
run_cost = cluster.CalculateLastJobCost()
if run_cost is not None:
# Run cost of the job last DPB service job (valid for Serverless DPB
# services implementations only).
samples.append(
sample.Sample('sparksql_run_cost', run_cost, '$', metadata)
)
Expand Down
8 changes: 0 additions & 8 deletions perfkitbenchmarker/providers/aws/aws_dpb_emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,6 @@ def GetClusterCreateTime(self) -> Optional[float]:
"""
return self._cluster_ready_time - self._cluster_create_time

def GetClusterDuration(self) -> Optional[float]:
    """Returns the cluster lifetime in seconds, or None if unknown.

    Computed as the delete timestamp minus the create timestamp; only
    available once both timestamps have been recorded.
    """
    start = self._cluster_create_time
    end = self._cluster_delete_time
    if start is None or end is None:
        return None
    return end - start

@staticmethod
def CheckPrerequisites(benchmark_config):
del benchmark_config # Unused
Expand Down
8 changes: 0 additions & 8 deletions perfkitbenchmarker/providers/gcp/gcp_dpb_dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,6 @@ def GetClusterCreateTime(self) -> Optional[float]:
"""
return self._cluster_ready_time - self._cluster_create_time

def GetClusterDuration(self) -> Optional[float]:
    """Returns seconds elapsed between cluster creation and deletion.

    Returns None until both the create and delete timestamps exist.
    """
    created = self._cluster_create_time
    deleted = self._cluster_delete_time
    if created is not None and deleted is not None:
        return deleted - created
    return None

def _Create(self):
"""Creates the cluster."""
cmd = self.DataprocGcloudCommand('clusters', 'create', self.cluster_id)
Expand Down
5 changes: 1 addition & 4 deletions tests/dpb_service_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,12 @@ def __init__(
):
super().__init__(dpb_service_spec)
self._cluster_create_time = cluster_create_time
self._cluster_duration = cluster_duration
self.cluster_duration = cluster_duration
self.metadata = {'foo': 42}

def GetClusterCreateTime(self) -> Optional[float]:
    """Returns the stubbed cluster create time supplied at construction."""
    create_time = self._cluster_create_time
    return create_time

def GetClusterDuration(self) -> Optional[float]:
return self._cluster_duration

def SubmitJob(self, *args, **kwargs) -> dpb_service.JobResult:
    """Fake job submission; always reports a one-second run time."""
    del args, kwargs  # Arguments are irrelevant to this test double.
    return dpb_service.JobResult(run_time=1)

Expand Down

0 comments on commit da82c21

Please sign in to comment.