Commit
cleaning clean_and_reparse
raftmsohani committed Dec 18, 2024
1 parent 7fb2cde commit 259df3e
Showing 5 changed files with 342 additions and 140 deletions.
tdrs-backend/tdpservice/search_indexes/management/commands/clean_and_reparse.py

@@ -20,6 +20,20 @@

logger = logging.getLogger(__name__)

from tdpservice.search_indexes.utils import (
backup,
get_log_context,
assert_sequential_execution,
should_exit,
handle_elastic,
delete_summaries,
delete_associated_models,
count_total_num_records,
calculate_timeout,
handle_datafiles,
handle_elastic_doc_delete
)


class Command(BaseCommand):
"""Command class."""
@@ -35,73 +49,6 @@ def add_arguments(self, parser):
"fiscal_year/quarter aren't necessary.")
parser.add_argument("-f", "--files", nargs='+', type=str, help="Re-parse specific datafiles by datafile id")

def _get_log_context(self, system_user):
"""Return logger context."""
context = {'user_id': system_user.id,
'action_flag': ADDITION,
'object_repr': "Clean and Reparse"
}
return context

def _backup(self, backup_file_name, log_context):
"""Execute Postgres DB backup."""
try:
logger.info("Beginning reparse DB Backup.")
call_command('backup_db', '-b', '-f', f'{backup_file_name}')
logger.info("Backup complete! Commencing clean and reparse.")

log("Database backup complete.",
logger_context=log_context,
level='info')
except Exception as e:
log("Database backup FAILED. Clean and reparse NOT executed. Database and Elastic are CONSISTENT!",
logger_context=log_context,
level='error')
raise e

def _handle_elastic(self, new_indices, log_context):
"""Create new Elastic indices and delete old ones."""
if new_indices:
try:
logger.info("Creating new elastic indexes.")
call_command('tdp_search_index', '--create', '-f', '--use-alias')
log("Index creation complete.",
logger_context=log_context,
level='info')
except ElasticsearchException as e:
log("Elastic index creation FAILED. Clean and reparse NOT executed. "
"Database is CONSISTENT, Elastic is INCONSISTENT!",
logger_context=log_context,
level='error')
raise e
except Exception as e:
log("Caught generic exception in _handle_elastic. Clean and reparse NOT executed. "
"Database is CONSISTENT, Elastic is INCONSISTENT!",
logger_context=log_context,
level='error')
raise e

def _delete_summaries(self, file_ids, log_context):
"""Raw delete all DataFileSummary objects."""
try:
qset = DataFileSummary.objects.filter(datafile_id__in=file_ids)
count = qset.count()
logger.info(f"Deleting {count} datafile summary objects.")
qset._raw_delete(qset.db)
logger.info("Successfully deleted datafile summary objects.")
except DatabaseError as e:
log('Encountered a DatabaseError while deleting DataFileSummary from Postgres. The database '
'and Elastic are INCONSISTENT! Restore the DB from the backup as soon as possible!',
logger_context=log_context,
level='critical')
raise e
except Exception as e:
log('Caught generic exception while deleting DataFileSummary. The database and Elastic are INCONSISTENT! '
'Restore the DB from the backup as soon as possible!',
logger_context=log_context,
level='critical')
raise e

def __handle_elastic_doc_delete(self, doc, qset, model, elastic_exceptions, new_indices):
"""Delete documents from Elastic and handle exceptions."""
if not new_indices:
@@ -128,7 +75,7 @@ def _delete_records(self, file_ids, new_indices, log_context):
count = qset.count()
total_deleted += count
logger.info(f"Deleting {count} records of type: {model}.")
self.__handle_elastic_doc_delete(doc, qset, model, elastic_exceptions, new_indices)
handle_elastic_doc_delete(doc, qset, model, elastic_exceptions, new_indices)
qset._raw_delete(qset.db)
except DatabaseError as e:
log(f'Encountered a DatabaseError while deleting records of type {model} from Postgres. The database '
@@ -174,7 +121,7 @@ def _delete_errors(self, file_ids, log_context):

def _delete_associated_models(self, meta_model, file_ids, new_indices, log_context):
"""Delete all models associated to the selected datafiles."""
self._delete_summaries(file_ids, log_context)
delete_summaries(file_ids, log_context)
self._delete_errors(file_ids, log_context)
num_deleted = self._delete_records(file_ids, new_indices, log_context)
meta_model.num_records_deleted = num_deleted
@@ -302,10 +249,8 @@ def handle(self, *args, **options):
fiscal_year = options.get('fiscal_year', None)
fiscal_quarter = options.get('fiscal_quarter', None)
reparse_all = options.get('all', False)
print(f'************** reparse all {reparse_all}')
selected_files = options.get('files', None)
selected_files = [int(file) for file in selected_files[0].split(',')] if selected_files else None
print(f'************** selected files {selected_files}')
new_indices = reparse_all is True

# Option that can only be specified by calling `handle` directly and passing it.
@@ -341,7 +286,7 @@ def handle(self, *args, **options):
system_user, created = User.objects.get_or_create(username='system')
if created:
logger.debug('Created reserved system user.')
log_context = self._get_log_context(system_user)
log_context = get_log_context(system_user)

all_fy = "All"
all_q = "Q1-4"
@@ -373,13 +318,13 @@

# Backup the Postgres DB
backup_file_name += f"_rpv{meta_model.pk}.pg"
self._backup(backup_file_name, log_context)
backup(backup_file_name, log_context)

meta_model.db_backup_location = backup_file_name
meta_model.save()

# Create and delete Elastic indices if necessary
self._handle_elastic(new_indices, log_context)
handle_elastic(new_indices, log_context)

# Delete records from Postgres and Elastic if necessary
file_ids = files.values_list('id', flat=True).distinct()
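The new tdpservice/search_indexes/utils.py module that both import blocks in this commit reference is among the 5 changed files but is not loaded in this view. A minimal sketch of what the moved helpers presumably look like, with bodies lifted from the private methods deleted above (the module layout and import list are assumptions):

# tdpservice/search_indexes/utils.py -- sketch only; this file is not shown in
# the captured diff. Bodies are copied from the deleted Command methods.
import logging

from django.contrib.admin.models import ADDITION
from django.core.management import call_command

from tdpservice.core.utils import log

logger = logging.getLogger(__name__)


def get_log_context(system_user):
    """Return logger context."""
    return {
        'user_id': system_user.id,
        'action_flag': ADDITION,
        'object_repr': "Clean and Reparse",
    }


def should_exit(condition):
    """Exit on condition."""
    if condition:
        exit(1)


def backup(backup_file_name, log_context):
    """Execute Postgres DB backup."""
    try:
        logger.info("Beginning reparse DB Backup.")
        call_command('backup_db', '-b', '-f', f'{backup_file_name}')
        logger.info("Backup complete! Commencing clean and reparse.")
        log("Database backup complete.",
            logger_context=log_context,
            level='info')
    except Exception as e:
        log("Database backup FAILED. Clean and reparse NOT executed. Database and Elastic are CONSISTENT!",
            logger_context=log_context,
            level='error')
        raise e

The remaining helpers (handle_elastic, delete_summaries, handle_elastic_doc_delete, and so on) would presumably move over the same way, dropping the self parameter, which matches the call-site changes throughout this diff.
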
109 changes: 54 additions & 55 deletions tdrs-backend/tdpservice/search_indexes/reparse.py
@@ -1,70 +1,37 @@
# Re-write management command clean_and_reparse as a function without calling call_command
# should include all the steps in the management command
#
from django.core.management.base import BaseCommand
from django.core.management import call_command
from django.core.paginator import Paginator
from django.db.utils import DatabaseError
from elasticsearch.exceptions import ElasticsearchException
from tdpservice.data_files.models import DataFile
from tdpservice.parsers.models import DataFileSummary, ParserError
from tdpservice.scheduling import parser_task
from tdpservice.search_indexes.util import DOCUMENTS, count_all_records
from tdpservice.search_indexes.models.reparse_meta import ReparseMeta
from tdpservice.core.utils import log
from django.contrib.admin.models import ADDITION
from tdpservice.users.models import User
from datetime import timedelta
from django.utils import timezone
from django.conf import settings

import logging

logger = logging.getLogger(__name__)

def _get_log_context(system_user):
"""Return logger context."""
context = {'user_id': system_user.id,
'action_flag': ADDITION,
'object_repr': "Clean and Reparse"
}
return context

def _assert_sequential_execution(log_context):
"""Assert that no other reparse commands are still executing."""
latest_meta_model = ReparseMeta.get_latest()
now = timezone.now()
is_not_none = latest_meta_model is not None
if (is_not_none and latest_meta_model.timeout_at is None):
log(f"The latest ReparseMeta model's (ID: {latest_meta_model.pk}) timeout_at field is None. "
"Cannot safely execute reparse, please fix manually.",
logger_context=log_context,
level='error')
return False
if (is_not_none and not ReparseMeta.assert_all_files_done(latest_meta_model) and
not now > latest_meta_model.timeout_at):
log('A previous execution of the reparse command is RUNNING. Cannot execute in parallel, exiting.',
logger_context=log_context,
level='warn')
return False
elif (is_not_none and latest_meta_model.timeout_at is not None and now > latest_meta_model.timeout_at and not
ReparseMeta.assert_all_files_done(latest_meta_model)):
log("Previous reparse has exceeded the timeout. Allowing execution of the command.",
logger_context=log_context,
level='warn')
return True
return True

def _should_exit(condition):
"""Exit on condition."""
if condition:
exit(1)
from tdpservice.search_indexes.utils import (
backup,
get_log_context,
assert_sequential_execution,
should_exit,
handle_elastic,
delete_associated_models,
count_total_num_records,
calculate_timeout,
handle_datafiles,
)

def clean_reparse(selected_file_ids):
"""Reparse selected files."""
selected_files = [int(file_id) for file_id in selected_file_ids[0].split(",")]

files = files.filter(id__in=selected_files)
files = DataFile.objects.filter(id__in=selected_files)
backup_file_name = "/tmp/reparsing_backup"
backup_file_name += "_selected_files"
continue_msg = "You have selected to reparse datafiles for FY {fy} and {q}. The reparsed files "
continue_msg = continue_msg.format(fy=f"selected files: {str(selected_files)}", q="Q1-4")

num_files = files.count()
@@ -74,7 +41,7 @@ def clean_reparse(selected_file_ids):
system_user, created = User.objects.get_or_create(username="system")
if created:
logger.info("Created system user")
log_context = _get_log_context(system_user)
log_context = get_log_context(system_user)

all_fy = "All"
all_q = "Q1-4"
@@ -83,10 +50,16 @@
logger_context=log_context,
level=logging.INFO)

is_sequential = _assert_sequential_execution(log_context)
_should_exit(not is_sequential)
is_sequential = assert_sequential_execution(log_context)
should_exit(not is_sequential)

meta_model = ReparseFileMeta.objects.create(
fiscal_quarter = None
fiscal_year = None
all_reparse = False
new_indices = False


meta_model = ReparseMeta.objects.create(
fiscal_quarter=fiscal_quarter,
fiscal_year=fiscal_year,
all=all_reparse,
@@ -95,10 +68,36 @@

# Backup the Postgres DB
backup_file_name += f"_rpv{meta_model.pk}.pg"
_backup(backup_file_name, log_context)
backup(backup_file_name, log_context)

meta_model.db_backup_location = backup_file_name
meta_model.save()

# Create and delete Elastic indices if necessary
_handle_elastic(new_indices, log_context)
handle_elastic(new_indices, log_context)

file_ids = files.values_list('id', flat=True).distinct()
meta_model.total_num_records_initial = count_total_num_records(log_context)
meta_model.save()

delete_associated_models(meta_model, file_ids, new_indices, log_context)

meta_model.timeout_at = meta_model.created_at + calculate_timeout(
num_files,
meta_model.num_records_deleted)

meta_model.save()
logger.info(f"Deleted a total of {meta_model.num_records_deleted} records across {num_files} files.")

# Delete and re-save datafiles to handle cascading dependencies
logger.info(f'Deleting and re-parsing {num_files} files')
handle_datafiles(files, meta_model, log_context)

log("Database cleansing complete and all files have been re-scheduling for parsing and validation.",
logger_context=log_context,
level='info')
log(f"Clean and reparse command completed. All files for FY {fiscal_year if fiscal_year else all_fy} and "
f"{fiscal_quarter if fiscal_quarter else all_q} have been queued for parsing.",
logger_context=log_context,
level='info')
logger.info('Done. All tasks have been queued to parse the selected datafiles.')
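
For reference, clean_reparse keeps the same input shape as the management command's -f/--files flag: a list whose first element is a comma-separated string of datafile ids. A minimal invocation sketch, e.g. from python manage.py shell (the ids below are hypothetical and must be pks of existing DataFile rows):

# Sketch: calling the function form of the command directly.
from tdpservice.search_indexes.reparse import clean_reparse

# One comma-separated id string wrapped in a list, mirroring -f/--files.
clean_reparse(['1,3'])  # hypothetical DataFile pks
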
41 changes: 30 additions & 11 deletions tdrs-backend/tdpservice/search_indexes/test/test_reparse.py
@@ -20,6 +20,17 @@
import os
import time

from tdpservice.search_indexes.utils import (
backup,
assert_sequential_execution,
should_exit,
delete_associated_models,
count_total_num_records,
calculate_timeout,
handle_datafiles,
delete_summaries
)

@pytest.fixture
def cat4_edge_case_file(stt_user, stt):
"""Fixture for cat_4_edge_case.txt."""
@@ -87,6 +98,16 @@ def parse_files(summary, f1, f2, f3, f4):
f4.save()
return [f1.pk, f2.pk, f3.pk, f4.pk]


# write test for reparse command
@pytest.mark.django_db
def test_reparse_a_file(log_context, dfs, cat4_edge_case_file, big_file, small_ssp_section1_datafile,
tribal_section_1_file):
"""Count total number of files in DB."""
parsed_files = parse_files(dfs, cat4_edge_case_file, big_file, small_ssp_section1_datafile, tribal_section_1_file)
from tdpservice.search_indexes.reparse import clean_reparse
clean_reparse(['1,3'])

@pytest.mark.django_db
def test_count_total_num_records(log_context, dfs, cat4_edge_case_file, big_file,
small_ssp_section1_datafile, tribal_section_1_file):
@@ -106,7 +127,7 @@ def test_reparse_backup_succeed(log_context, dfs, cat4_edge_case_file, big_file,

cmd = clean_and_reparse.Command()
file_name = "/tmp/test_reparse.pg"
cmd._backup(file_name, log_context)
backup(file_name, log_context)
time.sleep(10)

file_size = os.path.getsize(file_name)
@@ -119,13 +140,13 @@ def test_reparse_backup_fail(mocker, log_context, dfs, cat4_edge_case_file, big_
parse_files(dfs, cat4_edge_case_file, big_file, small_ssp_section1_datafile, tribal_section_1_file)

mocker.patch(
'tdpservice.search_indexes.management.commands.clean_and_reparse.Command._backup',
'tdpservice.search_indexes.utils.backup',
side_effect=Exception('Backup exception')
)
cmd = clean_and_reparse.Command()
file_name = "/tmp/test_reparse.pg"
with pytest.raises(Exception):
cmd._backup(file_name, log_context)
backup(file_name, log_context)
assert os.path.exists(file_name) is False
exception_msg = LogEntry.objects.latest('pk').change_message
assert exception_msg == ("Database backup FAILED. Clean and reparse NOT executed. Database "
@@ -160,13 +181,12 @@ class Fake:
@pytest.mark.django_db
def test_delete_summaries_exceptions(mocker, log_context, exc_msg, exception_type):
"""Test summary exception handling."""
mocker.patch(
'tdpservice.search_indexes.management.commands.clean_and_reparse.Command._delete_summaries',
mocked_delete_summaries = mocker.patch(
'tdpservice.search_indexes.utils.delete_summaries',
side_effect=exception_type('Summary delete exception')
)
cmd = clean_and_reparse.Command()
with pytest.raises(exception_type):
cmd._delete_summaries([], log_context)
mocked_delete_summaries([], log_context)
exception_msg = LogEntry.objects.latest('pk').change_message
assert exception_msg == exc_msg

@@ -179,13 +199,12 @@
@pytest.mark.django_db
def test_handle_elastic_exceptions(mocker, log_context, exc_msg, exception_type):
"""Test summary exception handling."""
mocker.patch(
'tdpservice.search_indexes.management.commands.clean_and_reparse.Command._handle_elastic',
mocked_handle_elastic = mocker.patch(
'tdpservice.search_indexes.utils.handle_elastic',
side_effect=exception_type('Summary delete exception')
)
cmd = clean_and_reparse.Command()
with pytest.raises(exception_type):
cmd._handle_elastic([], True, log_context)
mocked_handle_elastic(True, log_context)
exception_msg = LogEntry.objects.latest('pk').change_message
assert exception_msg == exc_msg
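
One caveat with the new patch targets: both the tests above and reparse.py bind these helpers with from tdpservice.search_indexes.utils import ..., and patching 'tdpservice.search_indexes.utils.backup' after import does not replace a name that was already bound in the importing module. The reworked tests sidestep this by invoking the returned mock directly; to exercise clean_reparse itself against a failing backup, the usual mock rule is to patch the name where it is looked up. A sketch, assuming the import style shown in this commit (the test name and id are hypothetical):

# Sketch: patch the re-bound name in the consuming module, not in utils.
import pytest

from tdpservice.search_indexes.reparse import clean_reparse


@pytest.mark.django_db
def test_clean_reparse_backup_failure(mocker):
    # Patch the module attribute that clean_reparse actually looks up.
    mocker.patch(
        'tdpservice.search_indexes.reparse.backup',
        side_effect=Exception('Backup exception'),
    )
    with pytest.raises(Exception):
        clean_reparse(['1'])  # hypothetical DataFile pk
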

