Skip to content
This repository has been archived by the owner on Jul 1, 2023. It is now read-only.

Commit

Permalink
Rely on enrollment to propagate closing from ctx to conn/list
Browse files Browse the repository at this point in the history
Summary: And thus retire the ClosingEmitter/Receiver from transports.

Differential Revision: D25495914

fbshipit-source-id: 52d6e52e0766c64e4b01329a98ecfa0370e9c199
  • Loading branch information
lw authored and facebook-github-bot committed Dec 11, 2020
1 parent e93a894 commit 1d9975a
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 25 deletions.
8 changes: 1 addition & 7 deletions tensorpipe/transport/connection_impl_boilerplate.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,6 @@ class ConnectionImplBoilerplate : public std::enable_shared_from_this<TConn> {
// Deal with an error.
void handleError();

ClosingReceiver closingReceiver_;

// A sequence number for the calls to read and write.
uint64_t nextBufferBeingRead_{0};
uint64_t nextBufferBeingWritten_{0};
Expand All @@ -145,9 +143,7 @@ ConnectionImplBoilerplate<TCtx, TList, TConn>::ConnectionImplBoilerplate(
ConstructorToken /* unused */,
std::shared_ptr<TCtx> context,
std::string id)
: context_(std::move(context)),
id_(std::move(id)),
closingReceiver_(context_, context_->getClosingEmitter()) {}
: context_(std::move(context)), id_(std::move(id)) {}

template <typename TCtx, typename TList, typename TConn>
void ConnectionImplBoilerplate<TCtx, TList, TConn>::init() {
Expand All @@ -166,8 +162,6 @@ void ConnectionImplBoilerplate<TCtx, TList, TConn>::initFromLoop() {
return;
}

closingReceiver_.activate(*this);

initImplFromLoop();
}

Expand Down
26 changes: 15 additions & 11 deletions tensorpipe/transport/context_impl_boilerplate.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
#include <unordered_map>
#include <utility>

#include <tensorpipe/common/callback.h>
#include <tensorpipe/common/defs.h>
#include <tensorpipe/transport/connection_boilerplate.h>
#include <tensorpipe/transport/listener_boilerplate.h>
Expand Down Expand Up @@ -55,8 +54,6 @@ class ContextImplBoilerplate : public virtual DeferredExecutor,
// this must be called from within the loop.
bool closed();

ClosingEmitter& getClosingEmitter();

void setId(std::string id);

void close();
Expand All @@ -77,7 +74,6 @@ class ContextImplBoilerplate : public virtual DeferredExecutor,
private:
std::atomic<bool> closed_{false};
std::atomic<bool> joined_{false};
ClosingEmitter closingEmitter_;

const std::string domainDescriptor_;

Expand Down Expand Up @@ -171,12 +167,6 @@ bool ContextImplBoilerplate<TCtx, TList, TConn>::closed() {
return closed_;
};

template <typename TCtx, typename TList, typename TConn>
ClosingEmitter& ContextImplBoilerplate<TCtx, TList, TConn>::
getClosingEmitter() {
return closingEmitter_;
};

template <typename TCtx, typename TList, typename TConn>
void ContextImplBoilerplate<TCtx, TList, TConn>::setId(std::string id) {
TP_VLOG(7) << "Transport context " << id_ << " was renamed to " << id;
Expand All @@ -191,7 +181,21 @@ void ContextImplBoilerplate<TCtx, TList, TConn>::close() {
if (!closed_.exchange(true)) {
TP_VLOG(7) << "Transport context " << id_ << " is closing";

closingEmitter_.close();
// Make a copy as they could unenroll themselves inline.
decltype(listeners_) listenersCopy = listeners_;
decltype(connections_) connectionsCopy = connections_;
// We call closeFromLoop, rather than just close, because we need these
// objects to transition _immediately_ to error, "atomically". If we just
// deferred closing to later, this could come after some already-enqueued
// operations that could try to access the context, which would be closed,
// and this could fail.
for (auto& iter : listenersCopy) {
iter.second->closeFromLoop();
}
for (auto& iter : connectionsCopy) {
iter.second->closeFromLoop();
}

closeImpl();

TP_VLOG(7) << "Transport context " << id_ << " done closing";
Expand Down
14 changes: 7 additions & 7 deletions tensorpipe/transport/listener_impl_boilerplate.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,25 +104,27 @@ class ListenerImplBoilerplate : public std::enable_shared_from_this<TList> {
// Deal with an error.
void handleError();

ClosingReceiver closingReceiver_;

// A sequence number for the calls to accept.
uint64_t nextConnectionBeingAccepted_{0};

// Sequence numbers for the connections created by this listener, used to
// create their identifiers based off this listener's identifier. They will
// only be used for logging and debugging.
std::atomic<uint64_t> connectionCounter_{0};

// Contexts do sometimes need to call directly into closeForLoop, in order to
// make sure that some of their operations can happen "atomically" on the
// connection, without possibly other operations occurring in between (e.g.,
// an error).
friend ContextImplBoilerplate<TCtx, TList, TConn>;
};

template <typename TCtx, typename TList, typename TConn>
ListenerImplBoilerplate<TCtx, TList, TConn>::ListenerImplBoilerplate(
ConstructorToken /* unused */,
std::shared_ptr<TCtx> context,
std::string id)
: context_(std::move(context)),
id_(std::move(id)),
closingReceiver_(context_, context_->getClosingEmitter()) {}
: context_(std::move(context)), id_(std::move(id)) {}

template <typename TCtx, typename TList, typename TConn>
void ListenerImplBoilerplate<TCtx, TList, TConn>::init() {
Expand All @@ -141,8 +143,6 @@ void ListenerImplBoilerplate<TCtx, TList, TConn>::initFromLoop() {
return;
}

closingReceiver_.activate(*this);

initImplFromLoop();
}

Expand Down

0 comments on commit 1d9975a

Please sign in to comment.