diff --git a/README.md b/README.md
index 546aa2ad..f7cfcb82 100644
--- a/README.md
+++ b/README.md
@@ -32,7 +32,7 @@ between process cache and Redis.
> Kotlin DSL
``` kotlin
- val coskyVersion = "1.2.5";
+ val coskyVersion = "1.2.7";
implementation("me.ahoo.cosky:spring-cloud-starter-cosky-config:${coskyVersion}")
implementation("me.ahoo.cosky:spring-cloud-starter-cosky-discovery:${coskyVersion}")
implementation("org.springframework.cloud:spring-cloud-starter-loadbalancer:3.0.3")
@@ -50,7 +50,7 @@ between process cache and Redis.
4.0.0
demo
- 1.2.5
+ 1.2.7
@@ -99,21 +99,21 @@ logging:
#### Option 1:Download the executable file
-> Download [cosky-rest-api-server](https://github.com/Ahoo-Wang/cosky/releases/download/1.2.5/cosky-rest-api-1.2.5.tar)
+> Download [cosky-rest-api-server](https://github.com/Ahoo-Wang/cosky/releases/download/1.2.7/cosky-rest-api-1.2.7.tar)
-> tar *cosky-rest-api-1.2.5.tar*
+> tar *cosky-rest-api-1.2.7.tar*
```shell
-cd cosky-rest-api-1.2.5
-# Working directory: cosky-rest-api-1.2.5
+cd cosky-rest-api-1.2.7
+# Working directory: cosky-rest-api-1.2.7
bin/cosky-rest-api --server.port=8080 --cosky.redis.uri=redis://localhost:6379
```
#### Option 2:Run On Docker
```shell
-docker pull ahoowang/cosky-rest-api:1.2.5
-docker run --name cosky-rest-api -d -p 8080:8080 --link redis -e COSKY_REDIS_URI=redis://redis:6379 ahoowang/cosky-rest-api:1.2.5
+docker pull ahoowang/cosky-rest-api:1.2.7
+docker run --name cosky-rest-api -d -p 8080:8080 --link redis -e COSKY_REDIS_URI=redis://redis:6379 ahoowang/cosky-rest-api:1.2.7
```
#### Option 3:Run On Kubernetes
@@ -141,7 +141,7 @@ spec:
value: standalone
- name: COSKY_REDIS_URI
value: redis://redis-uri:6379
- image: ahoowang/cosky-rest-api:1.2.5
+ image: ahoowang/cosky-rest-api:1.2.7
name: cosky-rest-api
ports:
- containerPort: 8080
@@ -306,7 +306,7 @@ spec:
``` shell
gradle cosky-config:jmh
# or
-java -jar cosky-config/build/libs/cosky-config-1.2.5-jmh.jar -bm thrpt -t 25 -wi 1 -rf json -f 1
+java -jar cosky-config/build/libs/cosky-config-1.2.7-jmh.jar -bm thrpt -t 25 -wi 1 -rf json -f 1
```
```
@@ -321,7 +321,7 @@ RedisConfigServiceBenchmark.setConfig thrpt 140461.112
``` shell
gradle cosky-discovery:jmh
# or
-java -jar cosky-discovery/build/libs/cosky-discovery-1.2.5-jmh.jar -bm thrpt -t 25 -wi 1 -rf json -f 1
+java -jar cosky-discovery/build/libs/cosky-discovery-1.2.7-jmh.jar -bm thrpt -t 25 -wi 1 -rf json -f 1
```
```
diff --git a/README.zh-CN.md b/README.zh-CN.md
index fa8928e5..55cfa29b 100644
--- a/README.zh-CN.md
+++ b/README.zh-CN.md
@@ -31,7 +31,7 @@
> Kotlin DSL
``` kotlin
- val coskyVersion = "1.2.5";
+ val coskyVersion = "1.2.7";
implementation("me.ahoo.cosky:spring-cloud-starter-cosky-config:${coskyVersion}")
implementation("me.ahoo.cosky:spring-cloud-starter-cosky-discovery:${coskyVersion}")
implementation("org.springframework.cloud:spring-cloud-starter-loadbalancer:3.0.3")
@@ -49,7 +49,7 @@
4.0.0
demo
- 1.2.5
+ 1.2.7
@@ -98,21 +98,21 @@ logging:
#### 方式一:下载可执行文件
-> 下载 [rest-api-server](https://github.com/Ahoo-Wang/cosky/releases/download/1.2.5/cosky-rest-api-1.2.5.tar)
+> 下载 [rest-api-server](https://github.com/Ahoo-Wang/cosky/releases/download/1.2.7/cosky-rest-api-1.2.7.tar)
-> 解压 *cosky-rest-api-1.2.5.tar*
+> 解压 *cosky-rest-api-1.2.7.tar*
```shell
-cd cosky-rest-api-1.2.5
-# 工作目录: cosky-rest-api-1.2.5
+cd cosky-rest-api-1.2.7
+# 工作目录: cosky-rest-api-1.2.7
bin/cosky-rest-api --server.port=8080 --cosky.redis.uri=redis://localhost:6379
```
#### 方式二:在 Docker 中运行
```shell
-docker pull ahoowang/cosky-rest-api:1.2.5
-docker run --name cosky-rest-api -d -p 8080:8080 --link redis -e COSKY_REDIS_URI=redis://redis:6379 ahoowang/cosky-rest-api:1.2.5
+docker pull ahoowang/cosky-rest-api:1.2.7
+docker run --name cosky-rest-api -d -p 8080:8080 --link redis -e COSKY_REDIS_URI=redis://redis:6379 ahoowang/cosky-rest-api:1.2.7
```
#### 方式三:在 Kubernetes 中运行
@@ -140,7 +140,7 @@ spec:
value: standalone
- name: COSKY_REDIS_URI
value: redis://redis-uri:6379
- image: ahoowang/cosky-rest-api:1.2.5
+ image: ahoowang/cosky-rest-api:1.2.7
name: cosky-rest-api
ports:
- containerPort: 8080
@@ -306,7 +306,7 @@ spec:
``` shell
gradle cosky-config:jmh
# or
-java -jar cosky-config/build/libs/cosky-config-1.2.5-jmh.jar -bm thrpt -t 25 -wi 1 -rf json -f 1
+java -jar cosky-config/build/libs/cosky-config-1.2.7-jmh.jar -bm thrpt -t 25 -wi 1 -rf json -f 1
```
```
@@ -321,7 +321,7 @@ RedisConfigServiceBenchmark.setConfig thrpt 140461.112
``` shell
gradle cosky-discovery:jmh
# or
-java -jar cosky-discovery/build/libs/cosky-discovery-1.2.5-jmh.jar -bm thrpt -t 25 -wi 1 -rf json -f 1
+java -jar cosky-discovery/build/libs/cosky-discovery-1.2.7-jmh.jar -bm thrpt -t 25 -wi 1 -rf json -f 1
```
```
diff --git a/cosky-config/src/main/java/me/ahoo/cosky/config/redis/ConsistencyRedisConfigService.java b/cosky-config/src/main/java/me/ahoo/cosky/config/redis/ConsistencyRedisConfigService.java
index 78975ab2..54065374 100644
--- a/cosky-config/src/main/java/me/ahoo/cosky/config/redis/ConsistencyRedisConfigService.java
+++ b/cosky-config/src/main/java/me/ahoo/cosky/config/redis/ConsistencyRedisConfigService.java
@@ -13,8 +13,9 @@
package me.ahoo.cosky.config.redis;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
-import lombok.var;
import me.ahoo.cosky.config.*;
import me.ahoo.cosky.core.NamespacedContext;
import me.ahoo.cosky.core.listener.ChannelTopic;
@@ -61,18 +62,21 @@ public CompletableFuture> getConfigs(String namespace) {
@Override
public CompletableFuture getConfig(String configId) {
- return getConfig(NamespacedContext.GLOBAL.getNamespace(), configId);
+ return getConfig(NamespacedContext.GLOBAL.getRequiredNamespace(), configId);
}
@Override
public CompletableFuture getConfig(String namespace, String configId) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(namespace), "namespace can not be empty!");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(configId), "configId can not be empty!");
+
return configMap.computeIfAbsent(NamespacedConfigId.of(namespace, configId), (_configId) -> addListener(namespace, configId).
thenCompose(nil -> delegate.getConfig(namespace, configId)));
}
private CompletableFuture addListener(String namespace, String configId) {
- var topicStr = ConfigKeyGenerator.getConfigKey(namespace, configId);
- var configTopic = ChannelTopic.of(topicStr);
+ String topicStr = ConfigKeyGenerator.getConfigKey(namespace, configId);
+ ChannelTopic configTopic = ChannelTopic.of(topicStr);
return messageListenable.addListener(configTopic, configListener);
}
@@ -171,10 +175,10 @@ public void onMessage(Topic topic, String channel, String message) {
log.info("onMessage@ConfigListener - topic:[{}] - channel:[{}] - message:[{}]", topic, channel, message);
}
- final var configkey = channel;
- var namespacedConfigId = ConfigKeyGenerator.getConfigIdOfKey(configkey);
+ final String configkey = channel;
+ NamespacedConfigId namespacedConfigId = ConfigKeyGenerator.getConfigIdOfKey(configkey);
configMap.put(namespacedConfigId, delegate.getConfig(namespacedConfigId.getNamespace(), namespacedConfigId.getConfigId()));
- var configChangedListeners = configMapListener.get(namespacedConfigId);
+ CopyOnWriteArraySet configChangedListeners = configMapListener.get(namespacedConfigId);
if (Objects.isNull(configChangedListeners) || configChangedListeners.isEmpty()) {
return;
}
diff --git a/cosky-config/src/main/java/me/ahoo/cosky/config/redis/RedisConfigService.java b/cosky-config/src/main/java/me/ahoo/cosky/config/redis/RedisConfigService.java
index d03aa8ce..f65c1b35 100644
--- a/cosky-config/src/main/java/me/ahoo/cosky/config/redis/RedisConfigService.java
+++ b/cosky-config/src/main/java/me/ahoo/cosky/config/redis/RedisConfigService.java
@@ -15,11 +15,12 @@
import com.google.common.base.Charsets;
import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.common.hash.Hashing;
import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
import lombok.extern.slf4j.Slf4j;
-import lombok.var;
import me.ahoo.cosky.config.*;
import me.ahoo.cosky.core.NamespacedContext;
@@ -43,15 +44,17 @@ public RedisConfigService(RedisClusterAsyncCommands redisCommand
@Override
public CompletableFuture> getConfigs() {
- return getConfigs(NamespacedContext.GLOBAL.getNamespace());
+ return getConfigs(NamespacedContext.GLOBAL.getRequiredNamespace());
}
@Override
public CompletableFuture> getConfigs(String namespace) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(namespace), "namespace can not be empty!");
+
if (log.isDebugEnabled()) {
log.debug("getConfigs @ namespace:[{}].", namespace);
}
- var configIdxKey = ConfigKeyGenerator.getConfigIdxKey(namespace);
+ String configIdxKey = ConfigKeyGenerator.getConfigIdxKey(namespace);
return redisCommands.smembers(configIdxKey).thenApply(configKeySet ->
configKeySet.stream()
.map(configKey -> ConfigKeyGenerator.getConfigIdOfKey(configKey).getConfigId()
@@ -60,15 +63,18 @@ public CompletableFuture> getConfigs(String namespace) {
@Override
public CompletableFuture getConfig(String configId) {
- return getConfig(NamespacedContext.GLOBAL.getNamespace(), configId);
+ return getConfig(NamespacedContext.GLOBAL.getRequiredNamespace(), configId);
}
@Override
public CompletableFuture getConfig(String namespace, String configId) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(namespace), "namespace can not be empty!");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(configId), "configId can not be empty!");
+
if (log.isDebugEnabled()) {
log.debug("getConfig - configId:[{}] @ namespace:[{}].", configId, namespace);
}
- var configKey = ConfigKeyGenerator.getConfigKey(namespace, configId);
+ String configKey = ConfigKeyGenerator.getConfigKey(namespace, configId);
return getAndDecodeConfig(configKey, ConfigCodec::decode);
}
@@ -79,11 +85,14 @@ public CompletableFuture getConfig(String namespace, String configId) {
*/
@Override
public CompletableFuture setConfig(String configId, String data) {
- return setConfig(NamespacedContext.GLOBAL.getNamespace(), configId, data);
+ return setConfig(NamespacedContext.GLOBAL.getRequiredNamespace(), configId, data);
}
@Override
public CompletableFuture setConfig(String namespace, String configId, String data) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(namespace), "namespace can not be empty!");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(configId), "configId can not be empty!");
+
String hash = Hashing.sha256().hashString(data, Charsets.UTF_8).toString();
if (log.isInfoEnabled()) {
log.info("setConfig - configId:[{}] - hash:[{}] @ namespace:[{}].", configId, hash, namespace);
@@ -97,14 +106,18 @@ public CompletableFuture setConfig(String namespace, String configId, S
@Override
public CompletableFuture removeConfig(String configId) {
- return removeConfig(NamespacedContext.GLOBAL.getNamespace(), configId);
+ return removeConfig(NamespacedContext.GLOBAL.getRequiredNamespace(), configId);
}
@Override
public CompletableFuture removeConfig(String namespace, String configId) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(namespace), "namespace can not be empty!");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(configId), "configId can not be empty!");
+
if (log.isInfoEnabled()) {
log.info("removeConfig - configId:[{}] @ namespace:[{}].", configId, namespace);
}
+
return ConfigRedisScripts.doConfigRemove(redisCommands, sha -> {
String[] keys = {namespace};
String[] values = {configId};
@@ -114,17 +127,23 @@ public CompletableFuture removeConfig(String namespace, String configId
@Override
public CompletableFuture containsConfig(String namespace, String configId) {
- var configKey = ConfigKeyGenerator.getConfigKey(namespace, configId);
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(namespace), "namespace can not be empty!");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(configId), "configId can not be empty!");
+
+ String configKey = ConfigKeyGenerator.getConfigKey(namespace, configId);
return redisCommands.exists(configKey).thenApply(count -> count > 0).toCompletableFuture();
}
@Override
public CompletableFuture rollback(String configId, int targetVersion) {
- return rollback(NamespacedContext.GLOBAL.getNamespace(), configId, targetVersion);
+ return rollback(NamespacedContext.GLOBAL.getRequiredNamespace(), configId, targetVersion);
}
@Override
public CompletableFuture rollback(String namespace, String configId, int targetVersion) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(namespace), "namespace can not be empty!");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(configId), "configId can not be empty!");
+
if (log.isInfoEnabled()) {
log.info("rollback - configId:[{}] - targetVersion:[{}] @ namespace:[{}].", configId, targetVersion, namespace);
}
@@ -139,12 +158,15 @@ public CompletableFuture rollback(String namespace, String configId, in
@Override
public CompletableFuture> getConfigVersions(String configId) {
- return getConfigVersions(NamespacedContext.GLOBAL.getNamespace(), configId);
+ return getConfigVersions(NamespacedContext.GLOBAL.getRequiredNamespace(), configId);
}
@Override
public CompletableFuture> getConfigVersions(String namespace, String configId) {
- var configHistoryIdxKey = ConfigKeyGenerator.getConfigHistoryIdxKey(namespace, configId);
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(namespace), "namespace can not be empty!");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(configId), "configId can not be empty!");
+
+ String configHistoryIdxKey = ConfigKeyGenerator.getConfigHistoryIdxKey(namespace, configId);
return redisCommands.zrevrange(configHistoryIdxKey, 0, HISTORY_STOP)
.thenApply(configHistoryKeyList ->
configHistoryKeyList.stream()
@@ -156,12 +178,15 @@ public CompletableFuture> getConfigVersions(String namespace
@Override
public CompletableFuture getConfigHistory(String configId, int version) {
- return getConfigHistory(NamespacedContext.GLOBAL.getNamespace(), configId, version);
+ return getConfigHistory(NamespacedContext.GLOBAL.getRequiredNamespace(), configId, version);
}
@Override
public CompletableFuture getConfigHistory(String namespace, String configId, int version) {
- var configHistoryKey = ConfigKeyGenerator.getConfigHistoryKey(namespace, configId, version);
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(namespace), "namespace can not be empty!");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(configId), "configId can not be empty!");
+
+ String configHistoryKey = ConfigKeyGenerator.getConfigHistoryKey(namespace, configId, version);
return getAndDecodeConfig(configHistoryKey, ConfigCodec::decodeHistory);
}
diff --git a/cosky-core/src/main/java/me/ahoo/cosky/core/Namespaced.java b/cosky-core/src/main/java/me/ahoo/cosky/core/Namespaced.java
index a12620a0..c15fe6dd 100644
--- a/cosky-core/src/main/java/me/ahoo/cosky/core/Namespaced.java
+++ b/cosky-core/src/main/java/me/ahoo/cosky/core/Namespaced.java
@@ -26,4 +26,5 @@ public interface Namespaced {
* @return
*/
String getNamespace();
+
}
diff --git a/cosky-core/src/main/java/me/ahoo/cosky/core/NamespacedContext.java b/cosky-core/src/main/java/me/ahoo/cosky/core/NamespacedContext.java
index 0076e2ea..7dd82001 100644
--- a/cosky-core/src/main/java/me/ahoo/cosky/core/NamespacedContext.java
+++ b/cosky-core/src/main/java/me/ahoo/cosky/core/NamespacedContext.java
@@ -13,6 +13,8 @@
package me.ahoo.cosky.core;
+import com.google.common.base.Strings;
+
/**
* @author ahoo wang
*/
@@ -30,6 +32,14 @@ public interface NamespacedContext extends Namespaced {
*/
void setCurrentContextNamespace(String namespace);
+ default String getRequiredNamespace() {
+ final String namespace = getNamespace();
+ if (Strings.isNullOrEmpty(namespace)) {
+ throw new CoskyException("namespace can not be empty!");
+ }
+ return namespace;
+ }
+
static NamespacedContext of(String namespace) {
return new Default(namespace);
}
diff --git a/cosky-core/src/main/java/me/ahoo/cosky/core/redis/RedisConnectionFactory.java b/cosky-core/src/main/java/me/ahoo/cosky/core/redis/RedisConnectionFactory.java
index 44fa9bf9..dad2022a 100644
--- a/cosky-core/src/main/java/me/ahoo/cosky/core/redis/RedisConnectionFactory.java
+++ b/cosky-core/src/main/java/me/ahoo/cosky/core/redis/RedisConnectionFactory.java
@@ -17,7 +17,9 @@
import io.lettuce.core.ReadFrom;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
+import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.cluster.RedisClusterClient;
+import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
import io.lettuce.core.cluster.api.sync.RedisClusterCommands;
import io.lettuce.core.codec.StringCodec;
@@ -25,7 +27,6 @@
import io.lettuce.core.masterreplica.StatefulRedisMasterReplicaConnection;
import io.lettuce.core.resource.ClientResources;
import lombok.extern.slf4j.Slf4j;
-import lombok.var;
import me.ahoo.cosky.core.listener.MessageListenable;
import me.ahoo.cosky.core.listener.RedisClusterMessageListenable;
import me.ahoo.cosky.core.listener.RedisMessageListenable;
@@ -42,13 +43,19 @@ public class RedisConnectionFactory implements AutoCloseable {
private final RedisConfig redisConfig;
private final AbstractRedisClient client;
private RedisConnection shareConnection;
+ private boolean isCluster;
public RedisConnectionFactory(ClientResources clientResources, RedisConfig redisConfig) {
this.clientResources = clientResources;
this.redisConfig = redisConfig;
+ this.isCluster = RedisConfig.RedisMode.CLUSTER.equals(redisConfig.getMode());
this.client = createClient();
}
+ public boolean isCluster() {
+ return isCluster;
+ }
+
private AbstractRedisClient createClient() {
if (RedisConfig.RedisMode.CLUSTER.equals(redisConfig.getMode())) {
return RedisClusterClient.create(clientResources, redisConfig.getUrl());
@@ -80,14 +87,14 @@ public RedisClusterCommands getShareSyncCommands() {
public RedisConnection getConnection() {
if (client instanceof RedisClusterClient) {
- var clusterConnection = ((RedisClusterClient) client).connect();
+ StatefulRedisClusterConnection clusterConnection = ((RedisClusterClient) client).connect();
return new RedisConnection(clusterConnection, clusterConnection.sync(), clusterConnection.async());
}
- var redisClient = (RedisClient) client;
+ RedisClient redisClient = (RedisClient) client;
if (Objects.isNull(redisConfig.getReadFrom())) {
- var connection = redisClient.connect();
+ StatefulRedisConnection connection = redisClient.connect();
return new RedisConnection(connection, connection.sync(), connection.async());
}
diff --git a/cosky-core/src/main/java/me/ahoo/cosky/core/redis/RedisNamespaceService.java b/cosky-core/src/main/java/me/ahoo/cosky/core/redis/RedisNamespaceService.java
index 0202cdd9..7287d393 100644
--- a/cosky-core/src/main/java/me/ahoo/cosky/core/redis/RedisNamespaceService.java
+++ b/cosky-core/src/main/java/me/ahoo/cosky/core/redis/RedisNamespaceService.java
@@ -13,6 +13,8 @@
package me.ahoo.cosky.core.redis;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
import me.ahoo.cosky.core.NamespaceService;
import me.ahoo.cosky.core.Namespaced;
@@ -40,6 +42,8 @@ public CompletableFuture> getNamespaces() {
@Override
public CompletableFuture setNamespace(String namespace) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(namespace), "namespace can not be empty!");
+
return redisCommands.sadd(NAMESPACE_IDX_KEY, namespace)
.thenApply(affected -> affected > 0)
.toCompletableFuture();
@@ -47,6 +51,8 @@ public CompletableFuture setNamespace(String namespace) {
@Override
public CompletableFuture removeNamespace(String namespace) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(namespace), "namespace can not be empty!");
+
return redisCommands.srem(NAMESPACE_IDX_KEY, namespace).thenApply(affected -> affected > 0).toCompletableFuture();
}
diff --git a/cosky-core/src/main/java/me/ahoo/cosky/core/util/RedisKeys.java b/cosky-core/src/main/java/me/ahoo/cosky/core/util/RedisKeys.java
index e4155868..31558b6f 100644
--- a/cosky-core/src/main/java/me/ahoo/cosky/core/util/RedisKeys.java
+++ b/cosky-core/src/main/java/me/ahoo/cosky/core/util/RedisKeys.java
@@ -13,6 +13,7 @@
package me.ahoo.cosky.core.util;
+import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
/**
@@ -24,10 +25,11 @@ public final class RedisKeys {
public static final String WRAP_RIGHT = "}";
private RedisKeys() {
+
}
public static boolean isCluster(RedisClusterAsyncCommands asyncCommands) {
- return true;
+ return asyncCommands instanceof RedisAdvancedClusterAsyncCommands;
}
public static String ofKey(RedisClusterAsyncCommands asyncCommands, String key) {
@@ -41,8 +43,25 @@ public static String ofKey(boolean isCluster, String key) {
return hashTag(key);
}
+ /**
+ * The first '{' index and the first '{' after '}' key
+ *
+ * {system} -> system
+ *
+ * {{system} -> {system
+ *
+ * {{system}} -> {system
+ *
+ * @param key redis key
+ * @return If the key meets the hashtag specification, return true
+ */
public static boolean hasWrap(String key) {
- return key.startsWith(WRAP_LEFT) && key.endsWith(WRAP_RIGHT);
+ int leftIndex = key.indexOf(WRAP_LEFT);
+ if (leftIndex == -1) {
+ return false;
+ }
+ int rightIdx = key.substring(leftIndex).indexOf(WRAP_RIGHT);
+ return rightIdx > -1;
}
public static String wrap(String key) {
@@ -53,7 +72,10 @@ public static String unwrap(String key) {
if (!hasWrap(key)) {
return key;
}
- return key.substring(1, key.length() - 1);
+ int leftIndex = key.indexOf(WRAP_LEFT);
+ int rightIdx = key.substring(leftIndex).indexOf(WRAP_RIGHT);
+
+ return key.substring(leftIndex + 1, leftIndex + rightIdx);
}
public static String hashTag(String key) {
diff --git a/cosky-core/src/test/java/me/ahoo/cosky/core/util/RedisKeysTest.java b/cosky-core/src/test/java/me/ahoo/cosky/core/util/RedisKeysTest.java
index 877ea10d..3021cc2b 100644
--- a/cosky-core/src/test/java/me/ahoo/cosky/core/util/RedisKeysTest.java
+++ b/cosky-core/src/test/java/me/ahoo/cosky/core/util/RedisKeysTest.java
@@ -56,12 +56,16 @@ void hasWrap() {
@Test
void wrap() {
- Assertions.assertEquals("{dev}",RedisKeys.wrap("dev"));
+ Assertions.assertEquals("{dev}", RedisKeys.wrap("dev"));
}
@Test
void unwrap() {
- Assertions.assertEquals("dev",RedisKeys.unwrap("{dev}"));
+ Assertions.assertEquals("dev", RedisKeys.unwrap("{dev}"));
+ Assertions.assertEquals("dev", RedisKeys.unwrap("cosky-{dev}"));
+ Assertions.assertEquals("{dev", RedisKeys.unwrap("cosky-{{dev}"));
+ Assertions.assertEquals("{dev", RedisKeys.unwrap("cosky-{{dev}}"));
+ Assertions.assertEquals("dev", RedisKeys.unwrap("cosky-{dev}-cosky"));
}
}
diff --git a/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/redis/ConsistencyRedisServiceDiscovery.java b/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/redis/ConsistencyRedisServiceDiscovery.java
index 72464c7a..f3901264 100644
--- a/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/redis/ConsistencyRedisServiceDiscovery.java
+++ b/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/redis/ConsistencyRedisServiceDiscovery.java
@@ -14,16 +14,18 @@
package me.ahoo.cosky.discovery.redis;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
import lombok.extern.slf4j.Slf4j;
-import lombok.var;
import me.ahoo.cosky.discovery.*;
import me.ahoo.cosky.core.NamespacedContext;
import me.ahoo.cosky.core.listener.*;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
@@ -58,9 +60,11 @@ public ConsistencyRedisServiceDiscovery(ServiceDiscovery delegate, MessageListen
@Override
public CompletableFuture> getServices(String namespace) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(namespace), "namespace can not be empty!");
+
return namespaceMapServices.computeIfAbsent(namespace, (_namespace) -> {
- var serviceIdxKey = DiscoveryKeyGenerator.getServiceIdxKey(namespace);
- var serviceIdxTopic = ChannelTopic.of(serviceIdxKey);
+ String serviceIdxKey = DiscoveryKeyGenerator.getServiceIdxKey(namespace);
+ ChannelTopic serviceIdxTopic = ChannelTopic.of(serviceIdxKey);
return messageListenable.addListener(serviceIdxTopic, serviceIdxListener)
.thenCompose(nil -> delegate.getServices(namespace));
});
@@ -68,16 +72,19 @@ public CompletableFuture> getServices(String namespace) {
@Override
public CompletableFuture> getServices() {
- return getServices(NamespacedContext.GLOBAL.getNamespace());
+ return getServices(NamespacedContext.GLOBAL.getRequiredNamespace());
}
@Override
public CompletableFuture> getInstances(String serviceId) {
- return getInstances(NamespacedContext.GLOBAL.getNamespace(), serviceId);
+ return getInstances(NamespacedContext.GLOBAL.getRequiredNamespace(), serviceId);
}
@Override
public CompletableFuture> getInstances(String namespace, String serviceId) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(namespace), "namespace can not be empty!");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(serviceId), "serviceId can not be empty!");
+
return serviceMapInstances.computeIfAbsent(NamespacedServiceId.of(namespace, serviceId), (_serviceId) ->
addListener(namespace, serviceId).
thenCompose(nil -> delegate.getInstances(namespace, serviceId)
@@ -88,9 +95,13 @@ public CompletableFuture> getInstances(String namespace, S
}
public CompletableFuture getInstance0(String namespace, String serviceId, String instanceId) {
- var namespacedServiceId = NamespacedServiceId.of(namespace, serviceId);
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(namespace), "namespace can not be empty!");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(serviceId), "serviceId can not be empty!");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(instanceId), "instanceId can not be empty!");
+
+ NamespacedServiceId namespacedServiceId = NamespacedServiceId.of(namespace, serviceId);
- var instancesFuture = serviceMapInstances.get(namespacedServiceId);
+ CompletableFuture> instancesFuture = serviceMapInstances.get(namespacedServiceId);
if (Objects.isNull(instancesFuture)) {
return CompletableFuture.completedFuture(null);
@@ -100,7 +111,7 @@ public CompletableFuture getInstance0(String namespace, String
if (Objects.isNull(serviceInstances)) {
return null;
}
- var cachedInstanceOp = serviceInstances.stream().filter(itc -> itc.getInstanceId().equals(instanceId)).findFirst();
+ Optional cachedInstanceOp = serviceInstances.stream().filter(itc -> itc.getInstanceId().equals(instanceId)).findFirst();
return cachedInstanceOp.orElse(ServiceInstance.NOT_FOUND);
});
@@ -185,8 +196,8 @@ public Future removeListener(String namespace, String serviceId) {
}
private PatternTopic getPatternTopic(String namespace, String serviceId) {
- var instancePattern = DiscoveryKeyGenerator.getInstanceKeyPatternOfService(namespace, serviceId);
- var instanceTopic = PatternTopic.of(instancePattern);
+ String instancePattern = DiscoveryKeyGenerator.getInstanceKeyPatternOfService(namespace, serviceId);
+ PatternTopic instanceTopic = PatternTopic.of(instancePattern);
return instanceTopic;
}
@@ -198,8 +209,8 @@ public void onMessage(Topic topic, String channel, String message) {
if (log.isInfoEnabled()) {
log.info("onMessage@ServiceIdxListener - topic:[{}] - channel:[{}] - message:[{}]", topic, channel, message);
}
- var serviceIdxKey = channel;
- var namespace = DiscoveryKeyGenerator.getNamespaceOfKey(serviceIdxKey);
+ String serviceIdxKey = channel;
+ String namespace = DiscoveryKeyGenerator.getNamespaceOfKey(serviceIdxKey);
namespaceMapServices.put(namespace, delegate.getServices(namespace));
}
}
@@ -213,17 +224,17 @@ public void onMessage(Topic topic, String channel, String message) {
log.info("onMessage@InstanceListener - topic:[{}] - channel:[{}] - message:[{}]", topic, channel, message);
}
- final var instanceKey = channel;
- var namespace = DiscoveryKeyGenerator.getNamespaceOfKey(instanceKey);
- var instanceId = DiscoveryKeyGenerator.getInstanceIdOfKey(namespace, instanceKey);
- var instance = InstanceIdGenerator.DEFAULT.of(instanceId);
- var serviceId = instance.getServiceId();
+ final String instanceKey = channel;
+ String namespace = DiscoveryKeyGenerator.getNamespaceOfKey(instanceKey);
+ String instanceId = DiscoveryKeyGenerator.getInstanceIdOfKey(namespace, instanceKey);
+ Instance instance = InstanceIdGenerator.DEFAULT.of(instanceId);
+ String serviceId = instance.getServiceId();
- var namespacedServiceId = NamespacedServiceId.of(namespace, serviceId);
+ NamespacedServiceId namespacedServiceId = NamespacedServiceId.of(namespace, serviceId);
AtomicReference serviceChangedEvent = new AtomicReference<>(ServiceChangedEvent.of(namespacedServiceId, message, instance));
- var serviceChangedListeners = serviceMapListener.get(namespacedServiceId);
+ CopyOnWriteArraySet serviceChangedListeners = serviceMapListener.get(namespacedServiceId);
- var instancesFuture = serviceMapInstances.get(namespacedServiceId);
+ CompletableFuture> instancesFuture = serviceMapInstances.get(namespacedServiceId);
if (Objects.isNull(instancesFuture)) {
if (log.isInfoEnabled()) {
@@ -241,7 +252,7 @@ public void onMessage(Topic topic, String channel, String message) {
return CompletableFuture.completedFuture(null);
}
- var cachedInstance = cachedInstances.stream()
+ ServiceInstance cachedInstance = cachedInstances.stream()
.filter(itc -> itc.getInstanceId().equals(instanceId))
.findFirst().orElse(ServiceInstance.NOT_FOUND);
diff --git a/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/redis/RedisServiceDiscovery.java b/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/redis/RedisServiceDiscovery.java
index f942b485..f202e594 100644
--- a/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/redis/RedisServiceDiscovery.java
+++ b/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/redis/RedisServiceDiscovery.java
@@ -13,6 +13,8 @@
package me.ahoo.cosky.discovery.redis;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
import lombok.extern.slf4j.Slf4j;
@@ -37,11 +39,14 @@ public RedisServiceDiscovery(
@Override
public CompletableFuture> getInstances(String serviceId) {
- return getInstances(NamespacedContext.GLOBAL.getNamespace(), serviceId);
+ return getInstances(NamespacedContext.GLOBAL.getRequiredNamespace(), serviceId);
}
@Override
public CompletableFuture> getInstances(String namespace, String serviceId) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(namespace), "namespace can not be empty!");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(serviceId), "serviceId can not be empty!");
+
return DiscoveryRedisScripts.doDiscoveryGetInstances(redisCommands, sha -> {
String[] keys = {namespace};
String[] values = {serviceId};
@@ -61,6 +66,10 @@ public CompletableFuture> getInstances(String namespace, S
@Override
public CompletableFuture getInstance(String namespace, String serviceId, String instanceId) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(namespace), "namespace can not be empty!");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(serviceId), "serviceId can not be empty!");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(instanceId), "instanceId can not be empty!");
+
return DiscoveryRedisScripts.doDiscoveryGetInstance(redisCommands, sha -> {
String[] keys = {namespace};
String[] values = {serviceId, instanceId};
@@ -76,6 +85,10 @@ public CompletableFuture getInstance(String namespace, String s
@Override
public CompletableFuture getInstanceTtl(String namespace, String serviceId, String instanceId) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(namespace), "namespace can not be empty!");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(serviceId), "serviceId can not be empty!");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(instanceId), "instanceId can not be empty!");
+
return DiscoveryRedisScripts.doDiscoveryGetInstanceTtl(redisCommands, sha -> {
String[] keys = {namespace};
String[] values = {serviceId, instanceId};
@@ -85,13 +98,15 @@ public CompletableFuture getInstanceTtl(String namespace, String serviceId
@Override
public CompletableFuture> getServices(String namespace) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(namespace), "namespace can not be empty!");
+
var serviceIdxKey = DiscoveryKeyGenerator.getServiceIdxKey(namespace);
return redisCommands.smembers(serviceIdxKey).toCompletableFuture();
}
@Override
public CompletableFuture> getServices() {
- return getServices(NamespacedContext.GLOBAL.getNamespace());
+ return getServices(NamespacedContext.GLOBAL.getRequiredNamespace());
}
diff --git a/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/redis/RedisServiceRegistry.java b/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/redis/RedisServiceRegistry.java
index 6633072c..b733f3fa 100644
--- a/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/redis/RedisServiceRegistry.java
+++ b/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/redis/RedisServiceRegistry.java
@@ -13,6 +13,7 @@
package me.ahoo.cosky.discovery.redis;
+import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.ScriptOutputType;
@@ -89,6 +90,9 @@ private RedisFuture register0(String namespace, String scriptSha, Servi
@Override
public CompletableFuture setService(String namespace, String serviceId) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(namespace), "namespace can not be empty!");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(serviceId), "serviceId can not be empty!");
+
if (log.isInfoEnabled()) {
log.info("setService - serviceId:[{}] @ namespace:[{}].", serviceId, namespace);
}
@@ -100,6 +104,9 @@ public CompletableFuture setService(String namespace, String serviceId)
@Override
public CompletableFuture removeService(String namespace, String serviceId) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(namespace), "namespace can not be empty!");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(serviceId), "serviceId can not be empty!");
+
if (log.isWarnEnabled()) {
log.warn("removeService - serviceId:[{}] @ namespace:[{}].", serviceId, namespace);
}
@@ -113,11 +120,16 @@ public CompletableFuture removeService(String namespace, String service
*/
@Override
public CompletableFuture register(ServiceInstance serviceInstance) {
- return register(NamespacedContext.GLOBAL.getNamespace(), serviceInstance);
+ return register(NamespacedContext.GLOBAL.getRequiredNamespace(), serviceInstance);
}
@Override
public CompletableFuture register(String namespace, ServiceInstance serviceInstance) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(namespace), "namespace can not be empty!");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(serviceInstance.getServiceId()), "serviceId can not be empty!");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(serviceInstance.getSchema()), "schema can not be empty!");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(serviceInstance.getHost()), "host can not be empty!");
+
ensureInstanceId(serviceInstance);
if (log.isInfoEnabled()) {
log.info("register - instanceId:[{}] @ namespace:[{}].", serviceInstance.getInstanceId(), namespace);
@@ -160,7 +172,7 @@ public Map getRegisteredEphemeralInstance
@Override
public CompletableFuture setMetadata(String serviceId, String instanceId, String key, String value) {
- return setMetadata(NamespacedContext.GLOBAL.getNamespace(), serviceId, instanceId, key, value);
+ return setMetadata(NamespacedContext.GLOBAL.getRequiredNamespace(), serviceId, instanceId, key, value);
}
@Override
@@ -171,7 +183,7 @@ public CompletableFuture setMetadata(String namespace, String serviceId
@Override
public CompletableFuture setMetadata(String serviceId, String instanceId, Map metadata) {
- return setMetadata(NamespacedContext.GLOBAL.getNamespace(), serviceId, instanceId, metadata);
+ return setMetadata(NamespacedContext.GLOBAL.getRequiredNamespace(), serviceId, instanceId, metadata);
}
@Override
@@ -181,6 +193,9 @@ public CompletableFuture setMetadata(String namespace, String serviceId
}
private CompletableFuture setMetadata0(String namespace, String instanceId, String[] args) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(namespace), "namespace can not be empty!");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(instanceId), "instanceId can not be empty!");
+
if (log.isInfoEnabled()) {
log.info("setMetadata - instanceId:[{}] @ namespace:[{}].", instanceId, namespace);
}
@@ -192,11 +207,17 @@ private CompletableFuture setMetadata0(String namespace, String instanc
@Override
public CompletableFuture renew(ServiceInstance serviceInstance) {
- return renew(NamespacedContext.GLOBAL.getNamespace(), serviceInstance);
+ return renew(NamespacedContext.GLOBAL.getRequiredNamespace(), serviceInstance);
}
@Override
public CompletableFuture renew(String namespace, ServiceInstance serviceInstance) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(namespace), "namespace can not be empty!");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(serviceInstance.getServiceId()), "serviceId can not be empty!");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(serviceInstance.getSchema()), "schema can not be empty!");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(serviceInstance.getHost()), "host can not be empty!");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(serviceInstance.getInstanceId()), "instanceId can not be empty!");
+
if (log.isDebugEnabled()) {
log.debug("renew - instanceId:[{}] @ namespace:[{}].", serviceInstance.getInstanceId(), namespace);
}
@@ -228,7 +249,7 @@ public CompletableFuture renew(String namespace, ServiceInstance servic
@Override
public CompletableFuture deregister(String serviceId, String instanceId) {
- return deregister(NamespacedContext.GLOBAL.getNamespace(), serviceId, instanceId);
+ return deregister(NamespacedContext.GLOBAL.getRequiredNamespace(), serviceId, instanceId);
}
@Override
@@ -242,6 +263,10 @@ public CompletableFuture deregister(String namespace, String serviceId,
}
private CompletableFuture deregister0(String namespace, String serviceId, String instanceId) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(namespace), "namespace can not be empty!");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(serviceId), "serviceId can not be empty!");
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(instanceId), "instanceId can not be empty!");
+
return DiscoveryRedisScripts.doRegistryDeregister(redisCommands, sha -> {
String[] keys = {namespace};
String[] values = {serviceId, instanceId};
@@ -251,7 +276,7 @@ private CompletableFuture deregister0(String namespace, String serviceI
@Override
public CompletableFuture deregister(ServiceInstance serviceInstance) {
- return deregister(NamespacedContext.GLOBAL.getNamespace(), serviceInstance);
+ return deregister(NamespacedContext.GLOBAL.getRequiredNamespace(), serviceInstance);
}
@Override
diff --git a/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/redis/RedisServiceStatistic.java b/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/redis/RedisServiceStatistic.java
index d7cc469c..549e7ead 100644
--- a/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/redis/RedisServiceStatistic.java
+++ b/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/redis/RedisServiceStatistic.java
@@ -13,6 +13,7 @@
package me.ahoo.cosky.discovery.redis;
+import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.primitives.Ints;
import io.lettuce.core.ScriptOutputType;
@@ -26,6 +27,7 @@
import me.ahoo.cosky.core.listener.PatternTopic;
import me.ahoo.cosky.core.listener.Topic;
+import javax.annotation.Nullable;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -69,7 +71,9 @@ public CompletableFuture statService(String namespace, String serviceId) {
return statService0(namespace, serviceId);
}
- private CompletableFuture statService0(String namespace, String serviceId) {
+ private CompletableFuture statService0(String namespace, @Nullable String serviceId) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(namespace), "namespace can not be empty!");
+
if (log.isInfoEnabled()) {
log.info("statService @ namespace:[{}].", namespace);
}
@@ -85,12 +89,16 @@ private CompletableFuture statService0(String namespace, String serviceId)
}
public CompletableFuture countService(String namespace) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(namespace), "namespace can not be empty!");
+
var serviceIdxStatKey = DiscoveryKeyGenerator.getServiceStatKey(namespace);
return redisCommands.hlen(serviceIdxStatKey).toCompletableFuture();
}
@Override
public CompletableFuture> getServiceStats(String namespace) {
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(namespace), "namespace can not be empty!");
+
var serviceIdxStatKey = DiscoveryKeyGenerator.getServiceStatKey(namespace);
return redisCommands.hgetall(serviceIdxStatKey).thenApply(statMap -> statMap.entrySet().stream().map(stat -> {
ServiceStat serviceStat = new ServiceStat();
@@ -111,6 +119,8 @@ public CompletableFuture getInstanceCount(String namespace) {
@Override
public CompletableFuture