diff --git a/src/main/java/rs/iggy/clients/blocking/tcp/BytesDeserializer.java b/src/main/java/rs/iggy/clients/blocking/tcp/BytesDeserializer.java index 2ec17da..e6bdc50 100644 --- a/src/main/java/rs/iggy/clients/blocking/tcp/BytesDeserializer.java +++ b/src/main/java/rs/iggy/clients/blocking/tcp/BytesDeserializer.java @@ -2,8 +2,6 @@ import io.netty.buffer.ByteBuf; import org.apache.commons.lang3.ArrayUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import rs.iggy.consumergroup.ConsumerGroup; import rs.iggy.consumergroup.ConsumerGroupDetails; import rs.iggy.consumergroup.ConsumerGroupMember; @@ -21,8 +19,6 @@ final class BytesDeserializer { - private static final Logger log = LoggerFactory.getLogger(BytesDeserializer.class); - private BytesDeserializer() { } @@ -60,7 +56,7 @@ public static TopicDetails readTopicDetails(ByteBuf response) { return new TopicDetails(topic, partitions); } - private static Partition readPartition(ByteBuf response) { + static Partition readPartition(ByteBuf response) { var partitionId = response.readUnsignedIntLE(); var createdAt = readU64AsBigInteger(response); var segmentsCount = response.readUnsignedIntLE(); @@ -105,7 +101,7 @@ public static ConsumerGroupDetails readConsumerGroupDetails(ByteBuf response) { return new ConsumerGroupDetails(consumerGroup, members); } - private static ConsumerGroupMember readConsumerGroupMember(ByteBuf response) { + static ConsumerGroupMember readConsumerGroupMember(ByteBuf response) { var memberId = response.readUnsignedIntLE(); var partitionsCount = response.readUnsignedIntLE(); List partitionIds = new ArrayList<>(); @@ -131,24 +127,10 @@ public static ConsumerOffsetInfo readConsumerOffsetInfo(ByteBuf response) { return new ConsumerOffsetInfo(partitionId, currentOffset, storedOffset); } - private static BigInteger readU64AsBigInteger(ByteBuf buffer) { - var bytesArray = new byte[9]; - buffer.readBytes(bytesArray, 0, 8); - ArrayUtils.reverse(bytesArray); - return new BigInteger(bytesArray); - } - - private static BigInteger readU128AsBigInteger(ByteBuf buffer) { - var bytesArray = new byte[17]; - buffer.readBytes(bytesArray, 0, 16); - ArrayUtils.reverse(bytesArray); - return new BigInteger(bytesArray); - } - public static PolledMessages readPolledMessages(ByteBuf response) { var partitionId = response.readUnsignedIntLE(); var currentOffset = readU64AsBigInteger(response); - var messagesCount = response.readUnsignedIntLE(); + var _messagesCount = response.readUnsignedIntLE(); var messages = new ArrayList(); while (response.isReadable()) { messages.add(readMessage(response)); @@ -156,7 +138,7 @@ public static PolledMessages readPolledMessages(ByteBuf response) { return new PolledMessages(partitionId, currentOffset, messages); } - private static Message readMessage(ByteBuf response) { + static Message readMessage(ByteBuf response) { var offset = readU64AsBigInteger(response); var stateCode = response.readByte(); var state = MessageState.fromCode(stateCode); @@ -167,21 +149,44 @@ private static Message readMessage(ByteBuf response) { var headers = Optional.>empty(); if (headersLength > 0) { var headersMap = new HashMap(); - ByteBuf headersBytes = response.readBytes(Long.valueOf(headersLength).intValue()); + ByteBuf headersBytes = response.readBytes(toInt(headersLength)); while (headersBytes.isReadable()) { var keyLength = headersBytes.readUnsignedIntLE(); - var key = headersBytes.readCharSequence(Long.valueOf(keyLength).intValue(), StandardCharsets.UTF_8).toString(); + var key = headersBytes.readCharSequence(toInt(keyLength), StandardCharsets.UTF_8).toString(); var kindCode = headersBytes.readByte(); var kind = HeaderKind.fromCode(kindCode); var valueLength = headersBytes.readUnsignedIntLE(); - var value = headersBytes.readCharSequence(Long.valueOf(valueLength).intValue(), StandardCharsets.UTF_8); + var value = headersBytes.readCharSequence(toInt(valueLength), StandardCharsets.UTF_8); headersMap.put(key, new HeaderValue(kind, String.valueOf(value))); } headers = Optional.of(headersMap); } var payloadLength = response.readUnsignedIntLE(); - var payload = new byte[Long.valueOf(payloadLength).intValue()]; + var payload = newByteArray(payloadLength); response.readBytes(payload); - return new Message(offset, state, timestamp, id, checksum, headers, new String(payload)); + return new Message(offset, state, timestamp, id, checksum, headers, payload); + } + + private static BigInteger readU64AsBigInteger(ByteBuf buffer) { + var bytesArray = new byte[9]; + buffer.readBytes(bytesArray, 0, 8); + ArrayUtils.reverse(bytesArray); + return new BigInteger(bytesArray); } + + private static BigInteger readU128AsBigInteger(ByteBuf buffer) { + var bytesArray = new byte[17]; + buffer.readBytes(bytesArray, 0, 16); + ArrayUtils.reverse(bytesArray); + return new BigInteger(bytesArray); + } + + private static int toInt(Long size) { + return size.intValue(); + } + + private static byte[] newByteArray(Long size) { + return new byte[size.intValue()]; + } + } diff --git a/src/main/java/rs/iggy/message/Message.java b/src/main/java/rs/iggy/message/Message.java index c7fd067..75a73ea 100644 --- a/src/main/java/rs/iggy/message/Message.java +++ b/src/main/java/rs/iggy/message/Message.java @@ -1,7 +1,6 @@ package rs.iggy.message; import java.math.BigInteger; -import java.util.Base64; import java.util.Map; import java.util.Optional; @@ -12,11 +11,6 @@ public record Message( BigInteger id, Long checksum, Optional> headers, - String payload + byte[] payload ) { - - String getDecodedPayload() { - return new String(Base64.getDecoder().decode(payload)); - } - }