Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaochen-zhou committed May 9, 2024
1 parent c872490 commit 30b55b8
Showing 1 changed file with 26 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.seatunnel.connectors.seatunnel.starrocks.client.source.model.QueryPartition;
import org.apache.seatunnel.connectors.seatunnel.starrocks.client.source.model.QueryPlan;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SourceConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSourceTableConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;

Expand All @@ -38,46 +39,43 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

@Slf4j
public class StarRocksQueryPlanReadClient {
private RetryUtils.RetryMaterial retryMaterial;
private SourceConfig sourceConfig;
private SeaTunnelRowType seaTunnelRowType;
private final HttpHelper httpHelper = new HttpHelper();
private final Map<String, StarRocksSourceTableConfig> tables;

private static final long DEFAULT_SLEEP_TIME_MS = 1000L;

public StarRocksQueryPlanReadClient(
SourceConfig sourceConfig, SeaTunnelRowType seaTunnelRowType) {
public StarRocksQueryPlanReadClient(SourceConfig sourceConfig) {
this.sourceConfig = sourceConfig;
this.seaTunnelRowType = seaTunnelRowType;
this.retryMaterial =
new RetryUtils.RetryMaterial(
sourceConfig.getMaxRetries(),
true,
exception -> true,
DEFAULT_SLEEP_TIME_MS);
this.tables =
sourceConfig.getTableConfigList().stream()
.collect(
Collectors.toMap(
StarRocksSourceTableConfig::getTable, Function.identity()));
}

public List<QueryPartition> findPartitions() {
List<String> nodeUrls = sourceConfig.getNodeUrls();
QueryPlan queryPlan = getQueryPlan(genQuerySql(), nodeUrls);
public List<QueryPartition> findPartitions(String table) {
QueryPlan queryPlan = getQueryPlan(genQuerySql(table), table);
Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan);
return tabletsMapToPartition(
be2Tablets,
queryPlan.getQueryPlan(),
sourceConfig.getDatabase(),
sourceConfig.getTable());
return tabletsMapToPartition(be2Tablets, queryPlan.getQueryPlan(), table);
}

private List<QueryPartition> tabletsMapToPartition(
Map<String, List<Long>> be2Tablets,
String opaquedQueryPlan,
String database,
String table)
Map<String, List<Long>> be2Tablets, String opaquedQueryPlan, String table)
throws IllegalArgumentException {
int tabletsSize = sourceConfig.getRequestTabletSize();
List<QueryPartition> partitions = new ArrayList<>();
Expand All @@ -99,7 +97,7 @@ private List<QueryPartition> tabletsMapToPartition(
first = first + tabletsSize;
QueryPartition partitionDefinition =
new QueryPartition(
database,
sourceConfig.getDatabase(),
table,
beInfo.getKey(),
partitionTablets,
Expand Down Expand Up @@ -135,8 +133,9 @@ private Map<String, List<Long>> selectBeForTablet(QueryPlan queryPlan) {
return beXTablets;
}

private QueryPlan getQueryPlan(String querySQL, List<String> nodeUrls) {
private QueryPlan getQueryPlan(String querySQL, String table) {

List<String> nodeUrls = sourceConfig.getNodeUrls();
Map<String, Object> bodyMap = new HashMap<>();
bodyMap.put("sql", querySQL);
String body = JsonUtils.toJsonString(bodyMap);
Expand All @@ -148,7 +147,7 @@ private QueryPlan getQueryPlan(String querySQL, List<String> nodeUrls) {
.append("/api/")
.append(sourceConfig.getDatabase())
.append("/")
.append(sourceConfig.getTable())
.append(table)
.append("/_query_plan")
.toString();
try {
Expand Down Expand Up @@ -184,15 +183,17 @@ private Map<String, String> getQueryPlanHttpHeader() {
return headerMap;
}

private String genQuerySql() {
private String genQuerySql(String table) {

StarRocksSourceTableConfig starRocksSourceTableConfig = tables.get(table);
SeaTunnelRowType seaTunnelRowType =
starRocksSourceTableConfig.getCatalogTable().getSeaTunnelRowType();
String columns =
seaTunnelRowType.getFieldNames().length != 0
? String.join(",", seaTunnelRowType.getFieldNames())
: "*";
String filter =
sourceConfig.getScanFilter().isEmpty()
? ""
: " where " + sourceConfig.getScanFilter();
String scanFilter = starRocksSourceTableConfig.getScanFilter();
String filter = scanFilter.isEmpty() ? "" : " where " + scanFilter;

String sql =
"select "
Expand All @@ -203,7 +204,7 @@ private String genQuerySql() {
+ "`"
+ "."
+ "`"
+ sourceConfig.getTable()
+ table
+ "`"
+ filter;
log.debug("Generate query sql '{}'.", sql);
Expand Down

0 comments on commit 30b55b8

Please sign in to comment.