Skip to content

Commit

Permalink
to #56171689 support import with filename prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
F-ca7 committed Apr 18, 2024
1 parent b6b4041 commit 1b4388b
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 31 deletions.
15 changes: 10 additions & 5 deletions batch-tool/src/main/java/cmd/CommandUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -509,11 +509,7 @@ private static void setDir(ConfigResult result, ExportConfig exportConfig) {
}

/**
 * Copies the filename prefix from the parsed options into the export config.
 *
 * <p>The lookup is delegated to {@code getPrefix}, which returns the value of
 * {@code ARG_SHORT_PREFIX} when that option is present and an empty string
 * otherwise, so the prefix is assigned exactly once here.
 *
 * @param result       parsed options to read the prefix from
 * @param exportConfig export configuration that receives the prefix
 */
private static void setFilenamePrefix(ConfigResult result, ExportConfig exportConfig) {
    exportConfig.setFilenamePrefix(getPrefix(result));
}

private static void setOrderBy(ConfigResult result, ExportConfig exportConfig) {
Expand Down Expand Up @@ -620,6 +616,7 @@ private static void configureProducerContext(ConfigResult result,
if (producerExecutionContext.getDdlMode() != DdlMode.NO_DDL) {
producerExecutionContext.setDdlFileLineRecordList(getDdlFileRecordList(result));
}
producerExecutionContext.setFilenamePrefix(getPrefix(result));
producerExecutionContext.setParallelism(getProducerParallelism(result));
producerExecutionContext.setReadBlockSizeInMb(getReadBlockSizeInMb(result));
producerExecutionContext.setWithHeader(getWithHeader(result));
Expand Down Expand Up @@ -706,6 +703,14 @@ private static int getProducerParallelism(ConfigResult result) {
}
}

/**
 * Resolves the filename prefix from the parsed options.
 *
 * @param result parsed options to inspect
 * @return the value supplied for {@code ARG_SHORT_PREFIX} when that option is
 *         present; otherwise an empty string
 */
private static String getPrefix(ConfigResult result) {
    return result.hasOption(ARG_SHORT_PREFIX)
        ? result.getOptionValue(ARG_SHORT_PREFIX)
        : "";
}

private static QuoteEncloseMode getQuoteEncloseMode(ConfigResult result) {
if (result.hasOption(ARG_SHORT_QUOTE_ENCLOSE_MODE)) {
return QuoteEncloseMode.parseMode(result.getOptionValue(ARG_SHORT_QUOTE_ENCLOSE_MODE));
Expand Down
61 changes: 35 additions & 26 deletions batch-tool/src/main/java/exec/BaseExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import model.config.FileLineRecord;
import model.config.GlobalVar;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import util.DbUtil;
Expand Down Expand Up @@ -65,10 +66,9 @@

public abstract class BaseExecutor {
private static final Logger logger = LoggerFactory.getLogger(BaseExecutor.class);

private final DataSourceConfig dataSourceConfig;
protected final DataSource dataSource;
protected final BaseOperateCommand command;
private final DataSourceConfig dataSourceConfig;

public BaseExecutor(DataSourceConfig dataSourceConfig,
DataSource dataSource,
Expand All @@ -78,24 +78,6 @@ public BaseExecutor(DataSourceConfig dataSourceConfig,
this.command = baseCommand;
}

/**
 * Check hook with an empty default implementation.
 *
 * <p>Being public and non-final, it can be overridden by subclasses.
 * NOTE(review): presumably invoked before {@code execute()} — confirm against callers.
 */
public void preCheck() {
    // Intentionally empty: the base class performs no check.

}

/**
 * Verifies that every table named in {@code tableNames} exists.
 *
 * <p>A connection is obtained from {@code dataSource} for each table and is
 * closed again by the try-with-resources statement before the next table is
 * checked. The check stops at the first missing table.
 *
 * @param tableNames names of the tables to look up
 * @throws RuntimeException if a table does not exist, or if the lookup fails
 *                          with a {@code SQLException} or {@code DatabaseException}
 *                          (which is then attached as the cause)
 */
protected void checkTableExists(List<String> tableNames) {
    for (String tableName : tableNames) {
        try (Connection connection = dataSource.getConnection()) {
            if (!DbUtil.checkTableExists(connection, tableName)) {
                throw new RuntimeException(String.format("Table [%s] does not exist", tableName));
            }
        } catch (SQLException | DatabaseException e) {
            // Keep the original exception as the cause so its stack trace is not lost.
            throw new RuntimeException(e.getMessage(), e);
        }
    }
}

/**
 * Runs this executor. Abstract: each concrete subclass supplies the implementation.
 */
public abstract void execute();

public static BaseExecutor getExecutor(BaseOperateCommand command, DataSourceConfig dataSourceConfig,
DruidDataSource druid) {
if (command instanceof ExportCommand) {
Expand Down Expand Up @@ -124,6 +106,24 @@ private static BaseExecutor getExportExecutor(DataSourceConfig dataSourceConfig,
return new SingleThreadExportExecutor(dataSourceConfig, druid, command);
}

/**
 * Check hook with an empty default implementation.
 *
 * <p>Being public and non-final, it can be overridden by subclasses.
 * NOTE(review): presumably invoked before {@code execute()} — confirm against callers.
 */
public void preCheck() {
    // Intentionally empty: the base class performs no check.

}

/**
 * Verifies that every table named in {@code tableNames} exists.
 *
 * <p>A connection is obtained from {@code dataSource} for each table and is
 * closed again by the try-with-resources statement before the next table is
 * checked. The check stops at the first missing table.
 *
 * @param tableNames names of the tables to look up
 * @throws RuntimeException if a table does not exist, or if the lookup fails
 *                          with a {@code SQLException} or {@code DatabaseException}
 *                          (which is then attached as the cause)
 */
protected void checkTableExists(List<String> tableNames) {
    for (String tableName : tableNames) {
        try (Connection connection = dataSource.getConnection()) {
            if (!DbUtil.checkTableExists(connection, tableName)) {
                throw new RuntimeException(String.format("Table [%s] does not exist", tableName));
            }
        } catch (SQLException | DatabaseException e) {
            // Keep the original exception as the cause so its stack trace is not lost.
            throw new RuntimeException(e.getMessage(), e);
        }
    }
}

/**
 * Runs this executor. Abstract: each concrete subclass supplies the implementation.
 */
public abstract void execute();

/**
* 对生产者、消费者的上下文进行通用配置
* 并开始执行任务 等待结束
Expand All @@ -134,13 +134,14 @@ protected void configureCommonContextAndRun(Class<? extends BaseWorkHandler> cla
String tableName,
boolean usingBlockReader) {
List<FileLineRecord> fileLineRecordList =
getFileRecordList(producerExecutionContext.getDataFileLineRecordList(), tableName);
getFileRecordList(producerExecutionContext.getDataFileLineRecordList(), tableName,
producerExecutionContext.getFilenamePrefix());
if (CollectionUtils.isEmpty(fileLineRecordList)) {
if (command.isDbOperation()) {
logger.warn("Skip table {} operation since no filename matches", tableName);
return;
}
throw new IllegalArgumentException("No filename with suffix starts with table name: " + tableName);
throw new IllegalArgumentException("No filename matches table name with BatchTool format: " + tableName);
}
String producerType;
if (!usingBlockReader) {
Expand Down Expand Up @@ -172,9 +173,9 @@ protected void configureCommonContextAndRun(Class<? extends BaseWorkHandler> cla
consumerExecutionContext.setBatchTpsLimitPerConsumer((double) consumerExecutionContext.getTpsLimit()
/ (consumerNum * GlobalVar.EMIT_BATCH_SIZE));


ThreadPoolExecutor consumerThreadPool = MyThreadPool.createExecutorWithEnsure(clazz.getSimpleName() + "-consumer",
consumerNum);
ThreadPoolExecutor consumerThreadPool =
MyThreadPool.createExecutorWithEnsure(clazz.getSimpleName() + "-consumer",
consumerNum);
EventFactory<BatchLineEvent> factory = BatchLineEvent::new;
RingBuffer<BatchLineEvent> ringBuffer = MyWorkerPool.createRingBuffer(factory);

Expand Down Expand Up @@ -244,7 +245,8 @@ protected int getConsumerNum(ConsumerExecutionContext consumerExecutionContext)
/**
* 获取当前导入表对应的文件路径
*/
private List<FileLineRecord> getFileRecordList(List<FileLineRecord> allFilePathList, String tableName) {
private static List<FileLineRecord> getFileRecordList(List<FileLineRecord> allFilePathList, String tableName,
String filenamePrefix) {
if (allFilePathList == null || allFilePathList.isEmpty()) {
throw new IllegalArgumentException("File path list cannot be empty");
}
Expand All @@ -256,6 +258,13 @@ private List<FileLineRecord> getFileRecordList(List<FileLineRecord> allFilePathL
List<FileLineRecord> fileRecordList = allFilePathList.stream()
.filter(fileRecord -> {
String fileName = new File(fileRecord.getFilePath()).getName();

if (StringUtils.isNotBlank(filenamePrefix)) {
// 如果指定了文件名前缀,则匹配优先级最高
return fileName.startsWith(filenamePrefix);
}

// 按照 BatchTool 自身的导出命名规则进行匹配
if (!(fileName.length() >= tableName.length() + 2)) {
return false;
}
Expand Down
10 changes: 10 additions & 0 deletions batch-tool/src/main/java/model/ProducerExecutionContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class ProducerExecutionContext extends BaseConfig {

private ThreadPoolExecutor producerExecutor;

private String filenamePrefix;

private List<FileLineRecord> dataFileRecordList;

private List<FileLineRecord> ddlFileRecordList;
Expand Down Expand Up @@ -96,6 +98,14 @@ public void setProducerExecutor(ThreadPoolExecutor producerExecutor) {
this.producerExecutor = producerExecutor;
}

/**
 * Returns the filename prefix currently stored in this context.
 *
 * @return the stored prefix; the field has no initializer, so this may be
 *         {@code null} if it was never assigned
 */
public String getFilenamePrefix() {
    return this.filenamePrefix;
}

/**
 * Stores the filename prefix in this context.
 *
 * @param filenamePrefix prefix to keep; stored as given, without validation
 */
public void setFilenamePrefix(final String filenamePrefix) {
    this.filenamePrefix = filenamePrefix;
}

/**
 * Returns the data file records held by this context.
 *
 * @return the {@code dataFileRecordList} field itself (no copy is made)
 */
public List<FileLineRecord> getDataFileLineRecordList() {
    return this.dataFileRecordList;
}
Expand Down

0 comments on commit 1b4388b

Please sign in to comment.