Skip to content

Commit

Permalink
split executor in session and data (#152)
Browse files Browse the repository at this point in the history
* default disable drop connections

* lint

* start check client version cron

* don't use bolt-default-executor (#151)
  • Loading branch information
dzdx authored Dec 2, 2020
1 parent 38bae7d commit b2e430f
Show file tree
Hide file tree
Showing 59 changed files with 834 additions and 338 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
<jraft.version>1.3.5.Alpha1</jraft.version>
<metrics.version>4.0.2</metrics.version>
<commons-io.version>2.4</commons-io.version>
<jetty.version>[9.4.17.v20190418,9.4.19.v20190610]</jetty.version>
<jetty.version>9.4.19.v20190610</jetty.version>
<rocksdbjni.version>6.4.6</rocksdbjni.version>
<main.user.dir>${user.dir}</main.user.dir>
<argLine>-Dnetwork_interface_denylist=docker0</argLine>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.Sets;
import io.netty.util.internal.ConcurrentSet;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;

/**
Expand All @@ -29,6 +35,7 @@
public class TaskMetrics {

private final MetricRegistry metrics;
private final Set<String> executorNames = Sets.newConcurrentHashSet();

private TaskMetrics() {
this.metrics = new MetricRegistry();
Expand All @@ -53,21 +60,45 @@ public MetricRegistry getMetricRegistry() {
}

public void registerThreadExecutor(String executorName, ThreadPoolExecutor executor) {
executorNames.add(executorName);

metrics.register(MetricRegistry.name(executorName, "queue"),
(Gauge<Integer>) () -> executor.getQueue().size());
(Gauge<Integer>) () -> executor.getQueue().size());

metrics.register(MetricRegistry.name(executorName, "current"),
(Gauge<Integer>) executor::getPoolSize);
(Gauge<Integer>) executor::getPoolSize);

metrics.register(MetricRegistry.name(executorName, "active"),
(Gauge<Integer>) executor::getActiveCount);
(Gauge<Integer>) executor::getActiveCount);

metrics.register(MetricRegistry.name(executorName, "completed"),
(Gauge<Long>) executor::getCompletedTaskCount);
(Gauge<Long>) executor::getCompletedTaskCount);

metrics.register(MetricRegistry.name(executorName, "task"),
(Gauge<Long>) executor::getTaskCount);
(Gauge<Long>) executor::getTaskCount);
}

public Set<String> getExecutorNames() {
return executorNames;
}

public String metricsString() {
final String SYMBOLIC = " └─ ";
StringBuilder sb = new StringBuilder();
sb.append("\n").append("ExecutorMetrics").append(" >>>>>>>>");
sb.append("\n");
for (String executorName : getExecutorNames()) {
MetricRegistry metricRegistry = getMetricRegistry();
Map<String, Gauge> map = metricRegistry
.getGauges((name, value) -> name.startsWith(executorName));

sb.append(SYMBOLIC).append(executorName);
map.forEach((key, gauge) -> {
String name = key.substring(executorName.length() + 1);
sb.append(", ").append(name).append(":").append(gauge.getValue());
});
sb.append("\n");
}
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.alipay.sofa.registry.server.data.bootstrap.EnableDataServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.alipay.sofa.registry.server.data.remoting.dataserver.task.LogMetricsTask;
import org.glassfish.jersey.jackson.JacksonFeature;
import org.glassfish.jersey.server.ResourceConfig;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
Expand Down Expand Up @@ -101,7 +102,7 @@
import com.alipay.sofa.registry.server.data.renew.LocalDataServerCleanHandler;
import com.alipay.sofa.registry.server.data.resource.DataDigestResource;
import com.alipay.sofa.registry.server.data.resource.HealthResource;
import com.alipay.sofa.registry.server.data.util.ThreadPoolExecutorDataServer;
import com.alipay.sofa.registry.server.data.util.DataMetricsThreadPoolExecutor;
import com.alipay.sofa.registry.util.NamedThreadFactory;
import com.alipay.sofa.registry.util.PropertySplitter;

Expand Down Expand Up @@ -512,12 +513,18 @@ public RenewNodeTask renewNodeTask() {
return new RenewNodeTask();
}

@Bean
public LogMetricsTask logMetricsTask() {
return new LogMetricsTask();
}

@Bean(name = "tasks")
public List<AbstractTask> tasks() {
List<AbstractTask> list = new ArrayList<>();
list.add(connectionRefreshTask());
list.add(connectionRefreshMetaTask());
list.add(renewNodeTask());
list.add(logMetricsTask());
return list;
}

Expand Down Expand Up @@ -556,7 +563,7 @@ public static class ExecutorConfiguration {

@Bean(name = "publishProcessorExecutor")
public ThreadPoolExecutor publishProcessorExecutor(DataServerConfig dataServerConfig) {
return new ThreadPoolExecutorDataServer("PublishProcessorExecutor",
return new DataMetricsThreadPoolExecutor("PublishProcessorExecutor",
dataServerConfig.getPublishExecutorMinPoolSize(),
dataServerConfig.getPublishExecutorMaxPoolSize(), 300, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(dataServerConfig.getPublishExecutorQueueSize()),
Expand All @@ -565,7 +572,7 @@ public ThreadPoolExecutor publishProcessorExecutor(DataServerConfig dataServerCo

@Bean(name = "renewDatumProcessorExecutor")
public ThreadPoolExecutor renewDatumProcessorExecutor(DataServerConfig dataServerConfig) {
return new ThreadPoolExecutorDataServer("RenewDatumProcessorExecutor",
return new DataMetricsThreadPoolExecutor("RenewDatumProcessorExecutor",
dataServerConfig.getRenewDatumExecutorMinPoolSize(),
dataServerConfig.getRenewDatumExecutorMaxPoolSize(), 300, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(dataServerConfig.getRenewDatumExecutorQueueSize()),
Expand All @@ -574,14 +581,25 @@ public ThreadPoolExecutor renewDatumProcessorExecutor(DataServerConfig dataServe

@Bean(name = "getDataProcessorExecutor")
public ThreadPoolExecutor getDataProcessorExecutor(DataServerConfig dataServerConfig) {
return new ThreadPoolExecutorDataServer("GetDataProcessorExecutor",
return new DataMetricsThreadPoolExecutor("GetDataProcessorExecutor",
dataServerConfig.getGetDataExecutorMinPoolSize(),
dataServerConfig.getGetDataExecutorMaxPoolSize(),
dataServerConfig.getGetDataExecutorKeepAliveTime(), TimeUnit.SECONDS,
new ArrayBlockingQueue<>(dataServerConfig.getGetDataExecutorQueueSize()),
new NamedThreadFactory("DataServer-GetDataProcessor-executor", true));
}

@Bean(name = "defaultRequestExecutor")
public ThreadPoolExecutor defaultRequestExecutor(DataServerConfig dataServerConfig) {
return new DataMetricsThreadPoolExecutor("DefaultRequestExecutor",
dataServerConfig.getDefaultRequestExecutorMinSize(),
dataServerConfig.getDefaultRequestExecutorMaxSize(),
dataServerConfig.getDefaultRequestExecutorKeepAliveTime(), TimeUnit.SECONDS,
new ArrayBlockingQueue<>(dataServerConfig.getDefaultRequestExecutorQueueSize()),
new NamedThreadFactory("DefaultRequestThread", true)

);
}
}

@Configuration
Expand All @@ -601,5 +619,4 @@ public ProvideDataProcessor datumExpireProvideDataProcessor(ProvideDataProcessor
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,15 @@ public class DataServerConfig {

private int sessionServerNotifierRetryExecutorQueueSize = 10000;

private int defaultRequestExecutorMinSize = 20;

private int defaultRequestExecutorMaxSize = 400;

private int defaultRequestExecutorQueueSize = 600;
private long defaultRequestExecutorKeepAliveTime = 60;

private int logMetricsFixedDelay = 30;

private int renewEnableDelaySec = 30;

private int dataSyncDelayTimeout = 1000;
Expand Down Expand Up @@ -698,7 +707,8 @@ public Set<String> getMetaServerIpAddresses() {
if (localDataCenter != null && !localDataCenter.isEmpty()) {
Collection<String> metas = metaMap.get(localDataCenter);
if (metas != null && !metas.isEmpty()) {
metaIps = metas.stream().map(NetUtil::getIPAddressFromDomain).collect(Collectors.toSet());
metaIps = metas.stream().map(NetUtil::getIPAddressFromDomain)
.collect(Collectors.toSet());
}
}
}
Expand Down Expand Up @@ -873,4 +883,44 @@ public String toString() {
return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
}

public int getDefaultRequestExecutorMinSize() {
return defaultRequestExecutorMinSize;
}

public void setDefaultRequestExecutorMinSize(int defaultRequestExecutorMinSize) {
this.defaultRequestExecutorMinSize = defaultRequestExecutorMinSize;
}

public int getDefaultRequestExecutorMaxSize() {
return defaultRequestExecutorMaxSize;
}

public void setDefaultRequestExecutorMaxSize(int defaultRequestExecutorMaxSize) {
this.defaultRequestExecutorMaxSize = defaultRequestExecutorMaxSize;
}

public int getDefaultRequestExecutorQueueSize() {
return defaultRequestExecutorQueueSize;
}

public void setDefaultRequestExecutorQueueSize(int defaultRequestExecutorQueueSize) {
this.defaultRequestExecutorQueueSize = defaultRequestExecutorQueueSize;
}

public long getDefaultRequestExecutorKeepAliveTime() {
return defaultRequestExecutorKeepAliveTime;
}

public void setDefaultRequestExecutorKeepAliveTime(long defaultRequestExecutorKeepAliveTime) {
this.defaultRequestExecutorKeepAliveTime = defaultRequestExecutorKeepAliveTime;
}

public int getLogMetricsFixedDelay() {
return logMetricsFixedDelay;
}

public void setLogMetricsFixedDelay(int logMetricsFixedDelay) {
this.logMetricsFixedDelay = logMetricsFixedDelay;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,7 @@ public enum StartTaskTypeEnum {
/**
* VersionCompareTask
*/
VERSION_COMPARE
VERSION_COMPARE,

LOG_METRICS
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.server.data.util.DataMetricsThreadPoolExecutor;
import com.alipay.sofa.registry.util.NamedThreadFactory;

/**
Expand All @@ -43,8 +44,8 @@ public class ExecutorFactory {

static {
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(10);
EXECUTOR = new ThreadPoolExecutor(20, 300, 1, TimeUnit.HOURS, workQueue,
new NamedThreadFactory("CommonExecutor")) {
EXECUTOR = new DataMetricsThreadPoolExecutor("CommonExecutor", 20, 300, 1, TimeUnit.HOURS,
workQueue, new NamedThreadFactory("CommonExecutor")) {

/**
* @see ThreadPoolExecutor#afterExecute(Runnable, Throwable)
Expand All @@ -59,9 +60,11 @@ protected void afterExecute(Runnable r, Throwable t) {
}
};

NOTIFY_SESSION_CALLBACK_EXECUTOR = new ThreadPoolExecutor(10, 20, 300, TimeUnit.SECONDS,
NOTIFY_SESSION_CALLBACK_EXECUTOR = new DataMetricsThreadPoolExecutor(
"NotifySessionCallbackExecutor", 10, 20, 300, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100000), new NamedThreadFactory(
"NotifySessionCallback-executor", true));
"NotifySessionCallbackExecutor", true));

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import com.alipay.sofa.registry.server.data.remoting.dataserver.SyncDataCallback;
import com.alipay.sofa.registry.server.data.remoting.handler.AbstractClientHandler;
import com.alipay.sofa.registry.server.data.util.LocalServerStatusEnum;
import com.alipay.sofa.registry.server.data.util.ThreadPoolExecutorDataServer;
import com.alipay.sofa.registry.server.data.util.DataMetricsThreadPoolExecutor;
import com.alipay.sofa.registry.util.NamedThreadFactory;
import com.alipay.sofa.registry.util.ParaCheckUtil;

Expand Down Expand Up @@ -161,7 +161,7 @@ public Class interest() {
@Override
public Executor getExecutor() {
if (notifyExecutor == null) {
notifyExecutor = new ThreadPoolExecutorDataServer("NotifyDataSyncProcessorExecutor",
notifyExecutor = new DataMetricsThreadPoolExecutor("NotifyDataSyncProcessorExecutor",
dataServerConfig.getNotifyDataSyncExecutorMinPoolSize(),
dataServerConfig.getNotifyDataSyncExecutorMaxPoolSize(),
dataServerConfig.getNotifyDataSyncExecutorKeepAliveTime(), TimeUnit.SECONDS,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.registry.server.data.remoting.dataserver.task;

import com.alipay.remoting.ProtocolCode;
import com.alipay.remoting.ProtocolManager;
import com.alipay.remoting.rpc.protocol.RpcProtocol;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.metrics.TaskMetrics;
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
import com.alipay.sofa.registry.server.data.event.StartTaskTypeEnum;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LogMetricsTask extends AbstractTask {
private TaskMetrics taskMetrics = TaskMetrics.getInstance();

@Autowired
private DataServerConfig dataServerConfig;

private static final Logger EXE_LOGGER = LoggerFactory.getLogger("DATA-PROFILE-DIGEST",
"[ExecutorMetrics]");

public LogMetricsTask() {
ThreadPoolExecutor boltDefaultExecutor = (ThreadPoolExecutor) ProtocolManager
.getProtocol(ProtocolCode.fromBytes(RpcProtocol.PROTOCOL_CODE)).getCommandHandler()
.getDefaultExecutor();
taskMetrics.registerThreadExecutor("Data-BoltDefaultExecutor", boltDefaultExecutor);
}

public void printExecutorMetrics() {
EXE_LOGGER.info(taskMetrics.metricsString());
}

@Override
public void handle() {
printExecutorMetrics();
}

@Override
public int getDelay() {
return dataServerConfig.getLogMetricsFixedDelay();
}

@Override
public int getInitialDelay() {
return dataServerConfig.getLogMetricsFixedDelay();
}

@Override
public TimeUnit getTimeUnit() {
return TimeUnit.SECONDS;
}

@Override
public StartTaskTypeEnum getStartTaskTypeEnum() {
return StartTaskTypeEnum.LOG_METRICS;
}
}
Loading

0 comments on commit b2e430f

Please sign in to comment.