diff --git a/Cargo.lock b/Cargo.lock index 4681e50097..e37618957f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5824,6 +5824,7 @@ dependencies = [ "penumbra-auction", "penumbra-dex", "penumbra-fee", + "penumbra-funding", "penumbra-governance", "penumbra-keys", "penumbra-num", diff --git a/crates/bin/pindexer/Cargo.toml b/crates/bin/pindexer/Cargo.toml index 610d43e780..24ee0b3531 100644 --- a/crates/bin/pindexer/Cargo.toml +++ b/crates/bin/pindexer/Cargo.toml @@ -22,6 +22,7 @@ penumbra-app = {workspace = true, default-features = false} penumbra-auction = {workspace = true, default-features = false} penumbra-dex = {workspace = true, default-features = false} penumbra-fee = {workspace = true, default-features = false} +penumbra-funding = {workspace = true, default-features = false} penumbra-keys = {workspace = true, default-features = false} penumbra-governance = {workspace = true, default-features = false} penumbra-num = {workspace = true, default-features = false} diff --git a/crates/bin/pindexer/src/indexer_ext.rs b/crates/bin/pindexer/src/indexer_ext.rs index 950000fdd3..90600e43ab 100644 --- a/crates/bin/pindexer/src/indexer_ext.rs +++ b/crates/bin/pindexer/src/indexer_ext.rs @@ -1,3 +1,5 @@ +use std::str::FromStr; + pub trait IndexerExt: Sized { fn with_default_penumbra_app_views(self) -> Self; } @@ -13,5 +15,12 @@ impl IndexerExt for cometindex::Indexer { .with_index(crate::dex_ex::Component::new()) .with_index(crate::supply::Component::new()) .with_index(crate::ibc::Component::new()) + .with_index(crate::insights::Component::new( + penumbra_asset::asset::Id::from_str( + // USDC + "passet1w6e7fvgxsy6ccy3m8q0eqcuyw6mh3yzqu3uq9h58nu8m8mku359spvulf6", + ) + .ok(), + )) } } diff --git a/crates/bin/pindexer/src/insights/mod.rs b/crates/bin/pindexer/src/insights/mod.rs new file mode 100644 index 0000000000..221c64eb25 --- /dev/null +++ b/crates/bin/pindexer/src/insights/mod.rs @@ -0,0 +1,506 @@ +use std::{collections::BTreeMap, iter}; + +use cometindex::{async_trait, AppView, ContextualizedEvent, PgTransaction}; +use penumbra_app::genesis::Content; +use penumbra_asset::{asset, STAKING_TOKEN_ASSET_ID}; +use penumbra_dex::{ + event::{EventArbExecution, EventCandlestickData}, + DirectedTradingPair, +}; +use penumbra_fee::event::EventBlockFees; +use penumbra_funding::event::EventFundingStreamReward; +use penumbra_num::Amount; +use penumbra_proto::{event::EventDomainType, DomainType, Name}; +use penumbra_shielded_pool::event::{ + EventInboundFungibleTokenTransfer, EventOutboundFungibleTokenRefund, + EventOutboundFungibleTokenTransfer, +}; +use penumbra_stake::{ + event::{EventDelegate, EventRateDataChange, EventUndelegate}, + validator::Validator, + IdentityKey, +}; +use sqlx::PgPool; + +use crate::parsing::parse_content; + +#[derive(Debug, Clone, Copy)] +struct ValidatorSupply { + um: u64, + rate_bps2: u64, +} + +async fn modify_validator_supply( + dbtx: &mut PgTransaction<'_>, + height: u64, + ik: IdentityKey, + f: Box anyhow::Result + Send + 'static>, +) -> anyhow::Result { + let ik_text = ik.to_string(); + let supply = { + let row: Option<(i64, i64)> = sqlx::query_as(" + SELECT um, rate_bps2 FROM _insights_validators WHERE validator_id = $1 ORDER BY height DESC LIMIT 1 + ").bind(&ik_text).fetch_optional(dbtx.as_mut()).await?; + let row = row.unwrap_or((0i64, 1_0000_0000i64)); + ValidatorSupply { + um: u64::try_from(row.0)?, + rate_bps2: u64::try_from(row.1)?, + } + }; + let new_supply = f(supply)?; + sqlx::query( + r#" + INSERT INTO _insights_validators + VALUES ($1, $2, $3, $4) + ON CONFLICT (validator_id, height) DO UPDATE SET + um = excluded.um, + rate_bps2 = excluded.rate_bps2 + "#, + ) + .bind(&ik_text) + .bind(i64::try_from(height)?) + .bind(i64::try_from(new_supply.um)?) + .bind(i64::try_from(new_supply.rate_bps2)?) + .execute(dbtx.as_mut()) + .await?; + Ok(i64::try_from(new_supply.um)? - i64::try_from(supply.um)?) +} + +#[derive(Default, Debug, Clone, Copy)] +struct Supply { + total: u64, + staked: u64, + price: Option, +} + +async fn modify_supply( + dbtx: &mut PgTransaction<'_>, + height: u64, + price_numeraire: Option, + f: Box anyhow::Result + Send + 'static>, +) -> anyhow::Result<()> { + let supply: Supply = { + let row: Option<(i64, i64, Option)> = sqlx::query_as( + "SELECT total, staked, price FROM insights_supply ORDER BY HEIGHT DESC LIMIT 1", + ) + .fetch_optional(dbtx.as_mut()) + .await?; + row.map(|(total, staked, price)| { + anyhow::Result::<_>::Ok(Supply { + total: total.try_into()?, + staked: staked.try_into()?, + price, + }) + }) + .transpose()? + .unwrap_or_default() + }; + let supply = f(supply)?; + sqlx::query( + r#" + INSERT INTO + insights_supply(height, total, staked, price, price_numeraire_asset_id) + VALUES ($1, $2, $3, $5, $4) + ON CONFLICT (height) DO UPDATE SET + total = excluded.total, + staked = excluded.staked, + price = excluded.price, + price_numeraire_asset_id = excluded.price_numeraire_asset_id + "#, + ) + .bind(i64::try_from(height)?) + .bind(i64::try_from(supply.total)?) + .bind(i64::try_from(supply.staked)?) + .bind(price_numeraire.map(|x| x.to_bytes())) + .bind(supply.price) + .execute(dbtx.as_mut()) + .await?; + Ok(()) +} + +#[derive(Debug, Clone, Copy, PartialEq)] +enum DepositorExisted { + Yes, + No, +} + +async fn register_depositor( + dbtx: &mut PgTransaction<'_>, + asset_id: asset::Id, + address: &str, +) -> anyhow::Result { + let exists: bool = sqlx::query_scalar( + "SELECT EXISTS (SELECT 1 FROM _insights_shielded_pool_depositors WHERE asset_id = $1 AND address = $2)", + ) + .bind(asset_id.to_bytes()) + .bind(address) + .fetch_one(dbtx.as_mut()) + .await?; + if exists { + return Ok(DepositorExisted::Yes); + } + sqlx::query("INSERT INTO _insights_shielded_pool_depositors VALUES ($1, $2)") + .bind(asset_id.to_bytes()) + .bind(address) + .execute(dbtx.as_mut()) + .await?; + Ok(DepositorExisted::No) +} + +async fn asset_flow( + dbtx: &mut PgTransaction<'_>, + asset_id: asset::Id, + height: u64, + flow: i128, + depositor_existed: DepositorExisted, +) -> anyhow::Result<()> { + let asset_pool: Option<(String, String, i32)> = sqlx::query_as("SELECT total_value, current_value, unique_depositors FROM insights_shielded_pool WHERE asset_id = $1 ORDER BY height DESC LIMIT 1").bind(asset_id.to_bytes()).fetch_optional(dbtx.as_mut()).await?; + let mut asset_pool = asset_pool + .map(|(t, c, u)| { + anyhow::Result::<(i128, i128, i32)>::Ok(( + i128::from_str_radix(&t, 10)?, + i128::from_str_radix(&c, 10)?, + u, + )) + }) + .transpose()? + .unwrap_or((0i128, 0i128, 0i32)); + asset_pool.0 += flow.abs(); + asset_pool.1 += flow; + asset_pool.2 += match depositor_existed { + DepositorExisted::Yes => 0, + DepositorExisted::No => 1, + }; + sqlx::query( + r#" + INSERT INTO insights_shielded_pool + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (asset_id, height) DO UPDATE SET + total_value = excluded.total_value, + current_value = excluded.current_value, + unique_depositors = excluded.unique_depositors + "#, + ) + .bind(asset_id.to_bytes()) + .bind(i64::try_from(height)?) + .bind(asset_pool.0.to_string()) + .bind(asset_pool.1.to_string()) + .bind(asset_pool.2) + .execute(dbtx.as_mut()) + .await?; + Ok(()) +} + +#[derive(Debug)] +pub struct Component { + price_numeraire: Option, +} + +impl Component { + /// This component depends on a reference asset for the total supply price. + pub fn new(price_numeraire: Option) -> Self { + Self { price_numeraire } + } +} + +/// Add the initial native token supply. +async fn add_genesis_native_token_allocation_supply<'a>( + dbtx: &mut PgTransaction<'a>, + content: &Content, +) -> anyhow::Result<()> { + fn content_mints(content: &Content) -> BTreeMap { + let community_pool_mint = iter::once(( + *STAKING_TOKEN_ASSET_ID, + content.community_pool_content.initial_balance.amount, + )); + let allocation_mints = content + .shielded_pool_content + .allocations + .iter() + .map(|allocation| { + let value = allocation.value(); + (value.asset_id, value.amount) + }); + + let mut out = BTreeMap::new(); + for (id, amount) in community_pool_mint.chain(allocation_mints) { + out.entry(id).and_modify(|x| *x += amount).or_insert(amount); + } + out + } + + let mints = content_mints(content); + + let unstaked = u64::try_from( + mints + .get(&*STAKING_TOKEN_ASSET_ID) + .copied() + .unwrap_or_default() + .value(), + )?; + + let mut staked = 0u64; + // at genesis, assume a 1:1 ratio between delegation amount and native token amount. + for val in &content.stake_content.validators { + let val = Validator::try_from(val.clone())?; + let delegation_amount: u64 = mints + .get(&val.token().id()) + .cloned() + .unwrap_or_default() + .value() + .try_into()?; + staked += delegation_amount; + modify_validator_supply( + dbtx, + 0, + val.identity_key, + Box::new(move |_| { + Ok(ValidatorSupply { + um: delegation_amount, + rate_bps2: 1_0000_0000, + }) + }), + ) + .await?; + } + + modify_supply( + dbtx, + 0, + None, + Box::new(move |_| { + Ok(Supply { + total: unstaked + staked, + staked, + price: None, + }) + }), + ) + .await?; + + Ok(()) +} +#[async_trait] +impl AppView for Component { + async fn init_chain( + &self, + dbtx: &mut PgTransaction, + app_state: &serde_json::Value, + ) -> Result<(), anyhow::Error> { + for statement in include_str!("schema.sql").split(";") { + sqlx::query(statement).execute(dbtx.as_mut()).await?; + } + + // decode the initial supply from the genesis + // initial app state is not recomputed from events, because events are not emitted in init_chain. + // instead, the indexer directly parses the genesis. + add_genesis_native_token_allocation_supply(dbtx, &parse_content(app_state.clone())?) + .await?; + Ok(()) + } + + fn is_relevant(&self, type_str: &str) -> bool { + [ + ::Proto::full_name(), + ::Proto::full_name(), + ::Proto::full_name(), + ::Proto::full_name(), + ::Proto::full_name(), + ::Proto::full_name(), + ::Proto::full_name(), + ::Proto::full_name(), + ::Proto::full_name(), + ::Proto::full_name(), + ] + .into_iter() + .any(|x| type_str == x) + } + + async fn index_event( + &self, + dbtx: &mut PgTransaction, + event: &ContextualizedEvent, + _src_db: &PgPool, + ) -> Result<(), anyhow::Error> { + let height = event.block_height; + if let Ok(e) = EventUndelegate::try_from_event(&event.event) { + let delta = modify_validator_supply( + dbtx, + height, + e.identity_key, + Box::new(move |supply| { + Ok(ValidatorSupply { + um: supply.um + u64::try_from(e.amount.value()).expect(""), + ..supply + }) + }), + ) + .await?; + modify_supply( + dbtx, + height, + self.price_numeraire, + Box::new(move |supply| { + // The amount staked has changed, but no inflation has happened. + Ok(Supply { + staked: u64::try_from(i64::try_from(supply.staked)? + delta)?, + ..supply + }) + }), + ) + .await?; + } else if let Ok(e) = EventDelegate::try_from_event(&event.event) { + let delta = modify_validator_supply( + dbtx, + height, + e.identity_key, + Box::new(move |supply| { + Ok(ValidatorSupply { + um: supply.um + u64::try_from(e.amount.value()).expect(""), + ..supply + }) + }), + ) + .await?; + modify_supply( + dbtx, + height, + self.price_numeraire, + Box::new(move |supply| { + Ok(Supply { + staked: u64::try_from(i64::try_from(supply.staked)? + delta)?, + ..supply + }) + }), + ) + .await?; + } else if let Ok(e) = EventRateDataChange::try_from_event(&event.event) { + let delta = modify_validator_supply( + dbtx, + height, + e.identity_key, + Box::new(move |supply| { + // del_um <- um / old_exchange_rate + // um <- del_um * new_exchange_rate + // so + // um <- um * (new_exchange_rate / old_exchange_rate) + // and the bps cancel out. + let um = (u128::from(supply.um) * e.rate_data.validator_exchange_rate.value()) + .checked_div(supply.rate_bps2.into()) + .unwrap_or(0u128) + .try_into()?; + Ok(ValidatorSupply { + um, + rate_bps2: u64::try_from(e.rate_data.validator_exchange_rate.value())?, + }) + }), + ) + .await?; + modify_supply( + dbtx, + height, + self.price_numeraire, + Box::new(move |supply| { + // Value has been created or destroyed! + Ok(Supply { + total: u64::try_from(i64::try_from(supply.total)? + delta)?, + staked: u64::try_from(i64::try_from(supply.staked)? + delta)?, + ..supply + }) + }), + ) + .await?; + } else if let Ok(e) = EventBlockFees::try_from_event(&event.event) { + let value = e.swapped_fee_total.value(); + if value.asset_id == *STAKING_TOKEN_ASSET_ID { + let amount = u64::try_from(value.amount.value())?; + // We consider the tip to be destroyed too, matching the current logic + // DRAGON: if this changes, this code should use the base fee only. + modify_supply( + dbtx, + height, + self.price_numeraire, + Box::new(move |supply| { + Ok(Supply { + total: supply.total + amount, + ..supply + }) + }), + ) + .await?; + } + } else if let Ok(e) = EventArbExecution::try_from_event(&event.event) { + let input = e.swap_execution.input; + let output = e.swap_execution.output; + if input.asset_id == *STAKING_TOKEN_ASSET_ID + && output.asset_id == *STAKING_TOKEN_ASSET_ID + { + let profit = u64::try_from((output.amount - input.amount).value())?; + modify_supply( + dbtx, + height, + self.price_numeraire, + Box::new(move |supply| { + Ok(Supply { + total: supply.total + profit, + ..supply + }) + }), + ) + .await?; + } + } else if let Ok(e) = EventFundingStreamReward::try_from_event(&event.event) { + let amount = u64::try_from(e.reward_amount.value())?; + modify_supply( + dbtx, + height, + self.price_numeraire, + Box::new(move |supply| { + Ok(Supply { + total: supply.total + amount, + ..supply + }) + }), + ) + .await?; + } else if let Ok(e) = EventInboundFungibleTokenTransfer::try_from_event(&event.event) { + if e.value.asset_id != *STAKING_TOKEN_ASSET_ID { + let existed = register_depositor(dbtx, e.value.asset_id, &e.sender).await?; + let flow = i128::try_from(e.value.amount.value())?; + asset_flow(dbtx, e.value.asset_id, height, flow, existed).await?; + } + } else if let Ok(e) = EventOutboundFungibleTokenTransfer::try_from_event(&event.event) { + if e.value.asset_id != *STAKING_TOKEN_ASSET_ID { + let flow = i128::try_from(e.value.amount.value())?; + // For outbound transfers, never increment unique count + asset_flow(dbtx, e.value.asset_id, height, -flow, DepositorExisted::No).await?; + } + } else if let Ok(e) = EventOutboundFungibleTokenRefund::try_from_event(&event.event) { + if e.value.asset_id != *STAKING_TOKEN_ASSET_ID { + let flow = i128::try_from(e.value.amount.value())?; + // For outbound transfers, never increment unique count. + asset_flow(dbtx, e.value.asset_id, height, flow, DepositorExisted::No).await?; + } + } else if let Ok(e) = EventCandlestickData::try_from_event(&event.event) { + if let Some(pn) = self.price_numeraire { + if e.pair == DirectedTradingPair::new(*STAKING_TOKEN_ASSET_ID, pn) { + let price = e.stick.close; + modify_supply( + dbtx, + height, + self.price_numeraire, + Box::new(move |supply| { + Ok(Supply { + price: Some(price), + ..supply + }) + }), + ) + .await?; + } + } + } + tracing::debug!(?event, "unrecognized event"); + Ok(()) + } +} diff --git a/crates/bin/pindexer/src/insights/schema.sql b/crates/bin/pindexer/src/insights/schema.sql new file mode 100644 index 0000000000..a23d67c055 --- /dev/null +++ b/crates/bin/pindexer/src/insights/schema.sql @@ -0,0 +1,54 @@ +-- A table containing updates to the total supply, and market cap. +CREATE TABLE IF NOT EXISTS insights_supply ( + -- The height where the supply was updated. + height BIGINT PRIMARY KEY, + -- The total supply of the staking token at this height. + total BIGINT NOT NULL, + staked BIGINT NOT NULL, + -- Price, if it can be found for whatever numeraire we choose at runtime. + price FLOAT8, + -- The numeraire for the price we've chosen. + price_numeraire_asset_id BYTEA, + -- The market cap, i.e. price * total amount. + market_cap FLOAT8 GENERATED ALWAYS AS (total::FLOAT8 * price) STORED +); + +-- A working table to save the state around validators we need. +-- +-- This is necessary because rate data changes increase the total supply, +-- but don't directly tell us how much the total supply increased. +CREATE TABLE IF NOT EXISTS _insights_validators ( + -- The validator this row concerns. + validator_id TEXT NOT NULL, + -- The height for the supply update. + height BIGINT NOT NULL, + -- The total amount staked with them, in terms of the native token. + um BIGINT NOT NULL, + -- How much native um we get per unit of the delegation token. + rate_bps2 BIGINT NOT NULL, + PRIMARY KEY (validator_id, height) +); + +-- Our internal representation of the shielded pool table. +CREATE TABLE IF NOT EXISTS insights_shielded_pool ( + -- The asset this concerns. + asset_id BYTEA NOT NULL, + height BIGINT NOT NULL, + -- The total value shielded, in terms of that asset. + total_value TEXT NOT NULL, + -- The current value shielded, in terms of that asset. + current_value TEXT NOT NULL, + -- The number of unique depositors. + unique_depositors INT NOT NULL, + PRIMARY KEY (asset_id, height) +); + +-- Unique depositors into the shielded pool +CREATE TABLE IF NOT EXISTS _insights_shielded_pool_depositors ( + asset_id BYTEA NOT NULL, + address TEXT NOT NULL, + PRIMARY KEY (asset_id, address) +); + +CREATE OR REPLACE VIEW insights_shielded_pool_latest AS + SELECT DISTINCT ON (asset_id) * FROM insights_shielded_pool ORDER BY asset_id, height DESC; diff --git a/crates/bin/pindexer/src/lib.rs b/crates/bin/pindexer/src/lib.rs index e2c2d63476..47429f1c10 100644 --- a/crates/bin/pindexer/src/lib.rs +++ b/crates/bin/pindexer/src/lib.rs @@ -6,6 +6,7 @@ pub mod block; pub mod dex; pub mod dex_ex; pub mod ibc; +pub mod insights; mod parsing; pub mod shielded_pool; mod sql; diff --git a/crates/core/component/fee/src/component.rs b/crates/core/component/fee/src/component.rs index e3c80d9f73..25ffa10451 100644 --- a/crates/core/component/fee/src/component.rs +++ b/crates/core/component/fee/src/component.rs @@ -4,12 +4,12 @@ mod view; use std::sync::Arc; -use crate::{genesis, Fee}; +use crate::{event::EventBlockFees, genesis, Fee}; use async_trait::async_trait; use cnidarium::StateWrite; use cnidarium_component::Component; -use penumbra_proto::core::component::fee::v1 as pb; use penumbra_proto::state::StateWriteProto as _; +use penumbra_proto::DomainType as _; use tendermint::abci; use tracing::instrument; @@ -56,11 +56,14 @@ impl Component for FeeComponent { let swapped_total = swapped_base + swapped_tip; - state_ref.record_proto(pb::EventBlockFees { - swapped_fee_total: Some(Fee::from_staking_token_amount(swapped_total).into()), - swapped_base_fee_total: Some(Fee::from_staking_token_amount(swapped_base).into()), - swapped_tip_total: Some(Fee::from_staking_token_amount(swapped_tip).into()), - }); + state_ref.record_proto( + EventBlockFees { + swapped_fee_total: Fee::from_staking_token_amount(swapped_total), + swapped_base_fee_total: Fee::from_staking_token_amount(swapped_base), + swapped_tip_total: Fee::from_staking_token_amount(swapped_tip), + } + .to_proto(), + ); } #[instrument(name = "fee", skip(_state))] diff --git a/crates/core/component/fee/src/event.rs b/crates/core/component/fee/src/event.rs index 8b13789179..5a87c81976 100644 --- a/crates/core/component/fee/src/event.rs +++ b/crates/core/component/fee/src/event.rs @@ -1 +1,48 @@ +use crate::Fee; +use anyhow::{anyhow, Context}; +use penumbra_proto::{core::component::fee::v1 as pb, DomainType, Name as _}; +#[derive(Clone, Debug)] +pub struct EventBlockFees { + pub swapped_fee_total: Fee, + pub swapped_base_fee_total: Fee, + pub swapped_tip_total: Fee, +} + +impl TryFrom for EventBlockFees { + type Error = anyhow::Error; + + fn try_from(value: pb::EventBlockFees) -> Result { + fn inner(value: pb::EventBlockFees) -> anyhow::Result { + Ok(EventBlockFees { + swapped_fee_total: value + .swapped_fee_total + .ok_or(anyhow!("missing `swapped_fee_total`"))? + .try_into()?, + swapped_base_fee_total: value + .swapped_base_fee_total + .ok_or(anyhow!("missing `swapped_base_fee_total`"))? + .try_into()?, + swapped_tip_total: value + .swapped_tip_total + .ok_or(anyhow!("missing `swapped_tip_total`"))? + .try_into()?, + }) + } + inner(value).context(format!("parsing {}", pb::EventBlockFees::NAME)) + } +} + +impl From for pb::EventBlockFees { + fn from(value: EventBlockFees) -> Self { + Self { + swapped_fee_total: Some(value.swapped_fee_total.into()), + swapped_base_fee_total: Some(value.swapped_base_fee_total.into()), + swapped_tip_total: Some(value.swapped_tip_total.into()), + } + } +} + +impl DomainType for EventBlockFees { + type Proto = pb::EventBlockFees; +} diff --git a/crates/core/component/funding/src/component.rs b/crates/core/component/funding/src/component.rs index b459eac239..a0ea863ec7 100644 --- a/crates/core/component/funding/src/component.rs +++ b/crates/core/component/funding/src/component.rs @@ -6,7 +6,7 @@ pub use metrics::register_metrics; /* Component implementation */ use penumbra_asset::{Value, STAKING_TOKEN_ASSET_ID}; -use penumbra_proto::StateWriteProto; +use penumbra_proto::{DomainType, StateWriteProto}; use penumbra_stake::component::validator_handler::ValidatorDataRead; pub use view::{StateReadExt, StateWriteExt}; @@ -19,7 +19,7 @@ use cnidarium_component::Component; use tendermint::v0_37::abci; use tracing::instrument; -use crate::{event::funding_stream_reward, genesis}; +use crate::{event::EventFundingStreamReward, genesis}; pub struct Funding {} @@ -112,11 +112,14 @@ impl Component for Funding { // If the recipient is an address, mint a note to that address Recipient::Address(address) => { // Record the funding stream reward event: - state.record_proto(funding_stream_reward( - address.to_string(), - base_rate.epoch_index, - reward_amount_for_stream.into(), - )); + state.record_proto( + EventFundingStreamReward { + recipient: address.to_string(), + epoch_index: base_rate.epoch_index, + reward_amount: reward_amount_for_stream, + } + .to_proto(), + ); state .mint_note( @@ -134,11 +137,14 @@ impl Component for Funding { // If the recipient is the Community Pool, deposit the funds into the Community Pool Recipient::CommunityPool => { // Record the funding stream reward event: - state.record_proto(funding_stream_reward( - "community-pool".to_string(), - base_rate.epoch_index, - reward_amount_for_stream.into(), - )); + state.record_proto( + EventFundingStreamReward { + recipient: "community-pool".to_string(), + epoch_index: base_rate.epoch_index, + reward_amount: reward_amount_for_stream, + } + .to_proto(), + ); state .community_pool_deposit(Value { diff --git a/crates/core/component/funding/src/event.rs b/crates/core/component/funding/src/event.rs index c40df99e9a..6d646bfe53 100644 --- a/crates/core/component/funding/src/event.rs +++ b/crates/core/component/funding/src/event.rs @@ -1,14 +1,42 @@ +use anyhow::{anyhow, Context}; use penumbra_num::Amount; -use penumbra_proto::penumbra::core::component::funding::v1 as pb; - -pub fn funding_stream_reward( - recipient: String, - epoch_index: u64, - reward_amount: Amount, -) -> pb::EventFundingStreamReward { - pb::EventFundingStreamReward { - recipient, - epoch_index, - reward_amount: Some(reward_amount.into()), +use penumbra_proto::{penumbra::core::component::funding::v1 as pb, DomainType, Name as _}; + +#[derive(Clone, Debug)] +pub struct EventFundingStreamReward { + pub recipient: String, + pub epoch_index: u64, + pub reward_amount: Amount, +} + +impl TryFrom for EventFundingStreamReward { + type Error = anyhow::Error; + + fn try_from(value: pb::EventFundingStreamReward) -> Result { + fn inner(value: pb::EventFundingStreamReward) -> anyhow::Result { + Ok(EventFundingStreamReward { + recipient: value.recipient, + epoch_index: value.epoch_index, + reward_amount: value + .reward_amount + .ok_or(anyhow!("missing `reward_amount`"))? + .try_into()?, + }) + } + inner(value).context(format!("parsing {}", pb::EventFundingStreamReward::NAME)) } } + +impl From for pb::EventFundingStreamReward { + fn from(value: EventFundingStreamReward) -> Self { + Self { + recipient: value.recipient, + epoch_index: value.epoch_index, + reward_amount: Some(value.reward_amount.into()), + } + } +} + +impl DomainType for EventFundingStreamReward { + type Proto = pb::EventFundingStreamReward; +} diff --git a/crates/core/component/shielded-pool/src/component/action_handler/output.rs b/crates/core/component/shielded-pool/src/component/action_handler/output.rs index 330a39c0e0..18a4515c9f 100644 --- a/crates/core/component/shielded-pool/src/component/action_handler/output.rs +++ b/crates/core/component/shielded-pool/src/component/action_handler/output.rs @@ -3,7 +3,7 @@ use async_trait::async_trait; use cnidarium::StateWrite; use cnidarium_component::ActionHandler; use penumbra_proof_params::OUTPUT_PROOF_VERIFICATION_KEY; -use penumbra_proto::StateWriteProto as _; +use penumbra_proto::{DomainType as _, StateWriteProto as _}; use penumbra_sct::component::source::SourceContext; use crate::{component::NoteManager, event, output::OutputProofPublic, Output}; @@ -34,7 +34,12 @@ impl ActionHandler for Output { .add_note_payload(self.body.note_payload.clone(), source) .await; - state.record_proto(event::output(&self.body.note_payload)); + state.record_proto( + event::EventOutput { + note_commitment: self.body.note_payload.note_commitment, + } + .to_proto(), + ); Ok(()) } diff --git a/crates/core/component/shielded-pool/src/component/action_handler/spend.rs b/crates/core/component/shielded-pool/src/component/action_handler/spend.rs index ee4c1ace85..b3da310650 100644 --- a/crates/core/component/shielded-pool/src/component/action_handler/spend.rs +++ b/crates/core/component/shielded-pool/src/component/action_handler/spend.rs @@ -3,7 +3,7 @@ use async_trait::async_trait; use cnidarium::StateWrite; use cnidarium_component::ActionHandler; use penumbra_proof_params::SPEND_PROOF_VERIFICATION_KEY; -use penumbra_proto::StateWriteProto as _; +use penumbra_proto::{DomainType, StateWriteProto as _}; use penumbra_sct::component::{ source::SourceContext, tree::{SctManager, VerificationExt}, @@ -49,7 +49,12 @@ impl ActionHandler for Spend { state.nullify(self.body.nullifier, source).await; // Also record an ABCI event for transaction indexing. - state.record_proto(event::spend(&self.body.nullifier)); + state.record_proto( + event::EventSpend { + nullifier: self.body.nullifier, + } + .to_proto(), + ); Ok(()) } diff --git a/crates/core/component/shielded-pool/src/component/transfer.rs b/crates/core/component/shielded-pool/src/component/transfer.rs index 18308e82ca..88c0e799ab 100644 --- a/crates/core/component/shielded-pool/src/component/transfer.rs +++ b/crates/core/component/shielded-pool/src/component/transfer.rs @@ -2,7 +2,8 @@ use std::str::FromStr; use crate::{ component::{AssetRegistry, NoteManager}, - event, Ics20Withdrawal, + event::{self, FungibleTokenTransferPacketMetadata}, + Ics20Withdrawal, }; use anyhow::{Context, Result}; use async_trait::async_trait; @@ -25,8 +26,8 @@ use penumbra_ibc::component::ChannelStateReadExt; use penumbra_keys::Address; use penumbra_num::Amount; use penumbra_proto::{ - core::component::shielded_pool::v1::FungibleTokenTransferPacketMetadata, - penumbra::core::component::ibc::v1::FungibleTokenPacketData, StateReadProto, StateWriteProto, + penumbra::core::component::ibc::v1::FungibleTokenPacketData, DomainType as _, StateReadProto, + StateWriteProto, }; use penumbra_sct::CommitmentSource; @@ -118,23 +119,26 @@ pub trait Ics20TransferWriteExt: StateWrite { ), new_value_balance, ); - self.record_proto(event::outbound_fungible_token_transfer( - Value { - amount: withdrawal.amount, - asset_id: withdrawal.denom.id(), - }, - &withdrawal.return_address, - withdrawal.destination_chain_address.clone(), - FungibleTokenTransferPacketMetadata { - channel: withdrawal.source_channel.0.clone(), - sequence: self - .get_send_sequence( - &withdrawal.source_channel, - &checked_packet.source_port(), - ) - .await?, - }, - )); + self.record_proto( + event::EventOutboundFungibleTokenTransfer { + value: Value { + amount: withdrawal.amount, + asset_id: withdrawal.denom.id(), + }, + sender: withdrawal.return_address.clone(), + receiver: withdrawal.destination_chain_address.clone(), + meta: FungibleTokenTransferPacketMetadata { + channel: withdrawal.source_channel.0.clone(), + sequence: self + .get_send_sequence( + &withdrawal.source_channel, + &checked_packet.source_port(), + ) + .await?, + }, + } + .to_proto(), + ); } else { // receiver is the source, burn utxos @@ -168,23 +172,26 @@ pub trait Ics20TransferWriteExt: StateWrite { ), new_value_balance, ); - self.record_proto(event::outbound_fungible_token_transfer( - Value { - amount: withdrawal.amount, - asset_id: withdrawal.denom.id(), - }, - &withdrawal.return_address, - withdrawal.destination_chain_address.clone(), - FungibleTokenTransferPacketMetadata { - channel: withdrawal.source_channel.0.clone(), - sequence: self - .get_send_sequence( - &withdrawal.source_channel, - &checked_packet.source_port(), - ) - .await?, - }, - )); + self.record_proto( + event::EventOutboundFungibleTokenTransfer { + value: Value { + amount: withdrawal.amount, + asset_id: withdrawal.denom.id(), + }, + sender: withdrawal.return_address.clone(), + receiver: withdrawal.destination_chain_address.clone(), + meta: FungibleTokenTransferPacketMetadata { + channel: withdrawal.source_channel.0.clone(), + sequence: self + .get_send_sequence( + &withdrawal.source_channel, + &checked_packet.source_port(), + ) + .await?, + }, + } + .to_proto(), + ); } self.send_packet_execute(checked_packet).await; @@ -388,15 +395,18 @@ async fn recv_transfer_packet_inner( state_key::ics20_value_balance::by_asset_id(&msg.packet.chan_on_b, &denom.id()), new_value_balance, ); - state.record_proto(event::inbound_fungible_token_transfer( - value, - packet_data.sender.clone(), - &receiver_address, - FungibleTokenTransferPacketMetadata { - channel: msg.packet.chan_on_a.0.clone(), - sequence: msg.packet.sequence.0, - }, - )); + state.record_proto( + event::EventInboundFungibleTokenTransfer { + value, + sender: packet_data.sender.clone(), + receiver: receiver_address, + meta: FungibleTokenTransferPacketMetadata { + channel: msg.packet.chan_on_a.0.clone(), + sequence: msg.packet.sequence.0, + }, + } + .to_proto(), + ); } else { // create new denom: // @@ -448,15 +458,18 @@ async fn recv_transfer_packet_inner( state_key::ics20_value_balance::by_asset_id(&msg.packet.chan_on_b, &denom.id()), new_value_balance, ); - state.record_proto(event::inbound_fungible_token_transfer( - value, - packet_data.sender.clone(), - &receiver_address, - FungibleTokenTransferPacketMetadata { - channel: msg.packet.chan_on_a.0.clone(), - sequence: msg.packet.sequence.0, - }, - )); + state.record_proto( + event::EventInboundFungibleTokenTransfer { + value, + sender: packet_data.sender.clone(), + receiver: receiver_address, + meta: FungibleTokenTransferPacketMetadata { + channel: msg.packet.chan_on_a.0.clone(), + sequence: msg.packet.sequence.0, + }, + } + .to_proto(), + ); } Ok(()) @@ -527,17 +540,20 @@ async fn refund_tokens( state_key::ics20_value_balance::by_asset_id(&packet.chan_on_a, &denom.id()), new_value_balance, ); - state.record_proto(event::outbound_fungible_token_refund( - value, - &receiver, // note, this comes from packet_data.sender - packet_data.receiver.clone(), - reason, - // Use the destination channel, i.e. our name for it, to be consistent across events. - FungibleTokenTransferPacketMetadata { - channel: packet.chan_on_b.0.clone(), - sequence: packet.sequence.0, - }, - )); + state.record_proto( + event::EventOutboundFungibleTokenRefund { + value, + sender: receiver, // note, this comes from packet_data.sender + receiver: packet_data.receiver.clone(), + reason, + // Use the destination channel, i.e. our name for it, to be consistent across events. + meta: FungibleTokenTransferPacketMetadata { + channel: packet.chan_on_b.0.clone(), + sequence: packet.sequence.0, + }, + } + .to_proto(), + ); } else { let value_balance: Amount = state .get(&state_key::ics20_value_balance::by_asset_id( @@ -566,17 +582,20 @@ async fn refund_tokens( state_key::ics20_value_balance::by_asset_id(&packet.chan_on_a, &denom.id()), new_value_balance, ); - // note, order flipped relative to the event. - state.record_proto(event::outbound_fungible_token_refund( - value, - &receiver, // note, this comes from packet_data.sender - packet_data.receiver.clone(), - reason, - FungibleTokenTransferPacketMetadata { - channel: packet.chan_on_b.0.clone(), - sequence: packet.sequence.0, - }, - )); + state.record_proto( + event::EventOutboundFungibleTokenRefund { + value, + sender: receiver, // note, this comes from packet_data.sender + receiver: packet_data.receiver.clone(), + reason, + // Use the destination channel, i.e. our name for it, to be consistent across events. + meta: FungibleTokenTransferPacketMetadata { + channel: packet.chan_on_b.0.clone(), + sequence: packet.sequence.0, + }, + } + .to_proto(), + ); } Ok(()) diff --git a/crates/core/component/shielded-pool/src/event.rs b/crates/core/component/shielded-pool/src/event.rs index f13f1398c0..e0f14b67fa 100644 --- a/crates/core/component/shielded-pool/src/event.rs +++ b/crates/core/component/shielded-pool/src/event.rs @@ -1,79 +1,274 @@ +use anyhow::{anyhow, Context}; use penumbra_asset::Value; use penumbra_keys::Address; -use penumbra_proto::core::component::shielded_pool::v1::{ - event_outbound_fungible_token_refund::Reason, EventInboundFungibleTokenTransfer, - EventOutboundFungibleTokenRefund, EventOutboundFungibleTokenTransfer, EventOutput, EventSpend, - FungibleTokenTransferPacketMetadata, -}; +use penumbra_proto::{core::component::shielded_pool::v1 as pb, DomainType}; use penumbra_sct::Nullifier; +use prost::Name as _; -use crate::NotePayload; +use crate::note::StateCommitment; -// These are sort of like the proto/domain type From impls, because -// we don't have separate domain types for the events (yet, possibly ever). +// // These are sort of like the proto/domain type From impls, because +// // we don't have separate domain types for the events (yet, possibly ever). +// Narrator: we did in fact need the separate domain types. -pub fn spend(nullifier: &Nullifier) -> EventSpend { - EventSpend { - nullifier: Some((*nullifier).into()), +#[derive(Clone, Debug)] +pub struct EventSpend { + pub nullifier: Nullifier, +} + +impl TryFrom for EventSpend { + type Error = anyhow::Error; + + fn try_from(value: pb::EventSpend) -> Result { + fn inner(value: pb::EventSpend) -> anyhow::Result { + Ok(EventSpend { + nullifier: value + .nullifier + .ok_or(anyhow!("missing `nullifier`"))? + .try_into()?, + }) + } + inner(value).context(format!("parsing {}", pb::EventSpend::NAME)) } } -pub fn output(note_payload: &NotePayload) -> EventOutput { - EventOutput { - note_commitment: Some(note_payload.note_commitment.into()), +impl From for pb::EventSpend { + fn from(value: EventSpend) -> Self { + Self { + nullifier: Some(value.nullifier.into()), + } + } +} + +impl DomainType for EventSpend { + type Proto = pb::EventSpend; +} + +#[derive(Clone, Debug)] +pub struct EventOutput { + pub note_commitment: StateCommitment, +} + +impl TryFrom for EventOutput { + type Error = anyhow::Error; + + fn try_from(value: pb::EventOutput) -> Result { + fn inner(value: pb::EventOutput) -> anyhow::Result { + Ok(EventOutput { + note_commitment: value + .note_commitment + .ok_or(anyhow!("missing `note_commitment`"))? + .try_into()?, + }) + } + inner(value).context(format!("parsing {}", pb::EventOutput::NAME)) } } -pub fn outbound_fungible_token_transfer( - value: Value, - sender: &Address, - receiver: String, - meta: FungibleTokenTransferPacketMetadata, -) -> EventOutboundFungibleTokenTransfer { - EventOutboundFungibleTokenTransfer { - value: Some(value.into()), - sender: Some(sender.into()), - receiver, - meta: Some(meta), +impl From for pb::EventOutput { + fn from(value: EventOutput) -> Self { + Self { + note_commitment: Some(value.note_commitment.into()), + } } } +impl DomainType for EventOutput { + type Proto = pb::EventOutput; +} + +#[derive(Clone, Debug)] +pub struct FungibleTokenTransferPacketMetadata { + pub channel: String, + pub sequence: u64, +} + +impl TryFrom for FungibleTokenTransferPacketMetadata { + type Error = anyhow::Error; + + fn try_from(value: pb::FungibleTokenTransferPacketMetadata) -> Result { + fn inner( + value: pb::FungibleTokenTransferPacketMetadata, + ) -> anyhow::Result { + Ok(FungibleTokenTransferPacketMetadata { + channel: value.channel, + sequence: value.sequence, + }) + } + inner(value).context(format!( + "parsing {}", + pb::FungibleTokenTransferPacketMetadata::NAME + )) + } +} + +impl From for pb::FungibleTokenTransferPacketMetadata { + fn from(value: FungibleTokenTransferPacketMetadata) -> Self { + Self { + channel: value.channel, + sequence: value.sequence, + } + } +} + +impl DomainType for FungibleTokenTransferPacketMetadata { + type Proto = pb::FungibleTokenTransferPacketMetadata; +} + +#[derive(Clone, Debug)] +pub struct EventOutboundFungibleTokenTransfer { + pub value: Value, + pub sender: Address, + pub receiver: String, + pub meta: FungibleTokenTransferPacketMetadata, +} + +impl TryFrom for EventOutboundFungibleTokenTransfer { + type Error = anyhow::Error; + + fn try_from(value: pb::EventOutboundFungibleTokenTransfer) -> Result { + fn inner( + value: pb::EventOutboundFungibleTokenTransfer, + ) -> anyhow::Result { + Ok(EventOutboundFungibleTokenTransfer { + value: value.value.ok_or(anyhow!("missing `value`"))?.try_into()?, + sender: value + .sender + .ok_or(anyhow!("missing `sender`"))? + .try_into()?, + receiver: value.receiver, + meta: value.meta.ok_or(anyhow!("missing `meta`"))?.try_into()?, + }) + } + inner(value).context(format!( + "parsing {}", + pb::EventOutboundFungibleTokenTransfer::NAME + )) + } +} + +impl From for pb::EventOutboundFungibleTokenTransfer { + fn from(value: EventOutboundFungibleTokenTransfer) -> Self { + Self { + value: Some(value.value.into()), + sender: Some(value.sender.into()), + receiver: value.receiver, + meta: Some(value.meta.into()), + } + } +} + +impl DomainType for EventOutboundFungibleTokenTransfer { + type Proto = pb::EventOutboundFungibleTokenTransfer; +} + #[derive(Clone, Copy, Debug)] +#[repr(i32)] pub enum FungibleTokenRefundReason { - Timeout, - Error, -} - -pub fn outbound_fungible_token_refund( - value: Value, - sender: &Address, - receiver: String, - reason: FungibleTokenRefundReason, - meta: FungibleTokenTransferPacketMetadata, -) -> EventOutboundFungibleTokenRefund { - let reason = match reason { - FungibleTokenRefundReason::Timeout => Reason::Timeout, - FungibleTokenRefundReason::Error => Reason::Error, - }; - EventOutboundFungibleTokenRefund { - value: Some(value.into()), - sender: Some(sender.into()), - receiver, - reason: reason as i32, - meta: Some(meta), + Unspecified = 0, + Timeout = 1, + Error = 2, +} + +#[derive(Clone, Debug)] +pub struct EventOutboundFungibleTokenRefund { + pub value: Value, + pub sender: Address, + pub receiver: String, + pub reason: FungibleTokenRefundReason, + pub meta: FungibleTokenTransferPacketMetadata, +} + +impl TryFrom for EventOutboundFungibleTokenRefund { + type Error = anyhow::Error; + + fn try_from(value: pb::EventOutboundFungibleTokenRefund) -> Result { + fn inner( + value: pb::EventOutboundFungibleTokenRefund, + ) -> anyhow::Result { + use pb::event_outbound_fungible_token_refund::Reason; + let reason = match value.reason() { + Reason::Timeout => FungibleTokenRefundReason::Timeout, + Reason::Error => FungibleTokenRefundReason::Error, + Reason::Unspecified => FungibleTokenRefundReason::Unspecified, + }; + Ok(EventOutboundFungibleTokenRefund { + value: value.value.ok_or(anyhow!("missing `value`"))?.try_into()?, + sender: value + .sender + .ok_or(anyhow!("missing `sender`"))? + .try_into()?, + receiver: value.receiver, + reason, + meta: value.meta.ok_or(anyhow!("missing `meta`"))?.try_into()?, + }) + } + inner(value).context(format!( + "parsing {}", + pb::EventOutboundFungibleTokenRefund::NAME + )) } } -pub fn inbound_fungible_token_transfer( - value: Value, - sender: String, - receiver: &Address, - meta: FungibleTokenTransferPacketMetadata, -) -> EventInboundFungibleTokenTransfer { - EventInboundFungibleTokenTransfer { - value: Some(value.into()), - sender, - receiver: Some(receiver.into()), - meta: Some(meta), +impl From for pb::EventOutboundFungibleTokenRefund { + fn from(value: EventOutboundFungibleTokenRefund) -> Self { + Self { + value: Some(value.value.into()), + sender: Some(value.sender.into()), + receiver: value.receiver, + reason: value.reason as i32, + meta: Some(value.meta.into()), + } } } + +impl DomainType for EventOutboundFungibleTokenRefund { + type Proto = pb::EventOutboundFungibleTokenRefund; +} + +#[derive(Clone, Debug)] +pub struct EventInboundFungibleTokenTransfer { + pub value: Value, + pub sender: String, + pub receiver: Address, + pub meta: FungibleTokenTransferPacketMetadata, +} + +impl TryFrom for EventInboundFungibleTokenTransfer { + type Error = anyhow::Error; + + fn try_from(value: pb::EventInboundFungibleTokenTransfer) -> Result { + fn inner( + value: pb::EventInboundFungibleTokenTransfer, + ) -> anyhow::Result { + Ok(EventInboundFungibleTokenTransfer { + value: value.value.ok_or(anyhow!("missing `value`"))?.try_into()?, + sender: value.sender, + receiver: value + .receiver + .ok_or(anyhow!("missing `receiver`"))? + .try_into()?, + meta: value.meta.ok_or(anyhow!("missing `meta`"))?.try_into()?, + }) + } + inner(value).context(format!( + "parsing {}", + pb::EventInboundFungibleTokenTransfer::NAME + )) + } +} + +impl From for pb::EventInboundFungibleTokenTransfer { + fn from(value: EventInboundFungibleTokenTransfer) -> Self { + Self { + value: Some(value.value.into()), + sender: value.sender, + receiver: Some(value.receiver.into()), + meta: Some(value.meta.into()), + } + } +} + +impl DomainType for EventInboundFungibleTokenTransfer { + type Proto = pb::EventInboundFungibleTokenTransfer; +} diff --git a/crates/core/component/stake/src/component/action_handler/delegate.rs b/crates/core/component/stake/src/component/action_handler/delegate.rs index f27231f1d7..5b01bc9963 100644 --- a/crates/core/component/stake/src/component/action_handler/delegate.rs +++ b/crates/core/component/stake/src/component/action_handler/delegate.rs @@ -3,7 +3,7 @@ use async_trait::async_trait; use cnidarium::StateWrite; use cnidarium_component::ActionHandler; use penumbra_num::Amount; -use penumbra_proto::StateWriteProto; +use penumbra_proto::{DomainType, StateWriteProto}; use penumbra_sct::component::clock::EpochRead; use crate::{ @@ -132,7 +132,7 @@ impl ActionHandler for Delegate { // We queue the delegation so it can be processed at the epoch boundary. tracing::debug!(?self, "queuing delegation for next epoch"); state.push_delegation(self.clone()); - state.record_proto(event::delegate(self)); + state.record_proto(event::EventDelegate::from(self).to_proto()); Ok(()) } } diff --git a/crates/core/component/stake/src/component/action_handler/undelegate.rs b/crates/core/component/stake/src/component/action_handler/undelegate.rs index 6edfa1eaa8..da3947ee01 100644 --- a/crates/core/component/stake/src/component/action_handler/undelegate.rs +++ b/crates/core/component/stake/src/component/action_handler/undelegate.rs @@ -1,7 +1,7 @@ use anyhow::Result; use async_trait::async_trait; use cnidarium::StateWrite; -use penumbra_proto::StateWriteProto; +use penumbra_proto::{DomainType as _, StateWriteProto}; use penumbra_sct::component::clock::EpochRead; use penumbra_shielded_pool::component::AssetRegistry; @@ -85,7 +85,7 @@ impl ActionHandler for Undelegate { tracing::debug!(?self, "queuing undelegation for next epoch"); state.push_undelegation(self.clone()); - state.record_proto(event::undelegate(self)); + state.record_proto(event::EventUndelegate::from(self).to_proto()); Ok(()) } diff --git a/crates/core/component/stake/src/component/stake.rs b/crates/core/component/stake/src/component/stake.rs index c2984312d2..98b8bf8139 100644 --- a/crates/core/component/stake/src/component/stake.rs +++ b/crates/core/component/stake/src/component/stake.rs @@ -1,6 +1,6 @@ pub mod address; -use crate::event::slashing_penalty_applied; +use crate::event::EventSlashingPenaltyApplied; use crate::params::StakeParameters; use crate::rate::BaseRateData; use crate::validator::{self, Validator}; @@ -15,7 +15,7 @@ use cnidarium::{StateRead, StateWrite}; use cnidarium_component::Component; use futures::{StreamExt, TryStreamExt}; use penumbra_num::Amount; -use penumbra_proto::{StateReadProto, StateWriteProto}; +use penumbra_proto::{DomainType, StateReadProto, StateWriteProto}; use penumbra_sct::component::clock::EpochRead; use std::pin::Pin; use std::str::FromStr; @@ -475,11 +475,14 @@ pub(crate) trait RateDataWrite: StateWrite { let new_penalty = current_penalty.compound(slashing_penalty); // Emit an event indicating the validator had a slashing penalty applied. - self.record_proto(slashing_penalty_applied( - *identity_key, - current_epoch_index, - new_penalty, - )); + self.record_proto( + EventSlashingPenaltyApplied { + identity_key: *identity_key, + epoch_index: current_epoch_index, + new_penalty, + } + .to_proto(), + ); self.put( state_key::penalty::for_id_in_epoch(identity_key, current_epoch_index), new_penalty, diff --git a/crates/core/component/stake/src/component/validator_handler/uptime_tracker.rs b/crates/core/component/stake/src/component/validator_handler/uptime_tracker.rs index c22e77abe9..b3b9999165 100644 --- a/crates/core/component/stake/src/component/validator_handler/uptime_tracker.rs +++ b/crates/core/component/stake/src/component/validator_handler/uptime_tracker.rs @@ -17,7 +17,7 @@ use { async_trait::async_trait, cnidarium::StateWrite, futures::StreamExt as _, - penumbra_proto::StateWriteProto, + penumbra_proto::{DomainType, StateWriteProto}, penumbra_sct::component::clock::EpochRead, std::collections::BTreeMap, tap::Tap, @@ -178,7 +178,7 @@ pub trait ValidatorUptimeTracker: StateWrite { if !voted { // If the validator didn't sign, we need to emit a missed block event. - self.record_proto(event::validator_missed_block(identity_key)); + self.record_proto(event::EventValidatorMissedBlock { identity_key }.to_proto()); } uptime.mark_height_as_signed(height, voted)?; diff --git a/crates/core/component/stake/src/component/validator_handler/validator_manager.rs b/crates/core/component/stake/src/component/validator_handler/validator_manager.rs index 2fa17fc577..2064291966 100644 --- a/crates/core/component/stake/src/component/validator_handler/validator_manager.rs +++ b/crates/core/component/stake/src/component/validator_handler/validator_manager.rs @@ -24,7 +24,7 @@ use { cnidarium::StateWrite, penumbra_asset::asset, penumbra_num::Amount, - penumbra_proto::StateWriteProto, + penumbra_proto::{DomainType as _, StateWriteProto}, penumbra_sct::component::{ clock::{EpochManager, EpochRead}, StateReadExt as _, @@ -302,7 +302,13 @@ pub trait ValidatorManager: StateWrite { tracing::info!("successful state transition"); self.put(validator_state_path, new_state); - self.record_proto(event::validator_state_change(*identity_key, new_state)); + self.record_proto( + event::EventValidatorStateChange { + identity_key: *identity_key, + state: new_state, + } + .to_proto(), + ); Ok((old_state, new_state)) } @@ -481,7 +487,12 @@ pub trait ValidatorManager: StateWrite { // Track the validator's definition in an event (the rest of the attributes will be tracked // in events emitted by the calls to set_* methods below). - self.record_proto(event::validator_definition_upload(validator.clone())); + self.record_proto( + event::EventValidatorDefinitionUpload { + validator: validator.clone(), + } + .to_proto(), + ); // We initialize the validator's state, power, and bonding state. self.set_initial_validator_state(&validator_identity, initial_state)?; @@ -599,7 +610,7 @@ pub trait ValidatorManager: StateWrite { ); // Track the validator's definition in an event. - self.record_proto(event::validator_definition_upload(validator)); + self.record_proto(event::EventValidatorDefinitionUpload { validator }.to_proto()); Ok(()) } @@ -664,11 +675,14 @@ pub trait ValidatorManager: StateWrite { if let (Inactive | Jailed | Active, Tombstoned) = (old_state, new_state) { let current_height = self.get_block_height().await?; - self.record_proto(event::tombstone_validator( - current_height, - validator.identity_key.clone(), - evidence, - )); + self.record_proto( + event::EventTombstoneValidator::from_evidence( + current_height, + validator.identity_key.clone(), + evidence, + ) + .to_proto(), + ); } Ok(()) diff --git a/crates/core/component/stake/src/component/validator_handler/validator_store.rs b/crates/core/component/stake/src/component/validator_handler/validator_store.rs index 01e6e00bb4..b3e7508ce1 100644 --- a/crates/core/component/stake/src/component/validator_handler/validator_store.rs +++ b/crates/core/component/stake/src/component/validator_handler/validator_store.rs @@ -253,7 +253,13 @@ pub(crate) trait ValidatorDataWrite: StateWrite { state_key::validators::pool::bonding_state::by_id(identity_key), state.clone(), ); - self.record_proto(event::validator_bonding_state_change(*identity_key, state)); + self.record_proto( + event::EventValidatorBondingStateChange { + identity_key: *identity_key, + bonding_state: state, + } + .to_proto(), + ); } #[instrument(skip(self))] @@ -270,10 +276,13 @@ pub(crate) trait ValidatorDataWrite: StateWrite { state_key::validators::power::by_id(identity_key), voting_power, ); - self.record_proto(event::validator_voting_power_change( - *identity_key, - voting_power, - )); + self.record_proto( + event::EventValidatorVotingPowerChange { + identity_key: *identity_key, + voting_power, + } + .to_proto(), + ); Ok(()) } @@ -290,7 +299,13 @@ pub(crate) trait ValidatorDataWrite: StateWrite { } self.put(state_key::validators::state::by_id(id), initial_state); - self.record_proto(event::validator_state_change(*id, initial_state)); + self.record_proto( + event::EventValidatorStateChange { + identity_key: *id, + state: initial_state, + } + .to_proto(), + ); Ok(()) } @@ -301,7 +316,13 @@ pub(crate) trait ValidatorDataWrite: StateWrite { state_key::validators::rate::current_by_id(identity_key), rate_data.clone(), ); - self.record_proto(event::validator_rate_data_change(*identity_key, rate_data)); + self.record_proto( + event::EventRateDataChange { + identity_key: *identity_key, + rate_data, + } + .to_proto(), + ); } #[instrument(skip(self))] diff --git a/crates/core/component/stake/src/event.rs b/crates/core/component/stake/src/event.rs index 86de0322b2..cb0d184af4 100644 --- a/crates/core/component/stake/src/event.rs +++ b/crates/core/component/stake/src/event.rs @@ -1,100 +1,448 @@ use crate::{ - rate, + rate::RateData, validator::{BondingState, State, Validator}, Delegate, IdentityKey, Penalty, Undelegate, }; +use anyhow::{anyhow, Context as _}; use penumbra_num::Amount; -use penumbra_proto::core::component::stake::v1 as pb; +use penumbra_proto::{core::component::stake::v1 as pb, DomainType, Name as _}; use tendermint::abci::types::Misbehavior; -pub fn validator_state_change( - identity_key: IdentityKey, - state: State, -) -> pb::EventValidatorStateChange { - pb::EventValidatorStateChange { - identity_key: Some(identity_key.into()), - state: Some(state.into()), +#[derive(Clone, Debug)] +pub struct EventValidatorStateChange { + pub identity_key: IdentityKey, + pub state: State, +} + +impl TryFrom for EventValidatorStateChange { + type Error = anyhow::Error; + + fn try_from(value: pb::EventValidatorStateChange) -> Result { + fn inner( + value: pb::EventValidatorStateChange, + ) -> anyhow::Result { + Ok(EventValidatorStateChange { + identity_key: value + .identity_key + .ok_or(anyhow!("missing `identity_key`"))? + .try_into()?, + state: value.state.ok_or(anyhow!("missing `state`"))?.try_into()?, + }) + } + inner(value).context(format!("parsing {}", pb::EventValidatorStateChange::NAME)) } } -pub fn validator_voting_power_change( - identity_key: IdentityKey, - voting_power: Amount, -) -> pb::EventValidatorVotingPowerChange { - pb::EventValidatorVotingPowerChange { - identity_key: Some(identity_key.into()), - voting_power: Some(voting_power.into()), +impl From for pb::EventValidatorStateChange { + fn from(value: EventValidatorStateChange) -> Self { + Self { + identity_key: Some(value.identity_key.into()), + state: Some(value.state.into()), + } } } -pub fn validator_bonding_state_change( - identity_key: IdentityKey, - bonding_state: BondingState, -) -> pb::EventValidatorBondingStateChange { - pb::EventValidatorBondingStateChange { - identity_key: Some(identity_key.into()), - bonding_state: Some(bonding_state.into()), +impl DomainType for EventValidatorStateChange { + type Proto = pb::EventValidatorStateChange; +} + +#[derive(Clone, Debug)] +pub struct EventValidatorVotingPowerChange { + pub identity_key: IdentityKey, + pub voting_power: Amount, +} + +impl TryFrom for EventValidatorVotingPowerChange { + type Error = anyhow::Error; + + fn try_from(value: pb::EventValidatorVotingPowerChange) -> Result { + fn inner( + value: pb::EventValidatorVotingPowerChange, + ) -> anyhow::Result { + Ok(EventValidatorVotingPowerChange { + identity_key: value + .identity_key + .ok_or(anyhow!("missing `identity_key`"))? + .try_into()?, + voting_power: value + .voting_power + .ok_or(anyhow!("missing `voting_power`"))? + .try_into()?, + }) + } + inner(value).context(format!( + "parsing {}", + pb::EventValidatorVotingPowerChange::NAME + )) } } -pub fn validator_rate_data_change( - identity_key: IdentityKey, - rate_data: rate::RateData, -) -> pb::EventRateDataChange { - pb::EventRateDataChange { - identity_key: Some(identity_key.into()), - rate_data: Some(rate_data.into()), +impl From for pb::EventValidatorVotingPowerChange { + fn from(value: EventValidatorVotingPowerChange) -> Self { + Self { + identity_key: Some(value.identity_key.into()), + voting_power: Some(value.voting_power.into()), + } } } -pub fn validator_definition_upload(validator: Validator) -> pb::EventValidatorDefinitionUpload { - pb::EventValidatorDefinitionUpload { - validator: Some(validator.into()), +impl DomainType for EventValidatorVotingPowerChange { + type Proto = pb::EventValidatorVotingPowerChange; +} + +#[derive(Clone, Debug)] +pub struct EventValidatorBondingStateChange { + pub identity_key: IdentityKey, + pub bonding_state: BondingState, +} + +impl TryFrom for EventValidatorBondingStateChange { + type Error = anyhow::Error; + + fn try_from(value: pb::EventValidatorBondingStateChange) -> Result { + fn inner( + value: pb::EventValidatorBondingStateChange, + ) -> anyhow::Result { + Ok(EventValidatorBondingStateChange { + identity_key: value + .identity_key + .ok_or(anyhow!("missing `identity_key`"))? + .try_into()?, + bonding_state: value + .bonding_state + .ok_or(anyhow!("missing `bonding_state`"))? + .try_into()?, + }) + } + inner(value).context(format!( + "parsing {}", + pb::EventValidatorBondingStateChange::NAME + )) } } -pub fn validator_missed_block(identity_key: IdentityKey) -> pb::EventValidatorMissedBlock { - pb::EventValidatorMissedBlock { - identity_key: Some(identity_key.into()), +impl From for pb::EventValidatorBondingStateChange { + fn from(value: EventValidatorBondingStateChange) -> Self { + Self { + identity_key: Some(value.identity_key.into()), + bonding_state: Some(value.bonding_state.into()), + } } } -pub fn delegate(delegate: &Delegate) -> pb::EventDelegate { - pb::EventDelegate { - identity_key: Some(delegate.validator_identity.into()), - amount: Some(delegate.unbonded_amount.into()), +impl DomainType for EventValidatorBondingStateChange { + type Proto = pb::EventValidatorBondingStateChange; +} + +#[derive(Clone, Debug)] +pub struct EventRateDataChange { + pub identity_key: IdentityKey, + pub rate_data: RateData, +} + +impl TryFrom for EventRateDataChange { + type Error = anyhow::Error; + + fn try_from(value: pb::EventRateDataChange) -> Result { + fn inner(value: pb::EventRateDataChange) -> anyhow::Result { + Ok(EventRateDataChange { + identity_key: value + .identity_key + .ok_or(anyhow!("missing `identity_key`"))? + .try_into()?, + rate_data: value + .rate_data + .ok_or(anyhow!("missing `rate_data`"))? + .try_into()?, + }) + } + inner(value).context(format!("parsing {}", pb::EventRateDataChange::NAME)) } } -pub fn undelegate(undelegate: &Undelegate) -> pb::EventUndelegate { - pb::EventUndelegate { - identity_key: Some(undelegate.validator_identity.into()), - amount: Some(undelegate.unbonded_amount.into()), +impl From for pb::EventRateDataChange { + fn from(value: EventRateDataChange) -> Self { + Self { + identity_key: Some(value.identity_key.into()), + rate_data: Some(value.rate_data.into()), + } } } -pub fn tombstone_validator( - current_height: u64, - identity_key: IdentityKey, - evidence: &Misbehavior, -) -> pb::EventTombstoneValidator { - pb::EventTombstoneValidator { - evidence_height: evidence.height.value(), - current_height, - identity_key: Some(identity_key.into()), - address: evidence.validator.address.to_vec(), - voting_power: evidence.validator.power.value(), +impl DomainType for EventRateDataChange { + type Proto = pb::EventRateDataChange; +} + +#[derive(Clone, Debug)] +pub struct EventValidatorDefinitionUpload { + pub validator: Validator, +} + +impl TryFrom for EventValidatorDefinitionUpload { + type Error = anyhow::Error; + + fn try_from(value: pb::EventValidatorDefinitionUpload) -> Result { + fn inner( + value: pb::EventValidatorDefinitionUpload, + ) -> anyhow::Result { + Ok(EventValidatorDefinitionUpload { + validator: value + .validator + .ok_or(anyhow!("missing `validator`"))? + .try_into()?, + }) + } + inner(value).context(format!( + "parsing {}", + pb::EventValidatorDefinitionUpload::NAME + )) } } -pub fn slashing_penalty_applied( - identity_key: IdentityKey, - epoch_index: u64, - new_penalty: Penalty, -) -> pb::EventSlashingPenaltyApplied { - pb::EventSlashingPenaltyApplied { - identity_key: Some(identity_key.into()), - epoch_index, - new_penalty: Some(new_penalty.into()), +impl From for pb::EventValidatorDefinitionUpload { + fn from(value: EventValidatorDefinitionUpload) -> Self { + Self { + validator: Some(value.validator.into()), + } + } +} + +impl DomainType for EventValidatorDefinitionUpload { + type Proto = pb::EventValidatorDefinitionUpload; +} + +#[derive(Clone, Debug)] +pub struct EventValidatorMissedBlock { + pub identity_key: IdentityKey, +} + +impl TryFrom for EventValidatorMissedBlock { + type Error = anyhow::Error; + + fn try_from(value: pb::EventValidatorMissedBlock) -> Result { + fn inner( + value: pb::EventValidatorMissedBlock, + ) -> anyhow::Result { + Ok(EventValidatorMissedBlock { + identity_key: value + .identity_key + .ok_or(anyhow!("missing `identity_key`"))? + .try_into()?, + }) + } + inner(value).context(format!("parsing {}", pb::EventValidatorMissedBlock::NAME)) + } +} + +impl From for pb::EventValidatorMissedBlock { + fn from(value: EventValidatorMissedBlock) -> Self { + Self { + identity_key: Some(value.identity_key.into()), + } + } +} + +impl DomainType for EventValidatorMissedBlock { + type Proto = pb::EventValidatorMissedBlock; +} + +#[derive(Clone, Debug)] +pub struct EventDelegate { + pub identity_key: IdentityKey, + pub amount: Amount, +} + +impl From<&Delegate> for EventDelegate { + fn from(value: &Delegate) -> Self { + Self { + identity_key: value.validator_identity, + amount: value.unbonded_amount, + } + } +} + +impl TryFrom for EventDelegate { + type Error = anyhow::Error; + + fn try_from(value: pb::EventDelegate) -> Result { + fn inner(value: pb::EventDelegate) -> anyhow::Result { + Ok(EventDelegate { + identity_key: value + .identity_key + .ok_or(anyhow!("missing `identity_key`"))? + .try_into()?, + amount: value + .amount + .ok_or(anyhow!("missing `amount`"))? + .try_into()?, + }) + } + inner(value).context(format!("parsing {}", pb::EventDelegate::NAME)) } } + +impl From for pb::EventDelegate { + fn from(value: EventDelegate) -> Self { + Self { + identity_key: Some(value.identity_key.into()), + amount: Some(value.amount.into()), + } + } +} + +impl DomainType for EventDelegate { + type Proto = pb::EventDelegate; +} + +#[derive(Clone, Debug)] +pub struct EventUndelegate { + pub identity_key: IdentityKey, + pub amount: Amount, +} + +impl From<&Undelegate> for EventUndelegate { + fn from(value: &Undelegate) -> Self { + Self { + identity_key: value.validator_identity, + amount: value.unbonded_amount, + } + } +} + +impl TryFrom for EventUndelegate { + type Error = anyhow::Error; + + fn try_from(value: pb::EventUndelegate) -> Result { + fn inner(value: pb::EventUndelegate) -> anyhow::Result { + Ok(EventUndelegate { + identity_key: value + .identity_key + .ok_or(anyhow!("missing `identity_key`"))? + .try_into()?, + amount: value + .amount + .ok_or(anyhow!("missing `amount`"))? + .try_into()?, + }) + } + inner(value).context(format!("parsing {}", pb::EventUndelegate::NAME)) + } +} + +impl From for pb::EventUndelegate { + fn from(value: EventUndelegate) -> Self { + Self { + identity_key: Some(value.identity_key.into()), + amount: Some(value.amount.into()), + } + } +} + +impl DomainType for EventUndelegate { + type Proto = pb::EventUndelegate; +} + +#[derive(Clone, Debug)] +pub struct EventTombstoneValidator { + pub evidence_height: u64, + pub current_height: u64, + pub identity_key: IdentityKey, + pub address: Vec, + pub voting_power: u64, +} + +impl EventTombstoneValidator { + pub fn from_evidence( + current_height: u64, + identity_key: IdentityKey, + evidence: &Misbehavior, + ) -> Self { + Self { + evidence_height: evidence.height.value(), + current_height, + identity_key, + address: evidence.validator.address.to_vec(), + voting_power: evidence.validator.power.value(), + } + } +} + +impl TryFrom for EventTombstoneValidator { + type Error = anyhow::Error; + + fn try_from(value: pb::EventTombstoneValidator) -> Result { + fn inner(value: pb::EventTombstoneValidator) -> anyhow::Result { + Ok(EventTombstoneValidator { + evidence_height: value.evidence_height, + current_height: value.current_height, + identity_key: value + .identity_key + .ok_or(anyhow!("missing `identity_key`"))? + .try_into()?, + address: value.address, + voting_power: value.voting_power, + }) + } + inner(value).context(format!("parsing {}", pb::EventTombstoneValidator::NAME)) + } +} + +impl From for pb::EventTombstoneValidator { + fn from(value: EventTombstoneValidator) -> Self { + Self { + evidence_height: value.evidence_height, + current_height: value.current_height, + identity_key: Some(value.identity_key.into()), + address: value.address, + voting_power: value.voting_power, + } + } +} + +impl DomainType for EventTombstoneValidator { + type Proto = pb::EventTombstoneValidator; +} + +#[derive(Clone, Debug)] +pub struct EventSlashingPenaltyApplied { + pub identity_key: IdentityKey, + pub epoch_index: u64, + pub new_penalty: Penalty, +} + +impl TryFrom for EventSlashingPenaltyApplied { + type Error = anyhow::Error; + + fn try_from(value: pb::EventSlashingPenaltyApplied) -> Result { + fn inner( + value: pb::EventSlashingPenaltyApplied, + ) -> anyhow::Result { + Ok(EventSlashingPenaltyApplied { + identity_key: value + .identity_key + .ok_or(anyhow!("missing `identity_key`"))? + .try_into()?, + epoch_index: value.epoch_index, + new_penalty: value + .new_penalty + .ok_or(anyhow!("missing `new_penalty`"))? + .try_into()?, + }) + } + inner(value).context(format!("parsing {}", pb::EventSlashingPenaltyApplied::NAME)) + } +} + +impl From for pb::EventSlashingPenaltyApplied { + fn from(value: EventSlashingPenaltyApplied) -> Self { + Self { + identity_key: Some(value.identity_key.into()), + epoch_index: value.epoch_index, + new_penalty: Some(value.new_penalty.into()), + } + } +} + +impl DomainType for EventSlashingPenaltyApplied { + type Proto = pb::EventSlashingPenaltyApplied; +} diff --git a/crates/core/component/stake/src/lib.rs b/crates/core/component/stake/src/lib.rs index eb574260df..ccaab1c2be 100644 --- a/crates/core/component/stake/src/lib.rs +++ b/crates/core/component/stake/src/lib.rs @@ -6,7 +6,7 @@ mod changes; mod current_consensus_keys; mod delegation_token; -mod event; +pub mod event; mod governance_key; mod identity_key; mod penalty;