diff --git a/platforms/common/uORB/SubscriptionCallback.hpp b/platforms/common/uORB/SubscriptionCallback.hpp index 97b79d7f0ad0..72e9b5e3546c 100644 --- a/platforms/common/uORB/SubscriptionCallback.hpp +++ b/platforms/common/uORB/SubscriptionCallback.hpp @@ -42,19 +42,11 @@ #include #include -#ifdef CONFIG_BUILD_FLAT -#define CB_LOCK() -#define CB_UNLOCK() -#else -#define CB_LOCK() lock() -#define CB_UNLOCK() unlock() -#endif - namespace uORB { // Subscription wrapper class with callbacks on new publications -class SubscriptionCallback : public SubscriptionInterval +class SubscriptionCallback : public SubscriptionInterval, public ListNode { public: /** @@ -67,52 +59,34 @@ class SubscriptionCallback : public SubscriptionInterval SubscriptionCallback(const orb_metadata *meta, uint32_t interval_us = 0, uint8_t instance = 0) : SubscriptionInterval(meta, interval_us, instance) { -#ifndef CONFIG_BUILD_FLAT - px4_sem_init(&_lock, 1, 1); -#endif } virtual ~SubscriptionCallback() { unregisterCallback(); -#ifndef CONFIG_BUILD_FLAT - px4_sem_destroy(&_lock); -#endif }; bool registerCallback() { - CB_LOCK(); - - if (!registered()) { - if (!orb_advert_valid(_subscription.get_node())) { - // force topic creation - if (!_subscription.subscribe(true)) { - CB_UNLOCK(); - return false; - } + if (!orb_advert_valid(_subscription.get_node())) { + // force topic creation + if (!_subscription.subscribe(true)) { + return false; } + } - if (orb_advert_valid(_subscription.get_node())) { - _cb_handle = DeviceNode::register_callback(_subscription.get_node(), this, -1, _last_update, _interval_us); - } + bool ret = false; + + if (orb_advert_valid(_subscription.get_node())) { + ret = Manager::registerCallback(_subscription.get_node(), this, _last_update, _interval_us, _cb_handle); } - CB_UNLOCK(); - return registered(); + return ret; } void unregisterCallback() { - CB_LOCK(); - - if (registered()) { - uorb_cb_handle_t handle = _cb_handle; - _cb_handle = UORB_INVALID_CB_HANDLE; - DeviceNode::unregister_callback(_subscription.get_node(), handle); - } - - CB_UNLOCK(); + Manager::unregisterCallback(_subscription.get_node(), this, _cb_handle); } /** @@ -148,28 +122,21 @@ class SubscriptionCallback : public SubscriptionInterval virtual void call() = 0; - void do_call() +#ifndef CONFIG_BUILD_FLAT + bool do_call() { - CB_LOCK(); + bool dequeued = DeviceNode::cb_dequeue(_subscription.get_node(), _cb_handle); - // Run the callback if it is still valid - if (registered()) { + if (dequeued) { call(); } - CB_UNLOCK(); + return dequeued; } +#endif bool registered() const { return uorb_cb_handle_valid(_cb_handle); } -private: -#ifndef CONFIG_BUILD_FLAT - /* Make sure the callback remains valid during callback execution */ - - void lock() { do {} while (px4_sem_wait(&_lock) != 0); } - void unlock() { px4_sem_post(&_lock); } - px4_sem_t _lock; /* Lock to protect registered callback */ -#endif protected: uorb_cb_handle_t _cb_handle{UORB_INVALID_CB_HANDLE}; @@ -240,14 +207,15 @@ class SubscriptionPollable : public SubscriptionInterval void registerPoll(int8_t lock_idx) { - _cb_handle = DeviceNode::register_callback(_subscription.get_node(), nullptr, lock_idx, _last_update, _interval_us); + DeviceNode::register_callback(_subscription.get_node(), nullptr, lock_idx, _last_update, _interval_us, _cb_handle); } void unregisterPoll() { - uorb_cb_handle_t handle = _cb_handle; - _cb_handle = UORB_INVALID_CB_HANDLE; - DeviceNode::unregister_callback(_subscription.get_node(), handle); + // Calling this while a poll is not registered is a no-op + if (uorb_cb_handle_valid(_cb_handle)) { + DeviceNode::unregister_callback(_subscription.get_node(), _cb_handle); + } } private: uorb_cb_handle_t _cb_handle{UORB_INVALID_CB_HANDLE}; diff --git a/platforms/common/uORB/uORBDeviceNode.cpp b/platforms/common/uORB/uORBDeviceNode.cpp index fbb93d19c7a2..cfb92a21c3b4 100644 --- a/platforms/common/uORB/uORBDeviceNode.cpp +++ b/platforms/common/uORB/uORBDeviceNode.cpp @@ -599,8 +599,7 @@ uORB::DeviceNode::write(const char *buffer, const orb_metadata *meta, orb_advert /* Mark at least one data has been published */ _data_valid = true; - uORB::DeviceNode *n = node(handle); - IndexedStackHandle callbacks(n->_callbacks); + IndexedStackHandle callbacks(_callbacks); uorb_cb_handle_t cb = callbacks.head(); @@ -608,23 +607,30 @@ uORB::DeviceNode::write(const char *buffer, const orb_metadata *meta, orb_advert EventWaitItem *item = callbacks.peek(cb); if (item->interval_us == 0 || hrt_elapsed_time(&item->last_update) >= item->interval_us) { - if (item->subscriber != nullptr) { + #ifdef CONFIG_BUILD_FLAT + + if (item->subscriber != nullptr) { + // execute callback item->subscriber->call(); -#else - Manager::queueCallback(item->subscriber, item->lock); -#endif + + } else { + // release poll + Manager::unlockThread(item->lock); } - // Release poll waiters (and callback threads in non-flat builds) - if (item->lock != -1) { - if (Manager::isThreadAlive(item->lock)) { - Manager::unlockThread(item->lock); +#else - } else { - remove_cb = cb; - } + // Release poll waiters and callback threads + if (Manager::isThreadAlive(item->lock)) { + __atomic_fetch_add(&item->cb_triggered, 1, __ATOMIC_SEQ_CST); + Manager::unlockThread(item->lock); + + } else { + remove_cb = cb; } + +#endif } cb = callbacks.next(cb); @@ -854,26 +860,18 @@ int uORB::DeviceNode::update_queue_size(unsigned int queue_size) return PX4_OK; } -//TODO: make this a normal member function -uorb_cb_handle_t +bool uORB::DeviceNode::_register_callback(uORB::SubscriptionCallback *cb_sub, - int8_t poll_lock, hrt_abstime last_update, uint32_t interval_us) + int8_t poll_lock, hrt_abstime last_update, uint32_t interval_us, uorb_cb_handle_t &cb_handle) { -#ifndef CONFIG_BUILD_FLAT - // Get the cb lock for this process from the Manager - int8_t cb_lock = poll_lock == -1 ? Manager::getCallbackLock() : poll_lock; -#else int8_t cb_lock = poll_lock; -#endif - - // TODO: Check for duplicate registrations? IndexedStackHandle callbacks(_callbacks); ATOMIC_ENTER; - uorb_cb_handle_t i = callbacks.pop_free(); - EventWaitItem *item = callbacks.peek(i); + cb_handle = callbacks.pop_free(); + EventWaitItem *item = callbacks.peek(cb_handle); #ifdef CONFIG_BUILD_FLAT @@ -881,18 +879,21 @@ uORB::DeviceNode::_register_callback(uORB::SubscriptionCallback *cb_sub, px4_leave_critical_section(flags); item = new EventWaitItem; flags = px4_enter_critical_section(); - i = item; + cb_handle = item; } #endif if (item != nullptr) { item->lock = cb_lock; +#ifdef CONFIG_BUILD_FLAT item->subscriber = cb_sub; +#else + item->cb_triggered = 0; +#endif item->last_update = last_update; item->interval_us = interval_us; - - callbacks.push(i); + callbacks.push(cb_handle); } else { PX4_ERR("register fail\n"); @@ -900,23 +901,27 @@ uORB::DeviceNode::_register_callback(uORB::SubscriptionCallback *cb_sub, ATOMIC_LEAVE; - return i; + return uorb_cb_handle_valid(cb_handle); } -//TODO: make this a normal member function? -void -uORB::DeviceNode::_unregister_callback(uorb_cb_handle_t cb_handle) +bool +uORB::DeviceNode::_unregister_callback(uorb_cb_handle_t &cb_handle) { IndexedStackHandle callbacks(_callbacks); ATOMIC_ENTER; - if (!callbacks.rm(cb_handle)) { + bool ret = callbacks.rm(cb_handle); + + if (!ret) { PX4_ERR("unregister fail\n"); } else { callbacks.push_free(cb_handle); + cb_handle = UORB_INVALID_CB_HANDLE; } ATOMIC_LEAVE; + + return ret; } diff --git a/platforms/common/uORB/uORBDeviceNode.hpp b/platforms/common/uORB/uORBDeviceNode.hpp index 0b072326dd9a..5bcae4da9c50 100644 --- a/platforms/common/uORB/uORBDeviceNode.hpp +++ b/platforms/common/uORB/uORBDeviceNode.hpp @@ -218,18 +218,15 @@ class DeviceNode */ bool copy(void *dst, orb_advert_t &handle, unsigned &generation); - static void orb_callback(int signo, siginfo_t *si_info, void *data); - - static uorb_cb_handle_t register_callback(orb_advert_t &node_handle, SubscriptionCallback *callback_sub, - int8_t poll_lock, - hrt_abstime last_update, uint32_t interval_us) + static bool register_callback(orb_advert_t &node_handle, SubscriptionCallback *callback_sub, int8_t poll_lock, + hrt_abstime last_update, uint32_t interval_us, uorb_cb_handle_t &cb_handle) { - return node(node_handle)->_register_callback(callback_sub, poll_lock, last_update, interval_us); + return node(node_handle)->_register_callback(callback_sub, poll_lock, last_update, interval_us, cb_handle); } - static void unregister_callback(orb_advert_t &node_handle, uorb_cb_handle_t cb_handle) + static bool unregister_callback(orb_advert_t &node_handle, uorb_cb_handle_t &cb_handle) { - node(node_handle)->_unregister_callback(cb_handle); + return node(node_handle)->_unregister_callback(cb_handle); } void *operator new (size_t, void *p) @@ -243,6 +240,21 @@ class DeviceNode const char *get_devname() const {return _devname;} +#ifndef CONFIG_BUILD_FLAT + static bool cb_dequeue(orb_advert_t &node_handle, uorb_cb_handle_t cb) + { + IndexedStackHandle callbacks(node(node_handle)->_callbacks); + EventWaitItem *item = callbacks.peek(cb); + + if (__atomic_load_n(&item->cb_triggered, __ATOMIC_SEQ_CST) > 0) { + __atomic_fetch_sub(&item->cb_triggered, 1, __ATOMIC_SEQ_CST); + return true; + } + + return false; + } +#endif + private: friend uORBTest::UnitTest; @@ -253,11 +265,18 @@ class DeviceNode px4::atomic _generation{0}; /**< object generation count */ struct EventWaitItem { - class SubscriptionCallback *subscriber; hrt_abstime last_update; +#ifdef CONFIG_BUILD_FLAT + class SubscriptionCallback *subscriber; +#else + uint32_t cb_triggered; +#endif uint32_t interval_us; - int8_t lock; uorb_cb_handle_t next; // List ptr + int8_t lock; +#ifndef CONFIG_BUILD_FLAT + +#endif }; IndexedStack _callbacks; @@ -328,10 +347,9 @@ class DeviceNode inline static void *node_data(const orb_advert_t &handle) { return handle.data; } #endif - uorb_cb_handle_t _register_callback(SubscriptionCallback *callback_sub, - int8_t poll_lock, - hrt_abstime last_update, uint32_t interval_us); - void _unregister_callback(uorb_cb_handle_t cb_handle); + bool _register_callback(SubscriptionCallback *callback_sub, int8_t poll_lock, hrt_abstime last_update, + uint32_t interval_us, uorb_cb_handle_t &cb_handle); + bool _unregister_callback(uorb_cb_handle_t &cb_handle); #ifdef CONFIG_BUILD_FLAT char *_devname; diff --git a/platforms/common/uORB/uORBManager.cpp b/platforms/common/uORB/uORBManager.cpp index b60302f2107f..736691672ff6 100644 --- a/platforms/common/uORB/uORBManager.cpp +++ b/platforms/common/uORB/uORBManager.cpp @@ -65,6 +65,8 @@ uORB::Manager *uORB::Manager::_Instance = nullptr; #ifndef CONFIG_BUILD_FLAT int8_t uORB::Manager::per_process_lock = -1; pid_t uORB::Manager::per_process_cb_thread = -1; +List uORB::Manager::per_process_cb_list; +px4_sem_t uORB::Manager::per_process_cb_list_mutex; #endif void uORB::Manager::cleanup() @@ -460,6 +462,11 @@ int uORB::Manager::orb_poll(orb_poll_struct_t *fds, unsigned int nfds, int timeo int8_t uORB::Manager::launchCallbackThread() { + if (px4_sem_init(&per_process_cb_list_mutex, 1, 1) != 0) { + PX4_ERR("Can't initialize cb mutex"); + return -1; + } + per_process_lock = Manager::getThreadLock(); if (per_process_lock < 0) { @@ -488,17 +495,26 @@ uORB::Manager::launchCallbackThread() int uORB::Manager::callback_thread(int argc, char *argv[]) { + int count = 1; + while (true) { - lockThread(per_process_lock); + /* Sleep here waiting for callbacks, lock as many times as it has been unlocked */ + lockThread(per_process_lock, count); - class SubscriptionCallback *sub = dequeueCallback(per_process_lock); + lock_cb_list(); - // Pass nullptr to this thread to exit - if (sub == nullptr) { - break; + count = 0; + + for (auto sub : per_process_cb_list) { + /* Just in cast the callback thread has been starved, + * run all the queued callbacks now + */ + while (sub->do_call()) { + count++; + } } - sub->do_call(); + unlock_cb_list(); } Manager::freeThreadLock(per_process_lock); @@ -509,6 +525,55 @@ uORB::Manager::callback_thread(int argc, char *argv[]) #endif +bool +uORB::Manager::registerCallback(orb_advert_t &node_handle, SubscriptionCallback *callback_sub, + hrt_abstime last_update, uint32_t interval_us, uorb_cb_handle_t &cb_handle) +{ + if (uorb_cb_handle_valid(cb_handle)) { + // double registration, ok + return true; + } + + bool ret = false; + int8_t cb_lock = -1; +#ifndef CONFIG_BUILD_FLAT + cb_lock = getCallbackLock(); + + if (cb_lock >= 0) { +#endif + ret = uORB::DeviceNode::register_callback(node_handle, callback_sub, cb_lock, last_update, + + interval_us, cb_handle); +#ifndef CONFIG_BUILD_FLAT + } + + if (ret) { + lock_cb_list(); + per_process_cb_list.add(callback_sub); + unlock_cb_list(); + } + +#endif + + return ret; +} + +void +uORB::Manager::unregisterCallback(orb_advert_t &node_handle, SubscriptionCallback *callback_sub, + uorb_cb_handle_t &cb_handle) +{ + if (!uorb_cb_handle_valid(cb_handle)) { + // not registered + return; + } + +#ifndef CONFIG_BUILD_FLAT + lock_cb_list(); + per_process_cb_list.remove(callback_sub); + unlock_cb_list(); +#endif + DeviceNode::unregister_callback(node_handle, cb_handle); +} void uORB::Manager::GlobalSemPool::init(void) { diff --git a/platforms/common/uORB/uORBManager.hpp b/platforms/common/uORB/uORBManager.hpp index a47e3213b0fd..014b42482b51 100644 --- a/platforms/common/uORB/uORBManager.hpp +++ b/platforms/common/uORB/uORBManager.hpp @@ -43,6 +43,7 @@ #include // For ORB_ID enum #include #include +#include #ifdef CONFIG_ORB_COMMUNICATOR #include "ORBSet.hpp" @@ -370,15 +371,7 @@ class Manager #ifndef CONFIG_BUILD_FLAT static uint8_t getCallbackLock() { - uint8_t cbLock; - - // TODO: think about if this needs protection, maybe not use the - // same lock as for node advertise/subscribe - - _Instance->lock(); - cbLock = per_process_lock >= 0 ? per_process_lock : launchCallbackThread(); - _Instance->unlock(); - return cbLock; + return per_process_lock >= 0 ? per_process_lock : launchCallbackThread(); } #endif @@ -430,9 +423,17 @@ class Manager px4_munmap(p, sizeof(uORB::Manager)); } - static void lockThread(int idx) + static bool registerCallback(orb_advert_t &node_handle, class SubscriptionCallback *callback_sub, + hrt_abstime last_update, uint32_t interval_us, uorb_cb_handle_t &cb_handle); + + static void unregisterCallback(orb_advert_t &node_handle, class SubscriptionCallback *callback_sub, + uorb_cb_handle_t &cb_handle); + + static void lockThread(int idx, unsigned count = 1) { - _Instance->g_sem_pool.take(idx); + while (count--) { + _Instance->g_sem_pool.take(idx); + } } static void unlockThread(int idx) @@ -444,27 +445,6 @@ class Manager static int8_t getThreadLock() {return _Instance->g_sem_pool.reserve();} - static void queueCallback(class SubscriptionCallback *sub, int idx) - { - if (idx >= 0) { - _Instance->g_sem_pool.cb_lock(idx); - _Instance->g_sem_pool.cb_set(idx, sub); - // The manager is unlocked in callback thread - } - } - - static class SubscriptionCallback *dequeueCallback(int idx) - { - class SubscriptionCallback *sub = nullptr; - - if (idx >= 0) { - sub = _Instance->g_sem_pool.cb_get(idx); - _Instance->g_sem_pool.cb_unlock(idx); - } - - return sub; - } - static bool isThreadAlive(int idx) { int value = _Instance->g_sem_pool.value(idx); @@ -647,24 +627,16 @@ class Manager void release(int8_t i) {_global_sem[i].release(); } int value(int8_t i) { return _global_sem[i].value(); } - void cb_lock(int8_t i) { do {} while (_global_sem[i].cb_lock() != 0); } - void cb_unlock(int8_t i) { _global_sem[i].cb_unlock(); } - void cb_set(int8_t i, class SubscriptionCallback *callback_ptr) { _global_sem[i].cb_set(callback_ptr); } - class SubscriptionCallback *cb_get(int8_t i) { return _global_sem[i].cb_get(); } - class GlobalLock { public: void init() { px4_sem_init(&_sem, 1, 0); - px4_sem_init(&_lock, 1, 1); #if defined(__PX4_NUTTX) sem_setprotocol(&_sem, SEM_PRIO_NONE); - sem_setprotocol(&_lock, SEM_PRIO_NONE); #endif in_use = false; - _callback_ptr = nullptr; } void free() { px4_sem_destroy(&_sem); } int take() { return px4_sem_wait(&_sem); } @@ -673,14 +645,8 @@ class Manager int value() { int value; px4_sem_getvalue(&_sem, &value); return value; } bool in_use{false}; - int cb_lock() { return px4_sem_wait(&_lock); } - void cb_unlock() { px4_sem_post(&_lock); } - void cb_set(class SubscriptionCallback *callback_ptr) { _callback_ptr = callback_ptr; } - class SubscriptionCallback *cb_get() { return _callback_ptr; } private: - class SubscriptionCallback *_callback_ptr {nullptr}; px4_sem_t _sem; /* For signaling to the callback thread */ - px4_sem_t _lock; /* For signaling back from the callback thread */ }; private: @@ -692,8 +658,13 @@ class Manager } g_sem_pool; #ifndef CONFIG_BUILD_FLAT + static void lock_cb_list() { do {} while (px4_sem_wait(&per_process_cb_list_mutex) != 0); } + static void unlock_cb_list() { px4_sem_post(&per_process_cb_list_mutex); } + static int8_t per_process_lock; static pid_t per_process_cb_thread; + static List per_process_cb_list; + static px4_sem_t per_process_cb_list_mutex; #endif }; } // namespace uORB