
[COST-861] Tokenizing error - skip bad rows when reading csv #5031

Merged: 5 commits, Apr 22, 2024
Changes from 4 commits
18 changes: 11 additions & 7 deletions koku/masu/external/kafka_msg_handler.py
@@ -70,18 +70,21 @@ class KafkaMsgHandlerError(Exception):
     """Kafka msg handler error."""
 
 
-def divide_csv_daily(file_path: os.PathLike, manifest_id: int):
-    """
-    Split local file into daily content.
-    """
-    daily_files = []
-
+def get_data_frame(file_path: os.PathLike):
+    """read csv file into dataframe"""
     try:
-        data_frame = pd.read_csv(file_path, dtype=pd.StringDtype(storage="pyarrow"))
+        return pd.read_csv(file_path, dtype=pd.StringDtype(storage="pyarrow"), on_bad_lines="warn")
     except Exception as error:
         LOG.error(f"File {file_path} could not be parsed. Reason: {str(error)}")
         raise error
 
+
+def divide_csv_daily(file_path: os.PathLike, manifest_id: int):
+    """
+    Split local file into daily content.
+    """
+    data_frame = get_data_frame(file_path)
+
     report_type, _ = utils.detect_type(file_path)
     unique_times = data_frame.interval_start.unique()
     days = list({cur_dt[:10] for cur_dt in unique_times})
@@ -90,6 +93,7 @@ def divide_csv_daily(file_path: os.PathLike, manifest_id: int):
         for cur_day in days
     ]
 
+    daily_files = []
     for daily_data in daily_data_frames:
         day = daily_data.get("date")
         df = daily_data.get("data_frame")
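For context on the one-line behavior change: with the pandas default `on_bad_lines="error"`, a row whose field count does not match the header makes `pd.read_csv` raise `pandas.errors.ParserError` ("Error tokenizing data..."), which is the failure this PR works around. Passing `on_bad_lines="warn"` logs the bad row and skips it instead. A minimal sketch of the difference (not part of the PR), assuming pandas >= 1.3 with pyarrow installed:

```python
import io

import pandas as pd

# Same shape as the tokenizing-error.csv fixture added below: a two-column
# header, one good row, and one row with an extra third field.
csv_data = "h1,h2\ncol1,col2\ncol1,col2,col3\n"

# Default behavior: the malformed third line raises ParserError
# (e.g. "Error tokenizing data. C error: Expected 2 fields in line 3, saw 3").
try:
    pd.read_csv(io.StringIO(csv_data), dtype=pd.StringDtype(storage="pyarrow"))
except pd.errors.ParserError as error:
    print(f"parse failed: {error}")

# With on_bad_lines="warn", pandas emits a ParserWarning, drops the bad
# row, and returns the rows it could parse.
df = pd.read_csv(
    io.StringIO(csv_data),
    dtype=pd.StringDtype(storage="pyarrow"),
    on_bad_lines="warn",
)
print(df)
#      h1    h2
# 0  col1  col2
```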
3 changes: 3 additions & 0 deletions koku/masu/test/data/ocp/tokenizing-error.csv
@@ -0,0 +1,3 @@
+h1,h2
+col1,col2
+col1,col2,col3
3 changes: 3 additions & 0 deletions koku/masu/test/data/ocp/valid-csv.csv
@@ -0,0 +1,3 @@
+h1,h2
+col1,col2
+col1,col2
13 changes: 13 additions & 0 deletions koku/masu/test/external/test_kafka_msg_handler.py
@@ -941,6 +941,19 @@ def test_divide_csv_daily(self):
         for expected_item in expected:
             self.assertIn(expected_item, daily_files)
 
+    def test_get_data_frame(self):
+        """Test the divide_csv_daily method."""
Contributor commented:
This test should be broken into two distinct tests: one that asserts it returns as expected with valid data, the other that it behaves as intended with invalid data.

As written, it is not immediately obvious what the test is asserting. It is a "no news is good news" test instead of one that explicitly spells out pass/fail criteria.

samdoran (Contributor) commented on Apr 15, 2024:
Also, the docstring needs to be updated or, if the test code is obvious enough, omitted.


+        file_paths = [
+            Path("./koku/masu/test/data/ocp/valid-csv.csv"),
+            Path("./koku/masu/test/data/ocp/tokenizing-error.csv"),
+        ]
+        for file_path in file_paths:
+            try:
+                msg_handler.get_data_frame(file_path)
+            except Exception:
+                self.fail(f"failed to read: {file_path}")

     @patch("masu.external.kafka_msg_handler.os")
     @patch("masu.external.kafka_msg_handler.copy_local_report_file_to_s3_bucket")
     @patch("masu.external.kafka_msg_handler.divide_csv_daily")
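Following the reviewer's suggestion above, one possible split into two explicit tests (illustrative only, not part of this PR; the methods would live in the same test class, and the expected row counts follow from the two fixture files added above):

```python
def test_get_data_frame_valid_csv(self):
    """A well-formed CSV is read into a dataframe."""
    df = msg_handler.get_data_frame(Path("./koku/masu/test/data/ocp/valid-csv.csv"))
    # Both data rows of the two-column fixture are kept.
    self.assertEqual(len(df), 2)

def test_get_data_frame_tokenizing_error(self):
    """A row with too many fields is skipped rather than raising."""
    df = msg_handler.get_data_frame(Path("./koku/masu/test/data/ocp/tokenizing-error.csv"))
    # Only the malformed three-field row is dropped.
    self.assertEqual(len(df), 1)
```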