Commit

support table model deletion for iotv2
Pengzna committed Nov 21, 2024
1 parent a181a74 commit 4152d34
Showing 14 changed files with 245 additions and 131 deletions.

File: PipeConsensusDeleteNodeReq.java
@@ -26,6 +26,7 @@
import org.apache.iotdb.consensus.pipe.thrift.TCommitId;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;

import org.apache.tsfile.utils.PublicBAOS;
@@ -39,20 +40,20 @@

public class PipeConsensusDeleteNodeReq extends TPipeConsensusTransferReq {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeConsensusDeleteNodeReq.class);
private transient DeleteDataNode deleteDataNode;
private transient AbstractDeleteDataNode deleteDataNode;

private PipeConsensusDeleteNodeReq() {
// Do nothing
}

public DeleteDataNode getDeleteDataNode() {
public AbstractDeleteDataNode getDeleteDataNode() {
return deleteDataNode;
}

/////////////////////////////// Thrift ///////////////////////////////

public static PipeConsensusDeleteNodeReq toTPipeConsensusTransferReq(
DeleteDataNode deleteDataNode,
AbstractDeleteDataNode deleteDataNode,
TCommitId commitId,
TConsensusGroupId consensusGroupId,
ProgressIndex progressIndex,

File: DeletionResource.java
@@ -27,7 +27,7 @@
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +47,7 @@ public class DeletionResource implements PersistentResource {
private static final Logger LOGGER = LoggerFactory.getLogger(DeletionResource.class);
private final Consumer<DeletionResource> removeHook;
private final AtomicInteger pipeTaskReferenceCount;
private final DeleteDataNode deleteDataNode;
private final AbstractDeleteDataNode deleteDataNode;
private final ConsensusGroupId consensusGroupId;
private volatile Status currentStatus;

@@ -56,7 +56,9 @@ public class DeletionResource implements PersistentResource {
private volatile Exception cause;

public DeletionResource(
DeleteDataNode deleteDataNode, Consumer<DeletionResource> removeHook, String regionId) {
AbstractDeleteDataNode deleteDataNode,
Consumer<DeletionResource> removeHook,
String regionId) {
this.deleteDataNode = deleteDataNode;
this.removeHook = removeHook;
this.currentStatus = Status.RUNNING;
@@ -135,7 +137,7 @@ public long getFileEndTime() {
return 0;
}

public DeleteDataNode getDeleteDataNode() {
public AbstractDeleteDataNode getDeleteDataNode() {
return deleteDataNode;
}

@@ -147,13 +149,16 @@ public ByteBuffer serialize() {
}

public static DeletionResource deserialize(
final ByteBuffer buffer, final String regionId, final Consumer<DeletionResource> removeHook)
final ByteBuffer buffer,
final String regionId,
final Consumer<DeletionResource> removeHook,
final boolean isRelational)
throws IOException {
DeleteDataNode node = DeleteDataNode.deserializeFromDAL(buffer);
AbstractDeleteDataNode node = AbstractDeleteDataNode.deserializeFromDAL(buffer, isRelational);
return new DeletionResource(node, removeHook, regionId);
}

public static boolean isDeleteNodeGeneratedInLocalByIoTV2(DeleteDataNode node) {
public static boolean isDeleteNodeGeneratedInLocalByIoTV2(AbstractDeleteDataNode node) {
if (node.getProgressIndex() instanceof RecoverProgressIndex) {
RecoverProgressIndex recoverProgressIndex = (RecoverProgressIndex) node.getProgressIndex();
return recoverProgressIndex
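The widened `deserialize` above threads an `isRelational` flag into `AbstractDeleteDataNode.deserializeFromDAL`. A minimal sketch of how that dispatch could look, assuming a table-model counterpart to the pre-existing tree-model `DeleteDataNode.deserializeFromDAL` (the counterpart is not shown in this diff):

```java
// Illustrative sketch only, not the patch itself: route DAL deserialization by
// data model. DeleteDataNode.deserializeFromDAL existed before this change; the
// RelationalDeleteDataNode counterpart is assumed here for illustration.
public static AbstractDeleteDataNode deserializeFromDAL(
    final ByteBuffer buffer, final boolean isRelational) {
  return isRelational
      ? RelationalDeleteDataNode.deserializeFromDAL(buffer) // table model (assumed)
      : DeleteDataNode.deserializeFromDAL(buffer); // tree model
}
```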

File: DeletionResourceManager.java
@@ -19,6 +19,7 @@

package org.apache.iotdb.db.pipe.consensus.deletion;

import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
import org.apache.iotdb.commons.utils.FileUtils;
@@ -30,7 +31,8 @@
import org.apache.iotdb.db.pipe.consensus.deletion.persist.DeletionBuffer;
import org.apache.iotdb.db.pipe.consensus.deletion.persist.PageCacheDeletionBuffer;
import org.apache.iotdb.db.pipe.consensus.deletion.recover.DeletionReader;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
import org.apache.iotdb.db.storageengine.StorageEngine;

import com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
@@ -43,6 +45,7 @@
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -61,16 +64,29 @@ public class DeletionResourceManager implements AutoCloseable {
String.format(
"^_(?<%s>\\d+)-(?<%s>\\d+)\\%s$",
REBOOT_TIME, MEM_TABLE_FLUSH_ORDER, DELETION_FILE_SUFFIX);
private static final String TABLE_NAME = "table";
private final String dataRegionId;
private final DeletionBuffer deletionBuffer;
private final File storageDir;
private final Map<DeleteDataNode, DeletionResource> deleteNode2ResourcesMap =
private final Map<AbstractDeleteDataNode, DeletionResource> deleteNode2ResourcesMap =
new ConcurrentHashMap<>();
// One data region maps to one kind of database (table model or tree model), which
// determines the type of its delete nodes / DAL entries (table or tree).
private final Lock recoverLock = new ReentrantLock();
private final Condition recoveryReadyCondition = recoverLock.newCondition();
private volatile boolean hasCompletedRecovery = false;
private boolean isRelational;

private DeletionResourceManager(String dataRegionId) throws IOException {
// Tree model by default
this.isRelational = false;
Optional.ofNullable(
StorageEngine.getInstance()
.getDataRegion(new DataRegionId(Integer.parseInt(dataRegionId))))
.ifPresent(
dataRegion -> {
this.isRelational = dataRegion.getDatabaseName().toLowerCase().contains(TABLE_NAME);
});
this.dataRegionId = dataRegionId;
this.storageDir =
new File(
@@ -103,7 +119,8 @@ private void initAndRecover() throws IOException {

for (Path path : deletionPaths) {
try (DeletionReader deletionReader =
new DeletionReader(path.toFile(), dataRegionId, this::removeDeletionResource)) {
new DeletionReader(
path.toFile(), dataRegionId, this::removeDeletionResource, isRelational)) {
deletionReader
.readAllDeletions()
.forEach(
@@ -133,7 +150,7 @@ public void close() {
LOGGER.info("Deletion resource manager for {} has been successfully closed!", dataRegionId);
}

public DeletionResource registerDeletionResource(DeleteDataNode deleteDataNode) {
public DeletionResource registerDeletionResource(AbstractDeleteDataNode deleteDataNode) {
DeletionResource deletionResource =
deleteNode2ResourcesMap.computeIfAbsent(
deleteDataNode,
@@ -144,7 +161,7 @@ public DeletionResource registerDeletionResource(DeleteDataNode deleteDataNode)
return deletionResource;
}

public DeletionResource getDeletionResource(DeleteDataNode deleteDataNode) {
public DeletionResource getDeletionResource(AbstractDeleteDataNode deleteDataNode) {
return deleteNode2ResourcesMap.get(deleteDataNode);
}

@@ -305,7 +322,7 @@ public static void buildForTest() {
}

@TestOnly
public void recoverForTest() {
public void recoverForTest(boolean isRelational) {
try (Stream<Path> pathStream = Files.walk(Paths.get(storageDir.getPath()), 1)) {
Path[] deletionPaths =
pathStream
@@ -315,7 +332,8 @@ public void recoverForTest() {

for (Path path : deletionPaths) {
try (DeletionReader deletionReader =
new DeletionReader(path.toFile(), dataRegionId, this::removeDeletionResource)) {
new DeletionReader(
path.toFile(), dataRegionId, this::removeDeletionResource, isRelational)) {
deletionReader
.readAllDeletions()
.forEach(
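Because the resource map is now keyed by `AbstractDeleteDataNode`, tree- and table-model deletions register through the same API. A hedged usage sketch (the manager instance and the node are assumed to be supplied by the caller):

```java
// Sketch: register a deletion of either model and read it back by the same key.
static DeletionResource persistDeletion(
    final DeletionResourceManager manager, final AbstractDeleteDataNode node) {
  // node may be a tree-model DeleteDataNode or a table-model RelationalDeleteDataNode.
  final DeletionResource registered = manager.registerDeletionResource(node);
  // The same node is the lookup key, so this returns the cached instance
  // (barring a concurrent removal).
  final DeletionResource cached = manager.getDeletionResource(node);
  return cached != null ? cached : registered;
}
```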

File: DeletionReader.java
@@ -45,14 +45,17 @@ public class DeletionReader implements Closeable {
private final File logFile;
private final FileInputStream fileInputStream;
private final FileChannel fileChannel;
private final boolean isRelational;

public DeletionReader(File logFile, String regionId, Consumer<DeletionResource> removeHook)
public DeletionReader(
File logFile, String regionId, Consumer<DeletionResource> removeHook, boolean isRelational)
throws IOException {
this.logFile = logFile;
this.regionId = regionId;
this.fileInputStream = new FileInputStream(logFile);
this.fileChannel = fileInputStream.getChannel();
this.removeHook = removeHook;
this.isRelational = isRelational;
}

public List<DeletionResource> readAllDeletions() throws IOException {
@@ -76,7 +79,7 @@ public List<DeletionResource> readAllDeletions() throws IOException {

while (byteBuffer.hasRemaining()) {
DeletionResource deletionResource =
DeletionResource.deserialize(byteBuffer, regionId, removeHook);
DeletionResource.deserialize(byteBuffer, regionId, removeHook, isRelational);
deletions.add(deletionResource);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Read deletion: {} from file {}", deletionResource, logFile);
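A hedged usage sketch of the widened `DeletionReader` constructor; the file name, region id, logger, and empty remove hook below are placeholders rather than values taken from this commit:

```java
// Hypothetical recovery snippet: parse a DAL file known to belong to a
// table-model region, so isRelational is passed as true.
final File dalFile = new File("/path/to/region-1/_0-0.deletion"); // placeholder name
try (DeletionReader reader =
    new DeletionReader(dalFile, "1", resource -> {}, /* isRelational */ true)) {
  for (final DeletionResource deletion : reader.readAllDeletions()) {
    final AbstractDeleteDataNode node = deletion.getDeleteDataNode();
    LOGGER.info("Recovered deletion {} from {}", node, dalFile); // LOGGER assumed
  }
} catch (final IOException e) {
  LOGGER.warn("Failed to read DAL file {}", dalFile, e);
}
```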

File: PipeDeleteDataNodeEvent.java
@@ -28,6 +28,7 @@
import org.apache.iotdb.commons.pipe.event.SerializableEvent;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;

import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -36,7 +37,7 @@
import java.util.Optional;

public class PipeDeleteDataNodeEvent extends EnrichedEvent implements SerializableEvent {
private DeleteDataNode deleteDataNode;
private AbstractDeleteDataNode deleteDataNode;
private DeletionResource deletionResource;
private boolean isGeneratedByPipe;
private ProgressIndex progressIndex;
@@ -47,12 +48,12 @@ public PipeDeleteDataNodeEvent() {
}

public PipeDeleteDataNodeEvent(
final DeleteDataNode deleteDataNode, final boolean isGeneratedByPipe) {
final AbstractDeleteDataNode deleteDataNode, final boolean isGeneratedByPipe) {
this(deleteDataNode, null, 0, null, null, null, isGeneratedByPipe);
}

public PipeDeleteDataNodeEvent(
final DeleteDataNode deleteDataNode,
final AbstractDeleteDataNode deleteDataNode,
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
@@ -73,7 +74,7 @@ public PipeDeleteDataNodeEvent(
.ifPresent(node -> this.progressIndex = deleteDataNode.getProgressIndex());
}

public DeleteDataNode getDeleteDataNode() {
public AbstractDeleteDataNode getDeleteDataNode() {
return deleteDataNode;
}


File: PipeRealtimeEventFactory.java
@@ -25,7 +25,7 @@
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpochManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
@@ -67,7 +67,7 @@ public static PipeRealtimeEvent createRealtimeEvent(
new PipeHeartbeatEvent(dataRegionId, shouldPrintMessage), null, null, null, null);
}

public static PipeRealtimeEvent createRealtimeEvent(final DeleteDataNode node) {
public static PipeRealtimeEvent createRealtimeEvent(final AbstractDeleteDataNode node) {
return new PipeRealtimeEvent(
new PipeDeleteDataNodeEvent(node, node.isGeneratedByPipe()), null, null, null, null);
}

File: PipeInsertionDataNodeListener.java
@@ -26,9 +26,8 @@
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEventFactory;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeDataRegionAssigner;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;

@@ -142,7 +141,8 @@ public void listenToInsertNode(
}

// TODO: record database name in enriched events?
public DeletionResource listenToDeleteData(final String regionId, final DeleteDataNode node) {
public DeletionResource listenToDeleteData(
final String regionId, final AbstractDeleteDataNode node) {
final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(regionId);
// only events from registered data region will be extracted
if (assigner == null) {
@@ -169,24 +169,13 @@ public DeletionResource listenToDeleteData(final String regionId, final DeleteDa
return deletionResource;
}

public DeletionResource listenToDeleteData(
final String regionId, final RelationalDeleteDataNode node) {
// TODO: implement
return null;
}

public void listenToHeartbeat(boolean shouldPrintMessage) {
dataRegionId2Assigner.forEach(
(key, value) ->
value.publishToAssign(
PipeRealtimeEventFactory.createRealtimeEvent(key, shouldPrintMessage)));
}

public void listenToDeleteData(DeleteDataNode node) {
dataRegionId2Assigner.forEach(
(key, value) -> value.publishToAssign(PipeRealtimeEventFactory.createRealtimeEvent(node)));
}

/////////////////////////////// singleton ///////////////////////////////

private PipeInsertionDataNodeListener() {
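With the `RelationalDeleteDataNode` overload and the region-less `listenToDeleteData(DeleteDataNode)` variant removed, both data models now go through the single hook above. A hedged caller sketch (the listener reference and the node are assumed to be provided):

```java
// Sketch: one entry point for tree- and table-model deletions.
static DeletionResource onRegionDelete(
    final PipeInsertionDataNodeListener listener,
    final String regionId,
    final AbstractDeleteDataNode node) {
  // Regions without a registered assigner are skipped by the guard shown above;
  // otherwise the resulting DeletionResource is returned, as in the method body above.
  return listener.listenToDeleteData(regionId, node);
}
```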

File: PipeConsensusReceiver.java
@@ -50,7 +50,7 @@
import org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealWithModReq;
import org.apache.iotdb.db.pipe.consensus.metric.PipeConsensusReceiverMetrics;
import org.apache.iotdb.db.pipe.event.common.tsfile.aggregator.TsFileInsertionPointCounter;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
@@ -312,7 +312,7 @@ private TPipeConsensusTransferResp handleTransferDeletion(final PipeConsensusDel
PipeConsensusServerImpl impl =
Optional.ofNullable(pipeConsensus.getImpl(consensusGroupId))
.orElseThrow(() -> new ConsensusGroupNotExistException(consensusGroupId));
final DeleteDataNode planNode = req.getDeleteDataNode();
final AbstractDeleteDataNode planNode = req.getDeleteDataNode();
planNode.markAsGeneratedByRemoteConsensusLeader();
planNode.setProgressIndex(
ProgressIndexType.deserializeFrom(ByteBuffer.wrap(req.getProgressIndex())));

File: PipeEnrichedDeleteDataNode.java
@@ -44,6 +44,8 @@
*
* <p>2.{@link DataExecutionVisitor}, to actually write data on data region and mark it as received
* from pipe.
*
* <p>TODO: support relational deleteNode
*/
public class PipeEnrichedDeleteDataNode extends DeleteDataNode {

(Remaining changed files in this commit are not shown here.)
