Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipe: Fixed the bug that alter source will not alter the reference count of ConfigRegionListeningQueue #13626

Merged
merged 5 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -470,7 +471,7 @@ public void testAlterPipeSourceAndProcessor() {
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
statement.execute(sql);
} catch (SQLException e) {
} catch (final SQLException e) {
fail(e.getMessage());
}

Expand All @@ -491,12 +492,13 @@ public void testAlterPipeSourceAndProcessor() {
TestUtils.assertDataEventuallyOnEnv(
receiverEnv, "select * from root.db.**", "Time,root.db.d1.at1,", expectedResSet);

// Alter pipe (modify 'source.path' and 'processor.tumbling-time.interval-seconds')
// Alter pipe (modify 'source.path', 'source.inclusion' and
// 'processor.tumbling-time.interval-seconds')
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
statement.execute(
"alter pipe a2b modify source('source' = 'iotdb-source','source.path'='root.db.d2.**') modify processor ('processor.tumbling-time.interval-seconds'='2')");
} catch (SQLException e) {
"alter pipe a2b modify source('source' = 'iotdb-source','source.path'='root.db.d2.**', 'source.inclusion'='all') modify processor ('processor.tumbling-time.interval-seconds'='2')");
} catch (final SQLException e) {
fail(e.getMessage());
}

Expand Down Expand Up @@ -527,5 +529,15 @@ public void testAlterPipeSourceAndProcessor() {
"select * from root.db.** where time > 10000",
"Time,root.db.d1.at1,root.db.d2.at1,",
expectedResSet);

// Create database on sender
if (!TestUtils.tryExecuteNonQueryWithRetry(
senderEnv, "create timeSeries root.db.d2.at2 int32")) {
fail();
}

// Check database on receiver
TestUtils.assertDataEventuallyOnEnv(
receiverEnv, "count timeSeries", "count(timeseries),", Collections.singleton("3,"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

public class PipeConfigNodeRuntimeAgent implements IService {
Expand Down Expand Up @@ -90,12 +89,13 @@ public ConfigRegionListeningQueue listener() {
return regionListener.listener();
}

public void increaseListenerReference(PipeParameters parameters) throws IllegalPathException {
public void increaseListenerReference(final PipeParameters parameters)
throws IllegalPathException {
regionListener.increaseReference(parameters);
}

public void decreaseListenerReference(PipeParameters parameters)
throws IllegalPathException, IOException {
public void decreaseListenerReference(final PipeParameters parameters)
throws IllegalPathException {
regionListener.decreaseReference(parameters);
}

Expand All @@ -120,15 +120,16 @@ public boolean isLeaderReady() {

//////////////////////////// Runtime Exception Handlers ////////////////////////////

public void report(EnrichedEvent event, PipeRuntimeException pipeRuntimeException) {
public void report(final EnrichedEvent event, final PipeRuntimeException pipeRuntimeException) {
if (event.getPipeTaskMeta() != null) {
report(event.getPipeTaskMeta(), pipeRuntimeException);
} else {
LOGGER.warn("Attempt to report pipe exception to a null PipeTaskMeta.", pipeRuntimeException);
}
}

private void report(PipeTaskMeta pipeTaskMeta, PipeRuntimeException pipeRuntimeException) {
private void report(
final PipeTaskMeta pipeTaskMeta, final PipeRuntimeException pipeRuntimeException) {
LOGGER.warn(
"Report PipeRuntimeException to local PipeTaskMeta({}), exception message: {}",
pipeTaskMeta,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.iotdb.confignode.manager.pipe.extractor.ConfigRegionListeningQueue;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

public class PipeConfigRegionListener {
Expand All @@ -38,7 +37,7 @@ public synchronized ConfigRegionListeningQueue listener() {
return listeningQueue;
}

public synchronized void increaseReference(PipeParameters parameters)
public synchronized void increaseReference(final PipeParameters parameters)
throws IllegalPathException {
if (!ConfigRegionListeningFilter.parseListeningPlanTypeSet(parameters).isEmpty()) {
listeningQueueReferenceCount++;
Expand All @@ -48,8 +47,8 @@ public synchronized void increaseReference(PipeParameters parameters)
}
}

public synchronized void decreaseReference(PipeParameters parameters)
throws IllegalPathException, IOException {
public synchronized void decreaseReference(final PipeParameters parameters)
throws IllegalPathException {
if (!ConfigRegionListeningFilter.parseListeningPlanTypeSet(parameters).isEmpty()) {
listeningQueueReferenceCount--;
if (listeningQueueReferenceCount == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,35 @@ public TSStatus dropPipe(final DropPipePlanV2 plan) {

public TSStatus alterPipe(final AlterPipePlanV2 plan) {
try {
final Optional<PipeMeta> pipeMetaBeforeAlter =
Optional.ofNullable(
pipeTaskInfo.getPipeMetaByPipeName(plan.getPipeStaticMeta().getPipeName()));

pipeTaskInfo.alterPipe(plan);

PipeConfigNodeAgent.task()
.handleSinglePipeMetaChanges(
pipeTaskInfo.getPipeMetaByPipeName(plan.getPipeStaticMeta().getPipeName()));
PipeTemporaryMetaMetrics.getInstance()
.handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
final TPushPipeMetaRespExceptionMessage message =
PipeConfigNodeAgent.task()
.handleSinglePipeMetaChanges(
pipeTaskInfo.getPipeMetaByPipeName(plan.getPipeStaticMeta().getPipeName()));
if (message == null) {
pipeMetaBeforeAlter.ifPresent(
meta -> {
try {
PipeConfigNodeAgent.runtime()
.decreaseListenerReference(meta.getStaticMeta().getExtractorParameters());
} catch (final Exception e) {
throw new PipeException("Failed to decrease listener reference", e);
}
});
PipeConfigNodeAgent.runtime()
.increaseListenerReference(plan.getPipeStaticMeta().getExtractorParameters());
PipeTemporaryMetaMetrics.getInstance()
.handleTemporaryMetaChanges(pipeTaskInfo.getPipeMetaList());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} else {
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
.setMessage(message.getMessage());
}
} catch (final Exception e) {
LOGGER.error("Failed to alter pipe", e);
return new TSStatus(TSStatusCode.PIPE_ERROR.getStatusCode())
Expand Down
Loading