From da1605cba388404a02bb694a3de509a4b69c5583 Mon Sep 17 00:00:00 2001 From: Zachary Bischoff <116595361+bischoffz@users.noreply.github.com> Date: Fri, 16 Aug 2024 15:48:59 -0400 Subject: [PATCH] add feature to send a event to a specific actor (#251) * add feature to send a event to a specific actor * addresses issue #247 --- .../vaccine/VaccinationDataManager.java | 2 +- .../nucleus/DataManagerContext.java | 29 +++++++--- .../ms/gcm/simulation/nucleus/Simulation.java | 54 +++++++++++++++++++ 3 files changed, 77 insertions(+), 8 deletions(-) diff --git a/lessons/lesson-07/src/main/java/gov/hhs/aspr/ms/gcm/lessons/plugins/vaccine/VaccinationDataManager.java b/lessons/lesson-07/src/main/java/gov/hhs/aspr/ms/gcm/lessons/plugins/vaccine/VaccinationDataManager.java index 5f45e5964..a8e548671 100644 --- a/lessons/lesson-07/src/main/java/gov/hhs/aspr/ms/gcm/lessons/plugins/vaccine/VaccinationDataManager.java +++ b/lessons/lesson-07/src/main/java/gov/hhs/aspr/ms/gcm/lessons/plugins/vaccine/VaccinationDataManager.java @@ -75,7 +75,7 @@ public List getUnvaccinatedFamilyMembers(PersonId personId) { List familyMembers = familyDataManager.getFamilyMembers(familyId); for (PersonId familyMemeberId : familyMembers) { if (!isPersonVaccinated(familyMemeberId)) { - result.add(personId); + result.add(familyMemeberId); } } } diff --git a/simulation/src/main/java/gov/hhs/aspr/ms/gcm/simulation/nucleus/DataManagerContext.java b/simulation/src/main/java/gov/hhs/aspr/ms/gcm/simulation/nucleus/DataManagerContext.java index 0dfce12e8..f36786af8 100644 --- a/simulation/src/main/java/gov/hhs/aspr/ms/gcm/simulation/nucleus/DataManagerContext.java +++ b/simulation/src/main/java/gov/hhs/aspr/ms/gcm/simulation/nucleus/DataManagerContext.java @@ -27,7 +27,7 @@ protected DataManagerContext(DataManagerId dataManagerId, Simulation simulation) * executed at the given time. * * @throws ContractException - * - * */ public List retrievePlans() { return simulation.retrievePlansForDataManager(dataManagerId); diff --git a/simulation/src/main/java/gov/hhs/aspr/ms/gcm/simulation/nucleus/Simulation.java b/simulation/src/main/java/gov/hhs/aspr/ms/gcm/simulation/nucleus/Simulation.java index 739916cf5..5095719f3 100644 --- a/simulation/src/main/java/gov/hhs/aspr/ms/gcm/simulation/nucleus/Simulation.java +++ b/simulation/src/main/java/gov/hhs/aspr/ms/gcm/simulation/nucleus/Simulation.java @@ -1116,6 +1116,23 @@ protected void unsubscribeReportFromEvent(Class eventClass) { } + protected void releaseObservationEventForDataManagerToActor(final Event event, final ActorId actorId) { + if (event == null) { + throw new ContractException(NucleusError.NULL_EVENT); + } + + if (actorId == null) { + throw new ContractException(NucleusError.NULL_ACTOR_ID); + } + + if (!dataManagerQueueActive) { + throw new ContractException(NucleusError.OBSERVATION_EVENT_IMPROPER_RELEASE); + } + + // queue the event handling for actors + broadcastEventToFilterNodeAndActor(event, rootNode, actorId); + } + protected void releaseObservationEventForDataManager(final Event event) { if (event == null) { @@ -1474,6 +1491,43 @@ protected boolean subscribersExistForEvent(Class eventClass) { || rootNode.children.containsKey(eventClass) || rootNode.consumers.containsKey(eventClass)); } + /* + * Recursively processes the event through the filter node to the given actor. Events should be + * processed through the root filter node. Each node's consumers have each such + * consumer scheduled onto the actor queue for delayed execution of the + * consumer. + */ + private void broadcastEventToFilterNodeAndActor(final Event event, FilterNode filterNode, ActorId actorId) { + // determine the value of the function for the given event + Object value = filterNode.function.apply(event); + + // use that value to place any consumers that are matched to that value + // on the actor queue + Map> consumerMap = filterNode.consumers.get(value); + if (consumerMap != null) { + if (consumerMap.containsKey(actorId)) { + Consumer consumer = consumerMap.get(actorId); + final ActorContentRec actorContentRec = new ActorContentRec(); + actorContentRec.event = event; + actorContentRec.actorId = actorId; + actorContentRec.eventConsumer = consumer; + actorQueue.add(actorContentRec); + } + } + + // match the value to any child nodes and recursively call this method + // on that node + Map, FilterNode> childMap = filterNode.children.get(value); + if (childMap != null) { + for (Object id : childMap.keySet()) { + FilterNode childNode = childMap.get(id); + if (childNode != null) { + broadcastEventToFilterNodeAndActor(event, childNode, actorId); + } + } + } + } + /* * Recursively processes the event through the filter node . Events should be * processed through the root filter node. Each node's consumers have each such