From 34fc9555d6408b31b7ff9081c17b372742db2d0a Mon Sep 17 00:00:00 2001 From: evgeny Date: Tue, 19 Sep 2023 22:31:42 +0100 Subject: [PATCH 1/3] feat!: stringify kafka headers --- .../connect/utils/RecordHeaderConversions.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/ably/kafka/connect/utils/RecordHeaderConversions.java b/src/main/java/com/ably/kafka/connect/utils/RecordHeaderConversions.java index d862a2ff..96aacb0b 100644 --- a/src/main/java/com/ably/kafka/connect/utils/RecordHeaderConversions.java +++ b/src/main/java/com/ably/kafka/connect/utils/RecordHeaderConversions.java @@ -95,9 +95,9 @@ Extras.Builder recordHeaders(Headers headers) { private void buildFromHeaders(Headers headers) { for (Header header : headers) { if (header.key().equals(PUSH_HEADER)) { - extras.pushExtrasValue = header.value(); + extras.pushExtrasValue = stringifyHeaderValueIfPrimitive(header); } else { - headersObject().add(header.key(), header.value()); + headersObject().add(header.key(), stringifyHeaderValueIfPrimitive(header)); } } @@ -180,5 +180,16 @@ MessageExtras toMessageExtras(){ return null; } + /** + * Stringify header value if it is primitive. + */ + private static Object stringifyHeaderValueIfPrimitive(Header header) { + if (header.schema() != null && header.schema().type().isPrimitive()) { + return String.valueOf(header.value()); + } else { + return header.value(); + } + } + } } From a1021ef95ebc9a493c9d6d1b4ed3e2a323620f25 Mon Sep 17 00:00:00 2001 From: evgeny Date: Tue, 19 Sep 2023 23:14:53 +0100 Subject: [PATCH 2/3] feat!: flatten kafka extras into extras --- .../com/ably/kafka/connect/utils/RecordHeaderConversions.java | 2 +- .../java/com/ably/kafka/connect/MessageConversionTest.java | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/ably/kafka/connect/utils/RecordHeaderConversions.java b/src/main/java/com/ably/kafka/connect/utils/RecordHeaderConversions.java index 96aacb0b..93c3ad90 100644 --- a/src/main/java/com/ably/kafka/connect/utils/RecordHeaderConversions.java +++ b/src/main/java/com/ably/kafka/connect/utils/RecordHeaderConversions.java @@ -102,7 +102,7 @@ private void buildFromHeaders(Headers headers) { } if (extras.headersObject != null) { - kafkaExtras().add(HEADERS_KEY, extras.headersObject); + topExtrasObject().add(HEADERS_KEY, extras.headersObject); } buildPushExtras(); diff --git a/src/test/java/com/ably/kafka/connect/MessageConversionTest.java b/src/test/java/com/ably/kafka/connect/MessageConversionTest.java index 841cab23..5b0e301f 100644 --- a/src/test/java/com/ably/kafka/connect/MessageConversionTest.java +++ b/src/test/java/com/ably/kafka/connect/MessageConversionTest.java @@ -119,8 +119,7 @@ public Header rename(String key) { final MessageExtras messageExtras = MessageConverter.toAblyMessage("name", record).extras; //then - final JsonObject receivedObject = messageExtras.asJsonObject().get("kafka").getAsJsonObject(); - final JsonObject receivedHeaders = receivedObject.get("headers").getAsJsonObject(); + final JsonObject receivedHeaders = messageExtras.asJsonObject().get("headers").getAsJsonObject(); assertEquals(receivedHeaders.get("key1").getAsString(), "value1"); assertEquals(receivedHeaders.get("key2").getAsString(), "value2"); } From c45b50b0e2c298c3768d72a1ea7d63ab5370be37 Mon Sep 17 00:00:00 2001 From: evgeny Date: Wed, 20 Sep 2023 12:25:15 +0100 Subject: [PATCH 3/3] chore: do not stringify push header, stringify all other headers --- .../utils/RecordHeaderConversions.java | 28 ++++++++----------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/src/main/java/com/ably/kafka/connect/utils/RecordHeaderConversions.java b/src/main/java/com/ably/kafka/connect/utils/RecordHeaderConversions.java index 93c3ad90..132cfdb7 100644 --- a/src/main/java/com/ably/kafka/connect/utils/RecordHeaderConversions.java +++ b/src/main/java/com/ably/kafka/connect/utils/RecordHeaderConversions.java @@ -46,9 +46,9 @@ public static MessageExtras toMessageExtras(final SinkRecord record) { return extras.toMessageExtras(); } - /* - Wrapper class representing extras and is to be used to simplify building of extras object - * */ + /** + * Wrapper class representing extras and is to be used to simplify building of extras object + */ private static class Extras { private JsonUtils.JsonUtilsObject kafkaObject; @@ -95,9 +95,15 @@ Extras.Builder recordHeaders(Headers headers) { private void buildFromHeaders(Headers headers) { for (Header header : headers) { if (header.key().equals(PUSH_HEADER)) { - extras.pushExtrasValue = stringifyHeaderValueIfPrimitive(header); + // We don't stringify push header value, it is special header, that end up as a nested JSON object + // @see `Extras#buildPushPayload` + extras.pushExtrasValue = header.value(); } else { - headersObject().add(header.key(), stringifyHeaderValueIfPrimitive(header)); + // Kafka automatically deserialises headers. Because Kafka doesn't know about what types + // were originally, headers just get deserialised to the most obvious type. So that means + // numeric strings end up as a numbers. There’s a problem with Realtime whereby large numbers + // in headers breaks things. That's why we always stringify header value + headersObject().add(header.key(), String.valueOf(header.value())); } } @@ -179,17 +185,5 @@ MessageExtras toMessageExtras(){ } return null; } - - /** - * Stringify header value if it is primitive. - */ - private static Object stringifyHeaderValueIfPrimitive(Header header) { - if (header.schema() != null && header.schema().type().isPrimitive()) { - return String.valueOf(header.value()); - } else { - return header.value(); - } - } - } }