Skip to content

Commit

Permalink
fix: respect 24 hour time span limits for CW upload (#62)
Browse files Browse the repository at this point in the history
  • Loading branch information
MikeDombo authored Jun 9, 2021
1 parent 9982656 commit 1240996
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -70,6 +71,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@SuppressWarnings("PMD.UnsynchronizedStaticFormatter")
@ExtendWith({GGExtension.class, MockitoExtension.class})
class LogManagerTest extends BaseITCase {
private static final String THING_NAME = "ThingName";
Expand All @@ -81,6 +83,10 @@ class LogManagerTest extends BaseITCase {
private Path tempDirectoryPath;
private final static ObjectMapper YAML_OBJECT_MAPPER = new ObjectMapper(new YAMLFactory());

static {
DATE_FORMATTER.setTimeZone(TimeZone.getTimeZone("UTC"));
}

@Mock
private CloudWatchLogsClient cloudWatchLogsClient;
@Captor
Expand All @@ -103,7 +109,7 @@ void setupKernel(Path storeDirectory, String configFileName) throws InterruptedE
System.setProperty("root", tempRootDir.toAbsolutePath().toString());
CountDownLatch logManagerRunning = new CountDownLatch(1);

CompletableFuture cf = new CompletableFuture();
CompletableFuture<Void> cf = new CompletableFuture<>();
cf.complete(null);
kernel = new Kernel();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAccessor;
import java.util.Comparator;
import java.util.Date;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -55,10 +58,17 @@ public class CloudWatchAttemptLogsProcessor {
private static final int MAX_BATCH_SIZE = 1024 * 1024;
private static final ObjectMapper DESERIALIZER = new ObjectMapper()
.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);
private final SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy/MM/dd", Locale.ENGLISH);
private static final ThreadLocal<SimpleDateFormat> DATE_FORMATTER =
ThreadLocal.withInitial(() -> {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd", Locale.ENGLISH);
// Set timezone of the formatter to UTC since we always pass in UTC dates. We don't want it to think
// that it should use the local timezone.
sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
return sdf;
});
private final DeviceConfiguration deviceConfiguration;
private static final Logger logger = LogManager.getLogger(CloudWatchAttemptLogsProcessor.class);
private static Pattern textTimestampPattern = Pattern.compile("([\\w-:.+]+)");
private static final Pattern textTimestampPattern = Pattern.compile("([\\w-:.+]+)");

/**
* Constructor.
Expand Down Expand Up @@ -193,7 +203,7 @@ private boolean processLogLine(AtomicInteger totalBytesRead,
Optional<GreengrassLogMessage> logMessage = tryGetstructuredLogMessage(data);
if (logMessage.isPresent()) {
logStreamName = logStreamName.replace("{date}",
dateFormatter.format(new Date(logMessage.get().getTimestamp())));
DATE_FORMATTER.get().format(new Date(logMessage.get().getTimestamp())));
attemptLogInformation = logStreamsMap.computeIfAbsent(logStreamName,
key -> CloudWatchAttemptLogInformation.builder()
.componentName(componentName)
Expand All @@ -202,11 +212,6 @@ private boolean processLogLine(AtomicInteger totalBytesRead,
desiredLogLevel, logMessage.get());

} else {
logStreamName = logStreamName.replace("{date}", dateFormatter.format(new Date()));
attemptLogInformation = logStreamsMap.computeIfAbsent(logStreamName,
key -> CloudWatchAttemptLogInformation.builder()
.componentName(componentName)
.build());
Matcher matcher = textTimestampPattern.matcher(data);
Instant logTimestamp = Instant.now();
if (matcher.find()) {
Expand All @@ -221,8 +226,12 @@ private boolean processLogLine(AtomicInteger totalBytesRead,
}
}
}
reachedMaxSize = addNewLogEvent(totalBytesRead, attemptLogInformation, data.toString(),
logTimestamp.toEpochMilli());
logStreamName = logStreamName.replace("{date}", DATE_FORMATTER.get().format(Date.from(logTimestamp)));
attemptLogInformation = logStreamsMap.computeIfAbsent(logStreamName,
key -> CloudWatchAttemptLogInformation.builder()
.componentName(componentName)
.build());
reachedMaxSize = addNewLogEvent(totalBytesRead, attemptLogInformation, data.toString(), logTimestamp);
}
if (!reachedMaxSize) {
updateCloudWatchAttemptLogInformation(fileName, startPosition, currentPosition, attemptLogInformation,
Expand Down Expand Up @@ -290,7 +299,8 @@ private boolean checkAndAddNewLogEvent(AtomicInteger totalBytesRead,
if (currentLogLevel.toInt() < desiredLogLevel.toInt()) {
return false;
}
return addNewLogEvent(totalBytesRead, attemptLogInformation, data.toString(), logMessage.getTimestamp());
return addNewLogEvent(totalBytesRead, attemptLogInformation, data.toString(),
Instant.ofEpochMilli(logMessage.getTimestamp()));
}

/**
Expand All @@ -305,18 +315,28 @@ private boolean checkAndAddNewLogEvent(AtomicInteger totalBytesRead,
* the log line data size to get the exact size of the input log events.
*/
private boolean addNewLogEvent(AtomicInteger totalBytesRead, CloudWatchAttemptLogInformation attemptLogInformation,
String data, long timestamp) {
String data, Instant timestamp) {
int dataSize = data.getBytes(StandardCharsets.UTF_8).length;
// Total bytes equal the number of bytes of the data plus 8 bytes for the timestamp since it's a long
// and there is an overhead for each log event on the cloud watch side which needs to be added.
if (totalBytesRead.get() + dataSize + TIMESTAMP_BYTES + EVENT_STORAGE_OVERHEAD > MAX_BATCH_SIZE) {
return true;
}

// If the earliest time + 24 hours is before the new event's time, then we cannot insert it, because the
// gap from the earliest to the latest event would exceed the 24-hour span limit. Otherwise, we can add it.
// Using 23 hours instead of 24 to leave a bit of leeway.
Optional<InputLogEvent> earliestTime =
attemptLogInformation.getLogEvents().stream().min(Comparator.comparingLong(InputLogEvent::timestamp));
if (earliestTime.isPresent() && Instant.ofEpochMilli(earliestTime.get().timestamp()).plus(23, ChronoUnit.HOURS)
.isBefore(timestamp)) {
return true;
}
totalBytesRead.addAndGet(dataSize + TIMESTAMP_BYTES + EVENT_STORAGE_OVERHEAD);

InputLogEvent inputLogEvent = InputLogEvent.builder()
.message(data)
.timestamp(timestamp).build();
.timestamp(timestamp.toEpochMilli()).build();
attemptLogInformation.getLogEvents().add(inputLogEvent);
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ private void processLogsAndUpload() throws InterruptedException {
.build()));
allFiles.forEach(file -> {
long startPosition = 0;
// If the file was paritially read in the previous run, then get the starting position for
// If the file was partially read in the previous run, then get the starting position for
// new log lines.
if (componentCurrentProcessingLogFile.containsKey(componentName)) {
CurrentProcessingFileInformation processingFileInformation =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,29 @@
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.TimeZone;
import java.util.regex.Pattern;

import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PARAM_AWS_REGION;
import static com.aws.greengrass.deployment.DeviceConfiguration.DEVICE_PARAM_THING_NAME;
import static com.aws.greengrass.logmanager.CloudWatchAttemptLogsProcessor.DEFAULT_LOG_GROUP_NAME;
import static com.aws.greengrass.testcommons.testutilities.ExceptionLogProtector.ignoreExceptionOfType;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.lenient;


@SuppressWarnings("PMD.UnsynchronizedStaticFormatter")
@ExtendWith({MockitoExtension.class, GGExtension.class})
class CloudWatchAttemptLogsProcessorTest extends GGServiceTestUtil {
private static final SimpleDateFormat DATE_FORMATTER = new SimpleDateFormat("yyyy/MM/dd", Locale.ENGLISH);
static {
DATE_FORMATTER.setTimeZone(TimeZone.getTimeZone("UTC"));
}

@Mock
private DeviceConfiguration mockDeviceConfiguration;
@TempDir
Expand Down Expand Up @@ -97,7 +104,7 @@ void GIVEN_one_system_component_one_file_less_than_max_WHEN_merge_THEN_reads_ent
assertThat(attempt.getLogStreamsToLogEventsMap().entrySet(), IsNot.not(IsEmptyCollection.empty()));
String logGroup = calculateLogGroupName(ComponentType.GreengrassSystemComponent, "testRegion", "TestComponent");
assertEquals(attempt.getLogGroupName(), logGroup);
String logStream = calculateLogStreamName("testThing");
String logStream = "/2020/12/17/thing/testThing";
assertTrue(attempt.getLogStreamsToLogEventsMap().containsKey(logStream));
CloudWatchAttemptLogInformation logEventsForStream1 = attempt.getLogStreamsToLogEventsMap().get(logStream);
assertNotNull(logEventsForStream1.getLogEvents());
Expand Down Expand Up @@ -139,7 +146,7 @@ void GIVEN_one_user_component_one_file_less_than_max_WHEN_merge_THEN_reads_entir
assertThat(attempt.getLogStreamsToLogEventsMap().entrySet(), IsNot.not(IsEmptyCollection.empty()));
String logGroup = calculateLogGroupName(ComponentType.UserComponent, "testRegion", "TestComponent");
assertEquals(attempt.getLogGroupName(), logGroup);
String logStream = calculateLogStreamName("testThing");
String logStream = "/2020/12/17/thing/testThing";
assertTrue(attempt.getLogStreamsToLogEventsMap().containsKey(logStream));
CloudWatchAttemptLogInformation logEventsForStream1 = attempt.getLogStreamsToLogEventsMap().get(logStream);
assertNotNull(logEventsForStream1.getLogEvents());
Expand Down Expand Up @@ -181,7 +188,7 @@ void GIVEN_one_component_WHEN_one_file_less_than_max_THEN_reads_entire_file(
assertThat(attempt.getLogStreamsToLogEventsMap().entrySet(), IsNot.not(IsEmptyCollection.empty()));
String logGroup = calculateLogGroupName(ComponentType.GreengrassSystemComponent, "testRegion", "TestComponent");
assertEquals(attempt.getLogGroupName(), logGroup);
String logStream = calculateLogStreamName("testThing");
String logStream = "/2020/12/17/thing/testThing";
assertTrue(attempt.getLogStreamsToLogEventsMap().containsKey(logStream));
CloudWatchAttemptLogInformation logEventsForStream1 = attempt.getLogStreamsToLogEventsMap().get(logStream);
assertNotNull(logEventsForStream1.getLogEvents());
Expand Down Expand Up @@ -262,6 +269,43 @@ void GIVEN_one_component_one_file_more_than_max_WHEN_merge_THEN_reads_partial_fi
}
}

@Test
void GIVEN_one_component_one_file_24h_gap_WHEN_merge_THEN_reads_partial_file(ExtensionContext context1)
throws IOException {
File file = new File(directoryPath.resolve("greengrass_test.log").toUri());
assertTrue(file.createNewFile());
assertTrue(file.setReadable(true));
assertTrue(file.setWritable(true));
try (OutputStream fileOutputStream = Files.newOutputStream(file.toPath())) {
fileOutputStream.write("2021-06-08T01:00:00Z ABC1\n".getBytes(StandardCharsets.UTF_8));
fileOutputStream.write("2021-06-08T02:00:00Z ABC2\n".getBytes(StandardCharsets.UTF_8));
fileOutputStream.write("2021-06-09T01:00:00Z ABC3\n".getBytes(StandardCharsets.UTF_8));
fileOutputStream.write("2021-06-09T02:00:00Z ABC4\n".getBytes(StandardCharsets.UTF_8));
}

try {
List<LogFileInformation> logFileInformationSet = new ArrayList<>();
logFileInformationSet.add(LogFileInformation.builder().startPosition(0).file(file).build());
ComponentLogFileInformation componentLogFileInformation = ComponentLogFileInformation.builder()
.name("TestComponent")
.multiLineStartPattern(Pattern.compile("^[^\\s]+(\\s+[^\\s]+)*$"))
.desiredLogLevel(Level.INFO)
.componentType(ComponentType.GreengrassSystemComponent)
.logFileInformationList(logFileInformationSet)
.build();

logsProcessor = new CloudWatchAttemptLogsProcessor(mockDeviceConfiguration);
CloudWatchAttempt attempt = logsProcessor.processLogFiles(componentLogFileInformation);
assertNotNull(attempt);
String logStream1 = "/2021/06/08/thing/testThing";
String logStream2 = "/2021/06/09/thing/testThing";
assertThat(attempt.getLogStreamsToLogEventsMap().get(logStream1).getLogEvents(), hasSize(2));
assertThat(attempt.getLogStreamsToLogEventsMap().get(logStream2).getLogEvents(), hasSize(2));
} finally {
assertTrue(file.delete());
}
}

@Test
void GIVEN_one_components_two_file_less_than_max_WHEN_merge_THEN_reads_and_merges_both_files(ExtensionContext ec)
throws URISyntaxException {
Expand All @@ -288,7 +332,7 @@ void GIVEN_one_components_two_file_less_than_max_WHEN_merge_THEN_reads_and_merge
assertThat(attempt.getLogStreamsToLogEventsMap().entrySet(), IsNot.not(IsEmptyCollection.empty()));
String logGroup = calculateLogGroupName(ComponentType.GreengrassSystemComponent, "testRegion", "TestComponent");
assertEquals(attempt.getLogGroupName(), logGroup);
String logStream = calculateLogStreamName("testThing");
String logStream = "/2020/12/17/thing/testThing";
String logStream2 = "/2020/02/10/thing/testThing";
assertTrue(attempt.getLogStreamsToLogEventsMap().containsKey(logStream));
assertTrue(attempt.getLogStreamsToLogEventsMap().containsKey(logStream2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ public void cleanup() throws InterruptedException {
logsUploaderService.componentCurrentProcessingLogFile.clear();
logsUploaderService.lastComponentUploadedLogFileInstantMap.clear();
logsUploaderService.shutdown();
executor.shutdownNow();
}

@Test
Expand Down

0 comments on commit 1240996

Please sign in to comment.