Commit 7e74db9
Reformat dpb_sparksql_benchmark.py file.
PiperOrigin-RevId: 565724300
dorellang authored and copybara-github committed Sep 15, 2023
1 parent 6b09e1d · commit 7e74db9
Showing 1 changed file with 92 additions and 45 deletions.
perfkitbenchmarker/linux_benchmarks/dpb_sparksql_benchmark.py: 137 changes (92 additions, 45 deletions)
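
A note on what the reformat does: throughout the hunks below, multi-argument calls that used to pack several arguments per line and close the parenthesis on the final argument are reflowed to one argument per line, with a trailing comma and the closing parenthesis dedented onto its own line. This is the output style of an auto-formatter such as pyink or black (an assumption; the commit message only says the file was reformatted). A minimal sketch of the pattern, using hypothetical flag names rather than ones from the benchmark:

    from absl import flags

    # Old style: arguments share lines and the closing parenthesis rides on the
    # last argument.
    flags.DEFINE_string(
        'example_old', None,
        'Help text for a hypothetical flag.')

    # New style: one argument per line, trailing comma, closing parenthesis on
    # its own line.
    flags.DEFINE_string(
        'example_new',
        None,
        'Help text for a hypothetical flag.',
    )
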
@@ -95,16 +95,19 @@
 flags.DEFINE_string(
     'spark_bigquery_connector',
     None,
-    'The Spark BigQuery Connector jar to pass to the Spark Job')
+    'The Spark BigQuery Connector jar to pass to the Spark Job',
+)
 flags.DEFINE_list(
-    'bigquery_tables', [],
+    'bigquery_tables',
+    [],
     'A list of BigQuery tables to load as Temporary Spark SQL views instead '
-    'of reading from external Hive tables.'
+    'of reading from external Hive tables.',
 )
 flags.DEFINE_string(
-    'bigquery_record_format', None,
+    'bigquery_record_format',
+    None,
     'The record format to use when connecting to BigQuery storage. See: '
-    'https://github.com/GoogleCloudDataproc/spark-bigquery-connector#properties'
+    'https://github.com/GoogleCloudDataproc/spark-bigquery-connector#properties',
 )
 
 FLAGS = flags.FLAGS
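
For context on how flags like the ones defined above are consumed: absl flags become attributes of the global FLAGS object once the command line has been parsed by app.run. A small self-contained sketch, assuming a hypothetical --demo_tables flag rather than the benchmark's own:

    from absl import app
    from absl import flags

    flags.DEFINE_list(
        'demo_tables',
        [],
        'A hypothetical comma-separated list flag.',
    )

    FLAGS = flags.FLAGS


    def main(argv):
      del argv  # unused
      # --demo_tables=proj.ds.a,proj.ds.b arrives here as ['proj.ds.a', 'proj.ds.b'].
      for table in FLAGS.demo_tables:
        print('would register a temporary view for', table)


    if __name__ == '__main__':
      app.run(main)
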
@@ -126,29 +129,38 @@ def CheckPrerequisites(benchmark_config):
   del benchmark_config  # unused
   if not FLAGS.dpb_sparksql_data and FLAGS.dpb_sparksql_create_hive_tables:
     raise errors.Config.InvalidValue(
-        'You must pass dpb_sparksql_data with dpb_sparksql_create_hive_tables')
+        'You must pass dpb_sparksql_data with dpb_sparksql_create_hive_tables'
+    )
   if FLAGS.dpb_sparksql_database and FLAGS.dpb_sparksql_create_hive_tables:
     raise errors.Config.InvalidValue(
-        'You cannot create hive tables in a custom database.')
+        'You cannot create hive tables in a custom database.'
+    )
   if FLAGS.bigquery_tables and not FLAGS.spark_bigquery_connector:
     # Remove if Dataproc ever bundles BigQuery connector
     raise errors.Config.InvalidValue(
         'You must provide the BigQuery connector using '
-        '--spark_bigquery_connector.')
-  if not (FLAGS.dpb_sparksql_data or FLAGS.bigquery_tables or
-          FLAGS.dpb_sparksql_database):
+        '--spark_bigquery_connector.'
+    )
+  if not (
+      FLAGS.dpb_sparksql_data
+      or FLAGS.bigquery_tables
+      or FLAGS.dpb_sparksql_database
+  ):
     # In the case of a static dpb_service, data could pre-exist
     logging.warning(
         'You did not specify --dpb_sparksql_data, --bigquery_tables, '
-        'or dpb_sparksql_database. You will probably not have data to query!')
+        'or dpb_sparksql_database. You will probably not have data to query!'
+    )
   if bool(FLAGS.dpb_sparksql_order) == bool(FLAGS.dpb_sparksql_streams):
     raise errors.Config.InvalidValue(
         'You must specify the queries to run with either --dpb_sparksql_order '
-        'or --dpb_sparksql_streams (but not both).')
+        'or --dpb_sparksql_streams (but not both).'
+    )
   if FLAGS.dpb_sparksql_simultaneous and FLAGS.dpb_sparksql_streams:
     raise errors.Config.InvalidValue(
         '--dpb_sparksql_simultaneous is not compatible with '
-        '--dpb_sparksql_streams.')
+        '--dpb_sparksql_streams.'
+    )
 
 
 def Prepare(benchmark_spec):
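
One detail in CheckPrerequisites worth spelling out: the check bool(FLAGS.dpb_sparksql_order) == bool(FLAGS.dpb_sparksql_streams) rejects both the neither-set and the both-set cases in a single comparison, i.e. it enforces that exactly one of the two flags is given. A standalone sketch of the same idiom (the function name and error text are illustrative, not taken from the benchmark):

    def validate_exactly_one(order: list[str], streams: list[list[str]]) -> None:
      """Raises ValueError unless exactly one of the two inputs is non-empty."""
      # bool() of an empty list is False and of a non-empty list is True, so the
      # two sides are equal exactly when both are empty or both are set, which
      # are the two invalid configurations.
      if bool(order) == bool(streams):
        raise ValueError(
            'Specify the queries with either an order or streams (but not both).'
        )


    validate_exactly_one(['q1', 'q2'], [])    # valid: only an order is given
    # validate_exactly_one([], [])            # would raise: neither is given
    # validate_exactly_one(['q1'], [['q1']])  # would raise: both are given
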
@@ -181,23 +193,28 @@ def Prepare(benchmark_spec):
       if flag == 'destination' and FLAGS.dpb_sparksql_data_compression:
         extra['compression'] = FLAGS.dpb_sparksql_data_compression
       metadata = _GetDistCpMetadata(
-          data_dir, benchmark_spec.table_subdirs, extra_metadata=extra)
+          data_dir, benchmark_spec.table_subdirs, extra_metadata=extra
+      )
       dpb_sparksql_benchmark_helper.StageMetadata(
-          metadata, storage_service, staged_file)
+          metadata, storage_service, staged_file
+      )
       job_arguments += ['--{}-metadata'.format(flag), staged_file]
     try:
       result = cluster.SubmitJob(
           pyspark_file='/'.join([
               cluster.base_dir,
-              dpb_sparksql_benchmark_helper.SPARK_SQL_DISTCP_SCRIPT]),
+              dpb_sparksql_benchmark_helper.SPARK_SQL_DISTCP_SCRIPT,
+          ]),
           job_type=dpb_service.BaseDpbService.PYSPARK_JOB_TYPE,
-          job_arguments=job_arguments)
+          job_arguments=job_arguments,
+      )
       logging.info(result)
       # Tell the benchmark to read from HDFS instead.
       benchmark_spec.data_dir = copy_dirs['destination']
     except dpb_service.JobSubmissionError as e:
       raise errors.Benchmarks.PrepareException(
-          'Copying tables into HDFS failed') from e
+          'Copying tables into HDFS failed'
+      ) from e
 
   # Create external Hive tables
   benchmark_spec.hive_tables_creation_time = None
@@ -207,16 +224,19 @@
       result = cluster.SubmitJob(
           pyspark_file='/'.join([
               cluster.base_dir,
-              dpb_sparksql_benchmark_helper.SPARK_TABLE_SCRIPT]),
+              dpb_sparksql_benchmark_helper.SPARK_TABLE_SCRIPT,
+          ]),
           job_type=dpb_service.BaseDpbService.PYSPARK_JOB_TYPE,
           job_arguments=[
-              benchmark_spec.data_dir, ','.join(benchmark_spec.table_subdirs)
-          ])
+              benchmark_spec.data_dir,
+              ','.join(benchmark_spec.table_subdirs),
+          ],
+      )
       logging.info(result)
     except dpb_service.JobSubmissionError as e:
       raise errors.Benchmarks.PrepareException(
-          'Creating tables from {}/* failed'.format(
-              benchmark_spec.data_dir)) from e
+          'Creating tables from {}/* failed'.format(benchmark_spec.data_dir)
+      ) from e
     end_time = time.time()
     benchmark_spec.hive_tables_creation_time = end_time - start_time
 
@@ -250,7 +270,8 @@ def _GetSampleMetadata(benchmark_spec):
   """Gets metadata dict to be attached to exported benchmark samples/metrics."""
   metadata = benchmark_spec.dpb_service.GetResourceMetadata()
   metadata['benchmark'] = dpb_sparksql_benchmark_helper.BENCHMARK_NAMES[
-      FLAGS.dpb_sparksql_query]
+      FLAGS.dpb_sparksql_query
+  ]
   if FLAGS.bigquery_record_format:
     # This takes higher priority since for BQ dpb_sparksql_data_format actually
     # holds a fully qualified Java class/package name.
@@ -291,7 +312,8 @@ def _RunQueries(benchmark_spec) -> tuple[str, dpb_service.JobResult]:
   if table_metadata:
     table_metadata_file = '/'.join([cluster.base_dir, 'metadata.json'])
     dpb_sparksql_benchmark_helper.StageMetadata(
-        table_metadata, storage_service, table_metadata_file)
+        table_metadata, storage_service, table_metadata_file
+    )
     args += ['--table-metadata', table_metadata_file]
   else:
     # If we don't pass in tables, we must be reading from hive.
@@ -308,17 +330,19 @@ def _RunQueries(benchmark_spec) -> tuple[str, dpb_service.JobResult]:
   job_result = cluster.SubmitJob(
       pyspark_file='/'.join([
           cluster.base_dir,
-          dpb_sparksql_benchmark_helper.SPARK_SQL_RUNNER_SCRIPT]),
+          dpb_sparksql_benchmark_helper.SPARK_SQL_RUNNER_SCRIPT,
+      ]),
       job_arguments=args,
       job_jars=jars,
-      job_type=dpb_service.BaseDpbService.PYSPARK_JOB_TYPE)
+      job_type=dpb_service.BaseDpbService.PYSPARK_JOB_TYPE,
+  )
   return report_dir, job_result
 
 
 def _GetQuerySamples(
     storage_service: object_storage_service.ObjectStorageService,
     report_dir: str,
-    base_metadata: MutableMapping[str, str]
+    base_metadata: MutableMapping[str, str],
 ) -> list[sample.Sample]:
   """Get Sample objects from metrics storage path."""
   # Spark can only write data to directories not files. So do a recursive copy
@@ -328,7 +352,8 @@
   storage_service.Copy(report_dir, temp_run_dir, recursive=True)
   report_files = []
   for dir_name, _, files in os.walk(
-      os.path.join(temp_run_dir, os.path.basename(report_dir))):
+      os.path.join(temp_run_dir, os.path.basename(report_dir))
+  ):
     for filename in files:
       if filename.endswith('.json'):
         report_file = os.path.join(dir_name, filename)
@@ -350,16 +375,22 @@
         if FLAGS.dpb_sparksql_streams:
           metadata_copy['stream'] = result['stream']
         samples.append(
-            sample.Sample('sparksql_run_time', result['duration'], 'seconds',
-                          metadata_copy))
+            sample.Sample(
+                'sparksql_run_time',
+                result['duration'],
+                'seconds',
+                metadata_copy,
+            )
+        )
   return samples
 
 
 def _GetGlobalSamples(
     query_samples: list[sample.Sample],
     cluster: dpb_service.BaseDpbService,
     job_result: dpb_service.JobResult,
-    metadata: MutableMapping[str, str]) -> list[sample.Sample]:
+    metadata: MutableMapping[str, str],
+) -> list[sample.Sample]:
   """Gets samples that summarize the whole benchmark run."""
 
   run_times = {}
@@ -373,31 +404,47 @@
   if FLAGS.dpb_sparksql_streams:
     for i, stream in enumerate(dpb_sparksql_benchmark_helper.GetStreams()):
       metadata[f'failing_queries_stream_{i}'] = ','.join(
-          sorted(set(stream) - passing_queries.get(i, set())))
+          sorted(set(stream) - passing_queries.get(i, set()))
+      )
   else:
     all_passing_queries = set()
     for stream_passing_queries in passing_queries.values():
       all_passing_queries.update(stream_passing_queries)
     metadata['failing_queries'] = ','.join(
-        sorted(set(FLAGS.dpb_sparksql_order) - all_passing_queries))
+        sorted(set(FLAGS.dpb_sparksql_order) - all_passing_queries)
+    )
 
   cluster.metadata.update(metadata)
 
   # TODO(user): Compute aggregated time for each query across streams and
   # iterations.
   samples.append(
-      sample.Sample('sparksql_total_wall_time', job_result.wall_time, 'seconds',
-                    metadata))
+      sample.Sample(
+          'sparksql_total_wall_time', job_result.wall_time, 'seconds', metadata
+      )
+  )
+  samples.append(
+      sample.Sample(
+          'sparksql_geomean_run_time',
+          sample.GeoMean(run_times.values()),
+          'seconds',
+          metadata,
+      )
+  )
   samples.append(
-      sample.Sample('sparksql_geomean_run_time',
-                    sample.GeoMean(run_times.values()), 'seconds', metadata))
-  samples.append(sample.Sample('dpb_sparksql_job_pending',
-                               job_result.pending_time, 'seconds', metadata))
+      sample.Sample(
+          'dpb_sparksql_job_pending',
+          job_result.pending_time,
+          'seconds',
+          metadata,
+      )
+  )
   if FLAGS.dpb_export_job_stats:
     run_cost = cluster.CalculateLastJobCost()
     if run_cost is not None:
       samples.append(
-          sample.Sample('sparksql_run_cost', run_cost, '$', metadata))
+          sample.Sample('sparksql_run_cost', run_cost, '$', metadata)
+      )
   return samples
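
The summary samples above report a geometric mean of the per-query run times via sample.GeoMean. Assuming GeoMean is the ordinary geometric mean (an assumption; only the call site is visible in this diff), the underlying computation is the n-th root of the product of the n run times, usually done in log space:

    import math


    def geo_mean(values):
      """Geometric mean: the n-th root of the product of n positive values."""
      values = list(values)
      if not values or any(v <= 0 for v in values):
        raise ValueError('geometric mean needs at least one positive value')
      # Summing logs instead of multiplying avoids overflow on long runs.
      return math.exp(sum(math.log(v) for v in values) / len(values))


    # Per-query run times in seconds, keyed by query id, as in run_times above:
    run_times = {'q1': 12.0, 'q2': 3.0, 'q3': 48.0}
    print(geo_mean(run_times.values()))  # ~12.0: cube root of 12 * 3 * 48 = 1728
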


@@ -433,10 +480,10 @@ def _GetDistCpMetadata(base_dir: str, subdirs: List[str], extra_metadata=None):
   if not extra_metadata:
     extra_metadata = {}
   for subdir in subdirs or []:
-    metadata += [(FLAGS.dpb_sparksql_data_format or 'parquet', {
-        'path': '/'.join([base_dir, subdir]),
-        **extra_metadata
-    })]
+    metadata += [(
+        FLAGS.dpb_sparksql_data_format or 'parquet',
+        {'path': '/'.join([base_dir, subdir]), **extra_metadata},
+    )]
   return metadata


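
For reference, the loop above produces one (format, options) tuple per table subdirectory, defaulting the format to 'parquet'. A standalone adaptation with the flag replaced by a parameter and hypothetical bucket paths, to show the shape of the result:

    def get_distcp_metadata(base_dir, subdirs, data_format=None, extra_metadata=None):
      """Builds one (format, options) tuple per table subdirectory."""
      metadata = []
      if not extra_metadata:
        extra_metadata = {}
      for subdir in subdirs or []:
        metadata += [(
            data_format or 'parquet',
            {'path': '/'.join([base_dir, subdir]), **extra_metadata},
        )]
      return metadata


    print(get_distcp_metadata(
        'gs://my-bucket/tpcds', ['store_sales', 'item'],
        extra_metadata={'compression': 'snappy'},
    ))
    # [('parquet', {'path': 'gs://my-bucket/tpcds/store_sales', 'compression': 'snappy'}),
    #  ('parquet', {'path': 'gs://my-bucket/tpcds/item', 'compression': 'snappy'})]
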