Skip to content

Commit

Permalink
Use one Oximeter client per admin server (#7171)
Browse files Browse the repository at this point in the history
Keeps the admin server's connections to ClickHouse from growing without
bound.

Also: don't automatically initialize an already-initialized ClickHouse
database. This prevents data loss when upgrading. Schema updates for
populated databases must be performed manually, as they have been in the
past.
  • Loading branch information
plotnick authored Nov 27, 2024
1 parent 888876a commit 6a773fb
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 88 deletions.
8 changes: 3 additions & 5 deletions clickhouse-admin/src/bin/clickhouse-admin-keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use anyhow::anyhow;
use camino::Utf8PathBuf;
use clap::Parser;
use omicron_clickhouse_admin::{ClickhouseCli, Clickward, Config};
use omicron_clickhouse_admin::{Clickward, Config};
use omicron_common::cmd::fatal;
use omicron_common::cmd::CmdError;
use std::net::{SocketAddr, SocketAddrV6};
Expand Down Expand Up @@ -55,12 +55,10 @@ async fn main_impl() -> Result<(), CmdError> {
.map_err(|err| CmdError::Failure(anyhow!(err)))?;
config.dropshot.bind_address = SocketAddr::V6(http_address);
let clickward = Clickward::new();
let clickhouse_cli =
ClickhouseCli::new(binary_path, listen_address);

let server = omicron_clickhouse_admin::start_keeper_admin_server(
clickward,
clickhouse_cli,
binary_path,
listen_address,
config,
)
.await
Expand Down
8 changes: 3 additions & 5 deletions clickhouse-admin/src/bin/clickhouse-admin-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use anyhow::anyhow;
use camino::Utf8PathBuf;
use clap::Parser;
use omicron_clickhouse_admin::{ClickhouseCli, Clickward, Config};
use omicron_clickhouse_admin::{Clickward, Config};
use omicron_common::cmd::fatal;
use omicron_common::cmd::CmdError;
use std::net::{SocketAddr, SocketAddrV6};
Expand Down Expand Up @@ -55,12 +55,10 @@ async fn main_impl() -> Result<(), CmdError> {
.map_err(|err| CmdError::Failure(anyhow!(err)))?;
config.dropshot.bind_address = SocketAddr::V6(http_address);
let clickward = Clickward::new();
let clickhouse_cli =
ClickhouseCli::new(binary_path, listen_address);

let server = omicron_clickhouse_admin::start_server_admin_server(
clickward,
clickhouse_cli,
binary_path,
listen_address,
config,
)
.await
Expand Down
8 changes: 3 additions & 5 deletions clickhouse-admin/src/bin/clickhouse-admin-single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use anyhow::anyhow;
use camino::Utf8PathBuf;
use clap::Parser;
use omicron_clickhouse_admin::{ClickhouseCli, Config};
use omicron_clickhouse_admin::Config;
use omicron_common::cmd::fatal;
use omicron_common::cmd::CmdError;
use std::net::{SocketAddr, SocketAddrV6};
Expand Down Expand Up @@ -53,11 +53,9 @@ async fn main_impl() -> Result<(), CmdError> {
let mut config = Config::from_file(&config)
.map_err(|err| CmdError::Failure(anyhow!(err)))?;
config.dropshot.bind_address = SocketAddr::V6(http_address);
let clickhouse_cli =
ClickhouseCli::new(binary_path, listen_address);

let server = omicron_clickhouse_admin::start_single_admin_server(
clickhouse_cli,
binary_path,
listen_address,
config,
)
.await
Expand Down
27 changes: 13 additions & 14 deletions clickhouse-admin/src/clickhouse_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,18 @@ impl Display for ClickhouseClientType {
pub struct ClickhouseCli {
/// Path to where the clickhouse binary is located
pub binary_path: Utf8PathBuf,
/// Address on where the clickhouse keeper is listening on
/// Address at which the clickhouse keeper/server is listening
pub listen_address: SocketAddrV6,
pub log: Option<Logger>,
pub log: Logger,
}

impl ClickhouseCli {
pub fn new(binary_path: Utf8PathBuf, listen_address: SocketAddrV6) -> Self {
Self { binary_path, listen_address, log: None }
}

pub fn with_log(mut self, log: Logger) -> Self {
self.log = Some(log);
self
pub fn new(
binary_path: Utf8PathBuf,
listen_address: SocketAddrV6,
log: Logger,
) -> Self {
Self { binary_path, listen_address, log }
}

pub async fn lgif(&self) -> Result<Lgif, ClickhouseCliError> {
Expand All @@ -105,7 +104,7 @@ impl ClickhouseCli {
"lgif",
"Retrieve logically grouped information file",
Lgif::parse,
self.log.clone().unwrap(),
self.log.clone(),
)
.await
}
Expand All @@ -116,7 +115,7 @@ impl ClickhouseCli {
"get /keeper/config",
"Retrieve raft configuration information",
RaftConfig::parse,
self.log.clone().unwrap(),
self.log.clone(),
)
.await
}
Expand All @@ -127,7 +126,7 @@ impl ClickhouseCli {
"conf",
"Retrieve keeper node configuration information",
KeeperConf::parse,
self.log.clone().unwrap(),
self.log.clone(),
)
.await
}
Expand Down Expand Up @@ -163,7 +162,7 @@ impl ClickhouseCli {
"Retrieve information about distributed ddl queries (ON CLUSTER clause)
that were executed on a cluster",
DistributedDdlQueue::parse,
self.log.clone().unwrap(),
self.log.clone(),
)
.await
}
Expand All @@ -172,7 +171,7 @@ impl ClickhouseCli {
&self,
settings: SystemTimeSeriesSettings,
) -> Result<Vec<SystemTimeSeries>, ClickhouseCliError> {
let log = self.log.clone().unwrap();
let log = self.log.clone();
let query = settings.query_avg();

debug!(&log, "Querying system database"; "query" => &query);
Expand Down
28 changes: 21 additions & 7 deletions clickhouse-admin/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use crate::{ClickhouseCli, Clickward};
use omicron_common::address::CLICKHOUSE_TCP_PORT;
use oximeter_db::Client as OximeterClient;
use slog::Logger;
use std::net::SocketAddrV6;
use std::sync::Arc;
use tokio::sync::Mutex;

Expand All @@ -14,12 +17,10 @@ pub struct ServerContext {
}

impl ServerContext {
pub fn new(
clickward: Clickward,
clickhouse_cli: ClickhouseCli,
_log: Logger,
) -> Self {
Self { clickward, clickhouse_cli, _log }
pub fn new(clickward: Clickward, clickhouse_cli: ClickhouseCli) -> Self {
let log =
clickhouse_cli.log.new(slog::o!("component" => "ServerContext"));
Self { clickward, clickhouse_cli, _log: log }
}

pub fn clickward(&self) -> &Clickward {
Expand All @@ -33,18 +34,31 @@ impl ServerContext {

/// Shared state handed to the request handlers of the single-node
/// ClickHouse admin server.
pub struct SingleServerContext {
/// CLI wrapper carrying the clickhouse binary path, the listen address,
/// and a logger.
clickhouse_cli: ClickhouseCli,
/// Oximeter database client built once in `new`; handlers borrow it
/// instead of constructing a fresh client for every request.
oximeter_client: OximeterClient,
/// Mutex taken by the database-initialization handler so that
/// initialization requests run one at a time.
initialization_lock: Arc<Mutex<()>>,
}

impl SingleServerContext {
pub fn new(clickhouse_cli: ClickhouseCli) -> Self {
Self { clickhouse_cli, initialization_lock: Arc::new(Mutex::new(())) }
let ip = clickhouse_cli.listen_address.ip();
let address = SocketAddrV6::new(*ip, CLICKHOUSE_TCP_PORT, 0, 0);
let oximeter_client =
OximeterClient::new(address.into(), &clickhouse_cli.log);
Self {
clickhouse_cli,
oximeter_client,
initialization_lock: Arc::new(Mutex::new(())),
}
}

/// Returns a reference to the ClickHouse CLI wrapper owned by this context.
pub fn clickhouse_cli(&self) -> &ClickhouseCli {
&self.clickhouse_cli
}

/// Returns a reference to the oximeter database client owned by this
/// context, so callers reuse it rather than creating their own.
pub fn oximeter_client(&self) -> &OximeterClient {
&self.oximeter_client
}

/// Returns a handle to the initialization mutex.
///
/// The `Arc` is cloned, so every returned handle refers to the same
/// underlying lock.
pub fn initialization_lock(&self) -> Arc<Mutex<()>> {
self.initialization_lock.clone()
}
Expand Down
52 changes: 31 additions & 21 deletions clickhouse-admin/src/http_entrypoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@ use dropshot::{
HttpResponseUpdatedNoContent, Path, Query, RequestContext, TypedBody,
};
use illumos_utils::svcadm::Svcadm;
use omicron_common::address::CLICKHOUSE_TCP_PORT;
use oximeter_db::{Client as OximeterClient, OXIMETER_VERSION};
use slog::debug;
use std::net::SocketAddrV6;
use oximeter_db::OXIMETER_VERSION;
use slog::info;
use std::sync::Arc;

pub fn clickhouse_admin_server_api() -> ApiDescription<Arc<ServerContext>> {
Expand Down Expand Up @@ -146,28 +144,40 @@ impl ClickhouseAdminSingleApi for ClickhouseAdminSingleImpl {
) -> Result<HttpResponseUpdatedNoContent, HttpError> {
let log = &rqctx.log;
let ctx = rqctx.context();
let ip = ctx.clickhouse_cli().listen_address.ip();
let address = SocketAddrV6::new(*ip, CLICKHOUSE_TCP_PORT, 0, 0);
let client = OximeterClient::new(address.into(), log);
debug!(
log,
"initializing single-node ClickHouse \
at {address} to version {OXIMETER_VERSION}"
);

// Database initialization is idempotent, but not concurrency-safe.
// Use a mutex to serialize requests.
let lock = ctx.initialization_lock();
let _guard = lock.lock().await;
client
.initialize_db_with_version(false, OXIMETER_VERSION)
.await
.map_err(|e| {
HttpError::for_internal_error(format!(
"can't initialize single-node ClickHouse \
at {address} to version {OXIMETER_VERSION}: {e}",
))
})?;

// Initialize the database only if it was not previously initialized.
// TODO: Migrate schema to newer version without wiping data.
let client = ctx.oximeter_client();
let version = client.read_latest_version().await.map_err(|e| {
HttpError::for_internal_error(format!(
"can't read ClickHouse version: {e}",
))
})?;
if version == 0 {
info!(
log,
"initializing single-node ClickHouse to version {OXIMETER_VERSION}"
);
ctx.oximeter_client()
.initialize_db_with_version(false, OXIMETER_VERSION)
.await
.map_err(|e| {
HttpError::for_internal_error(format!(
"can't initialize single-node ClickHouse \
to version {OXIMETER_VERSION}: {e}",
))
})?;
} else {
info!(
log,
"skipping initialization of single-node ClickHouse at version {version}"
);
}

Ok(HttpResponseUpdatedNoContent())
}
Expand Down
38 changes: 23 additions & 15 deletions clickhouse-admin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use camino::Utf8PathBuf;
use context::{ServerContext, SingleServerContext};
use dropshot::HttpServer;
use omicron_common::FileKv;
use slog::{debug, error, Drain};
use slog_dtrace::ProbeRegistration;
use slog_error_chain::SlogInlineError;
use std::io;
use std::net::SocketAddrV6;
use std::sync::Arc;

mod clickhouse_cli;
Expand All @@ -35,7 +37,8 @@ pub enum StartError {
/// manages clickhouse replica servers.
pub async fn start_server_admin_server(
clickward: Clickward,
clickhouse_cli: ClickhouseCli,
binary_path: Utf8PathBuf,
listen_address: SocketAddrV6,
server_config: Config,
) -> Result<HttpServer<Arc<ServerContext>>, StartError> {
let (drain, registration) = slog_dtrace::with_drain(
Expand All @@ -56,13 +59,12 @@ pub async fn start_server_admin_server(
}
}

let context = ServerContext::new(
clickward,
clickhouse_cli
.with_log(log.new(slog::o!("component" => "ClickhouseCli"))),
log.new(slog::o!("component" => "ServerContext")),
let clickhouse_cli = ClickhouseCli::new(
binary_path,
listen_address,
log.new(slog::o!("component" => "ClickhouseCli")),
);

let context = ServerContext::new(clickward, clickhouse_cli);
dropshot::ServerBuilder::new(
http_entrypoints::clickhouse_admin_server_api(),
Arc::new(context),
Expand All @@ -77,7 +79,8 @@ pub async fn start_server_admin_server(
/// manages clickhouse keeper nodes.
pub async fn start_keeper_admin_server(
clickward: Clickward,
clickhouse_cli: ClickhouseCli,
binary_path: Utf8PathBuf,
listen_address: SocketAddrV6,
server_config: Config,
) -> Result<HttpServer<Arc<ServerContext>>, StartError> {
let (drain, registration) = slog_dtrace::with_drain(
Expand All @@ -98,13 +101,12 @@ pub async fn start_keeper_admin_server(
}
}

let context = ServerContext::new(
clickward,
clickhouse_cli
.with_log(log.new(slog::o!("component" => "ClickhouseCli"))),
log.new(slog::o!("component" => "ServerContext")),
let clickhouse_cli = ClickhouseCli::new(
binary_path,
listen_address,
log.new(slog::o!("component" => "ClickhouseCli")),
);

let context = ServerContext::new(clickward, clickhouse_cli);
dropshot::ServerBuilder::new(
http_entrypoints::clickhouse_admin_keeper_api(),
Arc::new(context),
Expand All @@ -118,7 +120,8 @@ pub async fn start_keeper_admin_server(
/// Start the dropshot server for `clickhouse-admin-single` which
/// manages a single-node ClickHouse database.
pub async fn start_single_admin_server(
clickhouse_cli: ClickhouseCli,
binary_path: Utf8PathBuf,
listen_address: SocketAddrV6,
server_config: Config,
) -> Result<HttpServer<Arc<SingleServerContext>>, StartError> {
let (drain, registration) = slog_dtrace::with_drain(
Expand All @@ -139,6 +142,11 @@ pub async fn start_single_admin_server(
}
}

let clickhouse_cli = ClickhouseCli::new(
binary_path,
listen_address,
log.new(slog::o!("component" => "ClickhouseCli")),
);
let context = SingleServerContext::new(clickhouse_cli);
dropshot::ServerBuilder::new(
http_entrypoints::clickhouse_admin_single_api(),
Expand Down
Loading

0 comments on commit 6a773fb

Please sign in to comment.