diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java index d14dd81283de..deaa77d9b1be 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.util.construction; +import com.fasterxml.jackson.core.Version; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -25,6 +26,7 @@ import java.io.ObjectOutputStream; import java.net.ServerSocket; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -51,6 +53,7 @@ import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannelBuilder; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; @@ -459,4 +462,59 @@ public static Object fromByteArray(byte[] bytes) throws InvalidClassException { throw new RuntimeException(e); } } + + @SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) + }) + private static Version getVersionFromStr(String version) { + String[] versionParts = Splitter.onPattern("\\.").splitToList(version).toArray(new String[0]); + if (versionParts.length < 2) { + throw new IllegalArgumentException( + "Expected the version string to start with `.` " + + "but received " + + version); + } + + // Concatenating patch and suffix to determine the correct patch 
version. + String patchAndSuffix = + versionParts.length == 2 + ? "" + : String.join(".", Arrays.copyOfRange(versionParts, 2, versionParts.length)); + StringBuilder patchVersionBuilder = new StringBuilder(); + for (int i = 0; i < patchAndSuffix.length(); i++) { + if (Character.isDigit(patchAndSuffix.charAt(i))) { + patchVersionBuilder.append(patchAndSuffix.charAt(i)); + } else { + break; + } + } + String patchVersion = patchVersionBuilder.toString(); + if (patchVersion.isEmpty()) { + patchVersion = "0"; + } + return new Version( + Integer.parseInt(versionParts[0]), + Integer.parseInt(versionParts[1]), + Integer.parseInt(patchVersion), + null, + null, + null); + } + + /** + * Compares two Beam versions. Expects the versions to be in the format + * {@code <major>.<minor>.<patch>.<suffix>}; {@code <patch>} and {@code <suffix>} are optional. + * Version numbers should be integers. When comparing, the suffix is ignored. + * + * @param firstVersion first version to compare. + * @param secondVersion second version to compare. + * @return a negative number if the first version is smaller than the second, a positive number + * if the first version is larger than the second, 0 if versions are equal. 
+ */ + public static int compareVersions(String firstVersion, String secondVersion) { + if (firstVersion.equals(secondVersion)) { + return 0; + } + return getVersionFromStr(firstVersion).compareTo(getVersionFromStr(secondVersion)); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TransformUpgraderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TransformUpgraderTest.java index 7c1a57d571a7..65b3e8e89cad 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TransformUpgraderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TransformUpgraderTest.java @@ -371,4 +371,17 @@ public void testTransformUpgradeMultipleURNs() throws Exception { .get("TransformUpgraderTest-TestTransform2"); validateTestParam(upgradedTransform2, 4); } + + @Test + public void testVersionComparison() throws Exception { + assertTrue(TransformUpgrader.compareVersions("2.53.0", "2.53.0") == 0); + + assertTrue(TransformUpgrader.compareVersions("2.53.0", "2.55.0") < 0); + assertTrue(TransformUpgrader.compareVersions("2.53.0", "2.55.0-SNAPSHOT") < 0); + assertTrue(TransformUpgrader.compareVersions("2.53.0", "2.55.0.dev") < 0); + + assertTrue(TransformUpgrader.compareVersions("2.55.0", "2.53.0") > 0); + assertTrue(TransformUpgrader.compareVersions("2.55.0-SNAPSHOT", "2.53.0") > 0); + assertTrue(TransformUpgrader.compareVersions("2.55.0.dev", "2.53.0") > 0); + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java index 2fa5bdf25a10..9b499ea3253c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java @@ -59,9 
+59,11 @@ import org.apache.beam.sdk.transforms.errorhandling.BadRecord; import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter; import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.BadRecordErrorHandler; import org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator; import org.apache.beam.sdk.util.construction.SdkComponents; import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; +import org.apache.beam.sdk.util.construction.TransformUpgrader; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.ValueInSingleWindow; @@ -199,6 +201,14 @@ public Row toConfigRow(TypedRead transform) { @Override public TypedRead fromConfigRow(Row configRow, PipelineOptions options) { + String updateCompatibilityBeamVersion = + options.as(StreamingOptions.class).getUpdateCompatibilityVersion(); + // We need to set a default 'updateCompatibilityBeamVersion' here since this PipelineOption + // is not correctly passed in for pipelines that use Beam 2.53.0. + // This is fixed for Beam 2.54.0 and later. + updateCompatibilityBeamVersion = + (updateCompatibilityBeamVersion != null) ? 
updateCompatibilityBeamVersion : "2.53.0"; + try { BigQueryIO.TypedRead.Builder builder = new AutoValue_BigQueryIO_TypedRead.Builder<>(); @@ -312,11 +322,20 @@ public TypedRead fromConfigRow(Row configRow, PipelineOptions options) { if (projectionPushdownApplied != null) { builder = builder.setProjectionPushdownApplied(projectionPushdownApplied); } - byte[] badRecordRouter = configRow.getBytes("bad_record_router"); - builder.setBadRecordRouter((BadRecordRouter) fromByteArray(badRecordRouter)); - byte[] badRecordErrorHandler = configRow.getBytes("bad_record_error_handler"); - builder.setBadRecordErrorHandler( - (ErrorHandler) fromByteArray(badRecordErrorHandler)); + + if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.55.0") < 0) { + // We need to use defaults here for BQ read/write transforms upgraded + // from older Beam versions. + // See https://github.com/apache/beam/issues/30534. + builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER); + builder.setBadRecordErrorHandler(new BadRecordErrorHandler.DefaultErrorHandler<>()); + } else { + byte[] badRecordRouter = configRow.getBytes("bad_record_router"); + builder.setBadRecordRouter((BadRecordRouter) fromByteArray(badRecordRouter)); + byte[] badRecordErrorHandler = configRow.getBytes("bad_record_error_handler"); + builder.setBadRecordErrorHandler( + (ErrorHandler) fromByteArray(badRecordErrorHandler)); + } return builder.build(); } catch (InvalidClassException e) { @@ -573,6 +592,14 @@ public Row toConfigRow(Write transform) { @Override public Write fromConfigRow(Row configRow, PipelineOptions options) { + String updateCompatibilityBeamVersion = + options.as(StreamingOptions.class).getUpdateCompatibilityVersion(); + // We need to set a default 'updateCompatibilityBeamVersion' here since this PipelineOption + // is not correctly passed in for pipelines that use Beam 2.53.0. + // This is fixed for Beam 2.54.0 and later. 
+ updateCompatibilityBeamVersion = + (updateCompatibilityBeamVersion != null) ? updateCompatibilityBeamVersion : "2.53.0"; + try { BigQueryIO.Write.Builder builder = new AutoValue_BigQueryIO_Write.Builder<>(); @@ -716,20 +743,11 @@ public Write fromConfigRow(Row configRow, PipelineOptions options) { builder = builder.setMaxBytesPerPartition(maxBytesPerPartition); } - String updateCompatibilityBeamVersion = - options.as(StreamingOptions.class).getUpdateCompatibilityVersion(); - // We need to update the 'triggerring_frequency' field name for pipelines that are upgraded // from Beam 2.53.0 due to https://github.com/apache/beam/pull/29785. - // We need to set a default 'updateCompatibilityBeamVersion' here since this PipelineOption - // is not correctly passed in for pipelines that use Beam 2.53.0. - // Both above issues are fixed for Beam 2.54.0 and later. - updateCompatibilityBeamVersion = - (updateCompatibilityBeamVersion != null) ? updateCompatibilityBeamVersion : "2.53.0"; - + // This is fixed for Beam 2.54.0 and later. String triggeringFrequencyFieldName = - (updateCompatibilityBeamVersion != null - && updateCompatibilityBeamVersion.equals("2.53.0")) + TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.53.0") == 0 ? 
"triggerring_frequency" : "triggering_frequency"; @@ -840,11 +858,20 @@ public Write fromConfigRow(Row configRow, PipelineOptions options) { builder.setRowMutationInformationFn( (SerializableFunction) fromByteArray(rowMutationInformationFnBytes)); } - byte[] badRecordRouter = configRow.getBytes("bad_record_router"); - builder.setBadRecordRouter((BadRecordRouter) fromByteArray(badRecordRouter)); - byte[] badRecordErrorHandler = configRow.getBytes("bad_record_error_handler"); - builder.setBadRecordErrorHandler( - (ErrorHandler) fromByteArray(badRecordErrorHandler)); + + if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.55.0") < 0) { + // We need to use defaults here for BQ read/write transforms upgraded + // from older Beam versions. + // See https://github.com/apache/beam/issues/30534. + builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER); + builder.setBadRecordErrorHandler(new BadRecordErrorHandler.DefaultErrorHandler<>()); + } else { + byte[] badRecordRouter = configRow.getBytes("bad_record_router"); + builder.setBadRecordRouter((BadRecordRouter) fromByteArray(badRecordRouter)); + byte[] badRecordErrorHandler = configRow.getBytes("bad_record_error_handler"); + builder.setBadRecordErrorHandler( + (ErrorHandler) fromByteArray(badRecordErrorHandler)); + } return builder.build(); } catch (InvalidClassException e) {