Atomic refactor for lock-free ringbuffer [Foxy] #302

Closed
Changes from all commits
ouster-ros/src/os_driver_node.cpp (8 changes: 4 additions & 4 deletions)
@@ -186,13 +186,13 @@ class OusterDriver : public OusterSensor {
static_cast<int64_t>(ptp_utc_tai_offset * 1e+9));
}

- virtual void on_lidar_packet_msg(const uint8_t* raw_lidar_packet) override {
- if (lidar_packet_handler) lidar_packet_handler(raw_lidar_packet);
+ void process_lidar_packet() override {
+ if (lidar_packet_handler) lidar_packet_handler(lidar_packet.buf.data());
}

- virtual void on_imu_packet_msg(const uint8_t* raw_imu_packet) override {
+ void process_imu_packet() override {
if (imu_packet_handler)
- imu_pub->publish(imu_packet_handler(raw_imu_packet));
+ imu_pub->publish(imu_packet_handler(imu_packet.buf.data()));
}

virtual void cleanup() override {
ouster-ros/src/os_sensor_node.cpp (10 changes: 8 additions & 2 deletions)
@@ -840,7 +840,7 @@ void OusterSensor::on_lidar_packet_msg(const uint8_t* raw_lidar_packet) {
// now we are focusing on optimizing the code for OusterDriver
std::memcpy(lidar_packet.buf.data(), raw_lidar_packet,
lidar_packet.buf.size());
- lidar_packet_pub->publish(lidar_packet);
+ process_lidar_packet();
}

void OusterSensor::on_imu_packet_msg(const uint8_t* raw_imu_packet) {
@@ -849,9 +849,15 @@ void OusterSensor::on_imu_packet_msg(const uint8_t* raw_imu_packet) {
// OusterSensor has its own RingBuffer of PacketMsg but for
// now we are focusing on optimizing the code for OusterDriver
std::memcpy(imu_packet.buf.data(), raw_imu_packet, imu_packet.buf.size());
- imu_packet_pub->publish(imu_packet);
+ process_imu_packet();
}

+ void OusterSensor::process_lidar_packet() {
+ lidar_packet_pub->publish(lidar_packet);
+ }
+
+ void OusterSensor::process_imu_packet() { imu_packet_pub->publish(imu_packet); }

} // namespace ouster_ros

#include <rclcpp_components/register_node_macro.hpp>
ouster-ros/src/os_sensor_node.h (14 changes: 10 additions & 4 deletions)
@@ -61,9 +61,9 @@ class OusterSensor : public OusterSensorNodeBase {

virtual void create_publishers();

- virtual void on_lidar_packet_msg(const uint8_t* raw_lidar_packet);
+ virtual void process_lidar_packet();

- virtual void on_imu_packet_msg(const uint8_t* raw_imu_packet);
+ virtual void process_imu_packet();

virtual void cleanup();

@@ -141,15 +141,21 @@ class OusterSensor : public OusterSensorNodeBase {

void stop_packet_processing_threads();

+ void on_lidar_packet_msg(const uint8_t* raw_lidar_packet);
+
+ void on_imu_packet_msg(const uint8_t* raw_imu_packet);
+
+ protected:
+ ouster_sensor_msgs::msg::PacketMsg lidar_packet;
+ ouster_sensor_msgs::msg::PacketMsg imu_packet;

private:
std::string sensor_hostname;
std::string staged_config;
std::string active_config;
std::string mtp_dest;
bool mtp_main;
std::shared_ptr<sensor::client> sensor_client;
- ouster_sensor_msgs::msg::PacketMsg lidar_packet;
- ouster_sensor_msgs::msg::PacketMsg imu_packet;
rclcpp_lifecycle::LifecyclePublisher<ouster_sensor_msgs::msg::PacketMsg>::SharedPtr
lidar_packet_pub;
rclcpp_lifecycle::LifecyclePublisher<ouster_sensor_msgs::msg::PacketMsg>::SharedPtr
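Taken together, the three node-level diffs above follow one pattern: on_lidar_packet_msg / on_imu_packet_msg still copy the raw datagram into the now-protected lidar_packet / imu_packet members, and subclass customization moves to the parameterless virtual hooks process_lidar_packet() / process_imu_packet() instead of overriding the raw-pointer callbacks. A minimal standalone sketch of that dispatch pattern, using hypothetical SensorBase / Driver classes rather than the actual ouster-ros types:

```cpp
// Standalone sketch (hypothetical names) of the refactored dispatch pattern:
// the base class owns the packet buffer, copies the raw bytes into it once,
// and then calls a parameterless virtual hook that derived classes override.
#include <cstdint>
#include <cstring>
#include <iostream>
#include <vector>

struct PacketMsg {
    std::vector<uint8_t> buf = std::vector<uint8_t>(16);
};

class SensorBase {
   public:
    virtual ~SensorBase() = default;

    // Entry point called by the packet receive path with the raw datagram.
    void on_lidar_packet_msg(const uint8_t* raw_lidar_packet) {
        std::memcpy(lidar_packet.buf.data(), raw_lidar_packet,
                    lidar_packet.buf.size());
        process_lidar_packet();  // subclasses decide what happens next
    }

   protected:
    // Default behaviour: "publish" the packet (stand-in for a ROS publisher).
    virtual void process_lidar_packet() {
        std::cout << "base: publish " << lidar_packet.buf.size() << " bytes\n";
    }

    PacketMsg lidar_packet;  // protected so derived classes can read it directly
};

class Driver : public SensorBase {
    // No raw pointer argument any more; the data is read from the member buffer.
    void process_lidar_packet() override {
        std::cout << "driver: handle " << lidar_packet.buf.size() << " bytes\n";
    }
};

int main() {
    std::vector<uint8_t> raw(16, 0xAB);
    Driver driver;
    driver.on_lidar_packet_msg(raw.data());  // prints the driver's message
}
```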
ouster-ros/src/thread_safe_ring_buffer.h (211 changes: 152 additions & 59 deletions)
@@ -8,6 +8,7 @@

#pragma once

+ #include <atomic>
#include <condition_variable>
#include <mutex>
#include <vector>
@@ -22,13 +23,15 @@ class ThreadSafeRingBuffer {
item_size(item_size_),
max_items_count(items_count_),
active_items_count(0),
- write_idx(0),
- read_idx(0) {}
+ write_idx(SIZE_MAX),
+ read_idx(SIZE_MAX),
+ new_data_lock(mutex, std::defer_lock),
+ free_space_lock(mutex, std::defer_lock) {}

/**
* Gets the maximum number of items that this ring buffer can hold.
*/
- size_t capacity() const { return max_items_count; }
+ [[nodiscard]] size_t capacity() const { return max_items_count; }

/**
* Gets the number of item that currently occupy the ring buffer. This
@@ -39,10 +42,7 @@ class ThreadSafeRingBuffer {
* this does not guarantee that a subsequent call to read() or write()
* wouldn't cause the calling thread to be blocked.
*/
- size_t size() const {
- std::lock_guard<std::mutex> lock(mutex);
- return active_items_count;
- }
+ [[nodiscard]] size_t size() const { return active_items_count.load(); }

/**
* Checks if the ring buffer is empty.
@@ -51,10 +51,7 @@
* if empty() returns true this does not guarantee that calling the write()
* operation directly right after wouldn't block the calling thread.
*/
- bool empty() const {
- std::lock_guard<std::mutex> lock(mutex);
- return active_items_count == 0;
- }
+ [[nodiscard]] bool empty() const { return active_items_count.load() == 0; }

/**
* Checks if the ring buffer is full.
@@ -63,84 +60,180 @@
* if full() returns true this does not guarantee that calling the read()
* operation directly right after wouldn't block the calling thread.
*/
- bool full() const {
- std::lock_guard<std::mutex> lock(mutex);
- return active_items_count == max_items_count;
+ [[nodiscard]] bool full() const {
+ return active_items_count.load() == capacity();
}

/**
- * Writes to the buffer safely, the method will keep blocking until the
+ * Writes to the buffer safely, this method will keep blocking until the
* there is a space available within the buffer.
*/
template <class BufferWriteFn>
void write(BufferWriteFn&& buffer_write) {
- std::unique_lock<std::mutex> lock(mutex);
- fullCondition.wait(lock,
- [this] { return active_items_count < capacity(); });
- buffer_write(&buffer[write_idx * item_size]);
- write_idx = (write_idx + 1) % capacity();
- ++active_items_count;
- emptyCondition.notify_one();
+ free_space_lock.lock();
+ free_space_condition.wait(free_space_lock, [this] { return !full(); });
+ free_space_lock.unlock();
+ perform_write(buffer_write);
}

/**
- * Writes to the buffer safely, if there is not space left then this method
- * will overite the last item.
+ * Writes to the buffer safely, if there is no space left, then this method
+ * will overwrite the last item.
*/
template <class BufferWriteFn>
void write_overwrite(BufferWriteFn&& buffer_write) {
- std::unique_lock<std::mutex> lock(mutex);
- buffer_write(&buffer[write_idx * item_size]);
- write_idx = (write_idx + 1) % capacity();
- if (active_items_count < capacity()) {
- ++active_items_count;
- } else {
- read_idx = (read_idx + 1) % capacity();
- }
- emptyCondition.notify_one();
+ perform_write(buffer_write);
}

/**
- * Gives access to read the buffer through a callback, the method will block
- * until there is something to read is available.
+ * Writes to the buffer safely, this method will return immediately and if
+ * there is no space left, the data will not be written (will be dropped).
*/
+ template <class BufferWriteFn>
+ void write_nonblock(BufferWriteFn&& buffer_write) {
+ if (!full()) perform_write(buffer_write);
+ }
+
+ /**
+ * Gives access to read the buffer through a callback, this method will block
+ * until there is something to read available.
+ */
template <typename BufferReadFn>
void read(BufferReadFn&& buffer_read) {
- std::unique_lock<std::mutex> lock(mutex);
- emptyCondition.wait(lock, [this] { return active_items_count > 0; });
- buffer_read(&buffer[read_idx * item_size]);
- read_idx = (read_idx + 1) % capacity();
- --active_items_count;
- fullCondition.notify_one();
+ new_data_lock.lock();
+ new_data_condition.wait(new_data_lock, [this] { return !empty(); });
+ new_data_lock.unlock();
+ perform_read(buffer_read);
}

/**
* Gives access to read the buffer through a callback, if buffer is
- * inaccessible the method will timeout and buffer_read gets a nullptr.
+ * inaccessible this method will timeout and the callback is not performed.
*/
template <typename BufferReadFn>
void read_timeout(BufferReadFn&& buffer_read,
std::chrono::seconds timeout) {
- std::unique_lock<std::mutex> lock(mutex);
- if (emptyCondition.wait_for(
- lock, timeout, [this] { return active_items_count > 0; })) {
- buffer_read(&buffer[read_idx * item_size]);
- read_idx = (read_idx + 1) % capacity();
- --active_items_count;
- fullCondition.notify_one();
- } else {
- buffer_read((uint8_t*)nullptr);
+ new_data_lock.lock();
+ if (new_data_condition.wait_for(
+ new_data_lock, timeout, [this] { return !empty(); })) {
+ new_data_lock.unlock();
+ perform_read(buffer_read);
+ return;
}
+ new_data_lock.unlock();
}

+ /**
+ * Gives access to read the buffer through a callback, this method will return
+ * immediately and the callback is performed only when there is data available.
+ */
+ template <typename BufferReadFn>
+ void read_nonblock(BufferReadFn&& buffer_read) {
+ if (!empty()) perform_read(buffer_read);
+ }
+
+ protected:
+ /**
+ * Resets the write_idx to an initial value.
+ * @remarks
+ * Should be mostly used by tests to allow reading of the final item left
+ * in the buffer or restarting the test scenario.
+ */
+ void reset_write_idx() { write_idx = SIZE_MAX; }
+
+ /**
+ * Resets the read_idx to an initial value.
+ * @remarks
+ * Should be mostly used by tests to allow restarting the test scenario.
+ */
+ void reset_read_idx() { read_idx = SIZE_MAX; }

private:
+ /**
+ * Performs the actual sequence of operations for writing.
+ * @tparam BufferWriteFn
+ * @param buffer_write
+ */
+ template <class BufferWriteFn>
+ void perform_write(BufferWriteFn&& buffer_write) {
+ buffer_write(&buffer[increment_with_capacity(write_idx) * item_size]);
+ push();
+ new_data_condition.notify_all();
+ }
+
+ /**
+ * Performs the actual sequence of operations for reading.
+ * @tparam BufferReadFn
+ * @param buffer_read
+ * @remarks
+ * If this function attempts to read using an index currently held by the
+ * writer, it will not perform the operations.
+ */
+ template <typename BufferReadFn>
+ void perform_read(BufferReadFn&& buffer_read) {
+ if (incremented_with_capacity(read_idx.load()) != write_idx.load()) {
+ buffer_read(&buffer[increment_with_capacity(read_idx) * item_size]);
+ pop();
+ free_space_condition.notify_one();
+ }
+ }
+
+ /**
+ * Atomically increments a given index, wrapping around with the buffer capacity.
+ * Also returns the incremented value so that only a single atomic load is
+ * performed for this operation.
+ * @param idx Reference to the index to be incremented.
+ * @return The new incremented value of the index.
+ */
+ [[nodiscard]] size_t increment_with_capacity(std::atomic_size_t &idx) const {
+ const size_t incremented = (idx.load() + 1) % capacity();
+ idx = incremented;
+ return incremented;
+ }
+
+ /**
+ * Returns an incremented value of the given index, wrapping around with the
+ * buffer capacity. This function does not modify the given index.
+ * @param idx Current index value.
+ * @return Incremented value of the given index.
+ */
+ [[nodiscard]] size_t incremented_with_capacity(const size_t idx) const {
+ return (idx + 1) % capacity();
+ }
+
+ /**
+ * Atomically increments the buffer active elements count, clamping at the
+ * buffer capacity.
+ */
+ void push() {
+ size_t overflow = capacity() + 1;
+ ++active_items_count;
+ active_items_count.compare_exchange_strong(overflow, capacity());
+ }
+
+ /**
+ * Atomically decrements the buffer active elements count, clamping at zero.
+ */
+ void pop() {
+ size_t overflow = SIZE_MAX;
+ --active_items_count;
+ active_items_count.compare_exchange_strong(overflow, 0);
+ }

std::vector<uint8_t> buffer;
- size_t item_size;
- size_t max_items_count;
- size_t active_items_count;
- size_t write_idx;
- size_t read_idx;
- mutable std::mutex mutex;
- std::condition_variable fullCondition;
- std::condition_variable emptyCondition;

+ const size_t item_size;
+ const size_t max_items_count;
+
+ std::atomic_size_t active_items_count;
+ std::atomic_size_t write_idx;
+ std::atomic_size_t read_idx;
+
+ std::mutex mutex;
+ std::condition_variable new_data_condition;
+ std::unique_lock<std::mutex> new_data_lock;
+ std::condition_variable free_space_condition;
+ std::unique_lock<std::mutex> free_space_lock;
+
+ friend class ThreadSafeRingBufferTest;
};
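The push() and pop() helpers above avoid taking a lock by incrementing (or decrementing) the atomic counter first and then using compare_exchange_strong to undo the step if it overshot the valid range. A small standalone illustration of that clamping idiom (not part of the PR):

```cpp
// Standalone illustration of the clamp-after-increment idiom used by push()/pop().
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <iostream>

int main() {
    const size_t cap = 4;
    std::atomic_size_t count{cap};  // counter already at capacity

    // push() on a full buffer: bump the count, then clamp it back to cap if it
    // reached cap + 1.
    size_t overflow = cap + 1;
    ++count;
    count.compare_exchange_strong(overflow, cap);
    std::cout << count.load() << "\n";  // prints 4, not 5

    // pop() on an empty buffer: the decrement wraps to SIZE_MAX, then the
    // compare-exchange clamps it back to 0.
    count = 0;
    size_t underflow = SIZE_MAX;
    --count;
    count.compare_exchange_strong(underflow, 0);
    std::cout << count.load() << "\n";  // prints 0
}
```

This clamp is what lets write_overwrite() call perform_write() unconditionally: on a full buffer the counter stays at capacity instead of growing past it.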
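For context, a hypothetical end-to-end usage of the refactored buffer from a producer and a consumer thread; the constructor argument order (item size first, then item count) is assumed from the initializer list shown above and is not spelled out in this diff:

```cpp
// Hypothetical usage sketch; write_nonblock()/read_timeout() come from the diff,
// everything else (constructor order, namespace-free class name) is assumed.
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <thread>

#include "thread_safe_ring_buffer.h"

int main() {
    constexpr size_t item_size = 1024;  // bytes per packet slot
    constexpr size_t items_count = 8;   // number of slots
    ThreadSafeRingBuffer ring(item_size, items_count);  // assumed argument order

    std::thread producer([&] {
        uint8_t packet[item_size] = {0x42};
        // Drop the packet instead of blocking if the buffer is currently full.
        ring.write_nonblock(
            [&](uint8_t* slot) { std::memcpy(slot, packet, item_size); });
    });

    std::thread consumer([&] {
        // Wait at most one second; the callback only runs if an item actually
        // became available within the timeout.
        ring.read_timeout(
            [](uint8_t* slot) {
                std::cout << "got packet, first byte = " << int(slot[0]) << "\n";
            },
            std::chrono::seconds(1));
    });

    producer.join();
    consumer.join();
}
```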