diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
index e3d6056a5de9..b26833333238 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
- "modification": 1
+ "modification": 2
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
index cfd298ae87ee..d5c6c724c6f5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
@@ -17,10 +17,8 @@
*/
package org.apache.beam.sdk.schemas.transforms;
-import static org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
-import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import java.lang.reflect.ParameterizedType;
import java.util.List;
@@ -28,10 +26,8 @@
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.SchemaProvider;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.values.Row;
@@ -45,9 +41,6 @@
* {@code ConfigT} using the SchemaRegistry. A Beam {@link Row} can still be used produce a {@link
* SchemaTransform} using {@link #from(Row)}, as long as the Row fits the configuration Schema.
*
- *
- * <p>NOTE: The inferred field names in the configuration {@link Schema} and {@link Row} follow the
- * {@code snake_case} naming convention.
- *
*
* <p>Internal only: This interface is actively being worked on and it will likely change as
* we provide implementations for more standard Beam transforms. We provide no backwards
* compatibility guarantees and it should not be implemented outside of the Beam repository.
@@ -85,11 +78,10 @@ Optional<List<String>> dependencies(ConfigT configuration, PipelineOptions optio
}
@Override
- public final Schema configurationSchema() {
+ public Schema configurationSchema() {
try {
// Sort the fields by name to ensure a consistent schema is produced
- // We also establish a `snake_case` convention for all SchemaTransform configurations
- return SchemaRegistry.createDefault().getSchema(configurationClass()).sorted().toSnakeCase();
+ return SchemaRegistry.createDefault().getSchema(configurationClass()).sorted();
} catch (NoSuchSchemaException e) {
throw new RuntimeException(
"Unable to find schema for "
@@ -98,12 +90,9 @@ public final Schema configurationSchema() {
}
}
- /**
- * Produces a {@link SchemaTransform} from a Row configuration. Row fields are expected to have
- * `snake_case` naming convention.
- */
+ /** Produces a {@link SchemaTransform} from a Row configuration. */
@Override
- public final SchemaTransform from(Row configuration) {
+ public SchemaTransform from(Row configuration) {
return from(configFromRow(configuration));
}
@@ -114,20 +103,9 @@ public final Optional<List<String>> dependencies(Row configuration, PipelineOpti
private ConfigT configFromRow(Row configuration) {
try {
- SchemaRegistry registry = SchemaRegistry.createDefault();
-
- // Configuration objects handled by the AutoValueSchema provider will expect Row fields with
- // camelCase naming convention
- SchemaProvider schemaProvider = registry.getSchemaProvider(configurationClass());
- if (schemaProvider.getClass().equals(DefaultSchemaProvider.class)
- && checkNotNull(
- ((DefaultSchemaProvider) schemaProvider)
- .getUnderlyingSchemaProvider(configurationClass()))
- .getClass()
- .equals(AutoValueSchema.class)) {
- configuration = configuration.toCamelCase();
- }
- return registry.getFromRowFunction(configurationClass()).apply(configuration);
+ return SchemaRegistry.createDefault()
+ .getFromRowFunction(configurationClass())
+ .apply(configuration);
} catch (NoSuchSchemaException e) {
throw new RuntimeException(
"Unable to find schema for " + identifier() + "SchemaTransformProvider's config");
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java
index 2eef0e30f805..b1dc0911a927 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java
@@ -130,8 +130,8 @@ public void testFrom() {
Row inputConfig =
Row.withSchema(provider.configurationSchema())
- .withFieldValue("string_field", "field1")
- .withFieldValue("integer_field", Integer.valueOf(13))
+ .withFieldValue("stringField", "field1")
+ .withFieldValue("integerField", Integer.valueOf(13))
.build();
Configuration outputConfig = ((FakeSchemaTransform) provider.from(inputConfig)).config;
@@ -150,8 +150,8 @@ public void testDependencies() {
SchemaTransformProvider provider = new FakeTypedSchemaIOProvider();
Row inputConfig =
Row.withSchema(provider.configurationSchema())
- .withFieldValue("string_field", "field1")
- .withFieldValue("integer_field", Integer.valueOf(13))
+ .withFieldValue("stringField", "field1")
+ .withFieldValue("integerField", Integer.valueOf(13))
.build();
assertEquals(Arrays.asList("field1", "13"), provider.dependencies(inputConfig, null).get());
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
index fb32e18d9374..bfe2fab1f9a2 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
@@ -25,6 +25,7 @@
import org.apache.beam.sdk.managed.ManagedTransformConstants;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
@@ -131,4 +132,15 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
return PCollectionRowTuple.of(OUTPUT_TAG, output);
}
}
+
+ // TODO: set global snake_case naming convention and remove these special cases
+ @Override
+ public SchemaTransform from(Row rowConfig) {
+ return super.from(rowConfig.toCamelCase());
+ }
+
+ @Override
+ public Schema configurationSchema() {
+ return super.configurationSchema().toSnakeCase();
+ }
}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
index b490693a9adb..71183c6b0a03 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java
@@ -176,4 +176,15 @@ public Row apply(KV input) {
}
}
}
+
+ // TODO: set global snake_case naming convention and remove these special cases
+ @Override
+ public SchemaTransform from(Row rowConfig) {
+ return super.from(rowConfig.toCamelCase());
+ }
+
+ @Override
+ public Schema configurationSchema() {
+ return super.configurationSchema().toSnakeCase();
+ }
}
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
index bf9895e36b84..f6e231c758a5 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
@@ -112,17 +112,17 @@ public void testFindTransformAndMakeItWork() {
assertEquals(
Sets.newHashSet(
- "bootstrap_servers",
+ "bootstrapServers",
"topic",
"schema",
- "auto_offset_reset_config",
- "consumer_config_updates",
+ "autoOffsetResetConfig",
+ "consumerConfigUpdates",
"format",
- "confluent_schema_registry_subject",
- "confluent_schema_registry_url",
- "error_handling",
- "file_descriptor_path",
- "message_name"),
+ "confluentSchemaRegistrySubject",
+ "confluentSchemaRegistryUrl",
+ "errorHandling",
+ "fileDescriptorPath",
+ "messageName"),
kafkaProvider.configurationSchema().getFields().stream()
.map(field -> field.getName())
.collect(Collectors.toSet()));
diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
index cb5088a24cca..e13741e86b4a 100644
--- a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
+++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java
@@ -198,7 +198,6 @@ Row getConfigurationRow() {
}
}
- /** */
@VisibleForTesting
static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
// May return an empty row (perhaps the underlying transform doesn't have any required
@@ -209,4 +208,15 @@ static Row getRowConfig(ManagedConfig config, Schema transformSchema) {
Map<String, SchemaTransformProvider> getAllProviders() {
return schemaTransformProviders;
}
+
+ // TODO: set global snake_case naming convention and remove these special cases
+ @Override
+ public SchemaTransform from(Row rowConfig) {
+ return super.from(rowConfig.toCamelCase());
+ }
+
+ @Override
+ public Schema configurationSchema() {
+ return super.configurationSchema().toSnakeCase();
+ }
}
diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
index e9edf8751e34..3a3465406c03 100644
--- a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
+++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java
@@ -51,7 +51,7 @@ public void testFailWhenNoConfigSpecified() {
@Test
public void testGetConfigRowFromYamlString() {
- String yamlString = "extra_string: abc\n" + "extra_integer: 123";
+ String yamlString = "extraString: abc\n" + "extraInteger: 123";
ManagedConfig config =
ManagedConfig.builder()
.setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER)
@@ -60,8 +60,8 @@ public void testGetConfigRowFromYamlString() {
Row expectedRow =
Row.withSchema(TestSchemaTransformProvider.SCHEMA)
- .withFieldValue("extra_string", "abc")
- .withFieldValue("extra_integer", 123)
+ .withFieldValue("extraString", "abc")
+ .withFieldValue("extraInteger", 123)
.build();
Row returnedRow =
@@ -84,8 +84,8 @@ public void testGetConfigRowFromYamlFile() throws URISyntaxException {
Schema configSchema = new TestSchemaTransformProvider().configurationSchema();
Row expectedRow =
Row.withSchema(configSchema)
- .withFieldValue("extra_string", "abc")
- .withFieldValue("extra_integer", 123)
+ .withFieldValue("extraString", "abc")
+ .withFieldValue("extraInteger", 123)
.build();
Row configRow =
ManagedSchemaTransformProvider.getRowConfig(
@@ -96,7 +96,7 @@ public void testGetConfigRowFromYamlFile() throws URISyntaxException {
@Test
public void testBuildWithYamlString() {
- String yamlString = "extra_string: abc\n" + "extra_integer: 123";
+ String yamlString = "extraString: abc\n" + "extraInteger: 123";
ManagedConfig config =
ManagedConfig.builder()
diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java
index b4b41ded841c..7a418976079f 100644
--- a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java
+++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformTranslationTest.java
@@ -84,7 +84,7 @@ public void testReCreateTransformFromRowWithConfigUrl() throws URISyntaxExceptio
@Test
public void testReCreateTransformFromRowWithConfig() {
- String yamlString = "extra_string: abc\n" + "extra_integer: 123";
+ String yamlString = "extraString: abc\n" + "extraInteger: 123";
ManagedConfig originalConfig =
ManagedConfig.builder()
@@ -123,8 +123,8 @@ public void testProtoTranslation() throws Exception {
.setRowSchema(inputSchema);
Map<String, Object> underlyingConfig =
ImmutableMap.<String, Object>builder()
- .put("extra_string", "abc")
- .put("extra_integer", 123)
+ .put("extraString", "abc")
+ .put("extraInteger", 123)
.build();
String yamlStringConfig = YamlUtils.yamlStringFromMap(underlyingConfig);
Managed.ManagedTransform transform =
diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java
index 7ed364d0e174..260085486c81 100644
--- a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java
+++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java
@@ -90,7 +90,7 @@ public void testManagedTestProviderWithConfigMap() {
.setIdentifier(TestSchemaTransformProvider.IDENTIFIER)
.build()
.withSupportedIdentifiers(Arrays.asList(TestSchemaTransformProvider.IDENTIFIER))
- .withConfig(ImmutableMap.of("extra_string", "abc", "extra_integer", 123));
+ .withConfig(ImmutableMap.of("extraString", "abc", "extraInteger", 123));
runTestProviderTest(writeOp);
}
diff --git a/sdks/java/managed/src/test/resources/test_config.yaml b/sdks/java/managed/src/test/resources/test_config.yaml
index 7725c32b348e..3967b6095eac 100644
--- a/sdks/java/managed/src/test/resources/test_config.yaml
+++ b/sdks/java/managed/src/test/resources/test_config.yaml
@@ -17,5 +17,5 @@
# under the License.
#
-extra_string: "abc"
-extra_integer: 123
\ No newline at end of file
+extraString: "abc"
+extraInteger: 123
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index d89ce712d8f6..43bd17022180 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -2574,13 +2574,13 @@ def expand(self, input):
expansion_service=self._expansion_service,
rearrange_based_on_discovery=True,
table=table,
- create_disposition=self._create_disposition,
- write_disposition=self._write_disposition,
- triggering_frequency_seconds=self._triggering_frequency,
- auto_sharding=self._with_auto_sharding,
- num_streams=self._num_storage_api_streams,
- use_at_least_once_semantics=self._use_at_least_once,
- error_handling={
+ createDisposition=self._create_disposition,
+ writeDisposition=self._write_disposition,
+ triggeringFrequencySeconds=self._triggering_frequency,
+ autoSharding=self._with_auto_sharding,
+ numStreams=self._num_storage_api_streams,
+ useAtLeastOnceSemantics=self._use_at_least_once,
+ errorHandling={
'output': StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS
}))
diff --git a/sdks/python/apache_beam/io/gcp/bigtableio.py b/sdks/python/apache_beam/io/gcp/bigtableio.py
index 0f3944a791bd..f8534f38ddfc 100644
--- a/sdks/python/apache_beam/io/gcp/bigtableio.py
+++ b/sdks/python/apache_beam/io/gcp/bigtableio.py
@@ -225,9 +225,9 @@ def expand(self, input):
identifier=self.schematransform_config.identifier,
expansion_service=self._expansion_service,
rearrange_based_on_discovery=True,
- table_id=self._table_id,
- instance_id=self._instance_id,
- project_id=self._project_id)
+ tableId=self._table_id,
+ instanceId=self._instance_id,
+ projectId=self._project_id)
return (
input
@@ -323,9 +323,9 @@ def expand(self, input):
identifier=self.schematransform_config.identifier,
expansion_service=self._expansion_service,
rearrange_based_on_discovery=True,
- table_id=self._table_id,
- instance_id=self._instance_id,
- project_id=self._project_id)
+ tableId=self._table_id,
+ instanceId=self._instance_id,
+ projectId=self._project_id)
return (
input.pipeline
diff --git a/sdks/python/apache_beam/transforms/external_transform_provider.py b/sdks/python/apache_beam/transforms/external_transform_provider.py
index 67adda5aec03..2799bd1b9e93 100644
--- a/sdks/python/apache_beam/transforms/external_transform_provider.py
+++ b/sdks/python/apache_beam/transforms/external_transform_provider.py
@@ -39,6 +39,32 @@ def snake_case_to_upper_camel_case(string):
return output
+def snake_case_to_lower_camel_case(string):
+ """Convert snake_case to lowerCamelCase"""
+ if len(string) <= 1:
+ return string.lower()
+ upper = snake_case_to_upper_camel_case(string)
+ return upper[0].lower() + upper[1:]
+
+
+def camel_case_to_snake_case(string):
+ """Convert camelCase to snake_case"""
+ arr = []
+ word = []
+ for i, n in enumerate(string):
+ # If seeing an upper letter after a lower letter, we just witnessed a word
+ # If seeing an upper letter and the next letter is lower, we may have just
+ # witnessed an all caps word
+ if n.isupper() and ((i > 0 and string[i - 1].islower()) or
+ (i + 1 < len(string) and string[i + 1].islower())):
+ arr.append(''.join(word))
+ word = [n.lower()]
+ else:
+ word.append(n.lower())
+ arr.append(''.join(word))
+ return '_'.join(arr).strip('_')
+
+
# Information regarding a Wrapper parameter.
ParamInfo = namedtuple('ParamInfo', ['type', 'description', 'original_name'])
@@ -50,7 +76,7 @@ def get_config_with_descriptions(
descriptions = schematransform.configuration_schema._field_descriptions
fields_with_descriptions = {}
for field in schema.fields:
- fields_with_descriptions[field.name] = ParamInfo(
+ fields_with_descriptions[camel_case_to_snake_case(field.name)] = ParamInfo(
typing_from_runner_api(field.type),
descriptions[field.name],
field.name)
@@ -79,11 +105,16 @@ def __init__(self, expansion_service=None, **kwargs):
expansion_service or self.default_expansion_service
def expand(self, input):
+ camel_case_kwargs = {
+ snake_case_to_lower_camel_case(k): v
+ for k, v in self._kwargs.items()
+ }
+
external_schematransform = SchemaAwareExternalTransform(
identifier=self.identifier,
expansion_service=self._expansion_service,
rearrange_based_on_discovery=True,
- **self._kwargs)
+ **camel_case_kwargs)
return input | external_schematransform
diff --git a/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py b/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py
index 95720cee7eee..a53001c85fd3 100644
--- a/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py
+++ b/sdks/python/apache_beam/transforms/external_transform_provider_it_test.py
@@ -37,7 +37,9 @@
from apache_beam.transforms.external_transform_provider import STANDARD_URN_PATTERN
from apache_beam.transforms.external_transform_provider import ExternalTransform
from apache_beam.transforms.external_transform_provider import ExternalTransformProvider
+from apache_beam.transforms.external_transform_provider import camel_case_to_snake_case
from apache_beam.transforms.external_transform_provider import infer_name_from_identifier
+from apache_beam.transforms.external_transform_provider import snake_case_to_lower_camel_case
from apache_beam.transforms.external_transform_provider import snake_case_to_upper_camel_case
from apache_beam.transforms.xlang.io import GenerateSequence
@@ -52,6 +54,26 @@ def test_snake_case_to_upper_camel_case(self):
for case in test_cases:
self.assertEqual(case[1], snake_case_to_upper_camel_case(case[0]))
+ def test_snake_case_to_lower_camel_case(self):
+ test_cases = [("", ""), ("test", "test"), ("test_name", "testName"),
+ ("test_double_underscore", "testDoubleUnderscore"),
+ ("TEST_CAPITALIZED", "testCapitalized"),
+ ("_prepended_underscore", "prependedUnderscore"),
+ ("appended_underscore_", "appendedUnderscore")]
+ for case in test_cases:
+ self.assertEqual(case[1], snake_case_to_lower_camel_case(case[0]))
+
+ def test_camel_case_to_snake_case(self):
+ test_cases = [("", ""), ("Test", "test"), ("TestName", "test_name"),
+ ("TestDoubleUnderscore",
+ "test_double_underscore"), ("MyToLoFo", "my_to_lo_fo"),
+ ("BEGINNINGAllCaps",
+ "beginning_all_caps"), ("AllCapsENDING", "all_caps_ending"),
+ ("AllCapsMIDDLEWord", "all_caps_middle_word"),
+ ("lowerCamelCase", "lower_camel_case")]
+ for case in test_cases:
+ self.assertEqual(case[1], camel_case_to_snake_case(case[0]))
+
def test_infer_name_from_identifier(self):
standard_test_cases = [
("beam:schematransform:org.apache.beam:transform:v1", "Transform"),
diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py
index 1e9c7c605460..5f53302028c8 100755
--- a/sdks/python/apache_beam/yaml/yaml_provider.py
+++ b/sdks/python/apache_beam/yaml/yaml_provider.py
@@ -889,7 +889,7 @@ def java_window_into(java_provider, windowing):
return java_provider.create_transform(
'WindowIntoStrategy',
{
- 'serialized_windowing_strategy': windowing_strategy.to_runner_api(
+ 'serializedWindowingStrategy': windowing_strategy.to_runner_api(
empty_context).SerializeToString()
},
None)
diff --git a/sdks/python/gen_xlang_wrappers.py b/sdks/python/gen_xlang_wrappers.py
index ea4f496c2d04..a75fc05cba73 100644
--- a/sdks/python/gen_xlang_wrappers.py
+++ b/sdks/python/gen_xlang_wrappers.py
@@ -233,6 +233,24 @@ def pretty_type(tp):
return (tp, nullable)
+def camel_case_to_snake_case(string):
+ """Convert camelCase to snake_case"""
+ arr = []
+ word = []
+ for i, n in enumerate(string):
+ # If seeing an upper letter after a lower letter, we just witnessed a word
+ # If seeing an upper letter and the next letter is lower, we may have just
+ # witnessed an all caps word
+ if n.isupper() and ((i > 0 and string[i - 1].islower()) or
+ (i + 1 < len(string) and string[i + 1].islower())):
+ arr.append(''.join(word))
+ word = [n.lower()]
+ else:
+ word.append(n.lower())
+ arr.append(''.join(word))
+ return '_'.join(arr).strip('_')
+
+
def get_wrappers_from_transform_configs(config_file) -> Dict[str, List[str]]:
"""
Generates code for external transform wrapper classes (subclasses of
@@ -269,8 +287,9 @@ def get_wrappers_from_transform_configs(config_file) -> Dict[str, List[str]]:
parameters = []
for param, info in fields.items():
+ pythonic_name = camel_case_to_snake_case(param)
param_details = {
- "name": param,
+ "name": pythonic_name,
"type": info['type'],
"description": info['description'],
}