-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Default translation for SchemaTransforms #31558
Conversation
Could you add a description of what is improved here? |
Added a description. This is ready for review now |
R: @robertwb |
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control |
public abstract class SchemaTransform extends PTransform<PCollectionRowTuple, PCollectionRowTuple> { | ||
private @Nullable Row configurationRow; | ||
private @Nullable String identifier; | ||
private boolean registered = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I use "registered" throughout, but I'm not sure if it's the best terminology here. Open to other suggestions!
public abstract class SchemaTransform | ||
extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {} | ||
public abstract class SchemaTransform extends PTransform<PCollectionRowTuple, PCollectionRowTuple> { | ||
private @Nullable Row configurationRow; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It feels like putting these (optional) private variables as part of the base class violates separation of concerns.
Taking a step back, is the problem that you're trying to solve is that those transforms that come from a In this case, it's a bit odd because it seems we can only derive the inverse for the specific instances that we created, not generally for all instances of that class. (But maybe that's good enough sometimes?) Might be cleaner in that case to just have a separate (weakref) mapping (registrar) from PTransform instances to their corresponding configs rather add a register method and private members to Separately it'd be good to make defining this pair of mappings so easy that it's "just how users write PTransforms" (maybe working it into builder patterns or providing a good annotation or something like that) and we get their externalization and semantic graph representation for free. But this is likely a larger project. |
Yep that’s the main problem being solved here. To be clear though, transforms that come from any
By design, these SchemaTransforms are always created using the corresponding I share the aversion towards adding unnecessary things to SchemaTransform though. I’ve been trying to cut things down as I go, but there’s probably still a better solution.
Thanks! Will take a stab at this |
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions. |
This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. |
To make a PTransform upgradable, we need to write some translation logic for it (see TransformPayloadTranslator). Essentially, this is logic that can:
So far, this has meant handwriting large customized logic for each IO (see translation for BigQueryIO, KafkaIO). While this may be unavoidable for some IOs, the logic can be unified for SchemaTransforms, which already operate using Beam Rows.
With these changes, the SchemaTransformProvider implementation needs to just add one line when building the SchemaTransform:
.register(<config row>, <identifier>)
on the SchemaTransform itself, orregister(<config>, <schematransform>)
after building the SchemaTransform in TypedSchemaTransformProvider.By storing this information in the transform instance, it can be translated using the SchemaTransformTranslator.
This PR includes a few such improvements, removing the handwritten translation logic for Iceberg(Read/Write)SchemaTransform, Kafka(Read/Write)SchemaTransform, ManagedSchemaTransform.
P.S. these were already shorter than usual because of the SchemaTransformPayloadTranslator abstraction introduced in #30910. This PR is a continuation to further simplify it