Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade transforms without upgrading the pipelines #28210

Merged

Conversation

chamikaramj
Copy link
Contributor

@chamikaramj chamikaramj commented Aug 29, 2023

This uses the Beam Transform Service to support upgrading Beam Java transforms used by pipelines without upgrading the Beam version used by the pipelines.

A similar solution can be introduced for other SDKs in the future.

For more details please see #27943.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@chamikaramj chamikaramj changed the title Upgrade Java transforms without upgrading the pipelines Upgrade transforms without upgrading the pipelines Aug 29, 2023
@@ -62,6 +64,11 @@ private CombinePerKeyPayloadTranslator() {}

@Override
public String getUrn(Combine.PerKey<?, ?, ?> transform) {
return getUrn();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be in the base class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, moved.

@@ -84,6 +91,16 @@ public FunctionSpec translate(
}
}

@Override
public @Nullable Row toConfigRow(PTransform<?, ?> pTransform) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if these should be in an optional interface, or if default null-returning implementations should be provided. (We could argue that it should be strongly encouraged.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think these need a separate interface but good point regarding providing default implementations. Made "toConfigRow" and "fromConfigRow" default methods of the interface so that sub-classes can choose to not implement them.

@Override
public @Nullable Row toConfigRow(PTransform<?, ?> pTransform) {
Impulse impulse = (Impulse) pTransform;
System.out.println("Found impulse transform: " + impulse);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove debugging.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

ExpandableTransform externalTransform =
External.of(urn, payload.toByteArray(), serviceAddress);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

serviceAddress might be unset. (Perhaps don't set it to null above to let the compiler check this for you.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a check to confirm that it's not null (either should be provided by user or should be the address of the Transform Service we start here).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Also this logic was moved to PipelineTranslator).

}
externalTransform.expand(input);

return externalTransform.getExpandedTransform().getSpec();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do the outputs get wired up correctly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added logic to wire inputs and outputs correctly (in PipelineTranslator now).

continue;
}

String urn = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can leave this unset and the compiler is smart enough to see the assignment or continue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code was reverted.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still seeing it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be done now.

try {
urn = translator.getUrn();
if (urn == null) {
LOG.info(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the warning should be softer, that this translator does not have a tranform-independent URN?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, changed to debug and updated.

+ " to the Expansion Service.");
continue;
}
} catch (Exception e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would this happen? Feels that this is something exceptional enough to propagate up.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is general catch all to prevent a single incorrectly implemented PayloadTranslator available in the system from crashing to whole service. I think it's safer to catch such exceptions and continue to load the rest of the transforms (we do the same for Schema-aware transforms). Note that some of these implementations may come from custom/user libraries.

@chamikaramj chamikaramj force-pushed the cross_version_transform_upgrade branch from 1063fbf to c593512 Compare September 13, 2023 04:30
@chamikaramj chamikaramj marked this pull request as ready for review September 13, 2023 04:31
Copy link
Contributor Author

@chamikaramj chamikaramj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Robert. PTAL.

@Override
public @Nullable Row toConfigRow(PTransform<?, ?> pTransform) {
Impulse impulse = (Impulse) pTransform;
System.out.println("Found impulse transform: " + impulse);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

ExpandableTransform externalTransform =
External.of(urn, payload.toByteArray(), serviceAddress);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a check to confirm that it's not null (either should be provided by user or should be the address of the Transform Service we start here).

}

ExpandableTransform externalTransform =
External.of(urn, payload.toByteArray(), serviceAddress);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Also this logic was moved to PipelineTranslator).

}
externalTransform.expand(input);

return externalTransform.getExpandedTransform().getSpec();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added logic to wire inputs and outputs correctly (in PipelineTranslator now).

PTransform<?, ?> transform = appliedPTransform.getTransform();

FunctionSpec spec = null;
if (getUrn(transform) != null && urnsToOverride.contains(getUrn(transform))) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think translation time is a good fit for this since we are modifying the translated pipeline, but you are right that this has to be done at a higher level since otherwise it's impossible to wire inputs and outputs correctly.

I moved this logic to PipelineTranslator and fixed the wiring.

try {
urn = translator.getUrn();
if (urn == null) {
LOG.info(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, changed to debug and updated.

+ " to the Expansion Service.");
continue;
}
} catch (Exception e) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is general catch all to prevent a single incorrectly implemented PayloadTranslator available in the system from crashing to whole service. I think it's safer to catch such exceptions and continue to load the rest of the transforms (we do the same for Schema-aware transforms). Note that some of these implementations may come from custom/user libraries.

@@ -62,6 +64,11 @@ private CombinePerKeyPayloadTranslator() {}

@Override
public String getUrn(Combine.PerKey<?, ?, ?> transform) {
return getUrn();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, moved.

@@ -84,6 +91,16 @@ public FunctionSpec translate(
}
}

@Override
public @Nullable Row toConfigRow(PTransform<?, ?> pTransform) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think these need a separate interface but good point regarding providing default implementations. Made "toConfigRow" and "fromConfigRow" default methods of the interface so that sub-classes can choose to not implement them.

@chamikaramj
Copy link
Contributor Author

Also confirmed that this work for Dataflow by running a pipeline that upgrades the Create transform to a version available in an expansion service.

https://console.cloud.google.com/dataflow/jobs/us-central1/2023-09-12_15_32_47-10327070264136367924?project=apache-beam-testing

cc: @kennknowles

@github-actions
Copy link
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

}

@Override
public Create.@Nullable Values<?> fromConfigRow(Row configRow) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stray @Nullable (I don't think this should be nullable.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class was reverted.

}

@Override
public @Nullable Row toConfigRow(Values<?> transform) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't be nullable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class was reverted.

import org.checkerframework.checker.nullness.qual.Nullable;

@SuppressWarnings({
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we avoid suppressing rawTypes (or at least make it more local if it can't be avoided)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the suppression.

.forEach(
object -> {
encodedElements.add(toByteArray(object));
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you're going to be using the new functional API, maybe use streams + collect to list?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class was reverted.

configRow.getArray("values").stream()
.map(bytesValue -> fromByteArray((byte[]) bytesValue))
.collect(Collectors.toList()));
byte[] serializedCoder = configRow.getBytes("serialized_coder");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would coder ever be null? Preferably we simply require it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class was reverted.

SdkComponents.create(
runnerAPIpipeline.getComponents(), runnerAPIpipeline.getRequirementsList());
String updatedTransformId =
updatedComponents.registerPTransform(updatedAppliedPTransform, Collections.emptyList());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Collections.emptyList() doesn't seem correct if this is a composite transform.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted.

// old one.
// Also recording the newly generated id of the old (overridden) transform in the
// updatedRunnerApiComponents.
Map<String, Map<String, String>> transformInputUpdates = new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you document what transformInputUpdates contains? String isn't very descriptive.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted.

+ oldTransformIds.size()
+ " were updated.");
}
String oldTransformId = oldTransformIds.get(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we already know exactly what transform we're updating? This seems a round-about way of getting it, and will break if a transform doesn't have outputs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted.

updaterRootTransformIds.addAll(runnerAPIpipeline.getRootTransformIdsList());
if (updaterRootTransformIds.contains(oldTransformId)) {
updaterRootTransformIds.remove(oldTransformId);
updaterRootTransformIds.add(updatedTransformId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose the order does not matter here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated logic should preserve the order.

// Will contain the outputs of the upgraded transform.
Map<TupleTag<?>, PCollection<?>> newOutputs = new HashMap<>();

if (output instanceof PCollectionTuple) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code deals with both the Pipeline object and the Pipeline proto, and it's a bit hard to follow where the source of truth is. My personal preference would be to simply act on the protos consuming the expansion service response directly. Honestly, I don't think that'd be any more complicated than what you're already having to do here already.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Updated code (now in TransformUpgrader) to directly update the proto and directly invoke the TransformService using protos instead of going through External.java.

@chamikaramj chamikaramj force-pushed the cross_version_transform_upgrade branch 2 times, most recently from bc993a3 to d5fd742 Compare September 20, 2023 23:22
Copy link
Contributor Author

@chamikaramj chamikaramj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Robert. PTAL.

continue;
}

String urn = null;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be done now.

import org.checkerframework.checker.nullness.qual.Nullable;

@SuppressWarnings({
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the suppression.

}

@Override
public @Nullable Row toConfigRow(Values<?> transform) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class was reverted.

.forEach(
object -> {
encodedElements.add(toByteArray(object));
});
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class was reverted.

encodedElements.add(toByteArray(object));
});

byte[] serializedCoder =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class was reverted.

// old one.
// Also recording the newly generated id of the old (overridden) transform in the
// updatedRunnerApiComponents.
Map<String, Map<String, String>> transformInputUpdates = new HashMap<>();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted.

+ oldTransformIds.size()
+ " were updated.");
}
String oldTransformId = oldTransformIds.get(0);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted.

updaterRootTransformIds.addAll(runnerAPIpipeline.getRootTransformIdsList());
if (updaterRootTransformIds.contains(oldTransformId)) {
updaterRootTransformIds.remove(oldTransformId);
updaterRootTransformIds.add(updatedTransformId);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated logic should preserve the order.

@@ -2572,6 +2572,11 @@ public String getUrn(PTransform transform) {
return "dataflow_stub:" + transform.getClass().getName();
}

@Override
public String getUrn() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The access modifier of interface method is public and Java doesn't allow reducing access in sub-classes.

With the updated interface, the callers should be able to directly invoke getUrn() unless the sub-class cannot determine the URN without looking at the transform (which should be rare and we should not use the same translator for multiple transform classes.

/**
* Translates the given transform represented by the provided {@code AppliedPTransform} to a
* {@code FunctionSpec} with a URN and a payload.
*/
@Nullable
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@chamikaramj chamikaramj force-pushed the cross_version_transform_upgrade branch from d0fd5df to 6a85535 Compare September 21, 2023 21:00
Copy link
Contributor

@robertwb robertwb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, this is looking pretty good. Only some minor comments.

// optional.
}
if (configRow != null) {
ByteStringOutputStream rowOutputStream = new ByteStringOutputStream();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's a CoderUtils that does this for you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.


ByteStringOutputStream schemaOutputStream = new ByteStringOutputStream();
try (ObjectOutputStream schemaObjOut = new ObjectOutputStream(schemaOutputStream)) {
schemaObjOut.writeObject(SchemaTranslation.schemaToProto(configRow.getSchema(), true));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SchemaTranslation.schemaToProto is a Proto object. Just use toByteArray to serialize this proto. You can use ByteString.copyFrom to go from byte[] to ByteString and not have to deal with streams at all.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

RunnerApi.Pipeline updateTransformViaTransformService(
RunnerApi.Pipeline runnerAPIpipeline,
String transformId,
Endpoints.ApiServiceDescriptor transformServiceEndpoint)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps create the client once and pass it here, rather than passing the endpoint and re-connecting it every time?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"DefaultExpansionServiceClientFactory" caches channels for endpoints. So I don't think this is an issue.

*/
public RunnerApi.Pipeline upgradeTransformsViaTransformService(
RunnerApi.Pipeline pipeline, List<String> urnsToOverride, ExternalTranslationOptions options)
throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid throwing generic Exception. (And below.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

PTransform transformToUpgrade =
runnerAPIpipeline.getComponents().getTransformsMap().get(transformId);
if (transformToUpgrade == null) {
throw new Exception("Could not find a transform with the ID " + transformId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like a good candidate for Preconditions.checkNotNull(). Or throw something like IllegalArgumentException.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to throw IllegalArgumentException.

throw new RuntimeException(e);
}

Row configRow =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You already have configRowBytes, no need to encode it just to recode it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

clientFactory.getExpansionServiceClient(transformServiceEndpoint).expand(request);

if (!Strings.isNullOrEmpty(response.getError())) {
throw new IOException(String.format("expansion service error: %s", response.getError()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IOException doesn't seem to be the right thing to throw for a mis-configured transform.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to RuntimeException.

}
}

String newTransformId = transformId + "_upgraded";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't guarantee this won't be a collision. On the other hand, there's no need to give it a new name at all, just put the new transform in the map at the old transforms name (and all references will remain valid).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

transformBuilder.putAllInputs(updatedInputsMap);
}

// Fix sub-transforms
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you keep the name, you won't have to do this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

newComponentsBuilder.putAllTransforms(updatedExpandedTransformMap);
newComponentsBuilder.putTransforms(newTransformId, expandedTransform);

// We fix the root in case the overridden transform was one of the roots.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Contributor Author

@chamikaramj chamikaramj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. PTAL.

*/
public RunnerApi.Pipeline upgradeTransformsViaTransformService(
RunnerApi.Pipeline pipeline, List<String> urnsToOverride, ExternalTranslationOptions options)
throws Exception {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

RunnerApi.Pipeline updateTransformViaTransformService(
RunnerApi.Pipeline runnerAPIpipeline,
String transformId,
Endpoints.ApiServiceDescriptor transformServiceEndpoint)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"DefaultExpansionServiceClientFactory" caches channels for endpoints. So I don't think this is an issue.

PTransform transformToUpgrade =
runnerAPIpipeline.getComponents().getTransformsMap().get(transformId);
if (transformToUpgrade == null) {
throw new Exception("Could not find a transform with the ID " + transformId);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to throw IllegalArgumentException.

throw new RuntimeException(e);
}

Row configRow =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

clientFactory.getExpansionServiceClient(transformServiceEndpoint).expand(request);

if (!Strings.isNullOrEmpty(response.getError())) {
throw new IOException(String.format("expansion service error: %s", response.getError()));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to RuntimeException.


ByteStringOutputStream schemaOutputStream = new ByteStringOutputStream();
try (ObjectOutputStream schemaObjOut = new ObjectOutputStream(schemaOutputStream)) {
schemaObjOut.writeObject(SchemaTranslation.schemaToProto(configRow.getSchema(), true));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

// optional.
}
if (configRow != null) {
ByteStringOutputStream rowOutputStream = new ByteStringOutputStream();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

}
}

String newTransformId = transformId + "_upgraded";
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

transformBuilder.putAllInputs(updatedInputsMap);
}

// Fix sub-transforms
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

newComponentsBuilder.putAllTransforms(updatedExpandedTransformMap);
newComponentsBuilder.putTransforms(newTransformId, expandedTransform);

// We fix the root in case the overridden transform was one of the roots.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@chamikaramj
Copy link
Contributor Author

Friendly ping :)

Copy link
Contributor

@robertwb robertwb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! This looks good to me.

@chamikaramj
Copy link
Contributor Author

Thanks.

@chamikaramj chamikaramj merged commit 1c50fd2 into apache:master Sep 27, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants