diff --git a/perfkitbenchmarker/linux_benchmarks/cloud_spanner_ycsb_benchmark.py b/perfkitbenchmarker/linux_benchmarks/cloud_spanner_ycsb_benchmark.py
index ee0ff65871..079a90d4c8 100644
--- a/perfkitbenchmarker/linux_benchmarks/cloud_spanner_ycsb_benchmark.py
+++ b/perfkitbenchmarker/linux_benchmarks/cloud_spanner_ycsb_benchmark.py
@@ -16,12 +16,13 @@
 
 By default, this benchmark provision 1 single-CPU VM and spawn 1 thread
 to test Spanner. Configure the number of VMs via --ycsb_client_vms.
-"""
 
-import logging
-import time
-from typing import Any, Dict, List
+In some cases, sleep time in between loading and running may be required for
+best performance. See https://cloud.google.com/spanner/docs/pre-warm-database
+for best practices.
+"""
 
+from typing import Any
 from absl import flags
 from perfkitbenchmarker import background_tasks
 from perfkitbenchmarker import configs
@@ -73,60 +74,12 @@
 flags.DEFINE_enum('cloud_spanner_ycsb_readmode',
                   'query', ['query', 'read'],
                   'The Cloud Spanner read mode used in the YCSB benchmark.')
-flags.DEFINE_list('cloud_spanner_ycsb_custom_vm_install_commands', [],
-                  'A list of strings. If specified, execute them on every '
-                  'VM during the installation phase.')
-_CPU_OPTIMIZATION = flags.DEFINE_bool(
-    'cloud_spanner_ycsb_cpu_optimization', False,
-    'Whether to run in CPU-optimized mode. The test will increase QPS until '
-    'CPU is between --cloud_spanner_ycsb_cpu_optimization_target and '
-    '--cloud_spanner_ycsb_cpu_optimization_target_max.')
-_CPU_TARGET_HIGH_PRIORITY = flags.DEFINE_float(
-    'cloud_spanner_ycsb_cpu_optimization_target', 0.65,
-    'Minimum target CPU utilization at which to stop the test. The default is '
-    'the recommended Spanner high priority CPU utilization, see: '
-    'https://cloud.google.com/spanner/docs/cpu-utilization#recommended-max.')
-_CPU_TARGET_HIGH_PRIORITY_UPPER_BOUND = flags.DEFINE_float(
-    'cloud_spanner_ycsb_cpu_optimization_target_max', 0.75,
-    'Maximum target CPU utilization after which the benchmark will throw an '
-    'exception. This is needed so that in CPU-optimized mode, the increase in '
-    'QPS does not overshoot the target CPU percentage by too much.')
-_CPU_OPTIMIZATION_SLEEP_MINUTES = flags.DEFINE_integer(
-    'cloud_spanner_ycsb_cpu_optimization_sleep_mins', 0,
-    'Time in minutes to sleep in between run steps that increase the target '
-    'QPS. This allows for Spanner to run compactions and other background '
-    'tasks after a write-heavy workload. See '
-    'https://cloud.google.com/spanner/docs/pre-warm-database.')
-
-
-def _ValidateCpuTargetFlags(flags_dict):
-  return (flags_dict['cloud_spanner_ycsb_cpu_optimization_target_max'] >
-          flags_dict['cloud_spanner_ycsb_cpu_optimization_target'])
-
-
-flags.register_multi_flags_validator(
-    [
-        'cloud_spanner_ycsb_cpu_optimization_target',
-        'cloud_spanner_ycsb_cpu_optimization_target_max'
-    ], _ValidateCpuTargetFlags,
-    'CPU optimization max target must be greater than target.')
-
-_CPU_OPTIMIZATION_INCREMENT_MINUTES = flags.DEFINE_integer(
-    'cloud_spanner_ycsb_cpu_optimization_workload_mins', 30,
-    'Length of time to run YCSB until incrementing QPS.')
-_CPU_OPTIMIZATION_MEASUREMENT_MINUTES = flags.DEFINE_integer(
-    'cloud_spanner_ycsb_cpu_optimization_measurement_mins', 5,
-    'Length of time to measure average CPU at the end of a test. For example, '
-    'the default 5 means that only the last 5 minutes of the test will be '
-    'used for representative CPU utilization.')
-_STARTING_QPS = flags.DEFINE_integer(
-    'cloud_spanner_ycsb_min_target', None,
-    'Starting QPS to set as YCSB target. Defaults to a value which uses the '
-    'published throughput expectations for each node, see READ/WRITE caps per '
-    'node below.')
-_CPU_OPTIMIZATION_TARGET_QPS_INCREMENT = flags.DEFINE_integer(
-    'cloud_spanner_ycsb_cpu_optimization_target_qps_increment', 1000,
-    'The amount to increase target QPS by when running in CPU-optimized mode.')
+flags.DEFINE_list(
+    'cloud_spanner_ycsb_custom_vm_install_commands',
+    [],
+    'A list of strings. If specified, execute them on every '
+    'VM during the installation phase.',
+)
 
 
 def GetConfig(user_config):
@@ -142,24 +95,15 @@ def CheckPrerequisites(_):
   for scope in REQUIRED_SCOPES:
     if scope not in FLAGS.gcloud_scopes:
       raise ValueError('Scope {0} required.'.format(scope))
-  if _CPU_OPTIMIZATION.value:
-    workloads = ycsb.GetWorkloadFileList()
-    if len(workloads) != 1:
-      raise errors.Setup.InvalidFlagConfigurationError(
-          'Running with --cloud_spanner_ycsb_cpu_optimization requires using '
-          '1 workload file in --ycsb_workload_files.')
-    if FLAGS.ycsb_dynamic_load:
-      raise errors.Setup.InvalidFlagConfigurationError(
-          '--ycsb_dynamic_load and --cloud_spanner_ycsb_cpu_optimization are '
-          'mutually exclusive.')
-    if _CPU_OPTIMIZATION_INCREMENT_MINUTES.value < (
-        _CPU_OPTIMIZATION_MEASUREMENT_MINUTES.value +
-        gcp_spanner.CPU_API_DELAY_MINUTES):
-      raise errors.Setup.InvalidFlagConfigurationError(
-          f'workload_mins {_CPU_OPTIMIZATION_INCREMENT_MINUTES.value} must be '
-          'greater than measurement_mins '
-          f'{_CPU_OPTIMIZATION_MEASUREMENT_MINUTES.value} '
-          f'+ CPU_API_DELAY_MINUTES {gcp_spanner.CPU_API_DELAY_MINUTES}')
+  if ycsb.CPU_OPTIMIZATION.value and (
+      ycsb.CPU_OPTIMIZATION_MEASUREMENT_MINS.value
+      <= gcp_spanner.CPU_API_DELAY_MINUTES
+  ):
+    raise errors.Setup.InvalidFlagConfigurationError(
+        f'measurement_mins {ycsb.CPU_OPTIMIZATION_MEASUREMENT_MINS.value} must'
+        ' be greater than CPU_API_DELAY_MINUTES'
+        f' {gcp_spanner.CPU_API_DELAY_MINUTES}'
+    )
 
 
 def Prepare(benchmark_spec):
@@ -182,21 +126,6 @@ def Prepare(benchmark_spec):
   spanner.CreateTables(_BuildSchema())
 
 
-def _GetCpuOptimizationMetadata() -> Dict[str, Any]:
-  return {
-      'cloud_spanner_cpu_optimization':
-          True,
-      'cloud_spanner_cpu_target':
-          _CPU_TARGET_HIGH_PRIORITY.value,
-      'cloud_spanner_cpu_increment_minutes':
-          _CPU_OPTIMIZATION_INCREMENT_MINUTES.value,
-      'cloud_spanner_cpu_measurement_minutes':
-          _CPU_OPTIMIZATION_MEASUREMENT_MINUTES.value,
-      'cloud_spanner_cpu_qps_increment':
-          _CPU_OPTIMIZATION_TARGET_QPS_INCREMENT.value,
-  }
-
-
 def _LoadDatabase(executor: ycsb.YCSBExecutor,
                   spanner: gcp_spanner.GcpSpannerInstance,
                   vms: list[virtual_machine.VirtualMachine],
@@ -222,6 +151,7 @@ def Run(benchmark_spec):
   """
   vms = benchmark_spec.vms
   spanner: gcp_spanner.GcpSpannerInstance = benchmark_spec.relational_db
+  executor: ycsb.YCSBExecutor = benchmark_spec.executor
 
   run_kwargs = {
       'table': BENCHMARK_TABLE,
@@ -242,14 +172,9 @@ def Run(benchmark_spec):
   samples = []
   metadata = {'ycsb_client_type': 'java'}
 
-  samples += _LoadDatabase(benchmark_spec.executor, spanner, vms, load_kwargs)
+  samples += _LoadDatabase(executor, spanner, vms, load_kwargs)
 
-  if _CPU_OPTIMIZATION.value:
-    samples += CpuUtilizationRun(benchmark_spec.executor, spanner, vms,
-                                 run_kwargs)
-    metadata.update(_GetCpuOptimizationMetadata())
-  else:
-    samples += list(benchmark_spec.executor.Run(vms, run_kwargs=run_kwargs))
+  samples += list(executor.Run(vms, run_kwargs=run_kwargs, database=spanner))
 
   for result in samples:
     result.metadata.update(metadata)
@@ -258,65 +183,6 @@ def Run(benchmark_spec):
   return samples
 
 
-def _ExtractThroughput(samples: List[sample.Sample]) -> float:
-  for result in samples:
-    if result.metric == 'overall Throughput':
-      return result.value
-  return 0.0
-
-
-def CpuUtilizationRun(executor: ycsb.YCSBExecutor,
-                      spanner: gcp_spanner.GcpSpannerInstance,
-                      vms: List[virtual_machine.VirtualMachine],
-                      run_kwargs: Dict[str, Any]) -> List[sample.Sample]:
-  """Runs YCSB until the CPU utilization is over 65%."""
-  workload = ycsb.GetWorkloadFileList()[0]
-  with open(workload) as f:
-    workload_args = ycsb.ParseWorkload(f.read())
-  read_proportion = float(workload_args['readproportion'])
-  write_proportion = float(workload_args['updateproportion'])
-  qps = _STARTING_QPS.value or int(spanner.CalculateRecommendedThroughput(
-      read_proportion, write_proportion) * 0.5)  # Leave space for increment.
-  first_run = True
-  while True:
-    run_kwargs['target'] = qps
-    run_kwargs['maxexecutiontime'] = (
-        _CPU_OPTIMIZATION_INCREMENT_MINUTES.value * 60)
-    run_samples = executor.Run(vms, run_kwargs=run_kwargs)
-    throughput = _ExtractThroughput(run_samples)
-    cpu_utilization = spanner.GetAverageCpuUsage(
-        _CPU_OPTIMIZATION_MEASUREMENT_MINUTES.value)
-    logging.info(
-        'Run had throughput target %s and measured throughput %s, '
-        'with average high-priority CPU utilization %s.',
-        qps, throughput, cpu_utilization)
-    if cpu_utilization > _CPU_TARGET_HIGH_PRIORITY.value:
-      logging.info('CPU utilization is higher than cap %s, stopping test',
-                   _CPU_TARGET_HIGH_PRIORITY.value)
-      if first_run:
-        raise errors.Benchmarks.RunError(
-            f'Initial QPS {qps} already above cpu utilization cap. '
-            'Please lower the starting QPS.')
-      if cpu_utilization > _CPU_TARGET_HIGH_PRIORITY_UPPER_BOUND.value:
-        raise errors.Benchmarks.RunError(
-            f'CPU utilization measured was {cpu_utilization}, over the '
-            f'{_CPU_TARGET_HIGH_PRIORITY_UPPER_BOUND.value} threshold. '
-            'Decrease step size to avoid overshooting.')
-      for s in run_samples:
-        s.metadata['cloud_spanner_cpu_utilization'] = cpu_utilization
-      return run_samples
-
-    # Sleep between steps for some workloads.
-    if _CPU_OPTIMIZATION_SLEEP_MINUTES.value:
-      logging.info(
-          'Run phase finished, sleeping for %s minutes before starting the '
-          'next run.', _CPU_OPTIMIZATION_SLEEP_MINUTES.value)
-      time.sleep(_CPU_OPTIMIZATION_SLEEP_MINUTES.value * 60)
-
-    qps += _CPU_OPTIMIZATION_TARGET_QPS_INCREMENT.value
-    first_run = False
-
-
 def Cleanup(benchmark_spec):
   """Cleanup.
 
diff --git a/perfkitbenchmarker/linux_packages/ycsb.py b/perfkitbenchmarker/linux_packages/ycsb.py
index 408843e810..6048a43bfb 100644
--- a/perfkitbenchmarker/linux_packages/ycsb.py
+++ b/perfkitbenchmarker/linux_packages/ycsb.py
@@ -50,6 +50,7 @@
 from perfkitbenchmarker import errors
 from perfkitbenchmarker import events
 from perfkitbenchmarker import linux_packages
+from perfkitbenchmarker import resource
 from perfkitbenchmarker import sample
 from perfkitbenchmarker import virtual_machine
 from perfkitbenchmarker import vm_util
@@ -298,6 +299,88 @@
     'The maximum error rate allowed for the run. '
     'By default, this allows any number of errors.',
 )
+CPU_OPTIMIZATION = flags.DEFINE_bool(
+    'ycsb_cpu_optimization',
+    False,
+    'Whether to run in CPU-optimized mode. The test will increase QPS until '
+    'CPU is between --ycsb_cpu_optimization_target and '
+    '--ycsb_cpu_optimization_target_max.',
+)
+CPU_OPTIMIZATION_TARGET = flags.DEFINE_float(
+    'ycsb_cpu_optimization_target',
+    0.70,
+    'CPU-optimized mode: minimum target CPU utilization at which to stop the'
+    ' test. The test will continue to increment until the utilization level is'
+    ' reached, and then return relevant metrics for that usage level.',
+)
+CPU_OPTIMIZATION_TARGET_MAX = flags.DEFINE_float(
+    'ycsb_cpu_optimization_target_max',
+    0.80,
+    'CPU-optimized mode: maximum target CPU utilization after which the'
+    ' benchmark will throw an exception. This is needed so the increase in QPS'
+    ' does not overshoot the target CPU percentage by too much.',
+)
+_CPU_OPTIMIZATION_SLEEP_MINS = flags.DEFINE_integer(
+    'ycsb_cpu_optimization_sleep_mins',
+    0,
+    'CPU-optimized mode: time in minutes to sleep between run steps when'
+    ' increasing target QPS.',
+)
+CPU_OPTIMIZATION_INCREMENT_MINS = flags.DEFINE_integer(
+    'ycsb_cpu_optimization_workload_mins',
+    30,
+    'CPU-optimized mode: length of time to run YCSB until incrementing QPS.',
+)
+CPU_OPTIMIZATION_MEASUREMENT_MINS = flags.DEFINE_integer(
+    'ycsb_cpu_optimization_measurement_mins',
+    5,
+    'CPU-optimized mode: length of time to measure average CPU at the end of'
+    ' each step. For example, the default 5 means that only the last 5 minutes'
+    ' of the test will be used for representative CPU utilization. Must be '
+    ' less than or equal to --ycsb_cpu_optimization_workload_mins. Note that '
+    ' some APIs can have a delay in reporting results, so increase this'
+    ' accordingly.',
+)
+_CPU_OPTIMIZATION_STARTING_QPS = flags.DEFINE_integer(
+    'ycsb_cpu_optimization_starting_qps',
+    None,
+    'CPU-optimized mode: starting QPS to set as YCSB target.',
+)
+CPU_OPTIMIZATION_QPS_INCREMENT = flags.DEFINE_integer(
+    'ycsb_cpu_optimization_target_qps_increment',
+    1000,
+    'CPU-optimized mode: the amount to increase target QPS by.',
+)
+
+
+def _ValidateCpuTargetFlags(flags_dict):
+  return (
+      flags_dict['ycsb_cpu_optimization_target_max']
+      > flags_dict['ycsb_cpu_optimization_target']
+  )
+
+
+def _ValidateCpuMeasurementFlag(flags_dict):
+  return (
+      flags_dict['ycsb_cpu_optimization_measurement_mins']
+      <= flags_dict['ycsb_cpu_optimization_workload_mins']
+  )
+
+
+flags.register_multi_flags_validator(
+    ['ycsb_cpu_optimization_target', 'ycsb_cpu_optimization_target_max'],
+    _ValidateCpuTargetFlags,
+    'CPU optimization max target must be greater than target.',
+)
+flags.register_multi_flags_validator(
+    [
+        'ycsb_cpu_optimization_measurement_mins',
+        'ycsb_cpu_optimization_workload_mins',
+    ],
+    _ValidateCpuMeasurementFlag,
+    'CPU measurement minutes must be shorter than or equal to workload'
+    ' duration.',
+)
 
 # Status line pattern
 _STATUS_PATTERN = r'(\d+) sec: \d+ operations; (\d+(\.\d+)?) current ops\/sec'
@@ -457,6 +540,17 @@ def CheckPrerequisites():
         'unsupported unless additional parsing is added.'
     )
 
+  if [
+      dynamic_load,
+      _BURST_LOAD_MULTIPLIER.value is not None,
+      _INCREMENTAL_TARGET_QPS.value is not None,
+      CPU_OPTIMIZATION.value,
+  ].count(True) > 1:
+    raise errors.Setup.InvalidFlagConfigurationError(
+        '--ycsb_dynamic_load, --ycsb_burst_load, --ycsb_incremental_load, and'
+        ' --ycsb_cpu_optimization are mutually exclusive.'
+    )
+
 
 @vm_util.Retry(poll_interval=1)
 def Install(vm):
@@ -1049,7 +1143,9 @@ def _HasInsertFailures(result_samples):
     else:
       return []
 
-  def Run(self, vms, workloads=None, run_kwargs=None) -> list[sample.Sample]:
+  def Run(
+      self, vms, workloads=None, run_kwargs=None, database=None
+  ) -> list[sample.Sample]:
     """Runs each workload/client count combination."""
     if FLAGS.ycsb_skip_run_stage:
       return []
@@ -1061,6 +1157,8 @@ def Run(self, vms, workloads=None, run_kwargs=None) -> list[sample.Sample]:
       samples = self._RunBurstMode(vms, workloads, run_kwargs)
     elif _INCREMENTAL_TARGET_QPS.value:
       samples = self._RunIncrementalMode(vms, workloads, run_kwargs)
+    elif CPU_OPTIMIZATION.value:
+      samples = self._RunCpuMode(vms, workloads, run_kwargs, database)
     else:
       samples = list(self.RunStaircaseLoads(vms, workloads, **run_kwargs))
     if (
@@ -1166,6 +1264,116 @@ def _RunIncrementalMode(
     self._SetRunParameters(run_params)
     return list(self.RunStaircaseLoads(vms, workloads, **run_kwargs))
 
+  def _RunCpuMode(
+      self,
+      vms: list[virtual_machine.VirtualMachine],
+      workloads: Sequence[str],
+      run_kwargs: Mapping[str, Any],
+      database: resource.BaseResource,
+  ) -> list[sample.Sample]:
+    """Runs YCSB until the CPU utilization is over the recommended amount.
+
+    Args:
+      vms: The client VMs that will be used to push the load.
+      workloads: List of workloads to run.
+      run_kwargs: A mapping of additional YCSB run args to pass to the test.
+      database: This class must implement CalculateTheoreticalMaxThroughput and
+        GetAverageCpuUsage.
+
+    Returns:
+      A list of samples from the YCSB test at the specified CPU utilization.
+    """
+
+    def _ExtractThroughput(samples: list[sample.Sample]) -> float:
+      """Gets the throughput recorded in the samples."""
+      for result in samples:
+        if result.metric == 'overall Throughput':
+          return result.value
+      return 0.0
+
+    def _GetStartingThroughput(workload: str) -> int:
+      """Gets the starting throughput to start the test with."""
+      with open(workload) as f:
+        workload_args = ParseWorkload(f.read())
+      read_proportion = float(workload_args['readproportion'])
+      write_proportion = float(workload_args['updateproportion'])
+      # Multiply by half to leave space for increment.
+      return _CPU_OPTIMIZATION_STARTING_QPS.value or int(
+          database.CalculateTheoreticalMaxThroughput(
+              read_proportion, write_proportion
+          )
+          * 0.5
+      )
+
+    def _RunCpuModeSingleWorkload(workload: str) -> list[sample.Sample]:
+      """Runs the CPU utilization test for a single workload."""
+      qps = _GetStartingThroughput(workload)
+      first_run = True
+      while True:
+        run_kwargs['target'] = qps
+        run_kwargs['maxexecutiontime'] = (
+            CPU_OPTIMIZATION_INCREMENT_MINS.value * 60
+        )
+        run_samples = self.RunStaircaseLoads(
+            vms, workloads=workloads, **run_kwargs
+        )
+        throughput = _ExtractThroughput(run_samples)
+        cpu_utilization = database.GetAverageCpuUsage(
+            CPU_OPTIMIZATION_MEASUREMENT_MINS.value
+        )
+        logging.info(
+            'Run had throughput target %s and measured throughput %s, with CPU'
+            ' utilization %s.',
+            qps,
+            throughput,
+            cpu_utilization,
+        )
+        if cpu_utilization > CPU_OPTIMIZATION_TARGET.value:
+          logging.info(
+              'CPU utilization is higher than cap %s, stopping test',
+              CPU_OPTIMIZATION_TARGET.value,
+          )
+          if first_run:
+            raise errors.Benchmarks.RunError(
+                f'Initial QPS {qps} already above cpu utilization cap. '
+                'Please lower the starting QPS.'
+            )
+          if cpu_utilization > CPU_OPTIMIZATION_TARGET_MAX.value:
+            raise errors.Benchmarks.RunError(
+                f'CPU utilization measured was {cpu_utilization}, over the '
+                f'{CPU_OPTIMIZATION_TARGET_MAX.value} threshold. '
+                'Decrease step size to avoid overshooting.'
+            )
+          for s in run_samples:
+            s.metadata.update({
+                'ycsb_cpu_optimization': True,
+                'ycsb_cpu_utilization': cpu_utilization,
+                'ycsb_cpu_target': CPU_OPTIMIZATION_TARGET.value,
+                'ycsb_cpu_target_max': CPU_OPTIMIZATION_TARGET_MAX.value,
+                'ycsb_cpu_increment_minutes': (
+                    CPU_OPTIMIZATION_INCREMENT_MINS.value
+                ),
+                'ycsb_cpu_qps_increment': CPU_OPTIMIZATION_QPS_INCREMENT.value,
+            })
+          return run_samples
+
+        # Sleep between steps for some workloads.
+        if _CPU_OPTIMIZATION_SLEEP_MINS.value:
+          logging.info(
+              'Run phase finished, sleeping for %s minutes before starting the '
+              'next run.',
+              _CPU_OPTIMIZATION_SLEEP_MINS.value,
+          )
+          time.sleep(_CPU_OPTIMIZATION_SLEEP_MINS.value * 60)
+
+        qps += CPU_OPTIMIZATION_QPS_INCREMENT.value
+        first_run = False
+
+    results = []
+    for workload in workloads:
+      results += _RunCpuModeSingleWorkload(workload)
+    return results
+
   def LoadAndRun(self, vms, workloads=None, load_kwargs=None, run_kwargs=None):
     """Load data using YCSB, then run each workload/client count combination.
 
diff --git a/perfkitbenchmarker/providers/gcp/gcp_spanner.py b/perfkitbenchmarker/providers/gcp/gcp_spanner.py
index 1f1dffde45..709434fe0c 100644
--- a/perfkitbenchmarker/providers/gcp/gcp_spanner.py
+++ b/perfkitbenchmarker/providers/gcp/gcp_spanner.py
@@ -99,7 +99,7 @@
 CPU_API_DELAY_SECONDS = CPU_API_DELAY_MINUTES * 60
 
 # For more information on QPS expectations, see
-# https://cloud.google.com/spanner/docs/instance-configurations#regional-performance
+# https://cloud.google.com/spanner/docs/performance#typical-workloads
 _READ_OPS_PER_NODE = 10000
 _WRITE_OPS_PER_NODE = 2000
 
@@ -456,16 +456,23 @@ def GetResourceMetadata(self) -> Dict[Any, Any]:
 
   def GetAverageCpuUsage(self, duration_minutes: int) -> float:
     """Gets the average high priority CPU usage through the time duration."""
+    if duration_minutes * 60 <= CPU_API_DELAY_SECONDS:
+      raise ValueError(
+          f'Spanner API has a {CPU_API_DELAY_SECONDS} sec. delay in receiving'
+          ' data, choose a longer duration to get CPU usage.'
+      )
+
     client = monitoring_v3.MetricServiceClient()
     # It takes up to 3 minutes for CPU metrics to appear.
-    end_timestamp = time.time() - CPU_API_DELAY_SECONDS
     cpu_query = query.Query(
         client,
         project=self.project,
         metric_type=(
             'spanner.googleapis.com/instance/cpu/utilization_by_priority'
         ),
-        end_time=datetime.datetime.utcfromtimestamp(end_timestamp),
+        end_time=datetime.datetime.fromtimestamp(
+            time.time(), tz=datetime.timezone.utc
+        ),
         minutes=duration_minutes,
     )
     # Filter by high priority
@@ -484,24 +491,39 @@ def GetAverageCpuUsage(self, duration_minutes: int) -> float:
           'Expected 2 metrics (user and system) for Spanner high-priority CPU '
           f'utilization query, got {len(time_series)}'
       )
-    cpu_aggregated = [
-        user.value.double_value + system.value.double_value
-        for user, system in zip(time_series[0].points, time_series[1].points)
-    ]
+    logging.info('Instance %s utilization by minute:', self.instance_id)
+    cpu_aggregated = []
+    for user, system in zip(time_series[0].points, time_series[1].points):
+      point_utilization = user.value.double_value + system.value.double_value
+      point_time = datetime.datetime.fromtimestamp(
+          user.interval.start_time.seconds
+      )
+      logging.info('%s: %s', point_time, point_utilization)
+      cpu_aggregated.append(point_utilization)
     average_cpu = statistics.mean(cpu_aggregated)
-    logging.info('CPU aggregated: %s', cpu_aggregated)
     logging.info(
-        'Average CPU for the %s minutes ending at %s: %s',
-        duration_minutes,
-        datetime.datetime.fromtimestamp(end_timestamp),
+        'Instance %s average CPU utilization: %s',
+        self.instance_id,
         average_cpu,
     )
     return average_cpu
 
-  def CalculateRecommendedThroughput(
+  def CalculateTheoreticalMaxThroughput(
       self, read_proportion: float, write_proportion: float
   ) -> int:
-    """Returns the recommended throughput based on the workload and nodes."""
+    """Returns the theoretical max throughput based on the workload and nodes.
+
+    This is the published theoretical max throughput of a Spanner node, see
+    https://cloud.google.com/spanner/docs/cpu-utilization#recommended-max for
+    more info.
+
+    Args:
+      read_proportion: the proportion of read requests in the workload.
+      write_proportion: the propoertion of write requests in the workload.
+
+    Returns:
+      The max total QPS taking into account the published single node limits.
+    """
     if read_proportion + write_proportion != 1:
       raise errors.Benchmarks.RunError(
           'Unrecognized workload, read + write proportion must be equal to 1, '
diff --git a/tests/providers/gcp/gcp_spanner_test.py b/tests/providers/gcp/gcp_spanner_test.py
index 2fbb16a427..f5923c3078 100644
--- a/tests/providers/gcp/gcp_spanner_test.py
+++ b/tests/providers/gcp/gcp_spanner_test.py
@@ -176,7 +176,7 @@ def testCalculateStartingThroughput(self, write_proportion, read_proportion,
     test_spanner.nodes = 3
 
     # Act
-    actual_qps = test_spanner.CalculateRecommendedThroughput(
+    actual_qps = test_spanner.CalculateTheoreticalMaxThroughput(
         read_proportion, write_proportion)
 
     # Assert