Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rotate Interval doesn't work for low volume or irregular traffic #329

Open
dcsobral opened this issue May 18, 2018 · 5 comments
Open

Rotate Interval doesn't work for low volume or irregular traffic #329

dcsobral opened this issue May 18, 2018 · 5 comments

Comments

@dcsobral
Copy link
Contributor

I was looking at the logs on our QA environment and got very concerned about using it in production, because the rotate interval configurations (both rotate.interval.ms and rotate.schedule.interval.ms) do not actually work as described.

Both settings are only looked at when a call to write() is made, which only happens if Connect sends records to the task.

If the traffic comes in spurts, data can stay uncommitted for hours before new traffic comes in and the timer is triggered. On traffic following daylight patterns, the last records will remain uncommitted overnight.

With sporadic traffic it's even worse. Not only records can languish uncommitted, but if load redistribution happens more often than traffic, the uncommitted records get discarded, and a new timer starts once the topic is re-assigned, which might result in records staying uncommitted indefinitely unless a flush size is also set at very low values and, even then, we still face the prospect of going days before the record is finally committed.

Kafka Connect has a solution to that: offset.flush.interval.ms. Kafka Connect HDFS blithely ignores it. And if the connector is closed, all uncommitted data is discarded. There's literally no way of getting Kafka Connect HDFS to commit data without actually sending data.

We have both these patterns as well as high traffic volumes at very different scales, but the producers and consumers of all topics are the same, and it would be extremely inconvenient to have to resort to a separate tool just for it.

@dcsobral dcsobral changed the title Rotate Interval doesn't work for low volume or irregular topics Rotate Interval doesn't work for low volume or irregular traffic May 18, 2018
@dcsobral
Copy link
Contributor Author

I found that the sink worker task writes to every sink after every poll, even if no records are being sent. Though that's good enough, it is undocumented behavior as far as I can tell, whereas the documented behavior that can be used is ignored.

@kkonstantine
Copy link
Member

I believe you've confirmed that the connector will poll periodically for records, will buffer some records according to its partitioning policy and will flush records in configurable intervals according to wall-clock or based on flush.size.

Would you agree on closing this issue?

@gokhansari
Copy link

I would like to share my similar experience with this issue.

After I stopped producing messages to relevant topic, I realized that kafka consumer lag count have stopped decreasing. But when I checked WAL files under related partition directory in HDFS, I saw key value file names for higher offsets were there. I also checked path of that value file under hdfs and confirmed It was written successfully. But kafka consumer still lag behind, no matter how long you wait. Necessarily you need to produce new records.

If you can not produce records anymore for that related partition files (this could be happen when you use date partitioning), consumer will not fall ever and total lag of this consumer will increase in log term.

Any idea, workaround about this?

@cadl
Copy link

cadl commented Dec 19, 2022

I put some code into flush() at HdfsSinkTask, and it works with rotate.schedule.interval.ms setting.

https://github.com/confluentinc/kafka-connect-hdfs/compare/master...cadl:kafka-connect-hdfs:check-rotate-on-offset-flush?expand=1

flush() will run the state machine, check should rotation or not, at every offset.flush.interval.ms.

cc @gokhansari

@Usiel
Copy link

Usiel commented Feb 2, 2024

Another solution with #684

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants