diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 7275986de8b5..ea0c34e576b9 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -1759,10 +1759,10 @@ public void populateDisplayData(DisplayData.Builder builder) { static class KafkaHeader { String key; - byte[] value; + byte @Nullable [] value; @SchemaCreate - public KafkaHeader(String key, byte[] value) { + public KafkaHeader(String key, byte @Nullable [] value) { this.key = key; this.value = value; } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java index 2cb1efe65704..dbb3a053099c 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.coders.VarIntCoder; @@ -44,7 +45,7 @@ public class KafkaRecordCoder extends StructuredCoder> { private static final Coder longCoder = VarLongCoder.of(); private static final Coder intCoder = VarIntCoder.of(); private static final Coder>> headerCoder = - IterableCoder.of(KvCoder.of(stringCoder, ByteArrayCoder.of())); + IterableCoder.of(KvCoder.of(stringCoder, NullableCoder.of(ByteArrayCoder.of()))); private final KvCoder kvCoder; diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoderTest.java index 6720d67821ae..84d8cedb895a 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoderTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoderTest.java @@ -55,6 +55,13 @@ public void testKafkaRecordSerializableWithoutHeaders() throws IOException { verifySerialization(consumerRecord.headers()); } + @Test + public void testKafkaRecordSerializableWithNullValueHeader() throws IOException { + RecordHeaders headers = new RecordHeaders(); + headers.add("headerKey", null); + verifySerialization(headers); + } + private void verifySerialization(Headers headers) throws IOException { KafkaRecord kafkaRecord = new KafkaRecord<>(