Skip to content

Commit

Permalink
Merge pull request GoogleCloudPlatform#3703 from rarsan:wordcount_bench
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 468308963
  • Loading branch information
copybara-github committed Aug 17, 2022
2 parents d69041f + 6b05b71 commit e823566
Show file tree
Hide file tree
Showing 4 changed files with 341 additions and 23 deletions.
88 changes: 88 additions & 0 deletions perfkitbenchmarker/configs/dataflow_wordcount.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#################################
# Worker machine configs
#################################
# Three anchored worker-group specs (8/4/2 vCPUs) reused below via aliases.
# All use GCP n1-standard machine types with a 300 GB disk.
eight_core: &eight_core
  vm_spec:
    GCP:
      machine_type: n1-standard-8
  disk_spec:
    GCP:
      disk_size: 300

four_core: &four_core
  vm_spec:
    GCP:
      machine_type: n1-standard-4
  disk_spec:
    GCP:
      disk_size: 300

two_core: &two_core
  vm_spec:
    GCP:
      machine_type: n1-standard-2
  disk_spec:
    GCP:
      disk_size: 300

##################################################################
# Benchmark flags specifying Dataflow template and parameters
##################################################################
# Replace <MY_BUCKET> with your GCS bucket before running.
flags: &myflags
  dpb_service_zone: us-central1-a
  dpb_wordcount_out_base: <MY_BUCKET>
  dpb_dataflow_temp_location: gs://<MY_BUCKET>/temp
  dpb_dataflow_staging_location: gs://<MY_BUCKET>/temp
  dpb_job_jarfile: ./word-count-beam/target/word-count-beam-bundled-0.1.jar
  dpb_job_classname: org.apache.beam.examples.WordCount

#################################
# Benchmark variations to run
#################################
# One single-worker Dataflow run per machine size, sharing the flags above.
benchmarks:
- dpb_wordcount_benchmark: {
    dpb_service: { service_type: dataflow, worker_count: 1, worker_group: *eight_core },
    flags: *myflags
  }
- dpb_wordcount_benchmark: {
    dpb_service: { service_type: dataflow, worker_count: 1, worker_group: *four_core },
    flags: *myflags
  }
- dpb_wordcount_benchmark: {
    dpb_service: { service_type: dataflow, worker_count: 1, worker_group: *two_core },
    flags: *myflags
  }


#################################
# Alternative benchmark config examples
#################################
# dpb_wordcount_benchmark:
#   description: Run word count on dataflow
#   dpb_service:
#     service_type: dataflow
#     worker_count: 1
#     worker_group:
#       vm_spec:
#         GCP:
#           machine_type: n1-standard-4
#       disk_spec:
#         GCP:
#           disk_size: 300

# dpb_wordcount_benchmark:
#   description: Run word count on dataflow
#   dpb_service:
#     service_type: dataflow
#     worker_count: 1
#     worker_group:
#       vm_spec:
#         GCP:
#           machine_type: n1-standard-4
#       disk_spec:
#         GCP:
#           disk_size: 300
#   flag_matrix: cross_runners
#   flag_matrix_defs:
#     cross_runners:
#       dpb_dataflow_runner: [DataflowRunner, DirectRunner]
27 changes: 22 additions & 5 deletions perfkitbenchmarker/linux_benchmarks/dpb_wordcount_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def Run(benchmark_spec):
base_out = FLAGS.dpb_dataflow_staging_location
else:
base_out = 'gs://{}'.format(FLAGS.dpb_wordcount_out_base)
job_arguments.append('--output={}/output/'.format(base_out))
job_arguments.append('--output={}/output'.format(base_out))
else:
# Use user-provided jar file if present; otherwise use the default example
if not FLAGS.dpb_job_jarfile:
Expand All @@ -159,19 +159,36 @@ def Run(benchmark_spec):

# TODO (saksena): Finalize more stats to gather
results = []
metadata = copy.copy(dpb_service_instance.GetMetadata())
metadata.update({'input_location': input_location})

start = datetime.datetime.now()
start_time = datetime.datetime.now()
dpb_service_instance.SubmitJob(
jarfile=jarfile,
classname=classname,
job_arguments=job_arguments,
job_stdout_file=stdout_file,
job_type=job_type)
end_time = datetime.datetime.now()
run_time = (end_time - start).total_seconds()

# Update metadata after job run to get job id
metadata = copy.copy(dpb_service_instance.GetMetadata())
metadata.update({'input_location': input_location})

run_time = (end_time - start_time).total_seconds()
results.append(sample.Sample('run_time', run_time, 'seconds', metadata))

# TODO(odiego): Refactor to avoid explicit service type checks.
if dpb_service_instance.SERVICE_TYPE == dpb_service.DATAFLOW:
avg_cpu_util = dpb_service_instance.GetAvgCpuUtilization(
start_time, end_time)
results.append(sample.Sample('avg_cpu_util', avg_cpu_util, '%', metadata))

stats = dpb_service_instance.job_stats
for name, value in stats.items():
results.append(sample.Sample(name, value, 'number', metadata))

total_cost = dpb_service_instance.CalculateCost()
results.append(sample.Sample('total_cost', total_cost, '$', metadata))

return results


Expand Down
Loading

0 comments on commit e823566

Please sign in to comment.