diff --git a/.gitignore b/.gitignore index 39a8f36144..6e4e0eb42a 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,4 @@ tags .falcon/* .img/* connectivity-report.json +*.local diff --git a/Cargo.lock b/Cargo.lock index 2b6a918590..b27115b42a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5641,6 +5641,7 @@ dependencies = [ "omicron-uuid-kinds", "omicron-workspace-hack", "oximeter-client", + "oximeter-db", "pq-sys", "ratatui", "reedline", @@ -5655,6 +5656,7 @@ dependencies = [ "textwrap", "tokio", "unicode-width", + "url", "uuid", ] diff --git a/Cargo.toml b/Cargo.toml index f217bcd169..06d3ef3130 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -384,7 +384,7 @@ oso = "0.27" owo-colors = "4.0.0" oximeter = { path = "oximeter/oximeter" } oximeter-client = { path = "clients/oximeter-client" } -oximeter-db = { path = "oximeter/db/" } +oximeter-db = { path = "oximeter/db/", default-features = false } oximeter-collector = { path = "oximeter/collector" } oximeter-impl = { path = "oximeter/impl" } oximeter-instruments = { path = "oximeter/instruments" } diff --git a/dev-tools/omdb/Cargo.toml b/dev-tools/omdb/Cargo.toml index 9cdf03093c..e5d898509c 100644 --- a/dev-tools/omdb/Cargo.toml +++ b/dev-tools/omdb/Cargo.toml @@ -37,6 +37,7 @@ nexus-types.workspace = true omicron-common.workspace = true omicron-uuid-kinds.workspace = true oximeter-client.workspace = true +oximeter-db = { workspace = true, default-features = false, features = [ "oxql" ] } # See omicron-rpaths for more about the "pq-sys" dependency. pq-sys = "*" ratatui.workspace = true @@ -51,6 +52,7 @@ tabled.workspace = true textwrap.workspace = true tokio = { workspace = true, features = [ "full" ] } unicode-width.workspace = true +url.workspace = true uuid.workspace = true ipnetwork.workspace = true omicron-workspace-hack.workspace = true diff --git a/dev-tools/omdb/src/bin/omdb/main.rs b/dev-tools/omdb/src/bin/omdb/main.rs index 7469e2ba54..8fc48f5028 100644 --- a/dev-tools/omdb/src/bin/omdb/main.rs +++ b/dev-tools/omdb/src/bin/omdb/main.rs @@ -50,6 +50,7 @@ mod helpers; mod mgs; mod nexus; mod oximeter; +mod oxql; mod sled_agent; #[tokio::main] @@ -66,7 +67,8 @@ async fn main() -> Result<(), anyhow::Error> { OmdbCommands::Db(db) => db.run_cmd(&args, &log).await, OmdbCommands::Mgs(mgs) => mgs.run_cmd(&args, &log).await, OmdbCommands::Nexus(nexus) => nexus.run_cmd(&args, &log).await, - OmdbCommands::Oximeter(oximeter) => oximeter.run_cmd(&log).await, + OmdbCommands::Oximeter(oximeter) => oximeter.run_cmd(&args, &log).await, + OmdbCommands::Oxql(oxql) => oxql.run_cmd(&args, &log).await, OmdbCommands::SledAgent(sled) => sled.run_cmd(&args, &log).await, OmdbCommands::CrucibleAgent(crucible) => crucible.run_cmd(&args).await, } @@ -269,6 +271,8 @@ enum OmdbCommands { Nexus(nexus::NexusArgs), /// Query oximeter collector state Oximeter(oximeter::OximeterArgs), + /// Enter the Oximeter Query Language shell for interactive querying. + Oxql(oxql::OxqlArgs), /// Debug a specific Sled SledAgent(sled_agent::SledAgentArgs), } diff --git a/dev-tools/omdb/src/bin/omdb/oximeter.rs b/dev-tools/omdb/src/bin/omdb/oximeter.rs index a6dc2ce011..c068110b4c 100644 --- a/dev-tools/omdb/src/bin/omdb/oximeter.rs +++ b/dev-tools/omdb/src/bin/omdb/oximeter.rs @@ -5,6 +5,7 @@ //! omdb commands that query oximeter use crate::helpers::CONNECTION_OPTIONS_HEADING; +use crate::Omdb; use anyhow::Context; use clap::Args; use clap::Subcommand; @@ -18,18 +19,17 @@ use tabled::Table; use tabled::Tabled; use uuid::Uuid; +/// Arguments for the oximeter subcommand. #[derive(Debug, Args)] pub struct OximeterArgs { /// URL of the oximeter collector to query #[arg( long, env = "OMDB_OXIMETER_URL", - // This can't be global = true (i.e. passed in later in the - // command-line) because global options can't be required. If this - // changes to being optional, we should set global = true. + global = true, help_heading = CONNECTION_OPTIONS_HEADING, )] - oximeter_url: String, + oximeter_url: Option, #[command(subcommand)] command: OximeterCommands, @@ -38,20 +38,47 @@ pub struct OximeterArgs { /// Subcommands that query oximeter collector state #[derive(Debug, Subcommand)] enum OximeterCommands { - /// List the producers the collector is assigned to poll + /// List the producers the collector is assigned to poll. ListProducers, } impl OximeterArgs { - fn client(&self, log: &Logger) -> Client { - Client::new( - &self.oximeter_url, + async fn client( + &self, + omdb: &Omdb, + log: &Logger, + ) -> Result { + let oximeter_url = match &self.oximeter_url { + Some(cli_or_env_url) => cli_or_env_url.clone(), + None => { + eprintln!( + "note: Oximeter URL not specified. Will pick one from DNS." + ); + let addr = omdb + .dns_lookup_one( + log.clone(), + internal_dns::ServiceName::Oximeter, + ) + .await?; + format!("http://{}", addr) + } + }; + eprintln!("note: using Oximeter URL {}", &oximeter_url); + + let client = Client::new( + &oximeter_url, log.new(slog::o!("component" => "oximeter-client")), - ) + ); + Ok(client) } - pub async fn run_cmd(&self, log: &Logger) -> anyhow::Result<()> { - let client = self.client(log); + /// Run the command. + pub async fn run_cmd( + &self, + omdb: &Omdb, + log: &Logger, + ) -> anyhow::Result<()> { + let client = self.client(omdb, log).await?; match self.command { OximeterCommands::ListProducers => { self.list_producers(client).await diff --git a/dev-tools/omdb/src/bin/omdb/oxql.rs b/dev-tools/omdb/src/bin/omdb/oxql.rs new file mode 100644 index 0000000000..89ddae9cf2 --- /dev/null +++ b/dev-tools/omdb/src/bin/omdb/oxql.rs @@ -0,0 +1,97 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! omdb OxQL shell for interactive queries on metrics/timeseries. + +// Copyright 2024 Oxide Computer + +use crate::helpers::CONNECTION_OPTIONS_HEADING; +use crate::Omdb; +use anyhow::Context; +use clap::Args; +use oximeter_db::{ + self, + shells::oxql::{self, ShellOptions}, +}; +use slog::Logger; +use std::net::SocketAddr; +use url::Url; + +/// Command-line arguments for the OxQL shell. +#[derive(Debug, Args)] +pub struct OxqlArgs { + /// URL of the ClickHouse server to connect to. + #[arg( + long, + env = "OMDB_CLICKHOUSE_URL", + global = true, + help_heading = CONNECTION_OPTIONS_HEADING, + )] + clickhouse_url: Option, + + /// Print summaries of each SQL query run against the database. + #[clap(long = "summaries")] + print_summaries: bool, + + /// Print the total elapsed query duration. + #[clap(long = "elapsed")] + print_elapsed: bool, +} + +impl OxqlArgs { + /// Run the OxQL shell via the `omdb oxql` subcommand. + pub async fn run_cmd( + &self, + omdb: &Omdb, + log: &Logger, + ) -> anyhow::Result<()> { + let addr = self.addr(omdb, log).await?; + + let opts = ShellOptions { + print_summaries: self.print_summaries, + print_elapsed: self.print_elapsed, + }; + + oxql::shell( + addr.ip(), + addr.port(), + log.new(slog::o!("component" => "clickhouse-client")), + opts, + ) + .await + } + + /// Resolve the ClickHouse URL to a socket address. + async fn addr( + &self, + omdb: &Omdb, + log: &Logger, + ) -> anyhow::Result { + match &self.clickhouse_url { + Some(cli_or_env_url) => Url::parse(&cli_or_env_url) + .context( + "failed parsing URL from command-line or environment variable", + )? + .socket_addrs(|| None) + .context("failed resolving socket addresses")? + .into_iter() + .next() + .context("failed resolving socket addresses"), + None => { + eprintln!( + "note: ClickHouse URL not specified. Will pick one from DNS." + ); + + Ok(SocketAddr::V6( + omdb.dns_lookup_one( + log.clone(), + internal_dns::ServiceName::Clickhouse, + ) + .await + .context("failed looking up ClickHouse internal DNS entry")?, + )) + } + } + } +} diff --git a/dev-tools/omdb/tests/env.out b/dev-tools/omdb/tests/env.out index 348ff5e9ac..66a48ab394 100644 --- a/dev-tools/omdb/tests/env.out +++ b/dev-tools/omdb/tests/env.out @@ -433,3 +433,28 @@ note: using database URL postgresql://root@[::1]:REDACTED_PORT/omicron?sslmode=d note: database schema version matches expected () note: listing all commissioned sleds (use -F to filter, e.g. -F in-service) ============================================= +EXECUTING COMMAND: omdb ["oximeter", "--oximeter-url", "junk", "list-producers"] +termination: Exited(1) +--------------------------------------------- +stdout: +--------------------------------------------- +stderr: +note: using Oximeter URL junk +Error: failed to fetch collector info + +Caused by: + 0: Communication Error: builder error: relative URL without a base + 1: builder error: relative URL without a base + 2: relative URL without a base +============================================= +EXECUTING COMMAND: omdb ["oxql", "--clickhouse-url", "junk"] +termination: Exited(1) +--------------------------------------------- +stdout: +--------------------------------------------- +stderr: +Error: failed parsing URL from command-line or environment variable + +Caused by: + relative URL without a base +============================================= diff --git a/dev-tools/omdb/tests/successes.out b/dev-tools/omdb/tests/successes.out index b5147b66f9..a65098d7aa 100644 --- a/dev-tools/omdb/tests/successes.out +++ b/dev-tools/omdb/tests/successes.out @@ -405,14 +405,14 @@ task: "dns_propagation_external" task: "nat_v4_garbage_collector" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms last completion reported error: failed to resolve addresses for Dendrite services: no record found for Query { name: Name("_dendrite._tcp.control-plane.oxide.internal."), query_type: SRV, query_class: IN } task: "blueprint_loader" - configured period: every 1m 40s + configured period: every 1m s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms @@ -436,7 +436,7 @@ task: "abandoned_vmm_reaper" sled resource reservations deleted: 0 task: "bfd_manager" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms @@ -467,7 +467,7 @@ task: "external_endpoints" TLS certificates: 0 task: "instance_watcher" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms @@ -503,7 +503,7 @@ task: "metrics_producer_gc" warning: unknown background task: "metrics_producer_gc" (don't know how to interpret details: Object {"expiration": String(""), "pruned": Array []}) task: "phantom_disks" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms @@ -511,14 +511,14 @@ task: "phantom_disks" number of phantom disk delete errors: 0 task: "physical_disk_adoption" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a dependent task completing started at (s ago) and ran for ms last completion reported error: task disabled task: "region_replacement" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms @@ -526,7 +526,7 @@ task: "region_replacement" number of region replacement start errors: 0 task: "region_replacement_driver" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms @@ -541,28 +541,28 @@ task: "service_firewall_rule_propagation" started at (s ago) and ran for ms task: "service_zone_nat_tracker" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms last completion reported error: inventory collection is None task: "switch_port_config_manager" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms warning: unknown background task: "switch_port_config_manager" (don't know how to interpret details: Object {}) task: "v2p_manager" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms warning: unknown background task: "v2p_manager" (don't know how to interpret details: Object {}) task: "vpc_route_manager" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms diff --git a/dev-tools/omdb/tests/test_all_output.rs b/dev-tools/omdb/tests/test_all_output.rs index 19be33631d..6a959d726a 100644 --- a/dev-tools/omdb/tests/test_all_output.rs +++ b/dev-tools/omdb/tests/test_all_output.rs @@ -8,6 +8,7 @@ //! sure you're only breaking what you intend. use expectorate::assert_contents; +use nexus_test_utils::{OXIMETER_UUID, PRODUCER_UUID}; use nexus_test_utils_macros::nexus_test; use nexus_types::deployment::SledFilter; use nexus_types::deployment::UnstableReconfiguratorState; @@ -17,6 +18,7 @@ use omicron_test_utils::dev::test_cmds::run_command; use omicron_test_utils::dev::test_cmds::ExtraRedactions; use slog_error_chain::InlineErrorChain; use std::fmt::Write; +use std::net::IpAddr; use std::path::Path; use subprocess::Exec; @@ -26,6 +28,32 @@ const CMD_OMDB: &str = env!("CARGO_BIN_EXE_omdb"); type ControlPlaneTestContext = nexus_test_utils::ControlPlaneTestContext; +/// The `oximeter` list-producers command output is not easy to compare as a +/// string directly because the timing of registrations with both our test +/// producer and the one nexus registers. But, let's find our test producer +/// in the list. +fn assert_oximeter_list_producers_output( + output: &str, + ox_url: &str, + test_producer: IpAddr, +) { + assert!( + output.contains(format!("Collector ID: {}", OXIMETER_UUID).as_str()) + ); + assert!(output.contains(ox_url)); + + let found = output.lines().any(|line| { + line.contains(PRODUCER_UUID) + && line.contains(&test_producer.to_string()) + }); + + assert!( + found, + "test producer {} and producer UUID {} not found on line together", + test_producer, PRODUCER_UUID + ); +} + #[tokio::test] async fn test_omdb_usage_errors() { let cmd_path = path_to_executable(CMD_OMDB); @@ -57,6 +85,10 @@ async fn test_omdb_usage_errors() { &["sled-agent"], &["sled-agent", "zones"], &["sled-agent", "zpools"], + &["oximeter", "--help"], + &["oxql", "--help"], + // Mispelled argument + &["oxql", "--summarizes"], ]; for args in invocations { @@ -74,10 +106,15 @@ async fn test_omdb_success_cases(cptestctx: &ControlPlaneTestContext) { ) .await; let cmd_path = path_to_executable(CMD_OMDB); + let postgres_url = cptestctx.database.listen_url(); let nexus_internal_url = format!("http://{}/", cptestctx.internal_client.bind_address); let mgs_url = format!("http://{}/", gwtestctx.client.bind_address); + let ox_url = format!("http://{}/", cptestctx.oximeter.server_address()); + let ox_test_producer = cptestctx.producer.address().ip(); + let ch_url = format!("http://{}/", cptestctx.clickhouse.address); + let tmpdir = camino_tempfile::tempdir() .expect("failed to create temporary directory"); let tmppath = tmpdir.path().join("reconfigurator-save.out"); @@ -124,18 +161,24 @@ async fn test_omdb_success_cases(cptestctx: &ControlPlaneTestContext) { let p = postgres_url.to_string(); let u = nexus_internal_url.clone(); let g = mgs_url.clone(); + let ox = ox_url.clone(); + let ch = ch_url.clone(); do_run_extra( &mut output, move |exec| { exec.env("OMDB_DB_URL", &p) .env("OMDB_NEXUS_URL", &u) .env("OMDB_MGS_URL", &g) + .env("OMDB_OXIMETER_URL", &ox) + .env("OMDB_CLICKHOUSE_URL", &ch) }, &cmd_path, args, - ExtraRedactions::new() - .variable_length("tmp_path", tmppath.as_str()) - .fixed_length("blueprint_id", &initial_blueprint_id), + Some( + ExtraRedactions::new() + .variable_length("tmp_path", tmppath.as_str()) + .fixed_length("blueprint_id", &initial_blueprint_id), + ), ) .await; } @@ -170,6 +213,23 @@ async fn test_omdb_success_cases(cptestctx: &ControlPlaneTestContext) { .is_some()); assert!(!parsed.collections.is_empty()); + let ox_invocation = &["oximeter", "list-producers"]; + let mut ox_output = String::new(); + let ox = ox_url.clone(); + + do_run_no_redactions( + &mut ox_output, + move |exec| exec.env("OMDB_OXIMETER_URL", &ox), + &cmd_path, + ox_invocation, + ) + .await; + assert_oximeter_list_producers_output( + &ox_output, + &ox_url, + ox_test_producer, + ); + gwtestctx.teardown().await; } @@ -188,6 +248,9 @@ async fn test_omdb_env_settings(cptestctx: &ControlPlaneTestContext) { let postgres_url = cptestctx.database.listen_url().to_string(); let nexus_internal_url = format!("http://{}", cptestctx.internal_client.bind_address); + let ox_url = format!("http://{}/", cptestctx.oximeter.server_address()); + let ox_test_producer = cptestctx.producer.address().ip(); + let ch_url = format!("http://{}/", cptestctx.clickhouse.address); let dns_sockaddr = cptestctx.internal_dns.dns_server.local_address(); let mut output = String::new(); @@ -263,7 +326,46 @@ async fn test_omdb_env_settings(cptestctx: &ControlPlaneTestContext) { let args = &["--dns-server", &dns_sockaddr.to_string(), "db", "sleds"]; do_run(&mut output, move |exec| exec, &cmd_path, args).await; + // Case: specified in multiple places (command-line argument wins) + let args = &["oximeter", "--oximeter-url", "junk", "list-producers"]; + let ox = ox_url.clone(); + do_run( + &mut output, + move |exec| exec.env("OMDB_OXIMETER_URL", &ox), + &cmd_path, + args, + ) + .await; + + // Case: specified in multiple places (command-line argument wins) + let args = &["oxql", "--clickhouse-url", "junk"]; + do_run( + &mut output, + move |exec| exec.env("OMDB_CLICKHOUSE_URL", &ch_url), + &cmd_path, + args, + ) + .await; + assert_contents("tests/env.out", &output); + + // Oximeter URL + // Case 1: specified on the command line. + // Case 2: is covered by the success tests above. + let ox_args1 = &["oximeter", "--oximeter-url", &ox_url, "list-producers"]; + let mut ox_output1 = String::new(); + do_run_no_redactions( + &mut ox_output1, + move |exec| exec, + &cmd_path, + ox_args1, + ) + .await; + assert_oximeter_list_producers_output( + &ox_output1, + &ox_url, + ox_test_producer, + ); } async fn do_run( @@ -274,8 +376,25 @@ async fn do_run( ) where F: FnOnce(Exec) -> Exec + Send + 'static, { - do_run_extra(output, modexec, cmd_path, args, &ExtraRedactions::new()) - .await; + do_run_extra( + output, + modexec, + cmd_path, + args, + Some(&ExtraRedactions::new()), + ) + .await; +} + +async fn do_run_no_redactions( + output: &mut String, + modexec: F, + cmd_path: &Path, + args: &[&str], +) where + F: FnOnce(Exec) -> Exec + Send + 'static, +{ + do_run_extra(output, modexec, cmd_path, args, None).await; } async fn do_run_extra( @@ -283,18 +402,22 @@ async fn do_run_extra( modexec: F, cmd_path: &Path, args: &[&str], - extra_redactions: &ExtraRedactions<'_>, + extra_redactions: Option<&ExtraRedactions<'_>>, ) where F: FnOnce(Exec) -> Exec + Send + 'static, { - println!("running command with args: {:?}", args); write!( output, "EXECUTING COMMAND: {} {:?}\n", cmd_path.file_name().expect("missing command").to_string_lossy(), args.iter() - .map(|r| redact_extra(r, extra_redactions)) - .collect::>(), + .map(|r| { + extra_redactions.map_or_else( + || r.to_string(), + |redactions| redact_extra(r, redactions), + ) + }) + .collect::>() ) .unwrap(); @@ -326,9 +449,21 @@ async fn do_run_extra( write!(output, "termination: {:?}\n", exit_status).unwrap(); write!(output, "---------------------------------------------\n").unwrap(); write!(output, "stdout:\n").unwrap(); - output.push_str(&redact_extra(&stdout_text, extra_redactions)); + + if let Some(extra_redactions) = extra_redactions { + output.push_str(&redact_extra(&stdout_text, extra_redactions)); + } else { + output.push_str(&stdout_text); + } + write!(output, "---------------------------------------------\n").unwrap(); write!(output, "stderr:\n").unwrap(); - output.push_str(&redact_extra(&stderr_text, extra_redactions)); + + if let Some(extra_redactions) = extra_redactions { + output.push_str(&redact_extra(&stderr_text, extra_redactions)); + } else { + output.push_str(&stderr_text); + } + write!(output, "=============================================\n").unwrap(); } diff --git a/dev-tools/omdb/tests/usage_errors.out b/dev-tools/omdb/tests/usage_errors.out index b4679001fa..8762907e81 100644 --- a/dev-tools/omdb/tests/usage_errors.out +++ b/dev-tools/omdb/tests/usage_errors.out @@ -14,6 +14,7 @@ Commands: mgs Debug a specific Management Gateway Service instance nexus Debug a specific Nexus instance oximeter Query oximeter collector state + oxql Enter the Oximeter Query Language shell for interactive querying sled-agent Debug a specific Sled help Print this message or the help of the given subcommand(s) @@ -44,6 +45,7 @@ Commands: mgs Debug a specific Management Gateway Service instance nexus Debug a specific Nexus instance oximeter Query oximeter collector state + oxql Enter the Oximeter Query Language shell for interactive querying sled-agent Debug a specific Sled help Print this message or the help of the given subcommand(s) @@ -615,3 +617,68 @@ Connection Options: Safety Options: -w, --destructive Allow potentially-destructive subcommands ============================================= +EXECUTING COMMAND: omdb ["oximeter", "--help"] +termination: Exited(0) +--------------------------------------------- +stdout: +Query oximeter collector state + +Usage: omdb oximeter [OPTIONS] + +Commands: + list-producers List the producers the collector is assigned to poll + help Print this message or the help of the given subcommand(s) + +Options: + --log-level log level filter [env: LOG_LEVEL=] [default: warn] + -h, --help Print help + +Connection Options: + --oximeter-url URL of the oximeter collector to query [env: + OMDB_OXIMETER_URL=] + --dns-server [env: OMDB_DNS_SERVER=] + +Safety Options: + -w, --destructive Allow potentially-destructive subcommands +--------------------------------------------- +stderr: +============================================= +EXECUTING COMMAND: omdb ["oxql", "--help"] +termination: Exited(0) +--------------------------------------------- +stdout: +Enter the Oximeter Query Language shell for interactive querying + +Usage: omdb oxql [OPTIONS] + +Options: + --log-level log level filter [env: LOG_LEVEL=] [default: warn] + --summaries Print summaries of each SQL query run against the database + --elapsed Print the total elapsed query duration + -h, --help Print help + +Connection Options: + --clickhouse-url + URL of the ClickHouse server to connect to [env: OMDB_CLICKHOUSE_URL=] + --dns-server + [env: OMDB_DNS_SERVER=] + +Safety Options: + -w, --destructive Allow potentially-destructive subcommands +--------------------------------------------- +stderr: +============================================= +EXECUTING COMMAND: omdb ["oxql", "--summarizes"] +termination: Exited(2) +--------------------------------------------- +stdout: +--------------------------------------------- +stderr: +error: unexpected argument '--summarizes' found + + tip: a similar argument exists: '--summaries' + +Usage: omdb oxql <--clickhouse-url |--summaries|--elapsed> + +For more information, try '--help'. +============================================= diff --git a/nexus-config/src/nexus_config.rs b/nexus-config/src/nexus_config.rs index 5b7069d06f..5ca1d2d6ed 100644 --- a/nexus-config/src/nexus_config.rs +++ b/nexus-config/src/nexus_config.rs @@ -6,16 +6,14 @@ //! at deployment time. use crate::PostgresConfigWithUrl; - -use omicron_common::address::Ipv6Subnet; -use omicron_common::address::NEXUS_TECHPORT_EXTERNAL_PORT; -use omicron_common::address::RACK_PREFIX; -use omicron_common::api::internal::shared::SwitchLocation; - use anyhow::anyhow; use camino::{Utf8Path, Utf8PathBuf}; use dropshot::ConfigDropshot; use dropshot::ConfigLogging; +use omicron_common::address::Ipv6Subnet; +use omicron_common::address::NEXUS_TECHPORT_EXTERNAL_PORT; +use omicron_common::address::RACK_PREFIX; +use omicron_common::api::internal::shared::SwitchLocation; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_with::serde_as; diff --git a/oximeter/collector/src/lib.rs b/oximeter/collector/src/lib.rs index 367a2066a1..02bf9152f4 100644 --- a/oximeter/collector/src/lib.rs +++ b/oximeter/collector/src/lib.rs @@ -409,4 +409,9 @@ impl Oximeter { pub fn collector_id(&self) -> &Uuid { &self.agent.id } + + /// Return the address of the server. + pub fn server_address(&self) -> SocketAddr { + self.server.local_addr() + } } diff --git a/oximeter/db/Cargo.toml b/oximeter/db/Cargo.toml index c446bc7822..3811f45be0 100644 --- a/oximeter/db/Cargo.toml +++ b/oximeter/db/Cargo.toml @@ -77,14 +77,14 @@ optional = true workspace = true optional = true -[dependencies.tokio] -workspace = true -features = [ "rt-multi-thread", "macros" ] - [dependencies.tabled] workspace = true optional = true +[dependencies.tokio] +workspace = true +features = [ "rt-multi-thread", "macros" ] + [dev-dependencies] expectorate.workspace = true indexmap.workspace = true diff --git a/oximeter/db/src/bin/oxdb/main.rs b/oximeter/db/src/bin/oxdb/main.rs index ca11dd18a3..871135fb16 100644 --- a/oximeter/db/src/bin/oxdb/main.rs +++ b/oximeter/db/src/bin/oxdb/main.rs @@ -2,7 +2,8 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -//! Tool for developing against the Oximeter timeseries database, populating data and querying. +//! CLI-Tool for developing against the Oximeter timeseries database, populating +//! data and querying. // Copyright 2024 Oxide Computer Company @@ -13,20 +14,13 @@ use oximeter::{ types::{Cumulative, Sample}, Metric, Target, }; -use oximeter_db::{query, Client, DbWrite}; +use oximeter_db::{query, shells::make_client, Client, DbWrite}; use slog::{debug, info, o, Drain, Level, Logger}; use std::net::IpAddr; -use std::net::SocketAddr; use uuid::Uuid; -#[cfg(feature = "sql")] -mod sql; - -#[cfg(feature = "oxql")] -mod oxql; - -// Samples are inserted in chunks of this size, to avoid large allocations when inserting huge -// numbers of timeseries. +/// Samples are inserted in chunks of this size, to avoid large allocations when inserting huge +/// numbers of timeseries. const INSERT_CHUNK_SIZE: usize = 100_000; /// A target identifying a single virtual machine instance @@ -150,31 +144,16 @@ enum Subcommand { #[cfg(feature = "sql")] Sql { #[clap(flatten)] - opts: crate::sql::ShellOptions, + opts: oximeter_db::shells::sql::ShellOptions, }, /// Enter the Oximeter Query Language shell for interactive querying. - #[cfg(feature = "oxql")] Oxql { #[clap(flatten)] - opts: crate::oxql::ShellOptions, + opts: oximeter_db::shells::oxql::ShellOptions, }, } -async fn make_client( - address: IpAddr, - port: u16, - log: &Logger, -) -> Result { - let address = SocketAddr::new(address, port); - let client = Client::new(address, &log); - client - .init_single_node_db() - .await - .context("Failed to initialize timeseries database")?; - Ok(client) -} - fn describe_data() { let vm = VirtualMachine::new(); print!("Target:\n\n Name: {target_name:?}\n", target_name = vm.name()); @@ -368,11 +347,12 @@ async fn main() -> anyhow::Result<()> { } #[cfg(feature = "sql")] Subcommand::Sql { opts } => { - crate::sql::sql_shell(args.address, args.port, log, opts).await? + oximeter_db::shells::sql::shell(args.address, args.port, log, opts) + .await? } - #[cfg(feature = "oxql")] Subcommand::Oxql { opts } => { - crate::oxql::oxql_shell(args.address, args.port, log, opts).await? + oximeter_db::shells::oxql::shell(args.address, args.port, log, opts) + .await? } } Ok(()) diff --git a/oximeter/db/src/client/mod.rs b/oximeter/db/src/client/mod.rs index 2d6212971e..0c372cedae 100644 --- a/oximeter/db/src/client/mod.rs +++ b/oximeter/db/src/client/mod.rs @@ -7,7 +7,6 @@ // Copyright 2024 Oxide Computer Company pub(crate) mod dbwrite; -#[cfg(any(feature = "oxql", test))] pub(crate) mod oxql; pub(crate) mod query_summary; #[cfg(any(feature = "sql", test))] diff --git a/oximeter/db/src/lib.rs b/oximeter/db/src/lib.rs index c471a837ea..bbf29653e9 100644 --- a/oximeter/db/src/lib.rs +++ b/oximeter/db/src/lib.rs @@ -32,13 +32,14 @@ use thiserror::Error; mod client; pub mod model; -#[cfg(feature = "oxql")] +#[cfg(any(feature = "oxql", test))] pub mod oxql; pub mod query; +#[cfg(any(feature = "oxql", feature = "sql", test))] +pub mod shells; #[cfg(any(feature = "sql", test))] pub mod sql; -#[cfg(feature = "oxql")] pub use client::oxql::OxqlResult; pub use client::query_summary::QuerySummary; pub use client::Client; @@ -141,12 +142,10 @@ pub enum Error { #[error("SQL error")] Sql(#[from] sql::Error), - #[cfg(any(feature = "oxql", test))] #[error(transparent)] Oxql(oxql::Error), } -#[cfg(any(feature = "oxql", test))] impl From for Error { fn from(e: crate::oxql::Error) -> Self { Error::Oxql(e) diff --git a/oximeter/db/src/oxql/ast/table_ops/filter.rs b/oximeter/db/src/oxql/ast/table_ops/filter.rs index 9e796bc730..b6fc533e4d 100644 --- a/oximeter/db/src/oxql/ast/table_ops/filter.rs +++ b/oximeter/db/src/oxql/ast/table_ops/filter.rs @@ -16,10 +16,10 @@ use crate::oxql::point::DataType; use crate::oxql::point::MetricType; use crate::oxql::point::Points; use crate::oxql::point::ValueArray; -use crate::oxql::query::special_idents; use crate::oxql::Error; use crate::oxql::Table; use crate::oxql::Timeseries; +use crate::shells::special_idents; use chrono::DateTime; use chrono::Utc; use oximeter::FieldType; @@ -61,7 +61,7 @@ impl core::str::FromStr for Filter { const EXPR_COMPLEXITY_ITERATIVE_LIMIT: usize = 32; // A crude limit on expression complexity, governing how many times we -// recurisvely apply a DNF simplification before bailing out. +// recursively apply a DNF simplification before bailing out. const EXPR_COMPLEXITY_RECURSIVE_LIMIT: usize = 32; impl Filter { diff --git a/oximeter/db/src/oxql/query/mod.rs b/oximeter/db/src/oxql/query/mod.rs index 40a6c82f93..e1fada9f2a 100644 --- a/oximeter/db/src/oxql/query/mod.rs +++ b/oximeter/db/src/oxql/query/mod.rs @@ -25,63 +25,6 @@ use chrono::DateTime; use chrono::Utc; use std::time::Duration; -/// Special identifiers for column names or other widely-used values. -pub mod special_idents { - use oximeter::DatumType; - - macro_rules! gen_marker { - ($p:expr, $field:expr) => { - concat!("p", $p, "_", $field) - }; - } - - pub const TIMESTAMP: &str = "timestamp"; - pub const START_TIME: &str = "start_time"; - pub const DATUM: &str = "datum"; - pub const BINS: &str = "bins"; - pub const COUNTS: &str = "counts"; - pub const MIN: &str = "min"; - pub const MAX: &str = "max"; - pub const SUM_OF_SAMPLES: &str = "sum_of_samples"; - pub const SQUARED_MEAN: &str = "squared_mean"; - pub const DATETIME64: &str = "DateTime64"; - pub const ARRAYU64: &str = "Array[u64]"; - pub const ARRAYFLOAT64: &str = "Array[f64]"; - pub const ARRAYINT64: &str = "Array[i64]"; - pub const FLOAT64: &str = "f64"; - pub const UINT64: &str = "u64"; - - pub const DISTRIBUTION_IDENTS: [&str; 15] = [ - "bins", - "counts", - "min", - "max", - "sum_of_samples", - "squared_mean", - gen_marker!("50", "marker_heights"), - gen_marker!("50", "marker_positions"), - gen_marker!("50", "desired_marker_positions"), - gen_marker!("90", "marker_heights"), - gen_marker!("90", "marker_positions"), - gen_marker!("90", "desired_marker_positions"), - gen_marker!("99", "marker_heights"), - gen_marker!("99", "marker_positions"), - gen_marker!("99", "desired_marker_positions"), - ]; - - pub fn array_type_name_from_histogram_type( - type_: DatumType, - ) -> Option { - if !type_.is_histogram() { - return None; - } - Some(format!( - "Array[{}]", - type_.to_string().strip_prefix("Histogram").unwrap().to_lowercase(), - )) - } -} - /// A parsed OxQL query. #[derive(Clone, Debug, PartialEq)] pub struct Query { diff --git a/oximeter/db/src/shells/mod.rs b/oximeter/db/src/shells/mod.rs new file mode 100644 index 0000000000..86ac053c80 --- /dev/null +++ b/oximeter/db/src/shells/mod.rs @@ -0,0 +1,209 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Helper for the OxQL and SQL shell implementations. + +// Copyright 2024 Oxide Computer Company + +use crate::Client; +use crate::DbWrite as _; +use anyhow::Context as _; +use dropshot::EmptyScanParams; +use dropshot::WhichPage; +use oximeter::TimeseriesSchema; +use slog::Logger; +use std::net::IpAddr; +use std::net::SocketAddr; + +#[cfg(any(feature = "oxql", test))] +pub mod oxql; +#[cfg(any(feature = "sql", test))] +pub mod sql; + +/// Special identifiers for column names or other widely-used values. +pub mod special_idents { + use oximeter::DatumType; + + macro_rules! gen_marker { + ($p:expr, $field:expr) => { + concat!("p", $p, "_", $field) + }; + } + + pub const TIMESTAMP: &str = "timestamp"; + pub const START_TIME: &str = "start_time"; + pub const DATUM: &str = "datum"; + pub const BINS: &str = "bins"; + pub const COUNTS: &str = "counts"; + pub const MIN: &str = "min"; + pub const MAX: &str = "max"; + pub const SUM_OF_SAMPLES: &str = "sum_of_samples"; + pub const SQUARED_MEAN: &str = "squared_mean"; + pub const DATETIME64: &str = "DateTime64"; + pub const ARRAYU64: &str = "Array[u64]"; + pub const ARRAYFLOAT64: &str = "Array[f64]"; + pub const ARRAYINT64: &str = "Array[i64]"; + pub const FLOAT64: &str = "f64"; + pub const UINT64: &str = "u64"; + + /// Distribution identifiers. + pub const DISTRIBUTION_IDENTS: [&str; 15] = [ + "bins", + "counts", + "min", + "max", + "sum_of_samples", + "squared_mean", + gen_marker!("50", "marker_heights"), + gen_marker!("50", "marker_positions"), + gen_marker!("50", "desired_marker_positions"), + gen_marker!("90", "marker_heights"), + gen_marker!("90", "marker_positions"), + gen_marker!("90", "desired_marker_positions"), + gen_marker!("99", "marker_heights"), + gen_marker!("99", "marker_positions"), + gen_marker!("99", "desired_marker_positions"), + ]; + + /// Get the array type name for a histogram type. + pub fn array_type_name_from_histogram_type( + type_: DatumType, + ) -> Option { + if !type_.is_histogram() { + return None; + } + Some(format!( + "Array[{}]", + type_.to_string().strip_prefix("Histogram").unwrap().to_lowercase(), + )) + } +} + +/// List the known timeseries. +pub async fn list_timeseries(client: &Client) -> anyhow::Result<()> { + let mut page = WhichPage::First(EmptyScanParams {}); + let limit = 100.try_into().unwrap(); + loop { + let results = client.timeseries_schema_list(&page, limit).await?; + for schema in results.items.iter() { + println!("{}", schema.timeseries_name); + } + if results.next_page.is_some() { + if let Some(last) = results.items.last() { + page = WhichPage::Next(last.timeseries_name.clone()); + } else { + return Ok(()); + } + } else { + return Ok(()); + } + } +} + +/// Describe a single timeseries. +pub async fn describe_timeseries( + client: &Client, + timeseries: &str, +) -> anyhow::Result<()> { + match timeseries.parse() { + Err(_) => eprintln!( + "Invalid timeseries name '{timeseries}, \ + use \\l to list available timeseries by name + " + ), + Ok(name) => { + if let Some(schema) = client.schema_for_timeseries(&name).await? { + let (cols, types) = prepare_columns(&schema); + let mut builder = tabled::builder::Builder::default(); + builder.push_record(cols); // first record is the header + builder.push_record(types); + println!( + "{}", + builder.build().with(tabled::settings::Style::psql()) + ); + } else { + eprintln!("No such timeseries: {timeseries}"); + } + } + } + Ok(()) +} + +/// Create a client to the timeseries database, and ensure the database exists. +pub async fn make_client( + address: IpAddr, + port: u16, + log: &Logger, +) -> Result { + let address = SocketAddr::new(address, port); + let client = Client::new(address, &log); + client + .init_single_node_db() + .await + .context("Failed to initialize timeseries database")?; + Ok(client) +} + +/// Prepare the columns for a timeseries or virtual table. +pub(crate) fn prepare_columns( + schema: &TimeseriesSchema, +) -> (Vec, Vec) { + let mut cols = Vec::with_capacity(schema.field_schema.len() + 2); + let mut types = cols.clone(); + + for field in schema.field_schema.iter() { + cols.push(field.name.clone()); + types.push(field.field_type.to_string()); + } + + cols.push(special_idents::TIMESTAMP.into()); + types.push(special_idents::DATETIME64.into()); + + if schema.datum_type.is_histogram() { + cols.push(special_idents::START_TIME.into()); + types.push(special_idents::DATETIME64.into()); + + cols.push(special_idents::BINS.into()); + types.push( + special_idents::array_type_name_from_histogram_type( + schema.datum_type, + ) + .unwrap(), + ); + + cols.push(special_idents::COUNTS.into()); + types.push(special_idents::ARRAYU64.into()); + + cols.push(special_idents::MIN.into()); + types.push(special_idents::FLOAT64.into()); + + cols.push(special_idents::MAX.into()); + types.push(special_idents::FLOAT64.into()); + + cols.push(special_idents::SUM_OF_SAMPLES.into()); + types.push(special_idents::UINT64.into()); + + cols.push(special_idents::SQUARED_MEAN.into()); + types.push(special_idents::UINT64.into()); + + for quantile in ["P50", "P90", "P99"].iter() { + cols.push(format!("{}_MARKER_HEIGHTS", quantile)); + types.push(special_idents::ARRAYFLOAT64.into()); + cols.push(format!("{}_MARKER_POSITIONS", quantile)); + types.push(special_idents::ARRAYINT64.into()); + cols.push(format!("{}_DESIRED_MARKER_POSITIONS", quantile)); + types.push(special_idents::ARRAYFLOAT64.into()); + } + } else if schema.datum_type.is_cumulative() { + cols.push(special_idents::START_TIME.into()); + types.push(special_idents::DATETIME64.into()); + cols.push(special_idents::DATUM.into()); + types.push(schema.datum_type.to_string()); + } else { + cols.push(special_idents::DATUM.into()); + types.push(schema.datum_type.to_string()); + } + + (cols, types) +} diff --git a/oximeter/db/src/bin/oxdb/oxql.rs b/oximeter/db/src/shells/oxql.rs similarity index 72% rename from oximeter/db/src/bin/oxdb/oxql.rs rename to oximeter/db/src/shells/oxql.rs index ebe55dc7a7..3e81a0f6ec 100644 --- a/oximeter/db/src/bin/oxdb/oxql.rs +++ b/oximeter/db/src/shells/oxql.rs @@ -2,20 +2,14 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -//! OxQL shell. +//! OxQL shell implementation. // Copyright 2024 Oxide Computer -use crate::make_client; +use super::{list_timeseries, make_client, prepare_columns}; +use crate::{oxql::Table, Client, OxqlResult}; use clap::Args; use crossterm::style::Stylize; -use dropshot::EmptyScanParams; -use dropshot::WhichPage; -use oximeter::TimeseriesSchema; -use oximeter_db::oxql::query::special_idents; -use oximeter_db::oxql::Table; -use oximeter_db::Client; -use oximeter_db::OxqlResult; use reedline::DefaultPrompt; use reedline::DefaultPromptSegment; use reedline::Reedline; @@ -23,228 +17,25 @@ use reedline::Signal; use slog::Logger; use std::net::IpAddr; -#[derive(Clone, Debug, Args)] +/// Options for the OxQL shell. +#[derive(Clone, Debug, Default, Args)] pub struct ShellOptions { /// Print summaries of each SQL query run against the database. #[clap(long = "summaries")] - print_summaries: bool, + pub print_summaries: bool, /// Print the total elapsed query duration. #[clap(long = "elapsed")] - print_elapsed: bool, -} - -// Print help for the basic OxQL commands. -fn print_basic_commands() { - println!("Basic commands:"); - println!(" \\?, \\h, help - Print this help"); - println!(" \\q, quit, exit, ^D - Exit the shell"); - println!(" \\l - List timeseries"); - println!(" \\d - Describe a timeseries"); - println!(" \\ql [] - Get OxQL help about an operation"); - println!(); - println!("Or try entering an OxQL `get` query"); -} - -// Print high-level information about OxQL. -fn print_general_oxql_help() { - const HELP: &str = r#"Oximeter Query Language - -The Oximeter Query Language (OxQL) implements queries as -as sequence of operations. Each of these takes zero or more -timeseries as inputs, and produces zero or more timeseries -as outputs. Operations are chained together with the pipe -operator, "|". - -All queries start with a `get` operation, which selects a -timeseries from the database, by name. For example: - -`get physical_data_link:bytes_received` - -The supported timeseries operations are: - -- get: Select a timeseries by name -- filter: Filter timeseries by field or sample values -- group_by: Group timeseries by fields, applying a reducer. -- join: Join two or more timeseries together - -Run `\ql ` to get specific help about that operation. - "#; - println!("{HELP}"); -} - -// Print help for a specific OxQL operation. -fn print_oxql_operation_help(op: &str) { - match op { - "get" => { - const HELP: &str = r#"get "); - -Get instances of a timeseries by name"#; - println!("{HELP}"); - } - "filter" => { - const HELP: &str = r#"filter "); - -Filter timeseries based on their attributes. - can be a logical combination of filtering -\"atoms\", such as `field_foo > 0`. Expressions -may use any of the usual comparison operators, and -can be nested and combined with && or ||. - -Expressions must refer to the name of a field -for a timeseries at this time, and must compare -against literals. For example, `some_field > 0` -is supported, but `some_field > other_field` is not."#; - println!("{HELP}"); - } - "group_by" => { - const HELP: &str = r#"group_by [, ... ] -group_by [, ... ], - -Group timeseries by the named fields, optionally -specifying a reducer to use when aggregating the -timeseries within each group. If no reducer is -specified, `mean` is used, averaging the values -within each group. - -Current supported reducers: - - mean - - sum"#; - println!("{HELP}"); - } - "join" => { - const HELP: &str = r#"join - -Combine 2 or more tables by peforming a natural -inner join, matching up those with fields of the -same value. Currently, joining does not take into -account the timestamps, and does not align the outputs -directly."#; - println!("{HELP}"); - } - _ => eprintln!("unrecognized OxQL operation: '{op}'"), - } -} - -// List the known timeseries. -async fn list_timeseries(client: &Client) -> anyhow::Result<()> { - let mut page = WhichPage::First(EmptyScanParams {}); - let limit = 100.try_into().unwrap(); - loop { - let results = client.timeseries_schema_list(&page, limit).await?; - for schema in results.items.iter() { - println!("{}", schema.timeseries_name); - } - if results.next_page.is_some() { - if let Some(last) = results.items.last() { - page = WhichPage::Next(last.timeseries_name.clone()); - } else { - return Ok(()); - } - } else { - return Ok(()); - } - } -} - -/// Prepare the columns for a timeseries or virtual table. -pub(crate) fn prepare_columns( - schema: &TimeseriesSchema, -) -> (Vec, Vec) { - let mut cols = Vec::with_capacity(schema.field_schema.len() + 2); - let mut types = cols.clone(); - - for field in schema.field_schema.iter() { - cols.push(field.name.clone()); - types.push(field.field_type.to_string()); - } - - cols.push(special_idents::TIMESTAMP.into()); - types.push(special_idents::DATETIME64.into()); - - if schema.datum_type.is_histogram() { - cols.push(special_idents::START_TIME.into()); - types.push(special_idents::DATETIME64.into()); - - cols.push(special_idents::BINS.into()); - types.push( - special_idents::array_type_name_from_histogram_type( - schema.datum_type, - ) - .unwrap(), - ); - - cols.push(special_idents::COUNTS.into()); - types.push(special_idents::ARRAYU64.into()); - - cols.push(special_idents::MIN.into()); - types.push(special_idents::FLOAT64.into()); - - cols.push(special_idents::MAX.into()); - types.push(special_idents::FLOAT64.into()); - - cols.push(special_idents::SUM_OF_SAMPLES.into()); - types.push(special_idents::UINT64.into()); - - cols.push(special_idents::SQUARED_MEAN.into()); - types.push(special_idents::UINT64.into()); - - for quantile in ["P50", "P90", "P99"].iter() { - cols.push(format!("{}_MARKER_HEIGHTS", quantile)); - types.push(special_idents::ARRAYFLOAT64.into()); - cols.push(format!("{}_MARKER_POSITIONS", quantile)); - types.push(special_idents::ARRAYINT64.into()); - cols.push(format!("{}_DESIRED_MARKER_POSITIONS", quantile)); - types.push(special_idents::ARRAYFLOAT64.into()); - } - } else if schema.datum_type.is_cumulative() { - cols.push(special_idents::START_TIME.into()); - types.push(special_idents::DATETIME64.into()); - cols.push(special_idents::DATUM.into()); - types.push(schema.datum_type.to_string()); - } else { - cols.push(special_idents::DATUM.into()); - types.push(schema.datum_type.to_string()); - } - - (cols, types) -} - -/// Describe a single timeseries. -async fn describe_timeseries( - client: &Client, - timeseries: &str, -) -> anyhow::Result<()> { - match timeseries.parse() { - Err(_) => eprintln!( - "Invalid timeseries name '{timeseries}, \ - use \\l to list available timeseries by name - " - ), - Ok(name) => { - if let Some(schema) = client.schema_for_timeseries(&name).await? { - let (cols, types) = prepare_columns(&schema); - let mut builder = tabled::builder::Builder::default(); - builder.push_record(cols); // first record is the header - builder.push_record(types); - println!( - "{}", - builder.build().with(tabled::settings::Style::psql()) - ); - } else { - eprintln!("No such timeseries: {timeseries}"); - } - } - } - Ok(()) + pub print_elapsed: bool, } -/// Run the OxQL shell. -pub async fn oxql_shell( +/// Run/execute the OxQL shell. +pub async fn shell( address: IpAddr, port: u16, log: Logger, opts: ShellOptions, ) -> anyhow::Result<()> { + // Create the client. let client = make_client(address, port, &log).await?; // A workaround to ensure the client has all available timeseries when the @@ -320,6 +111,127 @@ pub async fn oxql_shell( } } +/// Describe a single timeseries. +async fn describe_timeseries( + client: &Client, + timeseries: &str, +) -> anyhow::Result<()> { + match timeseries.parse() { + Err(_) => eprintln!( + "Invalid timeseries name '{timeseries}, \ + use \\l to list available timeseries by name + " + ), + Ok(name) => { + if let Some(schema) = client.schema_for_timeseries(&name).await? { + let (cols, types) = prepare_columns(&schema); + let mut builder = tabled::builder::Builder::default(); + builder.push_record(cols); // first record is the header + builder.push_record(types); + println!( + "{}", + builder.build().with(tabled::settings::Style::psql()) + ); + } else { + eprintln!("No such timeseries: {timeseries}"); + } + } + } + Ok(()) +} + +/// Print help for a specific OxQL operation. +fn print_oxql_operation_help(op: &str) { + match op { + "get" => { + const HELP: &str = r#"get "); + +Get instances of a timeseries by name"#; + println!("{HELP}"); + } + "filter" => { + const HELP: &str = r#"filter "); + +Filter timeseries based on their attributes. + can be a logical combination of filtering +\"atoms\", such as `field_foo > 0`. Expressions +may use any of the usual comparison operators, and +can be nested and combined with && or ||. + +Expressions must refer to the name of a field +for a timeseries at this time, and must compare +against literals. For example, `some_field > 0` +is supported, but `some_field > other_field` is not."#; + println!("{HELP}"); + } + "group_by" => { + const HELP: &str = r#"group_by [, ... ] +group_by [, ... ], + +Group timeseries by the named fields, optionally +specifying a reducer to use when aggregating the +timeseries within each group. If no reducer is +specified, `mean` is used, averaging the values +within each group. + +Current supported reducers: + - mean + - sum"#; + println!("{HELP}"); + } + "join" => { + const HELP: &str = r#"join + +Combine 2 or more tables by peforming a natural +inner join, matching up those with fields of the +same value. Currently, joining does not take into +account the timestamps, and does not align the outputs +directly."#; + println!("{HELP}"); + } + _ => eprintln!("unrecognized OxQL operation: '{op}'"), + } +} + +/// Print help for the basic OxQL commands. +fn print_basic_commands() { + println!("Basic commands:"); + println!(" \\?, \\h, help - Print this help"); + println!(" \\q, quit, exit, ^D - Exit the shell"); + println!(" \\l - List timeseries"); + println!(" \\d - Describe a timeseries"); + println!(" \\ql [] - Get OxQL help about an operation"); + println!(); + println!("Or try entering an OxQL `get` query"); +} + +/// Print high-level information about OxQL. +fn print_general_oxql_help() { + const HELP: &str = r#"Oximeter Query Language + +The Oximeter Query Language (OxQL) implements queries as +as sequence of operations. Each of these takes zero or more +timeseries as inputs, and produces zero or more timeseries +as outputs. Operations are chained together with the pipe +operator, "|". + +All queries start with a `get` operation, which selects a +timeseries from the database, by name. For example: + +`get physical_data_link:bytes_received` + +The supported timeseries operations are: + +- get: Select a timeseries by name +- filter: Filter timeseries by field or sample values +- group_by: Group timeseries by fields, applying a reducer. +- join: Join two or more timeseries together + +Run `\ql ` to get specific help about that operation. + "#; + println!("{HELP}"); +} + fn print_query_summary( result: &OxqlResult, print_elapsed: bool, diff --git a/oximeter/db/src/bin/oxdb/sql.rs b/oximeter/db/src/shells/sql.rs similarity index 96% rename from oximeter/db/src/bin/oxdb/sql.rs rename to oximeter/db/src/shells/sql.rs index 44780592fc..389ebd5019 100644 --- a/oximeter/db/src/bin/oxdb/sql.rs +++ b/oximeter/db/src/shells/sql.rs @@ -2,20 +2,16 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -//! SQL shell subcommand for `oxdb`. +//! SQL shell implementation. // Copyright 2024 Oxide Computer Company -use super::oxql; -use crate::make_client; +use super::{make_client, prepare_columns}; +use crate::sql::{function_allow_list, QueryResult, Table}; +use crate::{Client, QuerySummary}; use clap::Args; use dropshot::EmptyScanParams; use dropshot::WhichPage; -use oximeter_db::sql::function_allow_list; -use oximeter_db::sql::QueryResult; -use oximeter_db::sql::Table; -use oximeter_db::Client; -use oximeter_db::QuerySummary; use reedline::DefaultPrompt; use reedline::DefaultPromptSegment; use reedline::Reedline; @@ -23,63 +19,7 @@ use reedline::Signal; use slog::Logger; use std::net::IpAddr; -fn print_basic_commands() { - println!("Basic commands:"); - println!(" \\?, \\h, help - Print this help"); - println!(" \\q, quit, exit, ^D - Exit the shell"); - println!(" \\l - List tables"); - println!(" \\d - Describe a table"); - println!( - " \\f - List or describe ClickHouse SQL functions" - ); - println!(); - println!("Or try entering a SQL `SELECT` statement"); -} - -async fn list_virtual_tables(client: &Client) -> anyhow::Result<()> { - let mut page = WhichPage::First(EmptyScanParams {}); - let limit = 100.try_into().unwrap(); - loop { - let results = client.timeseries_schema_list(&page, limit).await?; - for schema in results.items.iter() { - println!("{}", schema.timeseries_name); - } - if results.next_page.is_some() { - if let Some(last) = results.items.last() { - page = WhichPage::Next(last.timeseries_name.clone()); - } else { - return Ok(()); - } - } else { - return Ok(()); - } - } -} - -async fn describe_virtual_table( - client: &Client, - table: &str, -) -> anyhow::Result<()> { - match table.parse() { - Err(_) => println!("Invalid timeseries name: {table}"), - Ok(name) => { - if let Some(schema) = client.schema_for_timeseries(&name).await? { - let (cols, types) = oxql::prepare_columns(&schema); - let mut builder = tabled::builder::Builder::default(); - builder.push_record(cols); // first record is the header - builder.push_record(types); - println!( - "{}", - builder.build().with(tabled::settings::Style::psql()) - ); - } else { - println!("No such timeseries: {table}"); - } - } - } - Ok(()) -} - +/// Options for the SQL shell. #[derive(Clone, Debug, Args)] pub struct ShellOptions { /// Print query metadata. @@ -107,48 +47,8 @@ impl Default for ShellOptions { } } -fn list_supported_functions() { - println!("Subset of ClickHouse SQL functions currently supported"); - println!( - "See https://clickhouse.com/docs/en/sql-reference/functions for more" - ); - println!(); - for func in function_allow_list().iter() { - println!(" {func}"); - } -} - -fn show_supported_function(name: &str) { - if let Some(func) = function_allow_list().iter().find(|f| f.name == name) { - println!("{}", func.name); - println!(" {}", func.usage); - println!(" {}", func.description); - } else { - println!("No supported function '{name}'"); - } -} - -fn print_sql_query(query: &str) { - println!( - "{}", - sqlformat::format( - &query, - &sqlformat::QueryParams::None, - sqlformat::FormatOptions { uppercase: true, ..Default::default() } - ) - ); - println!(); -} - -fn print_query_summary(table: &Table, summary: &QuerySummary) { - println!("Summary"); - println!(" Query ID: {}", summary.id); - println!(" Result rows: {}", table.rows.len()); - println!(" Time: {:?}", summary.elapsed); - println!(" Read: {}\n", summary.io_summary.read); -} - -pub async fn sql_shell( +/// Run/execute the SQL shell. +pub async fn shell( address: IpAddr, port: u16, log: Logger, @@ -261,3 +161,101 @@ pub async fn sql_shell( } } } + +fn print_basic_commands() { + println!("Basic commands:"); + println!(" \\?, \\h, help - Print this help"); + println!(" \\q, quit, exit, ^D - Exit the shell"); + println!(" \\l - List tables"); + println!(" \\d
- Describe a table"); + println!( + " \\f - List or describe ClickHouse SQL functions" + ); + println!(); + println!("Or try entering a SQL `SELECT` statement"); +} + +async fn list_virtual_tables(client: &Client) -> anyhow::Result<()> { + let mut page = WhichPage::First(EmptyScanParams {}); + let limit = 100.try_into().unwrap(); + loop { + let results = client.timeseries_schema_list(&page, limit).await?; + for schema in results.items.iter() { + println!("{}", schema.timeseries_name); + } + if results.next_page.is_some() { + if let Some(last) = results.items.last() { + page = WhichPage::Next(last.timeseries_name.clone()); + } else { + return Ok(()); + } + } else { + return Ok(()); + } + } +} + +async fn describe_virtual_table( + client: &Client, + table: &str, +) -> anyhow::Result<()> { + match table.parse() { + Err(_) => println!("Invalid timeseries name: {table}"), + Ok(name) => { + if let Some(schema) = client.schema_for_timeseries(&name).await? { + let (cols, types) = prepare_columns(&schema); + let mut builder = tabled::builder::Builder::default(); + builder.push_record(cols); // first record is the header + builder.push_record(types); + println!( + "{}", + builder.build().with(tabled::settings::Style::psql()) + ); + } else { + println!("No such timeseries: {table}"); + } + } + } + Ok(()) +} + +fn list_supported_functions() { + println!("Subset of ClickHouse SQL functions currently supported"); + println!( + "See https://clickhouse.com/docs/en/sql-reference/functions for more" + ); + println!(); + for func in function_allow_list().iter() { + println!(" {func}"); + } +} + +fn show_supported_function(name: &str) { + if let Some(func) = function_allow_list().iter().find(|f| f.name == name) { + println!("{}", func.name); + println!(" {}", func.usage); + println!(" {}", func.description); + } else { + println!("No supported function '{name}'"); + } +} + +fn print_sql_query(query: &str) { + println!( + "{}", + sqlformat::format( + &query, + &sqlformat::QueryParams::None, + sqlformat::FormatOptions { uppercase: true, ..Default::default() } + ) + ); + println!(); +} + +fn print_query_summary(table: &Table, summary: &QuerySummary) { + println!("Summary"); + println!(" Query ID: {}", summary.id); + println!(" Result rows: {}", table.rows.len()); + println!(" Time: {:?}", summary.elapsed); + println!(" Read: {}\n", summary.io_summary.read); +} diff --git a/test-utils/src/dev/test_cmds.rs b/test-utils/src/dev/test_cmds.rs index 3c675ddfd9..5d6b9a152e 100644 --- a/test-utils/src/dev/test_cmds.rs +++ b/test-utils/src/dev/test_cmds.rs @@ -126,8 +126,8 @@ pub fn error_for_enoent() -> String { /// /// This allows use to use expectorate to verify the shape of the CLI output. pub fn redact_variable(input: &str) -> String { - // Replace TCP port numbers. We include the localhost characters to avoid - // catching any random sequence of numbers. + // Replace TCP port numbers. We include the localhost + // characters to avoid catching any random sequence of numbers. let s = regex::Regex::new(r"\[::1\]:\d{4,5}") .unwrap() .replace_all(&input, "[::1]:REDACTED_PORT") @@ -189,6 +189,13 @@ pub fn redact_variable(input: &str) -> String { .replace_all(&s, "s ago") .to_string(); + // Replace interval (s). + let s = regex::Regex::new(r"\d+s") + .unwrap() + .replace_all(&s, "s") + .to_string(); + + // Replace interval (ms). let s = regex::Regex::new(r"\d+ms") .unwrap() .replace_all(&s, "ms")