diff --git a/dipu/torch_dipu/csrc_dipu/vendor/droplet/CMakeLists.txt b/dipu/torch_dipu/csrc_dipu/vendor/droplet/CMakeLists.txt index e38bde029..2e7a877a1 100644 --- a/dipu/torch_dipu/csrc_dipu/vendor/droplet/CMakeLists.txt +++ b/dipu/torch_dipu/csrc_dipu/vendor/droplet/CMakeLists.txt @@ -5,28 +5,6 @@ set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} ${CMAKE_CURRENT_SOURCE_DIR}/cmake) set(DROPLET_INSTALL "/usr/local/tangrt") include(cmake/FindDropletCompiler.cmake) -set(USE_PCCL "AUTO" CACHE STRING "Whether to compile with or without PCCL. AUTO(default): auto-detecting; 1|ON|YES|TRUE|Y: force compiling with PCCL; 0|OFF|NO|FALSE|N: force compiling without PCCL") - -find_package(PCCL) -if (USE_PCCL) - if ("${USE_PCCL}" AND NOT PCCL_FOUND) - message(FATAL_ERROR "Set to force compiling with PCCL, but PCCL is not found.") - endif() - if(PCCL_FOUND) - message(STATUS "Use PCCL to implement communicator") - message(STATUS "PCCL_LIBRARY: " ${PCCL_LIBRARY}) - message(STATUS "PCCL_INCLUDE_DIR: " ${PCCL_INCLUDE_DIR}) - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DUSE_PCCL" PARENT_SCOPE) - set(DIPU_VENDOR_LIB ${DIPU_VENDOR_LIB} ${PCCL_LIBRARY}) - set(VENDOR_INCLUDE_DIRS ${VENDOR_INCLUDE_DIRS} ${PCCL_INCLUDE_DIR}) - else() - message(STATUS "PCCL not found, i.e. not supporting distributed on droplet") - endif() -else() - message(STATUS "Force compiling without PCCL, i.e. not supporting distributed on droplet") -endif() - - set(VENDOR_INCLUDE_DIRS ${VENDOR_INCLUDE_DIRS} ${DROPLET_INSTALL}/include PARENT_SCOPE) set(VENDOR_LIB_DIRS ${VENDOR_LIB_DIRS} ${DROPLET_INSTALL}/lib/linux-x86_64 PARENT_SCOPE) set(DIPU_VENDOR_LIB ${DIPU_VENDOR_LIB} tangrt_shared PARENT_SCOPE) diff --git a/dipu/torch_dipu/csrc_dipu/vendor/droplet/communicatorimpl.cpp b/dipu/torch_dipu/csrc_dipu/vendor/droplet/communicatorimpl.cpp index d23cd0936..d459d2958 100644 --- a/dipu/torch_dipu/csrc_dipu/vendor/droplet/communicatorimpl.cpp +++ b/dipu/torch_dipu/csrc_dipu/vendor/droplet/communicatorimpl.cpp @@ -2,24 +2,22 @@ #include #include +#include "pccl.h" + #include +#include #include "csrc_dipu/runtime/device/basedef.h" #include "csrc_dipu/runtime/devproxy/deviceproxy.h" -#ifdef USE_PCCL -#include -#endif // USE_PCCL -#include - #include #include +#include "vendorapi.h" + namespace dipu { namespace devapis { -#ifdef USE_PCCL - #define LINE_HELPER1(x) #x #define LINE_HELPER2(x) LINE_HELPER1(x) #define LOCATION __FILE__ " : " LINE_HELPER2(__LINE__) @@ -178,141 +176,6 @@ DIPU_API diclResult_t diclRecv(void* recvbuff, size_t count, return DICL_SUCCESS; } -#else // USE_PCCL - -namespace { - -using diclCommValue_t = std::remove_pointer_t; -constexpr diclCommValue_t kMagicComm = 0x5043434C; // "PCCL" - -diclComm_t createDiclComm() { return new diclCommValue_t(kMagicComm); } - -void destroyDiclComm(diclComm_t comm) { delete comm; } - -void checkCommOrThrow(diclComm_t comm) { - if (comm == nullptr || *comm != kMagicComm) { - throw std::runtime_error("Invalid comm."); - } -} - -[[noreturn]] void throwNotSupportedError() { - throw std::runtime_error( - "PCCL is not enabled. DIPU only allows single GPU communication."); -} - -void checkNrankOrThrow(int nranks) { - if (nranks != 1) { - throwNotSupportedError(); - } -} - -void checkRankOrThrow(int rank) { - if (rank != 0) { - throwNotSupportedError(); - } -} - -void singleDeviceMemcpy(deviceStream_t stream, void* dst, const void* src, - size_t nbytes) { - auto device = devproxy::current_device(); - devproxy::memCopyD2DAsync(stream, nbytes, device, dst, device, src); -} - -} // namespace - -const int DICL_UNIQUE_ID_BYTES_SIZE = 0; - -DIPU_API diclResult_t diclGetCommAsyncError(diclComm_t comm) { - checkCommOrThrow(comm); - return DICL_SUCCESS; -} - -DIPU_API diclResult_t diclGetUniqueId(commUniqueId* uniqueId) { - return DICL_SUCCESS; -} - -DIPU_API diclResult_t diclCommInitRank(diclComm_t* comm, int nranks, - commUniqueId uniqueId, int rank, - int localDeviceId) { - checkNrankOrThrow(nranks); - checkRankOrThrow(rank); - DIPU_LOGW( - "PCCL is not enabled. DIPU will simulate single GPU " - "communication using memcpy."); - *comm = createDiclComm(); - return DICL_SUCCESS; -} - -DIPU_API diclResult_t diclCommDestroy(diclComm_t comm) { - checkCommOrThrow(comm); - destroyDiclComm(comm); - return DICL_SUCCESS; -} - -DIPU_API diclResult_t diclAllReduce(const void* sendbuff, void* recvbuff, - size_t count, at::ScalarType datatype, - const ReduceOp& reduceOp, diclComm_t comm, - deviceStream_t stream) { - checkCommOrThrow(comm); - singleDeviceMemcpy(stream, recvbuff, sendbuff, - count * at::elementSize(datatype)); - return DICL_SUCCESS; -} - -DIPU_API diclResult_t diclBroadcast(const void* sendbuff, void* recvbuff, - size_t count, at::ScalarType datatype, - int root, diclComm_t comm, - deviceStream_t stream) { - checkCommOrThrow(comm); - singleDeviceMemcpy(stream, recvbuff, sendbuff, - count * at::elementSize(datatype)); - return DICL_SUCCESS; -} - -DIPU_API diclResult_t diclAllGather(const void* sendBuf, void* recvBuf, - size_t count, at::ScalarType datatype, - diclComm_t comm, deviceStream_t stream) { - checkCommOrThrow(comm); - singleDeviceMemcpy(stream, recvBuf, sendBuf, - count * at::elementSize(datatype)); - return DICL_SUCCESS; -} - -DIPU_API diclResult_t diclReduce(const void* sendbuff, void* recvbuff, - size_t count, at::ScalarType datatype, - const ReduceOp& reduceOp, int root, - diclComm_t comm, deviceStream_t stream) { - checkCommOrThrow(comm); - checkRankOrThrow(root); - singleDeviceMemcpy(stream, recvbuff, sendbuff, - count * at::elementSize(datatype)); - return DICL_SUCCESS; -} - -DIPU_API diclResult_t diclReduceScatter( - void* sendBuf, void* recvBuf, size_t recvCount, at::ScalarType datatype, - const ReduceOp& reduceOp, diclComm_t comm, deviceStream_t stream) { - singleDeviceMemcpy(stream, recvBuf, sendBuf, - recvCount * at::elementSize(datatype)); - return DICL_SUCCESS; -} - -DIPU_API diclResult_t diclSend(const void* sendbuff, size_t count, - at::ScalarType datatype, int peer, - diclComm_t comm, deviceStream_t stream) { - throwNotSupportedError(); - return DICL_ERR_UNDEF; -} - -DIPU_API diclResult_t diclRecv(void* recvbuff, size_t count, - at::ScalarType datatype, int peer, - diclComm_t comm, deviceStream_t stream) { - throwNotSupportedError(); - return DICL_ERR_UNDEF; -} - -#endif // USE_PCCL - } // end namespace devapis } // end namespace dipu diff --git a/dipu/torch_dipu/csrc_dipu/vendor/droplet/pccl.cpp b/dipu/torch_dipu/csrc_dipu/vendor/droplet/pccl.cpp new file mode 100644 index 000000000..98884e74b --- /dev/null +++ b/dipu/torch_dipu/csrc_dipu/vendor/droplet/pccl.cpp @@ -0,0 +1,210 @@ +/** + * pccl.cpp + * + * Description: + * This file implements the dynamic loading and invocation of PCCL APIs required + * by DICL. If the pccllib.so library is not found, a log message will be + * printed, and a Fallback API will be executed. + * + * Notes: + * - We have copied the PCCL header file. If the PCCL header file are updated, + * please correspondingly update them here. + */ +#include "pccl.h" + +#include +#include + +#include "pcclcommon.h" + +#include +#include + +#include "csrc_dipu/runtime/device/basedef.h" +#include "csrc_dipu/runtime/devproxy/deviceproxy.h" +#include +#include + +namespace { + +#define DIPU_PCCL_IMPL(NAME, RETURN, ...) \ + RETURN NAME(DIPU_TYPE_PARAM(__VA_ARGS__)) { \ + static constexpr const char fstr[] = #NAME; \ + return callPcclImpl(DIPU_PARAM(__VA_ARGS__)); \ + } \ + static RETURN CONCAT(my__, NAME)(DIPU_TYPE_PARAM(__VA_ARGS__)); \ + static const int CONCAT(n_, NAME) = []() { \ + g_pccl_function_map[#NAME] = reinterpret_cast(CONCAT(my__, NAME)); \ + return 0; \ + }(); \ + RETURN CONCAT(my__, NAME)(DIPU_TYPE_PARAM(__VA_ARGS__)) + +#define DIPU_PCCL_COMM_IMPL(NAME, ...) \ + DIPU_PCCL_IMPL(NAME, pcclResult_t, __VA_ARGS__) +#define DIPU_PCCL_ERROR_IMPL(NAME, ...) \ + DIPU_PCCL_IMPL(NAME, const char*, __VA_ARGS__) + +std::map g_pccl_function_map; + +template +ReturnType callPcclImpl(Args... args) { + static const auto functionAddress = getCommPcclFuncAddr(PcclFuncName); + using PcclFuncType = ReturnType (*)(Args...); + static PcclFuncType pcclFunc = reinterpret_cast( + functionAddress != nullptr ? functionAddress + : g_pccl_function_map[PcclFuncName]); + auto pcclCallReturn = pcclFunc(args...); + return pcclCallReturn; +} + +static const std::map toScalarType = { + {pcclInt8, at::kChar}, + {pcclUint8, at::kByte}, + {pcclFloat, at::kFloat}, + // TODO: PCCL not support double now + // {pcclDouble, at::kDouble}, + {pcclInt32, at::kInt}, + {pcclInt64, at::kLong}, + {pcclHalf, at::kHalf}, + {pcclUint8, at::kBool}, + {pcclBfloat16, at::kBFloat16}, +}; + +at::ScalarType PcclDataTypeToScalarType(pcclDataType_t pccl_data_type) { + auto p = toScalarType.find(pccl_data_type); + TORCH_CHECK(p != toScalarType.end(), "Not supported pcclDataType_t: " + + std::to_string(pccl_data_type)); + return p->second; +} + +static const pcclComm_t kMagicComm = reinterpret_cast(0x5043434C); + +void checkCommOrThrow(pcclComm_t comm) { + TORCH_CHECK(comm != nullptr && comm == kMagicComm, "Invalid comm."); +} + +[[noreturn]] void throwNotSupportedError() { + TORCH_CHECK( + false, "PCCL is not enabled. DIPU only allows single GPU communication."); +} + +void checkNrankOrThrow(int nranks) { + if (nranks != 1) { + throwNotSupportedError(); + } +} + +void checkRankOrThrow(int rank) { + if (rank != 0) { + throwNotSupportedError(); + } +} + +void singleDeviceMemcpy(dipu::deviceStream_t stream, void* dst, const void* src, + size_t nbytes) { + if (dst != src) { + auto device = dipu::devproxy::current_device(); + dipu::devproxy::memCopyD2DAsync(stream, nbytes, device, dst, device, src); + } +} + +} // namespace + +DIPU_PCCL_COMM_IMPL(pcclGetUniqueId, (pcclUniqueId*, uniqueId)) { + return pcclSuccess; +} + +DIPU_PCCL_COMM_IMPL(pcclCommInitRank, (pcclComm_t*, comm), (int, ndev), + (pcclUniqueId, commIdI), (int, rank)) { + checkNrankOrThrow(ndev); + checkRankOrThrow(rank); + DIPU_LOGW( + "PCCL is not enabled. DIPU will simulate single GPU " + "communication using memcpy."); + *comm = kMagicComm; + return pcclSuccess; +} + +DIPU_PCCL_COMM_IMPL(pcclCommDestroy, (pcclComm_t, comm)) { + checkCommOrThrow(comm); + // destroyDiclComm(comm); + return pcclSuccess; +} + +DIPU_PCCL_COMM_IMPL(pcclCommGetAsyncError, (pcclComm_t, comm), + (pcclResult_t*, asyncError)) { + checkCommOrThrow(comm); + return pcclSuccess; +} + +DIPU_PCCL_ERROR_IMPL(pcclGetErrorString, (pcclResult_t, result)) { + TORCH_CHECK(false, "Fallback pccl impl should not call pcclGetErrorString"); +} + +DIPU_PCCL_ERROR_IMPL(pcclGetLastError, (pcclComm_t, comm)) { + TORCH_CHECK(false, "Fallback pccl impl should not call pcclGetLastError"); +} + +DIPU_PCCL_COMM_IMPL(pcclReduce, (const void*, sendbuff), (void*, recvbuff), + (size_t, count), (pcclDataType_t, datatype), + (pcclRedOp_t, op), (int, root), (pcclComm_t, comm), + (tangStream_t, stream)) { + checkCommOrThrow(comm); + checkRankOrThrow(root); + singleDeviceMemcpy( + stream, recvbuff, sendbuff, + count * at::elementSize(PcclDataTypeToScalarType(datatype))); + return pcclSuccess; +} + +DIPU_PCCL_COMM_IMPL(pcclAllReduce, (const void*, sendbuff), (void*, recvbuff), + (size_t, count), (pcclDataType_t, datatype), + (pcclRedOp_t, op), (pcclComm_t, comm), + (tangStream_t, stream)) { + checkCommOrThrow(comm); + singleDeviceMemcpy( + stream, recvbuff, sendbuff, + count * at::elementSize(PcclDataTypeToScalarType(datatype))); + return pcclSuccess; +} + +DIPU_PCCL_COMM_IMPL(pcclReduceScatter, (const void*, sendbuff), + (void*, recvbuff), (size_t, recvcount), + (pcclDataType_t, datatype), (pcclRedOp_t, op), + (pcclComm_t, comm), (tangStream_t, stream)) { + singleDeviceMemcpy( + stream, recvbuff, sendbuff, + recvcount * at::elementSize(PcclDataTypeToScalarType(datatype))); + return pcclSuccess; +} + +DIPU_PCCL_COMM_IMPL(pcclBroadcast, (const void*, sendbuff), (void*, recvbuff), + (size_t, count), (pcclDataType_t, datatype), (int, root), + (pcclComm_t, comm), (tangStream_t, stream)) { + checkCommOrThrow(comm); + singleDeviceMemcpy( + stream, recvbuff, sendbuff, + count * at::elementSize(PcclDataTypeToScalarType(datatype))); + return pcclSuccess; +} +DIPU_PCCL_COMM_IMPL(pcclAllGather, (const void*, sendbuff), (void*, recvbuff), + (size_t, count), (pcclDataType_t, datatype), + (pcclComm_t, comm), (tangStream_t, stream)) { + checkCommOrThrow(comm); + singleDeviceMemcpy( + stream, recvbuff, sendbuff, + count * at::elementSize(PcclDataTypeToScalarType(datatype))); + return pcclSuccess; +} +DIPU_PCCL_COMM_IMPL(pcclSend, (const void*, sendbuff), (size_t, count), + (pcclDataType_t, datatype), (int, peer), (pcclComm_t, comm), + (tangStream_t, stream)) { + throwNotSupportedError(); + return pcclInvalidUsage; +} +DIPU_PCCL_COMM_IMPL(pcclRecv, (void*, recvbuff), (size_t, count), + (pcclDataType_t, datatype), (int, peer), (pcclComm_t, comm), + (tangStream_t, stream)) { + throwNotSupportedError(); + return pcclInvalidUsage; +} diff --git a/dipu/torch_dipu/csrc_dipu/vendor/droplet/pccl.h b/dipu/torch_dipu/csrc_dipu/vendor/droplet/pccl.h new file mode 100644 index 000000000..9579532e7 --- /dev/null +++ b/dipu/torch_dipu/csrc_dipu/vendor/droplet/pccl.h @@ -0,0 +1,261 @@ +#ifndef __PCCL_API_H__ +#define __PCCL_API_H__ + +#ifdef __cplusplus +extern "C" { +#endif + +#include "tang_rt/driver_types.h" + +#define PCCL_UNIQUE_ID_BYTES 128 +typedef struct { + char internal[PCCL_UNIQUE_ID_BYTES]; +} pcclUniqueId; + +/* Opaque handle to communicator */ +typedef struct pcclComm* pcclComm_t; + +/* Error enum */ +typedef enum { + pcclSuccess = 0, + pcclUnhandledTangError = 1, + pcclSystemError = 2, + pcclInternalError = 3, + pcclInvalidArgument = 4, + pcclInvalidUsage = 5, + pcclRemoteError = 6, + pcclInProgress = 7, + pcclInvalidDeviceIndex = 8, + pccl_NUM_RESULTS +} pcclResult_t; + +/* description : Generates a unique Id with each call + * input : pcclUniqueId type pointer + * output : 0:pcclSuccess, other failure + * note : N/A + */ +pcclResult_t pcclGetUniqueId(pcclUniqueId* uniqueId); + +/* description : Creates a new communicator + * input : comm, created communicator on tang device + * : ndev, number of logical devices + * : commId, unique Id for communicator + * : rank, must be between 0 and ndev-1 + * output : 0:pcclSuccess, other failure + * note : the func implicitly syncronizes with other ranks, so INIT OF + * EACH RANK MUST BE CALLED IN A SEPARATE HOST THREADS to avoid deadlock. + */ +pcclResult_t pcclCommInitRank(pcclComm_t* comm, int ndev, pcclUniqueId commId, + int rank); + +/* description : Creates a clique of communicators + * input : comms, should be pre-allocated with size at least + * ndev*sizeof(pcclComm_t) : ndev, number of logical devices : devlist, the set + * of dev pointer, if NULL, first device to ndev used output : + * 0:pcclSuccess, other failure note : This is a convenience function to + * create a single-process communicator clique + */ +pcclResult_t pcclCommInitAll(pcclComm_t* comms, int ndev, const int* devlist); + +/* description : Frees resources associated with communicator object + * input : comm, the communicator + * output : void + * note : N/A + */ +pcclResult_t pcclCommDestroy(pcclComm_t comm); + +/* description : communicator abort to ask device kernel to quit + * input : comm, the communicator + * output : void + * note : N/A + */ +pcclResult_t pcclCommAbort(pcclComm_t comm); + +/* description : Get communicator async error + * input : comm, the communicator + * output : asyncError, the out value error + * note : N/A + */ +pcclResult_t pcclCommGetAsyncError(pcclComm_t comm, pcclResult_t* asyncError); + +/* description : Returns human error message + * input : result, the result flag + * output : readable error string + * note : N/A + */ +const char* pcclGetErrorString(pcclResult_t result); + +const char* pcclGetLastError(pcclComm_t comm); + +/* description : get the number of devices in the communicator clique + * input : comm, the communicator + * : count, return value pointer + * output : 0:pcclSuccess, other failure + * note : N/A + */ +pcclResult_t pcclCommCount(const pcclComm_t comm, int* count); + +/* description : get tang device number associated with communicator + * input : comm, the communicator + * : device, return value pointer + * output : 0:pcclSuccess, other failure + * note : N/A + */ +pcclResult_t pcclCommCuDevice(const pcclComm_t comm, int* device); + +/* description : get user-ordered "rank" assocaiated with communicator + * input : comm, the communicator + * : rank, return value pointer + * output : 0:pcclSuccess, other failure + * note : N/A + */ +pcclResult_t pcclCommUserRank(const pcclComm_t comm, int* rank); + +/* description : get pccl lib version + * input : version, the pointers + * output : 0:pcclSuccess, other failure + * note : N/A + */ +pcclResult_t pcclGetVersion(int* version); + +/* Reduction opperation selector */ +typedef enum { + pcclSum = 0, + pcclProd = 1, + pcclMax = 2, + pcclMin = 3, + pcclAvg = 4, + pcclOpsNum = 5, + pcclNull = pcclOpsNum +} pcclRedOp_t; + +/* Data types unspported double */ +typedef enum { + pcclChar = 0, + pcclInt8 = pcclChar, + pcclUint8 = 1, + pcclInt = 2, + pcclInt32 = pcclInt, + pcclUint32 = 3, + pcclInt64 = 4, + pcclUint64 = 5, + pcclHalf = 6, + pcclFloat16 = pcclHalf, + pcclFloat = 7, + pcclFloat32 = pcclFloat, + pcclBfloat16 = 8, + pcclTypesNum +} pcclDataType_t; + +/* description : Reduces + * input : sendbuff, input data buffer + * : recvbuff, output data buffer + * : count, data size + * : datatype, data type + * : op, reduce op + * : root, root device + * : comm, communicator + * : stream, if null,used default + * output : 0:pcclSuccess, other failure + * note : recvbuf may be NULL on all calls except for root device, + * sendbuff and recvbuff are assumed to reside on root device + */ +pcclResult_t pcclReduce(const void* sendbuff, void* recvbuf, size_t count, + pcclDataType_t datatype, pcclRedOp_t op, int root, + pcclComm_t comm, tangStream_t stream); + +/* description : AllReduces + * input : sendbuff, input data buffer + * : recvbuff, output data buffer + * : count, data size + * : datatype, data type + * : op, reduce op + * : comm, communicator + * : stream, if null,used default + * output : 0:pcclSuccess, other failure + * note : N/A + */ +pcclResult_t pcclAllReduce(const void* sendbuff, void* recvbuff, size_t count, + pcclDataType_t datatype, pcclRedOp_t op, + pcclComm_t comm, tangStream_t stream); + +/* description : ReducesScatter + * input : sendbuff, input data buffer + * : recvbuff, output data buffer + * : recvcount, i-th block data size + * : datatype, data type + * : op, reduce op + * : comm, communicator + * : stream, if null,used default + * output : 0:pcclSuccess, other failure + * note : N/A + */ +pcclResult_t pcclReduceScatter(const void* sendbuff, void* recvbuff, + size_t recvcount, pcclDataType_t datatype, + pcclRedOp_t op, pcclComm_t comm, + tangStream_t stream); + +/* description : Broadcast + * input : buff, input data buffer + * : count, data size + * : datatype, data type + * : root, root device + * : comm, communicator + * : stream, if null,used default + * output : 0:pcclSuccess, other failure + * note : Must be called separately for each communicator in + * communicator clique + */ +pcclResult_t pcclBroadcast(const void* sendbuff, void* recvbuff, size_t count, + pcclDataType_t datatype, int root, pcclComm_t comm, + tangStream_t stream); + +/* description : AllGather + * input : sendbuff, input data buffer + * : recvbuff, output data buffer + * : count, data size + * : datatype, data type + * : comm, communicator + * : stream, if null,used default + * output : 0:pcclSuccess, other failure + * note : Must be called separately for each communicator in + * communicator clique + */ +pcclResult_t pcclAllGather(const void* sendbuff, void* recvbuff, size_t count, + pcclDataType_t datatype, pcclComm_t comm, + tangStream_t stream); + +/* description : P2P send + * input : sendbuff, input data buffer + * : count, data size + * : datatype, data type + * : peer, the send rank index + * : comm, communicator + * : stream, if null,used default + * output : 0:pcclSuccess, other failure + * note : Must be called pcclRecv in group protect + */ +pcclResult_t pcclSend(const void* sendbuff, size_t count, + pcclDataType_t datatype, int peer, pcclComm_t comm, + tangStream_t stream); + +/* description : P2P recv + * : recvbuff, output data buffer + * : count, data size + * : datatype, data type + * : peer, recv data from rank index + * : comm, communicator + * : stream, if null,used default + * output : 0:pcclSuccess, other failure + * note : Must be called pcclSend in group protect + */ +pcclResult_t pcclRecv(void* recvbuff, size_t count, pcclDataType_t datatype, + int peer, pcclComm_t comm, tangStream_t stream); + +pcclResult_t pcclGroupStart(void); +pcclResult_t pcclGroupEnd(void); + +#ifdef __cplusplus +} +#endif +#endif // end __PCCL_API_H__ diff --git a/dipu/torch_dipu/csrc_dipu/vendor/droplet/pcclcommon.h b/dipu/torch_dipu/csrc_dipu/vendor/droplet/pcclcommon.h new file mode 100644 index 000000000..99119b89e --- /dev/null +++ b/dipu/torch_dipu/csrc_dipu/vendor/droplet/pcclcommon.h @@ -0,0 +1,90 @@ +#include // For dlsym, dlopen, dlerror +#include +#include +#include // For std::runtime_error +#include // For std::string + +inline void* getCommPcclFuncAddrInLib(void* handler, const char* libName, + const char* apiName) { + void* funcAddr = dlsym(handler, apiName); + if (funcAddr == nullptr) { + std::cerr << "Warning: [" << __FILE__ << ":" << __LINE__ << "] " + << __FUNCTION__ << ": dlsym " << apiName << " from " << libName + << " failed, error: " << dlerror() << std::endl; + } + return funcAddr; +} + +inline void* getCommPcclLibHandler(const char* libName) { + auto handler = dlopen(libName, RTLD_LAZY); + if (handler == nullptr) { + std::cerr << "Warning: " << __FILE__ << ":" << __LINE__ << " " + << __FUNCTION__ << " dlopen " << libName << " failed" + << std::endl; + } + return handler; +} + +inline void* getCommPcclFuncAddr(const char* apiName) { + constexpr const char pcclLibName[] = "libpccl.so"; + constexpr const char pcclLibDependName[] = "libtangrt_shared.so"; + static void* pcclHandler = getCommPcclLibHandler(pcclLibName); + if (pcclHandler == nullptr) { + std::cerr << "Fallback " << apiName << " will be called" << std::endl; + return nullptr; + } + return getCommPcclFuncAddrInLib(pcclHandler, pcclLibName, apiName); +} + +#define EXPAND(x) x +#define DIPU_GET_MACRO(_1, _2, _3, _4, _5, _6, _7, _8, _9, NAME, ...) NAME +#define DIPU_TYPE_PARAM(...) \ + EXPAND(DIPU_GET_MACRO( \ + __VA_ARGS__, DIPU_TYPE_PARAM_9, DIPU_TYPE_PARAM_8, DIPU_TYPE_PARAM_7, \ + DIPU_TYPE_PARAM_6, DIPU_TYPE_PARAM_5, DIPU_TYPE_PARAM_4, \ + DIPU_TYPE_PARAM_3, DIPU_TYPE_PARAM_2, DIPU_TYPE_PARAM_1)(__VA_ARGS__)) +#define DIPU_FORMAT_TYPE_PARAM(T, ...) T __VA_ARGS__ +#define DIPU_TYPE_PARAM_1(TP1) DIPU_FORMAT_TYPE_PARAM TP1 +#define DIPU_TYPE_PARAM_2(TP1, TP2) \ + DIPU_FORMAT_TYPE_PARAM TP1, DIPU_TYPE_PARAM_1(TP2) +#define DIPU_TYPE_PARAM_3(TP1, TP2, TP3) \ + DIPU_FORMAT_TYPE_PARAM TP1, DIPU_TYPE_PARAM_2(TP2, TP3) +#define DIPU_TYPE_PARAM_4(TP1, TP2, TP3, TP4) \ + DIPU_FORMAT_TYPE_PARAM TP1, DIPU_TYPE_PARAM_3(TP2, TP3, TP4) +#define DIPU_TYPE_PARAM_5(TP1, TP2, TP3, TP4, TP5) \ + DIPU_FORMAT_TYPE_PARAM TP1, DIPU_TYPE_PARAM_4(TP2, TP3, TP4, TP5) +#define DIPU_TYPE_PARAM_6(TP1, TP2, TP3, TP4, TP5, TP6) \ + DIPU_FORMAT_TYPE_PARAM TP1, DIPU_TYPE_PARAM_5(TP2, TP3, TP4, TP5, TP6) +#define DIPU_TYPE_PARAM_7(TP1, TP2, TP3, TP4, TP5, TP6, TP7) \ + DIPU_FORMAT_TYPE_PARAM TP1, DIPU_TYPE_PARAM_6(TP2, TP3, TP4, TP5, TP6, TP7) +#define DIPU_TYPE_PARAM_8(TP1, TP2, TP3, TP4, TP5, TP6, TP7, TP8) \ + DIPU_FORMAT_TYPE_PARAM TP1, \ + DIPU_TYPE_PARAM_7(TP2, TP3, TP4, TP5, TP6, TP7, TP8) +#define DIPU_TYPE_PARAM_9(TP1, TP2, TP3, TP4, TP5, TP6, TP7, TP8, TP9) \ + DIPU_FORMAT_TYPE_PARAM TP1, \ + DIPU_TYPE_PARAM_8(TP2, TP3, TP4, TP5, TP6, TP7, TP8, TP9) +#define DIPU_PARAM(...) \ + EXPAND(DIPU_GET_MACRO(__VA_ARGS__, DIPU_PARAM_9, DIPU_PARAM_8, DIPU_PARAM_7, \ + DIPU_PARAM_6, DIPU_PARAM_5, DIPU_PARAM_4, \ + DIPU_PARAM_3, DIPU_PARAM_2, \ + DIPU_PARAM_1)(__VA_ARGS__)) +#define DIPU_FORMAT_PARAM(T, ...) __VA_ARGS__ +#define DIPU_PARAM_1(TP1) DIPU_FORMAT_PARAM TP1 +#define DIPU_PARAM_2(TP1, TP2) DIPU_FORMAT_PARAM TP1, DIPU_PARAM_1(TP2) +#define DIPU_PARAM_3(TP1, TP2, TP3) \ + DIPU_FORMAT_PARAM TP1, DIPU_PARAM_2(TP2, TP3) +#define DIPU_PARAM_4(TP1, TP2, TP3, TP4) \ + DIPU_FORMAT_PARAM TP1, DIPU_PARAM_3(TP2, TP3, TP4) +#define DIPU_PARAM_5(TP1, TP2, TP3, TP4, TP5) \ + DIPU_FORMAT_PARAM TP1, DIPU_PARAM_4(TP2, TP3, TP4, TP5) +#define DIPU_PARAM_6(TP1, TP2, TP3, TP4, TP5, TP6) \ + DIPU_FORMAT_PARAM TP1, DIPU_PARAM_5(TP2, TP3, TP4, TP5, TP6) +#define DIPU_PARAM_7(TP1, TP2, TP3, TP4, TP5, TP6, TP7) \ + DIPU_FORMAT_PARAM TP1, DIPU_PARAM_6(TP2, TP3, TP4, TP5, TP6, TP7) +#define DIPU_PARAM_8(TP1, TP2, TP3, TP4, TP5, TP6, TP7, TP8) \ + DIPU_FORMAT_PARAM TP1, DIPU_PARAM_7(TP2, TP3, TP4, TP5, TP6, TP7, TP8) +#define DIPU_PARAM_9(TP1, TP2, TP3, TP4, TP5, TP6, TP7, TP8, TP9) \ + DIPU_FORMAT_PARAM TP1, DIPU_PARAM_8(TP2, TP3, TP4, TP5, TP6, TP7, TP8, TP9) + +#define DIPU_CONCAT_IMPL(x, y) x##y +#define CONCAT(x, y) DIPU_CONCAT_IMPL(x, y) diff --git a/dipu/torch_dipu/csrc_dipu/vendor/droplet/vendorapi.h b/dipu/torch_dipu/csrc_dipu/vendor/droplet/vendorapi.h index 91ee934f0..f8f6270e2 100644 --- a/dipu/torch_dipu/csrc_dipu/vendor/droplet/vendorapi.h +++ b/dipu/torch_dipu/csrc_dipu/vendor/droplet/vendorapi.h @@ -1,12 +1,12 @@ #pragma once #include -#include -#ifdef USE_PCCL -#include -#endif // USE_PCCL +#include "pccl.h" #include +#include + +#include "csrc_dipu/vendor/droplet/pccl.h" #include namespace dipu { @@ -26,12 +26,7 @@ using deviceStream_t = tangStream_t; #define deviceDefaultStreamLiteral nullptr using deviceEvent_t = tangEvent_t; using deviceHandle_t = tangContext_t*; -#ifdef USE_PCCL using diclComm_t = pcclComm_t; using commUniqueId = pcclUniqueId; -#else // USE_PCCL -using diclComm_t = uint32_t*; -struct commUniqueId {}; -#endif // USE_PCCL } // namespace dipu