From 88f429a6026d62217e7a62d5f0b329d54bc5858f Mon Sep 17 00:00:00 2001 From: Maciej Modzelewski Date: Wed, 9 Oct 2024 20:31:19 +0200 Subject: [PATCH] Implement topics for TCP --- .../iggy/clients/blocking/TopicsClient.java | 41 +++++++--- .../blocking/http/TopicsHttpClient.java | 39 --------- .../blocking/tcp/BytesDeserializer.java | 55 +++++++++++-- .../clients/blocking/tcp/BytesSerializer.java | 23 ++++++ .../clients/blocking/tcp/IggyTcpClient.java | 4 +- .../blocking/tcp/StreamsTcpClient.java | 19 +++-- .../blocking/tcp/TcpConnectionHandler.java | 2 +- .../clients/blocking/tcp/TopicsTcpClient.java | 82 +++++++++++++++++++ .../rs/iggy/topic/CompressionAlgorithm.java | 23 +++++- src/main/java/rs/iggy/topic/TopicDetails.java | 13 +++ .../blocking/tcp/TopicsTcpClientTest.java | 13 +++ 11 files changed, 243 insertions(+), 71 deletions(-) create mode 100644 src/main/java/rs/iggy/clients/blocking/tcp/TopicsTcpClient.java create mode 100644 src/test/java/rs/iggy/clients/blocking/tcp/TopicsTcpClientTest.java diff --git a/src/main/java/rs/iggy/clients/blocking/TopicsClient.java b/src/main/java/rs/iggy/clients/blocking/TopicsClient.java index 8b6a78c..ad07ce8 100644 --- a/src/main/java/rs/iggy/clients/blocking/TopicsClient.java +++ b/src/main/java/rs/iggy/clients/blocking/TopicsClient.java @@ -11,22 +11,35 @@ public interface TopicsClient { - TopicDetails getTopic(Long streamId, Long topicId); + default TopicDetails getTopic(Long streamId, Long topicId) { + return getTopic(StreamId.of(streamId), TopicId.of(topicId)); + } TopicDetails getTopic(StreamId streamId, TopicId topicId); - List getTopics(Long streamId); + default List getTopics(Long streamId) { + return getTopics(StreamId.of(streamId)); + } List getTopics(StreamId streamId); - void createTopic(Long streamId, - Optional topicId, - Long partitionsCount, - CompressionAlgorithm compressionAlgorithm, - BigInteger messageExpiry, - BigInteger maxTopicSize, - Optional replicationFactor, - String name); + default void createTopic(Long streamId, + Optional topicId, + Long partitionsCount, + CompressionAlgorithm compressionAlgorithm, + BigInteger messageExpiry, + BigInteger maxTopicSize, + Optional replicationFactor, + String name) { + createTopic(StreamId.of(streamId), + topicId, + partitionsCount, + compressionAlgorithm, + messageExpiry, + maxTopicSize, + replicationFactor, + name); + } void createTopic(StreamId streamId, Optional topicId, @@ -37,11 +50,15 @@ void createTopic(StreamId streamId, Optional replicationFactor, String name); - void updateTopic(Long streamId, Long topicId, Optional messageExpiry, String name); + default void updateTopic(Long streamId, Long topicId, Optional messageExpiry, String name) { + updateTopic(StreamId.of(streamId), TopicId.of(topicId), messageExpiry, name); + } void updateTopic(StreamId streamId, TopicId topicId, Optional messageExpiry, String name); - void deleteTopic(Long streamId, Long topicId); + default void deleteTopic(Long streamId, Long topicId) { + deleteTopic(StreamId.of(streamId), TopicId.of(topicId)); + } void deleteTopic(StreamId streamId, TopicId topicId); diff --git a/src/main/java/rs/iggy/clients/blocking/http/TopicsHttpClient.java b/src/main/java/rs/iggy/clients/blocking/http/TopicsHttpClient.java index f704516..ce7dbb7 100644 --- a/src/main/java/rs/iggy/clients/blocking/http/TopicsHttpClient.java +++ b/src/main/java/rs/iggy/clients/blocking/http/TopicsHttpClient.java @@ -21,22 +21,12 @@ public TopicsHttpClient(HttpClient httpClient) { this.httpClient = httpClient; } - @Override - public TopicDetails getTopic(Long streamId, Long topicId) { - return getTopic(StreamId.of(streamId), TopicId.of(topicId)); - } - @Override public TopicDetails getTopic(StreamId streamId, TopicId topicId) { var request = httpClient.prepareGetRequest(STREAMS + "/" + streamId + TOPICS + "/" + topicId); return httpClient.execute(request, TopicDetails.class); } - @Override - public List getTopics(Long streamId) { - return getTopics(StreamId.of(streamId)); - } - @Override public List getTopics(StreamId streamId) { var request = httpClient.prepareGetRequest(STREAMS + "/" + streamId + TOPICS); @@ -44,25 +34,6 @@ public List getTopics(StreamId streamId) { }); } - @Override - public void createTopic(Long streamId, - Optional topicId, - Long partitionsCount, - CompressionAlgorithm compressionAlgorithm, - BigInteger messageExpiry, - BigInteger maxTopicSize, - Optional replicationFactor, - String name) { - createTopic(StreamId.of(streamId), - topicId, - partitionsCount, - compressionAlgorithm, - messageExpiry, - maxTopicSize, - replicationFactor, - name); - } - @Override public void createTopic(StreamId streamId, Optional topicId, @@ -83,11 +54,6 @@ public void createTopic(StreamId streamId, httpClient.execute(request); } - @Override - public void updateTopic(Long streamId, Long topicId, Optional messageExpiry, String name) { - updateTopic(StreamId.of(streamId), TopicId.of(topicId), messageExpiry, name); - } - @Override public void updateTopic(StreamId streamId, TopicId topicId, Optional messageExpiry, String name) { var request = httpClient.preparePutRequest(STREAMS + "/" + streamId + TOPICS + "/" + topicId, @@ -95,11 +61,6 @@ public void updateTopic(StreamId streamId, TopicId topicId, Optional messa httpClient.execute(request); } - @Override - public void deleteTopic(Long streamId, Long topicId) { - deleteTopic(StreamId.of(streamId), TopicId.of(topicId)); - } - @Override public void deleteTopic(StreamId streamId, TopicId topicId) { var request = httpClient.prepareDeleteRequest(STREAMS + "/" + streamId + TOPICS + "/" + topicId); diff --git a/src/main/java/rs/iggy/clients/blocking/tcp/BytesDeserializer.java b/src/main/java/rs/iggy/clients/blocking/tcp/BytesDeserializer.java index 88f0775..c043f70 100644 --- a/src/main/java/rs/iggy/clients/blocking/tcp/BytesDeserializer.java +++ b/src/main/java/rs/iggy/clients/blocking/tcp/BytesDeserializer.java @@ -4,11 +4,15 @@ import org.apache.commons.lang3.ArrayUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import rs.iggy.partition.Partition; import rs.iggy.stream.StreamBase; import rs.iggy.stream.StreamDetails; +import rs.iggy.topic.CompressionAlgorithm; import rs.iggy.topic.Topic; +import rs.iggy.topic.TopicDetails; import java.math.BigInteger; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -21,10 +25,10 @@ private BytesDeserializer() { static StreamBase readStreamBase(ByteBuf response) { var streamId = response.readUnsignedIntLE(); - var createdAt = readU64ToBigInteger(response); + var createdAt = readU64AsBigInteger(response); var topicsCount = response.readUnsignedIntLE(); - var size = readU64ToBigInteger(response); - var messagesCount = readU64ToBigInteger(response); + var size = readU64AsBigInteger(response); + var messagesCount = readU64AsBigInteger(response); var nameLength = response.readByte(); var name = response.readCharSequence(nameLength, StandardCharsets.UTF_8).toString(); @@ -34,17 +38,52 @@ static StreamBase readStreamBase(ByteBuf response) { static StreamDetails readStreamDetails(ByteBuf response) { var streamBase = readStreamBase(response); - List topics = Collections.emptyList(); + List topics = new ArrayList<>(); if (response.isReadable()) { - log.debug("has more data"); //TODO(mm): 6.10.2024 Add topics implementation + topics.add(readTopic(response)); } return new StreamDetails(streamBase, topics); } - private static BigInteger readU64ToBigInteger(ByteBuf buffer) { - var bytesArray = new byte[8]; - buffer.readBytes(bytesArray); + public static TopicDetails readTopicDetails(ByteBuf response) { + var topic = readTopic(response); + + List partitions = Collections.emptyList(); + if (response.isReadable()) { + log.debug("has more data"); //TODO(mm): 8.10.2024 Add partitions + } + + return new TopicDetails(topic, partitions); + } + + public static Topic readTopic(ByteBuf response) { + var topicId = response.readUnsignedIntLE(); + var createdAt = readU64AsBigInteger(response); + var partitionsCount = response.readUnsignedIntLE(); + var messageExpiry = readU64AsBigInteger(response); + var compressionAlgorithmCode = response.readByte(); + var maxTopicSize = readU64AsBigInteger(response); + var replicationFactor = response.readByte(); + var size = readU64AsBigInteger(response); + var messagesCount = readU64AsBigInteger(response); + var nameLength = response.readByte(); + var name = response.readCharSequence(nameLength, StandardCharsets.UTF_8).toString(); + return new Topic(topicId, + createdAt, + name, + size.toString(), + messageExpiry, + CompressionAlgorithm.fromCode(compressionAlgorithmCode), + maxTopicSize, + (short) replicationFactor, + messagesCount, + partitionsCount); + } + + private static BigInteger readU64AsBigInteger(ByteBuf buffer) { + var bytesArray = new byte[9]; + buffer.readBytes(bytesArray, 0, 8); ArrayUtils.reverse(bytesArray); return new BigInteger(bytesArray); } diff --git a/src/main/java/rs/iggy/clients/blocking/tcp/BytesSerializer.java b/src/main/java/rs/iggy/clients/blocking/tcp/BytesSerializer.java index 40cdfc9..dc4c3f5 100644 --- a/src/main/java/rs/iggy/clients/blocking/tcp/BytesSerializer.java +++ b/src/main/java/rs/iggy/clients/blocking/tcp/BytesSerializer.java @@ -2,7 +2,9 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import org.apache.commons.lang3.ArrayUtils; import rs.iggy.identifier.Identifier; +import java.math.BigInteger; final class BytesSerializer { @@ -27,4 +29,25 @@ static ByteBuf toBytes(Identifier identifier) { } } + static ByteBuf nameToBytes(String name) { + ByteBuf buffer = Unpooled.buffer(1 + name.length()); + buffer.writeByte(name.length()); + buffer.writeBytes(name.getBytes()); + return buffer; + } + + static ByteBuf toBytesAsU64(BigInteger value) { + ByteBuf buffer = Unpooled.buffer(8, 8); + byte[] valueAsBytes = value.toByteArray(); + if (valueAsBytes.length > 8) { + throw new IllegalArgumentException(); + } + ArrayUtils.reverse(valueAsBytes); + buffer.writeBytes(valueAsBytes); + if (valueAsBytes.length < 8) { + buffer.writeZero(8 - valueAsBytes.length); + } + return buffer; + } + } diff --git a/src/main/java/rs/iggy/clients/blocking/tcp/IggyTcpClient.java b/src/main/java/rs/iggy/clients/blocking/tcp/IggyTcpClient.java index 28914a1..7caf753 100644 --- a/src/main/java/rs/iggy/clients/blocking/tcp/IggyTcpClient.java +++ b/src/main/java/rs/iggy/clients/blocking/tcp/IggyTcpClient.java @@ -6,11 +6,13 @@ public class IggyTcpClient implements IggyClient { private final UsersTcpClient usersClient; private final StreamsTcpClient streamsClient; + private final TopicsTcpClient topicsClient; public IggyTcpClient(String host, Integer port) { TcpConnectionHandler connection = new TcpConnectionHandler(host, port); usersClient = new UsersTcpClient(connection); streamsClient = new StreamsTcpClient(connection); + topicsClient = new TopicsTcpClient(connection); } @Override @@ -30,7 +32,7 @@ public UsersClient users() { @Override public TopicsClient topics() { - throw new UnsupportedOperationException(); + return topicsClient; } @Override diff --git a/src/main/java/rs/iggy/clients/blocking/tcp/StreamsTcpClient.java b/src/main/java/rs/iggy/clients/blocking/tcp/StreamsTcpClient.java index ad01dae..314e4e2 100644 --- a/src/main/java/rs/iggy/clients/blocking/tcp/StreamsTcpClient.java +++ b/src/main/java/rs/iggy/clients/blocking/tcp/StreamsTcpClient.java @@ -6,7 +6,12 @@ import rs.iggy.identifier.StreamId; import rs.iggy.stream.StreamBase; import rs.iggy.stream.StreamDetails; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import static rs.iggy.clients.blocking.tcp.BytesDeserializer.readStreamBase; +import static rs.iggy.clients.blocking.tcp.BytesDeserializer.readStreamDetails; +import static rs.iggy.clients.blocking.tcp.BytesSerializer.nameToBytes; import static rs.iggy.clients.blocking.tcp.BytesSerializer.toBytes; class StreamsTcpClient implements StreamsClient { @@ -28,10 +33,9 @@ public StreamDetails createStream(Optional streamId, String name) { var payload = Unpooled.buffer(payloadSize); payload.writeIntLE(streamId.orElse(0L).intValue()); - payload.writeByte(name.length()); - payload.writeBytes(name.getBytes()); + payload.writeBytes(nameToBytes(name)); var response = connection.send(CREATE_STREAM_CODE, payload); - return BytesDeserializer.readStreamDetails(response); + return readStreamDetails(response); } @Override @@ -43,7 +47,7 @@ public StreamDetails getStream(Long streamId) { public StreamDetails getStream(StreamId streamId) { var payload = toBytes(streamId); var response = connection.send(GET_STREAM_CODE, payload); - return BytesDeserializer.readStreamDetails(response); + return readStreamDetails(response); } @Override @@ -51,7 +55,7 @@ public List getStreams() { ByteBuf response = connection.send(GET_STREAMS_CODE); List streams = new ArrayList<>(); while (response.isReadable()) { - streams.add(BytesDeserializer.readStreamBase(response)); + streams.add(readStreamBase(response)); } return streams; } @@ -68,8 +72,7 @@ public void updateStream(StreamId streamId, String name) { var payload = Unpooled.buffer(payloadSize + idBytes.capacity()); payload.writeBytes(idBytes); - payload.writeByte(name.length()); - payload.writeBytes(name.getBytes()); + payload.writeBytes(nameToBytes(name)); connection.send(UPDATE_STREAM_CODE, payload); } diff --git a/src/main/java/rs/iggy/clients/blocking/tcp/TcpConnectionHandler.java b/src/main/java/rs/iggy/clients/blocking/tcp/TcpConnectionHandler.java index 7c9ef1a..6276a14 100644 --- a/src/main/java/rs/iggy/clients/blocking/tcp/TcpConnectionHandler.java +++ b/src/main/java/rs/iggy/clients/blocking/tcp/TcpConnectionHandler.java @@ -62,7 +62,7 @@ ByteBuf send(int command, ByteBuf payload) { return handleResponse(status, responseLength, responseBuffer); } - ByteBuf handleResponse(long status, int responseLength, ByteBuf responseBuffer) { + private ByteBuf handleResponse(long status, int responseLength, ByteBuf responseBuffer) { if (status != 0) { throw new RuntimeException("Received an invalid response with status " + status); } diff --git a/src/main/java/rs/iggy/clients/blocking/tcp/TopicsTcpClient.java b/src/main/java/rs/iggy/clients/blocking/tcp/TopicsTcpClient.java new file mode 100644 index 0000000..ee84d1a --- /dev/null +++ b/src/main/java/rs/iggy/clients/blocking/tcp/TopicsTcpClient.java @@ -0,0 +1,82 @@ +package rs.iggy.clients.blocking.tcp; + +import io.netty.buffer.Unpooled; +import rs.iggy.clients.blocking.TopicsClient; +import rs.iggy.identifier.StreamId; +import rs.iggy.identifier.TopicId; +import rs.iggy.topic.CompressionAlgorithm; +import rs.iggy.topic.Topic; +import rs.iggy.topic.TopicDetails; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import static rs.iggy.clients.blocking.tcp.BytesDeserializer.readTopic; +import static rs.iggy.clients.blocking.tcp.BytesDeserializer.readTopicDetails; +import static rs.iggy.clients.blocking.tcp.BytesSerializer.nameToBytes; +import static rs.iggy.clients.blocking.tcp.BytesSerializer.toBytes; + +class TopicsTcpClient implements TopicsClient { + + private static final int GET_TOPIC_CODE = 300; + private static final int GET_TOPICS_CODE = 301; + private static final int CREATE_TOPIC_CODE = 302; + private static final int DELETE_TOPIC_CODE = 303; + private static final int UPDATE_TOPIC_CODE = 304; + private static final int PURGE_TOPIC_CODE = 305; + private final TcpConnectionHandler connection; + + TopicsTcpClient(TcpConnectionHandler connection) { + this.connection = connection; + } + + @Override + public TopicDetails getTopic(StreamId streamId, TopicId topicId) { + var payload = toBytes(streamId); + payload.writeBytes(toBytes(topicId)); + var response = connection.send(GET_TOPIC_CODE, payload); + return readTopicDetails(response); + } + + @Override + public List getTopics(StreamId streamId) { + var payload = toBytes(streamId); + var response = connection.send(GET_TOPICS_CODE, payload); + List topics = new ArrayList<>(); + while (response.isReadable()) { + topics.add(readTopic(response)); + } + return topics; + } + + @Override + public void createTopic(StreamId streamId, Optional topicId, Long partitionsCount, CompressionAlgorithm compressionAlgorithm, BigInteger messageExpiry, BigInteger maxTopicSize, Optional replicationFactor, String name) { + var streamIdBytes = toBytes(streamId); + var payload = Unpooled.buffer(23 + streamIdBytes.capacity() + name.length()); + + payload.writeBytes(streamIdBytes); + payload.writeIntLE(topicId.orElse(0L).intValue()); + payload.writeIntLE(partitionsCount.intValue()); + payload.writeByte(compressionAlgorithm.asCode()); + payload.writeBytes(BytesSerializer.toBytesAsU64(messageExpiry)); + payload.writeBytes(BytesSerializer.toBytesAsU64(maxTopicSize)); + payload.writeByte(replicationFactor.orElse((short) 0)); + payload.writeBytes(nameToBytes(name)); + + var response = connection.send(CREATE_TOPIC_CODE, payload); + //TODO(mm): 9.10.2024 return topic details + } + + + @Override + public void updateTopic(StreamId streamId, TopicId topicId, Optional messageExpiry, String name) { + throw new UnsupportedOperationException(); + } + + @Override + public void deleteTopic(StreamId streamId, TopicId topicId) { + var payload = toBytes(streamId); + payload.writeBytes(toBytes(topicId)); + connection.send(DELETE_TOPIC_CODE, payload); + } +} diff --git a/src/main/java/rs/iggy/topic/CompressionAlgorithm.java b/src/main/java/rs/iggy/topic/CompressionAlgorithm.java index be09d13..ec489f6 100644 --- a/src/main/java/rs/iggy/topic/CompressionAlgorithm.java +++ b/src/main/java/rs/iggy/topic/CompressionAlgorithm.java @@ -1,6 +1,25 @@ package rs.iggy.topic; public enum CompressionAlgorithm { - none, - gzip + none(1), + gzip(2); + + private final Integer code; + + CompressionAlgorithm(Integer code) { + this.code = code; + } + + public static CompressionAlgorithm fromCode(byte code) { + for (CompressionAlgorithm algorithm : values()) { + if (algorithm.code == code) { + return algorithm; + } + } + throw new IllegalArgumentException(); + } + + public Integer asCode() { + return code; + } } diff --git a/src/main/java/rs/iggy/topic/TopicDetails.java b/src/main/java/rs/iggy/topic/TopicDetails.java index 4e1395f..135470c 100644 --- a/src/main/java/rs/iggy/topic/TopicDetails.java +++ b/src/main/java/rs/iggy/topic/TopicDetails.java @@ -17,4 +17,17 @@ public record TopicDetails( Long partitionsCount, List partitions ) { + public TopicDetails(Topic topic, List partitions) { + this(topic.id(), + topic.createdAt(), + topic.name(), + topic.size(), + topic.messageExpiry(), + topic.compressionAlgorithm(), + topic.maxTopicSize(), + topic.replicationFactor(), + topic.messagesCount(), + topic.partitionsCount(), + partitions); + } } diff --git a/src/test/java/rs/iggy/clients/blocking/tcp/TopicsTcpClientTest.java b/src/test/java/rs/iggy/clients/blocking/tcp/TopicsTcpClientTest.java new file mode 100644 index 0000000..406b656 --- /dev/null +++ b/src/test/java/rs/iggy/clients/blocking/tcp/TopicsTcpClientTest.java @@ -0,0 +1,13 @@ +package rs.iggy.clients.blocking.tcp; + +import rs.iggy.clients.blocking.IggyClient; +import rs.iggy.clients.blocking.TopicsClientBaseTest; + +class TopicsTcpClientTest extends TopicsClientBaseTest { + + @Override + protected IggyClient getClient() { + return TcpClientFactory.create(iggyServer); + } + +}