API(maps): refactored concurrent mapping with coroutines (#78)
Concurrent mapping of catalogues now uses `async def` coroutines with a
much simpler implementation. Instead of passing catalogue pages to each
mapper, mappers now individually iterate over catalogues using `async
for`, and an improved caching mechanism (now using `lru_cache`) ensures
that mappers receive the same rows and selections from a FITS file.
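
As a rough sketch of this pattern (not the actual heracles API; `pages`, `mapper`, and `main` below are illustrative stand-ins), each mapper drives its own `async for` loop over the catalogue while `asyncio.gather` runs the mappers concurrently:

```python
import asyncio


async def pages():
    """Stand-in catalogue: an async generator yielding pages of rows."""
    for start in range(0, 100, 10):
        await asyncio.sleep(0)  # yield control so mappers can interleave
        yield list(range(start, start + 10))


async def mapper(name, catalog):
    """Each mapper iterates the catalogue itself using ``async for``."""
    total = 0
    async for page in catalog():
        total += len(page)
    return name, total


async def main():
    # several mappers run concurrently over the same catalogue source
    results = await asyncio.gather(mapper("a", pages), mapper("b", pages))
    print(results)


asyncio.run(main())
```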

In addition, the progress display of `map_catalogs()` and `transform_maps()`
is much improved by using the `rich` package for dynamic progress
updates.
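
For reference, dynamic progress updates with `rich` look roughly like the following; this is a minimal sketch, not the code used in `map_catalogs()` or `transform_maps()`:

```python
import time

from rich.progress import Progress

with Progress() as progress:
    task = progress.add_task("mapping catalogues", total=100)
    for _ in range(100):
        time.sleep(0.01)  # stand-in for mapping one page of rows
        progress.update(task, advance=1)
```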

Closes: #76
Closes: #77
ntessore authored Dec 10, 2023
1 parent 1e21445 commit 160f157
Showing 9 changed files with 484 additions and 361 deletions.
187 changes: 160 additions & 27 deletions examples/example.ipynb

Large diffs are not rendered by default.

52 changes: 26 additions & 26 deletions heracles/catalog/fits.py
@@ -18,6 +18,7 @@
# License along with Heracles. If not, see <https://www.gnu.org/licenses/>.
"""module for catalogue processing"""

from functools import lru_cache
from weakref import finalize, ref

import fitsio
@@ -38,6 +39,9 @@ def rowfilter(array, expr):
class FitsCatalog(CatalogBase):
"""flexible reader for catalogues from FITS files"""

READ_CACHE = 3
SELECT_CACHE = 3

def __init__(self, path, *, columns=None, ext=None):
"""create a new FITS catalogue reader
@@ -127,6 +131,27 @@ def _join(self, *where):
return None
return "(" + ") & (".join(map(str, filter(None, where))) + ")"

@lru_cache(maxsize=READ_CACHE)
def _read(self, start, stop):
"""
Read a range of rows from FITS.
"""

hdu = self.hdu()
names = self._names()
return hdu[names][start:stop]

@lru_cache(maxsize=SELECT_CACHE)
def _select(self, start, stop, selection):
"""
Read a range of rows from FITS and apply selection.
"""

rows = self._read(start, stop)
if selection is not None:
rows = rows[rowfilter(rows, selection)]
return rows

def _pages(self, selection):
"""iterate pages of rows in FITS file, optionally using the query"""

@@ -139,32 +164,7 @@ def _pages(self, selection):
# use all rows or selection if one is given
nrows = hdu.get_nrows()

# information for caching
hduid = id(hdu)

# now iterate all rows in batches
for start in range(0, nrows, page_size):
stop = start + page_size

# see if rows were cached
try:
if self._rowinfo == (hduid, start, stop):
rows = self._rows
else:
rows = None
except AttributeError:
rows = None

# retrieve rows if not cached
if rows is None:
rows = hdu[names][start:stop]

# update row cache
self._rowinfo = (hduid, start, stop)
self._rows = rows

# apply selection if given
if selection is not None:
rows = rows[rowfilter(rows, selection)]

rows = self._select(start, start + page_size, selection)
yield CatalogPage({name: rows[name] for name in names})
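
The split between `_read` and `_select` means that different selections applied to the same page of rows share a single FITS read, with `lru_cache` keeping the most recent pages and selection results around for concurrent mappers. A self-contained sketch of that caching pattern (`PagedReader` and its list-backed data are illustrative stand-ins, not heracles code):

```python
from functools import lru_cache


class PagedReader:
    """Illustrative stand-in for the FITS-backed page cache above."""

    READ_CACHE = 3
    SELECT_CACHE = 3

    def __init__(self, data):
        self.data = data
        self.reads = 0  # count how often the "expensive" read happens

    @lru_cache(maxsize=READ_CACHE)
    def _read(self, start, stop):
        # expensive read, cached per (start, stop) page
        self.reads += 1
        return self.data[start:stop]

    @lru_cache(maxsize=SELECT_CACHE)
    def _select(self, start, stop, selection):
        # selection results are cached separately, on top of the cached read
        rows = self._read(start, stop)
        if selection is not None:
            rows = [row for row in rows if selection(row)]
        return rows


reader = PagedReader(list(range(100)))
evens = reader._select(0, 10, lambda row: row % 2 == 0)
odds = reader._select(0, 10, lambda row: row % 2 == 1)
assert reader.reads == 1  # two selections over the same page, one read
```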