diff --git a/perfkitbenchmarker/configs/dataflow_wordcount.yaml b/perfkitbenchmarker/configs/dataflow_wordcount.yaml new file mode 100644 index 0000000000..391f43c2bc --- /dev/null +++ b/perfkitbenchmarker/configs/dataflow_wordcount.yaml @@ -0,0 +1,88 @@ +################################# +# Worker machine configs +################################# +eight_core: &eight_core + vm_spec: + GCP: + machine_type: n1-standard-8 + disk_spec: + GCP: + disk_size: 300 + +four_core: &four_core + vm_spec: + GCP: + machine_type: n1-standard-4 + disk_spec: + GCP: + disk_size: 300 + +two_core: &two_core + vm_spec: + GCP: + machine_type: n1-standard-2 + disk_spec: + GCP: + disk_size: 300 + +################################################################## +# Benchmark flags specifying Dataflow template and parameters +################################################################## +flags: &myflags + dpb_service_zone: us-central1-a + dpb_wordcount_out_base: + dpb_dataflow_temp_location: gs:///temp + dpb_dataflow_staging_location: gs:///temp + dpb_job_jarfile: ./word-count-beam/target/word-count-beam-bundled-0.1.jar + dpb_job_classname: org.apache.beam.examples.WordCount + +################################# +# Benchmark variations to run +################################# +benchmarks: +- dpb_wordcount_benchmark: { + dpb_service: { service_type: dataflow, worker_count: 1, worker_group: *eight_core }, + flags: *myflags + } +- dpb_wordcount_benchmark: { + dpb_service: { service_type: dataflow, worker_count: 1, worker_group: *four_core }, + flags: *myflags + } +- dpb_wordcount_benchmark: { + dpb_service: { service_type: dataflow, worker_count: 1, worker_group: *two_core }, + flags: *myflags + } + + +################################# +# Alternative benchmark config examples +################################# +# dpb_wordcount_benchmark: +# description: Run word count on dataflow +# dpb_service: +# service_type: dataflow +# worker_count: 1 +# worker_group: +# vm_spec: +# GCP: +# 
machine_type: n1-standard-4 +# disk_spec: +# GCP: +# disk_size: 300 + +# dpb_wordcount_benchmark: +# description: Run word count on dataflow +# dpb_service: +# service_type: dataflow +# worker_count: 1 +# worker_group: +# vm_spec: +# GCP: +# machine_type: n1-standard-4 +# disk_spec: +# GCP: +# disk_size: 300 +# flag_matrix: cross_runners +# flag_matrix_defs: +# cross_runners: +# dpb_dataflow_runner: [DataflowRunner, DirectRunner] diff --git a/perfkitbenchmarker/linux_benchmarks/dpb_wordcount_benchmark.py b/perfkitbenchmarker/linux_benchmarks/dpb_wordcount_benchmark.py index 63e9904de8..5958054ed2 100644 --- a/perfkitbenchmarker/linux_benchmarks/dpb_wordcount_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/dpb_wordcount_benchmark.py @@ -146,7 +146,7 @@ def Run(benchmark_spec): base_out = FLAGS.dpb_dataflow_staging_location else: base_out = 'gs://{}'.format(FLAGS.dpb_wordcount_out_base) - job_arguments.append('--output={}/output/'.format(base_out)) + job_arguments.append('--output={}/output'.format(base_out)) else: # Use user-provided jar file if present; otherwise use the default example if not FLAGS.dpb_job_jarfile: @@ -159,10 +159,8 @@ def Run(benchmark_spec): # TODO (saksena): Finalize more stats to gather results = [] - metadata = copy.copy(dpb_service_instance.GetMetadata()) - metadata.update({'input_location': input_location}) - start = datetime.datetime.now() + start_time = datetime.datetime.now() dpb_service_instance.SubmitJob( jarfile=jarfile, classname=classname, @@ -170,8 +168,27 @@ def Run(benchmark_spec): job_stdout_file=stdout_file, job_type=job_type) end_time = datetime.datetime.now() - run_time = (end_time - start).total_seconds() + + # Update metadata after job run to get job id + metadata = copy.copy(dpb_service_instance.GetMetadata()) + metadata.update({'input_location': input_location}) + + run_time = (end_time - start_time).total_seconds() results.append(sample.Sample('run_time', run_time, 'seconds', metadata)) + + # TODO(odiego): 
Refactor to avoid explicit service type checks. + if dpb_service_instance.SERVICE_TYPE == dpb_service.DATAFLOW: + avg_cpu_util = dpb_service_instance.GetAvgCpuUtilization( + start_time, end_time) + results.append(sample.Sample('avg_cpu_util', avg_cpu_util, '%', metadata)) + + stats = dpb_service_instance.job_stats + for name, value in stats.items(): + results.append(sample.Sample(name, value, 'number', metadata)) + + total_cost = dpb_service_instance.CalculateCost() + results.append(sample.Sample('total_cost', total_cost, '$', metadata)) + return results diff --git a/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py b/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py index c20fed7cb8..285bdd425a 100644 --- a/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py +++ b/perfkitbenchmarker/providers/gcp/gcp_dpb_dataflow.py @@ -11,15 +11,25 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""Module containing class for GCP's dataflow service. +"""Module containing class for GCP's Dataflow service. + +Use this module for running Dataflow jobs from compiled jar files. No Clusters can be created or destroyed, since it is a managed solution See details at: https://cloud.google.com/dataflow/ """ +import datetime +import functools +import json +import logging import os +import re +import time from absl import flags +from google.cloud import monitoring_v3 +from google.cloud.monitoring_v3 import types from perfkitbenchmarker import beam_benchmark_helper from perfkitbenchmarker import dpb_service from perfkitbenchmarker import errors @@ -47,10 +57,32 @@ FLAGS = flags.FLAGS -GCP_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ' - DATAFLOW_WC_INPUT = 'gs://dataflow-samples/shakespeare/kinglear.txt' +# Compute Engine CPU Monitoring API has up to 4 minute delay. 
+# See https://cloud.google.com/monitoring/api/metrics_gcp#gcp-compute +CPU_API_DELAY_MINUTES = 4 +CPU_API_DELAY_SECONDS = CPU_API_DELAY_MINUTES * 60 +# Dataflow Monitoring API has up to 3 minute delay. +# See https://cloud.google.com/monitoring/api/metrics_gcp#gcp-dataflow +DATAFLOW_METRICS_DELAY_MINUTES = 3 +DATAFLOW_METRICS_DELAY_SECONDS = DATAFLOW_METRICS_DELAY_MINUTES * 60 + +DATAFLOW_TYPE_BATCH = 'batch' +DATAFLOW_TYPE_STREAMING = 'streaming' + +METRIC_TYPE_COUNTER = 'counter' +METRIC_TYPE_DISTRIBUTION = 'distribution' + +# Dataflow resources cost factors (showing us-central1 pricing). +# See https://cloud.google.com/dataflow/pricing#pricing-details +VCPU_PER_HR_BATCH = 0.056 +VCPU_PER_HR_STREAMING = 0.069 +MEM_PER_GB_HR_BATCH = 0.003557 +MEM_PER_GB_HR_STREAMING = 0.003557 +PD_PER_GB_HR = 0.000054 +PD_SSD_PER_GB_HR = 0.000298 + class GcpDpbDataflow(dpb_service.BaseDpbService): """Object representing GCP Dataflow Service.""" @@ -60,16 +92,10 @@ class GcpDpbDataflow(dpb_service.BaseDpbService): def __init__(self, dpb_service_spec): super(GcpDpbDataflow, self).__init__(dpb_service_spec) - self.project = None - - @staticmethod - def _GetStats(stdout): - """Get Stats. 
- - TODO(saksena): Hook up the metrics API of dataflow to retrieve performance - metrics when available - """ - pass + self.dpb_service_type = self.SERVICE_TYPE + self.project = FLAGS.project + self.job_id = None + self.job_metrics = None @staticmethod def CheckPrerequisites(benchmark_config): @@ -120,7 +146,6 @@ def SubmitJob( disk_size_gb = None cmd = [] - # Needed to verify java executable is on the path dataflow_executable = 'java' if not vm_util.ExecutableOnPath(dataflow_executable): @@ -146,18 +171,206 @@ def SubmitJob( if disk_size_gb: cmd.append('--diskSizeGb={}'.format(disk_size_gb)) cmd.append('--defaultWorkerLogLevel={}'.format(FLAGS.dpb_log_level)) + cmd.append('--project={}'.format(self.project)) if FLAGS.dpb_dataflow_additional_args: cmd.extend(FLAGS.dpb_dataflow_additional_args) - vm_util.IssueCommand(cmd, timeout=FLAGS.dpb_dataflow_timeout) + _, stderr, _ = vm_util.IssueCommand(cmd, timeout=FLAGS.dpb_dataflow_timeout) - def SetClusterProperty(self): - pass + # Parse output to retrieve submitted job ID + match = re.search(r'Submitted job: (.\S*)', stderr) + if not match: + logging.warning( + 'Dataflow output in unexpected format. 
Failed to parse Dataflow job ' + 'ID.') + return + + self.job_id = match.group(1) + logging.info('Dataflow job ID: %s', self.job_id) def GetMetadata(self): """Return a dictionary of the metadata for this cluster.""" basic_data = super(GcpDpbDataflow, self).GetMetadata() basic_data['dpb_dataflow_runner'] = FLAGS.dpb_dataflow_runner basic_data['dpb_dataflow_sdk'] = FLAGS.dpb_dataflow_sdk + basic_data['dpb_job_id'] = self.job_id return basic_data + + @functools.cached_property + def job_stats(self): + """Collect series of relevant performance and cost stats.""" + stats = {} + # vCPU-hr + stats['total_vcpu_time'] = self.GetMetricValue('TotalVcpuTime') / 3600 + # GB-hr + stats['total_mem_usage'] = self.GetMetricValue( + 'TotalMemoryUsage') / 1024 / 3600 + # GB-hr + stats['total_pd_usage'] = self.GetMetricValue('TotalPdUsage') / 3600 + # TODO(user): retrieve BillableShuffleDataProcessed + # and/or BillableStreamingDataProcessed when applicable + return stats + + def CalculateCost(self, pricing_type=DATAFLOW_TYPE_BATCH): + if pricing_type not in (DATAFLOW_TYPE_BATCH, DATAFLOW_TYPE_STREAMING): + raise ValueError( + f'Invalid type provided to CalculateCost(): {pricing_type}') + + # For some reason, pytype doesn't play well with functools.cached_property + # pytype: disable=unsupported-operands + total_vcpu_time = self.job_stats['total_vcpu_time'] + total_mem_usage = self.job_stats['total_mem_usage'] + total_pd_usage = self.job_stats['total_pd_usage'] + # pytype: enable=unsupported-operands + + cost = 0 + if pricing_type == DATAFLOW_TYPE_BATCH: + cost += total_vcpu_time * VCPU_PER_HR_BATCH + cost += total_mem_usage * MEM_PER_GB_HR_BATCH + else: + cost += total_vcpu_time * VCPU_PER_HR_STREAMING + cost += total_mem_usage * MEM_PER_GB_HR_STREAMING + + cost += total_pd_usage * PD_PER_GB_HR + # TODO(user): Add cost related to per-GB data processed by Dataflow + # Shuffle (for batch) or Streaming Engine (for streaming) when applicable + return cost + + def _PullJobMetrics(self, 
force_refresh=False): + """Retrieve and cache all job metrics from Dataflow API.""" + # Skip if job metrics is already populated unless force_refresh is True + if self.job_metrics is not None and not force_refresh: + return + # Raise exception if job id not available + if self.job_id is None: + raise Exception('Unable to pull job metrics. Job ID not available') + + cmd = util.GcloudCommand(self, 'dataflow', 'metrics', + 'list', self.job_id) + cmd.use_alpha_gcloud = True + cmd.flags = { + 'project': self.project, + 'region': util.GetRegionFromZone(FLAGS.dpb_service_zone), + 'format': 'json', + } + stdout, _, _ = cmd.Issue() + results = json.loads(stdout) + + counters = {} + distributions = {} + for metric in results: + if 'scalar' in metric: + counters[metric['name']['name']] = int(metric['scalar']) + elif 'distribution' in metric: + distributions[metric['name']['name']] = metric['distribution'] + else: + logging.warning('Unfamiliar metric type found: %s', metric) + + self.job_metrics = { + METRIC_TYPE_COUNTER: counters, + METRIC_TYPE_DISTRIBUTION: distributions, + } + + def GetMetricValue(self, name, metric_type=METRIC_TYPE_COUNTER): + """Get value of a job's metric. + + Args: + name: The name of the metric. + metric_type: Either METRIC_TYPE_COUNTER or METRIC_TYPE_DISTRIBUTION. + + Returns: + An int if metric is of type counter or a dict if metric is of + type distribution. The dictionary contains keys such as + count/max/mean/min/sum. + """ + if metric_type not in (METRIC_TYPE_COUNTER, METRIC_TYPE_DISTRIBUTION): + raise ValueError( + f'Invalid type provided to GetMetricValue(): {metric_type}') + + if self.job_metrics is None: + self._PullJobMetrics() + + return self.job_metrics[metric_type][name] + + def GetAvgCpuUtilization( + self, start_time: datetime.datetime, end_time: datetime.datetime): + """Get average cpu utilization across all pipeline workers. + + Args: + start_time: datetime specifying the beginning of the time interval. 
+ end_time: datetime specifying the end of the time interval. + + Returns: + Average CPU utilization as a percentage, or None if no data is found. + """ + client = monitoring_v3.MetricServiceClient() + project_name = f'projects/{self.project}' + + now_seconds = int(time.time()) + end_time_seconds = int(end_time.timestamp()) + # Cpu metrics data can take up to 240 seconds to appear + if (now_seconds - end_time_seconds) < CPU_API_DELAY_SECONDS: + logging.info( + 'Waiting for CPU metrics to be available (up to 4 minutes)...') + time.sleep(CPU_API_DELAY_SECONDS - (now_seconds - end_time_seconds)) + + interval = types.TimeInterval() + # Shift TZ of datetime arguments since FromDatetime() assumes UTC + # See + # https://googleapis.dev/python/protobuf/latest/google/protobuf/timestamp_pb2.html#google.protobuf.timestamp_pb2.Timestamp.FromDatetime + interval.start_time.FromDatetime( + start_time.astimezone(datetime.timezone.utc)) + interval.end_time.FromDatetime( + end_time.astimezone(datetime.timezone.utc)) + + api_filter = ( + 'metric.type = "compute.googleapis.com/instance/cpu/utilization" ' + f'AND resource.labels.project_id = "{self.project}" ' + f'AND metadata.user_labels.dataflow_job_id = "{self.job_id}" ') + + aggregation = types.Aggregation( + alignment_period={'seconds': 60}, # 1 minute + per_series_aligner=types.Aggregation.Aligner.ALIGN_MEAN, + cross_series_reducer=types.Aggregation.Reducer.REDUCE_MEAN, + group_by_fields=['resource.instance_id'], + ) + + results = client.list_time_series( + name=project_name, + filter_=api_filter, + interval=interval, + view=monitoring_v3.enums.ListTimeSeriesRequest.TimeSeriesView.FULL, + aggregation=aggregation, + ) + + avg_util = self._GetAvgValueFromTimeSeries(results) + if avg_util is None: # `results` is a pager and is always truthy + logging.warning( + 'No monitoring data found. 
Unable to calculate avg CPU utilization.') + return None + # Multiply fractional cpu util by 100 to display a percentage usage + return round(avg_util * 100, 2) + + def _GetAvgValueFromTimeSeries( + self, time_series: types.ListTimeSeriesResponse): + """Parses time series data and returns average across intervals. + + Args: + time_series: time series of cpu fractional utilization returned by + monitoring. + + Returns: + Average value across intervals + """ + points = [] + for time_interval in time_series: + for snapshot in time_interval.points: + points.append(snapshot.value.double_value) + + if points: + # Average over all minute intervals captured + averaged = sum(points) / len(points) + return averaged + + return None diff --git a/requirements.txt b/requirements.txt index b0a05dcb87..662c505a47 100644 --- a/requirements.txt +++ b/requirements.txt @@ -27,7 +27,7 @@ six>=1.13.0 pywinrm timeout-decorator google-cloud-datastore -google-cloud-monitoring +google-cloud-monitoring==0.31.1 beautifulsoup4 requests