diff --git a/fboss/fsdb/client/FsdbPubSubManager.cpp b/fboss/fsdb/client/FsdbPubSubManager.cpp index 95cc934b9dfee..992b1038a7e74 100644 --- a/fboss/fsdb/client/FsdbPubSubManager.cpp +++ b/fboss/fsdb/client/FsdbPubSubManager.cpp @@ -49,6 +49,21 @@ std::string toSubscriptionStr( ":/", PathHelpers::toString(paths)); } + +std::string toSubscriptionStr( + const std::string& fsdbHost, + const std::map& path, + SubscriptionType subscribeType, + bool subscribeStats) { + return folly::to( + fsdbHost, + ":/", + subscriptionTypeToStr[subscribeType], + ":/", + (subscribeStats ? kStats : kState), + ":/", + PathHelpers::toString(path)); +} } // namespace namespace facebook::fboss::fsdb { @@ -119,7 +134,7 @@ std::unique_ptr FsdbPubSubManager::createPublisherImpl( int32_t fsdbPort) const { auto publisherExists = publishStats ? (statDeltaPublisher_ || statPathPublisher_) - : (stateDeltaPublisher_ || statePathPublisher_); + : (stateDeltaPublisher_ || statePathPublisher_ || statePatchPublisher_); if (publisherExists) { throw std::runtime_error( @@ -361,6 +376,19 @@ void FsdbPubSubManager::addStateDeltaSubscription( std::move(serverOptions)); } +void FsdbPubSubManager::addStatePatchSubscription( + const PatchPath& subscribePath, + SubscriptionStateChangeCb stateChangeCb, + FsdbPatchSubscriber::FsdbOperPatchUpdateCb patchCb, + FsdbStreamClient::ServerOptions&& serverOptions) { + addSubscriptionImpl( + subscribePath, + stateChangeCb, + patchCb, + false /*subscribeStat*/, + std::move(serverOptions)); +} + void FsdbPubSubManager::addStatePathSubscription( const Path& subscribePath, SubscriptionStateChangeCb stateChangeCb, @@ -409,8 +437,6 @@ void FsdbPubSubManager::addStatePathSubscription( SubscriptionStateChangeCb stateChangeCb, FsdbExtStateSubscriber::FsdbOperStateUpdateCb operStateCb, FsdbStreamClient::ServerOptions&& serverOptions) { - XLOG(INFO) << "addStatePathSubscription: " - << typeid(FsdbExtStateSubscriber).name(); addSubscriptionImpl( std::move(subscriptionOptions), PathHelpers::toExtendedOperPath(subscribePaths), @@ -471,6 +497,48 @@ void FsdbPubSubManager::addStatExtDeltaSubscription( std::move(serverOptions)); } +template +void FsdbPubSubManager::addSubscriptionImpl( + const std::map& subscribePath, + SubscriptionStateChangeCb stateChangeCb, + typename SubscriberT::FsdbSubUnitUpdateCb subUnitAvailableCb, + bool subscribeStats, + FsdbStreamClient::ServerOptions&& serverOptions, + const std::optional& clientIdSuffix) { + auto subscribeType = SubscriberT::subscriptionType(); + XCHECK(subscribeType != SubscriptionType::UNKNOWN) << "Unknown data type"; + auto subsStr = toSubscriptionStr( + serverOptions.dstAddr.getAddressStr(), + subscribePath, + subscribeType, + subscribeStats); + auto& path2Subscriber = + subscribeStats ? statPath2Subscriber_ : statePath2Subscriber_; + auto path2SubscriberW = path2Subscriber.wlock(); + + auto clientStr = folly::to(clientId_); + if (clientIdSuffix.has_value()) { + clientStr.append(folly::to("_", clientIdSuffix.value())); + } + + auto [itr, inserted] = path2SubscriberW->emplace(std::make_pair( + subsStr, + std::make_unique( + clientStr, + subscribePath, + subscriberEvb_, + reconnectEvb_, + subUnitAvailableCb, + subscribeStats, + stateChangeCb))); + if (!inserted) { + throw std::runtime_error( + "Subscription at : " + subsStr + " already exists"); + } + XLOG(DBG2) << "Added subscription for: " << subsStr; + itr->second->setServerOptions(std::move(serverOptions)); +} + template void FsdbPubSubManager::addSubscriptionImpl( const std::vector& subscribePath, @@ -577,6 +645,15 @@ void FsdbPubSubManager::removeStateDeltaSubscription( SubscriptionType::DELTA, false /*subscribeStats*/); } +void FsdbPubSubManager::removeStatePatchSubscription( + const Path& subscribePath, + const std::string& fsdbHost) { + removeSubscriptionImpl( + subscribePath, + fsdbHost, + SubscriptionType::PATCH, + false /*subscribeStats*/); +} void FsdbPubSubManager::removeStatePathSubscription( const Path& subscribePath, const std::string& fsdbHost) { diff --git a/fboss/fsdb/client/FsdbPubSubManager.h b/fboss/fsdb/client/FsdbPubSubManager.h index f398fa8f91d68..e0fd61e12a816 100644 --- a/fboss/fsdb/client/FsdbPubSubManager.h +++ b/fboss/fsdb/client/FsdbPubSubManager.h @@ -30,6 +30,7 @@ class FsdbPubSubManager { ~FsdbPubSubManager(); using Path = std::vector; + using PatchPath = std::map; using MultiPath = std::vector; /* Publisher create APIs */ @@ -127,6 +128,11 @@ class FsdbPubSubManager { FsdbDeltaSubscriber::FsdbOperDeltaUpdateCb operDeltaCb, FsdbStreamClient::ServerOptions&& serverOptions = kDefaultServerOptions()); + void addStatePatchSubscription( + const PatchPath& subscribePath, + SubscriptionStateChangeCb stateChangeCb, + FsdbPatchSubscriber::FsdbOperPatchUpdateCb patchCb, + FsdbStreamClient::ServerOptions&& serverOptions); void addStatePathSubscription( SubscriptionOptions&& subscriptionOptions, const Path& subscribePath, @@ -164,6 +170,9 @@ class FsdbPubSubManager { void removeStateDeltaSubscription( const Path& subscribePath, const std::string& fsdbHost = "::1"); + void removeStatePatchSubscription( + const Path& subscribePath, + const std::string& fsdbHost); void removeStatePathSubscription( const Path& subscribePath, const std::string& fsdbHost = "::1"); @@ -177,6 +186,9 @@ class FsdbPubSubManager { void removeStateDeltaSubscription( const MultiPath& subscribePath, const std::string& fsdbHost = "::1"); + void removeStatePatchSubscription( + const MultiPath& subscribePath, + const std::string& fsdbHost = "::1"); void removeStatePathSubscription( const MultiPath& subscribePath, const std::string& fsdbHost = "::1"); @@ -250,6 +262,7 @@ class FsdbPubSubManager { const std::string& fsdbHost, SubscriptionType subscribeType, bool subscribeStats); + template void addSubscriptionImpl( const std::vector& subscribePath, @@ -259,6 +272,14 @@ class FsdbPubSubManager { FsdbStreamClient::ServerOptions&& serverOptions, const std::optional& clientIdSuffix = std::nullopt); template + void addSubscriptionImpl( + const std::map& subscribePath, + SubscriptionStateChangeCb stateChangeCb, + typename SubscriberT::FsdbSubUnitUpdateCb subUnitAvailableCb, + bool subscribeStats, + FsdbStreamClient::ServerOptions&& serverOptions, + const std::optional& clientIdSuffix = std::nullopt); + template void addSubscriptionImpl( SubscriptionOptions&& subscriptionOptions, const std::vector& subscribePath, diff --git a/fboss/fsdb/common/PathHelpers.cpp b/fboss/fsdb/common/PathHelpers.cpp index 76753e76c045c..5992019ecafe6 100644 --- a/fboss/fsdb/common/PathHelpers.cpp +++ b/fboss/fsdb/common/PathHelpers.cpp @@ -84,5 +84,21 @@ std::vector PathHelpers::toExtendedOperPath( } return extPaths; } +std::map toMappedExtendedOperPath( + const std::vector>& paths) { + std::map result; + for (size_t i = 0; i < paths.size(); i++) { + auto path = paths[i]; + ExtendedOperPath extPath; + extPath.path()->reserve(path.size()); + for (const auto& pathElm : path) { + OperPathElem operPathElm; + operPathElm.raw_ref() = pathElm; + extPath.path()->push_back(std::move(operPathElm)); + } + result[i] = std::move(extPath); + } + return result; +} } // namespace facebook::fboss::fsdb diff --git a/fboss/fsdb/common/PathHelpers.h b/fboss/fsdb/common/PathHelpers.h index fdbe7dfd0630c..1e9d00369a065 100644 --- a/fboss/fsdb/common/PathHelpers.h +++ b/fboss/fsdb/common/PathHelpers.h @@ -27,6 +27,8 @@ class PathHelpers { static std::vector toExtendedOperPath( const std::vector>& paths); + static std::map toMappedExtendedOperPath( + const std::vector>& paths); }; } // namespace facebook::fboss::fsdb