Skip to content

Commit

Permalink
fix: fix get db without DbIdList key (#14059)
Browse files Browse the repository at this point in the history
  • Loading branch information
lichuang authored Dec 18, 2023
1 parent 614d556 commit 39eca43
Show file tree
Hide file tree
Showing 2 changed files with 238 additions and 1 deletion.
122 changes: 121 additions & 1 deletion src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

use std::cmp::min;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Display;
use std::sync::Arc;

Expand Down Expand Up @@ -485,6 +487,28 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

if_then.push(txn_op_put(&db_id_key, serialize_struct(&db_meta)?)); // (db_id) -> db_meta
}

// add DbIdListKey if not exists
let dbid_idlist = DbIdListKey {
tenant: tenant_dbname.tenant.clone(),
db_name: tenant_dbname.db_name.clone(),
};
let (db_id_list_seq, db_id_list_opt): (_, Option<DbIdList>) =
get_pb_value(self, &dbid_idlist).await?;

if db_id_list_seq == 0 || db_id_list_opt.is_none() {
warn!(
"drop db:{:?}, db_id:{:?} has no DbIdListKey",
tenant_dbname, db_id
);

let mut db_id_list = DbIdList::new();
db_id_list.append(db_id);

condition.push(txn_cond_seq(&dbid_idlist, Eq, db_id_list_seq));
// _fd_db_id_list/<tenant>/<db_name> -> db_id_list
if_then.push(txn_op_put(&dbid_idlist, serialize_struct(&db_id_list)?));
};
}

let txn_req = TxnRequest {
Expand Down Expand Up @@ -795,7 +819,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

// List tables by tenant, db_id, table_name.
let dbid_tbname_idlist = DbIdListKey {
tenant: req.tenant,
tenant: req.tenant.clone(),
// Using a empty db to to list all
db_name: "".to_string(),
};
Expand Down Expand Up @@ -868,6 +892,77 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}
}

// `list_database` can list db which has no `DbIdListKey`
if include_drop_db {
// if `include_drop_db` is true, return all db info which not exist in db_info_list
let db_id_set: HashSet<u64> = db_info_list
.iter()
.map(|db_info| db_info.ident.db_id)
.collect();

let all_dbs = self.list_databases(req).await?;
for db_info in all_dbs {
if !db_id_set.contains(&db_info.ident.db_id) {
warn!(
"get db history db:{:?}, db_id:{:?} has no DbIdListKey",
db_info.name_ident, db_info.ident.db_id
);
db_info_list.push(db_info);
}
}
} else {
// if `include_drop_db` is false, filter out db which drop_on time out of retention time
let db_id_set: HashSet<u64> = db_info_list
.iter()
.map(|db_info| db_info.ident.db_id)
.collect();

let all_dbs = self.list_databases(req).await?;
let mut add_dbinfo_map = HashMap::new();
let mut db_id_list = Vec::new();
for db_info in all_dbs {
if !db_id_set.contains(&db_info.ident.db_id) {
warn!(
"get db history db:{:?}, db_id:{:?} has no DbIdListKey",
db_info.name_ident, db_info.ident.db_id
);
db_id_list.push(DatabaseId {
db_id: db_info.ident.db_id,
});
add_dbinfo_map.insert(db_info.ident.db_id, db_info);
}
}
let inner_keys: Vec<String> = db_id_list
.iter()
.map(|db_id| db_id.to_string_key())
.collect();
let mut db_id_list_iter = db_id_list.into_iter();
for c in inner_keys.chunks(DEFAULT_MGET_SIZE) {
let db_meta_seq_meta_vec: Vec<(u64, Option<DatabaseMeta>)> =
mget_pb_values(self, c).await?;

for (db_meta_seq, db_meta) in db_meta_seq_meta_vec {
let db_id = db_id_list_iter.next().unwrap().db_id;
if db_meta_seq == 0 || db_meta.is_none() {
error!("get_database_history cannot find {:?} db_meta", db_id);
continue;
}
let db_meta = db_meta.unwrap();
// if include drop db, then no need to fill out of retention time db
if is_drop_time_out_of_retention_time(&db_meta.drop_on, &now) {
continue;
}
if let Some(db_info) = add_dbinfo_map.get(&db_id) {
warn!(
"get db history db:{:?}, db_id:{:?} has no DbIdListKey",
db_info.name_ident, db_info.ident.db_id
);
db_info_list.push(db_info.clone());
}
}
}
}

return Ok(db_info_list);
}

Expand Down Expand Up @@ -2524,6 +2619,31 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}
}

// add TableIdListKey if not exist
{
// get table id list from _fd_table_id_list/db_id/table_name
let dbid_tbname_idlist = TableIdListKey {
db_id,
table_name: dbid_tbname.table_name.clone(),
};
let (tb_id_list_seq, _tb_id_list_opt): (_, Option<TableIdList>) =
get_pb_value(self, &dbid_tbname_idlist).await?;
if tb_id_list_seq == 0 {
let mut tb_id_list = TableIdList::new();
tb_id_list.append(table_id);

warn!(
"drop table:{:?}, table_id:{:?} has no TableIdList",
dbid_tbname, table_id
);

condition.push(txn_cond_seq(&dbid_tbname_idlist, Eq, tb_id_list_seq));
if_then.push(txn_op_put(
&dbid_tbname_idlist,
serialize_struct(&tb_id_list)?,
));
}
}
let txn_req = TxnRequest {
condition,
if_then,
Expand Down
117 changes: 117 additions & 0 deletions src/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ use databend_common_meta_types::UpsertKV;
use log::debug;
use log::info;

use crate::deserialize_struct;
use crate::is_all_db_data_removed;
use crate::kv_app_error::KVAppError;
use crate::serialize_struct;
Expand Down Expand Up @@ -286,6 +287,10 @@ impl SchemaApiTestSuite {
suite
.table_drop_without_db_id_to_name(&b.build().await)
.await?;
suite.list_db_without_db_id_list(&b.build().await).await?;
suite
.drop_table_without_table_id_list(&b.build().await)
.await?;
suite.table_rename(&b.build().await).await?;
suite.table_update_meta(&b.build().await).await?;
suite.table_update_mask_policy(&b.build().await).await?;
Expand Down Expand Up @@ -1910,6 +1915,118 @@ impl SchemaApiTestSuite {
Ok(())
}

#[minitrace::trace]
async fn list_db_without_db_id_list<MT>(&self, mt: &MT) -> anyhow::Result<()>
where MT: SchemaApi + kvapi::AsKVApi<Error = MetaError> {
// test drop a db without db_id_list
{
let tenant = "tenant1";
let db = "db1";
let mut util = Util::new(mt, tenant, db, "tb2", "JSON");
util.create_db().await?;

// remove db id list
let dbid_idlist = DbIdListKey {
tenant: tenant.to_string(),
db_name: db.to_string(),
};
util.mt
.as_kv_api()
.upsert_kv(UpsertKV::delete(dbid_idlist.to_string_key()))
.await?;

// drop db
util.drop_db().await?;

// after drop db, check if db id list has been added
let value = util
.mt
.as_kv_api()
.get_kv(&dbid_idlist.to_string_key())
.await?;

assert!(value.is_some());
let seqv = value.unwrap();
let db_id_list: DbIdList = deserialize_struct(&seqv.data)?;
assert_eq!(db_id_list.id_list[0], util.db_id);
}
// test get_database_history can return db without db_id_list
{
let tenant = "tenant2";
let db = "db2";
let mut util = Util::new(mt, tenant, db, "tb2", "JSON");
util.create_db().await?;

// remove db id list
let dbid_idlist = DbIdListKey {
tenant: tenant.to_string(),
db_name: db.to_string(),
};
util.mt
.as_kv_api()
.upsert_kv(UpsertKV::delete(dbid_idlist.to_string_key()))
.await?;

let res = mt
.get_database_history(ListDatabaseReq {
tenant: tenant.to_string(),
filter: None,
})
.await?;

// check if get_database_history return db_id
let mut found = false;
for db_info in res {
if db_info.ident.db_id == util.db_id {
found = true;
break;
}
}

assert!(found);
}
Ok(())
}

#[minitrace::trace]
async fn drop_table_without_table_id_list<MT>(&self, mt: &MT) -> anyhow::Result<()>
where MT: SchemaApi + kvapi::AsKVApi<Error = MetaError> {
// test drop a table without table_id_list
let tenant = "tenant1";
let db = "db1";
let table = "tb1";
let mut util = Util::new(mt, tenant, db, table, "JSON");
util.create_db().await?;
let (tid, _table_meta) = util.create_table().await?;

// remove db id list
let table_id_idlist = TableIdListKey {
db_id: util.db_id,
table_name: table.to_string(),
};
util.mt
.as_kv_api()
.upsert_kv(UpsertKV::delete(table_id_idlist.to_string_key()))
.await?;

// drop table
util.drop_table_by_id().await?;

// after drop table, check if table id list has been added
let value = util
.mt
.as_kv_api()
.get_kv(&table_id_idlist.to_string_key())
.await?;

assert!(value.is_some());
let seqv = value.unwrap();
let id_list: TableIdList = deserialize_struct(&seqv.data)?;
assert_eq!(id_list.id_list[0], tid);

Ok(())
}

#[minitrace::trace]
async fn table_rename<MT: SchemaApi>(&self, mt: &MT) -> anyhow::Result<()> {
let tenant = "tenant1";
Expand Down

0 comments on commit 39eca43

Please sign in to comment.