From 112bb2461af7d86a6b16177c43fe9f11249b69a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=BAc=C3=A1s=20Meier?= Date: Mon, 21 Oct 2024 09:05:38 -0700 Subject: [PATCH] pindexer: indexing for insights dashboard (#4898) Closes #4896. This adds the requisite tables, with a component parametrized by the numeraire for price information. Price information uses candlestick data for generating the price, so the recently added event will need to be used there. This also backfills in rich domain types into old crates as necessary, for convenience. Also closes #4883, since at this point we've refactored out all requisite schemas. ## Checklist before requesting a review - [x] If this code contains consensus-breaking changes, I have added the "consensus-breaking" label. Otherwise, I declare my belief that there are not consensus-breaking changes, for the following reason: > indexing only --- Cargo.lock | 1 + crates/bin/pindexer/Cargo.toml | 1 + crates/bin/pindexer/src/indexer_ext.rs | 9 + crates/bin/pindexer/src/insights/mod.rs | 506 ++++++++++++++++++ crates/bin/pindexer/src/insights/schema.sql | 54 ++ crates/bin/pindexer/src/lib.rs | 1 + crates/core/component/fee/src/component.rs | 17 +- crates/core/component/fee/src/event.rs | 47 ++ .../core/component/funding/src/component.rs | 30 +- crates/core/component/funding/src/event.rs | 50 +- .../src/component/action_handler/output.rs | 9 +- .../src/component/action_handler/spend.rs | 9 +- .../shielded-pool/src/component/transfer.rs | 173 +++--- .../core/component/shielded-pool/src/event.rs | 309 +++++++++-- .../src/component/action_handler/delegate.rs | 4 +- .../component/action_handler/undelegate.rs | 4 +- .../component/stake/src/component/stake.rs | 17 +- .../validator_handler/uptime_tracker.rs | 4 +- .../validator_handler/validator_manager.rs | 32 +- .../validator_handler/validator_store.rs | 35 +- crates/core/component/stake/src/event.rs | 476 +++++++++++++--- crates/core/component/stake/src/lib.rs | 2 +- 22 files changed, 1528 insertions(+), 262 deletions(-) create mode 100644 crates/bin/pindexer/src/insights/mod.rs create mode 100644 crates/bin/pindexer/src/insights/schema.sql diff --git a/Cargo.lock b/Cargo.lock index 965eea0d35..62d71d39c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5824,6 +5824,7 @@ dependencies = [ "penumbra-auction", "penumbra-dex", "penumbra-fee", + "penumbra-funding", "penumbra-governance", "penumbra-keys", "penumbra-num", diff --git a/crates/bin/pindexer/Cargo.toml b/crates/bin/pindexer/Cargo.toml index 610d43e780..24ee0b3531 100644 --- a/crates/bin/pindexer/Cargo.toml +++ b/crates/bin/pindexer/Cargo.toml @@ -22,6 +22,7 @@ penumbra-app = {workspace = true, default-features = false} penumbra-auction = {workspace = true, default-features = false} penumbra-dex = {workspace = true, default-features = false} penumbra-fee = {workspace = true, default-features = false} +penumbra-funding = {workspace = true, default-features = false} penumbra-keys = {workspace = true, default-features = false} penumbra-governance = {workspace = true, default-features = false} penumbra-num = {workspace = true, default-features = false} diff --git a/crates/bin/pindexer/src/indexer_ext.rs b/crates/bin/pindexer/src/indexer_ext.rs index 950000fdd3..90600e43ab 100644 --- a/crates/bin/pindexer/src/indexer_ext.rs +++ b/crates/bin/pindexer/src/indexer_ext.rs @@ -1,3 +1,5 @@ +use std::str::FromStr; + pub trait IndexerExt: Sized { fn with_default_penumbra_app_views(self) -> Self; } @@ -13,5 +15,12 @@ impl IndexerExt for cometindex::Indexer { .with_index(crate::dex_ex::Component::new()) .with_index(crate::supply::Component::new()) .with_index(crate::ibc::Component::new()) + .with_index(crate::insights::Component::new( + penumbra_asset::asset::Id::from_str( + // USDC + "passet1w6e7fvgxsy6ccy3m8q0eqcuyw6mh3yzqu3uq9h58nu8m8mku359spvulf6", + ) + .ok(), + )) } } diff --git a/crates/bin/pindexer/src/insights/mod.rs b/crates/bin/pindexer/src/insights/mod.rs new file mode 100644 index 0000000000..221c64eb25 --- /dev/null +++ b/crates/bin/pindexer/src/insights/mod.rs @@ -0,0 +1,506 @@ +use std::{collections::BTreeMap, iter}; + +use cometindex::{async_trait, AppView, ContextualizedEvent, PgTransaction}; +use penumbra_app::genesis::Content; +use penumbra_asset::{asset, STAKING_TOKEN_ASSET_ID}; +use penumbra_dex::{ + event::{EventArbExecution, EventCandlestickData}, + DirectedTradingPair, +}; +use penumbra_fee::event::EventBlockFees; +use penumbra_funding::event::EventFundingStreamReward; +use penumbra_num::Amount; +use penumbra_proto::{event::EventDomainType, DomainType, Name}; +use penumbra_shielded_pool::event::{ + EventInboundFungibleTokenTransfer, EventOutboundFungibleTokenRefund, + EventOutboundFungibleTokenTransfer, +}; +use penumbra_stake::{ + event::{EventDelegate, EventRateDataChange, EventUndelegate}, + validator::Validator, + IdentityKey, +}; +use sqlx::PgPool; + +use crate::parsing::parse_content; + +#[derive(Debug, Clone, Copy)] +struct ValidatorSupply { + um: u64, + rate_bps2: u64, +} + +async fn modify_validator_supply( + dbtx: &mut PgTransaction<'_>, + height: u64, + ik: IdentityKey, + f: Box anyhow::Result + Send + 'static>, +) -> anyhow::Result { + let ik_text = ik.to_string(); + let supply = { + let row: Option<(i64, i64)> = sqlx::query_as(" + SELECT um, rate_bps2 FROM _insights_validators WHERE validator_id = $1 ORDER BY height DESC LIMIT 1 + ").bind(&ik_text).fetch_optional(dbtx.as_mut()).await?; + let row = row.unwrap_or((0i64, 1_0000_0000i64)); + ValidatorSupply { + um: u64::try_from(row.0)?, + rate_bps2: u64::try_from(row.1)?, + } + }; + let new_supply = f(supply)?; + sqlx::query( + r#" + INSERT INTO _insights_validators + VALUES ($1, $2, $3, $4) + ON CONFLICT (validator_id, height) DO UPDATE SET + um = excluded.um, + rate_bps2 = excluded.rate_bps2 + "#, + ) + .bind(&ik_text) + .bind(i64::try_from(height)?) + .bind(i64::try_from(new_supply.um)?) + .bind(i64::try_from(new_supply.rate_bps2)?) + .execute(dbtx.as_mut()) + .await?; + Ok(i64::try_from(new_supply.um)? - i64::try_from(supply.um)?) +} + +#[derive(Default, Debug, Clone, Copy)] +struct Supply { + total: u64, + staked: u64, + price: Option, +} + +async fn modify_supply( + dbtx: &mut PgTransaction<'_>, + height: u64, + price_numeraire: Option, + f: Box anyhow::Result + Send + 'static>, +) -> anyhow::Result<()> { + let supply: Supply = { + let row: Option<(i64, i64, Option)> = sqlx::query_as( + "SELECT total, staked, price FROM insights_supply ORDER BY HEIGHT DESC LIMIT 1", + ) + .fetch_optional(dbtx.as_mut()) + .await?; + row.map(|(total, staked, price)| { + anyhow::Result::<_>::Ok(Supply { + total: total.try_into()?, + staked: staked.try_into()?, + price, + }) + }) + .transpose()? + .unwrap_or_default() + }; + let supply = f(supply)?; + sqlx::query( + r#" + INSERT INTO + insights_supply(height, total, staked, price, price_numeraire_asset_id) + VALUES ($1, $2, $3, $5, $4) + ON CONFLICT (height) DO UPDATE SET + total = excluded.total, + staked = excluded.staked, + price = excluded.price, + price_numeraire_asset_id = excluded.price_numeraire_asset_id + "#, + ) + .bind(i64::try_from(height)?) + .bind(i64::try_from(supply.total)?) + .bind(i64::try_from(supply.staked)?) + .bind(price_numeraire.map(|x| x.to_bytes())) + .bind(supply.price) + .execute(dbtx.as_mut()) + .await?; + Ok(()) +} + +#[derive(Debug, Clone, Copy, PartialEq)] +enum DepositorExisted { + Yes, + No, +} + +async fn register_depositor( + dbtx: &mut PgTransaction<'_>, + asset_id: asset::Id, + address: &str, +) -> anyhow::Result { + let exists: bool = sqlx::query_scalar( + "SELECT EXISTS (SELECT 1 FROM _insights_shielded_pool_depositors WHERE asset_id = $1 AND address = $2)", + ) + .bind(asset_id.to_bytes()) + .bind(address) + .fetch_one(dbtx.as_mut()) + .await?; + if exists { + return Ok(DepositorExisted::Yes); + } + sqlx::query("INSERT INTO _insights_shielded_pool_depositors VALUES ($1, $2)") + .bind(asset_id.to_bytes()) + .bind(address) + .execute(dbtx.as_mut()) + .await?; + Ok(DepositorExisted::No) +} + +async fn asset_flow( + dbtx: &mut PgTransaction<'_>, + asset_id: asset::Id, + height: u64, + flow: i128, + depositor_existed: DepositorExisted, +) -> anyhow::Result<()> { + let asset_pool: Option<(String, String, i32)> = sqlx::query_as("SELECT total_value, current_value, unique_depositors FROM insights_shielded_pool WHERE asset_id = $1 ORDER BY height DESC LIMIT 1").bind(asset_id.to_bytes()).fetch_optional(dbtx.as_mut()).await?; + let mut asset_pool = asset_pool + .map(|(t, c, u)| { + anyhow::Result::<(i128, i128, i32)>::Ok(( + i128::from_str_radix(&t, 10)?, + i128::from_str_radix(&c, 10)?, + u, + )) + }) + .transpose()? + .unwrap_or((0i128, 0i128, 0i32)); + asset_pool.0 += flow.abs(); + asset_pool.1 += flow; + asset_pool.2 += match depositor_existed { + DepositorExisted::Yes => 0, + DepositorExisted::No => 1, + }; + sqlx::query( + r#" + INSERT INTO insights_shielded_pool + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (asset_id, height) DO UPDATE SET + total_value = excluded.total_value, + current_value = excluded.current_value, + unique_depositors = excluded.unique_depositors + "#, + ) + .bind(asset_id.to_bytes()) + .bind(i64::try_from(height)?) + .bind(asset_pool.0.to_string()) + .bind(asset_pool.1.to_string()) + .bind(asset_pool.2) + .execute(dbtx.as_mut()) + .await?; + Ok(()) +} + +#[derive(Debug)] +pub struct Component { + price_numeraire: Option, +} + +impl Component { + /// This component depends on a reference asset for the total supply price. + pub fn new(price_numeraire: Option) -> Self { + Self { price_numeraire } + } +} + +/// Add the initial native token supply. +async fn add_genesis_native_token_allocation_supply<'a>( + dbtx: &mut PgTransaction<'a>, + content: &Content, +) -> anyhow::Result<()> { + fn content_mints(content: &Content) -> BTreeMap { + let community_pool_mint = iter::once(( + *STAKING_TOKEN_ASSET_ID, + content.community_pool_content.initial_balance.amount, + )); + let allocation_mints = content + .shielded_pool_content + .allocations + .iter() + .map(|allocation| { + let value = allocation.value(); + (value.asset_id, value.amount) + }); + + let mut out = BTreeMap::new(); + for (id, amount) in community_pool_mint.chain(allocation_mints) { + out.entry(id).and_modify(|x| *x += amount).or_insert(amount); + } + out + } + + let mints = content_mints(content); + + let unstaked = u64::try_from( + mints + .get(&*STAKING_TOKEN_ASSET_ID) + .copied() + .unwrap_or_default() + .value(), + )?; + + let mut staked = 0u64; + // at genesis, assume a 1:1 ratio between delegation amount and native token amount. + for val in &content.stake_content.validators { + let val = Validator::try_from(val.clone())?; + let delegation_amount: u64 = mints + .get(&val.token().id()) + .cloned() + .unwrap_or_default() + .value() + .try_into()?; + staked += delegation_amount; + modify_validator_supply( + dbtx, + 0, + val.identity_key, + Box::new(move |_| { + Ok(ValidatorSupply { + um: delegation_amount, + rate_bps2: 1_0000_0000, + }) + }), + ) + .await?; + } + + modify_supply( + dbtx, + 0, + None, + Box::new(move |_| { + Ok(Supply { + total: unstaked + staked, + staked, + price: None, + }) + }), + ) + .await?; + + Ok(()) +} +#[async_trait] +impl AppView for Component { + async fn init_chain( + &self, + dbtx: &mut PgTransaction, + app_state: &serde_json::Value, + ) -> Result<(), anyhow::Error> { + for statement in include_str!("schema.sql").split(";") { + sqlx::query(statement).execute(dbtx.as_mut()).await?; + } + + // decode the initial supply from the genesis + // initial app state is not recomputed from events, because events are not emitted in init_chain. + // instead, the indexer directly parses the genesis. + add_genesis_native_token_allocation_supply(dbtx, &parse_content(app_state.clone())?) + .await?; + Ok(()) + } + + fn is_relevant(&self, type_str: &str) -> bool { + [ + ::Proto::full_name(), + ::Proto::full_name(), + ::Proto::full_name(), + ::Proto::full_name(), + ::Proto::full_name(), + ::Proto::full_name(), + ::Proto::full_name(), + ::Proto::full_name(), + ::Proto::full_name(), + ::Proto::full_name(), + ] + .into_iter() + .any(|x| type_str == x) + } + + async fn index_event( + &self, + dbtx: &mut PgTransaction, + event: &ContextualizedEvent, + _src_db: &PgPool, + ) -> Result<(), anyhow::Error> { + let height = event.block_height; + if let Ok(e) = EventUndelegate::try_from_event(&event.event) { + let delta = modify_validator_supply( + dbtx, + height, + e.identity_key, + Box::new(move |supply| { + Ok(ValidatorSupply { + um: supply.um + u64::try_from(e.amount.value()).expect(""), + ..supply + }) + }), + ) + .await?; + modify_supply( + dbtx, + height, + self.price_numeraire, + Box::new(move |supply| { + // The amount staked has changed, but no inflation has happened. + Ok(Supply { + staked: u64::try_from(i64::try_from(supply.staked)? + delta)?, + ..supply + }) + }), + ) + .await?; + } else if let Ok(e) = EventDelegate::try_from_event(&event.event) { + let delta = modify_validator_supply( + dbtx, + height, + e.identity_key, + Box::new(move |supply| { + Ok(ValidatorSupply { + um: supply.um + u64::try_from(e.amount.value()).expect(""), + ..supply + }) + }), + ) + .await?; + modify_supply( + dbtx, + height, + self.price_numeraire, + Box::new(move |supply| { + Ok(Supply { + staked: u64::try_from(i64::try_from(supply.staked)? + delta)?, + ..supply + }) + }), + ) + .await?; + } else if let Ok(e) = EventRateDataChange::try_from_event(&event.event) { + let delta = modify_validator_supply( + dbtx, + height, + e.identity_key, + Box::new(move |supply| { + // del_um <- um / old_exchange_rate + // um <- del_um * new_exchange_rate + // so + // um <- um * (new_exchange_rate / old_exchange_rate) + // and the bps cancel out. + let um = (u128::from(supply.um) * e.rate_data.validator_exchange_rate.value()) + .checked_div(supply.rate_bps2.into()) + .unwrap_or(0u128) + .try_into()?; + Ok(ValidatorSupply { + um, + rate_bps2: u64::try_from(e.rate_data.validator_exchange_rate.value())?, + }) + }), + ) + .await?; + modify_supply( + dbtx, + height, + self.price_numeraire, + Box::new(move |supply| { + // Value has been created or destroyed! + Ok(Supply { + total: u64::try_from(i64::try_from(supply.total)? + delta)?, + staked: u64::try_from(i64::try_from(supply.staked)? + delta)?, + ..supply + }) + }), + ) + .await?; + } else if let Ok(e) = EventBlockFees::try_from_event(&event.event) { + let value = e.swapped_fee_total.value(); + if value.asset_id == *STAKING_TOKEN_ASSET_ID { + let amount = u64::try_from(value.amount.value())?; + // We consider the tip to be destroyed too, matching the current logic + // DRAGON: if this changes, this code should use the base fee only. + modify_supply( + dbtx, + height, + self.price_numeraire, + Box::new(move |supply| { + Ok(Supply { + total: supply.total + amount, + ..supply + }) + }), + ) + .await?; + } + } else if let Ok(e) = EventArbExecution::try_from_event(&event.event) { + let input = e.swap_execution.input; + let output = e.swap_execution.output; + if input.asset_id == *STAKING_TOKEN_ASSET_ID + && output.asset_id == *STAKING_TOKEN_ASSET_ID + { + let profit = u64::try_from((output.amount - input.amount).value())?; + modify_supply( + dbtx, + height, + self.price_numeraire, + Box::new(move |supply| { + Ok(Supply { + total: supply.total + profit, + ..supply + }) + }), + ) + .await?; + } + } else if let Ok(e) = EventFundingStreamReward::try_from_event(&event.event) { + let amount = u64::try_from(e.reward_amount.value())?; + modify_supply( + dbtx, + height, + self.price_numeraire, + Box::new(move |supply| { + Ok(Supply { + total: supply.total + amount, + ..supply + }) + }), + ) + .await?; + } else if let Ok(e) = EventInboundFungibleTokenTransfer::try_from_event(&event.event) { + if e.value.asset_id != *STAKING_TOKEN_ASSET_ID { + let existed = register_depositor(dbtx, e.value.asset_id, &e.sender).await?; + let flow = i128::try_from(e.value.amount.value())?; + asset_flow(dbtx, e.value.asset_id, height, flow, existed).await?; + } + } else if let Ok(e) = EventOutboundFungibleTokenTransfer::try_from_event(&event.event) { + if e.value.asset_id != *STAKING_TOKEN_ASSET_ID { + let flow = i128::try_from(e.value.amount.value())?; + // For outbound transfers, never increment unique count + asset_flow(dbtx, e.value.asset_id, height, -flow, DepositorExisted::No).await?; + } + } else if let Ok(e) = EventOutboundFungibleTokenRefund::try_from_event(&event.event) { + if e.value.asset_id != *STAKING_TOKEN_ASSET_ID { + let flow = i128::try_from(e.value.amount.value())?; + // For outbound transfers, never increment unique count. + asset_flow(dbtx, e.value.asset_id, height, flow, DepositorExisted::No).await?; + } + } else if let Ok(e) = EventCandlestickData::try_from_event(&event.event) { + if let Some(pn) = self.price_numeraire { + if e.pair == DirectedTradingPair::new(*STAKING_TOKEN_ASSET_ID, pn) { + let price = e.stick.close; + modify_supply( + dbtx, + height, + self.price_numeraire, + Box::new(move |supply| { + Ok(Supply { + price: Some(price), + ..supply + }) + }), + ) + .await?; + } + } + } + tracing::debug!(?event, "unrecognized event"); + Ok(()) + } +} diff --git a/crates/bin/pindexer/src/insights/schema.sql b/crates/bin/pindexer/src/insights/schema.sql new file mode 100644 index 0000000000..a23d67c055 --- /dev/null +++ b/crates/bin/pindexer/src/insights/schema.sql @@ -0,0 +1,54 @@ +-- A table containing updates to the total supply, and market cap. +CREATE TABLE IF NOT EXISTS insights_supply ( + -- The height where the supply was updated. + height BIGINT PRIMARY KEY, + -- The total supply of the staking token at this height. + total BIGINT NOT NULL, + staked BIGINT NOT NULL, + -- Price, if it can be found for whatever numeraire we choose at runtime. + price FLOAT8, + -- The numeraire for the price we've chosen. + price_numeraire_asset_id BYTEA, + -- The market cap, i.e. price * total amount. + market_cap FLOAT8 GENERATED ALWAYS AS (total::FLOAT8 * price) STORED +); + +-- A working table to save the state around validators we need. +-- +-- This is necessary because rate data changes increase the total supply, +-- but don't directly tell us how much the total supply increased. +CREATE TABLE IF NOT EXISTS _insights_validators ( + -- The validator this row concerns. + validator_id TEXT NOT NULL, + -- The height for the supply update. + height BIGINT NOT NULL, + -- The total amount staked with them, in terms of the native token. + um BIGINT NOT NULL, + -- How much native um we get per unit of the delegation token. + rate_bps2 BIGINT NOT NULL, + PRIMARY KEY (validator_id, height) +); + +-- Our internal representation of the shielded pool table. +CREATE TABLE IF NOT EXISTS insights_shielded_pool ( + -- The asset this concerns. + asset_id BYTEA NOT NULL, + height BIGINT NOT NULL, + -- The total value shielded, in terms of that asset. + total_value TEXT NOT NULL, + -- The current value shielded, in terms of that asset. + current_value TEXT NOT NULL, + -- The number of unique depositors. + unique_depositors INT NOT NULL, + PRIMARY KEY (asset_id, height) +); + +-- Unique depositors into the shielded pool +CREATE TABLE IF NOT EXISTS _insights_shielded_pool_depositors ( + asset_id BYTEA NOT NULL, + address TEXT NOT NULL, + PRIMARY KEY (asset_id, address) +); + +CREATE OR REPLACE VIEW insights_shielded_pool_latest AS + SELECT DISTINCT ON (asset_id) * FROM insights_shielded_pool ORDER BY asset_id, height DESC; diff --git a/crates/bin/pindexer/src/lib.rs b/crates/bin/pindexer/src/lib.rs index e2c2d63476..47429f1c10 100644 --- a/crates/bin/pindexer/src/lib.rs +++ b/crates/bin/pindexer/src/lib.rs @@ -6,6 +6,7 @@ pub mod block; pub mod dex; pub mod dex_ex; pub mod ibc; +pub mod insights; mod parsing; pub mod shielded_pool; mod sql; diff --git a/crates/core/component/fee/src/component.rs b/crates/core/component/fee/src/component.rs index e3c80d9f73..25ffa10451 100644 --- a/crates/core/component/fee/src/component.rs +++ b/crates/core/component/fee/src/component.rs @@ -4,12 +4,12 @@ mod view; use std::sync::Arc; -use crate::{genesis, Fee}; +use crate::{event::EventBlockFees, genesis, Fee}; use async_trait::async_trait; use cnidarium::StateWrite; use cnidarium_component::Component; -use penumbra_proto::core::component::fee::v1 as pb; use penumbra_proto::state::StateWriteProto as _; +use penumbra_proto::DomainType as _; use tendermint::abci; use tracing::instrument; @@ -56,11 +56,14 @@ impl Component for FeeComponent { let swapped_total = swapped_base + swapped_tip; - state_ref.record_proto(pb::EventBlockFees { - swapped_fee_total: Some(Fee::from_staking_token_amount(swapped_total).into()), - swapped_base_fee_total: Some(Fee::from_staking_token_amount(swapped_base).into()), - swapped_tip_total: Some(Fee::from_staking_token_amount(swapped_tip).into()), - }); + state_ref.record_proto( + EventBlockFees { + swapped_fee_total: Fee::from_staking_token_amount(swapped_total), + swapped_base_fee_total: Fee::from_staking_token_amount(swapped_base), + swapped_tip_total: Fee::from_staking_token_amount(swapped_tip), + } + .to_proto(), + ); } #[instrument(name = "fee", skip(_state))] diff --git a/crates/core/component/fee/src/event.rs b/crates/core/component/fee/src/event.rs index 8b13789179..5a87c81976 100644 --- a/crates/core/component/fee/src/event.rs +++ b/crates/core/component/fee/src/event.rs @@ -1 +1,48 @@ +use crate::Fee; +use anyhow::{anyhow, Context}; +use penumbra_proto::{core::component::fee::v1 as pb, DomainType, Name as _}; +#[derive(Clone, Debug)] +pub struct EventBlockFees { + pub swapped_fee_total: Fee, + pub swapped_base_fee_total: Fee, + pub swapped_tip_total: Fee, +} + +impl TryFrom for EventBlockFees { + type Error = anyhow::Error; + + fn try_from(value: pb::EventBlockFees) -> Result { + fn inner(value: pb::EventBlockFees) -> anyhow::Result { + Ok(EventBlockFees { + swapped_fee_total: value + .swapped_fee_total + .ok_or(anyhow!("missing `swapped_fee_total`"))? + .try_into()?, + swapped_base_fee_total: value + .swapped_base_fee_total + .ok_or(anyhow!("missing `swapped_base_fee_total`"))? + .try_into()?, + swapped_tip_total: value + .swapped_tip_total + .ok_or(anyhow!("missing `swapped_tip_total`"))? + .try_into()?, + }) + } + inner(value).context(format!("parsing {}", pb::EventBlockFees::NAME)) + } +} + +impl From for pb::EventBlockFees { + fn from(value: EventBlockFees) -> Self { + Self { + swapped_fee_total: Some(value.swapped_fee_total.into()), + swapped_base_fee_total: Some(value.swapped_base_fee_total.into()), + swapped_tip_total: Some(value.swapped_tip_total.into()), + } + } +} + +impl DomainType for EventBlockFees { + type Proto = pb::EventBlockFees; +} diff --git a/crates/core/component/funding/src/component.rs b/crates/core/component/funding/src/component.rs index b459eac239..a0ea863ec7 100644 --- a/crates/core/component/funding/src/component.rs +++ b/crates/core/component/funding/src/component.rs @@ -6,7 +6,7 @@ pub use metrics::register_metrics; /* Component implementation */ use penumbra_asset::{Value, STAKING_TOKEN_ASSET_ID}; -use penumbra_proto::StateWriteProto; +use penumbra_proto::{DomainType, StateWriteProto}; use penumbra_stake::component::validator_handler::ValidatorDataRead; pub use view::{StateReadExt, StateWriteExt}; @@ -19,7 +19,7 @@ use cnidarium_component::Component; use tendermint::v0_37::abci; use tracing::instrument; -use crate::{event::funding_stream_reward, genesis}; +use crate::{event::EventFundingStreamReward, genesis}; pub struct Funding {} @@ -112,11 +112,14 @@ impl Component for Funding { // If the recipient is an address, mint a note to that address Recipient::Address(address) => { // Record the funding stream reward event: - state.record_proto(funding_stream_reward( - address.to_string(), - base_rate.epoch_index, - reward_amount_for_stream.into(), - )); + state.record_proto( + EventFundingStreamReward { + recipient: address.to_string(), + epoch_index: base_rate.epoch_index, + reward_amount: reward_amount_for_stream, + } + .to_proto(), + ); state .mint_note( @@ -134,11 +137,14 @@ impl Component for Funding { // If the recipient is the Community Pool, deposit the funds into the Community Pool Recipient::CommunityPool => { // Record the funding stream reward event: - state.record_proto(funding_stream_reward( - "community-pool".to_string(), - base_rate.epoch_index, - reward_amount_for_stream.into(), - )); + state.record_proto( + EventFundingStreamReward { + recipient: "community-pool".to_string(), + epoch_index: base_rate.epoch_index, + reward_amount: reward_amount_for_stream, + } + .to_proto(), + ); state .community_pool_deposit(Value { diff --git a/crates/core/component/funding/src/event.rs b/crates/core/component/funding/src/event.rs index c40df99e9a..6d646bfe53 100644 --- a/crates/core/component/funding/src/event.rs +++ b/crates/core/component/funding/src/event.rs @@ -1,14 +1,42 @@ +use anyhow::{anyhow, Context}; use penumbra_num::Amount; -use penumbra_proto::penumbra::core::component::funding::v1 as pb; - -pub fn funding_stream_reward( - recipient: String, - epoch_index: u64, - reward_amount: Amount, -) -> pb::EventFundingStreamReward { - pb::EventFundingStreamReward { - recipient, - epoch_index, - reward_amount: Some(reward_amount.into()), +use penumbra_proto::{penumbra::core::component::funding::v1 as pb, DomainType, Name as _}; + +#[derive(Clone, Debug)] +pub struct EventFundingStreamReward { + pub recipient: String, + pub epoch_index: u64, + pub reward_amount: Amount, +} + +impl TryFrom for EventFundingStreamReward { + type Error = anyhow::Error; + + fn try_from(value: pb::EventFundingStreamReward) -> Result { + fn inner(value: pb::EventFundingStreamReward) -> anyhow::Result { + Ok(EventFundingStreamReward { + recipient: value.recipient, + epoch_index: value.epoch_index, + reward_amount: value + .reward_amount + .ok_or(anyhow!("missing `reward_amount`"))? + .try_into()?, + }) + } + inner(value).context(format!("parsing {}", pb::EventFundingStreamReward::NAME)) } } + +impl From for pb::EventFundingStreamReward { + fn from(value: EventFundingStreamReward) -> Self { + Self { + recipient: value.recipient, + epoch_index: value.epoch_index, + reward_amount: Some(value.reward_amount.into()), + } + } +} + +impl DomainType for EventFundingStreamReward { + type Proto = pb::EventFundingStreamReward; +} diff --git a/crates/core/component/shielded-pool/src/component/action_handler/output.rs b/crates/core/component/shielded-pool/src/component/action_handler/output.rs index 330a39c0e0..18a4515c9f 100644 --- a/crates/core/component/shielded-pool/src/component/action_handler/output.rs +++ b/crates/core/component/shielded-pool/src/component/action_handler/output.rs @@ -3,7 +3,7 @@ use async_trait::async_trait; use cnidarium::StateWrite; use cnidarium_component::ActionHandler; use penumbra_proof_params::OUTPUT_PROOF_VERIFICATION_KEY; -use penumbra_proto::StateWriteProto as _; +use penumbra_proto::{DomainType as _, StateWriteProto as _}; use penumbra_sct::component::source::SourceContext; use crate::{component::NoteManager, event, output::OutputProofPublic, Output}; @@ -34,7 +34,12 @@ impl ActionHandler for Output { .add_note_payload(self.body.note_payload.clone(), source) .await; - state.record_proto(event::output(&self.body.note_payload)); + state.record_proto( + event::EventOutput { + note_commitment: self.body.note_payload.note_commitment, + } + .to_proto(), + ); Ok(()) } diff --git a/crates/core/component/shielded-pool/src/component/action_handler/spend.rs b/crates/core/component/shielded-pool/src/component/action_handler/spend.rs index ee4c1ace85..b3da310650 100644 --- a/crates/core/component/shielded-pool/src/component/action_handler/spend.rs +++ b/crates/core/component/shielded-pool/src/component/action_handler/spend.rs @@ -3,7 +3,7 @@ use async_trait::async_trait; use cnidarium::StateWrite; use cnidarium_component::ActionHandler; use penumbra_proof_params::SPEND_PROOF_VERIFICATION_KEY; -use penumbra_proto::StateWriteProto as _; +use penumbra_proto::{DomainType, StateWriteProto as _}; use penumbra_sct::component::{ source::SourceContext, tree::{SctManager, VerificationExt}, @@ -49,7 +49,12 @@ impl ActionHandler for Spend { state.nullify(self.body.nullifier, source).await; // Also record an ABCI event for transaction indexing. - state.record_proto(event::spend(&self.body.nullifier)); + state.record_proto( + event::EventSpend { + nullifier: self.body.nullifier, + } + .to_proto(), + ); Ok(()) } diff --git a/crates/core/component/shielded-pool/src/component/transfer.rs b/crates/core/component/shielded-pool/src/component/transfer.rs index 18308e82ca..88c0e799ab 100644 --- a/crates/core/component/shielded-pool/src/component/transfer.rs +++ b/crates/core/component/shielded-pool/src/component/transfer.rs @@ -2,7 +2,8 @@ use std::str::FromStr; use crate::{ component::{AssetRegistry, NoteManager}, - event, Ics20Withdrawal, + event::{self, FungibleTokenTransferPacketMetadata}, + Ics20Withdrawal, }; use anyhow::{Context, Result}; use async_trait::async_trait; @@ -25,8 +26,8 @@ use penumbra_ibc::component::ChannelStateReadExt; use penumbra_keys::Address; use penumbra_num::Amount; use penumbra_proto::{ - core::component::shielded_pool::v1::FungibleTokenTransferPacketMetadata, - penumbra::core::component::ibc::v1::FungibleTokenPacketData, StateReadProto, StateWriteProto, + penumbra::core::component::ibc::v1::FungibleTokenPacketData, DomainType as _, StateReadProto, + StateWriteProto, }; use penumbra_sct::CommitmentSource; @@ -118,23 +119,26 @@ pub trait Ics20TransferWriteExt: StateWrite { ), new_value_balance, ); - self.record_proto(event::outbound_fungible_token_transfer( - Value { - amount: withdrawal.amount, - asset_id: withdrawal.denom.id(), - }, - &withdrawal.return_address, - withdrawal.destination_chain_address.clone(), - FungibleTokenTransferPacketMetadata { - channel: withdrawal.source_channel.0.clone(), - sequence: self - .get_send_sequence( - &withdrawal.source_channel, - &checked_packet.source_port(), - ) - .await?, - }, - )); + self.record_proto( + event::EventOutboundFungibleTokenTransfer { + value: Value { + amount: withdrawal.amount, + asset_id: withdrawal.denom.id(), + }, + sender: withdrawal.return_address.clone(), + receiver: withdrawal.destination_chain_address.clone(), + meta: FungibleTokenTransferPacketMetadata { + channel: withdrawal.source_channel.0.clone(), + sequence: self + .get_send_sequence( + &withdrawal.source_channel, + &checked_packet.source_port(), + ) + .await?, + }, + } + .to_proto(), + ); } else { // receiver is the source, burn utxos @@ -168,23 +172,26 @@ pub trait Ics20TransferWriteExt: StateWrite { ), new_value_balance, ); - self.record_proto(event::outbound_fungible_token_transfer( - Value { - amount: withdrawal.amount, - asset_id: withdrawal.denom.id(), - }, - &withdrawal.return_address, - withdrawal.destination_chain_address.clone(), - FungibleTokenTransferPacketMetadata { - channel: withdrawal.source_channel.0.clone(), - sequence: self - .get_send_sequence( - &withdrawal.source_channel, - &checked_packet.source_port(), - ) - .await?, - }, - )); + self.record_proto( + event::EventOutboundFungibleTokenTransfer { + value: Value { + amount: withdrawal.amount, + asset_id: withdrawal.denom.id(), + }, + sender: withdrawal.return_address.clone(), + receiver: withdrawal.destination_chain_address.clone(), + meta: FungibleTokenTransferPacketMetadata { + channel: withdrawal.source_channel.0.clone(), + sequence: self + .get_send_sequence( + &withdrawal.source_channel, + &checked_packet.source_port(), + ) + .await?, + }, + } + .to_proto(), + ); } self.send_packet_execute(checked_packet).await; @@ -388,15 +395,18 @@ async fn recv_transfer_packet_inner( state_key::ics20_value_balance::by_asset_id(&msg.packet.chan_on_b, &denom.id()), new_value_balance, ); - state.record_proto(event::inbound_fungible_token_transfer( - value, - packet_data.sender.clone(), - &receiver_address, - FungibleTokenTransferPacketMetadata { - channel: msg.packet.chan_on_a.0.clone(), - sequence: msg.packet.sequence.0, - }, - )); + state.record_proto( + event::EventInboundFungibleTokenTransfer { + value, + sender: packet_data.sender.clone(), + receiver: receiver_address, + meta: FungibleTokenTransferPacketMetadata { + channel: msg.packet.chan_on_a.0.clone(), + sequence: msg.packet.sequence.0, + }, + } + .to_proto(), + ); } else { // create new denom: // @@ -448,15 +458,18 @@ async fn recv_transfer_packet_inner( state_key::ics20_value_balance::by_asset_id(&msg.packet.chan_on_b, &denom.id()), new_value_balance, ); - state.record_proto(event::inbound_fungible_token_transfer( - value, - packet_data.sender.clone(), - &receiver_address, - FungibleTokenTransferPacketMetadata { - channel: msg.packet.chan_on_a.0.clone(), - sequence: msg.packet.sequence.0, - }, - )); + state.record_proto( + event::EventInboundFungibleTokenTransfer { + value, + sender: packet_data.sender.clone(), + receiver: receiver_address, + meta: FungibleTokenTransferPacketMetadata { + channel: msg.packet.chan_on_a.0.clone(), + sequence: msg.packet.sequence.0, + }, + } + .to_proto(), + ); } Ok(()) @@ -527,17 +540,20 @@ async fn refund_tokens( state_key::ics20_value_balance::by_asset_id(&packet.chan_on_a, &denom.id()), new_value_balance, ); - state.record_proto(event::outbound_fungible_token_refund( - value, - &receiver, // note, this comes from packet_data.sender - packet_data.receiver.clone(), - reason, - // Use the destination channel, i.e. our name for it, to be consistent across events. - FungibleTokenTransferPacketMetadata { - channel: packet.chan_on_b.0.clone(), - sequence: packet.sequence.0, - }, - )); + state.record_proto( + event::EventOutboundFungibleTokenRefund { + value, + sender: receiver, // note, this comes from packet_data.sender + receiver: packet_data.receiver.clone(), + reason, + // Use the destination channel, i.e. our name for it, to be consistent across events. + meta: FungibleTokenTransferPacketMetadata { + channel: packet.chan_on_b.0.clone(), + sequence: packet.sequence.0, + }, + } + .to_proto(), + ); } else { let value_balance: Amount = state .get(&state_key::ics20_value_balance::by_asset_id( @@ -566,17 +582,20 @@ async fn refund_tokens( state_key::ics20_value_balance::by_asset_id(&packet.chan_on_a, &denom.id()), new_value_balance, ); - // note, order flipped relative to the event. - state.record_proto(event::outbound_fungible_token_refund( - value, - &receiver, // note, this comes from packet_data.sender - packet_data.receiver.clone(), - reason, - FungibleTokenTransferPacketMetadata { - channel: packet.chan_on_b.0.clone(), - sequence: packet.sequence.0, - }, - )); + state.record_proto( + event::EventOutboundFungibleTokenRefund { + value, + sender: receiver, // note, this comes from packet_data.sender + receiver: packet_data.receiver.clone(), + reason, + // Use the destination channel, i.e. our name for it, to be consistent across events. + meta: FungibleTokenTransferPacketMetadata { + channel: packet.chan_on_b.0.clone(), + sequence: packet.sequence.0, + }, + } + .to_proto(), + ); } Ok(()) diff --git a/crates/core/component/shielded-pool/src/event.rs b/crates/core/component/shielded-pool/src/event.rs index f13f1398c0..e0f14b67fa 100644 --- a/crates/core/component/shielded-pool/src/event.rs +++ b/crates/core/component/shielded-pool/src/event.rs @@ -1,79 +1,274 @@ +use anyhow::{anyhow, Context}; use penumbra_asset::Value; use penumbra_keys::Address; -use penumbra_proto::core::component::shielded_pool::v1::{ - event_outbound_fungible_token_refund::Reason, EventInboundFungibleTokenTransfer, - EventOutboundFungibleTokenRefund, EventOutboundFungibleTokenTransfer, EventOutput, EventSpend, - FungibleTokenTransferPacketMetadata, -}; +use penumbra_proto::{core::component::shielded_pool::v1 as pb, DomainType}; use penumbra_sct::Nullifier; +use prost::Name as _; -use crate::NotePayload; +use crate::note::StateCommitment; -// These are sort of like the proto/domain type From impls, because -// we don't have separate domain types for the events (yet, possibly ever). +// // These are sort of like the proto/domain type From impls, because +// // we don't have separate domain types for the events (yet, possibly ever). +// Narrator: we did in fact need the separate domain types. -pub fn spend(nullifier: &Nullifier) -> EventSpend { - EventSpend { - nullifier: Some((*nullifier).into()), +#[derive(Clone, Debug)] +pub struct EventSpend { + pub nullifier: Nullifier, +} + +impl TryFrom for EventSpend { + type Error = anyhow::Error; + + fn try_from(value: pb::EventSpend) -> Result { + fn inner(value: pb::EventSpend) -> anyhow::Result { + Ok(EventSpend { + nullifier: value + .nullifier + .ok_or(anyhow!("missing `nullifier`"))? + .try_into()?, + }) + } + inner(value).context(format!("parsing {}", pb::EventSpend::NAME)) } } -pub fn output(note_payload: &NotePayload) -> EventOutput { - EventOutput { - note_commitment: Some(note_payload.note_commitment.into()), +impl From for pb::EventSpend { + fn from(value: EventSpend) -> Self { + Self { + nullifier: Some(value.nullifier.into()), + } + } +} + +impl DomainType for EventSpend { + type Proto = pb::EventSpend; +} + +#[derive(Clone, Debug)] +pub struct EventOutput { + pub note_commitment: StateCommitment, +} + +impl TryFrom for EventOutput { + type Error = anyhow::Error; + + fn try_from(value: pb::EventOutput) -> Result { + fn inner(value: pb::EventOutput) -> anyhow::Result { + Ok(EventOutput { + note_commitment: value + .note_commitment + .ok_or(anyhow!("missing `note_commitment`"))? + .try_into()?, + }) + } + inner(value).context(format!("parsing {}", pb::EventOutput::NAME)) } } -pub fn outbound_fungible_token_transfer( - value: Value, - sender: &Address, - receiver: String, - meta: FungibleTokenTransferPacketMetadata, -) -> EventOutboundFungibleTokenTransfer { - EventOutboundFungibleTokenTransfer { - value: Some(value.into()), - sender: Some(sender.into()), - receiver, - meta: Some(meta), +impl From for pb::EventOutput { + fn from(value: EventOutput) -> Self { + Self { + note_commitment: Some(value.note_commitment.into()), + } } } +impl DomainType for EventOutput { + type Proto = pb::EventOutput; +} + +#[derive(Clone, Debug)] +pub struct FungibleTokenTransferPacketMetadata { + pub channel: String, + pub sequence: u64, +} + +impl TryFrom for FungibleTokenTransferPacketMetadata { + type Error = anyhow::Error; + + fn try_from(value: pb::FungibleTokenTransferPacketMetadata) -> Result { + fn inner( + value: pb::FungibleTokenTransferPacketMetadata, + ) -> anyhow::Result { + Ok(FungibleTokenTransferPacketMetadata { + channel: value.channel, + sequence: value.sequence, + }) + } + inner(value).context(format!( + "parsing {}", + pb::FungibleTokenTransferPacketMetadata::NAME + )) + } +} + +impl From for pb::FungibleTokenTransferPacketMetadata { + fn from(value: FungibleTokenTransferPacketMetadata) -> Self { + Self { + channel: value.channel, + sequence: value.sequence, + } + } +} + +impl DomainType for FungibleTokenTransferPacketMetadata { + type Proto = pb::FungibleTokenTransferPacketMetadata; +} + +#[derive(Clone, Debug)] +pub struct EventOutboundFungibleTokenTransfer { + pub value: Value, + pub sender: Address, + pub receiver: String, + pub meta: FungibleTokenTransferPacketMetadata, +} + +impl TryFrom for EventOutboundFungibleTokenTransfer { + type Error = anyhow::Error; + + fn try_from(value: pb::EventOutboundFungibleTokenTransfer) -> Result { + fn inner( + value: pb::EventOutboundFungibleTokenTransfer, + ) -> anyhow::Result { + Ok(EventOutboundFungibleTokenTransfer { + value: value.value.ok_or(anyhow!("missing `value`"))?.try_into()?, + sender: value + .sender + .ok_or(anyhow!("missing `sender`"))? + .try_into()?, + receiver: value.receiver, + meta: value.meta.ok_or(anyhow!("missing `meta`"))?.try_into()?, + }) + } + inner(value).context(format!( + "parsing {}", + pb::EventOutboundFungibleTokenTransfer::NAME + )) + } +} + +impl From for pb::EventOutboundFungibleTokenTransfer { + fn from(value: EventOutboundFungibleTokenTransfer) -> Self { + Self { + value: Some(value.value.into()), + sender: Some(value.sender.into()), + receiver: value.receiver, + meta: Some(value.meta.into()), + } + } +} + +impl DomainType for EventOutboundFungibleTokenTransfer { + type Proto = pb::EventOutboundFungibleTokenTransfer; +} + #[derive(Clone, Copy, Debug)] +#[repr(i32)] pub enum FungibleTokenRefundReason { - Timeout, - Error, -} - -pub fn outbound_fungible_token_refund( - value: Value, - sender: &Address, - receiver: String, - reason: FungibleTokenRefundReason, - meta: FungibleTokenTransferPacketMetadata, -) -> EventOutboundFungibleTokenRefund { - let reason = match reason { - FungibleTokenRefundReason::Timeout => Reason::Timeout, - FungibleTokenRefundReason::Error => Reason::Error, - }; - EventOutboundFungibleTokenRefund { - value: Some(value.into()), - sender: Some(sender.into()), - receiver, - reason: reason as i32, - meta: Some(meta), + Unspecified = 0, + Timeout = 1, + Error = 2, +} + +#[derive(Clone, Debug)] +pub struct EventOutboundFungibleTokenRefund { + pub value: Value, + pub sender: Address, + pub receiver: String, + pub reason: FungibleTokenRefundReason, + pub meta: FungibleTokenTransferPacketMetadata, +} + +impl TryFrom for EventOutboundFungibleTokenRefund { + type Error = anyhow::Error; + + fn try_from(value: pb::EventOutboundFungibleTokenRefund) -> Result { + fn inner( + value: pb::EventOutboundFungibleTokenRefund, + ) -> anyhow::Result { + use pb::event_outbound_fungible_token_refund::Reason; + let reason = match value.reason() { + Reason::Timeout => FungibleTokenRefundReason::Timeout, + Reason::Error => FungibleTokenRefundReason::Error, + Reason::Unspecified => FungibleTokenRefundReason::Unspecified, + }; + Ok(EventOutboundFungibleTokenRefund { + value: value.value.ok_or(anyhow!("missing `value`"))?.try_into()?, + sender: value + .sender + .ok_or(anyhow!("missing `sender`"))? + .try_into()?, + receiver: value.receiver, + reason, + meta: value.meta.ok_or(anyhow!("missing `meta`"))?.try_into()?, + }) + } + inner(value).context(format!( + "parsing {}", + pb::EventOutboundFungibleTokenRefund::NAME + )) } } -pub fn inbound_fungible_token_transfer( - value: Value, - sender: String, - receiver: &Address, - meta: FungibleTokenTransferPacketMetadata, -) -> EventInboundFungibleTokenTransfer { - EventInboundFungibleTokenTransfer { - value: Some(value.into()), - sender, - receiver: Some(receiver.into()), - meta: Some(meta), +impl From for pb::EventOutboundFungibleTokenRefund { + fn from(value: EventOutboundFungibleTokenRefund) -> Self { + Self { + value: Some(value.value.into()), + sender: Some(value.sender.into()), + receiver: value.receiver, + reason: value.reason as i32, + meta: Some(value.meta.into()), + } } } + +impl DomainType for EventOutboundFungibleTokenRefund { + type Proto = pb::EventOutboundFungibleTokenRefund; +} + +#[derive(Clone, Debug)] +pub struct EventInboundFungibleTokenTransfer { + pub value: Value, + pub sender: String, + pub receiver: Address, + pub meta: FungibleTokenTransferPacketMetadata, +} + +impl TryFrom for EventInboundFungibleTokenTransfer { + type Error = anyhow::Error; + + fn try_from(value: pb::EventInboundFungibleTokenTransfer) -> Result { + fn inner( + value: pb::EventInboundFungibleTokenTransfer, + ) -> anyhow::Result { + Ok(EventInboundFungibleTokenTransfer { + value: value.value.ok_or(anyhow!("missing `value`"))?.try_into()?, + sender: value.sender, + receiver: value + .receiver + .ok_or(anyhow!("missing `receiver`"))? + .try_into()?, + meta: value.meta.ok_or(anyhow!("missing `meta`"))?.try_into()?, + }) + } + inner(value).context(format!( + "parsing {}", + pb::EventInboundFungibleTokenTransfer::NAME + )) + } +} + +impl From for pb::EventInboundFungibleTokenTransfer { + fn from(value: EventInboundFungibleTokenTransfer) -> Self { + Self { + value: Some(value.value.into()), + sender: value.sender, + receiver: Some(value.receiver.into()), + meta: Some(value.meta.into()), + } + } +} + +impl DomainType for EventInboundFungibleTokenTransfer { + type Proto = pb::EventInboundFungibleTokenTransfer; +} diff --git a/crates/core/component/stake/src/component/action_handler/delegate.rs b/crates/core/component/stake/src/component/action_handler/delegate.rs index f27231f1d7..5b01bc9963 100644 --- a/crates/core/component/stake/src/component/action_handler/delegate.rs +++ b/crates/core/component/stake/src/component/action_handler/delegate.rs @@ -3,7 +3,7 @@ use async_trait::async_trait; use cnidarium::StateWrite; use cnidarium_component::ActionHandler; use penumbra_num::Amount; -use penumbra_proto::StateWriteProto; +use penumbra_proto::{DomainType, StateWriteProto}; use penumbra_sct::component::clock::EpochRead; use crate::{ @@ -132,7 +132,7 @@ impl ActionHandler for Delegate { // We queue the delegation so it can be processed at the epoch boundary. tracing::debug!(?self, "queuing delegation for next epoch"); state.push_delegation(self.clone()); - state.record_proto(event::delegate(self)); + state.record_proto(event::EventDelegate::from(self).to_proto()); Ok(()) } } diff --git a/crates/core/component/stake/src/component/action_handler/undelegate.rs b/crates/core/component/stake/src/component/action_handler/undelegate.rs index 6edfa1eaa8..da3947ee01 100644 --- a/crates/core/component/stake/src/component/action_handler/undelegate.rs +++ b/crates/core/component/stake/src/component/action_handler/undelegate.rs @@ -1,7 +1,7 @@ use anyhow::Result; use async_trait::async_trait; use cnidarium::StateWrite; -use penumbra_proto::StateWriteProto; +use penumbra_proto::{DomainType as _, StateWriteProto}; use penumbra_sct::component::clock::EpochRead; use penumbra_shielded_pool::component::AssetRegistry; @@ -85,7 +85,7 @@ impl ActionHandler for Undelegate { tracing::debug!(?self, "queuing undelegation for next epoch"); state.push_undelegation(self.clone()); - state.record_proto(event::undelegate(self)); + state.record_proto(event::EventUndelegate::from(self).to_proto()); Ok(()) } diff --git a/crates/core/component/stake/src/component/stake.rs b/crates/core/component/stake/src/component/stake.rs index c2984312d2..98b8bf8139 100644 --- a/crates/core/component/stake/src/component/stake.rs +++ b/crates/core/component/stake/src/component/stake.rs @@ -1,6 +1,6 @@ pub mod address; -use crate::event::slashing_penalty_applied; +use crate::event::EventSlashingPenaltyApplied; use crate::params::StakeParameters; use crate::rate::BaseRateData; use crate::validator::{self, Validator}; @@ -15,7 +15,7 @@ use cnidarium::{StateRead, StateWrite}; use cnidarium_component::Component; use futures::{StreamExt, TryStreamExt}; use penumbra_num::Amount; -use penumbra_proto::{StateReadProto, StateWriteProto}; +use penumbra_proto::{DomainType, StateReadProto, StateWriteProto}; use penumbra_sct::component::clock::EpochRead; use std::pin::Pin; use std::str::FromStr; @@ -475,11 +475,14 @@ pub(crate) trait RateDataWrite: StateWrite { let new_penalty = current_penalty.compound(slashing_penalty); // Emit an event indicating the validator had a slashing penalty applied. - self.record_proto(slashing_penalty_applied( - *identity_key, - current_epoch_index, - new_penalty, - )); + self.record_proto( + EventSlashingPenaltyApplied { + identity_key: *identity_key, + epoch_index: current_epoch_index, + new_penalty, + } + .to_proto(), + ); self.put( state_key::penalty::for_id_in_epoch(identity_key, current_epoch_index), new_penalty, diff --git a/crates/core/component/stake/src/component/validator_handler/uptime_tracker.rs b/crates/core/component/stake/src/component/validator_handler/uptime_tracker.rs index c22e77abe9..b3b9999165 100644 --- a/crates/core/component/stake/src/component/validator_handler/uptime_tracker.rs +++ b/crates/core/component/stake/src/component/validator_handler/uptime_tracker.rs @@ -17,7 +17,7 @@ use { async_trait::async_trait, cnidarium::StateWrite, futures::StreamExt as _, - penumbra_proto::StateWriteProto, + penumbra_proto::{DomainType, StateWriteProto}, penumbra_sct::component::clock::EpochRead, std::collections::BTreeMap, tap::Tap, @@ -178,7 +178,7 @@ pub trait ValidatorUptimeTracker: StateWrite { if !voted { // If the validator didn't sign, we need to emit a missed block event. - self.record_proto(event::validator_missed_block(identity_key)); + self.record_proto(event::EventValidatorMissedBlock { identity_key }.to_proto()); } uptime.mark_height_as_signed(height, voted)?; diff --git a/crates/core/component/stake/src/component/validator_handler/validator_manager.rs b/crates/core/component/stake/src/component/validator_handler/validator_manager.rs index 2fa17fc577..2064291966 100644 --- a/crates/core/component/stake/src/component/validator_handler/validator_manager.rs +++ b/crates/core/component/stake/src/component/validator_handler/validator_manager.rs @@ -24,7 +24,7 @@ use { cnidarium::StateWrite, penumbra_asset::asset, penumbra_num::Amount, - penumbra_proto::StateWriteProto, + penumbra_proto::{DomainType as _, StateWriteProto}, penumbra_sct::component::{ clock::{EpochManager, EpochRead}, StateReadExt as _, @@ -302,7 +302,13 @@ pub trait ValidatorManager: StateWrite { tracing::info!("successful state transition"); self.put(validator_state_path, new_state); - self.record_proto(event::validator_state_change(*identity_key, new_state)); + self.record_proto( + event::EventValidatorStateChange { + identity_key: *identity_key, + state: new_state, + } + .to_proto(), + ); Ok((old_state, new_state)) } @@ -481,7 +487,12 @@ pub trait ValidatorManager: StateWrite { // Track the validator's definition in an event (the rest of the attributes will be tracked // in events emitted by the calls to set_* methods below). - self.record_proto(event::validator_definition_upload(validator.clone())); + self.record_proto( + event::EventValidatorDefinitionUpload { + validator: validator.clone(), + } + .to_proto(), + ); // We initialize the validator's state, power, and bonding state. self.set_initial_validator_state(&validator_identity, initial_state)?; @@ -599,7 +610,7 @@ pub trait ValidatorManager: StateWrite { ); // Track the validator's definition in an event. - self.record_proto(event::validator_definition_upload(validator)); + self.record_proto(event::EventValidatorDefinitionUpload { validator }.to_proto()); Ok(()) } @@ -664,11 +675,14 @@ pub trait ValidatorManager: StateWrite { if let (Inactive | Jailed | Active, Tombstoned) = (old_state, new_state) { let current_height = self.get_block_height().await?; - self.record_proto(event::tombstone_validator( - current_height, - validator.identity_key.clone(), - evidence, - )); + self.record_proto( + event::EventTombstoneValidator::from_evidence( + current_height, + validator.identity_key.clone(), + evidence, + ) + .to_proto(), + ); } Ok(()) diff --git a/crates/core/component/stake/src/component/validator_handler/validator_store.rs b/crates/core/component/stake/src/component/validator_handler/validator_store.rs index 01e6e00bb4..b3e7508ce1 100644 --- a/crates/core/component/stake/src/component/validator_handler/validator_store.rs +++ b/crates/core/component/stake/src/component/validator_handler/validator_store.rs @@ -253,7 +253,13 @@ pub(crate) trait ValidatorDataWrite: StateWrite { state_key::validators::pool::bonding_state::by_id(identity_key), state.clone(), ); - self.record_proto(event::validator_bonding_state_change(*identity_key, state)); + self.record_proto( + event::EventValidatorBondingStateChange { + identity_key: *identity_key, + bonding_state: state, + } + .to_proto(), + ); } #[instrument(skip(self))] @@ -270,10 +276,13 @@ pub(crate) trait ValidatorDataWrite: StateWrite { state_key::validators::power::by_id(identity_key), voting_power, ); - self.record_proto(event::validator_voting_power_change( - *identity_key, - voting_power, - )); + self.record_proto( + event::EventValidatorVotingPowerChange { + identity_key: *identity_key, + voting_power, + } + .to_proto(), + ); Ok(()) } @@ -290,7 +299,13 @@ pub(crate) trait ValidatorDataWrite: StateWrite { } self.put(state_key::validators::state::by_id(id), initial_state); - self.record_proto(event::validator_state_change(*id, initial_state)); + self.record_proto( + event::EventValidatorStateChange { + identity_key: *id, + state: initial_state, + } + .to_proto(), + ); Ok(()) } @@ -301,7 +316,13 @@ pub(crate) trait ValidatorDataWrite: StateWrite { state_key::validators::rate::current_by_id(identity_key), rate_data.clone(), ); - self.record_proto(event::validator_rate_data_change(*identity_key, rate_data)); + self.record_proto( + event::EventRateDataChange { + identity_key: *identity_key, + rate_data, + } + .to_proto(), + ); } #[instrument(skip(self))] diff --git a/crates/core/component/stake/src/event.rs b/crates/core/component/stake/src/event.rs index 86de0322b2..cb0d184af4 100644 --- a/crates/core/component/stake/src/event.rs +++ b/crates/core/component/stake/src/event.rs @@ -1,100 +1,448 @@ use crate::{ - rate, + rate::RateData, validator::{BondingState, State, Validator}, Delegate, IdentityKey, Penalty, Undelegate, }; +use anyhow::{anyhow, Context as _}; use penumbra_num::Amount; -use penumbra_proto::core::component::stake::v1 as pb; +use penumbra_proto::{core::component::stake::v1 as pb, DomainType, Name as _}; use tendermint::abci::types::Misbehavior; -pub fn validator_state_change( - identity_key: IdentityKey, - state: State, -) -> pb::EventValidatorStateChange { - pb::EventValidatorStateChange { - identity_key: Some(identity_key.into()), - state: Some(state.into()), +#[derive(Clone, Debug)] +pub struct EventValidatorStateChange { + pub identity_key: IdentityKey, + pub state: State, +} + +impl TryFrom for EventValidatorStateChange { + type Error = anyhow::Error; + + fn try_from(value: pb::EventValidatorStateChange) -> Result { + fn inner( + value: pb::EventValidatorStateChange, + ) -> anyhow::Result { + Ok(EventValidatorStateChange { + identity_key: value + .identity_key + .ok_or(anyhow!("missing `identity_key`"))? + .try_into()?, + state: value.state.ok_or(anyhow!("missing `state`"))?.try_into()?, + }) + } + inner(value).context(format!("parsing {}", pb::EventValidatorStateChange::NAME)) } } -pub fn validator_voting_power_change( - identity_key: IdentityKey, - voting_power: Amount, -) -> pb::EventValidatorVotingPowerChange { - pb::EventValidatorVotingPowerChange { - identity_key: Some(identity_key.into()), - voting_power: Some(voting_power.into()), +impl From for pb::EventValidatorStateChange { + fn from(value: EventValidatorStateChange) -> Self { + Self { + identity_key: Some(value.identity_key.into()), + state: Some(value.state.into()), + } } } -pub fn validator_bonding_state_change( - identity_key: IdentityKey, - bonding_state: BondingState, -) -> pb::EventValidatorBondingStateChange { - pb::EventValidatorBondingStateChange { - identity_key: Some(identity_key.into()), - bonding_state: Some(bonding_state.into()), +impl DomainType for EventValidatorStateChange { + type Proto = pb::EventValidatorStateChange; +} + +#[derive(Clone, Debug)] +pub struct EventValidatorVotingPowerChange { + pub identity_key: IdentityKey, + pub voting_power: Amount, +} + +impl TryFrom for EventValidatorVotingPowerChange { + type Error = anyhow::Error; + + fn try_from(value: pb::EventValidatorVotingPowerChange) -> Result { + fn inner( + value: pb::EventValidatorVotingPowerChange, + ) -> anyhow::Result { + Ok(EventValidatorVotingPowerChange { + identity_key: value + .identity_key + .ok_or(anyhow!("missing `identity_key`"))? + .try_into()?, + voting_power: value + .voting_power + .ok_or(anyhow!("missing `voting_power`"))? + .try_into()?, + }) + } + inner(value).context(format!( + "parsing {}", + pb::EventValidatorVotingPowerChange::NAME + )) } } -pub fn validator_rate_data_change( - identity_key: IdentityKey, - rate_data: rate::RateData, -) -> pb::EventRateDataChange { - pb::EventRateDataChange { - identity_key: Some(identity_key.into()), - rate_data: Some(rate_data.into()), +impl From for pb::EventValidatorVotingPowerChange { + fn from(value: EventValidatorVotingPowerChange) -> Self { + Self { + identity_key: Some(value.identity_key.into()), + voting_power: Some(value.voting_power.into()), + } } } -pub fn validator_definition_upload(validator: Validator) -> pb::EventValidatorDefinitionUpload { - pb::EventValidatorDefinitionUpload { - validator: Some(validator.into()), +impl DomainType for EventValidatorVotingPowerChange { + type Proto = pb::EventValidatorVotingPowerChange; +} + +#[derive(Clone, Debug)] +pub struct EventValidatorBondingStateChange { + pub identity_key: IdentityKey, + pub bonding_state: BondingState, +} + +impl TryFrom for EventValidatorBondingStateChange { + type Error = anyhow::Error; + + fn try_from(value: pb::EventValidatorBondingStateChange) -> Result { + fn inner( + value: pb::EventValidatorBondingStateChange, + ) -> anyhow::Result { + Ok(EventValidatorBondingStateChange { + identity_key: value + .identity_key + .ok_or(anyhow!("missing `identity_key`"))? + .try_into()?, + bonding_state: value + .bonding_state + .ok_or(anyhow!("missing `bonding_state`"))? + .try_into()?, + }) + } + inner(value).context(format!( + "parsing {}", + pb::EventValidatorBondingStateChange::NAME + )) } } -pub fn validator_missed_block(identity_key: IdentityKey) -> pb::EventValidatorMissedBlock { - pb::EventValidatorMissedBlock { - identity_key: Some(identity_key.into()), +impl From for pb::EventValidatorBondingStateChange { + fn from(value: EventValidatorBondingStateChange) -> Self { + Self { + identity_key: Some(value.identity_key.into()), + bonding_state: Some(value.bonding_state.into()), + } } } -pub fn delegate(delegate: &Delegate) -> pb::EventDelegate { - pb::EventDelegate { - identity_key: Some(delegate.validator_identity.into()), - amount: Some(delegate.unbonded_amount.into()), +impl DomainType for EventValidatorBondingStateChange { + type Proto = pb::EventValidatorBondingStateChange; +} + +#[derive(Clone, Debug)] +pub struct EventRateDataChange { + pub identity_key: IdentityKey, + pub rate_data: RateData, +} + +impl TryFrom for EventRateDataChange { + type Error = anyhow::Error; + + fn try_from(value: pb::EventRateDataChange) -> Result { + fn inner(value: pb::EventRateDataChange) -> anyhow::Result { + Ok(EventRateDataChange { + identity_key: value + .identity_key + .ok_or(anyhow!("missing `identity_key`"))? + .try_into()?, + rate_data: value + .rate_data + .ok_or(anyhow!("missing `rate_data`"))? + .try_into()?, + }) + } + inner(value).context(format!("parsing {}", pb::EventRateDataChange::NAME)) } } -pub fn undelegate(undelegate: &Undelegate) -> pb::EventUndelegate { - pb::EventUndelegate { - identity_key: Some(undelegate.validator_identity.into()), - amount: Some(undelegate.unbonded_amount.into()), +impl From for pb::EventRateDataChange { + fn from(value: EventRateDataChange) -> Self { + Self { + identity_key: Some(value.identity_key.into()), + rate_data: Some(value.rate_data.into()), + } } } -pub fn tombstone_validator( - current_height: u64, - identity_key: IdentityKey, - evidence: &Misbehavior, -) -> pb::EventTombstoneValidator { - pb::EventTombstoneValidator { - evidence_height: evidence.height.value(), - current_height, - identity_key: Some(identity_key.into()), - address: evidence.validator.address.to_vec(), - voting_power: evidence.validator.power.value(), +impl DomainType for EventRateDataChange { + type Proto = pb::EventRateDataChange; +} + +#[derive(Clone, Debug)] +pub struct EventValidatorDefinitionUpload { + pub validator: Validator, +} + +impl TryFrom for EventValidatorDefinitionUpload { + type Error = anyhow::Error; + + fn try_from(value: pb::EventValidatorDefinitionUpload) -> Result { + fn inner( + value: pb::EventValidatorDefinitionUpload, + ) -> anyhow::Result { + Ok(EventValidatorDefinitionUpload { + validator: value + .validator + .ok_or(anyhow!("missing `validator`"))? + .try_into()?, + }) + } + inner(value).context(format!( + "parsing {}", + pb::EventValidatorDefinitionUpload::NAME + )) } } -pub fn slashing_penalty_applied( - identity_key: IdentityKey, - epoch_index: u64, - new_penalty: Penalty, -) -> pb::EventSlashingPenaltyApplied { - pb::EventSlashingPenaltyApplied { - identity_key: Some(identity_key.into()), - epoch_index, - new_penalty: Some(new_penalty.into()), +impl From for pb::EventValidatorDefinitionUpload { + fn from(value: EventValidatorDefinitionUpload) -> Self { + Self { + validator: Some(value.validator.into()), + } + } +} + +impl DomainType for EventValidatorDefinitionUpload { + type Proto = pb::EventValidatorDefinitionUpload; +} + +#[derive(Clone, Debug)] +pub struct EventValidatorMissedBlock { + pub identity_key: IdentityKey, +} + +impl TryFrom for EventValidatorMissedBlock { + type Error = anyhow::Error; + + fn try_from(value: pb::EventValidatorMissedBlock) -> Result { + fn inner( + value: pb::EventValidatorMissedBlock, + ) -> anyhow::Result { + Ok(EventValidatorMissedBlock { + identity_key: value + .identity_key + .ok_or(anyhow!("missing `identity_key`"))? + .try_into()?, + }) + } + inner(value).context(format!("parsing {}", pb::EventValidatorMissedBlock::NAME)) + } +} + +impl From for pb::EventValidatorMissedBlock { + fn from(value: EventValidatorMissedBlock) -> Self { + Self { + identity_key: Some(value.identity_key.into()), + } + } +} + +impl DomainType for EventValidatorMissedBlock { + type Proto = pb::EventValidatorMissedBlock; +} + +#[derive(Clone, Debug)] +pub struct EventDelegate { + pub identity_key: IdentityKey, + pub amount: Amount, +} + +impl From<&Delegate> for EventDelegate { + fn from(value: &Delegate) -> Self { + Self { + identity_key: value.validator_identity, + amount: value.unbonded_amount, + } + } +} + +impl TryFrom for EventDelegate { + type Error = anyhow::Error; + + fn try_from(value: pb::EventDelegate) -> Result { + fn inner(value: pb::EventDelegate) -> anyhow::Result { + Ok(EventDelegate { + identity_key: value + .identity_key + .ok_or(anyhow!("missing `identity_key`"))? + .try_into()?, + amount: value + .amount + .ok_or(anyhow!("missing `amount`"))? + .try_into()?, + }) + } + inner(value).context(format!("parsing {}", pb::EventDelegate::NAME)) } } + +impl From for pb::EventDelegate { + fn from(value: EventDelegate) -> Self { + Self { + identity_key: Some(value.identity_key.into()), + amount: Some(value.amount.into()), + } + } +} + +impl DomainType for EventDelegate { + type Proto = pb::EventDelegate; +} + +#[derive(Clone, Debug)] +pub struct EventUndelegate { + pub identity_key: IdentityKey, + pub amount: Amount, +} + +impl From<&Undelegate> for EventUndelegate { + fn from(value: &Undelegate) -> Self { + Self { + identity_key: value.validator_identity, + amount: value.unbonded_amount, + } + } +} + +impl TryFrom for EventUndelegate { + type Error = anyhow::Error; + + fn try_from(value: pb::EventUndelegate) -> Result { + fn inner(value: pb::EventUndelegate) -> anyhow::Result { + Ok(EventUndelegate { + identity_key: value + .identity_key + .ok_or(anyhow!("missing `identity_key`"))? + .try_into()?, + amount: value + .amount + .ok_or(anyhow!("missing `amount`"))? + .try_into()?, + }) + } + inner(value).context(format!("parsing {}", pb::EventUndelegate::NAME)) + } +} + +impl From for pb::EventUndelegate { + fn from(value: EventUndelegate) -> Self { + Self { + identity_key: Some(value.identity_key.into()), + amount: Some(value.amount.into()), + } + } +} + +impl DomainType for EventUndelegate { + type Proto = pb::EventUndelegate; +} + +#[derive(Clone, Debug)] +pub struct EventTombstoneValidator { + pub evidence_height: u64, + pub current_height: u64, + pub identity_key: IdentityKey, + pub address: Vec, + pub voting_power: u64, +} + +impl EventTombstoneValidator { + pub fn from_evidence( + current_height: u64, + identity_key: IdentityKey, + evidence: &Misbehavior, + ) -> Self { + Self { + evidence_height: evidence.height.value(), + current_height, + identity_key, + address: evidence.validator.address.to_vec(), + voting_power: evidence.validator.power.value(), + } + } +} + +impl TryFrom for EventTombstoneValidator { + type Error = anyhow::Error; + + fn try_from(value: pb::EventTombstoneValidator) -> Result { + fn inner(value: pb::EventTombstoneValidator) -> anyhow::Result { + Ok(EventTombstoneValidator { + evidence_height: value.evidence_height, + current_height: value.current_height, + identity_key: value + .identity_key + .ok_or(anyhow!("missing `identity_key`"))? + .try_into()?, + address: value.address, + voting_power: value.voting_power, + }) + } + inner(value).context(format!("parsing {}", pb::EventTombstoneValidator::NAME)) + } +} + +impl From for pb::EventTombstoneValidator { + fn from(value: EventTombstoneValidator) -> Self { + Self { + evidence_height: value.evidence_height, + current_height: value.current_height, + identity_key: Some(value.identity_key.into()), + address: value.address, + voting_power: value.voting_power, + } + } +} + +impl DomainType for EventTombstoneValidator { + type Proto = pb::EventTombstoneValidator; +} + +#[derive(Clone, Debug)] +pub struct EventSlashingPenaltyApplied { + pub identity_key: IdentityKey, + pub epoch_index: u64, + pub new_penalty: Penalty, +} + +impl TryFrom for EventSlashingPenaltyApplied { + type Error = anyhow::Error; + + fn try_from(value: pb::EventSlashingPenaltyApplied) -> Result { + fn inner( + value: pb::EventSlashingPenaltyApplied, + ) -> anyhow::Result { + Ok(EventSlashingPenaltyApplied { + identity_key: value + .identity_key + .ok_or(anyhow!("missing `identity_key`"))? + .try_into()?, + epoch_index: value.epoch_index, + new_penalty: value + .new_penalty + .ok_or(anyhow!("missing `new_penalty`"))? + .try_into()?, + }) + } + inner(value).context(format!("parsing {}", pb::EventSlashingPenaltyApplied::NAME)) + } +} + +impl From for pb::EventSlashingPenaltyApplied { + fn from(value: EventSlashingPenaltyApplied) -> Self { + Self { + identity_key: Some(value.identity_key.into()), + epoch_index: value.epoch_index, + new_penalty: Some(value.new_penalty.into()), + } + } +} + +impl DomainType for EventSlashingPenaltyApplied { + type Proto = pb::EventSlashingPenaltyApplied; +} diff --git a/crates/core/component/stake/src/lib.rs b/crates/core/component/stake/src/lib.rs index eb574260df..ccaab1c2be 100644 --- a/crates/core/component/stake/src/lib.rs +++ b/crates/core/component/stake/src/lib.rs @@ -6,7 +6,7 @@ mod changes; mod current_consensus_keys; mod delegation_token; -mod event; +pub mod event; mod governance_key; mod identity_key; mod penalty;