Skip to content

Commit

Permalink
[chore](table) Add batch method to get visible version of the olap ta…
Browse files Browse the repository at this point in the history
…ble (apache#38949)

Since get visible version is a heavy operation in the cloud mode, this
PR add a batch method, to obtain all visible versions via only one RPC.
  • Loading branch information
w41ter authored Aug 12, 2024
1 parent c47399c commit 285f68a
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 50 deletions.
11 changes: 7 additions & 4 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,19 +315,22 @@ void MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr
return;
}

size_t num_acquired = request->partition_ids_size();
size_t num_acquired =
is_table_version ? request->table_ids_size() : request->partition_ids_size();
response->mutable_versions()->Reserve(num_acquired);
response->mutable_db_ids()->CopyFrom(request->db_ids());
response->mutable_table_ids()->CopyFrom(request->table_ids());
response->mutable_partition_ids()->CopyFrom(request->partition_ids());
if (!is_table_version) {
response->mutable_partition_ids()->CopyFrom(request->partition_ids());
}

constexpr size_t BATCH_SIZE = 500;
std::vector<std::string> version_keys;
std::vector<std::optional<std::string>> version_values;
version_keys.reserve(BATCH_SIZE);
version_values.reserve(BATCH_SIZE);
while ((code == MetaServiceCode::OK || code == MetaServiceCode::KV_TXN_TOO_OLD) &&
response->versions_size() < response->partition_ids_size()) {
response->versions_size() < num_acquired) {
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
Expand All @@ -343,11 +346,11 @@ void MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr
for (size_t j = i; j < limit; j++) {
int64_t db_id = request->db_ids(j);
int64_t table_id = request->table_ids(j);
int64_t partition_id = request->partition_ids(j);
std::string ver_key;
if (is_table_version) {
table_version_key({instance_id, db_id, table_id}, &ver_key);
} else {
int64_t partition_id = request->partition_ids(j);
partition_version_key({instance_id, db_id, table_id, partition_id}, &ver_key);
}
version_keys.push_back(std::move(ver_key));
Expand Down
116 changes: 87 additions & 29 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,16 @@
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.DeepCopy;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVSnapshotIf;
import org.apache.doris.mtmv.MTMVVersionSnapshot;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.resource.Tag;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.statistics.AnalysisInfo;
Expand Down Expand Up @@ -2225,7 +2224,6 @@ public int getBaseSchemaVersion() {
return baseIndexMeta.getSchemaVersion();
}


public void setEnableSingleReplicaCompaction(boolean enableSingleReplicaCompaction) {
if (tableProperty == null) {
tableProperty = new TableProperty(new HashMap<>());
Expand Down Expand Up @@ -2849,6 +2847,7 @@ public long getVisibleVersion() {
if (Config.isNotCloudMode()) {
return tableAttributes.getVisibleVersion();
}

// get version rpc
Cloud.GetVersionRequest request = Cloud.GetVersionRequest.newBuilder()
.setDbId(this.getDatabase().getId())
Expand All @@ -2858,7 +2857,7 @@ public long getVisibleVersion() {
.build();

try {
Cloud.GetVersionResponse resp = getVersionFromMeta(request);
Cloud.GetVersionResponse resp = VersionHelper.getVersionFromMeta(request);
long version = -1;
if (resp.getStatus().getCode() == Cloud.MetaServiceCode.OK) {
version = resp.getVersion();
Expand All @@ -2874,7 +2873,90 @@ public long getVisibleVersion() {
}
return version;
} catch (RpcException e) {
throw new RuntimeException("get version from meta service failed");
throw new RuntimeException("get version from meta service failed", e);
}
}

// Get the table versions in batch.
public static List<Long> getVisibleVersionByTableIds(Collection<Long> tableIds) {
List<OlapTable> tables = new ArrayList<>();

InternalCatalog catalog = Env.getCurrentEnv().getInternalCatalog();
for (long tableId : tableIds) {
Table table = catalog.getTableByTableId(tableId);
if (table == null) {
throw new RuntimeException("get table visible version failed, no such table " + tableId + " exists");
}
if (table.getType() != TableType.OLAP) {
throw new RuntimeException(
"get table visible version failed, table " + tableId + " is not a OLAP table");
}
tables.add((OlapTable) table);
}

return getVisibleVersionInBatch(tables);
}

// Get the table versions in batch.
public static List<Long> getVisibleVersionInBatch(Collection<OlapTable> tables) {
if (tables.isEmpty()) {
return new ArrayList<>();
}

if (Config.isNotCloudMode()) {
return tables.stream()
.map(table -> table.tableAttributes.getVisibleVersion())
.collect(Collectors.toList());
}

List<Long> dbIds = new ArrayList<>();
List<Long> tableIds = new ArrayList<>();
for (OlapTable table : tables) {
dbIds.add(table.getDatabase().getId());
tableIds.add(table.getId());
}

return getVisibleVersionFromMeta(dbIds, tableIds);
}

private static List<Long> getVisibleVersionFromMeta(List<Long> dbIds, List<Long> tableIds) {
// get version rpc
Cloud.GetVersionRequest request = Cloud.GetVersionRequest.newBuilder()
.setDbId(-1)
.setTableId(-1)
.setPartitionId(-1)
.addAllDbIds(dbIds)
.addAllTableIds(tableIds)
.setBatchMode(true)
.setIsTableVersion(true)
.build();

try {
Cloud.GetVersionResponse resp = VersionHelper.getVersionFromMeta(request);
if (resp.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
throw new RpcException("get table visible version", "unexpected status " + resp.getStatus());
}

List<Long> versions = resp.getVersionsList();
if (versions.size() != tableIds.size()) {
throw new RpcException("get table visible version",
"wrong number of versions, required " + tableIds.size() + ", but got " + versions.size());
}

if (LOG.isDebugEnabled()) {
LOG.debug("get table version from meta service, tables: {}, versions: {}", tableIds, versions);
}

for (int i = 0; i < versions.size(); i++) {
// Set visible version to 1 if no such table version exists.
if (versions.get(i) <= 0L) {
versions.set(i, 1L);
}
}

return versions;
} catch (RpcException e) {
throw new RuntimeException("get table version from meta service failed", e);
}
}

Expand Down Expand Up @@ -2921,19 +3003,6 @@ public MTMVSnapshotIf getTableSnapshot() {
return new MTMVVersionSnapshot(visibleVersion);
}

private static Cloud.GetVersionResponse getVersionFromMeta(Cloud.GetVersionRequest req)
throws RpcException {
long startAt = System.nanoTime();
try {
return VersionHelper.getVisibleVersion(req);
} finally {
SummaryProfile profile = getSummaryProfile();
if (profile != null) {
profile.addGetTableVersionTime(System.nanoTime() - startAt);
}
}
}

@Override
public boolean needAutoRefresh() {
return true;
Expand All @@ -2944,17 +3013,6 @@ public boolean isPartitionColumnAllowNull() {
return true;
}

private static SummaryProfile getSummaryProfile() {
ConnectContext ctx = ConnectContext.get();
if (ctx != null) {
StmtExecutor executor = ctx.getExecutor();
if (executor != null) {
return executor.getSummaryProfile();
}
}
return null;
}

public void setStatistics(Statistics statistics) {
this.statistics = statistics;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public long getVisibleVersion() {
.build();

try {
Cloud.GetVersionResponse resp = getVersionFromMeta(request);
Cloud.GetVersionResponse resp = VersionHelper.getVersionFromMeta(request);
long version = -1;
if (resp.getStatus().getCode() == MetaServiceCode.OK) {
version = resp.getVersion();
Expand Down Expand Up @@ -238,7 +238,7 @@ public static List<Long> getSnapshotVisibleVersion(List<Long> dbIds, List<Long>
if (LOG.isDebugEnabled()) {
LOG.debug("getVisibleVersion use CloudPartition {}", partitionIds.toString());
}
Cloud.GetVersionResponse resp = getVersionFromMeta(req);
Cloud.GetVersionResponse resp = VersionHelper.getVersionFromMeta(req);
if (resp.getStatus().getCode() != MetaServiceCode.OK) {
throw new RpcException("get visible version", "unexpected status " + resp.getStatus());
}
Expand Down Expand Up @@ -339,19 +339,6 @@ public boolean hasData() {
return getVisibleVersion() > Partition.PARTITION_INIT_VERSION;
}

private static Cloud.GetVersionResponse getVersionFromMeta(Cloud.GetVersionRequest req)
throws RpcException {
long startAt = System.nanoTime();
try {
return VersionHelper.getVisibleVersion(req);
} finally {
SummaryProfile profile = getSummaryProfile();
if (profile != null) {
profile.addGetPartitionVersionTime(System.nanoTime() - startAt);
}
}
}

private static boolean isEmptyPartitionPruneDisabled() {
ConnectContext ctx = ConnectContext.get();
if (ctx != null && (ctx.getSessionVariable().getDisableNereidsRules().get(RuleType.valueOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.common.Config;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.rpc.RpcException;

import org.apache.logging.log4j.LogManager;
Expand All @@ -32,6 +35,26 @@
public class VersionHelper {
private static final Logger LOG = LogManager.getLogger(VersionHelper.class);

// Call get_version() from meta service, and save the elapsed to summary profile.
public static Cloud.GetVersionResponse getVersionFromMeta(Cloud.GetVersionRequest req)
throws RpcException {
long startAt = System.nanoTime();
boolean isTableVersion = req.getIsTableVersion();
try {
return getVisibleVersion(req);
} finally {
SummaryProfile profile = getSummaryProfile();
if (profile != null) {
long elapsed = System.nanoTime() - startAt;
if (isTableVersion) {
profile.addGetTableVersionTime(elapsed);
} else {
profile.addGetPartitionVersionTime(elapsed);
}
}
}
}

public static Cloud.GetVersionResponse getVisibleVersion(Cloud.GetVersionRequest request) throws RpcException {
int tryTimes = 0;
while (tryTimes++ < Config.metaServiceRpcRetryTimes()) {
Expand Down Expand Up @@ -65,8 +88,7 @@ public static Cloud.GetVersionResponse getVisibleVersionInternal(Cloud.GetVersio
long deadline = System.currentTimeMillis() + timeoutMs;
Cloud.GetVersionResponse resp = null;
try {
Future<Cloud.GetVersionResponse> future =
MetaServiceProxy.getInstance().getVisibleVersionAsync(request);
Future<Cloud.GetVersionResponse> future = MetaServiceProxy.getInstance().getVisibleVersionAsync(request);

while (resp == null) {
try {
Expand All @@ -89,4 +111,16 @@ private static void sleepSeveralMs(int lowerMs, int upperMs) {
LOG.warn("get snapshot from meta service: sleep get interrupted exception");
}
}

private static SummaryProfile getSummaryProfile() {
ConnectContext ctx = ConnectContext.get();
if (ctx != null) {
StmtExecutor executor = ctx.getExecutor();
if (executor != null) {
return executor.getSummaryProfile();
}
}
return null;
}

}

0 comments on commit 285f68a

Please sign in to comment.