diff --git a/batch-tool/src/main/java/cmd/CommandUtil.java b/batch-tool/src/main/java/cmd/CommandUtil.java index 931a2c2..a358afc 100644 --- a/batch-tool/src/main/java/cmd/CommandUtil.java +++ b/batch-tool/src/main/java/cmd/CommandUtil.java @@ -509,11 +509,7 @@ private static void setDir(ConfigResult result, ExportConfig exportConfig) { } private static void setFilenamePrefix(ConfigResult result, ExportConfig exportConfig) { - if (result.hasOption(ARG_SHORT_PREFIX)) { - exportConfig.setFilenamePrefix(result.getOptionValue(ARG_SHORT_PREFIX)); - } else { - exportConfig.setFilenamePrefix(""); - } + exportConfig.setFilenamePrefix(getPrefix(result)); } private static void setOrderBy(ConfigResult result, ExportConfig exportConfig) { @@ -620,6 +616,7 @@ private static void configureProducerContext(ConfigResult result, if (producerExecutionContext.getDdlMode() != DdlMode.NO_DDL) { producerExecutionContext.setDdlFileLineRecordList(getDdlFileRecordList(result)); } + producerExecutionContext.setFilenamePrefix(getPrefix(result)); producerExecutionContext.setParallelism(getProducerParallelism(result)); producerExecutionContext.setReadBlockSizeInMb(getReadBlockSizeInMb(result)); producerExecutionContext.setWithHeader(getWithHeader(result)); @@ -706,6 +703,14 @@ private static int getProducerParallelism(ConfigResult result) { } } + private static String getPrefix(ConfigResult result) { + if (result.hasOption(ARG_SHORT_PREFIX)) { + return result.getOptionValue(ARG_SHORT_PREFIX); + } else { + return ""; + } + } + private static QuoteEncloseMode getQuoteEncloseMode(ConfigResult result) { if (result.hasOption(ARG_SHORT_QUOTE_ENCLOSE_MODE)) { return QuoteEncloseMode.parseMode(result.getOptionValue(ARG_SHORT_QUOTE_ENCLOSE_MODE)); diff --git a/batch-tool/src/main/java/exec/BaseExecutor.java b/batch-tool/src/main/java/exec/BaseExecutor.java index d6c380e..9b1cac8 100644 --- a/batch-tool/src/main/java/exec/BaseExecutor.java +++ b/batch-tool/src/main/java/exec/BaseExecutor.java @@ -37,6 +37,7 @@ import model.config.FileLineRecord; import model.config.GlobalVar; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import util.DbUtil; @@ -65,10 +66,9 @@ public abstract class BaseExecutor { private static final Logger logger = LoggerFactory.getLogger(BaseExecutor.class); - - private final DataSourceConfig dataSourceConfig; protected final DataSource dataSource; protected final BaseOperateCommand command; + private final DataSourceConfig dataSourceConfig; public BaseExecutor(DataSourceConfig dataSourceConfig, DataSource dataSource, @@ -78,24 +78,6 @@ public BaseExecutor(DataSourceConfig dataSourceConfig, this.command = baseCommand; } - public void preCheck() { - - } - - protected void checkTableExists(List tableNames) { - for (String tableName : tableNames) { - try (Connection connection = dataSource.getConnection()) { - if (!DbUtil.checkTableExists(connection, tableName)) { - throw new RuntimeException(String.format("Table [%s] does not exist", tableName)); - } - } catch (SQLException | DatabaseException e) { - throw new RuntimeException(e.getMessage()); - } - } - } - - public abstract void execute(); - public static BaseExecutor getExecutor(BaseOperateCommand command, DataSourceConfig dataSourceConfig, DruidDataSource druid) { if (command instanceof ExportCommand) { @@ -124,6 +106,24 @@ private static BaseExecutor getExportExecutor(DataSourceConfig dataSourceConfig, return new SingleThreadExportExecutor(dataSourceConfig, druid, command); } + public void preCheck() { + + } + + protected void checkTableExists(List tableNames) { + for (String tableName : tableNames) { + try (Connection connection = dataSource.getConnection()) { + if (!DbUtil.checkTableExists(connection, tableName)) { + throw new RuntimeException(String.format("Table [%s] does not exist", tableName)); + } + } catch (SQLException | DatabaseException e) { + throw new RuntimeException(e.getMessage()); + } + } + } + + public abstract void execute(); + /** * 对生产者、消费者的上下文进行通用配置 * 并开始执行任务 等待结束 @@ -134,13 +134,14 @@ protected void configureCommonContextAndRun(Class cla String tableName, boolean usingBlockReader) { List fileLineRecordList = - getFileRecordList(producerExecutionContext.getDataFileLineRecordList(), tableName); + getFileRecordList(producerExecutionContext.getDataFileLineRecordList(), tableName, + producerExecutionContext.getFilenamePrefix()); if (CollectionUtils.isEmpty(fileLineRecordList)) { if (command.isDbOperation()) { logger.warn("Skip table {} operation since no filename matches", tableName); return; } - throw new IllegalArgumentException("No filename with suffix starts with table name: " + tableName); + throw new IllegalArgumentException("No filename matches table name with BatchTool format: " + tableName); } String producerType; if (!usingBlockReader) { @@ -172,9 +173,9 @@ protected void configureCommonContextAndRun(Class cla consumerExecutionContext.setBatchTpsLimitPerConsumer((double) consumerExecutionContext.getTpsLimit() / (consumerNum * GlobalVar.EMIT_BATCH_SIZE)); - - ThreadPoolExecutor consumerThreadPool = MyThreadPool.createExecutorWithEnsure(clazz.getSimpleName() + "-consumer", - consumerNum); + ThreadPoolExecutor consumerThreadPool = + MyThreadPool.createExecutorWithEnsure(clazz.getSimpleName() + "-consumer", + consumerNum); EventFactory factory = BatchLineEvent::new; RingBuffer ringBuffer = MyWorkerPool.createRingBuffer(factory); @@ -244,7 +245,8 @@ protected int getConsumerNum(ConsumerExecutionContext consumerExecutionContext) /** * 获取当前导入表对应的文件路径 */ - private List getFileRecordList(List allFilePathList, String tableName) { + private static List getFileRecordList(List allFilePathList, String tableName, + String filenamePrefix) { if (allFilePathList == null || allFilePathList.isEmpty()) { throw new IllegalArgumentException("File path list cannot be empty"); } @@ -256,6 +258,13 @@ private List getFileRecordList(List allFilePathL List fileRecordList = allFilePathList.stream() .filter(fileRecord -> { String fileName = new File(fileRecord.getFilePath()).getName(); + + if (StringUtils.isNotBlank(filenamePrefix)) { + // 如果指定了文件名前缀,则匹配优先级最高 + return fileName.startsWith(filenamePrefix); + } + + // 按照 BatchTool 自身的导出命名规则进行匹配 if (!(fileName.length() >= tableName.length() + 2)) { return false; } diff --git a/batch-tool/src/main/java/model/ProducerExecutionContext.java b/batch-tool/src/main/java/model/ProducerExecutionContext.java index 6cb3ba1..67ed5e1 100644 --- a/batch-tool/src/main/java/model/ProducerExecutionContext.java +++ b/batch-tool/src/main/java/model/ProducerExecutionContext.java @@ -42,6 +42,8 @@ public class ProducerExecutionContext extends BaseConfig { private ThreadPoolExecutor producerExecutor; + private String filenamePrefix; + private List dataFileRecordList; private List ddlFileRecordList; @@ -96,6 +98,14 @@ public void setProducerExecutor(ThreadPoolExecutor producerExecutor) { this.producerExecutor = producerExecutor; } + public String getFilenamePrefix() { + return filenamePrefix; + } + + public void setFilenamePrefix(String filenamePrefix) { + this.filenamePrefix = filenamePrefix; + } + public List getDataFileLineRecordList() { return dataFileRecordList; }