Skip to content

Commit

Permalink
Add helper API to remove duplicate code in sysport and neighbor table…
Browse files Browse the repository at this point in the history
… update benchmarks

Summary: Code clean up from previous 2 diffs. The update benchmark reuses much of the code except for the state generator update. Move the rest to helper API to avoid code duplication

Differential Revision:
D67207590

Privacy Context Container: L1125642

fbshipit-source-id: d0c9b31154817fa4d06d333e03b06ea569faeec5
  • Loading branch information
elangovan911-meta authored and facebook-github-bot committed Dec 18, 2024
1 parent f8f817a commit 6abe233
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 60 deletions.
14 changes: 14 additions & 0 deletions fboss/fsdb/benchmarks/FsdbBenchmarkTestHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,20 @@ bool FsdbBenchmarkTestHelper::isSubscriptionConnected(int32_t subscriberId) {
return subscriptionConnected_.at(subscriberId).load();
}

void FsdbBenchmarkTestHelper::waitForAllSubscribersConnected(
int32_t numSubscribers) {
if (numSubscribers > subscriptionConnected_.size()) {
throw std::runtime_error(
"Waiting on more subscribers than number of subscribers created");
}
for (int i = 0; i < numSubscribers; i++) {
while (!isSubscriptionConnected(i)) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
return;
}

void FsdbBenchmarkTestHelper::removeSubscription(
bool stats,
int32_t subscriberID) {
Expand Down
1 change: 1 addition & 0 deletions fboss/fsdb/benchmarks/FsdbBenchmarkTestHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class FsdbBenchmarkTestHelper {
int32_t subscriberID);
void removeSubscription(bool stats = true, int32_t subscriberID = 0);
bool isSubscriptionConnected(int32_t subscriberID = 0);
void waitForAllSubscribersConnected(int32_t numSubscribers = 1);

private:
std::vector<std::string> getAgentStatsPath();
Expand Down
90 changes: 30 additions & 60 deletions fboss/fsdb/benchmarks/SysPortAndRIFBench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ DEFINE_int32(

namespace facebook::fboss::fsdb::test {

using switchStateUpdateFn = std::function<void(state::SwitchState*, int32_t)>;

BENCHMARK(FsdbPublishSysPortsAndRIFs) {
folly::BenchmarkSuspender suspender;

Expand Down Expand Up @@ -108,9 +110,10 @@ BENCHMARK(FsdbSubscribeSysPortsAndRIFs) {
helper.TearDown();
}

BENCHMARK(FsdbUpdateSysPortsAndRIFs) {
void subscriberUpdateHelper(switchStateUpdateFn updateFn, int32_t numUpdates) {
const int numPeerDevices =
FLAGS_n_cluster_size - 1; // publish to cluster size - 1 devices
folly::Latch subscriptionComplete(numPeerDevices);
folly::Latch updateReceived(numPeerDevices);

folly::BenchmarkSuspender suspender;
Expand All @@ -123,9 +126,7 @@ BENCHMARK(FsdbUpdateSysPortsAndRIFs) {
helper.startPublisher(false /* stats*/);

auto state = std::make_shared<state::SwitchState>();
StateGenerator::fillSwitchState(
state.get(), FLAGS_n_switchIDs, FLAGS_n_system_ports);
helper.publishStatePatch(*state, 0);
helper.publishStatePatch(*state, 1);
helper.waitForPublisherConnected();

std::vector<std::thread> threads;
Expand All @@ -137,76 +138,30 @@ BENCHMARK(FsdbUpdateSysPortsAndRIFs) {
for (const auto& [key, patches] : *patch.patchGroups()) {
auto timestampVal =
patches[0].metadata()->lastConfirmedAt().value();
if (timestampVal == 1) {
if (timestampVal == 3) {
updateReceived.count_down(); // Decrement update latch counter
} else if (timestampVal == 2) {
subscriptionComplete.count_down(); // Decrement subscription
// latch counter
}
}
};
helper.addStatePatchSubscription(subscriptionCb, i);
});
}

// Add new sys ports to the switch state
StateGenerator::updateSysPorts(state.get(), FLAGS_n_sysports_to_add);
// benchmark test: update state and wait for N subscribers to receive it
suspender.dismiss();
helper.publishStatePatch(*state, 1);

updateReceived.wait(); // Wait until latch counter is zero
suspender.rehire();

for (auto& thread : threads) {
thread.join();
}
for (int i = 0; i < numPeerDevices; i++) {
helper.removeSubscription(false, i);
}
helper.TearDown();
}

BENCHMARK(FsdbUpdateNeighborTable) {
const int numPeerDevices =
FLAGS_n_cluster_size - 1; // publish to cluster size - 1 devices
folly::Latch updateReceived(numPeerDevices);

folly::BenchmarkSuspender suspender;

// setup FsdbTestServer
FsdbBenchmarkTestHelper helper;
helper.setup(numPeerDevices);

// start publisher and publish initial empty stats
helper.startPublisher(false /* stats*/);

auto state = std::make_shared<state::SwitchState>();
StateGenerator::fillSwitchState(
state.get(), FLAGS_n_switchIDs, FLAGS_n_system_ports);
helper.publishStatePatch(*state, 0);
helper.waitForPublisherConnected();
helper.publishStatePatch(*state, 2);
// wait for all subscribers to be connected and received initial sync
subscriptionComplete.wait(); // Wait until subscription latch counter is zero

std::vector<std::thread> threads;
// Spawn numPeerDevices subscribers
for (int i = 0; i < numPeerDevices; i++) {
threads.emplace_back([&, i] {
fsdb::FsdbPatchSubscriber::FsdbOperPatchUpdateCb subscriptionCb =
[&](SubscriberChunk&& patch) {
for (const auto& [key, patches] : *patch.patchGroups()) {
auto timestampVal =
patches[0].metadata()->lastConfirmedAt().value();
if (timestampVal == 1) {
updateReceived.count_down(); // Decrement update latch counter
}
}
};
helper.addStatePatchSubscription(subscriptionCb, i);
});
}
// Add new sys ports to the switch state
updateFn(state.get(), numUpdates);

// Add new sys ports to the switch state
StateGenerator::updateNeighborTables(state.get(), FLAGS_n_neighbor_tables);
// benchmark test: update state and wait for N subscribers to receive it
suspender.dismiss();
helper.publishStatePatch(*state, 1);
helper.publishStatePatch(*state, 3);

updateReceived.wait(); // Wait until latch counter is zero
suspender.rehire();
Expand All @@ -219,4 +174,19 @@ BENCHMARK(FsdbUpdateNeighborTable) {
}
helper.TearDown();
}

BENCHMARK(FsdbUpdateSysPortsAndRIFs) {
auto sysPortUpdateFn = [](state::SwitchState* state, int32_t numUpdates) {
StateGenerator::updateSysPorts(state, numUpdates);
};
subscriberUpdateHelper(sysPortUpdateFn, FLAGS_n_sysports_to_add);
}

BENCHMARK(FsdbUpdateNeighborTable) {
auto neighborTableUpdateFn = [](state::SwitchState* state,
int32_t numUpdates) {
StateGenerator::updateNeighborTables(state, numUpdates);
};
subscriberUpdateHelper(neighborTableUpdateFn, FLAGS_n_neighbor_tables);
}
} // namespace facebook::fboss::fsdb::test

0 comments on commit 6abe233

Please sign in to comment.