Skip to content

Commit

Permalink
Implement topics for TCP
Browse files Browse the repository at this point in the history
  • Loading branch information
mmodzelewski committed Oct 9, 2024
1 parent e14a0a4 commit 88f429a
Show file tree
Hide file tree
Showing 11 changed files with 243 additions and 71 deletions.
41 changes: 29 additions & 12 deletions src/main/java/rs/iggy/clients/blocking/TopicsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,35 @@

public interface TopicsClient {

TopicDetails getTopic(Long streamId, Long topicId);
default TopicDetails getTopic(Long streamId, Long topicId) {
return getTopic(StreamId.of(streamId), TopicId.of(topicId));
}

TopicDetails getTopic(StreamId streamId, TopicId topicId);

List<Topic> getTopics(Long streamId);
default List<Topic> getTopics(Long streamId) {
return getTopics(StreamId.of(streamId));
}

List<Topic> getTopics(StreamId streamId);

void createTopic(Long streamId,
Optional<Long> topicId,
Long partitionsCount,
CompressionAlgorithm compressionAlgorithm,
BigInteger messageExpiry,
BigInteger maxTopicSize,
Optional<Short> replicationFactor,
String name);
default void createTopic(Long streamId,
Optional<Long> topicId,
Long partitionsCount,
CompressionAlgorithm compressionAlgorithm,
BigInteger messageExpiry,
BigInteger maxTopicSize,
Optional<Short> replicationFactor,
String name) {
createTopic(StreamId.of(streamId),
topicId,
partitionsCount,
compressionAlgorithm,
messageExpiry,
maxTopicSize,
replicationFactor,
name);
}

void createTopic(StreamId streamId,
Optional<Long> topicId,
Expand All @@ -37,11 +50,15 @@ void createTopic(StreamId streamId,
Optional<Short> replicationFactor,
String name);

void updateTopic(Long streamId, Long topicId, Optional<Long> messageExpiry, String name);
default void updateTopic(Long streamId, Long topicId, Optional<Long> messageExpiry, String name) {
updateTopic(StreamId.of(streamId), TopicId.of(topicId), messageExpiry, name);
}

void updateTopic(StreamId streamId, TopicId topicId, Optional<Long> messageExpiry, String name);

void deleteTopic(Long streamId, Long topicId);
default void deleteTopic(Long streamId, Long topicId) {
deleteTopic(StreamId.of(streamId), TopicId.of(topicId));
}

void deleteTopic(StreamId streamId, TopicId topicId);

Expand Down
39 changes: 0 additions & 39 deletions src/main/java/rs/iggy/clients/blocking/http/TopicsHttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,48 +21,19 @@ public TopicsHttpClient(HttpClient httpClient) {
this.httpClient = httpClient;
}

@Override
public TopicDetails getTopic(Long streamId, Long topicId) {
return getTopic(StreamId.of(streamId), TopicId.of(topicId));
}

@Override
public TopicDetails getTopic(StreamId streamId, TopicId topicId) {
var request = httpClient.prepareGetRequest(STREAMS + "/" + streamId + TOPICS + "/" + topicId);
return httpClient.execute(request, TopicDetails.class);
}

@Override
public List<Topic> getTopics(Long streamId) {
return getTopics(StreamId.of(streamId));
}

@Override
public List<Topic> getTopics(StreamId streamId) {
var request = httpClient.prepareGetRequest(STREAMS + "/" + streamId + TOPICS);
return httpClient.execute(request, new TypeReference<>() {
});
}

@Override
public void createTopic(Long streamId,
Optional<Long> topicId,
Long partitionsCount,
CompressionAlgorithm compressionAlgorithm,
BigInteger messageExpiry,
BigInteger maxTopicSize,
Optional<Short> replicationFactor,
String name) {
createTopic(StreamId.of(streamId),
topicId,
partitionsCount,
compressionAlgorithm,
messageExpiry,
maxTopicSize,
replicationFactor,
name);
}

@Override
public void createTopic(StreamId streamId,
Optional<Long> topicId,
Expand All @@ -83,23 +54,13 @@ public void createTopic(StreamId streamId,
httpClient.execute(request);
}

@Override
public void updateTopic(Long streamId, Long topicId, Optional<Long> messageExpiry, String name) {
updateTopic(StreamId.of(streamId), TopicId.of(topicId), messageExpiry, name);
}

@Override
public void updateTopic(StreamId streamId, TopicId topicId, Optional<Long> messageExpiry, String name) {
var request = httpClient.preparePutRequest(STREAMS + "/" + streamId + TOPICS + "/" + topicId,
new UpdateTopic(messageExpiry, name));
httpClient.execute(request);
}

@Override
public void deleteTopic(Long streamId, Long topicId) {
deleteTopic(StreamId.of(streamId), TopicId.of(topicId));
}

@Override
public void deleteTopic(StreamId streamId, TopicId topicId) {
var request = httpClient.prepareDeleteRequest(STREAMS + "/" + streamId + TOPICS + "/" + topicId);
Expand Down
55 changes: 47 additions & 8 deletions src/main/java/rs/iggy/clients/blocking/tcp/BytesDeserializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@
import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rs.iggy.partition.Partition;
import rs.iggy.stream.StreamBase;
import rs.iggy.stream.StreamDetails;
import rs.iggy.topic.CompressionAlgorithm;
import rs.iggy.topic.Topic;
import rs.iggy.topic.TopicDetails;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

Expand All @@ -21,10 +25,10 @@ private BytesDeserializer() {

static StreamBase readStreamBase(ByteBuf response) {
var streamId = response.readUnsignedIntLE();
var createdAt = readU64ToBigInteger(response);
var createdAt = readU64AsBigInteger(response);
var topicsCount = response.readUnsignedIntLE();
var size = readU64ToBigInteger(response);
var messagesCount = readU64ToBigInteger(response);
var size = readU64AsBigInteger(response);
var messagesCount = readU64AsBigInteger(response);
var nameLength = response.readByte();
var name = response.readCharSequence(nameLength, StandardCharsets.UTF_8).toString();

Expand All @@ -34,17 +38,52 @@ static StreamBase readStreamBase(ByteBuf response) {
static StreamDetails readStreamDetails(ByteBuf response) {
var streamBase = readStreamBase(response);

List<Topic> topics = Collections.emptyList();
List<Topic> topics = new ArrayList<>();
if (response.isReadable()) {
log.debug("has more data"); //TODO(mm): 6.10.2024 Add topics implementation
topics.add(readTopic(response));
}

return new StreamDetails(streamBase, topics);
}

private static BigInteger readU64ToBigInteger(ByteBuf buffer) {
var bytesArray = new byte[8];
buffer.readBytes(bytesArray);
public static TopicDetails readTopicDetails(ByteBuf response) {
var topic = readTopic(response);

List<Partition> partitions = Collections.emptyList();
if (response.isReadable()) {
log.debug("has more data"); //TODO(mm): 8.10.2024 Add partitions
}

return new TopicDetails(topic, partitions);
}

public static Topic readTopic(ByteBuf response) {
var topicId = response.readUnsignedIntLE();
var createdAt = readU64AsBigInteger(response);
var partitionsCount = response.readUnsignedIntLE();
var messageExpiry = readU64AsBigInteger(response);
var compressionAlgorithmCode = response.readByte();
var maxTopicSize = readU64AsBigInteger(response);
var replicationFactor = response.readByte();
var size = readU64AsBigInteger(response);
var messagesCount = readU64AsBigInteger(response);
var nameLength = response.readByte();
var name = response.readCharSequence(nameLength, StandardCharsets.UTF_8).toString();
return new Topic(topicId,
createdAt,
name,
size.toString(),
messageExpiry,
CompressionAlgorithm.fromCode(compressionAlgorithmCode),
maxTopicSize,
(short) replicationFactor,
messagesCount,
partitionsCount);
}

private static BigInteger readU64AsBigInteger(ByteBuf buffer) {
var bytesArray = new byte[9];
buffer.readBytes(bytesArray, 0, 8);
ArrayUtils.reverse(bytesArray);
return new BigInteger(bytesArray);
}
Expand Down
23 changes: 23 additions & 0 deletions src/main/java/rs/iggy/clients/blocking/tcp/BytesSerializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.commons.lang3.ArrayUtils;
import rs.iggy.identifier.Identifier;
import java.math.BigInteger;

final class BytesSerializer {

Expand All @@ -27,4 +29,25 @@ static ByteBuf toBytes(Identifier identifier) {
}
}

static ByteBuf nameToBytes(String name) {
ByteBuf buffer = Unpooled.buffer(1 + name.length());
buffer.writeByte(name.length());
buffer.writeBytes(name.getBytes());
return buffer;
}

static ByteBuf toBytesAsU64(BigInteger value) {
ByteBuf buffer = Unpooled.buffer(8, 8);
byte[] valueAsBytes = value.toByteArray();
if (valueAsBytes.length > 8) {
throw new IllegalArgumentException();
}
ArrayUtils.reverse(valueAsBytes);
buffer.writeBytes(valueAsBytes);
if (valueAsBytes.length < 8) {
buffer.writeZero(8 - valueAsBytes.length);
}
return buffer;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ public class IggyTcpClient implements IggyClient {

private final UsersTcpClient usersClient;
private final StreamsTcpClient streamsClient;
private final TopicsTcpClient topicsClient;

public IggyTcpClient(String host, Integer port) {
TcpConnectionHandler connection = new TcpConnectionHandler(host, port);
usersClient = new UsersTcpClient(connection);
streamsClient = new StreamsTcpClient(connection);
topicsClient = new TopicsTcpClient(connection);
}

@Override
Expand All @@ -30,7 +32,7 @@ public UsersClient users() {

@Override
public TopicsClient topics() {
throw new UnsupportedOperationException();
return topicsClient;
}

@Override
Expand Down
19 changes: 11 additions & 8 deletions src/main/java/rs/iggy/clients/blocking/tcp/StreamsTcpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@
import rs.iggy.identifier.StreamId;
import rs.iggy.stream.StreamBase;
import rs.iggy.stream.StreamDetails;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import static rs.iggy.clients.blocking.tcp.BytesDeserializer.readStreamBase;
import static rs.iggy.clients.blocking.tcp.BytesDeserializer.readStreamDetails;
import static rs.iggy.clients.blocking.tcp.BytesSerializer.nameToBytes;
import static rs.iggy.clients.blocking.tcp.BytesSerializer.toBytes;

class StreamsTcpClient implements StreamsClient {
Expand All @@ -28,10 +33,9 @@ public StreamDetails createStream(Optional<Long> streamId, String name) {
var payload = Unpooled.buffer(payloadSize);

payload.writeIntLE(streamId.orElse(0L).intValue());
payload.writeByte(name.length());
payload.writeBytes(name.getBytes());
payload.writeBytes(nameToBytes(name));
var response = connection.send(CREATE_STREAM_CODE, payload);
return BytesDeserializer.readStreamDetails(response);
return readStreamDetails(response);
}

@Override
Expand All @@ -43,15 +47,15 @@ public StreamDetails getStream(Long streamId) {
public StreamDetails getStream(StreamId streamId) {
var payload = toBytes(streamId);
var response = connection.send(GET_STREAM_CODE, payload);
return BytesDeserializer.readStreamDetails(response);
return readStreamDetails(response);
}

@Override
public List<StreamBase> getStreams() {
ByteBuf response = connection.send(GET_STREAMS_CODE);
List<StreamBase> streams = new ArrayList<>();
while (response.isReadable()) {
streams.add(BytesDeserializer.readStreamBase(response));
streams.add(readStreamBase(response));
}
return streams;
}
Expand All @@ -68,8 +72,7 @@ public void updateStream(StreamId streamId, String name) {
var payload = Unpooled.buffer(payloadSize + idBytes.capacity());

payload.writeBytes(idBytes);
payload.writeByte(name.length());
payload.writeBytes(name.getBytes());
payload.writeBytes(nameToBytes(name));
connection.send(UPDATE_STREAM_CODE, payload);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ ByteBuf send(int command, ByteBuf payload) {
return handleResponse(status, responseLength, responseBuffer);
}

ByteBuf handleResponse(long status, int responseLength, ByteBuf responseBuffer) {
private ByteBuf handleResponse(long status, int responseLength, ByteBuf responseBuffer) {
if (status != 0) {
throw new RuntimeException("Received an invalid response with status " + status);
}
Expand Down
Loading

0 comments on commit 88f429a

Please sign in to comment.