From 6473d8b3e27f9f41a4152d7632bbae548bcc6e96 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 27 Sep 2024 16:46:52 +0800 Subject: [PATCH] feat: http handler add X-DATABEND-VERSION in each response. (#16518) * refactor: split middleware.rs. * small refactors. * not check type of databend_token when in management_mode. * http handler add X-DATABEND-VERSION in each response. * small refactor. * add license. * update tests. * test X-DATABEND-VERSION. * test X-DATABEND-VERSION. --- src/common/base/src/headers.rs | 1 + .../src/servers/http/middleware/metrics.rs | 70 +++++++++++ .../src/servers/http/middleware/mod.rs | 26 ++++ .../servers/http/middleware/panic_handler.rs | 36 ++++++ .../{middleware.rs => middleware/session.rs} | 116 +++++------------- .../servers/http/v1/http_query_handlers.rs | 2 +- src/query/service/src/servers/http/v1/mod.rs | 3 +- .../service/src/servers/http/v1/query/mod.rs | 1 + .../src/servers/http/v1/query/page_manager.rs | 4 +- .../http/v1/{ => query}/string_block.rs | 4 - .../it/servers/http/http_query_handlers.rs | 17 +-- .../tests/it/servers/http/json_block.rs | 2 +- 12 files changed, 179 insertions(+), 103 deletions(-) create mode 100644 src/query/service/src/servers/http/middleware/metrics.rs create mode 100644 src/query/service/src/servers/http/middleware/mod.rs create mode 100644 src/query/service/src/servers/http/middleware/panic_handler.rs rename src/query/service/src/servers/http/{middleware.rs => middleware/session.rs} (85%) rename src/query/service/src/servers/http/v1/{ => query}/string_block.rs (98%) diff --git a/src/common/base/src/headers.rs b/src/common/base/src/headers.rs index db18c883948c..823e28e9e0a1 100644 --- a/src/common/base/src/headers.rs +++ b/src/common/base/src/headers.rs @@ -25,6 +25,7 @@ pub const HEADER_NODE_ID: &str = "X-DATABEND-NODE-ID"; pub const HEADER_QUERY_STATE: &str = "X-DATABEND-QUERY-STATE"; pub const HEADER_QUERY_PAGE_ROWS: &str = "X-DATABEND-QUERY-PAGE-ROWS"; +pub const HEADER_VERSION: &str = "X-DATABEND-VERSION"; pub const HEADER_SIGNATURE: &str = "X-DATABEND-SIGNATURE"; pub const HEADER_AUTH_METHOD: &str = "X-DATABEND-AUTH-METHOD"; diff --git a/src/query/service/src/servers/http/middleware/metrics.rs b/src/query/service/src/servers/http/middleware/metrics.rs new file mode 100644 index 000000000000..60afddc44f06 --- /dev/null +++ b/src/query/service/src/servers/http/middleware/metrics.rs @@ -0,0 +1,70 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Instant; + +use databend_common_metrics::http::metrics_incr_http_request_count; +use databend_common_metrics::http::metrics_incr_http_slow_request_count; +use databend_common_metrics::http::metrics_observe_http_response_duration; +use poem::Endpoint; +use poem::IntoResponse; +use poem::Middleware; +use poem::Request; +use poem::Response; + +pub struct MetricsMiddleware { + api: String, +} + +impl MetricsMiddleware { + pub fn new(api: impl Into) -> Self { + Self { api: api.into() } + } +} + +impl Middleware for MetricsMiddleware { + type Output = MetricsMiddlewareEndpoint; + + fn transform(&self, ep: E) -> Self::Output { + MetricsMiddlewareEndpoint { + ep, + api: self.api.clone(), + } + } +} + +pub struct MetricsMiddlewareEndpoint { + api: String, + ep: E, +} + +impl Endpoint for MetricsMiddlewareEndpoint { + type Output = Response; + + async fn call(&self, req: Request) -> poem::error::Result { + let start_time = Instant::now(); + let method = req.method().to_string(); + let output = self.ep.call(req).await?; + let resp = output.into_response(); + let status_code = resp.status().to_string(); + let duration = start_time.elapsed(); + metrics_incr_http_request_count(method.clone(), self.api.clone(), status_code.clone()); + metrics_observe_http_response_duration(method.clone(), self.api.clone(), duration); + if duration.as_secs_f64() > 60.0 { + // TODO: replace this into histogram + metrics_incr_http_slow_request_count(method, self.api.clone(), status_code); + } + Ok(resp) + } +} diff --git a/src/query/service/src/servers/http/middleware/mod.rs b/src/query/service/src/servers/http/middleware/mod.rs new file mode 100644 index 000000000000..5da4f1c621b4 --- /dev/null +++ b/src/query/service/src/servers/http/middleware/mod.rs @@ -0,0 +1,26 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod metrics; +mod panic_handler; +mod session; + +pub(crate) use metrics::MetricsMiddleware; +pub(crate) use panic_handler::PanicHandler; +pub use session::json_response; +pub(crate) use session::sanitize_request_headers; +pub use session::EndpointKind; +// for it tests only +pub use session::HTTPSessionEndpoint; +pub use session::HTTPSessionMiddleware; diff --git a/src/query/service/src/servers/http/middleware/panic_handler.rs b/src/query/service/src/servers/http/middleware/panic_handler.rs new file mode 100644 index 000000000000..cbff37083ef8 --- /dev/null +++ b/src/query/service/src/servers/http/middleware/panic_handler.rs @@ -0,0 +1,36 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; + +use databend_common_metrics::http::metrics_incr_http_response_panics_count; +use http::StatusCode; + +#[derive(Clone, Debug)] +pub(crate) struct PanicHandler {} + +impl PanicHandler { + pub fn new() -> Self { + Self {} + } +} + +impl poem::middleware::PanicHandler for PanicHandler { + type Response = (StatusCode, &'static str); + + fn get_response(&self, _err: Box) -> Self::Response { + metrics_incr_http_response_panics_count(); + (StatusCode::INTERNAL_SERVER_ERROR, "internal server error") + } +} diff --git a/src/query/service/src/servers/http/middleware.rs b/src/query/service/src/servers/http/middleware/session.rs similarity index 85% rename from src/query/service/src/servers/http/middleware.rs rename to src/query/service/src/servers/http/middleware/session.rs index a0474310b346..3a95456f4f36 100644 --- a/src/query/service/src/servers/http/middleware.rs +++ b/src/query/service/src/servers/http/middleware/session.rs @@ -12,26 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::any::Any; use std::collections::HashMap; use std::sync::Arc; -use std::time::Instant; use databend_common_base::headers::HEADER_DEDUPLICATE_LABEL; use databend_common_base::headers::HEADER_NODE_ID; use databend_common_base::headers::HEADER_QUERY_ID; use databend_common_base::headers::HEADER_SESSION_ID; use databend_common_base::headers::HEADER_TENANT; +use databend_common_base::headers::HEADER_VERSION; use databend_common_base::runtime::ThreadTracker; use databend_common_config::GlobalConfig; +use databend_common_config::QUERY_SEMVER; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_meta_app::principal::user_token::TokenType; use databend_common_meta_app::tenant::Tenant; -use databend_common_metrics::http::metrics_incr_http_request_count; -use databend_common_metrics::http::metrics_incr_http_response_panics_count; -use databend_common_metrics::http::metrics_incr_http_slow_request_count; -use databend_common_metrics::http::metrics_observe_http_response_duration; use fastrace::func_name; use headers::authorization::Basic; use headers::authorization::Bearer; @@ -57,17 +53,18 @@ use poem::Request; use poem::Response; use uuid::Uuid; -use super::v1::HttpQueryContext; -use super::v1::SessionClaim; use crate::auth::AuthMgr; use crate::auth::Credential; use crate::servers::http::error::HttpErrorCode; use crate::servers::http::error::JsonErrorOnly; use crate::servers::http::error::QueryError; +use crate::servers::http::v1::HttpQueryContext; +use crate::servers::http::v1::SessionClaim; use crate::servers::HttpHandlerKind; use crate::sessions::SessionManager; use crate::sessions::SessionType; - +const USER_AGENT: &str = "User-Agent"; +const TRACE_PARENT: &str = "traceparent"; #[derive(Debug, Copy, Clone)] pub enum EndpointKind { Login, @@ -81,6 +78,7 @@ pub enum EndpointKind { } impl EndpointKind { + /// avoid the cost of get user from meta pub fn need_user_info(&self) -> bool { !matches!(self, EndpointKind::NoAuth | EndpointKind::PollQuery) } @@ -89,7 +87,11 @@ impl EndpointKind { EndpointKind::Verify => Ok(None), EndpointKind::Refresh => Ok(Some(TokenType::Refresh)), EndpointKind::StartQuery | EndpointKind::PollQuery | EndpointKind::Logout => { - Ok(Some(TokenType::Session)) + if GlobalConfig::instance().query.management_mode { + Ok(None) + } else { + Ok(Some(TokenType::Session)) + } } _ => Err(ErrorCode::AuthenticateFailure(format!( "should not use databend token for {self:?}", @@ -98,9 +100,6 @@ impl EndpointKind { } } -const USER_AGENT: &str = "User-Agent"; -const TRACE_PARENT: &str = "traceparent"; - pub struct HTTPSessionMiddleware { pub kind: HttpHandlerKind, pub endpoint_kind: EndpointKind, @@ -165,14 +164,14 @@ fn get_credential( let client_ip = get_client_ip(req); if std_auth_headers.is_empty() { if matches!(kind, HttpHandlerKind::Clickhouse) { - auth_clickhouse_name_password(req, client_ip) + get_clickhouse_name_password(req, client_ip) } else { Err(ErrorCode::AuthenticateFailure( "No authorization header detected", )) } } else { - auth_by_header(&std_auth_headers, client_ip, endpoint_kind) + get_credential_from_header(&std_auth_headers, client_ip, endpoint_kind) } } @@ -180,7 +179,7 @@ fn get_credential( /// not found, fallback to the remote address, which might be local proxy's ip address. /// please note that when it comes with network policy, we need make sure the incoming /// traffic comes from a trustworthy proxy instance. -pub fn get_client_ip(req: &Request) -> Option { +fn get_client_ip(req: &Request) -> Option { let headers = ["X-Real-IP", "X-Forwarded-For", "CF-Connecting-IP"]; for &header in headers.iter() { if let Some(value) = req.headers().get(header) { @@ -203,7 +202,7 @@ pub fn get_client_ip(req: &Request) -> Option { client_ip } -fn auth_by_header( +fn get_credential_from_header( std_auth_headers: &[&HeaderValue], client_ip: Option, endpoint_kind: EndpointKind, @@ -246,7 +245,7 @@ fn auth_by_header( } } -fn auth_clickhouse_name_password(req: &Request, client_ip: Option) -> Result { +fn get_clickhouse_name_password(req: &Request, client_ip: Option) -> Result { let (user, key) = ( req.headers().get("X-CLICKHOUSE-USER"), req.headers().get("X-CLICKHOUSE-KEY"), @@ -436,71 +435,8 @@ pub fn sanitize_request_headers(headers: &poem::http::HeaderMap) -> HashMap) -> Self { - Self { api: api.into() } - } -} - -impl Middleware for MetricsMiddleware { - type Output = MetricsMiddlewareEndpoint; - - fn transform(&self, ep: E) -> Self::Output { - MetricsMiddlewareEndpoint { - ep, - api: self.api.clone(), - } - } -} - -pub struct MetricsMiddlewareEndpoint { - api: String, - ep: E, -} - -impl Endpoint for MetricsMiddlewareEndpoint { - type Output = Response; - - async fn call(&self, req: Request) -> poem::error::Result { - let start_time = Instant::now(); - let method = req.method().to_string(); - let output = self.ep.call(req).await?; - let resp = output.into_response(); - let status_code = resp.status().to_string(); - let duration = start_time.elapsed(); - metrics_incr_http_request_count(method.clone(), self.api.clone(), status_code.clone()); - metrics_observe_http_response_duration(method.clone(), self.api.clone(), duration); - if duration.as_secs_f64() > 60.0 { - // TODO: replace this into histogram - metrics_incr_http_slow_request_count(method, self.api.clone(), status_code); - } - Ok(resp) - } -} - -#[derive(Clone, Debug)] -pub(crate) struct PanicHandler {} - -impl PanicHandler { - pub fn new() -> Self { - Self {} - } -} - -impl poem::middleware::PanicHandler for PanicHandler { - type Response = (StatusCode, &'static str); - - fn get_response(&self, _err: Box) -> Self::Response { - metrics_incr_http_response_panics_count(); - (StatusCode::INTERNAL_SERVER_ERROR, "internal server error") - } -} pub async fn json_response(next: E, req: Request) -> PoemResult { - let resp = match next.call(req).await { + let mut resp = match next.call(req).await { Ok(resp) => resp.into_response(), Err(err) => ( err.status(), @@ -514,5 +450,21 @@ pub async fn json_response(next: E, req: Request) -> PoemResult>>, } -pub type StringBlockRef = Arc; - fn data_is_null(column: &Column, row_index: usize) -> bool { match column { Column::Null { .. } => true, diff --git a/src/query/service/tests/it/servers/http/http_query_handlers.rs b/src/query/service/tests/it/servers/http/http_query_handlers.rs index 23d611edfbc4..2ebd78d7079d 100644 --- a/src/query/service/tests/it/servers/http/http_query_handlers.rs +++ b/src/query/service/tests/it/servers/http/http_query_handlers.rs @@ -21,15 +21,16 @@ use base64::engine::general_purpose; use base64::prelude::*; use databend_common_base::base::get_free_tcp_port; use databend_common_base::base::tokio; +use databend_common_base::headers::HEADER_VERSION; use databend_common_config::UserAuthConfig; use databend_common_config::UserConfig; +use databend_common_config::QUERY_SEMVER; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_meta_app::principal::PasswordHashMethod; use databend_common_users::CustomClaims; use databend_common_users::EnsureUser; use databend_query::servers::http::error::QueryError; -use databend_query::servers::http::middleware::get_client_ip; use databend_query::servers::http::middleware::json_response; use databend_query::servers::http::v1::make_page_uri; use databend_query::servers::http::v1::query_route; @@ -184,6 +185,10 @@ impl TestHttpQueryRequest { .await .map_err(|e| ErrorCode::Internal(e.to_string())) .unwrap(); + assert_eq!( + resp.header(HEADER_VERSION), + Some(QUERY_SEMVER.to_string().as_str()) + ); let status_code = resp.status(); let body = resp.into_body().into_string().await.unwrap(); @@ -1671,16 +1676,6 @@ async fn test_txn_timeout() -> Result<()> { Ok(()) } -#[test] -fn test_parse_ip() -> Result<()> { - let req = poem::Request::builder() - .header("X-Forwarded-For", "1.2.3.4") - .finish(); - let ip = get_client_ip(&req); - assert_eq!(ip, Some("1.2.3.4".to_string())); - Ok(()) -} - #[tokio::test(flavor = "current_thread")] async fn test_has_result_set() -> Result<()> { let _fixture = TestFixture::setup().await?; diff --git a/src/query/service/tests/it/servers/http/json_block.rs b/src/query/service/tests/it/servers/http/json_block.rs index 060728c634cb..0ca921a6ed66 100644 --- a/src/query/service/tests/it/servers/http/json_block.rs +++ b/src/query/service/tests/it/servers/http/json_block.rs @@ -23,7 +23,7 @@ use databend_common_expression::types::StringType; use databend_common_expression::DataBlock; use databend_common_expression::FromData; use databend_common_io::prelude::FormatSettings; -use databend_query::servers::http::v1::string_block::StringBlock; +use databend_query::servers::http::v1::StringBlock; use pretty_assertions::assert_eq; fn test_data_block(is_nullable: bool) -> Result<()> {