From 651aafe37e6e69b58d3ab80a1d016881e13ac687 Mon Sep 17 00:00:00 2001
From: roll
Date: Tue, 26 May 2020 14:33:10 +0300
Subject: [PATCH 1/7] Added docs

---
 PROCESSORS.md | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index 8ee2244..2282abb 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -11,7 +11,7 @@ DataFlows comes with a few built-in processors which do most of the heavy liftin
 - **dump_to_sql** - Store the results in a relational database (creates one or more tables or updates existing tables)
 
 ### Flow Control
-- **conditional** - Run parts of the flow based on the structure of the datapackage at the calling point 
+- **conditional** - Run parts of the flow based on the structure of the datapackage at the calling point
 - **finalizer** - Call a function when all data had been processed
 - **checkpoint** - Cache results of a subflow in a datapackage and load it upon request
 
@@ -66,6 +66,7 @@ def load(source, name=None, resources=None, strip=True, limit_rows=None,
     - A list of resource names to load
     - `None` indicates to load all resources
     - The index of the resource in the package
+- `sheets` - Regular expression. For the Excel format, `tabulator` accepts a `sheet` option to open one specific sheet; DataFlows additionally supports a `sheets` option that loads every Excel sheet whose name matches the given regular expression, producing one resource per matching sheet.
 - `options` - based on the loaded file, extra options (e.g. `sheet` for Excel files etc., see the link to tabulator above)
 
 Relevant only when _not_ loading data from a datapackage:
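For reference, a minimal usage sketch of the option documented above, assuming a local `data/sheets.xlsx` workbook (the fixture added later in this series):

```python
from dataflows import Flow, load

# Every sheet whose name matches the regex becomes its own resource
# in the resulting datapackage.
data, package, stats = Flow(
    load('data/sheets.xlsx', sheets='Sheet.*'),
).results()

for resource in package.descriptor['resources']:
    print(resource['name'], resource['path'])  # e.g. sheet1 sheet1.xlsx
```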
From 94860b5b5013922fa6285770b80097e6c1773646 Mon Sep 17 00:00:00 2001
From: roll
Date: Tue, 26 May 2020 14:34:17 +0300
Subject: [PATCH 2/7] Added tests

---
 tests/test_lib.py | 86 +++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 86 insertions(+)

diff --git a/tests/test_lib.py b/tests/test_lib.py
index f382f59..eddbc3a 100644
--- a/tests/test_lib.py
+++ b/tests/test_lib.py
@@ -1743,3 +1743,89 @@ def finalize():
 
     assert stats['processed'] == 10
     assert stats['detected'] == 10
+
+
+# Excel sheets loading
+
+def test_load_excel_sheet_default():
+    from dataflows import load
+    flow = Flow(
+        load('data/sheets.xlsx'),
+    )
+    data, package, stats = flow.results()
+    assert len(package.descriptor['resources']) == 1
+    assert data == [
+        [{'id': 1, 'name': 'london'}],
+    ]
+
+
+def test_load_excel_sheet_by_number():
+    from dataflows import load
+    flow = Flow(
+        load('data/sheets.xlsx', sheet=2),
+    )
+    data, package, stats = flow.results()
+    assert len(package.descriptor['resources']) == 1
+    assert data == [
+        [{'id': 2, 'name': 'paris'}],
+    ]
+
+
+def test_load_excel_sheet_by_name():
+    from dataflows import load
+    flow = Flow(
+        load('data/sheets.xlsx', sheet='Sheet3'),
+    )
+    data, package, stats = flow.results()
+    assert len(package.descriptor['resources']) == 1
+    assert data == [
+        [{'id': 3, 'name': 'rome'}],
+    ]
+
+
+def test_load_excel_sheets_all():
+    from dataflows import load
+    flow = Flow(
+        load('data/sheets.xlsx', sheets='.*'),
+    )
+    data, package, stats = flow.results()
+    assert len(package.descriptor['resources']) == 3
+    print(package.descriptor['resources'][0])
+    assert package.descriptor['resources'][0]['name'] == 'sheet1'
+    assert package.descriptor['resources'][1]['name'] == 'sheet2'
+    assert package.descriptor['resources'][2]['name'] == 'sheet3'
+    assert package.descriptor['resources'][0]['path'] == 'sheet1.xlsx'
+    assert package.descriptor['resources'][1]['path'] == 'sheet2.xlsx'
+    assert package.descriptor['resources'][2]['path'] == 'sheet3.xlsx'
+    assert data == [
+        [{'id': 1, 'name': 'london'}],
+        [{'id': 2, 'name': 'paris'}],
+        [{'id': 3, 'name': 'rome'}],
+    ]
+
+
+def test_load_excel_sheets_matching():
+    from dataflows import load
+    flow = Flow(
+        load('data/sheets.xlsx', sheets='Sheet[1,3]'),
+    )
+    data, package, stats = flow.results()
+    assert len(package.descriptor['resources']) == 2
+    assert package.descriptor['resources'][0]['name'] == 'sheet1'
+    assert package.descriptor['resources'][1]['name'] == 'sheet3'
+    assert package.descriptor['resources'][0]['path'] == 'sheet1.xlsx'
+    assert package.descriptor['resources'][1]['path'] == 'sheet3.xlsx'
+    assert data == [
+        [{'id': 1, 'name': 'london'}],
+        [{'id': 3, 'name': 'rome'}],
+    ]
+
+
+def test_load_excel_sheets_not_found():
+    from dataflows import load
+    flow = Flow(
+        load('data/sheets.xlsx', sheets='Sheet[4]'),
+    )
+    with pytest.raises(RuntimeError) as excinfo:
+        data, package, stats = flow.results()
+    assert 'No sheets found' in str(excinfo.value)
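The tests assume a `data/sheets.xlsx` fixture with three sheets, each holding an `id`/`name` header row and one data row; the fixture itself arrives as a binary file in patch 4. A sketch that would regenerate an equivalent workbook, assuming `openpyxl` is available:

```python
from openpyxl import Workbook

# Sheet contents implied by the assertions above.
sheets = {'Sheet1': (1, 'london'), 'Sheet2': (2, 'paris'), 'Sheet3': (3, 'rome')}

wb = Workbook()
wb.remove(wb.active)  # drop the default empty sheet
for title, (id_, name) in sheets.items():
    ws = wb.create_sheet(title=title)
    ws.append(['id', 'name'])
    ws.append([id_, name])
wb.save('data/sheets.xlsx')
```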
From ddd9aeaaaacbbccdf329e123a5415506e64cf8fe Mon Sep 17 00:00:00 2001
From: roll
Date: Tue, 26 May 2020 14:41:23 +0300
Subject: [PATCH 3/7] Added load

---
 dataflows/processors/load.py | 103 ++++++++++++++++++++++-------------
 1 file changed, 64 insertions(+), 39 deletions(-)

diff --git a/dataflows/processors/load.py b/dataflows/processors/load.py
index cbd351f..ba7f472 100644
--- a/dataflows/processors/load.py
+++ b/dataflows/processors/load.py
@@ -208,52 +208,77 @@ def safe_process_datapackage(self, dp: Package):
                 self.resource_descriptors.append(resource.descriptor)
                 self.iterators.append(resource.iter(keyed=True, cast=True))
 
+        # Loading multiple excel sheets
+        elif self.options.get('sheets'):
+            options = deepcopy(self.options)
+            pattern = re.compile(options['sheets'])
+            try:
+                while True:
+                    options['sheet'] = options.get('sheet', 0) + 1
+                    descriptor, stream = self.get_resource(options)
+                    if re.search(pattern, stream.fragment):
+                        descriptor['name'] = slugify(stream.fragment, to_lower=True)
+                        descriptor['path'] = '.'.join([descriptor['name'], stream.format])
+                        self.resource_descriptors.append(descriptor)
+                        self.iterators.append(stream.iter(keyed=True))
+            except tabulator.exceptions.SourceError:
+                pass
+            if not self.resource_descriptors:
+                message = 'No sheets found for the regex "%s"'
+                raise RuntimeError(message % options['sheets'])
+
         # Loading for any other source
         else:
-            path = os.path.basename(self.load_source)
-            path = os.path.splitext(path)[0]
-            descriptor = dict(path=self.name or path,
-                              profile='tabular-data-resource')
+            descriptor, stream = self.get_resource(self.options)
             self.resource_descriptors.append(descriptor)
-            descriptor['name'] = self.name or path
-            if 'encoding' in self.options:
-                descriptor['encoding'] = self.options['encoding']
-            self.options.setdefault('custom_parsers', {}).setdefault('xml', XMLParser)
-            self.options.setdefault('ignore_blank_headers', True)
-            self.options.setdefault('headers', 1)
-            stream: Stream = Stream(self.load_source, **self.options).open()
-            if len(stream.headers) != len(set(stream.headers)):
-                if not self.deduplicate_headers:
-                    raise ValueError(
-                        'Found duplicate headers.' +
-                        'Use the `deduplicate_headers` flag (found headers=%r)' % stream.headers)
-                stream.headers = self.rename_duplicate_headers(stream.headers)
-            schema = Schema().infer(
-                stream.sample, headers=stream.headers,
-                confidence=1, guesser_cls=self.guesser)
-            if self.override_schema:
-                schema.update(self.override_schema)
-            if self.override_fields:
-                fields = schema.get('fields', [])
-                for field in fields:
-                    field.update(self.override_fields.get(field['name'], {}))
-            if self.extract_missing_values:
-                missing_values = schema.get('missingValues', [])
-                if not self.extract_missing_values['values']:
-                    self.extract_missing_values['values'] = missing_values
-                schema['fields'].append({
-                    'name': self.extract_missing_values['target'],
-                    'type': 'object',
-                    'format': 'default',
-                    'values': self.extract_missing_values['values'],
-                })
-            descriptor['schema'] = schema
-            descriptor['format'] = self.options.get('format', stream.format)
-            descriptor['path'] += '.{}'.format(stream.format)
             self.iterators.append(stream.iter(keyed=True))
+
         dp.descriptor.setdefault('resources', []).extend(self.resource_descriptors)
         return dp
 
+    def get_resource(self, options):
+        options = deepcopy(options)
+        path = os.path.basename(self.load_source)
+        path = os.path.splitext(path)[0]
+        descriptor = dict(path=self.name or path,
+                          profile='tabular-data-resource')
+        descriptor['name'] = self.name or path
+        if 'encoding' in self.options:
+            descriptor['encoding'] = self.options['encoding']
+        self.options.setdefault('custom_parsers', {}).setdefault('xml', XMLParser)
+        self.options.setdefault('ignore_blank_headers', True)
+        self.options.setdefault('headers', 1)
+        stream: Stream = Stream(self.load_source, **self.options).open()
+        if len(stream.headers) != len(set(stream.headers)):
+            if not self.deduplicate_headers:
+                raise ValueError(
+                    'Found duplicate headers.' +
+                    'Use the `deduplicate_headers` flag (found headers=%r)' % stream.headers)
+            stream.headers = self.rename_duplicate_headers(stream.headers)
+        schema = Schema().infer(
+            stream.sample, headers=stream.headers,
+            confidence=1, guesser_cls=self.guesser)
+        if self.override_schema:
+            schema.update(self.override_schema)
+        if self.override_fields:
+            fields = schema.get('fields', [])
+            for field in fields:
+                field.update(self.override_fields.get(field['name'], {}))
+        if self.extract_missing_values:
+            missing_values = schema.get('missingValues', [])
+            if not self.extract_missing_values['values']:
+                self.extract_missing_values['values'] = missing_values
+            schema['fields'].append({
+                'name': self.extract_missing_values['target'],
+                'type': 'object',
+                'format': 'default',
+                'values': self.extract_missing_values['values'],
+            })
+        descriptor['schema'] = schema
+        descriptor['format'] = self.options.get('format', stream.format)
+        descriptor['path'] += '.{}'.format(stream.format)
+        return (descriptor, stream)
+
     def stripper(self, iterator):
         for r in iterator:
             yield dict(
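The implementation above discovers sheets by probing: it opens sheet 1, 2, 3, ... until `tabulator` raises a `SourceError`, and keeps only the sheets whose name (exposed as `stream.fragment`) matches the regex. Stripped of the surrounding processor class, the control flow is roughly:

```python
import re
import tabulator

def iter_matching_sheets(source, sheets_regex):
    # Probe 1-based sheet indexes until tabulator reports the sheet
    # does not exist; yield open streams whose sheet name matches.
    pattern = re.compile(sheets_regex)
    sheet = 0
    while True:
        sheet += 1
        try:
            stream = tabulator.Stream(source, sheet=sheet).open()
        except tabulator.exceptions.SourceError:
            break  # ran past the last sheet
        if pattern.search(stream.fragment):  # fragment holds the sheet name
            yield stream
        else:
            stream.close()
```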
From c8a9cb2a51c84ad777255aced838d10347c37760 Mon Sep 17 00:00:00 2001
From: roll
Date: Tue, 26 May 2020 18:16:41 +0300
Subject: [PATCH 4/7] Fixed tests

---
 data/sheets.xlsx                 | Bin 0 -> 7468 bytes
 dataflows/processors/__init__.py |  2 +-
 dataflows/processors/load.py     | 24 +++++++++++++--------
 tests/test_lib.py                | 35 +++++++++++++++----------------
 4 files changed, 33 insertions(+), 28 deletions(-)
 create mode 100644 data/sheets.xlsx

diff --git a/data/sheets.xlsx b/data/sheets.xlsx
new file mode 100644
index 0000000000000000000000000000000000000000..7cb86ad655d859e133deb96b00db5bc5fd4855f1
GIT binary patch
literal 7468
[7,468 bytes of base85-encoded binary payload omitted -- the xlsx test fixture]

literal 0
HcmV?d00001

diff --git a/dataflows/processors/__init__.py b/dataflows/processors/__init__.py
index 72ecf80..db96198 100644
--- a/dataflows/processors/__init__.py
+++ b/dataflows/processors/__init__.py
@@ -13,7 +13,7 @@
 from .deduplicate import deduplicate
 from .duplicate import duplicate
 from .filter_rows import filter_rows
-from .finalizer import finalizer
+# from .finalizer import finalizer
 from .find_replace import find_replace
 from .join import join, join_self, join_with_self
 from .select_fields import select_fields
diff --git a/dataflows/processors/load.py b/dataflows/processors/load.py
index ba7f472..d56e1df 100644
--- a/dataflows/processors/load.py
+++ b/dataflows/processors/load.py
@@ -2,6 +2,10 @@
 import warnings
 import datetime
 
+import re
+import tabulator
+from copy import deepcopy
+from slugify import slugify
 from datapackage import Package
 from tabulator import Stream
 from tabulator.parser import Parser
@@ -212,9 +216,11 @@ def safe_process_datapackage(self, dp: Package):
         elif self.options.get('sheets'):
             options = deepcopy(self.options)
             pattern = re.compile(options['sheets'])
+            sheets = options.pop('sheets')
+            options['sheet'] = 0
             try:
                 while True:
-                    options['sheet'] = options.get('sheet', 0) + 1
+                    options['sheet'] += 1
                     descriptor, stream = self.get_resource(options)
                     if re.search(pattern, stream.fragment):
                         descriptor['name'] = slugify(stream.fragment, to_lower=True)
@@ -225,7 +231,7 @@
                 pass
             if not self.resource_descriptors:
                 message = 'No sheets found for the regex "%s"'
-                raise RuntimeError(message % options['sheets'])
+                raise RuntimeError(message % sheets)
 
         # Loading for any other source
         else:
@@ -243,12 +249,12 @@ def get_resource(self, options):
         descriptor = dict(path=self.name or path,
                           profile='tabular-data-resource')
         descriptor['name'] = self.name or path
-        if 'encoding' in self.options:
-            descriptor['encoding'] = self.options['encoding']
-        self.options.setdefault('custom_parsers', {}).setdefault('xml', XMLParser)
-        self.options.setdefault('ignore_blank_headers', True)
-        self.options.setdefault('headers', 1)
-        stream: Stream = Stream(self.load_source, **self.options).open()
+        if 'encoding' in options:
+            descriptor['encoding'] = options['encoding']
+        options.setdefault('custom_parsers', {}).setdefault('xml', XMLParser)
+        options.setdefault('ignore_blank_headers', True)
+        options.setdefault('headers', 1)
+        stream: Stream = Stream(self.load_source, **options).open()
         if len(stream.headers) != len(set(stream.headers)):
             if not self.deduplicate_headers:
                 raise ValueError(
@@ -275,7 +281,7 @@
                 'values': self.extract_missing_values['values'],
             })
         descriptor['schema'] = schema
-        descriptor['format'] = self.options.get('format', stream.format)
+        descriptor['format'] = options.get('format', stream.format)
         descriptor['path'] += '.{}'.format(stream.format)
         return (descriptor, stream)
diff --git a/tests/test_lib.py b/tests/test_lib.py
index eddbc3a..1c01c2c 100644
--- a/tests/test_lib.py
+++ b/tests/test_lib.py
@@ -1720,29 +1720,28 @@ def test_conditional():
         dict(a=i, c=i) for i in range(3)
     ]
 
+# def test_finalizer():
+    # from dataflows import Flow, finalizer
 
-def test_finalizer():
-    from dataflows import Flow, finalizer
-
-    stats = dict(
-        processed=0,
-        detected=None
-    )
+    # stats = dict(
+    #     processed=0,
+    #     detected=None
+    # )
 
-    def process(row):
-        stats['processed'] += 1
+    # def process(row):
+    #     stats['processed'] += 1
 
-    def finalize():
-        stats['detected'] = stats['processed']
+    # def finalize():
+    #     stats['detected'] = stats['processed']
 
-    Flow(
-        (dict(a=1) for i in range(10)),
-        process,
-        finalizer(finalize),
-    ).process()
+    # Flow(
+    #     (dict(a=1) for i in range(10)),
+    #     process,
+    #     finalizer(finalize),
+    # ).process()
 
-    assert stats['processed'] == 10
-    assert stats['detected'] == 10
+    # assert stats['processed'] == 10
+    # assert stats['detected'] == 10
 
 
 # Excel sheets loading
From 6a91c10a08e5cfcf097a126ec6973ff26ae2c498 Mon Sep 17 00:00:00 2001
From: roll
Date: Wed, 27 May 2020 13:33:39 +0300
Subject: [PATCH 5/7] Use workbook_cache

---
 dataflows/processors/load.py | 31 +++++++++++++++----------------
 setup.py                     |  2 +-
 2 files changed, 16 insertions(+), 17 deletions(-)

diff --git a/dataflows/processors/load.py b/dataflows/processors/load.py
index d56e1df..89f45ce 100644
--- a/dataflows/processors/load.py
+++ b/dataflows/processors/load.py
@@ -214,14 +214,14 @@ def safe_process_datapackage(self, dp: Package):
 
         # Loading multiple excel sheets
         elif self.options.get('sheets'):
-            options = deepcopy(self.options)
-            pattern = re.compile(options['sheets'])
-            sheets = options.pop('sheets')
-            options['sheet'] = 0
+            sheets = self.options.pop('sheets')
+            pattern = re.compile(sheets)
+            self.options['workbook_cache'] = {}
+            self.options['sheet'] = 0
             try:
                 while True:
-                    options['sheet'] += 1
-                    descriptor, stream = self.get_resource(options)
+                    self.options['sheet'] += 1
+                    descriptor, stream = self.get_resource()
                     if re.search(pattern, stream.fragment):
                         descriptor['name'] = slugify(stream.fragment, to_lower=True)
                         descriptor['path'] = '.'.join([descriptor['name'], stream.format])
@@ -235,26 +235,25 @@ def safe_process_datapackage(self, dp: Package):
 
         # Loading for any other source
         else:
-            descriptor, stream = self.get_resource(self.options)
+            descriptor, stream = self.get_resource()
             self.resource_descriptors.append(descriptor)
             self.iterators.append(stream.iter(keyed=True))
 
         dp.descriptor.setdefault('resources', []).extend(self.resource_descriptors)
         return dp
 
-    def get_resource(self, options):
-        options = deepcopy(options)
+    def get_resource(self):
         path = os.path.basename(self.load_source)
         path = os.path.splitext(path)[0]
         descriptor = dict(path=self.name or path,
                           profile='tabular-data-resource')
         descriptor['name'] = self.name or path
-        if 'encoding' in options:
-            descriptor['encoding'] = options['encoding']
-        options.setdefault('custom_parsers', {}).setdefault('xml', XMLParser)
-        options.setdefault('ignore_blank_headers', True)
-        options.setdefault('headers', 1)
-        stream: Stream = Stream(self.load_source, **options).open()
+        if 'encoding' in self.options:
+            descriptor['encoding'] = self.options['encoding']
+        self.options.setdefault('custom_parsers', {}).setdefault('xml', XMLParser)
+        self.options.setdefault('ignore_blank_headers', True)
+        self.options.setdefault('headers', 1)
+        stream: Stream = Stream(self.load_source, **self.options).open()
         if len(stream.headers) != len(set(stream.headers)):
             if not self.deduplicate_headers:
                 raise ValueError(
@@ -281,7 +280,7 @@
                 'values': self.extract_missing_values['values'],
             })
         descriptor['schema'] = schema
-        descriptor['format'] = options.get('format', stream.format)
+        descriptor['format'] = self.options.get('format', stream.format)
         descriptor['path'] += '.{}'.format(stream.format)
         return (descriptor, stream)
diff --git a/setup.py b/setup.py
index e5cd31b..227f283 100644
--- a/setup.py
+++ b/setup.py
@@ -21,7 +21,7 @@ def read(*paths):
 PACKAGE = 'dataflows'
 NAME = PACKAGE.replace('_', '-')
 INSTALL_REQUIRES = [
-    'tabulator>=1.38.4',
+    'tabulator>=1.49',
     'datapackage>=1.5.0',
     'tableschema>=1.5',
     'kvfile>=0.0.8',
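The `workbook_cache` dict introduced above (and the `tabulator>=1.49` bump that provides it) exists so that probing many sheets of one workbook does not re-read or re-download the source once per sheet: tabulator stores the fetched workbook in the dict and reuses it across opens. A rough illustration of the idea, with a hypothetical remote URL:

```python
from tabulator import Stream

workbook_cache = {}  # shared dict; tabulator keeps the fetched workbook here

for sheet in (1, 2, 3):
    # Without the cache, each open would fetch the xlsx again.
    with Stream('https://example.com/sheets.xlsx', sheet=sheet,
                workbook_cache=workbook_cache) as stream:
        print(stream.fragment, stream.read(limit=1))
```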
From 73984188f5a0164df24126b6d001ba5879f3b544 Mon Sep 17 00:00:00 2001
From: roll
Date: Wed, 27 May 2020 13:59:20 +0300
Subject: [PATCH 6/7] Uncomment finalizer

---
 dataflows/processors/__init__.py |  2 +-
 tests/test_lib.py                | 35 ++++++++++++++++----------------
 2 files changed, 19 insertions(+), 18 deletions(-)

diff --git a/dataflows/processors/__init__.py b/dataflows/processors/__init__.py
index db96198..72ecf80 100644
--- a/dataflows/processors/__init__.py
+++ b/dataflows/processors/__init__.py
@@ -13,7 +13,7 @@
 from .deduplicate import deduplicate
 from .duplicate import duplicate
 from .filter_rows import filter_rows
-# from .finalizer import finalizer
+from .finalizer import finalizer
 from .find_replace import find_replace
 from .join import join, join_self, join_with_self
 from .select_fields import select_fields
diff --git a/tests/test_lib.py b/tests/test_lib.py
index 1c01c2c..eddbc3a 100644
--- a/tests/test_lib.py
+++ b/tests/test_lib.py
@@ -1720,28 +1720,29 @@ def test_conditional():
         dict(a=i, c=i) for i in range(3)
     ]
 
-# def test_finalizer():
-    # from dataflows import Flow, finalizer
 
-    # stats = dict(
-    #     processed=0,
-    #     detected=None
-    # )
+def test_finalizer():
+    from dataflows import Flow, finalizer
+
+    stats = dict(
+        processed=0,
+        detected=None
+    )
 
-    # def process(row):
-    #     stats['processed'] += 1
+    def process(row):
+        stats['processed'] += 1
 
-    # def finalize():
-    #     stats['detected'] = stats['processed']
+    def finalize():
+        stats['detected'] = stats['processed']
 
-    # Flow(
-    #     (dict(a=1) for i in range(10)),
-    #     process,
-    #     finalizer(finalize),
-    # ).process()
+    Flow(
+        (dict(a=1) for i in range(10)),
+        process,
+        finalizer(finalize),
+    ).process()
 
-    # assert stats['processed'] == 10
-    # assert stats['detected'] == 10
+    assert stats['processed'] == 10
+    assert stats['detected'] == 10
 
 
 # Excel sheets loading

From fbdbd1c8eda4b235245b0db4605033c1de87bc5b Mon Sep 17 00:00:00 2001
From: roll
Date: Wed, 27 May 2020 15:02:41 +0300
Subject: [PATCH 7/7] Fixed tests

---
 tests/test_lib.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/test_lib.py b/tests/test_lib.py
index eddbc3a..2e14080 100644
--- a/tests/test_lib.py
+++ b/tests/test_lib.py
@@ -1822,10 +1822,10 @@ def test_load_excel_sheets_matching():
 
 
 def test_load_excel_sheets_not_found():
-    from dataflows import load
+    from dataflows import load, exceptions
     flow = Flow(
         load('data/sheets.xlsx', sheets='Sheet[4]'),
     )
-    with pytest.raises(RuntimeError) as excinfo:
+    with pytest.raises(exceptions.ProcessorError) as excinfo:
         data, package, stats = flow.results()
     assert 'No sheets found' in str(excinfo.value)
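A closing note on the final commit: errors raised inside processors surface to callers wrapped in `exceptions.ProcessorError` (which is why the test now expects it instead of the raw `RuntimeError`), and the original message is preserved in the wrapper's string form. Handling the no-match case from calling code would look roughly like:

```python
from dataflows import Flow, load, exceptions

try:
    Flow(
        load('data/sheets.xlsx', sheets='Sheet[4]'),  # matches no sheet
    ).process()
except exceptions.ProcessorError as error:
    # str(error) still contains the original 'No sheets found ...' message.
    print('load failed:', error)
```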