Skip to content

Commit

Permalink
Merge pull request #155 from ably/stringify-headers
Browse files Browse the repository at this point in the history
feat!: stringify and flatten Kafka headers in extras
  • Loading branch information
ttypic authored Sep 21, 2023
2 parents 5f6c156 + c45b50b commit c048a5b
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ public static MessageExtras toMessageExtras(final SinkRecord record) {
return extras.toMessageExtras();
}

/*
Wrapper class representing extras and is to be used to simplify building of extras object
* */
/**
* Wrapper class representing extras and is to be used to simplify building of extras object
*/
private static class Extras {

private JsonUtils.JsonUtilsObject kafkaObject;
Expand Down Expand Up @@ -95,14 +95,20 @@ Extras.Builder recordHeaders(Headers headers) {
private void buildFromHeaders(Headers headers) {
for (Header header : headers) {
if (header.key().equals(PUSH_HEADER)) {
// We don't stringify push header value, it is special header, that end up as a nested JSON object
// @see `Extras#buildPushPayload`
extras.pushExtrasValue = header.value();
} else {
headersObject().add(header.key(), header.value());
// Kafka automatically deserialises headers. Because Kafka doesn't know about what types
// were originally, headers just get deserialised to the most obvious type. So that means
// numeric strings end up as a numbers. There’s a problem with Realtime whereby large numbers
// in headers breaks things. That's why we always stringify header value
headersObject().add(header.key(), String.valueOf(header.value()));
}
}

if (extras.headersObject != null) {
kafkaExtras().add(HEADERS_KEY, extras.headersObject);
topExtrasObject().add(HEADERS_KEY, extras.headersObject);
}

buildPushExtras();
Expand Down Expand Up @@ -179,6 +185,5 @@ MessageExtras toMessageExtras(){
}
return null;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,7 @@ public Header rename(String key) {
final MessageExtras messageExtras = MessageConverter.toAblyMessage("name", record).extras;

//then
final JsonObject receivedObject = messageExtras.asJsonObject().get("kafka").getAsJsonObject();
final JsonObject receivedHeaders = receivedObject.get("headers").getAsJsonObject();
final JsonObject receivedHeaders = messageExtras.asJsonObject().get("headers").getAsJsonObject();
assertEquals(receivedHeaders.get("key1").getAsString(), "value1");
assertEquals(receivedHeaders.get("key2").getAsString(), "value2");
}
Expand Down

0 comments on commit c048a5b

Please sign in to comment.