Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async Subscribe handler #382

Open
bddckr opened this issue Nov 3, 2019 · 3 comments
Open

Async Subscribe handler #382

bddckr opened this issue Nov 3, 2019 · 3 comments
Labels
documentation Improvements or additions to public interface documentation (API reference or readme).

Comments

@bddckr
Copy link

bddckr commented Nov 3, 2019

Pretty much exactly what was raised in #94, but that one seems to have been closed without any resolution to one part of the issue:
Currently there is no Func<Message, Task> handler support in IRealtimeChannel.Subscribe/Unsubscribe.

I'm fine with there not being any async API for (Un)Subscribe if doing "async over sync" is OK (read below), but I wonder if it's simply an oversight as there is one for IEventEmitter.On/Off.


I'd like to fetch some data from my backend based on the message coming in. In my particular case the message will basically either tell me the latest state of some backend model I'm "watching", or it will tell me "too much changed, you're better off to just fetch again". The fetching of course does some I/O, thus it's async.

Is an alternative solution to just do GetAwaiter().GetResult() in the Subscribe handler to make it synchronous? I'm not a friend of "async over sync" (or the reverse), but I'm pretty sure no user of Ably's .NET API ever needs a Subscribe handler to synchronize back onto the context Ably is calling their handler from, so we should be fine with Solution A in all cases.

I'm primarily worried that I might accidentally block something on Ably's side here as long as my async I/O runs in the handler. Additionally I'd like to avoid having to manage my own Task queue for these Subscribe handler tasks - I'd have to link them to each Subscribe subscription to prevent Unsubscribe not cancelling the (potentially still queued or already running) tasks.

Internal conv: Internal https://ably-real-time.slack.com/archives/C030C5YLY/p1705494514068369

┆Issue is synchronized with this Jira Task by Unito

@marto83
Copy link
Contributor

marto83 commented Nov 4, 2019

That's a good point. I haven't thought about adding an async handler support for subscribing to Presence and Channel updates even thought there is one in the event emitter.

Currently if you block inside the subscribe function it will be Ok as the websocket is an a separate thread and the updates will be processed but will wait to call the Subscribe handlers.

If you don't care about order (which I think you do) you can just fire and forget the task like this
_ = LongRunningAsyncJob() inside the sync handler.
If you care about the order it is ok to use Solution A from Stephen Cleary's answer and block the execution to process what you need to do.

Just to be on the safe side I added the following test to make sure messages are processed correctly with a long blocking handler.

    public async Task WhenSubscribeHandlerBlocks_ShouldBehaveOk(Protocol protocol)
    {
        var client = await GetRealtimeClient(protocol);
        var channel = client.Channels.Get("test".AddRandomSuffix());
        int processedMessaged = 0;
        List<int> order = new List<int>();
        var taskAwaiter = new TaskCompletionAwaiter(20000);
        channel.Subscribe(m =>
        {
            if (processedMessaged <= 0)
            {
                Task.Delay(15000).WaitAndUnwrapException();
            }

            order.Add(int.Parse(m.Data.ToString()));

            processedMessaged++;
            if (processedMessaged == 3)
            {
                taskAwaiter.SetCompleted();
            }
        });

        await channel.AttachAsync();
        channel.Publish("test", "1");
        channel.Publish("test", "2");
        channel.Publish("test", "3");
        await taskAwaiter.Task;

        order.Should().HaveCount(3);
        order.Should().BeInAscendingOrder();
    }

I'm going to add it to the TODO list for the library.

@bddckr
Copy link
Author

bddckr commented Nov 4, 2019

Thanks for the fast response, and an even bigger thanks for that test as the confirmation! I'll continue with the blocking calls as I do indeed care about the order.

In case you are not using issues here on GitHub for the TODOs feel free to close this issue.

@marto83 marto83 added the enhancement New feature or improved functionality. label Nov 4, 2019
@marto83 marto83 self-assigned this Nov 4, 2019
@marto83
Copy link
Contributor

marto83 commented Nov 4, 2019

Thanks @bddckr. I'll just leave this issue open until we get it resolved :)

@ably-sync-bot ably-sync-bot removed the enhancement New feature or improved functionality. label Oct 7, 2020
@jamienewcomb jamienewcomb added the enhancement New feature or improved functionality. label Oct 7, 2020
@sync-by-unito sync-by-unito bot removed the enhancement New feature or improved functionality. label Feb 6, 2023
@ttypic ttypic added the documentation Improvements or additions to public interface documentation (API reference or readme). label Mar 11, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
documentation Improvements or additions to public interface documentation (API reference or readme).
Development

No branches or pull requests

6 participants