Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace the etcdctl proc call with etcd client. #1970

Merged
merged 2 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions docker/Dockerfile.vineyardd
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ RUN export arch="$PLATFORM" && \
curl -LO https://github.com/etcd-io/etcd/releases/download/v3.5.9/etcd-v3.5.9-linux-$arch.tar.gz && \
tar zxf etcd-v3.5.9-linux-$arch.tar.gz && \
mv /tmp/etcd-v3.5.9-linux-$arch/etcd /usr/bin/etcd && \
mv /tmp/etcd-v3.5.9-linux-$arch/etcdctl /usr/bin/etcdctl && \
curl -LO https://dl.k8s.io/release/v1.24.0/bin/linux/$arch/kubectl && \
chmod +x kubectl && \
mv /tmp/kubectl /usr/bin/kubectl
Expand Down Expand Up @@ -86,7 +85,6 @@ SHELL ["/bin/bash", "-c"]
COPY --from=builder /usr/bin/bash-linux /bin/bash
COPY --from=builder /usr/bin/dumb-init /usr/bin/dumb-init
COPY --from=builder /usr/bin/etcd /usr/bin/etcd
COPY --from=builder /usr/bin/etcdctl /usr/bin/etcdctl
COPY --from=builder /usr/bin/kubectl /usr/bin/kubectl
COPY --from=builder /work/v6d/build/bin/vineyardd /usr/local/bin/vineyardd
RUN ln -s /busybox/env /usr/bin/env
Expand Down
8 changes: 6 additions & 2 deletions k8s/test/e2e/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ load-vineyardd-image:
@docker push localhost:5001/vineyardd:latest
.PHONY: load-vineyardd-image

load-vineyard-python-dev-image:
@docker tag ghcr.io/v6d-io/v6d/vineyard-python-dev:latest localhost:5001/vineyard-python-dev:latest
@docker push localhost:5001/vineyard-python-dev:latest

load-vineyard-operator-image:
@docker tag vineyardcloudnative/vineyard-operator:latest localhost:5001/vineyard-operator:latest
@docker push localhost:5001/vineyard-operator:latest
Expand Down Expand Up @@ -248,13 +252,13 @@ e2e-tests-failover: prepare-e2e-test install-vineyard-cluster

############# etcd failover testing #############################################

e2e-tests-three-etcd-nodes-failover: prepare-e2e-test build-local-cluster load-vineyardd-image
e2e-tests-three-etcd-nodes-failover: prepare-e2e-test build-local-cluster load-vineyardd-image load-vineyard-python-dev-image
@echo "Running three etcd nodes failover e2e test..."
@cd ${ROOT_DIR} && ${GOBIN}/e2e run --config=${E2E_DIR}/etcd-failover/three-etcd-nodes-failover-e2e.yaml
@echo "three etcd nodes failover e2e test passed."
@make delete-local-cluster

e2e-tests-five-etcd-nodes-failover: prepare-e2e-test build-local-cluster load-vineyardd-image
e2e-tests-five-etcd-nodes-failover: prepare-e2e-test build-local-cluster load-vineyardd-image load-vineyard-python-dev-image
@echo "Running five etcd nodes failover e2e test..."
@cd ${ROOT_DIR} && ${GOBIN}/e2e run --config=${E2E_DIR}/etcd-failover/five-etcd-nodes-failover-e2e.yaml
@echo "five etcd nodes failover e2e test passed."
Expand Down
4 changes: 2 additions & 2 deletions k8s/test/e2e/etcd-failover/consumer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ spec:
restartPolicy: Never
containers:
- name: consumer
image: python:3.10
image: localhost:5001/vineyard-python-dev:latest
imagePullPolicy: IfNotPresent
command:
- bash
- -c
- |
pip install vineyard numpy pandas --index-url https://pypi.tuna.tsinghua.edu.cn/simple;
cat << EOF >> consumer.py
import vineyard
client = vineyard.connect(host="vineyardd-svc.default.svc.cluster.local",port=9600)
Expand Down
3 changes: 2 additions & 1 deletion k8s/test/e2e/etcd-failover/five-etcd-nodes-failover-e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ setup:
done
kubectl delete pod "vineyardd-$num1" -n default --force
kubectl delete pod "vineyardd-$num2" -n default --force
kubectl rollout status statefulset/vineyardd
# wait for the instance quit messages to be propagated
sleep 240
sleep 360
kubectl rollout status statefulset/vineyardd
done
- name: install consumer
Expand Down
6 changes: 3 additions & 3 deletions k8s/test/e2e/etcd-failover/producer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ spec:
restartPolicy: Never
containers:
- name: producer
image: python:3.10
image: localhost:5001/vineyard-python-dev:latest
imagePullPolicy: IfNotPresent
command:
- bash
- -c
- |
pip install vineyard numpy pandas --index-url https://pypi.tuna.tsinghua.edu.cn/simple;
cat << EOF >> producer.py
import vineyard
import numpy as np
Expand All @@ -42,4 +42,4 @@ spec:
client.put(data, persist=True, name="test_data");
client.close()
EOF
python producer.py;
python producer.py;
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ setup:
kubectl delete pod vineyardd-$(shuf -i 0-2 -n 1) -n default --force
kubectl rollout status statefulset/vineyardd
# wait for the instance quit messages to be propagated
sleep 60
sleep 120
kubectl rollout status statefulset/vineyardd
done
- name: install consumer
Expand Down
6 changes: 3 additions & 3 deletions src/server/services/etcd_meta_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,8 @@ Status EtcdMetaService::preStart(const bool create_new_instance) {
return etcd_launcher_->LaunchEtcdServer(etcd_, meta_sync_lock_);
}

Status EtcdMetaService::RemoveMember(const std::string member_id) {
auto status = etcd_launcher_->RemoveMember(member_id);
Status EtcdMetaService::RemoveMember(const uint64_t& member_id) {
auto status = etcd_launcher_->RemoveMember(etcd_, member_id);
if (!status.ok()) {
LOG(ERROR) << "Failed to remove member " << member_id
<< " from etcd: " << status.ToString();
Expand All @@ -449,7 +449,7 @@ Status EtcdMetaService::UpdateEndpoint() {
if (etcd_launcher_ == nullptr) {
return Status::Invalid("etcd launcher is not initialized");
}
return etcd_launcher_->UpdateEndpoint();
return etcd_launcher_->UpdateEndpoint(etcd_);
}

} // namespace vineyard
Expand Down
4 changes: 2 additions & 2 deletions src/server/services/etcd_meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,9 @@ class EtcdMetaService : public IMetaService {

void TryReleaseLock(std::string key, callback_t<bool>) override;

Status RemoveMember(std::string member_id);
Status RemoveMember(const uint64_t& member_id);

std::string GetMemberID() { return etcd_launcher_->GetMemberID(); }
const uint64_t GetMemberID() { return etcd_launcher_->GetMemberID(); }

Status UpdateEndpoint();

Expand Down
16 changes: 9 additions & 7 deletions src/server/services/meta_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -555,11 +555,12 @@ void IMetaService::registerToEtcd() {
self->meta_["my_nodename"] = nodename;

self->instances_list_.emplace(rank);
auto etcd_member_id = self->GetEtcdMemberID();
uint64_t etcd_member_id = self->GetEtcdMemberID();
std::string key = "/instances/" + self->server_ptr_->instance_name();
ops.emplace_back(op_t::Put(key + "/hostid", self_host_id));
if (etcd_member_id != "") {
ops.emplace_back(op_t::Put(key + "/member_id", etcd_member_id));
if (etcd_member_id != 0) {
ops.emplace_back(
op_t::Put(key + "/member_id", std::to_string(etcd_member_id)));
}
ops.emplace_back(op_t::Put(key + "/hostname", hostname));
ops.emplace_back(op_t::Put(key + "/nodename", nodename));
Expand Down Expand Up @@ -1218,7 +1219,8 @@ void IMetaService::instanceUpdate(const op_t& op, const bool from_remote) {
}
// reset the etcd client
VINEYARD_CHECK_OK(this->probe());
instance_to_member_id_[instance_id] = member_id;
uint64_t member_id_ = std::stoull(member_id);
instance_to_member_id_[instance_id] = member_id_;
} else if (op.op != op_t::op_type_t::kDel) {
if (from_remote) {
LOG(ERROR) << "Unknown op type: " << op.ToString();
Expand Down Expand Up @@ -1265,20 +1267,20 @@ Status IMetaService::daemonWatchHandler(
return callback_after_update(Status::OK(), rev);
}

Status IMetaService::RemoveEtcdMember(const std::string& member_id) {
Status IMetaService::RemoveEtcdMember(const uint64_t& member_id) {
return callIfEtcdMetaService(
[&member_id](std::shared_ptr<EtcdMetaService> etcd_meta_service) {
return etcd_meta_service->RemoveMember(member_id);
},
Status::OK());
}

std::string IMetaService::GetEtcdMemberID() {
const uint64_t IMetaService::GetEtcdMemberID() {
return callIfEtcdMetaService(
[](std::shared_ptr<EtcdMetaService> etcd_meta_service) {
return etcd_meta_service->GetMemberID();
},
std::string());
(uint64_t) 0);
}

Status IMetaService::UpdateEtcdEndpoint() {
Expand Down
6 changes: 3 additions & 3 deletions src/server/services/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,9 @@ class IMetaService : public std::enable_shared_from_this<IMetaService> {

virtual void TryReleaseLock(std::string key, callback_t<bool> callback) = 0;

Status RemoveEtcdMember(const std::string& member_id);
Status RemoveEtcdMember(const uint64_t& member_id);

std::string GetEtcdMemberID();
const uint64_t GetEtcdMemberID();

Status UpdateEtcdEndpoint();

Expand Down Expand Up @@ -262,7 +262,7 @@ class IMetaService : public std::enable_shared_from_this<IMetaService> {

std::unique_ptr<asio::steady_timer> heartbeat_timer_;
std::set<InstanceID> instances_list_;
std::map<InstanceID, std::string> instance_to_member_id_;
std::map<InstanceID, uint64_t> instance_to_member_id_;
int64_t target_latest_time_ = 0;
size_t timeout_count_ = 0;

Expand Down
71 changes: 23 additions & 48 deletions src/server/util/etcd_launcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,25 +114,6 @@ Status checkEtcdCmd(const std::string& etcd_cmd) {
return Status::OK();
}

Status checkEtcdctlCommand(const std::string& etcdctl_cmd) {
if (etcdctl_cmd.empty()) {
std::string error_message =
"Failed to find etcdctl binary, please specify its path using the "
"`--etcdctl_cmd` argument and try again.";
LOG(WARNING) << error_message;
return Status::EtcdError("Failed to find etcdctl binary");
}
if (!ghc::filesystem::exists(ghc::filesystem::path(etcdctl_cmd))) {
std::string error_message =
"The etcd binary '" + etcdctl_cmd +
"' does not exist, please specify the correct path using "
"the `--etcdctl_cmd` argument and try again.";
LOG(WARNING) << error_message;
return Status::EtcdError("The etcdctl binary does not exist");
}
return Status::OK();
}

EtcdLauncher::EtcdLauncher(const json& etcd_spec,
const uint32_t& rpc_socket_port,
const bool create_new_instance)
Expand Down Expand Up @@ -166,15 +147,6 @@ Status EtcdLauncher::LaunchEtcdServer(
return Status::OK();
}

// resolve etcdctl binary
std::string etcdctl_cmd = etcd_spec_.value("etcdctl_cmd", "");
if (etcdctl_cmd.empty()) {
etcdctl_cmd = lookupCommand(etcd_spec_, "etcdctl");
}
RETURN_ON_ERROR(checkEtcdctlCommand(etcdctl_cmd));
etcdctl_ = std::make_shared<Etcdctl>(etcdctl_cmd);
LOG(INFO) << "Found etcdctl at: " << etcdctl_cmd;

bool skip_launch_etcd = etcd_spec_.value("skip_launch_etcd", true);
bool etcd_cluster_existing = false;
// create_new_instance_ is a flag to indicate whether we should launch an etcd
Expand Down Expand Up @@ -267,25 +239,25 @@ Status EtcdLauncher::LaunchEtcdServer(
if (etcd_cluster_existing) {
std::string cluster_name;

std::vector<json> all_members = etcdctl_->listMembers(etcd_endpoint);
std::vector<json> members = etcdctl_->listHealthyMembers(all_members);
std::vector<json> all_members = listMembers(etcd_client);
std::vector<json> members = listHealthyMembers(all_members);
if (members.size() == 0) {
return Status::EtcdError("No healthy members found via etcdctl");
}

existing_members = etcdctl_->listMembersName(members);
existing_members = listMembersName(members);
new_member_name = generateMemberName(existing_members);
peer_urls = etcdctl_->listPeerURLs(members);
peer_urls = listPeerURLs(members);
if (peer_urls.size() == 0) {
return Status::EtcdError("No peer urls found via etcdctl");
}
std::vector<std::string> client_urls = etcdctl_->listClientURLs(members);
std::vector<std::string> client_urls = listClientURLs(members);
if (peer_urls.size() == 0) {
return Status::EtcdError("No client urls found via etcdctl");
}

endpoint = boost::algorithm::join(client_urls, ",");
if (!etcdctl_->addMember(new_member_name, peer_endpoint, endpoint).ok()) {
if (!addMember(etcd_client, peer_endpoint).ok()) {
return Status::EtcdError("Failed to add new member to the etcd cluster");
}

Expand Down Expand Up @@ -378,8 +350,7 @@ Status EtcdLauncher::LaunchEtcdServer(
retries < max_probe_retries) {
etcd_client.reset(new etcd::Client(etcd_endpoints_));
if (probeEtcdServer(etcd_client, sync_lock)) {
etcd_member_id_ =
etcdctl_->findMemberID(peer_endpoint, etcd_endpoints_);
etcd_member_id_ = findMemberID(etcd_client, peer_endpoint);
// reset the etcd watcher
break;
}
Expand All @@ -388,25 +359,28 @@ Status EtcdLauncher::LaunchEtcdServer(
}
if (!etcd_proc_) {
return handleEtcdFailure(
peer_endpoint,
etcd_client, peer_endpoint,
"Failed to wait until etcd ready: operation has been interrupted");
} else if (err) {
return handleEtcdFailure(
peer_endpoint, "Failed to wait until etcd ready: " + err.message());
etcd_client, peer_endpoint,
"Failed to wait until etcd ready: " + err.message());
} else if (retries >= max_probe_retries) {
return handleEtcdFailure(
peer_endpoint, "Etcd has been launched but failed to connect to it");
etcd_client, peer_endpoint,
"Etcd has been launched but failed to connect to it");
} else {
return Status::OK();
}
}
}

Status EtcdLauncher::handleEtcdFailure(const std::string& peer_urls,
const std::string& errMessage) {
auto member_id = etcdctl_->findMemberID(peer_urls, etcd_endpoints_);
RETURN_ON_ERROR(etcdctl_->removeMember(etcd_member_id_, etcd_endpoints_));
etcd_member_id_.clear();
Status EtcdLauncher::handleEtcdFailure(
std::unique_ptr<etcd::Client>& etcd_client, const std::string& peer_urls,
const std::string& errMessage) {
auto member_id = findMemberID(etcd_client, peer_urls);
RETURN_ON_ERROR(removeMember(etcd_client, member_id));
etcd_member_id_ = 0;
return Status::IOError(errMessage);
}

Expand Down Expand Up @@ -488,10 +462,11 @@ bool EtcdLauncher::probeEtcdServer(std::unique_ptr<etcd::Client>& etcd_client,
return etcd_client && response.is_ok();
}

Status EtcdLauncher::UpdateEndpoint() {
auto all_members = etcdctl_->listMembers(etcd_endpoints_);
auto members = etcdctl_->listHealthyMembers(all_members);
auto client_urls = etcdctl_->listClientURLs(members);
Status EtcdLauncher::UpdateEndpoint(
std::unique_ptr<etcd::Client>& etcd_client) {
auto all_members = listMembers(etcd_client);
auto members = listHealthyMembers(all_members);
auto client_urls = listClientURLs(members);
etcd_endpoints_ = boost::algorithm::join(client_urls, ",");
return Status::OK();
}
Expand Down
18 changes: 9 additions & 9 deletions src/server/util/etcd_launcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ limitations under the License.
#include "etcd/Client.hpp"

#include "common/util/status.h"
#include "server/util/etcdctl.h"
#include "server/util/etcd_member.h"

namespace vineyard {

Expand All @@ -48,21 +48,23 @@ class EtcdLauncher {
std::string const& key);

private:
Status handleEtcdFailure(const std::string& member_name,
Status handleEtcdFailure(std::unique_ptr<etcd::Client>& etcd_client,
const std::string& member_name,
const std::string& errMessage);

Status parseEndpoint();

std::string generateMemberName(
const std::vector<std::string>& existing_members_name);

std::string GetMemberID() { return etcd_member_id_; }
const uint64_t GetMemberID() { return etcd_member_id_; }

Status RemoveMember(const std::string member_id) {
return etcdctl_->removeMember(member_id, etcd_endpoints_);
Status RemoveMember(std::unique_ptr<etcd::Client>& etcd_client,
const uint64_t& member_id) {
return removeMember(etcd_client, member_id);
}

Status UpdateEndpoint();
Status UpdateEndpoint(std::unique_ptr<etcd::Client>& etcd_client);

Status initHostInfo();

Expand All @@ -75,11 +77,9 @@ class EtcdLauncher {
std::set<std::string> local_hostnames_;
std::set<std::string> local_ip_addresses_;

std::string etcd_member_id_;
uint64_t etcd_member_id_;
std::string etcd_endpoints_;

std::shared_ptr<Etcdctl> etcdctl_;

std::unique_ptr<boost::process::child> etcd_proc_;

friend class EtcdMetaService;
Expand Down
Loading
Loading