Allow overriding of CSR accumulators with an IndexLike.
To support a custom indexer in TileDB-SOMA, this introduces a basic
abstraction used to build the indexer that the CSR accumulator relies
on. An implementation can then override it with its own subclass,
swapping in a custom indexer without duplicating substantial parts of
the code.

The naming is, admittedly, not ideal; this is in part because of the
naming of the things we're abstracting over (the Pandas `Index` type,
which has the `get_indexer` method).
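
For illustration only (not part of this commit), a minimal custom indexer that
satisfies the new abstraction and is passed in place of the default `pd.Index`
might look like the sketch below; `experiment`, "RNA", and `DictIndex` are
placeholder names.

import numpy as np
import numpy.typing as npt

from somacore.query.query import ExperimentAxisQuery


class DictIndex:
    """Hypothetical IndexLike built on a plain dict lookup."""

    def __init__(self, keys: npt.NDArray[np.int64]):
        self._positions = {int(k): i for i, k in enumerate(keys)}

    def get_indexer(self, target: npt.NDArray[np.int64], **kwargs) -> np.ndarray:
        # Mirror pandas.Index.get_indexer: position of each target value, -1 if absent.
        return np.array(
            [self._positions.get(int(t), -1) for t in target], dtype=np.int64
        )


# `experiment` is assumed to be an already-open SOMA Experiment.
query = ExperimentAxisQuery(
    experiment,
    "RNA",
    index_factory=DictIndex,  # overrides the default index_factory=pd.Index
)
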
thetorpedodog authored and beroy committed Dec 19, 2023
1 parent 10dc344 commit b47bbc8
Showing 3 changed files with 101 additions and 86 deletions.
136 changes: 54 additions & 82 deletions python-spec/src/somacore/query/_fast_csr.py
@@ -6,51 +6,74 @@
import numba.typed
import numpy as np
import numpy.typing as npt
import pandas as pd
import pyarrow as pa
from scipy import sparse

from .. import data as scd
from . import _eager_iter
from . import types


def read_scipy_csr(
matrix: scd.SparseNDArray, obs_joinids: pa.Array, var_joinids: pa.Array
) -> sparse.csr_matrix:
"""
Given a 2D SparseNDArray and joinids for the two dimensions, read the
slice and return it as an SciPy sparse.csr_matrix.
"""
data, indptr, indices, shape = _read_csr(matrix, obs_joinids, var_joinids)
csr = _create_scipy_csr_matrix(data, indices, indptr, shape=shape)
return csr
def read_csr(
matrix: scd.SparseNDArray,
obs_joinids: pa.Array,
var_joinids: pa.Array,
index_factory: types.IndexFactory,
) -> "AccumulatedCSR":
if not isinstance(matrix, scd.SparseNDArray) or matrix.ndim != 2:
raise TypeError("Can only read from a 2D SparseNDArray")

max_workers = (os.cpu_count() or 4) + 2
with futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
acc = _CSRAccumulator(
obs_joinids=obs_joinids,
var_joinids=var_joinids,
pool=pool,
index_factory=index_factory,
)
for tbl in _eager_iter.EagerIterator(
matrix.read((obs_joinids, var_joinids)).tables(),
pool=pool,
):
acc.append(tbl["soma_dim_0"], tbl["soma_dim_1"], tbl["soma_data"])

def read_arrow_csr(
matrix: scd.SparseNDArray, obs_joinids: pa.Array, var_joinids: pa.Array
) -> pa.SparseCSRMatrix:
"""
Given a 2D SparseNDArray and joinids for the two dimensions, read the
slice and return it as a pyarrow.SparseCSRMatrix.
"""
data, indptr, indices, shape = _read_csr(matrix, obs_joinids, var_joinids)
csr = pa.SparseCSRMatrix.from_numpy(data, indptr, indices, shape=shape)
return csr
return acc.finalize()


class _CSRAccumulatorFinalResult(NamedTuple):
class AccumulatedCSR(NamedTuple):
"""
Private.
Return type for the _CSRAccumulator.finalize method.
Contains a sparse CSR consituent elements
Contains a sparse CSR's constituent elements.
"""

data: npt.NDArray[np.number]
indptr: npt.NDArray[np.integer]
indices: npt.NDArray[np.integer]
shape: Tuple[int, int]

def to_scipy(self) -> sparse.csr_matrix:
"""Create a Scipy sparse.csr_matrix from component elements.
Conceptually, this is identical to::
sparse.csr_matrix((data, indices, indptr), shape=shape)
This ugliness is to bypass the O(N) scan that
:meth:`sparse._cs_matrix.__init__`
does when a new compressed matrix is created.
See `SciPy bug 11496 <https://github.com/scipy/scipy/issues/11496>`
for details.
"""
matrix = sparse.csr_matrix.__new__(sparse.csr_matrix)
matrix.data = self.data
matrix.indptr = self.indptr
matrix.indices = self.indices
matrix._shape = self.shape
return matrix


class _CSRAccumulator:
"""
@@ -62,14 +85,15 @@ def __init__(
obs_joinids: npt.NDArray[np.int64],
var_joinids: npt.NDArray[np.int64],
pool: futures.Executor,
index_factory: types.IndexFactory,
):
self.obs_joinids = obs_joinids
self.var_joinids = var_joinids
self.pool = pool

self.shape: Tuple[int, int] = (len(self.obs_joinids), len(self.var_joinids))
self.obs_indexer: pd.Index = pd.Index(self.obs_joinids)
self.var_indexer: pd.Index = pd.Index(self.var_joinids)
self.obs_indexer = index_factory(self.obs_joinids)
self.var_indexer = index_factory(self.var_joinids)
self.row_length: npt.NDArray[np.int64] = np.zeros(
(self.shape[0],), dtype=_select_dtype(self.shape[1])
)
@@ -91,6 +115,7 @@ def append(
) -> None:
"""
At accumulation time, do several things:
* re-index to positional indices, and if possible, cast to smaller dtype
to minimize memory footprint (at cost of some amount of time)
* accumulate column counts by row, i.e., build the basis of the indptr
@@ -113,15 +138,15 @@ def append(
self.coo_chunks.append((row_ind, col_ind, data.to_numpy()))
_accum_row_length(self.row_length, row_ind)

def finalize(self) -> _CSRAccumulatorFinalResult:
def finalize(self) -> AccumulatedCSR:
nnz = sum(len(chunk[2]) for chunk in self.coo_chunks)
index_dtype = _select_dtype(nnz)
if nnz == 0:
# There is no way to infer matrix dtype, so use a default and return
# an empty matrix. Float32 is used as a default type, as it is most
# compatible with AnnData expectations.
empty = sparse.csr_matrix((0, 0), dtype=np.float32)
return _CSRAccumulatorFinalResult(
return AccumulatedCSR(
data=empty.data,
indptr=empty.indptr,
indices=empty.indices,
@@ -160,7 +185,7 @@ def finalize(self) -> _CSRAccumulatorFinalResult:
]
)
_finalize_indptr(indptr)
return _CSRAccumulatorFinalResult(
return AccumulatedCSR(
data=data, indptr=indptr, indices=indices, shape=self.shape
)

@@ -243,61 +268,8 @@ def _select_dtype(


def _reindex_and_cast(
index: pd.Index, ids: npt.NDArray[np.int64], target_dtype: npt.DTypeLike
index: types.IndexLike, ids: npt.NDArray[np.int64], target_dtype: npt.DTypeLike
) -> npt.NDArray[np.int64]:
return cast(
npt.NDArray[np.int64], index.get_indexer(ids).astype(target_dtype, copy=False)
)


def _read_csr(
matrix: scd.SparseNDArray, obs_joinids: pa.Array, var_joinids: pa.Array
) -> Tuple[
npt.NDArray[np.number], # data
npt.NDArray[np.integer], # indptr
npt.NDArray[np.integer], # indices
Tuple[int, int], # shape
]:
if not isinstance(matrix, scd.SparseNDArray) or matrix.ndim != 2:
raise TypeError("Can only read from a 2D SparseNDArray")

max_workers = (os.cpu_count() or 4) + 2
with futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
acc = _CSRAccumulator(
obs_joinids=obs_joinids, var_joinids=var_joinids, pool=pool
)
for tbl in _eager_iter.EagerIterator(
matrix.read((obs_joinids, var_joinids)).tables(),
pool=pool,
):
acc.append(tbl["soma_dim_0"], tbl["soma_dim_1"], tbl["soma_data"])

data, indptr, indices, shape = acc.finalize()

return data, indptr, indices, shape


def _create_scipy_csr_matrix(
data: npt.NDArray[np.number],
indices: npt.NDArray[np.integer],
indptr: npt.NDArray[np.integer],
shape: Tuple[int, int],
) -> sparse.csr_matrix:
"""Create a Scipy sparse.csr_matrix from component elements.
Conceptually, this is identical to::
sparse.csr_matrix((data, indices, indptr), shape=shape)
This ugliness is to bypass the O(N) scan that
:meth:`scipy.sparse._cs_matrix.__init__`
does when a new compressed matrix is created.
See https://github.com/scipy/scipy/issues/11496 for details on the bug.
"""
matrix = sparse.csr_matrix.__new__(sparse.csr_matrix)
matrix.data = data
matrix.indices = indices
matrix.indptr = indptr
matrix._shape = shape
return matrix
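
As context for the `to_scipy` docstring above, the following standalone sketch
(assuming only SciPy and NumPy; the toy data is illustrative) contrasts the
ordinary constructor, which scans and validates its inputs, with the
attribute-assignment path that skips `__init__`:

import numpy as np
from scipy import sparse

data = np.array([1.0, 2.0, 3.0], dtype=np.float32)
indices = np.array([0, 1, 1], dtype=np.int32)
indptr = np.array([0, 2, 3], dtype=np.int32)
shape = (2, 2)

# Ordinary construction: triggers the O(N) scan of the input arrays.
slow = sparse.csr_matrix((data, indices, indptr), shape=shape)

# Construction used above: allocate the object without running __init__,
# then attach the already-validated component arrays directly.
fast = sparse.csr_matrix.__new__(sparse.csr_matrix)
fast.data = data
fast.indices = indices
fast.indptr = indptr
fast._shape = shape

assert np.array_equal(slow.toarray(), fast.toarray())  # same matrix either way
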
18 changes: 14 additions & 4 deletions python-spec/src/somacore/query/query.py
@@ -27,6 +27,7 @@
from .. import options
from . import _fast_csr
from . import axis
from . import types

_RO_AUTO = options.ResultOrder.AUTO

@@ -88,6 +89,7 @@ def __init__(
*,
obs_query: axis.AxisQuery = axis.AxisQuery(),
var_query: axis.AxisQuery = axis.AxisQuery(),
index_factory: types.IndexFactory = pd.Index,
):
if measurement_name not in experiment.ms:
raise ValueError("Measurement does not exist in the experiment")
@@ -97,7 +99,11 @@

self._matrix_axis_query = _MatrixAxisQuery(obs=obs_query, var=var_query)
self._joinids = _JoinIDCache(self)
self._indexer = AxisIndexer(self)
self._indexer = AxisIndexer(
self,
index_factory=index_factory,
)
self._index_factory = index_factory
self._threadpool_: Optional[futures.ThreadPoolExecutor] = None

def obs(
@@ -393,9 +399,12 @@ def _read_axis_mappings(fn, axis, keys: Sequence[str]) -> Dict[str, np.ndarray]:
obs_table, var_table = self._read_both_axes(column_names)

x_matrices = {
_xname: _fast_csr.read_scipy_csr(
all_x_arrays[_xname], self.obs_joinids(), self.var_joinids()
)
_xname: _fast_csr.read_csr(
all_x_arrays[_xname],
self.obs_joinids(),
self.var_joinids(),
index_factory=self._index_factory,
).to_scipy()
for _xname in all_x_arrays
}

@@ -736,6 +745,7 @@ class AxisIndexer:
"""

query: ExperimentAxisQuery
_index_factory: types.IndexFactory
_cached_obs: Optional[pd.Index] = None
_cached_var: Optional[pd.Index] = None

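
One plausible way `AxisIndexer` can use the injected factory — a purely
speculative sketch, since the method bodies are not shown in this diff — is to
build its cached indexes through `self._index_factory` rather than calling
`pd.Index` directly. It assumes `query.obs_joinids()` returns a pyarrow array.

def _obs_index(self) -> types.IndexLike:
    # Speculative helper: build the obs index lazily via the injected factory.
    if self._cached_obs is None:
        self._cached_obs = self._index_factory(
            self.query.obs_joinids().to_numpy()
        )
    return self._cached_obs
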
33 changes: 33 additions & 0 deletions python-spec/src/somacore/query/types.py
@@ -0,0 +1,33 @@
"""Common types used across SOMA query modules."""

from typing import Any, Callable

import numpy as np
import numpy.typing as npt
from typing_extensions import Protocol


class IndexLike(Protocol):
"""The basics of what we expect an Index to be.
This is a basic description of the parts of the ``pandas.Index`` type
that we use. It is intended as a rough guide so an implementor can know
that they are probably passing the right "index" type into a function,
not as a full specification of the types and behavior of ``get_indexer``.
"""

def get_indexer(
self,
target: npt.NDArray[np.int64],
method: object = ...,
limit: object = ...,
tolerance: object = ...,
) -> Any:
"""Something compatible with Pandas' Index.get_indexer method."""


IndexFactory = Callable[[npt.NDArray[np.int64]], "IndexLike"]
"""Function that builds an index over the given NDArray.
This interface is implemented by the callable ``pandas.Index``.
"""
