diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java index bcd201a86b63..7a0d6f82e9cc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/SchemaTransform.java @@ -61,11 +61,12 @@ protected SchemaTransform(ConfigT configuration, String identifier) { Class typedClass = (Class) parameterizedType.getActualTypeArguments()[0]; + SchemaRegistry registry = SchemaRegistry.createDefault(); try { // Get initial row with values - Row row = SchemaRegistry.createDefault().getToRowFunction(typedClass).apply(configuration); + Row row = registry.getToRowFunction(typedClass).apply(configuration); // Get sorted Schema and recreate the Row - Schema configurationSchema = SchemaRegistry.createDefault().getSchema(typedClass).sorted(); + Schema configurationSchema = registry.getSchema(typedClass).sorted(); this.configurationRow = configurationSchema.getFields().stream() .map(field -> row.getValue(field.getName())) diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java index e4103b8b3150..9b1314a2c750 100644 --- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumReadSchemaTransformProvider.java @@ -26,7 +26,9 @@ import java.util.Objects; import java.util.stream.Collectors; import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; @@ -169,6 +171,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { return Collections.singletonList("output"); } + @DefaultSchema(AutoValueSchema.class) @AutoValue public abstract static class DebeziumReadSchemaTransformConfiguration { public abstract String getUsername();