diff --git a/batch-tool/docs/usage-details.md b/batch-tool/docs/usage-details.md index a92ccc2..0688c5d 100644 --- a/batch-tool/docs/usage-details.md +++ b/batch-tool/docs/usage-details.md @@ -170,8 +170,18 @@ mask: >- **解决**:加入 `-con 4`参数限制导入数据库的消费者线程数为4 6. 生产者消费者建议的比例数? - + **原因**:默认情况下,生产者数(并发读取文件线程数)为4,消费者数(并发导入数据库线程数)为CPU核心数的4倍; -在不同的规格下,默认参数可能表现不佳 + 在不同的规格下,默认参数可能表现不佳 **解决**:通常建议消费者数`-con `为生产者数`-pro `的6~8倍,具体数值还需根据硬件配置、数据库处理能力来调整 + +7. 报错 `Missing argument for option: ` + + **原因**:参数缺少参数值,自1.2.0版本后,开关类型的参数也需要填入参数(true|false) + +8. 导出数据时,默认分区并行度较低导致导出耗时较长 + + **原因**:默认情况下,导出时分区并行度为常数(4),当数据库实例以及压测机配置较高时,该并行度不足以充分利用硬件资源 + + **解决**:调整参数 -pro 的大小来调节导出分区并行度(导出场景中,生产者为连接数据库拉取数据的线程,消费者为文件写入的线程) diff --git a/batch-tool/pom.xml b/batch-tool/pom.xml index 724a390..dd28bbe 100644 --- a/batch-tool/pom.xml +++ b/batch-tool/pom.xml @@ -6,7 +6,7 @@ com.alibaba.polardbx batch-tool - 1.2.1 + 1.2.2 UTF-8 diff --git a/batch-tool/src/main/java/cmd/CommandUtil.java b/batch-tool/src/main/java/cmd/CommandUtil.java index a937c25..8905966 100644 --- a/batch-tool/src/main/java/cmd/CommandUtil.java +++ b/batch-tool/src/main/java/cmd/CommandUtil.java @@ -98,6 +98,7 @@ import static cmd.ConfigArgOption.ARG_SHORT_VERSION; import static cmd.ConfigArgOption.ARG_SHORT_WHERE; import static cmd.ConfigArgOption.ARG_SHORT_WITH_DDL; +import static cmd.FlagOption.ARG_EMPTY_AS_NULL; import static cmd.FlagOption.ARG_SHORT_ENABLE_SHARDING; import static cmd.FlagOption.ARG_SHORT_IGNORE_AND_RESUME; import static cmd.FlagOption.ARG_SHORT_LOAD_BALANCE; @@ -396,6 +397,10 @@ private static boolean getWithLastSep(ConfigResult result) { return result.getBooleanFlag(ARG_SHORT_WITH_LAST_SEP); } + private static boolean getEmptyAsNull(ConfigResult result) { + return result.getBooleanFlag(ARG_EMPTY_AS_NULL); + } + private static FileFormat getFileFormat(ConfigResult result) { if (result.hasOption(ARG_SHORT_FILE_FORMAT)) { String fileFormat = result.getOptionValue(ARG_SHORT_FILE_FORMAT); @@ -563,6 +568,7 @@ private static void configureConsumerContext(ConfigResult result, consumerExecutionContext.setWithLastSep(getWithLastSep(result)); consumerExecutionContext.setTpsLimit(getTpsLimit(result)); consumerExecutionContext.setUseColumns(getUseColumns(result)); + consumerExecutionContext.setEmptyStrAsNull(getEmptyAsNull(result)); consumerExecutionContext.validate(); } diff --git a/batch-tool/src/main/java/cmd/FlagOption.java b/batch-tool/src/main/java/cmd/FlagOption.java index 3a873cd..a0fb730 100644 --- a/batch-tool/src/main/java/cmd/FlagOption.java +++ b/batch-tool/src/main/java/cmd/FlagOption.java @@ -63,5 +63,7 @@ private static FlagOption of(String argShort, String argLong, String desc, Boole public static final FlagOption ARG_SHORT_PERF_MODE = of("perf", "perfMode", "Use performance mode at the sacrifice of compatibility (default false).", false); public static final FlagOption ARG_TRIM_RIGHT = - of("trimRight", "trimRight", "Remove trailing whitespaces in a line for BlockReader (default false).".trim(), false); + of("trimRight", "trimRight", "Remove trailing whitespaces in a line for BlockReader (default false).", false); + public static final FlagOption ARG_EMPTY_AS_NULL = + of("emptyAsNull", "emptyAsNull", "Treat an empty value for string-type as NULL (default false).", false); } diff --git a/batch-tool/src/main/java/datasource/DataSourceConfig.java b/batch-tool/src/main/java/datasource/DataSourceConfig.java index cd89ead..1393bfd 100644 --- a/batch-tool/src/main/java/datasource/DataSourceConfig.java +++ b/batch-tool/src/main/java/datasource/DataSourceConfig.java @@ -157,6 +157,7 @@ public DataSourceConfig build() { dataSourceConfig.maxWait = this.maxWait; dataSourceConfig.connParam = this.connParam; dataSourceConfig.initSqls = this.initSqls; + dataSourceConfig.loadBalanceEnabled = this.loadBalanceEnabled; String jdbcUrl; if (loadBalanceEnabled) { jdbcUrl = String.format(DataSourceUtil.LOAD_BALANCE_URL_PATTERN, diff --git a/batch-tool/src/main/java/exec/ImportExecutor.java b/batch-tool/src/main/java/exec/ImportExecutor.java index 257d4d4..8d0fe36 100644 --- a/batch-tool/src/main/java/exec/ImportExecutor.java +++ b/batch-tool/src/main/java/exec/ImportExecutor.java @@ -17,7 +17,6 @@ package exec; import cmd.BaseOperateCommand; -import cmd.ImportCommand; import com.alibaba.druid.pool.DruidDataSource; import datasource.DataSourceConfig; import exception.DatabaseException; @@ -114,9 +113,13 @@ public void execute() { producerExecutionContext.getDdlMode()); } configureFieldMetaInfo(); + + logger.debug(producerExecutionContext.toString()); + logger.debug(consumerExecutionContext.toString()); + for (String tableName : tableNames) { if (producerExecutionContext.isSingleThread() - && consumerExecutionContext.isSingleThread()) { + && consumerExecutionContext.isSingleThread()) { // 使用按行读取insert模式 doSingleThreadImport(tableName); } else { diff --git a/batch-tool/src/main/java/model/ConsumerExecutionContext.java b/batch-tool/src/main/java/model/ConsumerExecutionContext.java index aa4ccb8..0f285eb 100644 --- a/batch-tool/src/main/java/model/ConsumerExecutionContext.java +++ b/batch-tool/src/main/java/model/ConsumerExecutionContext.java @@ -129,6 +129,11 @@ public class ConsumerExecutionContext extends BaseConfig { */ private int tpsLimit; + /** + * 字符串类型字段,空值视作NULL + */ + private boolean emptyStrAsNull = false; + private double batchTpsLimitPerConsumer; private List> eventCounter; @@ -308,6 +313,7 @@ public String toString() { ", toUpdateColumns='" + toUpdateColumns + '\'' + ", updateWithFuncPattern='" + updateWithFuncPattern + '\'' + ", sqlEscapeEnabled=" + sqlEscapeEnabled + + ", tableFieldMetaInfo=" + tableFieldMetaInfo + '}'; } @@ -419,6 +425,14 @@ public void setUseColumns(String useColumns) { this.useColumns = useColumns; } + public boolean isEmptyStrAsNull() { + return emptyStrAsNull; + } + + public void setEmptyStrAsNull(boolean emptyStrAsNull) { + this.emptyStrAsNull = emptyStrAsNull; + } + @Override public void validate() { super.validate(); diff --git a/batch-tool/src/main/java/model/db/TableFieldMetaInfo.java b/batch-tool/src/main/java/model/db/TableFieldMetaInfo.java index 8696258..67a2e51 100644 --- a/batch-tool/src/main/java/model/db/TableFieldMetaInfo.java +++ b/batch-tool/src/main/java/model/db/TableFieldMetaInfo.java @@ -16,11 +16,12 @@ package model.db; +import java.util.ArrayList; import java.util.List; public class TableFieldMetaInfo { - private List fieldMetaInfoList; + private List fieldMetaInfoList = new ArrayList<>(); /** * 主键 */ diff --git a/batch-tool/src/main/java/util/DbUtil.java b/batch-tool/src/main/java/util/DbUtil.java index 89a15f9..8a73245 100644 --- a/batch-tool/src/main/java/util/DbUtil.java +++ b/batch-tool/src/main/java/util/DbUtil.java @@ -31,9 +31,9 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.TreeMap; import java.util.stream.Collectors; public class DbUtil { @@ -272,12 +272,16 @@ public static Map getDbFieldMetaInfo(Connection conn String schemaName, List tableNames) throws DatabaseException { - Map resultMap = new HashMap<>(); - for (String tableName : tableNames) { - TableFieldMetaInfo metaInfo = new TableFieldMetaInfo(); - metaInfo.setFieldMetaInfoList(new ArrayList<>()); - resultMap.put(tableName, metaInfo); - } + + Map resultMap = tableNames.stream() + .collect(Collectors.toMap( + tableName -> tableName, + tableName -> new TableFieldMetaInfo(), + (u, v) -> { + throw new IllegalStateException(String.format("Duplicate key %s", u)); + }, + () -> new TreeMap<>(String.CASE_INSENSITIVE_ORDER))); + Statement stmt = null; ResultSet resultSet = null; String metaInfoSql = String.format(DB_FIELD_INFO_SQL_PATTERN, schemaName); diff --git a/batch-tool/src/main/java/util/FileUtil.java b/batch-tool/src/main/java/util/FileUtil.java index d720338..09c7f80 100644 --- a/batch-tool/src/main/java/util/FileUtil.java +++ b/batch-tool/src/main/java/util/FileUtil.java @@ -178,6 +178,9 @@ private static ArrayList splitWithQuoteEscape(String line, String sep, f // 结尾有分隔符则忽略 len -= sep.length(); } + if (estimateCount <= 0) { + estimateCount = 1; + } ArrayList subStrings = new ArrayList<>(estimateCount); StringBuilder stringBuilder = new StringBuilder(line.length() / estimateCount); char sepStart = sep.charAt(0); diff --git a/batch-tool/src/main/java/worker/common/BaseDefaultConsumer.java b/batch-tool/src/main/java/worker/common/BaseDefaultConsumer.java index 8d4c3b1..66bd8c6 100644 --- a/batch-tool/src/main/java/worker/common/BaseDefaultConsumer.java +++ b/batch-tool/src/main/java/worker/common/BaseDefaultConsumer.java @@ -73,6 +73,9 @@ public void onProxyEvent(BatchLineEvent event) { } catch (Exception e) { consumerContext.setException(e); logger.error("Failed in table [{}], due to {}", tableName, e.getMessage()); + if (e.getStackTrace().length > 0) { + logger.error("{}", e.getStackTrace()[0]); + } // 认为无法恢复 throw new RuntimeException(e); } finally { diff --git a/batch-tool/src/main/java/worker/insert/DirectImportWorker.java b/batch-tool/src/main/java/worker/insert/DirectImportWorker.java index cc457fd..cfc458f 100644 --- a/batch-tool/src/main/java/worker/insert/DirectImportWorker.java +++ b/batch-tool/src/main/java/worker/insert/DirectImportWorker.java @@ -24,26 +24,21 @@ import exception.DatabaseException; import model.ConsumerExecutionContext; import model.ProducerExecutionContext; -import model.config.ConfigConstant; import model.config.FileLineRecord; import model.config.GlobalVar; import model.db.FieldMetaInfo; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import util.DbUtil; import util.IOUtil; import worker.util.ImportUtil; import javax.sql.DataSource; -import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.Paths; import java.sql.Connection; -import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Statement; import java.util.Arrays; @@ -67,6 +62,7 @@ public class DirectImportWorker implements Runnable { private final int reportLine; private final boolean sqlEscapeEnabled; + private final boolean emptyStrAsNull; public DirectImportWorker(DataSource dataSource, String tableName, @@ -85,6 +81,7 @@ public DirectImportWorker(DataSource dataSource, this.maxErrorCount = producerContext.getMaxErrorCount(); this.reportLine = GlobalVar.EMIT_BATCH_SIZE * 10; this.sqlEscapeEnabled = consumerContext.isSqlEscapeEnabled(); + this.emptyStrAsNull = consumerContext.isEmptyStrAsNull(); } @Override @@ -112,7 +109,7 @@ public void run() { for (String[] values; (values = reader.readNext()) != null; ) { try { ImportUtil.getDirectImportSql(insertSqlBuilder, tableName, - fieldMetaInfoList, Arrays.asList(values), sqlEscapeEnabled, true); + fieldMetaInfoList, Arrays.asList(values), sqlEscapeEnabled, emptyStrAsNull); stmt.execute(insertSqlBuilder.toString()); importedLines++; diff --git a/batch-tool/src/main/java/worker/insert/ImportConsumer.java b/batch-tool/src/main/java/worker/insert/ImportConsumer.java index 4e9ba48..f12e835 100644 --- a/batch-tool/src/main/java/worker/insert/ImportConsumer.java +++ b/batch-tool/src/main/java/worker/insert/ImportConsumer.java @@ -51,7 +51,7 @@ protected void fillLocalBuffer(StringBuilder stringBuilder, List values) stringBuilder.append("("); try { ImportUtil.appendValuesByFieldMetaInfo(stringBuilder, fieldMetaInfoList, - values, consumerContext.isSqlEscapeEnabled(), hasEscapedQuote); + values, consumerContext.isSqlEscapeEnabled(), consumerContext.isEmptyStrAsNull()); } catch (DatabaseException e) { // 在split预处理过后仍存在的问题 logger.error(StringUtils.join(values, ConfigConstant.MAGIC_CSV_SEP1)); diff --git a/batch-tool/src/main/java/worker/insert/ProcessOnlyImportConsumer.java b/batch-tool/src/main/java/worker/insert/ProcessOnlyImportConsumer.java index 72ea58d..a46dc25 100644 --- a/batch-tool/src/main/java/worker/insert/ProcessOnlyImportConsumer.java +++ b/batch-tool/src/main/java/worker/insert/ProcessOnlyImportConsumer.java @@ -66,7 +66,7 @@ public void onProxyEvent(BatchLineEvent event) { stringBuilder.append("("); try { ImportUtil.appendValuesByFieldMetaInfo(stringBuilder, fieldMetaInfoList, - values, consumerContext.isSqlEscapeEnabled(), hasEscapedQuote); + values, consumerContext.isSqlEscapeEnabled(), consumerContext.isEmptyStrAsNull()); } catch (DatabaseException e) { logger.error("Error {} at line: {}", e.getMessage(), line); // 去除括号 diff --git a/batch-tool/src/main/java/worker/insert/ShardedImportConsumer.java b/batch-tool/src/main/java/worker/insert/ShardedImportConsumer.java index c8fc5f3..fece7b7 100644 --- a/batch-tool/src/main/java/worker/insert/ShardedImportConsumer.java +++ b/batch-tool/src/main/java/worker/insert/ShardedImportConsumer.java @@ -34,7 +34,7 @@ protected void fillLocalBuffer(StringBuilder localBuffer, List fieldMetaInfoList) throws Throwable { localBuffer.append("("); ImportUtil.appendValuesByFieldMetaInfo(localBuffer, fieldMetaInfoList, - values, consumerContext.isSqlEscapeEnabled(), hasEscapedQuote); + values, consumerContext.isSqlEscapeEnabled(), consumerContext.isEmptyStrAsNull()); localBuffer.append("),"); } diff --git a/batch-tool/src/main/java/worker/util/ImportUtil.java b/batch-tool/src/main/java/worker/util/ImportUtil.java index 3acdc63..8e72c77 100644 --- a/batch-tool/src/main/java/worker/util/ImportUtil.java +++ b/batch-tool/src/main/java/worker/util/ImportUtil.java @@ -69,13 +69,19 @@ public static void getBatchInsertSql(StringBuilder insertSqlBuilder, } public static void appendInsertStrValue(StringBuilder sqlStringBuilder, String rawValue, - boolean sqlEscapeEnabled, boolean hasEscapedQuote) { + boolean sqlEscapeEnabled, boolean emptyStrAsNull) { if (rawValue.equals(FileUtil.NULL_ESC_STR)) { // NULL字段处理 sqlStringBuilder.append("NULL"); return; } + if (rawValue.isEmpty() && emptyStrAsNull) { + // 空字符串视作NULL + sqlStringBuilder.append("NULL"); + return; + } if (GlobalVar.IN_PERF_MODE) { + // 预设csv文件中的值已经带上了引号 sqlStringBuilder.append(rawValue); return; } @@ -109,9 +115,13 @@ private static String escapeSqlSpecialChar(String sqlValue) { return sqlValue; } - public static void appendInsertNonStrValue(StringBuilder sqlStringBuilder, String rawValue, - boolean hasEscapedQuote) { - if (rawValue.equals(FileUtil.NULL_ESC_STR)) { + /** + * 对于非字符串字段 + * 空值视为NULL + */ + public static void appendInsertNonStrValue(StringBuilder sqlStringBuilder, String rawValue) { + if (rawValue.equals(FileUtil.NULL_ESC_STR) + || rawValue.isEmpty()) { // NULL字段处理 sqlStringBuilder.append("NULL"); } else { @@ -122,7 +132,7 @@ public static void appendInsertNonStrValue(StringBuilder sqlStringBuilder, Strin public static void appendValuesByFieldMetaInfo(StringBuilder stringBuilder, List fieldMetaInfoList, List values, boolean sqlEscapeEnabled, - boolean hasEscapedQuote) throws DatabaseException { + boolean emptyStrAsNull) throws DatabaseException { if (fieldMetaInfoList.size() != values.size()) { throw new DatabaseException(String.format("required field size %d, " + "actual size %d", fieldMetaInfoList.size(), values.size())); @@ -131,16 +141,16 @@ public static void appendValuesByFieldMetaInfo(StringBuilder stringBuilder, for (int i = 0; i < fieldLen - 1; i++) { if (fieldMetaInfoList.get(i).needQuote()) { // 字符串和日期都需要单引号 - ImportUtil.appendInsertStrValue(stringBuilder, values.get(i), sqlEscapeEnabled, hasEscapedQuote); + ImportUtil.appendInsertStrValue(stringBuilder, values.get(i), sqlEscapeEnabled, emptyStrAsNull); } else { - ImportUtil.appendInsertNonStrValue(stringBuilder, values.get(i), hasEscapedQuote); + ImportUtil.appendInsertNonStrValue(stringBuilder, values.get(i)); } stringBuilder.append(","); } if (fieldMetaInfoList.get(fieldLen - 1).needQuote()) { - ImportUtil.appendInsertStrValue(stringBuilder, values.get(fieldLen - 1), sqlEscapeEnabled, hasEscapedQuote); + ImportUtil.appendInsertStrValue(stringBuilder, values.get(fieldLen - 1), sqlEscapeEnabled, emptyStrAsNull); } else { - ImportUtil.appendInsertNonStrValue(stringBuilder, values.get(fieldLen - 1), hasEscapedQuote); + ImportUtil.appendInsertNonStrValue(stringBuilder, values.get(fieldLen - 1)); } } @@ -148,10 +158,10 @@ public static void getDirectImportSql(StringBuilder stringBuilder, String tableName, List fieldMetaInfoList, List values, boolean sqlEscapeEnabled, - boolean hasEscapedQuote) throws DatabaseException { + boolean emptyStrAsNull) throws DatabaseException { stringBuilder.append("INSERT INTO `").append(tableName).append("` VALUES ("); appendValuesByFieldMetaInfo(stringBuilder, fieldMetaInfoList, values, - sqlEscapeEnabled, hasEscapedQuote); + sqlEscapeEnabled, emptyStrAsNull); stringBuilder.append(");"); }