diff --git a/crates/cnidarium/src/rpc.rs b/crates/cnidarium/src/rpc.rs index 72bc796be2..992fb35a38 100644 --- a/crates/cnidarium/src/rpc.rs +++ b/crates/cnidarium/src/rpc.rs @@ -28,6 +28,7 @@ use crate::rpc::proto::v1::{ WatchResponse, }; use futures::{StreamExt, TryStreamExt}; +use regex::Regex; use tokio_stream::wrappers::ReceiverStream; use tonic::Status; use tracing::instrument; @@ -121,17 +122,29 @@ impl QueryService for Server { const MAX_REGEX_LEN: usize = 1024; - let key_regex = regex::RegexBuilder::new(&request.key_regex) - .size_limit(MAX_REGEX_LEN) - .build() - .map_err(|e| Status::invalid_argument(format!("invalid key_regex: {}", e)))?; + let key_regex = match request.key_regex.as_str() { + "" => None, + _ => Some( + regex::RegexBuilder::new(&request.key_regex) + .size_limit(MAX_REGEX_LEN) + .build() + .map_err(|e| Status::invalid_argument(format!("invalid key_regex: {}", e)))?, + ), + }; // Use the `bytes` regex to allow matching byte strings. - let nv_key_regex = regex::bytes::RegexBuilder::new(&request.key_regex) - .size_limit(MAX_REGEX_LEN) - .unicode(false) - .build() - .map_err(|e| Status::invalid_argument(format!("invalid nv_key_regex: {}", e)))?; + let nv_key_regex = match request.nv_key_regex.as_str() { + "" => None, + _ => Some( + regex::bytes::RegexBuilder::new(&request.nv_key_regex) + .size_limit(MAX_REGEX_LEN) + .unicode(false) + .build() + .map_err(|e| { + Status::invalid_argument(format!("invalid nv_key_regex: {}", e)) + })?, + ), + }; let (tx, rx) = tokio::sync::mpsc::channel::>(10); @@ -148,8 +161,8 @@ impl QueryService for Server { async fn watch_changes( storage: Storage, - key_regex: regex::Regex, - nv_key_regex: regex::bytes::Regex, + key_regex: Option, + nv_key_regex: Option, tx: tokio::sync::mpsc::Sender>, ) -> anyhow::Result<()> { let mut changes_rx = storage.subscribe_changes(); @@ -160,31 +173,43 @@ async fn watch_changes( } let (version, changes) = changes_rx.borrow_and_update().clone(); - for (key, value) in changes.unwritten_changes().iter() { - if key_regex.is_match(key) { - tx.send(Ok(WatchResponse { - version, - entry: Some(wr::Entry::Kv(wr::KeyValue { - key: key.clone(), - value: value.as_ref().cloned().unwrap_or_default(), - deleted: value.is_none(), - })), - })) - .await?; + if key_regex.is_some() || nv_key_regex.is_none() { + for (key, value) in changes.unwritten_changes().iter() { + if key_regex + .as_ref() + .unwrap_or(&Regex::new(r"").expect("empty regex ok")) + .is_match(key) + { + tx.send(Ok(WatchResponse { + version, + entry: Some(wr::Entry::Kv(wr::KeyValue { + key: key.clone(), + value: value.as_ref().cloned().unwrap_or_default(), + deleted: value.is_none(), + })), + })) + .await?; + } } } - for (key, value) in changes.nonverifiable_changes().iter() { - if nv_key_regex.is_match(key) { - tx.send(Ok(WatchResponse { - version, - entry: Some(wr::Entry::NvKv(wr::NvKeyValue { - key: key.clone(), - value: value.as_ref().cloned().unwrap_or_default(), - deleted: value.is_none(), - })), - })) - .await?; + if nv_key_regex.is_some() || key_regex.is_none() { + for (key, value) in changes.nonverifiable_changes().iter() { + if nv_key_regex + .as_ref() + .unwrap_or(®ex::bytes::Regex::new(r"").expect("empty regex ok")) + .is_match(key) + { + tx.send(Ok(WatchResponse { + version, + entry: Some(wr::Entry::NvKv(wr::NvKeyValue { + key: key.clone(), + value: value.as_ref().cloned().unwrap_or_default(), + deleted: value.is_none(), + })), + })) + .await?; + } } } }