From 1e141c3abb6993ea482d360c02e1a053b75c423a Mon Sep 17 00:00:00 2001
From: Yasset Perez-Riverol
Date: Wed, 27 Nov 2024 09:15:35 +0000
Subject: [PATCH 01/20] improvements in speed in mzml stats

---
 quantmsutils/mzml/mzml_statistics.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/quantmsutils/mzml/mzml_statistics.py b/quantmsutils/mzml/mzml_statistics.py
index 1b3d706..b8cd3df 100644
--- a/quantmsutils/mzml/mzml_statistics.py
+++ b/quantmsutils/mzml/mzml_statistics.py
@@ -55,6 +55,7 @@ def parse_mzml(file_name: str, file_columns: list, id_only: bool = False):
         exp = MSExperiment()
         acquisition_datetime = exp.getDateTime().get()
         MzMLFile().load(file_name, exp)
+        scan_pattern = re.compile(r"[scan|spectrum]=(\d+)")
         for spectrum in exp:
             id_ = spectrum.getNativeID()
             ms_level = spectrum.getMSLevel()
@@ -121,7 +122,7 @@
         if id_only and ms_level == 2:
             psm_part_info.append(
                 [
-                    re.findall(r"[scan|spectrum]=(\d+)", id_)[0],
+                    scan_pattern.findall(id_)[0],
                     ms_level,
                     mz_array,
                     intensity_array,
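Note on patch 01: the speed gain comes from compiling the scan-ID pattern once, before the per-spectrum loop, instead of passing a pattern string to re.findall for every MS2 spectrum (re caches compiled patterns, but the cache lookup still costs a little on every call). The sketch below illustrates the difference with made-up native-ID strings; it is not part of the patch. As an aside, "[scan|spectrum]" is a character class rather than an alternation, so it matches a single character such as "n" or "m" before the "=", which happens to work for IDs like "scan=123"; an alternation such as "(?:scan|spectrum)=" may have been intended, but the pattern is shown exactly as committed.

    import re

    # Compiled once, as in the patch; reused for every spectrum.
    scan_pattern = re.compile(r"[scan|spectrum]=(\d+)")

    # Hypothetical native-ID strings, for illustration only.
    native_ids = ["scan=1542", "spectrum=88"]

    for id_ in native_ids:
        fast = scan_pattern.findall(id_)                  # precompiled pattern, no cache lookup
        slow = re.findall(r"[scan|spectrum]=(\d+)", id_)  # old per-call code path
        assert fast == slow                               # same result, less overhead per spectrum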
From 58fc54b87bbaaaf260754d5ebac8e1fdc88fe088 Mon Sep 17 00:00:00 2001
From: Chengxin Dai <37200167+daichengxin@users.noreply.github.com>
Date: Thu, 28 Nov 2024 16:05:52 +0800
Subject: [PATCH 02/20] Fixed ms2rescore bugs

---
 quantmsutils/psm/psm_conversion.py   |   2 +
 quantmsutils/rescoring/ms2rescore.py | 102 ++++++++++++++++++++++++++-
 2 files changed, 102 insertions(+), 2 deletions(-)

diff --git a/quantmsutils/psm/psm_conversion.py b/quantmsutils/psm/psm_conversion.py
index 232f3af..9b06705 100644
--- a/quantmsutils/psm/psm_conversion.py
+++ b/quantmsutils/psm/psm_conversion.py
@@ -141,6 +141,8 @@ def convert_psm(

         if hit.metaValueExists("MS:1001491"):
             global_qvalue = hit.getMetaValue("MS:1001491")
+        elif hit.metaValueExists("q-value"):
+            global_qvalue = hit.getMetaValue("q-value")

         charge = hit.getCharge()
         peptidoform = hit.getSequence().toString()
diff --git a/quantmsutils/rescoring/ms2rescore.py b/quantmsutils/rescoring/ms2rescore.py
index 030c59c..2cb27e3 100644
--- a/quantmsutils/rescoring/ms2rescore.py
+++ b/quantmsutils/rescoring/ms2rescore.py
@@ -5,17 +5,110 @@
 import importlib.resources
 import json
 import logging
-from typing import List

 import click
 import pyopenms as oms
 from ms2rescore import package_data, rescore
 from psm_utils import PSMList
 from psm_utils.io.idxml import IdXMLReader, IdXMLWriter
+from typing import Iterable, List, Tuple, Union
+from pathlib import Path
+from psm_utils.psm import PSM

 logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

+
+class IDXMLReaderPatch(IdXMLReader):
+    def __init__(self, filename: Union[Path, str], *args, **kwargs) -> None:
+        """
+        Patch Reader for idXML files based on IDXMLReader.
+
+        Parameters
+        ----------
+        filename: str, pathlib.Path
+            Path to idXML file.
+
+        Examples
+        --------
+        """
+        super().__init__(filename, *args, **kwargs)
+        self.protein_ids, self.peptide_ids = self._parse_idxml()
+        self.user_params_metadata = self._get_userparams_metadata(self.peptide_ids[0].getHits()[0])
+        self.rescoring_features = self._get_rescoring_features(self.peptide_ids[0].getHits()[0])
+        self.skip_invalid_psm = 0
+
+    def __iter__(self) -> Iterable[PSM]:
+        """
+        Iterate over file and return PSMs one-by-one.
+        """
+        for peptide_id in self.peptide_ids:
+            for peptide_hit in peptide_id.getHits():
+                psm = self._parse_psm(self.protein_ids, peptide_id, peptide_hit)
+                if psm is not None:
+                    yield psm
+                else:
+                    self.skip_invalid_psm += 1
+
+    def _parse_psm(
+        self,
+        protein_ids: oms.ProteinIdentification,
+        peptide_id: oms.PeptideIdentification,
+        peptide_hit: oms.PeptideHit,
+    ) -> PSM:
+        """
+        Parse idXML :py:class:`~pyopenms.PeptideHit` to :py:class:`~psm_utils.psm.PSM`.
+
+        Uses additional information from :py:class:`~pyopenms.ProteinIdentification` and
+        :py:class:`~pyopenms.PeptideIdentification` to annotate parameters of the
+        :py:class:`~psm_utils.psm.PSM` object.
+        """
+        peptidoform = self._parse_peptidoform(
+            peptide_hit.getSequence().toString(), peptide_hit.getCharge()
+        )
+        # This is needed to calculate a qvalue before rescoring the PSMList
+        peptide_id_metadata = {
+            "idxml:score_type": str(peptide_id.getScoreType()),
+            "idxml:higher_score_better": str(peptide_id.isHigherScoreBetter()),
+            "idxml:significance_threshold": str(peptide_id.getSignificanceThreshold()),
+        }
+        peptide_hit_metadata = {
+            key: peptide_hit.getMetaValue(key) for key in self.user_params_metadata
+        }
+
+        # Get search engines score features and check valueExits
+        rescoring_features = {}
+        for key in self.rescoring_features:
+            feature = peptide_hit.metaValueExists(key)
+            if not feature:
+                print(type(feature))
+                return None
+            else:
+                rescoring_features[key] = float(peptide_hit.getMetaValue(key))
+
+        return PSM(
+            peptidoform=peptidoform,
+            spectrum_id=peptide_id.getMetaValue("spectrum_reference"),
+            run=self._get_run(protein_ids, peptide_id),
+            is_decoy=self._is_decoy(peptide_hit),
+            score=peptide_hit.getScore(),
+            precursor_mz=peptide_id.getMZ(),
+            retention_time=peptide_id.getRT(),
+            # NOTE: ion mobility will be supported by OpenMS in the future
+            protein_list=[
+                accession.decode() for accession in peptide_hit.extractProteinAccessionsSet()
+            ],
+            rank=peptide_hit.getRank() + 1,  # 0-based to 1-based
+            source="idXML",
+            # Storing proforma notation of peptidoform and UNIMOD peptide sequence for mapping back
+            # to original sequence in writer
+            provenance_data={str(peptidoform): peptide_hit.getSequence().toString()},
+            # Store metadata of PeptideIdentification and PeptideHit objects
+            metadata={**peptide_id_metadata, **peptide_hit_metadata},
+
+            rescoring_features=rescoring_features,
+        )
+
+
 def parse_cli_arguments_to_config(
     config_file: str = None,
     feature_generators: str = None,
@@ -119,9 +212,14 @@ def parse_cli_arguments_to_config(
 def rescore_idxml(input_file, output_file, config) -> None:
     """Rescore PSMs in an idXML file and keep other information unchanged."""
     # Read PSMs
-    reader = IdXMLReader(input_file)
+    reader = IDXMLReaderPatch(input_file)
     psm_list = reader.read_file()

+    if reader.skip_invalid_psm != 0:
+        logging.warning(
+            f"Removed {reader.skip_invalid_psm} PSMs without search engine features!"
+        )
+
     # Rescore
     rescore(config, psm_list)
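In practice, patch 02 makes rescore_idxml read PSMs through IDXMLReaderPatch, which skips any PeptideHit that is missing one of the expected search-engine rescoring features (returning None from _parse_psm) instead of failing later inside MS2Rescore, and counts the skipped hits in skip_invalid_psm. A short sketch of the intended call pattern follows; it is not part of the patch, the idXML path is hypothetical, and it assumes the patched module is importable.

    import logging
    from quantmsutils.rescoring.ms2rescore import IDXMLReaderPatch

    input_file = "sample.idXML"  # hypothetical path

    reader = IDXMLReaderPatch(input_file)
    psm_list = reader.read_file()  # PSMs without rescoring features are dropped

    if reader.skip_invalid_psm != 0:
        logging.warning(
            f"Removed {reader.skip_invalid_psm} PSMs without search engine features!"
        )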
From 8d6bc7d4a1c1bd943a4b610b30f0fc23838badd2 Mon Sep 17 00:00:00 2001
From: Chengxin Dai <37200167+daichengxin@users.noreply.github.com>
Date: Thu, 28 Nov 2024 16:12:54 +0800
Subject: [PATCH 03/20] Update ms2rescore.py

---
 quantmsutils/rescoring/ms2rescore.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/quantmsutils/rescoring/ms2rescore.py b/quantmsutils/rescoring/ms2rescore.py
index 2cb27e3..47a5a3b 100644
--- a/quantmsutils/rescoring/ms2rescore.py
+++ b/quantmsutils/rescoring/ms2rescore.py
@@ -11,7 +11,7 @@
 from ms2rescore import package_data, rescore
 from psm_utils import PSMList
 from psm_utils.io.idxml import IdXMLReader, IdXMLWriter
-from typing import Iterable, List, Tuple, Union
+from typing import Iterable, List, Union
 from pathlib import Path
 from psm_utils.psm import PSM

From ad2ea8083956933aba7e5c6ad76c8f328680b714 Mon Sep 17 00:00:00 2001
From: Chengxin Dai <37200167+daichengxin@users.noreply.github.com>
Date: Thu, 28 Nov 2024 20:15:45 +0800
Subject: [PATCH 04/20] Update ms2rescore.py

---
 quantmsutils/rescoring/ms2rescore.py | 22 +++++++++++++++++++++-
 1 file changed, 21 insertions(+), 1 deletion(-)

diff --git a/quantmsutils/rescoring/ms2rescore.py b/quantmsutils/rescoring/ms2rescore.py
index 47a5a3b..4c46809 100644
--- a/quantmsutils/rescoring/ms2rescore.py
+++ b/quantmsutils/rescoring/ms2rescore.py
@@ -40,6 +40,27 @@ def __init__(self, filename: Union[Path, str], *args, **kwargs) -> None:
     def __iter__(self) -> Iterable[PSM]:
         """
         Iterate over file and return PSMs one-by-one.
+        Test cases will:
+
+        Input PSM 1: PeptideHit with metavalue
+        "MSGF:ScoreRatio" value="0.212121212121212"/>
+        "MSGF:Energy" value="130.0"/>
+        "MSGF:lnEValue" value="-3.603969939390662"/>
+        "MSGF:lnExplainedIonCurrentRatio" value="-0.881402756873971"/>
+        "MSGF:lnNTermIonCurrentRatio" value="-1.931878317286471"/>
+        "MSGF:lnCTermIonCurrentRatio" value="-1.311462733724937"/>
+        "MSGF:lnMS2IonCurrent" value="9.702930189540499"/>
+        "MSGF:MeanErrorTop7" value="259.986879999999985"/>
+        "MSGF:sqMeanErrorTop7" value="6.75931777721344e04"/>
+        "MSGF:StdevErrorTop7" value="143.678020000000004"/>
+        PSM2: PeptideHit No above metaValue
+
+        Run:
+        reader = IDXMLReaderPatch(input_file)
+        psm_list = reader.read_file()
+
+        psm_list: return [PSM 1]
+
         """
         for peptide_id in self.peptide_ids:
             for peptide_hit in peptide_id.getHits():
                 psm = self._parse_psm(self.protein_ids, peptide_id, peptide_hit)
@@ -80,7 +101,6 @@
         for key in self.rescoring_features:
             feature = peptide_hit.metaValueExists(key)
             if not feature:
-                print(type(feature))
                 return None
             else:
                 rescoring_features[key] = float(peptide_hit.getMetaValue(key))

From 8d3e0ad761810d223fdaccc17cd6c1193cf4a7aa Mon Sep 17 00:00:00 2001
From: Yasset Perez-Riverol
Date: Fri, 29 Nov 2024 19:18:20 +0000
Subject: [PATCH 05/20] improvements in speed in mzml stats

---
 pyproject.toml                       |   2 +-
 quantmsutils/__init__.py             |   2 +-
 quantmsutils/mzml/mzml_statistics.py | 423 +++++++++++++++------------
 recipe/meta.yaml                     |   2 +-
 4 files changed, 231 insertions(+), 198 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index ca5d136..05c39ea 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -3,7 +3,7 @@ name = "quantms-utils"
 description = "Python scripts and helpers for the quantMS workflow"
 readme = "README.md"
 license = "MIT"
-version = "0.0.15"
+version = "0.0.16"
 authors = [
     "Yasset Perez-Riverol ",
     "Dai Chengxin ",
diff --git a/quantmsutils/__init__.py b/quantmsutils/__init__.py
index 6561790..d62d967 100644
--- a/quantmsutils/__init__.py
+++ b/quantmsutils/__init__.py
@@ -1 +1
@@ -__version__ = "0.0.15" +__version__ = "0.0.16" diff --git a/quantmsutils/mzml/mzml_statistics.py b/quantmsutils/mzml/mzml_statistics.py index b8cd3df..1ef8bf5 100644 --- a/quantmsutils/mzml/mzml_statistics.py +++ b/quantmsutils/mzml/mzml_statistics.py @@ -1,20 +1,35 @@ import re import sqlite3 from pathlib import Path +from typing import Optional import click +import numpy as np import pandas as pd -import pyarrow +import pyarrow as pa +import pyarrow.parquet as pq from pyopenms import MSExperiment, MzMLFile @click.command("mzmlstats") -@click.option("--ms_path", type=click.Path(exists=True)) +@click.option("--ms_path", type=click.Path(exists=True), required=True) @click.option( - "--id_only", is_flag=True, help="Generate a csv with the spectrum id and the peaks" + "--id_only", is_flag=True, + help="Generate a csv with the spectrum id and the peaks" +) +@click.option( + "--batch_size", + type=int, + default=10000, + help="Number of rows to write in each batch" ) @click.pass_context -def mzml_statistics(ctx, ms_path: str, id_only: bool = False) -> None: +def mzml_statistics( + ctx, + ms_path: str, + id_only: bool = False, + batch_size: int = 10000 +) -> None: """ The mzml_statistics function parses mass spectrometry data files, either in .mzML or Bruker .d formats, to extract and compile a set of statistics about the spectra contained within. @@ -29,211 +44,229 @@ def mzml_statistics(ctx, ms_path: str, id_only: bool = False) -> None: peaks data for MS level 2 spectra. """ - file_columns = [ - "SpectrumID", - "MSLevel", - "Charge", - "MS_peaks", - "Base_Peak_Intensity", - "Summed_Peak_Intensities", - "Retention_Time", - "Exp_Mass_To_Charge", - "AcquisitionDateTime", - ] + schema = pa.schema([ + pa.field("SpectrumID", pa.string(), nullable=True), + pa.field("MSLevel", pa.float64(), nullable=True), + pa.field("Charge", pa.float64(), nullable=True), + pa.field("MS_peaks", pa.float64(), nullable=True), + pa.field("Base_Peak_Intensity", pa.float64(), nullable=True), + pa.field("Summed_Peak_Intensities", pa.float64(), nullable=True), + pa.field("Retention_Time", pa.float64(), nullable=True), + pa.field("Exp_Mass_To_Charge", pa.float64(), nullable=True), + pa.field("AcquisitionDateTime", pa.string(), nullable=True), + ]) - def parse_mzml(file_name: str, file_columns: list, id_only: bool = False): + def batch_write_mzml( + file_name: str, + parquet_schema: pa.Schema, + output_path: str, + id_only: bool = False, + batch_size: int = 10000 + ) -> Optional[str]: """ Parse mzML file and return a pandas DataFrame with the information. If id_only is True, it will also save a csv. 
@param file_name: The file name of the mzML file - @param file_columns: The columns of the DataFrame @param id_only: If True, it will save a csv with the spectrum id, mz and intensity @return: A pandas DataFrame with the information of the mzML file """ - - info = [] - psm_part_info = [] exp = MSExperiment() - acquisition_datetime = exp.getDateTime().get() MzMLFile().load(file_name, exp) + acquisition_datetime = exp.getDateTime().get() scan_pattern = re.compile(r"[scan|spectrum]=(\d+)") - for spectrum in exp: - id_ = spectrum.getNativeID() - ms_level = spectrum.getMSLevel() - rt = spectrum.getRT() if spectrum.getRT() else None - - peaks_tuple = spectrum.get_peaks() - peak_per_ms = len(peaks_tuple[0]) - - if not spectrum.metaValueExists("base peak intensity"): - bpc = max(peaks_tuple[1]) if len(peaks_tuple[1]) > 0 else None - else: - bpc = spectrum.getMetaValue("base peak intensity") - - if not spectrum.metaValueExists("total ion current"): - tic = sum(peaks_tuple[1]) if len(peaks_tuple[1]) > 0 else None - else: - tic = spectrum.getMetaValue("total ion current") - - if ms_level == 1: - info_list = [ - id_, - ms_level, - None, - peak_per_ms, - bpc, - tic, - rt, - None, - acquisition_datetime, - ] - elif ms_level == 2: - charge_state = spectrum.getPrecursors()[0].getCharge() - emz = ( - spectrum.getPrecursors()[0].getMZ() - if spectrum.getPrecursors()[0].getMZ() - else None + + # Prepare parquet writer + parquet_writer = None + batch_data = [] + psm_parts = [] + + try: + for spectrum in exp: + # Peak extraction + peaks = spectrum.get_peaks() + mz_array, intensity_array = peaks[0], peaks[1] + + # Compute peaks efficiently + peak_per_ms = len(mz_array) + base_peak_intensity = float(np.max(intensity_array)) if peak_per_ms > 0 else None + total_intensity = float(np.sum(intensity_array)) if peak_per_ms > 0 else None + + # Metadata extraction + ms_level = spectrum.getMSLevel() + rt = spectrum.getRT() + + if ms_level == 2: + precursor = spectrum.getPrecursors()[0] + charge_state = precursor.getCharge() + exp_mz = precursor.getMZ() + + if id_only: + scan_id = scan_pattern.findall(spectrum.getNativeID())[0] + psm_parts.append([ + scan_id, ms_level, + mz_array.tolist(), + intensity_array.tolist() + ]) + + row_data = { + "SpectrumID": spectrum.getNativeID(), + "MSLevel": float(ms_level), + "Charge": float(charge_state) if charge_state is not None else None, + "MS_peaks": float(peak_per_ms), + "Base_Peak_Intensity": float(base_peak_intensity) if base_peak_intensity is not None else None, + "Summed_Peak_Intensities": float(total_intensity) if total_intensity is not None else None, + "Retention_Time": float(rt), + "Exp_Mass_To_Charge": float(exp_mz) if exp_mz is not None else None, + "AcquisitionDateTime": str(acquisition_datetime) + } + elif ms_level == 1: + row_data = { + "SpectrumID": spectrum.getNativeID(), + "MSLevel": float(ms_level), + "Charge": None, + "MS_peaks": float(peak_per_ms), + "Base_Peak_Intensity": float(base_peak_intensity) if base_peak_intensity is not None else None, + "Summed_Peak_Intensities": float(total_intensity) if total_intensity is not None else None, + "Retention_Time": float(rt), + "Exp_Mass_To_Charge": None, + "AcquisitionDateTime": str(acquisition_datetime) + } + else: + continue + + batch_data.append(row_data) + + # Write batch when it reaches specified size + if len(batch_data) >= batch_size: + batch_table = pa.Table.from_pylist(batch_data, schema=parquet_schema) + + if parquet_writer is None: + parquet_writer = pq.ParquetWriter(where=output_path, schema=parquet_schema, 
compression='gzip') + + parquet_writer.write_table(batch_table) + batch_data = [] + + # Write any remaining data + if batch_data: + batch_table = pa.Table.from_pylist(batch_data, schema=parquet_schema) + if parquet_writer is None: + parquet_writer = pq.ParquetWriter(where=output_path, schema=parquet_schema, compression='gzip') + parquet_writer.write_table(batch_table) + + # Write spectrum data if id_only + if id_only and psm_parts: + spectrum_table = pa.Table.from_pylist( + psm_parts, + schema=pa.schema([ + ('scan', pa.string()), + ('ms_level', pa.int32()), + ('mz', pa.list_(pa.float64())), + ('intensity', pa.list_(pa.float64())) + ]) ) - info_list = [ - id_, - ms_level, - charge_state, - peak_per_ms, - bpc, - tic, - rt, - emz, - acquisition_datetime, - ] - mz_array = peaks_tuple[0] - intensity_array = peaks_tuple[1] - else: - info_list = [ - id_, - ms_level, - None, - None, - None, - None, - rt, - None, - acquisition_datetime, - ] - - if id_only and ms_level == 2: - psm_part_info.append( - [ - scan_pattern.findall(id_)[0], - ms_level, - mz_array, - intensity_array, - ] + pq.write_table( + spectrum_table, + f"{Path(file_name).stem}_spectrum_df.parquet", + compression='gzip' ) - info.append(info_list) - - if id_only and len(psm_part_info) > 0: - pd.DataFrame( - psm_part_info, columns=["scan", "ms_level", "mz", "intensity"] - ).to_parquet( - f"{Path(ms_path).stem}_spectrum_df.parquet", - index=False, - compression="gzip", - ) - return pd.DataFrame(info, columns=file_columns) + if parquet_writer is not None: + parquet_writer.close() + return output_path - def parse_bruker_d(file_name: str, file_columns: list): + finally: + if parquet_writer: + parquet_writer.close() + + def batch_write_bruker_d( + file_name: str, + output_path: str, + parquet_schema: pa.Schema, + batch_size: int = 10000 + ) -> str: + """ + Batch processing and writing of Bruker .d files. + """ sql_filepath = f"{file_name}/analysis.tdf" - if not Path(sql_filepath).exists(): - msg = f"File '{sql_filepath}' not found" - raise FileNotFoundError(msg) - conn = sqlite3.connect(sql_filepath) - c = conn.cursor() - - datetime_cmd = ( - "SELECT Value FROM GlobalMetadata WHERE key='AcquisitionDateTime'" - ) - acquisition_date_time = c.execute(datetime_cmd).fetchall()[0][0] - - df = pd.read_sql_query( - "SELECT Id, MsMsType, NumPeaks, MaxIntensity, SummedIntensities, Time FROM frames", - conn, - ) - df["AcquisitionDateTime"] = acquisition_date_time - - # {8:'DDA-PASEF', 9:'DIA-PASEF'} - if 8 in df["MsMsType"].values: - mslevel_map = {0: 1, 8: 2} - elif 9 in df["MsMsType"].values: - mslevel_map = {0: 1, 9: 2} - else: - msg = f"Unrecognized ms type '{df['MsMsType'].values}'" - raise ValueError(msg) - df["MsMsType"] = df["MsMsType"].map(mslevel_map) - try: - # This line raises an sqlite error if the table does not exist - _ = conn.execute("SELECT * from Precursors LIMIT 1").fetchall() - precursor_df = pd.read_sql_query("SELECT * from Precursors", conn) - except sqlite3.OperationalError as e: - if "no such table: Precursors" in str(e): - print( - f"No precursors recorded in {file_name}, This is normal for DIA data." 
- ) - precursor_df = pd.DataFrame() - else: - raise - - if len(df) == len(precursor_df): - df = pd.concat([df, precursor_df["Charge", "MonoisotopicMz"]], axis=1) - df["Charge"] = df["Charge"].fillna(0) - else: - df[["Charge", "Exp_Mass_To_Charge"]] = None, None - - df = df[ - [ - "Id", - "MsMsType", - "Charge", - "NumPeaks", - "MaxIntensity", - "SummedIntensities", - "Time", - "Exp_Mass_To_Charge", - "AcquisitionDateTime", - ] - ] - df.columns = pd.Index(file_columns) - - return df - - if not (Path(ms_path).exists()): - print(f"Not found '{ms_path}', trying to find alias") - ms_path_path = Path(ms_path) - path_stem = str(ms_path_path.stem) - candidates = ( - list(ms_path_path.parent.glob("*.d")) - + list(ms_path_path.parent.glob("*.mzml")) - + list(ms_path_path.parent.glob("*.mzML")) - ) - - candidates = [c for c in candidates if path_stem in str(c)] - - if len(candidates) == 1: - ms_path = str(candidates[0].resolve()) - else: - raise FileNotFoundError() - - if Path(ms_path).suffix == ".d" and Path(ms_path).is_dir(): - ms_df = parse_bruker_d(ms_path, file_columns) - elif Path(ms_path).suffix in [".mzML", ".mzml"]: - ms_df = parse_mzml(ms_path, file_columns, id_only) + with sqlite3.connect(sql_filepath) as conn: + # Retrieve acquisition datetime + acquisition_date_time = conn.execute( + "SELECT Value FROM GlobalMetadata WHERE key='AcquisitionDateTime'" + ).fetchone()[0] + + # Set up parquet writer + parquet_writer = pq.ParquetWriter( + output_path, + parquet_schema, + compression='gzip' + ) + + try: + # Stream data in batches + for chunk in pd.read_sql_query( + """ + SELECT + Id, + CASE + WHEN MsMsType IN (8, 9) THEN 2 + WHEN MsMsType = 0 THEN 1 + ELSE NULL + END as MsMsType, + NumPeaks, + MaxIntensity, + SummedIntensities, + Time, + Charge, + MonoisotopicMz + FROM frames + """, + conn, + chunksize=batch_size + ): + # Prepare batch + chunk['AcquisitionDateTime'] = acquisition_date_time + + # Convert to pyarrow table and write + batch_table = pa.Table.from_pandas(chunk, schema=parquet_schema) + parquet_writer.write_table(batch_table) + + finally: + parquet_writer.close() + + return output_path + + # Resolve file path + ms_path = _resolve_ms_path(ms_path) + output_path = f"{Path(ms_path).stem}_ms_info.parquet" + + # Choose processing method based on file type + if Path(ms_path).suffix == ".d": + batch_write_bruker_d(file_name=ms_path, output_path=output_path, parquet_schema=schema, batch_size=batch_size) + elif Path(ms_path).suffix.lower() in [".mzml"]: + batch_write_mzml(file_name=ms_path, parquet_schema=schema, output_path=output_path, id_only=id_only, batch_size=batch_size) else: - msg = f"Unrecognized or the mass spec file '{ms_path}' do not exist" - raise RuntimeError(msg) - - ms_df.to_parquet( - f"{Path(ms_path).stem}_ms_info.parquet", - engine="pyarrow", - index=False, - compression="gzip", - ) + raise RuntimeError(f"Unsupported file type: {ms_path}") + +def _resolve_ms_path(ms_path: str) -> str: + """ + Resolve mass spectrometry file path with improved candidate search. 
+ """ + path_obj = Path(ms_path) + if path_obj.exists(): + return str(path_obj) + + candidates = list(path_obj.parent.glob(f"{path_obj.stem}*")) + valid_extensions = {'.d', '.mzml', '.mzML'} + candidates = [ + str(c.resolve()) + for c in candidates + if c.suffix.lower() in valid_extensions + ] + + if len(candidates) == 1: + return candidates[0] + + raise FileNotFoundError(f"No unique file found for {ms_path}") + +if __name__ == "__main__": + mzml_statistics() \ No newline at end of file diff --git a/recipe/meta.yaml b/recipe/meta.yaml index 2b3fb7f..6b076ba 100644 --- a/recipe/meta.yaml +++ b/recipe/meta.yaml @@ -1,7 +1,7 @@ # recipe/meta.yaml package: name: quantms-utils - version: "0.0.15" + version: "0.0.16" source: path: ../ From 5154daf3d7e63028a7d116124e4558069929fd9e Mon Sep 17 00:00:00 2001 From: Yasset Perez-Riverol Date: Sat, 30 Nov 2024 08:53:40 +0000 Subject: [PATCH 06/20] improve to streaming. --- quantmsutils/mzml/mzml_statistics.py | 248 ++++++++++++++------------- 1 file changed, 130 insertions(+), 118 deletions(-) diff --git a/quantmsutils/mzml/mzml_statistics.py b/quantmsutils/mzml/mzml_statistics.py index 1ef8bf5..d2f46be 100644 --- a/quantmsutils/mzml/mzml_statistics.py +++ b/quantmsutils/mzml/mzml_statistics.py @@ -8,9 +8,128 @@ import pandas as pd import pyarrow as pa import pyarrow.parquet as pq -from pyopenms import MSExperiment, MzMLFile +from pyopenms import MzMLFile +class BatchWritingConsumer: + def __init__(self, parquet_schema, output_path, batch_size=10000, id_only=False): + self.parquet_schema = parquet_schema + self.output_path = output_path + self.batch_size = batch_size + self.id_only = id_only + self.batch_data = [] + self.psm_parts = [] + self.parquet_writer = None + self.acquisition_datetime = None + self.scan_pattern = re.compile(r"[scan|spectrum]=(\d+)") + + def setExperimentalSettings(self, settings): + # Extract acquisition date/time from experimental settings + self.acquisition_datetime = settings.getDateTime().get() + + def setExpectedSize(self, a, b): + pass + + def consumeChromatogram(self, chromatogram): + # Optionally handle chromatograms if needed + pass + + def consumeSpectrum(self, spectrum): + """ + Consume spectrum data and write to parquet file. 
+ :param spectrum: spectrum data + :return: + """ + + peaks = spectrum.get_peaks() + mz_array, intensity_array = peaks[0], peaks[1] + peak_per_ms = len(mz_array) + base_peak_intensity = float(np.max(intensity_array)) if peak_per_ms > 0 else None + total_intensity = float(np.sum(intensity_array)) if peak_per_ms > 0 else None + ms_level = spectrum.getMSLevel() + rt = spectrum.getRT() + + if ms_level == 2: + precursor = spectrum.getPrecursors()[0] + charge_state = precursor.getCharge() + exp_mz = precursor.getMZ() + + if self.id_only: + scan_id = self.scan_pattern.findall(spectrum.getNativeID())[0] + self.psm_parts.append([ + scan_id, ms_level, + mz_array.tolist(), + intensity_array.tolist() + ]) + + row_data = { + "SpectrumID": spectrum.getNativeID(), + "MSLevel": float(ms_level), + "Charge": float(charge_state) if charge_state is not None else None, + "MS_peaks": float(peak_per_ms), + "Base_Peak_Intensity": float(base_peak_intensity) if base_peak_intensity is not None else None, + "Summed_Peak_Intensities": float(total_intensity) if total_intensity is not None else None, + "Retention_Time": float(rt), + "Exp_Mass_To_Charge": float(exp_mz) if exp_mz is not None else None, + "AcquisitionDateTime": str(self.acquisition_datetime) + } + elif ms_level == 1: + row_data = { + "SpectrumID": spectrum.getNativeID(), + "MSLevel": float(ms_level), + "Charge": None, + "MS_peaks": float(peak_per_ms), + "Base_Peak_Intensity": float(base_peak_intensity) if base_peak_intensity is not None else None, + "Summed_Peak_Intensities": float(total_intensity) if total_intensity is not None else None, + "Retention_Time": float(rt), + "Exp_Mass_To_Charge": None, + "AcquisitionDateTime": str(self.acquisition_datetime) + } + else: + return + + self.batch_data.append(row_data) + + # Write batch when it reaches specified size + if len(self.batch_data) >= self.batch_size: + self._write_batch() + + def _write_batch(self): + batch_table = pa.Table.from_pylist(self.batch_data, schema=self.parquet_schema) + + if self.parquet_writer is None: + self.parquet_writer = pq.ParquetWriter( + where=self.output_path, schema=self.parquet_schema, compression="gzip" + ) + + self.parquet_writer.write_table(batch_table) + self.batch_data = [] + + def finalize(self): + # Write remaining data + if self.batch_data: + self._write_batch() + + # Write spectrum data if id_only + if self.id_only and self.psm_parts: + spectrum_table = pa.Table.from_pylist( + self.psm_parts, + schema=pa.schema([ + ("scan", pa.string()), + ("ms_level", pa.int32()), + ("mz", pa.list_(pa.float64())), + ("intensity", pa.list_(pa.float64())) + ]) + ) + pq.write_table( + spectrum_table, + f"{Path(self.output_path).stem}_spectrum_df.parquet", + compression="gzip" + ) + + if self.parquet_writer: + self.parquet_writer.close() + @click.command("mzmlstats") @click.option("--ms_path", type=click.Path(exists=True), required=True) @click.option( @@ -56,126 +175,19 @@ def mzml_statistics( pa.field("AcquisitionDateTime", pa.string(), nullable=True), ]) - def batch_write_mzml( - file_name: str, - parquet_schema: pa.Schema, - output_path: str, - id_only: bool = False, - batch_size: int = 10000 - ) -> Optional[str]: + def batch_write_mzml_streaming(file_name: str, parquet_schema: pa.Schema, output_path: str, id_only: bool = False, + batch_size: int = 10000) -> Optional[str]: """ - Parse mzML file and return a pandas DataFrame with the information. If id_only is True, it will also save a csv. 
- @param file_name: The file name of the mzML file - @param id_only: If True, it will save a csv with the spectrum id, mz and intensity - @return: A pandas DataFrame with the information of the mzML file + Parse mzML file in a streaming manner and write to Parquet. """ - exp = MSExperiment() - MzMLFile().load(file_name, exp) - acquisition_datetime = exp.getDateTime().get() - scan_pattern = re.compile(r"[scan|spectrum]=(\d+)") - - # Prepare parquet writer - parquet_writer = None - batch_data = [] - psm_parts = [] - + consumer = BatchWritingConsumer(parquet_schema, output_path, batch_size, id_only) try: - for spectrum in exp: - # Peak extraction - peaks = spectrum.get_peaks() - mz_array, intensity_array = peaks[0], peaks[1] - - # Compute peaks efficiently - peak_per_ms = len(mz_array) - base_peak_intensity = float(np.max(intensity_array)) if peak_per_ms > 0 else None - total_intensity = float(np.sum(intensity_array)) if peak_per_ms > 0 else None - - # Metadata extraction - ms_level = spectrum.getMSLevel() - rt = spectrum.getRT() - - if ms_level == 2: - precursor = spectrum.getPrecursors()[0] - charge_state = precursor.getCharge() - exp_mz = precursor.getMZ() - - if id_only: - scan_id = scan_pattern.findall(spectrum.getNativeID())[0] - psm_parts.append([ - scan_id, ms_level, - mz_array.tolist(), - intensity_array.tolist() - ]) - - row_data = { - "SpectrumID": spectrum.getNativeID(), - "MSLevel": float(ms_level), - "Charge": float(charge_state) if charge_state is not None else None, - "MS_peaks": float(peak_per_ms), - "Base_Peak_Intensity": float(base_peak_intensity) if base_peak_intensity is not None else None, - "Summed_Peak_Intensities": float(total_intensity) if total_intensity is not None else None, - "Retention_Time": float(rt), - "Exp_Mass_To_Charge": float(exp_mz) if exp_mz is not None else None, - "AcquisitionDateTime": str(acquisition_datetime) - } - elif ms_level == 1: - row_data = { - "SpectrumID": spectrum.getNativeID(), - "MSLevel": float(ms_level), - "Charge": None, - "MS_peaks": float(peak_per_ms), - "Base_Peak_Intensity": float(base_peak_intensity) if base_peak_intensity is not None else None, - "Summed_Peak_Intensities": float(total_intensity) if total_intensity is not None else None, - "Retention_Time": float(rt), - "Exp_Mass_To_Charge": None, - "AcquisitionDateTime": str(acquisition_datetime) - } - else: - continue - - batch_data.append(row_data) - - # Write batch when it reaches specified size - if len(batch_data) >= batch_size: - batch_table = pa.Table.from_pylist(batch_data, schema=parquet_schema) - - if parquet_writer is None: - parquet_writer = pq.ParquetWriter(where=output_path, schema=parquet_schema, compression='gzip') - - parquet_writer.write_table(batch_table) - batch_data = [] - - # Write any remaining data - if batch_data: - batch_table = pa.Table.from_pylist(batch_data, schema=parquet_schema) - if parquet_writer is None: - parquet_writer = pq.ParquetWriter(where=output_path, schema=parquet_schema, compression='gzip') - parquet_writer.write_table(batch_table) - - # Write spectrum data if id_only - if id_only and psm_parts: - spectrum_table = pa.Table.from_pylist( - psm_parts, - schema=pa.schema([ - ('scan', pa.string()), - ('ms_level', pa.int32()), - ('mz', pa.list_(pa.float64())), - ('intensity', pa.list_(pa.float64())) - ]) - ) - pq.write_table( - spectrum_table, - f"{Path(file_name).stem}_spectrum_df.parquet", - compression='gzip' - ) - - if parquet_writer is not None: - parquet_writer.close() + MzMLFile().transform(file_name.encode(), consumer) + 
consumer.finalize() return output_path - - finally: - if parquet_writer: - parquet_writer.close() + except Exception as e: + print(f"Error during streaming: {e}") + return None def batch_write_bruker_d( file_name: str, @@ -243,7 +255,7 @@ def batch_write_bruker_d( if Path(ms_path).suffix == ".d": batch_write_bruker_d(file_name=ms_path, output_path=output_path, parquet_schema=schema, batch_size=batch_size) elif Path(ms_path).suffix.lower() in [".mzml"]: - batch_write_mzml(file_name=ms_path, parquet_schema=schema, output_path=output_path, id_only=id_only, batch_size=batch_size) + batch_write_mzml_streaming(file_name=ms_path, parquet_schema=schema, output_path=output_path, id_only=id_only, batch_size=batch_size) else: raise RuntimeError(f"Unsupported file type: {ms_path}") From db2f3b9be19e946255be8f8d0e5624d6b7f021f8 Mon Sep 17 00:00:00 2001 From: Yasset Perez-Riverol Date: Sat, 30 Nov 2024 09:04:40 +0000 Subject: [PATCH 07/20] improve to streaming. --- quantmsutils/mzml/mzml_statistics.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/quantmsutils/mzml/mzml_statistics.py b/quantmsutils/mzml/mzml_statistics.py index d2f46be..ced12e7 100644 --- a/quantmsutils/mzml/mzml_statistics.py +++ b/quantmsutils/mzml/mzml_statistics.py @@ -158,9 +158,11 @@ def mzml_statistics( python script_name.py mzml_statistics --ms_path "path/to/file.mzML" :param ctx: Click context + :param ms_path: A string specifying the path to the mass spectrometry file. :param id_only: A boolean flag that, when set to True, generates a CSV file containing only the spectrum ID and peaks data for MS level 2 spectra. + :param batch_size: An integer specifying the number of rows to write in each batch. """ schema = pa.schema([ @@ -279,6 +281,3 @@ def _resolve_ms_path(ms_path: str) -> str: return candidates[0] raise FileNotFoundError(f"No unique file found for {ms_path}") - -if __name__ == "__main__": - mzml_statistics() \ No newline at end of file From 9f9c82f79c2ae0711c3cd0fc5dc07fe7426eafd9 Mon Sep 17 00:00:00 2001 From: Yasset Perez-Riverol Date: Sat, 30 Nov 2024 10:20:21 +0000 Subject: [PATCH 08/20] Update quantmsutils/mzml/mzml_statistics.py Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- quantmsutils/mzml/mzml_statistics.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/quantmsutils/mzml/mzml_statistics.py b/quantmsutils/mzml/mzml_statistics.py index ced12e7..12d1f5f 100644 --- a/quantmsutils/mzml/mzml_statistics.py +++ b/quantmsutils/mzml/mzml_statistics.py @@ -95,16 +95,19 @@ def consumeSpectrum(self, spectrum): self._write_batch() def _write_batch(self): - batch_table = pa.Table.from_pylist(self.batch_data, schema=self.parquet_schema) + try: + batch_table = pa.Table.from_pylist(self.batch_data, schema=self.parquet_schema) - if self.parquet_writer is None: - self.parquet_writer = pq.ParquetWriter( - where=self.output_path, schema=self.parquet_schema, compression="gzip" - ) - - self.parquet_writer.write_table(batch_table) - self.batch_data = [] + if self.parquet_writer is None: + self.parquet_writer = pq.ParquetWriter( + where=self.output_path, schema=self.parquet_schema, compression="gzip" + ) + self.parquet_writer.write_table(batch_table) + self.batch_data = [] + except Exception as e: + print(f"Error during batch writing: {e}") + raise def finalize(self): # Write remaining data if self.batch_data: From d64503d32ce1d39519e6d3015897a380d7945e5e Mon Sep 17 00:00:00 2001 From: Yasset Perez-Riverol Date: Sat, 30 Nov 
2024 11:41:22 +0000 Subject: [PATCH 09/20] improvements parsing .d --- .gitignore | 1 + quantmsutils/mzml/mzml_statistics.py | 116 +++++++++++++++++---------- 2 files changed, 75 insertions(+), 42 deletions(-) diff --git a/.gitignore b/.gitignore index e2a06e8..bfcef11 100644 --- a/.gitignore +++ b/.gitignore @@ -161,3 +161,4 @@ cython_debug/ *.csv *_df.csv *.tsv +/tests/test_data/hMICAL1_coiPAnP-N2-200_3Murea-1Mthiourea-200mMtcep_14733.d/ diff --git a/quantmsutils/mzml/mzml_statistics.py b/quantmsutils/mzml/mzml_statistics.py index 12d1f5f..ae8d59d 100644 --- a/quantmsutils/mzml/mzml_statistics.py +++ b/quantmsutils/mzml/mzml_statistics.py @@ -1,7 +1,7 @@ import re import sqlite3 from pathlib import Path -from typing import Optional +from typing import Optional, List, Dict import click import numpy as np @@ -12,6 +12,11 @@ class BatchWritingConsumer: + """ + A class to consume mass spectrometry data and write to parquet file in batches from mzML files using + pyopenms streaming. + """ + def __init__(self, parquet_schema, output_path, batch_size=10000, id_only=False): self.parquet_schema = parquet_schema self.output_path = output_path @@ -24,21 +29,19 @@ def __init__(self, parquet_schema, output_path, batch_size=10000, id_only=False) self.scan_pattern = re.compile(r"[scan|spectrum]=(\d+)") def setExperimentalSettings(self, settings): - # Extract acquisition date/time from experimental settings self.acquisition_datetime = settings.getDateTime().get() def setExpectedSize(self, a, b): pass def consumeChromatogram(self, chromatogram): - # Optionally handle chromatograms if needed pass def consumeSpectrum(self, spectrum): """ Consume spectrum data and write to parquet file. - :param spectrum: spectrum data - :return: + :param spectrum: spectrum data. + :return: None """ peaks = spectrum.get_peaks() @@ -108,8 +111,12 @@ def _write_batch(self): except Exception as e: print(f"Error during batch writing: {e}") raise + def finalize(self): - # Write remaining data + """ + Finalize the writing process. + :return: + """ if self.batch_data: self._write_batch() @@ -133,6 +140,14 @@ def finalize(self): if self.parquet_writer: self.parquet_writer.close() +def column_exists(conn, table_name: str) -> List[str]: + """ + Fetch the existing columns in the specified SQLite table. + """ + table_info = pd.read_sql_query(f"PRAGMA table_info({table_name});", conn) + return set(table_info['name'].tolist()) + + @click.command("mzmlstats") @click.option("--ms_path", type=click.Path(exists=True), required=True) @click.option( @@ -147,10 +162,10 @@ def finalize(self): ) @click.pass_context def mzml_statistics( - ctx, - ms_path: str, - id_only: bool = False, - batch_size: int = 10000 + ctx, + ms_path: str, + id_only: bool = False, + batch_size: int = 10000 ) -> None: """ The mzml_statistics function parses mass spectrometry data files, either in @@ -158,7 +173,7 @@ def mzml_statistics( It supports generating detailed or ID-only CSV files based on the spectra data. # Command line usage example - python script_name.py mzml_statistics --ms_path "path/to/file.mzML" + quantmsutilsc mzmlstats --ms_path "path/to/file.mzML" :param ctx: Click context @@ -195,10 +210,9 @@ def batch_write_mzml_streaming(file_name: str, parquet_schema: pa.Schema, output return None def batch_write_bruker_d( - file_name: str, - output_path: str, - parquet_schema: pa.Schema, - batch_size: int = 10000 + file_name: str, + output_path: str, + batch_size: int = 10000 ) -> str: """ Batch processing and writing of Bruker .d files. 
@@ -211,40 +225,56 @@ def batch_write_bruker_d( "SELECT Value FROM GlobalMetadata WHERE key='AcquisitionDateTime'" ).fetchone()[0] + columns = column_exists(conn, "frames") + + schema = pa.schema([ + pa.field("Id", pa.int32(), nullable=False), + pa.field("MsMsType", pa.int32(), nullable=True), + pa.field("NumPeaks", pa.int32(), nullable=True), + pa.field("MaxIntensity", pa.float64(), nullable=True), + pa.field("SummedIntensities", pa.float64(), nullable=True), + pa.field("Time", pa.float64(), nullable=True), + pa.field("Charge", pa.int32(), nullable=True), + pa.field("MonoisotopicMz", pa.float64(), nullable=True), + pa.field("AcquisitionDateTime", pa.string(), nullable=True)] + ) + # Set up parquet writer parquet_writer = pq.ParquetWriter( output_path, - parquet_schema, + schema=schema, compression='gzip' ) + base_columns = [ + "Id", + "CASE WHEN MsMsType IN (8, 9) THEN 2 WHEN MsMsType = 0 THEN 1 ELSE NULL END as MsMsType", + "NumPeaks", + "MaxIntensity", + "SummedIntensities", + "Time" + ] + + if "Charge" in columns: + base_columns.insert(-1, "Charge") # Add before the last column for logical flow + + if "MonoisotopicMz" in columns: + base_columns.insert(-1, "MonoisotopicMz") + + query = f""" + SELECT + {', '.join(base_columns)} + FROM frames + """ + try: # Stream data in batches - for chunk in pd.read_sql_query( - """ - SELECT - Id, - CASE - WHEN MsMsType IN (8, 9) THEN 2 - WHEN MsMsType = 0 THEN 1 - ELSE NULL - END as MsMsType, - NumPeaks, - MaxIntensity, - SummedIntensities, - Time, - Charge, - MonoisotopicMz - FROM frames - """, - conn, - chunksize=batch_size - ): - # Prepare batch + for chunk in pd.read_sql_query(query, conn, chunksize=batch_size): chunk['AcquisitionDateTime'] = acquisition_date_time - - # Convert to pyarrow table and write - batch_table = pa.Table.from_pandas(chunk, schema=parquet_schema) + for col in schema.names: + if col not in chunk.columns: + chunk[col] = None + batch_table = pa.Table.from_pandas(chunk, schema=schema) parquet_writer.write_table(batch_table) finally: @@ -258,12 +288,14 @@ def batch_write_bruker_d( # Choose processing method based on file type if Path(ms_path).suffix == ".d": - batch_write_bruker_d(file_name=ms_path, output_path=output_path, parquet_schema=schema, batch_size=batch_size) + batch_write_bruker_d(file_name=ms_path, output_path=output_path, batch_size=batch_size) elif Path(ms_path).suffix.lower() in [".mzml"]: - batch_write_mzml_streaming(file_name=ms_path, parquet_schema=schema, output_path=output_path, id_only=id_only, batch_size=batch_size) + batch_write_mzml_streaming(file_name=ms_path, parquet_schema=schema, output_path=output_path, id_only=id_only, + batch_size=batch_size) else: raise RuntimeError(f"Unsupported file type: {ms_path}") + def _resolve_ms_path(ms_path: str) -> str: """ Resolve mass spectrometry file path with improved candidate search. From 04ba2a9846bba4a569d9589f57625f5c6acf6877 Mon Sep 17 00:00:00 2001 From: Yasset Perez-Riverol Date: Sat, 30 Nov 2024 13:38:18 +0000 Subject: [PATCH 10/20] id_only move to batch writing. --- quantmsutils/mzml/mzml_statistics.py | 59 ++++++++++++++++++---------- 1 file changed, 39 insertions(+), 20 deletions(-) diff --git a/quantmsutils/mzml/mzml_statistics.py b/quantmsutils/mzml/mzml_statistics.py index ae8d59d..4a47993 100644 --- a/quantmsutils/mzml/mzml_statistics.py +++ b/quantmsutils/mzml/mzml_statistics.py @@ -17,14 +17,16 @@ class BatchWritingConsumer: pyopenms streaming. 
""" - def __init__(self, parquet_schema, output_path, batch_size=10000, id_only=False): + def __init__(self, parquet_schema: pa.Schema, id_parquet_schema: pa.Schema, output_path, batch_size=10000, id_only=False): self.parquet_schema = parquet_schema + self.id_parquet_schema = id_parquet_schema self.output_path = output_path self.batch_size = batch_size self.id_only = id_only self.batch_data = [] self.psm_parts = [] self.parquet_writer = None + self.id_parquet_writer = None self.acquisition_datetime = None self.scan_pattern = re.compile(r"[scan|spectrum]=(\d+)") @@ -60,9 +62,12 @@ def consumeSpectrum(self, spectrum): if self.id_only: scan_id = self.scan_pattern.findall(spectrum.getNativeID())[0] self.psm_parts.append([ - scan_id, ms_level, - mz_array.tolist(), - intensity_array.tolist() + { + "scan": scan_id, + "ms_level": ms_level, + "mz": mz_array.tolist(), + "intensity": intensity_array.tolist() + } ]) row_data = { @@ -108,6 +113,22 @@ def _write_batch(self): self.parquet_writer.write_table(batch_table) self.batch_data = [] + + if self.id_only and self.psm_parts: + spectrum_table = pa.Table.from_pylist( + self.psm_parts, schema=self.id_parquet_schema + ) + + if self.id_parquet_writer is None: + self.id_parquet_writer = pq.ParquetWriter( + where=f"{Path(self.output_path).stem}_spectrum_df.parquet", + schema=self.id_parquet_schema, + compression="gzip" + ) + + self.parquet_writer.write_table(spectrum_table) + self.psm_parts = [] + except Exception as e: print(f"Error during batch writing: {e}") raise @@ -122,24 +143,14 @@ def finalize(self): # Write spectrum data if id_only if self.id_only and self.psm_parts: - spectrum_table = pa.Table.from_pylist( - self.psm_parts, - schema=pa.schema([ - ("scan", pa.string()), - ("ms_level", pa.int32()), - ("mz", pa.list_(pa.float64())), - ("intensity", pa.list_(pa.float64())) - ]) - ) - pq.write_table( - spectrum_table, - f"{Path(self.output_path).stem}_spectrum_df.parquet", - compression="gzip" - ) + self._write_batch() if self.parquet_writer: self.parquet_writer.close() + if self.id_parquet_writer: + self.id_parquet_writer.close() + def column_exists(conn, table_name: str) -> List[str]: """ Fetch the existing columns in the specified SQLite table. @@ -195,7 +206,15 @@ def mzml_statistics( pa.field("AcquisitionDateTime", pa.string(), nullable=True), ]) - def batch_write_mzml_streaming(file_name: str, parquet_schema: pa.Schema, output_path: str, id_only: bool = False, + id_schema = pa.schema([ + ("scan", pa.string()), + ("ms_level", pa.int32()), + ("mz", pa.list_(pa.float64())), + ("intensity", pa.list_(pa.float64())) + ]) + + def batch_write_mzml_streaming(file_name: str, parquet_schema: pa.Schema, output_path: str, + id_parquet_schema: pa.Schema, id_only: bool = False, batch_size: int = 10000) -> Optional[str]: """ Parse mzML file in a streaming manner and write to Parquet. 
@@ -290,7 +309,7 @@ def batch_write_bruker_d( if Path(ms_path).suffix == ".d": batch_write_bruker_d(file_name=ms_path, output_path=output_path, batch_size=batch_size) elif Path(ms_path).suffix.lower() in [".mzml"]: - batch_write_mzml_streaming(file_name=ms_path, parquet_schema=schema, output_path=output_path, id_only=id_only, + batch_write_mzml_streaming(file_name=ms_path, parquet_schema=schema, id_parquet_schema=id_schema, output_path=output_path, id_only=id_only, batch_size=batch_size) else: raise RuntimeError(f"Unsupported file type: {ms_path}") From ddda85685bf8024e86bdd97ca1c88f965fe3b4cb Mon Sep 17 00:00:00 2001 From: Yasset Perez-Riverol Date: Sat, 30 Nov 2024 13:38:33 +0000 Subject: [PATCH 11/20] id_only move to batch writing. --- quantmsutils/mzml/mzml_statistics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quantmsutils/mzml/mzml_statistics.py b/quantmsutils/mzml/mzml_statistics.py index 4a47993..0a32f05 100644 --- a/quantmsutils/mzml/mzml_statistics.py +++ b/quantmsutils/mzml/mzml_statistics.py @@ -13,7 +13,7 @@ class BatchWritingConsumer: """ - A class to consume mass spectrometry data and write to parquet file in batches from mzML files using + A class to consume mass spectrometry data and write to a parquet file in batches from mzML files using pyopenms streaming. """ From 5999bfb40d47987c94386560d0a32b3b7fbc01c2 Mon Sep 17 00:00:00 2001 From: Yasset Perez-Riverol Date: Sat, 30 Nov 2024 13:45:56 +0000 Subject: [PATCH 12/20] id_only move to batch writing. --- quantmsutils/mzml/mzml_statistics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quantmsutils/mzml/mzml_statistics.py b/quantmsutils/mzml/mzml_statistics.py index 0a32f05..a9ea409 100644 --- a/quantmsutils/mzml/mzml_statistics.py +++ b/quantmsutils/mzml/mzml_statistics.py @@ -1,7 +1,7 @@ import re import sqlite3 from pathlib import Path -from typing import Optional, List, Dict +from typing import Optional, List import click import numpy as np From 5264eb8bc082379fccb6ba81a00c4a9907652d00 Mon Sep 17 00:00:00 2001 From: Yasset Perez-Riverol Date: Fri, 29 Nov 2024 19:18:20 +0000 Subject: [PATCH 13/20] improvements in speed in mzml stats --- .gitignore | 1 + pyproject.toml | 2 +- quantmsutils/__init__.py | 2 +- quantmsutils/mzml/mzml_statistics.py | 486 ++++++++++++++++----------- recipe/meta.yaml | 2 +- 5 files changed, 296 insertions(+), 197 deletions(-) diff --git a/.gitignore b/.gitignore index e2a06e8..bfcef11 100644 --- a/.gitignore +++ b/.gitignore @@ -161,3 +161,4 @@ cython_debug/ *.csv *_df.csv *.tsv +/tests/test_data/hMICAL1_coiPAnP-N2-200_3Murea-1Mthiourea-200mMtcep_14733.d/ diff --git a/pyproject.toml b/pyproject.toml index ca5d136..05c39ea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,7 +3,7 @@ name = "quantms-utils" description = "Python scripts and helpers for the quantMS workflow" readme = "README.md" license = "MIT" -version = "0.0.15" +version = "0.0.16" authors = [ "Yasset Perez-Riverol ", "Dai Chengxin ", diff --git a/quantmsutils/__init__.py b/quantmsutils/__init__.py index 6561790..d62d967 100644 --- a/quantmsutils/__init__.py +++ b/quantmsutils/__init__.py @@ -1 +1 @@ -__version__ = "0.0.15" +__version__ = "0.0.16" diff --git a/quantmsutils/mzml/mzml_statistics.py b/quantmsutils/mzml/mzml_statistics.py index b8cd3df..d582b0b 100644 --- a/quantmsutils/mzml/mzml_statistics.py +++ b/quantmsutils/mzml/mzml_statistics.py @@ -1,239 +1,337 @@ import re import sqlite3 from pathlib import Path +from typing import Optional, List import click +import numpy as 
np import pandas as pd -import pyarrow -from pyopenms import MSExperiment, MzMLFile +import pyarrow as pa +import pyarrow.parquet as pq +from pyopenms import MzMLFile + + +class BatchWritingConsumer: + """ + A class to consume mass spectrometry data and write to a parquet file in batches from mzML files using + pyopenms streaming. + """ + + def __init__(self, parquet_schema: pa.Schema, id_parquet_schema: pa.Schema, output_path, batch_size=10000, id_only=False): + self.parquet_schema = parquet_schema + self.id_parquet_schema = id_parquet_schema + self.output_path = output_path + self.batch_size = batch_size + self.id_only = id_only + self.batch_data = [] + self.psm_parts = [] + self.parquet_writer = None + self.id_parquet_writer = None + self.acquisition_datetime = None + self.scan_pattern = re.compile(r"[scan|spectrum]=(\d+)") + + def setExperimentalSettings(self, settings): + self.acquisition_datetime = settings.getDateTime().get() + + def setExpectedSize(self, a, b): + pass + + def consumeChromatogram(self, chromatogram): + pass + + def consumeSpectrum(self, spectrum): + """ + Consume spectrum data and write to parquet file. + :param spectrum: spectrum data. + :return: None + """ + + peaks = spectrum.get_peaks() + mz_array, intensity_array = peaks[0], peaks[1] + peak_per_ms = len(mz_array) + base_peak_intensity = float(np.max(intensity_array)) if peak_per_ms > 0 else None + total_intensity = float(np.sum(intensity_array)) if peak_per_ms > 0 else None + ms_level = spectrum.getMSLevel() + rt = spectrum.getRT() + + if ms_level == 2: + precursor = spectrum.getPrecursors()[0] + charge_state = precursor.getCharge() + exp_mz = precursor.getMZ() + + if self.id_only: + scan_id = self.scan_pattern.findall(spectrum.getNativeID())[0] + self.psm_parts.append([ + { + "scan": scan_id, + "ms_level": ms_level, + "mz": mz_array.tolist(), + "intensity": intensity_array.tolist() + } + ]) + + row_data = { + "SpectrumID": spectrum.getNativeID(), + "MSLevel": float(ms_level), + "Charge": float(charge_state) if charge_state is not None else None, + "MS_peaks": float(peak_per_ms), + "Base_Peak_Intensity": float(base_peak_intensity) if base_peak_intensity is not None else None, + "Summed_Peak_Intensities": float(total_intensity) if total_intensity is not None else None, + "Retention_Time": float(rt), + "Exp_Mass_To_Charge": float(exp_mz) if exp_mz is not None else None, + "AcquisitionDateTime": str(self.acquisition_datetime) + } + elif ms_level == 1: + row_data = { + "SpectrumID": spectrum.getNativeID(), + "MSLevel": float(ms_level), + "Charge": None, + "MS_peaks": float(peak_per_ms), + "Base_Peak_Intensity": float(base_peak_intensity) if base_peak_intensity is not None else None, + "Summed_Peak_Intensities": float(total_intensity) if total_intensity is not None else None, + "Retention_Time": float(rt), + "Exp_Mass_To_Charge": None, + "AcquisitionDateTime": str(self.acquisition_datetime) + } + else: + return + + self.batch_data.append(row_data) + + # Write batch when it reaches specified size + if len(self.batch_data) >= self.batch_size: + self._write_batch() + + def _write_batch(self): + try: + batch_table = pa.Table.from_pylist(self.batch_data, schema=self.parquet_schema) + + if self.parquet_writer is None: + self.parquet_writer = pq.ParquetWriter( + where=self.output_path, schema=self.parquet_schema, compression="gzip" + ) + + self.parquet_writer.write_table(batch_table) + self.batch_data = [] + + if self.id_only and self.psm_parts: + spectrum_table = pa.Table.from_pylist( + self.psm_parts, 
schema=self.id_parquet_schema + ) + + if self.id_parquet_writer is None: + self.id_parquet_writer = pq.ParquetWriter( + where=f"{Path(self.output_path).stem}_spectrum_df.parquet", + schema=self.id_parquet_schema, + compression="gzip" + ) + + self.id_parquet_writer.write_table(spectrum_table) + self.psm_parts = [] + + except Exception as e: + print(f"Error during batch writing: {e}") + raise + + def finalize(self): + """ + Finalize the writing process. + :return: + """ + if self.batch_data: + self._write_batch() + + # Write spectrum data if id_only + if self.id_only and self.psm_parts: + self._write_batch() + + if self.parquet_writer: + self.parquet_writer.close() + + if self.id_parquet_writer: + self.id_parquet_writer.close() + +def column_exists(conn, table_name: str) -> List[str]: + """ + Fetch the existing columns in the specified SQLite table. + """ + table_info = pd.read_sql_query(f"PRAGMA table_info({table_name});", conn) + return set(table_info['name'].tolist()) @click.command("mzmlstats") -@click.option("--ms_path", type=click.Path(exists=True)) +@click.option("--ms_path", type=click.Path(exists=True), required=True) +@click.option( + "--id_only", is_flag=True, + help="Generate a csv with the spectrum id and the peaks" +) @click.option( - "--id_only", is_flag=True, help="Generate a csv with the spectrum id and the peaks" + "--batch_size", + type=int, + default=10000, + help="Number of rows to write in each batch" ) @click.pass_context -def mzml_statistics(ctx, ms_path: str, id_only: bool = False) -> None: +def mzml_statistics( + ctx, + ms_path: str, + id_only: bool = False, + batch_size: int = 10000 +) -> None: """ The mzml_statistics function parses mass spectrometry data files, either in .mzML or Bruker .d formats, to extract and compile a set of statistics about the spectra contained within. It supports generating detailed or ID-only CSV files based on the spectra data. # Command line usage example - python script_name.py mzml_statistics --ms_path "path/to/file.mzML" + quantmsutilsc mzmlstats --ms_path "path/to/file.mzML" :param ctx: Click context + :param ms_path: A string specifying the path to the mass spectrometry file. :param id_only: A boolean flag that, when set to True, generates a CSV file containing only the spectrum ID and peaks data for MS level 2 spectra. + :param batch_size: An integer specifying the number of rows to write in each batch. 
""" - file_columns = [ - "SpectrumID", - "MSLevel", - "Charge", - "MS_peaks", - "Base_Peak_Intensity", - "Summed_Peak_Intensities", - "Retention_Time", - "Exp_Mass_To_Charge", - "AcquisitionDateTime", - ] + schema = pa.schema([ + pa.field("SpectrumID", pa.string(), nullable=True), + pa.field("MSLevel", pa.float64(), nullable=True), + pa.field("Charge", pa.float64(), nullable=True), + pa.field("MS_peaks", pa.float64(), nullable=True), + pa.field("Base_Peak_Intensity", pa.float64(), nullable=True), + pa.field("Summed_Peak_Intensities", pa.float64(), nullable=True), + pa.field("Retention_Time", pa.float64(), nullable=True), + pa.field("Exp_Mass_To_Charge", pa.float64(), nullable=True), + pa.field("AcquisitionDateTime", pa.string(), nullable=True), + ]) - def parse_mzml(file_name: str, file_columns: list, id_only: bool = False): + id_schema = pa.schema([ + ("scan", pa.string()), + ("ms_level", pa.int32()), + ("mz", pa.list_(pa.float64())), + ("intensity", pa.list_(pa.float64())) + ]) + + def batch_write_mzml_streaming(file_name: str, parquet_schema: pa.Schema, output_path: str, + id_parquet_schema: pa.Schema, id_only: bool = False, + batch_size: int = 10000) -> Optional[str]: """ - Parse mzML file and return a pandas DataFrame with the information. If id_only is True, it will also save a csv. - @param file_name: The file name of the mzML file - @param file_columns: The columns of the DataFrame - @param id_only: If True, it will save a csv with the spectrum id, mz and intensity - @return: A pandas DataFrame with the information of the mzML file + Parse mzML file in a streaming manner and write to Parquet. """ + consumer = BatchWritingConsumer(parquet_schema, output_path, batch_size, id_only) + try: + MzMLFile().transform(file_name.encode(), consumer) + consumer.finalize() + return output_path + except Exception as e: + print(f"Error during streaming: {e}") + return None - info = [] - psm_part_info = [] - exp = MSExperiment() - acquisition_datetime = exp.getDateTime().get() - MzMLFile().load(file_name, exp) - scan_pattern = re.compile(r"[scan|spectrum]=(\d+)") - for spectrum in exp: - id_ = spectrum.getNativeID() - ms_level = spectrum.getMSLevel() - rt = spectrum.getRT() if spectrum.getRT() else None - - peaks_tuple = spectrum.get_peaks() - peak_per_ms = len(peaks_tuple[0]) - - if not spectrum.metaValueExists("base peak intensity"): - bpc = max(peaks_tuple[1]) if len(peaks_tuple[1]) > 0 else None - else: - bpc = spectrum.getMetaValue("base peak intensity") - - if not spectrum.metaValueExists("total ion current"): - tic = sum(peaks_tuple[1]) if len(peaks_tuple[1]) > 0 else None - else: - tic = spectrum.getMetaValue("total ion current") - - if ms_level == 1: - info_list = [ - id_, - ms_level, - None, - peak_per_ms, - bpc, - tic, - rt, - None, - acquisition_datetime, - ] - elif ms_level == 2: - charge_state = spectrum.getPrecursors()[0].getCharge() - emz = ( - spectrum.getPrecursors()[0].getMZ() - if spectrum.getPrecursors()[0].getMZ() - else None - ) - info_list = [ - id_, - ms_level, - charge_state, - peak_per_ms, - bpc, - tic, - rt, - emz, - acquisition_datetime, - ] - mz_array = peaks_tuple[0] - intensity_array = peaks_tuple[1] - else: - info_list = [ - id_, - ms_level, - None, - None, - None, - None, - rt, - None, - acquisition_datetime, - ] - - if id_only and ms_level == 2: - psm_part_info.append( - [ - scan_pattern.findall(id_)[0], - ms_level, - mz_array, - intensity_array, - ] - ) - info.append(info_list) - - if id_only and len(psm_part_info) > 0: - pd.DataFrame( - psm_part_info, 
columns=["scan", "ms_level", "mz", "intensity"] - ).to_parquet( - f"{Path(ms_path).stem}_spectrum_df.parquet", - index=False, - compression="gzip", - ) + def batch_write_bruker_d( + file_name: str, + output_path: str, + batch_size: int = 10000 + ) -> str: + """ + Batch processing and writing of Bruker .d files. + """ + sql_filepath = f"{file_name}/analysis.tdf" - return pd.DataFrame(info, columns=file_columns) + with sqlite3.connect(sql_filepath) as conn: + # Retrieve acquisition datetime + acquisition_date_time = conn.execute( + "SELECT Value FROM GlobalMetadata WHERE key='AcquisitionDateTime'" + ).fetchone()[0] - def parse_bruker_d(file_name: str, file_columns: list): - sql_filepath = f"{file_name}/analysis.tdf" - if not Path(sql_filepath).exists(): - msg = f"File '{sql_filepath}' not found" - raise FileNotFoundError(msg) - conn = sqlite3.connect(sql_filepath) - c = conn.cursor() - - datetime_cmd = ( - "SELECT Value FROM GlobalMetadata WHERE key='AcquisitionDateTime'" - ) - acquisition_date_time = c.execute(datetime_cmd).fetchall()[0][0] - - df = pd.read_sql_query( - "SELECT Id, MsMsType, NumPeaks, MaxIntensity, SummedIntensities, Time FROM frames", - conn, - ) - df["AcquisitionDateTime"] = acquisition_date_time - - # {8:'DDA-PASEF', 9:'DIA-PASEF'} - if 8 in df["MsMsType"].values: - mslevel_map = {0: 1, 8: 2} - elif 9 in df["MsMsType"].values: - mslevel_map = {0: 1, 9: 2} - else: - msg = f"Unrecognized ms type '{df['MsMsType'].values}'" - raise ValueError(msg) - df["MsMsType"] = df["MsMsType"].map(mslevel_map) + columns = column_exists(conn, "frames") - try: - # This line raises an sqlite error if the table does not exist - _ = conn.execute("SELECT * from Precursors LIMIT 1").fetchall() - precursor_df = pd.read_sql_query("SELECT * from Precursors", conn) - except sqlite3.OperationalError as e: - if "no such table: Precursors" in str(e): - print( - f"No precursors recorded in {file_name}, This is normal for DIA data." 
- ) - precursor_df = pd.DataFrame() - else: - raise + schema = pa.schema([ + pa.field("Id", pa.int32(), nullable=False), + pa.field("MsMsType", pa.int32(), nullable=True), + pa.field("NumPeaks", pa.int32(), nullable=True), + pa.field("MaxIntensity", pa.float64(), nullable=True), + pa.field("SummedIntensities", pa.float64(), nullable=True), + pa.field("Time", pa.float64(), nullable=True), + pa.field("Charge", pa.int32(), nullable=True), + pa.field("MonoisotopicMz", pa.float64(), nullable=True), + pa.field("AcquisitionDateTime", pa.string(), nullable=True)] + ) - if len(df) == len(precursor_df): - df = pd.concat([df, precursor_df["Charge", "MonoisotopicMz"]], axis=1) - df["Charge"] = df["Charge"].fillna(0) - else: - df[["Charge", "Exp_Mass_To_Charge"]] = None, None + # Set up parquet writer + parquet_writer = pq.ParquetWriter( + output_path, + schema=schema, + compression='gzip' + ) - df = df[ - [ + base_columns = [ "Id", - "MsMsType", - "Charge", + "CASE WHEN MsMsType IN (8, 9) THEN 2 WHEN MsMsType = 0 THEN 1 ELSE NULL END as MsMsType", "NumPeaks", "MaxIntensity", "SummedIntensities", - "Time", - "Exp_Mass_To_Charge", - "AcquisitionDateTime", + "Time" ] - ] - df.columns = pd.Index(file_columns) - return df + if "Charge" in columns: + base_columns.insert(-1, "Charge") # Add before the last column for logical flow - if not (Path(ms_path).exists()): - print(f"Not found '{ms_path}', trying to find alias") - ms_path_path = Path(ms_path) - path_stem = str(ms_path_path.stem) - candidates = ( - list(ms_path_path.parent.glob("*.d")) - + list(ms_path_path.parent.glob("*.mzml")) - + list(ms_path_path.parent.glob("*.mzML")) - ) + if "MonoisotopicMz" in columns: + base_columns.insert(-1, "MonoisotopicMz") - candidates = [c for c in candidates if path_stem in str(c)] + query = f""" + SELECT + {', '.join(base_columns)} + FROM frames + """ - if len(candidates) == 1: - ms_path = str(candidates[0].resolve()) - else: - raise FileNotFoundError() + try: + # Stream data in batches + for chunk in pd.read_sql_query(query, conn, chunksize=batch_size): + chunk['AcquisitionDateTime'] = acquisition_date_time + for col in schema.names: + if col not in chunk.columns: + chunk[col] = None + batch_table = pa.Table.from_pandas(chunk, schema=schema) + parquet_writer.write_table(batch_table) + + finally: + parquet_writer.close() + + return output_path + + # Resolve file path + ms_path = _resolve_ms_path(ms_path) + output_path = f"{Path(ms_path).stem}_ms_info.parquet" - if Path(ms_path).suffix == ".d" and Path(ms_path).is_dir(): - ms_df = parse_bruker_d(ms_path, file_columns) - elif Path(ms_path).suffix in [".mzML", ".mzml"]: - ms_df = parse_mzml(ms_path, file_columns, id_only) + # Choose processing method based on file type + if Path(ms_path).suffix == ".d": + batch_write_bruker_d(file_name=ms_path, output_path=output_path, batch_size=batch_size) + elif Path(ms_path).suffix.lower() in [".mzml"]: + batch_write_mzml_streaming(file_name=ms_path, parquet_schema=schema, id_parquet_schema=id_schema, output_path=output_path, id_only=id_only, + batch_size=batch_size) else: - msg = f"Unrecognized or the mass spec file '{ms_path}' do not exist" - raise RuntimeError(msg) - - ms_df.to_parquet( - f"{Path(ms_path).stem}_ms_info.parquet", - engine="pyarrow", - index=False, - compression="gzip", - ) + raise RuntimeError(f"Unsupported file type: {ms_path}") + + +def _resolve_ms_path(ms_path: str) -> str: + """ + Resolve mass spectrometry file path with improved candidate search. 
+ """ + path_obj = Path(ms_path) + if path_obj.exists(): + return str(path_obj) + + candidates = list(path_obj.parent.glob(f"{path_obj.stem}*")) + valid_extensions = {'.d', '.mzml', '.mzML'} + candidates = [ + str(c.resolve()) + for c in candidates + if c.suffix.lower() in valid_extensions + ] + + if len(candidates) == 1: + return candidates[0] + + raise FileNotFoundError(f"No unique file found for {ms_path}") diff --git a/recipe/meta.yaml b/recipe/meta.yaml index 2b3fb7f..6b076ba 100644 --- a/recipe/meta.yaml +++ b/recipe/meta.yaml @@ -1,7 +1,7 @@ # recipe/meta.yaml package: name: quantms-utils - version: "0.0.15" + version: "0.0.16" source: path: ../ From dbfc89d901d96cb3f50be58f4cc109e44f66b037 Mon Sep 17 00:00:00 2001 From: Yasset Perez-Riverol Date: Sat, 30 Nov 2024 14:29:20 +0000 Subject: [PATCH 14/20] black applied. --- quantmsutils/mzml/mzml_statistics.py | 178 ++++++++++++++------------- 1 file changed, 95 insertions(+), 83 deletions(-) diff --git a/quantmsutils/mzml/mzml_statistics.py b/quantmsutils/mzml/mzml_statistics.py index d582b0b..a5f6a33 100644 --- a/quantmsutils/mzml/mzml_statistics.py +++ b/quantmsutils/mzml/mzml_statistics.py @@ -17,7 +17,14 @@ class BatchWritingConsumer: pyopenms streaming. """ - def __init__(self, parquet_schema: pa.Schema, id_parquet_schema: pa.Schema, output_path, batch_size=10000, id_only=False): + def __init__( + self, + parquet_schema: pa.Schema, + id_parquet_schema: pa.Schema, + output_path, + batch_size=10000, + id_only=False, + ): self.parquet_schema = parquet_schema self.id_parquet_schema = id_parquet_schema self.output_path = output_path @@ -61,25 +68,31 @@ def consumeSpectrum(self, spectrum): if self.id_only: scan_id = self.scan_pattern.findall(spectrum.getNativeID())[0] - self.psm_parts.append([ - { - "scan": scan_id, - "ms_level": ms_level, - "mz": mz_array.tolist(), - "intensity": intensity_array.tolist() - } - ]) + self.psm_parts.append( + [ + { + "scan": scan_id, + "ms_level": ms_level, + "mz": mz_array.tolist(), + "intensity": intensity_array.tolist(), + } + ] + ) row_data = { "SpectrumID": spectrum.getNativeID(), "MSLevel": float(ms_level), "Charge": float(charge_state) if charge_state is not None else None, "MS_peaks": float(peak_per_ms), - "Base_Peak_Intensity": float(base_peak_intensity) if base_peak_intensity is not None else None, - "Summed_Peak_Intensities": float(total_intensity) if total_intensity is not None else None, + "Base_Peak_Intensity": ( + float(base_peak_intensity) if base_peak_intensity is not None else None + ), + "Summed_Peak_Intensities": ( + float(total_intensity) if total_intensity is not None else None + ), "Retention_Time": float(rt), "Exp_Mass_To_Charge": float(exp_mz) if exp_mz is not None else None, - "AcquisitionDateTime": str(self.acquisition_datetime) + "AcquisitionDateTime": str(self.acquisition_datetime), } elif ms_level == 1: row_data = { @@ -87,11 +100,15 @@ def consumeSpectrum(self, spectrum): "MSLevel": float(ms_level), "Charge": None, "MS_peaks": float(peak_per_ms), - "Base_Peak_Intensity": float(base_peak_intensity) if base_peak_intensity is not None else None, - "Summed_Peak_Intensities": float(total_intensity) if total_intensity is not None else None, + "Base_Peak_Intensity": ( + float(base_peak_intensity) if base_peak_intensity is not None else None + ), + "Summed_Peak_Intensities": ( + float(total_intensity) if total_intensity is not None else None + ), "Retention_Time": float(rt), "Exp_Mass_To_Charge": None, - "AcquisitionDateTime": str(self.acquisition_datetime) + "AcquisitionDateTime": 
str(self.acquisition_datetime), } else: return @@ -123,7 +140,7 @@ def _write_batch(self): self.id_parquet_writer = pq.ParquetWriter( where=f"{Path(self.output_path).stem}_spectrum_df.parquet", schema=self.id_parquet_schema, - compression="gzip" + compression="gzip", ) self.id_parquet_writer.write_table(spectrum_table) @@ -151,33 +168,23 @@ def finalize(self): if self.id_parquet_writer: self.id_parquet_writer.close() + def column_exists(conn, table_name: str) -> List[str]: """ Fetch the existing columns in the specified SQLite table. """ table_info = pd.read_sql_query(f"PRAGMA table_info({table_name});", conn) - return set(table_info['name'].tolist()) + return set(table_info["name"].tolist()) @click.command("mzmlstats") @click.option("--ms_path", type=click.Path(exists=True), required=True) +@click.option("--id_only", is_flag=True, help="Generate a csv with the spectrum id and the peaks") @click.option( - "--id_only", is_flag=True, - help="Generate a csv with the spectrum id and the peaks" -) -@click.option( - "--batch_size", - type=int, - default=10000, - help="Number of rows to write in each batch" + "--batch_size", type=int, default=10000, help="Number of rows to write in each batch" ) @click.pass_context -def mzml_statistics( - ctx, - ms_path: str, - id_only: bool = False, - batch_size: int = 10000 -) -> None: +def mzml_statistics(ctx, ms_path: str, id_only: bool = False, batch_size: int = 10000) -> None: """ The mzml_statistics function parses mass spectrometry data files, either in .mzML or Bruker .d formats, to extract and compile a set of statistics about the spectra contained within. @@ -194,28 +201,37 @@ def mzml_statistics( :param batch_size: An integer specifying the number of rows to write in each batch. """ - schema = pa.schema([ - pa.field("SpectrumID", pa.string(), nullable=True), - pa.field("MSLevel", pa.float64(), nullable=True), - pa.field("Charge", pa.float64(), nullable=True), - pa.field("MS_peaks", pa.float64(), nullable=True), - pa.field("Base_Peak_Intensity", pa.float64(), nullable=True), - pa.field("Summed_Peak_Intensities", pa.float64(), nullable=True), - pa.field("Retention_Time", pa.float64(), nullable=True), - pa.field("Exp_Mass_To_Charge", pa.float64(), nullable=True), - pa.field("AcquisitionDateTime", pa.string(), nullable=True), - ]) - - id_schema = pa.schema([ - ("scan", pa.string()), - ("ms_level", pa.int32()), - ("mz", pa.list_(pa.float64())), - ("intensity", pa.list_(pa.float64())) - ]) - - def batch_write_mzml_streaming(file_name: str, parquet_schema: pa.Schema, output_path: str, - id_parquet_schema: pa.Schema, id_only: bool = False, - batch_size: int = 10000) -> Optional[str]: + schema = pa.schema( + [ + pa.field("SpectrumID", pa.string(), nullable=True), + pa.field("MSLevel", pa.float64(), nullable=True), + pa.field("Charge", pa.float64(), nullable=True), + pa.field("MS_peaks", pa.float64(), nullable=True), + pa.field("Base_Peak_Intensity", pa.float64(), nullable=True), + pa.field("Summed_Peak_Intensities", pa.float64(), nullable=True), + pa.field("Retention_Time", pa.float64(), nullable=True), + pa.field("Exp_Mass_To_Charge", pa.float64(), nullable=True), + pa.field("AcquisitionDateTime", pa.string(), nullable=True), + ] + ) + + id_schema = pa.schema( + [ + ("scan", pa.string()), + ("ms_level", pa.int32()), + ("mz", pa.list_(pa.float64())), + ("intensity", pa.list_(pa.float64())), + ] + ) + + def batch_write_mzml_streaming( + file_name: str, + parquet_schema: pa.Schema, + output_path: str, + id_parquet_schema: pa.Schema, + id_only: bool = False, + 
batch_size: int = 10000, + ) -> Optional[str]: """ Parse mzML file in a streaming manner and write to Parquet. """ @@ -228,11 +244,7 @@ def batch_write_mzml_streaming(file_name: str, parquet_schema: pa.Schema, output print(f"Error during streaming: {e}") return None - def batch_write_bruker_d( - file_name: str, - output_path: str, - batch_size: int = 10000 - ) -> str: + def batch_write_bruker_d(file_name: str, output_path: str, batch_size: int = 10000) -> str: """ Batch processing and writing of Bruker .d files. """ @@ -246,24 +258,22 @@ def batch_write_bruker_d( columns = column_exists(conn, "frames") - schema = pa.schema([ - pa.field("Id", pa.int32(), nullable=False), - pa.field("MsMsType", pa.int32(), nullable=True), - pa.field("NumPeaks", pa.int32(), nullable=True), - pa.field("MaxIntensity", pa.float64(), nullable=True), - pa.field("SummedIntensities", pa.float64(), nullable=True), - pa.field("Time", pa.float64(), nullable=True), - pa.field("Charge", pa.int32(), nullable=True), - pa.field("MonoisotopicMz", pa.float64(), nullable=True), - pa.field("AcquisitionDateTime", pa.string(), nullable=True)] + schema = pa.schema( + [ + pa.field("Id", pa.int32(), nullable=False), + pa.field("MsMsType", pa.int32(), nullable=True), + pa.field("NumPeaks", pa.int32(), nullable=True), + pa.field("MaxIntensity", pa.float64(), nullable=True), + pa.field("SummedIntensities", pa.float64(), nullable=True), + pa.field("Time", pa.float64(), nullable=True), + pa.field("Charge", pa.int32(), nullable=True), + pa.field("MonoisotopicMz", pa.float64(), nullable=True), + pa.field("AcquisitionDateTime", pa.string(), nullable=True), + ] ) # Set up parquet writer - parquet_writer = pq.ParquetWriter( - output_path, - schema=schema, - compression='gzip' - ) + parquet_writer = pq.ParquetWriter(output_path, schema=schema, compression="gzip") base_columns = [ "Id", @@ -271,7 +281,7 @@ def batch_write_bruker_d( "NumPeaks", "MaxIntensity", "SummedIntensities", - "Time" + "Time", ] if "Charge" in columns: @@ -289,7 +299,7 @@ def batch_write_bruker_d( try: # Stream data in batches for chunk in pd.read_sql_query(query, conn, chunksize=batch_size): - chunk['AcquisitionDateTime'] = acquisition_date_time + chunk["AcquisitionDateTime"] = acquisition_date_time for col in schema.names: if col not in chunk.columns: chunk[col] = None @@ -309,8 +319,14 @@ def batch_write_bruker_d( if Path(ms_path).suffix == ".d": batch_write_bruker_d(file_name=ms_path, output_path=output_path, batch_size=batch_size) elif Path(ms_path).suffix.lower() in [".mzml"]: - batch_write_mzml_streaming(file_name=ms_path, parquet_schema=schema, id_parquet_schema=id_schema, output_path=output_path, id_only=id_only, - batch_size=batch_size) + batch_write_mzml_streaming( + file_name=ms_path, + parquet_schema=schema, + id_parquet_schema=id_schema, + output_path=output_path, + id_only=id_only, + batch_size=batch_size, + ) else: raise RuntimeError(f"Unsupported file type: {ms_path}") @@ -324,12 +340,8 @@ def _resolve_ms_path(ms_path: str) -> str: return str(path_obj) candidates = list(path_obj.parent.glob(f"{path_obj.stem}*")) - valid_extensions = {'.d', '.mzml', '.mzML'} - candidates = [ - str(c.resolve()) - for c in candidates - if c.suffix.lower() in valid_extensions - ] + valid_extensions = {".d", ".mzml", ".mzML"} + candidates = [str(c.resolve()) for c in candidates if c.suffix.lower() in valid_extensions] if len(candidates) == 1: return candidates[0] From fe1739e6f0634492662d3022a6aa82ca9542e196 Mon Sep 17 00:00:00 2001 From: Yasset Perez-Riverol Date: Sat, 30 Nov 
2024 14:33:05 +0000 Subject: [PATCH 15/20] black applied. --- quantmsutils/mzml/mzml_statistics.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/quantmsutils/mzml/mzml_statistics.py b/quantmsutils/mzml/mzml_statistics.py index a5f6a33..c1d17e9 100644 --- a/quantmsutils/mzml/mzml_statistics.py +++ b/quantmsutils/mzml/mzml_statistics.py @@ -290,11 +290,8 @@ def batch_write_bruker_d(file_name: str, output_path: str, batch_size: int = 100 if "MonoisotopicMz" in columns: base_columns.insert(-1, "MonoisotopicMz") - query = f""" - SELECT - {', '.join(base_columns)} - FROM frames - """ + safe_columns = [col for col in base_columns if col.replace(" ", "").isalnum()] # Remove spaces + query = f"""SELECT {', '.join(safe_columns)} FROM frames """ try: # Stream data in batches From 2ca002c0e384f555ac44d3d29da682310ee59810 Mon Sep 17 00:00:00 2001 From: Yasset Perez-Riverol Date: Sat, 30 Nov 2024 14:37:56 +0000 Subject: [PATCH 16/20] black applied. --- quantmsutils/mzml/mzml_statistics.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/quantmsutils/mzml/mzml_statistics.py b/quantmsutils/mzml/mzml_statistics.py index c1d17e9..94baf66 100644 --- a/quantmsutils/mzml/mzml_statistics.py +++ b/quantmsutils/mzml/mzml_statistics.py @@ -235,7 +235,13 @@ def batch_write_mzml_streaming( """ Parse mzML file in a streaming manner and write to Parquet. """ - consumer = BatchWritingConsumer(parquet_schema, output_path, batch_size, id_only) + consumer = BatchWritingConsumer( + parquet_schema=parquet_schema, + output_path=output_path, + batch_size=batch_size, + id_only=id_only, + id_parquet_schema=id_parquet_schema, + ) try: MzMLFile().transform(file_name.encode(), consumer) consumer.finalize() @@ -290,7 +296,9 @@ def batch_write_bruker_d(file_name: str, output_path: str, batch_size: int = 100 if "MonoisotopicMz" in columns: base_columns.insert(-1, "MonoisotopicMz") - safe_columns = [col for col in base_columns if col.replace(" ", "").isalnum()] # Remove spaces + safe_columns = [ + col for col in base_columns if col.replace(" ", "").isalnum() + ] # Remove spaces query = f"""SELECT {', '.join(safe_columns)} FROM frames """ try: From ce697c7ab268217c699a9066e79fd20649852cb5 Mon Sep 17 00:00:00 2001 From: Yasset Perez-Riverol Date: Sat, 30 Nov 2024 14:54:00 +0000 Subject: [PATCH 17/20] black applied. --- quantmsutils/mzml/mzml_statistics.py | 39 +++++++++++++++++++++------- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/quantmsutils/mzml/mzml_statistics.py b/quantmsutils/mzml/mzml_statistics.py index 94baf66..ca095ff 100644 --- a/quantmsutils/mzml/mzml_statistics.py +++ b/quantmsutils/mzml/mzml_statistics.py @@ -120,30 +120,51 @@ def consumeSpectrum(self, spectrum): self._write_batch() def _write_batch(self): + """ + Write accumulated batch data more efficiently using PyArrow's streaming writer. 
+ + Improvements: + - Directly stream data without creating a full in-memory table + - Reduce memory overhead for large datasets + - More efficient batch processing + """ try: - batch_table = pa.Table.from_pylist(self.batch_data, schema=self.parquet_schema) + # If no data, return early + if not self.batch_data: + return + # Initialize writers lazily if not already created if self.parquet_writer is None: self.parquet_writer = pq.ParquetWriter( - where=self.output_path, schema=self.parquet_schema, compression="gzip" + where=self.output_path, + schema=self.parquet_schema, + compression="gzip" ) - self.parquet_writer.write_table(batch_table) + # Create a RecordBatch directly from the current batch + batch = pa.RecordBatch.from_pylist(self.batch_data, schema=self.parquet_schema) + + # Write the batch directly + self.parquet_writer.write_batch(batch) + + # Clear the batch data self.batch_data = [] + # Handle ID-only data if applicable if self.id_only and self.psm_parts: - spectrum_table = pa.Table.from_pylist( - self.psm_parts, schema=self.id_parquet_schema - ) - + # Similar approach for spectrum ID data if self.id_parquet_writer is None: self.id_parquet_writer = pq.ParquetWriter( where=f"{Path(self.output_path).stem}_spectrum_df.parquet", schema=self.id_parquet_schema, - compression="gzip", + compression="gzip" ) - self.id_parquet_writer.write_table(spectrum_table) + id_batch = pa.RecordBatch.from_pylist( + self.psm_parts, + schema=self.id_parquet_schema + ) + self.id_parquet_writer.write_batch(id_batch) self.psm_parts = [] except Exception as e: From 2b5cc7f263639da103cbf2986dfdfac2bd67a52e Mon Sep 17 00:00:00 2001 From: Julianus Pfeuffer Date: Sat, 30 Nov 2024 17:08:37 +0100 Subject: [PATCH 18/20] Apply suggestions from code review --- quantmsutils/mzml/mzml_statistics.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/quantmsutils/mzml/mzml_statistics.py b/quantmsutils/mzml/mzml_statistics.py index ca095ff..fbda986 100644 --- a/quantmsutils/mzml/mzml_statistics.py +++ b/quantmsutils/mzml/mzml_statistics.py @@ -73,8 +73,8 @@ def consumeSpectrum(self, spectrum): { "scan": scan_id, "ms_level": ms_level, - "mz": mz_array.tolist(), - "intensity": intensity_array.tolist(), + "mz": mz_array, + "intensity": intensity_array, } ] ) From 71784a6e30bcc7648e8f0de6cc806a72755e4744 Mon Sep 17 00:00:00 2001 From: Yasset Perez-Riverol Date: Sat, 30 Nov 2024 19:49:23 +0000 Subject: [PATCH 19/20] black applied. 
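
Beyond the formatting pass, this patch rebuilds the Bruker frames query from a fixed allow-list: every selectable SQL expression lives in the allowed_columns mapping, and optional columns are included only when column_exists() reports them, so nothing outside the mapping can reach the query string. Note that the CASE expression is no longer aliased with "as MsMsType", so the column name SQLite returns may need to be realigned to the Parquet schema downstream. A minimal sketch of the allow-list idea (hypothetical helper, not the function in the diff):

    ALLOWED_COLUMNS = {
        "Id": "Id",
        "MsMsType": "CASE WHEN MsMsType IN (8, 9) THEN 2 WHEN MsMsType = 0 THEN 1 ELSE NULL END",
        "NumPeaks": "NumPeaks",
        "Charge": "Charge",
        "MonoisotopicMz": "MonoisotopicMz",
    }

    def build_frames_query(existing_columns: set) -> str:
        # Keep only expressions whose schema name is present in the table
        # (Id is always selected); unknown names can never enter the query.
        exprs = [
            expr
            for name, expr in ALLOWED_COLUMNS.items()
            if name == "Id" or name in existing_columns
        ]
        return f"SELECT {', '.join(exprs)} FROM frames"

    # build_frames_query({"Id", "MsMsType", "NumPeaks"}) ->
    # "SELECT Id, CASE WHEN MsMsType IN (8, 9) THEN 2 WHEN MsMsType = 0 THEN 1 ELSE NULL END, NumPeaks FROM frames"
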
--- quantmsutils/mzml/mzml_statistics.py | 44 +++++++++++++++------------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/quantmsutils/mzml/mzml_statistics.py b/quantmsutils/mzml/mzml_statistics.py index fbda986..c242312 100644 --- a/quantmsutils/mzml/mzml_statistics.py +++ b/quantmsutils/mzml/mzml_statistics.py @@ -283,8 +283,32 @@ def batch_write_bruker_d(file_name: str, output_path: str, batch_size: int = 100 "SELECT Value FROM GlobalMetadata WHERE key='AcquisitionDateTime'" ).fetchone()[0] + # Check which optional columns exist columns = column_exists(conn, "frames") + # Get allowed columns from the schema + allowed_columns = { + 'Id': 'Id', + 'MsMsType': 'CASE WHEN MsMsType IN (8, 9) THEN 2 WHEN MsMsType = 0 THEN 1 ELSE NULL END', + 'NumPeaks': 'NumPeaks', + 'MaxIntensity': 'MaxIntensity', + 'SummedIntensities': 'SummedIntensities', + 'Time': 'Time', + 'Charge': 'Charge', + 'MonoisotopicMz': 'MonoisotopicMz' + } + + # Construct safe column list + safe_columns = [] + column_mapping = {} + for schema_col_name, sql_expr in allowed_columns.items(): + if schema_col_name in columns or schema_col_name == 'Id': + safe_columns.append(sql_expr) + column_mapping[schema_col_name] = sql_expr + + # Construct the query using parameterized safe columns + query = f"""SELECT {', '.join(safe_columns)} FROM frames""" + schema = pa.schema( [ pa.field("Id", pa.int32(), nullable=False), @@ -302,26 +326,6 @@ def batch_write_bruker_d(file_name: str, output_path: str, batch_size: int = 100 # Set up parquet writer parquet_writer = pq.ParquetWriter(output_path, schema=schema, compression="gzip") - base_columns = [ - "Id", - "CASE WHEN MsMsType IN (8, 9) THEN 2 WHEN MsMsType = 0 THEN 1 ELSE NULL END as MsMsType", - "NumPeaks", - "MaxIntensity", - "SummedIntensities", - "Time", - ] - - if "Charge" in columns: - base_columns.insert(-1, "Charge") # Add before the last column for logical flow - - if "MonoisotopicMz" in columns: - base_columns.insert(-1, "MonoisotopicMz") - - safe_columns = [ - col for col in base_columns if col.replace(" ", "").isalnum() - ] # Remove spaces - query = f"""SELECT {', '.join(safe_columns)} FROM frames """ - try: # Stream data in batches for chunk in pd.read_sql_query(query, conn, chunksize=batch_size): From 4d3351fb0bcac1eeb628e0c5e6d23273825fe54e Mon Sep 17 00:00:00 2001 From: Yasset Perez-Riverol Date: Sat, 30 Nov 2024 19:49:50 +0000 Subject: [PATCH 20/20] black applied. 
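
This final pass reformats the _write_batch logic introduced earlier in the series. The pattern it keeps: create the ParquetWriter lazily on the first non-empty batch, convert each accumulated list of row dicts into a single RecordBatch, and stream it out with write_batch instead of materialising a full Table. A self-contained sketch of that pattern (hypothetical schema, rows and file name, not the consumer class itself):

    import pyarrow as pa
    import pyarrow.parquet as pq

    schema = pa.schema([("SpectrumID", pa.string()), ("MSLevel", pa.float64())])
    rows = [{"SpectrumID": "scan=1", "MSLevel": 1.0}, {"SpectrumID": "scan=2", "MSLevel": 2.0}]
    writer = None

    if rows:
        if writer is None:
            # Open the writer only once there is something to write.
            writer = pq.ParquetWriter("demo_ms_info.parquet", schema=schema, compression="gzip")
        # One RecordBatch per accumulated chunk; no full in-memory Table is built.
        writer.write_batch(pa.RecordBatch.from_pylist(rows, schema=schema))
        rows = []

    if writer is not None:
        writer.close()

In the consumer itself, the same flush-and-close step is what finalize() performs once pyopenms finishes streaming the file.
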
--- quantmsutils/mzml/mzml_statistics.py | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/quantmsutils/mzml/mzml_statistics.py b/quantmsutils/mzml/mzml_statistics.py index c242312..9fd349d 100644 --- a/quantmsutils/mzml/mzml_statistics.py +++ b/quantmsutils/mzml/mzml_statistics.py @@ -136,9 +136,7 @@ def _write_batch(self): # Initialize writers lazily if not already created if self.parquet_writer is None: self.parquet_writer = pq.ParquetWriter( - where=self.output_path, - schema=self.parquet_schema, - compression="gzip" + where=self.output_path, schema=self.parquet_schema, compression="gzip" ) # Create a RecordBatch directly from the current batch @@ -157,12 +155,11 @@ def _write_batch(self): self.id_parquet_writer = pq.ParquetWriter( where=f"{Path(self.output_path).stem}_spectrum_df.parquet", schema=self.id_parquet_schema, - compression="gzip" + compression="gzip", ) id_batch = pa.RecordBatch.from_pylist( - self.psm_parts, - schema=self.id_parquet_schema + self.psm_parts, schema=self.id_parquet_schema ) self.id_parquet_writer.write_batch(id_batch) self.psm_parts = [] @@ -288,21 +285,21 @@ def batch_write_bruker_d(file_name: str, output_path: str, batch_size: int = 100 # Get allowed columns from the schema allowed_columns = { - 'Id': 'Id', - 'MsMsType': 'CASE WHEN MsMsType IN (8, 9) THEN 2 WHEN MsMsType = 0 THEN 1 ELSE NULL END', - 'NumPeaks': 'NumPeaks', - 'MaxIntensity': 'MaxIntensity', - 'SummedIntensities': 'SummedIntensities', - 'Time': 'Time', - 'Charge': 'Charge', - 'MonoisotopicMz': 'MonoisotopicMz' + "Id": "Id", + "MsMsType": "CASE WHEN MsMsType IN (8, 9) THEN 2 WHEN MsMsType = 0 THEN 1 ELSE NULL END", + "NumPeaks": "NumPeaks", + "MaxIntensity": "MaxIntensity", + "SummedIntensities": "SummedIntensities", + "Time": "Time", + "Charge": "Charge", + "MonoisotopicMz": "MonoisotopicMz", } # Construct safe column list safe_columns = [] column_mapping = {} for schema_col_name, sql_expr in allowed_columns.items(): - if schema_col_name in columns or schema_col_name == 'Id': + if schema_col_name in columns or schema_col_name == "Id": safe_columns.append(sql_expr) column_mapping[schema_col_name] = sql_expr
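
With the series applied, mzmlstats writes <input stem>_ms_info.parquet and, when --id_only is set, a second spectrum-level file whose name is derived from the stem of that output (so an input named sample.mzML yields sample_ms_info_spectrum_df.parquet). A quick pyarrow sanity check of the results, with hypothetical file names for that input:

    import pyarrow.parquet as pq

    ms_info = pq.read_table("sample_ms_info.parquet")
    print(ms_info.schema)    # SpectrumID, MSLevel, Charge, MS_peaks, ... as declared in mzml_statistics
    print(ms_info.num_rows)

    spectra = pq.read_table("sample_ms_info_spectrum_df.parquet")
    print(spectra.schema)    # scan, ms_level, mz, intensity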