Skip to content

Commit

Permalink
feat: add zk auth
Browse files Browse the repository at this point in the history
  • Loading branch information
dl239 committed Nov 7, 2023
1 parent 190992d commit 0437f75
Show file tree
Hide file tree
Showing 23 changed files with 91 additions and 29 deletions.
2 changes: 2 additions & 0 deletions release/conf/apiserver.flags.template
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
--role=apiserver
--zk_cluster=127.0.0.1:2181
--zk_root_path=/openmldb
#--zk_auth_schema=digest
#--zk_cert=user:passwd

--openmldb_log_dir=./logs
--log_level=info
Expand Down
2 changes: 2 additions & 0 deletions release/conf/nameserver.flags.template
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
--role=nameserver
--zk_cluster=127.0.0.1:2181
--zk_root_path=/openmldb
#--zk_auth_schema=digest
#--zk_cert=user:passwd

--openmldb_log_dir=./logs
--log_level=info
Expand Down
2 changes: 2 additions & 0 deletions release/conf/tablet.flags.template
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

--zk_cluster=127.0.0.1:2181
--zk_root_path=/openmldb
#--zk_auth_schema=digest
#--zk_cert=user:passwd

# thread_pool_size建议和cpu核数一致
--thread_pool_size=24
Expand Down
6 changes: 5 additions & 1 deletion src/cmd/openmldb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ DECLARE_string(nameserver);
DECLARE_int32(port);
DECLARE_string(zk_cluster);
DECLARE_string(zk_root_path);
DECLARE_string(zk_auth_schema);
DECLARE_string(zk_cert);
DECLARE_int32(thread_pool_size);
DECLARE_int32(put_concurrency_limit);
DECLARE_int32(get_concurrency_limit);
Expand Down Expand Up @@ -3680,7 +3682,7 @@ void StartNsClient() {
std::shared_ptr<::openmldb::zk::ZkClient> zk_client;
if (!FLAGS_zk_cluster.empty()) {
zk_client = std::make_shared<::openmldb::zk::ZkClient>(FLAGS_zk_cluster, "",
FLAGS_zk_session_timeout, "", FLAGS_zk_root_path);
FLAGS_zk_session_timeout, "", FLAGS_zk_root_path, FLAGS_zk_auth_schema, FLAGS_zk_cert);
if (!zk_client->Init()) {
std::cout << "zk client init failed" << std::endl;
return;
Expand Down Expand Up @@ -3903,6 +3905,8 @@ void StartAPIServer() {
cluster_options.zk_cluster = FLAGS_zk_cluster;
cluster_options.zk_path = FLAGS_zk_root_path;
cluster_options.zk_session_timeout = FLAGS_zk_session_timeout;
cluster_options.zk_auth_schema = FLAGS_zk_auth_schema;
cluster_options.zk_cert = FLAGS_zk_cert;
if (!api_service->Init(cluster_options)) {
PDLOG(WARNING, "Fail to init");
exit(1);
Expand Down
4 changes: 4 additions & 0 deletions src/cmd/sql_cmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ DEFINE_string(spark_conf, "", "The config file of Spark job");
// cluster mode
DECLARE_string(zk_cluster);
DECLARE_string(zk_root_path);
DECLARE_string(zk_auth_schema);
DECLARE_string(zk_cert);
DECLARE_int32(zk_session_timeout);
DECLARE_uint32(zk_log_level);
DECLARE_string(zk_log_file);
Expand Down Expand Up @@ -267,6 +269,8 @@ bool InitClusterSDK() {
copt.zk_session_timeout = FLAGS_zk_session_timeout;
copt.zk_log_level = FLAGS_zk_log_level;
copt.zk_log_file = FLAGS_zk_log_file;
copt.zk_auth_schema = FLAGS_zk_auth_schema;
copt.zk_cert = FLAGS_zk_cert;

cs = new ::openmldb::sdk::ClusterSDK(copt);
if (!cs->Init()) {
Expand Down
5 changes: 4 additions & 1 deletion src/datacollector/data_collector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@

DECLARE_string(zk_cluster);
DECLARE_string(zk_root_path);
DECLARE_string(zk_auth_schema);
DECLARE_string(zk_cert);
DECLARE_int32(thread_pool_size);
DECLARE_int32(zk_session_timeout);
DECLARE_int32(zk_keep_alive_check_interval);
Expand Down Expand Up @@ -179,7 +181,8 @@ bool DataCollectorImpl::Init(const std::string& endpoint) {
}
bool DataCollectorImpl::Init(const std::string& zk_cluster, const std::string& zk_path, const std::string& endpoint) {
zk_client_ = std::make_shared<zk::ZkClient>(zk_cluster, FLAGS_zk_session_timeout, endpoint, zk_path,
zk_path + kDataCollectorRegisterPath);
zk_path + kDataCollectorRegisterPath,
FLAGS_zk_auth_schema, FLAGS_zk_cert);
if (!zk_client_->Init()) {
LOG(WARNING) << "fail to init zk client";
return false;
Expand Down
2 changes: 2 additions & 0 deletions src/flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ DEFINE_uint32(tablet_heartbeat_timeout, 5 * 60 * 1000, "config the heartbeat of
DEFINE_uint32(tablet_offline_check_interval, 1000, "config the check interval of tablet offline. unit is milliseconds");
DEFINE_string(zk_cluster, "", "config the zookeeper cluster eg ip:2181,ip2:2181,ip3:2181");
DEFINE_string(zk_root_path, "/openmldb", "config the root path of zookeeper");
DEFINE_string(zk_auth_schema, "digest", "config the id of authentication schema");
DEFINE_string(zk_cert, "", "config the application credentials");
DEFINE_string(tablet, "", "config the endpoint of tablet");
DEFINE_string(nameserver, "", "config the endpoint of nameserver");
DEFINE_int32(zk_keep_alive_check_interval, 15000, "config the interval of keep alive check. unit is milliseconds");
Expand Down
3 changes: 2 additions & 1 deletion src/nameserver/cluster_info.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ void ClusterInfo::UpdateNSClient(const std::vector<std::string>& children) {

int ClusterInfo::Init(std::string& msg) {
zk_client_ = std::make_shared<::openmldb::zk::ZkClient>(cluster_add_.zk_endpoints(), FLAGS_zk_session_timeout, "",
cluster_add_.zk_path(), cluster_add_.zk_path() + "/leader");
cluster_add_.zk_path(), cluster_add_.zk_path() + "/leader",
cluster_add_.zk_auth_schema(), cluster_add_.zk_cert());
bool ok = zk_client_->Init();
for (int i = 1; i < 3; i++) {
if (ok) {
Expand Down
2 changes: 0 additions & 2 deletions src/nameserver/name_server_create_remote_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ DECLARE_uint32(name_server_task_max_concurrency);
DECLARE_uint32(system_table_replica_num);
DECLARE_bool(auto_failover);

using ::openmldb::zk::ZkClient;

namespace openmldb {
namespace nameserver {

Expand Down
5 changes: 4 additions & 1 deletion src/nameserver/name_server_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
DECLARE_string(endpoint);
DECLARE_string(zk_cluster);
DECLARE_string(zk_root_path);
DECLARE_string(zk_auth_schema);
DECLARE_string(zk_cert);
DECLARE_string(tablet);
DECLARE_int32(zk_session_timeout);
DECLARE_int32(zk_keep_alive_check_interval);
Expand Down Expand Up @@ -1411,7 +1413,8 @@ bool NameServerImpl::Init(const std::string& zk_cluster, const std::string& zk_p
zone_info_.set_replica_alias("");
zone_info_.set_zone_term(1);
LOG(INFO) << "zone name " << zone_info_.zone_name();
zk_client_ = new ZkClient(zk_cluster, real_endpoint, FLAGS_zk_session_timeout, endpoint, zk_path);
zk_client_ = new ZkClient(zk_cluster, real_endpoint, FLAGS_zk_session_timeout, endpoint, zk_path,
FLAGS_zk_auth_schema, FLAGS_zk_cert);
if (!zk_client_->Init()) {
PDLOG(WARNING, "fail to init zookeeper with cluster[%s]", zk_cluster.c_str());
return false;
Expand Down
5 changes: 4 additions & 1 deletion src/nameserver/name_server_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ DECLARE_string(ssd_root_path);
DECLARE_string(hdd_root_path);
DECLARE_string(zk_cluster);
DECLARE_string(zk_root_path);
DECLARE_string(zk_auth_schema);
DECLARE_string(zk_cert);
DECLARE_int32(zk_session_timeout);
DECLARE_int32(request_timeout_ms);
DECLARE_int32(zk_keep_alive_check_interval);
Expand Down Expand Up @@ -171,7 +173,8 @@ TEST_P(NameServerImplTest, MakesnapshotTask) {

sleep(5);

ZkClient zk_client(FLAGS_zk_cluster, "", 1000, FLAGS_endpoint, FLAGS_zk_root_path);
ZkClient zk_client(FLAGS_zk_cluster, "", 1000, FLAGS_endpoint, FLAGS_zk_root_path,
FLAGS_zk_auth_schema, FLAGS_zk_cert);
ok = zk_client.Init();
ASSERT_TRUE(ok);
std::string op_index_node = FLAGS_zk_root_path + "/op/op_index";
Expand Down
5 changes: 4 additions & 1 deletion src/nameserver/new_server_env_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ DECLARE_string(endpoint);
DECLARE_string(db_root_path);
DECLARE_string(zk_cluster);
DECLARE_string(zk_root_path);
DECLARE_string(zk_auth_schema);
DECLARE_string(zk_cert);
DECLARE_int32(zk_session_timeout);
DECLARE_int32(request_timeout_ms);
DECLARE_int32(request_timeout_ms);
Expand Down Expand Up @@ -108,7 +110,8 @@ void SetSdkEndpoint(::openmldb::RpcClient<::openmldb::nameserver::NameServer_Stu

void ShowNameServer(std::map<std::string, std::string>* map) {
std::shared_ptr<::openmldb::zk::ZkClient> zk_client;
zk_client = std::make_shared<::openmldb::zk::ZkClient>(FLAGS_zk_cluster, "", 1000, "", FLAGS_zk_root_path);
zk_client = std::make_shared<::openmldb::zk::ZkClient>(FLAGS_zk_cluster, "", 1000, "", FLAGS_zk_root_path,
FLAGS_zk_auth_schema, FLAGS_zk_cert);
if (!zk_client->Init()) {
ASSERT_TRUE(false);
}
Expand Down
2 changes: 2 additions & 0 deletions src/proto/name_server.proto
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ message ClusterAddress {
optional string zk_endpoints = 1;
optional string zk_path = 2;
optional string alias = 3;
optional string zk_auth_schema = 4;
optional string zk_cert = 5;
}

message GeneralRequest {}
Expand Down
4 changes: 3 additions & 1 deletion src/sdk/db_sdk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,9 @@ void ClusterSDK::CheckZk() {
bool ClusterSDK::Init() {
zk_client_ = new ::openmldb::zk::ZkClient(options_.zk_cluster, "",
options_.zk_session_timeout, "",
options_.zk_path);
options_.zk_path,
options_.zk_auth_schema,
options_.zk_cert);

bool ok = zk_client_->Init(options_.zk_log_level, options_.zk_log_file);
if (!ok) {
Expand Down
5 changes: 4 additions & 1 deletion src/sdk/db_sdk.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,14 @@ struct ClusterOptions {
int32_t zk_session_timeout = 2000;
int32_t zk_log_level = 3;
std::string zk_log_file;
std::string zk_auth_schema;
std::string zk_cert;
// Renders the ZooKeeper connection options as a single human-readable line.
// The result covers the cluster address, root path, session timeout, log
// settings and the auth schema.
// zk_cert carries the credential, so it is never written out verbatim: a
// fixed mask is emitted when a cert is configured and nothing when it is empty.
std::string to_string() {
    std::stringstream ss;
    ss << "zk options [cluster:" << zk_cluster << ", path:" << zk_path
       << ", zk_session_timeout:" << zk_session_timeout
       << ", log_level:" << zk_log_level << ", log_file:" << zk_log_file
       << ", zk_auth_schema:" << zk_auth_schema
       << ", zk_cert:" << (zk_cert.empty() ? "" : "******") << "]";
    return ss.str();
}
};
Expand Down
2 changes: 2 additions & 0 deletions src/sdk/sql_cluster_router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,8 @@ bool SQLClusterRouter::Init() {
coptions.zk_session_timeout = ops->zk_session_timeout;
coptions.zk_log_level = ops->zk_log_level;
coptions.zk_log_file = ops->zk_log_file;
coptions.zk_auth_schema = ops->zk_auth_schema;
coptions.zk_cert = ops->zk_cert;
cluster_sdk_ = new ClusterSDK(coptions);
// TODO(hw): no detail error info
bool ok = cluster_sdk_->Init();
Expand Down
2 changes: 2 additions & 0 deletions src/sdk/sql_router.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ struct SQLRouterOptions : BasicRouterOptions {
std::string spark_conf_path;
uint32_t zk_log_level = 3; // PY/JAVA SDK default info log
std::string zk_log_file;
std::string zk_auth_schema;
std::string zk_cert;
};

struct StandaloneOptions : BasicRouterOptions {
Expand Down
5 changes: 4 additions & 1 deletion src/tablet/tablet_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ DECLARE_string(zk_cluster);
DECLARE_string(zk_root_path);
DECLARE_int32(zk_session_timeout);
DECLARE_int32(zk_keep_alive_check_interval);
DECLARE_string(zk_auth_schema);
DECLARE_string(zk_cert);

DECLARE_int32(binlog_sync_to_disk_interval);
DECLARE_int32(binlog_delete_interval);
Expand Down Expand Up @@ -194,7 +196,8 @@ bool TabletImpl::Init(const std::string& zk_cluster, const std::string& zk_path,
deploy_collector_ = std::make_unique<::openmldb::statistics::DeployQueryTimeCollector>();

if (!zk_cluster.empty()) {
zk_client_ = new ZkClient(zk_cluster, real_endpoint, FLAGS_zk_session_timeout, endpoint, zk_path);
zk_client_ = new ZkClient(zk_cluster, real_endpoint, FLAGS_zk_session_timeout, endpoint, zk_path,
FLAGS_zk_auth_schema, FLAGS_zk_cert);
bool ok = zk_client_->Init();
if (!ok) {
PDLOG(ERROR, "fail to init zookeeper with cluster %s", zk_cluster.c_str());
Expand Down
2 changes: 1 addition & 1 deletion src/tablet/tablet_impl_keep_alive_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ TEST_F(TabletImplTest, KeepAlive) {
FLAGS_endpoint = "127.0.0.1:9527";
FLAGS_zk_cluster = "127.0.0.1:6181";
FLAGS_zk_root_path = "/rtidb2";
ZkClient zk_client(FLAGS_zk_cluster, "", 1000, "test1", FLAGS_zk_root_path);
ZkClient zk_client(FLAGS_zk_cluster, "", 1000, "test1", FLAGS_zk_root_path, "", "");
bool ok = zk_client.Init();
ASSERT_TRUE(ok);
ok = zk_client.Mkdir("/rtidb2/nodes");
Expand Down
4 changes: 2 additions & 2 deletions src/zk/dist_lock_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ void OnLockedCallback() { call_invoked = true; }
void OnLostCallback() {}

TEST_F(DistLockTest, Lock) {
ZkClient client("127.0.0.1:6181", "", 10000, "127.0.0.1:9527", "/openmldb_lock");
ZkClient client("127.0.0.1:6181", "", 10000, "127.0.0.1:9527", "/openmldb_lock", "", "");
bool ok = client.Init();
ASSERT_TRUE(ok);
DistLock lock("/openmldb_lock/nameserver_lock", &client, boost::bind(&OnLockedCallback),
Expand All @@ -59,7 +59,7 @@ TEST_F(DistLockTest, Lock) {
lock.CurrentLockValue(current_lock);
ASSERT_EQ("endpoint1", current_lock);
call_invoked = false;
ZkClient client2("127.0.0.1:6181", "", 10000, "127.0.0.1:9527", "/openmldb_lock");
ZkClient client2("127.0.0.1:6181", "", 10000, "127.0.0.1:9527", "/openmldb_lock", "", "");
ok = client2.Init();
if (!ok) {
lock.Stop();
Expand Down
28 changes: 22 additions & 6 deletions src/zk/zk_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,15 @@ void ItemWatcher(zhandle_t* zh, int type, int state, const char* path, void* wat
}

ZkClient::ZkClient(const std::string& hosts, const std::string& real_endpoint, int32_t session_timeout,
const std::string& endpoint, const std::string& zk_root_path)
const std::string& endpoint, const std::string& zk_root_path,
const std::string& auth_schema, const std::string& cert)
: hosts_(hosts),
session_timeout_(session_timeout),
endpoint_(endpoint),
zk_root_path_(zk_root_path),
auth_schema_(auth_schema),
cert_(cert),
acl_vector_(ZOO_OPEN_ACL_UNSAFE),
real_endpoint_(real_endpoint),
nodes_root_path_(zk_root_path_ + "/nodes"),
nodes_watch_callbacks_(),
Expand All @@ -88,11 +92,15 @@ ZkClient::ZkClient(const std::string& hosts, const std::string& real_endpoint, i
}

ZkClient::ZkClient(const std::string& hosts, int32_t session_timeout, const std::string& endpoint,
const std::string& zk_root_path, const std::string& zone_path)
const std::string& zk_root_path, const std::string& zone_path,
const std::string& auth_schema, const std::string& cert)
: hosts_(hosts),
session_timeout_(session_timeout),
endpoint_(endpoint),
zk_root_path_(zk_root_path),
auth_schema_(auth_schema),
cert_(cert),
acl_vector_(ZOO_OPEN_ACL_UNSAFE),
nodes_root_path_(zone_path),
nodes_watch_callbacks_(),
mu_(),
Expand Down Expand Up @@ -133,6 +141,14 @@ bool ZkClient::Init(int log_level, const std::string& log_file) {
PDLOG(WARNING, "fail to init zk handler with hosts %s, session_timeout %d", hosts_.c_str(), session_timeout_);
return false;
}
// Authentication is optional: it is attempted only when a cert was supplied.
if (!cert_.empty()) {
    // NOTE(review): no completion callback is passed to zoo_add_auth, so
    // presumably only a failure to submit the request is detected here, not
    // a rejection by the server -- confirm against the ZooKeeper C API docs.
    if (zoo_add_auth(zk_, auth_schema_.c_str(), cert_.data(), cert_.length(), NULL, NULL) != ZOK) {
        // The cert is a secret (e.g. "user:passwd"); keep it out of the logs.
        PDLOG(WARNING, "auth failed. schema: %s", auth_schema_.c_str());
        return false;
    }
    // Switch from the open ACL set in the constructor to the creator-only ACL,
    // so the zoo_create calls that pass &acl_vector_ create restricted nodes.
    acl_vector_ = ZOO_CREATOR_ALL_ACL;
    PDLOG(INFO, "auth ok. schema: %s", auth_schema_.c_str());
}
return true;
}

Expand Down Expand Up @@ -173,7 +189,7 @@ bool ZkClient::Register(bool startup_flag) {
if (startup_flag) {
value = "startup_" + endpoint_;
}
int ret = zoo_create(zk_, node.c_str(), value.c_str(), value.size(), &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, NULL, 0);
int ret = zoo_create(zk_, node.c_str(), value.c_str(), value.size(), &acl_vector_, ZOO_EPHEMERAL, NULL, 0);
if (ret == ZOK) {
PDLOG(INFO, "register self with endpoint %s ok", endpoint_.c_str());
registed_.store(true, std::memory_order_relaxed);
Expand Down Expand Up @@ -231,7 +247,7 @@ bool ZkClient::RegisterName() {
}
PDLOG(WARNING, "set node with name %s value %s failed", sname.c_str(), value.c_str());
} else {
int ret = zoo_create(zk_, name.c_str(), value.c_str(), value.size(), &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
int ret = zoo_create(zk_, name.c_str(), value.c_str(), value.size(), &acl_vector_, 0, NULL, 0);
if (ret == ZOK) {
PDLOG(INFO, "register with name %s value %s ok", sname.c_str(), value.c_str());
return true;
Expand Down Expand Up @@ -281,7 +297,7 @@ bool ZkClient::CreateNode(const std::string& node, const std::string& value, int
uint32_t size = node.size() + 11;
char path_buffer[size]; // NOLINT
int ret =
zoo_create(zk_, node.c_str(), value.c_str(), value.size(), &ZOO_OPEN_ACL_UNSAFE, flags, path_buffer, size);
zoo_create(zk_, node.c_str(), value.c_str(), value.size(), &acl_vector_, flags, path_buffer, size);
if (ret == ZOK) {
assigned_path_name.assign(path_buffer, size - 1);
PDLOG(INFO, "create node %s ok and real node name %s", node.c_str(), assigned_path_name.c_str());
Expand Down Expand Up @@ -597,7 +613,7 @@ bool ZkClient::MkdirNoLock(const std::string& path) {
}
full_path += *it;
index++;
int ret = zoo_create(zk_, full_path.c_str(), "", 0, &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
int ret = zoo_create(zk_, full_path.c_str(), "", 0, &acl_vector_, 0, NULL, 0);
if (ret == ZNODEEXISTS || ret == ZOK) {
continue;
}
Expand Down
9 changes: 7 additions & 2 deletions src/zk/zk_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@ class ZkClient {
// session_timeout, the session timeout
// endpoint, the client endpoint
ZkClient(const std::string& hosts, const std::string& real_endpoint, int32_t session_timeout,
const std::string& endpoint, const std::string& zk_root_path);
const std::string& endpoint, const std::string& zk_root_path,
const std::string& auth_schema, const std::string& cert);

ZkClient(const std::string& hosts, int32_t session_timeout, const std::string& endpoint,
const std::string& zk_root_path, const std::string& zone_path);
const std::string& zk_root_path, const std::string& zone_path,
const std::string& auth_schema, const std::string& cert);
~ZkClient();

// init zookeeper connections
Expand Down Expand Up @@ -145,6 +147,9 @@ class ZkClient {
int32_t session_timeout_;
std::string endpoint_;
std::string zk_root_path_;
std::string auth_schema_;
std::string cert_;
struct ACL_vector acl_vector_;
std::string real_endpoint_;

FILE* zk_log_stream_file_ = NULL;
Expand Down
Loading

0 comments on commit 0437f75

Please sign in to comment.