diff --git a/ckanext/xloader/action.py b/ckanext/xloader/action.py index c0f3f84f..e92c2fde 100644 --- a/ckanext/xloader/action.py +++ b/ckanext/xloader/action.py @@ -53,6 +53,13 @@ def xloader_submit(context, data_dict): if errors: raise p.toolkit.ValidationError(errors) + p.toolkit.check_access('xloader_submit', context, data_dict) + + # If sync is set to True, the xloader callback will be executed right + # away, instead of a job being enqueued. It will also delete any existing jobs + # for the given resource. This is only controlled by sysadmins or the system. + sync = data_dict.pop('sync', False) + res_id = data_dict['resource_id'] try: resource_dict = p.toolkit.get_action('resource_show')(context, { @@ -166,15 +173,20 @@ def xloader_submit(context, data_dict): job = enqueue_job( jobs.xloader_data_into_datastore, [data], queue=custom_queue, title="xloader_submit: package: {} resource: {}".format(resource_dict.get('package_id'), res_id), - rq_kwargs=dict(timeout=timeout) + rq_kwargs=dict(timeout=timeout, at_front=sync) ) except Exception: - log.exception('Unable to enqueued xloader res_id=%s', res_id) + if sync: + log.exception('Unable to xloader res_id=%s', res_id) + else: + log.exception('Unable to enqueued xloader res_id=%s', res_id) return False log.debug('Enqueued xloader job=%s res_id=%s', job.id, res_id) - value = json.dumps({'job_id': job.id}) + if sync: + log.debug('Pushed xloader sync mode job=%s res_id=%s to front of queue', job.id, res_id) + task['value'] = value task['state'] = 'pending' task['last_updated'] = str(datetime.datetime.utcnow()) diff --git a/ckanext/xloader/config_declaration.yaml b/ckanext/xloader/config_declaration.yaml index 717657bd..89114783 100644 --- a/ckanext/xloader/config_declaration.yaml +++ b/ckanext/xloader/config_declaration.yaml @@ -119,6 +119,25 @@ groups: to True. type: bool required: false + - key: ckanext.xloader.validation.requires_successful_report + default: False + example: True + description: | + Resources are required to pass Validation from the ckanext-validation + plugin to be able to get XLoadered. + type: bool + required: false + - key: ckanext.xloader.validation.enforce_schema + default: True + example: False + description: | + Resources are expected to have a Validation Schema, or use the default ones if not. + + If this option is set to `False`, Resources that do not have + a Validation Schema will be treated like they do not require Validation. + + See https://github.com/frictionlessdata/ckanext-validation?tab=readme-ov-file#data-schema + for more details. - key: ckanext.xloader.clean_datastore_tables default: False example: True diff --git a/ckanext/xloader/plugin.py b/ckanext/xloader/plugin.py index e0ce027e..09d138a7 100644 --- a/ckanext/xloader/plugin.py +++ b/ckanext/xloader/plugin.py @@ -12,6 +12,12 @@ from . import action, auth, helpers as xloader_helpers, utils from ckanext.xloader.utils import XLoaderFormats +try: + from ckanext.validation.interfaces import IPipeValidation + HAS_IPIPE_VALIDATION = True +except ImportError: + HAS_IPIPE_VALIDATION = False + try: config_declarations = toolkit.blanket.config_declarations except AttributeError: @@ -34,6 +40,8 @@ class xloaderPlugin(plugins.SingletonPlugin): plugins.implements(plugins.IResourceController, inherit=True) plugins.implements(plugins.IClick) plugins.implements(plugins.IBlueprint) + if HAS_IPIPE_VALIDATION: + plugins.implements(IPipeValidation) # IClick def get_commands(self): @@ -68,6 +76,22 @@ def configure(self, config_): ) ) + # IPipeValidation + + def receive_validation_report(self, validation_report): + if utils.requires_successful_validation_report(): + res_dict = toolkit.get_action('resource_show')({'ignore_auth': True}, + {'id': validation_report.get('resource_id')}) + if (toolkit.asbool(toolkit.config.get('ckanext.xloader.validation.enforce_schema', True)) + or res_dict.get('schema', None)) and validation_report.get('status') != 'success': + # A schema is present, or required to be present + return + # if validation is running in async mode, it is running from the redis workers. + # thus we need to do sync=True to have Xloader put the job at the front of the queue. + sync = toolkit.asbool(toolkit.config.get(u'ckanext.validation.run_on_update_async', True)) + self._submit_to_xloader(res_dict, sync=sync) + + # IDomainObjectModification def notify(self, entity, operation): @@ -95,7 +119,11 @@ def notify(self, entity, operation): if _should_remove_unsupported_resource_from_datastore(resource_dict): toolkit.enqueue_job(fn=_remove_unsupported_resource_from_datastore, args=[entity.id]) - if not getattr(entity, 'url_changed', False): + if utils.requires_successful_validation_report(): + log.debug("Deferring xloading resource %s because the " + "resource did not pass validation yet.", entity.id) + return + elif not getattr(entity, 'url_changed', False): # do not submit to xloader if the url has not changed. return @@ -104,6 +132,11 @@ def notify(self, entity, operation): # IResourceController def after_resource_create(self, context, resource_dict): + if utils.requires_successful_validation_report(): + log.debug("Deferring xloading resource %s because the " + "resource did not pass validation yet.", resource_dict.get('id')) + return + self._submit_to_xloader(resource_dict) def before_resource_show(self, resource_dict): @@ -146,7 +179,7 @@ def before_show(self, resource_dict): def after_update(self, context, resource_dict): self.after_resource_update(context, resource_dict) - def _submit_to_xloader(self, resource_dict): + def _submit_to_xloader(self, resource_dict, sync=False): context = {"ignore_auth": True, "defer_commit": True} if not XLoaderFormats.is_it_an_xloader_format(resource_dict["format"]): log.debug( @@ -165,14 +198,20 @@ def _submit_to_xloader(self, resource_dict): return try: - log.debug( - "Submitting resource %s to be xloadered", resource_dict["id"] - ) + if sync: + log.debug( + "xloadering resource %s in sync mode", resource_dict["id"] + ) + else: + log.debug( + "Submitting resource %s to be xloadered", resource_dict["id"] + ) toolkit.get_action("xloader_submit")( context, { "resource_id": resource_dict["id"], "ignore_hash": self.ignore_hash, + "sync": sync, }, ) except toolkit.ValidationError as e: diff --git a/ckanext/xloader/schema.py b/ckanext/xloader/schema.py index c0e8d938..47ae65a5 100644 --- a/ckanext/xloader/schema.py +++ b/ckanext/xloader/schema.py @@ -16,6 +16,7 @@ boolean_validator = get_validator('boolean_validator') int_validator = get_validator('int_validator') OneOf = get_validator('OneOf') +ignore_not_sysadmin = get_validator('ignore_not_sysadmin') if p.toolkit.check_ckan_version('2.9'): unicode_safe = get_validator('unicode_safe') @@ -29,6 +30,7 @@ def xloader_submit_schema(): 'id': [ignore_missing], 'set_url_type': [ignore_missing, boolean_validator], 'ignore_hash': [ignore_missing, boolean_validator], + 'sync': [ignore_missing, boolean_validator, ignore_not_sysadmin], '__junk': [empty], '__before': [dsschema.rename('id', 'resource_id')] } diff --git a/ckanext/xloader/tests/test_plugin.py b/ckanext/xloader/tests/test_plugin.py index 8382e68b..f22dafbd 100644 --- a/ckanext/xloader/tests/test_plugin.py +++ b/ckanext/xloader/tests/test_plugin.py @@ -60,6 +60,93 @@ def test_submit_when_url_changes(self, monkeypatch): assert func.called + @pytest.mark.ckan_config("ckanext.xloader.validation.requires_successful_report", True) + def test_require_validation(self, monkeypatch): + func = mock.Mock() + monkeypatch.setitem(_actions, "xloader_submit", func) + + mock_resource_validation_show = mock.Mock() + monkeypatch.setitem(_actions, "resource_validation_show", mock_resource_validation_show) + + dataset = factories.Dataset() + + resource = helpers.call_action( + "resource_create", + {}, + package_id=dataset["id"], + url="http://example.com/file.csv", + format="CSV", + validation_status='failure', + ) + + # TODO: test IPipeValidation + assert not func.called # because of the validation_status not being `success` + func.called = None # reset + + helpers.call_action( + "resource_update", + {}, + id=resource["id"], + package_id=dataset["id"], + url="http://example.com/file2.csv", + format="CSV", + validation_status='success', + ) + + # TODO: test IPipeValidation + assert not func.called # because of the validation_status is `success` + + @pytest.mark.ckan_config("ckanext.xloader.validation.requires_successful_report", True) + @pytest.mark.ckan_config("ckanext.xloader.validation.enforce_schema", False) + def test_enforce_validation_schema(self, monkeypatch): + func = mock.Mock() + monkeypatch.setitem(_actions, "xloader_submit", func) + + mock_resource_validation_show = mock.Mock() + monkeypatch.setitem(_actions, "resource_validation_show", mock_resource_validation_show) + + dataset = factories.Dataset() + + resource = helpers.call_action( + "resource_create", + {}, + package_id=dataset["id"], + url="http://example.com/file.csv", + schema='', + validation_status='', + ) + + # TODO: test IPipeValidation + assert not func.called # because of the schema being empty + func.called = None # reset + + helpers.call_action( + "resource_update", + {}, + id=resource["id"], + package_id=dataset["id"], + url="http://example.com/file2.csv", + schema='https://example.com/schema.json', + validation_status='failure', + ) + + # TODO: test IPipeValidation + assert not func.called # because of the validation_status not being `success` and there is a schema + func.called = None # reset + + helpers.call_action( + "resource_update", + {}, + package_id=dataset["id"], + id=resource["id"], + url="http://example.com/file3.csv", + schema='https://example.com/schema.json', + validation_status='success', + ) + + # TODO: test IPipeValidation + assert not func.called # because of the validation_status is `success` and there is a schema + @pytest.mark.parametrize("toolkit_config_value, mock_xloader_formats, url_type, datastore_active, expected_result", [ # Test1: Should pass as it is an upload with an active datastore entry but an unsupported format (True, False, 'upload', True, True), diff --git a/ckanext/xloader/utils.py b/ckanext/xloader/utils.py index 073a8091..3ed75055 100644 --- a/ckanext/xloader/utils.py +++ b/ckanext/xloader/utils.py @@ -2,6 +2,7 @@ import json import datetime +from rq import get_current_job from ckan import model from ckan.lib import search @@ -9,7 +10,12 @@ from decimal import Decimal import ckan.plugins as p -from ckan.plugins.toolkit import config +from ckan.plugins.toolkit import config, h, _ + +from logging import getLogger + + +log = getLogger(__name__) # resource.formats accepted by ckanext-xloader. Must be lowercase here. DEFAULT_FORMATS = [ @@ -42,9 +48,73 @@ def is_it_an_xloader_format(cls, format_): return format_.lower() in cls._formats +def requires_successful_validation_report(): + return p.toolkit.asbool(config.get('ckanext.xloader.validation.requires_successful_report', False)) + + +def awaiting_validation(res_dict): + # type: (dict) -> bool + """ + Checks the existence of a logic action from the ckanext-validation + plugin, thus supporting any extending of the Validation Plugin class. + + Checks ckanext.xloader.validation.requires_successful_report config + option value. + + Checks ckanext.xloader.validation.enforce_schema config + option value. Then checks the Resource's validation_status. + """ + if not requires_successful_validation_report(): + # validation.requires_successful_report is turned off, return right away + return False + + try: + # check for one of the main actions from ckanext-validation + # in the case that users extend the Validation plugin class + # and rename the plugin entry-point. + p.toolkit.get_action('resource_validation_show') + is_validation_plugin_loaded = True + except KeyError: + is_validation_plugin_loaded = False + + if not is_validation_plugin_loaded: + # the validation plugin is not loaded but required, log a warning + log.warning('ckanext.xloader.validation.requires_successful_report requires the ckanext-validation plugin to be activated.') + return False + + if (p.toolkit.asbool(config.get('ckanext.xloader.validation.enforce_schema', True)) + or res_dict.get('schema', None)) and res_dict.get('validation_status', None) != 'success': + + # either validation.enforce_schema is turned on or it is off and there is a schema, + # we then explicitly check for the `validation_status` report to be `success`` + return True + + # at this point, we can assume that the Resource is not waiting for Validation. + # or that the Resource does not have a Validation Schema and we are not enforcing schemas. + return False + + def resource_data(id, resource_id, rows=None): if p.toolkit.request.method == "POST": + + context = { + "ignore_auth": True, + } + resource_dict = p.toolkit.get_action("resource_show")( + context, + { + "id": resource_id, + }, + ) + + if awaiting_validation(resource_dict): + h.flash_error(_("Cannot upload resource %s to the DataStore " + "because the resource did not pass validation yet.") % resource_id) + return p.toolkit.redirect_to( + "xloader.resource_data", id=id, resource_id=resource_id + ) + try: p.toolkit.get_action("xloader_submit")( None, @@ -201,7 +271,7 @@ def type_guess(rows, types=TYPES, strict=False): at_least_one_value = [] for ri, row in enumerate(rows): diff = len(row) - len(guesses) - for _ in range(diff): + for _i in range(diff): typesdict = {} for type in types: typesdict[type] = 0 @@ -227,7 +297,7 @@ def type_guess(rows, types=TYPES, strict=False): else: for i, row in enumerate(rows): diff = len(row) - len(guesses) - for _ in range(diff): + for _i in range(diff): guesses.append(defaultdict(int)) for i, cell in enumerate(row): # add string guess so that we have at least one guess