Skip to content

Commit

Permalink
hbase batch mutate
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaochen-zhou committed Dec 9, 2024
1 parent 16c17a1 commit 8056f8e
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,14 @@ public void open() throws CatalogException {

@Override
public void close() throws CatalogException {
hbaseClient.close();
if (this.hbaseClient != null) {
try {
this.hbaseClient.close();
} catch (Exception e) {
throw new CatalogException("Failed to close HBase connection.", e);
}
log.info("Current HBase connection is closed.");
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,16 @@ public void mutate(Put put) throws IOException {
hbaseMutator.mutate(put);
}

/**
* Mutate Put List.
*
* @param putList Hbase put list
* @throws IOException exception
*/
public void batchMutate(List<Put> putList) throws IOException {
hbaseMutator.mutate(putList);
}

/**
* Scan a table.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.hbase.client.HbaseClient;
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
import org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
Expand All @@ -36,8 +36,8 @@

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -56,10 +56,14 @@ public class HbaseSinkWriter

private List<Integer> rowkeyColumnIndexes;

public static final int BATCH_NUM = 10000;

private int versionColumnIndex;

private String defaultFamilyName = "value";

private List<Put> putList;

public HbaseSinkWriter(
SeaTunnelRowType seaTunnelRowType,
HbaseParameters hbaseParameters,
Expand All @@ -76,12 +80,22 @@ public HbaseSinkWriter(
}

this.hbaseClient = HbaseClient.createInstance(hbaseParameters);
putList = new ArrayList<>();
}

@Override
public void write(SeaTunnelRow element) throws IOException {
Put put = convertRowToPut(element);
hbaseClient.mutate(put);
if (!put.isEmpty()) {
putList.add(put);
}
if (putList.size() >= BATCH_NUM) {
try {
hbaseClient.batchMutate(putList);
} finally {
putList.clear();
}
}
}

@Override
Expand All @@ -94,6 +108,15 @@ public void abortPrepare() {}

@Override
public void close() throws IOException {

if (!putList.isEmpty()) {
try {
hbaseClient.batchMutate(putList);
} finally {
putList.clear();
}
}

if (hbaseClient != null) {
hbaseClient.close();
}
Expand All @@ -120,7 +143,6 @@ private Put convertRowToPut(SeaTunnelRow row) {
.collect(Collectors.toList());
for (Integer writeColumnIndex : writeColumnIndexes) {
String fieldName = seaTunnelRowType.getFieldName(writeColumnIndex);
Map<String, String> configurationFamilyNames = hbaseParameters.getFamilyNames();
String familyName =
hbaseParameters.getFamilyNames().getOrDefault(fieldName, defaultFamilyName);
byte[] bytes = convertColumnToBytes(row, writeColumnIndex);
Expand Down Expand Up @@ -184,8 +206,7 @@ private byte[] convertColumnToBytes(SeaTunnelRow row, int index) {
String.format(
"Hbase connector does not support this column type [%s]",
fieldType.getSqlType());
throw new HbaseConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, errorMsg);
throw new HbaseConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
}
}
}

0 comments on commit 8056f8e

Please sign in to comment.