Skip to content

Commit

Permalink
ignore timeout exception for consensus pipe operate procedures (#14116)
Browse files Browse the repository at this point in the history
  • Loading branch information
Pengzna authored Nov 22, 2024
1 parent 22cf944 commit 7852cb6
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1104,6 +1104,15 @@ public TSStatus dropPipePlugin(TDropPipePluginReq req) {
}
}

public TSStatus createConsensusPipe(TCreatePipeReq req) {
try {
final long procedureId = executor.submitProcedure(new CreatePipeProcedureV2(req));
return handleConsensusPipeProcedure(procedureId);
} catch (Exception e) {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
}
}

public TSStatus createPipe(TCreatePipeReq req) {
try {
final long procedureId = executor.submitProcedure(new CreatePipeProcedureV2(req));
Expand Down Expand Up @@ -1138,6 +1147,15 @@ public TSStatus alterPipe(TAlterPipeReq req) {
}
}

public TSStatus startConsensusPipe(String pipeName) {
try {
final long procedureId = executor.submitProcedure(new StartPipeProcedureV2(pipeName));
return handleConsensusPipeProcedure(procedureId);
} catch (Exception e) {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
}
}

public TSStatus startPipe(String pipeName) {
try {
final long procedureId = executor.submitProcedure(new StartPipeProcedureV2(pipeName));
Expand All @@ -1155,6 +1173,15 @@ public TSStatus startPipe(String pipeName) {
}
}

public TSStatus stopConsensusPipe(String pipeName) {
try {
final long procedureId = executor.submitProcedure(new StopPipeProcedureV2(pipeName));
return handleConsensusPipeProcedure(procedureId);
} catch (Exception e) {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
}
}

public TSStatus stopPipe(String pipeName) {
try {
final long procedureId = executor.submitProcedure(new StopPipeProcedureV2(pipeName));
Expand All @@ -1172,6 +1199,15 @@ public TSStatus stopPipe(String pipeName) {
}
}

public TSStatus dropConsensusPipe(String pipeName) {
try {
final long procedureId = executor.submitProcedure(new DropPipeProcedureV2(pipeName));
return handleConsensusPipeProcedure(procedureId);
} catch (Exception e) {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()).setMessage(e.getMessage());
}
}

public TSStatus dropPipe(String pipeName) {
try {
final long procedureId = executor.submitProcedure(new DropPipeProcedureV2(pipeName));
Expand All @@ -1189,6 +1225,23 @@ public TSStatus dropPipe(String pipeName) {
}
}

private TSStatus handleConsensusPipeProcedure(final long procedureId) {
final List<TSStatus> statusList = new ArrayList<>();
final boolean isSucceed =
waitingProcedureFinished(Collections.singletonList(procedureId), statusList);
if (isSucceed) {
return statusList.get(0);
} else {
// if time out, optimistically believe that this procedure will execute successfully.
if (statusList.get(0).getMessage().equals(PROCEDURE_TIMEOUT_MESSAGE)) {
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
// otherwise, some exceptions must have occurred, throw them.
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
.setMessage(wrapTimeoutMessageForPipeProcedure(statusList.get(0).getMessage()));
}
}

public void pipeHandleLeaderChange(
Map<TConsensusGroupId, Pair<Integer, Integer>> dataRegionGroupToOldAndNewLeaderPairMap) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.manager.pipe.coordinator.task;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.confignode.consensus.request.read.pipe.task.ShowPipePlanV2;
import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
import org.apache.iotdb.confignode.manager.ConfigManager;
Expand Down Expand Up @@ -115,7 +116,12 @@ public boolean isLocked() {

/** Caller should ensure that the method is called in the lock {@link #lock()}. */
public TSStatus createPipe(TCreatePipeReq req) {
final TSStatus status = configManager.getProcedureManager().createPipe(req);
TSStatus status = null;
if (req.getPipeName().startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
status = configManager.getProcedureManager().createConsensusPipe(req);
} else {
status = configManager.getProcedureManager().createPipe(req);
}
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.warn("Failed to create pipe {}. Result status: {}.", req.getPipeName(), status);
}
Expand All @@ -133,7 +139,12 @@ public TSStatus alterPipe(TAlterPipeReq req) {

/** Caller should ensure that the method is called in the lock {@link #lock()}. */
public TSStatus startPipe(String pipeName) {
final TSStatus status = configManager.getProcedureManager().startPipe(pipeName);
TSStatus status = null;
if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
status = configManager.getProcedureManager().startConsensusPipe(pipeName);
} else {
status = configManager.getProcedureManager().startPipe(pipeName);
}
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.warn("Failed to start pipe {}. Result status: {}.", pipeName, status);
}
Expand All @@ -143,7 +154,12 @@ public TSStatus startPipe(String pipeName) {
/** Caller should ensure that the method is called in the lock {@link #lock()}. */
public TSStatus stopPipe(String pipeName) {
final boolean isStoppedByRuntimeException = pipeTaskInfo.isStoppedByRuntimeException(pipeName);
final TSStatus status = configManager.getProcedureManager().stopPipe(pipeName);
TSStatus status = null;
if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
status = configManager.getProcedureManager().stopConsensusPipe(pipeName);
} else {
status = configManager.getProcedureManager().stopPipe(pipeName);
}
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
if (isStoppedByRuntimeException) {
// Even if the return status is success, it doesn't imply the success of the
Expand All @@ -164,7 +180,12 @@ public TSStatus stopPipe(String pipeName) {
public TSStatus dropPipe(TDropPipeReq req) {
final String pipeName = req.getPipeName();
final boolean isPipeExistedBeforeDrop = pipeTaskInfo.isPipeExisted(pipeName);
final TSStatus status = configManager.getProcedureManager().dropPipe(pipeName);
TSStatus status = null;
if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
status = configManager.getProcedureManager().dropConsensusPipe(pipeName);
} else {
status = configManager.getProcedureManager().dropPipe(pipeName);
}
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.warn("Failed to drop pipe {}. Result status: {}.", pipeName, status);
}
Expand Down

0 comments on commit 7852cb6

Please sign in to comment.