From a8504d63f3e595c02b6cd5c2d2a483afb621d395 Mon Sep 17 00:00:00 2001 From: cjj2010 <2449402815@qq.com> Date: Sat, 12 Oct 2024 12:34:24 +0800 Subject: [PATCH] [enhance](cloud)support SHOW-TRANSACTION --- .../CloudGlobalTransactionMgr.java | 5 +- .../doris/cloud/transaction/TxnUtil.java | 52 ------------- .../transaction/DatabaseTransactionMgr.java | 11 ++- .../doris/transaction/TransactionUtil.java | 77 +++++++++++++++++++ 4 files changed, 85 insertions(+), 60 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionUtil.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index aa5e6da6e5ac84..c7dc5f73a9294b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -112,6 +112,7 @@ import org.apache.doris.transaction.TransactionState.LoadJobSourceType; import org.apache.doris.transaction.TransactionState.TxnCoordinator; import org.apache.doris.transaction.TransactionStatus; +import org.apache.doris.transaction.TransactionUtil; import org.apache.doris.transaction.TxnCommitAttachment; import org.apache.doris.transaction.TxnStateCallbackFactory; import org.apache.doris.transaction.TxnStateChangeCallback; @@ -1704,8 +1705,8 @@ public List> getSingleTranInfo(long dbId, long txnId) throws Analys if (txnState == null) { throw new AnalysisException("transaction with id " + txnId + " does not exist"); } - TxnUtil.checkAuth(dbId, txnState); - infos.add(TxnUtil.getTxnStateInfo(txnState, Lists.newArrayList())); + TransactionUtil.checkAuth(dbId, txnState); + infos.add(TransactionUtil.getTxnStateInfo(txnState, Lists.newArrayList())); return infos; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java index 207125b7ded1c5..3aca54cd150dc6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/TxnUtil.java @@ -17,9 +17,6 @@ package org.apache.doris.cloud.transaction; -import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.Table; import org.apache.doris.cloud.proto.Cloud.RLTaskTxnCommitAttachmentPB; import org.apache.doris.cloud.proto.Cloud.RoutineLoadProgressPB; import org.apache.doris.cloud.proto.Cloud.TxnCommitAttachmentPB; @@ -31,19 +28,12 @@ import org.apache.doris.cloud.proto.Cloud.TxnInfoPB; import org.apache.doris.cloud.proto.Cloud.TxnSourceTypePB; import org.apache.doris.cloud.proto.Cloud.UniqueIdPB; -import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.ErrorCode; -import org.apache.doris.common.ErrorReport; -import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.load.EtlStatus; import org.apache.doris.load.FailMsg; import org.apache.doris.load.loadv2.JobState; import org.apache.doris.load.loadv2.LoadJobFinalOperation; import org.apache.doris.load.routineload.KafkaProgress; import org.apache.doris.load.routineload.RLTaskTxnCommitAttachment; -import org.apache.doris.mysql.privilege.PrivPredicate; -import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TEtlState; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.TransactionState; @@ -58,7 +48,6 @@ import java.util.HashMap; import java.util.List; -import java.util.Set; public class TxnUtil { private static final Logger LOG = LogManager.getLogger(TxnUtil.class); @@ -378,45 +367,4 @@ public static TransactionState transactionStateFromPb(TxnInfoPB txnInfo) { return transactionState; } - public static List getTxnStateInfo(TransactionState txnState, List info) { - info.add(String.valueOf(txnState.getTransactionId())); - info.add(txnState.getLabel()); - info.add(txnState.getCoordinator().toString()); - info.add(txnState.getTransactionStatus().name()); - info.add(txnState.getSourceType().name()); - info.add(TimeUtils.longToTimeString(txnState.getPrepareTime())); - info.add(TimeUtils.longToTimeString(txnState.getPreCommitTime())); - info.add(TimeUtils.longToTimeString(txnState.getCommitTime())); - info.add(TimeUtils.longToTimeString(txnState.getLastPublishVersionTime())); - info.add(TimeUtils.longToTimeString(txnState.getFinishTime())); - info.add(txnState.getReason()); - info.add(String.valueOf(txnState.getErrorReplicas().size())); - info.add(String.valueOf(txnState.getCallbackId())); - info.add(String.valueOf(txnState.getTimeoutMs())); - info.add(txnState.getErrMsg()); - return info; - } - - public static void checkAuth(long dbId, TransactionState txnState) throws AnalysisException { - Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbId); - if (ConnectContext.get() != null) { - // check auth - Set tblIds = txnState.getIdToTableCommitInfos().keySet(); - for (Long tblId : tblIds) { - Table tbl = db.getTableNullable(tblId); - if (tbl != null) { - if (!Env.getCurrentEnv().getAccessManager() - .checkTblPriv(ConnectContext.get(), InternalCatalog.INTERNAL_CATALOG_NAME, - db.getFullName(), - tbl.getName(), PrivPredicate.SHOW)) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, - "SHOW TRANSACTION", - ConnectContext.get().getQualifiedUser(), - ConnectContext.get().getRemoteIP(), - db.getFullName() + ": " + tbl.getName()); - } - } - } - } - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 081cd6abee308c..163830e447515f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -33,7 +33,6 @@ import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; -import org.apache.doris.cloud.transaction.TxnUtil; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DuplicatedRequestException; @@ -245,7 +244,7 @@ public List> getTxnStateInfoList(boolean running, int limit) { .sorted(TransactionState.TXN_ID_COMPARATOR) .limit(limit) .forEach(t -> { - infos.add(TxnUtil.getTxnStateInfo(t, Lists.newArrayList())); + infos.add(TransactionUtil.getTxnStateInfo(t, Lists.newArrayList())); }); } finally { readUnlock(); @@ -281,7 +280,7 @@ public List> getTxnStateInfoList(TransactionStatus status) { .filter(transactionState -> (transactionState.getTransactionStatus() == status)) .sorted(TransactionState.TXN_ID_COMPARATOR) .forEach(t -> { - infos.add(TxnUtil.getTxnStateInfo(t, Lists.newArrayList())); + infos.add(TransactionUtil.getTxnStateInfo(t, Lists.newArrayList())); }); } finally { readUnlock(); @@ -301,7 +300,7 @@ public List> getTxnStateInfoList(String labelRegex) { .filter(transactionState -> (transactionState.getLabel().matches(labelRegex))) .sorted(TransactionState.TXN_ID_COMPARATOR) .forEach(t -> { - infos.add(TxnUtil.getTxnStateInfo(t, Lists.newArrayList())); + infos.add(TransactionUtil.getTxnStateInfo(t, Lists.newArrayList())); }); } finally { readUnlock(); @@ -2046,8 +2045,8 @@ public List> getSingleTranInfo(long dbId, long txnId) throws Analys if (txnState == null) { throw new AnalysisException("transaction with id " + txnId + " does not exist"); } - TxnUtil.checkAuth(dbId, txnState); - infos.add(TxnUtil.getTxnStateInfo(txnState, Lists.newArrayList())); + TransactionUtil.checkAuth(dbId, txnState); + infos.add(TransactionUtil.getTxnStateInfo(txnState, Lists.newArrayList())); } finally { readUnlock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionUtil.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionUtil.java new file mode 100644 index 00000000000000..0a5fa0de5c8571 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionUtil.java @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.transaction; + +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.Table; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.qe.ConnectContext; + +import java.util.List; +import java.util.Set; + +public class TransactionUtil { + + public static List getTxnStateInfo(TransactionState txnState, List info) { + info.add(String.valueOf(txnState.getTransactionId())); + info.add(txnState.getLabel()); + info.add(txnState.getCoordinator().toString()); + info.add(txnState.getTransactionStatus().name()); + info.add(txnState.getSourceType().name()); + info.add(TimeUtils.longToTimeString(txnState.getPrepareTime())); + info.add(TimeUtils.longToTimeString(txnState.getPreCommitTime())); + info.add(TimeUtils.longToTimeString(txnState.getCommitTime())); + info.add(TimeUtils.longToTimeString(txnState.getLastPublishVersionTime())); + info.add(TimeUtils.longToTimeString(txnState.getFinishTime())); + info.add(txnState.getReason()); + info.add(String.valueOf(txnState.getErrorReplicas().size())); + info.add(String.valueOf(txnState.getCallbackId())); + info.add(String.valueOf(txnState.getTimeoutMs())); + info.add(txnState.getErrMsg()); + return info; + } + + public static void checkAuth(long dbId, TransactionState txnState) throws AnalysisException { + Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(dbId); + if (ConnectContext.get() != null) { + // check auth + Set tblIds = txnState.getIdToTableCommitInfos().keySet(); + for (Long tblId : tblIds) { + Table tbl = db.getTableNullable(tblId); + if (tbl != null) { + if (!Env.getCurrentEnv().getAccessManager() + .checkTblPriv(ConnectContext.get(), InternalCatalog.INTERNAL_CATALOG_NAME, + db.getFullName(), + tbl.getName(), PrivPredicate.SHOW)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, + "SHOW TRANSACTION", + ConnectContext.get().getQualifiedUser(), + ConnectContext.get().getRemoteIP(), + db.getFullName() + ": " + tbl.getName()); + } + } + } + } + } +}