From aa9f6b70e61d2f5ff26ea15fa2c638b7eb0b02a4 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Mon, 25 Nov 2024 13:54:53 -0500 Subject: [PATCH 01/11] Test JFactory::GetObjects caching behavior --- .../unit_tests/Components/JFactoryTests.cc | 42 ++++++++++++++++++- .../unit_tests/Components/JFactoryTests.h | 26 ++++++++++-- 2 files changed, 63 insertions(+), 5 deletions(-) diff --git a/src/programs/unit_tests/Components/JFactoryTests.cc b/src/programs/unit_tests/Components/JFactoryTests.cc index 07ada4b21..7149dfe9b 100644 --- a/src/programs/unit_tests/Components/JFactoryTests.cc +++ b/src/programs/unit_tests/Components/JFactoryTests.cc @@ -4,6 +4,7 @@ #include "JANA/Components/JComponentFwd.h" +#include "JANA/JApplicationFwd.h" #include "JANA/JFactory.h" #include "catch.hpp" #include "JFactoryTests.h" @@ -566,4 +567,43 @@ TEST_CASE("JFactory_ExceptionHandling") { } } - +TEST_CASE("JFactory_GetObjects_Caching") { + JApplication app; + app.Add(new JFactoryGeneratorT>()); + app.Add(new JFactoryGeneratorT>()); + auto source = new JFactoryTestDummySource; + auto event = std::make_shared(&app); + event->SetJEventSource(source); + + SECTION("RepeatedGetObjectsAreCached") { + auto dummies = event->Get(); + REQUIRE(dummies.at(0)->data == 8); + REQUIRE(source->get_objects_count == 1); + REQUIRE(source->get_objects_dummy_count == 1); + + dummies = event->Get(); + REQUIRE(dummies.at(0)->data == 8); + REQUIRE(source->get_objects_count == 1); + REQUIRE(source->get_objects_dummy_count == 1); + } + + SECTION("DifferentGetObjectsAreNotCached") { + auto dummies = event->Get(); + REQUIRE(dummies.at(0)->data == 8); + REQUIRE(source->get_objects_count == 1); + REQUIRE(source->get_objects_dummy_count == 1); + REQUIRE(source->get_objects_different_count == 0); + + auto different = event->Get(); + REQUIRE(different.at(0)->E == 123.0); + REQUIRE(source->get_objects_count == 2); + REQUIRE(source->get_objects_dummy_count == 1); + REQUIRE(source->get_objects_different_count == 1); + + different = event->Get(); + REQUIRE(different.at(0)->E == 123.0); + REQUIRE(source->get_objects_count == 2); + REQUIRE(source->get_objects_dummy_count == 1); + REQUIRE(source->get_objects_different_count == 1); + } +} diff --git a/src/programs/unit_tests/Components/JFactoryTests.h b/src/programs/unit_tests/Components/JFactoryTests.h index e700b86c4..e2cbc0188 100644 --- a/src/programs/unit_tests/Components/JFactoryTests.h +++ b/src/programs/unit_tests/Components/JFactoryTests.h @@ -23,6 +23,11 @@ struct JFactoryTestDummyObject : public JObject { } }; +struct DifferentDummyObject : public JObject { + double E; + DifferentDummyObject(double E) : E(E) {} +}; + /// DummyFactory is a trivial JFactory which reports how often its member functions get called @@ -69,6 +74,10 @@ struct JFactoryTestExceptingInInitFactory : public JFactoryT&, JFactory* aFactory) override { - auto factory = dynamic_cast*>(aFactory); - if (factory != nullptr) { - factory->Insert(new JFactoryTestDummyObject(8)); - factory->Insert(new JFactoryTestDummyObject(88)); + get_objects_count += 1; + auto dummy_factory = dynamic_cast*>(aFactory); + if (dummy_factory != nullptr) { + get_objects_dummy_count += 1; + dummy_factory->Insert(new JFactoryTestDummyObject(8)); + dummy_factory->Insert(new JFactoryTestDummyObject(88)); + return true; + } + auto different_factory = dynamic_cast*>(aFactory); + if (different_factory != nullptr) { + get_objects_different_count += 1; + different_factory->Insert(new DifferentDummyObject(123.)); + return true; } return false; } From 0769f907d24fe5dcf71f29ae3864f3973e610289 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Tue, 26 Nov 2024 12:50:59 -0500 Subject: [PATCH 02/11] Update halld_recon build script Making any updates to the Alma9 container will push the gcc version to 11.5.0, which isn't in CVMFS yet. --- .github/halld_recon_build.sh | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/.github/halld_recon_build.sh b/.github/halld_recon_build.sh index 19810271e..e2c405132 100644 --- a/.github/halld_recon_build.sh +++ b/.github/halld_recon_build.sh @@ -1,19 +1,21 @@ #!/bin/bash -echo "mounting cvmfs" +echo "Mounting CVMFS" yum -y install fuse chmod 666 /dev/fuse mkdir -p /cvmfs/oasis.opensciencegrid.org mount -t cvmfs oasis.opensciencegrid.org /cvmfs/oasis.opensciencegrid.org - -cd /workspace/halld_recon ln -s /cvmfs/oasis.opensciencegrid.org/gluex/group /group -export BUILD_SCRIPTS=/group/halld/Software/build_scripts -source $BUILD_SCRIPTS/gluex_env_boot_jlab.sh --bs $BUILD_SCRIPTS + +export BMS_OSNAME_OVERRIDE="Linux_Alma9-x86_64-gcc11.4.1-cntr" +# The BMS_OSNAME override is needed because Alma9 retroactively switched its system gcc +# from 11.4.1 to 11.5. We cannot create a new Alma9 container that uses gcc11.4.1, but +# the CVMFS artifact repository hasn't been updated to reflect that. + +source /group/halld/Software/build_scripts/gluex_env_boot_jlab.sh gxenv /workspace/JANA2/.github/halld_recon_build_prereqs_version.xml -echo "rootsys" -echo $ROOTSYS -chmod +x $JANA_HOME/bin/* -cd src -scons install -j12 \ No newline at end of file +echo "ROOTSYS=$ROOTSYS" +cd /workspace/halld_recon/src +scons install -j12 DEBUG=1 OPTIMIZATION=0 SHOWBUILD=1 + From 7f1e8e45a0b07446cff45d182c8a4210199b6962 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Tue, 26 Nov 2024 13:50:18 -0500 Subject: [PATCH 03/11] Disable ScaleThroughputImprovement because it fails intermittently in CI This brings it back to how it's always been. The plan is to set up a better distinction between unit, integration, and performance tests, and only run the latter on platforms with a known number of cpus available in the runner. --- src/programs/unit_tests/Engine/ScaleTests.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/programs/unit_tests/Engine/ScaleTests.cc b/src/programs/unit_tests/Engine/ScaleTests.cc index 4102b460a..cbc305463 100644 --- a/src/programs/unit_tests/Engine/ScaleTests.cc +++ b/src/programs/unit_tests/Engine/ScaleTests.cc @@ -57,7 +57,7 @@ TEST_CASE("ScaleNWorkerUpdate") { app.Stop(true); } -TEST_CASE("ScaleThroughputImprovement") { +TEST_CASE("ScaleThroughputImprovement", "[.][performance]") { JApplication app; app.SetParameterValue("jana:loglevel", "INFO"); From 9dd003bc32e7a41f3a0f108b4f2581b9aa6d48e2 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Tue, 26 Nov 2024 14:09:49 -0500 Subject: [PATCH 04/11] Test barrier events when nthreads=1 --- .../Components/BarrierEventTests.cc | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/src/programs/unit_tests/Components/BarrierEventTests.cc b/src/programs/unit_tests/Components/BarrierEventTests.cc index e3a7a467c..4a2ce5ce7 100644 --- a/src/programs/unit_tests/Components/BarrierEventTests.cc +++ b/src/programs/unit_tests/Components/BarrierEventTests.cc @@ -93,6 +93,32 @@ struct BarrierProcessor : public JEventProcessor { }; +TEST_CASE("BarrierEventTests_SingleThread") { + global_resource = 0; + JApplication app; + app.Add(new BarrierProcessor); + app.Add(new BarrierSource); + app.SetParameterValue("nthreads", 1); + app.SetParameterValue("jana:nevents", 40); + //app.SetParameterValue("jana:log:show_threadstamp", true); + //app.SetParameterValue("jana:loglevel", "debug"); + app.Run(true); +}; + + +TEST_CASE("BarrierEventTests_Legacy_SingleThread") { + global_resource = 0; + JApplication app; + app.Add(new LegacyBarrierProcessor); + app.Add(new BarrierSource); + app.SetParameterValue("nthreads", 1); + app.SetParameterValue("jana:nevents", 40); + //app.SetParameterValue("jana:log:show_threadstamp", true); + //app.SetParameterValue("jana:loglevel", "debug"); + app.Run(true); +}; + + TEST_CASE("BarrierEventTests") { global_resource = 0; JApplication app; From 22c685cd46196802fc0bacd4b3b7c60a8b69684d Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Tue, 26 Nov 2024 14:25:50 -0500 Subject: [PATCH 05/11] JExecutionEngine: Bring back m_next_arrow_id This fixes the livelock on barrier events when nthreads=1 --- src/libraries/JANA/Engine/JExecutionEngine.cc | 20 ++++++++++++++----- src/libraries/JANA/Engine/JExecutionEngine.h | 1 + 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/libraries/JANA/Engine/JExecutionEngine.cc b/src/libraries/JANA/Engine/JExecutionEngine.cc index c9ae36edf..7ad624eb4 100644 --- a/src/libraries/JANA/Engine/JExecutionEngine.cc +++ b/src/libraries/JANA/Engine/JExecutionEngine.cc @@ -511,15 +511,23 @@ void JExecutionEngine::FindNextReadyTask_Unsafe(Task& task, WorkerState& worker) if (m_runstatus == RunStatus::Running || m_runstatus == RunStatus::Draining) { // We only pick up a new task if the topology is running or draining. - for (size_t arrow_id=0; arrow_idget_next_port_index(); JEvent* event = (port == -1) ? nullptr : arrow->pull(port, worker.location_id); if (event != nullptr || port == -1) { + LOG_TRACE(GetLogger()) << "Scheduler: Found next ready arrow with id " << arrow_id << LOG_END; // We've found a task that is ready! state.active_tasks += 1; @@ -550,6 +559,9 @@ void JExecutionEngine::FindNextReadyTask_Unsafe(Task& task, WorkerState& worker) } return; } + else { + LOG_TRACE(GetLogger()) << "Scheduler: Arrow with id " << arrow_id << " is unready: Input event is needed but not on queue yet." << LOG_END; + } } } @@ -565,8 +577,6 @@ void JExecutionEngine::FindNextReadyTask_Unsafe(Task& task, WorkerState& worker) for (size_t arrow_id = 0; arrow_id < m_arrow_states.size(); ++arrow_id) { auto& state = m_arrow_states[arrow_id]; - auto* arrow = m_topology->arrows[arrow_id]; - LOG_TRACE(GetLogger()) << "Scheduler: arrow=" << arrow->get_name() << ", is_source=" << state.is_source << ", active_tasks=" << state.active_tasks << ", is_parallel=" << state.is_parallel << LOG_END; any_active_source_found |= (state.status == ArrowState::Status::Running && state.is_source); any_active_task_found |= (state.active_tasks != 0); // A source might have been deactivated by RequestPause, Ctrl-C, etc, and might be inactive even though it still has active tasks @@ -581,7 +591,7 @@ void JExecutionEngine::FindNextReadyTask_Unsafe(Task& task, WorkerState& worker) m_event_count_at_finish += arrow_state.events_processed; } } - LOG_DEBUG(GetLogger()) << "Processing paused" << LOG_END; + LOG_DEBUG(GetLogger()) << "Scheduler: Processing paused" << LOG_END; m_runstatus = RunStatus::Paused; // I think this is the ONLY site where the topology gets paused. Verify this? } diff --git a/src/libraries/JANA/Engine/JExecutionEngine.h b/src/libraries/JANA/Engine/JExecutionEngine.h index f0e2ffd66..28e6c0468 100644 --- a/src/libraries/JANA/Engine/JExecutionEngine.h +++ b/src/libraries/JANA/Engine/JExecutionEngine.h @@ -103,6 +103,7 @@ class JExecutionEngine : public JService { std::atomic m_interrupt_status { InterruptStatus::NoInterruptsUnsupervised }; std::atomic_bool m_print_worker_report_requested {false}; std::atomic_bool m_send_worker_report_requested {false}; + size_t m_next_arrow_id=0; // Metrics size_t m_event_count_at_start = 0; From 784615a21bfe97984dcb023ed4ef60abde5c86fd Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Tue, 26 Nov 2024 14:58:19 -0500 Subject: [PATCH 06/11] Fix warnings on gcc 11.5 --- src/libraries/JANA/Engine/JExecutionEngine.cc | 1 + src/programs/unit_tests/Components/BarrierEventTests.cc | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/libraries/JANA/Engine/JExecutionEngine.cc b/src/libraries/JANA/Engine/JExecutionEngine.cc index 7ad624eb4..08e13fa0f 100644 --- a/src/libraries/JANA/Engine/JExecutionEngine.cc +++ b/src/libraries/JANA/Engine/JExecutionEngine.cc @@ -788,6 +788,7 @@ std::string ToString(JExecutionEngine::RunStatus runstatus) { case JExecutionEngine::RunStatus::Pausing: return "Pausing"; case JExecutionEngine::RunStatus::Draining: return "Draining"; case JExecutionEngine::RunStatus::Finished: return "Finished"; + default: return "CorruptedRunStatus"; } } diff --git a/src/programs/unit_tests/Components/BarrierEventTests.cc b/src/programs/unit_tests/Components/BarrierEventTests.cc index 4a2ce5ce7..37e62580c 100644 --- a/src/programs/unit_tests/Components/BarrierEventTests.cc +++ b/src/programs/unit_tests/Components/BarrierEventTests.cc @@ -8,7 +8,7 @@ #include "JANA/Utils/JBenchUtils.h" #include "catch.hpp" -int global_resource = 0; +size_t global_resource = 0; struct BarrierSource : public JEventSource { From ca3ce7ec0546273d9a7e911409d6d352c33e744c Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Tue, 26 Nov 2024 20:54:00 -0500 Subject: [PATCH 07/11] Improve log messages for barrier events --- src/libraries/JANA/Topology/JEventSourceArrow.cc | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/libraries/JANA/Topology/JEventSourceArrow.cc b/src/libraries/JANA/Topology/JEventSourceArrow.cc index 69b24fbe9..be116fe71 100644 --- a/src/libraries/JANA/Topology/JEventSourceArrow.cc +++ b/src/libraries/JANA/Topology/JEventSourceArrow.cc @@ -23,11 +23,15 @@ void JEventSourceArrow::fire(JEvent* event, OutputData& outputs, size_t& output_ // First check to see if we need to handle a barrier event before attempting to emit another event if (m_barrier_active) { + + auto emitted_event_count = m_sources[m_current_source]->GetEmittedEventCount(); + auto finished_event_count = m_sources[m_current_source]->GetFinishedEventCount(); + // A barrier event has been emitted by the source. if (m_pending_barrier_event != nullptr) { + // This barrier event is pending until the topology drains - if (m_sources[m_current_source]->GetEmittedEventCount() - - m_sources[m_current_source]->GetFinishedEventCount() == 1) { + if ((emitted_event_count - finished_event_count) == 1) { LOG_DEBUG(m_logger) << "JEventSourceArrow: Barrier event is in-flight" << LOG_END; // Topology has drained; only remaining in-flight event is the barrier event itself, @@ -44,7 +48,7 @@ void JEventSourceArrow::fire(JEvent* event, OutputData& outputs, size_t& output_ } else { // Topology has _not_ finished draining, all we can do is wait - LOG_DEBUG(m_logger) << "JEventSourceArrow: Waiting on pending barrier event" << LOG_END; + LOG_DEBUG(m_logger) << "JEventSourceArrow: Waiting on pending barrier event. Emitted = " << emitted_event_count << ", Finished = " << finished_event_count << LOG_END; LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result ComeBackLater"<< LOG_END; assert(event == nullptr); @@ -56,8 +60,7 @@ void JEventSourceArrow::fire(JEvent* event, OutputData& outputs, size_t& output_ else { // This barrier event has already been sent into the topology and we need to wait // until it is finished before emitting any more events - if (m_sources[m_current_source]->GetFinishedEventCount() == - m_sources[m_current_source]->GetEmittedEventCount()) { + if (finished_event_count == emitted_event_count) { // Barrier event has finished. LOG_DEBUG(m_logger) << "JEventSourceArrow: Barrier event finished, returning to normal operation" << LOG_END; From 0e5d1d08307c389351e71f1cd6b242c22c0e85ea Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Tue, 26 Nov 2024 21:14:20 -0500 Subject: [PATCH 08/11] Improve log messages for manual arrow firing --- src/libraries/JANA/Engine/JExecutionEngine.cc | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/src/libraries/JANA/Engine/JExecutionEngine.cc b/src/libraries/JANA/Engine/JExecutionEngine.cc index 08e13fa0f..313e8f825 100644 --- a/src/libraries/JANA/Engine/JExecutionEngine.cc +++ b/src/libraries/JANA/Engine/JExecutionEngine.cc @@ -679,12 +679,15 @@ JArrow::FireResult JExecutionEngine::Fire(size_t arrow_id, size_t location_id) { std::unique_lock lock(m_mutex); JArrow* arrow = m_topology->arrows[arrow_id]; + LOG_INFO(GetLogger()) << "Attempting to fire arrow with name=" << arrow->get_name() << ", index=" << arrow_id << LOG_END; ArrowState& arrow_state = m_arrow_states[arrow_id]; if (arrow_state.status == ArrowState::Status::Finished) { + LOG_INFO(GetLogger()) << "Firing unsuccessful: Arrow status is Finished." << arrow_id << LOG_END; return JArrow::FireResult::Finished; } if (!arrow_state.is_parallel && arrow_state.active_tasks != 0) { + LOG_INFO(GetLogger()) << "Firing unsuccessful: Arrow is sequential and already has an active task." << arrow_id << LOG_END; return JArrow::FireResult::NotRunYet; } arrow_state.active_tasks += 1; @@ -701,12 +704,32 @@ JArrow::FireResult JExecutionEngine::Fire(size_t arrow_id, size_t location_id) { JArrow::FireResult result = JArrow::FireResult::NotRunYet; if (event != nullptr || port == -1) { + if (event != nullptr) { + LOG_INFO(GetLogger()) << "Input event #" << event->GetEventNumber() << " from port " << port << LOG_END; + } + else { + LOG_INFO(GetLogger()) << "No input events." << LOG_END; + } + LOG_INFO(GetLogger()) << "Firing arrow." << LOG_END; arrow->fire(event, outputs, output_count, result); + LOG_INFO(GetLogger()) << "Fired arrow." << LOG_END; + if (output_count == 0) { + LOG_INFO(GetLogger()) << "No output events." << LOG_END; + } + else { + for (size_t i=0; iGetEventNumber() << " on port " << outputs.at(i).second << LOG_END; + } + } + lock.lock(); arrow->push(outputs, output_count, location_id); arrow_state.active_tasks -= 1; lock.unlock(); } + else { + LOG_INFO(GetLogger()) << "Firing unsuccessful: Arrow needs an input event on port " << port << " but the queue or pool is empty." << LOG_END; + } return result; } From f9f0cbe1512f2ad9630ccbedeeb9e32baaaa8b99 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Tue, 26 Nov 2024 21:46:58 -0500 Subject: [PATCH 09/11] Improve topology pausing logic Previously, it would attempt to pause the topology regardless of runstate, including RunState::Finished, which could hypothetically lead to an invalid RunState::Finished->RunState::Paused transition. This also cleans up the narrative in the log messages. --- src/libraries/JANA/Engine/JExecutionEngine.cc | 49 ++++++++++--------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/src/libraries/JANA/Engine/JExecutionEngine.cc b/src/libraries/JANA/Engine/JExecutionEngine.cc index 313e8f825..584c76831 100644 --- a/src/libraries/JANA/Engine/JExecutionEngine.cc +++ b/src/libraries/JANA/Engine/JExecutionEngine.cc @@ -569,31 +569,36 @@ void JExecutionEngine::FindNextReadyTask_Unsafe(Task& task, WorkerState& worker) // so we check whether more are potentially coming. If not, we can pause the topology. // Note that our worker threads will still wait at ExchangeTask() until they get // shut down separately during Scale(). - - bool any_active_source_found = false; - bool any_active_task_found = false; - LOG_DEBUG(GetLogger()) << "Scheduler: No tasks ready" << LOG_END; - - for (size_t arrow_id = 0; arrow_id < m_arrow_states.size(); ++arrow_id) { - auto& state = m_arrow_states[arrow_id]; - any_active_source_found |= (state.status == ArrowState::Status::Running && state.is_source); - any_active_task_found |= (state.active_tasks != 0); - // A source might have been deactivated by RequestPause, Ctrl-C, etc, and might be inactive even though it still has active tasks - } - - if (!any_active_source_found && !any_active_task_found) { - // Pause the topology - m_time_at_finish = clock_t::now(); - m_event_count_at_finish = 0; - for (auto& arrow_state : m_arrow_states) { - if (arrow_state.is_sink) { - m_event_count_at_finish += arrow_state.events_processed; + if (m_runstatus == RunStatus::Running || m_runstatus == RunStatus::Pausing || m_runstatus == RunStatus::Draining) { + // We want to avoid scenarios such as where the topology already Finished but then gets reset to Paused + // This also leaves a cleaner narrative in the logs. + + bool any_active_source_found = false; + bool any_active_task_found = false; + + LOG_DEBUG(GetLogger()) << "Scheduler: No tasks ready" << LOG_END; + + for (size_t arrow_id = 0; arrow_id < m_arrow_states.size(); ++arrow_id) { + auto& state = m_arrow_states[arrow_id]; + any_active_source_found |= (state.status == ArrowState::Status::Running && state.is_source); + any_active_task_found |= (state.active_tasks != 0); + // A source might have been deactivated by RequestPause, Ctrl-C, etc, and might be inactive even though it still has active tasks + } + + if (!any_active_source_found && !any_active_task_found) { + // Pause the topology + m_time_at_finish = clock_t::now(); + m_event_count_at_finish = 0; + for (auto& arrow_state : m_arrow_states) { + if (arrow_state.is_sink) { + m_event_count_at_finish += arrow_state.events_processed; + } } + LOG_DEBUG(GetLogger()) << "Scheduler: Processing paused" << LOG_END; + m_runstatus = RunStatus::Paused; + // I think this is the ONLY site where the topology gets paused. Verify this? } - LOG_DEBUG(GetLogger()) << "Scheduler: Processing paused" << LOG_END; - m_runstatus = RunStatus::Paused; - // I think this is the ONLY site where the topology gets paused. Verify this? } worker.last_arrow_id = -1; From ee3f3c4310f22194975d8e8c1d0d219d20c074cb Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Tue, 26 Nov 2024 21:55:58 -0500 Subject: [PATCH 10/11] Improve user interface for firing arrows from ApplicationInspector --- src/libraries/JANA/Engine/JExecutionEngine.cc | 25 +++++++++++-------- .../JANA/Utils/JApplicationInspector.cc | 3 +-- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/src/libraries/JANA/Engine/JExecutionEngine.cc b/src/libraries/JANA/Engine/JExecutionEngine.cc index 584c76831..41bc99f0b 100644 --- a/src/libraries/JANA/Engine/JExecutionEngine.cc +++ b/src/libraries/JANA/Engine/JExecutionEngine.cc @@ -683,16 +683,21 @@ bool JExecutionEngine::IsTimeoutEnabled() const { JArrow::FireResult JExecutionEngine::Fire(size_t arrow_id, size_t location_id) { std::unique_lock lock(m_mutex); + if (arrow_id >= m_topology->arrows.size()) { + LOG_WARN(GetLogger()) << "Firing unsuccessful: No arrow exists with id=" << arrow_id << LOG_END; + return JArrow::FireResult::NotRunYet; + } JArrow* arrow = m_topology->arrows[arrow_id]; - LOG_INFO(GetLogger()) << "Attempting to fire arrow with name=" << arrow->get_name() << ", index=" << arrow_id << LOG_END; + LOG_WARN(GetLogger()) << "Attempting to fire arrow with name=" << arrow->get_name() + << ", index=" << arrow_id << ", location=" << location_id << LOG_END; ArrowState& arrow_state = m_arrow_states[arrow_id]; if (arrow_state.status == ArrowState::Status::Finished) { - LOG_INFO(GetLogger()) << "Firing unsuccessful: Arrow status is Finished." << arrow_id << LOG_END; + LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow status is Finished." << arrow_id << LOG_END; return JArrow::FireResult::Finished; } if (!arrow_state.is_parallel && arrow_state.active_tasks != 0) { - LOG_INFO(GetLogger()) << "Firing unsuccessful: Arrow is sequential and already has an active task." << arrow_id << LOG_END; + LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow is sequential and already has an active task." << arrow_id << LOG_END; return JArrow::FireResult::NotRunYet; } arrow_state.active_tasks += 1; @@ -710,20 +715,20 @@ JArrow::FireResult JExecutionEngine::Fire(size_t arrow_id, size_t location_id) { if (event != nullptr || port == -1) { if (event != nullptr) { - LOG_INFO(GetLogger()) << "Input event #" << event->GetEventNumber() << " from port " << port << LOG_END; + LOG_WARN(GetLogger()) << "Input event #" << event->GetEventNumber() << " from port " << port << LOG_END; } else { - LOG_INFO(GetLogger()) << "No input events." << LOG_END; + LOG_WARN(GetLogger()) << "No input events" << LOG_END; } - LOG_INFO(GetLogger()) << "Firing arrow." << LOG_END; + LOG_WARN(GetLogger()) << "Firing arrow" << LOG_END; arrow->fire(event, outputs, output_count, result); - LOG_INFO(GetLogger()) << "Fired arrow." << LOG_END; + LOG_WARN(GetLogger()) << "Fired arrow" << LOG_END; if (output_count == 0) { - LOG_INFO(GetLogger()) << "No output events." << LOG_END; + LOG_WARN(GetLogger()) << "No output events" << LOG_END; } else { for (size_t i=0; iGetEventNumber() << " on port " << outputs.at(i).second << LOG_END; + LOG_WARN(GetLogger()) << "Output event #" << outputs.at(i).first->GetEventNumber() << " on port " << outputs.at(i).second << LOG_END; } } @@ -733,7 +738,7 @@ JArrow::FireResult JExecutionEngine::Fire(size_t arrow_id, size_t location_id) { lock.unlock(); } else { - LOG_INFO(GetLogger()) << "Firing unsuccessful: Arrow needs an input event on port " << port << " but the queue or pool is empty." << LOG_END; + LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow needs an input event from port " << port << ", but the queue or pool is empty." << LOG_END; } return result; diff --git a/src/libraries/JANA/Utils/JApplicationInspector.cc b/src/libraries/JANA/Utils/JApplicationInspector.cc index 26a520f71..368a0f516 100644 --- a/src/libraries/JANA/Utils/JApplicationInspector.cc +++ b/src/libraries/JANA/Utils/JApplicationInspector.cc @@ -32,8 +32,7 @@ void InspectTopology(JApplication* app) { void Fire(JApplication* app, int arrow_id) { auto engine = app->GetService(); - auto result = engine->Fire(arrow_id, 0); - std::cout << to_string(result); + engine->Fire(arrow_id, 0); } void InspectComponents(JApplication* app) { From 4148049819644d6eece0777837e6afe217d91469 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Wed, 27 Nov 2024 00:50:13 -0500 Subject: [PATCH 11/11] Fix logic in JExecutionEngine::Fire --- src/libraries/JANA/Engine/JExecutionEngine.cc | 47 +++++++++---------- 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/src/libraries/JANA/Engine/JExecutionEngine.cc b/src/libraries/JANA/Engine/JExecutionEngine.cc index 41bc99f0b..189684ba2 100644 --- a/src/libraries/JANA/Engine/JExecutionEngine.cc +++ b/src/libraries/JANA/Engine/JExecutionEngine.cc @@ -706,6 +706,17 @@ JArrow::FireResult JExecutionEngine::Fire(size_t arrow_id, size_t location_id) { JEvent* event = nullptr; if (port != -1) { event = arrow->pull(port, location_id); + if (event == nullptr) { + LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow needs an input event from port " << port << ", but the queue or pool is empty." << LOG_END; + arrow_state.active_tasks -= 1; + return JArrow::FireResult::NotRunYet; + } + else { + LOG_WARN(GetLogger()) << "Input event #" << event->GetEventNumber() << " from port " << port << LOG_END; + } + } + else { + LOG_WARN(GetLogger()) << "No input events" << LOG_END; } lock.unlock(); @@ -713,34 +724,22 @@ JArrow::FireResult JExecutionEngine::Fire(size_t arrow_id, size_t location_id) { JArrow::OutputData outputs; JArrow::FireResult result = JArrow::FireResult::NotRunYet; - if (event != nullptr || port == -1) { - if (event != nullptr) { - LOG_WARN(GetLogger()) << "Input event #" << event->GetEventNumber() << " from port " << port << LOG_END; - } - else { - LOG_WARN(GetLogger()) << "No input events" << LOG_END; - } - LOG_WARN(GetLogger()) << "Firing arrow" << LOG_END; - arrow->fire(event, outputs, output_count, result); - LOG_WARN(GetLogger()) << "Fired arrow" << LOG_END; - if (output_count == 0) { - LOG_WARN(GetLogger()) << "No output events" << LOG_END; - } - else { - for (size_t i=0; iGetEventNumber() << " on port " << outputs.at(i).second << LOG_END; - } - } - - lock.lock(); - arrow->push(outputs, output_count, location_id); - arrow_state.active_tasks -= 1; - lock.unlock(); + LOG_WARN(GetLogger()) << "Firing arrow" << LOG_END; + arrow->fire(event, outputs, output_count, result); + LOG_WARN(GetLogger()) << "Fired arrow with result " << to_string(result) << LOG_END; + if (output_count == 0) { + LOG_WARN(GetLogger()) << "No output events" << LOG_END; } else { - LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow needs an input event from port " << port << ", but the queue or pool is empty." << LOG_END; + for (size_t i=0; iGetEventNumber() << " on port " << outputs.at(i).second << LOG_END; + } } + lock.lock(); + arrow->push(outputs, output_count, location_id); + arrow_state.active_tasks -= 1; + lock.unlock(); return result; }