diff --git a/end_to_end_tests/upload_test.py b/end_to_end_tests/upload_test.py index 08d7eebb1e..3de7fc2da5 100755 --- a/end_to_end_tests/upload_test.py +++ b/end_to_end_tests/upload_test.py @@ -230,5 +230,109 @@ def test_large_upload_csv_over_flush_limit(self): events = sketch.explore("data_type:73kcsv", as_pandas=True, max_entries=100000) self.assertions.assertEqual(len(events), 73251) + def test_datetime_out_of_normal_range_in_csv(self): + """Test uploading a file with events from way back and some + in a distant future. This test can reveal edge cases that might occur + when tools produce a "fake" datetime value""" + + rand = str(random.randint(0, 10000)) + sketch = self.api.create_sketch( + name=f"datetime_out_of_normal_range_in_csv_{rand}" + ) + self.sketch = sketch + file_path = "/usr/local/src/timesketch/test_tools/test_events/validate_time_out_of_range.csv" # pylint: disable=line-too-long + self.import_timeline(file_path, index_name=rand, sketch=sketch) + timeline = sketch.list_timelines()[0] + # check that timeline was uploaded correctly + self.assertions.assertEqual(timeline.name, file_path) + self.assertions.assertEqual(timeline.index.name, str(rand)) + self.assertions.assertEqual(timeline.index.status, "ready") + + # Search for the very old event + search_obj = search.Search(sketch) + search_obj.query_string = "data_type:csv_very_old_event" + search_obj.commit() + self.assertions.assertEqual(len(search_obj.table), 1) + self.assertions.assertEqual( + "1601-01-01" in str(search_obj.table["datetime"]), True + ) + + # Search for future event check if datetime value is in the result + search_obj2 = search.Search(sketch) + search_obj2.query_string = "data_type:csv_very_future_event" + search_obj2.commit() + self.assertions.assertEqual(len(search_obj2.table), 1) + self.assertions.assertEqual( + "2227-12-31" in str(search_obj2.table["datetime"]), True + ) + + def test_csv_different_timestamps(self): + """Test uploading a timeline with different precision of timestamps.""" + + # create a new sketch + rand = str(random.randint(0, 10000)) + sketch = self.api.create_sketch(name=f"csv_different_timestamps_{rand}") + self.sketch = sketch + + file_path = "/tmp/timestamptest.csv" + + with open(file_path, "w", encoding="utf-8") as file_object: + file_object.write( + '"message","timestamp","datetime","timestamp_desc","data_type"\n' + ) + string = ( + '"total precision","123456789",' + '"2024-07-24T10:57:02.877297Z","Write time","timestamptest"\n' + ) + file_object.write(string) + string = ( + '"ISO8601","1331698658276340",' + '"2015-07-24T19:01:01+00:00","Write time","timestamptest"\n' + ) + file_object.write(string) + string = ( + '"Wrong epoch","123456",' + '"2015-07-24 19:01:01","Write time","timestamptest fail"\n' + ) + file_object.write(string) + string = '"no_datetime","123456","","Write time","no_datetime"\n' + file_object.write(string) + string = ( + '"Notimestamp","",' + '"2015-07-24 19:01:01","Write time","no_timestamp"\n' + ) + file_object.write(string) + string = ( + '"Accurate_timestamp","1331712840499027",' + '"2015-07-24 19:01:01","Write time","Accurate_timestamp"\n' + ) + file_object.write(string) + + self.import_timeline("/tmp/timestamptest.csv", index_name=rand, sketch=sketch) + os.remove(file_path) + + timeline = sketch.list_timelines()[0] + # check that timeline was uploaded correctly + self.assertions.assertEqual(timeline.name, file_path) + self.assertions.assertEqual(timeline.index.name, str(rand)) + self.assertions.assertEqual(timeline.index.status, "ready") + + search_obj = search.Search(sketch) + search_obj.query_string = "data_type:timestamptest" + search_obj.commit() + self.assertions.assertEqual(len(search_obj.table), 3) + + # check that the number of events is correct with a different method + events = sketch.explore("data_type:timestamptest", as_pandas=True) + self.assertions.assertEqual(len(events), 3) + + # check that events with no timestamp + events = sketch.explore("data_type:no_timestamp", as_pandas=True) + self.assertions.assertEqual(len(events), 1) + + # check number of events with no datetime + events = sketch.explore("data_type:no_datetime", as_pandas=True) + self.assertions.assertEqual(len(events), 1) + manager.EndToEndTestManager.register_test(UploadTest) diff --git a/test_tools/test_events/validate_time_out_of_range.csv b/test_tools/test_events/validate_time_out_of_range.csv new file mode 100644 index 0000000000..be54ff37b2 --- /dev/null +++ b/test_tools/test_events/validate_time_out_of_range.csv @@ -0,0 +1,3 @@ +"message","timestamp","datetime","timestamp_desc","data_type" +"csv very old event","","1601-01-01T00:00:00Z","Write time","csv_very_old_event" +"csv event in the distant future","","2227-12-31T23:01:01+00:00","Write time","csv_very_future_event" \ No newline at end of file diff --git a/timesketch/lib/utils_test.py b/timesketch/lib/utils_test.py index 1abdaa07bc..751ee5297e 100644 --- a/timesketch/lib/utils_test.py +++ b/timesketch/lib/utils_test.py @@ -296,6 +296,26 @@ def test_time_datetime_valueerror(self): ) self.assertIn("long_timestamp", str(results_list)) + def test_datetime_out_of_normal_range_in_csv(self): + """Test for parsing a file with datetimes that are way out of range for + normal usage + One of the reasons to create this is: + https://github.com/google/timesketch/issues/1617 + """ + results = iter( + read_and_validate_csv( + "test_tools/test_events/validate_time_out_of_range.csv" + ) + ) + results_list = [] + for item in results: + results_list.append(item) + self.assertIsNotNone(item["timestamp"]) + + self.assertIn("csv_very_future_event", str(results_list)) + self.assertIn("2227-12-31T23:01:01+00:00", str(results_list)) + self.assertNotIn("1601-01-01", str(results_list)) + def test_time_precision_in_csv(self): """Test for parsing a file with time precision""" results = iter(