
Managed BigQueryIO #31486

Merged · 30 commits into apache:master · Nov 12, 2024

Conversation

ahmedabu98 (Contributor)

No description provided.

@ahmedabu98 (Contributor, Author)

R: @chamikaramj


github-actions bot commented Jun 3, 2024

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

ahmedabu98 changed the title from "Supported BigQueryIO as a Managed Transform" to "Support BigQueryIO (Storage API) as a Managed Transform" (Jun 4, 2024)
ahmedabu98 removed this from the 2.57.0 Release milestone (Jun 4, 2024)
ahmedabu98 changed the title from "Support BigQueryIO (Storage API) as a Managed Transform" to "Managed BigQueryIO" (Jul 9, 2024)
ahmedabu98 added this to the 2.58.0 Release milestone (Jul 9, 2024)
@ahmedabu98 (Contributor, Author)

R: @chamikaramj
R: @robertwb

ahmedabu98 removed this from the 2.58.0 Release milestone (Jul 10, 2024)
@kennknowles (Member) left a comment

LGTM, but this is a lot, so it's good to get a second set of eyes; I could easily have missed something.

@@ -36,6 +36,9 @@ dependencies {
permitUnusedDeclared project(":sdks:java:io:google-cloud-platform") // BEAM-11761
implementation project(":sdks:java:extensions:schemaio-expansion-service")
permitUnusedDeclared project(":sdks:java:extensions:schemaio-expansion-service") // BEAM-11761
implementation project(":sdks:java:managed")
permitUnusedDeclared project(":sdks:java:managed") // BEAM-11761
Member:

Notes, no action required on this PR:

  • BEAM-11761 is a link to Jira, so there is probably a GitHub issue it has been migrated to.
  • This should be equivalent to runtimeOnly, since it is declared as "implementation" but has no static references. I would guess it works the same, unless the uber-jar plugin treats it differently.
  • In the case where these deps do end up in a container, putting them into the Docker container without building an uber jar would honestly be better, so we keep the original jar metadata.

@chamikaramj (Contributor) left a comment

Thanks.


// Map the user-supplied disposition string to BigQueryIO's CreateDisposition enum.
if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
  CreateDisposition createDisposition =
      CreateDisposition.valueOf(configuration.getCreateDisposition().toUpperCase());
Contributor:

As a larger point, I think we should do any transform overriding in job submission (BQ modes for batch/streaming etc.) so that we can just upgrade in the backend (at least in the first version).

Contributor Author:

Do you mean making this switch in the SDK (i.e. at construction time)? I assumed we had settled on making it a runner-side decision.

Some decisions are actually dependent on the runner (e.g. the at-least-once streaming mode in Dataflow).

Contributor:

> Do you mean making this switch in the SDK (i.e. at construction time)? I assumed we had settled on making it a runner-side decision.

Yeah. Added some comments to the relevant doc.

ahmedabu98 removed this from the 2.60.0 Release milestone (Oct 4, 2024)
liferoad added this to the 2.61.0 Release milestone (Oct 28, 2024)
@chamikaramj (Contributor)

Based on an offline discussion, we should do the forking to select the correct write transform at a single 'BigQueryWriteSchemaWriteTransformProvider' instead of expecting the caller to perform the expansion for the exact implementation. The pipeline options needed for this (for example, dataflowServiceOptions=streaming_mode_at_least_once) should be provided to this call (accessible via input.getPipeline().getOptions()). Also, we can use input.get(INPUT_ROWS_TAG).isBounded() to determine whether this is an unbounded call or not.

This should be possible since all the BQ write schema-transforms here share the same configuration.

Maybe we should keep the read side general in a similar manner, to support future expansions / read methods.

cc: @robertwb
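
[Editor's note: a minimal sketch of that single-provider forking, assuming hypothetical helpers writeViaFileLoads / writeViaStorageApi and a provider-level INPUT_ROWS_TAG; none of these names are confirmed to be the PR's actual code.]

    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionRowTuple;
    import org.apache.beam.sdk.values.Row;

    // Inside the single write SchemaTransform returned by the provider:
    @Override
    public PCollectionRowTuple expand(PCollectionRowTuple input) {
      PCollection<Row> rows = input.get(INPUT_ROWS_TAG);
      // Options are visible at expansion time, e.g. to detect
      // dataflowServiceOptions=streaming_mode_at_least_once.
      PipelineOptions options = input.getPipeline().getOptions();
      // Bounded input -> file loads; unbounded input -> Storage Write API.
      boolean bounded = rows.isBounded() == PCollection.IsBounded.BOUNDED;
      return bounded ? writeViaFileLoads(rows, options) : writeViaStorageApi(rows, options);
    }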

@ahmedabu98 (Contributor, Author) commented Oct 31, 2024

I suggest we do the forking on the SDK side only if it's actually necessary. Keep in mind this approach will require adding the Dataflow runner as a dependency (to access dataflowServiceOptions) in Managed API or GCP IOs (runner-independent modules).

This PR used to include such forking but that logic was reverted due to this seemingly unavoidable dependency:
74bc178
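
[Editor's note: for illustration, the reverted logic needed access roughly along these lines (a sketch; DataflowPipelineOptions lives in the Dataflow runner module, which is exactly the dependency in question):]

    // Casting to DataflowPipelineOptions is what pulls in the Dataflow runner artifact:
    DataflowPipelineOptions opts =
        input.getPipeline().getOptions().as(DataflowPipelineOptions.class);
    boolean atLeastOnce =
        opts.getDataflowServiceOptions() != null
            && opts.getDataflowServiceOptions().contains("streaming_mode_at_least_once");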

@chamikaramj (Contributor) left a comment

Thanks!

@@ -61,7 +60,7 @@ protected SchemaTransform from(BigQueryWriteConfiguration configuration) {

  @Override
  public String identifier() {
-   return getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_FILE_LOADS);
+   return "beam:schematransform:org.apache.beam:bigquery_fileloads:v1";
Contributor:

I think it's still fine to define these URNs in the proto.

Contributor Author:

I feel like we don't need to right now, since we're using a wrapper over file loads and Storage API writes.

Contributor:

Maybe it will be useful for any Python wrappers that directly use specific schema-transforms?

Contributor Author:

In that case maybe we can do this in a separate PR that targets all schematransforms?
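
[Editor's note: for context, the proto-backed pattern under discussion looks roughly like this (a sketch following the convention in Beam's external_transforms.proto; exact message, field, and option names are assumptions):]

    // In external_transforms.proto (sketch):
    //   message ManagedTransforms {
    //     enum Urns {
    //       BIGQUERY_FILE_LOADS = 0 [(beam_urn) =
    //           "beam:schematransform:org.apache.beam:bigquery_fileloads:v1"];
    //     }
    //   }
    // Java then resolves the URN from the enum value's annotation, as in the old line above:
    String urn = getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_FILE_LOADS);

Keeping URNs in the proto gives all SDKs a single source of truth, which is one point in favor of the proto-based approach.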

@@ -100,7 +98,7 @@ protected SchemaTransform from(BigQueryWriteConfiguration configuration) {

  @Override
  public String identifier() {
-   return getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_STORAGE_WRITE);
+   return "beam:schematransform:org.apache.beam:bigquery_storage_write:v2";
Contributor:

Ditto.

PCollection<Row> outputRows =
    readPipeline
        .apply(Managed.read(Managed.BIGQUERY).withConfig(config))
        .getSinglePCollection();
PAssert.that(outputRows).containsInAnyOrder(ROWS);
Contributor:

Also confirm that we end up using the correct sink here.

Contributor Author:

Initially I did not know how we would perform such a check. Will think about it again and give it a try.

Contributor Author:

Added tests that verify this by looking at the pipeline proto, PTAL!
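
[Editor's note: a sketch of what such a proto-based check might look like; the helper name matches the diff below, but this body is an assumption rather than the PR's actual code, and PipelineTranslation's package has moved between Beam versions.]

    import static org.junit.Assert.assertTrue;

    import org.apache.beam.model.pipeline.v1.RunnerApi;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.util.construction.PipelineTranslation;

    static void assertPipelineContainsTransformIdentifier(Pipeline pipeline, String urn) {
      RunnerApi.Pipeline proto = PipelineTranslation.toProto(pipeline);
      // Look for any composite or leaf transform whose spec carries the expected URN.
      boolean found =
          proto.getComponents().getTransformsMap().values().stream()
              .anyMatch(t -> urn.equals(t.getSpec().getUrn()));
      assertTrue("Expected a transform with URN: " + urn, found);
    }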

@chamikaramj (Contributor)

LGTM. Feel free to merge once comments are addressed.

@@ -121,6 +153,8 @@ public void testStreamingStorageWriteRead() {
  // streaming write
  PCollectionRowTuple.of("input", getInput(writePipeline, true))
      .apply(Managed.write(Managed.BIGQUERY).withConfig(config));
+ assertPipelineContainsTransformIdentifier(
+     writePipeline, new BigQueryStorageWriteApiSchemaTransformProvider().identifier());
Contributor:

So the MANAGED_UNDERLYING_TRANSFORM_URN_KEY annotation of the "Managed" transform will mention the top-level BigQueryWriteSchemaTransform instead of the specific implementation for the write method, right?

Contributor:

Probably, we should add a unit test for this if we don't have one already.

Contributor Author:

> So the MANAGED_UNDERLYING_TRANSFORM_URN_KEY annotation of the "Managed" transform will mention the top-level BigQueryWriteSchemaTransform instead of the specific implementation for the write method, right?

Hmmm, good catch. I just switched it to mentioning the top-level URN. This means we lose information on what the underlying implementation is, though.
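
[Editor's note: for reference, inspecting that annotation in the pipeline proto could look roughly like this (a sketch; MANAGED_UNDERLYING_TRANSFORM_URN_KEY is the constant named in the discussion, and its string value is not given here):]

    import java.nio.charset.StandardCharsets;

    // Collect the underlying-transform-URN annotation values from every
    // transform in the pipeline proto.
    RunnerApi.Pipeline proto = PipelineTranslation.toProto(pipeline);
    proto.getComponents().getTransformsMap().values().stream()
        .map(t -> t.getAnnotationsMap().get(MANAGED_UNDERLYING_TRANSFORM_URN_KEY))
        .filter(java.util.Objects::nonNull)
        .map(b -> new String(b.toByteArray(), StandardCharsets.UTF_8))
        .forEach(System.out::println);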

@ahmedabu98 (Contributor, Author) commented Nov 8, 2024

> we should add a unit test for this

Adding unit tests to the respective schematransform test classes

Contributor Author:

I tweaked the test to look for the transform name instead. Not as clean as looking for a URN, but it does the job.
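
[Editor's note: the name-based check might then reduce to something like the following (again a sketch; expectedName is a stand-in parameter):]

    // Match on the transform's unique name in the pipeline proto instead of its URN.
    boolean found =
        proto.getComponents().getTransformsMap().values().stream()
            .anyMatch(t -> t.getUniqueName().contains(expectedName));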

@chamikaramj (Contributor) left a comment

Thanks. LGTM.

@ahmedabu98 (Contributor, Author)

P.S. Added one last commit to expose this to the Python SDK.

ahmedabu98 merged commit 628348b into apache:master (Nov 12, 2024)
113 of 116 checks passed