Skip to content

Commit

Permalink
Better handle regex in pcli query watch (#4361)
Browse files Browse the repository at this point in the history
  • Loading branch information
zbuc authored May 9, 2024
1 parent 845da22 commit e0276d7
Showing 1 changed file with 58 additions and 33 deletions.
91 changes: 58 additions & 33 deletions crates/cnidarium/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::rpc::proto::v1::{
WatchResponse,
};
use futures::{StreamExt, TryStreamExt};
use regex::Regex;
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
use tracing::instrument;
Expand Down Expand Up @@ -121,17 +122,29 @@ impl QueryService for Server {

const MAX_REGEX_LEN: usize = 1024;

let key_regex = regex::RegexBuilder::new(&request.key_regex)
.size_limit(MAX_REGEX_LEN)
.build()
.map_err(|e| Status::invalid_argument(format!("invalid key_regex: {}", e)))?;
let key_regex = match request.key_regex.as_str() {
"" => None,
_ => Some(
regex::RegexBuilder::new(&request.key_regex)
.size_limit(MAX_REGEX_LEN)
.build()
.map_err(|e| Status::invalid_argument(format!("invalid key_regex: {}", e)))?,
),
};

// Use the `bytes` regex to allow matching byte strings.
let nv_key_regex = regex::bytes::RegexBuilder::new(&request.key_regex)
.size_limit(MAX_REGEX_LEN)
.unicode(false)
.build()
.map_err(|e| Status::invalid_argument(format!("invalid nv_key_regex: {}", e)))?;
let nv_key_regex = match request.nv_key_regex.as_str() {
"" => None,
_ => Some(
regex::bytes::RegexBuilder::new(&request.nv_key_regex)
.size_limit(MAX_REGEX_LEN)
.unicode(false)
.build()
.map_err(|e| {
Status::invalid_argument(format!("invalid nv_key_regex: {}", e))
})?,
),
};

let (tx, rx) = tokio::sync::mpsc::channel::<Result<WatchResponse, tonic::Status>>(10);

Expand All @@ -148,8 +161,8 @@ impl QueryService for Server {

async fn watch_changes(
storage: Storage,
key_regex: regex::Regex,
nv_key_regex: regex::bytes::Regex,
key_regex: Option<regex::Regex>,
nv_key_regex: Option<regex::bytes::Regex>,
tx: tokio::sync::mpsc::Sender<Result<WatchResponse, tonic::Status>>,
) -> anyhow::Result<()> {
let mut changes_rx = storage.subscribe_changes();
Expand All @@ -160,31 +173,43 @@ async fn watch_changes(
}
let (version, changes) = changes_rx.borrow_and_update().clone();

for (key, value) in changes.unwritten_changes().iter() {
if key_regex.is_match(key) {
tx.send(Ok(WatchResponse {
version,
entry: Some(wr::Entry::Kv(wr::KeyValue {
key: key.clone(),
value: value.as_ref().cloned().unwrap_or_default(),
deleted: value.is_none(),
})),
}))
.await?;
if key_regex.is_some() || nv_key_regex.is_none() {
for (key, value) in changes.unwritten_changes().iter() {
if key_regex
.as_ref()
.unwrap_or(&Regex::new(r"").expect("empty regex ok"))
.is_match(key)
{
tx.send(Ok(WatchResponse {
version,
entry: Some(wr::Entry::Kv(wr::KeyValue {
key: key.clone(),
value: value.as_ref().cloned().unwrap_or_default(),
deleted: value.is_none(),
})),
}))
.await?;
}
}
}

for (key, value) in changes.nonverifiable_changes().iter() {
if nv_key_regex.is_match(key) {
tx.send(Ok(WatchResponse {
version,
entry: Some(wr::Entry::NvKv(wr::NvKeyValue {
key: key.clone(),
value: value.as_ref().cloned().unwrap_or_default(),
deleted: value.is_none(),
})),
}))
.await?;
if nv_key_regex.is_some() || key_regex.is_none() {
for (key, value) in changes.nonverifiable_changes().iter() {
if nv_key_regex
.as_ref()
.unwrap_or(&regex::bytes::Regex::new(r"").expect("empty regex ok"))
.is_match(key)
{
tx.send(Ok(WatchResponse {
version,
entry: Some(wr::Entry::NvKv(wr::NvKeyValue {
key: key.clone(),
value: value.as_ref().cloned().unwrap_or_default(),
deleted: value.is_none(),
})),
}))
.await?;
}
}
}
}
Expand Down

0 comments on commit e0276d7

Please sign in to comment.