Skip to content

Commit

Permalink
Merge pull request #25 from F-ca7/feature/v1_4_1
Browse files Browse the repository at this point in the history
Feature/v1_4_1
  • Loading branch information
F-ca7 authored May 24, 2024
2 parents 6feabb0 + f8115a1 commit 17b80cf
Show file tree
Hide file tree
Showing 12 changed files with 203 additions and 74 deletions.
18 changes: 18 additions & 0 deletions batch-tool/docs/usage-details.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,3 +252,21 @@ mask: >-
**原因**:BatchTool 在 v1.4.0 以前的版本,在导出整库时会默认导出所有表和视图的数据,无法控制是否导出视图

**解决**:自v1.4.0开始,可指定参数`-withView true`来导出视图数据

14. 指定目录导入时报错:`No filename with suffix starts with table name...` 或者 `No filename matches table name...`

**原因**:BatchTool 为了适配自身整库导出的文件名规则,要求文件名必须以表名开头,且文件名后缀满足一定规则(如分区表的分区下标);
因此对于包含了自定义文件名的导出目录,BatchTool 无法对文件名进行匹配,进而会报错

**解决**:自v1.4.1开始,导入时可指定参数`--prefix "${文件名前缀}"`来指定目录里与导入表匹配的文件名前缀

15. 导出 PolarDB-X 某张表的指定物理分片

**解决**:例如某张表有128个物理分片,想导出第0号分片至第63号分片;
自v1.4.1开始,可指定参数`-part 0:63`来导出第0号分片至第63号分片

16. 导入时,由于一些数据库侧的偶发报错,希望能自动重试

**原因**:BatchTool 默认情况下,导入失败会直接退出,不会自动重试

**解决**:自v1.4.1开始,可指定参数`-maxError 10`来指定最大错误重试次数为10次,目前暂不支持根据错误码来进行重试
2 changes: 1 addition & 1 deletion batch-tool/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.alibaba.polardbx</groupId>
<artifactId>batch-tool</artifactId>
<version>1.4.0</version>
<version>1.4.1</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down
44 changes: 39 additions & 5 deletions batch-tool/src/main/java/cmd/CommandUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
import static cmd.ConfigArgOption.ARG_SHORT_VERSION;
import static cmd.ConfigArgOption.ARG_SHORT_WHERE;
import static cmd.ConfigArgOption.ARG_SHORT_WITH_DDL;
import static cmd.ConfigArgOption.ARG_TBL_PART;
import static cmd.FlagOption.ARG_DROP_TABLE_IF_EXISTS;
import static cmd.FlagOption.ARG_EMPTY_AS_NULL;
import static cmd.FlagOption.ARG_SHORT_ENABLE_SHARDING;
Expand Down Expand Up @@ -462,10 +463,38 @@ private static ExportCommand parseExportCommand(ConfigResult result) {
setFileLine(result, exportConfig);
setOrderBy(result, exportConfig);
setColumnMaskerMap(result, exportConfig);
setPartitions(result, exportConfig);
exportConfig.validate();
return new ExportCommand(getDbName(result), tableNames, exportConfig);
}

/**
 * Parses the {@code -part start:end} option and applies the physical
 * partition range to the export config. Both bounds are 0-based and
 * inclusive (e.g. {@code -part 0:63} exports partitions 0 through 63).
 * Does nothing when the option is absent.
 *
 * @param result       parsed command-line options
 * @param exportConfig export config receiving the start/end partition
 * @throws IllegalArgumentException if the option value is malformed,
 *                                  non-numeric, negative, or end &lt; start
 */
private static void setPartitions(ConfigResult result, ExportConfig exportConfig) {
    if (!result.hasOption(ARG_TBL_PART)) {
        return;
    }
    String partOpt = result.getOptionValue(ARG_TBL_PART);
    if (StringUtils.isNotEmpty(partOpt)) {
        String[] parts = StringUtils.split(partOpt, ":");
        if (parts.length == 2) {
            final int startPart;
            final int endPart;
            try {
                startPart = Integer.parseInt(parts[0]);
                endPart = Integer.parseInt(parts[1]);
            } catch (NumberFormatException e) {
                // surface the raw option value instead of a bare NumberFormatException
                throw new IllegalArgumentException("Illegal table part option: " + partOpt, e);
            }
            if (startPart < 0) {
                throw new IllegalArgumentException("Illegal start partition: " + startPart);
            }
            if (endPart < 0) {
                throw new IllegalArgumentException("Illegal end partition: " + endPart);
            }
            if (endPart < startPart) {
                // equality is allowed (a single partition), so the message says "no less than"
                throw new IllegalArgumentException(
                    "End partition should be no less than start partition: " + startPart + ":" + endPart);
            }
            exportConfig.setStartPart(startPart);
            exportConfig.setEndPart(endPart);
            return;
        }
    }
    throw new IllegalArgumentException("Illegal table part option: " + partOpt);
}

private static void setDir(ConfigResult result, ExportConfig exportConfig) {
if (result.hasOption(ARG_SHORT_DIRECTORY)) {
String dirPath = result.getOptionValue(ARG_SHORT_DIRECTORY);
Expand All @@ -480,11 +509,7 @@ private static void setDir(ConfigResult result, ExportConfig exportConfig) {
}

private static void setFilenamePrefix(ConfigResult result, ExportConfig exportConfig) {
if (result.hasOption(ARG_SHORT_PREFIX)) {
exportConfig.setFilenamePrefix(result.getOptionValue(ARG_SHORT_PREFIX));
} else {
exportConfig.setFilenamePrefix("");
}
exportConfig.setFilenamePrefix(getPrefix(result));
}

private static void setOrderBy(ConfigResult result, ExportConfig exportConfig) {
Expand Down Expand Up @@ -591,6 +616,7 @@ private static void configureProducerContext(ConfigResult result,
if (producerExecutionContext.getDdlMode() != DdlMode.NO_DDL) {
producerExecutionContext.setDdlFileLineRecordList(getDdlFileRecordList(result));
}
producerExecutionContext.setFilenamePrefix(getPrefix(result));
producerExecutionContext.setParallelism(getProducerParallelism(result));
producerExecutionContext.setReadBlockSizeInMb(getReadBlockSizeInMb(result));
producerExecutionContext.setWithHeader(getWithHeader(result));
Expand Down Expand Up @@ -677,6 +703,14 @@ private static int getProducerParallelism(ConfigResult result) {
}
}

/**
 * Returns the filename prefix supplied via the {@code -prefix} option,
 * or an empty string when the option was not provided.
 *
 * @param result parsed command-line options
 * @return the user-specified prefix, never {@code null}
 */
private static String getPrefix(ConfigResult result) {
    return result.hasOption(ARG_SHORT_PREFIX)
        ? result.getOptionValue(ARG_SHORT_PREFIX) : "";
}

private static QuoteEncloseMode getQuoteEncloseMode(ConfigResult result) {
if (result.hasOption(ARG_SHORT_QUOTE_ENCLOSE_MODE)) {
return QuoteEncloseMode.parseMode(result.getOptionValue(ARG_SHORT_QUOTE_ENCLOSE_MODE));
Expand Down
4 changes: 3 additions & 1 deletion batch-tool/src/main/java/cmd/ConfigArgOption.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public class ConfigArgOption {
public static final ConfigArgOption ARG_SHORT_FILE_FORMAT =
of("format", "fileFormat", "File format (default NONE).", "NONE | TXT | CSV | XLS | XLSX");
public static final ConfigArgOption ARG_SHORT_MAX_ERROR =
of("error", "maxError", "Max error count threshold, program exits when the limit is exceeded.",
of("maxError", "maxError", "Max error count threshold, program exits when the limit is exceeded.",
"max error count");
public static final ConfigArgOption ARG_SHORT_MASK =
of("mask", "mask", "Masking sensitive columns while exporting data.", "Json format config");
Expand All @@ -112,6 +112,8 @@ public class ConfigArgOption {
of("benchmark", "benchmark", "Fast loading benchmark data (dafault NONE).", "NONE | TPCH");
public static final ConfigArgOption ARG_SHORT_SCALE =
of("scale", "scale", "The size scale benchmark data (GB for tpch).", "size");
public static final ConfigArgOption ARG_TBL_PART =
of("part", "tblPart", "Partitions of the target tables, starting from 0 (both inclusive).", "start:end");

protected final String argShort;
protected final String argLong;
Expand Down
61 changes: 35 additions & 26 deletions batch-tool/src/main/java/exec/BaseExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import model.config.FileLineRecord;
import model.config.GlobalVar;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import util.DbUtil;
Expand Down Expand Up @@ -65,10 +66,9 @@

public abstract class BaseExecutor {
private static final Logger logger = LoggerFactory.getLogger(BaseExecutor.class);

private final DataSourceConfig dataSourceConfig;
protected final DataSource dataSource;
protected final BaseOperateCommand command;
private final DataSourceConfig dataSourceConfig;

public BaseExecutor(DataSourceConfig dataSourceConfig,
DataSource dataSource,
Expand All @@ -78,24 +78,6 @@ public BaseExecutor(DataSourceConfig dataSourceConfig,
this.command = baseCommand;
}

public void preCheck() {

}

protected void checkTableExists(List<String> tableNames) {
for (String tableName : tableNames) {
try (Connection connection = dataSource.getConnection()) {
if (!DbUtil.checkTableExists(connection, tableName)) {
throw new RuntimeException(String.format("Table [%s] does not exist", tableName));
}
} catch (SQLException | DatabaseException e) {
throw new RuntimeException(e.getMessage());
}
}
}

public abstract void execute();

public static BaseExecutor getExecutor(BaseOperateCommand command, DataSourceConfig dataSourceConfig,
DruidDataSource druid) {
if (command instanceof ExportCommand) {
Expand Down Expand Up @@ -124,6 +106,24 @@ private static BaseExecutor getExportExecutor(DataSourceConfig dataSourceConfig,
return new SingleThreadExportExecutor(dataSourceConfig, druid, command);
}

/**
 * Hook invoked before {@link #execute()}; no-op by default.
 * Subclasses override this to validate preconditions (e.g. table existence).
 */
public void preCheck() {

}

/**
 * Verifies that every table in the list exists in the target database,
 * opening a fresh connection per table.
 *
 * @param tableNames tables to verify
 * @throws RuntimeException if a table is missing, or if the existence
 *                          check itself fails (original exception chained as cause)
 */
protected void checkTableExists(List<String> tableNames) {
    for (String tableName : tableNames) {
        try (Connection connection = dataSource.getConnection()) {
            if (!DbUtil.checkTableExists(connection, tableName)) {
                throw new RuntimeException(String.format("Table [%s] does not exist", tableName));
            }
        } catch (SQLException | DatabaseException e) {
            // chain the cause so the underlying stack trace is not lost
            throw new RuntimeException(e.getMessage(), e);
        }
    }
}

public abstract void execute();

/**
* 对生产者、消费者的上下文进行通用配置
* 并开始执行任务 等待结束
Expand All @@ -134,13 +134,14 @@ protected void configureCommonContextAndRun(Class<? extends BaseWorkHandler> cla
String tableName,
boolean usingBlockReader) {
List<FileLineRecord> fileLineRecordList =
getFileRecordList(producerExecutionContext.getDataFileLineRecordList(), tableName);
getFileRecordList(producerExecutionContext.getDataFileLineRecordList(), tableName,
producerExecutionContext.getFilenamePrefix());
if (CollectionUtils.isEmpty(fileLineRecordList)) {
if (command.isDbOperation()) {
logger.warn("Skip table {} operation since no filename matches", tableName);
return;
}
throw new IllegalArgumentException("No filename with suffix starts with table name: " + tableName);
throw new IllegalArgumentException("No filename matches table name with BatchTool format: " + tableName);
}
String producerType;
if (!usingBlockReader) {
Expand Down Expand Up @@ -172,9 +173,9 @@ protected void configureCommonContextAndRun(Class<? extends BaseWorkHandler> cla
consumerExecutionContext.setBatchTpsLimitPerConsumer((double) consumerExecutionContext.getTpsLimit()
/ (consumerNum * GlobalVar.EMIT_BATCH_SIZE));


ThreadPoolExecutor consumerThreadPool = MyThreadPool.createExecutorWithEnsure(clazz.getSimpleName() + "-consumer",
consumerNum);
ThreadPoolExecutor consumerThreadPool =
MyThreadPool.createExecutorWithEnsure(clazz.getSimpleName() + "-consumer",
consumerNum);
EventFactory<BatchLineEvent> factory = BatchLineEvent::new;
RingBuffer<BatchLineEvent> ringBuffer = MyWorkerPool.createRingBuffer(factory);

Expand Down Expand Up @@ -244,7 +245,8 @@ protected int getConsumerNum(ConsumerExecutionContext consumerExecutionContext)
/**
* 获取当前导入表对应的文件路径
*/
private List<FileLineRecord> getFileRecordList(List<FileLineRecord> allFilePathList, String tableName) {
private static List<FileLineRecord> getFileRecordList(List<FileLineRecord> allFilePathList, String tableName,
String filenamePrefix) {
if (allFilePathList == null || allFilePathList.isEmpty()) {
throw new IllegalArgumentException("File path list cannot be empty");
}
Expand All @@ -256,6 +258,13 @@ private List<FileLineRecord> getFileRecordList(List<FileLineRecord> allFilePathL
List<FileLineRecord> fileRecordList = allFilePathList.stream()
.filter(fileRecord -> {
String fileName = new File(fileRecord.getFilePath()).getName();

if (StringUtils.isNotBlank(filenamePrefix)) {
// 如果指定了文件名前缀,则匹配优先级最高
return fileName.startsWith(filenamePrefix);
}

// 按照 BatchTool 自身的导出命名规则进行匹配
if (!(fileName.length() >= tableName.length() + 2)) {
return false;
}
Expand Down
21 changes: 20 additions & 1 deletion batch-tool/src/main/java/exec/export/ShardingExportExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
Expand Down Expand Up @@ -87,6 +88,23 @@ private void doExportWithSharding(String tableName) {
throw new RuntimeException(e);
}

int startOffset = 0;
if (config.getStartPart() >= 0 && config.getEndPart() >= 0) {
int startPart = config.getStartPart();
int endPart = config.getEndPart();
if (endPart >= topologyList.size()) {
throw new IllegalArgumentException(
String.format("EndPart %d is larger than topology size %d", endPart, topologyList.size()));
}
startOffset = startPart;
// topologyList is in order
List<TableTopology> newTopologyList = new ArrayList<>(topologyList.subList(startPart, endPart + 1));
topologyList = newTopologyList;
logger.info("Exporting partitions {}~{} of table {}", startPart, endPart, tableName);
} else {
logger.info("Exporting all partitions of table {}, total part count: {}", tableName, topologyList.size());
}

try (Connection connection = dataSource.getConnection()) {
boolean isBroadCast = DbUtil.isBroadCast(connection, tableName);
if (isBroadCast) {
Expand All @@ -109,9 +127,10 @@ private void doExportWithSharding(String tableName) {
case MAX_LINE_NUM_IN_SINGLE_FILE:
case DEFAULT:
for (int i = 0; i < shardSize; i++) {
int suffix = startOffset + i;
directExportWorker = ExportWorkerFactory.buildDefaultDirectExportWorker(dataSource,
topologyList.get(i), tableFieldMetaInfo,
filePathPrefix + i, config);
filePathPrefix + suffix, config);
directExportWorker.setCountDownLatch(countDownLatch);
directExportWorker.setPermitted(permitted);
executor.submit(directExportWorker);
Expand Down
10 changes: 10 additions & 0 deletions batch-tool/src/main/java/model/ProducerExecutionContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class ProducerExecutionContext extends BaseConfig {

private ThreadPoolExecutor producerExecutor;

private String filenamePrefix;

private List<FileLineRecord> dataFileRecordList;

private List<FileLineRecord> ddlFileRecordList;
Expand Down Expand Up @@ -96,6 +98,14 @@ public void setProducerExecutor(ThreadPoolExecutor producerExecutor) {
this.producerExecutor = producerExecutor;
}

/**
 * Returns the filename prefix used to match import files;
 * may be {@code null} when never set.
 */
public String getFilenamePrefix() {
return filenamePrefix;
}

/**
 * Sets the filename prefix used to match import files
 * (typically from the {@code -prefix} command-line option).
 */
public void setFilenamePrefix(String filenamePrefix) {
this.filenamePrefix = filenamePrefix;
}

public List<FileLineRecord> getDataFileLineRecordList() {
return dataFileRecordList;
}
Expand Down
Loading

0 comments on commit 17b80cf

Please sign in to comment.