
Commit

Merge pull request #325 from IATI/develop
Updates to Solrize algorithm
simon-20 authored Apr 17, 2024
2 parents 77305b4 + 7ffb6e2 commit 0411ff0
Showing 2 changed files with 96 additions and 57 deletions.
2 changes: 1 addition & 1 deletion requirements.in
@@ -6,6 +6,6 @@ azure-storage-queue==12.2.0
lxml
psycopg2
requests
pysolr
pysolr==3.9.0 # If this is upgraded, the Solrize code needs revisiting, because 3.9 doesn't escape delete queries, so we've done it manually.
chardet
python-dateutil
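
For context on the pin: pysolr builds the XML body of a delete request itself, and 3.9 does not XML-escape the query it is given, so any query containing characters such as & or " must be escaped by the caller. A minimal sketch of that caller-side escaping, assuming a hypothetical local Solr core (the escaping mirrors the escape_param_for_pysolr_delete helper added to solrize.py below):

from xml.sax.saxutils import escape
import pysolr

# hypothetical core URL, for illustration only
solr = pysolr.Solr('http://localhost:8983/solr/activity')

# XML-escape first, then escape \ and " so the value is safe inside a
# double-quoted term of the delete query
identifier = 'XM-DAC-"A & B"'
escaped = escape(identifier).replace("\\", "\\\\").replace("\"", "\\\"")
solr.delete(q='iati_identifier_exact:"{}"'.format(escaped))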
151 changes: 95 additions & 56 deletions src/library/solrize.py
@@ -6,6 +6,7 @@
from library.logger import getLogger
from constants.config import config
from azure.storage.blob import BlobServiceClient
from xml.sax.saxutils import escape
import library.db as db
import pysolr
import library.utils as utils
@@ -87,6 +88,54 @@ def validateLatLon(point_pos):
        pass
    return None

def delete_from_solr(cores: dict, file_id: str, file_hash: str, query: dict):
    """Delete the matching docs from the Solr cores"""

    for core_name in cores:
        # if one of these deletions fails, the docs left over from last time
        # in the core which failed to delete will be cleaned up at the end of
        # the solrize process
        try:
            cores[core_name].delete(**query)
        except Exception as e:
            e_message = e.args[0] if e.args else ''
            raise SolrError('DELETING hash: ' + file_hash + ' and id: ' + file_id +
                            ', from collection with name ' + core_name + ': ' + e_message)
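
A hypothetical call for illustration, assuming solr_cores maps core names ('activity', 'budget', 'transaction') to pysolr.Solr connections as in process_hash_list below; the query dict is passed straight through to pysolr's delete():

# hypothetical usage: remove every doc for one dataset from all cores
delete_from_solr(solr_cores, file_id, file_hash,
                 {'q': 'iati_activities_document_id:' + file_id})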

def get_blob_data(blob_service_client: object, conn: object, file_id: str, file_hash: str,
                  fa: dict, hashed_identifier: str, blob_type: str):
    """Downloads and returns activity blob data of the specified type"""

    blob_name = '{}/{}.{}'.format(file_id, hashed_identifier, blob_type)

    try:
        blob_client = blob_service_client.get_blob_client(
            container=config['ACTIVITIES_LAKE_CONTAINER_NAME'],
            blob=blob_name)
        downloader = blob_client.download_blob()
    except Exception:
        db.resetUnfoundLakify(conn, file_id)
        raise SolrizeSourceError((
            "Could not download {} activity blob: {}"
            ", file hash: {}, iati-identifier: {}."
            " Sending back to Lakify."
        ).format(blob_type, blob_name, file_hash, fa['iati_identifier']))

    try:
        return utils.get_text_from_blob(downloader, blob_name)
    except Exception:
        raise SolrizeSourceError((
            "Could not identify charset for {} blob: {}"
            ", file hash: {}, iati-identifier: {}"
        ).format(blob_type, blob_name, file_hash, fa['iati_identifier']))
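
An aside on the charset step: utils.get_text_from_blob is not part of this diff, but the error message above implies it decodes the downloaded bytes after detecting their encoding, presumably with chardet (pinned in requirements.in above). The function below is a hypothetical stand-in, not the project's actual implementation:

import chardet

def get_text_from_blob_sketch(downloader, blob_name):
    # read the raw bytes from the Azure StorageStreamDownloader
    raw = downloader.readall()
    # guess the charset; chardet.detect returns e.g. {'encoding': 'utf-8', ...}
    detected = chardet.detect(raw)
    if detected['encoding'] is None:
        raise ValueError('Could not identify charset for blob: ' + blob_name)
    return raw.decode(detected['encoding'])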


def escape_param_for_pysolr_delete(query_param: str):
    # escape() does the XML escaping, since pysolr doesn't do it for delete
    # queries; the \ and " replacements (backslashes first, so the ones added
    # for the quotes aren't doubled) escape the param for a Solr query where
    # it will sit inside double quotes
    return escape(query_param).replace("\\", "\\\\").replace("\"", "\\\"")
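
For example, on a hypothetical identifier containing both an ampersand and double quotes (xml.sax.saxutils.escape handles the &; the replacements handle the quotes):

escape_param_for_pysolr_delete('GB-1-"A & B"')
# returns: GB-1-\"A &amp; B\"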

def process_hash_list(document_datasets):
"""
@@ -126,69 +175,46 @@ def process_hash_list(document_datasets):

        db.updateSolrizeStartDate(conn, file_id)

        logger.info('Removing all docs for doc with hash: ' +
                    file_hash + ' and id: ' + file_id)

        for core_name in solr_cores:
            try:
                solr_cores[core_name].delete(
                    q='iati_activities_document_id:' + file_id)
            except Exception as e:
                e_message = e.args[0] if hasattr(e, 'args') else ''
                raise SolrError('DELETING hash: ' + file_hash + ' and id: ' + file_id +
                                ', from collection with name ' + core_name + ': ' + e_message)

        logger.info('Adding docs for hash: ' + file_hash + ' and id: ' + file_id)
        # check whether the existing Datastore data and the new dataset contain
        # only "good" data:
        # - do the flattened activities have any duplicated IATI identifiers?
        # - do the existing Solr docs for this dataset have repeated identifiers?
        new_identifiers = [fa['iati_identifier'] for fa in flattened_activities[0]]
        has_dupes = len(set(new_identifiers)) != len(new_identifiers) or \
            solr_cores['activity'].search("id:" + file_id + "--*--1", rows=0).hits > 0

        identifiers_seen = []
        identifier_indices = {}

        # log which update method will be used, outside the activity processing
        # loop to avoid thousands of log messages for large files
        if has_dupes:
            logger.info(('File with id: {} (hash: {}) had or has dupes, so removing all '
                         'activity, budget, transaction Solr-docs at start of processing '
                         'for each activity').format(file_id, file_hash))
        else:
            logger.info(('File with id: {} (hash: {}) had and has no dupes, so updating '
                         'activities in-place and removing budget, transaction Solr-docs '
                         'on a per-activity basis').format(file_id, file_hash))

        for fa in flattened_activities[0]:
            hashed_iati_identifier = utils.get_hash_for_identifier(fa['iati_identifier'])
            blob_name = '{}/{}.xml'.format(file_id, hashed_iati_identifier)

            try:
                blob_client = blob_service_client.get_blob_client(
                    container=config['ACTIVITIES_LAKE_CONTAINER_NAME'],
                    blob=blob_name)
                downloader = blob_client.download_blob()
            except:
                db.resetUnfoundLakify(conn, file_id)
                raise SolrizeSourceError(
                    'Could not download XML activity blob: ' + blob_name +
                    ', file hash: ' + file_hash +
                    ', iati-identifier: ' + fa['iati_identifier'] +
                    '. Sending back to Lakify.'
                )

            try:
                fa['iati_xml'] = utils.get_text_from_blob(downloader, blob_name)
            except:
                raise SolrizeSourceError('Could not identify charset for blob: ' + blob_name +
                                         ', file hash: ' + file_hash +
                                         ', iati-identifier: ' + fa['iati_identifier'])
            # if this is the first time the identifier has been seen in the
            # dataset, delete its docs from Solr
            if has_dupes and (fa['iati_identifier'] not in identifiers_seen):
                delete_from_solr(solr_cores, file_id, file_hash, {
                    'q': ('iati_activities_document_id:"{}" AND '
                          'iati_identifier_exact:"{}"').format(
                              file_id,
                              escape_param_for_pysolr_delete(fa['iati_identifier']))})

            json_blob_name = '{}/{}.json'.format(file_id, hashed_iati_identifier)

            try:
                json_blob_client = blob_service_client.get_blob_client(
                    container=config['ACTIVITIES_LAKE_CONTAINER_NAME'],
                    blob=json_blob_name)
                json_downloader = json_blob_client.download_blob()
            except:
                db.resetUnfoundLakify(conn, file_id)
                raise SolrizeSourceError(
                    'Could not download JSON activity blob: ' + json_blob_name +
                    ', file hash: ' + file_hash +
                    ', iati-identifier: ' + fa['iati_identifier'] +
                    '. Sending back to Lakify.'
                )
            identifiers_seen.append(fa['iati_identifier'])

            try:
                fa['iati_json'] = utils.get_text_from_blob(
                    json_downloader, json_blob_name)
            except:
                raise SolrizeSourceError('Could not identify charset for blob: ' + json_blob_name +
                                         ', file hash: ' + file_hash +
                                         ', iati-identifier: ' + fa['iati_identifier'])
            # add the Activity XML/JSON blobs to the flattened activity
            fa['iati_xml'] = get_blob_data(blob_service_client, conn, file_id, file_hash,
                                           fa, hashed_iati_identifier, 'xml')
            fa['iati_json'] = get_blob_data(blob_service_client, conn, file_id, file_hash,
                                            fa, hashed_iati_identifier, 'json')

            # add id and hash
            fa['iati_activities_document_id'] = file_id
            fa['iati_activities_document_hash'] = file_hash

@@ -206,7 +232,7 @@ def process_hash_list(document_datasets):
            except KeyError:
                pass

            # Remove sub lists
            # Remove sub lists from flattened activity, saving data for use later
            sub_list_data = {}
            for element_name in explode_elements:
                if isinstance(fa.get('@'+element_name), list):
@@ -225,11 +251,24 @@ def process_hash_list(document_datasets):
            del fa['iati_xml']
            del fa['iati_json']

            # Now index explode_elements
            # For the budget and transaction cores, delete the docs (if the data
            # had and has no dupes), then re-add
            for element_name, element_data in sub_list_data.items():
                if not has_dupes:
                    delete_from_solr({element_name: solr_cores[element_name]}, file_id, file_hash, {
                        'q': ('iati_activities_document_id:"{}" AND '
                              'iati_identifier_exact:"{}"').format(
                                  file_id,
                                  escape_param_for_pysolr_delete(fa['iati_identifier']))})
                results = get_explode_element_data(element_name, element_data, fa)
                addToSolr(element_name, results, file_hash, file_id)

        # now do a cleanup delete for docs from activities that have been deleted
        logger.info(('Removing remaining old Solr-docs for file with id: {} '
                     'where the hash is not equal to the current hash: {}').format(file_id, file_hash))
        delete_from_solr(solr_cores, file_id, file_hash, {
            'q': ('iati_activities_document_id:"{}" AND '
                  'NOT(iati_activities_document_hash:"{}")').format(file_id, file_hash)})

        logger.info('Updating DB with successful Solrize for hash: ' +
                    file_hash + ' and id: ' + file_id)
        db.completeSolrize(conn, file_id)
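
Taken together, the updated algorithm reads as the following condensed sketch (illustrative only; it reuses the helpers above and omits the blob downloads, DB bookkeeping, and error handling shown in the full diff):

def solrize_dataset_sketch(solr_cores, flattened_activities, file_id, file_hash):
    """Condensed, illustrative restatement of the flow in process_hash_list."""
    identifiers = [fa['iati_identifier'] for fa in flattened_activities]
    # dupes either within the incoming data or left in Solr by a previous run
    has_dupes = len(set(identifiers)) != len(identifiers) or \
        solr_cores['activity'].search('id:' + file_id + '--*--1', rows=0).hits > 0

    seen = set()
    for fa in flattened_activities:
        if has_dupes and fa['iati_identifier'] not in seen:
            delete_from_solr(solr_cores, file_id, file_hash, {
                'q': ('iati_activities_document_id:"{}" AND '
                      'iati_identifier_exact:"{}"').format(
                          file_id,
                          escape_param_for_pysolr_delete(fa['iati_identifier']))})
        seen.add(fa['iati_identifier'])
        # ... download the XML/JSON blobs, then add the activity doc and the
        # per-core budget/transaction docs as in the full implementation ...

    # cleanup: drop docs left over from activities no longer in this dataset
    delete_from_solr(solr_cores, file_id, file_hash, {
        'q': ('iati_activities_document_id:"{}" AND '
              'NOT(iati_activities_document_hash:"{}")').format(file_id, file_hash)})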
