diff --git a/PROCESSORS.md b/PROCESSORS.md
index 8ee2244..2282abb 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -11,7 +11,7 @@ DataFlows comes with a few built-in processors which do most of the heavy liftin
 - **dump_to_sql** - Store the results in a relational database (creates one or more tables or updates existing tables)
 
 ### Flow Control
-- **conditional** - Run parts of the flow based on the structure of the datapackage at the calling point 
+- **conditional** - Run parts of the flow based on the structure of the datapackage at the calling point
 - **finalizer** - Call a function when all data had been processed
 - **checkpoint** - Cache results of a subflow in a datapackage and load it upon request
 
@@ -66,6 +66,20 @@ def load(source, name=None, resources=None, strip=True, limit_rows=None,
   - A list of resource names to load
   - `None` indicates to load all resources
   - The index of the resource in the package
+- `sheets` - a regular expression. For the Excel format it's possible to pass the `sheet` option to `tabulator` to open one specific sheet; DataFlows also supports the `sheets` option, which loads every Excel sheet whose name matches the given regex as a separate resource (see the example below).
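+
+  For example, a minimal sketch (the `data/sheets.xlsx` fixture is added by this change):
+
+  ```python
+  from dataflows import Flow, load, printer
+
+  # Every sheet whose name matches the regex becomes its own resource,
+  # named after the slugified sheet name (e.g. 'sheet1', 'sheet3').
+  Flow(
+      load('data/sheets.xlsx', sheets='Sheet[13]'),
+      printer(),
+  ).process()
+  ```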
 - `options` - based on the loaded file, extra options (e.g. `sheet` for Excel files etc., see the link to tabulator above)
 
 Relevant only when _not_ loading data from a datapackage:
diff --git a/data/sheets.xlsx b/data/sheets.xlsx
new file mode 100644
index 0000000..7cb86ad
Binary files /dev/null and b/data/sheets.xlsx differ
diff --git a/dataflows/processors/load.py b/dataflows/processors/load.py
index cbd351f..89f45ce 100644
--- a/dataflows/processors/load.py
+++ b/dataflows/processors/load.py
@@ -2,6 +2,10 @@
 import warnings
 import datetime
+import re
+import tabulator
+from copy import deepcopy
+from slugify import slugify
 from datapackage import Package
 from tabulator import Stream
 from tabulator.parser import Parser
 
@@ -208,52 +212,84 @@ def safe_process_datapackage(self, dp: Package):
                     self.resource_descriptors.append(resource.descriptor)
                     self.iterators.append(resource.iter(keyed=True, cast=True))
 
+            # Loading multiple excel sheets
+            elif self.options.get('sheets'):
+                sheets = self.options.pop('sheets')
+                pattern = re.compile(sheets)
+                self.options['workbook_cache'] = {}
+                self.options['sheet'] = 0
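+                # `workbook_cache` lets tabulator reuse the already-opened
+                # workbook across probes; sheet indices are 1-based, so keep
+                # incrementing `sheet` until a SourceError says there are no more.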
+                try:
+                    while True:
+                        self.options['sheet'] += 1
+                        descriptor, stream = self.get_resource()
+                        if re.search(pattern, stream.fragment):
+                            descriptor['name'] = slugify(stream.fragment, to_lower=True)
+                            descriptor['path'] = '.'.join([descriptor['name'], stream.format])
+                            self.resource_descriptors.append(descriptor)
+                            self.iterators.append(stream.iter(keyed=True))
+                except tabulator.exceptions.SourceError:
+                    pass
+                if not self.resource_descriptors:
+                    message = 'No sheets found for the regex "%s"'
+                    raise RuntimeError(message % sheets)
+
+            # Loading for any other source
             else:
-                path = os.path.basename(self.load_source)
-                path = os.path.splitext(path)[0]
-                descriptor = dict(path=self.name or path,
-                                  profile='tabular-data-resource')
+                descriptor, stream = self.get_resource()
                 self.resource_descriptors.append(descriptor)
-                descriptor['name'] = self.name or path
-                if 'encoding' in self.options:
-                    descriptor['encoding'] = self.options['encoding']
-                self.options.setdefault('custom_parsers', {}).setdefault('xml', XMLParser)
-                self.options.setdefault('ignore_blank_headers', True)
-                self.options.setdefault('headers', 1)
-                stream: Stream = Stream(self.load_source, **self.options).open()
-                if len(stream.headers) != len(set(stream.headers)):
-                    if not self.deduplicate_headers:
-                        raise ValueError(
-                            'Found duplicate headers.' +
-                            'Use the `deduplicate_headers` flag (found headers=%r)' % stream.headers)
-                    stream.headers = self.rename_duplicate_headers(stream.headers)
-                schema = Schema().infer(
-                    stream.sample, headers=stream.headers,
-                    confidence=1, guesser_cls=self.guesser)
-                if self.override_schema:
-                    schema.update(self.override_schema)
-                if self.override_fields:
-                    fields = schema.get('fields', [])
-                    for field in fields:
-                        field.update(self.override_fields.get(field['name'], {}))
-                if self.extract_missing_values:
-                    missing_values = schema.get('missingValues', [])
-                    if not self.extract_missing_values['values']:
-                        self.extract_missing_values['values'] = missing_values
-                    schema['fields'].append({
-                        'name': self.extract_missing_values['target'],
-                        'type': 'object',
-                        'format': 'default',
-                        'values': self.extract_missing_values['values'],
-                    })
-                descriptor['schema'] = schema
-                descriptor['format'] = self.options.get('format', stream.format)
-                descriptor['path'] += '.{}'.format(stream.format)
                 self.iterators.append(stream.iter(keyed=True))
+
         dp.descriptor.setdefault('resources', []).extend(self.resource_descriptors)
         return dp
 
+    def get_resource(self):
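+        # Factored out of safe_process_datapackage: open the source with
+        # tabulator, infer a schema from the sample rows, and return the
+        # resource descriptor together with the open stream.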
+        path = os.path.basename(self.load_source)
+        path = os.path.splitext(path)[0]
+        descriptor = dict(path=self.name or path,
+                          profile='tabular-data-resource')
+        descriptor['name'] = self.name or path
+        if 'encoding' in self.options:
+            descriptor['encoding'] = self.options['encoding']
+        self.options.setdefault('custom_parsers', {}).setdefault('xml', XMLParser)
+        self.options.setdefault('ignore_blank_headers', True)
+        self.options.setdefault('headers', 1)
+        stream: Stream = Stream(self.load_source, **self.options).open()
+        if len(stream.headers) != len(set(stream.headers)):
+            if not self.deduplicate_headers:
+                raise ValueError(
+                    'Found duplicate headers. ' +
+                    'Use the `deduplicate_headers` flag (found headers=%r)' % stream.headers)
+            stream.headers = self.rename_duplicate_headers(stream.headers)
+        schema = Schema().infer(
+            stream.sample, headers=stream.headers,
+            confidence=1, guesser_cls=self.guesser)
+        if self.override_schema:
+            schema.update(self.override_schema)
+        if self.override_fields:
+            fields = schema.get('fields', [])
+            for field in fields:
+                field.update(self.override_fields.get(field['name'], {}))
+        if self.extract_missing_values:
+            missing_values = schema.get('missingValues', [])
+            if not self.extract_missing_values['values']:
+                self.extract_missing_values['values'] = missing_values
+            schema['fields'].append({
+                'name': self.extract_missing_values['target'],
+                'type': 'object',
+                'format': 'default',
+                'values': self.extract_missing_values['values'],
+            })
+        descriptor['schema'] = schema
+        descriptor['format'] = self.options.get('format', stream.format)
+        descriptor['path'] += '.{}'.format(stream.format)
+        return (descriptor, stream)
+
     def stripper(self, iterator):
         for r in iterator:
             yield dict(
diff --git a/setup.py b/setup.py
index e5cd31b..227f283 100644
--- a/setup.py
+++ b/setup.py
@@ -21,7 +21,7 @@ def read(*paths):
 PACKAGE = 'dataflows'
 NAME = PACKAGE.replace('_', '-')
 INSTALL_REQUIRES = [
-    'tabulator>=1.38.4',
+    'tabulator>=1.49',
     'datapackage>=1.5.0',
     'tableschema>=1.5',
     'kvfile>=0.0.8',
diff --git a/tests/test_lib.py b/tests/test_lib.py
index f382f59..2e14080 100644
--- a/tests/test_lib.py
+++ b/tests/test_lib.py
@@ -1743,3 +1743,88 @@ def finalize():
 
     assert stats['processed'] == 10
     assert stats['detected'] == 10
+
+
+# Excel sheets loading
+
+def test_load_excel_sheet_default():
+    from dataflows import load
+    flow = Flow(
+        load('data/sheets.xlsx'),
+    )
+    data, package, stats = flow.results()
+    assert len(package.descriptor['resources']) == 1
+    assert data == [
+        [{'id': 1, 'name': 'london'}],
+    ]
+
+
+def test_load_excel_sheet_by_number():
+    from dataflows import load
+    flow = Flow(
+        load('data/sheets.xlsx', sheet=2),
+    )
+    data, package, stats = flow.results()
+    assert len(package.descriptor['resources']) == 1
+    assert data == [
+        [{'id': 2, 'name': 'paris'}],
+    ]
+
+
+def test_load_excel_sheet_by_name():
+    from dataflows import load
+    flow = Flow(
+        load('data/sheets.xlsx', sheet='Sheet3'),
+    )
+    data, package, stats = flow.results()
+    assert len(package.descriptor['resources']) == 1
+    assert data == [
+        [{'id': 3, 'name': 'rome'}],
+    ]
+
+
+def test_load_excel_sheets_all():
+    from dataflows import load
+    flow = Flow(
+        load('data/sheets.xlsx', sheets='.*'),
+    )
+    data, package, stats = flow.results()
+    assert len(package.descriptor['resources']) == 3
+    assert package.descriptor['resources'][0]['name'] == 'sheet1'
+    assert package.descriptor['resources'][1]['name'] == 'sheet2'
+    assert package.descriptor['resources'][2]['name'] == 'sheet3'
+    assert package.descriptor['resources'][0]['path'] == 'sheet1.xlsx'
+    assert package.descriptor['resources'][1]['path'] == 'sheet2.xlsx'
+    assert package.descriptor['resources'][2]['path'] == 'sheet3.xlsx'
+    assert data == [
+        [{'id': 1, 'name': 'london'}],
+        [{'id': 2, 'name': 'paris'}],
+        [{'id': 3, 'name': 'rome'}],
+    ]
+
+
+def test_load_excel_sheets_matching():
+    from dataflows import load
+    flow = Flow(
+        load('data/sheets.xlsx', sheets='Sheet[1,3]'),
+    )
+    data, package, stats = flow.results()
+    assert len(package.descriptor['resources']) == 2
+    assert package.descriptor['resources'][0]['name'] == 'sheet1'
+    assert package.descriptor['resources'][1]['name'] == 'sheet3'
+    assert package.descriptor['resources'][0]['path'] == 'sheet1.xlsx'
+    assert package.descriptor['resources'][1]['path'] == 'sheet3.xlsx'
+    assert data == [
+        [{'id': 1, 'name': 'london'}],
+        [{'id': 3, 'name': 'rome'}],
+    ]
+
+
+def test_load_excel_sheets_not_found():
+    from dataflows import load, exceptions
+    flow = Flow(
+        load('data/sheets.xlsx', sheets='Sheet[4]'),
+    )
+    with pytest.raises(exceptions.ProcessorError) as excinfo:
+        data, package, stats = flow.results()
+    assert 'No sheets found' in str(excinfo.value)