Skip to content

Commit

Permalink
Rename internal http and tcp clients
Browse files Browse the repository at this point in the history
  • Loading branch information
mmodzelewski committed Oct 25, 2024
1 parent 4132255 commit f5290f1
Show file tree
Hide file tree
Showing 22 changed files with 101 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@

class ConsumerGroupsHttpClient implements ConsumerGroupsClient {

private final HttpClient httpClient;
private final InternalHttpClient httpClient;

public ConsumerGroupsHttpClient(HttpClient httpClient) {
public ConsumerGroupsHttpClient(InternalHttpClient httpClient) {
this.httpClient = httpClient;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
class ConsumerOffsetsHttpClient implements ConsumerOffsetsClient {

private static final String DEFAULT_PARTITION_ID = "1";
private final HttpClient httpClient;
private final InternalHttpClient httpClient;

public ConsumerOffsetsHttpClient(HttpClient httpClient) {
public ConsumerOffsetsHttpClient(InternalHttpClient httpClient) {
this.httpClient = httpClient;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class IggyHttpClient implements IggyClient {
private final PersonalAccessTokensHttpClient personalAccessTokensHttpClient;

public IggyHttpClient(String url) {
HttpClient httpClient = new HttpClient(url);
InternalHttpClient httpClient = new InternalHttpClient(url);
systemClient = new SystemHttpClient(httpClient);
streamsClient = new StreamsHttpClient(httpClient);
usersClient = new UsersHttpClient(httpClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@
import java.io.IOException;
import java.util.Optional;

class HttpClient {
final class InternalHttpClient {

private static final Logger log = LoggerFactory.getLogger(HttpClient.class);
private static final Logger log = LoggerFactory.getLogger(InternalHttpClient.class);

private static final String AUTHORIZATION = "Authorization";
private final String url;
private final ObjectMapper objectMapper = ObjectMapperFactory.getInstance();
private Optional<String> token = Optional.empty();

HttpClient(String url) {
InternalHttpClient(String url) {
this.url = url;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

class MessagesHttpClient implements MessagesClient {

private final HttpClient httpClient;
private final InternalHttpClient httpClient;

public MessagesHttpClient(HttpClient httpClient) {
public MessagesHttpClient(InternalHttpClient httpClient) {
this.httpClient = httpClient;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ class PartitionsHttpClient implements PartitionsClient {
private static final String STREAMS = "/streams";
private static final String TOPICS = "/topics";
private static final String PARTITIONS = "/partitions";
private final HttpClient httpClient;
private final InternalHttpClient httpClient;

public PartitionsHttpClient(HttpClient httpClient) {
public PartitionsHttpClient(InternalHttpClient httpClient) {
this.httpClient = httpClient;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
class PersonalAccessTokensHttpClient implements PersonalAccessTokensClient {

private static final String PERSONAL_ACCESS_TOKENS = "/personal-access-tokens";
private final HttpClient httpClient;
private final InternalHttpClient httpClient;

public PersonalAccessTokensHttpClient(HttpClient httpClient) {
public PersonalAccessTokensHttpClient(InternalHttpClient httpClient) {
this.httpClient = httpClient;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
class StreamsHttpClient implements StreamsClient {

private static final String STREAMS = "/streams";
private final HttpClient httpClient;
private final InternalHttpClient httpClient;

public StreamsHttpClient(HttpClient httpClient) {
public StreamsHttpClient(InternalHttpClient httpClient) {
this.httpClient = httpClient;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ class SystemHttpClient implements SystemClient {
private static final String STATS = "/stats";
private static final String CLIENTS = "/clients";
private static final String PING = "/ping";
private final HttpClient httpClient;
private final InternalHttpClient httpClient;

public SystemHttpClient(HttpClient httpClient) {
public SystemHttpClient(InternalHttpClient httpClient) {
this.httpClient = httpClient;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ class TopicsHttpClient implements TopicsClient {

private static final String STREAMS = "/streams";
private static final String TOPICS = "/topics";
private final HttpClient httpClient;
private final InternalHttpClient httpClient;

public TopicsHttpClient(HttpClient httpClient) {
public TopicsHttpClient(InternalHttpClient httpClient) {
this.httpClient = httpClient;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
class UsersHttpClient implements UsersClient {

private static final String USERS = "/users";
private final HttpClient httpClient;
private final InternalHttpClient httpClient;

public UsersHttpClient(HttpClient httpClient) {
public UsersHttpClient(InternalHttpClient httpClient) {
this.httpClient = httpClient;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,18 @@ class ConsumerGroupsTcpClient implements ConsumerGroupsClient {
private static final int JOIN_CONSUMER_GROUP_CODE = 604;
private static final int LEAVE_CONSUMER_GROUP_CODE = 605;

private final TcpConnectionHandler connection;
private final InternalTcpClient tcpClient;

public ConsumerGroupsTcpClient(TcpConnectionHandler connection) {
this.connection = connection;
public ConsumerGroupsTcpClient(InternalTcpClient tcpClient) {
this.tcpClient = tcpClient;
}

@Override
public Optional<ConsumerGroupDetails> getConsumerGroup(StreamId streamId, TopicId topicId, ConsumerId groupId) {
var payload = toBytes(streamId);
payload.writeBytes(toBytes(topicId));
payload.writeBytes(toBytes(groupId));
var response = connection.send(GET_CONSUMER_GROUP_CODE, payload);
var response = tcpClient.send(GET_CONSUMER_GROUP_CODE, payload);
if (response.isReadable()) {
return Optional.of(readConsumerGroupDetails(response));
}
Expand All @@ -47,7 +47,7 @@ public Optional<ConsumerGroupDetails> getConsumerGroup(StreamId streamId, TopicI
public List<ConsumerGroup> getConsumerGroups(StreamId streamId, TopicId topicId) {
var payload = toBytes(streamId);
payload.writeBytes(toBytes(topicId));
var response = connection.send(GET_CONSUMER_GROUPS_CODE, payload);
var response = tcpClient.send(GET_CONSUMER_GROUPS_CODE, payload);
List<ConsumerGroup> groups = new ArrayList<>();
while (response.isReadable()) {
groups.add(readConsumerGroup(response));
Expand All @@ -66,7 +66,7 @@ public ConsumerGroupDetails createConsumerGroup(StreamId streamId, TopicId topic
payload.writeIntLE(groupId.orElse(0L).intValue());
payload.writeBytes(nameToBytes(name));

ByteBuf response = connection.send(CREATE_CONSUMER_GROUP_CODE, payload);
ByteBuf response = tcpClient.send(CREATE_CONSUMER_GROUP_CODE, payload);
return readConsumerGroupDetails(response);
}

Expand All @@ -75,7 +75,7 @@ public void deleteConsumerGroup(StreamId streamId, TopicId topicId, ConsumerId g
var payload = toBytes(streamId);
payload.writeBytes(toBytes(topicId));
payload.writeBytes(toBytes(groupId));
connection.send(DELETE_CONSUMER_GROUP_CODE, payload);
tcpClient.send(DELETE_CONSUMER_GROUP_CODE, payload);
}

@Override
Expand All @@ -84,7 +84,7 @@ public void joinConsumerGroup(StreamId streamId, TopicId topicId, ConsumerId gro
payload.writeBytes(toBytes(topicId));
payload.writeBytes(toBytes(groupId));

connection.send(JOIN_CONSUMER_GROUP_CODE, payload);
tcpClient.send(JOIN_CONSUMER_GROUP_CODE, payload);
}

@Override
Expand All @@ -93,7 +93,7 @@ public void leaveConsumerGroup(StreamId streamId, TopicId topicId, ConsumerId gr
payload.writeBytes(toBytes(topicId));
payload.writeBytes(toBytes(groupId));

connection.send(LEAVE_CONSUMER_GROUP_CODE, payload);
tcpClient.send(LEAVE_CONSUMER_GROUP_CODE, payload);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ class ConsumerOffsetTcpClient implements ConsumerOffsetsClient {
private static final int GET_CONSUMER_OFFSET_CODE = 120;
private static final int STORE_CONSUMER_OFFSET_CODE = 121;

private final TcpConnectionHandler connection;
private final InternalTcpClient tcpClient;

public ConsumerOffsetTcpClient(TcpConnectionHandler connection) {
this.connection = connection;
public ConsumerOffsetTcpClient(InternalTcpClient tcpClient) {
this.tcpClient = tcpClient;
}

@Override
Expand All @@ -30,7 +30,7 @@ public void storeConsumerOffset(StreamId streamId, TopicId topicId, Optional<Lon
payload.writeIntLE(partitionId.orElse(0L).intValue());
payload.writeBytes(toBytesAsU64(offset));

connection.send(STORE_CONSUMER_OFFSET_CODE, payload);
tcpClient.send(STORE_CONSUMER_OFFSET_CODE, payload);
}

@Override
Expand All @@ -40,7 +40,7 @@ public Optional<ConsumerOffsetInfo> getConsumerOffset(StreamId streamId, TopicId
payload.writeBytes(toBytes(topicId));
payload.writeIntLE(partitionId.orElse(0L).intValue());

var response = connection.send(GET_CONSUMER_OFFSET_CODE, payload);
var response = tcpClient.send(GET_CONSUMER_OFFSET_CODE, payload);
if (response.isReadable()) {
return Optional.of(readConsumerOffsetInfo(response));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@ public class IggyTcpClient implements IggyClient {
private final PersonalAccessTokensTcpClient personalAccessTokensClient;

public IggyTcpClient(String host, Integer port) {
TcpConnectionHandler connection = new TcpConnectionHandler(host, port);
usersClient = new UsersTcpClient(connection);
streamsClient = new StreamsTcpClient(connection);
topicsClient = new TopicsTcpClient(connection);
partitionsClient = new PartitionsTcpClient(connection);
consumerGroupsClient = new ConsumerGroupsTcpClient(connection);
consumerOffsetsClient = new ConsumerOffsetTcpClient(connection);
messagesClient = new MessagesTcpClient(connection);
systemClient = new SystemTcpClient(connection);
personalAccessTokensClient = new PersonalAccessTokensTcpClient(connection);
InternalTcpClient tcpClient = new InternalTcpClient(host, port);
usersClient = new UsersTcpClient(tcpClient);
streamsClient = new StreamsTcpClient(tcpClient);
topicsClient = new TopicsTcpClient(tcpClient);
partitionsClient = new PartitionsTcpClient(tcpClient);
consumerGroupsClient = new ConsumerGroupsTcpClient(tcpClient);
consumerOffsetsClient = new ConsumerOffsetTcpClient(tcpClient);
messagesClient = new MessagesTcpClient(tcpClient);
systemClient = new SystemTcpClient(tcpClient);
personalAccessTokensClient = new PersonalAccessTokensTcpClient(tcpClient);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class TcpConnectionHandler {
final class InternalTcpClient {

private static final int REQUEST_INITIAL_BYTES_LENGTH = 4;
private static final int COMMAND_LENGTH = 4;
Expand All @@ -20,7 +20,7 @@ final class TcpConnectionHandler {
private final Connection connection;
private final BlockingQueue<IggyResponse> responses = new LinkedBlockingQueue<>();

TcpConnectionHandler(String host, Integer port) {
InternalTcpClient(String host, Integer port) {
this.connection = TcpClient.create()
.host(host)
.port(port)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ class MessagesTcpClient implements MessagesClient {
private static final int POLL_MESSAGES_CODE = 100;
private static final int SEND_MESSAGES_CODE = 101;

private final TcpConnectionHandler connection;
private final InternalTcpClient tcpClient;

public MessagesTcpClient(TcpConnectionHandler connection) {
this.connection = connection;
public MessagesTcpClient(InternalTcpClient tcpClient) {
this.tcpClient = tcpClient;
}

@Override
Expand All @@ -33,7 +33,7 @@ public PolledMessages pollMessages(StreamId streamId, TopicId topicId, Optional<
payload.writeIntLE(count.intValue());
payload.writeByte(autoCommit ? 1 : 0);

var response = connection.send(POLL_MESSAGES_CODE, payload);
var response = tcpClient.send(POLL_MESSAGES_CODE, payload);

return BytesDeserializer.readPolledMessages(response);
}
Expand All @@ -47,6 +47,6 @@ public void sendMessages(StreamId streamId, TopicId topicId, Partitioning partit
payload.writeBytes(toBytes(message));
}

connection.send(SEND_MESSAGES_CODE, payload);
tcpClient.send(SEND_MESSAGES_CODE, payload);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,25 @@ class PartitionsTcpClient implements PartitionsClient {

private static final int CREATE_PARTITION_CODE = 402;
private static final int DELETE_PARTITION_CODE = 403;
private final TcpConnectionHandler connection;
private final InternalTcpClient tcpClient;

PartitionsTcpClient(TcpConnectionHandler connection) {
this.connection = connection;
PartitionsTcpClient(InternalTcpClient tcpClient) {
this.tcpClient = tcpClient;
}

@Override
public void createPartitions(StreamId streamId, TopicId topicId, Long partitionsCount) {
var payload = toBytes(streamId);
payload.writeBytes(toBytes(topicId));
payload.writeIntLE(partitionsCount.intValue());
connection.send(CREATE_PARTITION_CODE, payload);
tcpClient.send(CREATE_PARTITION_CODE, payload);
}

@Override
public void deletePartitions(StreamId streamId, TopicId topicId, Long partitionsCount) {
var payload = toBytes(streamId);
payload.writeBytes(toBytes(topicId));
payload.writeIntLE(partitionsCount.intValue());
connection.send(DELETE_PARTITION_CODE, payload);
tcpClient.send(DELETE_PARTITION_CODE, payload);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,24 @@ class PersonalAccessTokensTcpClient implements PersonalAccessTokensClient {
private static final int DELETE_PERSONAL_ACCESS_TOKEN_CODE = 43;
private static final int LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE = 44;

private final TcpConnectionHandler connection;
private final InternalTcpClient tcpClient;

public PersonalAccessTokensTcpClient(TcpConnectionHandler connection) {
this.connection = connection;
public PersonalAccessTokensTcpClient(InternalTcpClient tcpClient) {
this.tcpClient = tcpClient;
}

@Override
public RawPersonalAccessToken createPersonalAccessToken(String name, BigInteger expiry) {
var payload = Unpooled.buffer();
payload.writeBytes(nameToBytes(name));
payload.writeBytes(toBytesAsU64(expiry));
var response = connection.send(CREATE_PERSONAL_ACCESS_TOKEN_CODE, payload);
var response = tcpClient.send(CREATE_PERSONAL_ACCESS_TOKEN_CODE, payload);
return readRawPersonalAccessToken(response);
}

@Override
public List<PersonalAccessTokenInfo> getPersonalAccessTokens() {
var response = connection.send(GET_PERSONAL_ACCESS_TOKENS_CODE);
var response = tcpClient.send(GET_PERSONAL_ACCESS_TOKENS_CODE);
var tokens = new ArrayList<PersonalAccessTokenInfo>();
while (response.isReadable()) {
tokens.add(readPersonalAccessTokenInfo(response));
Expand All @@ -49,13 +49,13 @@ public List<PersonalAccessTokenInfo> getPersonalAccessTokens() {
@Override
public void deletePersonalAccessToken(String name) {
var payload = nameToBytes(name);
connection.send(DELETE_PERSONAL_ACCESS_TOKEN_CODE, payload);
tcpClient.send(DELETE_PERSONAL_ACCESS_TOKEN_CODE, payload);
}

@Override
public IdentityInfo loginWithPersonalAccessToken(String token) {
var payload = nameToBytes(token);
var response = connection.send(LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE, payload);
var response = tcpClient.send(LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE, payload);
var userId = response.readUnsignedIntLE();
return new IdentityInfo(userId, Optional.empty());
}
Expand Down
Loading

0 comments on commit f5290f1

Please sign in to comment.