diff --git a/src/integrationtests/java/com/aws/greengrass/integrationtests/ipc/IPCServicesTest.java b/src/integrationtests/java/com/aws/greengrass/integrationtests/ipc/IPCServicesTest.java index bf45e6858e..f699c0d105 100644 --- a/src/integrationtests/java/com/aws/greengrass/integrationtests/ipc/IPCServicesTest.java +++ b/src/integrationtests/java/com/aws/greengrass/integrationtests/ipc/IPCServicesTest.java @@ -390,70 +390,69 @@ public void onStreamClosed() { @SuppressWarnings({"PMD.CloseResource", "PMD.AvoidCatchingGenericException"}) @Test - void GIVEN_ConfigStoreEventStreamClient_WHEN_adding_new_leaf_node_to_existing_container_node_THEN_config_is_updated3() - throws Exception { + void GIVEN_ConfigStoreEventStreamClient_WHEN_adding_new_leaf_node_to_existing_container_node_THEN_config_is_updated3() throws Exception { LogConfig.getRootLogConfig().setLevel(Level.DEBUG); Topics configuration = kernel.findServiceTopic("ServiceName").createInteriorChild(CONFIGURATION_CONFIG_KEY); - configuration.createInteriorChild("SomeContainerKeyToUpdate").createLeafChild("SomeContainerValue") - .withValue("InitialValue"); + configuration.createInteriorChild("SomeContainerKeyToUpdate").createLeafChild("SomeContainerValue").withValue("InitialValue"); Topics configToUpdate = configuration.lookupTopics("SomeContainerKeyToUpdate"); CountDownLatch cdl = new CountDownLatch(1); CountDownLatch subscriptionLatch = new CountDownLatch(1); - try (AutoCloseable c = TestUtils.createCloseableLogListener(m -> { + Slf4jLogAdapter.addGlobalListener(m -> { if (m.getMessage().contains("subscribed to configuration update")) { subscriptionLatch.countDown(); } - })) { - SubscribeToConfigurationUpdateRequest subscribe = new SubscribeToConfigurationUpdateRequest(); - subscribe.setComponentName("ServiceName"); - subscribe.setKeyPath(Collections.singletonList("SomeContainerKeyToUpdate")); - CompletableFuture fut = - greengrassCoreIPCClient.subscribeToConfigurationUpdate(subscribe, - Optional.of(new StreamResponseHandler() { - @Override - public void onStreamEvent(ConfigurationUpdateEvents event) { - assertNotNull(event.getConfigurationUpdateEvent()); - assertEquals("ServiceName", - event.getConfigurationUpdateEvent().getComponentName()); - assertNotNull(event.getConfigurationUpdateEvent().getKeyPath()); - cdl.countDown(); - } - - @Override - public boolean onStreamError(Throwable error) { - logger.atError().log("Received stream error.", error); - return false; - } - - @Override - public void onStreamClosed() { - - } - })).getResponse(); - fut.get(3, TimeUnit.SECONDS); + }); + SubscribeToConfigurationUpdateRequest subscribe = new SubscribeToConfigurationUpdateRequest(); + subscribe.setComponentName("ServiceName"); + subscribe.setKeyPath(Collections.singletonList("SomeContainerKeyToUpdate")); + CompletableFuture fut = + greengrassCoreIPCClient.subscribeToConfigurationUpdate(subscribe, Optional.of(new StreamResponseHandler() { + @Override + public void onStreamEvent(ConfigurationUpdateEvents event) { + assertNotNull(event.getConfigurationUpdateEvent()); + assertEquals("ServiceName", event.getConfigurationUpdateEvent().getComponentName()); + assertNotNull(event.getConfigurationUpdateEvent().getKeyPath()); + cdl.countDown(); + } - assertTrue(subscriptionLatch.await(20, TimeUnit.SECONDS)); + @Override + public boolean onStreamError(Throwable error) { + logger.atError().log("Received stream error.", error); + return false; + } + + @Override + public void onStreamClosed() { - // count down 1 is during the call to subscribe - CountDownLatch configUpdated = new CountDownLatch(2); - configToUpdate.subscribe((what, node) -> configUpdated.countDown()); - kernel.getContext().waitForPublishQueueToClear(); - - Map map2 = new HashMap<>(); - map2.put("SomeNewChild", "NewValue"); - List l = new ArrayList<>(); - l.add("SomeContainerKeyToUpdate"); - Instant now = Instant.now(); - UpdateConfigurationRequest updateConfigurationRequest = new UpdateConfigurationRequest(); - updateConfigurationRequest.setKeyPath(l); - updateConfigurationRequest.setValueToMerge(map2); - updateConfigurationRequest.setTimestamp(now); - greengrassCoreIPCClient.updateConfiguration(updateConfigurationRequest, Optional.empty()).getResponse().get(50, TimeUnit.SECONDS); - assertTrue(configUpdated.await(TIMEOUT_FOR_CONFIG_STORE_SECONDS, TimeUnit.SECONDS)); - assertTrue(cdl.await(TIMEOUT_FOR_CONFIG_STORE_SECONDS, TimeUnit.SECONDS)); - Topic topic = (Topic) configToUpdate.getChild("SomeNewChild"); - assertEquals("NewValue", topic.getOnce()); + } + })).getResponse(); + try { + fut.get(3, TimeUnit.SECONDS); + } catch (Exception e) { + logger.atError().setCause(e).log("Error when subscribing to component updates"); + fail("Caught exception when subscribing to component updates"); } + + assertTrue(subscriptionLatch.await(20, TimeUnit.SECONDS)); + + CountDownLatch configUpdated = new CountDownLatch(1); + configToUpdate.subscribe((what, node) -> configUpdated.countDown()); + + Map map2 = new HashMap<>(); + map2.put("SomeNewChild", "NewValue"); + List l = new ArrayList<>(); + l.add("SomeContainerKeyToUpdate"); + Instant now = Instant.now(); + UpdateConfigurationRequest updateConfigurationRequest = new UpdateConfigurationRequest(); + updateConfigurationRequest.setKeyPath(l); + updateConfigurationRequest.setValueToMerge(map2); + updateConfigurationRequest.setTimestamp(now); + greengrassCoreIPCClient.updateConfiguration(updateConfigurationRequest, Optional.empty()).getResponse().get(50, TimeUnit.SECONDS); + assertTrue(configUpdated.await(TIMEOUT_FOR_CONFIG_STORE_SECONDS, TimeUnit.SECONDS)); + assertTrue(cdl.await(TIMEOUT_FOR_CONFIG_STORE_SECONDS, TimeUnit.SECONDS)); + Topic topic = (Topic) configToUpdate.getChild("SomeNewChild"); + assertEquals("NewValue", topic.getOnce()); + } diff --git a/src/main/java/com/aws/greengrass/authorization/AuthorizationHandler.java b/src/main/java/com/aws/greengrass/authorization/AuthorizationHandler.java index 42ccc405af..00c3b0b1be 100644 --- a/src/main/java/com/aws/greengrass/authorization/AuthorizationHandler.java +++ b/src/main/java/com/aws/greengrass/authorization/AuthorizationHandler.java @@ -10,7 +10,6 @@ import com.aws.greengrass.lifecyclemanager.Kernel; import com.aws.greengrass.logging.api.Logger; import com.aws.greengrass.logging.impl.LogManager; -import com.aws.greengrass.util.BatchedSubscriber; import com.aws.greengrass.util.LockScope; import com.aws.greengrass.util.Utils; import lombok.NonNull; @@ -87,8 +86,7 @@ public AuthorizationHandler(Kernel kernel, AuthorizationModule authModule, this.kernel = kernel; this.authModule = authModule; // Adding TES component and operation before it's default policies are fetched - componentToOperationsMap.put(TOKEN_EXCHANGE_SERVICE_TOPICS, new HashSet<>( - Collections.singletonList(AUTHZ_TES_OPERATION))); + componentToOperationsMap.put(TOKEN_EXCHANGE_SERVICE_TOPICS, new HashSet<>(Arrays.asList(AUTHZ_TES_OPERATION))); componentToOperationsMap.put(PUB_SUB_SERVICE_NAME, new HashSet<>(Arrays.asList(PUBLISH_TO_TOPIC, SUBSCRIBE_TO_TOPIC, ANY_REGEX))); componentToOperationsMap.put(MQTT_PROXY_SERVICE_NAME, new HashSet<>(Arrays.asList(PUBLISH_TO_IOT_CORE, @@ -110,58 +108,57 @@ public AuthorizationHandler(Kernel kernel, AuthorizationModule authModule, } // Subscribe to future auth config updates - new BatchedSubscriber(this.kernel.getConfig().lookupTopics(SERVICES_NAMESPACE_TOPIC), (why, newv) -> { - if (WhatHappened.interiorAdded.equals(why) || WhatHappened.timestampUpdated.equals(why)) { - return true; - } - if (newv == null) { - return false; - } + this.kernel.getConfig().lookupTopics(SERVICES_NAMESPACE_TOPIC).subscribe( + (why, newv) -> { + if (newv == null) { + return; + } - //If there is a childChanged event, it has to be the 'accessControl' Topic that has bubbled up - //If there is a childRemoved event, it could be the component is removed, or either the - //'accessControl' Topic or/the 'parameters' Topics that has bubbled up, so we need to handle and - //filter out all other WhatHappeneds - if (WhatHappened.childRemoved.equals(why) || WhatHappened.removed.equals(why)) { - // Either a service or a parameter block or acl subkey - if (!newv.parent.getName().equals(SERVICES_NAMESPACE_TOPIC) && !newv.getName() - .equals(CONFIGURATION_CONFIG_KEY) && !newv.getName().equals(ACCESS_CONTROL_NAMESPACE_TOPIC) - && !newv.childOf(ACCESS_CONTROL_NAMESPACE_TOPIC)) { - return true; - } - } else if (!newv.childOf(ACCESS_CONTROL_NAMESPACE_TOPIC) && !newv.getName() - .equals(ACCESS_CONTROL_NAMESPACE_TOPIC)) { - // for all other WhatHappened cases we only care about access control change - return true; - } - return false; - }, (why) -> { - // TODO: [V243584397]: Partial policy reload - // For now, reload all policies - Map> reloadedPolicies = - policyParser.parseAllAuthorizationPolicies(kernel); - - // Load default policies - reloadedPolicies.putAll(getDefaultPolicies()); - - try (LockScope scope = LockScope.lock(rwLock.writeLock())) { - for (Map.Entry> primaryPolicyList - : componentToAuthZConfig.entrySet()) { - String policyType = primaryPolicyList.getKey(); - if (!reloadedPolicies.containsKey(policyType)) { - //If the policyType already exists and was not reparsed correctly and/or removed from - //the newly parsed list, delete it from our store since it is now an unwanted relic - componentToAuthZConfig.remove(policyType); - authModule.deletePermissionsWithDestination(policyType); + //If there is a childChanged event, it has to be the 'accessControl' Topic that has bubbled up + //If there is a childRemoved event, it could be the component is removed, or either the + //'accessControl' Topic or/the 'parameters' Topics that has bubbled up, so we need to handle and + //filter out all other WhatHappeneds + if (WhatHappened.childRemoved.equals(why) || WhatHappened.removed.equals(why)) { + // Either a service or a parameter block or acl subkey + if (!newv.parent.getName().equals(SERVICES_NAMESPACE_TOPIC) + && !newv.getName().equals(CONFIGURATION_CONFIG_KEY) + && !newv.getName().equals(ACCESS_CONTROL_NAMESPACE_TOPIC) + && !newv.childOf(ACCESS_CONTROL_NAMESPACE_TOPIC)) { + return; + } + } else if (!newv.childOf(ACCESS_CONTROL_NAMESPACE_TOPIC) + && !newv.getName().equals(ACCESS_CONTROL_NAMESPACE_TOPIC)) { + // for all other WhatHappened cases we only care about access control change + return; } - } - //Now we reload the policies that reflect the current state of the Nucleus config - for (Map.Entry> acl : reloadedPolicies.entrySet()) { - this.loadAuthorizationPolicies(acl.getKey(), acl.getValue(), true); - } - } - }).subscribe(); + // TODO: [V243584397]: Partial policy reload + // For now, reload all policies + Map> reloadedPolicies = policyParser + .parseAllAuthorizationPolicies(kernel); + + // Load default policies + reloadedPolicies.putAll(getDefaultPolicies()); + + try (LockScope scope = LockScope.lock(rwLock.writeLock())) { + + for (Map.Entry> primaryPolicyList : + componentToAuthZConfig.entrySet()) { + String policyType = primaryPolicyList.getKey(); + if (!reloadedPolicies.containsKey(policyType)) { + //If the policyType already exists and was not reparsed correctly and/or removed from + //the newly parsed list, delete it from our store since it is now an unwanted relic + componentToAuthZConfig.remove(policyType); + authModule.deletePermissionsWithDestination(policyType); + } + } + + //Now we reload the policies that reflect the current state of the Nucleus config + for (Map.Entry> acl : reloadedPolicies.entrySet()) { + this.loadAuthorizationPolicies(acl.getKey(), acl.getValue(), true); + } + } + }); } /** diff --git a/src/main/java/com/aws/greengrass/dependency/Context.java b/src/main/java/com/aws/greengrass/dependency/Context.java index 6b4b68300e..904620868d 100644 --- a/src/main/java/com/aws/greengrass/dependency/Context.java +++ b/src/main/java/com/aws/greengrass/dependency/Context.java @@ -74,7 +74,6 @@ public void run() { } } }; - private static final Crashable doNothing = () -> {}; // magical private boolean shuttingDown = false; // global state change notification @@ -305,14 +304,8 @@ public Throwable runOnPublishQueueAndWait(Crashable r) { return ret.get(); } - @SuppressWarnings("checkstyle:MissingJavadocMethod") public void waitForPublishQueueToClear() { - // Run on the publish queue until it is empty. When the queue is empty that doesn't mean that - // all jobs have finished processing though, so we run it once again at the end. - do { - runOnPublishQueueAndWait(doNothing); - } while (!serialized.isEmpty()); - runOnPublishQueueAndWait(doNothing); + runOnPublishQueueAndWait(() -> {}); } private boolean onPublishThread() { diff --git a/src/main/java/com/aws/greengrass/status/FleetStatusService.java b/src/main/java/com/aws/greengrass/status/FleetStatusService.java index cc99805ef6..3880c001e9 100644 --- a/src/main/java/com/aws/greengrass/status/FleetStatusService.java +++ b/src/main/java/com/aws/greengrass/status/FleetStatusService.java @@ -7,7 +7,6 @@ import com.aws.greengrass.componentmanager.KernelConfigResolver; import com.aws.greengrass.config.PlatformResolver; -import com.aws.greengrass.config.Subscriber; import com.aws.greengrass.config.Topic; import com.aws.greengrass.config.Topics; import com.aws.greengrass.config.WhatHappened; @@ -18,7 +17,6 @@ import com.aws.greengrass.deployment.DeviceConfiguration; import com.aws.greengrass.deployment.exceptions.DeviceConfigurationException; import com.aws.greengrass.deployment.model.Deployment.DeploymentType; -import com.aws.greengrass.lifecyclemanager.GlobalStateChangeListener; import com.aws.greengrass.lifecyclemanager.GreengrassService; import com.aws.greengrass.lifecyclemanager.Kernel; import com.aws.greengrass.lifecyclemanager.exceptions.ServiceLoadException; @@ -46,7 +44,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; import javax.inject.Inject; import static com.aws.greengrass.deployment.DeploymentService.COMPONENTS_TO_GROUPS_TOPICS; @@ -74,8 +71,6 @@ public class FleetStatusService extends GreengrassService { public static final String DEVICE_OFFLINE_MESSAGE = "Device not configured to talk to AWS IoT cloud. " + "FleetStatusService is offline"; private final DeviceConfiguration deviceConfiguration; - private final GlobalStateChangeListener handleServiceStateChange = this::handleServiceStateChange; - private final Function, Boolean> deploymentStatusChanged = this::deploymentStatusChanged; private String updateTopic; private String thingName; @@ -113,18 +108,6 @@ public void onConnectionResumed(boolean sessionPresent) { schedulePeriodicFleetStatusDataUpdate(true); } }; - private final Subscriber publishIntervalSubscriber = (why, newv) -> { - int newPeriodicUpdateIntervalSec = Coerce.toInt(newv); - // Do not update the scheduled interval if it is less than the default. - if (newPeriodicUpdateIntervalSec < DEFAULT_PERIODIC_PUBLISH_INTERVAL_SEC) { - return; - } - this.periodicPublishIntervalSec = TestFeatureParameters.retrieveWithDefault(Double.class, - FLEET_STATUS_TEST_PERIODIC_UPDATE_INTERVAL_SEC, newPeriodicUpdateIntervalSec).intValue(); - if (periodicUpdateFuture != null) { - schedulePeriodicFleetStatusDataUpdate(false); - } - }; /** * Constructor for FleetStatusService. @@ -191,7 +174,7 @@ public void postInject() { deviceConfiguration.onAnyChange((what, node) -> { if (node != null && WhatHappened.childChanged.equals(what) - && DeviceConfiguration.provisionInfoNodeChanged(node, false)) { + && deviceConfiguration.provisionInfoNodeChanged(node, false)) { try { setUpFSS(); } catch (DeviceConfigurationException e) { @@ -212,15 +195,27 @@ private void setUpFSS() throws DeviceConfigurationException { if (isFSSSetupComplete.compareAndSet(false, true)) { Topics configurationTopics = deviceConfiguration.getStatusConfigurationTopics(); configurationTopics.lookup(FLEET_STATUS_PERIODIC_PUBLISH_INTERVAL_SEC) - .dflt(DEFAULT_PERIODIC_PUBLISH_INTERVAL_SEC).subscribe(publishIntervalSubscriber); - - config.getContext().addGlobalStateChangeListener(handleServiceStateChange); - - this.deploymentStatusKeeper.registerDeploymentStatusConsumer(IOT_JOBS, deploymentStatusChanged, + .dflt(DEFAULT_PERIODIC_PUBLISH_INTERVAL_SEC).subscribe((why, newv) -> { + int newPeriodicUpdateIntervalSec = Coerce.toInt(newv); + // Do not update the scheduled interval if it is less than the default. + if (newPeriodicUpdateIntervalSec < DEFAULT_PERIODIC_PUBLISH_INTERVAL_SEC) { + return; + } + this.periodicPublishIntervalSec = TestFeatureParameters.retrieveWithDefault(Double.class, + FLEET_STATUS_TEST_PERIODIC_UPDATE_INTERVAL_SEC, + newPeriodicUpdateIntervalSec).intValue(); + if (periodicUpdateFuture != null) { + schedulePeriodicFleetStatusDataUpdate(false); + } + }); + + config.getContext().addGlobalStateChangeListener(this::handleServiceStateChange); + + this.deploymentStatusKeeper.registerDeploymentStatusConsumer(IOT_JOBS, this::deploymentStatusChanged, FLEET_STATUS_SERVICE_TOPICS); - this.deploymentStatusKeeper.registerDeploymentStatusConsumer(LOCAL, deploymentStatusChanged, + this.deploymentStatusKeeper.registerDeploymentStatusConsumer(LOCAL, this::deploymentStatusChanged, FLEET_STATUS_SERVICE_TOPICS); - this.deploymentStatusKeeper.registerDeploymentStatusConsumer(SHADOW, deploymentStatusChanged, + this.deploymentStatusKeeper.registerDeploymentStatusConsumer(SHADOW, this::deploymentStatusChanged, FLEET_STATUS_SERVICE_TOPICS); schedulePeriodicFleetStatusDataUpdate(false); }