From 85641a137c0311421889118c68ca8299ea76ce57 Mon Sep 17 00:00:00 2001 From: Nicolas Tessore Date: Tue, 19 Sep 2023 14:50:48 +0100 Subject: [PATCH] separate structural and content changes --- .gitignore | 3 - README.md | 3 +- heracles/__init__.py | 2 +- heracles/catalog/__init__.py | 4 +- heracles/catalog/array.py | 69 ++---- heracles/catalog/base.py | 392 +++++++++++------------------------ heracles/catalog/filters.py | 79 ++----- heracles/catalog/fits.py | 110 +++------- heracles/core.py | 56 +---- heracles/covariance.py | 119 ++++------- heracles/io.py | 316 +++++++--------------------- heracles/maps.py | 361 ++++++-------------------------- heracles/plot.py | 72 ++----- heracles/twopoint.py | 230 ++++++-------------- heracles/util.py | 82 +++----- mkdocs.yml | 4 +- pyproject.toml | 12 +- tests/test_catalog.py | 22 +- 18 files changed, 501 insertions(+), 1435 deletions(-) diff --git a/.gitignore b/.gitignore index 9bd2e7c..3be643a 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,3 @@ __pycache__ _version.py .*.swp .envrc -.ipynb_checkpoints -.vscode -site diff --git a/README.md b/README.md index b56047a..41b1240 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,8 @@ # _Heracles_ — Harmonic-space statistics on the sphere -[![PyPI](https://badge.fury.io/py/heracles.svg)](https://pypi.org/project/heracles) +[![PyPI](https://img.shields.io/pypi/v/heracles)](https://pypi.org/project/heracles) [![Python](https://img.shields.io/pypi/pyversions/heracles)](https://www.python.org) [![Documentation](https://readthedocs.org/projects/heracles/badge/?version=latest)](https://heracles.readthedocs.io/en/latest/?badge=latest) [![Test](https://github.com/heracles-ec/heracles/actions/workflows/test.yml/badge.svg)](https://github.com/heracles-ec/heracles/actions/workflows/test.yml) -[![pre-commit](https://img.shields.io/badge/pre--commit-enabled-brightgreen?logo=pre-commit&logoColor=white)](https://github.com/pre-commit/pre-commit) The _Heracles_ code was developed in the _Euclid_ Science Ground Segment. diff --git a/heracles/__init__.py b/heracles/__init__.py index 3a00859..26e7d99 100644 --- a/heracles/__init__.py +++ b/heracles/__init__.py @@ -16,7 +16,7 @@ # # You should have received a copy of the GNU Lesser General Public # License along with Heracles. If not, see . -"""Heracles: Euclid code for harmonic-space statistics on the sphere.""" +"""Main module of the *Heracles* package.""" try: from ._version import __version__, __version_tuple__ # noqa: F401 diff --git a/heracles/catalog/__init__.py b/heracles/catalog/__init__.py index 9d79304..7566a5e 100644 --- a/heracles/catalog/__init__.py +++ b/heracles/catalog/__init__.py @@ -16,9 +16,9 @@ # # You should have received a copy of the GNU Lesser General Public # License along with Heracles. If not, see . -"""Module for catalogue processing.""" +"""module for catalogue processing""" from .array import ArrayCatalog # noqa: F401 -from .base import Catalog, CatalogPage, CatalogView, _CatalogBase # noqa: F401 +from .base import Catalog, CatalogBase, CatalogPage, CatalogView # noqa: F401 from .filters import FootprintFilter, InvalidValueFilter # noqa: F401 from .fits import FitsCatalog # noqa: F401 diff --git a/heracles/catalog/array.py b/heracles/catalog/array.py index 47d2edc..9db3b2f 100644 --- a/heracles/catalog/array.py +++ b/heracles/catalog/array.py @@ -16,83 +16,42 @@ # # You should have received a copy of the GNU Lesser General Public # License along with Heracles. If not, see . -"""Module for array catalogues.""" -from collections.abc import Iterator -from typing import Optional, TypeVar +"""module for array catalogues""" -from .base import CatalogPage, _CatalogBase +from .base import CatalogBase, CatalogPage -class ArrayCatalog(_CatalogBase): - """Catalogue reader for arrays.""" +class ArrayCatalog(CatalogBase): + """catalogue reader for arrays""" - def __init__(self, arr: TypeVar("Unknown")) -> None: - """Create a new array catalogue reader. - - Args: - arr: _description_ - """ + def __init__(self, arr): + """create a new array catalogue reader""" super().__init__() self._arr = arr - def __copy__(self) -> "ArrayCatalog": - """Return a copy of this catalogue. - - Returns: - _description_ - """ + def __copy__(self): + """return a copy of this catalogue""" other = super().__copy__() other._arr = self._arr return other - def _names(self) -> TypeVar("Unknown"): - """_summary_. - - Returns: - _description_ - """ + def _names(self): return self._arr.dtype.names - def _size(self, selection: Optional[TypeVar("Unknown")]) -> int: - """_summary_. - - Args: - selection: _description_ - - Returns: - _description_ - """ + def _size(self, selection): if selection is None: return len(self._arr) return len(self._arr[selection]) - def _join( - self, - first: TypeVar("Unknown"), - *other: TypeVar("Unknown"), - ) -> TypeVar("Unknown"): - """Join boolean masks. - - Args: - first: _description_ - - Returns: - _description_ - """ + def _join(self, first, *other): + """join boolean masks""" mask = first for a in other: mask = mask & a return mask - def _pages(self, selection: Optional[TypeVar("Unknown")]) -> Iterator[CatalogPage]: - """Iterate the rows of the array in pages. - - Args: - selection: _description_ - - Yields: - _description_ - """ + def _pages(self, selection): + """iterate the rows of the array in pages""" if selection is None: arr = self._arr else: diff --git a/heracles/catalog/base.py b/heracles/catalog/base.py index 55758bb..4a79bf7 100644 --- a/heracles/catalog/base.py +++ b/heracles/catalog/base.py @@ -16,29 +16,25 @@ # # You should have received a copy of the GNU Lesser General Public # License along with Heracles. If not, see . -"""Base definition for catalogue interface.""" +"""base definition for catalogue interface""" from abc import ABCMeta, abstractmethod -from collections.abc import Iterator, Mapping +from collections.abc import Mapping from types import MappingProxyType -from typing import Optional, Protocol, TypeVar, Union, runtime_checkable +from typing import Protocol, runtime_checkable import numpy as np -import numpy.typing as npt class CatalogPage: """One batch of rows from a catalogue. Internally holds all column data as a numpy array. - """ - def _update(self) -> None: - """Update internal data after dictionary changes. + """ - Raises: - ValueError: _description_ - """ + def _update(self): + """Update internal data after dictionary changes.""" # get and check size of rows size: int = -1 for col, rows in self._data.items(): @@ -50,77 +46,47 @@ def _update(self) -> None: self._size = size def __init__(self, data: Mapping) -> None: - """Create a new catalogue page from given data. - - Args: - data: _description_ - """ + """Create a new catalogue page from given data.""" self._data = {k: np.asanyarray(v) for k, v in data.items()} for v in self._data.values(): v.flags.writeable = False self._update() - def __getitem__(self, col: Union[list, str, tuple]) -> npt.NDArray: - """Return one or more columns without checking. - - Args: - col: _description_ - - Returns: - _description_ - """ + def __getitem__(self, col): + """Return one or more columns without checking.""" if isinstance(col, (list, tuple)): return tuple(self._data[c] for c in col) return self._data[col] - def __len__(self) -> int: - """Number of columns in the page. - - Returns: - _description_ - """ + def __len__(self): + """Number of columns in the page.""" return len(self._data) - def __copy__(self) -> "CatalogPage": - """Create a copy. - - Returns: - _description_ - """ + def __copy__(self): + """Create a copy.""" return self.copy() - def __iter__(self) -> Iterator[npt.NDArray]: - """Iterate over column names. - - Yields: - _description_ - """ + def __iter__(self): + """Iterate over column names.""" yield from self._data @property - def names(self) -> list[dict]: + def names(self): """Column names in the page.""" return list(self._data) @property - def size(self) -> int: + def size(self): """Number of rows in the page.""" return self._size @property - def data(self) -> MappingProxyType: + def data(self): """Return an immutable view on the data of this page.""" return MappingProxyType(self._data) - def get(self, *col: TypeVar("Unknown")) -> Union[npt.NDArray, list[npt.NDArray]]: - """Return one or more columns with checking. - - Raises: - ValueError: _description_ - - Returns: - _description_ - """ + def get(self, *col): + """Return one or more columns with checking.""" val = [] for c in col: v = self._data[c] @@ -133,19 +99,11 @@ def get(self, *col: TypeVar("Unknown")) -> Union[npt.NDArray, list[npt.NDArray]] return val def copy(self) -> "CatalogPage": - """Create new page instance with the same data. - - Returns: - _description_ - """ + """Create new page instance with the same data.""" return CatalogPage(self._data) - def delete(self, where: TypeVar("Unknown")) -> None: - """Delete the rows indicated by ``where``. - - Args: - where: _description_ - """ + def delete(self, where) -> None: + """Delete the rows indicated by ``where``.""" for col, rows in self._data.items(): self._data[col] = np.delete(rows, where) self._update() @@ -153,165 +111,109 @@ def delete(self, where: TypeVar("Unknown")) -> None: @runtime_checkable class Catalog(Protocol): - """Protocol for catalogues.""" - - def __getitem__(self, where: TypeVar("Unknown")) -> TypeVar("Unknown"): - """Create a view with the given selection. + """protocol for catalogues""" - Args: - where: _description_ - """ + def __getitem__(self, where): + """create a view with the given selection""" ... @property - def base(self) -> Optional[TypeVar("Unknown")]: - """Return the base catalogue of a view, or ``None`` if not a view.""" + def base(self): + """return the base catalogue of a view, or ``None`` if not a view""" ... @property - def selection(self) -> Optional[TypeVar("Unknown")]: - """Return the selection of a view, or ``None`` if not a view.""" + def selection(self): + """return the selection of a view, or ``None`` if not a view""" ... @property - def names(self) -> Optional[TypeVar("Unknown")]: - """Columns in the catalogue, or ``None`` if not known.""" + def names(self): + """columns in the catalogue, or ``None`` if not known""" ... @property - def size(self) -> Optional[int]: - """Rows in the catalogue, or ``None`` if not known.""" + def size(self): + """rows in the catalogue, or ``None`` if not known""" ... @property - def visibility(self) -> TypeVar("Unknown"): - """Visibility map of the catalogue.""" + def visibility(self): + """visibility map of the catalogue""" ... - def where( - self, - selection: TypeVar("Unknown"), - visibility: Optional[TypeVar("Unknown")] = None, - ) -> TypeVar("Unknown"): - """Create a view on this catalogue with the given selection. - - Args: - selection: _description_ - visibility: _description_ - """ + def where(self, selection, visibility=None): + """create a view on this catalogue with the given selection""" ... @property - def page_size(self) -> int: - """Page size for iteration.""" + def page_size(self): + """page size for iteration""" ... - def __iter__(self) -> TypeVar("Unknown"): - """Iterate over pages of rows in the catalogue.""" + def __iter__(self): + """iterate over pages of rows in the catalogue""" ... - def select(self, selection: TypeVar("Unknown")) -> TypeVar("Unknown"): - """Iterate over pages of rows with the given selection. - - Args: - selection: _description_ - """ + def select(self, selection): + """iterate over pages of rows with the given selection""" ... class CatalogView: - """A view of a catalogue with some selection applied.""" - - def __init__( - self, - catalog: Catalog, - selection: TypeVar("Unknown"), - visibility: Optional[TypeVar("Unknown")] = None, - ) -> None: - """Create a new view. - - Args: - catalog: _description_ - selection: _description_ - visibility: _description_ - """ + """a view of a catalogue with some selection applied""" + + def __init__(self, catalog, selection, visibility=None): + """create a new view""" self._catalog = catalog self._selection = selection self._visibility = visibility - def __repr__(self) -> str: - """Object representation of this view. - - Returns: - _description_ - """ + def __repr__(self): + """object representation of this view""" return f"{self._catalog!r}[{self._selection!r}]" - def __str__(self) -> str: - """String representation of this view. - - Returns: - _description_ - """ + def __str__(self): + """string representation of this view""" return f"{self._catalog!s}[{self._selection!s}]" - def __getitem__(self, where: TypeVar("Unknown")) -> TypeVar("Unknown"): - """Return a view with a subselection of this view. - - Args: - where: _description_ - - Returns: - _description_ - """ + def __getitem__(self, where): + """return a view with a subselection of this view""" return self.where(where) @property - def base(self) -> TypeVar("Unknown"): - """Base catalogue of this view.""" + def base(self): + """base catalogue of this view""" return self._catalog @property - def selection(self) -> TypeVar("Unknown"): - """Selection of this view.""" + def selection(self): + """selection of this view""" return self._selection @property - def names(self) -> TypeVar("Unknown"): - """Column names of this view.""" + def names(self): + """column names of this view""" return self._catalog.names @property - def size(self) -> int: - """Size of this view, might not take selection into account.""" + def size(self): + """size of this view, might not take selection into account""" return self._catalog._size(self._selection) @property - def visibility(self) -> TypeVar("Unknown"): - """The visibility of this view.""" + def visibility(self): + """the visibility of this view""" if self._visibility is None: return self._catalog.visibility return self._visibility @visibility.setter - def visibility(self, visibility: TypeVar("Unknown")) -> None: - """_summary_.""" + def visibility(self, visibility): self._visibility = visibility - def where( - self, - selection: TypeVar("Unknown"), - visibility: Optional[TypeVar("Unknown")] = None, - ) -> TypeVar("Unknown"): - """Return a view with a subselection of this view. - - Args: - selection: _description_ - visibility: _description_ - - Returns: - _description_ - """ + def where(self, selection, visibility=None): + """return a view with a subselection of this view""" if isinstance(selection, (tuple, list)): joined = (self._selection, *selection) else: @@ -321,27 +223,16 @@ def where( return self._catalog.where(joined, visibility) @property - def page_size(self) -> int: - """Page size for iterating this view.""" + def page_size(self): + """page size for iterating this view""" return self._catalog.page_size - def __iter__(self) -> Iterator[TypeVar("Unknown")]: - """Iterate the catalogue with the selection of this view. - - Yields: - _description_ - """ + def __iter__(self): + """iterate the catalogue with the selection of this view""" yield from self._catalog.select(self._selection) - def select(self, selection: TypeVar("Unknown")) -> Iterator[TypeVar("Unknown")]: - """Iterate over pages of rows with the given selection. - - Args: - selection: _description_ - - Yields: - _description_ - """ + def select(self, selection): + """iterate over pages of rows with the given selection""" if isinstance(selection, (tuple, list)): joined = (self._selection, *selection) else: @@ -349,24 +240,22 @@ def select(self, selection: TypeVar("Unknown")) -> Iterator[TypeVar("Unknown")]: yield from self._catalog.select(joined) -class _CatalogBase(metaclass=ABCMeta): - """Abstract base class for base catalogues (not views).""" +class CatalogBase(metaclass=ABCMeta): + """abstract base class for base catalogues (not views)""" - _default_page_size: int = 100_000 - """Default value for page size""" + default_page_size: int = 100_000 + """default value for page size""" - def __init__(self) -> None: + def __init__(self): """Create a new catalogue instance.""" - self._page_size = self._default_page_size + + self._page_size = self.default_page_size self._filters = [] self._visibility = None - def __copy__(self) -> "_CatalogBase": - """Return a shallow copy of the catalogue. + def __copy__(self): + """return a shallow copy of the catalogue""" - Returns: - _description_ - """ other = self.__class__.__new__(self.__class__) other._page_size = self._page_size other._filters = self._filters.copy() @@ -374,140 +263,93 @@ def __copy__(self) -> "_CatalogBase": return other @abstractmethod - def _names(self) -> TypeVar("Unknown"): - """Abstract method to return the columns in the catalogue.""" + def _names(self): + """abstract method to return the columns in the catalogue""" ... @abstractmethod - def _size(self, selection: TypeVar("Unknown")) -> int: - """Abstract method to return the size of the catalogue or selection. - - Args: - selection: _description_ - """ + def _size(self, selection): + """abstract method to return the size of the catalogue or selection""" ... @abstractmethod - def _join(self, *where: TypeVar("Unknown")) -> TypeVar("Unknown"): - """Abstract method to join selections.""" + def _join(self, *where): + """abstract method to join selections""" ... @abstractmethod - def _pages(self, selection: TypeVar("Unknown")) -> TypeVar("Unknown"): - """Abstract method to iterate selected pages from the catalogue. - - Args: - selection: _description_ - """ + def _pages(self, selection): + """abstract method to iterate selected pages from the catalogue""" ... @property - def filters(self) -> TypeVar("Unknown"): - """Filters to apply to this catalogue.""" + def filters(self): + """filters to apply to this catalogue""" return self._filters @filters.setter - def filters(self, filters: TypeVar("Unknown")) -> None: - """_summary_.""" + def filters(self, filters): self._filters = filters - def add_filter(self, filt: TypeVar("Unknown")) -> None: - """Add a filter to catalogue. - - Args: - filt: _description_ - """ + def add_filter(self, filt): + """add a filter to catalogue""" self.filters.append(filt) - def __getitem__(self, where: TypeVar("Unknown")) -> TypeVar("Unknown"): - """Create a view on this catalogue with the given selection. - - Args: - where: _description_ - - Returns: - _description_ - """ + def __getitem__(self, where): + """create a view on this catalogue with the given selection""" return self.where(where) @property - def base(self) -> None: - """Returns ``None`` since this is not a view of another catalogue.""" + def base(self): + """returns ``None`` since this is not a view of another catalogue""" return @property - def selection(self) -> None: - """Returns ``None`` since this is not a view of another catalogue.""" + def selection(self): + """returns ``None`` since this is not a view of another catalogue""" return @property - def names(self) -> Optional[TypeVar("Unknown")]: - """Columns in the catalogue, or ``None`` if not known.""" + def names(self): + """columns in the catalogue, or ``None`` if not known""" return self._names() @property - def size(self) -> Optional[int]: - """Total rows in the catalogue, or ``None`` if not known.""" + def size(self): + """total rows in the catalogue, or ``None`` if not known""" return self._size(None) @property - def visibility(self) -> TypeVar("Unknown"): - """Optional visibility map for catalogue.""" + def visibility(self): + """optional visibility map for catalogue""" return self._visibility @visibility.setter - def visibility(self, visibility: TypeVar("Unknown")) -> None: - """_summary_.""" + def visibility(self, visibility): self._visibility = visibility - def where( - self, - selection: TypeVar("Unknown"), - visibility: Optional[TypeVar("Unknown")] = None, - ) -> CatalogView: - """Create a view on this catalogue with the given selection. - - Args: - selection: _description_ - visibility: _description_ - - Returns: - _description_ - """ + def where(self, selection, visibility=None): + """create a view on this catalogue with the given selection""" if isinstance(selection, (tuple, list)): selection = self._join(*selection) return CatalogView(self, selection, visibility) @property - def page_size(self) -> int: - """Number of rows per page (default: 100_000).""" + def page_size(self): + """number of rows per page (default: 100_000)""" return self._page_size @page_size.setter - def page_size(self, value: int) -> None: - """_summary_.""" + def page_size(self, value): self._page_size = value - def __iter__(self) -> Iterator[TypeVar("Unknown")]: - """Iterate over pages of rows in the catalogue. - - Yields: - _description_ - """ + def __iter__(self): + """iterate over pages of rows in the catalogue""" yield from self.select(None) - def select( - self, - selection: Union[list, tuple, TypeVar("Unknown")], - ) -> Iterator[CatalogPage]: - """Iterate over pages of rows with the given selection. - - Args: - selection: _description_ + def select(self, selection): + """iterate over pages of rows with the given selection""" - Yields: - _description_ - """ if isinstance(selection, (tuple, list)): selection = self._join(*selection) diff --git a/heracles/catalog/filters.py b/heracles/catalog/filters.py index 12e8859..0036f53 100644 --- a/heracles/catalog/filters.py +++ b/heracles/catalog/filters.py @@ -16,65 +16,46 @@ # # You should have received a copy of the GNU Lesser General Public # License along with Heracles. If not, see . -"""Module for catalogue filters.""" +"""module for catalogue filters""" import warnings -from typing import Optional, TypeVar import healpy as hp import numpy as np -from .base import CatalogPage - class InvalidValueFilter: """Filter invalid values from a catalogue.""" - def __init__( - self, - *columns: TypeVar("Unknown"), - weight: Optional[TypeVar("Unknown")] = None, - warn: bool = True, - ) -> None: + def __init__(self, *columns, weight=None, warn=True): """Filter invalid values in the given columns. If ``warn`` is true, invalid values will emit a warning. - Args: - columns: _description_ - weight: _description_ - warn: _description_ """ - self._columns = columns - self._weight = weight - self._warn = warn - def __repr__(self) -> str: - """_summary_. + self.columns = columns + self.weight = weight + self.warn = warn - Returns: - _description_ - """ + def __repr__(self): name = self.__class__.__name__ - args = list(map(repr, self._columns)) - args += [f"weight={self._weight!r}", f"warn={self._warn!r}"] + args = list(map(repr, self.columns)) + args += [f"weight={self.weight!r}", f"warn={self.warn!r}"] args = ", ".join(args) return f"{name}({args})" - def __call__(self, page: CatalogPage) -> None: - """Filter a catalog page. + def __call__(self, page): + """Filter a catalog page.""" - Args: - page: _description_ - """ invalid_mask = np.zeros(page.size, dtype=bool) - for col in self._columns: + for col in self.columns: invalid_mask |= np.isnan(page[col]) - if self._weight is not None: - invalid_mask &= page[self._weight] != 0 + if self.weight is not None: + invalid_mask &= page[self.weight] != 0 invalid = np.where(invalid_mask)[0] if len(invalid) > 0: - if self._warn: + if self.warn: warnings.warn("WARNING: catalog contains invalid values") page.delete(invalid) @@ -82,44 +63,30 @@ def __call__(self, page: CatalogPage) -> None: class FootprintFilter: """Filter a catalogue using a footprint map.""" - def __init__(self, footprint: TypeVar("Unknown"), lon: float, lat: float) -> None: - """Filter using the given footprint map and position columns. - - Args: - footprint: _description_ - lon: _description_ - lat: _description_ - """ + def __init__(self, footprint, lon, lat): + """Filter using the given footprint map and position columns.""" self._footprint = footprint self._nside = hp.get_nside(footprint) self._lonlat = (lon, lat) @property - def footprint(self) -> TypeVar("Unknown"): - """Footprint for filter.""" + def footprint(self): + """footprint for filter""" return self._footprint @property - def lonlat(self) -> tuple[float, float]: - """Longitude and latitude columns.""" + def lonlat(self): + """longitude and latitude columns""" return self._lonlat - def __repr__(self) -> str: - """_summary_. - - Returns: - _description_ - """ + def __repr__(self): name = self.__class__.__name__ lon, lat = self.lonlat return f"{name}(..., {lon!r}, {lat!r})" - def __call__(self, page: CatalogPage) -> None: - """Filter catalog page. + def __call__(self, page): + """filter catalog page""" - Args: - page: _description_ - """ lon, lat = self._lonlat ipix = hp.ang2pix(self._nside, page[lon], page[lat], lonlat=True) exclude = np.where(self._footprint[ipix] == 0)[0] diff --git a/heracles/catalog/fits.py b/heracles/catalog/fits.py index 5f8e6fe..5d611aa 100644 --- a/heracles/catalog/fits.py +++ b/heracles/catalog/fits.py @@ -16,98 +16,57 @@ # # You should have received a copy of the GNU Lesser General Public # License along with Heracles. If not, see . -"""Module for catalogue processing.""" +"""module for catalogue processing""" -from collections.abc import Iterator -from typing import Any, Optional, TypeVar from weakref import finalize, ref import fitsio -from .base import CatalogPage, _CatalogBase +from .base import CatalogBase, CatalogPage -def _is_table_hdu(hdu: fitsio.hdu.TableHDU) -> bool: - """Return true if HDU is a table with data. - - Args: - hdu: _description_ - - Returns: - _description_ - """ +def _is_table_hdu(hdu): + """return true if HDU is a table with data""" return isinstance(hdu, fitsio.hdu.TableHDU) and hdu.has_data() -def rowfilter(array: TypeVar("Unknown"), expr: str) -> Any: - """Filter the rows of a structured array. - - Args: - array: _description_ - expr: _description_ - - Returns: - _description_ - """ +def rowfilter(array, expr): + """filter the rows of a structured array""" return eval(expr, None, {name: array[name] for name in array.dtype.names}) -class FitsCatalog(_CatalogBase): - """Flexible reader for catalogues from FITS files.""" +class FitsCatalog(CatalogBase): + """flexible reader for catalogues from FITS files""" - def __init__( - self, - filename: TypeVar("Unknown"), - *, - columns: Optional[TypeVar("Unknown")] = None, - ext: Optional[TypeVar("Unknown")] = None, - ) -> None: - """Create a new FITS catalogue reader. + def __init__(self, filename, *, columns=None, ext=None): + """create a new FITS catalogue reader Neither opens the FITS file nor reads the catalogue immediately. - Args: - filename: _description_ - columns: _description_ - ext: _description_ """ super().__init__() self._filename = filename self._columns = columns self._ext = ext - def __copy__(self) -> "FitsCatalog": - """Return a copy of this catalog. - - Returns: - _description_ - """ + def __copy__(self): + """return a copy of this catalog""" other = super().__copy__() other._filename = self._filename other._columns = self._columns other._ext = self._ext return other - def __repr__(self) -> str: - """String representation of FitsCatalog. - - Returns: - _description_ - """ + def __repr__(self): + """string representation of FitsCatalog""" s = self._filename if self._ext is not None: s = s + f"[{self._ext!r}]" return s - def hdu(self) -> TypeVar("Unknown"): - """HDU for catalogue data. + def hdu(self): + """HDU for catalogue data""" - Raises: - TypeError: _description_ - - Returns: - _description_ - """ # see if there's a reference to hdu still around try: hdu = self._hdu() @@ -146,47 +105,26 @@ def hdu(self) -> TypeVar("Unknown"): return hdu - def _names(self) -> Optional[TypeVar("Unknown")]: - """Column names in FITS catalogue. - - Returns: - _description_ - """ + def _names(self): + """column names in FITS catalogue""" # store column names on first access if self._columns is None: self._columns = self.hdu().get_colnames() return self._columns - def _size(self, selection: TypeVar("Unknown")) -> int: - """Size of FITS catalogue; selection is ignored. - - Args: - selection: _description_ - - Returns: - _description_ - """ + def _size(self, selection): + """size of FITS catalogue; selection is ignored""" return self.hdu().get_nrows() - def _join(self, *where: TypeVar("Unknown")) -> Optional[str]: - """Join rowfilter expressions. - - Returns: - _description_ - """ + def _join(self, *where): + """join rowfilter expressions""" if not where: return None return "(" + ") & (".join(map(str, filter(None, where))) + ")" - def _pages(self, selection: TypeVar("Unknown")) -> Iterator[CatalogPage]: - """Iterate pages of rows in FITS file, optionally using the query. + def _pages(self, selection): + """iterate pages of rows in FITS file, optionally using the query""" - Args: - selection: _description_ - - Yields: - _description_ - """ # keep an unchanging local copy of the page size page_size = self.page_size diff --git a/heracles/core.py b/heracles/core.py index 408aa8d..bebf96c 100644 --- a/heracles/core.py +++ b/heracles/core.py @@ -16,28 +16,14 @@ # # You should have received a copy of the GNU Lesser General Public # License along with Heracles. If not, see . -"""module for common core functionality.""" +"""module for common core functionality""" from collections import UserDict from collections.abc import Mapping, Sequence -from typing import Optional, TypeVar, Union -def toc_match( - key: TypeVar("Unknown"), - include: Optional[TypeVar("Unknown")] = None, - exclude: Optional[TypeVar("Unknown")] = None, -) -> bool: - """Return whether a tocdict entry matches include/exclude criteria. - - Args: - key: _description_ - include: _description_ - exclude: _description_ - - Returns: - _description_ - """ +def toc_match(key, include=None, exclude=None): + """return whether a tocdict entry matches include/exclude criteria""" if include is not None: for pattern in include: if all(p is Ellipsis or p == k for p, k in zip(pattern, key)): @@ -51,24 +37,8 @@ def toc_match( return True -def toc_filter( - obj: Union[Sequence, Mapping], - include: Optional[TypeVar("Unknown")] = None, - exclude: Optional[TypeVar("Unknown")] = None, -) -> Union[dict, list]: - """Return a filtered toc dict ``d``. - - Args: - obj: _description_ - include: _description_ - exclude: _description_ - - Raises: - TypeError: _description_ - - Returns: - _description_ - """ +def toc_filter(obj, include=None, exclude=None): + """return a filtered toc dict ``d``""" if isinstance(obj, Sequence): return [toc_filter(item, include, exclude) for item in obj] if isinstance(obj, Mapping): @@ -80,20 +50,10 @@ def toc_filter( # subclassing UserDict here since that returns the correct type from methods # such as __copy__(), __or__(), etc. class TocDict(UserDict): - """Table-of-contents dictionary with pattern-based lookup.""" - - def __getitem__(self, pattern: TypeVar("Unknown")) -> TypeVar("Unknown"): - """Look up one or many keys in dict. - - Args: - pattern: _description_ - - Raises: - KeyError: _description_ + """Table-of-contents dictionary with pattern-based lookup""" - Returns: - _description_ - """ + def __getitem__(self, pattern): + """look up one or many keys in dict""" # first, see if pattern is a valid entry in the dict # might fail with KeyError (no such entry) or TypeError (not hashable) try: diff --git a/heracles/covariance.py b/heracles/covariance.py index 8bf7c02..78508d7 100644 --- a/heracles/covariance.py +++ b/heracles/covariance.py @@ -16,39 +16,25 @@ # # You should have received a copy of the GNU Lesser General Public # License along with Heracles. If not, see . -"""Module for covariance matrix computation.""" +"""module for covariance matrix computation""" + import logging import time from datetime import timedelta from itertools import combinations_with_replacement -from typing import Optional, TypeVar import healpy as hp import numpy as np -import numpy.typing as npt from ._kmeans_radec import kmeans_sample -_logger = logging.getLogger(__name__) +logger = logging.getLogger(__name__) class SampleCovariance(np.ndarray): - """Array subclass for iterative sample covariance matrix computation.""" - - def __new__( - cls, - nrows: int, - ncols: Optional[int] = None, - ) -> "SampleCovariance": - """_summary_. - - Args: - nrows: _description_ - ncols: _description_ - - Returns: - _description_ - """ + """array subclass for iterative sample covariance matrix computation""" + + def __new__(cls, nrows, ncols=None): if ncols is None: ncols = nrows cov = np.zeros((nrows, ncols)).view(cls) @@ -57,12 +43,7 @@ def __new__( cov.sample_col_mean = np.zeros(ncols) return cov - def __array_finalize__(self, cov: Optional["SampleCovariance"]) -> None: - """_summary_. - - Args: - cov: _description_ - """ + def __array_finalize__(self, cov): if cov is None: return nrows, ncols = np.shape(cov) @@ -73,21 +54,9 @@ def __array_finalize__(self, cov: Optional["SampleCovariance"]) -> None: self.sample_col_mean[:] = getattr(cov, "sample_col_mean", 0.0) -def add_sample( - cov: "SampleCovariance", - x: npt.NDArray, - y: Optional[npt.NDArray] = None, -) -> None: - """Add a sample to a sample covariance matrix. +def add_sample(cov, x, y=None): + """add a sample to a sample covariance matrix""" - Args: - cov: _description_ - x: _description_ - y: _description_ - - Raises: - ValueError: _description_ - """ x = np.reshape(x, -1) if y is None: y = x @@ -106,20 +75,16 @@ def add_sample( cov += (np.outer(delta, y - cov.sample_col_mean) - cov) / (cov.sample_count - 1) -def update_covariance(cov: "SampleCovariance", sample: TypeVar("Unknown")) -> None: - """Update a set of sample covariances given a sample. +def update_covariance(cov, sample): + """update a set of sample covariances given a sample""" - Args: - cov: _description_ - sample: _description_ - """ - _logger.info("updating covariances for %d item(s)", len(sample)) + logger.info("updating covariances for %d item(s)", len(sample)) t = time.monotonic() for (k1, v1), (k2, v2) in combinations_with_replacement(sample.items(), 2): if (k1, k2) not in cov: nrows, ncols = np.size(v1), np.size(v2) - _logger.info( + logger.info( "creating %d x %d covariance matrix for %s, %s", nrows, ncols, @@ -127,10 +92,10 @@ def update_covariance(cov: "SampleCovariance", sample: TypeVar("Unknown")) -> No k2, ) cov[k1, k2] = SampleCovariance(nrows, ncols) - _logger.info("updating covariance for %s, %s", k1, k2) + logger.info("updating covariance for %s, %s", k1, k2) add_sample(cov[k1, k2], v1, v2) - _logger.info( + logger.info( "updated %d covariance(s) in %s", len(sample) * (len(sample) + 1) // 2, timedelta(seconds=(time.monotonic() - t)), @@ -138,47 +103,33 @@ def update_covariance(cov: "SampleCovariance", sample: TypeVar("Unknown")) -> No def jackknife_regions_kmeans( - fpmap: TypeVar("Unknown"), - n: int, + fpmap, + n, *, - maxrepeat: int = 5, - maxiter: int = 1_000, - tol: float = 1e-5, - return_centers: bool = False, -) -> TypeVar("Unknown"): - """Partition a footprint map into n regions using k-means. - - Args: - fpmap: _description_ - n: _description_ - maxrepeat: _description_ - maxiter: _description_ - tol: _description_ - return_centers: _description_ - - Raises: - RuntimeError: _description_ - - Returns: - _description_ - """ + maxrepeat=5, + maxiter=1000, + tol=1e-5, + return_centers=False, +): + """partition a footprint map into n regions using k-means""" + nside = hp.get_nside(fpmap) npix = hp.nside2npix(nside) - _logger.info("partitioning map with NSIDE=%s into %s regions", nside, n) + logger.info("partitioning map with NSIDE=%s into %s regions", nside, n) t = time.monotonic() - _logger.info("finding all nonzero pixels in map") + logger.info("finding all nonzero pixels in map") ipix = np.nonzero(fpmap)[0] - _logger.info("found %d nonzero pixels in map", len(ipix)) - _logger.info("getting angles of all nonzero pixels in map") + logger.info("found %d nonzero pixels in map", len(ipix)) + logger.info("getting angles of all nonzero pixels in map") radec = np.transpose(hp.pix2ang(nside, ipix, lonlat=True)) for r in range(maxrepeat + 1): - _logger.info( + logger.info( "constructing %s regions using k-means%s", n, "" if r == 0 else f" (repeat {r})", @@ -187,9 +138,9 @@ def jackknife_regions_kmeans( km = kmeans_sample(radec, n, verbose=0) if km.converged: - _logger.info("k-means converged") + logger.info("k-means converged") break - _logger.info("k-means not converged; repeat") + logger.info("k-means not converged; repeat") else: msg = ( f"k-means failed to partition map into {n} regions after repeat {maxrepeat}" @@ -201,15 +152,15 @@ def jackknife_regions_kmeans( areas = 60**4 // 100 / np.pi / npix * np.bincount(km.labels) area_mean, area_std, area_unit = np.mean(areas), np.std(areas), "deg2" if area_mean < 1.0: - area_mean, area_std, area_unit = area_mean / 3_600, area_std / 3_600, "arcmin2" + area_mean, area_std, area_unit = area_mean / 3600, area_std / 3600, "arcmin2" if area_mean < 1.0: - area_mean, area_std, area_unit = area_mean / 3_600, area_std / 3_600, "arcsec2" - _logger.info("region area is %.3f ± %.3f %s", area_mean, area_std, area_unit) + area_mean, area_std, area_unit = area_mean / 3600, area_std / 3600, "arcsec2" + logger.info("region area is %.3f ± %.3f %s", area_mean, area_std, area_unit) jkmap = np.zeros(npix, dtype=int) jkmap[ipix] = km.labels + 1 - _logger.info("partitioned map in %s", timedelta(seconds=(time.monotonic() - t))) + logger.info("partitioned map in %s", timedelta(seconds=(time.monotonic() - t))) if return_centers: result = jkmap, km.centers diff --git a/heracles/io.py b/heracles/io.py index 3ceed05..2f88a40 100644 --- a/heracles/io.py +++ b/heracles/io.py @@ -16,20 +16,18 @@ # # You should have received a copy of the GNU Lesser General Public # License along with Heracles. If not, see . -"""Module for file reading and writing.""" +"""module for file reading and writing""" import logging import os -from typing import Optional, TypeVar import fitsio import healpy as hp import numpy as np -import numpy.typing as npt from .core import TocDict, toc_match -_logger = logging.getLogger(__name__) +logger = logging.getLogger(__name__) _METADATA_COMMENTS = { @@ -46,27 +44,15 @@ } -def _write_metadata(hdu: TypeVar("Unknown"), metadata: TypeVar("Unknown")) -> None: - """Write array metadata to FITS HDU. - - Args: - hdu: _description_ - metadata: _description_ - """ +def _write_metadata(hdu, metadata): + """write array metadata to FITS HDU""" md = metadata or {} for key, value in md.items(): hdu.write_key("META " + key.upper(), value, _METADATA_COMMENTS.get(key)) -def _read_metadata(hdu: TypeVar("Unknown")) -> dict: - """Read array metadata from FITS HDU. - - Args: - hdu: _description_ - - Returns: - _description_ - """ +def _read_metadata(hdu): + """read array metadata from FITS HDU""" h = hdu.read_header() md = {} for key in h: @@ -75,23 +61,8 @@ def _read_metadata(hdu: TypeVar("Unknown")) -> dict: return md -def read_mask( - mask_name: TypeVar("Unknown"), - nside: Optional[TypeVar("Unknown")] = None, - field: int = 0, - extra_mask_name: Optional[TypeVar("Unknown")] = None, -) -> npt.NDArray: - """Read visibility map from a HEALPix map file. - - Args: - mask_name: _description_ - nside: _description_ - field: _description_ - extra_mask_name: _description_ - - Returns: - _description_ - """ +def read_mask(mask_name, nside=None, field=0, extra_mask_name=None): + """read visibility map from a HEALPix map file""" mask = hp.read_map(mask_name, field=field) # set unseen pixels to zero @@ -120,28 +91,22 @@ def read_mask( def write_maps( - filename: str, - maps: TocDict, + filename, + maps, *, - clobber: bool = False, - workdir: str = ".", - include: Optional[TypeVar("Unknown")] = None, - exclude: Optional[TypeVar("Unknown")] = None, -) -> None: - """Write a set of maps to FITS file. + clobber=False, + workdir=".", + include=None, + exclude=None, +): + """write a set of maps to FITS file If the output file exists, the new estimates will be appended, unless the ``clobber`` parameter is set to ``True``. - Args: - filename: _description_ - maps: _description_ - clobber: _description_ - workdir: _description_ - include: _description_ - exclude: _description_ """ - _logger.info("writing %d maps to %s", len(maps), filename) + + logger.info("writing %d maps to %s", len(maps), filename) # full path to FITS file path = os.path.join(workdir, filename) @@ -175,7 +140,7 @@ def write_maps( if not toc_match((n, i), include=include, exclude=exclude): continue - _logger.info("writing %s map for bin %s", n, i) + logger.info("writing %s map for bin %s", n, i) # the cl extension name ext = f"MAP{mapn}" @@ -221,28 +186,13 @@ def write_maps( tocentry[0] = (ext, n, i) fits["MAPTOC"].append(tocentry) - _logger.info("done with %d maps", len(maps)) + logger.info("done with %d maps", len(maps)) -def read_maps( - filename: str, - workdir: str = ".", - *, - include: Optional[TypeVar("Unknown")] = None, - exclude: Optional[TypeVar("Unknown")] = None, -) -> TocDict: - """Read a set of maps from a FITS file. - - Args: - filename: _description_ - workdir: _description_ - include: _description_ - exclude: _description_ - - Returns: - _description_ - """ - _logger.info("reading maps from %s", filename) +def read_maps(filename, workdir=".", *, include=None, exclude=None): + """read a set of maps from a FITS file""" + + logger.info("reading maps from %s", filename) # full path to FITS file path = os.path.join(workdir, filename) @@ -263,7 +213,7 @@ def read_maps( if not toc_match((n, i), include=include, exclude=exclude): continue - _logger.info("reading %s map for bin %s", n, i) + logger.info("reading %s map for bin %s", n, i) # read the map from the extension m = fits[ext].read() @@ -279,35 +229,29 @@ def read_maps( # store in set of maps maps[n, i] = m - _logger.info("done with %d maps", len(maps)) + logger.info("done with %d maps", len(maps)) # return the dictionary of maps return maps def write_alms( - filename: str, - alms: TocDict, + filename, + alms, *, - clobber: bool = False, - workdir: str = ".", - include: Optional[TypeVar("Unknown")] = None, - exclude: Optional[TypeVar("Unknown")] = None, -) -> None: - """Write a set of alms to FITS file. + clobber=False, + workdir=".", + include=None, + exclude=None, +): + """write a set of alms to FITS file If the output file exists, the new estimates will be appended, unless the ``clobber`` parameter is set to ``True``. - Args: - filename: _description_ - alms: _description_ - clobber: _description_ - workdir: _description_ - include: _description_ - exclude: _description_ """ - _logger.info("writing %d alms to %s", len(alms), filename) + + logger.info("writing %d alms to %s", len(alms), filename) # full path to FITS file path = os.path.join(workdir, filename) @@ -341,7 +285,7 @@ def write_alms( if not toc_match((n, i), include=include, exclude=exclude): continue - _logger.info("writing %s alm for bin %s", n, i) + logger.info("writing %s alm for bin %s", n, i) # the cl extension name ext = f"ALM{almn}" @@ -357,28 +301,13 @@ def write_alms( tocentry[0] = (ext, n, i) fits["ALMTOC"].append(tocentry) - _logger.info("done with %d alms", len(alms)) + logger.info("done with %d alms", len(alms)) -def read_alms( - filename: str, - workdir: str = ".", - *, - include: Optional[TypeVar("Unknown")] = None, - exclude: Optional[TypeVar("Unknown")] = None, -) -> TocDict: - """Read a set of alms from a FITS file. - - Args: - filename: _description_ - workdir: _description_ - include: _description_ - exclude: _description_ - - Returns: - _description_ - """ - _logger.info("reading alms from %s", filename) +def read_alms(filename, workdir=".", *, include=None, exclude=None): + """read a set of alms from a FITS file""" + + logger.info("reading alms from %s", filename) # full path to FITS file path = os.path.join(workdir, filename) @@ -399,7 +328,7 @@ def read_alms( if not toc_match((n, i), include=include, exclude=exclude): continue - _logger.info("reading %s alm for bin %s", n, i) + logger.info("reading %s alm for bin %s", n, i) # read the alm from the extension raw = fits[ext].read() @@ -414,34 +343,21 @@ def read_alms( # store in set of alms alms[n, i] = alm - _logger.info("done with %d alms", len(alms)) + logger.info("done with %d alms", len(alms)) # return the dictionary of alms return alms -def write_cls( - filename: str, - cls, - *, - clobber: bool = False, - workdir: str = ".", - include: Optional[TypeVar("Unknown")] = None, - exclude: Optional[TypeVar("Unknown")] = None, -) -> None: - """Write a set of cls to FITS file. +def write_cls(filename, cls, *, clobber=False, workdir=".", include=None, exclude=None): + """write a set of cls to FITS file If the output file exists, the new estimates will be appended, unless the ``clobber`` parameter is set to ``True``. - Args: - filename: _description_ - clobber: _description_ - workdir: _description_ - include: _description_ - exclude: _description_ """ - _logger.info("writing %d cls to %s", len(cls), filename) + + logger.info("writing %d cls to %s", len(cls), filename) # full path to FITS file path = os.path.join(workdir, filename) @@ -475,7 +391,7 @@ def write_cls( if not toc_match((k1, k2, i1, i2), include=include, exclude=exclude): continue - _logger.info("writing %s x %s cl for bins %s, %s", k1, k2, i1, i2) + logger.info("writing %s x %s cl for bins %s, %s", k1, k2, i1, i2) # the cl extension name ext = f"CL{cln}" @@ -511,28 +427,13 @@ def write_cls( tocentry[0] = (ext, k1, k2, i1, i2) fits["CLTOC"].append(tocentry) - _logger.info("done with %d cls", len(cls)) + logger.info("done with %d cls", len(cls)) -def read_cls( - filename: str, - workdir: str = ".", - *, - include: Optional[TypeVar("Unknown")] = None, - exclude: Optional[TypeVar("Unknown")] = None, -) -> TocDict: - """Read a set of cls from a FITS file. - - Args: - filename: _description_ - workdir: _description_ - include: _description_ - exclude: _description_ - - Returns: - _description_ - """ - _logger.info("reading cls from %s", filename) +def read_cls(filename, workdir=".", *, include=None, exclude=None): + """read a set of cls from a FITS file""" + + logger.info("reading cls from %s", filename) # full path to FITS file path = os.path.join(workdir, filename) @@ -553,7 +454,7 @@ def read_cls( if not toc_match((k1, k2, i1, i2), include=include, exclude=exclude): continue - _logger.info("reading %s x %s cl for bins %s, %s", k1, k2, i1, i2) + logger.info("reading %s x %s cl for bins %s, %s", k1, k2, i1, i2) # read the cl from the extension cl = fits[ext].read() @@ -564,35 +465,21 @@ def read_cls( # store in set of cls cls[k1, k2, i1, i2] = cl - _logger.info("done with %d cls", len(cls)) + logger.info("done with %d cls", len(cls)) # return the dictionary of cls return cls -def write_mms( - filename: str, - mms: TypeVar("Unknown"), - *, - clobber: bool = False, - workdir: str = ".", - include: Optional[TypeVar("Unknown")] = None, - exclude: Optional[TypeVar("Unknown")] = None, -) -> None: - """Write a set of mixing matrices to FITS file. +def write_mms(filename, mms, *, clobber=False, workdir=".", include=None, exclude=None): + """write a set of mixing matrices to FITS file If the output file exists, the new mixing matrices will be appended, unless the ``clobber`` parameter is set to ``True``. - Args: - filename: _description_ - mms: _description_ - clobber: _description_ - workdir: _description_ - include: _description_ - exclude: _description_ """ - _logger.info("writing %d mm(s) to %s", len(mms), filename) + + logger.info("writing %d mm(s) to %s", len(mms), filename) # full path to FITS file path = os.path.join(workdir, filename) @@ -626,7 +513,7 @@ def write_mms( if not toc_match((n, i1, i2), include=include, exclude=exclude): continue - _logger.info("writing mixing matrix %s for bins %s, %s", n, i1, i2) + logger.info("writing mixing matrix %s for bins %s, %s", n, i1, i2) # the mm extension name ext = f"MM{mmn}" @@ -651,28 +538,13 @@ def write_mms( tocentry[0] = (ext, n, i1, i2) fits["MMTOC"].append(tocentry) - _logger.info("done with %d mm(s)", len(mms)) + logger.info("done with %d mm(s)", len(mms)) -def read_mms( - filename: str, - workdir: str = ".", - *, - include: Optional[TypeVar("Unknown")] = None, - exclude: Optional[TypeVar("Unknown")] = None, -) -> TypeVar("Unknown"): - """Read a set of mixing matrices from a FITS file. - - Args: - filename: _description_ - workdir: _description_ - include: _description_ - exclude: _description_ - - Returns: - _description_ - """ - _logger.info("reading mixing matrices from %s", filename) +def read_mms(filename, workdir=".", *, include=None, exclude=None): + """read a set of mixing matrices from a FITS file""" + + logger.info("reading mixing matrices from %s", filename) # full path to FITS file path = os.path.join(workdir, filename) @@ -693,7 +565,7 @@ def read_mms( if not toc_match((n, i1, i2), include=include, exclude=exclude): continue - _logger.info("reading mixing matrix %s for bins %s, %s", n, i1, i2) + logger.info("reading mixing matrix %s for bins %s, %s", n, i1, i2) # read the mixing matrix from the extension mm = fits[ext].read() @@ -704,34 +576,21 @@ def read_mms( # store in set of mms mms[n, i1, i2] = mm - _logger.info("done with %d mm(s)", len(mms)) + logger.info("done with %d mm(s)", len(mms)) # return the dictionary of mms return mms -def write_cov( - filename: str, - cov: TypeVar("Unknown"), - clobber: bool = False, - workdir: str = ".", - include: Optional[TypeVar("Unknown")] = None, - exclude: Optional[TypeVar("Unknown")] = None, -) -> None: - """Write a set of covariance matrices to FITS file. +def write_cov(filename, cov, clobber=False, workdir=".", include=None, exclude=None): + """write a set of covariance matrices to FITS file If the output file exists, the new estimates will be appended, unless the ``clobber`` parameter is set to ``True``. - Args: - filename: _description_ - cov: _description_ - clobber: _description_ - workdir: _description_ - include: _description_ - exclude: _description_ """ - _logger.info("writing %d covariances to %s", len(cov), filename) + + logger.info("writing %d covariances to %s", len(cov), filename) # full path to FITS file path = os.path.join(workdir, filename) @@ -779,7 +638,7 @@ def write_cov( ext = f"COV{extn}" extn += 1 - _logger.info("writing %s x %s covariance matrix", k1, k2) + logger.info("writing %s x %s covariance matrix", k1, k2) # write the covariance matrix as an image fits.write_image(mat, extname=ext) @@ -800,28 +659,13 @@ def write_cov( tocentry[0] = (ext, *k1, *k2) fits["COVTOC"].append(tocentry) - _logger.info("done with %d covariance(s)", len(cov)) + logger.info("done with %d covariance(s)", len(cov)) -def read_cov( - filename: str, - workdir: str = ".", - *, - include: Optional[TypeVar("Unknown")] = None, - exclude: Optional[TypeVar("Unknown")] = None, -) -> TocDict: - """Read a set of covariances matrices from a FITS file. - - Args: - filename: _description_ - workdir: _description_ - include: _description_ - exclude: _description_ - - Returns: - _description_ - """ - _logger.info("reading covariance matrices from %s", filename) +def read_cov(filename, workdir=".", *, include=None, exclude=None): + """read a set of covariances matrices from a FITS file""" + + logger.info("reading covariance matrices from %s", filename) # full path to FITS file path = os.path.join(workdir, filename) @@ -844,7 +688,7 @@ def read_cov( if not toc_match((k1, k2), include=include, exclude=exclude): continue - _logger.info("reading %s x %s covariance matrix", k1, k2) + logger.info("reading %s x %s covariance matrix", k1, k2) # read the covariance matrix from the extension mat = fits[ext].read() @@ -855,7 +699,7 @@ def read_cov( # store in set cov[k1, k2] = mat - _logger.info("done with %d covariance(s)", len(cov)) + logger.info("done with %d covariance(s)", len(cov)) # return the toc dict of covariances return cov diff --git a/heracles/maps.py b/heracles/maps.py index 5383162..f7e6732 100644 --- a/heracles/maps.py +++ b/heracles/maps.py @@ -16,7 +16,7 @@ # # You should have received a copy of the GNU Lesser General Public # License along with Heracles. If not, see . -"""Module for map-making.""" +"""module for map-making""" import logging import typing as t @@ -34,18 +34,11 @@ if t.TYPE_CHECKING: from .catalog import Catalog, CatalogPage -_logger = logging.getLogger(__name__) +logger = logging.getLogger(__name__) -def _nativebyteorder(fn: t.Callable) -> t.Any: - """Utility decorator to convert inputs to native byteorder. - - Args: - fn: _description_ - - Returns: - _description_ - """ +def _nativebyteorder(fn): + """utility decorator to convert inputs to native byteorder""" @wraps(fn) def wrapper(*inputs): @@ -61,35 +54,14 @@ def wrapper(*inputs): @_nativebyteorder @njit(nogil=True, fastmath=True) -def _map_pos(pos: t.TypeVar("Unknown"), ipix: t.TypeVar("Unknown")) -> None: - """_summary_. - - Args: - pos: _description_ - ipix: _description_ - """ +def _map_pos(pos, ipix): for i in ipix: pos[i] += 1 @_nativebyteorder @njit(nogil=True, fastmath=True) -def _map_real( - wht: t.TypeVar("Unknown"), - val: t.TypeVar("Unknown"), - ipix: t.TypeVar("Unknown"), - w: t.TypeVar("Unknown"), - v: t.TypeVar("Unknown"), -) -> None: - """_summary_. - - Args: - wht: _description_ - val: _description_ - ipix: _description_ - w: _description_ - v: _description_ - """ +def _map_real(wht, val, ipix, w, v): for i, w_i, v_i in zip(ipix, w, v): wht[i] += w_i val[i] += w_i / wht[i] * (v_i - val[i]) @@ -97,24 +69,7 @@ def _map_real( @_nativebyteorder @njit(nogil=True, fastmath=True) -def _map_complex( - wht: t.TypeVar("Unknown"), - val: t.TypeVar("Unknown"), - ipix: t.TypeVar("Unknown"), - w: t.TypeVar("Unknown"), - re: t.TypeVar("Unknown"), - im: t.TypeVar("Unknown"), -) -> None: - """_summary_. - - Args: - wht: _description_ - val: _description_ - ipix: _description_ - w: _description_ - re: _description_ - im: _description_ - """ +def _map_complex(wht, val, ipix, w, re, im): for i, w_i, re_i, im_i in zip(ipix, w, re, im): wht[i] += w_i val[0, i] += w_i / wht[i] * (re_i - val[0, i]) @@ -123,34 +78,13 @@ def _map_complex( @_nativebyteorder @njit(nogil=True, fastmath=True) -def _map_weight( - wht: t.TypeVar("Unknown"), - ipix: t.TypeVar("Unknown"), - w: t.TypeVar("Unknown"), -) -> None: - """_summary_. - - Args: - wht: _description_ - ipix: _description_ - w: _description_ - """ +def _map_weight(wht, ipix, w): for i, w_i in zip(ipix, w): wht[i] += w_i -def update_metadata( - array: t.TypeVar("Unknown"), - **metadata: t.TypeVar("Unknown"), -) -> None: - """Update metadata of an array dtype. - - Args: - array: _description_ - - Raises: - ValueError: _description_ - """ +def update_metadata(array, **metadata): + """update metadata of an array dtype""" md = {} if array.dtype.metadata is not None: md.update(array.dtype.metadata) @@ -171,28 +105,25 @@ def update_metadata( # type alias for map data -_MapData = np.ndarray +MapData = np.ndarray # type hint for functions returned by map generators -_MapFunction = t.Callable[["CatalogPage"], None] +MapFunction = t.Callable[["CatalogPage"], None] # type hint for map generators -_MapGenerator = t.Generator[_MapFunction, None, _MapData] +MapGenerator = t.Generator[MapFunction, None, MapData] -class _Map(metaclass=ABCMeta): +class Map(metaclass=ABCMeta): """Abstract base class for map making from catalogues. Concrete classes must implement the `__call__()` method which takes a catalogue instance and returns a generator for mapping. + """ def __init__(self, columns: tuple[t.Optional[str]]) -> None: - """Initialise the map. - - Args: - columns: _description_ - """ + """Initialise the map.""" self._columns = columns super().__init__() @@ -202,31 +133,21 @@ def columns(self) -> tuple[t.Optional[str]]: return self._columns @abstractmethod - def __call__(self, catalog: "Catalog") -> t.Union[_MapData, _MapGenerator]: - """Implementation for mapping a catalogue. - - Args: - catalog: _description_ - - Returns: - _description_ - """ + def __call__(self, catalog: "Catalog") -> t.Union[MapData, MapGenerator]: + """Implementation for mapping a catalogue.""" ... -class HealpixMap(_Map): +class HealpixMap(Map): """Abstract base class for HEALPix map making. HEALPix maps have a resolution parameter, available as the ``nside`` property. - """ - def __init__(self, nside: int, **kwargs: t.Any) -> None: - """Initialize map with the given nside parameter. + """ - Args: - nside: _description_ - """ + def __init__(self, nside: int, **kwargs) -> None: + """Initialize map with the given nside parameter.""" self._nside: int = nside super().__init__(**kwargs) @@ -241,25 +162,21 @@ def nside(self, nside: int) -> None: self._nside = nside -class RandomizableMap(_Map): +class RandomizableMap(Map): """Abstract base class for randomisable maps. Randomisable maps have a ``randomize`` property that determines whether or not the maps are randomised. - """ - def __init__(self, randomize: bool, **kwargs: t.Any) -> None: - """Initialise map with the given randomize property. + """ - Args: - randomize: _description_ - """ + def __init__(self, randomize: bool, **kwargs) -> None: + """Initialise map with the given randomize property.""" self._randomize = randomize super().__init__(**kwargs) @property def randomize(self) -> bool: - """_summary_.""" return self._randomize @randomize.setter @@ -268,27 +185,23 @@ def randomize(self, randomize: bool) -> None: self._randomize = randomize -class NormalizableMap(_Map): +class NormalizableMap(Map): """Abstract base class for normalisable maps. A normalised map is a map that is divided by its mean weight. Normalisable maps have a ``normalize`` property that determines whether or not the maps are normalised. - """ - def __init__(self, normalize: bool, **kwargs: t.Any) -> None: - """Initialise map with the given normalize property. + """ - Args: - normalize: _description_ - """ + def __init__(self, normalize: bool, **kwargs) -> None: + """Initialise map with the given normalize property.""" self._normalize = normalize super().__init__(**kwargs) @property def normalize(self) -> bool: - """_summary_.""" return self._normalize @normalize.setter @@ -302,6 +215,7 @@ class PositionMap(HealpixMap, RandomizableMap): Can produce both overdensity maps and number count maps, depending on the ``overdensity`` property. + """ def __init__( @@ -313,15 +227,7 @@ def __init__( overdensity: bool = True, randomize: bool = False, ) -> None: - """Create a position map with the given properties. - - Args: - nside: _description_ - lon: _description_ - lat: _description_ - overdensity: _description_ - randomize: _description_ - """ + """Create a position map with the given properties.""" super().__init__(columns=(lon, lat), nside=nside, randomize=randomize) self._overdensity: bool = overdensity @@ -332,21 +238,11 @@ def overdensity(self) -> bool: @overdensity.setter def overdensity(self, overdensity: bool) -> None: - """_summary_.""" self._overdensity = overdensity - def __call__(self, catalog: "Catalog") -> _MapGenerator: - """Map the given catalogue. - - Args: - catalog: _description_ + def __call__(self, catalog: "Catalog") -> MapGenerator: + """Map the given catalogue.""" - Returns: - _description_ - - Yields: - _description_ - """ # get catalogue column definition col = self.columns @@ -359,12 +255,8 @@ def __call__(self, catalog: "Catalog") -> _MapGenerator: # keep track of the total number of galaxies ngal = 0 + # function to map catalogue data def mapper(page: "CatalogPage") -> None: - """Function to map catalogue data. - - Args: - page: _description_ - """ nonlocal ngal if not self._randomize: @@ -443,34 +335,17 @@ def __init__( *, normalize: bool = True, ) -> None: - """Create a new real map. - - Args: - nside: _description_ - lon: _description_ - lat: _description_ - value: _description_ - weight: _description_ - normalize: _description_ - """ + """Create a new real map.""" + super().__init__( columns=(lon, lat, value, weight), nside=nside, normalize=normalize, ) - def __call__(self, catalog: "Catalog") -> _MapGenerator: - """Map real values from catalogue to HEALPix map. + def __call__(self, catalog: "Catalog") -> MapGenerator: + """Map real values from catalogue to HEALPix map.""" - Args: - catalog: _description_ - - Returns: - _description_ - - Yields: - _description_ - """ # get the column definition of the catalogue *col, wcol = self.columns @@ -486,12 +361,8 @@ def __call__(self, catalog: "Catalog") -> _MapGenerator: ngal = 0 wmean, var = 0.0, 0.0 + # go through pages in catalogue and map values def mapper(page: "CatalogPage") -> None: - """Go through pages in catalogue and map values. - - Args: - page: _description_ - """ nonlocal ngal, wmean, var if wcol is not None: @@ -561,6 +432,7 @@ class ComplexMap(HealpixMap, NormalizableMap, RandomizableMap): Can optionally flip the sign of the second shear component, depending on the ``conjugate`` property. + """ def __init__( @@ -577,20 +449,8 @@ def __init__( normalize: bool = True, randomize: bool = False, ) -> None: - """Create a new shear map. - - Args: - nside: _description_ - lon: _description_ - lat: _description_ - real: _description_ - imag: _description_ - weight: _description_ - spin: _description_ - conjugate: _description_ - normalize: _description_ - randomize: _description_ - """ + """Create a new shear map.""" + self._spin: int = spin self._conjugate: bool = conjugate super().__init__( @@ -620,18 +480,9 @@ def conjugate(self, conjugate: bool) -> None: """Set the conjugate flag.""" self._conjugate = conjugate - def __call__(self, catalog: "Catalog") -> _MapGenerator: - """Map shears from catalogue to HEALPix map. - - Args: - catalog: _description_ - - Returns: - _description_ + def __call__(self, catalog: "Catalog") -> MapGenerator: + """Map shears from catalogue to HEALPix map.""" - Yields: - _description_ - """ # get the column definition of the catalogue *col, wcol = self.columns @@ -651,14 +502,9 @@ def __call__(self, catalog: "Catalog") -> _MapGenerator: ngal = 0 wmean, var = 0.0, 0.0 + # go through pages in catalogue and get the shear values, + # randomise if asked to, and do the mapping def mapper(page: "CatalogPage") -> None: - """Go through pages in catalogue. - - Get the shear values, randomise if asked to, and do the mapping. - - Args: - page: _description_ - """ nonlocal ngal, wmean, var if wcol is not None: @@ -733,25 +579,12 @@ class VisibilityMap(HealpixMap): """Copy visibility map from catalogue at given resolution.""" def __init__(self, nside: int) -> None: - """Create visibility map at given NSIDE parameter. - - Args: - nside: _description_ - """ + """Create visibility map at given NSIDE parameter.""" super().__init__(columns=(), nside=nside) - def __call__(self, catalog: "Catalog") -> _MapData: - """Create a visibility map from the given catalogue. + def __call__(self, catalog: "Catalog") -> MapData: + """Create a visibility map from the given catalogue.""" - Args: - catalog: _description_ - - Raises: - ValueError: _description_ - - Returns: - _description_ - """ # make sure that catalogue has a visibility map vmap = catalog.visibility if vmap is None: @@ -785,31 +618,14 @@ def __init__( lat: str, weight: str, *, - normalize: bool = True, + normalize=True, ) -> None: - """Create a new weight map. - - Args: - nside: _description_ - lon: _description_ - lat: _description_ - weight: _description_ - normalize: _description_ - """ + """Create a new weight map.""" super().__init__(columns=(lon, lat, weight), nside=nside, normalize=normalize) - def __call__(self, catalog: "Catalog") -> _MapGenerator: - """Map catalogue weights. + def __call__(self, catalog: "Catalog") -> MapGenerator: + """Map catalogue weights.""" - Args: - catalog: _description_ - - Returns: - _description_ - - Yields: - _description_ - """ # get the columns for this map *col, wcol = self.columns @@ -820,12 +636,8 @@ def __call__(self, catalog: "Catalog") -> _MapGenerator: # create the weight map wht = np.zeros(npix) + # map catalogue def mapper(page: "CatalogPage") -> None: - """Map catalogue. - - Args: - page: _description_ - """ lon, lat = page.get(*col) if wcol is None: @@ -861,23 +673,12 @@ def mapper(page: "CatalogPage") -> None: return wht -_Spin2Map = partial(ComplexMap, spin=2) -ShearMap = _Spin2Map -EllipticityMap = _Spin2Map - +Spin2Map = partial(ComplexMap, spin=2) +ShearMap = Spin2Map +EllipticityMap = Spin2Map -def _close_and_return(generator: t.Generator) -> t.Any: - """_summary_. - Args: - generator: _description_ - - Raises: - RuntimeError: _description_ - - Returns: - _description_ - """ +def _close_and_return(generator): try: next(generator) except StopIteration as end: @@ -888,7 +689,7 @@ def _close_and_return(generator: t.Generator) -> t.Any: def map_catalogs( - maps: t.Mapping[t.Any, _Map], + maps: t.Mapping[t.Any, Map], catalogs: t.Mapping[t.Any, "Catalog"], *, parallel: bool = False, @@ -896,24 +697,9 @@ def map_catalogs( include: t.Optional[t.Sequence[tuple[t.Any, t.Any]]] = None, exclude: t.Optional[t.Sequence[tuple[t.Any, t.Any]]] = None, progress: bool = False, -) -> dict[tuple[t.Any, t.Any], _MapData]: - """Make maps for a set of catalogues. - - Args: - maps: _description_ - catalogs: _description_ - parallel: _description_ - out: _description_ - include: _description_ - exclude: _description_ - progress: _description_ - - Raises: - RuntimeError: _description_ - - Returns: - _description_ - """ +) -> dict[tuple[t.Any, t.Any], MapData]: + """Make maps for a set of catalogues.""" + # the toc dict of maps if out is None: out = TocDict() @@ -1017,25 +803,14 @@ def map_catalogs( def transform_maps( - maps: t.Mapping[tuple[t.Any, t.Any], _MapData], + maps: t.Mapping[tuple[t.Any, t.Any], MapData], *, - out: t.Optional[t.MutableMapping[t.Any, t.Any]] = None, + out: t.MutableMapping[t.Any, t.Any] = None, progress: bool = False, - **kwargs: t.Any, + **kwargs, ) -> dict[tuple[t.Any, t.Any], np.ndarray]: - """Transform a set of maps to alms. + """transform a set of maps to alms""" - Args: - maps: _description_ - out: _description_ - progress: _description_ - - Raises: - NotImplementedError: _description_ - - Returns: - _description_ - """ # the output toc dict if out is None: out = TocDict() @@ -1054,7 +829,7 @@ def transform_maps( md = m.dtype.metadata or {} spin = md.get("spin", 0) - _logger.info("transforming %s map (spin %s) for bin %s", k, spin, i) + logger.info("transforming %s map (spin %s) for bin %s", k, spin, i) if spin == 0: pol = False diff --git a/heracles/plot.py b/heracles/plot.py index df9f083..444608b 100644 --- a/heracles/plot.py +++ b/heracles/plot.py @@ -16,30 +16,21 @@ # # You should have received a copy of the GNU Lesser General Public # License along with Heracles. If not, see . -"""Utility functions for plotting.""" +"""utility functions for plotting""" from collections import defaultdict from collections.abc import Mapping from itertools import chain, count, cycle -from typing import Optional, TypeVar -import matplotlib as mpl import matplotlib.pyplot as plt import numpy as np from cycler import cycler -_DEFAULT_CYCLER = cycler(linestyle=["-", "--", ":", "-."]) +DEFAULT_CYCLER = cycler(linestyle=["-", "--", ":", "-."]) -def _dont_draw_zero_tick(tick: mpl.axis.Tick) -> None: - """Custom draw function for ticks that does not draw zero. - - Args: - tick: _description_ - - Returns: - _description_ - """ +def _dont_draw_zero_tick(tick): + """custom draw function for ticks that does not draw zero""" draw = tick.draw def wrap(*args, **kwargs): @@ -51,52 +42,27 @@ def wrap(*args, **kwargs): return wrap -def _pad_ylim(ymin: float, ymax: float) -> tuple[float, float]: - """Pad the y axis range depending on signs. - - Args: - ymin: _description_ - ymax: _description_ - - Returns: - _description_ - """ +def _pad_ylim(ymin, ymax): + """pad the y axis range depending on signs""" return (ymin * 10 ** (-np.sign(ymin) / 2), ymax * 10 ** (np.sign(ymax) / 2)) def postage_stamps( - plot: Optional[TypeVar("Unknown")] = None, - transpose: Optional[TypeVar("Unknown")] = None, + plot=None, + transpose=None, *, - scale: Optional[TypeVar("Unknown")] = None, - trxshift: int = 0, - tryshift: int = 0, - stampsize: float = 1.0, - hatch_empty: bool = False, - linscale: float = 0.01, - cycler: Optional[TypeVar("Unknown")] = None, -) -> mpl.figure.Figure: - """Create a postage stamp plot for cls. - - Args: - plot: _description_ - transpose: _description_ - scale: _description_ - trxshift: _description_ - tryshift: _description_ - stampsize: _description_ - hatch_empty: _description_ - linscale: _description_ - cycler: _description_ - - Raises: - ValueError: _description_ - - Returns: - _description_ - """ + scale=None, + trxshift=0, + tryshift=0, + stampsize=1.0, + hatch_empty=False, + linscale=0.01, + cycler=None, +): + """create a postage stamp plot for cls""" + if cycler is None: - cycler = _DEFAULT_CYCLER + cycler = DEFAULT_CYCLER if plot is None and transpose is None: msg = "missing plot data" diff --git a/heracles/twopoint.py b/heracles/twopoint.py index 0602fa2..3235079 100644 --- a/heracles/twopoint.py +++ b/heracles/twopoint.py @@ -16,17 +16,15 @@ # # You should have received a copy of the GNU Lesser General Public # License along with Heracles. If not, see . -"""Module for angular power spectrum estimation.""" +"""module for angular power spectrum estimation""" import logging import time from datetime import timedelta from itertools import combinations_with_replacement, product -from typing import Any, Optional, TypeVar, Union import healpy as hp import numpy as np -import numpy.typing as npt from convolvecl import mixmat, mixmat_eb from .core import TocDict, toc_match @@ -40,37 +38,20 @@ update_metadata, ) -_logger = logging.getLogger(__name__) +logger = logging.getLogger(__name__) -def angular_power_spectra( - alms: TocDict, - alms2: Optional[TocDict] = None, - *, - lmax: Optional[int] = None, - include: Optional[TypeVar("Unknown")] = None, - exclude: Optional[TypeVar("Unknown")] = None, -) -> TocDict: - """Compute angular power spectra from a set of alms. - - Args: - alms: _description_ - alms2: _description_ - lmax: _description_ - include: _description_ - exclude: _description_ - - Returns: - _description_ - """ - _logger.info( +def angular_power_spectra(alms, alms2=None, *, lmax=None, include=None, exclude=None): + """compute angular power spectra from a set of alms""" + + logger.info( "computing cls for %d%s alm(s)", len(alms), f"x{len(alms2)}" if alms2 is not None else "", ) t = time.monotonic() - _logger.info("using LMAX = %s for cls", lmax) + logger.info("using LMAX = %s for cls", lmax) # collect all alm combinations for computing cls if alms2 is None: @@ -98,7 +79,7 @@ def angular_power_spectra( if not toc_match((k1, k2, i1, i2), include, exclude): continue - _logger.info("computing %s x %s cl for bins %s, %s", k1, k2, i1, i2) + logger.info("computing %s x %s cl for bins %s, %s", k1, k2, i1, i2) # compute the raw cl from the alms cl = hp.alm2cl(alm1, alm2, lmax_out=lmax) @@ -125,7 +106,7 @@ def angular_power_spectra( # keep track of names twopoint_names.add((k1, k2)) - _logger.info( + logger.info( "computed %d cl(s) in %s", len(cls), timedelta(seconds=(time.monotonic() - t)), @@ -135,22 +116,10 @@ def angular_power_spectra( return cls -def debias_cls( - cls, - bias: Optional[TypeVar("Unknown")] = None, - *, - inplace: bool = False, -) -> Union[TypeVar("Unknown"), TocDict]: - """Remove bias from cls. - - Args: - bias: _description_ - inplace: _description_ +def debias_cls(cls, bias=None, *, inplace=False): + """remove bias from cls""" - Returns: - _description_ - """ - _logger.info("debiasing %d cl(s)%s", len(cls), " in place" if inplace else "") + logger.info("debiasing %d cl(s)%s", len(cls), " in place" if inplace else "") t = time.monotonic() # the output toc dict @@ -158,7 +127,7 @@ def debias_cls( # subtract bias of each cl in turn for key in cls: - _logger.info("debiasing %s x %s cl for bins %s, %s", *key) + logger.info("debiasing %s x %s cl for bins %s, %s", *key) cl = cls[key] md = cl.dtype.metadata or {} @@ -188,7 +157,7 @@ def debias_cls( # store debiased cl in output set out[key] = cl - _logger.info( + logger.info( "debiased %d cl(s) in %s", len(out), timedelta(seconds=(time.monotonic() - t)), @@ -198,19 +167,10 @@ def debias_cls( return out -def depixelate_cls(cls, *, inplace: bool = False) -> Union[TypeVar("Unknown"), TocDict]: - """Remove discretisation kernel from cls. +def depixelate_cls(cls, *, inplace=False): + """remove discretisation kernel from cls""" - Args: - inplace: _description_ - - Raises: - ValueError: _description_ - - Returns: - _description_ - """ - _logger.info("depixelate %d cl(s)%s", len(cls), " in place" if inplace else "") + logger.info("depixelate %d cl(s)%s", len(cls), " in place" if inplace else "") t = time.monotonic() # keep a cache of convolution kernels (i.e. pixel window functions) @@ -223,7 +183,7 @@ def depixelate_cls(cls, *, inplace: bool = False) -> Union[TypeVar("Unknown"), T # remove effect of convolution for each cl in turn for key in cls: - _logger.info("depixelate %s x %s cl for bins %s, %s", *key) + logger.info("depixelate %s x %s cl for bins %s, %s", *key) cl = cls[key] md = cl.dtype.metadata or {} @@ -244,7 +204,7 @@ def depixelate_cls(cls, *, inplace: bool = False) -> Union[TypeVar("Unknown"), T # deconvolve the kernels of the first and second map for i, spin, kernel in zip([1, 2], spins, kernels): - _logger.info("- spin-%s %s kernel", spin, kernel) + logger.info("- spin-%s %s kernel", spin, kernel) if kernel is None: fl = None a = None @@ -256,7 +216,7 @@ def depixelate_cls(cls, *, inplace: bool = False) -> Union[TypeVar("Unknown"), T fls[kernel][nside, lmax, 2] = fl2 fl = fls[kernel].get((nside, lmax, spin)) if fl is None: - _logger.warning( + logger.warning( "no HEALPix kernel for NSIDE = %s, LMAX = %s, SPIN = %s", nside, lmax, @@ -284,7 +244,7 @@ def depixelate_cls(cls, *, inplace: bool = False) -> Union[TypeVar("Unknown"), T # store depixelated cl in output set out[key] = cl - _logger.info( + logger.info( "depixelated %d cl(s) in %s", len(out), timedelta(seconds=(time.monotonic() - t)), @@ -294,27 +254,13 @@ def depixelate_cls(cls, *, inplace: bool = False) -> Union[TypeVar("Unknown"), T return out -def mixing_matrices( - cls, - *, - l1max: Optional[TypeVar("Unknown")] = None, - l2max: Optional[TypeVar("Unknown")] = None, - l3max: Optional[TypeVar("Unknown")] = None, -) -> TocDict: - """Compute mixing matrices from a set of cls. - - Args: - l1max: _description_ - l2max: _description_ - l3max: _description_ - - Returns: - _description_ - """ - _logger.info("computing two-point mixing matrices for %d cl(s)", len(cls)) +def mixing_matrices(cls, *, l1max=None, l2max=None, l3max=None): + """compute mixing matrices from a set of cls""" + + logger.info("computing two-point mixing matrices for %d cl(s)", len(cls)) t = time.monotonic() - _logger.info("using L1MAX = %s, L2MAX = %s, L3MAX = %s", l1max, l2max, l3max) + logger.info("using L1MAX = %s, L2MAX = %s, L3MAX = %s", l1max, l2max, l3max) # set of computed mixing matrices mms = TocDict() @@ -325,19 +271,19 @@ def mixing_matrices( if cl.dtype.names is not None: cl = cl["CL"] if k1 == "V" and k2 == "V": - _logger.info("computing 00 mixing matrix for bins %s, %s", i1, i2) + logger.info("computing 00 mixing matrix for bins %s, %s", i1, i2) w00 = mixmat(cl, l1max=l1max, l2max=l2max, l3max=l3max) mms["00", i1, i2] = w00 elif k1 == "V" and k2 == "W": - _logger.info("computing 0+ mixing matrix for bins %s, %s", i1, i2) + logger.info("computing 0+ mixing matrix for bins %s, %s", i1, i2) w0p = mixmat(cl, l1max=l1max, l2max=l2max, l3max=l3max, spin=(0, 2)) mms["0+", i1, i2] = w0p elif k1 == "W" and k2 == "V": - _logger.info("computing 0+ mixing matrix for bins %s, %s", i2, i1) + logger.info("computing 0+ mixing matrix for bins %s, %s", i2, i1) w0p = mixmat(cl, l1max=l1max, l2max=l2max, l3max=l3max, spin=(2, 0)) mms["0+", i2, i1] = w0p elif k1 == "W" and k2 == "W": - _logger.info("computing ++, --, +- mixing matrices for bins %s, %s", i1, i2) + logger.info("computing ++, --, +- mixing matrices for bins %s, %s", i1, i2) wpp, wmm, wpm = mixmat_eb( cl, l1max=l1max, @@ -349,7 +295,7 @@ def mixing_matrices( mms["--", i1, i2] = wmm mms["+-", i1, i2] = wpm else: - _logger.warning( + logger.warning( "computing unknown %s x %s mixing matrix for bins %s, %s", k1, k2, @@ -359,7 +305,7 @@ def mixing_matrices( w = mixmat(cl, l1max=l1max, l2max=l2max, l3max=l3max) mms[f"{k1}{k2}", i1, i2] = w - _logger.info( + logger.info( "computed %d mm(s) in %s", len(mms), timedelta(seconds=(time.monotonic() - t)), @@ -369,24 +315,11 @@ def mixing_matrices( return mms -def pixelate_mms_healpix( - mms: TypeVar("Unknown"), - nside: TypeVar("Unknown"), - *, - inplace: bool = False, -) -> Union[TypeVar("Unknown"), TocDict]: - """Apply HEALPix pixel window function to mms. - - Args: - mms: _description_ - nside: _description_ - inplace: _description_ +def pixelate_mms_healpix(mms, nside, *, inplace=False): + """apply HEALPix pixel window function to mms""" - Returns: - _description_ - """ - _logger.info("pixelate %d mm(s)%s", len(mms), " in place" if inplace else "") - _logger.info("kernel: HEALPix, NSIDE=%d", nside) + logger.info("pixelate %d mm(s)%s", len(mms), " in place" if inplace else "") + logger.info("kernel: HEALPix, NSIDE=%d", nside) t = time.monotonic() # pixel window functions @@ -402,7 +335,7 @@ def pixelate_mms_healpix( # apply discretisation kernel from cl to each mm in turn for key in mms: - _logger.info("pixelate %s mm for bins %s, %s", *key) + logger.info("pixelate %s mm for bins %s, %s", *key) mm = mms[key] if not inplace: @@ -410,7 +343,7 @@ def pixelate_mms_healpix( n = np.shape(mm)[-2] if n >= lmax: - _logger.error( + logger.error( "no HEALPix pixel window function for NSIDE=%d and LMAX=%d", nside, n - 1, @@ -424,13 +357,13 @@ def pixelate_mms_healpix( elif name in ["++", "--", "+-", "-+"]: mm *= fl2[:n] * fl2[:n] else: - _logger.warning("unknown mixing matrix, assuming spin-0") + logger.warning("unknown mixing matrix, assuming spin-0") mm *= fl0[:n] * fl0[:n] # store pixelated mm in output set out[key] = mm - _logger.info( + logger.info( "pixelated %d mm(s) in %s", len(out), timedelta(seconds=(time.monotonic() - t)), @@ -440,35 +373,11 @@ def pixelate_mms_healpix( return out -def bin2pt( - arr: TypeVar("Unknown"), - bins: TypeVar("Unknown"), - name: TypeVar("Unknown"), - *, - weights: Optional[TypeVar("Unknown")] = None, -) -> npt.NDArray: - """Compute binned two-point data. - - Args: - arr: _description_ - bins: _description_ - name: _description_ - weights: _description_ - - Returns: - _description_ - """ +def bin2pt(arr, bins, name, *, weights=None): + """Compute binned two-point data.""" - def norm(a: npt.NDArray, b: npt.NDArray) -> npt.NDArray: - """Divide a by b if a is nonzero. - - Args: - a: _description_ - b: _description_ - - Returns: - _description_ - """ + def norm(a, b): + """divide a by b if a is nonzero""" out = np.zeros(np.broadcast(a, b).shape) return np.divide(a, b, where=(a != 0), out=out) @@ -533,7 +442,8 @@ def norm(a: npt.NDArray, b: npt.NDArray) -> npt.NDArray: def binned_cls(cls, bins, *, weights=None, out=None): - """Compute binned angular power spectra.""" + """compute binned angular power spectra""" + if out is None: out = TocDict() @@ -544,7 +454,8 @@ def binned_cls(cls, bins, *, weights=None, out=None): def binned_mms(mms, bins, *, weights=None, out=None): - """Compute binned mixing matrices.""" + """compute binned mixing matrices""" + if out is None: out = TocDict() @@ -555,36 +466,25 @@ def binned_mms(mms, bins, *, weights=None, out=None): def random_bias( - maps: TypeVar("Unknown"), - catalogs: TypeVar("Unknown"), + maps, + catalogs, *, - repeat: int = 1, - full: bool = False, - parallel: bool = False, - include: Optional[TypeVar("Unknown")] = None, - exclude: Optional[TypeVar("Unknown")] = None, - progress: bool = False, - **kwargs: Any, -) -> TocDict: - """Bias estimate from randomised maps. + repeat=1, + full=False, + parallel=False, + include=None, + exclude=None, + progress=False, + **kwargs, +): + """bias estimate from randomised maps The ``include`` and ``exclude`` selection is applied to the maps. - Args: - maps: _description_ - catalogs: _description_ - repeat: _description_ - full: _description_ - parallel: _description_ - include: _description_ - exclude: _description_ - progress: _description_ - - Returns: - _description_ """ - _logger.info("estimating two-point bias for %d catalog(s)", len(catalogs)) - _logger.info("randomising %s maps", ", ".join(map(str, maps))) + + logger.info("estimating two-point bias for %d catalog(s)", len(catalogs)) + logger.info("randomising %s maps", ", ".join(map(str, maps))) t = time.monotonic() # grab lmax parameter if given @@ -593,7 +493,7 @@ def random_bias( # include will be set below after we have the first set of alms include_cls = None if full: - _logger.info("estimating cross-biases") + logger.info("estimating cross-biases") # set all input maps to randomize # store and later reset their initial state @@ -605,7 +505,7 @@ def random_bias( nbs = TocDict() for n in range(repeat): - _logger.info( + logger.info( "estimating bias from randomised maps%s", "" if n == 0 else f" (repeat {n+1})", ) @@ -634,7 +534,7 @@ def random_bias( for k, m in maps.items(): m.randomize = randomize[k] - _logger.info( + logger.info( "estimated %d two-point biases in %s", len(nbs), timedelta(seconds=(time.monotonic() - t)), diff --git a/heracles/util.py b/heracles/util.py index 3b61784..f485d4f 100644 --- a/heracles/util.py +++ b/heracles/util.py @@ -16,76 +16,54 @@ # # You should have received a copy of the GNU Lesser General Public # License along with Heracles. If not, see . -"""Module for utilities.""" -import io +"""module for utilities""" + import os import sys import time from datetime import timedelta -from typing import Optional, TypeVar class Progress: - """Simple progress bar for operations.""" - - def __init__(self, out: io.IOBase = sys.stdout) -> None: - """Create a new progress bar. - - Args: - out: _description_ - """ - self._out = out - self._time = 0 - self._progress = 0 - self._total = 0 - self._title = None + """simple progress bar for operations""" - def start( - self, - total: TypeVar("Unknown"), - title: Optional[str] = None, - ) -> None: - """Start new progress. + def __init__(self, out=sys.stdout): + """create a new progress bar""" + self.out = out + self.time = 0 + self.progress = 0 + self.total = 0 + self.title = None - Args: - total: _description_ - title: _description_ - """ - self._time = time.monotonic() - self._progress = 0 - self._total = total - self._title = title + def start(self, total, title=None): + """start new progress""" + self.time = time.monotonic() + self.progress = 0 + self.total = total + self.title = title self.update(0) - def update(self, step: int = 1) -> None: - """Update progress. - - Args: - step: _description_ - """ - self._progress = min(self._progress + step, self._total) - m = f"{self._title!s}: " if self._title is not None else "" - p = self._progress / self._total + def update(self, step=1): + """update progress""" + self.progress = min(self.progress + step, self.total) + m = f"{self.title!s}: " if self.title is not None else "" + p = self.progress / self.total b = "#" * int(20 * p) - f = f"{self._progress:_}/{self._total:_}" - t = timedelta(seconds=(time.monotonic() - self._time)) + f = f"{self.progress:_}/{self.total:_}" + t = timedelta(seconds=(time.monotonic() - self.time)) s = f"\r{m}{100*p:3.0f}% |{b:20s}| {f} | {t}" try: - w, _ = os.get_terminal_size(self._out.fileno()) + w, _ = os.get_terminal_size(self.out.fileno()) except (OSError, AttributeError): pass else: if w > 0: s = s[:w] - self._out.write(s) - self._out.flush() - - def stop(self, complete: bool = True) -> None: - """Stop progress and end line. + self.out.write(s) + self.out.flush() - Args: - complete: _description_ - """ + def stop(self, complete=True): + """stop progress and end line""" if complete: - self.update(self._total - self._progress) - self._out.write("\n") + self.update(self.total - self.progress) + self.out.write("\n") diff --git a/mkdocs.yml b/mkdocs.yml index bfbdf00..4d7e213 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -29,11 +29,11 @@ plugins: # headings show_category_heading: true show_object_full_path: false - show_root_toc_entry: false + show_root_toc_entry: true # members filters: ["!^__?"] inherited_members: true - show_submodules: true + show_submodules: false # docstrings docstring_options: ignore_init_summary: true diff --git a/pyproject.toml b/pyproject.toml index 2ac69f0..9a98163 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -71,24 +71,14 @@ extend-exclude = [ ] fix = true force-exclude = true -ignore = [ - "D203", - "D213", - "D401", - "D406", - "D407", - "D417", -] line-length = 100 -per-file-ignores = {"tests/*" = [ - "D", +per-file-ignores = {"test_*" = [ "S101", ]} select = [ "A", "BLE", "COM", - "D", "DJ", "DTZ", "E", diff --git a/tests/test_catalog.py b/tests/test_catalog.py index 77c7094..912d04d 100644 --- a/tests/test_catalog.py +++ b/tests/test_catalog.py @@ -5,7 +5,7 @@ @pytest.fixture def catalog(): - from heracles.catalog import CatalogPage, _CatalogBase + from heracles.catalog import CatalogBase, CatalogPage # fix a set of rows to be returned for testing size = 100 @@ -13,7 +13,7 @@ def catalog(): y = np.random.rand(size) z = np.random.rand(size) - class TestCatalog(_CatalogBase): + class TestCatalog(CatalogBase): SIZE = size DATA = dict(x=x, y=y, z=z) @@ -145,21 +145,21 @@ def test_catalog_page_immutable(): def test_catalog_base(catalog): - from heracles.catalog import Catalog, _CatalogBase + from heracles.catalog import Catalog, CatalogBase # ABC cannot be instantiated directly with pytest.raises(TypeError): - _CatalogBase() + CatalogBase() # fixture has tested concrete implementation - assert isinstance(catalog, _CatalogBase) + assert isinstance(catalog, CatalogBase) # check that CatalogBase implements the Catalog protocol assert isinstance(catalog, Catalog) def test_catalog_base_properties(catalog): - from heracles.catalog import _CatalogBase + from heracles.catalog import CatalogBase assert catalog.size == catalog.SIZE assert catalog.names == list(catalog.DATA.keys()) @@ -167,11 +167,11 @@ def test_catalog_base_properties(catalog): assert catalog.base is None assert catalog.selection is None - assert catalog.page_size == _CatalogBase._default_page_size + assert catalog.page_size == CatalogBase.default_page_size catalog.page_size = 1 assert catalog.page_size == 1 - catalog.page_size = _CatalogBase._default_page_size - assert catalog.page_size == _CatalogBase._default_page_size + catalog.page_size = CatalogBase.default_page_size + assert catalog.page_size == CatalogBase.default_page_size filt = object() assert catalog.filters == [] @@ -202,9 +202,9 @@ def test_catalog_base_pagination(catalog): def test_catalog_base_copy(): - from heracles.catalog import _CatalogBase + from heracles.catalog import CatalogBase - class TestCatalog(_CatalogBase): + class TestCatalog(CatalogBase): def __init__(self): super().__init__() self._visibility = object()