Skip to content

Commit

Permalink
Fix the problem of repeated registration of LoadBalancer to listen …
Browse files Browse the repository at this point in the history
…service instance changes
  • Loading branch information
Ahoo-Wang committed Sep 15, 2021
1 parent 8d7a772 commit a038334
Show file tree
Hide file tree
Showing 11 changed files with 76 additions and 45 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ ext {
set("springfoxVersion", "3.0.0")
set("metricsVersion", "4.2.0")
set("jjwtVersion", "0.11.2")
set("cosIdVersion", "1.3.14")
set("cosIdVersion", "1.3.17")
set("libraryProjects", libraryProjects)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package me.ahoo.cosky.discovery;

import com.google.common.base.Objects;
import com.google.common.base.Strings;

import java.net.URI;
Expand Down Expand Up @@ -96,4 +97,22 @@ public void setPort(int port) {
public boolean isSecure() {
return secureSchemas.contains(schema);
}

@Override
public String toString() {
return instanceId;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof Instance)) return false;
Instance instance = (Instance) o;
return getPort() == instance.getPort() && Objects.equal(getServiceId(), instance.getServiceId()) && Objects.equal(getSchema(), instance.getSchema()) && Objects.equal(getHost(), instance.getHost());
}

@Override
public int hashCode() {
return Objects.hashCode(getServiceId(), getSchema(), getHost(), getPort());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ private void renew() {
}
}).doOnComplete(() -> {
if (log.isDebugEnabled()) {
log.debug("renew - instances size:{} start - times@[{}] taken:[{}ms].", instances.size(), times, stopwatch.elapsed(TimeUnit.MILLISECONDS));
log.debug("renew - instances size:{} end - times@[{}] taken:[{}ms].", instances.size(), times, stopwatch.elapsed(TimeUnit.MILLISECONDS));
}
}).subscribe();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@

package me.ahoo.cosky.discovery;

import lombok.var;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -66,7 +64,23 @@ public boolean isExpired() {
if (!ephemeral) {
return false;
}
var nowTimeSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
long nowTimeSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
return ttlAt < nowTimeSeconds;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof ServiceInstance)) return false;
return super.equals(o);
// if (!super.equals(o)) return false;
// ServiceInstance that = (ServiceInstance) o;

// return getWeight() == that.getWeight() && isEphemeral() == that.isEphemeral() && getTtlAt() == that.getTtlAt() && Objects.equal(getMetadata(), that.getMetadata());
}

@Override
public int hashCode() {
return super.hashCode();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,18 @@ public abstract class AbstractLoadBalancer<Chooser extends LoadBalancer.Chooser>

private final ConcurrentHashMap<NamespacedServiceId, Mono<Chooser>> serviceMapChooser;
private final ConsistencyRedisServiceDiscovery serviceDiscovery;
private final Listener listener;

public AbstractLoadBalancer(ConsistencyRedisServiceDiscovery serviceDiscovery) {
this.serviceDiscovery = serviceDiscovery;
serviceMapChooser = new ConcurrentHashMap<>();
this.serviceMapChooser = new ConcurrentHashMap<>();
this.listener = new Listener();
}

private Mono<Chooser> ensureChooser(NamespacedServiceId namespacedServiceId) {
return serviceMapChooser.computeIfAbsent(namespacedServiceId,
key -> {
serviceDiscovery.addListener(key, new Listener());
serviceDiscovery.addListener(key, this.listener);
return serviceDiscovery.getInstances(key.getNamespace(), key.getServiceId())
.map(this::createChooser)
.cache();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,33 +220,33 @@ public void onMessage(@Nullable String pattern, String channel, String message)
.filter(itc -> itc.getInstanceId().equals(instanceId))
.findFirst().orElse(ServiceInstance.NOT_FOUND);

if (ServiceChangedEvent.REGISTER.equals(message)) {
if (log.isInfoEnabled()) {
log.info("onMessage@InstanceListener - pattern:[{}] - channel:[{}] - message:[{}] add registered Instance.", pattern, channel, message);
if (ServiceInstance.NOT_FOUND.equals(cachedInstance)) {
if (!ServiceChangedEvent.REGISTER.equals(message) && !ServiceChangedEvent.RENEW.equals(message)) {
if (log.isWarnEnabled()) {
log.warn("onMessage@InstanceListener - pattern:[{}] - channel:[{}] - message:[{}] not found cached Instance.", pattern, channel, message);
}
return Mono.empty();
}
return delegate.getInstance(namespace, serviceId, instanceId).doOnNext(registeredInstance -> {
if (ServiceInstance.NOT_FOUND.equals(cachedInstance)) {
cachedInstances.add(registeredInstance);
} else {
return delegate.getInstance(namespace, serviceId, instanceId).doOnNext(serviceInstance -> {
if (log.isInfoEnabled()) {
log.info("onMessage@InstanceListener - pattern:[{}] - channel:[{}] - message:[{}] add registered Instance.", pattern, channel, message);
}
cachedInstances.add(serviceInstance);
});
}

switch (message) {
case ServiceChangedEvent.REGISTER: {
return delegate.getInstance(namespace, serviceId, instanceId).doOnNext(registeredInstance -> {
cachedInstance.setSchema(registeredInstance.getSchema());
cachedInstance.setHost(registeredInstance.getHost());
cachedInstance.setPort(registeredInstance.getPort());
cachedInstance.setEphemeral(registeredInstance.isEphemeral());
cachedInstance.setTtlAt(registeredInstance.getTtlAt());
cachedInstance.setWeight(registeredInstance.getWeight());
cachedInstance.setMetadata(registeredInstance.getMetadata());
}
});
}

if (ServiceInstance.NOT_FOUND.equals(cachedInstance)) {
if (log.isWarnEnabled()) {
log.warn("onMessage@InstanceListener - pattern:[{}] - channel:[{}] - message:[{}] not found cached Instance.", pattern, channel, message);
});
}
return Mono.empty();
}

switch (message) {
case ServiceChangedEvent.RENEW: {
if (log.isInfoEnabled()) {
log.info("onMessage@InstanceListener - pattern:[{}] - channel:[{}] - message:[{}] setTtlAt.", pattern, channel, message);
Expand Down
1 change: 1 addition & 0 deletions cosky-rest-api/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ dependencies {
implementation(platform(project(":cosky-dependencies")))
implementation("io.springfox:springfox-boot-starter")
implementation("org.springframework.boot:spring-boot-starter-actuator")
// implementation("org.springframework.boot:spring-boot-starter-mail")
implementation(project(":spring-cloud-starter-cosky-config"))
implementation(project(":spring-cloud-starter-cosky-discovery"))
implementation("com.google.guava:guava")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ public StatController(NamespaceService namespaceService, ConfigService configSer

@GetMapping
public Mono<GetStatResponse> getStat(@PathVariable String namespace) {
var getNamespacesFuture = namespaceService.getNamespaces();
var getConfigsFuture = configService.getConfigs(namespace);
var getServiceStatsFuture = serviceStatistic.getServiceStats(namespace);
Mono<Set<String>> getNamespacesFuture = namespaceService.getNamespaces();
Mono<Set<String>> getConfigsFuture = configService.getConfigs(namespace);
Mono<List<ServiceStat>> getServiceStatsFuture = serviceStatistic.getServiceStats(namespace);

return Mono.zip(getNamespacesFuture, getConfigsFuture, getServiceStatsFuture)
.map(tuple -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,11 @@
package me.ahoo.cosky.rest.job;

import lombok.extern.slf4j.Slf4j;
import lombok.var;
import me.ahoo.cosky.core.NamespaceService;
import me.ahoo.cosky.core.NamespacedContext;
import me.ahoo.cosky.discovery.ServiceStatistic;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

import java.util.concurrent.CompletableFuture;

/**
* @author ahoo wang
Expand All @@ -43,18 +39,17 @@ public void doStatService() {
if (log.isInfoEnabled()) {
log.info("doStatService - start.");
}
var currentNamespace = NamespacedContext.GLOBAL.getNamespace();
namespaceService.getNamespaces().flatMap(namespaces -> {

final String currentNamespace = NamespacedContext.GLOBAL.getNamespace();
namespaceService.getNamespaces()
.flatMapIterable(namespaces -> {
if (!namespaces.contains(currentNamespace)) {
return namespaceService.setNamespace(currentNamespace);
namespaceService.setNamespace(currentNamespace).subscribe();
}
var futures = namespaces.stream()
.map(serviceStatistic::statService)
.toArray(Mono[]::new);
return Mono.when(futures);
}).doOnSuccess(nil -> {
log.info("doStatService - end.");
return namespaces;
})
.flatMap(serviceStatistic::statService)
.doOnComplete(() -> log.info("doStatService - end."))
.subscribe();

}
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#

group=me.ahoo.cosky
version=1.3.5
version=1.3.8

description=High-performance, low-cost microservice governance platform. Service Discovery and Configuration Service.
website=https://github.com/Ahoo-Wang/cosky
Expand Down
6 changes: 3 additions & 3 deletions k8s/docker/rest-api-local/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# docker buildx build --push --platform linux/arm/v7 --build-arg COSKY_VERSION=1.3.5 --build-arg JDK_VERSION=armv7l-centos-jdk-11.0.11_9-slim -t ahoowang/cosky-rest-api:1.3.5-armv7 .
# docker buildx build --push --platform linux/amd64,linux/arm64 --build-arg COSKY_VERSION=1.3.5 --build-arg JDK_VERSION=jdk11u-centos-nightly-slim -t ahoowang/cosky-rest-api:1.3.5 .
# docker buildx build --push --platform linux/arm/v7 --build-arg COSKY_VERSION=1.3.8 --build-arg JDK_VERSION=armv7l-centos-jdk-11.0.11_9-slim -t ahoowang/cosky-rest-api:1.3.8-armv7 .
# docker buildx build --push --platform linux/amd64,linux/arm64 --build-arg COSKY_VERSION=1.3.8 --build-arg JDK_VERSION=jdk11u-centos-nightly-slim -t ahoowang/cosky-rest-api:1.3.8 .

ARG JDK_VERSION=jdk11u-centos-nightly-slim
ARG COSKY_VERSION=1.3.5
ARG COSKY_VERSION=1.3.8
ARG COSKY_HOME=/cosky
FROM adoptopenjdk/openjdk11:${JDK_VERSION} AS base

Expand Down

0 comments on commit a038334

Please sign in to comment.