From 4bf84ef6c1cbafe16b70ff7ba3d3a7904d48f8be Mon Sep 17 00:00:00 2001
From: Diego Orellana
Date: Thu, 11 Jan 2024 13:47:14 -0800
Subject: [PATCH] Drop BigQuery connector support for old Dataproc versions.

PiperOrigin-RevId: 597644312
---
 .../linux_benchmarks/dpb_sparksql_benchmark.py | 13 -------------
 perfkitbenchmarker/providers/gcp/flags.py      |  6 ++++++
 .../providers/gcp/gcp_dpb_dataproc.py          |  5 +++++
 tests/providers/gcp/gcp_dpb_dataproc_test.py   | 14 ++++++++++++++
 4 files changed, 25 insertions(+), 13 deletions(-)

diff --git a/perfkitbenchmarker/linux_benchmarks/dpb_sparksql_benchmark.py b/perfkitbenchmarker/linux_benchmarks/dpb_sparksql_benchmark.py
index e8939aa001..b98a670737 100644
--- a/perfkitbenchmarker/linux_benchmarks/dpb_sparksql_benchmark.py
+++ b/perfkitbenchmarker/linux_benchmarks/dpb_sparksql_benchmark.py
@@ -93,11 +93,6 @@
     worker_count: 2
 """
 
-flags.DEFINE_string(
-    'spark_bigquery_connector',
-    None,
-    'The Spark BigQuery Connector jar to pass to the Spark Job',
-)
 flags.DEFINE_list(
     'bigquery_tables',
     [],
@@ -136,12 +131,6 @@ def CheckPrerequisites(benchmark_config):
     raise errors.Config.InvalidValue(
         'You cannot create hive tables in a custom database.'
     )
-  if FLAGS.bigquery_tables and not FLAGS.spark_bigquery_connector:
-    # Remove if Dataproc ever bundles BigQuery connector
-    raise errors.Config.InvalidValue(
-        'You must provide the BigQuery connector using '
-        '--spark_bigquery_connector.'
-    )
   if not (
       FLAGS.dpb_sparksql_data
       or FLAGS.bigquery_tables
@@ -326,8 +315,6 @@ def _RunQueries(benchmark_spec) -> tuple[str, dpb_service.JobResult]:
   if dpb_sparksql_benchmark_helper.DUMP_SPARK_CONF.value:
     args += ['--dump-spark-conf', os.path.join(cluster.base_dir, 'spark-conf')]
   jars = []
-  if FLAGS.spark_bigquery_connector:
-    jars.append(FLAGS.spark_bigquery_connector)
   job_result = cluster.SubmitJob(
       pyspark_file='/'.join([
           cluster.base_dir,
diff --git a/perfkitbenchmarker/providers/gcp/flags.py b/perfkitbenchmarker/providers/gcp/flags.py
index c4fcc49ad3..e9e2bd3bcd 100644
--- a/perfkitbenchmarker/providers/gcp/flags.py
+++ b/perfkitbenchmarker/providers/gcp/flags.py
@@ -413,6 +413,12 @@
     'multiple zones.',
 )
 
+SPARK_BIGQUERY_CONNECTOR = flags.DEFINE_string(
+    'spark_bigquery_connector',
+    None,
+    'The Spark BigQuery Connector jar to pass to the Spark Job',
+)
+
 
 def _ValidatePreemptFlags(flags_dict):
   if flags_dict['gce_preemptible_vms']:
diff --git a/perfkitbenchmarker/providers/gcp/gcp_dpb_dataproc.py b/perfkitbenchmarker/providers/gcp/gcp_dpb_dataproc.py
index 25c165acc1..ce292b1016 100644
--- a/perfkitbenchmarker/providers/gcp/gcp_dpb_dataproc.py
+++ b/perfkitbenchmarker/providers/gcp/gcp_dpb_dataproc.py
@@ -33,6 +33,7 @@
 from perfkitbenchmarker import provider_info
 from perfkitbenchmarker import vm_util
 from perfkitbenchmarker.linux_packages import aws_credentials
+from perfkitbenchmarker.providers.gcp import flags as gcp_flags
 from perfkitbenchmarker.providers.gcp import gcp_dpb_dataproc_serverless_prices
 from perfkitbenchmarker.providers.gcp import gcs
 from perfkitbenchmarker.providers.gcp import util
@@ -259,6 +260,10 @@ def _Create(self):
     metadata = util.GetDefaultTags()
     metadata.update(flag_util.ParseKeyValuePairs(FLAGS.gcp_instance_metadata))
+    if gcp_flags.SPARK_BIGQUERY_CONNECTOR.value:
+      metadata['SPARK_BQ_CONNECTOR_URL'] = (
+          gcp_flags.SPARK_BIGQUERY_CONNECTOR.value
+      )
     cmd.flags['metadata'] = util.FormatTags(metadata)
     cmd.flags['labels'] = util.MakeFormattedDefaultTags()
     timeout = 900  # 15 min
 
diff --git a/tests/providers/gcp/gcp_dpb_dataproc_test.py b/tests/providers/gcp/gcp_dpb_dataproc_test.py
index 584684ba16..e9df9541b6 100644
--- a/tests/providers/gcp/gcp_dpb_dataproc_test.py
+++ b/tests/providers/gcp/gcp_dpb_dataproc_test.py
@@ -139,6 +139,20 @@ def testCreate(self, mock_issue):
     self.assertIn('--region us-central1', command_string)
     self.assertIn('--zone us-central1-a', command_string)
 
+  @mock.patch.object(
+      vm_util, 'IssueCommand', return_value=('fake_stdout', 'fake_stderr', 0)
+  )
+  def testCreateWithBQConnector(self, mock_issue):
+    FLAGS.spark_bigquery_connector = 'gs://custom-bq-libs/bqconnector.jar'
+    cluster = LocalGcpDpbDataproc(GetClusterSpec())
+    cluster._Create()
+    self.assertEqual(mock_issue.call_count, 1)
+    command_string = ' '.join(mock_issue.call_args[0][0])
+    self.assertIn(
+        '--metadata SPARK_BQ_CONNECTOR_URL=gs://custom-bq-libs/bqconnector.jar',
+        command_string,
+    )
+
   @mock.patch.object(
       vm_util,
       'IssueCommand',
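
Note for reviewers: after this change the connector jar is no longer attached to each Spark job; the GCP provider instead forwards it to `gcloud dataproc clusters create` as the SPARK_BQ_CONNECTOR_URL metadata key, which the Dataproc image can use to pick the connector (the removed "Remove if Dataproc ever bundles BigQuery connector" comment suggests newer images now bundle one). The sketch below isolates that flag-to-metadata plumbing in a self-contained script. Only the flag name and metadata key come from the patch; the build_metadata()/main() harness is illustrative, not PKB code.

from absl import app
from absl import flags

# Mirrors the new definition in perfkitbenchmarker/providers/gcp/flags.py;
# DEFINE_string returns a FlagHolder, read below via .value.
SPARK_BIGQUERY_CONNECTOR = flags.DEFINE_string(
    'spark_bigquery_connector',
    None,
    'The Spark BigQuery Connector jar to pass to the Spark Job',
)


def build_metadata():
  """Sketch of the metadata assembly done in GcpDpbDataproc._Create."""
  metadata = {}  # PKB seeds this from util.GetDefaultTags(); empty here.
  if SPARK_BIGQUERY_CONNECTOR.value:
    # Cluster metadata consumed on the Dataproc side; the old per-job
    # jars.append(...) plumbing in dpb_sparksql_benchmark.py is deleted above.
    metadata['SPARK_BQ_CONNECTOR_URL'] = SPARK_BIGQUERY_CONNECTOR.value
  return metadata


def main(_):
  # gcloud takes --metadata as comma-separated key=value pairs, which is
  # roughly what util.FormatTags produces in the real code.
  pairs = ','.join(f'{k}={v}' for k, v in build_metadata().items())
  print(f'--metadata {pairs}' if pairs else '(no metadata)')


if __name__ == '__main__':
  app.run(main)

Running it as `python sketch.py --spark_bigquery_connector=gs://my-libs/connector.jar` (jar path hypothetical) prints `--metadata SPARK_BQ_CONNECTOR_URL=gs://my-libs/connector.jar`, matching the assertion in testCreateWithBQConnector above.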
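
The new test follows the file's existing pattern: patch vm_util.IssueCommand, call _Create(), and assert on the joined gcloud argv. Below is the same pattern reduced to a runnable, dependency-free form; IssueCommand and BuildCreateCommand here are hypothetical stand-ins for the PKB internals, not real API.

import unittest
from unittest import mock


def IssueCommand(cmd):
  """Hypothetical stand-in for vm_util.IssueCommand (runs a subprocess)."""
  raise NotImplementedError('patched out in tests')


def BuildCreateCommand(connector_url=None):
  """Hypothetical stand-in for the command assembly in _Create."""
  cmd = ['gcloud', 'dataproc', 'clusters', 'create', 'pkb-cluster']
  if connector_url:
    cmd += ['--metadata', f'SPARK_BQ_CONNECTOR_URL={connector_url}']
  return IssueCommand(cmd)


class CreateCommandTest(unittest.TestCase):

  @mock.patch(
      f'{__name__}.IssueCommand',
      return_value=('fake_stdout', 'fake_stderr', 0),
  )
  def testCreateWithBQConnector(self, mock_issue):
    BuildCreateCommand('gs://custom-bq-libs/bqconnector.jar')
    self.assertEqual(mock_issue.call_count, 1)
    # Join argv so the flag and its value can be asserted as one substring.
    command_string = ' '.join(mock_issue.call_args[0][0])
    self.assertIn(
        '--metadata SPARK_BQ_CONNECTOR_URL=gs://custom-bq-libs/bqconnector.jar',
        command_string,
    )


if __name__ == '__main__':
  unittest.main()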