Skip to content

Commit

Permalink
wp
Browse files Browse the repository at this point in the history
  • Loading branch information
jtimpe committed Oct 7, 2024
1 parent ed99523 commit 6c136d2
Show file tree
Hide file tree
Showing 11 changed files with 220 additions and 219 deletions.
2 changes: 1 addition & 1 deletion tdrs-backend/tdpservice/data_files/admin/filters.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Filter classes for DataFiles admin page."""
from django.contrib import admin
from django.utils.translation import ugettext_lazy as _
from tdpservice.search_indexes.models.reparse_meta import ReparseMeta
from tdpservice.data_files.models import ReparseMeta
from tdpservice.core.filters import MostRecentVersionFilter

class DataFileSummaryPrgTypeFilter(admin.SimpleListFilter):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
# Generated by Django 3.2.15 on 2024-10-04 12:17

from django.db import migrations, models
from tdpservice.data_files.models import ReparseFileMeta


class Migration(migrations.Migration):

dependencies = [
('data_files', '0013_datafile_reparse_meta'),
('search_indexes', '0031_reparsefilemeta'),
('search_indexes', '0031_alter_tribal_tanf_t4_closure_reason'),
]

operations = [
migrations.AddField(
model_name='datafile',
name='reparses',
field=models.ManyToManyField(help_text='Reparse events this file has been associated with.', related_name='files', through='search_indexes.ReparseFileMeta', to='search_indexes.ReparseMeta'),
field=models.ManyToManyField(help_text='Reparse events this file has been associated with.', related_name='files', through=ReparseFileMeta, to='search_indexes.ReparseMeta'),
),
]
210 changes: 207 additions & 3 deletions tdrs-backend/tdpservice/data_files/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,220 @@
from django.conf import settings
from django.contrib.admin.models import ADDITION, ContentType, LogEntry
from django.core.files.base import File
from django.db import models
from django.db import models, transaction
from django.db.models import Max
from django.db.utils import DatabaseError

from tdpservice.backends import DataFilesS3Storage
from tdpservice.stts.models import STT
from tdpservice.users.models import User
from tdpservice.data_files.models import ReparseFileMeta
from tdpservice.search_indexes.util import count_all_records

logger = logging.getLogger(__name__)



# TODO: give reparse its own app
class ReparseMeta(models.Model):
    """
    Meta data model representing a single execution of `clean_and_reparse`.

    Because this model is intended to be queried in a distributed and parallel fashion, all queries should rely on
    database level locking to ensure race conditions aren't introduced. See `increment_files_completed` for an example.
    """

    class Meta:
        """Meta class for the model."""

        verbose_name = "Reparse Meta Model"

    created_at = models.DateTimeField(auto_now_add=True)
    timeout_at = models.DateTimeField(auto_now_add=False, null=True)

    # NOTE(review): the counter/status columns below are being superseded by the
    # computed properties further down (is_finished, num_files, ...). Remove the
    # columns once all callers use the properties.
    finished = models.BooleanField(default=False)  # superseded by `is_finished`
    success = models.BooleanField(default=False, help_text="All files completed parsing.")  # superseded by `is_success`

    num_files_to_reparse = models.PositiveIntegerField(default=0)  # superseded by `num_files`
    files_completed = models.PositiveIntegerField(default=0)  # superseded by `num_files_completed`
    files_failed = models.PositiveIntegerField(default=0)  # superseded by `num_files_failed`

    num_records_deleted = models.PositiveIntegerField(default=0)
    num_records_created = models.PositiveIntegerField(default=0)  # superseded by `num_records_cre`

    total_num_records_initial = models.PositiveBigIntegerField(default=0)
    total_num_records_post = models.PositiveBigIntegerField(default=0)

    db_backup_location = models.CharField(max_length=512)

    # Options used to select the files to reparse (from mgmt cmd only, remove if command deprecated)
    fiscal_quarter = models.CharField(max_length=2, null=True)
    fiscal_year = models.PositiveIntegerField(null=True)
    all = models.BooleanField(default=False)
    new_indices = models.BooleanField(default=False)
    delete_old_indices = models.BooleanField(default=False)

    @property
    def is_finished(self):
        """Return True if every associated file parse has finished (vacuously True when there are none)."""
        return all(r.finished for r in self.reparse_file_metas.all())

    @property
    def is_success(self):
        """Return True if every associated file parse succeeded (vacuously True when there are none)."""
        return all(r.success for r in self.reparse_file_metas.all())

    @property
    def finished_at(self):
        """Return the `finished_at` timestamp of the most recently finished file parse, or None."""
        last_parse = self.reparse_file_metas.order_by('-finished_at').first()
        return last_parse.finished_at if last_parse else None

    @property
    def num_files(self):
        """Return the number of files associated with this reparse event."""
        return self.reparse_file_metas.count()

    @property
    def num_files_completed(self):
        """Return the number of associated files that have finished parsing."""
        return self.reparse_file_metas.filter(finished=True).count()

    @property
    def num_files_failed(self):
        """Return the number of associated files that have not succeeded.

        NOTE(review): this counts files that are still in progress (finished=False,
        success=False) as failed — confirm whether the filter should also require
        finished=True.
        """
        return self.reparse_file_metas.filter(success=False).count()

    @property
    def num_records_cre(self):
        """Return the total number of records created across all associated file parses."""
        return sum(r.num_records_created for r in self.reparse_file_metas.all())

    # TODO: remove unused statics or change to utils funcs in own app and/or make new cleanup ticket for future

    @staticmethod
    def file_counts_match(meta_model):
        """
        Check whether the file counts match.

        This function assumes the meta_model has been passed in a distributed/thread safe way. If the database row
        containing this model has not been locked the caller will experience race issues.
        """
        # Use lazy logger formatting instead of debug print statements.
        logger.debug(
            "file_counts_match: num_files_to_reparse=%s, files_completed=%s, files_failed=%s",
            meta_model.num_files_to_reparse, meta_model.files_completed, meta_model.files_failed,
        )
        return (meta_model.files_completed == meta_model.num_files_to_reparse or
                meta_model.files_completed + meta_model.files_failed ==
                meta_model.num_files_to_reparse or
                meta_model.files_failed == meta_model.num_files_to_reparse)

    @staticmethod
    def assert_all_files_done(meta_model):
        """
        Check if all files have been parsed with or without exceptions.

        This function assumes the meta_model has been passed in a distributed/thread safe way. If the database row
        containing this model has not been locked the caller will experience race issues.
        """
        return meta_model.finished and ReparseMeta.file_counts_match(meta_model)

    @staticmethod
    def set_reparse_finished(meta_model):
        """
        Set status/completion fields to appropriate values.

        This function assumes the meta_model has been passed in a distributed/thread safe way. If the database row
        containing this model has not been locked the caller will experience race issues.
        """
        meta_model.finished = True
        meta_model.success = meta_model.files_completed == meta_model.num_files_to_reparse
        meta_model.total_num_records_post = count_all_records()
        meta_model.save()

    @staticmethod
    def increment_files_completed(reparse_meta_models):
        """
        Increment the count of files that have completed parsing for the datafile's current/latest reparse model.

        Because this function can be called in parallel we use `select_for_update` because multiple parse tasks can
        reference the same ReparseMeta object that is being queried below. `select_for_update` provides a DB lock on
        the object and forces other transactions on the object to wait until this one completes.
        """
        if reparse_meta_models.exists():
            with transaction.atomic():
                try:
                    meta_model = reparse_meta_models.select_for_update().latest("pk")
                    meta_model.files_completed += 1
                    if ReparseMeta.file_counts_match(meta_model):
                        ReparseMeta.set_reparse_finished(meta_model)
                    meta_model.save()
                except DatabaseError:
                    logger.exception("Encountered exception while trying to update the `files_completed` field on the "
                                     "latest ReparseMeta object.")

    @staticmethod
    def increment_files_failed(reparse_meta_models):
        """
        Increment the count of files that failed parsing for the datafile's current/latest reparse meta model.

        Because this function can be called in parallel we use `select_for_update` because multiple parse tasks can
        reference the same ReparseMeta object that is being queried below. `select_for_update` provides a DB lock on
        the object and forces other transactions on the object to wait until this one completes.
        """
        if reparse_meta_models.exists():
            with transaction.atomic():
                try:
                    meta_model = reparse_meta_models.select_for_update().latest("pk")
                    meta_model.files_failed += 1
                    if ReparseMeta.file_counts_match(meta_model):
                        ReparseMeta.set_reparse_finished(meta_model)
                    meta_model.save()
                except DatabaseError:
                    logger.exception("Encountered exception while trying to update the `files_failed` field on the "
                                     "latest ReparseMeta object.")

    @staticmethod
    def increment_records_created(reparse_meta_models, num_created):
        """
        Increment the count of records created for the datafile's current/latest reparse meta model.

        Because this function can be called in parallel we use `select_for_update` because multiple parse tasks can
        reference the same ReparseMeta object that is being queried below. `select_for_update` provides a DB lock on
        the object and forces other transactions on the object to wait until this one completes.
        """
        if reparse_meta_models.exists():
            with transaction.atomic():
                try:
                    meta_model = reparse_meta_models.select_for_update().latest("pk")
                    meta_model.num_records_created += num_created
                    meta_model.save()
                except DatabaseError:
                    # Was previously mis-labeled as the `files_failed` field.
                    logger.exception("Encountered exception while trying to update the `num_records_created` field on "
                                     "the latest ReparseMeta object.")

    @staticmethod
    def get_latest():
        """Get the ReparseMeta model with the greatest pk, or None if no rows exist."""
        return ReparseMeta.objects.order_by('-pk').first()


class ReparseFileMeta(models.Model):
    """Meta data model representing a single file parse within a reparse execution."""

    data_file = models.ForeignKey('data_files.DataFile', on_delete=models.CASCADE, related_name='reparse_file_metas')
    reparse_meta = models.ForeignKey('data_files.ReparseMeta', on_delete=models.CASCADE, related_name='reparse_file_metas')

    # Parse lifecycle flags for this file within the reparse run.
    finished = models.BooleanField(default=False)
    success = models.BooleanField(default=False)
    started_at = models.DateTimeField(auto_now_add=False, null=True)  # set at beginning of parse run
    finished_at = models.DateTimeField(auto_now_add=False, null=True)  # NOTE(review): presumably set when the parse ends — confirm

    # Per-file parse outcome counters.
    num_records_created = models.PositiveIntegerField(default=0)
    cat_4_errors_generated = models.PositiveIntegerField(default=0)


##


def get_file_shasum(file: Union[File, StringIO]) -> str:
"""Derive the SHA256 checksum of a file."""
_hash = sha256()
Expand Down Expand Up @@ -154,8 +358,8 @@ class Meta:
)

reparses = models.ManyToManyField(
"search_indexes.ReparseMeta",
through='search_indexes.ReparseFileMeta',
"data_files.ReparseMeta",
through=ReparseFileMeta,
help_text="Reparse events this file has been associated with.",
related_name="files"
)
Expand Down
2 changes: 1 addition & 1 deletion tdrs-backend/tdpservice/parsers/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from tdpservice.parsers.schema_defs.utils import get_section_reference, get_program_model
from tdpservice.parsers.case_consistency_validator import CaseConsistencyValidator
from tdpservice.parsers.util import log_parser_exception
from tdpservice.search_indexes.models.reparse_meta import ReparseMeta, ReparseFileMeta
from tdpservice.data_files.models import ReparseMeta, ReparseFileMeta

logger = logging.getLogger(__name__)

Expand Down
2 changes: 1 addition & 1 deletion tdrs-backend/tdpservice/scheduling/parser_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from tdpservice.parsers.aggregates import case_aggregates_by_month, total_errors_by_month
from tdpservice.parsers.util import log_parser_exception, make_generate_parser_error
from tdpservice.email.helpers.data_file import send_data_submitted_email
from tdpservice.search_indexes.models.reparse_meta import ReparseMeta, ReparseFileMeta
from tdpservice.data_files.models import ReparseMeta, ReparseFileMeta


logger = logging.getLogger(__name__)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from tdpservice.parsers.models import DataFileSummary, ParserError
from tdpservice.scheduling import parser_task
from tdpservice.search_indexes.util import DOCUMENTS, count_all_records
from tdpservice.search_indexes.models.reparse_meta import ReparseMeta
from tdpservice.data_files.models import ReparseMeta
from tdpservice.core.utils import log
from django.contrib.admin.models import ADDITION
from tdpservice.users.models import User
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from tdpservice.core.utils import log
from django.contrib.admin.models import ADDITION
from tdpservice.users.models import User
from tdpservice.search_indexes.models.reparse_meta import ReparseMeta
from tdpservice.data_files.models import ReparseMeta


class Command(search_index.Command):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class Migration(migrations.Migration):

dependencies = [
('data_files', '0013_datafile_reparse_meta'),
('search_indexes', '0030_reparse_meta_model'),
('search_indexes', '0031_alter_tribal_tanf_t4_closure_reason'),
]

operations = [
Expand All @@ -22,8 +22,8 @@ class Migration(migrations.Migration):
('finished_at', models.DateTimeField(null=True)),
('num_records_created', models.PositiveIntegerField(default=0)),
('cat_4_errors_generated', models.PositiveIntegerField(default=0)),
('data_file', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='reparse_file_metas', to='data_files.datafile')),
('reparse_meta', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='reparse_file_metas', to='search_indexes.reparsemeta')),
('data_file', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='reparse_file_metas', to='data_files.DataFile')),
('reparse_meta', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='reparse_file_metas', to='search_indexes.ReparseMeta')),
],
),
]
3 changes: 1 addition & 2 deletions tdrs-backend/tdpservice/search_indexes/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from . import tanf, tribal, ssp, reparse_meta
from . import tanf, tribal, ssp

tanf = tanf
tribal = tribal
ssp = ssp
reparse_meta = reparse_meta
Loading

0 comments on commit 6c136d2

Please sign in to comment.