Skip to content

Commit

Permalink
Use self-pipe trick to implement signal handlers (#618)
Browse files Browse the repository at this point in the history
This allows our callbacks to freely use any function they want including printing to the console. Previously, since the callbacks were called in a signal handler, they were limited to using async-signal-safe functions. However, most of our callbacks did not comply with this and so risked causing a deadlock when a signal was fired. For example, malloc or new is not async-signal-safe, but one of the signal handlers in gz-sim emits a Stop event which requires allocating memory. If before the signal was fired, another thread was allocating memory, trying to emit this event inside the signal handler will lock up since it'll internally try to lock a mutex when calling malloc/new. The same is true for using functions like gzdbg to print to the console which is not permitted inside a signal hander.


---------

Signed-off-by: Addisu Z. Taddese <[email protected]>
  • Loading branch information
azeey authored Aug 21, 2024
1 parent 1f52701 commit b733ac3
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 7 deletions.
150 changes: 143 additions & 7 deletions src/SignalHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,15 @@
// comments when upgrading to gz-cmake's "make codecheck"
#include "gz/common/SignalHandler.hh" // NOLINT(*)
#include <atomic>
#include <cstring>
#include <csignal> // NOLINT(*)
#include <functional> // NOLINT(*)
#include <fcntl.h>
#ifndef _WIN32
#include <unistd.h>
#else
#include <io.h>
#endif
#include <map> // NOLINT(*)
#include <mutex> // NOLINT(*)
#include <utility> // NOLINT(*)
Expand All @@ -31,22 +38,150 @@ using namespace gz;
using namespace common;

// A wrapper for the sigaction sa_handler.
// TODO(azeey) We should avoid using objects with non-trivial destructors as
// globals.
GZ_COMMON_VISIBLE std::map<int, std::function<void(int)>> gOnSignalWrappers;
std::mutex gWrapperMutex;

#ifdef _WIN32
#define write _write
#define read _read
#endif
namespace
{

/// \brief Index of the read file descriptor
constexpr int kReadFd = 0;
/// \brief Index of the write file descriptor
constexpr int kWriteFd = 1;

/// \brief Class to encalpsulate the self-pipe trick which is a way enable
/// the user of non async-signal-safe functions in downstream signal handler
/// callbacks.
///
/// It works by creating a pipe between the actual signal handler and
/// a servicing thread. When a signal is received the signal handler
/// writes a byte to the pipe and returns. The servicing thread reads the
/// byte from the pipe and calls all of the registered callbacks. Since
/// the registered callbacks are called from a regular thread instead of
/// an actual signal handler, the callbacks are free to use any function
/// (e.g. call gzdbg).
class SelfPipe {

/// \brief The two pipes the comprise the self-pipe
public: static int pipeFd[2];

/// \brief Static function to create a singleton SelfPipe object
public: static void Initialize();

/// \brief Destructor.
public: ~SelfPipe();

/// \brief Constructor
/// Creates the pipes, applies configuration flags and starts the servicing
/// thread
private: SelfPipe();

/// \brief Servicing thread
private: void CheckPipe();

/// \brief Handle for CheckPipe thread
private: std::thread checkPipeThread;

/// \brief Whether the program is running. This is set to true by the
/// Constructor and set to false by the destructor
private: std::atomic<bool> running{false};
};

int SelfPipe::pipeFd[2];

/////////////////////////////////////////////////
/// \brief Callback to execute when a signal is received.
/// This simply writes a byte to a pipe and returns
/// \param[in] _value Signal number.
void onSignal(int _value)
void onSignalWriteToSelfPipe(int _value)
{
std::lock_guard<std::mutex> lock(gWrapperMutex);
// Send the signal to each wrapper
for (std::pair<int, std::function<void(int)>> func : gOnSignalWrappers)
#ifdef _WIN32
// Windows resets the signal handler every time a signal is handled.
std::signal(_value, onSignalWriteToSelfPipe);
#endif
auto valueByte = static_cast<std::uint8_t>(_value);
if (write(SelfPipe::pipeFd[kWriteFd], &valueByte, 1) == -1)
{
// TODO(azeey) Not clear what to do here.
}
}

/////////////////////////////////////////////////
SelfPipe::SelfPipe()
{
#ifdef _WIN32
if (_pipe(this->pipeFd, 256, O_BINARY) == -1)
#else
if (pipe(this->pipeFd) == -1)
#endif
{
func.second(_value);
gzerr << "Unable to create pipe\n";
}

#ifndef _WIN32
int flags = fcntl(this->pipeFd[kWriteFd], F_GETFL);
if (fcntl(this->pipeFd[kWriteFd], F_SETFL, flags | O_NONBLOCK) < 0)
{
gzerr << "Failed to set flags on pipe. "
<< "Signal handling may not work properly" << std::endl;
}
#endif
this->running = true;
this->checkPipeThread = std::thread(&SelfPipe::CheckPipe, this);
}

/////////////////////////////////////////////////
SelfPipe::~SelfPipe()
{
this->running = false;
// Write a dummy signal value to the pipe. This is not a real signal, but we
// need to wakeup the CheckPipe thread so it can cleanup properly. The value
// was chosen to make it clear that this is not one of the standard signals.
onSignalWriteToSelfPipe(127);
this->checkPipeThread.join();
}

/////////////////////////////////////////////////
void SelfPipe::Initialize()
{
// We actually need this object to be destructed in order to join the thread,
// so we can't use gz::utils::NeverDestroyed here.
static SelfPipe selfPipe;
}

/////////////////////////////////////////////////
void SelfPipe::CheckPipe()
{
while (this->running)
{
std::uint8_t signal;
if (read(SelfPipe::pipeFd[kReadFd], &signal, 1) != -1)
{
if (this->running)
{
std::lock_guard<std::mutex> lock(gWrapperMutex);
// Send the signal to each wrapper
for (std::pair<int, std::function<void(int)>> func : gOnSignalWrappers)
{
func.second(signal);
}
}
}
else
{
gzerr << errno << " " << std::strerror(errno) << std::endl;
}
}
}

} // namespace

/////////////////////////////////////////////////
class common::SignalHandlerPrivate
{
Expand Down Expand Up @@ -74,14 +209,15 @@ SignalHandler::SignalHandler()
static int counter = 0;
std::lock_guard<std::mutex> lock(gWrapperMutex);

if (std::signal(SIGINT, onSignal) == SIG_ERR)
SelfPipe::Initialize();
if (std::signal(SIGINT, onSignalWriteToSelfPipe) == SIG_ERR)
{
gzerr << "Unable to catch SIGINT.\n"
<< " Please visit http://community.gazebosim.org for help.\n";
return;
}

if (std::signal(SIGTERM, onSignal) == SIG_ERR)
if (std::signal(SIGTERM, onSignalWriteToSelfPipe) == SIG_ERR)
{
gzerr << "Unable to catch SIGTERM.\n"
<< " Please visit http://community.gazebosim.org for help.\n";
Expand Down
40 changes: 40 additions & 0 deletions src/SignalHandler_TEST.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
// comments when upgrading to gz-cmake's "make codecheck"
#include "gz/common/SignalHandler.hh" // NOLINT(*)
#include <gtest/gtest.h> // NOLINT(*)
#include <chrono>
#include <csignal> // NOLINT(*)
#include <condition_variable> // NOLINT(*)
#include <map> // NOLINT(*)
Expand Down Expand Up @@ -79,6 +80,7 @@ TEST(SignalHandler, Single)
common::SignalHandler handler1;
EXPECT_TRUE(handler1.AddCallback(handler1Cb));
std::raise(SIGTERM);
std::this_thread::sleep_for(std::chrono::milliseconds(11));
EXPECT_EQ(SIGTERM, gHandler1Sig);
}

Expand All @@ -98,6 +100,7 @@ TEST(SignalHandler, Multiple)

std::raise(SIGINT);

std::this_thread::sleep_for(std::chrono::milliseconds(11));
EXPECT_EQ(-1, gHandler1Sig);
EXPECT_EQ(-1, gHandler2Sig);

Expand Down Expand Up @@ -127,6 +130,7 @@ TEST(SignalHandler, InitFailure)

std::raise(SIGINT);

std::this_thread::sleep_for(std::chrono::milliseconds(11));
EXPECT_EQ(-1, gHandler1Sig);
EXPECT_EQ(-1, gHandler2Sig);
}
Expand Down Expand Up @@ -287,3 +291,39 @@ TEST(SignalHandler, MultipleThreads)
for (int i = 0; i < threadCount; ++i)
EXPECT_EQ(SIGINT, results[i]);
}

/////////////////////////////////////////////////
TEST(SignalHandler, RapidFire)
{
resetSignals();
std::condition_variable cv;
std::mutex countMutex;
int countHandlerCalls = 0;
constexpr int kNumSignals = 100;
auto cb = [&](int _sig)
{
if (_sig == SIGTERM)
{
std::lock_guard<std::mutex> lk(countMutex);
++countHandlerCalls;
if (countHandlerCalls >= kNumSignals)
{
cv.notify_one();
}
}
};
common::SignalHandler handler1;
EXPECT_TRUE(handler1.AddCallback(cb));

for (int i=0; i < kNumSignals; ++i)
{
std::raise(SIGTERM);
std::this_thread::sleep_for(std::chrono::microseconds(100));
}

// wait for callback to be called kNumSignal times with a timeout
std::unique_lock<std::mutex> lk(countMutex);
cv.wait_for(lk, std::chrono::seconds(5),
[&] { return countHandlerCalls >= kNumSignals; });
EXPECT_GE(countHandlerCalls, kNumSignals);
}

0 comments on commit b733ac3

Please sign in to comment.