Merge branch 'apache:dev' into dev
xiaochen-zhou authored Apr 2, 2024
2 parents cb97c2c + 80f392a commit d8f8c27
Showing 51 changed files with 3,271 additions and 108 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/backend.yml
@@ -556,7 +556,7 @@ jobs:
- name: run seatunnel zeta integration test
if: needs.changes.outputs.api == 'true'
run: |
./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-seatunnel-e2e-base -am -Pci
./mvnw -T 1 -B verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-seatunnel-e2e-base,:connector-console-seatunnel-e2e -am -Pci
env:
MAVEN_OPTS: -Xmx4096m
engine-k8s-it:
@@ -578,6 +578,8 @@ jobs:
env:
KUBECONFIG: /etc/rancher/k3s/k3s.yaml
- uses: actions/checkout@v2
- name: free disk space
run: tools/github/free_disk_space.sh
- name: Set up JDK ${{ matrix.java }}
uses: actions/setup-java@v3
with:
@@ -995,7 +997,7 @@ jobs:
java-version: ${{ matrix.java }}
distribution: 'temurin'
cache: 'maven'
- name: run jdbc connectors integration test (part-6)
- name: run jdbc connectors integration test (part-7)
if: needs.changes.outputs.api == 'true'
run: |
./mvnw -B -T 1 verify -DskipUT=true -DskipIT=false -D"license.skipAddThirdParty"=true --no-snapshot-updates -pl :connector-jdbc-e2e-part-7 -am -Pci
1 change: 1 addition & 0 deletions docs/en/connector-v2/sink/Jdbc.md
@@ -235,6 +235,7 @@ there are some reference value for params above.
| Vertica | com.vertica.jdbc.Driver | jdbc:vertica://localhost:5433 | / | https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar |
| Kingbase | com.kingbase8.Driver | jdbc:kingbase8://localhost:54321/db_test | / | https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar |
| OceanBase | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2881 | / | https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar |
| xugu | com.xugu.cloudjdbc.Driver | jdbc:xugu://localhost:5138 | / | https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar |

## Example

1 change: 1 addition & 0 deletions docs/en/connector-v2/source/Jdbc.md
@@ -227,6 +227,7 @@ there are some reference value for params above.
| Kingbase | com.kingbase8.Driver | jdbc:kingbase8://localhost:54321/db_test | https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar |
| OceanBase | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2881 | https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar |
| Hive | org.apache.hive.jdbc.HiveDriver | jdbc:hive2://localhost:10000 | https://repo1.maven.org/maven2/org/apache/hive/hive-jdbc/3.1.3/hive-jdbc-3.1.3-standalone.jar |
| xugu | com.xugu.cloudjdbc.Driver | jdbc:xugu://localhost:5138 | https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar |

## Example

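Both Jdbc reference tables above gain a xugu row that supplies only the driver class, the URL prefix, and the Maven download link. As a standalone illustration of how those two values fit together (this is an editorial sketch, not part of the documentation's own Example sections, which are truncated here), the code below opens a plain JDBC connection with them; the database name, user, and password are placeholders.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class XuguConnectivityCheck {
    public static void main(String[] args) throws Exception {
        // Driver class and URL prefix come from the table rows above; the xugu-jdbc jar from the
        // listed Maven coordinates must be on the classpath. Database, user, and password are placeholders.
        Class.forName("com.xugu.cloudjdbc.Driver");
        String url = "jdbc:xugu://localhost:5138/SYSTEM";
        try (Connection conn = DriverManager.getConnection(url, "SYSDBA", "password");
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery("SELECT 1")) {
            while (rs.next()) {
                System.out.println("xugu connection OK, SELECT 1 = " + rs.getInt(1));
            }
        }
    }
}
```

In a SeaTunnel job, these same two values go into the `driver` and `url` options of the Jdbc source/sink configuration rather than into hand-written code.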
15 changes: 15 additions & 0 deletions pom.xml
@@ -21,6 +21,7 @@
<groupId>org.apache</groupId>
<artifactId>apache</artifactId>
<version>31</version>
<relativePath />
</parent>

<groupId>org.apache.seatunnel</groupId>
@@ -116,6 +117,7 @@
<jcommander.version>1.81</jcommander.version>
<junit4.version>4.13.2</junit4.version>
<junit5.version>5.9.0</junit5.version>
<mockito.version>4.11.0</mockito.version>
<config.version>1.3.3</config.version>
<maven-shade-plugin.version>3.3.0</maven-shade-plugin.version>
<maven-helper-plugin.version>3.2.0</maven-helper-plugin.version>
@@ -356,6 +358,13 @@
<version>${junit4.version}</version>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
@@ -520,6 +529,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<build>
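The `mockito-junit-jupiter` entries added above pin the version under `<dependencyManagement>` and declare a test-scoped dependency; they change no production code. A minimal, hypothetical test in the style this enables (the `GreetingService` interface is illustrative, not a SeaTunnel type):

```java
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
class GreetingServiceTest {

    /** Illustrative collaborator, not a SeaTunnel type. */
    interface GreetingService {
        String greet(String name);
    }

    @Mock private GreetingService greetingService;

    @Test
    void returnsStubbedGreeting() {
        // Stub the mock, then assert the stubbed behaviour is observed.
        when(greetingService.greet("SeaTunnel")).thenReturn("hello SeaTunnel");
        assertEquals("hello SeaTunnel", greetingService.greet("SeaTunnel"));
    }
}
```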
@@ -115,14 +115,14 @@ public static List<CatalogTable> getCatalogTables(
return optionalCatalog
.map(
c -> {
long startTime = System.currentTimeMillis();
try (Catalog catalog = c) {
long startTime = System.currentTimeMillis();
catalog.open();
List<CatalogTable> catalogTables =
catalog.getTables(readonlyConfig);
log.info(
String.format(
"Get catalog tables, cost time: %d",
"Get catalog tables, cost time: %d ms",
System.currentTimeMillis() - startTime));
if (catalogTables.isEmpty()) {
throw new SeaTunnelException(
@@ -25,7 +25,9 @@
import org.apache.kafka.connect.source.SourceRecord;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
@@ -37,6 +39,8 @@
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Map;

/**
@@ -71,6 +75,10 @@ public JdbcSourceEventDispatcher(
filter,
changeEventCreator,
metadataProvider,
Heartbeat.create(
getHeartbeatInterval(connectorConfig),
topicSelector.getHeartbeatTopic(),
connectorConfig.getLogicalName()),
schemaNameAdjuster);
this.queue = queue;
this.topic = topicSelector.getPrimaryTopic();
@@ -92,4 +100,14 @@ public void dispatchWatermarkEvent(
sourcePartition, topic, sourceSplit.splitId(), watermarkKind, watermark);
queue.enqueue(new DataChangeEvent(sourceRecord));
}

private static Duration getHeartbeatInterval(CommonConnectorConfig connectorConfig) {
Configuration configuration = connectorConfig.getConfig();
Duration heartbeatInterval =
configuration.getDuration(Heartbeat.HEARTBEAT_INTERVAL, ChronoUnit.MILLIS);
if (heartbeatInterval.isZero()) {
return Duration.ofMillis(5000);
}
return heartbeatInterval;
}
}
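The dispatcher now wires a Debezium `Heartbeat` into the parent `EventDispatcher` constructor, and `getHeartbeatInterval` falls back to 5000 ms when the configured interval is zero (Debezium's way of saying heartbeats are disabled), presumably so heartbeat records keep flowing, and offsets keep advancing, even without an explicit setting. A self-contained sketch of just that fallback, using a plain property map in place of Debezium's `Configuration` API and assuming the conventional `heartbeat.interval.ms` key:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Map;

public class HeartbeatIntervalFallback {

    static final Duration DEFAULT_HEARTBEAT_INTERVAL = Duration.ofMillis(5000);

    /** Returns the configured interval, or the 5000 ms default when it is unset or zero. */
    static Duration heartbeatInterval(Map<String, String> config) {
        long millis = Long.parseLong(config.getOrDefault("heartbeat.interval.ms", "0"));
        Duration configured = Duration.ofMillis(millis);
        return configured.isZero() ? DEFAULT_HEARTBEAT_INTERVAL : configured;
    }

    public static void main(String[] args) {
        System.out.println(heartbeatInterval(Collections.emptyMap()));                                    // PT5S (fallback)
        System.out.println(heartbeatInterval(Collections.singletonMap("heartbeat.interval.ms", "1000"))); // PT1S
    }
}
```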
@@ -17,6 +17,8 @@

package org.apache.seatunnel.connectors.cdc.base.source.enumerator;

import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;

import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.HybridPendingSplitsState;
@@ -31,9 +33,11 @@
import io.debezium.relational.TableId;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

/** Assigner for Hybrid split which contains snapshot splits and incremental splits. */
public class HybridSplitAssigner<C extends SourceConfig> implements SplitAssigner {
@@ -146,4 +150,22 @@ public void notifyCheckpointComplete(long checkpointId) {
snapshotSplitAssigner.notifyCheckpointComplete(checkpointId);
incrementalSplitAssigner.notifyCheckpointComplete(checkpointId);
}

@VisibleForTesting
IncrementalSplitAssigner<C> getIncrementalSplitAssigner() {
return incrementalSplitAssigner;
}

@VisibleForTesting
SnapshotSplitAssigner<C> getSnapshotSplitAssigner() {
return snapshotSplitAssigner;
}

public boolean completedSnapshotPhase(List<TableId> tableIds) {
return Arrays.asList(
snapshotSplitAssigner.completedSnapshotPhase(tableIds),
incrementalSplitAssigner.completedSnapshotPhase(tableIds))
.stream()
.allMatch(Predicate.isEqual(true));
}
}
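`completedSnapshotPhase` on the hybrid assigner asks both child assigners to clear their snapshot bookkeeping and reports success only when both say they are empty. Collecting the two results into a list before `allMatch`, rather than chaining them with `&&`, means the second cleanup still runs even if the first one reports leftover state. A reduced sketch of that contract with stand-in types:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class AllCompletedCheck {

    /** Stand-in for a child assigner that reports whether its snapshot state is fully cleared. */
    interface PhaseAware {
        boolean completedSnapshotPhase(List<String> tableIds);
    }

    static boolean completedSnapshotPhase(
            PhaseAware snapshotAssigner, PhaseAware incrementalAssigner, List<String> tableIds) {
        // Both child calls run unconditionally (no short-circuit), then the results are combined,
        // mirroring the Arrays.asList(...).stream().allMatch(Predicate.isEqual(true)) form above.
        return Arrays.asList(
                        snapshotAssigner.completedSnapshotPhase(tableIds),
                        incrementalAssigner.completedSnapshotPhase(tableIds))
                .stream()
                .allMatch(Predicate.isEqual(true));
    }

    public static void main(String[] args) {
        List<String> tables = Arrays.asList("db.orders");
        System.out.println(completedSnapshotPhase(ids -> true, ids -> true, tables));  // true
        System.out.println(completedSnapshotPhase(ids -> true, ids -> false, tables)); // false
    }
}
```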
@@ -20,6 +20,7 @@
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.PendingSplitsState;
import org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotPhaseEvent;
import org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitsAckEvent;
import org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitsReportEvent;
import org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
@@ -120,6 +121,17 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
.map(SnapshotSplitWatermark::getSplitId)
.collect(Collectors.toList()));
context.sendEventToSourceReader(subtaskId, ackEvent);
} else if (sourceEvent instanceof CompletedSnapshotPhaseEvent) {
LOG.debug(
"The enumerator receives completed snapshot phase event {} from subtask {}.",
sourceEvent,
subtaskId);
CompletedSnapshotPhaseEvent event = (CompletedSnapshotPhaseEvent) sourceEvent;
if (splitAssigner instanceof HybridSplitAssigner) {
((HybridSplitAssigner) splitAssigner).completedSnapshotPhase(event.getTableIds());
LOG.info(
"Clean the SnapshotSplitAssigner#assignedSplits/splitCompletedOffsets to empty.");
}
}
}

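On the enumerator side, incoming `SourceEvent`s are routed by `instanceof`, and the new `CompletedSnapshotPhaseEvent` is acted on only when the configured assigner is actually a `HybridSplitAssigner`. A stripped-down sketch of that double check, with stand-in types rather than the real SeaTunnel interfaces:

```java
import java.util.Arrays;
import java.util.List;

public class EventRoutingSketch {

    /** Stand-ins for the real SourceEvent / assigner types. */
    interface SourceEvent {}

    static class CompletedSnapshotPhaseEvent implements SourceEvent {
        final List<String> tableIds;
        CompletedSnapshotPhaseEvent(List<String> tableIds) { this.tableIds = tableIds; }
    }

    interface SplitAssigner {}

    static class HybridSplitAssigner implements SplitAssigner {
        boolean completedSnapshotPhase(List<String> tableIds) {
            System.out.println("clearing snapshot bookkeeping for " + tableIds);
            return true;
        }
    }

    static void handleSourceEvent(SplitAssigner splitAssigner, SourceEvent sourceEvent) {
        if (sourceEvent instanceof CompletedSnapshotPhaseEvent) {
            CompletedSnapshotPhaseEvent event = (CompletedSnapshotPhaseEvent) sourceEvent;
            // Only the hybrid assigner keeps snapshot-phase state worth clearing.
            if (splitAssigner instanceof HybridSplitAssigner) {
                ((HybridSplitAssigner) splitAssigner).completedSnapshotPhase(event.tableIds);
            }
        }
    }

    public static void main(String[] args) {
        handleSourceEvent(
                new HybridSplitAssigner(),
                new CompletedSnapshotPhaseEvent(Arrays.asList("db.orders")));
    }
}
```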
@@ -17,6 +17,8 @@

package org.apache.seatunnel.connectors.cdc.base.source.enumerator;

import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;

import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.state.IncrementalPhaseState;
@@ -45,6 +47,8 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;

/** Assigner for incremental split. */
public class IncrementalSplitAssigner<C extends SourceConfig> implements SplitAssigner {

@@ -255,4 +259,23 @@ private IncrementalSplit createIncrementalSplit(
completedSnapshotSplitInfos,
checkpointDataType);
}

@VisibleForTesting
void setSplitAssigned(boolean assigned) {
this.splitAssigned = assigned;
}

public boolean completedSnapshotPhase(List<TableId> tableIds) {
checkArgument(splitAssigned && noMoreSplits());

for (String splitKey : new ArrayList<>(context.getAssignedSnapshotSplit().keySet())) {
SnapshotSplit assignedSplit = context.getAssignedSnapshotSplit().get(splitKey);
if (tableIds.contains(assignedSplit.getTableId())) {
context.getAssignedSnapshotSplit().remove(splitKey);
context.getSplitCompletedOffsets().remove(assignedSplit.splitId());
}
}
return context.getAssignedSnapshotSplit().isEmpty()
&& context.getSplitCompletedOffsets().isEmpty();
}
}
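The incremental assigner's `completedSnapshotPhase` first asserts that splits were assigned and none remain, then walks a copy of the assigned-split keys so entries can be removed from the live maps while iterating, and finally reports whether both bookkeeping maps are empty. The same pattern, reduced to plain maps with simplified key and value types:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SnapshotBookkeepingCleanup {

    /**
     * Removes every assigned split (and its completed offset) whose table is in {@code finishedTables},
     * then reports whether both maps are now empty. Keys are split ids; values are table ids / offsets.
     */
    static boolean completedSnapshotPhase(
            Map<String, String> assignedSplitTableIds,
            Map<String, Long> splitCompletedOffsets,
            List<String> finishedTables) {
        // Iterate over a snapshot of the keys so removal from the live maps is safe.
        for (String splitId : new ArrayList<>(assignedSplitTableIds.keySet())) {
            if (finishedTables.contains(assignedSplitTableIds.get(splitId))) {
                assignedSplitTableIds.remove(splitId);
                splitCompletedOffsets.remove(splitId);
            }
        }
        return assignedSplitTableIds.isEmpty() && splitCompletedOffsets.isEmpty();
    }

    public static void main(String[] args) {
        Map<String, String> assigned = new HashMap<>();
        assigned.put("db.orders:0", "db.orders");
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("db.orders:0", 42L);
        System.out.println(completedSnapshotPhase(assigned, offsets, Arrays.asList("db.orders"))); // true
    }
}
```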
@@ -17,6 +17,8 @@

package org.apache.seatunnel.connectors.cdc.base.source.enumerator;

import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;

import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
@@ -45,6 +47,8 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;

import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;

/** Assigner for snapshot split. */
public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssigner {
private static final Logger LOG = LoggerFactory.getLogger(SnapshotSplitAssigner.class);
@@ -278,4 +282,28 @@ public boolean isCompleted() {
private boolean allSplitsCompleted() {
return noMoreSplits() && assignedSplits.size() == splitCompletedOffsets.size();
}

@VisibleForTesting
Map<String, SnapshotSplit> getAssignedSplits() {
return assignedSplits;
}

@VisibleForTesting
Map<String, SnapshotSplitWatermark> getSplitCompletedOffsets() {
return splitCompletedOffsets;
}

public boolean completedSnapshotPhase(List<TableId> tableIds) {
checkArgument(isCompleted() && allSplitsCompleted());

for (String splitKey : new ArrayList<>(assignedSplits.keySet())) {
SnapshotSplit assignedSplit = assignedSplits.get(splitKey);
if (tableIds.contains(assignedSplit.getTableId())) {
assignedSplits.remove(splitKey);
splitCompletedOffsets.remove(assignedSplit.splitId());
}
}

return assignedSplits.isEmpty() && splitCompletedOffsets.isEmpty();
}
}
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.cdc.base.source.event;

import org.apache.seatunnel.api.source.SourceEvent;

import io.debezium.relational.TableId;
import lombok.AllArgsConstructor;
import lombok.Data;

import java.util.List;

@Data
@AllArgsConstructor
public class CompletedSnapshotPhaseEvent implements SourceEvent {
private static final long serialVersionUID = 1L;

private List<TableId> tableIds;
}
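The new event only carries the table ids whose snapshot phase has completed; Lombok's `@Data` and `@AllArgsConstructor` supply the constructor, accessors, `equals`/`hashCode`, and `toString`. Roughly what that expands to, written out by hand with `String` standing in for Debezium's `TableId` so the sketch is self-contained (the real class implements `SourceEvent`, which is assumed here to be serializable):

```java
import java.io.Serializable;
import java.util.List;
import java.util.Objects;

/** Hand-written approximation of what Lombok generates for the event above. */
public class CompletedSnapshotPhaseEventSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private List<String> tableIds;

    public CompletedSnapshotPhaseEventSketch(List<String> tableIds) {
        this.tableIds = tableIds;
    }

    public List<String> getTableIds() {
        return tableIds;
    }

    public void setTableIds(List<String> tableIds) {
        this.tableIds = tableIds;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof CompletedSnapshotPhaseEventSketch)) return false;
        return Objects.equals(tableIds, ((CompletedSnapshotPhaseEventSketch) o).tableIds);
    }

    @Override
    public int hashCode() {
        return Objects.hash(tableIds);
    }

    @Override
    public String toString() {
        return "CompletedSnapshotPhaseEvent(tableIds=" + tableIds + ")";
    }
}
```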
@@ -22,6 +22,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotPhaseEvent;
import org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitsReportEvent;
import org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
@@ -207,7 +208,19 @@ protected SourceSplitStateBase initializedState(SourceSplitBase split) {
debeziumDeserializationSchema.restoreCheckpointProducedType(
incrementalSplit.getCheckpointDataType());
}
return new IncrementalSplitState(split.asIncrementalSplit());
IncrementalSplitState splitState = new IncrementalSplitState(incrementalSplit);
if (splitState.autoEnterPureIncrementPhaseIfAllowed()) {
log.info(
"The incremental split[{}] startup position {} is equal the maxSnapshotSplitsHighWatermark {}, auto enter pure increment phase.",
incrementalSplit.splitId(),
splitState.getStartupOffset(),
splitState.getMaxSnapshotSplitsHighWatermark());
log.info("Clean the IncrementalSplit#completedSnapshotSplitInfos to empty.");
CompletedSnapshotPhaseEvent event =
new CompletedSnapshotPhaseEvent(splitState.getTableIds());
context.sendSourceEventToEnumerator(event);
}
return splitState;
}
}

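In `initializedState`, the reader now checks whether a restored incremental split can skip the snapshot/stream reconciliation entirely: when the split's startup offset already equals the highest high watermark among its completed snapshot splits, the state enters a pure increment phase, the per-split snapshot info is dropped, and a `CompletedSnapshotPhaseEvent` tells the enumerator to clear its own bookkeeping. `autoEnterPureIncrementPhaseIfAllowed` itself is not shown in this excerpt; the sketch below is only a guess at the shape of that comparison, with offsets modelled as plain `long` values instead of the real offset type:

```java
import java.util.Arrays;
import java.util.List;

public class PureIncrementPhaseCheck {

    /**
     * Returns true when the split's startup offset has already reached the largest
     * high watermark recorded for its completed snapshot splits, i.e. there is no
     * snapshot/stream overlap left to reconcile and the reader can stream directly.
     */
    static boolean canEnterPureIncrementPhase(long startupOffset, List<Long> snapshotHighWatermarks) {
        if (snapshotHighWatermarks.isEmpty()) {
            return false; // nothing recorded; keep the normal behaviour
        }
        long maxHighWatermark =
                snapshotHighWatermarks.stream().mapToLong(Long::longValue).max().getAsLong();
        return startupOffset == maxHighWatermark;
    }

    public static void main(String[] args) {
        System.out.println(canEnterPureIncrementPhase(100L, Arrays.asList(80L, 100L))); // true
        System.out.println(canEnterPureIncrementPhase(90L, Arrays.asList(80L, 100L)));  // false
    }
}
```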