
Add support for client and server message buffering #28

Open: wants to merge 5 commits into base: master

Commits on Jun 4, 2021

  1. Use opts instead of co

    jvican committed Jun 4, 2021
    Commit fcd33b6
  2. Commit 359e5c9
  3. Remove unnecessary CallOptionsMethods

    It has been renamed to `ClientCallOptionsApi` so it's no longer needed.
    jvican committed Jun 4, 2021
    Commit 5d89d68
  4. Implement client buffering with asyncBoundary

    One of the current problems with our implementation is that `request` is
    only called once the downstream consumer finishes processing a message
    received from the server.
    
    That is, if the server sends 10 messages on the same pipe and the
    client consumer is busy processing the first message, the other nine
    messages will not be emitted by the observable and might not even
    reach the client's grpc Netty buffer. Only when the client finishes
    processing the first message does it call `request` and process another
    message, and the same pattern continues: every message is pulled
    right before it is going to be processed and not before. This incurs an
    obvious performance penalty, and such behavior can change the way the
    consumer handles messages that could otherwise be sent or processed in
    parallel, resulting in unexpected semantics for users.
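    
    In grpc-java terms, the request-per-message behavior described above looks
    roughly like the following sketch (the listener class and its names are
    hypothetical, for illustration only, not monix-grpc's actual code):
    
    ```scala
    import io.grpc.ClientCall
    
    // Hypothetical listener showing the pull-one-message-at-a-time pattern:
    // the next `request` is only issued after the consumer finishes.
    final class OneAtATimeListener[T](
        call: ClientCall[_, T],
        process: T => Unit // the downstream consumer, possibly slow
    ) extends ClientCall.Listener[T] {
      override def onMessage(message: T): Unit = {
        process(message) // the remaining messages wait on the server side
        call.request(1)  // only now is the next message pulled
      }
    }
    ```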
    
    This commit uses `asyncBoundary` to fix this problem and have monix-grpc
    populate an internal buffer with as many messages as fit, while
    still allowing consumers to process these messages individually
    without waiting for the buffer to fill completely. If the server sends
    more messages than fit in the buffer, `asyncBoundary` stops
    consuming messages from the grpc source and consequently stops issuing
    `request` calls, so the grpc runtime doesn't call `onMessage` again
    until the messages in the buffer have been processed downstream.
    
    This behavior is great because the messages processed by `onMessage` are
    added to a hot observable, and `asyncBoundary` gives us all the benefits
    of a backpressured source without actually using one. As a result, the
    buffer backing our hot observable never grows beyond its limit because
    `onMessage` is only invoked after `request` is called.
    
    So this simple solution actually gives us the semantics we want.
    However, there are still some things to improve, namely:
    
    - Users might want to specify a "low tide" (a low watermark) for this
      buffer: a number below which we always refill the buffer until it is
      full, but above which we do not refill it. This argument gives the
      client the flexibility to optimize for memory by not keeping the
      buffer filled to its limit.
    - We might want to `request` several messages at once instead of one at
      a time when repopulating the buffer. It's not clear whether this
      actually brings any benefit, as grpc's internal engine is likely
      already buffering data in Netty byte buffers, so calling `request` for
      each message might not pose any performance penalty.
    jvican committed Jun 4, 2021
    Commit 9f04101
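
    A minimal sketch of the buffering this commit describes, assuming Monix's
    `Observable#asyncBoundary` with `OverflowStrategy.BackPressure` (the
    source, method name, and buffer size below are illustrative assumptions,
    not monix-grpc's actual API):
    
    ```scala
    import monix.reactive.{Observable, OverflowStrategy}
    
    // `serverMessages` stands in for the hot observable fed by grpc's
    // `onMessage`; `bufferSize` is an assumed configuration value.
    def buffered(
        serverMessages: Observable[Array[Byte]],
        bufferSize: Int
    ): Observable[Array[Byte]] =
      // The boundary buffers up to `bufferSize` messages; once full,
      // upstream consumption (and thus `request` calls) stops until the
      // downstream consumer catches up.
      serverMessages.asyncBoundary(OverflowStrategy.BackPressure(bufferSize))
    ```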

Commits on Jun 14, 2021

  1. Add server call options API + support server-side buffering

    A similar adjustment to the one made for the client call options API.
    I've taken the liberty to redesign how server call options are passed
    in, without regard for binary compatibility, as we don't promise
    backwards compatibility until we make a stable release >= 1.0.
    
    I have added some more documentation to the server call to make it clear
    that we are trying to replicate the API in the underlying grpc java
    library.
    jvican committed Jun 14, 2021
    Commit 104b3b1