Skip to content

Commit

Permalink
Add load stage optimization to Bigtable YCSB benchmark.
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 564810942
  • Loading branch information
bvliu authored and copybara-github committed Sep 12, 2023
1 parent 119f54a commit f78b508
Show file tree
Hide file tree
Showing 4 changed files with 3 additions and 203 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -357,19 +357,6 @@ def _GetYcsbExecutor(
return ycsb.YCSBExecutor(FLAGS.hbase_binding, **executor_flags)


def _LoadDatabase(executor: ycsb.YCSBExecutor,
bigtable: gcp_bigtable.GcpBigtableInstance,
vms: list[virtual_machine.VirtualMachine],
load_kwargs: dict[str, Any]) -> list[sample.Sample]:
"""Loads the database with the specified infrastructure capacity."""
if bigtable.restored or ycsb.SKIP_LOAD_STAGE.value:
return []
bigtable.UpdateCapacityForLoad()
results = list(executor.Load(vms, load_kwargs=load_kwargs))
bigtable.UpdateCapacityForRun()
return results


def Run(benchmark_spec: bm_spec.BenchmarkSpec) -> List[sample.Sample]:
"""Spawn YCSB and gather the results.
Expand All @@ -391,7 +378,8 @@ def Run(benchmark_spec: bm_spec.BenchmarkSpec) -> List[sample.Sample]:
load_kwargs = _GenerateLoadKwargs(instance)
run_kwargs = _GenerateRunKwargs(instance)
executor: ycsb.YCSBExecutor = _GetYcsbExecutor(vms)
samples += _LoadDatabase(executor, instance, vms, load_kwargs)
if not instance.restored:
samples += list(executor.Load(vms, load_kwargs=load_kwargs))
samples += list(executor.Run(vms, run_kwargs=run_kwargs))

# Optionally add new samples for cluster cpu utilization.
Expand Down
30 changes: 0 additions & 30 deletions perfkitbenchmarker/providers/gcp/gcp_bigtable.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import json
import logging
import time
from typing import Any, Dict, List, Optional

from absl import flags
Expand Down Expand Up @@ -53,13 +52,6 @@
'Ignored if --bigtable_autoscaling_min_nodes is set.'
),
)
_LOAD_NODES = flags.DEFINE_integer(
'bigtable_load_node_count',
None,
'The number of nodes for the Bigtable instance to use for the load'
' phase. Assumes that the benchmark calls UpdateRunCapacity to set the '
' correct node count manually before the run phase.',
)
_AUTOSCALING_MIN_NODES = flags.DEFINE_integer(
'bigtable_autoscaling_min_nodes', None,
'Minimum number of nodes for autoscaling.')
Expand Down Expand Up @@ -97,7 +89,6 @@ class BigtableSpec(non_relational_db.BaseNonRelationalDbSpec):
zone: str
project: str
node_count: int
load_node_count: int
storage_type: str
replication_cluster: bool
replication_cluster_zone: str
Expand All @@ -119,7 +110,6 @@ def _GetOptionDecoderConstructions(cls):
'zone': (option_decoders.StringDecoder, none_ok),
'project': (option_decoders.StringDecoder, none_ok),
'node_count': (option_decoders.IntDecoder, none_ok),
'load_node_count': (option_decoders.IntDecoder, none_ok),
'storage_type': (option_decoders.StringDecoder, none_ok),
'replication_cluster': (option_decoders.BooleanDecoder, none_ok),
'replication_cluster_zone': (option_decoders.StringDecoder, none_ok),
Expand Down Expand Up @@ -157,7 +147,6 @@ def _ApplyFlags(cls, config_values, flag_values) -> None:
'google_bigtable_zone': 'zone',
'bigtable_storage_type': 'storage_type',
'bigtable_node_count': 'node_count',
'bigtable_load_node_count': 'load_node_count',
'bigtable_replication_cluster': 'replication_cluster',
'bigtable_replication_cluster_zone': 'replication_cluster_zone',
'bigtable_multicluster_routing': 'multicluster_routing',
Expand Down Expand Up @@ -206,7 +195,6 @@ def __init__(self,
project: Optional[str],
zone: Optional[str],
node_count: Optional[int],
load_node_count: Optional[int],
storage_type: Optional[str],
replication_cluster: Optional[bool],
replication_cluster_zone: Optional[str],
Expand All @@ -222,7 +210,6 @@ def __init__(self,
self.zone: str = zone or FLAGS.google_bigtable_zone
self.project: str = project or FLAGS.project or util.GetDefaultProject()
self.node_count: int = node_count or _DEFAULT_NODE_COUNT
self._load_node_count = load_node_count or self.node_count
self.storage_type: str = storage_type or _DEFAULT_STORAGE_TYPE
self.replication_cluster: bool = replication_cluster or False
self.replication_cluster_zone: Optional[str] = (
Expand All @@ -240,7 +227,6 @@ def FromSpec(cls, spec: BigtableSpec) -> 'GcpBigtableInstance':
zone=spec.zone,
project=spec.project,
node_count=spec.node_count,
load_node_count=spec.load_node_count,
storage_type=spec.storage_type,
replication_cluster=spec.replication_cluster,
replication_cluster_zone=spec.replication_cluster_zone,
Expand Down Expand Up @@ -401,29 +387,13 @@ def _GetClusters(self) -> List[Dict[str, Any]]:
return json.loads(stdout)

def _UpdateNodes(self, nodes: int) -> None:
"""Updates clusters to the specified node count and waits until ready."""
clusters = self._GetClusters()
for i in range(len(clusters)):
# Do nothing if the node count is already equal to what we want.
if clusters[i]['serveNodes'] == nodes:
continue
cmd = _GetBigtableGcloudCommand(self, 'bigtable', 'clusters', 'update',
f'{self.name}-{i}')
cmd.flags['instance'] = self.name
cmd.flags['num-nodes'] = nodes or self.node_count
cmd.Issue()
# Note that Exists is implemented like IsReady, but should likely be
# refactored.
while not self._Exists():
time.sleep(10)

def UpdateCapacityForLoad(self) -> None:
"""See base class."""
self._UpdateNodes(self._load_node_count)

def UpdateCapacityForRun(self) -> None:
"""See base class."""
self._UpdateNodes(self.node_count)

def _Freeze(self) -> None:
self._UpdateNodes(_FROZEN_NODE_COUNT)
Expand Down
94 changes: 0 additions & 94 deletions tests/linux_benchmarks/cloud_bigtable_ycsb_benchmark_test.py

This file was deleted.

66 changes: 1 addition & 65 deletions tests/providers/gcp/gcp_bigtable_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
from absl import flags
from absl.testing import flagsaver
import mock

from perfkitbenchmarker import errors
from perfkitbenchmarker import vm_util
from perfkitbenchmarker.providers.gcp import gcp_bigtable
from perfkitbenchmarker.providers.gcp import util
from tests import matchers
from tests import pkb_common_test_case
import requests

Expand All @@ -40,23 +39,6 @@
}}
"""

_TEST_LIST_CLUSTERS_OUTPUT = [
{
'defaultStorageType': 'SSD',
'location': 'projects/pkb-test/locations/us-west1-a',
'name': 'projects/pkb-test/instances/pkb-bigtable-730b1e6b/clusters/pkb-bigtable-730b1e6b-1',
'serveNodes': 3,
'state': 'READY',
},
{
'defaultStorageType': 'SSD',
'location': 'projects/pkb-test/locations/us-west1-c',
'name': 'projects/pkb-test/instances/pkb-bigtable-730b1e6b/clusters/pkb-bigtable-730b1e6b-0',
'serveNodes': 3,
'state': 'READY',
},
]


OUT_OF_QUOTA_STDERR = """
ERROR: (gcloud.beta.bigtable.instances.create) Operation successfully rolled
Expand Down Expand Up @@ -300,52 +282,6 @@ def testBigtableGcloudCommand(self):
self.assertEqual(cmd.flags['project'], PROJECT)
self.assertEmpty(cmd.flags['zone'])

def testSetNodes(self):
test_instance = GetTestBigtableInstance()
self.enter_context(
mock.patch.object(
test_instance,
'_GetClusters',
return_value=_TEST_LIST_CLUSTERS_OUTPUT,
)
)
self.enter_context(
mock.patch.object(test_instance, '_Exists', return_value=True)
)
cmd = self.enter_context(
mock.patch.object(vm_util, 'IssueCommand', return_value=[None, None, 0])
)

test_instance._UpdateNodes(6)

self.assertSequenceEqual(
[
mock.call(matchers.HASALLOF('--num-nodes', '6')),
mock.call(matchers.HASALLOF('--num-nodes', '6')),
],
cmd.mock_calls,
)

def testSetNodesSkipsIfCountAlreadyCorrect(self):
test_instance = GetTestBigtableInstance()
self.enter_context(
mock.patch.object(
test_instance,
'_GetClusters',
return_value=_TEST_LIST_CLUSTERS_OUTPUT,
)
)
self.enter_context(
mock.patch.object(test_instance, '_Exists', return_value=True)
)
cmd = self.enter_context(
mock.patch.object(vm_util, 'IssueCommand', return_value=[None, None, 0])
)

test_instance._UpdateNodes(3)

cmd.assert_not_called()


if __name__ == '__main__':
unittest.main()

0 comments on commit f78b508

Please sign in to comment.