Skip to content

Commit

Permalink
fix(cdc): fix debezium engine config (#15536)
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW authored Mar 8, 2024
1 parent a79404d commit bffaa38
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

package com.risingwave.connector;

import com.risingwave.connector.source.common.DbzConnectorConfig;
import com.risingwave.metrics.ConnectorNodeMetrics;
import io.grpc.Server;
import io.grpc.ServerBuilder;
Expand All @@ -37,16 +36,6 @@ public static void main(String[] args) throws Exception {
CommandLineParser parser = new DefaultParser();
CommandLine cmd = parser.parse(options, args);

// Quoted from the debezium document:
// > Your application should always properly stop the engine to ensure graceful and complete
// > shutdown and that each source record is sent to the application exactly one time.
// However, in RisingWave we assume the upstream changelog may contain duplicate events and
// handle conflicts in the mview operator, thus we don't need to obey the above
// instructions.
// So we decrease the wait time to 1 second here to reclaim grpc resources faster when the
// grpc channel is broken.
System.setProperty(DbzConnectorConfig.WAIT_FOR_CONNECTOR_EXIT_BEFORE_INTERRUPT_MS, "1000");

int port = DEFAULT_PORT;
String prometheusHttpHost = DEFAULT_PROMETHEUS_HOST;
if (cmd.hasOption("p")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@
public class DbzConnectorConfig {
private static final Logger LOG = LoggerFactory.getLogger(DbzConnectorConfig.class);

/* Debezium private configs */
public static final String WAIT_FOR_CONNECTOR_EXIT_BEFORE_INTERRUPT_MS =
"debezium.embedded.shutdown.pause.before.interrupt.ms";

public static final String WAIT_FOR_STREAMING_START_BEFORE_EXIT_SECS =
"cdc.source.wait.streaming.before.exit.seconds";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,10 @@ interval.handling.mode=string
max.batch.size=${debezium.max.batch.size:-1024}
max.queue.size=${debezium.max.queue.size:-8192}
time.precision.mode=adaptive_time_microseconds
# Quoted from the debezium document:
# > Your application should always properly stop the engine to ensure graceful and complete
# > shutdown and that each source record is sent to the application exactly one time.
# In RisingWave we assume the upstream changelog may contain duplicate events and
# handle conflicts in the mview operator, thus we don't need to obey the above
# instructions. So we decrease the wait time here to reclaim jvm thread faster.
debezium.embedded.shutdown.pause.before.interrupt.ms=1
7 changes: 0 additions & 7 deletions src/jni_core/src/jvm_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,6 @@ impl JavaVmWrapper {
.option(format!("-Djava.class.path={}", class_vec.join(":")))
.option("-Xms16m")
.option(format!("-Xmx{}", jvm_heap_size))
// Quoted from the debezium document:
// > Your application should always properly stop the engine to ensure graceful and complete
// > shutdown and that each source record is sent to the application exactly one time.
// In RisingWave we assume the upstream changelog may contain duplicate events and
// handle conflicts in the mview operator, thus we don't need to obey the above
// instructions. So we decrease the wait time here to reclaim jvm thread faster.
.option("-Ddebezium.embedded.shutdown.pause.before.interrupt.ms=1")
.option("-Dcdc.source.wait.streaming.before.exit.seconds=30");

tracing::info!("JVM args: {:?}", args_builder);
Expand Down

0 comments on commit bffaa38

Please sign in to comment.