Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement view service restarts in summonerd #3689

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions crates/view/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,16 @@ use ark_std::UniformRand;
use async_stream::try_stream;
use camino::Utf8Path;
use decaf377::Fq;
use futures::stream::{StreamExt, TryStreamExt};
use futures::{
stream::{StreamExt, TryStreamExt},
FutureExt,
};
use rand::Rng;
use rand_core::OsRng;
use tokio::sync::{watch, RwLock};
use tokio::{
sync::{watch, RwLock},
task::JoinHandle,
};
use tokio_stream::wrappers::WatchStream;
use tonic::{async_trait, transport::Channel, Request, Response, Status};
use tracing::instrument;
Expand Down Expand Up @@ -82,6 +88,8 @@ pub struct ViewService {
node: Url,
/// Used to watch for changes to the sync height.
sync_height_rx: watch::Receiver<u64>,
/// The handle to the spawned worker task, allowing us to cancel it.
worker_handle: Arc<JoinHandle<()>>,
}

impl ViewService {
Expand All @@ -107,17 +115,30 @@ impl ViewService {
let (worker, sct, error_slot, sync_height_rx) =
Worker::new(storage.clone(), node.clone()).await?;

tokio::spawn(worker.run());
let worker_handle = tokio::spawn(worker.run().map(|_| ()));

Ok(Self {
storage,
error_slot,
sync_height_rx,
state_commitment_tree: sct,
node,
worker_handle: Arc::new(worker_handle),
})
}

/// Close this view service, stopping the background syncing task.
///
/// This will have a spoooky action at a distance in that other cloned instances of this
/// service sharing the syncing task will also be silently broken, so be careful when using
/// this method.
///
/// Also this is sort of a stop-gap for a more robust view service where this method won't be
/// necessary, so one should exercise skepticism that this method even exists.
pub async fn abort(&self) {
self.worker_handle.abort();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Random observation, but if you want some stronger guarantees about the timing of the join handle cancellation, you can immediately await the worker handle and that will make this method return when those resources are actually released. It can be useful for testing.

}

async fn check_worker(&self) -> Result<(), tonic::Status> {
// If the shared error slot is set, then an error has occurred in the worker
// that we should bubble up.
Expand Down
80 changes: 77 additions & 3 deletions tools/summonerd/src/penumbra_knower.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,72 @@
use std::{
sync::Arc,
time::{Duration, Instant},
};

use anyhow::Result;
use camino::Utf8Path;
use penumbra_asset::STAKING_TOKEN_ASSET_ID;
use penumbra_keys::{Address, FullViewingKey};
use penumbra_num::Amount;
use penumbra_view::{Storage, ViewService};
use tokio::sync::Mutex;
use url::Url;

/// The amount of time to wait for a new block before restarting the view service.
const RESTART_TIME_SECS: u64 = 20;

/// The actions the watcher can ask us to do.
#[derive(Debug, Clone, Copy)]
enum WatcherAdvice {
DoNothing,
RestartTheViewService,
}

/// A stateful adviser on restarting view services.
#[derive(Debug, Clone, Copy)]
struct Watcher {
sync_height: u64,
sync_time: Option<Instant>,
}

impl Watcher {
pub fn new() -> Self {
Self {
sync_height: 0,
sync_time: None,
}
}

pub fn what_should_i_do(&mut self, sync_height: Option<u64>, now: Instant) -> WatcherAdvice {
let sync_height = sync_height.unwrap_or(0u64);
if sync_height > self.sync_height {
self.sync_height = sync_height;
self.sync_time = Some(now);
return WatcherAdvice::DoNothing;
}
match self.sync_time {
Some(then) if now.duration_since(then) >= Duration::from_secs(RESTART_TIME_SECS) => {
WatcherAdvice::RestartTheViewService
}
_ => WatcherAdvice::DoNothing,
}
}
}

/// Knows things about a running penumbra system, requires internet connectivity
#[derive(Clone)]
pub struct PenumbraKnower {
// Nota bene that this is the storage from the view service, and is how
// we get the specific information we need, as this will get populated
// by the view service.
storage: Storage,
// The node the view service will use.
node: Url,
// Not sure if storing this is necessary, but seems like a good idea to avoid things getting
// dropped on the floor
_view: ViewService,
view: Arc<Mutex<ViewService>>,
// Some state for calculating whether ro restart the view service.
watcher: Arc<Mutex<Watcher>>,
}

impl PenumbraKnower {
Expand All @@ -28,14 +79,37 @@ impl PenumbraKnower {
node: Url,
) -> Result<Self> {
let storage = Storage::load_or_initialize(Some(storage_path), fvk, node.clone()).await?;
let view = ViewService::new(storage.clone(), node).await?;
let view = ViewService::new(storage.clone(), node.clone()).await?;
Ok(Self {
storage,
_view: view,
node,
view: Arc::new(Mutex::new(view)),
watcher: Arc::new(Mutex::new(Watcher::new())),
})
}

async fn restart_view_service_if_necesary(&self) -> Result<()> {
let sync_height = self.storage.last_sync_height().await?;
match self
.watcher
.lock()
.await
.what_should_i_do(sync_height, Instant::now())
{
WatcherAdvice::DoNothing => {}
WatcherAdvice::RestartTheViewService => {
tracing::info!("restarting the view service");
let new_view = ViewService::new(self.storage.clone(), self.node.clone()).await?;
let mut view = self.view.lock().await;
std::mem::replace(&mut *view, new_view).abort().await;
}
}
Ok(())
}

pub async fn total_amount_sent_to_me(&self, by: &Address) -> Result<Amount> {
self.restart_view_service_if_necesary().await?;

let notes = self.storage.notes_by_sender(by).await?;
let what_i_want = STAKING_TOKEN_ASSET_ID.to_owned();
let mut total = Amount::zero();
Expand Down
Loading