diff --git a/java-sdk/src/main/java/rs/iggy/clients/blocking/http/ConsumerGroupsHttpClient.java b/java-sdk/src/main/java/rs/iggy/clients/blocking/http/ConsumerGroupsHttpClient.java index 126970f..a4fc108 100644 --- a/java-sdk/src/main/java/rs/iggy/clients/blocking/http/ConsumerGroupsHttpClient.java +++ b/java-sdk/src/main/java/rs/iggy/clients/blocking/http/ConsumerGroupsHttpClient.java @@ -12,9 +12,9 @@ class ConsumerGroupsHttpClient implements ConsumerGroupsClient { - private final HttpClient httpClient; + private final InternalHttpClient httpClient; - public ConsumerGroupsHttpClient(HttpClient httpClient) { + public ConsumerGroupsHttpClient(InternalHttpClient httpClient) { this.httpClient = httpClient; } diff --git a/java-sdk/src/main/java/rs/iggy/clients/blocking/http/ConsumerOffsetsHttpClient.java b/java-sdk/src/main/java/rs/iggy/clients/blocking/http/ConsumerOffsetsHttpClient.java index 8e0a9f7..843f535 100644 --- a/java-sdk/src/main/java/rs/iggy/clients/blocking/http/ConsumerOffsetsHttpClient.java +++ b/java-sdk/src/main/java/rs/iggy/clients/blocking/http/ConsumerOffsetsHttpClient.java @@ -12,9 +12,9 @@ class ConsumerOffsetsHttpClient implements ConsumerOffsetsClient { private static final String DEFAULT_PARTITION_ID = "1"; - private final HttpClient httpClient; + private final InternalHttpClient httpClient; - public ConsumerOffsetsHttpClient(HttpClient httpClient) { + public ConsumerOffsetsHttpClient(InternalHttpClient httpClient) { this.httpClient = httpClient; } diff --git a/java-sdk/src/main/java/rs/iggy/clients/blocking/http/IggyHttpClient.java b/java-sdk/src/main/java/rs/iggy/clients/blocking/http/IggyHttpClient.java index b0d3fd9..48fcca1 100644 --- a/java-sdk/src/main/java/rs/iggy/clients/blocking/http/IggyHttpClient.java +++ b/java-sdk/src/main/java/rs/iggy/clients/blocking/http/IggyHttpClient.java @@ -15,7 +15,7 @@ public class IggyHttpClient implements IggyClient { private final PersonalAccessTokensHttpClient personalAccessTokensHttpClient; public IggyHttpClient(String url) { - HttpClient httpClient = new HttpClient(url); + InternalHttpClient httpClient = new InternalHttpClient(url); systemClient = new SystemHttpClient(httpClient); streamsClient = new StreamsHttpClient(httpClient); usersClient = new UsersHttpClient(httpClient); diff --git a/java-sdk/src/main/java/rs/iggy/clients/blocking/http/HttpClient.java b/java-sdk/src/main/java/rs/iggy/clients/blocking/http/InternalHttpClient.java similarity index 97% rename from java-sdk/src/main/java/rs/iggy/clients/blocking/http/HttpClient.java rename to java-sdk/src/main/java/rs/iggy/clients/blocking/http/InternalHttpClient.java index bb828b2..e4830a4 100644 --- a/java-sdk/src/main/java/rs/iggy/clients/blocking/http/HttpClient.java +++ b/java-sdk/src/main/java/rs/iggy/clients/blocking/http/InternalHttpClient.java @@ -17,16 +17,16 @@ import java.io.IOException; import java.util.Optional; -class HttpClient { +final class InternalHttpClient { - private static final Logger log = LoggerFactory.getLogger(HttpClient.class); + private static final Logger log = LoggerFactory.getLogger(InternalHttpClient.class); private static final String AUTHORIZATION = "Authorization"; private final String url; private final ObjectMapper objectMapper = ObjectMapperFactory.getInstance(); private Optional token = Optional.empty(); - HttpClient(String url) { + InternalHttpClient(String url) { this.url = url; } diff --git a/java-sdk/src/main/java/rs/iggy/clients/blocking/http/MessagesHttpClient.java b/java-sdk/src/main/java/rs/iggy/clients/blocking/http/MessagesHttpClient.java index 3f602ea..70e8986 100644 --- a/java-sdk/src/main/java/rs/iggy/clients/blocking/http/MessagesHttpClient.java +++ b/java-sdk/src/main/java/rs/iggy/clients/blocking/http/MessagesHttpClient.java @@ -14,9 +14,9 @@ class MessagesHttpClient implements MessagesClient { - private final HttpClient httpClient; + private final InternalHttpClient httpClient; - public MessagesHttpClient(HttpClient httpClient) { + public MessagesHttpClient(InternalHttpClient httpClient) { this.httpClient = httpClient; } diff --git a/java-sdk/src/main/java/rs/iggy/clients/blocking/http/PartitionsHttpClient.java b/java-sdk/src/main/java/rs/iggy/clients/blocking/http/PartitionsHttpClient.java index 90b8bb3..d8999e4 100644 --- a/java-sdk/src/main/java/rs/iggy/clients/blocking/http/PartitionsHttpClient.java +++ b/java-sdk/src/main/java/rs/iggy/clients/blocking/http/PartitionsHttpClient.java @@ -10,9 +10,9 @@ class PartitionsHttpClient implements PartitionsClient { private static final String STREAMS = "/streams"; private static final String TOPICS = "/topics"; private static final String PARTITIONS = "/partitions"; - private final HttpClient httpClient; + private final InternalHttpClient httpClient; - public PartitionsHttpClient(HttpClient httpClient) { + public PartitionsHttpClient(InternalHttpClient httpClient) { this.httpClient = httpClient; } diff --git a/java-sdk/src/main/java/rs/iggy/clients/blocking/http/PersonalAccessTokensHttpClient.java b/java-sdk/src/main/java/rs/iggy/clients/blocking/http/PersonalAccessTokensHttpClient.java index a899e77..0028783 100644 --- a/java-sdk/src/main/java/rs/iggy/clients/blocking/http/PersonalAccessTokensHttpClient.java +++ b/java-sdk/src/main/java/rs/iggy/clients/blocking/http/PersonalAccessTokensHttpClient.java @@ -12,9 +12,9 @@ class PersonalAccessTokensHttpClient implements PersonalAccessTokensClient { private static final String PERSONAL_ACCESS_TOKENS = "/personal-access-tokens"; - private final HttpClient httpClient; + private final InternalHttpClient httpClient; - public PersonalAccessTokensHttpClient(HttpClient httpClient) { + public PersonalAccessTokensHttpClient(InternalHttpClient httpClient) { this.httpClient = httpClient; } diff --git a/java-sdk/src/main/java/rs/iggy/clients/blocking/http/StreamsHttpClient.java b/java-sdk/src/main/java/rs/iggy/clients/blocking/http/StreamsHttpClient.java index ceb6845..3baeb49 100644 --- a/java-sdk/src/main/java/rs/iggy/clients/blocking/http/StreamsHttpClient.java +++ b/java-sdk/src/main/java/rs/iggy/clients/blocking/http/StreamsHttpClient.java @@ -11,9 +11,9 @@ class StreamsHttpClient implements StreamsClient { private static final String STREAMS = "/streams"; - private final HttpClient httpClient; + private final InternalHttpClient httpClient; - public StreamsHttpClient(HttpClient httpClient) { + public StreamsHttpClient(InternalHttpClient httpClient) { this.httpClient = httpClient; } diff --git a/java-sdk/src/main/java/rs/iggy/clients/blocking/http/SystemHttpClient.java b/java-sdk/src/main/java/rs/iggy/clients/blocking/http/SystemHttpClient.java index 60f6082..1de904f 100644 --- a/java-sdk/src/main/java/rs/iggy/clients/blocking/http/SystemHttpClient.java +++ b/java-sdk/src/main/java/rs/iggy/clients/blocking/http/SystemHttpClient.java @@ -12,9 +12,9 @@ class SystemHttpClient implements SystemClient { private static final String STATS = "/stats"; private static final String CLIENTS = "/clients"; private static final String PING = "/ping"; - private final HttpClient httpClient; + private final InternalHttpClient httpClient; - public SystemHttpClient(HttpClient httpClient) { + public SystemHttpClient(InternalHttpClient httpClient) { this.httpClient = httpClient; } diff --git a/java-sdk/src/main/java/rs/iggy/clients/blocking/http/TopicsHttpClient.java b/java-sdk/src/main/java/rs/iggy/clients/blocking/http/TopicsHttpClient.java index 5edecde..f4f545b 100644 --- a/java-sdk/src/main/java/rs/iggy/clients/blocking/http/TopicsHttpClient.java +++ b/java-sdk/src/main/java/rs/iggy/clients/blocking/http/TopicsHttpClient.java @@ -15,9 +15,9 @@ class TopicsHttpClient implements TopicsClient { private static final String STREAMS = "/streams"; private static final String TOPICS = "/topics"; - private final HttpClient httpClient; + private final InternalHttpClient httpClient; - public TopicsHttpClient(HttpClient httpClient) { + public TopicsHttpClient(InternalHttpClient httpClient) { this.httpClient = httpClient; } diff --git a/java-sdk/src/main/java/rs/iggy/clients/blocking/http/UsersHttpClient.java b/java-sdk/src/main/java/rs/iggy/clients/blocking/http/UsersHttpClient.java index aa3ed83..957a129 100644 --- a/java-sdk/src/main/java/rs/iggy/clients/blocking/http/UsersHttpClient.java +++ b/java-sdk/src/main/java/rs/iggy/clients/blocking/http/UsersHttpClient.java @@ -10,9 +10,9 @@ class UsersHttpClient implements UsersClient { private static final String USERS = "/users"; - private final HttpClient httpClient; + private final InternalHttpClient httpClient; - public UsersHttpClient(HttpClient httpClient) { + public UsersHttpClient(InternalHttpClient httpClient) { this.httpClient = httpClient; } diff --git a/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/ConsumerGroupsTcpClient.java b/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/ConsumerGroupsTcpClient.java index f75a3ab..11da588 100644 --- a/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/ConsumerGroupsTcpClient.java +++ b/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/ConsumerGroupsTcpClient.java @@ -25,10 +25,10 @@ class ConsumerGroupsTcpClient implements ConsumerGroupsClient { private static final int JOIN_CONSUMER_GROUP_CODE = 604; private static final int LEAVE_CONSUMER_GROUP_CODE = 605; - private final TcpConnectionHandler connection; + private final InternalTcpClient tcpClient; - public ConsumerGroupsTcpClient(TcpConnectionHandler connection) { - this.connection = connection; + public ConsumerGroupsTcpClient(InternalTcpClient tcpClient) { + this.tcpClient = tcpClient; } @Override @@ -36,7 +36,7 @@ public Optional getConsumerGroup(StreamId streamId, TopicI var payload = toBytes(streamId); payload.writeBytes(toBytes(topicId)); payload.writeBytes(toBytes(groupId)); - var response = connection.send(GET_CONSUMER_GROUP_CODE, payload); + var response = tcpClient.send(GET_CONSUMER_GROUP_CODE, payload); if (response.isReadable()) { return Optional.of(readConsumerGroupDetails(response)); } @@ -47,7 +47,7 @@ public Optional getConsumerGroup(StreamId streamId, TopicI public List getConsumerGroups(StreamId streamId, TopicId topicId) { var payload = toBytes(streamId); payload.writeBytes(toBytes(topicId)); - var response = connection.send(GET_CONSUMER_GROUPS_CODE, payload); + var response = tcpClient.send(GET_CONSUMER_GROUPS_CODE, payload); List groups = new ArrayList<>(); while (response.isReadable()) { groups.add(readConsumerGroup(response)); @@ -66,7 +66,7 @@ public ConsumerGroupDetails createConsumerGroup(StreamId streamId, TopicId topic payload.writeIntLE(groupId.orElse(0L).intValue()); payload.writeBytes(nameToBytes(name)); - ByteBuf response = connection.send(CREATE_CONSUMER_GROUP_CODE, payload); + ByteBuf response = tcpClient.send(CREATE_CONSUMER_GROUP_CODE, payload); return readConsumerGroupDetails(response); } @@ -75,7 +75,7 @@ public void deleteConsumerGroup(StreamId streamId, TopicId topicId, ConsumerId g var payload = toBytes(streamId); payload.writeBytes(toBytes(topicId)); payload.writeBytes(toBytes(groupId)); - connection.send(DELETE_CONSUMER_GROUP_CODE, payload); + tcpClient.send(DELETE_CONSUMER_GROUP_CODE, payload); } @Override @@ -84,7 +84,7 @@ public void joinConsumerGroup(StreamId streamId, TopicId topicId, ConsumerId gro payload.writeBytes(toBytes(topicId)); payload.writeBytes(toBytes(groupId)); - connection.send(JOIN_CONSUMER_GROUP_CODE, payload); + tcpClient.send(JOIN_CONSUMER_GROUP_CODE, payload); } @Override @@ -93,7 +93,7 @@ public void leaveConsumerGroup(StreamId streamId, TopicId topicId, ConsumerId gr payload.writeBytes(toBytes(topicId)); payload.writeBytes(toBytes(groupId)); - connection.send(LEAVE_CONSUMER_GROUP_CODE, payload); + tcpClient.send(LEAVE_CONSUMER_GROUP_CODE, payload); } } diff --git a/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/ConsumerOffsetTcpClient.java b/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/ConsumerOffsetTcpClient.java index 740deab..b5afe41 100644 --- a/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/ConsumerOffsetTcpClient.java +++ b/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/ConsumerOffsetTcpClient.java @@ -16,10 +16,10 @@ class ConsumerOffsetTcpClient implements ConsumerOffsetsClient { private static final int GET_CONSUMER_OFFSET_CODE = 120; private static final int STORE_CONSUMER_OFFSET_CODE = 121; - private final TcpConnectionHandler connection; + private final InternalTcpClient tcpClient; - public ConsumerOffsetTcpClient(TcpConnectionHandler connection) { - this.connection = connection; + public ConsumerOffsetTcpClient(InternalTcpClient tcpClient) { + this.tcpClient = tcpClient; } @Override @@ -30,7 +30,7 @@ public void storeConsumerOffset(StreamId streamId, TopicId topicId, Optional getConsumerOffset(StreamId streamId, TopicId payload.writeBytes(toBytes(topicId)); payload.writeIntLE(partitionId.orElse(0L).intValue()); - var response = connection.send(GET_CONSUMER_OFFSET_CODE, payload); + var response = tcpClient.send(GET_CONSUMER_OFFSET_CODE, payload); if (response.isReadable()) { return Optional.of(readConsumerOffsetInfo(response)); } diff --git a/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/IggyTcpClient.java b/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/IggyTcpClient.java index ee4fc0c..c6a05b9 100644 --- a/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/IggyTcpClient.java +++ b/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/IggyTcpClient.java @@ -15,16 +15,16 @@ public class IggyTcpClient implements IggyClient { private final PersonalAccessTokensTcpClient personalAccessTokensClient; public IggyTcpClient(String host, Integer port) { - TcpConnectionHandler connection = new TcpConnectionHandler(host, port); - usersClient = new UsersTcpClient(connection); - streamsClient = new StreamsTcpClient(connection); - topicsClient = new TopicsTcpClient(connection); - partitionsClient = new PartitionsTcpClient(connection); - consumerGroupsClient = new ConsumerGroupsTcpClient(connection); - consumerOffsetsClient = new ConsumerOffsetTcpClient(connection); - messagesClient = new MessagesTcpClient(connection); - systemClient = new SystemTcpClient(connection); - personalAccessTokensClient = new PersonalAccessTokensTcpClient(connection); + InternalTcpClient tcpClient = new InternalTcpClient(host, port); + usersClient = new UsersTcpClient(tcpClient); + streamsClient = new StreamsTcpClient(tcpClient); + topicsClient = new TopicsTcpClient(tcpClient); + partitionsClient = new PartitionsTcpClient(tcpClient); + consumerGroupsClient = new ConsumerGroupsTcpClient(tcpClient); + consumerOffsetsClient = new ConsumerOffsetTcpClient(tcpClient); + messagesClient = new MessagesTcpClient(tcpClient); + systemClient = new SystemTcpClient(tcpClient); + personalAccessTokensClient = new PersonalAccessTokensTcpClient(tcpClient); } @Override diff --git a/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/TcpConnectionHandler.java b/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/InternalTcpClient.java similarity index 97% rename from java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/TcpConnectionHandler.java rename to java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/InternalTcpClient.java index f40e141..376ff0f 100644 --- a/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/TcpConnectionHandler.java +++ b/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/InternalTcpClient.java @@ -11,7 +11,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -final class TcpConnectionHandler { +final class InternalTcpClient { private static final int REQUEST_INITIAL_BYTES_LENGTH = 4; private static final int COMMAND_LENGTH = 4; @@ -20,7 +20,7 @@ final class TcpConnectionHandler { private final Connection connection; private final BlockingQueue responses = new LinkedBlockingQueue<>(); - TcpConnectionHandler(String host, Integer port) { + InternalTcpClient(String host, Integer port) { this.connection = TcpClient.create() .host(host) .port(port) diff --git a/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/MessagesTcpClient.java b/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/MessagesTcpClient.java index f3591d3..70e1243 100644 --- a/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/MessagesTcpClient.java +++ b/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/MessagesTcpClient.java @@ -17,10 +17,10 @@ class MessagesTcpClient implements MessagesClient { private static final int POLL_MESSAGES_CODE = 100; private static final int SEND_MESSAGES_CODE = 101; - private final TcpConnectionHandler connection; + private final InternalTcpClient tcpClient; - public MessagesTcpClient(TcpConnectionHandler connection) { - this.connection = connection; + public MessagesTcpClient(InternalTcpClient tcpClient) { + this.tcpClient = tcpClient; } @Override @@ -33,7 +33,7 @@ public PolledMessages pollMessages(StreamId streamId, TopicId topicId, Optional< payload.writeIntLE(count.intValue()); payload.writeByte(autoCommit ? 1 : 0); - var response = connection.send(POLL_MESSAGES_CODE, payload); + var response = tcpClient.send(POLL_MESSAGES_CODE, payload); return BytesDeserializer.readPolledMessages(response); } @@ -47,6 +47,6 @@ public void sendMessages(StreamId streamId, TopicId topicId, Partitioning partit payload.writeBytes(toBytes(message)); } - connection.send(SEND_MESSAGES_CODE, payload); + tcpClient.send(SEND_MESSAGES_CODE, payload); } } diff --git a/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/PartitionsTcpClient.java b/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/PartitionsTcpClient.java index 7b17d3e..44bb9f3 100644 --- a/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/PartitionsTcpClient.java +++ b/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/PartitionsTcpClient.java @@ -9,10 +9,10 @@ class PartitionsTcpClient implements PartitionsClient { private static final int CREATE_PARTITION_CODE = 402; private static final int DELETE_PARTITION_CODE = 403; - private final TcpConnectionHandler connection; + private final InternalTcpClient tcpClient; - PartitionsTcpClient(TcpConnectionHandler connection) { - this.connection = connection; + PartitionsTcpClient(InternalTcpClient tcpClient) { + this.tcpClient = tcpClient; } @Override @@ -20,7 +20,7 @@ public void createPartitions(StreamId streamId, TopicId topicId, Long partitions var payload = toBytes(streamId); payload.writeBytes(toBytes(topicId)); payload.writeIntLE(partitionsCount.intValue()); - connection.send(CREATE_PARTITION_CODE, payload); + tcpClient.send(CREATE_PARTITION_CODE, payload); } @Override @@ -28,6 +28,6 @@ public void deletePartitions(StreamId streamId, TopicId topicId, Long partitions var payload = toBytes(streamId); payload.writeBytes(toBytes(topicId)); payload.writeIntLE(partitionsCount.intValue()); - connection.send(DELETE_PARTITION_CODE, payload); + tcpClient.send(DELETE_PARTITION_CODE, payload); } } diff --git a/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/PersonalAccessTokensTcpClient.java b/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/PersonalAccessTokensTcpClient.java index ea587d0..c482df0 100644 --- a/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/PersonalAccessTokensTcpClient.java +++ b/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/PersonalAccessTokensTcpClient.java @@ -21,10 +21,10 @@ class PersonalAccessTokensTcpClient implements PersonalAccessTokensClient { private static final int DELETE_PERSONAL_ACCESS_TOKEN_CODE = 43; private static final int LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE = 44; - private final TcpConnectionHandler connection; + private final InternalTcpClient tcpClient; - public PersonalAccessTokensTcpClient(TcpConnectionHandler connection) { - this.connection = connection; + public PersonalAccessTokensTcpClient(InternalTcpClient tcpClient) { + this.tcpClient = tcpClient; } @Override @@ -32,13 +32,13 @@ public RawPersonalAccessToken createPersonalAccessToken(String name, BigInteger var payload = Unpooled.buffer(); payload.writeBytes(nameToBytes(name)); payload.writeBytes(toBytesAsU64(expiry)); - var response = connection.send(CREATE_PERSONAL_ACCESS_TOKEN_CODE, payload); + var response = tcpClient.send(CREATE_PERSONAL_ACCESS_TOKEN_CODE, payload); return readRawPersonalAccessToken(response); } @Override public List getPersonalAccessTokens() { - var response = connection.send(GET_PERSONAL_ACCESS_TOKENS_CODE); + var response = tcpClient.send(GET_PERSONAL_ACCESS_TOKENS_CODE); var tokens = new ArrayList(); while (response.isReadable()) { tokens.add(readPersonalAccessTokenInfo(response)); @@ -49,13 +49,13 @@ public List getPersonalAccessTokens() { @Override public void deletePersonalAccessToken(String name) { var payload = nameToBytes(name); - connection.send(DELETE_PERSONAL_ACCESS_TOKEN_CODE, payload); + tcpClient.send(DELETE_PERSONAL_ACCESS_TOKEN_CODE, payload); } @Override public IdentityInfo loginWithPersonalAccessToken(String token) { var payload = nameToBytes(token); - var response = connection.send(LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE, payload); + var response = tcpClient.send(LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE, payload); var userId = response.readUnsignedIntLE(); return new IdentityInfo(userId, Optional.empty()); } diff --git a/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/StreamsTcpClient.java b/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/StreamsTcpClient.java index 0425a34..1527893 100644 --- a/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/StreamsTcpClient.java +++ b/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/StreamsTcpClient.java @@ -21,10 +21,10 @@ class StreamsTcpClient implements StreamsClient { private static final int CREATE_STREAM_CODE = 202; private static final int DELETE_STREAM_CODE = 203; private static final int UPDATE_STREAM_CODE = 204; - private final TcpConnectionHandler connection; + private final InternalTcpClient tcpClient; - StreamsTcpClient(TcpConnectionHandler connection) { - this.connection = connection; + StreamsTcpClient(InternalTcpClient tcpClient) { + this.tcpClient = tcpClient; } @Override @@ -34,14 +34,14 @@ public StreamDetails createStream(Optional streamId, String name) { payload.writeIntLE(streamId.orElse(0L).intValue()); payload.writeBytes(nameToBytes(name)); - var response = connection.send(CREATE_STREAM_CODE, payload); + var response = tcpClient.send(CREATE_STREAM_CODE, payload); return readStreamDetails(response); } @Override public Optional getStream(StreamId streamId) { var payload = toBytes(streamId); - var response = connection.send(GET_STREAM_CODE, payload); + var response = tcpClient.send(GET_STREAM_CODE, payload); if (response.isReadable()) { return Optional.of(readStreamDetails(response)); } @@ -50,7 +50,7 @@ public Optional getStream(StreamId streamId) { @Override public List getStreams() { - ByteBuf response = connection.send(GET_STREAMS_CODE); + ByteBuf response = tcpClient.send(GET_STREAMS_CODE); List streams = new ArrayList<>(); while (response.isReadable()) { streams.add(readStreamBase(response)); @@ -66,13 +66,13 @@ public void updateStream(StreamId streamId, String name) { payload.writeBytes(idBytes); payload.writeBytes(nameToBytes(name)); - connection.send(UPDATE_STREAM_CODE, payload); + tcpClient.send(UPDATE_STREAM_CODE, payload); } @Override public void deleteStream(StreamId streamId) { var payload = toBytes(streamId); - connection.send(DELETE_STREAM_CODE, payload); + tcpClient.send(DELETE_STREAM_CODE, payload); } } diff --git a/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/SystemTcpClient.java b/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/SystemTcpClient.java index ca6f01f..bf18b18 100644 --- a/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/SystemTcpClient.java +++ b/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/SystemTcpClient.java @@ -16,21 +16,21 @@ class SystemTcpClient implements SystemClient { private static final int GET_CLIENT_CODE = 21; private static final int GET_CLIENTS_CODE = 22; - private final TcpConnectionHandler connection; + private final InternalTcpClient tcpClient; - SystemTcpClient(TcpConnectionHandler connection) { - this.connection = connection; + SystemTcpClient(InternalTcpClient tcpClient) { + this.tcpClient = tcpClient; } @Override public Stats getStats() { - var response = connection.send(GET_STATS_CODE); + var response = tcpClient.send(GET_STATS_CODE); return readStats(response); } @Override public ClientInfoDetails getMe() { - var response = connection.send(GET_ME_CODE); + var response = tcpClient.send(GET_ME_CODE); return readClientInfoDetails(response); } @@ -38,13 +38,13 @@ public ClientInfoDetails getMe() { public ClientInfoDetails getClient(Long clientId) { var payload = Unpooled.buffer(4); payload.writeIntLE(clientId.intValue()); - var response = connection.send(GET_CLIENT_CODE, payload); + var response = tcpClient.send(GET_CLIENT_CODE, payload); return readClientInfoDetails(response); } @Override public List getClients() { - var response = connection.send(GET_CLIENTS_CODE); + var response = tcpClient.send(GET_CLIENTS_CODE); List clients = new ArrayList<>(); while (response.isReadable()) { clients.add(readClientInfo(response)); @@ -54,7 +54,7 @@ public List getClients() { @Override public String ping() { - connection.send(PING_CODE); + tcpClient.send(PING_CODE); return ""; } } diff --git a/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/TopicsTcpClient.java b/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/TopicsTcpClient.java index a471663..849d1ec 100644 --- a/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/TopicsTcpClient.java +++ b/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/TopicsTcpClient.java @@ -23,17 +23,17 @@ class TopicsTcpClient implements TopicsClient { private static final int DELETE_TOPIC_CODE = 303; private static final int UPDATE_TOPIC_CODE = 304; private static final int PURGE_TOPIC_CODE = 305; - private final TcpConnectionHandler connection; + private final InternalTcpClient tcpClient; - TopicsTcpClient(TcpConnectionHandler connection) { - this.connection = connection; + TopicsTcpClient(InternalTcpClient tcpClient) { + this.tcpClient = tcpClient; } @Override public Optional getTopic(StreamId streamId, TopicId topicId) { var payload = toBytes(streamId); payload.writeBytes(toBytes(topicId)); - var response = connection.send(GET_TOPIC_CODE, payload); + var response = tcpClient.send(GET_TOPIC_CODE, payload); if (response.isReadable()) { return Optional.of(readTopicDetails(response)); } @@ -43,7 +43,7 @@ public Optional getTopic(StreamId streamId, TopicId topicId) { @Override public List getTopics(StreamId streamId) { var payload = toBytes(streamId); - var response = connection.send(GET_TOPICS_CODE, payload); + var response = tcpClient.send(GET_TOPICS_CODE, payload); List topics = new ArrayList<>(); while (response.isReadable()) { topics.add(readTopic(response)); @@ -65,7 +65,7 @@ public TopicDetails createTopic(StreamId streamId, Optional topicId, Long payload.writeByte(replicationFactor.orElse((short) 0)); payload.writeBytes(nameToBytes(name)); - var response = connection.send(CREATE_TOPIC_CODE, payload); + var response = tcpClient.send(CREATE_TOPIC_CODE, payload); return readTopicDetails(response); } @@ -86,13 +86,13 @@ public void updateTopic(StreamId streamId, payload.writeByte(replicationFactor.orElse((short) 0)); payload.writeBytes(nameToBytes(name)); - connection.send(UPDATE_TOPIC_CODE, payload); + tcpClient.send(UPDATE_TOPIC_CODE, payload); } @Override public void deleteTopic(StreamId streamId, TopicId topicId) { var payload = toBytes(streamId); payload.writeBytes(toBytes(topicId)); - connection.send(DELETE_TOPIC_CODE, payload); + tcpClient.send(DELETE_TOPIC_CODE, payload); } } diff --git a/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/UsersTcpClient.java b/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/UsersTcpClient.java index 177fc53..97c23f8 100644 --- a/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/UsersTcpClient.java +++ b/java-sdk/src/main/java/rs/iggy/clients/blocking/tcp/UsersTcpClient.java @@ -23,16 +23,16 @@ class UsersTcpClient implements UsersClient { private static final int LOGIN_USER_CODE = 38; private static final int LOGOUT_USER_CODE = 39; - private final TcpConnectionHandler connection; + private final InternalTcpClient tcpClient; - UsersTcpClient(TcpConnectionHandler connection) { - this.connection = connection; + UsersTcpClient(InternalTcpClient tcpClient) { + this.tcpClient = tcpClient; } @Override public Optional getUser(UserId userId) { var payload = toBytes(userId); - var response = connection.send(GET_USER_CODE, payload); + var response = tcpClient.send(GET_USER_CODE, payload); if (response.isReadable()) { return Optional.of(readUserInfoDetails(response)); } @@ -41,7 +41,7 @@ public Optional getUser(UserId userId) { @Override public List getUsers() { - var response = connection.send(GET_USERS_CODE); + var response = tcpClient.send(GET_USERS_CODE); List users = new ArrayList<>(); while (response.isReadable()) { users.add(BytesDeserializer.readUserInfo(response)); @@ -62,14 +62,14 @@ public UserInfoDetails createUser(String username, String password, UserStatus s payload.writeBytes(permissionBytes); }, () -> payload.writeByte(0)); - var response = connection.send(CREATE_USER_CODE, payload); + var response = tcpClient.send(CREATE_USER_CODE, payload); return readUserInfoDetails(response); } @Override public void deleteUser(UserId userId) { var payload = toBytes(userId); - connection.send(DELETE_USER_CODE, payload); + tcpClient.send(DELETE_USER_CODE, payload); } @Override @@ -84,7 +84,7 @@ public void updateUser(UserId userId, Optional usernameOptional, Optiona payload.writeByte(status.asCode()); }, () -> payload.writeByte(0)); - connection.send(UPDATE_USER_CODE, payload); + tcpClient.send(UPDATE_USER_CODE, payload); } @Override @@ -98,7 +98,7 @@ public void updatePermissions(UserId userId, Optional permissionsOp payload.writeBytes(permissionBytes); }, () -> payload.writeByte(0)); - connection.send(UPDATE_PERMISSIONS_CODE, payload); + tcpClient.send(UPDATE_PERMISSIONS_CODE, payload); } @Override @@ -107,7 +107,7 @@ public void changePassword(UserId userId, String currentPassword, String newPass payload.writeBytes(nameToBytes(currentPassword)); payload.writeBytes(nameToBytes(newPassword)); - connection.send(CHANGE_PASSWORD_CODE, payload); + tcpClient.send(CHANGE_PASSWORD_CODE, payload); } @Override @@ -124,7 +124,7 @@ public IdentityInfo login(String username, String password) { payload.writeIntLE(context.length()); payload.writeBytes(context.getBytes()); - var response = connection.send(LOGIN_USER_CODE, payload); + var response = tcpClient.send(LOGIN_USER_CODE, payload); var userId = response.readUnsignedIntLE(); return new IdentityInfo(userId, Optional.empty()); @@ -132,6 +132,6 @@ public IdentityInfo login(String username, String password) { @Override public void logout() { - connection.send(LOGOUT_USER_CODE); + tcpClient.send(LOGOUT_USER_CODE); } }