[Python] Managed Transforms API #31495
Codecov Report: All modified and coverable lines are covered by tests ✅

Additional details and impacted files:

    @@           Coverage Diff            @@
    ##            master   #31495   +/-   ##
    =========================================
      Coverage    71.14%   71.14%
      Complexity    3008     3008
    =========================================
      Files         1055     1055
      Lines       133439   133439
      Branches      3248     3248
    =========================================
      Hits         94929    94929
      Misses       35382    35382
      Partials      3128     3128
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment
Successful Dataflow run (2024-07-15_13_54_11-2004515317250011573) with the following code:

```python
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

_LOGGER = logging.getLogger(__name__)
_LOGGER.setLevel(logging.INFO)

options = PipelineOptions([
    "--runner=DataflowRunner",
    "--job_name=managed-iceberg-read-demo",
    "--project=apache-beam-testing",
    "--temp_location=gs://apache-beam-testing-ahmedabualsaud/tmp",
    "--region=us-central1",
    "--sdk_location=sdks/python/dist/apache_beam-2.59.0.dev0.tar.gz",
    "--sdk_harness_container_image_overrides=.*java.*,gcr.io/apache-beam-testing/beam-sdk/beam_java8_sdk:latest"
])

with beam.Pipeline(options=options) as p:
    p | beam.managed.Read(
        "iceberg",
        config={
            "table": "my_database.my_table",
            "catalog_name": "ahmed_catalog",
            "catalog_properties": {
                "catalog-impl": "org.apache.iceberg.hadoop.HadoopCatalog",
                "warehouse": "gs://ahmedabualsaud-apache-beam-testing"
            }
        }) | "Log rows" >> beam.Map(_LOGGER.info)
```

Note: I had to include the following dependencies (needed by Iceberg) in the expansion service jar:
Left some comments, but Cham would have more context.
Thanks!
@chamikaramj the last comment was just addressed. It's been a while, though; lmk if you'd like to take another quick look before I merge.
Thanks! LGTM.
```python
import shutil
import tempfile
import time
import unittest
```
Probably this file should be named "managed_iceberg_it_test.py"
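The test file under review imports `shutil`, `tempfile`, `time`, and `unittest`, which is the usual toolkit for a test that creates temporary resources and cleans them up afterwards. A minimal sketch of that pattern follows, assuming a local HadoopCatalog warehouse in a temporary directory. The helper `make_iceberg_read_config`, the class `ManagedIcebergConfigTest`, and the catalog and table names are hypothetical; the PR's actual test is not reproduced here. The sketch only builds and checks a config dict shaped like the Dataflow example; it does not launch a pipeline.

```python
import shutil
import tempfile
import time
import unittest


def make_iceberg_read_config(warehouse_dir, table):
    """Hypothetical helper: build a config dict shaped like the Dataflow
    example, pointing a HadoopCatalog at a local warehouse directory."""
    return {
        "table": table,
        "catalog_name": "test_catalog",  # hypothetical catalog name
        "catalog_properties": {
            "catalog-impl": "org.apache.iceberg.hadoop.HadoopCatalog",
            "warehouse": warehouse_dir,
        },
    }


class ManagedIcebergConfigTest(unittest.TestCase):
    def setUp(self):
        # A fresh warehouse directory per test keeps runs isolated.
        self.warehouse = tempfile.mkdtemp(prefix="iceberg_warehouse_")
        # A timestamp suffix reduces table-name collisions between runs.
        self.table = f"test_db.test_table_{int(time.time())}"

    def tearDown(self):
        # Remove the temporary warehouse even if the test failed.
        shutil.rmtree(self.warehouse, ignore_errors=True)

    def test_config_points_at_temp_warehouse(self):
        config = make_iceberg_read_config(self.warehouse, self.table)
        self.assertEqual(
            config["catalog_properties"]["warehouse"], self.warehouse)
        self.assertTrue(config["table"].startswith("test_db.test_table_"))
```

Saved as a module, the sketch runs under `python -m unittest`; a real integration test would additionally write rows through the managed transform and read them back.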
Fixes #31830
Looks like we merged this with
Ahh, my apologies. I didn't pay attention and thought the failure was a flake.
Opened #32785
* managed module
* clean up
* lint
* try with real example
* cleanup
* add documentation
* fix doc
* add pyyaml dependency
* cleanup
* return deps
* return deps
* fix doc
* address some comments
* doc updates
* define managed transform URNs in proto
* fix URN
* remove managed dependency
* add managed iceberg integration test
* lint
* lint
* dependency fix
* lint
* dependency fix
* dependency fix
* lint
* lint
* dependency fix
* rename test file
Adding Python API for Managed transforms, similar to the Java API.
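In the Dataflow example, the caller passes only a short source name (`"iceberg"`) and a plain config dict, and the work runs in Java via an expansion service. The sketch below illustrates just the lookup-and-serialize step such an API implies, under stated assumptions: the helper `resolve_managed_read` and the `_READ_TRANSFORMS` table are hypothetical, the URN is an assumption rather than a copy of the ones the PR defines in proto, and the config is serialized with `json` (whose output is also valid YAML) instead of PyYAML so the sketch needs only the standard library.

```python
import json

# Hypothetical mapping from a short, case-insensitive source name to the
# URN of the underlying Java transform. The URN string is an assumption;
# the PR defines the real URNs in a proto.
_READ_TRANSFORMS = {
    "iceberg": "beam:schematransform:org.apache.beam:iceberg_read:v1",
}


def resolve_managed_read(source, config=None):
    """Return (urn, serialized_config) for a managed read (sketch only)."""
    urn = _READ_TRANSFORMS.get(source.lower())
    if urn is None:
        raise ValueError(
            f"Unsupported managed source {source!r}; "
            f"expected one of {sorted(_READ_TRANSFORMS)}")
    # sort_keys=True makes the serialized config deterministic; JSON text
    # is also valid YAML, standing in for a YAML dump here.
    return urn, json.dumps(config or {}, sort_keys=True)
```

Keeping the user-facing surface to a name plus a dict means new managed sources only require a new table entry on the Python side, while the transform itself stays implemented once, in Java.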