Skip to content

Commit

Permalink
fix bugs reported in issues #2 and #3
Browse files Browse the repository at this point in the history
  • Loading branch information
baisui1981 committed Aug 8, 2022
1 parent 2251bcf commit d403b3c
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,22 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;

import java.lang.invoke.MethodHandles;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @program chunjun
* @author: xiuzhu
* @create: 2021/05/10
*/
public class ClickhouseInputFormat extends JdbcInputFormat {

private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Override
public void openInternal(InputSplit inputSplit) {
JdbcInputSplit jdbcInputSplit = (JdbcInputSplit) inputSplit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.slice.SliceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -57,6 +60,7 @@
* @create: 2021/06/27 17:25
*/
public class ElasticsearchInputFormat extends BaseRichInputFormat {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected long keepAlive = 1;
/** Elasticsearch Configuration */
private ElasticsearchConf elasticsearchConf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.dtstack.chunjun.element.column.TimeColumn;
import com.dtstack.chunjun.element.column.TimestampColumn;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
Expand All @@ -58,6 +59,7 @@ public class JdbcColumnConverter
extends AbstractRowConverter<
ResultSet, JsonArray, FieldNamedPreparedStatement, LogicalType> {


/**
 * Creates a converter for the given row type with no additional configuration;
 * delegates to the two-argument constructor with a null second argument.
 *
 * @param rowType the Flink logical row type describing the fields to convert
 */
public JdbcColumnConverter(RowType rowType) {
this(rowType, null);
}
Expand Down Expand Up @@ -133,7 +135,14 @@ public RowData toInternal(ResultSet resultSet) throws Exception {
/**
 * Serializes every field of the given row into the prepared statement using the
 * pre-built per-column external converters.
 *
 * <p>If a column fails to serialize, the failure is wrapped with the column index
 * (and, when the row is a {@code GenericRowData}, the offending field value) so the
 * bad record can be identified from logs; the original cause is always preserved.
 *
 * @param rowData the row whose fields are written to the statement
 * @param statement the target statement; returned to allow call chaining
 * @return the same statement instance with all fields bound
 * @throws Exception if any field cannot be serialized
 */
public FieldNamedPreparedStatement toExternal(
        RowData rowData, FieldNamedPreparedStatement statement) throws Exception {
    for (int index = 0; index < rowData.getArity(); index++) {
        try {
            toExternalConverters.get(index).serialize(rowData, index, statement);
        } catch (Throwable e) {
            // Only GenericRowData exposes the raw field value for the error message.
            if (rowData instanceof GenericRowData) {
                throw new IllegalStateException(
                        "index:" + index + " val:" + ((GenericRowData) rowData).getField(index),
                        e);
            }
            throw e;
        }
    }
    return statement;
}
Expand Down Expand Up @@ -208,7 +217,13 @@ protected ISerializationConverter<FieldNamedPreparedStatement> createExternalCon
};
case TINYINT:
return (val, index, statement) -> statement.setByte(index, val.getByte(index));
case SMALLINT:
case SMALLINT: {
return (val, index, statement) -> {
short a = 0;
a = val.getShort(index);
statement.setShort(index, a);
};
}
case INTEGER:
case INTERVAL_YEAR_MONTH:
return (val, index, statement) -> {
Expand All @@ -222,8 +237,8 @@ protected ISerializationConverter<FieldNamedPreparedStatement> createExternalCon
statement.setInt(index, a);
};
case FLOAT:
return (val, index, statement) ->{
// statement.setFloat(index, ((ColumnRowData) val).getField(index).asFloat());
return (val, index, statement) -> {
// statement.setFloat(index, ((ColumnRowData) val).getField(index).asFloat());

statement.setFloat(index, val.getFloat(index));
};
Expand Down Expand Up @@ -266,7 +281,10 @@ protected ISerializationConverter<FieldNamedPreparedStatement> createExternalCon
// val.getTimestamp(index, -1).toLocalDateTime();
// statement.setTime(index, ((ColumnRowData) val).getField(index).asTime());
// val.get
throw new UnsupportedOperationException("index:" + index + ",val:" + val.toString());
// throw new UnsupportedOperationException("index:" + index + ",val:" + val.toString());
// java.sql.Time time =

statement.setTime(index, new Time(val.getInt(index)));
};
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;

import java.lang.invoke.MethodHandles;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Connection;
Expand All @@ -64,6 +65,9 @@

import static com.dtstack.chunjun.enums.ColumnType.TIMESTAMPTZ;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* InputFormat for reading data from a database and generate Rows.
*
Expand All @@ -72,7 +76,7 @@
* @author [email protected]
*/
public class JdbcInputFormat extends BaseRichInputFormat {

private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final long serialVersionUID = 1L;
protected static final int resultSetConcurrency = ResultSet.CONCUR_READ_ONLY;
protected static int resultSetType = ResultSet.TYPE_FORWARD_ONLY;
Expand Down Expand Up @@ -454,7 +458,7 @@ private String getMaxValueFromDb() {
LOG.info(String.format("Query max value sql is '%s'", queryMaxValueSql));

conn = getConnection();
st = conn.createStatement(resultSetType, resultSetConcurrency);
st = conn.createStatement(getResultSetType(), resultSetConcurrency);
st.setQueryTimeout(jdbcConf.getQueryTimeOut());
rs = st.executeQuery(queryMaxValueSql);
if (rs.next()) {
Expand Down Expand Up @@ -502,7 +506,7 @@ private Pair<String, String> getSplitRangeFromDb() {
LOG.info(String.format("Query SplitRange sql is '%s'", querySplitRangeSql));

conn = getConnection();
st = conn.createStatement(resultSetType, resultSetConcurrency);
st = conn.createStatement(getResultSetType(), resultSetConcurrency);
st.setQueryTimeout(jdbcConf.getQueryTimeOut());
rs = st.executeQuery(querySplitRangeSql);
if (rs.next()) {
Expand Down Expand Up @@ -887,10 +891,19 @@ protected Pair<List<String>, List<String>> getTableMetaData() {
}
/**
 * Initializes the shared prepared statement for the given query, applying the
 * configured fetch size and query timeout.
 *
 * <p>The result set type comes from {@code getResultSetType()} rather than the
 * static field so subclasses (e.g. PostgreSQL) can substitute a scrollable type.
 *
 * @param querySql the SQL to prepare
 * @throws SQLException if the statement cannot be created or configured
 */
public void initPrepareStatement(String querySql) throws SQLException {
    ps = dbConn.prepareStatement(querySql, getResultSetType(), resultSetConcurrency);
    ps.setFetchSize(jdbcConf.getFetchSize());
    ps.setQueryTimeout(jdbcConf.getQueryTimeOut());
}

/**
 * Returns the scrolling type used when creating {@code Statement}/{@code PreparedStatement}
 * objects.
 *
 * <p>Subclasses may override this — e.g. the PostgreSQL input format returns
 * {@code TYPE_SCROLL_INSENSITIVE} because forward-only cursors fail after
 * {@code setFetchDirection(ResultSet.FETCH_REVERSE)} is called.
 *
 * @return one of the {@code ResultSet.TYPE_*} constants; defaults to the value of
 *     the {@code resultSetType} field ({@code TYPE_FORWARD_ONLY})
 */
protected int getResultSetType() {
return resultSetType;
}

/**
* 间隔轮询查询起始位置
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,21 @@
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;

import java.lang.invoke.MethodHandles;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Date: 2022/01/12 Company: www.dtstack.com
*
* @author tudou
*/
public class DistributedJdbcInputFormat extends JdbcInputFormat {

private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected List<DataSourceConf> sourceList;
protected DistributedJdbcInputSplit inputSplit;
protected int sourceIndex = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,71 +4,80 @@
import com.dtstack.chunjun.connector.jdbc.util.SqlUtil;
import com.dtstack.chunjun.util.ExceptionUtil;

import java.lang.invoke.MethodHandles;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;

public class PostgresqlInputFormat extends JdbcInputFormat {

@Override
protected void queryPollingWithOutStartLocation() throws SQLException {
// In PostgreSQL, if resultCursorType is FORWARD_ONLY
// , the query will report an error after the method
// #setFetchDirection(ResultSet.FETCH_REVERSE) is called.
String querySql =
jdbcConf.getQuerySql() + SqlUtil.buildOrderSql(jdbcConf, jdbcDialect, "ASC");
ps =
dbConn.prepareStatement(
querySql, ResultSet.TYPE_SCROLL_INSENSITIVE, resultSetConcurrency);
ps.setFetchSize(jdbcConf.getFetchSize());
ps.setQueryTimeout(jdbcConf.getQueryTimeOut());
resultSet = ps.executeQuery();
hasNext = resultSet.next();
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

try {
// 间隔轮询一直循环,直到查询到数据库中的数据为止
while (!hasNext) {
TimeUnit.MILLISECONDS.sleep(jdbcConf.getPollingInterval());
resultSet.close();
// 如果事务不提交 就会导致数据库即使插入数据 也无法读到数据
dbConn.commit();
resultSet = ps.executeQuery();
hasNext = resultSet.next();
// 每隔五分钟打印一次,(当前时间 - 任务开始时间) % 300秒 <= 一个间隔轮询周期
if ((System.currentTimeMillis() - startTime) % 300000
<= jdbcConf.getPollingInterval()) {
LOG.info(
"no record matched condition in database, execute query sql = {}, startLocation = {}",
jdbcConf.getQuerySql(),
endLocationAccumulator.getLocalValue());
}
}
} catch (InterruptedException e) {
LOG.warn(
"interrupted while waiting for polling, e = {}",
ExceptionUtil.getErrorMessage(e));
}
public class PostgresqlInputFormat extends JdbcInputFormat {
//private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

// 查询到数据,更新querySql
StringBuilder builder = new StringBuilder(128);
builder.append(jdbcConf.getQuerySql());
if (jdbcConf.getQuerySql().contains("WHERE")) {
builder.append(" AND ");
} else {
builder.append(" WHERE ");
}
builder.append(jdbcDialect.quoteIdentifier(jdbcConf.getIncreColumn()))
.append(" > ? ORDER BY ")
.append(jdbcDialect.quoteIdentifier(jdbcConf.getIncreColumn()))
.append(" ASC");
jdbcConf.setQuerySql(builder.toString());
ps =
dbConn.prepareStatement(
jdbcConf.getQuerySql(),
ResultSet.TYPE_SCROLL_INSENSITIVE,
resultSetConcurrency);
ps.setFetchSize(jdbcConf.getFetchSize());
ps.setQueryTimeout(jdbcConf.getQueryTimeOut());
LOG.info("update querySql, sql = {}", jdbcConf.getQuerySql());
/**
 * PostgreSQL needs a scrollable result set: with the default
 * {@code TYPE_FORWARD_ONLY}, queries error out once
 * {@code setFetchDirection(ResultSet.FETCH_REVERSE)} is invoked.
 *
 * @return {@code ResultSet.TYPE_SCROLL_INSENSITIVE}
 */
@Override
protected int getResultSetType() {
    return ResultSet.TYPE_SCROLL_INSENSITIVE;
}

// @Override
// protected void queryPollingWithOutStartLocation() throws SQLException {
// // In PostgreSQL, if resultCursorType is FORWARD_ONLY
// // , the query will report an error after the method
// // #setFetchDirection(ResultSet.FETCH_REVERSE) is called.
// String querySql =
// jdbcConf.getQuerySql() + SqlUtil.buildOrderSql(jdbcConf, jdbcDialect, "ASC");
// ps =
// dbConn.prepareStatement(
// querySql, ResultSet.TYPE_SCROLL_INSENSITIVE, resultSetConcurrency);
// ps.setFetchSize(jdbcConf.getFetchSize());
// ps.setQueryTimeout(jdbcConf.getQueryTimeOut());
// resultSet = ps.executeQuery();
// hasNext = resultSet.next();
//
// try {
// // 间隔轮询一直循环,直到查询到数据库中的数据为止
// while (!hasNext) {
// TimeUnit.MILLISECONDS.sleep(jdbcConf.getPollingInterval());
// resultSet.close();
// // 如果事务不提交 就会导致数据库即使插入数据 也无法读到数据
// dbConn.commit();
// resultSet = ps.executeQuery();
// hasNext = resultSet.next();
// // 每隔五分钟打印一次,(当前时间 - 任务开始时间) % 300秒 <= 一个间隔轮询周期
// if ((System.currentTimeMillis() - startTime) % 300000
// <= jdbcConf.getPollingInterval()) {
// LOG.info(
// "no record matched condition in database, execute query sql = {}, startLocation = {}",
// jdbcConf.getQuerySql(),
// endLocationAccumulator.getLocalValue());
// }
// }
// } catch (InterruptedException e) {
// LOG.warn(
// "interrupted while waiting for polling, e = {}",
// ExceptionUtil.getErrorMessage(e));
// }
//
// // 查询到数据,更新querySql
// StringBuilder builder = new StringBuilder(128);
// builder.append(jdbcConf.getQuerySql());
// if (jdbcConf.getQuerySql().contains("WHERE")) {
// builder.append(" AND ");
// } else {
// builder.append(" WHERE ");
// }
// builder.append(jdbcDialect.quoteIdentifier(jdbcConf.getIncreColumn()))
// .append(" > ? ORDER BY ")
// .append(jdbcDialect.quoteIdentifier(jdbcConf.getIncreColumn()))
// .append(" ASC");
// jdbcConf.setQuerySql(builder.toString());
// ps =
// dbConn.prepareStatement(
// jdbcConf.getQuerySql(),
// ResultSet.TYPE_SCROLL_INSENSITIVE,
// resultSetConcurrency);
// ps.setFetchSize(jdbcConf.getFetchSize());
// ps.setQueryTimeout(jdbcConf.getQueryTimeOut());
// LOG.info("update querySql, sql = {}", jdbcConf.getQuerySql());
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public abstract class BaseRichInputFormat extends RichInputFormat<RowData, Input

// protected final Logger LOG = LoggerFactory.getLogger(getClass());

protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

/** BaseRichInputFormat是否结束 */
private final AtomicBoolean isClosed = new AtomicBoolean(false);
Expand Down
2 changes: 1 addition & 1 deletion deploy.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
mvn clean install -pl chunjun-connectors/chunjun-connector-mysql\
mvn clean deploy -pl chunjun-connectors/chunjun-connector-mysql\
,chunjun-connectors/chunjun-connector-clickhouse\
,chunjun-connectors/chunjun-connector-doris\
,chunjun-connectors/chunjun-connector-postgresql\
Expand Down

0 comments on commit d403b3c

Please sign in to comment.