Skip to content

Commit

Permalink
Bugfix/kafka nullable header coders (#29244)
Browse files Browse the repository at this point in the history
* Update 2.50 release notes to include new Kafka topicPattern feature

* Create groovy class for io performance tests
Create gradle task and github actions config for GCS using this.

* delete unnecessary class

* fix env call

* fix call to gradle

* run on hosted runner for testing

* add additional checkout

* add destination for triggered tests

* move env variables to correct location

* try uploading against separate dataset

* try without a user

* update branch checkout, try to view the failure log

* run on failure

* update to use correct BigQuery instance

* convert to matrix

* add result reporting

* add failure clause

* remove failure clause, update to run on self-hosted

* address comments, clean up build

* clarify branching

* update kafka coders to support nullable header values
  • Loading branch information
johnjcasey authored Nov 1, 2023
1 parent f53ebfe commit 3d3a7af
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1759,10 +1759,10 @@ public void populateDisplayData(DisplayData.Builder builder) {
static class KafkaHeader {

String key;
byte[] value;
byte @Nullable [] value;

@SchemaCreate
public KafkaHeader(String key, byte[] value) {
public KafkaHeader(String key, byte @Nullable [] value) {
this.key = key;
this.value = value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
Expand All @@ -44,7 +45,7 @@ public class KafkaRecordCoder<K, V> extends StructuredCoder<KafkaRecord<K, V>> {
private static final Coder<Long> longCoder = VarLongCoder.of();
private static final Coder<Integer> intCoder = VarIntCoder.of();
private static final Coder<Iterable<KV<String, byte[]>>> headerCoder =
IterableCoder.of(KvCoder.of(stringCoder, ByteArrayCoder.of()));
IterableCoder.of(KvCoder.of(stringCoder, NullableCoder.of(ByteArrayCoder.of())));

private final KvCoder<K, V> kvCoder;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ public void testKafkaRecordSerializableWithoutHeaders() throws IOException {
verifySerialization(consumerRecord.headers());
}

@Test
public void testKafkaRecordSerializableWithNullValueHeader() throws IOException {
RecordHeaders headers = new RecordHeaders();
headers.add("headerKey", null);
verifySerialization(headers);
}

private void verifySerialization(Headers headers) throws IOException {
KafkaRecord<String, String> kafkaRecord =
new KafkaRecord<>(
Expand Down

0 comments on commit 3d3a7af

Please sign in to comment.