diff --git a/src/meta/types/src/cluster.rs b/src/meta/types/src/cluster.rs index 30a3b6a4ce9a..3f2b1c5f3e5d 100644 --- a/src/meta/types/src/cluster.rs +++ b/src/meta/types/src/cluster.rs @@ -79,6 +79,7 @@ pub struct NodeInfo { pub cpu_nums: u64, pub version: u32, pub flight_address: String, + pub discovery_address: String, pub binary_version: String, } @@ -88,6 +89,7 @@ impl NodeInfo { secret: String, cpu_nums: u64, flight_address: String, + discovery_address: String, binary_version: String, ) -> NodeInfo { NodeInfo { @@ -96,6 +98,7 @@ impl NodeInfo { cpu_nums, version: 0, flight_address, + discovery_address, binary_version, } } diff --git a/src/meta/types/tests/it/cluster.rs b/src/meta/types/tests/it/cluster.rs index 879cd6e18d67..b9f13bf4d268 100644 --- a/src/meta/types/tests/it/cluster.rs +++ b/src/meta/types/tests/it/cluster.rs @@ -22,6 +22,7 @@ fn test_node_info_ip_port() -> anyhow::Result<()> { cpu_nums: 1, version: 1, flight_address: "1.2.3.4:123".to_string(), + discovery_address: "4.5.6.7:456".to_string(), binary_version: "v0.8-binary-version".to_string(), }; diff --git a/src/query/config/src/config.rs b/src/query/config/src/config.rs index 4b9fa1d725eb..2fb34057a65b 100644 --- a/src/query/config/src/config.rs +++ b/src/query/config/src/config.rs @@ -1446,6 +1446,9 @@ pub struct QueryConfig { #[clap(long, value_name = "VALUE", default_value = "127.0.0.1:9090")] pub flight_api_address: String, + #[clap(long, value_name = "VALUE", default_value = "")] + pub discovery_address: String, + #[clap(long, value_name = "VALUE", default_value = "127.0.0.1:8080")] pub admin_api_address: String, @@ -1714,6 +1717,7 @@ impl TryInto for QueryConfig { http_handler_port: self.http_handler_port, http_handler_result_timeout_secs: self.http_handler_result_timeout_secs, flight_api_address: self.flight_api_address, + discovery_address: self.discovery_address, flight_sql_handler_host: self.flight_sql_handler_host, flight_sql_handler_port: self.flight_sql_handler_port, admin_api_address: self.admin_api_address, @@ -1805,6 +1809,7 @@ impl From for QueryConfig { flight_api_address: inner.flight_api_address, flight_sql_handler_host: inner.flight_sql_handler_host, flight_sql_handler_port: inner.flight_sql_handler_port, + discovery_address: inner.discovery_address, admin_api_address: inner.admin_api_address, metric_api_address: inner.metric_api_address, http_handler_tls_server_cert: inner.http_handler_tls_server_cert, diff --git a/src/query/config/src/inner.rs b/src/query/config/src/inner.rs index b5f06c6aafde..cf6dc8fb847f 100644 --- a/src/query/config/src/inner.rs +++ b/src/query/config/src/inner.rs @@ -174,6 +174,7 @@ pub struct QueryConfig { pub http_handler_port: u16, pub http_handler_result_timeout_secs: u64, pub flight_api_address: String, + pub discovery_address: String, pub flight_sql_handler_host: String, pub flight_sql_handler_port: u16, pub admin_api_address: String, @@ -267,6 +268,7 @@ impl Default for QueryConfig { flight_api_address: "127.0.0.1:9090".to_string(), flight_sql_handler_host: "127.0.0.1".to_string(), flight_sql_handler_port: 8900, + discovery_address: "".to_string(), admin_api_address: "127.0.0.1:8080".to_string(), metric_api_address: "127.0.0.1:7070".to_string(), api_tls_server_cert: "".to_string(), diff --git a/src/query/management/src/cluster/cluster_mgr.rs b/src/query/management/src/cluster/cluster_mgr.rs index 3e2f5bb3a790..d9c7952096cd 100644 --- a/src/query/management/src/cluster/cluster_mgr.rs +++ b/src/query/management/src/cluster/cluster_mgr.rs @@ -30,7 +30,7 @@ use databend_common_meta_types::Operation; use crate::cluster::ClusterApi; -pub static CLUSTER_API_KEY_PREFIX: &str = "__fd_clusters_v2"; +pub static CLUSTER_API_KEY_PREFIX: &str = "__fd_clusters_v3"; pub struct ClusterMgr { metastore: MetaStore, diff --git a/src/query/management/tests/it/cluster.rs b/src/query/management/tests/it/cluster.rs index db43eaf45b8c..a7b8ac49712c 100644 --- a/src/query/management/tests/it/cluster.rs +++ b/src/query/management/tests/it/cluster.rs @@ -33,7 +33,7 @@ async fn test_successfully_add_node() -> Result<()> { let node_info = create_test_node_info(); cluster_api.add_node(node_info.clone()).await?; let value = kv_api - .get_kv("__fd_clusters_v2/test%2dtenant%2did/test%2dcluster%2did/databend_query/test_node") + .get_kv("__fd_clusters_v3/test%2dtenant%2did/test%2dcluster%2did/databend_query/test_node") .await?; match value { @@ -122,7 +122,7 @@ async fn test_successfully_heartbeat_node() -> Result<()> { cluster_api.add_node(node_info.clone()).await?; let value = kv_api - .get_kv("__fd_clusters_v2/test%2dtenant%2did/test%2dcluster%2did/databend_query/test_node") + .get_kv("__fd_clusters_v3/test%2dtenant%2did/test%2dcluster%2did/databend_query/test_node") .await?; let meta = value.unwrap().meta.unwrap(); @@ -133,7 +133,7 @@ async fn test_successfully_heartbeat_node() -> Result<()> { cluster_api.heartbeat(&node_info, MatchSeq::GE(1)).await?; let value = kv_api - .get_kv("__fd_clusters_v2/test%2dtenant%2did/test%2dcluster%2did/databend_query/test_node") + .get_kv("__fd_clusters_v3/test%2dtenant%2did/test%2dcluster%2did/databend_query/test_node") .await?; assert!(value.unwrap().meta.unwrap().get_expire_at_ms().unwrap() - now_ms >= 59_000); @@ -147,6 +147,7 @@ fn create_test_node_info() -> NodeInfo { cpu_nums: 0, version: 0, flight_address: String::from("ip:port"), + discovery_address: "ip2:port".to_string(), binary_version: "binary_version".to_string(), } } diff --git a/src/query/service/src/clusters/cluster.rs b/src/query/service/src/clusters/cluster.rs index 2d2309c61810..ba9349572d12 100644 --- a/src/query/service/src/clusters/cluster.rs +++ b/src/query/service/src/clusters/cluster.rs @@ -336,21 +336,33 @@ impl ClusterDiscovery { pub async fn register_to_metastore(self: &Arc, cfg: &InnerConfig) -> Result<()> { let cpus = cfg.query.num_cpus; let mut address = cfg.query.flight_api_address.clone(); + let mut discovery_address = match cfg.query.discovery_address.is_empty() { + true => format!( + "{}:{}", + cfg.query.http_handler_host, cfg.query.http_handler_port + ), + false => cfg.query.discovery_address.clone(), + }; - if let Ok(socket_addr) = SocketAddr::from_str(&address) { - let ip_addr = socket_addr.ip(); - if ip_addr.is_loopback() || ip_addr.is_unspecified() { - if let Some(local_addr) = self.api_provider.get_local_addr().await? { - let local_socket_addr = SocketAddr::from_str(&local_addr)?; - let new_addr = format!("{}:{}", local_socket_addr.ip(), socket_addr.port()); - warn!( - "Detected loopback or unspecified address as cluster flight endpoint. \ - We rewrite it(\"{}\" -> \"{}\") for advertising to other nodes. \ - If there are proxies between nodes, you can specify endpoint with --flight-api-address.", - address, new_addr - ); - - address = new_addr; + for (lookup_ip, typ) in [ + (&mut address, "flight-api-address"), + (&mut discovery_address, "discovery-address"), + ] { + if let Ok(socket_addr) = SocketAddr::from_str(lookup_ip) { + let ip_addr = socket_addr.ip(); + if ip_addr.is_loopback() || ip_addr.is_unspecified() { + if let Some(local_addr) = self.api_provider.get_local_addr().await? { + let local_socket_addr = SocketAddr::from_str(&local_addr)?; + let new_addr = format!("{}:{}", local_socket_addr.ip(), socket_addr.port()); + warn!( + "Detected loopback or unspecified address as {} endpoint. \ + We rewrite it(\"{}\" -> \"{}\") for advertising to other nodes. \ + If there are proxies between nodes, you can specify endpoint with --{}.", + typ, lookup_ip, new_addr, typ + ); + + *lookup_ip = new_addr; + } } } } @@ -360,6 +372,7 @@ impl ClusterDiscovery { self.local_secret.clone(), cpus, address, + discovery_address, DATABEND_COMMIT_VERSION.to_string(), ); diff --git a/src/query/service/src/servers/http/http_services.rs b/src/query/service/src/servers/http/http_services.rs index 04dd294e3e57..18e5afb5dcf8 100644 --- a/src/query/service/src/servers/http/http_services.rs +++ b/src/query/service/src/servers/http/http_services.rs @@ -34,6 +34,7 @@ use poem::EndpointExt; use poem::IntoEndpoint; use poem::Route; +use super::v1::discovery_nodes; use super::v1::logout_handler; use super::v1::upload_to_stage; use crate::servers::http::middleware::json_response; @@ -136,6 +137,13 @@ impl HttpHandler { self.kind, EndpointKind::StartQuery, )), + ) + .at( + "/discovery_nodes", + get(discovery_nodes).with(HTTPSessionMiddleware::create( + self.kind, + EndpointKind::StartQuery, + )), ); let ep_clickhouse = diff --git a/src/query/service/src/servers/http/v1/discovery.rs b/src/query/service/src/servers/http/v1/discovery.rs new file mode 100644 index 000000000000..055b60859890 --- /dev/null +++ b/src/query/service/src/servers/http/v1/discovery.rs @@ -0,0 +1,52 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_config::GlobalConfig; +use poem::error::InternalServerError; +use poem::error::Result as PoemResult; +use poem::web::Json; +use poem::Request; + +use crate::clusters::ClusterDiscovery; +use crate::clusters::ClusterHelper; +use crate::servers::http::v1::HttpQueryContext; + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct DiscoveryNode { + pub address: String, +} + +#[poem::handler] +#[async_backtrace::framed] +pub async fn discovery_nodes( + _: &HttpQueryContext, + _req: &Request, +) -> PoemResult>> { + let config = GlobalConfig::instance(); + let cluster = ClusterDiscovery::instance() + .discover(&config) + .await + .map_err(InternalServerError)?; + + let nodes = cluster.get_nodes(); + let mut discovery_nodes = Vec::with_capacity(nodes.len()); + + for node in nodes { + discovery_nodes.push(DiscoveryNode { + address: node.discovery_address.clone(), + }); + } + + Ok(Json(discovery_nodes)) +} diff --git a/src/query/service/src/servers/http/v1/mod.rs b/src/query/service/src/servers/http/v1/mod.rs index 9204b2e2dde7..15296f68b791 100644 --- a/src/query/service/src/servers/http/v1/mod.rs +++ b/src/query/service/src/servers/http/v1/mod.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod discovery; mod http_query_handlers; mod query; mod session; @@ -19,6 +20,7 @@ mod stage; pub mod string_block; mod suggestions; +pub use discovery::discovery_nodes; pub use http_query_handlers::make_final_uri; pub use http_query_handlers::make_page_uri; pub use http_query_handlers::make_state_uri; diff --git a/src/query/service/src/test_kits/cluster.rs b/src/query/service/src/test_kits/cluster.rs index bfe48bbec780..450cbe8c2c30 100644 --- a/src/query/service/src/test_kits/cluster.rs +++ b/src/query/service/src/test_kits/cluster.rs @@ -39,6 +39,7 @@ impl ClusterDescriptor { "".to_string(), 0, addr.into(), + "".to_string(), DATABEND_COMMIT_VERSION.to_string(), ))); ClusterDescriptor { diff --git a/src/query/service/tests/it/storages/testdata/configs_table_basic.txt b/src/query/service/tests/it/storages/testdata/configs_table_basic.txt index 098187d5ecca..7e20e16c1e52 100644 --- a/src/query/service/tests/it/storages/testdata/configs_table_basic.txt +++ b/src/query/service/tests/it/storages/testdata/configs_table_basic.txt @@ -84,6 +84,7 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo | 'query' | 'default_compression' | 'auto' | '' | | 'query' | 'default_storage_format' | 'auto' | '' | | 'query' | 'disable_system_table_load' | 'false' | '' | +| 'query' | 'discovery_address' | '' | '' | | 'query' | 'enable_meta_data_upgrade_json_to_pb_from_v307' | 'false' | '' | | 'query' | 'enable_udf_server' | 'false' | '' | | 'query' | 'flight_api_address' | '127.0.0.1:9090' | '' |