Skip to content

Commit

Permalink
uorb: Move global _callback_ptr to per thread callback pointer
Browse files Browse the repository at this point in the history
There is one semaphore lock per thread already, this same mechanism can
be used to pass the callbacks too.
  • Loading branch information
pussuw committed Oct 3, 2023
1 parent 7174127 commit db35ead
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 30 deletions.
2 changes: 1 addition & 1 deletion platforms/common/uORB/uORBDeviceNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ uORB::DeviceNode::write(const char *buffer, const orb_metadata *meta, orb_advert
#ifdef CONFIG_BUILD_FLAT
item->subscriber->call();
#else
Manager::queueCallback(item->subscriber);
Manager::queueCallback(item->subscriber, item->lock);
#endif
}

Expand Down
10 changes: 1 addition & 9 deletions platforms/common/uORB/uORBManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,19 +179,12 @@ uORB::Manager::Manager()
PX4_DEBUG("SEM INIT FAIL: ret %d", ret);
}

ret = px4_sem_init(&_callback_lock, 1, 1);

if (ret != 0) {
PX4_DEBUG("SEM INIT FAIL: ret %d", ret);
}

g_sem_pool.init();
}

uORB::Manager::~Manager()
{
px4_sem_destroy(&_lock);
px4_sem_destroy(&_callback_lock);
}

int uORB::Manager::orb_exists(const struct orb_metadata *meta, int instance)
Expand Down Expand Up @@ -493,8 +486,7 @@ uORB::Manager::callback_thread(int argc, char *argv[])
while (true) {
lockThread(per_process_lock);

SubscriptionCallback *sub = _Instance->_callback_ptr;
_Instance->unlock_callbacks();
SubscriptionCallback *sub = dequeueCallback(per_process_lock);

// Pass nullptr to this thread to exit
if (sub == nullptr) {
Expand Down
45 changes: 25 additions & 20 deletions platforms/common/uORB/uORBManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,13 +444,20 @@ class Manager

static int8_t getThreadLock() {return _Instance->g_sem_pool.reserve();}

static void queueCallback(class SubscriptionCallback *sub)
static void queueCallback(class SubscriptionCallback *sub, int idx)
{
_Instance->lock_callbacks();
_Instance->_callback_ptr = sub;
_Instance->g_sem_pool.cb_lock(idx);
_Instance->g_sem_pool.cb_set(idx, sub);
// The manager is unlocked in callback thread
}

static class SubscriptionCallback *dequeueCallback(int idx)
{
class SubscriptionCallback *sub = _Instance->g_sem_pool.cb_get(idx);
_Instance->g_sem_pool.cb_unlock(idx);
return sub;
}

static bool isThreadAlive(int idx)
{
int value = _Instance->g_sem_pool.value(idx);
Expand Down Expand Up @@ -616,17 +623,6 @@ class Manager
// Global cache for advertised uORB node instances
uint16_t g_has_publisher[ORB_TOPICS_COUNT + 1];

// This (system global) variable is used to pass the subsriber
// pointer to the callback thread. This is in Manager, since
// it needs to be mapped for both advertisers and the subscribers
class SubscriptionCallback *_callback_ptr {nullptr};

// This mutex protects the above pointer for one writer at a time
px4_sem_t _callback_lock;

void lock_callbacks() { do {} while (px4_sem_wait(&_callback_lock) != 0); }
void unlock_callbacks() { px4_sem_post(&_callback_lock); }

// A global pool of semaphores for
// 1) poll locks
// 2) callback thread signalling (except in NuttX flat build)
Expand All @@ -645,14 +641,21 @@ class Manager
void release(int8_t i) {_global_sem[i].release(); }
int value(int8_t i) { return _global_sem[i].value(); }

void cb_lock(int8_t i) { do {} while (_global_sem[i].cb_lock() != 0); }
void cb_unlock(int8_t i) { _global_sem[i].cb_unlock(); }
void cb_set(int8_t i, struct SubscriptionCallback *callback_ptr) { _global_sem[i].cb_set(callback_ptr); }
struct SubscriptionCallback *cb_get(int8_t i) { return _global_sem[i].cb_get(); }

class GlobalLock
{
public:
void init()
{
px4_sem_init(&_sem, 1, 0);
px4_sem_init(&_lock, 1, 1);
#if __PX4_NUTTX
sem_setprotocol(&_sem, SEM_PRIO_NONE);
sem_setprotocol(&_lock, SEM_PRIO_NONE);
#endif
in_use = false;
}
Expand All @@ -670,16 +673,18 @@ class Manager
int value() { int value; px4_sem_getvalue(&_sem, &value); return value; }
bool in_use{false};

int cb_lock() { return px4_sem_wait(&_lock); }
void cb_unlock() { px4_sem_post(&_lock); }
void cb_set(struct SubscriptionCallback *callback_ptr) { _callback_ptr = callback_ptr; }
struct SubscriptionCallback *cb_get() { return _callback_ptr; }
private:
struct SubscriptionCallback *subscriber;
px4_sem_t _sem;
struct SubscriptionCallback *_callback_ptr {nullptr};
px4_sem_t _sem; /* For signaling to the callback thread */
px4_sem_t _lock; /* For signaling back from the callback thread */
};
private:

void lock()
{
do {} while (px4_sem_wait(&_semLock) != 0);
}
void lock() { do {} while (px4_sem_wait(&_semLock) != 0); }
void unlock() { px4_sem_post(&_semLock); }

GlobalLock _global_sem[NUM_GLOBAL_SEMS];
Expand Down

0 comments on commit db35ead

Please sign in to comment.