From d403b3cdc6f5d8acf7b3014a7e41a5d71d2bd6d9 Mon Sep 17 00:00:00 2001 From: mozhenghua Date: Mon, 8 Aug 2022 20:26:55 +0800 Subject: [PATCH] fix bug issue: https://github.com/qlangtech/chunjun/issues/2 https://github.com/qlangtech/chunjun/issues/3 --- .../source/ClickhouseInputFormat.java | 6 +- .../source/ElasticsearchInputFormat.java | 4 + .../jdbc/converter/JdbcColumnConverter.java | 28 +++- .../jdbc/source/JdbcInputFormat.java | 21 ++- .../DistributedJdbcInputFormat.java | 6 +- .../source/PostgresqlInputFormat.java | 129 ++++++++++-------- .../source/format/BaseRichInputFormat.java | 2 +- deploy.sh | 2 +- 8 files changed, 125 insertions(+), 73 deletions(-) diff --git a/chunjun-connectors/chunjun-connector-clickhouse/src/main/java/com/dtstack/chunjun/connector/clickhouse/source/ClickhouseInputFormat.java b/chunjun-connectors/chunjun-connector-clickhouse/src/main/java/com/dtstack/chunjun/connector/clickhouse/source/ClickhouseInputFormat.java index 432c2624ad..588ba1e80d 100644 --- a/chunjun-connectors/chunjun-connector-clickhouse/src/main/java/com/dtstack/chunjun/connector/clickhouse/source/ClickhouseInputFormat.java +++ b/chunjun-connectors/chunjun-connector-clickhouse/src/main/java/com/dtstack/chunjun/connector/clickhouse/source/ClickhouseInputFormat.java @@ -30,18 +30,22 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; +import java.lang.invoke.MethodHandles; import java.sql.Connection; import java.sql.SQLException; import java.util.LinkedList; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * @program chunjun * @author: xiuzhu * @create: 2021/05/10 */ public class ClickhouseInputFormat extends JdbcInputFormat { - + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @Override public void openInternal(InputSplit inputSplit) { JdbcInputSplit jdbcInputSplit = (JdbcInputSplit) inputSplit; diff --git a/chunjun-connectors/chunjun-connector-elasticsearch7/src/main/java/com/dtstack/chunjun/connector/elasticsearch7/source/ElasticsearchInputFormat.java b/chunjun-connectors/chunjun-connector-elasticsearch7/src/main/java/com/dtstack/chunjun/connector/elasticsearch7/source/ElasticsearchInputFormat.java index e4f92b8c3b..d83fff623a 100644 --- a/chunjun-connectors/chunjun-connector-elasticsearch7/src/main/java/com/dtstack/chunjun/connector/elasticsearch7/source/ElasticsearchInputFormat.java +++ b/chunjun-connectors/chunjun-connector-elasticsearch7/src/main/java/com/dtstack/chunjun/connector/elasticsearch7/source/ElasticsearchInputFormat.java @@ -44,8 +44,11 @@ import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.slice.SliceBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; +import java.lang.invoke.MethodHandles; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -57,6 +60,7 @@ * @create: 2021/06/27 17:25 */ public class ElasticsearchInputFormat extends BaseRichInputFormat { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); protected long keepAlive = 1; /** Elasticsearch Configuration */ private ElasticsearchConf elasticsearchConf; diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/converter/JdbcColumnConverter.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/converter/JdbcColumnConverter.java index a99184573b..bd2d1e7504 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/converter/JdbcColumnConverter.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/converter/JdbcColumnConverter.java @@ -35,6 +35,7 @@ import com.dtstack.chunjun.element.column.TimeColumn; import com.dtstack.chunjun.element.column.TimestampColumn; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; @@ -58,6 +59,7 @@ public class JdbcColumnConverter extends AbstractRowConverter< ResultSet, JsonArray, FieldNamedPreparedStatement, LogicalType> { + public JdbcColumnConverter(RowType rowType) { this(rowType, null); } @@ -133,7 +135,14 @@ public RowData toInternal(ResultSet resultSet) throws Exception { public FieldNamedPreparedStatement toExternal( RowData rowData, FieldNamedPreparedStatement statement) throws Exception { for (int index = 0; index < rowData.getArity(); index++) { - toExternalConverters.get(index).serialize(rowData, index, statement); + try { + toExternalConverters.get(index).serialize(rowData, index, statement); + } catch (Throwable e) { + if (rowData instanceof GenericRowData) { + throw new IllegalStateException("index:" + index + " val:" + ((GenericRowData) rowData).getField(index), e); + } + throw e; + } } return statement; } @@ -208,7 +217,13 @@ protected ISerializationConverter createExternalCon }; case TINYINT: return (val, index, statement) -> statement.setByte(index, val.getByte(index)); - case SMALLINT: + case SMALLINT: { + return (val, index, statement) -> { + short a = 0; + a = val.getShort(index); + statement.setShort(index, a); + }; + } case INTEGER: case INTERVAL_YEAR_MONTH: return (val, index, statement) -> { @@ -222,8 +237,8 @@ protected ISerializationConverter createExternalCon statement.setInt(index, a); }; case FLOAT: - return (val, index, statement) ->{ - // statement.setFloat(index, ((ColumnRowData) val).getField(index).asFloat()); + return (val, index, statement) -> { + // statement.setFloat(index, ((ColumnRowData) val).getField(index).asFloat()); statement.setFloat(index, val.getFloat(index)); }; @@ -266,7 +281,10 @@ protected ISerializationConverter createExternalCon // val.getTimestamp(index, -1).toLocalDateTime(); // statement.setTime(index, ((ColumnRowData) val).getField(index).asTime()); // val.get - throw new UnsupportedOperationException("index:" + index + ",val:" + val.toString()); + // throw new UnsupportedOperationException("index:" + index + ",val:" + val.toString()); +// java.sql.Time time = + + statement.setTime(index, new Time(val.getInt(index))); }; case TIMESTAMP_WITH_TIME_ZONE: case TIMESTAMP_WITHOUT_TIME_ZONE: diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormat.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormat.java index a6c5842983..26ce9a9456 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormat.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/JdbcInputFormat.java @@ -44,6 +44,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; +import java.lang.invoke.MethodHandles; import java.math.BigDecimal; import java.math.BigInteger; import java.sql.Connection; @@ -64,6 +65,9 @@ import static com.dtstack.chunjun.enums.ColumnType.TIMESTAMPTZ; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * InputFormat for reading data from a database and generate Rows. * @@ -72,7 +76,7 @@ * @author huyifan.zju@163.com */ public class JdbcInputFormat extends BaseRichInputFormat { - + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); public static final long serialVersionUID = 1L; protected static final int resultSetConcurrency = ResultSet.CONCUR_READ_ONLY; protected static int resultSetType = ResultSet.TYPE_FORWARD_ONLY; @@ -454,7 +458,7 @@ private String getMaxValueFromDb() { LOG.info(String.format("Query max value sql is '%s'", queryMaxValueSql)); conn = getConnection(); - st = conn.createStatement(resultSetType, resultSetConcurrency); + st = conn.createStatement(getResultSetType(), resultSetConcurrency); st.setQueryTimeout(jdbcConf.getQueryTimeOut()); rs = st.executeQuery(queryMaxValueSql); if (rs.next()) { @@ -502,7 +506,7 @@ private Pair getSplitRangeFromDb() { LOG.info(String.format("Query SplitRange sql is '%s'", querySplitRangeSql)); conn = getConnection(); - st = conn.createStatement(resultSetType, resultSetConcurrency); + st = conn.createStatement(getResultSetType(), resultSetConcurrency); st.setQueryTimeout(jdbcConf.getQueryTimeOut()); rs = st.executeQuery(querySplitRangeSql); if (rs.next()) { @@ -887,10 +891,19 @@ protected Pair, List> getTableMetaData() { } /** init prepareStatement */ public void initPrepareStatement(String querySql) throws SQLException { - ps = dbConn.prepareStatement(querySql, resultSetType, resultSetConcurrency); + ps = dbConn.prepareStatement(querySql, getResultSetType(), resultSetConcurrency); ps.setFetchSize(jdbcConf.getFetchSize()); ps.setQueryTimeout(jdbcConf.getQueryTimeOut()); } + + /** + * The constant indicating the type for a ResultSet object + * @return + */ + protected int getResultSetType() { + return resultSetType; + } + /** * 间隔轮询查询起始位置 * diff --git a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/distribute/DistributedJdbcInputFormat.java b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/distribute/DistributedJdbcInputFormat.java index a5cb91d794..00e2e65d63 100644 --- a/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/distribute/DistributedJdbcInputFormat.java +++ b/chunjun-connectors/chunjun-connector-jdbc-base/src/main/java/com/dtstack/chunjun/connector/jdbc/source/distribute/DistributedJdbcInputFormat.java @@ -33,17 +33,21 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.tuple.Pair; +import java.lang.invoke.MethodHandles; import java.sql.Connection; import java.sql.SQLException; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Date: 2022/01/12 Company: www.dtstack.com * * @author tudou */ public class DistributedJdbcInputFormat extends JdbcInputFormat { - + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); protected List sourceList; protected DistributedJdbcInputSplit inputSplit; protected int sourceIndex = 0; diff --git a/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/source/PostgresqlInputFormat.java b/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/source/PostgresqlInputFormat.java index 60d8dc8fd5..769c12cdc1 100644 --- a/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/source/PostgresqlInputFormat.java +++ b/chunjun-connectors/chunjun-connector-postgresql/src/main/java/com/dtstack/chunjun/connector/postgresql/source/PostgresqlInputFormat.java @@ -4,71 +4,80 @@ import com.dtstack.chunjun.connector.jdbc.util.SqlUtil; import com.dtstack.chunjun.util.ExceptionUtil; +import java.lang.invoke.MethodHandles; import java.sql.ResultSet; import java.sql.SQLException; import java.util.concurrent.TimeUnit; -public class PostgresqlInputFormat extends JdbcInputFormat { - - @Override - protected void queryPollingWithOutStartLocation() throws SQLException { - // In PostgreSQL, if resultCursorType is FORWARD_ONLY - // , the query will report an error after the method - // #setFetchDirection(ResultSet.FETCH_REVERSE) is called. - String querySql = - jdbcConf.getQuerySql() + SqlUtil.buildOrderSql(jdbcConf, jdbcDialect, "ASC"); - ps = - dbConn.prepareStatement( - querySql, ResultSet.TYPE_SCROLL_INSENSITIVE, resultSetConcurrency); - ps.setFetchSize(jdbcConf.getFetchSize()); - ps.setQueryTimeout(jdbcConf.getQueryTimeOut()); - resultSet = ps.executeQuery(); - hasNext = resultSet.next(); +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; - try { - // 间隔轮询一直循环,直到查询到数据库中的数据为止 - while (!hasNext) { - TimeUnit.MILLISECONDS.sleep(jdbcConf.getPollingInterval()); - resultSet.close(); - // 如果事务不提交 就会导致数据库即使插入数据 也无法读到数据 - dbConn.commit(); - resultSet = ps.executeQuery(); - hasNext = resultSet.next(); - // 每隔五分钟打印一次,(当前时间 - 任务开始时间) % 300秒 <= 一个间隔轮询周期 - if ((System.currentTimeMillis() - startTime) % 300000 - <= jdbcConf.getPollingInterval()) { - LOG.info( - "no record matched condition in database, execute query sql = {}, startLocation = {}", - jdbcConf.getQuerySql(), - endLocationAccumulator.getLocalValue()); - } - } - } catch (InterruptedException e) { - LOG.warn( - "interrupted while waiting for polling, e = {}", - ExceptionUtil.getErrorMessage(e)); - } +public class PostgresqlInputFormat extends JdbcInputFormat { + //private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - // 查询到数据,更新querySql - StringBuilder builder = new StringBuilder(128); - builder.append(jdbcConf.getQuerySql()); - if (jdbcConf.getQuerySql().contains("WHERE")) { - builder.append(" AND "); - } else { - builder.append(" WHERE "); - } - builder.append(jdbcDialect.quoteIdentifier(jdbcConf.getIncreColumn())) - .append(" > ? ORDER BY ") - .append(jdbcDialect.quoteIdentifier(jdbcConf.getIncreColumn())) - .append(" ASC"); - jdbcConf.setQuerySql(builder.toString()); - ps = - dbConn.prepareStatement( - jdbcConf.getQuerySql(), - ResultSet.TYPE_SCROLL_INSENSITIVE, - resultSetConcurrency); - ps.setFetchSize(jdbcConf.getFetchSize()); - ps.setQueryTimeout(jdbcConf.getQueryTimeOut()); - LOG.info("update querySql, sql = {}", jdbcConf.getQuerySql()); + protected int getResultSetType() { + return ResultSet.TYPE_SCROLL_INSENSITIVE; } + +// @Override +// protected void queryPollingWithOutStartLocation() throws SQLException { +// // In PostgreSQL, if resultCursorType is FORWARD_ONLY +// // , the query will report an error after the method +// // #setFetchDirection(ResultSet.FETCH_REVERSE) is called. +// String querySql = +// jdbcConf.getQuerySql() + SqlUtil.buildOrderSql(jdbcConf, jdbcDialect, "ASC"); +// ps = +// dbConn.prepareStatement( +// querySql, ResultSet.TYPE_SCROLL_INSENSITIVE, resultSetConcurrency); +// ps.setFetchSize(jdbcConf.getFetchSize()); +// ps.setQueryTimeout(jdbcConf.getQueryTimeOut()); +// resultSet = ps.executeQuery(); +// hasNext = resultSet.next(); +// +// try { +// // 间隔轮询一直循环,直到查询到数据库中的数据为止 +// while (!hasNext) { +// TimeUnit.MILLISECONDS.sleep(jdbcConf.getPollingInterval()); +// resultSet.close(); +// // 如果事务不提交 就会导致数据库即使插入数据 也无法读到数据 +// dbConn.commit(); +// resultSet = ps.executeQuery(); +// hasNext = resultSet.next(); +// // 每隔五分钟打印一次,(当前时间 - 任务开始时间) % 300秒 <= 一个间隔轮询周期 +// if ((System.currentTimeMillis() - startTime) % 300000 +// <= jdbcConf.getPollingInterval()) { +// LOG.info( +// "no record matched condition in database, execute query sql = {}, startLocation = {}", +// jdbcConf.getQuerySql(), +// endLocationAccumulator.getLocalValue()); +// } +// } +// } catch (InterruptedException e) { +// LOG.warn( +// "interrupted while waiting for polling, e = {}", +// ExceptionUtil.getErrorMessage(e)); +// } +// +// // 查询到数据,更新querySql +// StringBuilder builder = new StringBuilder(128); +// builder.append(jdbcConf.getQuerySql()); +// if (jdbcConf.getQuerySql().contains("WHERE")) { +// builder.append(" AND "); +// } else { +// builder.append(" WHERE "); +// } +// builder.append(jdbcDialect.quoteIdentifier(jdbcConf.getIncreColumn())) +// .append(" > ? ORDER BY ") +// .append(jdbcDialect.quoteIdentifier(jdbcConf.getIncreColumn())) +// .append(" ASC"); +// jdbcConf.setQuerySql(builder.toString()); +// ps = +// dbConn.prepareStatement( +// jdbcConf.getQuerySql(), +// ResultSet.TYPE_SCROLL_INSENSITIVE, +// resultSetConcurrency); +// ps.setFetchSize(jdbcConf.getFetchSize()); +// ps.setQueryTimeout(jdbcConf.getQueryTimeOut()); +// LOG.info("update querySql, sql = {}", jdbcConf.getQuerySql()); +// } } diff --git a/chunjun-core/src/main/java/com/dtstack/chunjun/source/format/BaseRichInputFormat.java b/chunjun-core/src/main/java/com/dtstack/chunjun/source/format/BaseRichInputFormat.java index 16c354ca49..a67b7aa509 100644 --- a/chunjun-core/src/main/java/com/dtstack/chunjun/source/format/BaseRichInputFormat.java +++ b/chunjun-core/src/main/java/com/dtstack/chunjun/source/format/BaseRichInputFormat.java @@ -70,7 +70,7 @@ public abstract class BaseRichInputFormat extends RichInputFormat