From 5921c66126fdc23a7652ce57bfc44200d1d161b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Tue, 24 Dec 2024 23:54:12 +0800 Subject: [PATCH] feat: databend-meta watch API provides `initial_flush` field (#17098) * chore: remove unused setting API * feat: databend-meta watch API provides `initial_flush` field If `WatchRequest.initial_flush` is `true`, when the watch stream is established, all the key values in the specified range will be sent through the stream to the watching client. This allows the client to setup a full copy of the key range as a client side cache, and furthur modifications will be received and the client side cache can be updated. --- Cargo.lock | 1 + src/meta/raft-store/Cargo.toml | 1 + src/meta/raft-store/src/sm_v003/sm_v003.rs | 31 ++++++- src/meta/raft-store/src/state_machine_api.rs | 14 ++++ .../raft-store/src/state_machine_api_ext.rs | 27 +++++-- src/meta/service/src/api/grpc/grpc_service.rs | 17 +++- src/meta/service/src/watcher/command.rs | 7 ++ src/meta/service/src/watcher/stream.rs | 1 + src/meta/service/src/watcher/subscriber.rs | 80 +++++++++---------- .../service/src/watcher/subscriber_handle.rs | 19 +++++ .../tests/it/grpc/metasrv_grpc_watch.rs | 78 ++++++++++++++++++ src/meta/types/build.rs | 6 +- src/meta/types/proto/meta.proto | 6 ++ src/meta/types/src/proto_ext/watch_ext.rs | 64 +++++++++++++++ src/query/service/src/locks/lock_holder.rs | 7 +- src/query/settings/src/settings_global.rs | 3 +- src/query/users/src/lib.rs | 1 - src/query/users/src/user_setting.rs | 45 ----------- 18 files changed, 300 insertions(+), 108 deletions(-) delete mode 100644 src/query/users/src/user_setting.rs diff --git a/Cargo.lock b/Cargo.lock index 40f348ed57ba..7f82bd53d877 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3723,6 +3723,7 @@ dependencies = [ "tempfile", "thiserror", "tokio", + "tonic", ] [[package]] diff --git a/src/meta/raft-store/Cargo.toml b/src/meta/raft-store/Cargo.toml index f24e8814767a..bb7560648977 100644 --- a/src/meta/raft-store/Cargo.toml +++ b/src/meta/raft-store/Cargo.toml @@ -50,6 +50,7 @@ serde_json = { workspace = true } stream-more = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } +tonic = { workspace = true } [dev-dependencies] databend-common-base = { workspace = true } diff --git a/src/meta/raft-store/src/sm_v003/sm_v003.rs b/src/meta/raft-store/src/sm_v003/sm_v003.rs index 9729d8d598cb..2fce96735e33 100644 --- a/src/meta/raft-store/src/sm_v003/sm_v003.rs +++ b/src/meta/raft-store/src/sm_v003/sm_v003.rs @@ -12,9 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::Bound; use std::fmt::Debug; use std::io; +use databend_common_meta_types::protobuf::WatchResponse; use databend_common_meta_types::raft_types::Entry; use databend_common_meta_types::raft_types::StorageError; use databend_common_meta_types::snapshot_db::DB; @@ -22,6 +24,8 @@ use databend_common_meta_types::sys_data::SysData; use databend_common_meta_types::AppliedState; use log::info; use openraft::RaftLogId; +use tokio::sync::mpsc; +use tonic::Status; use crate::applier::Applier; use crate::leveled_store::leveled_map::compactor::Compactor; @@ -31,6 +35,7 @@ use crate::sm_v003::sm_v003_kv_api::SMV003KVApi; use crate::state_machine::ExpireKey; use crate::state_machine_api::SMEventSender; use crate::state_machine_api::StateMachineApi; +use crate::state_machine_api_ext::StateMachineApiExt; #[derive(Debug, Default)] pub struct SMV003 { @@ -62,10 +67,6 @@ impl StateMachineApi for SMV003 { &mut self.levels } - // fn sys_data_ref(&self) -> &SysData { - // self.levels.writable_ref().sys_data_ref() - // } - fn sys_data_mut(&mut self) -> &mut SysData { self.levels.sys_data_mut() } @@ -127,6 +128,28 @@ impl SMV003 { self.levels.persisted().cloned() } + /// Atomically reads and forwards a range of key-value pairs to the provided `tx`. + /// + /// - Any data publishing must be queued by the singleton sender to maintain ordering. + /// + /// - Atomically reading the key-value range within the state machine + /// and sending it to the singleton event sender. + /// Ensuring that there is no event out of order. + pub async fn send_range( + &mut self, + tx: mpsc::Sender>, + rng: (Bound, Bound), + ) -> Result<(), io::Error> { + let Some(sender) = self.event_sender() else { + return Ok(()); + }; + + let strm = self.range_kv(rng).await?; + + sender.send_batch(tx, strm); + Ok(()) + } + #[allow(dead_code)] pub(crate) fn new_applier(&mut self) -> Applier<'_, Self> { Applier::new(self) diff --git a/src/meta/raft-store/src/state_machine_api.rs b/src/meta/raft-store/src/state_machine_api.rs index 2d314155d7c2..d9d701ed524b 100644 --- a/src/meta/raft-store/src/state_machine_api.rs +++ b/src/meta/raft-store/src/state_machine_api.rs @@ -14,15 +14,29 @@ use std::fmt::Debug; +use databend_common_meta_types::protobuf::WatchResponse; use databend_common_meta_types::sys_data::SysData; use databend_common_meta_types::Change; +use databend_common_meta_types::SeqV; +use tokio::sync::mpsc; +use tonic::Status; +use crate::leveled_store::map_api::IOResultStream; use crate::leveled_store::map_api::MapApi; use crate::state_machine::ExpireKey; /// Send a key-value change event to subscribers. pub trait SMEventSender: Debug + Sync + Send { fn send(&self, change: Change, String>); + + /// Inform to send all items in `strm` to `tx`. + /// + /// All event must be sent by the event dispatcher in order to keep the order. + fn send_batch( + &self, + tx: mpsc::Sender>, + strm: IOResultStream<(String, SeqV)>, + ); } /// The API a state machine implements diff --git a/src/meta/raft-store/src/state_machine_api_ext.rs b/src/meta/raft-store/src/state_machine_api_ext.rs index 5679db88c35b..d8e13a0ed7c7 100644 --- a/src/meta/raft-store/src/state_machine_api_ext.rs +++ b/src/meta/raft-store/src/state_machine_api_ext.rs @@ -14,6 +14,7 @@ use std::future; use std::io; +use std::ops::RangeBounds; use databend_common_meta_types::CmdContext; use databend_common_meta_types::EvalExpireTime; @@ -32,6 +33,7 @@ use crate::leveled_store::map_api::IOResultStream; use crate::leveled_store::map_api::MapApi; use crate::leveled_store::map_api::MapApiExt; use crate::leveled_store::map_api::MapApiRO; +use crate::leveled_store::map_api::MarkedOf; use crate::marked::Marked; use crate::state_machine::ExpireKey; use crate::state_machine_api::StateMachineApi; @@ -102,11 +104,18 @@ pub trait StateMachineApiExt: StateMachineApi { // Return only keys with the expected prefix .try_take_while(move |(k, _)| future::ready(Ok(k.starts_with(&p)))) // Skip tombstone - .try_filter_map(|(k, marked)| { - let seqv = Into::>::into(marked); - let res = seqv.map(|x| (k, x)); - future::ready(Ok(res)) - }); + .try_filter_map(|(k, marked)| future::ready(Ok(marked_to_seqv(k, marked)))); + + Ok(strm.boxed()) + } + + /// Return a range of kv entries. + async fn range_kv(&self, rng: R) -> Result, io::Error> + where R: RangeBounds + Send + Sync + Clone + 'static { + let strm = self.map_ref().str_map().range(rng).await?; + + // Skip tombstone + let strm = strm.try_filter_map(|(k, marked)| future::ready(Ok(marked_to_seqv(k, marked)))); Ok(strm.boxed()) } @@ -197,3 +206,11 @@ pub trait StateMachineApiExt: StateMachineApi { } impl StateMachineApiExt for T where T: StateMachineApi {} + +/// Convert internal data to a public API format. +/// +/// A tombstone is converted to None. +fn marked_to_seqv(k: String, marked: MarkedOf) -> Option<(String, SeqV)> { + let seqv = Into::>::into(marked); + seqv.map(|x| (k, x)) +} diff --git a/src/meta/service/src/api/grpc/grpc_service.rs b/src/meta/service/src/api/grpc/grpc_service.rs index 3eea8a71a8e8..a99d0c835439 100644 --- a/src/meta/service/src/api/grpc/grpc_service.rs +++ b/src/meta/service/src/api/grpc/grpc_service.rs @@ -386,13 +386,26 @@ impl MetaService for MetaServiceImpl { &self, request: Request, ) -> Result, Status> { + let watch = request.into_inner(); + + let key_range = watch.key_range().map_err(Status::invalid_argument)?; + let flush = watch.initial_flush; + let (tx, rx) = mpsc::channel(4); let mn = &self.meta_node; - let sender = mn.add_watcher(request.into_inner(), tx).await?; - + let sender = mn.add_watcher(watch, tx.clone()).await?; let stream = WatchStream::new(rx, sender, mn.subscriber_handle.clone()); + + if flush { + let sm = mn.raft_store.state_machine.clone(); + { + let mut sm = sm.write().await; + sm.send_range(tx, key_range).await?; + } + } + Ok(Response::new(Box::pin(stream) as Self::WatchStream)) } diff --git a/src/meta/service/src/watcher/command.rs b/src/meta/service/src/watcher/command.rs index 0970121b288b..d6c18c8e842b 100644 --- a/src/meta/service/src/watcher/command.rs +++ b/src/meta/service/src/watcher/command.rs @@ -13,10 +13,12 @@ // limitations under the License. use databend_common_meta_types::Change; +use futures::future::BoxFuture; use crate::watcher::subscriber::EventSubscriber; /// An event sent to EventDispatcher. +#[allow(clippy::type_complexity)] pub(crate) enum Command { /// Submit a kv change event to dispatcher KVChange(Change, String>), @@ -27,4 +29,9 @@ pub(crate) enum Command { Request { req: Box, }, + + /// Send a fn to [`EventSubscriber`] to run it asynchronously. + RequestAsync { + req: Box BoxFuture<'static, ()> + Send + 'static>, + }, } diff --git a/src/meta/service/src/watcher/stream.rs b/src/meta/service/src/watcher/stream.rs index d5fee25227ca..0c766e5563b5 100644 --- a/src/meta/service/src/watcher/stream.rs +++ b/src/meta/service/src/watcher/stream.rs @@ -27,6 +27,7 @@ use crate::watcher::SubscriberHandle; #[derive(Debug)] pub(crate) struct WatchStream { rx: Receiver, + // TODO: use a Box to replace these two fields /// Hold a clone of the sender to remove itself from the dispatcher when dropped. sender: Arc, subscriber_handle: SubscriberHandle, diff --git a/src/meta/service/src/watcher/subscriber.rs b/src/meta/service/src/watcher/subscriber.rs index 334834e71243..b1603d389419 100644 --- a/src/meta/service/src/watcher/subscriber.rs +++ b/src/meta/service/src/watcher/subscriber.rs @@ -13,13 +13,17 @@ // limitations under the License. use std::collections::BTreeSet; -use std::collections::Bound; +use std::io; use std::sync::Arc; use databend_common_meta_types::protobuf::watch_request::FilterType; use databend_common_meta_types::protobuf::WatchRequest; use databend_common_meta_types::protobuf::WatchResponse; use databend_common_meta_types::Change; +use databend_common_meta_types::SeqV; +use futures::future::BoxFuture; +use futures::stream::BoxStream; +use futures::StreamExt; use log::info; use log::warn; use prost::Message; @@ -32,7 +36,6 @@ use crate::metrics::server_metrics; use crate::watcher::command::Command; use crate::watcher::id::WatcherId; use crate::watcher::subscriber_handle::SubscriberHandle; -use crate::watcher::KeyRange; use crate::watcher::StreamSender; use crate::watcher::WatchDesc; @@ -70,12 +73,44 @@ impl EventSubscriber { self.dispatch(kv_change).await; } Command::Request { req } => req(&mut self), + Command::RequestAsync { req } => req(&mut self).await, } } info!("EventDispatcher: all event senders are closed. quit."); } + /// Send a stream of kv changes to a watcher. + pub fn send_stream( + tx: mpsc::Sender>, + mut strm: BoxStream<'static, Result<(String, SeqV), io::Error>>, + ) -> BoxFuture<'static, ()> { + let fu = async move { + while let Some(res) = strm.next().await { + let (key, seq_v) = match res { + Ok((key, seq)) => (key, seq), + Err(err) => { + warn!("EventSubscriber: recv error from kv stream: {}", err); + tx.send(Err(Status::internal(err.to_string()))).await.ok(); + break; + } + }; + + let resp = + WatchResponse::new(&Change::new(None, Some(seq_v)).with_id(key)).unwrap(); + let resp_size = resp.encoded_len() as u64; + + if let Err(_err) = tx.send(Ok(resp)).await { + warn!("EventSubscriber: fail to send to watcher; close this stream"); + break; + } else { + network_metrics::incr_sent_bytes(resp_size); + } + } + }; + Box::pin(fu) + } + /// Dispatch a kv change event to interested watchers. async fn dispatch(&mut self, change: Change, String>) { let Some(key) = change.ident.clone() else { @@ -153,7 +188,7 @@ impl EventSubscriber { self.current_watcher_id += 1; let watcher_id = self.current_watcher_id; - let range = Self::build_key_range(key.clone(), &key_end)?; + let range = WatchRequest::build_key_range(&key, &key_end)?; let desc = WatchDesc::new(watcher_id, interested, range); Ok(desc) @@ -168,46 +203,7 @@ impl EventSubscriber { server_metrics::incr_watchers(-1); } - pub(crate) fn build_key_range( - key: String, - key_end: &Option, - ) -> Result { - let left = Bound::Included(key.clone()); - - match key_end { - Some(key_end) => { - if &key >= key_end { - return Err("empty range"); - } - Ok((left, Bound::Excluded(key_end.to_string()))) - } - None => Ok((left.clone(), left)), - } - } - pub fn watch_senders(&self) -> BTreeSet<&Arc> { self.watchers.values(..) } } - -#[cfg(test)] -mod tests { - use super::*; - #[test] - fn test_build_key_range() -> Result<(), &'static str> { - let x = EventSubscriber::build_key_range(s("a"), &None)?; - assert_eq!(x, (Bound::Included(s("a")), Bound::Included(s("a")))); - - let x = EventSubscriber::build_key_range(s("a"), &Some(s("b")))?; - assert_eq!(x, (Bound::Included(s("a")), Bound::Excluded(s("b")))); - - let x = EventSubscriber::build_key_range(s("a"), &Some(s("a"))); - assert_eq!(x, Err("empty range")); - - Ok(()) - } - - fn s(x: impl ToString) -> String { - x.to_string() - } -} diff --git a/src/meta/service/src/watcher/subscriber_handle.rs b/src/meta/service/src/watcher/subscriber_handle.rs index 563798f5f16a..cff517378aaf 100644 --- a/src/meta/service/src/watcher/subscriber_handle.rs +++ b/src/meta/service/src/watcher/subscriber_handle.rs @@ -12,11 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::io; + use databend_common_meta_raft_store::state_machine_api::SMEventSender; +use databend_common_meta_types::protobuf::WatchResponse; use databend_common_meta_types::Change; +use databend_common_meta_types::SeqV; +use futures::stream::BoxStream; use tokio::sync::mpsc; +use tokio::sync::mpsc::Sender; use tokio::sync::oneshot; use tokio::sync::oneshot::error::RecvError; +use tonic::Status; use crate::watcher::command::Command; use crate::watcher::EventSubscriber; @@ -31,6 +38,18 @@ impl SMEventSender for SubscriberHandle { fn send(&self, change: Change, String>) { let _ = self.tx.send(Command::KVChange(change)); } + + fn send_batch( + &self, + tx: Sender>, + strm: BoxStream<'static, Result<(String, SeqV), io::Error>>, + ) { + self.tx + .send(Command::RequestAsync { + req: Box::new(move |_d| EventSubscriber::send_stream(tx, strm)), + }) + .ok(); + } } impl SubscriberHandle { diff --git a/src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs b/src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs index ac1010d0b02f..50552c4a337c 100644 --- a/src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs +++ b/src/meta/service/tests/it/grpc/metasrv_grpc_watch.rs @@ -12,7 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::BTreeMap; use std::sync::Arc; +use std::sync::Mutex; use std::time::Duration; use std::time::SystemTime; use std::time::UNIX_EPOCH; @@ -125,6 +127,7 @@ async fn test_watch_single_key() -> anyhow::Result<()> { key: s("a"), key_end: None, filter_type: FilterType::All.into(), + initial_flush: false, }; let key_a = s("a"); @@ -168,6 +171,7 @@ async fn test_watch() -> anyhow::Result<()> { key: "a".to_string(), key_end: Some("z".to_string()), filter_type: FilterType::All.into(), + initial_flush: false, }; let key_a = s("a"); @@ -226,6 +230,7 @@ async fn test_watch() -> anyhow::Result<()> { key_end: None, // filter only delete events filter_type: FilterType::Delete.into(), + initial_flush: false, }; let key = s(key_str); @@ -289,6 +294,7 @@ async fn test_watch() -> anyhow::Result<()> { key: start, key_end: Some(end), filter_type: FilterType::All.into(), + initial_flush: false, }; let conditions = vec![TxnCondition { @@ -335,6 +341,76 @@ async fn test_watch() -> anyhow::Result<()> { Ok(()) } +#[test(harness = meta_service_test_harness)] +#[fastrace::trace] +async fn test_watch_initial_flush() -> anyhow::Result<()> { + let (tc, _addr) = crate::tests::start_metasrv().await?; + let updates = vec![ + UpsertKV::update("a", b"a"), + UpsertKV::update("b", b"b"), + UpsertKV::update("c", b"c"), + UpsertKV::update("d", b"d"), + UpsertKV::update("z", b"z"), + ]; + + let client = tc.grpc_client().await?; + for update in updates.iter() { + client.upsert_kv(update.clone()).await?; + } + + let mut strm = { + let watch = WatchRequest { + key: s("a"), + key_end: Some(s("e")), + filter_type: FilterType::All.into(), + initial_flush: true, + }; + client.request(watch).await? + }; + + let cache = Arc::new(Mutex::new(BTreeMap::new())); + + let c = cache.clone(); + let cache_updater = async move { + while let Ok(Some(resp)) = strm.message().await { + let event = resp.event.unwrap(); + + let mut cache = c.lock().unwrap(); + if let Some(value) = event.current { + cache.insert(event.key, value); + } else { + cache.remove(&event.key); + } + } + }; + + let _h = databend_common_base::runtime::spawn(cache_updater); + + tokio::time::sleep(Duration::from_secs(1)).await; + let keys = { + let cache = cache.lock().unwrap(); + cache.keys().cloned().collect::>() + }; + + assert_eq!(vec![s("a"), s("b"), s("c"), s("d")], keys); + + client.upsert_kv(UpsertKV::update("a", b"a2")).await?; + client.upsert_kv(UpsertKV::delete("c")).await?; + + tokio::time::sleep(Duration::from_secs(1)).await; + let values = { + let cache = cache.lock().unwrap(); + cache + .values() + .map(|seqv| seqv.data.clone()) + .collect::>() + }; + + assert_eq!(vec![b("a2"), b("b"), b("d")], values); + + Ok(()) +} + #[test(harness = meta_service_test_harness)] #[fastrace::trace] async fn test_watch_expired_events() -> anyhow::Result<()> { @@ -386,6 +462,7 @@ async fn test_watch_expired_events() -> anyhow::Result<()> { key: start, key_end: Some(end), filter_type: FilterType::All.into(), + initial_flush: false, }; watch_client.request(watch).await? }; @@ -482,6 +559,7 @@ async fn test_watch_stream_count() -> anyhow::Result<()> { key: "a".to_string(), key_end: Some("z".to_string()), filter_type: FilterType::All.into(), + initial_flush: false, }; let client1 = make_client(&addr)?; diff --git a/src/meta/types/build.rs b/src/meta/types/build.rs index 44e750d703a1..161e98e9962c 100644 --- a/src/meta/types/build.rs +++ b/src/meta/types/build.rs @@ -133,15 +133,15 @@ fn build_proto() { ) .type_attribute( "WatchRequest", - "#[derive(Eq, serde::Serialize, serde::Deserialize, deepsize::DeepSizeOf)]", + "#[derive(Eq, deepsize::DeepSizeOf)]", ) .type_attribute( "WatchResponse", - "#[derive(Eq, serde::Serialize, serde::Deserialize, deepsize::DeepSizeOf)]", + "#[derive(Eq, deepsize::DeepSizeOf)]", ) .type_attribute( "Event", - "#[derive(Eq, serde::Serialize, serde::Deserialize, deepsize::DeepSizeOf)]", + "#[derive(Eq, deepsize::DeepSizeOf)]", ) .type_attribute( "KVMeta", diff --git a/src/meta/types/proto/meta.proto b/src/meta/types/proto/meta.proto index fd323ee4cc0f..0903e4cd43e7 100644 --- a/src/meta/types/proto/meta.proto +++ b/src/meta/types/proto/meta.proto @@ -82,6 +82,12 @@ message WatchRequest { DELETE = 2; } FilterType filter_type = 3; + + // Whether to send current values of all watched keys when watch starts + // Useful for initializing client-side caches. The client: + // - first get a full copy of the key-values, + // - then update every time a key-value is changed. + bool initial_flush = 4; } message Event { diff --git a/src/meta/types/src/proto_ext/watch_ext.rs b/src/meta/types/src/proto_ext/watch_ext.rs index 83affb26745a..dd2b572a8bca 100644 --- a/src/meta/types/src/proto_ext/watch_ext.rs +++ b/src/meta/types/src/proto_ext/watch_ext.rs @@ -12,10 +12,52 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::Bound; + use crate::protobuf as pb; +use crate::protobuf::watch_request::FilterType; +use crate::protobuf::WatchRequest; use crate::protobuf::WatchResponse; use crate::Change; +impl WatchRequest { + /// Build a key range from a `key` and an optional `key_end`. + pub fn build_key_range( + key: &String, + key_end: &Option, + ) -> Result<(Bound, Bound), &'static str> { + let left = Bound::Included(key.clone()); + + match key_end { + Some(key_end) => { + if key >= key_end { + return Err("empty range"); + } + Ok((left, Bound::Excluded(key_end.to_string()))) + } + None => Ok((left.clone(), left)), + } + } + + pub fn new(key: String, key_end: Option) -> Self { + WatchRequest { + key, + key_end, + filter_type: FilterType::All as _, + initial_flush: false, + } + } + + pub fn with_filter(mut self, filter_type: FilterType) -> Self { + self.filter_type = filter_type as _; + self + } + + pub fn key_range(&self) -> Result<(Bound, Bound), &'static str> { + Self::build_key_range(&self.key, &self.key_end) + } +} + impl WatchResponse { pub fn new(change: &Change, String>) -> Option { let ev = pb::Event { @@ -27,3 +69,25 @@ impl WatchResponse { Some(WatchResponse { event: Some(ev) }) } } + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn test_build_key_range() -> Result<(), &'static str> { + let x = WatchRequest::build_key_range(&s("a"), &None)?; + assert_eq!(x, (Bound::Included(s("a")), Bound::Included(s("a")))); + + let x = WatchRequest::build_key_range(&s("a"), &Some(s("b")))?; + assert_eq!(x, (Bound::Included(s("a")), Bound::Excluded(s("b")))); + + let x = WatchRequest::build_key_range(&s("a"), &Some(s("a"))); + assert_eq!(x, Err("empty range")); + + Ok(()) + } + + fn s(x: impl ToString) -> String { + x.to_string() + } +} diff --git a/src/query/service/src/locks/lock_holder.rs b/src/query/service/src/locks/lock_holder.rs index 4ad31aefb7c9..fdd137af3594 100644 --- a/src/query/service/src/locks/lock_holder.rs +++ b/src/query/service/src/locks/lock_holder.rs @@ -122,11 +122,8 @@ impl LockHolder { let watch_delete_ident = TableLockIdent::new(tenant, table_id, prev_revision); // Get the previous revision, watch the delete event. - let req = WatchRequest { - key: watch_delete_ident.to_string_key(), - key_end: None, - filter_type: FilterType::Delete.into(), - }; + let req = WatchRequest::new(watch_delete_ident.to_string_key(), None) + .with_filter(FilterType::Delete); let mut watch_stream = meta_api.watch(req).await?; let lock_meta = meta_api.get_pb(&watch_delete_ident).await?; diff --git a/src/query/settings/src/settings_global.rs b/src/query/settings/src/settings_global.rs index b43ba8335e51..4b247c9a6300 100644 --- a/src/query/settings/src/settings_global.rs +++ b/src/query/settings/src/settings_global.rs @@ -58,7 +58,8 @@ impl Settings { }); UserApiProvider::instance() - .set_setting(&self.tenant, UserSetting { name: key, value }) + .setting_api(&self.tenant) + .set_setting(UserSetting { name: key, value }) .await?; Ok(()) } diff --git a/src/query/users/src/lib.rs b/src/query/users/src/lib.rs index 76518d04819a..19f9d595ce55 100644 --- a/src/query/users/src/lib.rs +++ b/src/query/users/src/lib.rs @@ -24,7 +24,6 @@ mod role_mgr; mod user; mod user_api; mod user_mgr; -mod user_setting; mod user_stage; mod user_udf; mod visibility_checker; diff --git a/src/query/users/src/user_setting.rs b/src/query/users/src/user_setting.rs deleted file mode 100644 index a22b4e44dd13..000000000000 --- a/src/query/users/src/user_setting.rs +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use databend_common_exception::Result; -use databend_common_meta_app::principal::UserSetting; -use databend_common_meta_app::tenant::Tenant; -use databend_common_meta_types::MatchSeq; - -use crate::UserApiProvider; - -impl UserApiProvider { - // Set a setting. - #[async_backtrace::framed] - pub async fn set_setting(&self, tenant: &Tenant, setting: UserSetting) -> Result { - let setting_api_provider = self.setting_api(tenant); - setting_api_provider.set_setting(setting).await - } - - // Get all settings list. - #[async_backtrace::framed] - pub async fn get_settings(&self, tenant: &Tenant) -> Result> { - let setting_api_provider = self.setting_api(tenant); - setting_api_provider.get_settings().await - } - - // Drop a setting by name. - #[async_backtrace::framed] - pub async fn drop_setting(&self, tenant: &Tenant, name: &str) -> Result<()> { - let setting_api_provider = self.setting_api(tenant); - setting_api_provider - .try_drop_setting(name, MatchSeq::GE(1)) - .await - } -}