Skip to content

Commit

Permalink
cnidarium: add filterable grpc streaming of state changes
Browse files Browse the repository at this point in the history
This commit adds a new RPC to cnidarium that provides grpc streaming of state
changes, with server-side filtering using regexes on the keys.

Also, there's stub code in pcli that will consume it.
  • Loading branch information
hdevalence committed Dec 29, 2023
1 parent 1eb74b7 commit 886daac
Show file tree
Hide file tree
Showing 40 changed files with 2,374 additions and 364 deletions.
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 12 additions & 1 deletion crates/bin/pcli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,17 @@ default = ["std", "parallel", "download-proving-keys"]
download-proving-keys = ["penumbra-proof-params/download-proving-keys"]
sct-divergence-check = ["penumbra-view/sct-divergence-check"]
std = ["ark-ff/std", "ibc-types/std"]
parallel = ["penumbra-proof-params/parallel", "decaf377/parallel", "penumbra-shielded-pool/parallel", "penumbra-dex/parallel", "penumbra-governance/parallel", "penumbra-stake/parallel", "penumbra-transaction/parallel", "penumbra-wallet/parallel", "penumbra-proof-setup/parallel"]
parallel = [
"penumbra-proof-params/parallel",
"decaf377/parallel",
"penumbra-shielded-pool/parallel",
"penumbra-dex/parallel",
"penumbra-governance/parallel",
"penumbra-stake/parallel",
"penumbra-transaction/parallel",
"penumbra-wallet/parallel",
"penumbra-proof-setup/parallel",
]

[dependencies]
# Workspace dependencies
Expand Down Expand Up @@ -60,6 +70,7 @@ async-stream = "0.2"
bincode = "1.3.3"
blake2b_simd = "0.5"
base64 = "0.21"
simple-base64 = "0.23"
bytes = "1"
comfy-table = "5"
directories = "4.0.1"
Expand Down
80 changes: 79 additions & 1 deletion crates/bin/pcli/src/command/query.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::{Context, Result};
use anyhow::{anyhow, Context, Result};

mod shielded_pool;
use shielded_pool::ShieldedPool;
Expand Down Expand Up @@ -61,10 +61,31 @@ pub enum QueryCmd {
/// Queries information about IBC.
#[clap(subcommand)]
Ibc(IbcCmd),
/// Subscribes to a filtered stream of state changes.
Watch {
/// The regex to filter keys in verifiable storage.
///
/// The empty string matches all keys; the pattern $^ matches no keys.
#[clap(long, default_value = "")]
key_regex: String,
/// The regex to filter keys in nonverifiable storage.
///
/// The empty string matches all keys; the pattern $^ matches no keys.
#[clap(long, default_value = "")]
nv_key_regex: String,
},
}

impl QueryCmd {
pub async fn exec(&self, app: &mut App) -> Result<()> {
if let QueryCmd::Watch {
key_regex,
nv_key_regex,
} = self
{
return watch(key_regex.clone(), nv_key_regex.clone(), app).await;
}

// Special-case: this is a Tendermint query
if let QueryCmd::Tx(tx) = self {
return tx.exec(app).await;
Expand Down Expand Up @@ -101,6 +122,7 @@ impl QueryCmd {
| QueryCmd::Dex(_)
| QueryCmd::Governance(_)
| QueryCmd::Dao(_)
| QueryCmd::Watch { .. }
| QueryCmd::Ibc(_) => {
unreachable!("query handled in guard");
}
Expand Down Expand Up @@ -138,6 +160,7 @@ impl QueryCmd {
| QueryCmd::ShieldedPool { .. }
| QueryCmd::Governance { .. }
| QueryCmd::Key { .. }
| QueryCmd::Watch { .. }
| QueryCmd::Ibc(_) => true,
}
}
Expand All @@ -154,6 +177,7 @@ impl QueryCmd {
| QueryCmd::Dex { .. }
| QueryCmd::Governance { .. }
| QueryCmd::Dao { .. }
| QueryCmd::Watch { .. }
| QueryCmd::Ibc(_) => {
unreachable!("query is special cased")
}
Expand All @@ -162,3 +186,57 @@ impl QueryCmd {
Ok(())
}
}

// this code (not just this function, the whole module) is pretty shitty,
// but that's maybe okay for the moment. it exists to consume the rpc.
async fn watch(key_regex: String, nv_key_regex: String, app: &mut App) -> Result<()> {
use penumbra_proto::cnidarium::v1alpha1::{
query_service_client::QueryServiceClient, watch_response as wr,
};
let mut client = QueryServiceClient::new(app.pd_channel().await?);

let req = penumbra_proto::cnidarium::v1alpha1::WatchRequest {
key_regex,
nv_key_regex,
};

tracing::debug!(?req);

let mut stream = client.watch(req).await?.into_inner();

while let Some(rsp) = stream.message().await? {
match rsp.entry {
Some(wr::Entry::Kv(kv)) => {
if kv.deleted {
println!("{} KV {} -> DELETED", rsp.version, kv.key);
} else {
println!(
"{} KV {} -> {}",
rsp.version,
kv.key,
simple_base64::encode(&kv.value)
);
}
}
Some(wr::Entry::NvKv(nv_kv)) => {
let key = simple_base64::encode(&nv_kv.key);

if nv_kv.deleted {
println!("{} NVKV {} -> DELETED", rsp.version, key);
} else {
println!(
"{} NVKV {} -> {}",
rsp.version,
key,
simple_base64::encode(&nv_kv.value)
);
}
}
None => {
return Err(anyhow!("server returned None event"));
}
}
}

Ok(())
}
1 change: 1 addition & 0 deletions crates/cnidarium/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ prost = { version = "0.12.3", optional = true }
serde = { version = "1", optional = true }
pbjson = { version = "0.5", optional = true }
ibc-proto = { version = "0.39.0", default-features = false, features = ["serde"], optional = true }
regex = "1.10.2"

[dev-dependencies]
tempfile = "3.3.0"
Expand Down
19 changes: 19 additions & 0 deletions crates/cnidarium/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ pub struct Cache {
}

impl Cache {
/// Inspect the cache of unwritten changes to the verifiable state.
pub fn unwritten_changes(&self) -> &BTreeMap<String, Option<Vec<u8>>> {
&self.unwritten_changes
}

/// Inspect the cache of unwritten changes to the nonverifiable state.
pub fn nonverifiable_changes(&self) -> &BTreeMap<Vec<u8>, Option<Vec<u8>>> {
&self.nonverifiable_changes
}

/// Merge the given cache with this one, taking its writes in place of ours.
pub fn merge(&mut self, other: Cache) {
// One might ask, why does this exist separately from `apply_to`? The
Expand Down Expand Up @@ -103,4 +113,13 @@ impl Cache {
}
changes_by_substore
}

pub(crate) fn clone_changes(&self) -> Self {
Self {
unwritten_changes: self.unwritten_changes.clone(),
nonverifiable_changes: self.nonverifiable_changes.clone(),
ephemeral_objects: Default::default(),
events: Default::default(),
}
}
}
Loading

0 comments on commit 886daac

Please sign in to comment.