Use real AWS S3 data tests and apply related fixes (#212)
* remove moto

* further remove moto and related

* formatting

* simplify sources work

* simplify sources gathering; fix cloud paths

* linting and pytest marking

* fix utils path; correct pytest marker

* update test

* update to add new preset for jump data

* linting

* add scheduled test specificity

* find shape

* tests for data shape; minor change for parsl conf

* testing gh actions tests

* custom testing for large data tests

* docs updates; lower chunk size

* smaller chunk size

* updates for profiling

* avoid fail fast; increase test chunk size

* remove explain

* reduce chunk size

* attempt hte

* join false for s3 sqlite

* no join with threaded

* split tests; duckdb mem limit

* update parsl

* chunk size 2000

* chunk size 1000

* 8gb duckdb memory

* remove concat test

* 800 chunk size

* remove duckdb mem limit; update ddb + pa

* try unsorted

* avoid large data copies in joins

* sort output

* chunk size 1000

* sort output and remove setting

* chunk size 3000

* readd proper data changes

* avoid column removal

* chunk size 5000; readd col removal

* 7000 chunk size

* 10000 chunksize

* chunk size 15000

* 20000 chunksize

* chunksize 30000

* chunk size 45000

* chunksize 40000

* chunksize 35000

* chunk size 30000, scoping for test data

* use tmp_path

* chunk size 31000

* add test cytominer-database cmd

* poetry env context for cytominer-database

* show command

* use static data for cytominer-database

* provide unique paths for test

* create temp dependency for testing

* try threaded tests

* attempt without temp dir

* readd temp dir

* show sql

* resolve path

* show sources

* move test files

* show more about sources

* show sources without conditional

* add missing files

* remove debug; move tests

* fix ci conditional

* lines cleanup; remove httpfs install

* testing comments

* revert to cloudpathlib ^0.18.0

* update lockfile

* prepare test ci for pr

* split test jobs

* remove large data tests from matrix

* linting

* remove dev branch

* correct comments; ci job dep

* avoid httpfs plugin error

* comment about task event trigger

Co-Authored-By: Gregory Way <[email protected]>

* comment about object storage path

Co-Authored-By: Gregory Way <[email protected]>

* limit join data for tests

* linting

---------

Co-authored-by: Gregory Way <[email protected]>
d33bs and gwaybio authored Jul 3, 2024
1 parent 0bb98c9 commit 8e343d0
Showing 20 changed files with 607 additions and 1,370 deletions.
26 changes: 23 additions & 3 deletions .github/workflows/test.yml
@@ -24,13 +24,14 @@ jobs:
- name: Checkout
uses: actions/checkout@v4
- name: Python setup
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python_version }}
# remove poetry.lock file for scheduled tests
# to help simulate possible upstream issues
- name: Remove poetry.lock for scheduled tests
if: github.event_name == 'schedule'
# runs every Wednesday at 7 AM UTC
if: github.event.schedule == '0 7 * * 3'
run: |
rm poetry.lock
- name: Setup for poetry
@@ -40,4 +41,23 @@
- name: Run sphinx-docs build test
run: poetry run sphinx-build docs/source doctest -W
- name: Run pytest
run: poetry run pytest
run: poetry run pytest -m "not large_data_tests"
# run large data tests as a separate job to help
# conserve resources by detecting failure with
# smaller tests first.
run_large_data_tests:
runs-on: ubuntu-22.04
needs: run_tests
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Python setup
uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Setup for poetry
uses: ./.github/actions/setup-poetry
- name: Install environment
run: poetry install --no-interaction --no-ansi
- name: Run pytest for large data tests
run: poetry run pytest -m "large_data_tests"
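
The CI split above relies on a custom pytest marker. Below is a minimal sketch of how such a marker is typically declared and used; the test names and the marker registration location are illustrative assumptions, not part of this commit.

```python
# Sketch of the marker split assumed by the CI jobs above; test names are hypothetical.
# The marker would normally be registered in pytest configuration (for example in
# pyproject.toml under [tool.pytest.ini_options] markers) to avoid unknown-marker warnings.
import pytest


@pytest.mark.large_data_tests
def test_convert_real_s3_dataset():
    # runs only in the dedicated job: `pytest -m "large_data_tests"`
    ...


def test_convert_local_fixture():
    # runs in the regular matrix job: `pytest -m "not large_data_tests"`
    ...
```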
29 changes: 11 additions & 18 deletions cytotable/convert.py
@@ -46,11 +46,12 @@ def _get_table_columns_and_types(
import pathlib

import duckdb
from cloudpathlib import AnyPath

from cytotable.utils import _duckdb_reader, _sqlite_mixed_type_query_to_parquet

source_path = source["source_path"]
source_type = str(pathlib.Path(source_path).suffix).lower()
source_type = str(source_path.suffix).lower()

# prepare the data source in the form of a duckdb query
select_source = (
@@ -209,7 +210,7 @@ def _get_table_chunk_offsets(
import pathlib

import duckdb
from cloudpathlib import AnyPath
from cloudpathlib import AnyPath, CloudPath

from cytotable.exceptions import NoInputDataException
from cytotable.utils import _duckdb_reader
@@ -219,18 +220,9 @@ def _get_table_chunk_offsets(
if source is not None:
table_name = source["table_name"] if "table_name" in source.keys() else None
source_path = source["source_path"]
source_type = str(pathlib.Path(source_path).suffix).lower()
source_type = str(source_path.suffix).lower()

try:
# for csv's, check that we have more than one row (a header and data values)
if (
source_type == ".csv"
and sum(1 for _ in AnyPath(source_path).open("r")) <= 1
):
raise NoInputDataException(
f"Data file has 0 rows of values. Error in file: {source_path}"
)

# gather the total rowcount from csv or sqlite data input sources
with _duckdb_reader() as ddb_reader:
rowcount = int(
@@ -322,8 +314,8 @@ def _source_chunk_to_parquet(

# attempt to build dest_path
source_dest_path = (
f"{dest_path}/{str(pathlib.Path(source_group_name).stem).lower()}/"
f"{str(pathlib.Path(source['source_path']).parent.name).lower()}"
f"{dest_path}/{str(AnyPath(source_group_name).stem).lower()}/"
f"{str(source['source_path'].parent.name).lower()}"
)
pathlib.Path(source_dest_path).mkdir(parents=True, exist_ok=True)

@@ -364,11 +356,11 @@ def _source_chunk_to_parquet(

# build output query and filepath base
# (chunked output will append offset to keep output paths unique)
if str(AnyPath(source["source_path"]).suffix).lower() == ".csv":
if str(source["source_path"].suffix).lower() == ".csv":
base_query = f"SELECT {select_columns} FROM read_csv_auto('{str(source['source_path'])}', header=TRUE, delim=',')"
result_filepath_base = f"{source_dest_path}/{str(source['source_path'].stem)}"

elif str(AnyPath(source["source_path"]).suffix).lower() == ".sqlite":
elif str(source["source_path"].suffix).lower() == ".sqlite":
base_query = f"SELECT {select_columns} FROM sqlite_scan('{str(source['source_path'])}', '{str(source['table_name'])}')"
result_filepath_base = f"{source_dest_path}/{str(source['source_path'].stem)}.{source['table_name']}"

@@ -405,7 +397,7 @@ def _source_chunk_to_parquet(
# to handle the mixed types
if (
"Mismatch Type Error" in str(e)
and str(AnyPath(source["source_path"]).suffix).lower() == ".sqlite"
and str(source["source_path"].suffix).lower() == ".sqlite"
):
_write_parquet_table_with_metadata(
# here we use sqlite instead of duckdb to extract
@@ -817,6 +809,7 @@ def _join_source_chunk(
exclude_meta_cols = [
f"c NOT LIKE '{col}%'" for col in list(CYOTABLE_META_COLUMN_TYPES.keys())
]

with _duckdb_reader() as ddb_reader:
result = ddb_reader.execute(
f"""
@@ -1114,7 +1107,7 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
else []
),
**kwargs,
).result()
)

# expand the destination path
expanded_dest_path = _expand_path(path=dest_path)
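
A recurring change in convert.py above is that `source["source_path"]` is now carried as a path object (a local `pathlib.Path` or a cloudpathlib cloud path) rather than a plain string, so `.suffix`, `.stem`, and `.parent` work the same for local and S3 inputs. A minimal sketch of that behavior, using a made-up bucket name:

```python
# Illustration only; the S3 bucket and key below do not exist.
from cloudpathlib import AnyPath

local_source = AnyPath("data/plate_1/test.csv")                     # -> pathlib.Path
cloud_source = AnyPath("s3://example-bucket/plate_1/test.sqlite")   # -> S3Path

for source_path in (local_source, cloud_source):
    # identical attribute access for both path flavors, which is why the
    # str(pathlib.Path(...)) wrapping could be dropped throughout convert.py
    print(source_path.suffix.lower(), source_path.stem, source_path.parent.name)
```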
48 changes: 48 additions & 0 deletions cytotable/presets.py
@@ -85,6 +85,54 @@
AND per_nuclei.Nuclei_Number_Object_Number = per_cytoplasm.Cytoplasm_Parent_Nuclei
""",
},
"cellprofiler_sqlite_cpg0016_jump": {
# version specifications using related references
"CONFIG_SOURCE_VERSION": {
"cellprofiler": "v4.0.0",
},
# names of source table compartments (for ex. cells.csv, etc.)
"CONFIG_NAMES_COMPARTMENTS": ("cells", "nuclei", "cytoplasm"),
# names of source table metadata (for ex. image.csv, etc.)
"CONFIG_NAMES_METADATA": ("image",),
# column names in any compartment or metadata tables which contain
# unique names to avoid renaming
"CONFIG_IDENTIFYING_COLUMNS": (
"ImageNumber",
"ObjectNumber",
"Metadata_Well",
"Metadata_Plate",
"Parent_Cells",
"Parent_Nuclei",
),
# chunk size to use for join operations to help with possible performance issues
# note: this number is an estimate and may need changes contingent on data
# and system used by this library.
"CONFIG_CHUNK_SIZE": 1000,
# compartment and metadata joins performed using DuckDB SQL
# and modified at runtime as needed
"CONFIG_JOINS": """
SELECT
image.Image_TableNumber,
image.Metadata_ImageNumber,
image.Metadata_Plate,
image.Metadata_Well,
image.Image_Metadata_Site,
image.Image_Metadata_Row,
cytoplasm.* EXCLUDE (Metadata_ImageNumber),
cells.* EXCLUDE (Metadata_ImageNumber),
nuclei.* EXCLUDE (Metadata_ImageNumber)
FROM
read_parquet('cytoplasm.parquet') AS cytoplasm
LEFT JOIN read_parquet('cells.parquet') AS cells ON
cells.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
AND cells.Metadata_ObjectNumber = cytoplasm.Cytoplasm_Parent_Cells
LEFT JOIN read_parquet('nuclei.parquet') AS nuclei ON
nuclei.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
AND nuclei.Metadata_ObjectNumber = cytoplasm.Cytoplasm_Parent_Nuclei
LEFT JOIN read_parquet('image.parquet') AS image ON
image.Metadata_ImageNumber = cytoplasm.Metadata_ImageNumber
""",
},
"cellprofiler_sqlite_pycytominer": {
# version specifications using related references
"CONFIG_SOURCE_VERSION": {
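
For reference, a hedged sketch of invoking the new `cellprofiler_sqlite_cpg0016_jump` preset; the S3 source path and output location are placeholders, and the exact keyword set accepted by `cytotable.convert` should be checked against the installed version.

```python
# Usage sketch only; the bucket, plate name, and output path are placeholders.
import cytotable

cytotable.convert(
    source_path="s3://example-jump-bucket/workspace/backend/plate.sqlite",
    dest_path="./plate.parquet",
    dest_datatype="parquet",
    # preset added by this commit for cpg0016 (JUMP) CellProfiler SQLite data
    preset="cellprofiler_sqlite_cpg0016_jump",
)
```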
61 changes: 45 additions & 16 deletions cytotable/sources.py
@@ -7,13 +7,11 @@
from typing import Any, Dict, List, Optional, Union

from cloudpathlib import AnyPath
from parsl.app.app import join_app, python_app

from cytotable.exceptions import NoInputDataException

@python_app
def _build_path(
path: Union[str, pathlib.Path, AnyPath], **kwargs
) -> Union[pathlib.Path, AnyPath]:

def _build_path(path: str, **kwargs) -> Union[pathlib.Path, AnyPath]:
"""
Build a path client or return local path.
@@ -43,10 +41,9 @@ def _build_path(
return processed_path


@python_app
def _get_source_filepaths(
path: Union[pathlib.Path, AnyPath],
targets: List[str],
targets: Optional[List[str]] = None,
source_datatype: Optional[str] = None,
) -> Dict[str, List[Dict[str, Any]]]:
"""
@@ -75,7 +72,7 @@ def _get_source_filepaths(

if (targets is None or targets == []) and source_datatype is None:
raise DatatypeException(
f"A source_datatype must be specified when using undefined compartments and metadata names."
"A source_datatype must be specified when using undefined compartments and metadata names."
)

# gathers files from provided path using compartments + metadata as a filter
@@ -87,9 +84,9 @@
for subpath in (
(path,)
# used if the source path is a single file
if AnyPath(path).is_file()
if path.is_file()
# iterates through a source directory
else (x for x in AnyPath(path).glob("**/*") if AnyPath(x).is_file())
else (x for x in path.glob("**/*") if x.is_file())
)
# ensure the subpaths meet certain specifications
if (
@@ -129,7 +126,8 @@ def _get_source_filepaths(
.arrow()["table_name"]
.to_pylist()
# make sure the table names match with compartment + metadata names
if any(target.lower() in table_name.lower() for target in targets)
if targets is not None
and any(target.lower() in table_name.lower() for target in targets)
]
else:
# if we don't have sqlite source, append the existing element
@@ -181,7 +179,6 @@ def _get_source_filepaths(
return grouped_sources


@python_app
def _infer_source_datatype(
sources: Dict[str, List[Dict[str, Any]]], source_datatype: Optional[str] = None
) -> str:
@@ -230,7 +227,6 @@ def _infer_source_datatype(
return source_datatype


@python_app
def _filter_source_filepaths(
sources: Dict[str, List[Dict[str, Any]]], source_datatype: str
) -> Dict[str, List[Dict[str, Any]]]:
@@ -260,12 +256,45 @@ def _filter_source_filepaths(
if file["source_path"].stat().st_size > 0
# ensure the datatype matches the source datatype
and file["source_path"].suffix == f".{source_datatype}"
and _file_is_more_than_one_line(path=file["source_path"])
]
for filegroup, files in sources.items()
}


@join_app
def _file_is_more_than_one_line(path: Union[pathlib.Path, AnyPath]) -> bool:
"""
Check if the file has more than one line.
Args:
path (Union[pathlib.Path, AnyPath]):
The path to the file.
Returns:
bool:
True if the file has more than one line, False otherwise.
Raises:
NoInputDataException: If the file has zero lines.
"""

# if we don't have a sqlite file
# (we can't check sqlite files for lines)
if path.suffix.lower() != ".sqlite":
with path.open("r") as f:
try:
# read two lines, if the second is empty return false
return bool(f.readline() and f.readline())

except StopIteration:
# If we encounter the end of the file, it has only one line
raise NoInputDataException(
f"Data file has 0 rows of values. Error in file: {path}"
)
else:
return True


def _gather_sources(
source_path: str,
source_datatype: Optional[str] = None,
@@ -295,11 +324,11 @@ def _gather_sources(
_infer_source_datatype,
)

source_path = _build_path(path=source_path, **kwargs)
built_path = _build_path(path=source_path, **kwargs)

# gather filepaths which will be used as the basis for this work
sources = _get_source_filepaths(
path=source_path, targets=targets, source_datatype=source_datatype
path=built_path, targets=targets, source_datatype=source_datatype
)

# infer or validate the source datatype based on source filepaths
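
The empty-CSV guard that used to live in `convert.py` now runs during source filtering. A self-contained sketch of the same idea follows; the helper name and throwaway file are illustrative, not CytoTable's exact code.

```python
# Standalone sketch of the "more than a header line" check.
import pathlib
import tempfile


def has_data_rows(path: pathlib.Path) -> bool:
    """True when a text file holds at least a header line plus one data row."""
    if path.suffix.lower() == ".sqlite":
        # sqlite files aren't line-oriented, so they pass through unchecked
        return True
    with path.open("r") as handle:
        # readline() returns "" at end of file, so two reads are enough
        return bool(handle.readline() and handle.readline())


# quick demonstration against a throwaway CSV
with tempfile.TemporaryDirectory() as tmp_dir:
    csv_path = pathlib.Path(tmp_dir) / "cells.csv"
    csv_path.write_text("ImageNumber,ObjectNumber\n1,1\n")
    print(has_data_rows(csv_path))  # True
    csv_path.write_text("ImageNumber,ObjectNumber\n")
    print(has_data_rows(csv_path))  # False (header only)
```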
19 changes: 12 additions & 7 deletions cytotable/utils.py
@@ -149,6 +149,10 @@ def _duckdb_reader() -> duckdb.DuckDBPyConnection:
INSTALL sqlite_scanner;
LOAD sqlite_scanner;
/* Install httpfs plugin to avoid error
https://github.com/duckdb/duckdb/issues/3243 */
INSTALL httpfs;
/*
Set threads available to duckdb
See the following for more information:
@@ -322,7 +326,7 @@ def _sqlite_affinity_data_type_lookup(col_type: str) -> str:
return pa.Table.from_pylist(results)


def _cache_cloudpath_to_local(path: Union[str, AnyPath]) -> pathlib.Path:
def _cache_cloudpath_to_local(path: AnyPath) -> pathlib.Path:
"""
Takes a cloudpath and uses cache to convert to a local copy
for use in scenarios where remote work is not possible (sqlite).
@@ -337,24 +341,25 @@ def _cache_cloudpath_to_local(path: Union[str, AnyPath]) -> pathlib.Path:
A local pathlib.Path to cached version of cloudpath file.
"""

candidate_path = AnyPath(path)

# check that the path is a file (caching won't work with a dir)
# and check that the file is of sqlite type
# (other file types will be handled remotely in cloud)
if candidate_path.is_file() and candidate_path.suffix.lower() == ".sqlite":
if (
isinstance(path, CloudPath)
and path.is_file()
and path.suffix.lower() == ".sqlite"
):
try:
# update the path to be the local filepath for reference in CytoTable ops
# note: incurs a data read which will trigger caching of the file
path = CloudPath(path).fspath
path = pathlib.Path(path.fspath)
except InvalidPrefixError:
# share information about not finding a cloud path
logger.info(
"Did not detect a cloud path based on prefix. Defaulting to use local path operations."
)

# cast the result as a pathlib.Path
return pathlib.Path(path)
return path


def _arrow_type_cast_if_specified(
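
Two related changes appear in utils.py above: the DuckDB reader now installs `httpfs` so CSV inputs can be scanned directly from S3, while SQLite inputs are still downloaded through cloudpathlib's cache. A hedged sketch of the caching branch; the helper name is illustrative and no real bucket is touched when run as shown.

```python
# Sketch of the cache-to-local idea; not CytoTable's exact implementation.
import pathlib

from cloudpathlib import CloudPath


def localize_if_cloud_sqlite(path):
    """Return a local pathlib.Path for cloud-hosted SQLite files, else pass through."""
    if isinstance(path, CloudPath) and path.suffix.lower() == ".sqlite":
        # accessing .fspath downloads the object into cloudpathlib's local
        # cache and returns the cached file's filesystem path
        return pathlib.Path(path.fspath)
    # local paths (and non-sqlite cloud paths) are returned unchanged
    return path


# local paths pass straight through without any network access
print(localize_if_cloud_sqlite(pathlib.Path("example.sqlite")))
```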