Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: generate random index name for change streams #32689

Merged
merged 15 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_RPC_PRIORITY;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.MAX_INCLUSIVE_END_AT;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.THROUGHPUT_WINDOW_SECONDS;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.NameGenerator.generatePartitionMetadataTableName;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

Expand Down Expand Up @@ -61,6 +60,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand All @@ -77,6 +77,7 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.MetadataSpannerConfigFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataTableNames;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.CleanUpReadChangeStreamDoFn;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.DetectNewPartitionsDoFn;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.InitializeDoFn;
Expand Down Expand Up @@ -1772,9 +1773,13 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
+ fullPartitionMetadataDatabaseId
+ " has dialect "
+ metadataDatabaseDialect);
final String partitionMetadataTableName =
MoreObjects.firstNonNull(
getMetadataTable(), generatePartitionMetadataTableName(partitionMetadataDatabaseId));
PartitionMetadataTableNames partitionMetadataTableNames =
Optional.ofNullable(getMetadataTable())
.map(
table ->
PartitionMetadataTableNames.fromExistingTable(
partitionMetadataDatabaseId, table))
.orElse(PartitionMetadataTableNames.generateRandom(partitionMetadataDatabaseId));
final String changeStreamName = getChangeStreamName();
final Timestamp startTimestamp = getInclusiveStartAt();
// Uses (Timestamp.MAX - 1ns) at max for end timestamp, because we add 1ns to transform the
Expand All @@ -1791,7 +1796,7 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
changeStreamSpannerConfig,
changeStreamName,
partitionMetadataSpannerConfig,
partitionMetadataTableName,
partitionMetadataTableNames,
rpcPriority,
input.getPipeline().getOptions().getJobName(),
changeStreamDatabaseDialect,
Expand All @@ -1807,7 +1812,9 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
final PostProcessingMetricsDoFn postProcessingMetricsDoFn =
new PostProcessingMetricsDoFn(metrics);

LOG.info("Partition metadata table that will be used is " + partitionMetadataTableName);
LOG.info(
"Partition metadata table that will be used is "
+ partitionMetadataTableNames.getTableName());

final PCollection<byte[]> impulseOut = input.apply(Impulse.create());
final PCollection<PartitionMetadata> partitionsOut =
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class DaoFactory implements Serializable {
private final SpannerConfig metadataSpannerConfig;

private final String changeStreamName;
private final String partitionMetadataTableName;
private final PartitionMetadataTableNames partitionMetadataTableNames;
private final RpcPriority rpcPriority;
private final String jobName;
private final Dialect spannerChangeStreamDatabaseDialect;
Expand All @@ -56,15 +56,15 @@ public class DaoFactory implements Serializable {
* @param changeStreamSpannerConfig the configuration for the change streams DAO
* @param changeStreamName the name of the change stream for the change streams DAO
* @param metadataSpannerConfig the metadata tables configuration
* @param partitionMetadataTableName the name of the created partition metadata table
* @param partitionMetadataTableNames the names of the partition metadata ddl objects
* @param rpcPriority the priority of the requests made by the DAO queries
* @param jobName the name of the running job
*/
public DaoFactory(
SpannerConfig changeStreamSpannerConfig,
String changeStreamName,
SpannerConfig metadataSpannerConfig,
String partitionMetadataTableName,
PartitionMetadataTableNames partitionMetadataTableNames,
RpcPriority rpcPriority,
String jobName,
Dialect spannerChangeStreamDatabaseDialect,
Expand All @@ -78,7 +78,7 @@ public DaoFactory(
this.changeStreamSpannerConfig = changeStreamSpannerConfig;
this.changeStreamName = changeStreamName;
this.metadataSpannerConfig = metadataSpannerConfig;
this.partitionMetadataTableName = partitionMetadataTableName;
this.partitionMetadataTableNames = partitionMetadataTableNames;
this.rpcPriority = rpcPriority;
this.jobName = jobName;
this.spannerChangeStreamDatabaseDialect = spannerChangeStreamDatabaseDialect;
Expand All @@ -102,7 +102,7 @@ public synchronized PartitionMetadataAdminDao getPartitionMetadataAdminDao() {
databaseAdminClient,
metadataSpannerConfig.getInstanceId().get(),
metadataSpannerConfig.getDatabaseId().get(),
partitionMetadataTableName,
partitionMetadataTableNames,
this.metadataDatabaseDialect);
}
return partitionMetadataAdminDao;
Expand All @@ -120,7 +120,7 @@ public synchronized PartitionMetadataDao getPartitionMetadataDao() {
if (partitionMetadataDaoInstance == null) {
partitionMetadataDaoInstance =
new PartitionMetadataDao(
this.partitionMetadataTableName,
this.partitionMetadataTableNames.getTableName(),
spannerAccessor.getDatabaseClient(),
this.metadataDatabaseDialect);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,13 @@ public class PartitionMetadataAdminDao {
*/
public static final String COLUMN_FINISHED_AT = "FinishedAt";

/** Metadata table index for queries over the watermark column. */
public static final String WATERMARK_INDEX = "WatermarkIndex";

/** Metadata table index for queries over the created at / start timestamp columns. */
public static final String CREATED_AT_START_TIMESTAMP_INDEX = "CreatedAtStartTimestampIndex";

private static final int TIMEOUT_MINUTES = 10;
private static final int TTL_AFTER_PARTITION_FINISHED_DAYS = 1;

private final DatabaseAdminClient databaseAdminClient;
private final String instanceId;
private final String databaseId;
private final String tableName;
private final PartitionMetadataTableNames names;
private final Dialect dialect;

/**
Expand All @@ -101,18 +95,18 @@ public class PartitionMetadataAdminDao {
* table
* @param instanceId the instance where the metadata table will reside
* @param databaseId the database where the metadata table will reside
* @param tableName the name of the metadata table
* @param names the names of the metadata table ddl objects
*/
PartitionMetadataAdminDao(
DatabaseAdminClient databaseAdminClient,
String instanceId,
String databaseId,
String tableName,
PartitionMetadataTableNames names,
Dialect dialect) {
this.databaseAdminClient = databaseAdminClient;
this.instanceId = instanceId;
this.databaseId = databaseId;
this.tableName = tableName;
this.names = names;
this.dialect = dialect;
}

Expand All @@ -129,7 +123,7 @@ public void createPartitionMetadataTable() {
// Literals need be added around literals to preserve casing.
ddl.add(
"CREATE TABLE \""
+ tableName
+ names.getTableName()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this method fail if the tables/indexes already exist?

IIUC the issue correctly this is failing currently because the index has the same name across pipelines and the creation is failing. Do we also have to worry about a single pipeline failing if it created the tables but Dataflow retries the work? It seems we might want ot use create if it doesn't exist.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Since the names have a random component, I assumed the job would fail and we would re-generate them.

Updated with IF NOT EXISTS to circumvent this problem.

+ "\"(\""
+ COLUMN_PARTITION_TOKEN
+ "\" text NOT NULL,\""
Expand Down Expand Up @@ -164,19 +158,19 @@ public void createPartitionMetadataTable() {
+ "\"");
ddl.add(
"CREATE INDEX \""
+ WATERMARK_INDEX
+ names.getWatermarkIndexName()
+ "\" on \""
+ tableName
+ names.getTableName()
+ "\" (\""
+ COLUMN_WATERMARK
+ "\") INCLUDE (\""
+ COLUMN_STATE
+ "\")");
ddl.add(
"CREATE INDEX \""
+ CREATED_AT_START_TIMESTAMP_INDEX
+ names.getCreatedAtIndexName()
+ "\" ON \""
+ tableName
+ names.getTableName()
+ "\" (\""
+ COLUMN_CREATED_AT
+ "\",\""
Expand All @@ -185,7 +179,7 @@ public void createPartitionMetadataTable() {
} else {
ddl.add(
"CREATE TABLE "
+ tableName
+ names.getTableName()
+ " ("
+ COLUMN_PARTITION_TOKEN
+ " STRING(MAX) NOT NULL,"
Expand Down Expand Up @@ -219,19 +213,19 @@ public void createPartitionMetadataTable() {
+ " DAY))");
ddl.add(
"CREATE INDEX "
+ WATERMARK_INDEX
+ names.getWatermarkIndexName()
+ " on "
+ tableName
+ names.getTableName()
+ " ("
+ COLUMN_WATERMARK
+ ") STORING ("
+ COLUMN_STATE
+ ")");
ddl.add(
"CREATE INDEX "
+ CREATED_AT_START_TIMESTAMP_INDEX
+ names.getCreatedAtIndexName()
+ " ON "
+ tableName
+ names.getTableName()
+ " ("
+ COLUMN_CREATED_AT
+ ","
Expand Down Expand Up @@ -261,16 +255,14 @@ public void createPartitionMetadataTable() {
* Drops the metadata table. This operation should complete in {@link
* PartitionMetadataAdminDao#TIMEOUT_MINUTES} minutes.
*/
public void deletePartitionMetadataTable() {
public void deletePartitionMetadataTable(List<String> indexes) {
List<String> ddl = new ArrayList<>();
if (this.isPostgres()) {
ddl.add("DROP INDEX \"" + CREATED_AT_START_TIMESTAMP_INDEX + "\"");
ddl.add("DROP INDEX \"" + WATERMARK_INDEX + "\"");
ddl.add("DROP TABLE \"" + tableName + "\"");
indexes.forEach(index -> ddl.add("DROP INDEX \"" + index + "\""));
ddl.add("DROP TABLE \"" + names.getTableName() + "\"");
} else {
ddl.add("DROP INDEX " + CREATED_AT_START_TIMESTAMP_INDEX);
ddl.add("DROP INDEX " + WATERMARK_INDEX);
ddl.add("DROP TABLE " + tableName);
indexes.forEach(index -> ddl.add("DROP INDEX " + index));
ddl.add("DROP TABLE " + names.getTableName());
}
OperationFuture<Void, UpdateDatabaseDdlMetadata> op =
databaseAdminClient.updateDatabaseDdl(instanceId, databaseId, ddl, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,41 @@ public boolean tableExists() {
}
}

/**
* Finds all indexes for the metadata table.
*
* @return a list of index names for the metadata table.
*/
public List<String> findAllTableIndexes() {
String indexesStmt;
if (this.isPostgres()) {
indexesStmt =
"SELECT index_name FROM information_schema.indexes"
+ " WHERE table_schema = 'public'"
+ " AND table_name = '"
+ metadataTableName
+ "' AND index_type != 'PRIMARY_KEY'";
} else {
indexesStmt =
"SELECT index_name FROM information_schema.indexes"
+ " WHERE table_schema = ''"
+ " AND table_name = '"
+ metadataTableName
+ "' AND index_type != 'PRIMARY_KEY'";
}

List<String> result = new ArrayList<>();
try (ResultSet queryResultSet =
databaseClient
.singleUseReadOnlyTransaction()
.executeQuery(Statement.of(indexesStmt), Options.tag("query=findAllTableIndexes"))) {
while (queryResultSet.next()) {
result.add(queryResultSet.getString("index_name"));
}
}
return result;
}

/**
* Fetches the partition metadata row data for the given partition token.
*
Expand Down
Loading
Loading