From e4dd0082edfe9bbfadd5b9d543d794082042ce38 Mon Sep 17 00:00:00 2001
From: rightpeter
Date: Fri, 28 Jan 2022 15:34:21 -0800
Subject: [PATCH] fix: bugs resulting in log events being created incorrectly (#77)

* fix total chunk calculation logic so it does not introduce an extra chunk
* fix log line copy logic to copy the correct chunk of the log line instead of always copying from the beginning

Co-authored-by: Lihao Lu
---
 .../CloudWatchAttemptLogsProcessor.java     | 10 ++++---
 .../CloudWatchAttemptLogsProcessorTest.java | 30 ++++++++++++++-----
 2 files changed, 28 insertions(+), 12 deletions(-)

diff --git a/src/main/java/com/aws/greengrass/logmanager/CloudWatchAttemptLogsProcessor.java b/src/main/java/com/aws/greengrass/logmanager/CloudWatchAttemptLogsProcessor.java
index 726891f9..7d2d1522 100644
--- a/src/main/java/com/aws/greengrass/logmanager/CloudWatchAttemptLogsProcessor.java
+++ b/src/main/java/com/aws/greengrass/logmanager/CloudWatchAttemptLogsProcessor.java
@@ -367,23 +367,25 @@ private Pair addNewLogEvent(AtomicInteger totalBytesRead
             logger.atTrace().kv("log-line-size-bytes", dataSize)
                     .log("Log line larger than maximum event size 256KB, "
                             + "will be split into multiple log events");
         }
-        int totalChunks = dataSize / MAX_EVENT_LENGTH + 1;
+        int totalChunks = (dataSize - 1) / MAX_EVENT_LENGTH + 1;
         int currChunk = 1;
         int currChunkSize;
         boolean reachedMaxBatchSize = false;
         AtomicInteger currBytesRead = new AtomicInteger();
 
-        // Keep adding events until there are more chunks left or max batch size limit is reached.
+        // Keep adding events until there are no more chunks left or max batch size limit is reached.
         while (currChunk <= totalChunks) {
-            currChunkSize = currChunk == totalChunks ? dataSize % MAX_EVENT_LENGTH : MAX_EVENT_LENGTH;
+            currChunkSize = currChunk == totalChunks ? (dataSize - 1) % MAX_EVENT_LENGTH + 1 : MAX_EVENT_LENGTH;
             reachedMaxBatchSize = reachedMaxBatchSize(totalBytesRead, currChunkSize);
             if (reachedMaxBatchSize) {
                 break;
             }
+            int startFromByte = MAX_EVENT_LENGTH * (currChunk - 1);
             String partialData =
-                    new String(Arrays.copyOfRange(data.getBytes(StandardCharsets.UTF_8), 0, currChunkSize),
+                    new String(Arrays.copyOfRange(data.getBytes(StandardCharsets.UTF_8), startFromByte,
+                            startFromByte + currChunkSize),
                             StandardCharsets.UTF_8);
             totalBytesRead.addAndGet(currChunkSize + TIMESTAMP_BYTES + EVENT_STORAGE_OVERHEAD);
             currBytesRead.addAndGet(currChunkSize);
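To see the off-by-one that the first bullet fixes, the following standalone sketch (not part of the patch) replays the chunk arithmetic with a hypothetical MAX_EVENT_LENGTH of 10 bytes: with the old formulas, a 20-byte line produced three chunks, the last of them empty, and every chunk was copied from byte 0; with the corrected formulas it produces exactly two chunks, each copied from the right offset.

// Standalone sketch, not part of the patch; MAX_EVENT_LENGTH is shrunk to 10 bytes purely for
// illustration. The real constant is the 256KB CloudWatch event limit minus timestamp and
// per-event storage overhead.
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChunkingSketch {
    private static final int MAX_EVENT_LENGTH = 10; // hypothetical, for illustration only

    static List<String> split(String data) {
        byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
        int dataSize = bytes.length;
        // Old formula: dataSize / MAX_EVENT_LENGTH + 1 -> 3 chunks for a 20-byte line (last one empty).
        // New formula: (dataSize - 1) / MAX_EVENT_LENGTH + 1 -> exactly 2 chunks.
        int totalChunks = (dataSize - 1) / MAX_EVENT_LENGTH + 1;
        List<String> chunks = new ArrayList<>();
        for (int currChunk = 1; currChunk <= totalChunks; currChunk++) {
            int currChunkSize = currChunk == totalChunks
                    ? (dataSize - 1) % MAX_EVENT_LENGTH + 1 : MAX_EVENT_LENGTH;
            int startFromByte = MAX_EVENT_LENGTH * (currChunk - 1);
            // Copy the current chunk, not always bytes [0, currChunkSize) as the old code did.
            chunks.add(new String(Arrays.copyOfRange(bytes, startFromByte, startFromByte + currChunkSize),
                    StandardCharsets.UTF_8));
        }
        return chunks;
    }

    public static void main(String[] args) {
        System.out.println(split("aaaaaaaaaabbbbbbbbbb")); // 20 bytes -> [aaaaaaaaaa, bbbbbbbbbb]
        System.out.println(split("aaaaaaaaaabbb"));        // 13 bytes -> [aaaaaaaaaa, bbb]
    }
}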
diff --git a/src/test/java/com/aws/greengrass/logmanager/CloudWatchAttemptLogsProcessorTest.java b/src/test/java/com/aws/greengrass/logmanager/CloudWatchAttemptLogsProcessorTest.java
index 392eacee..104e0fe7 100644
--- a/src/test/java/com/aws/greengrass/logmanager/CloudWatchAttemptLogsProcessorTest.java
+++ b/src/test/java/com/aws/greengrass/logmanager/CloudWatchAttemptLogsProcessorTest.java
@@ -66,6 +66,9 @@
 @ExtendWith({MockitoExtension.class, GGExtension.class})
 class CloudWatchAttemptLogsProcessorTest extends GGServiceTestUtil {
     private static final SimpleDateFormat DATE_FORMATTER = new SimpleDateFormat("yyyy/MM/dd", Locale.ENGLISH);
+    private static final int EVENT_STORAGE_OVERHEAD = 26;
+    private static final int TIMESTAMP_BYTES = 8;
+    private static final int MAX_EVENT_LENGTH = 1024 * 256 - TIMESTAMP_BYTES - EVENT_STORAGE_OVERHEAD;
     static {
         DATE_FORMATTER.setTimeZone(TimeZone.getTimeZone("UTC"));
     }
@@ -430,8 +433,8 @@ void GIVEN_unstructured_log_WHEN_breaches_event_size_limit_THEN_split_line(Exten
                 .append("end\n");
         // 1 log line of size ~256KB i.e. larger than event size limit of 256KB (after additional bytes for
         // timestamp and overhead), should amount to 2 log events.
-        fileContent = fileContent.append(timestampPrefix).append(RandomStringUtils.random(1024*256, true, true))
-                .append("end\n");
+        String overSizeLogLine = timestampPrefix + RandomStringUtils.random(1024*256, true, true) + "end";
+        fileContent = fileContent.append(overSizeLogLine).append("\n");
         // 1 more log line of size ~1KB i.e. within event size limit of 256KB, should amount to 1 log event
         fileContent = fileContent.append(timestampPrefix)
                 .append(StringUtils.repeat(RandomStringUtils.random(1, true, true), 1024 * 1)).append("end\n");
@@ -459,7 +462,13 @@ void GIVEN_unstructured_log_WHEN_breaches_event_size_limit_THEN_split_line(Exten
         assertTrue(attempt.getLogStreamsToLogEventsMap().containsKey(logStream));
         CloudWatchAttemptLogInformation logEventsForStream1 = attempt.getLogStreamsToLogEventsMap().get(logStream);
         assertNotNull(logEventsForStream1.getLogEvents());
-        assertEquals(4, logEventsForStream1.getLogEvents().size());
+
+        List logEvents = logEventsForStream1.getLogEvents();
+        assertEquals(4, logEvents.size());
+        // Over size log line got divided and successfully added as log events
+        assertEquals(overSizeLogLine.substring(0, MAX_EVENT_LENGTH), logEvents.get(1).message());
+        assertEquals(overSizeLogLine.substring(MAX_EVENT_LENGTH), logEvents.get(2).message());
+
         assertTrue(logEventsForStream1.getAttemptLogFileInformationMap().containsKey(file.getAbsolutePath()));
         assertEquals(0, logEventsForStream1.getAttemptLogFileInformationMap().get(file.getAbsolutePath())
                 .getStartPosition());
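A quick worked check (not part of the test) of why the over-size line yields exactly two events and why the substring assertions line up with the chunk boundaries: RandomStringUtils.random(n, true, true) produces ASCII letters and digits, so one character is one UTF-8 byte. The timestamp prefix length below is a hypothetical placeholder; the test builds its own prefix.

// Standalone sketch; assumedPrefixLength is hypothetical, everything else mirrors the test constants.
public class SplitAssertionCheck {
    public static void main(String[] args) {
        int maxEventLength = 1024 * 256 - 8 - 26;              // 262110, mirrors MAX_EVENT_LENGTH in the test
        int assumedPrefixLength = 24;                          // hypothetical timestamp prefix length
        int lineLength = assumedPrefixLength + 1024 * 256 + "end".length();
        int totalChunks = (lineLength - 1) / maxEventLength + 1;
        System.out.println(totalChunks);                       // 2 -> log events at index 1 and 2
        System.out.println(maxEventLength);                    // length of logEvents.get(1).message()
        System.out.println(lineLength - maxEventLength);       // length of logEvents.get(2).message()
    }
}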
@@ -485,13 +494,17 @@ void GIVEN_unstructured_log_WHEN_breaches_event_size_and_batch_size_limits_THEN_
         StringBuilder fileContent = new StringBuilder();
         // 1 log line of size ~1KB i.e. within event size limit of 256KB, should amount to 1 log event
         fileContent = fileContent.append(RandomStringUtils.random(1024*1, true, true)).append("end\n");
-        // 4 log lines of size ~256KB i.e. larger than event size limit of 256KB (after additional bytes for
+        // 1 log line of size 2 * MAX_EVENT_LENGTH KB, should amount to 2 log events
+        fileContent = fileContent.append(RandomStringUtils.random(MAX_EVENT_LENGTH * 2 - 3, true, true)).append(
+                "end\n");
+        // 2 log lines of size ~256KB i.e. larger than event size limit of 256KB (after additional bytes for
         // timestamp and overhead), should amount to 2 log events each.
-        // This should max out batch size limit in the 4th line and 4th line will not get processed
-        for (int i = 0; i < 4; i++) {
+        // This should max out batch size limit in the 2nd line of these 2 and it will not get processed
+        for (int i = 0; i < 2; i++) {
             fileContent = fileContent.append(RandomStringUtils.random(1024*256, true, true)).append("end\n");
         }
-        int expectedBytesRead = 4*4 + 1024*1 + 3*1024*256; // 4 additional byte in each line for 'end\n'
+        // 3 additional byte in each line for 'end\n', 1 additional for '\n'
+        int expectedBytesRead = 2*4 + 1024*1 + MAX_EVENT_LENGTH*2 + 1 + 1*1024*256;
         fileOutputStream.write(fileContent.toString().getBytes(StandardCharsets.UTF_8));
 
         List logFileInformationSet = new ArrayList<>();
@@ -515,7 +528,8 @@ void GIVEN_unstructured_log_WHEN_breaches_event_size_and_batch_size_limits_THEN_
         assertTrue(attempt.getLogStreamsToLogEventsMap().containsKey(logStream));
         CloudWatchAttemptLogInformation logEventsForStream1 = attempt.getLogStreamsToLogEventsMap().get(logStream);
         assertNotNull(logEventsForStream1.getLogEvents());
-        assertEquals(7, logEventsForStream1.getLogEvents().size());
+        // Total log events: 1 (first line) + 2 (second line) + 2 (third line) = 5
+        assertEquals(5, logEventsForStream1.getLogEvents().size());
         assertTrue(logEventsForStream1.getAttemptLogFileInformationMap().containsKey(file.getAbsolutePath()));
         assertEquals(0, logEventsForStream1.getAttemptLogFileInformationMap().get(file.getAbsolutePath())
                 .getStartPosition());
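A quick worked check (not part of the test) of the expectedBytesRead arithmetic: only the first three lines are consumed before the batch size limit is reached, so the processor reads 1,028 + 524,221 + 262,148 = 787,397 bytes, which is what the expression in the diff evaluates to.

// Standalone sketch; the class name is only for this example, the constants mirror the test.
public class ExpectedBytesReadCheck {
    public static void main(String[] args) {
        int maxEventLength = 1024 * 256 - 8 - 26;                    // 262110
        int line1 = 1024 + 4;                                        // 1,028 bytes incl. "end\n"
        int line2 = (maxEventLength * 2 - 3) + 4;                    // 524,221 bytes incl. "end\n"
        int line3 = 1024 * 256 + 4;                                  // 262,148 bytes incl. "end\n"
        System.out.println(line1 + line2 + line3);                   // 787,397
        // Same value as the expression used in the test.
        System.out.println(2 * 4 + 1024 * 1 + maxEventLength * 2 + 1 + 1 * 1024 * 256);
    }
}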