Skip to content

Commit

Permalink
Fixes an upgrade imcompatiblity of BQ read/write transforms (apache#3…
Browse files Browse the repository at this point in the history
…0562)

Co-authored-by: Chamikara Jayalath <[email protected]>
  • Loading branch information
Abacn and chamikaramj authored Mar 8, 2024
1 parent 1b03f8e commit 5d89a33
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.util.construction;

import com.fasterxml.jackson.core.Version;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand All @@ -25,6 +26,7 @@
import java.io.ObjectOutputStream;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -51,6 +53,7 @@
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannelBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -459,4 +462,59 @@ public static Object fromByteArray(byte[] bytes) throws InvalidClassException {
throw new RuntimeException(e);
}
}

@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
private static Version getVersionFromStr(String version) {
String[] versionParts = Splitter.onPattern("\\.").splitToList(version).toArray(new String[0]);
if (versionParts.length < 2) {
throw new IllegalArgumentException(
"Expected the version string to start with `<major>.<minor>` "
+ "but received "
+ version);
}

// Concatenating patch and suffix to determine the correct patch version.
String patchAndSuffix =
versionParts.length == 2
? ""
: String.join(".", Arrays.copyOfRange(versionParts, 2, versionParts.length));
StringBuilder patchVersionBuilder = new StringBuilder();
for (int i = 0; i < patchAndSuffix.length(); i++) {
if (Character.isDigit(patchAndSuffix.charAt(i))) {
patchVersionBuilder.append(patchAndSuffix.charAt(i));
} else {
break;
}
}
String patchVersion = patchVersionBuilder.toString();
if (patchVersion.isEmpty()) {
patchVersion = "0";
}
return new Version(
Integer.parseInt(versionParts[0]),
Integer.parseInt(versionParts[1]),
Integer.parseInt(patchVersion),
null,
null,
null);
}

/**
* Compares two Beam versions. Expects the versions to be in the format
* <major>.<minor>.<patch><suffix>. <patch> and <suffix> are optional. Version numbers should be
* integers. When comparing suffix will be ignored.
*
* @param firstVersion first version to compare.
* @param secondVersion second version to compare.
* @return a negative number of first version is smaller than the second, a positive number if the
* first version is larger than the second, 0 if versions are equal.
*/
public static int compareVersions(String firstVersion, String secondVersion) {
if (firstVersion.equals(secondVersion)) {
return 0;
}
return getVersionFromStr(firstVersion).compareTo(getVersionFromStr(secondVersion));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -371,4 +371,17 @@ public void testTransformUpgradeMultipleURNs() throws Exception {
.get("TransformUpgraderTest-TestTransform2");
validateTestParam(upgradedTransform2, 4);
}

@Test
public void testVersionComparison() throws Exception {
assertTrue(TransformUpgrader.compareVersions("2.53.0", "2.53.0") == 0);

assertTrue(TransformUpgrader.compareVersions("2.53.0", "2.55.0") < 0);
assertTrue(TransformUpgrader.compareVersions("2.53.0", "2.55.0-SNAPSHOT") < 0);
assertTrue(TransformUpgrader.compareVersions("2.53.0", "2.55.0.dev") < 0);

assertTrue(TransformUpgrader.compareVersions("2.55.0", "2.53.0") > 0);
assertTrue(TransformUpgrader.compareVersions("2.55.0-SNAPSHOT", "2.53.0") > 0);
assertTrue(TransformUpgrader.compareVersions("2.55.0.dev", "2.53.0") > 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.BadRecordErrorHandler;
import org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator;
import org.apache.beam.sdk.util.construction.SdkComponents;
import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
import org.apache.beam.sdk.util.construction.TransformUpgrader;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.ValueInSingleWindow;
Expand Down Expand Up @@ -199,6 +201,14 @@ public Row toConfigRow(TypedRead<?> transform) {

@Override
public TypedRead<?> fromConfigRow(Row configRow, PipelineOptions options) {
String updateCompatibilityBeamVersion =
options.as(StreamingOptions.class).getUpdateCompatibilityVersion();
// We need to set a default 'updateCompatibilityBeamVersion' here since this PipelineOption
// is not correctly passed in for pipelines that use Beam 2.53.0.
// This is fixed for Beam 2.54.0 and later.
updateCompatibilityBeamVersion =
(updateCompatibilityBeamVersion != null) ? updateCompatibilityBeamVersion : "2.53.0";

try {
BigQueryIO.TypedRead.Builder builder = new AutoValue_BigQueryIO_TypedRead.Builder<>();

Expand Down Expand Up @@ -312,11 +322,20 @@ public TypedRead<?> fromConfigRow(Row configRow, PipelineOptions options) {
if (projectionPushdownApplied != null) {
builder = builder.setProjectionPushdownApplied(projectionPushdownApplied);
}
byte[] badRecordRouter = configRow.getBytes("bad_record_router");
builder.setBadRecordRouter((BadRecordRouter) fromByteArray(badRecordRouter));
byte[] badRecordErrorHandler = configRow.getBytes("bad_record_error_handler");
builder.setBadRecordErrorHandler(
(ErrorHandler<BadRecord, ?>) fromByteArray(badRecordErrorHandler));

if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.55.0") < 0) {
// We need to use defaults here for BQ rear/write transforms upgraded
// from older Beam versions.
// See https://github.com/apache/beam/issues/30534.
builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER);
builder.setBadRecordErrorHandler(new BadRecordErrorHandler.DefaultErrorHandler<>());
} else {
byte[] badRecordRouter = configRow.getBytes("bad_record_router");
builder.setBadRecordRouter((BadRecordRouter) fromByteArray(badRecordRouter));
byte[] badRecordErrorHandler = configRow.getBytes("bad_record_error_handler");
builder.setBadRecordErrorHandler(
(ErrorHandler<BadRecord, ?>) fromByteArray(badRecordErrorHandler));
}

return builder.build();
} catch (InvalidClassException e) {
Expand Down Expand Up @@ -573,6 +592,14 @@ public Row toConfigRow(Write<?> transform) {

@Override
public Write<?> fromConfigRow(Row configRow, PipelineOptions options) {
String updateCompatibilityBeamVersion =
options.as(StreamingOptions.class).getUpdateCompatibilityVersion();
// We need to set a default 'updateCompatibilityBeamVersion' here since this PipelineOption
// is not correctly passed in for pipelines that use Beam 2.53.0.
// This is fixed for Beam 2.54.0 and later.
updateCompatibilityBeamVersion =
(updateCompatibilityBeamVersion != null) ? updateCompatibilityBeamVersion : "2.53.0";

try {
BigQueryIO.Write.Builder builder = new AutoValue_BigQueryIO_Write.Builder<>();

Expand Down Expand Up @@ -716,20 +743,11 @@ public Write<?> fromConfigRow(Row configRow, PipelineOptions options) {
builder = builder.setMaxBytesPerPartition(maxBytesPerPartition);
}

String updateCompatibilityBeamVersion =
options.as(StreamingOptions.class).getUpdateCompatibilityVersion();

// We need to update the 'triggerring_frequency' field name for pipelines that are upgraded
// from Beam 2.53.0 due to https://github.com/apache/beam/pull/29785.
// We need to set a default 'updateCompatibilityBeamVersion' here since this PipelineOption
// is not correctly passed in for pipelines that use Beam 2.53.0.
// Both above issues are fixed for Beam 2.54.0 and later.
updateCompatibilityBeamVersion =
(updateCompatibilityBeamVersion != null) ? updateCompatibilityBeamVersion : "2.53.0";

// This is fixed for Beam 2.54.0 and later.
String triggeringFrequencyFieldName =
(updateCompatibilityBeamVersion != null
&& updateCompatibilityBeamVersion.equals("2.53.0"))
TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.53.0") == 0
? "triggerring_frequency"
: "triggering_frequency";

Expand Down Expand Up @@ -840,11 +858,20 @@ public Write<?> fromConfigRow(Row configRow, PipelineOptions options) {
builder.setRowMutationInformationFn(
(SerializableFunction) fromByteArray(rowMutationInformationFnBytes));
}
byte[] badRecordRouter = configRow.getBytes("bad_record_router");
builder.setBadRecordRouter((BadRecordRouter) fromByteArray(badRecordRouter));
byte[] badRecordErrorHandler = configRow.getBytes("bad_record_error_handler");
builder.setBadRecordErrorHandler(
(ErrorHandler<BadRecord, ?>) fromByteArray(badRecordErrorHandler));

if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.55.0") < 0) {
// We need to use defaults here for BQ rear/write transforms upgraded
// from older Beam versions.
// See https://github.com/apache/beam/issues/30534.
builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER);
builder.setBadRecordErrorHandler(new BadRecordErrorHandler.DefaultErrorHandler<>());
} else {
byte[] badRecordRouter = configRow.getBytes("bad_record_router");
builder.setBadRecordRouter((BadRecordRouter) fromByteArray(badRecordRouter));
byte[] badRecordErrorHandler = configRow.getBytes("bad_record_error_handler");
builder.setBadRecordErrorHandler(
(ErrorHandler<BadRecord, ?>) fromByteArray(badRecordErrorHandler));
}

return builder.build();
} catch (InvalidClassException e) {
Expand Down

0 comments on commit 5d89a33

Please sign in to comment.