From 2a3711deda15dba954d3e5998671202106136c2b Mon Sep 17 00:00:00 2001 From: Xinyu Liu Date: Wed, 28 Feb 2024 11:52:46 -0800 Subject: [PATCH 1/5] Allows customization in the Flink State binding --- .../wrappers/streaming/DoFnOperator.java | 3 +- .../streaming/state/FlinkStateBinders.java | 57 +++++++++++++++++++ .../streaming/state/FlinkStateInternals.java | 22 ++++--- 3 files changed, 73 insertions(+), 9 deletions(-) create mode 100644 runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateBinders.java diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 03a9d1464b3e..3e64b1003ad3 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -63,6 +63,7 @@ import org.apache.beam.runners.flink.translation.utils.Workarounds; import org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput.BufferingDoFnRunner; import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals; +import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateBinders; import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StructuredCoder; @@ -552,7 +553,7 @@ private void earlyBindStateIfNeeded() throws IllegalArgumentException, IllegalAc if (doFn != null) { DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); FlinkStateInternals.EarlyBinder earlyBinder = - new FlinkStateInternals.EarlyBinder(getKeyedStateBackend(), serializedOptions); + FlinkStateBinders.getEarlyBinder(getKeyedStateBackend(), serializedOptions, stepName); for (DoFnSignature.StateDeclaration value : signature.stateDeclarations().values()) { StateSpec spec = (StateSpec) signature.stateDeclarations().get(value.id()).field().get(doFn); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateBinders.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateBinders.java new file mode 100644 index 000000000000..33e36158c6a4 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateBinders.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.state; + +import java.util.ServiceLoader; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; +import org.apache.flink.runtime.state.KeyedStateBackend; + +/** + * LinkedIn-only: allow custom configuration of {@link + * org.apache.flink.api.common.state.StateDescriptor} during the Beam state binding. + */ +@SuppressWarnings({"rawtypes", "nullness"}) +public class FlinkStateBinders { + /** An interface that allows custom {@link org.apache.beam.sdk.state.StateBinder}. */ + public interface Registrar { + FlinkStateInternals.EarlyBinder getEarlyBinder( + KeyedStateBackend keyedStateBackend, + SerializablePipelineOptions pipelineOptions, + String stepName); + } + + private static final Registrar REGISTRAR = + Iterables.getOnlyElement(ServiceLoader.load(Registrar.class), null); + + /** + * Returns {@link + * org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals.EarlyBinder} + * that creates the user states from the Flink state backend. + */ + public static FlinkStateInternals.EarlyBinder getEarlyBinder( + KeyedStateBackend keyedStateBackend, + SerializablePipelineOptions pipelineOptions, + String stepName) { + if (REGISTRAR != null) { + return REGISTRAR.getEarlyBinder(keyedStateBackend, pipelineOptions, stepName); + } else { + return new FlinkStateInternals.EarlyBinder(keyedStateBackend, pipelineOptions); + } + } +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java index 16a28e8ce677..16f8106e02f0 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java @@ -1581,7 +1581,7 @@ public EarlyBinder( @Override public ValueState bindValue(String id, StateSpec> spec, Coder coder) { try { - keyedStateBackend.getOrCreateKeyedState( + getOrCreateKeyedState( StringSerializer.INSTANCE, new ValueStateDescriptor<>(id, new CoderTypeSerializer<>(coder, pipelineOptions))); } catch (Exception e) { @@ -1594,7 +1594,7 @@ public ValueState bindValue(String id, StateSpec> spec, Cod @Override public BagState bindBag(String id, StateSpec> spec, Coder elemCoder) { try { - keyedStateBackend.getOrCreateKeyedState( + getOrCreateKeyedState( StringSerializer.INSTANCE, new ListStateDescriptor<>(id, new CoderTypeSerializer<>(elemCoder, pipelineOptions))); } catch (Exception e) { @@ -1607,7 +1607,7 @@ public BagState bindBag(String id, StateSpec> spec, Coder @Override public SetState bindSet(String id, StateSpec> spec, Coder elemCoder) { try { - keyedStateBackend.getOrCreateKeyedState( + getOrCreateKeyedState( StringSerializer.INSTANCE, new MapStateDescriptor<>( id, @@ -1626,7 +1626,7 @@ public org.apache.beam.sdk.state.MapState bindMap( Coder mapKeyCoder, Coder mapValueCoder) { try { - keyedStateBackend.getOrCreateKeyedState( + getOrCreateKeyedState( StringSerializer.INSTANCE, new MapStateDescriptor<>( id, @@ -1642,7 +1642,7 @@ public org.apache.beam.sdk.state.MapState bindMap( public OrderedListState bindOrderedList( String id, StateSpec> spec, Coder elemCoder) { try { - keyedStateBackend.getOrCreateKeyedState( + getOrCreateKeyedState( StringSerializer.INSTANCE, new ListStateDescriptor<>( id, @@ -1671,7 +1671,7 @@ public CombiningState bindCom Coder accumCoder, Combine.CombineFn combineFn) { try { - keyedStateBackend.getOrCreateKeyedState( + getOrCreateKeyedState( StringSerializer.INSTANCE, new ValueStateDescriptor<>(id, new CoderTypeSerializer<>(accumCoder, pipelineOptions))); } catch (Exception e) { @@ -1688,7 +1688,7 @@ CombiningState bindCombiningWithContext( Coder accumCoder, CombineWithContext.CombineFnWithContext combineFn) { try { - keyedStateBackend.getOrCreateKeyedState( + getOrCreateKeyedState( StringSerializer.INSTANCE, new ValueStateDescriptor<>(id, new CoderTypeSerializer<>(accumCoder, pipelineOptions))); } catch (Exception e) { @@ -1701,7 +1701,7 @@ CombiningState bindCombiningWithContext( public WatermarkHoldState bindWatermark( String id, StateSpec spec, TimestampCombiner timestampCombiner) { try { - keyedStateBackend.getOrCreateKeyedState( + getOrCreateKeyedState( VoidNamespaceSerializer.INSTANCE, new MapStateDescriptor<>( "watermark-holds", @@ -1712,5 +1712,11 @@ public WatermarkHoldState bindWatermark( } return null; } + + protected StateT getOrCreateKeyedState( + TypeSerializer namespaceSerializer, StateDescriptor stateDescriptor) + throws Exception { + return (StateT) keyedStateBackend.getOrCreateKeyedState(namespaceSerializer, stateDescriptor); + } } } From ebfedf1331cb9ba1c820e0cfefbc1eb3258125b6 Mon Sep 17 00:00:00 2001 From: Xinyu Liu Date: Wed, 28 Feb 2024 13:11:29 -0800 Subject: [PATCH 2/5] Fix styling issue --- .../wrappers/streaming/state/FlinkStateBinders.java | 5 +---- .../wrappers/streaming/state/FlinkStateInternals.java | 8 +++++--- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateBinders.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateBinders.java index 33e36158c6a4..412312271bef 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateBinders.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateBinders.java @@ -22,10 +22,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; import org.apache.flink.runtime.state.KeyedStateBackend; -/** - * LinkedIn-only: allow custom configuration of {@link - * org.apache.flink.api.common.state.StateDescriptor} during the Beam state binding. - */ +/** LinkedIn-only: allow customization in Beam state binding. */ @SuppressWarnings({"rawtypes", "nullness"}) public class FlinkStateBinders { /** An interface that allows custom {@link org.apache.beam.sdk.state.StateBinder}. */ diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java index 16f8106e02f0..d7db1f5907b0 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java @@ -1713,9 +1713,11 @@ public WatermarkHoldState bindWatermark( return null; } - protected StateT getOrCreateKeyedState( - TypeSerializer namespaceSerializer, StateDescriptor stateDescriptor) - throws Exception { + protected + StateT getOrCreateKeyedState( + TypeSerializer namespaceSerializer, + StateDescriptor stateDescriptor) + throws Exception { return (StateT) keyedStateBackend.getOrCreateKeyedState(namespaceSerializer, stateDescriptor); } } From d137d72320ccb15766d61d9368e57d1495ce13cb Mon Sep 17 00:00:00 2001 From: Xinyu Liu Date: Wed, 28 Feb 2024 13:15:05 -0800 Subject: [PATCH 3/5] Update version --- .../groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +- gradle.properties | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 21ae001b2d12..5efe2d772034 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -398,7 +398,7 @@ class BeamModulePlugin implements Plugin { // Automatically use the official release version if we are performing a release // otherwise append '-SNAPSHOT' - project.version = '2.45.19' + project.version = '2.45.20' if (isLinkedin(project)) { project.ext.mavenGroupId = 'com.linkedin.beam' } diff --git a/gradle.properties b/gradle.properties index 8d286f625d5f..21645e307992 100644 --- a/gradle.properties +++ b/gradle.properties @@ -30,8 +30,8 @@ signing.gnupg.useLegacyGpg=true # buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy. # To build a custom Beam version make sure you change it in both places, see # https://github.com/apache/beam/issues/21302. -version=2.45.19 -sdk_version=2.45.19 +version=2.45.20 +sdk_version=2.45.20 javaVersion=1.8 From 0f4b3f441e89b2bb6420b89af49073d6d65661b6 Mon Sep 17 00:00:00 2001 From: Xinyu Liu Date: Fri, 29 Mar 2024 16:43:58 -0700 Subject: [PATCH 4/5] Fix nested union in avro schema conversion --- .../CustomPipelineOptionsInitializer.java | 31 +++++++++++++------ .../sdk/options/ProxyInvocationHandler.java | 2 +- .../beam/sdk/schemas/utils/AvroUtils.java | 5 ++- .../beam/sdk/schemas/utils/AvroUtilsTest.java | 31 +++++++++++++++++++ 4 files changed, 58 insertions(+), 11 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CustomPipelineOptionsInitializer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CustomPipelineOptionsInitializer.java index b2b05b4881cc..31c9966a0340 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CustomPipelineOptionsInitializer.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/CustomPipelineOptionsInitializer.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.sdk.options; import java.util.Iterator; @@ -6,19 +23,16 @@ import org.checkerframework.checker.initialization.qual.Initialized; import org.checkerframework.checker.nullness.qual.Nullable; - /** - * Interface to support offspring wire-in for Li: if input class is a - * customized pipelineOptions created by an offspring factory, - * the pipelineOptions will be initialized. - * + * Interface to support offspring wire-in for Li: if input class is a customized pipelineOptions + * created by an offspring factory, the pipelineOptions will be initialized. */ @SuppressWarnings("rawtypes") public interface CustomPipelineOptionsInitializer { T init(T pipelineOptions, Class clazz); /** - * Inject the implementation for the interface + * Inject the implementation for the interface. *

Usage: * *

{@code
@@ -34,10 +48,9 @@ interface Registrar {
     CustomPipelineOptionsInitializer create();
   }
 
-
   static @Initialized @Nullable CustomPipelineOptionsInitializer get() {
-    final Iterator
-        initializer = ServiceLoader.load(CustomPipelineOptionsInitializer.Registrar.class).iterator();
+    final Iterator initializer =
+        ServiceLoader.load(CustomPipelineOptionsInitializer.Registrar.class).iterator();
     return initializer.hasNext() ? Iterators.getOnlyElement(initializer).create() : null;
   }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
index 53cc62a5ceca..dab71ce7ce27 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
@@ -196,7 +196,7 @@ public Object invoke(Object proxy, Method method, Object[] args) {
     } else if (args != null && "as".equals(method.getName()) && args[0] instanceof Class) {
       @SuppressWarnings("unchecked")
       Class clazz = (Class) args[0];
-      return as(clazz, (PipelineOptions)proxy); // LI-specific change to wire in offspring
+      return as(clazz, (PipelineOptions) proxy); // LI-specific change to wire in offspring
     } else if (args != null
         && "populateDisplayData".equals(method.getName())
         && args[0] instanceof DisplayData.Builder) {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
index 28f231432c71..be387425f511 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
@@ -971,7 +971,10 @@ private static org.apache.avro.Schema getFieldSchema(
           baseType =
               org.apache.avro.Schema.createUnion(
                   oneOfType.getOneOfSchema().getFields().stream()
-                      .map(x -> getFieldSchema(x.getType(), x.getName(), namespace))
+                      .map(
+                          x ->
+                              getFieldSchema(
+                                  x.getType().withNullable(false), x.getName(), namespace))
                       .collect(Collectors.toList()));
         } else if ("DATE".equals(identifier) || SqlTypes.DATE.getIdentifier().equals(identifier)) {
           baseType = LogicalTypes.date().addToSchema(org.apache.avro.Schema.create(Type.INT));
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java
index b8a13770c869..4616fd487ebd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java
@@ -930,4 +930,35 @@ public FieldType getBaseType() {
       return base;
     }
   }
+
+  private static final String UNION_SCHEMA =
+      "{"
+          + "\"name\" : \"test\","
+          + "\"type\" : \"record\","
+          + "\"fields\" : ["
+          + "{\"name\" : \"Entity\","
+          + " \"type\" : ["
+          + " {\"type\" : \"record\","
+          + "  \"name\" : \"MemberClient\","
+          + "  \"fields\" : ["
+          + "   {\"name\" : \"member\","
+          + "    \"type\" : \"string\"}]"
+          + " },"
+          + " {\"type\" : \"record\","
+          + "  \"name\" : \"OrganizationClient\","
+          + "  \"fields\" : ["
+          + "    {\"name\" : \"organization\","
+          + "     \"type\" : \"string\"},"
+          + "    {\"name\" : \"displayName\","
+          + "     \"type\" : [ \"null\", \"string\" ]}]"
+          + " }]}"
+          + "]}";
+
+  @Test
+  public void testUnion() {
+    org.apache.avro.Schema schema = org.apache.avro.Schema.parse(UNION_SCHEMA);
+    Schema beamSchema = AvroUtils.toBeamSchema(schema);
+    org.apache.avro.Schema convertedSchema = AvroUtils.toAvroSchema(beamSchema);
+    assertEquals(Type.UNION, convertedSchema.getField("Entity").schema().getType());
+  }
 }

From 5283d04464f87499b9209b266f36caddfeacc17b Mon Sep 17 00:00:00 2001
From: Xinyu Liu 
Date: Fri, 29 Mar 2024 17:06:53 -0700
Subject: [PATCH 5/5] Update version

---
 .../groovy/org/apache/beam/gradle/BeamModulePlugin.groovy     | 2 +-
 gradle.properties                                             | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 5efe2d772034..9133795f702f 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -398,7 +398,7 @@ class BeamModulePlugin implements Plugin {
 
     // Automatically use the official release version if we are performing a release
     // otherwise append '-SNAPSHOT'
-    project.version = '2.45.20'
+    project.version = '2.45.21'
     if (isLinkedin(project)) {
       project.ext.mavenGroupId = 'com.linkedin.beam'
     }
diff --git a/gradle.properties b/gradle.properties
index 21645e307992..2667e4b5ee4d 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -30,8 +30,8 @@ signing.gnupg.useLegacyGpg=true
 # buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy.
 # To build a custom Beam version make sure you change it in both places, see
 # https://github.com/apache/beam/issues/21302.
-version=2.45.20
-sdk_version=2.45.20
+version=2.45.21
+sdk_version=2.45.21
 
 javaVersion=1.8