From 9a2d456f72b07ac449cd24c6195179a3b1b8be65 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com> Date: Tue, 15 Oct 2024 11:10:31 +0300 Subject: [PATCH] [Python] Managed Transforms API (#31495) * managed module * clean up * lint * try with real example * cleanup * add documentation * fix doc * add pyyaml dependency * cleanup * return deps * return deps * fix doc * address some comments * doc updates * define managed transform URNs in proto * fix URN * remove managed dependency * add managed iceberg integration test * lint * lint * dependency fix * lint * dependency fix * dependency fix * lint * lint * dependency fix * rename test file --- ...m_PostCommit_Python_Xlang_IO_Dataflow.json | 2 +- ...eam_PostCommit_Python_Xlang_IO_Direct.json | 4 + ...beam_PostCommit_Python_Xlang_IO_Direct.yml | 96 +++++++++ CHANGES.md | 2 + .../pipeline/v1/external_transforms.proto | 14 ++ sdks/java/io/expansion-service/build.gradle | 2 + sdks/java/io/iceberg/build.gradle | 3 +- .../apache/beam/sdk/io/iceberg/IcebergIO.java | 5 +- .../IcebergReadSchemaTransformProvider.java | 6 +- .../IcebergWriteSchemaTransformProvider.java | 5 +- sdks/java/io/kafka/build.gradle | 1 + .../KafkaReadSchemaTransformProvider.java | 4 +- .../KafkaWriteSchemaTransformProvider.java | 5 +- sdks/java/managed/build.gradle | 1 + .../org/apache/beam/sdk/managed/Managed.java | 11 +- .../managed/ManagedTransformConstants.java | 13 +- .../apache_beam/portability/common_urns.py | 1 + .../python/apache_beam/transforms/__init__.py | 1 + sdks/python/apache_beam/transforms/managed.py | 182 ++++++++++++++++++ .../transforms/managed_iceberg_it_test.py | 70 +++++++ sdks/python/setup.py | 3 +- sdks/standard_expansion_services.yaml | 2 +- 22 files changed, 407 insertions(+), 26 deletions(-) create mode 100644 .github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json create mode 100644 .github/workflows/beam_PostCommit_Python_Xlang_IO_Direct.yml create mode 100644 sdks/python/apache_beam/transforms/managed.py create mode 100644 sdks/python/apache_beam/transforms/managed_iceberg_it_test.py diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json index e3d6056a5de9..b26833333238 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 1 + "modification": 2 } diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json new file mode 100644 index 000000000000..e3d6056a5de9 --- /dev/null +++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json @@ -0,0 +1,4 @@ +{ + "comment": "Modify this file in a trivial way to cause this test suite to run", + "modification": 1 +} diff --git a/.github/workflows/beam_PostCommit_Python_Xlang_IO_Direct.yml b/.github/workflows/beam_PostCommit_Python_Xlang_IO_Direct.yml new file mode 100644 index 000000000000..5092a1981154 --- /dev/null +++ b/.github/workflows/beam_PostCommit_Python_Xlang_IO_Direct.yml @@ -0,0 +1,96 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: PostCommit Python Xlang IO Direct + +on: + schedule: + - cron: '30 5/6 * * *' + pull_request_target: + paths: ['release/trigger_all_tests.json', '.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json'] + workflow_dispatch: + +#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event +permissions: + actions: write + pull-requests: write + checks: write + contents: read + deployments: read + id-token: none + issues: write + discussions: read + packages: read + pages: read + repository-projects: read + security-events: read + statuses: read + +# This allows a subsequently queued workflow run to interrupt previous runs +concurrency: + group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}' + cancel-in-progress: true + +env: + DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} + GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }} + GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }} + +jobs: + beam_PostCommit_Python_Xlang_IO_Direct: + if: | + github.event_name == 'workflow_dispatch' || + github.event_name == 'pull_request_target' || + (github.event_name == 'schedule' && github.repository == 'apache/beam') || + github.event.comment.body == 'Run Python_Xlang_IO_Direct PostCommit' + runs-on: [self-hosted, ubuntu-20.04, main] + timeout-minutes: 100 + name: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) + strategy: + matrix: + job_name: ["beam_PostCommit_Python_Xlang_IO_Direct"] + job_phrase: ["Run Python_Xlang_IO_Direct PostCommit"] + steps: + - uses: actions/checkout@v4 + - name: Setup repository + uses: ./.github/actions/setup-action + with: + comment_phrase: ${{ matrix.job_phrase }} + github_token: ${{ secrets.GITHUB_TOKEN }} + github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) + - name: Setup environment + uses: ./.github/actions/setup-environment-action + with: + python-version: | + 3.9 + 3.12 + - name: run PostCommit Python Xlang IO Direct script + uses: ./.github/actions/gradle-command-self-hosted-action + with: + gradle-command: :sdks:python:test-suites:direct:ioCrossLanguagePostCommit + arguments: -PuseWheelDistribution + - name: Archive Python Test Results + uses: actions/upload-artifact@v4 + if: failure() + with: + name: Python Test Results + path: '**/pytest*.xml' + - name: Publish Python Test Results + uses: EnricoMi/publish-unit-test-result-action@v2 + if: always() + with: + commit: '${{ env.prsha || env.GITHUB_SHA }}' + comment_mode: ${{ github.event_name == 'issue_comment' && 'always' || 'off' }} + files: '**/pytest*.xml' \ No newline at end of file diff --git a/CHANGES.md b/CHANGES.md index cc1268635046..4e21e400e60d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -59,11 +59,13 @@ * New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)). * New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)). +* [Python] Introduce Managed Transforms API ([#31495](https://github.com/apache/beam/pull/31495)) ## I/Os * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * [Managed Iceberg] Support creating tables if needed ([#32686](https://github.com/apache/beam/pull/32686)) +* [Managed Iceberg] Now available in Python SDK ([#31495](https://github.com/apache/beam/pull/31495)) * [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types ([#32688](https://github.com/apache/beam/pull/32688)) ## New Features / Improvements diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto index 429371e11055..b03350966d6c 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto @@ -59,6 +59,20 @@ message ExpansionMethods { } } +// Defines the URNs for managed transforms. +message ManagedTransforms { + enum Urns { + ICEBERG_READ = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:schematransform:org.apache.beam:iceberg_read:v1"]; + ICEBERG_WRITE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:schematransform:org.apache.beam:iceberg_write:v1"]; + KAFKA_READ = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:schematransform:org.apache.beam:kafka_read:v1"]; + KAFKA_WRITE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) = + "beam:schematransform:org.apache.beam:kafka_write:v1"]; + } +} + // A configuration payload for an external transform. // Used to define a Java transform that can be directly instantiated by a Java // expansion service. diff --git a/sdks/java/io/expansion-service/build.gradle b/sdks/java/io/expansion-service/build.gradle index b09a92ca315c..cc8eccf98997 100644 --- a/sdks/java/io/expansion-service/build.gradle +++ b/sdks/java/io/expansion-service/build.gradle @@ -44,6 +44,8 @@ ext.summary = "Expansion service serving several Java IOs" dependencies { implementation project(":sdks:java:expansion-service") permitUnusedDeclared project(":sdks:java:expansion-service") // BEAM-11761 + implementation project(":sdks:java:managed") + permitUnusedDeclared project(":sdks:java:managed") // BEAM-11761 implementation project(":sdks:java:io:iceberg") permitUnusedDeclared project(":sdks:java:io:iceberg") // BEAM-11761 implementation project(":sdks:java:io:kafka") diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index 3d653d6b276e..e10c6f38e20f 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -44,7 +44,7 @@ def orc_version = "1.9.2" dependencies { implementation library.java.vendored_guava_32_1_2_jre implementation project(path: ":sdks:java:core", configuration: "shadow") - implementation project(":sdks:java:managed") + implementation project(path: ":model:pipeline", configuration: "shadow") implementation library.java.slf4j_api implementation library.java.joda_time implementation "org.apache.parquet:parquet-column:$parquet_version" @@ -55,6 +55,7 @@ dependencies { implementation "org.apache.iceberg:iceberg-orc:$iceberg_version" implementation library.java.hadoop_common + testImplementation project(":sdks:java:managed") testImplementation library.java.hadoop_client testImplementation library.java.bigdataoss_gcsio testImplementation library.java.bigdataoss_gcs_connector diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index fa4ff9714c7f..1d4b36585237 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -24,7 +24,6 @@ import java.util.List; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.managed.Managed; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PBegin; @@ -45,8 +44,8 @@ * A connector that reads and writes to Apache Iceberg * tables. * - *
{@link IcebergIO} is offered as a {@link Managed} transform. This class is subject to change - * and should not be used directly. Instead, use it via {@link Managed#ICEBERG} like so: + *
{@link IcebergIO} is offered as a Managed transform. This class is subject to change and + * should not be used directly. Instead, use it like so: * *
{@code * Mapconfig = Map.of( diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java index df7bda4560dd..d44149fda08e 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java @@ -17,10 +17,12 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn; + import com.google.auto.service.AutoService; import java.util.Collections; import java.util.List; -import org.apache.beam.sdk.managed.ManagedTransformConstants; +import org.apache.beam.model.pipeline.v1.ExternalTransforms; import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; @@ -53,7 +55,7 @@ public List outputCollectionNames() { @Override public String identifier() { - return ManagedTransformConstants.ICEBERG_READ; + return getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_READ); } static class IcebergReadSchemaTransform extends SchemaTransform { diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java index ea46e8560815..6aa830e7fbc6 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java @@ -18,13 +18,14 @@ package org.apache.beam.sdk.io.iceberg; import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.Configuration; +import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn; import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; import java.util.Collections; import java.util.List; import java.util.Map; -import org.apache.beam.sdk.managed.ManagedTransformConstants; +import org.apache.beam.model.pipeline.v1.ExternalTransforms; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.schemas.Schema; @@ -151,7 +152,7 @@ public List outputCollectionNames() { @Override public String identifier() { - return ManagedTransformConstants.ICEBERG_WRITE; + return getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_WRITE); } static class IcebergWriteSchemaTransform extends SchemaTransform { diff --git a/sdks/java/io/kafka/build.gradle b/sdks/java/io/kafka/build.gradle index 0ba6fa642a02..ec4654bd88df 100644 --- a/sdks/java/io/kafka/build.gradle +++ b/sdks/java/io/kafka/build.gradle @@ -54,6 +54,7 @@ dependencies { provided library.java.jackson_dataformat_csv permitUnusedDeclared library.java.jackson_dataformat_csv implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation project(path: ":model:pipeline", configuration: "shadow") implementation project(":sdks:java:extensions:avro") implementation project(":sdks:java:extensions:protobuf") implementation project(":sdks:java:expansion-service") diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java index e87669ab2b0a..a3fd1d8c3fd7 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.kafka; import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn; import com.google.auto.service.AutoService; import java.io.FileOutputStream; @@ -34,6 +35,7 @@ import java.util.Map; import java.util.stream.Collectors; import org.apache.avro.generic.GenericRecord; +import org.apache.beam.model.pipeline.v1.ExternalTransforms; import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils; import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils; @@ -103,7 +105,7 @@ public Row apply(byte[] input) { @Override public String identifier() { - return "beam:schematransform:org.apache.beam:kafka_read:v1"; + return getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ); } @Override diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java index 09b338492b47..d6f46b11cb7d 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.kafka; +import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn; + import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; import java.io.Serializable; @@ -26,6 +28,7 @@ import java.util.Map; import java.util.Set; import javax.annotation.Nullable; +import org.apache.beam.model.pipeline.v1.ExternalTransforms; import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils; import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils; import org.apache.beam.sdk.metrics.Counter; @@ -249,7 +252,7 @@ public byte[] apply(Row input) { @Override public @UnknownKeyFor @NonNull @Initialized String identifier() { - return "beam:schematransform:org.apache.beam:kafka_write:v1"; + return getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE); } @Override diff --git a/sdks/java/managed/build.gradle b/sdks/java/managed/build.gradle index add0d7f3cc0d..c6e868872246 100644 --- a/sdks/java/managed/build.gradle +++ b/sdks/java/managed/build.gradle @@ -28,6 +28,7 @@ ext.summary = """Library that provides managed IOs.""" dependencies { implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation project(path: ":model:pipeline", configuration: "shadow") implementation library.java.vendored_guava_32_1_2_jre implementation library.java.slf4j_api diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index 911e25cdda14..8477726686ee 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -17,11 +17,14 @@ */ package org.apache.beam.sdk.managed; +import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn; + import com.google.auto.value.AutoValue; import java.util.ArrayList; import java.util.List; import java.util.Map; import javax.annotation.Nullable; +import org.apache.beam.model.pipeline.v1.ExternalTransforms; import org.apache.beam.sdk.coders.RowCoder; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; @@ -87,13 +90,13 @@ public class Managed { // Supported SchemaTransforms public static final Map READ_TRANSFORMS = ImmutableMap. builder() - .put(ICEBERG, ManagedTransformConstants.ICEBERG_READ) - .put(KAFKA, ManagedTransformConstants.KAFKA_READ) + .put(ICEBERG, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_READ)) + .put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ)) .build(); public static final Map WRITE_TRANSFORMS = ImmutableMap. builder() - .put(ICEBERG, ManagedTransformConstants.ICEBERG_WRITE) - .put(KAFKA, ManagedTransformConstants.KAFKA_WRITE) + .put(ICEBERG, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_WRITE)) + .put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE)) .build(); /** diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java index 51d0b67b4b89..4cf752747be5 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java @@ -17,7 +17,10 @@ */ package org.apache.beam.sdk.managed; +import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn; + import java.util.Map; +import org.apache.beam.model.pipeline.v1.ExternalTransforms; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; /** @@ -41,12 +44,6 @@ public class ManagedTransformConstants { // Standard input PCollection tag public static final String INPUT = "input"; - public static final String ICEBERG_READ = "beam:schematransform:org.apache.beam:iceberg_read:v1"; - public static final String ICEBERG_WRITE = - "beam:schematransform:org.apache.beam:iceberg_write:v1"; - public static final String KAFKA_READ = "beam:schematransform:org.apache.beam:kafka_read:v1"; - public static final String KAFKA_WRITE = "beam:schematransform:org.apache.beam:kafka_write:v1"; - private static final Map KAFKA_READ_MAPPINGS = ImmutableMap. builder().put("data_format", "format").build(); @@ -55,7 +52,7 @@ public class ManagedTransformConstants { public static final Map > MAPPINGS = ImmutableMap. >builder() - .put(KAFKA_READ, KAFKA_READ_MAPPINGS) - .put(KAFKA_WRITE, KAFKA_WRITE_MAPPINGS) + .put(getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ), KAFKA_READ_MAPPINGS) + .put(getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE), KAFKA_WRITE_MAPPINGS) .build(); } diff --git a/sdks/python/apache_beam/portability/common_urns.py b/sdks/python/apache_beam/portability/common_urns.py index 4effc91c3d40..74d9a39bb052 100644 --- a/sdks/python/apache_beam/portability/common_urns.py +++ b/sdks/python/apache_beam/portability/common_urns.py @@ -38,6 +38,7 @@ StandardSideInputTypes = beam_runner_api_pb2_urns.StandardSideInputTypes StandardUserStateTypes = beam_runner_api_pb2_urns.StandardUserStateTypes ExpansionMethods = external_transforms_pb2_urns.ExpansionMethods +ManagedTransforms = external_transforms_pb2_urns.ManagedTransforms MonitoringInfo = metrics_pb2_urns.MonitoringInfo MonitoringInfoSpecs = metrics_pb2_urns.MonitoringInfoSpecs MonitoringInfoTypeUrns = metrics_pb2_urns.MonitoringInfoTypeUrns diff --git a/sdks/python/apache_beam/transforms/__init__.py b/sdks/python/apache_beam/transforms/__init__.py index 4e66a290842c..b8b6839019e8 100644 --- a/sdks/python/apache_beam/transforms/__init__.py +++ b/sdks/python/apache_beam/transforms/__init__.py @@ -22,6 +22,7 @@ from apache_beam.transforms import combiners from apache_beam.transforms.core import * from apache_beam.transforms.external import * +from apache_beam.transforms.managed import * from apache_beam.transforms.ptransform import * from apache_beam.transforms.stats import * from apache_beam.transforms.timeutil import TimeDomain diff --git a/sdks/python/apache_beam/transforms/managed.py b/sdks/python/apache_beam/transforms/managed.py new file mode 100644 index 000000000000..22ee15b1de1c --- /dev/null +++ b/sdks/python/apache_beam/transforms/managed.py @@ -0,0 +1,182 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Managed Transforms. + +This module builds and instantiates turnkey transforms that can be managed by +the underlying runner. This means the runner can upgrade the transform to a +more optimal/updated version without requiring the user to do anything. It may +also replace the transform with something entirely different if it chooses to. +By default, however, the specified transform will remain unchanged. + +Using Managed Transforms +======================== +Managed turnkey transforms have a defined configuration and can be built using +an inline :class:`dict` like so:: + + results = p | beam.managed.Read( + beam.managed.ICEBERG, + config={"table": "foo", + "catalog_name": "bar", + "catalog_properties": { + "warehouse": "path/to/warehouse", + "catalog-impl": "org.apache.my.CatalogImpl"}}) + +A YAML configuration file can also be used to build a Managed transform. Say we +have the following `config.yaml` file:: + + topic: "foo" + bootstrap_servers: "localhost:1234" + format: "AVRO" + +Simply provide the location to the file like so:: + + input_rows = p | beam.Create(...) + input_rows | beam.managed.Write( + beam.managed.KAFKA, + config_url="path/to/config.yaml") + +Available transforms +==================== +Available transforms are: + +- **Kafka Read and Write** +- **Iceberg Read and Write** + +**Note:** inputs and outputs need to be PCollection(s) of Beam +:py:class:`apache_beam.pvalue.Row` elements. + +**Note:** Today, all managed transforms are essentially cross-language +transforms, and Java's ManagedSchemaTransform is used under the hood. +""" + +from typing import Any +from typing import Dict +from typing import Optional + +import yaml + +from apache_beam.portability.common_urns import ManagedTransforms +from apache_beam.transforms.external import BeamJarExpansionService +from apache_beam.transforms.external import SchemaAwareExternalTransform +from apache_beam.transforms.ptransform import PTransform + +ICEBERG = "iceberg" +KAFKA = "kafka" +_MANAGED_IDENTIFIER = "beam:transform:managed:v1" +_EXPANSION_SERVICE_JAR_TARGETS = { + "sdks:java:io:expansion-service:shadowJar": [KAFKA, ICEBERG], +} + +__all__ = ["ICEBERG", "KAFKA", "Read", "Write"] + + +class Read(PTransform): + """Read using Managed Transforms""" + _READ_TRANSFORMS = { + ICEBERG: ManagedTransforms.Urns.ICEBERG_READ.urn, + KAFKA: ManagedTransforms.Urns.KAFKA_READ.urn, + } + + def __init__( + self, + source: str, + config: Optional[Dict[str, Any]] = None, + config_url: Optional[str] = None, + expansion_service=None): + super().__init__() + self._source = source + identifier = self._READ_TRANSFORMS.get(source.lower()) + if not identifier: + raise ValueError( + f"An unsupported source was specified: '{source}'. Please specify " + f"one of the following sources: {list(self._READ_TRANSFORMS.keys())}") + + self._expansion_service = _resolve_expansion_service( + source, identifier, expansion_service) + self._underlying_identifier = identifier + self._yaml_config = yaml.dump(config) + self._config_url = config_url + + def expand(self, input): + return input | SchemaAwareExternalTransform( + identifier=_MANAGED_IDENTIFIER, + expansion_service=self._expansion_service, + rearrange_based_on_discovery=True, + transform_identifier=self._underlying_identifier, + config=self._yaml_config, + config_url=self._config_url) + + def default_label(self) -> str: + return "Managed Read(%s)" % self._source.upper() + + +class Write(PTransform): + """Write using Managed Transforms""" + _WRITE_TRANSFORMS = { + ICEBERG: ManagedTransforms.Urns.ICEBERG_WRITE.urn, + KAFKA: ManagedTransforms.Urns.KAFKA_WRITE.urn, + } + + def __init__( + self, + sink: str, + config: Optional[Dict[str, Any]] = None, + config_url: Optional[str] = None, + expansion_service=None): + super().__init__() + self._sink = sink + identifier = self._WRITE_TRANSFORMS.get(sink.lower()) + if not identifier: + raise ValueError( + f"An unsupported sink was specified: '{sink}'. Please specify " + f"one of the following sinks: {list(self._WRITE_TRANSFORMS.keys())}") + + self._expansion_service = _resolve_expansion_service( + sink, identifier, expansion_service) + self._underlying_identifier = identifier + self._yaml_config = yaml.dump(config) + self._config_url = config_url + + def expand(self, input): + return input | SchemaAwareExternalTransform( + identifier=_MANAGED_IDENTIFIER, + expansion_service=self._expansion_service, + rearrange_based_on_discovery=True, + transform_identifier=self._underlying_identifier, + config=self._yaml_config, + config_url=self._config_url) + + def default_label(self) -> str: + return "Managed Write(%s)" % self._sink.upper() + + +def _resolve_expansion_service( + transform_name: str, identifier: str, expansion_service): + if expansion_service: + return expansion_service + + default_target = None + for gradle_target, transforms in _EXPANSION_SERVICE_JAR_TARGETS.items(): + if transform_name.lower() in transforms: + default_target = gradle_target + break + if not default_target: + raise ValueError( + "No expansion service was specified and could not find a " + f"default expansion service for {transform_name}: '{identifier}'.") + return BeamJarExpansionService(default_target) diff --git a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py new file mode 100644 index 000000000000..2d7262bac031 --- /dev/null +++ b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py @@ -0,0 +1,70 @@ +import os +import secrets +import shutil +import tempfile +import time +import unittest + +import pytest + +import apache_beam as beam +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + + +@pytest.mark.uses_io_java_expansion_service +@unittest.skipUnless( + os.environ.get('EXPANSION_JARS'), + "EXPANSION_JARS environment var is not provided, " + "indicating that jars have not been built") +class ManagedIcebergIT(unittest.TestCase): + def setUp(self): + self._tempdir = tempfile.mkdtemp() + if not os.path.exists(self._tempdir): + os.mkdir(self._tempdir) + test_warehouse_name = 'test_warehouse_%d_%s' % ( + int(time.time()), secrets.token_hex(3)) + self.warehouse_path = os.path.join(self._tempdir, test_warehouse_name) + os.mkdir(self.warehouse_path) + + def tearDown(self): + shutil.rmtree(self._tempdir, ignore_errors=False) + + def _create_row(self, num: int): + return beam.Row( + int_=num, + str_=str(num), + bytes_=bytes(num), + bool_=(num % 2 == 0), + float_=(num + float(num) / 100)) + + def test_write_read_pipeline(self): + iceberg_config = { + "table": "test.write_read", + "catalog_name": "default", + "catalog_properties": { + "type": "hadoop", + "warehouse": f"file://{self.warehouse_path}", + } + } + + rows = [self._create_row(i) for i in range(100)] + expected_dicts = [row.as_dict() for row in rows] + + with beam.Pipeline() as write_pipeline: + _ = ( + write_pipeline + | beam.Create(rows) + | beam.managed.Write(beam.managed.ICEBERG, config=iceberg_config)) + + with beam.Pipeline() as read_pipeline: + output_dicts = ( + read_pipeline + | beam.managed.Read(beam.managed.ICEBERG, config=iceberg_config) + | beam.Map(lambda row: row._asdict())) + + assert_that(output_dicts, equal_to(expected_dicts)) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 15671eeb145b..b4175ad98e92 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -391,6 +391,7 @@ def get_portability_package_data(): 'sortedcontainers>=2.4.0', 'typing-extensions>=3.7.0', 'zstandard>=0.18.0,<1', + 'pyyaml>=3.12,<7.0.0', # Dynamic dependencies must be specified in a separate list, otherwise # Dependabot won't be able to parse the main list. Any dynamic # dependencies will not receive updates from Dependabot. @@ -415,7 +416,6 @@ def get_portability_package_data(): 'pandas<2.2.0', 'parameterized>=0.7.1,<0.10.0', 'pyhamcrest>=1.9,!=1.10.0,<3.0.0', - 'pyyaml>=3.12,<7.0.0', 'requests_mock>=1.7,<2.0', 'tenacity>=8.0.0,<9', 'pytest>=7.1.2,<8.0', @@ -523,7 +523,6 @@ def get_portability_package_data(): 'yaml': [ 'docstring-parser>=0.15,<1.0', 'jinja2>=3.0,<3.2', - 'pyyaml>=3.12,<7.0.0', 'virtualenv-clone>=0.5,<1.0', # https://github.com/PiotrDabkowski/Js2Py/issues/317 'js2py>=0.74,<1; python_version<"3.12"', diff --git a/sdks/standard_expansion_services.yaml b/sdks/standard_expansion_services.yaml index ad965c8a1ee3..623e33fe2e7c 100644 --- a/sdks/standard_expansion_services.yaml +++ b/sdks/standard_expansion_services.yaml @@ -48,7 +48,7 @@ # Handwritten Kafka wrappers already exist in apache_beam/io/kafka.py - 'beam:schematransform:org.apache.beam:kafka_write:v1' - 'beam:schematransform:org.apache.beam:kafka_read:v1' - # Not ready to generate + # Available through apache_beam.transforms.managed.[Read/Write] - 'beam:schematransform:org.apache.beam:iceberg_write:v1' - 'beam:schematransform:org.apache.beam:iceberg_read:v1'