Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automatize the reuse of the CUDA stream of an input product #305

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion CUDADataFormats/Common/interface/CUDAProductBase.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef CUDADataFormats_Common_CUDAProductBase_h
#define CUDADataFormats_Common_CUDAProductBase_h

#include <atomic>
#include <memory>

#include <cuda/api_wrappers.h>
Expand All @@ -13,14 +14,27 @@ class CUDAProductBase {
public:
CUDAProductBase() = default; // Needed only for ROOT dictionary generation

// Move constructor. Written out explicitly because std::atomic<bool>
// is neither copyable nor movable; the reuse flag's value is
// transferred with a plain load() instead.
CUDAProductBase(CUDAProductBase&& other):
stream_{std::move(other.stream_)},
event_{std::move(other.event_)},
mayReuseStream_{other.mayReuseStream_.load()},
device_{other.device_}
{}
// Move assignment; same reasoning as the move constructor.
// NOTE(review): no self-move guard — a self-move would leave the
// shared_ptr members in an unspecified state; confirm callers never
// self-move.
CUDAProductBase& operator=(CUDAProductBase&& other) {
stream_ = std::move(other.stream_);
event_ = std::move(other.event_);
mayReuseStream_ = other.mayReuseStream_.load();
device_ = other.device_;
return *this;
}

// A product is valid once it carries a CUDA stream; default-constructed
// (ROOT-dictionary) instances are invalid.
bool isValid() const { return stream_.get() != nullptr; }
// Defined out of line. NOTE(review): implementation not visible in
// this chunk — presumably checks the recorded CUDA event; confirm in
// the .cc file.
bool isAvailable() const;

// CUDA device the data was produced on (-1 when default-constructed).
int device() const { return device_; }

// The CUDA stream this product's work was queued to; shared among
// edm::Event products.
const cuda::stream_t<>& stream() const { return *stream_; }
cuda::stream_t<>& stream() { return *stream_; }
// NOTE(review): streamPtr() is declared again in the private section
// below — in the underlying diff one of the two is likely removed;
// verify which declaration survives.
const std::shared_ptr<cuda::stream_t<>>& streamPtr() const { return stream_; }

// CUDA event used for synchronization between streams.
const cuda::event_t *event() const { return event_.get(); }
cuda::event_t *event() { return event_.get(); }
Expand All @@ -29,12 +43,31 @@ class CUDAProductBase {
explicit CUDAProductBase(int device, std::shared_ptr<cuda::stream_t<>> stream, std::shared_ptr<cuda::event_t> event);

private:
friend class CUDAScopedContext;

// Intended to be used only from CUDAScopedContext
const std::shared_ptr<cuda::stream_t<>>& streamPtr() const { return stream_; }

// Atomically consume the permission to reuse the CUDA stream.
// Exactly one caller — the first — observes true; afterwards the flag
// stays false, so later consumers are forced onto fresh streams.
bool mayReuseStream() const {
  // exchange(false) returns the prior value, so only the thread that
  // saw the flag as true is granted reuse of the stream.
  return mayReuseStream_.exchange(false);
}

// The cuda::stream_t is really shared among edm::Event products, so
// using shared_ptr also here
std::shared_ptr<cuda::stream_t<>> stream_; //!
// shared_ptr because of caching in CUDAService
std::shared_ptr<cuda::event_t> event_; //!

// This flag tells whether the CUDA stream may be reused by a
// consumer or not. The goal is to have a "chain" of modules to
// queue their work to the same stream.
mutable std::atomic<bool> mayReuseStream_ = true; //!

// The CUDA device associated with this product
int device_ = -1; //!
};

Expand Down
28 changes: 17 additions & 11 deletions HeterogeneousCore/CUDACore/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,15 +202,9 @@ void ProducerInputCUDA::acquire(edm::Event const& iEvent, edm::EventSetup& iSetu
CUDAProduct<InputData> const& inputDataWrapped = iEvent.get(inputToken_);

// Set the current device to the same that was used to produce
// InputData, and also use the same CUDA stream
// InputData, and possibly use the same CUDA stream
CUDAScopedContext ctx{inputDataWrapped, std::move(waitingTaskHolder)};

// Alternatively a new CUDA stream can be created here. This is for
// a case where there are two (or more) consumers of
// CUDAProduct<InputData> whose work is independent and thus can be run
// in parallel.
CUDAScopedContext ctx{iEvent.streamID(), std::move(waitingTaskHolder);

// Grab the real input data. Checks that the input data is on the
// current device. If the input data was produced in a different CUDA
// stream than the CUDAScopedContext holds, create an inter-stream
Expand Down Expand Up @@ -240,6 +234,12 @@ void ProducerInputCUDA::produce(edm::Event& iEvent, edm::EventSetup& iSetup) {
}
```

See [further below](#setting-the-current-device) for the conditions
under which the `CUDAScopedContext` constructor reuses the CUDA
stream. Note that the `CUDAScopedContext` constructor taking
`edm::StreamID` is still allowed; it will just always create a new
CUDA stream.


### Producer with CUDA input and output (with ExternalWork)

```cpp
Expand Down Expand Up @@ -319,8 +319,8 @@ void ProducerInputOutputCUDA::produce(edm::StreamID streamID, edm::Event& iEvent
CUDAProduct<InputData> const& inputDataWrapped = iEvent.get(inputToken_);

// Set the current device to the same that was used to produce
// InputData, and also use the same CUDA stream
CUDAScopedContext ctx{streamID};
// InputData, and possibly use the same CUDA stream
CUDAScopedContext ctx{inputDataWrapped};

// Grab the real input data. Checks that the input data is on the
// current device. If the input data was produced in a different CUDA
Expand Down Expand Up @@ -368,7 +368,7 @@ void AnalyzerInputCUDA::analyze(edm::Event const& iEvent, edm::EventSetup& iSetu
CUDAProduct<InputData> const& inputDataWrapped = iEvent.get(inputToken_);

// Set the current device to the same that was used to produce
// InputData, and also use the same CUDA stream
// InputData, and possibly use the same CUDA stream
CUDAScopedContext ctx{inputDataWrapped};

// Alternatively a new CUDA stream can be created here. This is for
Expand Down Expand Up @@ -540,7 +540,13 @@ CUDAScopedContext ctx{cclus};
`CUDAScopedContext` works in the RAII way and does the following
* Sets the current device for the current scope
- If constructed from the `edm::StreamID`, chooses the device and creates a new CUDA stream
- If constructed from the `CUDAProduct<T>`, uses the same device and CUDA stream as was used to produce the `CUDAProduct<T>`
- If constructed from the `CUDAProduct<T>`, uses the same device and possibly the same CUDA stream as was used to produce the `CUDAProduct<T>`
* The CUDA stream is reused if this producer is the first consumer
of the `CUDAProduct<T>`, otherwise a new CUDA stream is created.
This approach is a simple compromise that automatically expresses
the work of parallel producers in different CUDA streams, while
still allowing a chain of dependent producers to queue their work
to the same CUDA stream.
* Gives access to the CUDA stream the algorithm should use to queue asynchronous work
* Calls `edm::WaitingTaskWithArenaHolder::doneWaiting()` when necessary
* [Synchronizes between CUDA streams if necessary](#synchronizing-between-cuda-streams)
Expand Down
10 changes: 2 additions & 8 deletions HeterogeneousCore/CUDACore/interface/CUDAScopedContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,15 @@ class CUDAScopedContext {
stream_(std::move(token.streamPtr()))
{}

template<typename T>
explicit CUDAScopedContext(const CUDAProduct<T>& data):
currentDevice_(data.device()),
setDeviceForThisScope_(currentDevice_),
stream_(data.streamPtr())
{}
explicit CUDAScopedContext(const CUDAProductBase& data);

// Like CUDAScopedContext(streamID), but additionally stores the
// waiting-task holder whose doneWaiting() is signalled when the
// asynchronous work completes (ExternalWork modules).
explicit CUDAScopedContext(edm::StreamID streamID, edm::WaitingTaskWithArenaHolder waitingTaskHolder):
CUDAScopedContext(streamID)
{
waitingTaskHolder_ = std::move(waitingTaskHolder);
}

template <typename T>
explicit CUDAScopedContext(const CUDAProduct<T>& data, edm::WaitingTaskWithArenaHolder waitingTaskHolder):
explicit CUDAScopedContext(const CUDAProductBase& data, edm::WaitingTaskWithArenaHolder waitingTaskHolder):
CUDAScopedContext(data)
{
waitingTaskHolder_ = std::move(waitingTaskHolder);
Expand Down
14 changes: 14 additions & 0 deletions HeterogeneousCore/CUDACore/src/CUDAScopedContext.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,20 @@ CUDAScopedContext::CUDAScopedContext(edm::StreamID streamID):
stream_ = cs->getCUDAStream();
}

// Construct from an input CUDA product: set the current device for
// this scope to the one the product was produced on, and choose the
// CUDA stream to queue further work to.
CUDAScopedContext::CUDAScopedContext(const CUDAProductBase& data):
currentDevice_(data.device()),
setDeviceForThisScope_(currentDevice_)
{
// Reuse the producer's stream only for the first consumer of the
// product (mayReuseStream() atomically flips the flag); any later
// consumer gets a fresh stream from CUDAService so that independent
// consumers can run in parallel.
if(data.mayReuseStream()) {
stream_ = data.streamPtr();
}
else {
edm::Service<CUDAService> cs;
stream_ = cs->getCUDAStream();
}
}


CUDAScopedContext::CUDAScopedContext(int device, std::unique_ptr<cuda::stream_t<>> stream, std::unique_ptr<cuda::event_t> event):
currentDevice_(device),
setDeviceForThisScope_(device),
Expand Down
5 changes: 5 additions & 0 deletions HeterogeneousCore/CUDACore/test/BuildFile.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
<bin file="test_*.cc test_*.cu" name="testHeterogeneousCoreCUDACore">
<use name="CUDADataFormats/Common"/>
<use name="FWCore/TestProcessor"/>
<use name="FWCore/ParameterSet"/>
<use name="FWCore/PluginManager"/>
<use name="FWCore/ServiceRegistry"/>
<use name="HeterogeneousCore/CUDACore"/>
<use name="HeterogeneousCore/CUDAServices"/>
<use name="catch2"/>
<use name="cuda"/>
</bin>
Expand Down
11 changes: 8 additions & 3 deletions HeterogeneousCore/CUDACore/test/test_CUDAScopedContext.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#include "catch.hpp"

#include "CUDADataFormats/Common/interface/CUDAProduct.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "HeterogeneousCore/CUDACore/interface/CUDAScopedContext.h"
#include "HeterogeneousCore/CUDAUtilities/interface/cudaCheck.h"
#include "HeterogeneousCore/CUDAUtilities/interface/exitSansCUDADevices.h"
Expand Down Expand Up @@ -59,6 +61,11 @@ TEST_CASE("Use of CUDAScopedContext", "[CUDACore]") {
CUDAScopedContext ctx2{data};
REQUIRE(cuda::device::current::get().id() == data.device());
REQUIRE(ctx2.stream().id() == data.stream().id());

// Second use of a product should lead to new stream
CUDAScopedContext ctx3{data};
REQUIRE(cuda::device::current::get().id() == data.device());
REQUIRE(ctx3.stream().id() != data.stream().id());
}

SECTION("Storing state as CUDAContextToken") {
Expand Down Expand Up @@ -118,9 +125,7 @@ TEST_CASE("Use of CUDAScopedContext", "[CUDACore]") {
}
}

// Destroy and clean up all resources so that the next test can
// assume to start from a clean state.
cudaCheck(cudaSetDevice(defaultDevice));
cudaCheck(cudaDeviceSynchronize());
cudaDeviceReset();
// Note: CUDA resources are cleaned up by CUDAService destructor
}
27 changes: 27 additions & 0 deletions HeterogeneousCore/CUDACore/test/test_main.cc
Original file line number Diff line number Diff line change
@@ -1,2 +1,29 @@
#define CATCH_CONFIG_MAIN
#include "catch.hpp"

#include <memory>
#include <string>

#include "FWCore/PluginManager/interface/PluginManager.h"
#include "FWCore/PluginManager/interface/standard.h"
#include "FWCore/ServiceRegistry/interface/ServiceRegistry.h"

// Catch2 test-run listener that bootstraps the edm plugin manager and
// a minimal service registry containing CUDAService before any test
// executes, so tests may rely on edm::Service<CUDAService>.
class ServiceRegistryListener: public Catch::TestEventListenerBase {
public:
  using Catch::TestEventListenerBase::TestEventListenerBase; // inherit constructor

  void testRunStarting(Catch::TestRunInfo const& testRunInfo) override {
    edmplugin::PluginManager::configure(edmplugin::standard::config());

    const std::string config{
R"_(import FWCore.ParameterSet.Config as cms
process = cms.Process('Test')
process.CUDAService = cms.Service('CUDAService')
)_"
    };

    edm::ServiceToken tempToken = edm::ServiceRegistry::createServicesFromConfig(config);
    // Keep the services alive for the whole test run; the Operate
    // token is released when the listener is destroyed after all
    // tests have finished. make_unique instead of reset(new ...) per
    // the no-raw-new guideline.
    operate_ = std::make_unique<edm::ServiceRegistry::Operate>(tempToken);
  }

private:
  std::unique_ptr<edm::ServiceRegistry::Operate> operate_;
};
CATCH_REGISTER_LISTENER(ServiceRegistryListener);