What is the problem this feature would solve?
Discord discussion: https://discord.com/channels/795981131316985866/1308468509067710484
I wanted to use Stream.groupedWithin to emit chunks from a stream when either the count or a timeout was reached. For my application, the upstream was an effect pulling from an AWS SQS Queue.
SQS queue polling produces up to N messages from the queue, which then become unavailable to other pollers for the duration of the visibility timeout. If the downstream decides that it can no longer process messages, the aggregator shouldn't get ahead of the downstream, yet the current implementation of ChannelExecutor seems to default to an upstream pull strategy of 'after all enqueued', which means that as soon as it has successfully pushed messages to the downstream, it starts polling the upstream for further messages.
In the case of SQS polling, you would want to poll for new messages only if you know you are going to attempt to process them; otherwise you incur an SQS charge for the redundant read, and then either incur another charge for making the messages visible in the queue again, or added latency while waiting for the messages to become visible again.
Aside from SQS, other sources might have a resource-usage 'cost' for reads that are never processed.
The downstream in this case might want to limit processing to a certain message count or total duration, or stop on encountering an error such as a further-downstream limit (like Lambda invocation limits).
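To make the scenario concrete, here is a minimal sketch of the kind of pipeline involved (not my actual code - pollSqs is a stand-in for the real ReceiveMessage call, and the counts/timings are illustrative). With the current behaviour, the 'upstream poll' log lines run well ahead of the batches being processed:

```ts
import { Chunk, Duration, Effect, Stream } from "effect"

// Stand-in for the SQS receive call: every execution represents a billable
// ReceiveMessage request, so eager upstream pulls show up in the log.
let reads = 0
const pollSqs = Effect.sync(() => {
  reads += 1
  console.log(`upstream poll #${reads}`)
  return `message-${reads}`
})

const program = Stream.repeatEffect(pollSqs).pipe(
  Stream.take(30),
  // Emit a chunk once 10 messages have arrived or 5 seconds have elapsed.
  Stream.groupedWithin(10, Duration.seconds(5)),
  Stream.runForEach((batch) =>
    // Deliberately slow consumer: with the default 'after all enqueued'
    // strategy the upstream keeps polling while this is still running.
    Effect.log(`processing ${Chunk.size(batch)} messages`).pipe(
      Effect.zipRight(Effect.sleep(Duration.seconds(2)))
    )
  )
)

Effect.runPromise(program)
```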
What is the feature you are proposing to solve the problem?
It seems that the ChannelExecutor that underpins Streams supports the concepts of 'after all enqueued' and 'on next' upstream polling. The stock implementation is referenced by the aggregateWithinEither implementation, but does not appear to be exposed in a way that allows a consumer of the stream to switch to the 'on next' strategy, which would appear to ensure that the upstream pull isn't attempted until the downstream asks for further messages.
A mechanism could be provided that either exposes the ChannelExecutor, or allows options to be passed to any of the grouped/aggregate functions specifying the desired upstream pull strategy. Or maybe this could just be a modifier that a stream can be piped through (e.g. Stream.upstreamPullStrategy('on next') or similar).
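Purely as an illustration of the proposal (neither option exists today, so the hypothetical parts are commented out), the API might look something like this:

```ts
import { Duration, Stream } from "effect"

declare const source: Stream.Stream<string>

// Option A (hypothetical): accept the strategy as an option on the
// grouping/aggregating combinators.
const batched = source.pipe(
  Stream.groupedWithin(10, Duration.seconds(5))
  // Stream.groupedWithin(10, Duration.seconds(5), { upstreamPullStrategy: "on-next" })
)

// Option B (hypothetical): a standalone modifier that any stream can be
// piped through before aggregation.
// const lazySource = source.pipe(Stream.upstreamPullStrategy("on-next"))
```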
The implementation of aggregateWithinEither is non-trivial, and the apparent inability to influence the upstream pull strategy makes it a black box that cannot be reused.
What alternatives have you considered?
Bounded Queues or Mailboxes were explored, but these lack the ability to aggregate without recourse to piping into a stream, which makes the whole pipeline eager.
In the examples, a workaround of using Effect.sleep(0) in the producer was used, but this assumes that a consumer can influence the producer's implementation, and I suspect that whilst it works in the examples, it is timing-dependent, merely creating a fiber context switch long enough to allow the downstream to consume before the upstream is polled again.
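For reference, the shape of that workaround looks roughly like this (illustrative only, not the playground code):

```ts
import { Duration, Effect, Stream } from "effect"

// Sketch of the workaround (names are illustrative): the *producer* yields
// after each poll, which gives the consumer fiber a chance to run before the
// next upstream pull. It only helps if you control the producer, and it
// relies on scheduler timing rather than back-pressure.
const pollWithYield = Effect.succeed("message").pipe(
  Effect.tap(() => Effect.sleep(0)) // the Effect.sleep(0) workaround
)

const batched = Stream.repeatEffect(pollWithYield).pipe(
  Stream.groupedWithin(10, Duration.seconds(5))
)
```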
Note that in the examples quoted in the Discord discussion, sometimes the stream becomes fully eager (consuming all of the upstream messages) and other times it is only 1 pull ahead of the consumer, which suggests the behaviour is non-deterministic due to timing (when not using the Effect.sleep(0)).
@dmeehan1968 - thank you very much for the detailed explanation of the issue! However, it's a bit hard to investigate the problem without a minimal reproduction.
Is there any chance you could provide us with a minimal reproduction of the problem so that we can investigate? 🙏
@IMax153 Just to draw attention to my feeling that this is, at least in part, actually a facet of Streams themselves - they seem not to expose the Channel's UpstreamPullStrategy, and the ChannelExecutor is an imported dependency rather than an injected one, so we are stuck with the default implementation (although I may be missing a compositional trick that could be used).
Because streams appear to pull from upstream as soon as there is space in the downstream queue, they are effectively 'ahead' of what the downstream consumer may want to do. Aggregation is merely an example of where this has an impact.
As I pointed out in this issue, how far ahead the upstream can get seems to be a product of downstream consumption timing (hence how the Effect.sleep can alter the output). In my first example in the discussion (https://effect.website/play#90406f35099d) the entire upstream is read before the downstream has consumed anything, at least some of the time. Other times it is only 1 pull ahead, even though the code is identical - hence my mention of non-determinism.
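One rough way to observe this (a sketch, not the playground example - the counters and sizes are made up) is to track pulled vs. processed totals and log the gap after each batch:

```ts
import { Chunk, Duration, Effect, Ref, Stream } from "effect"

const program = Effect.gen(function* () {
  const pulled = yield* Ref.make(0)
  const processed = yield* Ref.make(0)

  // Each upstream pull bumps the "pulled" counter.
  const poll = Ref.updateAndGet(pulled, (n) => n + 1).pipe(
    Effect.map((n) => `message-${n}`)
  )

  yield* Stream.repeatEffect(poll).pipe(
    Stream.take(20),
    Stream.groupedWithin(5, Duration.seconds(1)),
    Stream.runForEach((batch) =>
      Effect.gen(function* () {
        yield* Ref.update(processed, (n) => n + Chunk.size(batch))
        const ahead = (yield* Ref.get(pulled)) - (yield* Ref.get(processed))
        // How far the upstream has run ahead of the consumer at this point.
        yield* Effect.log(`upstream is ${ahead} message(s) ahead of the consumer`)
      })
    )
  )
})

Effect.runPromise(program)
```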
I realised after posting this that it's likely a bigger issue than merely when aggregates are used, but I couldn't find another way of aggregating short of a custom implementation.