From f217cf081dd5fb54abf8c458a3c116a4aea764a1 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 15 Sep 2023 11:54:34 +0800 Subject: [PATCH 01/14] ffi Signed-off-by: CalvinNeo --- proxy_components/proxy_ffi/src/interfaces.rs | 20 ++++++++++++++++++- .../src/raftstore_proxy_helper_impls.rs | 2 ++ .../sst_reader_dispatcher.rs | 18 ++++++++++++++++- .../ffi/src/RaftStoreProxyFFI/@version | 2 +- .../ffi/src/RaftStoreProxyFFI/ProxyFFI.h | 11 ++++++++-- 5 files changed, 48 insertions(+), 5 deletions(-) diff --git a/proxy_components/proxy_ffi/src/interfaces.rs b/proxy_components/proxy_ffi/src/interfaces.rs index b0edffb5139..d7b2f5b93e1 100644 --- a/proxy_components/proxy_ffi/src/interfaces.rs +++ b/proxy_components/proxy_ffi/src/interfaces.rs @@ -234,6 +234,12 @@ pub mod root { } #[repr(C)] #[derive(Debug)] + pub struct RustBaseBuffVec { + pub buffs: *mut root::DB::BaseBuffView, + pub len: u64, + } + #[repr(C)] + #[derive(Debug)] pub struct SSTReaderInterfaces { pub fn_get_sst_reader: ::std::option::Option< unsafe extern "C" fn( @@ -285,6 +291,18 @@ pub mod root { arg4: root::DB::BaseBuffView, ), >, + pub fn_approx_size: ::std::option::Option< + unsafe extern "C" fn( + arg1: root::DB::SSTReaderPtr, + arg2: root::DB::ColumnFamilyType, + ) -> u64, + >, + pub fn_get_split_keys: ::std::option::Option< + unsafe extern "C" fn( + arg1: root::DB::SSTReaderPtr, + splits_count: u64, + ) -> root::DB::RustBaseBuffVec, + >, } #[repr(u32)] #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] @@ -659,7 +677,7 @@ pub mod root { arg3: root::DB::RawVoidPtr, ) -> u32; } - pub const RAFT_STORE_PROXY_VERSION: u64 = 3797917752479181299; + pub const RAFT_STORE_PROXY_VERSION: u64 = 8718362111406035968; pub const RAFT_STORE_PROXY_MAGIC_NUMBER: u32 = 324508639; } } diff --git a/proxy_components/proxy_ffi/src/raftstore_proxy_helper_impls.rs b/proxy_components/proxy_ffi/src/raftstore_proxy_helper_impls.rs index 54476e3c76c..b8fd541d2d6 100644 --- a/proxy_components/proxy_ffi/src/raftstore_proxy_helper_impls.rs +++ b/proxy_components/proxy_ffi/src/raftstore_proxy_helper_impls.rs @@ -75,6 +75,8 @@ impl RaftStoreProxyFFIHelper { fn_gc: Some(ffi_gc_sst_reader), fn_kind: Some(ffi_sst_reader_format_kind), fn_seek: Some(ffi_sst_reader_seek), + fn_approx_size: Some(ffi_approx_size), + fn_get_split_keys: Some(ffi_get_split_keys), }, fn_server_info: None, fn_make_read_index_task: Some(ffi_make_read_index_task), diff --git a/proxy_components/proxy_ffi/src/snapshot_reader_impls/sst_reader_dispatcher.rs b/proxy_components/proxy_ffi/src/snapshot_reader_impls/sst_reader_dispatcher.rs index 88e6529c131..d6d02c44344 100644 --- a/proxy_components/proxy_ffi/src/snapshot_reader_impls/sst_reader_dispatcher.rs +++ b/proxy_components/proxy_ffi/src/snapshot_reader_impls/sst_reader_dispatcher.rs @@ -5,7 +5,7 @@ use super::{sst_file_reader::*, tablet_reader::TabletReader, LockCFFileReader}; use crate::{ interfaces_ffi::{ BaseBuffView, ColumnFamilyType, EngineIteratorSeekType, RaftStoreProxyPtr, SSTFormatKind, - SSTReaderInterfaces, SSTReaderPtr, SSTView, + SSTReaderInterfaces, SSTReaderPtr, SSTView, RustBaseBuffVec }, raftstore_proxy_helper_impls::RaftStoreProxyFFI, }; @@ -22,6 +22,8 @@ impl Clone for SSTReaderInterfaces { fn_gc: self.fn_gc.clone(), fn_kind: self.fn_kind.clone(), fn_seek: self.fn_seek.clone(), + fn_approx_size: self.fn_approx_size.clone(), + fn_get_split_keys: self.fn_get_split_keys.clone(), } } } @@ -190,3 +192,17 @@ pub unsafe extern "C" fn ffi_sst_reader_seek( SSTFormatKind::KIND_TABLET => reader.as_mut_tablet().ffi_seek(type_, seek_type, key), } } + +pub unsafe extern "C" fn ffi_approx_size( + reader: SSTReaderPtr, + type_: ColumnFamilyType, +) -> u64 { + todo!() +} + +pub unsafe extern "C" fn ffi_get_split_keys( + reader: SSTReaderPtr, + splits_count: u64, +) -> RustBaseBuffVec { + todo!() +} diff --git a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version index 3da97c9b260..8d42c40f2d6 100644 --- a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version +++ b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version @@ -1,3 +1,3 @@ #pragma once #include -namespace DB { constexpr uint64_t RAFT_STORE_PROXY_VERSION = 3797917752479181299ull; } \ No newline at end of file +namespace DB { constexpr uint64_t RAFT_STORE_PROXY_VERSION = 8718362111406035968ull; } \ No newline at end of file diff --git a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h index 42a0f7dc640..f41d596d4b2 100644 --- a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h +++ b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h @@ -123,14 +123,14 @@ struct RawCppPtrCarr { }; // An tuple of pointers, like `void **`, -// Can be used to represent structures. +// Can be used to represent structures created from C side. struct RawCppPtrTuple { RawCppPtr *inner; const uint64_t len; }; // An array of pointers(same type), like `T **`, -// Can be used to represent arrays. +// Can be used to represent arrays created from C side. struct RawCppPtrArr { RawVoidPtr *inner; const uint64_t len; @@ -174,6 +174,11 @@ struct SSTReaderPtr { SSTFormatKind kind; }; +struct RustBaseBuffVec { + BaseBuffView *buffs; + uint64_t len; +}; + struct SSTReaderInterfaces { SSTReaderPtr (*fn_get_sst_reader)(SSTView, RaftStoreProxyPtr); uint8_t (*fn_remained)(SSTReaderPtr, ColumnFamilyType); @@ -184,6 +189,8 @@ struct SSTReaderInterfaces { SSTFormatKind (*fn_kind)(SSTReaderPtr, ColumnFamilyType); void (*fn_seek)(SSTReaderPtr, ColumnFamilyType, EngineIteratorSeekType, BaseBuffView); + uint64_t (*fn_approx_size)(SSTReaderPtr, ColumnFamilyType); + RustBaseBuffVec (*fn_get_split_keys)(SSTReaderPtr, uint64_t splits_count); }; enum class MsgPBType : uint32_t { From 555a007636a829d08967ce64cc3372356d9fbe69 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 15 Sep 2023 17:12:04 +0800 Subject: [PATCH 02/14] add call Signed-off-by: CalvinNeo --- .../proxy_ffi/src/domain_impls.rs | 54 +++++++- proxy_components/proxy_ffi/src/interfaces.rs | 5 +- .../sst_reader_dispatcher.rs | 22 +-- .../snapshot_reader_impls/tablet_reader.rs | 42 +++++- .../proxy/v2_compat/tablet_snapshot.rs | 130 ++++++++++++------ .../ffi/src/RaftStoreProxyFFI/@version | 2 +- .../ffi/src/RaftStoreProxyFFI/ProxyFFI.h | 6 +- 7 files changed, 201 insertions(+), 60 deletions(-) diff --git a/proxy_components/proxy_ffi/src/domain_impls.rs b/proxy_components/proxy_ffi/src/domain_impls.rs index f16dd015fc5..22b20f02d09 100644 --- a/proxy_components/proxy_ffi/src/domain_impls.rs +++ b/proxy_components/proxy_ffi/src/domain_impls.rs @@ -7,8 +7,8 @@ use engine_traits::{CF_DEFAULT, CF_LOCK, CF_WRITE}; use super::{ interfaces_ffi, interfaces_ffi::{ - BaseBuffView, ColumnFamilyType, RaftCmdHeader, RawRustPtr, RawVoidPtr, SSTView, SSTViewVec, - WriteCmdType, WriteCmdsView, + BaseBuffView, ColumnFamilyType, RaftCmdHeader, RawRustPtr, RawVoidPtr, RustBaseBuffVec, + SSTView, SSTViewVec, WriteCmdType, WriteCmdsView, }, read_index_helper, utils, }; @@ -131,6 +131,7 @@ pub enum RawRustPtrType { ReadIndexTask = 1, ArcFutureWaker = 2, TimerTask = 3, + VecOfString = 4, } impl From for RawRustPtrType { @@ -162,6 +163,9 @@ pub extern "C" fn ffi_gc_rust_ptr(data: RawVoidPtr, type_: interfaces_ffi::RawRu RawRustPtrType::TimerTask => unsafe { drop(Box::from_raw(data as *mut utils::TimerTask)); }, + RawRustPtrType::VecOfString => unsafe { + drop(Box::from_raw(data as *mut RustBaseBuffVecInner)); + }, _ => unreachable!(), } } @@ -180,3 +184,49 @@ impl RawRustPtr { self.ptr.is_null() } } + +impl Default for RustBaseBuffVec { + fn default() -> Self { + RustBaseBuffVec { + buffs: std::ptr::null_mut(), + len: 0, + inner: RawRustPtr::default(), + } + } +} + +struct RustBaseBuffVecInner { + // Hold the Vec of String. + _data: Pin>>>, + // Hold the BaseBuffView array. + buff_view_vec: Pin>>, +} + +pub fn build_from_vec_string(s: Vec>) -> RustBaseBuffVec { + let vec_len = s.len(); + let vec_len_64: u64 = vec_len as u64; + let inner_vec_of_string = Box::pin(s); + let mut buff_view_vec: Vec = Vec::with_capacity(vec_len); + for i in 0..vec_len { + buff_view_vec.push(BaseBuffView { + data: inner_vec_of_string[i].as_ptr() as *const _, + len: inner_vec_of_string[i].len() as u64, + }); + } + let inner = RustBaseBuffVecInner { + _data: inner_vec_of_string, + buff_view_vec: Box::pin(buff_view_vec), + }; + let inner_wrapped = RawRustPtr { + ptr: &inner as *const _ as RawVoidPtr, + type_: RawRustPtrType::VecOfString.into(), + }; + let buff_view_vec_ptr = inner.buff_view_vec.as_ptr(); + std::mem::forget(inner); + + RustBaseBuffVec { + buffs: buff_view_vec_ptr, + len: vec_len_64, + inner: inner_wrapped, + } +} diff --git a/proxy_components/proxy_ffi/src/interfaces.rs b/proxy_components/proxy_ffi/src/interfaces.rs index d7b2f5b93e1..eff13b36435 100644 --- a/proxy_components/proxy_ffi/src/interfaces.rs +++ b/proxy_components/proxy_ffi/src/interfaces.rs @@ -235,8 +235,9 @@ pub mod root { #[repr(C)] #[derive(Debug)] pub struct RustBaseBuffVec { - pub buffs: *mut root::DB::BaseBuffView, + pub buffs: *const root::DB::BaseBuffView, pub len: u64, + pub inner: root::DB::RawRustPtr, } #[repr(C)] #[derive(Debug)] @@ -677,7 +678,7 @@ pub mod root { arg3: root::DB::RawVoidPtr, ) -> u32; } - pub const RAFT_STORE_PROXY_VERSION: u64 = 8718362111406035968; + pub const RAFT_STORE_PROXY_VERSION: u64 = 1159357061809427070; pub const RAFT_STORE_PROXY_MAGIC_NUMBER: u32 = 324508639; } } diff --git a/proxy_components/proxy_ffi/src/snapshot_reader_impls/sst_reader_dispatcher.rs b/proxy_components/proxy_ffi/src/snapshot_reader_impls/sst_reader_dispatcher.rs index d6d02c44344..d23ef0da54b 100644 --- a/proxy_components/proxy_ffi/src/snapshot_reader_impls/sst_reader_dispatcher.rs +++ b/proxy_components/proxy_ffi/src/snapshot_reader_impls/sst_reader_dispatcher.rs @@ -4,8 +4,8 @@ use super::{sst_file_reader::*, tablet_reader::TabletReader, LockCFFileReader}; use crate::{ interfaces_ffi::{ - BaseBuffView, ColumnFamilyType, EngineIteratorSeekType, RaftStoreProxyPtr, SSTFormatKind, - SSTReaderInterfaces, SSTReaderPtr, SSTView, RustBaseBuffVec + BaseBuffView, ColumnFamilyType, EngineIteratorSeekType, RaftStoreProxyPtr, RawRustPtr, + RustBaseBuffVec, SSTFormatKind, SSTReaderInterfaces, SSTReaderPtr, SSTView, }, raftstore_proxy_helper_impls::RaftStoreProxyFFI, }; @@ -193,16 +193,20 @@ pub unsafe extern "C" fn ffi_sst_reader_seek( } } -pub unsafe extern "C" fn ffi_approx_size( - reader: SSTReaderPtr, - type_: ColumnFamilyType, -) -> u64 { - todo!() +pub unsafe extern "C" fn ffi_approx_size(mut reader: SSTReaderPtr, type_: ColumnFamilyType) -> u64 { + match reader.kind { + SSTFormatKind::KIND_SST => 0, + SSTFormatKind::KIND_TABLET => reader.as_mut_tablet().ffi_approx_size(type_), + } } +// It will generate `splits_count-1` keys to make `splits_count` parts. pub unsafe extern "C" fn ffi_get_split_keys( - reader: SSTReaderPtr, + mut reader: SSTReaderPtr, splits_count: u64, ) -> RustBaseBuffVec { - todo!() + match reader.kind { + SSTFormatKind::KIND_SST => RustBaseBuffVec::default(), + SSTFormatKind::KIND_TABLET => reader.as_mut_tablet().ffi_get_split_keys(splits_count), + } } diff --git a/proxy_components/proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs b/proxy_components/proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs index 8b293aba905..f2d39f037ed 100644 --- a/proxy_components/proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs +++ b/proxy_components/proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs @@ -3,12 +3,13 @@ use std::{cell::RefCell, sync::Arc}; use encryption::DataKeyManager; use engine_rocks::{get_env, RocksCfOptions, RocksDbOptions}; -use engine_traits::{Iterable, Iterator}; +use engine_traits::{Iterable, Iterator, RangePropertiesExt, CF_WRITE}; use crate::{ - cf_to_name, + build_from_vec_string, cf_to_name, interfaces_ffi::{ - BaseBuffView, ColumnFamilyType, EngineIteratorSeekType, SSTFormatKind, SSTReaderPtr, + BaseBuffView, ColumnFamilyType, EngineIteratorSeekType, RawRustPtr, RustBaseBuffVec, + SSTFormatKind, SSTReaderPtr, }, }; @@ -121,4 +122,39 @@ impl TabletReader { } }; } + + pub fn ffi_approx_size(&self, cf: ColumnFamilyType) -> u64 { + let handle = + engine_rocks::util::get_cf_handle(self.kv_engine.as_inner(), cf_to_name(cf)).unwrap(); + let v = self + .kv_engine + .as_inner() + .get_approximate_sizes_cf(handle, &[rocksdb::Range::new(b"", keys::DATA_MAX_KEY)]); + assert_eq!(v.len(), 1); + v[0] + } + + pub fn ffi_get_split_keys(&self, splits_count: u64) -> RustBaseBuffVec { + let range = engine_traits::Range { + start_key: b"", + end_key: keys::DATA_MAX_KEY, + }; + assert!(splits_count >= 2); + let keys_count: usize = splits_count as usize - 1; + match self + .kv_engine + .get_range_approximate_split_keys_cf(CF_WRITE, range, keys_count) + { + Ok(r) => { + if r.len() < 1 { + return RustBaseBuffVec::default(); + } + build_from_vec_string(r) + } + Err(e) => { + tikv_util::info!("ffi_get_split_keys failed due to {:?}", e); + RustBaseBuffVec::default() + } + } + } } diff --git a/proxy_tests/proxy/v2_compat/tablet_snapshot.rs b/proxy_tests/proxy/v2_compat/tablet_snapshot.rs index 9cf0a3bd202..9e7af8f4d26 100644 --- a/proxy_tests/proxy/v2_compat/tablet_snapshot.rs +++ b/proxy_tests/proxy/v2_compat/tablet_snapshot.rs @@ -77,6 +77,87 @@ fn generate_snap( (msg, snap_key) } +fn prepare_snapshot( + cluster_v1: &mut Cluster, + cluster_v2: &mut test_raftstore_v2::Cluster< + test_raftstore_v2::ServerCluster, + engine_rocks::RocksEngine, + >, + key_num: usize, +) -> PathBuf { + let s1_addr = cluster_v1.get_addr(1); + let region = cluster_v2.get_region(b""); + let region_id = region.get_id(); + let engine = cluster_v2.get_engine(1); + let tablet = engine.get_tablet_by_id(region_id).unwrap(); + + for i in 0..key_num { + let k = format!("zk{:06}", i); + tablet.put(k.as_bytes(), &random_long_vec(1024)).unwrap(); + tablet + .put_cf(CF_LOCK, k.as_bytes(), &random_long_vec(1024)) + .unwrap(); + tablet + .put_cf(CF_WRITE, k.as_bytes(), &random_long_vec(1024)) + .unwrap(); + } + + let snap_mgr = cluster_v2.get_snap_mgr(1); + let security_mgr = cluster_v2.get_security_mgr(); + let (msg, snap_key) = generate_snap(&engine, region_id, &snap_mgr); + let limit = Limiter::new(f64::INFINITY); + let env = Arc::new(Environment::new(1)); + let _ = block_on(async { + let client = + TikvClient::new(security_mgr.connect(ChannelBuilder::new(env.clone()), &s1_addr)); + send_snap_v2(client, snap_mgr.clone(), msg, limit.clone()) + .await + .unwrap() + }); + + // The snapshot has been received by cluster v1, so check it's completeness + let snap_mgr = cluster_v1.get_snap_mgr(1); + snap_mgr + .tablet_snap_manager() + .expect("v1 compact tablet snap mgr") + .final_recv_path(&snap_key) +} + +#[test] +fn test_get_snapshot_split_keys() { + let mut cluster_v1 = new_server_cluster(1, 1); + let mut cluster_v2 = test_raftstore_v2::new_server_cluster(1, 1); + cluster_v1.cfg.raft_store.enable_v2_compatible_learner = true; + cluster_v1.run(); + cluster_v2.run(); + + let key_count = 10000; + let path = prepare_snapshot(&mut cluster_v1, &mut cluster_v2, key_count); + + unsafe { + let reader = TabletReader::ffi_get_cf_file_reader( + path.as_path().to_str().unwrap(), + ColumnFamilyType::Write, + None, + ); + // If we want to split the range into 2 parts, it should return 1 split key. + let res = ffi_get_split_keys(reader.clone(), 4); + assert_eq!(res.len, 3); + for i in 0..res.len { + let buff = res.buffs.add(i as usize); + let slice = (*buff).to_slice(); + let maximum = format!("zk{:06}", key_count); + let minimum = format!("zk{:06}", 0); + // We don't return the boundary. + assert!(slice > minimum.as_bytes()); + assert!(slice < maximum.as_bytes()); + } + } + + cluster_v1.shutdown(); + cluster_v2.shutdown(); +} + #[test] fn test_parse_tablet_snapshot() { let test_parse_snap = |key_num| { @@ -86,56 +167,21 @@ fn test_parse_tablet_snapshot() { cluster_v1.run(); cluster_v2.run(); - let s1_addr = cluster_v1.get_addr(1); - let region = cluster_v2.get_region(b""); - let region_id = region.get_id(); - let engine = cluster_v2.get_engine(1); - let tablet = engine.get_tablet_by_id(region_id).unwrap(); - - for i in 0..key_num { - let k = format!("zk{:04}", i); - tablet.put(k.as_bytes(), &random_long_vec(1024)).unwrap(); - tablet - .put_cf(CF_LOCK, k.as_bytes(), &random_long_vec(1024)) - .unwrap(); - tablet - .put_cf(CF_WRITE, k.as_bytes(), &random_long_vec(1024)) - .unwrap(); - } - - let snap_mgr = cluster_v2.get_snap_mgr(1); - let security_mgr = cluster_v2.get_security_mgr(); - let (msg, snap_key) = generate_snap(&engine, region_id, &snap_mgr); - let limit = Limiter::new(f64::INFINITY); - let env = Arc::new(Environment::new(1)); - let _ = block_on(async { - let client = - TikvClient::new(security_mgr.connect(ChannelBuilder::new(env.clone()), &s1_addr)); - send_snap_v2(client, snap_mgr.clone(), msg, limit.clone()) - .await - .unwrap() - }); - - // The snapshot has been received by cluster v1, so check it's completeness - let snap_mgr = cluster_v1.get_snap_mgr(1); - let path = snap_mgr - .tablet_snap_manager() - .expect("v1 compact tablet snap mgr") - .final_recv_path(&snap_key); + let path = prepare_snapshot(&mut cluster_v1, &mut cluster_v2, key_num); let validate = |cf: ColumnFamilyType| unsafe { let reader = TabletReader::ffi_get_cf_file_reader(path.as_path().to_str().unwrap(), cf, None); // SSTReaderPtr is not aware of the data prefix 'z'. - let k = format!("k{:04}", 5); + let k = format!("k{:06}", 5); let bf = BaseBuffView { data: k.as_ptr() as *const _, len: k.len() as u64, }; ffi_sst_reader_seek(reader.clone(), cf, EngineIteratorSeekType::Key, bf); for i in 5..key_num { - let k = format!("k{:04}", i); + let k = format!("k{:06}", i); assert_eq!(ffi_sst_reader_remained(reader.clone(), cf), 1); let kbf = ffi_sst_reader_key(reader.clone(), cf); assert_eq!(kbf.to_slice(), k.as_bytes()); @@ -145,7 +191,7 @@ fn test_parse_tablet_snapshot() { // If the sst is "empty" to this region. Will not panic, and remained should be // false. - let k = format!("k{:04}", key_num + 10); + let k = format!("k{:06}", key_num + 10); let bf = BaseBuffView { data: k.as_ptr() as *const _, len: k.len() as u64, @@ -160,7 +206,7 @@ fn test_parse_tablet_snapshot() { cluster_v2.shutdown(); }; - test_parse_snap(20); + test_parse_snap(10); } // This test won't run, since we don;t have transport for snapshot data. @@ -238,7 +284,7 @@ fn test_v1_apply_snap_from_v2() { let engine = cluster_v2.get_engine(1); for i in 0..50 { - let k = format!("k{:04}", i); + let k = format!("k{:06}", i); cluster_v2.must_put(k.as_bytes(), b"val"); } cluster_v2.flush_data(); @@ -266,7 +312,7 @@ fn test_v1_apply_snap_from_v2() { let path_str = path.as_path().to_str().unwrap(); for i in 11..50 { - let k = format!("k{:04}", i); + let k = format!("k{:06}", i); check_key( &cluster_v1, k.as_bytes(), diff --git a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version index 8d42c40f2d6..eaed969197b 100644 --- a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version +++ b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version @@ -1,3 +1,3 @@ #pragma once #include -namespace DB { constexpr uint64_t RAFT_STORE_PROXY_VERSION = 8718362111406035968ull; } \ No newline at end of file +namespace DB { constexpr uint64_t RAFT_STORE_PROXY_VERSION = 1159357061809427070ull; } \ No newline at end of file diff --git a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h index f41d596d4b2..7551ce886f9 100644 --- a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h +++ b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h @@ -175,8 +175,12 @@ struct SSTReaderPtr { }; struct RustBaseBuffVec { - BaseBuffView *buffs; + // It is a view of something inside `inner_vec_of_string`. + const BaseBuffView *buffs; + // It is the length of view. uint64_t len; + // Call fn_gc_rust_ptr to this after `buffs` is no longer used. + RawRustPtr inner; }; struct SSTReaderInterfaces { From a76bef6c9436f5a51e59d79942e64360f36e224d Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 15 Sep 2023 21:19:45 +0800 Subject: [PATCH 03/14] add tests Signed-off-by: CalvinNeo --- Cargo.lock | 1 + proxy_components/proxy_ffi/Cargo.toml | 1 + .../proxy_ffi/src/domain_impls.rs | 62 ++++++++++++++++++- .../sst_reader_dispatcher.rs | 4 +- .../snapshot_reader_impls/tablet_reader.rs | 4 +- .../proxy/v2_compat/tablet_snapshot.rs | 25 ++++++-- 6 files changed, 86 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0997dba11b6..82c01b5a8bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4674,6 +4674,7 @@ dependencies = [ name = "proxy_ffi" version = "0.0.1" dependencies = [ + "collections", "encryption", "engine_rocks", "engine_test", diff --git a/proxy_components/proxy_ffi/Cargo.toml b/proxy_components/proxy_ffi/Cargo.toml index efbad494de5..fcf4b4ce04a 100644 --- a/proxy_components/proxy_ffi/Cargo.toml +++ b/proxy_components/proxy_ffi/Cargo.toml @@ -46,6 +46,7 @@ tokio-timer = { git = "https://github.com/tikv/tokio", branch = "tokio-timer-hot tracker = { workspace = true, default-features = false } reqwest = { version = "0.11", features = ["blocking"] } url = "2.4.0" +collections = { workspace = true } [dependencies.rocksdb] git = "https://github.com/tikv/rust-rocksdb.git" diff --git a/proxy_components/proxy_ffi/src/domain_impls.rs b/proxy_components/proxy_ffi/src/domain_impls.rs index 22b20f02d09..f636fbc4f6f 100644 --- a/proxy_components/proxy_ffi/src/domain_impls.rs +++ b/proxy_components/proxy_ffi/src/domain_impls.rs @@ -202,6 +202,58 @@ struct RustBaseBuffVecInner { buff_view_vec: Pin>>, } +#[derive(Default)] +pub struct TestGcObjectMonitor { + rust: std::sync::Mutex>, +} + +impl TestGcObjectMonitor { + pub fn add_rust(&self, t: &interfaces_ffi::RawRustPtrType, x: isize) { + use std::collections::hash_map::Entry; + let data = &mut *self.rust.lock().unwrap(); + match data.entry(*t) { + Entry::Occupied(mut v) => { + *v.get_mut() += x; + } + Entry::Vacant(v) => { + v.insert(x); + } + } + } + pub fn valid_clean_rust(&self) -> bool { + let data = &*self.rust.lock().unwrap(); + for (k, v) in data { + if *v != 0 { + tikv_util::error!( + "TestGcObjectMonitor::valid_clean failed at type {} refcount {}", + k, + v + ); + return false; + } + } + return true; + } + pub fn is_empty_rust(&self) -> bool { + let data = &*self.rust.lock().unwrap(); + data.is_empty() + } +} + +#[cfg(any(test, feature = "testexport"))] +lazy_static::lazy_static! { + pub static ref TEST_GC_OBJ_MONITOR: TestGcObjectMonitor = TestGcObjectMonitor::default(); +} + +impl Drop for RustBaseBuffVecInner { + fn drop(&mut self) { + #[cfg(any(test, feature = "testexport"))] + { + TEST_GC_OBJ_MONITOR.add_rust(&RawRustPtrType::VecOfString.into(), -1); + } + } +} + pub fn build_from_vec_string(s: Vec>) -> RustBaseBuffVec { let vec_len = s.len(); let vec_len_64: u64 = vec_len as u64; @@ -213,12 +265,16 @@ pub fn build_from_vec_string(s: Vec>) -> RustBaseBuffVec { len: inner_vec_of_string[i].len() as u64, }); } - let inner = RustBaseBuffVecInner { + let inner = Box::new(RustBaseBuffVecInner { _data: inner_vec_of_string, buff_view_vec: Box::pin(buff_view_vec), - }; + }); + #[cfg(any(test, feature = "testexport"))] + { + TEST_GC_OBJ_MONITOR.add_rust(&RawRustPtrType::VecOfString.into(), 1); + } let inner_wrapped = RawRustPtr { - ptr: &inner as *const _ as RawVoidPtr, + ptr: inner.as_ref() as *const _ as RawVoidPtr, type_: RawRustPtrType::VecOfString.into(), }; let buff_view_vec_ptr = inner.buff_view_vec.as_ptr(); diff --git a/proxy_components/proxy_ffi/src/snapshot_reader_impls/sst_reader_dispatcher.rs b/proxy_components/proxy_ffi/src/snapshot_reader_impls/sst_reader_dispatcher.rs index d23ef0da54b..585879b2b46 100644 --- a/proxy_components/proxy_ffi/src/snapshot_reader_impls/sst_reader_dispatcher.rs +++ b/proxy_components/proxy_ffi/src/snapshot_reader_impls/sst_reader_dispatcher.rs @@ -4,8 +4,8 @@ use super::{sst_file_reader::*, tablet_reader::TabletReader, LockCFFileReader}; use crate::{ interfaces_ffi::{ - BaseBuffView, ColumnFamilyType, EngineIteratorSeekType, RaftStoreProxyPtr, RawRustPtr, - RustBaseBuffVec, SSTFormatKind, SSTReaderInterfaces, SSTReaderPtr, SSTView, + BaseBuffView, ColumnFamilyType, EngineIteratorSeekType, RaftStoreProxyPtr, RustBaseBuffVec, + SSTFormatKind, SSTReaderInterfaces, SSTReaderPtr, SSTView, }, raftstore_proxy_helper_impls::RaftStoreProxyFFI, }; diff --git a/proxy_components/proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs b/proxy_components/proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs index f2d39f037ed..7fded3f5eb4 100644 --- a/proxy_components/proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs +++ b/proxy_components/proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs @@ -8,8 +8,8 @@ use engine_traits::{Iterable, Iterator, RangePropertiesExt, CF_WRITE}; use crate::{ build_from_vec_string, cf_to_name, interfaces_ffi::{ - BaseBuffView, ColumnFamilyType, EngineIteratorSeekType, RawRustPtr, RustBaseBuffVec, - SSTFormatKind, SSTReaderPtr, + BaseBuffView, ColumnFamilyType, EngineIteratorSeekType, RustBaseBuffVec, SSTFormatKind, + SSTReaderPtr, }, }; diff --git a/proxy_tests/proxy/v2_compat/tablet_snapshot.rs b/proxy_tests/proxy/v2_compat/tablet_snapshot.rs index 9e7af8f4d26..2a8e040e4be 100644 --- a/proxy_tests/proxy/v2_compat/tablet_snapshot.rs +++ b/proxy_tests/proxy/v2_compat/tablet_snapshot.rs @@ -12,6 +12,8 @@ use mock_engine_store::{ interfaces_ffi::BaseBuffView, mock_cluster::v1::server::new_server_cluster, }; use proxy_ffi::{ + domain_impls::TEST_GC_OBJ_MONITOR, + ffi_gc_rust_ptr, interfaces_ffi::{ColumnFamilyType, EngineIteratorSeekType}, snapshot_reader_impls::{tablet_reader::TabletReader, *}, }; @@ -141,18 +143,33 @@ fn test_get_snapshot_split_keys() { None, ); // If we want to split the range into 2 parts, it should return 1 split key. - let res = ffi_get_split_keys(reader.clone(), 4); - assert_eq!(res.len, 3); + let split_count = 4; + let res = ffi_get_split_keys(reader.clone(), split_count); + let maximum = format!("zk{:06}", key_count); + let minimum = format!("zk{:06}", 0); + tikv_util::debug!("minimum split key is {}", minimum); + tikv_util::debug!("maximum split key is {}", maximum); for i in 0..res.len { let buff = res.buffs.add(i as usize); let slice = (*buff).to_slice(); - let maximum = format!("zk{:06}", key_count); - let minimum = format!("zk{:06}", 0); + // If the snapshot is too small, it will provide worse and less split keys than + // we want. For example, given a 10000 key snapshot, it generates: + // 4-keys is zk000000,zk004065,zk008130,zk009999 + // 3-keys is zk004065,zk008130,zk009999 + // 2-keys is zk004065,zk008130 + tikv_util::debug!( + "the {}-th split key is {}", + i, + std::str::from_utf8_unchecked(slice) + ); // We don't return the boundary. assert!(slice > minimum.as_bytes()); assert!(slice < maximum.as_bytes()); } + assert_eq!(res.len, split_count - 1); + ffi_gc_rust_ptr(res.inner.ptr, res.inner.type_); } + assert!(TEST_GC_OBJ_MONITOR.valid_clean_rust()); cluster_v1.shutdown(); cluster_v2.shutdown(); From fb960a5bd83e5d7a243819e6f2c78f5bb81877cd Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Mon, 18 Sep 2023 12:56:42 +0800 Subject: [PATCH 04/14] update version Signed-off-by: CalvinNeo --- proxy_components/proxy_ffi/src/interfaces.rs | 2 +- raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/proxy_components/proxy_ffi/src/interfaces.rs b/proxy_components/proxy_ffi/src/interfaces.rs index eff13b36435..1c528dd63b8 100644 --- a/proxy_components/proxy_ffi/src/interfaces.rs +++ b/proxy_components/proxy_ffi/src/interfaces.rs @@ -678,7 +678,7 @@ pub mod root { arg3: root::DB::RawVoidPtr, ) -> u32; } - pub const RAFT_STORE_PROXY_VERSION: u64 = 1159357061809427070; + pub const RAFT_STORE_PROXY_VERSION: u64 = 16886254392474956029; pub const RAFT_STORE_PROXY_MAGIC_NUMBER: u32 = 324508639; } } diff --git a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version index eaed969197b..663c96e4645 100644 --- a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version +++ b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version @@ -1,3 +1,3 @@ #pragma once #include -namespace DB { constexpr uint64_t RAFT_STORE_PROXY_VERSION = 1159357061809427070ull; } \ No newline at end of file +namespace DB { constexpr uint64_t RAFT_STORE_PROXY_VERSION = 16886254392474956029ull; } \ No newline at end of file From 3c661f48c3bdede0b0774ec47f3fba9c11464bb0 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Mon, 18 Sep 2023 14:34:26 +0800 Subject: [PATCH 05/14] f Signed-off-by: CalvinNeo --- Cargo.lock | 2 ++ proxy_scripts/ci_check.sh | 2 +- proxy_tests/Cargo.toml | 2 ++ proxy_tests/proxy/shared/config.rs | 16 ++++++++++++++++ 4 files changed, 21 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 82c01b5a8bc..363bfb3189a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4836,6 +4836,8 @@ dependencies = [ "rand_xorshift", "resource_metering", "security", + "serde", + "serde_derive", "serde_json", "slog", "slog-global", diff --git a/proxy_scripts/ci_check.sh b/proxy_scripts/ci_check.sh index aed2be3ba64..b12451e1182 100755 --- a/proxy_scripts/ci_check.sh +++ b/proxy_scripts/ci_check.sh @@ -40,7 +40,7 @@ elif [[ $M == "testnew" ]]; then cargo test --package proxy_tests --features="$ENABLE_FEATURES" --test proxy shared::write cargo test --package proxy_tests --features="$ENABLE_FEATURES" --test proxy shared::snapshot cargo test --package proxy_tests --features="$ENABLE_FEATURES" --test proxy shared::normal::store - cargo test --package proxy_tests --features="$ENABLE_FEATURES" --test proxy shared::normal::config + cargo test --package proxy_tests --features="$ENABLE_FEATURES" --test proxy shared::config cargo test --package proxy_tests --features="$ENABLE_FEATURES" --test proxy shared::normal::restart cargo test --package proxy_tests --features="$ENABLE_FEATURES" --test proxy shared::normal::persist cargo test --package proxy_tests --features="$ENABLE_FEATURES" --test proxy shared::ingest diff --git a/proxy_tests/Cargo.toml b/proxy_tests/Cargo.toml index 24ecebd13ea..d0b0e65339f 100644 --- a/proxy_tests/Cargo.toml +++ b/proxy_tests/Cargo.toml @@ -84,6 +84,8 @@ raft_log_engine = { workspace = true } raftstore = { workspace = true } raftstore-v2 = { workspace = true } rand = "0.8.3" +serde = "1.0" +serde_derive = "1.0" slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] } slog-global = { version = "0.1", git = "https://github.com/breeswish/slog-global.git", rev = "d592f88e4dbba5eb439998463054f1a44fbf17b9" } tempfile = "3.0" diff --git a/proxy_tests/proxy/shared/config.rs b/proxy_tests/proxy/shared/config.rs index 209c1a26ac7..58d3b004492 100644 --- a/proxy_tests/proxy/shared/config.rs +++ b/proxy_tests/proxy/shared/config.rs @@ -73,6 +73,22 @@ fn test_default_no_config_item() { let mut proxy_config = gen_proxy_config(&cpath, false, &mut v); overwrite_config_with_cmd_args(&mut config, &mut proxy_config, &matches); address_proxy_config(&mut config, &proxy_config); + use serde_derive::{Deserialize, Serialize}; + use serde_json::{Map, Value}; + #[derive(Serialize, Deserialize)] + struct Data { + inner: Map, + } + let json_format_proxy = serde_json::to_string(&proxy_config).unwrap(); + let parsed_json_proxy: Data = serde_json::from_str(json_format_proxy.as_str()).unwrap(); + { + let raft_store = parsed_json_proxy.inner.get("raft_store").unwrap(); + let snap_handle_pool_size = raft_store.get("snap_handle_pool_size").unwrap(); + assert_eq!( + proxy_config.raft_store.snap_handle_pool_size as u64, + snap_handle_pool_size.as_u64().unwrap() + ); + } let total_mem = SysQuota::memory_limit_in_bytes(); let cpu_num = SysQuota::cpu_cores_quota(); From 47e3ecac13432535a0f06f7ae37520e08d30c517 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Mon, 18 Sep 2023 17:21:00 +0800 Subject: [PATCH 06/14] return proxy's config to tiflash Signed-off-by: CalvinNeo --- Cargo.lock | 2 + proxy_components/mock-engine-store/Cargo.toml | 1 + .../src/mock_cluster/cluster_ext.rs | 5 + proxy_components/proxy_ffi/Cargo.toml | 1 + .../proxy_ffi/src/domain_impls.rs | 106 +++++++++++++----- proxy_components/proxy_ffi/src/interfaces.rs | 18 ++- .../proxy_ffi/src/raftstore_proxy.rs | 8 ++ .../src/raftstore_proxy_helper_impls.rs | 19 +++- .../sst_reader_dispatcher.rs | 8 +- .../snapshot_reader_impls/tablet_reader.rs | 8 +- proxy_components/proxy_server/src/run.rs | 6 +- proxy_tests/proxy/shared/config.rs | 14 +-- proxy_tests/proxy/shared/ffi.rs | 19 +++- .../ffi/src/RaftStoreProxyFFI/@version | 2 +- .../ffi/src/RaftStoreProxyFFI/ProxyFFI.h | 16 ++- 15 files changed, 181 insertions(+), 52 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 363bfb3189a..9af204564a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3587,6 +3587,7 @@ dependencies = [ "resource_control", "resource_metering", "security", + "serde_json", "server", "service", "slog", @@ -4691,6 +4692,7 @@ dependencies = [ "raftstore", "reqwest", "rocksdb", + "serde_json", "slog", "slog-global", "tikv_util", diff --git a/proxy_components/mock-engine-store/Cargo.toml b/proxy_components/mock-engine-store/Cargo.toml index 7a9931b807c..0328f80f3fc 100644 --- a/proxy_components/mock-engine-store/Cargo.toml +++ b/proxy_components/mock-engine-store/Cargo.toml @@ -68,6 +68,7 @@ rand = "0.8" resolved_ts = { workspace = true } resource_control = { workspace = true } resource_metering = { workspace = true } +serde_json = "1.0" service = { workspace = true } security = { workspace = true, default-features = false } server = { workspace = true } diff --git a/proxy_components/mock-engine-store/src/mock_cluster/cluster_ext.rs b/proxy_components/mock-engine-store/src/mock_cluster/cluster_ext.rs index 2663fbb86bb..caa0ab6cf53 100644 --- a/proxy_components/mock-engine-store/src/mock_cluster/cluster_ext.rs +++ b/proxy_components/mock-engine-store/src/mock_cluster/cluster_ext.rs @@ -80,8 +80,11 @@ impl ClusterExt { cluster_ext_ptr: isize, mock_cfg: MockConfig, pd_client: Option>, + proxy_cfg: &ProxyConfig, ) -> (FFIHelperSet, TikvConfig) { // We must allocate on heap to avoid move. + + let proxy_config_str = serde_json::to_string(proxy_cfg).unwrap_or(String::new()); let proxy = Box::new(engine_store_ffi::ffi::RaftStoreProxy::new( AtomicU8::new(RaftProxyStatus::Idle as u8), key_mgr.clone(), @@ -96,6 +99,7 @@ impl ClusterExt { }, None, pd_client, + proxy_config_str, )); let proxy_ref = proxy.as_ref(); @@ -163,6 +167,7 @@ impl ClusterExt { cluster_ext_ptr, mock_cfg, pd_client, + proxy_cfg, ); // We can not use moved or cloned engines any more. diff --git a/proxy_components/proxy_ffi/Cargo.toml b/proxy_components/proxy_ffi/Cargo.toml index fcf4b4ce04a..6b32f62c7f8 100644 --- a/proxy_components/proxy_ffi/Cargo.toml +++ b/proxy_components/proxy_ffi/Cargo.toml @@ -38,6 +38,7 @@ lazy_static = "1.3" protobuf = { version = "2.8", features = ["bytes"] } pd_client = { workspace = true } raftstore = { workspace = true, default-features = false } +serde_json = "1.0" slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] } slog-global = { version = "0.1", git = "https://github.com/breeswish/slog-global.git", rev = "d592f88e4dbba5eb439998463054f1a44fbf17b9" } tikv_util = { workspace = true, default-features = false } diff --git a/proxy_components/proxy_ffi/src/domain_impls.rs b/proxy_components/proxy_ffi/src/domain_impls.rs index f636fbc4f6f..aa3519eeee2 100644 --- a/proxy_components/proxy_ffi/src/domain_impls.rs +++ b/proxy_components/proxy_ffi/src/domain_impls.rs @@ -7,8 +7,8 @@ use engine_traits::{CF_DEFAULT, CF_LOCK, CF_WRITE}; use super::{ interfaces_ffi, interfaces_ffi::{ - BaseBuffView, ColumnFamilyType, RaftCmdHeader, RawRustPtr, RawVoidPtr, RustBaseBuffVec, - SSTView, SSTViewVec, WriteCmdType, WriteCmdsView, + BaseBuffView, ColumnFamilyType, RaftCmdHeader, RawRustPtr, RawVoidPtr, RustStrWithView, + RustStrWithViewVec, SSTView, SSTViewVec, WriteCmdType, WriteCmdsView, }, read_index_helper, utils, }; @@ -131,7 +131,8 @@ pub enum RawRustPtrType { ReadIndexTask = 1, ArcFutureWaker = 2, TimerTask = 3, - VecOfString = 4, + String = 4, + VecOfString = 5, } impl From for RawRustPtrType { @@ -163,8 +164,11 @@ pub extern "C" fn ffi_gc_rust_ptr(data: RawVoidPtr, type_: interfaces_ffi::RawRu RawRustPtrType::TimerTask => unsafe { drop(Box::from_raw(data as *mut utils::TimerTask)); }, + RawRustPtrType::String => unsafe { + drop(Box::from_raw(data as *mut RustStrWithViewInner)); + }, RawRustPtrType::VecOfString => unsafe { - drop(Box::from_raw(data as *mut RustBaseBuffVecInner)); + drop(Box::from_raw(data as *mut RustStrWithViewVecInner)); }, _ => unreachable!(), } @@ -185,23 +189,6 @@ impl RawRustPtr { } } -impl Default for RustBaseBuffVec { - fn default() -> Self { - RustBaseBuffVec { - buffs: std::ptr::null_mut(), - len: 0, - inner: RawRustPtr::default(), - } - } -} - -struct RustBaseBuffVecInner { - // Hold the Vec of String. - _data: Pin>>>, - // Hold the BaseBuffView array. - buff_view_vec: Pin>>, -} - #[derive(Default)] pub struct TestGcObjectMonitor { rust: std::sync::Mutex>, @@ -245,7 +232,24 @@ lazy_static::lazy_static! { pub static ref TEST_GC_OBJ_MONITOR: TestGcObjectMonitor = TestGcObjectMonitor::default(); } -impl Drop for RustBaseBuffVecInner { +impl Default for RustStrWithViewVec { + fn default() -> Self { + RustStrWithViewVec { + buffs: std::ptr::null_mut(), + len: 0, + inner: RawRustPtr::default(), + } + } +} + +struct RustStrWithViewVecInner { + // Hold the Vec of String. + _data: Pin>>>, + // Hold the BaseBuffView array. + buff_view_vec: Pin>>, +} + +impl Drop for RustStrWithViewVecInner { fn drop(&mut self) { #[cfg(any(test, feature = "testexport"))] { @@ -254,7 +258,7 @@ impl Drop for RustBaseBuffVecInner { } } -pub fn build_from_vec_string(s: Vec>) -> RustBaseBuffVec { +pub fn build_from_vec_string(s: Vec>) -> RustStrWithViewVec { let vec_len = s.len(); let vec_len_64: u64 = vec_len as u64; let inner_vec_of_string = Box::pin(s); @@ -265,7 +269,7 @@ pub fn build_from_vec_string(s: Vec>) -> RustBaseBuffVec { len: inner_vec_of_string[i].len() as u64, }); } - let inner = Box::new(RustBaseBuffVecInner { + let inner = Box::new(RustStrWithViewVecInner { _data: inner_vec_of_string, buff_view_vec: Box::pin(buff_view_vec), }); @@ -280,9 +284,61 @@ pub fn build_from_vec_string(s: Vec>) -> RustBaseBuffVec { let buff_view_vec_ptr = inner.buff_view_vec.as_ptr(); std::mem::forget(inner); - RustBaseBuffVec { + RustStrWithViewVec { buffs: buff_view_vec_ptr, len: vec_len_64, inner: inner_wrapped, } } + +impl Default for RustStrWithView { + fn default() -> Self { + RustStrWithView { + buff: BaseBuffView { + data: std::ptr::null(), + len: 0, + }, + inner: RawRustPtr::default(), + } + } +} + +struct RustStrWithViewInner { + // Hold the String. + _data: Pin>>, +} + +impl Drop for RustStrWithViewInner { + fn drop(&mut self) { + #[cfg(any(test, feature = "testexport"))] + { + TEST_GC_OBJ_MONITOR.add_rust(&RawRustPtrType::String.into(), -1); + } + } +} + +pub fn build_from_string(s: Vec) -> RustStrWithView { + let str_len = s.len(); + let inner_string = Box::pin(s); + let buff = BaseBuffView { + data: inner_string.as_ptr() as *const _, + len: str_len as u64, + }; + let inner = Box::new(RustStrWithViewInner { + _data: inner_string, + }); + #[cfg(any(test, feature = "testexport"))] + { + TEST_GC_OBJ_MONITOR.add_rust(&RawRustPtrType::String.into(), 1); + } + let inner_wrapped = RawRustPtr { + ptr: inner.as_ref() as *const _ as RawVoidPtr, + type_: RawRustPtrType::String.into(), + }; + std::mem::forget(inner); + + RustStrWithView { + buff, + inner: inner_wrapped, + } +} diff --git a/proxy_components/proxy_ffi/src/interfaces.rs b/proxy_components/proxy_ffi/src/interfaces.rs index 1c528dd63b8..b688d62697b 100644 --- a/proxy_components/proxy_ffi/src/interfaces.rs +++ b/proxy_components/proxy_ffi/src/interfaces.rs @@ -234,7 +234,13 @@ pub mod root { } #[repr(C)] #[derive(Debug)] - pub struct RustBaseBuffVec { + pub struct RustStrWithView { + pub buff: root::DB::BaseBuffView, + pub inner: root::DB::RawRustPtr, + } + #[repr(C)] + #[derive(Debug)] + pub struct RustStrWithViewVec { pub buffs: *const root::DB::BaseBuffView, pub len: u64, pub inner: root::DB::RawRustPtr, @@ -302,7 +308,7 @@ pub mod root { unsafe extern "C" fn( arg1: root::DB::SSTReaderPtr, splits_count: u64, - ) -> root::DB::RustBaseBuffVec, + ) -> root::DB::RustStrWithViewVec, >, } #[repr(u32)] @@ -453,6 +459,12 @@ pub mod root { applied_index: u64, ), >, + pub fn_get_config_json: ::std::option::Option< + unsafe extern "C" fn( + arg1: root::DB::RaftStoreProxyPtr, + kind: u64, + ) -> root::DB::RustStrWithView, + >, } #[repr(C)] #[derive(Debug)] @@ -678,7 +690,7 @@ pub mod root { arg3: root::DB::RawVoidPtr, ) -> u32; } - pub const RAFT_STORE_PROXY_VERSION: u64 = 16886254392474956029; + pub const RAFT_STORE_PROXY_VERSION: u64 = 5692329170612304456; pub const RAFT_STORE_PROXY_MAGIC_NUMBER: u32 = 324508639; } } diff --git a/proxy_components/proxy_ffi/src/raftstore_proxy.rs b/proxy_components/proxy_ffi/src/raftstore_proxy.rs index 3e1000ba6c6..35989ca4b49 100644 --- a/proxy_components/proxy_ffi/src/raftstore_proxy.rs +++ b/proxy_components/proxy_ffi/src/raftstore_proxy.rs @@ -29,6 +29,7 @@ pub struct RaftStoreProxy { raftstore_proxy_engine: RwLock>, pd_client: Option>, cluster_raftstore_ver: RwLock, + proxy_config_str: String, } impl RaftStoreProxy { @@ -38,6 +39,7 @@ impl RaftStoreProxy { read_index_client: Option>, raftstore_proxy_engine: Option, pd_client: Option>, + proxy_config_str: String, ) -> Self { RaftStoreProxy { status, @@ -46,6 +48,7 @@ impl RaftStoreProxy { raftstore_proxy_engine: RwLock::new(raftstore_proxy_engine), pd_client, cluster_raftstore_ver: RwLock::new(RaftstoreVer::Uncertain), + proxy_config_str, } } } @@ -378,6 +381,11 @@ impl RaftStoreProxy { unreachable!() } } + + // TODO may be we can later move ProxyConfig to proxy_ffi. + pub fn get_proxy_config_str(&self) -> &String { + &self.proxy_config_str + } } pub trait RaftStoreProxyEngineTrait { diff --git a/proxy_components/proxy_ffi/src/raftstore_proxy_helper_impls.rs b/proxy_components/proxy_ffi/src/raftstore_proxy_helper_impls.rs index b8fd541d2d6..03df46773a8 100644 --- a/proxy_components/proxy_ffi/src/raftstore_proxy_helper_impls.rs +++ b/proxy_components/proxy_ffi/src/raftstore_proxy_helper_impls.rs @@ -22,7 +22,7 @@ use super::{ interfaces_ffi::{ BaseBuffView, CppStrVecView, KVGetStatus, RaftProxyStatus, RaftStoreProxyFFIHelper, RaftStoreProxyPtr, RaftstoreVer, RawCppPtr, RawCppStringPtr, RawRustPtr, RawVoidPtr, - SSTReaderInterfaces, + RustStrWithView, SSTReaderInterfaces, }, read_index_helper, snapshot_reader_impls::*, @@ -88,6 +88,7 @@ impl RaftStoreProxyFFIHelper { fn_get_region_local_state: Some(ffi_get_region_local_state), fn_get_cluster_raftstore_version: Some(ffi_get_cluster_raftstore_version), fn_notify_compact_log: Some(ffi_notify_compact_log), + fn_get_config_json: Some(ffi_get_config_json), } } } @@ -298,3 +299,19 @@ pub unsafe extern "C" fn ffi_poll_timer_task(task_ptr: RawVoidPtr, waker: RawVoi 0 } } + +pub unsafe extern "C" fn ffi_get_config_json( + proxy_ptr: RaftStoreProxyPtr, + _kind: u64, +) -> RustStrWithView { + let s = proxy_ptr + .as_ref() + .get_proxy_config_str() + .as_bytes() + .to_owned(); + if s.is_empty() { + RustStrWithView::default() + } else { + build_from_string(s) + } +} diff --git a/proxy_components/proxy_ffi/src/snapshot_reader_impls/sst_reader_dispatcher.rs b/proxy_components/proxy_ffi/src/snapshot_reader_impls/sst_reader_dispatcher.rs index 585879b2b46..7cb5e7dbf38 100644 --- a/proxy_components/proxy_ffi/src/snapshot_reader_impls/sst_reader_dispatcher.rs +++ b/proxy_components/proxy_ffi/src/snapshot_reader_impls/sst_reader_dispatcher.rs @@ -4,8 +4,8 @@ use super::{sst_file_reader::*, tablet_reader::TabletReader, LockCFFileReader}; use crate::{ interfaces_ffi::{ - BaseBuffView, ColumnFamilyType, EngineIteratorSeekType, RaftStoreProxyPtr, RustBaseBuffVec, - SSTFormatKind, SSTReaderInterfaces, SSTReaderPtr, SSTView, + BaseBuffView, ColumnFamilyType, EngineIteratorSeekType, RaftStoreProxyPtr, + RustStrWithViewVec, SSTFormatKind, SSTReaderInterfaces, SSTReaderPtr, SSTView, }, raftstore_proxy_helper_impls::RaftStoreProxyFFI, }; @@ -204,9 +204,9 @@ pub unsafe extern "C" fn ffi_approx_size(mut reader: SSTReaderPtr, type_: Column pub unsafe extern "C" fn ffi_get_split_keys( mut reader: SSTReaderPtr, splits_count: u64, -) -> RustBaseBuffVec { +) -> RustStrWithViewVec { match reader.kind { - SSTFormatKind::KIND_SST => RustBaseBuffVec::default(), + SSTFormatKind::KIND_SST => RustStrWithViewVec::default(), SSTFormatKind::KIND_TABLET => reader.as_mut_tablet().ffi_get_split_keys(splits_count), } } diff --git a/proxy_components/proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs b/proxy_components/proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs index 7fded3f5eb4..d1ffa254948 100644 --- a/proxy_components/proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs +++ b/proxy_components/proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs @@ -8,7 +8,7 @@ use engine_traits::{Iterable, Iterator, RangePropertiesExt, CF_WRITE}; use crate::{ build_from_vec_string, cf_to_name, interfaces_ffi::{ - BaseBuffView, ColumnFamilyType, EngineIteratorSeekType, RustBaseBuffVec, SSTFormatKind, + BaseBuffView, ColumnFamilyType, EngineIteratorSeekType, RustStrWithViewVec, SSTFormatKind, SSTReaderPtr, }, }; @@ -134,7 +134,7 @@ impl TabletReader { v[0] } - pub fn ffi_get_split_keys(&self, splits_count: u64) -> RustBaseBuffVec { + pub fn ffi_get_split_keys(&self, splits_count: u64) -> RustStrWithViewVec { let range = engine_traits::Range { start_key: b"", end_key: keys::DATA_MAX_KEY, @@ -147,13 +147,13 @@ impl TabletReader { { Ok(r) => { if r.len() < 1 { - return RustBaseBuffVec::default(); + return RustStrWithViewVec::default(); } build_from_vec_string(r) } Err(e) => { tikv_util::info!("ffi_get_split_keys failed due to {:?}", e); - RustBaseBuffVec::default() + RustStrWithViewVec::default() } } } diff --git a/proxy_components/proxy_server/src/run.rs b/proxy_components/proxy_server/src/run.rs index 774349f62ab..b3b4d1771bd 100644 --- a/proxy_components/proxy_server/src/run.rs +++ b/proxy_components/proxy_server/src/run.rs @@ -123,6 +123,7 @@ pub fn run_impl( engine_store_server_helper: &EngineStoreServerHelper, ) { let (service_event_tx, service_event_rx) = tikv_util::mpsc::unbounded(); // pipe for controling service + let proxy_config_str = serde_json::to_string(&proxy_config).unwrap_or(String::new()); let engine_store_server_helper_ptr = engine_store_server_helper as *const _ as isize; let mut tikv = TiKvServer::::init( config, @@ -150,6 +151,7 @@ pub fn run_impl( ))), None, Some(tikv.pd_client.clone()), + proxy_config_str, ); info!("start probing cluster's raftstore version"); // We wait for a maximum of 10 seconds for every store. @@ -272,7 +274,7 @@ pub fn run_impl( #[inline] fn run_impl_only_for_decryption( config: TikvConfig, - _proxy_config: ProxyConfig, + proxy_config: ProxyConfig, engine_store_server_helper: &EngineStoreServerHelper, ) { let encryption_key_manager = @@ -287,12 +289,14 @@ fn run_impl_only_for_decryption( .unwrap() .map(Arc::new); + let proxy_config_str = serde_json::to_string(&proxy_config).unwrap_or(String::new()); let mut proxy = RaftStoreProxy::new( AtomicU8::new(RaftProxyStatus::Idle as u8), encryption_key_manager.clone(), Option::None, None, None, + proxy_config_str, ); let proxy_ref = &proxy; diff --git a/proxy_tests/proxy/shared/config.rs b/proxy_tests/proxy/shared/config.rs index 58d3b004492..654df21a36b 100644 --- a/proxy_tests/proxy/shared/config.rs +++ b/proxy_tests/proxy/shared/config.rs @@ -75,15 +75,15 @@ fn test_default_no_config_item() { address_proxy_config(&mut config, &proxy_config); use serde_derive::{Deserialize, Serialize}; use serde_json::{Map, Value}; - #[derive(Serialize, Deserialize)] - struct Data { - inner: Map, - } let json_format_proxy = serde_json::to_string(&proxy_config).unwrap(); - let parsed_json_proxy: Data = serde_json::from_str(json_format_proxy.as_str()).unwrap(); + let parsed_json_proxy: Value = serde_json::from_str(json_format_proxy.as_str()).unwrap(); { - let raft_store = parsed_json_proxy.inner.get("raft_store").unwrap(); - let snap_handle_pool_size = raft_store.get("snap_handle_pool_size").unwrap(); + let raft_store = parsed_json_proxy + .as_object() + .expect("fail") + .get("raftstore") + .unwrap(); + let snap_handle_pool_size = raft_store.get("snap-handle-pool-size").unwrap(); assert_eq!( proxy_config.raft_store.snap_handle_pool_size as u64, snap_handle_pool_size.as_u64().unwrap() diff --git a/proxy_tests/proxy/shared/ffi.rs b/proxy_tests/proxy/shared/ffi.rs index ca5a0ccc49f..3759ff2c344 100644 --- a/proxy_tests/proxy/shared/ffi.rs +++ b/proxy_tests/proxy/shared/ffi.rs @@ -1,11 +1,12 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. use engine_store_ffi::ffi::{ - get_engine_store_server_helper, - interfaces_ffi::{RawCppPtr, RawCppPtrArr, RawCppPtrTuple, RawVoidPtr}, - UnwrapExternCFunc, + ffi_gc_rust_ptr, get_engine_store_server_helper, + interfaces_ffi::{RawCppPtr, RawCppPtrArr, RawCppPtrTuple, RawVoidPtr, RustStrWithView}, + UnwrapExternCFunc, TEST_GC_OBJ_MONITOR, }; use mock_engine_store::{mock_cluster::init_global_ffi_helper_set, mock_store::RawCppPtrTypeImpl}; +use proxy_ffi::build_from_string; #[test] fn test_tuple_of_raw_cpp_ptr() { @@ -96,3 +97,15 @@ fn test_carray_of_raw_cpp_ptr() { ); } } + +#[test] +fn test_rust_owned_objects() { + let s = String::from("hello"); + let sv: Vec = s.as_bytes().to_owned(); + let rs: RustStrWithView = build_from_string(sv.clone()); + assert_eq!(rs.buff.to_slice(), s.as_bytes()); + ffi_gc_rust_ptr(rs.inner.ptr, rs.inner.type_); + let df = RustStrWithView::default(); + ffi_gc_rust_ptr(df.inner.ptr, df.inner.type_); + assert!(TEST_GC_OBJ_MONITOR.valid_clean_rust()); +} diff --git a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version index 663c96e4645..a4bd39e8ae5 100644 --- a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version +++ b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version @@ -1,3 +1,3 @@ #pragma once #include -namespace DB { constexpr uint64_t RAFT_STORE_PROXY_VERSION = 16886254392474956029ull; } \ No newline at end of file +namespace DB { constexpr uint64_t RAFT_STORE_PROXY_VERSION = 5692329170612304456ull; } \ No newline at end of file diff --git a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h index 7551ce886f9..126e187d35e 100644 --- a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h +++ b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h @@ -174,11 +174,20 @@ struct SSTReaderPtr { SSTFormatKind kind; }; -struct RustBaseBuffVec { - // It is a view of something inside `inner_vec_of_string`. +struct RustStrWithView { + // It is a view of something inside `inner`. + BaseBuffView buff; + // Should never be accessed on C++ side. + // Call fn_gc_rust_ptr to this after `buffs` is no longer used. + RawRustPtr inner; +}; + +struct RustStrWithViewVec { + // It is a view of something inside `inner`. const BaseBuffView *buffs; // It is the length of view. uint64_t len; + // Should never be accessed on C++ side. // Call fn_gc_rust_ptr to this after `buffs` is no longer used. RawRustPtr inner; }; @@ -194,7 +203,7 @@ struct SSTReaderInterfaces { void (*fn_seek)(SSTReaderPtr, ColumnFamilyType, EngineIteratorSeekType, BaseBuffView); uint64_t (*fn_approx_size)(SSTReaderPtr, ColumnFamilyType); - RustBaseBuffVec (*fn_get_split_keys)(SSTReaderPtr, uint64_t splits_count); + RustStrWithViewVec (*fn_get_split_keys)(SSTReaderPtr, uint64_t splits_count); }; enum class MsgPBType : uint32_t { @@ -259,6 +268,7 @@ struct RaftStoreProxyFFIHelper { void (*fn_notify_compact_log)(RaftStoreProxyPtr, uint64_t region_id, uint64_t compact_index, uint64_t compact_term, uint64_t applied_index); + RustStrWithView (*fn_get_config_json)(RaftStoreProxyPtr, uint64_t kind); }; struct PageStorageInterfaces { From d5aa4dd4ae95f66e50c3a00acefc8a21de90d959 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Mon, 18 Sep 2023 23:09:31 +0800 Subject: [PATCH 07/14] f Signed-off-by: CalvinNeo --- proxy_components/proxy_ffi/src/domain_impls.rs | 4 +++- .../proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/proxy_components/proxy_ffi/src/domain_impls.rs b/proxy_components/proxy_ffi/src/domain_impls.rs index aa3519eeee2..aa9300fb598 100644 --- a/proxy_components/proxy_ffi/src/domain_impls.rs +++ b/proxy_components/proxy_ffi/src/domain_impls.rs @@ -219,7 +219,7 @@ impl TestGcObjectMonitor { return false; } } - return true; + true } pub fn is_empty_rust(&self) -> bool { let data = &*self.rust.lock().unwrap(); @@ -244,6 +244,7 @@ impl Default for RustStrWithViewVec { struct RustStrWithViewVecInner { // Hold the Vec of String. + #[allow(clippy::box_collection)] _data: Pin>>>, // Hold the BaseBuffView array. buff_view_vec: Pin>>, @@ -305,6 +306,7 @@ impl Default for RustStrWithView { struct RustStrWithViewInner { // Hold the String. + #[allow(clippy::box_collection)] _data: Pin>>, } diff --git a/proxy_components/proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs b/proxy_components/proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs index d1ffa254948..f07aa84f3bc 100644 --- a/proxy_components/proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs +++ b/proxy_components/proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs @@ -146,7 +146,7 @@ impl TabletReader { .get_range_approximate_split_keys_cf(CF_WRITE, range, keys_count) { Ok(r) => { - if r.len() < 1 { + if r.is_empty() { return RustStrWithViewVec::default(); } build_from_vec_string(r) From 1391d7115cd3085bf15219f84b1c7d0e6d46f2d1 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Thu, 21 Sep 2023 11:10:36 +0800 Subject: [PATCH 08/14] f Signed-off-by: CalvinNeo --- proxy_components/proxy_ffi/src/domain_impls.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/proxy_components/proxy_ffi/src/domain_impls.rs b/proxy_components/proxy_ffi/src/domain_impls.rs index aa9300fb598..cb934c974ea 100644 --- a/proxy_components/proxy_ffi/src/domain_impls.rs +++ b/proxy_components/proxy_ffi/src/domain_impls.rs @@ -247,6 +247,7 @@ struct RustStrWithViewVecInner { #[allow(clippy::box_collection)] _data: Pin>>>, // Hold the BaseBuffView array. + #[allow(clippy::box_collection)] buff_view_vec: Pin>>, } From cc30d0f04c1b210c6663637d114ffa924b9ce1b6 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Thu, 21 Sep 2023 11:52:54 +0800 Subject: [PATCH 09/14] f2 Signed-off-by: CalvinNeo --- proxy_components/proxy_server/src/run.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/proxy_components/proxy_server/src/run.rs b/proxy_components/proxy_server/src/run.rs index b3b4d1771bd..615114c097d 100644 --- a/proxy_components/proxy_server/src/run.rs +++ b/proxy_components/proxy_server/src/run.rs @@ -123,7 +123,7 @@ pub fn run_impl( engine_store_server_helper: &EngineStoreServerHelper, ) { let (service_event_tx, service_event_rx) = tikv_util::mpsc::unbounded(); // pipe for controling service - let proxy_config_str = serde_json::to_string(&proxy_config).unwrap_or(String::new()); + let proxy_config_str = serde_json::to_string(&proxy_config).unwrap_or_default(); let engine_store_server_helper_ptr = engine_store_server_helper as *const _ as isize; let mut tikv = TiKvServer::::init( config, @@ -289,7 +289,7 @@ fn run_impl_only_for_decryption( .unwrap() .map(Arc::new); - let proxy_config_str = serde_json::to_string(&proxy_config).unwrap_or(String::new()); + let proxy_config_str = serde_json::to_string(&proxy_config).unwrap_or_default(); let mut proxy = RaftStoreProxy::new( AtomicU8::new(RaftProxyStatus::Idle as u8), encryption_key_manager.clone(), From 7f2fbcf171d561828a751fc107048240e977084a Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Thu, 21 Sep 2023 17:18:07 +0800 Subject: [PATCH 10/14] fix seek Signed-off-by: CalvinNeo --- .../src/mock_cluster/cluster_ext.rs | 2 +- .../snapshot_reader_impls/tablet_reader.rs | 10 +++- .../proxy/v2_compat/tablet_snapshot.rs | 57 +++++++++++++++++++ 3 files changed, 67 insertions(+), 2 deletions(-) diff --git a/proxy_components/mock-engine-store/src/mock_cluster/cluster_ext.rs b/proxy_components/mock-engine-store/src/mock_cluster/cluster_ext.rs index caa0ab6cf53..527e04a47e4 100644 --- a/proxy_components/mock-engine-store/src/mock_cluster/cluster_ext.rs +++ b/proxy_components/mock-engine-store/src/mock_cluster/cluster_ext.rs @@ -84,7 +84,7 @@ impl ClusterExt { ) -> (FFIHelperSet, TikvConfig) { // We must allocate on heap to avoid move. - let proxy_config_str = serde_json::to_string(proxy_cfg).unwrap_or(String::new()); + let proxy_config_str = serde_json::to_string(proxy_cfg).unwrap_or_default(); let proxy = Box::new(engine_store_ffi::ffi::RaftStoreProxy::new( AtomicU8::new(RaftProxyStatus::Idle as u8), key_mgr.clone(), diff --git a/proxy_components/proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs b/proxy_components/proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs index f07aa84f3bc..925ef0761f5 100644 --- a/proxy_components/proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs +++ b/proxy_components/proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs @@ -115,10 +115,18 @@ impl TabletReader { } EngineIteratorSeekType::Last => { let _ = iter.seek_to_last(); + *self.remained.borrow_mut() = false; } EngineIteratorSeekType::Key => { let dk = keys::data_key(bf.to_slice()); - let _ = iter.seek(&dk); + match iter.seek(&dk) { + Ok(x) => { + *self.remained.borrow_mut() = x; + } + Err(_e) => { + *self.remained.borrow_mut() = false; + } + } } }; } diff --git a/proxy_tests/proxy/v2_compat/tablet_snapshot.rs b/proxy_tests/proxy/v2_compat/tablet_snapshot.rs index 2a8e040e4be..be76bc4e481 100644 --- a/proxy_tests/proxy/v2_compat/tablet_snapshot.rs +++ b/proxy_tests/proxy/v2_compat/tablet_snapshot.rs @@ -125,6 +125,63 @@ fn prepare_snapshot( .final_recv_path(&snap_key) } +#[test] +fn test_get_snapshot_seek() { + let mut cluster_v1 = new_server_cluster(1, 1); + let mut cluster_v2 = test_raftstore_v2::new_server_cluster(1, 1); + cluster_v1.cfg.raft_store.enable_v2_compatible_learner = true; + cluster_v1.run(); + cluster_v2.run(); + + let key_count = 100; + let path = prepare_snapshot(&mut cluster_v1, &mut cluster_v2, key_count); + + let reader = TabletReader::ffi_get_cf_file_reader( + path.as_path().to_str().unwrap(), + ColumnFamilyType::Write, + None, + ); + + unsafe { + let k = format!("k{:06}", 99); + let bf = BaseBuffView { + data: k.as_ptr() as *const _, + len: k.len() as u64, + }; + let cf = ColumnFamilyType::Write; + ffi_sst_reader_seek(reader.clone(), cf, EngineIteratorSeekType::Key, bf); + let remained = ffi_sst_reader_remained(reader.clone(), cf); + if remained == 1 { + ffi_sst_reader_next(reader.clone(), cf); + let remained = ffi_sst_reader_remained(reader.clone(), cf); + if remained == 1 { + ffi_sst_reader_key(reader.clone(), cf); + } + } + } + + unsafe { + let k = format!("k{:06}", 100); + let bf = BaseBuffView { + data: k.as_ptr() as *const _, + len: k.len() as u64, + }; + let cf = ColumnFamilyType::Write; + ffi_sst_reader_seek(reader.clone(), cf, EngineIteratorSeekType::Key, bf); + let remained = ffi_sst_reader_remained(reader.clone(), cf); + if remained == 1 { + ffi_sst_reader_next(reader.clone(), cf); + let remained = ffi_sst_reader_remained(reader.clone(), cf); + if remained == 1 { + ffi_sst_reader_key(reader.clone(), cf); + } + } + } + + cluster_v1.shutdown(); + cluster_v2.shutdown(); +} + #[test] fn test_get_snapshot_split_keys() { let mut cluster_v1 = new_server_cluster(1, 1); From 0822048cbe085d312fb771204e584e93607c3879 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Thu, 21 Sep 2023 21:37:17 +0800 Subject: [PATCH 11/14] z Signed-off-by: CalvinNeo --- .../proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/proxy_components/proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs b/proxy_components/proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs index 925ef0761f5..c096300be97 100644 --- a/proxy_components/proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs +++ b/proxy_components/proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs @@ -157,6 +157,10 @@ impl TabletReader { if r.is_empty() { return RustStrWithViewVec::default(); } + let truncated_string: Vec> = r + .into_iter() + .map(|s| keys::origin_key(&s).to_owned()) + .collect(); build_from_vec_string(r) } Err(e) => { From 51f92ec9a06d6dbff09afdb9578295a4bc045db7 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Thu, 21 Sep 2023 22:42:24 +0800 Subject: [PATCH 12/14] f Signed-off-by: CalvinNeo --- .../proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxy_components/proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs b/proxy_components/proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs index c096300be97..14af1ba5732 100644 --- a/proxy_components/proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs +++ b/proxy_components/proxy_ffi/src/snapshot_reader_impls/tablet_reader.rs @@ -161,7 +161,7 @@ impl TabletReader { .into_iter() .map(|s| keys::origin_key(&s).to_owned()) .collect(); - build_from_vec_string(r) + build_from_vec_string(truncated_string) } Err(e) => { tikv_util::info!("ffi_get_split_keys failed due to {:?}", e); From f42796c8cd1636823941cb57ccc5954420e88b56 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Fri, 22 Sep 2023 12:50:28 +0800 Subject: [PATCH 13/14] a Signed-off-by: CalvinNeo --- proxy_tests/proxy/v2_compat/tablet_snapshot.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/proxy_tests/proxy/v2_compat/tablet_snapshot.rs b/proxy_tests/proxy/v2_compat/tablet_snapshot.rs index be76bc4e481..b73929f1691 100644 --- a/proxy_tests/proxy/v2_compat/tablet_snapshot.rs +++ b/proxy_tests/proxy/v2_compat/tablet_snapshot.rs @@ -202,8 +202,8 @@ fn test_get_snapshot_split_keys() { // If we want to split the range into 2 parts, it should return 1 split key. let split_count = 4; let res = ffi_get_split_keys(reader.clone(), split_count); - let maximum = format!("zk{:06}", key_count); - let minimum = format!("zk{:06}", 0); + let maximum = format!("k{:06}", key_count); + let minimum = format!("k{:06}", 0); tikv_util::debug!("minimum split key is {}", minimum); tikv_util::debug!("maximum split key is {}", maximum); for i in 0..res.len { From 642eb3fa39e816a145a5759ed87a7b30f77f9dd2 Mon Sep 17 00:00:00 2001 From: CalvinNeo Date: Sun, 8 Oct 2023 16:03:35 +0800 Subject: [PATCH 14/14] ftest Signed-off-by: CalvinNeo --- proxy_tests/proxy/v2_compat/tablet_snapshot.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/proxy_tests/proxy/v2_compat/tablet_snapshot.rs b/proxy_tests/proxy/v2_compat/tablet_snapshot.rs index b73929f1691..eb9f2c08cbd 100644 --- a/proxy_tests/proxy/v2_compat/tablet_snapshot.rs +++ b/proxy_tests/proxy/v2_compat/tablet_snapshot.rs @@ -358,7 +358,7 @@ fn test_v1_apply_snap_from_v2() { let engine = cluster_v2.get_engine(1); for i in 0..50 { - let k = format!("k{:06}", i); + let k = format!("k{:04}", i); cluster_v2.must_put(k.as_bytes(), b"val"); } cluster_v2.flush_data(); @@ -386,7 +386,7 @@ fn test_v1_apply_snap_from_v2() { let path_str = path.as_path().to_str().unwrap(); for i in 11..50 { - let k = format!("k{:06}", i); + let k = format!("k{:04}", i); check_key( &cluster_v1, k.as_bytes(),