From 783e459e960cb19eb098feb382cc9840397ab2fa Mon Sep 17 00:00:00 2001 From: awol2005ex Date: Thu, 28 Nov 2024 11:39:43 +0800 Subject: [PATCH] =?UTF-8?q?[bugfix][plugin][rdbms]=201.DataBaseType=20shou?= =?UTF-8?q?ld=20not=20be=20call=20set=20method=EF=BC=8Cso=20that=20the=20d?= =?UTF-8?q?river=20is=20error=20when=20rdbmsreader=20to=20rdbmswriter=202.?= =?UTF-8?q?hive=20jdbc=20don't=20support=20method(Timestamp=20getTimestamp?= =?UTF-8?q?(int=20columnIndex,=20Calendar=20cal))?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 3.DataBaseType add vertica driver --- .../addax/rdbms/reader/CommonRdbmsReader.java | 13 +++++++++---- .../reader/util/OriginalConfPretreatmentUtil.java | 7 ++++++- .../com/wgzhao/addax/rdbms/util/DataBaseType.java | 7 ++----- .../writer/util/OriginalConfPretreatmentUtil.java | 5 ++++- .../plugin/reader/rdbmsreader/RdbmsReader.java | 8 +++++--- .../plugin/writer/rdbmswriter/RdbmsWriter.java | 7 +++++-- 6 files changed, 31 insertions(+), 16 deletions(-) diff --git a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/CommonRdbmsReader.java b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/CommonRdbmsReader.java index 8dcc20c74..89b2129d9 100644 --- a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/CommonRdbmsReader.java +++ b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/CommonRdbmsReader.java @@ -166,7 +166,7 @@ public void init(Configuration readerSliceConfig) } public void startRead(Configuration readerSliceConfig, RecordSender recordSender, - TaskPluginCollector taskPluginCollector, int fetchSize) + TaskPluginCollector taskPluginCollector, int fetchSize) { String querySql = readerSliceConfig.getString(Key.QUERY_SQL); @@ -223,7 +223,7 @@ public void destroy(Configuration originalConfig) } protected void transportOneRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData, - int columnNumber, TaskPluginCollector taskPluginCollector) + int columnNumber, TaskPluginCollector taskPluginCollector) { Record record = buildRecord(recordSender, rs, metaData, columnNumber, taskPluginCollector); recordSender.sendToWriter(record); @@ -272,7 +272,12 @@ protected Column createColumn(ResultSet rs, ResultSetMetaData metaData, int i) return new DateColumn(rs.getDate(i)); case Types.TIMESTAMP: - return new TimestampColumn(rs.getTimestamp(i, Calendar.getInstance())); + if(!"org.apache.hive.jdbc.HiveDriver".equals(this.dataBaseType.getDriverClassName())){ + return new TimestampColumn(rs.getTimestamp(i, Calendar.getInstance())); + }else{ + //hive not support method(Timestamp getTimestamp(int columnIndex, Calendar cal)) + return new TimestampColumn(rs.getTimestamp(i)); + } case Types.BINARY: case Types.VARBINARY: @@ -311,7 +316,7 @@ protected Column createColumn(ResultSet rs, ResultSetMetaData metaData, int i) } protected Record buildRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData, int columnNumber, - TaskPluginCollector taskPluginCollector) + TaskPluginCollector taskPluginCollector) { Record record = recordSender.createRecord(); diff --git a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/OriginalConfPretreatmentUtil.java b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/OriginalConfPretreatmentUtil.java index b087d228b..9180a57cb 100644 --- a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/OriginalConfPretreatmentUtil.java +++ b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/reader/util/OriginalConfPretreatmentUtil.java @@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import static com.wgzhao.addax.common.spi.ErrorCode.CONFIG_ERROR; @@ -118,7 +119,11 @@ private static void dealJdbcAndTable(Configuration originalConfig) String driverClass = connConf.getString(Key.JDBC_DRIVER, null); if (driverClass != null && !driverClass.isEmpty()) { LOG.warn("use specified driver class: {}", driverClass); - dataBaseType.setDriverClassName(driverClass); + + + Arrays.stream(DataBaseType.values()).filter( + d -> d.getDriverClassName().equals(driverClass)).findFirst().ifPresent(d -> + dataBaseType=d); } connConf.getNecessaryValue(Key.JDBC_URL, REQUIRED_VALUE); diff --git a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/util/DataBaseType.java b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/util/DataBaseType.java index caac6b37c..8ebf65b60 100644 --- a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/util/DataBaseType.java +++ b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/util/DataBaseType.java @@ -50,7 +50,8 @@ public enum DataBaseType Sybase("sybase", "com.sybase.jdbc4.jdbc.SybDriver"), Databend("databend", "com.databend.jdbc.DatabendDriver"), Access("access","net.ucanaccess.jdbc.UcanaccessDriver"), - HANA("hana", "com.sap.db.jdbc.Driver"); + HANA("hana", "com.sap.db.jdbc.Driver"), + VERTICA("vertica", "com.vertica.jdbc.Driver"); private static final Pattern jdbcUrlPattern = Pattern.compile("jdbc:\\w+:(?:thin:url=|//|thin:@|)([\\w\\d.,]+).*"); @@ -162,8 +163,4 @@ public String getTypeName() return typeName; } - public void setDriverClassName(String driverClassName) - { - this.driverClassName = driverClassName; - } } diff --git a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/writer/util/OriginalConfPretreatmentUtil.java b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/writer/util/OriginalConfPretreatmentUtil.java index 0bc49d743..cc953df08 100644 --- a/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/writer/util/OriginalConfPretreatmentUtil.java +++ b/lib/addax-rdbms/src/main/java/com/wgzhao/addax/rdbms/writer/util/OriginalConfPretreatmentUtil.java @@ -38,6 +38,7 @@ import java.sql.Connection; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import static com.wgzhao.addax.common.spi.ErrorCode.CONFIG_ERROR; @@ -92,7 +93,9 @@ public static void simplifyConf(Configuration originalConfig) String driverClass = connConf.getString(Key.JDBC_DRIVER, null); if (driverClass != null && !driverClass.isEmpty()) { LOG.warn("Use specified driver class [{}]", driverClass); - dataBaseType.setDriverClassName(driverClass); + Arrays.stream(DataBaseType.values()).filter( + d -> d.getDriverClassName().equals(driverClass)).findFirst().ifPresent(d -> + dataBaseType=d); } String jdbcUrl = connConf.getString(Key.JDBC_URL); if (StringUtils.isBlank(jdbcUrl)) { diff --git a/plugin/reader/rdbmsreader/src/main/java/com/wgzhao/addax/plugin/reader/rdbmsreader/RdbmsReader.java b/plugin/reader/rdbmsreader/src/main/java/com/wgzhao/addax/plugin/reader/rdbmsreader/RdbmsReader.java index 3e3d30fd7..c3e59fe0c 100644 --- a/plugin/reader/rdbmsreader/src/main/java/com/wgzhao/addax/plugin/reader/rdbmsreader/RdbmsReader.java +++ b/plugin/reader/rdbmsreader/src/main/java/com/wgzhao/addax/plugin/reader/rdbmsreader/RdbmsReader.java @@ -40,7 +40,7 @@ public class RdbmsReader extends Reader { - private static final DataBaseType DATABASE_TYPE = DataBaseType.RDBMS; + private static DataBaseType DATABASE_TYPE = DataBaseType.RDBMS; public static class Job extends Reader.Job @@ -71,11 +71,13 @@ public void init() final String jdbcType = connection.getString(Key.JDBC_URL).split(":")[1]; Arrays.stream(DataBaseType.values()).filter( dataBaseType -> dataBaseType.getTypeName().equals(jdbcType)).findFirst().ifPresent(dataBaseType -> - DATABASE_TYPE.setDriverClassName(dataBaseType.getDriverClassName())); + DATABASE_TYPE=dataBaseType); } else { // use custom jdbc driver - DATABASE_TYPE.setDriverClassName(jdbcDriver); + Arrays.stream(DataBaseType.values()).filter( + dataBaseType -> dataBaseType.getDriverClassName().equals(jdbcDriver)).findFirst().ifPresent(dataBaseType -> + DATABASE_TYPE=dataBaseType); } this.commonRdbmsReaderMaster = new SubCommonRdbmsReader.Job(DATABASE_TYPE); this.originalConfig = this.commonRdbmsReaderMaster.init(this.originalConfig); diff --git a/plugin/writer/rdbmswriter/src/main/java/com/wgzhao/addax/plugin/writer/rdbmswriter/RdbmsWriter.java b/plugin/writer/rdbmswriter/src/main/java/com/wgzhao/addax/plugin/writer/rdbmswriter/RdbmsWriter.java index 48112ac60..9c93ce99e 100644 --- a/plugin/writer/rdbmswriter/src/main/java/com/wgzhao/addax/plugin/writer/rdbmswriter/RdbmsWriter.java +++ b/plugin/writer/rdbmswriter/src/main/java/com/wgzhao/addax/plugin/writer/rdbmswriter/RdbmsWriter.java @@ -29,6 +29,7 @@ import com.wgzhao.addax.rdbms.writer.CommonRdbmsWriter; import org.apache.commons.lang3.StringUtils; +import java.util.Arrays; import java.util.List; import static com.wgzhao.addax.common.base.Key.JDBC_DRIVER; @@ -38,7 +39,7 @@ public class RdbmsWriter extends Writer { - private static final DataBaseType DATABASE_TYPE = DataBaseType.RDBMS; + private static DataBaseType DATABASE_TYPE = DataBaseType.RDBMS; public static class Job extends Writer.Job @@ -63,7 +64,9 @@ public void init() throw AddaxException.asAddaxException(REQUIRED_VALUE, "config 'driver' is required and must not be empty"); } // use special jdbc driver class - DATABASE_TYPE.setDriverClassName(jdbcDriver); + Arrays.stream(DataBaseType.values()).filter( + d -> d.getDriverClassName().equals(jdbcDriver)).findFirst().ifPresent(d -> + DATABASE_TYPE=d); this.commonRdbmsWriterJob = new CommonRdbmsWriter.Job(DATABASE_TYPE); commonRdbmsWriterJob.init(originalConfig); }