diff --git a/docs/en/connector-v2/source/StarRocks.md b/docs/en/connector-v2/source/StarRocks.md index 1c1a109480a..5609902a6cd 100644 --- a/docs/en/connector-v2/source/StarRocks.md +++ b/docs/en/connector-v2/source/StarRocks.md @@ -26,6 +26,7 @@ delivers the query plan as a parameter to BE nodes, and then obtains data result | password | string | yes | - | | database | string | yes | - | | table | string | yes | - | +| table_list | array | yes | - | | scan_filter | string | no | - | | schema | config | yes | - | | request_tablet_size | int | no | Integer.MAX_VALUE | @@ -57,6 +58,10 @@ The name of StarRocks database The name of StarRocks table +### table_list [array] + +The list of tables to be read, you can use this configuration instead of `table` + ### scan_filter [string] Filter expression of the query, which is transparently transmitted to StarRocks. StarRocks uses this expression to complete source-side data filtering. @@ -76,7 +81,7 @@ The schema of the starRocks that you want to generate e.g. ``` -schema { +schema { fields { name = string age = int @@ -177,6 +182,67 @@ source { } ``` +## Example 2: Multiple tables + +``` +source { + StarRocks { + nodeUrls = ["starrocks_e2e:8030"] + username = root + password = "" + database = "test" + table_list = [ + { + table = "e2e_table_source" + schema { + fields { + BIGINT_COL = BIGINT + LARGEINT_COL = STRING + SMALLINT_COL = SMALLINT + TINYINT_COL = TINYINT + BOOLEAN_COL = BOOLEAN + DECIMAL_COL = "DECIMAL(20, 1)" + DOUBLE_COL = DOUBLE + FLOAT_COL = FLOAT + INT_COL = INT + CHAR_COL = STRING + VARCHAR_11_COL = STRING + STRING_COL = STRING + DATETIME_COL = TIMESTAMP + DATE_COL = DATE + } + } + }, + { + table = "e2e_table_source_2" + schema = { + fields { + BIGINT_COL_2 = BIGINT + LARGEINT_COL_2 = STRING + SMALLINT_COL_2 = SMALLINT + TINYINT_COL_2 = TINYINT + BOOLEAN_COL_2 = BOOLEAN + DECIMAL_COL_2 = "DECIMAL(20, 1)" + DOUBLE_COL_2 = DOUBLE + FLOAT_COL_2 = FLOAT + INT_COL_2 = INT + CHAR_COL_2 = STRING + VARCHAR_11_COL_2 = STRING + STRING_COL_2 = STRING + DATETIME_COL_2 = TIMESTAMP + DATE_COL_2 = DATE + } + } + }] + scan_batch_rows = 10 + max_retries = 3 + scan.params.scanner_thread_pool_thread_num = "3" + + } +} + +``` + ## Changelog ### next version diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java index 99cb7353cf6..2404a616ace 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java @@ -22,9 +22,13 @@ public enum CommonErrorCode implements SeaTunnelErrorCode { FILE_OPERATION_FAILED("COMMON-01", " file '' failed."), JSON_OPERATION_FAILED( "COMMON-02", " JSON convert/parse '' operation failed."), + UNSUPPORTED_OPERATION("COMMON-05", "Unsupported operation"), + ILLEGAL_ARGUMENT("COMMON-06", "Illegal argument"), UNSUPPORTED_DATA_TYPE( "COMMON-07", "'' unsupported data type '' of ''"), UNSUPPORTED_ENCODING("COMMON-08", "unsupported encoding ''"), + WRITER_OPERATION_FAILED( + "COMMON-11", "Sink writer operation failed, such as (open, close) etc..."), CONVERT_TO_SEATUNNEL_TYPE_ERROR( "COMMON-16", "'' unsupported convert type '' of '' to SeaTunnel data type."), diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java index c0283399a03..60ffd422e3b 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; +import org.apache.seatunnel.common.exception.CommonErrorCode; import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig; import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode; @@ -71,7 +72,7 @@ public Boolean doStreamLoad(StarRocksFlushTuple flushData) throws IOException { String host = getAvailableHost(); if (null == host) { throw new StarRocksConnectorException( - CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, + CommonErrorCode.ILLEGAL_ARGUMENT, "None of the host in `load_url` could be connected."); } String loadUrl = diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java index c0be0106bb0..c5b2480ac88 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java @@ -92,14 +92,17 @@ public StarRocksBeReadClient(String beNodeInfo, SourceConfig sourceConfig) { } public void openScanner(QueryPartition partition, SeaTunnelRowType seaTunnelRowType) { + eos.set(false); + this.readerOffset = 0; + this.rowBatch = null; this.seaTunnelRowType = seaTunnelRowType; Set tabletIds = partition.getTabletIds(); TScanOpenParams params = new TScanOpenParams(); params.setTablet_ids(new ArrayList<>(tabletIds)); params.setOpaqued_query_plan(partition.getQueryPlan()); params.setCluster(DEFAULT_CLUSTER_NAME); - params.setDatabase(sourceConfig.getDatabase()); - params.setTable(sourceConfig.getTable()); + params.setDatabase(partition.getDatabase()); + params.setTable(partition.getTable()); params.setUser(sourceConfig.getUsername()); params.setPasswd(sourceConfig.getPassword()); params.setBatch_size(sourceConfig.getBatchRows()); diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksQueryPlanReadClient.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksQueryPlanReadClient.java index 85458f7151b..f2cea255b4d 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksQueryPlanReadClient.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksQueryPlanReadClient.java @@ -24,6 +24,7 @@ import org.apache.seatunnel.connectors.seatunnel.starrocks.client.source.model.QueryPartition; import org.apache.seatunnel.connectors.seatunnel.starrocks.client.source.model.QueryPlan; import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SourceConfig; +import org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSourceTableConfig; import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException; @@ -39,6 +40,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; @Slf4j public class StarRocksQueryPlanReadClient { @@ -46,37 +49,33 @@ public class StarRocksQueryPlanReadClient { private SourceConfig sourceConfig; private SeaTunnelRowType seaTunnelRowType; private final HttpHelper httpHelper = new HttpHelper(); + private final Map tables; private static final long DEFAULT_SLEEP_TIME_MS = 1000L; - public StarRocksQueryPlanReadClient( - SourceConfig sourceConfig, SeaTunnelRowType seaTunnelRowType) { + public StarRocksQueryPlanReadClient(SourceConfig sourceConfig) { this.sourceConfig = sourceConfig; - this.seaTunnelRowType = seaTunnelRowType; this.retryMaterial = new RetryUtils.RetryMaterial( sourceConfig.getMaxRetries(), true, exception -> true, DEFAULT_SLEEP_TIME_MS); + this.tables = + sourceConfig.getTableConfigList().stream() + .collect( + Collectors.toMap( + StarRocksSourceTableConfig::getTable, Function.identity())); } - public List findPartitions() { - List nodeUrls = sourceConfig.getNodeUrls(); - QueryPlan queryPlan = getQueryPlan(genQuerySql(), nodeUrls); + public List findPartitions(String table) { + QueryPlan queryPlan = getQueryPlan(genQuerySql(table), table); Map> be2Tablets = selectBeForTablet(queryPlan); - return tabletsMapToPartition( - be2Tablets, - queryPlan.getQueryPlan(), - sourceConfig.getDatabase(), - sourceConfig.getTable()); + return tabletsMapToPartition(be2Tablets, queryPlan.getQueryPlan(), table); } private List tabletsMapToPartition( - Map> be2Tablets, - String opaquedQueryPlan, - String database, - String table) + Map> be2Tablets, String opaquedQueryPlan, String table) throws IllegalArgumentException { int tabletsSize = sourceConfig.getRequestTabletSize(); List partitions = new ArrayList<>(); @@ -98,7 +97,7 @@ private List tabletsMapToPartition( first = first + tabletsSize; QueryPartition partitionDefinition = new QueryPartition( - database, + sourceConfig.getDatabase(), table, beInfo.getKey(), partitionTablets, @@ -134,8 +133,9 @@ private Map> selectBeForTablet(QueryPlan queryPlan) { return beXTablets; } - private QueryPlan getQueryPlan(String querySQL, List nodeUrls) { + private QueryPlan getQueryPlan(String querySQL, String table) { + List nodeUrls = sourceConfig.getNodeUrls(); Map bodyMap = new HashMap<>(); bodyMap.put("sql", querySQL); String body = JsonUtils.toJsonString(bodyMap); @@ -147,7 +147,7 @@ private QueryPlan getQueryPlan(String querySQL, List nodeUrls) { .append("/api/") .append(sourceConfig.getDatabase()) .append("/") - .append(sourceConfig.getTable()) + .append(table) .append("/_query_plan") .toString(); try { @@ -183,15 +183,17 @@ private Map getQueryPlanHttpHeader() { return headerMap; } - private String genQuerySql() { + private String genQuerySql(String table) { + + StarRocksSourceTableConfig starRocksSourceTableConfig = tables.get(table); + SeaTunnelRowType seaTunnelRowType = + starRocksSourceTableConfig.getCatalogTable().getSeaTunnelRowType(); String columns = seaTunnelRowType.getFieldNames().length != 0 ? String.join(",", seaTunnelRowType.getFieldNames()) : "*"; - String filter = - sourceConfig.getScanFilter().isEmpty() - ? "" - : " where " + sourceConfig.getScanFilter(); + String scanFilter = starRocksSourceTableConfig.getScanFilter(); + String filter = scanFilter.isEmpty() ? "" : " where " + scanFilter; String sql = "select " @@ -202,7 +204,7 @@ private String genQuerySql() { + "`" + "." + "`" - + sourceConfig.getTable() + + table + "`" + filter; log.debug("Generate query sql '{}'.", sql); diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/model/QueryPartition.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/model/QueryPartition.java index f4ec165ff85..3af63ae95e1 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/model/QueryPartition.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/model/QueryPartition.java @@ -68,7 +68,7 @@ public int compareTo(QueryPartition o) { similar.retainAll(o.tabletIds); diffSelf.removeAll(similar); diffOther.removeAll(similar); - if (diffSelf.size() == 0) { + if (diffSelf.isEmpty()) { return 0; } long diff = Collections.min(diffSelf) - Collections.min(diffOther); @@ -103,7 +103,7 @@ public int hashCode() { @Override public String toString() { return "QueryPartition{" - + ", database='" + + "database='" + database + '\'' + ", table='" diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/CommonConfig.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/CommonConfig.java index c8a4775fcff..61056142108 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/CommonConfig.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/CommonConfig.java @@ -70,13 +70,11 @@ public class CommonConfig implements Serializable { private String username; private String password; private String database; - private String table; public CommonConfig(ReadonlyConfig config) { this.nodeUrls = config.get(NODE_URLS); this.username = config.get(USERNAME); this.password = config.get(PASSWORD); this.database = config.get(DATABASE); - this.table = config.get(TABLE); } } diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SourceConfig.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SourceConfig.java index d0698638430..d45a5e76d5b 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SourceConfig.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SourceConfig.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.connectors.seatunnel.starrocks.config; +import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference; + import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; import org.apache.seatunnel.api.configuration.ReadonlyConfig; @@ -24,7 +26,9 @@ import lombok.Getter; import lombok.Setter; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; @Setter @@ -53,6 +57,7 @@ public SourceConfig(ReadonlyConfig config) { key.substring(prefix.length()).toLowerCase(), value); } }); + tableConfigList = StarRocksSourceTableConfig.of(config); } public static final Option MAX_RETRIES = @@ -106,6 +111,12 @@ public SourceConfig(ReadonlyConfig config) { .noDefaultValue() .withDescription("The parameter of the scan data from be"); + public static final Option>> TABLE_LIST = + Options.key("table_list") + .type(new TypeReference>>() {}) + .noDefaultValue() + .withDescription("table list config"); + private int maxRetries = MAX_RETRIES.defaultValue(); private int requestTabletSize = QUERY_TABLET_SIZE.defaultValue(); private String scanFilter = SCAN_FILTER.defaultValue(); @@ -115,4 +126,5 @@ public SourceConfig(ReadonlyConfig config) { private int batchRows = SCAN_BATCH_ROWS.defaultValue(); private int connectTimeoutMs = SCAN_CONNECT_TIMEOUT.defaultValue(); private Map sourceOptionProps = new HashMap<>(); + private List tableConfigList = new ArrayList<>(); } diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSourceTableConfig.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSourceTableConfig.java new file mode 100644 index 00000000000..e9b791e6213 --- /dev/null +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSourceTableConfig.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.starrocks.config; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.catalog.schema.ReadonlyConfigParser; +import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; + +import com.google.common.collect.Lists; +import lombok.Getter; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +@Getter +public class StarRocksSourceTableConfig implements Serializable { + + private final String table; + + private final CatalogTable catalogTable; + + private final String scanFilter; + + private StarRocksSourceTableConfig( + String tableName, CatalogTable catalogTable, String scanFilter) { + this.table = tableName; + this.catalogTable = catalogTable; + this.scanFilter = scanFilter; + } + + public static StarRocksSourceTableConfig parseStarRocksSourceConfig(ReadonlyConfig config) { + + String table = config.get(CommonConfig.TABLE); + String dataBase = config.get(CommonConfig.DATABASE); + TablePath tablePath = TablePath.of(dataBase, table); + TableSchema tableSchema = new ReadonlyConfigParser().parse(config); + CatalogTable catalogTable = + CatalogTable.of( + TableIdentifier.of("", tablePath), + tableSchema, + new HashMap<>(), + new ArrayList<>(), + config.get(TableSchemaOptions.TableIdentifierOptions.COMMENT)); + + return new StarRocksSourceTableConfig( + table, catalogTable, config.get(SourceConfig.SCAN_FILTER)); + } + + public static List of(ReadonlyConfig config) { + + if (config.getOptional(SourceConfig.TABLE_LIST).isPresent()) { + List> maps = config.get(SourceConfig.TABLE_LIST); + return maps.stream() + .map(ReadonlyConfig::fromMap) + .map(StarRocksSourceTableConfig::parseStarRocksSourceConfig) + .collect(Collectors.toList()); + } + return Lists.newArrayList(parseStarRocksSourceConfig(config)); + } +} diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java index 1f6ffcd0769..07acfb18730 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSink.java @@ -33,6 +33,7 @@ import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; import org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCatalog; import org.apache.seatunnel.connectors.seatunnel.starrocks.catalog.StarRocksCatalogFactory; +import org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig; import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig; import java.util.Arrays; @@ -76,7 +77,7 @@ public Optional getSaveModeHandler() { catalogTable.getTableId().getTableName()); Catalog catalog = new StarRocksCatalog( - "StarRocks", + CommonConfig.CONNECTOR_IDENTITY, sinkConfig.getUsername(), sinkConfig.getPassword(), sinkConfig.getJdbcUrl(), diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java index d3664087312..aabda2affe6 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java @@ -25,7 +25,7 @@ import org.apache.seatunnel.api.table.schema.handler.TableSchemaChangeEventDispatcher; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; +import org.apache.seatunnel.common.exception.CommonErrorCode; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import org.apache.seatunnel.connectors.seatunnel.starrocks.client.StarRocksSinkManager; import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig; @@ -72,7 +72,7 @@ public void write(SeaTunnelRow element) throws IOException { record = serializer.serialize(element); } catch (Exception e) { throw new StarRocksConnectorException( - CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, + CommonErrorCode.WRITER_OPERATION_FAILED, "serialize failed. Row={" + element + "}", e); } @@ -120,8 +120,7 @@ public void close() throws IOException { } } catch (IOException e) { log.error("Close starRocks manager failed.", e); - throw new StarRocksConnectorException( - CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, e); + throw new StarRocksConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED, e); } } @@ -137,7 +136,7 @@ public StarRocksISerializer createSerializer( return new StarRocksJsonSerializer(seaTunnelRowType, sinkConfig.isEnableUpsertDelete()); } throw new StarRocksConnectorException( - CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, + CommonErrorCode.ILLEGAL_ARGUMENT, "Failed to create row serializer, unsupported `format` from stream load properties."); } } diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSource.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSource.java index 211a8b96fca..a34616e56fd 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSource.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSource.java @@ -25,14 +25,14 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.connectors.seatunnel.starrocks.config.CommonConfig; import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SourceConfig; +import org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSourceTableConfig; -import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; public class StarRocksSource implements SeaTunnelSource { - private CatalogTable catalogTable; private SourceConfig sourceConfig; @Override @@ -40,9 +40,8 @@ public String getPluginName() { return CommonConfig.CONNECTOR_IDENTITY; } - public StarRocksSource(SourceConfig sourceConfig, CatalogTable catalogTable) { + public StarRocksSource(SourceConfig sourceConfig) { this.sourceConfig = sourceConfig; - this.catalogTable = catalogTable; } @Override @@ -52,13 +51,14 @@ public Boundedness getBoundedness() { @Override public List getProducedCatalogTables() { - return Collections.singletonList(catalogTable); + return sourceConfig.getTableConfigList().stream() + .map(StarRocksSourceTableConfig::getCatalogTable) + .collect(Collectors.toList()); } @Override public SourceReader createReader(SourceReader.Context readerContext) { - return new StarRocksSourceReader( - readerContext, catalogTable.getSeaTunnelRowType(), sourceConfig); + return new StarRocksSourceReader(readerContext, sourceConfig); } @Override @@ -67,15 +67,11 @@ public SourceSplitEnumerator restore StarRocksSourceState checkpointState) throws Exception { return new StartRocksSourceSplitEnumerator( - enumeratorContext, - sourceConfig, - catalogTable.getSeaTunnelRowType(), - checkpointState); + enumeratorContext, sourceConfig, checkpointState); } @Override public SourceSplitEnumerator createEnumerator(SourceSplitEnumerator.Context enumeratorContext) { - return new StartRocksSourceSplitEnumerator( - enumeratorContext, sourceConfig, catalogTable.getSeaTunnelRowType()); + return new StartRocksSourceSplitEnumerator(enumeratorContext, sourceConfig); } } diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceFactory.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceFactory.java index 1f5e3c16905..50fcef90970 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceFactory.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceFactory.java @@ -21,8 +21,6 @@ import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SourceSplit; -import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; import org.apache.seatunnel.api.table.connector.TableSource; import org.apache.seatunnel.api.table.factory.Factory; @@ -49,10 +47,9 @@ public OptionRule optionRule() { SourceConfig.NODE_URLS, SourceConfig.USERNAME, SourceConfig.PASSWORD, - SourceConfig.DATABASE, - SourceConfig.TABLE, - TableSchemaOptions.SCHEMA) + SourceConfig.DATABASE) .optional( + TableSchemaOptions.SCHEMA, SourceConfig.MAX_RETRIES, SourceConfig.QUERY_TABLET_SIZE, SourceConfig.SCAN_FILTER, @@ -61,6 +58,7 @@ public OptionRule optionRule() { SourceConfig.SCAN_KEEP_ALIVE_MIN, SourceConfig.SCAN_BATCH_ROWS, SourceConfig.SCAN_CONNECT_TIMEOUT) + .exclusive(SourceConfig.TABLE, SourceConfig.TABLE_LIST) .build(); } @@ -74,9 +72,7 @@ public Class getSourceClass() { TableSource createSource(TableSourceFactoryContext context) { ReadonlyConfig config = context.getOptions(); SourceConfig starRocksSourceConfig = new SourceConfig(config); - CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(config); return () -> - (SeaTunnelSource) - new StarRocksSource(starRocksSourceConfig, catalogTable); + (SeaTunnelSource) new StarRocksSource(starRocksSourceConfig); } } diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceReader.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceReader.java index 7f68d4e3218..d79c8f237d4 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceReader.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceReader.java @@ -43,18 +43,25 @@ public class StarRocksSourceReader implements SourceReader pendingSplits; private final SourceReader.Context context; private final SourceConfig sourceConfig; - private final SeaTunnelRowType seaTunnelRowType; private Map clientsPools; private volatile boolean noMoreSplitsAssignment; + private final Map tables; - public StarRocksSourceReader( - SourceReader.Context readerContext, - SeaTunnelRowType seaTunnelRowType, - SourceConfig sourceConfig) { + public StarRocksSourceReader(SourceReader.Context readerContext, SourceConfig sourceConfig) { this.pendingSplits = new LinkedList<>(); this.context = readerContext; this.sourceConfig = sourceConfig; - this.seaTunnelRowType = seaTunnelRowType; + Map tables = new HashMap<>(); + sourceConfig + .getTableConfigList() + .forEach( + starRocksSourceTableConfig -> + tables.put( + starRocksSourceTableConfig.getTable(), + starRocksSourceTableConfig + .getCatalogTable() + .getSeaTunnelRowType())); + this.tables = tables; } @Override @@ -103,9 +110,11 @@ private void read(StarRocksSourceSplit split, Collector output) { clientsPools.put(beAddress, client); } // open scanner to be + SeaTunnelRowType seaTunnelRowType = tables.get(partition.getTable()); client.openScanner(partition, seaTunnelRowType); while (client.hasNext()) { SeaTunnelRow seaTunnelRow = client.getNext(); + seaTunnelRow.setTableId(partition.getTable()); output.collect(seaTunnelRow); } } diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceState.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceState.java index a22ecd15276..78f84e2c858 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceState.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceState.java @@ -31,4 +31,5 @@ public class StarRocksSourceState implements Serializable { private boolean shouldEnumerate; private Map> pendingSplit; + private List pendingTables; } diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StartRocksSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StartRocksSourceSplitEnumerator.java index befa9c5608e..2c2bd295514 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StartRocksSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StartRocksSourceSplitEnumerator.java @@ -18,11 +18,11 @@ package org.apache.seatunnel.connectors.seatunnel.starrocks.source; import org.apache.seatunnel.api.source.SourceSplitEnumerator; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; +import org.apache.seatunnel.common.exception.CommonErrorCode; import org.apache.seatunnel.connectors.seatunnel.starrocks.client.source.StarRocksQueryPlanReadClient; import org.apache.seatunnel.connectors.seatunnel.starrocks.client.source.model.QueryPartition; import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SourceConfig; +import org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSourceTableConfig; import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException; import lombok.extern.slf4j.Slf4j; @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.stream.Collectors; @Slf4j @@ -46,29 +47,32 @@ public class StartRocksSourceSplitEnumerator private final Object stateLock = new Object(); private volatile boolean shouldEnumerate; private final Context context; + private final ConcurrentLinkedQueue pendingTables; public StartRocksSourceSplitEnumerator( SourceSplitEnumerator.Context context, - SourceConfig sourceConfig, - SeaTunnelRowType seaTunnelRowType) { - this(context, sourceConfig, seaTunnelRowType, null); + SourceConfig sourceConfig) { + this(context, sourceConfig, null); } public StartRocksSourceSplitEnumerator( SourceSplitEnumerator.Context context, SourceConfig sourceConfig, - SeaTunnelRowType seaTunnelRowType, StarRocksSourceState sourceState) { this.sourceConfig = sourceConfig; - this.starRocksQueryPlanReadClient = - new StarRocksQueryPlanReadClient(sourceConfig, seaTunnelRowType); - + this.starRocksQueryPlanReadClient = new StarRocksQueryPlanReadClient(sourceConfig); this.context = context; + List tables = + sourceConfig.getTableConfigList().stream() + .map(StarRocksSourceTableConfig::getTable) + .collect(Collectors.toList()); this.pendingSplit = new HashMap<>(); + this.pendingTables = new ConcurrentLinkedQueue<>(tables); this.shouldEnumerate = sourceState == null; if (sourceState != null) { this.shouldEnumerate = sourceState.isShouldEnumerate(); this.pendingSplit.putAll(sourceState.getPendingSplit()); + this.pendingTables.addAll(sourceState.getPendingTables()); } } @@ -76,14 +80,18 @@ public StartRocksSourceSplitEnumerator( public void run() { Set readers = context.registeredReaders(); if (shouldEnumerate) { - List newSplits = getStarRocksSourceSplit(); + while (!pendingTables.isEmpty()) { + String table = pendingTables.poll(); + log.info("Splitting table {}.", table); + List newSplits = getStarRocksSourceSplit(table); + log.info("Split table {} into {} splits.", table, newSplits.size()); + synchronized (stateLock) { + addPendingSplit(newSplits); + shouldEnumerate = false; + } - synchronized (stateLock) { - addPendingSplit(newSplits); - shouldEnumerate = false; + assignSplit(readers); } - - assignSplit(readers); } log.debug( @@ -116,7 +124,8 @@ public void registerReader(int subtaskId) { @Override public StarRocksSourceState snapshotState(long checkpointId) { synchronized (stateLock) { - return new StarRocksSourceState(shouldEnumerate, pendingSplit); + return new StarRocksSourceState( + shouldEnumerate, pendingSplit, new ArrayList<>(pendingTables)); } } @@ -138,7 +147,7 @@ public void close() { @Override public void handleSplitRequest(int subtaskId) { throw new StarRocksConnectorException( - CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, + CommonErrorCode.UNSUPPORTED_OPERATION, String.format("Unsupported handleSplitRequest: %d", subtaskId)); } @@ -159,11 +168,9 @@ private void assignSplit(Collection readers) { if (assignmentForReader != null && !assignmentForReader.isEmpty()) { log.info( "Assign splits {} to reader {}", - String.join( - ",", - assignmentForReader.stream() - .map(p -> p.getSplitId()) - .collect(Collectors.toList())), + assignmentForReader.stream() + .map(StarRocksSourceSplit::getSplitId) + .collect(Collectors.joining(",")), reader); try { context.assignSplit(reader, assignmentForReader); @@ -179,13 +186,12 @@ private void assignSplit(Collection readers) { } } - List getStarRocksSourceSplit() { + List getStarRocksSourceSplit(String table) { List sourceSplits = new ArrayList<>(); - List partitions = starRocksQueryPlanReadClient.findPartitions(); - for (int i = 0; i < partitions.size(); i++) { + List partitions = starRocksQueryPlanReadClient.findPartitions(table); + for (QueryPartition partition : partitions) { sourceSplits.add( - new StarRocksSourceSplit( - partitions.get(i), String.valueOf(partitions.get(i).hashCode()))); + new StarRocksSourceSplit(partition, String.valueOf(partition.hashCode()))); } return sourceSplits; } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java index a575da7862a..46280a52b4c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java @@ -27,7 +27,9 @@ import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; import org.junit.jupiter.api.AfterAll; @@ -76,6 +78,7 @@ public class StarRocksIT extends TestSuiteBase implements TestResource { private static final String DATABASE = "test"; private static final String URL = "jdbc:mysql://%s:" + SR_PORT; private static final String SOURCE_TABLE = "e2e_table_source"; + private static final String SOURCE_TABLE_3 = "e2e_table_source_3"; private static final String SINK_TABLE = "e2e_table_sink"; private static final String SR_DRIVER_JAR = "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar"; @@ -111,6 +114,35 @@ public class StarRocksIT extends TestSuiteBase implements TestResource { + "\"storage_format\" = \"DEFAULT\"" + ")"; + private static final String DDL_SOURCE_3 = + "create table " + + DATABASE + + "." + + SOURCE_TABLE_3 + + " (\n" + + " BIGINT_COL BIGINT,\n" + + " LARGEINT_COL LARGEINT,\n" + + " SMALLINT_COL SMALLINT,\n" + + " TINYINT_COL TINYINT,\n" + + " BOOLEAN_COL BOOLEAN,\n" + + " DECIMAL_COL Decimal(12, 1),\n" + + " DOUBLE_COL DOUBLE,\n" + + " FLOAT_COL FLOAT,\n" + + " INT_COL INT,\n" + + " CHAR_COL CHAR,\n" + + " VARCHAR_11_COL VARCHAR(11),\n" + + " STRING_COL STRING,\n" + + " DATETIME_COL DATETIME,\n" + + " DATE_COL DATE\n" + + ")ENGINE=OLAP\n" + + "DUPLICATE KEY(`BIGINT_COL`)\n" + + "DISTRIBUTED BY HASH(`BIGINT_COL`) BUCKETS 1\n" + + "PROPERTIES (\n" + + "\"replication_num\" = \"1\",\n" + + "\"in_memory\" = \"false\"," + + "\"storage_format\" = \"DEFAULT\"" + + ")"; + private static final String DDL_FAKE_SINK_TABLE = "create table " + DATABASE @@ -160,6 +192,30 @@ public class StarRocksIT extends TestSuiteBase implements TestResource { + "\t?,?,?,?,?,?,?,?,?,?,?,?,?,?\n" + ")"; + private static final String INIT_DATA_SQL_3 = + "insert into " + + DATABASE + + "." + + SOURCE_TABLE_3 + + " (\n" + + " BIGINT_COL,\n" + + " LARGEINT_COL,\n" + + " SMALLINT_COL,\n" + + " TINYINT_COL,\n" + + " BOOLEAN_COL,\n" + + " DECIMAL_COL,\n" + + " DOUBLE_COL,\n" + + " FLOAT_COL,\n" + + " INT_COL,\n" + + " CHAR_COL,\n" + + " VARCHAR_11_COL,\n" + + " STRING_COL,\n" + + " DATETIME_COL,\n" + + " DATE_COL\n" + + ")values(\n" + + "\t?,?,?,?,?,?,?,?,?,?,?,?,?,?\n" + + ")"; + private Connection jdbcConnection; private GenericContainer starRocksServer; private static final List TEST_DATASET = generateTestDataSet(); @@ -194,7 +250,7 @@ public void startUp() throws Exception { .atMost(360, TimeUnit.SECONDS) .untilAsserted(this::initializeJdbcConnection); initializeJdbcTable(); - batchInsertData(); + batchInsertData(INIT_DATA_SQL); } private static List generateTestDataSet() { @@ -306,6 +362,7 @@ private void initializeJdbcTable() { statement.execute("create database test"); // create source table statement.execute(DDL_SOURCE); + statement.execute(DDL_SOURCE_3); // create sink table statement.execute(DDL_FAKE_SINK_TABLE); } catch (SQLException e) { @@ -313,12 +370,11 @@ private void initializeJdbcTable() { } } - private void batchInsertData() { + private void batchInsertData(String initSQL) { List rows = TEST_DATASET; try { jdbcConnection.setAutoCommit(false); - try (PreparedStatement preparedStatement = - jdbcConnection.prepareStatement(INIT_DATA_SQL)) { + try (PreparedStatement preparedStatement = jdbcConnection.prepareStatement(initSQL)) { for (int i = 0; i < rows.size(); i++) { for (int index = 0; index < rows.get(i).getFields().length; index++) { preparedStatement.setObject(index + 1, rows.get(i).getFields()[index]); @@ -352,6 +408,14 @@ private void clearSinkTable() { } } + private void deleteTable(String tableName) { + try (Statement statement = jdbcConnection.createStatement()) { + statement.execute(String.format("TRUNCATE TABLE %s.%s", DATABASE, tableName)); + } catch (SQLException e) { + throw new RuntimeException("test starrocks server image error", e); + } + } + @Test public void testCatalog() { TablePath tablePathStarRocksSource = TablePath.of("test", "e2e_table_source"); @@ -411,4 +475,19 @@ public void testCatalog() { Assertions.assertFalse(starRocksCatalog.tableExists(tablePathStarRocksSink)); starRocksCatalog.close(); } + + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = "Currently SPARK/FLINK do not support multiple table read") + @TestTemplate + public void testStarRocksMultipleRead(TestContainer container) + throws IOException, InterruptedException { + batchInsertData(INIT_DATA_SQL_3); + assertHasData(SOURCE_TABLE_3); + Container.ExecResult execResult = + container.executeJob("/starrocks-to-assert-with-multipletable.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + deleteTable(SOURCE_TABLE_3); + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-to-assert-with-multipletable.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-to-assert-with-multipletable.conf new file mode 100644 index 00000000000..6756d108dd8 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-to-assert-with-multipletable.conf @@ -0,0 +1,90 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + StarRocks { + nodeUrls = ["starrocks_e2e:8030"] + username = root + password = "" + database = "test" + table_list = [ + { + table = "e2e_table_source" + schema = { + fields { + BIGINT_COL = BIGINT + LARGEINT_COL = STRING + SMALLINT_COL = SMALLINT + TINYINT_COL = TINYINT + BOOLEAN_COL = BOOLEAN + DECIMAL_COL = "DECIMAL(20, 1)" + DOUBLE_COL = DOUBLE + FLOAT_COL = FLOAT + INT_COL = INT + CHAR_COL = STRING + VARCHAR_11_COL = STRING + STRING_COL = STRING + DATETIME_COL = TIMESTAMP + DATE_COL = DATE + } + } + scan_filter = "" + }, + { + table = "e2e_table_source_3" + schema = { + fields { + BIGINT_COL = BIGINT + LARGEINT_COL = STRING + SMALLINT_COL = SMALLINT + TINYINT_COL = TINYINT + BOOLEAN_COL = BOOLEAN + DECIMAL_COL = "DECIMAL(20, 1)" + DOUBLE_COL = DOUBLE + FLOAT_COL = FLOAT + INT_COL = INT + CHAR_COL = STRING + VARCHAR_11_COL = STRING + STRING_COL = STRING + DATETIME_COL = TIMESTAMP + DATE_COL = DATE + } + } + scan_filter = "" + } + ] + max_retries = 3 + scan.params.scanner_thread_pool_thread_num = "3" + result_table_name = "starrocks" + } +} + +transform { +} + +sink { + Assert { + rules { + table-names = ["e2e_table_source"] + } + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java index 10d5685c6d2..cbfe8d4578e 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java @@ -107,6 +107,19 @@ protected Container.ExecResult executeJob( throws IOException, InterruptedException { final String confInContainerPath = copyConfigFileToContainer(container, confFile); // copy connectors + LOG.info( + "Container[{}] Server SR Log: \n" + + "\n==================== Server SR Log====================\n" + + "{}\n" + + "ModulePath: {}\n" + + "namePrefix: {}\n" + + "connectorType: {}\n" + + "\n==================== Server SR Log end ====================", + container.getDockerImageName(), + confFile, + getConnectorModulePath(), + getConnectorNamePrefix(), + getConnectorType()); copyConnectorJarToContainer( container, confFile, diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java index b8ea529eb99..5921f3673f7 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java @@ -228,6 +228,7 @@ public static String adaptPathForWin(String path) { public static List getConnectorFiles( File currentModule, Set connectorNames, String connectorPrefix) { List connectorFiles = new ArrayList<>(); + for (File file : Objects.requireNonNull(currentModule.listFiles())) { getConnectorFiles(file, connectorNames, connectorPrefix, connectorFiles); } @@ -239,12 +240,22 @@ private static void getConnectorFiles( Set connectorNames, String connectorPrefix, List connectors) { + + log.info("getConnectorFiles-connectorNames.size(): {}. \n ", connectorNames.size()); + log.info("getConnectorFiles-connectors.size(): {}. \n ", connectors.size()); if (currentModule.isFile() || connectorNames.size() == connectors.size()) { return; } if (connectorNames.contains(currentModule.getName())) { + log.error( + "getConnectorFiles-currentModule.getName()(): {}. \n ", + currentModule.getName()); File targetPath = new File(currentModule.getAbsolutePath() + File.separator + "target"); - for (File file : Objects.requireNonNull(targetPath.listFiles())) { + log.info("getConnectorFiles-targetPath: {}. \n ", targetPath); + File[] files = targetPath.listFiles(); + for (File file : Objects.requireNonNull(files)) { + log.info("getConnectorFiles-file: {}. \n ", file); + log.info("getConnectorFiles-file.getName(): {}. \n ", file.getName()); if (file.getName().startsWith(currentModule.getName()) && !file.getName().endsWith("javadoc.jar") && !file.getName().endsWith("tests.jar")) {