diff --git a/src/SignalHandler.cc b/src/SignalHandler.cc index 07b6d054..0612c03f 100644 --- a/src/SignalHandler.cc +++ b/src/SignalHandler.cc @@ -19,8 +19,15 @@ // comments when upgrading to gz-cmake's "make codecheck" #include "gz/common/SignalHandler.hh" // NOLINT(*) #include +#include #include // NOLINT(*) #include // NOLINT(*) +#include +#ifndef _WIN32 + #include +#else + #include +#endif #include // NOLINT(*) #include // NOLINT(*) #include // NOLINT(*) @@ -31,22 +38,150 @@ using namespace gz; using namespace common; // A wrapper for the sigaction sa_handler. +// TODO(azeey) We should avoid using objects with non-trivial destructors as +// globals. GZ_COMMON_VISIBLE std::map> gOnSignalWrappers; std::mutex gWrapperMutex; +#ifdef _WIN32 + #define write _write + #define read _read +#endif +namespace +{ + +/// \brief Index of the read file descriptor +constexpr int kReadFd = 0; +/// \brief Index of the write file descriptor +constexpr int kWriteFd = 1; + +/// \brief Class to encalpsulate the self-pipe trick which is a way enable +/// the user of non async-signal-safe functions in downstream signal handler +/// callbacks. +/// +/// It works by creating a pipe between the actual signal handler and +/// a servicing thread. When a signal is received the signal handler +/// writes a byte to the pipe and returns. The servicing thread reads the +/// byte from the pipe and calls all of the registered callbacks. Since +/// the registered callbacks are called from a regular thread instead of +/// an actual signal handler, the callbacks are free to use any function +/// (e.g. call gzdbg). +class SelfPipe { + + /// \brief The two pipes the comprise the self-pipe + public: static int pipeFd[2]; + + /// \brief Static function to create a singleton SelfPipe object + public: static void Initialize(); + + /// \brief Destructor. + public: ~SelfPipe(); + + /// \brief Constructor + /// Creates the pipes, applies configuration flags and starts the servicing + /// thread + private: SelfPipe(); + + /// \brief Servicing thread + private: void CheckPipe(); + + /// \brief Handle for CheckPipe thread + private: std::thread checkPipeThread; + + /// \brief Whether the program is running. This is set to true by the + /// Constructor and set to false by the destructor + private: std::atomic running{false}; +}; + +int SelfPipe::pipeFd[2]; + ///////////////////////////////////////////////// /// \brief Callback to execute when a signal is received. +/// This simply writes a byte to a pipe and returns /// \param[in] _value Signal number. -void onSignal(int _value) +void onSignalWriteToSelfPipe(int _value) { - std::lock_guard lock(gWrapperMutex); - // Send the signal to each wrapper - for (std::pair> func : gOnSignalWrappers) +#ifdef _WIN32 + // Windows resets the signal handler every time a signal is handled. + std::signal(_value, onSignalWriteToSelfPipe); +#endif + auto valueByte = static_cast(_value); + if (write(SelfPipe::pipeFd[kWriteFd], &valueByte, 1) == -1) + { + // TODO(azeey) Not clear what to do here. + } +} + +///////////////////////////////////////////////// +SelfPipe::SelfPipe() +{ +#ifdef _WIN32 + if (_pipe(this->pipeFd, 256, O_BINARY) == -1) +#else + if (pipe(this->pipeFd) == -1) +#endif { - func.second(_value); + gzerr << "Unable to create pipe\n"; } + +#ifndef _WIN32 + int flags = fcntl(this->pipeFd[kWriteFd], F_GETFL); + if (fcntl(this->pipeFd[kWriteFd], F_SETFL, flags | O_NONBLOCK) < 0) + { + gzerr << "Failed to set flags on pipe. " + << "Signal handling may not work properly" << std::endl; + } +#endif + this->running = true; + this->checkPipeThread = std::thread(&SelfPipe::CheckPipe, this); +} + +///////////////////////////////////////////////// +SelfPipe::~SelfPipe() +{ + this->running = false; + // Write a dummy signal value to the pipe. This is not a real signal, but we + // need to wakeup the CheckPipe thread so it can cleanup properly. The value + // was chosen to make it clear that this is not one of the standard signals. + onSignalWriteToSelfPipe(127); + this->checkPipeThread.join(); } +///////////////////////////////////////////////// +void SelfPipe::Initialize() +{ + // We actually need this object to be destructed in order to join the thread, + // so we can't use gz::utils::NeverDestroyed here. + static SelfPipe selfPipe; +} + +///////////////////////////////////////////////// +void SelfPipe::CheckPipe() +{ + while (this->running) + { + std::uint8_t signal; + if (read(SelfPipe::pipeFd[kReadFd], &signal, 1) != -1) + { + if (this->running) + { + std::lock_guard lock(gWrapperMutex); + // Send the signal to each wrapper + for (std::pair> func : gOnSignalWrappers) + { + func.second(signal); + } + } + } + else + { + gzerr << errno << " " << std::strerror(errno) << std::endl; + } + } +} + +} // namespace + ///////////////////////////////////////////////// class common::SignalHandlerPrivate { @@ -74,14 +209,15 @@ SignalHandler::SignalHandler() static int counter = 0; std::lock_guard lock(gWrapperMutex); - if (std::signal(SIGINT, onSignal) == SIG_ERR) + SelfPipe::Initialize(); + if (std::signal(SIGINT, onSignalWriteToSelfPipe) == SIG_ERR) { gzerr << "Unable to catch SIGINT.\n" << " Please visit http://community.gazebosim.org for help.\n"; return; } - if (std::signal(SIGTERM, onSignal) == SIG_ERR) + if (std::signal(SIGTERM, onSignalWriteToSelfPipe) == SIG_ERR) { gzerr << "Unable to catch SIGTERM.\n" << " Please visit http://community.gazebosim.org for help.\n"; diff --git a/src/SignalHandler_TEST.cc b/src/SignalHandler_TEST.cc index 420f3aa6..5df2a9d5 100644 --- a/src/SignalHandler_TEST.cc +++ b/src/SignalHandler_TEST.cc @@ -19,6 +19,7 @@ // comments when upgrading to gz-cmake's "make codecheck" #include "gz/common/SignalHandler.hh" // NOLINT(*) #include // NOLINT(*) +#include #include // NOLINT(*) #include // NOLINT(*) #include // NOLINT(*) @@ -79,6 +80,7 @@ TEST(SignalHandler, Single) common::SignalHandler handler1; EXPECT_TRUE(handler1.AddCallback(handler1Cb)); std::raise(SIGTERM); + std::this_thread::sleep_for(std::chrono::milliseconds(11)); EXPECT_EQ(SIGTERM, gHandler1Sig); } @@ -98,6 +100,7 @@ TEST(SignalHandler, Multiple) std::raise(SIGINT); + std::this_thread::sleep_for(std::chrono::milliseconds(11)); EXPECT_EQ(-1, gHandler1Sig); EXPECT_EQ(-1, gHandler2Sig); @@ -127,6 +130,7 @@ TEST(SignalHandler, InitFailure) std::raise(SIGINT); + std::this_thread::sleep_for(std::chrono::milliseconds(11)); EXPECT_EQ(-1, gHandler1Sig); EXPECT_EQ(-1, gHandler2Sig); } @@ -287,3 +291,39 @@ TEST(SignalHandler, MultipleThreads) for (int i = 0; i < threadCount; ++i) EXPECT_EQ(SIGINT, results[i]); } + +///////////////////////////////////////////////// +TEST(SignalHandler, RapidFire) +{ + resetSignals(); + std::condition_variable cv; + std::mutex countMutex; + int countHandlerCalls = 0; + constexpr int kNumSignals = 100; + auto cb = [&](int _sig) + { + if (_sig == SIGTERM) + { + std::lock_guard lk(countMutex); + ++countHandlerCalls; + if (countHandlerCalls >= kNumSignals) + { + cv.notify_one(); + } + } + }; + common::SignalHandler handler1; + EXPECT_TRUE(handler1.AddCallback(cb)); + + for (int i=0; i < kNumSignals; ++i) + { + std::raise(SIGTERM); + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + + // wait for callback to be called kNumSignal times with a timeout + std::unique_lock lk(countMutex); + cv.wait_for(lk, std::chrono::seconds(5), + [&] { return countHandlerCalls >= kNumSignals; }); + EXPECT_GE(countHandlerCalls, kNumSignals); +}