diff --git a/.gitignore b/.gitignore
index 9bd2e7c..3be643a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,6 +2,3 @@ __pycache__
_version.py
.*.swp
.envrc
-.ipynb_checkpoints
-.vscode
-site
diff --git a/README.md b/README.md
index b56047a..41b1240 100644
--- a/README.md
+++ b/README.md
@@ -1,9 +1,8 @@
# _Heracles_ — Harmonic-space statistics on the sphere
-[![PyPI](https://badge.fury.io/py/heracles.svg)](https://pypi.org/project/heracles)
+[![PyPI](https://img.shields.io/pypi/v/heracles)](https://pypi.org/project/heracles)
[![Python](https://img.shields.io/pypi/pyversions/heracles)](https://www.python.org)
[![Documentation](https://readthedocs.org/projects/heracles/badge/?version=latest)](https://heracles.readthedocs.io/en/latest/?badge=latest)
[![Test](https://github.com/heracles-ec/heracles/actions/workflows/test.yml/badge.svg)](https://github.com/heracles-ec/heracles/actions/workflows/test.yml)
-[![pre-commit](https://img.shields.io/badge/pre--commit-enabled-brightgreen?logo=pre-commit&logoColor=white)](https://github.com/pre-commit/pre-commit)
The _Heracles_ code was developed in the _Euclid_ Science Ground Segment.
diff --git a/heracles/__init__.py b/heracles/__init__.py
index 3a00859..26e7d99 100644
--- a/heracles/__init__.py
+++ b/heracles/__init__.py
@@ -16,7 +16,7 @@
#
# You should have received a copy of the GNU Lesser General Public
# License along with Heracles. If not, see <https://www.gnu.org/licenses/>.
-"""Heracles: Euclid code for harmonic-space statistics on the sphere."""
+"""Main module of the *Heracles* package."""
try:
from ._version import __version__, __version_tuple__ # noqa: F401
diff --git a/heracles/catalog/__init__.py b/heracles/catalog/__init__.py
index 9d79304..7566a5e 100644
--- a/heracles/catalog/__init__.py
+++ b/heracles/catalog/__init__.py
@@ -16,9 +16,9 @@
#
# You should have received a copy of the GNU Lesser General Public
# License along with Heracles. If not, see <https://www.gnu.org/licenses/>.
-"""Module for catalogue processing."""
+"""module for catalogue processing"""
from .array import ArrayCatalog # noqa: F401
-from .base import Catalog, CatalogPage, CatalogView, _CatalogBase # noqa: F401
+from .base import Catalog, CatalogBase, CatalogPage, CatalogView # noqa: F401
from .filters import FootprintFilter, InvalidValueFilter # noqa: F401
from .fits import FitsCatalog # noqa: F401
diff --git a/heracles/catalog/array.py b/heracles/catalog/array.py
index 47d2edc..9db3b2f 100644
--- a/heracles/catalog/array.py
+++ b/heracles/catalog/array.py
@@ -16,83 +16,42 @@
#
# You should have received a copy of the GNU Lesser General Public
# License along with Heracles. If not, see <https://www.gnu.org/licenses/>.
-"""Module for array catalogues."""
-from collections.abc import Iterator
-from typing import Optional, TypeVar
+"""module for array catalogues"""
-from .base import CatalogPage, _CatalogBase
+from .base import CatalogBase, CatalogPage
-class ArrayCatalog(_CatalogBase):
- """Catalogue reader for arrays."""
+class ArrayCatalog(CatalogBase):
+ """catalogue reader for arrays"""
- def __init__(self, arr: TypeVar("Unknown")) -> None:
- """Create a new array catalogue reader.
-
- Args:
- arr: _description_
- """
+ def __init__(self, arr):
+ """create a new array catalogue reader"""
super().__init__()
self._arr = arr
- def __copy__(self) -> "ArrayCatalog":
- """Return a copy of this catalogue.
-
- Returns:
- _description_
- """
+ def __copy__(self):
+ """return a copy of this catalogue"""
other = super().__copy__()
other._arr = self._arr
return other
- def _names(self) -> TypeVar("Unknown"):
- """_summary_.
-
- Returns:
- _description_
- """
+ def _names(self):
return self._arr.dtype.names
- def _size(self, selection: Optional[TypeVar("Unknown")]) -> int:
- """_summary_.
-
- Args:
- selection: _description_
-
- Returns:
- _description_
- """
+ def _size(self, selection):
if selection is None:
return len(self._arr)
return len(self._arr[selection])
- def _join(
- self,
- first: TypeVar("Unknown"),
- *other: TypeVar("Unknown"),
- ) -> TypeVar("Unknown"):
- """Join boolean masks.
-
- Args:
- first: _description_
-
- Returns:
- _description_
- """
+ def _join(self, first, *other):
+ """join boolean masks"""
mask = first
for a in other:
mask = mask & a
return mask
- def _pages(self, selection: Optional[TypeVar("Unknown")]) -> Iterator[CatalogPage]:
- """Iterate the rows of the array in pages.
-
- Args:
- selection: _description_
-
- Yields:
- _description_
- """
+ def _pages(self, selection):
+ """iterate the rows of the array in pages"""
if selection is None:
arr = self._arr
else:
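For reference, a minimal usage sketch of the renamed ArrayCatalog/CatalogBase interface shown above, assuming a NumPy structured array as input; the column names and page size below are illustrative only:

import numpy as np
from heracles.catalog import ArrayCatalog

# hypothetical structured array standing in for real catalogue data
arr = np.empty(10, dtype=[("RA", float), ("DEC", float), ("W", float)])
arr["RA"] = np.random.uniform(0.0, 360.0, 10)
arr["DEC"] = np.random.uniform(-90.0, 90.0, 10)
arr["W"] = 1.0

cat = ArrayCatalog(arr)
cat.page_size = 4          # small pages for illustration; the default is 100_000

for page in cat:           # iteration yields CatalogPage objects
    ra, dec = page.get("RA", "DEC")
    print(page.size, ra.mean(), dec.mean())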
diff --git a/heracles/catalog/base.py b/heracles/catalog/base.py
index 55758bb..4a79bf7 100644
--- a/heracles/catalog/base.py
+++ b/heracles/catalog/base.py
@@ -16,29 +16,25 @@
#
# You should have received a copy of the GNU Lesser General Public
# License along with Heracles. If not, see <https://www.gnu.org/licenses/>.
-"""Base definition for catalogue interface."""
+"""base definition for catalogue interface"""
from abc import ABCMeta, abstractmethod
-from collections.abc import Iterator, Mapping
+from collections.abc import Mapping
from types import MappingProxyType
-from typing import Optional, Protocol, TypeVar, Union, runtime_checkable
+from typing import Protocol, runtime_checkable
import numpy as np
-import numpy.typing as npt
class CatalogPage:
"""One batch of rows from a catalogue.
Internally holds all column data as a numpy array.
- """
- def _update(self) -> None:
- """Update internal data after dictionary changes.
+ """
- Raises:
- ValueError: _description_
- """
+ def _update(self):
+ """Update internal data after dictionary changes."""
# get and check size of rows
size: int = -1
for col, rows in self._data.items():
@@ -50,77 +46,47 @@ def _update(self) -> None:
self._size = size
def __init__(self, data: Mapping) -> None:
- """Create a new catalogue page from given data.
-
- Args:
- data: _description_
- """
+ """Create a new catalogue page from given data."""
self._data = {k: np.asanyarray(v) for k, v in data.items()}
for v in self._data.values():
v.flags.writeable = False
self._update()
- def __getitem__(self, col: Union[list, str, tuple]) -> npt.NDArray:
- """Return one or more columns without checking.
-
- Args:
- col: _description_
-
- Returns:
- _description_
- """
+ def __getitem__(self, col):
+ """Return one or more columns without checking."""
if isinstance(col, (list, tuple)):
return tuple(self._data[c] for c in col)
return self._data[col]
- def __len__(self) -> int:
- """Number of columns in the page.
-
- Returns:
- _description_
- """
+ def __len__(self):
+ """Number of columns in the page."""
return len(self._data)
- def __copy__(self) -> "CatalogPage":
- """Create a copy.
-
- Returns:
- _description_
- """
+ def __copy__(self):
+ """Create a copy."""
return self.copy()
- def __iter__(self) -> Iterator[npt.NDArray]:
- """Iterate over column names.
-
- Yields:
- _description_
- """
+ def __iter__(self):
+ """Iterate over column names."""
yield from self._data
@property
- def names(self) -> list[dict]:
+ def names(self):
"""Column names in the page."""
return list(self._data)
@property
- def size(self) -> int:
+ def size(self):
"""Number of rows in the page."""
return self._size
@property
- def data(self) -> MappingProxyType:
+ def data(self):
"""Return an immutable view on the data of this page."""
return MappingProxyType(self._data)
- def get(self, *col: TypeVar("Unknown")) -> Union[npt.NDArray, list[npt.NDArray]]:
- """Return one or more columns with checking.
-
- Raises:
- ValueError: _description_
-
- Returns:
- _description_
- """
+ def get(self, *col):
+ """Return one or more columns with checking."""
val = []
for c in col:
v = self._data[c]
@@ -133,19 +99,11 @@ def get(self, *col: TypeVar("Unknown")) -> Union[npt.NDArray, list[npt.NDArray]]
return val
def copy(self) -> "CatalogPage":
- """Create new page instance with the same data.
-
- Returns:
- _description_
- """
+ """Create new page instance with the same data."""
return CatalogPage(self._data)
- def delete(self, where: TypeVar("Unknown")) -> None:
- """Delete the rows indicated by ``where``.
-
- Args:
- where: _description_
- """
+ def delete(self, where) -> None:
+ """Delete the rows indicated by ``where``."""
for col, rows in self._data.items():
self._data[col] = np.delete(rows, where)
self._update()
@@ -153,165 +111,109 @@ def delete(self, where: TypeVar("Unknown")) -> None:
@runtime_checkable
class Catalog(Protocol):
- """Protocol for catalogues."""
-
- def __getitem__(self, where: TypeVar("Unknown")) -> TypeVar("Unknown"):
- """Create a view with the given selection.
+ """protocol for catalogues"""
- Args:
- where: _description_
- """
+ def __getitem__(self, where):
+ """create a view with the given selection"""
...
@property
- def base(self) -> Optional[TypeVar("Unknown")]:
- """Return the base catalogue of a view, or ``None`` if not a view."""
+ def base(self):
+ """return the base catalogue of a view, or ``None`` if not a view"""
...
@property
- def selection(self) -> Optional[TypeVar("Unknown")]:
- """Return the selection of a view, or ``None`` if not a view."""
+ def selection(self):
+ """return the selection of a view, or ``None`` if not a view"""
...
@property
- def names(self) -> Optional[TypeVar("Unknown")]:
- """Columns in the catalogue, or ``None`` if not known."""
+ def names(self):
+ """columns in the catalogue, or ``None`` if not known"""
...
@property
- def size(self) -> Optional[int]:
- """Rows in the catalogue, or ``None`` if not known."""
+ def size(self):
+ """rows in the catalogue, or ``None`` if not known"""
...
@property
- def visibility(self) -> TypeVar("Unknown"):
- """Visibility map of the catalogue."""
+ def visibility(self):
+ """visibility map of the catalogue"""
...
- def where(
- self,
- selection: TypeVar("Unknown"),
- visibility: Optional[TypeVar("Unknown")] = None,
- ) -> TypeVar("Unknown"):
- """Create a view on this catalogue with the given selection.
-
- Args:
- selection: _description_
- visibility: _description_
- """
+ def where(self, selection, visibility=None):
+ """create a view on this catalogue with the given selection"""
...
@property
- def page_size(self) -> int:
- """Page size for iteration."""
+ def page_size(self):
+ """page size for iteration"""
...
- def __iter__(self) -> TypeVar("Unknown"):
- """Iterate over pages of rows in the catalogue."""
+ def __iter__(self):
+ """iterate over pages of rows in the catalogue"""
...
- def select(self, selection: TypeVar("Unknown")) -> TypeVar("Unknown"):
- """Iterate over pages of rows with the given selection.
-
- Args:
- selection: _description_
- """
+ def select(self, selection):
+ """iterate over pages of rows with the given selection"""
...
class CatalogView:
- """A view of a catalogue with some selection applied."""
-
- def __init__(
- self,
- catalog: Catalog,
- selection: TypeVar("Unknown"),
- visibility: Optional[TypeVar("Unknown")] = None,
- ) -> None:
- """Create a new view.
-
- Args:
- catalog: _description_
- selection: _description_
- visibility: _description_
- """
+ """a view of a catalogue with some selection applied"""
+
+ def __init__(self, catalog, selection, visibility=None):
+ """create a new view"""
self._catalog = catalog
self._selection = selection
self._visibility = visibility
- def __repr__(self) -> str:
- """Object representation of this view.
-
- Returns:
- _description_
- """
+ def __repr__(self):
+ """object representation of this view"""
return f"{self._catalog!r}[{self._selection!r}]"
- def __str__(self) -> str:
- """String representation of this view.
-
- Returns:
- _description_
- """
+ def __str__(self):
+ """string representation of this view"""
return f"{self._catalog!s}[{self._selection!s}]"
- def __getitem__(self, where: TypeVar("Unknown")) -> TypeVar("Unknown"):
- """Return a view with a subselection of this view.
-
- Args:
- where: _description_
-
- Returns:
- _description_
- """
+ def __getitem__(self, where):
+ """return a view with a subselection of this view"""
return self.where(where)
@property
- def base(self) -> TypeVar("Unknown"):
- """Base catalogue of this view."""
+ def base(self):
+ """base catalogue of this view"""
return self._catalog
@property
- def selection(self) -> TypeVar("Unknown"):
- """Selection of this view."""
+ def selection(self):
+ """selection of this view"""
return self._selection
@property
- def names(self) -> TypeVar("Unknown"):
- """Column names of this view."""
+ def names(self):
+ """column names of this view"""
return self._catalog.names
@property
- def size(self) -> int:
- """Size of this view, might not take selection into account."""
+ def size(self):
+ """size of this view, might not take selection into account"""
return self._catalog._size(self._selection)
@property
- def visibility(self) -> TypeVar("Unknown"):
- """The visibility of this view."""
+ def visibility(self):
+ """the visibility of this view"""
if self._visibility is None:
return self._catalog.visibility
return self._visibility
@visibility.setter
- def visibility(self, visibility: TypeVar("Unknown")) -> None:
- """_summary_."""
+ def visibility(self, visibility):
self._visibility = visibility
- def where(
- self,
- selection: TypeVar("Unknown"),
- visibility: Optional[TypeVar("Unknown")] = None,
- ) -> TypeVar("Unknown"):
- """Return a view with a subselection of this view.
-
- Args:
- selection: _description_
- visibility: _description_
-
- Returns:
- _description_
- """
+ def where(self, selection, visibility=None):
+ """return a view with a subselection of this view"""
if isinstance(selection, (tuple, list)):
joined = (self._selection, *selection)
else:
@@ -321,27 +223,16 @@ def where(
return self._catalog.where(joined, visibility)
@property
- def page_size(self) -> int:
- """Page size for iterating this view."""
+ def page_size(self):
+ """page size for iterating this view"""
return self._catalog.page_size
- def __iter__(self) -> Iterator[TypeVar("Unknown")]:
- """Iterate the catalogue with the selection of this view.
-
- Yields:
- _description_
- """
+ def __iter__(self):
+ """iterate the catalogue with the selection of this view"""
yield from self._catalog.select(self._selection)
- def select(self, selection: TypeVar("Unknown")) -> Iterator[TypeVar("Unknown")]:
- """Iterate over pages of rows with the given selection.
-
- Args:
- selection: _description_
-
- Yields:
- _description_
- """
+ def select(self, selection):
+ """iterate over pages of rows with the given selection"""
if isinstance(selection, (tuple, list)):
joined = (self._selection, *selection)
else:
@@ -349,24 +240,22 @@ def select(self, selection: TypeVar("Unknown")) -> Iterator[TypeVar("Unknown")]:
yield from self._catalog.select(joined)
-class _CatalogBase(metaclass=ABCMeta):
- """Abstract base class for base catalogues (not views)."""
+class CatalogBase(metaclass=ABCMeta):
+ """abstract base class for base catalogues (not views)"""
- _default_page_size: int = 100_000
- """Default value for page size"""
+ default_page_size: int = 100_000
+ """default value for page size"""
- def __init__(self) -> None:
+ def __init__(self):
"""Create a new catalogue instance."""
- self._page_size = self._default_page_size
+
+ self._page_size = self.default_page_size
self._filters = []
self._visibility = None
- def __copy__(self) -> "_CatalogBase":
- """Return a shallow copy of the catalogue.
+ def __copy__(self):
+ """return a shallow copy of the catalogue"""
- Returns:
- _description_
- """
other = self.__class__.__new__(self.__class__)
other._page_size = self._page_size
other._filters = self._filters.copy()
@@ -374,140 +263,93 @@ def __copy__(self) -> "_CatalogBase":
return other
@abstractmethod
- def _names(self) -> TypeVar("Unknown"):
- """Abstract method to return the columns in the catalogue."""
+ def _names(self):
+ """abstract method to return the columns in the catalogue"""
...
@abstractmethod
- def _size(self, selection: TypeVar("Unknown")) -> int:
- """Abstract method to return the size of the catalogue or selection.
-
- Args:
- selection: _description_
- """
+ def _size(self, selection):
+ """abstract method to return the size of the catalogue or selection"""
...
@abstractmethod
- def _join(self, *where: TypeVar("Unknown")) -> TypeVar("Unknown"):
- """Abstract method to join selections."""
+ def _join(self, *where):
+ """abstract method to join selections"""
...
@abstractmethod
- def _pages(self, selection: TypeVar("Unknown")) -> TypeVar("Unknown"):
- """Abstract method to iterate selected pages from the catalogue.
-
- Args:
- selection: _description_
- """
+ def _pages(self, selection):
+ """abstract method to iterate selected pages from the catalogue"""
...
@property
- def filters(self) -> TypeVar("Unknown"):
- """Filters to apply to this catalogue."""
+ def filters(self):
+ """filters to apply to this catalogue"""
return self._filters
@filters.setter
- def filters(self, filters: TypeVar("Unknown")) -> None:
- """_summary_."""
+ def filters(self, filters):
self._filters = filters
- def add_filter(self, filt: TypeVar("Unknown")) -> None:
- """Add a filter to catalogue.
-
- Args:
- filt: _description_
- """
+ def add_filter(self, filt):
+ """add a filter to catalogue"""
self.filters.append(filt)
- def __getitem__(self, where: TypeVar("Unknown")) -> TypeVar("Unknown"):
- """Create a view on this catalogue with the given selection.
-
- Args:
- where: _description_
-
- Returns:
- _description_
- """
+ def __getitem__(self, where):
+ """create a view on this catalogue with the given selection"""
return self.where(where)
@property
- def base(self) -> None:
- """Returns ``None`` since this is not a view of another catalogue."""
+ def base(self):
+ """returns ``None`` since this is not a view of another catalogue"""
return
@property
- def selection(self) -> None:
- """Returns ``None`` since this is not a view of another catalogue."""
+ def selection(self):
+ """returns ``None`` since this is not a view of another catalogue"""
return
@property
- def names(self) -> Optional[TypeVar("Unknown")]:
- """Columns in the catalogue, or ``None`` if not known."""
+ def names(self):
+ """columns in the catalogue, or ``None`` if not known"""
return self._names()
@property
- def size(self) -> Optional[int]:
- """Total rows in the catalogue, or ``None`` if not known."""
+ def size(self):
+ """total rows in the catalogue, or ``None`` if not known"""
return self._size(None)
@property
- def visibility(self) -> TypeVar("Unknown"):
- """Optional visibility map for catalogue."""
+ def visibility(self):
+ """optional visibility map for catalogue"""
return self._visibility
@visibility.setter
- def visibility(self, visibility: TypeVar("Unknown")) -> None:
- """_summary_."""
+ def visibility(self, visibility):
self._visibility = visibility
- def where(
- self,
- selection: TypeVar("Unknown"),
- visibility: Optional[TypeVar("Unknown")] = None,
- ) -> CatalogView:
- """Create a view on this catalogue with the given selection.
-
- Args:
- selection: _description_
- visibility: _description_
-
- Returns:
- _description_
- """
+ def where(self, selection, visibility=None):
+ """create a view on this catalogue with the given selection"""
if isinstance(selection, (tuple, list)):
selection = self._join(*selection)
return CatalogView(self, selection, visibility)
@property
- def page_size(self) -> int:
- """Number of rows per page (default: 100_000)."""
+ def page_size(self):
+ """number of rows per page (default: 100_000)"""
return self._page_size
@page_size.setter
- def page_size(self, value: int) -> None:
- """_summary_."""
+ def page_size(self, value):
self._page_size = value
- def __iter__(self) -> Iterator[TypeVar("Unknown")]:
- """Iterate over pages of rows in the catalogue.
-
- Yields:
- _description_
- """
+ def __iter__(self):
+ """iterate over pages of rows in the catalogue"""
yield from self.select(None)
- def select(
- self,
- selection: Union[list, tuple, TypeVar("Unknown")],
- ) -> Iterator[CatalogPage]:
- """Iterate over pages of rows with the given selection.
-
- Args:
- selection: _description_
+ def select(self, selection):
+ """iterate over pages of rows with the given selection"""
- Yields:
- _description_
- """
if isinstance(selection, (tuple, list)):
selection = self._join(*selection)
diff --git a/heracles/catalog/filters.py b/heracles/catalog/filters.py
index 12e8859..0036f53 100644
--- a/heracles/catalog/filters.py
+++ b/heracles/catalog/filters.py
@@ -16,65 +16,46 @@
#
# You should have received a copy of the GNU Lesser General Public
# License along with Heracles. If not, see <https://www.gnu.org/licenses/>.
-"""Module for catalogue filters."""
+"""module for catalogue filters"""
import warnings
-from typing import Optional, TypeVar
import healpy as hp
import numpy as np
-from .base import CatalogPage
-
class InvalidValueFilter:
"""Filter invalid values from a catalogue."""
- def __init__(
- self,
- *columns: TypeVar("Unknown"),
- weight: Optional[TypeVar("Unknown")] = None,
- warn: bool = True,
- ) -> None:
+ def __init__(self, *columns, weight=None, warn=True):
"""Filter invalid values in the given columns.
If ``warn`` is true, invalid values will emit a warning.
- Args:
- columns: _description_
- weight: _description_
- warn: _description_
"""
- self._columns = columns
- self._weight = weight
- self._warn = warn
- def __repr__(self) -> str:
- """_summary_.
+ self.columns = columns
+ self.weight = weight
+ self.warn = warn
- Returns:
- _description_
- """
+ def __repr__(self):
name = self.__class__.__name__
- args = list(map(repr, self._columns))
- args += [f"weight={self._weight!r}", f"warn={self._warn!r}"]
+ args = list(map(repr, self.columns))
+ args += [f"weight={self.weight!r}", f"warn={self.warn!r}"]
args = ", ".join(args)
return f"{name}({args})"
- def __call__(self, page: CatalogPage) -> None:
- """Filter a catalog page.
+ def __call__(self, page):
+ """Filter a catalog page."""
- Args:
- page: _description_
- """
invalid_mask = np.zeros(page.size, dtype=bool)
- for col in self._columns:
+ for col in self.columns:
invalid_mask |= np.isnan(page[col])
- if self._weight is not None:
- invalid_mask &= page[self._weight] != 0
+ if self.weight is not None:
+ invalid_mask &= page[self.weight] != 0
invalid = np.where(invalid_mask)[0]
if len(invalid) > 0:
- if self._warn:
+ if self.warn:
warnings.warn("WARNING: catalog contains invalid values")
page.delete(invalid)
@@ -82,44 +63,30 @@ def __call__(self, page: CatalogPage) -> None:
class FootprintFilter:
"""Filter a catalogue using a footprint map."""
- def __init__(self, footprint: TypeVar("Unknown"), lon: float, lat: float) -> None:
- """Filter using the given footprint map and position columns.
-
- Args:
- footprint: _description_
- lon: _description_
- lat: _description_
- """
+ def __init__(self, footprint, lon, lat):
+ """Filter using the given footprint map and position columns."""
self._footprint = footprint
self._nside = hp.get_nside(footprint)
self._lonlat = (lon, lat)
@property
- def footprint(self) -> TypeVar("Unknown"):
- """Footprint for filter."""
+ def footprint(self):
+ """footprint for filter"""
return self._footprint
@property
- def lonlat(self) -> tuple[float, float]:
- """Longitude and latitude columns."""
+ def lonlat(self):
+ """longitude and latitude columns"""
return self._lonlat
- def __repr__(self) -> str:
- """_summary_.
-
- Returns:
- _description_
- """
+ def __repr__(self):
name = self.__class__.__name__
lon, lat = self.lonlat
return f"{name}(..., {lon!r}, {lat!r})"
- def __call__(self, page: CatalogPage) -> None:
- """Filter catalog page.
+ def __call__(self, page):
+ """filter catalog page"""
- Args:
- page: _description_
- """
lon, lat = self._lonlat
ipix = hp.ang2pix(self._nside, page[lon], page[lat], lonlat=True)
exclude = np.where(self._footprint[ipix] == 0)[0]
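A minimal sketch of applying FootprintFilter to a CatalogPage directly, assuming the truncated __call__ body removes the excluded rows via page.delete() in the same way InvalidValueFilter does above; the footprint, column names and sizes are hypothetical:

import healpy as hp
import numpy as np
from heracles.catalog import CatalogPage, FootprintFilter

nside = 64
footprint = np.zeros(hp.nside2npix(nside))
# hypothetical footprint: keep a band within 30 degrees of the equator
footprint[hp.query_strip(nside, np.radians(60.0), np.radians(120.0))] = 1.0

page = CatalogPage({
    "RA": np.random.uniform(0.0, 360.0, 1000),
    "DEC": np.random.uniform(-90.0, 90.0, 1000),
})

filt = FootprintFilter(footprint, "RA", "DEC")
filt(page)        # rows outside the footprint are removed in place
print(page.size)  # fewer rows remain than the original 1000

In normal use the filter would be attached to a catalogue with cat.add_filter(filt) rather than called by hand.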
diff --git a/heracles/catalog/fits.py b/heracles/catalog/fits.py
index 5f8e6fe..5d611aa 100644
--- a/heracles/catalog/fits.py
+++ b/heracles/catalog/fits.py
@@ -16,98 +16,57 @@
#
# You should have received a copy of the GNU Lesser General Public
# License along with Heracles. If not, see <https://www.gnu.org/licenses/>.
-"""Module for catalogue processing."""
+"""module for catalogue processing"""
-from collections.abc import Iterator
-from typing import Any, Optional, TypeVar
from weakref import finalize, ref
import fitsio
-from .base import CatalogPage, _CatalogBase
+from .base import CatalogBase, CatalogPage
-def _is_table_hdu(hdu: fitsio.hdu.TableHDU) -> bool:
- """Return true if HDU is a table with data.
-
- Args:
- hdu: _description_
-
- Returns:
- _description_
- """
+def _is_table_hdu(hdu):
+ """return true if HDU is a table with data"""
return isinstance(hdu, fitsio.hdu.TableHDU) and hdu.has_data()
-def rowfilter(array: TypeVar("Unknown"), expr: str) -> Any:
- """Filter the rows of a structured array.
-
- Args:
- array: _description_
- expr: _description_
-
- Returns:
- _description_
- """
+def rowfilter(array, expr):
+ """filter the rows of a structured array"""
return eval(expr, None, {name: array[name] for name in array.dtype.names})
-class FitsCatalog(_CatalogBase):
- """Flexible reader for catalogues from FITS files."""
+class FitsCatalog(CatalogBase):
+ """flexible reader for catalogues from FITS files"""
- def __init__(
- self,
- filename: TypeVar("Unknown"),
- *,
- columns: Optional[TypeVar("Unknown")] = None,
- ext: Optional[TypeVar("Unknown")] = None,
- ) -> None:
- """Create a new FITS catalogue reader.
+ def __init__(self, filename, *, columns=None, ext=None):
+ """create a new FITS catalogue reader
Neither opens the FITS file nor reads the catalogue immediately.
- Args:
- filename: _description_
- columns: _description_
- ext: _description_
"""
super().__init__()
self._filename = filename
self._columns = columns
self._ext = ext
- def __copy__(self) -> "FitsCatalog":
- """Return a copy of this catalog.
-
- Returns:
- _description_
- """
+ def __copy__(self):
+ """return a copy of this catalog"""
other = super().__copy__()
other._filename = self._filename
other._columns = self._columns
other._ext = self._ext
return other
- def __repr__(self) -> str:
- """String representation of FitsCatalog.
-
- Returns:
- _description_
- """
+ def __repr__(self):
+ """string representation of FitsCatalog"""
s = self._filename
if self._ext is not None:
s = s + f"[{self._ext!r}]"
return s
- def hdu(self) -> TypeVar("Unknown"):
- """HDU for catalogue data.
+ def hdu(self):
+ """HDU for catalogue data"""
- Raises:
- TypeError: _description_
-
- Returns:
- _description_
- """
# see if there's a reference to hdu still around
try:
hdu = self._hdu()
@@ -146,47 +105,26 @@ def hdu(self) -> TypeVar("Unknown"):
return hdu
- def _names(self) -> Optional[TypeVar("Unknown")]:
- """Column names in FITS catalogue.
-
- Returns:
- _description_
- """
+ def _names(self):
+ """column names in FITS catalogue"""
# store column names on first access
if self._columns is None:
self._columns = self.hdu().get_colnames()
return self._columns
- def _size(self, selection: TypeVar("Unknown")) -> int:
- """Size of FITS catalogue; selection is ignored.
-
- Args:
- selection: _description_
-
- Returns:
- _description_
- """
+ def _size(self, selection):
+ """size of FITS catalogue; selection is ignored"""
return self.hdu().get_nrows()
- def _join(self, *where: TypeVar("Unknown")) -> Optional[str]:
- """Join rowfilter expressions.
-
- Returns:
- _description_
- """
+ def _join(self, *where):
+ """join rowfilter expressions"""
if not where:
return None
return "(" + ") & (".join(map(str, filter(None, where))) + ")"
- def _pages(self, selection: TypeVar("Unknown")) -> Iterator[CatalogPage]:
- """Iterate pages of rows in FITS file, optionally using the query.
+ def _pages(self, selection):
+ """iterate pages of rows in FITS file, optionally using the query"""
- Args:
- selection: _description_
-
- Yields:
- _description_
- """
# keep an unchanging local copy of the page size
page_size = self.page_size
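A short sketch of the rowfilter() helper kept by this file, which evaluates a selection expression with the column names of a structured array in scope; the array and expression below are hypothetical:

import numpy as np
from heracles.catalog.fits import rowfilter

arr = np.array(
    [(1.0, 0.5), (2.0, 0.0), (3.0, 1.5)],
    dtype=[("FLUX", float), ("W", float)],
)

mask = rowfilter(arr, "(FLUX > 1.5) & (W != 0)")
print(arr[mask])  # only the (3.0, 1.5) row passes both conditions

Selections attached to FitsCatalog views are combined by _join() into expressions of the same "(a) & (b)" form.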
diff --git a/heracles/core.py b/heracles/core.py
index 408aa8d..bebf96c 100644
--- a/heracles/core.py
+++ b/heracles/core.py
@@ -16,28 +16,14 @@
#
# You should have received a copy of the GNU Lesser General Public
# License along with Heracles. If not, see <https://www.gnu.org/licenses/>.
-"""module for common core functionality."""
+"""module for common core functionality"""
from collections import UserDict
from collections.abc import Mapping, Sequence
-from typing import Optional, TypeVar, Union
-def toc_match(
- key: TypeVar("Unknown"),
- include: Optional[TypeVar("Unknown")] = None,
- exclude: Optional[TypeVar("Unknown")] = None,
-) -> bool:
- """Return whether a tocdict entry matches include/exclude criteria.
-
- Args:
- key: _description_
- include: _description_
- exclude: _description_
-
- Returns:
- _description_
- """
+def toc_match(key, include=None, exclude=None):
+ """return whether a tocdict entry matches include/exclude criteria"""
if include is not None:
for pattern in include:
if all(p is Ellipsis or p == k for p, k in zip(pattern, key)):
@@ -51,24 +37,8 @@ def toc_match(
return True
-def toc_filter(
- obj: Union[Sequence, Mapping],
- include: Optional[TypeVar("Unknown")] = None,
- exclude: Optional[TypeVar("Unknown")] = None,
-) -> Union[dict, list]:
- """Return a filtered toc dict ``d``.
-
- Args:
- obj: _description_
- include: _description_
- exclude: _description_
-
- Raises:
- TypeError: _description_
-
- Returns:
- _description_
- """
+def toc_filter(obj, include=None, exclude=None):
+ """return a filtered toc dict ``d``"""
if isinstance(obj, Sequence):
return [toc_filter(item, include, exclude) for item in obj]
if isinstance(obj, Mapping):
@@ -80,20 +50,10 @@ def toc_filter(
# subclassing UserDict here since that returns the correct type from methods
# such as __copy__(), __or__(), etc.
class TocDict(UserDict):
- """Table-of-contents dictionary with pattern-based lookup."""
-
- def __getitem__(self, pattern: TypeVar("Unknown")) -> TypeVar("Unknown"):
- """Look up one or many keys in dict.
-
- Args:
- pattern: _description_
-
- Raises:
- KeyError: _description_
+ """Table-of-contents dictionary with pattern-based lookup"""
- Returns:
- _description_
- """
+ def __getitem__(self, pattern):
+ """look up one or many keys in dict"""
# first, see if pattern is a valid entry in the dict
# might fail with KeyError (no such entry) or TypeError (not hashable)
try:
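A minimal sketch of the toc_match() include/exclude semantics used throughout the I/O routines, assuming the elided branches follow the usual "some include pattern must match, no exclude pattern may match" logic suggested above; the keys are hypothetical:

from heracles.core import toc_match

key = ("POS", "SHE", 0, 1)   # e.g. a cl key (name1, name2, bin1, bin2)

print(toc_match(key))                                      # True: no criteria given
print(toc_match(key, include=[("POS", ..., ..., ...)]))    # True: Ellipsis matches any entry
print(toc_match(key, exclude=[("POS", "SHE", ..., ...)]))  # False: key matches an exclude pattern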
diff --git a/heracles/covariance.py b/heracles/covariance.py
index 8bf7c02..78508d7 100644
--- a/heracles/covariance.py
+++ b/heracles/covariance.py
@@ -16,39 +16,25 @@
#
# You should have received a copy of the GNU Lesser General Public
# License along with Heracles. If not, see <https://www.gnu.org/licenses/>.
-"""Module for covariance matrix computation."""
+"""module for covariance matrix computation"""
+
import logging
import time
from datetime import timedelta
from itertools import combinations_with_replacement
-from typing import Optional, TypeVar
import healpy as hp
import numpy as np
-import numpy.typing as npt
from ._kmeans_radec import kmeans_sample
-_logger = logging.getLogger(__name__)
+logger = logging.getLogger(__name__)
class SampleCovariance(np.ndarray):
- """Array subclass for iterative sample covariance matrix computation."""
-
- def __new__(
- cls,
- nrows: int,
- ncols: Optional[int] = None,
- ) -> "SampleCovariance":
- """_summary_.
-
- Args:
- nrows: _description_
- ncols: _description_
-
- Returns:
- _description_
- """
+ """array subclass for iterative sample covariance matrix computation"""
+
+ def __new__(cls, nrows, ncols=None):
if ncols is None:
ncols = nrows
cov = np.zeros((nrows, ncols)).view(cls)
@@ -57,12 +43,7 @@ def __new__(
cov.sample_col_mean = np.zeros(ncols)
return cov
- def __array_finalize__(self, cov: Optional["SampleCovariance"]) -> None:
- """_summary_.
-
- Args:
- cov: _description_
- """
+ def __array_finalize__(self, cov):
if cov is None:
return
nrows, ncols = np.shape(cov)
@@ -73,21 +54,9 @@ def __array_finalize__(self, cov: Optional["SampleCovariance"]) -> None:
self.sample_col_mean[:] = getattr(cov, "sample_col_mean", 0.0)
-def add_sample(
- cov: "SampleCovariance",
- x: npt.NDArray,
- y: Optional[npt.NDArray] = None,
-) -> None:
- """Add a sample to a sample covariance matrix.
+def add_sample(cov, x, y=None):
+ """add a sample to a sample covariance matrix"""
- Args:
- cov: _description_
- x: _description_
- y: _description_
-
- Raises:
- ValueError: _description_
- """
x = np.reshape(x, -1)
if y is None:
y = x
@@ -106,20 +75,16 @@ def add_sample(
cov += (np.outer(delta, y - cov.sample_col_mean) - cov) / (cov.sample_count - 1)
-def update_covariance(cov: "SampleCovariance", sample: TypeVar("Unknown")) -> None:
- """Update a set of sample covariances given a sample.
+def update_covariance(cov, sample):
+ """update a set of sample covariances given a sample"""
- Args:
- cov: _description_
- sample: _description_
- """
- _logger.info("updating covariances for %d item(s)", len(sample))
+ logger.info("updating covariances for %d item(s)", len(sample))
t = time.monotonic()
for (k1, v1), (k2, v2) in combinations_with_replacement(sample.items(), 2):
if (k1, k2) not in cov:
nrows, ncols = np.size(v1), np.size(v2)
- _logger.info(
+ logger.info(
"creating %d x %d covariance matrix for %s, %s",
nrows,
ncols,
@@ -127,10 +92,10 @@ def update_covariance(cov: "SampleCovariance", sample: TypeVar("Unknown")) -> No
k2,
)
cov[k1, k2] = SampleCovariance(nrows, ncols)
- _logger.info("updating covariance for %s, %s", k1, k2)
+ logger.info("updating covariance for %s, %s", k1, k2)
add_sample(cov[k1, k2], v1, v2)
- _logger.info(
+ logger.info(
"updated %d covariance(s) in %s",
len(sample) * (len(sample) + 1) // 2,
timedelta(seconds=(time.monotonic() - t)),
@@ -138,47 +103,33 @@ def update_covariance(cov: "SampleCovariance", sample: TypeVar("Unknown")) -> No
def jackknife_regions_kmeans(
- fpmap: TypeVar("Unknown"),
- n: int,
+ fpmap,
+ n,
*,
- maxrepeat: int = 5,
- maxiter: int = 1_000,
- tol: float = 1e-5,
- return_centers: bool = False,
-) -> TypeVar("Unknown"):
- """Partition a footprint map into n regions using k-means.
-
- Args:
- fpmap: _description_
- n: _description_
- maxrepeat: _description_
- maxiter: _description_
- tol: _description_
- return_centers: _description_
-
- Raises:
- RuntimeError: _description_
-
- Returns:
- _description_
- """
+ maxrepeat=5,
+ maxiter=1000,
+ tol=1e-5,
+ return_centers=False,
+):
+ """partition a footprint map into n regions using k-means"""
+
nside = hp.get_nside(fpmap)
npix = hp.nside2npix(nside)
- _logger.info("partitioning map with NSIDE=%s into %s regions", nside, n)
+ logger.info("partitioning map with NSIDE=%s into %s regions", nside, n)
t = time.monotonic()
- _logger.info("finding all nonzero pixels in map")
+ logger.info("finding all nonzero pixels in map")
ipix = np.nonzero(fpmap)[0]
- _logger.info("found %d nonzero pixels in map", len(ipix))
- _logger.info("getting angles of all nonzero pixels in map")
+ logger.info("found %d nonzero pixels in map", len(ipix))
+ logger.info("getting angles of all nonzero pixels in map")
radec = np.transpose(hp.pix2ang(nside, ipix, lonlat=True))
for r in range(maxrepeat + 1):
- _logger.info(
+ logger.info(
"constructing %s regions using k-means%s",
n,
"" if r == 0 else f" (repeat {r})",
@@ -187,9 +138,9 @@ def jackknife_regions_kmeans(
km = kmeans_sample(radec, n, verbose=0)
if km.converged:
- _logger.info("k-means converged")
+ logger.info("k-means converged")
break
- _logger.info("k-means not converged; repeat")
+ logger.info("k-means not converged; repeat")
else:
msg = (
f"k-means failed to partition map into {n} regions after repeat {maxrepeat}"
@@ -201,15 +152,15 @@ def jackknife_regions_kmeans(
areas = 60**4 // 100 / np.pi / npix * np.bincount(km.labels)
area_mean, area_std, area_unit = np.mean(areas), np.std(areas), "deg2"
if area_mean < 1.0:
- area_mean, area_std, area_unit = area_mean / 3_600, area_std / 3_600, "arcmin2"
+ area_mean, area_std, area_unit = area_mean * 3600, area_std * 3600, "arcmin2"
if area_mean < 1.0:
- area_mean, area_std, area_unit = area_mean / 3_600, area_std / 3_600, "arcsec2"
- _logger.info("region area is %.3f ± %.3f %s", area_mean, area_std, area_unit)
+ area_mean, area_std, area_unit = area_mean * 3600, area_std * 3600, "arcsec2"
+ logger.info("region area is %.3f ± %.3f %s", area_mean, area_std, area_unit)
jkmap = np.zeros(npix, dtype=int)
jkmap[ipix] = km.labels + 1
- _logger.info("partitioned map in %s", timedelta(seconds=(time.monotonic() - t)))
+ logger.info("partitioned map in %s", timedelta(seconds=(time.monotonic() - t)))
if return_centers:
result = jkmap, km.centers
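A minimal sketch of accumulating jackknife covariances with update_covariance(), assuming the elided parts of add_sample() complete the running-mean update shown above; the keys and random vectors stand in for real per-sample cl vectors:

import numpy as np
from heracles.core import TocDict
from heracles.covariance import update_covariance

rng = np.random.default_rng(42)
cov = TocDict()

for _ in range(100):  # one pass per jackknife sample
    sample = {
        ("POS", "POS", 0, 0): rng.normal(size=10),
        ("POS", "SHE", 0, 0): rng.normal(size=10),
    }
    update_covariance(cov, sample)

print(len(cov))  # 3 blocks: (A, A), (A, B) and (B, B)
print(cov[("POS", "POS", 0, 0), ("POS", "POS", 0, 0)].shape)  # (10, 10)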
diff --git a/heracles/io.py b/heracles/io.py
index 3ceed05..2f88a40 100644
--- a/heracles/io.py
+++ b/heracles/io.py
@@ -16,20 +16,18 @@
#
# You should have received a copy of the GNU Lesser General Public
# License along with Heracles. If not, see <https://www.gnu.org/licenses/>.
-"""Module for file reading and writing."""
+"""module for file reading and writing"""
import logging
import os
-from typing import Optional, TypeVar
import fitsio
import healpy as hp
import numpy as np
-import numpy.typing as npt
from .core import TocDict, toc_match
-_logger = logging.getLogger(__name__)
+logger = logging.getLogger(__name__)
_METADATA_COMMENTS = {
@@ -46,27 +44,15 @@
}
-def _write_metadata(hdu: TypeVar("Unknown"), metadata: TypeVar("Unknown")) -> None:
- """Write array metadata to FITS HDU.
-
- Args:
- hdu: _description_
- metadata: _description_
- """
+def _write_metadata(hdu, metadata):
+ """write array metadata to FITS HDU"""
md = metadata or {}
for key, value in md.items():
hdu.write_key("META " + key.upper(), value, _METADATA_COMMENTS.get(key))
-def _read_metadata(hdu: TypeVar("Unknown")) -> dict:
- """Read array metadata from FITS HDU.
-
- Args:
- hdu: _description_
-
- Returns:
- _description_
- """
+def _read_metadata(hdu):
+ """read array metadata from FITS HDU"""
h = hdu.read_header()
md = {}
for key in h:
@@ -75,23 +61,8 @@ def _read_metadata(hdu: TypeVar("Unknown")) -> dict:
return md
-def read_mask(
- mask_name: TypeVar("Unknown"),
- nside: Optional[TypeVar("Unknown")] = None,
- field: int = 0,
- extra_mask_name: Optional[TypeVar("Unknown")] = None,
-) -> npt.NDArray:
- """Read visibility map from a HEALPix map file.
-
- Args:
- mask_name: _description_
- nside: _description_
- field: _description_
- extra_mask_name: _description_
-
- Returns:
- _description_
- """
+def read_mask(mask_name, nside=None, field=0, extra_mask_name=None):
+ """read visibility map from a HEALPix map file"""
mask = hp.read_map(mask_name, field=field)
# set unseen pixels to zero
@@ -120,28 +91,22 @@ def read_mask(
def write_maps(
- filename: str,
- maps: TocDict,
+ filename,
+ maps,
*,
- clobber: bool = False,
- workdir: str = ".",
- include: Optional[TypeVar("Unknown")] = None,
- exclude: Optional[TypeVar("Unknown")] = None,
-) -> None:
- """Write a set of maps to FITS file.
+ clobber=False,
+ workdir=".",
+ include=None,
+ exclude=None,
+):
+ """write a set of maps to FITS file
If the output file exists, the new estimates will be appended, unless the
``clobber`` parameter is set to ``True``.
- Args:
- filename: _description_
- maps: _description_
- clobber: _description_
- workdir: _description_
- include: _description_
- exclude: _description_
"""
- _logger.info("writing %d maps to %s", len(maps), filename)
+
+ logger.info("writing %d maps to %s", len(maps), filename)
# full path to FITS file
path = os.path.join(workdir, filename)
@@ -175,7 +140,7 @@ def write_maps(
if not toc_match((n, i), include=include, exclude=exclude):
continue
- _logger.info("writing %s map for bin %s", n, i)
+ logger.info("writing %s map for bin %s", n, i)
# the cl extension name
ext = f"MAP{mapn}"
@@ -221,28 +186,13 @@ def write_maps(
tocentry[0] = (ext, n, i)
fits["MAPTOC"].append(tocentry)
- _logger.info("done with %d maps", len(maps))
+ logger.info("done with %d maps", len(maps))
-def read_maps(
- filename: str,
- workdir: str = ".",
- *,
- include: Optional[TypeVar("Unknown")] = None,
- exclude: Optional[TypeVar("Unknown")] = None,
-) -> TocDict:
- """Read a set of maps from a FITS file.
-
- Args:
- filename: _description_
- workdir: _description_
- include: _description_
- exclude: _description_
-
- Returns:
- _description_
- """
- _logger.info("reading maps from %s", filename)
+def read_maps(filename, workdir=".", *, include=None, exclude=None):
+ """read a set of maps from a FITS file"""
+
+ logger.info("reading maps from %s", filename)
# full path to FITS file
path = os.path.join(workdir, filename)
@@ -263,7 +213,7 @@ def read_maps(
if not toc_match((n, i), include=include, exclude=exclude):
continue
- _logger.info("reading %s map for bin %s", n, i)
+ logger.info("reading %s map for bin %s", n, i)
# read the map from the extension
m = fits[ext].read()
@@ -279,35 +229,29 @@ def read_maps(
# store in set of maps
maps[n, i] = m
- _logger.info("done with %d maps", len(maps))
+ logger.info("done with %d maps", len(maps))
# return the dictionary of maps
return maps
def write_alms(
- filename: str,
- alms: TocDict,
+ filename,
+ alms,
*,
- clobber: bool = False,
- workdir: str = ".",
- include: Optional[TypeVar("Unknown")] = None,
- exclude: Optional[TypeVar("Unknown")] = None,
-) -> None:
- """Write a set of alms to FITS file.
+ clobber=False,
+ workdir=".",
+ include=None,
+ exclude=None,
+):
+ """write a set of alms to FITS file
If the output file exists, the new estimates will be appended, unless the
``clobber`` parameter is set to ``True``.
- Args:
- filename: _description_
- alms: _description_
- clobber: _description_
- workdir: _description_
- include: _description_
- exclude: _description_
"""
- _logger.info("writing %d alms to %s", len(alms), filename)
+
+ logger.info("writing %d alms to %s", len(alms), filename)
# full path to FITS file
path = os.path.join(workdir, filename)
@@ -341,7 +285,7 @@ def write_alms(
if not toc_match((n, i), include=include, exclude=exclude):
continue
- _logger.info("writing %s alm for bin %s", n, i)
+ logger.info("writing %s alm for bin %s", n, i)
# the cl extension name
ext = f"ALM{almn}"
@@ -357,28 +301,13 @@ def write_alms(
tocentry[0] = (ext, n, i)
fits["ALMTOC"].append(tocentry)
- _logger.info("done with %d alms", len(alms))
+ logger.info("done with %d alms", len(alms))
-def read_alms(
- filename: str,
- workdir: str = ".",
- *,
- include: Optional[TypeVar("Unknown")] = None,
- exclude: Optional[TypeVar("Unknown")] = None,
-) -> TocDict:
- """Read a set of alms from a FITS file.
-
- Args:
- filename: _description_
- workdir: _description_
- include: _description_
- exclude: _description_
-
- Returns:
- _description_
- """
- _logger.info("reading alms from %s", filename)
+def read_alms(filename, workdir=".", *, include=None, exclude=None):
+ """read a set of alms from a FITS file"""
+
+ logger.info("reading alms from %s", filename)
# full path to FITS file
path = os.path.join(workdir, filename)
@@ -399,7 +328,7 @@ def read_alms(
if not toc_match((n, i), include=include, exclude=exclude):
continue
- _logger.info("reading %s alm for bin %s", n, i)
+ logger.info("reading %s alm for bin %s", n, i)
# read the alm from the extension
raw = fits[ext].read()
@@ -414,34 +343,21 @@ def read_alms(
# store in set of alms
alms[n, i] = alm
- _logger.info("done with %d alms", len(alms))
+ logger.info("done with %d alms", len(alms))
# return the dictionary of alms
return alms
-def write_cls(
- filename: str,
- cls,
- *,
- clobber: bool = False,
- workdir: str = ".",
- include: Optional[TypeVar("Unknown")] = None,
- exclude: Optional[TypeVar("Unknown")] = None,
-) -> None:
- """Write a set of cls to FITS file.
+def write_cls(filename, cls, *, clobber=False, workdir=".", include=None, exclude=None):
+ """write a set of cls to FITS file
If the output file exists, the new estimates will be appended, unless the
``clobber`` parameter is set to ``True``.
- Args:
- filename: _description_
- clobber: _description_
- workdir: _description_
- include: _description_
- exclude: _description_
"""
- _logger.info("writing %d cls to %s", len(cls), filename)
+
+ logger.info("writing %d cls to %s", len(cls), filename)
# full path to FITS file
path = os.path.join(workdir, filename)
@@ -475,7 +391,7 @@ def write_cls(
if not toc_match((k1, k2, i1, i2), include=include, exclude=exclude):
continue
- _logger.info("writing %s x %s cl for bins %s, %s", k1, k2, i1, i2)
+ logger.info("writing %s x %s cl for bins %s, %s", k1, k2, i1, i2)
# the cl extension name
ext = f"CL{cln}"
@@ -511,28 +427,13 @@ def write_cls(
tocentry[0] = (ext, k1, k2, i1, i2)
fits["CLTOC"].append(tocentry)
- _logger.info("done with %d cls", len(cls))
+ logger.info("done with %d cls", len(cls))
-def read_cls(
- filename: str,
- workdir: str = ".",
- *,
- include: Optional[TypeVar("Unknown")] = None,
- exclude: Optional[TypeVar("Unknown")] = None,
-) -> TocDict:
- """Read a set of cls from a FITS file.
-
- Args:
- filename: _description_
- workdir: _description_
- include: _description_
- exclude: _description_
-
- Returns:
- _description_
- """
- _logger.info("reading cls from %s", filename)
+def read_cls(filename, workdir=".", *, include=None, exclude=None):
+ """read a set of cls from a FITS file"""
+
+ logger.info("reading cls from %s", filename)
# full path to FITS file
path = os.path.join(workdir, filename)
@@ -553,7 +454,7 @@ def read_cls(
if not toc_match((k1, k2, i1, i2), include=include, exclude=exclude):
continue
- _logger.info("reading %s x %s cl for bins %s, %s", k1, k2, i1, i2)
+ logger.info("reading %s x %s cl for bins %s, %s", k1, k2, i1, i2)
# read the cl from the extension
cl = fits[ext].read()
@@ -564,35 +465,21 @@ def read_cls(
# store in set of cls
cls[k1, k2, i1, i2] = cl
- _logger.info("done with %d cls", len(cls))
+ logger.info("done with %d cls", len(cls))
# return the dictionary of cls
return cls
-def write_mms(
- filename: str,
- mms: TypeVar("Unknown"),
- *,
- clobber: bool = False,
- workdir: str = ".",
- include: Optional[TypeVar("Unknown")] = None,
- exclude: Optional[TypeVar("Unknown")] = None,
-) -> None:
- """Write a set of mixing matrices to FITS file.
+def write_mms(filename, mms, *, clobber=False, workdir=".", include=None, exclude=None):
+ """write a set of mixing matrices to FITS file
If the output file exists, the new mixing matrices will be appended, unless
the ``clobber`` parameter is set to ``True``.
- Args:
- filename: _description_
- mms: _description_
- clobber: _description_
- workdir: _description_
- include: _description_
- exclude: _description_
"""
- _logger.info("writing %d mm(s) to %s", len(mms), filename)
+
+ logger.info("writing %d mm(s) to %s", len(mms), filename)
# full path to FITS file
path = os.path.join(workdir, filename)
@@ -626,7 +513,7 @@ def write_mms(
if not toc_match((n, i1, i2), include=include, exclude=exclude):
continue
- _logger.info("writing mixing matrix %s for bins %s, %s", n, i1, i2)
+ logger.info("writing mixing matrix %s for bins %s, %s", n, i1, i2)
# the mm extension name
ext = f"MM{mmn}"
@@ -651,28 +538,13 @@ def write_mms(
tocentry[0] = (ext, n, i1, i2)
fits["MMTOC"].append(tocentry)
- _logger.info("done with %d mm(s)", len(mms))
+ logger.info("done with %d mm(s)", len(mms))
-def read_mms(
- filename: str,
- workdir: str = ".",
- *,
- include: Optional[TypeVar("Unknown")] = None,
- exclude: Optional[TypeVar("Unknown")] = None,
-) -> TypeVar("Unknown"):
- """Read a set of mixing matrices from a FITS file.
-
- Args:
- filename: _description_
- workdir: _description_
- include: _description_
- exclude: _description_
-
- Returns:
- _description_
- """
- _logger.info("reading mixing matrices from %s", filename)
+def read_mms(filename, workdir=".", *, include=None, exclude=None):
+ """read a set of mixing matrices from a FITS file"""
+
+ logger.info("reading mixing matrices from %s", filename)
# full path to FITS file
path = os.path.join(workdir, filename)
@@ -693,7 +565,7 @@ def read_mms(
if not toc_match((n, i1, i2), include=include, exclude=exclude):
continue
- _logger.info("reading mixing matrix %s for bins %s, %s", n, i1, i2)
+ logger.info("reading mixing matrix %s for bins %s, %s", n, i1, i2)
# read the mixing matrix from the extension
mm = fits[ext].read()
@@ -704,34 +576,21 @@ def read_mms(
# store in set of mms
mms[n, i1, i2] = mm
- _logger.info("done with %d mm(s)", len(mms))
+ logger.info("done with %d mm(s)", len(mms))
# return the dictionary of mms
return mms
-def write_cov(
- filename: str,
- cov: TypeVar("Unknown"),
- clobber: bool = False,
- workdir: str = ".",
- include: Optional[TypeVar("Unknown")] = None,
- exclude: Optional[TypeVar("Unknown")] = None,
-) -> None:
- """Write a set of covariance matrices to FITS file.
+def write_cov(filename, cov, clobber=False, workdir=".", include=None, exclude=None):
+ """write a set of covariance matrices to FITS file
If the output file exists, the new estimates will be appended, unless the
``clobber`` parameter is set to ``True``.
- Args:
- filename: _description_
- cov: _description_
- clobber: _description_
- workdir: _description_
- include: _description_
- exclude: _description_
"""
- _logger.info("writing %d covariances to %s", len(cov), filename)
+
+ logger.info("writing %d covariances to %s", len(cov), filename)
# full path to FITS file
path = os.path.join(workdir, filename)
@@ -779,7 +638,7 @@ def write_cov(
ext = f"COV{extn}"
extn += 1
- _logger.info("writing %s x %s covariance matrix", k1, k2)
+ logger.info("writing %s x %s covariance matrix", k1, k2)
# write the covariance matrix as an image
fits.write_image(mat, extname=ext)
@@ -800,28 +659,13 @@ def write_cov(
tocentry[0] = (ext, *k1, *k2)
fits["COVTOC"].append(tocentry)
- _logger.info("done with %d covariance(s)", len(cov))
+ logger.info("done with %d covariance(s)", len(cov))
-def read_cov(
- filename: str,
- workdir: str = ".",
- *,
- include: Optional[TypeVar("Unknown")] = None,
- exclude: Optional[TypeVar("Unknown")] = None,
-) -> TocDict:
- """Read a set of covariances matrices from a FITS file.
-
- Args:
- filename: _description_
- workdir: _description_
- include: _description_
- exclude: _description_
-
- Returns:
- _description_
- """
- _logger.info("reading covariance matrices from %s", filename)
+def read_cov(filename, workdir=".", *, include=None, exclude=None):
+ """read a set of covariances matrices from a FITS file"""
+
+ logger.info("reading covariance matrices from %s", filename)
# full path to FITS file
path = os.path.join(workdir, filename)
@@ -844,7 +688,7 @@ def read_cov(
if not toc_match((k1, k2), include=include, exclude=exclude):
continue
- _logger.info("reading %s x %s covariance matrix", k1, k2)
+ logger.info("reading %s x %s covariance matrix", k1, k2)
# read the covariance matrix from the extension
mat = fits[ext].read()
@@ -855,7 +699,7 @@ def read_cov(
# store in set
cov[k1, k2] = mat
- _logger.info("done with %d covariance(s)", len(cov))
+ logger.info("done with %d covariance(s)", len(cov))
# return the toc dict of covariances
return cov
diff --git a/heracles/maps.py b/heracles/maps.py
index 5383162..f7e6732 100644
--- a/heracles/maps.py
+++ b/heracles/maps.py
@@ -16,7 +16,7 @@
#
# You should have received a copy of the GNU Lesser General Public
# License along with Heracles. If not, see <https://www.gnu.org/licenses/>.
-"""Module for map-making."""
+"""module for map-making"""
import logging
import typing as t
@@ -34,18 +34,11 @@
if t.TYPE_CHECKING:
from .catalog import Catalog, CatalogPage
-_logger = logging.getLogger(__name__)
+logger = logging.getLogger(__name__)
-def _nativebyteorder(fn: t.Callable) -> t.Any:
- """Utility decorator to convert inputs to native byteorder.
-
- Args:
- fn: _description_
-
- Returns:
- _description_
- """
+def _nativebyteorder(fn):
+ """utility decorator to convert inputs to native byteorder"""
@wraps(fn)
def wrapper(*inputs):
@@ -61,35 +54,14 @@ def wrapper(*inputs):
@_nativebyteorder
@njit(nogil=True, fastmath=True)
-def _map_pos(pos: t.TypeVar("Unknown"), ipix: t.TypeVar("Unknown")) -> None:
- """_summary_.
-
- Args:
- pos: _description_
- ipix: _description_
- """
+def _map_pos(pos, ipix):
for i in ipix:
pos[i] += 1
@_nativebyteorder
@njit(nogil=True, fastmath=True)
-def _map_real(
- wht: t.TypeVar("Unknown"),
- val: t.TypeVar("Unknown"),
- ipix: t.TypeVar("Unknown"),
- w: t.TypeVar("Unknown"),
- v: t.TypeVar("Unknown"),
-) -> None:
- """_summary_.
-
- Args:
- wht: _description_
- val: _description_
- ipix: _description_
- w: _description_
- v: _description_
- """
+def _map_real(wht, val, ipix, w, v):
for i, w_i, v_i in zip(ipix, w, v):
wht[i] += w_i
val[i] += w_i / wht[i] * (v_i - val[i])
@@ -97,24 +69,7 @@ def _map_real(
@_nativebyteorder
@njit(nogil=True, fastmath=True)
-def _map_complex(
- wht: t.TypeVar("Unknown"),
- val: t.TypeVar("Unknown"),
- ipix: t.TypeVar("Unknown"),
- w: t.TypeVar("Unknown"),
- re: t.TypeVar("Unknown"),
- im: t.TypeVar("Unknown"),
-) -> None:
- """_summary_.
-
- Args:
- wht: _description_
- val: _description_
- ipix: _description_
- w: _description_
- re: _description_
- im: _description_
- """
+def _map_complex(wht, val, ipix, w, re, im):
for i, w_i, re_i, im_i in zip(ipix, w, re, im):
wht[i] += w_i
val[0, i] += w_i / wht[i] * (re_i - val[0, i])
@@ -123,34 +78,13 @@ def _map_complex(
@_nativebyteorder
@njit(nogil=True, fastmath=True)
-def _map_weight(
- wht: t.TypeVar("Unknown"),
- ipix: t.TypeVar("Unknown"),
- w: t.TypeVar("Unknown"),
-) -> None:
- """_summary_.
-
- Args:
- wht: _description_
- ipix: _description_
- w: _description_
- """
+def _map_weight(wht, ipix, w):
for i, w_i in zip(ipix, w):
wht[i] += w_i
-def update_metadata(
- array: t.TypeVar("Unknown"),
- **metadata: t.TypeVar("Unknown"),
-) -> None:
- """Update metadata of an array dtype.
-
- Args:
- array: _description_
-
- Raises:
- ValueError: _description_
- """
+def update_metadata(array, **metadata):
+ """update metadata of an array dtype"""
md = {}
if array.dtype.metadata is not None:
md.update(array.dtype.metadata)
@@ -171,28 +105,25 @@ def update_metadata(
# type alias for map data
-_MapData = np.ndarray
+MapData = np.ndarray
# type hint for functions returned by map generators
-_MapFunction = t.Callable[["CatalogPage"], None]
+MapFunction = t.Callable[["CatalogPage"], None]
# type hint for map generators
-_MapGenerator = t.Generator[_MapFunction, None, _MapData]
+MapGenerator = t.Generator[MapFunction, None, MapData]
-class _Map(metaclass=ABCMeta):
+class Map(metaclass=ABCMeta):
"""Abstract base class for map making from catalogues.
Concrete classes must implement the `__call__()` method which takes a
catalogue instance and returns a generator for mapping.
+
"""
def __init__(self, columns: tuple[t.Optional[str]]) -> None:
- """Initialise the map.
-
- Args:
- columns: _description_
- """
+ """Initialise the map."""
self._columns = columns
super().__init__()
@@ -202,31 +133,21 @@ def columns(self) -> tuple[t.Optional[str]]:
return self._columns
@abstractmethod
- def __call__(self, catalog: "Catalog") -> t.Union[_MapData, _MapGenerator]:
- """Implementation for mapping a catalogue.
-
- Args:
- catalog: _description_
-
- Returns:
- _description_
- """
+ def __call__(self, catalog: "Catalog") -> t.Union[MapData, MapGenerator]:
+ """Implementation for mapping a catalogue."""
...
-class HealpixMap(_Map):
+class HealpixMap(Map):
"""Abstract base class for HEALPix map making.
HEALPix maps have a resolution parameter, available as the ``nside``
property.
- """
- def __init__(self, nside: int, **kwargs: t.Any) -> None:
- """Initialize map with the given nside parameter.
+ """
- Args:
- nside: _description_
- """
+ def __init__(self, nside: int, **kwargs) -> None:
+ """Initialize map with the given nside parameter."""
self._nside: int = nside
super().__init__(**kwargs)
@@ -241,25 +162,21 @@ def nside(self, nside: int) -> None:
self._nside = nside
-class RandomizableMap(_Map):
+class RandomizableMap(Map):
"""Abstract base class for randomisable maps.
Randomisable maps have a ``randomize`` property that determines
whether or not the maps are randomised.
- """
- def __init__(self, randomize: bool, **kwargs: t.Any) -> None:
- """Initialise map with the given randomize property.
+ """
- Args:
- randomize: _description_
- """
+ def __init__(self, randomize: bool, **kwargs) -> None:
+ """Initialise map with the given randomize property."""
self._randomize = randomize
super().__init__(**kwargs)
@property
def randomize(self) -> bool:
- """_summary_."""
return self._randomize
@randomize.setter
@@ -268,27 +185,23 @@ def randomize(self, randomize: bool) -> None:
self._randomize = randomize
-class NormalizableMap(_Map):
+class NormalizableMap(Map):
"""Abstract base class for normalisable maps.
A normalised map is a map that is divided by its mean weight.
Normalisable maps have a ``normalize`` property that determines
whether or not the maps are normalised.
- """
- def __init__(self, normalize: bool, **kwargs: t.Any) -> None:
- """Initialise map with the given normalize property.
+ """
- Args:
- normalize: _description_
- """
+ def __init__(self, normalize: bool, **kwargs) -> None:
+ """Initialise map with the given normalize property."""
self._normalize = normalize
super().__init__(**kwargs)
@property
def normalize(self) -> bool:
- """_summary_."""
return self._normalize
@normalize.setter
@@ -302,6 +215,7 @@ class PositionMap(HealpixMap, RandomizableMap):
Can produce both overdensity maps and number count maps, depending
on the ``overdensity`` property.
+
"""
def __init__(
@@ -313,15 +227,7 @@ def __init__(
overdensity: bool = True,
randomize: bool = False,
) -> None:
- """Create a position map with the given properties.
-
- Args:
- nside: _description_
- lon: _description_
- lat: _description_
- overdensity: _description_
- randomize: _description_
- """
+ """Create a position map with the given properties."""
super().__init__(columns=(lon, lat), nside=nside, randomize=randomize)
self._overdensity: bool = overdensity
@@ -332,21 +238,11 @@ def overdensity(self) -> bool:
@overdensity.setter
def overdensity(self, overdensity: bool) -> None:
- """_summary_."""
self._overdensity = overdensity
- def __call__(self, catalog: "Catalog") -> _MapGenerator:
- """Map the given catalogue.
-
- Args:
- catalog: _description_
+ def __call__(self, catalog: "Catalog") -> MapGenerator:
+ """Map the given catalogue."""
- Returns:
- _description_
-
- Yields:
- _description_
- """
# get catalogue column definition
col = self.columns
@@ -359,12 +255,8 @@ def __call__(self, catalog: "Catalog") -> _MapGenerator:
# keep track of the total number of galaxies
ngal = 0
+ # function to map catalogue data
def mapper(page: "CatalogPage") -> None:
- """Function to map catalogue data.
-
- Args:
- page: _description_
- """
nonlocal ngal
if not self._randomize:
@@ -443,34 +335,17 @@ def __init__(
*,
normalize: bool = True,
) -> None:
- """Create a new real map.
-
- Args:
- nside: _description_
- lon: _description_
- lat: _description_
- value: _description_
- weight: _description_
- normalize: _description_
- """
+ """Create a new real map."""
+
super().__init__(
columns=(lon, lat, value, weight),
nside=nside,
normalize=normalize,
)
- def __call__(self, catalog: "Catalog") -> _MapGenerator:
- """Map real values from catalogue to HEALPix map.
+ def __call__(self, catalog: "Catalog") -> MapGenerator:
+ """Map real values from catalogue to HEALPix map."""
- Args:
- catalog: _description_
-
- Returns:
- _description_
-
- Yields:
- _description_
- """
# get the column definition of the catalogue
*col, wcol = self.columns
@@ -486,12 +361,8 @@ def __call__(self, catalog: "Catalog") -> _MapGenerator:
ngal = 0
wmean, var = 0.0, 0.0
+ # go through pages in catalogue and map values
def mapper(page: "CatalogPage") -> None:
- """Go through pages in catalogue and map values.
-
- Args:
- page: _description_
- """
nonlocal ngal, wmean, var
if wcol is not None:
@@ -561,6 +432,7 @@ class ComplexMap(HealpixMap, NormalizableMap, RandomizableMap):
Can optionally flip the sign of the second shear component,
depending on the ``conjugate`` property.
+
"""
def __init__(
@@ -577,20 +449,8 @@ def __init__(
normalize: bool = True,
randomize: bool = False,
) -> None:
- """Create a new shear map.
-
- Args:
- nside: _description_
- lon: _description_
- lat: _description_
- real: _description_
- imag: _description_
- weight: _description_
- spin: _description_
- conjugate: _description_
- normalize: _description_
- randomize: _description_
- """
+ """Create a new shear map."""
+
self._spin: int = spin
self._conjugate: bool = conjugate
super().__init__(
@@ -620,18 +480,9 @@ def conjugate(self, conjugate: bool) -> None:
"""Set the conjugate flag."""
self._conjugate = conjugate
- def __call__(self, catalog: "Catalog") -> _MapGenerator:
- """Map shears from catalogue to HEALPix map.
-
- Args:
- catalog: _description_
-
- Returns:
- _description_
+ def __call__(self, catalog: "Catalog") -> MapGenerator:
+ """Map shears from catalogue to HEALPix map."""
- Yields:
- _description_
- """
# get the column definition of the catalogue
*col, wcol = self.columns
@@ -651,14 +502,9 @@ def __call__(self, catalog: "Catalog") -> _MapGenerator:
ngal = 0
wmean, var = 0.0, 0.0
+ # go through pages in catalogue and get the shear values,
+ # randomise if asked to, and do the mapping
def mapper(page: "CatalogPage") -> None:
- """Go through pages in catalogue.
-
- Get the shear values, randomise if asked to, and do the mapping.
-
- Args:
- page: _description_
- """
nonlocal ngal, wmean, var
if wcol is not None:
@@ -733,25 +579,12 @@ class VisibilityMap(HealpixMap):
"""Copy visibility map from catalogue at given resolution."""
def __init__(self, nside: int) -> None:
- """Create visibility map at given NSIDE parameter.
-
- Args:
- nside: _description_
- """
+ """Create visibility map at given NSIDE parameter."""
super().__init__(columns=(), nside=nside)
- def __call__(self, catalog: "Catalog") -> _MapData:
- """Create a visibility map from the given catalogue.
+ def __call__(self, catalog: "Catalog") -> MapData:
+ """Create a visibility map from the given catalogue."""
- Args:
- catalog: _description_
-
- Raises:
- ValueError: _description_
-
- Returns:
- _description_
- """
# make sure that catalogue has a visibility map
vmap = catalog.visibility
if vmap is None:
@@ -785,31 +618,14 @@ def __init__(
lat: str,
weight: str,
*,
- normalize: bool = True,
+ normalize=True,
) -> None:
- """Create a new weight map.
-
- Args:
- nside: _description_
- lon: _description_
- lat: _description_
- weight: _description_
- normalize: _description_
- """
+ """Create a new weight map."""
super().__init__(columns=(lon, lat, weight), nside=nside, normalize=normalize)
- def __call__(self, catalog: "Catalog") -> _MapGenerator:
- """Map catalogue weights.
+ def __call__(self, catalog: "Catalog") -> MapGenerator:
+ """Map catalogue weights."""
- Args:
- catalog: _description_
-
- Returns:
- _description_
-
- Yields:
- _description_
- """
# get the columns for this map
*col, wcol = self.columns
@@ -820,12 +636,8 @@ def __call__(self, catalog: "Catalog") -> _MapGenerator:
# create the weight map
wht = np.zeros(npix)
+ # map catalogue
def mapper(page: "CatalogPage") -> None:
- """Map catalogue.
-
- Args:
- page: _description_
- """
lon, lat = page.get(*col)
if wcol is None:
@@ -861,23 +673,12 @@ def mapper(page: "CatalogPage") -> None:
return wht
-_Spin2Map = partial(ComplexMap, spin=2)
-ShearMap = _Spin2Map
-EllipticityMap = _Spin2Map
-
+Spin2Map = partial(ComplexMap, spin=2)
+ShearMap = Spin2Map
+EllipticityMap = Spin2Map
-def _close_and_return(generator: t.Generator) -> t.Any:
- """_summary_.
- Args:
- generator: _description_
-
- Raises:
- RuntimeError: _description_
-
- Returns:
- _description_
- """
+def _close_and_return(generator):
try:
next(generator)
except StopIteration as end:
@@ -888,7 +689,7 @@ def _close_and_return(generator: t.Generator) -> t.Any:
def map_catalogs(
- maps: t.Mapping[t.Any, _Map],
+ maps: t.Mapping[t.Any, Map],
catalogs: t.Mapping[t.Any, "Catalog"],
*,
parallel: bool = False,
@@ -896,24 +697,9 @@ def map_catalogs(
include: t.Optional[t.Sequence[tuple[t.Any, t.Any]]] = None,
exclude: t.Optional[t.Sequence[tuple[t.Any, t.Any]]] = None,
progress: bool = False,
-) -> dict[tuple[t.Any, t.Any], _MapData]:
- """Make maps for a set of catalogues.
-
- Args:
- maps: _description_
- catalogs: _description_
- parallel: _description_
- out: _description_
- include: _description_
- exclude: _description_
- progress: _description_
-
- Raises:
- RuntimeError: _description_
-
- Returns:
- _description_
- """
+) -> dict[tuple[t.Any, t.Any], MapData]:
+ """Make maps for a set of catalogues."""
+
# the toc dict of maps
if out is None:
out = TocDict()
@@ -1017,25 +803,14 @@ def map_catalogs(
def transform_maps(
- maps: t.Mapping[tuple[t.Any, t.Any], _MapData],
+ maps: t.Mapping[tuple[t.Any, t.Any], MapData],
*,
    out: t.Optional[t.MutableMapping[t.Any, t.Any]] = None,
progress: bool = False,
- **kwargs: t.Any,
+ **kwargs,
) -> dict[tuple[t.Any, t.Any], np.ndarray]:
- """Transform a set of maps to alms.
+ """transform a set of maps to alms"""
- Args:
- maps: _description_
- out: _description_
- progress: _description_
-
- Raises:
- NotImplementedError: _description_
-
- Returns:
- _description_
- """
# the output toc dict
if out is None:
out = TocDict()
@@ -1054,7 +829,7 @@ def transform_maps(
md = m.dtype.metadata or {}
spin = md.get("spin", 0)
- _logger.info("transforming %s map (spin %s) for bin %s", k, spin, i)
+ logger.info("transforming %s map (spin %s) for bin %s", k, spin, i)
if spin == 0:
pol = False
diff --git a/heracles/plot.py b/heracles/plot.py
index df9f083..444608b 100644
--- a/heracles/plot.py
+++ b/heracles/plot.py
@@ -16,30 +16,21 @@
#
# You should have received a copy of the GNU Lesser General Public
# License along with Heracles. If not, see <https://www.gnu.org/licenses/>.
-"""Utility functions for plotting."""
+"""utility functions for plotting"""
from collections import defaultdict
from collections.abc import Mapping
from itertools import chain, count, cycle
-from typing import Optional, TypeVar
-import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
from cycler import cycler
-_DEFAULT_CYCLER = cycler(linestyle=["-", "--", ":", "-."])
+DEFAULT_CYCLER = cycler(linestyle=["-", "--", ":", "-."])
-def _dont_draw_zero_tick(tick: mpl.axis.Tick) -> None:
- """Custom draw function for ticks that does not draw zero.
-
- Args:
- tick: _description_
-
- Returns:
- _description_
- """
+def _dont_draw_zero_tick(tick):
+ """custom draw function for ticks that does not draw zero"""
draw = tick.draw
def wrap(*args, **kwargs):
@@ -51,52 +42,27 @@ def wrap(*args, **kwargs):
return wrap
-def _pad_ylim(ymin: float, ymax: float) -> tuple[float, float]:
- """Pad the y axis range depending on signs.
-
- Args:
- ymin: _description_
- ymax: _description_
-
- Returns:
- _description_
- """
+def _pad_ylim(ymin, ymax):
+ """pad the y axis range depending on signs"""
return (ymin * 10 ** (-np.sign(ymin) / 2), ymax * 10 ** (np.sign(ymax) / 2))
def postage_stamps(
- plot: Optional[TypeVar("Unknown")] = None,
- transpose: Optional[TypeVar("Unknown")] = None,
+ plot=None,
+ transpose=None,
*,
- scale: Optional[TypeVar("Unknown")] = None,
- trxshift: int = 0,
- tryshift: int = 0,
- stampsize: float = 1.0,
- hatch_empty: bool = False,
- linscale: float = 0.01,
- cycler: Optional[TypeVar("Unknown")] = None,
-) -> mpl.figure.Figure:
- """Create a postage stamp plot for cls.
-
- Args:
- plot: _description_
- transpose: _description_
- scale: _description_
- trxshift: _description_
- tryshift: _description_
- stampsize: _description_
- hatch_empty: _description_
- linscale: _description_
- cycler: _description_
-
- Raises:
- ValueError: _description_
-
- Returns:
- _description_
- """
+ scale=None,
+ trxshift=0,
+ tryshift=0,
+ stampsize=1.0,
+ hatch_empty=False,
+ linscale=0.01,
+ cycler=None,
+):
+ """create a postage stamp plot for cls"""
+
if cycler is None:
- cycler = _DEFAULT_CYCLER
+ cycler = DEFAULT_CYCLER
if plot is None and transpose is None:
msg = "missing plot data"
diff --git a/heracles/twopoint.py b/heracles/twopoint.py
index 0602fa2..3235079 100644
--- a/heracles/twopoint.py
+++ b/heracles/twopoint.py
@@ -16,17 +16,15 @@
#
# You should have received a copy of the GNU Lesser General Public
# License along with Heracles. If not, see <https://www.gnu.org/licenses/>.
-"""Module for angular power spectrum estimation."""
+"""module for angular power spectrum estimation"""
import logging
import time
from datetime import timedelta
from itertools import combinations_with_replacement, product
-from typing import Any, Optional, TypeVar, Union
import healpy as hp
import numpy as np
-import numpy.typing as npt
from convolvecl import mixmat, mixmat_eb
from .core import TocDict, toc_match
@@ -40,37 +38,20 @@
update_metadata,
)
-_logger = logging.getLogger(__name__)
+logger = logging.getLogger(__name__)
-def angular_power_spectra(
- alms: TocDict,
- alms2: Optional[TocDict] = None,
- *,
- lmax: Optional[int] = None,
- include: Optional[TypeVar("Unknown")] = None,
- exclude: Optional[TypeVar("Unknown")] = None,
-) -> TocDict:
- """Compute angular power spectra from a set of alms.
-
- Args:
- alms: _description_
- alms2: _description_
- lmax: _description_
- include: _description_
- exclude: _description_
-
- Returns:
- _description_
- """
- _logger.info(
+def angular_power_spectra(alms, alms2=None, *, lmax=None, include=None, exclude=None):
+ """compute angular power spectra from a set of alms"""
+
+ logger.info(
"computing cls for %d%s alm(s)",
len(alms),
f"x{len(alms2)}" if alms2 is not None else "",
)
t = time.monotonic()
- _logger.info("using LMAX = %s for cls", lmax)
+ logger.info("using LMAX = %s for cls", lmax)
# collect all alm combinations for computing cls
if alms2 is None:
@@ -98,7 +79,7 @@ def angular_power_spectra(
if not toc_match((k1, k2, i1, i2), include, exclude):
continue
- _logger.info("computing %s x %s cl for bins %s, %s", k1, k2, i1, i2)
+ logger.info("computing %s x %s cl for bins %s, %s", k1, k2, i1, i2)
# compute the raw cl from the alms
cl = hp.alm2cl(alm1, alm2, lmax_out=lmax)
@@ -125,7 +106,7 @@ def angular_power_spectra(
# keep track of names
twopoint_names.add((k1, k2))
- _logger.info(
+ logger.info(
"computed %d cl(s) in %s",
len(cls),
timedelta(seconds=(time.monotonic() - t)),
@@ -135,22 +116,10 @@ def angular_power_spectra(
return cls
-def debias_cls(
- cls,
- bias: Optional[TypeVar("Unknown")] = None,
- *,
- inplace: bool = False,
-) -> Union[TypeVar("Unknown"), TocDict]:
- """Remove bias from cls.
-
- Args:
- bias: _description_
- inplace: _description_
+def debias_cls(cls, bias=None, *, inplace=False):
+ """remove bias from cls"""
- Returns:
- _description_
- """
- _logger.info("debiasing %d cl(s)%s", len(cls), " in place" if inplace else "")
+ logger.info("debiasing %d cl(s)%s", len(cls), " in place" if inplace else "")
t = time.monotonic()
# the output toc dict
@@ -158,7 +127,7 @@ def debias_cls(
# subtract bias of each cl in turn
for key in cls:
- _logger.info("debiasing %s x %s cl for bins %s, %s", *key)
+ logger.info("debiasing %s x %s cl for bins %s, %s", *key)
cl = cls[key]
md = cl.dtype.metadata or {}
@@ -188,7 +157,7 @@ def debias_cls(
# store debiased cl in output set
out[key] = cl
- _logger.info(
+ logger.info(
"debiased %d cl(s) in %s",
len(out),
timedelta(seconds=(time.monotonic() - t)),
@@ -198,19 +167,10 @@ def debias_cls(
return out
-def depixelate_cls(cls, *, inplace: bool = False) -> Union[TypeVar("Unknown"), TocDict]:
- """Remove discretisation kernel from cls.
+def depixelate_cls(cls, *, inplace=False):
+ """remove discretisation kernel from cls"""
- Args:
- inplace: _description_
-
- Raises:
- ValueError: _description_
-
- Returns:
- _description_
- """
- _logger.info("depixelate %d cl(s)%s", len(cls), " in place" if inplace else "")
+ logger.info("depixelate %d cl(s)%s", len(cls), " in place" if inplace else "")
t = time.monotonic()
# keep a cache of convolution kernels (i.e. pixel window functions)
@@ -223,7 +183,7 @@ def depixelate_cls(cls, *, inplace: bool = False) -> Union[TypeVar("Unknown"), T
# remove effect of convolution for each cl in turn
for key in cls:
- _logger.info("depixelate %s x %s cl for bins %s, %s", *key)
+ logger.info("depixelate %s x %s cl for bins %s, %s", *key)
cl = cls[key]
md = cl.dtype.metadata or {}
@@ -244,7 +204,7 @@ def depixelate_cls(cls, *, inplace: bool = False) -> Union[TypeVar("Unknown"), T
# deconvolve the kernels of the first and second map
for i, spin, kernel in zip([1, 2], spins, kernels):
- _logger.info("- spin-%s %s kernel", spin, kernel)
+ logger.info("- spin-%s %s kernel", spin, kernel)
if kernel is None:
fl = None
a = None
@@ -256,7 +216,7 @@ def depixelate_cls(cls, *, inplace: bool = False) -> Union[TypeVar("Unknown"), T
fls[kernel][nside, lmax, 2] = fl2
fl = fls[kernel].get((nside, lmax, spin))
if fl is None:
- _logger.warning(
+ logger.warning(
"no HEALPix kernel for NSIDE = %s, LMAX = %s, SPIN = %s",
nside,
lmax,
@@ -284,7 +244,7 @@ def depixelate_cls(cls, *, inplace: bool = False) -> Union[TypeVar("Unknown"), T
# store depixelated cl in output set
out[key] = cl
- _logger.info(
+ logger.info(
"depixelated %d cl(s) in %s",
len(out),
timedelta(seconds=(time.monotonic() - t)),
@@ -294,27 +254,13 @@ def depixelate_cls(cls, *, inplace: bool = False) -> Union[TypeVar("Unknown"), T
return out
-def mixing_matrices(
- cls,
- *,
- l1max: Optional[TypeVar("Unknown")] = None,
- l2max: Optional[TypeVar("Unknown")] = None,
- l3max: Optional[TypeVar("Unknown")] = None,
-) -> TocDict:
- """Compute mixing matrices from a set of cls.
-
- Args:
- l1max: _description_
- l2max: _description_
- l3max: _description_
-
- Returns:
- _description_
- """
- _logger.info("computing two-point mixing matrices for %d cl(s)", len(cls))
+def mixing_matrices(cls, *, l1max=None, l2max=None, l3max=None):
+ """compute mixing matrices from a set of cls"""
+
+ logger.info("computing two-point mixing matrices for %d cl(s)", len(cls))
t = time.monotonic()
- _logger.info("using L1MAX = %s, L2MAX = %s, L3MAX = %s", l1max, l2max, l3max)
+ logger.info("using L1MAX = %s, L2MAX = %s, L3MAX = %s", l1max, l2max, l3max)
# set of computed mixing matrices
mms = TocDict()
@@ -325,19 +271,19 @@ def mixing_matrices(
if cl.dtype.names is not None:
cl = cl["CL"]
if k1 == "V" and k2 == "V":
- _logger.info("computing 00 mixing matrix for bins %s, %s", i1, i2)
+ logger.info("computing 00 mixing matrix for bins %s, %s", i1, i2)
w00 = mixmat(cl, l1max=l1max, l2max=l2max, l3max=l3max)
mms["00", i1, i2] = w00
elif k1 == "V" and k2 == "W":
- _logger.info("computing 0+ mixing matrix for bins %s, %s", i1, i2)
+ logger.info("computing 0+ mixing matrix for bins %s, %s", i1, i2)
w0p = mixmat(cl, l1max=l1max, l2max=l2max, l3max=l3max, spin=(0, 2))
mms["0+", i1, i2] = w0p
elif k1 == "W" and k2 == "V":
- _logger.info("computing 0+ mixing matrix for bins %s, %s", i2, i1)
+ logger.info("computing 0+ mixing matrix for bins %s, %s", i2, i1)
w0p = mixmat(cl, l1max=l1max, l2max=l2max, l3max=l3max, spin=(2, 0))
mms["0+", i2, i1] = w0p
elif k1 == "W" and k2 == "W":
- _logger.info("computing ++, --, +- mixing matrices for bins %s, %s", i1, i2)
+ logger.info("computing ++, --, +- mixing matrices for bins %s, %s", i1, i2)
wpp, wmm, wpm = mixmat_eb(
cl,
l1max=l1max,
@@ -349,7 +295,7 @@ def mixing_matrices(
mms["--", i1, i2] = wmm
mms["+-", i1, i2] = wpm
else:
- _logger.warning(
+ logger.warning(
"computing unknown %s x %s mixing matrix for bins %s, %s",
k1,
k2,
@@ -359,7 +305,7 @@ def mixing_matrices(
w = mixmat(cl, l1max=l1max, l2max=l2max, l3max=l3max)
mms[f"{k1}{k2}", i1, i2] = w
- _logger.info(
+ logger.info(
"computed %d mm(s) in %s",
len(mms),
timedelta(seconds=(time.monotonic() - t)),
@@ -369,24 +315,11 @@ def mixing_matrices(
return mms
-def pixelate_mms_healpix(
- mms: TypeVar("Unknown"),
- nside: TypeVar("Unknown"),
- *,
- inplace: bool = False,
-) -> Union[TypeVar("Unknown"), TocDict]:
- """Apply HEALPix pixel window function to mms.
-
- Args:
- mms: _description_
- nside: _description_
- inplace: _description_
+def pixelate_mms_healpix(mms, nside, *, inplace=False):
+ """apply HEALPix pixel window function to mms"""
- Returns:
- _description_
- """
- _logger.info("pixelate %d mm(s)%s", len(mms), " in place" if inplace else "")
- _logger.info("kernel: HEALPix, NSIDE=%d", nside)
+ logger.info("pixelate %d mm(s)%s", len(mms), " in place" if inplace else "")
+ logger.info("kernel: HEALPix, NSIDE=%d", nside)
t = time.monotonic()
# pixel window functions
@@ -402,7 +335,7 @@ def pixelate_mms_healpix(
# apply discretisation kernel from cl to each mm in turn
for key in mms:
- _logger.info("pixelate %s mm for bins %s, %s", *key)
+ logger.info("pixelate %s mm for bins %s, %s", *key)
mm = mms[key]
if not inplace:
@@ -410,7 +343,7 @@ def pixelate_mms_healpix(
n = np.shape(mm)[-2]
if n >= lmax:
- _logger.error(
+ logger.error(
"no HEALPix pixel window function for NSIDE=%d and LMAX=%d",
nside,
n - 1,
@@ -424,13 +357,13 @@ def pixelate_mms_healpix(
elif name in ["++", "--", "+-", "-+"]:
mm *= fl2[:n] * fl2[:n]
else:
- _logger.warning("unknown mixing matrix, assuming spin-0")
+ logger.warning("unknown mixing matrix, assuming spin-0")
mm *= fl0[:n] * fl0[:n]
# store pixelated mm in output set
out[key] = mm
- _logger.info(
+ logger.info(
"pixelated %d mm(s) in %s",
len(out),
timedelta(seconds=(time.monotonic() - t)),
@@ -440,35 +373,11 @@ def pixelate_mms_healpix(
return out
-def bin2pt(
- arr: TypeVar("Unknown"),
- bins: TypeVar("Unknown"),
- name: TypeVar("Unknown"),
- *,
- weights: Optional[TypeVar("Unknown")] = None,
-) -> npt.NDArray:
- """Compute binned two-point data.
-
- Args:
- arr: _description_
- bins: _description_
- name: _description_
- weights: _description_
-
- Returns:
- _description_
- """
+def bin2pt(arr, bins, name, *, weights=None):
+ """Compute binned two-point data."""
- def norm(a: npt.NDArray, b: npt.NDArray) -> npt.NDArray:
- """Divide a by b if a is nonzero.
-
- Args:
- a: _description_
- b: _description_
-
- Returns:
- _description_
- """
+ def norm(a, b):
+ """divide a by b if a is nonzero"""
out = np.zeros(np.broadcast(a, b).shape)
return np.divide(a, b, where=(a != 0), out=out)
@@ -533,7 +442,8 @@ def norm(a: npt.NDArray, b: npt.NDArray) -> npt.NDArray:
def binned_cls(cls, bins, *, weights=None, out=None):
- """Compute binned angular power spectra."""
+ """compute binned angular power spectra"""
+
if out is None:
out = TocDict()
@@ -544,7 +454,8 @@ def binned_cls(cls, bins, *, weights=None, out=None):
def binned_mms(mms, bins, *, weights=None, out=None):
- """Compute binned mixing matrices."""
+ """compute binned mixing matrices"""
+
if out is None:
out = TocDict()
@@ -555,36 +466,25 @@ def binned_mms(mms, bins, *, weights=None, out=None):
def random_bias(
- maps: TypeVar("Unknown"),
- catalogs: TypeVar("Unknown"),
+ maps,
+ catalogs,
*,
- repeat: int = 1,
- full: bool = False,
- parallel: bool = False,
- include: Optional[TypeVar("Unknown")] = None,
- exclude: Optional[TypeVar("Unknown")] = None,
- progress: bool = False,
- **kwargs: Any,
-) -> TocDict:
- """Bias estimate from randomised maps.
+ repeat=1,
+ full=False,
+ parallel=False,
+ include=None,
+ exclude=None,
+ progress=False,
+ **kwargs,
+):
+ """bias estimate from randomised maps
The ``include`` and ``exclude`` selection is applied to the maps.
- Args:
- maps: _description_
- catalogs: _description_
- repeat: _description_
- full: _description_
- parallel: _description_
- include: _description_
- exclude: _description_
- progress: _description_
-
- Returns:
- _description_
"""
- _logger.info("estimating two-point bias for %d catalog(s)", len(catalogs))
- _logger.info("randomising %s maps", ", ".join(map(str, maps)))
+
+ logger.info("estimating two-point bias for %d catalog(s)", len(catalogs))
+ logger.info("randomising %s maps", ", ".join(map(str, maps)))
t = time.monotonic()
# grab lmax parameter if given
@@ -593,7 +493,7 @@ def random_bias(
# include will be set below after we have the first set of alms
include_cls = None
if full:
- _logger.info("estimating cross-biases")
+ logger.info("estimating cross-biases")
# set all input maps to randomize
# store and later reset their initial state
@@ -605,7 +505,7 @@ def random_bias(
nbs = TocDict()
for n in range(repeat):
- _logger.info(
+ logger.info(
"estimating bias from randomised maps%s",
"" if n == 0 else f" (repeat {n+1})",
)
@@ -634,7 +534,7 @@ def random_bias(
for k, m in maps.items():
m.randomize = randomize[k]
- _logger.info(
+ logger.info(
"estimated %d two-point biases in %s",
len(nbs),
timedelta(seconds=(time.monotonic() - t)),
diff --git a/heracles/util.py b/heracles/util.py
index 3b61784..f485d4f 100644
--- a/heracles/util.py
+++ b/heracles/util.py
@@ -16,76 +16,54 @@
#
# You should have received a copy of the GNU Lesser General Public
# License along with Heracles. If not, see <https://www.gnu.org/licenses/>.
-"""Module for utilities."""
-import io
+"""module for utilities"""
+
import os
import sys
import time
from datetime import timedelta
-from typing import Optional, TypeVar
class Progress:
- """Simple progress bar for operations."""
-
- def __init__(self, out: io.IOBase = sys.stdout) -> None:
- """Create a new progress bar.
-
- Args:
- out: _description_
- """
- self._out = out
- self._time = 0
- self._progress = 0
- self._total = 0
- self._title = None
+ """simple progress bar for operations"""
- def start(
- self,
- total: TypeVar("Unknown"),
- title: Optional[str] = None,
- ) -> None:
- """Start new progress.
+ def __init__(self, out=sys.stdout):
+ """create a new progress bar"""
+ self.out = out
+ self.time = 0
+ self.progress = 0
+ self.total = 0
+ self.title = None
- Args:
- total: _description_
- title: _description_
- """
- self._time = time.monotonic()
- self._progress = 0
- self._total = total
- self._title = title
+ def start(self, total, title=None):
+ """start new progress"""
+ self.time = time.monotonic()
+ self.progress = 0
+ self.total = total
+ self.title = title
self.update(0)
- def update(self, step: int = 1) -> None:
- """Update progress.
-
- Args:
- step: _description_
- """
- self._progress = min(self._progress + step, self._total)
- m = f"{self._title!s}: " if self._title is not None else ""
- p = self._progress / self._total
+ def update(self, step=1):
+ """update progress"""
+ self.progress = min(self.progress + step, self.total)
+ m = f"{self.title!s}: " if self.title is not None else ""
+ p = self.progress / self.total
b = "#" * int(20 * p)
- f = f"{self._progress:_}/{self._total:_}"
- t = timedelta(seconds=(time.monotonic() - self._time))
+ f = f"{self.progress:_}/{self.total:_}"
+ t = timedelta(seconds=(time.monotonic() - self.time))
s = f"\r{m}{100*p:3.0f}% |{b:20s}| {f} | {t}"
try:
- w, _ = os.get_terminal_size(self._out.fileno())
+ w, _ = os.get_terminal_size(self.out.fileno())
except (OSError, AttributeError):
pass
else:
if w > 0:
s = s[:w]
- self._out.write(s)
- self._out.flush()
-
- def stop(self, complete: bool = True) -> None:
- """Stop progress and end line.
+ self.out.write(s)
+ self.out.flush()
- Args:
- complete: _description_
- """
+ def stop(self, complete=True):
+ """stop progress and end line"""
if complete:
- self.update(self._total - self._progress)
- self._out.write("\n")
+ self.update(self.total - self.progress)
+ self.out.write("\n")
diff --git a/mkdocs.yml b/mkdocs.yml
index bfbdf00..4d7e213 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -29,11 +29,11 @@ plugins:
# headings
show_category_heading: true
show_object_full_path: false
- show_root_toc_entry: false
+ show_root_toc_entry: true
# members
filters: ["!^__?"]
inherited_members: true
- show_submodules: true
+ show_submodules: false
# docstrings
docstring_options:
ignore_init_summary: true
diff --git a/pyproject.toml b/pyproject.toml
index 2ac69f0..9a98163 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -71,24 +71,14 @@ extend-exclude = [
]
fix = true
force-exclude = true
-ignore = [
- "D203",
- "D213",
- "D401",
- "D406",
- "D407",
- "D417",
-]
line-length = 100
-per-file-ignores = {"tests/*" = [
- "D",
+per-file-ignores = {"test_*" = [
"S101",
]}
select = [
"A",
"BLE",
"COM",
- "D",
"DJ",
"DTZ",
"E",
diff --git a/tests/test_catalog.py b/tests/test_catalog.py
index 77c7094..912d04d 100644
--- a/tests/test_catalog.py
+++ b/tests/test_catalog.py
@@ -5,7 +5,7 @@
@pytest.fixture
def catalog():
- from heracles.catalog import CatalogPage, _CatalogBase
+ from heracles.catalog import CatalogBase, CatalogPage
# fix a set of rows to be returned for testing
size = 100
@@ -13,7 +13,7 @@ def catalog():
y = np.random.rand(size)
z = np.random.rand(size)
- class TestCatalog(_CatalogBase):
+ class TestCatalog(CatalogBase):
SIZE = size
DATA = dict(x=x, y=y, z=z)
@@ -145,21 +145,21 @@ def test_catalog_page_immutable():
def test_catalog_base(catalog):
- from heracles.catalog import Catalog, _CatalogBase
+ from heracles.catalog import Catalog, CatalogBase
# ABC cannot be instantiated directly
with pytest.raises(TypeError):
- _CatalogBase()
+ CatalogBase()
# fixture has tested concrete implementation
- assert isinstance(catalog, _CatalogBase)
+ assert isinstance(catalog, CatalogBase)
# check that CatalogBase implements the Catalog protocol
assert isinstance(catalog, Catalog)
def test_catalog_base_properties(catalog):
- from heracles.catalog import _CatalogBase
+ from heracles.catalog import CatalogBase
assert catalog.size == catalog.SIZE
assert catalog.names == list(catalog.DATA.keys())
@@ -167,11 +167,11 @@ def test_catalog_base_properties(catalog):
assert catalog.base is None
assert catalog.selection is None
- assert catalog.page_size == _CatalogBase._default_page_size
+ assert catalog.page_size == CatalogBase.default_page_size
catalog.page_size = 1
assert catalog.page_size == 1
- catalog.page_size = _CatalogBase._default_page_size
- assert catalog.page_size == _CatalogBase._default_page_size
+ catalog.page_size = CatalogBase.default_page_size
+ assert catalog.page_size == CatalogBase.default_page_size
filt = object()
assert catalog.filters == []
@@ -202,9 +202,9 @@ def test_catalog_base_pagination(catalog):
def test_catalog_base_copy():
- from heracles.catalog import _CatalogBase
+ from heracles.catalog import CatalogBase
- class TestCatalog(_CatalogBase):
+ class TestCatalog(CatalogBase):
def __init__(self):
super().__init__()
self._visibility = object()