diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json
index e3d6056a5de9..b26833333238 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 1
+  "modification": 2
 }
diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json
new file mode 100644
index 000000000000..e3d6056a5de9
--- /dev/null
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json
@@ -0,0 +1,4 @@
+{
+  "comment": "Modify this file in a trivial way to cause this test suite to run",
+  "modification": 1
+}
diff --git a/.github/workflows/beam_PostCommit_Python_Xlang_IO_Direct.yml b/.github/workflows/beam_PostCommit_Python_Xlang_IO_Direct.yml
new file mode 100644
index 000000000000..5092a1981154
--- /dev/null
+++ b/.github/workflows/beam_PostCommit_Python_Xlang_IO_Direct.yml
@@ -0,0 +1,96 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: PostCommit Python Xlang IO Direct
+
+on:
+  schedule:
+    - cron: '30 5/6 * * *'
+  pull_request_target:
+    paths: ['release/trigger_all_tests.json', '.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Direct.json']
+  workflow_dispatch:
+
+#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event
+permissions:
+  actions: write
+  pull-requests: write
+  checks: write
+  contents: read
+  deployments: read
+  id-token: none
+  issues: write
+  discussions: read
+  packages: read
+  pages: read
+  repository-projects: read
+  security-events: read
+  statuses: read
+
+# This allows a subsequently queued workflow run to interrupt previous runs
+concurrency:
+  group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}'
+  cancel-in-progress: true
+
+env:
+  DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
+  GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
+  GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}
+
+jobs:
+  beam_PostCommit_Python_Xlang_IO_Direct:
+    if: |
+      github.event_name == 'workflow_dispatch' ||
+      github.event_name == 'pull_request_target' ||
+      (github.event_name == 'schedule' && github.repository == 'apache/beam') ||
+      github.event.comment.body == 'Run Python_Xlang_IO_Direct PostCommit'
+    runs-on: [self-hosted, ubuntu-20.04, main]
+    timeout-minutes: 100
+    name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
+    strategy:
+      matrix:
+        job_name: ["beam_PostCommit_Python_Xlang_IO_Direct"]
+        job_phrase: ["Run Python_Xlang_IO_Direct PostCommit"]
+    steps:
+      - uses: actions/checkout@v4
+      - name: Setup repository
+        uses: ./.github/actions/setup-action
+        with:
+          comment_phrase: ${{ matrix.job_phrase }}
+          github_token: ${{ secrets.GITHUB_TOKEN }}
+          github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
+      - name: Setup environment
+        uses: ./.github/actions/setup-environment-action
+        with:
+          python-version: |
+            3.9
+            3.12
+      - name: run PostCommit Python Xlang IO Direct script
+        uses: ./.github/actions/gradle-command-self-hosted-action
+        with:
+          gradle-command: :sdks:python:test-suites:direct:ioCrossLanguagePostCommit
+          arguments: -PuseWheelDistribution
+      - name: Archive Python Test Results
+        uses: actions/upload-artifact@v4
+        if: failure()
+        with:
+          name: Python Test Results
+          path: '**/pytest*.xml'
+      - name: Publish Python Test Results
+        uses: EnricoMi/publish-unit-test-result-action@v2
+        if: always()
+        with:
+          commit: '${{ env.prsha || env.GITHUB_SHA }}'
+          comment_mode: ${{ github.event_name == 'issue_comment' && 'always' || 'off' }}
+          files: '**/pytest*.xml'
\ No newline at end of file
diff --git a/CHANGES.md b/CHANGES.md
index cc1268635046..4e21e400e60d 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -59,11 +59,13 @@
 * New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
 * New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).
+* [Python] Introduce Managed Transforms API ([#31495](https://github.com/apache/beam/pull/31495))
 
 ## I/Os
 
 * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
 * [Managed Iceberg] Support creating tables if needed ([#32686](https://github.com/apache/beam/pull/32686))
+* [Managed Iceberg] Now available in Python SDK ([#31495](https://github.com/apache/beam/pull/31495))
 * [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types ([#32688](https://github.com/apache/beam/pull/32688))
 
 ## New Features / Improvements
diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
index 429371e11055..b03350966d6c 100644
--- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
+++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto
@@ -59,6 +59,20 @@ message ExpansionMethods {
   }
 }
 
+// Defines the URNs for managed transforms.
+message ManagedTransforms {
+  enum Urns {
+    ICEBERG_READ = 0 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+      "beam:schematransform:org.apache.beam:iceberg_read:v1"];
+    ICEBERG_WRITE = 1 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+      "beam:schematransform:org.apache.beam:iceberg_write:v1"];
+    KAFKA_READ = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+      "beam:schematransform:org.apache.beam:kafka_read:v1"];
+    KAFKA_WRITE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) =
+      "beam:schematransform:org.apache.beam:kafka_write:v1"];
+  }
+}
+
 // A configuration payload for an external transform.
 // Used to define a Java transform that can be directly instantiated by a Java
 // expansion service.
diff --git a/sdks/java/io/expansion-service/build.gradle b/sdks/java/io/expansion-service/build.gradle
index b09a92ca315c..cc8eccf98997 100644
--- a/sdks/java/io/expansion-service/build.gradle
+++ b/sdks/java/io/expansion-service/build.gradle
@@ -44,6 +44,8 @@ ext.summary = "Expansion service serving several Java IOs"
 dependencies {
   implementation project(":sdks:java:expansion-service")
   permitUnusedDeclared project(":sdks:java:expansion-service") // BEAM-11761
+  implementation project(":sdks:java:managed")
+  permitUnusedDeclared project(":sdks:java:managed") // BEAM-11761
   implementation project(":sdks:java:io:iceberg")
   permitUnusedDeclared project(":sdks:java:io:iceberg") // BEAM-11761
   implementation project(":sdks:java:io:kafka")
diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle
index 3d653d6b276e..e10c6f38e20f 100644
--- a/sdks/java/io/iceberg/build.gradle
+++ b/sdks/java/io/iceberg/build.gradle
@@ -44,7 +44,7 @@ def orc_version = "1.9.2"
 dependencies {
   implementation library.java.vendored_guava_32_1_2_jre
   implementation project(path: ":sdks:java:core", configuration: "shadow")
-  implementation project(":sdks:java:managed")
+  implementation project(path: ":model:pipeline", configuration: "shadow")
   implementation library.java.slf4j_api
   implementation library.java.joda_time
   implementation "org.apache.parquet:parquet-column:$parquet_version"
@@ -55,6 +55,7 @@ dependencies {
   implementation "org.apache.iceberg:iceberg-orc:$iceberg_version"
   implementation library.java.hadoop_common
 
+  testImplementation project(":sdks:java:managed")
   testImplementation library.java.hadoop_client
   testImplementation library.java.bigdataoss_gcsio
   testImplementation library.java.bigdataoss_gcs_connector
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
index fa4ff9714c7f..1d4b36585237 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
@@ -24,7 +24,6 @@
 import java.util.List;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.managed.Managed;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
@@ -45,8 +44,8 @@
  * A connector that reads and writes to <a href="https://iceberg.apache.org/">Apache Iceberg</a>
  * tables.
  *
- * <p>{@link IcebergIO} is offered as a {@link Managed} transform. This class is subject to change
- * and should not be used directly. Instead, use it via {@link Managed#ICEBERG} like so:
+ * <p>{@link IcebergIO} is offered as a Managed transform. This class is subject to change and
+ * should not be used directly. Instead, use it like so:
  *
  * <pre>{@code
 * Map<String, Object> config = Map.of(
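
> Editor's note: the javadoc example above is truncated by the diff context; the headline change in this PR is that the same transform is now reachable from Python. A minimal read sketch using the new API — table name, catalog name, and warehouse path are placeholders, with config keys mirroring the managed.py docstring and integration test added below:

```python
import apache_beam as beam

# Placeholder Iceberg config; keys follow the managed.py docstring example.
config = {
    "table": "db.table",
    "catalog_name": "local",
    "catalog_properties": {
        "type": "hadoop",
        "warehouse": "file:///tmp/warehouse",
    },
}

with beam.Pipeline() as p:
    rows = p | beam.managed.Read(beam.managed.ICEBERG, config=config)
```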
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
index df7bda4560dd..d44149fda08e 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
@@ -17,10 +17,12 @@
  */
 package org.apache.beam.sdk.io.iceberg;
 
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
+
 import com.google.auto.service.AutoService;
 import java.util.Collections;
 import java.util.List;
-import org.apache.beam.sdk.managed.ManagedTransformConstants;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
 import org.apache.beam.sdk.schemas.NoSuchSchemaException;
 import org.apache.beam.sdk.schemas.SchemaRegistry;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
@@ -53,7 +55,7 @@ public List<String> outputCollectionNames() {
 
   @Override
   public String identifier() {
-    return ManagedTransformConstants.ICEBERG_READ;
+    return getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_READ);
   }
 
   static class IcebergReadSchemaTransform extends SchemaTransform {
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
index ea46e8560815..6aa830e7fbc6 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
@@ -18,13 +18,14 @@
 package org.apache.beam.sdk.io.iceberg;
 
 import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.Configuration;
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
 
 import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.sdk.managed.ManagedTransformConstants;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.NoSuchSchemaException;
 import org.apache.beam.sdk.schemas.Schema;
@@ -151,7 +152,7 @@ public List<String> outputCollectionNames() {
 
   @Override
   public String identifier() {
-    return ManagedTransformConstants.ICEBERG_WRITE;
+    return getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_WRITE);
   }
 
   static class IcebergWriteSchemaTransform extends SchemaTransform {
diff --git a/sdks/java/io/kafka/build.gradle b/sdks/java/io/kafka/build.gradle
index 0ba6fa642a02..ec4654bd88df 100644
--- a/sdks/java/io/kafka/build.gradle
+++ b/sdks/java/io/kafka/build.gradle
@@ -54,6 +54,7 @@ dependencies {
   provided library.java.jackson_dataformat_csv
   permitUnusedDeclared library.java.jackson_dataformat_csv
   implementation project(path: ":sdks:java:core", configuration: "shadow")
+  implementation project(path: ":model:pipeline", configuration: "shadow")
   implementation project(":sdks:java:extensions:avro")
   implementation project(":sdks:java:extensions:protobuf")
   implementation project(":sdks:java:expansion-service")
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
index e87669ab2b0a..a3fd1d8c3fd7 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.kafka;
 
 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
 
 import com.google.auto.service.AutoService;
 import java.io.FileOutputStream;
@@ -34,6 +35,7 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
 import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
 import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
 import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils;
@@ -103,7 +105,7 @@ public Row apply(byte[] input) {
 
   @Override
   public String identifier() {
-    return "beam:schematransform:org.apache.beam:kafka_read:v1";
+    return getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ);
   }
 
   @Override
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java
index 09b338492b47..d6f46b11cb7d 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.io.kafka;
 
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
+
 import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
 import java.io.Serializable;
@@ -26,6 +28,7 @@
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
 import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
 import org.apache.beam.sdk.extensions.protobuf.ProtoByteUtils;
 import org.apache.beam.sdk.metrics.Counter;
@@ -249,7 +252,7 @@ public byte[] apply(Row input) {
 
   @Override
   public @UnknownKeyFor @NonNull @Initialized String identifier() {
-    return "beam:schematransform:org.apache.beam:kafka_write:v1";
+    return getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE);
   }
 
   @Override
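
> Editor's note: with both Kafka providers now resolving their identifiers from the shared proto enum instead of hardcoded strings, they are addressable through the Python managed API as well. A minimal write sketch — topic and broker address are placeholders, reusing the config keys from the managed.py docstring below:

```python
import apache_beam as beam

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([beam.Row(word="hello")])
        | beam.managed.Write(
            beam.managed.KAFKA,
            config={
                "topic": "foo",                         # placeholder topic
                "bootstrap_servers": "localhost:1234",  # placeholder broker
                "format": "AVRO",
            }))
```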
diff --git a/sdks/java/managed/build.gradle b/sdks/java/managed/build.gradle
index add0d7f3cc0d..c6e868872246 100644
--- a/sdks/java/managed/build.gradle
+++ b/sdks/java/managed/build.gradle
@@ -28,6 +28,7 @@ ext.summary = """Library that provides managed IOs."""
 
 dependencies {
     implementation project(path: ":sdks:java:core", configuration: "shadow")
+    implementation project(path: ":model:pipeline", configuration: "shadow")
     implementation library.java.vendored_guava_32_1_2_jre
     implementation library.java.slf4j_api
 
diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
index 911e25cdda14..8477726686ee 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java
@@ -17,11 +17,14 @@
  */
 package org.apache.beam.sdk.managed;
 
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
+
 import com.google.auto.value.AutoValue;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
 import org.apache.beam.sdk.coders.RowCoder;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
@@ -87,13 +90,13 @@ public class Managed {
   // Supported SchemaTransforms
   public static final Map<String, String> READ_TRANSFORMS =
       ImmutableMap.<String, String>builder()
-          .put(ICEBERG, ManagedTransformConstants.ICEBERG_READ)
-          .put(KAFKA, ManagedTransformConstants.KAFKA_READ)
+          .put(ICEBERG, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_READ))
+          .put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ))
           .build();
   public static final Map<String, String> WRITE_TRANSFORMS =
       ImmutableMap.<String, String>builder()
-          .put(ICEBERG, ManagedTransformConstants.ICEBERG_WRITE)
-          .put(KAFKA, ManagedTransformConstants.KAFKA_WRITE)
+          .put(ICEBERG, getUrn(ExternalTransforms.ManagedTransforms.Urns.ICEBERG_WRITE))
+          .put(KAFKA, getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE))
           .build();
 
   /**
diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java
index 51d0b67b4b89..4cf752747be5 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedTransformConstants.java
@@ -17,7 +17,10 @@
  */
 package org.apache.beam.sdk.managed;
 
+import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
+
 import java.util.Map;
+import org.apache.beam.model.pipeline.v1.ExternalTransforms;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 
 /**
@@ -41,12 +44,6 @@ public class ManagedTransformConstants {
   // Standard input PCollection tag
   public static final String INPUT = "input";
 
-  public static final String ICEBERG_READ = "beam:schematransform:org.apache.beam:iceberg_read:v1";
-  public static final String ICEBERG_WRITE =
-      "beam:schematransform:org.apache.beam:iceberg_write:v1";
-  public static final String KAFKA_READ = "beam:schematransform:org.apache.beam:kafka_read:v1";
-  public static final String KAFKA_WRITE = "beam:schematransform:org.apache.beam:kafka_write:v1";
-
   private static final Map<String, String> KAFKA_READ_MAPPINGS =
       ImmutableMap.<String, String>builder().put("data_format", "format").build();
 
@@ -55,7 +52,7 @@ public class ManagedTransformConstants {
 
   public static final Map<String, Map<String, String>> MAPPINGS =
       ImmutableMap.<String, Map<String, String>>builder()
-          .put(KAFKA_READ, KAFKA_READ_MAPPINGS)
-          .put(KAFKA_WRITE, KAFKA_WRITE_MAPPINGS)
+          .put(getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_READ), KAFKA_READ_MAPPINGS)
+          .put(getUrn(ExternalTransforms.ManagedTransforms.Urns.KAFKA_WRITE), KAFKA_WRITE_MAPPINGS)
           .build();
 }
diff --git a/sdks/python/apache_beam/portability/common_urns.py b/sdks/python/apache_beam/portability/common_urns.py
index 4effc91c3d40..74d9a39bb052 100644
--- a/sdks/python/apache_beam/portability/common_urns.py
+++ b/sdks/python/apache_beam/portability/common_urns.py
@@ -38,6 +38,7 @@
 StandardSideInputTypes = beam_runner_api_pb2_urns.StandardSideInputTypes
 StandardUserStateTypes = beam_runner_api_pb2_urns.StandardUserStateTypes
 ExpansionMethods = external_transforms_pb2_urns.ExpansionMethods
+ManagedTransforms = external_transforms_pb2_urns.ManagedTransforms
 MonitoringInfo = metrics_pb2_urns.MonitoringInfo
 MonitoringInfoSpecs = metrics_pb2_urns.MonitoringInfoSpecs
 MonitoringInfoTypeUrns = metrics_pb2_urns.MonitoringInfoTypeUrns
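
> Editor's note: exposing `ManagedTransforms` through `common_urns` is what lets the Python SDK and the Java providers agree on identifiers from a single source of truth, the proto file above. A quick sanity check of the generated constants (sketch):

```python
from apache_beam.portability.common_urns import ManagedTransforms

# These URNs are generated from external_transforms.proto, so Java's
# getUrn(...) and this attribute access yield identical strings.
assert (ManagedTransforms.Urns.ICEBERG_READ.urn ==
        "beam:schematransform:org.apache.beam:iceberg_read:v1")
assert (ManagedTransforms.Urns.KAFKA_WRITE.urn ==
        "beam:schematransform:org.apache.beam:kafka_write:v1")
```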
diff --git a/sdks/python/apache_beam/transforms/__init__.py b/sdks/python/apache_beam/transforms/__init__.py
index 4e66a290842c..b8b6839019e8 100644
--- a/sdks/python/apache_beam/transforms/__init__.py
+++ b/sdks/python/apache_beam/transforms/__init__.py
@@ -22,6 +22,7 @@
 from apache_beam.transforms import combiners
 from apache_beam.transforms.core import *
 from apache_beam.transforms.external import *
+from apache_beam.transforms.managed import *
 from apache_beam.transforms.ptransform import *
 from apache_beam.transforms.stats import *
 from apache_beam.transforms.timeutil import TimeDomain
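
> Editor's note: the star import also publishes the `managed` submodule as an attribute of the package once it is loaded, which is what the `beam.managed.Read`/`beam.managed.Write` spellings in the docstring and integration test rely on. Both access paths below should resolve to the same objects (sketch):

```python
import apache_beam as beam
from apache_beam.transforms import managed

# Same module object, reached two ways.
assert beam.managed.Read is managed.Read
assert beam.managed.Write is managed.Write
```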
diff --git a/sdks/python/apache_beam/transforms/managed.py b/sdks/python/apache_beam/transforms/managed.py
new file mode 100644
index 000000000000..22ee15b1de1c
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/managed.py
@@ -0,0 +1,182 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Managed Transforms.
+
+This module builds and instantiates turnkey transforms that can be managed by
+the underlying runner. This means the runner can upgrade the transform to a
+more optimal/updated version without requiring the user to do anything. It may
+also replace the transform with something entirely different if it chooses to.
+By default, however, the specified transform will remain unchanged.
+
+Using Managed Transforms
+========================
+Managed turnkey transforms have a defined configuration and can be built using
+an inline :class:`dict` like so::
+
+  results = p | beam.managed.Read(
+                    beam.managed.ICEBERG,
+                    config={"table": "foo",
+                            "catalog_name": "bar",
+                            "catalog_properties": {
+                                "warehouse": "path/to/warehouse",
+                                "catalog-impl": "org.apache.my.CatalogImpl"}})
+
+A YAML configuration file can also be used to build a Managed transform. Say we
+have the following `config.yaml` file::
+
+  topic: "foo"
+  bootstrap_servers: "localhost:1234"
+  format: "AVRO"
+
+Simply provide the location to the file like so::
+
+  input_rows = p | beam.Create(...)
+  input_rows | beam.managed.Write(
+                    beam.managed.KAFKA,
+                    config_url="path/to/config.yaml")
+
+Available transforms
+====================
+Available transforms are:
+
+- **Kafka Read and Write**
+- **Iceberg Read and Write**
+
+**Note:** inputs and outputs need to be PCollection(s) of Beam
+:py:class:`apache_beam.pvalue.Row` elements.
+
+**Note:** Today, all managed transforms are essentially cross-language
+transforms, and Java's ManagedSchemaTransform is used under the hood.
+"""
+
+from typing import Any
+from typing import Dict
+from typing import Optional
+
+import yaml
+
+from apache_beam.portability.common_urns import ManagedTransforms
+from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external import SchemaAwareExternalTransform
+from apache_beam.transforms.ptransform import PTransform
+
+ICEBERG = "iceberg"
+KAFKA = "kafka"
+_MANAGED_IDENTIFIER = "beam:transform:managed:v1"
+_EXPANSION_SERVICE_JAR_TARGETS = {
+    "sdks:java:io:expansion-service:shadowJar": [KAFKA, ICEBERG],
+}
+
+__all__ = ["ICEBERG", "KAFKA", "Read", "Write"]
+
+
+class Read(PTransform):
+  """Read using Managed Transforms"""
+  _READ_TRANSFORMS = {
+      ICEBERG: ManagedTransforms.Urns.ICEBERG_READ.urn,
+      KAFKA: ManagedTransforms.Urns.KAFKA_READ.urn,
+  }
+
+  def __init__(
+      self,
+      source: str,
+      config: Optional[Dict[str, Any]] = None,
+      config_url: Optional[str] = None,
+      expansion_service=None):
+    super().__init__()
+    self._source = source
+    identifier = self._READ_TRANSFORMS.get(source.lower())
+    if not identifier:
+      raise ValueError(
+          f"An unsupported source was specified: '{source}'. Please specify "
+          f"one of the following sources: {list(self._READ_TRANSFORMS.keys())}")
+
+    self._expansion_service = _resolve_expansion_service(
+        source, identifier, expansion_service)
+    self._underlying_identifier = identifier
+    self._yaml_config = yaml.dump(config)
+    self._config_url = config_url
+
+  def expand(self, input):
+    return input | SchemaAwareExternalTransform(
+        identifier=_MANAGED_IDENTIFIER,
+        expansion_service=self._expansion_service,
+        rearrange_based_on_discovery=True,
+        transform_identifier=self._underlying_identifier,
+        config=self._yaml_config,
+        config_url=self._config_url)
+
+  def default_label(self) -> str:
+    return "Managed Read(%s)" % self._source.upper()
+
+
+class Write(PTransform):
+  """Write using Managed Transforms"""
+  _WRITE_TRANSFORMS = {
+      ICEBERG: ManagedTransforms.Urns.ICEBERG_WRITE.urn,
+      KAFKA: ManagedTransforms.Urns.KAFKA_WRITE.urn,
+  }
+
+  def __init__(
+      self,
+      sink: str,
+      config: Optional[Dict[str, Any]] = None,
+      config_url: Optional[str] = None,
+      expansion_service=None):
+    super().__init__()
+    self._sink = sink
+    identifier = self._WRITE_TRANSFORMS.get(sink.lower())
+    if not identifier:
+      raise ValueError(
+          f"An unsupported sink was specified: '{sink}'. Please specify "
+          f"one of the following sinks: {list(self._WRITE_TRANSFORMS.keys())}")
+
+    self._expansion_service = _resolve_expansion_service(
+        sink, identifier, expansion_service)
+    self._underlying_identifier = identifier
+    self._yaml_config = yaml.dump(config)
+    self._config_url = config_url
+
+  def expand(self, input):
+    return input | SchemaAwareExternalTransform(
+        identifier=_MANAGED_IDENTIFIER,
+        expansion_service=self._expansion_service,
+        rearrange_based_on_discovery=True,
+        transform_identifier=self._underlying_identifier,
+        config=self._yaml_config,
+        config_url=self._config_url)
+
+  def default_label(self) -> str:
+    return "Managed Write(%s)" % self._sink.upper()
+
+
+def _resolve_expansion_service(
+    transform_name: str, identifier: str, expansion_service):
+  if expansion_service:
+    return expansion_service
+
+  default_target = None
+  for gradle_target, transforms in _EXPANSION_SERVICE_JAR_TARGETS.items():
+    if transform_name.lower() in transforms:
+      default_target = gradle_target
+      break
+  if not default_target:
+    raise ValueError(
+        "No expansion service was specified and could not find a "
+        f"default expansion service for {transform_name}: '{identifier}'.")
+  return BeamJarExpansionService(default_target)
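
> Editor's note: by default `_resolve_expansion_service` falls back to the bundled expansion-service jar via its Gradle target; callers can bypass that lookup entirely by supplying their own service. A sketch, where "localhost:8097" is a hypothetical address of an expansion service started out-of-band:

```python
import apache_beam as beam

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([beam.Row(word="hi")])
        # When expansion_service is given, _resolve_expansion_service returns
        # it unchanged and no Beam jar target is resolved.
        | beam.managed.Write(
            beam.managed.KAFKA,
            config_url="path/to/config.yaml",       # placeholder, as in the docstring
            expansion_service="localhost:8097"))    # hypothetical address
```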
diff --git a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py
new file mode 100644
index 000000000000..2d7262bac031
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py
@@ -0,0 +1,70 @@
+import os
+import secrets
+import shutil
+import tempfile
+import time
+import unittest
+
+import pytest
+
+import apache_beam as beam
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+
+@pytest.mark.uses_io_java_expansion_service
+@unittest.skipUnless(
+    os.environ.get('EXPANSION_JARS'),
+    "EXPANSION_JARS environment var is not provided, "
+    "indicating that jars have not been built")
+class ManagedIcebergIT(unittest.TestCase):
+  def setUp(self):
+    self._tempdir = tempfile.mkdtemp()
+    if not os.path.exists(self._tempdir):
+      os.mkdir(self._tempdir)
+    test_warehouse_name = 'test_warehouse_%d_%s' % (
+        int(time.time()), secrets.token_hex(3))
+    self.warehouse_path = os.path.join(self._tempdir, test_warehouse_name)
+    os.mkdir(self.warehouse_path)
+
+  def tearDown(self):
+    shutil.rmtree(self._tempdir, ignore_errors=False)
+
+  def _create_row(self, num: int):
+    return beam.Row(
+        int_=num,
+        str_=str(num),
+        bytes_=bytes(num),
+        bool_=(num % 2 == 0),
+        float_=(num + float(num) / 100))
+
+  def test_write_read_pipeline(self):
+    iceberg_config = {
+        "table": "test.write_read",
+        "catalog_name": "default",
+        "catalog_properties": {
+            "type": "hadoop",
+            "warehouse": f"file://{self.warehouse_path}",
+        }
+    }
+
+    rows = [self._create_row(i) for i in range(100)]
+    expected_dicts = [row.as_dict() for row in rows]
+
+    with beam.Pipeline() as write_pipeline:
+      _ = (
+          write_pipeline
+          | beam.Create(rows)
+          | beam.managed.Write(beam.managed.ICEBERG, config=iceberg_config))
+
+    with beam.Pipeline() as read_pipeline:
+      output_dicts = (
+          read_pipeline
+          | beam.managed.Read(beam.managed.ICEBERG, config=iceberg_config)
+          | beam.Map(lambda row: row._asdict()))
+
+      assert_that(output_dicts, equal_to(expected_dicts))
+
+
+if __name__ == '__main__':
+  unittest.main()
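
> Editor's note on a subtlety this test encodes: the rows fed to the write pipeline are `beam.Row` objects, which expose `as_dict()`, while elements coming back from the cross-language read behave like schema `NamedTuple` values, hence the `row._asdict()` in the read pipeline. A toy illustration of the two shapes (sketch; `ExampleRow` is a hypothetical stand-in for the inferred schema type):

```python
from typing import NamedTuple

import apache_beam as beam

row = beam.Row(int_=1, str_="1")
print(row.as_dict())  # {'int_': 1, 'str_': '1'}

class ExampleRow(NamedTuple):  # hypothetical stand-in for the inferred type
  int_: int
  str_: str

print(ExampleRow(1, "1")._asdict())  # {'int_': 1, 'str_': '1'}
```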
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 15671eeb145b..b4175ad98e92 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -391,6 +391,7 @@ def get_portability_package_data():
           'sortedcontainers>=2.4.0',
           'typing-extensions>=3.7.0',
           'zstandard>=0.18.0,<1',
+          'pyyaml>=3.12,<7.0.0',
           # Dynamic dependencies must be specified in a separate list, otherwise
           # Dependabot won't be able to parse the main list. Any dynamic
           # dependencies will not receive updates from Dependabot.
@@ -415,7 +416,6 @@ def get_portability_package_data():
               'pandas<2.2.0',
               'parameterized>=0.7.1,<0.10.0',
               'pyhamcrest>=1.9,!=1.10.0,<3.0.0',
-              'pyyaml>=3.12,<7.0.0',
               'requests_mock>=1.7,<2.0',
               'tenacity>=8.0.0,<9',
               'pytest>=7.1.2,<8.0',
@@ -523,7 +523,6 @@ def get_portability_package_data():
           'yaml': [
               'docstring-parser>=0.15,<1.0',
               'jinja2>=3.0,<3.2',
-              'pyyaml>=3.12,<7.0.0',
               'virtualenv-clone>=0.5,<1.0',
               # https://github.com/PiotrDabkowski/Js2Py/issues/317
               'js2py>=0.74,<1; python_version<"3.12"',
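
> Editor's note: `pyyaml` moves from the test and `yaml` extras into the required install list because `apache_beam/transforms/managed.py` imports `yaml` at module load time and serializes inline configs with `yaml.dump`. Roughly what happens to a config dict before it is handed to the expansion request (sketch):

```python
import yaml

# managed.Read/Write store this as self._yaml_config and forward it as the
# Managed transform's `config` field.
config = {"table": "foo", "catalog_name": "bar"}
print(yaml.dump(config))
# catalog_name: bar
# table: foo
```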
diff --git a/sdks/standard_expansion_services.yaml b/sdks/standard_expansion_services.yaml
index ad965c8a1ee3..623e33fe2e7c 100644
--- a/sdks/standard_expansion_services.yaml
+++ b/sdks/standard_expansion_services.yaml
@@ -48,7 +48,7 @@
     # Handwritten Kafka wrappers already exist in apache_beam/io/kafka.py
     - 'beam:schematransform:org.apache.beam:kafka_write:v1'
     - 'beam:schematransform:org.apache.beam:kafka_read:v1'
-    # Not ready to generate
+    # Available through apache_beam.transforms.managed.[Read/Write]
     - 'beam:schematransform:org.apache.beam:iceberg_write:v1'
     - 'beam:schematransform:org.apache.beam:iceberg_read:v1'