From f9d3af97469f607d54181b870ca88105845b2644 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 25 Sep 2024 20:37:13 +0800 Subject: [PATCH 1/4] May fix --- .../runtime/PipeConfigNodeRuntimeAgent.java | 13 ++++---- .../runtime/PipeConfigRegionListener.java | 7 ++-- .../confignode/persistence/pipe/PipeInfo.java | 33 +++++++++++++++---- .../listening/AbstractPipeListeningQueue.java | 1 + 4 files changed, 38 insertions(+), 16 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java index c3262c681818..cc6056522ef8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java @@ -35,7 +35,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; public class PipeConfigNodeRuntimeAgent implements IService { @@ -90,12 +89,13 @@ public ConfigRegionListeningQueue listener() { return regionListener.listener(); } - public void increaseListenerReference(PipeParameters parameters) throws IllegalPathException { + public void increaseListenerReference(final PipeParameters parameters) + throws IllegalPathException { regionListener.increaseReference(parameters); } - public void decreaseListenerReference(PipeParameters parameters) - throws IllegalPathException, IOException { + public void decreaseListenerReference(final PipeParameters parameters) + throws IllegalPathException { regionListener.decreaseReference(parameters); } @@ -120,7 +120,7 @@ public boolean isLeaderReady() { //////////////////////////// Runtime Exception Handlers //////////////////////////// - public void report(EnrichedEvent event, PipeRuntimeException pipeRuntimeException) { + public void report(final EnrichedEvent event, final PipeRuntimeException pipeRuntimeException) { if (event.getPipeTaskMeta() != null) { report(event.getPipeTaskMeta(), pipeRuntimeException); } else { @@ -128,7 +128,8 @@ public void report(EnrichedEvent event, PipeRuntimeException pipeRuntimeExceptio } } - private void report(PipeTaskMeta pipeTaskMeta, PipeRuntimeException pipeRuntimeException) { + private void report( + final PipeTaskMeta pipeTaskMeta, final PipeRuntimeException pipeRuntimeException) { LOGGER.warn( "Report PipeRuntimeException to local PipeTaskMeta({}), exception message: {}", pipeTaskMeta, diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigRegionListener.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigRegionListener.java index 458ed1c57a82..7c8bcca9f366 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigRegionListener.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigRegionListener.java @@ -24,7 +24,6 @@ import org.apache.iotdb.confignode.manager.pipe.extractor.ConfigRegionListeningQueue; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; -import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; public class PipeConfigRegionListener { @@ -38,7 +37,7 @@ public synchronized ConfigRegionListeningQueue listener() { return listeningQueue; } - public synchronized void increaseReference(PipeParameters parameters) + public synchronized void increaseReference(final PipeParameters parameters) throws IllegalPathException { if (!ConfigRegionListeningFilter.parseListeningPlanTypeSet(parameters).isEmpty()) { listeningQueueReferenceCount++; @@ -48,8 +47,8 @@ public synchronized void increaseReference(PipeParameters parameters) } } - public synchronized void decreaseReference(PipeParameters parameters) - throws IllegalPathException, IOException { + public synchronized void decreaseReference(final PipeParameters parameters) + throws IllegalPathException { if (!ConfigRegionListeningFilter.parseListeningPlanTypeSet(parameters).isEmpty()) { listeningQueueReferenceCount--; if (listeningQueueReferenceCount == 0) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java index fff581646262..b80070c1d217 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeInfo.java @@ -159,14 +159,35 @@ public TSStatus dropPipe(final DropPipePlanV2 plan) { public TSStatus alterPipe(final AlterPipePlanV2 plan) { try { + final Optional pipeMetaBeforeAlter = + Optional.ofNullable( + pipeTaskInfo.getPipeMetaByPipeName(plan.getPipeStaticMeta().getPipeName())); + pipeTaskInfo.alterPipe(plan); - PipeConfigNodeAgent.task() - .handleSinglePipeMetaChanges( - pipeTaskInfo.getPipeMetaByPipeName(plan.getPipeStaticMeta().getPipeName())); - PipeTemporaryMetaMetrics.getInstance() - .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList()); - return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + final TPushPipeMetaRespExceptionMessage message = + PipeConfigNodeAgent.task() + .handleSinglePipeMetaChanges( + pipeTaskInfo.getPipeMetaByPipeName(plan.getPipeStaticMeta().getPipeName())); + if (message == null) { + pipeMetaBeforeAlter.ifPresent( + meta -> { + try { + PipeConfigNodeAgent.runtime() + .decreaseListenerReference(meta.getStaticMeta().getExtractorParameters()); + } catch (final Exception e) { + throw new PipeException("Failed to decrease listener reference", e); + } + }); + PipeConfigNodeAgent.runtime() + .increaseListenerReference(plan.getPipeStaticMeta().getExtractorParameters()); + PipeTemporaryMetaMetrics.getInstance() + .handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList()); + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } else { + return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()) + .setMessage(message.getMessage()); + } } catch (final Exception e) { LOGGER.error("Failed to alter pipe", e); return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode()) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java index c5d83845f5cc..d1beddc3549b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java @@ -82,6 +82,7 @@ public synchronized Pair> findAvailableSnapshots() - PipeConfig.getInstance().getPipeListeningQueueTransferSnapshotThreshold()) { clearSnapshots(); } + System.out.println(queueTailIndex2SnapshotsCache); return queueTailIndex2SnapshotsCache; } From f73700fb95f31807a4ea54f20636728b252187ee Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 25 Sep 2024 20:41:51 +0800 Subject: [PATCH 2/4] Update AbstractPipeListeningQueue.java --- .../queue/listening/AbstractPipeListeningQueue.java | 1 - 1 file changed, 1 deletion(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java index d1beddc3549b..c5d83845f5cc 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/queue/listening/AbstractPipeListeningQueue.java @@ -82,7 +82,6 @@ public synchronized Pair> findAvailableSnapshots() - PipeConfig.getInstance().getPipeListeningQueueTransferSnapshotThreshold()) { clearSnapshots(); } - System.out.println(queueTailIndex2SnapshotsCache); return queueTailIndex2SnapshotsCache; } From f40570ccd0821effacdf1f15aac0f713756b4bda Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 25 Sep 2024 20:49:13 +0800 Subject: [PATCH 3/4] Update IoTDBPipeAlterIT.java --- .../pipe/it/autocreate/IoTDBPipeAlterIT.java | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java index fb541e7dcdb7..83d6bb547359 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java @@ -36,6 +36,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -470,7 +471,7 @@ public void testAlterPipeSourceAndProcessor() { try (final Connection connection = senderEnv.getConnection(); final Statement statement = connection.createStatement()) { statement.execute(sql); - } catch (SQLException e) { + } catch (final SQLException e) { fail(e.getMessage()); } @@ -491,12 +492,13 @@ public void testAlterPipeSourceAndProcessor() { TestUtils.assertDataEventuallyOnEnv( receiverEnv, "select * from root.db.**", "Time,root.db.d1.at1,", expectedResSet); - // Alter pipe (modify 'source.path' and 'processor.tumbling-time.interval-seconds') + // Alter pipe (modify 'source.path', 'source.inclusion' and + // 'processor.tumbling-time.interval-seconds') try (final Connection connection = senderEnv.getConnection(); final Statement statement = connection.createStatement()) { statement.execute( - "alter pipe a2b modify source('source' = 'iotdb-source','source.path'='root.db.d2.**') modify processor ('processor.tumbling-time.interval-seconds'='2')"); - } catch (SQLException e) { + "alter pipe a2b modify source('source' = 'iotdb-source','source.path'='root.db.d2.**', 'source.inclusion'='all') modify processor ('processor.tumbling-time.interval-seconds'='2')"); + } catch (final SQLException e) { fail(e.getMessage()); } @@ -527,5 +529,14 @@ public void testAlterPipeSourceAndProcessor() { "select * from root.db.** where time > 10000", "Time,root.db.d1.at1,root.db.d2.at1,", expectedResSet); + + // Create database on sender + if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "create database root.test")) { + fail(); + } + + // Check database on receiver + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, "count databases", "count,", Collections.singleton("2,")); } } From 95e64dedbd9e1eb2b342059b104bdf3fdf28618f Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 26 Sep 2024 10:36:50 +0800 Subject: [PATCH 4/4] Update IoTDBPipeAlterIT.java --- .../apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java index 83d6bb547359..364c5d475f0c 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java @@ -531,12 +531,13 @@ public void testAlterPipeSourceAndProcessor() { expectedResSet); // Create database on sender - if (!TestUtils.tryExecuteNonQueryWithRetry(senderEnv, "create database root.test")) { + if (!TestUtils.tryExecuteNonQueryWithRetry( + senderEnv, "create timeSeries root.db.d2.at2 int32")) { fail(); } // Check database on receiver TestUtils.assertDataEventuallyOnEnv( - receiverEnv, "count databases", "count,", Collections.singleton("2,")); + receiverEnv, "count timeSeries", "count(timeseries),", Collections.singleton("3,")); } }