Skip to content

Commit

Permalink
feat: truncate aggrate table
Browse files Browse the repository at this point in the history
  • Loading branch information
dl239 committed Oct 17, 2023
1 parent 6c48ab1 commit b25b165
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 26 deletions.
1 change: 1 addition & 0 deletions src/base/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ enum ReturnCode {
kExceedMaxMemory = 160,
kInvalidArgs = 161,
kCheckIndexFailed = 162,
kCatalogUpdateFailed = 163,
kNameserverIsNotLeader = 300,
kAutoFailoverIsEnabled = 301,
kEndpointIsNotExist = 302,
Expand Down
41 changes: 41 additions & 0 deletions src/cmd/sql_cmd_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1039,6 +1039,47 @@ TEST_P(DBSDKTest, DeployWithBias) {
ASSERT_TRUE(cs->GetNsClient()->DropDatabase(db, msg));
}

TEST_P(DBSDKTest, Truncate) {
auto cli = GetParam();
sr = cli->sr;
std::string db_name = "test2";
std::string table_name = "test1";
std::string ddl = "create table test1 (c1 string, c2 int, c3 bigint, INDEX(KEY=c1, ts=c3));";
ProcessSQLs(sr, {
"set @@execute_mode = 'online'",
absl::StrCat("create database ", db_name, ";"),
absl::StrCat("use ", db_name, ";"),
ddl,
});
hybridse::sdk::Status status;
sr->ExecuteSQL(absl::StrCat("truncate table ", table_name, ";"), &status);
ASSERT_TRUE(status.IsOK()) << status.ToString();
auto res = sr->ExecuteSQL(absl::StrCat("select * from ", table_name, ";"), &status);
ASSERT_EQ(res->Size(), 0);
for (int i = 0; i < 10; i++) {
std::string key = absl::StrCat("key", i);
for (int j = 0; j < 10; j++) {
uint64_t ts = 1000 + j;
sr->ExecuteSQL(absl::StrCat("insert into ", table_name, " values ('", key, "', 11, ", ts, ");"), &status);
}
}

res = sr->ExecuteSQL(absl::StrCat("select * from ", table_name, ";"), &status);
ASSERT_EQ(res->Size(), 100);
sr->ExecuteSQL(absl::StrCat("truncate table ", table_name, ";"), &status);
ASSERT_TRUE(status.IsOK()) << status.ToString();
res = sr->ExecuteSQL(absl::StrCat("select * from ", table_name, ";"), &status);
ASSERT_EQ(res->Size(), 0);
sr->ExecuteSQL(absl::StrCat("insert into ", table_name, " values ('aa', 11, 100);"), &status);
res = sr->ExecuteSQL(absl::StrCat("select * from ", table_name, ";"), &status);
ASSERT_EQ(res->Size(), 1);
ProcessSQLs(sr, {
absl::StrCat("use ", db_name, ";"),
absl::StrCat("drop table ", table_name),
absl::StrCat("drop database ", db_name),
});
}

TEST_P(DBSDKTest, DeletetRange) {
auto cli = GetParam();
sr = cli->sr;
Expand Down
2 changes: 2 additions & 0 deletions src/storage/aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ class Aggregator {
// set the filter column info that not initialized in constructor
bool SetFilter(absl::string_view filter_col);

std::shared_ptr<Table> GetAggTable() { return aggr_table_; }

protected:
codec::Schema base_table_schema_;
codec::Schema aggr_table_schema_;
Expand Down
68 changes: 42 additions & 26 deletions src/tablet/tablet_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3463,6 +3463,38 @@ void TabletImpl::TruncateTable(RpcController* controller, const ::openmldb::api:
brpc::ClosureGuard done_guard(done);
uint32_t tid = request->tid();
uint32_t pid = request->pid();
if (auto status = TruncateTableInternal(tid, pid); !status.OK()) {
base::SetResponseStatus(status, response);
return;
}
auto aggrs = GetAggregators(tid, pid);
if (aggrs) {
for (const auto& aggr : *aggrs) {
auto agg_table = aggr->GetAggTable();
if (!agg_table) {
PDLOG(WARNING, "aggrate table does not exist. tid[%u] pid[%u] index pos[%u]",
tid, pid, aggr->GetIndexPos());
response->set_code(::openmldb::base::ReturnCode::kTableIsNotExist);
response->set_msg("aggrate table does not exist");
return;
}
uint32_t agg_tid = agg_table->GetId();
uint32_t agg_pid = agg_table->GetPid();
if (auto status = TruncateTableInternal(agg_tid, agg_pid); !status.OK()) {
PDLOG(WARNING, "truncate aggrate table failed. tid[%u] pid[%u] index pos[%u]",
agg_tid, agg_pid, aggr->GetIndexPos());
base::SetResponseStatus(status, response);
return;
}
PDLOG(INFO, "truncate aggrate table success. tid[%u] pid[%u] index pos[%u]",
agg_tid, agg_pid, aggr->GetIndexPos());
}
}
response->set_code(::openmldb::base::ReturnCode::kOk);
response->set_msg("ok");
}

base::Status TabletImpl::TruncateTableInternal(uint32_t tid, uint32_t pid) {
std::shared_ptr<Table> table;
std::shared_ptr<Snapshot> snapshot;
std::shared_ptr<LogReplicator> replicator;
Expand All @@ -3471,50 +3503,37 @@ void TabletImpl::TruncateTable(RpcController* controller, const ::openmldb::api:
table = GetTableUnLock(tid, pid);
if (!table) {
DEBUGLOG("table does not exist. tid %u pid %u", tid, pid);
response->set_code(::openmldb::base::ReturnCode::kTableIsNotExist);
response->set_msg("table not found");
return;
return {::openmldb::base::ReturnCode::kTableIsNotExist, "table not found"};
}
snapshot = GetSnapshotUnLock(tid, pid);
if (!snapshot) {
PDLOG(WARNING, "snapshot does not exist. tid[%u] pid[%u]", tid, pid);
response->set_code(::openmldb::base::ReturnCode::kSnapshotIsNotExist);
response->set_msg("snapshot not found");
return;
return {::openmldb::base::ReturnCode::kSnapshotIsNotExist, "snapshot not found"};
}
replicator = GetReplicatorUnLock(tid, pid);
if (!replicator) {
PDLOG(WARNING, "replicator does not exist. tid[%u] pid[%u]", tid, pid);
response->set_code(::openmldb::base::ReturnCode::kReplicatorIsNotExist);
response->set_msg("replicator not found");
return;
return {::openmldb::base::ReturnCode::kReplicatorIsNotExist, "replicator not found"};
}
}
if (replicator->GetOffset() == 0) {
PDLOG(INFO, "table is empty, truncate success. tid[%u] pid[%u]", tid, pid);
response->set_code(::openmldb::base::ReturnCode::kOk);
response->set_msg("ok");
return;
return {};
}
if (table->GetTableStat() == ::openmldb::storage::kMakingSnapshot) {
PDLOG(WARNING, "making snapshot task is running now. tid[%u] pid[%u]", tid, pid);
response->set_code(::openmldb::base::ReturnCode::kTableStatusIsKmakingsnapshot);
response->set_msg("table status is kMakingSnapshot");
return;
return {::openmldb::base::ReturnCode::kTableStatusIsKmakingsnapshot, "table status is kMakingSnapshot"};
} else if (table->GetTableStat() == ::openmldb::storage::kLoading) {
PDLOG(WARNING, "table is loading now. tid[%u] pid[%u]", tid, pid);
response->set_code(::openmldb::base::ReturnCode::kTableIsLoading);
response->set_msg("table is loading data");
return;
return {::openmldb::base::ReturnCode::kTableIsLoading, "table is loading data"};
}
if (table->GetStorageMode() == openmldb::common::kMemory) {
auto table_meta = table->GetTableMeta();
std::shared_ptr<Table> new_table;
new_table = std::make_shared<MemTable>(*table_meta);
if (!new_table->Init()) {
PDLOG(WARNING, "fail to init table. tid %u, pid %u", table_meta->tid(), table_meta->pid());
response->set_msg("fail to init table");
return;
return {::openmldb::base::ReturnCode::kTableMetaIsIllegal, "fail to init table"};
}
new_table->SetTableStat(::openmldb::storage::kNormal);
{
Expand All @@ -3529,20 +3548,17 @@ void TabletImpl::TruncateTable(RpcController* controller, const ::openmldb::api:
} else {
LOG(WARNING) << "fail to add table " << table_meta->name()
<< " to catalog with db " << table_meta->db();
return {::openmldb::base::ReturnCode::kCatalogUpdateFailed, "fail to update catalog"};
}
}
} else {
auto disk_table = std::dynamic_pointer_cast<DiskTable>(table);
if (auto status = disk_table->Truncate(); !status.OK()) {
response->set_code(::openmldb::base::ReturnCode::kTruncateTableFailed);
response->set_msg(status.GetMsg());
return;
return {::openmldb::base::ReturnCode::kTruncateTableFailed, status.GetMsg()};
}
}

PDLOG(INFO, "truncate table success. tid[%u] pid[%u]", tid, pid);
response->set_code(::openmldb::base::ReturnCode::kOk);
response->set_msg("ok");
return {};
}

void TabletImpl::ExecuteGc(RpcController* controller, const ::openmldb::api::ExecuteGcRequest* request,
Expand Down
2 changes: 2 additions & 0 deletions src/tablet/tablet_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@ class TabletImpl : public ::openmldb::api::TabletServer {
uint32_t partition_num, uint64_t last_time,
std::shared_ptr<::openmldb::api::TaskInfo> task);

base::Status TruncateTableInternal(uint32_t tid, uint32_t pid);

void ExtractIndexDataInternal(std::shared_ptr<::openmldb::storage::Table> table,
std::shared_ptr<::openmldb::storage::MemTableSnapshot> memtable_snapshot,
const std::vector<::openmldb::common::ColumnKey>& column_key,
Expand Down
36 changes: 36 additions & 0 deletions src/tablet/tablet_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1416,6 +1416,42 @@ TEST_P(TabletImplTest, ScanWithLatestN) {
ASSERT_FALSE(kv_it.Valid());
}

TEST_P(TabletImplTest, Truncate) {
::openmldb::common::StorageMode storage_mode = GetParam();
TabletImpl tablet;
uint32_t id = counter++;
tablet.Init("");
ASSERT_EQ(0, CreateDefaultTable("db0", "t0", id, 1, 0, 0, kAbsoluteTime, storage_mode, &tablet));
MockClosure closure;
for (int ts = 100; ts < 200; ts++) {
::openmldb::api::PutRequest prequest;
PackDefaultDimension("test1", &prequest);
prequest.set_time(ts);
prequest.set_value(::openmldb::test::EncodeKV("test1", "test" + std::to_string(ts)));
prequest.set_tid(id);
prequest.set_pid(1);
::openmldb::api::PutResponse presponse;
tablet.Put(NULL, &prequest, &presponse, &closure);
ASSERT_EQ(0, presponse.code());
}
::openmldb::api::TraverseRequest sr;
sr.set_tid(id);
sr.set_pid(1);
sr.set_limit(1000);
auto srp = std::make_shared<::openmldb::api::TraverseResponse>();
tablet.Traverse(NULL, &sr, srp.get(), &closure);
ASSERT_EQ(0, srp->code());
ASSERT_EQ(100, (signed)srp->count());
::openmldb::api::TruncateTableRequest tr;
tr.set_tid(id);
tr.set_pid(1);
auto trp = std::make_shared<::openmldb::api::TruncateTableResponse>();
tablet.TruncateTable(NULL, &tr, trp.get(), &closure);
ASSERT_EQ(0, trp->code());
tablet.Traverse(NULL, &sr, srp.get(), &closure);
ASSERT_EQ(0, srp->code());
ASSERT_EQ(0, (signed)srp->count());
}

TEST_P(TabletImplTest, Traverse) {
::openmldb::common::StorageMode storage_mode = GetParam();
Expand Down

0 comments on commit b25b165

Please sign in to comment.