diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java index 9006035279f3..d3b58dd26bd2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java @@ -335,7 +335,7 @@ public PipelineResult run(PipelineOptions options) { /** Returns the {@link CoderRegistry} that this {@link Pipeline} uses. */ public CoderRegistry getCoderRegistry() { if (coderRegistry == null) { - coderRegistry = CoderRegistry.createDefault(getSchemaRegistry()); + coderRegistry = CoderRegistry.createDefault(); } return coderRegistry; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java index e404665e4f66..df64789ac3d2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java @@ -42,8 +42,6 @@ import org.apache.beam.sdk.io.fs.MetadataCoder; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.io.fs.ResourceIdCoder; -import org.apache.beam.sdk.schemas.NoSuchSchemaException; -import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.util.CoderUtils; @@ -197,17 +195,11 @@ public Coder coderFor( * the lexicographically smallest {@link Class#getName() class name} being used. * */ - public static CoderRegistry createDefault(@Nullable SchemaRegistry schemaRegistry) { - return new CoderRegistry(schemaRegistry); - } - - /** Backwards compatible version of createDefault. */ public static CoderRegistry createDefault() { - return new CoderRegistry(null); + return new CoderRegistry(); } - private CoderRegistry(@Nullable SchemaRegistry schemaRegistry) { - this.schemaRegistry = schemaRegistry; + private CoderRegistry() { coderProviders = new ArrayDeque<>(REGISTERED_CODER_FACTORIES); } @@ -598,8 +590,6 @@ private static boolean isNullOrEmpty(Collection c) { /** The list of {@link CoderProvider coder providers} to use to provide Coders. */ private ArrayDeque coderProviders; - private final @Nullable SchemaRegistry schemaRegistry; - /** * Returns a {@link Coder} to use for values of the given type, in a context where the given types * use the given coders. @@ -660,28 +650,16 @@ private Coder getCoderFromParameterizedType( List> typeArgumentCoders = new ArrayList<>(); for (Type typeArgument : type.getActualTypeArguments()) { - Coder typeArgumentCoder = null; - if (schemaRegistry != null) { - TypeDescriptor typeDescriptor = TypeDescriptor.of(typeArgument); - try { - typeArgumentCoder = schemaRegistry.getSchemaCoder(typeDescriptor); - } catch (NoSuchSchemaException e) { - // No schema. - } - } - - if (typeArgumentCoder == null) { - try { - typeArgumentCoder = - getCoderFromTypeDescriptor(TypeDescriptor.of(typeArgument), typeCoderBindings); - } catch (CannotProvideCoderException exc) { - throw new CannotProvideCoderException( - String.format( - "Cannot provide coder for parameterized type %s: %s", type, exc.getMessage()), - exc); - } + try { + Coder typeArgumentCoder = + getCoderFromTypeDescriptor(TypeDescriptor.of(typeArgument), typeCoderBindings); + typeArgumentCoders.add(typeArgumentCoder); + } catch (CannotProvideCoderException exc) { + throw new CannotProvideCoderException( + String.format( + "Cannot provide coder for parameterized type %s: %s", type, exc.getMessage()), + exc); } - typeArgumentCoders.add(typeArgumentCoder); } return getCoderFromFactories(TypeDescriptor.of(type), typeArgumentCoders); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaRegistryTest.java index 54c80747b13b..55a16e9faf39 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaRegistryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaRegistryTest.java @@ -26,10 +26,6 @@ import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; import java.util.List; -import org.apache.beam.sdk.coders.CannotProvideCoderException; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.SimpleBean; import org.apache.beam.sdk.schemas.utils.TestPOJOs.SimplePOJO; @@ -227,22 +223,6 @@ public void testRegisterPojo() throws NoSuchSchemaException { assertTrue(SIMPLE_POJO_SCHEMA.equivalent(schema)); } - @Test - public void testSchemaTypeParameterInsideCoder() throws CannotProvideCoderException { - SchemaRegistry schemaRegistry = SchemaRegistry.createDefault(); - schemaRegistry.registerPOJO(SimplePOJO.class); - - CoderRegistry coderRegistry = CoderRegistry.createDefault(schemaRegistry); - Coder> coder = - coderRegistry.getCoder(TypeDescriptors.iterables(TypeDescriptor.of(SimplePOJO.class))); - assertTrue(coder instanceof IterableCoder); - assertEquals(1, coder.getCoderArguments().size()); - assertTrue(coder.getCoderArguments().get(0) instanceof SchemaCoder); - assertTrue( - SIMPLE_POJO_SCHEMA.equivalent( - ((SchemaCoder) coder.getCoderArguments().get(0)).getSchema())); - } - @Test public void testRegisterJavaBean() throws NoSuchSchemaException { SchemaRegistry registry = SchemaRegistry.createDefault();