From 8410998b372ed884648a8c5f59a8e62a00ed703c Mon Sep 17 00:00:00 2001
From: ori
Date: Sun, 4 Aug 2024 16:01:11 +0300
Subject: [PATCH] fixes to ckan dataset fetcher

---
 .../generic_fetchers/ckan_dataset_fetcher.py | 121 +++++++++++++-----
 .../operators/generic_fetcher.py             |   2 +-
 .../operators/packages_processing.py         |   8 +-
 datacity_ckan_dgp/utils/locking.py           |  62 +++++++++
 tests/requirements.txt                       |   4 +-
 tests/test_locking.py                        |  73 +++++++++++
 6 files changed, 232 insertions(+), 38 deletions(-)
 create mode 100644 datacity_ckan_dgp/utils/locking.py
 create mode 100644 tests/test_locking.py

diff --git a/datacity_ckan_dgp/generic_fetchers/ckan_dataset_fetcher.py b/datacity_ckan_dgp/generic_fetchers/ckan_dataset_fetcher.py
index 1ea10ad..ee6f2d2 100644
--- a/datacity_ckan_dgp/generic_fetchers/ckan_dataset_fetcher.py
+++ b/datacity_ckan_dgp/generic_fetchers/ckan_dataset_fetcher.py
@@ -7,6 +7,8 @@
 
 from .. import ckan
 from ..utils import http_stream_download
+from ..utils.locking import instance_package_lock
+from ..operators import packages_processing
 
 
 DESCRIPTION = """
@@ -22,6 +24,8 @@
 3. If GEOJSON resource is available it will be copied and filtered separately
 4. All other resources will be ignored
 
+After resources are created or updated, the packages_processing operator is run to create the related xlsx / geojson resources
+
 Local development examples:
 without source filter:
 python3 -m datacity_ckan_dgp.operators.generic_fetcher '{"source_url": "https://data.gov.il/dataset/automated-devices", "target_instance_name": "LOCAL_DEVELOPMENT", "target_package_id": "automated-devices", "target_organization_id": "israel-gov", "tmpdir": ".data/ckan_fetcher_tmpdir"}'
@@ -32,12 +36,51 @@
 DEVEL_SKIP_DOWNLOAD = os.getenv('DEVEL_SKIP_DOWNLOAD', 'false').lower() == 'true'
 
 
+def run_packages_processing(instance_name, package_id):
+    # run the geojson/xlsx derived-resource tasks for a single package;
+    # with_lock=False because the caller already holds the package lock
+    for task in ['geojson', 'xlsx']:
+        assert packages_processing.operator('_', {
+            'instance_name': instance_name,
+            'task': task
+        }, only_package_id=package_id, with_lock=False)
+
+
+def check_row_kv(row, key, val):
+    # compare as stripped strings so numeric types and stray whitespace don't break matching
+    rowval = str(row.get(key) or '').strip()
+    val = str(val or '').strip()
+    return rowval == val
+
+
+def filter_rows(source_filter):
+    # pop the special '..filtertype' key once, up front: popping it inside
+    # filter_row would mutate source_filter on the first row and silently
+    # change the behavior for every following row
+    filter_type = source_filter.pop('..filtertype', None) if isinstance(source_filter, dict) else None
+
+    def filter_row(row):
+        if isinstance(source_filter, list):
+            # list of dicts: a row passes if it matches all key/value pairs of any one dict
+            for filter_ in source_filter:
+                if all(check_row_kv(row, k, v) for k, v in filter_.items()):
+                    return True
+        elif isinstance(source_filter, dict):
+            if filter_type == 'multi_key_vals':
+                # a row passes if any of the keys matches any of the values
+                for key in source_filter['keys']:
+                    for val in source_filter['vals']:
+                        if check_row_kv(row, key, val):
+                            return True
+            elif filter_type is None:
+                # plain dict: a row passes if it matches all key/value pairs
+                return all(check_row_kv(row, k, v) for k, v in source_filter.items())
+            else:
+                raise ValueError(f'unsupported filter type: {filter_type}')
+        else:
+            raise ValueError('source_filter must be a list of dictionaries or a dictionary')
+        return False
+
+    return filter_row
+
+
 def get_filtered_tabular_resources_to_update(tmpdir, source_filter, id_, name, format_, hash_, description, filename):
     print(f'filtering tabular data from {filename} with format {format_}...')
     resources_to_update = []
     DF.Flow(
         DF.load(f'{tmpdir}/{id_}', name='filtered', format=format_.lower(), infer_strategy=DF.load.INFER_STRINGS, cast_strategy=DF.load.CAST_TO_STRINGS),
-        DF.filter_rows(lambda row: all(row.get(k) == v for k, v in source_filter.items())),
+        DF.filter_rows(filter_rows(source_filter)),
         DF.printer(),
         DF.dump_to_path(f'{tmpdir}/{id_}-filtered')
     ).process()
@@ -91,7 +134,7 @@ def get_resources_to_update(resources, tmpdir, headers, existing_target_resources, source_filter):
         source_format = resource.get('format') or ''
         source_name = resource.get('name') or ''
         description = resource.get('description') or ''
-        if source_filter or existing_target_resources.get(f'{source_name}.{source_format}', {}).get('hash') != source_hash:
+        if source_filter or existing_target_resources.get(f'{source_name}.{source_format}'.lower(), {}).get('hash') != source_hash:
             resources_to_update.append((id_, source_name, source_format, source_hash, description, filename))
     if source_filter:
         prefiltered_resources = resources_to_update
@@ -121,7 +164,7 @@ def fetch(source_url, target_instance_name, target_package_id, target_organizati
             hash_ = resource.get('hash') or ''
             id_ = resource.get('id') or ''
             if format_ and name and id_:
-                existing_target_resources[f'{name}.{format_}'] = {'hash': hash_, 'id': id_}
+                existing_target_resources[f'{name}.{format_}'.lower()] = {'hash': hash_, 'id': id_}
     source_package_id = source_url.split('/dataset/')[1].split('/')[0]
     source_instance_baseurl = source_url.split('/dataset/')[0]
     if 'data.gov.il' in source_instance_baseurl:
@@ -143,43 +186,53 @@ def fetch(source_url, target_instance_name, target_package_id, target_organizati
     with open(f'{tmpdir}/package.json', 'w') as f:
         json.dump(res, f)
     package_title = res['result']['title']
+    # organization_title = res['result'].get('organization', {}).get('title')
+    # if organization_title:
+    #     package_title = f'{package_title} | {organization_title}'
+    description = 'מקור המידע: ' + source_url
+    notes = res['result'].get('notes') or ''
+    if notes:
+        description = f'{notes}\n\n{description}'
     resources_to_update = get_resources_to_update(res['result']['resources'], tmpdir, headers, existing_target_resources, source_filter)
     if resources_to_update:
-        print(f'updating {len(resources_to_update)} resources')
-        if not target_package_exists:
-            print('creating target package')
-            res = ckan.package_create(target_instance_name, {
-                'name': target_package_id,
-                'title': package_title,
-                'owner_org': target_organization_id
-            })
-            assert res['success'], str(res)
-        for id_, name, format_, hash_, description, filename in resources_to_update:
-            print(f'{name}.{format_}')
-            if os.path.exists(f'{tmpdir}/{filename}'):
-                os.unlink(f'{tmpdir}/{filename}')
-            os.rename(f'{tmpdir}/{id_}', f'{tmpdir}/{filename}')
-            if f'{name}.{format_}' in existing_target_resources:
-                if existing_target_resources[f'{name}.{format_}'].get('hash') and existing_target_resources[f'{name}.{format_}']['hash'] == hash_:
-                    print('existing resource found and hash is the same, skipping resource data update')
-                else:
-                    print('existing resource found, but hash is different, updating resource data')
-                    res = ckan.resource_update(target_instance_name, {
-                        'id': existing_target_resources[f'{name}.{format_}']['id'],
-                        'hash': hash_,
-                        'description': description
-                    }, files=[('upload', open(f'{tmpdir}/{filename}', 'rb'))])
-                    assert res['success'], str(res)
-            else:
-                print('no existing resource found, creating new resource')
-                res = ckan.resource_create(target_instance_name, {
-                    'package_id': target_package_id,
-                    'format': format_,
-                    'name': name,
-                    'hash': hash_,
-                    'description': description
-                }, files=[('upload', open(f'{tmpdir}/{filename}', 'rb'))])
-                assert res['success'], str(res)
-        print('done, all resources created/updated')
+        with instance_package_lock(target_instance_name, target_package_id):
+            print(f'updating {len(resources_to_update)} resources')
+            if not target_package_exists:
+                print('creating target package')
+                res = ckan.package_create(target_instance_name, {
+                    'name': target_package_id,
+                    'title': package_title,
+                    'notes': description,
+                    'owner_org': target_organization_id
+                })
+                assert res['success'], str(res)
+            for id_, name, format_, hash_, description, filename in resources_to_update:
+                print(f'{name}.{format_}')
+                if os.path.exists(f'{tmpdir}/{filename}'):
+                    os.unlink(f'{tmpdir}/{filename}')
+                os.rename(f'{tmpdir}/{id_}', f'{tmpdir}/{filename}')
+                if f'{name}.{format_}'.lower() in existing_target_resources:
+                    if existing_target_resources[f'{name}.{format_}'.lower()].get('hash') and existing_target_resources[f'{name}.{format_}'.lower()]['hash'] == hash_:
+                        print('existing resource found and hash is the same, skipping resource data update')
+                    else:
+                        print('existing resource found, but hash is different, updating resource data')
+                        res = ckan.resource_update(target_instance_name, {
+                            'id': existing_target_resources[f'{name}.{format_}'.lower()]['id'],
+                            'hash': hash_,
+                            'description': description
+                        }, files=[('upload', open(f'{tmpdir}/{filename}', 'rb'))])
+                        assert res['success'], str(res)
+                else:
+                    print('no existing resource found, creating new resource')
+                    res = ckan.resource_create(target_instance_name, {
+                        'package_id': target_package_id,
+                        'format': format_,
+                        'name': name,
+                        'hash': hash_,
+                        'description': description
+                    }, files=[('upload', open(f'{tmpdir}/{filename}', 'rb'))])
+                    assert res['success'], str(res)
+            run_packages_processing(target_instance_name, target_package_id)
+            print('done, all resources created/updated')
     else:
         print('no resources to create/update')
diff --git a/datacity_ckan_dgp/operators/generic_fetcher.py b/datacity_ckan_dgp/operators/generic_fetcher.py
index b4021d0..2aa6c4f 100644
--- a/datacity_ckan_dgp/operators/generic_fetcher.py
+++ b/datacity_ckan_dgp/operators/generic_fetcher.py
@@ -35,7 +35,7 @@ def operator(name, params):
     tmpdir = params.get('tmpdir')
     with tempdir(tmpdir) as tmpdir:
         print('starting generic_fetcher operator')
-        print(json.dumps(params))
+        print(json.dumps(params, ensure_ascii=False))
         for fetcher in FETCHERS:
             assert fetcher['match'].keys() == {'url_contains'}, 'only url_contains match is supported at the moment'
             if fetcher['match']['url_contains'] in source_url:
diff --git a/datacity_ckan_dgp/operators/packages_processing.py b/datacity_ckan_dgp/operators/packages_processing.py
index 4c2c9c3..72d14e3 100644
--- a/datacity_ckan_dgp/operators/packages_processing.py
+++ b/datacity_ckan_dgp/operators/packages_processing.py
@@ -1,8 +1,9 @@
 import traceback
 from datacity_ckan_dgp import ckan
+from datacity_ckan_dgp.utils.locking import instance_package_lock
 
 
-def operator(name, params):
+def operator(name, params, only_package_id=None, with_lock=True):
     instance_name = params['instance_name']
     task = params['task']
     if task == "geojson":
@@ -13,8 +14,11 @@ def operator(name, params):
         raise Exception("Unknown processing task: {}".format(task))
     num_errors = 0
     for package_id in ckan.package_list(instance_name):
+        if only_package_id and package_id != only_package_id:
+            continue
         try:
-            process_package(instance_name, package_id)
+            with instance_package_lock(instance_name, package_id, with_lock):
+                process_package(instance_name, package_id)
         except:
             traceback.print_exc()
             num_errors += 1
diff --git a/datacity_ckan_dgp/utils/locking.py b/datacity_ckan_dgp/utils/locking.py
new file mode 100644
index 0000000..2a84864
--- /dev/null
+++ b/datacity_ckan_dgp/utils/locking.py
@@ -0,0 +1,62 @@
+import os
+import uuid
+import json
+import glob
+import time
+import datetime
+from contextlib import contextmanager
+
+
+LOCK_TTL_SECONDS = 3600  # 1 hour
+WAIT_TTL_SECONDS = 60 * 10  # 10 minutes
+BASE_LOCK_PATH = os.getenv('BASE_LOCK_PATH', '/var/ckan_dgp_locks')
+
+
+def is_my_lock_active(my_lock_id, lock_path):
+    # a lock is "active" if its file is the oldest valid (non-expired) lock
+    # file in the directory; ties on time are broken by the uuid in the key
+    locks_to_remove = []
+    valid_locks = {}
+    for lock_file in glob.glob(os.path.join(lock_path, '*.json')):
+        with open(lock_file) as f:
+            lock = json.load(f)
+        lock_id = lock['id']
+        lock_time = datetime.datetime.strptime(lock['time'], '%Y-%m-%d %H:%M:%S:%f')
+        if lock_time < datetime.datetime.now() - datetime.timedelta(seconds=LOCK_TTL_SECONDS):
+            locks_to_remove.append(lock_file)
+        else:
+            valid_locks[f'{lock_time.strftime("%Y-%m-%d %H:%M:%S:%f")}_{lock_id}'] = lock_id
+    active_lock_key = list(sorted(valid_locks.keys()))[0] if len(valid_locks) > 0 else None
+    active_lock_id = valid_locks[active_lock_key] if active_lock_key else None
+    if active_lock_id == my_lock_id:
+        # only the active lock holder cleans up expired lock files
+        for lock_file in locks_to_remove:
+            os.remove(lock_file)
+        return True
+    else:
+        return False
+
+
+@contextmanager
+def instance_package_lock(instance_name, package_id, with_lock=True):
+    if with_lock:
+        lock_id = str(uuid.uuid4())
+        lock_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f')
+        lock_file = os.path.join(BASE_LOCK_PATH, instance_name, package_id, f'{lock_id}.json')
+        os.makedirs(os.path.dirname(lock_file), exist_ok=True)
+        # write our lock file first, then poll until it becomes the oldest valid lock
+        with open(lock_file, 'w') as f:
+            json.dump({
+                'id': lock_id,
+                'time': lock_time,
+            }, f)
+        start_wait_time = datetime.datetime.now()
+        while True:
+            if is_my_lock_active(lock_id, os.path.dirname(lock_file)):
+                break
+            if datetime.datetime.now() - start_wait_time > datetime.timedelta(seconds=WAIT_TTL_SECONDS):
+                os.remove(lock_file)
+                raise Exception(f'Failed to acquire lock for {instance_name}/{package_id} after {WAIT_TTL_SECONDS} seconds')
+            time.sleep(1)
+        try:
+            yield
+        finally:
+            os.remove(lock_file)
+    else:
+        yield
diff --git a/tests/requirements.txt b/tests/requirements.txt
index d5bd56f..6255a49 100644
--- a/tests/requirements.txt
+++ b/tests/requirements.txt
@@ -1 +1,3 @@
-pytest==6.2.1
+pytest==8.3.2
+pytest-mock==3.14.0
+
diff --git a/tests/test_locking.py b/tests/test_locking.py
new file mode 100644
index 0000000..9d1b7a8
--- /dev/null
+++ b/tests/test_locking.py
@@ -0,0 +1,73 @@
+import os
+import uuid
+import json
+import glob
+import tempfile
+import datetime
+
+import pytest
+
+from datacity_ckan_dgp.utils.locking import is_my_lock_active, LOCK_TTL_SECONDS, instance_package_lock
+
+
+def create_lock_file(lockdir, my_lock_time=None):
+    my_lock_id = str(uuid.uuid4())
+    if my_lock_time is None:
+        my_lock_time = datetime.datetime.now()
+    with open(f'{lockdir}/{my_lock_id}.json', 'w') as f:
+        json.dump({
+            'id': my_lock_id,
+            'time': my_lock_time.strftime('%Y-%m-%d %H:%M:%S:%f')
+        }, f)
+    return my_lock_id
+
+
+def test_is_my_lock_active_no_locks():
+    with tempfile.TemporaryDirectory() as tmpdir:
+        my_lock_id = create_lock_file(tmpdir)
+        assert is_my_lock_active(my_lock_id, tmpdir)
+
+
+def test_is_my_lock_active_older_lock_exists():
+    with tempfile.TemporaryDirectory() as tmpdir:
+        older_lock_id = create_lock_file(tmpdir, datetime.datetime.now() - datetime.timedelta(seconds=1))
+        my_lock_id = create_lock_file(tmpdir)
+        assert not is_my_lock_active(my_lock_id, tmpdir)
+        os.remove(f'{tmpdir}/{older_lock_id}.json')
+        assert is_my_lock_active(my_lock_id, tmpdir)
+
+
+def test_is_my_lock_active_delete_older_expired_lock():
+    with tempfile.TemporaryDirectory() as tmpdir:
+        expired_lock_id = create_lock_file(tmpdir, datetime.datetime.now() - datetime.timedelta(seconds=LOCK_TTL_SECONDS+1))
+        my_lock_id = create_lock_file(tmpdir)
+        assert is_my_lock_active(my_lock_id, tmpdir)
+        assert not os.path.exists(f'{tmpdir}/{expired_lock_id}.json')
+
+
+def test_is_my_lock_active_ignore_newer_lock():
+    with tempfile.TemporaryDirectory() as tmpdir:
+        create_lock_file(tmpdir, datetime.datetime.now() + datetime.timedelta(seconds=5))
+        my_lock_id = create_lock_file(tmpdir)
+        assert is_my_lock_active(my_lock_id, tmpdir)
+
+
+def test_is_my_lock_active_same_time():
+    with tempfile.TemporaryDirectory() as tmpdir:
+        my_lock_time = datetime.datetime.now()
+        my_lock_id = create_lock_file(tmpdir, my_lock_time)
+        other_lock_id = create_lock_file(tmpdir, my_lock_time)
+        assert is_my_lock_active(my_lock_id, tmpdir) == (my_lock_id < other_lock_id)
+
+
+def test_instance_package_lock():
+    from datacity_ckan_dgp.utils import locking
+    with tempfile.TemporaryDirectory() as tmpdir:
+        locking.BASE_LOCK_PATH = tmpdir
+        locking.WAIT_TTL_SECONDS = 2
+        with instance_package_lock('test_instance', 'test_package'):
+            assert len(glob.glob(f'{tmpdir}/test_instance/test_package/*.json')) == 1
+            with pytest.raises(Exception, match='Failed to acquire lock for test_instance/test_package after 2 seconds'):
+                with instance_package_lock('test_instance', 'test_package'):
+                    pass
+        assert len(glob.glob(f'{tmpdir}/test_instance/test_package/*.json')) == 0
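
A usage sketch of the three source_filter shapes accepted by the new filter_rows helper (the keys and values below are hypothetical, not taken from any real dataset):

    # plain dict: a row passes only if it matches every key/value pair (AND)
    {"City": "Haifa", "Status": "Active"}

    # list of dicts: a row passes if it fully matches any one dict (OR of ANDs)
    [{"City": "Haifa"}, {"City": "Tel Aviv"}]

    # multi_key_vals: a row passes if any of the keys matches any of the values
    {"..filtertype": "multi_key_vals", "keys": ["City", "Town"], "vals": ["Haifa", "Tel Aviv"]}

All comparisons go through check_row_kv, which casts both sides to stripped strings, so 5 and ' 5 ' compare equal.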
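
A minimal sketch of how the new file-based lock is used (the instance and package names here are hypothetical):

    from datacity_ckan_dgp.utils.locking import instance_package_lock

    # writes a uuid-stamped json lock file, then polls once a second until that
    # file is the oldest valid one in the directory; raises after WAIT_TTL_SECONDS
    with instance_package_lock('my_instance', 'my_package'):
        ...  # package create/update runs here

    # callers that already hold the lock (like run_packages_processing above)
    # pass with_lock=False to skip locking and avoid self-deadlock

Lock files older than LOCK_TTL_SECONDS are treated as stale and are cleaned up by the active lock holder, so a crashed worker cannot block a package forever.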