Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: add Ctrl-C interrupt #253

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add ctrl-c for multisearch
bluegenes committed Feb 28, 2024
commit 74a1b109e40411059b3cc2d557b0dff86af8b3a0
35 changes: 24 additions & 11 deletions src/multisearch.rs
Original file line number Diff line number Diff line change
@@ -4,10 +4,12 @@
use sourmash::selection::Selection;
use sourmash::signature::SigsTrait;
use std::sync::atomic;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::Mutex;

use crate::utils::{
csvwriter_thread, load_collection, load_sketches, MultiSearchResult, ReportType,
csvwriter_thread, load_collection, load_sketches, MultiSearchResult, ReportType, ThreadManager,
};
use sourmash::ani_utils::ani_from_containment;

@@ -51,6 +53,11 @@
// // & spawn a thread that is dedicated to printing to a buffered output
let thrd = csvwriter_thread(recv, output);

// set up manager to allow for ctrl-c handling
let manager = ThreadManager::new(send, thrd);

// Wrap ThreadManager in Arc<Mutex> for safe sharing across threads
let manager_shared = Arc::new(Mutex::new(manager));
//
// Main loop: iterate (in parallel) over all search signature paths,
// loading them individually and searching them. Stuff results into
@@ -60,12 +67,21 @@
let processed_cmp = AtomicUsize::new(0);
let ksize = selection.ksize().unwrap() as f64;

let send = against
against
.par_iter()
.filter_map(|against| {
// let manager_clone = manager_shared.clone();
let mut results = vec![];
// search for matches & save containment.
for query in queries.iter() {
if manager_shared
.lock()
.unwrap()
.interrupted
.load(Ordering::SeqCst)
{
return None; // Early exit if interrupted
}
let i = processed_cmp.fetch_add(1, atomic::Ordering::SeqCst);
if i % 100000 == 0 {
eprintln!("Processed {} comparisons", i);
@@ -121,16 +137,13 @@
}
})
.flatten()
.try_for_each_with(send, |s, m| s.send(m));
// .try_for_each_with(send, |s, m| s.send(m));
.try_for_each_with(qmanager_shared.clone(), |manager, result| {

Check failure on line 141 in src/multisearch.rs

GitHub Actions / mamba

cannot find value `qmanager_shared` in this scope
manager.lock().unwrap().send(result)
})?;

// do some cleanup and error handling -
if let Err(e) = send {
eprintln!("Unable to send internal data: {:?}", e);
}

if let Err(e) = thrd.join() {
eprintln!("Unable to join internal thread: {:?}", e);
}
manager_shared.lock().unwrap().perform_cleanup();

// done!
let i: usize = processed_cmp.fetch_max(0, atomic::Ordering::SeqCst);
64 changes: 36 additions & 28 deletions src/pairwise.rs
Original file line number Diff line number Diff line change
@@ -100,20 +100,24 @@ pub fn pairwise(
average_containment_ani = Some((qani + mani) / 2.);
max_containment_ani = Some(f64::max(qani, mani));
}
manager_clone.lock().unwrap().send(MultiSearchResult {
query_name: query.name.clone(),
query_md5: query.md5sum.clone(),
match_name: against.name.clone(),
match_md5: against.md5sum.clone(),
containment: containment_q1_in_q2,
max_containment,
jaccard,
intersect_hashes: overlap,
query_containment_ani,
match_containment_ani,
average_containment_ani,
max_containment_ani,
})
manager_clone
.lock()
.unwrap()
.send(MultiSearchResult {
query_name: query.name.clone(),
query_md5: query.md5sum.clone(),
match_name: against.name.clone(),
match_md5: against.md5sum.clone(),
containment: containment_q1_in_q2,
max_containment,
jaccard,
intersect_hashes: overlap,
query_containment_ani,
match_containment_ani,
average_containment_ani,
max_containment_ani,
})
.unwrap()
}

let i = processed_cmp.fetch_add(1, atomic::Ordering::SeqCst);
@@ -134,20 +138,24 @@ pub fn pairwise(
max_containment_ani = Some(1.0);
}

manager_clone.lock().unwrap().send(MultiSearchResult {
query_name: query.name.clone(),
query_md5: query.md5sum.clone(),
match_name: query.name.clone(),
match_md5: query.md5sum.clone(),
containment: 1.0,
max_containment: 1.0,
jaccard: 1.0,
intersect_hashes: query.minhash.size() as f64,
query_containment_ani,
match_containment_ani,
average_containment_ani,
max_containment_ani,
})
manager_clone
.lock()
.unwrap()
.send(MultiSearchResult {
query_name: query.name.clone(),
query_md5: query.md5sum.clone(),
match_name: query.name.clone(),
match_md5: query.md5sum.clone(),
containment: 1.0,
max_containment: 1.0,
jaccard: 1.0,
intersect_hashes: query.minhash.size() as f64,
query_containment_ani,
match_containment_ani,
average_containment_ani,
max_containment_ani,
})
.unwrap()
}
});

9 changes: 4 additions & 5 deletions src/utils.rs
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@ use std::io::{BufRead, BufReader, BufWriter, Write};
use std::panic;
use std::sync::atomic;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::mpsc::SyncSender;
use std::sync::mpsc::{SendError, SyncSender};
use std::sync::Arc;

use sourmash::collection::Collection;
@@ -54,12 +54,11 @@ impl<T: Serialize + Send + 'static> ThreadManager<T> {
}
}

pub fn send(&self, result: T) {
pub fn send(&self, result: T) -> Result<(), SendError<T>> {
if let Some(ref sender) = self.sender {
sender.send(result).expect("Failed to send data");
sender.send(result)
} else {
// Handle the case where sender is None. This could be a no-op or log an error.
eprintln!("Attempted to send data, but sender is closed.");
Err(SendError(result)) // send custom error instead?
}
}