Skip to content

Commit

Permalink
Merge pull request #387 from JeffersonLab/nbrei_gluex_fixes
Browse files Browse the repository at this point in the history
Another round of fixes for GlueX
  • Loading branch information
nathanwbrei authored Nov 27, 2024
2 parents 0d3563d + 4148049 commit e6da892
Show file tree
Hide file tree
Showing 9 changed files with 187 additions and 55 deletions.
22 changes: 12 additions & 10 deletions .github/halld_recon_build.sh
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
#!/bin/bash

echo "mounting cvmfs"
echo "Mounting CVMFS"
yum -y install fuse
chmod 666 /dev/fuse
mkdir -p /cvmfs/oasis.opensciencegrid.org
mount -t cvmfs oasis.opensciencegrid.org /cvmfs/oasis.opensciencegrid.org

cd /workspace/halld_recon
ln -s /cvmfs/oasis.opensciencegrid.org/gluex/group /group
export BUILD_SCRIPTS=/group/halld/Software/build_scripts
source $BUILD_SCRIPTS/gluex_env_boot_jlab.sh --bs $BUILD_SCRIPTS

export BMS_OSNAME_OVERRIDE="Linux_Alma9-x86_64-gcc11.4.1-cntr"
# The BMS_OSNAME override is needed because Alma9 retroactively switched its system gcc
# from 11.4.1 to 11.5. We cannot create a new Alma9 container that uses gcc11.4.1, but
# the CVMFS artifact repository hasn't been updated to reflect that.

source /group/halld/Software/build_scripts/gluex_env_boot_jlab.sh
gxenv /workspace/JANA2/.github/halld_recon_build_prereqs_version.xml

echo "rootsys"
echo $ROOTSYS
chmod +x $JANA_HOME/bin/*
cd src
scons install -j12
echo "ROOTSYS=$ROOTSYS"
cd /workspace/halld_recon/src
scons install -j12 DEBUG=1 OPTIMIZATION=0 SHOWBUILD=1

105 changes: 74 additions & 31 deletions src/libraries/JANA/Engine/JExecutionEngine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -511,15 +511,23 @@ void JExecutionEngine::FindNextReadyTask_Unsafe(Task& task, WorkerState& worker)
if (m_runstatus == RunStatus::Running || m_runstatus == RunStatus::Draining) {
// We only pick up a new task if the topology is running or draining.

for (size_t arrow_id=0; arrow_id<m_arrow_states.size(); ++arrow_id) {
// Each call to FindNextReadyTask_Unsafe() starts with a different m_next_arrow_id to ensure balanced arrow assignments
size_t arrow_count = m_arrow_states.size();
m_next_arrow_id += 1;
m_next_arrow_id %= arrow_count;

for (size_t i=m_next_arrow_id; i<(m_next_arrow_id+arrow_count); ++i) {
size_t arrow_id = i % arrow_count;

auto& state = m_arrow_states[arrow_id];
if (!state.is_parallel && (state.active_tasks != 0)) {
// We've found a sequential arrow that is already running. Nothing we can do here.
// We've found a sequential arrow that is already active. Nothing we can do here.
LOG_TRACE(GetLogger()) << "Scheduler: Arrow with id " << arrow_id << " is unready: Sequential and already active." << LOG_END;
continue;
}

if (state.status != ArrowState::Status::Running) {
LOG_TRACE(GetLogger()) << "Scheduler: Arrow with id " << arrow_id << " is unready: Arrow is either paused or finished." << LOG_END;
continue;
}
// TODO: Support next_visit_time so that we don't hammer blocked event sources
Expand All @@ -530,6 +538,7 @@ void JExecutionEngine::FindNextReadyTask_Unsafe(Task& task, WorkerState& worker)
auto port = arrow->get_next_port_index();
JEvent* event = (port == -1) ? nullptr : arrow->pull(port, worker.location_id);
if (event != nullptr || port == -1) {
LOG_TRACE(GetLogger()) << "Scheduler: Found next ready arrow with id " << arrow_id << LOG_END;
// We've found a task that is ready!
state.active_tasks += 1;

Expand All @@ -550,40 +559,46 @@ void JExecutionEngine::FindNextReadyTask_Unsafe(Task& task, WorkerState& worker)
}
return;
}
else {
LOG_TRACE(GetLogger()) << "Scheduler: Arrow with id " << arrow_id << " is unready: Input event is needed but not on queue yet." << LOG_END;
}
}
}

// Because we reached this point, we know that there aren't any tasks ready,
// so we check whether more are potentially coming. If not, we can pause the topology.
// Note that our worker threads will still wait at ExchangeTask() until they get
// shut down separately during Scale().

bool any_active_source_found = false;
bool any_active_task_found = false;

LOG_DEBUG(GetLogger()) << "Scheduler: No tasks ready" << LOG_END;
if (m_runstatus == RunStatus::Running || m_runstatus == RunStatus::Pausing || m_runstatus == RunStatus::Draining) {
// We want to avoid scenarios such as where the topology already Finished but then gets reset to Paused
// This also leaves a cleaner narrative in the logs.

for (size_t arrow_id = 0; arrow_id < m_arrow_states.size(); ++arrow_id) {
auto& state = m_arrow_states[arrow_id];
auto* arrow = m_topology->arrows[arrow_id];
LOG_TRACE(GetLogger()) << "Scheduler: arrow=" << arrow->get_name() << ", is_source=" << state.is_source << ", active_tasks=" << state.active_tasks << ", is_parallel=" << state.is_parallel << LOG_END;
any_active_source_found |= (state.status == ArrowState::Status::Running && state.is_source);
any_active_task_found |= (state.active_tasks != 0);
// A source might have been deactivated by RequestPause, Ctrl-C, etc, and might be inactive even though it still has active tasks
}

if (!any_active_source_found && !any_active_task_found) {
// Pause the topology
m_time_at_finish = clock_t::now();
m_event_count_at_finish = 0;
for (auto& arrow_state : m_arrow_states) {
if (arrow_state.is_sink) {
m_event_count_at_finish += arrow_state.events_processed;
bool any_active_source_found = false;
bool any_active_task_found = false;

LOG_DEBUG(GetLogger()) << "Scheduler: No tasks ready" << LOG_END;

for (size_t arrow_id = 0; arrow_id < m_arrow_states.size(); ++arrow_id) {
auto& state = m_arrow_states[arrow_id];
any_active_source_found |= (state.status == ArrowState::Status::Running && state.is_source);
any_active_task_found |= (state.active_tasks != 0);
// A source might have been deactivated by RequestPause, Ctrl-C, etc, and might be inactive even though it still has active tasks
}

if (!any_active_source_found && !any_active_task_found) {
// Pause the topology
m_time_at_finish = clock_t::now();
m_event_count_at_finish = 0;
for (auto& arrow_state : m_arrow_states) {
if (arrow_state.is_sink) {
m_event_count_at_finish += arrow_state.events_processed;
}
}
LOG_DEBUG(GetLogger()) << "Scheduler: Processing paused" << LOG_END;
m_runstatus = RunStatus::Paused;
// I think this is the ONLY site where the topology gets paused. Verify this?
}
LOG_DEBUG(GetLogger()) << "Processing paused" << LOG_END;
m_runstatus = RunStatus::Paused;
// I think this is the ONLY site where the topology gets paused. Verify this?
}

worker.last_arrow_id = -1;
Expand Down Expand Up @@ -668,13 +683,21 @@ bool JExecutionEngine::IsTimeoutEnabled() const {
JArrow::FireResult JExecutionEngine::Fire(size_t arrow_id, size_t location_id) {

std::unique_lock<std::mutex> lock(m_mutex);
if (arrow_id >= m_topology->arrows.size()) {
LOG_WARN(GetLogger()) << "Firing unsuccessful: No arrow exists with id=" << arrow_id << LOG_END;
return JArrow::FireResult::NotRunYet;
}
JArrow* arrow = m_topology->arrows[arrow_id];
LOG_WARN(GetLogger()) << "Attempting to fire arrow with name=" << arrow->get_name()
<< ", index=" << arrow_id << ", location=" << location_id << LOG_END;

ArrowState& arrow_state = m_arrow_states[arrow_id];
if (arrow_state.status == ArrowState::Status::Finished) {
LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow status is Finished." << arrow_id << LOG_END;
return JArrow::FireResult::Finished;
}
if (!arrow_state.is_parallel && arrow_state.active_tasks != 0) {
LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow is sequential and already has an active task." << arrow_id << LOG_END;
return JArrow::FireResult::NotRunYet;
}
arrow_state.active_tasks += 1;
Expand All @@ -683,21 +706,40 @@ JArrow::FireResult JExecutionEngine::Fire(size_t arrow_id, size_t location_id) {
JEvent* event = nullptr;
if (port != -1) {
event = arrow->pull(port, location_id);
if (event == nullptr) {
LOG_WARN(GetLogger()) << "Firing unsuccessful: Arrow needs an input event from port " << port << ", but the queue or pool is empty." << LOG_END;
arrow_state.active_tasks -= 1;
return JArrow::FireResult::NotRunYet;
}
else {
LOG_WARN(GetLogger()) << "Input event #" << event->GetEventNumber() << " from port " << port << LOG_END;
}
}
else {
LOG_WARN(GetLogger()) << "No input events" << LOG_END;
}
lock.unlock();

size_t output_count;
JArrow::OutputData outputs;
JArrow::FireResult result = JArrow::FireResult::NotRunYet;

if (event != nullptr || port == -1) {
arrow->fire(event, outputs, output_count, result);
lock.lock();
arrow->push(outputs, output_count, location_id);
arrow_state.active_tasks -= 1;
lock.unlock();
LOG_WARN(GetLogger()) << "Firing arrow" << LOG_END;
arrow->fire(event, outputs, output_count, result);
LOG_WARN(GetLogger()) << "Fired arrow with result " << to_string(result) << LOG_END;
if (output_count == 0) {
LOG_WARN(GetLogger()) << "No output events" << LOG_END;
}
else {
for (size_t i=0; i<output_count; ++i) {
LOG_WARN(GetLogger()) << "Output event #" << outputs.at(i).first->GetEventNumber() << " on port " << outputs.at(i).second << LOG_END;
}
}

lock.lock();
arrow->push(outputs, output_count, location_id);
arrow_state.active_tasks -= 1;
lock.unlock();
return result;
}

Expand Down Expand Up @@ -778,6 +820,7 @@ std::string ToString(JExecutionEngine::RunStatus runstatus) {
case JExecutionEngine::RunStatus::Pausing: return "Pausing";
case JExecutionEngine::RunStatus::Draining: return "Draining";
case JExecutionEngine::RunStatus::Finished: return "Finished";
default: return "CorruptedRunStatus";
}
}

1 change: 1 addition & 0 deletions src/libraries/JANA/Engine/JExecutionEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class JExecutionEngine : public JService {
std::atomic<InterruptStatus> m_interrupt_status { InterruptStatus::NoInterruptsUnsupervised };
std::atomic_bool m_print_worker_report_requested {false};
std::atomic_bool m_send_worker_report_requested {false};
size_t m_next_arrow_id=0;

// Metrics
size_t m_event_count_at_start = 0;
Expand Down
13 changes: 8 additions & 5 deletions src/libraries/JANA/Topology/JEventSourceArrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@ void JEventSourceArrow::fire(JEvent* event, OutputData& outputs, size_t& output_

// First check to see if we need to handle a barrier event before attempting to emit another event
if (m_barrier_active) {

auto emitted_event_count = m_sources[m_current_source]->GetEmittedEventCount();
auto finished_event_count = m_sources[m_current_source]->GetFinishedEventCount();

// A barrier event has been emitted by the source.
if (m_pending_barrier_event != nullptr) {

// This barrier event is pending until the topology drains
if (m_sources[m_current_source]->GetEmittedEventCount() -
m_sources[m_current_source]->GetFinishedEventCount() == 1) {
if ((emitted_event_count - finished_event_count) == 1) {
LOG_DEBUG(m_logger) << "JEventSourceArrow: Barrier event is in-flight" << LOG_END;

// Topology has drained; only remaining in-flight event is the barrier event itself,
Expand All @@ -44,7 +48,7 @@ void JEventSourceArrow::fire(JEvent* event, OutputData& outputs, size_t& output_
}
else {
// Topology has _not_ finished draining, all we can do is wait
LOG_DEBUG(m_logger) << "JEventSourceArrow: Waiting on pending barrier event" << LOG_END;
LOG_DEBUG(m_logger) << "JEventSourceArrow: Waiting on pending barrier event. Emitted = " << emitted_event_count << ", Finished = " << finished_event_count << LOG_END;
LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result ComeBackLater"<< LOG_END;

assert(event == nullptr);
Expand All @@ -56,8 +60,7 @@ void JEventSourceArrow::fire(JEvent* event, OutputData& outputs, size_t& output_
else {
// This barrier event has already been sent into the topology and we need to wait
// until it is finished before emitting any more events
if (m_sources[m_current_source]->GetFinishedEventCount() ==
m_sources[m_current_source]->GetEmittedEventCount()) {
if (finished_event_count == emitted_event_count) {

// Barrier event has finished.
LOG_DEBUG(m_logger) << "JEventSourceArrow: Barrier event finished, returning to normal operation" << LOG_END;
Expand Down
3 changes: 1 addition & 2 deletions src/libraries/JANA/Utils/JApplicationInspector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ void InspectTopology(JApplication* app) {

void Fire(JApplication* app, int arrow_id) {
auto engine = app->GetService<JExecutionEngine>();
auto result = engine->Fire(arrow_id, 0);
std::cout << to_string(result);
engine->Fire(arrow_id, 0);
}

void InspectComponents(JApplication* app) {
Expand Down
28 changes: 27 additions & 1 deletion src/programs/unit_tests/Components/BarrierEventTests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include "JANA/Utils/JBenchUtils.h"
#include "catch.hpp"

int global_resource = 0;
size_t global_resource = 0;


struct BarrierSource : public JEventSource {
Expand Down Expand Up @@ -93,6 +93,32 @@ struct BarrierProcessor : public JEventProcessor {
};


// Runs a JApplication containing a BarrierProcessor and a BarrierSource to completion
// with "nthreads" set to 1.
// This test body contains no REQUIRE/CHECK of its own; any verification presumably
// happens inside BarrierSource / BarrierProcessor -- TODO confirm.
TEST_CASE("BarrierEventTests_SingleThread") {
// Reset the file-level counter so that state left behind by another test case
// cannot leak into this one.
global_resource = 0;
JApplication app;
app.Add(new BarrierProcessor);
app.Add(new BarrierSource);
// Presumably restricts the engine to a single worker thread (per the test name) -- TODO confirm.
app.SetParameterValue("nthreads", 1);
// Presumably caps the run at 40 events -- TODO confirm against the parameter's definition.
app.SetParameterValue("jana:nevents", 40);
// Debugging aids, intentionally left disabled:
//app.SetParameterValue("jana:log:show_threadstamp", true);
//app.SetParameterValue("jana:loglevel", "debug");
// NOTE(review): the meaning of the `true` argument is not visible here; presumably
// it makes Run() block until processing has finished -- confirm.
app.Run(true);
};


// Same scenario as the single-threaded barrier test, but registers a
// LegacyBarrierProcessor instead of a BarrierProcessor.
// This test body contains no REQUIRE/CHECK of its own; any verification presumably
// happens inside BarrierSource / LegacyBarrierProcessor -- TODO confirm.
TEST_CASE("BarrierEventTests_Legacy_SingleThread") {
// Reset the file-level counter so that state left behind by another test case
// cannot leak into this one.
global_resource = 0;
JApplication app;
app.Add(new LegacyBarrierProcessor);
app.Add(new BarrierSource);
// Presumably restricts the engine to a single worker thread (per the test name) -- TODO confirm.
app.SetParameterValue("nthreads", 1);
// Presumably caps the run at 40 events -- TODO confirm against the parameter's definition.
app.SetParameterValue("jana:nevents", 40);
// Debugging aids, intentionally left disabled:
//app.SetParameterValue("jana:log:show_threadstamp", true);
//app.SetParameterValue("jana:loglevel", "debug");
// NOTE(review): the meaning of the `true` argument is not visible here; presumably
// it makes Run() block until processing has finished -- confirm.
app.Run(true);
};


TEST_CASE("BarrierEventTests") {
global_resource = 0;
JApplication app;
Expand Down
42 changes: 41 additions & 1 deletion src/programs/unit_tests/Components/JFactoryTests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@


#include "JANA/Components/JComponentFwd.h"
#include "JANA/JApplicationFwd.h"
#include "JANA/JFactory.h"
#include "catch.hpp"
#include "JFactoryTests.h"
Expand Down Expand Up @@ -566,4 +567,43 @@ TEST_CASE("JFactory_ExceptionHandling") {
}
}


// Checks how often the event source's counters advance when the same or a different
// object type is requested from a JEvent via Get<T>().
//
// Setup (Catch2 re-executes the test-case body from the top for every SECTION, so each
// section gets its own app, source and event):
//   - factories for JFactoryTestDummyObject and DifferentDummyObject are registered,
//   - a JFactoryTestDummySource is attached to a freshly created JEvent.
//
// The counters get_objects_count / get_objects_dummy_count / get_objects_different_count
// are members of JFactoryTestDummySource; presumably they count calls to the source's
// GetObjects() overall and per object type -- TODO confirm in JFactoryTests.h.
TEST_CASE("JFactory_GetObjects_Caching") {
JApplication app;
app.Add(new JFactoryGeneratorT<JFactoryT<JFactoryTestDummyObject>>());
app.Add(new JFactoryGeneratorT<JFactoryT<DifferentDummyObject>>());
// NOTE(review): `source` is allocated with `new` and never deleted in this block;
// confirm whether SetJEventSource() or the event takes ownership, otherwise it leaks.
auto source = new JFactoryTestDummySource;
auto event = std::make_shared<JEvent>(&app);
event->SetJEventSource(source);

// Requesting the same type twice must not advance the source's counters a second time.
SECTION("RepeatedGetObjectsAreCached") {
// First request: counters go from 0 to 1.
auto dummies = event->Get<JFactoryTestDummyObject>();
// The expected value 8 is presumably what the dummy source fills in -- TODO confirm.
REQUIRE(dummies.at(0)->data == 8);
REQUIRE(source->get_objects_count == 1);
REQUIRE(source->get_objects_dummy_count == 1);

// Second request for the same type: same data, counters unchanged.
dummies = event->Get<JFactoryTestDummyObject>();
REQUIRE(dummies.at(0)->data == 8);
REQUIRE(source->get_objects_count == 1);
REQUIRE(source->get_objects_dummy_count == 1);
}

// Requesting a second, different type must advance the counters exactly once for that
// type, and must leave the first type's counter untouched.
SECTION("DifferentGetObjectsAreNotCached") {
auto dummies = event->Get<JFactoryTestDummyObject>();
REQUIRE(dummies.at(0)->data == 8);
REQUIRE(source->get_objects_count == 1);
REQUIRE(source->get_objects_dummy_count == 1);
REQUIRE(source->get_objects_different_count == 0);

// First request for the other type: total and "different" counters each go up by one.
auto different = event->Get<DifferentDummyObject>();
// The expected value 123.0 is presumably what the dummy source fills in -- TODO confirm.
REQUIRE(different.at(0)->E == 123.0);
REQUIRE(source->get_objects_count == 2);
REQUIRE(source->get_objects_dummy_count == 1);
REQUIRE(source->get_objects_different_count == 1);

// Repeating the request for the other type leaves every counter unchanged.
different = event->Get<DifferentDummyObject>();
REQUIRE(different.at(0)->E == 123.0);
REQUIRE(source->get_objects_count == 2);
REQUIRE(source->get_objects_dummy_count == 1);
REQUIRE(source->get_objects_different_count == 1);
}
}
Loading

0 comments on commit e6da892

Please sign in to comment.