Skip to content

Commit

Permalink
feat: HTTP handler adds X-DATABEND-VERSION to each response. (#16518)
Browse files Browse the repository at this point in the history
* refactor: split middleware.rs.

* small refactors.

* do not check the type of databend_token when in management_mode.

* HTTP handler adds X-DATABEND-VERSION to each response.

* small refactor.

* add license.

* update tests.

* test X-DATABEND-VERSION.

* test X-DATABEND-VERSION.
  • Loading branch information
youngsofun authored Sep 27, 2024
1 parent 9f9e508 commit 6473d8b
Show file tree
Hide file tree
Showing 12 changed files with 179 additions and 103 deletions.
1 change: 1 addition & 0 deletions src/common/base/src/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub const HEADER_NODE_ID: &str = "X-DATABEND-NODE-ID";

pub const HEADER_QUERY_STATE: &str = "X-DATABEND-QUERY-STATE";
pub const HEADER_QUERY_PAGE_ROWS: &str = "X-DATABEND-QUERY-PAGE-ROWS";
/// Response header carrying the server version; the HTTP `json_response` wrapper
/// sets it to `QUERY_SEMVER` on the responses it returns.
pub const HEADER_VERSION: &str = "X-DATABEND-VERSION";

pub const HEADER_SIGNATURE: &str = "X-DATABEND-SIGNATURE";
pub const HEADER_AUTH_METHOD: &str = "X-DATABEND-AUTH-METHOD";
70 changes: 70 additions & 0 deletions src/query/service/src/servers/http/middleware/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Instant;

use databend_common_metrics::http::metrics_incr_http_request_count;
use databend_common_metrics::http::metrics_incr_http_slow_request_count;
use databend_common_metrics::http::metrics_observe_http_response_duration;
use poem::Endpoint;
use poem::IntoResponse;
use poem::Middleware;
use poem::Request;
use poem::Response;

/// Middleware factory that tags HTTP metrics with a fixed API label.
///
/// The label given to [`MetricsMiddleware::new`] is cloned into every endpoint
/// this middleware wraps.
pub struct MetricsMiddleware {
    // Label passed as the `api` argument of the metric helpers.
    api: String,
}

impl MetricsMiddleware {
    /// Builds a middleware for the given API label.
    ///
    /// Accepts anything convertible into a `String`.
    pub fn new(api: impl Into<String>) -> Self {
        let api = api.into();
        Self { api }
    }
}

// Wraps an endpoint so that its calls go through `MetricsMiddlewareEndpoint`.
impl<E: Endpoint> Middleware<E> for MetricsMiddleware {
    type Output = MetricsMiddlewareEndpoint<E>;

    /// Pairs `ep` with a copy of this middleware's API label.
    fn transform(&self, ep: E) -> Self::Output {
        let api = self.api.clone();
        MetricsMiddlewareEndpoint { api, ep }
    }
}

/// Endpoint wrapper that reports request metrics for the endpoint it wraps.
pub struct MetricsMiddlewareEndpoint<E> {
    // Label passed as the `api` argument of every metric helper below.
    api: String,
    // The wrapped endpoint.
    ep: E,
}

impl<E: Endpoint> Endpoint for MetricsMiddlewareEndpoint<E> {
    type Output = Response;

    /// Forwards `req` to the wrapped endpoint, then records the request count and
    /// the response duration. Requests that took longer than 60 seconds also bump
    /// the slow-request counter.
    ///
    /// If the wrapped endpoint returns `Err`, the error is propagated by `?`
    /// before any metric is recorded.
    async fn call(&self, req: Request) -> poem::error::Result<Self::Output> {
        // Requests taking longer than this many seconds count as slow.
        const SLOW_REQUEST_SECS: f64 = 60.0;

        let started = Instant::now();
        // Captured up front because `req` is moved into the inner call.
        let http_method = req.method().to_string();

        let response = self.ep.call(req).await?.into_response();
        let status = response.status().to_string();
        let elapsed = started.elapsed();

        metrics_incr_http_request_count(http_method.clone(), self.api.clone(), status.clone());
        metrics_observe_http_response_duration(http_method.clone(), self.api.clone(), elapsed);

        let is_slow = elapsed.as_secs_f64() > SLOW_REQUEST_SECS;
        if is_slow {
            // TODO: replace this counter with a histogram
            metrics_incr_http_slow_request_count(http_method, self.api.clone(), status);
        }

        Ok(response)
    }
}
26 changes: 26 additions & 0 deletions src/query/service/src/servers/http/middleware/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod metrics;
mod panic_handler;
mod session;

pub(crate) use metrics::MetricsMiddleware;
pub(crate) use panic_handler::PanicHandler;
pub use session::json_response;
pub(crate) use session::sanitize_request_headers;
pub use session::EndpointKind;
// exported for the integration (`it`) tests only
pub use session::HTTPSessionEndpoint;
pub use session::HTTPSessionMiddleware;
36 changes: 36 additions & 0 deletions src/query/service/src/servers/http/middleware/panic_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;

use databend_common_metrics::http::metrics_incr_http_response_panics_count;
use http::StatusCode;

/// Stateless handler type; it has no fields, so every instance is equivalent.
#[derive(Clone, Debug)]
pub(crate) struct PanicHandler {}

impl PanicHandler {
    /// Creates the handler.
    pub fn new() -> Self {
        PanicHandler {}
    }
}

impl poem::middleware::PanicHandler for PanicHandler {
    type Response = (StatusCode, &'static str);

    /// Increments the HTTP panic counter and answers with a generic 500 body.
    ///
    /// The panic payload (`_err`) is not inspected.
    fn get_response(&self, _err: Box<dyn Any + Send + 'static>) -> Self::Response {
        metrics_incr_http_response_panics_count();
        let status = StatusCode::INTERNAL_SERVER_ERROR;
        (status, "internal server error")
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

use databend_common_base::headers::HEADER_DEDUPLICATE_LABEL;
use databend_common_base::headers::HEADER_NODE_ID;
use databend_common_base::headers::HEADER_QUERY_ID;
use databend_common_base::headers::HEADER_SESSION_ID;
use databend_common_base::headers::HEADER_TENANT;
use databend_common_base::headers::HEADER_VERSION;
use databend_common_base::runtime::ThreadTracker;
use databend_common_config::GlobalConfig;
use databend_common_config::QUERY_SEMVER;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::principal::user_token::TokenType;
use databend_common_meta_app::tenant::Tenant;
use databend_common_metrics::http::metrics_incr_http_request_count;
use databend_common_metrics::http::metrics_incr_http_response_panics_count;
use databend_common_metrics::http::metrics_incr_http_slow_request_count;
use databend_common_metrics::http::metrics_observe_http_response_duration;
use fastrace::func_name;
use headers::authorization::Basic;
use headers::authorization::Bearer;
Expand All @@ -57,17 +53,18 @@ use poem::Request;
use poem::Response;
use uuid::Uuid;

use super::v1::HttpQueryContext;
use super::v1::SessionClaim;
use crate::auth::AuthMgr;
use crate::auth::Credential;
use crate::servers::http::error::HttpErrorCode;
use crate::servers::http::error::JsonErrorOnly;
use crate::servers::http::error::QueryError;
use crate::servers::http::v1::HttpQueryContext;
use crate::servers::http::v1::SessionClaim;
use crate::servers::HttpHandlerKind;
use crate::sessions::SessionManager;
use crate::sessions::SessionType;

const USER_AGENT: &str = "User-Agent";
const TRACE_PARENT: &str = "traceparent";
#[derive(Debug, Copy, Clone)]
pub enum EndpointKind {
Login,
Expand All @@ -81,6 +78,7 @@ pub enum EndpointKind {
}

impl EndpointKind {
/// avoid the cost of get user from meta
pub fn need_user_info(&self) -> bool {
!matches!(self, EndpointKind::NoAuth | EndpointKind::PollQuery)
}
Expand All @@ -89,7 +87,11 @@ impl EndpointKind {
EndpointKind::Verify => Ok(None),
EndpointKind::Refresh => Ok(Some(TokenType::Refresh)),
EndpointKind::StartQuery | EndpointKind::PollQuery | EndpointKind::Logout => {
Ok(Some(TokenType::Session))
if GlobalConfig::instance().query.management_mode {
Ok(None)
} else {
Ok(Some(TokenType::Session))
}
}
_ => Err(ErrorCode::AuthenticateFailure(format!(
"should not use databend token for {self:?}",
Expand All @@ -98,9 +100,6 @@ impl EndpointKind {
}
}

const USER_AGENT: &str = "User-Agent";
const TRACE_PARENT: &str = "traceparent";

pub struct HTTPSessionMiddleware {
pub kind: HttpHandlerKind,
pub endpoint_kind: EndpointKind,
Expand Down Expand Up @@ -165,22 +164,22 @@ fn get_credential(
let client_ip = get_client_ip(req);
if std_auth_headers.is_empty() {
if matches!(kind, HttpHandlerKind::Clickhouse) {
auth_clickhouse_name_password(req, client_ip)
get_clickhouse_name_password(req, client_ip)
} else {
Err(ErrorCode::AuthenticateFailure(
"No authorization header detected",
))
}
} else {
auth_by_header(&std_auth_headers, client_ip, endpoint_kind)
get_credential_from_header(&std_auth_headers, client_ip, endpoint_kind)
}
}

/// this function tries to get the client IP address from the headers. if the ip in header
/// not found, fallback to the remote address, which might be local proxy's ip address.
/// please note that when it comes with network policy, we need make sure the incoming
/// traffic comes from a trustworthy proxy instance.
pub fn get_client_ip(req: &Request) -> Option<String> {
fn get_client_ip(req: &Request) -> Option<String> {
let headers = ["X-Real-IP", "X-Forwarded-For", "CF-Connecting-IP"];
for &header in headers.iter() {
if let Some(value) = req.headers().get(header) {
Expand All @@ -203,7 +202,7 @@ pub fn get_client_ip(req: &Request) -> Option<String> {
client_ip
}

fn auth_by_header(
fn get_credential_from_header(
std_auth_headers: &[&HeaderValue],
client_ip: Option<String>,
endpoint_kind: EndpointKind,
Expand Down Expand Up @@ -246,7 +245,7 @@ fn auth_by_header(
}
}

fn auth_clickhouse_name_password(req: &Request, client_ip: Option<String>) -> Result<Credential> {
fn get_clickhouse_name_password(req: &Request, client_ip: Option<String>) -> Result<Credential> {
let (user, key) = (
req.headers().get("X-CLICKHOUSE-USER"),
req.headers().get("X-CLICKHOUSE-KEY"),
Expand Down Expand Up @@ -436,71 +435,8 @@ pub fn sanitize_request_headers(headers: &poem::http::HeaderMap) -> HashMap<Stri
.collect()
}

/// Middleware factory that tags HTTP metrics with a fixed API label.
///
/// The label given to [`MetricsMiddleware::new`] is cloned into every endpoint
/// this middleware wraps.
pub struct MetricsMiddleware {
    // Label passed as the `api` argument of the metric helpers.
    api: String,
}

impl MetricsMiddleware {
    /// Builds a middleware for the given API label.
    ///
    /// Accepts anything convertible into a `String`.
    pub fn new(api: impl Into<String>) -> Self {
        let api = api.into();
        Self { api }
    }
}

// Wraps an endpoint so that its calls go through `MetricsMiddlewareEndpoint`.
impl<E: Endpoint> Middleware<E> for MetricsMiddleware {
    type Output = MetricsMiddlewareEndpoint<E>;

    /// Pairs `ep` with a copy of this middleware's API label.
    fn transform(&self, ep: E) -> Self::Output {
        let api = self.api.clone();
        MetricsMiddlewareEndpoint { api, ep }
    }
}

/// Endpoint wrapper that reports request metrics for the endpoint it wraps.
pub struct MetricsMiddlewareEndpoint<E> {
    // Label passed as the `api` argument of every metric helper below.
    api: String,
    // The wrapped endpoint.
    ep: E,
}

impl<E: Endpoint> Endpoint for MetricsMiddlewareEndpoint<E> {
    type Output = Response;

    /// Forwards `req` to the wrapped endpoint, then records the request count and
    /// the response duration. Requests that took longer than 60 seconds also bump
    /// the slow-request counter.
    ///
    /// If the wrapped endpoint returns `Err`, the error is propagated by `?`
    /// before any metric is recorded.
    async fn call(&self, req: Request) -> poem::error::Result<Self::Output> {
        // Requests taking longer than this many seconds count as slow.
        const SLOW_REQUEST_SECS: f64 = 60.0;

        let started = Instant::now();
        // Captured up front because `req` is moved into the inner call.
        let http_method = req.method().to_string();

        let response = self.ep.call(req).await?.into_response();
        let status = response.status().to_string();
        let elapsed = started.elapsed();

        metrics_incr_http_request_count(http_method.clone(), self.api.clone(), status.clone());
        metrics_observe_http_response_duration(http_method.clone(), self.api.clone(), elapsed);

        let is_slow = elapsed.as_secs_f64() > SLOW_REQUEST_SECS;
        if is_slow {
            // TODO: replace this counter with a histogram
            metrics_incr_http_slow_request_count(http_method, self.api.clone(), status);
        }

        Ok(response)
    }
}

/// Stateless handler type; it has no fields, so every instance is equivalent.
#[derive(Clone, Debug)]
pub(crate) struct PanicHandler {}

impl PanicHandler {
    /// Creates the handler.
    pub fn new() -> Self {
        PanicHandler {}
    }
}

impl poem::middleware::PanicHandler for PanicHandler {
    type Response = (StatusCode, &'static str);

    /// Increments the HTTP panic counter and answers with a generic 500 body.
    ///
    /// The panic payload (`_err`) is not inspected.
    fn get_response(&self, _err: Box<dyn Any + Send + 'static>) -> Self::Response {
        metrics_incr_http_response_panics_count();
        let status = StatusCode::INTERNAL_SERVER_ERROR;
        (status, "internal server error")
    }
}
pub async fn json_response<E: Endpoint>(next: E, req: Request) -> PoemResult<Response> {
let resp = match next.call(req).await {
let mut resp = match next.call(req).await {
Ok(resp) => resp.into_response(),
Err(err) => (
err.status(),
Expand All @@ -514,5 +450,21 @@ pub async fn json_response<E: Endpoint>(next: E, req: Request) -> PoemResult<Res
)
.into_response(),
};
resp.headers_mut()
.insert(HEADER_VERSION, QUERY_SEMVER.to_string().parse().unwrap());
Ok(resp)
}

#[cfg(test)]
mod tests {
    use crate::servers::http::middleware::session::get_client_ip;

    /// `get_client_ip` should return the address carried by `X-Forwarded-For`.
    #[test]
    fn test_parse_ip() {
        let forwarded_for = "1.2.3.4";
        let request = poem::Request::builder()
            .header("X-Forwarded-For", forwarded_for)
            .finish();

        let detected = get_client_ip(&request);
        assert_eq!(detected, Some(forwarded_for.to_string()));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ use crate::servers::http::error::QueryError;
use crate::servers::http::middleware::EndpointKind;
use crate::servers::http::middleware::HTTPSessionMiddleware;
use crate::servers::http::middleware::MetricsMiddleware;
use crate::servers::http::v1::query::string_block::StringBlock;
use crate::servers::http::v1::query::Progresses;
use crate::servers::http::v1::HttpQueryContext;
use crate::servers::http::v1::HttpQueryManager;
use crate::servers::http::v1::HttpSessionConf;
use crate::servers::http::v1::StringBlock;
use crate::servers::HttpHandlerKind;
use crate::sessions::QueryAffect;

Expand Down
Loading

0 comments on commit 6473d8b

Please sign in to comment.