Add option to adjust ReadChangeStream API timeout to allow longer timeout for longer checkpoint durations #33001

Open · wants to merge 4 commits into base: master · showing changes from 2 commits
@@ -2114,6 +2114,8 @@ public abstract static class ReadChangeStream
private static final Duration DEFAULT_BACKLOG_REPLICATION_ADJUSTMENT =
Duration.standardSeconds(30);

private static final Duration DEFAULT_READ_CHANGE_STREAM_TIMEOUT = Duration.standardSeconds(15);

static ReadChangeStream create() {
BigtableConfig config = BigtableConfig.builder().setValidate(true).build();
BigtableConfig metadataTableconfig = BigtableConfig.builder().setValidate(true).build();
@@ -2145,6 +2147,8 @@ static ReadChangeStream create() {

abstract @Nullable Duration getBacklogReplicationAdjustment();

abstract @Nullable Duration getReadChangeStreamTimeout();

abstract @Nullable Boolean getValidateConfig();

abstract ReadChangeStream.Builder toBuilder();
@@ -2351,6 +2355,22 @@ public ReadChangeStream withBacklogReplicationAdjustment(Duration adjustment) {
return toBuilder().setBacklogReplicationAdjustment(adjustment).build();
}

/**
* Returns a new {@link BigtableIO.ReadChangeStream} that overrides the timeout for
* ReadChangeStream requests.
*
* <p>This is useful when the periodic checkpoint duration is longer than the default 15s
* timeout. Setting this value somewhat longer than the checkpoint duration (to add some
* padding) ensures that ReadChangeStream keeps streaming until the next checkpoint is
* initiated.
*
* <p>Optional: defaults to 15 seconds.
*
* <p>Does not modify this object.
*/
public ReadChangeStream withReadChangeStreamTimeout(Duration timeout) {
return toBuilder().setReadChangeStreamTimeout(timeout).build();
}
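// Illustrative usage sketch (not part of this change): pad the timeout above the periodic
// checkpoint duration, e.g. for a pipeline that checkpoints roughly once a minute. The
// project/instance/table/app-profile values below are placeholders.
//
//   PCollection<KV<ByteString, ChangeStreamMutation>> changes =
//       pipeline.apply(
//           BigtableIO.readChangeStream()
//               .withProjectId("my-project")
//               .withInstanceId("my-instance")
//               .withTableId("my-table")
//               .withAppProfileId("my-app-profile")
//               .withReadChangeStreamTimeout(Duration.standardSeconds(75)));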

/**
* Disables validation that the table being read and the metadata table exists, and that the app
* profile used is single cluster and single row transaction enabled. Set this option if the
@@ -2461,11 +2481,21 @@ public PCollection<KV<ByteString, ChangeStreamMutation>> expand(PBegin input) {
backlogReplicationAdjustment = DEFAULT_BACKLOG_REPLICATION_ADJUSTMENT;
}

Duration readChangeStreamTimeout = getReadChangeStreamTimeout();
if (readChangeStreamTimeout == null) {
readChangeStreamTimeout = DEFAULT_READ_CHANGE_STREAM_TIMEOUT;
}

ActionFactory actionFactory = new ActionFactory();
ChangeStreamMetrics metrics = new ChangeStreamMetrics();
DaoFactory daoFactory =
new DaoFactory(
bigtableConfig, metadataTableConfig, getTableId(), metadataTableId, changeStreamName);
bigtableConfig,
metadataTableConfig,
getTableId(),
metadataTableId,
changeStreamName,
readChangeStreamTimeout);

// Validate the configuration is correct before creating the pipeline, if required.
try {
@@ -2542,6 +2572,8 @@ abstract ReadChangeStream.Builder setExistingPipelineOptions(

abstract ReadChangeStream.Builder setBacklogReplicationAdjustment(Duration adjustment);

abstract ReadChangeStream.Builder setReadChangeStreamTimeout(Duration timeout);

abstract ReadChangeStream.Builder setValidateConfig(boolean validateConfig);

abstract ReadChangeStream build();
@@ -2578,7 +2610,14 @@ public static boolean createOrUpdateReadChangeStreamMetadataTable(
tableId = MetadataTableAdminDao.DEFAULT_METADATA_TABLE_NAME;
}

DaoFactory daoFactory = new DaoFactory(null, bigtableConfig, null, tableId, null);
DaoFactory daoFactory =
new DaoFactory(
null,
bigtableConfig,
null,
tableId,
null,
ReadChangeStream.DEFAULT_READ_CHANGE_STREAM_TIMEOUT);

try {
MetadataTableAdminDao metadataTableAdminDao = daoFactory.getMetadataTableAdminDao();
@@ -52,6 +52,7 @@ public class BigtableChangeStreamAccessor implements AutoCloseable {
// Create one bigtable data/admin client per bigtable config (project/instance/table/app profile)
private static final ConcurrentHashMap<BigtableConfig, BigtableChangeStreamAccessor>
bigtableAccessors = new ConcurrentHashMap<>();
private static Duration readChangeStreamTimeout = Duration.ofSeconds(15);
Contributor commented:
should this be null? DEFAULT_READ_CHANGE_STREAM_TIMEOUT already defines the default.

Contributor (Author) replied:
Sorry for the late reply.

I moved the default value into the Accessor. This makes it clearer what the default value is and where it is set. BigtableIO, instead of setting it, will override the default timeout only if necessary.
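A minimal sketch of the approach described in the reply, i.e. the default timeout owned by the Accessor and BigtableIO overriding it only when the user supplies a value. This reflects a later commit and the exact shape is assumed, not shown in the two-commit diff above:

    // In BigtableChangeStreamAccessor: the default is defined and owned here.
    private static Duration readChangeStreamTimeout = Duration.ofSeconds(15);

    public static void setReadChangeStreamTimeout(Duration timeout) {
      readChangeStreamTimeout = timeout;
    }

    // In BigtableIO.ReadChangeStream.expand(): override only if the user set a value,
    // converting from joda-time to threeten-bp as DaoFactory does.
    if (getReadChangeStreamTimeout() != null) {
      BigtableChangeStreamAccessor.setReadChangeStreamTimeout(
          org.threeten.bp.Duration.ofMillis(getReadChangeStreamTimeout().getMillis()));
    }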


private final BigtableDataClient dataClient;
private final BigtableTableAdminClient tableAdminClient;
@@ -83,6 +84,10 @@ public synchronized void close() {
bigtableAccessors.remove(bigtableConfig);
}

public static void setReadChangeStreamTimeout(Duration timeout) {
readChangeStreamTimeout = timeout;
}

/**
* Create a BigtableAccess if it doesn't exist and store it in the cache for faster access. If it
* does exist, just return it.
@@ -204,9 +209,9 @@ private static BigtableChangeStreamAccessor createAccessor(@NonNull BigtableConf
.readChangeStreamSettings()
.setRetrySettings(
readChangeStreamRetrySettings
.setInitialRpcTimeout(Duration.ofSeconds(15))
.setTotalTimeout(Duration.ofSeconds(15))
.setMaxRpcTimeout(Duration.ofSeconds(15))
.setInitialRpcTimeout(readChangeStreamTimeout)
.setTotalTimeout(readChangeStreamTimeout)
.setMaxRpcTimeout(readChangeStreamTimeout)
.setMaxAttempts(10)
.build());

@@ -26,12 +26,13 @@
import java.io.Serializable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableConfig;
import org.joda.time.Duration;

// Allows transient fields to be initialized later
@SuppressWarnings("initialization.fields.uninitialized")
@Internal
public class DaoFactory implements Serializable, AutoCloseable {
private static final long serialVersionUID = 3732208768248394205L;
private static final long serialVersionUID = -3423959768580600281L;

private transient ChangeStreamDao changeStreamDao;
private transient MetadataTableAdminDao metadataTableAdminDao;
@@ -45,17 +46,21 @@ public class DaoFactory implements Serializable, AutoCloseable {
private final String metadataTableId;
private final String changeStreamName;

private final Duration readChangeStreamTimeout;

public DaoFactory(
BigtableConfig changeStreamConfig,
BigtableConfig metadataTableConfig,
String tableId,
String metadataTableId,
String changeStreamName) {
String changeStreamName,
Duration readChangeStreamTimeout) {
this.changeStreamConfig = changeStreamConfig;
this.metadataTableConfig = metadataTableConfig;
this.changeStreamName = changeStreamName;
this.tableId = tableId;
this.metadataTableId = metadataTableId;
this.readChangeStreamTimeout = readChangeStreamTimeout;
}

@Override
@@ -106,6 +111,8 @@ public synchronized ChangeStreamDao getChangeStreamDao() throws IOException {
checkArgumentNotNull(changeStreamConfig.getProjectId());
checkArgumentNotNull(changeStreamConfig.getInstanceId());
checkArgumentNotNull(changeStreamConfig.getAppProfileId());
BigtableChangeStreamAccessor.setReadChangeStreamTimeout(
org.threeten.bp.Duration.ofMillis(readChangeStreamTimeout.getMillis()));
BigtableDataClient dataClient =
BigtableChangeStreamAccessor.getOrCreate(changeStreamConfig).getDataClient();
changeStreamDao = new ChangeStreamDao(dataClient, this.tableId);
@@ -2053,6 +2053,7 @@ public void testReadChangeStreamBuildsCorrectly() {
.withMetadataTableAppProfileId("metadata-app-profile")
.withStartTime(startTime)
.withBacklogReplicationAdjustment(Duration.standardMinutes(1))
.withReadChangeStreamTimeout(Duration.standardMinutes(1))
.withCreateOrUpdateMetadataTable(false)
.withExistingPipelineOptions(BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS);
assertEquals("project", readChangeStream.getBigtableConfig().getProjectId().get());
@@ -2071,6 +2072,7 @@
assertEquals("change-stream-name", readChangeStream.getChangeStreamName());
assertEquals(startTime, readChangeStream.getStartTime());
assertEquals(Duration.standardMinutes(1), readChangeStream.getBacklogReplicationAdjustment());
assertEquals(Duration.standardMinutes(1), readChangeStream.getReadChangeStreamTimeout());
assertEquals(false, readChangeStream.getCreateOrUpdateMetadataTable());
assertEquals(
BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS,