Skip to content

Commit

Permalink
fix: bugs resulting in log events being created incorrectly (#77)
Browse files Browse the repository at this point in the history
* fix total chunk calculation logic so it does not introduce an extra chunk
* fix log line copy logic to copy the correct chunk of the log line instead
  of always copying from the beginning

Co-authored-by: Lihao Lu <[email protected]>
  • Loading branch information
rightpeter and Lihao Lu authored Jan 28, 2022
1 parent ca1b727 commit e4dd008
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -367,23 +367,25 @@ private Pair<Boolean, AtomicInteger> addNewLogEvent(AtomicInteger totalBytesRead
logger.atTrace().kv("log-line-size-bytes", dataSize)
.log("Log line larger than maximum event size 256KB, " + "will be split into multiple log events");
}
int totalChunks = dataSize / MAX_EVENT_LENGTH + 1;
int totalChunks = (dataSize - 1) / MAX_EVENT_LENGTH + 1;
int currChunk = 1;
int currChunkSize;
boolean reachedMaxBatchSize = false;
AtomicInteger currBytesRead = new AtomicInteger();

// Keep adding events until there are more chunks left or max batch size limit is reached.
// Keep adding events until there are no more chunks left or max batch size limit is reached.
while (currChunk <= totalChunks) {
currChunkSize = currChunk == totalChunks ? dataSize % MAX_EVENT_LENGTH : MAX_EVENT_LENGTH;
currChunkSize = currChunk == totalChunks ? (dataSize - 1) % MAX_EVENT_LENGTH + 1 : MAX_EVENT_LENGTH;

reachedMaxBatchSize = reachedMaxBatchSize(totalBytesRead, currChunkSize);
if (reachedMaxBatchSize) {
break;
}

int startFromByte = MAX_EVENT_LENGTH * (currChunk - 1);
String partialData =
new String(Arrays.copyOfRange(data.getBytes(StandardCharsets.UTF_8), 0, currChunkSize),
new String(Arrays.copyOfRange(data.getBytes(StandardCharsets.UTF_8), startFromByte,
startFromByte + currChunkSize),
StandardCharsets.UTF_8);
totalBytesRead.addAndGet(currChunkSize + TIMESTAMP_BYTES + EVENT_STORAGE_OVERHEAD);
currBytesRead.addAndGet(currChunkSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@
@ExtendWith({MockitoExtension.class, GGExtension.class})
class CloudWatchAttemptLogsProcessorTest extends GGServiceTestUtil {
private static final SimpleDateFormat DATE_FORMATTER = new SimpleDateFormat("yyyy/MM/dd", Locale.ENGLISH);
private static final int EVENT_STORAGE_OVERHEAD = 26;
private static final int TIMESTAMP_BYTES = 8;
private static final int MAX_EVENT_LENGTH = 1024 * 256 - TIMESTAMP_BYTES - EVENT_STORAGE_OVERHEAD;
static {
DATE_FORMATTER.setTimeZone(TimeZone.getTimeZone("UTC"));
}
Expand Down Expand Up @@ -430,8 +433,8 @@ void GIVEN_unstructured_log_WHEN_breaches_event_size_limit_THEN_split_line(Exten
.append("end\n");
// 1 log line of size ~256KB i.e. larger than event size limit of 256KB (after additional bytes for
// timestamp and overhead), should amount to 2 log events.
fileContent = fileContent.append(timestampPrefix).append(RandomStringUtils.random(1024*256, true, true))
.append("end\n");
String overSizeLogLine = timestampPrefix + RandomStringUtils.random(1024*256, true, true) + "end";
fileContent = fileContent.append(overSizeLogLine).append("\n");
// 1 more log line of size ~1KB i.e. within event size limit of 256KB, should amount to 1 log event
fileContent = fileContent.append(timestampPrefix)
.append(StringUtils.repeat(RandomStringUtils.random(1, true, true), 1024 * 1)).append("end\n");
Expand Down Expand Up @@ -459,7 +462,13 @@ void GIVEN_unstructured_log_WHEN_breaches_event_size_limit_THEN_split_line(Exten
assertTrue(attempt.getLogStreamsToLogEventsMap().containsKey(logStream));
CloudWatchAttemptLogInformation logEventsForStream1 = attempt.getLogStreamsToLogEventsMap().get(logStream);
assertNotNull(logEventsForStream1.getLogEvents());
assertEquals(4, logEventsForStream1.getLogEvents().size());

List<InputLogEvent> logEvents = logEventsForStream1.getLogEvents();
assertEquals(4, logEvents.size());
// Over size log line got divided and successfully added as log events
assertEquals(overSizeLogLine.substring(0, MAX_EVENT_LENGTH), logEvents.get(1).message());
assertEquals(overSizeLogLine.substring(MAX_EVENT_LENGTH), logEvents.get(2).message());

assertTrue(logEventsForStream1.getAttemptLogFileInformationMap().containsKey(file.getAbsolutePath()));
assertEquals(0, logEventsForStream1.getAttemptLogFileInformationMap().get(file.getAbsolutePath())
.getStartPosition());
Expand All @@ -485,13 +494,17 @@ void GIVEN_unstructured_log_WHEN_breaches_event_size_and_batch_size_limits_THEN_
StringBuilder fileContent = new StringBuilder();
// 1 log line of size ~1KB i.e. within event size limit of 256KB, should amount to 1 log event
fileContent = fileContent.append(RandomStringUtils.random(1024*1, true, true)).append("end\n");
// 4 log lines of size ~256KB i.e. larger than event size limit of 256KB (after additional bytes for
// 1 log line of size 2 * MAX_EVENT_LENGTH KB, should amount to 2 log events
fileContent = fileContent.append(RandomStringUtils.random(MAX_EVENT_LENGTH * 2 - 3, true, true)).append(
"end\n");
// 2 log lines of size ~256KB i.e. larger than event size limit of 256KB (after additional bytes for
// timestamp and overhead), should amount to 2 log events each.
// This should max out batch size limit in the 4th line and 4th line will not get processed
for (int i = 0; i < 4; i++) {
// This should max out batch size limit in the 2nd line of these 2 and it will not get processed
for (int i = 0; i < 2; i++) {
fileContent = fileContent.append(RandomStringUtils.random(1024*256, true, true)).append("end\n");
}
int expectedBytesRead = 4*4 + 1024*1 + 3*1024*256; // 4 additional byte in each line for 'end\n'
// 3 additional byte in each line for 'end\n', 1 additional for '\n'
int expectedBytesRead = 2*4 + 1024*1 + MAX_EVENT_LENGTH*2 + 1 + 1*1024*256;
fileOutputStream.write(fileContent.toString().getBytes(StandardCharsets.UTF_8));

List<LogFileInformation> logFileInformationSet = new ArrayList<>();
Expand All @@ -515,7 +528,8 @@ void GIVEN_unstructured_log_WHEN_breaches_event_size_and_batch_size_limits_THEN_
assertTrue(attempt.getLogStreamsToLogEventsMap().containsKey(logStream));
CloudWatchAttemptLogInformation logEventsForStream1 = attempt.getLogStreamsToLogEventsMap().get(logStream);
assertNotNull(logEventsForStream1.getLogEvents());
assertEquals(7, logEventsForStream1.getLogEvents().size());
// Total log events: 1 (first line) + 2 (second line) + 2 (third line) = 5
assertEquals(5, logEventsForStream1.getLogEvents().size());
assertTrue(logEventsForStream1.getAttemptLogFileInformationMap().containsKey(file.getAbsolutePath()));
assertEquals(0, logEventsForStream1.getAttemptLogFileInformationMap().get(file.getAbsolutePath())
.getStartPosition());
Expand Down

0 comments on commit e4dd008

Please sign in to comment.