diff --git a/CHANGELOG.md b/CHANGELOG.md index eb208101..b9aad769 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), #### Added +- Add support for storing pipeline measurements as partitioned arrow files [#580](https://github.com/askap-vast/vast-tools/pull/580) - Add epoch 69 [#576](https://github.com/askap-vast/vast-tools/pull/576) - Add epoch 68 [#575](https://github.com/askap-vast/vast-tools/pull/575) - Add epoch 67 [#573](https://github.com/askap-vast/vast-tools/pull/573) @@ -18,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), #### Changed +- Switch os.path to pathlib in vasttools.pipeline [#580](https://github.com/askap-vast/vast-tools/pull/580) - Minor changes to docstring formatting throughout based on updated mkdocs versions [#334](https://github.com/askap-vast/vast-tools/pull/334) - Minor changes for matplotlib 3.7: add angle kwarg to Ellipse and change matplotlib.pyplot.cm.get_cmap to matplotlib.colormaps.get_cmap [#334](https://github.com/askap-vast/vast-tools/pull/334) - Refreshed dependencies - major changes are python 3.10, mkdocs (and related packages), astropy v5 and matplotlib v3.7 [#334](https://github.com/askap-vast/vast-tools/pull/334) @@ -39,6 +41,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), #### List of PRs +- [#580](https://github.com/askap-vast/vast-tools/pull/580): feat: Add support for partitioned measurement files and switch vasttools.pipeline to use pathlib - [#576](https://github.com/askap-vast/vast-tools/pull/576): feat: Add epoch 69 - [#575](https://github.com/askap-vast/vast-tools/pull/575): feat: Add epoch 68 - [#573](https://github.com/askap-vast/vast-tools/pull/573): feat: Add epoch 67 diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 2add1546..c1f06a71 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -2,7 +2,6 @@ import
copy import matplotlib.pyplot as plt import numpy as np -import os import pandas as pd import pytest import vaex @@ -34,7 +33,9 @@ def dummy_pipeline_object(mocker: MockerFixture) -> vtp.Pipeline: mocker_getenv = mocker.patch( 'os.getenv', return_value=expected_path ) - mock_isdir = mocker.patch('os.path.isdir', return_value=True) + mock_isdir = mocker.patch('vasttools.pipeline.Path.is_dir', + return_value=True + ) pipe = vtp.Pipeline() @@ -315,7 +316,7 @@ def dummy_pipeline_pairs_df() -> pd.DataFrame: def load_parquet_side_effect( - value: str, + value: Union[str, Path], **kwargs ) -> Union[pd.DataFrame, vaex.dataframe.DataFrame]: """ @@ -333,6 +334,8 @@ def load_parquet_side_effect( The relevant dataframe which is returned from the function. Can be pandas or vaex. """ + if isinstance(value, Path): + value = str(value) if 'bands.parquet' in value: return dummy_pipeline_bands() elif 'associations.parquet' in value: @@ -370,8 +373,13 @@ def dummy_PipeAnalysis_base( Returns: The vtp.PipeAnalysis instance. """ - mock_isdir = mocker.patch('os.path.isdir', return_value=True) - mock_isfile = mocker.patch('os.path.isfile', return_value=False) + mock_isdir = mocker.patch('vasttools.pipeline.Path.is_dir', + # return_value=False, + side_effect=[True, False, False] + ) + mock_isfile = mocker.patch( + 'vasttools.pipeline.Path.is_file', + return_value=False) pandas_read_parquet_mocker = mocker.patch( 'vasttools.pipeline.pd.read_parquet', side_effect=load_parquet_side_effect @@ -410,7 +418,6 @@ def dummy_PipeAnalysis( Returns: The vtp.PipeAnalysis instance. """ - measurement_pairs_existence_mocker = mocker.patch( 'vasttools.pipeline.PipeRun._check_measurement_pairs_file', return_value=True @@ -469,8 +476,12 @@ def dummy_PipeAnalysis_vaex( Returns: The vtp.PipeAnalysis instance. 
""" - mock_isdir = mocker.patch('os.path.isdir', return_value=True) - mock_isfile = mocker.patch('os.path.isfile', return_value=True) + mock_isdir = mocker.patch( + 'vasttools.pipeline.Path.is_dir', + return_value=True) + mock_isfile = mocker.patch( + 'vasttools.pipeline.Path.is_file', + return_value=True) pandas_read_parquet_mocker = mocker.patch( 'vasttools.pipeline.pd.read_parquet', side_effect=load_parquet_side_effect @@ -664,11 +675,13 @@ def test_init(self, mocker: MockerFixture) -> None: Returns: None """ - expected_path = '/path/to/pipelineruns/' + expected_path = Path('/path/to/pipelineruns/') mocker_getenv = mocker.patch( 'os.getenv', return_value=expected_path ) - mock_isdir = mocker.patch('os.path.isdir', return_value=True) + mock_isdir = mocker.patch( + 'vasttools.pipeline.Path.is_dir', + return_value=True) pipe = vtp.Pipeline() @@ -687,11 +700,13 @@ def test_init_projectdir(self, mocker: MockerFixture) -> None: Returns: None """ - expected_path = '/path/to/projectdir/' + expected_path = Path('/path/to/projectdir/') mocker_abspath = mocker.patch( 'os.path.abspath', return_value=expected_path ) - mock_isdir = mocker.patch('os.path.isdir', return_value=True) + mock_isdir = mocker.patch( + 'vasttools.pipeline.Path.is_dir', + return_value=True) pipe = vtp.Pipeline(project_dir=expected_path) @@ -709,7 +724,7 @@ def test_init_env_fail(self, mocker: MockerFixture) -> None: Returns: None """ - expected_path = '/path/to/pipelineruns/' + expected_path = Path('/path/to/pipelineruns/') mocker_getenv = mocker.patch( 'os.getenv', return_value=None ) @@ -733,11 +748,13 @@ def test_init_project_dir_fail(self, mocker: MockerFixture) -> None: Returns: None """ - expected_path = '/path/to/projectdir/' + expected_path = Path('/path/to/projectdir/') mocker_abspath = mocker.patch( 'os.path.abspath', return_value=expected_path ) - mock_isdir = mocker.patch('os.path.isdir', return_value=False) + mock_isdir = mocker.patch( + 'vasttools.pipeline.Path.is_dir', + 
return_value=False) with pytest.raises(vtp.PipelineDirectoryError) as excinfo: pipe = vtp.Pipeline(project_dir=expected_path) @@ -760,21 +777,21 @@ def test_list_piperuns( Returns: None """ - expected_path = '/path/to/pipelineruns' + expected_path = Path('/path/to/pipelineruns') expected_result = ['job1', 'job2'] mocker_glob = mocker.patch( - 'glob.glob', return_value=[ - os.path.join(expected_path, 'job1'), - os.path.join(expected_path, 'job2'), - os.path.join(expected_path, 'images') + 'vasttools.pipeline.Path.glob', return_value=[ + expected_path / 'job1', + expected_path / 'job2', + expected_path / 'images' ] ) result = dummy_pipeline_object.list_piperuns() - mocker_glob.assert_called_once_with(os.path.join(expected_path, '*')) + mocker_glob.assert_called_once_with('*') assert result == expected_result def test_list_images( @@ -795,22 +812,21 @@ def test_list_images( Returns: None """ - expected_path = '/path/to/pipelineruns' + expected_path = Path('/path/to/pipelineruns') + images_path = expected_path / 'images' expected_result = ['image1', 'image2'] mocker_glob = mocker.patch( - 'glob.glob', return_value=[ - os.path.join(expected_path, 'images', 'image1'), - os.path.join(expected_path, 'images', 'image2'), + 'vasttools.pipeline.Path.glob', return_value=[ + images_path / 'image1', + images_path / 'image2', ] ) result = dummy_pipeline_object.list_images() - mocker_glob.assert_called_once_with(os.path.join( - expected_path, 'images', '*' - )) + mocker_glob.assert_called_once_with('*') assert result == expected_result def test_load_run_dir_fail( @@ -832,7 +848,9 @@ def test_load_run_dir_fail( Returns: None """ - mock_isdir = mocker.patch('os.path.isdir', return_value=False) + mock_isdir = mocker.patch( + 'vasttools.pipeline.Path.is_dir', + return_value=False) pipe = dummy_pipeline_object @@ -859,8 +877,12 @@ def test_load_run_no_vaex( Returns: None """ - mock_isdir = mocker.patch('os.path.isdir', return_value=True) - mock_isfile = mocker.patch('os.path.isfile', 
return_value=False) + mock_isdir = mocker.patch('vasttools.pipeline.Path.is_dir', + side_effect=[True, False, False] + ) + mock_isfile = mocker.patch('vasttools.pipeline.Path.is_file', + return_value=False + ) pandas_read_parquet_mocker = mocker.patch( 'vasttools.pipeline.pd.read_parquet', side_effect=load_parquet_side_effect @@ -898,8 +920,12 @@ def test_load_run_no_vaex_check_columns( Returns: None """ - mock_isdir = mocker.patch('os.path.isdir', return_value=True) - mock_isfile = mocker.patch('os.path.isfile', return_value=False) + mock_isdir = mocker.patch('vasttools.pipeline.Path.is_dir', + side_effect=[True, False, False] + ) + mock_isfile = mocker.patch('vasttools.pipeline.Path.is_file', + return_value=False + ) pandas_read_parquet_mocker = mocker.patch( 'vasttools.pipeline.pd.read_parquet', side_effect=load_parquet_side_effect @@ -939,8 +965,54 @@ def test_load_run_vaex( Returns: None """ - mock_isdir = mocker.patch('os.path.isdir', return_value=True) - mock_isfile = mocker.patch('os.path.isfile', return_value=True) + mock_isdir = mocker.patch( + 'vasttools.pipeline.Path.is_dir', + return_value=True) + mock_isfile = mocker.patch( + 'vasttools.pipeline.Path.is_file', + return_value=True) + pandas_read_parquet_mocker = mocker.patch( + 'vasttools.pipeline.pd.read_parquet', + side_effect=load_parquet_side_effect + ) + vaex_open_mocker = mocker.patch( + 'vasttools.pipeline.vaex.open', + return_value=dummy_pipeline_measurements_vaex() + ) + + pipe = dummy_pipeline_object + run_name = 'test_run' + run = pipe.load_run(run_name) + + assert run.name == run_name + assert run._vaex_meas is True + + def test_load_run_vaex_partitioned( + self, + dummy_pipeline_object: vtp.Pipeline, + mocker: MockerFixture + ) -> None: + """ + Tests the load run method. + + Specifically when the arrow files are present so vaex is used. + The usual mocks are in place, including using the read parquet side + effect. 
+ + Args: + dummy_pipeline_object: The dummy Pipeline object that is used + for testing. + mocker: The pytest mocker mock object. + + Returns: + None + """ + mock_isdir = mocker.patch( + 'vasttools.pipeline.Path.is_dir', + return_value=True) + mock_isfile = mocker.patch( + 'vasttools.pipeline.Path.is_file', + return_value=False) pandas_read_parquet_mocker = mocker.patch( 'vasttools.pipeline.pd.read_parquet', side_effect=load_parquet_side_effect @@ -969,9 +1041,9 @@ class TestPipeAnalysis: [ [True], [False], - [True,True], - [False,False], - [True,False], + [True, True], + [False, False], + [True, False], ], ids=("single-exists", "single-no-exists", @@ -979,40 +1051,40 @@ class TestPipeAnalysis: "multiple-no-exists", "multiple-some-exists", ) - ) - def test__check_measurement_pairs_file(self, + ) + def test__check_measurement_pairs_file( + self, pairs_existence: List[bool], dummy_PipeAnalysis_base: vtp.PipeAnalysis, mocker: MockerFixture - ) -> None: + ) -> None: """ Tests the _check_measurement_pairs_file method. - + Args: pairs_existence: A list of booleans corresponding to whether a pairs file exists. dummy_PipeAnalysis_base: The base dummy PipeAnalysis object. mocker: The pytest-mock mocker object. 
- + Returns: None """ mocker_isfile = mocker.patch( - "os.path.isfile", + "vasttools.pipeline.Path.is_file", side_effect=pairs_existence ) - - fake_pairs_file = [""]*len(pairs_existence) - + + fake_pairs_file = [Path("")] * len(pairs_existence) + dummy_PipeAnalysis_base.measurement_pairs_file = fake_pairs_file - + returned_val = dummy_PipeAnalysis_base._check_measurement_pairs_file() - + all_exist = sum(pairs_existence) == len(pairs_existence) - + assert returned_val == all_exist - - + def test_combine_with_run( self, dummy_PipeAnalysis: vtp.PipeAnalysis diff --git a/vasttools/pipeline.py b/vasttools/pipeline.py index 29cb4146..e89322d3 100644 --- a/vasttools/pipeline.py +++ b/vasttools/pipeline.py @@ -9,7 +9,6 @@ import numexpr import os import warnings -import glob import vaex import dask.dataframe as dd import colorcet as cc @@ -22,6 +21,7 @@ import matplotlib.pyplot as plt from typing import List, Tuple +from pathlib import Path from bokeh.models import ( Span, BoxAnnotation, @@ -110,7 +110,7 @@ def __init__( associations: pd.DataFrame, bands: pd.DataFrame, measurements: Union[pd.DataFrame, vaex.dataframe.DataFrame], - measurement_pairs_file: List[str], + measurement_pairs_file: Union[List[str], List[Path]], vaex_meas: bool = False, n_workers: int = HOST_NCPU - 1, scheduler: str = 'processes' @@ -161,7 +161,7 @@ def __init__( self.associations = associations self.bands = bands self.measurements = measurements - self.measurement_pairs_file = measurement_pairs_file + self.measurement_pairs_file = [Path(i) for i in measurement_pairs_file] self.relations = relations self.n_workers = n_workers self._vaex_meas = vaex_meas @@ -177,7 +177,7 @@ def _check_measurement_pairs_file(self): measurement_pairs_exists = True for filepath in self.measurement_pairs_file: - if not os.path.isfile(filepath): + if not filepath.is_file(): self.logger.warning(f"Measurement pairs file ({filepath}) does" f" not exist. 
You will be unable to access" f" measurement pairs or two-epoch metrics." @@ -361,7 +361,7 @@ def get_source( if self._vaex_meas: measurements = the_measurements[ the_measurements['source'] == id - ].to_pandas_df() + ].extract().to_pandas_df() else: measurements = the_measurements.loc[ @@ -395,7 +395,7 @@ def get_source( s = the_sources.loc[id] - num_measurements = s['n_measurements'] + num_measurements = len(measurements) #s['n_measurements'] source_coord = SkyCoord( s['wavg_ra'], @@ -515,7 +515,7 @@ def load_two_epoch_metrics(self) -> None: self._vaex_meas_pairs = False if len(self.measurement_pairs_file) > 1: arrow_files = ( - [i.endswith(".arrow") for i in self.measurement_pairs_file] + [i.suffix == ".arrow" for i in self.measurement_pairs_file] ) if np.any(arrow_files): measurement_pairs_df = vaex.open_many( @@ -532,7 +532,7 @@ def load_two_epoch_metrics(self) -> None: dd.read_parquet(self.measurement_pairs_file).compute() ) else: - if self.measurement_pairs_file[0].endswith('.arrow'): + if self.measurement_pairs_file[0].suffix == '.arrow': measurement_pairs_df = ( vaex.open(self.measurement_pairs_file[0]) ) @@ -553,7 +553,7 @@ def load_two_epoch_metrics(self) -> None: measurement_pairs_df.pair_epoch_key, agg='count' ) - pair_counts = pair_counts.to_pandas_df().rename( + pair_counts = pair_counts.extract().to_pandas_df().rename( columns={'count': 'total_pairs'} ).set_index('pair_epoch_key') else: @@ -976,7 +976,7 @@ def _filter_meas_pairs_df( ) if not self._vaex_meas_pairs: - new_measurement_pairs = new_measurement_pairs.to_pandas_df() + new_measurement_pairs = new_measurement_pairs.extract().to_pandas_df() return new_measurement_pairs @@ -1025,7 +1025,7 @@ def recalc_measurement_pairs_df( # convert a vaex measurements df to panads so an index can be set if isinstance(measurements_df, vaex.dataframe.DataFrame): - measurements_df = measurements_df[flux_cols].to_pandas_df() + measurements_df = measurements_df[flux_cols].extract().to_pandas_df() else: 
measurements_df = measurements_df.loc[:, flux_cols].copy() @@ -1211,7 +1211,7 @@ def recalc_sources_df( ) # Switch to pandas at this point to perform join - sources_df = sources_df.to_pandas_df().set_index('source') + sources_df = sources_df.extract().to_pandas_df().set_index('source') sources_df = sources_df.join(sources_df_fluxes) @@ -1847,7 +1847,7 @@ def run_two_epoch_analysis( (pairs_df[vs_label] > vs) & (pairs_df[m_abs_label] > m) ] - candidate_pairs = candidate_pairs.to_pandas_df() + candidate_pairs = candidate_pairs.extract().to_pandas_df() else: candidate_pairs = pairs_df.loc[ @@ -2500,10 +2500,12 @@ def __init__(self, project_dir: Optional[str] = None) -> None: " must be defined or the 'project_dir' argument defined" " when initialising the pipeline class object." ) + else: + pipeline_run_path = Path(pipeline_run_path) else: - pipeline_run_path = os.path.abspath(str(project_dir)) + pipeline_run_path = Path(project_dir).resolve() - if not os.path.isdir(pipeline_run_path): + if not pipeline_run_path.is_dir(): raise PipelineDirectoryError( "Pipeline run directory {} not found!".format( pipeline_run_path @@ -2522,11 +2524,9 @@ def list_piperuns(self) -> List[str]: Returns: List of pipeline run names present in directory. """ - jobs = sorted(glob.glob( - os.path.join(self.project_dir, "*") - )) + jobs = sorted(self.project_dir.glob("*")) - jobs = [i.split("/")[-1] for i in jobs] + jobs = [i.name for i in jobs] jobs.remove('images') return jobs @@ -2538,11 +2538,10 @@ def list_images(self) -> List[str]: Returns: List of images processed. """ - img_list = sorted(glob.glob( - os.path.join(self.project_dir, "images", "*") - )) + img_dir = self.project_dir / "images" + img_list = sorted(img_dir.glob("*")) - img_list = [i.split("/")[-1] for i in img_list] + img_list = [i.name for i in img_list] return img_list @@ -2596,38 +2595,24 @@ def load_run( ValueError: Entered pipeline run does not exist. 
""" - run_dir = os.path.join( - self.project_dir, - run_name - ) + run_dir = Path(self.project_dir) / run_name - if not os.path.isdir(run_dir): + if not run_dir.is_dir(): raise OSError( "Run '%s' directory does not exist!", run_name ) return - images = pd.read_parquet( - os.path.join( - run_dir, - 'images.parquet' - ) - ) + images = pd.read_parquet(run_dir / 'images.parquet') skyregions = pd.read_parquet( - os.path.join( - run_dir, - 'skyregions.parquet' - ), + run_dir / 'skyregions.parquet', engine='pyarrow' ) bands = pd.read_parquet( - os.path.join( - run_dir, - 'bands.parquet' - ), + run_dir / 'bands.parquet', engine='pyarrow' ) @@ -2656,18 +2641,12 @@ def load_run( ) relations = pd.read_parquet( - os.path.join( - run_dir, - 'relations.parquet' - ), + run_dir / 'relations.parquet', engine='pyarrow' ) sources = pd.read_parquet( - os.path.join( - run_dir, - 'sources.parquet' - ), + run_dir / 'sources.parquet', engine='pyarrow' ) @@ -2688,33 +2667,30 @@ def load_run( ) associations = pd.read_parquet( - os.path.join( - run_dir, - 'associations.parquet' - ), + run_dir / 'associations.parquet', engine='pyarrow' ) vaex_meas = False - if os.path.isfile(os.path.join( - run_dir, - 'measurements.arrow' - )): - measurements = vaex.open( - os.path.join(run_dir, 'measurements.arrow') - ) + arrow_path = run_dir / 'measurements.arrow' + if arrow_path.is_file(): vaex_meas = True + measurements = vaex.open(arrow_path) + warnings.warn( + "Measurements have been loaded with vaex from a single file.") - warnings.warn("Measurements have been loaded with vaex.") + elif arrow_path.is_dir(): + vaex_meas = True + measurements = vaex.open(arrow_path / '*.parquet') + warnings.warn( + "Measurements have been loaded with vaex " + "from a partitioned file.") else: m_files = images['measurements_path'].tolist() - m_files += sorted(glob.glob(os.path.join( - run_dir, - "forced_measurements*.parquet" - ))) + m_files += sorted(run_dir.glob("forced_measurements*.parquet")) # use dask to open 
measurement parquets # as they are spread over many different files @@ -2740,19 +2716,11 @@ def load_run( images = images.set_index('id') - if os.path.isfile(os.path.join( - run_dir, - "measurement_pairs.arrow" - )): - measurement_pairs_file = [os.path.join( - run_dir, - "measurement_pairs.arrow" - )] + measurement_pairs_arrow = run_dir / "measurement_pairs.arrow" + if measurement_pairs_arrow.is_file(): + measurement_pairs_file = [measurement_pairs_arrow] else: - measurement_pairs_file = [os.path.join( - run_dir, - "measurement_pairs.parquet" - )] + measurement_pairs_file = [run_dir / "measurement_pairs.parquet"] piperun = PipeAnalysis( name=run_name,