Skip to content

Commit

Permalink
Implement consumer groups for TCP
Browse files Browse the repository at this point in the history
  • Loading branch information
mmodzelewski committed Oct 9, 2024
1 parent 956fabf commit 278d653
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 50 deletions.
34 changes: 23 additions & 11 deletions src/main/java/rs/iggy/clients/blocking/ConsumerGroupsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,40 @@

public interface ConsumerGroupsClient {

ConsumerGroupDetails getConsumerGroup(Long streamId, Long topicId, Long consumerGroupId);
default ConsumerGroupDetails getConsumerGroup(Long streamId, Long topicId, Long groupId) {
return getConsumerGroup(StreamId.of(streamId), TopicId.of(topicId), ConsumerGroupId.of(groupId));
}

ConsumerGroupDetails getConsumerGroup(StreamId streamId, TopicId topicId, ConsumerGroupId consumerGroupId);
ConsumerGroupDetails getConsumerGroup(StreamId streamId, TopicId topicId, ConsumerGroupId groupId);

List<ConsumerGroup> getConsumerGroups(Long streamId, Long topicId);
default List<ConsumerGroup> getConsumerGroups(Long streamId, Long topicId) {
return getConsumerGroups(StreamId.of(streamId), TopicId.of(topicId));
}

List<ConsumerGroup> getConsumerGroups(StreamId streamId, TopicId topicId);

ConsumerGroupDetails createConsumerGroup(Long streamId, Long topicId, Optional<Long> consumerGroupId, String consumerGroupName);
default ConsumerGroupDetails createConsumerGroup(Long streamId, Long topicId, Optional<Long> groupId, String name) {
return createConsumerGroup(StreamId.of(streamId), TopicId.of(topicId), groupId, name);
}

ConsumerGroupDetails createConsumerGroup(StreamId streamId, TopicId topicId, Optional<Long> consumerGroupId, String consumerGroupName);
ConsumerGroupDetails createConsumerGroup(StreamId streamId, TopicId topicId, Optional<Long> groupId, String name);

void deleteConsumerGroup(Long streamId, Long topicId, Long consumerGroupId);
default void deleteConsumerGroup(Long streamId, Long topicId, Long groupId) {
deleteConsumerGroup(StreamId.of(streamId), TopicId.of(topicId), ConsumerGroupId.of(groupId));
}

void deleteConsumerGroup(StreamId streamId, TopicId topicId, ConsumerGroupId consumerGroupId);
void deleteConsumerGroup(StreamId streamId, TopicId topicId, ConsumerGroupId groupId);

void joinConsumerGroup(Long streamId, Long topicId, Long consumerGroupId);
default void joinConsumerGroup(Long streamId, Long topicId, Long groupId) {
joinConsumerGroup(StreamId.of(streamId), TopicId.of(topicId), ConsumerGroupId.of(groupId));
}

void joinConsumerGroup(StreamId streamId, TopicId topicId, ConsumerGroupId consumerGroupId);
void joinConsumerGroup(StreamId streamId, TopicId topicId, ConsumerGroupId groupId);

void leaveConsumerGroup(Long streamId, Long topicId, Long consumerGroupId);
default void leaveConsumerGroup(Long streamId, Long topicId, Long groupId) {
leaveConsumerGroup(StreamId.of(streamId), TopicId.of(topicId), ConsumerGroupId.of(groupId));
}

void leaveConsumerGroup(StreamId streamId, TopicId topicId, ConsumerGroupId consumerGroupId);
void leaveConsumerGroup(StreamId streamId, TopicId topicId, ConsumerGroupId groupId);

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,11 @@ public ConsumerGroupsHttpClient(HttpClient httpClient) {
}

@Override
public ConsumerGroupDetails getConsumerGroup(Long streamId, Long topicId, Long consumerGroupId) {
return getConsumerGroup(StreamId.of(streamId), TopicId.of(topicId), ConsumerGroupId.of(consumerGroupId));
}

@Override
public ConsumerGroupDetails getConsumerGroup(StreamId streamId, TopicId topicId, ConsumerGroupId consumerGroupId) {
var request = httpClient.prepareGetRequest(path(streamId, topicId) + "/" + consumerGroupId);
public ConsumerGroupDetails getConsumerGroup(StreamId streamId, TopicId topicId, ConsumerGroupId groupId) {
var request = httpClient.prepareGetRequest(path(streamId, topicId) + "/" + groupId);
return httpClient.execute(request, ConsumerGroupDetails.class);
}

@Override
public List<ConsumerGroup> getConsumerGroups(Long streamId, Long topicId) {
return getConsumerGroups(StreamId.of(streamId), TopicId.of(topicId));
}

@Override
public List<ConsumerGroup> getConsumerGroups(StreamId streamId, TopicId topicId) {
var request = httpClient.prepareGetRequest(path(streamId, topicId));
Expand All @@ -42,46 +32,26 @@ public List<ConsumerGroup> getConsumerGroups(StreamId streamId, TopicId topicId)
}

@Override
public ConsumerGroupDetails createConsumerGroup(Long streamId, Long topicId, Optional<Long> consumerGroupId, String consumerGroupName) {
return createConsumerGroup(StreamId.of(streamId), TopicId.of(topicId), consumerGroupId, consumerGroupName);
}

@Override
public ConsumerGroupDetails createConsumerGroup(StreamId streamId, TopicId topicId, Optional<Long> consumerGroupId, String consumerGroupName) {
public ConsumerGroupDetails createConsumerGroup(StreamId streamId, TopicId topicId, Optional<Long> groupId, String name) {
var request = httpClient.preparePostRequest(path(streamId, topicId),
new CreateConsumerGroup(consumerGroupId, consumerGroupName));
new CreateConsumerGroup(groupId, name));
return httpClient.execute(request, new TypeReference<>() {
});
}

@Override
public void deleteConsumerGroup(Long streamId, Long topicId, Long consumerGroupId) {
deleteConsumerGroup(StreamId.of(streamId), TopicId.of(topicId), ConsumerGroupId.of(consumerGroupId));
}

@Override
public void deleteConsumerGroup(StreamId streamId, TopicId topicId, ConsumerGroupId consumerGroupId) {
var request = httpClient.prepareDeleteRequest(path(streamId, topicId) + "/" + consumerGroupId);
public void deleteConsumerGroup(StreamId streamId, TopicId topicId, ConsumerGroupId groupId) {
var request = httpClient.prepareDeleteRequest(path(streamId, topicId) + "/" + groupId);
httpClient.execute(request);
}

@Override
public void joinConsumerGroup(Long streamId, Long topicId, Long consumerGroupId) {
joinConsumerGroup(StreamId.of(streamId), TopicId.of(topicId), ConsumerGroupId.of(consumerGroupId));
}

@Override
public void joinConsumerGroup(StreamId streamId, TopicId topicId, ConsumerGroupId consumerGroupId) {
public void joinConsumerGroup(StreamId streamId, TopicId topicId, ConsumerGroupId groupId) {
throw new UnsupportedOperationException("Method not available in HTTP client");
}

@Override
public void leaveConsumerGroup(Long streamId, Long topicId, Long consumerGroupId) {
leaveConsumerGroup(StreamId.of(streamId), TopicId.of(topicId), ConsumerGroupId.of(consumerGroupId));
}

@Override
public void leaveConsumerGroup(StreamId streamId, TopicId topicId, ConsumerGroupId consumerGroupId) {
public void leaveConsumerGroup(StreamId streamId, TopicId topicId, ConsumerGroupId groupId) {
throw new UnsupportedOperationException("Method not available in HTTP client");
}

Expand Down
23 changes: 23 additions & 0 deletions src/main/java/rs/iggy/clients/blocking/tcp/BytesDeserializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rs.iggy.consumergroup.ConsumerGroup;
import rs.iggy.consumergroup.ConsumerGroupDetails;
import rs.iggy.consumergroup.ConsumerGroupMember;
import rs.iggy.partition.Partition;
import rs.iggy.stream.StreamBase;
import rs.iggy.stream.StreamDetails;
Expand Down Expand Up @@ -81,6 +84,26 @@ public static Topic readTopic(ByteBuf response) {
partitionsCount);
}

public static ConsumerGroupDetails readConsumerGroupDetails(ByteBuf response) {
var consumerGroup = readConsumerGroup(response);

List<ConsumerGroupMember> members = new ArrayList<>();
if (response.isReadable()) {
log.debug("has more data"); //TODO(mm): 9.10.2024 add consumer group members
}

return new ConsumerGroupDetails(consumerGroup, members);
}

public static ConsumerGroup readConsumerGroup(ByteBuf response) {
var groupId = response.readUnsignedIntLE();
var partitionsCount = response.readUnsignedIntLE();
var membersCount = response.readUnsignedIntLE();
var nameLength = response.readByte();
var name = response.readCharSequence(nameLength, StandardCharsets.UTF_8).toString();
return new ConsumerGroup(groupId, name, partitionsCount, membersCount);
}

private static BigInteger readU64AsBigInteger(ByteBuf buffer) {
var bytesArray = new byte[9];
buffer.readBytes(bytesArray, 0, 8);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package rs.iggy.clients.blocking.tcp;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import rs.iggy.clients.blocking.ConsumerGroupsClient;
import rs.iggy.consumergroup.ConsumerGroup;
import rs.iggy.consumergroup.ConsumerGroupDetails;
import rs.iggy.identifier.ConsumerGroupId;
import rs.iggy.identifier.StreamId;
import rs.iggy.identifier.TopicId;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import static rs.iggy.clients.blocking.tcp.BytesDeserializer.readConsumerGroup;
import static rs.iggy.clients.blocking.tcp.BytesDeserializer.readConsumerGroupDetails;
import static rs.iggy.clients.blocking.tcp.BytesSerializer.nameToBytes;
import static rs.iggy.clients.blocking.tcp.BytesSerializer.toBytes;

class ConsumerGroupsTcpClient implements ConsumerGroupsClient {

private static final int GET_CONSUMER_GROUP_CODE = 600;
private static final int GET_CONSUMER_GROUPS_CODE = 601;
private static final int CREATE_CONSUMER_GROUP_CODE = 602;
private static final int DELETE_CONSUMER_GROUP_CODE = 603;
private static final int JOIN_CONSUMER_GROUP_CODE = 604;
private static final int LEAVE_CONSUMER_GROUP_CODE = 605;

private final TcpConnectionHandler connection;

public ConsumerGroupsTcpClient(TcpConnectionHandler connection) {
this.connection = connection;
}

@Override
public ConsumerGroupDetails getConsumerGroup(StreamId streamId, TopicId topicId, ConsumerGroupId groupId) {
var payload = toBytes(streamId);
payload.writeBytes(toBytes(topicId));
payload.writeBytes(toBytes(groupId));
var response = connection.send(GET_CONSUMER_GROUP_CODE, payload);
return readConsumerGroupDetails(response);
}

@Override
public List<ConsumerGroup> getConsumerGroups(StreamId streamId, TopicId topicId) {
var payload = toBytes(streamId);
payload.writeBytes(toBytes(topicId));
var response = connection.send(GET_CONSUMER_GROUPS_CODE, payload);
List<ConsumerGroup> groups = new ArrayList<>();
while (response.isReadable()) {
groups.add(readConsumerGroup(response));
}
return groups;
}

@Override
public ConsumerGroupDetails createConsumerGroup(StreamId streamId, TopicId topicId, Optional<Long> groupId, String name) {
var streamIdBytes = toBytes(streamId);
var topicIdBytes = toBytes(topicId);
var payload = Unpooled.buffer(5 + streamIdBytes.readableBytes() + topicIdBytes.readableBytes() + name.length());

payload.writeBytes(streamIdBytes);
payload.writeBytes(topicIdBytes);
payload.writeIntLE(groupId.orElse(0L).intValue());
payload.writeBytes(nameToBytes(name));

ByteBuf response = connection.send(CREATE_CONSUMER_GROUP_CODE, payload);
return readConsumerGroupDetails(response);
}

@Override
public void deleteConsumerGroup(StreamId streamId, TopicId topicId, ConsumerGroupId groupId) {
var payload = toBytes(streamId);
payload.writeBytes(toBytes(topicId));
payload.writeBytes(toBytes(groupId));
connection.send(DELETE_CONSUMER_GROUP_CODE, payload);
}

@Override
public void joinConsumerGroup(StreamId streamId, TopicId topicId, ConsumerGroupId groupId) {
throw new UnsupportedOperationException();
}

@Override
public void leaveConsumerGroup(StreamId streamId, TopicId topicId, ConsumerGroupId groupId) {
throw new UnsupportedOperationException();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ public class IggyTcpClient implements IggyClient {
private final StreamsTcpClient streamsClient;
private final TopicsTcpClient topicsClient;
private final PartitionsTcpClient partitionsClient;
private final ConsumerGroupsTcpClient consumerGroupsClient;

public IggyTcpClient(String host, Integer port) {
TcpConnectionHandler connection = new TcpConnectionHandler(host, port);
usersClient = new UsersTcpClient(connection);
streamsClient = new StreamsTcpClient(connection);
topicsClient = new TopicsTcpClient(connection);
partitionsClient = new PartitionsTcpClient(connection);
consumerGroupsClient = new ConsumerGroupsTcpClient(connection);
}

@Override
Expand Down Expand Up @@ -44,7 +46,7 @@ public PartitionsClient partitions() {

@Override
public ConsumerGroupsClient consumerGroups() {
throw new UnsupportedOperationException();
return consumerGroupsClient;
}

@Override
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/rs/iggy/consumergroup/ConsumerGroupDetails.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,11 @@ public record ConsumerGroupDetails(
Long membersCount,
List<ConsumerGroupMember> members
) {
public ConsumerGroupDetails(ConsumerGroup consumerGroup, List<ConsumerGroupMember> members) {
this(consumerGroup.id(),
consumerGroup.name(),
consumerGroup.partitionsCount(),
consumerGroup.membersCount(),
members);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package rs.iggy.clients.blocking.tcp;

import rs.iggy.clients.blocking.ConsumerGroupsClientBaseTest;
import rs.iggy.clients.blocking.IggyClient;

class ConsumerGroupsTcpClientTest extends ConsumerGroupsClientBaseTest {

@Override
protected IggyClient getClient() {
return TcpClientFactory.create(iggyServer);
}

}

0 comments on commit 278d653

Please sign in to comment.