diff --git a/3rdparty/ps-lite b/3rdparty/ps-lite index 78b7fe49d..91808a8ca 160000 --- a/3rdparty/ps-lite +++ b/3rdparty/ps-lite @@ -1 +1 @@ -Subproject commit 78b7fe49db47ee293a4fd3cd867635e07c584627 +Subproject commit 91808a8cad88cbee36dd9413663cebb9993df966 diff --git a/byteps/common/communicator.cc b/byteps/common/communicator.cc index 1ed9b238d..70157d199 100644 --- a/byteps/common/communicator.cc +++ b/byteps/common/communicator.cc @@ -102,8 +102,9 @@ void BytePSCommSocket::init(int* rank, int* size, int* local_rank, _recv_path = std::string(getenv("BYTEPS_SOCKET_PATH")) + std::string("/socket_recv_"); } else { - _send_path = std::string(DEFAULT_BASE_SOCKET_PATH_SEND); - _recv_path = std::string(DEFAULT_BASE_SOCKET_PATH_RECV); + auto job_id = BytePSGlobal::GetJobId(); + _send_path = std::string(DEFAULT_BASE_SOCKET_PATH_SEND) + job_id + "_"; + _recv_path = std::string(DEFAULT_BASE_SOCKET_PATH_RECV) + job_id + "_"; } _send_fd = initSocket(_local_rank, _send_path); diff --git a/byteps/common/global.cc b/byteps/common/global.cc index 8db5bce27..94e4a053f 100644 --- a/byteps/common/global.cc +++ b/byteps/common/global.cc @@ -80,6 +80,7 @@ unsigned int next_key_ = 0; cudaStream_t* BytePSGlobal::_copy_device2host_stream = NULL; cudaStream_t* BytePSGlobal::_copy_host2device_stream = NULL; std::shared_ptr BytePSGlobal::_nccl_manager; +std::string BytePSGlobal::_job_id = "0"; std::shared_ptr BytePSGlobal::_cpu_reducer; std::shared_ptr BytePSGlobal::_thread_pool; @@ -123,6 +124,7 @@ void BytePSGlobal::Init() { ? std::string(getenv("BYTEPS_TRACE_DIR")) : "./trace"; + _job_id = getenv("BYTEPS_JOB_ID") ? std::string(getenv("BYTEPS_JOB_ID")) : "0"; _basic_comm = std::make_shared(); _basic_comm->init(&_rank, &_size, &_local_rank, &_local_size, &_worker_id, diff --git a/byteps/common/global.h b/byteps/common/global.h index 387e84f24..10ea98139 100644 --- a/byteps/common/global.h +++ b/byteps/common/global.h @@ -72,6 +72,7 @@ class BytePSGlobal { } static bool IsRootDevice() { return _is_root_device; } static bool IsDistributed() { return _is_distributed_job; } + static std::string GetJobId() { return _job_id; } static bool IsCrossPcieSwitch() { return _is_cross_pcie_switch; } static BytePSRole GetMyRole() { return _my_role; } static std::shared_ptr GetBasicComm() { return _basic_comm; } @@ -209,6 +210,9 @@ class BytePSGlobal { } static int _pagesize; + // unique identifier for the current application to avoid resource conflict + // (e.g. shared memory name, socket name, etc) + static std::string _job_id; static size_t DivUp(size_t x, size_t y) { return (x + y - 1) / y; } static size_t RoundUp(size_t x, size_t y) { return DivUp(x, y) * y; } diff --git a/byteps/common/operations.cc b/byteps/common/operations.cc index b11806219..a876ae46f 100644 --- a/byteps/common/operations.cc +++ b/byteps/common/operations.cc @@ -344,11 +344,13 @@ void InitTensor(BPSContext &context, size_t size, int dtype, void *cpubuff) { size_t aligned_size = Align(size, dtype); if (BytePSGlobal::IsCrossPcieSwitch()) { + auto shm_prefix = std::string("BytePS_Pcie_") + BytePSGlobal::GetJobId(); context.pcie_cpubuff = - shm_obj->openPcieSharedMemory(key_list[0], aligned_size); + shm_obj->openPcieSharedMemory(shm_prefix, key_list[0], aligned_size); context.cpubuff = context.pcie_cpubuff.back(); } else { - context.cpubuff = shm_obj->openSharedMemory(std::string("BytePS_ShM_"), + auto shm_prefix = std::string("BytePS_ShM_") + BytePSGlobal::GetJobId() + "_"; + context.cpubuff = shm_obj->openSharedMemory(shm_prefix, key_list[0], aligned_size); } BPS_LOG(TRACE) << name << ": open shared memory size " << aligned_size; diff --git a/byteps/common/shared_memory.cc b/byteps/common/shared_memory.cc index 4157147a2..7f3161365 100644 --- a/byteps/common/shared_memory.cc +++ b/byteps/common/shared_memory.cc @@ -29,7 +29,10 @@ void* BytePSSharedMemory::openSharedMemory(const std::string& prefix, uint64_t key, size_t size) { size = BytePSGlobal::RoundUpToPageSize(size); std::string shm_name(prefix); - shm_name += std::to_string(key); + std::stringstream stream; + stream << std::hex << key; + + shm_name += stream.str(); int shm_fd = shm_open(shm_name.c_str(), O_CREAT | O_RDWR, 0666); BPS_CHECK_GE(shm_fd, 0) << "shm_open failed for " << shm_name << " " << strerror(errno); @@ -41,7 +44,8 @@ void* BytePSSharedMemory::openSharedMemory(const std::string& prefix, BPS_CHECK_NE(ptr, (void*)-1) << strerror(errno); - BPS_LOG(TRACE) << "initialized share memory size " << size; + BPS_LOG(DEBUG) << "initialized share memory size " << size << ", name=" << shm_name + << ", key = " << key << "(0x" << stream.str() << ")"; std::lock_guard lock(_shm_mu); _key_shm_addr[shm_name] = ptr; @@ -49,32 +53,33 @@ void* BytePSSharedMemory::openSharedMemory(const std::string& prefix, return ptr; } -std::vector BytePSSharedMemory::openPcieSharedMemory(uint64_t key, +std::vector BytePSSharedMemory::openPcieSharedMemory(const std::string& prefix, + uint64_t key, size_t size) { std::vector r; for (int i = 0; i < BytePSGlobal::GetPcieSwitchNum(); i++) { - auto prefix = std::string("BytePS_Pcie") + std::to_string(i) + "_Shm_"; + auto prefix_i = prefix + std::to_string(i) + "_Shm_"; if (BytePSGlobal::IsDistributed()) { if (BytePSGlobal::IsCrossPcieSwitch()) { if (i <= numa_max_node()) { numa_set_preferred(i); - r.push_back(openSharedMemory(prefix, key, size)); + r.push_back(openSharedMemory(prefix_i, key, size)); numa_set_preferred(-1); } else { numa_set_preferred(numa_max_node()); - r.push_back(openSharedMemory(prefix, key, size)); + r.push_back(openSharedMemory(prefix_i, key, size)); numa_set_preferred(-1); } } else { - r.push_back(openSharedMemory(prefix, key, size)); + r.push_back(openSharedMemory(prefix_i, key, size)); } } else { if (BytePSGlobal::IsCrossPcieSwitch()) { numa_set_interleave_mask(numa_all_nodes_ptr); - r.push_back(openSharedMemory(prefix, key, size)); + r.push_back(openSharedMemory(prefix_i, key, size)); numa_set_interleave_mask(numa_no_nodes_ptr); } else { - r.push_back(openSharedMemory(prefix, key, size)); + r.push_back(openSharedMemory(prefix_i, key, size)); } } } diff --git a/byteps/common/shared_memory.h b/byteps/common/shared_memory.h index 1d4c425c4..e66059c0a 100644 --- a/byteps/common/shared_memory.h +++ b/byteps/common/shared_memory.h @@ -47,7 +47,7 @@ class BytePSSharedMemory { } void *openSharedMemory(const std::string &prefix, uint64_t key, size_t size); - std::vector openPcieSharedMemory(uint64_t key, size_t size); + std::vector openPcieSharedMemory(const std::string &prefix, uint64_t key, size_t size); private: std::unordered_map _key_shm_addr;