Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[clickhouse] Internal monitoring endpoint #7114

Merged
merged 18 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ clickhouse-admin-server-client = { path = "clients/clickhouse-admin-server-clien
clickhouse-admin-single-client = { path = "clients/clickhouse-admin-single-client" }
clickhouse-admin-types = { path = "clickhouse-admin/types" }
clickhouse-admin-test-utils = { path = "clickhouse-admin/test-utils" }
clickward = { git = "https://github.com/oxidecomputer/clickward", rev = "a1b342c2558e835d09e6e39a40d3de798a29c2f" }
clickward = { git = "https://github.com/oxidecomputer/clickward", rev = "242fd812aaeafec99ba01b5505ffbb2bd2370917" }
cockroach-admin-api = { path = "cockroach-admin/api" }
cockroach-admin-client = { path = "clients/cockroach-admin-client" }
cockroach-admin-types = { path = "cockroach-admin/types" }
Expand Down
37 changes: 34 additions & 3 deletions clickhouse-admin/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@

use clickhouse_admin_types::{
ClickhouseKeeperClusterMembership, DistributedDdlQueue, KeeperConf,
KeeperConfig, KeeperConfigurableSettings, Lgif, RaftConfig, ReplicaConfig,
ServerConfigurableSettings,
KeeperConfig, KeeperConfigurableSettings, Lgif, MetricInfoPath, RaftConfig,
ReplicaConfig, ServerConfigurableSettings, SystemTimeSeries,
TimeSeriesSettingsQuery,
};
use dropshot::{
HttpError, HttpResponseCreated, HttpResponseOk,
HttpResponseUpdatedNoContent, RequestContext, TypedBody,
HttpResponseUpdatedNoContent, Path, Query, RequestContext, TypedBody,
};

/// API interface for our clickhouse-admin-keeper server
Expand Down Expand Up @@ -116,6 +117,21 @@ pub trait ClickhouseAdminServerApi {
async fn distributed_ddl_queue(
rqctx: RequestContext<Self::Context>,
) -> Result<HttpResponseOk<Vec<DistributedDdlQueue>>, HttpError>;

/// Retrieve time series from the system database.
///
/// The value of each data point is the average of all stored data points
/// within the interval.
/// These are internal ClickHouse metrics.
#[endpoint {
method = GET,
path = "/timeseries/{table}/{metric}/avg"
}]
async fn system_timeseries_avg(
rqctx: RequestContext<Self::Context>,
path_params: Path<MetricInfoPath>,
query_params: Query<TimeSeriesSettingsQuery>,
) -> Result<HttpResponseOk<Vec<SystemTimeSeries>>, HttpError>;
}

/// API interface for our clickhouse-admin-single server
Expand All @@ -136,4 +152,19 @@ pub trait ClickhouseAdminSingleApi {
async fn init_db(
rqctx: RequestContext<Self::Context>,
) -> Result<HttpResponseUpdatedNoContent, HttpError>;

/// Retrieve time series from the system database.
///
/// The value of each data point is the average of all stored data points
/// within the interval.
/// These are internal ClickHouse metrics.
#[endpoint {
method = GET,
path = "/timeseries/{table}/{metric}/avg"
}]
async fn system_timeseries_avg(
rqctx: RequestContext<Self::Context>,
path_params: Path<MetricInfoPath>,
query_params: Query<TimeSeriesSettingsQuery>,
) -> Result<HttpResponseOk<Vec<SystemTimeSeries>>, HttpError>;
}
68 changes: 54 additions & 14 deletions clickhouse-admin/src/clickhouse_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,23 @@ use anyhow::Result;
use camino::Utf8PathBuf;
use clickhouse_admin_types::{
ClickhouseKeeperClusterMembership, DistributedDdlQueue, KeeperConf,
KeeperId, Lgif, RaftConfig, OXIMETER_CLUSTER,
KeeperId, Lgif, RaftConfig, SystemTimeSeries, SystemTimeSeriesSettings,
OXIMETER_CLUSTER,
};
use dropshot::HttpError;
use illumos_utils::{output_to_exec_error, ExecutionError};
use slog::Logger;
use slog::{debug, Logger};
use slog_error_chain::{InlineErrorChain, SlogInlineError};
use std::collections::BTreeSet;
use std::ffi::OsStr;
use std::fmt::Display;
use std::io;
use std::net::SocketAddrV6;
use std::time::Duration;
use tokio::process::Command;

const DEFAULT_COMMAND_TIMEOUT: Duration = Duration::from_secs(30);

#[derive(Debug, thiserror::Error, SlogInlineError)]
pub enum ClickhouseCliError {
#[error("failed to run `clickhouse {subcommand}`")]
Expand All @@ -38,13 +42,16 @@ pub enum ClickhouseCliError {
#[source]
err: anyhow::Error,
},
#[error("clickhouse server unavailable: {0}")]
ServerUnavailable(String),
}

impl From<ClickhouseCliError> for HttpError {
fn from(err: ClickhouseCliError) -> Self {
match err {
ClickhouseCliError::Run { .. }
| ClickhouseCliError::Parse { .. }
| ClickhouseCliError::ServerUnavailable { .. }
| ClickhouseCliError::ExecutionError(_) => {
let message = InlineErrorChain::new(&err).to_string();
HttpError {
Expand Down Expand Up @@ -161,6 +168,25 @@ impl ClickhouseCli {
.await
}

pub async fn system_timeseries_avg(
&self,
settings: SystemTimeSeriesSettings,
) -> Result<Vec<SystemTimeSeries>, ClickhouseCliError> {
let log = self.log.clone().unwrap();
let query = settings.query_avg();

debug!(&log, "Querying system database"; "query" => &query);

self.client_non_interactive(
ClickhouseClientType::Server,
&query,
"Retrieve time series from the system database",
SystemTimeSeries::parse,
log,
)
.await
}

async fn client_non_interactive<F, T>(
&self,
client: ClickhouseClientType,
Expand All @@ -182,19 +208,33 @@ impl ClickhouseCli {
.arg("--query")
.arg(query);

let output = command.output().await.map_err(|err| {
let err_args: Vec<&OsStr> = command.as_std().get_args().collect();
let err_args_parsed: Vec<String> = err_args
.iter()
.map(|&os_str| os_str.to_string_lossy().into_owned())
.collect();
let err_args_str = err_args_parsed.join(" ");
ClickhouseCliError::Run {
description: subcommand_description,
subcommand: err_args_str,
err,
let now = tokio::time::Instant::now();
let result =
tokio::time::timeout(DEFAULT_COMMAND_TIMEOUT, command.output())
.await;

let elapsed = now.elapsed();
let output = match result {
Ok(result) => result.map_err(|err| {
let err_args: Vec<&OsStr> =
command.as_std().get_args().collect();
let err_args_parsed: Vec<String> = err_args
.iter()
.map(|&os_str| os_str.to_string_lossy().into_owned())
.collect();
let err_args_str = err_args_parsed.join(" ");
ClickhouseCliError::Run {
description: subcommand_description,
subcommand: err_args_str,
err,
}
})?,
Err(e) => {
return Err(ClickhouseCliError::ServerUnavailable(format!(
"command timed out after {elapsed:?}: {e}"
)))
}
})?;
};

if !output.status.success() {
return Err(output_to_exec_error(command.as_std(), &output).into());
Expand Down
37 changes: 34 additions & 3 deletions clickhouse-admin/src/http_entrypoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ use crate::context::{ServerContext, SingleServerContext};
use clickhouse_admin_api::*;
use clickhouse_admin_types::{
ClickhouseKeeperClusterMembership, DistributedDdlQueue, KeeperConf,
KeeperConfig, KeeperConfigurableSettings, Lgif, RaftConfig, ReplicaConfig,
ServerConfigurableSettings,
KeeperConfig, KeeperConfigurableSettings, Lgif, MetricInfoPath, RaftConfig,
ReplicaConfig, ServerConfigurableSettings, SystemTimeSeries,
SystemTimeSeriesSettings, TimeSeriesSettingsQuery,
};
use dropshot::{
ApiDescription, HttpError, HttpResponseCreated, HttpResponseOk,
HttpResponseUpdatedNoContent, RequestContext, TypedBody,
HttpResponseUpdatedNoContent, Path, Query, RequestContext, TypedBody,
};
use illumos_utils::svcadm::Svcadm;
use omicron_common::address::CLICKHOUSE_TCP_PORT;
Expand Down Expand Up @@ -64,6 +65,21 @@ impl ClickhouseAdminServerApi for ClickhouseAdminServerImpl {
let output = ctx.clickhouse_cli().distributed_ddl_queue().await?;
Ok(HttpResponseOk(output))
}

async fn system_timeseries_avg(
rqctx: RequestContext<Self::Context>,
path_params: Path<MetricInfoPath>,
query_params: Query<TimeSeriesSettingsQuery>,
) -> Result<HttpResponseOk<Vec<SystemTimeSeries>>, HttpError> {
let ctx = rqctx.context();
let retrieval_settings = query_params.into_inner();
let metric_info = path_params.into_inner();
let settings =
SystemTimeSeriesSettings { retrieval_settings, metric_info };
let output =
ctx.clickhouse_cli().system_timeseries_avg(settings).await?;
Ok(HttpResponseOk(output))
}
}

enum ClickhouseAdminKeeperImpl {}
Expand Down Expand Up @@ -155,4 +171,19 @@ impl ClickhouseAdminSingleApi for ClickhouseAdminSingleImpl {

Ok(HttpResponseUpdatedNoContent())
}

async fn system_timeseries_avg(
rqctx: RequestContext<Self::Context>,
path_params: Path<MetricInfoPath>,
query_params: Query<TimeSeriesSettingsQuery>,
) -> Result<HttpResponseOk<Vec<SystemTimeSeries>>, HttpError> {
let ctx = rqctx.context();
let retrieval_settings = query_params.into_inner();
let metric_info = path_params.into_inner();
let settings =
SystemTimeSeriesSettings { retrieval_settings, metric_info };
let output =
ctx.clickhouse_cli().system_timeseries_avg(settings).await?;
Ok(HttpResponseOk(output))
}
}
Loading
Loading