Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added additional information to exceptions #134

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dataflows/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .base import DataStream, DataStreamProcessor, schema_validator, ValidationError
from .base import ResourceWrapper, PackageWrapper
from .base import exceptions
from .base import Flow
from .processors import * # noqa
from .processors import * # noqa
1 change: 1 addition & 0 deletions dataflows/base/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from . import exceptions
from .datastream import DataStream
from .datastream_processor import DataStreamProcessor
from .resource_wrapper import ResourceWrapper
Expand Down
31 changes: 28 additions & 3 deletions dataflows/base/datastream_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from datapackage import Package
from tableschema.exceptions import CastError

from . import exceptions
from .datastream import DataStream
from .resource_wrapper import ResourceWrapper
from .schema_validator import schema_validator
Expand All @@ -26,11 +27,13 @@ def __init__(self):
self.stats = {}
self.source = None
self.datapackage = None
self.position = None

def __call__(self, source=None):
def __call__(self, source=None, position=None):
if source is None:
source = DataStream()
self.source = source
self.position = position
return self

def process_resource(self, resource: ResourceWrapper):
Expand Down Expand Up @@ -69,7 +72,18 @@ def func():
return func

def _process(self):
datastream = self.source._process()
try:
datastream = self.source._process()
except Exception as exception:
if not isinstance(exception, exceptions.ProcessorError):
error = exceptions.ProcessorError(
exception,
processor_name=self.source.__class__.__name__,
processor_object=self.source,
processor_position=self.source.position
)
raise error from exception
raise exception

self.datapackage = Package(descriptor=copy.deepcopy(datastream.dp.descriptor))
self.datapackage = self.process_datapackage(self.datapackage)
Expand All @@ -90,7 +104,18 @@ def process(self):
return ds.dp, ds.merge_stats()

def results(self, on_error=None):
ds = self._process()
try:
ds = self._process()
except Exception as exception:
if not isinstance(exception, exceptions.ProcessorError):
error = exceptions.ProcessorError(
exception,
processor_name=self.__class__.__name__,
processor_object=self,
processor_position=self.position
)
raise error from exception
raise exception
results = [
list(schema_validator(res.res, res, on_error=on_error))
for res in ds.res_iter
Expand Down
12 changes: 12 additions & 0 deletions dataflows/base/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
class DataflowsException(Exception):
pass


class ProcessorError(DataflowsException):

def __init__(self, cause, *, processor_name, processor_object, processor_position):
self.cause = cause
self.processor_name = processor_name
self.processor_object = processor_object
self.processor_position = processor_position
super().__init__(str(cause))
4 changes: 2 additions & 2 deletions dataflows/base/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ def _preprocess_chain(self):
def _chain(self, ds=None):
from ..helpers import datapackage_processor, rows_processor, row_processor, iterable_loader

for link in self._preprocess_chain():
for position, link in enumerate(self._preprocess_chain(), start=1):
if isinstance(link, Flow):
ds = link._chain(ds)
elif isinstance(link, DataStreamProcessor):
ds = link(ds)
ds = link(ds, position=position)
elif isfunction(link):
sig = signature(link)
params = list(sig.parameters)
Expand Down
42 changes: 42 additions & 0 deletions tests/test_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -1596,6 +1596,7 @@ def test_force_temporal_format():
}
]]


# Extract missing values

def test_extract_missing_values():
Expand Down Expand Up @@ -1722,6 +1723,47 @@ def test_conditional():
dict(a=i, c=i) for i in range(3)
]

def test_exception_information():
from dataflows import load, exceptions
flow = Flow(
load('data/bad-path1.csv'),
)
with pytest.raises(exceptions.ProcessorError) as excinfo:
data = flow.results()
assert str(excinfo.value.cause) == "[Errno 2] No such file or directory: 'data/bad-path1.csv'"
assert excinfo.value.processor_name == 'load'
assert excinfo.value.processor_object.load_source == 'data/bad-path1.csv'
assert excinfo.value.processor_position == 1


def test_exception_information_multiple_processors():
from dataflows import load, exceptions
flow = Flow(
load('data/bad-path1.csv'),
load('data/bad-path2.csv'),
)
with pytest.raises(exceptions.ProcessorError) as excinfo:
data = flow.results()
assert str(excinfo.value.cause) == "[Errno 2] No such file or directory: 'data/bad-path1.csv'"
assert excinfo.value.processor_name == 'load'
assert excinfo.value.processor_object.load_source == 'data/bad-path1.csv'
assert excinfo.value.processor_position == 1


def test_exception_information_multiple_processors_last_errored():
from dataflows import load, exceptions
flow = Flow(
load('data/academy.csv'),
load('data/bad-path2.csv'),
)
with pytest.raises(exceptions.ProcessorError) as excinfo:
data = flow.results()
assert str(excinfo.value.cause) == "[Errno 2] No such file or directory: 'data/bad-path2.csv'"
assert excinfo.value.processor_name == 'load'
assert excinfo.value.processor_object.load_source == 'data/bad-path2.csv'
assert excinfo.value.processor_position == 2


def test_finalizer():
from dataflows import Flow, finalizer

Expand Down