From c2bddbabe98af4f6408eddb935e836b096820bd5 Mon Sep 17 00:00:00 2001 From: Brandon Liu Date: Mon, 11 Sep 2023 14:52:32 -0700 Subject: [PATCH] Move CPU utilization-based YCSB test to the common package. PiperOrigin-RevId: 564509241 --- .../cloud_spanner_ycsb_benchmark.py | 180 ++------------- perfkitbenchmarker/linux_packages/ycsb.py | 210 +++++++++++++++++- .../providers/gcp/gcp_spanner.py | 48 ++-- tests/providers/gcp/gcp_spanner_test.py | 2 +- 4 files changed, 268 insertions(+), 172 deletions(-) diff --git a/perfkitbenchmarker/linux_benchmarks/cloud_spanner_ycsb_benchmark.py b/perfkitbenchmarker/linux_benchmarks/cloud_spanner_ycsb_benchmark.py index ee0ff65871..079a90d4c8 100644 --- a/perfkitbenchmarker/linux_benchmarks/cloud_spanner_ycsb_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/cloud_spanner_ycsb_benchmark.py @@ -16,12 +16,13 @@ By default, this benchmark provision 1 single-CPU VM and spawn 1 thread to test Spanner. Configure the number of VMs via --ycsb_client_vms. -""" -import logging -import time -from typing import Any, Dict, List +In some cases, sleep time in between loading and running may be required for +best performance. See https://cloud.google.com/spanner/docs/pre-warm-database +for best practices. +""" +from typing import Any from absl import flags from perfkitbenchmarker import background_tasks from perfkitbenchmarker import configs @@ -73,60 +74,12 @@ flags.DEFINE_enum('cloud_spanner_ycsb_readmode', 'query', ['query', 'read'], 'The Cloud Spanner read mode used in the YCSB benchmark.') -flags.DEFINE_list('cloud_spanner_ycsb_custom_vm_install_commands', [], - 'A list of strings. If specified, execute them on every ' - 'VM during the installation phase.') -_CPU_OPTIMIZATION = flags.DEFINE_bool( - 'cloud_spanner_ycsb_cpu_optimization', False, - 'Whether to run in CPU-optimized mode. 
The test will increase QPS until ' - 'CPU is between --cloud_spanner_ycsb_cpu_optimization_target and ' - '--cloud_spanner_ycsb_cpu_optimization_target_max.') -_CPU_TARGET_HIGH_PRIORITY = flags.DEFINE_float( - 'cloud_spanner_ycsb_cpu_optimization_target', 0.65, - 'Minimum target CPU utilization at which to stop the test. The default is ' - 'the recommended Spanner high priority CPU utilization, see: ' - 'https://cloud.google.com/spanner/docs/cpu-utilization#recommended-max.') -_CPU_TARGET_HIGH_PRIORITY_UPPER_BOUND = flags.DEFINE_float( - 'cloud_spanner_ycsb_cpu_optimization_target_max', 0.75, - 'Maximum target CPU utilization after which the benchmark will throw an ' - 'exception. This is needed so that in CPU-optimized mode, the increase in ' - 'QPS does not overshoot the target CPU percentage by too much.') -_CPU_OPTIMIZATION_SLEEP_MINUTES = flags.DEFINE_integer( - 'cloud_spanner_ycsb_cpu_optimization_sleep_mins', 0, - 'Time in minutes to sleep in between run steps that increase the target ' - 'QPS. This allows for Spanner to run compactions and other background ' - 'tasks after a write-heavy workload. See ' - 'https://cloud.google.com/spanner/docs/pre-warm-database.') - - -def _ValidateCpuTargetFlags(flags_dict): - return (flags_dict['cloud_spanner_ycsb_cpu_optimization_target_max'] > - flags_dict['cloud_spanner_ycsb_cpu_optimization_target']) - - -flags.register_multi_flags_validator( - [ - 'cloud_spanner_ycsb_cpu_optimization_target', - 'cloud_spanner_ycsb_cpu_optimization_target_max' - ], _ValidateCpuTargetFlags, - 'CPU optimization max target must be greater than target.') - -_CPU_OPTIMIZATION_INCREMENT_MINUTES = flags.DEFINE_integer( - 'cloud_spanner_ycsb_cpu_optimization_workload_mins', 30, - 'Length of time to run YCSB until incrementing QPS.') -_CPU_OPTIMIZATION_MEASUREMENT_MINUTES = flags.DEFINE_integer( - 'cloud_spanner_ycsb_cpu_optimization_measurement_mins', 5, - 'Length of time to measure average CPU at the end of a test. 
For example, ' - 'the default 5 means that only the last 5 minutes of the test will be ' - 'used for representative CPU utilization.') -_STARTING_QPS = flags.DEFINE_integer( - 'cloud_spanner_ycsb_min_target', None, - 'Starting QPS to set as YCSB target. Defaults to a value which uses the ' - 'published throughput expectations for each node, see READ/WRITE caps per ' - 'node below.') -_CPU_OPTIMIZATION_TARGET_QPS_INCREMENT = flags.DEFINE_integer( - 'cloud_spanner_ycsb_cpu_optimization_target_qps_increment', 1000, - 'The amount to increase target QPS by when running in CPU-optimized mode.') +flags.DEFINE_list( + 'cloud_spanner_ycsb_custom_vm_install_commands', + [], + 'A list of strings. If specified, execute them on every ' + 'VM during the installation phase.', +) def GetConfig(user_config): @@ -142,24 +95,15 @@ def CheckPrerequisites(_): for scope in REQUIRED_SCOPES: if scope not in FLAGS.gcloud_scopes: raise ValueError('Scope {0} required.'.format(scope)) - if _CPU_OPTIMIZATION.value: - workloads = ycsb.GetWorkloadFileList() - if len(workloads) != 1: - raise errors.Setup.InvalidFlagConfigurationError( - 'Running with --cloud_spanner_ycsb_cpu_optimization requires using ' - '1 workload file in --ycsb_workload_files.') - if FLAGS.ycsb_dynamic_load: - raise errors.Setup.InvalidFlagConfigurationError( - '--ycsb_dynamic_load and --cloud_spanner_ycsb_cpu_optimization are ' - 'mutually exclusive.') - if _CPU_OPTIMIZATION_INCREMENT_MINUTES.value < ( - _CPU_OPTIMIZATION_MEASUREMENT_MINUTES.value + - gcp_spanner.CPU_API_DELAY_MINUTES): - raise errors.Setup.InvalidFlagConfigurationError( - f'workload_mins {_CPU_OPTIMIZATION_INCREMENT_MINUTES.value} must be ' - 'greater than measurement_mins ' - f'{_CPU_OPTIMIZATION_MEASUREMENT_MINUTES.value} ' - f'+ CPU_API_DELAY_MINUTES {gcp_spanner.CPU_API_DELAY_MINUTES}') + if ycsb.CPU_OPTIMIZATION.value and ( + ycsb.CPU_OPTIMIZATION_MEASUREMENT_MINS.value + <= gcp_spanner.CPU_API_DELAY_MINUTES + ): + raise 
errors.Setup.InvalidFlagConfigurationError( + f'measurement_mins {ycsb.CPU_OPTIMIZATION_MEASUREMENT_MINS.value} must' + ' be greater than CPU_API_DELAY_MINUTES' + f' {gcp_spanner.CPU_API_DELAY_MINUTES}' + ) def Prepare(benchmark_spec): @@ -182,21 +126,6 @@ def Prepare(benchmark_spec): spanner.CreateTables(_BuildSchema()) -def _GetCpuOptimizationMetadata() -> Dict[str, Any]: - return { - 'cloud_spanner_cpu_optimization': - True, - 'cloud_spanner_cpu_target': - _CPU_TARGET_HIGH_PRIORITY.value, - 'cloud_spanner_cpu_increment_minutes': - _CPU_OPTIMIZATION_INCREMENT_MINUTES.value, - 'cloud_spanner_cpu_measurement_minutes': - _CPU_OPTIMIZATION_MEASUREMENT_MINUTES.value, - 'cloud_spanner_cpu_qps_increment': - _CPU_OPTIMIZATION_TARGET_QPS_INCREMENT.value, - } - - def _LoadDatabase(executor: ycsb.YCSBExecutor, spanner: gcp_spanner.GcpSpannerInstance, vms: list[virtual_machine.VirtualMachine], @@ -222,6 +151,7 @@ def Run(benchmark_spec): """ vms = benchmark_spec.vms spanner: gcp_spanner.GcpSpannerInstance = benchmark_spec.relational_db + executor: ycsb.YCSBExecutor = benchmark_spec.executor run_kwargs = { 'table': BENCHMARK_TABLE, @@ -242,14 +172,9 @@ def Run(benchmark_spec): samples = [] metadata = {'ycsb_client_type': 'java'} - samples += _LoadDatabase(benchmark_spec.executor, spanner, vms, load_kwargs) + samples += _LoadDatabase(executor, spanner, vms, load_kwargs) - if _CPU_OPTIMIZATION.value: - samples += CpuUtilizationRun(benchmark_spec.executor, spanner, vms, - run_kwargs) - metadata.update(_GetCpuOptimizationMetadata()) - else: - samples += list(benchmark_spec.executor.Run(vms, run_kwargs=run_kwargs)) + samples += list(executor.Run(vms, run_kwargs=run_kwargs, database=spanner)) for result in samples: result.metadata.update(metadata) @@ -258,65 +183,6 @@ def Run(benchmark_spec): return samples -def _ExtractThroughput(samples: List[sample.Sample]) -> float: - for result in samples: - if result.metric == 'overall Throughput': - return result.value - return 0.0 - - -def 
CpuUtilizationRun(executor: ycsb.YCSBExecutor, - spanner: gcp_spanner.GcpSpannerInstance, - vms: List[virtual_machine.VirtualMachine], - run_kwargs: Dict[str, Any]) -> List[sample.Sample]: - """Runs YCSB until the CPU utilization is over 65%.""" - workload = ycsb.GetWorkloadFileList()[0] - with open(workload) as f: - workload_args = ycsb.ParseWorkload(f.read()) - read_proportion = float(workload_args['readproportion']) - write_proportion = float(workload_args['updateproportion']) - qps = _STARTING_QPS.value or int(spanner.CalculateRecommendedThroughput( - read_proportion, write_proportion) * 0.5) # Leave space for increment. - first_run = True - while True: - run_kwargs['target'] = qps - run_kwargs['maxexecutiontime'] = ( - _CPU_OPTIMIZATION_INCREMENT_MINUTES.value * 60) - run_samples = executor.Run(vms, run_kwargs=run_kwargs) - throughput = _ExtractThroughput(run_samples) - cpu_utilization = spanner.GetAverageCpuUsage( - _CPU_OPTIMIZATION_MEASUREMENT_MINUTES.value) - logging.info( - 'Run had throughput target %s and measured throughput %s, ' - 'with average high-priority CPU utilization %s.', - qps, throughput, cpu_utilization) - if cpu_utilization > _CPU_TARGET_HIGH_PRIORITY.value: - logging.info('CPU utilization is higher than cap %s, stopping test', - _CPU_TARGET_HIGH_PRIORITY.value) - if first_run: - raise errors.Benchmarks.RunError( - f'Initial QPS {qps} already above cpu utilization cap. ' - 'Please lower the starting QPS.') - if cpu_utilization > _CPU_TARGET_HIGH_PRIORITY_UPPER_BOUND.value: - raise errors.Benchmarks.RunError( - f'CPU utilization measured was {cpu_utilization}, over the ' - f'{_CPU_TARGET_HIGH_PRIORITY_UPPER_BOUND.value} threshold. ' - 'Decrease step size to avoid overshooting.') - for s in run_samples: - s.metadata['cloud_spanner_cpu_utilization'] = cpu_utilization - return run_samples - - # Sleep between steps for some workloads. 
- if _CPU_OPTIMIZATION_SLEEP_MINUTES.value: - logging.info( - 'Run phase finished, sleeping for %s minutes before starting the ' - 'next run.', _CPU_OPTIMIZATION_SLEEP_MINUTES.value) - time.sleep(_CPU_OPTIMIZATION_SLEEP_MINUTES.value * 60) - - qps += _CPU_OPTIMIZATION_TARGET_QPS_INCREMENT.value - first_run = False - - def Cleanup(benchmark_spec): """Cleanup. diff --git a/perfkitbenchmarker/linux_packages/ycsb.py b/perfkitbenchmarker/linux_packages/ycsb.py index 408843e810..6048a43bfb 100644 --- a/perfkitbenchmarker/linux_packages/ycsb.py +++ b/perfkitbenchmarker/linux_packages/ycsb.py @@ -50,6 +50,7 @@ from perfkitbenchmarker import errors from perfkitbenchmarker import events from perfkitbenchmarker import linux_packages +from perfkitbenchmarker import resource from perfkitbenchmarker import sample from perfkitbenchmarker import virtual_machine from perfkitbenchmarker import vm_util @@ -298,6 +299,88 @@ 'The maximum error rate allowed for the run. ' 'By default, this allows any number of errors.', ) +CPU_OPTIMIZATION = flags.DEFINE_bool( + 'ycsb_cpu_optimization', + False, + 'Whether to run in CPU-optimized mode. The test will increase QPS until ' + 'CPU is between --ycsb_cpu_optimization_target and ' + '--ycsb_cpu_optimization_target_max.', +) +CPU_OPTIMIZATION_TARGET = flags.DEFINE_float( + 'ycsb_cpu_optimization_target', + 0.70, + 'CPU-optimized mode: minimum target CPU utilization at which to stop the' + ' test. The test will continue to increment until the utilization level is' + ' reached, and then return relevant metrics for that usage level.', +) +CPU_OPTIMIZATION_TARGET_MAX = flags.DEFINE_float( + 'ycsb_cpu_optimization_target_max', + 0.80, + 'CPU-optimized mode: maximum target CPU utilization after which the' + ' benchmark will throw an exception. 
This is needed so the increase in QPS' + ' does not overshoot the target CPU percentage by too much.', +) +_CPU_OPTIMIZATION_SLEEP_MINS = flags.DEFINE_integer( + 'ycsb_cpu_optimization_sleep_mins', + 0, + 'CPU-optimized mode: time in minutes to sleep between run steps when' + ' increasing target QPS.', +) +CPU_OPTIMIZATION_INCREMENT_MINS = flags.DEFINE_integer( + 'ycsb_cpu_optimization_workload_mins', + 30, + 'CPU-optimized mode: length of time to run YCSB until incrementing QPS.', +) +CPU_OPTIMIZATION_MEASUREMENT_MINS = flags.DEFINE_integer( + 'ycsb_cpu_optimization_measurement_mins', + 5, + 'CPU-optimized mode: length of time to measure average CPU at the end of' + ' each step. For example, the default 5 means that only the last 5 minutes' + ' of the test will be used for representative CPU utilization. Must be ' + ' less than or equal to --ycsb_cpu_optimization_workload_mins. Note that ' + ' some APIs can have a delay in reporting results, so increase this' + ' accordingly.', +) +_CPU_OPTIMIZATION_STARTING_QPS = flags.DEFINE_integer( + 'ycsb_cpu_optimization_starting_qps', + None, + 'CPU-optimized mode: starting QPS to set as YCSB target.', +) +CPU_OPTIMIZATION_QPS_INCREMENT = flags.DEFINE_integer( + 'ycsb_cpu_optimization_target_qps_increment', + 1000, + 'CPU-optimized mode: the amount to increase target QPS by.', +) + + +def _ValidateCpuTargetFlags(flags_dict): + return ( + flags_dict['ycsb_cpu_optimization_target_max'] + > flags_dict['ycsb_cpu_optimization_target'] + ) + + +def _ValidateCpuMeasurementFlag(flags_dict): + return ( + flags_dict['ycsb_cpu_optimization_measurement_mins'] + <= flags_dict['ycsb_cpu_optimization_workload_mins'] + ) + + +flags.register_multi_flags_validator( + ['ycsb_cpu_optimization_target', 'ycsb_cpu_optimization_target_max'], + _ValidateCpuTargetFlags, + 'CPU optimization max target must be greater than target.', +) +flags.register_multi_flags_validator( + [ + 'ycsb_cpu_optimization_measurement_mins', + 
'ycsb_cpu_optimization_workload_mins', + ], + _ValidateCpuMeasurementFlag, + 'CPU measurement minutes must be shorter than or equal to workload' + ' duration.', +) # Status line pattern _STATUS_PATTERN = r'(\d+) sec: \d+ operations; (\d+(\.\d+)?) current ops\/sec' @@ -457,6 +540,17 @@ def CheckPrerequisites(): 'unsupported unless additional parsing is added.' ) + if [ + dynamic_load, + _BURST_LOAD_MULTIPLIER.value is not None, + _INCREMENTAL_TARGET_QPS.value is not None, + CPU_OPTIMIZATION.value, + ].count(True) > 1: + raise errors.Setup.InvalidFlagConfigurationError( + '--ycsb_dynamic_load, --ycsb_burst_load, --ycsb_incremental_load, and' + ' --ycsb_cpu_optimization are mutually exclusive.' + ) + @vm_util.Retry(poll_interval=1) def Install(vm): @@ -1049,7 +1143,9 @@ def _HasInsertFailures(result_samples): else: return [] - def Run(self, vms, workloads=None, run_kwargs=None) -> list[sample.Sample]: + def Run( + self, vms, workloads=None, run_kwargs=None, database=None + ) -> list[sample.Sample]: """Runs each workload/client count combination.""" if FLAGS.ycsb_skip_run_stage: return [] @@ -1061,6 +1157,8 @@ def Run(self, vms, workloads=None, run_kwargs=None) -> list[sample.Sample]: samples = self._RunBurstMode(vms, workloads, run_kwargs) elif _INCREMENTAL_TARGET_QPS.value: samples = self._RunIncrementalMode(vms, workloads, run_kwargs) + elif CPU_OPTIMIZATION.value: + samples = self._RunCpuMode(vms, workloads, run_kwargs, database) else: samples = list(self.RunStaircaseLoads(vms, workloads, **run_kwargs)) if ( @@ -1166,6 +1264,116 @@ def _RunIncrementalMode( self._SetRunParameters(run_params) return list(self.RunStaircaseLoads(vms, workloads, **run_kwargs)) + def _RunCpuMode( + self, + vms: list[virtual_machine.VirtualMachine], + workloads: Sequence[str], + run_kwargs: Mapping[str, Any], + database: resource.BaseResource, + ) -> list[sample.Sample]: + """Runs YCSB until the CPU utilization is over the recommended amount. 
+ + Args: + vms: The client VMs that will be used to push the load. + workloads: List of workloads to run. + run_kwargs: A mapping of additional YCSB run args to pass to the test. + database: This class must implement CalculateTheoreticalMaxThroughput and + GetAverageCpuUsage. + + Returns: + A list of samples from the YCSB test at the specified CPU utilization. + """ + + def _ExtractThroughput(samples: list[sample.Sample]) -> float: + """Gets the throughput recorded in the samples.""" + for result in samples: + if result.metric == 'overall Throughput': + return result.value + return 0.0 + + def _GetStartingThroughput(workload: str) -> int: + """Gets the starting throughput to start the test with.""" + with open(workload) as f: + workload_args = ParseWorkload(f.read()) + read_proportion = float(workload_args['readproportion']) + write_proportion = float(workload_args['updateproportion']) + # Multiply by half to leave space for increment. + return _CPU_OPTIMIZATION_STARTING_QPS.value or int( + database.CalculateTheoreticalMaxThroughput( + read_proportion, write_proportion + ) + * 0.5 + ) + + def _RunCpuModeSingleWorkload(workload: str) -> list[sample.Sample]: + """Runs the CPU utilization test for a single workload.""" + qps = _GetStartingThroughput(workload) + first_run = True + while True: + run_kwargs['target'] = qps + run_kwargs['maxexecutiontime'] = ( + CPU_OPTIMIZATION_INCREMENT_MINS.value * 60 + ) + run_samples = self.RunStaircaseLoads( + vms, workloads=workloads, **run_kwargs + ) + throughput = _ExtractThroughput(run_samples) + cpu_utilization = database.GetAverageCpuUsage( + CPU_OPTIMIZATION_MEASUREMENT_MINS.value + ) + logging.info( + 'Run had throughput target %s and measured throughput %s, with CPU' + ' utilization %s.', + qps, + throughput, + cpu_utilization, + ) + if cpu_utilization > CPU_OPTIMIZATION_TARGET.value: + logging.info( + 'CPU utilization is higher than cap %s, stopping test', + CPU_OPTIMIZATION_TARGET.value, + ) + if first_run: + raise 
errors.Benchmarks.RunError( + f'Initial QPS {qps} already above cpu utilization cap. ' + 'Please lower the starting QPS.' + ) + if cpu_utilization > CPU_OPTIMIZATION_TARGET_MAX.value: + raise errors.Benchmarks.RunError( + f'CPU utilization measured was {cpu_utilization}, over the ' + f'{CPU_OPTIMIZATION_TARGET_MAX.value} threshold. ' + 'Decrease step size to avoid overshooting.' + ) + for s in run_samples: + s.metadata.update({ + 'ycsb_cpu_optimization': True, + 'ycsb_cpu_utilization': cpu_utilization, + 'ycsb_cpu_target': CPU_OPTIMIZATION_TARGET.value, + 'ycsb_cpu_target_max': CPU_OPTIMIZATION_TARGET_MAX.value, + 'ycsb_cpu_increment_minutes': ( + CPU_OPTIMIZATION_INCREMENT_MINS.value + ), + 'ycsb_cpu_qps_increment': CPU_OPTIMIZATION_QPS_INCREMENT.value, + }) + return run_samples + + # Sleep between steps for some workloads. + if _CPU_OPTIMIZATION_SLEEP_MINS.value: + logging.info( + 'Run phase finished, sleeping for %s minutes before starting the ' + 'next run.', + _CPU_OPTIMIZATION_SLEEP_MINS.value, + ) + time.sleep(_CPU_OPTIMIZATION_SLEEP_MINS.value * 60) + + qps += CPU_OPTIMIZATION_QPS_INCREMENT.value + first_run = False + + results = [] + for workload in workloads: + results += _RunCpuModeSingleWorkload(workload) + return results + def LoadAndRun(self, vms, workloads=None, load_kwargs=None, run_kwargs=None): """Load data using YCSB, then run each workload/client count combination. 
diff --git a/perfkitbenchmarker/providers/gcp/gcp_spanner.py b/perfkitbenchmarker/providers/gcp/gcp_spanner.py index 1f1dffde45..709434fe0c 100644 --- a/perfkitbenchmarker/providers/gcp/gcp_spanner.py +++ b/perfkitbenchmarker/providers/gcp/gcp_spanner.py @@ -99,7 +99,7 @@ CPU_API_DELAY_SECONDS = CPU_API_DELAY_MINUTES * 60 # For more information on QPS expectations, see -# https://cloud.google.com/spanner/docs/instance-configurations#regional-performance +# https://cloud.google.com/spanner/docs/performance#typical-workloads _READ_OPS_PER_NODE = 10000 _WRITE_OPS_PER_NODE = 2000 @@ -456,16 +456,23 @@ def GetResourceMetadata(self) -> Dict[Any, Any]: def GetAverageCpuUsage(self, duration_minutes: int) -> float: """Gets the average high priority CPU usage through the time duration.""" + if duration_minutes * 60 <= CPU_API_DELAY_SECONDS: + raise ValueError( + f'Spanner API has a {CPU_API_DELAY_SECONDS} sec. delay in receiving' + ' data, choose a longer duration to get CPU usage.' + ) + client = monitoring_v3.MetricServiceClient() # It takes up to 3 minutes for CPU metrics to appear. 
- end_timestamp = time.time() - CPU_API_DELAY_SECONDS cpu_query = query.Query( client, project=self.project, metric_type=( 'spanner.googleapis.com/instance/cpu/utilization_by_priority' ), - end_time=datetime.datetime.utcfromtimestamp(end_timestamp), + end_time=datetime.datetime.fromtimestamp( + time.time(), tz=datetime.timezone.utc + ), minutes=duration_minutes, ) # Filter by high priority @@ -484,24 +491,39 @@ def GetAverageCpuUsage(self, duration_minutes: int) -> float: 'Expected 2 metrics (user and system) for Spanner high-priority CPU ' f'utilization query, got {len(time_series)}' ) - cpu_aggregated = [ - user.value.double_value + system.value.double_value - for user, system in zip(time_series[0].points, time_series[1].points) - ] + logging.info('Instance %s utilization by minute:', self.instance_id) + cpu_aggregated = [] + for user, system in zip(time_series[0].points, time_series[1].points): + point_utilization = user.value.double_value + system.value.double_value + point_time = datetime.datetime.fromtimestamp( + user.interval.start_time.seconds + ) + logging.info('%s: %s', point_time, point_utilization) + cpu_aggregated.append(point_utilization) average_cpu = statistics.mean(cpu_aggregated) - logging.info('CPU aggregated: %s', cpu_aggregated) logging.info( - 'Average CPU for the %s minutes ending at %s: %s', - duration_minutes, - datetime.datetime.fromtimestamp(end_timestamp), + 'Instance %s average CPU utilization: %s', + self.instance_id, average_cpu, ) return average_cpu - def CalculateRecommendedThroughput( + def CalculateTheoreticalMaxThroughput( self, read_proportion: float, write_proportion: float ) -> int: - """Returns the recommended throughput based on the workload and nodes.""" + """Returns the theoretical max throughput based on the workload and nodes. + + This is the published theoretical max throughput of a Spanner node, see + https://cloud.google.com/spanner/docs/cpu-utilization#recommended-max for + more info. 
+ + Args: + read_proportion: the proportion of read requests in the workload. + write_proportion: the proportion of write requests in the workload. + + Returns: + The max total QPS taking into account the published single node limits. + """ if read_proportion + write_proportion != 1: raise errors.Benchmarks.RunError( 'Unrecognized workload, read + write proportion must be equal to 1, ' diff --git a/tests/providers/gcp/gcp_spanner_test.py b/tests/providers/gcp/gcp_spanner_test.py index 2fbb16a427..f5923c3078 100644 --- a/tests/providers/gcp/gcp_spanner_test.py +++ b/tests/providers/gcp/gcp_spanner_test.py @@ -176,7 +176,7 @@ def testCalculateStartingThroughput(self, write_proportion, read_proportion, test_spanner.nodes = 3 # Act - actual_qps = test_spanner.CalculateRecommendedThroughput( + actual_qps = test_spanner.CalculateTheoreticalMaxThroughput( read_proportion, write_proportion) # Assert