Skip to content

Commit

Permalink
feat: databend-meta watch API provides initial_flush field (#17098)
Browse files Browse the repository at this point in the history
* chore: remove unused setting API

* feat: databend-meta watch API provides `initial_flush` field

If `WatchRequest.initial_flush` is `true`, when the watch stream is
established, all the key values in the specified range will be sent
through the stream to the watching client.

This allows the client to setup a full copy of the key range as a client
side cache, and furthur modifications will be received and the client
side cache can be updated.
  • Loading branch information
drmingdrmer authored Dec 24, 2024
1 parent 9a1b6a6 commit 5921c66
Show file tree
Hide file tree
Showing 18 changed files with 300 additions and 108 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/meta/raft-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ serde_json = { workspace = true }
stream-more = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true }

[dev-dependencies]
databend-common-base = { workspace = true }
Expand Down
31 changes: 27 additions & 4 deletions src/meta/raft-store/src/sm_v003/sm_v003.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::Bound;
use std::fmt::Debug;
use std::io;

use databend_common_meta_types::protobuf::WatchResponse;
use databend_common_meta_types::raft_types::Entry;
use databend_common_meta_types::raft_types::StorageError;
use databend_common_meta_types::snapshot_db::DB;
use databend_common_meta_types::sys_data::SysData;
use databend_common_meta_types::AppliedState;
use log::info;
use openraft::RaftLogId;
use tokio::sync::mpsc;
use tonic::Status;

use crate::applier::Applier;
use crate::leveled_store::leveled_map::compactor::Compactor;
Expand All @@ -31,6 +35,7 @@ use crate::sm_v003::sm_v003_kv_api::SMV003KVApi;
use crate::state_machine::ExpireKey;
use crate::state_machine_api::SMEventSender;
use crate::state_machine_api::StateMachineApi;
use crate::state_machine_api_ext::StateMachineApiExt;

#[derive(Debug, Default)]
pub struct SMV003 {
Expand Down Expand Up @@ -62,10 +67,6 @@ impl StateMachineApi for SMV003 {
&mut self.levels
}

// fn sys_data_ref(&self) -> &SysData {
// self.levels.writable_ref().sys_data_ref()
// }

fn sys_data_mut(&mut self) -> &mut SysData {
self.levels.sys_data_mut()
}
Expand Down Expand Up @@ -127,6 +128,28 @@ impl SMV003 {
self.levels.persisted().cloned()
}

/// Atomically reads and forwards a range of key-value pairs to the provided `tx`.
///
/// - Any data publishing must be queued by the singleton sender to maintain ordering.
///
/// - Atomically reading the key-value range within the state machine
/// and sending it to the singleton event sender.
/// Ensuring that there is no event out of order.
pub async fn send_range(
&mut self,
tx: mpsc::Sender<Result<WatchResponse, Status>>,
rng: (Bound<String>, Bound<String>),
) -> Result<(), io::Error> {
let Some(sender) = self.event_sender() else {
return Ok(());
};

let strm = self.range_kv(rng).await?;

sender.send_batch(tx, strm);
Ok(())
}

#[allow(dead_code)]
pub(crate) fn new_applier(&mut self) -> Applier<'_, Self> {
Applier::new(self)
Expand Down
14 changes: 14 additions & 0 deletions src/meta/raft-store/src/state_machine_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,29 @@

use std::fmt::Debug;

use databend_common_meta_types::protobuf::WatchResponse;
use databend_common_meta_types::sys_data::SysData;
use databend_common_meta_types::Change;
use databend_common_meta_types::SeqV;
use tokio::sync::mpsc;
use tonic::Status;

use crate::leveled_store::map_api::IOResultStream;
use crate::leveled_store::map_api::MapApi;
use crate::state_machine::ExpireKey;

/// Send a key-value change event to subscribers.
pub trait SMEventSender: Debug + Sync + Send {
fn send(&self, change: Change<Vec<u8>, String>);

/// Inform to send all items in `strm` to `tx`.
///
/// All event must be sent by the event dispatcher in order to keep the order.
fn send_batch(
&self,
tx: mpsc::Sender<Result<WatchResponse, Status>>,
strm: IOResultStream<(String, SeqV)>,
);
}

/// The API a state machine implements
Expand Down
27 changes: 22 additions & 5 deletions src/meta/raft-store/src/state_machine_api_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::future;
use std::io;
use std::ops::RangeBounds;

use databend_common_meta_types::CmdContext;
use databend_common_meta_types::EvalExpireTime;
Expand All @@ -32,6 +33,7 @@ use crate::leveled_store::map_api::IOResultStream;
use crate::leveled_store::map_api::MapApi;
use crate::leveled_store::map_api::MapApiExt;
use crate::leveled_store::map_api::MapApiRO;
use crate::leveled_store::map_api::MarkedOf;
use crate::marked::Marked;
use crate::state_machine::ExpireKey;
use crate::state_machine_api::StateMachineApi;
Expand Down Expand Up @@ -102,11 +104,18 @@ pub trait StateMachineApiExt: StateMachineApi {
// Return only keys with the expected prefix
.try_take_while(move |(k, _)| future::ready(Ok(k.starts_with(&p))))
// Skip tombstone
.try_filter_map(|(k, marked)| {
let seqv = Into::<Option<SeqV>>::into(marked);
let res = seqv.map(|x| (k, x));
future::ready(Ok(res))
});
.try_filter_map(|(k, marked)| future::ready(Ok(marked_to_seqv(k, marked))));

Ok(strm.boxed())
}

/// Return a range of kv entries.
async fn range_kv<R>(&self, rng: R) -> Result<IOResultStream<(String, SeqV)>, io::Error>
where R: RangeBounds<String> + Send + Sync + Clone + 'static {
let strm = self.map_ref().str_map().range(rng).await?;

// Skip tombstone
let strm = strm.try_filter_map(|(k, marked)| future::ready(Ok(marked_to_seqv(k, marked))));

Ok(strm.boxed())
}
Expand Down Expand Up @@ -197,3 +206,11 @@ pub trait StateMachineApiExt: StateMachineApi {
}

impl<T> StateMachineApiExt for T where T: StateMachineApi {}

/// Convert internal data to a public API format.
///
/// A tombstone is converted to None.
fn marked_to_seqv(k: String, marked: MarkedOf<String>) -> Option<(String, SeqV)> {
let seqv = Into::<Option<SeqV>>::into(marked);
seqv.map(|x| (k, x))
}
17 changes: 15 additions & 2 deletions src/meta/service/src/api/grpc/grpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,13 +386,26 @@ impl MetaService for MetaServiceImpl {
&self,
request: Request<WatchRequest>,
) -> Result<Response<Self::WatchStream>, Status> {
let watch = request.into_inner();

let key_range = watch.key_range().map_err(Status::invalid_argument)?;
let flush = watch.initial_flush;

let (tx, rx) = mpsc::channel(4);

let mn = &self.meta_node;

let sender = mn.add_watcher(request.into_inner(), tx).await?;

let sender = mn.add_watcher(watch, tx.clone()).await?;
let stream = WatchStream::new(rx, sender, mn.subscriber_handle.clone());

if flush {
let sm = mn.raft_store.state_machine.clone();
{
let mut sm = sm.write().await;
sm.send_range(tx, key_range).await?;
}
}

Ok(Response::new(Box::pin(stream) as Self::WatchStream))
}

Expand Down
7 changes: 7 additions & 0 deletions src/meta/service/src/watcher/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
// limitations under the License.

use databend_common_meta_types::Change;
use futures::future::BoxFuture;

use crate::watcher::subscriber::EventSubscriber;

/// An event sent to EventDispatcher.
#[allow(clippy::type_complexity)]
pub(crate) enum Command {
/// Submit a kv change event to dispatcher
KVChange(Change<Vec<u8>, String>),
Expand All @@ -27,4 +29,9 @@ pub(crate) enum Command {
Request {
req: Box<dyn FnOnce(&mut EventSubscriber) + Send + 'static>,
},

/// Send a fn to [`EventSubscriber`] to run it asynchronously.
RequestAsync {
req: Box<dyn FnOnce(&mut EventSubscriber) -> BoxFuture<'static, ()> + Send + 'static>,
},
}
1 change: 1 addition & 0 deletions src/meta/service/src/watcher/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::watcher::SubscriberHandle;
#[derive(Debug)]
pub(crate) struct WatchStream<T> {
rx: Receiver<T>,
// TODO: use a Box<dyn Fn> to replace these two fields
/// Hold a clone of the sender to remove itself from the dispatcher when dropped.
sender: Arc<StreamSender>,
subscriber_handle: SubscriberHandle,
Expand Down
80 changes: 38 additions & 42 deletions src/meta/service/src/watcher/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@
// limitations under the License.

use std::collections::BTreeSet;
use std::collections::Bound;
use std::io;
use std::sync::Arc;

use databend_common_meta_types::protobuf::watch_request::FilterType;
use databend_common_meta_types::protobuf::WatchRequest;
use databend_common_meta_types::protobuf::WatchResponse;
use databend_common_meta_types::Change;
use databend_common_meta_types::SeqV;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::StreamExt;
use log::info;
use log::warn;
use prost::Message;
Expand All @@ -32,7 +36,6 @@ use crate::metrics::server_metrics;
use crate::watcher::command::Command;
use crate::watcher::id::WatcherId;
use crate::watcher::subscriber_handle::SubscriberHandle;
use crate::watcher::KeyRange;
use crate::watcher::StreamSender;
use crate::watcher::WatchDesc;

Expand Down Expand Up @@ -70,12 +73,44 @@ impl EventSubscriber {
self.dispatch(kv_change).await;
}
Command::Request { req } => req(&mut self),
Command::RequestAsync { req } => req(&mut self).await,
}
}

info!("EventDispatcher: all event senders are closed. quit.");
}

/// Send a stream of kv changes to a watcher.
pub fn send_stream(
tx: mpsc::Sender<Result<WatchResponse, Status>>,
mut strm: BoxStream<'static, Result<(String, SeqV), io::Error>>,
) -> BoxFuture<'static, ()> {
let fu = async move {
while let Some(res) = strm.next().await {
let (key, seq_v) = match res {
Ok((key, seq)) => (key, seq),
Err(err) => {
warn!("EventSubscriber: recv error from kv stream: {}", err);
tx.send(Err(Status::internal(err.to_string()))).await.ok();
break;
}
};

let resp =
WatchResponse::new(&Change::new(None, Some(seq_v)).with_id(key)).unwrap();
let resp_size = resp.encoded_len() as u64;

if let Err(_err) = tx.send(Ok(resp)).await {
warn!("EventSubscriber: fail to send to watcher; close this stream");
break;
} else {
network_metrics::incr_sent_bytes(resp_size);
}
}
};
Box::pin(fu)
}

/// Dispatch a kv change event to interested watchers.
async fn dispatch(&mut self, change: Change<Vec<u8>, String>) {
let Some(key) = change.ident.clone() else {
Expand Down Expand Up @@ -153,7 +188,7 @@ impl EventSubscriber {
self.current_watcher_id += 1;
let watcher_id = self.current_watcher_id;

let range = Self::build_key_range(key.clone(), &key_end)?;
let range = WatchRequest::build_key_range(&key, &key_end)?;

let desc = WatchDesc::new(watcher_id, interested, range);
Ok(desc)
Expand All @@ -168,46 +203,7 @@ impl EventSubscriber {
server_metrics::incr_watchers(-1);
}

pub(crate) fn build_key_range(
key: String,
key_end: &Option<String>,
) -> Result<KeyRange, &'static str> {
let left = Bound::Included(key.clone());

match key_end {
Some(key_end) => {
if &key >= key_end {
return Err("empty range");
}
Ok((left, Bound::Excluded(key_end.to_string())))
}
None => Ok((left.clone(), left)),
}
}

pub fn watch_senders(&self) -> BTreeSet<&Arc<StreamSender>> {
self.watchers.values(..)
}
}

#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_build_key_range() -> Result<(), &'static str> {
let x = EventSubscriber::build_key_range(s("a"), &None)?;
assert_eq!(x, (Bound::Included(s("a")), Bound::Included(s("a"))));

let x = EventSubscriber::build_key_range(s("a"), &Some(s("b")))?;
assert_eq!(x, (Bound::Included(s("a")), Bound::Excluded(s("b"))));

let x = EventSubscriber::build_key_range(s("a"), &Some(s("a")));
assert_eq!(x, Err("empty range"));

Ok(())
}

fn s(x: impl ToString) -> String {
x.to_string()
}
}
19 changes: 19 additions & 0 deletions src/meta/service/src/watcher/subscriber_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io;

use databend_common_meta_raft_store::state_machine_api::SMEventSender;
use databend_common_meta_types::protobuf::WatchResponse;
use databend_common_meta_types::Change;
use databend_common_meta_types::SeqV;
use futures::stream::BoxStream;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::RecvError;
use tonic::Status;

use crate::watcher::command::Command;
use crate::watcher::EventSubscriber;
Expand All @@ -31,6 +38,18 @@ impl SMEventSender for SubscriberHandle {
fn send(&self, change: Change<Vec<u8>, String>) {
let _ = self.tx.send(Command::KVChange(change));
}

fn send_batch(
&self,
tx: Sender<Result<WatchResponse, Status>>,
strm: BoxStream<'static, Result<(String, SeqV), io::Error>>,
) {
self.tx
.send(Command::RequestAsync {
req: Box::new(move |_d| EventSubscriber::send_stream(tx, strm)),
})
.ok();
}
}

impl SubscriberHandle {
Expand Down
Loading

0 comments on commit 5921c66

Please sign in to comment.