From 6abe233fd94761de3bd910c300797d162957a195 Mon Sep 17 00:00:00 2001 From: Elangovan Natarajan Date: Wed, 18 Dec 2024 13:34:33 -0800 Subject: [PATCH] Add helper API to remove duplicate code in sysport and neighbor table update benchmarks Summary: Code clean up from previous 2 diffs. The update benchmark reuses much of the code except for the state generator update. Move the rest to helper API to avoid code duplication Differential Revision: D67207590 Privacy Context Container: L1125642 fbshipit-source-id: d0c9b31154817fa4d06d333e03b06ea569faeec5 --- .../benchmarks/FsdbBenchmarkTestHelper.cpp | 14 +++ .../fsdb/benchmarks/FsdbBenchmarkTestHelper.h | 1 + fboss/fsdb/benchmarks/SysPortAndRIFBench.cpp | 90 +++++++------------ 3 files changed, 45 insertions(+), 60 deletions(-) diff --git a/fboss/fsdb/benchmarks/FsdbBenchmarkTestHelper.cpp b/fboss/fsdb/benchmarks/FsdbBenchmarkTestHelper.cpp index fe91dc18e5138..8d479ef017e88 100644 --- a/fboss/fsdb/benchmarks/FsdbBenchmarkTestHelper.cpp +++ b/fboss/fsdb/benchmarks/FsdbBenchmarkTestHelper.cpp @@ -167,6 +167,20 @@ bool FsdbBenchmarkTestHelper::isSubscriptionConnected(int32_t subscriberId) { return subscriptionConnected_.at(subscriberId).load(); } +void FsdbBenchmarkTestHelper::waitForAllSubscribersConnected( + int32_t numSubscribers) { + if (numSubscribers > subscriptionConnected_.size()) { + throw std::runtime_error( + "Waiting on more subscribers than number of subscribers created"); + } + for (int i = 0; i < numSubscribers; i++) { + while (!isSubscriptionConnected(i)) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + } + return; +} + void FsdbBenchmarkTestHelper::removeSubscription( bool stats, int32_t subscriberID) { diff --git a/fboss/fsdb/benchmarks/FsdbBenchmarkTestHelper.h b/fboss/fsdb/benchmarks/FsdbBenchmarkTestHelper.h index 93b1e80ec3ba6..48372697ce34c 100644 --- a/fboss/fsdb/benchmarks/FsdbBenchmarkTestHelper.h +++ b/fboss/fsdb/benchmarks/FsdbBenchmarkTestHelper.h @@ -33,6 +33,7 @@ class FsdbBenchmarkTestHelper { int32_t subscriberID); void removeSubscription(bool stats = true, int32_t subscriberID = 0); bool isSubscriptionConnected(int32_t subscriberID = 0); + void waitForAllSubscribersConnected(int32_t numSubscribers = 1); private: std::vector getAgentStatsPath(); diff --git a/fboss/fsdb/benchmarks/SysPortAndRIFBench.cpp b/fboss/fsdb/benchmarks/SysPortAndRIFBench.cpp index 6fb74dcdce40b..8d6705bdaa8e5 100644 --- a/fboss/fsdb/benchmarks/SysPortAndRIFBench.cpp +++ b/fboss/fsdb/benchmarks/SysPortAndRIFBench.cpp @@ -21,6 +21,8 @@ DEFINE_int32( namespace facebook::fboss::fsdb::test { +using switchStateUpdateFn = std::function; + BENCHMARK(FsdbPublishSysPortsAndRIFs) { folly::BenchmarkSuspender suspender; @@ -108,9 +110,10 @@ BENCHMARK(FsdbSubscribeSysPortsAndRIFs) { helper.TearDown(); } -BENCHMARK(FsdbUpdateSysPortsAndRIFs) { +void subscriberUpdateHelper(switchStateUpdateFn updateFn, int32_t numUpdates) { const int numPeerDevices = FLAGS_n_cluster_size - 1; // publish to cluster size - 1 devices + folly::Latch subscriptionComplete(numPeerDevices); folly::Latch updateReceived(numPeerDevices); folly::BenchmarkSuspender suspender; @@ -123,9 +126,7 @@ BENCHMARK(FsdbUpdateSysPortsAndRIFs) { helper.startPublisher(false /* stats*/); auto state = std::make_shared(); - StateGenerator::fillSwitchState( - state.get(), FLAGS_n_switchIDs, FLAGS_n_system_ports); - helper.publishStatePatch(*state, 0); + helper.publishStatePatch(*state, 1); helper.waitForPublisherConnected(); std::vector threads; @@ -137,8 +138,11 @@ BENCHMARK(FsdbUpdateSysPortsAndRIFs) { for (const auto& [key, patches] : *patch.patchGroups()) { auto timestampVal = patches[0].metadata()->lastConfirmedAt().value(); - if (timestampVal == 1) { + if (timestampVal == 3) { updateReceived.count_down(); // Decrement update latch counter + } else if (timestampVal == 2) { + subscriptionComplete.count_down(); // Decrement subscription + // latch counter } } }; @@ -146,67 +150,18 @@ BENCHMARK(FsdbUpdateSysPortsAndRIFs) { }); } - // Add new sys ports to the switch state - StateGenerator::updateSysPorts(state.get(), FLAGS_n_sysports_to_add); - // benchmark test: update state and wait for N subscribers to receive it - suspender.dismiss(); - helper.publishStatePatch(*state, 1); - - updateReceived.wait(); // Wait until latch counter is zero - suspender.rehire(); - - for (auto& thread : threads) { - thread.join(); - } - for (int i = 0; i < numPeerDevices; i++) { - helper.removeSubscription(false, i); - } - helper.TearDown(); -} - -BENCHMARK(FsdbUpdateNeighborTable) { - const int numPeerDevices = - FLAGS_n_cluster_size - 1; // publish to cluster size - 1 devices - folly::Latch updateReceived(numPeerDevices); - - folly::BenchmarkSuspender suspender; - - // setup FsdbTestServer - FsdbBenchmarkTestHelper helper; - helper.setup(numPeerDevices); - - // start publisher and publish initial empty stats - helper.startPublisher(false /* stats*/); - - auto state = std::make_shared(); StateGenerator::fillSwitchState( state.get(), FLAGS_n_switchIDs, FLAGS_n_system_ports); - helper.publishStatePatch(*state, 0); - helper.waitForPublisherConnected(); + helper.publishStatePatch(*state, 2); + // wait for all subscribers to be connected and received initial sync + subscriptionComplete.wait(); // Wait until subscription latch counter is zero - std::vector threads; - // Spawn numPeerDevices subscribers - for (int i = 0; i < numPeerDevices; i++) { - threads.emplace_back([&, i] { - fsdb::FsdbPatchSubscriber::FsdbOperPatchUpdateCb subscriptionCb = - [&](SubscriberChunk&& patch) { - for (const auto& [key, patches] : *patch.patchGroups()) { - auto timestampVal = - patches[0].metadata()->lastConfirmedAt().value(); - if (timestampVal == 1) { - updateReceived.count_down(); // Decrement update latch counter - } - } - }; - helper.addStatePatchSubscription(subscriptionCb, i); - }); - } + // Add new sys ports to the switch state + updateFn(state.get(), numUpdates); - // Add new sys ports to the switch state - StateGenerator::updateNeighborTables(state.get(), FLAGS_n_neighbor_tables); // benchmark test: update state and wait for N subscribers to receive it suspender.dismiss(); - helper.publishStatePatch(*state, 1); + helper.publishStatePatch(*state, 3); updateReceived.wait(); // Wait until latch counter is zero suspender.rehire(); @@ -219,4 +174,19 @@ BENCHMARK(FsdbUpdateNeighborTable) { } helper.TearDown(); } + +BENCHMARK(FsdbUpdateSysPortsAndRIFs) { + auto sysPortUpdateFn = [](state::SwitchState* state, int32_t numUpdates) { + StateGenerator::updateSysPorts(state, numUpdates); + }; + subscriberUpdateHelper(sysPortUpdateFn, FLAGS_n_sysports_to_add); +} + +BENCHMARK(FsdbUpdateNeighborTable) { + auto neighborTableUpdateFn = [](state::SwitchState* state, + int32_t numUpdates) { + StateGenerator::updateNeighborTables(state, numUpdates); + }; + subscriberUpdateHelper(neighborTableUpdateFn, FLAGS_n_neighbor_tables); +} } // namespace facebook::fboss::fsdb::test