diff --git a/tdrs-backend/tdpservice/data_files/admin/admin.py b/tdrs-backend/tdpservice/data_files/admin/admin.py
index 4c9fce07a..eae060c36 100644
--- a/tdrs-backend/tdpservice/data_files/admin/admin.py
+++ b/tdrs-backend/tdpservice/data_files/admin/admin.py
@@ -14,7 +14,7 @@ class DataFileInline(admin.TabularInline):

     """Inline model for many to many relationship."""

-    model = DataFile.reparse_meta_models.through
+    model = DataFile.reparses.through
     can_delete = False
     ordering = ["-pk"]

diff --git a/tdrs-backend/tdpservice/data_files/admin/filters.py b/tdrs-backend/tdpservice/data_files/admin/filters.py
index 1429ecd20..9fbb35530 100644
--- a/tdrs-backend/tdpservice/data_files/admin/filters.py
+++ b/tdrs-backend/tdpservice/data_files/admin/filters.py
@@ -35,7 +35,7 @@ def queryset(self, request, queryset):
         if self.value() is not None and queryset.exists():
             latest_meta = ReparseMeta.get_latest()
             if latest_meta is not None:
-                queryset = queryset.filter(reparse_meta_models=latest_meta)
+                queryset = queryset.filter(reparses=latest_meta)
         return queryset

diff --git a/tdrs-backend/tdpservice/data_files/migrations/0014_reparsefilemeta.py b/tdrs-backend/tdpservice/data_files/migrations/0014_reparsefilemeta.py
new file mode 100644
index 000000000..d9cb6ee8b
--- /dev/null
+++ b/tdrs-backend/tdpservice/data_files/migrations/0014_reparsefilemeta.py
@@ -0,0 +1,29 @@
+# Generated by Django 3.2.15 on 2024-10-08 12:18
+
+from django.db import migrations, models
+import django.db.models.deletion
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('search_indexes', '0031_alter_tribal_tanf_t4_closure_reason'),
+        ('data_files', '0013_datafile_reparse_meta'),
+    ]
+
+    operations = [
+        migrations.CreateModel(
+            name='ReparseFileMeta',
+            fields=[
+                ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
+                ('finished', models.BooleanField(default=False)),
+                ('success', models.BooleanField(default=False)),
+                ('started_at', models.DateTimeField(null=True)),
+                ('finished_at', models.DateTimeField(null=True)),
+                ('num_records_created', models.PositiveIntegerField(default=0)),
+                ('cat_4_errors_generated', models.PositiveIntegerField(default=0)),
+                ('data_file', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='reparse_file_metas', to='data_files.datafile')),
+                ('reparse_meta', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='reparse_file_metas', to='search_indexes.reparsemeta')),
+            ],
+        ),
+    ]
diff --git a/tdrs-backend/tdpservice/data_files/migrations/0015_datafile_reparses.py b/tdrs-backend/tdpservice/data_files/migrations/0015_datafile_reparses.py
new file mode 100644
index 000000000..c4cdea583
--- /dev/null
+++ b/tdrs-backend/tdpservice/data_files/migrations/0015_datafile_reparses.py
@@ -0,0 +1,20 @@
+# Generated by Django 3.2.15 on 2024-10-04 12:17
+
+from django.db import migrations, models
+from tdpservice.data_files.models import ReparseFileMeta
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('data_files', '0014_reparsefilemeta'),
+        ('search_indexes', '0031_alter_tribal_tanf_t4_closure_reason'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='datafile',
+            name='reparses',
+            field=models.ManyToManyField(help_text='Reparse events this file has been associated with.', related_name='files', through="data_files.ReparseFileMeta", to='search_indexes.ReparseMeta'),
+        ),
+    ]
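Note: 0015 declares the new `reparses` field with an explicit `through` model, so per-link state lives on the join row itself. A minimal usage sketch (hedged; `df` and `meta` are assumed pre-existing instances, not part of this patch):

    from django.utils import timezone
    from tdpservice.data_files.models import DataFile, ReparseFileMeta
    from tdpservice.search_indexes.models.reparse_meta import ReparseMeta

    df = DataFile.objects.first()        # assumed to exist
    meta = ReparseMeta.objects.create()

    # .add() accepts through_defaults for explicit-through M2Ms (Django >= 2.2).
    df.reparses.add(meta, through_defaults={'finished': False, 'success': False})

    # The link row is a full model instance and can be updated as parsing runs.
    link = ReparseFileMeta.objects.get(data_file=df, reparse_meta=meta)
    link.started_at = timezone.now()
    link.save()

Note also that 0015 imports `ReparseFileMeta` from `tdpservice.data_files.models` but never uses it; data migrations normally avoid importing live models entirely.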
diff --git a/tdrs-backend/tdpservice/data_files/migrations/0016_remove_datafile_reparse_meta_models.py b/tdrs-backend/tdpservice/data_files/migrations/0016_remove_datafile_reparse_meta_models.py
new file mode 100644
index 000000000..8a8796e92
--- /dev/null
+++ b/tdrs-backend/tdpservice/data_files/migrations/0016_remove_datafile_reparse_meta_models.py
@@ -0,0 +1,38 @@
+# Generated by Django 3.2.15 on 2024-10-04 12:17
+
+from django.db import migrations
+
+
+def switch_reparse_meta_through_model(apps, schema_editor):
+    DataFile = apps.get_model("data_files", "DataFile")
+    ReparseMeta = apps.get_model("search_indexes", "ReparseMeta")
+    OldThru = DataFile.reparse_meta_models.through
+    ReparseFileMeta = apps.get_model("data_files", "ReparseFileMeta")
+
+    q = OldThru.objects.all()
+
+    print(f'switching {q.count()} through models')
+
+    for m in q:
+        ReparseFileMeta.objects.create(
+            data_file_id=m.datafile.pk,
+            reparse_meta_id=m.reparsemeta.pk
+        )
+        m.delete()
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('data_files', '0015_datafile_reparses'),
+    ]
+
+    operations = [
+        migrations.RunPython(
+            switch_reparse_meta_through_model,
+        ),
+        migrations.RemoveField(
+            model_name='datafile',
+            name='reparse_meta_models',
+        ),
+    ]
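The forward function above issues one INSERT per link row and has no reverse operation. A hedged, batched variant of the same copy (sketch only, using the same historical-model lookups):

    def switch_reparse_meta_through_model(apps, schema_editor):
        # Batched variant: one bulk INSERT pass, then one DELETE.
        DataFile = apps.get_model("data_files", "DataFile")
        ReparseFileMeta = apps.get_model("data_files", "ReparseFileMeta")
        OldThru = DataFile.reparse_meta_models.through

        links = OldThru.objects.all()
        ReparseFileMeta.objects.bulk_create(
            ReparseFileMeta(data_file_id=m.datafile_id, reparse_meta_id=m.reparsemeta_id)
            for m in links.iterator()
        )
        links.delete()

Passing `migrations.RunPython.noop` as `reverse_code` would also let the migration be unapplied; as written (like the original) it is forward-only.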
diff --git a/tdrs-backend/tdpservice/data_files/models.py b/tdrs-backend/tdpservice/data_files/models.py
index 66d245c87..edf93e75e 100644
--- a/tdrs-backend/tdpservice/data_files/models.py
+++ b/tdrs-backend/tdpservice/data_files/models.py
@@ -79,6 +79,24 @@ class Meta:
     # separately
     extension = models.CharField(max_length=8, default="txt")

+
+class ReparseFileMeta(models.Model):
+    """Meta data model representing a single file parse within a reparse execution."""
+
+    data_file = models.ForeignKey('data_files.DataFile', on_delete=models.CASCADE, related_name='reparse_file_metas')
+    reparse_meta = models.ForeignKey(
+        'search_indexes.ReparseMeta',
+        on_delete=models.CASCADE,
+        related_name='reparse_file_metas'
+    )
+
+    finished = models.BooleanField(default=False)
+    success = models.BooleanField(default=False)
+    started_at = models.DateTimeField(auto_now_add=False, null=True)
+    finished_at = models.DateTimeField(auto_now_add=False, null=True)
+
+    num_records_created = models.PositiveIntegerField(default=0)
+    cat_4_errors_generated = models.PositiveIntegerField(default=0)
+

 class DataFile(FileRecord):
     """Represents a version of a data file."""
@@ -153,10 +171,12 @@ class Meta:
         null=True
     )

-    reparse_meta_models = models.ManyToManyField("search_indexes.ReparseMeta",
-                                                 help_text="Reparse events this file has been associated with.",
-                                                 related_name="datafiles"
-                                                 )
+    reparses = models.ManyToManyField(
+        "search_indexes.ReparseMeta",
+        through="data_files.ReparseFileMeta",
+        help_text="Reparse events this file has been associated with.",
+        related_name="files"
+    )

     @property
     def prog_type(self):
diff --git a/tdrs-backend/tdpservice/data_files/tasks.py b/tdrs-backend/tdpservice/data_files/tasks.py
index 16e35de79..0ea5446af 100644
--- a/tdrs-backend/tdpservice/data_files/tasks.py
+++ b/tdrs-backend/tdpservice/data_files/tasks.py
@@ -13,7 +13,7 @@ def get_stuck_files():
     """Return a queryset containing files in a 'stuck' state."""
-    stuck_files = DataFile.objects.annotate(reparse_count=Count('reparse_meta_models')).filter(
+    stuck_files = DataFile.objects.annotate(reparse_count=Count('reparses')).filter(
         # non-reparse submissions over an hour old
         Q(
             reparse_count=0,
@@ -22,9 +22,9 @@
         # reparse submissions past the timeout, where the reparse did not complete
         Q(
             reparse_count__gt=0,
-            reparse_meta_models__timeout_at__lte=timezone.now(),
-            reparse_meta_models__finished=False,
-            reparse_meta_models__success=False
+            reparses__timeout_at__lte=timezone.now(),
+            reparse_file_metas__finished=False,
+            reparse_file_metas__success=False
         )
     ).filter(
         # where there is NO summary or the summary is in PENDING status
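A subtlety in the reparse branch of `get_stuck_files` above: `reparses__timeout_at` and `reparse_file_metas__finished` are different relation paths, so Django joins them separately, and for a file with several reparses the timeout test and the finished/success tests are not guaranteed to land on the same reparse event. One hedged way to pin all three conditions to a single joined `ReparseFileMeta` row (sketch, not a required change):

    from django.db.models import Q
    from django.utils import timezone

    reparse_stuck = Q(
        reparse_count__gt=0,
        reparse_file_metas__reparse_meta__timeout_at__lte=timezone.now(),
        reparse_file_metas__finished=False,
        reparse_file_metas__success=False,
    )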
diff --git a/tdrs-backend/tdpservice/data_files/test/test_stuck_files.py b/tdrs-backend/tdpservice/data_files/test/test_stuck_files.py
index 95f4f8f3a..10a480ec4 100644
--- a/tdrs-backend/tdpservice/data_files/test/test_stuck_files.py
+++ b/tdrs-backend/tdpservice/data_files/test/test_stuck_files.py
@@ -31,12 +31,10 @@ def make_summary(datafile, status):
     )


-def make_reparse_meta(finished, success):
+def make_reparse_meta():
     """Create a test reparse meta model."""
     return ReparseMetaFactory.create(
-        timeout_at=_time_ago(hours=1),
-        finished=finished,
-        success=success
+        timeout_at=_time_ago(hours=1)
     )

@@ -54,8 +52,8 @@ def test_find_pending_submissions__none_stuck(stt_user, stt):
     df2.created_at = _time_ago(hours=1)
     df2.save()
     make_summary(df2, DataFileSummary.Status.ACCEPTED)
-    rpm = make_reparse_meta(True, True)
-    df2.reparse_meta_models.add(rpm)
+    rpm = make_reparse_meta()
+    df2.reparses.add(rpm, through_defaults={'finished': True, 'success': True})

     # a pending standard submission, less than an hour old
     df3 = make_datafile(stt_user, stt, 3)
@@ -81,8 +79,8 @@ def test_find_pending_submissions__non_reparse_stuck(stt_user, stt):
     df2.created_at = _time_ago(hours=1)
     df2.save()
     make_summary(df2, DataFileSummary.Status.ACCEPTED)
-    rpm = make_reparse_meta(True, True)
-    df2.reparse_meta_models.add(rpm)
+    rpm = make_reparse_meta()
+    df2.reparses.add(rpm, through_defaults={'finished': True, 'success': True})

     stuck_files = get_stuck_files()
     assert stuck_files.count() == 1
@@ -102,8 +100,8 @@ def test_find_pending_submissions__non_reparse_stuck__no_dfs(stt_user, stt):
     df2.created_at = _time_ago(hours=1)
     df2.save()
     make_summary(df2, DataFileSummary.Status.ACCEPTED)
-    rpm = make_reparse_meta(True, True)
-    df2.reparse_meta_models.add(rpm)
+    rpm = make_reparse_meta()
+    df2.reparses.add(rpm, through_defaults={'finished': True, 'success': True})

     stuck_files = get_stuck_files()
     assert stuck_files.count() == 1
@@ -124,8 +122,8 @@ def test_find_pending_submissions__reparse_stuck(stt_user, stt):
     df2.created_at = _time_ago(hours=1)
     df2.save()
     make_summary(df2, DataFileSummary.Status.PENDING)
-    rpm = make_reparse_meta(False, False)
-    df2.reparse_meta_models.add(rpm)
+    rpm = make_reparse_meta()
+    df2.reparses.add(rpm, through_defaults={'finished': False, 'success': False})

     stuck_files = get_stuck_files()
     assert stuck_files.count() == 1
@@ -145,8 +143,8 @@ def test_find_pending_submissions__reparse_stuck__no_dfs(stt_user, stt):
     df2 = make_datafile(stt_user, stt, 2)
     df2.created_at = _time_ago(hours=1)
     df2.save()
-    rpm = make_reparse_meta(False, False)
-    df2.reparse_meta_models.add(rpm)
+    rpm = make_reparse_meta()
+    df2.reparses.add(rpm, through_defaults={'finished': False, 'success': False})

     stuck_files = get_stuck_files()
     assert stuck_files.count() == 1
@@ -167,8 +165,8 @@ def test_find_pending_submissions__reparse_and_non_reparse_stuck(stt_user, stt):
     df2.created_at = _time_ago(hours=1)
     df2.save()
     make_summary(df2, DataFileSummary.Status.PENDING)
-    rpm = make_reparse_meta(False, False)
-    df2.reparse_meta_models.add(rpm)
+    rpm = make_reparse_meta()
+    df2.reparses.add(rpm, through_defaults={'finished': False, 'success': False})

     stuck_files = get_stuck_files()
     assert stuck_files.count() == 2
@@ -188,8 +186,8 @@ def test_find_pending_submissions__reparse_and_non_reparse_stuck_no_dfs(stt_user
     df2 = make_datafile(stt_user, stt, 2)
     df2.created_at = _time_ago(hours=1)
     df2.save()
-    rpm = make_reparse_meta(False, False)
-    df2.reparse_meta_models.add(rpm)
+    rpm = make_reparse_meta()
+    df2.reparses.add(rpm, through_defaults={'finished': False, 'success': False})

     stuck_files = get_stuck_files()
     assert stuck_files.count() == 2
@@ -207,8 +205,8 @@ def test_find_pending_submissions__old_reparse_stuck__new_not_stuck(stt_user, st
     dfs1 = make_summary(df1, DataFileSummary.Status.PENDING)

     # reparse fails the first time
-    rpm1 = make_reparse_meta(False, False)
-    df1.reparse_meta_models.add(rpm1)
+    rpm1 = make_reparse_meta()
+    df1.reparses.add(rpm1, through_defaults={'finished': False, 'success': False})

     stuck_files = get_stuck_files()
     assert stuck_files.count() == 1
@@ -217,8 +215,8 @@ def test_find_pending_submissions__old_reparse_stuck__new_not_stuck(stt_user, st
     dfs1.delete()  # reparse deletes the original dfs and creates the new one
     make_summary(df1, DataFileSummary.Status.ACCEPTED)
-    rpm2 = make_reparse_meta(True, True)
-    df1.reparse_meta_models.add(rpm2)
+    rpm2 = make_reparse_meta()
+    df1.reparses.add(rpm2, through_defaults={'finished': True, 'success': True})

     stuck_files = get_stuck_files()
     assert stuck_files.count() == 0
@@ -234,8 +232,8 @@ def test_find_pending_submissions__new_reparse_stuck__old_not_stuck(stt_user, st
     dfs1 = make_summary(df1, DataFileSummary.Status.REJECTED)

     # reparse succeeds
-    rpm1 = make_reparse_meta(True, True)
-    df1.reparse_meta_models.add(rpm1)
+    rpm1 = make_reparse_meta()
+    df1.reparses.add(rpm1, through_defaults={'finished': True, 'success': True})

     # reparse again, fails this time
     dfs1.delete()  # reparse deletes the original dfs and creates the new one
@@ -244,8 +242,8 @@
         status=DataFileSummary.Status.PENDING,
     )

-    rpm2 = make_reparse_meta(False, False)
-    df1.reparse_meta_models.add(rpm2)
+    rpm2 = make_reparse_meta()
+    df1.reparses.add(rpm2, through_defaults={'finished': False, 'success': False})

     stuck_files = get_stuck_files()
     assert stuck_files.count() == 1
diff --git a/tdrs-backend/tdpservice/parsers/parse.py b/tdrs-backend/tdpservice/parsers/parse.py
index b2b9f0445..187787745 100644
--- a/tdrs-backend/tdpservice/parsers/parse.py
+++ b/tdrs-backend/tdpservice/parsers/parse.py
@@ -13,7 +13,6 @@
 from tdpservice.parsers.schema_defs.utils import get_section_reference, get_program_model
 from tdpservice.parsers.case_consistency_validator import CaseConsistencyValidator
 from tdpservice.parsers.util import log_parser_exception
-from tdpservice.search_indexes.models.reparse_meta import ReparseMeta

 logger = logging.getLogger(__name__)

@@ -34,7 +33,6 @@ def parse_datafile(datafile, dfs):
         logger.info(f"Preparser Error: {len(header_errors)} header errors encountered.")
         errors['header'] = header_errors
         bulk_create_errors({1: header_errors}, 1, flush=True)
-        update_meta_model(datafile, dfs)
         return errors
     elif header_is_valid and len(header_errors) > 0:
         logger.info(f"Preparser Warning: {len(header_errors)} header warnings encountered.")
@@ -75,7 +73,6 @@
                                f"({header['program_type']}) and FIPS Code ({field_values['state_fips']}).",)
         errors['header'] = [tribe_error]
         bulk_create_errors({1: [tribe_error]}, 1, flush=True)
-        update_meta_model(datafile, dfs)
         return errors

     # Ensure file section matches upload section
@@ -90,7 +87,6 @@
         errors['document'] = [section_error]
         unsaved_parser_errors = {1: [section_error]}
         bulk_create_errors(unsaved_parser_errors, 1, flush=True)
-        update_meta_model(datafile, dfs)
         return errors

     rpt_month_year_is_valid, rpt_month_year_error = category1.validate_header_rpt_month_year(
@@ -103,7 +99,6 @@
         errors['document'] = [rpt_month_year_error]
         unsaved_parser_errors = {1: [rpt_month_year_error]}
         bulk_create_errors(unsaved_parser_errors, 1, flush=True)
-        update_meta_model(datafile, dfs)
         return errors

     line_errors = parse_datafile_lines(datafile, dfs, program_type, section, is_encrypted, case_consistency_validator)
@@ -112,11 +107,6 @@
     return errors


-def update_meta_model(datafile, dfs):
-    """Update appropriate meta models."""
-    ReparseMeta.increment_records_created(datafile.reparse_meta_models, dfs.total_number_of_records_created)
-    ReparseMeta.increment_files_completed(datafile.reparse_meta_models)
-
 def bulk_create_records(unsaved_records, line_number, header_count, datafile, dfs, flush=False):
     """Bulk create passed in records."""
     batch_size = settings.BULK_CREATE_BATCH_SIZE
@@ -385,7 +375,6 @@
             rollback_records(unsaved_records.get_bulk_create_struct(), datafile)
             rollback_parser_errors(datafile)
             bulk_create_errors(preparse_error, num_errors, flush=True)
-            update_meta_model(datafile, dfs)
             return errors

         if prev_sum != header_count + trailer_count:
@@ -448,7 +437,6 @@
             rollback_parser_errors(datafile)
             preparse_error = {line_number: [err_obj]}
             bulk_create_errors(preparse_error, num_errors, flush=True)
-            update_meta_model(datafile, dfs)
             return errors

         should_remove = validate_case_consistency(case_consistency_validator)
@@ -469,7 +457,6 @@
         logger.error(f"Not all parsed records created for file: {datafile.id}!")
         rollback_records(unsaved_records.get_bulk_create_struct(), datafile)
         bulk_create_errors(unsaved_parser_errors, num_errors, flush=True)
-        update_meta_model(datafile, dfs)
         return errors

     # Add any generated cat4 errors to our error data structure & clear our caches errors list
@@ -486,8 +473,6 @@
                 f"validated {case_consistency_validator.total_cases_validated} of them.")
     dfs.save()

-    update_meta_model(datafile, dfs)
-
     return errors

diff --git a/tdrs-backend/tdpservice/parsers/test/factories.py b/tdrs-backend/tdpservice/parsers/test/factories.py
index c0f50e85b..5b952d02d 100644
--- a/tdrs-backend/tdpservice/parsers/test/factories.py
+++ b/tdrs-backend/tdpservice/parsers/test/factories.py
@@ -17,11 +17,6 @@ class Meta:
         model = "search_indexes.ReparseMeta"

     timeout_at = timezone.now()
-    finished = False
-    success = False
-    num_files_to_reparse = 1
-    files_completed = 1
-    files_failed = 0


 class ParsingFileFactory(factory.django.DjangoModelFactory):
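With `update_meta_model` gone from the parser, per-file progress bookkeeping now lives entirely at the task layer: stamp `started_at` before parsing, then `finished`/`success`/`finished_at` after. Reduced to a sketch (`run_one_reparse` is a hypothetical name; the real flow is inline in `parser_task.py` below):

    from django.utils import timezone
    from tdpservice.data_files.models import ReparseFileMeta

    def run_one_reparse(data_file_id, reparse_id):
        meta = ReparseFileMeta.objects.get(data_file_id=data_file_id, reparse_meta_id=reparse_id)
        meta.started_at = timezone.now()
        meta.save()
        try:
            ...  # parse_datafile(...)
            meta.success = True
        except Exception:
            meta.success = False
            raise
        finally:
            meta.finished = True
            meta.finished_at = timezone.now()
            meta.save()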
diff --git a/tdrs-backend/tdpservice/scheduling/parser_task.py b/tdrs-backend/tdpservice/scheduling/parser_task.py
index 2b1fb3d51..06d0f7b21 100644
--- a/tdrs-backend/tdpservice/scheduling/parser_task.py
+++ b/tdrs-backend/tdpservice/scheduling/parser_task.py
@@ -2,12 +2,13 @@
 from __future__ import absolute_import
 from celery import shared_task
 import logging
+from django.utils import timezone
 from django.contrib.auth.models import Group
 from django.db.utils import DatabaseError
 from tdpservice.users.models import AccountApprovalStatusChoices, User
-from tdpservice.data_files.models import DataFile
+from tdpservice.data_files.models import DataFile, ReparseFileMeta
 from tdpservice.parsers.parse import parse_datafile
-from tdpservice.parsers.models import DataFileSummary, ParserErrorCategoryChoices
+from tdpservice.parsers.models import DataFileSummary, ParserErrorCategoryChoices, ParserError
 from tdpservice.parsers.aggregates import case_aggregates_by_month, total_errors_by_month
 from tdpservice.parsers.util import log_parser_exception, make_generate_parser_error
 from tdpservice.email.helpers.data_file import send_data_submitted_email
@@ -17,8 +18,16 @@
 logger = logging.getLogger(__name__)


+def set_reparse_file_meta_model_failed_state(file_meta):
+    """Set ReparseFileMeta fields to indicate a parse failure."""
+    file_meta.finished = True
+    file_meta.success = False
+    file_meta.finished_at = timezone.now()
+    file_meta.save()
+
+
 @shared_task
-def parse(data_file_id, should_send_submission_email=True):
+def parse(data_file_id, reparse_id=None):
     """Send data file for processing."""
     # passing the data file FileField across redis was rendering non-serializable failures, doing the below lookup
     # to avoid those. I suppose good practice to not store/serializer large file contents in memory when stored in redis
@@ -27,6 +36,12 @@
         data_file = DataFile.objects.get(id=data_file_id)
         logger.info(f"DataFile parsing started for file {data_file.filename}")

+        file_meta = None
+        if reparse_id:
+            file_meta = ReparseFileMeta.objects.get(data_file_id=data_file_id, reparse_meta_id=reparse_id)
+            file_meta.started_at = timezone.now()
+            file_meta.save()
+
         dfs = DataFileSummary.objects.create(datafile=data_file, status=DataFileSummary.Status.PENDING)
         errors = parse_datafile(data_file, dfs)
         dfs.status = dfs.get_status()
@@ -41,7 +56,18 @@
         logger.info(f"Parsing finished for file -> {repr(data_file)} with status "
                     f"{dfs.status} and {len(errors)} errors.")

-        if should_send_submission_email is True:
+        if reparse_id is not None:
+            file_meta.num_records_created = dfs.total_number_of_records_created
+            file_meta.cat_4_errors_generated = ParserError.objects.filter(
+                file_id=data_file_id,
+                error_type=ParserErrorCategoryChoices.CASE_CONSISTENCY
+            ).count()
+            file_meta.finished = True
+            file_meta.success = True
+            file_meta.finished_at = timezone.now()
+            file_meta.save()
+            ReparseMeta.set_total_num_records_post(ReparseMeta.objects.get(pk=reparse_id))
+        else:
             recipients = User.objects.filter(
                 stt=data_file.stt,
                 account_approval_status=AccountApprovalStatusChoices.APPROVED,
@@ -54,7 +80,8 @@
             f"Encountered Database exception in parser_task.py: \n{e}",
             "error"
         )
-        ReparseMeta.increment_files_failed(data_file.reparse_meta_models)
+        if reparse_id:
+            set_reparse_file_meta_model_failed_state(file_meta)
     except Exception as e:
         generate_error = make_generate_parser_error(data_file, None)
         error = generate_error(schema=None,
@@ -72,4 +99,5 @@
                                (f"Uncaught exception while parsing datafile: {data_file.pk}! Please review the logs to "
                                 f"see if manual intervention is required. Exception: \n{e}"),
                                "critical")
-        ReparseMeta.increment_files_failed(data_file.reparse_meta_models)
+        if reparse_id:
+            set_reparse_file_meta_model_failed_state(file_meta)
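One caveat in the task above, hedged because the full function body is not shown in this hunk: `file_meta = None` executes inside the `try`, after the `DataFile` lookup. If `DataFile.objects.get` itself raises, the `except` paths reach `set_reparse_file_meta_model_failed_state(file_meta)` with `file_meta` never bound, which would raise `NameError`. Binding it before the `try` avoids that (sketch):

    file_meta = None
    try:
        data_file = DataFile.objects.get(id=data_file_id)
        if reparse_id:
            file_meta = ReparseFileMeta.objects.get(
                data_file_id=data_file_id, reparse_meta_id=reparse_id
            )
            file_meta.started_at = timezone.now()
            file_meta.save()
        ...
    except DatabaseError as e:
        ...
        if reparse_id and file_meta is not None:
            set_reparse_file_meta_model_failed_state(file_meta)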
diff --git a/tdrs-backend/tdpservice/search_indexes/admin/reparse_meta.py b/tdrs-backend/tdpservice/search_indexes/admin/reparse_meta.py
index f030501f8..4ea731475 100644
--- a/tdrs-backend/tdpservice/search_indexes/admin/reparse_meta.py
+++ b/tdrs-backend/tdpservice/search_indexes/admin/reparse_meta.py
@@ -1,4 +1,4 @@
-"""ModelAdmin classes for parsed SSP data files."""
+"""ModelAdmin class for the ReparseMeta model."""

 from .mixins import ReadOnlyAdminMixin
 from tdpservice.data_files.admin.admin import DataFileInline
@@ -8,18 +8,37 @@ class ReparseMetaAdmin(ReadOnlyAdminMixin):

     inlines = [DataFileInline]

+    def reparse_is_finished(self, instance):
+        """Overload instance property for ui checkboxes."""
+        return instance.is_finished
+    reparse_is_finished.boolean = True
+
+    def reparse_is_success(self, instance):
+        """Overload instance property for ui checkboxes."""
+        return instance.is_success
+    reparse_is_success.boolean = True
+
     list_display = [
         'id',
         'created_at',
         'timeout_at',
-        'success',
-        'finished',
+        'reparse_is_finished',
+        'reparse_is_success',
         'db_backup_location',
     ]

     list_filter = [
-        'success',
-        'finished',
         'fiscal_year',
         'fiscal_quarter',
     ]
+
+    readonly_fields = [
+        'reparse_is_finished',
+        'reparse_is_success',
+        'finished_at',
+        'num_files',
+        'num_files_completed',
+        'num_files_succeeded',
+        'num_files_failed',
+        'num_records_created',
+    ]
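The two wrapper methods exist because `list_display` cannot mark a bare model property as a boolean icon. On Django 3.2 the `admin.display` decorator expresses the same intent without the post-hoc attribute assignment (equivalent sketch):

    from django.contrib import admin

    class ReparseMetaAdmin(ReadOnlyAdminMixin):  # as defined in this file
        @admin.display(boolean=True, description='finished')
        def reparse_is_finished(self, instance):
            return instance.is_finished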
diff --git a/tdrs-backend/tdpservice/search_indexes/management/commands/clean_and_reparse.py b/tdrs-backend/tdpservice/search_indexes/management/commands/clean_and_reparse.py
index d0c7a9934..48d4cf3fe 100644
--- a/tdrs-backend/tdpservice/search_indexes/management/commands/clean_and_reparse.py
+++ b/tdrs-backend/tdpservice/search_indexes/management/commands/clean_and_reparse.py
@@ -182,9 +182,9 @@ def _handle_datafiles(self, files, meta_model, log_context):
         """Delete, re-save, and reparse selected datafiles."""
         for file in files:
             try:
-                file.reparse_meta_models.add(meta_model)
+                file.reparses.add(meta_model)
                 file.save()
-                parser_task.parse.delay(file.pk, should_send_submission_email=False)
+                parser_task.parse.delay(file.pk, reparse_id=meta_model.pk)
             except DatabaseError as e:
                 log('Encountered a DatabaseError while re-creating datafiles. The database '
                     'and Elastic are INCONSISTENT! Restore the DB from the backup as soon as possible!',
@@ -341,8 +341,7 @@ def handle(self, *args, **options):
             fiscal_year=fiscal_year,
             all=reparse_all,
             new_indices=new_indices,
-            delete_old_indices=new_indices,
-            num_files_to_reparse=num_files)
+            delete_old_indices=new_indices)

         # Backup the Postgres DB
         backup_file_name += f"_rpv{meta_model.pk}.pg"
diff --git a/tdrs-backend/tdpservice/search_indexes/management/commands/tdp_search_index.py b/tdrs-backend/tdpservice/search_indexes/management/commands/tdp_search_index.py
index a531ae558..c14a302a1 100644
--- a/tdrs-backend/tdpservice/search_indexes/management/commands/tdp_search_index.py
+++ b/tdrs-backend/tdpservice/search_indexes/management/commands/tdp_search_index.py
@@ -31,7 +31,7 @@ def __get_log_context(self):

     def __get_index_suffix(self):
         meta_model = ReparseMeta.get_latest()
-        if meta_model is not None and not meta_model.finished:
+        if meta_model is not None and not meta_model.is_finished:
             return f"_rpv{meta_model.pk}"
         fmt = "%Y-%m-%d_%H.%M.%S"
         return f"_{datetime.now().strftime(fmt)}"
diff --git a/tdrs-backend/tdpservice/search_indexes/migrations/0032_auto_20241008_1745.py b/tdrs-backend/tdpservice/search_indexes/migrations/0032_auto_20241008_1745.py
new file mode 100644
index 000000000..4724f0a3f
--- /dev/null
+++ b/tdrs-backend/tdpservice/search_indexes/migrations/0032_auto_20241008_1745.py
@@ -0,0 +1,37 @@
+# Generated by Django 3.2.15 on 2024-10-08 17:45
+
+from django.db import migrations
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('search_indexes', '0031_alter_tribal_tanf_t4_closure_reason'),
+    ]
+
+    operations = [
+        migrations.RemoveField(
+            model_name='reparsemeta',
+            name='files_completed',
+        ),
+        migrations.RemoveField(
+            model_name='reparsemeta',
+            name='files_failed',
+        ),
+        migrations.RemoveField(
+            model_name='reparsemeta',
+            name='finished',
+        ),
+        migrations.RemoveField(
+            model_name='reparsemeta',
+            name='num_files_to_reparse',
+        ),
+        migrations.RemoveField(
+            model_name='reparsemeta',
+            name='num_records_created',
+        ),
+        migrations.RemoveField(
+            model_name='reparsemeta',
+            name='success',
+        ),
+    ]
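The model rewrite that follows replaces the persisted counters dropped in 0032 with properties computed from `reparse_file_metas`. Two of them (`is_finished`, `num_records_created`) materialize every link row via `.all()`; for large reparses the same answers are available as single queries. Hedged sketch, same field names:

    from django.db.models import Sum

    class ReparseMeta(models.Model):  # sketch of the same properties, query-side
        @property
        def is_finished(self):
            # One EXISTS probe instead of loading every row into Python.
            metas = self.reparse_file_metas
            return metas.exists() and not metas.filter(finished=False).exists()

        @property
        def num_records_created(self):
            # Single SUM aggregate in the database.
            agg = self.reparse_file_metas.aggregate(total=Sum('num_records_created'))
            return agg['total'] or 0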
files to reparse + # Options used to select the files to reparse (from mgmt cmd only, remove if command deprecated) fiscal_quarter = models.CharField(max_length=2, null=True) fiscal_year = models.PositiveIntegerField(null=True) all = models.BooleanField(default=False) new_indices = models.BooleanField(default=False) delete_old_indices = models.BooleanField(default=False) + @property + def is_finished(self): + """Return True if all associated ReparseFileMeta objects are finished.""" + if self.num_files > 0: + return all([r.finished for r in self.reparse_file_metas.all()]) + return False + + @property + def is_success(self): + """Return True if all associated ReparseFileMeta objects are successful.""" + if self.is_finished: + return all([r.success for r in self.reparse_file_metas.all()]) + return False + + @property + def finished_at(self): + """Return the finished_at timestamp of the last ReparseFileMeta object.""" + last_parse = self.reparse_file_metas.order_by('-finished_at').first() + return last_parse.finished_at if last_parse else None + + @property + def num_files(self): + """Return the number of associated ReparseFileMeta objects.""" + return self.reparse_file_metas.count() + + @property + def num_files_completed(self): + """Return the number of completed ReparseFileMeta objects.""" + return self.reparse_file_metas.filter(finished=True).count() + + @property + def num_files_succeeded(self): + """Return the number of successful ReparseFileMeta objects.""" + return self.reparse_file_metas.filter(finished=True, success=True).count() + + @property + def num_files_failed(self): + """Return the number of failed ReparseFileMeta objects.""" + return self.reparse_file_metas.filter(finished=True, success=False).count() + + @property + def num_records_created(self): + """Return the sum of records created for all associated ReparseFileMeta objects.""" + return sum([r.num_records_created for r in self.reparse_file_metas.all()]) + + # remove unused statics or change to utils funcs in own app and/or make new cleanup ticket for future + @staticmethod def file_counts_match(meta_model): """ @@ -56,11 +93,10 @@ def file_counts_match(meta_model): containing this model has not been locked the caller will experience race issues. """ print("\n\nINSIDE FILE COUNTS MATCH:") - print(f"{meta_model.num_files_to_reparse }, {meta_model.files_completed}, {meta_model.files_failed}\n\n") - return (meta_model.files_completed == meta_model.num_files_to_reparse or - meta_model.files_completed + meta_model.files_failed == - meta_model.num_files_to_reparse or - meta_model.files_failed == meta_model.num_files_to_reparse) + print(f"{meta_model.num_files }, {meta_model.num_files_completed}, {meta_model.num_files_failed}\n\n") + return (meta_model.num_files_completed == meta_model.num_files or + meta_model.num_files_completed + meta_model.num_files_failed == + meta_model.num_files or meta_model.num_files_failed == meta_model.num_files) @staticmethod def assert_all_files_done(meta_model): @@ -70,84 +106,10 @@ def assert_all_files_done(meta_model): This function assumes the meta_model has been passed in a distributed/thread safe way. If the database row containing this model has not been locked the caller will experience race issues. """ - if meta_model.finished and ReparseMeta.file_counts_match(meta_model): + if meta_model.is_finished and ReparseMeta.file_counts_match(meta_model): return True return False - @staticmethod - def set_reparse_finished(meta_model): - """ - Set status/completion fields to appropriate values. 
@@ -70,84 +106,10 @@ def assert_all_files_done(meta_model):

         This function assumes the meta_model has been passed in a distributed/thread safe way. If the database row
         containing this model has not been locked the caller will experience race issues.
         """
-        if meta_model.finished and ReparseMeta.file_counts_match(meta_model):
+        if meta_model.is_finished and ReparseMeta.file_counts_match(meta_model):
             return True
         return False

-    @staticmethod
-    def set_reparse_finished(meta_model):
-        """
-        Set status/completion fields to appropriate values.
-
-        This function assumes the meta_model has been passed in a distributed/thread safe way. If the database row
-        containing this model has not been locked the caller will experience race issues.
-        """
-        meta_model.finished = True
-        meta_model.success = meta_model.files_completed == meta_model.num_files_to_reparse
-        meta_model.total_num_records_post = count_all_records()
-        meta_model.save()
-
-    @staticmethod
-    def increment_files_completed(reparse_meta_models):
-        """
-        Increment the count of files that have completed parsing for the datafile's current/latest reparse model.
-
-        Because this function can be called in parallel we use `select_for_update` because multiple parse tasks can
-        referrence the same ReparseMeta object that is being queried below. `select_for_update` provides a DB lock on
-        the object and forces other transactions on the object to wait until this one completes.
-        """
-        if reparse_meta_models.exists():
-            with transaction.atomic():
-                try:
-                    meta_model = reparse_meta_models.select_for_update().latest("pk")
-                    meta_model.files_completed += 1
-                    if ReparseMeta.file_counts_match(meta_model):
-                        ReparseMeta.set_reparse_finished(meta_model)
-                    meta_model.save()
-                except DatabaseError:
-                    logger.exception("Encountered exception while trying to update the `files_reparsed` field on the "
-                                     f"ReparseMeta object with ID: {meta_model.pk}.")
-
-    @staticmethod
-    def increment_files_failed(reparse_meta_models):
-        """
-        Increment the count of files that failed parsing for the datafile's current/latest reparse meta model.
-
-        Because this function can be called in parallel we use `select_for_update` because multiple parse tasks can
-        referrence the same ReparseMeta object that is being queried below. `select_for_update` provides a DB lock on
-        the object and forces other transactions on the object to wait until this one completes.
- """ - if reparse_meta_models.exists(): - with transaction.atomic(): - try: - meta_model = reparse_meta_models.select_for_update().latest("pk") - meta_model.num_records_created += num_created - meta_model.save() - except DatabaseError: - logger.exception("Encountered exception while trying to update the `files_failed` field on the " - f"ReparseMeta object with ID: {meta_model.pk}.") - @staticmethod def get_latest(): """Get the ReparseMeta model with the greatest pk.""" @@ -155,3 +117,10 @@ def get_latest(): if max_pk.get("pk__max", None) is None: return None return ReparseMeta.objects.get(pk=max_pk["pk__max"]) + + @staticmethod + def set_total_num_records_post(meta_model): + """Update the total_num_records_post field once reparse has completed.""" + if meta_model.is_finished: + meta_model.total_num_records_post = count_all_records() + meta_model.save() diff --git a/tdrs-backend/tdpservice/search_indexes/test/test_reparse.py b/tdrs-backend/tdpservice/search_indexes/test/test_reparse.py index 2c8647cea..54d49aedb 100644 --- a/tdrs-backend/tdpservice/search_indexes/test/test_reparse.py +++ b/tdrs-backend/tdpservice/search_indexes/test/test_reparse.py @@ -6,6 +6,7 @@ from tdpservice.search_indexes.management.commands import clean_and_reparse from tdpservice.search_indexes.models.reparse_meta import ReparseMeta from tdpservice.users.models import User +from tdpservice.data_files.models import ReparseFileMeta from django.contrib.admin.models import LogEntry, ADDITION from django.db.utils import DatabaseError @@ -265,7 +266,7 @@ def test_reparse_dunce(): assert ReparseMeta.objects.count() == 0 @pytest.mark.django_db -def test_reparse_sequential(log_context): +def test_reparse_sequential(log_context, big_file): """Test reparse _assert_sequential_execution.""" cmd = clean_and_reparse.Command() assert True is cmd._assert_sequential_execution(log_context) @@ -278,6 +279,7 @@ def test_reparse_sequential(log_context): "safely execute reparse, please fix manually." ) + big_file.reparses.add(meta) meta.timeout_at = timezone.now() + timedelta(seconds=100) meta.save() assert False is cmd._assert_sequential_execution(log_context) @@ -287,6 +289,7 @@ def test_reparse_sequential(log_context): meta.timeout_at = timezone.now() meta.save() + assert True is cmd._assert_sequential_execution(log_context) timeout_entry = LogEntry.objects.latest('pk') assert timeout_entry.change_message == ("Previous reparse has exceeded the timeout. 
Allowing " @@ -308,7 +311,7 @@ def test_reparse_quarter_and_year(mocker, dfs, cat4_edge_case_file, big_file, sm cmd.handle(**opts) latest = ReparseMeta.objects.select_for_update().latest("pk") - assert latest.num_files_to_reparse == 1 + assert latest.num_files == 1 assert latest.num_records_deleted == 3073 @pytest.mark.django_db() @@ -327,7 +330,7 @@ def test_reparse_quarter(mocker, dfs, cat4_edge_case_file, big_file, small_ssp_s cmd.handle(**opts) latest = ReparseMeta.objects.select_for_update().latest("pk") - assert latest.num_files_to_reparse == 4 + assert latest.num_files == 4 assert latest.num_records_deleted == 3104 @pytest.mark.django_db() @@ -346,7 +349,7 @@ def test_reparse_year(mocker, dfs, cat4_edge_case_file, big_file, small_ssp_sect cmd.handle(**opts) latest = ReparseMeta.objects.select_for_update().latest("pk") - assert latest.num_files_to_reparse == 2 + assert latest.num_files == 2 assert latest.num_records_deleted == 27 @pytest.mark.django_db() @@ -365,7 +368,7 @@ def test_reparse_all(mocker, dfs, cat4_edge_case_file, big_file, small_ssp_secti cmd.handle(**opts) latest = ReparseMeta.objects.select_for_update().latest("pk") - assert latest.num_files_to_reparse == 4 + assert latest.num_files == 4 assert latest.num_records_deleted == 3104 @pytest.mark.django_db() @@ -387,97 +390,85 @@ def test_reparse_no_files(mocker): "Quarter: Q1-4. Nothing to do.") @pytest.mark.django_db() -def test_mm_all_files_done(): +def test_mm_all_files_done(big_file): """Test meta model all files done.""" meta_model = ReparseMeta.objects.create() + big_file.reparses.add(meta_model) assert ReparseMeta.assert_all_files_done(meta_model) is False - meta_model.finished = True - meta_model.files_completed = 1 - meta_model.num_files_to_reparse = 1 + fm = ReparseFileMeta.objects.get(data_file_id=big_file.pk, reparse_meta_id=meta_model.pk) + fm.finished = True + fm.save() assert ReparseMeta.assert_all_files_done(meta_model) is True @pytest.mark.django_db() -def test_mm_increment_files_completed(big_file): +def test_mm_files_completed(big_file): """Test meta model increment files completed.""" - meta_model = ReparseMeta.objects.create(num_files_to_reparse=2, all=True) - big_file.reparse_meta_models.add(meta_model) + meta_model = ReparseMeta.objects.create(all=True) + big_file.reparses.add(meta_model) big_file.save() - ReparseMeta.increment_files_completed(big_file.reparse_meta_models) meta_model = ReparseMeta.get_latest() - assert meta_model.finished is False - assert meta_model.files_completed == 1 - assert meta_model.files_failed == 0 + assert meta_model.is_finished is False + assert meta_model.num_files == 1 + assert meta_model.num_files_completed == 0 + assert meta_model.num_files_failed == 0 + assert ReparseMeta.assert_all_files_done(meta_model) is False - ReparseMeta.increment_files_completed(big_file.reparse_meta_models) + fm = ReparseFileMeta.objects.get(data_file_id=big_file.pk, reparse_meta_id=meta_model.pk) + fm.finished = True + fm.success = True + fm.save() meta_model = ReparseMeta.get_latest() - assert meta_model.finished is True - assert meta_model.files_completed == 2 - assert meta_model.files_failed == 0 + assert meta_model.is_finished is True + assert meta_model.num_files == 1 + assert meta_model.num_files_completed == 1 + assert meta_model.num_files_failed == 0 - assert meta_model.success is True + assert meta_model.is_success is True assert ReparseMeta.assert_all_files_done(meta_model) is True @pytest.mark.django_db() -def test_mm_increment_files_failed(big_file): +def 
+def test_mm_files_failed(big_file):
     """Test meta model increment files failed."""
-    meta_model = ReparseMeta.objects.create(num_files_to_reparse=2, all=True)
-    big_file.reparse_meta_models.add(meta_model)
-    big_file.save()
-
-    ReparseMeta.increment_files_failed(big_file.reparse_meta_models)
-    meta_model = ReparseMeta.get_latest()
-    assert meta_model.finished is False
-    assert meta_model.files_completed == 0
-    assert meta_model.files_failed == 1
-
-    ReparseMeta.increment_files_failed(big_file.reparse_meta_models)
-    meta_model = ReparseMeta.get_latest()
-    assert meta_model.finished is True
-    assert meta_model.files_completed == 0
-    assert meta_model.files_failed == 2
-
-    assert meta_model.success is False
-
-    assert ReparseMeta.assert_all_files_done(meta_model) is True
-
-@pytest.mark.django_db()
-def test_mm_increment_files_failed_and_passed(big_file):
-    """Test meta model both increment failed and passed files."""
-    meta_model = ReparseMeta.objects.create(num_files_to_reparse=2, all=True)
-    big_file.reparse_meta_models.add(meta_model)
+    meta_model = ReparseMeta.objects.create(all=True)
+    big_file.reparses.add(meta_model)
     big_file.save()

-    ReparseMeta.increment_files_completed(big_file.reparse_meta_models)
     meta_model = ReparseMeta.get_latest()
-    assert meta_model.finished is False
-    assert meta_model.files_completed == 1
-    assert meta_model.files_failed == 0
+    assert meta_model.is_finished is False
+    assert meta_model.num_files_completed == 0
+    assert meta_model.num_files_failed == 0
+    assert ReparseMeta.assert_all_files_done(meta_model) is False

-    ReparseMeta.increment_files_failed(big_file.reparse_meta_models)
+    fm = ReparseFileMeta.objects.get(data_file_id=big_file.pk, reparse_meta_id=meta_model.pk)
+    fm.finished = True
+    fm.save()
     meta_model = ReparseMeta.get_latest()
-    assert meta_model.finished is True
-    assert meta_model.files_completed == 1
-    assert meta_model.files_failed == 1
+    assert meta_model.is_finished is True
+    assert meta_model.num_files_completed == 1
+    assert meta_model.num_files_failed == 1

-    assert meta_model.success is False
+    assert meta_model.is_success is False

     assert ReparseMeta.assert_all_files_done(meta_model) is True

 @pytest.mark.django_db()
 def test_mm_increment_records_created(big_file):
     """Test meta model increment records created."""
-    meta_model = ReparseMeta.objects.create(num_files_to_reparse=2, all=True)
-    big_file.reparse_meta_models.add(meta_model)
+    meta_model = ReparseMeta.objects.create(all=True)
+    big_file.reparses.add(meta_model)
     big_file.save()

-    ReparseMeta.increment_records_created(big_file.reparse_meta_models, 500)
     meta_model = ReparseMeta.get_latest()
-    assert meta_model.num_records_created == 500
+    assert meta_model.num_records_created == 0

-    ReparseMeta.increment_records_created(big_file.reparse_meta_models, 888)
+    fm = ReparseFileMeta.objects.get(data_file_id=big_file.pk, reparse_meta_id=meta_model.pk)
+    fm.finished = True
+    fm.success = True
+    fm.num_records_created = 1388
+    fm.save()
     meta_model = ReparseMeta.get_latest()
     assert meta_model.num_records_created == 1388

@@ -492,18 +483,37 @@
     assert ReparseMeta.get_latest() != meta1

 @pytest.mark.django_db()
-def test_mm_file_counts_match():
+def test_mm_file_counts_match(big_file):
     """Test meta model file counts match."""
-    meta_model = ReparseMeta.objects.create(num_files_to_reparse=2)
+    meta_model = ReparseMeta.objects.create()
+    big_file.reparses.add(meta_model)
+    big_file.save()
     assert ReparseMeta.file_counts_match(meta_model) is False

-    meta_model.files_completed = 2
+    fm = ReparseFileMeta.objects.get(data_file_id=big_file.pk, reparse_meta_id=meta_model.pk)
+    fm.finished = True
+    fm.save()
     assert ReparseMeta.file_counts_match(meta_model) is True

-    meta_model.files_completed = 0
-    meta_model.files_failed = 2
-    assert ReparseMeta.file_counts_match(meta_model) is True

+@pytest.mark.django_db()
+def test_reparse_finished_success_false_before_file_queue(big_file):
+    """Test is_finished and is_success are False if no files added."""
+    meta_model = ReparseMeta.objects.create()
+    assert meta_model.is_finished is False
+    assert meta_model.is_success is False

-    meta_model.files_completed = 1
-    meta_model.files_failed = 1
-    assert ReparseMeta.file_counts_match(meta_model) is True
+    big_file.reparses.add(meta_model)
+    big_file.save()
+    assert meta_model.is_finished is False
+    assert meta_model.is_success is False
+
+    fm = ReparseFileMeta.objects.get(data_file_id=big_file.pk, reparse_meta_id=meta_model.pk)
+    fm.finished = True
+    fm.save()
+    assert meta_model.is_finished is True
+    assert meta_model.is_success is False
+
+    fm.success = True
+    fm.save()
+    assert meta_model.is_finished is True
+    assert meta_model.is_success is True
diff --git a/tdrs-backend/tdpservice/users/test/test_permissions.py b/tdrs-backend/tdpservice/users/test/test_permissions.py
index ae53b3cda..f1b3847ad 100644
--- a/tdrs-backend/tdpservice/users/test/test_permissions.py
+++ b/tdrs-backend/tdpservice/users/test/test_permissions.py
@@ -159,6 +159,9 @@ def test_ofa_system_admin_permissions(ofa_system_admin):
         'search_indexes.add_reparsemeta',
         'search_indexes.view_reparsemeta',
         'search_indexes.change_reparsemeta',
+        'data_files.add_reparsefilemeta',
+        'data_files.view_reparsefilemeta',
+        'data_files.change_reparsefilemeta',
     }
     group_permissions = ofa_system_admin.get_group_permissions()
     assert group_permissions == expected_permissions