Skip to content

Commit

Permalink
refact: refact delete in tablet_impl
Browse files Browse the repository at this point in the history
  • Loading branch information
dl239 committed Oct 24, 2023
1 parent 48e8706 commit 823a2b1
Showing 1 changed file with 57 additions and 56 deletions.
113 changes: 57 additions & 56 deletions src/tablet/tablet_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1597,79 +1597,80 @@ void TabletImpl::Delete(RpcController* controller, const ::openmldb::api::Delete
PDLOG(WARNING, "invalid args. tid %u, pid %u", tid, pid);
return;
}
if (table->Delete(entry)) {
response->set_code(::openmldb::base::ReturnCode::kOk);
response->set_msg("ok");
DEBUGLOG("delete ok. tid %u, pid %u, key %s", tid, pid, request->key().c_str());
} else {
response->set_code(::openmldb::base::ReturnCode::kDeleteFailed);
response->set_msg("delete failed");
return;
}
auto get_aggregator = [this](uint32_t tid, uint32_t pid, uint32_t idx) -> std::shared_ptr<Aggregator> {
auto aggrs = GetAggregators(tid, pid);
if (aggrs) {
for (const auto& aggr : *aggrs) {
if (aggr->GetIndexPos() == idx) {
return aggr;
}
}
}
return {};
};
response->set_code(::openmldb::base::ReturnCode::kOk);
response->set_msg("ok");
std::optional<uint64_t> start_ts = entry.has_ts() ? std::optional<uint64_t>{entry.ts()} : std::nullopt;
std::optional<uint64_t> end_ts = entry.has_end_ts() ? std::optional<uint64_t>{entry.end_ts()} : std::nullopt;
if (entry.dimensions_size() > 0) {
for (const auto& dimension : entry.dimensions()) {
if (!table->Delete(dimension.idx(), dimension.key(), start_ts, end_ts)) {
response->set_code(::openmldb::base::ReturnCode::kDeleteFailed);
response->set_msg("delete failed");
return;
}
auto aggr = get_aggregator(tid, pid, dimension.idx());
if (aggr) {
if (!aggr->Delete(dimension.key(), start_ts, end_ts)) {
PDLOG(WARNING, "delete from aggr failed. base table: tid[%u] pid[%u] index[%u] key[%s]. "
"aggr table: tid[%u]",
tid, pid, idx, dimension.key().c_str(), aggr->GetAggrTid());
response->set_code(::openmldb::base::ReturnCode::kDeleteFailed);
response->set_msg("delete from associated pre-aggr table failed");
return;
}
}
DEBUGLOG("delete ok. tid %u, pid %u, key %s", tid, pid, dimension.key().c_str());
auto aggrs = GetAggregators(tid, pid);
if (!aggrs) {
if (table->Delete(entry)) {
DEBUGLOG("delete ok. tid %u, pid %u, key %s", tid, pid, request->key().c_str());
} else {
response->set_code(::openmldb::base::ReturnCode::kDeleteFailed);
response->set_msg("delete failed");
return;

Check warning on line 1607 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1605-L1607

Added lines #L1605 - L1607 were not covered by tests
}
} else {
for (const auto& index_def : table->GetAllIndex()) {
if (!index_def || !index_def->IsReady()) {
continue;
auto get_aggregator = [this](std::shared_ptr<Aggrs> aggrs, uint32_t idx) -> std::shared_ptr<Aggregator> {
if (aggrs) {
for (const auto& aggr : *aggrs) {
if (aggr->GetIndexPos() == idx) {
return aggr;
}
}
}
uint32_t idx = index_def->GetId();
std::unique_ptr<storage::TraverseIterator> iter(table->NewTraverseIterator(idx));
iter->SeekToFirst();
while (iter->Valid()) {
auto pk = iter->GetPK();
iter->NextPK();
if (!table->Delete(idx, pk, start_ts, end_ts)) {
return {};

Check warning on line 1618 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1618

Added line #L1618 was not covered by tests
};
std::optional<uint64_t> start_ts = entry.has_ts() ? std::optional<uint64_t>{entry.ts()} : std::nullopt;
std::optional<uint64_t> end_ts = entry.has_end_ts() ? std::optional<uint64_t>{entry.end_ts()} : std::nullopt;
if (entry.dimensions_size() > 0) {
for (const auto& dimension : entry.dimensions()) {
if (!table->Delete(dimension.idx(), dimension.key(), start_ts, end_ts)) {
response->set_code(::openmldb::base::ReturnCode::kDeleteFailed);
response->set_msg("delete failed");
return;

Check warning on line 1627 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1625-L1627

Added lines #L1625 - L1627 were not covered by tests
}
auto aggr = get_aggregator(tid, pid, idx);
auto aggr = get_aggregator(aggrs, dimension.idx());
if (aggr) {
if (!aggr->Delete(pk, start_ts, end_ts)) {
if (!aggr->Delete(dimension.key(), start_ts, end_ts)) {
PDLOG(WARNING, "delete from aggr failed. base table: tid[%u] pid[%u] index[%u] key[%s]. "

Check warning on line 1632 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1632

Added line #L1632 was not covered by tests
"aggr table: tid[%u]", tid, pid, idx, pk.c_str(), aggr->GetAggrTid());
"aggr table: tid[%u]",
tid, pid, idx, dimension.key().c_str(), aggr->GetAggrTid());
response->set_code(::openmldb::base::ReturnCode::kDeleteFailed);
response->set_msg("delete from associated pre-aggr table failed");
return;

Check warning on line 1637 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1635-L1637

Added lines #L1635 - L1637 were not covered by tests
}
}
DEBUGLOG("delete ok. tid %u, pid %u, key %s", tid, pid, dimension.key().c_str());
}
} else {
for (const auto& index_def : table->GetAllIndex()) {
if (!index_def || !index_def->IsReady()) {
continue;

Check warning on line 1645 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1643-L1645

Added lines #L1643 - L1645 were not covered by tests
}
uint32_t idx = index_def->GetId();
std::unique_ptr<storage::TraverseIterator> iter(table->NewTraverseIterator(idx));
iter->SeekToFirst();
while (iter->Valid()) {
auto pk = iter->GetPK();
iter->NextPK();
if (!table->Delete(idx, pk, start_ts, end_ts)) {
response->set_code(::openmldb::base::ReturnCode::kDeleteFailed);
response->set_msg("delete failed");
return;

Check warning on line 1656 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1647-L1656

Added lines #L1647 - L1656 were not covered by tests
}
auto aggr = get_aggregator(aggrs, idx);
if (aggr) {
if (!aggr->Delete(pk, start_ts, end_ts)) {
PDLOG(WARNING, "delete from aggr failed. base table: tid[%u] pid[%u] index[%u] key[%s]. "

Check warning on line 1661 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1658-L1661

Added lines #L1658 - L1661 were not covered by tests
"aggr table: tid[%u]", tid, pid, idx, pk.c_str(), aggr->GetAggrTid());
response->set_code(::openmldb::base::ReturnCode::kDeleteFailed);
response->set_msg("delete from associated pre-aggr table failed");
return;

Check warning on line 1665 in src/tablet/tablet_impl.cc

View check run for this annotation

Codecov / codecov/patch

src/tablet/tablet_impl.cc#L1663-L1665

Added lines #L1663 - L1665 were not covered by tests
}
}
}
}
}
}
response->set_code(::openmldb::base::ReturnCode::kOk);
response->set_msg("ok");

replicator->AppendEntry(entry);
if (FLAGS_binlog_notify_on_put) {
Expand Down

0 comments on commit 823a2b1

Please sign in to comment.