Merge branch 'develop' into 2693-cat2-messaging-cleanup

jtimpe committed Jun 27, 2024
2 parents 64c3b9f + 35adb87 commit b95fb9f
Showing 39 changed files with 2,862 additions and 37,478 deletions.
378 changes: 378 additions & 0 deletions docs/Technical-Documentation/diagrams/parsing.drawio

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions docs/Technical-Documentation/parsing-flow.md
@@ -0,0 +1,11 @@
# High Level Parsing Flow

Parsing begins after a user submits one or more datafiles via the frontend. The submission generates a new Celery task
(or tasks) which is enqueued to Redis. As work becomes available, the Celery workers dequeue a task from Redis and
begin working on it. The parsing task retrieves the DataFile Django model and begins iterating over each line in the
file. For each line in the file, this task: parses the line into a new record, performs Category 1-3 validation on the
record, performs exact and partial duplicate detection, performs Category 4 validation, and stores the record in a
cache to be bulk created/serialized to the database and Elasticsearch. The image below provides a high-level flow of
the aforementioned steps, and a short code sketch follows it.

![Parsing Flow](./diagrams/parsing.png)
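
A minimal sketch of this flow, assuming hypothetical helper names (`parse_line`, `run_cat1_to_cat3_validation`,
`detect_duplicates`, `run_cat4_validation`, `bulk_create_records`); the real task lives in the tdpservice codebase:

```python
# Illustrative only: every helper below is a stand-in, not the project's actual API.
from celery import shared_task


@shared_task
def parse_datafile_task(datafile_id):
    """Hypothetical entry point: parse a submitted datafile line by line."""
    datafile = DataFile.objects.get(id=datafile_id)   # assumed Django model lookup
    record_cache = []
    for line_number, line in enumerate(datafile.file, start=1):
        record = parse_line(line)                     # parse the line into a new record
        run_cat1_to_cat3_validation(record)           # Category 1-3 validation
        detect_duplicates(record, line, line_number)  # exact/partial duplicate detection
        run_cat4_validation(record)                   # Category 4 (case consistency) validation
        record_cache.append(record)                   # cache for bulk create
    bulk_create_records(record_cache)                 # bulk write to the DB and Elasticsearch
```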
18 changes: 11 additions & 7 deletions tdrs-backend/tdpservice/parsers/aggregates.py
@@ -1,9 +1,10 @@
"""Aggregate methods for the parsers."""
from .row_schema import SchemaManager
from .models import ParserError
from .models import ParserError, ParserErrorCategoryChoices
from .util import month_to_int, \
transform_to_months, fiscal_to_calendar, get_prog_from_section
from .schema_defs.utils import get_program_models, get_text_from_df
from django.db.models import Q as Query


def case_aggregates_by_month(df, dfs_status):
@@ -39,22 +40,25 @@ def case_aggregates_by_month(df, dfs_status):
             if isinstance(schema_model, SchemaManager):
                 schema_model = schema_model.schemas[0]
 
-            curr_case_numbers = set(schema_model.document.Django.model.objects.filter(datafile=df)
-                                    .filter(RPT_MONTH_YEAR=rpt_month_year)
+            curr_case_numbers = set(schema_model.document.Django.model.objects.filter(datafile=df,
+                                                                                      RPT_MONTH_YEAR=rpt_month_year)
                                     .distinct("CASE_NUMBER").values_list("CASE_NUMBER", flat=True))
             case_numbers = case_numbers.union(curr_case_numbers)
 
         total += len(case_numbers)
-        cases_with_errors += ParserError.objects.filter(file=df).filter(
-            case_number__in=case_numbers).distinct('case_number').count()
+        cases_with_errors += ParserError.objects.filter(file=df, case_number__in=case_numbers)\
+            .distinct('case_number').count()
         accepted = total - cases_with_errors
 
         aggregate_data['months'].append({"month": month,
                                          "accepted_without_errors": accepted,
                                          "accepted_with_errors": cases_with_errors})
 
-    aggregate_data['rejected'] = ParserError.objects.filter(file=df).filter(case_number=None).distinct("row_number")\
-        .exclude(row_number=0).count()
+    error_type_query = Query(error_type=ParserErrorCategoryChoices.PRE_CHECK) | \
+        Query(error_type=ParserErrorCategoryChoices.CASE_CONSISTENCY)
+
+    aggregate_data['rejected'] = ParserError.objects.filter(error_type_query, file=df)\
+        .distinct("row_number").exclude(row_number=0).count()
 
     return aggregate_data

409 changes: 201 additions & 208 deletions tdrs-backend/tdpservice/parsers/case_consistency_validator.py

Large diffs are not rendered by default.

226 changes: 226 additions & 0 deletions tdrs-backend/tdpservice/parsers/duplicate_manager.py
@@ -0,0 +1,226 @@
"""Class definition for record duplicate class and helper classes."""
from django.conf import settings
from enum import IntEnum
from .models import ParserErrorCategoryChoices

class ErrorLevel(IntEnum):
"""Error level enumerations for precedence."""

DUPLICATE = 0
PARTIAL_DUPLICATE = 1
NONE = 2 # This should always be the last level in the list

class ErrorPrecedence:
    """Data structure to manage error precedence."""

    def __init__(self):
        self.curr_max_precedence = ErrorLevel.NONE

    def has_precedence(self, error_level):
        """Return tuple of bools: (has_precedence, is_new_max_precedence)."""
        if settings.IGNORE_DUPLICATE_ERROR_PRECEDENCE:
            return (True, False)
        if self.curr_max_precedence == ErrorLevel.NONE:
            self.curr_max_precedence = error_level
            return (True, True)
        elif self.curr_max_precedence > error_level:
            self.curr_max_precedence = error_level
            return (True, True)
        elif self.curr_max_precedence == error_level:
            return (True, False)
        else:
            return (False, False)
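
# Illustration (not part of this module): with settings.IGNORE_DUPLICATE_ERROR_PRECEDENCE
# set to False, successive calls behave as follows. Lower enum values outrank higher ones.
#
#     precedence = ErrorPrecedence()
#     precedence.has_precedence(ErrorLevel.PARTIAL_DUPLICATE)  # (True, True): first error sets the max
#     precedence.has_precedence(ErrorLevel.PARTIAL_DUPLICATE)  # (True, False): ties the current max
#     precedence.has_precedence(ErrorLevel.DUPLICATE)          # (True, True): DUPLICATE (0) outranks PARTIAL_DUPLICATE (1)
#     precedence.has_precedence(ErrorLevel.PARTIAL_DUPLICATE)  # (False, False): below the current max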


class CaseDuplicateDetector:
    """Container class.

    Container class to manage records of the same case, cases that should be removed because of category 4 errors,
    and to perform exact and partial duplicate detection of the records (a category 4 error type).
    """

    def __init__(self, my_hash, manager_error_dict, generate_error):
        self.my_hash = my_hash
        ############################################################################################################
        # WARNING
        self.manager_error_dict = manager_error_dict
        # Do not change/re-assign this dictionary unless you know exactly what you're doing! This object is owned
        # by the DuplicateManager object. The CaseDuplicateDetector has a reference to this object as a performance
        # optimization which lets the DuplicateManager avoid having to iterate over all CaseDuplicateDetectors to
        # get all of the duplicate errors.
        ############################################################################################################
        self.generate_error = generate_error
        self.record_ids = dict()
        self.record_hashes = dict()
        self.partial_hashes = dict()
        self.error_precedence = ErrorPrecedence()
        self.num_errors = 0
        self.should_remove_from_db = False
        self.current_line_number = None

    def set_should_remove_from_db(self, should_remove):
        """Set should remove from DB."""
        self.should_remove_from_db = should_remove

    def has_errors(self):
        """Return case duplicate error state."""
        return self.num_errors > 0

    def get_num_errors(self):
        """Return the number of errors."""
        return self.num_errors

    def get_records_for_post_parse_deletion(self):
        """Return record ids if case has duplicate errors."""
        if self.should_remove_from_db:
            return self.record_ids
        return dict()

    def __generate_error(self, err_msg, record, schema, line_number, has_precedence, is_new_max_precedence):
        """Add an error to the manager's error dictionary.

        @param err_msg: string representation of the error message
        @param record: a Django model representing a datafile record
        @param schema: the schema from which the record was created
        @param line_number: the line number the record was generated from in the datafile
        @param has_precedence: boolean indicating if this incoming error has equivalent precedence to current errors
        @param is_new_max_precedence: boolean indicating if this error has the new max precedence
        """
        if has_precedence:
            error = self.generate_error(
                error_category=ParserErrorCategoryChoices.CASE_CONSISTENCY,
                line_number=line_number,
                schema=schema,
                record=record,
                field=None,
                error_message=err_msg,
            )
            if is_new_max_precedence:
                self.manager_error_dict[self.my_hash] = [error]
            else:
                self.manager_error_dict.setdefault(self.my_hash, []).append(error)
            self.num_errors = len(self.manager_error_dict[self.my_hash])

    def __get_partial_dup_error_msg(self, schema, record_type, curr_line_number, existing_line_number):
        """Generate partial duplicate error message with friendly names."""
        field_names = schema.get_partial_hash_members_func()
        err_msg = (f"Partial duplicate record detected with record type "
                   f"{record_type} at line {curr_line_number}. Record is a partial duplicate of the "
                   f"record at line number {existing_line_number}. Duplicated fields causing error: ")
        for i, name in enumerate(field_names):
            if i == len(field_names) - 1 and len(field_names) != 1:
                err_msg += f"and {schema.get_field_by_name(name).friendly_name}."
            elif len(field_names) == 1:
                err_msg += f"{schema.get_field_by_name(name).friendly_name}."
            else:
                err_msg += f"{schema.get_field_by_name(name).friendly_name}, "
        return err_msg
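
    # Example of the message this method builds for three overlapping fields (record type,
    # line numbers, and friendly field names here are hypothetical):
    #   "Partial duplicate record detected with record type T2 at line 7. Record is a
    #   partial duplicate of the record at line number 4. Duplicated fields causing
    #   error: Field A, Field B, and Field C."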

    def add_case_member(self, record, schema, line, line_number):
        """Add case member and generate errors if needed.

        @param record: a Django model representing a datafile record
        @param schema: the schema from which the record was created
        @param line: the raw string line representing the record
        @param line_number: the line number the record was generated from in the datafile
        """
        # Add all records the detector receives to the id dictionary. That way, if a line has more than one record
        # created from it, all of its records are appropriately marked for deletion if need be.
        self.record_ids.setdefault(schema.document, []).append(record.id)

        # We do not run duplicate detection for records that have been generated on the same line: T3, M3, T6, M6,
        # T7, M7. This is because we would incorrectly generate both duplicate and partial duplicate errors.
        if self.current_line_number is None or self.current_line_number != line_number:
            self.current_line_number = line_number
            err_msg = None
            has_precedence = False
            is_new_max_precedence = False

            line_hash, partial_hash = schema.generate_hashes_func(line, record)
            should_skip_partial_dup = schema.should_skip_partial_dup_func(record)

            if line_hash in self.record_hashes:
                has_precedence, is_new_max_precedence = self.error_precedence.has_precedence(ErrorLevel.DUPLICATE)
                existing_record_line_number = self.record_hashes[line_hash]
                err_msg = (f"Duplicate record detected with record type "
                           f"{record.RecordType} at line {line_number}. Record is a duplicate of the record at "
                           f"line number {existing_record_line_number}.")
            elif not should_skip_partial_dup and partial_hash in self.partial_hashes:
                has_precedence, is_new_max_precedence = self.error_precedence.has_precedence(
                    ErrorLevel.PARTIAL_DUPLICATE)
                existing_record_line_number = self.partial_hashes[partial_hash]
                err_msg = self.__get_partial_dup_error_msg(schema, record.RecordType,
                                                           line_number, existing_record_line_number)

            self.__generate_error(err_msg, record, schema, line_number, has_precedence, is_new_max_precedence)
            if line_hash not in self.record_hashes:
                self.record_hashes[line_hash] = line_number
            if partial_hash is not None and partial_hash not in self.partial_hashes:
                self.partial_hashes[partial_hash] = line_number


class DuplicateManager:
    """Manages all CaseDuplicateDetectors and their errors."""

    def __init__(self, generate_error):
        self.case_duplicate_detectors = dict()
        self.generate_error = generate_error
        ############################################################################################################
        # WARNING
        self.generated_errors = dict()
        # Do not change/re-assign this dictionary unless you know exactly what you're doing! This object is a one
        # to many relationship: each CaseDuplicateDetector holds a reference to this dictionary and stores its
        # generated duplicate errors in it, which spares the DuplicateManager from looping over all
        # CaseDuplicateDetectors to collect their errors and is a serious performance boost.
        ############################################################################################################

    def add_record(self, record, case_hash, schema, line, line_number):
        """Add record to CaseDuplicateDetector and return whether the record's case has errors.

        @param record: a Django model representing a datafile record
        @param case_hash: a hash value representing the @record's unique case
        @param schema: the schema from which the record was created
        @param line: the raw string from the datafile representing the record
        @param line_number: the line number the record was generated from in the datafile
        """
        if case_hash not in self.case_duplicate_detectors:
            case_duplicate_detector = CaseDuplicateDetector(case_hash, self.generated_errors, self.generate_error)
            self.case_duplicate_detectors[case_hash] = case_duplicate_detector
        self.case_duplicate_detectors[case_hash].add_case_member(record, schema, line, line_number)

    def get_generated_errors(self):
        """Return all errors from all CaseDuplicateDetectors."""
        generated_errors = list()
        for errors in self.generated_errors.values():
            generated_errors.extend(errors)
        return generated_errors

    def clear_errors(self):
        """Clear all generated errors."""
        # We MUST call .clear() here instead of re-assigning a new dict() because the case_duplicate_detectors have
        # a reference to this dictionary. Re-assigning the dictionary means the case_duplicate_detectors lose their
        # reference.
        self.generated_errors.clear()

    def get_records_to_remove(self):
        """Return dictionary of document: [record ids] to remove."""
        records_to_remove = dict()
        for case_duplicate_detector in self.case_duplicate_detectors.values():
            for document, ids in case_duplicate_detector.get_records_for_post_parse_deletion().items():
                records_to_remove.setdefault(document, []).extend(ids)

        return records_to_remove

    def update_removed(self, case_hash, should_remove, was_removed):
        """Notify CaseDuplicateDetectors whether case could or could not be removed from memory."""
        case_duplicate_detector = self.case_duplicate_detectors.get(case_hash, False)
        if case_duplicate_detector:
            if was_removed and not should_remove:
                case_duplicate_detector.set_should_remove_from_db(False)
            elif not was_removed and should_remove:
                case_duplicate_detector.set_should_remove_from_db(True)

    def get_num_dup_errors(self, case_hash):
        """Return the number of duplicate errors for a specific duplicate detector."""
        if case_hash in self.case_duplicate_detectors:
            return self.case_duplicate_detectors.get(case_hash).get_num_errors()
        return 0
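
As a rough usage sketch (generate_error, parsed_records, and the case-hash computation below are stand-ins, not the
parser's real API), a parsing loop would drive the manager like this:

    # Hedged sketch: generate_error is the parser's error factory; parsed_records
    # yields (record, schema, raw_line) tuples in file order (both hypothetical).
    manager = DuplicateManager(generate_error)
    for line_number, (record, schema, line) in enumerate(parsed_records, start=1):
        case_hash = hash(record.CASE_NUMBER)  # illustrative; real case hashes come from the schema
        manager.add_record(record, case_hash, schema, line, line_number)

    errors = manager.get_generated_errors()       # all duplicate/partial-duplicate errors
    to_remove = manager.get_records_to_remove()   # {document: [record ids]} for post-parse deletion
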
4 changes: 3 additions & 1 deletion tdrs-backend/tdpservice/parsers/models.py
@@ -103,6 +103,8 @@ def get_status(self):
.exclude(error_message__icontains="trailer")\
.exclude(error_message__icontains="Unknown Record_Type was found.")

case_consistency_errors = errors.filter(error_type=ParserErrorCategoryChoices.CASE_CONSISTENCY)

row_precheck_errors = errors.filter(error_type=ParserErrorCategoryChoices.PRE_CHECK)\
.filter(field_name="Record_Type")\
.exclude(error_message__icontains="trailer")
@@ -113,7 +115,7 @@ def get_status(self):
             return DataFileSummary.Status.REJECTED
         elif errors.count() == 0:
             return DataFileSummary.Status.ACCEPTED
-        elif row_precheck_errors.count() > 0:
+        elif row_precheck_errors.count() > 0 or case_consistency_errors.count() > 0:
             return DataFileSummary.Status.PARTIALLY_ACCEPTED
         else:
             return DataFileSummary.Status.ACCEPTED_WITH_ERRORS