Skip to content

Commit

Permalink
Upgrade to Beam 2.61.0 (#2033)
Browse files Browse the repository at this point in the history
* Validate Beam 2.61.0rc2

* Revert to non-rc

* Revert to non-rc

---------

Co-authored-by: Yi Hu <[email protected]>
  • Loading branch information
damccorm and Abacn authored Nov 27, 2024
1 parent 031179f commit b68fe21
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 10 deletions.
5 changes: 3 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>11</java.version>
<!-- <os.detected.classifier>osx-x86_64</os.detected.classifier>-->
<!-- Plugins -->
<templates-maven-plugin.version>1.0-SNAPSHOT</templates-maven-plugin.version>
<maven-checkstyle-plugin.version>3.2.1</maven-checkstyle-plugin.version>
Expand All @@ -47,8 +48,8 @@
<jacoco.version>0.8.8</jacoco.version>

<!-- Beam and linked versions -->
<beam.version>2.60.0</beam.version>
<beam-python.version>2.60.0</beam-python.version>
<beam.version>2.61.0</beam.version>
<beam-python.version>2.61.0</beam-python.version>
<beam-maven-repo></beam-maven-repo>

<!-- Common dependency versions -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_RPC_PRIORITY;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.MAX_INCLUSIVE_END_AT;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.THROUGHPUT_WINDOW_SECONDS;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.NameGenerator.generatePartitionMetadataTableName;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

Expand Down Expand Up @@ -60,6 +59,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
Expand All @@ -70,6 +70,7 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataTableNames;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.CleanUpReadChangeStreamDoFn;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.DetectNewPartitionsDoFn;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.InitializeDoFn;
Expand Down Expand Up @@ -1551,9 +1552,13 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
getSpannerConfig().getProjectId().get(),
partitionMetadataInstanceId,
partitionMetadataDatabaseId);
final String partitionMetadataTableName =
MoreObjects.firstNonNull(
getMetadataTable(), generatePartitionMetadataTableName(partitionMetadataDatabaseId));
PartitionMetadataTableNames partitionMetadataTableNames =
Optional.ofNullable(getMetadataTable())
.map(
table ->
PartitionMetadataTableNames.fromExistingTable(
partitionMetadataDatabaseId, table))
.orElse(PartitionMetadataTableNames.generateRandom(partitionMetadataDatabaseId));

SpannerConfig changeStreamSpannerConfig = getSpannerConfig();
// Set default retryable errors for ReadChangeStream
Expand Down Expand Up @@ -1609,7 +1614,7 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
changeStreamSpannerConfig,
changeStreamName,
partitionMetadataSpannerConfig,
partitionMetadataTableName,
partitionMetadataTableNames,
rpcPriority,
input.getPipeline().getOptions().getJobName(),
changeStreamDatabaseDialect,
Expand All @@ -1625,12 +1630,13 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
final PostProcessingMetricsDoFn postProcessingMetricsDoFn =
new PostProcessingMetricsDoFn(metrics);

LOG.info("Partition metadata table that will be used is " + partitionMetadataTableName);
LOG.info(
"Partition metadata table that will be used is " + partitionMetadataTableNames.getTableName());
input
.getPipeline()
.getOptions()
.as(SpannerChangeStreamOptions.class)
.setMetadataTable(partitionMetadataTableName);
.setMetadataTable(partitionMetadataTableNames.getTableName());

PCollection<byte[]> impulseOut = input.apply(Impulse.create());
PCollection<DataChangeRecord> results =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public static void validate(MqttToPubsubOptions options) {
public static PipelineResult run(MqttToPubsubOptions options) {
validate(options);
Pipeline pipeline = Pipeline.create(options);
MqttIO.Read mqttIo;
MqttIO.Read<byte[]> mqttIo;
if (!options.getUsername().isEmpty() || !options.getPassword().isBlank()) {
mqttIo =
MqttIO.read()
Expand Down

0 comments on commit b68fe21

Please sign in to comment.