Skip to content

Commit

Permalink
Adjust CRDB watch to set the checkpoint frequency equal to the resolved
Browse files Browse the repository at this point in the history
Should reduce watch pressure from the caller to CRDB
  • Loading branch information
josephschorr committed Oct 23, 2024
1 parent fcd2670 commit b2e55af
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 2 deletions.
2 changes: 2 additions & 0 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas
),
CommonDecoder: revisions.CommonDecoder{Kind: revisions.HybridLogicalClock},
dburl: url,
version: version,
watchBufferLength: config.watchBufferLength,
watchBufferWriteTimeout: config.watchBufferWriteTimeout,
watchConnectTimeout: config.watchConnectTimeout,
Expand Down Expand Up @@ -286,6 +287,7 @@ type crdbDatastore struct {
revisions.CommonDecoder

dburl string
version crdbVersion
readPool, writePool *pool.RetryPool
watchBufferLength uint16
watchBufferWriteTimeout time.Duration
Expand Down
10 changes: 8 additions & 2 deletions internal/datastore/crdb/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

const (
queryChangefeed = "EXPERIMENTAL CHANGEFEED FOR %s WITH updated, cursor = '%s', resolved = '%s', min_checkpoint_frequency = '0';"
queryChangefeed = "EXPERIMENTAL CHANGEFEED FOR %s WITH updated, cursor = '%s', resolved = '%s', min_checkpoint_frequency = '%s';"
queryChangefeedPreV22 = "EXPERIMENTAL CHANGEFEED FOR %s WITH updated, cursor = '%s', resolved = '%s';"
)

Expand Down Expand Up @@ -140,7 +140,13 @@ func (cds *crdbDatastore) watch(
}

resolvedDurationString := strconv.FormatInt(resolvedDuration.Milliseconds(), 10) + "ms"
interpolated := fmt.Sprintf(cds.beginChangefeedQuery, strings.Join(tableNames, ","), afterRevision, resolvedDurationString)

var interpolated string
if cds.version.Major >= 22 {
interpolated = fmt.Sprintf(cds.beginChangefeedQuery, strings.Join(tableNames, ","), afterRevision, resolvedDurationString, resolvedDurationString)
} else {
interpolated = fmt.Sprintf(cds.beginChangefeedQuery, strings.Join(tableNames, ","), afterRevision, resolvedDurationString)
}

sendError := func(err error) {
if errors.Is(ctx.Err(), context.Canceled) {
Expand Down

0 comments on commit b2e55af

Please sign in to comment.