From 501cad94d73fd748f0e6410c1291b42fd6e1a17c Mon Sep 17 00:00:00 2001 From: "yulu.jia" Date: Wed, 27 Apr 2022 03:06:16 +0800 Subject: [PATCH] update shm naming scheme use the hex representation of the tensor key in shm names. It's easier to tell the operation type, tensor id and partition number etc from the hex representation. Signed-off-by: yulu.jia --- 3rdparty/ps-lite | 2 +- byteps/common/communicator.cc | 5 +++-- byteps/common/global.cc | 2 ++ byteps/common/global.h | 4 ++++ byteps/common/operations.cc | 6 ++++-- byteps/common/shared_memory.cc | 23 ++++++++++++++--------- byteps/common/shared_memory.h | 2 +- 7 files changed, 29 insertions(+), 15 deletions(-) diff --git a/3rdparty/ps-lite b/3rdparty/ps-lite index 78b7fe49db..91808a8cad 160000 --- a/3rdparty/ps-lite +++ b/3rdparty/ps-lite @@ -1 +1 @@ -Subproject commit 78b7fe49db47ee293a4fd3cd867635e07c584627 +Subproject commit 91808a8cad88cbee36dd9413663cebb9993df966 diff --git a/byteps/common/communicator.cc b/byteps/common/communicator.cc index 1ed9b238d9..70157d1996 100644 --- a/byteps/common/communicator.cc +++ b/byteps/common/communicator.cc @@ -102,8 +102,9 @@ void BytePSCommSocket::init(int* rank, int* size, int* local_rank, _recv_path = std::string(getenv("BYTEPS_SOCKET_PATH")) + std::string("/socket_recv_"); } else { - _send_path = std::string(DEFAULT_BASE_SOCKET_PATH_SEND); - _recv_path = std::string(DEFAULT_BASE_SOCKET_PATH_RECV); + auto job_id = BytePSGlobal::GetJobId(); + _send_path = std::string(DEFAULT_BASE_SOCKET_PATH_SEND) + job_id + "_"; + _recv_path = std::string(DEFAULT_BASE_SOCKET_PATH_RECV) + job_id + "_"; } _send_fd = initSocket(_local_rank, _send_path); diff --git a/byteps/common/global.cc b/byteps/common/global.cc index 8db5bce27f..94e4a053f9 100644 --- a/byteps/common/global.cc +++ b/byteps/common/global.cc @@ -80,6 +80,7 @@ unsigned int next_key_ = 0; cudaStream_t* BytePSGlobal::_copy_device2host_stream = NULL; cudaStream_t* BytePSGlobal::_copy_host2device_stream = NULL; std::shared_ptr BytePSGlobal::_nccl_manager; +std::string BytePSGlobal::_job_id = "0"; std::shared_ptr BytePSGlobal::_cpu_reducer; std::shared_ptr BytePSGlobal::_thread_pool; @@ -123,6 +124,7 @@ void BytePSGlobal::Init() { ? std::string(getenv("BYTEPS_TRACE_DIR")) : "./trace"; + _job_id = getenv("BYTEPS_JOB_ID") ? std::string(getenv("BYTEPS_JOB_ID")) : "0"; _basic_comm = std::make_shared(); _basic_comm->init(&_rank, &_size, &_local_rank, &_local_size, &_worker_id, diff --git a/byteps/common/global.h b/byteps/common/global.h index 387e84f244..10ea981397 100644 --- a/byteps/common/global.h +++ b/byteps/common/global.h @@ -72,6 +72,7 @@ class BytePSGlobal { } static bool IsRootDevice() { return _is_root_device; } static bool IsDistributed() { return _is_distributed_job; } + static std::string GetJobId() { return _job_id; } static bool IsCrossPcieSwitch() { return _is_cross_pcie_switch; } static BytePSRole GetMyRole() { return _my_role; } static std::shared_ptr GetBasicComm() { return _basic_comm; } @@ -209,6 +210,9 @@ class BytePSGlobal { } static int _pagesize; + // unique identifier for the current application to avoid resource conflict + // (e.g. shared memory name, socket name, etc) + static std::string _job_id; static size_t DivUp(size_t x, size_t y) { return (x + y - 1) / y; } static size_t RoundUp(size_t x, size_t y) { return DivUp(x, y) * y; } diff --git a/byteps/common/operations.cc b/byteps/common/operations.cc index b118062196..a876ae46f4 100644 --- a/byteps/common/operations.cc +++ b/byteps/common/operations.cc @@ -344,11 +344,13 @@ void InitTensor(BPSContext &context, size_t size, int dtype, void *cpubuff) { size_t aligned_size = Align(size, dtype); if (BytePSGlobal::IsCrossPcieSwitch()) { + auto shm_prefix = std::string("BytePS_Pcie_") + BytePSGlobal::GetJobId(); context.pcie_cpubuff = - shm_obj->openPcieSharedMemory(key_list[0], aligned_size); + shm_obj->openPcieSharedMemory(shm_prefix, key_list[0], aligned_size); context.cpubuff = context.pcie_cpubuff.back(); } else { - context.cpubuff = shm_obj->openSharedMemory(std::string("BytePS_ShM_"), + auto shm_prefix = std::string("BytePS_ShM_") + BytePSGlobal::GetJobId() + "_"; + context.cpubuff = shm_obj->openSharedMemory(shm_prefix, key_list[0], aligned_size); } BPS_LOG(TRACE) << name << ": open shared memory size " << aligned_size; diff --git a/byteps/common/shared_memory.cc b/byteps/common/shared_memory.cc index 4157147a20..7f3161365c 100644 --- a/byteps/common/shared_memory.cc +++ b/byteps/common/shared_memory.cc @@ -29,7 +29,10 @@ void* BytePSSharedMemory::openSharedMemory(const std::string& prefix, uint64_t key, size_t size) { size = BytePSGlobal::RoundUpToPageSize(size); std::string shm_name(prefix); - shm_name += std::to_string(key); + std::stringstream stream; + stream << std::hex << key; + + shm_name += stream.str(); int shm_fd = shm_open(shm_name.c_str(), O_CREAT | O_RDWR, 0666); BPS_CHECK_GE(shm_fd, 0) << "shm_open failed for " << shm_name << " " << strerror(errno); @@ -41,7 +44,8 @@ void* BytePSSharedMemory::openSharedMemory(const std::string& prefix, BPS_CHECK_NE(ptr, (void*)-1) << strerror(errno); - BPS_LOG(TRACE) << "initialized share memory size " << size; + BPS_LOG(DEBUG) << "initialized share memory size " << size << ", name=" << shm_name + << ", key = " << key << "(0x" << stream.str() << ")"; std::lock_guard lock(_shm_mu); _key_shm_addr[shm_name] = ptr; @@ -49,32 +53,33 @@ void* BytePSSharedMemory::openSharedMemory(const std::string& prefix, return ptr; } -std::vector BytePSSharedMemory::openPcieSharedMemory(uint64_t key, +std::vector BytePSSharedMemory::openPcieSharedMemory(const std::string& prefix, + uint64_t key, size_t size) { std::vector r; for (int i = 0; i < BytePSGlobal::GetPcieSwitchNum(); i++) { - auto prefix = std::string("BytePS_Pcie") + std::to_string(i) + "_Shm_"; + auto prefix_i = prefix + std::to_string(i) + "_Shm_"; if (BytePSGlobal::IsDistributed()) { if (BytePSGlobal::IsCrossPcieSwitch()) { if (i <= numa_max_node()) { numa_set_preferred(i); - r.push_back(openSharedMemory(prefix, key, size)); + r.push_back(openSharedMemory(prefix_i, key, size)); numa_set_preferred(-1); } else { numa_set_preferred(numa_max_node()); - r.push_back(openSharedMemory(prefix, key, size)); + r.push_back(openSharedMemory(prefix_i, key, size)); numa_set_preferred(-1); } } else { - r.push_back(openSharedMemory(prefix, key, size)); + r.push_back(openSharedMemory(prefix_i, key, size)); } } else { if (BytePSGlobal::IsCrossPcieSwitch()) { numa_set_interleave_mask(numa_all_nodes_ptr); - r.push_back(openSharedMemory(prefix, key, size)); + r.push_back(openSharedMemory(prefix_i, key, size)); numa_set_interleave_mask(numa_no_nodes_ptr); } else { - r.push_back(openSharedMemory(prefix, key, size)); + r.push_back(openSharedMemory(prefix_i, key, size)); } } } diff --git a/byteps/common/shared_memory.h b/byteps/common/shared_memory.h index 1d4c425c41..e66059c0ac 100644 --- a/byteps/common/shared_memory.h +++ b/byteps/common/shared_memory.h @@ -47,7 +47,7 @@ class BytePSSharedMemory { } void *openSharedMemory(const std::string &prefix, uint64_t key, size_t size); - std::vector openPcieSharedMemory(uint64_t key, size_t size); + std::vector openPcieSharedMemory(const std::string &prefix, uint64_t key, size_t size); private: std::unordered_map _key_shm_addr;