Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for partitioned arrow files, and convert vasttools.pipeline to use pathlib rather than os.path #580

Draft
wants to merge 10 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

#### Added

- Add support for storing pipeline measurements as partitioned arrow files [#580](https://github.com/askap-vast/vast-tools/pull/580)
- Add epoch 69 [#576](https://github.com/askap-vast/vast-tools/pull/576)
- Add epoch 68 [#575](https://github.com/askap-vast/vast-tools/pull/575)
- Add epoch 67 [#573](https://github.com/askap-vast/vast-tools/pull/573)
Expand All @@ -18,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

#### Changed

- Switch os.path to Pathlib in vasttools.pipeline [#580](https://github.com/askap-vast/vast-tools/pull/580)
- Minor changes to docstring formatting throughout based on updated mkdocs versions [#334](https://github.com/askap-vast/vast-tools/pull/334)
- Minor changes for matplotlib 3.7: add angle kwarg to Ellipse and change matplotlib.pyplot.cm.get_cmap to matplotlib.colormaps.get_cmap [#334](https://github.com/askap-vast/vast-tools/pull/334)
- Refreshed dependencies - major changes are python 3.10, mkdocs (and related packages), astropy v5 and matplotlib v3.7 [#334](https://github.com/askap-vast/vast-tools/pull/334)
Expand All @@ -39,6 +41,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

#### List of PRs

- [#580](https://github.com/askap-vast/vast-tools/pull/580): feat: Add support for partitioned measurement files and switch vasttools.pipeline to use pathlib
- [#576](https://github.com/askap-vast/vast-tools/pull/576): feat: Add epoch 69
- [#575](https://github.com/askap-vast/vast-tools/pull/575): feat: Add epoch 68
- [#573](https://github.com/askap-vast/vast-tools/pull/573): feat: Add epoch 67
Expand Down
176 changes: 124 additions & 52 deletions tests/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import copy
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import pytest
import vaex
Expand Down Expand Up @@ -34,7 +33,9 @@ def dummy_pipeline_object(mocker: MockerFixture) -> vtp.Pipeline:
mocker_getenv = mocker.patch(
'os.getenv', return_value=expected_path
)
mock_isdir = mocker.patch('os.path.isdir', return_value=True)
mock_isdir = mocker.patch('vasttools.pipeline.Path.is_dir',
return_value=True
)

pipe = vtp.Pipeline()

Expand Down Expand Up @@ -315,7 +316,7 @@ def dummy_pipeline_pairs_df() -> pd.DataFrame:


def load_parquet_side_effect(
value: str,
value: Union[str, Path],
**kwargs
) -> Union[pd.DataFrame, vaex.dataframe.DataFrame]:
"""
Expand All @@ -333,6 +334,8 @@ def load_parquet_side_effect(
The relevant dataframe which is returned from the function. Can be
pandas or vaex.
"""
if isinstance(value, Path):
value = str(value)
if 'bands.parquet' in value:
return dummy_pipeline_bands()
elif 'associations.parquet' in value:
Expand Down Expand Up @@ -370,8 +373,13 @@ def dummy_PipeAnalysis_base(
Returns:
The vtp.PipeAnalysis instance.
"""
mock_isdir = mocker.patch('os.path.isdir', return_value=True)
mock_isfile = mocker.patch('os.path.isfile', return_value=False)
mock_isdir = mocker.patch('vasttools.pipeline.Path.is_dir',
# return_value=False,
side_effect=[True, False, False]
)
mock_isfile = mocker.patch(
'vasttools.pipeline.Path.is_file',
return_value=False)
pandas_read_parquet_mocker = mocker.patch(
'vasttools.pipeline.pd.read_parquet',
side_effect=load_parquet_side_effect
Expand Down Expand Up @@ -410,7 +418,6 @@ def dummy_PipeAnalysis(
Returns:
The vtp.PipeAnalysis instance.
"""

measurement_pairs_existence_mocker = mocker.patch(
'vasttools.pipeline.PipeRun._check_measurement_pairs_file',
return_value=True
Expand Down Expand Up @@ -469,8 +476,12 @@ def dummy_PipeAnalysis_vaex(
Returns:
The vtp.PipeAnalysis instance.
"""
mock_isdir = mocker.patch('os.path.isdir', return_value=True)
mock_isfile = mocker.patch('os.path.isfile', return_value=True)
mock_isdir = mocker.patch(
'vasttools.pipeline.Path.is_dir',
return_value=True)
mock_isfile = mocker.patch(
'vasttools.pipeline.Path.is_file',
return_value=True)
pandas_read_parquet_mocker = mocker.patch(
'vasttools.pipeline.pd.read_parquet',
side_effect=load_parquet_side_effect
Expand Down Expand Up @@ -664,11 +675,13 @@ def test_init(self, mocker: MockerFixture) -> None:
Returns:
None
"""
expected_path = '/path/to/pipelineruns/'
expected_path = Path('/path/to/pipelineruns/')
mocker_getenv = mocker.patch(
'os.getenv', return_value=expected_path
)
mock_isdir = mocker.patch('os.path.isdir', return_value=True)
mock_isdir = mocker.patch(
'vasttools.pipeline.Path.is_dir',
return_value=True)

pipe = vtp.Pipeline()

Expand All @@ -687,11 +700,13 @@ def test_init_projectdir(self, mocker: MockerFixture) -> None:
Returns:
None
"""
expected_path = '/path/to/projectdir/'
expected_path = Path('/path/to/projectdir/')
mocker_abspath = mocker.patch(
'os.path.abspath', return_value=expected_path
)
mock_isdir = mocker.patch('os.path.isdir', return_value=True)
mock_isdir = mocker.patch(
'vasttools.pipeline.Path.is_dir',
return_value=True)

pipe = vtp.Pipeline(project_dir=expected_path)

Expand All @@ -709,7 +724,7 @@ def test_init_env_fail(self, mocker: MockerFixture) -> None:
Returns:
None
"""
expected_path = '/path/to/pipelineruns/'
expected_path = Path('/path/to/pipelineruns/')
mocker_getenv = mocker.patch(
'os.getenv', return_value=None
)
Expand All @@ -733,11 +748,13 @@ def test_init_project_dir_fail(self, mocker: MockerFixture) -> None:
Returns:
None
"""
expected_path = '/path/to/projectdir/'
expected_path = Path('/path/to/projectdir/')
mocker_abspath = mocker.patch(
'os.path.abspath', return_value=expected_path
)
mock_isdir = mocker.patch('os.path.isdir', return_value=False)
mock_isdir = mocker.patch(
'vasttools.pipeline.Path.is_dir',
return_value=False)

with pytest.raises(vtp.PipelineDirectoryError) as excinfo:
pipe = vtp.Pipeline(project_dir=expected_path)
Expand All @@ -760,21 +777,21 @@ def test_list_piperuns(
Returns:
None
"""
expected_path = '/path/to/pipelineruns'
expected_path = Path('/path/to/pipelineruns')

expected_result = ['job1', 'job2']

mocker_glob = mocker.patch(
'glob.glob', return_value=[
os.path.join(expected_path, 'job1'),
os.path.join(expected_path, 'job2'),
os.path.join(expected_path, 'images')
'vasttools.pipeline.Path.glob', return_value=[
expected_path / 'job1',
expected_path / 'job2',
expected_path / 'images'
]
)

result = dummy_pipeline_object.list_piperuns()

mocker_glob.assert_called_once_with(os.path.join(expected_path, '*'))
mocker_glob.assert_called_once_with('*')
assert result == expected_result

def test_list_images(
Expand All @@ -795,22 +812,21 @@ def test_list_images(
Returns:
None
"""
expected_path = '/path/to/pipelineruns'
expected_path = Path('/path/to/pipelineruns')
images_path = expected_path / 'images'

expected_result = ['image1', 'image2']

mocker_glob = mocker.patch(
'glob.glob', return_value=[
os.path.join(expected_path, 'images', 'image1'),
os.path.join(expected_path, 'images', 'image2'),
'vasttools.pipeline.Path.glob', return_value=[
images_path / 'image1',
images_path / 'image2',
]
)

result = dummy_pipeline_object.list_images()

mocker_glob.assert_called_once_with(os.path.join(
expected_path, 'images', '*'
))
mocker_glob.assert_called_once_with('*')
assert result == expected_result

def test_load_run_dir_fail(
Expand All @@ -832,7 +848,9 @@ def test_load_run_dir_fail(
Returns:
None
"""
mock_isdir = mocker.patch('os.path.isdir', return_value=False)
mock_isdir = mocker.patch(
'vasttools.pipeline.Path.is_dir',
return_value=False)

pipe = dummy_pipeline_object

Expand All @@ -859,8 +877,12 @@ def test_load_run_no_vaex(
Returns:
None
"""
mock_isdir = mocker.patch('os.path.isdir', return_value=True)
mock_isfile = mocker.patch('os.path.isfile', return_value=False)
mock_isdir = mocker.patch('vasttools.pipeline.Path.is_dir',
side_effect=[True, False, False]
)
mock_isfile = mocker.patch('vasttools.pipeline.Path.is_file',
return_value=False
)
pandas_read_parquet_mocker = mocker.patch(
'vasttools.pipeline.pd.read_parquet',
side_effect=load_parquet_side_effect
Expand Down Expand Up @@ -898,8 +920,12 @@ def test_load_run_no_vaex_check_columns(
Returns:
None
"""
mock_isdir = mocker.patch('os.path.isdir', return_value=True)
mock_isfile = mocker.patch('os.path.isfile', return_value=False)
mock_isdir = mocker.patch('vasttools.pipeline.Path.is_dir',
side_effect=[True, False, False]
)
mock_isfile = mocker.patch('vasttools.pipeline.Path.is_file',
return_value=False
)
pandas_read_parquet_mocker = mocker.patch(
'vasttools.pipeline.pd.read_parquet',
side_effect=load_parquet_side_effect
Expand Down Expand Up @@ -939,8 +965,54 @@ def test_load_run_vaex(
Returns:
None
"""
mock_isdir = mocker.patch('os.path.isdir', return_value=True)
mock_isfile = mocker.patch('os.path.isfile', return_value=True)
mock_isdir = mocker.patch(
'vasttools.pipeline.Path.is_dir',
return_value=True)
mock_isfile = mocker.patch(
'vasttools.pipeline.Path.is_file',
return_value=True)
pandas_read_parquet_mocker = mocker.patch(
'vasttools.pipeline.pd.read_parquet',
side_effect=load_parquet_side_effect
)
vaex_open_mocker = mocker.patch(
'vasttools.pipeline.vaex.open',
return_value=dummy_pipeline_measurements_vaex()
)

pipe = dummy_pipeline_object
run_name = 'test_run'
run = pipe.load_run(run_name)

assert run.name == run_name
assert run._vaex_meas is True

def test_load_run_vaex_partitioned(
self,
dummy_pipeline_object: vtp.Pipeline,
mocker: MockerFixture
) -> None:
"""
Tests the load run method.

Specifically when the arrow files are present so vaex is used.
The usual mocks are in place, including using the read parquet side
effect.

Args:
dummy_pipeline_object: The dummy Pipeline object that is used
for testing.
mocker: The pytest mocker mock object.

Returns:
None
"""
mock_isdir = mocker.patch(
'vasttools.pipeline.Path.is_dir',
return_value=True)
mock_isfile = mocker.patch(
'vasttools.pipeline.Path.is_file',
return_value=False)
pandas_read_parquet_mocker = mocker.patch(
'vasttools.pipeline.pd.read_parquet',
side_effect=load_parquet_side_effect
Expand Down Expand Up @@ -969,50 +1041,50 @@ class TestPipeAnalysis:
[
[True],
[False],
[True,True],
[False,False],
[True,False],
[True, True],
[False, False],
[True, False],
],
ids=("single-exists",
"single-no-exists",
"multiple-all-exists",
"multiple-no-exists",
"multiple-some-exists",
)
)
def test__check_measurement_pairs_file(self,
)
def test__check_measurement_pairs_file(
self,
pairs_existence: List[bool],
dummy_PipeAnalysis_base: vtp.PipeAnalysis,
mocker: MockerFixture
) -> None:
) -> None:
"""
Tests the _check_measurement_pairs_file method.

Args:
pairs_existence: A list of booleans corresponding to whether a
pairs file exists.
dummy_PipeAnalysis_base: The base dummy PipeAnalysis object.
mocker: The pytest-mock mocker object.

Returns:
None
"""
mocker_isfile = mocker.patch(
"os.path.isfile",
"vasttools.pipeline.Path.is_file",
side_effect=pairs_existence
)
fake_pairs_file = [""]*len(pairs_existence)

fake_pairs_file = [Path("")] * len(pairs_existence)

dummy_PipeAnalysis_base.measurement_pairs_file = fake_pairs_file

returned_val = dummy_PipeAnalysis_base._check_measurement_pairs_file()

all_exist = sum(pairs_existence) == len(pairs_existence)

assert returned_val == all_exist



def test_combine_with_run(
self,
dummy_PipeAnalysis: vtp.PipeAnalysis
Expand Down
Loading