diff --git a/clickhouse-admin/src/bin/clickhouse-admin-keeper.rs b/clickhouse-admin/src/bin/clickhouse-admin-keeper.rs index 4ec998920b..953d109fda 100644 --- a/clickhouse-admin/src/bin/clickhouse-admin-keeper.rs +++ b/clickhouse-admin/src/bin/clickhouse-admin-keeper.rs @@ -8,7 +8,7 @@ use anyhow::anyhow; use camino::Utf8PathBuf; use clap::Parser; -use omicron_clickhouse_admin::{ClickhouseCli, Clickward, Config}; +use omicron_clickhouse_admin::{Clickward, Config}; use omicron_common::cmd::fatal; use omicron_common::cmd::CmdError; use std::net::{SocketAddr, SocketAddrV6}; @@ -55,12 +55,10 @@ async fn main_impl() -> Result<(), CmdError> { .map_err(|err| CmdError::Failure(anyhow!(err)))?; config.dropshot.bind_address = SocketAddr::V6(http_address); let clickward = Clickward::new(); - let clickhouse_cli = - ClickhouseCli::new(binary_path, listen_address); - let server = omicron_clickhouse_admin::start_keeper_admin_server( clickward, - clickhouse_cli, + binary_path, + listen_address, config, ) .await diff --git a/clickhouse-admin/src/bin/clickhouse-admin-server.rs b/clickhouse-admin/src/bin/clickhouse-admin-server.rs index 1258f75805..486b91d3be 100644 --- a/clickhouse-admin/src/bin/clickhouse-admin-server.rs +++ b/clickhouse-admin/src/bin/clickhouse-admin-server.rs @@ -8,7 +8,7 @@ use anyhow::anyhow; use camino::Utf8PathBuf; use clap::Parser; -use omicron_clickhouse_admin::{ClickhouseCli, Clickward, Config}; +use omicron_clickhouse_admin::{Clickward, Config}; use omicron_common::cmd::fatal; use omicron_common::cmd::CmdError; use std::net::{SocketAddr, SocketAddrV6}; @@ -55,12 +55,10 @@ async fn main_impl() -> Result<(), CmdError> { .map_err(|err| CmdError::Failure(anyhow!(err)))?; config.dropshot.bind_address = SocketAddr::V6(http_address); let clickward = Clickward::new(); - let clickhouse_cli = - ClickhouseCli::new(binary_path, listen_address); - let server = omicron_clickhouse_admin::start_server_admin_server( clickward, - clickhouse_cli, + binary_path, + listen_address, config, ) .await diff --git a/clickhouse-admin/src/bin/clickhouse-admin-single.rs b/clickhouse-admin/src/bin/clickhouse-admin-single.rs index bc7097f980..a23eadd396 100644 --- a/clickhouse-admin/src/bin/clickhouse-admin-single.rs +++ b/clickhouse-admin/src/bin/clickhouse-admin-single.rs @@ -7,7 +7,7 @@ use anyhow::anyhow; use camino::Utf8PathBuf; use clap::Parser; -use omicron_clickhouse_admin::{ClickhouseCli, Config}; +use omicron_clickhouse_admin::Config; use omicron_common::cmd::fatal; use omicron_common::cmd::CmdError; use std::net::{SocketAddr, SocketAddrV6}; @@ -53,11 +53,9 @@ async fn main_impl() -> Result<(), CmdError> { let mut config = Config::from_file(&config) .map_err(|err| CmdError::Failure(anyhow!(err)))?; config.dropshot.bind_address = SocketAddr::V6(http_address); - let clickhouse_cli = - ClickhouseCli::new(binary_path, listen_address); - let server = omicron_clickhouse_admin::start_single_admin_server( - clickhouse_cli, + binary_path, + listen_address, config, ) .await diff --git a/clickhouse-admin/src/clickhouse_cli.rs b/clickhouse-admin/src/clickhouse_cli.rs index 3d8a1b076e..588ebe4290 100644 --- a/clickhouse-admin/src/clickhouse_cli.rs +++ b/clickhouse-admin/src/clickhouse_cli.rs @@ -84,19 +84,18 @@ impl Display for ClickhouseClientType { pub struct ClickhouseCli { /// Path to where the clickhouse binary is located pub binary_path: Utf8PathBuf, - /// Address on where the clickhouse keeper is listening on + /// Address at which the clickhouse keeper/server is listening pub listen_address: SocketAddrV6, - pub log: Option, + pub log: Logger, } impl ClickhouseCli { - pub fn new(binary_path: Utf8PathBuf, listen_address: SocketAddrV6) -> Self { - Self { binary_path, listen_address, log: None } - } - - pub fn with_log(mut self, log: Logger) -> Self { - self.log = Some(log); - self + pub fn new( + binary_path: Utf8PathBuf, + listen_address: SocketAddrV6, + log: Logger, + ) -> Self { + Self { binary_path, listen_address, log } } pub async fn lgif(&self) -> Result { @@ -105,7 +104,7 @@ impl ClickhouseCli { "lgif", "Retrieve logically grouped information file", Lgif::parse, - self.log.clone().unwrap(), + self.log.clone(), ) .await } @@ -116,7 +115,7 @@ impl ClickhouseCli { "get /keeper/config", "Retrieve raft configuration information", RaftConfig::parse, - self.log.clone().unwrap(), + self.log.clone(), ) .await } @@ -127,7 +126,7 @@ impl ClickhouseCli { "conf", "Retrieve keeper node configuration information", KeeperConf::parse, - self.log.clone().unwrap(), + self.log.clone(), ) .await } @@ -163,7 +162,7 @@ impl ClickhouseCli { "Retrieve information about distributed ddl queries (ON CLUSTER clause) that were executed on a cluster", DistributedDdlQueue::parse, - self.log.clone().unwrap(), + self.log.clone(), ) .await } @@ -172,7 +171,7 @@ impl ClickhouseCli { &self, settings: SystemTimeSeriesSettings, ) -> Result, ClickhouseCliError> { - let log = self.log.clone().unwrap(); + let log = self.log.clone(); let query = settings.query_avg(); debug!(&log, "Querying system database"; "query" => &query); diff --git a/clickhouse-admin/src/context.rs b/clickhouse-admin/src/context.rs index f423dafe1f..2469a347de 100644 --- a/clickhouse-admin/src/context.rs +++ b/clickhouse-admin/src/context.rs @@ -3,7 +3,10 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. use crate::{ClickhouseCli, Clickward}; +use omicron_common::address::CLICKHOUSE_TCP_PORT; +use oximeter_db::Client as OximeterClient; use slog::Logger; +use std::net::SocketAddrV6; use std::sync::Arc; use tokio::sync::Mutex; @@ -14,12 +17,10 @@ pub struct ServerContext { } impl ServerContext { - pub fn new( - clickward: Clickward, - clickhouse_cli: ClickhouseCli, - _log: Logger, - ) -> Self { - Self { clickward, clickhouse_cli, _log } + pub fn new(clickward: Clickward, clickhouse_cli: ClickhouseCli) -> Self { + let log = + clickhouse_cli.log.new(slog::o!("component" => "ServerContext")); + Self { clickward, clickhouse_cli, _log: log } } pub fn clickward(&self) -> &Clickward { @@ -33,18 +34,31 @@ impl ServerContext { pub struct SingleServerContext { clickhouse_cli: ClickhouseCli, + oximeter_client: OximeterClient, initialization_lock: Arc>, } impl SingleServerContext { pub fn new(clickhouse_cli: ClickhouseCli) -> Self { - Self { clickhouse_cli, initialization_lock: Arc::new(Mutex::new(())) } + let ip = clickhouse_cli.listen_address.ip(); + let address = SocketAddrV6::new(*ip, CLICKHOUSE_TCP_PORT, 0, 0); + let oximeter_client = + OximeterClient::new(address.into(), &clickhouse_cli.log); + Self { + clickhouse_cli, + oximeter_client, + initialization_lock: Arc::new(Mutex::new(())), + } } pub fn clickhouse_cli(&self) -> &ClickhouseCli { &self.clickhouse_cli } + pub fn oximeter_client(&self) -> &OximeterClient { + &self.oximeter_client + } + pub fn initialization_lock(&self) -> Arc> { self.initialization_lock.clone() } diff --git a/clickhouse-admin/src/http_entrypoints.rs b/clickhouse-admin/src/http_entrypoints.rs index 9379e8102f..a64b3a6435 100644 --- a/clickhouse-admin/src/http_entrypoints.rs +++ b/clickhouse-admin/src/http_entrypoints.rs @@ -15,10 +15,8 @@ use dropshot::{ HttpResponseUpdatedNoContent, Path, Query, RequestContext, TypedBody, }; use illumos_utils::svcadm::Svcadm; -use omicron_common::address::CLICKHOUSE_TCP_PORT; -use oximeter_db::{Client as OximeterClient, OXIMETER_VERSION}; -use slog::debug; -use std::net::SocketAddrV6; +use oximeter_db::OXIMETER_VERSION; +use slog::info; use std::sync::Arc; pub fn clickhouse_admin_server_api() -> ApiDescription> { @@ -146,28 +144,40 @@ impl ClickhouseAdminSingleApi for ClickhouseAdminSingleImpl { ) -> Result { let log = &rqctx.log; let ctx = rqctx.context(); - let ip = ctx.clickhouse_cli().listen_address.ip(); - let address = SocketAddrV6::new(*ip, CLICKHOUSE_TCP_PORT, 0, 0); - let client = OximeterClient::new(address.into(), log); - debug!( - log, - "initializing single-node ClickHouse \ - at {address} to version {OXIMETER_VERSION}" - ); // Database initialization is idempotent, but not concurrency-safe. // Use a mutex to serialize requests. let lock = ctx.initialization_lock(); let _guard = lock.lock().await; - client - .initialize_db_with_version(false, OXIMETER_VERSION) - .await - .map_err(|e| { - HttpError::for_internal_error(format!( - "can't initialize single-node ClickHouse \ - at {address} to version {OXIMETER_VERSION}: {e}", - )) - })?; + + // Initialize the database only if it was not previously initialized. + // TODO: Migrate schema to newer version without wiping data. + let client = ctx.oximeter_client(); + let version = client.read_latest_version().await.map_err(|e| { + HttpError::for_internal_error(format!( + "can't read ClickHouse version: {e}", + )) + })?; + if version == 0 { + info!( + log, + "initializing single-node ClickHouse to version {OXIMETER_VERSION}" + ); + ctx.oximeter_client() + .initialize_db_with_version(false, OXIMETER_VERSION) + .await + .map_err(|e| { + HttpError::for_internal_error(format!( + "can't initialize single-node ClickHouse \ + to version {OXIMETER_VERSION}: {e}", + )) + })?; + } else { + info!( + log, + "skipping initialization of single-node ClickHouse at version {version}" + ); + } Ok(HttpResponseUpdatedNoContent()) } diff --git a/clickhouse-admin/src/lib.rs b/clickhouse-admin/src/lib.rs index 0a79978e32..e0057609c2 100644 --- a/clickhouse-admin/src/lib.rs +++ b/clickhouse-admin/src/lib.rs @@ -2,6 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. +use camino::Utf8PathBuf; use context::{ServerContext, SingleServerContext}; use dropshot::HttpServer; use omicron_common::FileKv; @@ -9,6 +10,7 @@ use slog::{debug, error, Drain}; use slog_dtrace::ProbeRegistration; use slog_error_chain::SlogInlineError; use std::io; +use std::net::SocketAddrV6; use std::sync::Arc; mod clickhouse_cli; @@ -35,7 +37,8 @@ pub enum StartError { /// manages clickhouse replica servers. pub async fn start_server_admin_server( clickward: Clickward, - clickhouse_cli: ClickhouseCli, + binary_path: Utf8PathBuf, + listen_address: SocketAddrV6, server_config: Config, ) -> Result>, StartError> { let (drain, registration) = slog_dtrace::with_drain( @@ -56,13 +59,12 @@ pub async fn start_server_admin_server( } } - let context = ServerContext::new( - clickward, - clickhouse_cli - .with_log(log.new(slog::o!("component" => "ClickhouseCli"))), - log.new(slog::o!("component" => "ServerContext")), + let clickhouse_cli = ClickhouseCli::new( + binary_path, + listen_address, + log.new(slog::o!("component" => "ClickhouseCli")), ); - + let context = ServerContext::new(clickward, clickhouse_cli); dropshot::ServerBuilder::new( http_entrypoints::clickhouse_admin_server_api(), Arc::new(context), @@ -77,7 +79,8 @@ pub async fn start_server_admin_server( /// manages clickhouse replica servers. pub async fn start_keeper_admin_server( clickward: Clickward, - clickhouse_cli: ClickhouseCli, + binary_path: Utf8PathBuf, + listen_address: SocketAddrV6, server_config: Config, ) -> Result>, StartError> { let (drain, registration) = slog_dtrace::with_drain( @@ -98,13 +101,12 @@ pub async fn start_keeper_admin_server( } } - let context = ServerContext::new( - clickward, - clickhouse_cli - .with_log(log.new(slog::o!("component" => "ClickhouseCli"))), - log.new(slog::o!("component" => "ServerContext")), + let clickhouse_cli = ClickhouseCli::new( + binary_path, + listen_address, + log.new(slog::o!("component" => "ClickhouseCli")), ); - + let context = ServerContext::new(clickward, clickhouse_cli); dropshot::ServerBuilder::new( http_entrypoints::clickhouse_admin_keeper_api(), Arc::new(context), @@ -118,7 +120,8 @@ pub async fn start_keeper_admin_server( /// Start the dropshot server for `clickhouse-admin-single` which /// manages a single-node ClickHouse database. pub async fn start_single_admin_server( - clickhouse_cli: ClickhouseCli, + binary_path: Utf8PathBuf, + listen_address: SocketAddrV6, server_config: Config, ) -> Result>, StartError> { let (drain, registration) = slog_dtrace::with_drain( @@ -139,6 +142,11 @@ pub async fn start_single_admin_server( } } + let clickhouse_cli = ClickhouseCli::new( + binary_path, + listen_address, + log.new(slog::o!("component" => "ClickhouseCli")), + ); let context = SingleServerContext::new(clickhouse_cli); dropshot::ServerBuilder::new( http_entrypoints::clickhouse_admin_single_api(), diff --git a/clickhouse-admin/tests/integration_test.rs b/clickhouse-admin/tests/integration_test.rs index 7c0d12b74b..2715afd9cf 100644 --- a/clickhouse-admin/tests/integration_test.rs +++ b/clickhouse-admin/tests/integration_test.rs @@ -42,8 +42,6 @@ fn get_keeper_raft_port(keeper_id: KeeperId) -> u16 { #[tokio::test] async fn test_lgif_parsing() -> anyhow::Result<()> { - let log = log(); - let clickhouse_cli = ClickhouseCli::new( Utf8PathBuf::from_str("clickhouse")?, SocketAddrV6::new( @@ -52,8 +50,8 @@ async fn test_lgif_parsing() -> anyhow::Result<()> { 0, 0, ), - ) - .with_log(log); + log(), + ); let lgif = clickhouse_cli.lgif().await.unwrap(); @@ -65,8 +63,6 @@ async fn test_lgif_parsing() -> anyhow::Result<()> { #[tokio::test] async fn test_raft_config_parsing() -> anyhow::Result<()> { - let log = log(); - let clickhouse_cli = ClickhouseCli::new( Utf8PathBuf::from_str("clickhouse").unwrap(), SocketAddrV6::new( @@ -75,8 +71,8 @@ async fn test_raft_config_parsing() -> anyhow::Result<()> { 0, 0, ), - ) - .with_log(log); + log(), + ); let raft_config = clickhouse_cli.raft_config().await.unwrap(); @@ -103,8 +99,6 @@ async fn test_raft_config_parsing() -> anyhow::Result<()> { #[tokio::test] async fn test_keeper_conf_parsing() -> anyhow::Result<()> { - let log = log(); - let clickhouse_cli = ClickhouseCli::new( Utf8PathBuf::from_str("clickhouse").unwrap(), SocketAddrV6::new( @@ -113,8 +107,8 @@ async fn test_keeper_conf_parsing() -> anyhow::Result<()> { 0, 0, ), - ) - .with_log(log); + log(), + ); let conf = clickhouse_cli.keeper_conf().await.unwrap(); @@ -125,8 +119,6 @@ async fn test_keeper_conf_parsing() -> anyhow::Result<()> { #[tokio::test] async fn test_keeper_cluster_membership() -> anyhow::Result<()> { - let log = log(); - let clickhouse_cli = ClickhouseCli::new( Utf8PathBuf::from_str("clickhouse").unwrap(), SocketAddrV6::new( @@ -135,8 +127,8 @@ async fn test_keeper_cluster_membership() -> anyhow::Result<()> { 0, 0, ), - ) - .with_log(log); + log(), + ); let keeper_cluster_membership = clickhouse_cli.keeper_cluster_membership().await.unwrap();