From 10ab66f3e024716452fd208276bdef1fdd2f3526 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Thu, 14 Dec 2023 17:48:24 +0800 Subject: [PATCH] feat: Introduced `MetaSpec` for setting relative expiration This is a compatible change with new feature provided. - This commit introduces a new type, `MetaSpec`, which specifies content of the `KVMeta` to be stored for a key. This new type is particularly utilized in `upsert` requests to enable the setting of relative expiration times. Previously, the `KVMeta` type was used directly for this purpose. However, to avoid impacting existing storage data types, `MetaSpec` has been specifically added for use in `upsert` operations. When applying a raft-log, a `KVMeta` is built from `MetaSpec`. Designed with backward compatibility, `MetaSpec` maintains a serialized format compatible with `KVMeta`, ensuring no disruption to existing functionality. - We introduce two new types `Time` and `Interval` to reprensent serde-able time stamp and time interval. - Tests are added to ensure meta-service works correctly with API with ttl support, but databend-query does not use these API yet. --- src/meta/api/src/background_api_impl.rs | 4 +- src/meta/api/src/schema_api_impl.rs | 1 + src/meta/api/src/util.rs | 2 + src/meta/kvapi/src/kvapi/test_suite.rs | 122 ++++++++--- src/meta/raft-store/src/applier.rs | 16 +- src/meta/raft-store/src/sm_v002/sm_v002.rs | 11 +- src/meta/raft-store/src/state_machine/sm.rs | 25 ++- .../tests/it/state_machine/expire.rs | 4 +- .../raft-store/tests/it/state_machine/mod.rs | 20 +- .../tests/it/grpc/metasrv_grpc_kv_read_v1.rs | 4 +- .../tests/it/grpc/metasrv_grpc_watch.rs | 1 + .../it/meta_node/meta_node_kv_api_expire.rs | 5 +- src/meta/types/proto/request.proto | 9 + src/meta/types/src/cmd/cmd_context.rs | 38 ++++ src/meta/types/src/cmd/meta_spec.rs | 132 ++++++++++++ src/meta/types/src/{cmd.rs => cmd/mod.rs} | 34 ++- src/meta/types/src/lib.rs | 5 +- src/meta/types/src/proto_ext/seq_v_ext.rs | 2 +- src/meta/types/src/proto_ext/txn_ext.rs | 16 ++ src/meta/types/src/seq_value.rs | 7 +- src/meta/types/src/time.rs | 194 ++++++++++++++++++ .../management/src/cluster/cluster_mgr.rs | 6 +- src/query/management/tests/it/cluster.rs | 22 +- 23 files changed, 597 insertions(+), 83 deletions(-) create mode 100644 src/meta/types/src/cmd/cmd_context.rs create mode 100644 src/meta/types/src/cmd/meta_spec.rs rename src/meta/types/src/{cmd.rs => cmd/mod.rs} (86%) create mode 100644 src/meta/types/src/time.rs diff --git a/src/meta/api/src/background_api_impl.rs b/src/meta/api/src/background_api_impl.rs index 479e6531953ca..d006148f2a656 100644 --- a/src/meta/api/src/background_api_impl.rs +++ b/src/meta/api/src/background_api_impl.rs @@ -41,9 +41,9 @@ use common_meta_app::background::UpdateBackgroundTaskReq; use common_meta_kvapi::kvapi; use common_meta_kvapi::kvapi::Key; use common_meta_kvapi::kvapi::UpsertKVReq; +use common_meta_types::cmd::MetaSpec; use common_meta_types::ConditionResult::Eq; use common_meta_types::InvalidReply; -use common_meta_types::KVMeta; use common_meta_types::MatchSeq; use common_meta_types::MatchSeq::Any; use common_meta_types::MetaError; @@ -243,7 +243,7 @@ impl> BackgroundApi for KV { name_key.to_string_key().as_str(), Any, Operation::Update(serialize_struct(&meta)?), - Some(KVMeta::new_expire(req.expire_at)), + Some(MetaSpec::new_expire(req.expire_at)), )) .await?; // confirm a successful update diff --git a/src/meta/api/src/schema_api_impl.rs b/src/meta/api/src/schema_api_impl.rs index 893d636205558..f9c515142f81e 100644 --- a/src/meta/api/src/schema_api_impl.rs +++ b/src/meta/api/src/schema_api_impl.rs @@ -4182,6 +4182,7 @@ fn build_upsert_table_deduplicated_label(deduplicated_label: String) -> TxnOp { value: 1_i8.to_le_bytes().to_vec(), prev_value: false, expire_at, + ttl_ms: None, })), } } diff --git a/src/meta/api/src/util.rs b/src/meta/api/src/util.rs index d711dcf2b4fcb..e1b52e41f79e8 100644 --- a/src/meta/api/src/util.rs +++ b/src/meta/api/src/util.rs @@ -332,6 +332,7 @@ pub fn txn_op_put(key: &impl kvapi::Key, value: Vec) -> TxnOp { value, prev_value: true, expire_at: None, + ttl_ms: None, })), } } @@ -344,6 +345,7 @@ pub fn txn_op_put_with_expire(key: &impl kvapi::Key, value: Vec, expire_at: value, prev_value: true, expire_at: Some(expire_at), + ttl_ms: None, })), } } diff --git a/src/meta/kvapi/src/kvapi/test_suite.rs b/src/meta/kvapi/src/kvapi/test_suite.rs index 5e291b33cd943..513a1684a3f52 100644 --- a/src/meta/kvapi/src/kvapi/test_suite.rs +++ b/src/meta/kvapi/src/kvapi/test_suite.rs @@ -12,9 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::time::SystemTime; -use std::time::UNIX_EPOCH; +use std::time::Duration; +use common_meta_types::cmd::MetaSpec; use common_meta_types::protobuf as pb; use common_meta_types::txn_condition; use common_meta_types::txn_op; @@ -40,6 +40,7 @@ use common_meta_types::TxnRequest; use common_meta_types::With; use log::debug; use log::info; +use minitrace::full_name; use minitrace::func_name; use crate::kvapi; @@ -58,11 +59,13 @@ impl kvapi::TestSuite { self.kv_delete(&builder.build().await).await?; self.kv_update(&builder.build().await).await?; self.kv_timeout(&builder.build().await).await?; + self.kv_upsert_with_ttl(&builder.build().await).await?; self.kv_meta(&builder.build().await).await?; self.kv_list(&builder.build().await).await?; self.kv_mget(&builder.build().await).await?; self.kv_txn_absent_seq_0(&builder.build().await).await?; self.kv_transaction(&builder.build().await).await?; + self.kv_transaction_with_ttl(&builder.build().await).await?; self.kv_transaction_delete_match_seq_none(&builder.build().await) .await?; self.kv_transaction_delete_match_seq_some_not_match(&builder.build().await) @@ -248,13 +251,10 @@ impl kvapi::TestSuite { // - Test list expired and non-expired. // - Test update with a new expire value. - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_secs(); + let now_sec = SeqV::<()>::now_sec(); let _res = kv - .upsert_kv(UpsertKVReq::update("k1", b"v1").with(KVMeta::new_expire(now + 2))) + .upsert_kv(UpsertKVReq::update("k1", b"v1").with(MetaSpec::new_expire(now_sec + 2))) .await?; // dbg!("upsert non expired k1", _res); @@ -267,17 +267,14 @@ impl kvapi::TestSuite { info!("---get expired"); { - tokio::time::sleep(tokio::time::Duration::from_millis(3000)).await; + tokio::time::sleep(Duration::from_millis(3000)).await; let res = kv.get_kv("k1").await?; // dbg!("k1 expired", &res); debug!("got k1:{:?}", res); assert!(res.is_none(), "got expired"); } - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_secs(); + let now_sec = SeqV::<()>::now_sec(); info!("--- expired entry act as if it does not exist, an ADD op should apply"); { @@ -286,7 +283,7 @@ impl kvapi::TestSuite { .upsert_kv( UpsertKVReq::update("k1", b"v1") .with(MatchSeq::Exact(0)) - .with(KVMeta::new_expire(now - 1)), + .with(MetaSpec::new_expire(now_sec - 1)), ) .await?; // dbg!("update expired k1", _res); @@ -295,7 +292,7 @@ impl kvapi::TestSuite { .upsert_kv( UpsertKVReq::update("k2", b"v2") .with(MatchSeq::Exact(0)) - .with(KVMeta::new_expire(now + 10)), + .with(MetaSpec::new_expire(now_sec + 10)), ) .await?; // dbg!("update non expired k2", _res); @@ -306,7 +303,7 @@ impl kvapi::TestSuite { None, Some(SeqV::with_meta( 3, - Some(KVMeta::new_expire(now + 10)), + Some(KVMeta::new_expire(now_sec + 10)), b"v2".to_vec() )) ]); @@ -325,7 +322,7 @@ impl kvapi::TestSuite { kv.upsert_kv( UpsertKVReq::update("k2", b"v2") .with(MatchSeq::Exact(3)) - .with(KVMeta::new_expire(now - 1)), + .with(MetaSpec::new_expire(now_sec - 1)), ) .await?; @@ -336,16 +333,42 @@ impl kvapi::TestSuite { Ok(()) } + #[minitrace::trace] + pub async fn kv_upsert_with_ttl(&self, kv: &KV) -> anyhow::Result<()> { + // - Add with ttl + + info!("--- {}", full_name!()); + + let _res = kv + .upsert_kv( + UpsertKVReq::update("k1", b"v1") + .with(MetaSpec::new_ttl(Duration::from_millis(2_000))), + ) + .await?; + + info!("---get unexpired"); + { + let res = kv.get_kv("k1").await?; + assert!(res.is_some(), "got unexpired"); + } + + info!("---get expired"); + { + tokio::time::sleep(Duration::from_millis(2_100)).await; + let res = kv.get_kv("k1").await?; + assert!(res.is_none(), "got expired"); + } + + Ok(()) + } + #[minitrace::trace] pub async fn kv_meta(&self, kv: &KV) -> anyhow::Result<()> { info!("--- kvapi::KVApiTestSuite::kv_meta() start"); let test_key = "test_key_for_update_meta"; - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_secs(); + let now_sec = SeqV::<()>::now_sec(); let r = kv.upsert_kv(UpsertKVReq::update(test_key, b"v1")).await?; assert_eq!(Some(SeqV::with_meta(1, None, b"v1".to_vec())), r.result); @@ -358,7 +381,7 @@ impl kvapi::TestSuite { test_key, MatchSeq::Exact(seq + 1), Operation::AsIs, - Some(KVMeta::new_expire(now + 20)), + Some(MetaSpec::new_expire(now_sec + 20)), )) .await?; assert_eq!(Some(SeqV::with_meta(1, None, b"v1".to_vec())), r.prev); @@ -371,14 +394,14 @@ impl kvapi::TestSuite { test_key, MatchSeq::Exact(seq), Operation::AsIs, - Some(KVMeta::new_expire(now + 20)), + Some(MetaSpec::new_expire(now_sec + 20)), )) .await?; assert_eq!(Some(SeqV::with_meta(1, None, b"v1".to_vec())), r.prev); assert_eq!( Some(SeqV::with_meta( 2, - Some(KVMeta::new_expire(now + 20)), + Some(KVMeta::new_expire(now_sec + 20)), b"v1".to_vec() )), r.result @@ -388,7 +411,11 @@ impl kvapi::TestSuite { let key_value = kv.get_kv(test_key).await?; assert!(key_value.is_some()); assert_eq!( - SeqV::with_meta(seq + 1, Some(KVMeta::new_expire(now + 20)), b"v1".to_vec()), + SeqV::with_meta( + seq + 1, + Some(KVMeta::new_expire(now_sec + 20)), + b"v1".to_vec() + ), key_value.unwrap(), ); @@ -484,6 +511,7 @@ impl kvapi::TestSuite { value: b"new_v1".to_vec(), prev_value: true, expire_at: None, + ttl_ms: None, })), }]; @@ -645,6 +673,7 @@ impl kvapi::TestSuite { value: b"new_v1".to_vec(), prev_value: true, expire_at: None, + ttl_ms: None, })), }]; @@ -697,6 +726,7 @@ impl kvapi::TestSuite { value: b"new_v1".to_vec(), prev_value: true, expire_at: None, + ttl_ms: None, })), }]; @@ -752,6 +782,7 @@ impl kvapi::TestSuite { value: val1_new.to_vec(), prev_value: true, expire_at: None, + ttl_ms: None, })), }, // change k2 @@ -761,6 +792,7 @@ impl kvapi::TestSuite { value: b"new_v2".to_vec(), prev_value: true, expire_at: None, + ttl_ms: None, })), }, // get k1 @@ -870,6 +902,7 @@ impl kvapi::TestSuite { value: val1_new.to_vec(), prev_value: true, expire_at: None, + ttl_ms: None, })), }, // get k1 @@ -915,6 +948,43 @@ impl kvapi::TestSuite { Ok(()) } + #[minitrace::trace] + pub async fn kv_transaction_with_ttl(&self, kv: &KV) -> anyhow::Result<()> { + // - Add a record via transaction with ttl + + info!("--- {}", full_name!()); + + let txn = TxnRequest { + condition: vec![], + if_then: vec![TxnOp::put_with_ttl("k1", b("v1"), Some(2_000))], + else_then: vec![], + }; + + let _resp = kv.transaction(txn).await?; + + let _res = kv + .upsert_kv( + UpsertKVReq::update("k1", b"v1") + .with(MetaSpec::new_ttl(Duration::from_millis(2_000))), + ) + .await?; + + info!("---get unexpired"); + { + let res = kv.get_kv("k1").await?; + assert!(res.is_some(), "got unexpired"); + } + + info!("---get expired"); + { + tokio::time::sleep(Duration::from_millis(2_100)).await; + let res = kv.get_kv("k1").await?; + assert!(res.is_none(), "got expired"); + } + + Ok(()) + } + /// If `TxnDeleteRequest.match_seq` is not set, /// the delete operation will always be executed. pub async fn kv_transaction_delete_match_seq_none( @@ -1086,3 +1156,7 @@ impl kvapi::TestSuite { Ok(()) } } + +fn b(s: &str) -> Vec { + s.as_bytes().to_vec() +} diff --git a/src/meta/raft-store/src/applier.rs b/src/meta/raft-store/src/applier.rs index e9948bee8d99c..a76f9750aa041 100644 --- a/src/meta/raft-store/src/applier.rs +++ b/src/meta/raft-store/src/applier.rs @@ -16,6 +16,8 @@ use std::io; use std::time::Duration; use std::time::SystemTime; +use common_meta_types::cmd::CmdContext; +use common_meta_types::cmd::MetaSpec; use common_meta_types::protobuf as pb; use common_meta_types::txn_condition; use common_meta_types::txn_op; @@ -26,7 +28,6 @@ use common_meta_types::Cmd; use common_meta_types::ConditionResult; use common_meta_types::Entry; use common_meta_types::EntryPayload; -use common_meta_types::KVMeta; use common_meta_types::MatchSeq; use common_meta_types::Node; use common_meta_types::SeqV; @@ -61,6 +62,9 @@ use crate::sm_v002::SMV002; pub struct Applier<'a> { sm: &'a mut SMV002, + /// The context of the current applying log. + cmd_ctx: CmdContext, + /// The changes has been made by the applying one log entry changes: Vec, String>>, } @@ -69,6 +73,7 @@ impl<'a> Applier<'a> { pub fn new(sm: &'a mut SMV002) -> Self { Self { sm, + cmd_ctx: CmdContext::from_millis(0), changes: Vec::new(), } } @@ -83,6 +88,8 @@ impl<'a> Applier<'a> { let log_id = &entry.log_id; let log_time_ms = Self::get_log_time(entry); + self.cmd_ctx = CmdContext::from_millis(log_time_ms); + self.clean_expired_kvs(log_time_ms).await?; *self.sm.sys_data_mut().last_applied_mut() = Some(*log_id); @@ -208,7 +215,10 @@ impl<'a> Applier<'a> { ) -> Result<(Option, Option), io::Error> { debug!(upsert_kv = as_debug!(upsert_kv); "upsert_kv"); - let (prev, result) = self.sm.upsert_kv_primary_index(upsert_kv).await?; + let (prev, result) = self + .sm + .upsert_kv_primary_index(upsert_kv, &self.cmd_ctx) + .await?; self.sm .update_expire_index(&upsert_kv.key, &prev, &result) @@ -381,7 +391,7 @@ impl<'a> Applier<'a> { put: &TxnPutRequest, resp: &mut TxnReply, ) -> Result<(), io::Error> { - let upsert = UpsertKV::update(&put.key, &put.value).with(KVMeta::new(put.expire_at)); + let upsert = UpsertKV::update(&put.key, &put.value).with(MetaSpec::new(put.expire_at)); let (prev, _result) = self.upsert_kv(&upsert).await?; diff --git a/src/meta/raft-store/src/sm_v002/sm_v002.rs b/src/meta/raft-store/src/sm_v002/sm_v002.rs index a164257317b99..737190bba146f 100644 --- a/src/meta/raft-store/src/sm_v002/sm_v002.rs +++ b/src/meta/raft-store/src/sm_v002/sm_v002.rs @@ -23,6 +23,7 @@ use common_meta_kvapi::kvapi::KVStream; use common_meta_kvapi::kvapi::MGetKVReply; use common_meta_kvapi::kvapi::UpsertKVReply; use common_meta_kvapi::kvapi::UpsertKVReq; +use common_meta_types::cmd::CmdContext; use common_meta_types::protobuf::StreamItem; use common_meta_types::AppliedState; use common_meta_types::Entry; @@ -385,6 +386,7 @@ impl SMV002 { pub(crate) async fn upsert_kv_primary_index( &mut self, upsert_kv: &UpsertKV, + cmd_ctx: &CmdContext, ) -> Result<(Marked>, Marked>), io::Error> { let prev = self.levels.str_map().get(&upsert_kv.key).await?.clone(); @@ -397,7 +399,10 @@ impl SMV002 { self.levels .set( upsert_kv.key.clone(), - Some((v.clone(), upsert_kv.value_meta.clone())), + Some(( + v.clone(), + upsert_kv.value_meta.as_ref().map(|m| m.to_kv_meta(cmd_ctx)), + )), ) .await? } @@ -406,13 +411,13 @@ impl SMV002 { MapApiExt::update_meta( &mut self.levels, upsert_kv.key.clone(), - upsert_kv.value_meta.clone(), + upsert_kv.value_meta.as_ref().map(|m| m.to_kv_meta(cmd_ctx)), ) .await? } }; - let expire_ms = upsert_kv.get_expire_at_ms().unwrap_or(u64::MAX); + let expire_ms = upsert_kv.eval_expire_at_ms(); if expire_ms < self.expire_cursor.time_ms { // The record has expired, delete it at once. // diff --git a/src/meta/raft-store/src/state_machine/sm.rs b/src/meta/raft-store/src/state_machine/sm.rs index ef4f6d0186d49..61b9a34d48019 100644 --- a/src/meta/raft-store/src/state_machine/sm.rs +++ b/src/meta/raft-store/src/state_machine/sm.rs @@ -27,6 +27,8 @@ use common_meta_sled_store::SledTree; use common_meta_sled_store::Store; use common_meta_sled_store::TransactionSledTree; use common_meta_stoerr::MetaStorageError; +use common_meta_types::cmd::CmdContext; +use common_meta_types::cmd::MetaSpec; use common_meta_types::protobuf as pb; use common_meta_types::txn_condition; use common_meta_types::txn_op; @@ -37,7 +39,6 @@ use common_meta_types::Cmd; use common_meta_types::ConditionResult; use common_meta_types::Entry; use common_meta_types::EntryPayload; -use common_meta_types::KVMeta; use common_meta_types::LogId; use common_meta_types::MatchSeq; use common_meta_types::MatchSeqExt; @@ -532,7 +533,7 @@ impl StateMachine { ) -> Result<(), MetaStorageError> { let (expired, prev, result) = Self::txn_upsert_kv( txn_tree, - &UpsertKV::update(&put.key, &put.value).with(KVMeta::new(put.expire_at)), + &UpsertKV::update(&put.key, &put.value).with(MetaSpec::new(put.expire_at)), log_time_ms, )?; @@ -903,6 +904,8 @@ impl StateMachine { upsert_kv: &UpsertKV, log_time_ms: u64, ) -> Result<(Option, Option, Option), MetaStorageError> { + let cmd_ctx = CmdContext::from_millis(log_time_ms); + let kvs = txn_tree.key_space::(); let prev = kvs.get(&upsert_kv.key)?; @@ -915,16 +918,26 @@ impl StateMachine { } let mut new_seq_v = match &upsert_kv.value { - Operation::Update(v) => SeqV::with_meta(0, upsert_kv.value_meta.clone(), v.clone()), + Operation::Update(v) => SeqV::with_meta( + 0, + upsert_kv + .value_meta + .as_ref() + .map(|x| x.to_kv_meta(&cmd_ctx)), + v.clone(), + ), Operation::Delete => { kvs.remove(&upsert_kv.key)?; return Ok((expired, prev, None)); } Operation::AsIs => match prev { None => return Ok((expired, prev, None)), - Some(ref prev_kv_value) => { - prev_kv_value.clone().set_meta(upsert_kv.value_meta.clone()) - } + Some(ref prev_kv_value) => prev_kv_value.clone().set_meta( + upsert_kv + .value_meta + .as_ref() + .map(|m| m.to_kv_meta(&cmd_ctx)), + ), }, }; diff --git a/src/meta/raft-store/tests/it/state_machine/expire.rs b/src/meta/raft-store/tests/it/state_machine/expire.rs index c9c69d72db721..24ee8bd73daa8 100644 --- a/src/meta/raft-store/tests/it/state_machine/expire.rs +++ b/src/meta/raft-store/tests/it/state_machine/expire.rs @@ -20,11 +20,11 @@ use common_meta_raft_store::key_spaces::GenericKV; use common_meta_raft_store::state_machine::ExpireKey; use common_meta_raft_store::state_machine::StateMachine; use common_meta_sled_store::AsKeySpace; +use common_meta_types::cmd::MetaSpec; use common_meta_types::new_log_id; use common_meta_types::Cmd; use common_meta_types::Entry; use common_meta_types::EntryPayload; -use common_meta_types::KVMeta; use common_meta_types::LogEntry; use common_meta_types::UpsertKV; use common_meta_types::With; @@ -154,7 +154,7 @@ fn ent(index: u64, key: &str, expire: Option, time_ms: Option) -> Entr payload: EntryPayload::Normal(LogEntry { txid: None, time_ms, - cmd: Cmd::UpsertKV(UpsertKV::update(key, key.as_bytes()).with(KVMeta::new(expire))), + cmd: Cmd::UpsertKV(UpsertKV::update(key, key.as_bytes()).with(MetaSpec::new(expire))), }), } } diff --git a/src/meta/raft-store/tests/it/state_machine/mod.rs b/src/meta/raft-store/tests/it/state_machine/mod.rs index 5b843e6d361a8..9dd053e9c1b34 100644 --- a/src/meta/raft-store/tests/it/state_machine/mod.rs +++ b/src/meta/raft-store/tests/it/state_machine/mod.rs @@ -17,6 +17,8 @@ use std::time::UNIX_EPOCH; use common_meta_kvapi::kvapi::KVApi; use common_meta_raft_store::state_machine::StateMachine; +use common_meta_types::cmd::CmdContext; +use common_meta_types::cmd::MetaSpec; use common_meta_types::new_log_id; use common_meta_types::AppliedState; use common_meta_types::Change; @@ -125,7 +127,7 @@ async fn test_state_machine_apply_non_dup_generic_kv_upsert_get() -> anyhow::Res key: String, seq: MatchSeq, value: Vec, - value_meta: Option, + value_meta: Option, // want: prev: Option>>, result: Option>>, @@ -139,14 +141,20 @@ async fn test_state_machine_apply_non_dup_generic_kv_upsert_get() -> anyhow::Res prev: Option<(u64, &'static str)>, result: Option<(u64, &'static str)>, ) -> T { - let m = meta.map(KVMeta::new_expire); + let m = meta.map(MetaSpec::new_expire); T { key: name.to_string(), seq, value: value.to_string().into_bytes(), value_meta: m.clone(), prev: prev.map(|(a, b)| SeqV::new(a, b.into())), - result: result.map(|(a, b)| SeqV::with_meta(a, m, b.into())), + result: result.map(|(a, b)| { + SeqV::with_meta( + a, + m.map(|m| m.to_kv_meta(&CmdContext::from_millis(0))), + b.into(), + ) + }), } } @@ -281,7 +289,7 @@ async fn test_state_machine_apply_non_dup_generic_kv_value_meta() -> anyhow::Res key: key.clone(), seq: MatchSeq::GE(0), value: Operation::AsIs, - value_meta: Some(KVMeta::new_expire(now + 10)), + value_meta: Some(MetaSpec::new_expire(now + 10)), }), &mut t, None, @@ -306,7 +314,7 @@ async fn test_state_machine_apply_non_dup_generic_kv_value_meta() -> anyhow::Res key: key.clone(), seq: MatchSeq::GE(0), value: Operation::Update(b"value_meta_bar".to_vec()), - value_meta: Some(KVMeta::new_expire(now + 10)), + value_meta: Some(MetaSpec::new_expire(now + 10)), }), &mut t, None, @@ -323,7 +331,7 @@ async fn test_state_machine_apply_non_dup_generic_kv_value_meta() -> anyhow::Res key: key.clone(), seq: MatchSeq::GE(0), value: Operation::AsIs, - value_meta: Some(KVMeta::new_expire(now + 20)), + value_meta: Some(MetaSpec::new_expire(now + 20)), }), &mut t, None, diff --git a/src/meta/service/tests/it/grpc/metasrv_grpc_kv_read_v1.rs b/src/meta/service/tests/it/grpc/metasrv_grpc_kv_read_v1.rs index d14edcd12104b..bd09d9bd4b4b1 100644 --- a/src/meta/service/tests/it/grpc/metasrv_grpc_kv_read_v1.rs +++ b/src/meta/service/tests/it/grpc/metasrv_grpc_kv_read_v1.rs @@ -23,9 +23,9 @@ use common_meta_kvapi::kvapi::KVApi; use common_meta_kvapi::kvapi::ListKVReq; use common_meta_kvapi::kvapi::MGetKVReq; use common_meta_kvapi::kvapi::UpsertKVReq; +use common_meta_types::cmd::MetaSpec; use common_meta_types::protobuf as pb; use common_meta_types::protobuf::KvMeta; -use common_meta_types::KVMeta; use common_meta_types::SeqV; use common_meta_types::With; use futures::stream::StreamExt; @@ -80,7 +80,7 @@ async fn initialize_kvs(client: &Arc, now_sec: u64) -> anyhow::Res info!("--- prepare keys: a(meta),c,c1,c2"); let updates = vec![ - UpsertKVReq::insert("a", &b("a")).with(KVMeta::new_expire(now_sec + 10)), + UpsertKVReq::insert("a", &b("a")).with(MetaSpec::new_expire(now_sec + 10)), UpsertKVReq::insert("c", &b("c")), UpsertKVReq::insert("c1", &b("c1")), UpsertKVReq::insert("c2", &b("c2")), diff --git a/src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs b/src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs index 17958ceeff178..fe13185aaa848 100644 --- a/src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs +++ b/src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs @@ -269,6 +269,7 @@ async fn test_watch() -> anyhow::Result<()> { value: txn_val.clone(), prev_value: true, expire_at: None, + ttl_ms: None, })), }, TxnOp { diff --git a/src/meta/service/tests/it/meta_node/meta_node_kv_api_expire.rs b/src/meta/service/tests/it/meta_node/meta_node_kv_api_expire.rs index d9d93ccffee3a..dc0b0351e8909 100644 --- a/src/meta/service/tests/it/meta_node/meta_node_kv_api_expire.rs +++ b/src/meta/service/tests/it/meta_node/meta_node_kv_api_expire.rs @@ -16,6 +16,7 @@ use std::time::Duration; use common_base::base::tokio::time::sleep; use common_meta_kvapi::kvapi::KVApi; +use common_meta_types::cmd::MetaSpec; use common_meta_types::Cmd; use common_meta_types::KVMeta; use common_meta_types::LogEntry; @@ -58,7 +59,7 @@ async fn test_meta_node_replicate_kv_with_expire() -> anyhow::Result<()> { info!("--- write a kv expiring in 3 sec"); { - let upsert = UpsertKV::update(key, key.as_bytes()).with(KVMeta::new_expire(now_sec + 3)); + let upsert = UpsertKV::update(key, key.as_bytes()).with(MetaSpec::new_expire(now_sec + 3)); leader.write(LogEntry::new(Cmd::UpsertKV(upsert))).await?; log_index += 1; @@ -76,7 +77,7 @@ async fn test_meta_node_replicate_kv_with_expire() -> anyhow::Result<()> { { let upsert = UpsertKV::update(key, value2.as_bytes()) .with(MatchSeq::Exact(seq)) - .with(KVMeta::new_expire(now_sec + 1000)); + .with(MetaSpec::new_expire(now_sec + 1000)); leader.write(LogEntry::new(Cmd::UpsertKV(upsert))).await?; log_index += 1; } diff --git a/src/meta/types/proto/request.proto b/src/meta/types/proto/request.proto index 5acb948567642..e2b21862dbcda 100644 --- a/src/meta/types/proto/request.proto +++ b/src/meta/types/proto/request.proto @@ -38,11 +38,20 @@ message TxnGetResponse { // Put request and response message TxnPutRequest { string key = 1; + bytes value = 2; + // if or not return the prev value bool prev_value = 3; + // expire time optional uint64 expire_at = 4; + + // Time to last in milliseconds. + // + // TTL is the relative expire time, since the raft-log applied. + // If `ttl_ms` is set, `expire_at` is ignored. + optional uint64 ttl_ms = 5; } message TxnPutResponse { diff --git a/src/meta/types/src/cmd/cmd_context.rs b/src/meta/types/src/cmd/cmd_context.rs new file mode 100644 index 0000000000000..68d3c7d063981 --- /dev/null +++ b/src/meta/types/src/cmd/cmd_context.rs @@ -0,0 +1,38 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::Time; + +/// A context used when executing a [`Cmd`], to provide additional environment information. +/// +/// [`Cmd`]: crate::Cmd +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct CmdContext { + time: Time, +} + +impl CmdContext { + pub fn from_millis(millis: u64) -> Self { + Self::new(Time::from_millis(millis)) + } + + pub fn new(time: Time) -> Self { + CmdContext { time } + } + + /// Returns the time since 1970-01-01 when this log is proposed by the leader. + pub fn time(&self) -> Time { + self.time + } +} diff --git a/src/meta/types/src/cmd/meta_spec.rs b/src/meta/types/src/cmd/meta_spec.rs new file mode 100644 index 0000000000000..c429e49b51192 --- /dev/null +++ b/src/meta/types/src/cmd/meta_spec.rs @@ -0,0 +1,132 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use crate::cmd::CmdContext; +use crate::seq_value::KVMeta; +use crate::time::Interval; + +/// Specifies the metadata associated with a kv record, used in an `upsert` cmd. +/// +/// This is similar to [`KVMeta`] but differs, [`KVMeta`] is used in storage, +/// as this instance is employed for transport purposes. +/// When an `upsert` cmd is applied, this instance is evaluated and a `KVMeta` is built. +#[derive(serde::Serialize, serde::Deserialize, Debug, Default, Clone, Eq, PartialEq)] +pub struct MetaSpec { + /// expiration time in second since 1970 + pub(crate) expire_at: Option, + + /// Relative expiration time interval since when the raft log is applied. + /// + /// Use this field if possible to avoid the clock skew between client and meta-service. + /// `expire_at` may already be expired when it is applied to state machine. + /// + /// If it is not None, once applied, the `expire_at` field will be replaced with the calculated absolute expiration time. + /// + /// For backward compatibility, this field is not serialized if it `None`, as if it does not exist. + #[serde(skip_serializing_if = "Option::is_none")] + pub(crate) ttl: Option, +} + +impl MetaSpec { + /// Create a new KVMeta + pub fn new(expire_at: Option) -> Self { + Self { + expire_at, + ttl: None, + } + } + + /// Create a KVMeta with a absolute expiration time in second since 1970-01-01. + pub fn new_expire(expire_at_sec: u64) -> Self { + Self { + expire_at: Some(expire_at_sec), + ttl: None, + } + } + + /// Create a KVMeta with relative expiration time(ttl). + pub fn new_ttl(ttl: Duration) -> Self { + Self { + expire_at: None, + ttl: Some(Interval::from_duration(ttl)), + } + } + + /// Convert meta spec into a [`KVMeta`] to be stored in storage. + pub fn to_kv_meta(&self, cmd_ctx: &CmdContext) -> KVMeta { + // If `ttl` is set, override `expire_at` + if let Some(ttl) = self.ttl { + return KVMeta::new_expire((cmd_ctx.time() + ttl).seconds()); + } + + // No `ttl`, check if absolute expire time `expire_at` is set. + KVMeta::new(self.expire_at) + } + + /// Returns expire time in millisecond since 1970. + pub fn get_expire_at_ms(&self) -> Option { + self.expire_at.map(|t| t * 1000) + } + + /// Evaluate and returns the absolute expire time in millisecond since 1970. + /// + /// If there is no expire time, return u64::MAX. + pub fn eval_expire_at_ms(&self) -> u64 { + match self.expire_at { + None => u64::MAX, + Some(exp_at_sec) => exp_at_sec * 1000, + } + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::MetaSpec; + use crate::cmd::CmdContext; + use crate::KVMeta; + use crate::Time; + + #[test] + fn test_serde() { + let meta = MetaSpec::new_expire(100); + let s = serde_json::to_string(&meta).unwrap(); + assert_eq!(r#"{"expire_at":100}"#, s); + + let got: KVMeta = serde_json::from_str(&s).unwrap(); + assert_eq!(Some(100), got.expire_at); + + let meta = MetaSpec::new_ttl(Duration::from_millis(100)); + let s = serde_json::to_string(&meta).unwrap(); + assert_eq!(r#"{"expire_at":null,"ttl_ms":100}"#, s); + } + + #[test] + fn test_to_kv_meta() { + let cmd_ctx = CmdContext::new(Time::from_millis(2000)); + + // ttl + let meta = MetaSpec::new_ttl(Duration::from_millis(1000)); + let kv_meta = meta.to_kv_meta(&cmd_ctx); + assert_eq!(kv_meta.get_expire_at_ms().unwrap(), 3000); + + // expire_at + let meta = MetaSpec::new_expire(5); + let kv_meta = meta.to_kv_meta(&cmd_ctx); + assert_eq!(kv_meta.get_expire_at_ms().unwrap(), 5_000); + } +} diff --git a/src/meta/types/src/cmd.rs b/src/meta/types/src/cmd/mod.rs similarity index 86% rename from src/meta/types/src/cmd.rs rename to src/meta/types/src/cmd/mod.rs index 00e02899640a3..60a955ccdf705 100644 --- a/src/meta/types/src/cmd.rs +++ b/src/meta/types/src/cmd/mod.rs @@ -13,18 +13,24 @@ // limitations under the License. use std::fmt; +use std::time::Duration; use serde::Deserialize; use serde::Serialize; use crate::with::With; -use crate::KVMeta; use crate::MatchSeq; use crate::Node; use crate::NodeId; use crate::Operation; use crate::TxnRequest; +mod cmd_context; +mod meta_spec; + +pub use cmd_context::CmdContext; +pub use meta_spec::MetaSpec; + /// A Cmd describes what a user want to do to raft state machine /// and is the essential part of a raft log. #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] @@ -63,7 +69,7 @@ pub struct UpsertKV { pub value: Operation>, /// Meta data of a value. - pub value_meta: Option, + pub value_meta: Option, } impl fmt::Display for Cmd { @@ -108,7 +114,7 @@ impl UpsertKV { key: &str, seq: MatchSeq, value: Operation>, - value_meta: Option, + value_meta: Option, ) -> Self { Self { key: key.to_string(), @@ -146,9 +152,13 @@ impl UpsertKV { } pub fn with_expire_sec(self, expire_at_sec: u64) -> Self { - self.with(KVMeta { - expire_at: Some(expire_at_sec), - }) + self.with(MetaSpec::new_expire(expire_at_sec)) + } + + /// Set the time to last for the value. + /// When the ttl is passed, the value is deleted. + pub fn with_ttl(self, ttl: Duration) -> Self { + self.with(MetaSpec::new_ttl(ttl)) } pub fn get_expire_at_ms(&self) -> Option { @@ -158,6 +168,14 @@ impl UpsertKV { None } } + + pub fn eval_expire_at_ms(&self) -> u64 { + if let Some(meta) = &self.value_meta { + meta.eval_expire_at_ms() + } else { + u64::MAX + } + } } impl With for UpsertKV { @@ -167,8 +185,8 @@ impl With for UpsertKV { } } -impl With for UpsertKV { - fn with(mut self, meta: KVMeta) -> Self { +impl With for UpsertKV { + fn with(mut self, meta: MetaSpec) -> Self { self.value_meta = Some(meta); self } diff --git a/src/meta/types/src/lib.rs b/src/meta/types/src/lib.rs index 9736cba79f26b..cdef6f873578f 100644 --- a/src/meta/types/src/lib.rs +++ b/src/meta/types/src/lib.rs @@ -21,7 +21,7 @@ mod applied_state; mod change; mod cluster; -mod cmd; +pub mod cmd; pub mod config; mod endpoint; pub mod errors; @@ -36,6 +36,7 @@ mod raft_types; mod seq_errors; mod seq_num; mod seq_value; +mod time; mod with; mod proto_display; @@ -104,6 +105,8 @@ pub use seq_value::IntoSeqV; pub use seq_value::KVMeta; pub use seq_value::SeqV; pub use seq_value::SeqValue; +pub use time::Interval; +pub use time::Time; pub use with::With; pub use crate::raft_snapshot_data::SnapshotData; diff --git a/src/meta/types/src/proto_ext/seq_v_ext.rs b/src/meta/types/src/proto_ext/seq_v_ext.rs index c0fb9df92c1f8..3c6dded06e232 100644 --- a/src/meta/types/src/proto_ext/seq_v_ext.rs +++ b/src/meta/types/src/proto_ext/seq_v_ext.rs @@ -19,7 +19,7 @@ use crate::SeqV; impl From for pb::KvMeta { fn from(m: KVMeta) -> Self { Self { - expire_at: m.get_expire_at_sec(), + expire_at: m.get_expire_at_ms().map(|x| x / 1000), } } } diff --git a/src/meta/types/src/proto_ext/txn_ext.rs b/src/meta/types/src/proto_ext/txn_ext.rs index 182e6e3e1cb33..b96fed56078fa 100644 --- a/src/meta/types/src/proto_ext/txn_ext.rs +++ b/src/meta/types/src/proto_ext/txn_ext.rs @@ -43,6 +43,22 @@ impl pb::TxnOp { value, prev_value: true, expire_at, + ttl_ms: None, + })), + } + } + + /// Create a txn operation that puts a record with ttl. + /// + /// `ttl` is relative expire time while `expire_at` is absolute expire time. + pub fn put_with_ttl(key: impl ToString, value: Vec, ttl_ms: Option) -> pb::TxnOp { + pb::TxnOp { + request: Some(pb::txn_op::Request::Put(pb::TxnPutRequest { + key: key.to_string(), + value, + prev_value: true, + expire_at: None, + ttl_ms, })), } } diff --git a/src/meta/types/src/seq_value.rs b/src/meta/types/src/seq_value.rs index 4fb08c2c32f1d..3190b7259adf8 100644 --- a/src/meta/types/src/seq_value.rs +++ b/src/meta/types/src/seq_value.rs @@ -62,18 +62,13 @@ impl KVMeta { Self { expire_at } } - /// Create a new KVMeta with expiration time in second since 1970 + /// Create a KVMeta with a absolute expiration time in second since 1970-01-01. pub fn new_expire(expire_at: u64) -> Self { Self { expire_at: Some(expire_at), } } - /// Returns expire time in second since 1970. - pub fn get_expire_at_sec(&self) -> Option { - self.expire_at - } - /// Returns expire time in millisecond since 1970. pub fn get_expire_at_ms(&self) -> Option { self.expire_at.map(|t| t * 1000) diff --git a/src/meta/types/src/time.rs b/src/meta/types/src/time.rs new file mode 100644 index 0000000000000..702a90ec45049 --- /dev/null +++ b/src/meta/types/src/time.rs @@ -0,0 +1,194 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ops::Add; +use std::ops::Sub; +use std::time::Duration; + +/// A interval of time. +/// +/// As a replacement of [`Duration`], which is not `serde`-able. +/// +/// `Interval` implements: `Interval +- Interval`. +#[derive( + serde::Serialize, + serde::Deserialize, + Debug, + Default, + Clone, + Copy, + Hash, + Eq, + PartialEq, + PartialOrd, + Ord, +)] +pub struct Interval { + pub(crate) millis: u64, +} + +impl Interval { + pub fn from_duration(duration: Duration) -> Self { + Self { + millis: duration.as_millis() as u64, + } + } + + pub fn from_millis(millis: u64) -> Self { + Self::from_duration(Duration::from_millis(millis)) + } + + pub fn from_secs(secs: u64) -> Self { + Self::from_duration(Duration::from_secs(secs)) + } + + pub fn millis(&self) -> u64 { + self.millis + } + + pub fn seconds(&self) -> u64 { + self.millis / 1000 + } +} + +impl Add for Interval { + type Output = Self; + + fn add(self, rhs: Self) -> Self::Output { + Self { + millis: self.millis.saturating_add(rhs.millis), + } + } +} + +impl Sub for Interval { + type Output = Self; + + fn sub(self, rhs: Self) -> Self::Output { + Self { + millis: self.millis.saturating_sub(rhs.millis), + } + } +} + +/// A time point since 1970-01-01. +/// +/// As a replacement of [`Instant`](std::time::Instant), which is not `serde`-able. +/// `Time` implements: `Time +- Interval = Time` and `Time - Time = Interval`. +#[derive( + serde::Serialize, + serde::Deserialize, + Debug, + Default, + Clone, + Copy, + Hash, + Eq, + PartialEq, + PartialOrd, + Ord, +)] +pub struct Time { + pub(crate) time: Interval, +} + +impl Time { + pub fn from_millis(millis: u64) -> Self { + Self { + time: Interval::from_millis(millis), + } + } + + pub fn from_secs(secs: u64) -> Self { + Self { + time: Interval::from_secs(secs), + } + } + + pub fn millis(&self) -> u64 { + self.time.millis() + } + + pub fn seconds(&self) -> u64 { + self.time.seconds() + } +} + +impl Add for Time { + type Output = Self; + + fn add(self, rhs: Interval) -> Self::Output { + Self { + time: self.time + rhs, + } + } +} + +impl Sub for Time { + type Output = Self; + + fn sub(self, rhs: Interval) -> Self::Output { + Self { + time: self.time - rhs, + } + } +} + +impl Sub for Time { + type Output = Interval; + + fn sub(self, rhs: Self) -> Self::Output { + self.time - rhs.time + } +} + +#[cfg(test)] +mod tests { + use super::Interval; + use crate::time::Time; + + #[test] + fn test_interval() { + let interval = Interval::from_millis(1000); + assert_eq!(interval.millis(), 1000); + assert_eq!(interval.seconds(), 1); + + let interval = Interval::from_secs(1); + assert_eq!(interval.millis(), 1000); + assert_eq!(interval.seconds(), 1); + + assert_eq!(interval + interval, Interval::from_millis(2000)); + assert_eq!(interval - interval, Interval::from_millis(0)); + assert_eq!( + interval - Interval::from_millis(1500), + Interval::from_millis(0) + ); + } + + #[test] + fn test_time() { + let time = Time::from_millis(1000); + assert_eq!(time.millis(), 1000); + assert_eq!(time.seconds(), 1); + + let time = Time::from_secs(1); + assert_eq!(time.millis(), 1000); + assert_eq!(time.seconds(), 1); + + assert_eq!(time + Interval::from_millis(1000), Time::from_millis(2000)); + assert_eq!(time - Interval::from_millis(500), Time::from_millis(500)); + assert_eq!(time - Time::from_millis(500), Interval::from_millis(500)); + assert_eq!(time - Time::from_millis(1500), Interval::from_millis(0)); + } +} diff --git a/src/query/management/src/cluster/cluster_mgr.rs b/src/query/management/src/cluster/cluster_mgr.rs index 6f6598e680ba6..91e2e7c6a714c 100644 --- a/src/query/management/src/cluster/cluster_mgr.rs +++ b/src/query/management/src/cluster/cluster_mgr.rs @@ -24,7 +24,7 @@ use common_meta_kvapi::kvapi::KVApi; use common_meta_kvapi::kvapi::UpsertKVReply; use common_meta_kvapi::kvapi::UpsertKVReq; use common_meta_store::MetaStore; -use common_meta_types::KVMeta; +use common_meta_types::cmd::MetaSpec; use common_meta_types::MatchSeq; use common_meta_types::NodeInfo; use common_meta_types::Operation; @@ -65,14 +65,14 @@ impl ClusterMgr { }) } - fn new_lift_time(&self) -> KVMeta { + fn new_lift_time(&self) -> MetaSpec { let now = std::time::SystemTime::now(); let expire_at = now .add(self.lift_time) .duration_since(UNIX_EPOCH) .expect("Time went backwards"); - KVMeta::new_expire(expire_at.as_secs()) + MetaSpec::new_expire(expire_at.as_secs()) } } diff --git a/src/query/management/tests/it/cluster.rs b/src/query/management/tests/it/cluster.rs index 266fa565911f9..5b64afad4a060 100644 --- a/src/query/management/tests/it/cluster.rs +++ b/src/query/management/tests/it/cluster.rs @@ -14,7 +14,6 @@ use std::sync::Arc; use std::time::Duration; -use std::time::UNIX_EPOCH; use common_base::base::tokio; use common_exception::Result; @@ -28,7 +27,7 @@ use common_meta_types::SeqV; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_successfully_add_node() -> Result<()> { - let current_time = current_seconds_time(); + let now_ms = SeqV::<()>::now_ms(); let (kv_api, cluster_api) = new_cluster_api().await?; let node_info = create_test_node_info(); @@ -43,7 +42,7 @@ async fn test_successfully_add_node() -> Result<()> { meta, data: value, }) => { - assert!(meta.unwrap().get_expire_at_sec().unwrap() - current_time >= 60); + assert!(meta.unwrap().get_expire_at_ms().unwrap() - now_ms >= 59_000); assert_eq!(value, serde_json::to_vec(&node_info)?); } catch => panic!("GetKVActionReply{:?}", catch), @@ -116,7 +115,7 @@ async fn test_unknown_node_drop_node() -> Result<()> { #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_successfully_heartbeat_node() -> Result<()> { - let current_time = current_seconds_time(); + let now_ms = SeqV::<()>::now_ms(); let (kv_api, cluster_api) = new_cluster_api().await?; let node_info = create_test_node_info(); @@ -126,26 +125,21 @@ async fn test_successfully_heartbeat_node() -> Result<()> { .get_kv("__fd_clusters/test%2dtenant%2did/test%2dcluster%2did/databend_query/test_node") .await?; - assert!(value.unwrap().meta.unwrap().get_expire_at_sec().unwrap() - current_time >= 60); + let meta = value.unwrap().meta.unwrap(); + let expire_ms = meta.get_expire_at_ms().unwrap(); + assert!(expire_ms - now_ms >= 59_000); - let current_time = current_seconds_time(); + let now_ms = SeqV::<()>::now_ms(); cluster_api.heartbeat(&node_info, MatchSeq::GE(1)).await?; let value = kv_api .get_kv("__fd_clusters/test%2dtenant%2did/test%2dcluster%2did/databend_query/test_node") .await?; - assert!(value.unwrap().meta.unwrap().get_expire_at_sec().unwrap() - current_time >= 60); + assert!(value.unwrap().meta.unwrap().get_expire_at_ms().unwrap() - now_ms >= 59_000); Ok(()) } -fn current_seconds_time() -> u64 { - let now = std::time::SystemTime::now(); - now.duration_since(UNIX_EPOCH) - .expect("Time went backwards") - .as_secs() -} - fn create_test_node_info() -> NodeInfo { NodeInfo { id: String::from("test_node"),