Skip to content

Commit

Permalink
Move CPU utilization-based YCSB test to the common package.
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 563876305
  • Loading branch information
bvliu authored and copybara-github committed Sep 11, 2023
1 parent ebd1f4e commit f98e84c
Show file tree
Hide file tree
Showing 4 changed files with 268 additions and 172 deletions.
180 changes: 23 additions & 157 deletions perfkitbenchmarker/linux_benchmarks/cloud_spanner_ycsb_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@
By default, this benchmark provision 1 single-CPU VM and spawn 1 thread
to test Spanner. Configure the number of VMs via --ycsb_client_vms.
"""
import logging
import time
from typing import Any, Dict, List
In some cases, sleep time in between loading and running may be required for
best performance. See https://cloud.google.com/spanner/docs/pre-warm-database
for best practices.
"""

from typing import Any
from absl import flags
from perfkitbenchmarker import background_tasks
from perfkitbenchmarker import configs
Expand Down Expand Up @@ -73,60 +74,12 @@
flags.DEFINE_enum('cloud_spanner_ycsb_readmode',
'query', ['query', 'read'],
'The Cloud Spanner read mode used in the YCSB benchmark.')
flags.DEFINE_list('cloud_spanner_ycsb_custom_vm_install_commands', [],
'A list of strings. If specified, execute them on every '
'VM during the installation phase.')
_CPU_OPTIMIZATION = flags.DEFINE_bool(
'cloud_spanner_ycsb_cpu_optimization', False,
'Whether to run in CPU-optimized mode. The test will increase QPS until '
'CPU is between --cloud_spanner_ycsb_cpu_optimization_target and '
'--cloud_spanner_ycsb_cpu_optimization_target_max.')
_CPU_TARGET_HIGH_PRIORITY = flags.DEFINE_float(
'cloud_spanner_ycsb_cpu_optimization_target', 0.65,
'Minimum target CPU utilization at which to stop the test. The default is '
'the recommended Spanner high priority CPU utilization, see: '
'https://cloud.google.com/spanner/docs/cpu-utilization#recommended-max.')
_CPU_TARGET_HIGH_PRIORITY_UPPER_BOUND = flags.DEFINE_float(
'cloud_spanner_ycsb_cpu_optimization_target_max', 0.75,
'Maximum target CPU utilization after which the benchmark will throw an '
'exception. This is needed so that in CPU-optimized mode, the increase in '
'QPS does not overshoot the target CPU percentage by too much.')
_CPU_OPTIMIZATION_SLEEP_MINUTES = flags.DEFINE_integer(
'cloud_spanner_ycsb_cpu_optimization_sleep_mins', 0,
'Time in minutes to sleep in between run steps that increase the target '
'QPS. This allows for Spanner to run compactions and other background '
'tasks after a write-heavy workload. See '
'https://cloud.google.com/spanner/docs/pre-warm-database.')


def _ValidateCpuTargetFlags(flags_dict):
return (flags_dict['cloud_spanner_ycsb_cpu_optimization_target_max'] >
flags_dict['cloud_spanner_ycsb_cpu_optimization_target'])


flags.register_multi_flags_validator(
[
'cloud_spanner_ycsb_cpu_optimization_target',
'cloud_spanner_ycsb_cpu_optimization_target_max'
], _ValidateCpuTargetFlags,
'CPU optimization max target must be greater than target.')

_CPU_OPTIMIZATION_INCREMENT_MINUTES = flags.DEFINE_integer(
'cloud_spanner_ycsb_cpu_optimization_workload_mins', 30,
'Length of time to run YCSB until incrementing QPS.')
_CPU_OPTIMIZATION_MEASUREMENT_MINUTES = flags.DEFINE_integer(
'cloud_spanner_ycsb_cpu_optimization_measurement_mins', 5,
'Length of time to measure average CPU at the end of a test. For example, '
'the default 5 means that only the last 5 minutes of the test will be '
'used for representative CPU utilization.')
_STARTING_QPS = flags.DEFINE_integer(
'cloud_spanner_ycsb_min_target', None,
'Starting QPS to set as YCSB target. Defaults to a value which uses the '
'published throughput expectations for each node, see READ/WRITE caps per '
'node below.')
_CPU_OPTIMIZATION_TARGET_QPS_INCREMENT = flags.DEFINE_integer(
'cloud_spanner_ycsb_cpu_optimization_target_qps_increment', 1000,
'The amount to increase target QPS by when running in CPU-optimized mode.')
flags.DEFINE_list(
'cloud_spanner_ycsb_custom_vm_install_commands',
[],
'A list of strings. If specified, execute them on every '
'VM during the installation phase.',
)


def GetConfig(user_config):
Expand All @@ -142,24 +95,15 @@ def CheckPrerequisites(_):
for scope in REQUIRED_SCOPES:
if scope not in FLAGS.gcloud_scopes:
raise ValueError('Scope {0} required.'.format(scope))
if _CPU_OPTIMIZATION.value:
workloads = ycsb.GetWorkloadFileList()
if len(workloads) != 1:
raise errors.Setup.InvalidFlagConfigurationError(
'Running with --cloud_spanner_ycsb_cpu_optimization requires using '
'1 workload file in --ycsb_workload_files.')
if FLAGS.ycsb_dynamic_load:
raise errors.Setup.InvalidFlagConfigurationError(
'--ycsb_dynamic_load and --cloud_spanner_ycsb_cpu_optimization are '
'mutually exclusive.')
if _CPU_OPTIMIZATION_INCREMENT_MINUTES.value < (
_CPU_OPTIMIZATION_MEASUREMENT_MINUTES.value +
gcp_spanner.CPU_API_DELAY_MINUTES):
raise errors.Setup.InvalidFlagConfigurationError(
f'workload_mins {_CPU_OPTIMIZATION_INCREMENT_MINUTES.value} must be '
'greater than measurement_mins '
f'{_CPU_OPTIMIZATION_MEASUREMENT_MINUTES.value} '
f'+ CPU_API_DELAY_MINUTES {gcp_spanner.CPU_API_DELAY_MINUTES}')
if ycsb.CPU_OPTIMIZATION.value and (
ycsb.CPU_OPTIMIZATION_MEASUREMENT_MINS.value
<= gcp_spanner.CPU_API_DELAY_MINUTES
):
raise errors.Setup.InvalidFlagConfigurationError(
f'measurement_mins {ycsb.CPU_OPTIMIZATION_MEASUREMENT_MINS.value} must'
' be greater than CPU_API_DELAY_MINUTES'
f' {gcp_spanner.CPU_API_DELAY_MINUTES}'
)


def Prepare(benchmark_spec):
Expand All @@ -182,21 +126,6 @@ def Prepare(benchmark_spec):
spanner.CreateTables(_BuildSchema())


def _GetCpuOptimizationMetadata() -> Dict[str, Any]:
return {
'cloud_spanner_cpu_optimization':
True,
'cloud_spanner_cpu_target':
_CPU_TARGET_HIGH_PRIORITY.value,
'cloud_spanner_cpu_increment_minutes':
_CPU_OPTIMIZATION_INCREMENT_MINUTES.value,
'cloud_spanner_cpu_measurement_minutes':
_CPU_OPTIMIZATION_MEASUREMENT_MINUTES.value,
'cloud_spanner_cpu_qps_increment':
_CPU_OPTIMIZATION_TARGET_QPS_INCREMENT.value,
}


def _LoadDatabase(executor: ycsb.YCSBExecutor,
spanner: gcp_spanner.GcpSpannerInstance,
vms: list[virtual_machine.VirtualMachine],
Expand All @@ -222,6 +151,7 @@ def Run(benchmark_spec):
"""
vms = benchmark_spec.vms
spanner: gcp_spanner.GcpSpannerInstance = benchmark_spec.relational_db
executor: ycsb.YCSBExecutor = benchmark_spec.executor

run_kwargs = {
'table': BENCHMARK_TABLE,
Expand All @@ -242,14 +172,9 @@ def Run(benchmark_spec):
samples = []
metadata = {'ycsb_client_type': 'java'}

samples += _LoadDatabase(benchmark_spec.executor, spanner, vms, load_kwargs)
samples += _LoadDatabase(executor, spanner, vms, load_kwargs)

if _CPU_OPTIMIZATION.value:
samples += CpuUtilizationRun(benchmark_spec.executor, spanner, vms,
run_kwargs)
metadata.update(_GetCpuOptimizationMetadata())
else:
samples += list(benchmark_spec.executor.Run(vms, run_kwargs=run_kwargs))
samples += list(executor.Run(vms, run_kwargs=run_kwargs, database=spanner))

for result in samples:
result.metadata.update(metadata)
Expand All @@ -258,65 +183,6 @@ def Run(benchmark_spec):
return samples


def _ExtractThroughput(samples: List[sample.Sample]) -> float:
for result in samples:
if result.metric == 'overall Throughput':
return result.value
return 0.0


def CpuUtilizationRun(executor: ycsb.YCSBExecutor,
spanner: gcp_spanner.GcpSpannerInstance,
vms: List[virtual_machine.VirtualMachine],
run_kwargs: Dict[str, Any]) -> List[sample.Sample]:
"""Runs YCSB until the CPU utilization is over 65%."""
workload = ycsb.GetWorkloadFileList()[0]
with open(workload) as f:
workload_args = ycsb.ParseWorkload(f.read())
read_proportion = float(workload_args['readproportion'])
write_proportion = float(workload_args['updateproportion'])
qps = _STARTING_QPS.value or int(spanner.CalculateRecommendedThroughput(
read_proportion, write_proportion) * 0.5) # Leave space for increment.
first_run = True
while True:
run_kwargs['target'] = qps
run_kwargs['maxexecutiontime'] = (
_CPU_OPTIMIZATION_INCREMENT_MINUTES.value * 60)
run_samples = executor.Run(vms, run_kwargs=run_kwargs)
throughput = _ExtractThroughput(run_samples)
cpu_utilization = spanner.GetAverageCpuUsage(
_CPU_OPTIMIZATION_MEASUREMENT_MINUTES.value)
logging.info(
'Run had throughput target %s and measured throughput %s, '
'with average high-priority CPU utilization %s.',
qps, throughput, cpu_utilization)
if cpu_utilization > _CPU_TARGET_HIGH_PRIORITY.value:
logging.info('CPU utilization is higher than cap %s, stopping test',
_CPU_TARGET_HIGH_PRIORITY.value)
if first_run:
raise errors.Benchmarks.RunError(
f'Initial QPS {qps} already above cpu utilization cap. '
'Please lower the starting QPS.')
if cpu_utilization > _CPU_TARGET_HIGH_PRIORITY_UPPER_BOUND.value:
raise errors.Benchmarks.RunError(
f'CPU utilization measured was {cpu_utilization}, over the '
f'{_CPU_TARGET_HIGH_PRIORITY_UPPER_BOUND.value} threshold. '
'Decrease step size to avoid overshooting.')
for s in run_samples:
s.metadata['cloud_spanner_cpu_utilization'] = cpu_utilization
return run_samples

# Sleep between steps for some workloads.
if _CPU_OPTIMIZATION_SLEEP_MINUTES.value:
logging.info(
'Run phase finished, sleeping for %s minutes before starting the '
'next run.', _CPU_OPTIMIZATION_SLEEP_MINUTES.value)
time.sleep(_CPU_OPTIMIZATION_SLEEP_MINUTES.value * 60)

qps += _CPU_OPTIMIZATION_TARGET_QPS_INCREMENT.value
first_run = False


def Cleanup(benchmark_spec):
"""Cleanup.
Expand Down
Loading

0 comments on commit f98e84c

Please sign in to comment.