[Feature][Kafka] Support multi-table source read (apache#5992)
zhilinli123 authored May 22, 2024
1 parent 4969c91 commit 6010460
Showing 37 changed files with 999 additions and 492 deletions.
62 changes: 62 additions & 0 deletions docs/en/connector-v2/source/kafka.md
@@ -35,6 +35,7 @@ They can be downloaded via install-plugin.sh or from the Maven central repository
| Name | Type | Required | Default | Description |
|-------------------------------------|-----------------------------------------------------------------------------|----------|--------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| topic                               | String                                                                        | Yes      | -                        | Topic name(s) to read data from when the table is used as a source. It also supports a comma-separated topic list, e.g. 'topic-1,topic-2'.                                                                                                                                                                                                                                                                                                             |
| table_list                          | Map                                                                           | No       | -                        | Topic list config. You can configure only one of `table_list` and `topic` at the same time.                                                                                                                                                                                                                                                                                                                                                            |
| bootstrap.servers | String | Yes | - | Comma separated list of Kafka brokers. |
| pattern                             | Boolean                                                                       | No       | false                    | If `pattern` is set to `true`, the topic is treated as a regular expression, and all topics whose names match the specified regular expression will be subscribed by the consumer.                                                                                                                                                                                                                                                                     |
| consumer.group | String | No | SeaTunnel-Consumer-Group | `Kafka consumer group id`, used to distinguish different consumer groups. |
@@ -180,3 +181,64 @@ source {
}
```

### Multiple Kafka Source

> This example parses data from multiple Kafka topics in different formats (`ogg_json` and `canal_json`), writes it to the same PostgreSQL table, and performs upsert operations based on the `id` column.
```hocon
env {
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  Kafka {
    bootstrap.servers = "kafka_e2e:9092"
    table_list = [
      {
        topic = "^test-ogg-sou.*"
        pattern = "true"
        consumer.group = "ogg_multi_group"
        start_mode = earliest
        schema = {
          fields {
            id = "int"
            name = "string"
            description = "string"
            weight = "string"
          }
        },
        format = ogg_json
      },
      {
        topic = "test-cdc_mds"
        start_mode = earliest
        schema = {
          fields {
            id = "int"
            name = "string"
            description = "string"
            weight = "string"
          }
        },
        format = canal_json
      }
    ]
  }
}

sink {
  Jdbc {
    driver = org.postgresql.Driver
    url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
    user = test
    password = test
    generate_sink_sql = true
    database = test
    table = public.sink
    primary_keys = ["id"]
  }
}
```
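
In the first `table_list` entry above, `pattern = "true"` makes the connector treat `topic` as a regular expression rather than a literal topic name. For reference, the plain Kafka consumer API distinguishes the two subscription modes the same way; below is a minimal sketch reusing the broker address, consumer group, and regex from the example above (the class name and byte-array deserializers are illustrative assumptions, not part of this commit).

```java
import java.time.Duration;
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TopicSubscriptionSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka_e2e:9092"); // broker from the example above
        props.put("group.id", "ogg_multi_group");         // consumer group from the example above
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // pattern = "true": every topic whose name matches the regex is subscribed.
            consumer.subscribe(Pattern.compile("^test-ogg-sou.*"));

            // pattern = "false" (the default): subscribe to an explicit topic list instead,
            // e.g. consumer.subscribe(java.util.Arrays.asList("topic-1", "topic-2"));

            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
            System.out.println("Fetched " + records.count() + " records");
        }
    }
}
```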

@@ -26,6 +26,7 @@
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -66,6 +67,7 @@ public class AmazonSqsSource extends AbstractSingleSplitSource<SeaTunnelRow>
private AmazonSqsSourceOptions amazonSqsSourceOptions;
private DeserializationSchema<SeaTunnelRow> deserializationSchema;
private SeaTunnelRowType typeInfo;
private CatalogTable catalogTable;

@Override
public String getPluginName() {
@@ -84,9 +86,9 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, result.getMsg()));
}
amazonSqsSourceOptions = new AmazonSqsSourceOptions(pluginConfig);
typeInfo = CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();

this.amazonSqsSourceOptions = new AmazonSqsSourceOptions(pluginConfig);
this.catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig);
this.typeInfo = catalogTable.getSeaTunnelRowType();
setDeserialization(pluginConfig);
}

@@ -109,11 +111,11 @@ public AbstractSingleSplitReader<SeaTunnelRow> createReader(

private void setDeserialization(Config config) {
if (config.hasPath(TableSchemaOptions.SCHEMA.key())) {
typeInfo = CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
MessageFormat format = ReadonlyConfig.fromConfig(config).get(FORMAT);
switch (format) {
case JSON:
deserializationSchema = new JsonDeserializationSchema(false, false, typeInfo);
deserializationSchema =
new JsonDeserializationSchema(catalogTable, false, false);
break;
case TEXT:
String delimiter = DEFAULT_FIELD_DELIMITER;
@@ -128,7 +130,7 @@ private void setDeserialization(Config config) {
break;
case CANAL_JSON:
deserializationSchema =
CanalJsonDeserializationSchema.builder(typeInfo)
CanalJsonDeserializationSchema.builder(catalogTable)
.setIgnoreParseErrors(true)
.build();
break;
Expand All @@ -138,7 +140,8 @@ private void setDeserialization(Config config) {
includeSchema = config.getBoolean(DEBEZIUM_RECORD_INCLUDE_SCHEMA.key());
}
deserializationSchema =
new DebeziumJsonDeserializationSchema(typeInfo, true, includeSchema);
new DebeziumJsonDeserializationSchema(
catalogTable, true, includeSchema);
break;
default:
throw new SeaTunnelJsonFormatException(
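
The `AmazonSqsSource` changes above illustrate the pattern repeated throughout this commit: deserialization schemas are now constructed from a `CatalogTable` instead of a bare `SeaTunnelRowType`. The sketch below shows the new call shape using only helpers that appear elsewhere in this diff (`CatalogTableUtil.getCatalogTable`, `getSeaTunnelRowType`), and assumes `JsonDeserializationSchema` lives in `org.apache.seatunnel.format.json` and keeps its two boolean flags after the table argument.

```java
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;

public class CatalogTableDeserializationSketch {
    public static void main(String[] args) {
        // A one-column row type, wrapped in a CatalogTable via the same helper
        // the GoogleSheetsDeserializerTest below uses.
        SeaTunnelRowType rowType =
                new SeaTunnelRowType(
                        new String[] {"name"},
                        new SeaTunnelDataType[] {BasicType.STRING_TYPE});
        CatalogTable catalogTable = CatalogTableUtil.getCatalogTable("", "", "", "", rowType);

        // New constructor order: the CatalogTable comes first, followed by the two
        // boolean flags (their exact meaning is not spelled out in this diff).
        JsonDeserializationSchema deserializationSchema =
                new JsonDeserializationSchema(catalogTable, false, false);

        // Downstream code can still recover the row type when it needs one.
        System.out.println(catalogTable.getSeaTunnelRowType());
        System.out.println(deserializationSchema.getClass().getSimpleName());
    }
}
```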
@@ -52,8 +52,7 @@ public FakeDataGenerator(FakeConfig fakeConfig) {
this.jsonDeserializationSchema =
fakeConfig.getFakeRows() == null
? null
: new JsonDeserializationSchema(
false, false, catalogTable.getSeaTunnelRowType());
: new JsonDeserializationSchema(catalogTable, false, false);
this.fakeDataRandomUtils = new FakeDataRandomUtils(fakeConfig);
}

@@ -24,6 +24,7 @@
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -46,6 +47,7 @@
public class SheetsSource extends AbstractSingleSplitSource<SeaTunnelRow> {

private SeaTunnelRowType seaTunnelRowType;
private CatalogTable catalogTable;

private SheetsParameters sheetsParameters;

@@ -75,12 +77,13 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
}
this.sheetsParameters = new SheetsParameters().buildWithConfig(pluginConfig);
if (pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) {
this.seaTunnelRowType =
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
this.catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig);
} else {
this.seaTunnelRowType = CatalogTableUtil.buildSimpleTextSchema();
this.catalogTable = CatalogTableUtil.buildSimpleTextTable();
}
this.deserializationSchema = new JsonDeserializationSchema(false, false, seaTunnelRowType);

this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
this.deserializationSchema = new JsonDeserializationSchema(catalogTable, false, false);
}

@Override
@@ -18,6 +18,8 @@
package org.apache.seatunnel.connectors.seatunnel.google.sheets.deserialize;

import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -39,8 +41,11 @@ public class GoogleSheetsDeserializerTest {
public void testJsonParseError() {
SeaTunnelRowType schema =
new SeaTunnelRowType(new String[] {"name"}, new SeaTunnelDataType[] {STRING_TYPE});

CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "", "", "", schema);

final DeserializationSchema<SeaTunnelRow> deser =
new JsonDeserializationSchema(false, false, schema);
new JsonDeserializationSchema(catalogTables, false, false);
final GoogleSheetsDeserializer googleSheetsDeser =
new GoogleSheetsDeserializer(schema.getFieldNames(), deser);
List<Object> row = new ArrayList<>();
@@ -127,8 +127,7 @@ protected void buildSchemaWithConfig(Config pluginConfig) {
switch (format) {
case JSON:
this.deserializationSchema =
new JsonDeserializationSchema(
false, false, catalogTable.getSeaTunnelRowType());
new JsonDeserializationSchema(catalogTable, false, false);
if (pluginConfig.hasPath(HttpConfig.JSON_FIELD.key())) {
jsonField =
getJsonField(pluginConfig.getConfig(HttpConfig.JSON_FIELD.key()));
@@ -177,4 +177,10 @@ public class Config {
.defaultValue(KafkaSemantics.NON)
.withDescription(
"Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON.");
public static final Option<List<Map<String, Object>>> TABLE_LIST =
Options.key("table_list")
.type(new TypeReference<List<Map<String, Object>>>() {})
.noDefaultValue()
.withDescription(
"Topic list config. You can configure only one `table_list` or one `topic` at the same time");
}
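
The new `TABLE_LIST` option is declared as a `List<Map<String, Object>>`, so each entry can carry its own topic, schema, and format (see the documentation example earlier in this commit). Below is a hypothetical sketch of how such an option could be read programmatically; only `ReadonlyConfig#get(Option)` and the option itself are taken from this diff, while the import paths, helper name, and null handling are assumptions.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;

public class TableListReadSketch {

    // Hypothetical helper: return the per-table entries, or an empty list when the
    // option is absent (assumes get(...) yields null for a no-default option).
    static List<Map<String, Object>> readTableList(ReadonlyConfig readonlyConfig) {
        List<Map<String, Object>> tableList = readonlyConfig.get(Config.TABLE_LIST);
        return tableList == null ? Collections.emptyList() : tableList;
    }
}
```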
@@ -17,6 +17,9 @@

package org.apache.seatunnel.connectors.seatunnel.kafka.source;

import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;

import org.apache.kafka.common.TopicPartition;
@@ -33,11 +36,11 @@ public class ConsumerMetadata implements Serializable {

private String topic;
private boolean isPattern = false;
private String bootstrapServers;
private Properties properties;
private String consumerGroup;
private boolean commitOnCheckpoint = false;
private StartMode startMode = StartMode.GROUP_OFFSETS;
private Map<TopicPartition, Long> specificStartOffsets;
private Long startOffsetsTimestamp;
private DeserializationSchema<SeaTunnelRow> deserializationSchema;
private CatalogTable catalogTable;
}
@@ -37,15 +37,15 @@ public class KafkaConsumerThread implements Runnable {

private final LinkedBlockingQueue<Consumer<KafkaConsumer<byte[], byte[]>>> tasks;

public KafkaConsumerThread(ConsumerMetadata metadata) {
public KafkaConsumerThread(KafkaSourceConfig kafkaSourceConfig, ConsumerMetadata metadata) {
this.metadata = metadata;
this.tasks = new LinkedBlockingQueue<>();
this.consumer =
initConsumer(
this.metadata.getBootstrapServers(),
this.metadata.getConsumerGroup(),
this.metadata.getProperties(),
!this.metadata.isCommitOnCheckpoint());
kafkaSourceConfig.getBootstrap(),
metadata.getConsumerGroup(),
kafkaSourceConfig.getProperties(),
kafkaSourceConfig.isCommitOnCheckpoint());
}

@Override
@@ -64,7 +64,9 @@ public void run() {
}
} finally {
try {
consumer.close();
if (consumer != null) {
consumer.close();
}
} catch (Throwable t) {
throw new KafkaConnectorException(KafkaConnectorErrorCode.CONSUMER_CLOSE_FAILED, t);
}
@@ -29,9 +29,8 @@
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;

import com.google.common.collect.Lists;

import java.util.List;
import java.util.stream.Collectors;

public class KafkaSource
implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSplit, KafkaSourceState>,
@@ -59,37 +58,32 @@ public String getPluginName() {

@Override
public List<CatalogTable> getProducedCatalogTables() {
return Lists.newArrayList(kafkaSourceConfig.getCatalogTable());
return kafkaSourceConfig.getMapMetadata().values().stream()
.map(ConsumerMetadata::getCatalogTable)
.collect(Collectors.toList());
}

@Override
public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(
SourceReader.Context readerContext) {
return new KafkaSourceReader(
kafkaSourceConfig.getMetadata(),
kafkaSourceConfig.getDeserializationSchema(),
kafkaSourceConfig,
readerContext,
kafkaSourceConfig.getMessageFormatErrorHandleWay());
}

@Override
public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> createEnumerator(
SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext) {
return new KafkaSourceSplitEnumerator(
kafkaSourceConfig.getMetadata(),
enumeratorContext,
kafkaSourceConfig.getDiscoveryIntervalMillis());
return new KafkaSourceSplitEnumerator(kafkaSourceConfig, enumeratorContext, null);
}

@Override
public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> restoreEnumerator(
SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext,
KafkaSourceState checkpointState) {
return new KafkaSourceSplitEnumerator(
kafkaSourceConfig.getMetadata(),
enumeratorContext,
checkpointState,
kafkaSourceConfig.getDiscoveryIntervalMillis());
kafkaSourceConfig, enumeratorContext, checkpointState);
}

@Override
