diff --git a/src/integrationtests/java/com/aws/greengrass/integrationtests/logmanager/LogManagerConfigTest.java b/src/integrationtests/java/com/aws/greengrass/integrationtests/logmanager/LogManagerConfigTest.java index 2a380f80..87adaf4d 100644 --- a/src/integrationtests/java/com/aws/greengrass/integrationtests/logmanager/LogManagerConfigTest.java +++ b/src/integrationtests/java/com/aws/greengrass/integrationtests/logmanager/LogManagerConfigTest.java @@ -135,7 +135,7 @@ void GIVEN_periodicUploadIntervalSec_config_WHEN_value_is_reset_and_replaced_THE Exception { setupKernel(); - assertThat(() -> logManagerService.getPeriodicUpdateIntervalSec(), eventuallyEval(is(60), + assertThat(() -> logManagerService.getPeriodicUpdateIntervalSec(), eventuallyEval(is(60d), Duration.ofSeconds(30))); logManagerService.getConfig().find(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC).remove(); @@ -143,7 +143,11 @@ void GIVEN_periodicUploadIntervalSec_config_WHEN_value_is_reset_and_replaced_THE eventuallyEval(is(LogManagerService.DEFAULT_PERIODIC_UPDATE_INTERVAL_SEC), Duration.ofSeconds(30))); logManagerService.getConfig().lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC).withValue(600); - assertThat(() -> logManagerService.getPeriodicUpdateIntervalSec(), eventuallyEval(is(600), + assertThat(() -> logManagerService.getPeriodicUpdateIntervalSec(), eventuallyEval(is(600d), + Duration.ofSeconds(30))); + + logManagerService.getConfig().lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC).withValue(0.0001d); + assertThat(() -> logManagerService.getPeriodicUpdateIntervalSec(), eventuallyEval(is(0.0001d), Duration.ofSeconds(30))); } @@ -489,4 +493,4 @@ void GIVEN_runtimeConfiguration_withOldProcessingFileFormat_WHEN_onStartUp_THEN_ }}; assertEquals(expected, processingFiles.toMap()); } -} \ No newline at end of file +} diff --git a/src/main/java/com/aws/greengrass/logmanager/LogManagerService.java b/src/main/java/com/aws/greengrass/logmanager/LogManagerService.java index 6ae78add..1eff44b2 100644 --- a/src/main/java/com/aws/greengrass/logmanager/LogManagerService.java +++ b/src/main/java/com/aws/greengrass/logmanager/LogManagerService.java @@ -103,7 +103,7 @@ public class LogManagerService extends PluginService { public static final String DELETE_LOG_FILES_AFTER_UPLOAD_CONFIG_TOPIC_NAME = "deleteLogFileAfterCloudUpload"; public static final String UPLOAD_TO_CW_CONFIG_TOPIC_NAME = "uploadToCloudWatch"; public static final String MULTILINE_PATTERN_CONFIG_TOPIC_NAME = "multiLineStartPattern"; - public static final int DEFAULT_PERIODIC_UPDATE_INTERVAL_SEC = 300; + public static final double DEFAULT_PERIODIC_UPDATE_INTERVAL_SEC = 300; public static final int MAX_CACHE_INACTIVE_TIME_SECONDS = 60 * 60 * 24; // 1 day private final List> serviceStatusListeners = new ArrayList<>(); @@ -124,7 +124,7 @@ public class LogManagerService extends PluginService { private final CloudWatchAttemptLogsProcessor logsProcessor; private final AtomicBoolean isCurrentlyUploading = new AtomicBoolean(false); @Getter - private int periodicUpdateIntervalSec; + private double periodicUpdateIntervalSec; private final Path workDir; /** @@ -156,16 +156,15 @@ public class LogManagerService extends PluginService { } private void handlePeriodicUploadIntervalSecConfig(Topics topics) { - int periodicUploadIntervalSecInput = Coerce.toInt(topics.lookup(CONFIGURATION_CONFIG_KEY, - LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC) - .dflt(DEFAULT_PERIODIC_UPDATE_INTERVAL_SEC) - .toPOJO()); + double periodicUploadIntervalSecInput = + Coerce.toDouble(topics.findOrDefault(Double.toString(DEFAULT_PERIODIC_UPDATE_INTERVAL_SEC), + CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC)); if (periodicUploadIntervalSecInput > 0) { periodicUpdateIntervalSec = periodicUploadIntervalSecInput; } else { - logger.atWarn().log("Invalid config value, {}, for periodicUploadIntervalSec. Must be an " - + "integer greater than 0. Using default value of 300 (5 minutes)", + logger.atWarn().log("Invalid config value, {}, for periodicUploadIntervalSec. Must be a " + + "number greater than 0. Using default value of 300 (5 minutes)", periodicUploadIntervalSecInput); periodicUpdateIntervalSec = DEFAULT_PERIODIC_UPDATE_INTERVAL_SEC; } @@ -674,7 +673,7 @@ private void processLogsAndUpload() throws InterruptedException { //TODO: this is only done for passing the current text. But in practise, we don`t need to intentionally // sleep here. if (!isCurrentlyUploading.compareAndSet(false, true)) { - TimeUnit.SECONDS.sleep(periodicUpdateIntervalSec); + sleepFractionalSeconds(periodicUpdateIntervalSec); continue; } List componentMetadata = new ArrayList<>(); @@ -762,10 +761,14 @@ private void processLogsAndUpload() throws InterruptedException { emitEventStatus(EventType.ALL_COMPONENTS_PROCESSED); // after handle one cycle, we sleep for interval to avoid seamless scanning and processing next cycle. // TODO, do not use lazy sleep. Use scheduler to unblock the thread. - TimeUnit.SECONDS.sleep(periodicUpdateIntervalSec); + sleepFractionalSeconds(periodicUpdateIntervalSec); } } + private void sleepFractionalSeconds(double timeToSleep) throws InterruptedException { + TimeUnit.MICROSECONDS.sleep(Math.round(timeToSleep * TimeUnit.SECONDS.toMicros(1))); + } + public void registerEventStatusListener(Consumer callback) { serviceStatusListeners.add(callback); } diff --git a/src/test/java/com/aws/greengrass/logmanager/LogManagerServiceTest.java b/src/test/java/com/aws/greengrass/logmanager/LogManagerServiceTest.java index f107b08e..3a2c49df 100644 --- a/src/test/java/com/aws/greengrass/logmanager/LogManagerServiceTest.java +++ b/src/test/java/com/aws/greengrass/logmanager/LogManagerServiceTest.java @@ -91,6 +91,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.lenient; @@ -280,9 +281,9 @@ public void cleanup() throws InterruptedException { void GIVEN_system_log_files_to_be_uploaded_WHEN_merger_merges_THEN_we_get_all_log_files() throws InterruptedException, IOException { mockDefaultPersistedState(); - Topic periodicUpdateIntervalMsTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "1"); - when(config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC)) - .thenReturn(periodicUpdateIntervalMsTopic); + Topic periodicUpdateIntervalSecTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "1"); + when(config.findOrDefault(any(), eq(CONFIGURATION_CONFIG_KEY), eq(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC))) + .thenReturn(periodicUpdateIntervalSecTopic); when(mockMerger.processLogFiles(componentLogsInformationCaptor.capture())).thenReturn(new CloudWatchAttempt()); Topics configTopics = Topics.of(context, CONFIGURATION_CONFIG_KEY, null); @@ -334,9 +335,9 @@ void GIVEN_system_log_files_to_be_uploaded_WHEN_merger_merges_THEN_we_get_all_lo void GIVEN_user_component_log_files_to_be_uploaded_with_required_config_as_array_WHEN_merger_merges_THEN_we_get_all_log_files() throws InterruptedException, UnsupportedInputTypeException, IOException { mockDefaultPersistedState(); - Topic periodicUpdateIntervalMsTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "1"); - when(config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC)) - .thenReturn(periodicUpdateIntervalMsTopic); + Topic periodicUpdateIntervalSecTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "1"); + when(config.findOrDefault(any(), eq(CONFIGURATION_CONFIG_KEY), eq(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC))) + .thenReturn(periodicUpdateIntervalSecTopic); when(mockMerger.processLogFiles(componentLogsInformationCaptor.capture())).thenReturn(new CloudWatchAttempt()); Topics configTopics = Topics.of(context, CONFIGURATION_CONFIG_KEY, null); @@ -385,11 +386,11 @@ void GIVEN_user_component_log_files_to_be_uploaded_with_required_config_as_array @Test void GIVEN_user_component_log_files_to_be_uploaded_with_required_config_WHEN_merger_merges_THEN_we_get_all_log_files() - throws InterruptedException, UnsupportedInputTypeException, IOException { + throws InterruptedException, IOException { mockDefaultPersistedState(); - Topic periodicUpdateIntervalMsTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "1"); - when(config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC)) - .thenReturn(periodicUpdateIntervalMsTopic); + Topic periodicUpdateIntervalSecTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "1"); + when(config.findOrDefault(any(), eq(CONFIGURATION_CONFIG_KEY), eq(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC))) + .thenReturn(periodicUpdateIntervalSecTopic); when(mockMerger.processLogFiles(componentLogsInformationCaptor.capture())).thenReturn(new CloudWatchAttempt()); Topics configTopics = Topics.of(context, CONFIGURATION_CONFIG_KEY, null); @@ -437,11 +438,11 @@ void GIVEN_user_component_log_files_to_be_uploaded_with_required_config_WHEN_mer @Test void GIVEN_user_component_log_files_to_be_uploaded_with_all_config_WHEN_merger_merges_THEN_we_get_all_log_files() - throws InterruptedException, UnsupportedInputTypeException, IOException { + throws InterruptedException, IOException { mockDefaultPersistedState(); - Topic periodicUpdateIntervalMsTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "1"); - when(config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC)) - .thenReturn(periodicUpdateIntervalMsTopic); + Topic periodicUpdateIntervalSecTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "1"); + when(config.findOrDefault(any(), eq(CONFIGURATION_CONFIG_KEY), eq(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC))) + .thenReturn(periodicUpdateIntervalSecTopic); when(mockMerger.processLogFiles(componentLogsInformationCaptor.capture())).thenReturn(new CloudWatchAttempt()); Topics configTopics = Topics.of(context, CONFIGURATION_CONFIG_KEY, null); @@ -490,11 +491,11 @@ void GIVEN_user_component_log_files_to_be_uploaded_with_all_config_WHEN_merger_m @Test void GIVEN_multiple_user_components_log_files_to_be_uploaded_with_all_config_WHEN_merger_merges_THEN_we_get_all_log_files() - throws InterruptedException, UnsupportedInputTypeException, IOException { + throws InterruptedException, IOException { mockDefaultPersistedState(); - Topic periodicUpdateIntervalMsTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "1"); - when(config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC)) - .thenReturn(periodicUpdateIntervalMsTopic); + Topic periodicUpdateIntervalSecTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "1"); + when(config.findOrDefault(any(), eq(CONFIGURATION_CONFIG_KEY), eq(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC))) + .thenReturn(periodicUpdateIntervalSecTopic); when(mockMerger.processLogFiles(componentLogsInformationCaptor.capture())).thenReturn(new CloudWatchAttempt()); Topics configTopics = Topics.of(context, CONFIGURATION_CONFIG_KEY, null); @@ -564,9 +565,9 @@ void GIVEN_null_config_WHEN_config_is_processed_THEN_no_component_config_is_adde ignoreExceptionOfType(context1, MismatchedInputException.class); Topics configTopics = Topics.of(context, CONFIGURATION_CONFIG_KEY, null); when(config.lookupTopics(CONFIGURATION_CONFIG_KEY)).thenReturn(configTopics); - Topic periodicUpdateIntervalMsTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "3"); - when(config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC)) - .thenReturn(periodicUpdateIntervalMsTopic); + Topic periodicUpdateIntervalSecTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "3"); + when(config.findOrDefault(any(), eq(CONFIGURATION_CONFIG_KEY), eq(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC))) + .thenReturn(periodicUpdateIntervalSecTopic); Topics logsUploaderConfigTopic = Topics.of(context, LOGS_UPLOADER_CONFIGURATION_TOPIC, null); when(config.lookupTopics(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_CONFIGURATION_TOPIC)) .thenReturn(logsUploaderConfigTopic); @@ -583,9 +584,9 @@ void GIVEN_cloud_watch_attempt_handler_WHEN_attempt_completes_THEN_successfully_ mockDefaultPersistedState(); LogFile processingFile = createLogFileWithSize(directoryPath.resolve("testlogs1.log").toUri(), 1061); LogFile lastProcessedFile = createLogFileWithSize(directoryPath.resolve("testlogs2.log").toUri(), 2943); - Topic periodicUpdateIntervalMsTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "1000"); - when(config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC)) - .thenReturn(periodicUpdateIntervalMsTopic); + Topic periodicUpdateIntervalSecTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "1000"); + when(config.findOrDefault(any(), eq(CONFIGURATION_CONFIG_KEY), eq(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC))) + .thenReturn(periodicUpdateIntervalSecTopic); Topics configTopics = Topics.of(context, CONFIGURATION_CONFIG_KEY, null); when(config.lookupTopics(CONFIGURATION_CONFIG_KEY)).thenReturn(configTopics); Topics logsUploaderConfigTopics = Topics.of(context, LOGS_UPLOADER_CONFIGURATION_TOPIC, null); @@ -743,9 +744,9 @@ void GIVEN_cloud_watch_attempt_handler_WHEN_attempt_completes_THEN_successfully_ void GIVEN_some_system_files_uploaded_and_another_partially_uploaded_WHEN_merger_merges_THEN_sets_the_start_position_correctly() throws InterruptedException, IOException { mockDefaultPersistedState(); - Topic periodicUpdateIntervalMsTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "3"); - when(config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC)) - .thenReturn(periodicUpdateIntervalMsTopic); + Topic periodicUpdateIntervalSecTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "3"); + when(config.findOrDefault(any(), eq(CONFIGURATION_CONFIG_KEY), eq(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC))) + .thenReturn(periodicUpdateIntervalSecTopic); when(mockMerger.processLogFiles(componentLogsInformationCaptor.capture())).thenReturn(new CloudWatchAttempt()); Topics configTopics = Topics.of(context, CONFIGURATION_CONFIG_KEY, null); @@ -800,9 +801,9 @@ void GIVEN_some_system_files_uploaded_and_another_partially_uploaded_WHEN_merger void GIVEN_a_partially_uploaded_file_but_rotated_WHEN_merger_merges_THEN_sets_the_start_position_correctly() throws InterruptedException, IOException { mockDefaultPersistedState(); - Topic periodicUpdateIntervalMsTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "3"); - when(config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC)) - .thenReturn(periodicUpdateIntervalMsTopic); + Topic periodicUpdateIntervalSecTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "3"); + when(config.findOrDefault(any(), eq(CONFIGURATION_CONFIG_KEY), eq(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC))) + .thenReturn(periodicUpdateIntervalSecTopic); when(mockMerger.processLogFiles(componentLogsInformationCaptor.capture())).thenReturn(new CloudWatchAttempt()); Topics configTopics = Topics.of(context, CONFIGURATION_CONFIG_KEY, null); @@ -851,9 +852,9 @@ void GIVEN_a_partially_uploaded_file_but_rotated_WHEN_merger_merges_THEN_sets_th @Test void GIVEN_persisted_data_WHEN_log_uploader_initialises_THEN_correctly_sets_the_persisted_data() throws IOException { - Topic periodicUpdateIntervalMsTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "3"); - when(config.lookup(CONFIGURATION_CONFIG_KEY, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC)) - .thenReturn(periodicUpdateIntervalMsTopic); + Topic periodicUpdateIntervalSecTopic = Topic.of(context, LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC, "3"); + when(config.findOrDefault(any(), eq(CONFIGURATION_CONFIG_KEY), eq(LOGS_UPLOADER_PERIODIC_UPDATE_INTERVAL_SEC))) + .thenReturn(periodicUpdateIntervalSecTopic); Topics configTopics = Topics.of(context, CONFIGURATION_CONFIG_KEY, null); when(config.lookupTopics(CONFIGURATION_CONFIG_KEY)).thenReturn(configTopics); Topics logsUploaderConfigTopics = Topics.of(context, LOGS_UPLOADER_CONFIGURATION_TOPIC, null);