diff --git a/docker/Dockerfile.vineyardd b/docker/Dockerfile.vineyardd index ee584c00..d3b24d6c 100644 --- a/docker/Dockerfile.vineyardd +++ b/docker/Dockerfile.vineyardd @@ -29,7 +29,6 @@ RUN export arch="$PLATFORM" && \ curl -LO https://github.com/etcd-io/etcd/releases/download/v3.5.9/etcd-v3.5.9-linux-$arch.tar.gz && \ tar zxf etcd-v3.5.9-linux-$arch.tar.gz && \ mv /tmp/etcd-v3.5.9-linux-$arch/etcd /usr/bin/etcd && \ - mv /tmp/etcd-v3.5.9-linux-$arch/etcdctl /usr/bin/etcdctl && \ curl -LO https://dl.k8s.io/release/v1.24.0/bin/linux/$arch/kubectl && \ chmod +x kubectl && \ mv /tmp/kubectl /usr/bin/kubectl @@ -86,7 +85,6 @@ SHELL ["/bin/bash", "-c"] COPY --from=builder /usr/bin/bash-linux /bin/bash COPY --from=builder /usr/bin/dumb-init /usr/bin/dumb-init COPY --from=builder /usr/bin/etcd /usr/bin/etcd -COPY --from=builder /usr/bin/etcdctl /usr/bin/etcdctl COPY --from=builder /usr/bin/kubectl /usr/bin/kubectl COPY --from=builder /work/v6d/build/bin/vineyardd /usr/local/bin/vineyardd RUN ln -s /busybox/env /usr/bin/env diff --git a/k8s/test/e2e/Makefile b/k8s/test/e2e/Makefile index 2c09f554..2a289c30 100644 --- a/k8s/test/e2e/Makefile +++ b/k8s/test/e2e/Makefile @@ -29,6 +29,10 @@ load-vineyardd-image: @docker push localhost:5001/vineyardd:latest .PHONY: load-vineyardd-image +load-vineyard-python-dev-image: + @docker tag ghcr.io/v6d-io/v6d/vineyard-python-dev:latest localhost:5001/vineyard-python-dev:latest + @docker push localhost:5001/vineyard-python-dev:latest + load-vineyard-operator-image: @docker tag vineyardcloudnative/vineyard-operator:latest localhost:5001/vineyard-operator:latest @docker push localhost:5001/vineyard-operator:latest @@ -248,13 +252,13 @@ e2e-tests-failover: prepare-e2e-test install-vineyard-cluster ############# etcd failover testing ############################################# -e2e-tests-three-etcd-nodes-failover: prepare-e2e-test build-local-cluster load-vineyardd-image +e2e-tests-three-etcd-nodes-failover: prepare-e2e-test build-local-cluster load-vineyardd-image load-vineyard-python-dev-image @echo "Running three etcd nodes failover e2e test..." @cd ${ROOT_DIR} && ${GOBIN}/e2e run --config=${E2E_DIR}/etcd-failover/three-etcd-nodes-failover-e2e.yaml @echo "three etcd nodes failover e2e test passed." @make delete-local-cluster -e2e-tests-five-etcd-nodes-failover: prepare-e2e-test build-local-cluster load-vineyardd-image +e2e-tests-five-etcd-nodes-failover: prepare-e2e-test build-local-cluster load-vineyardd-image load-vineyard-python-dev-image @echo "Running five etcd nodes failover e2e test..." @cd ${ROOT_DIR} && ${GOBIN}/e2e run --config=${E2E_DIR}/etcd-failover/five-etcd-nodes-failover-e2e.yaml @echo "five etcd nodes failover e2e test passed." diff --git a/k8s/test/e2e/etcd-failover/consumer.yaml b/k8s/test/e2e/etcd-failover/consumer.yaml index 5f93fcb0..61f015b2 100644 --- a/k8s/test/e2e/etcd-failover/consumer.yaml +++ b/k8s/test/e2e/etcd-failover/consumer.yaml @@ -12,12 +12,12 @@ spec: restartPolicy: Never containers: - name: consumer - image: python:3.10 + image: localhost:5001/vineyard-python-dev:latest + imagePullPolicy: IfNotPresent command: - bash - -c - | - pip install vineyard numpy pandas --index-url https://pypi.tuna.tsinghua.edu.cn/simple; cat << EOF >> consumer.py import vineyard client = vineyard.connect(host="vineyardd-svc.default.svc.cluster.local",port=9600) diff --git a/k8s/test/e2e/etcd-failover/five-etcd-nodes-failover-e2e.yaml b/k8s/test/e2e/etcd-failover/five-etcd-nodes-failover-e2e.yaml index 676a8f59..ef4559be 100644 --- a/k8s/test/e2e/etcd-failover/five-etcd-nodes-failover-e2e.yaml +++ b/k8s/test/e2e/etcd-failover/five-etcd-nodes-failover-e2e.yaml @@ -36,8 +36,9 @@ setup: done kubectl delete pod "vineyardd-$num1" -n default --force kubectl delete pod "vineyardd-$num2" -n default --force + kubectl rollout status statefulset/vineyardd # wait for the instance quit messages to be propagated - sleep 240 + sleep 360 kubectl rollout status statefulset/vineyardd done - name: install consumer diff --git a/k8s/test/e2e/etcd-failover/producer.yaml b/k8s/test/e2e/etcd-failover/producer.yaml index 8dee9090..c3ba50b7 100644 --- a/k8s/test/e2e/etcd-failover/producer.yaml +++ b/k8s/test/e2e/etcd-failover/producer.yaml @@ -27,12 +27,12 @@ spec: restartPolicy: Never containers: - name: producer - image: python:3.10 + image: localhost:5001/vineyard-python-dev:latest + imagePullPolicy: IfNotPresent command: - bash - -c - | - pip install vineyard numpy pandas --index-url https://pypi.tuna.tsinghua.edu.cn/simple; cat << EOF >> producer.py import vineyard import numpy as np @@ -42,4 +42,4 @@ spec: client.put(data, persist=True, name="test_data"); client.close() EOF - python producer.py; \ No newline at end of file + python producer.py; diff --git a/k8s/test/e2e/etcd-failover/three-etcd-nodes-failover-e2e.yaml b/k8s/test/e2e/etcd-failover/three-etcd-nodes-failover-e2e.yaml index df67ada7..06ca4b4c 100644 --- a/k8s/test/e2e/etcd-failover/three-etcd-nodes-failover-e2e.yaml +++ b/k8s/test/e2e/etcd-failover/three-etcd-nodes-failover-e2e.yaml @@ -32,7 +32,7 @@ setup: kubectl delete pod vineyardd-$(shuf -i 0-2 -n 1) -n default --force kubectl rollout status statefulset/vineyardd # wait for the instance quit messages to be propagated - sleep 60 + sleep 120 kubectl rollout status statefulset/vineyardd done - name: install consumer diff --git a/src/server/services/etcd_meta_service.cc b/src/server/services/etcd_meta_service.cc index 1c7865cb..463928a6 100644 --- a/src/server/services/etcd_meta_service.cc +++ b/src/server/services/etcd_meta_service.cc @@ -435,8 +435,8 @@ Status EtcdMetaService::preStart(const bool create_new_instance) { return etcd_launcher_->LaunchEtcdServer(etcd_, meta_sync_lock_); } -Status EtcdMetaService::RemoveMember(const std::string member_id) { - auto status = etcd_launcher_->RemoveMember(member_id); +Status EtcdMetaService::RemoveMember(const uint64_t& member_id) { + auto status = etcd_launcher_->RemoveMember(etcd_, member_id); if (!status.ok()) { LOG(ERROR) << "Failed to remove member " << member_id << " from etcd: " << status.ToString(); @@ -449,7 +449,7 @@ Status EtcdMetaService::UpdateEndpoint() { if (etcd_launcher_ == nullptr) { return Status::Invalid("etcd launcher is not initialized"); } - return etcd_launcher_->UpdateEndpoint(); + return etcd_launcher_->UpdateEndpoint(etcd_); } } // namespace vineyard diff --git a/src/server/services/etcd_meta_service.h b/src/server/services/etcd_meta_service.h index 14b9500a..fb317502 100644 --- a/src/server/services/etcd_meta_service.h +++ b/src/server/services/etcd_meta_service.h @@ -131,9 +131,9 @@ class EtcdMetaService : public IMetaService { void TryReleaseLock(std::string key, callback_t) override; - Status RemoveMember(std::string member_id); + Status RemoveMember(const uint64_t& member_id); - std::string GetMemberID() { return etcd_launcher_->GetMemberID(); } + const uint64_t GetMemberID() { return etcd_launcher_->GetMemberID(); } Status UpdateEndpoint(); diff --git a/src/server/services/meta_service.cc b/src/server/services/meta_service.cc index 5c0cbfbe..bf02e76a 100644 --- a/src/server/services/meta_service.cc +++ b/src/server/services/meta_service.cc @@ -555,11 +555,12 @@ void IMetaService::registerToEtcd() { self->meta_["my_nodename"] = nodename; self->instances_list_.emplace(rank); - auto etcd_member_id = self->GetEtcdMemberID(); + uint64_t etcd_member_id = self->GetEtcdMemberID(); std::string key = "/instances/" + self->server_ptr_->instance_name(); ops.emplace_back(op_t::Put(key + "/hostid", self_host_id)); - if (etcd_member_id != "") { - ops.emplace_back(op_t::Put(key + "/member_id", etcd_member_id)); + if (etcd_member_id != 0) { + ops.emplace_back( + op_t::Put(key + "/member_id", std::to_string(etcd_member_id))); } ops.emplace_back(op_t::Put(key + "/hostname", hostname)); ops.emplace_back(op_t::Put(key + "/nodename", nodename)); @@ -1218,7 +1219,8 @@ void IMetaService::instanceUpdate(const op_t& op, const bool from_remote) { } // reset the etcd client VINEYARD_CHECK_OK(this->probe()); - instance_to_member_id_[instance_id] = member_id; + uint64_t member_id_ = std::stoull(member_id); + instance_to_member_id_[instance_id] = member_id_; } else if (op.op != op_t::op_type_t::kDel) { if (from_remote) { LOG(ERROR) << "Unknown op type: " << op.ToString(); @@ -1265,7 +1267,7 @@ Status IMetaService::daemonWatchHandler( return callback_after_update(Status::OK(), rev); } -Status IMetaService::RemoveEtcdMember(const std::string& member_id) { +Status IMetaService::RemoveEtcdMember(const uint64_t& member_id) { return callIfEtcdMetaService( [&member_id](std::shared_ptr etcd_meta_service) { return etcd_meta_service->RemoveMember(member_id); @@ -1273,12 +1275,12 @@ Status IMetaService::RemoveEtcdMember(const std::string& member_id) { Status::OK()); } -std::string IMetaService::GetEtcdMemberID() { +const uint64_t IMetaService::GetEtcdMemberID() { return callIfEtcdMetaService( [](std::shared_ptr etcd_meta_service) { return etcd_meta_service->GetMemberID(); }, - std::string()); + (uint64_t) 0); } Status IMetaService::UpdateEtcdEndpoint() { diff --git a/src/server/services/meta_service.h b/src/server/services/meta_service.h index 7dc21847..e8c8fb5b 100644 --- a/src/server/services/meta_service.h +++ b/src/server/services/meta_service.h @@ -149,9 +149,9 @@ class IMetaService : public std::enable_shared_from_this { virtual void TryReleaseLock(std::string key, callback_t callback) = 0; - Status RemoveEtcdMember(const std::string& member_id); + Status RemoveEtcdMember(const uint64_t& member_id); - std::string GetEtcdMemberID(); + const uint64_t GetEtcdMemberID(); Status UpdateEtcdEndpoint(); @@ -262,7 +262,7 @@ class IMetaService : public std::enable_shared_from_this { std::unique_ptr heartbeat_timer_; std::set instances_list_; - std::map instance_to_member_id_; + std::map instance_to_member_id_; int64_t target_latest_time_ = 0; size_t timeout_count_ = 0; diff --git a/src/server/util/etcd_launcher.cc b/src/server/util/etcd_launcher.cc index 34e450f9..400b22f4 100644 --- a/src/server/util/etcd_launcher.cc +++ b/src/server/util/etcd_launcher.cc @@ -114,25 +114,6 @@ Status checkEtcdCmd(const std::string& etcd_cmd) { return Status::OK(); } -Status checkEtcdctlCommand(const std::string& etcdctl_cmd) { - if (etcdctl_cmd.empty()) { - std::string error_message = - "Failed to find etcdctl binary, please specify its path using the " - "`--etcdctl_cmd` argument and try again."; - LOG(WARNING) << error_message; - return Status::EtcdError("Failed to find etcdctl binary"); - } - if (!ghc::filesystem::exists(ghc::filesystem::path(etcdctl_cmd))) { - std::string error_message = - "The etcd binary '" + etcdctl_cmd + - "' does not exist, please specify the correct path using " - "the `--etcdctl_cmd` argument and try again."; - LOG(WARNING) << error_message; - return Status::EtcdError("The etcdctl binary does not exist"); - } - return Status::OK(); -} - EtcdLauncher::EtcdLauncher(const json& etcd_spec, const uint32_t& rpc_socket_port, const bool create_new_instance) @@ -166,15 +147,6 @@ Status EtcdLauncher::LaunchEtcdServer( return Status::OK(); } - // resolve etcdctl binary - std::string etcdctl_cmd = etcd_spec_.value("etcdctl_cmd", ""); - if (etcdctl_cmd.empty()) { - etcdctl_cmd = lookupCommand(etcd_spec_, "etcdctl"); - } - RETURN_ON_ERROR(checkEtcdctlCommand(etcdctl_cmd)); - etcdctl_ = std::make_shared(etcdctl_cmd); - LOG(INFO) << "Found etcdctl at: " << etcdctl_cmd; - bool skip_launch_etcd = etcd_spec_.value("skip_launch_etcd", true); bool etcd_cluster_existing = false; // create_new_instance_ is a flag to indicate whether we should launch an etcd @@ -267,25 +239,25 @@ Status EtcdLauncher::LaunchEtcdServer( if (etcd_cluster_existing) { std::string cluster_name; - std::vector all_members = etcdctl_->listMembers(etcd_endpoint); - std::vector members = etcdctl_->listHealthyMembers(all_members); + std::vector all_members = listMembers(etcd_client); + std::vector members = listHealthyMembers(all_members); if (members.size() == 0) { return Status::EtcdError("No healthy members found via etcdctl"); } - existing_members = etcdctl_->listMembersName(members); + existing_members = listMembersName(members); new_member_name = generateMemberName(existing_members); - peer_urls = etcdctl_->listPeerURLs(members); + peer_urls = listPeerURLs(members); if (peer_urls.size() == 0) { return Status::EtcdError("No peer urls found via etcdctl"); } - std::vector client_urls = etcdctl_->listClientURLs(members); + std::vector client_urls = listClientURLs(members); if (peer_urls.size() == 0) { return Status::EtcdError("No client urls found via etcdctl"); } endpoint = boost::algorithm::join(client_urls, ","); - if (!etcdctl_->addMember(new_member_name, peer_endpoint, endpoint).ok()) { + if (!addMember(etcd_client, peer_endpoint).ok()) { return Status::EtcdError("Failed to add new member to the etcd cluster"); } @@ -378,8 +350,7 @@ Status EtcdLauncher::LaunchEtcdServer( retries < max_probe_retries) { etcd_client.reset(new etcd::Client(etcd_endpoints_)); if (probeEtcdServer(etcd_client, sync_lock)) { - etcd_member_id_ = - etcdctl_->findMemberID(peer_endpoint, etcd_endpoints_); + etcd_member_id_ = findMemberID(etcd_client, peer_endpoint); // reset the etcd watcher break; } @@ -388,25 +359,28 @@ Status EtcdLauncher::LaunchEtcdServer( } if (!etcd_proc_) { return handleEtcdFailure( - peer_endpoint, + etcd_client, peer_endpoint, "Failed to wait until etcd ready: operation has been interrupted"); } else if (err) { return handleEtcdFailure( - peer_endpoint, "Failed to wait until etcd ready: " + err.message()); + etcd_client, peer_endpoint, + "Failed to wait until etcd ready: " + err.message()); } else if (retries >= max_probe_retries) { return handleEtcdFailure( - peer_endpoint, "Etcd has been launched but failed to connect to it"); + etcd_client, peer_endpoint, + "Etcd has been launched but failed to connect to it"); } else { return Status::OK(); } } } -Status EtcdLauncher::handleEtcdFailure(const std::string& peer_urls, - const std::string& errMessage) { - auto member_id = etcdctl_->findMemberID(peer_urls, etcd_endpoints_); - RETURN_ON_ERROR(etcdctl_->removeMember(etcd_member_id_, etcd_endpoints_)); - etcd_member_id_.clear(); +Status EtcdLauncher::handleEtcdFailure( + std::unique_ptr& etcd_client, const std::string& peer_urls, + const std::string& errMessage) { + auto member_id = findMemberID(etcd_client, peer_urls); + RETURN_ON_ERROR(removeMember(etcd_client, member_id)); + etcd_member_id_ = 0; return Status::IOError(errMessage); } @@ -488,10 +462,11 @@ bool EtcdLauncher::probeEtcdServer(std::unique_ptr& etcd_client, return etcd_client && response.is_ok(); } -Status EtcdLauncher::UpdateEndpoint() { - auto all_members = etcdctl_->listMembers(etcd_endpoints_); - auto members = etcdctl_->listHealthyMembers(all_members); - auto client_urls = etcdctl_->listClientURLs(members); +Status EtcdLauncher::UpdateEndpoint( + std::unique_ptr& etcd_client) { + auto all_members = listMembers(etcd_client); + auto members = listHealthyMembers(all_members); + auto client_urls = listClientURLs(members); etcd_endpoints_ = boost::algorithm::join(client_urls, ","); return Status::OK(); } diff --git a/src/server/util/etcd_launcher.h b/src/server/util/etcd_launcher.h index e94730e8..edf021dd 100644 --- a/src/server/util/etcd_launcher.h +++ b/src/server/util/etcd_launcher.h @@ -29,7 +29,7 @@ limitations under the License. #include "etcd/Client.hpp" #include "common/util/status.h" -#include "server/util/etcdctl.h" +#include "server/util/etcd_member.h" namespace vineyard { @@ -48,7 +48,8 @@ class EtcdLauncher { std::string const& key); private: - Status handleEtcdFailure(const std::string& member_name, + Status handleEtcdFailure(std::unique_ptr& etcd_client, + const std::string& member_name, const std::string& errMessage); Status parseEndpoint(); @@ -56,13 +57,14 @@ class EtcdLauncher { std::string generateMemberName( const std::vector& existing_members_name); - std::string GetMemberID() { return etcd_member_id_; } + const uint64_t GetMemberID() { return etcd_member_id_; } - Status RemoveMember(const std::string member_id) { - return etcdctl_->removeMember(member_id, etcd_endpoints_); + Status RemoveMember(std::unique_ptr& etcd_client, + const uint64_t& member_id) { + return removeMember(etcd_client, member_id); } - Status UpdateEndpoint(); + Status UpdateEndpoint(std::unique_ptr& etcd_client); Status initHostInfo(); @@ -75,11 +77,9 @@ class EtcdLauncher { std::set local_hostnames_; std::set local_ip_addresses_; - std::string etcd_member_id_; + uint64_t etcd_member_id_; std::string etcd_endpoints_; - std::shared_ptr etcdctl_; - std::unique_ptr etcd_proc_; friend class EtcdMetaService; diff --git a/src/server/util/etcd_member.cc b/src/server/util/etcd_member.cc new file mode 100644 index 00000000..46b22f90 --- /dev/null +++ b/src/server/util/etcd_member.cc @@ -0,0 +1,205 @@ +/** Copyright 2020-2023 Alibaba Group Holding Limited. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include "server/util/etcd_member.h" + +#include +#include +#include +#include "etcd/Response.hpp" + +#if defined(BUILD_VINEYARDD_ETCD) + +#include "boost/process.hpp" // IWYU pragma: keep + +#include "common/util/logging.h" // IWYU pragma: keep +#include "common/util/status.h" + +namespace vineyard { + +json member_to_json(const etcdv3::Member& m) { + json j = json{}; + + if (m.get_id() != 0) { + j["ID"] = m.get_id(); + } + + if (!m.get_name().empty()) { + j["name"] = m.get_name(); + } + + if (!m.get_peerURLs().empty()) { + j["peerURLs"] = m.get_peerURLs(); + } + + if (!m.get_clientURLs().empty()) { + j["clientURLs"] = m.get_clientURLs(); + } + + j["isLearner"] = m.get_learner(); + return j; +} + +Status addMember(std::unique_ptr& etcd_client, + const std::string& peer_endpoint, bool is_learner, + int max_retries) { + int retries = 0; + while (retries < max_retries) { + etcd::Response res = + etcd_client->add_member(peer_endpoint, is_learner).get(); + if (!res.is_ok()) { + LOG(ERROR) << "Failed to add etcd member: " << res.error_message(); + retries += 1; + sleep(1); + continue; + } else { + return Status::OK(); + } + } + return Status::EtcdError("Failed to add etcd member after " + + std::to_string(max_retries) + " retries"); +} + +Status removeMember(std::unique_ptr& etcd_client, + const uint64_t& member_id, int max_retries) { + int retries = 0; + + auto members = listMembers(etcd_client); + bool member_exist = false; + for (const auto& member : members) { + if (member["ID"].get() == member_id) { + member_exist = true; + break; + } + } + if (!member_exist) { + LOG(INFO) << "The member id " << std::to_string(member_id) + << " has been removed"; + return Status::OK(); + } + + if (members.size() == 1) { + LOG(INFO) << "The last member can not be removed"; + return Status::OK(); + } + + while (retries < max_retries) { + etcd::Response res = etcd_client->remove_member(member_id).get(); + if (!res.is_ok()) { + LOG(ERROR) << "Failed to remove etcd member: " << res.error_message(); + retries += 1; + sleep(1); + continue; + } else { + return Status::OK(); + } + } + return Status::EtcdError("Failed to remove etcd member " + + std::to_string(member_id) + " after " + + std::to_string(max_retries) + " retries"); +} + +uint64_t findMemberID(std::unique_ptr& etcd_client, + const std::string& peer_urls) { + uint64_t member_id = 0; + + auto members = listMembers(etcd_client); + for (const auto& member : members) { + auto peers = member["peerURLs"]; + for (const auto& peer : peers) { + if (peer.get() == peer_urls) { + member_id = member["ID"].get(); + break; + } + } + } + LOG(INFO) << "Find member id: " << member_id << " for peer urls " + << peer_urls; + return member_id; +} + +std::vector listMembers(std::unique_ptr& etcd_client) { + std::vector members; + + etcd::Response res = etcd_client->list_member().get(); + if (!res.is_ok()) { + LOG(ERROR) << "Failed to list etcd members: " << res.error_message(); + return members; + } + + for (const auto& member : res.members()) { + json member_json = member_to_json(member); + members.emplace_back(member_json); + } + + return members; +} + +std::vector listHealthyMembers(const std::vector& members) { + std::vector healthy_members; + for (const auto& member : members) { + if (member.find("clientURLs") == member.end()) { + continue; + } + healthy_members.emplace_back(member); + } + return healthy_members; +} + +std::vector listPeerURLs(const std::vector& members) { + std::vector peerURLs; + + for (const auto& member : members) { + if (member.find("peerURLs") == member.end()) { + continue; + } + auto peers = member["peerURLs"]; + for (const auto& peer : peers) { + peerURLs.emplace_back(peer.get()); + } + } + return peerURLs; +} + +std::vector listClientURLs(const std::vector& members) { + std::vector clientURLs; + + for (const auto& member : members) { + if (member.find("clientURLs") == member.end()) { + continue; + } + auto clients = member["clientURLs"]; + for (const auto& client : clients) { + clientURLs.emplace_back(client.get()); + } + } + return clientURLs; +} + +std::vector listMembersName(const std::vector& members) { + std::vector members_name; + for (const auto& member : members) { + if (member.find("name") == member.end()) { + continue; + } + auto name = member["name"]; + members_name.emplace_back(name.get()); + } + return members_name; +} + +} // namespace vineyard + +#endif // BUILD_VINEYARDD_ETCD diff --git a/src/server/util/etcd_member.h b/src/server/util/etcd_member.h new file mode 100644 index 00000000..de01804c --- /dev/null +++ b/src/server/util/etcd_member.h @@ -0,0 +1,51 @@ +/** Copyright 2020-2023 Alibaba Group Holding Limited. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#ifndef SRC_SERVER_UTIL_ETCD_MEMBER_H_ +#define SRC_SERVER_UTIL_ETCD_MEMBER_H_ + +#include +#include +#include + +#include "etcd/Client.hpp" + +#include "common/util/status.h" + +namespace vineyard { + +Status removeMember(std::unique_ptr& etcd_client, + const uint64_t& member_id, int max_retries = 5); + +uint64_t findMemberID(std::unique_ptr& etcd_client, + const std::string& peer_urls); + +Status addMember(std::unique_ptr& etcd_client, + const std::string& peer_endpoint, bool is_learner = false, + int max_retries = 5); + +std::vector listMembers(std::unique_ptr& etcd_client); + +std::vector listHealthyMembers(const std::vector& members); + +std::vector listPeerURLs(const std::vector& members); + +std::vector listClientURLs(const std::vector& members); + +std::vector listMembersName(const std::vector& members); + +} // namespace vineyard + +#endif // SRC_SERVER_UTIL_ETCD_MEMBER_H_ diff --git a/src/server/util/etcdctl.cc b/src/server/util/etcdctl.cc deleted file mode 100644 index 2900f948..00000000 --- a/src/server/util/etcdctl.cc +++ /dev/null @@ -1,265 +0,0 @@ -/** Copyright 2020-2023 Alibaba Group Holding Limited. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -#include "server/util/etcdctl.h" - -#include -#include -#include - -#if defined(BUILD_VINEYARDD_ETCD) - -#include "boost/process.hpp" // IWYU pragma: keep - -#include "common/util/logging.h" // IWYU pragma: keep -#include "common/util/status.h" - -namespace vineyard { - -Status Etcdctl::addMember(const std::string& member_name, - const std::string& peer_endpoint, - const std::string& etcd_endpoints, int max_retries) { - int retries = 0; - while (retries < max_retries) { - std::error_code ec; - std::unique_ptr etcdctl_proc_ = - std::make_unique( - etcdctl_cmd_, "member", "add", member_name, - "--peer-urls=" + peer_endpoint, "--endpoints=" + etcd_endpoints, - "--command-timeout=30s", "--keepalive-timeout=30s", - "--dial-timeout=20s", boost::process::std_out > stdout, - boost::process::std_err > stderr, ec); - if (!etcdctl_proc_) { - LOG(ERROR) << "Failed to start etcdctl"; - return Status::EtcdError("Failed to start etcdctl"); - } - if (ec) { - LOG(ERROR) << "Failed to add etcd member: " << ec.message(); - return Status::EtcdError("Failed to add etcd member: " + ec.message()); - } - - // wait for the etcdctl to finish the add member operation - etcdctl_proc_->wait(); - int exit_code = etcdctl_proc_->exit_code(); - - if (exit_code != 0) { - LOG(ERROR) << "Failed to add etcd member: exit code: " << exit_code - << ", retries: " << retries << "/" << max_retries; - retries += 1; - sleep(1); - continue; - } else { - return Status::OK(); - } - } - return Status::EtcdError("Failed to add etcd member after " + - std::to_string(max_retries) + " retries"); -} - -Status Etcdctl::removeMember(const std::string& member_id, - const std::string& etcd_endpoints, - int max_retries) { - int retries = 0; - - auto members = listMembers(etcd_endpoints); - bool member_exist = false; - for (const auto& member : members) { - std::stringstream ss; - ss << std::hex << member["ID"].get(); - if (ss.str() == member_id) { - member_exist = true; - break; - } - } - if (!member_exist) { - LOG(INFO) << "The member id " << member_id << " has been removed"; - return Status::OK(); - } - - if (members.size() == 1) { - LOG(INFO) << "The last member can not be removed"; - return Status::OK(); - } - - while (retries < max_retries) { - std::error_code ec; - std::unique_ptr etcdctl_proc_ = - std::make_unique( - etcdctl_cmd_, "member", "remove", member_id, - "--endpoints=" + etcd_endpoints, boost::process::std_out > stdout, - boost::process::std_err > stderr, ec); - if (!etcdctl_proc_) { - LOG(ERROR) << "Failed to start etcdctl"; - return Status::EtcdError("Failed to start etcdctl"); - } - if (ec) { - LOG(ERROR) << "Failed to remove etcd member: " << ec.message(); - return Status::EtcdError("Failed to remove etcd member: " + ec.message()); - } - // wait for the etcdctl to finish the remove member operation - etcdctl_proc_->wait(); - int exit_code = etcdctl_proc_->exit_code(); - - if (exit_code != 0) { - LOG(ERROR) << "Failed to remove etcd member: exit code: " << exit_code - << ", retries: " << retries << "/" << max_retries; - retries += 1; - sleep(1); - continue; - } else { - return Status::OK(); - } - } - return Status::EtcdError("Failed to remove etcd member after " + - std::to_string(max_retries) + " retries"); -} - -std::string Etcdctl::findMemberID(const std::string& peer_urls, - const std::string& etcd_endpoints) { - std::string member_id = ""; - - auto members = listMembers(etcd_endpoints); - for (const auto& member : members) { - auto peers = member["peerURLs"]; - for (const auto& peer : peers) { - if (peer.get() == peer_urls) { - std::stringstream ss; - ss << std::hex << member["ID"].get(); - member_id = ss.str(); - break; - } - } - } - return member_id; -} - -bool Etcdctl::checkMemberStatus(const std::string& client_endpoint) { - std::error_code ec; - - std::unique_ptr etcdctl_proc_ = - std::make_unique( - etcdctl_cmd_, "endpoint", "status", "--endpoints=" + client_endpoint, - "--write-out=json", boost::process::std_out > stdout, - boost::process::std_err > stderr, ec); - - if (!etcdctl_proc_) { - LOG(ERROR) << "Failed to start etcdctl endpoint status"; - return false; - } - if (ec) { - LOG(ERROR) << "Failed to check the status of " << client_endpoint << ": " - << ec.message(); - return false; - } - - return true; -} - -std::vector Etcdctl::listMembers(const std::string& etcd_endpoints) { - std::vector members; - boost::process::ipstream output_stream; - std::error_code ec; - - std::unique_ptr etcdctl_proc_ = - std::make_unique( - etcdctl_cmd_, "member", "list", "--endpoints=" + etcd_endpoints, - "--write-out=json", boost::process::std_out > output_stream, - boost::process::std_err > stderr, ec); - - if (!etcdctl_proc_) { - LOG(ERROR) << "Failed to start etcdctl"; - return members; - } - if (ec) { - LOG(ERROR) << "Failed to list etcd members: " << ec.message(); - return members; - } - - std::stringstream buffer; - std::string line; - while (std::getline(output_stream, line)) { - buffer << line << '\n'; - } - - std::string output = buffer.str(); - auto result = json::parse(output); - for (const auto& member : result["members"]) { - members.emplace_back(member); - } - return members; -} - -std::vector Etcdctl::listHealthyMembers( - const std::vector& members) { - std::vector healthy_members; - for (const auto& member : members) { - if (member.find("clientURLs") == member.end()) { - continue; - } - if (checkMemberStatus(member["clientURLs"][0].get())) { - healthy_members.emplace_back(member); - } - } - return healthy_members; -} - -std::vector Etcdctl::listPeerURLs( - const std::vector& members) { - std::vector peerURLs; - - for (const auto& member : members) { - if (member.find("peerURLs") == member.end()) { - continue; - } - auto peers = member["peerURLs"]; - for (const auto& peer : peers) { - peerURLs.emplace_back(peer.get()); - } - } - return peerURLs; -} - -std::vector Etcdctl::listClientURLs( - const std::vector& members) { - std::vector clientURLs; - - for (const auto& member : members) { - if (member.find("clientURLs") == member.end()) { - continue; - } - auto clients = member["clientURLs"]; - for (const auto& client : clients) { - clientURLs.emplace_back(client.get()); - } - } - return clientURLs; -} - -std::vector Etcdctl::listMembersName( - const std::vector& members) { - std::vector members_name; - for (const auto& member : members) { - if (member.find("name") == member.end()) { - continue; - } - auto name = member["name"]; - members_name.emplace_back(name.get()); - } - return members_name; -} - -} // namespace vineyard - -#endif // BUILD_VINEYARDD_ETCD diff --git a/src/server/util/etcdctl.h b/src/server/util/etcdctl.h deleted file mode 100644 index b512b6fd..00000000 --- a/src/server/util/etcdctl.h +++ /dev/null @@ -1,59 +0,0 @@ -/** Copyright 2020-2023 Alibaba Group Holding Limited. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -#ifndef SRC_SERVER_UTIL_ETCDCTL_H_ -#define SRC_SERVER_UTIL_ETCDCTL_H_ - -#include -#include - -#include "common/util/status.h" - -namespace vineyard { - -class Etcdctl { - public: - explicit Etcdctl(const std::string& etcdctl_cmd) - : etcdctl_cmd_(etcdctl_cmd) {} - - Status removeMember(const std::string& member_id, - const std::string& etcd_endpoints, int max_retries = 5); - - std::string findMemberID(const std::string& peer_urls, - const std::string& etcd_endpoints); - - bool checkMemberStatus(const std::string& client_endpoint); - - Status addMember(const std::string& member_name, - const std::string& peer_endpoint, - const std::string& etcd_endpoints, int max_retries = 5); - - std::vector listMembers(const std::string& etcd_endpoints); - - std::vector listHealthyMembers(const std::vector& members); - - std::vector listPeerURLs(const std::vector& members); - - std::vector listClientURLs(const std::vector& members); - - std::vector listMembersName(const std::vector& members); - - private: - std::string etcdctl_cmd_; -}; - -} // namespace vineyard - -#endif // SRC_SERVER_UTIL_ETCDCTL_H_ diff --git a/src/server/util/spec_resolvers.cc b/src/server/util/spec_resolvers.cc index 5b0ddd5c..024042c8 100644 --- a/src/server/util/spec_resolvers.cc +++ b/src/server/util/spec_resolvers.cc @@ -51,7 +51,6 @@ DEFINE_bool(skip_launch_etcd, true, "Whether to skip launching etcd"); DEFINE_string(etcd_endpoint, "http://127.0.0.1:2379", "endpoint of etcd"); DEFINE_string(etcd_prefix, "vineyard", "metadata path prefix in etcd"); DEFINE_string(etcd_cmd, "", "path of etcd executable"); -DEFINE_string(etcdctl_cmd, "", "path of etcdctl executable"); DEFINE_string(etcd_data_dir, "", "path of etcd's data directory"); #endif @@ -149,7 +148,6 @@ json MetaStoreSpecResolver::resolve() const { spec["etcd_prefix"] = FLAGS_etcd_prefix; spec["etcd_endpoint"] = FLAGS_etcd_endpoint; spec["etcd_cmd"] = FLAGS_etcd_cmd; - spec["etcdctl_cmd"] = FLAGS_etcdctl_cmd; spec["etcd_data_dir"] = FLAGS_etcd_data_dir; #endif diff --git a/thirdparty/etcd-cpp-apiv3 b/thirdparty/etcd-cpp-apiv3 index c911c83c..ea56cee8 160000 --- a/thirdparty/etcd-cpp-apiv3 +++ b/thirdparty/etcd-cpp-apiv3 @@ -1 +1 @@ -Subproject commit c911c83c53ef49084d9b7be6ae3afe7b605f4100 +Subproject commit ea56cee80f441973a0149b57604e7a7874c61b65