From b733ac358628f974688e5899984fd5705f69786c Mon Sep 17 00:00:00 2001 From: "Addisu Z. Taddese" Date: Wed, 21 Aug 2024 01:00:25 -0500 Subject: [PATCH] Use self-pipe trick to implement signal handlers (#618) This allows our callbacks to freely use any function they want including printing to the console. Previously, since the callbacks were called in a signal handler, they were limited to using async-signal-safe functions. However, most of our callbacks did not comply with this and so risked causing a deadlock when a signal was fired. For example, malloc or new is not async-signal-safe, but one of the signal handlers in gz-sim emits a Stop event which requires allocating memory. If before the signal was fired, another thread was allocating memory, trying to emit this event inside the signal handler will lock up since it'll internally try to lock a mutex when calling malloc/new. The same is true for using functions like gzdbg to print to the console which is not permitted inside a signal hander. --------- Signed-off-by: Addisu Z. Taddese --- src/SignalHandler.cc | 150 ++++++++++++++++++++++++++++++++++++-- src/SignalHandler_TEST.cc | 40 ++++++++++ 2 files changed, 183 insertions(+), 7 deletions(-) diff --git a/src/SignalHandler.cc b/src/SignalHandler.cc index 07b6d054c..0612c03ff 100644 --- a/src/SignalHandler.cc +++ b/src/SignalHandler.cc @@ -19,8 +19,15 @@ // comments when upgrading to gz-cmake's "make codecheck" #include "gz/common/SignalHandler.hh" // NOLINT(*) #include +#include #include // NOLINT(*) #include // NOLINT(*) +#include +#ifndef _WIN32 + #include +#else + #include +#endif #include // NOLINT(*) #include // NOLINT(*) #include // NOLINT(*) @@ -31,22 +38,150 @@ using namespace gz; using namespace common; // A wrapper for the sigaction sa_handler. +// TODO(azeey) We should avoid using objects with non-trivial destructors as +// globals. GZ_COMMON_VISIBLE std::map> gOnSignalWrappers; std::mutex gWrapperMutex; +#ifdef _WIN32 + #define write _write + #define read _read +#endif +namespace +{ + +/// \brief Index of the read file descriptor +constexpr int kReadFd = 0; +/// \brief Index of the write file descriptor +constexpr int kWriteFd = 1; + +/// \brief Class to encalpsulate the self-pipe trick which is a way enable +/// the user of non async-signal-safe functions in downstream signal handler +/// callbacks. +/// +/// It works by creating a pipe between the actual signal handler and +/// a servicing thread. When a signal is received the signal handler +/// writes a byte to the pipe and returns. The servicing thread reads the +/// byte from the pipe and calls all of the registered callbacks. Since +/// the registered callbacks are called from a regular thread instead of +/// an actual signal handler, the callbacks are free to use any function +/// (e.g. call gzdbg). +class SelfPipe { + + /// \brief The two pipes the comprise the self-pipe + public: static int pipeFd[2]; + + /// \brief Static function to create a singleton SelfPipe object + public: static void Initialize(); + + /// \brief Destructor. + public: ~SelfPipe(); + + /// \brief Constructor + /// Creates the pipes, applies configuration flags and starts the servicing + /// thread + private: SelfPipe(); + + /// \brief Servicing thread + private: void CheckPipe(); + + /// \brief Handle for CheckPipe thread + private: std::thread checkPipeThread; + + /// \brief Whether the program is running. This is set to true by the + /// Constructor and set to false by the destructor + private: std::atomic running{false}; +}; + +int SelfPipe::pipeFd[2]; + ///////////////////////////////////////////////// /// \brief Callback to execute when a signal is received. +/// This simply writes a byte to a pipe and returns /// \param[in] _value Signal number. -void onSignal(int _value) +void onSignalWriteToSelfPipe(int _value) { - std::lock_guard lock(gWrapperMutex); - // Send the signal to each wrapper - for (std::pair> func : gOnSignalWrappers) +#ifdef _WIN32 + // Windows resets the signal handler every time a signal is handled. + std::signal(_value, onSignalWriteToSelfPipe); +#endif + auto valueByte = static_cast(_value); + if (write(SelfPipe::pipeFd[kWriteFd], &valueByte, 1) == -1) + { + // TODO(azeey) Not clear what to do here. + } +} + +///////////////////////////////////////////////// +SelfPipe::SelfPipe() +{ +#ifdef _WIN32 + if (_pipe(this->pipeFd, 256, O_BINARY) == -1) +#else + if (pipe(this->pipeFd) == -1) +#endif { - func.second(_value); + gzerr << "Unable to create pipe\n"; } + +#ifndef _WIN32 + int flags = fcntl(this->pipeFd[kWriteFd], F_GETFL); + if (fcntl(this->pipeFd[kWriteFd], F_SETFL, flags | O_NONBLOCK) < 0) + { + gzerr << "Failed to set flags on pipe. " + << "Signal handling may not work properly" << std::endl; + } +#endif + this->running = true; + this->checkPipeThread = std::thread(&SelfPipe::CheckPipe, this); +} + +///////////////////////////////////////////////// +SelfPipe::~SelfPipe() +{ + this->running = false; + // Write a dummy signal value to the pipe. This is not a real signal, but we + // need to wakeup the CheckPipe thread so it can cleanup properly. The value + // was chosen to make it clear that this is not one of the standard signals. + onSignalWriteToSelfPipe(127); + this->checkPipeThread.join(); } +///////////////////////////////////////////////// +void SelfPipe::Initialize() +{ + // We actually need this object to be destructed in order to join the thread, + // so we can't use gz::utils::NeverDestroyed here. + static SelfPipe selfPipe; +} + +///////////////////////////////////////////////// +void SelfPipe::CheckPipe() +{ + while (this->running) + { + std::uint8_t signal; + if (read(SelfPipe::pipeFd[kReadFd], &signal, 1) != -1) + { + if (this->running) + { + std::lock_guard lock(gWrapperMutex); + // Send the signal to each wrapper + for (std::pair> func : gOnSignalWrappers) + { + func.second(signal); + } + } + } + else + { + gzerr << errno << " " << std::strerror(errno) << std::endl; + } + } +} + +} // namespace + ///////////////////////////////////////////////// class common::SignalHandlerPrivate { @@ -74,14 +209,15 @@ SignalHandler::SignalHandler() static int counter = 0; std::lock_guard lock(gWrapperMutex); - if (std::signal(SIGINT, onSignal) == SIG_ERR) + SelfPipe::Initialize(); + if (std::signal(SIGINT, onSignalWriteToSelfPipe) == SIG_ERR) { gzerr << "Unable to catch SIGINT.\n" << " Please visit http://community.gazebosim.org for help.\n"; return; } - if (std::signal(SIGTERM, onSignal) == SIG_ERR) + if (std::signal(SIGTERM, onSignalWriteToSelfPipe) == SIG_ERR) { gzerr << "Unable to catch SIGTERM.\n" << " Please visit http://community.gazebosim.org for help.\n"; diff --git a/src/SignalHandler_TEST.cc b/src/SignalHandler_TEST.cc index 420f3aa6f..5df2a9d52 100644 --- a/src/SignalHandler_TEST.cc +++ b/src/SignalHandler_TEST.cc @@ -19,6 +19,7 @@ // comments when upgrading to gz-cmake's "make codecheck" #include "gz/common/SignalHandler.hh" // NOLINT(*) #include // NOLINT(*) +#include #include // NOLINT(*) #include // NOLINT(*) #include // NOLINT(*) @@ -79,6 +80,7 @@ TEST(SignalHandler, Single) common::SignalHandler handler1; EXPECT_TRUE(handler1.AddCallback(handler1Cb)); std::raise(SIGTERM); + std::this_thread::sleep_for(std::chrono::milliseconds(11)); EXPECT_EQ(SIGTERM, gHandler1Sig); } @@ -98,6 +100,7 @@ TEST(SignalHandler, Multiple) std::raise(SIGINT); + std::this_thread::sleep_for(std::chrono::milliseconds(11)); EXPECT_EQ(-1, gHandler1Sig); EXPECT_EQ(-1, gHandler2Sig); @@ -127,6 +130,7 @@ TEST(SignalHandler, InitFailure) std::raise(SIGINT); + std::this_thread::sleep_for(std::chrono::milliseconds(11)); EXPECT_EQ(-1, gHandler1Sig); EXPECT_EQ(-1, gHandler2Sig); } @@ -287,3 +291,39 @@ TEST(SignalHandler, MultipleThreads) for (int i = 0; i < threadCount; ++i) EXPECT_EQ(SIGINT, results[i]); } + +///////////////////////////////////////////////// +TEST(SignalHandler, RapidFire) +{ + resetSignals(); + std::condition_variable cv; + std::mutex countMutex; + int countHandlerCalls = 0; + constexpr int kNumSignals = 100; + auto cb = [&](int _sig) + { + if (_sig == SIGTERM) + { + std::lock_guard lk(countMutex); + ++countHandlerCalls; + if (countHandlerCalls >= kNumSignals) + { + cv.notify_one(); + } + } + }; + common::SignalHandler handler1; + EXPECT_TRUE(handler1.AddCallback(cb)); + + for (int i=0; i < kNumSignals; ++i) + { + std::raise(SIGTERM); + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + + // wait for callback to be called kNumSignal times with a timeout + std::unique_lock lk(countMutex); + cv.wait_for(lk, std::chrono::seconds(5), + [&] { return countHandlerCalls >= kNumSignals; }); + EXPECT_GE(countHandlerCalls, kNumSignals); +}