Skip to content

Commit

Permalink
adopt patch api between FSDB <-> SA gated by gflag
Browse files Browse the repository at this point in the history
Summary: as title, adopt the new patch api, and gated with gflag to roll out

Reviewed By: Linerd

Differential Revision:
D63923107

Privacy Context Container: L1210319

fbshipit-source-id: afec5551d3f8c2a727bbbce770415064624b6e8f
  • Loading branch information
Wei-Cheng Lin authored and facebook-github-bot committed Oct 10, 2024
1 parent f55d78d commit 36df6f5
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 3 deletions.
83 changes: 80 additions & 3 deletions fboss/fsdb/client/FsdbPubSubManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,21 @@ std::string toSubscriptionStr(
":/",
PathHelpers::toString(paths));
}

std::string toSubscriptionStr(
const std::string& fsdbHost,
const std::map<SubscriptionKey, RawOperPath>& path,
SubscriptionType subscribeType,
bool subscribeStats) {
return folly::to<std::string>(
fsdbHost,
":/",
subscriptionTypeToStr[subscribeType],
":/",
(subscribeStats ? kStats : kState),
":/",
PathHelpers::toString(path));
}
} // namespace
namespace facebook::fboss::fsdb {

Expand Down Expand Up @@ -119,7 +134,7 @@ std::unique_ptr<PublisherT> FsdbPubSubManager::createPublisherImpl(
int32_t fsdbPort) const {
auto publisherExists = publishStats
? (statDeltaPublisher_ || statPathPublisher_)
: (stateDeltaPublisher_ || statePathPublisher_);
: (stateDeltaPublisher_ || statePathPublisher_ || statePatchPublisher_);

if (publisherExists) {
throw std::runtime_error(
Expand Down Expand Up @@ -361,6 +376,19 @@ void FsdbPubSubManager::addStateDeltaSubscription(
std::move(serverOptions));
}

void FsdbPubSubManager::addStatePatchSubscription(
const PatchPath& subscribePath,
SubscriptionStateChangeCb stateChangeCb,
FsdbPatchSubscriber::FsdbOperPatchUpdateCb patchCb,
FsdbStreamClient::ServerOptions&& serverOptions) {
addSubscriptionImpl<FsdbPatchSubscriber>(
subscribePath,
stateChangeCb,
patchCb,
false /*subscribeStat*/,
std::move(serverOptions));
}

void FsdbPubSubManager::addStatePathSubscription(
const Path& subscribePath,
SubscriptionStateChangeCb stateChangeCb,
Expand Down Expand Up @@ -409,8 +437,6 @@ void FsdbPubSubManager::addStatePathSubscription(
SubscriptionStateChangeCb stateChangeCb,
FsdbExtStateSubscriber::FsdbOperStateUpdateCb operStateCb,
FsdbStreamClient::ServerOptions&& serverOptions) {
XLOG(INFO) << "addStatePathSubscription: "
<< typeid(FsdbExtStateSubscriber).name();
addSubscriptionImpl<FsdbExtStateSubscriber>(
std::move(subscriptionOptions),
PathHelpers::toExtendedOperPath(subscribePaths),
Expand Down Expand Up @@ -471,6 +497,48 @@ void FsdbPubSubManager::addStatExtDeltaSubscription(
std::move(serverOptions));
}

template <typename SubscriberT, typename PathElement>
void FsdbPubSubManager::addSubscriptionImpl(
const std::map<SubscriptionKey, PathElement>& subscribePath,
SubscriptionStateChangeCb stateChangeCb,
typename SubscriberT::FsdbSubUnitUpdateCb subUnitAvailableCb,
bool subscribeStats,
FsdbStreamClient::ServerOptions&& serverOptions,
const std::optional<std::string>& clientIdSuffix) {
auto subscribeType = SubscriberT::subscriptionType();
XCHECK(subscribeType != SubscriptionType::UNKNOWN) << "Unknown data type";
auto subsStr = toSubscriptionStr(
serverOptions.dstAddr.getAddressStr(),
subscribePath,
subscribeType,
subscribeStats);
auto& path2Subscriber =
subscribeStats ? statPath2Subscriber_ : statePath2Subscriber_;
auto path2SubscriberW = path2Subscriber.wlock();

auto clientStr = folly::to<std::string>(clientId_);
if (clientIdSuffix.has_value()) {
clientStr.append(folly::to<std::string>("_", clientIdSuffix.value()));
}

auto [itr, inserted] = path2SubscriberW->emplace(std::make_pair(
subsStr,
std::make_unique<SubscriberT>(
clientStr,
subscribePath,
subscriberEvb_,
reconnectEvb_,
subUnitAvailableCb,
subscribeStats,
stateChangeCb)));
if (!inserted) {
throw std::runtime_error(
"Subscription at : " + subsStr + " already exists");
}
XLOG(DBG2) << "Added subscription for: " << subsStr;
itr->second->setServerOptions(std::move(serverOptions));
}

template <typename SubscriberT, typename PathElement>
void FsdbPubSubManager::addSubscriptionImpl(
const std::vector<PathElement>& subscribePath,
Expand Down Expand Up @@ -577,6 +645,15 @@ void FsdbPubSubManager::removeStateDeltaSubscription(
SubscriptionType::DELTA,
false /*subscribeStats*/);
}
void FsdbPubSubManager::removeStatePatchSubscription(
const Path& subscribePath,
const std::string& fsdbHost) {
removeSubscriptionImpl(
subscribePath,
fsdbHost,
SubscriptionType::PATCH,
false /*subscribeStats*/);
}
void FsdbPubSubManager::removeStatePathSubscription(
const Path& subscribePath,
const std::string& fsdbHost) {
Expand Down
21 changes: 21 additions & 0 deletions fboss/fsdb/client/FsdbPubSubManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class FsdbPubSubManager {
~FsdbPubSubManager();

using Path = std::vector<std::string>;
using PatchPath = std::map<SubscriptionKey, fboss::fsdb::RawOperPath>;
using MultiPath = std::vector<Path>;

/* Publisher create APIs */
Expand Down Expand Up @@ -127,6 +128,11 @@ class FsdbPubSubManager {
FsdbDeltaSubscriber::FsdbOperDeltaUpdateCb operDeltaCb,
FsdbStreamClient::ServerOptions&& serverOptions =
kDefaultServerOptions());
void addStatePatchSubscription(
const PatchPath& subscribePath,
SubscriptionStateChangeCb stateChangeCb,
FsdbPatchSubscriber::FsdbOperPatchUpdateCb patchCb,
FsdbStreamClient::ServerOptions&& serverOptions);
void addStatePathSubscription(
SubscriptionOptions&& subscriptionOptions,
const Path& subscribePath,
Expand Down Expand Up @@ -164,6 +170,9 @@ class FsdbPubSubManager {
void removeStateDeltaSubscription(
const Path& subscribePath,
const std::string& fsdbHost = "::1");
void removeStatePatchSubscription(
const Path& subscribePath,
const std::string& fsdbHost);
void removeStatePathSubscription(
const Path& subscribePath,
const std::string& fsdbHost = "::1");
Expand All @@ -177,6 +186,9 @@ class FsdbPubSubManager {
void removeStateDeltaSubscription(
const MultiPath& subscribePath,
const std::string& fsdbHost = "::1");
void removeStatePatchSubscription(
const MultiPath& subscribePath,
const std::string& fsdbHost = "::1");
void removeStatePathSubscription(
const MultiPath& subscribePath,
const std::string& fsdbHost = "::1");
Expand Down Expand Up @@ -250,6 +262,7 @@ class FsdbPubSubManager {
const std::string& fsdbHost,
SubscriptionType subscribeType,
bool subscribeStats);

template <typename SubscriberT, typename PathElement>
void addSubscriptionImpl(
const std::vector<PathElement>& subscribePath,
Expand All @@ -259,6 +272,14 @@ class FsdbPubSubManager {
FsdbStreamClient::ServerOptions&& serverOptions,
const std::optional<std::string>& clientIdSuffix = std::nullopt);
template <typename SubscriberT, typename PathElement>
void addSubscriptionImpl(
const std::map<SubscriptionKey, PathElement>& subscribePath,
SubscriptionStateChangeCb stateChangeCb,
typename SubscriberT::FsdbSubUnitUpdateCb subUnitAvailableCb,
bool subscribeStats,
FsdbStreamClient::ServerOptions&& serverOptions,
const std::optional<std::string>& clientIdSuffix = std::nullopt);
template <typename SubscriberT, typename PathElement>
void addSubscriptionImpl(
SubscriptionOptions&& subscriptionOptions,
const std::vector<PathElement>& subscribePath,
Expand Down
16 changes: 16 additions & 0 deletions fboss/fsdb/common/PathHelpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,21 @@ std::vector<ExtendedOperPath> PathHelpers::toExtendedOperPath(
}
return extPaths;
}
std::map<SubscriptionKey, ExtendedOperPath> toMappedExtendedOperPath(
const std::vector<std::vector<std::string>>& paths) {
std::map<SubscriptionKey, ExtendedOperPath> result;
for (size_t i = 0; i < paths.size(); i++) {
auto path = paths[i];
ExtendedOperPath extPath;
extPath.path()->reserve(path.size());
for (const auto& pathElm : path) {
OperPathElem operPathElm;
operPathElm.raw_ref() = pathElm;
extPath.path()->push_back(std::move(operPathElm));
}
result[i] = std::move(extPath);
}
return result;
}

} // namespace facebook::fboss::fsdb
2 changes: 2 additions & 0 deletions fboss/fsdb/common/PathHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ class PathHelpers {

static std::vector<ExtendedOperPath> toExtendedOperPath(
const std::vector<std::vector<std::string>>& paths);
static std::map<SubscriptionKey, ExtendedOperPath> toMappedExtendedOperPath(
const std::vector<std::vector<std::string>>& paths);
};

} // namespace facebook::fboss::fsdb

0 comments on commit 36df6f5

Please sign in to comment.