Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding unittests for several csv import related timestamp / datetime edge cases #3177

Merged
merged 8 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions importer_client/python/timesketch_import_client/importer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ def setUp(self):

self.frame = pandas.DataFrame(self.lines)

self._importer = importer.ImportStreamer()

def test_adding_data_frames(self):
"""Test adding a data frame to the importer."""
with MockStreamer() as streamer:
Expand Down Expand Up @@ -175,6 +177,67 @@ def test_adding_json(self):
streamer.flush()
self._run_all_tests(streamer.columns, streamer.lines)

# pylint: disable=protected-access
def test_fix_data_frame(self):
    """Test that _fix_data_frame keeps extra columns and fixes datetime.

    Builds a one-row pandas data frame with timestamp and datetime columns
    plus a few arbitrary extra columns, passes it through the importer's
    _fix_data_frame and checks that:
      * a frame is returned,
      * the value of an extra column ("vital_stats") is still present,
      * the datetime value is returned as "2019-01-03T02:39:42+0000".
    """
    data_frame = pandas.DataFrame(
        {
            "timestamp": ["1435789661000000"],
            "stuff": ["foobar"],
            "correct": [True],
            "random_number": [11332],
            "vital_stats": ["ille"],
            "datetime": ["2019-01-03T02:39:42"],
        }
    )
    fixed_frame = self._importer._fix_data_frame(data_frame)
    self.assertIsNotNone(fixed_frame)

    # assertIn shows the actual column values when it fails, so the test
    # does not need to print them for debugging.
    self.assertIn("ille", fixed_frame["vital_stats"].values)
    self.assertIn("2019-01-03T02:39:42+0000", fixed_frame["datetime"].values)

def test_fix_data_frame_precision_datetime(self):
    """Test fixing a data frame with a datetime that has microsecond precision.

    The input datetime carries fractional seconds and a "Z" suffix; the
    test expects the fixed frame to contain it as
    "2024-07-24T10:57:02+0000", i.e. without the fractional part.
    """
    data_frame = pandas.DataFrame(
        {
            "timestamp": ["1456"],
            "datetime": ["2024-07-24T10:57:02.877297Z"],
        }
    )
    fixed_frame = self._importer._fix_data_frame(data_frame)
    self.assertIsNotNone(fixed_frame)

    # assertIn shows the actual column values when it fails, so the test
    # does not need to print them for debugging.
    self.assertIn("2024-07-24T10:57:02+0000", fixed_frame["datetime"].values)

def test_fix_data_frame_precision_timestamp(self):
    """Test fixing a data frame with a timestamp that has microsecond precision.

    The test expects the 16-digit timestamp string to be present unchanged
    in the fixed frame, and the datetime (given with fractional seconds
    and a "Z" suffix) to be returned as "1985-01-21T10:57:02+0000".
    """
    data_frame = pandas.DataFrame(
        {
            "timestamp": ["1331698658276340"],
            "datetime": ["1985-01-21T10:57:02.25Z"],
        }
    )
    fixed_frame = self._importer._fix_data_frame(data_frame)
    self.assertIsNotNone(fixed_frame)

    # assertIn reports the searched column values on failure, unlike
    # assertIs(<membership test>, True).
    self.assertIn("1985-01-21T10:57:02+0000", fixed_frame["datetime"].values)
    self.assertIn("1331698658276340", fixed_frame["timestamp"].values)

# pylint: enable=protected-access
def _run_all_tests(self, columns, lines):
"""Run all tests on the result set of a streamer."""
# The first line is the column line.
Expand Down
5 changes: 5 additions & 0 deletions test_tools/test_events/invalid_datetime.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"message","timestamp","datetime","timestamp_desc","data_type"
"Missing timezone info","123456","2017-09-24 19:01:01","Write time","Missing_timezone_info"
"Wrong epoch","123456","2017-07-24T19:01:01","Write time","wrong_timestamp"
"Wrong epoch","9999999999999","2017-10-24 19:01:01","Write time","long_timestamp"
"Wrong epoch","88888888","1234 19:01:01","Write time","wrong_datetime_1"
6 changes: 6 additions & 0 deletions test_tools/test_events/validate_no_datetime_timestamps.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"message","timestamp","datetime","timestamp_desc","data_type"
"No datetime given","1435789661000000","","Logging","No_datetime"
"Whitespace datetime","1437789661000000"," ","Logging","Whitespace_datetime"
"No Timestamp1","","2015-07-25 02:01:01+00:00","Logging","No timestamp1"
"No Timestamp2",,"2014-07-25 02:01:01+00:00","Logging","No timestamp2"
"Whitespace Timestamp"," ","2016-07-25 02:01:01+00:00","Logging","Whitespace timestamp"
3 changes: 3 additions & 0 deletions test_tools/test_events/validate_time_precision.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"message","timestamp","datetime","timestamp_desc","data_type"
"total precision in datetime","123456789","2024-07-24T10:57:02.877297Z","Write time","timestamptest1"
"precision in timestamp","1331698658276340","2015-07-24T19:01:01+00:00","Write time","timestamptest2"
78 changes: 78 additions & 0 deletions timesketch/lib/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,84 @@ def test_timestamp_is_ISOformat(self):
for output in expected_outputs:
self.assertDictEqual(next(results), output)

def test_missing_datetime_in_CSV(self):
    """Test parsing a CSV where rows lack either a datetime or a timestamp.

    Reads validate_no_datetime_timestamps.csv through read_and_validate_csv
    and checks the rows identified by their data_type:
      * "No timestamp1" / "No timestamp2": the timestamp must be filled in
        with the epoch microseconds matching the row's datetime.
      * "Whitespace timestamp": a timestamp must be present and the
        datetime must be returned in ISO format.
    Rows with any other data_type are only counted. At least two rows must
    be yielded overall.
    """
    results = read_and_validate_csv(
        "test_tools/test_events/validate_no_datetime_timestamps.csv"
    )

    parsed_rows = 0
    for item in results:
        parsed_rows += 1
        data_type = item["data_type"]

        if data_type == "No timestamp1":
            self.assertIsNotNone(item["timestamp"])
            self.assertEqual(item["timestamp"], 1437789661000000)
            self.assertIsNotNone(item["datetime"])
            self.assertEqual(item["datetime"], "2015-07-25T02:01:01+00:00")
        elif data_type == "No timestamp2":
            self.assertIsNotNone(item["timestamp"])
            self.assertEqual(item["timestamp"], 1406253661000000)
            self.assertIsNotNone(item["datetime"])
            self.assertEqual(item["datetime"], "2014-07-25T02:01:01+00:00")
        # The fixture row carrying the 2016 datetime has the data_type
        # "Whitespace timestamp"; matching on "Whitespace datetime" never
        # hit any row, so these assertions were never executed.
        elif data_type == "Whitespace timestamp":
            self.assertIsNotNone(item["timestamp"])
            self.assertIsNotNone(item["datetime"])
            self.assertEqual(item["datetime"], "2016-07-25T02:01:01+00:00")

    self.assertGreaterEqual(parsed_rows, 2)

def test_time_datetime_valueerror(self):
    """Test that a CSV row with an unparsable datetime is skipped.

    Reads invalid_datetime.csv and checks that the row tagged
    "wrong_datetime_1" does not show up in the results while the row
    tagged "long_timestamp" does.

    The file is currently parsed as:
    {'message': 'Missing timezone info', 'timestamp': 123456,
    'datetime': '2017-09-24T19:01:01',
    'timestamp_desc': 'Write time',
    'data_type': 'Missing_timezone_info'}
    {'message': 'Wrong epoch', 'timestamp': 123456,
    'datetime': '2017-07-24T19:01:01',
    'timestamp_desc': 'Write time',
    'data_type': 'wrong_timestamp'}
    {'message': 'Wrong epoch', 'timestamp': 9999999999999,
    'datetime': '2017-10-24T19:01:01',
    'timestamp_desc': 'Write time',
    'data_type': 'long_timestamp'}

    """
    csv_path = "test_tools/test_events/invalid_datetime.csv"

    parsed_rows = []
    for row in read_and_validate_csv(csv_path):
        parsed_rows.append(row)
        self.assertIsNotNone(row)

    # The checks below search the string form of the whole result list.
    rendered = str(parsed_rows)
    self.assertNotIn(
        "wrong_datetime_1",
        rendered,
        "Parsed line is in results but should be skipped",
    )
    self.assertIn("long_timestamp", rendered)

def test_time_precision_in_csv(self):
    """Test that sub-second datetime precision survives CSV parsing.

    Reads validate_time_precision.csv, requires every yielded row to have
    a timestamp, and checks that both fixture rows are present and that
    the microsecond datetime appears as
    "2024-07-24T10:57:02.877297+00:00".
    """
    csv_path = "test_tools/test_events/validate_time_precision.csv"

    parsed_rows = []
    for row in read_and_validate_csv(csv_path):
        parsed_rows.append(row)
        self.assertIsNotNone(row["timestamp"])

    # The checks below search the string form of the whole result list.
    rendered = str(parsed_rows)
    expected_fragments = (
        "timestamptest1",
        "2024-07-24T10:57:02.877297+00:00",
        "timestamptest2",
    )
    for fragment in expected_fragments:
        self.assertIn(fragment, rendered)

def test_invalid_JSONL_file(self):
"""Test for JSONL with missing keys in the dictionary wrt headers mapping"""
linedict = {"DT": "2011-11-11", "MSG": "this is a test"}
Expand Down
Loading