feat: checkpoint commit callback (for cdc-connector) #15464
Comments
Noting down the discussion with @BugenZhao: today we realized that this requirement applies not only to sources but also to sinks. For example, if we want to provide exactly-once delivery for a sink, the implementation will be like
Hopefully we can design a general framework for this requirement, rather than a case-specific one.
I think we can generalize the sink coordinator framework for this feature.
I think we have already implemented a general framework (the sink coordinator) for sinks, and we need to extend it to sources. You can check with @wenym1 for the details. IIRC, the iceberg sink already leverages the sink coordinator to coordinate commits from different sink actors in the same fragment.
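As a rough illustration of the kind of coordination described in this comment, the sketch below collects per-epoch commit metadata from every actor in a fragment and commits once all of them have reported. `CommitCoordinator`, `report`, and `commit_fn` are hypothetical names made up for this example; they do not reflect the actual sink coordinator API.

```python
from collections import defaultdict
from typing import Callable, Dict, List


class CommitCoordinator:
    """Hypothetical coordinator: commit an epoch once every actor has reported."""

    def __init__(self, num_actors: int, commit_fn: Callable[[int, List[str]], None]):
        self.num_actors = num_actors
        self.commit_fn = commit_fn
        # epoch -> {actor_id: metadata reported by that actor}
        self.pending: Dict[int, Dict[int, str]] = defaultdict(dict)

    def report(self, actor_id: int, epoch: int, metadata: str) -> bool:
        """Record one actor's metadata; return True if this call triggered the commit."""
        self.pending[epoch][actor_id] = metadata
        if len(self.pending[epoch]) < self.num_actors:
            return False
        reports = self.pending.pop(epoch)
        # Pass metadata to the commit function in a deterministic actor order.
        self.commit_fn(epoch, [reports[a] for a in sorted(reports)])
        return True


committed = []
coordinator = CommitCoordinator(num_actors=2, commit_fn=lambda e, m: committed.append((e, m)))
first = coordinator.report(actor_id=1, epoch=100, metadata="file-b")
second = coordinator.report(actor_id=0, epoch=100, metadata="file-a")
```

Here the commit for epoch 100 fires only on the second report, once both actors have contributed their metadata.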
BTW, the same situation applies to a persisted pulsar source with 0 retention.
Does |
The problem is that we need to ack the source offset to the upstream database only after the checkpoint has been committed.
I revisited the workflow of the sink coordinator, which is independent of the barrier collection workflow (initiated by
This is a good idea. Right now there is an existing
BTW, the source executor can call
I think the workflow can be simplified without RPC:
This is exactly what I mean! One more thing to note: the procedure should be done in a concurrent or asynchronous manner.
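A minimal sketch of what "asynchronous" could mean here, using Python's `asyncio` purely for illustration: the wait-then-ack step runs as a background task so the main loop keeps processing while the commit is outstanding. `ack_when_committed` and the `commit_event` notification are hypothetical stand-ins, not RisingWave APIs.

```python
import asyncio
from typing import List


async def ack_when_committed(commit_event: asyncio.Event, offset: int, acked: List[int]) -> None:
    # Wait in the background; the caller is not blocked while this is pending.
    await commit_event.wait()
    acked.append(offset)


async def main() -> List[str]:
    log: List[str] = []
    acked: List[int] = []
    commit_event = asyncio.Event()
    # Spawn the wait-then-ack step instead of awaiting it inline.
    task = asyncio.create_task(ack_when_committed(commit_event, 42, acked))
    # The executor keeps processing while the commit is outstanding.
    for i in range(3):
        log.append(f"processed chunk {i}")
        await asyncio.sleep(0)
    log.append(f"acked before commit: {acked}")
    commit_event.set()  # simulated commit notification
    await task
    log.append(f"acked after commit: {acked}")
    return log


log = asyncio.run(main())
```

The three chunks are processed before the simulated commit arrives, and the ack happens only afterwards.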
The coding of PR #16058 is done. Problem resolved: it should wait for epoch.prev instead of epoch.curr.
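To illustrate why the wait targets epoch.prev, here is a small sketch. It assumes that a barrier carries an epoch pair in which data received before the barrier belongs to `prev` and data after it belongs to `curr`; that reading is an assumption of this example, and `EpochPair`, `SourceAckState`, and their methods are hypothetical names rather than the code in the PR.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(frozen=True)
class EpochPair:
    prev: int  # epoch sealed by this barrier (assumption of this sketch)
    curr: int  # epoch that starts after this barrier (assumption of this sketch)


class SourceAckState:
    def __init__(self) -> None:
        self.latest_offset: Optional[int] = None
        self.offset_by_epoch: Dict[int, int] = {}

    def on_data(self, offset: int) -> None:
        self.latest_offset = offset

    def on_barrier(self, epoch: EpochPair) -> None:
        # Data seen so far belongs to epoch.prev, so key by prev, not curr.
        if self.latest_offset is not None:
            self.offset_by_epoch[epoch.prev] = self.latest_offset

    def ackable_offset(self, committed_epoch: int) -> Optional[int]:
        """Largest offset recorded for epochs <= committed_epoch; drops those entries."""
        done = [e for e in self.offset_by_epoch if e <= committed_epoch]
        if not done:
            return None
        return max(self.offset_by_epoch.pop(e) for e in done)


state = SourceAckState()
state.on_data(10)
state.on_barrier(EpochPair(prev=1, curr=2))
state.on_data(20)
state.on_barrier(EpochPair(prev=2, curr=3))
ack_after_epoch_1 = state.ackable_offset(1)
ack_after_epoch_2 = state.ackable_offset(2)
```

Had the offsets been keyed by `curr`, committing epoch 1 would release nothing, and offset 10 would not become ackable until epoch 2 was committed.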
Hi all, I am revisiting this issue to implement a general ack mechanism for sources; thanks to @xxchan for mentioning it. My major questions are:
@yuhao-su can you share more about the case? IIRC, a non-persistent pulsar topic cannot seek to a specific position, and I think that is expected.
Replied in the issue.
Some background first
Our CDC connector consumes PG CDC events and, at regular intervals, acks the consumed offset (LSN) to the PG server. The upstream PG then assumes that the WAL for those offsets can be discarded.
risingwave/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEventConsumer.java
Lines 189 to 199 in e6d8d88
DebeziumEngine will commit those marked offsets to upstream:
https://github.com/debezium/debezium/blob/4ca2a67b0d302c611b89b1931728377cf232ab6c/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java#L435-L436
Findings
After some investigation, I think the reason for the "Cannot seek to the last known offset" error is that we ack the offset to PG before the checkpoint is committed. As a result, when the cluster recovers from a committed checkpoint, the restored offset may already have been discarded by the upstream PG.
Currently our framework doesn't have a checkpoint commit callback mechanism to notify the source executor. An intuitive idea is to let Meta broadcast an RPC to each CN in the cluster. cc @hzxa21
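The failure mode can be reproduced with a toy model. In the sketch below, `UpstreamLog` is a hypothetical stand-in for the upstream WAL (it is not PG or Debezium code), and the offsets and checkpoint position are made up. It compares acking every consumed offset immediately with acking only the offset covered by a committed checkpoint.

```python
class UpstreamLog:
    """Toy stand-in for an upstream WAL: acked offsets may be discarded."""

    def __init__(self) -> None:
        self.min_retained = 0

    def ack(self, offset: int) -> None:
        # After an ack, anything at or below `offset` may be discarded.
        self.min_retained = max(self.min_retained, offset + 1)

    def can_seek(self, offset: int) -> bool:
        # Resuming after `offset` requires offset + 1 to still be retained.
        return offset + 1 >= self.min_retained


def run(ack_before_commit: bool) -> bool:
    """Consume offsets 1..5, commit a checkpoint at 3, crash at 5, then recover."""
    upstream = UpstreamLog()
    committed_offset = 0
    for offset in range(1, 6):
        if ack_before_commit:
            upstream.ack(offset)  # ack as soon as the event is consumed
        if offset == 3:
            committed_offset = offset  # checkpoint commit covers offsets <= 3
            if not ack_before_commit:
                upstream.ack(committed_offset)  # ack only what is committed
    # Crash here; recovery restores `committed_offset` from the checkpoint.
    return upstream.can_seek(committed_offset)


early_ack_recovers = run(ack_before_commit=True)
late_ack_recovers = run(ack_before_commit=False)
```

With early acks the upstream has already discarded offsets 4 and 5, so recovery from offset 3 cannot seek; with acks deferred until the commit, the restored offset is still available.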
To confirm the findings, I increased the offset flush interval to 30 minutes, which is much longer than the time required for the test, and reran the chaos test (stresschaos only and w/o memtable spill: 599, 603). The results show that the "Cannot seek" error is gone, and the MV check also passed.
However, when I ran the chaos test with 3 CNs (601), the error was gone but the source table was still out of sync with PG. I have no explanation for this one right now.
Originally posted by @StrikeW in #15141 (comment)