diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 5efe2d772034..9133795f702f 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -398,7 +398,7 @@ class BeamModulePlugin implements Plugin { // Automatically use the official release version if we are performing a release // otherwise append '-SNAPSHOT' - project.version = '2.45.20' + project.version = '2.45.21' if (isLinkedin(project)) { project.ext.mavenGroupId = 'com.linkedin.beam' } diff --git a/gradle.properties b/gradle.properties index 21645e307992..2667e4b5ee4d 100644 --- a/gradle.properties +++ b/gradle.properties @@ -30,8 +30,8 @@ signing.gnupg.useLegacyGpg=true # buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy. # To build a custom Beam version make sure you change it in both places, see # https://github.com/apache/beam/issues/21302. -version=2.45.20 -sdk_version=2.45.20 +version=2.45.21 +sdk_version=2.45.21 javaVersion=1.8 diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CustomPipelineOptionsInitializer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CustomPipelineOptionsInitializer.java index b2b05b4881cc..31c9966a0340 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CustomPipelineOptionsInitializer.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CustomPipelineOptionsInitializer.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.sdk.options; import java.util.Iterator; @@ -6,19 +23,16 @@ import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.Nullable; - /** - * Interface to support offspring wire-in for Li: if input class is a - * customized pipelineOptions created by an offspring factory, - * the pipelineOptions will be initialized. - * + * Interface to support offspring wire-in for Li: if input class is a customized pipelineOptions + * created by an offspring factory, the pipelineOptions will be initialized. */ @SuppressWarnings("rawtypes") public interface CustomPipelineOptionsInitializer { T init(T pipelineOptions, Class clazz); /** - * Inject the implementation for the interface + * Inject the implementation for the interface. *

Usage: * *

{@code
@@ -34,10 +48,9 @@ interface Registrar {
     CustomPipelineOptionsInitializer create();
   }
 
-
   static @Initialized @Nullable CustomPipelineOptionsInitializer get() {
-    final Iterator
-        initializer = ServiceLoader.load(CustomPipelineOptionsInitializer.Registrar.class).iterator();
+    final Iterator initializer =
+        ServiceLoader.load(CustomPipelineOptionsInitializer.Registrar.class).iterator();
     return initializer.hasNext() ? Iterators.getOnlyElement(initializer).create() : null;
   }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
index 53cc62a5ceca..dab71ce7ce27 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
@@ -196,7 +196,7 @@ public Object invoke(Object proxy, Method method, Object[] args) {
     } else if (args != null && "as".equals(method.getName()) && args[0] instanceof Class) {
       @SuppressWarnings("unchecked")
       Class clazz = (Class) args[0];
-      return as(clazz, (PipelineOptions)proxy); // LI-specific change to wire in offspring
+      return as(clazz, (PipelineOptions) proxy); // LI-specific change to wire in offspring
     } else if (args != null
         && "populateDisplayData".equals(method.getName())
         && args[0] instanceof DisplayData.Builder) {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
index 28f231432c71..be387425f511 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
@@ -971,7 +971,10 @@ private static org.apache.avro.Schema getFieldSchema(
           baseType =
               org.apache.avro.Schema.createUnion(
                   oneOfType.getOneOfSchema().getFields().stream()
-                      .map(x -> getFieldSchema(x.getType(), x.getName(), namespace))
+                      .map(
+                          x ->
+                              getFieldSchema(
+                                  x.getType().withNullable(false), x.getName(), namespace))
                       .collect(Collectors.toList()));
         } else if ("DATE".equals(identifier) || SqlTypes.DATE.getIdentifier().equals(identifier)) {
           baseType = LogicalTypes.date().addToSchema(org.apache.avro.Schema.create(Type.INT));
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java
index b8a13770c869..4616fd487ebd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java
@@ -930,4 +930,35 @@ public FieldType getBaseType() {
       return base;
     }
   }
+
+  private static final String UNION_SCHEMA =
+      "{"
+          + "\"name\" : \"test\","
+          + "\"type\" : \"record\","
+          + "\"fields\" : ["
+          + "{\"name\" : \"Entity\","
+          + " \"type\" : ["
+          + " {\"type\" : \"record\","
+          + "  \"name\" : \"MemberClient\","
+          + "  \"fields\" : ["
+          + "   {\"name\" : \"member\","
+          + "    \"type\" : \"string\"}]"
+          + " },"
+          + " {\"type\" : \"record\","
+          + "  \"name\" : \"OrganizationClient\","
+          + "  \"fields\" : ["
+          + "    {\"name\" : \"organization\","
+          + "     \"type\" : \"string\"},"
+          + "    {\"name\" : \"displayName\","
+          + "     \"type\" : [ \"null\", \"string\" ]}]"
+          + " }]}"
+          + "]}";
+
+  @Test
+  public void testUnion() {
+    org.apache.avro.Schema schema = org.apache.avro.Schema.parse(UNION_SCHEMA);
+    Schema beamSchema = AvroUtils.toBeamSchema(schema);
+    org.apache.avro.Schema convertedSchema = AvroUtils.toAvroSchema(beamSchema);
+    assertEquals(Type.UNION, convertedSchema.getField("Entity").schema().getType());
+  }
 }