Skip to content

Commit

Permalink
use smallvec for db query buffering
Browse files Browse the repository at this point in the history
Signed-off-by: Jason Volk <[email protected]>
  • Loading branch information
jevolk committed Nov 28, 2024
1 parent 76c75cc commit 3ad6aa5
Show file tree
Hide file tree
Showing 23 changed files with 173 additions and 98 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,16 @@ name = "conduit"

[workspace.dependencies.arrayvec]
version = "0.7.4"
features = ["std", "serde"]
features = ["serde"]

[workspace.dependencies.smallvec]
version = "1.13.2"
features = [
"const_generics",
"const_new",
"serde",
"write",
]

[workspace.dependencies.const-str]
version = "0.5.7"
Expand Down
1 change: 1 addition & 0 deletions src/database/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ log.workspace = true
rust-rocksdb.workspace = true
serde.workspace = true
serde_json.workspace = true
smallvec.workspace = true
tokio.workspace = true
tracing.workspace = true

Expand Down
1 change: 1 addition & 0 deletions src/database/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ impl<'a> Deserialized for Result<&'a Handle<'a>> {
}

impl<'a> Deserialized for &'a Handle<'a> {
#[inline]
fn map_de<T, U, F>(self, f: F) -> Result<U>
where
F: FnOnce(T) -> U,
Expand Down
35 changes: 32 additions & 3 deletions src/database/keyval.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,42 @@
use conduit::Result;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;

use crate::de;
use crate::{de, ser};

pub type KeyVal<'a, K = &'a Slice, V = &'a Slice> = (Key<'a, K>, Val<'a, V>);
pub type Key<'a, T = &'a Slice> = T;
pub type Val<'a, T = &'a Slice> = T;

pub type Slice = [u8];
pub type KeyBuf = KeyBuffer;
pub type ValBuf = ValBuffer;

pub type KeyBuffer<const CAP: usize = KEY_STACK_CAP> = Buffer<CAP>;
pub type ValBuffer<const CAP: usize = VAL_STACK_CAP> = Buffer<CAP>;
pub type Buffer<const CAP: usize = DEF_STACK_CAP> = SmallVec<[Byte; CAP]>;

pub type Slice = [Byte];
pub type Byte = u8;

pub const KEY_STACK_CAP: usize = 128;
pub const VAL_STACK_CAP: usize = 512;
pub const DEF_STACK_CAP: usize = KEY_STACK_CAP;

#[inline]
pub fn serialize_key<T>(val: T) -> Result<KeyBuf>
where
T: Serialize,
{
ser::serialize_to::<KeyBuf, _>(val)
}

#[inline]
pub fn serialize_val<T>(val: T) -> Result<ValBuf>
where
T: Serialize,
{
ser::serialize_to::<ValBuf, _>(val)
}

#[inline]
pub(crate) fn _expect_deserialize<'a, K, V>(kv: Result<KeyVal<'a>>) -> KeyVal<'a, K, V>
Expand Down
7 changes: 5 additions & 2 deletions src/database/map/contains.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,25 @@ use conduit::{
use futures::FutureExt;
use serde::Serialize;

use crate::ser;
use crate::{keyval::KeyBuf, ser};

/// Returns true if the map contains the key.
/// - key is serialized into allocated buffer
/// - harder errors may not be reported
#[inline]
#[implement(super::Map)]
pub fn contains<K>(self: &Arc<Self>, key: &K) -> impl Future<Output = bool> + Send + '_
where
K: Serialize + ?Sized + Debug,
{
let mut buf = Vec::<u8>::with_capacity(64);
let mut buf = KeyBuf::new();
self.bcontains(key, &mut buf)
}

/// Returns true if the map contains the key.
/// - key is serialized into stack-buffer
/// - harder errors will panic
#[inline]
#[implement(super::Map)]
pub fn acontains<const MAX: usize, K>(self: &Arc<Self>, key: &K) -> impl Future<Output = bool> + Send + '_
where
Expand All @@ -51,6 +53,7 @@ where

/// Returns Ok if the map contains the key.
/// - key is raw
#[inline]
#[implement(super::Map)]
pub fn exists<'a, K>(self: &'a Arc<Self>, key: &K) -> impl Future<Output = Result> + Send + 'a
where
Expand Down
10 changes: 5 additions & 5 deletions src/database/map/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use rocksdb::DBPinnableSlice;
use serde::Serialize;

use crate::{
keyval::KeyBuf,
ser,
util::{is_incomplete, map_err, or_else},
Handle,
Expand All @@ -18,18 +19,20 @@ type RocksdbResult<'a> = Result<Option<DBPinnableSlice<'a>>, rocksdb::Error>;
/// asynchronously. The key is serialized into an allocated buffer to perform
/// the query.
#[implement(super::Map)]
#[inline]
pub fn qry<K>(self: &Arc<Self>, key: &K) -> impl Future<Output = Result<Handle<'_>>> + Send
where
K: Serialize + ?Sized + Debug,
{
let mut buf = Vec::<u8>::with_capacity(64);
let mut buf = KeyBuf::new();
self.bqry(key, &mut buf)
}

/// Fetch a value from the database into cache, returning a reference-handle
/// asynchronously. The key is serialized into a fixed-sized buffer to perform
/// the query. The maximum size is supplied as const generic parameter.
#[implement(super::Map)]
#[inline]
pub fn aqry<const MAX: usize, K>(self: &Arc<Self>, key: &K) -> impl Future<Output = Result<Handle<'_>>> + Send
where
K: Serialize + ?Sized + Debug,
Expand Down Expand Up @@ -69,11 +72,8 @@ where
debug_assert!(matches!(cached, Ok(None)), "expected status Incomplete");
let cmd = Cmd::Get(Get {
map: self.clone(),
key: key.as_ref().into(),
res: None,
key: key
.as_ref()
.try_into()
.expect("failed to copy key into buffer"),
});

self.db.pool.execute(cmd).boxed()
Expand Down
26 changes: 19 additions & 7 deletions src/database/map/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,25 @@ use conduit::implement;
use rocksdb::WriteBatchWithTransaction;
use serde::Serialize;

use crate::{ser, util::or_else};
use crate::{
keyval::{KeyBuf, ValBuf},
ser,
util::or_else,
};

/// Insert Key/Value
///
/// - Key is serialized
/// - Val is serialized
#[implement(super::Map)]
#[inline]
pub fn put<K, V>(&self, key: K, val: V)
where
K: Serialize + Debug,
V: Serialize,
{
let mut key_buf = Vec::new();
let mut val_buf = Vec::new();
let mut key_buf = KeyBuf::new();
let mut val_buf = ValBuf::new();
self.bput(key, val, (&mut key_buf, &mut val_buf));
}

Expand All @@ -32,12 +37,13 @@ where
/// - Key is serialized
/// - Val is raw
#[implement(super::Map)]
#[inline]
pub fn put_raw<K, V>(&self, key: K, val: V)
where
K: Serialize + Debug,
V: AsRef<[u8]>,
{
let mut key_buf = Vec::new();
let mut key_buf = KeyBuf::new();
self.bput_raw(key, val, &mut key_buf);
}

Expand All @@ -46,12 +52,13 @@ where
/// - Key is raw
/// - Val is serialized
#[implement(super::Map)]
#[inline]
pub fn raw_put<K, V>(&self, key: K, val: V)
where
K: AsRef<[u8]>,
V: Serialize,
{
let mut val_buf = Vec::new();
let mut val_buf = ValBuf::new();
self.raw_bput(key, val, &mut val_buf);
}

Expand All @@ -60,12 +67,13 @@ where
/// - Key is serialized
/// - Val is serialized to stack-buffer
#[implement(super::Map)]
#[inline]
pub fn put_aput<const VMAX: usize, K, V>(&self, key: K, val: V)
where
K: Serialize + Debug,
V: Serialize,
{
let mut key_buf = Vec::new();
let mut key_buf = KeyBuf::new();
let mut val_buf = ArrayVec::<u8, VMAX>::new();
self.bput(key, val, (&mut key_buf, &mut val_buf));
}
Expand All @@ -75,13 +83,14 @@ where
/// - Key is serialized to stack-buffer
/// - Val is serialized
#[implement(super::Map)]
#[inline]
pub fn aput_put<const KMAX: usize, K, V>(&self, key: K, val: V)
where
K: Serialize + Debug,
V: Serialize,
{
let mut key_buf = ArrayVec::<u8, KMAX>::new();
let mut val_buf = Vec::new();
let mut val_buf = ValBuf::new();
self.bput(key, val, (&mut key_buf, &mut val_buf));
}

Expand All @@ -90,6 +99,7 @@ where
/// - Key is serialized to stack-buffer
/// - Val is serialized to stack-buffer
#[implement(super::Map)]
#[inline]
pub fn aput<const KMAX: usize, const VMAX: usize, K, V>(&self, key: K, val: V)
where
K: Serialize + Debug,
Expand All @@ -105,6 +115,7 @@ where
/// - Key is serialized to stack-buffer
/// - Val is raw
#[implement(super::Map)]
#[inline]
pub fn aput_raw<const KMAX: usize, K, V>(&self, key: K, val: V)
where
K: Serialize + Debug,
Expand All @@ -119,6 +130,7 @@ where
/// - Key is raw
/// - Val is serialized to stack-buffer
#[implement(super::Map)]
#[inline]
pub fn raw_aput<const VMAX: usize, K, V>(&self, key: K, val: V)
where
K: AsRef<[u8]>,
Expand Down
13 changes: 7 additions & 6 deletions src/database/map/keys_from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@ use conduit::{implement, Result};
use futures::{Stream, StreamExt};
use serde::{Deserialize, Serialize};

use crate::{keyval, keyval::Key, ser, stream};
use crate::{
keyval::{result_deserialize_key, serialize_key, Key},
stream,
};

#[implement(super::Map)]
pub fn keys_from<'a, K, P>(&'a self, from: &P) -> impl Stream<Item = Result<Key<'_, K>>> + Send
where
P: Serialize + ?Sized + Debug,
K: Deserialize<'a> + Send,
{
self.keys_from_raw(from)
.map(keyval::result_deserialize_key::<K>)
self.keys_from_raw(from).map(result_deserialize_key::<K>)
}

#[implement(super::Map)]
Expand All @@ -22,7 +24,7 @@ pub fn keys_from_raw<P>(&self, from: &P) -> impl Stream<Item = Result<Key<'_>>>
where
P: Serialize + ?Sized + Debug,
{
let key = ser::serialize_to_vec(from).expect("failed to serialize query key");
let key = serialize_key(from).expect("failed to serialize query key");
self.raw_keys_from(&key)
}

Expand All @@ -32,8 +34,7 @@ where
P: AsRef<[u8]> + ?Sized + Debug + Sync,
K: Deserialize<'a> + Send,
{
self.raw_keys_from(from)
.map(keyval::result_deserialize_key::<K>)
self.raw_keys_from(from).map(result_deserialize_key::<K>)
}

#[implement(super::Map)]
Expand Down
8 changes: 4 additions & 4 deletions src/database/map/keys_prefix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use futures::{
};
use serde::{Deserialize, Serialize};

use crate::{keyval, keyval::Key, ser};
use crate::keyval::{result_deserialize_key, serialize_key, Key};

#[implement(super::Map)]
pub fn keys_prefix<'a, K, P>(&'a self, prefix: &P) -> impl Stream<Item = Result<Key<'_, K>>> + Send
Expand All @@ -17,7 +17,7 @@ where
K: Deserialize<'a> + Send,
{
self.keys_prefix_raw(prefix)
.map(keyval::result_deserialize_key::<K>)
.map(result_deserialize_key::<K>)
}

#[implement(super::Map)]
Expand All @@ -26,7 +26,7 @@ pub fn keys_prefix_raw<P>(&self, prefix: &P) -> impl Stream<Item = Result<Key<'_
where
P: Serialize + ?Sized + Debug,
{
let key = ser::serialize_to_vec(prefix).expect("failed to serialize query key");
let key = serialize_key(prefix).expect("failed to serialize query key");
self.raw_keys_from(&key)
.try_take_while(move |k: &Key<'_>| future::ok(k.starts_with(&key)))
}
Expand All @@ -38,7 +38,7 @@ where
K: Deserialize<'a> + Send + 'a,
{
self.raw_keys_prefix(prefix)
.map(keyval::result_deserialize_key::<K>)
.map(result_deserialize_key::<K>)
}

#[implement(super::Map)]
Expand Down
6 changes: 4 additions & 2 deletions src/database/map/remove.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,20 @@ use arrayvec::ArrayVec;
use conduit::implement;
use serde::Serialize;

use crate::{ser, util::or_else};
use crate::{keyval::KeyBuf, ser, util::or_else};

#[implement(super::Map)]
#[inline]
pub fn del<K>(&self, key: K)
where
K: Serialize + Debug,
{
let mut buf = Vec::<u8>::with_capacity(64);
let mut buf = KeyBuf::new();
self.bdel(key, &mut buf);
}

#[implement(super::Map)]
#[inline]
pub fn adel<const MAX: usize, K>(&self, key: K)
where
K: Serialize + Debug,
Expand Down
Loading

0 comments on commit 3ad6aa5

Please sign in to comment.