Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PipeConsensus: Support deletion operation for PipeConsensus #13016

Merged
merged 98 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from 83 commits
Commits
Show all changes
98 commits
Select commit Hold shift + click to select a range
79596ec
complete deletion support for pipeconsensus, todo: deletion GC and hi…
Pengzna Jul 24, 2024
ac53ff6
support transfer and receive of deletion
Pengzna Jul 24, 2024
1271ce5
fix ut fail
Pengzna Jul 24, 2024
d876131
minor fix WAL close time
Pengzna Aug 30, 2024
b0aecb8
prepare for deletion ahead log
Pengzna Aug 30, 2024
35d893d
sync, serialize and recover for deletion ahead log
Pengzna Aug 30, 2024
17aaa74
remove useless
Pengzna Aug 30, 2024
464191b
fix
Pengzna Aug 30, 2024
e66ed10
fix
Pengzna Aug 30, 2024
8d32e06
historical extraction for deletion
Pengzna Aug 30, 2024
5b85c76
add config of deletion dirs
Pengzna Aug 31, 2024
eff3507
add base ut
Pengzna Aug 31, 2024
47a335d
fix deletion read
Pengzna Sep 1, 2024
56f5c55
fix deserialize logic
Pengzna Sep 1, 2024
4c45c33
add recover ut
Pengzna Sep 1, 2024
ee9c40a
remove duplicate
Pengzna Sep 1, 2024
d115cdc
remove ut
Pengzna Sep 1, 2024
21252bd
Merge remote-tracking branch 'refs/remotes/base/master' into deletion
Pengzna Sep 1, 2024
bc14898
fix conflict
Pengzna Sep 1, 2024
13bbbc0
spotless
Pengzna Sep 1, 2024
f57b8a9
avoid Concurrent issue
Pengzna Sep 2, 2024
464c136
fix concurrent issue
Pengzna Sep 2, 2024
3f629f1
fix
Pengzna Sep 2, 2024
1468eab
fix
Pengzna Sep 2, 2024
8a47b93
deletion close
Pengzna Sep 2, 2024
848c82d
add log
Pengzna Sep 2, 2024
fa0780c
format
Pengzna Sep 2, 2024
edca176
spotless
Pengzna Sep 2, 2024
19a4471
fix deserialize
Pengzna Sep 2, 2024
2262724
keep config consistent with wal
Pengzna Sep 3, 2024
e60bea6
fix review
Pengzna Sep 5, 2024
360637a
introduce page cache version DAL and fix review
Pengzna Sep 10, 2024
d8743fc
remove and recover for new version of DAL
Pengzna Sep 10, 2024
19216ca
historical data process
Pengzna Sep 10, 2024
7835d8b
add ut
Pengzna Sep 10, 2024
f457fe7
Merge remote-tracking branch 'refs/remotes/base/master' into deletion
Pengzna Sep 10, 2024
cadbaab
merge master
Pengzna Sep 10, 2024
baa71f8
fix bug and ut
Pengzna Sep 11, 2024
e251c43
remove metadata(deletionNum)
Pengzna Sep 11, 2024
afe925b
Merge remote-tracking branch 'refs/remotes/base/master' into deletion
Pengzna Sep 11, 2024
c3d3460
merge
Pengzna Sep 11, 2024
fb8a029
handle corrupted file
Pengzna Sep 11, 2024
30f7956
add ut
Pengzna Sep 11, 2024
b514c97
add license
Pengzna Sep 11, 2024
56ff4bc
retrigger ci
Pengzna Sep 12, 2024
dc30eee
Merge remote-tracking branch 'refs/remotes/base/master' into deletion
Pengzna Sep 13, 2024
3c7d48d
fix review
Pengzna Sep 14, 2024
b4de6d8
fix review
Pengzna Sep 18, 2024
b7e9ca3
introduce PipeDeleteDataNodeEvent & fix review
Pengzna Sep 19, 2024
434f103
fix review: bind pipe task's corresponding event for deletionResource
Pengzna Sep 19, 2024
7e94141
fix review: deletion will only be assigned to corresponding dataRegio…
Pengzna Sep 19, 2024
e13b3e7
fix review
Pengzna Sep 20, 2024
535150e
fix
Pengzna Sep 20, 2024
0b39c8b
feat: timout sync
Pengzna Sep 21, 2024
4045de7
todo: priority queue sort
Pengzna Sep 23, 2024
22a3ca6
fix ut
Pengzna Sep 23, 2024
8bb6cc9
support deletion for consensus pipe
Pengzna Sep 23, 2024
1508215
support new deletion event and replace schemaEvent
Pengzna Sep 23, 2024
f1d052c
Merge remote-tracking branch 'refs/remotes/base/master' into deletion
Pengzna Sep 23, 2024
872f135
spotless
Pengzna Sep 23, 2024
587c7df
rename pipeConsensus to iotV2
Pengzna Sep 23, 2024
85fef64
fix: reference management for deletionResource
Pengzna Sep 25, 2024
7062c37
fix: reference management for deletionResource
Pengzna Sep 25, 2024
127e02b
fix: ut
Pengzna Sep 25, 2024
486ec81
add log
Pengzna Sep 25, 2024
845c196
fix ut
Pengzna Sep 25, 2024
ec11267
fix ut
Pengzna Sep 27, 2024
d6daaab
fix review
Pengzna Sep 29, 2024
e7b64d2
delete TwoStageDeletionBuffer.java
Pengzna Sep 29, 2024
f65fcde
fix: load tsFile's progress index datanode_id can be positive
Pengzna Sep 29, 2024
4121175
fix ut
Pengzna Sep 30, 2024
80e9191
refactor
SteveYurongSu Sep 30, 2024
3be53ab
Merge branch 'master' of github.com:apache/iotdb into pr/13016
SteveYurongSu Sep 30, 2024
004663e
Update PipeDataRegionAssigner.java
SteveYurongSu Sep 30, 2024
1fb8853
add comment
Pengzna Sep 30, 2024
0f6858c
Merge remote-tracking branch 'refs/remotes/origin/deletion' into dele…
Pengzna Sep 30, 2024
26443e8
Update PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
SteveYurongSu Sep 30, 2024
8d49608
Merge branch 'deletion' of https://github.com/Pengzna/iotdb into pr/1…
SteveYurongSu Sep 30, 2024
fa2a329
Update PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
SteveYurongSu Sep 30, 2024
f194928
Update PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
SteveYurongSu Sep 30, 2024
bae8bfe
Update PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
SteveYurongSu Sep 30, 2024
a9d13fe
Update PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
SteveYurongSu Sep 30, 2024
fe255f9
Update PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
SteveYurongSu Sep 30, 2024
d93ec4c
fix review
Pengzna Oct 8, 2024
86d2de5
Merge remote-tracking branch 'refs/remotes/origin/deletion' into dele…
Pengzna Oct 8, 2024
a8d51d2
merge
Pengzna Oct 8, 2024
bf7982e
fix review: manage deletion resource's reference count when restart/h…
Pengzna Oct 11, 2024
e835d84
Merge remote-tracking branch 'refs/remotes/base/master' into deletion
Pengzna Oct 11, 2024
01d3cb9
adjust table model for PipeDeleteDataNodeEvent & spotless
Pengzna Oct 11, 2024
a3368c5
fix review: improve ut and reference count
Pengzna Oct 12, 2024
6c3602d
fix review: delete DAL when time is proper
Pengzna Oct 16, 2024
e931369
Merge remote-tracking branch 'refs/remotes/base/master' into deletion
Pengzna Oct 16, 2024
97b699d
merge remote
Pengzna Oct 16, 2024
6d51a19
Merge remote-tracking branch 'refs/remotes/base/master' into deletion
Pengzna Oct 18, 2024
ff3c9fc
merge remote
Pengzna Oct 18, 2024
86faeca
Update PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
SteveYurongSu Oct 18, 2024
6fe76f9
Merge branch 'master' of https://github.com/apache/iotdb into pr/13016
SteveYurongSu Oct 18, 2024
47813af
resolve conflicts
SteveYurongSu Oct 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,14 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_GROUP_ID_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_INCLUSION_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_KEY;

public class ConsensusPipeManager {
// Extract data.insert and data.delete to support deletion.
private static final String CONSENSUS_EXTRACTOR_INCLUSION_VALUE = "data";
private final PipeConsensusConfig.Pipe config;
private final ReplicateMode replicateMode;
private final ConsensusPipeDispatcher dispatcher;
Expand All @@ -61,6 +64,7 @@ public void createConsensusPipe(Peer senderPeer, Peer receiverPeer) throws Excep
consensusPipeName.toString(),
ImmutableMap.<String, String>builder()
.put(EXTRACTOR_KEY, config.getExtractorPluginName())
.put(EXTRACTOR_INCLUSION_KEY, CONSENSUS_EXTRACTOR_INCLUSION_VALUE)
.put(
EXTRACTOR_CONSENSUS_GROUP_ID_KEY,
consensusPipeName.getConsensusGroupId().toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import static org.apache.iotdb.commons.conf.IoTDBConstant.CONSENSUS_FOLDER_NAME;
import static org.apache.iotdb.commons.conf.IoTDBConstant.DELETION_FOLDER_NAME;
import static org.apache.iotdb.commons.conf.IoTDBConstant.OBJECT_STORAGE_DIR;
import static org.apache.iotdb.commons.conf.IoTDBConstant.PIPE_FOLDER_NAME;
import static org.apache.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;

public class IoTDBConfig {
Expand Down Expand Up @@ -1136,6 +1139,14 @@ public class IoTDBConfig {
private int iotConsensusV2PipelineSize = 5;
private String iotConsensusV2Mode = ConsensusFactory.IOT_CONSENSUS_V2_BATCH_MODE;
private String[] iotConsensusV2ReceiverFileDirs = new String[0];
private String iotConsensusV2DeletionFileDir =
systemDir
+ File.separator
+ PIPE_FOLDER_NAME
+ File.separator
+ CONSENSUS_FOLDER_NAME
+ File.separator
+ DELETION_FOLDER_NAME;

/** Load related */
private double maxAllocateMemoryRatioForLoad = 0.8;
Expand Down Expand Up @@ -1340,6 +1351,7 @@ private void formulateFolders() {
systemDir = addDataHomeDir(systemDir);
schemaDir = addDataHomeDir(schemaDir);
consensusDir = addDataHomeDir(consensusDir);
iotConsensusV2DeletionFileDir = addDataHomeDir(iotConsensusV2DeletionFileDir);
Pengzna marked this conversation as resolved.
Show resolved Hide resolved
dataRegionConsensusDir = addDataHomeDir(dataRegionConsensusDir);
ratisDataRegionSnapshotDir = addDataHomeDir(ratisDataRegionSnapshotDir);
schemaRegionConsensusDir = addDataHomeDir(schemaRegionConsensusDir);
Expand All @@ -1362,6 +1374,7 @@ private void formulateFolders() {
for (int i = 0; i < iotConsensusV2ReceiverFileDirs.length; i++) {
iotConsensusV2ReceiverFileDirs[i] = addDataHomeDir(iotConsensusV2ReceiverFileDirs[i]);
}
iotConsensusV2DeletionFileDir = addDataHomeDir(iotConsensusV2DeletionFileDir);
mqttDir = addDataHomeDir(mqttDir);
extPipeDir = addDataHomeDir(extPipeDir);
queryDir = addDataHomeDir(queryDir);
Expand Down Expand Up @@ -1539,6 +1552,14 @@ public void setSystemDir(String systemDir) {
this.systemDir = systemDir;
}

public String getIotConsensusV2DeletionFileDir() {
return iotConsensusV2DeletionFileDir;
}

public void setIotConsensusV2DeletionFileDir(String iotConsensusV2DeletionFileDir) {
this.iotConsensusV2DeletionFileDir = iotConsensusV2DeletionFileDir;
}

public String[] getLoadTsFileDirs() {
return this.loadTsFileDirs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2503,6 +2503,10 @@ private void loadPipeProps(Properties properties) {
.split(","))
.filter(dir -> !dir.isEmpty())
.toArray(String[]::new));

conf.setIotConsensusV2DeletionFileDir(
properties.getProperty(
"iot_consensus_v2_deletion_file_dir", conf.getIotConsensusV2DeletionFileDir()));
}

private void loadCQProps(Properties properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.iotdb.db.pipe.consensus.ConsensusPipeDataNodeDispatcher;
import org.apache.iotdb.db.pipe.consensus.ConsensusPipeDataNodeRuntimeAgentGuardian;
import org.apache.iotdb.db.pipe.consensus.ProgressIndexDataNodeManager;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;

Expand Down Expand Up @@ -82,6 +83,7 @@ private static class DataRegionConsensusImplHolder {
static {
reinitializeStatics();
PipeDataNodeAgent.receiver().pipeConsensus().initConsensusInRuntime();
DeletionResourceManager.build();
}

private static void reinitializeStatics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.WriteProcessRejectException;
import org.apache.iotdb.db.exception.query.OutOfTTLException;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
Expand Down Expand Up @@ -249,18 +248,12 @@ public TSStatus visitDeleteData(DeleteDataNode node, DataRegion dataRegion) {
"now try to delete directly, databasePath: {}, deletePath:{}",
databaseToDelete.getFullPath(),
path.getFullPath());
dataRegion.deleteDataDirectly(
databaseToDelete,
node.getDeleteStartTime(),
node.getDeleteEndTime(),
node.getSearchIndex());
dataRegion.deleteDataDirectly(databaseToDelete, node);
} else {
dataRegion.deleteByDevice(
path, node.getDeleteStartTime(), node.getDeleteEndTime(), node.getSearchIndex());
dataRegion.deleteByDevice(path, node);
}
}
dataRegion.insertSeparatorToWAL();
PipeInsertionDataNodeListener.getInstance().listenToDeleteData(node);
return StatusUtils.OK;
} catch (IOException | IllegalPathException e) {
LOGGER.error("Error in executing plan node: {}", node, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public RecoverProgressIndex getNextProgressIndexForTsFileLoad() {
// because the load directly sends the tsfile to all replicas. Therefore, it is necessary to
// ensure that the datanode id generated by the load is negative.
return new RecoverProgressIndex(
-DATA_NODE_ID, simpleProgressIndexAssigner.getSimpleProgressIndex());
DATA_NODE_ID, simpleProgressIndexAssigner.getSimpleProgressIndex());
SteveYurongSu marked this conversation as resolved.
Show resolved Hide resolved
}

////////////////////// Recover ProgressIndex Assigner //////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,14 @@

import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.pipe.api.collector.EventCollector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
Expand Down Expand Up @@ -78,12 +75,8 @@ public void collect(final Event event) {
parseAndCollectEvent((PipeRawTabletInsertionEvent) event);
} else if (event instanceof PipeTsFileInsertionEvent) {
parseAndCollectEvent((PipeTsFileInsertionEvent) event);
} else if (event instanceof PipeSchemaRegionWritePlanEvent
&& ((PipeSchemaRegionWritePlanEvent) event).getPlanNode().getType()
== PlanNodeType.DELETE_DATA) {
// This is only for delete data node in data region since plan nodes in schema regions are
// already parsed in schema region extractor
parseAndCollectEvent((PipeSchemaRegionWritePlanEvent) event);
} else if (event instanceof PipeDeleteDataNodeEvent) {
collectEvent(event);
} else if (!(event instanceof ProgressReportEvent)) {
collectEvent(event);
}
Expand Down Expand Up @@ -141,27 +134,6 @@ private void collectParsedRawTableEvent(final PipeRawTabletInsertionEvent parsed
}
}

private void parseAndCollectEvent(final PipeSchemaRegionWritePlanEvent deleteDataEvent) {
// Only used by events containing delete data node, no need to bind progress index here since
// delete data event does not have progress index currently
IoTDBSchemaRegionExtractor.PATTERN_PARSE_VISITOR
.process(deleteDataEvent.getPlanNode(), (IoTDBPipePattern) deleteDataEvent.getPipePattern())
.map(
planNode ->
new PipeSchemaRegionWritePlanEvent(
planNode,
deleteDataEvent.getPipeName(),
deleteDataEvent.getCreationTime(),
deleteDataEvent.getPipeTaskMeta(),
deleteDataEvent.getPipePattern(),
deleteDataEvent.isGeneratedByPipe()))
.ifPresent(
event -> {
hasNoGeneratedEvent = false;
collectEvent(event);
});
}

private void collectEvent(final Event event) {
if (event instanceof EnrichedEvent) {
if (!((EnrichedEvent) event).increaseReferenceCount(PipeEventCollector.class.getName())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.iotdb.db.pipe.connector.protocol.airgap;

import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferHandshakeConstant;
Expand All @@ -29,11 +28,7 @@
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferPlanNodeReq;
import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -108,43 +103,4 @@ protected byte[] generateHandShakeV2Payload() throws IOException {

return PipeTransferDataNodeHandshakeV2Req.toTPipeTransferBytes(params);
}

protected void doTransferWrapper(
final AirGapSocket socket,
final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent)
throws PipeException, IOException {
// We increase the reference count for this event to determine if the event may be released.
if (!pipeSchemaRegionWritePlanEvent.increaseReferenceCount(
IoTDBDataNodeAirGapConnector.class.getName())) {
return;
}
try {
doTransfer(socket, pipeSchemaRegionWritePlanEvent);
} finally {
pipeSchemaRegionWritePlanEvent.decreaseReferenceCount(
IoTDBDataNodeAirGapConnector.class.getName(), false);
}
}

private void doTransfer(
final AirGapSocket socket,
final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent)
throws PipeException, IOException {
if (!send(
pipeSchemaRegionWritePlanEvent.getPipeName(),
pipeSchemaRegionWritePlanEvent.getCreationTime(),
socket,
PipeTransferPlanNodeReq.toTPipeTransferBytes(
pipeSchemaRegionWritePlanEvent.getPlanNode()))) {
final String errorMessage =
String.format(
"Transfer data node write plan %s error. Socket: %s.",
pipeSchemaRegionWritePlanEvent.getPlanNode().getType(), socket);
receiverStatusHandler.handle(
new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(errorMessage),
errorMessage,
pipeSchemaRegionWritePlanEvent.toString());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferPlanNodeReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
Expand Down Expand Up @@ -128,8 +129,8 @@ public void transfer(final Event event) throws Exception {
final AirGapSocket socket = sockets.get(socketIndex);

try {
if (event instanceof PipeSchemaRegionWritePlanEvent) {
doTransferWrapper(socket, (PipeSchemaRegionWritePlanEvent) event);
if (event instanceof PipeDeleteDataNodeEvent) {
doTransferWrapper(socket, (PipeDeleteDataNodeEvent) event);
} else if (!(event instanceof PipeHeartbeatEvent || event instanceof PipeTerminateEvent)) {
LOGGER.warn(
"IoTDBDataRegionAirGapConnector does not support transferring generic event: {}.",
Expand All @@ -141,11 +142,48 @@ public void transfer(final Event event) throws Exception {
throw new PipeConnectionException(
String.format(
"Network error when transfer tsfile event %s, because %s.",
((PipeSchemaRegionWritePlanEvent) event).coreReportMessage(), e.getMessage()),
((PipeDeleteDataNodeEvent) event).coreReportMessage(), e.getMessage()),
e);
}
}

private void doTransferWrapper(
final AirGapSocket socket, final PipeDeleteDataNodeEvent pipeDeleteDataNodeEvent)
throws PipeException, IOException {
// We increase the reference count for this event to determine if the event may be released.
if (!pipeDeleteDataNodeEvent.increaseReferenceCount(
IoTDBDataNodeAirGapConnector.class.getName())) {
return;
}
try {
doTransfer(socket, pipeDeleteDataNodeEvent);
} finally {
pipeDeleteDataNodeEvent.decreaseReferenceCount(
IoTDBDataNodeAirGapConnector.class.getName(), false);
}
}

private void doTransfer(
final AirGapSocket socket, final PipeDeleteDataNodeEvent pipeDeleteDataNodeEvent)
throws PipeException, IOException {
if (!send(
pipeDeleteDataNodeEvent.getPipeName(),
pipeDeleteDataNodeEvent.getCreationTime(),
socket,
PipeTransferPlanNodeReq.toTPipeTransferBytes(
pipeDeleteDataNodeEvent.getDeleteDataNode()))) {
final String errorMessage =
String.format(
"Transfer deletion %s error. Socket: %s.",
pipeDeleteDataNodeEvent.getDeleteDataNode().getType(), socket);
receiverStatusHandler.handle(
new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(errorMessage),
errorMessage,
pipeDeleteDataNodeEvent.toString());
}
}

private void doTransferWrapper(
final AirGapSocket socket,
final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent)
Expand Down
Loading
Loading