Skip to content

Commit

Permalink
[wjwwood] Updated "Data race fixes" (#2500)
Browse files Browse the repository at this point in the history
* Fix callback group logic in executor

Signed-off-by: Janosch Machowinski <[email protected]>

* fix: Fixed unnecessary copy of wait_set

Signed-off-by: Janosch Machowinski <[email protected]>

* fix(executor): Fixed race conditions with rebuild of wait_sets

Before this change, the rebuild of wait set would be triggered
after the wait set was waken up. With bad timing, this could
lead to the rebuild not happening with multi threaded executor.

Signed-off-by: Janosch Machowinski <[email protected]>

* fix(Executor): Fixed lost of entities rebuild request


Signed-off-by: Janosch Machowinski <[email protected]>

* chore: Added assert for not set callback_group in execute_any_executable

Signed-off-by: Janosch Machowinski <[email protected]>

* Add test for cbg getting reset

Signed-off-by: Michael Carroll <[email protected]>
Co-authored-by: Janosch Machowinski <[email protected]>

* chore: renamed test cases to snake_case

Signed-off-by: Janosch Machowinski <[email protected]>

* style

Signed-off-by: William Woodall <[email protected]>

* fixup test to avoid polling and short timeouts

Signed-off-by: William Woodall <[email protected]>

* fix: Use correct notify_waitable_ instance

Signed-off-by: Janosch Machowinski <[email protected]>

* fix(StaticSingleThreadedExecutor): Added missing special case handling for current_notify_waitable_

Signed-off-by: Janosch Machowinski <[email protected]>

* fix(TestCallbackGroup): Fixed test after change to timers

Signed-off-by: Janosch Machowinski <[email protected]>

---------

Signed-off-by: Janosch Machowinski <[email protected]>
Signed-off-by: Janosch Machowinski <[email protected]>
Signed-off-by: Michael Carroll <[email protected]>
Signed-off-by: William Woodall <[email protected]>
Co-authored-by: Janosch Machowinski <[email protected]>
Co-authored-by: Michael Carroll <[email protected]>
Co-authored-by: Janosch Machowinski <[email protected]>
  • Loading branch information
4 people authored Apr 12, 2024
1 parent dec22a2 commit 90e4511
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 73 deletions.
9 changes: 9 additions & 0 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,15 @@ class Executor
AnyExecutable & any_executable,
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));

/// This function triggers a recollect of all entities that are registered to the executor.
/**
* Calling this function is thread safe.
*
* \param[in] notify if true will execute a trigger that will wake up a waiting executor
*/
void
trigger_entity_recollect(bool notify);

/// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
std::atomic_bool spinning;

Expand Down
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/wait_result.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ class WaitResult final

if (this->kind() == WaitResultKind::Ready) {
auto & wait_set = this->get_wait_set();
auto rcl_wait_set = wait_set.get_rcl_wait_set();
auto & rcl_wait_set = wait_set.get_rcl_wait_set();
while (next_waitable_index_ < wait_set.size_of_waitables()) {
auto cur_waitable = wait_set.waitables(next_waitable_index_++);
if (cur_waitable != nullptr && cur_waitable->is_ready(rcl_wait_set)) {
Expand Down
125 changes: 59 additions & 66 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <algorithm>
#include <cassert>
#include <chrono>
#include <iterator>
#include <memory>
Expand Down Expand Up @@ -72,13 +73,10 @@ Executor::Executor(const rclcpp::ExecutorOptions & options)
}
});

notify_waitable_->set_on_ready_callback(
[this](auto, auto) {
this->entities_need_rebuild_.store(true);
});

notify_waitable_->add_guard_condition(interrupt_guard_condition_);
notify_waitable_->add_guard_condition(shutdown_guard_condition_);

wait_set_.add_waitable(notify_waitable_);
}

Executor::~Executor()
Expand Down Expand Up @@ -122,6 +120,20 @@ Executor::~Executor()
}
}

void Executor::trigger_entity_recollect(bool notify)
{
this->entities_need_rebuild_.store(true);

if (!spinning.load() && entities_need_rebuild_.exchange(false)) {
std::lock_guard<std::mutex> guard(mutex_);
this->collect_entities();
}

if (notify) {
interrupt_guard_condition_->trigger();
}
}

std::vector<rclcpp::CallbackGroup::WeakPtr>
Executor::get_all_callback_groups()
{
Expand Down Expand Up @@ -152,19 +164,12 @@ Executor::add_callback_group(
(void) node_ptr;
this->collector_.add_callback_group(group_ptr);

if (!spinning.load()) {
std::lock_guard<std::mutex> guard(mutex_);
this->collect_entities();
}

if (notify) {
try {
interrupt_guard_condition_->trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on callback group add: ") + ex.what());
}
try {
this->trigger_entity_recollect(notify);
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on callback group add: ") + ex.what());
}
}

Expand All @@ -173,19 +178,12 @@ Executor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_pt
{
this->collector_.add_node(node_ptr);

if (!spinning.load()) {
std::lock_guard<std::mutex> guard(mutex_);
this->collect_entities();
}

if (notify) {
try {
interrupt_guard_condition_->trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on node add: ") + ex.what());
}
try {
this->trigger_entity_recollect(notify);
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on node add: ") + ex.what());
}
}

Expand All @@ -196,18 +194,12 @@ Executor::remove_callback_group(
{
this->collector_.remove_callback_group(group_ptr);

if (!spinning.load()) {
std::lock_guard<std::mutex> guard(mutex_);
this->collect_entities();
}
if (notify) {
try {
interrupt_guard_condition_->trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on callback group remove: ") + ex.what());
}
try {
this->trigger_entity_recollect(notify);
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on callback group remove: ") + ex.what());
}
}

Expand All @@ -222,19 +214,12 @@ Executor::remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node
{
this->collector_.remove_node(node_ptr);

if (!spinning.load()) {
std::lock_guard<std::mutex> guard(mutex_);
this->collect_entities();
}

if (notify) {
try {
interrupt_guard_condition_->trigger();
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on node remove: ") + ex.what());
}
try {
this->trigger_entity_recollect(notify);
} catch (const rclcpp::exceptions::RCLError & ex) {
throw std::runtime_error(
std::string(
"Failed to trigger guard condition on node remove: ") + ex.what());
}
}

Expand Down Expand Up @@ -379,6 +364,10 @@ Executor::execute_any_executable(AnyExecutable & any_exec)
return;
}

assert(
(void("cannot execute an AnyExecutable without a valid callback group"),
any_exec.callback_group));

if (any_exec.timer) {
TRACETOOLS_TRACEPOINT(
rclcpp_executor_execute,
Expand All @@ -403,9 +392,7 @@ Executor::execute_any_executable(AnyExecutable & any_exec)
}

// Reset the callback_group, regardless of type
if (any_exec.callback_group) {
any_exec.callback_group->can_be_taken_from().store(true);
}
any_exec.callback_group->can_be_taken_from().store(true);
}

template<typename Taker, typename Handler>
Expand Down Expand Up @@ -642,7 +629,6 @@ Executor::collect_entities()
// In the case that an entity already has an expired weak pointer
// before being removed from the waitset, additionally prune the waitset.
this->wait_set_.prune_deleted_entities();
this->entities_need_rebuild_.store(false);
}

void
Expand All @@ -655,7 +641,7 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)

{
std::lock_guard<std::mutex> guard(mutex_);
if (current_collection_.empty() || this->entities_need_rebuild_.load()) {
if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) {
this->collect_entities();
}
}
Expand All @@ -664,6 +650,13 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)
RCUTILS_LOG_WARN_NAMED(
"rclcpp",
"empty wait set received in wait(). This should never happen.");
} else {
if (this->wait_result_->kind() == WaitResultKind::Ready && current_notify_waitable_) {
auto & rcl_wait_set = this->wait_result_->get_wait_set().get_rcl_wait_set();
if (current_notify_waitable_->is_ready(rcl_wait_set)) {
current_notify_waitable_->execute(current_notify_waitable_->take_data());
}
}
}
}

Expand All @@ -689,7 +682,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable)
auto entity_iter = current_collection_.timers.find(timer->get_timer_handle().get());
if (entity_iter != current_collection_.timers.end()) {
auto callback_group = entity_iter->second.callback_group.lock();
if (callback_group && !callback_group->can_be_taken_from()) {
if (!callback_group || !callback_group->can_be_taken_from()) {
current_timer_index++;
continue;
}
Expand Down Expand Up @@ -719,7 +712,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable)
subscription->get_subscription_handle().get());
if (entity_iter != current_collection_.subscriptions.end()) {
auto callback_group = entity_iter->second.callback_group.lock();
if (callback_group && !callback_group->can_be_taken_from()) {
if (!callback_group || !callback_group->can_be_taken_from()) {
continue;
}
any_executable.subscription = subscription;
Expand All @@ -735,7 +728,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable)
auto entity_iter = current_collection_.services.find(service->get_service_handle().get());
if (entity_iter != current_collection_.services.end()) {
auto callback_group = entity_iter->second.callback_group.lock();
if (callback_group && !callback_group->can_be_taken_from()) {
if (!callback_group || !callback_group->can_be_taken_from()) {
continue;
}
any_executable.service = service;
Expand All @@ -751,7 +744,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable)
auto entity_iter = current_collection_.clients.find(client->get_client_handle().get());
if (entity_iter != current_collection_.clients.end()) {
auto callback_group = entity_iter->second.callback_group.lock();
if (callback_group && !callback_group->can_be_taken_from()) {
if (!callback_group || !callback_group->can_be_taken_from()) {
continue;
}
any_executable.client = client;
Expand All @@ -767,7 +760,7 @@ Executor::get_next_ready_executable(AnyExecutable & any_executable)
auto entity_iter = current_collection_.waitables.find(waitable.get());
if (entity_iter != current_collection_.waitables.end()) {
auto callback_group = entity_iter->second.callback_group.lock();
if (callback_group && !callback_group->can_be_taken_from()) {
if (!callback_group || !callback_group->can_be_taken_from()) {
continue;
}
any_executable.waitable = waitable;
Expand Down
10 changes: 5 additions & 5 deletions rclcpp/src/rclcpp/executors/executor_entities_collection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ ready_executables(
continue;
}
auto group_info = group_cache(entity_iter->second.callback_group);
if (group_info && !group_info->can_be_taken_from().load()) {
if (!group_info || !group_info->can_be_taken_from().load()) {
continue;
}
if (!entity->call()) {
Expand All @@ -176,7 +176,7 @@ ready_executables(
continue;
}
auto group_info = group_cache(entity_iter->second.callback_group);
if (group_info && !group_info->can_be_taken_from().load()) {
if (!group_info || !group_info->can_be_taken_from().load()) {
continue;
}
rclcpp::AnyExecutable exec;
Expand All @@ -196,7 +196,7 @@ ready_executables(
continue;
}
auto group_info = group_cache(entity_iter->second.callback_group);
if (group_info && !group_info->can_be_taken_from().load()) {
if (!group_info || !group_info->can_be_taken_from().load()) {
continue;
}
rclcpp::AnyExecutable exec;
Expand All @@ -216,7 +216,7 @@ ready_executables(
continue;
}
auto group_info = group_cache(entity_iter->second.callback_group);
if (group_info && !group_info->can_be_taken_from().load()) {
if (!group_info || !group_info->can_be_taken_from().load()) {
continue;
}
rclcpp::AnyExecutable exec;
Expand All @@ -236,7 +236,7 @@ ready_executables(
continue;
}
auto group_info = group_cache(entry.callback_group);
if (group_info && !group_info->can_be_taken_from().load()) {
if (!group_info || !group_info->can_be_taken_from().load()) {
continue;
}
rclcpp::AnyExecutable exec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ StaticSingleThreadedExecutor::spin_once_impl(std::chrono::nanoseconds timeout)
std::optional<rclcpp::WaitResult<rclcpp::WaitSet>>
StaticSingleThreadedExecutor::collect_and_wait(std::chrono::nanoseconds timeout)
{
if (current_collection_.empty() || this->entities_need_rebuild_.load()) {
if (this->entities_need_rebuild_.exchange(false) || current_collection_.empty()) {
this->collect_entities();
}
auto wait_result = wait_set_.wait(std::chrono::nanoseconds(timeout));
Expand All @@ -119,6 +119,13 @@ StaticSingleThreadedExecutor::collect_and_wait(std::chrono::nanoseconds timeout)
"rclcpp",
"empty wait set received in wait(). This should never happen.");
return {};
} else {
if (wait_result.kind() == WaitResultKind::Ready && current_notify_waitable_) {
auto & rcl_wait_set = wait_result.get_wait_set().get_rcl_wait_set();
if (current_notify_waitable_->is_ready(rcl_wait_set)) {
current_notify_waitable_->execute(current_notify_waitable_->take_data());
}
}
}
return wait_result;
}
Expand Down
9 changes: 9 additions & 0 deletions rclcpp/test/rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,15 @@ if(TARGET test_executors)
target_link_libraries(test_executors_timer_cancel_behavior ${PROJECT_NAME} ${rosgraph_msgs_TARGETS})
endif()

ament_add_gtest(
test_executors_callback_group_behavior
executors/test_executors_callback_group_behavior.cpp
APPEND_LIBRARY_DIRS "${append_library_dirs}"
TIMEOUT 180)
if(TARGET test_executors)
target_link_libraries(test_executors_callback_group_behavior ${PROJECT_NAME})
endif()

ament_add_gtest(
test_executors_intraprocess
executors/test_executors_intraprocess.cpp
Expand Down
Loading

0 comments on commit 90e4511

Please sign in to comment.