Upgrade transforms without upgrading the pipelines #28210
@@ -62,6 +64,11 @@ private CombinePerKeyPayloadTranslator() {}

@Override
public String getUrn(Combine.PerKey<?, ?, ?> transform) {
return getUrn();
Should this be in the base class?
Yeah, moved.
@@ -84,6 +91,16 @@ public FunctionSpec translate(
}
}

@Override
public @Nullable Row toConfigRow(PTransform<?, ?> pTransform) {
I wonder if these should be in an optional interface, or if default null-returning implementations should be provided. (We could argue that it should be strongly encouraged.)
I don't think these need a separate interface, but good point regarding providing default implementations. Made "toConfigRow" and "fromConfigRow" default methods of the interface so that subclasses can choose not to implement them.
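A minimal sketch of the default-method approach described in the reply above, assuming a simplified translator interface. `ConfigurableTranslator`, `GreetTransform`, the two translator classes, and the URN `example:greet:v1` are hypothetical, and `Map<String, Object>` stands in for Beam's `Row`; this is not Beam's actual translator interface.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a payload translator interface. Map<String, Object>
// plays the role of Beam's schema Row purely for illustration.
interface ConfigurableTranslator<T> {
  String getUrn();

  // Default: no config row is exposed, so implementations may skip this method.
  default Map<String, Object> toConfigRow(T transform) {
    return null;
  }

  // Default: the transform cannot be rebuilt from a config row.
  default T fromConfigRow(Map<String, Object> configRow) {
    return null;
  }
}

// Hypothetical transform with a single configuration field.
final class GreetTransform {
  final String greeting;

  GreetTransform(String greeting) {
    this.greeting = greeting;
  }
}

// Relies on the defaults: only the URN is implemented.
final class MinimalGreetTranslator implements ConfigurableTranslator<GreetTransform> {
  @Override
  public String getUrn() {
    return "example:greet:v1";
  }
}

// Opts in to config-row support by overriding both defaults.
final class FullGreetTranslator implements ConfigurableTranslator<GreetTransform> {
  @Override
  public String getUrn() {
    return "example:greet:v1";
  }

  @Override
  public Map<String, Object> toConfigRow(GreetTransform transform) {
    Map<String, Object> row = new HashMap<>();
    row.put("greeting", transform.greeting);
    return row;
  }

  @Override
  public GreetTransform fromConfigRow(Map<String, Object> configRow) {
    return new GreetTransform((String) configRow.get("greeting"));
  }
}
```

With this shape, `MinimalGreetTranslator` compiles without mentioning config rows at all, and callers treat a `null` result as "no config row available".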
@Override
public @Nullable Row toConfigRow(PTransform<?, ?> pTransform) {
Impulse impulse = (Impulse) pTransform;
System.out.println("Found impulse transform: " + impulse);
Remove debugging.
Done.
}

ExpandableTransform externalTransform =
External.of(urn, payload.toByteArray(), serviceAddress);
serviceAddress might be unset. (Perhaps don't set it to null above to let the compiler check this for you.)
Added a check to confirm that it's not null (it should either be provided by the user or be the address of the Transform Service we start here).
(Also this logic was moved to PipelineTranslator).
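The null check discussed above could look roughly like this. `ServiceAddressResolver` and its parameters are hypothetical; the point is that a `final` local without an initializer lets the compiler verify that every branch assigns the address, and `Objects.requireNonNull` fails fast if both sources are missing.

```java
import java.util.Objects;

// Hypothetical helper: the address comes either from the user or from a
// Transform Service started locally.
final class ServiceAddressResolver {
  static String resolve(String userProvidedAddress, String startedServiceAddress) {
    // Declared without "= null", so javac checks that every path assigns it.
    final String serviceAddress;
    if (userProvidedAddress != null && !userProvidedAddress.isEmpty()) {
      serviceAddress = userProvidedAddress;
    } else {
      serviceAddress = startedServiceAddress;
    }
    // Fail fast with a descriptive message instead of passing null further down.
    return Objects.requireNonNull(
        serviceAddress,
        "Transform Service address must be provided by the user or by a started service");
  }
}
```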
}
externalTransform.expand(input);

return externalTransform.getExpandedTransform().getSpec();
How do the outputs get wired up correctly?
Added logic to wire inputs and outputs correctly (in PipelineTranslator now).
.../expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
continue;
}

String urn = null;
I think we can leave this unset and the compiler is smart enough to see the assignment or continue.
This code was reverted.
I'm still seeing it.
Should be done now.
try {
urn = translator.getUrn();
if (urn == null) {
LOG.info(
Maybe the warning should be softer, saying that this translator does not have a transform-independent URN?
Yeah, changed to debug and updated.
+ " to the Expansion Service.");
continue;
}
} catch (Exception e) {
Why would this happen? It feels like this is exceptional enough to propagate up.
This is a general catch-all to prevent a single incorrectly implemented PayloadTranslator available in the system from crashing the whole service. I think it's safer to catch such exceptions and continue loading the rest of the transforms (we do the same for schema-aware transforms). Note that some of these implementations may come from custom/user libraries.
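The two points in this thread (leaving `urn` unassigned and catching per-translator failures) combine naturally. A minimal sketch, assuming each `Supplier<String>` stands in for a hypothetical `translator.getUrn()` call and `UrnCollector` is an illustrative name: javac accepts the use of `urn` after the `try` because every path either assigns it or `continue`s, and a translator that throws is skipped rather than aborting the loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class UrnCollector {
  // Collects the non-null URNs reported by a list of (hypothetical) translators.
  static List<String> collectUrns(List<Supplier<String>> urnSuppliers) {
    List<String> urns = new ArrayList<>();
    for (Supplier<String> supplier : urnSuppliers) {
      String urn; // No "= null": definite assignment is proven by the compiler.
      try {
        urn = supplier.get();
        if (urn == null) {
          // No transform-independent URN; skip this translator.
          continue;
        }
      } catch (RuntimeException e) {
        // Skip a broken translator instead of failing the whole service.
        continue;
      }
      urns.add(urn);
    }
    return urns;
  }
}
```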
.../expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java
Force-pushed from 1063fbf to c593512.
Thanks Robert. PTAL.
PTransform<?, ?> transform = appliedPTransform.getTransform();

FunctionSpec spec = null;
if (getUrn(transform) != null && urnsToOverride.contains(getUrn(transform))) {
I think translation time is a good fit for this since we are modifying the translated pipeline, but you are right that this has to be done at a higher level since otherwise it's impossible to wire inputs and outputs correctly.
I moved this logic to PipelineTranslator and fixed the wiring.
Also confirmed that this works for Dataflow by running a pipeline that upgrades the Create transform to a version available in an expansion service. cc: @kennknowles
}

@Override
public Create.@Nullable Values<?> fromConfigRow(Row configRow) {
Stray @Nullable
(I don't think this should be nullable.)
This class was reverted.
}

@Override
public @Nullable Row toConfigRow(Values<?> transform) {
Shouldn't be nullable.
This class was reverted.
import org.checkerframework.checker.nullness.qual.Nullable;

@SuppressWarnings({
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
Can we avoid suppressing rawTypes (or at least make it more local if it can't be avoided)?
Removed the suppression.
.forEach(
object -> {
encodedElements.add(toByteArray(object));
});
If you're going to be using the new functional API, maybe use streams + collect to list?
This class was reverted.
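The streams-based alternative suggested in the review, sketched with a hypothetical `EncodeElementsDemo.encodeAll` helper; the `encoder` function stands in for the `toByteArray` helper from the diff. Mapping and collecting avoids mutating an external list from inside `forEach`.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

final class EncodeElementsDemo {
  // Encodes each element with the given encoder and collects the results into a new list.
  static <T> List<byte[]> encodeAll(List<T> elements, Function<T, byte[]> encoder) {
    return elements.stream().map(encoder).collect(Collectors.toList());
  }
}
```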
configRow.getArray("values").stream()
.map(bytesValue -> fromByteArray((byte[]) bytesValue))
.collect(Collectors.toList()));
byte[] serializedCoder = configRow.getBytes("serialized_coder");
Would coder ever be null? Preferably we simply require it.
This class was reverted.
SdkComponents.create(
runnerAPIpipeline.getComponents(), runnerAPIpipeline.getRequirementsList());
String updatedTransformId =
updatedComponents.registerPTransform(updatedAppliedPTransform, Collections.emptyList());
Collections.emptyList() doesn't seem correct if this is a composite transform.
Reverted.
// old one.
// Also recording the newly generated id of the old (overridden) transform in the
// updatedRunnerApiComponents.
Map<String, Map<String, String>> transformInputUpdates = new HashMap<>();
Could you document what transformInputUpdates contains? String isn't very descriptive.
Reverted.
+ oldTransformIds.size()
+ " were updated.");
}
String oldTransformId = oldTransformIds.get(0);
Shouldn't we already know exactly which transform we're updating? This seems a roundabout way of getting it, and it will break if a transform doesn't have outputs.
Reverted.
updaterRootTransformIds.addAll(runnerAPIpipeline.getRootTransformIdsList());
if (updaterRootTransformIds.contains(oldTransformId)) {
updaterRootTransformIds.remove(oldTransformId);
updaterRootTransformIds.add(updatedTransformId);
I suppose the order does not matter here?
Updated logic should preserve the order.
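A small sketch of replacing a root transform ID while preserving order; `RootIdReplacer` is a hypothetical helper. Unlike the remove-then-add sequence in the excerpt above, which moves the updated ID to the end of the list, `List.set` keeps it at its original position.

```java
import java.util.ArrayList;
import java.util.List;

final class RootIdReplacer {
  // Returns a copy of rootIds with oldId replaced by newId at the same index,
  // so the relative order of the root transforms is unchanged.
  static List<String> replacePreservingOrder(List<String> rootIds, String oldId, String newId) {
    List<String> updated = new ArrayList<>(rootIds);
    int index = updated.indexOf(oldId);
    if (index >= 0) {
      updated.set(index, newId);
    }
    return updated;
  }
}
```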
// Will contain the outputs of the upgraded transform.
Map<TupleTag<?>, PCollection<?>> newOutputs = new HashMap<>();

if (output instanceof PCollectionTuple) {
This code deals with both the Pipeline object and the Pipeline proto, and it's a bit hard to follow where the source of truth is. My personal preference would be to simply act on the protos, consuming the expansion service response directly. Honestly, I don't think that'd be any more complicated than what you're already having to do here.
Makes sense. Updated code (now in TransformUpgrader) to directly update the proto and directly invoke the TransformService using protos instead of going through External.java.
Force-pushed from bc993a3 to d5fd742.
Thanks Robert. PTAL.
@@ -2572,6 +2572,11 @@ public String getUrn(PTransform transform) {
return "dataflow_stub:" + transform.getClass().getName();
}

@Override
public String getUrn() {
The access modifier of an interface method is public, and Java doesn't allow reducing access in subclasses.
With the updated interface, callers should be able to invoke getUrn() directly unless the subclass cannot determine the URN without looking at the transform (which should be rare; we should not use the same translator for multiple transform classes).
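A sketch of the interface shape described in the reply, with hypothetical names (`UrnTranslator`, `CountTranslator`, `StubTranslator`) and URNs: the transform-independent `getUrn()` is abstract, the per-transform overload defaults to it, and only a translator whose URN depends on the concrete transform overrides the overload. Returning `null` from `StubTranslator.getUrn()` is an assumption that mirrors the `urn == null` check in the expansion service excerpt.

```java
// Hypothetical translator interface with a transform-independent URN and a
// per-transform overload that delegates to it by default.
interface UrnTranslator<T> {
  // Interface methods are implicitly public; implementations cannot narrow this.
  String getUrn();

  default String getUrn(T transform) {
    return getUrn();
  }
}

final class CountTransform {}

// Common case: the URN does not depend on the transform instance.
final class CountTranslator implements UrnTranslator<CountTransform> {
  @Override
  public String getUrn() {
    return "example:count:v1";
  }
}

// Rare case: the URN depends on the concrete transform, so the overload is overridden.
final class StubTranslator implements UrnTranslator<Object> {
  @Override
  public String getUrn() {
    // No transform-independent URN is available for this translator.
    return null;
  }

  @Override
  public String getUrn(Object transform) {
    return "stub:" + transform.getClass().getName();
  }
}
```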
/**
 * Translates the given transform represented by the provided {@code AppliedPTransform} to a
 * {@code FunctionSpec} with a URN and a payload.
 */
@Nullable
Done.
Force-pushed from d0fd5df to 6a85535.
Thanks, this is looking pretty good. Only some minor comments.
// optional.
}
if (configRow != null) {
ByteStringOutputStream rowOutputStream = new ByteStringOutputStream();
I think there's a CoderUtils that does this for you.
Updated.
ByteStringOutputStream schemaOutputStream = new ByteStringOutputStream();
try (ObjectOutputStream schemaObjOut = new ObjectOutputStream(schemaOutputStream)) {
schemaObjOut.writeObject(SchemaTranslation.schemaToProto(configRow.getSchema(), true));
SchemaTranslation.schemaToProto returns a proto object. Just use toByteArray to serialize this proto. You can use ByteString.copyFrom to go from byte[] to ByteString and not have to deal with streams at all.
Updated.
RunnerApi.Pipeline updateTransformViaTransformService(
RunnerApi.Pipeline runnerAPIpipeline,
String transformId,
Endpoints.ApiServiceDescriptor transformServiceEndpoint)
Perhaps create the client once and pass it here, rather than passing the endpoint and re-connecting it every time?
"DefaultExpansionServiceClientFactory" caches channels for endpoints. So I don't think this is an issue.
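For illustration, per-endpoint caching of the kind attributed to `DefaultExpansionServiceClientFactory` above can be sketched with `ConcurrentHashMap.computeIfAbsent`. `CachingClientFactory` and its `connector` function are hypothetical and do not reflect that class's actual implementation; the sketch only shows why repeated calls with the same endpoint need not reconnect.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical cache: the first request for an endpoint creates a client,
// later requests for the same endpoint reuse it.
final class CachingClientFactory<C> {
  private final Map<String, C> clientsByEndpoint = new ConcurrentHashMap<>();
  private final Function<String, C> connector;

  CachingClientFactory(Function<String, C> connector) {
    this.connector = connector;
  }

  C getClient(String endpointUrl) {
    // computeIfAbsent applies the connector at most once per endpoint key.
    return clientsByEndpoint.computeIfAbsent(endpointUrl, connector);
  }
}
```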
*/
public RunnerApi.Pipeline upgradeTransformsViaTransformService(
RunnerApi.Pipeline pipeline, List<String> urnsToOverride, ExternalTranslationOptions options)
throws Exception {
Avoid throwing generic Exception. (And below.)
Done.
PTransform transformToUpgrade =
runnerAPIpipeline.getComponents().getTransformsMap().get(transformId);
if (transformToUpgrade == null) {
throw new Exception("Could not find a transform with the ID " + transformId);
Seems like a good candidate for Preconditions.checkNotNull(). Or throw something like IllegalArgumentException.
Updated to throw IllegalArgumentException.
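A minimal sketch of the change agreed on here, using a plain `Map` in place of the proto components' transforms map; `TransformLookup.getTransformOrThrow` is a hypothetical helper. A missing ID is a caller error, so an unchecked `IllegalArgumentException` fits better than a generic checked `Exception`.

```java
import java.util.Map;

final class TransformLookup {
  // Looks up a transform by ID and rejects unknown IDs with IllegalArgumentException.
  static <V> V getTransformOrThrow(Map<String, V> transformsById, String transformId) {
    V transform = transformsById.get(transformId);
    if (transform == null) {
      throw new IllegalArgumentException("Could not find a transform with the ID " + transformId);
    }
    return transform;
  }
}
```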
throw new RuntimeException(e);
}

Row configRow =
You already have configRowBytes, no need to encode it just to recode it.
Done.
clientFactory.getExpansionServiceClient(transformServiceEndpoint).expand(request);

if (!Strings.isNullOrEmpty(response.getError())) {
throw new IOException(String.format("expansion service error: %s", response.getError()));
IOException doesn't seem to be the right thing to throw for a mis-configured transform.
Updated to RuntimeException.
}
}

String newTransformId = transformId + "_upgraded";
We can't guarantee this won't collide with an existing ID. On the other hand, there's no need to give it a new name at all: just put the new transform in the map under the old transform's name (and all references will remain valid).
Done.
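A simplified sketch of why reusing the ID avoids the extra fix-ups: parents and the root list refer to transforms by ID, so overwriting the entry under the same ID keeps all those references valid. `SimpleTransform` and `InPlaceUpgrade` are hypothetical stand-ins for the proto components, and the sketch assumes the new sub-transform IDs do not clash with existing ones.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified transform: a URN plus the IDs of its sub-transforms.
final class SimpleTransform {
  final String urn;
  final List<String> subtransformIds;

  SimpleTransform(String urn, List<String> subtransformIds) {
    this.urn = urn;
    this.subtransformIds = subtransformIds;
  }
}

final class InPlaceUpgrade {
  // Returns a copy of the transforms map with the entry for transformId replaced and the
  // new sub-transforms added. Parent and root references to transformId stay valid.
  // Assumes the IDs in newSubtransforms do not clash with existing IDs.
  static Map<String, SimpleTransform> upgrade(
      Map<String, SimpleTransform> transforms,
      String transformId,
      SimpleTransform replacement,
      Map<String, SimpleTransform> newSubtransforms) {
    if (!transforms.containsKey(transformId)) {
      throw new IllegalArgumentException("Could not find a transform with the ID " + transformId);
    }
    Map<String, SimpleTransform> updated = new HashMap<>(transforms);
    updated.putAll(newSubtransforms);
    updated.put(transformId, replacement);
    return updated;
  }
}
```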
transformBuilder.putAllInputs(updatedInputsMap);
}

// Fix sub-transforms
If you keep the name, you won't have to do this.
Done.
newComponentsBuilder.putAllTransforms(updatedExpandedTransformMap);
newComponentsBuilder.putTransforms(newTransformId, expandedTransform);

// We fix the root in case the overridden transform was one of the roots.
Or this.
Done.
Thanks. PTAL.
Friendly ping :)
Thanks! This looks good to me.
Thanks.
This uses the Beam Transform Service to support upgrading Beam Java transforms used by pipelines without upgrading the Beam version used by the pipelines.
A similar solution can be introduced for other SDKs in the future.
For more details please see #27943.
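A minimal sketch of the selection step behind this feature, under heavy simplification: transforms are modeled as a hypothetical `TransformSpec` (URN plus SDK version) keyed by ID, and the `upgrader` function stands in for the request to the Transform Service. The actual implementation operates on `RunnerApi.Pipeline` protos; `UpgradeFlow` and all URNs and version strings here are illustrative.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.UnaryOperator;

// Hypothetical transform description: a URN plus the SDK version it was built with.
final class TransformSpec {
  final String urn;
  final String sdkVersion;

  TransformSpec(String urn, String sdkVersion) {
    this.urn = urn;
    this.sdkVersion = sdkVersion;
  }
}

final class UpgradeFlow {
  // Returns a copy of transformsById in which every transform whose URN is listed in
  // urnsToOverride is replaced, under the same ID, by the upgrader's result.
  static Map<String, TransformSpec> upgradeMatching(
      Map<String, TransformSpec> transformsById,
      List<String> urnsToOverride,
      UnaryOperator<TransformSpec> upgrader) {
    Set<String> overrideSet = new HashSet<>(urnsToOverride);
    Map<String, TransformSpec> result = new LinkedHashMap<>(transformsById);
    for (Map.Entry<String, TransformSpec> entry : transformsById.entrySet()) {
      if (overrideSet.contains(entry.getValue().urn)) {
        // "upgrader" stands in for the call to the Transform Service.
        result.put(entry.getKey(), upgrader.apply(entry.getValue()));
      }
    }
    return result;
  }
}
```

Transforms whose URNs are not listed keep their original definitions, so only the selected transforms move to the newer SDK.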