
Add and remove subscription from waitset depending on wait state #98

Open · wants to merge 6 commits into master from remove-intermediate-channels-for-subscriptions

Conversation

tobiasstarkwayve
Contributor

The current subscriber handler suffers from a busy-waiting loop if r2r's internal 10-element queue fills up. Once that happens, handle_incoming returns immediately whenever the waitset invokes it, and the waitset wakes up again the moment it goes back to sleep.

This PR addresses the issue by removing the internal message queue between the waitset and the subscriber. Instead, it implements the Stream interface directly for ROS subscribers and adds them to the waitset if and only if someone is actually blocked waiting for new messages. This ensures that a subscriber that falls behind and accumulates messages in the (DDS) queue never wakes the waitset.

As a proof of concept (and to facilitate discussion of the approach), only the regular TypedSubscriber is implemented for now. The other subscriber and service types will be implemented analogously once we agree on the approach.
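To make the discussion concrete, here is a rough sketch of the idea. The names (SubscriberState, TypedSubscriberStream) and the way the waitset side hands over messages are illustrative, not the actual r2r API:

    use std::collections::VecDeque;
    use std::pin::Pin;
    use std::sync::{Arc, Mutex};
    use std::task::{Context, Poll, Waker};

    use futures::stream::Stream;

    // Shared between the stream and the waitset side (illustrative).
    struct SubscriberState<T> {
        messages: VecDeque<T>, // messages already taken from the DDS queue
        waker: Option<Waker>,  // set only while someone is awaiting the stream
    }

    struct TypedSubscriberStream<T> {
        state: Arc<Mutex<SubscriberState<T>>>,
    }

    impl<T> Stream for TypedSubscriberStream<T> {
        type Item = T;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
            let mut state = self.state.lock().unwrap();
            if let Some(msg) = state.messages.pop_front() {
                return Poll::Ready(Some(msg));
            }
            // No message ready: register the waker. The subscription is added
            // to the waitset only while a waker is registered, so a subscriber
            // that nobody is awaiting can never wake the waitset.
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }

On the waitset side, when the subscription becomes readable, the handler would take the message, push it into messages, call waker.wake() if a waker is registered, and drop the subscription from the waitset again until the next poll.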

@tobiasstarkwayve force-pushed the remove-intermediate-channels-for-subscriptions branch 2 times, most recently from 5fa069d to 6985a0c on June 11, 2024 07:43
@tobiasstarkwayve force-pushed the remove-intermediate-channels-for-subscriptions branch from 6985a0c to 09f9819 on June 11, 2024 08:36
@tobiasstarkwayve
Contributor Author

@m-dahl What do you think about the approach in this PR? Do you agree with my problem diagnosis? Would you merge a version of this PR that implements this approach for all the r2r subscriber classes?

@m-dahl
Collaborator

m-dahl commented Jun 18, 2024

Hi Tobias,

I really appreciate this work; removing the intermediate channels will be a nice improvement. The channels are there mostly because they were easy to implement when I did the switch to async years ago, but this is much better and more direct. I can't really see any downside to the approach as long as the guard condition is correctly implemented.

I will be happy to merge this once finished. We should make sure it's well tested. (I don't really trust the tests that failed in the CI run for this PR; those need to be improved as well.) I don't have much time right now, but in a couple of weeks I will be able to dig into this and improve the tests. In the meantime I can try to support you if you need anything.

@tobiasstarkwayve
Contributor Author

@m-dahl FYI, I haven't forgotten about this PR. It just turns out to be trickier than I thought due to the teardown behaviour. As you know, r2r allows callers to drop the subscription and keep the tokio stream alive (at which point the stream returns None). But since this PR makes the stream interact with the ROS subscription directly, we can no longer call rcl_subscription_fini when the r2r subscription is destroyed. If we did, any pending stream would be operating on a dangling pointer.

I'm still trying to figure out the best way to handle this. My current plan is to have some sort of flag (either atomic or mutex-protected) next to the Waker object that informs the stream that the node is dead, and to turn the poll_next call into something of the form:

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<T>> {
        if *self.node_is_dead.lock().unwrap() { // could potentially be an atomic
            return Poll::Ready(None);
        }
        // current poll_next implementation
    }
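For the teardown side, a minimal sketch of the counterpart; SharedState and on_subscription_destroyed are made-up names, and the real logic would live wherever the r2r subscription is dropped:

    use std::sync::Mutex;
    use std::task::Waker;

    // Illustrative shared state between the subscription handle and its stream.
    struct SharedState {
        node_is_dead: Mutex<bool>,
        waker: Mutex<Option<Waker>>,
    }

    // Mark the node as dead and wake any pending stream so that its next
    // poll_next returns Poll::Ready(None) via the check above.
    fn on_subscription_destroyed(shared: &SharedState) {
        *shared.node_is_dead.lock().unwrap() = true;
        if let Some(waker) = shared.waker.lock().unwrap().take() {
            waker.wake();
        }
        // Only after this point would it be safe to call rcl_subscription_fini,
        // since the stream no longer touches the underlying rcl subscription.
    }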

What do you think? Does that make sense?

@tobiasstarkwayve force-pushed the remove-intermediate-channels-for-subscriptions branch from c81dc6f to ba50f0f on July 15, 2024 12:26
@m-dahl
Collaborator

m-dahl commented Oct 1, 2024

My apologies for not getting back to you sooner. I haven't had time for open source stuff lately; I was hoping to look into this during the summer, but it never happened. Did you ever implement this idea? I think the plan you describe makes sense, but I'm curious whether there were any other challenges.

@tobiasstarkwayve
Contributor Author

No worries, I was having similar issues on our side. I have an implementation of the approach, but I didn't have a good opportunity to test it internally over the summer.

We recently picked this up again and are currently in the process of testing things out. Once we're done with our testing cycle here, I'll push an updated version and get back to you.
