From 1c50fd2e6a8bd43f767f2ef49d4d5ecbf9c84f68 Mon Sep 17 00:00:00 2001 From: Chamikara Jayalath Date: Wed, 27 Sep 2023 14:44:41 -0700 Subject: [PATCH] Upgrade transforms without upgrading the pipelines (#28210) * Upgrade Java transforms without upgrading the pipelines * Addresses reviewer comments * Reduce visibility of the test-only constructor * Fix compile errors * Fix spotless * Addressing reviewer comments * Do not bundle Transform Service Launcher in the harness * Fix harness build and a fix for when a runner invokes toProto() multiple times --- runners/core-construction-java/build.gradle | 1 + .../core/construction/CombineTranslation.java | 6 +- .../CreatePCollectionViewTranslation.java | 2 +- .../runners/core/construction/External.java | 10 +- .../ExternalTranslationOptions.java | 43 ++ .../ExternalTranslationOptionsRegistrar.java | 36 ++ .../core/construction/FlattenTranslator.java | 2 +- .../construction/GroupByKeyTranslation.java | 2 +- .../GroupIntoBatchesTranslation.java | 4 +- .../core/construction/ImpulseTranslation.java | 2 +- .../construction/PTransformTranslation.java | 102 ++++- .../construction/PipelineTranslation.java | 15 + .../core/construction/ReadTranslation.java | 4 +- .../construction/ReshuffleTranslation.java | 2 +- .../core/construction/SplittableParDo.java | 2 +- .../construction/TestStreamTranslation.java | 2 +- .../core/construction/TransformUpgrader.java | 330 ++++++++++++++++ .../construction/WindowIntoTranslation.java | 5 +- .../construction/WriteFilesTranslation.java | 2 +- .../construction/TransformUpgraderTest.java | 369 ++++++++++++++++++ .../FlinkStreamingTransformTranslators.java | 2 +- .../beam/runners/dataflow/DataflowRunner.java | 6 + .../dataflow/PrimitiveParDoSingleFactory.java | 2 +- .../runners/dataflow/DataflowRunnerTest.java | 2 +- .../samza/translation/SamzaPublishView.java | 2 +- .../StreamingTransformTranslator.java | 4 +- .../expansion/service/ExpansionService.java | 65 +++ sdks/java/harness/build.gradle | 2 + 
.../gcp/pubsub/PubSubPayloadTranslation.java | 9 +- 29 files changed, 994 insertions(+), 41 deletions(-) create mode 100644 runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExternalTranslationOptions.java create mode 100644 runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExternalTranslationOptionsRegistrar.java create mode 100644 runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java create mode 100644 runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformUpgraderTest.java diff --git a/runners/core-construction-java/build.gradle b/runners/core-construction-java/build.gradle index feac7c37c8e0..f593865b3fe9 100644 --- a/runners/core-construction-java/build.gradle +++ b/runners/core-construction-java/build.gradle @@ -55,6 +55,7 @@ dependencies { implementation project(path: ":sdks:java:core", configuration: "shadow") implementation project(path: ":sdks:java:extensions:avro") implementation project(path: ":sdks:java:fn-execution") + implementation project(path: ":sdks:java:transform-service:launcher") implementation library.java.vendored_grpc_1_54_0 implementation library.java.vendored_guava_32_1_2_jre implementation library.java.classgraph diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java index 3f902acf250c..fbe4876b414f 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java @@ -61,7 +61,7 @@ public static class CombinePerKeyPayloadTranslator private CombinePerKeyPayloadTranslator() {} @Override - public String 
getUrn(Combine.PerKey transform) { + public String getUrn() { return COMBINE_PER_KEY_TRANSFORM_URN; } @@ -108,7 +108,7 @@ public static class CombineGloballyPayloadTranslator private CombineGloballyPayloadTranslator() {} @Override - public String getUrn(Combine.Globally transform) { + public String getUrn() { return COMBINE_GLOBALLY_TRANSFORM_URN; } @@ -165,7 +165,7 @@ public static class CombineGroupedValuesPayloadTranslator private CombineGroupedValuesPayloadTranslator() {} @Override - public String getUrn(Combine.GroupedValues transform) { + public String getUrn() { return COMBINE_GROUPED_VALUES_TRANSFORM_URN; } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java index a679737fd616..71038564ec4c 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslation.java @@ -90,7 +90,7 @@ public static PCollectionView getView( static class CreatePCollectionViewTranslator implements TransformPayloadTranslator> { @Override - public String getUrn(View.CreatePCollectionView transform) { + public String getUrn() { return PTransformTranslation.CREATE_VIEW_TRANSFORM_URN; } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java index cedd8875751f..534a2b5fe0e6 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java @@ -295,7 +295,7 @@ public OutputT 
expand(InputT input) { response .getComponents() .toBuilder() - .putAllEnvironments(resolveArtifacts(newEnvironmentsWithDependencies)) + .putAllEnvironments(resolveArtifacts(newEnvironmentsWithDependencies, endpoint)) .build(); expandedTransform = response.getTransform(); expandedRequirements = response.getRequirementsList(); @@ -338,8 +338,8 @@ public OutputT expand(InputT input) { return toOutputCollection(outputMapBuilder.build()); } - private Map resolveArtifacts( - Map environments) { + static Map resolveArtifacts( + Map environments, Endpoints.ApiServiceDescriptor endpoint) { if (environments.size() == 0) { return environments; } @@ -367,7 +367,7 @@ private Map resolveArtifacts( } } - private RunnerApi.Environment resolveArtifacts( + private static RunnerApi.Environment resolveArtifacts( ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceBlockingStub retrievalStub, RunnerApi.Environment environment) throws IOException { @@ -378,7 +378,7 @@ private RunnerApi.Environment resolveArtifacts( .build(); } - private List resolveArtifacts( + private static List resolveArtifacts( ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceBlockingStub retrievalStub, List artifacts) throws IOException { diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExternalTranslationOptions.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExternalTranslationOptions.java new file mode 100644 index 000000000000..4b3ef24ca1d2 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExternalTranslationOptions.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.construction; + +import java.util.List; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.resourcehints.ResourceHintsOptions.EmptyListDefault; + +public interface ExternalTranslationOptions extends PipelineOptions { + + @Description("Set of URNs of transforms to be overriden using the transform service.") + @Default.InstanceFactory(EmptyListDefault.class) + List getTransformsToOverride(); + + void setTransformsToOverride(List transformsToOverride); + + @Description("Address of an already available transform service.") + String getTransformServiceAddress(); + + void setTransformServiceAddress(String transformServiceAddress); + + @Description("An available Beam version which will be used to start a transform service.") + String getTransformServiceBeamVersion(); + + void setTransformServiceBeamVersion(String transformServiceBeamVersion); +} diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExternalTranslationOptionsRegistrar.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExternalTranslationOptionsRegistrar.java new file mode 100644 index 000000000000..6296f4c83775 --- /dev/null +++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExternalTranslationOptionsRegistrar.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.construction; + +import com.google.auto.service.AutoService; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsRegistrar; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; + +/** A registrar for ExternalTranslationOptions. 
*/ +@AutoService(PipelineOptionsRegistrar.class) +@Internal +public class ExternalTranslationOptionsRegistrar implements PipelineOptionsRegistrar { + @Override + public Iterable> getPipelineOptions() { + return ImmutableList.>builder() + .add(ExternalTranslationOptions.class) + .build(); + } +} diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java index 37c09663c5a2..201a65e6233c 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/FlattenTranslator.java @@ -43,7 +43,7 @@ public static TransformPayloadTranslator create() { private FlattenTranslator() {} @Override - public String getUrn(Flatten.PCollections transform) { + public String getUrn() { return PTransformTranslation.FLATTEN_TRANSFORM_URN; } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java index e6bbbf0767a5..183fa7ffcdc9 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupByKeyTranslation.java @@ -38,7 +38,7 @@ public class GroupByKeyTranslation { static class GroupByKeyTranslator implements TransformPayloadTranslator> { @Override - public String getUrn(GroupByKey transform) { + public String getUrn() { return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN; } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupIntoBatchesTranslation.java 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupIntoBatchesTranslation.java index c91e9cedb9ac..7c81afd8ae07 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupIntoBatchesTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupIntoBatchesTranslation.java @@ -39,7 +39,7 @@ public class GroupIntoBatchesTranslation { static class GroupIntoBatchesTranslator implements TransformPayloadTranslator> { @Override - public String getUrn(GroupIntoBatches transform) { + public String getUrn() { return PTransformTranslation.GROUP_INTO_BATCHES_URN; } @@ -61,7 +61,7 @@ public RunnerApi.FunctionSpec translate( static class ShardedGroupIntoBatchesTranslator implements TransformPayloadTranslator.WithShardedKey> { @Override - public String getUrn(GroupIntoBatches.WithShardedKey transform) { + public String getUrn() { return PTransformTranslation.GROUP_INTO_BATCHES_WITH_SHARDED_KEY_URN; } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ImpulseTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ImpulseTranslation.java index 3de0ce9de8ac..25f0cd7749b5 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ImpulseTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ImpulseTranslation.java @@ -37,7 +37,7 @@ public class ImpulseTranslation { private static class ImpulseTranslator implements TransformPayloadTranslator { @Override - public String getUrn(Impulse transform) { + public String getUrn() { return PTransformTranslation.IMPULSE_TRANSFORM_URN; } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java index 2acd77885fcc..8f415e718e95 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java @@ -37,15 +37,20 @@ import org.apache.beam.runners.core.construction.ExternalTranslation.ExternalTranslator; import org.apache.beam.runners.core.construction.ParDoTranslation.ParDoTranslator; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.RowCoder; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.schemas.SchemaTranslation; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.common.ReflectHelpers.ObjectsClassComparator; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; @@ -54,6 +59,8 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Utilities for converting {@link PTransform PTransforms} to {@link RunnerApi Runner API protocol @@ -65,10 +72,14 @@ 
"keyfor" }) // TODO(https://github.com/apache/beam/issues/20497) public class PTransformTranslation { + + private static final Logger LOG = LoggerFactory.getLogger(PTransformTranslation.class); + // We specifically copy the values here so that they can be used in switch case statements // and we validate that the value matches the actual URN in the static block below. // Primitives + public static final String CREATE_TRANSFORM_URN = "beam:transform:create:v1"; public static final String PAR_DO_TRANSFORM_URN = "beam:transform:pardo:v1"; public static final String FLATTEN_TRANSFORM_URN = "beam:transform:flatten:v1"; public static final String GROUP_BY_KEY_TRANSFORM_URN = "beam:transform:group_by_key:v1"; @@ -83,6 +94,10 @@ public class PTransformTranslation { public static final ImmutableSet RUNNER_IMPLEMENTED_TRANSFORMS = ImmutableSet.of(GROUP_BY_KEY_TRANSFORM_URN, IMPULSE_TRANSFORM_URN); + public static final String CONFIG_ROW_KEY = "config_row"; + + public static final String CONFIG_ROW_SCHEMA_KEY = "config_row_schema"; + // DeprecatedPrimitives /** * @deprecated SDKs should move away from creating `Read` transforms and migrate to using Impulse @@ -435,10 +450,9 @@ public RunnerApi.PTransform translate( RunnerApi.PTransform.Builder transformBuilder = translateAppliedPTransform(appliedPTransform, subtransforms, components); - FunctionSpec spec = - KNOWN_PAYLOAD_TRANSLATORS - .get(appliedPTransform.getTransform().getClass()) - .translate(appliedPTransform, components); + TransformPayloadTranslator payloadTranslator = + KNOWN_PAYLOAD_TRANSLATORS.get(appliedPTransform.getTransform().getClass()); + FunctionSpec spec = payloadTranslator.translate(appliedPTransform, components); if (spec != null) { transformBuilder.setSpec(spec); @@ -461,6 +475,33 @@ public RunnerApi.PTransform translate( } } } + + Row configRow = null; + try { + configRow = payloadTranslator.toConfigRow(appliedPTransform.getTransform()); + } catch (UnsupportedOperationException e) { + // Optional 
toConfigRow() has not been implemented. We can just ignore. + } catch (Exception e) { + LOG.warn( + "Could not attach the config row for transform " + + appliedPTransform.getTransform().getName() + + ": " + + e); + // Ignoring the error and continuing with the translation since attaching config rows is + // optional. + } + if (configRow != null) { + transformBuilder.putAnnotations( + CONFIG_ROW_KEY, + ByteString.copyFrom( + CoderUtils.encodeToByteArray(RowCoder.of(configRow.getSchema()), configRow))); + + transformBuilder.putAnnotations( + CONFIG_ROW_SCHEMA_KEY, + ByteString.copyFrom( + SchemaTranslation.schemaToProto(configRow.getSchema(), true).toByteArray())); + } + return transformBuilder.build(); } } @@ -508,14 +549,63 @@ static RunnerApi.PTransform.Builder translateAppliedPTransform( * *

When going to a protocol buffer message, the translator produces a payload corresponding to * the Java representation while registering components that payload references. + * + *

Also, provides methods for generating a Row-based constructor config for the transform that + * can be later used to re-construct the transform. */ public interface TransformPayloadTranslator> { - String getUrn(T transform); + /** + * Provides a unique URN for transforms represented by this {@code TransformPayloadTranslator}. + */ + String getUrn(); + + /** + * Same as {@link #getUrn()} but the returned URN may depend on the transform provided. + * + *

Only override this if the same {@code TransformPayloadTranslator} is used for multiple + * transforms. Otherwise, use {@link #getUrn()}. + */ + default String getUrn(T transform) { + return getUrn(); + } + + /** */ + /** + * Translates the given transform represented by the provided {@code AppliedPTransform} to a + * {@code FunctionSpec} with a URN and a payload. + * + * @param application an {@code AppliedPTransform} that includes the transform to be expanded. + * @param components components of the pipeline that includes the transform. + * @return a generated spec for the transform to be included in the pipeline proto. If return + * value is null, transform should include an empty spec. + * @throws IOException + */ @Nullable FunctionSpec translate(AppliedPTransform application, SdkComponents components) throws IOException; + /** + * Generates a Row-based construction configuration for the provided transform. + * + * @param transform a transform represented by the current {@code TransformPayloadTranslator}. + * @return the Row-based construction configuration of the provided transform. + */ + default Row toConfigRow(T transform) { + throw new UnsupportedOperationException("Not implemented"); + } + + /** + * Constructs a transform from a provided Row-based construction configuration. + * + * @param configRow a construction configuration similar to what would be generated by the + * {@link #toConfigRow(PTransform)} method. + * @return a transform represented by the current {@code TransformPayloadTranslator}. + */ + default T fromConfigRow(Row configRow) { + throw new UnsupportedOperationException("Not implemented"); + } + /** * A {@link TransformPayloadTranslator} for transforms that contain no references to components, * so they do not need a specialized rehydration. 
@@ -526,7 +616,7 @@ abstract class NotSerializable> public static NotSerializable forUrn(final String urn) { return new NotSerializable>() { @Override - public String getUrn(PTransform transform) { + public String getUrn() { return urn; } }; diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java index 53553e7062b3..e39a38a74c2c 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java @@ -102,6 +102,21 @@ public void visitPrimitiveTransform(Node node) { // TODO(JIRA-5649): Don't even emit these transforms in the generated protos. res = elideDeprecatedViews(res); } + + ExternalTranslationOptions externalTranslationOptions = + pipeline.getOptions().as(ExternalTranslationOptions.class); + List urnsToOverride = externalTranslationOptions.getTransformsToOverride(); + if (urnsToOverride.size() > 0) { + try (TransformUpgrader upgrader = TransformUpgrader.of()) { + res = + upgrader.upgradeTransformsViaTransformService( + res, urnsToOverride, externalTranslationOptions); + } catch (Exception e) { + throw new RuntimeException( + "Could not override the transforms with URNs " + urnsToOverride, e); + } + } + // Validate that translation didn't produce an invalid pipeline. 
PipelineValidator.validate(res); return res; diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java index 40a7205b5d57..f04d25509593 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java @@ -154,7 +154,7 @@ public static TransformPayloadTranslator create() { private UnboundedReadPayloadTranslator() {} @Override - public String getUrn(SplittableParDo.PrimitiveUnboundedRead transform) { + public String getUrn() { return PTransformTranslation.READ_TRANSFORM_URN; } @@ -181,7 +181,7 @@ public static TransformPayloadTranslator create() { private BoundedReadPayloadTranslator() {} @Override - public String getUrn(SplittableParDo.PrimitiveBoundedRead transform) { + public String getUrn() { return PTransformTranslation.READ_TRANSFORM_URN; } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReshuffleTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReshuffleTranslation.java index 98d39c8ff0ac..bd91673f3818 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReshuffleTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReshuffleTranslation.java @@ -38,7 +38,7 @@ public class ReshuffleTranslation { static class ReshuffleTranslator implements TransformPayloadTranslator> { @Override - public String getUrn(Reshuffle transform) { + public String getUrn() { return PTransformTranslation.RESHUFFLE_URN; } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java index 42c9e523e965..5ea2c4968dd9 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java @@ -395,7 +395,7 @@ public static TransformPayloadTranslator create() { private ProcessKeyedElementsTranslator() {} @Override - public String getUrn(ProcessKeyedElements transform) { + public String getUrn() { return PTransformTranslation.SPLITTABLE_PROCESS_KEYED_URN; } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java index aa582bf14f3c..53bb324d03fa 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java @@ -168,7 +168,7 @@ static TestStream.Event eventFromProto( /** A translator registered to translate {@link TestStream} objects to protobuf representation. 
*/ static class TestStreamTranslator implements TransformPayloadTranslator> { @Override - public String getUrn(TestStream transform) { + public String getUrn() { return TEST_STREAM_TRANSFORM_URN; } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java new file mode 100644 index 000000000000..d657bb31b184 --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.runners.core.construction;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import org.apache.beam.model.expansion.v1.ExpansionApi;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.model.pipeline.v1.SchemaApi;
+import org.apache.beam.sdk.transformservice.launcher.TransformServiceLauncher;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannelBuilder;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+
+/**
+ * A utility class that allows upgrading transforms of a given pipeline using the Beam Transform
+ * Service.
+ *
+ * <p>Each instance holds its own {@link ExpansionServiceClientFactory}, which is closed by {@link
+ * #close()}; use the upgrader in a try-with-resources statement.
+ */
+public class TransformUpgrader implements AutoCloseable {
+  private static final String UPGRADE_NAMESPACE = "transform:upgrade:";
+
+  private final ExpansionServiceClientFactory clientFactory;
+
+  private TransformUpgrader() {
+    // Creating a default 'ExpansionServiceClientFactory' instance per 'TransformUpgrader' instance
+    // so that each instance can maintain a set of live channels and close them independently.
+    clientFactory =
+        DefaultExpansionServiceClientFactory.create(
+            endPoint -> ManagedChannelBuilder.forTarget(endPoint.getUrl()).usePlaintext().build());
+  }
+
+  private TransformUpgrader(ExpansionServiceClientFactory clientFactory) {
+    this.clientFactory = clientFactory;
+  }
+
+  /** Creates an upgrader that talks to the Transform Service over plaintext gRPC channels. */
+  public static TransformUpgrader of() {
+    return new TransformUpgrader();
+  }
+
+  /** Creates an upgrader that uses the given client factory instead of real gRPC channels. */
+  @VisibleForTesting
+  static TransformUpgrader of(ExpansionServiceClientFactory clientFactory) {
+    return new TransformUpgrader(clientFactory);
+  }
+
+  /**
+   * Upgrade identified transforms in a given pipeline using the Transform Service.
+   *
+   * <p>If {@code TransformServiceAddress} is set, that service is used. Otherwise, if {@code
+   * TransformServiceBeamVersion} is set, a local Transform Service of that version is started on a
+   * free port for the duration of the call and shut down before this method returns, whether or
+   * not the upgrade succeeded.
+   *
+   * @param pipeline the pipeline proto.
+   * @param urnsToOverride URNs of the transforms to be overridden.
+   * @param options options for determining the transform service to use.
+   * @return pipelines with transforms upgraded using the Transform Service.
+   * @throws IOException if the transform service cannot be started, reached or shut down.
+   * @throws TimeoutException if a locally started transform service does not come up in time.
+   * @throws IllegalArgumentException if neither a service address nor a Beam version is provided.
+   */
+  public RunnerApi.Pipeline upgradeTransformsViaTransformService(
+      RunnerApi.Pipeline pipeline, List<String> urnsToOverride, ExternalTranslationOptions options)
+      throws IOException, TimeoutException {
+    List<String> transformsToOverride =
+        pipeline.getComponents().getTransformsMap().entrySet().stream()
+            .filter(entry -> urnsToOverride.contains(entry.getValue().getSpec().getUrn()))
+            .map(Map.Entry::getKey)
+            .collect(Collectors.toList());
+
+    if (options.getTransformServiceAddress() != null) {
+      return upgradeTransforms(
+          pipeline, transformsToOverride, options.getTransformServiceAddress());
+    }
+
+    if (options.getTransformServiceBeamVersion() == null) {
+      throw new IllegalArgumentException(
+          "Either option TransformServiceAddress or option TransformServiceBeamVersion should be "
+              + "provided to override a transform using the transform service");
+    }
+
+    String projectName = UUID.randomUUID().toString();
+    int port = findAvailablePort();
+    TransformServiceLauncher service = TransformServiceLauncher.forProject(projectName, port);
+    service.setBeamVersion(options.getTransformServiceBeamVersion());
+
+    // Starting the transform service.
+    service.start();
+    try {
+      service.waitTillUp(-1);
+      return upgradeTransforms(pipeline, transformsToOverride, "localhost:" + port);
+    } finally {
+      // Always stop the service we started, even when waiting for it or an expansion failed.
+      service.shutdown();
+    }
+  }
+
+  /** Upgrades the given transforms one by one against the service at {@code serviceAddress}. */
+  private RunnerApi.Pipeline upgradeTransforms(
+      RunnerApi.Pipeline pipeline, List<String> transformIds, String serviceAddress)
+      throws IOException {
+    Endpoints.ApiServiceDescriptor expansionServiceEndpoint =
+        Endpoints.ApiServiceDescriptor.newBuilder().setUrl(serviceAddress).build();
+
+    for (String transformId : transformIds) {
+      pipeline =
+          updateTransformViaTransformService(pipeline, transformId, expansionServiceEndpoint);
+    }
+    return pipeline;
+  }
+
+  /**
+   * Replaces a single transform of the pipeline with the version produced by the Transform Service.
+   *
+   * <p>The transform is re-expanded from the config row and config row schema stored in its
+   * annotations. The old transform and all of its sub-transforms are dropped, the expanded
+   * transform is registered under the original transform ID, and consumers of the old outputs are
+   * rewired to the outputs of the expanded transform.
+   *
+   * @throws IllegalArgumentException if the transform does not exist, lacks the config row
+   *     annotations, or the outputs of the upgraded transform cannot be matched to the original.
+   * @throws RuntimeException if the expansion service reports an error.
+   */
+  private <
+          InputT extends PInput,
+          OutputT extends POutput,
+          TransformT extends org.apache.beam.sdk.transforms.PTransform<InputT, OutputT>>
+      RunnerApi.Pipeline updateTransformViaTransformService(
+          RunnerApi.Pipeline runnerAPIpipeline,
+          String transformId,
+          Endpoints.ApiServiceDescriptor transformServiceEndpoint)
+          throws IOException {
+    PTransform transformToUpgrade =
+        runnerAPIpipeline.getComponents().getTransformsMap().get(transformId);
+    if (transformToUpgrade == null) {
+      throw new IllegalArgumentException("Could not find a transform with the ID " + transformId);
+    }
+
+    ByteString configRowBytes =
+        transformToUpgrade.getAnnotationsOrThrow(PTransformTranslation.CONFIG_ROW_KEY);
+    ByteString configRowSchemaBytes =
+        transformToUpgrade.getAnnotationsOrThrow(PTransformTranslation.CONFIG_ROW_SCHEMA_KEY);
+    SchemaApi.Schema configRowSchemaProto = SchemaApi.Schema.parseFrom(configRowSchemaBytes);
+
+    ExternalTransforms.ExternalConfigurationPayload payload =
+        ExternalTransforms.ExternalConfigurationPayload.newBuilder()
+            .setSchema(configRowSchemaProto)
+            .setPayload(configRowBytes)
+            .build();
+
+    RunnerApi.PTransform transformRequest =
+        RunnerApi.PTransform.newBuilder()
+            .setUniqueName(transformToUpgrade.getUniqueName() + "_external")
+            .setSpec(
+                RunnerApi.FunctionSpec.newBuilder()
+                    .setUrn(transformToUpgrade.getSpec().getUrn())
+                    .setPayload(payload.toByteString())
+                    .build())
+            .putAllInputs(transformToUpgrade.getInputsMap())
+            .putAllOutputs(transformToUpgrade.getOutputsMap())
+            .build();
+
+    ExpansionApi.ExpansionRequest request =
+        ExpansionApi.ExpansionRequest.newBuilder()
+            .setComponents(runnerAPIpipeline.getComponents())
+            .setTransform(transformRequest)
+            .setNamespace(UPGRADE_NAMESPACE)
+            .build();
+
+    ExpansionApi.ExpansionResponse response =
+        clientFactory.getExpansionServiceClient(transformServiceEndpoint).expand(request);
+
+    if (!Strings.isNullOrEmpty(response.getError())) {
+      throw new RuntimeException(String.format("expansion service error: %s", response.getError()));
+    }
+
+    // Only environments introduced by the expansion that carry dependencies need their artifacts
+    // resolved against the transform service.
+    Map<String, RunnerApi.Environment> newEnvironmentsWithDependencies =
+        response.getComponents().getEnvironmentsMap().entrySet().stream()
+            .filter(
+                kv ->
+                    !runnerAPIpipeline.getComponents().getEnvironmentsMap().containsKey(kv.getKey())
+                        && kv.getValue().getDependenciesCount() != 0)
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+    RunnerApi.Components expandedComponents =
+        response
+            .getComponents()
+            .toBuilder()
+            .putAllEnvironments(
+                External.ExpandableTransform.resolveArtifacts(
+                    newEnvironmentsWithDependencies, transformServiceEndpoint))
+            .build();
+    RunnerApi.PTransform expandedTransform = response.getTransform();
+    List<String> expandedRequirements = response.getRequirementsList();
+
+    // We record transforms that consume outputs of the old transform and update them to consume
+    // outputs of the new (upgraded) transform.
+    Map<String, String> inputReplacements =
+        computeInputReplacements(transformToUpgrade, expandedTransform);
+
+    // The list of obsolete (overridden) transforms that should be removed from the pipeline
+    // produced by this method.
+    List<String> transformsToRemove = new ArrayList<>();
+    recursivelyFindSubTransforms(
+        transformId, runnerAPIpipeline.getComponents(), transformsToRemove);
+
+    Map<String, PTransform> updatedExpandedTransformMap =
+        expandedComponents.getTransformsMap().entrySet().stream()
+            // Do not include already overridden transforms.
+            .filter(entry -> !transformsToRemove.contains(entry.getKey()))
+            .collect(
+                Collectors.toMap(
+                    Map.Entry::getKey,
+                    entry -> replaceInputs(entry.getValue(), inputReplacements)));
+
+    RunnerApi.Components.Builder newComponentsBuilder = expandedComponents.toBuilder();
+    newComponentsBuilder.clearTransforms();
+    newComponentsBuilder.putAllTransforms(updatedExpandedTransformMap);
+    newComponentsBuilder.putTransforms(transformId, expandedTransform);
+
+    RunnerApi.Pipeline.Builder newRunnerAPIPipelineBuilder = runnerAPIpipeline.toBuilder();
+    newRunnerAPIPipelineBuilder.clearComponents();
+    newRunnerAPIPipelineBuilder.setComponents(newComponentsBuilder.build());
+
+    newRunnerAPIPipelineBuilder.addAllRequirements(expandedRequirements);
+
+    return newRunnerAPIPipelineBuilder.build();
+  }
+
+  /**
+   * Maps each output PCollection ID of the original transform to the corresponding output
+   * PCollection ID of the upgraded transform.
+   *
+   * <p>A single output is matched regardless of its tag. Multiple outputs are matched by tag and
+   * the two transforms must expose exactly the same set of tags.
+   */
+  private static Map<String, String> computeInputReplacements(
+      PTransform transformToUpgrade, PTransform expandedTransform) {
+    Map<String, String> newOutputs = expandedTransform.getOutputsMap();
+    Map<String, String> inputReplacements = new HashMap<>();
+
+    if (transformToUpgrade.getOutputsMap().size() == 1) {
+      if (newOutputs.isEmpty()) {
+        throw new IllegalArgumentException(
+            "Original transform had a single output but the upgraded transform "
+                + expandedTransform
+                + " does not have any outputs.");
+      }
+      Collection<String> oldOutputs = transformToUpgrade.getOutputsMap().values();
+      inputReplacements.put(oldOutputs.iterator().next(), newOutputs.values().iterator().next());
+      return inputReplacements;
+    }
+
+    // The upgraded transform must not introduce output tags the original did not have.
+    for (String newTag : newOutputs.keySet()) {
+      if (!transformToUpgrade.getOutputsMap().containsKey(newTag)) {
+        throw new IllegalArgumentException(
+            "Original transform did not have an output with tag "
+                + newTag
+                + " but upgraded transform did.");
+      }
+    }
+    // Every output of the original transform must still be produced under the same tag.
+    for (Map.Entry<String, String> entry : transformToUpgrade.getOutputsMap().entrySet()) {
+      String newOutput = newOutputs.get(entry.getKey());
+      if (newOutput == null) {
+        throw new IllegalArgumentException(
+            "Could not find an output with tag "
+                + entry.getKey()
+                + " for the transform "
+                + expandedTransform);
+      }
+      inputReplacements.put(entry.getValue(), newOutput);
+    }
+    return inputReplacements;
+  }
+
+  /** Returns {@code transform} with every input found in {@code inputReplacements} swapped. */
+  private static PTransform replaceInputs(
+      PTransform transform, Map<String, String> inputReplacements) {
+    Map<String, String> inputsMap = transform.getInputsMap();
+    if (Collections.disjoint(inputsMap.values(), inputReplacements.keySet())) {
+      return transform;
+    }
+    PTransform.Builder transformBuilder = transform.toBuilder().clearInputs();
+    for (Map.Entry<String, String> inputEntry : inputsMap.entrySet()) {
+      transformBuilder.putInputs(
+          inputEntry.getKey(),
+          inputReplacements.getOrDefault(inputEntry.getValue(), inputEntry.getValue()));
+    }
+    return transformBuilder.build();
+  }
+
+  /** Adds {@code transformId} and the IDs of all of its nested sub-transforms to {@code results}. */
+  private static void recursivelyFindSubTransforms(
+      String transformId, RunnerApi.Components components, List<String> results) {
+    results.add(transformId);
+    PTransform transform = components.getTransformsMap().get(transformId);
+    if (transform == null) {
+      throw new IllegalArgumentException("Could not find a transform with id " + transformId);
+    }
+    for (String subTransformId : transform.getSubtransformsList()) {
+      recursivelyFindSubTransforms(subTransformId, components, results);
+    }
+  }
+
+  /** Asks the OS for a currently free port by briefly binding a socket to port 0. */
+  private static int findAvailablePort() throws IOException {
+    int port;
+    try (ServerSocket s = new ServerSocket(0)) {
+      port = s.getLocalPort();
+    }
+    try {
+      // Some systems don't free the port for future use immediately.
+      Thread.sleep(100);
+    } catch (InterruptedException exn) {
+      // Preserve the interrupt so that callers can observe it.
+      Thread.currentThread().interrupt();
+    }
+    return port;
+  }
+
+  @Override
+  public void close() throws Exception {
+    clientFactory.close();
+  }
+}
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java index 1b3aa50c7b44..294d89308a31 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java @@ -30,7 +30,6 @@ import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.Window.Assign; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.InvalidProtocolBufferException; import org.checkerframework.checker.nullness.qual.Nullable; @@ -47,7 +46,7 @@ public class WindowIntoTranslation { static class WindowAssignTranslator implements TransformPayloadTranslator> { @Override - public String getUrn(Assign transform) { + public String getUrn() { return PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN; } @@ -116,7 +115,7 @@ public static TransformPayloadTranslator create() { private WindowIntoPayloadTranslator() {} @Override - public String getUrn(Window.Assign transform) { + public String getUrn() { return PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN; } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java index cce140536114..3a23ed073776 100644 ---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java @@ -276,7 +276,7 @@ public boolean isRunnerDeterminedSharding() { static class WriteFilesTranslator implements TransformPayloadTranslator> { @Override - public String getUrn(WriteFiles transform) { + public String getUrn() { return WRITE_FILES_TRANSFORM_URN; } diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformUpgraderTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformUpgraderTest.java new file mode 100644 index 000000000000..6620e780bc16 --- /dev/null +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformUpgraderTest.java @@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.auto.service.AutoService;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.model.expansion.v1.ExpansionApi;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.ToString;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link TransformUpgrader}. */
+@RunWith(JUnit4.class)
+public class TransformUpgraderTest {
+  /** A transform that multiplies every element by a configurable parameter. */
+  static class TestTransform extends PTransform<PCollection<Integer>, PCollection<Integer>> {
+    private int testParam;
+
+    public TestTransform(int testParam) {
+      this.testParam = testParam;
+    }
+
+    @Override
+    public PCollection<Integer> expand(PCollection<Integer> input) {
+      return input.apply(
+          MapElements.via(
+              new SimpleFunction<Integer, Integer>() {
+                @Override
+                public Integer apply(Integer input) {
+                  return input * testParam;
+                }
+              }));
+    }
+
+    public Integer getTestParam() {
+      return testParam;
+    }
+  }
+
+  /** Translates {@link TestTransform} to a spec whose payload is the Java-serialized parameter. */
+  static class TestTransformPayloadTranslator
+      implements PTransformTranslation.TransformPayloadTranslator<TestTransform> {
+
+    static final String URN = "beam:transform:test:transform_to_update";
+
+    Schema configRowSchema = Schema.builder().addInt32Field("multiplier").build();
+
+    @Override
+    public String getUrn() {
+      return URN;
+    }
+
+    @Override
+    public TestTransform fromConfigRow(Row configRow) {
+      return new TestTransform(configRow.getInt32("multiplier"));
+    }
+
+    @Override
+    public Row toConfigRow(TestTransform transform) {
+      return Row.withSchema(configRowSchema)
+          .withFieldValue("multiplier", transform.getTestParam())
+          .build();
+    }
+
+    @Override
+    public RunnerApi.@Nullable FunctionSpec translate(
+        AppliedPTransform<?, ?, TestTransform> application, SdkComponents components)
+        throws IOException {
+
+      int testParam = application.getTransform().getTestParam();
+
+      FunctionSpec.Builder specBuilder = FunctionSpec.newBuilder();
+      specBuilder.setUrn(getUrn());
+
+      ByteStringOutputStream byteStringOut = new ByteStringOutputStream();
+      ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteStringOut);
+      objectOutputStream.writeObject(testParam);
+      objectOutputStream.flush();
+      specBuilder.setPayload(byteStringOut.toByteString());
+
+      return specBuilder.build();
+    }
+  }
+
+  /** Registers {@link TestTransformPayloadTranslator} for {@link TestTransform}. */
+  @AutoService(TransformPayloadTranslatorRegistrar.class)
+  public static class Registrar implements TransformPayloadTranslatorRegistrar {
+    @Override
+    public Map<Class<? extends PTransform>, TestTransformPayloadTranslator>
+        getTransformPayloadTranslators() {
+      return Collections.singletonMap(TestTransform.class, new TestTransformPayloadTranslator());
+    }
+  }
+
+  /** Same behavior as {@link TestTransform} but translated under a second URN. */
+  static class TestTransform2 extends TestTransform {
+    public TestTransform2(int testParam) {
+      super(testParam);
+    }
+  }
+
+  static class TestTransformPayloadTranslator2 extends TestTransformPayloadTranslator {
+    static final String URN = "beam:transform:test:transform_to_update2";
+
+    @Override
+    public String getUrn() {
+      return URN;
+    }
+  }
+
+  /** Registers {@link TestTransformPayloadTranslator2} for {@link TestTransform2}. */
+  @AutoService(TransformPayloadTranslatorRegistrar.class)
+  public static class Registrar2 implements TransformPayloadTranslatorRegistrar {
+    @Override
+    public Map<Class<? extends PTransform>, TestTransformPayloadTranslator2>
+        getTransformPayloadTranslators() {
+      return Collections.singletonMap(TestTransform2.class, new TestTransformPayloadTranslator2());
+    }
+  }
+
+  /**
+   * A fake expansion service client factory. The returned client "upgrades" the requested test
+   * transform by doubling the parameter stored in its payload and marks it with an {@code
+   * already_upgraded} annotation.
+   */
+  static class TestExpansionServiceClientFactory implements ExpansionServiceClientFactory {
+    ExpansionApi.ExpansionResponse response;
+
+    @Override
+    public ExpansionServiceClient getExpansionServiceClient(
+        Endpoints.ApiServiceDescriptor endpoint) {
+      return new ExpansionServiceClient() {
+        @Override
+        public ExpansionApi.ExpansionResponse expand(ExpansionApi.ExpansionRequest request) {
+          RunnerApi.Components.Builder responseComponents = request.getComponents().toBuilder();
+          RunnerApi.PTransform transformToUpgrade =
+              request.getComponents().getTransformsMap().get("TransformUpgraderTest-TestTransform");
+          ByteString alreadyUpgraded = ByteString.empty();
+          try {
+            alreadyUpgraded = transformToUpgrade.getAnnotationsOrThrow("already_upgraded");
+          } catch (Exception e) {
+            // Ignore: the annotation is absent until the first transform has been upgraded.
+          }
+          if (!alreadyUpgraded.isEmpty()) {
+            // The first transform was handled by an earlier request; upgrade the second one.
+            transformToUpgrade =
+                request
+                    .getComponents()
+                    .getTransformsMap()
+                    .get("TransformUpgraderTest-TestTransform2");
+          }
+          if (!transformToUpgrade
+              .getSpec()
+              .getUrn()
+              .equals(request.getTransform().getSpec().getUrn())) {
+            throw new RuntimeException("Could not find a valid transform to upgrade");
+          }
+
+          Integer oldParam;
+          try {
+            ByteArrayInputStream byteArrayInputStream =
+                new ByteArrayInputStream(transformToUpgrade.getSpec().getPayload().toByteArray());
+            ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
+            oldParam = (Integer) objectInputStream.readObject();
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+
+          RunnerApi.PTransform.Builder upgradedTransform = transformToUpgrade.toBuilder();
+          FunctionSpec.Builder specBuilder = upgradedTransform.getSpecBuilder();
+
+          ByteStringOutputStream byteStringOutputStream = new ByteStringOutputStream();
+          try {
+            ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteStringOutputStream);
+            objectOutputStream.writeObject(oldParam * 2);
+            objectOutputStream.flush();
+            specBuilder.setPayload(byteStringOutputStream.toByteString());
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+
+          upgradedTransform.setSpec(specBuilder.build());
+          upgradedTransform.putAnnotations(
+              "already_upgraded",
+              ByteString.copyFrom("dummyvalue".getBytes(StandardCharsets.UTF_8)));
+
+          response =
+              ExpansionApi.ExpansionResponse.newBuilder()
+                  .setComponents(responseComponents.build())
+                  .setTransform(upgradedTransform.build())
+                  .build();
+          return response;
+        }
+
+        @Override
+        public ExpansionApi.DiscoverSchemaTransformResponse discover(
+            ExpansionApi.DiscoverSchemaTransformRequest request) {
+          return null;
+        }
+
+        @Override
+        public void close() throws Exception {
+          // do nothing
+        }
+      };
+    }
+
+    @Override
+    public void close() throws Exception {
+      // do nothing
+    }
+  }
+
+  /** Asserts that the Java-serialized payload of the given transform equals the expected value. */
+  private void validateTestParam(RunnerApi.PTransform updatedTestTransform, Integer expectedValue) {
+    Integer updatedParam;
+    try {
+      ByteArrayInputStream byteArrayInputStream =
+          new ByteArrayInputStream(updatedTestTransform.getSpec().getPayload().toByteArray());
+      ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
+      updatedParam = (Integer) objectInputStream.readObject();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    assertEquals(Integer.valueOf(expectedValue), updatedParam);
+  }
+
+  @Test
+  public void testTransformUpgrade() throws Exception {
+    Pipeline pipeline = Pipeline.create();
+    pipeline
+        .apply(Create.of(1, 2, 3))
+        .apply(new TestTransform(2))
+        .apply(ToString.elements())
+        .apply(TextIO.write().to("dummyfilename"));
+
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, false);
+    ExternalTranslationOptions options =
+        PipelineOptionsFactory.create().as(ExternalTranslationOptions.class);
+    List<String> urnsToOverride = ImmutableList.of(TestTransformPayloadTranslator.URN);
+    options.setTransformsToOverride(urnsToOverride);
+    options.setTransformServiceAddress("dummyaddress");
+
+    RunnerApi.Pipeline upgradedPipelineProto =
+        TransformUpgrader.of(new TestExpansionServiceClientFactory())
+            .upgradeTransformsViaTransformService(pipelineProto, urnsToOverride, options);
+
+    RunnerApi.PTransform upgradedTransform =
+        upgradedPipelineProto
+            .getComponents()
+            .getTransformsMap()
+            .get("TransformUpgraderTest-TestTransform");
+
+    validateTestParam(upgradedTransform, 4);
+  }
+
+  @Test
+  public void testTransformUpgradeMultipleOccurrences() throws Exception {
+    Pipeline pipeline = Pipeline.create();
+    pipeline
+        .apply(Create.of(1, 2, 3))
+        .apply(new TestTransform(2))
+        .apply(ToString.elements())
+        .apply(TextIO.write().to("dummyfilename"));
+    pipeline
+        .apply(Create.of(1, 2, 3))
+        .apply(new TestTransform(2))
+        .apply(ToString.elements())
+        .apply(TextIO.write().to("dummyfilename"));
+
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, false);
+    ExternalTranslationOptions options =
+        PipelineOptionsFactory.create().as(ExternalTranslationOptions.class);
+    List<String> urnsToOverride = ImmutableList.of(TestTransformPayloadTranslator.URN);
+    options.setTransformsToOverride(urnsToOverride);
+    options.setTransformServiceAddress("dummyaddress");
+
+    RunnerApi.Pipeline upgradedPipelineProto =
+        TransformUpgrader.of(new TestExpansionServiceClientFactory())
+            .upgradeTransformsViaTransformService(pipelineProto, urnsToOverride, options);
+
+    RunnerApi.PTransform upgradedTransform1 =
+        upgradedPipelineProto
+            .getComponents()
+            .getTransformsMap()
+            .get("TransformUpgraderTest-TestTransform");
+    validateTestParam(upgradedTransform1, 4);
+
+    RunnerApi.PTransform upgradedTransform2 =
+        upgradedPipelineProto
+            .getComponents()
+            .getTransformsMap()
+            .get("TransformUpgraderTest-TestTransform2");
+    validateTestParam(upgradedTransform2, 4);
+  }
+
+  @Test
+  public void testTransformUpgradeMultipleURNs() throws Exception {
+    Pipeline pipeline = Pipeline.create();
+    pipeline
+        .apply(Create.of(1, 2, 3))
+        .apply(new TestTransform(2))
+        .apply(ToString.elements())
+        .apply(TextIO.write().to("dummyfilename"));
+    pipeline
+        .apply(Create.of(1, 2, 3))
+        .apply(new TestTransform2(2))
+        .apply(ToString.elements())
+        .apply(TextIO.write().to("dummyfilename"));
+
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, false);
+    ExternalTranslationOptions options =
+        PipelineOptionsFactory.create().as(ExternalTranslationOptions.class);
+    List<String> urnsToOverride =
+        ImmutableList.of(TestTransformPayloadTranslator.URN, TestTransformPayloadTranslator2.URN);
+    options.setTransformsToOverride(urnsToOverride);
+    options.setTransformServiceAddress("dummyaddress");
+
+    RunnerApi.Pipeline upgradedPipelineProto =
+        TransformUpgrader.of(new TestExpansionServiceClientFactory())
+            .upgradeTransformsViaTransformService(pipelineProto, urnsToOverride, options);
+
+    RunnerApi.PTransform upgradedTransform1 =
+        upgradedPipelineProto
+            .getComponents()
+            .getTransformsMap()
+            .get("TransformUpgraderTest-TestTransform");
+    validateTestParam(upgradedTransform1, 4);
+
+    RunnerApi.PTransform upgradedTransform2 =
+        upgradedPipelineProto
+            .getComponents()
+            .getTransformsMap()
+            .get("TransformUpgraderTest-TestTransform2");
+    validateTestParam(upgradedTransform2, 4);
+  }
+}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index b725bfbb8d40..6d42d0c3b485 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -1446,7 +1446,7 @@ private static class CreateStreamingFlinkViewPayloadTranslator private CreateStreamingFlinkViewPayloadTranslator() {} @Override - public String getUrn(CreateStreamingFlinkView.CreateFlinkPCollectionView transform) { + public String getUrn() { return CreateStreamingFlinkView.CREATE_STREAMING_FLINK_VIEW_URN; } } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 17aea34045ff..26548038a1df 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -2572,6 +2572,12 @@ public String getUrn(PTransform transform) { return "dataflow_stub:" + transform.getClass().getName(); } + @Override + public String getUrn() { + throw new UnsupportedOperationException( + "URN of DataflowPayloadTranslator depends on the transform.
Please use 'getUrn(PTransform transform)' instead."); + } + @Override public RunnerApi.FunctionSpec translate( AppliedPTransform> application, SdkComponents components) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java index facbbb3f1b44..140858d88c04 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java @@ -157,7 +157,7 @@ public static PTransformTranslation.TransformPayloadTranslator create() { private PayloadTranslator() {} @Override - public String getUrn(ParDoSingle transform) { + public String getUrn() { return PAR_DO_TRANSFORM_URN; } diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 3c50ae6019f8..078f25e0e38e 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -1632,7 +1632,7 @@ public PCollection expand(PCollection input) { private static class TestTransformTranslator implements TransformPayloadTranslator { @Override - public String getUrn(TestTransform transform) { + public String getUrn() { return "test_transform"; } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPublishView.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPublishView.java index 9a50d3d579ac..a3ebbffef9a8 100644 --- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPublishView.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPublishView.java @@ -59,7 +59,7 @@ static class SamzaPublishViewPayloadTranslator SamzaPublishViewPayloadTranslator() {} @Override - public String getUrn(SamzaPublishView transform) { + public String getUrn() { return SAMZA_PUBLISH_VIEW_URN; } } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index eaa267375db3..266b67798a22 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -636,7 +636,7 @@ private static class SparkConsoleIOWriteUnboundedPayloadTranslator ConsoleIO.Write.Unbound> { @Override - public String getUrn(ConsoleIO.Write.Unbound transform) { + public String getUrn() { return ConsoleIO.Write.Unbound.TRANSFORM_URN; } } @@ -645,7 +645,7 @@ private static class SparkCreateStreamPayloadTranslator extends PTransformTranslation.TransformPayloadTranslator.NotSerializable> { @Override - public String getUrn(CreateStream transform) { + public String getUrn() { return CreateStream.TRANSFORM_URN; } } diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java index 6b52f8d1245e..ec53e3f11e43 100644 --- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java @@ -18,10 +18,12 @@ package 
org.apache.beam.sdk.expansion.service; import static org.apache.beam.runners.core.construction.BeamUrns.getUrn; +import static org.apache.beam.runners.core.construction.PTransformTranslation.READ_TRANSFORM_URN; import static org.apache.beam.runners.core.construction.resources.PipelineResources.detectClassPathResourcesToStage; import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; import com.google.auto.service.AutoService; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; @@ -45,10 +47,12 @@ import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.SchemaApi; import org.apache.beam.runners.core.construction.Environments; +import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; import org.apache.beam.runners.core.construction.PipelineTranslation; import org.apache.beam.runners.core.construction.RehydratedComponents; import org.apache.beam.runners.core.construction.SdkComponents; import org.apache.beam.runners.core.construction.SplittableParDo; +import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar; import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; @@ -89,8 +93,10 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Converter; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import 
org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; @@ -178,6 +184,65 @@ public List getDependencies( } } + List deprecatedTransformURNs = ImmutableList.of(READ_TRANSFORM_URN); + for (TransformPayloadTranslatorRegistrar registrar : + ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) { + for (Map.Entry, ? extends TransformPayloadTranslator> + entry : registrar.getTransformPayloadTranslators().entrySet()) { + @Initialized TransformPayloadTranslator translator = entry.getValue(); + if (translator == null) { + continue; + } + + String urn; + try { + urn = translator.getUrn(); + if (urn == null) { + LOG.debug( + "Could not load the TransformPayloadTranslator " + + translator + + " to the Expansion Service since it did not produce a unique URN."); + continue; + } + } catch (Exception e) { + LOG.info( + "Could not load the TransformPayloadTranslator " + + translator + + " to the Expansion Service."); + continue; + } + + if (deprecatedTransformURNs.contains(urn)) { + continue; + } + final String finalUrn = urn; + TransformProvider transformProvider = + spec -> { + try { + ExternalConfigurationPayload payload = + ExternalConfigurationPayload.parseFrom(spec.getPayload()); + Row configRow = + RowCoder.of(SchemaTranslation.schemaFromProto(payload.getSchema())) + .decode(new ByteArrayInputStream(payload.getPayload().toByteArray())); + PTransform transformFromRow = translator.fromConfigRow(configRow); + if (transformFromRow != null) { + return transformFromRow; + } else { + throw new RuntimeException( + String.format( + "A transform cannot be initiated using the provided config row %s and the TransformPayloadTranslator %s", + configRow, translator)); + } + } catch (Exception e) { + throw new RuntimeException( + String.format("Failed to build transform %s from spec %s", finalUrn, spec), + e); + } + }; + 
builder.put(finalUrn, transformProvider); + } + } + return builder.build(); } diff --git a/sdks/java/harness/build.gradle b/sdks/java/harness/build.gradle index f157cbadee57..25d6b2ac4040 100644 --- a/sdks/java/harness/build.gradle +++ b/sdks/java/harness/build.gradle @@ -29,6 +29,7 @@ dependencies { // :sdks:java:core and transitive dependencies provided project(path: ":model:pipeline", configuration: "shadow") provided project(path: ":sdks:java:core", configuration: "shadow") + provided project(path: ":sdks:java:transform-service:launcher", configuration: "shadow") provided library.java.joda_time provided library.java.slf4j_api provided library.java.vendored_grpc_1_54_0 @@ -80,6 +81,7 @@ dependencies { implementation project(":runners:core-construction-java") implementation project(":runners:core-java") implementation project(":sdks:java:fn-execution") + permitUnusedDeclared project(path: ":sdks:java:transform-service:launcher") testImplementation library.java.junit testImplementation library.java.mockito_core shadowTestRuntimeClasspath project(path: ":sdks:java:core", configuration: "shadowTest") diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java index c8214529d580..4722a3833fa9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java @@ -52,10 +52,7 @@ static class PubSubReadPayloadTranslator implements TransformPayloadTranslator> { @Override - public String getUrn(Read.Unbounded transform) { - if (!(transform.getSource() instanceof PubsubUnboundedSource.PubsubSource)) { - return null; - } + public String getUrn() { return PTransformTranslation.PUBSUB_READ; } @@ -106,7 +103,7 @@ public 
RunnerApi.FunctionSpec translate( static class PubSubWritePayloadTranslator implements TransformPayloadTranslator { @Override - public String getUrn(PubsubUnboundedSink.PubsubSink transform) { + public String getUrn() { return PTransformTranslation.PUBSUB_WRITE; } @@ -140,7 +137,7 @@ public RunnerApi.FunctionSpec translate( static class PubSubDynamicWritePayloadTranslator implements TransformPayloadTranslator { @Override - public String getUrn(PubsubUnboundedSink.PubsubDynamicSink transform) { + public String getUrn() { return PTransformTranslation.PUBSUB_WRITE_DYNAMIC; }