diff --git a/.github/halld_recon_build.sh b/.github/halld_recon_build.sh index 19810271e..e2c405132 100644 --- a/.github/halld_recon_build.sh +++ b/.github/halld_recon_build.sh @@ -1,19 +1,21 @@ #!/bin/bash -echo "mounting cvmfs" +echo "Mounting CVMFS" yum -y install fuse chmod 666 /dev/fuse mkdir -p /cvmfs/oasis.opensciencegrid.org mount -t cvmfs oasis.opensciencegrid.org /cvmfs/oasis.opensciencegrid.org - -cd /workspace/halld_recon ln -s /cvmfs/oasis.opensciencegrid.org/gluex/group /group -export BUILD_SCRIPTS=/group/halld/Software/build_scripts -source $BUILD_SCRIPTS/gluex_env_boot_jlab.sh --bs $BUILD_SCRIPTS + +export BMS_OSNAME_OVERRIDE="Linux_Alma9-x86_64-gcc11.4.1-cntr" +# The BMS_OSNAME override is needed because Alma9 retroactively switched its system gcc +# from 11.4.1 to 11.5. We cannot create a new Alma9 container that uses gcc11.4.1, but +# the CVMFS artifact repository hasn't been updated to reflect that. + +source /group/halld/Software/build_scripts/gluex_env_boot_jlab.sh gxenv /workspace/JANA2/.github/halld_recon_build_prereqs_version.xml -echo "rootsys" -echo $ROOTSYS -chmod +x $JANA_HOME/bin/* -cd src -scons install -j12 \ No newline at end of file +echo "ROOTSYS=$ROOTSYS" +cd /workspace/halld_recon/src +scons install -j12 DEBUG=1 OPTIMIZATION=0 SHOWBUILD=1 + diff --git a/src/libraries/JANA/Engine/JExecutionEngine.cc b/src/libraries/JANA/Engine/JExecutionEngine.cc index c9ae36edf..189684ba2 100644 --- a/src/libraries/JANA/Engine/JExecutionEngine.cc +++ b/src/libraries/JANA/Engine/JExecutionEngine.cc @@ -511,15 +511,23 @@ void JExecutionEngine::FindNextReadyTask_Unsafe(Task& task, WorkerState& worker) if (m_runstatus == RunStatus::Running || m_runstatus == RunStatus::Draining) { // We only pick up a new task if the topology is running or draining. - for (size_t arrow_id=0; arrow_idget_next_port_index(); JEvent* event = (port == -1) ? nullptr : arrow->pull(port, worker.location_id); if (event != nullptr || port == -1) { + LOG_TRACE(GetLogger()) << "Scheduler: Found next ready arrow with id " << arrow_id << LOG_END; // We've found a task that is ready! state.active_tasks += 1; @@ -550,6 +559,9 @@ void JExecutionEngine::FindNextReadyTask_Unsafe(Task& task, WorkerState& worker) } return; } + else { + LOG_TRACE(GetLogger()) << "Scheduler: Arrow with id " << arrow_id << " is unready: Input event is needed but not on queue yet." << LOG_END; + } } } @@ -557,33 +569,36 @@ void JExecutionEngine::FindNextReadyTask_Unsafe(Task& task, WorkerState& worker) // so we check whether more are potentially coming. If not, we can pause the topology. // Note that our worker threads will still wait at ExchangeTask() until they get // shut down separately during Scale(). - - bool any_active_source_found = false; - bool any_active_task_found = false; - LOG_DEBUG(GetLogger()) << "Scheduler: No tasks ready" << LOG_END; + if (m_runstatus == RunStatus::Running || m_runstatus == RunStatus::Pausing || m_runstatus == RunStatus::Draining) { + // We want to avoid scenarios such as where the topology already Finished but then gets reset to Paused + // This also leaves a cleaner narrative in the logs. - for (size_t arrow_id = 0; arrow_id < m_arrow_states.size(); ++arrow_id) { - auto& state = m_arrow_states[arrow_id]; - auto* arrow = m_topology->arrows[arrow_id]; - LOG_TRACE(GetLogger()) << "Scheduler: arrow=" << arrow->get_name() << ", is_source=" << state.is_source << ", active_tasks=" << state.active_tasks << ", is_parallel=" << state.is_parallel << LOG_END; - any_active_source_found |= (state.status == ArrowState::Status::Running && state.is_source); - any_active_task_found |= (state.active_tasks != 0); - // A source might have been deactivated by RequestPause, Ctrl-C, etc, and might be inactive even though it still has active tasks - } - - if (!any_active_source_found && !any_active_task_found) { - // Pause the topology - m_time_at_finish = clock_t::now(); - m_event_count_at_finish = 0; - for (auto& arrow_state : m_arrow_states) { - if (arrow_state.is_sink) { - m_event_count_at_finish += arrow_state.events_processed; + bool any_active_source_found = false; + bool any_active_task_found = false; + + LOG_DEBUG(GetLogger()) << "Scheduler: No tasks ready" << LOG_END; + + for (size_t arrow_id = 0; arrow_id < m_arrow_states.size(); ++arrow_id) { + auto& state = m_arrow_states[arrow_id]; + any_active_source_found |= (state.status == ArrowState::Status::Running && state.is_source); + any_active_task_found |= (state.active_tasks != 0); + // A source might have been deactivated by RequestPause, Ctrl-C, etc, and might be inactive even though it still has active tasks + } + + if (!any_active_source_found && !any_active_task_found) { + // Pause the topology + m_time_at_finish = clock_t::now(); + m_event_count_at_finish = 0; + for (auto& arrow_state : m_arrow_states) { + if (arrow_state.is_sink) { + m_event_count_at_finish += arrow_state.events_processed; + } } + LOG_DEBUG(GetLogger()) << "Scheduler: Processing paused" << LOG_END; + m_runstatus = RunStatus::Paused; + // I think this is the ONLY site where the topology gets paused. Verify this? } - LOG_DEBUG(GetLogger()) << "Processing paused" << LOG_END; - m_runstatus = RunStatus::Paused; - // I think this is the ONLY site where the topology gets paused. Verify this? } worker.last_arrow_id = -1; @@ -668,13 +683,21 @@ bool JExecutionEngine::IsTimeoutEnabled() const { JArrow::FireResult JExecutionEngine::Fire(size_t arrow_id, size_t location_id) { std::unique_lock lock(m_mutex); + if (arrow_id >= m_topology->arrows.size()) { + LOG_WARN(GetLogger()) << "Firing unsuccessful: No arrow exists with id=" << arrow_id << LOG_END; + return JArrow::FireResult::NotRunYet; + } JArrow* arrow = m_topology->arrows[arrow_id]; + LOG_WARN(GetLogger()) << "Attempting to fire arrow with name=" << arrow->get_name() + << ", index=" << arrow_id << ", location=" << location_id << LOG_END; ArrowState& arrow_state = m_arrow_states[arrow_id]; if (arrow_state.status == ArrowState::Status::Finished) { + LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow status is Finished." << arrow_id << LOG_END; return JArrow::FireResult::Finished; } if (!arrow_state.is_parallel && arrow_state.active_tasks != 0) { + LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow is sequential and already has an active task." << arrow_id << LOG_END; return JArrow::FireResult::NotRunYet; } arrow_state.active_tasks += 1; @@ -683,6 +706,17 @@ JArrow::FireResult JExecutionEngine::Fire(size_t arrow_id, size_t location_id) { JEvent* event = nullptr; if (port != -1) { event = arrow->pull(port, location_id); + if (event == nullptr) { + LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow needs an input event from port " << port << ", but the queue or pool is empty." << LOG_END; + arrow_state.active_tasks -= 1; + return JArrow::FireResult::NotRunYet; + } + else { + LOG_WARN(GetLogger()) << "Input event #" << event->GetEventNumber() << " from port " << port << LOG_END; + } + } + else { + LOG_WARN(GetLogger()) << "No input events" << LOG_END; } lock.unlock(); @@ -690,14 +724,22 @@ JArrow::FireResult JExecutionEngine::Fire(size_t arrow_id, size_t location_id) { JArrow::OutputData outputs; JArrow::FireResult result = JArrow::FireResult::NotRunYet; - if (event != nullptr || port == -1) { - arrow->fire(event, outputs, output_count, result); - lock.lock(); - arrow->push(outputs, output_count, location_id); - arrow_state.active_tasks -= 1; - lock.unlock(); + LOG_WARN(GetLogger()) << "Firing arrow" << LOG_END; + arrow->fire(event, outputs, output_count, result); + LOG_WARN(GetLogger()) << "Fired arrow with result " << to_string(result) << LOG_END; + if (output_count == 0) { + LOG_WARN(GetLogger()) << "No output events" << LOG_END; + } + else { + for (size_t i=0; iGetEventNumber() << " on port " << outputs.at(i).second << LOG_END; + } } + lock.lock(); + arrow->push(outputs, output_count, location_id); + arrow_state.active_tasks -= 1; + lock.unlock(); return result; } @@ -778,6 +820,7 @@ std::string ToString(JExecutionEngine::RunStatus runstatus) { case JExecutionEngine::RunStatus::Pausing: return "Pausing"; case JExecutionEngine::RunStatus::Draining: return "Draining"; case JExecutionEngine::RunStatus::Finished: return "Finished"; + default: return "CorruptedRunStatus"; } } diff --git a/src/libraries/JANA/Engine/JExecutionEngine.h b/src/libraries/JANA/Engine/JExecutionEngine.h index f0e2ffd66..28e6c0468 100644 --- a/src/libraries/JANA/Engine/JExecutionEngine.h +++ b/src/libraries/JANA/Engine/JExecutionEngine.h @@ -103,6 +103,7 @@ class JExecutionEngine : public JService { std::atomic m_interrupt_status { InterruptStatus::NoInterruptsUnsupervised }; std::atomic_bool m_print_worker_report_requested {false}; std::atomic_bool m_send_worker_report_requested {false}; + size_t m_next_arrow_id=0; // Metrics size_t m_event_count_at_start = 0; diff --git a/src/libraries/JANA/Topology/JEventSourceArrow.cc b/src/libraries/JANA/Topology/JEventSourceArrow.cc index 69b24fbe9..be116fe71 100644 --- a/src/libraries/JANA/Topology/JEventSourceArrow.cc +++ b/src/libraries/JANA/Topology/JEventSourceArrow.cc @@ -23,11 +23,15 @@ void JEventSourceArrow::fire(JEvent* event, OutputData& outputs, size_t& output_ // First check to see if we need to handle a barrier event before attempting to emit another event if (m_barrier_active) { + + auto emitted_event_count = m_sources[m_current_source]->GetEmittedEventCount(); + auto finished_event_count = m_sources[m_current_source]->GetFinishedEventCount(); + // A barrier event has been emitted by the source. if (m_pending_barrier_event != nullptr) { + // This barrier event is pending until the topology drains - if (m_sources[m_current_source]->GetEmittedEventCount() - - m_sources[m_current_source]->GetFinishedEventCount() == 1) { + if ((emitted_event_count - finished_event_count) == 1) { LOG_DEBUG(m_logger) << "JEventSourceArrow: Barrier event is in-flight" << LOG_END; // Topology has drained; only remaining in-flight event is the barrier event itself, @@ -44,7 +48,7 @@ void JEventSourceArrow::fire(JEvent* event, OutputData& outputs, size_t& output_ } else { // Topology has _not_ finished draining, all we can do is wait - LOG_DEBUG(m_logger) << "JEventSourceArrow: Waiting on pending barrier event" << LOG_END; + LOG_DEBUG(m_logger) << "JEventSourceArrow: Waiting on pending barrier event. Emitted = " << emitted_event_count << ", Finished = " << finished_event_count << LOG_END; LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result ComeBackLater"<< LOG_END; assert(event == nullptr); @@ -56,8 +60,7 @@ void JEventSourceArrow::fire(JEvent* event, OutputData& outputs, size_t& output_ else { // This barrier event has already been sent into the topology and we need to wait // until it is finished before emitting any more events - if (m_sources[m_current_source]->GetFinishedEventCount() == - m_sources[m_current_source]->GetEmittedEventCount()) { + if (finished_event_count == emitted_event_count) { // Barrier event has finished. LOG_DEBUG(m_logger) << "JEventSourceArrow: Barrier event finished, returning to normal operation" << LOG_END; diff --git a/src/libraries/JANA/Utils/JApplicationInspector.cc b/src/libraries/JANA/Utils/JApplicationInspector.cc index 26a520f71..368a0f516 100644 --- a/src/libraries/JANA/Utils/JApplicationInspector.cc +++ b/src/libraries/JANA/Utils/JApplicationInspector.cc @@ -32,8 +32,7 @@ void InspectTopology(JApplication* app) { void Fire(JApplication* app, int arrow_id) { auto engine = app->GetService(); - auto result = engine->Fire(arrow_id, 0); - std::cout << to_string(result); + engine->Fire(arrow_id, 0); } void InspectComponents(JApplication* app) { diff --git a/src/programs/unit_tests/Components/BarrierEventTests.cc b/src/programs/unit_tests/Components/BarrierEventTests.cc index e3a7a467c..37e62580c 100644 --- a/src/programs/unit_tests/Components/BarrierEventTests.cc +++ b/src/programs/unit_tests/Components/BarrierEventTests.cc @@ -8,7 +8,7 @@ #include "JANA/Utils/JBenchUtils.h" #include "catch.hpp" -int global_resource = 0; +size_t global_resource = 0; struct BarrierSource : public JEventSource { @@ -93,6 +93,32 @@ struct BarrierProcessor : public JEventProcessor { }; +TEST_CASE("BarrierEventTests_SingleThread") { + global_resource = 0; + JApplication app; + app.Add(new BarrierProcessor); + app.Add(new BarrierSource); + app.SetParameterValue("nthreads", 1); + app.SetParameterValue("jana:nevents", 40); + //app.SetParameterValue("jana:log:show_threadstamp", true); + //app.SetParameterValue("jana:loglevel", "debug"); + app.Run(true); +}; + + +TEST_CASE("BarrierEventTests_Legacy_SingleThread") { + global_resource = 0; + JApplication app; + app.Add(new LegacyBarrierProcessor); + app.Add(new BarrierSource); + app.SetParameterValue("nthreads", 1); + app.SetParameterValue("jana:nevents", 40); + //app.SetParameterValue("jana:log:show_threadstamp", true); + //app.SetParameterValue("jana:loglevel", "debug"); + app.Run(true); +}; + + TEST_CASE("BarrierEventTests") { global_resource = 0; JApplication app; diff --git a/src/programs/unit_tests/Components/JFactoryTests.cc b/src/programs/unit_tests/Components/JFactoryTests.cc index 07ada4b21..7149dfe9b 100644 --- a/src/programs/unit_tests/Components/JFactoryTests.cc +++ b/src/programs/unit_tests/Components/JFactoryTests.cc @@ -4,6 +4,7 @@ #include "JANA/Components/JComponentFwd.h" +#include "JANA/JApplicationFwd.h" #include "JANA/JFactory.h" #include "catch.hpp" #include "JFactoryTests.h" @@ -566,4 +567,43 @@ TEST_CASE("JFactory_ExceptionHandling") { } } - +TEST_CASE("JFactory_GetObjects_Caching") { + JApplication app; + app.Add(new JFactoryGeneratorT>()); + app.Add(new JFactoryGeneratorT>()); + auto source = new JFactoryTestDummySource; + auto event = std::make_shared(&app); + event->SetJEventSource(source); + + SECTION("RepeatedGetObjectsAreCached") { + auto dummies = event->Get(); + REQUIRE(dummies.at(0)->data == 8); + REQUIRE(source->get_objects_count == 1); + REQUIRE(source->get_objects_dummy_count == 1); + + dummies = event->Get(); + REQUIRE(dummies.at(0)->data == 8); + REQUIRE(source->get_objects_count == 1); + REQUIRE(source->get_objects_dummy_count == 1); + } + + SECTION("DifferentGetObjectsAreNotCached") { + auto dummies = event->Get(); + REQUIRE(dummies.at(0)->data == 8); + REQUIRE(source->get_objects_count == 1); + REQUIRE(source->get_objects_dummy_count == 1); + REQUIRE(source->get_objects_different_count == 0); + + auto different = event->Get(); + REQUIRE(different.at(0)->E == 123.0); + REQUIRE(source->get_objects_count == 2); + REQUIRE(source->get_objects_dummy_count == 1); + REQUIRE(source->get_objects_different_count == 1); + + different = event->Get(); + REQUIRE(different.at(0)->E == 123.0); + REQUIRE(source->get_objects_count == 2); + REQUIRE(source->get_objects_dummy_count == 1); + REQUIRE(source->get_objects_different_count == 1); + } +} diff --git a/src/programs/unit_tests/Components/JFactoryTests.h b/src/programs/unit_tests/Components/JFactoryTests.h index e700b86c4..e2cbc0188 100644 --- a/src/programs/unit_tests/Components/JFactoryTests.h +++ b/src/programs/unit_tests/Components/JFactoryTests.h @@ -23,6 +23,11 @@ struct JFactoryTestDummyObject : public JObject { } }; +struct DifferentDummyObject : public JObject { + double E; + DifferentDummyObject(double E) : E(E) {} +}; + /// DummyFactory is a trivial JFactory which reports how often its member functions get called @@ -69,6 +74,10 @@ struct JFactoryTestExceptingInInitFactory : public JFactoryT&, JFactory* aFactory) override { - auto factory = dynamic_cast*>(aFactory); - if (factory != nullptr) { - factory->Insert(new JFactoryTestDummyObject(8)); - factory->Insert(new JFactoryTestDummyObject(88)); + get_objects_count += 1; + auto dummy_factory = dynamic_cast*>(aFactory); + if (dummy_factory != nullptr) { + get_objects_dummy_count += 1; + dummy_factory->Insert(new JFactoryTestDummyObject(8)); + dummy_factory->Insert(new JFactoryTestDummyObject(88)); + return true; + } + auto different_factory = dynamic_cast*>(aFactory); + if (different_factory != nullptr) { + get_objects_different_count += 1; + different_factory->Insert(new DifferentDummyObject(123.)); + return true; } return false; } diff --git a/src/programs/unit_tests/Engine/ScaleTests.cc b/src/programs/unit_tests/Engine/ScaleTests.cc index 4102b460a..cbc305463 100644 --- a/src/programs/unit_tests/Engine/ScaleTests.cc +++ b/src/programs/unit_tests/Engine/ScaleTests.cc @@ -57,7 +57,7 @@ TEST_CASE("ScaleNWorkerUpdate") { app.Stop(true); } -TEST_CASE("ScaleThroughputImprovement") { +TEST_CASE("ScaleThroughputImprovement", "[.][performance]") { JApplication app; app.SetParameterValue("jana:loglevel", "INFO");