diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json new file mode 100644 index 000000000000..a03c067d2c4e --- /dev/null +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -0,0 +1,3 @@ +{ + "comment": "Modify this file in a trivial way to cause this test suite to run" +} diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 32747f4efb17..78a6b371e6f0 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -692,7 +692,9 @@ class BeamModulePlugin implements Plugin { aws_java_sdk2_profiles : "software.amazon.awssdk:profiles:$aws_java_sdk2_version", azure_sdk_bom : "com.azure:azure-sdk-bom:1.2.14", bigdataoss_gcsio : "com.google.cloud.bigdataoss:gcsio:$google_cloud_bigdataoss_version", + bigdataoss_gcs_connector : "com.google.cloud.bigdataoss:gcs-connector:hadoop2-$google_cloud_bigdataoss_version", bigdataoss_util : "com.google.cloud.bigdataoss:util:$google_cloud_bigdataoss_version", + bigdataoss_util_hadoop : "com.google.cloud.bigdataoss:util-hadoop:hadoop2-$google_cloud_bigdataoss_version", byte_buddy : "net.bytebuddy:byte-buddy:1.14.12", cassandra_driver_core : "com.datastax.cassandra:cassandra-driver-core:$cassandra_driver_version", cassandra_driver_mapping : "com.datastax.cassandra:cassandra-driver-mapping:$cassandra_driver_version", diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ExternalTranslationOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ExternalTranslationOptions.java index 5ed14faf31ee..f444d2e3e301 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ExternalTranslationOptions.java +++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ExternalTranslationOptions.java @@ -25,7 +25,9 @@ public interface ExternalTranslationOptions extends PipelineOptions { - @Description("Set of URNs of transforms to be overriden using the transform service.") + @Description( + "Set of URNs of transforms to be overridden using the transform service. The provided strings " + + "can be transform URNs or schema-transform IDs") @Default.InstanceFactory(EmptyListDefault.class) List getTransformsToOverride(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java index e2b6d95057fd..3d9142723737 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java @@ -535,7 +535,7 @@ public RunnerApi.PTransform translate( if (underlyingIdentifier == null) { throw new IllegalStateException( String.format( - "Encountered a Managed Transform that has an empty \"transform_identifier\": \n%s", + "Encountered a Managed Transform that has an empty \"transform_identifier\": %n%s", configRow)); } transformBuilder.putAnnotations( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java index 941a5daf689b..f88d55a1f5cf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/TransformUpgrader.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.util.construction; +import static org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM; + import com.fasterxml.jackson.core.Version; import 
java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -51,6 +53,7 @@ import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.InvalidProtocolBufferException; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannelBuilder; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; @@ -113,6 +116,22 @@ public RunnerApi.Pipeline upgradeTransformsViaTransformService( if (urn != null && urnsToOverride.contains(urn)) { return true; } + + // Also check if the URN is a schema-transform ID. + if (urn.equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))) { + try { + ExternalTransforms.SchemaTransformPayload schemaTransformPayload = + ExternalTransforms.SchemaTransformPayload.parseFrom( + entry.getValue().getSpec().getPayload()); + String schemaTransformId = schemaTransformPayload.getIdentifier(); + if (urnsToOverride.contains(schemaTransformId)) { + return true; + } + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + } + return false; }) .map( @@ -184,20 +203,27 @@ RunnerApi.Pipeline updateTransformViaTransformService( if (transformToUpgrade == null) { throw new IllegalArgumentException("Could not find a transform with the ID " + transformId); } - ByteString configRowBytes = - transformToUpgrade.getAnnotationsOrThrow( - BeamUrns.getConstant(ExternalTransforms.Annotations.Enum.CONFIG_ROW_KEY)); - ByteString configRowSchemaBytes = - transformToUpgrade.getAnnotationsOrThrow( - BeamUrns.getConstant(ExternalTransforms.Annotations.Enum.CONFIG_ROW_SCHEMA_KEY)); - SchemaApi.Schema configRowSchemaProto = - SchemaApi.Schema.parseFrom(configRowSchemaBytes.toByteArray()); - - ExternalTransforms.ExternalConfigurationPayload payload = - 
ExternalTransforms.ExternalConfigurationPayload.newBuilder() - .setSchema(configRowSchemaProto) - .setPayload(configRowBytes) - .build(); + + byte[] payloadBytes = null; + + if (!transformToUpgrade.getSpec().getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))) { + ByteString configRowBytes = + transformToUpgrade.getAnnotationsOrThrow( + BeamUrns.getConstant(ExternalTransforms.Annotations.Enum.CONFIG_ROW_KEY)); + ByteString configRowSchemaBytes = + transformToUpgrade.getAnnotationsOrThrow( + BeamUrns.getConstant(ExternalTransforms.Annotations.Enum.CONFIG_ROW_SCHEMA_KEY)); + SchemaApi.Schema configRowSchemaProto = + SchemaApi.Schema.parseFrom(configRowSchemaBytes.toByteArray()); + payloadBytes = + ExternalTransforms.ExternalConfigurationPayload.newBuilder() + .setSchema(configRowSchemaProto) + .setPayload(configRowBytes) + .build() + .toByteArray(); + } else { + payloadBytes = transformToUpgrade.getSpec().getPayload().toByteArray(); + } RunnerApi.PTransform.Builder ptransformBuilder = RunnerApi.PTransform.newBuilder() @@ -205,7 +231,7 @@ RunnerApi.Pipeline updateTransformViaTransformService( .setSpec( RunnerApi.FunctionSpec.newBuilder() .setUrn(transformToUpgrade.getSpec().getUrn()) - .setPayload(ByteString.copyFrom(payload.toByteArray())) + .setPayload(ByteString.copyFrom(payloadBytes)) .build()); for (Map.Entry entry : transformToUpgrade.getInputsMap().entrySet()) { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TransformUpgraderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TransformUpgraderTest.java index 65b3e8e89cad..a54383b46e7c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TransformUpgraderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TransformUpgraderTest.java @@ -39,16 +39,22 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.AppliedPTransform; import 
org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformTranslation.SchemaTransformPayloadTranslator; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.ToString; import org.apache.beam.sdk.util.ByteStringOutputStream; +import org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.checkerframework.checker.nullness.qual.Nullable; import org.junit.Test; import org.junit.runner.RunWith; @@ -158,6 +164,60 @@ public static class Registrar2 implements TransformPayloadTranslatorRegistrar { } } + public static class TestSchemaTransformProvider implements SchemaTransformProvider { + + @Override + public String identifier() { + return "dummy_schema_transform"; + } + + @Override + public Schema configurationSchema() { + return Schema.builder().build(); + } + + @Override + public SchemaTransform from(Row configuration) { + return new TestSchemaTransform(); + } + } + + public static class TestSchemaTransform extends SchemaTransform { + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + return input; + } + } + + static class TestSchemaTransformTranslator + extends SchemaTransformPayloadTranslator { + @Override + public SchemaTransformProvider provider() { + 
return new TestSchemaTransformProvider(); + } + + @Override + public Row toConfigRow(TestSchemaTransform transform) { + return Row.withSchema(Schema.builder().build()).build(); + } + } + + @AutoService(TransformPayloadTranslatorRegistrar.class) + public static class TestSchemaTransformPayloadTranslatorRegistrar + implements TransformPayloadTranslatorRegistrar { + @Override + @SuppressWarnings({ + "rawtypes", + }) + public Map, ? extends TransformPayloadTranslator> + getTransformPayloadTranslators() { + return ImmutableMap., TransformPayloadTranslator>builder() + .put(TestSchemaTransform.class, new TestSchemaTransformTranslator()) + .build(); + } + } + static class TestExpansionServiceClientFactory implements ExpansionServiceClientFactory { ExpansionApi.ExpansionResponse response; @@ -183,6 +243,18 @@ public ExpansionApi.ExpansionResponse expand(ExpansionApi.ExpansionRequest reque .getTransformsMap() .get("TransformUpgraderTest-TestTransform2"); } + + boolean schemaTransformTest = false; + if (transformToUpgrade == null) { + // This is running a schema-transform test. 
+ transformToUpgrade = + request + .getComponents() + .getTransformsMap() + .get("TransformUpgraderTest-TestSchemaTransform"); + schemaTransformTest = true; + } + if (!transformToUpgrade .getSpec() .getUrn() @@ -190,27 +262,30 @@ public ExpansionApi.ExpansionResponse expand(ExpansionApi.ExpansionRequest reque throw new RuntimeException("Could not find a valid transform to upgrade"); } - Integer oldParam; - try { - ByteArrayInputStream byteArrayInputStream = - new ByteArrayInputStream(transformToUpgrade.getSpec().getPayload().toByteArray()); - ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream); - oldParam = (Integer) objectInputStream.readObject(); - } catch (Exception e) { - throw new RuntimeException(e); - } - RunnerApi.PTransform.Builder upgradedTransform = transformToUpgrade.toBuilder(); FunctionSpec.Builder specBuilder = upgradedTransform.getSpecBuilder(); - ByteStringOutputStream byteStringOutputStream = new ByteStringOutputStream(); - try { - ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteStringOutputStream); - objectOutputStream.writeObject(oldParam * 2); - objectOutputStream.flush(); - specBuilder.setPayload(byteStringOutputStream.toByteString()); - } catch (IOException e) { - throw new RuntimeException(e); + if (!schemaTransformTest) { + Integer oldParam; + try { + ByteArrayInputStream byteArrayInputStream = + new ByteArrayInputStream(transformToUpgrade.getSpec().getPayload().toByteArray()); + ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream); + oldParam = (Integer) objectInputStream.readObject(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + ByteStringOutputStream byteStringOutputStream = new ByteStringOutputStream(); + try { + ObjectOutputStream objectOutputStream = + new ObjectOutputStream(byteStringOutputStream); + objectOutputStream.writeObject(oldParam * 2); + objectOutputStream.flush(); + 
specBuilder.setPayload(byteStringOutputStream.toByteString()); + } catch (IOException e) { + throw new RuntimeException(e); + } } upgradedTransform.setSpec(specBuilder.build()); @@ -291,6 +366,34 @@ public void testTransformUpgrade() throws Exception { assertTrue(upgradedTransform.getAnnotationsMap().containsKey(TransformUpgrader.UPGRADE_KEY)); } + @Test + public void testTransformUpgradeSchemaTransform() throws Exception { + Pipeline pipeline = Pipeline.create(); + + // Build the pipeline + PCollectionRowTuple.empty(pipeline).apply(new TestSchemaTransform()); + + RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, false); + ExternalTranslationOptions options = + PipelineOptionsFactory.create().as(ExternalTranslationOptions.class); + List urnsToOverride = ImmutableList.of("dummy_schema_transform"); + options.setTransformsToOverride(urnsToOverride); + options.setTransformServiceAddress("dummyaddress"); + + RunnerApi.Pipeline upgradedPipelineProto = + TransformUpgrader.of(new TestExpansionServiceClientFactory()) + .upgradeTransformsViaTransformService(pipelineProto, urnsToOverride, options); + + RunnerApi.PTransform upgradedTransform = + upgradedPipelineProto + .getComponents() + .getTransformsMap() + .get("TransformUpgraderTest-TestSchemaTransform"); + + // Confirm that the upgraded transform includes the upgrade annotation. 
+ assertTrue(upgradedTransform.getAnnotationsMap().containsKey(TransformUpgrader.UPGRADE_KEY)); + } + @Test public void testTransformUpgradeMultipleOccurrences() throws Exception { Pipeline pipeline = Pipeline.create(); diff --git a/sdks/java/expansion-service/container/expansion_service_config.yml b/sdks/java/expansion-service/container/expansion_service_config.yml index 653629aa153e..4f48efd59478 100644 --- a/sdks/java/expansion-service/container/expansion_service_config.yml +++ b/sdks/java/expansion-service/container/expansion_service_config.yml @@ -17,6 +17,9 @@ allowlist: - "beam:transform:org.apache.beam:schemaio_jdbc_read:v1" - "beam:transform:org.apache.beam:schemaio_jdbc_write:v1" - "beam:schematransform:org.apache.beam:bigquery_storage_write:v1" +# By default, the Expansion Service container will include all dependencies in +# the classpath. Following config can be used to override this behavior per +# transform URN or schema-transform ID. dependencies: "beam:transform:org.apache.beam:kafka_read_with_metadata:v1": - path: "jars/beam-sdks-java-io-expansion-service.jar" diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java index 5d4364511786..770da14fa1cf 100644 --- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java @@ -81,6 +81,7 @@ import org.apache.beam.sdk.util.construction.SdkComponents; import org.apache.beam.sdk.util.construction.SplittableParDo; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; @@ -246,6 +247,11 @@ private static class 
TransformProviderForPayloadTranslator< private final TransformPayloadTranslator> payloadTranslator; + // Returns true if the underlying transform represented by this is a schema-aware transform. + private boolean isSchemaTransform() { + return (payloadTranslator instanceof SchemaTransformPayloadTranslator); + } + private TransformProviderForPayloadTranslator( TransformPayloadTranslator> payloadTranslator) { this.payloadTranslator = payloadTranslator; @@ -254,28 +260,51 @@ private TransformProviderForPayloadTranslator( @Override public PTransform getTransform( RunnerApi.FunctionSpec spec, PipelineOptions options) { - try { - ExternalConfigurationPayload payload = - ExternalConfigurationPayload.parseFrom(spec.getPayload()); - Row configRow = - RowCoder.of(SchemaTranslation.schemaFromProto(payload.getSchema())) - .decode(new ByteArrayInputStream(payload.getPayload().toByteArray())); - PTransform transformFromRow = payloadTranslator.fromConfigRow(configRow, options); - if (transformFromRow != null) { - return transformFromRow; - } else { + if (isSchemaTransform()) { + return ExpansionServiceSchemaTransformProvider.of().getTransform(spec, options); + } else { + try { + ExternalConfigurationPayload payload = + ExternalConfigurationPayload.parseFrom(spec.getPayload()); + Row configRow = + RowCoder.of(SchemaTranslation.schemaFromProto(payload.getSchema())) + .decode(new ByteArrayInputStream(payload.getPayload().toByteArray())); + PTransform transformFromRow = payloadTranslator.fromConfigRow(configRow, options); + if (transformFromRow != null) { + return transformFromRow; + } else { + throw new RuntimeException( + String.format( + "A transform cannot be initiated using the provided config row %s and the" + + " TransformPayloadTranslator %s", + configRow, payloadTranslator)); + } + } catch (Exception e) { throw new RuntimeException( String.format( - "A transform cannot be initiated using the provided config row %s and the" - + " TransformPayloadTranslator %s", - configRow, 
payloadTranslator)); + "Failed to build transform %s from spec %s: %s", + spec.getUrn(), spec, e.getMessage()), + e); } - } catch (Exception e) { - throw new RuntimeException( - String.format( - "Failed to build transform %s from spec %s: %s", - spec.getUrn(), spec, e.getMessage()), - e); + } + } + + @Override + public InputT createInput(Pipeline p, Map> inputs) { + if (isSchemaTransform()) { + return (InputT) ExpansionServiceSchemaTransformProvider.of().createInput(p, inputs); + } else { + return TransformProvider.super.createInput(p, inputs); + } + } + + @Override + public Map> extractOutputs(OutputT output) { + if (isSchemaTransform()) { + return ExpansionServiceSchemaTransformProvider.of() + .extractOutputs((PCollectionRowTuple) output); + } else { + return TransformProvider.super.extractOutputs(output); } } diff --git a/sdks/java/io/expansion-service/build.gradle b/sdks/java/io/expansion-service/build.gradle index e9fe4c1fe630..95a843e51fd1 100644 --- a/sdks/java/io/expansion-service/build.gradle +++ b/sdks/java/io/expansion-service/build.gradle @@ -43,6 +43,11 @@ dependencies { permitUnusedDeclared project(":sdks:java:io:kafka") // BEAM-11761 implementation project(":sdks:java:io:kafka:upgrade") permitUnusedDeclared project(":sdks:java:io:kafka:upgrade") // BEAM-11761 + + // Needed by Iceberg I/O users that use GCS for the warehouse location. 
+ implementation library.java.bigdataoss_gcs_connector + permitUnusedDeclared library.java.bigdataoss_gcs_connector + runtimeOnly library.java.kafka_clients runtimeOnly library.java.slf4j_jdk14 } diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index 765fa4948752..7965cde86e7d 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -57,8 +57,8 @@ dependencies { testImplementation library.java.hadoop_client testImplementation library.java.bigdataoss_gcsio - testImplementation "com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.16" - testImplementation "com.google.cloud.bigdataoss:util-hadoop:hadoop2-2.2.16" + testImplementation library.java.bigdataoss_gcs_connector + testImplementation library.java.bigdataoss_util_hadoop testImplementation "org.apache.iceberg:iceberg-gcp:$iceberg_version" testImplementation "org.apache.iceberg:iceberg-data:$iceberg_version" testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java index 54e1404c650c..0702137cffd3 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java @@ -63,28 +63,19 @@ public String identifier() { return "beam:transform:managed:v1"; } - private final Map schemaTransformProviders = new HashMap<>(); + // Use 'getAllProviders()' instead of this cache. 
+ private final Map schemaTransformProvidersCache = + new HashMap<>(); + private boolean providersCached = false; - public ManagedSchemaTransformProvider() {} + private @Nullable Collection supportedIdentifiers; + + public ManagedSchemaTransformProvider() { + this(null); + } ManagedSchemaTransformProvider(@Nullable Collection supportedIdentifiers) { - try { - for (SchemaTransformProvider schemaTransformProvider : - ServiceLoader.load(SchemaTransformProvider.class)) { - if (schemaTransformProviders.containsKey(schemaTransformProvider.identifier())) { - throw new IllegalArgumentException( - "Found multiple SchemaTransformProvider implementations with the same identifier " - + schemaTransformProvider.identifier()); - } - if (supportedIdentifiers == null - || supportedIdentifiers.contains(schemaTransformProvider.identifier())) { - schemaTransformProviders.put( - schemaTransformProvider.identifier(), schemaTransformProvider); - } - } - } catch (Exception e) { - throw new RuntimeException(e.getMessage()); - } + this.supportedIdentifiers = supportedIdentifiers; } @DefaultSchema(AutoValueSchema.class) @@ -149,7 +140,7 @@ protected SchemaTransform from(ManagedConfig managedConfig) { managedConfig.validate(); SchemaTransformProvider schemaTransformProvider = Preconditions.checkNotNull( - schemaTransformProviders.get(managedConfig.getTransformIdentifier()), + getAllProviders().get(managedConfig.getTransformIdentifier()), "Could not find a transform with the identifier " + "%s. 
This could be either due to the dependency with the " + "transform not being available in the classpath or due to " @@ -236,8 +227,36 @@ static Row getRowConfig(ManagedConfig config, Schema transformSchema) { return YamlUtils.toBeamRow(configMap, transformSchema, false); } - Map getAllProviders() { - return schemaTransformProviders; + // We load providers separately, after construction, to prevent the + // 'ManagedSchemaTransformProvider' from being initialized in a recursive loop + // when being loaded using 'AutoValue'. + synchronized Map getAllProviders() { + if (this.providersCached) { + return schemaTransformProvidersCache; + } + try { + for (SchemaTransformProvider schemaTransformProvider : + ServiceLoader.load(SchemaTransformProvider.class)) { + if (schemaTransformProvidersCache.containsKey(schemaTransformProvider.identifier())) { + throw new IllegalArgumentException( + "Found multiple SchemaTransformProvider implementations with the same identifier " + + schemaTransformProvider.identifier()); + } + if (supportedIdentifiers == null + || supportedIdentifiers.contains(schemaTransformProvider.identifier())) { + if (schemaTransformProvider.identifier().equals("beam:transform:managed:v1")) { + // Prevent recursively adding the 'ManagedSchemaTransformProvider'. + continue; + } + schemaTransformProvidersCache.put( + schemaTransformProvider.identifier(), schemaTransformProvider); + } + } + this.providersCached = true; + return schemaTransformProvidersCache; + } catch (Exception e) { + throw new RuntimeException(e.getMessage()); + } } // TODO: set global snake_case naming convention and remove these special cases