Change where Solr deletion happens; change db migration detection; & misc

Only delete changed data from Solr just before adding the new data, in the solrize stage
https://iaticonnect.org/group/9/topic/proposed-update-iati-datastore
New DB column (last_solrize_end) to track whether data is in Solr or not
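
A rough sketch of the intended flow, assuming the pysolr-style client the diff below already uses (solrize_document and new_docs are illustrative names, not project code):

    import pysolr

    def solrize_document(solr: pysolr.Solr, file_id: str, new_docs: list) -> None:
        # Old data stays in Solr until its replacement is ready; it is only
        # deleted here, immediately before the new documents are added.
        solr.delete(q='iati_activities_document_id:' + file_id)
        solr.add(new_docs)
        # completeSolrize() (see db.py below) then stamps last_solrize_end,
        # the new column recording that this document's data made it into Solr.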

Database migrations now run from the migration var in src/constants/version.py only
#273
This removes the need to keep the number var there in sync with the migrations.
Also fixes a bug in isUpgrade(fromVersion, toVersion) by scrapping the function entirely:
previously isUpgrade("1.1.1", "0.2.0") would return True.
Not scrapping the number var yet, as it is stored in the DB and may be used elsewhere.
It is also used in a user agent string.
(That GitHub issue can remain open to look at that later.)
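
To make the bug concrete (a sketch, not project code; is_upgrade and needs_upgrade are hypothetical names):

    # The removed function compared major, minor and patch independently, so a
    # smaller later component could still report an upgrade:
    #   isUpgrade("1.1.1", "0.2.0") -> major: 1 < 0? no; minor: 1 < 2? yes -> True
    # A correct string-version comparison compares the tuples as a whole:
    def is_upgrade(from_version: str, to_version: str) -> bool:
        return tuple(map(int, to_version.split('.'))) > tuple(map(int, from_version.split('.')))

    assert not is_upgrade("1.1.1", "0.2.0")  # the old function returned True here

    # This commit sidesteps string versions entirely: migration is a plain int,
    # so detecting an upgrade is a single comparison, as in migrateIfRequired below:
    def needs_upgrade(current_migration: int, target_migration: int) -> bool:
        return target_migration > current_migration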

Also remove the combined_datasets_toclean var: it is only used once,
and we don't need to copy what might be large datasets.
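
(A toy illustration of the copy being avoided; the lists are hypothetical:)

    stale = ['doc-1'] * 100000
    changed = ['doc-2'] * 100000
    # Old: concatenation allocates a new 200,000-element list just to test emptiness.
    if len(stale + changed) > 0:
        pass
    # New: test each list directly; no copy, and `or` short-circuits.
    if len(stale) > 0 or len(changed) > 0:
        pass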
jarofgreen committed Aug 30, 2023
1 parent 859944b commit 84c5b00
Showing 4 changed files with 21 additions and 33 deletions.
src/constants/version.py (2 changes: 1 addition & 1 deletion)
@@ -1 +1 @@
-__version__ = {'number': "0.1.24", 'migration': 24}
+__version__ = {'number': "0.1.24", 'migration': 25}
src/library/db.py (28 changes: 6 additions & 22 deletions)
@@ -35,23 +35,6 @@ def getDirectConnection(retry_counter=0):
str(e).strip(), retry_counter))
raise e


-def isUpgrade(fromVersion, toVersion):
-    fromSplit = fromVersion.split('.')
-    toSplit = toVersion.split('.')
-
-    if int(fromSplit[0]) < int(toSplit[0]):
-        return True
-
-    if int(fromSplit[1]) < int(toSplit[1]):
-        return True
-
-    if int(fromSplit[2]) < int(toSplit[2]):
-        return True
-
-    return False


def get_current_db_version(conn):
sql = 'SELECT number, migration FROM version LIMIT 1'

@@ -75,7 +58,7 @@ def checkVersionMatch():
conn.set_session(autocommit=True)
current_db_version = get_current_db_version(conn)

-    while current_db_version['number'] != __version__['number']:
+    while current_db_version['migration'] != __version__['migration']:
logger.info('DB version incorrect. Sleeping...')
time.sleep(60)
current_db_version = get_current_db_version(conn)
@@ -100,17 +83,17 @@ def migrateIfRequired():
'migration': -1
}

-    if current_db_version['number'] == __version__['number']:
+    if current_db_version['migration'] == __version__['migration']:
logger.info('DB at correct version')
return

-    upgrade = isUpgrade(current_db_version['number'], __version__['number'])
+    upgrade = __version__['migration'] > current_db_version['migration']

if upgrade:
-        logger.info('DB upgrading to version ' + __version__['number'])
+        logger.info('DB upgrading to version ' + str(__version__['migration']))
step = 1
else:
-        logger.info('DB downgrading to version ' + __version__['number'])
+        logger.info('DB downgrading to version ' + str(__version__['migration']))
step = -1

try:
@@ -729,6 +712,7 @@ def completeSolrize(conn, id):
sql = """
UPDATE document
SET solrize_end = %(now)s,
+    last_solrize_end = %(now)s,
solr_api_error = null,
solrize_reindex = 'f'
WHERE id = %(id)s
src/library/refresher.py (18 changes: 8 additions & 10 deletions)
@@ -168,8 +168,7 @@ def clean_datasets(stale_datasets, changed_datasets):
file_id + ' and hash: ' + file_hash)

# clean up source xml and solr for both stale and changed datasets
-    combined_datasets_toclean = stale_datasets + changed_datasets
-    if len(combined_datasets_toclean) > 0:
+    if len(stale_datasets) > 0 or len(changed_datasets) > 0:
logger.info('Removing ' + str(len(stale_datasets)) + ' stale and ' +
str(len(changed_datasets)) + ' changed documents')

@@ -232,6 +231,9 @@ def clean_datasets(stale_datasets, changed_datasets):
except:
logger.error('Failed to remove stale docs from solr with hash: ' +
file_hash + ' and id: ' + file_id + ' from core with name ' + core_name)
+# We could clear the last_solrize_end field here, since those documents are now gone from Solr.
+# It isn't necessary, though: everywhere clean_datasets is called,
+# the rows are removed from the DB immediately afterwards anyway.
except Exception as e:
logger.error(
'Unknown error occurred while attempting to remove stale document ID {} from Source and SOLR'.format(file_id))
@@ -253,14 +255,10 @@ def clean_datasets(stale_datasets, changed_datasets):
clean_containers_by_id(blob_service_client, file_id, containers=[
config['CLEAN_CONTAINER_NAME']])

-# remove from all solr collections
-for core_name in solr_cores:
-    try:
-        solr_cores[core_name].delete(
-            q='iati_activities_document_id:' + file_id)
-    except:
-        logger.warn('Failed to remove changed docs from solr with hash: ' +
-                    file_hash + ' and id: ' + file_id + ' from core with name ' + core_name)
+# Don't remove from any solr collections!
+# We want old data to stay in the system until new data is ready to be put in Solr.
+# The Solrize stage will delete the old data from Solr.

except Exception as e:
logger.error(
'Unknown error occurred while attempting to remove changed document ID {} from Source and SOLR'.format(file_id))
src/migrations/mig_25.py (6 changes: 6 additions & 0 deletions)
@@ -0,0 +1,6 @@
+upgrade = """
+ALTER TABLE public.document ADD COLUMN last_solrize_end timestamp without time zone;
+"""
+downgrade = """
+ALTER TABLE public.document DROP COLUMN last_solrize_end;
+"""
