From f9c13d47494a4e86e8be5c2633595dc571503fab Mon Sep 17 00:00:00 2001 From: "J. Sebastian Paez" Date: Fri, 20 Sep 2024 12:56:20 -0700 Subject: [PATCH] feat(aggregators)!: Added and refactored --- benches/benchmark_indices.rs | 11 +- src/main.rs | 275 ++++++----- src/models/aggregators/mod.rs | 4 + src/models/aggregators/raw_peak_agg.rs | 22 - .../raw_peak_agg/chromatogram_agg.rs | 161 +++++++ src/models/aggregators/raw_peak_agg/mod.rs | 7 + .../aggregators/raw_peak_agg/point_agg.rs | 89 ++++ .../aggregators/streaming_aggregator.rs | 181 +++++++ src/models/frames/mod.rs | 2 +- src/models/frames/raw_frames.rs | 34 +- src/models/frames/raw_peak.rs | 5 +- src/models/indices/raw_file_index.rs | 7 +- src/models/indices/transposed_quad_index.rs | 452 ++++++++++-------- src/models/queries.rs | 2 + .../queriable_tims_data.rs | 16 + src/traits/aggregator.rs | 2 +- src/traits/tolerance.rs | 16 +- 17 files changed, 935 insertions(+), 351 deletions(-) delete mode 100644 src/models/aggregators/raw_peak_agg.rs create mode 100644 src/models/aggregators/raw_peak_agg/chromatogram_agg.rs create mode 100644 src/models/aggregators/raw_peak_agg/mod.rs create mode 100644 src/models/aggregators/raw_peak_agg/point_agg.rs create mode 100644 src/models/aggregators/streaming_aggregator.rs diff --git a/benches/benchmark_indices.rs b/benches/benchmark_indices.rs index 66465aa..67c2169 100644 --- a/benches/benchmark_indices.rs +++ b/benches/benchmark_indices.rs @@ -54,6 +54,7 @@ fn build_elution_groups(raw_file_path: String) -> Vec { let mut rng = ChaCha8Rng::seed_from_u64(43u64); for i in 1..NUM_ELUTION_GROUPS { + // Rand f32/64 are number from 0-1 let rt = rng.gen::() * MAX_RT; let mobility = rng.gen::() * (MAX_MOBILITY - MIN_MOBILITY) + MIN_MOBILITY; let mz = rng.gen::() * (MAX_MZ - MIN_MZ) + MIN_MZ; @@ -69,7 +70,9 @@ fn build_elution_groups(raw_file_path: String) -> Vec { fragment_charges.push(fragment_charge); } - let precursor_charge = rng.gen::() * 3 + 1; + // rand u8 is number from 0-255 ... which is not amazing for us ... + // let precursor_charge = rng.gen::() * 3 + 1; + let precursor_charge = 2; out_egs.push(ElutionGroup { id: i as u64, @@ -125,11 +128,10 @@ macro_rules! add_bench_random { ) }, |(index, query_groups, tolerance)| { - let aggregator_factory = |_id| RawPeakIntensityAggregator { intensity: 0 }; let local_lambda = |elution_group: &ElutionGroup| { query_indexed( &index, - &aggregator_factory, + &RawPeakIntensityAggregator::new, &index, &tolerance, &elution_group, @@ -157,13 +159,12 @@ macro_rules! add_bench_optim { ) }, |(index, query_groups, tolerance)| { - let aggregator_factory = |_id| RawPeakIntensityAggregator { intensity: 0 }; let foo = query_multi_group( &index, &index, &tolerance, &query_groups, - &aggregator_factory, + &RawPeakIntensityAggregator::new, ); black_box((|_foo| false)(foo)); }, diff --git a/src/main.rs b/src/main.rs index fd5c215..10c02a0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,27 +3,63 @@ use timsquery::queriable_tims_data::queriable_tims_data::query_multi_group; use timsquery::traits::tolerance::DefaultTolerance; use timsquery::Aggregator; use timsquery::{ - models::aggregators::RawPeakIntensityAggregator, models::indices::raw_file_index::RawFileIndex, + models::aggregators::{ + ChromatomobilogramStats, ExtractedIonChromatomobilogram, RawPeakIntensityAggregator, + RawPeakVectorAggregator, + }, + models::indices::raw_file_index::RawFileIndex, models::indices::transposed_quad_index::QuadSplittedTransposedIndex, }; use timsquery::traits::tolerance::{MobilityTolerance, MzToleramce, QuadTolerance, RtTolerance}; use clap::{Parser, Subcommand}; +use log::{debug, info}; use serde::{Deserialize, Serialize}; -// Read json with tolerance settings -// Read json with elution groups -// Load index -// Query index -// Serialize results +fn main() { + env_logger::init(); + let args = Args::parse(); + + match args.command { + Some(Commands::QueryIndex(args)) => main_query_index(args), + Some(Commands::WriteTemplate(args)) => main_write_template(args), + None => { + println!("No command provided"); + } + } +} + +fn main_write_template(args: WriteTemplateArgs) { + let output_path = args.output_path; + let egs = template_elution_groups(10); + let tolerance = template_tolerance_settings(); + + // Serialize both and write as files in the output path. + // Do pretty serialization. + let egs_json = serde_json::to_string_pretty(&egs).unwrap(); + let tolerance_json = serde_json::to_string_pretty(&tolerance).unwrap(); + + let put_path = std::path::Path::new(&output_path); + std::fs::create_dir_all(put_path).unwrap(); + println!("Writing to {}", put_path.display()); + let egs_json_path = put_path.join("elution_groups.json"); + let tolerance_json_path = put_path.join("tolerance_settings.json"); + std::fs::write(egs_json_path.clone(), egs_json).unwrap(); + std::fs::write(tolerance_json_path.clone(), tolerance_json).unwrap(); + println!( + "use as `timsquery query-index --pretty --output-path '.' --raw-file-path 'your_file.d' --tolerance-settings-path {:#?} --elution-groups-path {:#?}`", + tolerance_json_path, + egs_json_path, + ); +} fn template_elution_groups(num: usize) -> Vec { let mut egs = Vec::with_capacity(num); for i in 1..num { let rt = 300.0 + (i as f32 * 10.0); let mobility = 1.0 + (i as f32 * 0.01); - let mz = 1000.0 + (i as f64 * 10.0); + let mz = 500.0 + (i as f64 * 10.0); let precursor_charge = 2; let fragment_mzs = Some(vec![mz]); let fragment_charges = Some(vec![precursor_charge]); @@ -40,6 +76,39 @@ fn template_elution_groups(num: usize) -> Vec { egs } +fn main_query_index(args: QueryIndexArgs) { + let args_clone = args.clone(); + + let raw_file_path = args.raw_file_path; + let tolerance_settings_path = args.tolerance_settings_path; + let elution_groups_path = args.elution_groups_path; + let index_use = args.index; + let aggregator_use = args.aggregator; + + let tolerance_settings: DefaultTolerance = + serde_json::from_str(&std::fs::read_to_string(&tolerance_settings_path).unwrap()).unwrap(); + let elution_groups: Vec = + serde_json::from_str(&std::fs::read_to_string(&elution_groups_path).unwrap()).unwrap(); + + let index_use = match (index_use, elution_groups.len() > 10) { + (PossibleIndex::RawFileIndex, true) => PossibleIndex::RawFileIndex, + (PossibleIndex::TransposedQuadIndex, true) => PossibleIndex::TransposedQuadIndex, + (PossibleIndex::RawFileIndex, false) => PossibleIndex::RawFileIndex, + (PossibleIndex::TransposedQuadIndex, false) => PossibleIndex::TransposedQuadIndex, + (PossibleIndex::Unspecified, true) => PossibleIndex::TransposedQuadIndex, + (PossibleIndex::Unspecified, false) => PossibleIndex::RawFileIndex, + }; + + execute_query( + index_use, + aggregator_use, + raw_file_path, + tolerance_settings, + elution_groups, + args_clone, + ); +} + fn template_tolerance_settings() -> DefaultTolerance { DefaultTolerance { ms: MzToleramce::Ppm((20.0, 20.0)), @@ -57,21 +126,24 @@ struct Args { } #[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, clap::ValueEnum)] -enum PossibleAggregator { +pub enum PossibleAggregator { #[default] RawPeakIntensityAggregator, + RawPeakVectorAggregator, + ExtractedIonChromatomobilogram, + ChromatoMobilogramStat, } #[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, clap::ValueEnum)] -enum PossibleIndex { +pub enum PossibleIndex { #[default] Unspecified, RawFileIndex, TransposedQuadIndex, } -#[derive(Parser, Debug)] -struct QueryIndexArgs { +#[derive(Parser, Debug, Clone)] +pub struct QueryIndexArgs { /// The path to the raw file to query. #[arg(short, long)] raw_file_path: String, @@ -116,112 +188,99 @@ enum Commands { } #[derive(Debug, Serialize, Deserialize)] -struct ElutionGroupResults { +struct ElutionGroupResults { elution_group: ElutionGroup, - result: u64, + result: T, } -fn main() { - env_logger::init(); - let args = Args::parse(); - - match args.command { - Some(Commands::QueryIndex(args)) => main_query_index(args), - Some(Commands::WriteTemplate(args)) => main_write_template(args), - None => { - println!("No command provided"); - } - } -} - -fn main_write_template(args: WriteTemplateArgs) { +pub fn execute_query( + index: PossibleIndex, + aggregator: PossibleAggregator, + raw_file_path: String, + tolerance: DefaultTolerance, + elution_groups: Vec, + args: QueryIndexArgs, +) { let output_path = args.output_path; - let egs = template_elution_groups(10); - let tolerance = template_tolerance_settings(); + let pretty_print = args.pretty; - // Serialize both and write as files in the output path. - // Do pretty serialization. - let egs_json = serde_json::to_string_pretty(&egs).unwrap(); - let tolerance_json = serde_json::to_string_pretty(&tolerance).unwrap(); + macro_rules! execute_query_inner { + ($index:expr, $agg:expr) => { + let tmp = query_multi_group(&$index, &$index, &tolerance, &elution_groups, &$agg); + // debug!("{:?}", tmp); - let put_path = std::path::Path::new(&output_path); - std::fs::create_dir_all(put_path).unwrap(); - println!("Writing to {}", put_path.display()); - let egs_json_path = put_path.join("elution_groups.json"); - let tolerance_json_path = put_path.join("tolerance_settings.json"); - std::fs::write(egs_json_path.clone(), egs_json).unwrap(); - std::fs::write(tolerance_json_path.clone(), tolerance_json).unwrap(); - println!( - "use as `timsquery query-index --pretty --output-path '.' --raw-file-path 'your_file.d' --tolerance-settings-path {:#?} --elution-groups-path {:#?}`", - tolerance_json_path, - egs_json_path, - ); -} + let start = std::time::Instant::now(); + let mut out = Vec::with_capacity(tmp.len()); + for (res, eg) in tmp.into_iter().zip(elution_groups) { + out.push(ElutionGroupResults { + elution_group: eg, + result: res.finalize(), + }); + } + let elapsed = start.elapsed(); + println!("Finalizing query took {:#?}", elapsed); + // info!("{:#?}", out); -fn main_query_index(args: QueryIndexArgs) { - let raw_file_path = args.raw_file_path; - let tolerance_settings_path = args.tolerance_settings_path; - let elution_groups_path = args.elution_groups_path; - let output_path = args.output_path; - let index_use = args.index; + let put_path = std::path::Path::new(&output_path).join("results.json"); + std::fs::create_dir_all(put_path.parent().unwrap()).unwrap(); + println!("Writing to {}", put_path.display()); - let tolerance_settings: DefaultTolerance = - serde_json::from_str(&std::fs::read_to_string(&tolerance_settings_path).unwrap()).unwrap(); - let elution_groups: Vec = - serde_json::from_str(&std::fs::read_to_string(&elution_groups_path).unwrap()).unwrap(); - - let aggregator_factory = |_id| RawPeakIntensityAggregator { intensity: 0 }; - let foo = if (elution_groups.len() > 10) || index_use == PossibleIndex::TransposedQuadIndex { - let index = QuadSplittedTransposedIndex::from_path(&(raw_file_path.clone())).unwrap(); - query_multi_group( - &index, - &index, - &tolerance_settings, - &elution_groups, - &aggregator_factory, - ) - } else { - let index = RawFileIndex::from_path(&(raw_file_path.clone())).unwrap(); - query_multi_group( - &index, - &index, - &tolerance_settings, - &elution_groups, - &aggregator_factory, - ) - }; - - let mut out = Vec::new(); - for (res, eg) in foo.into_iter().zip(elution_groups) { - out.push(ElutionGroupResults { - elution_group: eg, - result: res.finalize(), - }); + let serialized = if pretty_print { + println!("Pretty printing enabled"); + serde_json::to_string_pretty(&out).unwrap() + } else { + serde_json::to_string(&out).unwrap() + }; + std::fs::write(put_path, serialized).unwrap(); + }; } - let put_path = std::path::Path::new(&output_path).join("results.json"); - std::fs::create_dir_all(put_path.parent().unwrap()).unwrap(); - println!("Writing to {}", put_path.display()); + match (index, aggregator) { + (PossibleIndex::TransposedQuadIndex, aggregator) => { + let index = QuadSplittedTransposedIndex::from_path(&(raw_file_path.clone())).unwrap(); + match aggregator { + PossibleAggregator::RawPeakIntensityAggregator => { + let aggregator = RawPeakIntensityAggregator::new; + execute_query_inner!(index, aggregator); + } + PossibleAggregator::RawPeakVectorAggregator => { + let aggregator = RawPeakVectorAggregator::new; + execute_query_inner!(index, aggregator); + } + PossibleAggregator::ChromatoMobilogramStat => { + let aggregator = ChromatomobilogramStats::new; + execute_query_inner!(index, aggregator); + } + PossibleAggregator::ExtractedIonChromatomobilogram => { + let aggregator = ExtractedIonChromatomobilogram::new; + execute_query_inner!(index, aggregator); + } + } + } + (PossibleIndex::RawFileIndex, aggregator) => { + let index = RawFileIndex::from_path(&(raw_file_path.clone())).unwrap(); + match aggregator { + PossibleAggregator::RawPeakIntensityAggregator => { + let aggregator = RawPeakIntensityAggregator::new; + execute_query_inner!(index, aggregator); + } + PossibleAggregator::RawPeakVectorAggregator => { + let aggregator = RawPeakVectorAggregator::new; + execute_query_inner!(index, aggregator); + } + PossibleAggregator::ChromatoMobilogramStat => { + let aggregator = ChromatomobilogramStats::new; + execute_query_inner!(index, aggregator); + } + PossibleAggregator::ExtractedIonChromatomobilogram => { + let aggregator = ExtractedIonChromatomobilogram::new; + execute_query_inner!(index, aggregator); + } + } + } - let serialized = if args.pretty { - println!("Pretty printing enabled"); - serde_json::to_string_pretty(&out).unwrap() - } else { - serde_json::to_string(&out).unwrap() - }; - std::fs::write(put_path, serialized).unwrap(); + (PossibleIndex::Unspecified, _) => { + panic!("Should have been handled"); + } + } } - -// fn main() { -// println!("Hello, world!"); -// let qst_file_index = QuadSplittedTransposedIndex::from_path(&(raw_file_path.clone())).unwrap(); -// let tolerance = DefaultTolerance::default(); -// let aggregator_factory = |id| RawPeakIntensityAggregator { intensity: 0 }; -// let foo = query_multi_group( -// &qst_file_index, -// &qst_file_index, -// &tolerance, -// &query_groups, -// &aggregator_factory, -// ); -// } diff --git a/src/models/aggregators/mod.rs b/src/models/aggregators/mod.rs index 198f78e..9143fd7 100644 --- a/src/models/aggregators/mod.rs +++ b/src/models/aggregators/mod.rs @@ -1,3 +1,7 @@ pub mod raw_peak_agg; +pub mod streaming_aggregator; +pub use raw_peak_agg::ChromatomobilogramStats; +pub use raw_peak_agg::ExtractedIonChromatomobilogram; pub use raw_peak_agg::RawPeakIntensityAggregator; +pub use raw_peak_agg::RawPeakVectorAggregator; diff --git a/src/models/aggregators/raw_peak_agg.rs b/src/models/aggregators/raw_peak_agg.rs deleted file mode 100644 index f4f4a9e..0000000 --- a/src/models/aggregators/raw_peak_agg.rs +++ /dev/null @@ -1,22 +0,0 @@ -use crate::models::frames::raw_peak::RawPeak; -use crate::traits::aggregator::Aggregator; - -pub struct RawPeakIntensityAggregator { - pub intensity: u64, -} - -impl Aggregator for RawPeakIntensityAggregator { - type Output = u64; - - fn add(&mut self, peak: &RawPeak) { - self.intensity += peak.intensity as u64; - } - - fn fold(&mut self, other: Self) { - self.intensity += other.intensity; - } - - fn finalize(&self) -> u64 { - self.intensity - } -} diff --git a/src/models/aggregators/raw_peak_agg/chromatogram_agg.rs b/src/models/aggregators/raw_peak_agg/chromatogram_agg.rs new file mode 100644 index 0000000..b6df18d --- /dev/null +++ b/src/models/aggregators/raw_peak_agg/chromatogram_agg.rs @@ -0,0 +1,161 @@ +use serde::Serialize; +use std::collections::BTreeMap; + +use super::super::streaming_aggregator::{RunningStatsCalculator, StreamingAggregatorError}; +use crate::models::frames::raw_peak::RawPeak; +use crate::traits::aggregator::Aggregator; + +#[derive(Debug, Clone, Serialize)] +pub struct ExtractedIonChromatomobilogram { + pub rt_tree: BTreeMap, + pub scan_tree: BTreeMap, + pub tof_tree: BTreeMap, + pub id: u64, +} + +impl ExtractedIonChromatomobilogram { + pub fn new(id: u64) -> Self { + Self { + rt_tree: BTreeMap::new(), + scan_tree: BTreeMap::new(), + tof_tree: BTreeMap::new(), + id, + } + } +} + +impl Aggregator for ExtractedIonChromatomobilogram { + type Output = ChromatomobilogramVectorArrayTuples; + + fn add(&mut self, peak: &RawPeak) { + let u64_intensity = peak.intensity as u64; + + // In theory I could use a power of 2 to have a better preservation of + // the precision. + // TODO make this macro ... right now it feels very repetitive. + let rt_miliseconds = (peak.retention_time * 1000.0) as u32; + + self.rt_tree + .entry(rt_miliseconds) + .and_modify(|curr| *curr += u64_intensity) + .or_insert(u64_intensity); + + self.scan_tree + .entry(peak.scan_index) + .and_modify(|curr| *curr += u64_intensity) + .or_insert(u64_intensity); + + self.tof_tree + .entry(peak.tof_index) + .and_modify(|curr| *curr += u64_intensity) + .or_insert(u64_intensity); + } + + fn fold(&mut self, other: Self) { + panic!("Not Implemented;") + } + + fn finalize(self) -> ChromatomobilogramVectorArrayTuples { + ChromatomobilogramVectorArrayTuples { + scan_indices: self.scan_tree.into_iter().collect(), + tof_indices: self.tof_tree.into_iter().collect(), + retention_times: self + .rt_tree + .into_iter() + .map(|(k, v)| ((k as f32) / 1000.0, v)) + .collect(), + } + } +} + +#[derive(Debug, Clone)] +pub struct ChromatomobilogramStats { + // TODO OPTIMIZE THIS ... as needed. + // In theory we can optimize this to make a single aggregator struct + // that shares the weight (intensity), since all will have the same weight + // and retention times. + pub scan_tree: BTreeMap, + pub tof_tree: BTreeMap, + pub id: u64, +} + +impl ChromatomobilogramStats { + pub fn new(id: u64) -> Self { + Self { + scan_tree: BTreeMap::new(), + tof_tree: BTreeMap::new(), + id, + } + } +} + +#[derive(Debug, Clone, Serialize)] +pub struct ChromatomobilogramStatsArrays { + pub retention_time_miliseconds: Vec, + pub tof_index_means: Vec, + pub tof_index_sds: Vec, + pub scan_index_means: Vec, + pub scan_index_sds: Vec, +} + +impl Aggregator for ChromatomobilogramStats { + type Output = ChromatomobilogramStatsArrays; + + fn add(&mut self, peak: &RawPeak) { + let u64_intensity = peak.intensity as u64; + let rt_miliseconds = (peak.retention_time * 1000.0) as u32; + + // TODO make this macro as well ... + self.scan_tree + .entry(rt_miliseconds) + .and_modify(|curr| curr.add(peak.scan_index as f64, u64_intensity)) + .or_insert(RunningStatsCalculator::new( + u64_intensity, + peak.scan_index as f64, + )); + self.tof_tree + .entry(rt_miliseconds) + .and_modify(|curr| curr.add(peak.tof_index as f64, u64_intensity)) + .or_insert(RunningStatsCalculator::new( + u64_intensity, + peak.tof_index as f64, + )); + } + + fn fold(&mut self, _other: Self) { + panic!("Not Implemented;") + } + + fn finalize(self) -> ChromatomobilogramStatsArrays { + ChromatomobilogramStatsArrays { + retention_time_miliseconds: self.scan_tree.keys().map(|x| *x).collect::>(), + tof_index_means: self + .tof_tree + .values() + .map(|x| x.mean().unwrap()) + .collect::>(), + tof_index_sds: self + .tof_tree + .values() + .map(|x| x.standard_deviation().unwrap()) + .collect::>(), + scan_index_means: self + .scan_tree + .values() + .map(|x| x.mean().unwrap()) + .collect::>(), + scan_index_sds: self + .scan_tree + .values() + .map(|x| x.standard_deviation().unwrap()) + .collect::>(), + } + } +} + +#[derive(Debug, Clone, Serialize)] +pub struct ChromatomobilogramVectorArrayTuples { + pub scan_indices: Vec<(usize, u64)>, + pub tof_indices: Vec<(u32, u64)>, + pub retention_times: Vec<(f32, u64)>, +} diff --git a/src/models/aggregators/raw_peak_agg/mod.rs b/src/models/aggregators/raw_peak_agg/mod.rs new file mode 100644 index 0000000..41721b2 --- /dev/null +++ b/src/models/aggregators/raw_peak_agg/mod.rs @@ -0,0 +1,7 @@ +pub mod chromatogram_agg; +pub mod point_agg; + +pub use chromatogram_agg::ChromatomobilogramStats; +pub use chromatogram_agg::ExtractedIonChromatomobilogram; +pub use point_agg::RawPeakIntensityAggregator; +pub use point_agg::RawPeakVectorAggregator; diff --git a/src/models/aggregators/raw_peak_agg/point_agg.rs b/src/models/aggregators/raw_peak_agg/point_agg.rs new file mode 100644 index 0000000..67d1cf5 --- /dev/null +++ b/src/models/aggregators/raw_peak_agg/point_agg.rs @@ -0,0 +1,89 @@ +use crate::models::frames::raw_peak::RawPeak; +use crate::traits::aggregator::Aggregator; +use serde::Serialize; + +#[derive(Debug, Clone, Copy)] +pub struct RawPeakIntensityAggregator { + pub id: u64, + pub intensity: u64, +} + +impl RawPeakIntensityAggregator { + pub fn new(id: u64) -> Self { + Self { id, intensity: 0 } + } +} + +impl Aggregator for RawPeakIntensityAggregator { + type Output = u64; + + fn add(&mut self, peak: &RawPeak) { + self.intensity += peak.intensity as u64; + } + + fn fold(&mut self, other: Self) { + self.intensity += other.intensity; + } + + fn finalize(self) -> u64 { + self.intensity + } +} + +#[derive(Debug, Clone)] +pub struct RawPeakVectorAggregator { + pub id: u64, + pub peaks: RawPeakVectorArrays, +} + +impl RawPeakVectorAggregator { + pub fn new(id: u64) -> Self { + Self { + id, + peaks: RawPeakVectorArrays::new(), + } + } +} + +impl RawPeakVectorArrays { + pub fn new() -> Self { + Self { + scans: Vec::new(), + tofs: Vec::new(), + intensities: Vec::new(), + retention_times: Vec::new(), + } + } +} + +#[derive(Debug, Clone, Serialize, Default)] +pub struct RawPeakVectorArrays { + pub scans: Vec, + pub tofs: Vec, + pub intensities: Vec, + pub retention_times: Vec, +} + +impl Aggregator for RawPeakVectorAggregator { + type Output = RawPeakVectorArrays; + + fn add(&mut self, peak: &RawPeak) { + self.peaks.scans.push(peak.scan_index); + self.peaks.tofs.push(peak.tof_index); + self.peaks.intensities.push(peak.intensity); + self.peaks.retention_times.push(peak.retention_time); + } + + fn fold(&mut self, other: Self) { + self.peaks.scans.extend(other.peaks.scans); + self.peaks.tofs.extend(other.peaks.tofs); + self.peaks.intensities.extend(other.peaks.intensities); + self.peaks + .retention_times + .extend(other.peaks.retention_times); + } + + fn finalize(self) -> RawPeakVectorArrays { + self.peaks + } +} diff --git a/src/models/aggregators/streaming_aggregator.rs b/src/models/aggregators/streaming_aggregator.rs new file mode 100644 index 0000000..4bb08cd --- /dev/null +++ b/src/models/aggregators/streaming_aggregator.rs @@ -0,0 +1,181 @@ +use log::debug; + +// Generic streaming aggregator that takes a pair of unsigned ints one with a value +// and another with a weight and in a streaming fashion adds the value to the accumulator +// to calculate the total, mean and variance. + +#[derive(Debug, Clone, Copy)] +pub enum StreamingAggregatorError { + DivisionByZero, + NotEnoughData, +} + +type Result = std::result::Result; + +pub struct StreamingMeanAggregator { + pub aggregation: u128, + pub weight: u128, // Pretty sure an u64 is enough ... + pub points: u64, +} + +impl StreamingMeanAggregator { + pub fn new() -> Self { + Self { + aggregation: 0, + weight: 0, + points: 0, + } + } + + fn add(&mut self, value: u128, weight: u128) { + self.aggregation += value; + self.weight += weight; + self.points += 1; + } + + fn fold(&mut self, other: Self) { + self.aggregation += other.aggregation; + self.weight += other.weight; + } + + fn mean(&self) -> Result { + if self.weight == 0 { + return Err(StreamingAggregatorError::NotEnoughData); + } + Ok(self.aggregation as f64 / self.weight as f64) + } +} + +/// Ref impl in javascript ... +/// https://nestedsoftware.com/2018/03/27/calculating-standard-deviation-on-streaming-data-253l.23919.html +/// https://nestedsoftware.com/2019/09/26/incremental-average-and-standard-deviation-with-sliding-window-470k.176143.html +/// Read the blog ... its amazing. +/// +#[derive(Debug, Clone, Copy, Default)] +pub struct RunningStatsCalculator { + weight: u64, + mean_n: f64, + d_: f64, + // count: u64, +} + +impl RunningStatsCalculator { + pub fn new(weight: u64, mean: f64) -> Self { + Self { + weight, + mean_n: mean, + d_: 0.0, + // count: 0, + } + } + + /// Add a new value to the running stats calculator. + pub fn add(&mut self, value: f64, weight: u64) { + // Update the mean + // I am using a default weight of 1 for now ... not 100% sure how much + // tha treally matters but it would fix the division by zero. + debug_assert!(weight >= 1, "Weight must be >= 1"); + let weight_ratio = weight as f64 / self.weight as f64; + let delta = value - self.mean_n; + let last_mean_n = self.mean_n; + self.mean_n += delta * weight_ratio; + + // Update the variance + let to_add = weight as f64 * (value - self.mean_n) * (value - last_mean_n); + self.d_ += to_add; + + // Update the weight + self.weight += weight; + } + + pub fn mean(&self) -> Result { + if self.weight == 0 { + return Err(StreamingAggregatorError::NotEnoughData); + } + Ok(self.mean_n) + } + + pub fn variance(&self) -> Result { + if self.weight == 0 { + return Err(StreamingAggregatorError::NotEnoughData); + } + Ok(self.d_.abs() / self.weight as f64) + } + + pub fn standard_deviation(&self) -> Result { + let variance = self.variance()?; + if variance.is_nan() { + debug!("variance is nan, state -> {:?}", self); + }; + if variance.is_infinite() { + debug!("variance is infinite, state -> {:?}", self); + }; + if variance.is_sign_negative() { + debug!("variance is negative, state -> {:?}", self); + }; + Ok(variance.sqrt()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_running_stats_calculator() { + let mut calc = RunningStatsCalculator::new(10, 0.0); + calc.add(10.0, 2); + calc.add(10.0, 2); + calc.add(10.0, 2); + calc.add(10.0, 2); + calc.add(10.0, 2); + assert!(calc.mean().unwrap() < 5.6); + assert!(calc.mean().unwrap() > 4.4); + assert!(calc.variance().unwrap() > 15.); + assert!(calc.variance().unwrap() < 25.); + assert!(calc.standard_deviation().unwrap() > 4.5); + assert!(calc.standard_deviation().unwrap() < 5.5); + } + + // https://www.kaggle.com/datasets/carlmcbrideellis/data-anscombes-quartet?resource=download + // ascombes quarted data from kaggle + // + // Both have real mean of 7.5 and std of 1.94 + const ASCOMBES_3: [f64; 11] = [ + 7.46, 6.77, 12.74, 7.11, 7.81, 8.84, 6.08, 5.39, 8.15, 6.42, 5.73, + ]; + const ASCOMBES_4: [f64; 11] = [ + 6.58, 5.76, 7.71, 8.84, 8.47, 7.04, 5.25, 12.5, 5.56, 7.91, 6.89, + ]; + + #[test] + fn test_running_stats_calculator_ascombes_3() { + let mut calc = RunningStatsCalculator::new(1, ASCOMBES_3[0]); + for i in 1..ASCOMBES_3.len() { + calc.add(ASCOMBES_3[i], 1); + } + assert!(calc.mean().unwrap() < 7.6); + assert!(calc.mean().unwrap() > 7.4); + assert!(calc.standard_deviation().unwrap() > 1.92); + assert!(calc.standard_deviation().unwrap() < 1.99); + } + + #[test] + fn test_running_stats_calculator_ascombes_4() { + let mut calc = RunningStatsCalculator::new(1, ASCOMBES_4[0]); + for i in 1..ASCOMBES_4.len() { + calc.add(ASCOMBES_4[i], 1); + } + assert!(calc.mean().unwrap() < 7.6); + assert!(calc.mean().unwrap() > 7.4); + + // Note that the tolerance here is a hair higher ... bc there + // is an outlier value. + assert!( + calc.standard_deviation().unwrap() > 1.91, + "Expected > 1.92, got {}", + calc.standard_deviation().unwrap() + ); + assert!(calc.standard_deviation().unwrap() < 1.99); + } +} diff --git a/src/models/frames/mod.rs b/src/models/frames/mod.rs index ce34356..7cffa4e 100644 --- a/src/models/frames/mod.rs +++ b/src/models/frames/mod.rs @@ -1,5 +1,5 @@ pub mod expanded_frame; +pub mod expanded_window_group; pub mod raw_frames; pub mod raw_peak; pub mod single_quad_settings; -pub mod expanded_window_group; diff --git a/src/models/frames/raw_frames.rs b/src/models/frames/raw_frames.rs index 21d66e7..689b0b7 100644 --- a/src/models/frames/raw_frames.rs +++ b/src/models/frames/raw_frames.rs @@ -42,14 +42,37 @@ pub fn frame_elems_matching<'a>( scan_range, quad_range ); - let quad_range = quad_range + let quad_scan_range = quad_range .and_then(|quad_range| scans_matching_quad(&frame.quadrupole_settings, quad_range)); - let (scan_ind_start, scan_ind_end): (usize, usize) = match quad_range { - Some((start, end)) => (start.max(scan_range.0), end.min(scan_range.1)), - None => (scan_range.0, scan_range.1), + let scan_range_use = if quad_scan_range.is_none() { + 0..0 + } else { + let quad_scan_range = quad_scan_range.unwrap(); + + // Only checkinghere bc its common for them to get flipped + // bc skill issues. (and the highest scan is actually the lowest 1/k0) + let min_scan = scan_range.0.min(scan_range.1); + let max_scan = scan_range.0.max(scan_range.1); + + let min_quad_scan = quad_scan_range.0.min(quad_scan_range.1); + let max_quad_scan = quad_scan_range.0.max(quad_scan_range.1); + + let (scan_ind_start, scan_ind_end): (usize, usize) = + (min_quad_scan.max(min_scan), max_quad_scan.min(max_scan)); + + if scan_ind_end < scan_ind_start { + // This can happen if the quad range matches but not the scan range. + // TODO refactor this logic... + 0..0 + } else { + scan_ind_start..scan_ind_end + } }; - (scan_ind_start..scan_ind_end) + + let retention_time = frame.rt as f32; + + scan_range_use .map(move |scan_index| { let scan_is = frame.scan_offsets[scan_index]; let scan_ie = frame.scan_offsets[scan_index + 1]; @@ -66,6 +89,7 @@ pub fn frame_elems_matching<'a>( scan_index, tof_index: tof_ind, intensity, + retention_time, }) .collect(); diff --git a/src/models/frames/raw_peak.rs b/src/models/frames/raw_peak.rs index e7ad2f4..9d6a8f7 100644 --- a/src/models/frames/raw_peak.rs +++ b/src/models/frames/raw_peak.rs @@ -1,6 +1,9 @@ -#[derive(Debug, Clone, Copy)] +use serde::Serialize; + +#[derive(Debug, Clone, Copy, Serialize)] pub struct RawPeak { pub scan_index: usize, pub tof_index: u32, pub intensity: u32, + pub retention_time: f32, } diff --git a/src/models/indices/raw_file_index.rs b/src/models/indices/raw_file_index.rs index 62c55a0..5608d0a 100644 --- a/src/models/indices/raw_file_index.rs +++ b/src/models/indices/raw_file_index.rs @@ -16,6 +16,7 @@ use crate::models::queries::{ }; use crate::traits::indexed_data::IndexedData; use crate::ToleranceAdapter; +use log::trace; pub struct RawFileIndex { file_reader: FrameReader, @@ -64,6 +65,8 @@ impl RawFileIndex { fqs: &'b FragmentGroupIndexQuery, fun: &'c mut dyn for<'r> FnMut(RawPeak, Arc), ) { + trace!("RawFileIndex::apply_on_query"); + trace!("FragmentGroupIndexQuery: {:?}", fqs); let frames: Vec = self .read_frames_in_range(&fqs.precursor_query.frame_index_range) .filter(|x| x.is_ok()) @@ -107,9 +110,9 @@ impl RawFileIndex { let quad_range = tol.quad_range(elution_group.precursor_mz, elution_group.precursor_charge); let mut min_scan_index = - self.meta_converters.rt_converter.invert(mobility_range.0) as usize; + self.meta_converters.im_converter.invert(mobility_range.0) as usize; let mut max_scan_index = - self.meta_converters.rt_converter.invert(mobility_range.1) as usize; + self.meta_converters.im_converter.invert(mobility_range.1) as usize; if min_scan_index > max_scan_index { std::mem::swap(&mut min_scan_index, &mut max_scan_index); diff --git a/src/models/indices/transposed_quad_index.rs b/src/models/indices/transposed_quad_index.rs index 7daf3e0..11ce34f 100644 --- a/src/models/indices/transposed_quad_index.rs +++ b/src/models/indices/transposed_quad_index.rs @@ -9,22 +9,25 @@ use crate::utils::compress_explode::{compress_vec, explode_vec}; use crate::utils::display::{glimpse_vec, GlimpseConfig}; use crate::utils::sorting::argsort_by; use crate::ToleranceAdapter; -use core::num; use core::panic; +use itertools::Itertools; use log::trace; use log::{debug, info}; +use std::collections::BTreeMap; use rayon::prelude::*; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::fmt::Display; use std::sync::Arc; use timsrust::converters::{ ConvertableDomain, Frame2RtConverter, Scan2ImConverter, Tof2MzConverter, }; use timsrust::readers::{FrameReader, MetadataReader}; +use timsrust::Frame; use timsrust::TimsRustError; -use timsrust::{Frame, QuadrupoleSettings}; + +use indicatif::ProgressIterator; // TODO break this module apart ... its getting too big for my taste // - JSP: 2024-11-19 @@ -65,18 +68,25 @@ impl QuadSplittedTransposedIndex { rt_range: Option, ) -> impl Iterator + '_ { trace!( - "QuadSplittedTransposedIndex::query_peaks tof_range: {:?}, scan_range: {:?}, rt_range: {:?}", + "QuadSplittedTransposedIndex::query_peaks tof_range: {:?}, scan_range: {:?}, rt_range: {:?}, precursor_mz_range: {:?}", tof_range, scan_range, rt_range, + precursor_mz_range, ); - let matching_quads = self.get_matching_quad_settings(precursor_mz_range, scan_range); + // let matching_quads = self.get_matching_quad_settings(precursor_mz_range, scan_range); + let matching_quads: Vec = self + .get_matching_quad_settings(precursor_mz_range, scan_range) + .collect(); + trace!("matching_quads: {:?}", matching_quads); + let rt_range = match rt_range { Some(x) => Some(x.to_frame_index_range(&self.rt_converter)), None => None, }; matching_quads + .into_iter() .map(move |qs| { let tqi = self.indices.get(&qs).unwrap(); tqi.query_peaks(tof_range, scan_range, rt_range.clone()) @@ -93,11 +103,15 @@ impl QuadSplittedTransposedIndex { .iter() .filter(move |qs| { (qs.ranges.isolation_low <= precursor_mz_range.1) - && (precursor_mz_range.0 <= qs.ranges.isolation_high) + && (qs.ranges.isolation_high >= precursor_mz_range.0) }) .filter(move |qs| { if let Some(scan_range) = scan_range { - (qs.ranges.scan_start <= scan_range.1) && (scan_range.0 <= qs.ranges.scan_end) + // This is done for sanity tbh ... sometimes they get flipped + // bc the lowest scan is actually the highest 1/k0. + let min_scan = qs.ranges.scan_start.min(qs.ranges.scan_end); + let max_scan = qs.ranges.scan_start.max(qs.ranges.scan_end); + (min_scan <= scan_range.1) && (scan_range.0 <= max_scan) } else { true } @@ -123,12 +137,6 @@ impl QuadSplittedTransposedIndex { self.im_converter.invert(mobility_range.0) as usize, self.im_converter.invert(mobility_range.1) as usize, ); - let isolation_mz_range = ( - self.mz_converter - .invert(precursor_mz_range.0 - quad_range.0 as f64) as f32, - self.mz_converter - .invert(precursor_mz_range.1 + quad_range.1 as f64) as f32, - ); let frame_index_range = ( self.rt_converter.invert(rt_range.0) as usize, self.rt_converter.invert(rt_range.1) as usize, @@ -136,7 +144,6 @@ impl QuadSplittedTransposedIndex { assert!(frame_index_range.0 <= frame_index_range.1); assert!(mz_index_range.0 <= mz_index_range.1); - assert!(isolation_mz_range.0 <= isolation_mz_range.1); // assert!(mobility_index_range.0 <= mobility_index_range.1); let mobility_index_range = ( mobility_index_range.0.min(mobility_index_range.1) as usize, @@ -147,7 +154,7 @@ impl QuadSplittedTransposedIndex { frame_index_range, mz_index_range, mobility_index_range, - isolation_mz_range, + isolation_mz_range: quad_range, }; // TODO: change this unwrap and use explicitly the lack of fragment mzs. @@ -224,6 +231,7 @@ pub struct QuadSplittedTransposedIndexBuilder { impl QuadSplittedTransposedIndexBuilder { fn add_frame(&mut self, frame: Frame) { let expanded_quad_settings = expand_quad_settings(&frame.quadrupole_settings); + let exploded_scans = explode_vec(&frame.scan_offsets); for qs in expanded_quad_settings { // Add key if it doesnt exist ... @@ -233,15 +241,27 @@ impl QuadSplittedTransposedIndexBuilder { "Adding new transposed quad index for qs {:?} with max tof {}", qs, max_tof ); - self.indices.insert( - qs, - TransposedQuadIndexBuilder::new(qs.clone(), *max_tof as usize), - ); + let new_index = TransposedQuadIndexBuilder::new(qs.clone()); + self.indices.insert(qs, new_index); } - self.indices - .get_mut(&qs) - .unwrap() - .add_frame_slice(&frame, (qs.ranges.scan_start, qs.ranges.scan_end)); + + let peak_start = frame.scan_offsets[qs.ranges.scan_start]; + let peak_end = frame.scan_offsets[qs.ranges.scan_end + 1]; + + let int_slice = frame.intensities[peak_start..peak_end].to_vec(); + let tof_slice = frame.tof_indices[peak_start..peak_end].to_vec(); + let expanded_scan_slice = exploded_scans[peak_start..peak_end].to_vec(); + + let frame_index = frame.index; + let frame_rt = frame.rt; + + self.indices.get_mut(&qs).unwrap().add_frame_slice( + int_slice, + tof_slice, + expanded_scan_slice, + frame_index, + frame_rt, + ); } } @@ -257,17 +277,18 @@ impl QuadSplittedTransposedIndexBuilder { mz_converter: meta_converters.mz_converter, im_converter: meta_converters.im_converter, }; - for frame in file_reader.get_all() { + for frame in file_reader.get_all().into_iter().progress() { let frame = frame?; out.add_frame(frame); } + Ok(out) } fn build(self) -> QuadSplittedTransposedIndex { let mut indices = HashMap::new(); let mut flat_quad_settings = Vec::new(); - for (qs, builder) in self.indices { + for (qs, builder) in self.indices.into_iter().progress() { let tmp = Arc::new(qs); indices.insert(tmp.clone(), builder.build()); flat_quad_settings.push(qs.clone()); @@ -294,16 +315,13 @@ impl QuadSplittedTransposedIndexBuilder { #[derive(Debug)] struct TransposedQuadIndex { quad_settings: SingleQuadrupoleSetting, - frame_index_rt_pairs: Vec<(usize, f64)>, - peak_buckets: Vec>, + frame_rts: Vec, + frame_indices: Vec, + peak_buckets: BTreeMap, // 72 bytes for peak Option (24/vec) // Conservatvely ... 30_000_000 elements can be layed out in a vec. // 638425 is the max observed val for a tof index ... So we dont need an // additional bucketing. - - // In some of my data a lot of the buckets are empty. - // I could maybe use a sparse representation ... btree? hashmap? - // num_none: 511271, num_some: 127161 } fn display_opt_peak_bucket(opt_peak_bucket: &Option) -> String { @@ -360,10 +378,11 @@ impl Display for TransposedQuadIndex { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "TransposedQuadIndex\n quad_settings: {}\n frame_index_rt_pairs: {}\n peak_buckets: {}\n", + "TransposedQuadIndex\n quad_settings: {}\n frame_indices: {}\n frame_rts: {}\n peak_buckets: NOT SHOWING\n", self.quad_settings, - glimpse_vec(&self.frame_index_rt_pairs, Some(GlimpseConfig { max_items: 10, padding: 2, new_line: true })), - display_opt_peak_bucket_vec(&self.peak_buckets), + glimpse_vec(&self.frame_indices, Some(GlimpseConfig { max_items: 10, padding: 2, new_line: true })), + glimpse_vec(&self.frame_rts, Some(GlimpseConfig { max_items: 10, padding: 2, new_line: true })), + // display_opt_peak_bucket_vec(&self.peak_buckets), ) } } @@ -372,7 +391,7 @@ impl Display for TransposedQuadIndex { pub struct PeakInQuad { pub scan_index: usize, pub intensity: u32, - pub frame_index: usize, + pub retention_time: f32, pub tof_index: u32, } @@ -381,7 +400,7 @@ impl PeakInQuad { Self { scan_index: peak_in_bucket.scan_index, intensity: peak_in_bucket.intensity, - frame_index: peak_in_bucket.local_frame_index as usize, + retention_time: peak_in_bucket.retention_time, tof_index, } } @@ -406,54 +425,41 @@ impl TransposedQuadIndex { // which will do for now for prototyping. let frame_index_range = self.convert_to_local_frame_range(rt_range); - // Coult I just return an Arc<[intensities]> + ... - // If I made the peak buckets sparse, I could make it ... not be an option. - (tof_range.0..tof_range.1) - .filter(|tof_index| self.peak_buckets[*tof_index as usize].is_some()) - .map(move |tof_index| { - self.peak_buckets[tof_index as usize] - .as_ref() - .unwrap() - .query_peaks(scan_range, frame_index_range) + self.peak_buckets + .range(tof_range.0..tof_range.1) + .map(move |(tof_index, pb)| { + pb.query_peaks(scan_range, frame_index_range) .map(move |p| PeakInQuad::from_peak_in_bucket(p, tof_index.clone())) }) .flatten() + // Coult I just return an Arc<[intensities]> + ... + // If I made the peak buckets sparse, I could make it ... not be an option. } fn convert_to_local_frame_range( &self, rt_range: Option, - ) -> Option<(LocalFrameIndex, LocalFrameIndex)> { - let frame_index_range: Option<(LocalFrameIndex, LocalFrameIndex)> = match rt_range { + ) -> Option<(f32, f32)> { + let frame_index_range = match rt_range { Some(FrameRTTolerance::Seconds((rt_low, rt_high))) => { - let frame_id_start = self - .frame_index_rt_pairs - .binary_search_by(|x| x.1.partial_cmp(&rt_low).unwrap()) - .unwrap_or_else(|x| x); - let frame_id_end = self - .frame_index_rt_pairs - .binary_search_by(|x| x.1.partial_cmp(&rt_high).unwrap()) - .unwrap_or_else(|x| x); - - Some(( - frame_id_start as LocalFrameIndex, - frame_id_end as LocalFrameIndex, - )) + Some((rt_low as f32, rt_high as f32)) } Some(FrameRTTolerance::FrameIndex((frame_low, frame_high))) => { let frame_id_start = self - .frame_index_rt_pairs - .binary_search_by(|x| x.0.partial_cmp(&frame_low).unwrap()) + .frame_indices + .binary_search_by(|x| x.cmp(&frame_low)) .unwrap_or_else(|x| x); let frame_id_end = self - .frame_index_rt_pairs - .binary_search_by(|x| x.0.partial_cmp(&frame_high).unwrap()) + .frame_indices + .binary_search_by(|x| x.cmp(&frame_high)) .unwrap_or_else(|x| x); + // TODO consider throwing a warning if we are + // out of bounds here. Some(( - frame_id_start as LocalFrameIndex, - frame_id_end as LocalFrameIndex, + self.frame_rts[frame_id_start.min(self.frame_rts.len() - 1)] as f32, + self.frame_rts[frame_id_end.min(self.frame_rts.len() - 1)] as f32, )) } None => None, @@ -467,100 +473,132 @@ impl TransposedQuadIndex { frame_index_range } - - pub fn query_tof_index(&self, tof_index: u32) -> Option<&PeakBucket> { - self.peak_buckets[tof_index as usize].as_ref() - } } struct TransposedQuadIndexBuilder { quad_settings: SingleQuadrupoleSetting, - peak_bucket_builders: Vec>, - frame_indices: HashMap, - frame_index_rt_pairs: Vec<(usize, f64)>, - num_frames: u32, - last_frame_index: usize, + int_slices: Vec>, + tof_slices: Vec>, + scan_slices: Vec>, + frame_indices: Vec, + frame_rts: Vec, } impl TransposedQuadIndexBuilder { - fn new(quad_settings: SingleQuadrupoleSetting, init_max_tof: usize) -> Self { - let mut builder_vec = Vec::with_capacity(init_max_tof); - for _ in 0..init_max_tof { - builder_vec.push(None); - } + fn new(quad_settings: SingleQuadrupoleSetting) -> Self { Self { quad_settings, - peak_bucket_builders: builder_vec, - frame_indices: HashMap::new(), - frame_index_rt_pairs: Vec::new(), - num_frames: 0, - last_frame_index: 0, + int_slices: Vec::new(), + tof_slices: Vec::new(), + scan_slices: Vec::new(), + frame_indices: Vec::new(), + frame_rts: Vec::new(), } } - fn add_frame_slice(&mut self, frame: &Frame, scan_range: (usize, usize)) { - let frame_index = frame.index; - let frame_rt = frame.rt; - self.add_frame_index_rt(frame_index, frame_rt); - for scan_index in scan_range.0..(scan_range.1 - 1) { - let peak_index_start = frame.scan_offsets[scan_index]; - let peak_index_end = frame.scan_offsets[scan_index + 1]; + fn add_frame_slice( + &mut self, + int_slice: Vec, + tof_slice: Vec, + expanded_scan_slice: Vec, + frame_index: usize, + frame_rt: f64, + ) { + assert!(int_slice.len() == tof_slice.len()); + assert!(int_slice.len() == expanded_scan_slice.len()); - for peak_index in peak_index_start..peak_index_end { - let tof_index = frame.tof_indices[peak_index]; - let intensity = frame.intensities[peak_index]; - self.add_peak(tof_index, scan_index, intensity, frame_index); - } - } + self.int_slices.push(int_slice); + self.tof_slices.push(tof_slice); + self.scan_slices.push(expanded_scan_slice); + self.frame_indices.push(frame_index); + self.frame_rts.push(frame_rt); } + fn build(self) -> TransposedQuadIndex { + let max_tof = *self + .tof_slices + .iter() + .map(|x| x.iter().max().unwrap()) + .max() + .unwrap(); + let mut tof_counts = vec![0; max_tof as usize + 1]; + let mut tot_peaks = 0; + for tof_slice in self.tof_slices.iter() { + for tof in tof_slice.iter() { + tof_counts[*tof as usize] += 1; + tot_peaks += 1; + } + } - pub fn add_frame_index_rt(&mut self, frame_index: usize, rt: f64) { - self.num_frames += 1; - // Make sure they are inserted in order. - if self.last_frame_index > frame_index { - panic!("Frame index out of order"); + let mut peak_buckets = HashMap::with_capacity((max_tof + 1).try_into().unwrap()); + for (tof, count) in tof_counts.clone().into_iter().enumerate() { + if count > 0 { + let peak_bucket = PeakBucketBuilder::new(count); + peak_buckets.insert(tof as u32, peak_bucket); + } else { + continue; + } } - self.last_frame_index = frame_index; - self.frame_index_rt_pairs.push((frame_index, rt)); - // Make sure frames are inserted only once. - if self.frame_indices.contains_key(&frame_index) { - panic!("Frame index already inserted"); - // TODO make this an error ... + let out_rts = self.frame_rts.clone(); + let out_indices = self.frame_indices.clone(); + let mut added_peaks = 0; + + // TODO rewrite this as indices instead zipping the slices. + for ((((int_slice, tof_slice), scan_slice), frame_index), frame_rt) in self + .int_slices + .into_iter() + .zip(self.tof_slices.into_iter()) + .zip(self.scan_slices.into_iter()) + .zip(self.frame_indices.into_iter()) + .zip(self.frame_rts.into_iter()) + { + let rt = frame_rt as f32; + for ((inten, tof), scan) in int_slice + .into_iter() + .zip(tof_slice.into_iter()) + .zip(scan_slice.into_iter()) + { + peak_buckets + .get_mut(&tof) + .expect("Should have just added it...") + .add_peak(scan, inten, rt); + + added_peaks += 1; + } } - self.frame_indices.insert(frame_index, self.num_frames); - } - fn add_peak(&mut self, tof_index: u32, scan_index: usize, intensity: u32, frame_index: usize) { - if self.peak_bucket_builders.len() <= (tof_index + 1) as usize { - let num_to_add = (tof_index as usize - self.peak_bucket_builders.len()) + 20; - self.peak_bucket_builders - .extend((0..num_to_add).map(|_| None)); + if added_peaks != tot_peaks { + println!("TransposedQuadIndex::add_frame_slice failed at peak count check, expected: {}, real: {}", tot_peaks, added_peaks); + panic!("TransposedQuadIndex::add_frame_slice failed at peak count check"); } - if self.peak_bucket_builders[tof_index as usize].is_none() { - self.peak_bucket_builders[tof_index as usize] = Some(PeakBucketBuilder::new(20)); + + for (tof, count) in tof_counts.into_iter().enumerate() { + if count > 0 { + let curr_bucket = peak_buckets.get(&(tof as u32)).unwrap(); + let real_count = curr_bucket.intensities.len(); + if real_count != count { + println!("TransposedQuadIndex::build failed at tof bucket count check, expected: {}, real: {}", count, real_count); + println!("Bucket -> {:?}", curr_bucket); + + panic!("TransposedQuadIndex::build failed at tof bucket count check"); + }; + } else { + continue; + } } - self.peak_bucket_builders[tof_index as usize] - .as_mut() - .unwrap() - .add_peak(scan_index, intensity, frame_index); - } - fn build(self) -> TransposedQuadIndex { - let peak_buckets = self - .peak_bucket_builders - .into_iter() - .map(|x| match x { - Some(x) => Some(x.build(&self.frame_indices)), - None => None, - }) - .collect(); + let peak_bucket = BTreeMap::from_par_iter( + peak_buckets + .into_par_iter() + .map(|(tof, pb)| (tof, pb.build())), + ); TransposedQuadIndex { quad_settings: self.quad_settings, - frame_index_rt_pairs: self.frame_index_rt_pairs, - peak_buckets, + frame_rts: out_rts, + frame_indices: out_indices, + peak_buckets: peak_bucket, } } } @@ -568,16 +606,13 @@ impl TransposedQuadIndexBuilder { pub struct PeakInBucket { pub scan_index: usize, pub intensity: u32, - pub local_frame_index: u32, + pub retention_time: f32, } -type LocalFrameIndex = u32; - #[derive(Debug)] struct PeakBucket { intensities: Vec, - // TODO: consider using rts instead of local_frame_indices - local_frame_indices: Vec, + retention_times: Vec, scan_offsets: Vec, } @@ -585,10 +620,10 @@ impl Display for PeakBucket { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "PeakBucket: \n len={},\n local_frame_indices={},\n scan_offsets={},\n intensities={}", + "PeakBucket: \n len={},\n retention_times={},\n scan_offsets={},\n intensities={}", self.len(), glimpse_vec( - &self.local_frame_indices, + &self.retention_times, Some(GlimpseConfig { max_items: 10, padding: 2, @@ -615,53 +650,10 @@ impl Display for PeakBucket { } } -impl PeakBucket { - pub fn len(&self) -> usize { - self.intensities.len() - } - - pub fn query_peaks( - &self, - scan_range: Option<(usize, usize)>, - local_index_range: Option<(LocalFrameIndex, LocalFrameIndex)>, - ) -> impl Iterator + '_ { - let scan_range = match scan_range { - Some((scan_low, scan_high)) => { - (scan_low.max(0)..scan_high.min(self.scan_offsets.len() - 1)).into_iter() - } - None => 0..self.scan_offsets.len(), - }; - - let tmp = scan_range - .map(move |scan_index| { - let peak_index_start = self.scan_offsets[scan_index]; - let peak_index_end = self.scan_offsets[scan_index + 1]; - - let local_frame_slice = &self.local_frame_indices[peak_index_start..peak_index_end]; - let intensity_slice = &self.intensities[peak_index_start..peak_index_end]; - - local_frame_slice - .iter() - .zip(intensity_slice) - .filter(move |(lfi, _inten)| match local_index_range { - Some((low, high)) => &low <= *lfi && *lfi <= &high, - None => true, - }) - .map(move |(lfi, inten)| PeakInBucket { - scan_index, - intensity: *inten, - local_frame_index: *lfi, - }) - }) - .flatten(); - tmp - } -} - #[derive(Debug)] struct PeakBucketBuilder { intensities: Vec, - frame_indices: Vec, + retention_times: Vec, scan_offsets: Vec, } @@ -669,60 +661,107 @@ impl PeakBucketBuilder { fn new(capacity: usize) -> Self { Self { intensities: Vec::with_capacity(capacity), - frame_indices: Vec::with_capacity(capacity), + retention_times: Vec::with_capacity(capacity), scan_offsets: Vec::with_capacity(capacity), } } - fn add_peak(&mut self, scan_index: usize, intensity: u32, frame_index: usize) { + fn add_peak(&mut self, scan_index: usize, intensity: u32, retention_time: f32) { self.intensities.push(intensity); - self.frame_indices.push(frame_index); + self.retention_times.push(retention_time); self.scan_offsets.push(scan_index); } - fn build(mut self, index_mapping: &HashMap) -> PeakBucket { + fn build(mut self) -> PeakBucket { let mut indices = argsort_by(&self.scan_offsets, |x| *x); sort_by_indices_multi!( &mut indices, &mut self.scan_offsets, - &mut self.frame_indices, + &mut self.retention_times, &mut self.intensities ); - let compressed = compress_vec(&self.scan_offsets); - let local_frame_indices = self - .frame_indices - .iter() - .map(|x| index_mapping[x]) - .collect(); + // TODO consider if I really need to compress this. + let compressed = compress_vec(&self.scan_offsets); let out = PeakBucket { intensities: self.intensities, - local_frame_indices: local_frame_indices, + retention_times: self.retention_times, scan_offsets: compressed, }; - debug_assert!(out.verify()); + assert!(out.verify(), "PeakBucket::build failed at verify"); out } } impl PeakBucket { + pub fn len(&self) -> usize { + self.intensities.len() + } + + pub fn query_peaks( + &self, + scan_range: Option<(usize, usize)>, + rt_range: Option<(f32, f32)>, + ) -> impl Iterator + '_ { + let scan_range = match scan_range { + Some((scan_low, scan_high)) => { + (scan_low..scan_high.min(self.scan_offsets.len() - 1)).into_iter() + } + None => 0..self.scan_offsets.len(), + }; + let tmp = scan_range + .map(move |scan_index| { + let peak_index_start = self.scan_offsets[scan_index]; + let peak_index_end = self.scan_offsets[scan_index + 1]; + + (peak_index_start..peak_index_end) + .into_iter() + .map(move |peak_index| { + let retention_time = self.retention_times[peak_index]; + + match rt_range { + Some((low, high)) => { + if retention_time < low || retention_time > high { + return None; + } + } + None => {} + } + + let intensity = self.intensities[peak_index]; + Some(PeakInBucket { + scan_index, + intensity, + retention_time, + }) + }) + }) + .flatten() + .filter_map(|x| x); + tmp + } + fn verify(&self) -> bool { - if self.intensities.len() != self.local_frame_indices.len() { + if self.intensities.len() != self.retention_times.len() { + println!("PeakBucket::verify failed at length check"); return false; } match self.scan_offsets.last() { Some(last) => { - if *last != self.local_frame_indices.len() { + if *last != self.retention_times.len() { + println!("PeakBucket::verify failed at last scan check, too many scans"); return false; } } None => { + println!("PeakBucket::verify failed at last scan check, no scans"); return false; } } for i in 1..self.scan_offsets.len() { if self.scan_offsets[i] < self.scan_offsets[i - 1] { + println!("PeakBucket::verify failed at scan order check"); return false; } } @@ -737,6 +776,7 @@ impl From for RawPeak { scan_index: peak_in_quad.scan_index, tof_index: peak_in_quad.tof_index, intensity: peak_in_quad.intensity, + retention_time: peak_in_quad.retention_time, } } } @@ -754,7 +794,7 @@ impl IndexedData for QuadSplittedTransposedInd let out = fragment_query .mz_index_ranges - .par_iter() + .iter() .map(|tof_range| { self.query_peaks( *tof_range, @@ -797,7 +837,7 @@ impl IndexedData for QuadSplittedTransposedInd fragment_query .mz_index_ranges - .par_iter() + .iter() .map(|tof_range| { let mut out = Vec::new(); self.query_peaks( @@ -821,13 +861,15 @@ impl IndexedData for QuadSplittedTransposedInd aggregator: &mut [AG], ) { fragment_queries - .par_iter() - .zip(aggregator.par_iter_mut()) + .iter() + .zip(aggregator.iter_mut()) .for_each(|(fragment_query, agg)| { let precursor_mz_range = ( fragment_query.precursor_query.isolation_mz_range.0 as f64, - fragment_query.precursor_query.isolation_mz_range.0 as f64, + fragment_query.precursor_query.isolation_mz_range.1 as f64, ); + assert!(precursor_mz_range.0 <= precursor_mz_range.1); + assert!(precursor_mz_range.0 > 0.0); let scan_range = Some(fragment_query.precursor_query.mobility_index_range); let frame_index_range = Some(FrameRTTolerance::FrameIndex( fragment_query.precursor_query.frame_index_range, diff --git a/src/models/queries.rs b/src/models/queries.rs index 7cfe2e2..39e6eb7 100644 --- a/src/models/queries.rs +++ b/src/models/queries.rs @@ -1,3 +1,4 @@ +#[derive(Debug, Clone)] pub struct PrecursorIndexQuery { pub frame_index_range: (usize, usize), pub mz_index_range: (u32, u32), @@ -5,6 +6,7 @@ pub struct PrecursorIndexQuery { pub isolation_mz_range: (f32, f32), } +#[derive(Debug, Clone)] pub struct FragmentGroupIndexQuery { pub mz_index_ranges: Vec<(u32, u32)>, pub precursor_query: PrecursorIndexQuery, diff --git a/src/queriable_tims_data/queriable_tims_data.rs b/src/queriable_tims_data/queriable_tims_data.rs index 1ba8df3..60fec7a 100644 --- a/src/queriable_tims_data/queriable_tims_data.rs +++ b/src/queriable_tims_data/queriable_tims_data.rs @@ -1,4 +1,6 @@ +use log::{debug, info}; use std::rc::Rc; +use std::time::Instant; use crate::{ traits::aggregator, Aggregator, ElutionGroup, IndexedData, Tolerance, ToleranceAdapter, @@ -95,6 +97,7 @@ where TA: ToleranceAdapter, TL: Tolerance, { + let start = Instant::now(); let mut fragment_queries = Vec::with_capacity(elution_groups.len()); let mut aggregators = Vec::with_capacity(elution_groups.len()); @@ -106,6 +109,19 @@ where } indexed_data.add_query_multi_group(&fragment_queries, &mut aggregators); + let duration = start.elapsed(); + info!("Querying took {:#?}", duration); + + let microsecond_duration = duration.as_micros(); + let microseconds_per_query = microsecond_duration / elution_groups.len() as u128; + let queries_per_second = 1_000_000.0 / microseconds_per_query as f64; + + info!("That is {:#?} queries per second", queries_per_second); + debug!( + "That is {:#?} microseconds per query", + microseconds_per_query + ); + aggregators } diff --git a/src/traits/aggregator.rs b/src/traits/aggregator.rs index a8b175b..6de4c67 100644 --- a/src/traits/aggregator.rs +++ b/src/traits/aggregator.rs @@ -5,5 +5,5 @@ pub trait Aggregator: Send + Sync { fn add(&mut self, item: &I); fn fold(&mut self, item: Self); - fn finalize(&self) -> Self::Output; + fn finalize(self) -> Self::Output; } diff --git a/src/traits/tolerance.rs b/src/traits/tolerance.rs index 873b3be..4ee276c 100644 --- a/src/traits/tolerance.rs +++ b/src/traits/tolerance.rs @@ -88,11 +88,25 @@ impl Tolerance for DefaultTolerance { } fn quad_range(&self, precursor_mz: f64, precursor_charge: u8) -> (f32, f32) { + // Should this be a recoverable error? + if precursor_charge == 0 { + panic!("Precursor charge is 0, inputs: self: {:?}, precursor_mz: {:?}, precursor_charge: {:?}", self, precursor_mz, precursor_charge); + }; match self.quad { QuadTolerance::Absolute((low, high, num_isotopes)) => { let max_offset = (1.0 / precursor_charge as f32) * num_isotopes as f32; let f32_mz = precursor_mz as f32; - (f32_mz - low - max_offset, f32_mz + high + max_offset) + let mz_low = f32_mz - low - max_offset; + let mz_high = f32_mz + high + max_offset; + assert!(mz_low <= mz_high); + assert!( + mz_low > 0.0, + "Precursor mz is 0 or less, inputs: self: {:?}, precursor_mz: {:?}, precursor_charge: {:?}", + self, + precursor_mz, + precursor_charge + ); + (mz_low, mz_high) } } }