From b6b4041acb859f9e6300d33305705878151538fb Mon Sep 17 00:00:00 2001 From: F-ca7 Date: Wed, 17 Apr 2024 21:47:15 +0800 Subject: [PATCH 1/4] to #56171689 support only export specified partitions --- batch-tool/pom.xml | 2 +- batch-tool/src/main/java/cmd/CommandUtil.java | 29 +++++++ .../src/main/java/cmd/ConfigArgOption.java | 2 + .../exec/export/ShardingExportExecutor.java | 21 ++++- .../main/java/model/config/BaseConfig.java | 83 +++++++++++-------- .../java/worker/export/BaseExportWorker.java | 8 +- 6 files changed, 105 insertions(+), 40 deletions(-) diff --git a/batch-tool/pom.xml b/batch-tool/pom.xml index 13246ca..e4f8e55 100644 --- a/batch-tool/pom.xml +++ b/batch-tool/pom.xml @@ -6,7 +6,7 @@ com.alibaba.polardbx batch-tool - 1.4.0 + 1.4.1 UTF-8 diff --git a/batch-tool/src/main/java/cmd/CommandUtil.java b/batch-tool/src/main/java/cmd/CommandUtil.java index 891b2de..931a2c2 100644 --- a/batch-tool/src/main/java/cmd/CommandUtil.java +++ b/batch-tool/src/main/java/cmd/CommandUtil.java @@ -103,6 +103,7 @@ import static cmd.ConfigArgOption.ARG_SHORT_VERSION; import static cmd.ConfigArgOption.ARG_SHORT_WHERE; import static cmd.ConfigArgOption.ARG_SHORT_WITH_DDL; +import static cmd.ConfigArgOption.ARG_TBL_PART; import static cmd.FlagOption.ARG_DROP_TABLE_IF_EXISTS; import static cmd.FlagOption.ARG_EMPTY_AS_NULL; import static cmd.FlagOption.ARG_SHORT_ENABLE_SHARDING; @@ -462,10 +463,38 @@ private static ExportCommand parseExportCommand(ConfigResult result) { setFileLine(result, exportConfig); setOrderBy(result, exportConfig); setColumnMaskerMap(result, exportConfig); + setPartitions(result, exportConfig); exportConfig.validate(); return new ExportCommand(getDbName(result), tableNames, exportConfig); } + private static void setPartitions(ConfigResult result, ExportConfig exportConfig) { + if (result.hasOption(ARG_TBL_PART)) { + String partOpt = result.getOptionValue(ARG_TBL_PART); + if (StringUtils.isNotEmpty(partOpt)) { + String[] parts = StringUtils.split(partOpt, ":"); + if (parts.length == 2) { + int startPart = Integer.parseInt(parts[0]); + int endPart = Integer.parseInt(parts[1]); + if (startPart < 0) { + throw new IllegalArgumentException("Illegal start partition: " + startPart); + } + if (endPart < 0) { + throw new IllegalArgumentException("Illegal end partition: " + endPart); + } + if (endPart < startPart) { + throw new IllegalArgumentException( + "End partition should be greater than start partition: " + endPart); + } + exportConfig.setStartPart(startPart); + exportConfig.setEndPart(endPart); + return; + } + } + throw new IllegalArgumentException("Illegal table part option: " + partOpt); + } + } + private static void setDir(ConfigResult result, ExportConfig exportConfig) { if (result.hasOption(ARG_SHORT_DIRECTORY)) { String dirPath = result.getOptionValue(ARG_SHORT_DIRECTORY); diff --git a/batch-tool/src/main/java/cmd/ConfigArgOption.java b/batch-tool/src/main/java/cmd/ConfigArgOption.java index a3a4e64..d40b1f4 100644 --- a/batch-tool/src/main/java/cmd/ConfigArgOption.java +++ b/batch-tool/src/main/java/cmd/ConfigArgOption.java @@ -112,6 +112,8 @@ public class ConfigArgOption { of("benchmark", "benchmark", "Fast loading benchmark data (dafault NONE).", "NONE | TPCH"); public static final ConfigArgOption ARG_SHORT_SCALE = of("scale", "scale", "The size scale benchmark data (GB for tpch).", "size"); + public static final ConfigArgOption ARG_TBL_PART = + of("part", "tblPart", "Partitions of the target tables, starting from 0 (both inclusive).", "start:end"); protected final String argShort; protected final String argLong; diff --git a/batch-tool/src/main/java/exec/export/ShardingExportExecutor.java b/batch-tool/src/main/java/exec/export/ShardingExportExecutor.java index fef72d8..83bd3a6 100644 --- a/batch-tool/src/main/java/exec/export/ShardingExportExecutor.java +++ b/batch-tool/src/main/java/exec/export/ShardingExportExecutor.java @@ -44,6 +44,7 @@ import java.sql.Connection; import java.sql.SQLException; +import java.util.ArrayList; import java.util.List; import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; @@ -87,6 +88,23 @@ private void doExportWithSharding(String tableName) { throw new RuntimeException(e); } + int startOffset = 0; + if (config.getStartPart() >= 0 && config.getEndPart() >= 0) { + int startPart = config.getStartPart(); + int endPart = config.getEndPart(); + if (endPart >= topologyList.size()) { + throw new IllegalArgumentException( + String.format("EndPart %d is larger than topology size %d", endPart, topologyList.size())); + } + startOffset = startPart; + // topologyList is in order + List newTopologyList = new ArrayList<>(topologyList.subList(startPart, endPart + 1)); + topologyList = newTopologyList; + logger.info("Exporting partitions {}~{} of table {}", startPart, endPart, tableName); + } else { + logger.info("Exporting all partitions of table {}, total part count: {}", tableName, topologyList.size()); + } + try (Connection connection = dataSource.getConnection()) { boolean isBroadCast = DbUtil.isBroadCast(connection, tableName); if (isBroadCast) { @@ -109,9 +127,10 @@ private void doExportWithSharding(String tableName) { case MAX_LINE_NUM_IN_SINGLE_FILE: case DEFAULT: for (int i = 0; i < shardSize; i++) { + int suffix = startOffset + i; directExportWorker = ExportWorkerFactory.buildDefaultDirectExportWorker(dataSource, topologyList.get(i), tableFieldMetaInfo, - filePathPrefix + i, config); + filePathPrefix + suffix, config); directExportWorker.setCountDownLatch(countDownLatch); directExportWorker.setPermitted(permitted); executor.submit(directExportWorker); diff --git a/batch-tool/src/main/java/model/config/BaseConfig.java b/batch-tool/src/main/java/model/config/BaseConfig.java index d46595e..092b680 100644 --- a/batch-tool/src/main/java/model/config/BaseConfig.java +++ b/batch-tool/src/main/java/model/config/BaseConfig.java @@ -24,36 +24,7 @@ */ public class BaseConfig { - private static class FileMode { - static final byte COMPRESS_FLAG = 1; - static final byte ENCRYPTION_FLAG = 1 << 2; - static final byte FILE_FORMAT_FLAG = 1 << 3; - - byte flag = 0; - - void setCompress() { - this.flag |= COMPRESS_FLAG; - } - - void setEncryption() { - this.flag |= ENCRYPTION_FLAG; - } - - void setFileFormat() { - this.flag |= FILE_FORMAT_FLAG; - } - - int bitCount() { - int count = 0; - byte n = flag; - while (n != 0) { - n &= n - 1; - count++; - } - return count; - } - } - + private final FileMode fileMode = new FileMode(); /** * 分隔符 */ @@ -84,17 +55,31 @@ int bitCount() { * 引号模式 */ protected QuoteEncloseMode quoteEncloseMode; - - private final FileMode fileMode = new FileMode(); - private boolean isWithLastSep = false; - private boolean isWithView = false; + private int startPart = -1; + private int endPart = -1; public BaseConfig(boolean shardingEnabled) { this.shardingEnabled = shardingEnabled; } + public int getStartPart() { + return startPart; + } + + public void setStartPart(int startPart) { + this.startPart = startPart; + } + + public int getEndPart() { + return endPart; + } + + public void setEndPart(int endPart) { + this.endPart = endPart; + } + public String getSeparator() { return separator; } @@ -231,4 +216,34 @@ public String toString() { ", encryptionConfig='" + encryptionConfig + '\'' + '}'; } + + private static class FileMode { + static final byte COMPRESS_FLAG = 1; + static final byte ENCRYPTION_FLAG = 1 << 2; + static final byte FILE_FORMAT_FLAG = 1 << 3; + + byte flag = 0; + + void setCompress() { + this.flag |= COMPRESS_FLAG; + } + + void setEncryption() { + this.flag |= ENCRYPTION_FLAG; + } + + void setFileFormat() { + this.flag |= FILE_FORMAT_FLAG; + } + + int bitCount() { + int count = 0; + byte n = flag; + while (n != 0) { + n &= n - 1; + count++; + } + return count; + } + } } diff --git a/batch-tool/src/main/java/worker/export/BaseExportWorker.java b/batch-tool/src/main/java/worker/export/BaseExportWorker.java index 27af13d..c9cd7dd 100644 --- a/batch-tool/src/main/java/worker/export/BaseExportWorker.java +++ b/batch-tool/src/main/java/worker/export/BaseExportWorker.java @@ -168,10 +168,6 @@ protected void afterProduceData() { * @param columnIdx 从 0 开始 */ protected void writeFieldValue(ByteArrayOutputStream os, byte[] value, int columnIdx) throws IOException { - if (columnDataMaskerList != null && columnDataMaskerList.get(columnIdx) != null) { - value = columnDataMaskerList.get(columnIdx).doMask(value); - } - boolean isStringType = isStringTypeList.get(columnIdx); switch (quoteEncloseMode) { case NONE: FileUtil.writeToByteArrayStream(os, value); @@ -180,6 +176,10 @@ protected void writeFieldValue(ByteArrayOutputStream os, byte[] value, int colum FileUtil.writeToByteArrayStreamWithQuote(os, value); break; case AUTO: + if (columnDataMaskerList != null && columnDataMaskerList.get(columnIdx) != null) { + value = columnDataMaskerList.get(columnIdx).doMask(value); + } + boolean isStringType = isStringTypeList.get(columnIdx); if (!isStringType) { FileUtil.writeToByteArrayStream(os, value); } else { From 1b4388b516f8e9f06c386045f841125b673560d5 Mon Sep 17 00:00:00 2001 From: F-ca7 Date: Thu, 18 Apr 2024 10:25:09 +0800 Subject: [PATCH 2/4] to #56171689 support import with filename prefix --- batch-tool/src/main/java/cmd/CommandUtil.java | 15 +++-- .../src/main/java/exec/BaseExecutor.java | 61 +++++++++++-------- .../java/model/ProducerExecutionContext.java | 10 +++ 3 files changed, 55 insertions(+), 31 deletions(-) diff --git a/batch-tool/src/main/java/cmd/CommandUtil.java b/batch-tool/src/main/java/cmd/CommandUtil.java index 931a2c2..a358afc 100644 --- a/batch-tool/src/main/java/cmd/CommandUtil.java +++ b/batch-tool/src/main/java/cmd/CommandUtil.java @@ -509,11 +509,7 @@ private static void setDir(ConfigResult result, ExportConfig exportConfig) { } private static void setFilenamePrefix(ConfigResult result, ExportConfig exportConfig) { - if (result.hasOption(ARG_SHORT_PREFIX)) { - exportConfig.setFilenamePrefix(result.getOptionValue(ARG_SHORT_PREFIX)); - } else { - exportConfig.setFilenamePrefix(""); - } + exportConfig.setFilenamePrefix(getPrefix(result)); } private static void setOrderBy(ConfigResult result, ExportConfig exportConfig) { @@ -620,6 +616,7 @@ private static void configureProducerContext(ConfigResult result, if (producerExecutionContext.getDdlMode() != DdlMode.NO_DDL) { producerExecutionContext.setDdlFileLineRecordList(getDdlFileRecordList(result)); } + producerExecutionContext.setFilenamePrefix(getPrefix(result)); producerExecutionContext.setParallelism(getProducerParallelism(result)); producerExecutionContext.setReadBlockSizeInMb(getReadBlockSizeInMb(result)); producerExecutionContext.setWithHeader(getWithHeader(result)); @@ -706,6 +703,14 @@ private static int getProducerParallelism(ConfigResult result) { } } + private static String getPrefix(ConfigResult result) { + if (result.hasOption(ARG_SHORT_PREFIX)) { + return result.getOptionValue(ARG_SHORT_PREFIX); + } else { + return ""; + } + } + private static QuoteEncloseMode getQuoteEncloseMode(ConfigResult result) { if (result.hasOption(ARG_SHORT_QUOTE_ENCLOSE_MODE)) { return QuoteEncloseMode.parseMode(result.getOptionValue(ARG_SHORT_QUOTE_ENCLOSE_MODE)); diff --git a/batch-tool/src/main/java/exec/BaseExecutor.java b/batch-tool/src/main/java/exec/BaseExecutor.java index d6c380e..9b1cac8 100644 --- a/batch-tool/src/main/java/exec/BaseExecutor.java +++ b/batch-tool/src/main/java/exec/BaseExecutor.java @@ -37,6 +37,7 @@ import model.config.FileLineRecord; import model.config.GlobalVar; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import util.DbUtil; @@ -65,10 +66,9 @@ public abstract class BaseExecutor { private static final Logger logger = LoggerFactory.getLogger(BaseExecutor.class); - - private final DataSourceConfig dataSourceConfig; protected final DataSource dataSource; protected final BaseOperateCommand command; + private final DataSourceConfig dataSourceConfig; public BaseExecutor(DataSourceConfig dataSourceConfig, DataSource dataSource, @@ -78,24 +78,6 @@ public BaseExecutor(DataSourceConfig dataSourceConfig, this.command = baseCommand; } - public void preCheck() { - - } - - protected void checkTableExists(List tableNames) { - for (String tableName : tableNames) { - try (Connection connection = dataSource.getConnection()) { - if (!DbUtil.checkTableExists(connection, tableName)) { - throw new RuntimeException(String.format("Table [%s] does not exist", tableName)); - } - } catch (SQLException | DatabaseException e) { - throw new RuntimeException(e.getMessage()); - } - } - } - - public abstract void execute(); - public static BaseExecutor getExecutor(BaseOperateCommand command, DataSourceConfig dataSourceConfig, DruidDataSource druid) { if (command instanceof ExportCommand) { @@ -124,6 +106,24 @@ private static BaseExecutor getExportExecutor(DataSourceConfig dataSourceConfig, return new SingleThreadExportExecutor(dataSourceConfig, druid, command); } + public void preCheck() { + + } + + protected void checkTableExists(List tableNames) { + for (String tableName : tableNames) { + try (Connection connection = dataSource.getConnection()) { + if (!DbUtil.checkTableExists(connection, tableName)) { + throw new RuntimeException(String.format("Table [%s] does not exist", tableName)); + } + } catch (SQLException | DatabaseException e) { + throw new RuntimeException(e.getMessage()); + } + } + } + + public abstract void execute(); + /** * 对生产者、消费者的上下文进行通用配置 * 并开始执行任务 等待结束 @@ -134,13 +134,14 @@ protected void configureCommonContextAndRun(Class cla String tableName, boolean usingBlockReader) { List fileLineRecordList = - getFileRecordList(producerExecutionContext.getDataFileLineRecordList(), tableName); + getFileRecordList(producerExecutionContext.getDataFileLineRecordList(), tableName, + producerExecutionContext.getFilenamePrefix()); if (CollectionUtils.isEmpty(fileLineRecordList)) { if (command.isDbOperation()) { logger.warn("Skip table {} operation since no filename matches", tableName); return; } - throw new IllegalArgumentException("No filename with suffix starts with table name: " + tableName); + throw new IllegalArgumentException("No filename matches table name with BatchTool format: " + tableName); } String producerType; if (!usingBlockReader) { @@ -172,9 +173,9 @@ protected void configureCommonContextAndRun(Class cla consumerExecutionContext.setBatchTpsLimitPerConsumer((double) consumerExecutionContext.getTpsLimit() / (consumerNum * GlobalVar.EMIT_BATCH_SIZE)); - - ThreadPoolExecutor consumerThreadPool = MyThreadPool.createExecutorWithEnsure(clazz.getSimpleName() + "-consumer", - consumerNum); + ThreadPoolExecutor consumerThreadPool = + MyThreadPool.createExecutorWithEnsure(clazz.getSimpleName() + "-consumer", + consumerNum); EventFactory factory = BatchLineEvent::new; RingBuffer ringBuffer = MyWorkerPool.createRingBuffer(factory); @@ -244,7 +245,8 @@ protected int getConsumerNum(ConsumerExecutionContext consumerExecutionContext) /** * 获取当前导入表对应的文件路径 */ - private List getFileRecordList(List allFilePathList, String tableName) { + private static List getFileRecordList(List allFilePathList, String tableName, + String filenamePrefix) { if (allFilePathList == null || allFilePathList.isEmpty()) { throw new IllegalArgumentException("File path list cannot be empty"); } @@ -256,6 +258,13 @@ private List getFileRecordList(List allFilePathL List fileRecordList = allFilePathList.stream() .filter(fileRecord -> { String fileName = new File(fileRecord.getFilePath()).getName(); + + if (StringUtils.isNotBlank(filenamePrefix)) { + // 如果指定了文件名前缀,则匹配优先级最高 + return fileName.startsWith(filenamePrefix); + } + + // 按照 BatchTool 自身的导出命名规则进行匹配 if (!(fileName.length() >= tableName.length() + 2)) { return false; } diff --git a/batch-tool/src/main/java/model/ProducerExecutionContext.java b/batch-tool/src/main/java/model/ProducerExecutionContext.java index 6cb3ba1..67ed5e1 100644 --- a/batch-tool/src/main/java/model/ProducerExecutionContext.java +++ b/batch-tool/src/main/java/model/ProducerExecutionContext.java @@ -42,6 +42,8 @@ public class ProducerExecutionContext extends BaseConfig { private ThreadPoolExecutor producerExecutor; + private String filenamePrefix; + private List dataFileRecordList; private List ddlFileRecordList; @@ -96,6 +98,14 @@ public void setProducerExecutor(ThreadPoolExecutor producerExecutor) { this.producerExecutor = producerExecutor; } + public String getFilenamePrefix() { + return filenamePrefix; + } + + public void setFilenamePrefix(String filenamePrefix) { + this.filenamePrefix = filenamePrefix; + } + public List getDataFileLineRecordList() { return dataFileRecordList; } From b96554f3c7502d8dbbdb6faaefc40dceeb0e66f6 Mon Sep 17 00:00:00 2001 From: F-ca7 Date: Fri, 10 May 2024 15:18:14 +0800 Subject: [PATCH 3/4] to #56171689 add csv max multiline limit --- batch-tool/docs/usage-details.md | 12 ++++++++++++ batch-tool/src/main/java/model/config/GlobalVar.java | 5 +++++ .../main/java/worker/common/reader/CsvReader.java | 3 ++- 3 files changed, 19 insertions(+), 1 deletion(-) diff --git a/batch-tool/docs/usage-details.md b/batch-tool/docs/usage-details.md index a5b0361..ac8baa3 100644 --- a/batch-tool/docs/usage-details.md +++ b/batch-tool/docs/usage-details.md @@ -252,3 +252,15 @@ mask: >- **原因**:BatchTool 在 v1.4.0 以前的版本,在导出整库时会默认导出所有表和视图的数据,无法控制是否导出视图 **解决**:自v1.4.0开始,可指定参数`-withView true`来导出视图数据 + +14. 指定目录导入时报错:`No filename with suffix starts with table name...` 或者 `No filename matches table name...` + + **原因**:BatchTool 为了适配自身整库导出的文件名规则,要求文件名必须以表名开头,且文件名后缀满足一定规则(如分区表的分区下标); + 因此对于包含了自定义文件名导出的目录时,BatchTool 无法对,进而会报错 + + **解决**:自v1.4.1开始,导入时可指定参数`--prefix "${文件名前缀}"`来指定目录里与导入表匹配的文件名前缀 + +15. 导出 PolarDB-X 某张表的指定物理分片 + + **解决**:例如某张表有128个物理分片,想导出第0号分片至第63号分片; + 自v1.4.1开始,可指定参数`-part 0:63`来导出第0号分片至第63号分片 \ No newline at end of file diff --git a/batch-tool/src/main/java/model/config/GlobalVar.java b/batch-tool/src/main/java/model/config/GlobalVar.java index 21fc63d..62c3c1d 100644 --- a/batch-tool/src/main/java/model/config/GlobalVar.java +++ b/batch-tool/src/main/java/model/config/GlobalVar.java @@ -56,6 +56,11 @@ public class GlobalVar { */ public static int TPCH_UPDATE_INSERT_BATCH_NUM = 20; + /** + * max line count for single csv row + */ + public static int MAX_CSV_MULTI_LINE = 500; + public static void setTpchUpdateBatchSize(int batchSize) { if (batchSize >= BaseOrderLineUpdateGenerator.SCALE_BASE) { throw new IllegalArgumentException( diff --git a/batch-tool/src/main/java/worker/common/reader/CsvReader.java b/batch-tool/src/main/java/worker/common/reader/CsvReader.java index e78f477..cb1272f 100644 --- a/batch-tool/src/main/java/worker/common/reader/CsvReader.java +++ b/batch-tool/src/main/java/worker/common/reader/CsvReader.java @@ -24,6 +24,7 @@ import com.opencsv.exceptions.CsvValidationException; import model.ProducerExecutionContext; import model.config.ConfigConstant; +import model.config.GlobalVar; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import util.IOUtil; @@ -54,7 +55,7 @@ public CsvReader(ProducerExecutionContext context, try { this.reader = new CSVReaderBuilder(new InputStreamReader( new FileInputStream(fileList.get(fileIndex).getAbsolutePath()), context.getCharset())) - .withCSVParser(parser).build(); + .withCSVParser(parser).withMultilineLimit(GlobalVar.MAX_CSV_MULTI_LINE).build(); } catch (FileNotFoundException e) { throw new RuntimeException(e.getMessage()); } From f8115a1fd3f19c7eb3bc0dba0240ac2eaa23a65c Mon Sep 17 00:00:00 2001 From: F-ca7 Date: Fri, 24 May 2024 17:08:48 +0800 Subject: [PATCH 4/4] to #56171689 support retry when error occurs during importing --- batch-tool/docs/usage-details.md | 8 +++++++- .../src/main/java/cmd/ConfigArgOption.java | 2 +- .../worker/common/BaseDefaultConsumer.java | 18 +++++++++++++++++- 3 files changed, 25 insertions(+), 3 deletions(-) diff --git a/batch-tool/docs/usage-details.md b/batch-tool/docs/usage-details.md index ac8baa3..bc63c2e 100644 --- a/batch-tool/docs/usage-details.md +++ b/batch-tool/docs/usage-details.md @@ -263,4 +263,10 @@ mask: >- 15. 导出 PolarDB-X 某张表的指定物理分片 **解决**:例如某张表有128个物理分片,想导出第0号分片至第63号分片; - 自v1.4.1开始,可指定参数`-part 0:63`来导出第0号分片至第63号分片 \ No newline at end of file + 自v1.4.1开始,可指定参数`-part 0:63`来导出第0号分片至第63号分片 + +16. 导入时,由于一些数据库侧的偶发报错,希望能自动重试 + + **原因**:BatchTool 默认情况下,导入失败会直接退出,不会自动重试 + + **解决**:自v1.4.1开始,可指定参数`-maxError 10`来指定最大错误重试次数为10次,目前暂不支持根据错误码来进行重试 \ No newline at end of file diff --git a/batch-tool/src/main/java/cmd/ConfigArgOption.java b/batch-tool/src/main/java/cmd/ConfigArgOption.java index d40b1f4..27a2bba 100644 --- a/batch-tool/src/main/java/cmd/ConfigArgOption.java +++ b/batch-tool/src/main/java/cmd/ConfigArgOption.java @@ -100,7 +100,7 @@ public class ConfigArgOption { public static final ConfigArgOption ARG_SHORT_FILE_FORMAT = of("format", "fileFormat", "File format (default NONE).", "NONE | TXT | CSV | XLS | XLSX"); public static final ConfigArgOption ARG_SHORT_MAX_ERROR = - of("error", "maxError", "Max error count threshold, program exits when the limit is exceeded.", + of("maxError", "maxError", "Max error count threshold, program exits when the limit is exceeded.", "max error count"); public static final ConfigArgOption ARG_SHORT_MASK = of("mask", "mask", "Masking sensitive columns while exporting data.", "Json format config"); diff --git a/batch-tool/src/main/java/worker/common/BaseDefaultConsumer.java b/batch-tool/src/main/java/worker/common/BaseDefaultConsumer.java index 238cd74..a39e900 100644 --- a/batch-tool/src/main/java/worker/common/BaseDefaultConsumer.java +++ b/batch-tool/src/main/java/worker/common/BaseDefaultConsumer.java @@ -38,9 +38,11 @@ public abstract class BaseDefaultConsumer extends BaseWorkHandler { protected int estimateFieldCount = 16; protected final SqlStat sqlStat = new SqlStat(); + protected int maxRetry = 0; protected void initLocalVars() { super.initLocalVars(); + maxRetry = consumerContext.getMaxRetry(); } @Override @@ -107,7 +109,21 @@ protected void execSql(StringBuilder data) throws SQLException { stmt = conn.createStatement(); sql = getSql(data); long startTime = System.nanoTime(); - stmt.execute(sql); + for (int i = 0; i <= maxRetry; i++) { + try { + stmt.execute(sql); + break; + } catch (SQLException e) { + logger.error("Error executing SQL (retry count: {}): {}", + i, e.getMessage()); + // 如果达到最大重试次数,抛出异常 + if (i >= maxRetry) { + throw e; + } + // 暂不添加延迟逻辑 + } + } + long endTime = System.nanoTime(); sqlStat.addTimeNs(endTime - startTime); } catch (SQLException e) {