[COST-861] Tokenizing error - skip bad rows when reading csv #5031

Merged: 5 commits, Apr 22, 2024
18 changes: 11 additions & 7 deletions koku/masu/external/kafka_msg_handler.py
@@ -70,18 +70,21 @@ class KafkaMsgHandlerError(Exception):
     """Kafka msg handler error."""
 
 
-def divide_csv_daily(file_path: os.PathLike, manifest_id: int):
-    """
-    Split local file into daily content.
-    """
-    daily_files = []
-
+def get_data_frame(file_path: os.PathLike):
+    """read csv file into dataframe"""
     try:
-        data_frame = pd.read_csv(file_path, dtype=pd.StringDtype(storage="pyarrow"))
+        return pd.read_csv(file_path, dtype=pd.StringDtype(storage="pyarrow"), on_bad_lines="warn")
     except Exception as error:
         LOG.error(f"File {file_path} could not be parsed. Reason: {str(error)}")
         raise error
 
+
+def divide_csv_daily(file_path: os.PathLike, manifest_id: int):
+    """
+    Split local file into daily content.
+    """
+    data_frame = get_data_frame(file_path)
+
     report_type, _ = utils.detect_type(file_path)
     unique_times = data_frame.interval_start.unique()
     days = list({cur_dt[:10] for cur_dt in unique_times})
@@ -90,6 +93,7 @@ def divide_csv_daily(file_path: os.PathLike, manifest_id: int):
         for cur_day in days
     ]
 
+    daily_files = []
     for daily_data in daily_data_frames:
         day = daily_data.get("date")
         df = daily_data.get("data_frame")
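For context on the one-argument change above: with pandas' default on_bad_lines="error", a row whose field count does not match the header raises pandas.errors.ParserError ("Error tokenizing data"), which matches the "Tokenizing error" in the PR title; on_bad_lines="warn" (available since pandas 1.3) emits a ParserWarning and drops the offending row instead. A minimal standalone sketch, not part of the PR, using an in-memory CSV that mirrors the tokenizing-error.csv fixture below:

import io

import pandas as pd

# Third row has three fields but the header declares two columns.
csv_data = "h1,h2\ncol1,col2\ncol1,col2,col3\n"

# Default behaviour: the parser raises ParserError on the field-count mismatch.
try:
    pd.read_csv(io.StringIO(csv_data))
except pd.errors.ParserError as error:
    print(f"default read_csv raised: {error}")

# on_bad_lines="warn": a ParserWarning is emitted, the bad row is skipped, parsing continues.
frame = pd.read_csv(io.StringIO(csv_data), on_bad_lines="warn")
print(frame.shape)  # (1, 2) -- only the row matching the two-column header remains

Note that get_data_frame still logs and re-raises any other exception, so only rows with a mismatched field count are tolerated; structurally broken files continue to fail loudly.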
3 changes: 3 additions & 0 deletions koku/masu/test/data/ocp/tokenizing-error.csv
@@ -0,0 +1,3 @@
+h1,h2
+col1,col2
+col1,col2,col3
3 changes: 3 additions & 0 deletions koku/masu/test/data/ocp/valid-csv.csv
@@ -0,0 +1,3 @@
+h1,h2
+col1,col2
+col1,col2
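As a rough check of what these fixtures should yield, here is a sketch using the same read_csv arguments the new get_data_frame passes. It assumes it is run from the repository root with pyarrow installed for the string dtype; the shapes in the comments are inferred from the skip-and-warn behaviour, not copied from the PR.

from pathlib import Path

import pandas as pd

# Same read_csv arguments the new get_data_frame passes.
kwargs = {"dtype": pd.StringDtype(storage="pyarrow"), "on_bad_lines": "warn"}

valid = pd.read_csv(Path("./koku/masu/test/data/ocp/valid-csv.csv"), **kwargs)
print(valid.shape)  # expected (2, 2): both data rows match the h1,h2 header

bad = pd.read_csv(Path("./koku/masu/test/data/ocp/tokenizing-error.csv"), **kwargs)
print(bad.shape)  # expected (1, 2): the three-field row is dropped with a ParserWarning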
13 changes: 13 additions & 0 deletions koku/masu/test/external/test_kafka_msg_handler.py
@@ -941,6 +941,19 @@ def test_divide_csv_daily(self):
         for expected_item in expected:
             self.assertIn(expected_item, daily_files)
 
+    def test_get_data_frame_no_tokenizing_error(self):
+        """Test get_data_frame does not raise Tokenizing error when reading files."""
+
+        file_paths = [
+            Path("./koku/masu/test/data/ocp/valid-csv.csv"),
+            Path("./koku/masu/test/data/ocp/tokenizing-error.csv"),
+        ]
+        for file_path in file_paths:
+            try:
+                msg_handler.get_data_frame(file_path)
+            except Exception:
+                self.fail(f"failed to read: {file_path}")
+
     @patch("masu.external.kafka_msg_handler.os")
     @patch("masu.external.kafka_msg_handler.copy_local_report_file_to_s3_bucket")
     @patch("masu.external.kafka_msg_handler.divide_csv_daily")
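The new test only asserts that get_data_frame no longer raises; a stricter follow-up (hypothetical, not part of this PR) could also assert that the malformed row is actually dropped. A sketch as a standalone case, assuming it runs from the repository root with the koku Django settings configured the same way the existing suite expects:

import unittest
from pathlib import Path

from masu.external import kafka_msg_handler as msg_handler


class GetDataFrameBadRowsTest(unittest.TestCase):
    """Hypothetical follow-up case; not part of the PR."""

    def test_bad_row_is_dropped(self):
        data_frame = msg_handler.get_data_frame(Path("./koku/masu/test/data/ocp/tokenizing-error.csv"))
        self.assertEqual(len(data_frame), 1)  # only the row matching the two-column header remains
        self.assertEqual(list(data_frame.columns), ["h1", "h2"])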