From 71c7f25a2758f3b1afed4dfb7798d3ac3846540e Mon Sep 17 00:00:00 2001 From: Lucas Meier Date: Fri, 26 Jan 2024 16:12:35 -0800 Subject: [PATCH] Implement view service restarts in summonerd This uses the database being stale as a source of truth as to a lousy view service --- crates/view/src/service.rs | 27 ++++++++- tools/summonerd/src/penumbra_knower.rs | 80 +++++++++++++++++++++++++- 2 files changed, 101 insertions(+), 6 deletions(-) diff --git a/crates/view/src/service.rs b/crates/view/src/service.rs index fc62244d65..0f822f15f8 100644 --- a/crates/view/src/service.rs +++ b/crates/view/src/service.rs @@ -9,10 +9,16 @@ use ark_std::UniformRand; use async_stream::try_stream; use camino::Utf8Path; use decaf377::Fq; -use futures::stream::{StreamExt, TryStreamExt}; +use futures::{ + stream::{StreamExt, TryStreamExt}, + FutureExt, +}; use rand::Rng; use rand_core::OsRng; -use tokio::sync::{watch, RwLock}; +use tokio::{ + sync::{watch, RwLock}, + task::JoinHandle, +}; use tokio_stream::wrappers::WatchStream; use tonic::{async_trait, transport::Channel, Request, Response, Status}; use tracing::instrument; @@ -82,6 +88,8 @@ pub struct ViewService { node: Url, /// Used to watch for changes to the sync height. sync_height_rx: watch::Receiver, + /// The handle to the spawned worker task, allowing us to cancel it. + worker_handle: Arc>, } impl ViewService { @@ -107,7 +115,7 @@ impl ViewService { let (worker, sct, error_slot, sync_height_rx) = Worker::new(storage.clone(), node.clone()).await?; - tokio::spawn(worker.run()); + let worker_handle = tokio::spawn(worker.run().map(|_| ())); Ok(Self { storage, @@ -115,9 +123,22 @@ impl ViewService { sync_height_rx, state_commitment_tree: sct, node, + worker_handle: Arc::new(worker_handle), }) } + /// Close this view service, stopping the background syncing task. 
+ /// + /// This will have a spoooky action at a distance in that other cloned instances of this + /// service sharing the syncing task will also be silently broken, so be careful when using + /// this method. + /// + /// Also this is sort of a stop-gap for a more robust view service where this method won't be + /// necessary, so one should exercise skepticism that this method even exists. + pub async fn abort(&self) { + self.worker_handle.abort(); + } + async fn check_worker(&self) -> Result<(), tonic::Status> { // If the shared error slot is set, then an error has occurred in the worker // that we should bubble up. diff --git a/tools/summonerd/src/penumbra_knower.rs b/tools/summonerd/src/penumbra_knower.rs index 5c03dcb7f4..7bccf97fc4 100644 --- a/tools/summonerd/src/penumbra_knower.rs +++ b/tools/summonerd/src/penumbra_knower.rs @@ -1,11 +1,58 @@ +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; + use anyhow::Result; use camino::Utf8Path; use penumbra_asset::STAKING_TOKEN_ASSET_ID; use penumbra_keys::{Address, FullViewingKey}; use penumbra_num::Amount; use penumbra_view::{Storage, ViewService}; +use tokio::sync::Mutex; use url::Url; +/// The amount of time to wait for a new block before restarting the view service. +const RESTART_TIME_SECS: u64 = 20; + +/// The actions the watcher can ask us to do. +#[derive(Debug, Clone, Copy)] +enum WatcherAdvice { + DoNothing, + RestartTheViewService, +} + +/// A stateful adviser on restarting view services. 
+#[derive(Debug, Clone, Copy)]
+struct Watcher {
+    sync_height: u64,
+    sync_time: Option<Instant>,
+}
+
+impl Watcher {
+    pub fn new() -> Self {
+        Self {
+            sync_height: 0,
+            sync_time: None,
+        }
+    }
+
+    pub fn what_should_i_do(&mut self, sync_height: Option<u64>, now: Instant) -> WatcherAdvice {
+        let sync_height = sync_height.unwrap_or(0u64);
+        if sync_height > self.sync_height {
+            self.sync_height = sync_height;
+            self.sync_time = Some(now);
+            return WatcherAdvice::DoNothing;
+        }
+        match self.sync_time {
+            Some(then) if now.duration_since(then) >= Duration::from_secs(RESTART_TIME_SECS) => {
+                WatcherAdvice::RestartTheViewService
+            }
+            _ => WatcherAdvice::DoNothing,
+        }
+    }
+}
+
 /// Knows things about a running penumbra system, requires internet connectivity
 #[derive(Clone)]
 pub struct PenumbraKnower {
@@ -13,9 +60,13 @@ pub struct PenumbraKnower {
     // we get the specific information we need, as this will get populated
     // by the view service.
     storage: Storage,
+    // The node the view service will use.
+    node: Url,
     // Not sure if storing this is necessary, but seems like a good idea to avoid things getting
     // dropped on the floor
-    _view: ViewService,
+    view: Arc<Mutex<ViewService>>,
+    // Some state for calculating whether to restart the view service.
+    watcher: Arc<Mutex<Watcher>>,
 }
 
 impl PenumbraKnower {
@@ -28,14 +79,37 @@ impl PenumbraKnower {
         node: Url,
     ) -> Result<Self> {
         let storage = Storage::load_or_initialize(Some(storage_path), fvk, node.clone()).await?;
-        let view = ViewService::new(storage.clone(), node).await?;
+        let view = ViewService::new(storage.clone(), node.clone()).await?;
         Ok(Self {
             storage,
-            _view: view,
+            node,
+            view: Arc::new(Mutex::new(view)),
+            watcher: Arc::new(Mutex::new(Watcher::new())),
         })
     }
 
+    async fn restart_view_service_if_necesary(&self) -> Result<()> {
+        let sync_height = self.storage.last_sync_height().await?;
+        match self
+            .watcher
+            .lock()
+            .await
+            .what_should_i_do(sync_height, Instant::now())
+        {
+            WatcherAdvice::DoNothing => {}
+            WatcherAdvice::RestartTheViewService => {
+                tracing::info!("restarting the view service");
+                let new_view = ViewService::new(self.storage.clone(), self.node.clone()).await?;
+                let mut view = self.view.lock().await;
+                std::mem::replace(&mut *view, new_view).abort().await;
+            }
+        }
+        Ok(())
+    }
+
     pub async fn total_amount_sent_to_me(&self, by: &Address) -> Result<Amount> {
+        self.restart_view_service_if_necesary().await?;
+
         let notes = self.storage.notes_by_sender(by).await?;
         let what_i_want = STAKING_TOKEN_ASSET_ID.to_owned();
         let mut total = Amount::zero();