From 2486314f2cad430ae47fbd99000259236b0df798 Mon Sep 17 00:00:00 2001 From: Tony Tang Date: Fri, 1 Nov 2024 11:40:33 -0400 Subject: [PATCH 1/4] Add option to adjust ReadChangeStream API timeout to allow longer timeout for longer checkpoint durations Change-Id: Ia61388bed5997b7138b15a014d6370d49cbf5277 --- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 43 ++++++++++++++++++- .../dao/BigtableChangeStreamAccessor.java | 11 +++-- .../changestreams/dao/DaoFactory.java | 11 ++++- .../sdk/io/gcp/bigtable/BigtableIOTest.java | 2 + 4 files changed, 60 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 932099e01763..1918cc1e8639 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -2114,6 +2114,8 @@ public abstract static class ReadChangeStream private static final Duration DEFAULT_BACKLOG_REPLICATION_ADJUSTMENT = Duration.standardSeconds(30); + private static final Duration DEFAULT_READ_CHANGE_STREAM_TIMEOUT = Duration.standardSeconds(15); + static ReadChangeStream create() { BigtableConfig config = BigtableConfig.builder().setValidate(true).build(); BigtableConfig metadataTableconfig = BigtableConfig.builder().setValidate(true).build(); @@ -2145,6 +2147,8 @@ static ReadChangeStream create() { abstract @Nullable Duration getBacklogReplicationAdjustment(); + abstract @Nullable Duration getReadChangeStreamTimeout(); + abstract @Nullable Boolean getValidateConfig(); abstract ReadChangeStream.Builder toBuilder(); @@ -2351,6 +2355,22 @@ public ReadChangeStream withBacklogReplicationAdjustment(Duration adjustment) { return toBuilder().setBacklogReplicationAdjustment(adjustment).build(); } + /** + * Returns a new {@link BigtableIO.ReadChangeStream} that overrides timeout for ReadChangeStream + * requests. + * + *

This is useful to override the default of 15s timeout if the checkpoint duration is longer + * than 15s. Setting this value to longer than periodic checkpoint duration ensures that + * ReadChangeStream will stream until the next checkpoint is initiated. + * + *

Optional: defaults to 15 seconds. + * + *

Does not modify this object. + */ + public ReadChangeStream withReadChangeStreamTimeout(Duration timeout) { + return toBuilder().setReadChangeStreamTimeout(timeout).build(); + } + /** * Disables validation that the table being read and the metadata table exists, and that the app * profile used is single cluster and single row transaction enabled. Set this option if the @@ -2461,11 +2481,21 @@ public PCollection> expand(PBegin input) { backlogReplicationAdjustment = DEFAULT_BACKLOG_REPLICATION_ADJUSTMENT; } + Duration readChangeStreamTimeout = getReadChangeStreamTimeout(); + if (readChangeStreamTimeout == null) { + readChangeStreamTimeout = DEFAULT_READ_CHANGE_STREAM_TIMEOUT; + } + ActionFactory actionFactory = new ActionFactory(); ChangeStreamMetrics metrics = new ChangeStreamMetrics(); DaoFactory daoFactory = new DaoFactory( - bigtableConfig, metadataTableConfig, getTableId(), metadataTableId, changeStreamName); + bigtableConfig, + metadataTableConfig, + getTableId(), + metadataTableId, + changeStreamName, + readChangeStreamTimeout); // Validate the configuration is correct before creating the pipeline, if required. try { @@ -2542,6 +2572,8 @@ abstract ReadChangeStream.Builder setExistingPipelineOptions( abstract ReadChangeStream.Builder setBacklogReplicationAdjustment(Duration adjustment); + abstract ReadChangeStream.Builder setReadChangeStreamTimeout(Duration timeout); + abstract ReadChangeStream.Builder setValidateConfig(boolean validateConfig); abstract ReadChangeStream build(); @@ -2578,7 +2610,14 @@ public static boolean createOrUpdateReadChangeStreamMetadataTable( tableId = MetadataTableAdminDao.DEFAULT_METADATA_TABLE_NAME; } - DaoFactory daoFactory = new DaoFactory(null, bigtableConfig, null, tableId, null); + DaoFactory daoFactory = + new DaoFactory( + null, + bigtableConfig, + null, + tableId, + null, + ReadChangeStream.DEFAULT_READ_CHANGE_STREAM_TIMEOUT); try { MetadataTableAdminDao metadataTableAdminDao = daoFactory.getMetadataTableAdminDao(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/BigtableChangeStreamAccessor.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/BigtableChangeStreamAccessor.java index cb296aef6c28..4362470c7676 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/BigtableChangeStreamAccessor.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/BigtableChangeStreamAccessor.java @@ -52,6 +52,7 @@ public class BigtableChangeStreamAccessor implements AutoCloseable { // Create one bigtable data/admin client per bigtable config (project/instance/table/app profile) private static final ConcurrentHashMap bigtableAccessors = new ConcurrentHashMap<>(); + private static Duration readChangeStreamTimeout = Duration.ofSeconds(15); private final BigtableDataClient dataClient; private final BigtableTableAdminClient tableAdminClient; @@ -83,6 +84,10 @@ public synchronized void close() { bigtableAccessors.remove(bigtableConfig); } + public static void setReadChangeStreamTimeout(Duration timeout) { + readChangeStreamTimeout = timeout; + } + /** * Create a BigtableAccess if it doesn't exist and store it in the cache for faster access. If it * does exist, just return it. @@ -204,9 +209,9 @@ private static BigtableChangeStreamAccessor createAccessor(@NonNull BigtableConf .readChangeStreamSettings() .setRetrySettings( readChangeStreamRetrySettings - .setInitialRpcTimeout(Duration.ofSeconds(15)) - .setTotalTimeout(Duration.ofSeconds(15)) - .setMaxRpcTimeout(Duration.ofSeconds(15)) + .setInitialRpcTimeout(readChangeStreamTimeout) + .setTotalTimeout(readChangeStreamTimeout) + .setMaxRpcTimeout(readChangeStreamTimeout) .setMaxAttempts(10) .build()); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java index 35ac8ed29c3e..a4f9e31a9a07 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java @@ -26,12 +26,13 @@ import java.io.Serializable; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.io.gcp.bigtable.BigtableConfig; +import org.joda.time.Duration; // Allows transient fields to be intialized later @SuppressWarnings("initialization.fields.uninitialized") @Internal public class DaoFactory implements Serializable, AutoCloseable { - private static final long serialVersionUID = 3732208768248394205L; + private static final long serialVersionUID = -3423959768580600281L; private transient ChangeStreamDao changeStreamDao; private transient MetadataTableAdminDao metadataTableAdminDao; @@ -45,17 +46,21 @@ public class DaoFactory implements Serializable, AutoCloseable { private final String metadataTableId; private final String changeStreamName; + private final Duration readChangeStreamTimeout; + public DaoFactory( BigtableConfig changeStreamConfig, BigtableConfig metadataTableConfig, String tableId, String metadataTableId, - String changeStreamName) { + String changeStreamName, + Duration readChangeStreamTimeout) { this.changeStreamConfig = changeStreamConfig; this.metadataTableConfig = metadataTableConfig; this.changeStreamName = changeStreamName; this.tableId = tableId; this.metadataTableId = metadataTableId; + this.readChangeStreamTimeout = readChangeStreamTimeout; } @Override @@ -106,6 +111,8 @@ public synchronized ChangeStreamDao getChangeStreamDao() throws IOException { checkArgumentNotNull(changeStreamConfig.getProjectId()); checkArgumentNotNull(changeStreamConfig.getInstanceId()); checkArgumentNotNull(changeStreamConfig.getAppProfileId()); + BigtableChangeStreamAccessor.setReadChangeStreamTimeout( + org.threeten.bp.Duration.ofMillis(readChangeStreamTimeout.getMillis())); BigtableDataClient dataClient = BigtableChangeStreamAccessor.getOrCreate(changeStreamConfig).getDataClient(); changeStreamDao = new ChangeStreamDao(dataClient, this.tableId); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java index 71c648730bd2..2065772a9a4f 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java @@ -2053,6 +2053,7 @@ public void testReadChangeStreamBuildsCorrectly() { .withMetadataTableAppProfileId("metadata-app-profile") .withStartTime(startTime) .withBacklogReplicationAdjustment(Duration.standardMinutes(1)) + .withReadChangeStreamTimeout(Duration.standardMinutes(1)) .withCreateOrUpdateMetadataTable(false) .withExistingPipelineOptions(BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS); assertEquals("project", readChangeStream.getBigtableConfig().getProjectId().get()); @@ -2071,6 +2072,7 @@ public void testReadChangeStreamBuildsCorrectly() { assertEquals("change-stream-name", readChangeStream.getChangeStreamName()); assertEquals(startTime, readChangeStream.getStartTime()); assertEquals(Duration.standardMinutes(1), readChangeStream.getBacklogReplicationAdjustment()); + assertEquals(Duration.standardMinutes(1), readChangeStream.getReadChangeStreamTimeout()); assertEquals(false, readChangeStream.getCreateOrUpdateMetadataTable()); assertEquals( BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS, From 04140a68e991b55487e032be452343e6abb1b145 Mon Sep 17 00:00:00 2001 From: Tony Tang Date: Mon, 4 Nov 2024 12:34:47 -0500 Subject: [PATCH 2/4] Update comment to clarify timeout padding Change-Id: If34866637045314dfeef1807122cc34b97cc5ddc --- .../java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 1918cc1e8639..d40f77fdce6b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -2360,8 +2360,8 @@ public ReadChangeStream withBacklogReplicationAdjustment(Duration adjustment) { * requests. * *

This is useful to override the default of 15s timeout if the checkpoint duration is longer - * than 15s. Setting this value to longer than periodic checkpoint duration ensures that - * ReadChangeStream will stream until the next checkpoint is initiated. + * than 15s. Setting this value to longer (to add some padding) than periodic checkpoint + * duration ensures that ReadChangeStream will stream until the next checkpoint is initiated. * *

Optional: defaults to 15 seconds. * From 003dc7721d83508585e8a4d1199ac318de612e8d Mon Sep 17 00:00:00 2001 From: Tony Tang Date: Sun, 8 Dec 2024 20:56:39 -0500 Subject: [PATCH 3/4] Move default change stream timeout into the accessor to avoid confusion about where default is set Change-Id: Ia76ff2e3ba4f6a91299e907ec404b720e900c76f --- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 25 +++---------------- .../changestreams/dao/DaoFactory.java | 17 ++++++++----- 2 files changed, 15 insertions(+), 27 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index d40f77fdce6b..7b792903b7c9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -2114,8 +2114,6 @@ public abstract static class ReadChangeStream private static final Duration DEFAULT_BACKLOG_REPLICATION_ADJUSTMENT = Duration.standardSeconds(30); - private static final Duration DEFAULT_READ_CHANGE_STREAM_TIMEOUT = Duration.standardSeconds(15); - static ReadChangeStream create() { BigtableConfig config = BigtableConfig.builder().setValidate(true).build(); BigtableConfig metadataTableconfig = BigtableConfig.builder().setValidate(true).build(); @@ -2481,21 +2479,13 @@ public PCollection> expand(PBegin input) { backlogReplicationAdjustment = DEFAULT_BACKLOG_REPLICATION_ADJUSTMENT; } - Duration readChangeStreamTimeout = getReadChangeStreamTimeout(); - if (readChangeStreamTimeout == null) { - readChangeStreamTimeout = DEFAULT_READ_CHANGE_STREAM_TIMEOUT; - } - ActionFactory actionFactory = new ActionFactory(); ChangeStreamMetrics metrics = new ChangeStreamMetrics(); DaoFactory daoFactory = new DaoFactory( - bigtableConfig, - metadataTableConfig, - getTableId(), - metadataTableId, - changeStreamName, - readChangeStreamTimeout); + bigtableConfig, metadataTableConfig, getTableId(), metadataTableId, changeStreamName); + + daoFactory.setReadChangeStreamTimeout(getReadChangeStreamTimeout()); // Validate the configuration is correct before creating the pipeline, if required. try { @@ -2610,14 +2600,7 @@ public static boolean createOrUpdateReadChangeStreamMetadataTable( tableId = MetadataTableAdminDao.DEFAULT_METADATA_TABLE_NAME; } - DaoFactory daoFactory = - new DaoFactory( - null, - bigtableConfig, - null, - tableId, - null, - ReadChangeStream.DEFAULT_READ_CHANGE_STREAM_TIMEOUT); + DaoFactory daoFactory = new DaoFactory(null, bigtableConfig, null, tableId, null); try { MetadataTableAdminDao metadataTableAdminDao = daoFactory.getMetadataTableAdminDao(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java index a4f9e31a9a07..81f9c684c958 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java @@ -26,6 +26,7 @@ import java.io.Serializable; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.io.gcp.bigtable.BigtableConfig; +import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; // Allows transient fields to be intialized later @@ -46,21 +47,19 @@ public class DaoFactory implements Serializable, AutoCloseable { private final String metadataTableId; private final String changeStreamName; - private final Duration readChangeStreamTimeout; + @Nullable private Duration readChangeStreamTimeout; public DaoFactory( BigtableConfig changeStreamConfig, BigtableConfig metadataTableConfig, String tableId, String metadataTableId, - String changeStreamName, - Duration readChangeStreamTimeout) { + String changeStreamName) { this.changeStreamConfig = changeStreamConfig; this.metadataTableConfig = metadataTableConfig; this.changeStreamName = changeStreamName; this.tableId = tableId; this.metadataTableId = metadataTableId; - this.readChangeStreamTimeout = readChangeStreamTimeout; } @Override @@ -76,6 +75,10 @@ public void close() { } } + public void setReadChangeStreamTimeout(@Nullable Duration readChangeStreamTimeout) { + this.readChangeStreamTimeout = readChangeStreamTimeout; + } + public String getChangeStreamName() { return changeStreamName; } @@ -111,8 +114,10 @@ public synchronized ChangeStreamDao getChangeStreamDao() throws IOException { checkArgumentNotNull(changeStreamConfig.getProjectId()); checkArgumentNotNull(changeStreamConfig.getInstanceId()); checkArgumentNotNull(changeStreamConfig.getAppProfileId()); - BigtableChangeStreamAccessor.setReadChangeStreamTimeout( - org.threeten.bp.Duration.ofMillis(readChangeStreamTimeout.getMillis())); + if (readChangeStreamTimeout != null) { + BigtableChangeStreamAccessor.setReadChangeStreamTimeout( + org.threeten.bp.Duration.ofMillis(readChangeStreamTimeout.getMillis())); + } BigtableDataClient dataClient = BigtableChangeStreamAccessor.getOrCreate(changeStreamConfig).getDataClient(); changeStreamDao = new ChangeStreamDao(dataClient, this.tableId); From 1619f015f3a167e3fca00328a3fbef93ad54a754 Mon Sep 17 00:00:00 2001 From: Tony Tang Date: Sun, 8 Dec 2024 21:13:14 -0500 Subject: [PATCH 4/4] fix format issue Change-Id: I89ba1d48742d5204168249d632accf86d68e140c --- .../java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java | 1 - .../beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 7b792903b7c9..6441524fc847 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -2484,7 +2484,6 @@ public PCollection> expand(PBegin input) { DaoFactory daoFactory = new DaoFactory( bigtableConfig, metadataTableConfig, getTableId(), metadataTableId, changeStreamName); - daoFactory.setReadChangeStreamTimeout(getReadChangeStreamTimeout()); // Validate the configuration is correct before creating the pipeline, if required. diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java index 81f9c684c958..c4d83c298c8e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java @@ -47,7 +47,7 @@ public class DaoFactory implements Serializable, AutoCloseable { private final String metadataTableId; private final String changeStreamName; - @Nullable private Duration readChangeStreamTimeout; + private @Nullable Duration readChangeStreamTimeout; public DaoFactory( BigtableConfig changeStreamConfig,