Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Arrow cleanup #375

Merged
merged 8 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions src/examples/SubeventCUDAExample/SubeventCUDAExample.cu
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

#include <JANA/JApplication.h>
#include <JANA/JObject.h>
#include <JANA/Engine/JSubeventArrow.h>
#include <JANA/JEventSource.h>
#include <JANA/JEventProcessor.h>
#include "JANA/Engine/JTopologyBuilder.h"
#include <JANA/Topology/JSubeventArrow.h>


struct MyInput : public JObject {
Expand Down Expand Up @@ -128,8 +128,7 @@ int main() {
JMailbox <SubeventWrapper<MyOutput>> subevents_out;

auto split_arrow = new JSplitArrow<MyInput, MyOutput>("split", &processor, &events_in, &subevents_in);
auto subprocess_arrow = new JSubeventArrow<MyInput, MyOutput>("subprocess", &processor, &subevents_in,
&subevents_out);
auto subprocess_arrow = new JSubeventArrow<MyInput, MyOutput>("subprocess", &processor, &subevents_in, &subevents_out);
auto merge_arrow = new JMergeArrow<MyInput, MyOutput>("merge", &processor, &subevents_out, &events_out);

JApplication app;
Expand All @@ -140,12 +139,14 @@ int main() {
source->SetNEvents(10); // limit ourselves to 10 events. Note that the 'jana:nevents' param won't work
// here because we aren't using JComponentManager to manage the EventSource

auto topology = app.GetService<JTopologyBuilder>()->create_empty();
auto source_arrow = new JEventSourceArrow("simpleSource",
{source},
&events_in,
topology->event_pool);
auto proc_arrow = new JEventProcessorArrow("simpleProcessor", &events_out, nullptr, topology->event_pool);
auto topology = app.GetService<JTopologyBuilder>();
auto source_arrow = new JEventSourceArrow("simpleSource", {source});
source_arrow->set_input(topology->event_pool);
source_arrow->set_output(&events_in);

auto proc_arrow = new JEventMapArrow("simpleProcessor");
proc_arrow->set_input(&events_out);
proc_arrow->set_output(topology->event_pool);
proc_arrow->add_processor(new SimpleProcessor);

topology->arrows.push_back(source_arrow);
Expand Down
6 changes: 3 additions & 3 deletions src/examples/SubeventExample/SubeventExample.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
#include <JANA/JEventProcessor.h>

#include <JANA/Topology/JEventSourceArrow.h>
#include <JANA/Topology/JEventProcessorArrow.h>
#include <JANA/Topology/JEventMapArrow.h>
#include <JANA/Topology/JSubeventArrow.h>
#include "JANA/Topology/JTopologyBuilder.h"
#include <JANA/Topology/JTopologyBuilder.h>


struct MyInput : public JObject {
Expand Down Expand Up @@ -111,7 +111,7 @@ int main() {
source_arrow->set_input(topology->event_pool);
source_arrow->set_output(&events_in);

auto proc_arrow = new JEventProcessorArrow("simpleProcessor");
auto proc_arrow = new JEventMapArrow("simpleProcessor");
proc_arrow->set_input(&events_out);
proc_arrow->set_output(topology->event_pool);
proc_arrow->add_processor(new SimpleProcessor);
Expand Down
1 change: 0 additions & 1 deletion src/libraries/JANA/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ set(JANA2_SOURCES
Engine/JPerfMetrics.cc
Engine/JPerfSummary.cc

Topology/JEventProcessorArrow.cc
Topology/JEventSourceArrow.cc
Topology/JEventMapArrow.cc
Topology/JEventTapArrow.cc
Expand Down
84 changes: 37 additions & 47 deletions src/libraries/JANA/Topology/JArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,24 @@
#include <JANA/JLogger.h>
#include <JANA/JException.h>
#include <JANA/Topology/JMailbox.h>
#include <JANA/Topology/JPool.h>
#include <JANA/Topology/JEventPool.h>


#ifndef JANA2_ARROWDATA_MAX_SIZE
#define JANA2_ARROWDATA_MAX_SIZE 10
#endif

struct PlaceRefBase;
struct Place;

using EventT = std::shared_ptr<JEvent>;

class JArrow {
private:
const std::string m_name; // Used for human understanding
const bool m_is_parallel; // Whether or not it is safe to parallelize
const bool m_is_source; // Whether or not this arrow should activate/drain the topology
bool m_is_sink; // Whether or not tnis arrow contributes to the final event count
JArrowMetrics m_metrics; // Performance information accumulated over all workers
std::string m_name; // Used for human understanding
bool m_is_parallel; // Whether or not it is safe to parallelize
bool m_is_source; // Whether or not this arrow should activate/drain the topology
bool m_is_sink; // Whether or not tnis arrow contributes to the final event count
JArrowMetrics m_metrics; // Performance information accumulated over all workers

friend class JScheduler;
std::vector<JArrow *> m_listeners; // Downstream Arrows
Expand All @@ -34,27 +36,22 @@ class JArrow {
// This is usable by subclasses.
JLogger m_logger;
friend class JTopologyBuilder;
std::vector<PlaceRefBase*> m_places; // Will eventually supplant m_listeners, m_chunksize
std::vector<Place*> m_places; // Will eventually supplant m_listeners

public:
std::string get_name() { return m_name; }
JLogger& get_logger() { return m_logger; }
bool is_parallel() { return m_is_parallel; }
bool is_source() { return m_is_source; }
bool is_sink() { return m_is_sink; }
JArrowMetrics& get_metrics() { return m_metrics; }

std::string get_name() { return m_name; }

void set_logger(JLogger logger) {
m_logger = logger;
}
void set_name(std::string name) { m_name = name; }
void set_logger(JLogger logger) { m_logger = logger; }
void set_is_parallel(bool is_parallel) { m_is_parallel = is_parallel; }
void set_is_source(bool is_source) { m_is_source = is_source; }
void set_is_sink(bool is_sink) { m_is_sink = is_sink; }

void set_is_sink(bool is_sink) {
m_is_sink = is_sink;
}

// TODO: Metrics should be encapsulated so that only actions are to update, clear, or summarize
JArrowMetrics& get_metrics() {
return m_metrics;
}

JArrow(std::string name, bool is_parallel, bool is_source, bool is_sink) :
m_name(std::move(name)), m_is_parallel(is_parallel), m_is_source(is_source), m_is_sink(is_sink) {
Expand All @@ -77,16 +74,15 @@ class JArrow {
m_listeners.push_back(downstream);
};

void attach(PlaceRefBase* place) {
void attach(Place* place) {
if (std::find(m_places.begin(), m_places.end(), place) == m_places.end()) {
m_places.push_back(place);
}
};
};

template <typename T>
struct Data {
std::array<T*, JANA2_ARROWDATA_MAX_SIZE> items;
std::array<EventT*, JANA2_ARROWDATA_MAX_SIZE> items;
size_t item_count = 0;
size_t reserve_count = 0;
size_t location_id;
Expand All @@ -96,59 +92,53 @@ struct Data {
}
};

struct PlaceRefBase {
struct Place {
void* place_ref = nullptr;
bool is_queue = true;
bool is_input = false;
size_t min_item_count = 1;
size_t max_item_count = 1;

virtual size_t get_pending() { return 0; }
};

template <typename T>
struct PlaceRef : public PlaceRefBase {

PlaceRef(JArrow* parent, bool is_input, size_t min_item_count, size_t max_item_count) {
Place(JArrow* parent, bool is_input, size_t min_item_count, size_t max_item_count) {
assert(parent != nullptr);
parent->attach(this);
this->is_input = is_input;
this->min_item_count = min_item_count;
this->max_item_count = max_item_count;
}

void set_queue(JMailbox<T*>* queue) {
void set_queue(JMailbox<EventT*>* queue) {
assert(queue != nullptr);
this->place_ref = queue;
this->is_queue = true;
}

void set_pool(JPool<T>* pool) {
void set_pool(JEventPool* pool) {
assert(pool != nullptr);
this->place_ref = pool;
this->is_queue = false;
}

size_t get_pending() override {
size_t get_pending() {
assert(place_ref != nullptr);
if (is_input && is_queue) {
auto queue = static_cast<JMailbox<T*>*>(place_ref);
auto queue = static_cast<JMailbox<EventT*>*>(place_ref);
return queue->size();
}
return 0;
}

bool pull(Data<T>& data) {
bool pull(Data& data) {
assert(place_ref != nullptr);
if (is_input) { // Actually pull the data
if (is_queue) {
auto queue = static_cast<JMailbox<T*>*>(place_ref);
auto queue = static_cast<JMailbox<EventT*>*>(place_ref);
data.item_count = queue->pop_and_reserve(data.items.data(), min_item_count, max_item_count, data.location_id);
data.reserve_count = data.item_count;
return (data.item_count >= min_item_count);
}
else {
auto pool = static_cast<JPool<T>*>(place_ref);
auto pool = static_cast<JEventPool*>(place_ref);
data.item_count = pool->pop(data.items.data(), min_item_count, max_item_count, data.location_id);
data.reserve_count = 0;
return (data.item_count >= min_item_count);
Expand All @@ -158,7 +148,7 @@ struct PlaceRef : public PlaceRefBase {
if (is_queue) {
// Reserve a space on the output queue
data.item_count = 0;
auto queue = static_cast<JMailbox<T*>*>(place_ref);
auto queue = static_cast<JMailbox<EventT*>*>(place_ref);
data.reserve_count = queue->reserve(min_item_count, max_item_count, data.location_id);
return (data.reserve_count >= min_item_count);
}
Expand All @@ -171,31 +161,31 @@ struct PlaceRef : public PlaceRefBase {
}
}

void revert(Data<T>& data) {
void revert(Data& data) {
assert(place_ref != nullptr);
if (is_queue) {
auto queue = static_cast<JMailbox<T*>*>(place_ref);
auto queue = static_cast<JMailbox<EventT*>*>(place_ref);
queue->push_and_unreserve(data.items.data(), data.item_count, data.reserve_count, data.location_id);
}
else {
if (is_input) {
auto pool = static_cast<JPool<T>*>(place_ref);
auto pool = static_cast<JEventPool*>(place_ref);
pool->push(data.items.data(), data.item_count, false, data.location_id);
}
}
}

size_t push(Data<T>& data) {
size_t push(Data& data) {
assert(place_ref != nullptr);
if (is_queue) {
auto queue = static_cast<JMailbox<T*>*>(place_ref);
auto queue = static_cast<JMailbox<EventT*>*>(place_ref);
queue->push_and_unreserve(data.items.data(), data.item_count, data.reserve_count, data.location_id);
data.item_count = 0;
data.reserve_count = 0;
return is_input ? 0 : data.item_count;
}
else {
auto pool = static_cast<JPool<T>*>(place_ref);
auto pool = static_cast<JEventPool*>(place_ref);
pool->push(data.items.data(), data.item_count, !is_input, data.location_id);
data.item_count = 0;
data.reserve_count = 0;
Expand All @@ -206,7 +196,7 @@ struct PlaceRef : public PlaceRefBase {

inline size_t JArrow::get_pending() {
size_t sum = 0;
for (PlaceRefBase* place : m_places) {
for (Place* place : m_places) {
sum += place->get_pending();
}
return sum;
Expand Down
2 changes: 1 addition & 1 deletion src/libraries/JANA/Topology/JEventMapArrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ void JEventMapArrow::add_processor(JEventProcessor* processor) {
m_procs.push_back(processor);
}

void JEventMapArrow::process(Event* event, bool& success, JArrowMetrics::Status& status) {
void JEventMapArrow::process(std::shared_ptr<JEvent>* event, bool& success, JArrowMetrics::Status& status) {

LOG_DEBUG(m_logger) << "Executing arrow " << get_name() << " for event# " << (*event)->GetEventNumber() << LOG_END;
for (JEventSource* source : m_sources) {
Expand Down
6 changes: 2 additions & 4 deletions src/libraries/JANA/Topology/JEventMapArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@ class JEventUnfolder;
class JEventProcessor;
class JEvent;

using Event = std::shared_ptr<JEvent>;
using EventQueue = JMailbox<Event*>;

class JEventMapArrow : public JPipelineArrow<JEventMapArrow, Event> {
class JEventMapArrow : public JPipelineArrow<JEventMapArrow> {

private:
std::vector<JEventSource*> m_sources;
Expand All @@ -28,7 +26,7 @@ class JEventMapArrow : public JPipelineArrow<JEventMapArrow, Event> {
void add_unfolder(JEventUnfolder* unfolder);
void add_processor(JEventProcessor* proc);

void process(Event* event, bool& success, JArrowMetrics::Status& status);
void process(std::shared_ptr<JEvent>* event, bool& success, JArrowMetrics::Status& status);

void initialize() final;
void finalize() final;
Expand Down
Loading
Loading