Skip to content

Commit

Permalink
feat: add http query interface for background tasks (#13027)
Browse files Browse the repository at this point in the history
* save

* add license header
  • Loading branch information
ZhiHanZ authored Sep 26, 2023
1 parent 42ae1e6 commit 00261dc
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 4 deletions.
104 changes: 104 additions & 0 deletions src/query/service/src/api/http/v1/background_tasks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use chrono::DateTime;
use chrono::Utc;
use common_config::GlobalConfig;
use common_exception::Result;
use common_meta_api::BackgroundApi;
use common_meta_app::background::BackgroundTaskInfo;
use common_meta_app::background::BackgroundTaskState;
use common_meta_app::background::BackgroundTaskType;
use common_meta_app::background::ListBackgroundTasksReq;
use common_users::UserApiProvider;
use log::debug;
use poem::web::Json;
use poem::web::Query;
use poem::IntoResponse;
use serde::Deserialize;
use serde::Serialize;
#[derive(Debug, Serialize, Deserialize)]
pub struct BackgroundTaskQuery {
timestamp: DateTime<Utc>,
table_id: u64,
task_state: BackgroundTaskState,
task_type: BackgroundTaskType,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct ListBackgroundTasksResponse {
task_infos: Vec<(String, BackgroundTaskInfo)>,
params: BackgroundTaskQuery,
}

#[async_backtrace::framed]
async fn load_background_tasks(
tenant: &str,
params: Query<BackgroundTaskQuery>,
) -> Result<ListBackgroundTasksResponse> {
let meta_api = UserApiProvider::instance().get_meta_store_client();
let tasks = meta_api
.list_background_tasks(ListBackgroundTasksReq {
tenant: tenant.to_string(),
})
.await?;
let mut task_infos = Vec::with_capacity(tasks.len());
for (_, name, task) in tasks {
if task.task_state != params.task_state {
continue;
}
if task.task_type != params.task_type {
continue;
}
if task.task_type == BackgroundTaskType::COMPACTION {
if task.compaction_task_stats.is_none() {
continue;
}
if task.compaction_task_stats.as_ref().unwrap().table_id != params.table_id {
continue;
}
}
if task.last_updated.is_some() && task.last_updated.unwrap() < params.timestamp {
continue;
}
task_infos.push((name, task));
}
Ok(ListBackgroundTasksResponse {
task_infos,
params: params.0,
})
}

#[poem::handler]
#[async_backtrace::framed]
pub async fn list_background_tasks(
params: Query<BackgroundTaskQuery>,
) -> poem::Result<impl IntoResponse> {
let tenant = &GlobalConfig::instance().query.tenant_id;
debug!(
"list_background_tasks: tenant: {}, params: {:?}",
tenant, params
);
if tenant.is_empty() {
return Ok(Json(ListBackgroundTasksResponse {
task_infos: vec![],
params: params.0,
}));
}

let resp = load_background_tasks(tenant, params)
.await
.map_err(poem::error::InternalServerError)?;
Ok(Json(resp))
}
1 change: 1 addition & 0 deletions src/query/service/src/api/http/v1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod background_tasks;
pub mod cluster;
pub mod config;
pub mod instance_status;
Expand Down
7 changes: 7 additions & 0 deletions src/query/service/src/api/http/v1/tenant_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ pub struct TenantTableInfo {
pub data_bytes: u64,
pub compressed_data_bytes: u64,
pub index_bytes: u64,
pub number_of_blocks: Option<u64>,
pub number_of_segments: Option<u64>,
pub table_id: u64,
}

async fn load_tenant_tables(tenant: &str) -> Result<TenantTablesResponse> {
Expand All @@ -62,6 +65,7 @@ async fn load_tenant_tables(tenant: &str) -> Result<TenantTablesResponse> {
}
};
for table in tables {
let table_id = table.get_table_info().ident.table_id;
let stats = &table.get_table_info().meta.statistics;
table_infos.push(TenantTableInfo {
table: table.name().to_string(),
Expand All @@ -73,6 +77,9 @@ async fn load_tenant_tables(tenant: &str) -> Result<TenantTablesResponse> {
data_bytes: stats.data_bytes,
compressed_data_bytes: stats.compressed_data_bytes,
index_bytes: stats.index_data_bytes,
number_of_blocks: stats.number_of_blocks,
number_of_segments: stats.number_of_segments,
table_id,
});
}
}
Expand Down
9 changes: 6 additions & 3 deletions src/query/service/src/api/http_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,16 @@ impl HttpService {
)
.at("/debug/home", get(debug_home_handler))
.at("/debug/pprof/profile", get(debug_pprof_handler))
.at("/debug/async_tasks/dump", get(debug_dump_stack));

.at("/debug/async_tasks/dump", get(debug_dump_stack))
.at(
"/v1/background/:tenant/background_tasks",
get(super::http::v1::background_tasks::list_background_tasks),
);
if self.config.query.management_mode {
route = route.at(
"/v1/tenants/:tenant/tables",
get(super::http::v1::tenant_tables::list_tenant_tables_handler),
)
);
}

#[cfg(feature = "memory-profiling")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use crate::sessions::QueryContext;
use crate::sql::plans::Plan;

pub struct ManagementModeAccess {}

impl ManagementModeAccess {
pub fn create() -> Box<dyn AccessChecker> {
Box::new(ManagementModeAccess {})
Expand Down

1 comment on commit 00261dc

@vercel
Copy link

@vercel vercel bot commented on 00261dc Sep 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.