Skip to content

Commit

Permalink
update shm naming scheme
Browse files Browse the repository at this point in the history
use the hex representation of the tensor key in shm names. It's easier
to tell the operation type, tensor id and partition number etc from the
hex representation.

Signed-off-by: yulu.jia <[email protected]>
  • Loading branch information
pleasantrabbit committed Apr 27, 2022
1 parent f5df52f commit 501cad9
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 15 deletions.
2 changes: 1 addition & 1 deletion 3rdparty/ps-lite
5 changes: 3 additions & 2 deletions byteps/common/communicator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,9 @@ void BytePSCommSocket::init(int* rank, int* size, int* local_rank,
_recv_path = std::string(getenv("BYTEPS_SOCKET_PATH")) +
std::string("/socket_recv_");
} else {
_send_path = std::string(DEFAULT_BASE_SOCKET_PATH_SEND);
_recv_path = std::string(DEFAULT_BASE_SOCKET_PATH_RECV);
auto job_id = BytePSGlobal::GetJobId();
_send_path = std::string(DEFAULT_BASE_SOCKET_PATH_SEND) + job_id + "_";
_recv_path = std::string(DEFAULT_BASE_SOCKET_PATH_RECV) + job_id + "_";
}

_send_fd = initSocket(_local_rank, _send_path);
Expand Down
2 changes: 2 additions & 0 deletions byteps/common/global.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ unsigned int next_key_ = 0;
cudaStream_t* BytePSGlobal::_copy_device2host_stream = NULL;
cudaStream_t* BytePSGlobal::_copy_host2device_stream = NULL;
std::shared_ptr<NcclManager> BytePSGlobal::_nccl_manager;
std::string BytePSGlobal::_job_id = "0";
std::shared_ptr<CpuReducer> BytePSGlobal::_cpu_reducer;
std::shared_ptr<ThreadPool> BytePSGlobal::_thread_pool;

Expand Down Expand Up @@ -123,6 +124,7 @@ void BytePSGlobal::Init() {
? std::string(getenv("BYTEPS_TRACE_DIR"))
: "./trace";

_job_id = getenv("BYTEPS_JOB_ID") ? std::string(getenv("BYTEPS_JOB_ID")) : "0";
_basic_comm = std::make_shared<BytePSCommSocket>();

_basic_comm->init(&_rank, &_size, &_local_rank, &_local_size, &_worker_id,
Expand Down
4 changes: 4 additions & 0 deletions byteps/common/global.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class BytePSGlobal {
}
static bool IsRootDevice() { return _is_root_device; }
static bool IsDistributed() { return _is_distributed_job; }
static std::string GetJobId() { return _job_id; }
static bool IsCrossPcieSwitch() { return _is_cross_pcie_switch; }
static BytePSRole GetMyRole() { return _my_role; }
static std::shared_ptr<BytePSComm> GetBasicComm() { return _basic_comm; }
Expand Down Expand Up @@ -209,6 +210,9 @@ class BytePSGlobal {
}

static int _pagesize;
// unique identifier for the current application to avoid resource conflict
// (e.g. shared memory name, socket name, etc)
static std::string _job_id;
static size_t DivUp(size_t x, size_t y) { return (x + y - 1) / y; }
static size_t RoundUp(size_t x, size_t y) { return DivUp(x, y) * y; }

Expand Down
6 changes: 4 additions & 2 deletions byteps/common/operations.cc
Original file line number Diff line number Diff line change
Expand Up @@ -344,11 +344,13 @@ void InitTensor(BPSContext &context, size_t size, int dtype, void *cpubuff) {

size_t aligned_size = Align(size, dtype);
if (BytePSGlobal::IsCrossPcieSwitch()) {
auto shm_prefix = std::string("BytePS_Pcie_") + BytePSGlobal::GetJobId();
context.pcie_cpubuff =
shm_obj->openPcieSharedMemory(key_list[0], aligned_size);
shm_obj->openPcieSharedMemory(shm_prefix, key_list[0], aligned_size);
context.cpubuff = context.pcie_cpubuff.back();
} else {
context.cpubuff = shm_obj->openSharedMemory(std::string("BytePS_ShM_"),
auto shm_prefix = std::string("BytePS_ShM_") + BytePSGlobal::GetJobId() + "_";
context.cpubuff = shm_obj->openSharedMemory(shm_prefix,
key_list[0], aligned_size);
}
BPS_LOG(TRACE) << name << ": open shared memory size " << aligned_size;
Expand Down
23 changes: 14 additions & 9 deletions byteps/common/shared_memory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ void* BytePSSharedMemory::openSharedMemory(const std::string& prefix,
uint64_t key, size_t size) {
size = BytePSGlobal::RoundUpToPageSize(size);
std::string shm_name(prefix);
shm_name += std::to_string(key);
std::stringstream stream;
stream << std::hex << key;

shm_name += stream.str();
int shm_fd = shm_open(shm_name.c_str(), O_CREAT | O_RDWR, 0666);
BPS_CHECK_GE(shm_fd, 0) << "shm_open failed for " << shm_name << " " << strerror(errno);

Expand All @@ -41,40 +44,42 @@ void* BytePSSharedMemory::openSharedMemory(const std::string& prefix,

BPS_CHECK_NE(ptr, (void*)-1) << strerror(errno);

BPS_LOG(TRACE) << "initialized share memory size " << size;
BPS_LOG(DEBUG) << "initialized share memory size " << size << ", name=" << shm_name
<< ", key = " << key << "(0x" << stream.str() << ")";

std::lock_guard<std::mutex> lock(_shm_mu);
_key_shm_addr[shm_name] = ptr;
_key_shm_size[shm_name] = size;
return ptr;
}

std::vector<void*> BytePSSharedMemory::openPcieSharedMemory(uint64_t key,
std::vector<void*> BytePSSharedMemory::openPcieSharedMemory(const std::string& prefix,
uint64_t key,
size_t size) {
std::vector<void*> r;
for (int i = 0; i < BytePSGlobal::GetPcieSwitchNum(); i++) {
auto prefix = std::string("BytePS_Pcie") + std::to_string(i) + "_Shm_";
auto prefix_i = prefix + std::to_string(i) + "_Shm_";
if (BytePSGlobal::IsDistributed()) {
if (BytePSGlobal::IsCrossPcieSwitch()) {
if (i <= numa_max_node()) {
numa_set_preferred(i);
r.push_back(openSharedMemory(prefix, key, size));
r.push_back(openSharedMemory(prefix_i, key, size));
numa_set_preferred(-1);
} else {
numa_set_preferred(numa_max_node());
r.push_back(openSharedMemory(prefix, key, size));
r.push_back(openSharedMemory(prefix_i, key, size));
numa_set_preferred(-1);
}
} else {
r.push_back(openSharedMemory(prefix, key, size));
r.push_back(openSharedMemory(prefix_i, key, size));
}
} else {
if (BytePSGlobal::IsCrossPcieSwitch()) {
numa_set_interleave_mask(numa_all_nodes_ptr);
r.push_back(openSharedMemory(prefix, key, size));
r.push_back(openSharedMemory(prefix_i, key, size));
numa_set_interleave_mask(numa_no_nodes_ptr);
} else {
r.push_back(openSharedMemory(prefix, key, size));
r.push_back(openSharedMemory(prefix_i, key, size));
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion byteps/common/shared_memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class BytePSSharedMemory {
}

void *openSharedMemory(const std::string &prefix, uint64_t key, size_t size);
std::vector<void *> openPcieSharedMemory(uint64_t key, size_t size);
std::vector<void *> openPcieSharedMemory(const std::string &prefix, uint64_t key, size_t size);

private:
std::unordered_map<std::string, void *> _key_shm_addr;
Expand Down

0 comments on commit 501cad9

Please sign in to comment.