Skip to content

Commit

Permalink
update updateCommandStateToFailed
Browse files Browse the repository at this point in the history
  • Loading branch information
datasophon committed Nov 28, 2022
1 parent 3e6a12e commit bf208d2
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 199 deletions.
Original file line number Diff line number Diff line change
@@ -1,15 +1,7 @@
package com.datasophon.api.controller;

import java.io.IOException;
import java.util.*;

import com.datasophon.api.service.FrameServiceService;
import com.datasophon.common.model.AlertItem;
import com.datasophon.common.model.Generators;
import com.datasophon.common.utils.FreemakerUtils;
import com.datasophon.dao.entity.FrameServiceEntity;
import freemarker.template.TemplateException;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import com.datasophon.common.utils.Result;
Expand All @@ -29,44 +21,6 @@ public class ClusterAlertQuotaController {
@Autowired
private ClusterAlertQuotaService clusterAlertQuotaService;

@Autowired
private FrameServiceService service;


/**
 * Generates Prometheus alert-rule files, one per framework service.
 *
 * <p>For every service returned by {@code service.list()}, the alert quotas whose service
 * category equals the service name and whose service role name is not blank are converted
 * into {@code AlertItem}s. If at least one item was collected for a service, the items are
 * passed to {@code FreemakerUtils.generatePromAlertFile} together with a {@code Generators}
 * descriptor named {@code <service name in lower case>.yml}.
 *
 * @param clusterId cluster id copied onto every generated alert item
 * @return {@code Result.success()} after all services have been processed
 * @throws IOException       declared for the file-generation call (presumably raised when
 *                           the rule file cannot be written — confirm against FreemakerUtils)
 * @throws TemplateException declared for the file-generation call (presumably raised when
 *                           the template cannot be rendered — confirm against FreemakerUtils)
 */
@RequestMapping("/generateAlertYml")
public Result list(Integer clusterId) throws IOException, TemplateException {
    List<ClusterAlertQuota> quotaList = clusterAlertQuotaService.list();
    List<FrameServiceEntity> serviceList = service.list();
    for (FrameServiceEntity serviceEntity : serviceList) {
        String serviceName = serviceEntity.getServiceName();

        Generators generators = new Generators();
        generators.setFilename(serviceName.toLowerCase() + ".yml");
        generators.setConfigFormat("prometheus");
        // NOTE(review): hard-coded Windows developer path; this should come from configuration.
        generators.setOutputDirectory("D:\\360downloads\\test\\");

        ArrayList<AlertItem> alertItems = new ArrayList<>();
        for (ClusterAlertQuota clusterAlertQuota : quotaList) {
            // Compare from the service name (already dereferenced above, so non-null here)
            // so that a quota without a service category is skipped instead of throwing.
            boolean sameService = serviceName.equals(clusterAlertQuota.getServiceCategory());
            if (!sameService || !StringUtils.isNotBlank(clusterAlertQuota.getServiceRoleName())) {
                continue;
            }
            AlertItem alertItem = new AlertItem();
            alertItem.setAlertName(clusterAlertQuota.getAlertQuotaName());
            // Rule expression is "<expr> <compare method> <threshold>".
            alertItem.setAlertExpr(clusterAlertQuota.getAlertExpr() + " " + clusterAlertQuota.getCompareMethod() + " " + clusterAlertQuota.getAlertThreshold());
            alertItem.setClusterId(clusterId);
            alertItem.setServiceRoleName(clusterAlertQuota.getServiceRoleName());
            alertItem.setAlertLevel(clusterAlertQuota.getAlertLevel().getDesc());
            alertItem.setAlertAdvice(clusterAlertQuota.getAlertAdvice());
            alertItem.setTriggerDuration(clusterAlertQuota.getTriggerDuration());
            alertItems.add(alertItem);
        }

        // Only services that have at least one matching quota get a rule file.
        if (!alertItems.isEmpty()) {
            FreemakerUtils.generatePromAlertFile(generators, alertItems, serviceName);
        }
    }

    return Result.success();
}


/**
* 信息
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,8 @@ public Result update(@RequestBody ClusterServiceInstanceEntity clusterServiceIns
* 删除
*/
@RequestMapping("/delete")
public Result delete(@RequestBody Integer[] ids){
clusterServiceInstanceService.removeByIds(Arrays.asList(ids));

return Result.success();
public Result delete(Integer serviceInstanceIds){
return clusterServiceInstanceService.delServiceInstance(serviceInstanceIds);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,11 @@
import com.datasophon.common.command.SubmitActiveTaskNodeCommand;
import com.datasophon.common.enums.ServiceExecuteState;
import com.datasophon.common.enums.ServiceRoleType;
import com.datasophon.common.model.DAGGraph;
import com.datasophon.common.model.ServiceExecuteResultMessage;
import com.datasophon.common.model.ServiceNode;
import com.datasophon.common.model.ServiceRoleInfo;
import com.datasophon.common.model.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -37,23 +35,24 @@ public void onReceive(Object message) throws Throwable {
ServiceNode servicNode = dag.getNode(node);
if (result.getServiceRoleType().equals(ServiceRoleType.MASTER)) {
if (result.getServiceExecuteState().equals(ServiceExecuteState.ERROR)) {
//该节点master角色操作失败,移动到error列表
//move to error list
errorTaskList.put(node, "");
activeTaskList.remove(node);
readyToSubmitTaskList.remove(node);
completeTaskList.put(node, "");
//更改指令执行状态,依赖该节点的下游服务指令状态改为取消
logger.info("{} master roles failed , cancel all next node by hostCommandId {}",node,servicNode.getMasterRoles().get(0).getHostCommandId());
String hostCommandId = servicNode.getMasterRoles().get(0).getHostCommandId();
ProcessUtils.updateCommandStateToFailed( hostCommandId);
//cancel all next node
logger.info("{} master roles failed , cancel all next node by commandId {}", node, servicNode.getCommandId());
List<String> commandIds = new ArrayList<String>();
listCancelCommand(dag,node,commandIds);
ProcessUtils.updateCommandStateToFailed(commandIds);
} else if (result.getServiceExecuteState().equals(ServiceExecuteState.SUCCESS)) {
//该节点master角色指令执行完毕,开始执行worker节点操作
//submit worker node
ServiceNode serviceNode = dag.getNode(node);
List<ServiceRoleInfo> elseRoles = serviceNode.getElseRoles();
if (elseRoles.size() > 0) {
logger.info("start to submit worker/client roles");
for (ServiceRoleInfo elseRole : serviceNode.getElseRoles()) {
ActorRef serviceActor = ActorUtils.getLocalActor(WorkerServiceActor.class, result.getClusterCode() + "-serviceActor-" + node+"-"+elseRole.getHostname());
ActorRef serviceActor = ActorUtils.getLocalActor(WorkerServiceActor.class, result.getClusterCode() + "-serviceActor-" + node + "-" + elseRole.getHostname());
ProcessUtils.buildExecuteServiceRoleCommand(
result.getClusterId(),
result.getCommandType(),
Expand All @@ -75,14 +74,25 @@ public void onReceive(Object message) throws Throwable {
readyToSubmitTaskList.remove(node);
}
logger.info("start to submit next node");
tellToSubmitActiveTaskNode(result, dag, activeTaskList, errorTaskList, readyToSubmitTaskList, completeTaskList, submitTaskNodeActor,node);
tellToSubmitActiveTaskNode(result, dag, activeTaskList, errorTaskList, readyToSubmitTaskList, completeTaskList, submitTaskNodeActor, node);
}
}
} else {
unhandled(message);
}
}

/**
 * Collects the command ids of every node that lies downstream of {@code node} in the DAG.
 *
 * <p>The ids are appended to {@code commandIds} in depth-first order. The command id of
 * {@code node} itself is not added. Each downstream node is visited at most once, so a node
 * reachable through several paths contributes its command id a single time and its own
 * successors are not traversed again.
 *
 * @param dag        graph holding the service nodes and their dependencies
 * @param node       key of the node whose downstream nodes are collected
 * @param commandIds output list that receives the collected command ids
 */
public void listCancelCommand(DAGGraph<String, ServiceNode, String> dag, String node, List<String> commandIds) {
    // Fully qualified so that the file's import list does not have to change.
    collectCancelCommand(dag, node, commandIds, new java.util.HashSet<String>());
}

/**
 * Depth-first walk behind {@link #listCancelCommand}; {@code visited} holds the node keys
 * that have already been handled.
 */
private void collectCancelCommand(DAGGraph<String, ServiceNode, String> dag, String node, List<String> commandIds, Set<String> visited) {
    for (String subsequentNode : dag.getSubsequentNodes(node)) {
        // Set.add returns false when the key was already present: skip nodes seen before.
        if (!visited.add(subsequentNode)) {
            continue;
        }
        commandIds.add(dag.getNode(subsequentNode).getCommandId());
        collectCancelCommand(dag, subsequentNode, commandIds, visited);
    }
}

private void tellToSubmitActiveTaskNode(ServiceExecuteResultMessage result,
DAGGraph<String, ServiceNode, String> dag,
Map<String, ServiceExecuteState> activeTaskList,
Expand All @@ -92,7 +102,7 @@ private void tellToSubmitActiveTaskNode(ServiceExecuteResultMessage result,
ActorRef submitTaskNodeActor,
String node) {
Set<String> subsequentNodes = dag.getSubsequentNodes(node);
logger.info("{}'s subsequent nodes is {}", node , subsequentNodes.toString());
logger.info("{}'s subsequent nodes is {}", node, subsequentNodes.toString());
for (String subsequentNode : subsequentNodes) {
readyToSubmitTaskList.put(subsequentNode, "");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ public void onReceive(Object message) throws Throwable {
for (String previousNode : previousNodes) {
if (errorTaskList.containsKey(previousNode)) {
readyToSubmitTaskList.remove(node);
}
if(!completeTaskList.containsKey(previousNode)){
readyToSubmitTaskList.remove(node);
continue;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,7 @@ public interface ClusterServiceInstanceService extends IService<ClusterServiceIn
Result getServiceRoleType(Integer serviceInstanceId);

Result configVersionCompare(Integer serviceInstanceId,Integer roleGroupId);

Result delServiceInstance(Integer serviceInstanceId);
}

Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,7 @@ public interface ClusterServiceRoleInstanceService extends IService<ClusterServi
List<ClusterServiceRoleInstanceEntity> getObsoleteService(Integer id);

List<ClusterServiceRoleInstanceEntity> getStoppedRoleInstanceOnHost(Integer clusterId, String hostname, ServiceRoleState state);

void reomveRoleInstance(Integer serviceInstanceId);
}

Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@


import com.datasophon.dao.mapper.ClusterServiceInstanceMapper;
import org.springframework.transaction.annotation.Transactional;


@Service("clusterServiceInstanceService")
@Transactional
public class ClusterServiceInstanceServiceImpl extends ServiceImpl<ClusterServiceInstanceMapper, ClusterServiceInstanceEntity> implements ClusterServiceInstanceService {

@Autowired
Expand All @@ -50,9 +52,6 @@ public class ClusterServiceInstanceServiceImpl extends ServiceImpl<ClusterServic
@Autowired
private FrameServiceRoleService frameServiceRoleService;

@Autowired
private ClusterServiceCommandService commandService;

@Autowired
private ClusterServiceRoleGroupConfigService roleGroupConfigService;

Expand Down Expand Up @@ -183,4 +182,25 @@ public Result configVersionCompare( Integer serviceInstanceId,Integer roleGroupI
}
return Result.success(map);
}

/**
 * Deletes a service instance and asks the role-instance service to remove its role instances.
 *
 * <p>Nothing is deleted when {@code hasRoleInstance} reports running role instances for the
 * service instance; an error result is returned instead.
 *
 * @param serviceInstanceId id of the service instance to delete
 * @return {@code Result.success()} after deletion, or an error result when running role
 *         instances exist
 */
@Override
public Result delServiceInstance(Integer serviceInstanceId) {
    boolean deletable = !hasRoleInstance(serviceInstanceId);
    if (deletable) {
        // Drop the service instance first, then its role instances.
        this.removeById(serviceInstanceId);
        roleInstanceService.reomveRoleInstance(serviceInstanceId);
        return Result.success();
    }
    return Result.error("has running role instance");
}

/**
 * Tells whether the role-instance service reports any running role instance for the given
 * service instance.
 *
 * @param serviceInstanceId id of the service instance to check
 * @return {@code true} when the running-role-instance list is not empty
 */
private boolean hasRoleInstance(Integer serviceInstanceId) {
    return !roleInstanceService.getRunningServiceRoleInstanceListByServiceId(serviceInstanceId).isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -254,4 +254,11 @@ public List<ClusterServiceRoleInstanceEntity> getStoppedRoleInstanceOnHost(Integ
.eq(Constants.HOSTNAME, hostname)
.eq(Constants.SERVICE_ROLE_STATE, state));
}

/**
 * Removes the role instances that belong to the given service instance and are in the
 * {@code STOP} state.
 *
 * @param serviceInstanceId id of the service instance whose stopped role instances are removed
 */
@Override
public void reomveRoleInstance(Integer serviceInstanceId) {
    QueryWrapper<ClusterServiceRoleInstanceEntity> stoppedRolesOfService =
            new QueryWrapper<ClusterServiceRoleInstanceEntity>();
    stoppedRolesOfService.eq(Constants.SERVICE_ID, serviceInstanceId);
    stoppedRolesOfService.eq(Constants.SERVICE_ROLE_STATE, ServiceRoleState.STOP);
    this.remove(stoppedRolesOfService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,38 +151,37 @@ public static void saveHostInstallInfo(StartWorkerMessage message, String cluste
clusterHostService.save(clusterHostEntity);
}

public static void updateCommandStateToFailed(String hostCommandId) {
logger.info("hostCommandId is {}", hostCommandId);
//worker以及下游节点全部取消
ClusterServiceCommandHostCommandService service = SpringTool.getApplicationContext().getBean(ClusterServiceCommandHostCommandService.class);
ClusterServiceCommandHostCommandEntity hostCommand = service.getByHostCommandId(hostCommandId);
logger.info("hostCommandName is {}", hostCommand.getCommandName());
ActorRef commandActor = ActorUtils.getLocalActor(ServiceCommandActor.class, "commandActor");
List<ClusterServiceCommandHostCommandEntity> hostCommandList = service.getHostCommandListByCommandId(hostCommand.getCommandId());
for (ClusterServiceCommandHostCommandEntity hostCommandEntity : hostCommandList) {
if (hostCommandEntity.getCommandState() == CommandState.RUNNING && hostCommandEntity.getHostCommandId() != hostCommandId) {
logger.info("{} host command set to failed", hostCommandEntity.getCommandName());
hostCommandEntity.setCommandState(CommandState.FAILED);
hostCommandEntity.setCommandProgress(100);
service.updateByHostCommandId(hostCommandEntity);
UpdateCommandHostMessage message = new UpdateCommandHostMessage();
message.setCommandId(hostCommand.getCommandId());
message.setCommandHostId(hostCommandEntity.getCommandHostId());
message.setHostname(hostCommandEntity.getHostname());
if (hostCommand.getServiceRoleType() == RoleType.MASTER) {
message.setServiceRoleType(ServiceRoleType.MASTER);
} else {
message.setServiceRoleType(ServiceRoleType.WORKER);
public static void updateCommandStateToFailed(List<String> commandIds) {
for (String commandId : commandIds) {
logger.info("command id is {}", commandId);
//cancel worker and sub node
ClusterServiceCommandHostCommandService service = SpringTool.getApplicationContext().getBean(ClusterServiceCommandHostCommandService.class);
ActorRef commandActor = ActorUtils.getLocalActor(ServiceCommandActor.class, "commandActor");
List<ClusterServiceCommandHostCommandEntity> hostCommandList = service.getHostCommandListByCommandId(commandId);
for (ClusterServiceCommandHostCommandEntity hostCommandEntity : hostCommandList) {
if (hostCommandEntity.getCommandState() == CommandState.RUNNING) {
logger.info("{} host command set to failed", hostCommandEntity.getCommandName());
hostCommandEntity.setCommandState(CommandState.FAILED);
hostCommandEntity.setCommandProgress(100);
service.updateByHostCommandId(hostCommandEntity);
UpdateCommandHostMessage message = new UpdateCommandHostMessage();
message.setCommandId(commandId);
message.setCommandHostId(hostCommandEntity.getCommandHostId());
message.setHostname(hostCommandEntity.getHostname());
if (hostCommandEntity.getServiceRoleType() == RoleType.MASTER) {
message.setServiceRoleType(ServiceRoleType.MASTER);
} else {
message.setServiceRoleType(ServiceRoleType.WORKER);
}
ActorUtils.actorSystem.scheduler().scheduleOnce(
FiniteDuration.apply(3L, TimeUnit.SECONDS),
commandActor,
message,
ActorUtils.actorSystem.dispatcher(),
ActorRef.noSender());
}
ActorUtils.actorSystem.scheduler().scheduleOnce(
FiniteDuration.apply(3L, TimeUnit.SECONDS),
commandActor,
message,
ActorUtils.actorSystem.dispatcher(),
ActorRef.noSender());
}
}

}

public static void tellCommandActorResult(String serviceName, ExecuteServiceRoleCommand executeServiceRoleCommand, ServiceExecuteState state) {
Expand Down
Loading

0 comments on commit bf208d2

Please sign in to comment.