diff --git a/demo/quick_start/cluster/cluster_train.sh b/demo/quick_start/cluster/cluster_train.sh
index aac9b89b14b98..a7b1f01064b29 100755
--- a/demo/quick_start/cluster/cluster_train.sh
+++ b/demo/quick_start/cluster/cluster_train.sh
@@ -25,6 +25,7 @@ log_file="$bin_dir/train.log"
 pushd "$home_dir"
 cfg=trainer_config.lr.py
 paddle train \
+  --start_pserver=false \
   --config=$cfg \
   --save_dir=${model_dir} \
   --trainer_count=4 \
diff --git a/paddle/pserver/CMakeLists.txt b/paddle/pserver/CMakeLists.txt
index 1c1e1964b8d3f..b7f85ea1a6dfd 100644
--- a/paddle/pserver/CMakeLists.txt
+++ b/paddle/pserver/CMakeLists.txt
@@ -24,13 +24,15 @@ set(PSERVER_SOURCES
     BaseClient.cpp
     ParameterClient2.cpp
     ParameterServer2.cpp
-    SparseParameterDistribution.cpp)
+    SparseParameterDistribution.cpp
+    ParameterServerController.cpp)
 
 set(PSERVER_HEADERS
     BaseClient.h
     ParameterClient2.h
     ParameterServer2.h
-    SparseParameterDistribution.h)
+    SparseParameterDistribution.h
+    ParameterServerController.h)
 
 add_library(paddle_pserver STATIC
     ${PSERVER_SOURCES})
diff --git a/paddle/pserver/ParameterServer2Main.cpp b/paddle/pserver/ParameterServer2Main.cpp
index ffc521f2c143d..845a2c27e242c 100644
--- a/paddle/pserver/ParameterServer2Main.cpp
+++ b/paddle/pserver/ParameterServer2Main.cpp
@@ -13,66 +13,17 @@ See the License for the specific language governing permissions and
 limitations under the License. */
 
 #include
-#include "paddle/utils/StringUtil.h"
-#include "paddle/utils/Util.h"
-
-#include "ParameterServer2.h"
-#include "RDMANetwork.h"
-#include "paddle/utils/Flags.h"
+#include "ParameterServerController.h"
 
 using namespace paddle;  // NOLINT
 
 int main(int argc, char** argv) {
   initMain(argc, argv);
 
-  std::vector<std::string> devices;
-  std::vector<std::unique_ptr<ParameterServer2>> pservers;
-
-  // round robin to loadbalance RDMA server ENGINE
-  int rdmaCpu = 0;
-  int onlineCpus = rdma::numCpus();
-  int numPorts = FLAGS_ports_num + FLAGS_ports_num_for_sparse;
-  if (FLAGS_nics.empty()) {
-    pservers.resize(numPorts);
-    for (int i = 0; i < numPorts; ++i) {
-      if (FLAGS_rdma_tcp == "rdma") {
-        pservers[i].reset(
-            new ParameterServer2(std::string(), FLAGS_port + i, rdmaCpu++));
-        rdmaCpu = rdmaCpu % onlineCpus;
-      } else {
-        pservers[i].reset(new ParameterServer2(std::string(), FLAGS_port + i));
-      }
-      CHECK(pservers[i]->init()) << "Fail to initialize parameter server"
-                                 << FLAGS_port + i;
-      LOG(INFO) << "pserver started : " << FLAGS_port + i;
-      pservers[i]->start();
-    }
-  } else {
-    str::split(FLAGS_nics, ',', &devices);
-    pservers.resize(devices.size() * numPorts);
-    for (int i = 0; i < numPorts; ++i) {
-      for (size_t j = 0; j < devices.size(); ++j) {
-        if (FLAGS_rdma_tcp == "rdma") {
-          pservers[i * devices.size() + j].reset(new ParameterServer2(
-              getIpAddr(devices[j]), FLAGS_port + i, rdmaCpu++));
-          rdmaCpu = rdmaCpu % onlineCpus;
-        } else {
-          pservers[i * devices.size() + j].reset(
-              new ParameterServer2(getIpAddr(devices[j]), FLAGS_port + i));
-        }
-        CHECK(pservers[i * devices.size() + j]->init())
-            << "Fail to initialize parameter server" << devices[j]
-            << FLAGS_port + i;
-        LOG(INFO) << "pserver started : " << devices[j] << ":"
-                  << FLAGS_port + i;
-        pservers[i * devices.size() + j]->start();
-      }
-    }
-  }
-
-  for (auto& pserver : pservers) {
-    pserver->join();
-  }
+  std::unique_ptr<ParameterServerController> parameterServerPtr(
+      paddle::ParameterServerController::createFromGflags());
+  parameterServerPtr->start();
+  parameterServerPtr->wait();
 
   return 0;
 }
diff --git a/paddle/pserver/ParameterServerController.cpp b/paddle/pserver/ParameterServerController.cpp
new file mode 100644
index 0000000000000..1d11a2e1acbc0
--- /dev/null
+++ b/paddle/pserver/ParameterServerController.cpp
@@ -0,0 +1,102 @@
+/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#include "ParameterServerController.h"
+
+namespace paddle {
+
+ParameterServerController::ParameterServerController(
+    const ParameterServerConfig& config) {
+  // round robin to load balance RDMA server ENGINE
+  std::vector<std::string> devices;
+  int rdmaCpu = 0;
+  int onlineCpus = rdma::numCpus();
+  int numPorts = config.ports_num() + config.ports_num_for_sparse();
+
+  if (config.nics().empty()) {
+    parameterServers_.resize(numPorts);
+    for (int i = 0; i < numPorts; ++i) {
+      if (config.rdma_tcp() == "rdma") {
+        parameterServers_[i].reset(
+            new ParameterServer2(std::string(), config.port() + i, rdmaCpu++));
+        rdmaCpu = rdmaCpu % onlineCpus;
+      } else {
+        parameterServers_[i].reset(
+            new ParameterServer2(std::string(), config.port() + i));
+      }
+      CHECK(parameterServers_[i]->init()) << "Fail to initialize parameter "
+                                             "server on port "
+                                          << config.port() + i;
+    }
+  } else {
+    str::split(config.nics(), ',', &devices);
+    parameterServers_.resize(devices.size() * numPorts);
+    for (int i = 0; i < numPorts; ++i) {
+      for (size_t j = 0; j < devices.size(); ++j) {
+        if (config.rdma_tcp() == "rdma") {
+          parameterServers_[i * devices.size() + j].reset(new ParameterServer2(
+              getIpAddr(devices[j]), config.port() + i, rdmaCpu++));
+          rdmaCpu = rdmaCpu % onlineCpus;
+        } else {
+          parameterServers_[i * devices.size() + j].reset(
+              new ParameterServer2(getIpAddr(devices[j]), config.port() + i));
+        }
+        CHECK(parameterServers_[i * devices.size() + j]->init())
+            << "Fail to initialize parameter server with device " << devices[j]
+            << ":" << config.port() + i;
+      }
+    }
+  }
+}
+
+ParameterServerController::~ParameterServerController() { this->wait(); }
+
+ParameterServerController* ParameterServerController::createFromGflags() {
+  ParameterServerConfig config;
+
+  config.set_nics(FLAGS_nics);
+  config.set_rdma_tcp(FLAGS_rdma_tcp);
+  config.set_port(FLAGS_port);
+  config.set_ports_num(FLAGS_ports_num);
+  config.set_ports_num_for_sparse(FLAGS_ports_num_for_sparse);
+
+  return create(config);
+}
+
+ParameterServerController* ParameterServerController::create(
+    const ParameterServerConfig& config) {
+  return new ParameterServerController(config);
+}
+
+void ParameterServerController::start() {
+  LOG(INFO) << "number of parameterServer instances: "
+            << parameterServers_.size();
+  int i = 0;
+  for (const auto& parameterServer : parameterServers_) {
+    LOG(INFO) << "Starting parameterServer[" << i << "]";
+    parameterServer->start();
+    i++;
+  }
+}
+
+void ParameterServerController::wait() {
+  int i = 0;
+  for (const auto& parameterServer : parameterServers_) {
+    LOG(INFO) << "Waiting parameterServer[" << i << "]";
+    parameterServer->join();
+    i++;
+  }
+}
+
+}  // namespace paddle
diff --git a/paddle/pserver/ParameterServerController.h b/paddle/pserver/ParameterServerController.h
new file mode 100644
index 0000000000000..fe9bb0b4d0233
--- /dev/null
+++ b/paddle/pserver/ParameterServerController.h
@@ -0,0 +1,74 @@
+/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#pragma once
+
+#include "ParameterServer2.h"
+#include "ParameterServerConfig.pb.h"
+#include "RDMANetwork.h"
+#include "paddle/utils/StringUtil.h"
+
+namespace paddle {
+
+/**
+ * @brief ParameterServerController creates, initializes and manages multiple
+ * ParameterServer2 instances. The number of instances is determined by the
+ * number of ports (ports_num + ports_num_for_sparse) and by the network
+ * devices (nics) configured through gflags or ParameterServerConfig.
+ */
+class ParameterServerController final {
+public:
+  DISABLE_COPY(ParameterServerController);
+
+  /**
+   * @brief Ctor. Create a ParameterServerController from ParameterServerConfig.
+   */
+  explicit ParameterServerController(const ParameterServerConfig& config);
+
+  /**
+   * @brief Dtor. Calls wait() to join all ParameterServer2 instances.
+   */
+  ~ParameterServerController();
+
+  /**
+   * @brief Create a ParameterServerController from gflags. Kept for
+   * compatibility with the old gflags-based configuration.
+   */
+  static ParameterServerController* createFromGflags();
+
+  /**
+   * @brief Create a ParameterServerController from a ParameterServerConfig,
+   * without reading gflags. All ParameterServer2 instances are constructed and
+   * initialized according to the config. The caller owns the returned
+   * pointer.
+   */
+  static ParameterServerController* create(const ParameterServerConfig& config);
+
+  /**
+   * @brief Start all ParameterServer2 instances in this
+   * ParameterServerController.
+   */
+  void start();
+
+  /**
+   * @brief Call join() on every ParameterServer2 instance in this
+   * ParameterServerController.
+   */
+  void wait();
+
+private:
+  std::vector<std::unique_ptr<ParameterServer2>> parameterServers_;
+};
+
+}  // namespace paddle
diff --git a/paddle/trainer/TrainerMain.cpp b/paddle/trainer/TrainerMain.cpp
index e2fbd21e14afa..c5c1d484e5f85 100644
--- a/paddle/trainer/TrainerMain.cpp
+++ b/paddle/trainer/TrainerMain.cpp
@@ -12,14 +12,12 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License. */
 
-#include "paddle/pserver/ParameterServer2.h"
-#include "paddle/utils/Common.h"
+#include
+#include "paddle/pserver/ParameterServerController.h"
 #include "paddle/utils/PythonUtil.h"
-#include "paddle/utils/StringUtil.h"
 
 #include "ParamUtil.h"
 #include "Trainer.h"
-#include "paddle/pserver/RDMANetwork.h"
 
 DEFINE_bool(start_pserver, false, "Whether to start pserver");
 DECLARE_int32(gpu_id);
@@ -38,54 +36,11 @@ int main(int argc, char** argv) {
   initMain(argc, argv);
   initPython(argc, argv);
 
-  std::vector<std::unique_ptr<ParameterServer2>> pservers;
-  std::vector<std::string> devices;
-
+  std::unique_ptr<ParameterServerController> parameterServerPtr(nullptr);
   if (FLAGS_start_pserver) {
-    // round robin to loadbalance RDMA server ENGINE
-    int rdmaCpu = 0;
-    int onlineCpus = rdma::numCpus();
-    int numPorts = FLAGS_ports_num + FLAGS_ports_num_for_sparse;
-    if (FLAGS_nics.empty()) {
-      pservers.resize(numPorts);
-      for (int i = 0; i < numPorts; ++i) {
-        if (FLAGS_rdma_tcp == "rdma") {
-          pservers[i].reset(
-              new ParameterServer2(std::string(), FLAGS_port + i, rdmaCpu++));
-          rdmaCpu = rdmaCpu % onlineCpus;
-        } else {
-          pservers[i].reset(
-              new ParameterServer2(std::string(), FLAGS_port + i));
-        }
-
-        CHECK(pservers[i]->init()) << "Fail to initialize parameter server"
-                                   << FLAGS_port + i;
-        LOG(INFO) << "pserver started : " << FLAGS_port + i;
-        pservers[i]->start();
-      }
-    } else {
-      str::split(FLAGS_nics, ',', &devices);
-      pservers.resize(devices.size() * numPorts);
-      for (int i = 0; i < numPorts; ++i) {
-        for (size_t j = 0; j < devices.size(); ++j) {
-          if (FLAGS_rdma_tcp == "rdma") {
-            pservers[i * devices.size() + j].reset(new ParameterServer2(
-                getIpAddr(devices[j]), FLAGS_port + i, rdmaCpu++));
-            rdmaCpu = rdmaCpu % onlineCpus;
-          } else {
-            pservers[i * devices.size() + j].reset(
-                new ParameterServer2(getIpAddr(devices[j]), FLAGS_port + i));
-          }
-
-          CHECK(pservers[i * devices.size() + j]->init())
-              << "Fail to initialize parameter server" << devices[j]
-              << FLAGS_port + i;
-          LOG(INFO) << "pserver started : " << devices[j] << ":"
-                    << FLAGS_port + i;
-          pservers[i * devices.size() + j]->start();
-        }
-      }
-    }
+    parameterServerPtr.reset(
+        paddle::ParameterServerController::createFromGflags());
+    parameterServerPtr->start();
   }
   Trainer trainer;
   auto config = TrainerConfigHelper::createFromFlags();
diff --git a/proto/CMakeLists.txt b/proto/CMakeLists.txt
index e854b2b427e55..62d5b9e38b21e 100644
--- a/proto/CMakeLists.txt
+++ b/proto/CMakeLists.txt
@@ -4,7 +4,8 @@ set(proto_filenames
     ModelConfig.proto
     ParameterConfig.proto
     ParameterService.proto
-    TrainerConfig.proto)
+    TrainerConfig.proto
+    ParameterServerConfig.proto)
 
 set(PROTO_GEN)
 set(PROTO_GEN_PY)
diff --git a/proto/ParameterServerConfig.proto b/proto/ParameterServerConfig.proto
new file mode 100644
index 0000000000000..3068bba8b10d8
--- /dev/null
+++ b/proto/ParameterServerConfig.proto
@@ -0,0 +1,50 @@
+/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+syntax = "proto2";
+
+package paddle;
+
+
+/**
+ * Configuration structure for ParameterClient2.
+ */
+message ParameterClientConfig {
+  required int32 trainer_id = 1;
+}
+
+/**
+ * Configuration structure for ParameterServer2.
+ */
+message ParameterServerConfig {
+  // The ports number for parameter send,
+  // increment based on default port number
+  optional int32 ports_num = 1 [default = 1];
+  // The ports number for parameter send,
+  // increment based on default (port + ports_num)
+  optional int32 ports_num_for_sparse = 2 [default = 0];
+  // network device name for pservers
+  optional string nics = 3 [default = "xgbe0,xgbe1"];
+  optional string rdma_tcp = 4 [default = "tcp"];
+  // Listening port for pserver
+  optional int32 port = 5 [default = 20134];
+  // number of gradient servers
+  optional int32 num_gradient_servers = 6 [default = 1];
+  // number of threads for sync op exec
+  optional int32 pserver_num_threads = 7 [default = 1];
+  // control config_.async_lagged_grad_discard_ratio() min value
+  optional double async_lagged_ratio_min = 8 [default = 1.0];
+  // if async_lagged_grad_discard_ratio is not set in trainer_config.conf
+  // use it as default value
+  optional double async_lagged_ratio_default = 9 [default = 1.5];
+}