Skip to content

Commit

Permalink
PipeConsensus: Support deletion operation for PipeConsensus (#13016)
Browse files Browse the repository at this point in the history
Co-authored-by: Steve Yurong Su <[email protected]>
  • Loading branch information
Pengzna and SteveYurongSu authored Oct 18, 2024
1 parent 0d8ad0c commit 4a1b083
Show file tree
Hide file tree
Showing 66 changed files with 2,872 additions and 532 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,14 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_GROUP_ID_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_INCLUSION_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_KEY;

public class ConsensusPipeManager {
// Extract data.insert and data.delete to support deletion.
private static final String CONSENSUS_EXTRACTOR_INCLUSION_VALUE = "data";
private final PipeConsensusConfig.Pipe config;
private final ReplicateMode replicateMode;
private final ConsensusPipeDispatcher dispatcher;
Expand All @@ -61,6 +64,7 @@ public void createConsensusPipe(Peer senderPeer, Peer receiverPeer) throws Excep
consensusPipeName.toString(),
ImmutableMap.<String, String>builder()
.put(EXTRACTOR_KEY, config.getExtractorPluginName())
.put(EXTRACTOR_INCLUSION_KEY, CONSENSUS_EXTRACTOR_INCLUSION_VALUE)
.put(
EXTRACTOR_CONSENSUS_GROUP_ID_KEY,
consensusPipeName.getConsensusGroupId().toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import static org.apache.iotdb.commons.conf.IoTDBConstant.CONSENSUS_FOLDER_NAME;
import static org.apache.iotdb.commons.conf.IoTDBConstant.DELETION_FOLDER_NAME;
import static org.apache.iotdb.commons.conf.IoTDBConstant.OBJECT_STORAGE_DIR;
import static org.apache.iotdb.commons.conf.IoTDBConstant.PIPE_FOLDER_NAME;
import static org.apache.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;

public class IoTDBConfig {
Expand Down Expand Up @@ -1144,6 +1147,14 @@ public class IoTDBConfig {
private int iotConsensusV2PipelineSize = 5;
private String iotConsensusV2Mode = ConsensusFactory.IOT_CONSENSUS_V2_BATCH_MODE;
private String[] iotConsensusV2ReceiverFileDirs = new String[0];
private String iotConsensusV2DeletionFileDir =
systemDir
+ File.separator
+ PIPE_FOLDER_NAME
+ File.separator
+ CONSENSUS_FOLDER_NAME
+ File.separator
+ DELETION_FOLDER_NAME;

/** Load related */
private double maxAllocateMemoryRatioForLoad = 0.8;
Expand Down Expand Up @@ -1348,6 +1359,7 @@ private void formulateFolders() {
systemDir = addDataHomeDir(systemDir);
schemaDir = addDataHomeDir(schemaDir);
consensusDir = addDataHomeDir(consensusDir);
iotConsensusV2DeletionFileDir = addDataHomeDir(iotConsensusV2DeletionFileDir);
dataRegionConsensusDir = addDataHomeDir(dataRegionConsensusDir);
ratisDataRegionSnapshotDir = addDataHomeDir(ratisDataRegionSnapshotDir);
schemaRegionConsensusDir = addDataHomeDir(schemaRegionConsensusDir);
Expand All @@ -1370,6 +1382,7 @@ private void formulateFolders() {
for (int i = 0; i < iotConsensusV2ReceiverFileDirs.length; i++) {
iotConsensusV2ReceiverFileDirs[i] = addDataHomeDir(iotConsensusV2ReceiverFileDirs[i]);
}
iotConsensusV2DeletionFileDir = addDataHomeDir(iotConsensusV2DeletionFileDir);
mqttDir = addDataHomeDir(mqttDir);
extPipeDir = addDataHomeDir(extPipeDir);
queryDir = addDataHomeDir(queryDir);
Expand Down Expand Up @@ -1547,6 +1560,14 @@ public void setSystemDir(String systemDir) {
this.systemDir = systemDir;
}

public String getIotConsensusV2DeletionFileDir() {
return iotConsensusV2DeletionFileDir;
}

public void setIotConsensusV2DeletionFileDir(String iotConsensusV2DeletionFileDir) {
this.iotConsensusV2DeletionFileDir = iotConsensusV2DeletionFileDir;
}

public String[] getLoadTsFileDirs() {
return this.loadTsFileDirs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2536,6 +2536,10 @@ private void loadPipeProps(Properties properties) {
.split(","))
.filter(dir -> !dir.isEmpty())
.toArray(String[]::new));

conf.setIotConsensusV2DeletionFileDir(
properties.getProperty(
"iot_consensus_v2_deletion_file_dir", conf.getIotConsensusV2DeletionFileDir()));
}

private void loadCQProps(Properties properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.iotdb.db.pipe.consensus.ConsensusPipeDataNodeDispatcher;
import org.apache.iotdb.db.pipe.consensus.ConsensusPipeDataNodeRuntimeAgentGuardian;
import org.apache.iotdb.db.pipe.consensus.ProgressIndexDataNodeManager;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;

Expand Down Expand Up @@ -82,6 +83,7 @@ private static class DataRegionConsensusImplHolder {
static {
reinitializeStatics();
PipeDataNodeAgent.receiver().pipeConsensus().initConsensusInRuntime();
DeletionResourceManager.build();
}

private static void reinitializeStatics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.WriteProcessRejectException;
import org.apache.iotdb.db.exception.query.OutOfTTLException;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
Expand Down Expand Up @@ -252,18 +251,12 @@ public TSStatus visitDeleteData(DeleteDataNode node, DataRegion dataRegion) {
"now try to delete directly, databasePath: {}, deletePath:{}",
databaseToDelete.getFullPath(),
path.getFullPath());
dataRegion.deleteDataDirectly(
databaseToDelete,
node.getDeleteStartTime(),
node.getDeleteEndTime(),
node.getSearchIndex());
dataRegion.deleteDataDirectly(databaseToDelete, node);
} else {
dataRegion.deleteByDevice(
path, node.getDeleteStartTime(), node.getDeleteEndTime(), node.getSearchIndex());
dataRegion.deleteByDevice(path, node);
}
}
dataRegion.insertSeparatorToWAL();
PipeInsertionDataNodeListener.getInstance().listenToDeleteData(node);
return StatusUtils.OK;
} catch (IOException | IllegalPathException e) {
LOGGER.error("Error in executing plan node: {}", node, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,8 @@ public void assignProgressIndexForTsFileLoad(TsFileResource tsFileResource) {
}

public RecoverProgressIndex getNextProgressIndexForTsFileLoad() {
// The data node id must be negative. Because in the consensus layer Pipe, the processor of the
// pipe task will filter the progress index of the event. Leaving the recover progress index
// containing the datanode id of the datanode where the pipe task is located. The tsfile loaded
// in cannot be filtered by the processor of the datanode where the pipe task is located,
// because the load directly sends the tsfile to all replicas. Therefore, it is necessary to
// ensure that the datanode id generated by the load is negative.
return new RecoverProgressIndex(
-DATA_NODE_ID, simpleProgressIndexAssigner.getSimpleProgressIndex());
DATA_NODE_ID, simpleProgressIndexAssigner.getSimpleProgressIndex());
}

////////////////////// Recover ProgressIndex Assigner //////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,14 @@

import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
Expand Down Expand Up @@ -78,12 +75,8 @@ public void collect(final Event event) {
parseAndCollectEvent((PipeRawTabletInsertionEvent) event);
} else if (event instanceof PipeTsFileInsertionEvent) {
parseAndCollectEvent((PipeTsFileInsertionEvent) event);
} else if (event instanceof PipeSchemaRegionWritePlanEvent
&& ((PipeSchemaRegionWritePlanEvent) event).getPlanNode().getType()
== PlanNodeType.DELETE_DATA) {
// This is only for delete data node in data region since plan nodes in schema regions are
// already parsed in schema region extractor
parseAndCollectEvent((PipeSchemaRegionWritePlanEvent) event);
} else if (event instanceof PipeDeleteDataNodeEvent) {
collectEvent(event);
} else if (!(event instanceof ProgressReportEvent)) {
collectEvent(event);
}
Expand Down Expand Up @@ -145,28 +138,6 @@ private void collectParsedRawTableEvent(final PipeRawTabletInsertionEvent parsed
}
}

private void parseAndCollectEvent(final PipeSchemaRegionWritePlanEvent deleteDataEvent) {
// Only used by events containing delete data node, no need to bind progress index here since
// delete data event does not have progress index currently
IoTDBSchemaRegionExtractor.PATTERN_PARSE_VISITOR
.process(deleteDataEvent.getPlanNode(), (IoTDBTreePattern) deleteDataEvent.getTreePattern())
.map(
planNode ->
new PipeSchemaRegionWritePlanEvent(
planNode,
deleteDataEvent.getPipeName(),
deleteDataEvent.getCreationTime(),
deleteDataEvent.getPipeTaskMeta(),
deleteDataEvent.getTreePattern(),
deleteDataEvent.getTablePattern(),
deleteDataEvent.isGeneratedByPipe()))
.ifPresent(
event -> {
hasNoGeneratedEvent = false;
collectEvent(event);
});
}

private void collectEvent(final Event event) {
if (event instanceof EnrichedEvent) {
if (!((EnrichedEvent) event).increaseReferenceCount(PipeEventCollector.class.getName())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.iotdb.db.pipe.connector.protocol.airgap;

import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferHandshakeConstant;
Expand All @@ -29,11 +28,7 @@
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferPlanNodeReq;
import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -108,43 +103,4 @@ protected byte[] generateHandShakeV2Payload() throws IOException {

return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params);
}

protected void doTransferWrapper(
final AirGapSocket socket,
final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent)
throws PipeException, IOException {
// We increase the reference count for this event to determine if the event may be released.
if (!pipeSchemaRegionWritePlanEvent.increaseReferenceCount(
IoTDBDataNodeAirGapConnector.class.getName())) {
return;
}
try {
doTransfer(socket, pipeSchemaRegionWritePlanEvent);
} finally {
pipeSchemaRegionWritePlanEvent.decreaseReferenceCount(
IoTDBDataNodeAirGapConnector.class.getName(), false);
}
}

private void doTransfer(
final AirGapSocket socket,
final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent)
throws PipeException, IOException {
if (!send(
pipeSchemaRegionWritePlanEvent.getPipeName(),
pipeSchemaRegionWritePlanEvent.getCreationTime(),
socket,
PipeTransferPlanNodeReq.toTPipeTransferBytes(
pipeSchemaRegionWritePlanEvent.getPlanNode()))) {
final String errorMessage =
String.format(
"Transfer data node write plan %s error. Socket: %s.",
pipeSchemaRegionWritePlanEvent.getPlanNode().getType(), socket);
receiverStatusHandler.handle(
new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(errorMessage),
errorMessage,
pipeSchemaRegionWritePlanEvent.toString());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferPlanNodeReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReqV2;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReqV2;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReqV2;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
Expand Down Expand Up @@ -127,8 +128,8 @@ public void transfer(final Event event) throws Exception {
final AirGapSocket socket = sockets.get(socketIndex);

try {
if (event instanceof PipeSchemaRegionWritePlanEvent) {
doTransferWrapper(socket, (PipeSchemaRegionWritePlanEvent) event);
if (event instanceof PipeDeleteDataNodeEvent) {
doTransferWrapper(socket, (PipeDeleteDataNodeEvent) event);
} else if (!(event instanceof PipeHeartbeatEvent || event instanceof PipeTerminateEvent)) {
LOGGER.warn(
"IoTDBDataRegionAirGapConnector does not support transferring generic event: {}.",
Expand All @@ -140,11 +141,48 @@ public void transfer(final Event event) throws Exception {
throw new PipeConnectionException(
String.format(
"Network error when transfer tsfile event %s, because %s.",
((PipeSchemaRegionWritePlanEvent) event).coreReportMessage(), e.getMessage()),
((PipeDeleteDataNodeEvent) event).coreReportMessage(), e.getMessage()),
e);
}
}

private void doTransferWrapper(
final AirGapSocket socket, final PipeDeleteDataNodeEvent pipeDeleteDataNodeEvent)
throws PipeException, IOException {
// We increase the reference count for this event to determine if the event may be released.
if (!pipeDeleteDataNodeEvent.increaseReferenceCount(
IoTDBDataNodeAirGapConnector.class.getName())) {
return;
}
try {
doTransfer(socket, pipeDeleteDataNodeEvent);
} finally {
pipeDeleteDataNodeEvent.decreaseReferenceCount(
IoTDBDataNodeAirGapConnector.class.getName(), false);
}
}

private void doTransfer(
final AirGapSocket socket, final PipeDeleteDataNodeEvent pipeDeleteDataNodeEvent)
throws PipeException, IOException {
if (!send(
pipeDeleteDataNodeEvent.getPipeName(),
pipeDeleteDataNodeEvent.getCreationTime(),
socket,
PipeTransferPlanNodeReq.toTPipeTransferBytes(
pipeDeleteDataNodeEvent.getDeleteDataNode()))) {
final String errorMessage =
String.format(
"Transfer deletion %s error. Socket: %s.",
pipeDeleteDataNodeEvent.getDeleteDataNode().getType(), socket);
receiverStatusHandler.handle(
new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(errorMessage),
errorMessage,
pipeDeleteDataNodeEvent.toString());
}
}

private void doTransferWrapper(
final AirGapSocket socket,
final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent)
Expand Down
Loading

0 comments on commit 4a1b083

Please sign in to comment.