From 905d59045a22df9167aa479dbf5d41a50c0fe479 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 11 Apr 2024 18:33:02 -0400 Subject: [PATCH 1/3] Change ManagedSchemaTransformProvider to take a Row config instead of a Yaml string --- .../beam/sdk/schemas/utils/YamlUtils.java | 27 ++++++++--- .../apache/beam/sdk/util/YamlUtilsTest.java | 36 ++++++++++++++- .../org/apache/beam/sdk/managed/Managed.java | 45 +++++++++++++------ .../ManagedSchemaTransformProvider.java | 33 +++++++++----- .../ManagedSchemaTransformProviderTest.java | 27 ++++++----- 5 files changed, 120 insertions(+), 48 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java index 5c05b2bed396..5571c9e4fed6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java @@ -53,11 +53,11 @@ public class YamlUtils { .put(Schema.TypeName.BYTES, str -> BaseEncoding.base64().decode(str)) .build(); - public static Row toBeamRow(@Nullable String yamlString, Schema schema) { + public static @Nullable Row toBeamRow(@Nullable String yamlString, Schema schema) { return toBeamRow(yamlString, schema, false); } - public static Row toBeamRow( + public static @Nullable Row toBeamRow( @Nullable String yamlString, Schema schema, boolean convertNamesToCamelCase) { if (yamlString == null || yamlString.isEmpty()) { List requiredFields = @@ -65,7 +65,7 @@ public static Row toBeamRow( .filter(field -> !field.getType().getNullable()) .collect(Collectors.toList()); if (requiredFields.isEmpty()) { - return Row.nullRow(schema); + return null; } else { throw new IllegalArgumentException( String.format( @@ -147,14 +147,27 @@ public static Row toBeamRow( } @SuppressWarnings("nullness") - public static Row toBeamRow(Map yamlMap, Schema rowSchema, boolean toCamelCase) { + public static @Nullable Row toBeamRow( + @Nullable Map map, Schema rowSchema, boolean toCamelCase) { + if (map == null || map.isEmpty()) { + List requiredFields = + rowSchema.getFields().stream() + .filter(field -> !field.getType().getNullable()) + .collect(Collectors.toList()); + if (requiredFields.isEmpty()) { + return null; + } else { + throw new IllegalArgumentException( + String.format( + "Received an empty Map, but output schema contains required fields: %s", + requiredFields)); + } + } return rowSchema.getFields().stream() .map( field -> toBeamValue( - field, - yamlMap.get(maybeGetSnakeCase(field.getName(), toCamelCase)), - toCamelCase)) + field, map.get(maybeGetSnakeCase(field.getName(), toCamelCase)), toCamelCase)) .collect(toRow(rowSchema)); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java index 6e6984dde3a6..01cf784f6298 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java @@ -18,14 +18,17 @@ package org.apache.beam.sdk.util; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import java.math.BigDecimal; import java.util.Arrays; +import java.util.Map; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.utils.YamlUtils; import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding; import org.junit.Rule; import org.junit.Test; @@ -47,7 +50,7 @@ public String makeNested(String input) { public void testEmptyYamlString() { Schema schema = Schema.builder().build(); - assertEquals(Row.nullRow(schema), YamlUtils.toBeamRow("", schema)); + assertNull(YamlUtils.toBeamRow("", schema)); } @Test @@ -225,4 +228,35 @@ public void testNestedArray() { assertEquals(expectedRow, YamlUtils.toBeamRow(yamlString, schema)); } + + private static final Schema FLAT_SCHEMA_CAMEL_CASE = + Schema.builder() + .addFields( + FLAT_SCHEMA.getFields().stream() + .map( + field -> + field.withName( + CaseFormat.LOWER_UNDERSCORE.to( + CaseFormat.LOWER_CAMEL, field.getName()))) + .collect(Collectors.toList())) + .build(); + + private static final Map FLAT_MAP = + FLAT_SCHEMA.getFields().stream() + .collect( + Collectors.toMap( + Schema.Field::getName, + field -> Preconditions.checkArgumentNotNull(FLAT_ROW.getValue(field.getName())))); + + @Test + public void testSnakeCaseMapToCamelCaseRow() { + Row expectedRow = + FLAT_SCHEMA.getFields().stream() + .map(field -> Preconditions.checkStateNotNull(FLAT_ROW.getValue(field.getName()))) + .collect(Row.toRow(FLAT_SCHEMA_CAMEL_CASE)); + + Row convertedRow = YamlUtils.toBeamRow(FLAT_MAP, FLAT_SCHEMA_CAMEL_CASE, true); + + assertEquals(expectedRow, convertedRow); + } } diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java index b2b010b1e434..d746de306f55 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.utils.YamlUtils; import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; @@ -36,8 +37,8 @@ * *

Available transforms

* - *

This API currently supports two operations: {@link Read} and {@link Write}. Each one - * enumerates the available transforms in a {@code TRANSFORMS} map. + *

This API currently supports two operations: {@link Managed#read} and {@link Managed#write}. + * Each one enumerates the available transforms in a {@code TRANSFORMS} map. * *

Building a Managed turnkey transform

* @@ -48,7 +49,7 @@ *
{@code
  * PCollectionRowTuple output = PCollectionRowTuple.empty(pipeline).apply(
  *       Managed.read(ICEBERG)
- *           .withConfig(ImmutableMap..builder()
+ *           .withConfig(ImmutableMap..builder()
  *               .put("foo", "abc")
  *               .put("bar", 123)
  *               .build()));
@@ -87,15 +88,14 @@ public class Managed {
           .build();
 
   /**
-   * Instantiates a {@link Managed.Read} transform for the specified source. The supported managed
-   * sources are:
+   * Instantiates a {@link Managed.ManagedTransform} transform for the specified source. The
+   * supported managed sources are:
    *
    * 
    *
  • {@link Managed#ICEBERG} : Read from Apache Iceberg *
*/ public static ManagedTransform read(String source) { - return new AutoValue_Managed_ManagedTransform.Builder() .setIdentifier( Preconditions.checkNotNull( @@ -108,8 +108,8 @@ public static ManagedTransform read(String source) { } /** - * Instantiates a {@link Managed.Write} transform for the specified sink. The supported managed - * sinks are: + * Instantiates a {@link Managed.ManagedTransform} transform for the specified sink. The supported + * managed sinks are: * *
    *
  • {@link Managed#ICEBERG} : Write to Apache Iceberg @@ -131,7 +131,7 @@ public static ManagedTransform write(String sink) { public abstract static class ManagedTransform extends SchemaTransform { abstract String getIdentifier(); - abstract @Nullable String getConfig(); + abstract @Nullable Map getConfig(); abstract @Nullable String getConfigUrl(); @@ -144,7 +144,7 @@ public abstract static class ManagedTransform extends SchemaTransform { abstract static class Builder { abstract Builder setIdentifier(String identifier); - abstract Builder setConfig(@Nullable String config); + abstract Builder setConfig(@Nullable Map config); abstract Builder setConfigUrl(@Nullable String configUrl); @@ -161,7 +161,7 @@ abstract static class Builder { * SchemaTransformProvider#configurationSchema()}) to see which parameters are available. */ public ManagedTransform withConfig(Map config) { - return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build(); + return toBuilder().setConfig(config).build(); } /** @@ -179,15 +179,32 @@ ManagedTransform withSupportedIdentifiers(List supportedIdentifiers) { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { + ManagedSchemaTransformProvider provider = + new ManagedSchemaTransformProvider(getSupportedIdentifiers()); + + SchemaTransformProvider underlyingTransformProvider = + provider.getAllProviders().get(getIdentifier()); + if (underlyingTransformProvider == null) { + throw new RuntimeException( + String.format( + "Could not find transform with identifier %s, or it may not be supported.", + getIdentifier())); + } + + Row transformConfigRow = + getConfig() != null + ? YamlUtils.toBeamRow( + getConfig(), underlyingTransformProvider.configurationSchema(), true) + : null; + ManagedSchemaTransformProvider.ManagedConfig managedConfig = ManagedSchemaTransformProvider.ManagedConfig.builder() .setTransformIdentifier(getIdentifier()) - .setConfig(getConfig()) + .setConfig(transformConfigRow) .setConfigUrl(getConfigUrl()) .build(); - SchemaTransform underlyingTransform = - new ManagedSchemaTransformProvider(getSupportedIdentifiers()).from(managedConfig); + SchemaTransform underlyingTransform = provider.from(managedConfig); return input.apply(underlyingTransform); } diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java index 1ee2b11a90ff..f64f71a7259b 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java @@ -24,8 +24,10 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.ServiceLoader; import javax.annotation.Nullable; @@ -41,6 +43,7 @@ import org.apache.beam.sdk.schemas.utils.YamlUtils; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.common.base.Predicates; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; @@ -90,8 +93,8 @@ public static Builder builder() { @SchemaFieldDescription("URL path to the YAML config file used to build the underlying IO.") public abstract @Nullable String getConfigUrl(); - @SchemaFieldDescription("YAML string config used to build the underlying IO.") - public abstract @Nullable String getConfig(); + @SchemaFieldDescription("Row config used to build the underlying IO.") + public abstract @Nullable Row getConfig(); @AutoValue.Builder public abstract static class Builder { @@ -99,16 +102,17 @@ public abstract static class Builder { public abstract Builder setConfigUrl(@Nullable String configUrl); - public abstract Builder setConfig(@Nullable String config); + public abstract Builder setConfig(@Nullable Row config); public abstract ManagedConfig build(); } protected void validate() { - boolean configExists = !Strings.isNullOrEmpty(getConfig()); + boolean configExists = getConfig() != null; boolean configUrlExists = !Strings.isNullOrEmpty(getConfigUrl()); + List configs = Arrays.asList(configExists, configUrlExists); checkArgument( - !(configExists && configUrlExists) && (configExists || configUrlExists), + 1 == configs.stream().filter(Predicates.equalTo(true)).count(), "Please specify a config or a config URL, but not both."); } } @@ -158,25 +162,30 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { @VisibleForTesting static Row getRowConfig(ManagedConfig config, Schema transformSchema) { - String transformYamlConfig; - if (!Strings.isNullOrEmpty(config.getConfigUrl())) { + Row transformRowConfig = config.getConfig(); + // Attempt to construct a Row config from a YAML file + if (transformRowConfig == null) { try { MatchResult.Metadata fileMetaData = FileSystems.matchSingleFileSpec(Preconditions.checkNotNull(config.getConfigUrl())); ByteBuffer buffer = ByteBuffer.allocate((int) fileMetaData.sizeBytes()); FileSystems.open(fileMetaData.resourceId()).read(buffer); - transformYamlConfig = new String(buffer.array(), StandardCharsets.UTF_8); + String yamlConfig = new String(buffer.array(), StandardCharsets.UTF_8); + transformRowConfig = YamlUtils.toBeamRow(yamlConfig, transformSchema, true); } catch (IOException e) { throw new RuntimeException(e); } - } else { - transformYamlConfig = config.getConfig(); } - return YamlUtils.toBeamRow(transformYamlConfig, transformSchema, true); + // If our config is still null (perhaps the underlying transform doesn't have any parameters), + // default to an empty row. + if (transformRowConfig == null) { + transformRowConfig = Row.nullRow(transformSchema); + } + + return transformRowConfig; } - @VisibleForTesting Map getAllProviders() { return schemaTransformProviders; } diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java index 0c495d0d2c5c..32c858e6497e 100644 --- a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java +++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java @@ -49,28 +49,27 @@ public void testFailWhenNoConfigSpecified() { } @Test - public void testGetRowFromYamlConfig() { - String yamlString = "extra_string: abc\n" + "extra_integer: 123"; + public void testGetConfigRow() { + Schema underlyingTransformSchema = new TestSchemaTransformProvider().configurationSchema(); + Row configRow = + Row.withSchema(underlyingTransformSchema) + .withFieldValue("extraString", "abc") + .withFieldValue("extraInteger", 123) + .build(); ManagedConfig config = ManagedConfig.builder() .setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER) - .setConfig(yamlString) + .setConfig(configRow) .build(); - Schema configSchema = new TestSchemaTransformProvider().configurationSchema(); - Row expectedRow = - Row.withSchema(configSchema) - .withFieldValue("extraString", "abc") - .withFieldValue("extraInteger", 123) - .build(); - Row configRow = - ManagedSchemaTransformProvider.getRowConfig( - config, new TestSchemaTransformProvider().configurationSchema()); - assertEquals(expectedRow, configRow); + Row returnedRow = + ManagedSchemaTransformProvider.getRowConfig(config, underlyingTransformSchema); + + assertEquals(configRow, returnedRow); } @Test - public void testGetRowFromConfigUrl() throws URISyntaxException { + public void testGetConfigRowFromYamlFile() throws URISyntaxException { String yamlConfigPath = Paths.get(getClass().getClassLoader().getResource("test_config.yaml").toURI()) .toFile() From 6db699a41b1ab5aeafdc4784d674164dc19b2c6a Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 11 Apr 2024 18:53:21 -0400 Subject: [PATCH 2/3] spotless --- .../apache/beam/sdk/managed/ManagedSchemaTransformProvider.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java index f64f71a7259b..ddbc10b5e694 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java @@ -43,7 +43,7 @@ import org.apache.beam.sdk.schemas.utils.YamlUtils; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; -import org.apache.beam.vendor.grpc.v1p60p1.com.google.common.base.Predicates; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; From 301e388ea6a229bd91997727f6926746268b2314 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 11 Apr 2024 19:50:09 -0400 Subject: [PATCH 3/3] spotless --- .../apache/beam/sdk/managed/ManagedSchemaTransformProvider.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java index ddbc10b5e694..3ea6a4bd8470 100644 --- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java @@ -43,9 +43,9 @@ import org.apache.beam.sdk.schemas.utils.YamlUtils; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; @AutoService(SchemaTransformProvider.class)