
Access the low level Kafka consumer / producer from the Kafka connector #955

Closed
loicmathieu opened this issue Jan 26, 2021 · 14 comments · Fixed by #1177

Comments

@loicmathieu
Contributor

For some low-level operations, we need access to the Kafka Consumer / Producer that backs a channel.

Ideally, these should be retrievable from CDI (directly or via the connector).

This would allow, for example, manually pausing a consumer.

@ferencbeutel4711

this is badly needed for any kind of error handling on connectivity issues :(

@cescoffier
Contributor

How would it help you handle connectivity issues?

@cescoffier
Contributor

(just to be sure I design the API correctly)

@ferencbeutel4711

ferencbeutel4711 commented Mar 29, 2021

Hi, please imagine the following example:

Kafka -> Quarkus app with SmallRye -> Database

So in this example, the application reads from a Kafka topic, transforms the data, and writes the result into a database.
If there is a connectivity issue with the database, some form of error handling is necessary. One solution could be, for example, to pause the consumption with an exponentially increasing timeout. However, to my knowledge this is currently not possible, since SmallRye only provides three forms of error handling:

  • completely and permanently stop consumption
  • write the message to a DLQ (which loses ordering / introduces synchronization issues between the DLQ and the "main" queue)
  • skip the message completely (which loses the data)

If it were possible to, for example, inject a KafkaConsumer into an @Incoming function, the exponential pause (or any other form of error handling) could be implemented there.

For our use case specifically, it would actually be perfect if there were a higher-level form of error handling which just means "try again later" (after x ms for example, or maybe even upon fulfilling a given predicate, which would be checked periodically) - however, that wouldn't be as flexible for general usage.
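
To make this concrete, here is a rough sketch of the kind of handling meant, assuming the raw Apache Kafka Consumer backing the channel could somehow be obtained (exactly what this issue asks for); the backoff values are arbitrary:

import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.Consumer;

// Hypothetical sketch: assumes the Consumer backing the channel can be obtained.
// Note that the real KafkaConsumer is not thread-safe, so in a real implementation
// the resume would have to be handed back to the polling thread.
public class ExponentialPause {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private Duration backoff = Duration.ofSeconds(1);

    // Called whenever the database write fails.
    public void pauseAndScheduleResume(Consumer<?, ?> consumer) {
        // Stop fetching from all currently assigned partitions without leaving the group.
        consumer.pause(consumer.assignment());
        scheduler.schedule(() -> consumer.resume(consumer.assignment()),
                backoff.toMillis(), TimeUnit.MILLISECONDS);
        // Exponential backoff, capped at 10 minutes.
        Duration next = backoff.multipliedBy(2);
        backoff = next.compareTo(Duration.ofMinutes(10)) > 0 ? Duration.ofMinutes(10) : next;
    }
}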

@cescoffier
Contributor

In the 3.x version, the consumption will be automatically paused if there are no requests.

Also, you can use Fault-Tolerance to add a retry (I'm not sure about the backoff - @Ladicek is this supported?).
You would be able to achieve the same with something like:

Multi<X> processAndRetryOnFailure(Multi<X> multi) {
    // For each item, write and retry on failure (use concatenate to preserve ordering if needed)
    return multi
        .onItem().transformToUniAndConcatenate(x ->
            // Write to the DB and retry with backoff on failure
            Uni.createFrom().item(x)
                .onItem().transformToUni(item -> writeInDatabase(item))
                .onFailure().retry().withBackOff(Duration.ofSeconds(10)).atMost(100));
}

So, with the next version, when a failure happens, it will stop requesting, and thus pause the consumption.
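
For context, such a processor could be wired as a channel along these lines (the channel names, the String payload type, and writeInDatabase are assumptions in this sketch, not the actual application code):

import java.time.Duration;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class PriceProcessor {

    // Channel names "prices" and "stored-prices" are assumptions for this sketch.
    @Incoming("prices")
    @Outgoing("stored-prices")
    Multi<String> process(Multi<String> prices) {
        return prices
            // transformToUniAndConcatenate preserves the original ordering.
            .onItem().transformToUniAndConcatenate(payload ->
                writeInDatabase(payload)
                    .onFailure().retry().withBackOff(Duration.ofSeconds(10)).atMost(100));
    }

    Uni<String> writeInDatabase(String payload) {
        // Placeholder for the actual reactive database write.
        return Uni.createFrom().item(payload);
    }
}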

@ferencbeutel4711

Hi, thank you for your answer. This does sound very promising; however, I still have some questions concerning your code example:

  • Does this mean that the message will be kept in memory until it can eventually be processed successfully?
  • Will new messages be consumed from the source in the meantime and added to the Multi?
  • Will the offset commit only be done on success?

@cescoffier
Contributor

Does this mean that the message will be kept in memory until it can eventually be processed successfully?

Yes. However, as the offset is not committed, if the application crashes, it will re-process the message.

Will new messages be consumed from the source in the meantime and added to the Multi?

Only the current batch (the number of records contained in a batch is configurable). Once the number of stored messages is greater than the number of requests, it pauses the consumption.

Will the offset commit only be done on success?

Yes, if post-acknowledgement is used.
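
For reference, a sketch of the kind of connector configuration involved (the channel name "prices" is an assumption; commit-strategy=throttled is the strategy that only commits offsets once the corresponding messages have been acknowledged):

# Channel name "prices" is an assumption for this sketch.
mp.messaging.incoming.prices.connector=smallrye-kafka
mp.messaging.incoming.prices.topic=prices
# Commit an offset only once all messages up to it have been acknowledged.
mp.messaging.incoming.prices.commit-strategy=throttled
# Plain Kafka consumer property passed through: bounds the size of one fetched batch.
mp.messaging.incoming.prices.max.poll.records=500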

@Ladicek
Collaborator

Ladicek commented Mar 30, 2021

Unrelated, but since @cescoffier asked -- no, MicroProfile Fault Tolerance's @Retry doesn't support backoff (we agreed we want it, but couldn't agree on the API :-) ).

@cescoffier
Contributor

@Ladicek isn't this something we could add as a SmallRye-only thing?

@Ladicek
Collaborator

Ladicek commented Mar 30, 2021

I did think about that, yes (mostly because I wanted to show what the API should look like from my perspective, but also because it's useful :-) ). The thing with @Retry is that it's already overcrowded and hardcodes a single backoff strategy (constant delay). I have created an issue in SmallRye Fault Tolerance: smallrye/smallrye-fault-tolerance#394

@ferencbeutel4711

Hello again,

thank you so much for your ongoing help with all of our questions. We have tried out the mentioned approach today and can report that the proposed solution does work for us. We still have 3 questions though:

  • Right now, we had to implement another in-memory channel to get the other channel started. Our assumption is that, since the implementation proposal returns a Multi again, there needs to be some sort of terminating consumer for the whole Multi stream. Is there a smarter way of doing this than defining an (otherwise useless) @Incoming channel which consumes on payload level?
  • We have read a lot about acknowledgement and offset-committing for the throttled-scenario. In case we would like to acknowledge manually, would it suffice to simply call message.ack() after the writeInDatabase call from your example?
  • In general, is this approach of error handling and working directly on the Multi-Stream common practice or are we far off the beaten track here?

Looking forward to reading your response!

@cescoffier
Contributor

Right now, we had to implement another in-memory channel to get the other channel started. Our assumption is that, since the implementation proposal returns a Multi again, there needs to be some sort of terminating consumer for the whole Multi stream. Is there a smarter way of doing this than defining an (otherwise useless) @Incoming channel which consumes on payload level?

There is an issue about that. Unfortunately, it's not done yet, and this new signature is not supported by the current specification.

We have read a lot about acknowledgement and offset-committing for the throttled-scenario. In case we would like to acknowledge manually, would it suffice to simply call message.ack() after the writeInDatabase call from your example?
In general, is this approach of error handling and working directly on the Multi-Stream common practice or are we far off the beaten track here?

You can use message.ack() and message.nack(...); that is the right approach as soon as you need to customize acknowledgement.
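
A minimal sketch of what that could look like when consuming Messages (the channel name and writeInDatabase are assumptions; with a Message-based signature, acknowledgement is in the method's hands):

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class PriceStore {

    // Channel name "prices" is an assumption for this sketch.
    @Incoming("prices")
    Uni<Void> store(Message<String> message) {
        return writeInDatabase(message.getPayload())
            // Acknowledge only after the write succeeded; nack on failure.
            .onItem().transformToUni(x -> Uni.createFrom().completionStage(message.ack()))
            .onFailure().recoverWithUni(t -> Uni.createFrom().completionStage(message.nack(t)));
    }

    Uni<String> writeInDatabase(String payload) {
        // Placeholder for the actual reactive database write.
        return Uni.createFrom().item(payload);
    }
}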

@ferencbeutel4711

Alright, thank you again so much for your support :) I think this is a good way to start including error handling, and we are eagerly awaiting version 3.x!

@pawel-ochrymowicz

pawel-ochrymowicz commented Apr 21, 2021

Hi,

I have another case which requires access to the underlying Kafka consumer.

I have topics:
price
price-retry-0
price-retry-n

I want to limit the polling interval of the underlying Kafka consumer to achieve the following behaviour for the consumers of the topics above:
price - (stream processing, no added delay)
price-retry-0 (lower delay, 0-60s)
price-retry-n (higher delay, 60s-600s)

At the moment, the only solution I see is to use the SmallRye abstractions to consume from the "price" topic and plain consumers for the "price-retry-*" topics.

What do you think about making consumer polling configurable?
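
Not a full answer to the polling question, but one possible approximation within the existing abstractions is to delay processing per retry channel (the channel names are assumptions; this delays each record's processing, not the poll itself):

import java.time.Duration;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

// Sketch only: channel names are assumptions; the delay applies to each record's
// processing rather than to the underlying consumer's poll interval.
@Incoming("price-retry-0")
@Outgoing("price-retry-0-delayed")
Multi<String> delayRetry(Multi<String> retries) {
    return retries
        .onItem().transformToUniAndConcatenate(payload ->
            Uni.createFrom().item(payload)
                .onItem().delayIt().by(Duration.ofSeconds(60)));
}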

cescoffier added a commit that referenced this issue Apr 26, 2021
Provide access to the low-level Kafka client.
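
The linked change appears to expose the underlying clients through an injectable service; a minimal sketch of how that could be used to pause a channel's consumer (the channel name is an assumption, and the exact API may differ):

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import io.smallrye.reactive.messaging.kafka.KafkaClientService;

@ApplicationScoped
public class PauseController {

    @Inject
    KafkaClientService clients; // assumed to be the service introduced by the linked PR

    // Pause the consumer backing the "prices" channel (channel name is an assumption).
    public void pausePrices() {
        clients.getConsumer("prices").pause()
            .subscribe().with(pausedPartitions -> { /* consumption is now paused */ });
    }
}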