Skip to content

Commit

Permalink
feat: impl multi chromatogram query
Browse files Browse the repository at this point in the history
  • Loading branch information
jspaezp committed Sep 24, 2024
1 parent b925ef5 commit ff37d18
Show file tree
Hide file tree
Showing 9 changed files with 264 additions and 48 deletions.
20 changes: 11 additions & 9 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ use std::time::Instant;
use timsquery::models::elution_group::ElutionGroup;
use timsquery::queriable_tims_data::queriable_tims_data::query_multi_group;
use timsquery::traits::tolerance::DefaultTolerance;
use timsquery::Aggregator;
use timsquery::{
models::aggregators::{
ChromatomobilogramStats, ExtractedIonChromatomobilogram, RawPeakIntensityAggregator,
RawPeakVectorAggregator,
ChromatomobilogramStats, ExtractedIonChromatomobilogram, MultiCMGStats,
MultiCMGStatsArrays, RawPeakIntensityAggregator, RawPeakVectorAggregator,
},
models::indices::raw_file_index::RawFileIndex,
models::indices::transposed_quad_index::QuadSplittedTransposedIndex,
Expand Down Expand Up @@ -146,6 +145,7 @@ pub enum PossibleAggregator {
RawPeakVectorAggregator,
ExtractedIonChromatomobilogram,
ChromatoMobilogramStat,
MultiCMGStats,
}

#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, clap::ValueEnum)]
Expand Down Expand Up @@ -225,19 +225,14 @@ pub fn execute_query(
macro_rules! execute_query_inner {
($index:expr, $agg:expr) => {
let tmp = query_multi_group(&$index, &$index, &tolerance, &elution_groups, &$agg);
// debug!("{:?}", tmp);

let start = Instant::now();
let mut out = Vec::with_capacity(tmp.len());
for (res, eg) in tmp.into_iter().zip(elution_groups) {
out.push(ElutionGroupResults {
elution_group: eg,
result: res.finalize(),
result: res,
});
}
let elapsed = start.elapsed();
println!("Finalizing query took {:#?}", elapsed);
// info!("{:#?}", out);

let put_path = std::path::Path::new(&output_path).join("results.json");
std::fs::create_dir_all(put_path.parent().unwrap()).unwrap();
Expand Down Expand Up @@ -276,6 +271,10 @@ pub fn execute_query(
let aggregator = ExtractedIonChromatomobilogram::new;
execute_query_inner!(index, aggregator);
}
PossibleAggregator::MultiCMGStats => {
let aggregator = MultiCMGStats::new;
execute_query_inner!(index, aggregator);
}
}
}
(PossibleIndex::RawFileIndex, aggregator) => {
Expand All @@ -297,6 +296,9 @@ pub fn execute_query(
let aggregator = ExtractedIonChromatomobilogram::new;
execute_query_inner!(index, aggregator);
}
PossibleAggregator::MultiCMGStats => {
panic!("Not Implemented!");
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/models/aggregators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,7 @@ pub mod streaming_aggregator;

pub use raw_peak_agg::ChromatomobilogramStats;
pub use raw_peak_agg::ExtractedIonChromatomobilogram;
pub use raw_peak_agg::MultiCMGStats;
pub use raw_peak_agg::MultiCMGStatsArrays;
pub use raw_peak_agg::RawPeakIntensityAggregator;
pub use raw_peak_agg::RawPeakVectorAggregator;
73 changes: 58 additions & 15 deletions src/models/aggregators/raw_peak_agg/chromatogram_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,28 @@ impl Aggregator<RawPeak> for ExtractedIonChromatomobilogram {
}

// type MappingCollection<T1, T2> = BTreeMap<T1, T2>;
type MappingCollection<T1, T2> = HashMap<T1, T2>;
pub type MappingCollection<T1, T2> = HashMap<T1, T2>;

#[derive(Debug, Clone)]
pub struct MultiChromatomobilogramStats {
pub scan_tof_mapping:
MappingCollection<(usize, u32), (RunningStatsCalculator, RunningStatsCalculator)>,
pub id: u64,
pub struct ScanTofStatsCalculatorPair {
pub scan: RunningStatsCalculator,
pub tof: RunningStatsCalculator,
}

impl ScanTofStatsCalculatorPair {
pub fn new(intensity: u64, scan_index: usize, tof_index: u32) -> Self {
let scan_index = scan_index as f64;
let tof_index = tof_index as f64;

Self {
scan: RunningStatsCalculator::new(intensity, scan_index),
tof: RunningStatsCalculator::new(intensity, tof_index),
}
}
pub fn add(&mut self, intensity: u64, scan_index: usize, tof_index: u32) {
self.scan.add(scan_index as f64, intensity);
self.tof.add(tof_index as f64, intensity);
}
}

#[derive(Debug, Clone)]
Expand All @@ -81,7 +96,7 @@ pub struct ChromatomobilogramStats {
// In theory we can optimize this to make a single aggregator struct
// that shares the weight (intensity), since all will have the same weight
// and retention times.
pub scan_tof_mapping: MappingCollection<u32, (RunningStatsCalculator, RunningStatsCalculator)>,
pub scan_tof_mapping: MappingCollection<u32, ScanTofStatsCalculatorPair>,
pub id: u64,
}

Expand All @@ -94,7 +109,7 @@ impl ChromatomobilogramStats {
}
}

#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Clone, Serialize, Default)]
pub struct ChromatomobilogramStatsArrays {
pub retention_time_miliseconds: Vec<u32>,
pub tof_index_means: Vec<f64>,
Expand All @@ -103,6 +118,28 @@ pub struct ChromatomobilogramStatsArrays {
pub scan_index_sds: Vec<f64>,
}

impl ChromatomobilogramStatsArrays {
// TODO use default instead of new everywhere ..
pub fn new() -> Self {
Self {
retention_time_miliseconds: Vec::new(),
tof_index_means: Vec::new(),
tof_index_sds: Vec::new(),
scan_index_means: Vec::new(),
scan_index_sds: Vec::new(),
}
}

pub fn fold(&mut self, other: Self) {
self.retention_time_miliseconds
.extend(other.retention_time_miliseconds);
self.tof_index_means.extend(other.tof_index_means);
self.tof_index_sds.extend(other.tof_index_sds);
self.scan_index_means.extend(other.scan_index_means);
self.scan_index_sds.extend(other.scan_index_sds);
}
}

impl Aggregator<RawPeak> for ChromatomobilogramStats {
type Output = ChromatomobilogramStatsArrays;

Expand All @@ -113,12 +150,12 @@ impl Aggregator<RawPeak> for ChromatomobilogramStats {
self.scan_tof_mapping
.entry(rt_miliseconds)
.and_modify(|curr| {
curr.0.add(peak.scan_index as f64, u64_intensity);
curr.1.add(peak.tof_index as f64, u64_intensity);
curr.add(u64_intensity, peak.scan_index, peak.tof_index);
})
.or_insert((
RunningStatsCalculator::new(u64_intensity, peak.scan_index as f64),
RunningStatsCalculator::new(u64_intensity, peak.tof_index as f64),
.or_insert(ScanTofStatsCalculatorPair::new(
u64_intensity,
peak.scan_index,
peak.tof_index,
));
}

Expand All @@ -127,10 +164,16 @@ impl Aggregator<RawPeak> for ChromatomobilogramStats {
let ((scan_means, scan_sds), (tof_means, tof_sds)): (VecTuple, VecTuple) = self
.scan_tof_mapping
.values()
.map(|(scan, tof)| {
.map(|pair| {
(
(scan.mean().unwrap(), scan.standard_deviation().unwrap()),
(tof.mean().unwrap(), tof.standard_deviation().unwrap()),
(
pair.scan.mean().unwrap(),
pair.scan.standard_deviation().unwrap(),
),
(
pair.tof.mean().unwrap(),
pair.tof.standard_deviation().unwrap(),
),
)
})
.unzip();
Expand Down
3 changes: 3 additions & 0 deletions src/models/aggregators/raw_peak_agg/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
pub mod chromatogram_agg;
pub mod multi_chromatogram_agg;
pub mod point_agg;

pub use chromatogram_agg::ChromatomobilogramStats;
pub use chromatogram_agg::ExtractedIonChromatomobilogram;
pub use multi_chromatogram_agg::MultiCMGStats;
pub use multi_chromatogram_agg::MultiCMGStatsArrays;
pub use point_agg::RawPeakIntensityAggregator;
pub use point_agg::RawPeakVectorAggregator;
85 changes: 85 additions & 0 deletions src/models/aggregators/raw_peak_agg/multi_chromatogram_agg.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use super::chromatogram_agg::{
ChromatomobilogramStatsArrays, MappingCollection, ScanTofStatsCalculatorPair,
};
use crate::models::frames::raw_peak::RawPeak;
use crate::traits::aggregator::Aggregator;
use serde::Serialize;

#[derive(Debug, Clone)]
pub struct MultiCMGStats {
pub scan_tof_mapping: MappingCollection<(usize, u32), ScanTofStatsCalculatorPair>,
pub id: u64,
}

impl MultiCMGStats {
pub fn new(id: u64) -> Self {
Self {
scan_tof_mapping: MappingCollection::new(),
id,
}
}
}

#[derive(Debug, Clone, Serialize)]
pub struct MultiCMGStatsArrays {
pub transition_stats: MappingCollection<usize, ChromatomobilogramStatsArrays>,
id: u64,
}

impl Aggregator<(RawPeak, usize)> for MultiCMGStats {
type Output = MultiCMGStatsArrays;

fn add(&mut self, peak: &(RawPeak, usize)) {
let (peak, transition) = peak;
let u64_intensity = peak.intensity as u64;
let rt_miliseconds = (peak.retention_time * 1000.0) as u32;

self.scan_tof_mapping
.entry((*transition, rt_miliseconds))
.and_modify(|curr| {
curr.add(u64_intensity, peak.scan_index, peak.tof_index);
})
.or_insert(ScanTofStatsCalculatorPair::new(
u64_intensity,
peak.scan_index,
peak.tof_index,
));
}

fn finalize(self) -> MultiCMGStatsArrays {
let mut transition_stats = MappingCollection::new();
for ((transition, rt_ms), scan_tof_mapping) in self.scan_tof_mapping.into_iter() {
transition_stats
.entry(transition)
.and_modify(|curr: &mut ChromatomobilogramStatsArrays| {
curr.retention_time_miliseconds.push(rt_ms);
curr.scan_index_means
.push(scan_tof_mapping.scan.mean().unwrap());
curr.scan_index_sds
.push(scan_tof_mapping.scan.standard_deviation().unwrap());
curr.tof_index_means
.push(scan_tof_mapping.tof.mean().unwrap());
curr.tof_index_sds
.push(scan_tof_mapping.tof.standard_deviation().unwrap());
})
.or_insert_with(|| {
let mut out = ChromatomobilogramStatsArrays::new();
out.retention_time_miliseconds.push(rt_ms);
out.scan_index_means
.push(scan_tof_mapping.scan.mean().unwrap());
out.scan_index_sds
.push(scan_tof_mapping.scan.standard_deviation().unwrap());
out.tof_index_means
.push(scan_tof_mapping.tof.mean().unwrap());
out.tof_index_sds
.push(scan_tof_mapping.tof.standard_deviation().unwrap());
out
});
}

MultiCMGStatsArrays {
transition_stats,
id: self.id,
}
}
}
13 changes: 0 additions & 13 deletions src/models/aggregators/raw_peak_agg/point_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ impl Aggregator<RawPeak> for RawPeakIntensityAggregator {
self.intensity += peak.intensity as u64;
}

fn fold(&mut self, other: Self) {
self.intensity += other.intensity;
}

fn finalize(self) -> u64 {
self.intensity
}
Expand Down Expand Up @@ -74,15 +70,6 @@ impl Aggregator<RawPeak> for RawPeakVectorAggregator {
self.peaks.retention_times.push(peak.retention_time);
}

fn fold(&mut self, other: Self) {
self.peaks.scans.extend(other.peaks.scans);
self.peaks.tofs.extend(other.peaks.tofs);
self.peaks.intensities.extend(other.peaks.intensities);
self.peaks
.retention_times
.extend(other.peaks.retention_times);
}

fn finalize(self) -> RawPeakVectorArrays {
self.peaks
}
Expand Down
Loading

0 comments on commit ff37d18

Please sign in to comment.