Skip to content

Commit

Permalink
Implement messages for TCP
Browse files Browse the repository at this point in the history
  • Loading branch information
mmodzelewski committed Oct 10, 2024
1 parent 833ec12 commit 3a2d1c1
Show file tree
Hide file tree
Showing 12 changed files with 277 additions and 51 deletions.
13 changes: 9 additions & 4 deletions src/main/java/rs/iggy/clients/blocking/MessagesClient.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package rs.iggy.clients.blocking;

import rs.iggy.identifier.ConsumerId;
import rs.iggy.consumergroup.Consumer;
import rs.iggy.identifier.StreamId;
import rs.iggy.identifier.TopicId;
import rs.iggy.message.MessageToSend;
Expand All @@ -12,11 +12,16 @@

public interface MessagesClient {

PolledMessages pollMessages(Long streamId, Long topicId, Optional<Long> partitionId, Long consumerId, PollingStrategy strategy, Long count, boolean autoCommit);
default PolledMessages pollMessages(Long streamId, Long topicId, Optional<Long> partitionId, Long consumerId, PollingStrategy strategy, Long count, boolean autoCommit) {
return pollMessages(StreamId.of(streamId), TopicId.of(topicId), partitionId, Consumer.of(consumerId),
strategy, count, autoCommit);
}

PolledMessages pollMessages(StreamId streamId, TopicId topicId, Optional<Long> partitionId, ConsumerId consumerId, PollingStrategy strategy, Long count, boolean autoCommit);
PolledMessages pollMessages(StreamId streamId, TopicId topicId, Optional<Long> partitionId, Consumer consumer, PollingStrategy strategy, Long count, boolean autoCommit);

void sendMessages(Long streamId, Long topicId, Partitioning partitioning, List<MessageToSend> messages);
default void sendMessages(Long streamId, Long topicId, Partitioning partitioning, List<MessageToSend> messages) {
sendMessages(StreamId.of(streamId), TopicId.of(topicId), partitioning, messages);
}

void sendMessages(StreamId streamId, TopicId topicId, Partitioning partitioning, List<MessageToSend> messages);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import org.apache.hc.core5.http.message.BasicNameValuePair;
import rs.iggy.clients.blocking.MessagesClient;
import rs.iggy.identifier.ConsumerId;
import rs.iggy.consumergroup.Consumer;
import rs.iggy.identifier.StreamId;
import rs.iggy.identifier.TopicId;
import rs.iggy.message.MessageToSend;
Expand All @@ -21,15 +21,9 @@ public MessagesHttpClient(HttpClient httpClient) {
}

@Override
public PolledMessages pollMessages(Long streamId, Long topicId, Optional<Long> partitionId, Long consumerId, PollingStrategy strategy, Long count, boolean autoCommit) {
return pollMessages(StreamId.of(streamId), TopicId.of(topicId), partitionId, ConsumerId.of(consumerId),
strategy, count, autoCommit);
}

@Override
public PolledMessages pollMessages(StreamId streamId, TopicId topicId, Optional<Long> partitionId, ConsumerId consumerId, PollingStrategy strategy, Long count, boolean autoCommit) {
public PolledMessages pollMessages(StreamId streamId, TopicId topicId, Optional<Long> partitionId, Consumer consumer, PollingStrategy strategy, Long count, boolean autoCommit) {
var request = httpClient.prepareGetRequest(path(streamId, topicId),
new BasicNameValuePair("consumer_id", consumerId.toString()),
new BasicNameValuePair("consumer_id", consumer.id().toString()),
partitionId.map(id -> new BasicNameValuePair("partition_id", id.toString())).orElse(null),
new BasicNameValuePair("strategy_kind", strategy.kind().name()),
new BasicNameValuePair("strategy_value", strategy.value().toString()),
Expand All @@ -38,11 +32,6 @@ public PolledMessages pollMessages(StreamId streamId, TopicId topicId, Optional<
return httpClient.execute(request, PolledMessages.class);
}

@Override
public void sendMessages(Long streamId, Long topicId, Partitioning partitioning, List<MessageToSend> messages) {
sendMessages(StreamId.of(streamId), TopicId.of(topicId), partitioning, messages);
}

@Override
public void sendMessages(StreamId streamId, TopicId topicId, Partitioning partitioning, List<MessageToSend> messages) {
var request = httpClient.preparePostRequest(path(streamId, topicId), new SendMessages(partitioning, messages));
Expand Down
51 changes: 48 additions & 3 deletions src/main/java/rs/iggy/clients/blocking/tcp/BytesDeserializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import rs.iggy.consumergroup.ConsumerGroupDetails;
import rs.iggy.consumergroup.ConsumerGroupMember;
import rs.iggy.consumeroffset.ConsumerOffsetInfo;
import rs.iggy.message.*;
import rs.iggy.partition.Partition;
import rs.iggy.stream.StreamBase;
import rs.iggy.stream.StreamDetails;
Expand All @@ -16,9 +17,7 @@
import rs.iggy.topic.TopicDetails;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.*;

final class BytesDeserializer {

Expand Down Expand Up @@ -119,4 +118,50 @@ private static BigInteger readU64AsBigInteger(ByteBuf buffer) {
return new BigInteger(bytesArray);
}

private static BigInteger readU128AsBigInteger(ByteBuf buffer) {
var bytesArray = new byte[17];
buffer.readBytes(bytesArray, 0, 16);
ArrayUtils.reverse(bytesArray);
return new BigInteger(bytesArray);
}

public static PolledMessages readPolledMessages(ByteBuf response) {
var partitionId = response.readUnsignedIntLE();
var currentOffset = readU64AsBigInteger(response);
var messagesCount = response.readUnsignedIntLE();
var messages = new ArrayList<Message>();
while (response.isReadable()) {
messages.add(readMessage(response));
}
return new PolledMessages(partitionId, currentOffset, messages);
}

private static Message readMessage(ByteBuf response) {
var offset = readU64AsBigInteger(response);
var stateCode = response.readByte();
var state = MessageState.fromCode(stateCode);
var timestamp = readU64AsBigInteger(response);
var id = readU128AsBigInteger(response);
var checksum = response.readUnsignedIntLE();
var headersLength = response.readUnsignedIntLE();
var headers = Optional.<Map<String, HeaderValue>>empty();
if (headersLength > 0) {
var headersMap = new HashMap<String, HeaderValue>();
ByteBuf headersBytes = response.readBytes(Long.valueOf(headersLength).intValue());
while (headersBytes.isReadable()) {
var keyLength = headersBytes.readUnsignedIntLE();
var key = headersBytes.readCharSequence(Long.valueOf(keyLength).intValue(), StandardCharsets.UTF_8).toString();
var kindCode = headersBytes.readByte();
var kind = HeaderKind.fromCode(kindCode);
var valueLength = headersBytes.readUnsignedIntLE();
var value = headersBytes.readCharSequence(Long.valueOf(valueLength).intValue(), StandardCharsets.UTF_8);
headersMap.put(key, new HeaderValue(kind, String.valueOf(value)));
}
headers = Optional.of(headersMap);
}
var payloadLength = response.readUnsignedIntLE();
var payload = new byte[Long.valueOf(payloadLength).intValue()];
response.readBytes(payload);
return new Message(offset, state, timestamp, id, checksum, headers, new String(payload));
}
}
66 changes: 66 additions & 0 deletions src/main/java/rs/iggy/clients/blocking/tcp/BytesSerializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@
import org.apache.commons.lang3.ArrayUtils;
import rs.iggy.consumergroup.Consumer;
import rs.iggy.identifier.Identifier;
import rs.iggy.message.HeaderValue;
import rs.iggy.message.MessageToSend;
import rs.iggy.message.Partitioning;
import rs.iggy.message.PollingStrategy;
import java.math.BigInteger;
import java.util.Map;

final class BytesSerializer {

Expand Down Expand Up @@ -45,6 +50,7 @@ static ByteBuf nameToBytes(String name) {
}

static ByteBuf toBytesAsU64(BigInteger value) {
//TODO(mm): 10.10.2024 fix for bigger values
ByteBuf buffer = Unpooled.buffer(8, 8);
byte[] valueAsBytes = value.toByteArray();
if (valueAsBytes.length > 8) {
Expand All @@ -58,4 +64,64 @@ static ByteBuf toBytesAsU64(BigInteger value) {
return buffer;
}

static ByteBuf toBytes(Partitioning partitioning) {
ByteBuf buffer = Unpooled.buffer(2 + partitioning.value().length);
buffer.writeByte(partitioning.kind().asCode());
buffer.writeByte(partitioning.value().length);
buffer.writeBytes(partitioning.value());
return buffer;
}

static ByteBuf toBytes(MessageToSend message) {
var buffer = Unpooled.buffer();
buffer.writeBytes(toBytesAsU128(message.id()));
message.headers().ifPresentOrElse((headers) -> {
var headersBytes = toBytes(headers);
buffer.writeIntLE(headersBytes.readableBytes());
buffer.writeBytes(headersBytes);
}, () -> buffer.writeIntLE(0));
buffer.writeIntLE(message.payload().length);
buffer.writeBytes(message.payload());
return buffer;
}

private static ByteBuf toBytes(Map<String, HeaderValue> headers) {
if (headers.isEmpty()) {
return Unpooled.EMPTY_BUFFER;
}
var buffer = Unpooled.buffer();
for (Map.Entry<String, HeaderValue> entry : headers.entrySet()) {
String key = entry.getKey();
buffer.writeIntLE(key.length());
buffer.writeBytes(key.getBytes());

HeaderValue value = entry.getValue();
buffer.writeByte(value.kind().asCode());
buffer.writeIntLE(value.value().length());
buffer.writeBytes(value.value().getBytes());
}
return buffer;
}

private static ByteBuf toBytesAsU128(BigInteger value) {
//TODO(mm): 10.10.2024 fix for bigger values
ByteBuf buffer = Unpooled.buffer(16, 16);
byte[] valueAsBytes = value.toByteArray();
if (valueAsBytes.length > 17) {
throw new IllegalArgumentException();
}
ArrayUtils.reverse(valueAsBytes);
buffer.writeBytes(valueAsBytes, 0, Math.min(valueAsBytes.length, 16));
if (valueAsBytes.length < 16) {
buffer.writeZero(16 - valueAsBytes.length);
}
return buffer;
}

public static ByteBuf toBytes(PollingStrategy strategy) {
var buffer = Unpooled.buffer(9);
buffer.writeByte(strategy.kind().asCode());
buffer.writeBytes(toBytesAsU64(strategy.value()));
return buffer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public class IggyTcpClient implements IggyClient {
private final PartitionsTcpClient partitionsClient;
private final ConsumerGroupsTcpClient consumerGroupsClient;
private final ConsumerOffsetTcpClient consumerOffsetsClient;
private final MessagesTcpClient messagesClient;

public IggyTcpClient(String host, Integer port) {
TcpConnectionHandler connection = new TcpConnectionHandler(host, port);
Expand All @@ -19,6 +20,7 @@ public IggyTcpClient(String host, Integer port) {
partitionsClient = new PartitionsTcpClient(connection);
consumerGroupsClient = new ConsumerGroupsTcpClient(connection);
consumerOffsetsClient = new ConsumerOffsetTcpClient(connection);
messagesClient = new MessagesTcpClient(connection);
}

@Override
Expand Down Expand Up @@ -58,6 +60,6 @@ public ConsumerOffsetsClient consumerOffsets() {

@Override
public MessagesClient messages() {
throw new UnsupportedOperationException();
return messagesClient;
}
}
52 changes: 52 additions & 0 deletions src/main/java/rs/iggy/clients/blocking/tcp/MessagesTcpClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package rs.iggy.clients.blocking.tcp;

import rs.iggy.clients.blocking.MessagesClient;
import rs.iggy.consumergroup.Consumer;
import rs.iggy.identifier.StreamId;
import rs.iggy.identifier.TopicId;
import rs.iggy.message.MessageToSend;
import rs.iggy.message.Partitioning;
import rs.iggy.message.PolledMessages;
import rs.iggy.message.PollingStrategy;
import java.util.List;
import java.util.Optional;
import static rs.iggy.clients.blocking.tcp.BytesSerializer.toBytes;

class MessagesTcpClient implements MessagesClient {

private static final int POLL_MESSAGES_CODE = 100;
private static final int SEND_MESSAGES_CODE = 101;

private final TcpConnectionHandler connection;

public MessagesTcpClient(TcpConnectionHandler connection) {
this.connection = connection;
}

@Override
public PolledMessages pollMessages(StreamId streamId, TopicId topicId, Optional<Long> partitionId, Consumer consumer, PollingStrategy strategy, Long count, boolean autoCommit) {
var payload = toBytes(consumer);
payload.writeBytes(toBytes(streamId));
payload.writeBytes(toBytes(topicId));
payload.writeIntLE(partitionId.orElse(0L).intValue());
payload.writeBytes(toBytes(strategy));
payload.writeIntLE(count.intValue());
payload.writeByte(autoCommit ? 1 : 0);

var response = connection.send(POLL_MESSAGES_CODE, payload);

return BytesDeserializer.readPolledMessages(response);
}

@Override
public void sendMessages(StreamId streamId, TopicId topicId, Partitioning partitioning, List<MessageToSend> messages) {
var payload = toBytes(streamId);
payload.writeBytes(toBytes(topicId));
payload.writeBytes(toBytes(partitioning));
for (var message : messages) {
payload.writeBytes(toBytes(message));
}

connection.send(SEND_MESSAGES_CODE, payload);
}
}
51 changes: 35 additions & 16 deletions src/main/java/rs/iggy/message/HeaderKind.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,38 @@
package rs.iggy.message;

enum HeaderKind {
Raw,
String,
Bool,
Int8,
Int16,
Int32,
Int64,
Int128,
Uint8,
Uint16,
Uint32,
Uint64,
Uint128,
Float32,
Float64,
public enum HeaderKind {
Raw(1),
String(2),
Bool(3),
Int8(4),
Int16(5),
Int32(6),
Int64(7),
Int128(8),
Uint8(9),
Uint16(10),
Uint32(11),
Uint64(12),
Uint128(13),
Float32(14),
Float64(15);

private final int code;

HeaderKind(int code) {
this.code = code;
}

public static HeaderKind fromCode(int code) {
for (HeaderKind kind : values()) {
if (kind.code == code) {
return kind;
}
}
throw new IllegalArgumentException("Unknown header kind: " + code);
}

public int asCode() {
return code;
}
}
2 changes: 1 addition & 1 deletion src/main/java/rs/iggy/message/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public record Message(
BigInteger timestamp,
BigInteger id,
Long checksum,
Optional<Map<String, Map>> headers,
Optional<Map<String, HeaderValue>> headers,
String payload
) {

Expand Down
23 changes: 19 additions & 4 deletions src/main/java/rs/iggy/message/MessageState.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,23 @@
package rs.iggy.message;

public enum MessageState {
Available,
Unavailable,
Poisoned,
MarkedForDeletion,
Available(1),
Unavailable(10),
Poisoned(20),
MarkedForDeletion(30);

private final int code;

MessageState(int code) {
this.code = code;
}

public static MessageState fromCode(int code) {
for (MessageState state : MessageState.values()) {
if (state.code == code) {
return state;
}
}
throw new IllegalArgumentException("Unknown message state code: " + code);
}
}
Loading

0 comments on commit 3a2d1c1

Please sign in to comment.