Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Uorb optimization3 #579

Merged
merged 3 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 25 additions & 54 deletions platforms/common/uORB/SubscriptionCallback.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,14 @@
#include <px4_platform_common/px4_work_queue/WorkItem.hpp>
#include <px4_platform_common/posix.h>

#ifdef CONFIG_BUILD_FLAT
#define CB_LOCK()
#define CB_UNLOCK()
#else
#define CB_LOCK() lock()
#define CB_UNLOCK() unlock()
#endif

namespace uORB
{

// Subscription wrapper class with callbacks on new publications
class SubscriptionCallback : public SubscriptionInterval
#ifndef CONFIG_BUILD_FLAT
, public ListNode<SubscriptionCallback *>
#endif
{
public:
/**
Expand All @@ -67,52 +62,34 @@ class SubscriptionCallback : public SubscriptionInterval
SubscriptionCallback(const orb_metadata *meta, uint32_t interval_us = 0, uint8_t instance = 0) :
SubscriptionInterval(meta, interval_us, instance)
{
#ifndef CONFIG_BUILD_FLAT
px4_sem_init(&_lock, 1, 1);
#endif
}

virtual ~SubscriptionCallback()
{
unregisterCallback();
#ifndef CONFIG_BUILD_FLAT
px4_sem_destroy(&_lock);
#endif
};

bool registerCallback()
{
CB_LOCK();

if (!registered()) {
if (!orb_advert_valid(_subscription.get_node())) {
// force topic creation
if (!_subscription.subscribe(true)) {
CB_UNLOCK();
return false;
}
if (!orb_advert_valid(_subscription.get_node())) {
// force topic creation
if (!_subscription.subscribe(true)) {
return false;
}
}

if (orb_advert_valid(_subscription.get_node())) {
_cb_handle = DeviceNode::register_callback(_subscription.get_node(), this, -1, _last_update, _interval_us);
}
bool ret = false;

if (orb_advert_valid(_subscription.get_node())) {
ret = Manager::registerCallback(_subscription.get_node(), this, _last_update, _interval_us, _cb_handle);
}

CB_UNLOCK();
return registered();
return ret;
}

void unregisterCallback()
{
CB_LOCK();

if (registered()) {
uorb_cb_handle_t handle = _cb_handle;
_cb_handle = UORB_INVALID_CB_HANDLE;
DeviceNode::unregister_callback(_subscription.get_node(), handle);
}

CB_UNLOCK();
Manager::unregisterCallback(_subscription.get_node(), this, _cb_handle);
}

/**
Expand Down Expand Up @@ -148,28 +125,21 @@ class SubscriptionCallback : public SubscriptionInterval

virtual void call() = 0;

void do_call()
#ifndef CONFIG_BUILD_FLAT
bool do_call()
{
CB_LOCK();
bool dequeued = DeviceNode::cb_dequeue(_subscription.get_node(), _cb_handle);

// Run the callback if it is still valid
if (registered()) {
if (dequeued) {
call();
}

CB_UNLOCK();
return dequeued;
}
#endif

bool registered() const { return uorb_cb_handle_valid(_cb_handle); }

private:
#ifndef CONFIG_BUILD_FLAT
/* Make sure the callback remains valid during callback execution */

void lock() { do {} while (px4_sem_wait(&_lock) != 0); }
void unlock() { px4_sem_post(&_lock); }
px4_sem_t _lock; /* Lock to protect registered callback */
#endif
protected:

uorb_cb_handle_t _cb_handle{UORB_INVALID_CB_HANDLE};
Expand Down Expand Up @@ -240,14 +210,15 @@ class SubscriptionPollable : public SubscriptionInterval

void registerPoll(int8_t lock_idx)
{
_cb_handle = DeviceNode::register_callback(_subscription.get_node(), nullptr, lock_idx, _last_update, _interval_us);
DeviceNode::register_callback(_subscription.get_node(), nullptr, lock_idx, _last_update, _interval_us, _cb_handle);
}

void unregisterPoll()
{
uorb_cb_handle_t handle = _cb_handle;
_cb_handle = UORB_INVALID_CB_HANDLE;
DeviceNode::unregister_callback(_subscription.get_node(), handle);
// Calling this while a poll is not registered is a no-op
if (uorb_cb_handle_valid(_cb_handle)) {
DeviceNode::unregister_callback(_subscription.get_node(), _cb_handle);
}
}
private:
uorb_cb_handle_t _cb_handle{UORB_INVALID_CB_HANDLE};
Expand Down
71 changes: 38 additions & 33 deletions platforms/common/uORB/uORBDeviceNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -599,32 +599,38 @@ uORB::DeviceNode::write(const char *buffer, const orb_metadata *meta, orb_advert
/* Mark at least one data has been published */
_data_valid = true;

uORB::DeviceNode *n = node(handle);
IndexedStackHandle<CB_LIST_T> callbacks(n->_callbacks);
IndexedStackHandle<CB_LIST_T> callbacks(_callbacks);

uorb_cb_handle_t cb = callbacks.head();

while (callbacks.handle_valid(cb)) {
EventWaitItem *item = callbacks.peek(cb);

if (item->interval_us == 0 || hrt_elapsed_time(&item->last_update) >= item->interval_us) {
if (item->subscriber != nullptr) {

#ifdef CONFIG_BUILD_FLAT

if (item->subscriber != nullptr) {
// execute callback
item->subscriber->call();
#else
Manager::queueCallback(item->subscriber, item->lock);
#endif

} else {
// release poll
Manager::unlockThread(item->lock);
}

// Release poll waiters (and callback threads in non-flat builds)
if (item->lock != -1) {
if (Manager::isThreadAlive(item->lock)) {
Manager::unlockThread(item->lock);
#else

} else {
remove_cb = cb;
}
// Release poll waiters and callback threads
if (Manager::isThreadAlive(item->lock)) {
__atomic_fetch_add(&item->cb_triggered, 1, __ATOMIC_SEQ_CST);
Manager::unlockThread(item->lock);

} else {
remove_cb = cb;
}

#endif
}

cb = callbacks.next(cb);
Expand Down Expand Up @@ -854,69 +860,68 @@ int uORB::DeviceNode::update_queue_size(unsigned int queue_size)
return PX4_OK;
}

//TODO: make this a normal member function
uorb_cb_handle_t
bool
uORB::DeviceNode::_register_callback(uORB::SubscriptionCallback *cb_sub,
int8_t poll_lock, hrt_abstime last_update, uint32_t interval_us)
int8_t poll_lock, hrt_abstime last_update, uint32_t interval_us, uorb_cb_handle_t &cb_handle)
{
#ifndef CONFIG_BUILD_FLAT
// Get the cb lock for this process from the Manager
int8_t cb_lock = poll_lock == -1 ? Manager::getCallbackLock() : poll_lock;
#else
int8_t cb_lock = poll_lock;
#endif

// TODO: Check for duplicate registrations?

IndexedStackHandle<CB_LIST_T> callbacks(_callbacks);

ATOMIC_ENTER;

uorb_cb_handle_t i = callbacks.pop_free();
EventWaitItem *item = callbacks.peek(i);
cb_handle = callbacks.pop_free();
EventWaitItem *item = callbacks.peek(cb_handle);

#ifdef CONFIG_BUILD_FLAT

if (!item) {
px4_leave_critical_section(flags);
item = new EventWaitItem;
flags = px4_enter_critical_section();
i = item;
cb_handle = item;
}

#endif

if (item != nullptr) {
item->lock = cb_lock;
#ifdef CONFIG_BUILD_FLAT
item->subscriber = cb_sub;
#else
item->cb_triggered = 0;
#endif
item->last_update = last_update;
item->interval_us = interval_us;

callbacks.push(i);
callbacks.push(cb_handle);

} else {
PX4_ERR("register fail\n");
}

ATOMIC_LEAVE;

return i;
return uorb_cb_handle_valid(cb_handle);
}

//TODO: make this a normal member function?
void
uORB::DeviceNode::_unregister_callback(uorb_cb_handle_t cb_handle)
bool
uORB::DeviceNode::_unregister_callback(uorb_cb_handle_t &cb_handle)
{
IndexedStackHandle<CB_LIST_T> callbacks(_callbacks);

ATOMIC_ENTER;

if (!callbacks.rm(cb_handle)) {
bool ret = callbacks.rm(cb_handle);

if (!ret) {
PX4_ERR("unregister fail\n");

} else {
callbacks.push_free(cb_handle);
cb_handle = UORB_INVALID_CB_HANDLE;
}

ATOMIC_LEAVE;

return ret;
}
43 changes: 29 additions & 14 deletions platforms/common/uORB/uORBDeviceNode.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,18 +218,15 @@ class DeviceNode
*/
bool copy(void *dst, orb_advert_t &handle, unsigned &generation);

static void orb_callback(int signo, siginfo_t *si_info, void *data);

static uorb_cb_handle_t register_callback(orb_advert_t &node_handle, SubscriptionCallback *callback_sub,
int8_t poll_lock,
hrt_abstime last_update, uint32_t interval_us)
static bool register_callback(orb_advert_t &node_handle, SubscriptionCallback *callback_sub, int8_t poll_lock,
hrt_abstime last_update, uint32_t interval_us, uorb_cb_handle_t &cb_handle)
{
return node(node_handle)->_register_callback(callback_sub, poll_lock, last_update, interval_us);
return node(node_handle)->_register_callback(callback_sub, poll_lock, last_update, interval_us, cb_handle);
}

static void unregister_callback(orb_advert_t &node_handle, uorb_cb_handle_t cb_handle)
static bool unregister_callback(orb_advert_t &node_handle, uorb_cb_handle_t &cb_handle)
{
node(node_handle)->_unregister_callback(cb_handle);
return node(node_handle)->_unregister_callback(cb_handle);
}

void *operator new (size_t, void *p)
Expand All @@ -243,6 +240,21 @@ class DeviceNode

const char *get_devname() const {return _devname;}

#ifndef CONFIG_BUILD_FLAT
static bool cb_dequeue(orb_advert_t &node_handle, uorb_cb_handle_t cb)
{
IndexedStackHandle<CB_LIST_T> callbacks(node(node_handle)->_callbacks);
EventWaitItem *item = callbacks.peek(cb);

if (__atomic_load_n(&item->cb_triggered, __ATOMIC_SEQ_CST) > 0) {
pussuw marked this conversation as resolved.
Show resolved Hide resolved
__atomic_fetch_sub(&item->cb_triggered, 1, __ATOMIC_SEQ_CST);
return true;
}

return false;
}
#endif

private:
friend uORBTest::UnitTest;

Expand All @@ -253,11 +265,15 @@ class DeviceNode
px4::atomic<unsigned> _generation{0}; /**< object generation count */

struct EventWaitItem {
class SubscriptionCallback *subscriber;
hrt_abstime last_update;
#ifdef CONFIG_BUILD_FLAT
class SubscriptionCallback *subscriber;
#else
uint32_t cb_triggered;
#endif
uint32_t interval_us;
int8_t lock;
uorb_cb_handle_t next; // List ptr
int8_t lock;
};

IndexedStack<CB_LIST_T> _callbacks;
Expand Down Expand Up @@ -328,10 +344,9 @@ class DeviceNode
inline static void *node_data(const orb_advert_t &handle) { return handle.data; }
#endif

uorb_cb_handle_t _register_callback(SubscriptionCallback *callback_sub,
int8_t poll_lock,
hrt_abstime last_update, uint32_t interval_us);
void _unregister_callback(uorb_cb_handle_t cb_handle);
bool _register_callback(SubscriptionCallback *callback_sub, int8_t poll_lock, hrt_abstime last_update,
uint32_t interval_us, uorb_cb_handle_t &cb_handle);
bool _unregister_callback(uorb_cb_handle_t &cb_handle);

#ifdef CONFIG_BUILD_FLAT
char *_devname;
Expand Down
Loading