Revert global snake_case convention for SchemaTransforms (#31109)
* revert global snake_case convention and make it a special case for iceberg and managed

* remove docs and comments too

* cleanup

* revert python and yaml changes too

* fix test
ahmedabu98 authored Apr 26, 2024
1 parent 088c854 commit b33a843
Showing 17 changed files with 155 additions and 72 deletions.
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 1
+  "modification": 2
 }
@@ -17,21 +17,17 @@
  */
 package org.apache.beam.sdk.schemas.transforms;
 
-import static org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider;
 import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
-import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
 
 import java.lang.reflect.ParameterizedType;
 import java.util.List;
 import java.util.Optional;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.NoSuchSchemaException;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.SchemaProvider;
 import org.apache.beam.sdk.schemas.SchemaRegistry;
 import org.apache.beam.sdk.values.Row;

@@ -45,9 +41,6 @@
  * {@code ConfigT} using the SchemaRegistry. A Beam {@link Row} can still be used to produce a {@link
  * SchemaTransform} using {@link #from(Row)}, as long as the Row fits the configuration Schema.
  *
- * <p>NOTE: The inferred field names in the configuration {@link Schema} and {@link Row} follow the
- * {@code snake_case} naming convention.
- *
  * <p><b>Internal only:</b> This interface is actively being worked on and it will likely change as
  * we provide implementations for more standard Beam transforms. We provide no backwards
  * compatibility guarantees and it should not be implemented outside of the Beam repository.
@@ -85,11 +78,10 @@ Optional<List<String>> dependencies(ConfigT configuration, PipelineOptions options)
   }
 
   @Override
-  public final Schema configurationSchema() {
+  public Schema configurationSchema() {
     try {
       // Sort the fields by name to ensure a consistent schema is produced
-      // We also establish a `snake_case` convention for all SchemaTransform configurations
-      return SchemaRegistry.createDefault().getSchema(configurationClass()).sorted().toSnakeCase();
+      return SchemaRegistry.createDefault().getSchema(configurationClass()).sorted();
     } catch (NoSuchSchemaException e) {
       throw new RuntimeException(
           "Unable to find schema for "
@@ -98,12 +90,9 @@ public final Schema configurationSchema() {
     }
   }
 
-  /**
-   * Produces a {@link SchemaTransform} from a Row configuration. Row fields are expected to have
-   * `snake_case` naming convention.
-   */
+  /** Produces a {@link SchemaTransform} from a Row configuration. */
   @Override
-  public final SchemaTransform from(Row configuration) {
+  public SchemaTransform from(Row configuration) {
     return from(configFromRow(configuration));
   }
 
@@ -114,20 +103,9 @@ public final Optional<List<String>> dependencies(Row configuration, PipelineOptions options)

   private ConfigT configFromRow(Row configuration) {
     try {
-      SchemaRegistry registry = SchemaRegistry.createDefault();
-
-      // Configuration objects handled by the AutoValueSchema provider will expect Row fields with
-      // camelCase naming convention
-      SchemaProvider schemaProvider = registry.getSchemaProvider(configurationClass());
-      if (schemaProvider.getClass().equals(DefaultSchemaProvider.class)
-          && checkNotNull(
-                  ((DefaultSchemaProvider) schemaProvider)
-                      .getUnderlyingSchemaProvider(configurationClass()))
-              .getClass()
-              .equals(AutoValueSchema.class)) {
-        configuration = configuration.toCamelCase();
-      }
-      return registry.getFromRowFunction(configurationClass()).apply(configuration);
+      return SchemaRegistry.createDefault()
+          .getFromRowFunction(configurationClass())
+          .apply(configuration);
     } catch (NoSuchSchemaException e) {
       throw new RuntimeException(
           "Unable to find schema for " + identifier() + "SchemaTransformProvider's config");
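With the conversion branch gone, configFromRow hands the Row to the schema's from-row function untouched, so callers must supply exactly the field names the provider infers (camelCase for AutoValue configurations). A minimal Python-side sketch of such a payload, with keys borrowed from the Kafka test further down and hypothetical values:

# Hypothetical cross-language payload after this revert: keys use the
# AutoValue-inferred camelCase names directly, with no translation layer.
kafka_read_config = {
    'bootstrapServers': 'localhost:9092',  # was 'bootstrap_servers' before
    'topic': 'events',
    'format': 'AVRO',
}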
@@ -130,8 +130,8 @@ public void testFrom() {

     Row inputConfig =
         Row.withSchema(provider.configurationSchema())
-            .withFieldValue("string_field", "field1")
-            .withFieldValue("integer_field", Integer.valueOf(13))
+            .withFieldValue("stringField", "field1")
+            .withFieldValue("integerField", Integer.valueOf(13))
             .build();
 
     Configuration outputConfig = ((FakeSchemaTransform) provider.from(inputConfig)).config;
@@ -150,8 +150,8 @@ public void testDependencies() {
     SchemaTransformProvider provider = new FakeTypedSchemaIOProvider();
     Row inputConfig =
         Row.withSchema(provider.configurationSchema())
-            .withFieldValue("string_field", "field1")
-            .withFieldValue("integer_field", Integer.valueOf(13))
+            .withFieldValue("stringField", "field1")
+            .withFieldValue("integerField", Integer.valueOf(13))
             .build();
 
     assertEquals(Arrays.asList("field1", "13"), provider.dependencies(inputConfig, null).get());
@@ -25,6 +25,7 @@
 import org.apache.beam.sdk.managed.ManagedTransformConstants;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.SchemaRegistry;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
@@ -131,4 +132,15 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
       return PCollectionRowTuple.of(OUTPUT_TAG, output);
     }
   }
+
+  // TODO: set global snake_case naming convention and remove these special cases
+  @Override
+  public SchemaTransform from(Row rowConfig) {
+    return super.from(rowConfig.toCamelCase());
+  }
+
+  @Override
+  public Schema configurationSchema() {
+    return super.configurationSchema().toSnakeCase();
+  }
 }
@@ -176,4 +176,15 @@ public Row apply(KV<String, SnapshotInfo> input) {
       }
     }
   }
+
+  // TODO: set global snake_case naming convention and remove these special cases
+  @Override
+  public SchemaTransform from(Row rowConfig) {
+    return super.from(rowConfig.toCamelCase());
+  }
+
+  @Override
+  public Schema configurationSchema() {
+    return super.configurationSchema().toSnakeCase();
+  }
 }
@@ -112,17 +112,17 @@ public void testFindTransformAndMakeItWork() {

     assertEquals(
         Sets.newHashSet(
-            "bootstrap_servers",
+            "bootstrapServers",
             "topic",
             "schema",
-            "auto_offset_reset_config",
-            "consumer_config_updates",
+            "autoOffsetResetConfig",
+            "consumerConfigUpdates",
             "format",
-            "confluent_schema_registry_subject",
-            "confluent_schema_registry_url",
-            "error_handling",
-            "file_descriptor_path",
-            "message_name"),
+            "confluentSchemaRegistrySubject",
+            "confluentSchemaRegistryUrl",
+            "errorHandling",
+            "fileDescriptorPath",
+            "messageName"),
         kafkaProvider.configurationSchema().getFields().stream()
             .map(field -> field.getName())
             .collect(Collectors.toSet()));
@@ -198,7 +198,6 @@ Row getConfigurationRow() {
     }
   }
 
-  /** */
   @VisibleForTesting
   static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
     // May return an empty row (perhaps the underlying transform doesn't have any required
@@ -209,4 +208,15 @@ static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
   Map<String, SchemaTransformProvider> getAllProviders() {
     return schemaTransformProviders;
   }
+
+  // TODO: set global snake_case naming convention and remove these special cases
+  @Override
+  public SchemaTransform from(Row rowConfig) {
+    return super.from(rowConfig.toCamelCase());
+  }
+
+  @Override
+  public Schema configurationSchema() {
+    return super.configurationSchema().toSnakeCase();
+  }
 }
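This pair of overrides, repeated in the Iceberg providers above, keeps the user-facing contract snake_case while the parent TypedSchemaTransformProvider reverts to camelCase. A rough, self-contained Python sketch of the assumed round trip (snake_to_camel and camel_to_snake are illustrative stand-ins for Row.toCamelCase() and Schema.toSnakeCase(), not Beam APIs):

# Stand-ins for Row.toCamelCase() / Schema.toSnakeCase() (illustrative only).
def snake_to_camel(name):
  head, *rest = name.split('_')
  return head + ''.join(w.capitalize() for w in rest)

def camel_to_snake(name):
  return ''.join('_' + c.lower() if c.isupper() else c for c in name)

# configurationSchema(): the parent's camelCase field is presented as snake_case.
assert camel_to_snake('triggeringFrequencySeconds') == 'triggering_frequency_seconds'
# from(rowConfig): the user's snake_case row is flipped back before the
# parent's AutoValue conversion runs.
assert snake_to_camel('triggering_frequency_seconds') == 'triggeringFrequencySeconds'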
@@ -51,7 +51,7 @@ public void testFailWhenNoConfigSpecified() {

   @Test
   public void testGetConfigRowFromYamlString() {
-    String yamlString = "extra_string: abc\n" + "extra_integer: 123";
+    String yamlString = "extraString: abc\n" + "extraInteger: 123";
     ManagedConfig config =
         ManagedConfig.builder()
             .setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER)
@@ -60,8 +60,8 @@ public void testGetConfigRowFromYamlString() {

     Row expectedRow =
         Row.withSchema(TestSchemaTransformProvider.SCHEMA)
-            .withFieldValue("extra_string", "abc")
-            .withFieldValue("extra_integer", 123)
+            .withFieldValue("extraString", "abc")
+            .withFieldValue("extraInteger", 123)
             .build();
 
     Row returnedRow =
@@ -84,8 +84,8 @@ public void testGetConfigRowFromYamlFile() throws URISyntaxException {
     Schema configSchema = new TestSchemaTransformProvider().configurationSchema();
     Row expectedRow =
         Row.withSchema(configSchema)
-            .withFieldValue("extra_string", "abc")
-            .withFieldValue("extra_integer", 123)
+            .withFieldValue("extraString", "abc")
+            .withFieldValue("extraInteger", 123)
             .build();
     Row configRow =
         ManagedSchemaTransformProvider.getRowConfig(
@@ -96,7 +96,7 @@

   @Test
   public void testBuildWithYamlString() {
-    String yamlString = "extra_string: abc\n" + "extra_integer: 123";
+    String yamlString = "extraString: abc\n" + "extraInteger: 123";
 
     ManagedConfig config =
         ManagedConfig.builder()
@@ -84,7 +84,7 @@ public void testReCreateTransformFromRowWithConfigUrl() throws URISyntaxException {

   @Test
   public void testReCreateTransformFromRowWithConfig() {
-    String yamlString = "extra_string: abc\n" + "extra_integer: 123";
+    String yamlString = "extraString: abc\n" + "extraInteger: 123";
 
     ManagedConfig originalConfig =
         ManagedConfig.builder()
@@ -123,8 +123,8 @@ public void testProtoTranslation() throws Exception {
             .setRowSchema(inputSchema);
     Map<String, Object> underlyingConfig =
         ImmutableMap.<String, Object>builder()
-            .put("extra_string", "abc")
-            .put("extra_integer", 123)
+            .put("extraString", "abc")
+            .put("extraInteger", 123)
             .build();
     String yamlStringConfig = YamlUtils.yamlStringFromMap(underlyingConfig);
     Managed.ManagedTransform transform =
@@ -90,7 +90,7 @@ public void testManagedTestProviderWithConfigMap() {
             .setIdentifier(TestSchemaTransformProvider.IDENTIFIER)
             .build()
             .withSupportedIdentifiers(Arrays.asList(TestSchemaTransformProvider.IDENTIFIER))
-            .withConfig(ImmutableMap.of("extra_string", "abc", "extra_integer", 123));
+            .withConfig(ImmutableMap.of("extraString", "abc", "extraInteger", 123));
 
     runTestProviderTest(writeOp);
   }
4 changes: 2 additions & 2 deletions sdks/java/managed/src/test/resources/test_config.yaml
@@ -17,5 +17,5 @@
 # under the License.
 #
 
-extra_string: "abc"
-extra_integer: 123
+extraString: "abc"
+extraInteger: 123
14 changes: 7 additions & 7 deletions sdks/python/apache_beam/io/gcp/bigquery.py
@@ -2574,13 +2574,13 @@ def expand(self, input):
         expansion_service=self._expansion_service,
         rearrange_based_on_discovery=True,
         table=table,
-        create_disposition=self._create_disposition,
-        write_disposition=self._write_disposition,
-        triggering_frequency_seconds=self._triggering_frequency,
-        auto_sharding=self._with_auto_sharding,
-        num_streams=self._num_storage_api_streams,
-        use_at_least_once_semantics=self._use_at_least_once,
-        error_handling={
+        createDisposition=self._create_disposition,
+        writeDisposition=self._write_disposition,
+        triggeringFrequencySeconds=self._triggering_frequency,
+        autoSharding=self._with_auto_sharding,
+        numStreams=self._num_storage_api_streams,
+        useAtLeastOnceSemantics=self._use_at_least_once,
+        errorHandling={
             'output': StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS
         }))
 
12 changes: 6 additions & 6 deletions sdks/python/apache_beam/io/gcp/bigtableio.py
@@ -225,9 +225,9 @@ def expand(self, input):
         identifier=self.schematransform_config.identifier,
         expansion_service=self._expansion_service,
         rearrange_based_on_discovery=True,
-        table_id=self._table_id,
-        instance_id=self._instance_id,
-        project_id=self._project_id)
+        tableId=self._table_id,
+        instanceId=self._instance_id,
+        projectId=self._project_id)
 
     return (
         input
@@ -323,9 +323,9 @@ def expand(self, input):
         identifier=self.schematransform_config.identifier,
         expansion_service=self._expansion_service,
         rearrange_based_on_discovery=True,
-        table_id=self._table_id,
-        instance_id=self._instance_id,
-        project_id=self._project_id)
+        tableId=self._table_id,
+        instanceId=self._instance_id,
+        projectId=self._project_id)
 
     return (
         input.pipeline
35 changes: 33 additions & 2 deletions sdks/python/apache_beam/transforms/external_transform_provider.py
@@ -39,6 +39,32 @@ def snake_case_to_upper_camel_case(string):
   return output
 
 
+def snake_case_to_lower_camel_case(string):
+  """Convert snake_case to lowerCamelCase"""
+  if len(string) <= 1:
+    return string.lower()
+  upper = snake_case_to_upper_camel_case(string)
+  return upper[0].lower() + upper[1:]
+
+
+def camel_case_to_snake_case(string):
+  """Convert camelCase to snake_case"""
+  arr = []
+  word = []
+  for i, n in enumerate(string):
+    # If seeing an upper letter after a lower letter, we just witnessed a word
+    # If seeing an upper letter and the next letter is lower, we may have just
+    # witnessed an all caps word
+    if n.isupper() and ((i > 0 and string[i - 1].islower()) or
+                        (i + 1 < len(string) and string[i + 1].islower())):
+      arr.append(''.join(word))
+      word = [n.lower()]
+    else:
+      word.append(n.lower())
+  arr.append(''.join(word))
+  return '_'.join(arr).strip('_')
+
+
# Information regarding a Wrapper parameter.
ParamInfo = namedtuple('ParamInfo', ['type', 'description', 'original_name'])
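Together the two new helpers are meant to round-trip typical configuration names, folding an all-caps run into a single snake_case word. A quick sanity check of the assumed behavior (the import works once this change is in a Beam release):

from apache_beam.transforms.external_transform_provider import (
    camel_case_to_snake_case, snake_case_to_lower_camel_case)

assert snake_case_to_lower_camel_case('auto_offset_reset_config') == 'autoOffsetResetConfig'
assert camel_case_to_snake_case('autoOffsetResetConfig') == 'auto_offset_reset_config'
# An all-caps run counts as one word, not one word per letter:
assert camel_case_to_snake_case('useHTTPProxy') == 'use_http_proxy'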

@@ -50,7 +76,7 @@ def get_config_with_descriptions(
   descriptions = schematransform.configuration_schema._field_descriptions
   fields_with_descriptions = {}
   for field in schema.fields:
-    fields_with_descriptions[field.name] = ParamInfo(
+    fields_with_descriptions[camel_case_to_snake_case(field.name)] = ParamInfo(
         typing_from_runner_api(field.type),
         descriptions[field.name],
         field.name)
@@ -79,11 +105,16 @@ def __init__(self, expansion_service=None, **kwargs):
         expansion_service or self.default_expansion_service
 
   def expand(self, input):
+    camel_case_kwargs = {
+        snake_case_to_lower_camel_case(k): v
+        for k, v in self._kwargs.items()
+    }
+
     external_schematransform = SchemaAwareExternalTransform(
         identifier=self.identifier,
         expansion_service=self._expansion_service,
         rearrange_based_on_discovery=True,
-        **self._kwargs)
+        **camel_case_kwargs)
 
     return input | external_schematransform
 
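Generated wrappers therefore keep presenting snake_case parameters to Python users and only translate at expansion time. A minimal sketch of the conversion expand() now performs (the parameter names are hypothetical):

from apache_beam.transforms.external_transform_provider import (
    snake_case_to_lower_camel_case)

# What a user passes to a generated wrapper stays snake_case...
user_kwargs = {'instance_id': 'beam-test', 'table_id': 'my-table'}
# ...and is flipped to the provider's camelCase field names before being
# handed to SchemaAwareExternalTransform.
converted = {snake_case_to_lower_camel_case(k): v for k, v in user_kwargs.items()}
assert converted == {'instanceId': 'beam-test', 'tableId': 'my-table'}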