Skip to content

Commit

Permalink
Add transient local durability support to publisher and subscriptions…
Browse files Browse the repository at this point in the history
… when using intra-process communication (#2303)

* Add intra process transient local durability support to publisher and subscription

Signed-off-by: Jeffery Hsu <[email protected]>

* Remove durability_is_transient_local_ from publisher_base
Signed-off-by: Jeffery Hsu <[email protected]>

* Design changes that move most transient local publish functionalities out of
intra process manager into intra process manager

Signed-off-by: Jeffery Hsu <[email protected]>

* Move transient local publish to a separate function

Signed-off-by: Jeffery Hsu <[email protected]>

* Remove publisher buffer weak ptr from intra process manager when it associated publisher is removed.

Signed-off-by: Jeffery Hsu <[email protected]>

* Remove incorrectly placed RCLCPP_PUBLIC

Signed-off-by: Jeffery Hsu <[email protected]>

* Add missing RCLCPP_PUBLIC

Signed-off-by: Jeffery Hsu <[email protected]>

* Expand RingBufferImplementation beyond shared_ptr and unique_ptr

Signed-off-by: Jeffery Hsu <[email protected]>

* Comment and format fix

Signed-off-by: Jeffery Hsu <[email protected]>

---------

Signed-off-by: Jeffery Hsu <[email protected]>
  • Loading branch information
jefferyyjhsu authored Jan 18, 2024
1 parent 6f6b5f2 commit 7901bb9
Show file tree
Hide file tree
Showing 20 changed files with 921 additions and 211 deletions.
1 change: 1 addition & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/context.cpp
src/rclcpp/contexts/default_context.cpp
src/rclcpp/detail/add_guard_condition_to_rcl_wait_set.cpp
src/rclcpp/detail/resolve_intra_process_buffer_type.cpp
src/rclcpp/detail/resolve_parameter_overrides.cpp
src/rclcpp/detail/rmw_implementation_specific_payload.cpp
src/rclcpp/detail/rmw_implementation_specific_publisher_payload.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ resolve_intra_process_buffer_type(
return resolved_buffer_type;
}

RCLCPP_PUBLIC
rclcpp::IntraProcessBufferType
resolve_intra_process_buffer_type(
const rclcpp::IntraProcessBufferType buffer_type);

} // namespace detail

} // namespace rclcpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#ifndef RCLCPP__EXPERIMENTAL__BUFFERS__BUFFER_IMPLEMENTATION_BASE_HPP_
#define RCLCPP__EXPERIMENTAL__BUFFERS__BUFFER_IMPLEMENTATION_BASE_HPP_

#include <vector>

namespace rclcpp
{
namespace experimental
Expand All @@ -31,6 +33,8 @@ class BufferImplementationBase
virtual BufferT dequeue() = 0;
virtual void enqueue(BufferT request) = 0;

virtual std::vector<BufferT> get_all_data() = 0;

virtual void clear() = 0;
virtual bool has_data() const = 0;
virtual size_t available_capacity() const = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <stdexcept>
#include <type_traits>
#include <utility>
#include <vector>

#include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/allocator/allocator_deleter.hpp"
Expand Down Expand Up @@ -66,6 +67,9 @@ class IntraProcessBuffer : public IntraProcessBufferBase

virtual MessageSharedPtr consume_shared() = 0;
virtual MessageUniquePtr consume_unique() = 0;

virtual std::vector<MessageSharedPtr> get_all_data_shared() = 0;
virtual std::vector<MessageUniquePtr> get_all_data_unique() = 0;
};

template<
Expand Down Expand Up @@ -129,6 +133,16 @@ class TypedIntraProcessBuffer : public IntraProcessBuffer<MessageT, Alloc, Messa
return consume_unique_impl<BufferT>();
}

std::vector<MessageSharedPtr> get_all_data_shared() override
{
return get_all_data_shared_impl();
}

std::vector<MessageUniquePtr> get_all_data_unique() override
{
return get_all_data_unique_impl();
}

bool has_data() const override
{
return buffer_->has_data();
Expand Down Expand Up @@ -243,6 +257,71 @@ class TypedIntraProcessBuffer : public IntraProcessBuffer<MessageT, Alloc, Messa
{
return buffer_->dequeue();
}

// MessageSharedPtr to MessageSharedPtr
template<typename T = BufferT>
typename std::enable_if<
std::is_same<T, MessageSharedPtr>::value,
std::vector<MessageSharedPtr>
>::type
get_all_data_shared_impl()
{
return buffer_->get_all_data();
}

// MessageUniquePtr to MessageSharedPtr
template<typename T = BufferT>
typename std::enable_if<
std::is_same<T, MessageUniquePtr>::value,
std::vector<MessageSharedPtr>
>::type
get_all_data_shared_impl()
{
std::vector<MessageSharedPtr> result;
auto uni_ptr_vec = buffer_->get_all_data();
result.reserve(uni_ptr_vec.size());
for (MessageUniquePtr & uni_ptr : uni_ptr_vec) {
result.emplace_back(std::move(uni_ptr));
}
return result;
}

// MessageSharedPtr to MessageUniquePtr
template<typename T = BufferT>
typename std::enable_if<
std::is_same<T, MessageSharedPtr>::value,
std::vector<MessageUniquePtr>
>::type
get_all_data_unique_impl()
{
std::vector<MessageUniquePtr> result;
auto shared_ptr_vec = buffer_->get_all_data();
result.reserve(shared_ptr_vec.size());
for (MessageSharedPtr shared_msg : shared_ptr_vec) {
MessageUniquePtr unique_msg;
MessageDeleter * deleter = std::get_deleter<MessageDeleter, const MessageT>(shared_msg);
auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1);
MessageAllocTraits::construct(*message_allocator_.get(), ptr, *shared_msg);
if (deleter) {
unique_msg = MessageUniquePtr(ptr, *deleter);
} else {
unique_msg = MessageUniquePtr(ptr);
}
result.push_back(std::move(unique_msg));
}
return result;
}

// MessageUniquePtr to MessageUniquePtr
template<typename T = BufferT>
typename std::enable_if<
std::is_same<T, MessageUniquePtr>::value,
std::vector<MessageUniquePtr>
>::type
get_all_data_unique_impl()
{
return buffer_->get_all_data();
}
};

} // namespace buffers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef RCLCPP__EXPERIMENTAL__BUFFERS__RING_BUFFER_IMPLEMENTATION_HPP_
#define RCLCPP__EXPERIMENTAL__BUFFERS__RING_BUFFER_IMPLEMENTATION_HPP_

#include <memory>
#include <mutex>
#include <stdexcept>
#include <utility>
Expand Down Expand Up @@ -113,6 +114,17 @@ class RingBufferImplementation : public BufferImplementationBase<BufferT>
return request;
}

/// Get all the elements from the ring buffer
/**
* This member function is thread-safe.
*
* \return a vector containing all the elements from the ring buffer
*/
std::vector<BufferT> get_all_data() override
{
return get_all_data_impl();
}

/// Get the next index value for the ring buffer
/**
* This member function is thread-safe.
Expand Down Expand Up @@ -215,6 +227,71 @@ class RingBufferImplementation : public BufferImplementationBase<BufferT>
return capacity_ - size_;
}

/// Traits for checking if a type is std::unique_ptr
template<typename ...>
struct is_std_unique_ptr final : std::false_type {};
template<class T, typename ... Args>
struct is_std_unique_ptr<std::unique_ptr<T, Args...>> final : std::true_type
{
typedef T Ptr_type;
};

/// Get all the elements from the ring buffer
/**
* This member function is thread-safe.
* Two versions for the implementation of the function.
* One for buffer containing unique_ptr and the other for other types
*
* \return a vector containing all the elements from the ring buffer
*/
template<typename T = BufferT, std::enable_if_t<is_std_unique_ptr<T>::value &&
std::is_copy_constructible<
typename is_std_unique_ptr<T>::Ptr_type
>::value,
void> * = nullptr>
std::vector<BufferT> get_all_data_impl()
{
std::lock_guard<std::mutex> lock(mutex_);
std::vector<BufferT> result_vtr;
result_vtr.reserve(size_);
for (size_t id = 0; id < size_; ++id) {
result_vtr.emplace_back(
new typename is_std_unique_ptr<T>::Ptr_type(
*(ring_buffer_[(read_index_ + id) % capacity_])));
}
return result_vtr;
}

template<typename T = BufferT, std::enable_if_t<
std::is_copy_constructible<T>::value, void> * = nullptr>
std::vector<BufferT> get_all_data_impl()
{
std::lock_guard<std::mutex> lock(mutex_);
std::vector<BufferT> result_vtr;
result_vtr.reserve(size_);
for (size_t id = 0; id < size_; ++id) {
result_vtr.emplace_back(ring_buffer_[(read_index_ + id) % capacity_]);
}
return result_vtr;
}

template<typename T = BufferT, std::enable_if_t<!is_std_unique_ptr<T>::value &&
!std::is_copy_constructible<T>::value, void> * = nullptr>
std::vector<BufferT> get_all_data_impl()
{
throw std::logic_error("Underlined type results in invalid get_all_data_impl()");
return {};
}

template<typename T = BufferT, std::enable_if_t<is_std_unique_ptr<T>::value &&
!std::is_copy_constructible<typename is_std_unique_ptr<T>::Ptr_type>::value,
void> * = nullptr>
std::vector<BufferT> get_all_data_impl()
{
throw std::logic_error("Underlined type in unique_ptr results in invalid get_all_data_impl()");
return {};
}

size_t capacity_;

std::vector<BufferT> ring_buffer_;
Expand Down
Loading

0 comments on commit 7901bb9

Please sign in to comment.