Skip to content

Commit

Permalink
Merge pull request fedimint#6475 from tvolk131/gateway_owns_task_group
Browse files Browse the repository at this point in the history
refactor: gateway owns its task group
  • Loading branch information
elsirion authored Nov 29, 2024
2 parents a9a2146 + 79ffc2b commit 56462da
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 16 deletions.
5 changes: 1 addition & 4 deletions gateway/ln-gateway/src/bin/gatewayd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
use std::sync::Arc;

use fedimint_core::fedimint_build_code_version_env;
use fedimint_core::task::TaskGroup;
use fedimint_core::util::handle_version_hash_command;
use fedimint_logging::TracingSetup;
#[cfg(not(target_env = "msvc"))]
Expand All @@ -29,10 +28,8 @@ fn main() -> Result<(), anyhow::Error> {
runtime.block_on(async {
handle_version_hash_command(fedimint_build_code_version_env!());
TracingSetup::default().init()?;
let tg = TaskGroup::new();
tg.install_kill_handler();
let gatewayd = Gateway::new_with_default_modules().await?;
let shutdown_receiver = gatewayd.clone().run(tg, runtime.clone()).await?;
let shutdown_receiver = gatewayd.clone().run(runtime.clone()).await?;
shutdown_receiver.await;
gatewayd.unannounce_from_all_federations().await;
info!("Gatewayd exiting...");
Expand Down
28 changes: 17 additions & 11 deletions gateway/ln-gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ pub struct Gateway {

/// The "module mode" of the gateway. Options are LNv1, LNv2, or All.
lightning_module_mode: LightningModuleMode,

/// The task group for all tasks related to the gateway.
task_group: TaskGroup,
}

impl std::fmt::Debug for Gateway {
Expand Down Expand Up @@ -365,6 +368,9 @@ impl Gateway {
let gateway_config =
Self::get_gateway_configuration(gateway_db.clone(), &gateway_parameters).await;

let task_group = TaskGroup::new();
task_group.install_kill_handler();

Ok(Self {
federation_manager: Arc::new(RwLock::new(FederationManager::new())),
lightning_builder,
Expand All @@ -376,6 +382,7 @@ impl Gateway {
versioned_api: gateway_parameters.versioned_api,
listen: gateway_parameters.listen,
lightning_module_mode: gateway_parameters.lightning_module_mode,
task_group,
})
}

Expand Down Expand Up @@ -418,27 +425,26 @@ impl Gateway {
/// to service requests.
pub async fn run(
self,
tg: TaskGroup,
runtime: Arc<tokio::runtime::Runtime>,
) -> anyhow::Result<TaskShutdownToken> {
self.register_clients_timer(&tg);
self.register_clients_timer();
self.load_clients().await?;
self.start_gateway(&tg, runtime);
self.start_gateway(runtime);
// start webserver last to avoid handling requests before fully initialized
let handle = tg.make_handle();
run_webserver(Arc::new(self), tg).await?;
let handle = self.task_group.make_handle();
run_webserver(Arc::new(self)).await?;
let shutdown_receiver = handle.make_shutdown_rx();
Ok(shutdown_receiver)
}

/// Begins the task for listening for intercepted payments from the
/// lightning node.
fn start_gateway(&self, task_group: &TaskGroup, runtime: Arc<tokio::runtime::Runtime>) {
fn start_gateway(&self, runtime: Arc<tokio::runtime::Runtime>) {
const PAYMENT_STREAM_RETRY_SECONDS: u64 = 5;

let self_copy = self.clone();
let tg = task_group.clone();
task_group.spawn(
let tg = self.task_group.clone();
self.task_group.spawn(
"Subscribe to intercepted lightning payments in stream",
|handle| async move {
// Repeatedly attempt to establish a connection to the lightning node and create a payment stream, re-trying if the connection is broken.
Expand Down Expand Up @@ -1860,14 +1866,14 @@ impl Gateway {
/// connected federations every 8.5 mins. Only registers the Gateway if it
/// has successfully connected to the Lightning node, so that it can
/// include route hints in the registration.
fn register_clients_timer(&self, task_group: &TaskGroup) {
fn register_clients_timer(&self) {
// Only spawn background registration thread if gateway is running LNv1
if self.is_running_lnv1() {
let lightning_module_mode = self.lightning_module_mode;
info!(?lightning_module_mode, "Spawning register task...");
let gateway = self.clone();
let register_task_group = task_group.make_subgroup();
task_group.spawn_cancellable("register clients", async move {
let register_task_group = self.task_group.make_subgroup();
self.task_group.spawn_cancellable("register clients", async move {
loop {
let gateway_config = gateway.clone_gateway_config().await;
if let Some(gateway_config) = gateway_config {
Expand Down
3 changes: 2 additions & 1 deletion gateway/ln-gateway/src/rpc/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ use crate::rpc::ConfigPayload;
use crate::Gateway;

/// Creates the webserver's routes and spawns the webserver in a separate task.
pub async fn run_webserver(gateway: Arc<Gateway>, task_group: TaskGroup) -> anyhow::Result<()> {
pub async fn run_webserver(gateway: Arc<Gateway>) -> anyhow::Result<()> {
let task_group = gateway.task_group.clone();
let v1_routes = v1_routes(gateway.clone(), task_group.clone());
let api_v1 = Router::new()
.nest(&format!("/{V1_API_ENDPOINT}"), v1_routes.clone())
Expand Down

0 comments on commit 56462da

Please sign in to comment.