Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade transforms without upgrading the pipelines #28210

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions runners/core-construction-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":sdks:java:extensions:avro")
implementation project(path: ":sdks:java:fn-execution")
implementation project(path: ":sdks:java:transform-service:launcher")
implementation library.java.vendored_grpc_1_54_0
implementation library.java.vendored_guava_32_1_2_jre
implementation library.java.classgraph
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public static class CombinePerKeyPayloadTranslator
private CombinePerKeyPayloadTranslator() {}

@Override
public String getUrn(Combine.PerKey<?, ?, ?> transform) {
public String getUrn() {
return COMBINE_PER_KEY_TRANSFORM_URN;
}

Expand Down Expand Up @@ -108,7 +108,7 @@ public static class CombineGloballyPayloadTranslator
private CombineGloballyPayloadTranslator() {}

@Override
public String getUrn(Combine.Globally<?, ?> transform) {
public String getUrn() {
return COMBINE_GLOBALLY_TRANSFORM_URN;
}

Expand Down Expand Up @@ -165,7 +165,7 @@ public static class CombineGroupedValuesPayloadTranslator
private CombineGroupedValuesPayloadTranslator() {}

@Override
public String getUrn(Combine.GroupedValues<?, ?, ?> transform) {
public String getUrn() {
return COMBINE_GROUPED_VALUES_TRANSFORM_URN;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public static <ElemT, ViewT> PCollectionView<ViewT> getView(
static class CreatePCollectionViewTranslator
implements TransformPayloadTranslator<View.CreatePCollectionView<?, ?>> {
@Override
public String getUrn(View.CreatePCollectionView<?, ?> transform) {
public String getUrn() {
return PTransformTranslation.CREATE_VIEW_TRANSFORM_URN;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ public OutputT expand(InputT input) {
response
.getComponents()
.toBuilder()
.putAllEnvironments(resolveArtifacts(newEnvironmentsWithDependencies))
.putAllEnvironments(resolveArtifacts(newEnvironmentsWithDependencies, endpoint))
.build();
expandedTransform = response.getTransform();
expandedRequirements = response.getRequirementsList();
Expand Down Expand Up @@ -338,8 +338,8 @@ public OutputT expand(InputT input) {
return toOutputCollection(outputMapBuilder.build());
}

private Map<String, RunnerApi.Environment> resolveArtifacts(
Map<String, RunnerApi.Environment> environments) {
static Map<String, RunnerApi.Environment> resolveArtifacts(
Map<String, RunnerApi.Environment> environments, Endpoints.ApiServiceDescriptor endpoint) {
if (environments.size() == 0) {
return environments;
}
Expand Down Expand Up @@ -367,7 +367,7 @@ private Map<String, RunnerApi.Environment> resolveArtifacts(
}
}

private RunnerApi.Environment resolveArtifacts(
private static RunnerApi.Environment resolveArtifacts(
ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceBlockingStub retrievalStub,
RunnerApi.Environment environment)
throws IOException {
Expand All @@ -378,7 +378,7 @@ private RunnerApi.Environment resolveArtifacts(
.build();
}

private List<RunnerApi.ArtifactInformation> resolveArtifacts(
private static List<RunnerApi.ArtifactInformation> resolveArtifacts(
ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceBlockingStub retrievalStub,
List<RunnerApi.ArtifactInformation> artifacts)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.construction;

import java.util.List;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.resourcehints.ResourceHintsOptions.EmptyListDefault;

public interface ExternalTranslationOptions extends PipelineOptions {

@Description("Set of URNs of transforms to be overriden using the transform service.")
@Default.InstanceFactory(EmptyListDefault.class)
List<String> getTransformsToOverride();

void setTransformsToOverride(List<String> transformsToOverride);

@Description("Address of an already available transform service.")
String getTransformServiceAddress();

void setTransformServiceAddress(String transformServiceAddress);

@Description("An available Beam version which will be used to start a transform service.")
String getTransformServiceBeamVersion();

void setTransformServiceBeamVersion(String transformServiceBeamVersion);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.construction;

import com.google.auto.service.AutoService;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;

/** A registrar for ExternalTranslationOptions. */
@AutoService(PipelineOptionsRegistrar.class)
@Internal
public class ExternalTranslationOptionsRegistrar implements PipelineOptionsRegistrar {
@Override
public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
return ImmutableList.<Class<? extends PipelineOptions>>builder()
.add(ExternalTranslationOptions.class)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static TransformPayloadTranslator create() {
private FlattenTranslator() {}

@Override
public String getUrn(Flatten.PCollections<?> transform) {
public String getUrn() {
return PTransformTranslation.FLATTEN_TRANSFORM_URN;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class GroupByKeyTranslation {

static class GroupByKeyTranslator implements TransformPayloadTranslator<GroupByKey<?, ?>> {
@Override
public String getUrn(GroupByKey<?, ?> transform) {
public String getUrn() {
return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class GroupIntoBatchesTranslation {
static class GroupIntoBatchesTranslator
implements TransformPayloadTranslator<GroupIntoBatches<?, ?>> {
@Override
public String getUrn(GroupIntoBatches<?, ?> transform) {
public String getUrn() {
return PTransformTranslation.GROUP_INTO_BATCHES_URN;
}

Expand All @@ -61,7 +61,7 @@ public RunnerApi.FunctionSpec translate(
static class ShardedGroupIntoBatchesTranslator
implements TransformPayloadTranslator<GroupIntoBatches<?, ?>.WithShardedKey> {
@Override
public String getUrn(GroupIntoBatches<?, ?>.WithShardedKey transform) {
public String getUrn() {
return PTransformTranslation.GROUP_INTO_BATCHES_WITH_SHARDED_KEY_URN;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
public class ImpulseTranslation {
private static class ImpulseTranslator implements TransformPayloadTranslator<Impulse> {
@Override
public String getUrn(Impulse transform) {
public String getUrn() {
return PTransformTranslation.IMPULSE_TRANSFORM_URN;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
Expand All @@ -37,14 +38,18 @@
import org.apache.beam.runners.core.construction.ExternalTranslation.ExternalTranslator;
import org.apache.beam.runners.core.construction.ParDoTranslation.ParDoTranslator;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.common.ReflectHelpers.ObjectsClassComparator;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
Expand All @@ -54,6 +59,8 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Utilities for converting {@link PTransform PTransforms} to {@link RunnerApi Runner API protocol
Expand All @@ -65,10 +72,14 @@
"keyfor"
}) // TODO(https://github.com/apache/beam/issues/20497)
public class PTransformTranslation {

private static final Logger LOG = LoggerFactory.getLogger(PTransformTranslation.class);

// We specifically copy the values here so that they can be used in switch case statements
// and we validate that the value matches the actual URN in the static block below.

// Primitives
public static final String CREATE_TRANSFORM_URN = "beam:transform:create:v1";
public static final String PAR_DO_TRANSFORM_URN = "beam:transform:pardo:v1";
public static final String FLATTEN_TRANSFORM_URN = "beam:transform:flatten:v1";
public static final String GROUP_BY_KEY_TRANSFORM_URN = "beam:transform:group_by_key:v1";
Expand All @@ -83,6 +94,10 @@ public class PTransformTranslation {
public static final ImmutableSet<String> RUNNER_IMPLEMENTED_TRANSFORMS =
ImmutableSet.of(GROUP_BY_KEY_TRANSFORM_URN, IMPULSE_TRANSFORM_URN);

public static final String CONFIG_ROW_KEY = "config_row";

public static final String CONFIG_ROW_SCHEMA_KEY = "config_row_schema";

// DeprecatedPrimitives
/**
* @deprecated SDKs should move away from creating `Read` transforms and migrate to using Impulse
Expand Down Expand Up @@ -435,10 +450,9 @@ public RunnerApi.PTransform translate(
RunnerApi.PTransform.Builder transformBuilder =
translateAppliedPTransform(appliedPTransform, subtransforms, components);

FunctionSpec spec =
KNOWN_PAYLOAD_TRANSLATORS
.get(appliedPTransform.getTransform().getClass())
.translate(appliedPTransform, components);
TransformPayloadTranslator payloadTranslator =
KNOWN_PAYLOAD_TRANSLATORS.get(appliedPTransform.getTransform().getClass());
FunctionSpec spec = payloadTranslator.translate(appliedPTransform, components);
if (spec != null) {
transformBuilder.setSpec(spec);

Expand All @@ -461,6 +475,38 @@ public RunnerApi.PTransform translate(
}
}
}

Row configRow = null;
try {
configRow = payloadTranslator.toConfigRow(appliedPTransform.getTransform());
} catch (UnsupportedOperationException e) {
// Optional toConfigRow() has not been implemented. We can just ignore.
} catch (Exception e) {
LOG.warn(
"Could not attach the config row for transform "
+ appliedPTransform.getTransform().getName()
+ ": "
+ e);
// Ignoring the error and continuing with the translation since attaching config rows is
// optional.
}
if (configRow != null) {
ByteStringOutputStream rowOutputStream = new ByteStringOutputStream();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's a CoderUtils that does this for you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

try {
RowCoder.of(configRow.getSchema()).encode(configRow, rowOutputStream);
} catch (IOException e) {
throw new RuntimeException(e);
}
transformBuilder.putAnnotations(CONFIG_ROW_KEY, rowOutputStream.toByteString());

ByteStringOutputStream schemaOutputStream = new ByteStringOutputStream();
try (ObjectOutputStream schemaObjOut = new ObjectOutputStream(schemaOutputStream)) {
schemaObjOut.writeObject(SchemaTranslation.schemaToProto(configRow.getSchema(), true));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SchemaTranslation.schemaToProto is a Proto object. Just use toByteArray to serialize this proto. You can use ByteString.copyFrom to go from byte[] to ByteString and not have to deal with streams at all.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

schemaObjOut.flush();
transformBuilder.putAnnotations(CONFIG_ROW_SCHEMA_KEY, schemaOutputStream.toByteString());
}
}

return transformBuilder.build();
}
}
Expand Down Expand Up @@ -508,14 +554,63 @@ static RunnerApi.PTransform.Builder translateAppliedPTransform(
*
* <p>When going to a protocol buffer message, the translator produces a payload corresponding to
* the Java representation while registering components that payload references.
*
* <p>Also, provides methods for generating a Row-based constructor config for the transform that
* can be later used to re-construct the transform.
*/
public interface TransformPayloadTranslator<T extends PTransform<?, ?>> {
String getUrn(T transform);

/**
* Provides a unique URN for transforms represented by this {@code TransformPayloadTranslator}.
*/
String getUrn();

/**
* Same as {@link #getUrn()} but the returned URN may depend on the transform provided.
*
* <p>Only override this if the same {@code TransformPayloadTranslator} used for multiple
* transforms. Otherwise, use {@link #getUrn()}.
*/
default String getUrn(T transform) {
return getUrn();
}

/** */
/**
* Translates the given transform represented by the provided {@code AppliedPTransform} to a
* {@code FunctionSpec} with a URN and a payload.
*
* @param application an {@code AppliedPTransform} that includes the transform to be expanded.
* @param components components of the pipeline that includes the transform.
* @return a generated spec for the transform to be included in the pipeline proto. If return
* value is null, transform should include an empty spec.
* @throws IOException
*/
@Nullable
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Document what a null return value means.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

FunctionSpec translate(AppliedPTransform<?, ?, T> application, SdkComponents components)
throws IOException;

/**
* Generates a Row-based construction configuration for the provided transform.
*
* @param transform a transform represented by the current {@code TransformPayloadTranslator}.
* @return
*/
default Row toConfigRow(T transform) {
throw new UnsupportedOperationException("Not implemented");
}

/**
* Construts a transform from a provided Row-based construction configuration.
*
* @param configRow a construction configuration similar to what would be generated by the
* {@link #toConfigRow(PTransform)} method.
* @return a transform represented by the current {@code TransformPayloadTranslator}.
*/
default T fromConfigRow(Row configRow) {
throw new UnsupportedOperationException("Not implemented");
}

/**
* A {@link TransformPayloadTranslator} for transforms that contain no references to components,
* so they do not need a specialized rehydration.
Expand All @@ -526,7 +621,7 @@ abstract class NotSerializable<T extends PTransform<?, ?>>
public static NotSerializable<?> forUrn(final String urn) {
return new NotSerializable<PTransform<?, ?>>() {
@Override
public String getUrn(PTransform<?, ?> transform) {
public String getUrn() {
return urn;
}
};
Expand Down
Loading
Loading