From 69112cc1a6d6a5dee52f282f530d6218cdb76ce1 Mon Sep 17 00:00:00 2001 From: Roy Arsan Date: Thu, 11 Aug 2022 14:46:57 -0500 Subject: [PATCH 01/10] Add stats and cost calculator to GCP Dataflow provider. Include sample config file for wordcount benchmark --- .../configs/dataflow_wordcount.yaml | 85 ++++ .../dpb_wordcount_benchmark.py | 26 +- .../providers/gcp/gcp_dpb_dataflow.py | 363 +++++++++++++++++- 3 files changed, 454 insertions(+), 20 deletions(-) create mode 100644 perfkitbenchmarker/configs/dataflow_wordcount.yaml diff --git a/perfkitbenchmarker/configs/dataflow_wordcount.yaml b/perfkitbenchmarker/configs/dataflow_wordcount.yaml new file mode 100644 index 0000000000..4ee2a9b3cb --- /dev/null +++ b/perfkitbenchmarker/configs/dataflow_wordcount.yaml @@ -0,0 +1,85 @@ +################################# +# Worker machine configs +################################# +eight_core: &eight_core + vm_spec: + GCP: + machine_type: n1-standard-8 + disk_spec: + GCP: + disk_size: 300 + +four_core: &four_core + vm_spec: + GCP: + machine_type: n1-standard-4 + disk_spec: + GCP: + disk_size: 300 + +two_core: &two_core + vm_spec: + GCP: + machine_type: n1-standard-2 + disk_spec: + GCP: + disk_size: 300 + +################################################################## +# Benchmark flags specifying Dataflow template and parameters +################################################################## +flags: &myflags + dpb_service_zone: us-central1-a + dpb_wordcount_out_base: + dpb_dataflow_temp_location: gs:///temp + dpb_dataflow_staging_location: gs:///temp + dpb_job_jarfile: ./word-count-beam/target/word-count-beam-bundled-0.1.jar + dpb_job_classname: org.apache.beam.examples.WordCount + +################################# +# Benchmark variations to run +################################# +benchmarks: +- dpb_wordcount_benchmark: { + dpb_service: { service_type: dataflow, worker_count: 1, worker_group: *eight_core }, + flags: *myflags + } +- dpb_wordcount_benchmark: { + dpb_service: { service_type: dataflow, worker_count: 1, worker_group: *four_core }, + flags: *myflags + } +- dpb_wordcount_benchmark: { + dpb_service: { service_type: dataflow, worker_count: 1, worker_group: *two_core }, + flags: *myflags + } + + +# dpb_wordcount_benchmark: +# description: Run word count on dataflow +# dpb_service: +# service_type: dataflow +# worker_count: 1 +# worker_group: +# vm_spec: +# GCP: +# machine_type: n1-standard-4 +# disk_spec: +# GCP: +# disk_size: 300 + +# dpb_wordcount_benchmark: +# description: Run word count on dataflow +# dpb_service: +# service_type: dataflow +# worker_count: 1 +# worker_group: +# vm_spec: +# GCP: +# machine_type: n1-standard-4 +# disk_spec: +# GCP: +# disk_size: 300 +# flag_matrix: cross_runners +# flag_matrix_defs: +# cross_runners: +# dpb_dataflow_runner: [DataflowRunner, DirectRunner] diff --git a/perfkitbenchmarker/linux_benchmarks/dpb_wordcount_benchmark.py b/perfkitbenchmarker/linux_benchmarks/dpb_wordcount_benchmark.py index 63e9904de8..8d033a9a80 100644 --- a/perfkitbenchmarker/linux_benchmarks/dpb_wordcount_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/dpb_wordcount_benchmark.py @@ -146,7 +146,7 @@ def Run(benchmark_spec): base_out = FLAGS.dpb_dataflow_staging_location else: base_out = 'gs://{}'.format(FLAGS.dpb_wordcount_out_base) - job_arguments.append('--output={}/output/'.format(base_out)) + job_arguments.append('--output={}/output'.format(base_out)) else: # Use user-provided jar file if present; otherwise use the default example if not FLAGS.dpb_job_jarfile: @@ 
-159,10 +159,8 @@ def Run(benchmark_spec): # TODO (saksena): Finalize more stats to gather results = [] - metadata = copy.copy(dpb_service_instance.GetMetadata()) - metadata.update({'input_location': input_location}) - start = datetime.datetime.now() + start_time = datetime.datetime.now() dpb_service_instance.SubmitJob( jarfile=jarfile, classname=classname, @@ -170,8 +168,26 @@ def Run(benchmark_spec): job_stdout_file=stdout_file, job_type=job_type) end_time = datetime.datetime.now() - run_time = (end_time - start).total_seconds() + + # Update metadata after job run to get job id + metadata = copy.copy(dpb_service_instance.GetMetadata()) + metadata.update({'input_location': input_location}) + + run_time = (end_time - start_time).total_seconds() results.append(sample.Sample('run_time', run_time, 'seconds', metadata)) + + # TODO(odiego): Refactor to avoid explicit service type checks. + if dpb_service_instance.SERVICE_TYPE == dpb_service.DATAFLOW: + avg_cpu_util = dpb_service_instance.GetAvgCpuUtilization(start_time, end_time) + results.append(sample.Sample('avg_cpu_util', avg_cpu_util, '%', metadata)) + + stats = dpb_service_instance.GetStats() + for name, value in stats.items(): + results.append(sample.Sample(name, value, 'number', metadata)) + + total_cost = dpb_service_instance.CalculateCost() + results.append(sample.Sample('total_cost', total_cost, '$', metadata)) + return results diff --git a/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py b/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py index 94eb9feea6..39dd2a39ed 100644 --- a/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py +++ b/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py @@ -11,15 +11,24 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""Module containing class for GCP's dataflow service. +"""Module containing class for GCP's Dataflow service. +Use this module for running Dataflow jobs from compiled jar files. No Clusters can be created or destroyed, since it is a managed solution See details at: https://cloud.google.com/dataflow/ """ import os +import re +import time +import json +import logging +import datetime from absl import flags +from google.cloud import monitoring_v3 +from google.cloud.monitoring_v3.types import TimeInterval +from google.cloud.monitoring_v3.types import Aggregation from perfkitbenchmarker import beam_benchmark_helper from perfkitbenchmarker import dpb_service from perfkitbenchmarker import errors @@ -43,10 +52,31 @@ FLAGS = flags.FLAGS -GCP_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ' - DATAFLOW_WC_INPUT = 'gs://dataflow-samples/shakespeare/kinglear.txt' +# Compute Engine CPU Monitoring API has up to 4 minute delay. +# See https://cloud.google.com/monitoring/api/metrics_gcp#gcp-compute +CPU_API_DELAY_MINUTES = 4 +CPU_API_DELAY_SECONDS = CPU_API_DELAY_MINUTES * 60 +# Dataflow Monitoring API has up to 3 minute delay. +# See https://cloud.google.com/monitoring/api/metrics_gcp#gcp-dataflow +DATAFLOW_METRICS_DELAY_MINUTES = 3 +DATAFLOW_METRICS_DELAY_SECONDS = DATAFLOW_METRICS_DELAY_MINUTES * 60 + +DATAFLOW_TYPE_BATCH = 'batch' +DATAFLOW_TYPE_STREAMING = 'streaming' + +METRIC_TYPE_COUNTER = 'counter' +METRIC_TYPE_DISTRIBUTION = 'distribution' + +# Dataflow resources cost factors (showing us-central-1 pricing). 
+# See https://cloud.google.com/dataflow/pricing#pricing-details +VCPU_PER_HR_BATCH = 0.056 +VCPU_PER_HR_STREAMING = 0.069 +MEM_PER_GB_HR_BATCH = 0.003557 +MEM_PER_GB_HR_STREAMING = 0.0035557 +PD_PER_GB_HR = 0.000054 +PD_SSD_PER_GB_HR = 0.000298 class GcpDpbDataflow(dpb_service.BaseDpbService): """Object representing GCP Dataflow Service.""" @@ -56,16 +86,11 @@ class GcpDpbDataflow(dpb_service.BaseDpbService): def __init__(self, dpb_service_spec): super(GcpDpbDataflow, self).__init__(dpb_service_spec) - self.project = None - - @staticmethod - def _GetStats(stdout): - """Get Stats. - - TODO(saksena): Hook up the metrics API of dataflow to retrieve performance - metrics when available - """ - pass + self.dpb_service_type = self.SERVICE_TYPE + self.project = FLAGS.project + self.job_id = None + self.job_metrics = None + self.job_stats = None @staticmethod def CheckPrerequisites(benchmark_config): @@ -116,7 +141,6 @@ def SubmitJob( disk_size_gb = None cmd = [] - # Needed to verify java executable is on the path dataflow_executable = 'java' if not vm_util.ExecutableOnPath(dataflow_executable): @@ -142,7 +166,17 @@ def SubmitJob( if disk_size_gb: cmd.append('--diskSizeGb={}'.format(disk_size_gb)) cmd.append('--defaultWorkerLogLevel={}'.format(FLAGS.dpb_log_level)) - _, _, _ = vm_util.IssueCommand(cmd) + cmd.append('--project={}'.format(self.project)) + _, stderr, _ = vm_util.IssueCommand(cmd) + + # Parse output to retrieve submitted job ID + match = re.search('Submitted job: (.\S*)', stderr) + if not match: + logging.warn('Dataflow output in unexpected format. Failed to parse Dataflow job ID.') + return + + self.job_id = match.group(1) + logging.info('Dataflow job ID: %s', self.job_id) def SetClusterProperty(self): pass @@ -152,4 +186,303 @@ def GetMetadata(self): basic_data = super(GcpDpbDataflow, self).GetMetadata() basic_data['dpb_dataflow_runner'] = FLAGS.dpb_dataflow_runner basic_data['dpb_dataflow_sdk'] = FLAGS.dpb_dataflow_sdk + basic_data['dpb_job_id'] = self.job_id return basic_data + + def GetJobStatus(self): + cmd = util.GcloudCommand(self, 'dataflow', 'jobs', 'show', self.job_id) + cmd.flags = { + 'project': self.project, + 'format': 'json', + } + + def GetStats(self): + """Collect series of relevant performance and cost stats.""" + stats = {} + stats['total_vcpu_time'] = self.GetMetricValue('TotalVcpuTime')/3600 # vCPU-hr + stats['total_mem_usage'] = self.GetMetricValue('TotalMemoryUsage')/1024/3600 # GB-hr + stats['total_pd_usage'] = self.GetMetricValue('TotalPdUsage')/3600 # GB-hr + # TODO(rarsan): retrieve BillableShuffleDataProcessed + # and/or BillableStreamingDataProcessed when applicable + self.job_stats = stats + return stats + + def CalculateCost(self, type=DATAFLOW_TYPE_BATCH): + if type not in (DATAFLOW_TYPE_BATCH, DATAFLOW_TYPE_STREAMING): + raise ValueError(f'Invalid type provided to CalculateCost(): {type}') + + if not self.job_stats: + self.GetStats() + + total_vcpu_time = self.job_stats['total_vcpu_time'] + total_mem_usage = self.job_stats['total_mem_usage'] + total_pd_usage = self.job_stats['total_pd_usage'] + + cost = 0 + if type == DATAFLOW_TYPE_BATCH: + cost += total_vcpu_time * VCPU_PER_HR_BATCH + cost += total_mem_usage * MEM_PER_GB_HR_BATCH + else: + cost += total_vcpu_time * VCPU_PER_HR_STREAMING + cost += total_mem_usage * MEM_PER_GB_HR_STREAMING + + cost += total_pd_usage * PD_PER_GB_HR + # TODO(rarsan): Add cost related to per-GB data processed by Dataflow Shuffle + # (for batch) or Streaming Engine (for streaming) when applicable + return cost + + def 
_PullJobMetrics(self, force_refresh=False): + """Retrieve and cache all job metrics from Dataflow API""" + # Skip if job metrics is already populated unless force_refresh is True + if self.job_metrics is not None and not force_refresh: + return + + cmd = util.GcloudCommand(self, 'dataflow', 'metrics', + 'list', self.job_id) + cmd.use_alpha_gcloud = True + cmd.flags = { + 'project': self.project, + 'region': util.GetRegionFromZone(FLAGS.dpb_service_zone), + 'format': 'json', + } + stdout, _, _ = cmd.Issue() + results = json.loads(stdout) + + counters = {} + distributions = {} + for metric in results: + if 'scalar' in metric: + counters[metric['name']['name']] = int(metric['scalar']) + elif 'distribution' in metric: + distributions[metric['name']['name']] = metric['distribution'] + else: + logging.warn(f'Unfamiliar metric type found: {metric}') + + self.job_metrics = { + METRIC_TYPE_COUNTER: counters, + METRIC_TYPE_DISTRIBUTION: distributions + } + + def GetMetricValue(self, name, type=METRIC_TYPE_COUNTER): + """Get value of a job's metric. + + Returns: + Integer if metric is of type counter + Dictionary if metric is of type distribution. Dictionary + contains keys such as count/max/mean/min/sum + """ + if type not in (METRIC_TYPE_COUNTER, METRIC_TYPE_DISTRIBUTION): + raise ValueError(f'Invalid type provided to GetMetricValue(): {type}') + + if self.job_metrics is None: + self._PullJobMetrics() + + return self.job_metrics[type][name] + + def GetAvgCpuUtilization(self, start_time: datetime, end_time: datetime): + """Get average cpu utilization across all pipeline workers. + + Args: + start_time: datetime specifying the beginning of the time interval. + end_time: datetime specifying the end of the time interval. + + Returns: + Average value across time interval + """ + client = monitoring_v3.MetricServiceClient() + project_name = f"projects/{self.project}" + + now_seconds = int(time.time()) + start_time_seconds = int(start_time.timestamp()) + end_time_seconds = int(end_time.timestamp()) + # Cpu metrics data can take up to 240 seconds to appear + if (now_seconds - end_time_seconds) < CPU_API_DELAY_SECONDS: + logging.info('Waiting for CPU metrics to be available (up to 4 minutes)...') + time.sleep(CPU_API_DELAY_SECONDS - (now_seconds - end_time_seconds)) + + interval = TimeInterval( + { + "start_time": {"seconds": start_time_seconds}, + "end_time": {"seconds": end_time_seconds}, + } + ) + + api_filter = ( + 'metric.type = "compute.googleapis.com/instance/cpu/utilization" ' + f'AND resource.labels.project_id = "{self.project}" ' + f'AND metadata.user_labels.dataflow_job_id = "{self.job_id}" ' + ) + + aggregation = Aggregation( + { + "alignment_period": {"seconds": 60}, # 1 minute + "per_series_aligner": Aggregation.Aligner.ALIGN_MEAN, + "cross_series_reducer": Aggregation.Reducer.REDUCE_MEAN, + "group_by_fields": ["resource.instance_id"], + } + ) + + results = client.list_time_series( + request={ + "name": project_name, + "filter": api_filter, + "interval": interval, + "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, + "aggregation": aggregation, + } + ) + + # if not results: + # raise Exception('No monitoring data found. Unable to calculate avg CPU utilization') + return self._GetAvgValueFromTimeSeries(results) + + def GetMaxOutputThroughput(self, ptransform: str, start_time: datetime, end_time: datetime): + """Get max throughput from a particular pTransform during job run interval. + + Args: + start_time: datetime specifying the beginning of the time interval. 
+ end_time: datetime specifying the end of the time interval. + + Returns: + Max value across time interval + """ + client = monitoring_v3.MetricServiceClient() + project_name = f"projects/{self.project}" + + now_seconds = int(time.time()) + start_time_seconds = int(start_time.timestamp()) + end_time_seconds = int(end_time.timestamp()) + # Cpu metrics data can take up to 240 seconds to appear + if (now_seconds - end_time_seconds) < DATAFLOW_METRICS_DELAY_SECONDS: + logging.info('Waiting for Dataflow metrics to be available (up to 3 minutes)...') + time.sleep(DATAFLOW_METRICS_DELAY_SECONDS - (now_seconds - end_time_seconds)) + + interval = TimeInterval( + { + "start_time": {"seconds": start_time_seconds}, + "end_time": {"seconds": end_time_seconds}, + } + ) + + api_filter = ( + 'metric.type = "dataflow.googleapis.com/job/elements_produced_count" ' + f'AND resource.labels.project_id = "{self.project}" ' + f'AND metric.labels.job_id = "{self.job_id}" ' + f'AND metric.labels.ptransform = "{ptransform}" ' + ) + + aggregation = Aggregation( + { + "alignment_period": {"seconds": 60}, # 1 minute + "per_series_aligner": Aggregation.Aligner.ALIGN_RATE, + # "group_by_fields": ["metric.job_id", "metric.ptransform"], + } + ) + + results = client.list_time_series( + request={ + "name": project_name, + "filter": api_filter, + "interval": interval, + "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, + "aggregation": aggregation + } + ) + + if not results: + logging.warn('No monitoring data found. Unable to calculate max throughput.') + return None + + return self._GetMaxValueFromTimeSeries(results) + + def GetSubscriptionBacklogSize(self, subscription_name, interval_length=4): + client = monitoring_v3.MetricServiceClient() + project_name = f"projects/{self.project}" + + now = time.time() + seconds = int(now) + + interval = TimeInterval( + { + "end_time": {"seconds": seconds}, + "start_time": {"seconds": seconds - interval_length * 60}, + } + ) + + api_filter = ( + 'metric.type = "pubsub.googleapis.com/subscription/num_undelivered_messages" ' + 'AND resource.labels.subscription_id = "' + subscription_name + '" ' + ) + + results = client.list_time_series( + request={ + "name": project_name, + "filter": api_filter, + "interval": interval, + "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, + } + ) + + return self._GetLastValueFromTimeSeries(results) + + def _GetLastValueFromTimeSeries(self, time_series): + value = None + for i, time_interval in enumerate(time_series): + if i != 0: break + for j, snapshot in enumerate(time_interval.points): + if j != 0: break + value = snapshot.value.int64_value + + return value + + def _GetAvgValueFromTimeSeries(self, time_series): + """Parses time series data and returns average across intervals. + + Args: + time_series: time series of cpu fractional utilization returned by monitoring. + + Returns: + Average value across intervals + """ + points = [] + for time_interval in time_series: + for snapshot in time_interval.points: + points.append(snapshot.value.double_value) + + if points: + # Average over all minute intervals captured + averaged = sum(points) / len(points) + # If metric unit is a fractional number between 0 and 1 (e.g. CPU utilization metric) + # multiply by 100 to display a percentage usage. + if time_series.unit == "10^2.%": + averaged = round(averaged * 100, 2) + return averaged + + return None + + def _GetMaxValueFromTimeSeries(self, time_series): + """Parses time series data and returns maximum across intervals. 
+ + Args: + time_series: time series of throughput rates returned by monitoring. + + Returns: + Maximum value across intervals + """ + points = [] + for time_interval in time_series: + for snapshot in time_interval.points: + points.append(snapshot.value.double_value) + + if points: + # Max over all minute intervals captured + max_rate = max(points) + # If metric unit is a fractional number between 0 and 1 (e.g. CPU utilization metric) + # multiply by 100 to display a percentage usage. + if time_series.unit == "10^2.%": + max_rate = round(max_rate * 100, 2) + else: + max_rate = round(max_rate, 2) + return max_rate + + return None \ No newline at end of file From 93f5ac5d53e7a3ee5e0cc3bef847a2c003d91f08 Mon Sep 17 00:00:00 2001 From: Roy Arsan Date: Fri, 12 Aug 2022 11:16:10 -0500 Subject: [PATCH 02/10] Remove commented lines, unused code and general style cleanup --- .../configs/dataflow_wordcount.yaml | 3 + .../dpb_wordcount_benchmark.py | 2 +- .../providers/gcp/gcp_dpb_dataflow.py | 177 +++--------------- 3 files changed, 28 insertions(+), 154 deletions(-) diff --git a/perfkitbenchmarker/configs/dataflow_wordcount.yaml b/perfkitbenchmarker/configs/dataflow_wordcount.yaml index 4ee2a9b3cb..391f43c2bc 100644 --- a/perfkitbenchmarker/configs/dataflow_wordcount.yaml +++ b/perfkitbenchmarker/configs/dataflow_wordcount.yaml @@ -54,6 +54,9 @@ benchmarks: } +################################# +# Alternative benchmark config examples +################################# # dpb_wordcount_benchmark: # description: Run word count on dataflow # dpb_service: diff --git a/perfkitbenchmarker/linux_benchmarks/dpb_wordcount_benchmark.py b/perfkitbenchmarker/linux_benchmarks/dpb_wordcount_benchmark.py index 8d033a9a80..2bf7ace1b2 100644 --- a/perfkitbenchmarker/linux_benchmarks/dpb_wordcount_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/dpb_wordcount_benchmark.py @@ -181,7 +181,7 @@ def Run(benchmark_spec): avg_cpu_util = dpb_service_instance.GetAvgCpuUtilization(start_time, end_time) results.append(sample.Sample('avg_cpu_util', avg_cpu_util, '%', metadata)) - stats = dpb_service_instance.GetStats() + stats = dpb_service_instance.job_stats for name, value in stats.items(): results.append(sample.Sample(name, value, 'number', metadata)) diff --git a/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py b/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py index 39dd2a39ed..b59d2eb69c 100644 --- a/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py +++ b/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py @@ -18,6 +18,7 @@ See details at: https://cloud.google.com/dataflow/ """ +from functools import cached_property import os import re import time @@ -29,6 +30,7 @@ from google.cloud import monitoring_v3 from google.cloud.monitoring_v3.types import TimeInterval from google.cloud.monitoring_v3.types import Aggregation +from google.cloud.monitoring_v3.types import ListTimeSeriesResponse from perfkitbenchmarker import beam_benchmark_helper from perfkitbenchmarker import dpb_service from perfkitbenchmarker import errors @@ -90,7 +92,6 @@ def __init__(self, dpb_service_spec): self.project = FLAGS.project self.job_id = None self.job_metrics = None - self.job_stats = None @staticmethod def CheckPrerequisites(benchmark_config): @@ -189,14 +190,8 @@ def GetMetadata(self): basic_data['dpb_job_id'] = self.job_id return basic_data - def GetJobStatus(self): - cmd = util.GcloudCommand(self, 'dataflow', 'jobs', 'show', self.job_id) - cmd.flags = { - 'project': self.project, - 'format': 'json', - } - - def 
GetStats(self): + @cached_property + def job_stats(self): """Collect series of relevant performance and cost stats.""" stats = {} stats['total_vcpu_time'] = self.GetMetricValue('TotalVcpuTime')/3600 # vCPU-hr @@ -204,16 +199,12 @@ def GetStats(self): stats['total_pd_usage'] = self.GetMetricValue('TotalPdUsage')/3600 # GB-hr # TODO(rarsan): retrieve BillableShuffleDataProcessed # and/or BillableStreamingDataProcessed when applicable - self.job_stats = stats return stats def CalculateCost(self, type=DATAFLOW_TYPE_BATCH): if type not in (DATAFLOW_TYPE_BATCH, DATAFLOW_TYPE_STREAMING): raise ValueError(f'Invalid type provided to CalculateCost(): {type}') - if not self.job_stats: - self.GetStats() - total_vcpu_time = self.job_stats['total_vcpu_time'] total_mem_usage = self.job_stats['total_mem_usage'] total_pd_usage = self.job_stats['total_pd_usage'] @@ -236,6 +227,10 @@ def _PullJobMetrics(self, force_refresh=False): # Skip if job metrics is already populated unless force_refresh is True if self.job_metrics is not None and not force_refresh: return + # Skip if job id not available + if self.job_id is None: + logging.warn('Unable to pull job metrics. Job ID not available') + return cmd = util.GcloudCommand(self, 'dataflow', 'metrics', 'list', self.job_id) @@ -290,7 +285,7 @@ def GetAvgCpuUtilization(self, start_time: datetime, end_time: datetime): Average value across time interval """ client = monitoring_v3.MetricServiceClient() - project_name = f"projects/{self.project}" + project_name = f'projects/{self.project}' now_seconds = int(time.time()) start_time_seconds = int(start_time.timestamp()) @@ -302,8 +297,8 @@ def GetAvgCpuUtilization(self, start_time: datetime, end_time: datetime): interval = TimeInterval( { - "start_time": {"seconds": start_time_seconds}, - "end_time": {"seconds": end_time_seconds}, + 'start_time': {'seconds': start_time_seconds}, + 'end_time': {'seconds': end_time_seconds}, } ) @@ -315,127 +310,30 @@ def GetAvgCpuUtilization(self, start_time: datetime, end_time: datetime): aggregation = Aggregation( { - "alignment_period": {"seconds": 60}, # 1 minute - "per_series_aligner": Aggregation.Aligner.ALIGN_MEAN, - "cross_series_reducer": Aggregation.Reducer.REDUCE_MEAN, - "group_by_fields": ["resource.instance_id"], + 'alignment_period': {'seconds': 60}, # 1 minute + 'per_series_aligner': Aggregation.Aligner.ALIGN_MEAN, + 'cross_series_reducer': Aggregation.Reducer.REDUCE_MEAN, + 'group_by_fields': ['resource.instance_id'], } ) results = client.list_time_series( request={ - "name": project_name, - "filter": api_filter, - "interval": interval, - "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, - "aggregation": aggregation, - } - ) - - # if not results: - # raise Exception('No monitoring data found. Unable to calculate avg CPU utilization') - return self._GetAvgValueFromTimeSeries(results) - - def GetMaxOutputThroughput(self, ptransform: str, start_time: datetime, end_time: datetime): - """Get max throughput from a particular pTransform during job run interval. - - Args: - start_time: datetime specifying the beginning of the time interval. - end_time: datetime specifying the end of the time interval. 
- - Returns: - Max value across time interval - """ - client = monitoring_v3.MetricServiceClient() - project_name = f"projects/{self.project}" - - now_seconds = int(time.time()) - start_time_seconds = int(start_time.timestamp()) - end_time_seconds = int(end_time.timestamp()) - # Cpu metrics data can take up to 240 seconds to appear - if (now_seconds - end_time_seconds) < DATAFLOW_METRICS_DELAY_SECONDS: - logging.info('Waiting for Dataflow metrics to be available (up to 3 minutes)...') - time.sleep(DATAFLOW_METRICS_DELAY_SECONDS - (now_seconds - end_time_seconds)) - - interval = TimeInterval( - { - "start_time": {"seconds": start_time_seconds}, - "end_time": {"seconds": end_time_seconds}, - } - ) - - api_filter = ( - 'metric.type = "dataflow.googleapis.com/job/elements_produced_count" ' - f'AND resource.labels.project_id = "{self.project}" ' - f'AND metric.labels.job_id = "{self.job_id}" ' - f'AND metric.labels.ptransform = "{ptransform}" ' - ) - - aggregation = Aggregation( - { - "alignment_period": {"seconds": 60}, # 1 minute - "per_series_aligner": Aggregation.Aligner.ALIGN_RATE, - # "group_by_fields": ["metric.job_id", "metric.ptransform"], - } - ) - - results = client.list_time_series( - request={ - "name": project_name, - "filter": api_filter, - "interval": interval, - "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, - "aggregation": aggregation + 'name': project_name, + 'filter': api_filter, + 'interval': interval, + 'view': monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, + 'aggregation': aggregation, } ) if not results: - logging.warn('No monitoring data found. Unable to calculate max throughput.') + logging.warn('No monitoring data found. Unable to calculate avg CPU utilization.') return None - return self._GetMaxValueFromTimeSeries(results) - - def GetSubscriptionBacklogSize(self, subscription_name, interval_length=4): - client = monitoring_v3.MetricServiceClient() - project_name = f"projects/{self.project}" - - now = time.time() - seconds = int(now) - - interval = TimeInterval( - { - "end_time": {"seconds": seconds}, - "start_time": {"seconds": seconds - interval_length * 60}, - } - ) - - api_filter = ( - 'metric.type = "pubsub.googleapis.com/subscription/num_undelivered_messages" ' - 'AND resource.labels.subscription_id = "' + subscription_name + '" ' - ) - - results = client.list_time_series( - request={ - "name": project_name, - "filter": api_filter, - "interval": interval, - "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, - } - ) - - return self._GetLastValueFromTimeSeries(results) - - def _GetLastValueFromTimeSeries(self, time_series): - value = None - for i, time_interval in enumerate(time_series): - if i != 0: break - for j, snapshot in enumerate(time_interval.points): - if j != 0: break - value = snapshot.value.int64_value - - return value + return self._GetAvgValueFromTimeSeries(results) - def _GetAvgValueFromTimeSeries(self, time_series): + def _GetAvgValueFromTimeSeries(self, time_series: ListTimeSeriesResponse): """Parses time series data and returns average across intervals. Args: @@ -458,31 +356,4 @@ def _GetAvgValueFromTimeSeries(self, time_series): averaged = round(averaged * 100, 2) return averaged - return None - - def _GetMaxValueFromTimeSeries(self, time_series): - """Parses time series data and returns maximum across intervals. - - Args: - time_series: time series of throughput rates returned by monitoring. 
- - Returns: - Maximum value across intervals - """ - points = [] - for time_interval in time_series: - for snapshot in time_interval.points: - points.append(snapshot.value.double_value) - - if points: - # Max over all minute intervals captured - max_rate = max(points) - # If metric unit is a fractional number between 0 and 1 (e.g. CPU utilization metric) - # multiply by 100 to display a percentage usage. - if time_series.unit == "10^2.%": - max_rate = round(max_rate * 100, 2) - else: - max_rate = round(max_rate, 2) - return max_rate - - return None \ No newline at end of file + return None From 1433c2f4a83aec2a6d26e8464f488b3484989b10 Mon Sep 17 00:00:00 2001 From: Roy Arsan Date: Fri, 12 Aug 2022 17:08:52 -0500 Subject: [PATCH 03/10] Change to generic Exception in case job ID is missing --- perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py b/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py index 481c92735f..2613012751 100644 --- a/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py +++ b/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py @@ -237,7 +237,7 @@ def _PullJobMetrics(self, force_refresh=False): return # Raise exception if job id not available if self.job_id is None: - raise ValueError('Unable to pull job metrics. Job ID not available') + raise Exception('Unable to pull job metrics. Job ID not available') cmd = util.GcloudCommand(self, 'dataflow', 'metrics', 'list', self.job_id) From 209f612314e95dcc1a97ab08fd1f0cfc4285b6db Mon Sep 17 00:00:00 2001 From: Roy Arsan Date: Mon, 15 Aug 2022 10:22:55 -0500 Subject: [PATCH 04/10] Minor linter fixes such removing trailing whitespace --- .../providers/gcp/gcp_dpb_dataflow.py | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py b/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py index 2613012751..8bba6f5f00 100644 --- a/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py +++ b/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py @@ -179,11 +179,11 @@ def SubmitJob( _, stderr, _ = vm_util.IssueCommand(cmd, timeout=FLAGS.dpb_dataflow_timeout) # Parse output to retrieve submitted job ID - match = re.search('Submitted job: (.\S*)', stderr) + match = re.search(r'Submitted job: (.\S*)', stderr) if not match: logging.warn('Dataflow output in unexpected format. Failed to parse Dataflow job ID.') return - + self.job_id = match.group(1) logging.info('Dataflow job ID: %s', self.job_id) @@ -224,7 +224,7 @@ def CalculateCost(self, type=DATAFLOW_TYPE_BATCH): else: cost += total_vcpu_time * VCPU_PER_HR_STREAMING cost += total_mem_usage * MEM_PER_GB_HR_STREAMING - + cost += total_pd_usage * PD_PER_GB_HR # TODO(rarsan): Add cost related to per-GB data processed by Dataflow Shuffle # (for batch) or Streaming Engine (for streaming) when applicable @@ -238,7 +238,7 @@ def _PullJobMetrics(self, force_refresh=False): # Raise exception if job id not available if self.job_id is None: raise Exception('Unable to pull job metrics. Job ID not available') - + cmd = util.GcloudCommand(self, 'dataflow', 'metrics', 'list', self.job_id) cmd.use_alpha_gcloud = True @@ -267,18 +267,18 @@ def _PullJobMetrics(self, force_refresh=False): def GetMetricValue(self, name, type=METRIC_TYPE_COUNTER): """Get value of a job's metric. - + Returns: Integer if metric is of type counter - Dictionary if metric is of type distribution. 
Dictionary + Dictionary if metric is of type distribution. Dictionary contains keys such as count/max/mean/min/sum """ if type not in (METRIC_TYPE_COUNTER, METRIC_TYPE_DISTRIBUTION): raise ValueError(f'Invalid type provided to GetMetricValue(): {type}') - + if self.job_metrics is None: self._PullJobMetrics() - + return self.job_metrics[type][name] def GetAvgCpuUtilization(self, start_time: datetime, end_time: datetime): @@ -293,7 +293,7 @@ def GetAvgCpuUtilization(self, start_time: datetime, end_time: datetime): """ client = monitoring_v3.MetricServiceClient() project_name = f'projects/{self.project}' - + now_seconds = int(time.time()) start_time_seconds = int(start_time.timestamp()) end_time_seconds = int(end_time.timestamp()) @@ -362,5 +362,5 @@ def _GetAvgValueFromTimeSeries(self, time_series: ListTimeSeriesResponse): if time_series.unit == "10^2.%": averaged = round(averaged * 100, 2) return averaged - + return None From 30aea15d8aa755a028dfbeaea2b92daae5d5057c Mon Sep 17 00:00:00 2001 From: Roy Arsan Date: Mon, 15 Aug 2022 15:13:29 -0500 Subject: [PATCH 05/10] Formatting fixes thanks to @dorellang --- .../providers/gcp/gcp_dpb_dataflow.py | 111 ++++++++++-------- 1 file changed, 62 insertions(+), 49 deletions(-) diff --git a/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py b/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py index 8bba6f5f00..f6ce2aaae7 100644 --- a/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py +++ b/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py @@ -12,25 +12,26 @@ # See the License for the specific language governing permissions and # limitations under the License. """Module containing class for GCP's Dataflow service. + Use this module for running Dataflow jobs from compiled jar files. No Clusters can be created or destroyed, since it is a managed solution See details at: https://cloud.google.com/dataflow/ """ -from functools import cached_property +import datetime +import functools +import json +import logging import os import re import time -import json -import logging -import datetime from absl import flags from google.cloud import monitoring_v3 -from google.cloud.monitoring_v3.types import TimeInterval from google.cloud.monitoring_v3.types import Aggregation from google.cloud.monitoring_v3.types import ListTimeSeriesResponse +from google.cloud.monitoring_v3.types import TimeInterval from perfkitbenchmarker import beam_benchmark_helper from perfkitbenchmarker import dpb_service from perfkitbenchmarker import errors @@ -84,6 +85,7 @@ PD_PER_GB_HR = 0.000054 PD_SSD_PER_GB_HR = 0.000298 + class GcpDpbDataflow(dpb_service.BaseDpbService): """Object representing GCP Dataflow Service.""" @@ -181,15 +183,14 @@ def SubmitJob( # Parse output to retrieve submitted job ID match = re.search(r'Submitted job: (.\S*)', stderr) if not match: - logging.warn('Dataflow output in unexpected format. Failed to parse Dataflow job ID.') + logging.warning( + 'Dataflow output in unexpected format. 
Failed to parse Dataflow job ' + 'ID.') return self.job_id = match.group(1) logging.info('Dataflow job ID: %s', self.job_id) - def SetClusterProperty(self): - pass - def GetMetadata(self): """Return a dictionary of the metadata for this cluster.""" basic_data = super(GcpDpbDataflow, self).GetMetadata() @@ -198,40 +199,45 @@ def GetMetadata(self): basic_data['dpb_job_id'] = self.job_id return basic_data - @cached_property + @functools.cached_property def job_stats(self): """Collect series of relevant performance and cost stats.""" stats = {} - stats['total_vcpu_time'] = self.GetMetricValue('TotalVcpuTime')/3600 # vCPU-hr - stats['total_mem_usage'] = self.GetMetricValue('TotalMemoryUsage')/1024/3600 # GB-hr - stats['total_pd_usage'] = self.GetMetricValue('TotalPdUsage')/3600 # GB-hr + # vCPU-hr + stats['total_vcpu_time'] = self.GetMetricValue('TotalVcpuTime') / 3600 + # GB-hr + stats['total_mem_usage'] = self.GetMetricValue( + 'TotalMemoryUsage') / 1024 / 3600 + # GB-hr + stats['total_pd_usage'] = self.GetMetricValue('TotalPdUsage') / 3600 # TODO(rarsan): retrieve BillableShuffleDataProcessed # and/or BillableStreamingDataProcessed when applicable return stats - def CalculateCost(self, type=DATAFLOW_TYPE_BATCH): - if type not in (DATAFLOW_TYPE_BATCH, DATAFLOW_TYPE_STREAMING): - raise ValueError(f'Invalid type provided to CalculateCost(): {type}') + def CalculateCost(self, pricing_type=DATAFLOW_TYPE_BATCH): + if pricing_type not in (DATAFLOW_TYPE_BATCH, DATAFLOW_TYPE_STREAMING): + raise ValueError( + f'Invalid type provided to CalculateCost(): {pricing_type}') total_vcpu_time = self.job_stats['total_vcpu_time'] total_mem_usage = self.job_stats['total_mem_usage'] total_pd_usage = self.job_stats['total_pd_usage'] cost = 0 - if type == DATAFLOW_TYPE_BATCH: - cost += total_vcpu_time * VCPU_PER_HR_BATCH - cost += total_mem_usage * MEM_PER_GB_HR_BATCH + if pricing_type == DATAFLOW_TYPE_BATCH: + cost += total_vcpu_time * VCPU_PER_HR_BATCH + cost += total_mem_usage * MEM_PER_GB_HR_BATCH else: cost += total_vcpu_time * VCPU_PER_HR_STREAMING cost += total_mem_usage * MEM_PER_GB_HR_STREAMING cost += total_pd_usage * PD_PER_GB_HR - # TODO(rarsan): Add cost related to per-GB data processed by Dataflow Shuffle - # (for batch) or Streaming Engine (for streaming) when applicable + # TODO(rarsan): Add cost related to per-GB data processed by Dataflow + # Shuffle (for batch) or Streaming Engine (for streaming) when applicable return cost def _PullJobMetrics(self, force_refresh=False): - """Retrieve and cache all job metrics from Dataflow API""" + """Retrieve and cache all job metrics from Dataflow API.""" # Skip if job metrics is already populated unless force_refresh is True if self.job_metrics is not None and not force_refresh: return @@ -240,7 +246,7 @@ def _PullJobMetrics(self, force_refresh=False): raise Exception('Unable to pull job metrics. 
Job ID not available') cmd = util.GcloudCommand(self, 'dataflow', 'metrics', - 'list', self.job_id) + 'list', self.job_id) cmd.use_alpha_gcloud = True cmd.flags = { 'project': self.project, @@ -258,28 +264,33 @@ def _PullJobMetrics(self, force_refresh=False): elif 'distribution' in metric: distributions[metric['name']['name']] = metric['distribution'] else: - logging.warn(f'Unfamiliar metric type found: {metric}') + logging.warning('Unfamiliar metric type found: %s', metric) self.job_metrics = { - METRIC_TYPE_COUNTER: counters, - METRIC_TYPE_DISTRIBUTION: distributions + METRIC_TYPE_COUNTER: counters, + METRIC_TYPE_DISTRIBUTION: distributions, } - def GetMetricValue(self, name, type=METRIC_TYPE_COUNTER): + def GetMetricValue(self, name, metric_type=METRIC_TYPE_COUNTER): """Get value of a job's metric. + Args: + name: The name of the metric. + metric_type: Either METRIC_TYPE_COUNTER or METRIC_TYPE_DISTRIBUTION. + Returns: - Integer if metric is of type counter - Dictionary if metric is of type distribution. Dictionary - contains keys such as count/max/mean/min/sum + An int if metric is of type counter or a dict if metric is of + type distribution. The dictionary contains keys such as + count/max/mean/min/sum. """ if type not in (METRIC_TYPE_COUNTER, METRIC_TYPE_DISTRIBUTION): - raise ValueError(f'Invalid type provided to GetMetricValue(): {type}') + raise ValueError( + f'Invalid type provided to GetMetricValue(): {metric_type}') if self.job_metrics is None: self._PullJobMetrics() - return self.job_metrics[type][name] + return self.job_metrics[metric_type][name] def GetAvgCpuUtilization(self, start_time: datetime, end_time: datetime): """Get average cpu utilization across all pipeline workers. @@ -299,7 +310,8 @@ def GetAvgCpuUtilization(self, start_time: datetime, end_time: datetime): end_time_seconds = int(end_time.timestamp()) # Cpu metrics data can take up to 240 seconds to appear if (now_seconds - end_time_seconds) < CPU_API_DELAY_SECONDS: - logging.info('Waiting for CPU metrics to be available (up to 4 minutes)...') + logging.info( + 'Waiting for CPU metrics to be available (up to 4 minutes)...') time.sleep(CPU_API_DELAY_SECONDS - (now_seconds - end_time_seconds)) interval = TimeInterval( @@ -310,10 +322,9 @@ def GetAvgCpuUtilization(self, start_time: datetime, end_time: datetime): ) api_filter = ( - 'metric.type = "compute.googleapis.com/instance/cpu/utilization" ' - f'AND resource.labels.project_id = "{self.project}" ' - f'AND metadata.user_labels.dataflow_job_id = "{self.job_id}" ' - ) + 'metric.type = "compute.googleapis.com/instance/cpu/utilization" ' + f'AND resource.labels.project_id = "{self.project}" ' + f'AND metadata.user_labels.dataflow_job_id = "{self.job_id}" ') aggregation = Aggregation( { @@ -325,17 +336,18 @@ def GetAvgCpuUtilization(self, start_time: datetime, end_time: datetime): ) results = client.list_time_series( - request={ - 'name': project_name, - 'filter': api_filter, - 'interval': interval, - 'view': monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, - 'aggregation': aggregation, - } + request={ + 'name': project_name, + 'filter': api_filter, + 'interval': interval, + 'view': monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, + 'aggregation': aggregation, + } ) if not results: - logging.warn('No monitoring data found. Unable to calculate avg CPU utilization.') + logging.warning( + 'No monitoring data found. 
Unable to calculate avg CPU utilization.') return None return self._GetAvgValueFromTimeSeries(results) @@ -344,7 +356,8 @@ def _GetAvgValueFromTimeSeries(self, time_series: ListTimeSeriesResponse): """Parses time series data and returns average across intervals. Args: - time_series: time series of cpu fractional utilization returned by monitoring. + time_series: time series of cpu fractional utilization returned by + monitoring. Returns: Average value across intervals @@ -357,9 +370,9 @@ def _GetAvgValueFromTimeSeries(self, time_series: ListTimeSeriesResponse): if points: # Average over all minute intervals captured averaged = sum(points) / len(points) - # If metric unit is a fractional number between 0 and 1 (e.g. CPU utilization metric) - # multiply by 100 to display a percentage usage. - if time_series.unit == "10^2.%": + # If metric unit is a fractional number between 0 and 1 (e.g. CPU + # utilization metric) multiply by 100 to display a percentage usage. + if time_series.unit == '10^2.%': averaged = round(averaged * 100, 2) return averaged From 061762a9c2cd01ef9f32c55fa7a714d755c35a8b Mon Sep 17 00:00:00 2001 From: Roy Arsan Date: Mon, 15 Aug 2022 15:42:08 -0500 Subject: [PATCH 06/10] Fix typo --- perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py b/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py index f6ce2aaae7..da42df827d 100644 --- a/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py +++ b/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py @@ -283,7 +283,7 @@ def GetMetricValue(self, name, metric_type=METRIC_TYPE_COUNTER): type distribution. The dictionary contains keys such as count/max/mean/min/sum. """ - if type not in (METRIC_TYPE_COUNTER, METRIC_TYPE_DISTRIBUTION): + if metric_type not in (METRIC_TYPE_COUNTER, METRIC_TYPE_DISTRIBUTION): raise ValueError( f'Invalid type provided to GetMetricValue(): {metric_type}') From 7fa1f4fb5a67cb0c9d546f39eca859208f29c54e Mon Sep 17 00:00:00 2001 From: Roy Arsan Date: Mon, 15 Aug 2022 16:29:33 -0500 Subject: [PATCH 07/10] Fix linting error with benchmark code --- perfkitbenchmarker/linux_benchmarks/dpb_wordcount_benchmark.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/perfkitbenchmarker/linux_benchmarks/dpb_wordcount_benchmark.py b/perfkitbenchmarker/linux_benchmarks/dpb_wordcount_benchmark.py index 2bf7ace1b2..5958054ed2 100644 --- a/perfkitbenchmarker/linux_benchmarks/dpb_wordcount_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/dpb_wordcount_benchmark.py @@ -178,7 +178,8 @@ def Run(benchmark_spec): # TODO(odiego): Refactor to avoid explicit service type checks. 
if dpb_service_instance.SERVICE_TYPE == dpb_service.DATAFLOW: - avg_cpu_util = dpb_service_instance.GetAvgCpuUtilization(start_time, end_time) + avg_cpu_util = dpb_service_instance.GetAvgCpuUtilization( + start_time, end_time) results.append(sample.Sample('avg_cpu_util', avg_cpu_util, '%', metadata)) stats = dpb_service_instance.job_stats From 375e08376caaf61f0da9debff50097090078f76b Mon Sep 17 00:00:00 2001 From: Roy Arsan Date: Wed, 17 Aug 2022 13:08:39 -0500 Subject: [PATCH 08/10] Fix dataflow stats to work with older monitoring_v3 v0.31.1 (PBK current dependency, version-pinned for now) --- .../providers/gcp/gcp_dpb_dataflow.py | 43 ++++++++----------- requirements.txt | 2 +- 2 files changed, 20 insertions(+), 25 deletions(-) diff --git a/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py b/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py index da42df827d..18b8ee1983 100644 --- a/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py +++ b/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py @@ -29,6 +29,7 @@ from absl import flags from google.cloud import monitoring_v3 +from google.cloud.monitoring_v3 import enums from google.cloud.monitoring_v3.types import Aggregation from google.cloud.monitoring_v3.types import ListTimeSeriesResponse from google.cloud.monitoring_v3.types import TimeInterval @@ -314,12 +315,13 @@ def GetAvgCpuUtilization(self, start_time: datetime, end_time: datetime): 'Waiting for CPU metrics to be available (up to 4 minutes)...') time.sleep(CPU_API_DELAY_SECONDS - (now_seconds - end_time_seconds)) - interval = TimeInterval( - { - 'start_time': {'seconds': start_time_seconds}, - 'end_time': {'seconds': end_time_seconds}, - } - ) + interval = TimeInterval() + # Shift TZ of datetime arguments since FromDatetime() assumes UTC + # See: https://googleapis.dev/python/protobuf/latest/google/protobuf/timestamp_pb2.html#google.protobuf.timestamp_pb2.Timestamp.FromDatetime + interval.start_time.FromDatetime( + start_time.astimezone(datetime.timezone.utc)) + interval.end_time.FromDatetime( + end_time.astimezone(datetime.timezone.utc)) api_filter = ( 'metric.type = "compute.googleapis.com/instance/cpu/utilization" ' @@ -327,22 +329,18 @@ def GetAvgCpuUtilization(self, start_time: datetime, end_time: datetime): f'AND metadata.user_labels.dataflow_job_id = "{self.job_id}" ') aggregation = Aggregation( - { - 'alignment_period': {'seconds': 60}, # 1 minute - 'per_series_aligner': Aggregation.Aligner.ALIGN_MEAN, - 'cross_series_reducer': Aggregation.Reducer.REDUCE_MEAN, - 'group_by_fields': ['resource.instance_id'], - } + alignment_period={'seconds': 60}, # 1 minute + per_series_aligner=Aggregation.Aligner.ALIGN_MEAN, + cross_series_reducer=Aggregation.Reducer.REDUCE_MEAN, + group_by_fields=['resource.instance_id'], ) results = client.list_time_series( - request={ - 'name': project_name, - 'filter': api_filter, - 'interval': interval, - 'view': monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, - 'aggregation': aggregation, - } + name=project_name, + filter_=api_filter, + interval=interval, + view=enums.ListTimeSeriesRequest.TimeSeriesView.FULL, + aggregation=aggregation, ) if not results: @@ -350,7 +348,8 @@ def GetAvgCpuUtilization(self, start_time: datetime, end_time: datetime): 'No monitoring data found. 
Unable to calculate avg CPU utilization.') return None - return self._GetAvgValueFromTimeSeries(results) + # Multiply fractional cpu util by 100 to display a percentage usage + return round(self._GetAvgValueFromTimeSeries(results) * 100, 2) def _GetAvgValueFromTimeSeries(self, time_series: ListTimeSeriesResponse): """Parses time series data and returns average across intervals. @@ -370,10 +369,6 @@ def _GetAvgValueFromTimeSeries(self, time_series: ListTimeSeriesResponse): if points: # Average over all minute intervals captured averaged = sum(points) / len(points) - # If metric unit is a fractional number between 0 and 1 (e.g. CPU - # utilization metric) multiply by 100 to display a percentage usage. - if time_series.unit == '10^2.%': - averaged = round(averaged * 100, 2) return averaged return None diff --git a/requirements.txt b/requirements.txt index b0a05dcb87..662c505a47 100644 --- a/requirements.txt +++ b/requirements.txt @@ -27,7 +27,7 @@ six>=1.13.0 pywinrm timeout-decorator google-cloud-datastore -google-cloud-monitoring +google-cloud-monitoring==0.31.1 beautifulsoup4 requests From 783c8fdc9f060757e68333e6a2787de82e46aaae Mon Sep 17 00:00:00 2001 From: Roy Arsan Date: Wed, 17 Aug 2022 15:47:25 -0500 Subject: [PATCH 09/10] Remove unused variable --- perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py | 1 - 1 file changed, 1 deletion(-) diff --git a/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py b/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py index 18b8ee1983..738dc3f5b3 100644 --- a/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py +++ b/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py @@ -307,7 +307,6 @@ def GetAvgCpuUtilization(self, start_time: datetime, end_time: datetime): project_name = f'projects/{self.project}' now_seconds = int(time.time()) - start_time_seconds = int(start_time.timestamp()) end_time_seconds = int(end_time.timestamp()) # Cpu metrics data can take up to 240 seconds to appear if (now_seconds - end_time_seconds) < CPU_API_DELAY_SECONDS: From 6b05b7183c791502d4249f97d6a89d28bb227409 Mon Sep 17 00:00:00 2001 From: Roy Arsan Date: Wed, 17 Aug 2022 15:49:58 -0500 Subject: [PATCH 10/10] Avoid long line lint error in comment --- perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py b/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py index 738dc3f5b3..5e72577965 100644 --- a/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py +++ b/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py @@ -316,7 +316,8 @@ def GetAvgCpuUtilization(self, start_time: datetime, end_time: datetime): interval = TimeInterval() # Shift TZ of datetime arguments since FromDatetime() assumes UTC - # See: https://googleapis.dev/python/protobuf/latest/google/protobuf/timestamp_pb2.html#google.protobuf.timestamp_pb2.Timestamp.FromDatetime + # See + # https://googleapis.dev/python/protobuf/latest/google/protobuf/timestamp_pb2.html#google.protobuf.timestamp_pb2.Timestamp.FromDatetime interval.start_time.FromDatetime( start_time.astimezone(datetime.timezone.utc)) interval.end_time.FromDatetime(