Commit: fixes to ckan dataset fetcher
OriHoch committed Aug 4, 2024
1 parent be6ae23 commit 8410998
Showing 6 changed files with 232 additions and 38 deletions.
121 changes: 87 additions & 34 deletions datacity_ckan_dgp/generic_fetchers/ckan_dataset_fetcher.py
@@ -7,6 +7,8 @@

from .. import ckan
from ..utils import http_stream_download
+from ..utils.locking import instance_package_lock
+from ..operators import packages_processing


DESCRIPTION = """
@@ -22,6 +24,8 @@
3. If GEOJSON resource is available it will be copied and filtered separately
4. All other resources will be ignored
+After create/update of data it will run the packages_processing to create xlsx / geojson related resources
Local development examples:
without source filter:
python3 -m datacity_ckan_dgp.operators.generic_fetcher '{"source_url": "https://data.gov.il/dataset/automated-devices", "target_instance_name": "LOCAL_DEVELOPMENT", "target_package_id": "automated-devices", "target_organization_id": "israel-gov", "tmpdir": ".data/ckan_fetcher_tmpdir"}'
@@ -32,12 +36,51 @@
DEVEL_SKIP_DOWNLOAD = os.getenv('DEVEL_SKIP_DOWNLOAD', 'false').lower() == 'true'


+def run_packages_processing(instance_name, package_id):
+    for task in ['geojson', 'xlsx']:
+        assert packages_processing.operator('_', {
+            'instance_name': instance_name,
+            'task': task
+        }, only_package_id=package_id, with_lock=False)
+
+
+def check_row_kv(row, key, val):
+    rowval = str(row.get(key) or '').strip()
+    val = str(val or '').strip()
+    return rowval == val
+
+
+def filter_rows(source_filter):
+
+    def filter_row(row):
+        if isinstance(source_filter, list):
+            for filter_ in source_filter:
+                if all(check_row_kv(row, k, v) for k, v in filter_.items()):
+                    return True
+        elif isinstance(source_filter, dict):
+            # .get, not .pop: filter_row runs once per row and must not mutate the shared source_filter dict
+            filter_type = source_filter.get('..filtertype')
+            if filter_type == 'multi_key_vals':
+                for key in source_filter['keys']:
+                    for val in source_filter['vals']:
+                        if check_row_kv(row, key, val):
+                            return True
+            elif filter_type is None:
+                return all(check_row_kv(row, k, v) for k, v in source_filter.items())
+            else:
+                raise ValueError(f'unsupported filter type: {filter_type}')
+        else:
+            raise ValueError('source_filter must be a list of dictionaries or a dictionary')
+        return False
+
+    return filter_row


def get_filtered_tabular_resources_to_update(tmpdir, source_filter, id_, name, format_, hash_, description, filename):
print(f'filtering tabular data from {filename} with format {format_}...')
resources_to_update = []
DF.Flow(
DF.load(f'{tmpdir}/{id_}', name='filtered', format=format_.lower(), infer_strategy=DF.load.INFER_STRINGS, cast_strategy=DF.load.CAST_TO_STRINGS),
-        DF.filter_rows(lambda row: all(row.get(k) == v for k, v in source_filter.items())),
+        DF.filter_rows(filter_rows(source_filter)),
DF.printer(),
DF.dump_to_path(f'{tmpdir}/{id_}-filtered')
).process()
@@ -91,7 +134,7 @@ def get_resources_to_update(resources, tmpdir, headers, existing_target_resource
source_format = resource.get('format') or ''
source_name = resource.get('name') or ''
description = resource.get('description') or ''
-        if source_filter or existing_target_resources.get(f'{source_name}.{source_format}', {}).get('hash') != source_hash:
+        if source_filter or existing_target_resources.get(f'{source_name}.{source_format}'.lower(), {}).get('hash') != source_hash:
resources_to_update.append((id_, source_name, source_format, source_hash, description, filename))
if source_filter:
prefiltered_resources = resources_to_update
Expand Down Expand Up @@ -121,7 +164,7 @@ def fetch(source_url, target_instance_name, target_package_id, target_organizati
hash_ = resource.get('hash') or ''
id_ = resource.get('id') or ''
if format_ and name and id_:
-            existing_target_resources[f'{name}.{format_}'] = {'hash': hash_, 'id': id_}
+            existing_target_resources[f'{name}.{format_}'.lower()] = {'hash': hash_, 'id': id_}
source_package_id = source_url.split('/dataset/')[1].split('/')[0]
source_instance_baseurl = source_url.split('/dataset/')[0]
if 'data.gov.il' in source_instance_baseurl:
@@ -143,43 +186,53 @@
with open(f'{tmpdir}/package.json', 'w') as f:
json.dump(res, f)
package_title = res['result']['title']
+    # organization_title = res['result'].get('organization', {}).get('title')
+    # if organization_title:
+    #     package_title = f'{package_title} | {organization_title}'
+    description = 'מקור המידע: ' + source_url
+    notes = res['result'].get('notes') or ''
+    if notes:
+        description = f'{notes}\n\n{description}'
resources_to_update = get_resources_to_update(res['result']['resources'], tmpdir, headers, existing_target_resources, source_filter)
if resources_to_update:
-        print(f'updating {len(resources_to_update)} resources')
-        if not target_package_exists:
-            print('creating target package')
-            res = ckan.package_create(target_instance_name, {
-                'name': target_package_id,
-                'title': package_title,
-                'owner_org': target_organization_id
-            })
-            assert res['success'], str(res)
-        for id_, name, format_, hash_, description, filename in resources_to_update:
-            print(f'{name}.{format_}')
-            if os.path.exists(f'{tmpdir}/{filename}'):
-                os.unlink(f'{tmpdir}/{filename}')
-            os.rename(f'{tmpdir}/{id_}', f'{tmpdir}/{filename}')
-            if f'{name}.{format_}' in existing_target_resources:
-                if existing_target_resources[f'{name}.{format_}'].get('hash') and existing_target_resources[f'{name}.{format_}']['hash'] == hash_:
-                    print('existing resource found and hash is the same, skipping resource data update')
+        with instance_package_lock(target_instance_name, target_package_id):
+            print(f'updating {len(resources_to_update)} resources')
+            if not target_package_exists:
+                print('creating target package')
+                res = ckan.package_create(target_instance_name, {
+                    'name': target_package_id,
+                    'title': package_title,
+                    'notes': description,
+                    'owner_org': target_organization_id
+                })
+                assert res['success'], str(res)
+            for id_, name, format_, hash_, description, filename in resources_to_update:
+                print(f'{name}.{format_}')
+                if os.path.exists(f'{tmpdir}/{filename}'):
+                    os.unlink(f'{tmpdir}/{filename}')
+                os.rename(f'{tmpdir}/{id_}', f'{tmpdir}/{filename}')
+                if f'{name}.{format_}'.lower() in existing_target_resources:
+                    if existing_target_resources[f'{name}.{format_}'.lower()].get('hash') and existing_target_resources[f'{name}.{format_}'.lower()]['hash'] == hash_:
+                        print('existing resource found and hash is the same, skipping resource data update')
+                    else:
+                        print('existing resource found, but hash is different, updating resource data')
+                        res = ckan.resource_update(target_instance_name, {
+                            'id': existing_target_resources[f'{name}.{format_}'.lower()]['id'],
+                            'hash': hash_,
+                            'description': description
+                        }, files=[('upload', open(f'{tmpdir}/{filename}', 'rb'))])
+                        assert res['success'], str(res)
                else:
-                    print('existing resource found, but hash is different, updating resource data')
-                    res = ckan.resource_update(target_instance_name, {
-                        'id': existing_target_resources[f'{name}.{format_}']['id'],
+                    print('no existing resource found, creating new resource')
+                    res = ckan.resource_create(target_instance_name, {
+                        'package_id': target_package_id,
+                        'format': format_,
+                        'name': name,
                        'hash': hash_,
                        'description': description
                    }, files=[('upload', open(f'{tmpdir}/{filename}', 'rb'))])
                    assert res['success'], str(res)
-            else:
-                print('no existing resource found, creating new resource')
-                res = ckan.resource_create(target_instance_name, {
-                    'package_id': target_package_id,
-                    'format': format_,
-                    'name': name,
-                    'hash': hash_,
-                    'description': description
-                }, files=[('upload', open(f'{tmpdir}/{filename}', 'rb'))])
-                assert res['success'], str(res)
-        print('done, all resources created/updated')
+            run_packages_processing(target_instance_name, target_package_id)
+        print('done, all resources created/updated')
else:
print('no resources to create/update')
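For illustration (not part of the commit), a minimal sketch of the three source_filter shapes accepted by the new filter_rows above; the row and filter values here are hypothetical:

from datacity_ckan_dgp.generic_fetchers.ckan_dataset_fetcher import filter_rows

row = {'city': 'Haifa', 'type': 'camera'}

# plain dict: every key/value pair must match the row
assert filter_rows({'city': 'Haifa', 'type': 'camera'})(row)

# list of dicts: the row passes if any one dict matches in full
assert filter_rows([{'city': 'Eilat'}, {'city': 'Haifa'}])(row)

# multi_key_vals: the row passes if any listed key holds any listed value
assert filter_rows({'..filtertype': 'multi_key_vals', 'keys': ['city'], 'vals': ['Haifa', 'Eilat']})(row)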
2 changes: 1 addition & 1 deletion datacity_ckan_dgp/operators/generic_fetcher.py
@@ -35,7 +35,7 @@ def operator(name, params):
tmpdir = params.get('tmpdir')
with tempdir(tmpdir) as tmpdir:
print('starting generic_fetcher operator')
-        print(json.dumps(params))
+        print(json.dumps(params, ensure_ascii=False))
for fetcher in FETCHERS:
assert fetcher['match'].keys() == {'url_contains'}, 'only url_contains match is supported at the moment'
if fetcher['match']['url_contains'] in source_url:
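For context (not part of the commit), the effect of the ensure_ascii=False change above: fetcher params frequently contain Hebrew, which json.dumps would otherwise print as \uXXXX escapes. The params value below is hypothetical:

import json

params = {'package_title': 'מקור המידע'}
print(json.dumps(params))                      # {"package_title": "\u05de\u05e7\u05d5\u05e8 ..."}
print(json.dumps(params, ensure_ascii=False))  # {"package_title": "מקור המידע"}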
8 changes: 6 additions & 2 deletions datacity_ckan_dgp/operators/packages_processing.py
@@ -1,8 +1,9 @@
import traceback
from datacity_ckan_dgp import ckan
+from datacity_ckan_dgp.utils.locking import instance_package_lock


-def operator(name, params):
+def operator(name, params, only_package_id=None, with_lock=True):
instance_name = params['instance_name']
task = params['task']
if task == "geojson":
@@ -13,8 +14,11 @@ def operator(name, params):
raise Exception("Unknown processing task: {}".format(task))
num_errors = 0
for package_id in ckan.package_list(instance_name):
+        if only_package_id and package_id != only_package_id:
+            continue
try:
-            process_package(instance_name, package_id)
+            with instance_package_lock(instance_name, package_id, with_lock):
+                process_package(instance_name, package_id)
except:
traceback.print_exc()
num_errors += 1
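A usage sketch of the new operator parameters (instance and package ids are hypothetical): this is the call shape run_packages_processing in the fetcher uses to process a single package without re-acquiring the lock it already holds:

from datacity_ckan_dgp.operators import packages_processing

packages_processing.operator('_', {
    'instance_name': 'my-instance',  # hypothetical instance
    'task': 'xlsx'
}, only_package_id='my-package', with_lock=False)  # caller already holds the instance/package lock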
62 changes: 62 additions & 0 deletions datacity_ckan_dgp/utils/locking.py
@@ -0,0 +1,62 @@
import os
import uuid
import json
import glob
import time
import datetime
from contextlib import contextmanager


LOCK_TTL_SECONDS = 3600 # 1 hour
WAIT_TTL_SECONDS = 60 * 10 # 10 minutes
BASE_LOCK_PATH = os.getenv('BASE_LOCK_PATH', '/var/ckan_dgp_locks')


def is_my_lock_active(my_lock_id, lock_path):
locks_to_remove = []
valid_locks = {}
for lock_file in glob.glob(os.path.join(lock_path, f'*.json')):
with open(lock_file) as f:
lock = json.load(f)
lock_id = lock['id']
lock_time = datetime.datetime.strptime(lock['time'], '%Y-%m-%d %H:%M:%S:%f')
if lock_time < datetime.datetime.now() - datetime.timedelta(seconds=LOCK_TTL_SECONDS):
locks_to_remove.append(lock_file)
else:
valid_locks[f'{lock_time.strftime("%Y-%m-%d %H:%M:%S:%f")}_{lock_id}'] = lock_id
active_lock_key = list(sorted(valid_locks.keys()))[0] if len(valid_locks) > 0 else None
active_lock_id = valid_locks[active_lock_key] if active_lock_key else None
if active_lock_id == my_lock_id:
for lock_file in locks_to_remove:
os.remove(lock_file)
return True
else:
return False


@contextmanager
def instance_package_lock(instance_name, package_id, with_lock=True):
if with_lock:
lock_id = str(uuid.uuid4())
lock_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f')
lock_file = os.path.join(BASE_LOCK_PATH, instance_name, package_id, f'{lock_id}.json')
os.makedirs(os.path.dirname(lock_file), exist_ok=True)
with open(lock_file, 'w') as f:
json.dump({
'id': lock_id,
'time': lock_time,
}, f)
start_wait_time = datetime.datetime.now()
while True:
if is_my_lock_active(lock_id, os.path.dirname(lock_file)):
break
if datetime.datetime.now() - start_wait_time > datetime.timedelta(seconds=WAIT_TTL_SECONDS):
os.remove(lock_file)
raise Exception(f'Failed to acquire lock for {instance_name}/{package_id} after {WAIT_TTL_SECONDS} seconds')
time.sleep(1)
try:
yield
finally:
os.remove(lock_file)
else:
yield
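A minimal usage sketch of the locking helper above (instance and package names hypothetical). Each contender writes BASE_LOCK_PATH/<instance>/<package>/<uuid>.json, then polls until its own lock file is the oldest non-expired one (ties broken by lock id); the file is removed on exit:

from datacity_ckan_dgp.utils.locking import instance_package_lock

with instance_package_lock('my-instance', 'my-package'):
    # only one process per instance/package runs this section at a time;
    # stale lock files expire after LOCK_TTL_SECONDS and are cleaned up by the winner
    ...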
4 changes: 3 additions & 1 deletion tests/requirements.txt
@@ -1 +1,3 @@
-pytest==6.2.1
+pytest==8.3.2
+pytest-mock==3.14.0

73 changes: 73 additions & 0 deletions tests/test_locking.py
@@ -0,0 +1,73 @@
import os
import uuid
import json
import glob
import tempfile
import datetime

import pytest

from datacity_ckan_dgp.utils.locking import is_my_lock_active, LOCK_TTL_SECONDS, instance_package_lock


def create_lock_file(lockdir, my_lock_time=None):
my_lock_id = str(uuid.uuid4())
if my_lock_time is None:
my_lock_time = datetime.datetime.now()
with open(f'{lockdir}/{my_lock_id}.json', 'w') as f:
json.dump({
'id': my_lock_id,
'time': my_lock_time.strftime('%Y-%m-%d %H:%M:%S:%f')
}, f)
return my_lock_id


def test_is_my_lock_active_no_locks():
with tempfile.TemporaryDirectory() as tmpdir:
my_lock_id = create_lock_file(tmpdir)
assert is_my_lock_active(my_lock_id, tmpdir)


def test_is_my_lock_active_older_lock_exists():
with tempfile.TemporaryDirectory() as tmpdir:
older_lock_id = create_lock_file(tmpdir, datetime.datetime.now() - datetime.timedelta(seconds=1))
my_lock_id = create_lock_file(tmpdir)
assert not is_my_lock_active(my_lock_id, tmpdir)
os.remove(f'{tmpdir}/{older_lock_id}.json')
assert is_my_lock_active(my_lock_id, tmpdir)


def test_is_my_lock_active_delete_older_expired_lock():
with tempfile.TemporaryDirectory() as tmpdir:
expired_lock_id = create_lock_file(tmpdir, datetime.datetime.now() - datetime.timedelta(seconds=LOCK_TTL_SECONDS+1))
my_lock_id = create_lock_file(tmpdir)
assert is_my_lock_active(my_lock_id, tmpdir)
assert not os.path.exists(f'{tmpdir}/{expired_lock_id}.json')


def test_is_my_lock_active_ignore_newer_lock():
with tempfile.TemporaryDirectory() as tmpdir:
create_lock_file(tmpdir, datetime.datetime.now() + datetime.timedelta(seconds=5))
my_lock_id = create_lock_file(tmpdir)
assert is_my_lock_active(my_lock_id, tmpdir)


def test_is_my_lock_active_same_time():
with tempfile.TemporaryDirectory() as tmpdir:
my_lock_time = datetime.datetime.now()
my_lock_id = create_lock_file(tmpdir, my_lock_time)
other_lock_id = create_lock_file(tmpdir, my_lock_time)
assert is_my_lock_active(my_lock_id, tmpdir) == (my_lock_id < other_lock_id)


def test_instance_package_lock():
from datacity_ckan_dgp.utils import locking
with tempfile.TemporaryDirectory() as tmpdir:
locking.BASE_LOCK_PATH = tmpdir
locking.WAIT_TTL_SECONDS = 2
with instance_package_lock('test_instance', 'test_package'):
assert len(glob.glob(f'{tmpdir}/test_instance/test_package/*.json')) == 1
with pytest.raises(Exception, match='Failed to acquire lock for test_instance/test_package after 2 seconds'):
with instance_package_lock('test_instance', 'test_package'):
pass
assert len(glob.glob(f'{tmpdir}/test_instance/test_package/*.json')) == 0
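To run just the new locking tests locally (assuming the repository root as working directory and tests/requirements.txt installed):

python3 -m pytest tests/test_locking.py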
