Skip to content

Commit

Permalink
Spawn feeder threads on process startup
Browse files Browse the repository at this point in the history
Ref #6
  • Loading branch information
kostaskol committed Sep 23, 2023
1 parent 234027e commit 1698c8d
Showing 1 changed file with 20 additions and 12 deletions.
32 changes: 20 additions & 12 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ mod processing;
mod database;
mod entities;
mod logger;
mod feeder;

use std::process;

Expand All @@ -70,7 +71,7 @@ fn main() {
let cli: Cli = Cli::parse_args();

if let Err(e) = logger::init() {
error!("Could not initialize logger: {}", e);
error!("Could not initialize logging: {}", e);
process::exit(1);
}

Expand All @@ -82,12 +83,9 @@ fn main() {
}
};

let connection = match DbConnection::connect(
cfg.db().user(),
cfg.db().passwd(),
cfg.db().db_name(),
cfg.db().host(),
cfg.db().port()) {

let connection = match DbConnection::connect(cfg.db().user(), cfg.db().passwd(),
cfg.db().db_name(), cfg.db().host(), cfg.db().port()) {
Ok(c) => c,
Err(e) => {
error!("Could not connect to database: {}", e);
Expand All @@ -105,6 +103,13 @@ fn main() {
let (feed_sendr, feed_recvr) = crossbeam_channel::unbounded();
let (load_sendr, load_recvr) = crossbeam_channel::unbounded();

let f_handles = feeder::start_feeders(
&feed_sendr,
cfg.redis().host(),
cfg.redis().port(),
cfg.workers().num_feeders()
);

let p_handles = processing::start_processors(
&feed_recvr,
&load_sendr,
Expand All @@ -114,6 +119,11 @@ fn main() {

let l_handles = database::start_loaders(&load_recvr, db_loader, cfg.workers().num_loaders());

// Feeders are the first threads to finish in the event of a graceful shutdown
for handle in f_handles {
handle.join().unwrap();
}

// Dropping the sender will gracefully close the receiver's end as well
// and as such make all processor threads return
drop(feed_sendr);
Expand All @@ -124,18 +134,16 @@ fn main() {
// to the loader through the load channel
for handle in p_handles {
if let Ok(res) = handle.join() {
match res {
Ok(s) => println!("{}", s),
Err(e) => println!("Error in processor: {}", e)
if let Err(e) = res {
error!("Error in processor: {}", e)
}
}
println!("Joined processor");
}

drop(load_sendr);

for handle in l_handles {
// We don't really care how loader threads exited
handle.join().unwrap();
println!("Joined loader");
}
}

0 comments on commit 1698c8d

Please sign in to comment.