Skip to content

Commit

Permalink
fix: max_active_session not work for http handler.
Browse files Browse the repository at this point in the history
  • Loading branch information
youngsofun committed Dec 5, 2023
1 parent 35f129c commit 106a17f
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 22 deletions.
4 changes: 2 additions & 2 deletions src/query/service/src/servers/http/clickhouse_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ pub async fn clickhouse_handler_get(
let root = Span::root(full_name!(), SpanContext::random());

async {
let session = ctx.get_session(SessionType::ClickHouseHttpHandler);
let session = ctx.upgrade_session(SessionType::ClickHouseHttpHandler)?;
if let Some(db) = &params.database {
session.set_current_database(db.clone());
}
Expand Down Expand Up @@ -287,7 +287,7 @@ pub async fn clickhouse_handler_post(
sanitize_request_headers(headers),
params,
);
let session = ctx.get_session(SessionType::ClickHouseHttpHandler);
let session = ctx.upgrade_session(SessionType::ClickHouseHttpHandler)?;
if let Some(db) = &params.database {
session.set_current_database(db.clone());
}
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/servers/http/v1/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub async fn streaming_load(
"new streaming load request:, headers={:?}",
sanitize_request_headers(req.headers()),
);
let session = ctx.get_session(SessionType::HTTPStreamingLoad);
let session = ctx.upgrade_session(SessionType::HTTPStreamingLoad)?;
let context = session
.create_query_context()
.await
Expand Down
3 changes: 2 additions & 1 deletion src/query/service/src/servers/http/v1/query/http_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ impl HttpQuery {
}
session
} else {
ctx.get_session(SessionType::HTTPQuery)
ctx.upgrade_session(SessionType::HTTPQuery)
.map_err(|err| ErrorCode::Internal(format!("{err}")))?
};

// Read the session variables in the request, and set them to the current session.
Expand Down
15 changes: 12 additions & 3 deletions src/query/service/src/servers/http/v1/query/http_query_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@

use std::sync::Arc;

use http::StatusCode;
use poem::FromRequest;
use poem::Request;
use poem::RequestBody;
use poem::Result as PoemResult;

use crate::sessions::Session;
use crate::sessions::SessionManager;
use crate::sessions::SessionType;

pub struct HttpQueryContext {
Expand Down Expand Up @@ -47,9 +49,16 @@ impl HttpQueryContext {
}
}

pub fn get_session(&self, session_type: SessionType) -> Arc<Session> {
self.session.set_type(session_type);
self.session.clone()
pub fn upgrade_session(&self, session_type: SessionType) -> Result<Arc<Session>, poem::Error> {
self.session.set_type(session_type.clone());
if self.session.get_type() == SessionType::Dummy && SessionType::Dummy != session_type {
SessionManager::instance()
.add_session(session_type, self.session.clone())
.map_err(|err| {
poem::Error::from_string(err.message(), StatusCode::TOO_MANY_REQUESTS)
})?;
}
Ok(self.session.clone())
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/servers/http/v1/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ pub async fn upload_to_stage(
req: &Request,
mut multipart: Multipart,
) -> PoemResult<Json<UploadToStageResponse>> {
let session = ctx.get_session(SessionType::HTTPAPI("UploadToStage".to_string()));
let session = ctx.upgrade_session(SessionType::HTTPAPI("UploadToStage".to_string()))?;
let context = session
.create_query_context()
.await
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/servers/http/v1/suggestions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub async fn list_suggestions(
ctx: &HttpQueryContext,
_req: &Request,
) -> PoemResult<Json<SuggestionsResponse>> {
let session = ctx.get_session(SessionType::HTTPAPI("ListSuggestions".to_string()));
let session = ctx.upgrade_session(SessionType::HTTPAPI("ListSuggestions".to_string()))?;
let context = session
.create_query_context()
.await
Expand Down
29 changes: 16 additions & 13 deletions src/query/service/src/sessions/session_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,21 @@ impl SessionManager {
self.create_with_settings(typ, settings)
}

pub fn add_session(&self, typ: SessionType, session: Arc<Session>) -> Result<()> {
let mut sessions = self.active_sessions.write();
if !matches!(typ, SessionType::Dummy | SessionType::FlightRPC) {
self.validate_max_active_sessions(sessions.len(), "active sessions")?;
}

incr_session_connect_numbers();
set_session_active_connections(sessions.len());

if !matches!(typ, SessionType::FlightRPC) {
sessions.insert(session.get_id(), Arc::downgrade(&session));
}
Ok(())
}

pub fn load_config_changes(&self, settings: &Arc<Settings>) -> Result<()> {
let query_config = &GlobalConfig::instance().query;
if let Some(parquet_fast_read_bytes) = query_config.parquet_fast_read_bytes {
Expand Down Expand Up @@ -127,19 +142,7 @@ impl SessionManager {
let session_ctx = SessionContext::try_create(settings, typ.clone())?;
let session = Session::try_create(id.clone(), typ.clone(), session_ctx, mysql_conn_id)?;

{
let mut sessions = self.active_sessions.write();
if !matches!(typ, SessionType::Dummy | SessionType::FlightRPC) {
self.validate_max_active_sessions(sessions.len(), "active sessions")?;
}

incr_session_connect_numbers();
set_session_active_connections(sessions.len());

if !matches!(typ, SessionType::FlightRPC) {
sessions.insert(session.get_id(), Arc::downgrade(&session));
}
}
self.add_session(typ.clone(), session.clone())?;

if let SessionType::MySQL = typ {
let mut mysql_conn_map = self.mysql_conn_map.write();
Expand Down

0 comments on commit 106a17f

Please sign in to comment.