Skip to content

Commit

Permalink
add feature to send a event to a specific actor (#251)
Browse files Browse the repository at this point in the history
* add feature to send a event to a specific actor

* addresses issue #247
  • Loading branch information
bischoffz authored Aug 16, 2024
1 parent 6e61d70 commit da1605c
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public List<PersonId> getUnvaccinatedFamilyMembers(PersonId personId) {
List<PersonId> familyMembers = familyDataManager.getFamilyMembers(familyId);
for (PersonId familyMemeberId : familyMembers) {
if (!isPersonVaccinated(familyMemeberId)) {
result.add(personId);
result.add(familyMemeberId);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ protected DataManagerContext(DataManagerId dataManagerId, Simulation simulation)
* executed at the given time.
*
* @throws ContractException
* <ul>
* <ul>
* <li>{@link NucleusError#NULL_PLAN_CONSUMER} if the
* consumer is null</li>
* <li>{@link NucleusError#PAST_PLANNING_TIME} if the
Expand All @@ -36,7 +36,6 @@ protected DataManagerContext(DataManagerId dataManagerId, Simulation simulation)
* the plan is added to the simulation after event
* processing is finished</li>
* </ul>
*
*/
public void addPlan(final Consumer<DataManagerContext> consumer, final double planTime) {
simulation.addDataManagerPlan(dataManagerId, new ConsumerDataManagerPlan(planTime, consumer));
Expand All @@ -50,21 +49,20 @@ public void addPlan(final Consumer<DataManagerContext> consumer, final double pl
* plans (having arrival id = -1) are scheduled in the planning queue with
* higher arrival ids than all the serialized plans.
*
*
* @throws ContractException
* <ul>
* <li>{@link NucleusError#NULL_PLAN} if the plan is
* null</li>
* <li>{@link NucleusError#INVALID_PLAN_ARRIVAL_ID} if the
* arrival id is less than -1</li>
* <li>{@link NucleusError#INVALID_PLAN_ARRIVAL_ID} if
* the arrival id is less than -1</li>
* <li>{@link NucleusError#PAST_PLANNING_TIME} if the
* plan is scheduled for a time in the past *</li>
* <li>{@link NucleusError#PLANNING_QUEUE_CLOSED} if
* the plan is added to the simulation after event
* processing is finished</li>
* </ul>
*/
public void addPlan(DataManagerPlan plan) {
public void addPlan(DataManagerPlan plan) {
simulation.addDataManagerPlan(dataManagerId, plan);
}

Expand Down Expand Up @@ -111,6 +109,24 @@ public void releaseObservationEvent(final Event event) {
simulation.releaseObservationEventForDataManager(event);
}

/**
* Broadcasts the given event to a single actor. This is used
* for OBSERVATION events that are generated by the data managers. MUTATION
* events that are generated by the data managers as a proxy for actors and data
* managers should use releaseMutationEvent() instead.
*
* @throws ContractException
* <ul>
* <li>{@link NucleusError#NULL_EVENT} if the event is
* null</li>
* <li>{@link NucleusError#NULL_ACTOR_ID} if the
* actorId is null</li>
* </ul>
*/
public void releaseObservationEventToActor(final Event event, final ActorId actorId) {
simulation.releaseObservationEventForDataManagerToActor(event, actorId);
}

/**
* Starts the event handling process for the given event This is used for
* MUTATION events.
Expand Down Expand Up @@ -243,7 +259,6 @@ public LocalDateTime getLocalDateTime(double simulationTime) {
* if this method is invoked before the termination of
* the simulation</li>
* </ul>
*
*/
public List<DataManagerPlan> retrievePlans() {
return simulation.retrievePlansForDataManager(dataManagerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,23 @@ protected void unsubscribeReportFromEvent(Class<? extends Event> eventClass) {

}

protected void releaseObservationEventForDataManagerToActor(final Event event, final ActorId actorId) {
if (event == null) {
throw new ContractException(NucleusError.NULL_EVENT);
}

if (actorId == null) {
throw new ContractException(NucleusError.NULL_ACTOR_ID);
}

if (!dataManagerQueueActive) {
throw new ContractException(NucleusError.OBSERVATION_EVENT_IMPROPER_RELEASE);
}

// queue the event handling for actors
broadcastEventToFilterNodeAndActor(event, rootNode, actorId);
}

protected void releaseObservationEventForDataManager(final Event event) {

if (event == null) {
Expand Down Expand Up @@ -1474,6 +1491,43 @@ protected boolean subscribersExistForEvent(Class<? extends Event> eventClass) {
|| rootNode.children.containsKey(eventClass) || rootNode.consumers.containsKey(eventClass));
}

/*
* Recursively processes the event through the filter node to the given actor. Events should be
* processed through the root filter node. Each node's consumers have each such
* consumer scheduled onto the actor queue for delayed execution of the
* consumer.
*/
private void broadcastEventToFilterNodeAndActor(final Event event, FilterNode filterNode, ActorId actorId) {
// determine the value of the function for the given event
Object value = filterNode.function.apply(event);

// use that value to place any consumers that are matched to that value
// on the actor queue
Map<ActorId, Consumer<Event>> consumerMap = filterNode.consumers.get(value);
if (consumerMap != null) {
if (consumerMap.containsKey(actorId)) {
Consumer<Event> consumer = consumerMap.get(actorId);
final ActorContentRec actorContentRec = new ActorContentRec();
actorContentRec.event = event;
actorContentRec.actorId = actorId;
actorContentRec.eventConsumer = consumer;
actorQueue.add(actorContentRec);
}
}

// match the value to any child nodes and recursively call this method
// on that node
Map<IdentifiableFunction<?>, FilterNode> childMap = filterNode.children.get(value);
if (childMap != null) {
for (Object id : childMap.keySet()) {
FilterNode childNode = childMap.get(id);
if (childNode != null) {
broadcastEventToFilterNodeAndActor(event, childNode, actorId);
}
}
}
}

/*
* Recursively processes the event through the filter node . Events should be
* processed through the root filter node. Each node's consumers have each such
Expand Down

0 comments on commit da1605c

Please sign in to comment.