Skip to content

Commit

Permalink
Gqw/s2 pccl compile (#942)
Browse files Browse the repository at this point in the history
* add MACRO DIPU_PCCL_IMPL

* pass fake pcclcomm_t

* fix warning return value

* add pcclcomm check

* change name to dipu and delete unused pcclfunc

* change CMakeLists and fix comm type error

* change workspace name and add ERROR MACRO

* add file comment & add tang_shared_so load & clang format

* clang format

* delete tang_shared.so dependency

* change var name & add TORCH_CHECK

* move if into TORCHCHECK && change error to warning

* fix no pccllib.so error

---------

Co-authored-by: qq <qq>
  • Loading branch information
Gong-air authored Oct 10, 2024
1 parent 9ee5b5b commit aec0e71
Show file tree
Hide file tree
Showing 6 changed files with 570 additions and 173 deletions.
22 changes: 0 additions & 22 deletions dipu/torch_dipu/csrc_dipu/vendor/droplet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,6 @@ set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} ${CMAKE_CURRENT_SOURCE_DIR}/cmake)
# Root of the Droplet (tangrt) runtime installation. Exposed as a cache entry so
# a non-default location can be selected with -DDROPLET_INSTALL=<dir>; the
# default value is unchanged.
set(DROPLET_INSTALL "/usr/local/tangrt" CACHE PATH
    "Root directory of the Droplet (tangrt) runtime installation")
# Load the FindDropletCompiler module from this directory's cmake/ folder.
include(cmake/FindDropletCompiler.cmake)

# Tri-state switch controlling PCCL support (see the cache help string).
set(USE_PCCL "AUTO" CACHE STRING "Whether to compile with or without PCCL. AUTO(default): auto-detecting; 1|ON|YES|TRUE|Y: force compiling with PCCL; 0|OFF|NO|FALSE|N: force compiling without PCCL")

# Detection always runs, regardless of the value of USE_PCCL.
find_package(PCCL)

if(NOT USE_PCCL)
  # Explicitly disabled by the user.
  message(STATUS "Force compiling without PCCL, i.e. not supporting distributed on droplet")
elseif(PCCL_FOUND)
  # AUTO or forced on, and the library is available.
  message(STATUS "Use PCCL to implement communicator")
  message(STATUS "PCCL_LIBRARY: " ${PCCL_LIBRARY})
  message(STATUS "PCCL_INCLUDE_DIR: " ${PCCL_INCLUDE_DIR})
  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DUSE_PCCL" PARENT_SCOPE)
  set(DIPU_VENDOR_LIB ${DIPU_VENDOR_LIB} ${PCCL_LIBRARY})
  set(VENDOR_INCLUDE_DIRS ${VENDOR_INCLUDE_DIRS} ${PCCL_INCLUDE_DIR})
elseif("${USE_PCCL}")
  # The quoted form is true only for real true-constants (ON/1/YES/TRUE/Y), so
  # "AUTO" does not reach this branch: only a forced build fails hard.
  message(FATAL_ERROR "Set to force compiling with PCCL, but PCCL is not found.")
else()
  # AUTO mode without PCCL: continue without distributed support.
  message(STATUS "PCCL not found, i.e. not supporting distributed on droplet")
endif()


# Publish the Droplet runtime's include directory, library directory and
# runtime library (tangrt_shared) to the parent directory scope. Each call
# appends to the value currently visible here; PARENT_SCOPE writes the result
# into the parent's variable and leaves the local one untouched.
set(VENDOR_INCLUDE_DIRS ${VENDOR_INCLUDE_DIRS} ${DROPLET_INSTALL}/include PARENT_SCOPE)
set(VENDOR_LIB_DIRS ${VENDOR_LIB_DIRS} ${DROPLET_INSTALL}/lib/linux-x86_64 PARENT_SCOPE)
set(DIPU_VENDOR_LIB ${DIPU_VENDOR_LIB} tangrt_shared PARENT_SCOPE)
Expand Down
147 changes: 5 additions & 142 deletions dipu/torch_dipu/csrc_dipu/vendor/droplet/communicatorimpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,22 @@
#include <string>
#include <type_traits>

#include "pccl.h"

#include <c10/core/ScalarType.h>
#include <torch/csrc/distributed/c10d/Types.hpp>

#include "csrc_dipu/runtime/device/basedef.h"
#include "csrc_dipu/runtime/devproxy/deviceproxy.h"
#ifdef USE_PCCL
#include <pccl.h>
#endif // USE_PCCL
#include <torch/csrc/distributed/c10d/Types.hpp>

#include <csrc_dipu/common.h>
#include <csrc_dipu/runtime/device/diclapis.h>

#include "vendorapi.h"

namespace dipu {

namespace devapis {

#ifdef USE_PCCL

// Stringification helpers. LINE_HELPER1 turns its argument into a string
// literal; LINE_HELPER2 adds one level of expansion so that a macro argument
// such as __LINE__ is expanded before it is stringified.
#define LINE_HELPER1(x) #x
#define LINE_HELPER2(x) LINE_HELPER1(x)
// "<file> : <line>" of the expansion site, as one concatenated string literal.
#define LOCATION __FILE__ " : " LINE_HELPER2(__LINE__)
Expand Down Expand Up @@ -178,141 +176,6 @@ DIPU_API diclResult_t diclRecv(void* recvbuff, size_t count,
return DICL_SUCCESS;
}

#else // USE_PCCL

namespace {

// In the PCCL-less build a communicator is just a heap-allocated tag value.
// The tag lets the entry points recognise handles produced by createDiclComm.
using diclCommValue_t = std::remove_pointer_t<diclComm_t>;
constexpr diclCommValue_t kMagicComm = 0x5043434C;  // "PCCL"

// Allocates a new communicator handle initialised with kMagicComm.
diclComm_t createDiclComm() {
  auto* handle = new diclCommValue_t(kMagicComm);
  return handle;
}

// Releases a handle previously returned by createDiclComm.
void destroyDiclComm(diclComm_t comm) { delete comm; }

// Throws std::runtime_error unless `comm` is non-null and holds kMagicComm.
void checkCommOrThrow(diclComm_t comm) {
  const bool isValid = (comm != nullptr) && (*comm == kMagicComm);
  if (!isValid) {
    throw std::runtime_error("Invalid comm.");
  }
}

// Unconditionally throws std::runtime_error reporting that PCCL is disabled.
[[noreturn]] void throwNotSupportedError() {
  throw std::runtime_error(
      "PCCL is not enabled. DIPU only allows single GPU communication.");
}

// Accepts only a communicator of exactly one rank; otherwise throws.
void checkNrankOrThrow(int nranks) {
  if (nranks == 1) {
    return;
  }
  throwNotSupportedError();
}

// Accepts only rank 0; otherwise throws.
void checkRankOrThrow(int rank) {
  if (rank == 0) {
    return;
  }
  throwNotSupportedError();
}

// Issues memCopyD2DAsync on `stream`, copying `nbytes` bytes from `src` to
// `dst`, with the current device used as both source and destination device.
void singleDeviceMemcpy(deviceStream_t stream, void* dst, const void* src,
                        size_t nbytes) {
  auto currentDevice = devproxy::current_device();
  devproxy::memCopyD2DAsync(stream, nbytes, currentDevice, dst, currentDevice,
                            src);
}

}  // namespace

// Zero in the PCCL-less build: diclGetUniqueId in this branch writes nothing
// into its argument. NOTE(review): presumably the number of unique-id bytes
// callers exchange between ranks -- confirm against the users of this constant.
const int DICL_UNIQUE_ID_BYTES_SIZE = 0;

// Validates `comm` and returns DICL_SUCCESS; this fallback does not track any
// error state beyond handle validity.
// Throws std::runtime_error (from checkCommOrThrow) if `comm` is null or does
// not hold the magic tag.
DIPU_API diclResult_t diclGetCommAsyncError(diclComm_t comm) {
checkCommOrThrow(comm);
return DICL_SUCCESS;
}

// No-op in the PCCL-less build: `uniqueId` is neither read nor written and
// DICL_SUCCESS is always returned.
DIPU_API diclResult_t diclGetUniqueId(commUniqueId* uniqueId) {
return DICL_SUCCESS;
}

// Creates the single-rank fallback communicator and stores it in `*comm`.
//
// Only nranks == 1 and rank == 0 are accepted; any other value throws
// std::runtime_error via the check helpers. `uniqueId` and `localDeviceId` are
// not used. A warning is logged on every successful call because all
// collectives on the resulting handle are emulated with memcpy.
DIPU_API diclResult_t diclCommInitRank(diclComm_t* comm, int nranks,
                                       commUniqueId uniqueId, int rank,
                                       int localDeviceId) {
  // Reject anything that is not a one-process "group" before allocating.
  checkNrankOrThrow(nranks);
  checkRankOrThrow(rank);

  DIPU_LOGW(
      "PCCL is not enabled. DIPU will simulate single GPU "
      "communication using memcpy.");

  diclComm_t created = createDiclComm();
  *comm = created;
  return DICL_SUCCESS;
}

// Validates `comm` and then frees it through destroyDiclComm.
// Throws std::runtime_error (from checkCommOrThrow) if `comm` is null or does
// not hold the magic tag; nothing is freed in that case.
DIPU_API diclResult_t diclCommDestroy(diclComm_t comm) {
checkCommOrThrow(comm);
destroyDiclComm(comm);
return DICL_SUCCESS;
}

// All-reduce on the single-rank fallback communicator.
//
// No reduction is performed: `count` elements of `datatype` are copied from
// `sendbuff` to `recvbuff` through singleDeviceMemcpy on `stream`. `reduceOp`
// is not consulted.
// Throws std::runtime_error if `comm` fails checkCommOrThrow.
DIPU_API diclResult_t diclAllReduce(const void* sendbuff, void* recvbuff,
                                    size_t count, at::ScalarType datatype,
                                    const ReduceOp& reduceOp, diclComm_t comm,
                                    deviceStream_t stream) {
  checkCommOrThrow(comm);
  const size_t nbytes = count * at::elementSize(datatype);
  singleDeviceMemcpy(stream, recvbuff, sendbuff, nbytes);
  return DICL_SUCCESS;
}

// Broadcast on the single-rank fallback communicator.
//
// `count` elements of `datatype` are copied from `sendbuff` to `recvbuff`
// through singleDeviceMemcpy on `stream`.
// Throws std::runtime_error if `comm` fails checkCommOrThrow, or if `root` is
// not 0 -- the same root validation diclReduce performs, since rank 0 is the
// only rank this communicator can contain.
DIPU_API diclResult_t diclBroadcast(const void* sendbuff, void* recvbuff,
                                    size_t count, at::ScalarType datatype,
                                    int root, diclComm_t comm,
                                    deviceStream_t stream) {
  checkCommOrThrow(comm);
  // A non-zero root names a rank that does not exist in this communicator.
  checkRankOrThrow(root);
  singleDeviceMemcpy(stream, recvbuff, sendbuff,
                     count * at::elementSize(datatype));
  return DICL_SUCCESS;
}

// All-gather on the single-rank fallback communicator.
//
// `count` elements of `datatype` are copied from `sendBuf` to `recvBuf`
// through singleDeviceMemcpy on `stream`.
// Throws std::runtime_error if `comm` fails checkCommOrThrow.
DIPU_API diclResult_t diclAllGather(const void* sendBuf, void* recvBuf,
                                    size_t count, at::ScalarType datatype,
                                    diclComm_t comm, deviceStream_t stream) {
  checkCommOrThrow(comm);
  const size_t nbytes = count * at::elementSize(datatype);
  singleDeviceMemcpy(stream, recvBuf, sendBuf, nbytes);
  return DICL_SUCCESS;
}

// Reduce on the single-rank fallback communicator.
//
// No reduction is performed: `count` elements of `datatype` are copied from
// `sendbuff` to `recvbuff` through singleDeviceMemcpy on `stream`. `reduceOp`
// is not consulted.
// Throws std::runtime_error if `comm` fails checkCommOrThrow or if `root` is
// not 0.
DIPU_API diclResult_t diclReduce(const void* sendbuff, void* recvbuff,
                                 size_t count, at::ScalarType datatype,
                                 const ReduceOp& reduceOp, int root,
                                 diclComm_t comm, deviceStream_t stream) {
  // Validate the handle first, then the root, matching the original order.
  checkCommOrThrow(comm);
  checkRankOrThrow(root);

  const size_t nbytes = count * at::elementSize(datatype);
  singleDeviceMemcpy(stream, recvbuff, sendbuff, nbytes);
  return DICL_SUCCESS;
}

// Reduce-scatter on the single-rank fallback communicator.
//
// No reduction is performed: `recvCount` elements of `datatype` are copied
// from `sendBuf` to `recvBuf` through singleDeviceMemcpy on `stream`.
// `reduceOp` is not consulted.
// Throws std::runtime_error if `comm` fails checkCommOrThrow, as every other
// collective in this fallback does.
DIPU_API diclResult_t diclReduceScatter(
    void* sendBuf, void* recvBuf, size_t recvCount, at::ScalarType datatype,
    const ReduceOp& reduceOp, diclComm_t comm, deviceStream_t stream) {
  // Reject null or foreign handles before touching any buffer.
  checkCommOrThrow(comm);
  singleDeviceMemcpy(stream, recvBuf, sendBuf,
                     recvCount * at::elementSize(datatype));
  return DICL_SUCCESS;
}

// Point-to-point send is not available in the PCCL-less build: this always
// throws std::runtime_error via throwNotSupportedError and never uses its
// arguments.
DIPU_API diclResult_t diclSend(const void* sendbuff, size_t count,
at::ScalarType datatype, int peer,
diclComm_t comm, deviceStream_t stream) {
throwNotSupportedError();
// Unreachable (throwNotSupportedError is [[noreturn]]); presumably kept so
// every path visibly returns a value.
return DICL_ERR_UNDEF;
}

// Point-to-point receive is not available in the PCCL-less build: this always
// throws std::runtime_error via throwNotSupportedError and never uses its
// arguments.
DIPU_API diclResult_t diclRecv(void* recvbuff, size_t count,
at::ScalarType datatype, int peer,
diclComm_t comm, deviceStream_t stream) {
throwNotSupportedError();
// Unreachable (throwNotSupportedError is [[noreturn]]); presumably kept so
// every path visibly returns a value.
return DICL_ERR_UNDEF;
}

#endif // USE_PCCL

} // end namespace devapis

} // end namespace dipu
Loading

0 comments on commit aec0e71

Please sign in to comment.