Skip to content

Commit

Permalink
feat(query): add discovery nodes api (#16353)
Browse files Browse the repository at this point in the history
* feat(query): add discovery nodes api

* feat(query): add discovery nodes api

* feat(query): add discovery nodes api

* feat(query): add discovery nodes api

* feat(query): add discovery nodes api
  • Loading branch information
zhang2014 authored Aug 30, 2024
1 parent 23375d2 commit 5d3cbdd
Show file tree
Hide file tree
Showing 12 changed files with 107 additions and 18 deletions.
3 changes: 3 additions & 0 deletions src/meta/types/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub struct NodeInfo {
pub cpu_nums: u64,
pub version: u32,
pub flight_address: String,
pub discovery_address: String,
pub binary_version: String,
}

Expand All @@ -88,6 +89,7 @@ impl NodeInfo {
secret: String,
cpu_nums: u64,
flight_address: String,
discovery_address: String,
binary_version: String,
) -> NodeInfo {
NodeInfo {
Expand All @@ -96,6 +98,7 @@ impl NodeInfo {
cpu_nums,
version: 0,
flight_address,
discovery_address,
binary_version,
}
}
Expand Down
1 change: 1 addition & 0 deletions src/meta/types/tests/it/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ fn test_node_info_ip_port() -> anyhow::Result<()> {
cpu_nums: 1,
version: 1,
flight_address: "1.2.3.4:123".to_string(),
discovery_address: "4.5.6.7:456".to_string(),
binary_version: "v0.8-binary-version".to_string(),
};

Expand Down
5 changes: 5 additions & 0 deletions src/query/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1446,6 +1446,9 @@ pub struct QueryConfig {
#[clap(long, value_name = "VALUE", default_value = "127.0.0.1:9090")]
pub flight_api_address: String,

#[clap(long, value_name = "VALUE", default_value = "")]
pub discovery_address: String,

#[clap(long, value_name = "VALUE", default_value = "127.0.0.1:8080")]
pub admin_api_address: String,

Expand Down Expand Up @@ -1714,6 +1717,7 @@ impl TryInto<InnerQueryConfig> for QueryConfig {
http_handler_port: self.http_handler_port,
http_handler_result_timeout_secs: self.http_handler_result_timeout_secs,
flight_api_address: self.flight_api_address,
discovery_address: self.discovery_address,
flight_sql_handler_host: self.flight_sql_handler_host,
flight_sql_handler_port: self.flight_sql_handler_port,
admin_api_address: self.admin_api_address,
Expand Down Expand Up @@ -1805,6 +1809,7 @@ impl From<InnerQueryConfig> for QueryConfig {
flight_api_address: inner.flight_api_address,
flight_sql_handler_host: inner.flight_sql_handler_host,
flight_sql_handler_port: inner.flight_sql_handler_port,
discovery_address: inner.discovery_address,
admin_api_address: inner.admin_api_address,
metric_api_address: inner.metric_api_address,
http_handler_tls_server_cert: inner.http_handler_tls_server_cert,
Expand Down
2 changes: 2 additions & 0 deletions src/query/config/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ pub struct QueryConfig {
pub http_handler_port: u16,
pub http_handler_result_timeout_secs: u64,
pub flight_api_address: String,
pub discovery_address: String,
pub flight_sql_handler_host: String,
pub flight_sql_handler_port: u16,
pub admin_api_address: String,
Expand Down Expand Up @@ -267,6 +268,7 @@ impl Default for QueryConfig {
flight_api_address: "127.0.0.1:9090".to_string(),
flight_sql_handler_host: "127.0.0.1".to_string(),
flight_sql_handler_port: 8900,
discovery_address: "".to_string(),
admin_api_address: "127.0.0.1:8080".to_string(),
metric_api_address: "127.0.0.1:7070".to_string(),
api_tls_server_cert: "".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion src/query/management/src/cluster/cluster_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use databend_common_meta_types::Operation;

use crate::cluster::ClusterApi;

pub static CLUSTER_API_KEY_PREFIX: &str = "__fd_clusters_v2";
pub static CLUSTER_API_KEY_PREFIX: &str = "__fd_clusters_v3";

pub struct ClusterMgr {
metastore: MetaStore,
Expand Down
7 changes: 4 additions & 3 deletions src/query/management/tests/it/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ async fn test_successfully_add_node() -> Result<()> {
let node_info = create_test_node_info();
cluster_api.add_node(node_info.clone()).await?;
let value = kv_api
.get_kv("__fd_clusters_v2/test%2dtenant%2did/test%2dcluster%2did/databend_query/test_node")
.get_kv("__fd_clusters_v3/test%2dtenant%2did/test%2dcluster%2did/databend_query/test_node")
.await?;

match value {
Expand Down Expand Up @@ -122,7 +122,7 @@ async fn test_successfully_heartbeat_node() -> Result<()> {
cluster_api.add_node(node_info.clone()).await?;

let value = kv_api
.get_kv("__fd_clusters_v2/test%2dtenant%2did/test%2dcluster%2did/databend_query/test_node")
.get_kv("__fd_clusters_v3/test%2dtenant%2did/test%2dcluster%2did/databend_query/test_node")
.await?;

let meta = value.unwrap().meta.unwrap();
Expand All @@ -133,7 +133,7 @@ async fn test_successfully_heartbeat_node() -> Result<()> {
cluster_api.heartbeat(&node_info, MatchSeq::GE(1)).await?;

let value = kv_api
.get_kv("__fd_clusters_v2/test%2dtenant%2did/test%2dcluster%2did/databend_query/test_node")
.get_kv("__fd_clusters_v3/test%2dtenant%2did/test%2dcluster%2did/databend_query/test_node")
.await?;

assert!(value.unwrap().meta.unwrap().get_expire_at_ms().unwrap() - now_ms >= 59_000);
Expand All @@ -147,6 +147,7 @@ fn create_test_node_info() -> NodeInfo {
cpu_nums: 0,
version: 0,
flight_address: String::from("ip:port"),
discovery_address: "ip2:port".to_string(),
binary_version: "binary_version".to_string(),
}
}
Expand Down
41 changes: 27 additions & 14 deletions src/query/service/src/clusters/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,21 +336,33 @@ impl ClusterDiscovery {
pub async fn register_to_metastore(self: &Arc<Self>, cfg: &InnerConfig) -> Result<()> {
let cpus = cfg.query.num_cpus;
let mut address = cfg.query.flight_api_address.clone();
let mut discovery_address = match cfg.query.discovery_address.is_empty() {
true => format!(
"{}:{}",
cfg.query.http_handler_host, cfg.query.http_handler_port
),
false => cfg.query.discovery_address.clone(),
};

if let Ok(socket_addr) = SocketAddr::from_str(&address) {
let ip_addr = socket_addr.ip();
if ip_addr.is_loopback() || ip_addr.is_unspecified() {
if let Some(local_addr) = self.api_provider.get_local_addr().await? {
let local_socket_addr = SocketAddr::from_str(&local_addr)?;
let new_addr = format!("{}:{}", local_socket_addr.ip(), socket_addr.port());
warn!(
"Detected loopback or unspecified address as cluster flight endpoint. \
We rewrite it(\"{}\" -> \"{}\") for advertising to other nodes. \
If there are proxies between nodes, you can specify endpoint with --flight-api-address.",
address, new_addr
);

address = new_addr;
for (lookup_ip, typ) in [
(&mut address, "flight-api-address"),
(&mut discovery_address, "discovery-address"),
] {
if let Ok(socket_addr) = SocketAddr::from_str(lookup_ip) {
let ip_addr = socket_addr.ip();
if ip_addr.is_loopback() || ip_addr.is_unspecified() {
if let Some(local_addr) = self.api_provider.get_local_addr().await? {
let local_socket_addr = SocketAddr::from_str(&local_addr)?;
let new_addr = format!("{}:{}", local_socket_addr.ip(), socket_addr.port());
warn!(
"Detected loopback or unspecified address as {} endpoint. \
We rewrite it(\"{}\" -> \"{}\") for advertising to other nodes. \
If there are proxies between nodes, you can specify endpoint with --{}.",
typ, lookup_ip, new_addr, typ
);

*lookup_ip = new_addr;
}
}
}
}
Expand All @@ -360,6 +372,7 @@ impl ClusterDiscovery {
self.local_secret.clone(),
cpus,
address,
discovery_address,
DATABEND_COMMIT_VERSION.to_string(),
);

Expand Down
8 changes: 8 additions & 0 deletions src/query/service/src/servers/http/http_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use poem::EndpointExt;
use poem::IntoEndpoint;
use poem::Route;

use super::v1::discovery_nodes;
use super::v1::logout_handler;
use super::v1::upload_to_stage;
use crate::servers::http::middleware::json_response;
Expand Down Expand Up @@ -136,6 +137,13 @@ impl HttpHandler {
self.kind,
EndpointKind::StartQuery,
)),
)
.at(
"/discovery_nodes",
get(discovery_nodes).with(HTTPSessionMiddleware::create(
self.kind,
EndpointKind::StartQuery,
)),
);

let ep_clickhouse =
Expand Down
52 changes: 52 additions & 0 deletions src/query/service/src/servers/http/v1/discovery.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_config::GlobalConfig;
use poem::error::InternalServerError;
use poem::error::Result as PoemResult;
use poem::web::Json;
use poem::Request;

use crate::clusters::ClusterDiscovery;
use crate::clusters::ClusterHelper;
use crate::servers::http::v1::HttpQueryContext;

#[derive(serde::Serialize, serde::Deserialize, Debug)]
pub struct DiscoveryNode {
pub address: String,
}

#[poem::handler]
#[async_backtrace::framed]
pub async fn discovery_nodes(
_: &HttpQueryContext,
_req: &Request,
) -> PoemResult<Json<Vec<DiscoveryNode>>> {
let config = GlobalConfig::instance();
let cluster = ClusterDiscovery::instance()
.discover(&config)
.await
.map_err(InternalServerError)?;

let nodes = cluster.get_nodes();
let mut discovery_nodes = Vec::with_capacity(nodes.len());

for node in nodes {
discovery_nodes.push(DiscoveryNode {
address: node.discovery_address.clone(),
});
}

Ok(Json(discovery_nodes))
}
2 changes: 2 additions & 0 deletions src/query/service/src/servers/http/v1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod discovery;
mod http_query_handlers;
mod query;
mod session;
mod stage;
pub mod string_block;
mod suggestions;

pub use discovery::discovery_nodes;
pub use http_query_handlers::make_final_uri;
pub use http_query_handlers::make_page_uri;
pub use http_query_handlers::make_state_uri;
Expand Down
1 change: 1 addition & 0 deletions src/query/service/src/test_kits/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ impl ClusterDescriptor {
"".to_string(),
0,
addr.into(),
"".to_string(),
DATABEND_COMMIT_VERSION.to_string(),
)));
ClusterDescriptor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo
| 'query' | 'default_compression' | 'auto' | '' |
| 'query' | 'default_storage_format' | 'auto' | '' |
| 'query' | 'disable_system_table_load' | 'false' | '' |
| 'query' | 'discovery_address' | '' | '' |
| 'query' | 'enable_meta_data_upgrade_json_to_pb_from_v307' | 'false' | '' |
| 'query' | 'enable_udf_server' | 'false' | '' |
| 'query' | 'flight_api_address' | '127.0.0.1:9090' | '' |
Expand Down

0 comments on commit 5d3cbdd

Please sign in to comment.