fixes to ckan dataset fetcher
OriHoch committed Jul 15, 2024
1 parent fd7c3ab commit be6ae23
Showing 2 changed files with 21 additions and 12 deletions.
29 changes: 20 additions & 9 deletions datacity_ckan_dgp/generic_fetchers/ckan_dataset_fetcher.py
@@ -1,6 +1,5 @@
 import os
 import json
-import uuid
 import shutil
 
 import requests
@@ -37,15 +36,24 @@ def get_filtered_tabular_resources_to_update(tmpdir, source_filter, id_, name, f
     print(f'filtering tabular data from {filename} with format {format_}...')
     resources_to_update = []
     DF.Flow(
-        DF.load(f'{tmpdir}/{id_}', name='filtered', format=format_.lower()),
+        DF.load(f'{tmpdir}/{id_}', name='filtered', format=format_.lower(), infer_strategy=DF.load.INFER_STRINGS, cast_strategy=DF.load.CAST_TO_STRINGS),
         DF.filter_rows(lambda row: all(row.get(k) == v for k, v in source_filter.items())),
         DF.printer(),
         DF.dump_to_path(f'{tmpdir}/{id_}-filtered')
     ).process()
     with open(f'{tmpdir}/{id_}-filtered/datapackage.json', 'r') as f:
-        hash_ = json.load(f)['hash']
-    shutil.copyfile(f'{tmpdir}/{id_}-filtered/filtered.csv', f'{tmpdir}/{id_}')
-    resources_to_update.append((id_, name, 'CSV', hash_, description, filename))
+        dp = json.load(f)
+    hash_ = dp['hash']
+    count_of_rows = dp['count_of_rows']
+    if count_of_rows == 0:
+        print('no rows found, skipping resource')
+    else:
+        shutil.copyfile(f'{tmpdir}/{id_}-filtered/filtered.csv', f'{tmpdir}/{id_}')
+        if not filename.lower().endswith('.csv'):
+            filename = filename.lower().replace('.xlsx', '.csv').replace('.xls', '.csv')
+            if not filename.endswith('.csv'):
+                filename = f'{filename}.csv'
+        resources_to_update.append((id_, name, 'CSV', hash_, description, filename))
     return resources_to_update
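For context, a standalone sketch of the tabular path after this change (not repository code; the file names and filter dict are hypothetical, the dataflows calls are the ones used above). Loading with INFER_STRINGS/CAST_TO_STRINGS keeps every cell a string, so filter values compare without type surprises, and the row count that dump_to_path records is what lets the fetcher skip empty results:

    # sketch assuming the dataflows package; paths and filter are made up
    import json
    import dataflows as DF

    source_filter = {'city': 'Haifa'}  # hypothetical filter

    DF.Flow(
        DF.load('input.csv', name='filtered', format='csv',
                infer_strategy=DF.load.INFER_STRINGS,
                cast_strategy=DF.load.CAST_TO_STRINGS),
        DF.filter_rows(lambda row: all(row.get(k) == v for k, v in source_filter.items())),
        DF.dump_to_path('out-filtered'),
    ).process()

    # dump_to_path writes stats into the datapackage descriptor
    with open('out-filtered/datapackage.json') as f:
        dp = json.load(f)
    if dp['count_of_rows'] == 0:
        print('no rows found, skipping resource')
    else:
        print('hash:', dp['hash'])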


@@ -56,10 +64,13 @@ def get_filtered_geojson_resources_to_update(tmpdir, source_filter, id_, name, f
         data = json.load(f)
     features = data.get('features') or []
     features = [feature for feature in features if all(feature['properties'].get(k) == v for k, v in source_filter.items())]
-    data['features'] = features
-    with open(f'{tmpdir}/{id_}', 'w') as f:
-        json.dump(data, f)
-    resources_to_update.append((id_, name, 'GEOJSON', hash_, description, filename))
+    if not features:
+        print('no features found, skipping resource')
+    else:
+        data['features'] = features
+        with open(f'{tmpdir}/{id_}', 'w') as f:
+            json.dump(data, f)
+        resources_to_update.append((id_, name, 'GEOJSON', hash_, description, filename))
     return resources_to_update
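The GeoJSON path gets the same guard: filter features by their properties, and skip the write entirely when nothing matches. A minimal sketch with hypothetical file names:

    # filter GeoJSON features by properties; skip the write when nothing matches
    import json

    source_filter = {'type': 'park'}  # hypothetical filter

    with open('input.geojson') as f:
        data = json.load(f)
    features = [
        feature for feature in (data.get('features') or [])
        if all(feature['properties'].get(k) == v for k, v in source_filter.items())
    ]
    if not features:
        print('no features found, skipping resource')
    else:
        data['features'] = features
        with open('filtered.geojson', 'w') as f:
            json.dump(data, f)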


4 changes: 1 addition & 3 deletions datacity_ckan_dgp/operators/generic_fetcher.py
@@ -35,9 +35,7 @@ def operator(name, params):
     tmpdir = params.get('tmpdir')
     with tempdir(tmpdir) as tmpdir:
         print('starting generic_fetcher operator')
-        print(f'source_url={source_url} target_instance_name={target_instance_name} target_package_id={target_package_id} target_organization_id={target_organization_id}')
-        print(f'source_filter={source_filter}')
-        print(f'tmpdir={tmpdir}')
+        print(json.dumps(params))
         for fetcher in FETCHERS:
             assert fetcher['match'].keys() == {'url_contains'}, 'only url_contains match is supported at the moment'
             if fetcher['match']['url_contains'] in source_url:
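The operator change collapses three separate prints into a single JSON line, which keeps the log greppable and machine-readable; it assumes params holds only JSON-serializable values. Roughly:

    import json

    params = {'source_url': 'https://example.com/data', 'source_filter': {'city': 'Haifa'}}  # hypothetical
    print(json.dumps(params))  # one structured log line instead of several f-strings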
