From 12409960c631b1006830a8c1156530f6c75e6929 Mon Sep 17 00:00:00 2001 From: Michael Dombrowski Date: Tue, 8 Jun 2021 17:25:21 -0700 Subject: [PATCH] fix: respect 24 hour time span limits for CW upload (#62) --- .../logmanager/LogManagerTest.java | 8 ++- .../CloudWatchAttemptLogsProcessor.java | 46 +++++++++++----- .../logmanager/LogManagerService.java | 2 +- .../CloudWatchAttemptLogsProcessorTest.java | 52 +++++++++++++++++-- .../logmanager/LogManagerServiceTest.java | 1 + 5 files changed, 90 insertions(+), 19 deletions(-) diff --git a/src/integrationtests/java/com/aws/greengrass/integrationtests/logmanager/LogManagerTest.java b/src/integrationtests/java/com/aws/greengrass/integrationtests/logmanager/LogManagerTest.java index 097e3046..aa0e4a04 100644 --- a/src/integrationtests/java/com/aws/greengrass/integrationtests/logmanager/LogManagerTest.java +++ b/src/integrationtests/java/com/aws/greengrass/integrationtests/logmanager/LogManagerTest.java @@ -50,6 +50,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.TimeZone; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -70,6 +71,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +@SuppressWarnings("PMD.UnsynchronizedStaticFormatter") @ExtendWith({GGExtension.class, MockitoExtension.class}) class LogManagerTest extends BaseITCase { private static final String THING_NAME = "ThingName"; @@ -81,6 +83,10 @@ class LogManagerTest extends BaseITCase { private Path tempDirectoryPath; private final static ObjectMapper YAML_OBJECT_MAPPER = new ObjectMapper(new YAMLFactory()); + static { + DATE_FORMATTER.setTimeZone(TimeZone.getTimeZone("UTC")); + } + @Mock private CloudWatchLogsClient cloudWatchLogsClient; @Captor @@ -103,7 +109,7 @@ void setupKernel(Path storeDirectory, String configFileName) throws InterruptedE System.setProperty("root", tempRootDir.toAbsolutePath().toString()); CountDownLatch logManagerRunning = new CountDownLatch(1); - CompletableFuture cf = new CompletableFuture(); + CompletableFuture cf = new CompletableFuture<>(); cf.complete(null); kernel = new Kernel(); diff --git a/src/main/java/com/aws/greengrass/logmanager/CloudWatchAttemptLogsProcessor.java b/src/main/java/com/aws/greengrass/logmanager/CloudWatchAttemptLogsProcessor.java index b669daf3..2ce98e12 100644 --- a/src/main/java/com/aws/greengrass/logmanager/CloudWatchAttemptLogsProcessor.java +++ b/src/main/java/com/aws/greengrass/logmanager/CloudWatchAttemptLogsProcessor.java @@ -28,11 +28,14 @@ import java.time.Instant; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeParseException; +import java.time.temporal.ChronoUnit; import java.time.temporal.TemporalAccessor; +import java.util.Comparator; import java.util.Date; import java.util.Locale; import java.util.Map; import java.util.Optional; +import java.util.TimeZone; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -55,10 +58,17 @@ public class CloudWatchAttemptLogsProcessor { private static final int MAX_BATCH_SIZE = 1024 * 1024; private static final ObjectMapper DESERIALIZER = new ObjectMapper() .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true); - private final SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy/MM/dd", Locale.ENGLISH); + private static final ThreadLocal DATE_FORMATTER = + ThreadLocal.withInitial(() -> { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd", Locale.ENGLISH); + // Set timezone of the formatter to UTC since we always pass in UTC dates. We don't want it to think + // that it should use the local timezone. + sdf.setTimeZone(TimeZone.getTimeZone("UTC")); + return sdf; + }); private final DeviceConfiguration deviceConfiguration; private static final Logger logger = LogManager.getLogger(CloudWatchAttemptLogsProcessor.class); - private static Pattern textTimestampPattern = Pattern.compile("([\\w-:.+]+)"); + private static final Pattern textTimestampPattern = Pattern.compile("([\\w-:.+]+)"); /** * Constructor. @@ -193,7 +203,7 @@ private boolean processLogLine(AtomicInteger totalBytesRead, Optional logMessage = tryGetstructuredLogMessage(data); if (logMessage.isPresent()) { logStreamName = logStreamName.replace("{date}", - dateFormatter.format(new Date(logMessage.get().getTimestamp()))); + DATE_FORMATTER.get().format(new Date(logMessage.get().getTimestamp()))); attemptLogInformation = logStreamsMap.computeIfAbsent(logStreamName, key -> CloudWatchAttemptLogInformation.builder() .componentName(componentName) @@ -202,11 +212,6 @@ private boolean processLogLine(AtomicInteger totalBytesRead, desiredLogLevel, logMessage.get()); } else { - logStreamName = logStreamName.replace("{date}", dateFormatter.format(new Date())); - attemptLogInformation = logStreamsMap.computeIfAbsent(logStreamName, - key -> CloudWatchAttemptLogInformation.builder() - .componentName(componentName) - .build()); Matcher matcher = textTimestampPattern.matcher(data); Instant logTimestamp = Instant.now(); if (matcher.find()) { @@ -221,8 +226,12 @@ private boolean processLogLine(AtomicInteger totalBytesRead, } } } - reachedMaxSize = addNewLogEvent(totalBytesRead, attemptLogInformation, data.toString(), - logTimestamp.toEpochMilli()); + logStreamName = logStreamName.replace("{date}", DATE_FORMATTER.get().format(Date.from(logTimestamp))); + attemptLogInformation = logStreamsMap.computeIfAbsent(logStreamName, + key -> CloudWatchAttemptLogInformation.builder() + .componentName(componentName) + .build()); + reachedMaxSize = addNewLogEvent(totalBytesRead, attemptLogInformation, data.toString(), logTimestamp); } if (!reachedMaxSize) { updateCloudWatchAttemptLogInformation(fileName, startPosition, currentPosition, attemptLogInformation, @@ -290,7 +299,8 @@ private boolean checkAndAddNewLogEvent(AtomicInteger totalBytesRead, if (currentLogLevel.toInt() < desiredLogLevel.toInt()) { return false; } - return addNewLogEvent(totalBytesRead, attemptLogInformation, data.toString(), logMessage.getTimestamp()); + return addNewLogEvent(totalBytesRead, attemptLogInformation, data.toString(), + Instant.ofEpochMilli(logMessage.getTimestamp())); } /** @@ -305,18 +315,28 @@ private boolean checkAndAddNewLogEvent(AtomicInteger totalBytesRead, * the log line data size to get the exact size of the input log events. */ private boolean addNewLogEvent(AtomicInteger totalBytesRead, CloudWatchAttemptLogInformation attemptLogInformation, - String data, long timestamp) { + String data, Instant timestamp) { int dataSize = data.getBytes(StandardCharsets.UTF_8).length; // Total bytes equal the number of bytes of the data plus 8 bytes for the timestamp since its a long // and there is an overhead for each log event on the cloud watch side which needs to be added. if (totalBytesRead.get() + dataSize + TIMESTAMP_BYTES + EVENT_STORAGE_OVERHEAD > MAX_BATCH_SIZE) { return true; } + + // If the earliest time + 24 hours is before the new time then we cannot insert it because the gap + // from earliest to latest is greater than 24 hours. Otherwise we can add it. + // Using 23 instead of 24 hours to have a bit of leeway. + Optional earliestTime = + attemptLogInformation.getLogEvents().stream().min(Comparator.comparingLong(InputLogEvent::timestamp)); + if (earliestTime.isPresent() && Instant.ofEpochMilli(earliestTime.get().timestamp()).plus(23, ChronoUnit.HOURS) + .isBefore(timestamp)) { + return true; + } totalBytesRead.addAndGet(dataSize + TIMESTAMP_BYTES + EVENT_STORAGE_OVERHEAD); InputLogEvent inputLogEvent = InputLogEvent.builder() .message(data) - .timestamp(timestamp).build(); + .timestamp(timestamp.toEpochMilli()).build(); attemptLogInformation.getLogEvents().add(inputLogEvent); return false; } diff --git a/src/main/java/com/aws/greengrass/logmanager/LogManagerService.java b/src/main/java/com/aws/greengrass/logmanager/LogManagerService.java index 57a6bc8f..f461ba25 100644 --- a/src/main/java/com/aws/greengrass/logmanager/LogManagerService.java +++ b/src/main/java/com/aws/greengrass/logmanager/LogManagerService.java @@ -539,7 +539,7 @@ private void processLogsAndUpload() throws InterruptedException { .build())); allFiles.forEach(file -> { long startPosition = 0; - // If the file was paritially read in the previous run, then get the starting position for + // If the file was partially read in the previous run, then get the starting position for // new log lines. if (componentCurrentProcessingLogFile.containsKey(componentName)) { CurrentProcessingFileInformation processingFileInformation = diff --git a/src/test/java/com/aws/greengrass/logmanager/CloudWatchAttemptLogsProcessorTest.java b/src/test/java/com/aws/greengrass/logmanager/CloudWatchAttemptLogsProcessorTest.java index 6ed8fd5a..cc82c5d0 100644 --- a/src/test/java/com/aws/greengrass/logmanager/CloudWatchAttemptLogsProcessorTest.java +++ b/src/test/java/com/aws/greengrass/logmanager/CloudWatchAttemptLogsProcessorTest.java @@ -44,6 +44,7 @@ import java.util.Date; import java.util.List; import java.util.Locale; +import java.util.TimeZone; import java.util.regex.Pattern; import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PARAM_AWS_REGION; @@ -51,15 +52,21 @@ import static com.aws.greengrass.logmanager.CloudWatchAttemptLogsProcessor.DEFAULT_LOG_GROUP_NAME; import static com.aws.greengrass.testcommons.testutilities.ExceptionLogProtector.ignoreExceptionOfType; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.lenient; +@SuppressWarnings("PMD.UnsynchronizedStaticFormatter") @ExtendWith({MockitoExtension.class, GGExtension.class}) class CloudWatchAttemptLogsProcessorTest extends GGServiceTestUtil { private static final SimpleDateFormat DATE_FORMATTER = new SimpleDateFormat("yyyy/MM/dd", Locale.ENGLISH); + static { + DATE_FORMATTER.setTimeZone(TimeZone.getTimeZone("UTC")); + } + @Mock private DeviceConfiguration mockDeviceConfiguration; @TempDir @@ -97,7 +104,7 @@ void GIVEN_one_system_component_one_file_less_than_max_WHEN_merge_THEN_reads_ent assertThat(attempt.getLogStreamsToLogEventsMap().entrySet(), IsNot.not(IsEmptyCollection.empty())); String logGroup = calculateLogGroupName(ComponentType.GreengrassSystemComponent, "testRegion", "TestComponent"); assertEquals(attempt.getLogGroupName(), logGroup); - String logStream = calculateLogStreamName("testThing"); + String logStream = "/2020/12/17/thing/testThing"; assertTrue(attempt.getLogStreamsToLogEventsMap().containsKey(logStream)); CloudWatchAttemptLogInformation logEventsForStream1 = attempt.getLogStreamsToLogEventsMap().get(logStream); assertNotNull(logEventsForStream1.getLogEvents()); @@ -139,7 +146,7 @@ void GIVEN_one_user_component_one_file_less_than_max_WHEN_merge_THEN_reads_entir assertThat(attempt.getLogStreamsToLogEventsMap().entrySet(), IsNot.not(IsEmptyCollection.empty())); String logGroup = calculateLogGroupName(ComponentType.UserComponent, "testRegion", "TestComponent"); assertEquals(attempt.getLogGroupName(), logGroup); - String logStream = calculateLogStreamName("testThing"); + String logStream = "/2020/12/17/thing/testThing"; assertTrue(attempt.getLogStreamsToLogEventsMap().containsKey(logStream)); CloudWatchAttemptLogInformation logEventsForStream1 = attempt.getLogStreamsToLogEventsMap().get(logStream); assertNotNull(logEventsForStream1.getLogEvents()); @@ -181,7 +188,7 @@ void GIVEN_one_component_WHEN_one_file_less_than_max_THEN_reads_entire_file( assertThat(attempt.getLogStreamsToLogEventsMap().entrySet(), IsNot.not(IsEmptyCollection.empty())); String logGroup = calculateLogGroupName(ComponentType.GreengrassSystemComponent, "testRegion", "TestComponent"); assertEquals(attempt.getLogGroupName(), logGroup); - String logStream = calculateLogStreamName("testThing"); + String logStream = "/2020/12/17/thing/testThing"; assertTrue(attempt.getLogStreamsToLogEventsMap().containsKey(logStream)); CloudWatchAttemptLogInformation logEventsForStream1 = attempt.getLogStreamsToLogEventsMap().get(logStream); assertNotNull(logEventsForStream1.getLogEvents()); @@ -262,6 +269,43 @@ void GIVEN_one_component_one_file_more_than_max_WHEN_merge_THEN_reads_partial_fi } } + @Test + void GIVEN_one_component_one_file_24h_gap_WHEN_merge_THEN_reads_partial_file(ExtensionContext context1) + throws IOException { + File file = new File(directoryPath.resolve("greengrass_test.log").toUri()); + assertTrue(file.createNewFile()); + assertTrue(file.setReadable(true)); + assertTrue(file.setWritable(true)); + try (OutputStream fileOutputStream = Files.newOutputStream(file.toPath())) { + fileOutputStream.write("2021-06-08T01:00:00Z ABC1\n".getBytes(StandardCharsets.UTF_8)); + fileOutputStream.write("2021-06-08T02:00:00Z ABC2\n".getBytes(StandardCharsets.UTF_8)); + fileOutputStream.write("2021-06-09T01:00:00Z ABC3\n".getBytes(StandardCharsets.UTF_8)); + fileOutputStream.write("2021-06-09T02:00:00Z ABC4\n".getBytes(StandardCharsets.UTF_8)); + } + + try { + List logFileInformationSet = new ArrayList<>(); + logFileInformationSet.add(LogFileInformation.builder().startPosition(0).file(file).build()); + ComponentLogFileInformation componentLogFileInformation = ComponentLogFileInformation.builder() + .name("TestComponent") + .multiLineStartPattern(Pattern.compile("^[^\\s]+(\\s+[^\\s]+)*$")) + .desiredLogLevel(Level.INFO) + .componentType(ComponentType.GreengrassSystemComponent) + .logFileInformationList(logFileInformationSet) + .build(); + + logsProcessor = new CloudWatchAttemptLogsProcessor(mockDeviceConfiguration); + CloudWatchAttempt attempt = logsProcessor.processLogFiles(componentLogFileInformation); + assertNotNull(attempt); + String logStream1 = "/2021/06/08/thing/testThing"; + String logStream2 = "/2021/06/09/thing/testThing"; + assertThat(attempt.getLogStreamsToLogEventsMap().get(logStream1).getLogEvents(), hasSize(2)); + assertThat(attempt.getLogStreamsToLogEventsMap().get(logStream2).getLogEvents(), hasSize(2)); + } finally { + assertTrue(file.delete()); + } + } + @Test void GIVEN_one_components_two_file_less_than_max_WHEN_merge_THEN_reads_and_merges_both_files(ExtensionContext ec) throws URISyntaxException { @@ -288,7 +332,7 @@ void GIVEN_one_components_two_file_less_than_max_WHEN_merge_THEN_reads_and_merge assertThat(attempt.getLogStreamsToLogEventsMap().entrySet(), IsNot.not(IsEmptyCollection.empty())); String logGroup = calculateLogGroupName(ComponentType.GreengrassSystemComponent, "testRegion", "TestComponent"); assertEquals(attempt.getLogGroupName(), logGroup); - String logStream = calculateLogStreamName("testThing"); + String logStream = "/2020/12/17/thing/testThing"; String logStream2 = "/2020/02/10/thing/testThing"; assertTrue(attempt.getLogStreamsToLogEventsMap().containsKey(logStream)); assertTrue(attempt.getLogStreamsToLogEventsMap().containsKey(logStream2)); diff --git a/src/test/java/com/aws/greengrass/logmanager/LogManagerServiceTest.java b/src/test/java/com/aws/greengrass/logmanager/LogManagerServiceTest.java index 41265ac9..9e72e0b0 100644 --- a/src/test/java/com/aws/greengrass/logmanager/LogManagerServiceTest.java +++ b/src/test/java/com/aws/greengrass/logmanager/LogManagerServiceTest.java @@ -212,6 +212,7 @@ public void cleanup() throws InterruptedException { logsUploaderService.componentCurrentProcessingLogFile.clear(); logsUploaderService.lastComponentUploadedLogFileInstantMap.clear(); logsUploaderService.shutdown(); + executor.shutdownNow(); } @Test