Skip to content

Commit

Permalink
DDH Manager Ping Worker, Worker State Checked (#248)
Browse files Browse the repository at this point in the history
* 调整 JVM 参数以适应高版本 JDK,移除内存优化参数(部分不适用),移除 DS copy

* 调整ui的打包,使无需copy dist到api resource;增加versions-maven-plugin

* 支持优先从worker templates下加载配置模版,其次从component根目录下加载

* remove spec comments

* BugFix: 修复 workerjar 打包所有 resource 下资源导致jar太大,修复没有正常日志文件输出

* Manager Ping Worker, Worker State Check

* When starting a service, read the first line of the shell script to decide whether to execute it with bash or sh

* Upgrade the Ant dependency to the latest version

* Merge upstream code; compute MD5 checksums with the Java API
  • Loading branch information
zhzhenqin authored Jun 14, 2023
1 parent e4ca7b7 commit 13250ec
Show file tree
Hide file tree
Showing 11 changed files with 370 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public class DDHApplicationServer extends SpringBootServletInitializer {

/**
 * Application entry point: starts the Spring application and then registers a
 * JVM shutdown hook that releases master-side resources via {@link #shutdown()}.
 *
 * @param args command-line arguments, passed through to Spring Boot unchanged
 */
public static void main(String[] args) {
    SpringApplication.run(DDHApplicationServer.class, args);

    // The hook is registered after startup; when the JVM exits it closes and
    // shuts down the resources owned by the master.
    final Thread shutdownHook = new Thread(DDHApplicationServer::shutdown);
    Runtime.getRuntime().addShutdownHook(shutdownHook);
}

@PostConstruct
Expand All @@ -48,4 +52,11 @@ public void run() throws UnknownHostException, NoSuchAlgorithmException {
CacheUtils.put(Constants.HOSTNAME, hostName);
ActorUtils.init();
}

/**
 * Called when the master is shutting down.
 *
 * <p>Delegates to {@code ActorUtils.shutdown()}. {@code main} registers this
 * method as a JVM shutdown hook.
 */
public static void shutdown() {
ActorUtils.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,10 @@ public static void init() throws UnknownHostException, NoSuchAlgorithmException
actorSystem.actorOf(Props.create(MasterNodeProcessingActor.class),
getActorRefName(MasterNodeProcessingActor.class));

// 节点检测 5m 检测一次
actorSystem.scheduler().schedule(
FiniteDuration.apply(60L, TimeUnit.SECONDS),
FiniteDuration.apply(5L, TimeUnit.MINUTES),
FiniteDuration.apply(30L, TimeUnit.SECONDS),
FiniteDuration.apply(300L, TimeUnit.SECONDS),
hostCheckActor,
new HostCheckCommand(),
actorSystem.dispatcher(),
Expand Down Expand Up @@ -136,6 +137,18 @@ public static ActorRef getRemoteActor(String hostname, String actorName) {
return actorRef;
}

/**
 * Shuts down the shared actor system, if one is present, and clears the
 * reference to it.
 *
 * <p>Calling this when no actor system is set is a no-op. Any exception raised
 * by the actor system while stopping is deliberately ignored.
 */
public static void shutdown() {
    if (actorSystem == null) {
        // Nothing was started (or it has already been shut down).
        return;
    }
    try {
        actorSystem.shutdown();
    } catch (Exception ignored) {
        // Best effort: a failure while stopping must not prevent the
        // reference from being cleared below.
    }
    actorSystem = null;
}

/**
* Get ActorRef name from Class name.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,41 @@

package com.datasophon.api.master;

import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.datasophon.api.service.ClusterHostService;
import com.datasophon.api.service.ClusterInfoService;
import com.datasophon.api.service.ClusterServiceRoleInstanceService;
import com.datasophon.api.utils.SpringTool;
import com.datasophon.common.command.HostCheckCommand;
import com.datasophon.common.command.PingCommand;
import com.datasophon.common.model.HostInfo;
import com.datasophon.common.utils.ExecResult;
import com.datasophon.common.utils.PromInfoUtils;
import com.datasophon.common.utils.Result;
import com.datasophon.dao.entity.ClusterHostEntity;
import com.datasophon.dao.entity.ClusterInfoEntity;
import com.datasophon.dao.entity.ClusterServiceRoleInstanceEntity;

import com.datasophon.dao.enums.MANAGED;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import akka.actor.UntypedActor;

/**
* 节点状态监测
*/
public class HostCheckActor extends UntypedActor {

private static final Logger logger = LoggerFactory.getLogger(HostCheckActor.class);
Expand All @@ -52,16 +66,51 @@ public void onReceive(Object msg) throws Throwable {
SpringTool.getApplicationContext().getBean(ClusterServiceRoleInstanceService.class);
ClusterInfoService clusterInfoService =
SpringTool.getApplicationContext().getBean(ClusterInfoService.class);

// Host or cluster
final HostCheckCommand hostCheckCommand = (HostCheckCommand)msg;
final HostInfo hostInfo = hostCheckCommand.getHostInfo();

// 获取当前安装并且正在运行的集群
Result result = clusterInfoService.runningClusterList();
List<ClusterInfoEntity> clusterList = (List<ClusterInfoEntity>) result.getData();

for (ClusterInfoEntity clusterInfoEntity : clusterList) {
// 获取集群上安装的 Prometheus 服务, 从 Prometheus 获取CPU、磁盘使用量等
ClusterServiceRoleInstanceEntity prometheusInstance =
roleInstanceService.getOneServiceRole("Prometheus", "", clusterInfoEntity.getId());
if (Objects.nonNull(prometheusInstance)) {
// 集群正常安装了 Prometheus
List<ClusterHostEntity> list = clusterHostService.getHostListByClusterId(clusterInfoEntity.getId());
String promUrl = "http://" + prometheusInstance.getHostname() + ":9090/api/v1/query";
for (ClusterHostEntity clusterHostEntity : list) {
if(hostInfo != null && !StringUtils.equals(clusterHostEntity.getHostname(), hostInfo.getHostname())) {
// 指定了节点,直接只处理这一个节点的
continue;
}
try {
// rpc 检测
final ActorRef pingActor = ActorUtils.getRemoteActor(clusterHostEntity.getHostname(), "pingActor");
PingCommand pingCommand = new PingCommand();
pingCommand.setMessage("ping");
Timeout timeout = new Timeout(Duration.create(180, TimeUnit.SECONDS));
Future<Object> execFuture = Patterns.ask(pingActor, pingCommand, timeout);
ExecResult execResult = (ExecResult) Await.result(execFuture, timeout.duration());
if (execResult.getExecResult()) {
logger.info("ping host: {} success", clusterHostEntity.getHostname());
} else {
logger.warn("ping host: {} fail, reason: {}", clusterHostEntity.getHostname(), execResult.getExecOut());
throw new IllegalStateException("ping host: " + clusterHostEntity.getHostname() + " failed.");
}
clusterHostEntity.setHostState(1);
clusterHostEntity.setManaged(MANAGED.YES);
} catch (Exception e) {
logger.warn("host: " + clusterHostEntity.getHostname() + " rpc error, cause: " + e.getMessage());
clusterHostEntity.setHostState(3);
clusterHostEntity.setManaged(MANAGED.NO);
// ping 失败,则修改节点状态为 false
continue;
}
try {
String hostname = clusterHostEntity.getHostname();
// 查询内存总量
Expand Down Expand Up @@ -106,12 +155,51 @@ public void onReceive(Object msg) throws Throwable {
clusterHostEntity.setAverageLoad(cpuLoad);
}
} catch (Exception e) {
logger.info(e.getMessage());
logger.warn("check cluster state error, cause: {}", e.getMessage());
}
}
if (list.size() > 0) {
clusterHostService.updateBatchById(list);
}
} else {
// 没有 Prometheus?直接获取节点,通过 rpc 检测是否启动
List<ClusterHostEntity> hosts = clusterHostService.getHostListByClusterId(clusterInfoEntity.getId());
List<ClusterHostEntity> checkedHosts = new ArrayList<>(hosts.size());
for (ClusterHostEntity host : hosts) {
if(hostInfo != null && !StringUtils.equals(host.getHostname(), hostInfo.getHostname())) {
// 指定了节点,直接只处理这一个节点的
continue;
}
// copy 一个新的,只更新状态
ClusterHostEntity checkedHost = new ClusterHostEntity();
checkedHost.setId(host.getId());
checkedHost.setCheckTime(new Date());
try {
// rpc 检测
final ActorRef pingActor = ActorUtils.getRemoteActor(host.getHostname(), "pingActor");
PingCommand pingCommand = new PingCommand();
pingCommand.setMessage("ping");
Timeout timeout = new Timeout(Duration.create(180, TimeUnit.SECONDS));
Future<Object> execFuture = Patterns.ask(pingActor, pingCommand, timeout);
ExecResult execResult = (ExecResult) Await.result(execFuture, timeout.duration());
if (execResult.getExecResult()) {
logger.info("ping host: {} success", host.getHostname());
} else {
logger.warn("ping host: {} fail, reason: {}", host.getHostname(), execResult.getExecOut());
throw new IllegalStateException("ping host: " + host.getHostname() + " failed.");
}
checkedHost.setHostState(1);
checkedHost.setManaged(MANAGED.YES);
} catch (Exception e) {
logger.warn("host: " + host.getHostname() + " rpc error, cause: " + e.getMessage());
checkedHost.setManaged(MANAGED.NO);
checkedHost.setHostState(3);
}
checkedHosts.add(checkedHost);
}
if (checkedHosts.size() > 0) {
clusterHostService.updateBatchById(checkedHosts);
}
}
}
} else {
Expand Down
6 changes: 6 additions & 0 deletions datasophon-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>

<dependency>
<groupId>org.apache.ant</groupId>
<artifactId>ant</artifactId>
<version>1.10.13</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.datasophon.common.command;

import lombok.Data;

import java.io.Serializable;

/**
 * Serializable "ping" message used for host liveness checks.
 *
 * <p>{@code HostCheckActor} sends an instance of this command to a host's remote
 * {@code pingActor} and waits for the reply to decide whether the host is reachable.
 * Accessors are generated by Lombok's {@code @Data}.
 *
 * @author zhenqin
 */
@Data
public class PingCommand implements Serializable {

// Free-form payload of the ping; HostCheckActor sets it to "ping".
private String message;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.datasophon.common.utils;

import com.google.common.io.CharStreams;
import com.google.common.io.LineProcessor;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang.StringUtils;
import org.apache.tools.tar.TarEntry;
import org.apache.tools.tar.TarInputStream;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.security.MessageDigest;
import java.util.zip.GZIPInputStream;
/**
 * Special-purpose file helpers: computing a file's MD5, reading a single text
 * file out of a tar.gz archive without unpacking it, and reading the first
 * non-blank line of a file.
 *
 * <pre>
 *
 * Created by zhenqin.
 * User: zhenqin
 * Date: 2023/4/21
 * Time: 9:58 PM
 *
 * </pre>
 *
 * @author zhenqin
 */
public class FileUtils {

    /**
     * Computes the MD5 checksum of a file. The file is streamed through a
     * fixed-size buffer, so large files are handled without loading them
     * into memory.
     *
     * @param file the file to hash
     * @return the MD5 digest as a lower-case hexadecimal string
     * @throws IllegalStateException if the file cannot be read or the MD5
     *                               algorithm is unavailable (the cause is preserved)
     */
    public static String md5(File file) {
        try (FileInputStream fileInputStream = new FileInputStream(file)) {
            MessageDigest digest = MessageDigest.getInstance("MD5");

            byte[] buffer = new byte[8192];
            int length;
            while ((length = fileInputStream.read(buffer)) != -1) {
                digest.update(buffer, 0, length);
            }
            return new String(Hex.encodeHex(digest.digest()));
        } catch (Exception e) {
            throw new IllegalStateException("Failed to compute MD5 of " + file, e);
        }
    }

    /**
     * Reads one text file from inside a tar.gz archive without unpacking the archive.
     *
     * <p>Entries are scanned in archive order; directories are skipped and the
     * first entry whose name ends with {@code name} is returned.
     *
     * @param targz   the tar.gz archive
     * @param name    suffix the entry name must end with (for example a file name)
     * @param charset charset used to decode the entry
     * @return the decoded content of the first matching entry, or {@code null}
     *         if no entry matches
     * @throws IOException if the archive cannot be opened or read
     */
    public static String readTargzTextFile(File targz, String name, Charset charset) throws IOException {
        // Each stream is its own resource so that the file handle is released even
        // when a wrapping constructor fails (e.g. the file is not gzip data).
        try (FileInputStream fileIn = new FileInputStream(targz);
                GZIPInputStream gzipIn = new GZIPInputStream(fileIn);
                TarInputStream tarIn = new TarInputStream(gzipIn)) {
            TarEntry tarEntry;
            while ((tarEntry = tarIn.getNextEntry()) != null) {
                if (tarEntry.isDirectory()) {
                    // Directories carry no content; move on to the next entry.
                    continue;
                }
                if (tarEntry.getName().endsWith(name)) {
                    // Stop at the first match. The reader is created only now, once the
                    // stream is positioned on the entry, so nothing is read beforehand;
                    // it is not closed separately because tarIn is closed by this try block.
                    return CharStreams.toString(new InputStreamReader(tarIn, charset));
                }
            }
        }
        return null;
    }

    /**
     * Reads the first non-blank line of a file.
     *
     * <p>Lines are consumed until one contains non-whitespace characters. If every
     * line is blank, the last line read is returned; for an empty file the result
     * is {@code null}. The file is decoded with the platform default charset.
     *
     * @param file the file to read
     * @return the first non-blank line, the last (blank) line if there is none,
     *         or {@code null} for an empty file
     * @throws Exception if the file cannot be opened or read
     */
    public static String readFirstLine(File file) throws Exception {
        // CharStreams.readLines does not close the reader it is given, so it is
        // closed here to avoid leaking the file handle.
        try (FileReader reader = new FileReader(file)) {
            return CharStreams.readLines(reader, new LineProcessor<String>() {

                private String firstLine = null;

                @Override
                public boolean processLine(String line) throws IOException {
                    this.firstLine = line;
                    // Keep reading only while the current line is blank.
                    return StringUtils.trimToNull(line) == null;
                }

                @Override
                public String getResult() {
                    return firstLine;
                }
            });
        }
    }
}
Loading

0 comments on commit 13250ec

Please sign in to comment.