Skip to content

Commit

Permalink
Pipe: Fixed the bug that alter source will not alter the reference co…
Browse files Browse the repository at this point in the history
…unt of ConfigRegionListeningQueue (#13626)
  • Loading branch information
Caideyipi authored Sep 26, 2024
1 parent c4f1756 commit e7a37dc
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -470,7 +471,7 @@ public void testAlterPipeSourceAndProcessor() {
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
statement.execute(sql);
} catch (SQLException e) {
} catch (final SQLException e) {
fail(e.getMessage());
}

Expand All @@ -491,12 +492,13 @@ public void testAlterPipeSourceAndProcessor() {
TestUtils.assertDataEventuallyOnEnv(
receiverEnv, "select * from root.db.**", "Time,root.db.d1.at1,", expectedResSet);

// Alter pipe (modify 'source.path' and 'processor.tumbling-time.interval-seconds')
// Alter pipe (modify 'source.path', 'source.inclusion' and
// 'processor.tumbling-time.interval-seconds')
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
statement.execute(
"alter pipe a2b modify source('source' = 'iotdb-source','source.path'='root.db.d2.**') modify processor ('processor.tumbling-time.interval-seconds'='2')");
} catch (SQLException e) {
"alter pipe a2b modify source('source' = 'iotdb-source','source.path'='root.db.d2.**', 'source.inclusion'='all') modify processor ('processor.tumbling-time.interval-seconds'='2')");
} catch (final SQLException e) {
fail(e.getMessage());
}

Expand Down Expand Up @@ -527,5 +529,15 @@ public void testAlterPipeSourceAndProcessor() {
"select * from root.db.** where time > 10000",
"Time,root.db.d1.at1,root.db.d2.at1,",
expectedResSet);

// Create database on sender
if (!TestUtils.tryExecuteNonQueryWithRetry(
senderEnv, "create timeSeries root.db.d2.at2 int32")) {
fail();
}

// Check database on receiver
TestUtils.assertDataEventuallyOnEnv(
receiverEnv, "count timeSeries", "count(timeseries),", Collections.singleton("3,"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

public class PipeConfigNodeRuntimeAgent implements IService {
Expand Down Expand Up @@ -90,12 +89,13 @@ public ConfigRegionListeningQueue listener() {
return regionListener.listener();
}

public void increaseListenerReference(PipeParameters parameters) throws IllegalPathException {
public void increaseListenerReference(final PipeParameters parameters)
throws IllegalPathException {
regionListener.increaseReference(parameters);
}

public void decreaseListenerReference(PipeParameters parameters)
throws IllegalPathException, IOException {
public void decreaseListenerReference(final PipeParameters parameters)
throws IllegalPathException {
regionListener.decreaseReference(parameters);
}

Expand All @@ -120,15 +120,16 @@ public boolean isLeaderReady() {

//////////////////////////// Runtime Exception Handlers ////////////////////////////

public void report(EnrichedEvent event, PipeRuntimeException pipeRuntimeException) {
public void report(final EnrichedEvent event, final PipeRuntimeException pipeRuntimeException) {
if (event.getPipeTaskMeta() != null) {
report(event.getPipeTaskMeta(), pipeRuntimeException);
} else {
LOGGER.warn("Attempt to report pipe exception to a null PipeTaskMeta.", pipeRuntimeException);
}
}

private void report(PipeTaskMeta pipeTaskMeta, PipeRuntimeException pipeRuntimeException) {
private void report(
final PipeTaskMeta pipeTaskMeta, final PipeRuntimeException pipeRuntimeException) {
LOGGER.warn(
"Report PipeRuntimeException to local PipeTaskMeta({}), exception message: {}",
pipeTaskMeta,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.iotdb.confignode.manager.pipe.extractor.ConfigRegionListeningQueue;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

public class PipeConfigRegionListener {
Expand All @@ -38,7 +37,7 @@ public synchronized ConfigRegionListeningQueue listener() {
return listeningQueue;
}

public synchronized void increaseReference(PipeParameters parameters)
public synchronized void increaseReference(final PipeParameters parameters)
throws IllegalPathException {
if (!ConfigRegionListeningFilter.parseListeningPlanTypeSet(parameters).isEmpty()) {
listeningQueueReferenceCount++;
Expand All @@ -48,8 +47,8 @@ public synchronized void increaseReference(PipeParameters parameters)
}
}

public synchronized void decreaseReference(PipeParameters parameters)
throws IllegalPathException, IOException {
public synchronized void decreaseReference(final PipeParameters parameters)
throws IllegalPathException {
if (!ConfigRegionListeningFilter.parseListeningPlanTypeSet(parameters).isEmpty()) {
listeningQueueReferenceCount--;
if (listeningQueueReferenceCount == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,35 @@ public TSStatus dropPipe(final DropPipePlanV2 plan) {

public TSStatus alterPipe(final AlterPipePlanV2 plan) {
try {
final Optional<PipeMeta> pipeMetaBeforeAlter =
Optional.ofNullable(
pipeTaskInfo.getPipeMetaByPipeName(plan.getPipeStaticMeta().getPipeName()));

pipeTaskInfo.alterPipe(plan);

PipeConfigNodeAgent.task()
.handleSinglePipeMetaChanges(
pipeTaskInfo.getPipeMetaByPipeName(plan.getPipeStaticMeta().getPipeName()));
PipeTemporaryMetaMetrics.getInstance()
.handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
final TPushPipeMetaRespExceptionMessage message =
PipeConfigNodeAgent.task()
.handleSinglePipeMetaChanges(
pipeTaskInfo.getPipeMetaByPipeName(plan.getPipeStaticMeta().getPipeName()));
if (message == null) {
pipeMetaBeforeAlter.ifPresent(
meta -> {
try {
PipeConfigNodeAgent.runtime()
.decreaseListenerReference(meta.getStaticMeta().getExtractorParameters());
} catch (final Exception e) {
throw new PipeException("Failed to decrease listener reference", e);
}
});
PipeConfigNodeAgent.runtime()
.increaseListenerReference(plan.getPipeStaticMeta().getExtractorParameters());
PipeTemporaryMetaMetrics.getInstance()
.handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} else {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
.setMessage(message.getMessage());
}
} catch (final Exception e) {
LOGGER.error("Failed to alter pipe", e);
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
Expand Down

0 comments on commit e7a37dc

Please sign in to comment.