Skip to content

Commit

Permalink
feat: Introduced MetaSpec for setting relative expiration
Browse files Browse the repository at this point in the history
This is a compatible change with new feature provided.

- This commit introduces a new type, `MetaSpec`, which specifies content
  of the `KVMeta` to be stored for a key. This new type is particularly
  utilized in `upsert` requests to enable the setting of relative
  expiration times.

  Previously, the `KVMeta` type was used directly for this purpose.
  However, to avoid impacting existing storage data types, `MetaSpec` has
  been specifically added for use in `upsert` operations. When applying a
  raft-log, a `KVMeta` is built from `MetaSpec`.

  Designed with backward compatibility, `MetaSpec` maintains a serialized
  format compatible with `KVMeta`, ensuring no disruption to existing
  functionality.

- We introduce two new types `Time` and `Interval` to reprensent
  serde-able time stamp and time interval.

- Tests are added to ensure meta-service works correctly with API with
  ttl support, but databend-query does not use these API yet.
  • Loading branch information
drmingdrmer committed Dec 15, 2023
1 parent 8f4a558 commit 441d209
Show file tree
Hide file tree
Showing 20 changed files with 593 additions and 83 deletions.
4 changes: 2 additions & 2 deletions src/meta/api/src/background_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ use common_meta_app::background::UpdateBackgroundTaskReq;
use common_meta_kvapi::kvapi;
use common_meta_kvapi::kvapi::Key;
use common_meta_kvapi::kvapi::UpsertKVReq;
use common_meta_types::cmd::MetaSpec;
use common_meta_types::ConditionResult::Eq;
use common_meta_types::InvalidReply;
use common_meta_types::KVMeta;
use common_meta_types::MatchSeq;
use common_meta_types::MatchSeq::Any;
use common_meta_types::MetaError;
Expand Down Expand Up @@ -243,7 +243,7 @@ impl<KV: kvapi::KVApi<Error = MetaError>> BackgroundApi for KV {
name_key.to_string_key().as_str(),
Any,
Operation::Update(serialize_struct(&meta)?),
Some(KVMeta::new_expire(req.expire_at)),
Some(MetaSpec::new_expire(req.expire_at)),
))
.await?;
// confirm a successful update
Expand Down
122 changes: 98 additions & 24 deletions src/meta/kvapi/src/kvapi/test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use std::time::Duration;

use common_meta_types::cmd::MetaSpec;
use common_meta_types::protobuf as pb;
use common_meta_types::txn_condition;
use common_meta_types::txn_op;
Expand All @@ -40,6 +40,7 @@ use common_meta_types::TxnRequest;
use common_meta_types::With;
use log::debug;
use log::info;
use minitrace::full_name;
use minitrace::func_name;

use crate::kvapi;
Expand All @@ -58,11 +59,13 @@ impl kvapi::TestSuite {
self.kv_delete(&builder.build().await).await?;
self.kv_update(&builder.build().await).await?;
self.kv_timeout(&builder.build().await).await?;
self.kv_upsert_with_ttl(&builder.build().await).await?;
self.kv_meta(&builder.build().await).await?;
self.kv_list(&builder.build().await).await?;
self.kv_mget(&builder.build().await).await?;
self.kv_txn_absent_seq_0(&builder.build().await).await?;
self.kv_transaction(&builder.build().await).await?;
self.kv_transaction_with_ttl(&builder.build().await).await?;
self.kv_transaction_delete_match_seq_none(&builder.build().await)
.await?;
self.kv_transaction_delete_match_seq_some_not_match(&builder.build().await)
Expand Down Expand Up @@ -248,13 +251,10 @@ impl kvapi::TestSuite {
// - Test list expired and non-expired.
// - Test update with a new expire value.

let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let now_sec = SeqV::<()>::now_sec();

let _res = kv
.upsert_kv(UpsertKVReq::update("k1", b"v1").with(KVMeta::new_expire(now + 2)))
.upsert_kv(UpsertKVReq::update("k1", b"v1").with(MetaSpec::new_expire(now_sec + 2)))
.await?;
// dbg!("upsert non expired k1", _res);

Expand All @@ -267,17 +267,14 @@ impl kvapi::TestSuite {

info!("---get expired");
{
tokio::time::sleep(tokio::time::Duration::from_millis(3000)).await;
tokio::time::sleep(Duration::from_millis(3000)).await;
let res = kv.get_kv("k1").await?;
// dbg!("k1 expired", &res);
debug!("got k1:{:?}", res);
assert!(res.is_none(), "got expired");
}

let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let now_sec = SeqV::<()>::now_sec();

info!("--- expired entry act as if it does not exist, an ADD op should apply");
{
Expand All @@ -286,7 +283,7 @@ impl kvapi::TestSuite {
.upsert_kv(
UpsertKVReq::update("k1", b"v1")
.with(MatchSeq::Exact(0))
.with(KVMeta::new_expire(now - 1)),
.with(MetaSpec::new_expire(now_sec - 1)),
)
.await?;
// dbg!("update expired k1", _res);
Expand All @@ -295,7 +292,7 @@ impl kvapi::TestSuite {
.upsert_kv(
UpsertKVReq::update("k2", b"v2")
.with(MatchSeq::Exact(0))
.with(KVMeta::new_expire(now + 10)),
.with(MetaSpec::new_expire(now_sec + 10)),
)
.await?;
// dbg!("update non expired k2", _res);
Expand All @@ -306,7 +303,7 @@ impl kvapi::TestSuite {
None,
Some(SeqV::with_meta(
3,
Some(KVMeta::new_expire(now + 10)),
Some(KVMeta::new_expire(now_sec + 10)),
b"v2".to_vec()
))
]);
Expand All @@ -325,7 +322,7 @@ impl kvapi::TestSuite {
kv.upsert_kv(
UpsertKVReq::update("k2", b"v2")
.with(MatchSeq::Exact(3))
.with(KVMeta::new_expire(now - 1)),
.with(MetaSpec::new_expire(now_sec - 1)),
)
.await?;

Expand All @@ -336,16 +333,42 @@ impl kvapi::TestSuite {
Ok(())
}

#[minitrace::trace]
pub async fn kv_upsert_with_ttl<KV: kvapi::KVApi>(&self, kv: &KV) -> anyhow::Result<()> {
// - Add with ttl

info!("--- {}", full_name!());

let _res = kv
.upsert_kv(
UpsertKVReq::update("k1", b"v1")
.with(MetaSpec::new_ttl(Duration::from_millis(2_000))),
)
.await?;

info!("---get unexpired");
{
let res = kv.get_kv("k1").await?;
assert!(res.is_some(), "got unexpired");
}

info!("---get expired");
{
tokio::time::sleep(Duration::from_millis(2_100)).await;
let res = kv.get_kv("k1").await?;
assert!(res.is_none(), "got expired");
}

Ok(())
}

#[minitrace::trace]
pub async fn kv_meta<KV: kvapi::KVApi>(&self, kv: &KV) -> anyhow::Result<()> {
info!("--- kvapi::KVApiTestSuite::kv_meta() start");

let test_key = "test_key_for_update_meta";

let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let now_sec = SeqV::<()>::now_sec();

let r = kv.upsert_kv(UpsertKVReq::update(test_key, b"v1")).await?;
assert_eq!(Some(SeqV::with_meta(1, None, b"v1".to_vec())), r.result);
Expand All @@ -358,7 +381,7 @@ impl kvapi::TestSuite {
test_key,
MatchSeq::Exact(seq + 1),
Operation::AsIs,
Some(KVMeta::new_expire(now + 20)),
Some(MetaSpec::new_expire(now_sec + 20)),
))
.await?;
assert_eq!(Some(SeqV::with_meta(1, None, b"v1".to_vec())), r.prev);
Expand All @@ -371,14 +394,14 @@ impl kvapi::TestSuite {
test_key,
MatchSeq::Exact(seq),
Operation::AsIs,
Some(KVMeta::new_expire(now + 20)),
Some(MetaSpec::new_expire(now_sec + 20)),
))
.await?;
assert_eq!(Some(SeqV::with_meta(1, None, b"v1".to_vec())), r.prev);
assert_eq!(
Some(SeqV::with_meta(
2,
Some(KVMeta::new_expire(now + 20)),
Some(KVMeta::new_expire(now_sec + 20)),
b"v1".to_vec()
)),
r.result
Expand All @@ -388,7 +411,11 @@ impl kvapi::TestSuite {
let key_value = kv.get_kv(test_key).await?;
assert!(key_value.is_some());
assert_eq!(
SeqV::with_meta(seq + 1, Some(KVMeta::new_expire(now + 20)), b"v1".to_vec()),
SeqV::with_meta(
seq + 1,
Some(KVMeta::new_expire(now_sec + 20)),
b"v1".to_vec()
),
key_value.unwrap(),
);

Expand Down Expand Up @@ -484,6 +511,7 @@ impl kvapi::TestSuite {
value: b"new_v1".to_vec(),
prev_value: true,
expire_at: None,
ttl_ms: None,
})),
}];

Expand Down Expand Up @@ -645,6 +673,7 @@ impl kvapi::TestSuite {
value: b"new_v1".to_vec(),
prev_value: true,
expire_at: None,
ttl_ms: None,
})),
}];

Expand Down Expand Up @@ -697,6 +726,7 @@ impl kvapi::TestSuite {
value: b"new_v1".to_vec(),
prev_value: true,
expire_at: None,
ttl_ms: None,
})),
}];

Expand Down Expand Up @@ -752,6 +782,7 @@ impl kvapi::TestSuite {
value: val1_new.to_vec(),
prev_value: true,
expire_at: None,
ttl_ms: None,
})),
},
// change k2
Expand All @@ -761,6 +792,7 @@ impl kvapi::TestSuite {
value: b"new_v2".to_vec(),
prev_value: true,
expire_at: None,
ttl_ms: None,
})),
},
// get k1
Expand Down Expand Up @@ -870,6 +902,7 @@ impl kvapi::TestSuite {
value: val1_new.to_vec(),
prev_value: true,
expire_at: None,
ttl_ms: None,
})),
},
// get k1
Expand Down Expand Up @@ -915,6 +948,43 @@ impl kvapi::TestSuite {
Ok(())
}

#[minitrace::trace]
pub async fn kv_transaction_with_ttl<KV: kvapi::KVApi>(&self, kv: &KV) -> anyhow::Result<()> {
// - Add a record via transaction with ttl

info!("--- {}", full_name!());

let txn = TxnRequest {
condition: vec![],
if_then: vec![TxnOp::put_with_ttl("k1", b("v1"), Some(2_000))],
else_then: vec![],
};

let _resp = kv.transaction(txn).await?;

let _res = kv
.upsert_kv(
UpsertKVReq::update("k1", b"v1")
.with(MetaSpec::new_ttl(Duration::from_millis(2_000))),
)
.await?;

info!("---get unexpired");
{
let res = kv.get_kv("k1").await?;
assert!(res.is_some(), "got unexpired");
}

info!("---get expired");
{
tokio::time::sleep(Duration::from_millis(2_100)).await;
let res = kv.get_kv("k1").await?;
assert!(res.is_none(), "got expired");
}

Ok(())
}

/// If `TxnDeleteRequest.match_seq` is not set,
/// the delete operation will always be executed.
pub async fn kv_transaction_delete_match_seq_none<KV: kvapi::KVApi>(
Expand Down Expand Up @@ -1086,3 +1156,7 @@ impl kvapi::TestSuite {
Ok(())
}
}

fn b(s: &str) -> Vec<u8> {
s.as_bytes().to_vec()
}
16 changes: 13 additions & 3 deletions src/meta/raft-store/src/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use std::io;
use std::time::Duration;
use std::time::SystemTime;

use common_meta_types::cmd::CmdContext;
use common_meta_types::cmd::MetaSpec;
use common_meta_types::protobuf as pb;
use common_meta_types::txn_condition;
use common_meta_types::txn_op;
Expand All @@ -26,7 +28,6 @@ use common_meta_types::Cmd;
use common_meta_types::ConditionResult;
use common_meta_types::Entry;
use common_meta_types::EntryPayload;
use common_meta_types::KVMeta;
use common_meta_types::MatchSeq;
use common_meta_types::Node;
use common_meta_types::SeqV;
Expand Down Expand Up @@ -61,6 +62,9 @@ use crate::sm_v002::SMV002;
pub struct Applier<'a> {
sm: &'a mut SMV002,

/// The context of the current applying log.
cmd_ctx: CmdContext,

/// The changes has been made by the applying one log entry
changes: Vec<Change<Vec<u8>, String>>,
}
Expand All @@ -69,6 +73,7 @@ impl<'a> Applier<'a> {
pub fn new(sm: &'a mut SMV002) -> Self {
Self {
sm,
cmd_ctx: CmdContext::from_millis(0),
changes: Vec::new(),
}
}
Expand All @@ -83,6 +88,8 @@ impl<'a> Applier<'a> {
let log_id = &entry.log_id;
let log_time_ms = Self::get_log_time(entry);

self.cmd_ctx = CmdContext::from_millis(log_time_ms);

self.clean_expired_kvs(log_time_ms).await?;

*self.sm.sys_data_mut().last_applied_mut() = Some(*log_id);
Expand Down Expand Up @@ -208,7 +215,10 @@ impl<'a> Applier<'a> {
) -> Result<(Option<SeqV>, Option<SeqV>), io::Error> {
debug!(upsert_kv = as_debug!(upsert_kv); "upsert_kv");

let (prev, result) = self.sm.upsert_kv_primary_index(upsert_kv).await?;
let (prev, result) = self
.sm
.upsert_kv_primary_index(upsert_kv, &self.cmd_ctx)
.await?;

self.sm
.update_expire_index(&upsert_kv.key, &prev, &result)
Expand Down Expand Up @@ -381,7 +391,7 @@ impl<'a> Applier<'a> {
put: &TxnPutRequest,
resp: &mut TxnReply,
) -> Result<(), io::Error> {
let upsert = UpsertKV::update(&put.key, &put.value).with(KVMeta::new(put.expire_at));
let upsert = UpsertKV::update(&put.key, &put.value).with(MetaSpec::new(put.expire_at));

let (prev, _result) = self.upsert_kv(&upsert).await?;

Expand Down
Loading

0 comments on commit 441d209

Please sign in to comment.