Merge branch 'dev' into sr-multi-source
xiaochen-zhou committed May 10, 2024
2 parents 30b55b8 + 52d1020 commit b11ce3f
Showing 45 changed files with 712 additions and 163 deletions.
1 change: 1 addition & 0 deletions docs/en/connector-v2/sink/Doris.md
@@ -77,6 +77,7 @@ Default template:

```sql
CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (
${rowtype_primary_key},
${rowtype_fields}
) ENGINE=OLAP
UNIQUE KEY (${rowtype_primary_key})
59 changes: 58 additions & 1 deletion docs/en/seatunnel-engine/rest-api.md
@@ -77,7 +77,64 @@ network:
### Return details of a job.

<details>
<summary><code>GET</code> <code><b>/hazelcast/rest/maps/running-job/:jobId</b></code> <code>(Return details of a job.)</code></summary>
<summary><code>GET</code> <code><b>/hazelcast/rest/maps/job-info/:jobId</b></code> <code>(Return details of a job.)</code></summary>

#### Parameters

> | name | type | data type | description |
> |-------|----------|-----------|-------------|
> | jobId | required | long | job id |

#### Responses

```json
{
"jobId": "",
"jobName": "",
"jobStatus": "",
"createTime": "",
"jobDag": {
"vertices": [
],
"edges": [
]
},
"metrics": {
"sourceReceivedCount": "",
"sinkWriteCount": ""
},
"finishedTime": "",
"errorMsg": null,
"envOptions": {
},
"pluginJarsUrls": [
],
"isStartWithSavePoint": false
}
```

`jobId`, `jobName`, `jobStatus`, `createTime`, `jobDag`, and `metrics` are always returned.
`envOptions`, `pluginJarsUrls`, and `isStartWithSavePoint` are returned while the job is running.
`finishedTime` and `errorMsg` are returned once the job has finished.

When the job info cannot be found, the response is:

```json
{
"jobId" : ""
}
```
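
For illustration only, the endpoint can be queried with any HTTP client; the sketch below uses Java's built-in `HttpClient`, and the host `localhost`, the port `5801`, and the job id are assumptions rather than values from this page:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class JobInfoExample {
    public static void main(String[] args) throws Exception {
        // Assumed SeaTunnel Engine REST address and an assumed job id.
        String url = "http://localhost:5801/hazelcast/rest/maps/job-info/733584788375666689";

        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();

        // The body is the JSON document described above; an unknown job yields only {"jobId": ""}.
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}
```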

</details>

------------------------------------------------------------------------------------------

### Return details of a job.

This API is deprecated; use /hazelcast/rest/maps/job-info/:jobId instead.

<details>
<summary><code>GET</code> <code><b>/hazelcast/rest/maps/running-job/:jobId</b></code> <code>(Return details of a job.)</code></summary>

#### Parameters

1 change: 1 addition & 0 deletions docs/zh/connector-v2/sink/Doris.md
@@ -77,6 +77,7 @@ The Doris sink connector is implemented internally by buffering data in batches and importing it via stream load

```sql
CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (
${rowtype_primary_key},
${rowtype_fields}
) ENGINE=OLAP
UNIQUE KEY (${rowtype_primary_key})
57 changes: 57 additions & 0 deletions docs/zh/seatunnel-engine/rest-api.md
@@ -75,6 +75,63 @@ network:

### Return details of a job.

<details>
<summary><code>GET</code> <code><b>/hazelcast/rest/maps/job-info/:jobId</b></code> <code>(Return details of a job.)</code></summary>

#### Parameters

> | name | type | data type | description |
> |-------|----------|-----------|-------------|
> | jobId | required | long | job id |

#### Responses

```json
{
"jobId": "",
"jobName": "",
"jobStatus": "",
"createTime": "",
"jobDag": {
"vertices": [
],
"edges": [
]
},
"metrics": {
"sourceReceivedCount": "",
"sinkWriteCount": ""
},
"finishedTime": "",
"errorMsg": null,
"envOptions": {
},
"pluginJarsUrls": [
],
"isStartWithSavePoint": false
}
```

The `jobId`, `jobName`, `jobStatus`, `createTime`, `jobDag`, and `metrics` fields are always returned.
The `envOptions`, `pluginJarsUrls`, and `isStartWithSavePoint` fields are returned while the job is in the RUNNING state.
The `finishedTime` and `errorMsg` fields are returned once the job has finished, i.e. its state is no longer RUNNING but, for example, FINISHED or CANCELED.

When the job cannot be found, the response is:

```json
{
"jobId" : ""
}
```

</details>

------------------------------------------------------------------------------------------

### Return details of a job.

This API is deprecated; use /hazelcast/rest/maps/job-info/:jobId instead.

<details>
<summary><code>GET</code> <code><b>/hazelcast/rest/maps/running-job/:jobId</b></code> <code>(Return details of a job.)</code></summary>

@@ -116,7 +116,7 @@ public Collection<SnapshotSplit> generateSplits(TableId tableId) {
private List<ChunkRange> splitTableIntoChunks(
JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException {
final String splitColumnName = splitColumn.name();
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumnName);
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
final Object min = minMax[0];
final Object max = minMax[1];
if (min == null || max == null || min.equals(max)) {
@@ -177,28 +177,26 @@ private List<ChunkRange> splitTableIntoChunks(
tableId,
inverseSamplingRate);
Object[] sample =
sampleDataFromColumn(
jdbc, tableId, splitColumnName, inverseSamplingRate);
sampleDataFromColumn(jdbc, tableId, splitColumn, inverseSamplingRate);
log.info(
"Sample data from table {} end, the sample size is {}",
tableId,
sample.length);
return efficientShardingThroughSampling(
tableId, sample, approximateRowCnt, shardCount);
}
return splitUnevenlySizedChunks(
jdbc, tableId, splitColumnName, min, max, chunkSize);
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
} else {
return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max, chunkSize);
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
}

/** Split table into unevenly sized chunks by continuously calculating next chunk max value. */
protected List<ChunkRange> splitUnevenlySizedChunks(
JdbcConnection jdbc,
TableId tableId,
String splitColumnName,
Column splitColumn,
Object min,
Object max,
int chunkSize)
@@ -207,15 +205,15 @@ protected List<ChunkRange> splitUnevenlySizedChunks(
"Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize);
final List<ChunkRange> splits = new ArrayList<>();
Object chunkStart = null;
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, max, chunkSize);
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize);
int count = 0;
while (chunkEnd != null && ObjectCompare(chunkEnd, max) <= 0) {
// we start from [null, min + chunk_size) and avoid [null, min)
splits.add(ChunkRange.of(chunkStart, chunkEnd));
// may sleep a while to avoid DDOS on MySQL server
maySleep(count++, tableId);
chunkStart = chunkEnd;
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max, chunkSize);
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize);
}
// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
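
To make the shape of the result concrete: assuming a numeric split key with min = 0, max = 100, a chunk size of 25, and evenly spaced chunk ends returned by the database (all assumed values), the loop above yields a split list like the fragment below, which reuses the `ChunkRange` and `ArrayList` types already in scope here:

```java
// Illustrative fragment with assumed values; ChunkRange.of(start, end) is the factory used above.
List<ChunkRange> splits = new ArrayList<>();
splits.add(ChunkRange.of(null, 25));   // [null, 25): covers everything up to the first chunk end
splits.add(ChunkRange.of(25, 50));
splits.add(ChunkRange.of(50, 75));
splits.add(ChunkRange.of(75, 100));
splits.add(ChunkRange.of(100, null));  // the ending split added after the loop, unbounded above
```

Each boundary value is produced by `nextChunkEnd`, so with a skewed key distribution the ranges are unevenly spaced while each still holds roughly `chunkSize` rows.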
@@ -226,17 +224,17 @@ protected Object nextChunkEnd(
JdbcConnection jdbc,
Object previousChunkEnd,
TableId tableId,
String splitColumnName,
Column splitColumn,
Object max,
int chunkSize)
throws SQLException {
// chunk end might be null when max values are removed
Object chunkEnd =
queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd);
queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd);
if (Objects.equals(previousChunkEnd, chunkEnd)) {
// we don't allow equal chunk start and end,
// should query the next one larger than chunkEnd
chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd);
chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
}
if (ObjectCompare(chunkEnd, max) >= 0) {
return null;
@@ -23,6 +23,7 @@

import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;

import java.sql.SQLException;
@@ -35,16 +36,29 @@ public interface JdbcSourceChunkSplitter extends ChunkSplitter {
@Override
Collection<SnapshotSplit> generateSplits(TableId tableId);

/** @deprecated Use {@link #queryMinMax(JdbcConnection, TableId, Column)} instead. */
@Deprecated
Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
throws SQLException;

/**
* Query the maximum and minimum value of the column in the table. e.g. query string <code>
* SELECT MIN(%s), MAX(%s) FROM %s</code>
*
* @param jdbc JDBC connection.
* @param tableId table identity.
* @param columnName column name.
* @param column column.
* @return maximum and minimum value.
*/
Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
default Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column)
throws SQLException {
return queryMinMax(jdbc, tableId, column.name());
}

/** @deprecated Use {@link #queryMin(JdbcConnection, TableId, Column, Object)} instead. */
@Deprecated
Object queryMin(
JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound)
throws SQLException;

/**
@@ -54,12 +68,19 @@ Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
*
* @param jdbc JDBC connection.
* @param tableId table identity.
* @param columnName column name.
* @param column column.
* @param excludedLowerBound the minimum value should be greater than this value.
* @return minimum value.
*/
Object queryMin(
JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound)
default Object queryMin(
JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound)
throws SQLException {
return queryMin(jdbc, tableId, column.name(), excludedLowerBound);
}

/** @deprecated Use {@link #sampleDataFromColumn(JdbcConnection, TableId, Column, int)} instead. */
@Deprecated
Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int samplingRate)
throws SQLException;

/**
@@ -68,14 +89,29 @@ Object queryMin(
*
* @param jdbc The JDBC connection object used to connect to the database.
* @param tableId The ID of the table in which the column resides.
* @param columnName The name of the column to be sampled.
* @param column The column to be sampled.
* @param samplingRate The inverse of the fraction of the data to be sampled from
* the column. For example, a value of 1000 would mean 1/1000 of the data will be sampled.
* @return Returns a List of sampled data from the specified column.
* @throws SQLException If an SQL error occurs during the sampling operation.
*/
Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, String columnName, int samplingRate)
default Object[] sampleDataFromColumn(
JdbcConnection jdbc, TableId tableId, Column column, int samplingRate)
throws SQLException {
return sampleDataFromColumn(jdbc, tableId, column.name(), samplingRate);
}
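
The migration pattern throughout this interface is that the old String-based methods remain the abstract members (now deprecated), while the new Column-based overloads are default methods that simply delegate to them, so existing splitter implementations keep compiling unchanged. A hypothetical dialect-specific splitter that wants to use the richer `Column` object only needs to override the new overload; the class name, the backtick quoting, and the query-building details below are assumptions for illustration, not code from this commit:

```java
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.TableId;

import java.sql.SQLException;

// Hypothetical splitter, declared abstract so the remaining interface methods
// can stay unimplemented in this sketch.
public abstract class MyDialectChunkSplitter implements JdbcSourceChunkSplitter {

    @Override
    public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
            throws SQLException {
        // Old String-based entry point: still the abstract member of the interface,
        // so it carries the actual query logic in this sketch.
        String sql = String.format("SELECT MIN(%s), MAX(%s) FROM %s", columnName, columnName, tableId);
        return jdbc.queryAndMap(
                sql,
                rs -> {
                    rs.next();
                    return new Object[] {rs.getObject(1), rs.getObject(2)};
                });
    }

    @Override
    public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column)
            throws SQLException {
        // New Column-based overload: a dialect can now quote the identifier or inspect
        // the column's JDBC type instead of working from a bare name.
        return queryMinMax(jdbc, tableId, "`" + column.name() + "`");
    }
}
```

Keeping the String variants abstract rather than deleting them is presumably what makes this change source-compatible for connectors that implemented the old signatures.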

/**
* @deprecated Use {@link #queryNextChunkMax(JdbcConnection, TableId, Column, int, Object)} instead.
*/
@Deprecated
Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
String columnName,
int chunkSize,
Object includedLowerBound)
throws SQLException;

/**
@@ -85,18 +121,20 @@ Object[] sampleDataFromColumn(
*
* @param jdbc JDBC connection.
* @param tableId table identity.
* @param columnName column name.
* @param column column.
* @param chunkSize chunk size.
* @param includedLowerBound the previous chunk end value.
* @return next chunk end value.
*/
Object queryNextChunkMax(
default Object queryNextChunkMax(
JdbcConnection jdbc,
TableId tableId,
String columnName,
Column column,
int chunkSize,
Object includedLowerBound)
throws SQLException;
throws SQLException {
return queryNextChunkMax(jdbc, tableId, column.name(), chunkSize, includedLowerBound);
}

/**
* Approximate total number of entries in the lookup table.
@@ -110,17 +148,14 @@ Object queryNextChunkMax(
/**
* Build the scan query sql of the {@link SnapshotSplit}.
*
* @param tableId table identity.
* @param table table.
* @param splitKeyType primary key type.
* @param isFirstSplit whether the first split.
* @param isLastSplit whether the last split.
* @return query sql.
*/
String buildSplitScanQuery(
TableId tableId,
SeaTunnelRowType splitKeyType,
boolean isFirstSplit,
boolean isLastSplit);
Table table, SeaTunnelRowType splitKeyType, boolean isFirstSplit, boolean isLastSplit);

/**
* Checks whether split column is evenly distributed across its range.
@@ -106,7 +106,7 @@ public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)

@Override
public String buildSplitScanQuery(
TableId tableId,
Table table,
SeaTunnelRowType splitKeyType,
boolean isFirstSplit,
boolean isLastSplit) {
@@ -24,6 +24,7 @@

import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;

import java.sql.SQLException;
@@ -217,7 +218,7 @@ public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)

@Override
public String buildSplitScanQuery(
TableId tableId,
Table table,
SeaTunnelRowType splitKeyType,
boolean isFirstSplit,
boolean isLastSplit) {