Merge pull request #378 from JeffersonLab/nbrei_optimization
Optimization: Remove constraints on arrow firing
nathanwbrei authored Oct 29, 2024
2 parents b08acbb + 79e48a9 commit a347e9b
Showing 40 changed files with 981 additions and 1,300 deletions.
52 changes: 25 additions & 27 deletions docs/howto/other-howtos.md
@@ -91,46 +91,44 @@ The `JTest` plugin lets you test JANA's performance for different workloads. It

| Name | Type | Default | Description |
|:-----|:-----|:------------|:--------|
-jtest:parser:cputime_ms | int | 0 | Time spent during parsing
-jtest:parser:cputime_spread | int | 0.25 | Spread of time spent during parsing
-jtest:parser:bytes | int | 2000000 | Bytes written during parsing
-jtest:parser:bytes_spread | double | 0.25 | Spread of bytes written during parsing
-jtest:disentangler:cputime_ms | int | 20 | Time spent during disentangling
-jtest:disentangler:cputime_spread | double | 0.25 | Spread of time spent during disentangling
-jtest:disentangler:bytes | int | 500000 | Bytes written during disentangling
-jtest:disentangler:bytes_spread | double | 0.25 | Spread of bytes written during disentangling
-jtest:tracker:cputime_ms | int | 200 | Time spent during tracking
-jtest:tracker:cputime_spread | double | 0.25 | Spread of time spent during tracking
-jtest:tracker:bytes | int | 1000 | Bytes written during tracking
-jtest:tracker:bytes_spread | double | 0.25 | Spread of bytes written during tracking
-jtest:plotter:cputime_ms | int | 0 | Time spent during plotting
-jtest:plotter:cputime_spread | double | 0.25 | Spread of time spent during plotting
-jtest:plotter:bytes | int | 1000 | Bytes written during plotting
-jtest:plotter:bytes_spread | double | 0.25 | Spread of bytes written during plotting
+| jtest:parser:cputime_ms | int | 0 | Time spent during parsing |
+| jtest:parser:cputime_spread | int | 0.25 | Spread of time spent during parsing |
+| jtest:parser:bytes | int | 2000000 | Bytes written during parsing |
+| jtest:parser:bytes_spread | double | 0.25 | Spread of bytes written during parsing |
+| jtest:disentangler:cputime_ms | int | 20 | Time spent during disentangling |
+| jtest:disentangler:cputime_spread | double | 0.25 | Spread of time spent during disentangling |
+| jtest:disentangler:bytes | int | 500000 | Bytes written during disentangling |
+| jtest:disentangler:bytes_spread | double | 0.25 | Spread of bytes written during disentangling |
+| jtest:tracker:cputime_ms | int | 200 | Time spent during tracking |
+| jtest:tracker:cputime_spread | double | 0.25 | Spread of time spent during tracking |
+| jtest:tracker:bytes | int | 1000 | Bytes written during tracking |
+| jtest:tracker:bytes_spread | double | 0.25 | Spread of bytes written during tracking |
+| jtest:plotter:cputime_ms | int | 0 | Time spent during plotting |
+| jtest:plotter:cputime_spread | double | 0.25 | Spread of time spent during plotting |
+| jtest:plotter:bytes | int | 1000 | Bytes written during plotting |
+| jtest:plotter:bytes_spread | double | 0.25 | Spread of bytes written during plotting |



The following parameters are used for benchmarking:

| Name | Type | Default | Description |
|:-----|:-----|:------------|:--------|
-benchmark:nsamples | int | 15 | Number of measurements made for each thread count
-benchmark:minthreads | int | 1 | Minimum thread count
-benchmark:maxthreads | int | ncores | Maximum thread count
-benchmark:threadstep | int | 1 | Thread count increment
-benchmark:resultsdir | string | JANA_Test_Results | Directory name for benchmark test results
+| benchmark:nsamples | int | 15 | Number of measurements made for each thread count |
+| benchmark:minthreads | int | 1 | Minimum thread count |
+| benchmark:maxthreads | int | ncores | Maximum thread count |
+| benchmark:threadstep | int | 1 | Thread count increment |
+| benchmark:resultsdir | string | JANA_Test_Results | Directory name for benchmark test results |

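The `benchmark:*` parameters above describe a sweep over thread counts. The following is a minimal sketch of how such a sweep could be enumerated, assuming the thread count runs from `minthreads` to `maxthreads` inclusive in steps of `threadstep`, with `nsamples` measurements at each count. `BenchmarkPlan` and `enumerate_runs` are hypothetical names for illustration and are not part of JANA.

```cpp
#include <cassert>
#include <stdexcept>
#include <vector>

// Hypothetical mirror of the benchmark:* parameters listed in the table.
struct BenchmarkPlan {
    int nsamples = 15;    // benchmark:nsamples
    int minthreads = 1;   // benchmark:minthreads
    int maxthreads = 4;   // benchmark:maxthreads (the documented default is the core count)
    int threadstep = 1;   // benchmark:threadstep
};

// Returns one entry per measurement, holding the thread count to use for it.
// Assumption: the sweep includes maxthreads when the step lands on it.
std::vector<int> enumerate_runs(const BenchmarkPlan& plan) {
    if (plan.threadstep < 1 || plan.nsamples < 0) {
        throw std::invalid_argument("threadstep must be >= 1 and nsamples >= 0");
    }
    std::vector<int> runs;
    for (int n = plan.minthreads; n <= plan.maxthreads; n += plan.threadstep) {
        for (int s = 0; s < plan.nsamples; ++s) {
            runs.push_back(n);
        }
    }
    return runs;
}
```

With `minthreads=1`, `maxthreads=5`, `threadstep=2` and `nsamples=2`, this sketch yields six measurements at thread counts 1, 1, 3, 3, 5, 5.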

The following parameters are more advanced, but may come in handy when doing performance tuning:

| Name | Type | Default | Description |
|:-----|:-----|:------------|:--------|
-jana:event_pool_size | int | nthreads | The number of events which may be in-flight at once
-jana:limit_total_events_in_flight | bool | 1 | Whether the number of in-flight events should be limited
-jana:affinity | int | 0 | Thread pinning strategy. 0: None. 1: Minimize number of memory localities. 2: Minimize number of hyperthreads.
-jana:locality | int | 0 | Memory locality strategy. 0: Global. 1: Socket-local. 2: Numa-domain-local. 3. Core-local. 4. Cpu-local
-jana:enable_stealing | bool | 0 | Allow threads to pick up work from a different memory location if their local mailbox is empty.
-jana:event_queue_threshold | int | 80 | Mailbox buffer size
+| jana:max_inflight_events | int | nthreads | The number of events which may be in-flight at once. Should be at least `nthreads`, more gives better load balancing. |
+| jana:affinity | int | 0 | Thread pinning strategy. 0: None. 1: Minimize number of memory localities. 2: Minimize number of hyperthreads. |
+| jana:locality | int | 0 | Memory locality strategy. 0: Global. 1: Socket-local. 2: Numa-domain-local. 3. Core-local. 4. Cpu-local |
+| jana:enable_stealing | bool | 0 | Allow threads to pick up work from a different memory location if their local mailbox is empty. |

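The `jana:max_inflight_events` description says the value should be at least `nthreads`. One way to see why is to model the event pool as a fixed set of reusable event objects: a worker that cannot obtain an event has nothing to process. The sketch below is a toy, single-threaded model under that assumption. `ToyEvent` and `ToyEventPool` are hypothetical and do not reproduce JANA's pool implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

struct ToyEvent { int number = 0; };

// Hypothetical fixed-size pool. Its capacity plays the role of
// jana:max_inflight_events. Not thread-safe; for illustration only.
class ToyEventPool {
    std::vector<std::unique_ptr<ToyEvent>> m_storage;  // owns every event
    std::vector<ToyEvent*> m_available;                // events not currently in flight
public:
    explicit ToyEventPool(std::size_t max_inflight_events) {
        for (std::size_t i = 0; i < max_inflight_events; ++i) {
            m_storage.push_back(std::make_unique<ToyEvent>());
            m_available.push_back(m_storage.back().get());
        }
    }
    // Returns nullptr when every event is already in flight.
    ToyEvent* acquire() {
        if (m_available.empty()) return nullptr;
        ToyEvent* event = m_available.back();
        m_available.pop_back();
        return event;
    }
    // Hands an event back so that it can be reused.
    void release(ToyEvent* event) { m_available.push_back(event); }
    std::size_t available() const { return m_available.size(); }
};
```

In this model a pool of size 2 can serve at most two workers at once; a third `acquire()` returns `nullptr` until some event is released. A pool larger than the thread count leaves spare events queued between stages, which is one plausible reading of "more gives better load balancing".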

Creating code skeletons
@@ -77,8 +77,7 @@ void InitPlugin(JApplication* app) {

     app->SetParameterValue("nthreads", 4);
     app->SetParameterValue("jana:extended_report", true);
-    app->SetParameterValue("jana:limit_total_events_in_flight", true);
-    app->SetParameterValue("jana:event_pool_size", 16);
+    app->SetParameterValue("jana:max_inflight_events", 16);

     // TODO: Consider making streamDet:sub_socket be the 'source_name', and use JESG to switch between JSES and DecodeDASSource
     // TODO: Improve parametermanager interface
12 changes: 6 additions & 6 deletions src/examples/SubeventExample/SubeventExample.cc
@@ -98,8 +98,8 @@ int main() {
     auto topology = app.GetService<JTopologyBuilder>();
     topology->set_configure_fn([&](JTopologyBuilder& builder) {

-        JMailbox<std::shared_ptr<JEvent>*> events_in;
-        JMailbox<std::shared_ptr<JEvent>*> events_out;
+        JMailbox<JEvent*> events_in;
+        JMailbox<JEvent*> events_out;
         JMailbox<SubeventWrapper<MyInput>> subevents_in;
         JMailbox<SubeventWrapper<MyOutput>> subevents_out;

@@ -108,12 +108,12 @@ int main() {
         auto merge_arrow = new JMergeArrow<MyInput, MyOutput>("merge", &processor, &subevents_out, &events_out);

         auto source_arrow = new JEventSourceArrow("simpleSource", {source});
-        source_arrow->set_input(topology->event_pool);
-        source_arrow->set_output(&events_in);
+        source_arrow->attach(topology->event_pool, 0);
+        source_arrow->attach(&events_in, 1);

         auto proc_arrow = new JEventMapArrow("simpleProcessor");
-        proc_arrow->set_input(&events_out);
-        proc_arrow->set_output(topology->event_pool);
+        proc_arrow->attach(&events_out, 0);
+        proc_arrow->attach(topology->event_pool, 1);
         proc_arrow->add_processor(new SimpleProcessor);

         builder.arrows.push_back(source_arrow);
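
In the hunk above, the `set_input`/`set_output` pair gives way to `attach(queue, port_index)`. Reading only from this diff, index 0 takes the place of the old input and index 1 the old output. The sketch below illustrates the general idea of numbered ports with toy types. `ToyQueue`, `ToyArrow` and `fire()` are hypothetical stand-ins, and the port semantics are an assumption inferred from the diff rather than a description of `JArrow`.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <vector>

struct ToyQueue { std::vector<int> items; };

// Hypothetical arrow that addresses its connections by port index
// instead of through dedicated set_input/set_output methods.
class ToyArrow {
    std::vector<ToyQueue*> m_ports;
public:
    explicit ToyArrow(std::size_t port_count) : m_ports(port_count, nullptr) {}

    void attach(ToyQueue* queue, std::size_t port) {
        if (port >= m_ports.size()) throw std::out_of_range("no such port");
        m_ports[port] = queue;
    }

    // Moves one item from port 0 to port 1 (assumed input and output).
    // Returns false when a port is unattached or there is nothing to move.
    bool fire() {
        ToyQueue* in = m_ports.at(0);
        ToyQueue* out = m_ports.at(1);
        if (in == nullptr || out == nullptr || in->items.empty()) return false;
        out->items.push_back(in->items.back());
        in->items.pop_back();
        return true;
    }
};
```

A uniform `attach` call lets an arrow carry any number of ports without a new setter per connection, which may be part of the motivation here.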
1 change: 1 addition & 0 deletions src/libraries/JANA/CMakeLists.txt
@@ -19,6 +19,7 @@ set(JANA2_SOURCES
Engine/JPerfMetrics.cc
Engine/JPerfSummary.cc

Topology/JArrow.cc
Topology/JEventSourceArrow.cc
Topology/JEventMapArrow.cc
Topology/JEventTapArrow.cc
6 changes: 1 addition & 5 deletions src/libraries/JANA/Engine/JArrowProcessingController.cc
@@ -159,14 +159,10 @@ bool JArrowProcessingController::is_timed_out() {
     auto metrics = measure_performance();

     int timeout_s;
-    if (metrics->total_uptime_s < (m_warmup_timeout_s * m_topology->m_pool_capacity * 1.0) / metrics->thread_count) {
+    if (metrics->total_uptime_s < (m_warmup_timeout_s * m_topology->m_max_inflight_events * 1.0) / metrics->thread_count) {
         // We are at the beginning and not all events have necessarily had a chance to warm up
         timeout_s = m_warmup_timeout_s;
     }
-    else if (!m_topology->m_limit_total_events_in_flight) {
-        // New events are constantly emitted, each of which may contain jfactorysets which need to be warmed up
-        timeout_s = m_warmup_timeout_s;
-    }
     else {
         timeout_s = m_timeout_s;
     }
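
With the `m_limit_total_events_in_flight` branch gone, the timeout choice above depends only on whether total uptime is still inside a warmup window of `warmup_timeout_s * max_inflight_events / thread_count` seconds. The standalone function below restates that branch structure so the arithmetic can be checked in isolation. `select_timeout_s` and its parameter list are hypothetical; only the comparison itself comes from the hunk.

```cpp
#include <cassert>

// Sketch of the post-commit branch structure in is_timed_out().
// All durations are in seconds. The function name and signature are illustrative.
int select_timeout_s(double total_uptime_s, int warmup_timeout_s, int timeout_s,
                     int max_inflight_events, int thread_count) {
    // Warmup window: with max_inflight_events pooled events shared among
    // thread_count threads, each thread may warm up several events in turn.
    double warmup_window_s = (warmup_timeout_s * max_inflight_events * 1.0) / thread_count;
    if (total_uptime_s < warmup_window_s) {
        return warmup_timeout_s;
    }
    return timeout_s;
}
```

As a worked example with illustrative values, a warmup timeout of 30 s, 16 in-flight events and 4 threads give a window of 30 × 16 / 4 = 120 s. An uptime of 100 s selects the warmup timeout, while an uptime of 120 s or more selects the regular timeout.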
14 changes: 7 additions & 7 deletions src/libraries/JANA/JEvent.cc
@@ -62,23 +62,23 @@ bool JEvent::HasParent(JEventLevel level) const {

 const JEvent& JEvent::GetParent(JEventLevel level) const {
     for (const auto& pair : mParents) {
-        if (pair.first == level) return *(*(pair.second));
+        if (pair.first == level) return *pair.second;
     }
     throw JException("Unable to find parent at level %s",
                      toString(level).c_str());
 }

-void JEvent::SetParent(std::shared_ptr<JEvent>* parent) {
-    JEventLevel level = parent->get()->GetLevel();
+void JEvent::SetParent(JEvent* parent) {
+    JEventLevel level = parent->GetLevel();
     for (const auto& pair : mParents) {
         if (pair.first == level) throw JException("Event already has a parent at level %s",
-                                                  toString(parent->get()->GetLevel()).c_str());
+                                                  toString(parent->GetLevel()).c_str());
     }
     mParents.push_back({level, parent});
-    parent->get()->mReferenceCount.fetch_add(1);
+    parent->mReferenceCount.fetch_add(1);
 }

-std::shared_ptr<JEvent>* JEvent::ReleaseParent(JEventLevel level) {
+JEvent* JEvent::ReleaseParent(JEventLevel level) {
     if (mParents.size() == 0) {
         throw JException("ReleaseParent failed: child has no parents!");
     }
@@ -88,7 +88,7 @@ std::shared_ptr<JEvent>* JEvent::ReleaseParent(JEventLevel level) {
                          toString(level).c_str(), toString(pair.first).c_str());
     }
     mParents.pop_back();
-    auto remaining_refs = pair.second->get()->mReferenceCount.fetch_sub(1);
+    auto remaining_refs = pair.second->mReferenceCount.fetch_sub(1);
     if (remaining_refs < 1) { // Remember, this was fetched _before_ the last subtraction
         throw JException("Parent refcount has gone negative!");
     }
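
The parent bookkeeping above now works on plain `JEvent*` pointers with a manual atomic reference count. The sketch below reproduces only that bookkeeping on a toy type, to show how `fetch_add` and `fetch_sub` interact with the `< 1` check. `ToyEvent` and `ToyLevel` are hypothetical. The real `ReleaseParent` continues past the lines shown in the hunk, so the return value used here (always the parent pointer) is a simplification and not the actual behavior.

```cpp
#include <atomic>
#include <cassert>
#include <stdexcept>
#include <utility>
#include <vector>

enum class ToyLevel { Timeslice, PhysicsEvent };

// Hypothetical stand-in for JEvent that keeps only the parent bookkeeping.
struct ToyEvent {
    ToyLevel level;
    std::atomic_int reference_count {1};   // starts at 1, as mReferenceCount does
    std::vector<std::pair<ToyLevel, ToyEvent*>> parents;

    explicit ToyEvent(ToyLevel l) : level(l) {}

    void SetParent(ToyEvent* parent) {
        for (const auto& pair : parents) {
            if (pair.first == parent->level) {
                throw std::runtime_error("event already has a parent at this level");
            }
        }
        parents.push_back({parent->level, parent});
        parent->reference_count.fetch_add(1);
    }

    // Simplification: always returns the released parent.
    ToyEvent* ReleaseParent(ToyLevel expected_level) {
        if (parents.empty()) {
            throw std::runtime_error("child has no parents");
        }
        auto pair = parents.back();
        if (pair.first != expected_level) {
            throw std::runtime_error("last parent is at a different level");
        }
        parents.pop_back();
        // fetch_sub returns the value held *before* the subtraction,
        // so a result below 1 means the count was already exhausted.
        int previous = pair.second->reference_count.fetch_sub(1);
        if (previous < 1) {
            throw std::runtime_error("parent refcount has gone negative");
        }
        return pair.second;
    }
};
```

Two children attached to one parent raise its count from 1 to 3, and each release lowers it by one. Dropping the `std::shared_ptr<JEvent>*` indirection removes one pointer hop per access, at the cost of tracking ownership by hand as shown.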
6 changes: 3 additions & 3 deletions src/libraries/JANA/JEvent.h
@@ -49,7 +49,7 @@ class JEvent : public std::enable_shared_from_this<JEvent> {
     bool mIsBarrierEvent = false;

     // Hierarchical event memory management
-    std::vector<std::pair<JEventLevel, std::shared_ptr<JEvent>*>> mParents;
+    std::vector<std::pair<JEventLevel, JEvent*>> mParents;
     std::atomic_int mReferenceCount {1};
     int64_t mEventIndex = -1;

@@ -89,8 +89,8 @@ class JEvent : public std::enable_shared_from_this<JEvent> {

     bool HasParent(JEventLevel level) const;
     const JEvent& GetParent(JEventLevel level) const;
-    void SetParent(std::shared_ptr<JEvent>* parent);
-    std::shared_ptr<JEvent>* ReleaseParent(JEventLevel level);
+    void SetParent(JEvent* parent);
+    JEvent* ReleaseParent(JEventLevel level);
     void Release();

     // Lifecycle
164 changes: 164 additions & 0 deletions src/libraries/JANA/JEventFolder.h
@@ -0,0 +1,164 @@
+// Copyright 2024, Jefferson Science Associates, LLC.
+// Subject to the terms in the LICENSE file found in the top-level directory.
+
+#pragma once
+
+#include <JANA/Components/JComponent.h>
+#include <JANA/Components/JHasInputs.h>
+#include <JANA/Components/JHasOutputs.h>
+#include <JANA/Components/JHasRunCallbacks.h>
+#include <JANA/JEvent.h>
+
+class JApplication;
+class JEventFolder : public jana::components::JComponent,
+                     public jana::components::JHasRunCallbacks,
+                     public jana::components::JHasInputs,
+                     public jana::components::JHasOutputs {
+
+private:
+    int32_t m_last_run_number = -1;
+    JEventLevel m_child_level;
+    bool m_call_preprocess_upstream = true;
+
+
+public:
+
+    JEventFolder() = default;
+    virtual ~JEventFolder() {};
+
+    virtual void Init() {};
+
+    virtual void Preprocess(const JEvent& /*parent*/) const {};
+
+    virtual void Fold(const JEvent& /*child*/, JEvent& /*parent*/, int /*item_nr*/) {
+        throw JException("Not implemented yet!");
+    };
+
+    virtual void Finish() {};
+
+
+    // Configuration
+
+    void SetParentLevel(JEventLevel level) { m_level = level; }
+
+    void SetChildLevel(JEventLevel level) { m_child_level = level; }
+
+    void SetCallPreprocessUpstream(bool call_upstream) { m_call_preprocess_upstream = call_upstream; }
+
+    JEventLevel GetChildLevel() { return m_child_level; }
+
+
+public:
+    // Backend
+
+    void DoInit() {
+        std::lock_guard<std::mutex> lock(m_mutex);
+        if (m_status != Status::Uninitialized) {
+            throw JException("JEventFolder: Attempting to initialize twice or from an invalid state");
+        }
+        // TODO: Obtain overrides of collection names from param manager
+        for (auto* parameter : m_parameters) {
+            parameter->Configure(*(m_app->GetJParameterManager()), m_prefix);
+        }
+        for (auto* service : m_services) {
+            service->Fetch(m_app);
+        }
+        CallWithJExceptionWrapper("JEventFolder::Init", [&](){Init();});
+        m_status = Status::Initialized;
+    }
+
+    void DoPreprocess(const JEvent& child) {
+        {
+            std::lock_guard<std::mutex> lock(m_mutex);
+            if (m_status != Status::Initialized) {
+                throw JException("JEventFolder: Component needs to be initialized and not finalized before Fold can be called");
+                // TODO: Consider calling Initialize(with_lock=false) like we do elsewhere
+            }
+        }
+        for (auto* input : m_inputs) {
+            input->PrefetchCollection(child);
+        }
+        if (m_callback_style != CallbackStyle::DeclarativeMode) {
+            CallWithJExceptionWrapper("JEventFolder::Preprocess", [&](){
+                Preprocess(child);
+            });
+        }
+    }
+
+    void DoFold(const JEvent& child, JEvent& parent) {
+        std::lock_guard<std::mutex> lock(m_mutex);
+        if (m_status != Status::Initialized) {
+            throw JException("Component needs to be initialized and not finalized before Fold() can be called");
+        }
+        if (!m_call_preprocess_upstream) {
+            CallWithJExceptionWrapper("JEventFolder::Preprocess", [&](){
+                Preprocess(parent);
+            });
+        }
+        if (m_last_run_number != parent.GetRunNumber()) {
+            for (auto* resource : m_resources) {
+                resource->ChangeRun(parent.GetRunNumber(), m_app);
+            }
+            if (m_callback_style == CallbackStyle::DeclarativeMode) {
+                CallWithJExceptionWrapper("JEventFolder::ChangeRun", [&](){
+                    ChangeRun(parent.GetRunNumber());
+                });
+            }
+            else {
+                CallWithJExceptionWrapper("JEventFolder::ChangeRun", [&](){
+                    ChangeRun(parent);
+                });
+            }
+            m_last_run_number = parent.GetRunNumber();
+        }
+        for (auto* input : m_inputs) {
+            input->GetCollection(parent);
+            // TODO: This requires that all inputs come from the parent.
+            //       However, eventually we will want to support inputs
+            //       that come from the child.
+        }
+        for (auto* output : m_outputs) {
+            output->Reset();
+        }
+        auto child_number = child.GetEventIndex();
+        CallWithJExceptionWrapper("JEventFolder::Fold", [&](){
+            Fold(child, parent, child_number);
+        });
+
+        for (auto* output : m_outputs) {
+            output->InsertCollection(parent);
+        }
+    }
+
+    void DoFinish() {
+        std::lock_guard<std::mutex> lock(m_mutex);
+        if (m_status != Status::Finalized) {
+            CallWithJExceptionWrapper("JEventFolder::Finish", [&](){
+                Finish();
+            });
+            m_status = Status::Finalized;
+        }
+    }
+
+    void Summarize(JComponentSummary& summary) const override {
+        auto* us = new JComponentSummary::Component(
+            "Folder", GetPrefix(), GetTypeName(), GetLevel(), GetPluginName());
+
+        for (const auto* input : m_inputs) {
+            size_t subinput_count = input->names.size();
+            for (size_t i=0; i<subinput_count; ++i) {
+                us->AddInput(new JComponentSummary::Collection("", input->names[i], input->type_name, input->levels[i]));
+            }
+        }
+        for (const auto* output : m_outputs) {
+            size_t suboutput_count = output->collection_names.size();
+            for (size_t i=0; i<suboutput_count; ++i) {
+                us->AddOutput(new JComponentSummary::Collection("", output->collection_names[i], output->type_name, GetLevel()));
+            }
+        }
+        summary.Add(us);
+    }
+
+};


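`JEventFolder` introduces a `Fold(child, parent, item_nr)` callback, and `DoFold` passes the child's event index as `item_nr`. The sketch below shows the shape of such a callback on toy types so the data flow can be followed without the JANA headers. `ToyChild`, `ToyParent` and `ToyEnergyFolder` are hypothetical, and the accumulation they perform is an invented example, not something this commit implements.

```cpp
#include <cassert>
#include <vector>

struct ToyChild {
    int event_index;   // stands in for JEvent::GetEventIndex()
    double energy;
};

struct ToyParent {
    double total_energy = 0.0;
    std::vector<int> folded_indices;
};

// Hypothetical folder with the same callback shape as JEventFolder::Fold,
// but operating on toy types instead of JEvent.
struct ToyEnergyFolder {
    // Merges one child's contribution into the parent.
    void Fold(const ToyChild& child, ToyParent& parent, int item_nr) {
        parent.total_energy += child.energy;
        parent.folded_indices.push_back(item_nr);
    }

    // Mirrors DoFold's choice of the child's event index as item_nr.
    void DoFold(const ToyChild& child, ToyParent& parent) {
        Fold(child, parent, child.event_index);
    }
};
```

Folding children with indices 0 and 1 and energies 1.5 and 2.0 into one parent leaves the parent with a total of 3.5 and a record of both indices. The parent is the only mutable argument, which matches the `const JEvent& child, JEvent& parent` signature in the header.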