[feat 32473] Added soft deadline logic to Spanner Change Stream IO connector. #32474
base: master
Conversation
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control.
Could you do a test run with the specified checkpoint interval greater than 1 minute and verify the pipeline works well?
    continueProcessing &=
        new Instant(timeSupplier.get().toSqlTimestamp()).isBefore(softDeadline)
nit: could we do the comparison without having to create a new Instant on every call?
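As an illustration of the nit above, here is a minimal sketch (hypothetical class and field names, not the connector's actual code) of how the check could avoid allocating a new Instant per call by precomputing the soft deadline's epoch millis once and comparing longs:

```java
import java.util.function.Supplier;

// Hypothetical sketch: precompute the soft deadline as epoch millis so the
// per-record check is a primitive long comparison, with no Instant allocation.
public class SoftDeadlineCheck {
  private final long softDeadlineMillis;
  private final Supplier<Long> currentMillisSupplier;

  public SoftDeadlineCheck(long softDeadlineMillis, Supplier<Long> currentMillisSupplier) {
    this.softDeadlineMillis = softDeadlineMillis;
    this.currentMillisSupplier = currentMillisSupplier;
  }

  /** True while wall time has not yet reached the soft deadline. */
  public boolean beforeSoftDeadline() {
    return currentMillisSupplier.get() < softDeadlineMillis;
  }
}
```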
@@ -88,6 +91,14 @@ public Optional<ProcessContinuation> run(

  final Timestamp commitTimestamp = record.getCommitTimestamp();
  final Instant commitInstant = new Instant(commitTimestamp.toSqlTimestamp().getTime());
  if (tracker instanceof Interruptible
Could we receive an instance of Interruptible instead of having to do an instanceof comparison on every call?
Added as an explicit class, as this route proved infeasible due to the Beam adapter pattern.
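For context, a minimal sketch of what such an Interruptible contract might look like (hypothetical signature; the actual connector code passes a com.google.cloud.Timestamp rather than a long):

```java
// Hypothetical sketch of an Interruptible contract: a restriction tracker
// implementing it can be asked whether processing should continue for a
// given record timestamp (simplified here to epoch micros).
public interface Interruptible {
  /** Returns false once the soft deadline has been reached for this timestamp. */
  boolean shouldContinue(long timestampMicros);
}
```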
@@ -113,6 +114,14 @@ public Optional<ProcessContinuation> run(

  final Timestamp startTimestamp = record.getStartTimestamp();
  final Instant startInstant = new Instant(startTimestamp.toSqlTimestamp().getTime());
  if (tracker instanceof Interruptible
      && !((Interruptible) tracker).shouldContinue(startTimestamp)) {
I believe this change is to help prevent deadline-exceeded errors on the change stream RPC from causing problems. The deadlines are in terms of wall time, while the restriction tracker and the record timestamps are in terms of event/data time.
If processing falls behind, the record timestamps and restriction tracker ranges will be in the past, but we still want to stop processing after 40 seconds of wall time elapse. Currently the soft deadline is set to now + 40 seconds, which might not be reached before the RPC deadline is exceeded. And if we changed the soft deadline to startTimestamp + 40 seconds, we could delay catch-up by stopping unnecessarily early, if that range takes less than 40 seconds of wall time to pass.
Instead, I think you could have a Stopwatch in the QueryChangeStreamAction, and after 40 seconds of wall time elapse you could update the restriction tracker's soft deadline to the currently claimed timestamp. That would ensure that all the records for that timestamp are processed, but that newer timestamps are not claimed.
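A minimal sketch of the approach suggested above (hypothetical class and method names, not the connector's actual implementation): measure elapsed wall time, and once the soft-deadline budget is spent, pin the deadline to the last claimed event timestamp so records at that timestamp finish but newer timestamps are rejected.

```java
import java.time.Duration;

// Hypothetical sketch: a wall-time soft deadline that, once exceeded, pins
// the cutoff to the last event timestamp that was still within budget.
public class SoftDeadlineInterrupter {
  private final long softDeadlineNanos;
  private final long startNanos;
  private Long pinnedTimestampMicros; // last claimed event timestamp, set once budget is spent

  public SoftDeadlineInterrupter(Duration softDeadline, long nowNanos) {
    this.softDeadlineNanos = softDeadline.toNanos();
    this.startNanos = nowNanos;
  }

  /** Returns true if the record at timestampMicros may still be processed. */
  public boolean shouldContinue(long timestampMicros, long nowNanos) {
    if (nowNanos - startNanos < softDeadlineNanos) {
      // Still within the wall-time budget: remember how far we got.
      pinnedTimestampMicros = timestampMicros;
      return true;
    }
    // Past the soft deadline: finish records at the pinned timestamp,
    // but do not claim anything newer.
    return pinnedTimestampMicros != null && timestampMicros <= pinnedTimestampMicros;
  }
}
```

This keeps the deadline in wall time (matching the RPC deadline) while the cutoff handed to the restriction tracker stays in event time, which is the mismatch the comment above describes.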
Discussed offline. That's what the current logic does.
Friendly ping on this, as it would be nice to avoid deadline-exceeded errors from causing retries.
Resolved review threads on:
- ...ava/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordAction.java
- ...in/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
- ...ava/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupter.java
- ...org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/RestrictionInterrupterTest.java
Tested with the BigQuery template and SDF checkpoint interval = 80s. There don't seem to be deadline-exceeded errors anymore, and data freshness by state oscillates between 40-50s.

gcloud dataflow flex-template run $DATAFLOW_JOB \
  --template-file-gcs-location gs://$BUCKET_NAME/templates/flex/Spanner_Change_Streams_to_BigQuery \
  --region us-central1 \
  --parameters spannerInstanceId=$INSTANCE \
  --parameters spannerDatabase=$DATABASE \
  --parameters spannerMetadataInstanceId=$INSTANCE \
  --parameters spannerMetadataDatabase=$META_DATABASE \
  --parameters spannerChangeStreamName=$CHANGE_STREAM \
  --parameters bigQueryDataset=$BIG_QUERY_DATASET \
  --parameters deadLetterQueueDirectory=gs://$BUCKET_NAME/dlq/try1 \
  --parameters experiments=sdf_checkpoint_after_duration=80s
@thiagotnunes Any other comments before merging?
Currently, the Spanner Change Stream IO connector expects the Beam framework to issue a split event either every 5s or every 5 MB of processed payload. If no such event occurs, processing continues until the pre-set 1-minute timeout for the change stream query is reached. This causes the existing work to be dropped and the query to be rescheduled. This feature adds a soft deadline ahead of the hard deadline, allowing already-processed work to be committed and the internal processing low watermark to be updated.
fixes #32473
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
- Update CHANGES.md with noteworthy changes.
- See the Contributor Guide for more tips on how to make the review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.