Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use self-pipe trick to implement signal handlers #618

Merged
merged 9 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 143 additions & 7 deletions src/SignalHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,15 @@
// comments when upgrading to gz-cmake's "make codecheck"
#include "gz/common/SignalHandler.hh" // NOLINT(*)
#include <atomic>
#include <cstring>
#include <csignal> // NOLINT(*)
#include <functional> // NOLINT(*)
#include <fcntl.h>
#ifndef _WIN32
#include <unistd.h>
#else
#include <io.h>
#endif
#include <map> // NOLINT(*)
#include <mutex> // NOLINT(*)
#include <utility> // NOLINT(*)
Expand All @@ -31,22 +38,150 @@ using namespace gz;
using namespace common;

// A wrapper for the sigaction sa_handler.
// TODO(azeey) We should avoid using objects with non-trivial destructors as
// globals.
GZ_COMMON_VISIBLE std::map<int, std::function<void(int)>> gOnSignalWrappers;
std::mutex gWrapperMutex;

#ifdef _WIN32
#define write _write
#define read _read
#endif
namespace
{

/// \brief Index of the read file descriptor
constexpr int kReadFd = 0;
/// \brief Index of the write file descriptor
constexpr int kWriteFd = 1;

/// \brief Class to encalpsulate the self-pipe trick which is a way enable
/// the user of non async-signal-safe functions in downstream signal handler
/// callbacks.
///
/// It works by creating a pipe between the actual signal handler and
/// a servicing thread. When a signal is received the signal handler
/// writes a byte to the pipe and returns. The servicing thread reads the
/// byte from the pipe and calls all of the registered callbacks. Since
/// the registered callbacks are called from a regular thread instead of
/// an actual signal handler, the callbacks are free to use any function
/// (e.g. call gzdbg).
class SelfPipe {

/// \brief The two pipes the comprise the self-pipe
public: static int pipeFd[2];

/// \brief Static function to create a singleton SelfPipe object
public: static void Initialize();

/// \brief Destructor.
public: ~SelfPipe();

/// \brief Constructor
/// Creates the pipes, applies configuration flags and starts the servicing
/// thread
private: SelfPipe();

/// \brief Servicing thread
private: void CheckPipe();

/// \brief Handle for CheckPipe thread
private: std::thread checkPipeThread;

/// \brief Whether the program is running. This is set to true by the
/// Constructor and set to false by the destructor
private: std::atomic<bool> running{false};
};

int SelfPipe::pipeFd[2];

/////////////////////////////////////////////////
/// \brief Callback to execute when a signal is received.
/// This simply writes a byte to a pipe and returns
/// \param[in] _value Signal number.
void onSignal(int _value)
void onSignalWriteToSelfPipe(int _value)
{
std::lock_guard<std::mutex> lock(gWrapperMutex);
// Send the signal to each wrapper
for (std::pair<int, std::function<void(int)>> func : gOnSignalWrappers)
#ifdef _WIN32
// Windows resets the signal handler every time a signal is handled.
std::signal(_value, onSignalWriteToSelfPipe);
#endif
auto valueByte = static_cast<std::uint8_t>(_value);
if (write(SelfPipe::pipeFd[kWriteFd], &valueByte, 1) == -1)
{
// TODO(azeey) Not clear what to do here.
}
}

/////////////////////////////////////////////////
SelfPipe::SelfPipe()
{
#ifdef _WIN32
if (_pipe(this->pipeFd, 256, O_BINARY) == -1)
#else
if (pipe(this->pipeFd) == -1)
#endif
{
func.second(_value);
gzerr << "Unable to create pipe\n";
}

#ifndef _WIN32
int flags = fcntl(this->pipeFd[kWriteFd], F_GETFL);
if (fcntl(this->pipeFd[kWriteFd], F_SETFL, flags | O_NONBLOCK) < 0)
{
gzerr << "Failed to set flags on pipe. "
<< "Signal handling may not work properly" << std::endl;
}
#endif
this->running = true;
this->checkPipeThread = std::thread(&SelfPipe::CheckPipe, this);
}

/////////////////////////////////////////////////
SelfPipe::~SelfPipe()
{
this->running = false;
// Write a dummy signal value to the pipe. This is not a real signal, but we
// need to wakeup the CheckPipe thread so it can cleanup properly. The value
// was chosen to make it clear that this is not one of the standard signals.
onSignalWriteToSelfPipe(127);
this->checkPipeThread.join();
}

/////////////////////////////////////////////////
void SelfPipe::Initialize()
{
// We actually need this object to be destructed in order to join the thread,
// so we can't use gz::utils::NeverDestroyed here.
static SelfPipe selfPipe;
}

/////////////////////////////////////////////////
void SelfPipe::CheckPipe()
{
while (this->running)
{
std::uint8_t signal;
if (read(SelfPipe::pipeFd[kReadFd], &signal, 1) != -1)
{
if (this->running)
{
std::lock_guard<std::mutex> lock(gWrapperMutex);
// Send the signal to each wrapper
for (std::pair<int, std::function<void(int)>> func : gOnSignalWrappers)
{
func.second(signal);
}
}
}
else
{
gzerr << errno << " " << std::strerror(errno) << std::endl;
}
}
}

} // namespace

/////////////////////////////////////////////////
class common::SignalHandlerPrivate
{
Expand Down Expand Up @@ -74,14 +209,15 @@ SignalHandler::SignalHandler()
static int counter = 0;
std::lock_guard<std::mutex> lock(gWrapperMutex);

if (std::signal(SIGINT, onSignal) == SIG_ERR)
SelfPipe::Initialize();
if (std::signal(SIGINT, onSignalWriteToSelfPipe) == SIG_ERR)
{
gzerr << "Unable to catch SIGINT.\n"
<< " Please visit http://community.gazebosim.org for help.\n";
return;
}

if (std::signal(SIGTERM, onSignal) == SIG_ERR)
if (std::signal(SIGTERM, onSignalWriteToSelfPipe) == SIG_ERR)
{
gzerr << "Unable to catch SIGTERM.\n"
<< " Please visit http://community.gazebosim.org for help.\n";
Expand Down
40 changes: 40 additions & 0 deletions src/SignalHandler_TEST.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
// comments when upgrading to gz-cmake's "make codecheck"
#include "gz/common/SignalHandler.hh" // NOLINT(*)
#include <gtest/gtest.h> // NOLINT(*)
#include <chrono>
#include <csignal> // NOLINT(*)
#include <condition_variable> // NOLINT(*)
#include <map> // NOLINT(*)
Expand Down Expand Up @@ -79,6 +80,7 @@ TEST(SignalHandler, Single)
common::SignalHandler handler1;
EXPECT_TRUE(handler1.AddCallback(handler1Cb));
std::raise(SIGTERM);
std::this_thread::sleep_for(std::chrono::milliseconds(11));
EXPECT_EQ(SIGTERM, gHandler1Sig);
}

Expand All @@ -98,6 +100,7 @@ TEST(SignalHandler, Multiple)

std::raise(SIGINT);

std::this_thread::sleep_for(std::chrono::milliseconds(11));
EXPECT_EQ(-1, gHandler1Sig);
EXPECT_EQ(-1, gHandler2Sig);

Expand Down Expand Up @@ -127,6 +130,7 @@ TEST(SignalHandler, InitFailure)

std::raise(SIGINT);

std::this_thread::sleep_for(std::chrono::milliseconds(11));
EXPECT_EQ(-1, gHandler1Sig);
EXPECT_EQ(-1, gHandler2Sig);
}
Expand Down Expand Up @@ -287,3 +291,39 @@ TEST(SignalHandler, MultipleThreads)
for (int i = 0; i < threadCount; ++i)
EXPECT_EQ(SIGINT, results[i]);
}

/////////////////////////////////////////////////
TEST(SignalHandler, RapidFire)
{
resetSignals();
std::condition_variable cv;
std::mutex countMutex;
int countHandlerCalls = 0;
constexpr int kNumSignals = 100;
auto cb = [&](int _sig)
{
if (_sig == SIGTERM)
{
std::lock_guard<std::mutex> lk(countMutex);
++countHandlerCalls;
if (countHandlerCalls >= kNumSignals)
{
cv.notify_one();
}
}
};
common::SignalHandler handler1;
EXPECT_TRUE(handler1.AddCallback(cb));

for (int i=0; i < kNumSignals; ++i)
{
std::raise(SIGTERM);
std::this_thread::sleep_for(std::chrono::microseconds(100));
}

// wait for callback to be called kNumSignal times with a timeout
std::unique_lock<std::mutex> lk(countMutex);
cv.wait_for(lk, std::chrono::seconds(5),
[&] { return countHandlerCalls >= kNumSignals; });
EXPECT_GE(countHandlerCalls, kNumSignals);
}
Loading