From 23575874eb7b9fc9cca5cb07d87b05a748ad3b2d Mon Sep 17 00:00:00 2001 From: roll Date: Thu, 23 Apr 2020 11:01:28 +0300 Subject: [PATCH 1/2] Added additional information to exceptions --- dataflows/base/datastream_processor.py | 13 +++++++++++-- dataflows/base/flow.py | 4 ++-- tests/test_lib.py | 14 ++++++++++++++ 3 files changed, 27 insertions(+), 4 deletions(-) diff --git a/dataflows/base/datastream_processor.py b/dataflows/base/datastream_processor.py index ae71133..9854be7 100644 --- a/dataflows/base/datastream_processor.py +++ b/dataflows/base/datastream_processor.py @@ -26,11 +26,13 @@ def __init__(self): self.stats = {} self.source = None self.datapackage = None + self.position = None - def __call__(self, source=None): + def __call__(self, source=None, position=None): if source is None: source = DataStream() self.source = source + self.position = position return self def process_resource(self, resource: ResourceWrapper): @@ -69,7 +71,14 @@ def func(): return func def _process(self): - datastream = self.source._process() + try: + datastream = self.source._process() + except Exception as exception: + if (not hasattr(exception, 'processorName')): + exception.processorName = self.__class__.__name__ + exception.processorObject = self + exception.processorPosition = self.position + raise exception self.datapackage = Package(descriptor=copy.deepcopy(datastream.dp.descriptor)) self.datapackage = self.process_datapackage(self.datapackage) diff --git a/dataflows/base/flow.py b/dataflows/base/flow.py index e2a8225..f17ad4d 100644 --- a/dataflows/base/flow.py +++ b/dataflows/base/flow.py @@ -29,11 +29,11 @@ def _preprocess_chain(self): def _chain(self, ds=None): from ..helpers import datapackage_processor, rows_processor, row_processor, iterable_loader - for link in self._preprocess_chain(): + for position, link in enumerate(self._preprocess_chain(), start=1): if isinstance(link, Flow): ds = link._chain(ds) elif isinstance(link, DataStreamProcessor): - ds = link(ds) + ds = link(ds, position=position) elif isfunction(link): sig = signature(link) params = list(sig.parameters) diff --git a/tests/test_lib.py b/tests/test_lib.py index c6b119d..97e1ff0 100644 --- a/tests/test_lib.py +++ b/tests/test_lib.py @@ -1532,6 +1532,7 @@ def test_force_temporal_format(): } ]] + # Extract missing values def test_extract_missing_values(): @@ -1657,3 +1658,16 @@ def test_conditional(): assert result2[0] == [ dict(a=i, c=i) for i in range(3) ] + + +def test_exception_information(): + from dataflows import load + flow = Flow( + load('data/bad-path1.csv'), + load('data/bad-path2.csv'), + ) + with pytest.raises(Exception) as excinfo: + data = flow.results() + assert excinfo.value.processorName == 'load' + assert excinfo.value.processorObject.load_source == 'data/bad-path2.csv' + assert excinfo.value.processorPosition == 2 From 93c5ed78900a6ad4d4f3a9c8d85eb9e276120c66 Mon Sep 17 00:00:00 2001 From: roll Date: Mon, 25 May 2020 16:50:22 +0300 Subject: [PATCH 2/2] Updated implementation --- dataflows/__init__.py | 3 +- dataflows/base/__init__.py | 1 + dataflows/base/datastream_processor.py | 26 ++++++++++++++---- dataflows/base/exceptions.py | 12 ++++++++ tests/test_lib.py | 38 ++++++++++++++++++++++---- 5 files changed, 69 insertions(+), 11 deletions(-) create mode 100644 dataflows/base/exceptions.py diff --git a/dataflows/__init__.py b/dataflows/__init__.py index 59c9513..7040c70 100644 --- a/dataflows/__init__.py +++ b/dataflows/__init__.py @@ -1,4 +1,5 @@ from .base import DataStream, DataStreamProcessor, schema_validator, ValidationError from .base import ResourceWrapper, PackageWrapper +from .base import exceptions from .base import Flow -from .processors import * # noqa +from .processors import * # noqa diff --git a/dataflows/base/__init__.py b/dataflows/base/__init__.py index 6b0b815..73b652b 100644 --- a/dataflows/base/__init__.py +++ b/dataflows/base/__init__.py @@ -1,3 +1,4 @@ +from . import exceptions from .datastream import DataStream from .datastream_processor import DataStreamProcessor from .resource_wrapper import ResourceWrapper diff --git a/dataflows/base/datastream_processor.py b/dataflows/base/datastream_processor.py index 9854be7..befa9e4 100644 --- a/dataflows/base/datastream_processor.py +++ b/dataflows/base/datastream_processor.py @@ -6,6 +6,7 @@ from datapackage import Package from tableschema.exceptions import CastError +from . import exceptions from .datastream import DataStream from .resource_wrapper import ResourceWrapper from .schema_validator import schema_validator @@ -74,10 +75,14 @@ def _process(self): try: datastream = self.source._process() except Exception as exception: - if (not hasattr(exception, 'processorName')): - exception.processorName = self.__class__.__name__ - exception.processorObject = self - exception.processorPosition = self.position + if not isinstance(exception, exceptions.ProcessorError): + error = exceptions.ProcessorError( + exception, + processor_name=self.source.__class__.__name__, + processor_object=self.source, + processor_position=self.source.position + ) + raise error from exception raise exception self.datapackage = Package(descriptor=copy.deepcopy(datastream.dp.descriptor)) @@ -99,7 +104,18 @@ def process(self): return ds.dp, ds.merge_stats() def results(self, on_error=None): - ds = self._process() + try: + ds = self._process() + except Exception as exception: + if not isinstance(exception, exceptions.ProcessorError): + error = exceptions.ProcessorError( + exception, + processor_name=self.__class__.__name__, + processor_object=self, + processor_position=self.position + ) + raise error from exception + raise exception results = [ list(schema_validator(res.res, res, on_error=on_error)) for res in ds.res_iter diff --git a/dataflows/base/exceptions.py b/dataflows/base/exceptions.py new file mode 100644 index 0000000..08a0c19 --- /dev/null +++ b/dataflows/base/exceptions.py @@ -0,0 +1,12 @@ +class DataflowsException(Exception): + pass + + +class ProcessorError(DataflowsException): + + def __init__(self, cause, *, processor_name, processor_object, processor_position): + self.cause = cause + self.processor_name = processor_name + self.processor_object = processor_object + self.processor_position = processor_position + super().__init__(str(cause)) diff --git a/tests/test_lib.py b/tests/test_lib.py index 97e1ff0..2ab6f61 100644 --- a/tests/test_lib.py +++ b/tests/test_lib.py @@ -1661,13 +1661,41 @@ def test_conditional(): def test_exception_information(): - from dataflows import load + from dataflows import load, exceptions + flow = Flow( + load('data/bad-path1.csv'), + ) + with pytest.raises(exceptions.ProcessorError) as excinfo: + data = flow.results() + assert str(excinfo.value.cause) == "[Errno 2] No such file or directory: 'data/bad-path1.csv'" + assert excinfo.value.processor_name == 'load' + assert excinfo.value.processor_object.load_source == 'data/bad-path1.csv' + assert excinfo.value.processor_position == 1 + + +def test_exception_information_multiple_processors(): + from dataflows import load, exceptions flow = Flow( load('data/bad-path1.csv'), load('data/bad-path2.csv'), ) - with pytest.raises(Exception) as excinfo: + with pytest.raises(exceptions.ProcessorError) as excinfo: + data = flow.results() + assert str(excinfo.value.cause) == "[Errno 2] No such file or directory: 'data/bad-path1.csv'" + assert excinfo.value.processor_name == 'load' + assert excinfo.value.processor_object.load_source == 'data/bad-path1.csv' + assert excinfo.value.processor_position == 1 + + +def test_exception_information_multiple_processors_last_errored(): + from dataflows import load, exceptions + flow = Flow( + load('data/academy.csv'), + load('data/bad-path2.csv'), + ) + with pytest.raises(exceptions.ProcessorError) as excinfo: data = flow.results() - assert excinfo.value.processorName == 'load' - assert excinfo.value.processorObject.load_source == 'data/bad-path2.csv' - assert excinfo.value.processorPosition == 2 + assert str(excinfo.value.cause) == "[Errno 2] No such file or directory: 'data/bad-path2.csv'" + assert excinfo.value.processor_name == 'load' + assert excinfo.value.processor_object.load_source == 'data/bad-path2.csv' + assert excinfo.value.processor_position == 2