3157 - Reparse Meta through model #3220

Merged
15 commits, merged Oct 17, 2024
2 changes: 1 addition & 1 deletion tdrs-backend/tdpservice/data_files/admin/admin.py
@@ -14,7 +14,7 @@
 class DataFileInline(admin.TabularInline):
     """Inline model for many to many relationship."""

-    model = DataFile.reparse_meta_models.through
+    model = DataFile.reparses.through
     can_delete = False
     ordering = ["-pk"]

2 changes: 1 addition & 1 deletion tdrs-backend/tdpservice/data_files/admin/filters.py
@@ -35,7 +35,7 @@ def queryset(self, request, queryset):
         if self.value() is not None and queryset.exists():
             latest_meta = ReparseMeta.get_latest()
             if latest_meta is not None:
-                queryset = queryset.filter(reparse_meta_models=latest_meta)
+                queryset = queryset.filter(reparses=latest_meta)
         return queryset

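Note: the filter above now traverses the renamed `reparses` relation. A minimal standalone sketch of the same query, assuming a Django shell with this project's settings loaded:

```python
# Sketch: select the DataFiles tied to the most recent reparse event.
from tdpservice.data_files.models import DataFile
from tdpservice.search_indexes.models.reparse_meta import ReparseMeta

latest_meta = ReparseMeta.get_latest()
if latest_meta is not None:
    # The join now goes through the explicit ReparseFileMeta table.
    files = DataFile.objects.filter(reparses=latest_meta)
```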
29 changes: 29 additions & 0 deletions tdrs-backend/tdpservice/data_files/migrations/0014_reparsefilemeta.py
@@ -0,0 +1,29 @@
# Generated by Django 3.2.15 on 2024-10-08 12:18

from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):

    dependencies = [
        ('search_indexes', '0031_alter_tribal_tanf_t4_closure_reason'),
        ('data_files', '0013_datafile_reparse_meta'),
    ]

    operations = [
        migrations.CreateModel(
            name='ReparseFileMeta',
            fields=[
                ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
                ('finished', models.BooleanField(default=False)),
                ('success', models.BooleanField(default=False)),
                ('started_at', models.DateTimeField(null=True)),
                ('finished_at', models.DateTimeField(null=True)),
                ('num_records_created', models.PositiveIntegerField(default=0)),
                ('cat_4_errors_generated', models.PositiveIntegerField(default=0)),
                ('data_file', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='reparse_file_metas', to='data_files.datafile')),
                ('reparse_meta', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='reparse_file_metas', to='search_indexes.reparsemeta')),
            ],
        ),
    ]
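With `ReparseFileMeta` in place, per-file parse state becomes directly queryable rather than living only in aggregate counters on `ReparseMeta`. A brief sketch of the kind of query this enables, assuming a configured Django environment for this project:

```python
# Sketch: inspect per-file parse state on the new through model.
from tdpservice.data_files.models import ReparseFileMeta

# Files whose reparse has not completed yet, newest first.
unfinished = ReparseFileMeta.objects.filter(finished=False).order_by('-started_at')
for meta in unfinished:
    print(meta.data_file_id, meta.reparse_meta_id, meta.started_at)
```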
20 changes: 20 additions & 0 deletions tdrs-backend/tdpservice/data_files/migrations/0015_datafile_reparses.py
@@ -0,0 +1,20 @@
# Generated by Django 3.2.15 on 2024-10-04 12:17

from django.db import migrations, models


class Migration(migrations.Migration):

    dependencies = [
        ('data_files', '0014_reparsefilemeta'),
        ('search_indexes', '0031_alter_tribal_tanf_t4_closure_reason'),
    ]

    operations = [
        migrations.AddField(
            model_name='datafile',
            name='reparses',
            field=models.ManyToManyField(help_text='Reparse events this file has been associated with.', related_name='files', through='data_files.ReparseFileMeta', to='search_indexes.ReparseMeta'),
        ),
    ]
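Because the many-to-many field now declares an explicit `through` model, associations can no longer be created with a bare `add(rpm)` call alone; Django (2.2+) accepts `through_defaults` on `add()` to populate the extra columns, which is the pattern the updated tests below use. A minimal sketch, with `datafile` and `reparse_meta` as assumed existing instances:

```python
# Sketch: linking a DataFile to a ReparseMeta through ReparseFileMeta.
datafile.reparses.add(reparse_meta, through_defaults={'finished': False, 'success': False})

# Equivalent explicit creation of the through row:
from tdpservice.data_files.models import ReparseFileMeta
ReparseFileMeta.objects.create(data_file=datafile, reparse_meta=reparse_meta)
```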
38 changes: 38 additions & 0 deletions (new data migration in tdrs-backend/tdpservice/data_files/migrations/)
@@ -0,0 +1,38 @@
# Generated by Django 3.2.15 on 2024-10-04 12:17

from django.db import migrations


def switch_reparse_meta_through_model(apps, schema_editor):
    """Copy rows from the auto-generated m2m table into ReparseFileMeta."""
    DataFile = apps.get_model("data_files", "DataFile")
    ReparseFileMeta = apps.get_model("data_files", "ReparseFileMeta")
    OldThru = DataFile.reparse_meta_models.through

    q = OldThru.objects.all()

    print(f'switching {q.count()} through models')

    for m in q:
        ReparseFileMeta.objects.create(
            data_file_id=m.datafile.pk,
            reparse_meta_id=m.reparsemeta.pk,
        )
        m.delete()


class Migration(migrations.Migration):

    dependencies = [
        ('data_files', '0015_datafile_reparses'),
    ]

    operations = [
        migrations.RunPython(
            switch_reparse_meta_through_model,
        ),
        migrations.RemoveField(
            model_name='datafile',
            name='reparse_meta_models',
        ),
    ]
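The data migration copies rows one at a time, which is reasonable at the row counts it prints; for larger tables a set-based variant would avoid a pair of queries per row. A hedged alternative sketch of the same function:

```python
def switch_reparse_meta_through_model(apps, schema_editor):
    """Set-based variant: one bulk INSERT and one DELETE instead of per-row queries."""
    DataFile = apps.get_model("data_files", "DataFile")
    ReparseFileMeta = apps.get_model("data_files", "ReparseFileMeta")
    OldThru = DataFile.reparse_meta_models.through

    ReparseFileMeta.objects.bulk_create([
        ReparseFileMeta(data_file_id=m.datafile_id, reparse_meta_id=m.reparsemeta_id)
        for m in OldThru.objects.all()
    ])
    OldThru.objects.all().delete()
```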
28 changes: 24 additions & 4 deletions tdrs-backend/tdpservice/data_files/models.py
@@ -79,6 +79,24 @@ class Meta:
     # separately
     extension = models.CharField(max_length=8, default="txt")

+
+class ReparseFileMeta(models.Model):
+    """Meta data model representing a single file parse within a reparse execution."""
+
+    data_file = models.ForeignKey('data_files.DataFile', on_delete=models.CASCADE, related_name='reparse_file_metas')
+    reparse_meta = models.ForeignKey(
+        'search_indexes.ReparseMeta',
+        on_delete=models.CASCADE,
+        related_name='reparse_file_metas'
+    )
+
+    finished = models.BooleanField(default=False)
+    success = models.BooleanField(default=False)
+    started_at = models.DateTimeField(auto_now_add=False, null=True)
+    finished_at = models.DateTimeField(auto_now_add=False, null=True)
+
+    num_records_created = models.PositiveIntegerField(default=0)
+    cat_4_errors_generated = models.PositiveIntegerField(default=0)
+

 class DataFile(FileRecord):
     """Represents a version of a data file."""
@@ -153,10 +171,12 @@ class Meta:
         null=True
     )

-    reparse_meta_models = models.ManyToManyField("search_indexes.ReparseMeta",
-                                                 help_text="Reparse events this file has been associated with.",
-                                                 related_name="datafiles"
-                                                 )
+    reparses = models.ManyToManyField(
+        "search_indexes.ReparseMeta",
+        through="data_files.ReparseFileMeta",
+        help_text="Reparse events this file has been associated with.",
+        related_name="files"
+    )

     @property
     def prog_type(self):
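With the model change above, the relationship is reachable from either end, and per-file state sits on the through rows. A short sketch of the traversal paths, with `datafile` and `reparse_meta` as assumed instances:

```python
# Sketch: traversal paths enabled by the explicit through model.
datafile.reparses.all()                 # ReparseMeta events this file was part of
reparse_meta.files.all()                # DataFiles included in a reparse event
datafile.reparse_file_metas.all()       # per-file rows: finished, success, timings
reparse_meta.reparse_file_metas.all()   # the same rows, from the event side
```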
8 changes: 4 additions & 4 deletions tdrs-backend/tdpservice/data_files/tasks.py
@@ -13,7 +13,7 @@

 def get_stuck_files():
     """Return a queryset containing files in a 'stuck' state."""
-    stuck_files = DataFile.objects.annotate(reparse_count=Count('reparse_meta_models')).filter(
+    stuck_files = DataFile.objects.annotate(reparse_count=Count('reparses')).filter(
         # non-reparse submissions over an hour old
         Q(
             reparse_count=0,
@@ -22,9 +22,9 @@ def get_stuck_files():
         # reparse submissions past the timeout, where the reparse did not complete
         Q(
             reparse_count__gt=0,
-            reparse_meta_models__timeout_at__lte=timezone.now(),
-            reparse_meta_models__finished=False,
-            reparse_meta_models__success=False
+            reparses__timeout_at__lte=timezone.now(),
+            reparse_file_metas__finished=False,
+            reparse_file_metas__success=False
         )
     ).filter(
         # where there is NO summary or the summary is in PENDING status
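A reading of the updated reparse branch: `reparses__timeout_at` joins out to the `ReparseMeta` event, while `reparse_file_metas__finished`/`__success` read the per-file through rows, so a file only counts as stuck when its own parse never finished inside a timed-out reparse. A sketch of the clause in isolation:

```python
# Sketch: the reparse arm of get_stuck_files(), isolated for readability.
from django.db.models import Q
from django.utils import timezone

stuck_reparse_clause = Q(
    reparse_count__gt=0,                        # file has been reparsed at least once
    reparses__timeout_at__lte=timezone.now(),   # the reparse event has timed out
    reparse_file_metas__finished=False,         # this file's parse never finished...
    reparse_file_metas__success=False,          # ...and never succeeded
)
```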
50 changes: 24 additions & 26 deletions tdrs-backend/tdpservice/data_files/test/test_stuck_files.py
@@ -31,12 +31,10 @@ def make_summary(datafile, status):
     )


-def make_reparse_meta(finished, success):
+def make_reparse_meta():
     """Create a test reparse meta model."""
     return ReparseMetaFactory.create(
-        timeout_at=_time_ago(hours=1),
-        finished=finished,
-        success=success
+        timeout_at=_time_ago(hours=1)
     )


@@ -54,8 +52,8 @@ def test_find_pending_submissions__none_stuck(stt_user, stt):
     df2.created_at = _time_ago(hours=1)
     df2.save()
     make_summary(df2, DataFileSummary.Status.ACCEPTED)
-    rpm = make_reparse_meta(True, True)
-    df2.reparse_meta_models.add(rpm)
+    rpm = make_reparse_meta()
+    df2.reparses.add(rpm, through_defaults={'finished': True, 'success': True})

     # a pending standard submission, less than an hour old
     df3 = make_datafile(stt_user, stt, 3)
@@ -81,8 +79,8 @@ def test_find_pending_submissions__non_reparse_stuck(stt_user, stt):
     df2.created_at = _time_ago(hours=1)
     df2.save()
     make_summary(df2, DataFileSummary.Status.ACCEPTED)
-    rpm = make_reparse_meta(True, True)
-    df2.reparse_meta_models.add(rpm)
+    rpm = make_reparse_meta()
+    df2.reparses.add(rpm, through_defaults={'finished': True, 'success': True})

     stuck_files = get_stuck_files()
     assert stuck_files.count() == 1
@@ -102,8 +100,8 @@ def test_find_pending_submissions__non_reparse_stuck__no_dfs(stt_user, stt):
     df2.created_at = _time_ago(hours=1)
     df2.save()
     make_summary(df2, DataFileSummary.Status.ACCEPTED)
-    rpm = make_reparse_meta(True, True)
-    df2.reparse_meta_models.add(rpm)
+    rpm = make_reparse_meta()
+    df2.reparses.add(rpm, through_defaults={'finished': True, 'success': True})

     stuck_files = get_stuck_files()
     assert stuck_files.count() == 1
@@ -124,8 +122,8 @@ def test_find_pending_submissions__reparse_stuck(stt_user, stt):
     df2.created_at = _time_ago(hours=1)
     df2.save()
     make_summary(df2, DataFileSummary.Status.PENDING)
-    rpm = make_reparse_meta(False, False)
-    df2.reparse_meta_models.add(rpm)
+    rpm = make_reparse_meta()
+    df2.reparses.add(rpm, through_defaults={'finished': False, 'success': False})

     stuck_files = get_stuck_files()
     assert stuck_files.count() == 1
@@ -145,8 +143,8 @@ def test_find_pending_submissions__reparse_stuck__no_dfs(stt_user, stt):
     df2 = make_datafile(stt_user, stt, 2)
     df2.created_at = _time_ago(hours=1)
     df2.save()
-    rpm = make_reparse_meta(False, False)
-    df2.reparse_meta_models.add(rpm)
+    rpm = make_reparse_meta()
+    df2.reparses.add(rpm, through_defaults={'finished': False, 'success': False})

     stuck_files = get_stuck_files()
     assert stuck_files.count() == 1
@@ -167,8 +165,8 @@ def test_find_pending_submissions__reparse_and_non_reparse_stuck(stt_user, stt):
     df2.created_at = _time_ago(hours=1)
     df2.save()
     make_summary(df2, DataFileSummary.Status.PENDING)
-    rpm = make_reparse_meta(False, False)
-    df2.reparse_meta_models.add(rpm)
+    rpm = make_reparse_meta()
+    df2.reparses.add(rpm, through_defaults={'finished': False, 'success': False})

     stuck_files = get_stuck_files()
     assert stuck_files.count() == 2
@@ -188,8 +186,8 @@ def test_find_pending_submissions__reparse_and_non_reparse_stuck_no_dfs(stt_user
     df2 = make_datafile(stt_user, stt, 2)
     df2.created_at = _time_ago(hours=1)
     df2.save()
-    rpm = make_reparse_meta(False, False)
-    df2.reparse_meta_models.add(rpm)
+    rpm = make_reparse_meta()
+    df2.reparses.add(rpm, through_defaults={'finished': False, 'success': False})

     stuck_files = get_stuck_files()
     assert stuck_files.count() == 2
@@ -207,8 +205,8 @@ def test_find_pending_submissions__old_reparse_stuck__new_not_stuck(stt_user, st
     dfs1 = make_summary(df1, DataFileSummary.Status.PENDING)

     # reparse fails the first time
-    rpm1 = make_reparse_meta(False, False)
-    df1.reparse_meta_models.add(rpm1)
+    rpm1 = make_reparse_meta()
+    df1.reparses.add(rpm1, through_defaults={'finished': False, 'success': False})

     stuck_files = get_stuck_files()
     assert stuck_files.count() == 1
@@ -217,8 +215,8 @@ def test_find_pending_submissions__old_reparse_stuck__new_not_stuck(stt_user, st
     dfs1.delete()  # reparse deletes the original dfs and creates the new one
     make_summary(df1, DataFileSummary.Status.ACCEPTED)

-    rpm2 = make_reparse_meta(True, True)
-    df1.reparse_meta_models.add(rpm2)
+    rpm2 = make_reparse_meta()
+    df1.reparses.add(rpm2, through_defaults={'finished': True, 'success': True})

     stuck_files = get_stuck_files()
     assert stuck_files.count() == 0
@@ -234,8 +232,8 @@ def test_find_pending_submissions__new_reparse_stuck__old_not_stuck(stt_user, st
     dfs1 = make_summary(df1, DataFileSummary.Status.REJECTED)

     # reparse succeeds
-    rpm1 = make_reparse_meta(True, True)
-    df1.reparse_meta_models.add(rpm1)
+    rpm1 = make_reparse_meta()
+    df1.reparses.add(rpm1, through_defaults={'finished': True, 'success': True})

     # reparse again, fails this time
     dfs1.delete()  # reparse deletes the original dfs and creates the new one
@@ -244,8 +242,8 @@ def test_find_pending_submissions__new_reparse_stuck__old_not_stuck(stt_user, st
         status=DataFileSummary.Status.PENDING,
     )

-    rpm2 = make_reparse_meta(False, False)
-    df1.reparse_meta_models.add(rpm2)
+    rpm2 = make_reparse_meta()
+    df1.reparses.add(rpm2, through_defaults={'finished': False, 'success': False})

     stuck_files = get_stuck_files()
     assert stuck_files.count() == 1
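The fixtures now express all reparse state via `through_defaults`. A hypothetical companion assertion (not part of this PR) showing that the through row records that state, assuming the same pytest-django fixtures and module-level helpers as the tests above:

```python
import pytest

@pytest.mark.django_db
def test_reparse_file_meta_state(stt_user, stt):
    """Hypothetical sketch: verify the through row captured the reparse state."""
    df = make_datafile(stt_user, stt, 1)
    rpm = make_reparse_meta()
    df.reparses.add(rpm, through_defaults={'finished': True, 'success': True})

    file_meta = df.reparse_file_metas.get(reparse_meta=rpm)
    assert file_meta.finished is True
    assert file_meta.success is True
```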
15 changes: 0 additions & 15 deletions tdrs-backend/tdpservice/parsers/parse.py
@@ -13,7 +13,6 @@
 from tdpservice.parsers.schema_defs.utils import get_section_reference, get_program_model
 from tdpservice.parsers.case_consistency_validator import CaseConsistencyValidator
 from tdpservice.parsers.util import log_parser_exception
-from tdpservice.search_indexes.models.reparse_meta import ReparseMeta

 logger = logging.getLogger(__name__)

@@ -34,7 +33,6 @@ def parse_datafile(datafile, dfs):
         logger.info(f"Preparser Error: {len(header_errors)} header errors encountered.")
         errors['header'] = header_errors
         bulk_create_errors({1: header_errors}, 1, flush=True)
-        update_meta_model(datafile, dfs)
         return errors
     elif header_is_valid and len(header_errors) > 0:
         logger.info(f"Preparser Warning: {len(header_errors)} header warnings encountered.")
@@ -75,7 +73,6 @@ def parse_datafile(datafile, dfs):
                              f"({header['program_type']}) and FIPS Code ({field_values['state_fips']}).",)
         errors['header'] = [tribe_error]
         bulk_create_errors({1: [tribe_error]}, 1, flush=True)
-        update_meta_model(datafile, dfs)
         return errors

     # Ensure file section matches upload section
@@ -90,7 +87,6 @@ def parse_datafile(datafile, dfs):
         errors['document'] = [section_error]
         unsaved_parser_errors = {1: [section_error]}
         bulk_create_errors(unsaved_parser_errors, 1, flush=True)
-        update_meta_model(datafile, dfs)
         return errors

     rpt_month_year_is_valid, rpt_month_year_error = category1.validate_header_rpt_month_year(
@@ -103,7 +99,6 @@ def parse_datafile(datafile, dfs):
         errors['document'] = [rpt_month_year_error]
         unsaved_parser_errors = {1: [rpt_month_year_error]}
         bulk_create_errors(unsaved_parser_errors, 1, flush=True)
-        update_meta_model(datafile, dfs)
         return errors

     line_errors = parse_datafile_lines(datafile, dfs, program_type, section, is_encrypted, case_consistency_validator)
@@ -112,11 +107,6 @@

     return errors

-def update_meta_model(datafile, dfs):
-    """Update appropriate meta models."""
-    ReparseMeta.increment_records_created(datafile.reparse_meta_models, dfs.total_number_of_records_created)
-    ReparseMeta.increment_files_completed(datafile.reparse_meta_models)
-
 def bulk_create_records(unsaved_records, line_number, header_count, datafile, dfs, flush=False):
     """Bulk create passed in records."""
     batch_size = settings.BULK_CREATE_BATCH_SIZE
@@ -385,7 +375,6 @@ def parse_datafile_lines(datafile, dfs, program_type, section, is_encrypted, cas
             rollback_records(unsaved_records.get_bulk_create_struct(), datafile)
             rollback_parser_errors(datafile)
             bulk_create_errors(preparse_error, num_errors, flush=True)
-            update_meta_model(datafile, dfs)
             return errors

         if prev_sum != header_count + trailer_count:
@@ -448,7 +437,6 @@ def parse_datafile_lines(datafile, dfs, program_type, section, is_encrypted, cas
            rollback_parser_errors(datafile)
            preparse_error = {line_number: [err_obj]}
            bulk_create_errors(preparse_error, num_errors, flush=True)
-           update_meta_model(datafile, dfs)
            return errors

        should_remove = validate_case_consistency(case_consistency_validator)
@@ -469,7 +457,6 @@ def parse_datafile_lines(datafile, dfs, program_type, section, is_encrypted, cas
         logger.error(f"Not all parsed records created for file: {datafile.id}!")
         rollback_records(unsaved_records.get_bulk_create_struct(), datafile)
         bulk_create_errors(unsaved_parser_errors, num_errors, flush=True)
-        update_meta_model(datafile, dfs)
         return errors

     # Add any generated cat4 errors to our error data structure & clear our caches errors list
@@ -486,8 +473,6 @@ def parse_datafile_lines(datafile, dfs, program_type, section, is_encrypted, cas
                 f"validated {case_consistency_validator.total_cases_validated} of them.")
     dfs.save()

-    update_meta_model(datafile, dfs)
-
     return errors

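With `update_meta_model` gone, parse.py no longer touches reparse bookkeeping on its early-return paths. This excerpt does not show the replacement call site, but given the fields added to `ReparseFileMeta`, a plausible (assumed, not shown in this diff) pattern is for the reparse flow to stamp the through row around each file's parse:

```python
# Hypothetical sketch -- the actual call site is not part of this diff.
from django.utils import timezone
from tdpservice.data_files.models import ReparseFileMeta

def finish_file_meta(data_file, reparse_meta, dfs):
    """Assumed helper: record per-file results on the through row."""
    file_meta = ReparseFileMeta.objects.get(data_file=data_file, reparse_meta=reparse_meta)
    file_meta.finished = True
    file_meta.success = dfs.status != dfs.Status.REJECTED  # assumption about the success criterion
    file_meta.finished_at = timezone.now()
    file_meta.num_records_created = dfs.total_number_of_records_created
    file_meta.save()
```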