Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to Beam 2.61.0 #2033

Merged
merged 3 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>11</java.version>
<!-- <os.detected.classifier>osx-x86_64</os.detected.classifier>-->
damccorm marked this conversation as resolved.
Show resolved Hide resolved
<!-- Plugins -->
<templates-maven-plugin.version>1.0-SNAPSHOT</templates-maven-plugin.version>
<maven-checkstyle-plugin.version>3.2.1</maven-checkstyle-plugin.version>
Expand All @@ -47,8 +48,8 @@
<jacoco.version>0.8.8</jacoco.version>

<!-- Beam and linked versions -->
<beam.version>2.60.0</beam.version>
<beam-python.version>2.60.0</beam-python.version>
<beam.version>2.61.0</beam.version>
<beam-python.version>2.61.0</beam-python.version>
<beam-maven-repo></beam-maven-repo>

<!-- Common dependency versions -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_RPC_PRIORITY;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.MAX_INCLUSIVE_END_AT;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.THROUGHPUT_WINDOW_SECONDS;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.NameGenerator.generatePartitionMetadataTableName;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

Expand Down Expand Up @@ -60,6 +59,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
Expand All @@ -70,6 +70,7 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataTableNames;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.CleanUpReadChangeStreamDoFn;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.DetectNewPartitionsDoFn;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.InitializeDoFn;
Expand Down Expand Up @@ -1551,9 +1552,13 @@
getSpannerConfig().getProjectId().get(),
partitionMetadataInstanceId,
partitionMetadataDatabaseId);
final String partitionMetadataTableName =
MoreObjects.firstNonNull(
getMetadataTable(), generatePartitionMetadataTableName(partitionMetadataDatabaseId));
PartitionMetadataTableNames partitionMetadataTableNames =
Optional.ofNullable(getMetadataTable())
.map(

Check warning on line 1557 in v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/LocalSpannerIO.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/LocalSpannerIO.java#L1555-L1557

Added lines #L1555 - L1557 were not covered by tests
table ->
PartitionMetadataTableNames.fromExistingTable(

Check warning on line 1559 in v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/LocalSpannerIO.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/LocalSpannerIO.java#L1559

Added line #L1559 was not covered by tests
partitionMetadataDatabaseId, table))
.orElse(PartitionMetadataTableNames.generateRandom(partitionMetadataDatabaseId));

Check warning on line 1561 in v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/LocalSpannerIO.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/LocalSpannerIO.java#L1561

Added line #L1561 was not covered by tests

SpannerConfig changeStreamSpannerConfig = getSpannerConfig();
// Set default retryable errors for ReadChangeStream
Expand Down Expand Up @@ -1609,7 +1614,7 @@
changeStreamSpannerConfig,
changeStreamName,
partitionMetadataSpannerConfig,
partitionMetadataTableName,
partitionMetadataTableNames,
rpcPriority,
input.getPipeline().getOptions().getJobName(),
changeStreamDatabaseDialect,
Expand All @@ -1625,12 +1630,13 @@
final PostProcessingMetricsDoFn postProcessingMetricsDoFn =
new PostProcessingMetricsDoFn(metrics);

LOG.info("Partition metadata table that will be used is " + partitionMetadataTableName);
LOG.info(
"Partition metadata table that will be used is " + partitionMetadataTableNames.getTableName());

Check warning on line 1634 in v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/LocalSpannerIO.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/LocalSpannerIO.java#L1633-L1634

Added lines #L1633 - L1634 were not covered by tests
input
.getPipeline()
.getOptions()
.as(SpannerChangeStreamOptions.class)
.setMetadataTable(partitionMetadataTableName);
.setMetadataTable(partitionMetadataTableNames.getTableName());

Check warning on line 1639 in v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/LocalSpannerIO.java

View check run for this annotation

Codecov / codecov/patch

v1/src/main/java/org/apache/beam/sdk/io/gcp/spanner/LocalSpannerIO.java#L1639

Added line #L1639 was not covered by tests

PCollection<byte[]> impulseOut = input.apply(Impulse.create());
PCollection<DataChangeRecord> results =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public static void validate(MqttToPubsubOptions options) {
public static PipelineResult run(MqttToPubsubOptions options) {
validate(options);
Pipeline pipeline = Pipeline.create(options);
MqttIO.Read mqttIo;
MqttIO.Read<byte[]> mqttIo;
if (!options.getUsername().isEmpty() || !options.getPassword().isBlank()) {
mqttIo =
MqttIO.read()
Expand Down