Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Encapsulate multi-process communication #1691

Merged
merged 42 commits into from
Aug 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
ed65c00
helper: add more MPI_Datatype values to mpidummy
bradking Aug 22, 2019
b1e41ae
helper: add MPI_IN_PLACE to mpidummy
bradking Aug 22, 2019
541f380
helper: Take CheckMPIReturn hint string by reference
bradking Jul 23, 2019
78311d8
helper: Add placeholder for Comm encapsulation of multi-process commu…
bradking Jun 5, 2019
33280a2
core: Refactor IO to get communicator from ADIOS factory
bradking Jun 6, 2019
a9a46ce
core: Refactor ADIOS factory to make communicator member private
bradking Jun 5, 2019
f7fec28
core: Refactor ADIOS factory to use Comm encapsulation
bradking Jun 21, 2019
9e7cdf6
core: Expose Comm encapsulation outside ADIOS factory
bradking Jun 18, 2019
64f04be
helper: Encapsulate MPI_Comm_free as Comm::Free
bradking Jun 18, 2019
8b8b797
engine: Rename m_MPIComm to m_Comm
bradking Jun 18, 2019
29177b0
engine: Construct with Comm encapsulation
bradking Jun 18, 2019
7ea742d
bp4: Rename m_MPIComm to m_Comm
bradking Jun 18, 2019
ff37bac
bp4: Use Comm encapsulation
bradking Jun 18, 2019
a491675
bp3: Rename m_MPIComm to m_Comm
bradking Jun 18, 2019
be80eb7
bp3: Use Comm encapsulation
bradking Jun 18, 2019
19b0910
helper: Port network functions to Comm encapsulation
bradking Jun 18, 2019
6bd190d
helper: Encapsulate MPI_Comm_split as Comm::Split
bradking Jun 21, 2019
3d0481e
aggregator: Simplify substream assignment logic
bradking Jun 19, 2019
d121f2b
aggregator: Refactor to use Comm encapsulation
bradking Jun 18, 2019
22a4487
bp3: Use Comm encapsulation in Aggregate methods
bradking Jul 16, 2019
420c061
bp4: Use Comm encapsulation in Aggregate methods
bradking Jul 16, 2019
abd26d8
helper: Move Gather functions to Comm encapsulation
bradking Jun 18, 2019
71b826c
helper: Move Allgather functions to Comm encapsulation
bradking Jul 2, 2019
9360cd6
helper: Move ReduceValues function to Comm encapsulation
bradking Jul 2, 2019
08e03c7
helper: Move Broadcast functions to Comm encapsulation
bradking Jul 2, 2019
323125b
aggregator: Name async request types
bradking Jul 22, 2019
5c24ca2
helper: Encapsulate MPI_{Request,Status} as Comm::{Req,Status}
bradking Jul 23, 2019
b1da13e
helper: Add internal Comm method to look up MPI data type
bradking Jul 23, 2019
5c1b6a1
helper: Move async send/recv functions to Comm encapsulation
bradking Jul 23, 2019
b3876c6
helper: Encapsulate MPI_Bcast as Comm::Bcast
bradking Jul 23, 2019
d0a2fb6
toolkit: Port transport and transportman to Comm encapsulation
bradking Jul 23, 2019
c6e6758
helper: Encapsulate MPI_Barrier as Comm::Barrier
bradking Jul 23, 2019
ba93031
helper: Move CheckMPIReturn helper into Comm encapsulation
bradking Jul 23, 2019
af7198e
helper: Drop empty adiosMPIFunctions.h
bradking Jul 23, 2019
b6a34da
helper: Encapsulate MPI_Comm_{rank,size} as Comm::{Rank,Size}
bradking Jul 31, 2019
39696b4
helper: Encapsulate MPI_Reduce as Comm::Reduce
bradking Aug 1, 2019
4b07c8f
helper: Encapsulate MPI_{Send,Recv} as Comm::{Send,Recv}
bradking Aug 5, 2019
b282a11
helper: Encapsulate MPI_Allgather as Comm::Allgather
bradking Aug 5, 2019
d0f520b
helper: Encapsulate MPI_Allreduce as Comm::Allreduce
bradking Aug 5, 2019
3808644
helper: Encapsulate MPI_Scatter as Comm::Scatter
bradking Aug 5, 2019
f5357b4
helper: Encapsulate MPI_Gather as Comm::Gather
bradking Aug 5, 2019
3f711df
helper: Replace implicit Comm conversion to MPI with explicit Comm::A…
bradking Aug 5, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion source/adios2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ add_library(adios2
operator/callback/Signature2.cpp

#helper
helper/adiosComm.h helper/adiosComm.cpp
helper/adiosDynamicBinder.h helper/adiosDynamicBinder.cpp
helper/adiosMath.cpp
helper/adiosMemory.cpp
helper/adiosMPIFunctions.cpp
helper/adiosNetwork.cpp
helper/adiosString.cpp helper/adiosString.tcc
helper/adiosSystem.cpp
Expand Down
19 changes: 4 additions & 15 deletions source/adios2/core/ADIOS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,9 @@ namespace core

ADIOS::ADIOS(const std::string configFile, MPI_Comm mpiComm,
const bool debugMode, const std::string hostLanguage)
: m_ConfigFile(configFile), m_DebugMode(debugMode), m_HostLanguage(hostLanguage)
: m_ConfigFile(configFile), m_DebugMode(debugMode),
m_HostLanguage(hostLanguage), m_Comm(helper::Comm::Duplicate(mpiComm))
{
SMPI_Comm_dup(mpiComm, &m_MPIComm);

if (!configFile.empty())
{
if (configFile.substr(configFile.size() - 3) == "xml")
Expand Down Expand Up @@ -90,17 +89,7 @@ ADIOS::ADIOS(const bool debugMode, const std::string hostLanguage)
{
}

ADIOS::~ADIOS()
{
// Handle the case where MPI is finalized before the ADIOS destructor is
// called, which happens, e.g., with global / static ADIOS objects
int flag;
MPI_Finalized(&flag);
if (!flag)
{
SMPI_Comm_free(&m_MPIComm);
}
}
ADIOS::~ADIOS() = default;

IO &ADIOS::DeclareIO(const std::string name)
{
Expand Down Expand Up @@ -128,7 +117,7 @@ IO &ADIOS::DeclareIO(const std::string name)
}

auto ioPair = m_IOs.emplace(
name, IO(*this, name, m_MPIComm, false, m_HostLanguage, m_DebugMode));
name, IO(*this, name, false, m_HostLanguage, m_DebugMode));
IO &io = ioPair.first->second;
io.SetDeclared();
return io;
Expand Down
8 changes: 6 additions & 2 deletions source/adios2/core/ADIOS.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "adios2/common/ADIOSMPI.h"
#include "adios2/common/ADIOSTypes.h"
#include "adios2/core/Operator.h"
#include "adios2/helper/adiosComm.h"

namespace adios2
{
Expand All @@ -38,8 +39,8 @@ class ADIOS
/** if true will do more checks, exceptions, warnings, expect slower code */
const bool m_DebugMode = true;

/** Passed from parallel constructor, MPI_Comm is a pointer itself. */
MPI_Comm m_MPIComm;
/** Get the communicator passed to constructor for parallel case. */
helper::Comm const &GetComm() const { return m_Comm; }

/** Changed by language bindings in constructor */
const std::string m_HostLanguage = "C++";
Expand Down Expand Up @@ -184,6 +185,9 @@ class ADIOS
void RemoveAllIOs() noexcept;

private:
/** Communicator given to parallel constructor. */
helper::Comm m_Comm;

/** XML File to be read containing configuration information */
const std::string m_ConfigFile;

Expand Down
9 changes: 4 additions & 5 deletions source/adios2/core/Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "Engine.tcc"

#include <stdexcept>
#include <utility>

#include "adios2/core/IO.h"

Expand All @@ -21,9 +22,9 @@ namespace core
{

Engine::Engine(const std::string engineType, IO &io, const std::string &name,
const Mode openMode, MPI_Comm mpiComm)
const Mode openMode, helper::Comm comm)
: m_EngineType(engineType), m_IO(io), m_Name(name), m_OpenMode(openMode),
m_MPIComm(mpiComm), m_DebugMode(io.m_DebugMode)
m_Comm(std::move(comm)), m_DebugMode(io.m_DebugMode)
{
}

Expand Down Expand Up @@ -69,9 +70,7 @@ void Engine::Close(const int transportIndex)

if (transportIndex == -1)
{
helper::CheckMPIReturn(SMPI_Comm_free(&m_MPIComm),
"freeing comm in Engine " + m_Name +
", in call to Close");
m_Comm.Free("freeing comm in Engine " + m_Name + ", in call to Close");
m_IsClosed = true;
}
}
Expand Down
9 changes: 5 additions & 4 deletions source/adios2/core/Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "adios2/core/IO.h"
#include "adios2/core/Variable.h"
#include "adios2/core/VariableCompound.h"
#include "adios2/helper/adiosComm.h"

namespace adios2
{
Expand Down Expand Up @@ -62,10 +63,10 @@ class Engine
* @param io object that generates this Engine
* @param name unique engine name within IO class object
* @param mode open mode from ADIOSTypes.h Mode
* @param mpiComm new communicator passed at Open or from ADIOS class
* @param comm communicator passed at Open or from ADIOS class
*/
Engine(const std::string engineType, IO &io, const std::string &name,
const Mode mode, MPI_Comm mpiComm);
const Mode mode, helper::Comm comm);

virtual ~Engine();

Expand Down Expand Up @@ -446,8 +447,8 @@ class Engine

protected:
/** from ADIOS class passed to Engine created with Open
* if no new communicator is passed */
MPI_Comm m_MPIComm;
* if no communicator is passed */
helper::Comm m_Comm;

/** true: additional exceptions */
const bool m_DebugMode = false;
Expand Down
97 changes: 47 additions & 50 deletions source/adios2/core/IO.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "adios2/engine/skeleton/SkeletonReader.h"
#include "adios2/engine/skeleton/SkeletonWriter.h"

#include "adios2/helper/adiosComm.h"
#include "adios2/helper/adiosFunctions.h" //BuildParametersMap
#include "adios2/toolkit/profiling/taustubs/tautimer.hpp"
#include <adios2sys/SystemTools.hxx> // FileIsDirectory()
Expand Down Expand Up @@ -73,12 +74,10 @@ namespace adios2
namespace core
{

IO::IO(ADIOS &adios, const std::string name, MPI_Comm mpiComm,
const bool inConfigFile, const std::string hostLanguage,
const bool debugMode)
: m_ADIOS(adios), m_Name(name), m_MPIComm(mpiComm),
m_InConfigFile(inConfigFile), m_HostLanguage(hostLanguage),
m_DebugMode(debugMode)
IO::IO(ADIOS &adios, const std::string name, const bool inConfigFile,
const std::string hostLanguage, const bool debugMode)
: m_ADIOS(adios), m_Name(name), m_InConfigFile(inConfigFile),
m_HostLanguage(hostLanguage), m_DebugMode(debugMode)
{
}

Expand Down Expand Up @@ -426,8 +425,7 @@ size_t IO::AddOperation(Operator &op, const Params &parameters) noexcept
return m_Operations.size() - 1;
}

Engine &IO::Open(const std::string &name, const Mode mode,
MPI_Comm mpiComm_orig)
Engine &IO::Open(const std::string &name, const Mode mode, MPI_Comm mpiComm)
{
TAU_SCOPED_TIMER("IO::Open");
auto itEngineFound = m_Engines.find(name);
Expand Down Expand Up @@ -459,8 +457,7 @@ Engine &IO::Open(const std::string &name, const Mode mode,
}
}

MPI_Comm mpiComm;
SMPI_Comm_dup(mpiComm_orig, &mpiComm);
auto comm = helper::Comm::Duplicate(mpiComm);
std::shared_ptr<Engine> engine;
const bool isDefaultEngine = m_EngineType.empty() ? true : false;
std::string engineTypeLC = m_EngineType;
Expand Down Expand Up @@ -495,26 +492,26 @@ Engine &IO::Open(const std::string &name, const Mode mode,
{
if (mode == Mode::Read)
{
engine =
std::make_shared<engine::BP3Reader>(*this, name, mode, mpiComm);
engine = std::make_shared<engine::BP3Reader>(*this, name, mode,
std::move(comm));
}
else
{
engine =
std::make_shared<engine::BP3Writer>(*this, name, mode, mpiComm);
engine = std::make_shared<engine::BP3Writer>(*this, name, mode,
std::move(comm));
}
}
else if (engineTypeLC == "bp4")
{
if (mode == Mode::Read)
{
engine =
std::make_shared<engine::BP4Reader>(*this, name, mode, mpiComm);
engine = std::make_shared<engine::BP4Reader>(*this, name, mode,
std::move(comm));
}
else
{
engine =
std::make_shared<engine::BP4Writer>(*this, name, mode, mpiComm);
engine = std::make_shared<engine::BP4Writer>(*this, name, mode,
std::move(comm));
}
}
else if (engineTypeLC == "hdfmixer")
Expand All @@ -523,10 +520,10 @@ Engine &IO::Open(const std::string &name, const Mode mode,
#if H5_VERSION_GE(1, 11, 0)
if (mode == Mode::Read)
engine = std::make_shared<engine::HDF5ReaderP>(*this, name, mode,
mpiComm);
std::move(comm));
else
engine =
std::make_shared<engine::HDFMixer>(*this, name, mode, mpiComm);
engine = std::make_shared<engine::HDFMixer>(*this, name, mode,
std::move(comm));
#else
throw std::invalid_argument(
"ERROR: update HDF5 >= 1.11 to support VDS.");
Expand All @@ -541,10 +538,10 @@ Engine &IO::Open(const std::string &name, const Mode mode,
#ifdef ADIOS2_HAVE_DATAMAN
if (mode == Mode::Read)
engine = std::make_shared<engine::DataManReader>(*this, name, mode,
mpiComm);
std::move(comm));
else
engine = std::make_shared<engine::DataManWriter>(*this, name, mode,
mpiComm);
std::move(comm));
#else
throw std::invalid_argument(
"ERROR: this version didn't compile with "
Expand All @@ -555,11 +552,11 @@ Engine &IO::Open(const std::string &name, const Mode mode,
{
#ifdef ADIOS2_HAVE_SSC
if (mode == Mode::Read)
engine =
std::make_shared<engine::SscReader>(*this, name, mode, mpiComm);
engine = std::make_shared<engine::SscReader>(*this, name, mode,
std::move(comm));
else
engine =
std::make_shared<engine::SscWriter>(*this, name, mode, mpiComm);
engine = std::make_shared<engine::SscWriter>(*this, name, mode,
std::move(comm));
#else
throw std::invalid_argument("ERROR: this version didn't compile with "
"SSC library, can't use SSC engine\n");
Expand All @@ -570,7 +567,7 @@ Engine &IO::Open(const std::string &name, const Mode mode,
#ifdef ADIOS2_HAVE_TABLE
if (mode == Mode::Write)
engine = std::make_shared<engine::TableWriter>(*this, name, mode,
mpiComm);
std::move(comm));
else
throw std::invalid_argument(
"ERROR: Table engine only supports Write. It uses other "
Expand All @@ -585,11 +582,11 @@ Engine &IO::Open(const std::string &name, const Mode mode,
{
#ifdef ADIOS2_HAVE_SST
if (mode == Mode::Read)
engine =
std::make_shared<engine::SstReader>(*this, name, mode, mpiComm);
engine = std::make_shared<engine::SstReader>(*this, name, mode,
std::move(comm));
else
engine =
std::make_shared<engine::SstWriter>(*this, name, mode, mpiComm);
engine = std::make_shared<engine::SstWriter>(*this, name, mode,
std::move(comm));
#else
throw std::invalid_argument("ERROR: this version didn't compile with "
"Sst library, can't use Sst engine\n");
Expand All @@ -599,11 +596,11 @@ Engine &IO::Open(const std::string &name, const Mode mode,
{
#ifdef ADIOS2_HAVE_DATASPACES
if (mode == Mode::Read)
engine = std::make_shared<engine::DataSpacesReader>(*this, name,
mode, mpiComm);
engine = std::make_shared<engine::DataSpacesReader>(
*this, name, mode, std::move(comm));
else
engine = std::make_shared<engine::DataSpacesWriter>(*this, name,
mode, mpiComm);
engine = std::make_shared<engine::DataSpacesWriter>(
*this, name, mode, std::move(comm));
#else
throw std::invalid_argument(
"ERROR: this version didn't compile with "
Expand All @@ -615,10 +612,10 @@ Engine &IO::Open(const std::string &name, const Mode mode,
#ifdef ADIOS2_HAVE_HDF5
if (mode == Mode::Read)
engine = std::make_shared<engine::HDF5ReaderP>(*this, name, mode,
mpiComm);
std::move(comm));
else
engine = std::make_shared<engine::HDF5WriterP>(*this, name, mode,
mpiComm);
std::move(comm));
#else
throw std::invalid_argument("ERROR: this version didn't compile with "
"HDF5 library, can't use HDF5 engine\n");
Expand All @@ -628,11 +625,11 @@ Engine &IO::Open(const std::string &name, const Mode mode,
{
#ifdef ADIOS2_HAVE_MPI
if (mode == Mode::Read)
engine = std::make_shared<engine::InSituMPIReader>(*this, name,
mode, mpiComm);
engine = std::make_shared<engine::InSituMPIReader>(
*this, name, mode, std::move(comm));
else
engine = std::make_shared<engine::InSituMPIWriter>(*this, name,
mode, mpiComm);
engine = std::make_shared<engine::InSituMPIWriter>(
*this, name, mode, std::move(comm));
#else
throw std::invalid_argument("ERROR: this version didn't compile with "
"MPI, can't use InSituMPI engine\n");
Expand All @@ -642,24 +639,24 @@ Engine &IO::Open(const std::string &name, const Mode mode,
{
if (mode == Mode::Read)
engine = std::make_shared<engine::SkeletonReader>(*this, name, mode,
mpiComm);
std::move(comm));
else
engine = std::make_shared<engine::SkeletonWriter>(*this, name, mode,
mpiComm);
std::move(comm));
}
else if (engineTypeLC == "inline")
{
if (mode == Mode::Read)
engine = std::make_shared<engine::InlineReader>(*this, name, mode,
mpiComm);
std::move(comm));
else
engine = std::make_shared<engine::InlineWriter>(*this, name, mode,
mpiComm);
std::move(comm));
}
else if (engineTypeLC == "null")
{
engine =
std::make_shared<engine::NullEngine>(*this, name, mode, mpiComm);
engine = std::make_shared<engine::NullEngine>(*this, name, mode,
std::move(comm));
}
else if (engineTypeLC == "nullcore")
{
Expand All @@ -668,7 +665,7 @@ Engine &IO::Open(const std::string &name, const Mode mode,
"ERROR: nullcore engine does not support read mode");
else
engine = std::make_shared<engine::NullCoreWriter>(*this, name, mode,
mpiComm);
std::move(comm));
}
else
{
Expand Down Expand Up @@ -698,7 +695,7 @@ Engine &IO::Open(const std::string &name, const Mode mode,

Engine &IO::Open(const std::string &name, const Mode mode)
{
return Open(name, mode, m_MPIComm);
return Open(name, mode, m_ADIOS.GetComm().AsMPI());
}

Engine &IO::GetEngine(const std::string &name)
Expand Down
Loading