Reformat dpb_sparksql_benchmark.py file. #4474

Merged 1 commit on Sep 15, 2023.
137 changes: 92 additions & 45 deletions perfkitbenchmarker/linux_benchmarks/dpb_sparksql_benchmark.py
@@ -95,16 +95,19 @@
flags.DEFINE_string(
'spark_bigquery_connector',
None,
'The Spark BigQuery Connector jar to pass to the Spark Job')
'The Spark BigQuery Connector jar to pass to the Spark Job',
)
flags.DEFINE_list(
'bigquery_tables', [],
'bigquery_tables',
[],
'A list of BigQuery tables to load as Temporary Spark SQL views instead '
'of reading from external Hive tables.'
'of reading from external Hive tables.',
)
flags.DEFINE_string(
'bigquery_record_format', None,
'bigquery_record_format',
None,
'The record format to use when connecting to BigQuery storage. See: '
'https://github.com/GoogleCloudDataproc/spark-bigquery-connector#properties'
'https://github.com/GoogleCloudDataproc/spark-bigquery-connector#properties',
)

FLAGS = flags.FLAGS
@@ -126,29 +129,38 @@ def CheckPrerequisites(benchmark_config):
del benchmark_config # unused
if not FLAGS.dpb_sparksql_data and FLAGS.dpb_sparksql_create_hive_tables:
raise errors.Config.InvalidValue(
'You must pass dpb_sparksql_data with dpb_sparksql_create_hive_tables')
'You must pass dpb_sparksql_data with dpb_sparksql_create_hive_tables'
)
if FLAGS.dpb_sparksql_database and FLAGS.dpb_sparksql_create_hive_tables:
raise errors.Config.InvalidValue(
'You cannot create hive tables in a custom database.')
'You cannot create hive tables in a custom database.'
)
if FLAGS.bigquery_tables and not FLAGS.spark_bigquery_connector:
# Remove if Dataproc ever bundles BigQuery connector
raise errors.Config.InvalidValue(
'You must provide the BigQuery connector using '
'--spark_bigquery_connector.')
if not (FLAGS.dpb_sparksql_data or FLAGS.bigquery_tables or
FLAGS.dpb_sparksql_database):
'--spark_bigquery_connector.'
)
if not (
FLAGS.dpb_sparksql_data
or FLAGS.bigquery_tables
or FLAGS.dpb_sparksql_database
):
# In the case of a static dpb_service, data could pre-exist
logging.warning(
'You did not specify --dpb_sparksql_data, --bigquery_tables, '
'or dpb_sparksql_database. You will probably not have data to query!')
'or dpb_sparksql_database. You will probably not have data to query!'
)
if bool(FLAGS.dpb_sparksql_order) == bool(FLAGS.dpb_sparksql_streams):
raise errors.Config.InvalidValue(
'You must specify the queries to run with either --dpb_sparksql_order '
'or --dpb_sparksql_streams (but not both).')
'or --dpb_sparksql_streams (but not both).'
)
if FLAGS.dpb_sparksql_simultaneous and FLAGS.dpb_sparksql_streams:
raise errors.Config.InvalidValue(
'--dpb_sparksql_simultaneous is not compatible with '
'--dpb_sparksql_streams.')
'--dpb_sparksql_streams.'
)


def Prepare(benchmark_spec):
@@ -181,23 +193,28 @@ def Prepare(benchmark_spec):
if flag == 'destination' and FLAGS.dpb_sparksql_data_compression:
extra['compression'] = FLAGS.dpb_sparksql_data_compression
metadata = _GetDistCpMetadata(
data_dir, benchmark_spec.table_subdirs, extra_metadata=extra)
data_dir, benchmark_spec.table_subdirs, extra_metadata=extra
)
dpb_sparksql_benchmark_helper.StageMetadata(
metadata, storage_service, staged_file)
metadata, storage_service, staged_file
)
job_arguments += ['--{}-metadata'.format(flag), staged_file]
try:
result = cluster.SubmitJob(
pyspark_file='/'.join([
cluster.base_dir,
dpb_sparksql_benchmark_helper.SPARK_SQL_DISTCP_SCRIPT]),
dpb_sparksql_benchmark_helper.SPARK_SQL_DISTCP_SCRIPT,
]),
job_type=dpb_service.BaseDpbService.PYSPARK_JOB_TYPE,
job_arguments=job_arguments)
job_arguments=job_arguments,
)
logging.info(result)
# Tell the benchmark to read from HDFS instead.
benchmark_spec.data_dir = copy_dirs['destination']
except dpb_service.JobSubmissionError as e:
raise errors.Benchmarks.PrepareException(
'Copying tables into HDFS failed') from e
'Copying tables into HDFS failed'
) from e

# Create external Hive tables
benchmark_spec.hive_tables_creation_time = None
@@ -207,16 +224,19 @@
result = cluster.SubmitJob(
pyspark_file='/'.join([
cluster.base_dir,
dpb_sparksql_benchmark_helper.SPARK_TABLE_SCRIPT]),
dpb_sparksql_benchmark_helper.SPARK_TABLE_SCRIPT,
]),
job_type=dpb_service.BaseDpbService.PYSPARK_JOB_TYPE,
job_arguments=[
benchmark_spec.data_dir, ','.join(benchmark_spec.table_subdirs)
])
benchmark_spec.data_dir,
','.join(benchmark_spec.table_subdirs),
],
)
logging.info(result)
except dpb_service.JobSubmissionError as e:
raise errors.Benchmarks.PrepareException(
'Creating tables from {}/* failed'.format(
benchmark_spec.data_dir)) from e
'Creating tables from {}/* failed'.format(benchmark_spec.data_dir)
) from e
end_time = time.time()
benchmark_spec.hive_tables_creation_time = end_time - start_time

Expand Down Expand Up @@ -250,7 +270,8 @@ def _GetSampleMetadata(benchmark_spec):
"""Gets metadata dict to be attached to exported benchmark samples/metrics."""
metadata = benchmark_spec.dpb_service.GetResourceMetadata()
metadata['benchmark'] = dpb_sparksql_benchmark_helper.BENCHMARK_NAMES[
FLAGS.dpb_sparksql_query]
FLAGS.dpb_sparksql_query
]
if FLAGS.bigquery_record_format:
# This takes higher priority since for BQ dpb_sparksql_data_format actually
# holds a fully qualified Java class/package name.
@@ -291,7 +312,8 @@ def _RunQueries(benchmark_spec) -> tuple[str, dpb_service.JobResult]:
if table_metadata:
table_metadata_file = '/'.join([cluster.base_dir, 'metadata.json'])
dpb_sparksql_benchmark_helper.StageMetadata(
table_metadata, storage_service, table_metadata_file)
table_metadata, storage_service, table_metadata_file
)
args += ['--table-metadata', table_metadata_file]
else:
# If we don't pass in tables, we must be reading from hive.
@@ -308,17 +330,19 @@ def _RunQueries(benchmark_spec) -> tuple[str, dpb_service.JobResult]:
job_result = cluster.SubmitJob(
pyspark_file='/'.join([
cluster.base_dir,
dpb_sparksql_benchmark_helper.SPARK_SQL_RUNNER_SCRIPT]),
dpb_sparksql_benchmark_helper.SPARK_SQL_RUNNER_SCRIPT,
]),
job_arguments=args,
job_jars=jars,
job_type=dpb_service.BaseDpbService.PYSPARK_JOB_TYPE)
job_type=dpb_service.BaseDpbService.PYSPARK_JOB_TYPE,
)
return report_dir, job_result


def _GetQuerySamples(
storage_service: object_storage_service.ObjectStorageService,
report_dir: str,
base_metadata: MutableMapping[str, str]
base_metadata: MutableMapping[str, str],
) -> list[sample.Sample]:
"""Get Sample objects from metrics storage path."""
# Spark can only write data to directories not files. So do a recursive copy
@@ -328,7 +352,8 @@ def _GetQuerySamples(
storage_service.Copy(report_dir, temp_run_dir, recursive=True)
report_files = []
for dir_name, _, files in os.walk(
os.path.join(temp_run_dir, os.path.basename(report_dir))):
os.path.join(temp_run_dir, os.path.basename(report_dir))
):
for filename in files:
if filename.endswith('.json'):
report_file = os.path.join(dir_name, filename)
@@ -350,16 +375,22 @@ def _GetQuerySamples(
if FLAGS.dpb_sparksql_streams:
metadata_copy['stream'] = result['stream']
samples.append(
sample.Sample('sparksql_run_time', result['duration'], 'seconds',
metadata_copy))
sample.Sample(
'sparksql_run_time',
result['duration'],
'seconds',
metadata_copy,
)
)
return samples


def _GetGlobalSamples(
query_samples: list[sample.Sample],
cluster: dpb_service.BaseDpbService,
job_result: dpb_service.JobResult,
metadata: MutableMapping[str, str]) -> list[sample.Sample]:
metadata: MutableMapping[str, str],
) -> list[sample.Sample]:
"""Gets samples that summarize the whole benchmark run."""

run_times = {}
@@ -373,31 +404,47 @@ def _GetGlobalSamples(
if FLAGS.dpb_sparksql_streams:
for i, stream in enumerate(dpb_sparksql_benchmark_helper.GetStreams()):
metadata[f'failing_queries_stream_{i}'] = ','.join(
sorted(set(stream) - passing_queries.get(i, set())))
sorted(set(stream) - passing_queries.get(i, set()))
)
else:
all_passing_queries = set()
for stream_passing_queries in passing_queries.values():
all_passing_queries.update(stream_passing_queries)
metadata['failing_queries'] = ','.join(
sorted(set(FLAGS.dpb_sparksql_order) - all_passing_queries))
sorted(set(FLAGS.dpb_sparksql_order) - all_passing_queries)
)

cluster.metadata.update(metadata)

# TODO(user): Compute aggregated time for each query across streams and
# iterations.
samples.append(
sample.Sample('sparksql_total_wall_time', job_result.wall_time, 'seconds',
metadata))
sample.Sample(
'sparksql_total_wall_time', job_result.wall_time, 'seconds', metadata
)
)
samples.append(
sample.Sample(
'sparksql_geomean_run_time',
sample.GeoMean(run_times.values()),
'seconds',
metadata,
)
)
samples.append(
sample.Sample('sparksql_geomean_run_time',
sample.GeoMean(run_times.values()), 'seconds', metadata))
samples.append(sample.Sample('dpb_sparksql_job_pending',
job_result.pending_time, 'seconds', metadata))
sample.Sample(
'dpb_sparksql_job_pending',
job_result.pending_time,
'seconds',
metadata,
)
)
if FLAGS.dpb_export_job_stats:
run_cost = cluster.CalculateLastJobCost()
if run_cost is not None:
samples.append(
sample.Sample('sparksql_run_cost', run_cost, '$', metadata))
sample.Sample('sparksql_run_cost', run_cost, '$', metadata)
)
return samples


@@ -433,10 +480,10 @@ def _GetDistCpMetadata(base_dir: str, subdirs: List[str], extra_metadata=None):
if not extra_metadata:
extra_metadata = {}
for subdir in subdirs or []:
metadata += [(FLAGS.dpb_sparksql_data_format or 'parquet', {
'path': '/'.join([base_dir, subdir]),
**extra_metadata
})]
metadata += [(
FLAGS.dpb_sparksql_data_format or 'parquet',
{'path': '/'.join([base_dir, subdir]), **extra_metadata},
)]
return metadata
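
Note: taken together, the hunks above apply one consistent style change rather than any behavior change: multi-line call arguments are split one per line with a trailing comma, and the closing parenthesis moves to its own line. This is the output you would expect from an auto-formatter in the Google Python style (e.g. pyink/black); the PR itself does not name the tool, so that attribution is an assumption. A minimal sketch of the pattern, using a hypothetical flag name rather than one from this file:

    from absl import flags

    # Before: last argument and closing parenthesis share a line.
    flags.DEFINE_string(
        'example_flag', None,
        'Help text for the flag.')

    # After: one argument per line, trailing comma, closing parenthesis dedented.
    flags.DEFINE_string(
        'example_flag',
        None,
        'Help text for the flag.',
    )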

