Skip to content

Commit

Permalink
[fix](txn-insert) Txn insert support ccr (apache#36859)
Browse files Browse the repository at this point in the history
## Proposed changes
  • Loading branch information
mymeiyi authored Jul 9, 2024
1 parent c8cadba commit c272886
Show file tree
Hide file tree
Showing 8 changed files with 447 additions and 70 deletions.
38 changes: 30 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/binlog/UpsertRecord.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ public static class PartitionRecord {

@SerializedName(value = "isTempPartition")
public boolean isTemp;

@SerializedName(value = "stid")
public long subTxnId;
}

@SerializedName(value = "partitionRecords")
Expand All @@ -58,8 +61,13 @@ public TableRecord(Set<Long> indexIds) {
this.indexIds = indexIds;
}

public void addPartitionRecord(PartitionCommitInfo partitionCommitInfo) {
private void addPartitionRecord(PartitionCommitInfo partitionCommitInfo) {
addPartitionRecord(-1, partitionCommitInfo);
}

private void addPartitionRecord(long subTxnId, PartitionCommitInfo partitionCommitInfo) {
PartitionRecord partitionRecord = new PartitionRecord();
partitionRecord.subTxnId = subTxnId;
partitionRecord.partitionId = partitionCommitInfo.getPartitionId();
partitionRecord.range = partitionCommitInfo.getPartitionRange();
partitionRecord.version = partitionCommitInfo.getVersion();
Expand Down Expand Up @@ -87,6 +95,8 @@ public List<PartitionRecord> getPartitionRecords() {
// pair is (tableId, tableRecord)
@SerializedName(value = "tableRecords")
private Map<Long, TableRecord> tableRecords;
@SerializedName(value = "stids")
private List<Long> subTxnIds;

// construct from TransactionState
public UpsertRecord(long commitSeq, TransactionState state) {
Expand All @@ -98,13 +108,25 @@ public UpsertRecord(long commitSeq, TransactionState state) {
tableRecords = Maps.newHashMap();

Map<Long, Set<Long>> loadedTableIndexIds = state.getLoadedTblIndexes();
for (TableCommitInfo info : state.getIdToTableCommitInfos().values()) {
Set<Long> indexIds = loadedTableIndexIds.get(info.getTableId());
TableRecord tableRecord = new TableRecord(indexIds);
tableRecords.put(info.getTableId(), tableRecord);

for (PartitionCommitInfo partitionCommitInfo : info.getIdToPartitionCommitInfo().values()) {
tableRecord.addPartitionRecord(partitionCommitInfo);
if (state.getSubTxnIds() != null) {
state.getSubTxnIdToTableCommitInfo().forEach((subTxnId, tableCommitInfo) -> {
Set<Long> indexIds = loadedTableIndexIds.get(tableCommitInfo.getTableId());
TableRecord tableRecord = tableRecords.compute(tableCommitInfo.getTableId(),
(k, v) -> v == null ? new TableRecord(indexIds) : v);
for (PartitionCommitInfo partitionCommitInfo : tableCommitInfo.getIdToPartitionCommitInfo().values()) {
tableRecord.addPartitionRecord(subTxnId, partitionCommitInfo);
}
});
subTxnIds = state.getSubTxnIds();
} else {
for (TableCommitInfo info : state.getIdToTableCommitInfos().values()) {
Set<Long> indexIds = loadedTableIndexIds.get(info.getTableId());
TableRecord tableRecord = new TableRecord(indexIds);
tableRecords.put(info.getTableId(), tableRecord);

for (PartitionCommitInfo partitionCommitInfo : info.getIdToPartitionCommitInfo().values()) {
tableRecord.addPartitionRecord(partitionCommitInfo);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@
import org.apache.doris.thrift.TStreamLoadMultiTablePutResult;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TStreamLoadPutResult;
import org.apache.doris.thrift.TSubTxnInfo;
import org.apache.doris.thrift.TSyncQueryColumns;
import org.apache.doris.thrift.TTableIndexQueryStats;
import org.apache.doris.thrift.TTableMetadataNameIds;
Expand All @@ -245,6 +246,7 @@
import org.apache.doris.thrift.TUpdateFollowerStatsCacheRequest;
import org.apache.doris.thrift.TWaitingTxnStatusRequest;
import org.apache.doris.thrift.TWaitingTxnStatusResult;
import org.apache.doris.transaction.SubTransactionState;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
Expand Down Expand Up @@ -1265,6 +1267,9 @@ public TBeginTxnResult beginTxn(TBeginTxnRequest request) throws TException {
try {
TBeginTxnResult tmpRes = beginTxnImpl(request, clientAddr);
result.setTxnId(tmpRes.getTxnId()).setDbId(tmpRes.getDbId());
if (tmpRes.isSetSubTxnIds()) {
result.setSubTxnIds(tmpRes.getSubTxnIds());
}
} catch (DuplicatedRequestException e) {
// this is a duplicate request, just return previous txn id
LOG.warn("duplicate request for stream load. request id: {}, txn: {}", e.getDuplicatedRequestId(),
Expand Down Expand Up @@ -1349,6 +1354,12 @@ private TBeginTxnResult beginTxnImpl(TBeginTxnRequest request, String clientIp)
// step 7: return result
TBeginTxnResult result = new TBeginTxnResult();
result.setTxnId(txnId).setDbId(db.getId());
if (request.isSetSubTxnNum() && request.getSubTxnNum() > 0) {
result.addToSubTxnIds(txnId);
for (int i = 0; i < request.getSubTxnNum() - 1; i++) {
result.addToSubTxnIds(Env.getCurrentGlobalTransactionMgr().getNextTransactionId());
}
}
return result;
}

Expand Down Expand Up @@ -1699,8 +1710,14 @@ private boolean commitTxnImpl(TCommitTxnRequest request) throws UserException {
if (!request.isSetTxnId()) {
throw new UserException("txn_id is not set");
}
if (!request.isSetCommitInfos()) {
throw new UserException("commit_infos is not set");
if (request.isSetTxnInsert() && request.isTxnInsert()) {
if (!request.isSetSubTxnInfos()) {
throw new UserException("sub_txn_infos is not set");
}
} else {
if (!request.isSetCommitInfos()) {
throw new UserException("commit_infos is not set");
}
}

// Step 1: get && check database
Expand Down Expand Up @@ -1750,11 +1767,29 @@ private boolean commitTxnImpl(TCommitTxnRequest request) throws UserException {
long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() / 2 : 5000;

// Step 5: commit and publish
return Env.getCurrentGlobalTransactionMgr()
.commitAndPublishTransaction(db, tableList,
request.getTxnId(),
TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
if (request.isSetTxnInsert() && request.isTxnInsert()) {
List<Long> subTxnIds = new ArrayList<>();
List<SubTransactionState> subTransactionStates = new ArrayList<>();
for (TSubTxnInfo subTxnInfo : request.getSubTxnInfos()) {
TableIf table = db.getTableNullable(subTxnInfo.getTableId());
if (table == null) {
continue;
}
subTxnIds.add(subTxnInfo.getSubTxnId());
subTransactionStates.add(
new SubTransactionState(subTxnInfo.getSubTxnId(), (Table) table,
subTxnInfo.getTabletCommitInfos(), null));
}
transactionState.setSubTxnIds(subTxnIds);
return Env.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(db, request.getTxnId(),
subTransactionStates, timeoutMs);
} else {
return Env.getCurrentGlobalTransactionMgr()
.commitAndPublishTransaction(db, tableList,
request.getTxnId(),
TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
}
}

@Override
Expand Down
7 changes: 7 additions & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,8 @@ struct TBeginTxnRequest {
10: optional Types.TUniqueId request_id
11: optional string token
12: optional i64 backend_id
// used for ccr
13: optional i64 sub_txn_num = 0
}

struct TBeginTxnResult {
Expand All @@ -662,6 +664,8 @@ struct TBeginTxnResult {
3: optional string job_status // if label already used, set status of existing job
4: optional i64 db_id
5: optional Types.TNetworkAddress master_address
// used for ccr
6: optional list<i64> sub_txn_ids
}

// StreamLoad request, used to load a streaming to engine
Expand Down Expand Up @@ -832,6 +836,9 @@ struct TCommitTxnRequest {
10: optional i64 thrift_rpc_timeout_ms
11: optional string token
12: optional i64 db_id
// used for ccr
13: optional bool txn_insert
14: optional list<TSubTxnInfo> sub_txn_infos
}

struct TCommitTxnResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ package org.apache.doris.regression.json
class PartitionData {
public Long partitionId
public Long version
public Long stid // sub txn id

String toString() {
return "(" + partitionId.toString() + ", " + version.toString() + ")"
return "(" + partitionId.toString() + ", " + version.toString() + ", " + stid + ")"
}
}

Expand Down Expand Up @@ -51,13 +52,17 @@ class BinlogData {
public String label
public Long dbId
public Map<Long, PartitionRecords> tableRecords
public List<Long> stids

String toString() {
return "(" + commitSeq.toString() + ", " +
txnId.toString() + ", " +
timeStamp + label + ", " +
dbId.toString() + ", " +
tableRecords.toString()
return "(commitSeq: " + commitSeq
+ ", txnId: " + txnId
+ ", timestamp: " + timeStamp
+ ", label: " + label
+ ", dbId: " + dbId
+", subTxnIds: " + stids
+ ", tableRecords: " + tableRecords
+")"
}
}

Loading

0 comments on commit c272886

Please sign in to comment.