Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide split key in tablet reader #355

Merged
merged 15 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions proxy_components/mock-engine-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ rand = "0.8"
resolved_ts = { workspace = true }
resource_control = { workspace = true }
resource_metering = { workspace = true }
serde_json = "1.0"
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved
service = { workspace = true }
security = { workspace = true, default-features = false }
server = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,11 @@ impl ClusterExt {
cluster_ext_ptr: isize,
mock_cfg: MockConfig,
pd_client: Option<Arc<dyn pd_client::PdClient>>,
proxy_cfg: &ProxyConfig,
Lloyd-Pottiger marked this conversation as resolved.
Show resolved Hide resolved
) -> (FFIHelperSet, TikvConfig) {
// We must allocate on heap to avoid move.

let proxy_config_str = serde_json::to_string(proxy_cfg).unwrap_or_default();
let proxy = Box::new(engine_store_ffi::ffi::RaftStoreProxy::new(
AtomicU8::new(RaftProxyStatus::Idle as u8),
key_mgr.clone(),
Expand All @@ -96,6 +99,7 @@ impl ClusterExt {
},
None,
pd_client,
proxy_config_str,
Lloyd-Pottiger marked this conversation as resolved.
Show resolved Hide resolved
));

let proxy_ref = proxy.as_ref();
Expand Down Expand Up @@ -163,6 +167,7 @@ impl ClusterExt {
cluster_ext_ptr,
mock_cfg,
pd_client,
proxy_cfg,
);

// We can not use moved or cloned engines any more.
Expand Down
2 changes: 2 additions & 0 deletions proxy_components/proxy_ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ lazy_static = "1.3"
protobuf = { version = "2.8", features = ["bytes"] }
pd_client = { workspace = true }
raftstore = { workspace = true, default-features = false }
serde_json = "1.0"
slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] }
slog-global = { version = "0.1", git = "https://github.com/breeswish/slog-global.git", rev = "d592f88e4dbba5eb439998463054f1a44fbf17b9" }
tikv_util = { workspace = true, default-features = false }
Expand All @@ -46,6 +47,7 @@ tokio-timer = { git = "https://github.com/tikv/tokio", branch = "tokio-timer-hot
tracker = { workspace = true, default-features = false }
reqwest = { version = "0.11", features = ["blocking"] }
url = "2.4.0"
collections = { workspace = true }

[dependencies.rocksdb]
git = "https://github.com/tikv/rust-rocksdb.git"
Expand Down
169 changes: 167 additions & 2 deletions proxy_components/proxy_ffi/src/domain_impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use engine_traits::{CF_DEFAULT, CF_LOCK, CF_WRITE};
use super::{
interfaces_ffi,
interfaces_ffi::{
BaseBuffView, ColumnFamilyType, RaftCmdHeader, RawRustPtr, RawVoidPtr, SSTView, SSTViewVec,
WriteCmdType, WriteCmdsView,
BaseBuffView, ColumnFamilyType, RaftCmdHeader, RawRustPtr, RawVoidPtr, RustStrWithView,
RustStrWithViewVec, SSTView, SSTViewVec, WriteCmdType, WriteCmdsView,
},
read_index_helper, utils,
};
Expand Down Expand Up @@ -131,6 +131,8 @@ pub enum RawRustPtrType {
ReadIndexTask = 1,
ArcFutureWaker = 2,
TimerTask = 3,
String = 4,
VecOfString = 5,
}

impl From<u32> for RawRustPtrType {
Expand Down Expand Up @@ -162,6 +164,12 @@ pub extern "C" fn ffi_gc_rust_ptr(data: RawVoidPtr, type_: interfaces_ffi::RawRu
RawRustPtrType::TimerTask => unsafe {
drop(Box::from_raw(data as *mut utils::TimerTask));
},
RawRustPtrType::String => unsafe {
drop(Box::from_raw(data as *mut RustStrWithViewInner));
},
RawRustPtrType::VecOfString => unsafe {
drop(Box::from_raw(data as *mut RustStrWithViewVecInner));
},
_ => unreachable!(),
}
}
Expand All @@ -180,3 +188,160 @@ impl RawRustPtr {
self.ptr.is_null()
}
}

#[derive(Default)]
pub struct TestGcObjectMonitor {
rust: std::sync::Mutex<collections::HashMap<interfaces_ffi::RawRustPtrType, isize>>,
}

impl TestGcObjectMonitor {
pub fn add_rust(&self, t: &interfaces_ffi::RawRustPtrType, x: isize) {
use std::collections::hash_map::Entry;
let data = &mut *self.rust.lock().unwrap();
match data.entry(*t) {
Entry::Occupied(mut v) => {
*v.get_mut() += x;
}
Entry::Vacant(v) => {
v.insert(x);
}
}
}
pub fn valid_clean_rust(&self) -> bool {
let data = &*self.rust.lock().unwrap();
for (k, v) in data {
if *v != 0 {
tikv_util::error!(
"TestGcObjectMonitor::valid_clean failed at type {} refcount {}",
k,
v
);
return false;
}
}
true
}
pub fn is_empty_rust(&self) -> bool {
let data = &*self.rust.lock().unwrap();
data.is_empty()
}
}

#[cfg(any(test, feature = "testexport"))]
lazy_static::lazy_static! {
pub static ref TEST_GC_OBJ_MONITOR: TestGcObjectMonitor = TestGcObjectMonitor::default();
}

impl Default for RustStrWithViewVec {
fn default() -> Self {
RustStrWithViewVec {
buffs: std::ptr::null_mut(),
len: 0,
inner: RawRustPtr::default(),
}
}
}

struct RustStrWithViewVecInner {
// Hold the Vec of String.
#[allow(clippy::box_collection)]
_data: Pin<Box<Vec<Vec<u8>>>>,
// Hold the BaseBuffView array.
#[allow(clippy::box_collection)]
buff_view_vec: Pin<Box<Vec<BaseBuffView>>>,
}

impl Drop for RustStrWithViewVecInner {
fn drop(&mut self) {
#[cfg(any(test, feature = "testexport"))]
{
TEST_GC_OBJ_MONITOR.add_rust(&RawRustPtrType::VecOfString.into(), -1);
}
}
}

pub fn build_from_vec_string(s: Vec<Vec<u8>>) -> RustStrWithViewVec {
let vec_len = s.len();
let vec_len_64: u64 = vec_len as u64;
let inner_vec_of_string = Box::pin(s);
let mut buff_view_vec: Vec<BaseBuffView> = Vec::with_capacity(vec_len);
for i in 0..vec_len {
buff_view_vec.push(BaseBuffView {
data: inner_vec_of_string[i].as_ptr() as *const _,
len: inner_vec_of_string[i].len() as u64,
});
}
let inner = Box::new(RustStrWithViewVecInner {
_data: inner_vec_of_string,
buff_view_vec: Box::pin(buff_view_vec),
});
#[cfg(any(test, feature = "testexport"))]
{
TEST_GC_OBJ_MONITOR.add_rust(&RawRustPtrType::VecOfString.into(), 1);
}
let inner_wrapped = RawRustPtr {
ptr: inner.as_ref() as *const _ as RawVoidPtr,
type_: RawRustPtrType::VecOfString.into(),
};
let buff_view_vec_ptr = inner.buff_view_vec.as_ptr();
std::mem::forget(inner);

RustStrWithViewVec {
buffs: buff_view_vec_ptr,
len: vec_len_64,
inner: inner_wrapped,
}
}

impl Default for RustStrWithView {
fn default() -> Self {
RustStrWithView {
buff: BaseBuffView {
data: std::ptr::null(),
len: 0,
},
inner: RawRustPtr::default(),
}
}
}

struct RustStrWithViewInner {
// Hold the String.
#[allow(clippy::box_collection)]
_data: Pin<Box<Vec<u8>>>,
}

impl Drop for RustStrWithViewInner {
fn drop(&mut self) {
#[cfg(any(test, feature = "testexport"))]
{
TEST_GC_OBJ_MONITOR.add_rust(&RawRustPtrType::String.into(), -1);
}
}
}

pub fn build_from_string(s: Vec<u8>) -> RustStrWithView {
let str_len = s.len();
let inner_string = Box::pin(s);
let buff = BaseBuffView {
data: inner_string.as_ptr() as *const _,
len: str_len as u64,
};
let inner = Box::new(RustStrWithViewInner {
_data: inner_string,
});
#[cfg(any(test, feature = "testexport"))]
{
TEST_GC_OBJ_MONITOR.add_rust(&RawRustPtrType::String.into(), 1);
}
let inner_wrapped = RawRustPtr {
ptr: inner.as_ref() as *const _ as RawVoidPtr,
type_: RawRustPtrType::String.into(),
};
std::mem::forget(inner);

RustStrWithView {
buff,
inner: inner_wrapped,
}
}
33 changes: 32 additions & 1 deletion proxy_components/proxy_ffi/src/interfaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,19 @@ pub mod root {
}
#[repr(C)]
#[derive(Debug)]
pub struct RustStrWithView {
pub buff: root::DB::BaseBuffView,
pub inner: root::DB::RawRustPtr,
}
#[repr(C)]
#[derive(Debug)]
pub struct RustStrWithViewVec {
pub buffs: *const root::DB::BaseBuffView,
pub len: u64,
pub inner: root::DB::RawRustPtr,
}
#[repr(C)]
#[derive(Debug)]
pub struct SSTReaderInterfaces {
pub fn_get_sst_reader: ::std::option::Option<
unsafe extern "C" fn(
Expand Down Expand Up @@ -285,6 +298,18 @@ pub mod root {
arg4: root::DB::BaseBuffView,
),
>,
pub fn_approx_size: ::std::option::Option<
unsafe extern "C" fn(
arg1: root::DB::SSTReaderPtr,
arg2: root::DB::ColumnFamilyType,
) -> u64,
>,
pub fn_get_split_keys: ::std::option::Option<
unsafe extern "C" fn(
arg1: root::DB::SSTReaderPtr,
splits_count: u64,
) -> root::DB::RustStrWithViewVec,
>,
}
#[repr(u32)]
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
Expand Down Expand Up @@ -434,6 +459,12 @@ pub mod root {
applied_index: u64,
),
>,
pub fn_get_config_json: ::std::option::Option<
unsafe extern "C" fn(
arg1: root::DB::RaftStoreProxyPtr,
kind: u64,
) -> root::DB::RustStrWithView,
>,
}
#[repr(C)]
#[derive(Debug)]
Expand Down Expand Up @@ -659,7 +690,7 @@ pub mod root {
arg3: root::DB::RawVoidPtr,
) -> u32;
}
pub const RAFT_STORE_PROXY_VERSION: u64 = 3797917752479181299;
pub const RAFT_STORE_PROXY_VERSION: u64 = 5692329170612304456;
pub const RAFT_STORE_PROXY_MAGIC_NUMBER: u32 = 324508639;
}
}
8 changes: 8 additions & 0 deletions proxy_components/proxy_ffi/src/raftstore_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub struct RaftStoreProxy {
raftstore_proxy_engine: RwLock<Option<Eng>>,
pd_client: Option<Arc<dyn PdClient>>,
cluster_raftstore_ver: RwLock<RaftstoreVer>,
proxy_config_str: String,
}

impl RaftStoreProxy {
Expand All @@ -38,6 +39,7 @@ impl RaftStoreProxy {
read_index_client: Option<Box<dyn read_index_helper::ReadIndex>>,
raftstore_proxy_engine: Option<Eng>,
pd_client: Option<Arc<dyn PdClient>>,
proxy_config_str: String,
) -> Self {
RaftStoreProxy {
status,
Expand All @@ -46,6 +48,7 @@ impl RaftStoreProxy {
raftstore_proxy_engine: RwLock::new(raftstore_proxy_engine),
pd_client,
cluster_raftstore_ver: RwLock::new(RaftstoreVer::Uncertain),
proxy_config_str,
}
}
}
Expand Down Expand Up @@ -378,6 +381,11 @@ impl RaftStoreProxy {
unreachable!()
}
}

// TODO may be we can later move ProxyConfig to proxy_ffi.
pub fn get_proxy_config_str(&self) -> &String {
&self.proxy_config_str
}
}

pub trait RaftStoreProxyEngineTrait {
Expand Down
Loading
Loading