
How to properly reconsume messages after their processing failed in High-Level balanced consumer? #4841

Open
fdr400 opened this issue Sep 7, 2024 Discussed in #4722 · 2 comments

Comments

@fdr400
Copy link

fdr400 commented Sep 7, 2024

Discussed in #4722

Originally posted by fdr400 May 17, 2024
Hello everyone! I am currently implementing the driver for Kafka in @userver-framework. The code is available here: https://github.com/userver-framework/userver/tree/develop/kafka

It provides an interface for processing message batches received from the topics the consumer has subscribed to. A common consumer use case looks like this:

consumer.Start([](kafka::MessageBatchView batch) {
    /* message batch processing */
    consumer.AsyncCommit(); // wrapper for rd_kafka_commit(consumer_handle, NULL, 1 /* async */)
});

kafka::MessageBatchView is an alias for std::span<const kafka::Message>, where kafka::Message is a C++ wrapper around rd_kafka_message_t*.

The library periodically polls message batches and invokes the user-provided callback, passing it a view of the batch.
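For context, the poll-then-callback cycle can be sketched in plain librdkafka C roughly as follows. This is a simplified sketch, not the actual userver code: the batch size, timeouts, and the `process_batch` callback plumbing are all assumptions.

```c
#include <stddef.h>
#include <librdkafka/rdkafka.h>

/* Simplified sketch of a poll-then-callback loop.
 * process_batch() stands in for the user-provided callback and
 * returns 0 on success, non-zero on failure. */
static void poll_loop(rd_kafka_t *consumer,
                      int (*process_batch)(rd_kafka_message_t **msgs, size_t n)) {
    rd_kafka_message_t *batch[100];
    size_t n = 0;

    for (;;) {
        rd_kafka_message_t *msg = rd_kafka_consumer_poll(consumer, 100 /* ms */);
        if (msg) {
            if (!msg->err)
                batch[n++] = msg;
            else
                rd_kafka_message_destroy(msg); /* e.g. partition EOF */
        }

        /* Flush the batch when it is full, or when polling went idle. */
        if (n == sizeof(batch) / sizeof(*batch) || (!msg && n > 0)) {
            if (process_batch(batch, n) == 0)
                rd_kafka_commit(consumer, NULL, 1 /* async */);
            /* On failure the offsets stay uncommitted; how to re-read
             * those messages is exactly the question below. */
            while (n > 0)
                rd_kafka_message_destroy(batch[--n]);
        }
    }
}
```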

I am trying to support re-consumption of messages when the callback throws an exception.
To implement this, the current implementation closes the consumer after catching the exception and re-subscribes to the same topics, so that reading resumes from the last committed offset.

But this implementation is suboptimal because, for example, with an EAGER rebalance strategy, closing the consumer first revokes all partitions from every consumer in the group and only then reassigns them.

I read lots of issues in this repo and searched the web, but unfortunately did not find how to re-read uncommitted messages using the balanced consumer.

I saw several possible solutions for re-reading messages, but, as I understand it, they won't work when the consumer is subscribed to topics:

  1. The first solution is to fetch the current consumer assignment with rd_kafka_assignment and assign it back to the consumer with rd_kafka_assign. But in many issues I read that rd_kafka_assign sets a static assignment and cannot be used outside the rebalance_cb.
  2. The second is to use rd_kafka_seek to move the fetch offsets back to the committed values. But, again, I read that seek can only be used after manual partition assignment with rd_kafka_assign.
  3. The last solution is to cache the unprocessed message batch and pass it to the user callback again and again until the invocation succeeds. But then the consumer stops its polling loop and may be kicked out of the consumer group after some time.

So, the question is: how to properly reconsume uncommitted messages using the balanced consumer (rd_kafka_subscribe), with minimum overhead?

@emasab
Contributor

emasab commented Oct 15, 2024

Hi @fdr400, this is the correct thing to do:

The second is to use rd_kafka_seek to move the fetch offsets back to the committed values. But, again, I read that seek can only be used after manual partition assignment with rd_kafka_assign.

It's possible even when subscribing, as long as you seek only partitions that are assigned to you, which you can check with rd_kafka_assignment. You should use rd_kafka_seek_partitions to seek.

Check out this example:

static void rewind_consumer(rd_kafka_t *consumer) {
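For readers without the linked example, here is a minimal sketch of what such a rewind helper might look like, assuming you want to seek back to the last committed offsets and that timeouts and error handling are abbreviated:

```c
#include <librdkafka/rdkafka.h>

/* Sketch: seek all currently assigned partitions back to their last
 * committed offsets. This works with a subscribed (balanced) consumer
 * because only partitions in our current assignment are seeked. */
static rd_kafka_resp_err_t rewind_to_committed(rd_kafka_t *consumer) {
    rd_kafka_topic_partition_list_t *assignment = NULL;
    rd_kafka_resp_err_t err;

    err = rd_kafka_assignment(consumer, &assignment);
    if (err)
        return err;

    /* Fills in .offset with the committed offset for each partition.
     * Partitions with no committed offset get RD_KAFKA_OFFSET_INVALID;
     * you may want to map those to RD_KAFKA_OFFSET_BEGINNING first. */
    err = rd_kafka_committed(consumer, assignment, 5000 /* timeout ms */);
    if (!err) {
        rd_kafka_error_t *error =
            rd_kafka_seek_partitions(consumer, assignment, 5000 /* timeout ms */);
        if (error) {
            err = rd_kafka_error_code(error);
            rd_kafka_error_destroy(error);
        }
    }

    rd_kafka_topic_partition_list_destroy(assignment);
    return err;
}
```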

You'll probably also need the rebalance_cb to check whether partitions are being revoked from your member. That callback is invoked from the consume call, so after processing your last consumed message, but the partition isn't actually revoked until you call assign(NULL) or incremental_unassign() inside it.
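A rebalance callback along those lines might be sketched as below, assuming the default (EAGER) rebalance protocol; with a cooperative/incremental assignor you would call rd_kafka_incremental_assign/rd_kafka_incremental_unassign instead. The callback is installed via rd_kafka_conf_set_rebalance_cb.

```c
#include <librdkafka/rdkafka.h>

/* Sketch of an EAGER-protocol rebalance callback. Partitions are not
 * actually revoked until rd_kafka_assign(rk, NULL) is called here, so
 * this is the place to notice revocation and, for example, drop any
 * cached uncommitted batch for the revoked partitions. */
static void rebalance_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err,
                         rd_kafka_topic_partition_list_t *partitions,
                         void *opaque) {
    (void)opaque;

    switch (err) {
    case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
        /* New assignment for this member: accept it. */
        rd_kafka_assign(rk, partitions);
        break;

    case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
        /* This member is losing its partitions: invalidate any
         * in-flight batch before completing the revocation. */
        rd_kafka_assign(rk, NULL);
        break;

    default:
        /* Unexpected error: drop the assignment. */
        rd_kafka_assign(rk, NULL);
        break;
    }
}
```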

@fdr400
Author

fdr400 commented Oct 16, 2024

@emasab thanks for your answer! I'll try to do so.
But it is not obvious from the documentation, because the rd_kafka_seek* functions are listed in the Simple Consumer API group.
