Skip to content

Commit

Permalink
to #56171689 support only export specified partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
F-ca7 committed Apr 17, 2024
1 parent 6feabb0 commit b6b4041
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 40 deletions.
2 changes: 1 addition & 1 deletion batch-tool/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.alibaba.polardbx</groupId>
<artifactId>batch-tool</artifactId>
<version>1.4.0</version>
<version>1.4.1</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down
29 changes: 29 additions & 0 deletions batch-tool/src/main/java/cmd/CommandUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
import static cmd.ConfigArgOption.ARG_SHORT_VERSION;
import static cmd.ConfigArgOption.ARG_SHORT_WHERE;
import static cmd.ConfigArgOption.ARG_SHORT_WITH_DDL;
import static cmd.ConfigArgOption.ARG_TBL_PART;
import static cmd.FlagOption.ARG_DROP_TABLE_IF_EXISTS;
import static cmd.FlagOption.ARG_EMPTY_AS_NULL;
import static cmd.FlagOption.ARG_SHORT_ENABLE_SHARDING;
Expand Down Expand Up @@ -462,10 +463,38 @@ private static ExportCommand parseExportCommand(ConfigResult result) {
setFileLine(result, exportConfig);
setOrderBy(result, exportConfig);
setColumnMaskerMap(result, exportConfig);
setPartitions(result, exportConfig);
exportConfig.validate();
return new ExportCommand(getDbName(result), tableNames, exportConfig);
}

private static void setPartitions(ConfigResult result, ExportConfig exportConfig) {
if (result.hasOption(ARG_TBL_PART)) {
String partOpt = result.getOptionValue(ARG_TBL_PART);
if (StringUtils.isNotEmpty(partOpt)) {
String[] parts = StringUtils.split(partOpt, ":");
if (parts.length == 2) {
int startPart = Integer.parseInt(parts[0]);
int endPart = Integer.parseInt(parts[1]);
if (startPart < 0) {
throw new IllegalArgumentException("Illegal start partition: " + startPart);
}
if (endPart < 0) {
throw new IllegalArgumentException("Illegal end partition: " + endPart);
}
if (endPart < startPart) {
throw new IllegalArgumentException(
"End partition should be greater than start partition: " + endPart);
}
exportConfig.setStartPart(startPart);
exportConfig.setEndPart(endPart);
return;
}
}
throw new IllegalArgumentException("Illegal table part option: " + partOpt);
}
}

private static void setDir(ConfigResult result, ExportConfig exportConfig) {
if (result.hasOption(ARG_SHORT_DIRECTORY)) {
String dirPath = result.getOptionValue(ARG_SHORT_DIRECTORY);
Expand Down
2 changes: 2 additions & 0 deletions batch-tool/src/main/java/cmd/ConfigArgOption.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ public class ConfigArgOption {
of("benchmark", "benchmark", "Fast loading benchmark data (dafault NONE).", "NONE | TPCH");
public static final ConfigArgOption ARG_SHORT_SCALE =
of("scale", "scale", "The size scale benchmark data (GB for tpch).", "size");
public static final ConfigArgOption ARG_TBL_PART =
of("part", "tblPart", "Partitions of the target tables, starting from 0 (both inclusive).", "start:end");

protected final String argShort;
protected final String argLong;
Expand Down
21 changes: 20 additions & 1 deletion batch-tool/src/main/java/exec/export/ShardingExportExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
Expand Down Expand Up @@ -87,6 +88,23 @@ private void doExportWithSharding(String tableName) {
throw new RuntimeException(e);
}

int startOffset = 0;
if (config.getStartPart() >= 0 && config.getEndPart() >= 0) {
int startPart = config.getStartPart();
int endPart = config.getEndPart();
if (endPart >= topologyList.size()) {
throw new IllegalArgumentException(
String.format("EndPart %d is larger than topology size %d", endPart, topologyList.size()));
}
startOffset = startPart;
// topologyList is in order
List<TableTopology> newTopologyList = new ArrayList<>(topologyList.subList(startPart, endPart + 1));
topologyList = newTopologyList;
logger.info("Exporting partitions {}~{} of table {}", startPart, endPart, tableName);
} else {
logger.info("Exporting all partitions of table {}, total part count: {}", tableName, topologyList.size());
}

try (Connection connection = dataSource.getConnection()) {
boolean isBroadCast = DbUtil.isBroadCast(connection, tableName);
if (isBroadCast) {
Expand All @@ -109,9 +127,10 @@ private void doExportWithSharding(String tableName) {
case MAX_LINE_NUM_IN_SINGLE_FILE:
case DEFAULT:
for (int i = 0; i < shardSize; i++) {
int suffix = startOffset + i;
directExportWorker = ExportWorkerFactory.buildDefaultDirectExportWorker(dataSource,
topologyList.get(i), tableFieldMetaInfo,
filePathPrefix + i, config);
filePathPrefix + suffix, config);
directExportWorker.setCountDownLatch(countDownLatch);
directExportWorker.setPermitted(permitted);
executor.submit(directExportWorker);
Expand Down
83 changes: 49 additions & 34 deletions batch-tool/src/main/java/model/config/BaseConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,36 +24,7 @@
*/
public class BaseConfig {

private static class FileMode {
static final byte COMPRESS_FLAG = 1;
static final byte ENCRYPTION_FLAG = 1 << 2;
static final byte FILE_FORMAT_FLAG = 1 << 3;

byte flag = 0;

void setCompress() {
this.flag |= COMPRESS_FLAG;
}

void setEncryption() {
this.flag |= ENCRYPTION_FLAG;
}

void setFileFormat() {
this.flag |= FILE_FORMAT_FLAG;
}

int bitCount() {
int count = 0;
byte n = flag;
while (n != 0) {
n &= n - 1;
count++;
}
return count;
}
}

private final FileMode fileMode = new FileMode();
/**
* 分隔符
*/
Expand Down Expand Up @@ -84,17 +55,31 @@ int bitCount() {
* 引号模式
*/
protected QuoteEncloseMode quoteEncloseMode;

private final FileMode fileMode = new FileMode();

private boolean isWithLastSep = false;

private boolean isWithView = false;
private int startPart = -1;
private int endPart = -1;

public BaseConfig(boolean shardingEnabled) {
this.shardingEnabled = shardingEnabled;
}

public int getStartPart() {
return startPart;
}

public void setStartPart(int startPart) {
this.startPart = startPart;
}

public int getEndPart() {
return endPart;
}

public void setEndPart(int endPart) {
this.endPart = endPart;
}

public String getSeparator() {
return separator;
}
Expand Down Expand Up @@ -231,4 +216,34 @@ public String toString() {
", encryptionConfig='" + encryptionConfig + '\'' +
'}';
}

private static class FileMode {
static final byte COMPRESS_FLAG = 1;
static final byte ENCRYPTION_FLAG = 1 << 2;
static final byte FILE_FORMAT_FLAG = 1 << 3;

byte flag = 0;

void setCompress() {
this.flag |= COMPRESS_FLAG;
}

void setEncryption() {
this.flag |= ENCRYPTION_FLAG;
}

void setFileFormat() {
this.flag |= FILE_FORMAT_FLAG;
}

int bitCount() {
int count = 0;
byte n = flag;
while (n != 0) {
n &= n - 1;
count++;
}
return count;
}
}
}
8 changes: 4 additions & 4 deletions batch-tool/src/main/java/worker/export/BaseExportWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,6 @@ protected void afterProduceData() {
* @param columnIdx 从 0 开始
*/
protected void writeFieldValue(ByteArrayOutputStream os, byte[] value, int columnIdx) throws IOException {
if (columnDataMaskerList != null && columnDataMaskerList.get(columnIdx) != null) {
value = columnDataMaskerList.get(columnIdx).doMask(value);
}
boolean isStringType = isStringTypeList.get(columnIdx);
switch (quoteEncloseMode) {
case NONE:
FileUtil.writeToByteArrayStream(os, value);
Expand All @@ -180,6 +176,10 @@ protected void writeFieldValue(ByteArrayOutputStream os, byte[] value, int colum
FileUtil.writeToByteArrayStreamWithQuote(os, value);
break;
case AUTO:
if (columnDataMaskerList != null && columnDataMaskerList.get(columnIdx) != null) {
value = columnDataMaskerList.get(columnIdx).doMask(value);
}
boolean isStringType = isStringTypeList.get(columnIdx);
if (!isStringType) {
FileUtil.writeToByteArrayStream(os, value);
} else {
Expand Down

0 comments on commit b6b4041

Please sign in to comment.