diff --git a/build.gradle.kts b/build.gradle.kts index ed3122da..769b16ee 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -51,7 +51,7 @@ ext { set("springfoxVersion", "3.0.0") set("metricsVersion", "4.2.0") set("jjwtVersion", "0.11.2") - set("cosIdVersion", "1.3.14") + set("cosIdVersion", "1.3.17") set("libraryProjects", libraryProjects) } diff --git a/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/Instance.java b/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/Instance.java index a231d3ce..9a443393 100644 --- a/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/Instance.java +++ b/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/Instance.java @@ -13,6 +13,7 @@ package me.ahoo.cosky.discovery; +import com.google.common.base.Objects; import com.google.common.base.Strings; import java.net.URI; @@ -96,4 +97,22 @@ public void setPort(int port) { public boolean isSecure() { return secureSchemas.contains(schema); } + + @Override + public String toString() { + return instanceId; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof Instance)) return false; + Instance instance = (Instance) o; + return getPort() == instance.getPort() && Objects.equal(getServiceId(), instance.getServiceId()) && Objects.equal(getSchema(), instance.getSchema()) && Objects.equal(getHost(), instance.getHost()); + } + + @Override + public int hashCode() { + return Objects.hashCode(getServiceId(), getSchema(), getHost(), getPort()); + } } diff --git a/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/RenewInstanceService.java b/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/RenewInstanceService.java index 02cf4383..6f612fdc 100644 --- a/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/RenewInstanceService.java +++ b/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/RenewInstanceService.java @@ -99,7 +99,7 @@ private void renew() { } }).doOnComplete(() -> { if (log.isDebugEnabled()) { - log.debug("renew - instances size:{} start - times@[{}] taken:[{}ms].", instances.size(), times, stopwatch.elapsed(TimeUnit.MILLISECONDS)); + log.debug("renew - instances size:{} end - times@[{}] taken:[{}ms].", instances.size(), times, stopwatch.elapsed(TimeUnit.MILLISECONDS)); } }).subscribe(); } diff --git a/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/ServiceInstance.java b/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/ServiceInstance.java index f2b59e74..18661f5b 100644 --- a/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/ServiceInstance.java +++ b/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/ServiceInstance.java @@ -13,8 +13,6 @@ package me.ahoo.cosky.discovery; -import lombok.var; - import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -66,7 +64,23 @@ public boolean isExpired() { if (!ephemeral) { return false; } - var nowTimeSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); + long nowTimeSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); return ttlAt < nowTimeSeconds; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof ServiceInstance)) return false; + return super.equals(o); +// if (!super.equals(o)) return false; +// ServiceInstance that = (ServiceInstance) o; + +// return getWeight() == that.getWeight() && isEphemeral() == that.isEphemeral() && getTtlAt() == that.getTtlAt() && Objects.equal(getMetadata(), that.getMetadata()); + } + + @Override + public int hashCode() { + return super.hashCode(); + } } diff --git a/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/loadbalancer/AbstractLoadBalancer.java b/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/loadbalancer/AbstractLoadBalancer.java index d40ef493..5b71ec85 100644 --- a/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/loadbalancer/AbstractLoadBalancer.java +++ b/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/loadbalancer/AbstractLoadBalancer.java @@ -30,16 +30,18 @@ public abstract class AbstractLoadBalancer private final ConcurrentHashMap> serviceMapChooser; private final ConsistencyRedisServiceDiscovery serviceDiscovery; + private final Listener listener; public AbstractLoadBalancer(ConsistencyRedisServiceDiscovery serviceDiscovery) { this.serviceDiscovery = serviceDiscovery; - serviceMapChooser = new ConcurrentHashMap<>(); + this.serviceMapChooser = new ConcurrentHashMap<>(); + this.listener = new Listener(); } private Mono ensureChooser(NamespacedServiceId namespacedServiceId) { return serviceMapChooser.computeIfAbsent(namespacedServiceId, key -> { - serviceDiscovery.addListener(key, new Listener()); + serviceDiscovery.addListener(key, this.listener); return serviceDiscovery.getInstances(key.getNamespace(), key.getServiceId()) .map(this::createChooser) .cache(); diff --git a/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/redis/ConsistencyRedisServiceDiscovery.java b/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/redis/ConsistencyRedisServiceDiscovery.java index 398ef0e5..05fe9838 100644 --- a/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/redis/ConsistencyRedisServiceDiscovery.java +++ b/cosky-discovery/src/main/java/me/ahoo/cosky/discovery/redis/ConsistencyRedisServiceDiscovery.java @@ -220,14 +220,24 @@ public void onMessage(@Nullable String pattern, String channel, String message) .filter(itc -> itc.getInstanceId().equals(instanceId)) .findFirst().orElse(ServiceInstance.NOT_FOUND); - if (ServiceChangedEvent.REGISTER.equals(message)) { - if (log.isInfoEnabled()) { - log.info("onMessage@InstanceListener - pattern:[{}] - channel:[{}] - message:[{}] add registered Instance.", pattern, channel, message); + if (ServiceInstance.NOT_FOUND.equals(cachedInstance)) { + if (!ServiceChangedEvent.REGISTER.equals(message) && !ServiceChangedEvent.RENEW.equals(message)) { + if (log.isWarnEnabled()) { + log.warn("onMessage@InstanceListener - pattern:[{}] - channel:[{}] - message:[{}] not found cached Instance.", pattern, channel, message); + } + return Mono.empty(); } - return delegate.getInstance(namespace, serviceId, instanceId).doOnNext(registeredInstance -> { - if (ServiceInstance.NOT_FOUND.equals(cachedInstance)) { - cachedInstances.add(registeredInstance); - } else { + return delegate.getInstance(namespace, serviceId, instanceId).doOnNext(serviceInstance -> { + if (log.isInfoEnabled()) { + log.info("onMessage@InstanceListener - pattern:[{}] - channel:[{}] - message:[{}] add registered Instance.", pattern, channel, message); + } + cachedInstances.add(serviceInstance); + }); + } + + switch (message) { + case ServiceChangedEvent.REGISTER: { + return delegate.getInstance(namespace, serviceId, instanceId).doOnNext(registeredInstance -> { cachedInstance.setSchema(registeredInstance.getSchema()); cachedInstance.setHost(registeredInstance.getHost()); cachedInstance.setPort(registeredInstance.getPort()); @@ -235,18 +245,8 @@ public void onMessage(@Nullable String pattern, String channel, String message) cachedInstance.setTtlAt(registeredInstance.getTtlAt()); cachedInstance.setWeight(registeredInstance.getWeight()); cachedInstance.setMetadata(registeredInstance.getMetadata()); - } - }); - } - - if (ServiceInstance.NOT_FOUND.equals(cachedInstance)) { - if (log.isWarnEnabled()) { - log.warn("onMessage@InstanceListener - pattern:[{}] - channel:[{}] - message:[{}] not found cached Instance.", pattern, channel, message); + }); } - return Mono.empty(); - } - - switch (message) { case ServiceChangedEvent.RENEW: { if (log.isInfoEnabled()) { log.info("onMessage@InstanceListener - pattern:[{}] - channel:[{}] - message:[{}] setTtlAt.", pattern, channel, message); diff --git a/cosky-rest-api/build.gradle.kts b/cosky-rest-api/build.gradle.kts index 368be298..153bd1a8 100644 --- a/cosky-rest-api/build.gradle.kts +++ b/cosky-rest-api/build.gradle.kts @@ -63,6 +63,7 @@ dependencies { implementation(platform(project(":cosky-dependencies"))) implementation("io.springfox:springfox-boot-starter") implementation("org.springframework.boot:spring-boot-starter-actuator") +// implementation("org.springframework.boot:spring-boot-starter-mail") implementation(project(":spring-cloud-starter-cosky-config")) implementation(project(":spring-cloud-starter-cosky-discovery")) implementation("com.google.guava:guava") diff --git a/cosky-rest-api/src/main/java/me/ahoo/cosky/rest/controller/StatController.java b/cosky-rest-api/src/main/java/me/ahoo/cosky/rest/controller/StatController.java index 090c0152..53ba0e84 100644 --- a/cosky-rest-api/src/main/java/me/ahoo/cosky/rest/controller/StatController.java +++ b/cosky-rest-api/src/main/java/me/ahoo/cosky/rest/controller/StatController.java @@ -49,9 +49,9 @@ public StatController(NamespaceService namespaceService, ConfigService configSer @GetMapping public Mono getStat(@PathVariable String namespace) { - var getNamespacesFuture = namespaceService.getNamespaces(); - var getConfigsFuture = configService.getConfigs(namespace); - var getServiceStatsFuture = serviceStatistic.getServiceStats(namespace); + Mono> getNamespacesFuture = namespaceService.getNamespaces(); + Mono> getConfigsFuture = configService.getConfigs(namespace); + Mono> getServiceStatsFuture = serviceStatistic.getServiceStats(namespace); return Mono.zip(getNamespacesFuture, getConfigsFuture, getServiceStatsFuture) .map(tuple -> { diff --git a/cosky-rest-api/src/main/java/me/ahoo/cosky/rest/job/StatServiceJob.java b/cosky-rest-api/src/main/java/me/ahoo/cosky/rest/job/StatServiceJob.java index 3e1e8114..1920ba2a 100644 --- a/cosky-rest-api/src/main/java/me/ahoo/cosky/rest/job/StatServiceJob.java +++ b/cosky-rest-api/src/main/java/me/ahoo/cosky/rest/job/StatServiceJob.java @@ -14,15 +14,11 @@ package me.ahoo.cosky.rest.job; import lombok.extern.slf4j.Slf4j; -import lombok.var; import me.ahoo.cosky.core.NamespaceService; import me.ahoo.cosky.core.NamespacedContext; import me.ahoo.cosky.discovery.ServiceStatistic; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; -import reactor.core.publisher.Mono; - -import java.util.concurrent.CompletableFuture; /** * @author ahoo wang @@ -43,18 +39,17 @@ public void doStatService() { if (log.isInfoEnabled()) { log.info("doStatService - start."); } - var currentNamespace = NamespacedContext.GLOBAL.getNamespace(); - namespaceService.getNamespaces().flatMap(namespaces -> { + + final String currentNamespace = NamespacedContext.GLOBAL.getNamespace(); + namespaceService.getNamespaces() + .flatMapIterable(namespaces -> { if (!namespaces.contains(currentNamespace)) { - return namespaceService.setNamespace(currentNamespace); + namespaceService.setNamespace(currentNamespace).subscribe(); } - var futures = namespaces.stream() - .map(serviceStatistic::statService) - .toArray(Mono[]::new); - return Mono.when(futures); - }).doOnSuccess(nil -> { - log.info("doStatService - end."); + return namespaces; }) + .flatMap(serviceStatistic::statService) + .doOnComplete(() -> log.info("doStatService - end.")) .subscribe(); } diff --git a/gradle.properties b/gradle.properties index 71bf1557..b8840241 100644 --- a/gradle.properties +++ b/gradle.properties @@ -12,7 +12,7 @@ # group=me.ahoo.cosky -version=1.3.5 +version=1.3.8 description=High-performance, low-cost microservice governance platform. Service Discovery and Configuration Service. website=https://github.com/Ahoo-Wang/cosky diff --git a/k8s/docker/rest-api-local/Dockerfile b/k8s/docker/rest-api-local/Dockerfile index 199a0e07..5cdaad07 100644 --- a/k8s/docker/rest-api-local/Dockerfile +++ b/k8s/docker/rest-api-local/Dockerfile @@ -1,8 +1,8 @@ -# docker buildx build --push --platform linux/arm/v7 --build-arg COSKY_VERSION=1.3.5 --build-arg JDK_VERSION=armv7l-centos-jdk-11.0.11_9-slim -t ahoowang/cosky-rest-api:1.3.5-armv7 . -# docker buildx build --push --platform linux/amd64,linux/arm64 --build-arg COSKY_VERSION=1.3.5 --build-arg JDK_VERSION=jdk11u-centos-nightly-slim -t ahoowang/cosky-rest-api:1.3.5 . +# docker buildx build --push --platform linux/arm/v7 --build-arg COSKY_VERSION=1.3.8 --build-arg JDK_VERSION=armv7l-centos-jdk-11.0.11_9-slim -t ahoowang/cosky-rest-api:1.3.8-armv7 . +# docker buildx build --push --platform linux/amd64,linux/arm64 --build-arg COSKY_VERSION=1.3.8 --build-arg JDK_VERSION=jdk11u-centos-nightly-slim -t ahoowang/cosky-rest-api:1.3.8 . ARG JDK_VERSION=jdk11u-centos-nightly-slim -ARG COSKY_VERSION=1.3.5 +ARG COSKY_VERSION=1.3.8 ARG COSKY_HOME=/cosky FROM adoptopenjdk/openjdk11:${JDK_VERSION} AS base