Skip to content

Commit

Permalink
Update message payload mapping
Browse files Browse the repository at this point in the history
  • Loading branch information
mmodzelewski committed Oct 11, 2024
1 parent 77db3f3 commit f11378a
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 34 deletions.
59 changes: 32 additions & 27 deletions src/main/java/rs/iggy/clients/blocking/tcp/BytesDeserializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import io.netty.buffer.ByteBuf;
import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rs.iggy.consumergroup.ConsumerGroup;
import rs.iggy.consumergroup.ConsumerGroupDetails;
import rs.iggy.consumergroup.ConsumerGroupMember;
Expand All @@ -21,8 +19,6 @@

final class BytesDeserializer {

private static final Logger log = LoggerFactory.getLogger(BytesDeserializer.class);

private BytesDeserializer() {
}

Expand Down Expand Up @@ -60,7 +56,7 @@ public static TopicDetails readTopicDetails(ByteBuf response) {
return new TopicDetails(topic, partitions);
}

private static Partition readPartition(ByteBuf response) {
static Partition readPartition(ByteBuf response) {
var partitionId = response.readUnsignedIntLE();
var createdAt = readU64AsBigInteger(response);
var segmentsCount = response.readUnsignedIntLE();
Expand Down Expand Up @@ -105,7 +101,7 @@ public static ConsumerGroupDetails readConsumerGroupDetails(ByteBuf response) {
return new ConsumerGroupDetails(consumerGroup, members);
}

private static ConsumerGroupMember readConsumerGroupMember(ByteBuf response) {
static ConsumerGroupMember readConsumerGroupMember(ByteBuf response) {
var memberId = response.readUnsignedIntLE();
var partitionsCount = response.readUnsignedIntLE();
List<Long> partitionIds = new ArrayList<>();
Expand All @@ -131,32 +127,18 @@ public static ConsumerOffsetInfo readConsumerOffsetInfo(ByteBuf response) {
return new ConsumerOffsetInfo(partitionId, currentOffset, storedOffset);
}

private static BigInteger readU64AsBigInteger(ByteBuf buffer) {
var bytesArray = new byte[9];
buffer.readBytes(bytesArray, 0, 8);
ArrayUtils.reverse(bytesArray);
return new BigInteger(bytesArray);
}

private static BigInteger readU128AsBigInteger(ByteBuf buffer) {
var bytesArray = new byte[17];
buffer.readBytes(bytesArray, 0, 16);
ArrayUtils.reverse(bytesArray);
return new BigInteger(bytesArray);
}

public static PolledMessages readPolledMessages(ByteBuf response) {
var partitionId = response.readUnsignedIntLE();
var currentOffset = readU64AsBigInteger(response);
var messagesCount = response.readUnsignedIntLE();
var _messagesCount = response.readUnsignedIntLE();
var messages = new ArrayList<Message>();
while (response.isReadable()) {
messages.add(readMessage(response));
}
return new PolledMessages(partitionId, currentOffset, messages);
}

private static Message readMessage(ByteBuf response) {
static Message readMessage(ByteBuf response) {
var offset = readU64AsBigInteger(response);
var stateCode = response.readByte();
var state = MessageState.fromCode(stateCode);
Expand All @@ -167,21 +149,44 @@ private static Message readMessage(ByteBuf response) {
var headers = Optional.<Map<String, HeaderValue>>empty();
if (headersLength > 0) {
var headersMap = new HashMap<String, HeaderValue>();
ByteBuf headersBytes = response.readBytes(Long.valueOf(headersLength).intValue());
ByteBuf headersBytes = response.readBytes(toInt(headersLength));
while (headersBytes.isReadable()) {
var keyLength = headersBytes.readUnsignedIntLE();
var key = headersBytes.readCharSequence(Long.valueOf(keyLength).intValue(), StandardCharsets.UTF_8).toString();
var key = headersBytes.readCharSequence(toInt(keyLength), StandardCharsets.UTF_8).toString();
var kindCode = headersBytes.readByte();
var kind = HeaderKind.fromCode(kindCode);
var valueLength = headersBytes.readUnsignedIntLE();
var value = headersBytes.readCharSequence(Long.valueOf(valueLength).intValue(), StandardCharsets.UTF_8);
var value = headersBytes.readCharSequence(toInt(valueLength), StandardCharsets.UTF_8);
headersMap.put(key, new HeaderValue(kind, String.valueOf(value)));
}
headers = Optional.of(headersMap);
}
var payloadLength = response.readUnsignedIntLE();
var payload = new byte[Long.valueOf(payloadLength).intValue()];
var payload = newByteArray(payloadLength);
response.readBytes(payload);
return new Message(offset, state, timestamp, id, checksum, headers, new String(payload));
return new Message(offset, state, timestamp, id, checksum, headers, payload);
}

private static BigInteger readU64AsBigInteger(ByteBuf buffer) {
var bytesArray = new byte[9];
buffer.readBytes(bytesArray, 0, 8);
ArrayUtils.reverse(bytesArray);
return new BigInteger(bytesArray);
}

private static BigInteger readU128AsBigInteger(ByteBuf buffer) {
var bytesArray = new byte[17];
buffer.readBytes(bytesArray, 0, 16);
ArrayUtils.reverse(bytesArray);
return new BigInteger(bytesArray);
}

private static int toInt(Long size) {
return size.intValue();
}

private static byte[] newByteArray(Long size) {
return new byte[size.intValue()];
}

}
8 changes: 1 addition & 7 deletions src/main/java/rs/iggy/message/Message.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package rs.iggy.message;

import java.math.BigInteger;
import java.util.Base64;
import java.util.Map;
import java.util.Optional;

Expand All @@ -12,11 +11,6 @@ public record Message(
BigInteger id,
Long checksum,
Optional<Map<String, HeaderValue>> headers,
String payload
byte[] payload
) {

String getDecodedPayload() {
return new String(Base64.getDecoder().decode(payload));
}

}

0 comments on commit f11378a

Please sign in to comment.