From acad62325530c7462c7ffdbd2e53790e5c516498 Mon Sep 17 00:00:00 2001 From: Luca Wehrstedt Date: Sat, 12 Dec 2020 02:19:19 -0800 Subject: [PATCH] Add test that stresses races during transport shutdown (#250) Summary: Pull Request resolved: https://github.com/pytorch/tensorpipe/pull/250 This test wants to do two things: defer functions to the loop while the context is shutting down, and create new objects (connections and listeners) before and/or just after closing. Both of these operations interfere with a correct shutdown, and are tricky to handle. Transports, however, don't offer a way to directly defer functions, so we achieve it by attaching a read callback to the connection, and causing it to be called immediately by closing the connection. Transports also don't really allow controlling the timing of when something that is deferred to the loop will really run. To "work around" this, we simply jam the context by creating new connections over and over and over at an insane rate. This test proved its worth by finding many issues in transport shutdown. 
Differential Revision: D25495683 fbshipit-source-id: 6cb4a01385bd23811457a99afab2915425d5ecd8 --- tensorpipe/test/transport/connection_test.cc | 36 ++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/tensorpipe/test/transport/connection_test.cc b/tensorpipe/test/transport/connection_test.cc index af60258cc..a82725a62 100644 --- a/tensorpipe/test/transport/connection_test.cc +++ b/tensorpipe/test/transport/connection_test.cc @@ -245,3 +245,39 @@ TEST_P(TransportTest, DISABLED_Connection_EmptyBuffer) { peers_->join(PeerGroup::kClient); }); } + +TEST_P(TransportTest, Connection_SpamAtClosing) { + using namespace std::chrono_literals; + + std::shared_ptr<Context> ctx = GetParam()->getContext(); + ctx->setId("loopback"); + + std::string addr = GetParam()->defaultAddr(); + std::shared_ptr<Listener> listener = ctx->listen(addr); + + std::atomic<bool> stopSpamming{false}; + std::function<void()> spam = [&]() { + if (stopSpamming) { + return; + } + std::shared_ptr<Connection> conn = ctx->connect(addr); + conn->read( + [&](const Error& error, const void* /* unused */, size_t /* unused */) { + EXPECT_TRUE(error); + spam(); + }); + conn->close(); + }; + + spam(); + + std::this_thread::sleep_for(10ms); + + ctx->close(); + + std::this_thread::sleep_for(10ms); + + stopSpamming = true; + + ctx->join(); +}