Skip to content

Commit

Permalink
Added functionality for getting autoscaled slot-second usage for BigQ…
Browse files Browse the repository at this point in the history
…uery runs

PiperOrigin-RevId: 700276445
  • Loading branch information
jnguertin authored and copybara-github committed Dec 6, 2024
1 parent 8ebfc7e commit 3f796d9
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 18 deletions.
41 changes: 41 additions & 0 deletions perfkitbenchmarker/data/edw/bigquery/run_cost_query.sql.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
-- Jinja2 template: computes the total autoscaled slot-seconds billed for one
-- benchmark run. {{project_dot_region}} and {{run_identifier}} are
-- substituted before execution.
WITH
-- Overall [start, end] window spanned by all jobs labeled with this run key.
job_times AS (
  SELECT MIN(start_time) AS start_time, MAX(end_time) AS end_time
  FROM `{{project_dot_region}}`.INFORMATION_SCHEMA.JOBS
  WHERE
    EXISTS(
      SELECT KEY, value
      FROM UNNEST(labels)
      WHERE KEY = 'minimal_run_key' AND VALUE = '{{run_identifier}}'
    )
),
-- One row per second between the (second-truncated) run start and end.
per_second_times AS (
  SELECT * FROM UNNEST(
    GENERATE_TIMESTAMP_ARRAY(
      TIMESTAMP_TRUNC(
        (SELECT start_time FROM job_times), SECOND),
      TIMESTAMP_TRUNC(
        (SELECT end_time FROM job_times), SECOND),
      INTERVAL 1 SECOND))
    AS period_start
),
-- Sparse timeline: attach each reservation autoscale change to the second in
-- which it occurred; seconds with no change carry a NULL autoscale.
res_empty_timeline AS (
  SELECT
    IFNULL(rc.change_timestamp, per_second_times.period_start) AS period_start_sec,
    per_second_times.period_start,
    autoscale
  FROM per_second_times
  LEFT JOIN
    `{{project_dot_region}}`.INFORMATION_SCHEMA.RESERVATION_CHANGES rc
    ON per_second_times.period_start = TIMESTAMP_TRUNC(rc.change_timestamp, SECOND)
),
-- Dense timeline: forward-fill the latest non-NULL autoscale slot count so
-- every second carries the slot capacity in effect at that moment.
res_timeline AS (
  SELECT
    period_start,
    period_start_sec,
    LAST_VALUE(autoscale.current_slots IGNORE NULLS)
      OVER (ORDER BY period_start_sec ASC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
      AS autoscale_current_slots
  FROM res_empty_timeline
)
-- Slot-seconds: sum of per-second autoscaled slot capacity over the window.
SELECT SUM(autoscale_current_slots) AS billed_slot_seconds FROM res_timeline r
24 changes: 24 additions & 0 deletions perfkitbenchmarker/edw_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,13 @@
['JDBC'],
'The Runtime Interface used when interacting with Snowflake.',
)
# Fixes: default was the string 'False' (accepted only by the boolean flag
# parser's string coercion); use a real boolean. Also fixes the
# "propogation" typo in the user-facing help text.
flags.DEFINE_boolean(
    'edw_get_service_auxiliary_metrics',
    False,
    'If set, the benchmark will collect service-specific metrics from the'
    ' remote service after the benchmark has completed. Additional delay may be'
    ' incurred due to the need to wait for metadata propagation.',
)
flags.DEFINE_enum(
'edw_bq_feature_config',
'default',
Expand Down Expand Up @@ -538,3 +545,20 @@ def RequiresWarmUpSuite(self) -> bool:
A boolean value (True) if the warm suite is recommended.
"""
return True

def GetIterationAuxiliaryMetrics(self, iter_run_key: str) -> Dict[str, Any]:
  """Returns service-specific metrics derived from server-side metadata.

  Must be run after the benchmark has completed, since the metrics are
  derived from metadata the remote service records for the run.

  Args:
    iter_run_key: The unique identifier of the run and iteration to fetch
      metrics for.

  Returns:
    A dictionary of the following format:
      { 'metric_1': { 'value': 1, 'unit': 'imperial femtoseconds' },
        'metric_2': { 'value': 2, 'unit': 'metric dollars' }
        ...}

  Raises:
    NotImplementedError: Always, in this base class. Subclasses that support
      auxiliary metrics must override this method.
  """
  raise NotImplementedError
93 changes: 75 additions & 18 deletions perfkitbenchmarker/providers/gcp/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import logging
import os
import re
from typing import Any

from absl import flags
from perfkitbenchmarker import data
Expand Down Expand Up @@ -56,9 +57,29 @@
}


class GenericClientInterface(edw_service.EdwClientInterface):
  """Generic Client Interface class for BigQuery.

  Attributes:
    project_id: String name of the BigQuery project to benchmark
    dataset_id: String name of the BigQuery dataset to benchmark
  """

  def __init__(self, project_id: str, dataset_id: str):
    # NOTE(review): does not call super().__init__(); presumably the base
    # class initializer is optional -- confirm against EdwClientInterface.
    self.project_id = project_id
    self.dataset_id = dataset_id

  def GetMetadata(self) -> dict[str, str]:
    """Gets the Metadata attributes for the Client Interface."""
    # Reports which concrete client implementation was selected via flag.
    return {'client': FLAGS.bq_client_interface}

  def RunQueryWithResults(self, query_name: str) -> str:
    """Executes a query, returning performance details and query output.

    Args:
      query_name: Name of the query file to execute.

    Returns:
      Raw output of the client invocation as a string.

    Raises:
      NotImplementedError: Always, in this generic base; concrete client
        interfaces that support result-bearing queries must override.
    """
    raise NotImplementedError


def GetBigQueryClientInterface(
project_id: str, dataset_id: str
) -> edw_service.EdwClientInterface:
) -> GenericClientInterface:
"""Builds and Returns the requested BigQuery client Interface.
Args:
Expand All @@ -82,23 +103,6 @@ def GetBigQueryClientInterface(
raise RuntimeError('Unknown BigQuery Client Interface requested.')


class GenericClientInterface(edw_service.EdwClientInterface):
"""Generic Client Interface class for BigQuery.
Attributes:
project_id: String name of the BigQuery project to benchmark
dataset_id: String name of the BigQuery dataset to benchmark
"""

def __init__(self, project_id: str, dataset_id: str):
self.project_id = project_id
self.dataset_id = dataset_id

def GetMetadata(self) -> dict[str, str]:
"""Gets the Metadata attributes for the Client Interface."""
return {'client': FLAGS.bq_client_interface}


class CliClientInterface(GenericClientInterface):
"""Command Line Client Interface class for BigQuery.
Expand Down Expand Up @@ -446,6 +450,16 @@ def ExecuteThroughput(
stdout, _ = self.client_vm.RemoteCommand(cmd)
return stdout

def RunQueryWithResults(self, query_name: str) -> str:
  """Runs one query on the client VM, returning perf details and results.

  Args:
    query_name: Name of the query file (already on the client VM) to run.

  Returns:
    Raw stdout of the remote client invocation.
  """
  # Assemble the remote invocation of the Python driver in 'single' mode,
  # asking it to print the query results alongside timing details.
  run_cmd = ' '.join([
      'python3',
      BQ_PYTHON_CLIENT_FILE,
      'single',
      '--project', self.project_id,
      '--credentials_file', self.key_file_name,
      '--dataset', self.dataset_id,
      '--query_file', query_name,
      '--print_results',
  ])
  output, _ = self.client_vm.RemoteCommand(run_cmd)
  return output


class Bigquery(edw_service.EdwService):
"""Object representing a Bigquery cluster.
Expand All @@ -456,6 +470,8 @@ class Bigquery(edw_service.EdwService):

CLOUD = provider_info.GCP
SERVICE_TYPE = 'bigquery'
RUN_COST_QUERY_TEMPLATE = 'edw/bigquery/run_cost_query.sql.j2'
client_interface: GenericClientInterface

def __init__(self, edw_service_spec):
super().__init__(edw_service_spec)
Expand Down Expand Up @@ -672,6 +688,18 @@ def LoadDataset(
cmd.append(f'{project_dataset}.{table}')
vm_util.IssueCommand(cmd)

def GetDatasetRegion(self, dataset=None):
  """Get the region that a dataset resides in."""
  # Ask the bq CLI for the dataset's metadata as JSON and pull out the
  # 'location' field, lower-cased (BigQuery region names are lowercase).
  show_cmd = [
      'bq',
      'show',
      '--format=prettyjson',
      self.FormatProjectAndDatasetForCommand(dataset),
  ]
  stdout, _, _ = vm_util.IssueCommand(show_cmd)
  location = json.loads(str(stdout))['location']
  return str(location).lower()

def OpenDataset(self, dataset: str):
  """Sets the client interface's active dataset to `dataset`."""
  self.client_interface.dataset_id = dataset

Expand All @@ -682,6 +710,35 @@ def CopyTable(self, copy_table_name: str, to_dataset: str) -> None:
cmd = ['bq', 'cp', source, dest]
vm_util.IssueCommand(cmd)

def GetAutoscaleSlotSeconds(self, run_iter_id: str) -> int:
  """Computes the autoscaled slot-seconds billed for one benchmark run.

  Renders the run-cost SQL template onto the client VM, executes it through
  the client interface, and extracts the billed slot-second total from the
  JSON-formatted query results.

  Args:
    run_iter_id: Unique identifier of the run/iteration to compute cost for.

  Returns:
    Total autoscaled slot-seconds billed for the run.
  """
  client = self.client_interface
  rendered_query = f'cost_query_{run_iter_id}'
  region = self.GetDatasetRegion()
  # Values substituted into the Jinja2 SQL template; INFORMATION_SCHEMA is
  # addressed as '<project>.region-<region>'.
  template_context = {
      'run_identifier': run_iter_id,
      'project_dot_region': f'{client.project_id}.region-{region}',
  }
  client.client_vm.RenderTemplate(
      data.ResourcePath(self.RUN_COST_QUERY_TEMPLATE),
      rendered_query,
      template_context,
  )
  results = json.loads(client.RunQueryWithResults(rendered_query))
  return results['details']['query_results']['billed_slot_seconds'][0]

def GetIterationAuxiliaryMetrics(self, iter_run_key: str) -> dict[str, Any]:
  """Collects BigQuery-specific metrics for a completed run iteration.

  Args:
    iter_run_key: Unique identifier of the run/iteration to fetch metrics for.

  Returns:
    A mapping from metric name to {'value': ..., 'unit': ...}; empty when the
    configured client interface does not support metadata queries.
  """
  try:
    slot_seconds = self.GetAutoscaleSlotSeconds(iter_run_key)
  except NotImplementedError:
    # No metrics support in client interface.
    return {}
  return {
      'edw_bq_autoscale_slot_seconds': {
          'value': slot_seconds,
          'unit': 'slot-seconds',
      }
  }


class Endor(Bigquery):
"""Class representing BigQuery Endor service."""
Expand Down

0 comments on commit 3f796d9

Please sign in to comment.