Skip to content

Commit

Permalink
add generic fetcher with support for ckan dataset
Browse files Browse the repository at this point in the history
  • Loading branch information
OriHoch committed Jul 11, 2024
1 parent daf0a69 commit c564f18
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 4 deletions.
27 changes: 27 additions & 0 deletions configuration.template.json
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,33 @@
"type": "text"
}
]
},
{
"name": "generic_fetcher",
"display": "Fetch/Update data from different source types into a CKAN Package",
"fields": [
{
"name": "source_url",
"display": "Source URL (source type will be inferred from the URL, see https://github.com/hasadna/datacity-ckan-dgp/blob/main/datacity_ckan_dgp/operators/generic_fetcher.py for details)",
"type": "text"
},
{
"name": "target_instance_name",
"display": "Target CKAN Instance",
"type": "enum",
"options": ["__CKAN_INSTANCES__"]
},
{
"name": "target_package_id",
"display": "ID of Package to Update (or Create)",
"type": "text"
},
{
"name": "target_organization_id",
"display": "Owner Organization of created package",
"type": "text"
}
]
}
],
"theme": {
Expand Down
Empty file.
80 changes: 80 additions & 0 deletions datacity_ckan_dgp/generic_fetchers/ckan_dataset_fetcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import os

import requests

from .. import ckan
from ..utils import http_stream_download


def fetch(source_url, target_instance_name, target_package_id, target_organization_id, tmpdir):
res = ckan.package_show(target_instance_name, target_package_id)
target_package_exists = False
existing_target_resources = {}
if res:
target_package_exists = True
for resource in res['resources']:
format_ = resource.get('format') or ''
name = resource.get('name') or ''
hash_ = resource.get('hash') or ''
id_ = resource.get('id') or ''
if format_ and name and hash_ and id_:
existing_target_resources[f'{name}.{format_}'] = {'hash': hash_, 'id': id_}
source_package_id = source_url.split('/dataset/')[1].split('/')[0]
source_instance_baseurl = source_url.split('/dataset/')[0]
if 'data.gov.il' in source_instance_baseurl:
headers = {'user-agent': 'datagov-external-client'}
else:
headers = None
res = requests.get(f'{source_instance_baseurl}/api/3/action/package_show?id={source_package_id}', headers=headers).json()
assert res['success']
package_title = res['result']['title']
resources_to_update = []
for resource in res['result']['resources']:
id_ = resource.get('id') or ''
url = resource.get('url') or ''
if url and id_:
if 'e.data.gov.il' in url:
url = url.replace('e.data.gov.il', 'data.gov.il')
filename = url.split('/')[-1]
source_hash = http_stream_download(f'{tmpdir}/{id_}', {'url': url, 'headers': headers})
source_format = resource.get('format') or ''
source_name = resource.get('name') or ''
description = resource.get('description') or ''
if existing_target_resources.get(f'{source_name}.{source_format}', {}).get('hash') != source_hash:
resources_to_update.append((id_, source_name, source_format, source_hash, description, filename))
if resources_to_update:
print(f'updating {len(resources_to_update)} resources')
if not target_package_exists:
print('creating target package')
res = ckan.package_create(target_instance_name, {
'name': target_package_id,
'title': package_title,
'owner_org': target_organization_id
})
assert res['success'], str(res)
for id_, name, format_, hash_, description, filename in resources_to_update:
print(f'{name}.{format_}')
if os.path.exists(f'{tmpdir}/{filename}'):
os.unlink(f'{tmpdir}/{filename}')
os.rename(f'{tmpdir}/{id_}', f'{tmpdir}/{filename}')
if f'{name}.{format_}' in existing_target_resources:
print('existing resource found, but hash is different, updating resource data')
res = ckan.resource_update(target_instance_name, {
'id': existing_target_resources[f'{name}.{format_}']['id'],
'hash': hash_,
'description': description
}, files=[('upload', open(f'{tmpdir}/{filename}', 'rb'))])
assert res['success'], str(res)
else:
print('no existing resource found, creating new resource')
res = ckan.resource_create(target_instance_name, {
'package_id': target_package_id,
'format': format_,
'name': name,
'hash': hash_,
'description': description
}, files=[('upload', open(f'{tmpdir}/{filename}', 'rb'))])
assert res['success'], str(res)
print('done, all resources created/updated')
else:
print('no resources to create/update')
50 changes: 50 additions & 0 deletions datacity_ckan_dgp/operators/generic_fetcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import os
import sys
import json
import tempfile
import contextlib
from importlib import import_module


# the source url will be checked against the following types in order to determine which type of source it is
FETCHERS = [
{
# python3 -m datacity_ckan_dgp.operators.generic_fetcher '{"source_url": "https://data.gov.il/dataset/automated-devices", "target_instance_name": "LOCAL_DEVELOPMENT", "target_package_id": "automated-devices", "target_organization_id": "israel-gov", "tmpdir": ".data/ckan_fetcher_tmpdir"}'
'fetcher': 'ckan_dataset',
'match': {
'url_contains': '/dataset/'
}
}
]


@contextlib.contextmanager
def tempdir(tmpdir):
if tmpdir:
os.makedirs(tmpdir, exist_ok=True)
yield tmpdir
else:
with tempfile.TemporaryDirectory() as tmpdir:
yield tmpdir


def operator(name, params):
source_url = params['source_url']
target_instance_name = params['target_instance_name']
target_package_id = params['target_package_id']
target_organization_id = params['target_organization_id']
tmpdir = params.get('tmpdir')
with tempdir(tmpdir) as tmpdir:
print('starting generic_fetcher operator')
print(f'source_url={source_url} target_instance_name={target_instance_name} target_package_id={target_package_id} target_organization_id={target_organization_id}')
print(f'tmpdir={tmpdir}')
for fetcher in FETCHERS:
assert fetcher['match'].keys() == {'url_contains'}, 'only url_contains match is supported at the moment'
if fetcher['match']['url_contains'] in source_url:
import_module(f'datacity_ckan_dgp.generic_fetchers.{fetcher["fetcher"]}_fetcher').fetch(source_url, target_instance_name, target_package_id, target_organization_id, tmpdir)
break


# python3 -m datacity_ckan_dgp.operators.generic_fetcher '{"source_url": "https://data.gov.il/dataset/automated-devices", "target_instance_name": "LOCAL_DEVELOPMENT", "target_package_id": "automated-devices", "target_organization_id": "israel-gov", "tmpdir": ".data/ckan_fetcher_tmpdir"}'
if __name__ == '__main__':
operator('_', json.loads(sys.argv[1]))
14 changes: 10 additions & 4 deletions datacity_ckan_dgp/operators/instance_initializer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
import traceback
from glob import glob
from ruamel import yaml
from ruamel.yaml import YAML
from collections import defaultdict

import pyproj
Expand All @@ -11,6 +11,9 @@
from datacity_ckan_dgp import utils


yaml = YAML(typ='safe', pure=True)


AUTOMATION_GROUP_NAME = 'instance_initializer'
ORGANIZATIONS_YAML = os.path.join(os.path.dirname(__file__), '..', 'instance_initializer_organizations.yaml')
GROUPS_YAML = os.path.join(os.path.dirname(__file__), '..', 'instance_initializer_groups.yaml')
Expand All @@ -35,7 +38,7 @@ def init_groups(instance_name):
print("Initializing groups")
if not ckan.automation_group_get(instance_name, AUTOMATION_GROUP_NAME, 'initialized_groups'):
with open(GROUPS_YAML) as f:
groups = yaml.safe_load(f)
groups = yaml.load(f)
for group in groups:
if not ckan.group_show(instance_name, group['id']):
ckan.group_create(instance_name, group['id'], title=group['title'], image_url=group['icon'])
Expand All @@ -49,7 +52,7 @@ def init_organizations(instance_name, default_organization_title):
print("Initializing organizations")
if not ckan.automation_group_get(instance_name, AUTOMATION_GROUP_NAME, 'initialized_organizations'):
with open(ORGANIZATIONS_YAML) as f:
organizations = yaml.safe_load(f)
organizations = yaml.load(f)
for organization in organizations:
if not ckan.organization_show(instance_name, organization['id']):
title = default_organization_title if organization['id'] == 'muni' else organization['title']
Expand Down Expand Up @@ -214,7 +217,7 @@ def init_packages(instance_name, muni_filter_texts, init_package_id=None):
print("Initializing packages")
if not ckan.automation_group_get(instance_name, AUTOMATION_GROUP_NAME, 'initialized_packages'):
with open(PACKAGES_YAML) as f:
packages = yaml.safe_load(f)
packages = yaml.load(f)
num_errors = 0
num_success = 0
for package in packages:
Expand Down Expand Up @@ -250,6 +253,9 @@ def operator(name, params, init_package_id=None):
init_packages(instance_name, muni_filter_texts, init_package_id)


# python3 -m datacity_ckan_dgp.operators.instance_initializer '{"instance_name": "local_development", "default_organization_title": "עיריית חיפה", "muni_filter_texts": "חיפה"}'


if __name__ == '__main__':
import sys
import json
Expand Down

0 comments on commit c564f18

Please sign in to comment.