Skip to content

Commit

Permalink
uORB: Optimize callbacks in protected builds
Browse files Browse the repository at this point in the history
In protected builds, put all SubscriptionCallbacks into process specific lists, and in the callback
thread execute them in a loop.

Signed-off-by: Jukka Laitinen <[email protected]>
  • Loading branch information
jlaitine committed Dec 12, 2023
1 parent 4072914 commit 3684258
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 146 deletions.
77 changes: 22 additions & 55 deletions platforms/common/uORB/SubscriptionCallback.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,11 @@
#include <px4_platform_common/px4_work_queue/WorkItem.hpp>
#include <px4_platform_common/posix.h>

#ifdef CONFIG_BUILD_FLAT
#define CB_LOCK()
#define CB_UNLOCK()
#else
#define CB_LOCK() lock()
#define CB_UNLOCK() unlock()
#endif

namespace uORB
{

// Subscription wrapper class with callbacks on new publications
class SubscriptionCallback : public SubscriptionInterval
class SubscriptionCallback : public SubscriptionInterval, public ListNode<SubscriptionCallback *>
{
public:
/**
Expand All @@ -67,52 +59,34 @@ class SubscriptionCallback : public SubscriptionInterval
SubscriptionCallback(const orb_metadata *meta, uint32_t interval_us = 0, uint8_t instance = 0) :
SubscriptionInterval(meta, interval_us, instance)
{
#ifndef CONFIG_BUILD_FLAT
px4_sem_init(&_lock, 1, 1);
#endif
}

virtual ~SubscriptionCallback()
{
unregisterCallback();
#ifndef CONFIG_BUILD_FLAT
px4_sem_destroy(&_lock);
#endif
};

bool registerCallback()
{
CB_LOCK();

if (!registered()) {
if (!orb_advert_valid(_subscription.get_node())) {
// force topic creation
if (!_subscription.subscribe(true)) {
CB_UNLOCK();
return false;
}
if (!orb_advert_valid(_subscription.get_node())) {
// force topic creation
if (!_subscription.subscribe(true)) {
return false;
}
}

if (orb_advert_valid(_subscription.get_node())) {
_cb_handle = DeviceNode::register_callback(_subscription.get_node(), this, -1, _last_update, _interval_us);
}
bool ret = false;

if (orb_advert_valid(_subscription.get_node())) {
ret = Manager::registerCallback(_subscription.get_node(), this, _last_update, _interval_us, _cb_handle);
}

CB_UNLOCK();
return registered();
return ret;
}

void unregisterCallback()
{
CB_LOCK();

if (registered()) {
uorb_cb_handle_t handle = _cb_handle;
_cb_handle = UORB_INVALID_CB_HANDLE;
DeviceNode::unregister_callback(_subscription.get_node(), handle);
}

CB_UNLOCK();
Manager::unregisterCallback(_subscription.get_node(), this, _cb_handle);
}

/**
Expand Down Expand Up @@ -148,28 +122,20 @@ class SubscriptionCallback : public SubscriptionInterval

virtual void call() = 0;

#ifndef CONFIG_BUILD_FLAT
void do_call()
{
CB_LOCK();
bool dequeued = uORB::Manager::dequeueCallback(_subscription.get_node(), _cb_handle);

if (dequeued) {

// Run the callback if it is still valid
if (registered()) {
call();
}

CB_UNLOCK();
}
#endif

bool registered() const { return uorb_cb_handle_valid(_cb_handle); }

private:
#ifndef CONFIG_BUILD_FLAT
/* Make sure the callback remains valid during callback execution */

void lock() { do {} while (px4_sem_wait(&_lock) != 0); }
void unlock() { px4_sem_post(&_lock); }
px4_sem_t _lock; /* Lock to protect registered callback */
#endif
protected:

uorb_cb_handle_t _cb_handle{UORB_INVALID_CB_HANDLE};
Expand Down Expand Up @@ -240,14 +206,15 @@ class SubscriptionPollable : public SubscriptionInterval

void registerPoll(int8_t lock_idx)
{
_cb_handle = DeviceNode::register_callback(_subscription.get_node(), nullptr, lock_idx, _last_update, _interval_us);
DeviceNode::register_callback(_subscription.get_node(), nullptr, lock_idx, _last_update, _interval_us, _cb_handle);
}

void unregisterPoll()
{
uorb_cb_handle_t handle = _cb_handle;
_cb_handle = UORB_INVALID_CB_HANDLE;
DeviceNode::unregister_callback(_subscription.get_node(), handle);
// Calling this while a poll is not registered is a no-op
if (uorb_cb_handle_valid(_cb_handle)) {
DeviceNode::unregister_callback(_subscription.get_node(), _cb_handle);
}
}
private:
uorb_cb_handle_t _cb_handle{UORB_INVALID_CB_HANDLE};
Expand Down
71 changes: 38 additions & 33 deletions platforms/common/uORB/uORBDeviceNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -599,32 +599,38 @@ uORB::DeviceNode::write(const char *buffer, const orb_metadata *meta, orb_advert
/* Mark at least one data has been published */
_data_valid = true;

uORB::DeviceNode *n = node(handle);
IndexedStackHandle<CB_LIST_T> callbacks(n->_callbacks);
IndexedStackHandle<CB_LIST_T> callbacks(_callbacks);

uorb_cb_handle_t cb = callbacks.head();

while (callbacks.handle_valid(cb)) {
EventWaitItem *item = callbacks.peek(cb);

if (item->interval_us == 0 || hrt_elapsed_time(&item->last_update) >= item->interval_us) {
if (item->subscriber != nullptr) {

#ifdef CONFIG_BUILD_FLAT

if (item->subscriber != nullptr) {
// execute callback
item->subscriber->call();
#else
Manager::queueCallback(item->subscriber, item->lock);
#endif

} else {
// release poll
Manager::unlockThread(item->lock);
}

// Release poll waiters (and callback threads in non-flat builds)
if (item->lock != -1) {
if (Manager::isThreadAlive(item->lock)) {
Manager::unlockThread(item->lock);
#else

} else {
remove_cb = cb;
}
// Release poll waiters and callback threads
if (Manager::isThreadAlive(item->lock)) {
item->cb_triggered = true;
Manager::unlockThread(item->lock);

} else {
remove_cb = cb;
}

#endif
}

cb = callbacks.next(cb);
Expand Down Expand Up @@ -854,69 +860,68 @@ int uORB::DeviceNode::update_queue_size(unsigned int queue_size)
return PX4_OK;
}

//TODO: make this a normal member function
uorb_cb_handle_t
bool
uORB::DeviceNode::_register_callback(uORB::SubscriptionCallback *cb_sub,
int8_t poll_lock, hrt_abstime last_update, uint32_t interval_us)
int8_t poll_lock, hrt_abstime last_update, uint32_t interval_us, uorb_cb_handle_t &cb_handle)
{
#ifndef CONFIG_BUILD_FLAT
// Get the cb lock for this process from the Manager
int8_t cb_lock = poll_lock == -1 ? Manager::getCallbackLock() : poll_lock;
#else
int8_t cb_lock = poll_lock;
#endif

// TODO: Check for duplicate registrations?

IndexedStackHandle<CB_LIST_T> callbacks(_callbacks);

ATOMIC_ENTER;

uorb_cb_handle_t i = callbacks.pop_free();
EventWaitItem *item = callbacks.peek(i);
cb_handle = callbacks.pop_free();
EventWaitItem *item = callbacks.peek(cb_handle);

#ifdef CONFIG_BUILD_FLAT

if (!item) {
px4_leave_critical_section(flags);
item = new EventWaitItem;
flags = px4_enter_critical_section();
i = item;
cb_handle = item;
}

#endif

if (item != nullptr) {
item->lock = cb_lock;
#ifdef CONFIG_BUILD_FLAT
item->subscriber = cb_sub;
#else
item->cb_triggered = false;
#endif
item->last_update = last_update;
item->interval_us = interval_us;

callbacks.push(i);
callbacks.push(cb_handle);

} else {
PX4_ERR("register fail\n");
}

ATOMIC_LEAVE;

return i;
return uorb_cb_handle_valid(cb_handle);
}

//TODO: make this a normal member function?
void
uORB::DeviceNode::_unregister_callback(uorb_cb_handle_t cb_handle)
bool
uORB::DeviceNode::_unregister_callback(uorb_cb_handle_t &cb_handle)
{
IndexedStackHandle<CB_LIST_T> callbacks(_callbacks);

ATOMIC_ENTER;

if (!callbacks.rm(cb_handle)) {
bool ret = callbacks.rm(cb_handle);

if (!ret) {
PX4_ERR("unregister fail\n");

} else {
callbacks.push_free(cb_handle);
cb_handle = UORB_INVALID_CB_HANDLE;
}

ATOMIC_LEAVE;

return ret;
}
42 changes: 30 additions & 12 deletions platforms/common/uORB/uORBDeviceNode.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,18 +218,15 @@ class DeviceNode
*/
bool copy(void *dst, orb_advert_t &handle, unsigned &generation);

static void orb_callback(int signo, siginfo_t *si_info, void *data);

static uorb_cb_handle_t register_callback(orb_advert_t &node_handle, SubscriptionCallback *callback_sub,
int8_t poll_lock,
hrt_abstime last_update, uint32_t interval_us)
static bool register_callback(orb_advert_t &node_handle, SubscriptionCallback *callback_sub, int8_t poll_lock,
hrt_abstime last_update, uint32_t interval_us, uorb_cb_handle_t &cb_handle)
{
return node(node_handle)->_register_callback(callback_sub, poll_lock, last_update, interval_us);
return node(node_handle)->_register_callback(callback_sub, poll_lock, last_update, interval_us, cb_handle);
}

static void unregister_callback(orb_advert_t &node_handle, uorb_cb_handle_t cb_handle)
static bool unregister_callback(orb_advert_t &node_handle, uorb_cb_handle_t &cb_handle)
{
node(node_handle)->_unregister_callback(cb_handle);
return node(node_handle)->_unregister_callback(cb_handle);
}

void *operator new (size_t, void *p)
Expand All @@ -243,6 +240,24 @@ class DeviceNode

const char *get_devname() const {return _devname;}

int8_t cb_dequeue(uorb_cb_handle_t cb)
{
IndexedStackHandle<CB_LIST_T> callbacks(_callbacks);
EventWaitItem *item = callbacks.peek(cb);
#ifndef CONFIG_BUILD_FLAT
int8_t ret = -1;

if (item->cb_triggered) {
item->cb_triggered = false;
ret = item->lock;
}

return ret;
#else
return item->lock;
#endif
}

private:
friend uORBTest::UnitTest;

Expand All @@ -253,7 +268,11 @@ class DeviceNode
px4::atomic<unsigned> _generation{0}; /**< object generation count */

struct EventWaitItem {
#ifdef CONFIG_BUILD_FLAT
class SubscriptionCallback *subscriber;
#else
bool cb_triggered;
#endif
hrt_abstime last_update;
uint32_t interval_us;
int8_t lock;
Expand Down Expand Up @@ -328,10 +347,9 @@ class DeviceNode
inline static void *node_data(const orb_advert_t &handle) { return handle.data; }
#endif

uorb_cb_handle_t _register_callback(SubscriptionCallback *callback_sub,
int8_t poll_lock,
hrt_abstime last_update, uint32_t interval_us);
void _unregister_callback(uorb_cb_handle_t cb_handle);
bool _register_callback(SubscriptionCallback *callback_sub, int8_t poll_lock, hrt_abstime last_update,
uint32_t interval_us, uorb_cb_handle_t &cb_handle);
bool _unregister_callback(uorb_cb_handle_t &cb_handle);

#ifdef CONFIG_BUILD_FLAT
char *_devname;
Expand Down
Loading

0 comments on commit 3684258

Please sign in to comment.