From 3c27041c0b9946eda2e3a60b6c90696ff65fcecf Mon Sep 17 00:00:00 2001 From: Brandon Liu Date: Fri, 8 Sep 2023 18:58:59 -0700 Subject: [PATCH] Add load stage optimization to Spanner YCSB. PiperOrigin-RevId: 563905682 --- .../cloud_spanner_ycsb_benchmark.py | 18 +++- .../providers/gcp/gcp_spanner.py | 2 +- .../cloud_spanner_ycsb_benchmark_test.py | 88 +++++++++++++++++++ tests/providers/gcp/gcp_spanner_test.py | 16 ++++ 4 files changed, 121 insertions(+), 3 deletions(-) create mode 100644 tests/linux_benchmarks/cloud_spanner_ycsb_benchmark_test.py diff --git a/perfkitbenchmarker/linux_benchmarks/cloud_spanner_ycsb_benchmark.py b/perfkitbenchmarker/linux_benchmarks/cloud_spanner_ycsb_benchmark.py index 1ce63a118b..ee0ff65871 100644 --- a/perfkitbenchmarker/linux_benchmarks/cloud_spanner_ycsb_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/cloud_spanner_ycsb_benchmark.py @@ -197,6 +197,19 @@ def _GetCpuOptimizationMetadata() -> Dict[str, Any]: } +def _LoadDatabase(executor: ycsb.YCSBExecutor, + spanner: gcp_spanner.GcpSpannerInstance, + vms: list[virtual_machine.VirtualMachine], + load_kwargs: dict[str, Any]) -> list[sample.Sample]: + """Loads the database with the specified infrastructure capacity.""" + if spanner.restored or ycsb.SKIP_LOAD_STAGE.value: + return [] + spanner.UpdateCapacityForLoad() + results = list(executor.Load(vms, load_kwargs=load_kwargs)) + spanner.UpdateCapacityForRun() + return results + + def Run(benchmark_spec): """Spawn YCSB and gather the results. @@ -228,8 +241,9 @@ def Run(benchmark_spec): load_kwargs = run_kwargs.copy() samples = [] metadata = {'ycsb_client_type': 'java'} - if not spanner.restored: - samples += list(benchmark_spec.executor.Load(vms, load_kwargs=load_kwargs)) + + samples += _LoadDatabase(benchmark_spec.executor, spanner, vms, load_kwargs) + if _CPU_OPTIMIZATION.value: samples += CpuUtilizationRun(benchmark_spec.executor, spanner, vms, run_kwargs) diff --git a/perfkitbenchmarker/providers/gcp/gcp_spanner.py b/perfkitbenchmarker/providers/gcp/gcp_spanner.py index fd4807e921..1f1dffde45 100644 --- a/perfkitbenchmarker/providers/gcp/gcp_spanner.py +++ b/perfkitbenchmarker/providers/gcp/gcp_spanner.py @@ -451,7 +451,7 @@ def GetResourceMetadata(self) -> Dict[Any, Any]: 'gcp_spanner_node_count': self.nodes, 'gcp_spanner_config': self._config, 'gcp_spanner_endpoint': self.GetApiEndPoint(), - 'gcp_spanner_load_node_cont': self._load_nodes, + 'gcp_spanner_load_node_count': self._load_nodes, } def GetAverageCpuUsage(self, duration_minutes: int) -> float: diff --git a/tests/linux_benchmarks/cloud_spanner_ycsb_benchmark_test.py b/tests/linux_benchmarks/cloud_spanner_ycsb_benchmark_test.py new file mode 100644 index 0000000000..1ab10d6624 --- /dev/null +++ b/tests/linux_benchmarks/cloud_spanner_ycsb_benchmark_test.py @@ -0,0 +1,88 @@ +# Copyright 2023 PerfKitBenchmarker Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for cloud_spanner_ycsb_benchmark.""" +import textwrap +import unittest +from absl import flags +from absl.testing import flagsaver +import mock +from perfkitbenchmarker.linux_benchmarks import cloud_spanner_ycsb_benchmark +from perfkitbenchmarker.linux_packages import ycsb +from tests import pkb_common_test_case + +FLAGS = flags.FLAGS + + +class CloudSpannerYcsbBenchmarkTest(pkb_common_test_case.PkbCommonTestCase): + + def _CreateMockSpec(self, node_count, load_node_count): + mock_spec = textwrap.dedent(f""" + cloud_spanner_ycsb: + relational_db: + engine: spanner-googlesql + spanner_nodes: {node_count} + spanner_load_nodes: {load_node_count} + """) + self.mock_bm_spec = pkb_common_test_case.CreateBenchmarkSpecFromYaml( + mock_spec, 'cloud_spanner_ycsb' + ) + self.mock_bm_spec.ConstructRelationalDb() + self.mock_set_nodes = self.enter_context( + mock.patch.object(self.mock_bm_spec.relational_db, '_SetNodes') + ) + + def setUp(self): + super().setUp() + FLAGS.run_uri = 'test_uri' + self.executor = ycsb.YCSBExecutor('cloudspanner') + self.mock_load = self.enter_context( + mock.patch.object(self.executor, 'Load') + ) + + def testLoadDatabaseIncreasedCapacity(self): + self._CreateMockSpec(3, 6) + + cloud_spanner_ycsb_benchmark._LoadDatabase( + self.executor, self.mock_bm_spec.relational_db, [], {} + ) + + self.mock_set_nodes.assert_has_calls([mock.call(6), mock.call(3)]) + self.mock_load.assert_called_once() + + def testLoadDatabaseNotCalledRestored(self): + self._CreateMockSpec(3, 6) + self.mock_bm_spec.relational_db.restored = True + + cloud_spanner_ycsb_benchmark._LoadDatabase( + self.executor, self.mock_bm_spec.relational_db, [], {} + ) + + self.mock_set_nodes.assert_not_called() + self.mock_load.assert_not_called() + + @flagsaver.flagsaver(ycsb_skip_load_stage=True) + def testLoadDatabaseNotCalledSkipFlag(self): + self._CreateMockSpec(3, 6) + self.mock_bm_spec.relational_db.restored = True + + cloud_spanner_ycsb_benchmark._LoadDatabase( + self.executor, self.mock_bm_spec.relational_db, [], {} + ) + + self.mock_set_nodes.assert_not_called() + self.mock_load.assert_not_called() + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/providers/gcp/gcp_spanner_test.py b/tests/providers/gcp/gcp_spanner_test.py index da44fc39a5..2fbb16a427 100644 --- a/tests/providers/gcp/gcp_spanner_test.py +++ b/tests/providers/gcp/gcp_spanner_test.py @@ -64,6 +64,22 @@ def testSetNodes(self): self.assertIn('--nodes 3', ' '.join(cmd.call_args[0][0])) + def testSetNodesSkipsIfCountAlreadyCorrect(self): + test_instance = GetTestSpannerInstance() + self.enter_context( + mock.patch.object(test_instance, '_GetNodes', return_value=1) + ) + self.enter_context( + mock.patch.object(test_instance, '_WaitUntilInstanceReady') + ) + cmd = self.enter_context( + mock.patch.object(vm_util, 'IssueCommand', return_value=[None, None, 0]) + ) + + test_instance._SetNodes(1) + + cmd.assert_not_called() + def testFreezeUsesCorrectNodeCount(self): instance = GetTestSpannerInstance() mock_set_nodes = self.enter_context(