Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MRG: Add graph-based clustering #234

Merged
merged 36 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
c57550f
compiling cmds
bluegenes Feb 22, 2024
9ebef77
working test
bluegenes Feb 22, 2024
c69a6f0
rm rust tests in favor of python tests
bluegenes Feb 22, 2024
5628845
rm comment
bluegenes Feb 22, 2024
9eb62ae
add ani to pairwise and multisearch
bluegenes Feb 23, 2024
23b35be
add ani testing for multisearch
bluegenes Feb 23, 2024
99b02a2
add ani to test csv
bluegenes Feb 23, 2024
5efa56d
Merge branch 'main' into add-cluster
bluegenes Feb 23, 2024
53dc312
upd tests; also output cluster size histogram
bluegenes Feb 23, 2024
6929f50
add tests for max ani, max contain
bluegenes Feb 24, 2024
4424f41
keep singletons!
bluegenes Feb 24, 2024
b9926d1
multithreaded read records
bluegenes Feb 24, 2024
c96c15a
upd ani
bluegenes Feb 24, 2024
e880a6b
test bad,empty input
bluegenes Feb 24, 2024
9eab7ea
rustfmt, clippy
bluegenes Feb 24, 2024
1b64a53
fix for percent ani
bluegenes Feb 24, 2024
98cbc27
make ani default
bluegenes Feb 24, 2024
ba2084d
actually, keep read sequential for now
bluegenes Feb 24, 2024
3a80062
replace ani calc with split br
bluegenes Feb 26, 2024
170b901
Merge branch 'main' into add-cluster
bluegenes Feb 26, 2024
1c8bd63
adjust+test for optional ANI; use cANI terminology
bluegenes Feb 26, 2024
49c249c
print underlying errors from graph building
bluegenes Feb 26, 2024
e497cdf
Merge branch 'main' into add-cluster
bluegenes Feb 26, 2024
c3c032e
ignore self-matches reported via multisearch
bluegenes Feb 26, 2024
d286fef
rustfmt
bluegenes Feb 26, 2024
c3874fe
upd column names
bluegenes Feb 26, 2024
96c473e
Merge branch 'main' into add-cluster
bluegenes Feb 26, 2024
15b7b4d
harmonize names with sourmash
bluegenes Feb 26, 2024
44e437a
harmonize ANI with sourmash (fraction, not percent)
bluegenes Feb 26, 2024
3fa24da
fix misc argparse things (#245)
ctb Feb 27, 2024
a5df3cd
make sizes optional; test help and defaults
bluegenes Feb 27, 2024
3b4e0e8
add multisearch to help
bluegenes Feb 27, 2024
894237b
add option to write all results
bluegenes Feb 27, 2024
2dea5ab
Merge branch 'main' into add-cluster
bluegenes Feb 27, 2024
f79a35b
fix pw --write-all for no ani
bluegenes Feb 27, 2024
af88b59
fix help for similarity, not distance
bluegenes Feb 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 180 additions & 34 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ tempfile = "3.10"
needletail = "0.5.1"
csv = "1.3.0"
camino = "1.1.6"
rustworkx-core = "0.14.0"

[dev-dependencies]
assert_cmd = "2.0.14"
Expand Down
31 changes: 30 additions & 1 deletion doc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
| `manysearch` | Multithreaded containment search for many queries in many large metagenomes | [link](#Running-manysearch)
| `multisearch` | Multithreaded comparison of multiple sketches, in memory | [link](#Running-multisearch)
| `pairwise` | Multithreaded pairwise comparison of multiple sketches, in memory | [link](#Running-multisearch)
| `cluster` | cluster sequences based on similarity data from `pairwise` or `multisearch` | [link](#Running-cluster)

This repository implements multithreaded plugins for [sourmash](https://sourmash.readthedocs.io/) that provide very fast implementations of `sketch`, `search`, and `gather`. These commands are typically hundreds to thousands of times faster, and 10-50x lower memory, than the current sourmash code. For example, a `gather` of SRR606249 with sourmash v4.8.6 against GTDB rs214 takes 40 minutes and 14 GB of RAM, while `fastgather` with 64 cores takes only 2 minutes and 2 GB of RAM.

Expand All @@ -26,7 +27,8 @@ sourmash supports a variety of different storage formats for sketches (see [sour
| `fastmultigather` | Multiple metagenomes in sig, zip, manifest CSV, or fromfile | Zip, manifest CSV, fromfile, or rocksdb index |
| `manysearch` | Multiple genomes in sig, zip, manifest CSV, or fromfile | Zip, manifest CSV, fromfile, or rocksdb index |
| `multisearch` | Multiple sketches in sig, zip, manifest CSV, or fromfile | Multiple sketches in sig, zip, manifest CSV, or fromfile |
| `pairwise` | Multiple sketches in sig, zip, manifest CSV, or fromfile | N/A
| `pairwise` | Multiple sketches in sig, zip, manifest CSV, or fromfile | N/A |
| `cluster`| Output from `pairwise` or `multisearch`| N/A |

### Using zipfiles

Expand Down Expand Up @@ -223,6 +225,31 @@ We suggest using a manifest CSV for the metagenome collection.

The results file here, `query.x.gtdb-reps.csv`, will have 8 columns: `query` and `query_md5`, `match` and `match_md5`, and `containment`, `jaccard`, `max_containment`, and `intersect_hashes`.


### Running `cluster`

The `cluster` command conducts graph-based clustering via the sequence similarity measures in `pairwise` or `multisearch` outputs. It is a new command and we are exploring its utility.

`cluster` takes the csv output of `pairwise` or `multisearch` input, and outputs two CSVs:

1. `-o`, `--output` will contain the names of the clusters and the `ident` of each sequence included in the cluster (e.g. `Component_1, name1;name2`)

```
cluster,nodes
Component_1,name1;name2;name3
Component_2,name4
```

2. `--cluster-sizes` will contain information on cluster size, with a counts for the number of clusters of that size. For the two clusters above, the counts would look like this:

```
cluster_size,count
3,1
1,1
```

`cluster` takes a `--similarity_column` argument to specify which of the similarity columns, with the following choices: `containment`, `max_containment`, `jaccard`, `average_containment_ani`, `maximum_containment_ani`. All values should be input as fractions (e.g. 0.9 for 90%)

## Notes on concurrency and efficiency

Each command does things slightly differently, with implications for CPU and disk load. You can measure threading efficiency with `/usr/bin/time -v` on Linux systems, and disk load by number of complaints received when running.
Expand All @@ -239,6 +266,8 @@ Like `multisearch` and `pairwise`, `fastgather` loads everything at the beginnin

`fastmultigather` loads the entire database once, and then loads one query from disk per thread. The compute-per-query can be significant, though, so multithreading efficiency here is less dependent on I/O and the disk is less likely to be saturated with many threads. We suggest limiting threads to between 32 and 64 to decrease shared disk load.

`cluster` loads the entire file multithreaded, and then populates the graph sequentially.

## Appendix 1 - `index` to create a low-memory index

The command `sourmash scripts index` makes an on-disk inverted index
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ index = "sourmash_plugin_branchwater:Branchwater_Index"
check = "sourmash_plugin_branchwater:Branchwater_Check"
manysketch = "sourmash_plugin_branchwater:Branchwater_Manysketch"
pairwise = "sourmash_plugin_branchwater:Branchwater_Pairwise"
cluster = "sourmash_plugin_branchwater:Branchwater_Cluster"

[project.optional-dependencies]
test = [
Expand Down
149 changes: 149 additions & 0 deletions src/cluster.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
use anyhow::{Context, Result};
use rustworkx_core::connectivity::connected_components;
use rustworkx_core::petgraph::graph::{NodeIndex, UnGraph};
use std::collections::HashMap;
use std::fs::File;
use std::io::Write;

use crate::utils::MultiSearchResult;

// potential todo:
// - eval DiGraph for directed similarity info (e.g. input containment_A, containment_B independently)
// - explore if collect-first, add edges second style parallelization is worthwhile

fn build_graph(
file_path: &str,
similarity_measure: &str,
similarity_threshold: f64,
) -> Result<(UnGraph<String, f64>, HashMap<String, NodeIndex>)> {
let mut reader = csv::Reader::from_path(file_path).context("Failed to open CSV file")?;
let mut name_to_node: HashMap<String, NodeIndex> = HashMap::new();
let mut graph = UnGraph::<String, f64>::new_undirected();

for result in reader.deserialize::<MultiSearchResult>() {
let record = result.map_err(|e| anyhow::anyhow!("Error deserializing record: {}", e))?;

// ignore self-matches reported via multisearch
if record.query_name == record.match_name {
continue;
}

let similarity = match similarity_measure {
"containment" => record.containment,
"max_containment" => record.max_containment,
"jaccard" => record.jaccard,
"average_containment_ani" => match record.average_containment_ani {
Some(value) => value,
None => {
return Err(anyhow::anyhow!(
"average_containment_ani is None. Did you estimate ANI?"
))
}
},
"max_containment_ani" => match record.max_containment_ani {
Some(value) => value,
None => {
return Err(anyhow::anyhow!(
"max_containment_ani is None. Did you estimate ANI?"
))
}
},
_ => {
return Err(anyhow::anyhow!(
"Invalid similarity measure: {}",
similarity_measure
))
} // should not happen
};

let node1 = *name_to_node
.entry(record.query_name.clone())
.or_insert_with(|| graph.add_node(record.query_name.clone()));
let node2 = *name_to_node
.entry(record.match_name.clone())
.or_insert_with(|| graph.add_node(record.match_name.clone()));

if similarity >= similarity_threshold {
graph.add_edge(node1, node2, similarity);
}
}

if graph.node_count() == 0 {
bail!("No nodes added to graph.")
}

if graph.edge_count() == 0 {
bail!("Graph has nodes but no edges were added.");
}

Ok((graph, name_to_node))
}

pub fn cluster(
pairwise_csv: String,
output_clusters: String,
similarity_column: String,
similarity_threshold: f64,
cluster_sizes: Option<String>,
) -> Result<()> {
let (graph, name_to_node) =
match build_graph(&pairwise_csv, &similarity_column, similarity_threshold) {
Ok(result) => result,
Err(e) => {
eprintln!("Error: {:?}", e); // print the underlying error.
bail!("Failed to build graph.");
}
};
let components = connected_components(&graph);

// HashMap to count cluster sizes
let mut size_counts: HashMap<usize, usize> = HashMap::new();

// Open file for components + names
let mut file = File::create(output_clusters).context("Failed to create output file")?;

// write header
writeln!(file, "cluster,nodes").context("Failed to write header to output file")?;
// for each component, find corresponding node names + write to file
for (i, component) in components.iter().enumerate() {
let component_name = format!("Component_{}", i + 1);
let node_names: Vec<String> = component
.iter()
.filter_map(|node_id| {
name_to_node.iter().find_map(|(name, &id)| {
if id == *node_id {
let ident: &str = name.split(' ').next().unwrap();
Some(ident.to_owned())
} else {
None
}
})
})
.collect();

let node_names_str = node_names.join(";");

writeln!(file, "{},{}", component_name, node_names_str).context(format!(
"Failed to write component {} to output file",
i + 1
))?;

// add cluster to aggregated counts
let count = size_counts.entry(component.len()).or_insert(0);
*count += 1;
}

// write the sizes and counts
if let Some(sizes_file) = cluster_sizes {
let mut cluster_size_file =
File::create(sizes_file).context("Failed to create cluster size file")?;
writeln!(cluster_size_file, "cluster_size,count")
.context("Failed to write header to cluster size file")?;
for (size, count) in size_counts {
writeln!(cluster_size_file, "{},{}", size, count)
.context("Failed to write size count to cluster size file")?;
}
}

Ok(())
}
27 changes: 27 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod utils;
use crate::utils::build_selection;
use crate::utils::is_revindex_database;
mod check;
mod cluster;
mod fastgather;
mod fastmultigather;
mod index;
Expand Down Expand Up @@ -238,6 +239,7 @@ fn do_pairwise(
scaled: usize,
moltype: String,
estimate_ani: bool,
write_all: bool,
output_path: Option<String>,
) -> anyhow::Result<u8> {
let selection = build_selection(ksize, scaled, &moltype);
Expand All @@ -248,6 +250,7 @@ fn do_pairwise(
&selection,
allow_failed_sigpaths,
estimate_ani,
write_all,
output_path,
) {
Ok(_) => Ok(0),
Expand All @@ -274,6 +277,29 @@ fn do_manysketch(
}
}

#[pyfunction]
fn do_cluster(
pairwise_csv: String,
output_clusters: String,
similarity_column: String,
similarity_threshold: f64,
cluster_sizes: Option<String>,
) -> anyhow::Result<u8> {
match cluster::cluster(
pairwise_csv,
output_clusters,
similarity_column,
similarity_threshold,
cluster_sizes,
) {
Ok(_) => Ok(0),
Err(e) => {
eprintln!("Error: {e}");
Ok(1)
}
}
}

#[pymodule]
fn sourmash_plugin_branchwater(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(do_manysearch, m)?)?;
Expand All @@ -285,5 +311,6 @@ fn sourmash_plugin_branchwater(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(set_global_thread_pool, m)?)?;
m.add_function(wrap_pyfunction!(do_multisearch, m)?)?;
m.add_function(wrap_pyfunction!(do_pairwise, m)?)?;
m.add_function(wrap_pyfunction!(do_cluster, m)?)?;
Ok(())
}
16 changes: 8 additions & 8 deletions src/multisearch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,17 +83,17 @@ pub fn multisearch(
let max_containment =
containment_query_in_target.max(containment_target_in_query);
let jaccard = overlap / (target_size + query_size - overlap);
let mut query_ani = None;
let mut match_ani = None;
let mut query_containment_ani = None;
let mut match_containment_ani = None;
let mut average_containment_ani = None;
let mut max_containment_ani = None;

// estimate ANI values
if estimate_ani {
let qani = ani_from_containment(containment_query_in_target, ksize) * 100.0;
let mani = ani_from_containment(containment_target_in_query, ksize) * 100.0;
query_ani = Some(qani);
match_ani = Some(mani);
let qani = ani_from_containment(containment_query_in_target, ksize);
let mani = ani_from_containment(containment_target_in_query, ksize);
query_containment_ani = Some(qani);
match_containment_ani = Some(mani);
average_containment_ani = Some((qani + mani) / 2.);
max_containment_ani = Some(f64::max(qani, mani));
}
Expand All @@ -107,8 +107,8 @@ pub fn multisearch(
max_containment,
jaccard,
intersect_hashes: overlap,
query_ani,
match_ani,
query_containment_ani,
match_containment_ani,
average_containment_ani,
max_containment_ani,
})
Expand Down
36 changes: 28 additions & 8 deletions src/pairwise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub fn pairwise(
selection: &Selection,
allow_failed_sigpaths: bool,
estimate_ani: bool,
write_all: bool,
output: Option<String>,
) -> Result<(), Box<dyn std::error::Error>> {
// Load all sigs into memory at once.
Expand Down Expand Up @@ -54,6 +55,7 @@ pub fn pairwise(
let ksize = selection.ksize().unwrap() as f64;

sketches.par_iter().enumerate().for_each(|(idx, query)| {
let mut has_written_comparison = false;
for against in sketches.iter().skip(idx + 1) {
let overlap = query.minhash.count_common(&against.minhash, false).unwrap() as f64;
let query1_size = query.minhash.size() as f64;
Expand All @@ -63,19 +65,20 @@ pub fn pairwise(
let containment_q2_in_q1 = overlap / query2_size;

if containment_q1_in_q2 > threshold || containment_q2_in_q1 > threshold {
has_written_comparison = true;
let max_containment = containment_q1_in_q2.max(containment_q2_in_q1);
let jaccard = overlap / (query1_size + query2_size - overlap);
let mut query_ani = None;
let mut match_ani = None;
let mut query_containment_ani = None;
let mut match_containment_ani = None;
let mut average_containment_ani = None;
let mut max_containment_ani = None;

// estimate ANI values
if estimate_ani {
let qani = ani_from_containment(containment_q1_in_q2, ksize) * 100.0;
let mani = ani_from_containment(containment_q2_in_q1, ksize) * 100.0;
query_ani = Some(qani);
match_ani = Some(mani);
let qani = ani_from_containment(containment_q1_in_q2, ksize);
let mani = ani_from_containment(containment_q2_in_q1, ksize);
query_containment_ani = Some(qani);
match_containment_ani = Some(mani);
average_containment_ani = Some((qani + mani) / 2.);
max_containment_ani = Some(f64::max(qani, mani));
}
Expand All @@ -88,8 +91,8 @@ pub fn pairwise(
max_containment,
jaccard,
intersect_hashes: overlap,
query_ani,
match_ani,
query_containment_ani,
match_containment_ani,
average_containment_ani,
max_containment_ani,
})
Expand All @@ -101,6 +104,23 @@ pub fn pairwise(
eprintln!("Processed {} comparisons", i);
}
}
if write_all & !has_written_comparison {
send.send(MultiSearchResult {
query_name: query.name.clone(),
query_md5: query.md5sum.clone(),
match_name: query.name.clone(),
match_md5: query.md5sum.clone(),
containment: 1.0,
max_containment: 1.0,
jaccard: 1.0,
intersect_hashes: query.minhash.size() as f64,
query_containment_ani: Some(1.0),
match_containment_ani: Some(1.0),
average_containment_ani: Some(1.0),
max_containment_ani: Some(1.0),
})
.unwrap();
}
});

// do some cleanup and error handling -
Expand Down
Loading
Loading