Skip to content

Commit

Permalink
Move queue and pool refs out of Arrow constructor args
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanwbrei committed Aug 20, 2024
1 parent b65ac3c commit d1d5162
Show file tree
Hide file tree
Showing 15 changed files with 73 additions and 90 deletions.
12 changes: 7 additions & 5 deletions src/examples/SubeventExample/SubeventExample.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,13 @@ int main() {
auto subprocess_arrow = new JSubeventArrow<MyInput, MyOutput>("subprocess", &processor, &subevents_in, &subevents_out);
auto merge_arrow = new JMergeArrow<MyInput, MyOutput>("merge", &processor, &subevents_out, &events_out);

auto source_arrow = new JEventSourceArrow("simpleSource",
{source},
&events_in,
topology->event_pool);
auto proc_arrow = new JEventProcessorArrow("simpleProcessor", &events_out, nullptr, topology->event_pool);
auto source_arrow = new JEventSourceArrow("simpleSource", {source});
source_arrow->set_input(topology->event_pool);
source_arrow->set_output(&events_in);

auto proc_arrow = new JEventProcessorArrow("simpleProcessor");
proc_arrow->set_input(&events_out);
proc_arrow->set_output(topology->event_pool);
proc_arrow->add_processor(new SimpleProcessor);

builder.arrows.push_back(source_arrow);
Expand Down
12 changes: 2 additions & 10 deletions src/libraries/JANA/Topology/JEventMapArrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,8 @@
#include <JANA/JEvent.h>


JEventMapArrow::JEventMapArrow(std::string name,
EventQueue *input_queue,
EventQueue *output_queue)
: JPipelineArrow(std::move(name),
true,
false,
false,
input_queue,
output_queue,
nullptr) {}
JEventMapArrow::JEventMapArrow(std::string name)
: JPipelineArrow(std::move(name), true, false, false) {}

void JEventMapArrow::add_source(JEventSource* source) {
m_sources.push_back(source);
Expand Down
2 changes: 1 addition & 1 deletion src/libraries/JANA/Topology/JEventMapArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class JEventMapArrow : public JPipelineArrow<JEventMapArrow, Event> {
std::vector<JEventProcessor*> m_procs;

public:
JEventMapArrow(std::string name, EventQueue *input_queue, EventQueue *output_queue);
JEventMapArrow(std::string name);

void add_source(JEventSource* source);
void add_unfolder(JEventUnfolder* unfolder);
Expand Down
13 changes: 2 additions & 11 deletions src/libraries/JANA/Topology/JEventProcessorArrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,8 @@
#include <JANA/JEventSource.h>


JEventProcessorArrow::JEventProcessorArrow(std::string name,
EventQueue *input_queue,
EventQueue *output_queue,
JEventPool *pool)
: JPipelineArrow(std::move(name),
true,
false,
true,
input_queue,
output_queue,
pool) {}
JEventProcessorArrow::JEventProcessorArrow(std::string name)
: JPipelineArrow(std::move(name), true, false, true) {}

void JEventProcessorArrow::add_processor(JEventProcessor* processor) {
m_processors.push_back(processor);
Expand Down
8 changes: 1 addition & 7 deletions src/libraries/JANA/Topology/JEventProcessorArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,9 @@ class JEventProcessorArrow : public JPipelineArrow<JEventProcessorArrow, Event>
std::vector<JEventProcessor*> m_processors;

public:
JEventProcessorArrow(std::string name,
EventQueue *input_queue,
EventQueue *output_queue,
JEventPool *pool);

JEventProcessorArrow(std::string name);
void add_processor(JEventProcessor* processor);

void process(Event* event, bool& success, JArrowMetrics::Status& status);

void initialize() final;
void finalize() final;
};
Expand Down
7 changes: 2 additions & 5 deletions src/libraries/JANA/Topology/JEventSourceArrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,8 @@


JEventSourceArrow::JEventSourceArrow(std::string name,
std::vector<JEventSource*> sources,
EventQueue* output_queue,
JEventPool* pool
)
: JPipelineArrow(name, false, true, false, nullptr, output_queue, pool), m_sources(sources) {
std::vector<JEventSource*> sources)
: JPipelineArrow(name, false, true, false), m_sources(sources) {
}


Expand Down
2 changes: 1 addition & 1 deletion src/libraries/JANA/Topology/JEventSourceArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class JEventSourceArrow : public JPipelineArrow<JEventSourceArrow, Event> {
size_t m_current_source = 0;

public:
JEventSourceArrow(std::string name, std::vector<JEventSource*> sources, EventQueue* output_queue, JEventPool* pool);
JEventSourceArrow(std::string name, std::vector<JEventSource*> sources);
void initialize() final;
void finalize() final;

Expand Down
13 changes: 2 additions & 11 deletions src/libraries/JANA/Topology/JEventTapArrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,8 @@
#include <JANA/JEvent.h>


JEventTapArrow::JEventTapArrow(std::string name,
EventQueue *input_queue,
EventQueue *output_queue,
JEventPool *pool)
: JPipelineArrow(std::move(name),
false,
false,
false,
input_queue,
output_queue,
pool) {}
JEventTapArrow::JEventTapArrow(std::string name)
: JPipelineArrow(std::move(name), false, false, false) {}

void JEventTapArrow::add_processor(JEventProcessor* proc) {
m_procs.push_back(proc);
Expand Down
2 changes: 1 addition & 1 deletion src/libraries/JANA/Topology/JEventTapArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class JEventTapArrow : public JPipelineArrow<JEventTapArrow, Event> {
std::vector<JEventProcessor*> m_procs;

public:
JEventTapArrow(std::string name, EventQueue *input_queue, EventQueue *output_queue, JEventPool *pool);
JEventTapArrow(std::string name);

void add_processor(JEventProcessor* proc);
void process(Event* event, bool& success, JArrowMetrics::Status& status);
Expand Down
32 changes: 13 additions & 19 deletions src/libraries/JANA/Topology/JPipelineArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,21 @@ class JPipelineArrow : public JArrow {
JPipelineArrow(std::string name,
bool is_parallel,
bool is_source,
bool is_sink,
JMailbox<MessageT*>* input_queue,
JMailbox<MessageT*>* output_queue,
JPool<MessageT>* pool
)
bool is_sink)
: JArrow(std::move(name), is_parallel, is_source, is_sink) {
}

if (input_queue == nullptr) {
assert(pool != nullptr);
m_input.set_pool(pool);
}
else {
m_input.set_queue(input_queue);
}
if (output_queue == nullptr) {
assert(pool != nullptr);
m_output.set_pool(pool);
}
else {
m_output.set_queue(output_queue);
}
void set_input(JMailbox<MessageT*>* queue) {
m_input.set_queue(queue);
}
void set_input(JPool<MessageT>* pool) {
m_input.set_pool(pool);
}
void set_output(JMailbox<MessageT*>* queue) {
m_output.set_queue(queue);
}
void set_output(JPool<MessageT>* pool) {
m_output.set_pool(pool);
}

void execute(JArrowMetrics& result, size_t location_id) final {
Expand Down
1 change: 1 addition & 0 deletions src/libraries/JANA/Topology/JPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <JANA/Utils/JCpuInfo.h>
#include <JANA/JLogger.h>
#include <mutex>
#include <vector>


class JPoolBase {
Expand Down
24 changes: 18 additions & 6 deletions src/libraries/JANA/Topology/JTopologyBuilder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,9 @@ void JTopologyBuilder::attach_lower_level(JEventLevel current_level, JUnfoldArro
auto q2 = new EventQueue(m_event_queue_threshold, mapping.get_loc_count(), m_enable_stealing);
queues.push_back(q2);

auto* proc_arrow = new JEventProcessorArrow(ss.str()+"Tap", q1, q2, nullptr);
auto* proc_arrow = new JEventProcessorArrow(ss.str()+"Tap");
proc_arrow->set_input(q1);
proc_arrow->set_output(q2);
arrows.push_back(proc_arrow);
proc_arrow->set_chunksize(m_event_processor_chunksize);
proc_arrow->set_logger(m_arrow_logger);
Expand Down Expand Up @@ -301,11 +303,15 @@ void JTopologyBuilder::attach_top_level(JEventLevel current_level) {
auto queue = new EventQueue(m_event_queue_threshold, mapping.get_loc_count(), m_enable_stealing);
queues.push_back(queue);

auto* src_arrow = new JEventSourceArrow(level_str+"Source", sources_at_level, queue, pool_at_level);
auto* src_arrow = new JEventSourceArrow(level_str+"Source", sources_at_level);
src_arrow->set_input(pool_at_level);
src_arrow->set_output(queue);
arrows.push_back(src_arrow);
src_arrow->set_chunksize(m_event_source_chunksize);

auto* proc_arrow = new JEventProcessorArrow(level_str+"Tap", queue, nullptr, pool_at_level);
auto* proc_arrow = new JEventProcessorArrow(level_str+"Tap");
proc_arrow->set_input(queue);
proc_arrow->set_output(pool_at_level);
arrows.push_back(proc_arrow);
proc_arrow->set_chunksize(m_event_processor_chunksize);

Expand All @@ -325,11 +331,15 @@ void JTopologyBuilder::attach_top_level(JEventLevel current_level) {
queues.push_back(q1);
queues.push_back(q2);

auto *src_arrow = new JEventSourceArrow(level_str+"Source", sources_at_level, q1, pool_at_level);
auto *src_arrow = new JEventSourceArrow(level_str+"Source", sources_at_level);
src_arrow->set_input(pool_at_level);
src_arrow->set_output(q1);
arrows.push_back(src_arrow);
src_arrow->set_chunksize(m_event_source_chunksize);

auto *map_arrow = new JEventMapArrow(level_str+"Map", q1, q2);;
auto *map_arrow = new JEventMapArrow(level_str+"Map");
map_arrow->set_input(q1);
map_arrow->set_output(q2);
arrows.push_back(map_arrow);
map_arrow->set_chunksize(m_event_source_chunksize);
src_arrow->attach(map_arrow);
Expand Down Expand Up @@ -357,7 +367,9 @@ void JTopologyBuilder::attach_top_level(JEventLevel current_level) {
auto q3 = new EventQueue(m_event_queue_threshold, mapping.get_loc_count(), m_enable_stealing);
queues.push_back(q3);

auto* proc_arrow = new JEventProcessorArrow(level_str+"Tap", q3, nullptr, pool_at_level);
auto* proc_arrow = new JEventProcessorArrow(level_str+"Tap");
proc_arrow->set_input(q3);
proc_arrow->set_output(pool_at_level);
arrows.push_back(proc_arrow);
proc_arrow->set_chunksize(m_event_processor_chunksize);

Expand Down
2 changes: 1 addition & 1 deletion src/libraries/JANA/Topology/JTopologyBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <JANA/Services/JParameterManager.h>
#include <JANA/Services/JComponentManager.h>
#include <JANA/Services/JLoggingService.h>
#include <JANA/Utils/JEventPool.h>


class JParameterManager;
Expand All @@ -23,7 +24,6 @@ class JPoolBase;
class JQueue;
class JFoldArrow;
class JUnfoldArrow;
class JEventPool;

class JTopologyBuilder : public JService {
public:
Expand Down
15 changes: 9 additions & 6 deletions src/programs/unit_tests/Topology/SubeventTests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,19 +101,22 @@ TEST_CASE("Basic subevent arrow functionality") {

auto topology = app.GetService<JTopologyBuilder>();
topology->set_configure_fn([&](JTopologyBuilder& topology) {
auto source_arrow = new JEventSourceArrow("simpleSource",
{new SimpleSource},
&events_in,
topology.event_pool);
auto proc_arrow = new JEventProcessorArrow("simpleProcessor", &events_out,
nullptr, topology.event_pool);

auto source_arrow = new JEventSourceArrow("simpleSource", {new SimpleSource});
source_arrow->set_input(topology.event_pool);
source_arrow->set_output(&events_in);

auto proc_arrow = new JEventProcessorArrow("simpleProcessor");
proc_arrow->set_input(&events_out);
proc_arrow->set_output(topology.event_pool);
proc_arrow->add_processor(new SimpleProcessor);

topology.arrows.push_back(source_arrow);
topology.arrows.push_back(split_arrow);
topology.arrows.push_back(subprocess_arrow);
topology.arrows.push_back(merge_arrow);
topology.arrows.push_back(proc_arrow);

source_arrow->attach(split_arrow);
split_arrow->attach(subprocess_arrow);
subprocess_arrow->attach(merge_arrow);
Expand Down
18 changes: 12 additions & 6 deletions src/programs/unit_tests/Topology/TestTopologyComponents.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@

#pragma once

#include <iostream>

#include <JANA/Services/JLoggingService.h>
#include <JANA/Topology/JPipelineArrow.h>
#include <thread>
#include "MapArrow.h"


Expand All @@ -20,7 +17,10 @@ struct RandIntSource : public JPipelineArrow<RandIntSource, int> {
JLogger logger;

RandIntSource(std::string name, JPool<int>* pool, JMailbox<int*>* output_queue)
: JPipelineArrow<RandIntSource, int>(name, false, true, false, nullptr, output_queue, pool) {}
: JPipelineArrow<RandIntSource, int>(name, false, true, false) {
this->set_input(pool);
this->set_output(output_queue);
}

void process(int* item, bool& success, JArrowMetrics::Status& status) {

Expand Down Expand Up @@ -59,7 +59,10 @@ struct MultByTwoProcessor : public ParallelProcessor<int*, double*> {
struct SubOneProcessor : public JPipelineArrow<SubOneProcessor, double> {

SubOneProcessor(std::string name, JMailbox<double*>* input_queue, JMailbox<double*>* output_queue)
: JPipelineArrow<SubOneProcessor, double>(name, true, false, false, input_queue, output_queue, nullptr) {}
: JPipelineArrow<SubOneProcessor, double>(name, true, false, false) {
this->set_input(input_queue);
this->set_output(output_queue);
}

void process(double* item, bool&, JArrowMetrics::Status&) {
*item -= 1;
Expand All @@ -73,7 +76,10 @@ struct SumSink : public JPipelineArrow<SumSink<T>, T> {
T sum = 0;

SumSink(std::string name, JMailbox<T*>* input_queue, JPool<T>* pool)
: JPipelineArrow<SumSink<T>,T>(name, false, false, true, input_queue, nullptr, pool) {}
: JPipelineArrow<SumSink<T>,T>(name, false, false, true) {
this->set_input(input_queue);
this->set_output(pool);
}

void process(T* item, bool&, JArrowMetrics::Status&) {
sum += *item;
Expand Down

0 comments on commit d1d5162

Please sign in to comment.