Drop BigQuery connector support for old Dataproc versions.
PiperOrigin-RevId: 597644312
dorellang authored and copybara-github committed Jan 11, 2024
1 parent 498a67f commit 4bf84ef
Showing 4 changed files with 25 additions and 13 deletions.
13 changes: 0 additions & 13 deletions perfkitbenchmarker/linux_benchmarks/dpb_sparksql_benchmark.py
@@ -93,11 +93,6 @@
   worker_count: 2
 """
 
-flags.DEFINE_string(
-    'spark_bigquery_connector',
-    None,
-    'The Spark BigQuery Connector jar to pass to the Spark Job',
-)
 flags.DEFINE_list(
     'bigquery_tables',
     [],
@@ -136,12 +131,6 @@ def CheckPrerequisites(benchmark_config):
     raise errors.Config.InvalidValue(
         'You cannot create hive tables in a custom database.'
     )
-  if FLAGS.bigquery_tables and not FLAGS.spark_bigquery_connector:
-    # Remove if Dataproc ever bundles BigQuery connector
-    raise errors.Config.InvalidValue(
-        'You must provide the BigQuery connector using '
-        '--spark_bigquery_connector.'
-    )
   if not (
       FLAGS.dpb_sparksql_data
       or FLAGS.bigquery_tables
@@ -326,8 +315,6 @@ def _RunQueries(benchmark_spec) -> tuple[str, dpb_service.JobResult]:
   if dpb_sparksql_benchmark_helper.DUMP_SPARK_CONF.value:
     args += ['--dump-spark-conf', os.path.join(cluster.base_dir, 'spark-conf')]
   jars = []
-  if FLAGS.spark_bigquery_connector:
-    jars.append(FLAGS.spark_bigquery_connector)
   job_result = cluster.SubmitJob(
       pyspark_file='/'.join([
           cluster.base_dir,
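For context on the removal: previously the benchmark collected the connector jar itself and shipped it with every submitted Spark job; after this commit the jar is attached to the cluster at creation time via instance metadata (see the Dataproc change below). A minimal sketch of the retired per-job plumbing, with _CollectJobJars as a hypothetical helper name:

def _CollectJobJars(flags_obj) -> list[str]:
  # Retired pattern (sketch): pass the connector jar along with each job
  # submission, e.g. via the jars handed to cluster.SubmitJob. After this
  # commit the jar rides on the cluster as metadata instead, so this
  # per-job plumbing goes away.
  jars = []
  if flags_obj.spark_bigquery_connector:
    jars.append(flags_obj.spark_bigquery_connector)
  return jars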
6 changes: 6 additions & 0 deletions perfkitbenchmarker/providers/gcp/flags.py
@@ -413,6 +413,12 @@
     'multiple zones.',
 )
 
+SPARK_BIGQUERY_CONNECTOR = flags.DEFINE_string(
+    'spark_bigquery_connector',
+    None,
+    'The Spark BigQuery Connector jar to pass to the Spark Job',
+)
+
 
 def _ValidatePreemptFlags(flags_dict):
   if flags_dict['gce_preemptible_vms']:
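The flag definition lands here unchanged apart from its new home, so callers can read it either through the returned FlagHolder or through the global FLAGS object. A minimal self-contained sketch, assuming absl's standard FlagHolder semantics, with _GetConnectorJar a hypothetical helper:

from absl import flags

FLAGS = flags.FLAGS

# DEFINE_string returns a FlagHolder; reading .value is typo-safe and makes
# the defining module an explicit import dependency of its consumers.
SPARK_BIGQUERY_CONNECTOR = flags.DEFINE_string(
    'spark_bigquery_connector',
    None,
    'The Spark BigQuery Connector jar to pass to the Spark Job',
)


def _GetConnectorJar():
  # FlagHolder and name-based access read the same global registry (flags
  # must be parsed first, e.g. via app.run); that shared registry is why the
  # new unit test can still set FLAGS.spark_bigquery_connector by name.
  return SPARK_BIGQUERY_CONNECTOR.value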
5 changes: 5 additions & 0 deletions perfkitbenchmarker/providers/gcp/gcp_dpb_dataproc.py
@@ -33,6 +33,7 @@
 from perfkitbenchmarker import provider_info
 from perfkitbenchmarker import vm_util
 from perfkitbenchmarker.linux_packages import aws_credentials
+from perfkitbenchmarker.providers.gcp import flags as gcp_flags
 from perfkitbenchmarker.providers.gcp import gcp_dpb_dataproc_serverless_prices
 from perfkitbenchmarker.providers.gcp import gcs
 from perfkitbenchmarker.providers.gcp import util
@@ -259,6 +260,10 @@ def _Create(self):
 
     metadata = util.GetDefaultTags()
     metadata.update(flag_util.ParseKeyValuePairs(FLAGS.gcp_instance_metadata))
+    if gcp_flags.SPARK_BIGQUERY_CONNECTOR.value:
+      metadata['SPARK_BQ_CONNECTOR_URL'] = (
+          gcp_flags.SPARK_BIGQUERY_CONNECTOR.value
+      )
     cmd.flags['metadata'] = util.FormatTags(metadata)
     cmd.flags['labels'] = util.MakeFormattedDefaultTags()
     timeout = 900  # 15 min
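The metadata dict set in _Create ultimately surfaces on the gcloud invocation as a --metadata key=value flag, which is exactly what the new unit test asserts. A minimal runnable sketch of that formatting step, with FormatTags re-implemented here purely for illustration:

def _FormatTagsForIllustration(tags: dict[str, str]) -> str:
  # Hypothetical stand-in for util.FormatTags: comma-joined key=value pairs.
  return ','.join(f'{k}={v}' for k, v in sorted(tags.items()))

metadata = {'SPARK_BQ_CONNECTOR_URL': 'gs://custom-bq-libs/bqconnector.jar'}
# Prints: --metadata SPARK_BQ_CONNECTOR_URL=gs://custom-bq-libs/bqconnector.jar
# which is the command-line fragment the test below checks for.
print('--metadata ' + _FormatTagsForIllustration(metadata))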
14 changes: 14 additions & 0 deletions tests/providers/gcp/gcp_dpb_dataproc_test.py
@@ -139,6 +139,20 @@ def testCreate(self, mock_issue):
     self.assertIn('--region us-central1', command_string)
     self.assertIn('--zone us-central1-a', command_string)
 
+  @mock.patch.object(
+      vm_util, 'IssueCommand', return_value=('fake_stdout', 'fake_stderr', 0)
+  )
+  def testCreateWithBQConnector(self, mock_issue):
+    FLAGS.spark_bigquery_connector = 'gs://custom-bq-libs/bqconnector.jar'
+    cluster = LocalGcpDpbDataproc(GetClusterSpec())
+    cluster._Create()
+    self.assertEqual(mock_issue.call_count, 1)
+    command_string = ' '.join(mock_issue.call_args[0][0])
+    self.assertIn(
+        '--metadata SPARK_BQ_CONNECTOR_URL=gs://custom-bq-libs/bqconnector.jar',
+        command_string,
+    )
+
   @mock.patch.object(
       vm_util,
       'IssueCommand',
