Sheets parameter for the load processor #138

Open · wants to merge 7 commits into master
3 changes: 2 additions & 1 deletion PROCESSORS.md
@@ -11,7 +11,7 @@ DataFlows comes with a few built-in processors which do most of the heavy liftin
- **dump_to_sql** - Store the results in a relational database (creates one or more tables or updates existing tables)

### Flow Control
- **conditional** - Run parts of the flow based on the structure of the datapackage at the calling point
- **finalizer** - Call a function when all data has been processed
- **checkpoint** - Cache results of a subflow in a datapackage and load it upon request

@@ -66,6 +66,7 @@ def load(source, name=None, resources=None, strip=True, limit_rows=None,
    - A list of resource names to load
    - `None` indicates to load all resources
    - The index of the resource in the package
- `sheets` - REGEX. For the Excel format, tabulator's `sheet` option opens one exact sheet. Dataflows additionally supports a `sheets` option, which loads all Excel sheets whose names match the given regex at once (see the example below).
- `options` - extra options passed on to tabulator, depending on the loaded file format (e.g. `sheet` for Excel files; see the tabulator link above)

Relevant only when _not_ loading data from a datapackage:
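A minimal usage sketch of the new option, mirroring the tests added in this PR (`data/sheets.xlsx` is the fixture the PR ships, containing sheets named `Sheet1` to `Sheet3`):

```python
from dataflows import Flow, load

# Load every sheet whose name matches the regex as a separate resource.
flow = Flow(
    load('data/sheets.xlsx', sheets='Sheet[12]'),
)
data, package, stats = flow.results()
# Each matched sheet becomes its own resource, named after the slugified
# sheet name - here 'sheet1' and 'sheet2'.
```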
Binary file added data/sheets.xlsx
Binary file not shown.
108 changes: 69 additions & 39 deletions dataflows/processors/load.py
@@ -2,6 +2,10 @@
import warnings
import datetime

import re
import tabulator
from copy import deepcopy
from slugify import slugify
from datapackage import Package
from tabulator import Stream
from tabulator.parser import Parser
@@ -208,52 +212,78 @@ def safe_process_datapackage(self, dp: Package):
                        self.resource_descriptors.append(resource.descriptor)
                        self.iterators.append(resource.iter(keyed=True, cast=True))

            # Loading multiple Excel sheets
            elif self.options.get('sheets'):
                sheets = self.options.pop('sheets')
                pattern = re.compile(sheets)
                self.options['workbook_cache'] = {}
                self.options['sheet'] = 0
                try:
                    while True:
                        self.options['sheet'] += 1
                        descriptor, stream = self.get_resource()
                        if re.search(pattern, stream.fragment):
                            descriptor['name'] = slugify(stream.fragment, to_lower=True)
                            descriptor['path'] = '.'.join([descriptor['name'], stream.format])
                            self.resource_descriptors.append(descriptor)
                            self.iterators.append(stream.iter(keyed=True))
                except tabulator.exceptions.SourceError:
                    pass
                if not self.resource_descriptors:
                    message = 'No sheets found for the regex "%s"'
                    raise RuntimeError(message % sheets)

            # Loading for any other source
            else:
                path = os.path.basename(self.load_source)
                path = os.path.splitext(path)[0]
                descriptor = dict(path=self.name or path,
                                  profile='tabular-data-resource')
                descriptor, stream = self.get_resource()
                self.resource_descriptors.append(descriptor)
                descriptor['name'] = self.name or path
                if 'encoding' in self.options:
                    descriptor['encoding'] = self.options['encoding']
                self.options.setdefault('custom_parsers', {}).setdefault('xml', XMLParser)
                self.options.setdefault('ignore_blank_headers', True)
                self.options.setdefault('headers', 1)
                stream: Stream = Stream(self.load_source, **self.options).open()
                if len(stream.headers) != len(set(stream.headers)):
                    if not self.deduplicate_headers:
                        raise ValueError(
                            'Found duplicate headers. ' +
                            'Use the `deduplicate_headers` flag (found headers=%r)' % stream.headers)
                    stream.headers = self.rename_duplicate_headers(stream.headers)
                schema = Schema().infer(
                    stream.sample, headers=stream.headers,
                    confidence=1, guesser_cls=self.guesser)
                if self.override_schema:
                    schema.update(self.override_schema)
                if self.override_fields:
                    fields = schema.get('fields', [])
                    for field in fields:
                        field.update(self.override_fields.get(field['name'], {}))
                if self.extract_missing_values:
                    missing_values = schema.get('missingValues', [])
                    if not self.extract_missing_values['values']:
                        self.extract_missing_values['values'] = missing_values
                    schema['fields'].append({
                        'name': self.extract_missing_values['target'],
                        'type': 'object',
                        'format': 'default',
                        'values': self.extract_missing_values['values'],
                    })
                descriptor['schema'] = schema
                descriptor['format'] = self.options.get('format', stream.format)
                descriptor['path'] += '.{}'.format(stream.format)
                self.iterators.append(stream.iter(keyed=True))

        dp.descriptor.setdefault('resources', []).extend(self.resource_descriptors)
        return dp

    def get_resource(self):
        path = os.path.basename(self.load_source)
        path = os.path.splitext(path)[0]
        descriptor = dict(path=self.name or path,
                          profile='tabular-data-resource')
        descriptor['name'] = self.name or path
        if 'encoding' in self.options:
            descriptor['encoding'] = self.options['encoding']
        self.options.setdefault('custom_parsers', {}).setdefault('xml', XMLParser)
        self.options.setdefault('ignore_blank_headers', True)
        self.options.setdefault('headers', 1)
        stream: Stream = Stream(self.load_source, **self.options).open()
        if len(stream.headers) != len(set(stream.headers)):
            if not self.deduplicate_headers:
                raise ValueError(
                    'Found duplicate headers. ' +
                    'Use the `deduplicate_headers` flag (found headers=%r)' % stream.headers)
            stream.headers = self.rename_duplicate_headers(stream.headers)
        schema = Schema().infer(
            stream.sample, headers=stream.headers,
            confidence=1, guesser_cls=self.guesser)
        if self.override_schema:
            schema.update(self.override_schema)
        if self.override_fields:
            fields = schema.get('fields', [])
            for field in fields:
                field.update(self.override_fields.get(field['name'], {}))
        if self.extract_missing_values:
            missing_values = schema.get('missingValues', [])
            if not self.extract_missing_values['values']:
                self.extract_missing_values['values'] = missing_values
            schema['fields'].append({
                'name': self.extract_missing_values['target'],
                'type': 'object',
                'format': 'default',
                'values': self.extract_missing_values['values'],
            })
        descriptor['schema'] = schema
        descriptor['format'] = self.options.get('format', stream.format)
        descriptor['path'] += '.{}'.format(stream.format)
        return (descriptor, stream)

    def stripper(self, iterator):
        for r in iterator:
            yield dict(
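For readers following the new `sheets` branch above, here is a self-contained sketch (illustrative only, not part of the PR) of the probing strategy it implements: open sheets by 1-based index until tabulator raises `SourceError`, and keep those whose name (exposed as `stream.fragment`) matches the regex.

```python
import re

import tabulator
from slugify import slugify


def matching_sheets(source, sheets_regex):
    """Yield (name, stream) for each Excel sheet whose name matches the regex."""
    pattern = re.compile(sheets_regex)
    workbook_cache = {}  # shared cache so the workbook is parsed only once
    index = 0
    while True:
        index += 1  # tabulator sheet indices are 1-based
        try:
            stream = tabulator.Stream(source, sheet=index,
                                      workbook_cache=workbook_cache).open()
        except tabulator.exceptions.SourceError:
            break  # probed past the last sheet
        if pattern.search(stream.fragment):  # fragment holds the sheet name
            yield slugify(stream.fragment, to_lower=True), stream
```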
2 changes: 1 addition & 1 deletion setup.py
@@ -21,7 +21,7 @@ def read(*paths):
PACKAGE = 'dataflows'
NAME = PACKAGE.replace('_', '-')
INSTALL_REQUIRES = [
    'tabulator>=1.38.4',
    'tabulator>=1.49',
    'datapackage>=1.5.0',
    'tableschema>=1.5',
    'kvfile>=0.0.8',
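The tabulator floor rises from 1.38.4 to 1.49, presumably to pick up the `workbook_cache` stream option the new sheets-loading code passes; the PR itself does not state the reason.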
86 changes: 86 additions & 0 deletions tests/test_lib.py
@@ -1743,3 +1743,89 @@ def finalize():

    assert stats['processed'] == 10
    assert stats['detected'] == 10


# Excel sheets loading

def test_load_excel_sheet_default():
    from dataflows import load
    flow = Flow(
        load('data/sheets.xlsx'),
    )
    data, package, stats = flow.results()
    assert len(package.descriptor['resources']) == 1
    assert data == [
        [{'id': 1, 'name': 'london'}],
    ]


def test_load_excel_sheet_by_number():
    from dataflows import load
    flow = Flow(
        load('data/sheets.xlsx', sheet=2),
    )
    data, package, stats = flow.results()
    assert len(package.descriptor['resources']) == 1
    assert data == [
        [{'id': 2, 'name': 'paris'}],
    ]


def test_load_excel_sheet_by_name():
    from dataflows import load
    flow = Flow(
        load('data/sheets.xlsx', sheet='Sheet3'),
    )
    data, package, stats = flow.results()
    assert len(package.descriptor['resources']) == 1
    assert data == [
        [{'id': 3, 'name': 'rome'}],
    ]


def test_load_excel_sheets_all():
    from dataflows import load
    flow = Flow(
        load('data/sheets.xlsx', sheets='.*'),
    )
    data, package, stats = flow.results()
    assert len(package.descriptor['resources']) == 3
    assert package.descriptor['resources'][0]['name'] == 'sheet1'
    assert package.descriptor['resources'][1]['name'] == 'sheet2'
    assert package.descriptor['resources'][2]['name'] == 'sheet3'
    assert package.descriptor['resources'][0]['path'] == 'sheet1.xlsx'
    assert package.descriptor['resources'][1]['path'] == 'sheet2.xlsx'
    assert package.descriptor['resources'][2]['path'] == 'sheet3.xlsx'
    assert data == [
        [{'id': 1, 'name': 'london'}],
        [{'id': 2, 'name': 'paris'}],
        [{'id': 3, 'name': 'rome'}],
    ]


def test_load_excel_sheets_matching():
    from dataflows import load
    flow = Flow(
        load('data/sheets.xlsx', sheets='Sheet[1,3]'),
    )
    data, package, stats = flow.results()
    assert len(package.descriptor['resources']) == 2
    assert package.descriptor['resources'][0]['name'] == 'sheet1'
    assert package.descriptor['resources'][1]['name'] == 'sheet3'
    assert package.descriptor['resources'][0]['path'] == 'sheet1.xlsx'
    assert package.descriptor['resources'][1]['path'] == 'sheet3.xlsx'
    assert data == [
        [{'id': 1, 'name': 'london'}],
        [{'id': 3, 'name': 'rome'}],
    ]


def test_load_excel_sheets_not_found():
    from dataflows import load, exceptions
    flow = Flow(
        load('data/sheets.xlsx', sheets='Sheet[4]'),
    )
    with pytest.raises(exceptions.ProcessorError) as excinfo:
        data, package, stats = flow.results()
    assert 'No sheets found' in str(excinfo.value)