Skip to content

Commit

Permalink
[bugfix][plugin][rdbms]
Browse files Browse the repository at this point in the history
1.DataBaseType should not be call set method,so that the driver is error when rdbmsreader to rdbmswriter
2.hive jdbc don't support method(Timestamp getTimestamp(int columnIndex, Calendar cal))

3.DataBaseType add vertica driver
  • Loading branch information
awol2005ex committed Nov 28, 2024
1 parent 50440b9 commit 783e459
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public void init(Configuration readerSliceConfig)
}

public void startRead(Configuration readerSliceConfig, RecordSender recordSender,
TaskPluginCollector taskPluginCollector, int fetchSize)
TaskPluginCollector taskPluginCollector, int fetchSize)
{
String querySql = readerSliceConfig.getString(Key.QUERY_SQL);

Expand Down Expand Up @@ -223,7 +223,7 @@ public void destroy(Configuration originalConfig)
}

protected void transportOneRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData,
int columnNumber, TaskPluginCollector taskPluginCollector)
int columnNumber, TaskPluginCollector taskPluginCollector)
{
Record record = buildRecord(recordSender, rs, metaData, columnNumber, taskPluginCollector);
recordSender.sendToWriter(record);
Expand Down Expand Up @@ -272,7 +272,12 @@ protected Column createColumn(ResultSet rs, ResultSetMetaData metaData, int i)
return new DateColumn(rs.getDate(i));

case Types.TIMESTAMP:
return new TimestampColumn(rs.getTimestamp(i, Calendar.getInstance()));
if(!"org.apache.hive.jdbc.HiveDriver".equals(this.dataBaseType.getDriverClassName())){
return new TimestampColumn(rs.getTimestamp(i, Calendar.getInstance()));
}else{
//hive not support method(Timestamp getTimestamp(int columnIndex, Calendar cal))
return new TimestampColumn(rs.getTimestamp(i));
}

case Types.BINARY:
case Types.VARBINARY:
Expand Down Expand Up @@ -311,7 +316,7 @@ protected Column createColumn(ResultSet rs, ResultSetMetaData metaData, int i)
}

protected Record buildRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData, int columnNumber,
TaskPluginCollector taskPluginCollector)
TaskPluginCollector taskPluginCollector)
{
Record record = recordSender.createRecord();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static com.wgzhao.addax.common.spi.ErrorCode.CONFIG_ERROR;
Expand Down Expand Up @@ -118,7 +119,11 @@ private static void dealJdbcAndTable(Configuration originalConfig)
String driverClass = connConf.getString(Key.JDBC_DRIVER, null);
if (driverClass != null && !driverClass.isEmpty()) {
LOG.warn("use specified driver class: {}", driverClass);
dataBaseType.setDriverClassName(driverClass);


Arrays.stream(DataBaseType.values()).filter(
d -> d.getDriverClassName().equals(driverClass)).findFirst().ifPresent(d ->
dataBaseType=d);
}
connConf.getNecessaryValue(Key.JDBC_URL, REQUIRED_VALUE);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ public enum DataBaseType
Sybase("sybase", "com.sybase.jdbc4.jdbc.SybDriver"),
Databend("databend", "com.databend.jdbc.DatabendDriver"),
Access("access","net.ucanaccess.jdbc.UcanaccessDriver"),
HANA("hana", "com.sap.db.jdbc.Driver");
HANA("hana", "com.sap.db.jdbc.Driver"),
VERTICA("vertica", "com.vertica.jdbc.Driver");

private static final Pattern jdbcUrlPattern = Pattern.compile("jdbc:\\w+:(?:thin:url=|//|thin:@|)([\\w\\d.,]+).*");

Expand Down Expand Up @@ -162,8 +163,4 @@ public String getTypeName()
return typeName;
}

public void setDriverClassName(String driverClassName)
{
this.driverClassName = driverClassName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

import java.sql.Connection;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static com.wgzhao.addax.common.spi.ErrorCode.CONFIG_ERROR;
Expand Down Expand Up @@ -92,7 +93,9 @@ public static void simplifyConf(Configuration originalConfig)
String driverClass = connConf.getString(Key.JDBC_DRIVER, null);
if (driverClass != null && !driverClass.isEmpty()) {
LOG.warn("Use specified driver class [{}]", driverClass);
dataBaseType.setDriverClassName(driverClass);
Arrays.stream(DataBaseType.values()).filter(
d -> d.getDriverClassName().equals(driverClass)).findFirst().ifPresent(d ->
dataBaseType=d);
}
String jdbcUrl = connConf.getString(Key.JDBC_URL);
if (StringUtils.isBlank(jdbcUrl)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
public class RdbmsReader
extends Reader
{
private static final DataBaseType DATABASE_TYPE = DataBaseType.RDBMS;
private static DataBaseType DATABASE_TYPE = DataBaseType.RDBMS;

public static class Job
extends Reader.Job
Expand Down Expand Up @@ -71,11 +71,13 @@ public void init()
final String jdbcType = connection.getString(Key.JDBC_URL).split(":")[1];
Arrays.stream(DataBaseType.values()).filter(
dataBaseType -> dataBaseType.getTypeName().equals(jdbcType)).findFirst().ifPresent(dataBaseType ->
DATABASE_TYPE.setDriverClassName(dataBaseType.getDriverClassName()));
DATABASE_TYPE=dataBaseType);
}
else {
// use custom jdbc driver
DATABASE_TYPE.setDriverClassName(jdbcDriver);
Arrays.stream(DataBaseType.values()).filter(
dataBaseType -> dataBaseType.getDriverClassName().equals(jdbcDriver)).findFirst().ifPresent(dataBaseType ->
DATABASE_TYPE=dataBaseType);
}
this.commonRdbmsReaderMaster = new SubCommonRdbmsReader.Job(DATABASE_TYPE);
this.originalConfig = this.commonRdbmsReaderMaster.init(this.originalConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.wgzhao.addax.rdbms.writer.CommonRdbmsWriter;
import org.apache.commons.lang3.StringUtils;

import java.util.Arrays;
import java.util.List;

import static com.wgzhao.addax.common.base.Key.JDBC_DRIVER;
Expand All @@ -38,7 +39,7 @@
public class RdbmsWriter
extends Writer
{
private static final DataBaseType DATABASE_TYPE = DataBaseType.RDBMS;
private static DataBaseType DATABASE_TYPE = DataBaseType.RDBMS;

public static class Job
extends Writer.Job
Expand All @@ -63,7 +64,9 @@ public void init()
throw AddaxException.asAddaxException(REQUIRED_VALUE, "config 'driver' is required and must not be empty");
}
// use special jdbc driver class
DATABASE_TYPE.setDriverClassName(jdbcDriver);
Arrays.stream(DataBaseType.values()).filter(
d -> d.getDriverClassName().equals(jdbcDriver)).findFirst().ifPresent(d ->
DATABASE_TYPE=d);
this.commonRdbmsWriterJob = new CommonRdbmsWriter.Job(DATABASE_TYPE);
commonRdbmsWriterJob.init(originalConfig);
}
Expand Down

0 comments on commit 783e459

Please sign in to comment.