diff --git a/src/ekobox/apply.py b/src/ekobox/apply.py index e7932f54b..8d92f9a7b 100644 --- a/src/ekobox/apply.py +++ b/src/ekobox/apply.py @@ -1,146 +1,131 @@ -"""Apply evolution operator to a PDF.""" +"""Apply operator evolution to PDF set.""" -from collections.abc import Sequence -from typing import Optional +from dataclasses import dataclass +from typing import Dict, Optional, Union import numpy as np -import numpy.typing as npt from eko import basis_rotation as br from eko import interpolation from eko.io import EKO from eko.io.types import EvolutionPoint -RawPdfResult = dict[EvolutionPoint, npt.ArrayLike] -"""Evolved PDFs as raw grids. -The key is given by the associated evolution point. The values are -tensors sorted by (replica, flavor, xgrid). -""" +def apply_pdf( + eko: EKO, + lhapdf_like, + targetgrid=None, + rotate_to_evolution_basis=False, +): + """Apply all available operators to the input PDFs. -RawErrorResult = dict[EvolutionPoint, Optional[npt.ArrayLike]] -"""Integration errors for evolved PDFs as raw grids. + Parameters + ---------- + output : eko.output.EKO + eko output object containing all informations + lhapdf_like : object + object that provides an xfxQ2 callable (as `lhapdf `_ + and :class:`ekomark.toyLH.toyPDF` do) (and thus is in flavor basis) + targetgrid : list + if given, interpolates to the pdfs given at targetgrid (instead of xgrid) + rotate_to_evolution_basis : bool + if True rotate to evoluton basis -The key is given by the associated evolution point. The values are -tensors sorted by (replica, flavor, xgrid). -""" + Returns + ------- + out_grid : dict + output PDFs and their associated errors for the computed mu2grid + """ + qed = eko.theory_card.order[1] > 0 + if rotate_to_evolution_basis: + if not qed: + rotate_flavor_to_evolution = br.rotate_flavor_to_evolution + labels = br.evol_basis + else: + rotate_flavor_to_evolution = br.rotate_flavor_to_unified_evolution + labels = br.unified_evol_basis + return apply_pdf_flavor( + eko, lhapdf_like, targetgrid, rotate_flavor_to_evolution, labels=labels + ) + return apply_pdf_flavor(eko, lhapdf_like, targetgrid) -LabeledPdfResult = dict[EvolutionPoint, dict[int, npt.ArrayLike]] -"""Evolved PDFs labeled by their PDF identifier. +CONTRACTION = "ajbk,bk" -The outer key is given by the associated evolution point. The inner key -is the |PID|. The inner values are the values for along the xgrid. -""" -LabeledErrorResult = dict[EvolutionPoint, dict[int, Optional[npt.ArrayLike]]] -"""Integration errors for evolved PDFs labeled by their PDF identifier. +_PdfLabel = Union[int, str] +"""PDF identifier: either PID or label.""" -The outer key is given by the associated evolution point. The inner key -is the |PID|. The inner values are the values for along the xgrid. -""" +@dataclass +class PdfResult: + """Helper class to collect PDF results.""" -def apply_pdf( - eko: EKO, - lhapdf_like, - targetgrid: npt.ArrayLike = None, - rotate_to_evolution_basis: bool = False, -) -> tuple[LabeledPdfResult, LabeledErrorResult]: - """Apply all available operators to the input PDF. + pdfs: Dict[_PdfLabel, float] + errors: Optional[Dict[_PdfLabel, float]] = None + + +def apply_pdf_flavor( + eko: EKO, lhapdf_like, targetgrid=None, flavor_rotation=None, labels=None +): + """Apply all available operators to the input PDFs. Parameters ---------- - eko : - eko output object containing all informations - lhapdf_like : Any - object that provides an `xfxQ2` callable (as `lhapdf `_ - and :class:`ekomark.toyLH.toyPDF` do) (and thus is in flavor basis) - targetgrid : - if given, interpolates to the targetgrid (instead of xgrid) - rotate_to_evolution_basis : - if True rotate to evoluton basis + output : eko.output.EKO + eko output object containing all informations + lhapdf_like : object + object that provides an xfxQ2 callable (as `lhapdf `_ + and :class:`ekomark.toyLH.toyPDF` do) (and thus is in flavor basis) + targetgrid : list + if given, interpolates to the pdfs given at targetgrid (instead of xgrid) + flavor_rotation : np.ndarray + Rotation matrix in flavor space + labels : list + list of labels Returns ------- - pdfs : - PDFs for the computed evolution points - errors : - Integration errors for PDFs for the computed evolution points + out_grid : dict + output PDFs and their associated errors for the computed mu2grid """ # create pdfs - input_pdfs = np.zeros((len(br.flavor_basis_pids), len(eko.xgrid))) + pdfs = np.zeros((len(br.flavor_basis_pids), len(eko.xgrid))) for j, pid in enumerate(br.flavor_basis_pids): if not lhapdf_like.hasFlavor(pid): continue - input_pdfs[j] = np.array( + pdfs[j] = np.array( [lhapdf_like.xfxQ2(pid, x, eko.mu20) / x for x in eko.xgrid.raw] ) - # apply - grids, grid_errors = apply_grids(eko, input_pdfs[None, :]) - # post-process - qed = eko.theory_card.order[1] > 0 - flavor_rotation = None - labels = br.flavor_basis_pids - if rotate_to_evolution_basis: - if not qed: - flavor_rotation = br.rotate_flavor_to_evolution - labels = br.evol_basis_pids - else: - flavor_rotation = br.rotate_flavor_to_unified_evolution - labels = br.unified_evol_basis_pids - new_grids = rotate_result(eko, grids, labels, targetgrid, flavor_rotation) - new_errors = rotate_result(eko, grid_errors, labels, targetgrid, flavor_rotation) - # unwrap the replica axis again - pdfs: LabeledPdfResult = {} - errors: LabeledErrorResult = {} - for ep, pdf in new_grids.items(): - pdfs[ep] = { - lab: grid[0] if grid is not None else None for lab, grid in pdf.items() - } - errors[ep] = { - lab: (grid[0] if grid is not None else None) - for lab, grid in new_errors[ep].items() - } - return pdfs, errors - - -def rotate_result( - eko: EKO, - grids: RawErrorResult, - flavor_labels: Sequence[int], - targetgrid: Optional[npt.ArrayLike] = None, - flavor_rotation: Optional[npt.ArrayLike] = None, -) -> LabeledErrorResult: - """Rotate and relabel PDFs. - Parameters - ---------- - eko : - eko output object containing all informations - grids : - Raw grids coming from evolution - flavor_labels : - flavors names - targetgrid : - if given, interpolates to the targetgrid (instead of xgrid) - flavor_rotation : - if given, rotates in flavor space + # build output + out_grid: Dict[EvolutionPoint, PdfResult] = {} + for ep, elem in eko.items(): + pdf_final = np.einsum(CONTRACTION, elem.operator, pdfs, optimize="optimal") + if elem.error is not None: + error_final = np.einsum(CONTRACTION, elem.error, pdfs, optimize="optimal") + else: + error_final = None + out_grid[ep] = PdfResult(dict(zip(br.flavor_basis_pids, pdf_final))) + if error_final is not None: + out_grid[ep].errors = dict(zip(br.flavor_basis_pids, error_final)) - Returns - ------- - pdfs : - relabeled and, if requested rotated, PDFs - """ + qed = eko.theory_card.order[1] > 0 # rotate to evolution basis if flavor_rotation is not None: - new_grids = {} - for ep, pdf_grid in grids.items(): - new_grids[ep] = ( - np.einsum("ab,rbk->rak", flavor_rotation, pdf_grid, optimize="optimal") - if pdf_grid is not None - else None + for q2, op in out_grid.items(): + pdf = flavor_rotation @ np.array( + [op.pdfs[pid] for pid in br.flavor_basis_pids] ) - grids = new_grids + if not qed: + evol_basis = br.evol_basis + else: + evol_basis = br.unified_evol_basis + op.pdfs = dict(zip(evol_basis, pdf)) + if op.errors is not None: + errors = flavor_rotation @ np.array( + [op.errors[pid] for pid in br.flavor_basis_pids] + ) + op.errors = dict(zip(evol_basis, errors)) # rotate/interpolate to target grid if targetgrid is not None: @@ -150,74 +135,14 @@ def rotate_result( mode_N=False, ) - x_rotation = b.get_interpolation(targetgrid) - new_grids = {} - for ep, pdf_grid in grids.items(): - new_grids[ep] = ( - np.einsum("jk,rbk->rbj", x_rotation, pdf_grid, optimize="optimal") - if pdf_grid is not None - else None - ) - grids = new_grids - - # relabel - new_grids = {} - for ep, pdf_grid in grids.items(): - new_grids[ep] = dict( - zip( - flavor_labels, - np.swapaxes(pdf_grid, 0, 1) - if pdf_grid is not None - else [None] * len(flavor_labels), - ) - ) - grids = new_grids - - return grids - - -_EKO_CONTRACTION = "ajbk,rbk->raj" -"""Contract eko for all replicas.""" - - -def apply_grids( - eko: EKO, input_grids: npt.ArrayLike -) -> tuple[RawPdfResult, RawErrorResult]: - """Apply all available operators to the input grids. - - Parameters - ---------- - eko : - eko output object - input_grids : - 3D PDF grids evaluated at the inital scale. The axis have to be (replica, flavor, xgrid) - - Returns - ------- - pdfs : - output PDFs for the computed evolution points - pdfs : - associated integration errors for the computed evolution points - """ - # sanity check - if len(input_grids.shape) != 3 or input_grids.shape[1:] != ( - len(br.flavor_basis_pids), - len(eko.xgrid), - ): - raise ValueError( - "input grids have to be sorted by replica, flavor, xgrid!" - f"The shape has to be (r,{len(br.flavor_basis_pids)},{len(eko.xgrid)})" - ) - # iterate - pdfs: RawPdfResult = {} - errors: RawErrorResult = {} - for ep, elem in eko.items(): - pdfs[ep] = np.einsum( - _EKO_CONTRACTION, elem.operator, input_grids, optimize="optimal" - ) - errors[ep] = ( - np.einsum(_EKO_CONTRACTION, elem.error, input_grids, optimize="optimal") - if elem.error is not None - else None - ) - return pdfs, errors + rot = b.get_interpolation(targetgrid) + for evpdf in out_grid.values(): + for pdf_label in evpdf.pdfs: + evpdf.pdfs[pdf_label] = np.matmul(rot, evpdf.pdfs[pdf_label]) + if evpdf.errors is not None: + evpdf.errors[pdf_label] = np.matmul(rot, evpdf.errors[pdf_label]) + # cast back to be backward compatible + real_out_grid = {} + for ep, res in out_grid.items(): + real_out_grid[ep] = {"pdfs": res.pdfs, "errors": res.errors} + return real_out_grid diff --git a/tests/ekobox/test_apply.py b/tests/ekobox/test_apply.py index 522c4dc6d..70eb74adc 100644 --- a/tests/ekobox/test_apply.py +++ b/tests/ekobox/test_apply.py @@ -5,31 +5,34 @@ from tests.conftest import EKOFactory -def test_apply(eko_factory: EKOFactory, fake_pdf): - eko = eko_factory.get() - ep_out = eko.evolgrid[0] - # base application - pdfs, errors = apply.apply_pdf(eko, fake_pdf) - assert len(pdfs) == len(eko.evolgrid) - assert len(pdfs) == len(errors) - ep_pdfs = pdfs[ep_out] - assert list(ep_pdfs.keys()) == list(br.flavor_basis_pids) - # rotate to target_grid - for target_grid in ([0.75], [0.5, 0.6, 0.7]): - pdfs, _errors = apply.apply_pdf(eko, fake_pdf, target_grid) - ep_pdfs = pdfs[ep_out] - assert list(ep_pdfs.keys()) == list(br.flavor_basis_pids) - assert len(ep_pdfs[21]) == len(target_grid) - # rotate flavor - pdfs, _errors = apply.apply_pdf(eko, fake_pdf, rotate_to_evolution_basis=True) - ep_pdfs = pdfs[ep_out] - assert list(ep_pdfs.keys()) == list(br.evol_basis_pids) +class TestApply: + def test_apply(self, eko_factory: EKOFactory, fake_pdf): + eko = eko_factory.get() + ep_out = eko.evolgrid[0] + # fake pdfs + pdf_grid = apply.apply_pdf(eko, fake_pdf) + assert len(pdf_grid) == len(eko.evolgrid) + pdfs = pdf_grid[ep_out]["pdfs"] + assert list(pdfs.keys()) == list(br.flavor_basis_pids) + # rotate to target_grid + target_grid = [0.75] + pdf_grid = apply.apply_pdf(eko, fake_pdf, target_grid) + assert len(pdf_grid) == 1 + pdfs = pdf_grid[ep_out]["pdfs"] + assert list(pdfs.keys()) == list(br.flavor_basis_pids) - -def test_apply_grids(eko_factory: EKOFactory): - eko = eko_factory.get() - # since everything is random, we can only test the tensor shapes here - input_grids = np.random.rand(3, len(br.flavor_basis_pids), len(eko.xgrid)) - pdfs, errors = apply.apply_grids(eko, input_grids) - for ep, res in pdfs.items(): - assert res.shape == input_grids.shape + def test_apply_flavor(self, eko_factory: EKOFactory, fake_pdf, monkeypatch): + eko = eko_factory.get() + ep_out = eko.evolgrid[0] + # fake pdfs + monkeypatch.setattr( + "eko.basis_rotation.rotate_flavor_to_evolution", np.ones((14, 14)) + ) + fake_evol_basis = tuple( + ["a", "b"] + [str(x) for x in range(len(br.flavor_basis_pids) - 2)] + ) + monkeypatch.setattr("eko.basis_rotation.evol_basis", fake_evol_basis) + pdf_grid = apply.apply_pdf(eko, fake_pdf, rotate_to_evolution_basis=True) + assert len(pdf_grid) == len(eko.evolgrid) + pdfs = pdf_grid[ep_out]["pdfs"] + assert list(pdfs.keys()) == list(fake_evol_basis)