Skip to content

Commit

Permalink
Added additional information to exceptions (#134)
Browse files Browse the repository at this point in the history
* Added additional information to exceptions

* Updated implementation

Co-authored-by: Adam Kariv <[email protected]>
  • Loading branch information
roll and akariv authored May 26, 2020
1 parent 99f5215 commit f3dd15f
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 6 deletions.
3 changes: 2 additions & 1 deletion dataflows/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .base import DataStream, DataStreamProcessor, schema_validator, ValidationError
from .base import ResourceWrapper, PackageWrapper
from .base import exceptions
from .base import Flow
from .processors import * # noqa
from .processors import * # noqa
1 change: 1 addition & 0 deletions dataflows/base/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from . import exceptions
from .datastream import DataStream
from .datastream_processor import DataStreamProcessor
from .resource_wrapper import ResourceWrapper
Expand Down
31 changes: 28 additions & 3 deletions dataflows/base/datastream_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from datapackage import Package
from tableschema.exceptions import CastError

from . import exceptions
from .datastream import DataStream
from .resource_wrapper import ResourceWrapper
from .schema_validator import schema_validator
Expand All @@ -26,11 +27,13 @@ def __init__(self):
self.stats = {}
self.source = None
self.datapackage = None
self.position = None

def __call__(self, source=None):
def __call__(self, source=None, position=None):
    """Bind this processor to its upstream source.

    Args:
        source: the upstream DataStream feeding this processor; a fresh,
            empty DataStream is used when none is given.
        position: 1-based index of this processor within its Flow chain,
            recorded so errors can report where in the flow they occurred.

    Returns:
        self, so the call can be chained by Flow._chain.
    """
    self.source = DataStream() if source is None else source
    self.position = position
    return self

def process_resource(self, resource: ResourceWrapper):
Expand Down Expand Up @@ -69,7 +72,18 @@ def func():
return func

def _process(self):
datastream = self.source._process()
try:
datastream = self.source._process()
except Exception as exception:
if not isinstance(exception, exceptions.ProcessorError):
error = exceptions.ProcessorError(
exception,
processor_name=self.source.__class__.__name__,
processor_object=self.source,
processor_position=self.source.position
)
raise error from exception
raise exception

self.datapackage = Package(descriptor=copy.deepcopy(datastream.dp.descriptor))
self.datapackage = self.process_datapackage(self.datapackage)
Expand All @@ -90,7 +104,18 @@ def process(self):
return ds.dp, ds.merge_stats()

def results(self, on_error=None):
ds = self._process()
try:
ds = self._process()
except Exception as exception:
if not isinstance(exception, exceptions.ProcessorError):
error = exceptions.ProcessorError(
exception,
processor_name=self.__class__.__name__,
processor_object=self,
processor_position=self.position
)
raise error from exception
raise exception
results = [
list(schema_validator(res.res, res, on_error=on_error))
for res in ds.res_iter
Expand Down
12 changes: 12 additions & 0 deletions dataflows/base/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
class DataflowsException(Exception):
    """Base class for every exception raised by the dataflows package."""


class ProcessorError(DataflowsException):
    """Wraps an exception raised inside a processor with context about it.

    Attributes:
        cause: the original exception that was raised.
        processor_name: class name of the processor that failed.
        processor_object: the processor instance that failed.
        processor_position: 1-based position of the processor in the flow.
    """

    def __init__(self, cause, *, processor_name, processor_object, processor_position):
        # Keep the wrapper's message identical to the original exception's,
        # so str(error) stays meaningful to callers.
        super().__init__(str(cause))
        self.cause = cause
        self.processor_name = processor_name
        self.processor_object = processor_object
        self.processor_position = processor_position
4 changes: 2 additions & 2 deletions dataflows/base/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ def _preprocess_chain(self):
def _chain(self, ds=None):
from ..helpers import datapackage_processor, rows_processor, row_processor, iterable_loader

for link in self._preprocess_chain():
for position, link in enumerate(self._preprocess_chain(), start=1):
if isinstance(link, Flow):
ds = link._chain(ds)
elif isinstance(link, DataStreamProcessor):
ds = link(ds)
ds = link(ds, position=position)
elif isfunction(link):
sig = signature(link)
params = list(sig.parameters)
Expand Down
42 changes: 42 additions & 0 deletions tests/test_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -1596,6 +1596,7 @@ def test_force_temporal_format():
}
]]


# Extract missing values

def test_extract_missing_values():
Expand Down Expand Up @@ -1722,6 +1723,47 @@ def test_conditional():
dict(a=i, c=i) for i in range(3)
]

def test_exception_information():
    """A failing processor surfaces name, object, and position via ProcessorError."""
    from dataflows import load, exceptions
    flow = Flow(
        load('data/bad-path1.csv'),
    )
    with pytest.raises(exceptions.ProcessorError) as excinfo:
        # Result is never reachable (the call raises), so don't bind it.
        flow.results()
    assert str(excinfo.value.cause) == "[Errno 2] No such file or directory: 'data/bad-path1.csv'"
    assert excinfo.value.processor_name == 'load'
    assert excinfo.value.processor_object.load_source == 'data/bad-path1.csv'
    assert excinfo.value.processor_position == 1


def test_exception_information_multiple_processors():
    """With several processors, the FIRST failing one is reported (position 1)."""
    from dataflows import load, exceptions
    flow = Flow(
        load('data/bad-path1.csv'),
        load('data/bad-path2.csv'),
    )
    with pytest.raises(exceptions.ProcessorError) as excinfo:
        # Result is never reachable (the call raises), so don't bind it.
        flow.results()
    assert str(excinfo.value.cause) == "[Errno 2] No such file or directory: 'data/bad-path1.csv'"
    assert excinfo.value.processor_name == 'load'
    assert excinfo.value.processor_object.load_source == 'data/bad-path1.csv'
    assert excinfo.value.processor_position == 1


def test_exception_information_multiple_processors_last_errored():
    """When a later processor fails, its 1-based position (2) is reported."""
    from dataflows import load, exceptions
    flow = Flow(
        load('data/academy.csv'),
        load('data/bad-path2.csv'),
    )
    with pytest.raises(exceptions.ProcessorError) as excinfo:
        # Result is never reachable (the call raises), so don't bind it.
        flow.results()
    assert str(excinfo.value.cause) == "[Errno 2] No such file or directory: 'data/bad-path2.csv'"
    assert excinfo.value.processor_name == 'load'
    assert excinfo.value.processor_object.load_source == 'data/bad-path2.csv'
    assert excinfo.value.processor_position == 2


def test_finalizer():
from dataflows import Flow, finalizer

Expand Down

0 comments on commit f3dd15f

Please sign in to comment.