From 520dc602b662ae9b4076840c57a45eaadf7ed768 Mon Sep 17 00:00:00 2001
From: Simon <6615834+simon-20@users.noreply.github.com>
Date: Wed, 14 Feb 2024 15:13:47 +0000
Subject: [PATCH 1/3] Changes to Solrize re-index stage to minimise time docs are absent

---
 src/library/solrize.py | 144 +++++++++++++++++++++++++----------------
 1 file changed, 88 insertions(+), 56 deletions(-)

diff --git a/src/library/solrize.py b/src/library/solrize.py
index ef0db1d..b5318f3 100644
--- a/src/library/solrize.py
+++ b/src/library/solrize.py
@@ -87,6 +87,48 @@ def validateLatLon(point_pos):
         pass
     return None
 
+def delete_from_solr(cores: dict, file_id: str, file_hash: str, query: dict):
+    """Delete matching docs from the given Solr cores"""
+
+    for core_name in cores:
+        # if one of these deletions fails, the docs from last time in the
+        # core which failed to delete will be cleaned up at the end of the
+        # solrize process
+        try:
+            cores[core_name].delete(**query)
+        except Exception as e:
+            e_message = e.args[0] if hasattr(e, 'args') else ''
+            raise SolrError('DELETING hash: ' + file_hash + ' and id: ' + file_id +
+                            ', from collection with name ' + core_name + ': ' + e_message)
+
+def get_blob_data(blob_client: object, conn: object, file_id: str, file_hash: str,
+                  fa: dict, hashed_identifier: str, blob_type: str):
+    """Downloads and returns activity blob data of the specified type"""
+
+    blob_name = '{}/{}.{}'.format(file_id, hashed_identifier, blob_type)
+
+    try:
+        blob_client = blob_client.get_blob_client(
+            container=config['ACTIVITIES_LAKE_CONTAINER_NAME'],
+            blob=blob_name)
+        downloader = blob_client.download_blob()
+    except:
+        db.resetUnfoundLakify(conn, file_id)
+        raise SolrizeSourceError((
+            "Could not download {} activity blob: {}"
+            ", file hash: {}, iati-identifier: {}."
+            " Sending back to Lakify."
+        ).format(blob_type, blob_name, file_hash, fa['iati_identifier'])
+        )
+
+    try:
+        return utils.get_text_from_blob(downloader, blob_name)
+    except:
+        raise SolrizeSourceError((
+            "Could not identify charset for {} blob: {}"
+            ", file hash: {}, iati-identifier: {}"
+        ).format(blob_type, blob_name, file_hash, fa['iati_identifier']))
+
 
 def process_hash_list(document_datasets):
     """
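To make the intent of the first new helper concrete, the following is a minimal editorial sketch of how `delete_from_solr` fans a single delete-by-query out across the cores. The Solr URLs and the file id are hypothetical placeholders, and the `SolrError` wrapping from the patch is left out; `pysolr.Solr.delete` does accept a `q` keyword for delete-by-query.

```python
# Minimal sketch (not part of the patch): driving a dict of pysolr cores
# the way delete_from_solr does. URLs and the file id are placeholders.
import pysolr

solr_cores = {
    'activity': pysolr.Solr('http://localhost:8983/solr/activity'),
    'budget': pysolr.Solr('http://localhost:8983/solr/budget'),
    'transaction': pysolr.Solr('http://localhost:8983/solr/transaction'),
}

# one delete-by-query, applied to every core; the query dict is passed
# through as keyword arguments to pysolr's Solr.delete()
query = {'q': 'iati_activities_document_id:"example-file-id"'}
for core_name, core in solr_cores.items():
    core.delete(**query)
```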
@@ -126,69 +168,46 @@ def process_hash_list(document_datasets):
 
             db.updateSolrizeStartDate(conn, file_id)
 
-            logger.info('Removing all docs for doc with hash: ' +
-                        file_hash + ' and id: ' + file_id)
-
-            for core_name in solr_cores:
-                try:
-                    solr_cores[core_name].delete(
-                        q='iati_activities_document_id:' + file_id)
-                except Exception as e:
-                    e_message = e.args[0] if hasattr(e, 'args') else ''
-                    raise SolrError('DELETING hash: ' + file_hash + ' and id: ' + file_id +
-                                    ', from collection with name ' + core_name + ': ' + e_message)
-
-            logger.info('Adding docs for hash: ' + file_hash + ' and id: ' + file_id)
+            # check whether the existing DS data and the new dataset contain only "good" data:
+            # - do the flattened activities have any duplicated IATI identifiers in them?
+            # - do the existing Solr docs for this dataset have repeated IDs?
+            new_identifiers = [fa['iati_identifier'] for fa in flattened_activities[0]]
+            has_dupes = len(set(new_identifiers)) != len(new_identifiers) or \
+                solr_cores['activity'].search("id:" + file_id + "--*--1", rows=0).hits > 0
 
+            identifiers_seen = []
             identifier_indices = {}
 
+            # log which update method will be used, but outside the activity
+            # processing loop to avoid thousands of log messages for large files
+            if has_dupes:
+                logger.info(('File with id: {} (hash: {}) had or has dupes, so removing all '
+                             'activity, budget, transaction Solr-docs at start of processing '
+                             'for each activity').format(file_id, file_hash))
+            else:
+                logger.info(('File with id: {} (hash: {}) had and has no dupes, so updating '
+                             'activities in-place and removing budget, transaction Solr-docs '
+                             'on a per-activity basis').format(file_id, file_hash))
+
             for fa in flattened_activities[0]:
                 hashed_iati_identifier = utils.get_hash_for_identifier(fa['iati_identifier'])
 
-                blob_name = '{}/{}.xml'.format(file_id, hashed_iati_identifier)
-
-                try:
-                    blob_client = blob_service_client.get_blob_client(
-                        container=config['ACTIVITIES_LAKE_CONTAINER_NAME'],
-                        blob=blob_name)
-                    downloader = blob_client.download_blob()
-                except:
-                    db.resetUnfoundLakify(conn, file_id)
-                    raise SolrizeSourceError(
-                        'Could not download XML activity blob: ' + blob_name +
-                        ', file hash: ' + file_hash +
-                        ', iati-identifier: ' + fa['iati_identifier'] +
-                        '. Sending back to Lakify.'
-                    )
+                # if this is the first time seeing this identifier in the dataset,
+                # delete its docs from Solr
+                if has_dupes and (fa['iati_identifier'] not in identifiers_seen):
+                    delete_from_solr(solr_cores, file_id, file_hash, {
+                        'q': ('iati_activities_document_id:"{}" AND '
+                              'iati_identifier_exact:"{}"').format(file_id,
+                                                                   fa['iati_identifier'])})
 
-                try:
-                    fa['iati_xml'] = utils.get_text_from_blob(downloader, blob_name)
-                except:
-                    raise SolrizeSourceError('Could not identify charset for blob: ' + blob_name +
-                                             ', file hash: ' + file_hash + ', iati-identifier: ' + fa['iati_identifier'])
+                identifiers_seen.append(fa['iati_identifier'])
 
-                json_blob_name = '{}/{}.json'.format(file_id, hashed_iati_identifier)
-
-                try:
-                    json_blob_client = blob_service_client.get_blob_client(
-                        container=config['ACTIVITIES_LAKE_CONTAINER_NAME'],
-                        blob=json_blob_name)
-                    json_downloader = json_blob_client.download_blob()
-                except:
-                    db.resetUnfoundLakify(conn, file_id)
-                    raise SolrizeSourceError(
-                        'Could not download JSON activity blob: ' + json_blob_name +
-                        ', file hash: ' + file_hash +
-                        ', iati-identifier: ' + fa['iati_identifier'] +
-                        '. Sending back to Lakify.'
-                    )
-
-                try:
-                    fa['iati_json'] = utils.get_text_from_blob(
-                        json_downloader, json_blob_name)
-                except:
-                    raise SolrizeSourceError('Could not identify charset for blob: ' + blob_name +
-                                             ', file hash: ' + file_hash + ', iati-identifier: ' + fa['iati_identifier'])
+                # add the Activity XML/JSON blobs to the flattened activity
+                fa['iati_xml'] = get_blob_data(blob_service_client, conn, file_id, file_hash,
+                                               fa, hashed_iati_identifier, 'xml')
+                fa['iati_json'] = get_blob_data(blob_service_client, conn, file_id, file_hash,
+                                                fa, hashed_iati_identifier, 'json')
 
+                # add id and hash
                 fa['iati_activities_document_id'] = file_id
                 fa['iati_activities_document_hash'] = file_hash
 
@@ -206,7 +225,7 @@ def process_hash_list(document_datasets):
                 except KeyError:
                     pass
 
-                # Remove sub lists
+                # Remove sub lists from the flattened activity, saving the data for use later
                 sub_list_data = {}
                 for element_name in explode_elements:
                     if isinstance(fa.get('@'+element_name), list):
@@ -225,11 +244,24 @@ def process_hash_list(document_datasets):
                 del fa['iati_xml']
                 del fa['iati_json']
 
-                # Now index explode_elements
+                # For the budget, transaction cores, delete docs (if the data had & has no dupes), then re-add
                 for element_name, element_data in sub_list_data.items():
+                    if not has_dupes:
+                        delete_from_solr({element_name: solr_cores[element_name]}, file_id, file_hash, {
+                            'q': ('iati_activities_document_id:"{}" AND '
+                                  'iati_identifier_exact:"{}"').format(file_id,
+                                                                       fa['iati_identifier'])})
                     results = get_explode_element_data(element_name, element_data, fa)
                     addToSolr(element_name, results, file_hash, file_id)
 
+            # now do a cleanup delete for docs from activities that have been deleted
+            logger.info(('Removing remaining old Solr-docs for file with id: {} '
+                         'where the hash is not equal to current hash: {}').format(file_id, file_hash))
+            delete_from_solr(solr_cores, file_id, file_hash, {
+                'q': ('iati_activities_document_id:"{}" AND '
+                      'NOT(iati_activities_document_hash:"{}")').format(file_id,
+                                                                        file_hash)})
+
             logger.info('Updating DB with successful Solrize for hash: ' +
                         file_hash + ' and id: ' + file_id)
             db.completeSolrize(conn, file_id)
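The net effect of this patch: docs for a file are now replaced activity-by-activity instead of being mass-deleted up front, and anything left over from the previous version of the file is swept out at the end by hash. A minimal sketch of that final sweep follows, with a placeholder core URL, file id, and hash; it relies on every indexed doc carrying `iati_activities_document_hash`, as the patch's queries show.

```python
# Sketch of the end-of-run cleanup delete, with made-up values. Once the new
# docs are in place, anything still indexed for this file under a different
# document hash is stale and can go in one delete-by-query per core.
import pysolr

core = pysolr.Solr('http://localhost:8983/solr/activity')  # placeholder URL
file_id = 'example-file-id'   # hypothetical id and hash for illustration
file_hash = 'new-hash'

core.delete(q=('iati_activities_document_id:"{}" AND '
               'NOT(iati_activities_document_hash:"{}")').format(file_id, file_hash))
```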
From 67597424b3ff26ad2cd6deef3a810db0d8be6165 Mon Sep 17 00:00:00 2001
From: Simon <6615834+simon-20@users.noreply.github.com>
Date: Fri, 23 Feb 2024 15:06:42 +0000
Subject: [PATCH 2/3] Fix to ensure user-derived IATI Identifier is escaped properly for Solr deletes

---
 src/library/solrize.py | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/src/library/solrize.py b/src/library/solrize.py
index b5318f3..11e9c77 100644
--- a/src/library/solrize.py
+++ b/src/library/solrize.py
@@ -6,6 +6,7 @@ from library.logger import getLogger
 from constants.config import config
 from azure.storage.blob import BlobServiceClient
+from xml.sax.saxutils import escape
 import library.db as db
 import pysolr
 import library.utils as utils
@@ -130,6 +131,12 @@ def get_blob_data(blob_client: object, conn: object, file_id: str, file_hash: st
         ).format(blob_type, blob_name, file_hash, fa['iati_identifier']))
 
 
+def escape_param_for_pysolr_delete(query_param: str):
+    # escape() does the XML escaping, since pysolr doesn't escape delete
+    # queries itself; the \ and " escaping then makes the value safe as a
+    # parameter in a Solr query where it will appear inside double quotes
+    return escape(query_param).replace("\\", "\\\\").replace("\"", "\\\"")
+
 
 def process_hash_list(document_datasets):
     """
@@ -197,7 +204,7 @@ def process_hash_list(document_datasets):
                     delete_from_solr(solr_cores, file_id, file_hash, {
                         'q': ('iati_activities_document_id:"{}" AND '
                               'iati_identifier_exact:"{}"').format(file_id,
-                                                                   fa['iati_identifier'])})
+                                                                   escape_param_for_pysolr_delete(fa['iati_identifier']))})
 
                 identifiers_seen.append(fa['iati_identifier'])
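The new escaping helper is pure string manipulation and can be exercised standalone; a short self-contained check with a made-up identifier:

```python
# Standalone check of the escaping introduced above: XML-escape first
# (the delete query ends up inside an XML request body), then escape
# backslashes and double quotes so the value is safe inside a double-quoted
# Solr query term. The identifier is invented for illustration.
from xml.sax.saxutils import escape

def escape_param_for_pysolr_delete(query_param: str) -> str:
    return escape(query_param).replace("\\", "\\\\").replace("\"", "\\\"")

print(escape_param_for_pysolr_delete('GB-1-<odd> "id" & co'))
# -> GB-1-&lt;odd&gt; \"id\" &amp; co
```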
@@ -250,7 +257,7 @@ def process_hash_list(document_datasets):
                         delete_from_solr({element_name: solr_cores[element_name]}, file_id, file_hash, {
                             'q': ('iati_activities_document_id:"{}" AND '
                                   'iati_identifier_exact:"{}"').format(file_id,
-                                                                       fa['iati_identifier'])})
+                                                                       escape_param_for_pysolr_delete(fa['iati_identifier']))})
                     results = get_explode_element_data(element_name, element_data, fa)
                     addToSolr(element_name, results, file_hash, file_id)

From 7bab3808bc4c7827d17c381e932c3e7ccbf5c6d6 Mon Sep 17 00:00:00 2001
From: Simon <6615834+simon-20@users.noreply.github.com>
Date: Tue, 27 Feb 2024 18:52:15 +0000
Subject: [PATCH 3/3] Pin pysolr because of the issue with its delete command escaping

---
 requirements.in | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/requirements.in b/requirements.in
index 55998e5..c57fabf 100644
--- a/requirements.in
+++ b/requirements.in
@@ -6,6 +6,6 @@ azure-storage-queue==12.2.0
 lxml
 psycopg2
 requests
-pysolr
+pysolr==3.9.0 # If this is upgraded, Solrize needs revisiting, because 3.9 doesn't escape delete calls, so we've escaped them manually.
 chardet
 python-dateutil
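A hedged illustration of why the pin and the manual escaping go together: pysolr's delete-by-query is sent to Solr as an XML body, so an unescaped XML-special character in an identifier would produce a malformed request. The identifier and body template below are illustrative assumptions, not pysolr's exact internals.

```python
# Illustration only (not pysolr source): the delete body is XML, so a bare
# '&' or '<' in an unescaped identifier would make the request malformed.
from xml.sax.saxutils import escape

raw_identifier = 'GB-1-A&B'  # hypothetical identifier
template = '<delete><query>iati_identifier_exact:"{}"</query></delete>'
print(template.format(raw_identifier))          # bare '&' -> invalid XML
print(template.format(escape(raw_identifier)))  # 'A&amp;B' -> well-formed
```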