diff --git a/src/query/service/src/api/http/v1/background_tasks.rs b/src/query/service/src/api/http/v1/background_tasks.rs new file mode 100644 index 000000000000..4023ac5361ae --- /dev/null +++ b/src/query/service/src/api/http/v1/background_tasks.rs @@ -0,0 +1,104 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use chrono::DateTime; +use chrono::Utc; +use common_config::GlobalConfig; +use common_exception::Result; +use common_meta_api::BackgroundApi; +use common_meta_app::background::BackgroundTaskInfo; +use common_meta_app::background::BackgroundTaskState; +use common_meta_app::background::BackgroundTaskType; +use common_meta_app::background::ListBackgroundTasksReq; +use common_users::UserApiProvider; +use log::debug; +use poem::web::Json; +use poem::web::Query; +use poem::IntoResponse; +use serde::Deserialize; +use serde::Serialize; +#[derive(Debug, Serialize, Deserialize)] +pub struct BackgroundTaskQuery { + timestamp: DateTime, + table_id: u64, + task_state: BackgroundTaskState, + task_type: BackgroundTaskType, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ListBackgroundTasksResponse { + task_infos: Vec<(String, BackgroundTaskInfo)>, + params: BackgroundTaskQuery, +} + +#[async_backtrace::framed] +async fn load_background_tasks( + tenant: &str, + params: Query, +) -> Result { + let meta_api = UserApiProvider::instance().get_meta_store_client(); + let tasks = meta_api + .list_background_tasks(ListBackgroundTasksReq { + tenant: tenant.to_string(), + }) + .await?; + let mut task_infos = Vec::with_capacity(tasks.len()); + for (_, name, task) in tasks { + if task.task_state != params.task_state { + continue; + } + if task.task_type != params.task_type { + continue; + } + if task.task_type == BackgroundTaskType::COMPACTION { + if task.compaction_task_stats.is_none() { + continue; + } + if task.compaction_task_stats.as_ref().unwrap().table_id != params.table_id { + continue; + } + } + if task.last_updated.is_some() && task.last_updated.unwrap() < params.timestamp { + continue; + } + task_infos.push((name, task)); + } + Ok(ListBackgroundTasksResponse { + task_infos, + params: params.0, + }) +} + +#[poem::handler] +#[async_backtrace::framed] +pub async fn list_background_tasks( + params: Query, +) -> poem::Result { + let tenant = &GlobalConfig::instance().query.tenant_id; + debug!( + "list_background_tasks: tenant: {}, params: {:?}", + tenant, params + ); + if tenant.is_empty() { + return Ok(Json(ListBackgroundTasksResponse { + task_infos: vec![], + params: params.0, + })); + } + + let resp = load_background_tasks(tenant, params) + .await + .map_err(poem::error::InternalServerError)?; + Ok(Json(resp)) +} diff --git a/src/query/service/src/api/http/v1/mod.rs b/src/query/service/src/api/http/v1/mod.rs index a183647143f4..8294fbb8f4e7 100644 --- a/src/query/service/src/api/http/v1/mod.rs +++ b/src/query/service/src/api/http/v1/mod.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod background_tasks; pub mod cluster; pub mod config; pub mod instance_status; diff --git a/src/query/service/src/api/http/v1/tenant_tables.rs b/src/query/service/src/api/http/v1/tenant_tables.rs index c2bb8093024b..d2d3c0f5572c 100644 --- a/src/query/service/src/api/http/v1/tenant_tables.rs +++ b/src/query/service/src/api/http/v1/tenant_tables.rs @@ -40,6 +40,9 @@ pub struct TenantTableInfo { pub data_bytes: u64, pub compressed_data_bytes: u64, pub index_bytes: u64, + pub number_of_blocks: Option, + pub number_of_segments: Option, + pub table_id: u64, } async fn load_tenant_tables(tenant: &str) -> Result { @@ -62,6 +65,7 @@ async fn load_tenant_tables(tenant: &str) -> Result { } }; for table in tables { + let table_id = table.get_table_info().ident.table_id; let stats = &table.get_table_info().meta.statistics; table_infos.push(TenantTableInfo { table: table.name().to_string(), @@ -73,6 +77,9 @@ async fn load_tenant_tables(tenant: &str) -> Result { data_bytes: stats.data_bytes, compressed_data_bytes: stats.compressed_data_bytes, index_bytes: stats.index_data_bytes, + number_of_blocks: stats.number_of_blocks, + number_of_segments: stats.number_of_segments, + table_id, }); } } diff --git a/src/query/service/src/api/http_service.rs b/src/query/service/src/api/http_service.rs index e8e80244ea99..4c3249577aa0 100644 --- a/src/query/service/src/api/http_service.rs +++ b/src/query/service/src/api/http_service.rs @@ -74,13 +74,16 @@ impl HttpService { ) .at("/debug/home", get(debug_home_handler)) .at("/debug/pprof/profile", get(debug_pprof_handler)) - .at("/debug/async_tasks/dump", get(debug_dump_stack)); - + .at("/debug/async_tasks/dump", get(debug_dump_stack)) + .at( + "/v1/background/:tenant/background_tasks", + get(super::http::v1::background_tasks::list_background_tasks), + ); if self.config.query.management_mode { route = route.at( "/v1/tenants/:tenant/tables", get(super::http::v1::tenant_tables::list_tenant_tables_handler), - ) + ); } #[cfg(feature = "memory-profiling")] diff --git a/src/query/service/src/interpreters/access/management_mode_access.rs b/src/query/service/src/interpreters/access/management_mode_access.rs index abae0feb0a47..a62b1b68a894 100644 --- a/src/query/service/src/interpreters/access/management_mode_access.rs +++ b/src/query/service/src/interpreters/access/management_mode_access.rs @@ -25,7 +25,6 @@ use crate::sessions::QueryContext; use crate::sql::plans::Plan; pub struct ManagementModeAccess {} - impl ManagementModeAccess { pub fn create() -> Box { Box::new(ManagementModeAccess {})