Skip to content

Commit

Permalink
feat(draft): add report= argument for uproot.dask; trigger report…
Browse files Browse the repository at this point in the history
… collection (take 2!) (#1058)

* add report= to trigger dask-awkward creating graceful-failure-report

* add mock_empty

* whoops

* fix mock_empty

* backend is passed in upstream; use OSError

* Update src/uproot/_dask.py

Co-authored-by: Lindsey Gray <[email protected]>

* use oop interface

* need allowed_exceptions

* time and functools

* whoops

* add to report; small fixes

* style: pre-commit fixes

* move some methods

* whoops

* None for success

* monotonic -> time

* ordering

* fixup duration stuff

* two impls...

* call_time needs to be outside wrapper

* rename argument; add function arg description to docstring

* add test

---------

Co-authored-by: Lindsey Gray <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Dec 12, 2023
1 parent c5ff061 commit 66d1feb
Show file tree
Hide file tree
Showing 3 changed files with 208 additions and 28 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ requires-python = ">=3.8"
[project.optional-dependencies]
dev = [
"boost_histogram>=0.13",
"dask-awkward>=2023.10.0",
"dask-awkward>=2023.12.1",
"dask[array]",
"hist>=1.2",
"pandas",
Expand Down
209 changes: 182 additions & 27 deletions src/uproot/_dask.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from __future__ import annotations

import functools
import math
import socket
import time
from collections.abc import Callable, Iterable, Mapping

try:
Expand Down Expand Up @@ -38,6 +41,7 @@ def dask(
allow_missing=False,
open_files=True,
form_mapping=None,
allow_read_errors_with_report=False,
**options,
):
"""
Expand Down Expand Up @@ -91,6 +95,10 @@ def dask(
form_mapping (Callable[awkward.forms.Form] -> awkward.forms.Form | None): If not none
and library="ak" then apply this remapping function to the awkward form of the input
data. The form keys of the desired form should be available data in the input form.
allow_read_errors_with_report (bool): If True, catch OSError exceptions and return an
empty array for these nodes in the task graph. The return of this function then
becomes a two element tuple, where the first return is the dask-awkward collection
of interest and the second return is a report dask-awkward collection.
options: See below.
Returns dask equivalents of the backends supported by uproot. If ``library='np'``,
Expand Down Expand Up @@ -259,6 +267,7 @@ def dask(
interp_options,
form_mapping,
steps_per_file,
allow_read_errors_with_report,
)
else:
return _get_dak_array_delay_open(
Expand All @@ -274,6 +283,7 @@ def dask(
interp_options,
form_mapping,
steps_per_file,
allow_read_errors_with_report,
)
else:
raise NotImplementedError()
Expand Down Expand Up @@ -890,6 +900,15 @@ class UprootReadMixin:
form_mapping_info: ImplementsFormMappingInfo
common_keys: frozenset[str]
interp_options: dict[str, Any]
allow_read_errors_with_report: bool

@property
def allowed_exceptions(self):
    """Exception types that report mode catches instead of propagating.

    Returns:
        A one-element tuple holding ``OSError``; being a tuple, it can be
        used directly in an ``except`` clause.
    """
    tolerated = (OSError,)
    return tolerated

@property
def return_report(self) -> bool:
    """Whether a report is produced alongside the data.

    Returns:
        The value of the ``allow_read_errors_with_report`` attribute.
    """
    flag = self.allow_read_errors_with_report
    return flag

def read_tree(self, tree: HasBranches, start: int, stop: int) -> AwkArray:
assert start <= stop
Expand Down Expand Up @@ -946,6 +965,15 @@ def mock(self) -> AwkArray:
behavior=self.form_mapping_info.behavior,
)

def mock_empty(self, backend) -> AwkArray:
    """Create a zero-length array shaped like ``self.expected_form``.

    Args:
        backend: Backend name forwarded to ``awkward.to_backend``.

    Returns:
        A high-level awkward array built from the form's length-zero
        layout, carrying ``self.form_mapping_info.behavior``.
    """
    ak = uproot.extras.awkward()
    empty_layout = self.expected_form.length_zero_array(highlevel=False)
    return ak.to_backend(
        empty_layout,
        backend=backend,
        highlevel=True,
        behavior=self.form_mapping_info.behavior,
    )

def prepare_for_projection(self) -> tuple[AwkArray, TypeTracerReport, dict]:
awkward = uproot.extras.awkward()
dask_awkward = uproot.extras.dask_awkward()
Expand Down Expand Up @@ -1011,6 +1039,53 @@ def project_keys(self: T, keys: frozenset[str]) -> T:
raise NotImplementedError


def _report_failure(exception, call_time, *args, **kwargs):
    """Build a single-record awkward array describing a failed call.

    Args:
        exception: The exception that was caught; its class name and
            ``str()`` are recorded.
        call_time: Timestamp supplied by the caller, stored unchanged.
        *args: Positional arguments of the failed call, stored as ``repr``
            strings.
        **kwargs: Keyword arguments of the failed call, stored as
            ``[name, repr(value)]`` pairs.

    Returns:
        An awkward array of length one. ``duration`` is ``None``; ``fqdn``
        and ``hostname`` come from the ``socket`` module.
    """
    ak = uproot.extras.awkward()
    record = {
        "call_time": call_time,
        "duration": None,
        "args": [repr(arg) for arg in args],
        "kwargs": [[key, repr(value)] for key, value in kwargs.items()],
        "exception": type(exception).__name__,
        "message": str(exception),
        "fqdn": socket.getfqdn(),
        "hostname": socket.gethostname(),
    }
    return ak.Array([record])


def _report_success(duration, *args, **kwargs):
    """Build a single-record awkward array describing a successful call.

    Args:
        duration: Elapsed time supplied by the caller, stored unchanged.
        *args: Positional arguments of the call, stored as ``repr`` strings.
        **kwargs: Keyword arguments of the call, stored as
            ``[name, repr(value)]`` pairs.

    Returns:
        An awkward array of length one. Every field other than
        ``duration``, ``args`` and ``kwargs`` is ``None``.
    """
    ak = uproot.extras.awkward()
    record = {
        "call_time": None,
        "duration": duration,
        "args": [repr(arg) for arg in args],
        "kwargs": [[key, repr(value)] for key, value in kwargs.items()],
        "exception": None,
        "message": None,
        "fqdn": None,
        "hostname": None,
    }
    return ak.Array([record])


def with_duration(f):
    """Wrap ``f`` so that each call also reports how long it took.

    Args:
        f: Any callable.

    Returns:
        A callable with the metadata of ``f`` (via ``functools.wraps``)
        that returns ``(result, elapsed)``, where ``elapsed`` is the
        difference between two ``time.monotonic()`` readings taken around
        the call. Exceptions raised by ``f`` propagate unchanged.
    """

    @functools.wraps(f)
    def timed(*args, **kwargs):
        began = time.monotonic()
        outcome = f(*args, **kwargs)
        elapsed = time.monotonic() - began
        return outcome, elapsed

    return timed


class _UprootRead(UprootReadMixin):
def __init__(
self,
Expand All @@ -1020,13 +1095,15 @@ def __init__(
base_form: Form,
expected_form: Form,
form_mapping_info: ImplementsFormMappingInfo,
allow_read_errors_with_report: bool,
) -> None:
self.ttrees = ttrees
self.common_keys = frozenset(common_keys)
self.interp_options = interp_options
self.base_form = base_form
self.expected_form = expected_form
self.form_mapping_info = form_mapping_info
self.allow_read_errors_with_report = allow_read_errors_with_report

def project_keys(self: T, keys: frozenset[str]) -> T:
return _UprootRead(
Expand All @@ -1036,11 +1113,39 @@ def project_keys(self: T, keys: frozenset[str]) -> T:
self.base_form,
self.expected_form,
self.form_mapping_info,
self.allow_read_errors_with_report,
)

def __call__(self, i_start_stop) -> AwkArray:
def __call__(self, i_start_stop):
    """Read one partition, optionally pairing the data with a report.

    Args:
        i_start_stop: A 3-tuple ``(i, start, stop)``; ``i`` indexes
            ``self.ttrees``.

    Returns:
        With ``self.return_report`` false, whatever ``_call_impl`` returns.
        Otherwise a 2-tuple ``(array, report)``: on success the read result
        and a success report; if one of ``self.allowed_exceptions`` is
        raised, an empty mock array and a failure report.
    """
    i, start, stop = i_start_stop

    if not self.return_report:
        return self._call_impl(i, start, stop)

    # Taken before the read so a failure report records when the attempt began.
    call_time = time.time_ns()
    try:
        result, duration = with_duration(self._call_impl)(i, start, stop)
        report = _report_success(duration, self.ttrees[i], start, stop)
        return result, report
    except self.allowed_exceptions as err:
        placeholder = self.mock_empty(backend="cpu")
        report = _report_failure(err, call_time, self.ttrees[i], start, stop)
        return placeholder, report

def _call_impl(self, i, start, stop):
    """Delegate to ``read_tree`` for entry ``i`` of ``self.ttrees``.

    ``start`` and ``stop`` are passed through unchanged.
    """
    tree = self.ttrees[i]
    return self.read_tree(tree, start, stop)


Expand All @@ -1055,6 +1160,7 @@ def __init__(
base_form: Form,
expected_form: Form,
form_mapping_info: ImplementsFormMappingInfo,
allow_read_errors_with_report: bool,
) -> None:
self.custom_classes = custom_classes
self.allow_missing = allow_missing
Expand All @@ -1064,15 +1170,11 @@ def __init__(
self.base_form = base_form
self.expected_form = expected_form
self.form_mapping_info = form_mapping_info
self.allow_read_errors_with_report = allow_read_errors_with_report

def __call__(self, blockwise_args) -> AwkArray:
(
file_path,
object_path,
i_step_or_start,
n_steps_or_stop,
is_chunk,
) = blockwise_args
def _call_impl(
self, file_path, object_path, i_step_or_start, n_steps_or_stop, is_chunk
):
ttree = uproot._util.regularize_object_path(
file_path,
object_path,
Expand Down Expand Up @@ -1105,6 +1207,50 @@ def __call__(self, blockwise_args) -> AwkArray:

return self.read_tree(ttree, start, stop)

def __call__(self, blockwise_args):
    """Open and read one partition, optionally pairing it with a report.

    Args:
        blockwise_args: A 5-tuple ``(file_path, object_path,
            i_step_or_start, n_steps_or_stop, is_chunk)`` forwarded to
            ``_call_impl``.

    Returns:
        With ``self.return_report`` false, whatever ``_call_impl`` returns.
        Otherwise a 2-tuple ``(array, report)``: on success the read result
        and a success report; if one of ``self.allowed_exceptions`` is
        raised, an empty mock array and a failure report.
    """
    (
        file_path,
        object_path,
        i_step_or_start,
        n_steps_or_stop,
        is_chunk,
    ) = blockwise_args
    # Unpacked above to enforce the 5-element shape, then regrouped so the
    # same arguments go to the reader and to the report builders.
    call_args = (file_path, object_path, i_step_or_start, n_steps_or_stop, is_chunk)

    if not self.return_report:
        return self._call_impl(*call_args)

    # Taken before the read so a failure report records when the attempt began.
    call_time = time.time_ns()
    try:
        result, duration = with_duration(self._call_impl)(*call_args)
        return result, _report_success(duration, *call_args)
    except self.allowed_exceptions as err:
        return (
            self.mock_empty(backend="cpu"),
            _report_failure(err, call_time, *call_args),
        )

def project_keys(self: T, keys: frozenset[str]) -> T:
return _UprootOpenAndRead(
self.custom_classes,
Expand All @@ -1115,6 +1261,7 @@ def project_keys(self: T, keys: frozenset[str]) -> T:
self.base_form,
self.expected_form,
self.form_mapping_info,
self.allow_read_errors_with_report,
)


Expand Down Expand Up @@ -1151,6 +1298,7 @@ def _get_dak_array(
interp_options,
form_mapping,
steps_per_file,
allow_read_errors_with_report,
):
dask_awkward = uproot.extras.dask_awkward()
awkward = uproot.extras.awkward()
Expand Down Expand Up @@ -1306,15 +1454,18 @@ def real_filter_branch(branch):
else:
expected_form, form_mapping_info = form_mapping(base_form)

fn = _UprootRead(
ttrees,
common_keys,
interp_options,
base_form=base_form,
expected_form=expected_form,
form_mapping_info=form_mapping_info,
allow_read_errors_with_report=allow_read_errors_with_report,
)

return dask_awkward.from_map(
_UprootRead(
ttrees,
common_keys,
interp_options,
base_form=base_form,
expected_form=expected_form,
form_mapping_info=form_mapping_info,
),
fn,
partition_args,
divisions=tuple(divisions),
label="from-uproot",
Expand All @@ -1334,6 +1485,7 @@ def _get_dak_array_delay_open(
interp_options,
form_mapping,
steps_per_file,
allow_read_errors_with_report,
):
dask_awkward = uproot.extras.dask_awkward()
awkward = uproot.extras.awkward()
Expand Down Expand Up @@ -1396,17 +1548,20 @@ def _get_dak_array_delay_open(
else:
expected_form, form_mapping_info = form_mapping(base_form)

fn = _UprootOpenAndRead(
custom_classes,
allow_missing,
real_options,
common_keys,
interp_options,
base_form=base_form,
expected_form=expected_form,
form_mapping_info=form_mapping_info,
allow_read_errors_with_report=allow_read_errors_with_report,
)

return dask_awkward.from_map(
_UprootOpenAndRead(
custom_classes,
allow_missing,
real_options,
common_keys,
interp_options,
base_form=base_form,
expected_form=expected_form,
form_mapping_info=form_mapping_info,
),
fn,
partition_args,
divisions=None if divisions is None else tuple(divisions),
label="from-uproot",
Expand Down
25 changes: 25 additions & 0 deletions tests/test_1058_dask_awkward_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import pytest
import skhep_testdata

import uproot


dask = pytest.importorskip("dask")
dask_awkward = pytest.importorskip("dask_awkward")


def test_with_report():
    """Check the report from ``uproot.dask`` for two good files and one missing file."""
    good_paths = [
        skhep_testdata.data_path(file_name) + ":events"
        for file_name in ("uproot-Zmumu.root", "uproot-Zmumu-uncompressed.root")
    ]
    missing_path = "/some/file/that/doesnt/exist"

    collection, report = uproot.dask(
        [*good_paths, missing_path],
        library="ak",
        open_files=False,
        allow_read_errors_with_report=True,
    )
    _, computed_report = dask.compute(collection, report)

    # Report entries are checked in the same order as the input files.
    assert computed_report[0].exception is None  # first good file
    assert computed_report[1].exception is None  # second good file
    assert computed_report[2].exception == "FileNotFoundError"  # missing file

0 comments on commit 66d1feb

Please sign in to comment.