From 1698c8d42f52aad15d664a4f099c83433cc9617b Mon Sep 17 00:00:00 2001 From: Kostas Kolivas Date: Wed, 20 Sep 2023 16:40:36 +0300 Subject: [PATCH] Spawn feeder threads on process startup Ref #6 --- src/main.rs | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/src/main.rs b/src/main.rs index 1d93429..80f9c5a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -59,6 +59,7 @@ mod processing; mod database; mod entities; mod logger; +mod feeder; use std::process; @@ -70,7 +71,7 @@ fn main() { let cli: Cli = Cli::parse_args(); if let Err(e) = logger::init() { - error!("Could not initialize logger: {}", e); + error!("Could not initialize logging: {}", e); process::exit(1); } @@ -82,12 +83,9 @@ fn main() { } }; - let connection = match DbConnection::connect( - cfg.db().user(), - cfg.db().passwd(), - cfg.db().db_name(), - cfg.db().host(), - cfg.db().port()) { + + let connection = match DbConnection::connect(cfg.db().user(), cfg.db().passwd(), + cfg.db().db_name(), cfg.db().host(), cfg.db().port()) { Ok(c) => c, Err(e) => { error!("Could not connect to database: {}", e); @@ -105,6 +103,13 @@ fn main() { let (feed_sendr, feed_recvr) = crossbeam_channel::unbounded(); let (load_sendr, load_recvr) = crossbeam_channel::unbounded(); + let f_handles = feeder::start_feeders( + &feed_sendr, + cfg.redis().host(), + cfg.redis().port(), + cfg.workers().num_feeders() + ); + let p_handles = processing::start_processors( &feed_recvr, &load_sendr, @@ -114,6 +119,11 @@ fn main() { let l_handles = database::start_loaders(&load_recvr, db_loader, cfg.workers().num_loaders()); + // Feeders are the first threads to finish in the event of a graceful shutdown + for handle in f_handles { + handle.join().unwrap(); + } + // Dropping the sender will gracefully close the receiver's end as well // and as such make all processor threads return drop(feed_sendr); @@ -124,18 +134,16 @@ fn main() { // to the loader through the load channel for handle in p_handles { if let Ok(res) = handle.join() { - match res { - Ok(s) => println!("{}", s), - Err(e) => println!("Error in processor: {}", e) + if let Err(e) = res { + error!("Error in processor: {}", e) } } - println!("Joined processor"); } drop(load_sendr); for handle in l_handles { + // We don't really care how loader threads exited handle.join().unwrap(); - println!("Joined loader"); } }