diff --git a/dataflows/__init__.py b/dataflows/__init__.py
index 59c9513..7040c70 100644
--- a/dataflows/__init__.py
+++ b/dataflows/__init__.py
@@ -1,4 +1,5 @@
 from .base import DataStream, DataStreamProcessor, schema_validator, ValidationError
 from .base import ResourceWrapper, PackageWrapper
+from .base import exceptions
 from .base import Flow
-from .processors import * # noqa
+from .processors import *  # noqa
diff --git a/dataflows/base/__init__.py b/dataflows/base/__init__.py
index 6b0b815..73b652b 100644
--- a/dataflows/base/__init__.py
+++ b/dataflows/base/__init__.py
@@ -1,3 +1,4 @@
+from . import exceptions
 from .datastream import DataStream
 from .datastream_processor import DataStreamProcessor
 from .resource_wrapper import ResourceWrapper
diff --git a/dataflows/base/datastream_processor.py b/dataflows/base/datastream_processor.py
index ae71133..befa9e4 100644
--- a/dataflows/base/datastream_processor.py
+++ b/dataflows/base/datastream_processor.py
@@ -6,6 +6,7 @@
 from datapackage import Package
 from tableschema.exceptions import CastError
 
+from . import exceptions
 from .datastream import DataStream
 from .resource_wrapper import ResourceWrapper
 from .schema_validator import schema_validator
@@ -26,11 +27,13 @@ def __init__(self):
         self.stats = {}
         self.source = None
         self.datapackage = None
+        self.position = None
 
-    def __call__(self, source=None):
+    def __call__(self, source=None, position=None):
         if source is None:
             source = DataStream()
         self.source = source
+        self.position = position
         return self
 
     def process_resource(self, resource: ResourceWrapper):
@@ -69,7 +72,18 @@ def func():
         return func
 
     def _process(self):
-        datastream = self.source._process()
+        try:
+            datastream = self.source._process()
+        except Exception as exception:
+            if not isinstance(exception, exceptions.ProcessorError):
+                error = exceptions.ProcessorError(
+                    exception,
+                    processor_name=self.source.__class__.__name__,
+                    processor_object=self.source,
+                    processor_position=self.source.position
+                )
+                raise error from exception
+            raise exception
 
         self.datapackage = Package(descriptor=copy.deepcopy(datastream.dp.descriptor))
         self.datapackage = self.process_datapackage(self.datapackage)
@@ -90,7 +104,18 @@ def process(self):
         return ds.dp, ds.merge_stats()
 
     def results(self, on_error=None):
-        ds = self._process()
+        try:
+            ds = self._process()
+        except Exception as exception:
+            if not isinstance(exception, exceptions.ProcessorError):
+                error = exceptions.ProcessorError(
+                    exception,
+                    processor_name=self.__class__.__name__,
+                    processor_object=self,
+                    processor_position=self.position
+                )
+                raise error from exception
+            raise exception
         results = [
             list(schema_validator(res.res, res, on_error=on_error))
             for res in ds.res_iter
diff --git a/dataflows/base/exceptions.py b/dataflows/base/exceptions.py
new file mode 100644
index 0000000..08a0c19
--- /dev/null
+++ b/dataflows/base/exceptions.py
@@ -0,0 +1,12 @@
+class DataflowsException(Exception):
+    pass
+
+
+class ProcessorError(DataflowsException):
+
+    def __init__(self, cause, *, processor_name, processor_object, processor_position):
+        self.cause = cause
+        self.processor_name = processor_name
+        self.processor_object = processor_object
+        self.processor_position = processor_position
+        super().__init__(str(cause))
diff --git a/dataflows/base/flow.py b/dataflows/base/flow.py
index e2a8225..f17ad4d 100644
--- a/dataflows/base/flow.py
+++ b/dataflows/base/flow.py
@@ -29,11 +29,11 @@ def _preprocess_chain(self):
     def _chain(self, ds=None):
         from ..helpers import datapackage_processor, rows_processor, row_processor, iterable_loader
 
-        for link in self._preprocess_chain():
+        for position, link in enumerate(self._preprocess_chain(), start=1):
             if isinstance(link, Flow):
                 ds = link._chain(ds)
             elif isinstance(link, DataStreamProcessor):
-                ds = link(ds)
+                ds = link(ds, position=position)
             elif isfunction(link):
                 sig = signature(link)
                 params = list(sig.parameters)
diff --git a/tests/test_lib.py b/tests/test_lib.py
index 61a419f..f33e237 100644
--- a/tests/test_lib.py
+++ b/tests/test_lib.py
@@ -1596,6 +1596,7 @@ def test_force_temporal_format():
         }
     ]]
 
+
 # Extract missing values
 
 def test_extract_missing_values():
@@ -1722,6 +1723,47 @@ def test_conditional():
         dict(a=i, c=i)
         for i in range(3)
     ]
 
+def test_exception_information():
+    from dataflows import load, exceptions
+    flow = Flow(
+        load('data/bad-path1.csv'),
+    )
+    with pytest.raises(exceptions.ProcessorError) as excinfo:
+        data = flow.results()
+    assert str(excinfo.value.cause) == "[Errno 2] No such file or directory: 'data/bad-path1.csv'"
+    assert excinfo.value.processor_name == 'load'
+    assert excinfo.value.processor_object.load_source == 'data/bad-path1.csv'
+    assert excinfo.value.processor_position == 1
+
+
+def test_exception_information_multiple_processors():
+    from dataflows import load, exceptions
+    flow = Flow(
+        load('data/bad-path1.csv'),
+        load('data/bad-path2.csv'),
+    )
+    with pytest.raises(exceptions.ProcessorError) as excinfo:
+        data = flow.results()
+    assert str(excinfo.value.cause) == "[Errno 2] No such file or directory: 'data/bad-path1.csv'"
+    assert excinfo.value.processor_name == 'load'
+    assert excinfo.value.processor_object.load_source == 'data/bad-path1.csv'
+    assert excinfo.value.processor_position == 1
+
+
+def test_exception_information_multiple_processors_last_errored():
+    from dataflows import load, exceptions
+    flow = Flow(
+        load('data/academy.csv'),
+        load('data/bad-path2.csv'),
+    )
+    with pytest.raises(exceptions.ProcessorError) as excinfo:
+        data = flow.results()
+    assert str(excinfo.value.cause) == "[Errno 2] No such file or directory: 'data/bad-path2.csv'"
+    assert excinfo.value.processor_name == 'load'
+    assert excinfo.value.processor_object.load_source == 'data/bad-path2.csv'
+    assert excinfo.value.processor_position == 2
+
+
 def test_finalizer():
     from dataflows import Flow, finalizer