Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change ManagedSchemaTransformProvider to take a Row config #30937

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,19 @@ public class YamlUtils {
.put(Schema.TypeName.BYTES, str -> BaseEncoding.base64().decode(str))
.build();

public static Row toBeamRow(@Nullable String yamlString, Schema schema) {
public static @Nullable Row toBeamRow(@Nullable String yamlString, Schema schema) {
return toBeamRow(yamlString, schema, false);
}

public static Row toBeamRow(
public static @Nullable Row toBeamRow(
@Nullable String yamlString, Schema schema, boolean convertNamesToCamelCase) {
if (yamlString == null || yamlString.isEmpty()) {
List<Field> requiredFields =
schema.getFields().stream()
.filter(field -> !field.getType().getNullable())
.collect(Collectors.toList());
if (requiredFields.isEmpty()) {
return Row.nullRow(schema);
return null;
} else {
throw new IllegalArgumentException(
String.format(
Expand Down Expand Up @@ -147,14 +147,27 @@ public static Row toBeamRow(
}

@SuppressWarnings("nullness")
public static Row toBeamRow(Map<String, Object> yamlMap, Schema rowSchema, boolean toCamelCase) {
public static @Nullable Row toBeamRow(
@Nullable Map<String, Object> map, Schema rowSchema, boolean toCamelCase) {
if (map == null || map.isEmpty()) {
List<Field> requiredFields =
rowSchema.getFields().stream()
.filter(field -> !field.getType().getNullable())
.collect(Collectors.toList());
if (requiredFields.isEmpty()) {
return null;
} else {
throw new IllegalArgumentException(
String.format(
"Received an empty Map, but output schema contains required fields: %s",
requiredFields));
}
}
return rowSchema.getFields().stream()
.map(
field ->
toBeamValue(
field,
yamlMap.get(maybeGetSnakeCase(field.getName(), toCamelCase)),
toCamelCase))
field, map.get(maybeGetSnakeCase(field.getName(), toCamelCase)), toCamelCase))
.collect(toRow(rowSchema));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@
package org.apache.beam.sdk.util;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.YamlUtils;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -47,7 +50,7 @@ public String makeNested(String input) {
public void testEmptyYamlString() {
Schema schema = Schema.builder().build();

assertEquals(Row.nullRow(schema), YamlUtils.toBeamRow("", schema));
assertNull(YamlUtils.toBeamRow("", schema));
}

@Test
Expand Down Expand Up @@ -225,4 +228,35 @@ public void testNestedArray() {

assertEquals(expectedRow, YamlUtils.toBeamRow(yamlString, schema));
}

private static final Schema FLAT_SCHEMA_CAMEL_CASE =
Schema.builder()
.addFields(
FLAT_SCHEMA.getFields().stream()
.map(
field ->
field.withName(
CaseFormat.LOWER_UNDERSCORE.to(
CaseFormat.LOWER_CAMEL, field.getName())))
.collect(Collectors.toList()))
.build();

private static final Map<String, Object> FLAT_MAP =
FLAT_SCHEMA.getFields().stream()
.collect(
Collectors.toMap(
Schema.Field::getName,
field -> Preconditions.checkArgumentNotNull(FLAT_ROW.getValue(field.getName()))));

@Test
public void testSnakeCaseMapToCamelCaseRow() {
Row expectedRow =
FLAT_SCHEMA.getFields().stream()
.map(field -> Preconditions.checkStateNotNull(FLAT_ROW.getValue(field.getName())))
.collect(Row.toRow(FLAT_SCHEMA_CAMEL_CASE));

Row convertedRow = YamlUtils.toBeamRow(FLAT_MAP, FLAT_SCHEMA_CAMEL_CASE, true);

assertEquals(expectedRow, convertedRow);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.utils.YamlUtils;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
Expand All @@ -36,8 +37,8 @@
*
* <h3>Available transforms</h3>
*
* <p>This API currently supports two operations: {@link Read} and {@link Write}. Each one
* enumerates the available transforms in a {@code TRANSFORMS} map.
* <p>This API currently supports two operations: {@link Managed#read} and {@link Managed#write}.
* Each one enumerates the available transforms in a {@code TRANSFORMS} map.
*
* <h3>Building a Managed turnkey transform</h3>
*
Expand All @@ -48,7 +49,7 @@
* <pre>{@code
* PCollectionRowTuple output = PCollectionRowTuple.empty(pipeline).apply(
* Managed.read(ICEBERG)
* .withConfig(ImmutableMap.<String, Map>.builder()
* .withConfig(ImmutableMap.<String, Object>.builder()
* .put("foo", "abc")
* .put("bar", 123)
* .build()));
Expand Down Expand Up @@ -87,15 +88,14 @@ public class Managed {
.build();

/**
* Instantiates a {@link Managed.Read} transform for the specified source. The supported managed
* sources are:
* Instantiates a {@link Managed.ManagedTransform} transform for the specified source. The
* supported managed sources are:
*
* <ul>
* <li>{@link Managed#ICEBERG} : Read from Apache Iceberg
* </ul>
*/
public static ManagedTransform read(String source) {

return new AutoValue_Managed_ManagedTransform.Builder()
.setIdentifier(
Preconditions.checkNotNull(
Expand All @@ -108,8 +108,8 @@ public static ManagedTransform read(String source) {
}

/**
* Instantiates a {@link Managed.Write} transform for the specified sink. The supported managed
* sinks are:
* Instantiates a {@link Managed.ManagedTransform} transform for the specified sink. The supported
* managed sinks are:
*
* <ul>
* <li>{@link Managed#ICEBERG} : Write to Apache Iceberg
Expand All @@ -131,7 +131,7 @@ public static ManagedTransform write(String sink) {
public abstract static class ManagedTransform extends SchemaTransform {
abstract String getIdentifier();

abstract @Nullable String getConfig();
abstract @Nullable Map<String, Object> getConfig();

abstract @Nullable String getConfigUrl();

Expand All @@ -144,7 +144,7 @@ public abstract static class ManagedTransform extends SchemaTransform {
abstract static class Builder {
abstract Builder setIdentifier(String identifier);

abstract Builder setConfig(@Nullable String config);
abstract Builder setConfig(@Nullable Map<String, Object> config);

abstract Builder setConfigUrl(@Nullable String configUrl);

Expand All @@ -161,7 +161,7 @@ abstract static class Builder {
* SchemaTransformProvider#configurationSchema()}) to see which parameters are available.
*/
public ManagedTransform withConfig(Map<String, Object> config) {
return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build();
return toBuilder().setConfig(config).build();
}

/**
Expand All @@ -179,15 +179,32 @@ ManagedTransform withSupportedIdentifiers(List<String> supportedIdentifiers) {

@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
ManagedSchemaTransformProvider provider =
new ManagedSchemaTransformProvider(getSupportedIdentifiers());

SchemaTransformProvider underlyingTransformProvider =
provider.getAllProviders().get(getIdentifier());
if (underlyingTransformProvider == null) {
throw new RuntimeException(
String.format(
"Could not find transform with identifier %s, or it may not be supported.",
getIdentifier()));
}

Row transformConfigRow =
getConfig() != null
? YamlUtils.toBeamRow(
getConfig(), underlyingTransformProvider.configurationSchema(), true)
: null;

ManagedSchemaTransformProvider.ManagedConfig managedConfig =
ManagedSchemaTransformProvider.ManagedConfig.builder()
.setTransformIdentifier(getIdentifier())
.setConfig(getConfig())
.setConfig(transformConfigRow)
.setConfigUrl(getConfigUrl())
.build();

SchemaTransform underlyingTransform =
new ManagedSchemaTransformProvider(getSupportedIdentifiers()).from(managedConfig);
SchemaTransform underlyingTransform = provider.from(managedConfig);

return input.apply(underlyingTransform);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import javax.annotation.Nullable;
Expand All @@ -43,6 +45,7 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;

@AutoService(SchemaTransformProvider.class)
Expand Down Expand Up @@ -90,25 +93,26 @@ public static Builder builder() {
@SchemaFieldDescription("URL path to the YAML config file used to build the underlying IO.")
public abstract @Nullable String getConfigUrl();

@SchemaFieldDescription("YAML string config used to build the underlying IO.")
public abstract @Nullable String getConfig();
@SchemaFieldDescription("Row config used to build the underlying IO.")
public abstract @Nullable Row getConfig();

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setTransformIdentifier(String identifier);

public abstract Builder setConfigUrl(@Nullable String configUrl);

public abstract Builder setConfig(@Nullable String config);
public abstract Builder setConfig(@Nullable Row config);

public abstract ManagedConfig build();
}

protected void validate() {
boolean configExists = !Strings.isNullOrEmpty(getConfig());
boolean configExists = getConfig() != null;
boolean configUrlExists = !Strings.isNullOrEmpty(getConfigUrl());
List<Boolean> configs = Arrays.asList(configExists, configUrlExists);
checkArgument(
!(configExists && configUrlExists) && (configExists || configUrlExists),
1 == configs.stream().filter(Predicates.equalTo(true)).count(),
"Please specify a config or a config URL, but not both.");
}
}
Expand Down Expand Up @@ -158,25 +162,30 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {

@VisibleForTesting
static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
String transformYamlConfig;
if (!Strings.isNullOrEmpty(config.getConfigUrl())) {
Row transformRowConfig = config.getConfig();
// Attempt to construct a Row config from a YAML file
if (transformRowConfig == null) {
try {
MatchResult.Metadata fileMetaData =
FileSystems.matchSingleFileSpec(Preconditions.checkNotNull(config.getConfigUrl()));
ByteBuffer buffer = ByteBuffer.allocate((int) fileMetaData.sizeBytes());
FileSystems.open(fileMetaData.resourceId()).read(buffer);
transformYamlConfig = new String(buffer.array(), StandardCharsets.UTF_8);
String yamlConfig = new String(buffer.array(), StandardCharsets.UTF_8);
transformRowConfig = YamlUtils.toBeamRow(yamlConfig, transformSchema, true);
} catch (IOException e) {
throw new RuntimeException(e);
}
} else {
transformYamlConfig = config.getConfig();
}

return YamlUtils.toBeamRow(transformYamlConfig, transformSchema, true);
// If our config is still null (perhaps the underlying transform doesn't have any parameters),
// default to an empty row.
if (transformRowConfig == null) {
transformRowConfig = Row.nullRow(transformSchema);
}

return transformRowConfig;
}

@VisibleForTesting
Map<String, SchemaTransformProvider> getAllProviders() {
return schemaTransformProviders;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,28 +49,27 @@ public void testFailWhenNoConfigSpecified() {
}

@Test
public void testGetRowFromYamlConfig() {
String yamlString = "extra_string: abc\n" + "extra_integer: 123";
public void testGetConfigRow() {
Schema underlyingTransformSchema = new TestSchemaTransformProvider().configurationSchema();
Row configRow =
Row.withSchema(underlyingTransformSchema)
.withFieldValue("extraString", "abc")
.withFieldValue("extraInteger", 123)
.build();
ManagedConfig config =
ManagedConfig.builder()
.setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER)
.setConfig(yamlString)
.setConfig(configRow)
.build();
Schema configSchema = new TestSchemaTransformProvider().configurationSchema();
Row expectedRow =
Row.withSchema(configSchema)
.withFieldValue("extraString", "abc")
.withFieldValue("extraInteger", 123)
.build();
Row configRow =
ManagedSchemaTransformProvider.getRowConfig(
config, new TestSchemaTransformProvider().configurationSchema());

assertEquals(expectedRow, configRow);
Row returnedRow =
ManagedSchemaTransformProvider.getRowConfig(config, underlyingTransformSchema);

assertEquals(configRow, returnedRow);
}

@Test
public void testGetRowFromConfigUrl() throws URISyntaxException {
public void testGetConfigRowFromYamlFile() throws URISyntaxException {
String yamlConfigPath =
Paths.get(getClass().getClassLoader().getResource("test_config.yaml").toURI())
.toFile()
Expand Down
Loading