diff --git a/src/meta/raft-store/src/lib.rs b/src/meta/raft-store/src/lib.rs index f26f60b358a1..0096002d5aa6 100644 --- a/src/meta/raft-store/src/lib.rs +++ b/src/meta/raft-store/src/lib.rs @@ -14,10 +14,9 @@ #![allow(clippy::uninlined_format_args)] #![feature(impl_trait_in_assoc_type)] -// #![feature(type_alias_impl_trait)] +#![feature(return_position_impl_trait_in_trait)] // #![allow(incomplete_features)] -// #![feature(return_position_impl_trait_in_trait)] pub mod applier; pub(crate) mod compat07; diff --git a/src/meta/raft-store/src/sm_v002/leveled_store/level.rs b/src/meta/raft-store/src/sm_v002/leveled_store/level.rs index 7142d3dbc39a..737d0c2491d0 100644 --- a/src/meta/raft-store/src/sm_v002/leveled_store/level.rs +++ b/src/meta/raft-store/src/sm_v002/leveled_store/level.rs @@ -20,6 +20,7 @@ use common_meta_types::KVMeta; use futures_util::stream::BoxStream; use futures_util::StreamExt; +use crate::sm_v002::leveled_store::map_api::AsMap; use crate::sm_v002::leveled_store::map_api::MapApi; use crate::sm_v002::leveled_store::map_api::MapApiRO; use crate::sm_v002::leveled_store::map_api::MapKey; @@ -126,7 +127,7 @@ impl MapApi for Level { Marked::new_tomb_stone(seq) }; - let prev = MapApiRO::::get(&*self, key.as_str()).await; + let prev = (*self).str_map().get(&key).await; self.kv.insert(key, marked.clone()); (prev, marked) } @@ -180,7 +181,7 @@ impl MapApi for Level { Marked::TombStone { internal_seq: seq } }; - let prev = MapApiRO::::get(&*self, &key).await; + let prev = (*self).expire_map().get(&key).await; self.expire.insert(key, marked.clone()); (prev, marked) } diff --git a/src/meta/raft-store/src/sm_v002/leveled_store/leveled_map.rs b/src/meta/raft-store/src/sm_v002/leveled_store/leveled_map.rs index eb96d25dc084..a502ca697383 100644 --- a/src/meta/raft-store/src/sm_v002/leveled_store/leveled_map.rs +++ b/src/meta/raft-store/src/sm_v002/leveled_store/leveled_map.rs @@ -96,6 +96,7 @@ impl LeveledMap { RefMut::new(&mut self.writable, &self.frozen) } + #[allow(dead_code)] pub(crate) fn to_ref(&self) -> Ref { Ref::new(Some(&self.writable), &self.frozen) } @@ -143,7 +144,5 @@ where { let mut l = self.to_ref_mut(); MapApi::set(&mut l, key, value).await - - // (&mut l).set(key, value).await } } diff --git a/src/meta/raft-store/src/sm_v002/leveled_store/leveled_map_test.rs b/src/meta/raft-store/src/sm_v002/leveled_store/leveled_map_test.rs index 13ed09d2bee9..920adffe2ea4 100644 --- a/src/meta/raft-store/src/sm_v002/leveled_store/leveled_map_test.rs +++ b/src/meta/raft-store/src/sm_v002/leveled_store/leveled_map_test.rs @@ -16,6 +16,7 @@ use common_meta_types::KVMeta; use futures_util::StreamExt; use crate::sm_v002::leveled_store::leveled_map::LeveledMap; +use crate::sm_v002::leveled_store::map_api::AsMap; use crate::sm_v002::leveled_store::map_api::MapApi; use crate::sm_v002::leveled_store::map_api::MapApiExt; use crate::sm_v002::leveled_store::map_api::MapApiRO; @@ -26,22 +27,19 @@ async fn test_freeze() -> anyhow::Result<()> { let mut l = LeveledMap::default(); // Insert an entry at level 0 - let (prev, result) = MapApi::::set(&mut l, s("a1"), Some((b("b0"), None))).await; + let (prev, result) = l.str_map_mut().set(s("a1"), Some((b("b0"), None))).await; assert_eq!(prev, Marked::new_tomb_stone(0)); assert_eq!(result, Marked::new_normal(1, b("b0"), None)); // Insert the same entry at level 1 l.freeze_writable(); - let (prev, result) = MapApi::::set(&mut l, s("a1"), Some((b("b1"), None))).await; + let (prev, result) = l.str_map_mut().set(s("a1"), Some((b("b1"), None))).await; assert_eq!(prev, Marked::new_normal(1, b("b0"), None)); assert_eq!(result, Marked::new_normal(2, b("b1"), None)); // Listing entries from all levels see the latest - let got = MapApiRO::::range(&l, s("")..) - .await - .collect::>() - .await; + let got = l.str_map().range(s("")..).await.collect::>().await; assert_eq!(got, vec![ // (s("a1"), Marked::new_normal(2, b("b1"), None)), @@ -50,7 +48,9 @@ async fn test_freeze() -> anyhow::Result<()> { // Listing from the base level sees the old value. let frozen = l.frozen_ref(); - let got = MapApiRO::::range(frozen, s("")..) + let got = frozen + .str_map() + .range(s("")..) .await .collect::>() .await; @@ -67,30 +67,30 @@ async fn test_single_level() -> anyhow::Result<()> { let mut l = LeveledMap::default(); // Write a1 - let (prev, result) = MapApi::::set(&mut l, s("a1"), Some((b("b1"), None))).await; + let (prev, result) = l.str_map_mut().set(s("a1"), Some((b("b1"), None))).await; assert_eq!(prev, Marked::new_tomb_stone(0)); assert_eq!(result, Marked::new_normal(1, b("b1"), None)); // Write more - let (_prev, result) = MapApi::::set(&mut l, s("a2"), Some((b("b2"), None))).await; + let (_prev, result) = l.str_map_mut().set(s("a2"), Some((b("b2"), None))).await; assert_eq!(result, Marked::new_normal(2, b("b2"), None)); - let (_prev, result) = MapApi::::set(&mut l, s("a3"), Some((b("b3"), None))).await; + let (_prev, result) = l.str_map_mut().set(s("a3"), Some((b("b3"), None))).await; assert_eq!(result, Marked::new_normal(3, b("b3"), None)); - let (_prev, result) = MapApi::::set(&mut l, s("x1"), Some((b("y1"), None))).await; + let (_prev, result) = l.str_map_mut().set(s("x1"), Some((b("y1"), None))).await; assert_eq!(result, Marked::new_normal(4, b("y1"), None)); - let (_prev, result) = MapApi::::set(&mut l, s("x2"), Some((b("y2"), None))).await; + let (_prev, result) = l.str_map_mut().set(s("x2"), Some((b("y2"), None))).await; assert_eq!(result, Marked::new_normal(5, b("y2"), None)); // Override a1 - let (prev, result) = MapApi::::set(&mut l, s("a1"), Some((b("b1"), None))).await; + let (prev, result) = l.str_map_mut().set(s("a1"), Some((b("b1"), None))).await; assert_eq!(prev, Marked::new_normal(1, b("b1"), None)); assert_eq!(result, Marked::new_normal(6, b("b1"), None)); // Delete a3 - let (prev, result) = MapApi::::set(&mut l, s("a3"), None).await; + let (prev, result) = l.str_map_mut().set(s("a3"), None).await; assert_eq!(prev, Marked::new_normal(3, b("b3"), None)); assert_eq!( result, @@ -99,7 +99,7 @@ async fn test_single_level() -> anyhow::Result<()> { ); // Range - let it = MapApiRO::::range(&l, s("")..).await; + let it = l.str_map().range(s("")..).await; let got = it.collect::>().await; assert_eq!(got, vec![ // @@ -111,10 +111,10 @@ async fn test_single_level() -> anyhow::Result<()> { ]); // Get - let got = MapApiRO::::get(&l, "a2").await; + let got = l.str_map().get("a2").await; assert_eq!(got, Marked::new_normal(2, b("b2"), None)); - let got = MapApiRO::::get(&l, "a3").await; + let got = l.str_map().get("a3").await; assert_eq!(got, Marked::new_tomb_stone(6)); Ok(()) } @@ -125,12 +125,12 @@ async fn test_two_levels() -> anyhow::Result<()> { let mut l = LeveledMap::default(); - MapApi::::set(&mut l, s("a1"), Some((b("b1"), None))).await; - MapApi::::set(&mut l, s("a2"), Some((b("b2"), None))).await; - MapApi::::set(&mut l, s("x1"), Some((b("y1"), None))).await; - MapApi::::set(&mut l, s("x2"), Some((b("y2"), None))).await; + l.str_map_mut().set(s("a1"), Some((b("b1"), None))).await; + l.str_map_mut().set(s("a2"), Some((b("b2"), None))).await; + l.str_map_mut().set(s("x1"), Some((b("y1"), None))).await; + l.str_map_mut().set(s("x2"), Some((b("y2"), None))).await; - let it = MapApiRO::::range(&l, s("")..).await; + let it = l.str_map().range(s("")..).await; let got = it.collect::>().await; assert_eq!(got, vec![ // @@ -145,27 +145,27 @@ async fn test_two_levels() -> anyhow::Result<()> { l.freeze_writable(); // Override - let (prev, result) = MapApi::::set(&mut l, s("a2"), Some((b("b3"), None))).await; + let (prev, result) = l.str_map_mut().set(s("a2"), Some((b("b3"), None))).await; assert_eq!(prev, Marked::new_normal(2, b("b2"), None)); assert_eq!(result, Marked::new_normal(5, b("b3"), None)); // Override again - let (prev, result) = MapApi::::set(&mut l, s("a2"), Some((b("b4"), None))).await; + let (prev, result) = l.str_map_mut().set(s("a2"), Some((b("b4"), None))).await; assert_eq!(prev, Marked::new_normal(5, b("b3"), None)); assert_eq!(result, Marked::new_normal(6, b("b4"), None)); // Delete by adding a tombstone - let (prev, result) = MapApi::::set(&mut l, s("a1"), None).await; + let (prev, result) = l.str_map_mut().set(s("a1"), None).await; assert_eq!(prev, Marked::new_normal(1, b("b1"), None)); assert_eq!(result, Marked::new_tomb_stone(6)); // Override tombstone - let (prev, result) = MapApi::::set(&mut l, s("a1"), Some((b("b5"), None))).await; + let (prev, result) = l.str_map_mut().set(s("a1"), Some((b("b5"), None))).await; assert_eq!(prev, Marked::new_tomb_stone(6)); assert_eq!(result, Marked::new_normal(7, b("b5"), None)); // Range - let it = MapApiRO::::range(&l, s("")..).await; + let it = l.str_map().range(s("")..).await; let got = it.collect::>().await; assert_eq!(got, vec![ // @@ -177,20 +177,20 @@ async fn test_two_levels() -> anyhow::Result<()> { // Get - let got = MapApiRO::::get(&l, "a1").await; + let got = l.str_map().get("a1").await; assert_eq!(got, Marked::new_normal(7, b("b5"), None)); - let got = MapApiRO::::get(&l, "a2").await; + let got = l.str_map().get("a2").await; assert_eq!(got, Marked::new_normal(6, b("b4"), None)); - let got = MapApiRO::::get(&l, "w1").await; + let got = l.str_map().get("w1").await; assert_eq!(got, Marked::new_tomb_stone(0)); // Check base level let frozen = l.frozen_ref(); - let it = MapApiRO::::range(frozen, s("")..).await; + let it = frozen.str_map().range(s("")..).await; let got = it.collect::>().await; assert_eq!(got, vec![ // @@ -213,21 +213,21 @@ async fn test_two_levels() -> anyhow::Result<()> { async fn build_3_levels() -> LeveledMap { let mut l = LeveledMap::default(); // internal_seq: 0 - MapApi::::set(&mut l, s("a"), Some((b("a0"), None))).await; - MapApi::::set(&mut l, s("b"), Some((b("b0"), None))).await; - MapApi::::set(&mut l, s("c"), Some((b("c0"), None))).await; - MapApi::::set(&mut l, s("d"), Some((b("d0"), None))).await; + l.str_map_mut().set(s("a"), Some((b("a0"), None))).await; + l.str_map_mut().set(s("b"), Some((b("b0"), None))).await; + l.str_map_mut().set(s("c"), Some((b("c0"), None))).await; + l.str_map_mut().set(s("d"), Some((b("d0"), None))).await; l.freeze_writable(); // internal_seq: 4 - MapApi::::set(&mut l, s("b"), None).await; - MapApi::::set(&mut l, s("c"), Some((b("c1"), None))).await; - MapApi::::set(&mut l, s("e"), Some((b("e1"), None))).await; + l.str_map_mut().set(s("b"), None).await; + l.str_map_mut().set(s("c"), Some((b("c1"), None))).await; + l.str_map_mut().set(s("e"), Some((b("e1"), None))).await; l.freeze_writable(); // internal_seq: 6 - MapApi::::set(&mut l, s("c"), None).await; - MapApi::::set(&mut l, s("d"), Some((b("d2"), None))).await; + l.str_map_mut().set(s("c"), None).await; + l.str_map_mut().set(s("d"), Some((b("d2"), None))).await; l } @@ -236,28 +236,25 @@ async fn build_3_levels() -> LeveledMap { async fn test_three_levels_get_range() -> anyhow::Result<()> { let l = build_3_levels().await; - let got = MapApiRO::::get(&l, "a").await; + let got = l.str_map().get("a").await; assert_eq!(got, Marked::new_normal(1, b("a0"), None)); - let got = MapApiRO::::get(&l, "b").await; + let got = l.str_map().get("b").await; assert_eq!(got, Marked::new_tomb_stone(4)); - let got = MapApiRO::::get(&l, "c").await; + let got = l.str_map().get("c").await; assert_eq!(got, Marked::new_tomb_stone(6)); - let got = MapApiRO::::get(&l, "d").await; + let got = l.str_map().get("d").await; assert_eq!(got, Marked::new_normal(7, b("d2"), None)); - let got = MapApiRO::::get(&l, "e").await; + let got = l.str_map().get("e").await; assert_eq!(got, Marked::new_normal(6, b("e1"), None)); - let got = MapApiRO::::get(&l, "f").await; + let got = l.str_map().get("f").await; assert_eq!(got, Marked::new_tomb_stone(0)); - let got = MapApiRO::::range(&l, s("")..) - .await - .collect::>() - .await; + let got = l.str_map().range(s("")..).await.collect::>().await; assert_eq!(got, vec![ // (s("a"), Marked::new_normal(1, b("a0"), None)), @@ -274,34 +271,31 @@ async fn test_three_levels_get_range() -> anyhow::Result<()> { async fn test_three_levels_override() -> anyhow::Result<()> { let mut l = build_3_levels().await; - let (prev, result) = MapApi::::set(&mut l, s("a"), Some((b("x"), None))).await; + let (prev, result) = l.str_map_mut().set(s("a"), Some((b("x"), None))).await; assert_eq!(prev, Marked::new_normal(1, b("a0"), None)); assert_eq!(result, Marked::new_normal(8, b("x"), None)); - let (prev, result) = MapApi::::set(&mut l, s("b"), Some((b("y"), None))).await; + let (prev, result) = l.str_map_mut().set(s("b"), Some((b("y"), None))).await; assert_eq!(prev, Marked::new_tomb_stone(4)); assert_eq!(result, Marked::new_normal(9, b("y"), None)); - let (prev, result) = MapApi::::set(&mut l, s("c"), Some((b("z"), None))).await; + let (prev, result) = l.str_map_mut().set(s("c"), Some((b("z"), None))).await; assert_eq!(prev, Marked::new_tomb_stone(6)); assert_eq!(result, Marked::new_normal(10, b("z"), None)); - let (prev, result) = MapApi::::set(&mut l, s("d"), Some((b("u"), None))).await; + let (prev, result) = l.str_map_mut().set(s("d"), Some((b("u"), None))).await; assert_eq!(prev, Marked::new_normal(7, b("d2"), None)); assert_eq!(result, Marked::new_normal(11, b("u"), None)); - let (prev, result) = MapApi::::set(&mut l, s("e"), Some((b("v"), None))).await; + let (prev, result) = l.str_map_mut().set(s("e"), Some((b("v"), None))).await; assert_eq!(prev, Marked::new_normal(6, b("e1"), None)); assert_eq!(result, Marked::new_normal(12, b("v"), None)); - let (prev, result) = MapApi::::set(&mut l, s("f"), Some((b("w"), None))).await; + let (prev, result) = l.str_map_mut().set(s("f"), Some((b("w"), None))).await; assert_eq!(prev, Marked::new_tomb_stone(0)); assert_eq!(result, Marked::new_normal(13, b("w"), None)); - let got = MapApiRO::::range(&l, s("")..) - .await - .collect::>() - .await; + let got = l.str_map().range(s("")..).await.collect::>().await; assert_eq!(got, vec![ // (s("a"), Marked::new_normal(8, b("x"), None)), @@ -319,34 +313,31 @@ async fn test_three_levels_override() -> anyhow::Result<()> { async fn test_three_levels_delete() -> anyhow::Result<()> { let mut l = build_3_levels().await; - let (prev, result) = MapApi::::set(&mut l, s("a"), None).await; + let (prev, result) = l.str_map_mut().set(s("a"), None).await; assert_eq!(prev, Marked::new_normal(1, b("a0"), None)); assert_eq!(result, Marked::new_tomb_stone(7)); - let (prev, result) = MapApi::::set(&mut l, s("b"), None).await; + let (prev, result) = l.str_map_mut().set(s("b"), None).await; assert_eq!(prev, Marked::new_tomb_stone(4)); assert_eq!(result, Marked::new_tomb_stone(7)); - let (prev, result) = MapApi::::set(&mut l, s("c"), None).await; + let (prev, result) = l.str_map_mut().set(s("c"), None).await; assert_eq!(prev, Marked::new_tomb_stone(6)); assert_eq!(result, Marked::new_tomb_stone(7)); - let (prev, result) = MapApi::::set(&mut l, s("d"), None).await; + let (prev, result) = l.str_map_mut().set(s("d"), None).await; assert_eq!(prev, Marked::new_normal(7, b("d2"), None)); assert_eq!(result, Marked::new_tomb_stone(7)); - let (prev, result) = MapApi::::set(&mut l, s("e"), None).await; + let (prev, result) = l.str_map_mut().set(s("e"), None).await; assert_eq!(prev, Marked::new_normal(6, b("e1"), None)); assert_eq!(result, Marked::new_tomb_stone(7)); - let (prev, result) = MapApi::::set(&mut l, s("f"), None).await; + let (prev, result) = l.str_map_mut().set(s("f"), None).await; assert_eq!(prev, Marked::new_tomb_stone(0)); assert_eq!(result, Marked::new_tomb_stone(0)); - let got = MapApiRO::::range(&l, s("")..) - .await - .collect::>() - .await; + let got = l.str_map().range(s("")..).await.collect::>().await; assert_eq!(got, vec![ // (s("a"), Marked::new_tomb_stone(7)), @@ -367,35 +358,29 @@ async fn build_2_level_with_meta() -> LeveledMap { let mut l = LeveledMap::default(); // internal_seq: 0 - MapApi::::set( - &mut l, - s("a"), - Some((b("a0"), Some(KVMeta { expire_at: Some(1) }))), - ) - .await; - MapApi::::set(&mut l, s("b"), Some((b("b0"), None))).await; - MapApi::::set( - &mut l, - s("c"), - Some((b("c0"), Some(KVMeta { expire_at: Some(2) }))), - ) - .await; + l.str_map_mut() + .set(s("a"), Some((b("a0"), Some(KVMeta { expire_at: Some(1) })))) + .await; + l.str_map_mut().set(s("b"), Some((b("b0"), None))).await; + l.str_map_mut() + .set(s("c"), Some((b("c0"), Some(KVMeta { expire_at: Some(2) })))) + .await; l.freeze_writable(); // internal_seq: 3 - MapApi::::set( - &mut l, - s("b"), - Some(( - b("b1"), - Some(KVMeta { - expire_at: Some(10), - }), - )), - ) - .await; - MapApi::::set(&mut l, s("c"), Some((b("c1"), None))).await; + l.str_map_mut() + .set( + s("b"), + Some(( + b("b1"), + Some(KVMeta { + expire_at: Some(10), + }), + )), + ) + .await; + l.str_map_mut().set(s("c"), Some((b("c1"), None))).await; l } @@ -416,7 +401,7 @@ async fn test_two_level_update_value() -> anyhow::Result<()> { Marked::new_normal(6, b("a1"), Some(KVMeta { expire_at: Some(1) })) ); - let got = MapApiRO::::get(&l, "a").await; + let got = l.str_map().get("a").await; assert_eq!( got, Marked::new_normal(6, b("a1"), Some(KVMeta { expire_at: Some(1) })) @@ -449,7 +434,7 @@ async fn test_two_level_update_value() -> anyhow::Result<()> { ) ); - let got = MapApiRO::::get(&l, "b").await; + let got = l.str_map().get("b").await; assert_eq!( got, Marked::new_normal( @@ -470,7 +455,7 @@ async fn test_two_level_update_value() -> anyhow::Result<()> { assert_eq!(prev, Marked::new_tomb_stone(0)); assert_eq!(result, Marked::new_normal(6, b("d1"), None)); - let got = MapApiRO::::get(&l, "d").await; + let got = l.str_map().get("d").await; assert_eq!(got, Marked::new_normal(6, b("d1"), None)); } @@ -494,7 +479,7 @@ async fn test_two_level_update_meta() -> anyhow::Result<()> { Marked::new_normal(6, b("a0"), Some(KVMeta { expire_at: Some(2) })) ); - let got = MapApiRO::::get(&l, "a").await; + let got = l.str_map().get("a").await; assert_eq!( got, Marked::new_normal(6, b("a0"), Some(KVMeta { expire_at: Some(2) })) @@ -518,7 +503,7 @@ async fn test_two_level_update_meta() -> anyhow::Result<()> { ); assert_eq!(result, Marked::new_normal(6, b("b1"), None)); - let got = MapApiRO::::get(&l, "b").await; + let got = l.str_map().get("b").await; assert_eq!(got, Marked::new_normal(6, b("b1"), None)); } @@ -546,7 +531,7 @@ async fn test_two_level_update_meta() -> anyhow::Result<()> { ) ); - let got = MapApiRO::::get(&l, "c").await; + let got = l.str_map().get("c").await; assert_eq!( got, Marked::new_normal( @@ -568,7 +553,7 @@ async fn test_two_level_update_meta() -> anyhow::Result<()> { assert_eq!(prev, Marked::new_tomb_stone(0)); assert_eq!(result, Marked::new_tomb_stone(0)); - let got = MapApiRO::::get(&l, "d").await; + let got = l.str_map().get("d").await; assert_eq!(got, Marked::new_tomb_stone(0)); } diff --git a/src/meta/raft-store/src/sm_v002/leveled_store/map_api.rs b/src/meta/raft-store/src/sm_v002/leveled_store/map_api.rs index e3e1691ffd7d..6c31a3e64dc8 100644 --- a/src/meta/raft-store/src/sm_v002/leveled_store/map_api.rs +++ b/src/meta/raft-store/src/sm_v002/leveled_store/map_api.rs @@ -24,6 +24,7 @@ use stream_more::StreamMore; use crate::sm_v002::leveled_store::level::Level; use crate::sm_v002::leveled_store::util; use crate::sm_v002::marked::Marked; +use crate::state_machine::ExpireKey; /// MapKey defines the behavior of a key in a map. /// @@ -80,6 +81,43 @@ where K: MapKey R: RangeBounds + Send + Sync + Clone; } +/// Trait for using Self as an implementation of the MapApi. +pub(in crate::sm_v002) trait AsMap { + /// Use Self as an implementation of the [`MapApiRO`] (Read-Only) interface. + fn as_map(&self) -> &impl MapApiRO + where Self: MapApiRO + Sized { + self + } + + /// Use Self as an implementation of the [`MapApi`] interface, allowing for mutation. + fn as_map_mut(&mut self) -> &mut impl MapApi + where Self: MapApi + Sized { + self + } + + fn str_map(&self) -> &impl MapApiRO + where Self: MapApiRO + Sized { + self + } + + fn expire_map(&self) -> &impl MapApiRO + where Self: MapApiRO + Sized { + self + } + + fn str_map_mut(&mut self) -> &mut impl MapApi + where Self: MapApi + Sized { + self + } + + fn expire_map_mut(&mut self) -> &mut impl MapApi + where Self: MapApi + Sized { + self + } +} + +impl AsMap for T {} + /// Provide a read-write key-value map API set, used to access state machine data. #[async_trait::async_trait] pub(in crate::sm_v002) trait MapApi: MapApiRO diff --git a/src/meta/raft-store/src/sm_v002/sm_v002.rs b/src/meta/raft-store/src/sm_v002/sm_v002.rs index 1463240e2e43..9ffc4b7ec501 100644 --- a/src/meta/raft-store/src/sm_v002/sm_v002.rs +++ b/src/meta/raft-store/src/sm_v002/sm_v002.rs @@ -52,6 +52,7 @@ use crate::applier::Applier; use crate::key_spaces::RaftStoreEntry; use crate::sm_v002::leveled_store::level::Level; use crate::sm_v002::leveled_store::leveled_map::LeveledMap; +use crate::sm_v002::leveled_store::map_api::AsMap; use crate::sm_v002::leveled_store::map_api::MapApi; use crate::sm_v002::leveled_store::map_api::MapApiExt; use crate::sm_v002::leveled_store::map_api::MapApiRO; @@ -251,7 +252,7 @@ impl SMV002 { /// /// It does not check expiration of the returned entry. pub async fn get_kv(&self, key: &str) -> Option { - let got = MapApiRO::::get(&self.levels.to_ref(), key).await; + let got = self.levels.str_map().get(key).await; Into::>::into(got) } @@ -261,7 +262,7 @@ impl SMV002 { pub async fn prefix_list_kv(&self, prefix: &str) -> Vec<(String, SeqV)> { let p = prefix.to_string(); let mut res = Vec::new(); - let strm = MapApiRO::::range(&self.levels, p..).await; + let strm = self.levels.str_map().range(p..).await; { let mut strm = std::pin::pin!(strm); @@ -297,7 +298,8 @@ impl SMV002 { /// List expiration index by expiration time. pub(crate) async fn list_expire_index(&self) -> impl Stream + '_ { self.levels - .range::(&self.expire_cursor..) + .expire_map() + .range(&self.expire_cursor..) .await // Return only non-deleted records .filter_map(|(k, v)| async move { @@ -390,9 +392,7 @@ impl SMV002 { &mut self, upsert_kv: &UpsertKV, ) -> (Marked>, Marked>) { - let prev = MapApiRO::::get(&self.levels, &upsert_kv.key) - .await - .clone(); + let prev = self.levels.str_map().get(&upsert_kv.key).await.clone(); if upsert_kv.seq.match_seq(prev.seq()).is_err() { return (prev.clone(), prev); diff --git a/src/meta/raft-store/src/sm_v002/sm_v002_test.rs b/src/meta/raft-store/src/sm_v002/sm_v002_test.rs index d2bae9d6e184..14fe98d66216 100644 --- a/src/meta/raft-store/src/sm_v002/sm_v002_test.rs +++ b/src/meta/raft-store/src/sm_v002/sm_v002_test.rs @@ -18,6 +18,7 @@ use common_meta_types::UpsertKV; use futures_util::StreamExt; use pretty_assertions::assert_eq; +use crate::sm_v002::leveled_store::map_api::AsMap; use crate::sm_v002::leveled_store::map_api::MapApiRO; use crate::sm_v002::marked::Marked; use crate::sm_v002::SMV002; @@ -183,7 +184,8 @@ async fn test_internal_expire_index() -> anyhow::Result<()> { // Check internal expire index let got = sm .levels - .range::(..) + .expire_map() + .range(..) .await .collect::>() .await; @@ -250,7 +252,8 @@ async fn test_inserting_expired_becomes_deleting() -> anyhow::Result<()> { // Check expire store let got = sm .levels - .range::(..) + .expire_map() + .range(..) .await .collect::>() .await; diff --git a/src/meta/raft-store/src/sm_v002/snapshot_view_v002.rs b/src/meta/raft-store/src/sm_v002/snapshot_view_v002.rs index d1a60296fa00..521c0127b6bd 100644 --- a/src/meta/raft-store/src/sm_v002/snapshot_view_v002.rs +++ b/src/meta/raft-store/src/sm_v002/snapshot_view_v002.rs @@ -23,11 +23,11 @@ use futures_util::StreamExt; use crate::key_spaces::RaftStoreEntry; use crate::ondisk::Header; use crate::ondisk::OnDisk; +use crate::sm_v002::leveled_store::map_api::AsMap; use crate::sm_v002::leveled_store::map_api::MapApiRO; use crate::sm_v002::leveled_store::static_leveled_map::StaticLeveledMap; use crate::sm_v002::leveled_store::sys_data_api::SysDataApiRO; use crate::sm_v002::marked::Marked; -use crate::state_machine::ExpireKey; use crate::state_machine::ExpireValue; use crate::state_machine::MetaSnapshotId; use crate::state_machine::StateMachineMetaKey; @@ -93,24 +93,23 @@ impl SnapshotViewV002 { let mut data = self.compacted.newest().unwrap().new_level(); // `range()` will compact tombstone internally - let strm = MapApiRO::::range::(&self.compacted, ..) - .await - .filter(|(_k, v)| { - let x = !v.is_tomb_stone(); - async move { x } - }); + + let strm = self.compacted.str_map().range::(..).await; + let strm = strm.filter(|(_k, v)| { + let x = !v.is_tomb_stone(); + async move { x } + }); let bt = strm.collect().await; data.replace_kv(bt); // `range()` will compact tombstone internally - let strm = MapApiRO::::range(&self.compacted, ..) - .await - .filter(|(_k, v)| { - let x = !v.is_tomb_stone(); - async move { x } - }); + let strm = self.compacted.expire_map().range(..).await; + let strm = strm.filter(|(_k, v)| { + let x = !v.is_tomb_stone(); + async move { x } + }); let bt = strm.collect().await; @@ -171,46 +170,44 @@ impl SnapshotViewV002 { // kv - let kv_iter = MapApiRO::::range::(&self.compacted, ..) - .await - .filter_map(|(k, v)| async move { - if let Marked::Normal { - internal_seq, - value, - meta, - } = v - { - let seqv = SeqV::with_meta(internal_seq, meta, value); - Some(RaftStoreEntry::GenericKV { - key: k.clone(), - value: seqv, - }) - } else { - None - } - }); + let strm = self.compacted.str_map().range::(..).await; + let kv_iter = strm.filter_map(|(k, v)| async move { + if let Marked::Normal { + internal_seq, + value, + meta, + } = v + { + let seqv = SeqV::with_meta(internal_seq, meta, value); + Some(RaftStoreEntry::GenericKV { + key: k.clone(), + value: seqv, + }) + } else { + None + } + }); // expire index - let expire_iter = MapApiRO::::range(&self.compacted, ..) - .await - .filter_map(|(k, v)| async move { - if let Marked::Normal { - internal_seq, - value, - meta: _, - } = v - { - let ev = ExpireValue::new(value, internal_seq); - - Some(RaftStoreEntry::Expire { - key: k.clone(), - value: ev, - }) - } else { - None - } - }); + let strm = self.compacted.expire_map().range(..).await; + let expire_iter = strm.filter_map(|(k, v)| async move { + if let Marked::Normal { + internal_seq, + value, + meta: _, + } = v + { + let ev = ExpireValue::new(value, internal_seq); + + Some(RaftStoreEntry::Expire { + key: k.clone(), + value: ev, + }) + } else { + None + } + }); futures::stream::iter(sm_meta) .chain(kv_iter) diff --git a/src/meta/raft-store/src/sm_v002/snapshot_view_v002_test.rs b/src/meta/raft-store/src/sm_v002/snapshot_view_v002_test.rs index 2071fe57b3e9..f89b327cb16a 100644 --- a/src/meta/raft-store/src/sm_v002/snapshot_view_v002_test.rs +++ b/src/meta/raft-store/src/sm_v002/snapshot_view_v002_test.rs @@ -27,6 +27,7 @@ use pretty_assertions::assert_eq; use crate::key_spaces::RaftStoreEntry; use crate::sm_v002::leveled_store::leveled_map::LeveledMap; +use crate::sm_v002::leveled_store::map_api::AsMap; use crate::sm_v002::leveled_store::map_api::MapApi; use crate::sm_v002::leveled_store::map_api::MapApiRO; use crate::sm_v002::leveled_store::static_leveled_map::StaticLeveledMap; @@ -61,7 +62,9 @@ async fn test_compact_copied_value_and_kv() -> anyhow::Result<()> { &btreemap! {3=>Node::new("3", Endpoint::new("3", 3))} ); - let got = MapApiRO::::range::(d, ..) + let got = d + .str_map() + .range::(..) .await .collect::>() .await; @@ -72,10 +75,7 @@ async fn test_compact_copied_value_and_kv() -> anyhow::Result<()> { (s("e"), Marked::new_normal(6, b("e1"), None)), ]); - let got = MapApiRO::::range(d, ..) - .await - .collect::>() - .await; + let got = d.expire_map().range(..).await.collect::>().await; assert_eq!(got, vec![]); Ok(()) @@ -93,7 +93,9 @@ async fn test_compact_expire_index() -> anyhow::Result<()> { let d = compacted.newest().unwrap().as_ref(); - let got = MapApiRO::::range::(d, ..) + let got = d + .str_map() + .range::(..) .await .collect::>() .await; @@ -125,10 +127,7 @@ async fn test_compact_expire_index() -> anyhow::Result<()> { ), ]); - let got = MapApiRO::::range(d, ..) - .await - .collect::>() - .await; + let got = d.expire_map().range(..).await.collect::>().await; assert_eq!(got, vec![ // ( @@ -256,10 +255,10 @@ async fn build_3_levels() -> LeveledMap { *sd.nodes_mut() = btreemap! {1=>Node::new("1", Endpoint::new("1", 1))}; // internal_seq: 0 - MapApi::::set(&mut l, s("a"), Some((b("a0"), None))).await; - MapApi::::set(&mut l, s("b"), Some((b("b0"), None))).await; - MapApi::::set(&mut l, s("c"), Some((b("c0"), None))).await; - MapApi::::set(&mut l, s("d"), Some((b("d0"), None))).await; + l.str_map_mut().set(s("a"), Some((b("a0"), None))).await; + l.str_map_mut().set(s("b"), Some((b("b0"), None))).await; + l.str_map_mut().set(s("c"), Some((b("c0"), None))).await; + l.str_map_mut().set(s("d"), Some((b("d0"), None))).await; l.freeze_writable(); let sd = l.writable_mut().sys_data_mut(); @@ -270,9 +269,9 @@ async fn build_3_levels() -> LeveledMap { *sd.nodes_mut() = btreemap! {2=>Node::new("2", Endpoint::new("2", 2))}; // internal_seq: 4 - MapApi::::set(&mut l, s("b"), None).await; - MapApi::::set(&mut l, s("c"), Some((b("c1"), None))).await; - MapApi::::set(&mut l, s("e"), Some((b("e1"), None))).await; + l.str_map_mut().set(s("b"), None).await; + l.str_map_mut().set(s("c"), Some((b("c1"), None))).await; + l.str_map_mut().set(s("e"), Some((b("e1"), None))).await; l.freeze_writable(); let sd = l.writable_mut().sys_data_mut(); @@ -283,8 +282,8 @@ async fn build_3_levels() -> LeveledMap { *sd.nodes_mut() = btreemap! {3=>Node::new("3", Endpoint::new("3", 3))}; // internal_seq: 6 - MapApi::::set(&mut l, s("c"), None).await; - MapApi::::set(&mut l, s("d"), Some((b("d2"), None))).await; + l.str_map_mut().set(s("c"), None).await; + l.str_map_mut().set(s("d"), Some((b("d2"), None))).await; l }