diff --git a/src/query/service/src/servers/http/clickhouse_handler.rs b/src/query/service/src/servers/http/clickhouse_handler.rs index fc82175a7812..e8b86c3c69d1 100644 --- a/src/query/service/src/servers/http/clickhouse_handler.rs +++ b/src/query/service/src/servers/http/clickhouse_handler.rs @@ -233,7 +233,7 @@ pub async fn clickhouse_handler_get( let root = Span::root(full_name!(), SpanContext::random()); async { - let session = ctx.get_session(SessionType::ClickHouseHttpHandler); + let session = ctx.upgrade_session(SessionType::ClickHouseHttpHandler)?; if let Some(db) = ¶ms.database { session.set_current_database(db.clone()); } @@ -287,7 +287,7 @@ pub async fn clickhouse_handler_post( sanitize_request_headers(headers), params, ); - let session = ctx.get_session(SessionType::ClickHouseHttpHandler); + let session = ctx.upgrade_session(SessionType::ClickHouseHttpHandler)?; if let Some(db) = ¶ms.database { session.set_current_database(db.clone()); } diff --git a/src/query/service/src/servers/http/v1/load.rs b/src/query/service/src/servers/http/v1/load.rs index ff69592204a7..1e37cf11cae4 100644 --- a/src/query/service/src/servers/http/v1/load.rs +++ b/src/query/service/src/servers/http/v1/load.rs @@ -97,7 +97,7 @@ pub async fn streaming_load( "new streaming load request:, headers={:?}", sanitize_request_headers(req.headers()), ); - let session = ctx.get_session(SessionType::HTTPStreamingLoad); + let session = ctx.upgrade_session(SessionType::HTTPStreamingLoad)?; let context = session .create_query_context() .await diff --git a/src/query/service/src/servers/http/v1/query/http_query.rs b/src/query/service/src/servers/http/v1/query/http_query.rs index 315a12e0acbb..858dcdd8c8bd 100644 --- a/src/query/service/src/servers/http/v1/query/http_query.rs +++ b/src/query/service/src/servers/http/v1/query/http_query.rs @@ -235,7 +235,8 @@ impl HttpQuery { } session } else { - ctx.get_session(SessionType::HTTPQuery) + ctx.upgrade_session(SessionType::HTTPQuery) + .map_err(|err| ErrorCode::Internal(format!("{err}")))? }; // Read the session variables in the request, and set them to the current session. diff --git a/src/query/service/src/servers/http/v1/query/http_query_context.rs b/src/query/service/src/servers/http/v1/query/http_query_context.rs index 7e932078b584..f02bd46e6370 100644 --- a/src/query/service/src/servers/http/v1/query/http_query_context.rs +++ b/src/query/service/src/servers/http/v1/query/http_query_context.rs @@ -14,12 +14,14 @@ use std::sync::Arc; +use http::StatusCode; use poem::FromRequest; use poem::Request; use poem::RequestBody; use poem::Result as PoemResult; use crate::sessions::Session; +use crate::sessions::SessionManager; use crate::sessions::SessionType; pub struct HttpQueryContext { @@ -47,9 +49,16 @@ impl HttpQueryContext { } } - pub fn get_session(&self, session_type: SessionType) -> Arc { - self.session.set_type(session_type); - self.session.clone() + pub fn upgrade_session(&self, session_type: SessionType) -> Result, poem::Error> { + self.session.set_type(session_type.clone()); + if self.session.get_type() == SessionType::Dummy && SessionType::Dummy != session_type { + SessionManager::instance() + .add_session(session_type, self.session.clone()) + .map_err(|err| { + poem::Error::from_string(err.message(), StatusCode::TOO_MANY_REQUESTS) + })?; + } + Ok(self.session.clone()) } } diff --git a/src/query/service/src/servers/http/v1/stage.rs b/src/query/service/src/servers/http/v1/stage.rs index 473aba0c195f..38772102a20b 100644 --- a/src/query/service/src/servers/http/v1/stage.rs +++ b/src/query/service/src/servers/http/v1/stage.rs @@ -85,7 +85,7 @@ pub async fn upload_to_stage( req: &Request, mut multipart: Multipart, ) -> PoemResult> { - let session = ctx.get_session(SessionType::HTTPAPI("UploadToStage".to_string())); + let session = ctx.upgrade_session(SessionType::HTTPAPI("UploadToStage".to_string()))?; let context = session .create_query_context() .await diff --git a/src/query/service/src/servers/http/v1/suggestions.rs b/src/query/service/src/servers/http/v1/suggestions.rs index 1b8eb10d149e..551b51d42cda 100644 --- a/src/query/service/src/servers/http/v1/suggestions.rs +++ b/src/query/service/src/servers/http/v1/suggestions.rs @@ -36,7 +36,7 @@ pub async fn list_suggestions( ctx: &HttpQueryContext, _req: &Request, ) -> PoemResult> { - let session = ctx.get_session(SessionType::HTTPAPI("ListSuggestions".to_string())); + let session = ctx.upgrade_session(SessionType::HTTPAPI("ListSuggestions".to_string()))?; let context = session .create_query_context() .await diff --git a/src/query/service/src/sessions/session_mgr.rs b/src/query/service/src/sessions/session_mgr.rs index 16e48010095c..3ffe50acc520 100644 --- a/src/query/service/src/sessions/session_mgr.rs +++ b/src/query/service/src/sessions/session_mgr.rs @@ -95,6 +95,21 @@ impl SessionManager { self.create_with_settings(typ, settings) } + pub fn add_session(&self, typ: SessionType, session: Arc) -> Result<()> { + let mut sessions = self.active_sessions.write(); + if !matches!(typ, SessionType::Dummy | SessionType::FlightRPC) { + self.validate_max_active_sessions(sessions.len(), "active sessions")?; + } + + incr_session_connect_numbers(); + set_session_active_connections(sessions.len()); + + if !matches!(typ, SessionType::FlightRPC) { + sessions.insert(session.get_id(), Arc::downgrade(&session)); + } + Ok(()) + } + pub fn load_config_changes(&self, settings: &Arc) -> Result<()> { let query_config = &GlobalConfig::instance().query; if let Some(parquet_fast_read_bytes) = query_config.parquet_fast_read_bytes { @@ -127,19 +142,7 @@ impl SessionManager { let session_ctx = SessionContext::try_create(settings, typ.clone())?; let session = Session::try_create(id.clone(), typ.clone(), session_ctx, mysql_conn_id)?; - { - let mut sessions = self.active_sessions.write(); - if !matches!(typ, SessionType::Dummy | SessionType::FlightRPC) { - self.validate_max_active_sessions(sessions.len(), "active sessions")?; - } - - incr_session_connect_numbers(); - set_session_active_connections(sessions.len()); - - if !matches!(typ, SessionType::FlightRPC) { - sessions.insert(session.get_id(), Arc::downgrade(&session)); - } - } + self.add_session(typ.clone(), session.clone())?; if let SessionType::MySQL = typ { let mut mysql_conn_map = self.mysql_conn_map.write();