From dcfac83e3012423e886a1fe24a2a368a16f33a72 Mon Sep 17 00:00:00 2001 From: Zeeshan Lakhani Date: Mon, 8 Jul 2024 23:16:54 -0400 Subject: [PATCH] Move `oxql` from `oxdb` to mainline `omdb` (#5988) ## Why?: Concerning [network observability work](https://github.com/orgs/oxidecomputer/projects/55/views/1?filterQuery=&pane=issue&itemId=68336554), this makes the [`oxql`](https://rfd.shared.oxide.computer/rfd/0463) interactive query repl accessible via omdb, as we start to give users and ourselves the ability to query timeseries and metrics more easily. Additionally, in the "now", this aids in debugging through our metrics set and makes it available, via omdb, throughout our ecosystem/a4x2. ## Includes: * Moves `oxql_shell` into the oximeter_db lib for use by both omdb and oxdb. * If no URL is given to `omdb oxql`, it will leverage internal DNS. * Update the oximeter omdb call (for listing producers) to leverage internal. DNS if no URL is given. * Update command/output tests/generations and collector-specific tests for list producers. ## Notes: * The oxql client still expects an socket address as liked it typed specifically v.s. a String. Instead, upon running the `omdb oxql` command, we take in a URL String and parse it into the socket address directly. --------- Co-authored-by: Benjamin Naecker --- .gitignore | 1 + Cargo.lock | 2 + Cargo.toml | 2 +- dev-tools/omdb/Cargo.toml | 2 + dev-tools/omdb/src/bin/omdb/main.rs | 6 +- dev-tools/omdb/src/bin/omdb/oximeter.rs | 49 ++- dev-tools/omdb/src/bin/omdb/oxql.rs | 97 +++++ dev-tools/omdb/tests/env.out | 25 ++ dev-tools/omdb/tests/successes.out | 24 +- dev-tools/omdb/tests/test_all_output.rs | 157 ++++++++- dev-tools/omdb/tests/usage_errors.out | 67 ++++ nexus-config/src/nexus_config.rs | 10 +- oximeter/collector/src/lib.rs | 5 + oximeter/db/Cargo.toml | 8 +- oximeter/db/src/bin/oxdb/main.rs | 42 +-- oximeter/db/src/client/mod.rs | 1 - oximeter/db/src/lib.rs | 7 +- oximeter/db/src/oxql/ast/table_ops/filter.rs | 4 +- oximeter/db/src/oxql/query/mod.rs | 57 --- oximeter/db/src/shells/mod.rs | 209 +++++++++++ oximeter/db/src/{bin/oxdb => shells}/oxql.rs | 350 +++++++------------ oximeter/db/src/{bin/oxdb => shells}/sql.rs | 212 ++++++----- test-utils/src/dev/test_cmds.rs | 11 +- 23 files changed, 879 insertions(+), 469 deletions(-) create mode 100644 dev-tools/omdb/src/bin/omdb/oxql.rs create mode 100644 oximeter/db/src/shells/mod.rs rename oximeter/db/src/{bin/oxdb => shells}/oxql.rs (72%) rename oximeter/db/src/{bin/oxdb => shells}/sql.rs (96%) diff --git a/.gitignore b/.gitignore index 39a8f36144..6e4e0eb42a 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,4 @@ tags .falcon/* .img/* connectivity-report.json +*.local diff --git a/Cargo.lock b/Cargo.lock index 2b6a918590..b27115b42a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5641,6 +5641,7 @@ dependencies = [ "omicron-uuid-kinds", "omicron-workspace-hack", "oximeter-client", + "oximeter-db", "pq-sys", "ratatui", "reedline", @@ -5655,6 +5656,7 @@ dependencies = [ "textwrap", "tokio", "unicode-width", + "url", "uuid", ] diff --git a/Cargo.toml b/Cargo.toml index f217bcd169..06d3ef3130 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -384,7 +384,7 @@ oso = "0.27" owo-colors = "4.0.0" oximeter = { path = "oximeter/oximeter" } oximeter-client = { path = "clients/oximeter-client" } -oximeter-db = { path = "oximeter/db/" } +oximeter-db = { path = "oximeter/db/", default-features = false } oximeter-collector = { path = "oximeter/collector" } oximeter-impl = { path = "oximeter/impl" } oximeter-instruments = { path = "oximeter/instruments" } diff --git a/dev-tools/omdb/Cargo.toml b/dev-tools/omdb/Cargo.toml index 9cdf03093c..e5d898509c 100644 --- a/dev-tools/omdb/Cargo.toml +++ b/dev-tools/omdb/Cargo.toml @@ -37,6 +37,7 @@ nexus-types.workspace = true omicron-common.workspace = true omicron-uuid-kinds.workspace = true oximeter-client.workspace = true +oximeter-db = { workspace = true, default-features = false, features = [ "oxql" ] } # See omicron-rpaths for more about the "pq-sys" dependency. pq-sys = "*" ratatui.workspace = true @@ -51,6 +52,7 @@ tabled.workspace = true textwrap.workspace = true tokio = { workspace = true, features = [ "full" ] } unicode-width.workspace = true +url.workspace = true uuid.workspace = true ipnetwork.workspace = true omicron-workspace-hack.workspace = true diff --git a/dev-tools/omdb/src/bin/omdb/main.rs b/dev-tools/omdb/src/bin/omdb/main.rs index 7469e2ba54..8fc48f5028 100644 --- a/dev-tools/omdb/src/bin/omdb/main.rs +++ b/dev-tools/omdb/src/bin/omdb/main.rs @@ -50,6 +50,7 @@ mod helpers; mod mgs; mod nexus; mod oximeter; +mod oxql; mod sled_agent; #[tokio::main] @@ -66,7 +67,8 @@ async fn main() -> Result<(), anyhow::Error> { OmdbCommands::Db(db) => db.run_cmd(&args, &log).await, OmdbCommands::Mgs(mgs) => mgs.run_cmd(&args, &log).await, OmdbCommands::Nexus(nexus) => nexus.run_cmd(&args, &log).await, - OmdbCommands::Oximeter(oximeter) => oximeter.run_cmd(&log).await, + OmdbCommands::Oximeter(oximeter) => oximeter.run_cmd(&args, &log).await, + OmdbCommands::Oxql(oxql) => oxql.run_cmd(&args, &log).await, OmdbCommands::SledAgent(sled) => sled.run_cmd(&args, &log).await, OmdbCommands::CrucibleAgent(crucible) => crucible.run_cmd(&args).await, } @@ -269,6 +271,8 @@ enum OmdbCommands { Nexus(nexus::NexusArgs), /// Query oximeter collector state Oximeter(oximeter::OximeterArgs), + /// Enter the Oximeter Query Language shell for interactive querying. + Oxql(oxql::OxqlArgs), /// Debug a specific Sled SledAgent(sled_agent::SledAgentArgs), } diff --git a/dev-tools/omdb/src/bin/omdb/oximeter.rs b/dev-tools/omdb/src/bin/omdb/oximeter.rs index a6dc2ce011..c068110b4c 100644 --- a/dev-tools/omdb/src/bin/omdb/oximeter.rs +++ b/dev-tools/omdb/src/bin/omdb/oximeter.rs @@ -5,6 +5,7 @@ //! omdb commands that query oximeter use crate::helpers::CONNECTION_OPTIONS_HEADING; +use crate::Omdb; use anyhow::Context; use clap::Args; use clap::Subcommand; @@ -18,18 +19,17 @@ use tabled::Table; use tabled::Tabled; use uuid::Uuid; +/// Arguments for the oximeter subcommand. #[derive(Debug, Args)] pub struct OximeterArgs { /// URL of the oximeter collector to query #[arg( long, env = "OMDB_OXIMETER_URL", - // This can't be global = true (i.e. passed in later in the - // command-line) because global options can't be required. If this - // changes to being optional, we should set global = true. + global = true, help_heading = CONNECTION_OPTIONS_HEADING, )] - oximeter_url: String, + oximeter_url: Option, #[command(subcommand)] command: OximeterCommands, @@ -38,20 +38,47 @@ pub struct OximeterArgs { /// Subcommands that query oximeter collector state #[derive(Debug, Subcommand)] enum OximeterCommands { - /// List the producers the collector is assigned to poll + /// List the producers the collector is assigned to poll. ListProducers, } impl OximeterArgs { - fn client(&self, log: &Logger) -> Client { - Client::new( - &self.oximeter_url, + async fn client( + &self, + omdb: &Omdb, + log: &Logger, + ) -> Result { + let oximeter_url = match &self.oximeter_url { + Some(cli_or_env_url) => cli_or_env_url.clone(), + None => { + eprintln!( + "note: Oximeter URL not specified. Will pick one from DNS." + ); + let addr = omdb + .dns_lookup_one( + log.clone(), + internal_dns::ServiceName::Oximeter, + ) + .await?; + format!("http://{}", addr) + } + }; + eprintln!("note: using Oximeter URL {}", &oximeter_url); + + let client = Client::new( + &oximeter_url, log.new(slog::o!("component" => "oximeter-client")), - ) + ); + Ok(client) } - pub async fn run_cmd(&self, log: &Logger) -> anyhow::Result<()> { - let client = self.client(log); + /// Run the command. + pub async fn run_cmd( + &self, + omdb: &Omdb, + log: &Logger, + ) -> anyhow::Result<()> { + let client = self.client(omdb, log).await?; match self.command { OximeterCommands::ListProducers => { self.list_producers(client).await diff --git a/dev-tools/omdb/src/bin/omdb/oxql.rs b/dev-tools/omdb/src/bin/omdb/oxql.rs new file mode 100644 index 0000000000..89ddae9cf2 --- /dev/null +++ b/dev-tools/omdb/src/bin/omdb/oxql.rs @@ -0,0 +1,97 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! omdb OxQL shell for interactive queries on metrics/timeseries. + +// Copyright 2024 Oxide Computer + +use crate::helpers::CONNECTION_OPTIONS_HEADING; +use crate::Omdb; +use anyhow::Context; +use clap::Args; +use oximeter_db::{ + self, + shells::oxql::{self, ShellOptions}, +}; +use slog::Logger; +use std::net::SocketAddr; +use url::Url; + +/// Command-line arguments for the OxQL shell. +#[derive(Debug, Args)] +pub struct OxqlArgs { + /// URL of the ClickHouse server to connect to. + #[arg( + long, + env = "OMDB_CLICKHOUSE_URL", + global = true, + help_heading = CONNECTION_OPTIONS_HEADING, + )] + clickhouse_url: Option, + + /// Print summaries of each SQL query run against the database. + #[clap(long = "summaries")] + print_summaries: bool, + + /// Print the total elapsed query duration. + #[clap(long = "elapsed")] + print_elapsed: bool, +} + +impl OxqlArgs { + /// Run the OxQL shell via the `omdb oxql` subcommand. + pub async fn run_cmd( + &self, + omdb: &Omdb, + log: &Logger, + ) -> anyhow::Result<()> { + let addr = self.addr(omdb, log).await?; + + let opts = ShellOptions { + print_summaries: self.print_summaries, + print_elapsed: self.print_elapsed, + }; + + oxql::shell( + addr.ip(), + addr.port(), + log.new(slog::o!("component" => "clickhouse-client")), + opts, + ) + .await + } + + /// Resolve the ClickHouse URL to a socket address. + async fn addr( + &self, + omdb: &Omdb, + log: &Logger, + ) -> anyhow::Result { + match &self.clickhouse_url { + Some(cli_or_env_url) => Url::parse(&cli_or_env_url) + .context( + "failed parsing URL from command-line or environment variable", + )? + .socket_addrs(|| None) + .context("failed resolving socket addresses")? + .into_iter() + .next() + .context("failed resolving socket addresses"), + None => { + eprintln!( + "note: ClickHouse URL not specified. Will pick one from DNS." + ); + + Ok(SocketAddr::V6( + omdb.dns_lookup_one( + log.clone(), + internal_dns::ServiceName::Clickhouse, + ) + .await + .context("failed looking up ClickHouse internal DNS entry")?, + )) + } + } + } +} diff --git a/dev-tools/omdb/tests/env.out b/dev-tools/omdb/tests/env.out index 348ff5e9ac..66a48ab394 100644 --- a/dev-tools/omdb/tests/env.out +++ b/dev-tools/omdb/tests/env.out @@ -433,3 +433,28 @@ note: using database URL postgresql://root@[::1]:REDACTED_PORT/omicron?sslmode=d note: database schema version matches expected () note: listing all commissioned sleds (use -F to filter, e.g. -F in-service) ============================================= +EXECUTING COMMAND: omdb ["oximeter", "--oximeter-url", "junk", "list-producers"] +termination: Exited(1) +--------------------------------------------- +stdout: +--------------------------------------------- +stderr: +note: using Oximeter URL junk +Error: failed to fetch collector info + +Caused by: + 0: Communication Error: builder error: relative URL without a base + 1: builder error: relative URL without a base + 2: relative URL without a base +============================================= +EXECUTING COMMAND: omdb ["oxql", "--clickhouse-url", "junk"] +termination: Exited(1) +--------------------------------------------- +stdout: +--------------------------------------------- +stderr: +Error: failed parsing URL from command-line or environment variable + +Caused by: + relative URL without a base +============================================= diff --git a/dev-tools/omdb/tests/successes.out b/dev-tools/omdb/tests/successes.out index b5147b66f9..a65098d7aa 100644 --- a/dev-tools/omdb/tests/successes.out +++ b/dev-tools/omdb/tests/successes.out @@ -405,14 +405,14 @@ task: "dns_propagation_external" task: "nat_v4_garbage_collector" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms last completion reported error: failed to resolve addresses for Dendrite services: no record found for Query { name: Name("_dendrite._tcp.control-plane.oxide.internal."), query_type: SRV, query_class: IN } task: "blueprint_loader" - configured period: every 1m 40s + configured period: every 1m s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms @@ -436,7 +436,7 @@ task: "abandoned_vmm_reaper" sled resource reservations deleted: 0 task: "bfd_manager" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms @@ -467,7 +467,7 @@ task: "external_endpoints" TLS certificates: 0 task: "instance_watcher" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms @@ -503,7 +503,7 @@ task: "metrics_producer_gc" warning: unknown background task: "metrics_producer_gc" (don't know how to interpret details: Object {"expiration": String(""), "pruned": Array []}) task: "phantom_disks" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms @@ -511,14 +511,14 @@ task: "phantom_disks" number of phantom disk delete errors: 0 task: "physical_disk_adoption" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a dependent task completing started at (s ago) and ran for ms last completion reported error: task disabled task: "region_replacement" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms @@ -526,7 +526,7 @@ task: "region_replacement" number of region replacement start errors: 0 task: "region_replacement_driver" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms @@ -541,28 +541,28 @@ task: "service_firewall_rule_propagation" started at (s ago) and ran for ms task: "service_zone_nat_tracker" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms last completion reported error: inventory collection is None task: "switch_port_config_manager" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms warning: unknown background task: "switch_port_config_manager" (don't know how to interpret details: Object {}) task: "v2p_manager" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms warning: unknown background task: "v2p_manager" (don't know how to interpret details: Object {}) task: "vpc_route_manager" - configured period: every 30s + configured period: every s currently executing: no last completed activation: , triggered by a periodic timer firing started at (s ago) and ran for ms diff --git a/dev-tools/omdb/tests/test_all_output.rs b/dev-tools/omdb/tests/test_all_output.rs index 19be33631d..6a959d726a 100644 --- a/dev-tools/omdb/tests/test_all_output.rs +++ b/dev-tools/omdb/tests/test_all_output.rs @@ -8,6 +8,7 @@ //! sure you're only breaking what you intend. use expectorate::assert_contents; +use nexus_test_utils::{OXIMETER_UUID, PRODUCER_UUID}; use nexus_test_utils_macros::nexus_test; use nexus_types::deployment::SledFilter; use nexus_types::deployment::UnstableReconfiguratorState; @@ -17,6 +18,7 @@ use omicron_test_utils::dev::test_cmds::run_command; use omicron_test_utils::dev::test_cmds::ExtraRedactions; use slog_error_chain::InlineErrorChain; use std::fmt::Write; +use std::net::IpAddr; use std::path::Path; use subprocess::Exec; @@ -26,6 +28,32 @@ const CMD_OMDB: &str = env!("CARGO_BIN_EXE_omdb"); type ControlPlaneTestContext = nexus_test_utils::ControlPlaneTestContext; +/// The `oximeter` list-producers command output is not easy to compare as a +/// string directly because the timing of registrations with both our test +/// producer and the one nexus registers. But, let's find our test producer +/// in the list. +fn assert_oximeter_list_producers_output( + output: &str, + ox_url: &str, + test_producer: IpAddr, +) { + assert!( + output.contains(format!("Collector ID: {}", OXIMETER_UUID).as_str()) + ); + assert!(output.contains(ox_url)); + + let found = output.lines().any(|line| { + line.contains(PRODUCER_UUID) + && line.contains(&test_producer.to_string()) + }); + + assert!( + found, + "test producer {} and producer UUID {} not found on line together", + test_producer, PRODUCER_UUID + ); +} + #[tokio::test] async fn test_omdb_usage_errors() { let cmd_path = path_to_executable(CMD_OMDB); @@ -57,6 +85,10 @@ async fn test_omdb_usage_errors() { &["sled-agent"], &["sled-agent", "zones"], &["sled-agent", "zpools"], + &["oximeter", "--help"], + &["oxql", "--help"], + // Mispelled argument + &["oxql", "--summarizes"], ]; for args in invocations { @@ -74,10 +106,15 @@ async fn test_omdb_success_cases(cptestctx: &ControlPlaneTestContext) { ) .await; let cmd_path = path_to_executable(CMD_OMDB); + let postgres_url = cptestctx.database.listen_url(); let nexus_internal_url = format!("http://{}/", cptestctx.internal_client.bind_address); let mgs_url = format!("http://{}/", gwtestctx.client.bind_address); + let ox_url = format!("http://{}/", cptestctx.oximeter.server_address()); + let ox_test_producer = cptestctx.producer.address().ip(); + let ch_url = format!("http://{}/", cptestctx.clickhouse.address); + let tmpdir = camino_tempfile::tempdir() .expect("failed to create temporary directory"); let tmppath = tmpdir.path().join("reconfigurator-save.out"); @@ -124,18 +161,24 @@ async fn test_omdb_success_cases(cptestctx: &ControlPlaneTestContext) { let p = postgres_url.to_string(); let u = nexus_internal_url.clone(); let g = mgs_url.clone(); + let ox = ox_url.clone(); + let ch = ch_url.clone(); do_run_extra( &mut output, move |exec| { exec.env("OMDB_DB_URL", &p) .env("OMDB_NEXUS_URL", &u) .env("OMDB_MGS_URL", &g) + .env("OMDB_OXIMETER_URL", &ox) + .env("OMDB_CLICKHOUSE_URL", &ch) }, &cmd_path, args, - ExtraRedactions::new() - .variable_length("tmp_path", tmppath.as_str()) - .fixed_length("blueprint_id", &initial_blueprint_id), + Some( + ExtraRedactions::new() + .variable_length("tmp_path", tmppath.as_str()) + .fixed_length("blueprint_id", &initial_blueprint_id), + ), ) .await; } @@ -170,6 +213,23 @@ async fn test_omdb_success_cases(cptestctx: &ControlPlaneTestContext) { .is_some()); assert!(!parsed.collections.is_empty()); + let ox_invocation = &["oximeter", "list-producers"]; + let mut ox_output = String::new(); + let ox = ox_url.clone(); + + do_run_no_redactions( + &mut ox_output, + move |exec| exec.env("OMDB_OXIMETER_URL", &ox), + &cmd_path, + ox_invocation, + ) + .await; + assert_oximeter_list_producers_output( + &ox_output, + &ox_url, + ox_test_producer, + ); + gwtestctx.teardown().await; } @@ -188,6 +248,9 @@ async fn test_omdb_env_settings(cptestctx: &ControlPlaneTestContext) { let postgres_url = cptestctx.database.listen_url().to_string(); let nexus_internal_url = format!("http://{}", cptestctx.internal_client.bind_address); + let ox_url = format!("http://{}/", cptestctx.oximeter.server_address()); + let ox_test_producer = cptestctx.producer.address().ip(); + let ch_url = format!("http://{}/", cptestctx.clickhouse.address); let dns_sockaddr = cptestctx.internal_dns.dns_server.local_address(); let mut output = String::new(); @@ -263,7 +326,46 @@ async fn test_omdb_env_settings(cptestctx: &ControlPlaneTestContext) { let args = &["--dns-server", &dns_sockaddr.to_string(), "db", "sleds"]; do_run(&mut output, move |exec| exec, &cmd_path, args).await; + // Case: specified in multiple places (command-line argument wins) + let args = &["oximeter", "--oximeter-url", "junk", "list-producers"]; + let ox = ox_url.clone(); + do_run( + &mut output, + move |exec| exec.env("OMDB_OXIMETER_URL", &ox), + &cmd_path, + args, + ) + .await; + + // Case: specified in multiple places (command-line argument wins) + let args = &["oxql", "--clickhouse-url", "junk"]; + do_run( + &mut output, + move |exec| exec.env("OMDB_CLICKHOUSE_URL", &ch_url), + &cmd_path, + args, + ) + .await; + assert_contents("tests/env.out", &output); + + // Oximeter URL + // Case 1: specified on the command line. + // Case 2: is covered by the success tests above. + let ox_args1 = &["oximeter", "--oximeter-url", &ox_url, "list-producers"]; + let mut ox_output1 = String::new(); + do_run_no_redactions( + &mut ox_output1, + move |exec| exec, + &cmd_path, + ox_args1, + ) + .await; + assert_oximeter_list_producers_output( + &ox_output1, + &ox_url, + ox_test_producer, + ); } async fn do_run( @@ -274,8 +376,25 @@ async fn do_run( ) where F: FnOnce(Exec) -> Exec + Send + 'static, { - do_run_extra(output, modexec, cmd_path, args, &ExtraRedactions::new()) - .await; + do_run_extra( + output, + modexec, + cmd_path, + args, + Some(&ExtraRedactions::new()), + ) + .await; +} + +async fn do_run_no_redactions( + output: &mut String, + modexec: F, + cmd_path: &Path, + args: &[&str], +) where + F: FnOnce(Exec) -> Exec + Send + 'static, +{ + do_run_extra(output, modexec, cmd_path, args, None).await; } async fn do_run_extra( @@ -283,18 +402,22 @@ async fn do_run_extra( modexec: F, cmd_path: &Path, args: &[&str], - extra_redactions: &ExtraRedactions<'_>, + extra_redactions: Option<&ExtraRedactions<'_>>, ) where F: FnOnce(Exec) -> Exec + Send + 'static, { - println!("running command with args: {:?}", args); write!( output, "EXECUTING COMMAND: {} {:?}\n", cmd_path.file_name().expect("missing command").to_string_lossy(), args.iter() - .map(|r| redact_extra(r, extra_redactions)) - .collect::>(), + .map(|r| { + extra_redactions.map_or_else( + || r.to_string(), + |redactions| redact_extra(r, redactions), + ) + }) + .collect::>() ) .unwrap(); @@ -326,9 +449,21 @@ async fn do_run_extra( write!(output, "termination: {:?}\n", exit_status).unwrap(); write!(output, "---------------------------------------------\n").unwrap(); write!(output, "stdout:\n").unwrap(); - output.push_str(&redact_extra(&stdout_text, extra_redactions)); + + if let Some(extra_redactions) = extra_redactions { + output.push_str(&redact_extra(&stdout_text, extra_redactions)); + } else { + output.push_str(&stdout_text); + } + write!(output, "---------------------------------------------\n").unwrap(); write!(output, "stderr:\n").unwrap(); - output.push_str(&redact_extra(&stderr_text, extra_redactions)); + + if let Some(extra_redactions) = extra_redactions { + output.push_str(&redact_extra(&stderr_text, extra_redactions)); + } else { + output.push_str(&stderr_text); + } + write!(output, "=============================================\n").unwrap(); } diff --git a/dev-tools/omdb/tests/usage_errors.out b/dev-tools/omdb/tests/usage_errors.out index b4679001fa..8762907e81 100644 --- a/dev-tools/omdb/tests/usage_errors.out +++ b/dev-tools/omdb/tests/usage_errors.out @@ -14,6 +14,7 @@ Commands: mgs Debug a specific Management Gateway Service instance nexus Debug a specific Nexus instance oximeter Query oximeter collector state + oxql Enter the Oximeter Query Language shell for interactive querying sled-agent Debug a specific Sled help Print this message or the help of the given subcommand(s) @@ -44,6 +45,7 @@ Commands: mgs Debug a specific Management Gateway Service instance nexus Debug a specific Nexus instance oximeter Query oximeter collector state + oxql Enter the Oximeter Query Language shell for interactive querying sled-agent Debug a specific Sled help Print this message or the help of the given subcommand(s) @@ -615,3 +617,68 @@ Connection Options: Safety Options: -w, --destructive Allow potentially-destructive subcommands ============================================= +EXECUTING COMMAND: omdb ["oximeter", "--help"] +termination: Exited(0) +--------------------------------------------- +stdout: +Query oximeter collector state + +Usage: omdb oximeter [OPTIONS] + +Commands: + list-producers List the producers the collector is assigned to poll + help Print this message or the help of the given subcommand(s) + +Options: + --log-level log level filter [env: LOG_LEVEL=] [default: warn] + -h, --help Print help + +Connection Options: + --oximeter-url URL of the oximeter collector to query [env: + OMDB_OXIMETER_URL=] + --dns-server [env: OMDB_DNS_SERVER=] + +Safety Options: + -w, --destructive Allow potentially-destructive subcommands +--------------------------------------------- +stderr: +============================================= +EXECUTING COMMAND: omdb ["oxql", "--help"] +termination: Exited(0) +--------------------------------------------- +stdout: +Enter the Oximeter Query Language shell for interactive querying + +Usage: omdb oxql [OPTIONS] + +Options: + --log-level log level filter [env: LOG_LEVEL=] [default: warn] + --summaries Print summaries of each SQL query run against the database + --elapsed Print the total elapsed query duration + -h, --help Print help + +Connection Options: + --clickhouse-url + URL of the ClickHouse server to connect to [env: OMDB_CLICKHOUSE_URL=] + --dns-server + [env: OMDB_DNS_SERVER=] + +Safety Options: + -w, --destructive Allow potentially-destructive subcommands +--------------------------------------------- +stderr: +============================================= +EXECUTING COMMAND: omdb ["oxql", "--summarizes"] +termination: Exited(2) +--------------------------------------------- +stdout: +--------------------------------------------- +stderr: +error: unexpected argument '--summarizes' found + + tip: a similar argument exists: '--summaries' + +Usage: omdb oxql <--clickhouse-url |--summaries|--elapsed> + +For more information, try '--help'. +============================================= diff --git a/nexus-config/src/nexus_config.rs b/nexus-config/src/nexus_config.rs index 5b7069d06f..5ca1d2d6ed 100644 --- a/nexus-config/src/nexus_config.rs +++ b/nexus-config/src/nexus_config.rs @@ -6,16 +6,14 @@ //! at deployment time. use crate::PostgresConfigWithUrl; - -use omicron_common::address::Ipv6Subnet; -use omicron_common::address::NEXUS_TECHPORT_EXTERNAL_PORT; -use omicron_common::address::RACK_PREFIX; -use omicron_common::api::internal::shared::SwitchLocation; - use anyhow::anyhow; use camino::{Utf8Path, Utf8PathBuf}; use dropshot::ConfigDropshot; use dropshot::ConfigLogging; +use omicron_common::address::Ipv6Subnet; +use omicron_common::address::NEXUS_TECHPORT_EXTERNAL_PORT; +use omicron_common::address::RACK_PREFIX; +use omicron_common::api::internal::shared::SwitchLocation; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_with::serde_as; diff --git a/oximeter/collector/src/lib.rs b/oximeter/collector/src/lib.rs index 367a2066a1..02bf9152f4 100644 --- a/oximeter/collector/src/lib.rs +++ b/oximeter/collector/src/lib.rs @@ -409,4 +409,9 @@ impl Oximeter { pub fn collector_id(&self) -> &Uuid { &self.agent.id } + + /// Return the address of the server. + pub fn server_address(&self) -> SocketAddr { + self.server.local_addr() + } } diff --git a/oximeter/db/Cargo.toml b/oximeter/db/Cargo.toml index c446bc7822..3811f45be0 100644 --- a/oximeter/db/Cargo.toml +++ b/oximeter/db/Cargo.toml @@ -77,14 +77,14 @@ optional = true workspace = true optional = true -[dependencies.tokio] -workspace = true -features = [ "rt-multi-thread", "macros" ] - [dependencies.tabled] workspace = true optional = true +[dependencies.tokio] +workspace = true +features = [ "rt-multi-thread", "macros" ] + [dev-dependencies] expectorate.workspace = true indexmap.workspace = true diff --git a/oximeter/db/src/bin/oxdb/main.rs b/oximeter/db/src/bin/oxdb/main.rs index ca11dd18a3..871135fb16 100644 --- a/oximeter/db/src/bin/oxdb/main.rs +++ b/oximeter/db/src/bin/oxdb/main.rs @@ -2,7 +2,8 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -//! Tool for developing against the Oximeter timeseries database, populating data and querying. +//! CLI-Tool for developing against the Oximeter timeseries database, populating +//! data and querying. // Copyright 2024 Oxide Computer Company @@ -13,20 +14,13 @@ use oximeter::{ types::{Cumulative, Sample}, Metric, Target, }; -use oximeter_db::{query, Client, DbWrite}; +use oximeter_db::{query, shells::make_client, Client, DbWrite}; use slog::{debug, info, o, Drain, Level, Logger}; use std::net::IpAddr; -use std::net::SocketAddr; use uuid::Uuid; -#[cfg(feature = "sql")] -mod sql; - -#[cfg(feature = "oxql")] -mod oxql; - -// Samples are inserted in chunks of this size, to avoid large allocations when inserting huge -// numbers of timeseries. +/// Samples are inserted in chunks of this size, to avoid large allocations when inserting huge +/// numbers of timeseries. const INSERT_CHUNK_SIZE: usize = 100_000; /// A target identifying a single virtual machine instance @@ -150,31 +144,16 @@ enum Subcommand { #[cfg(feature = "sql")] Sql { #[clap(flatten)] - opts: crate::sql::ShellOptions, + opts: oximeter_db::shells::sql::ShellOptions, }, /// Enter the Oximeter Query Language shell for interactive querying. - #[cfg(feature = "oxql")] Oxql { #[clap(flatten)] - opts: crate::oxql::ShellOptions, + opts: oximeter_db::shells::oxql::ShellOptions, }, } -async fn make_client( - address: IpAddr, - port: u16, - log: &Logger, -) -> Result { - let address = SocketAddr::new(address, port); - let client = Client::new(address, &log); - client - .init_single_node_db() - .await - .context("Failed to initialize timeseries database")?; - Ok(client) -} - fn describe_data() { let vm = VirtualMachine::new(); print!("Target:\n\n Name: {target_name:?}\n", target_name = vm.name()); @@ -368,11 +347,12 @@ async fn main() -> anyhow::Result<()> { } #[cfg(feature = "sql")] Subcommand::Sql { opts } => { - crate::sql::sql_shell(args.address, args.port, log, opts).await? + oximeter_db::shells::sql::shell(args.address, args.port, log, opts) + .await? } - #[cfg(feature = "oxql")] Subcommand::Oxql { opts } => { - crate::oxql::oxql_shell(args.address, args.port, log, opts).await? + oximeter_db::shells::oxql::shell(args.address, args.port, log, opts) + .await? } } Ok(()) diff --git a/oximeter/db/src/client/mod.rs b/oximeter/db/src/client/mod.rs index 2d6212971e..0c372cedae 100644 --- a/oximeter/db/src/client/mod.rs +++ b/oximeter/db/src/client/mod.rs @@ -7,7 +7,6 @@ // Copyright 2024 Oxide Computer Company pub(crate) mod dbwrite; -#[cfg(any(feature = "oxql", test))] pub(crate) mod oxql; pub(crate) mod query_summary; #[cfg(any(feature = "sql", test))] diff --git a/oximeter/db/src/lib.rs b/oximeter/db/src/lib.rs index c471a837ea..bbf29653e9 100644 --- a/oximeter/db/src/lib.rs +++ b/oximeter/db/src/lib.rs @@ -32,13 +32,14 @@ use thiserror::Error; mod client; pub mod model; -#[cfg(feature = "oxql")] +#[cfg(any(feature = "oxql", test))] pub mod oxql; pub mod query; +#[cfg(any(feature = "oxql", feature = "sql", test))] +pub mod shells; #[cfg(any(feature = "sql", test))] pub mod sql; -#[cfg(feature = "oxql")] pub use client::oxql::OxqlResult; pub use client::query_summary::QuerySummary; pub use client::Client; @@ -141,12 +142,10 @@ pub enum Error { #[error("SQL error")] Sql(#[from] sql::Error), - #[cfg(any(feature = "oxql", test))] #[error(transparent)] Oxql(oxql::Error), } -#[cfg(any(feature = "oxql", test))] impl From for Error { fn from(e: crate::oxql::Error) -> Self { Error::Oxql(e) diff --git a/oximeter/db/src/oxql/ast/table_ops/filter.rs b/oximeter/db/src/oxql/ast/table_ops/filter.rs index 9e796bc730..b6fc533e4d 100644 --- a/oximeter/db/src/oxql/ast/table_ops/filter.rs +++ b/oximeter/db/src/oxql/ast/table_ops/filter.rs @@ -16,10 +16,10 @@ use crate::oxql::point::DataType; use crate::oxql::point::MetricType; use crate::oxql::point::Points; use crate::oxql::point::ValueArray; -use crate::oxql::query::special_idents; use crate::oxql::Error; use crate::oxql::Table; use crate::oxql::Timeseries; +use crate::shells::special_idents; use chrono::DateTime; use chrono::Utc; use oximeter::FieldType; @@ -61,7 +61,7 @@ impl core::str::FromStr for Filter { const EXPR_COMPLEXITY_ITERATIVE_LIMIT: usize = 32; // A crude limit on expression complexity, governing how many times we -// recurisvely apply a DNF simplification before bailing out. +// recursively apply a DNF simplification before bailing out. const EXPR_COMPLEXITY_RECURSIVE_LIMIT: usize = 32; impl Filter { diff --git a/oximeter/db/src/oxql/query/mod.rs b/oximeter/db/src/oxql/query/mod.rs index 40a6c82f93..e1fada9f2a 100644 --- a/oximeter/db/src/oxql/query/mod.rs +++ b/oximeter/db/src/oxql/query/mod.rs @@ -25,63 +25,6 @@ use chrono::DateTime; use chrono::Utc; use std::time::Duration; -/// Special identifiers for column names or other widely-used values. -pub mod special_idents { - use oximeter::DatumType; - - macro_rules! gen_marker { - ($p:expr, $field:expr) => { - concat!("p", $p, "_", $field) - }; - } - - pub const TIMESTAMP: &str = "timestamp"; - pub const START_TIME: &str = "start_time"; - pub const DATUM: &str = "datum"; - pub const BINS: &str = "bins"; - pub const COUNTS: &str = "counts"; - pub const MIN: &str = "min"; - pub const MAX: &str = "max"; - pub const SUM_OF_SAMPLES: &str = "sum_of_samples"; - pub const SQUARED_MEAN: &str = "squared_mean"; - pub const DATETIME64: &str = "DateTime64"; - pub const ARRAYU64: &str = "Array[u64]"; - pub const ARRAYFLOAT64: &str = "Array[f64]"; - pub const ARRAYINT64: &str = "Array[i64]"; - pub const FLOAT64: &str = "f64"; - pub const UINT64: &str = "u64"; - - pub const DISTRIBUTION_IDENTS: [&str; 15] = [ - "bins", - "counts", - "min", - "max", - "sum_of_samples", - "squared_mean", - gen_marker!("50", "marker_heights"), - gen_marker!("50", "marker_positions"), - gen_marker!("50", "desired_marker_positions"), - gen_marker!("90", "marker_heights"), - gen_marker!("90", "marker_positions"), - gen_marker!("90", "desired_marker_positions"), - gen_marker!("99", "marker_heights"), - gen_marker!("99", "marker_positions"), - gen_marker!("99", "desired_marker_positions"), - ]; - - pub fn array_type_name_from_histogram_type( - type_: DatumType, - ) -> Option { - if !type_.is_histogram() { - return None; - } - Some(format!( - "Array[{}]", - type_.to_string().strip_prefix("Histogram").unwrap().to_lowercase(), - )) - } -} - /// A parsed OxQL query. #[derive(Clone, Debug, PartialEq)] pub struct Query { diff --git a/oximeter/db/src/shells/mod.rs b/oximeter/db/src/shells/mod.rs new file mode 100644 index 0000000000..86ac053c80 --- /dev/null +++ b/oximeter/db/src/shells/mod.rs @@ -0,0 +1,209 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Helper for the OxQL and SQL shell implementations. + +// Copyright 2024 Oxide Computer Company + +use crate::Client; +use crate::DbWrite as _; +use anyhow::Context as _; +use dropshot::EmptyScanParams; +use dropshot::WhichPage; +use oximeter::TimeseriesSchema; +use slog::Logger; +use std::net::IpAddr; +use std::net::SocketAddr; + +#[cfg(any(feature = "oxql", test))] +pub mod oxql; +#[cfg(any(feature = "sql", test))] +pub mod sql; + +/// Special identifiers for column names or other widely-used values. +pub mod special_idents { + use oximeter::DatumType; + + macro_rules! gen_marker { + ($p:expr, $field:expr) => { + concat!("p", $p, "_", $field) + }; + } + + pub const TIMESTAMP: &str = "timestamp"; + pub const START_TIME: &str = "start_time"; + pub const DATUM: &str = "datum"; + pub const BINS: &str = "bins"; + pub const COUNTS: &str = "counts"; + pub const MIN: &str = "min"; + pub const MAX: &str = "max"; + pub const SUM_OF_SAMPLES: &str = "sum_of_samples"; + pub const SQUARED_MEAN: &str = "squared_mean"; + pub const DATETIME64: &str = "DateTime64"; + pub const ARRAYU64: &str = "Array[u64]"; + pub const ARRAYFLOAT64: &str = "Array[f64]"; + pub const ARRAYINT64: &str = "Array[i64]"; + pub const FLOAT64: &str = "f64"; + pub const UINT64: &str = "u64"; + + /// Distribution identifiers. + pub const DISTRIBUTION_IDENTS: [&str; 15] = [ + "bins", + "counts", + "min", + "max", + "sum_of_samples", + "squared_mean", + gen_marker!("50", "marker_heights"), + gen_marker!("50", "marker_positions"), + gen_marker!("50", "desired_marker_positions"), + gen_marker!("90", "marker_heights"), + gen_marker!("90", "marker_positions"), + gen_marker!("90", "desired_marker_positions"), + gen_marker!("99", "marker_heights"), + gen_marker!("99", "marker_positions"), + gen_marker!("99", "desired_marker_positions"), + ]; + + /// Get the array type name for a histogram type. + pub fn array_type_name_from_histogram_type( + type_: DatumType, + ) -> Option { + if !type_.is_histogram() { + return None; + } + Some(format!( + "Array[{}]", + type_.to_string().strip_prefix("Histogram").unwrap().to_lowercase(), + )) + } +} + +/// List the known timeseries. +pub async fn list_timeseries(client: &Client) -> anyhow::Result<()> { + let mut page = WhichPage::First(EmptyScanParams {}); + let limit = 100.try_into().unwrap(); + loop { + let results = client.timeseries_schema_list(&page, limit).await?; + for schema in results.items.iter() { + println!("{}", schema.timeseries_name); + } + if results.next_page.is_some() { + if let Some(last) = results.items.last() { + page = WhichPage::Next(last.timeseries_name.clone()); + } else { + return Ok(()); + } + } else { + return Ok(()); + } + } +} + +/// Describe a single timeseries. +pub async fn describe_timeseries( + client: &Client, + timeseries: &str, +) -> anyhow::Result<()> { + match timeseries.parse() { + Err(_) => eprintln!( + "Invalid timeseries name '{timeseries}, \ + use \\l to list available timeseries by name + " + ), + Ok(name) => { + if let Some(schema) = client.schema_for_timeseries(&name).await? { + let (cols, types) = prepare_columns(&schema); + let mut builder = tabled::builder::Builder::default(); + builder.push_record(cols); // first record is the header + builder.push_record(types); + println!( + "{}", + builder.build().with(tabled::settings::Style::psql()) + ); + } else { + eprintln!("No such timeseries: {timeseries}"); + } + } + } + Ok(()) +} + +/// Create a client to the timeseries database, and ensure the database exists. +pub async fn make_client( + address: IpAddr, + port: u16, + log: &Logger, +) -> Result { + let address = SocketAddr::new(address, port); + let client = Client::new(address, &log); + client + .init_single_node_db() + .await + .context("Failed to initialize timeseries database")?; + Ok(client) +} + +/// Prepare the columns for a timeseries or virtual table. +pub(crate) fn prepare_columns( + schema: &TimeseriesSchema, +) -> (Vec, Vec) { + let mut cols = Vec::with_capacity(schema.field_schema.len() + 2); + let mut types = cols.clone(); + + for field in schema.field_schema.iter() { + cols.push(field.name.clone()); + types.push(field.field_type.to_string()); + } + + cols.push(special_idents::TIMESTAMP.into()); + types.push(special_idents::DATETIME64.into()); + + if schema.datum_type.is_histogram() { + cols.push(special_idents::START_TIME.into()); + types.push(special_idents::DATETIME64.into()); + + cols.push(special_idents::BINS.into()); + types.push( + special_idents::array_type_name_from_histogram_type( + schema.datum_type, + ) + .unwrap(), + ); + + cols.push(special_idents::COUNTS.into()); + types.push(special_idents::ARRAYU64.into()); + + cols.push(special_idents::MIN.into()); + types.push(special_idents::FLOAT64.into()); + + cols.push(special_idents::MAX.into()); + types.push(special_idents::FLOAT64.into()); + + cols.push(special_idents::SUM_OF_SAMPLES.into()); + types.push(special_idents::UINT64.into()); + + cols.push(special_idents::SQUARED_MEAN.into()); + types.push(special_idents::UINT64.into()); + + for quantile in ["P50", "P90", "P99"].iter() { + cols.push(format!("{}_MARKER_HEIGHTS", quantile)); + types.push(special_idents::ARRAYFLOAT64.into()); + cols.push(format!("{}_MARKER_POSITIONS", quantile)); + types.push(special_idents::ARRAYINT64.into()); + cols.push(format!("{}_DESIRED_MARKER_POSITIONS", quantile)); + types.push(special_idents::ARRAYFLOAT64.into()); + } + } else if schema.datum_type.is_cumulative() { + cols.push(special_idents::START_TIME.into()); + types.push(special_idents::DATETIME64.into()); + cols.push(special_idents::DATUM.into()); + types.push(schema.datum_type.to_string()); + } else { + cols.push(special_idents::DATUM.into()); + types.push(schema.datum_type.to_string()); + } + + (cols, types) +} diff --git a/oximeter/db/src/bin/oxdb/oxql.rs b/oximeter/db/src/shells/oxql.rs similarity index 72% rename from oximeter/db/src/bin/oxdb/oxql.rs rename to oximeter/db/src/shells/oxql.rs index ebe55dc7a7..3e81a0f6ec 100644 --- a/oximeter/db/src/bin/oxdb/oxql.rs +++ b/oximeter/db/src/shells/oxql.rs @@ -2,20 +2,14 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -//! OxQL shell. +//! OxQL shell implementation. // Copyright 2024 Oxide Computer -use crate::make_client; +use super::{list_timeseries, make_client, prepare_columns}; +use crate::{oxql::Table, Client, OxqlResult}; use clap::Args; use crossterm::style::Stylize; -use dropshot::EmptyScanParams; -use dropshot::WhichPage; -use oximeter::TimeseriesSchema; -use oximeter_db::oxql::query::special_idents; -use oximeter_db::oxql::Table; -use oximeter_db::Client; -use oximeter_db::OxqlResult; use reedline::DefaultPrompt; use reedline::DefaultPromptSegment; use reedline::Reedline; @@ -23,228 +17,25 @@ use reedline::Signal; use slog::Logger; use std::net::IpAddr; -#[derive(Clone, Debug, Args)] +/// Options for the OxQL shell. +#[derive(Clone, Debug, Default, Args)] pub struct ShellOptions { /// Print summaries of each SQL query run against the database. #[clap(long = "summaries")] - print_summaries: bool, + pub print_summaries: bool, /// Print the total elapsed query duration. #[clap(long = "elapsed")] - print_elapsed: bool, -} - -// Print help for the basic OxQL commands. -fn print_basic_commands() { - println!("Basic commands:"); - println!(" \\?, \\h, help - Print this help"); - println!(" \\q, quit, exit, ^D - Exit the shell"); - println!(" \\l - List timeseries"); - println!(" \\d - Describe a timeseries"); - println!(" \\ql [] - Get OxQL help about an operation"); - println!(); - println!("Or try entering an OxQL `get` query"); -} - -// Print high-level information about OxQL. -fn print_general_oxql_help() { - const HELP: &str = r#"Oximeter Query Language - -The Oximeter Query Language (OxQL) implements queries as -as sequence of operations. Each of these takes zero or more -timeseries as inputs, and produces zero or more timeseries -as outputs. Operations are chained together with the pipe -operator, "|". - -All queries start with a `get` operation, which selects a -timeseries from the database, by name. For example: - -`get physical_data_link:bytes_received` - -The supported timeseries operations are: - -- get: Select a timeseries by name -- filter: Filter timeseries by field or sample values -- group_by: Group timeseries by fields, applying a reducer. -- join: Join two or more timeseries together - -Run `\ql ` to get specific help about that operation. - "#; - println!("{HELP}"); -} - -// Print help for a specific OxQL operation. -fn print_oxql_operation_help(op: &str) { - match op { - "get" => { - const HELP: &str = r#"get "); - -Get instances of a timeseries by name"#; - println!("{HELP}"); - } - "filter" => { - const HELP: &str = r#"filter "); - -Filter timeseries based on their attributes. - can be a logical combination of filtering -\"atoms\", such as `field_foo > 0`. Expressions -may use any of the usual comparison operators, and -can be nested and combined with && or ||. - -Expressions must refer to the name of a field -for a timeseries at this time, and must compare -against literals. For example, `some_field > 0` -is supported, but `some_field > other_field` is not."#; - println!("{HELP}"); - } - "group_by" => { - const HELP: &str = r#"group_by [, ... ] -group_by [, ... ], - -Group timeseries by the named fields, optionally -specifying a reducer to use when aggregating the -timeseries within each group. If no reducer is -specified, `mean` is used, averaging the values -within each group. - -Current supported reducers: - - mean - - sum"#; - println!("{HELP}"); - } - "join" => { - const HELP: &str = r#"join - -Combine 2 or more tables by peforming a natural -inner join, matching up those with fields of the -same value. Currently, joining does not take into -account the timestamps, and does not align the outputs -directly."#; - println!("{HELP}"); - } - _ => eprintln!("unrecognized OxQL operation: '{op}'"), - } -} - -// List the known timeseries. -async fn list_timeseries(client: &Client) -> anyhow::Result<()> { - let mut page = WhichPage::First(EmptyScanParams {}); - let limit = 100.try_into().unwrap(); - loop { - let results = client.timeseries_schema_list(&page, limit).await?; - for schema in results.items.iter() { - println!("{}", schema.timeseries_name); - } - if results.next_page.is_some() { - if let Some(last) = results.items.last() { - page = WhichPage::Next(last.timeseries_name.clone()); - } else { - return Ok(()); - } - } else { - return Ok(()); - } - } -} - -/// Prepare the columns for a timeseries or virtual table. -pub(crate) fn prepare_columns( - schema: &TimeseriesSchema, -) -> (Vec, Vec) { - let mut cols = Vec::with_capacity(schema.field_schema.len() + 2); - let mut types = cols.clone(); - - for field in schema.field_schema.iter() { - cols.push(field.name.clone()); - types.push(field.field_type.to_string()); - } - - cols.push(special_idents::TIMESTAMP.into()); - types.push(special_idents::DATETIME64.into()); - - if schema.datum_type.is_histogram() { - cols.push(special_idents::START_TIME.into()); - types.push(special_idents::DATETIME64.into()); - - cols.push(special_idents::BINS.into()); - types.push( - special_idents::array_type_name_from_histogram_type( - schema.datum_type, - ) - .unwrap(), - ); - - cols.push(special_idents::COUNTS.into()); - types.push(special_idents::ARRAYU64.into()); - - cols.push(special_idents::MIN.into()); - types.push(special_idents::FLOAT64.into()); - - cols.push(special_idents::MAX.into()); - types.push(special_idents::FLOAT64.into()); - - cols.push(special_idents::SUM_OF_SAMPLES.into()); - types.push(special_idents::UINT64.into()); - - cols.push(special_idents::SQUARED_MEAN.into()); - types.push(special_idents::UINT64.into()); - - for quantile in ["P50", "P90", "P99"].iter() { - cols.push(format!("{}_MARKER_HEIGHTS", quantile)); - types.push(special_idents::ARRAYFLOAT64.into()); - cols.push(format!("{}_MARKER_POSITIONS", quantile)); - types.push(special_idents::ARRAYINT64.into()); - cols.push(format!("{}_DESIRED_MARKER_POSITIONS", quantile)); - types.push(special_idents::ARRAYFLOAT64.into()); - } - } else if schema.datum_type.is_cumulative() { - cols.push(special_idents::START_TIME.into()); - types.push(special_idents::DATETIME64.into()); - cols.push(special_idents::DATUM.into()); - types.push(schema.datum_type.to_string()); - } else { - cols.push(special_idents::DATUM.into()); - types.push(schema.datum_type.to_string()); - } - - (cols, types) -} - -/// Describe a single timeseries. -async fn describe_timeseries( - client: &Client, - timeseries: &str, -) -> anyhow::Result<()> { - match timeseries.parse() { - Err(_) => eprintln!( - "Invalid timeseries name '{timeseries}, \ - use \\l to list available timeseries by name - " - ), - Ok(name) => { - if let Some(schema) = client.schema_for_timeseries(&name).await? { - let (cols, types) = prepare_columns(&schema); - let mut builder = tabled::builder::Builder::default(); - builder.push_record(cols); // first record is the header - builder.push_record(types); - println!( - "{}", - builder.build().with(tabled::settings::Style::psql()) - ); - } else { - eprintln!("No such timeseries: {timeseries}"); - } - } - } - Ok(()) + pub print_elapsed: bool, } -/// Run the OxQL shell. -pub async fn oxql_shell( +/// Run/execute the OxQL shell. +pub async fn shell( address: IpAddr, port: u16, log: Logger, opts: ShellOptions, ) -> anyhow::Result<()> { + // Create the client. let client = make_client(address, port, &log).await?; // A workaround to ensure the client has all available timeseries when the @@ -320,6 +111,127 @@ pub async fn oxql_shell( } } +/// Describe a single timeseries. +async fn describe_timeseries( + client: &Client, + timeseries: &str, +) -> anyhow::Result<()> { + match timeseries.parse() { + Err(_) => eprintln!( + "Invalid timeseries name '{timeseries}, \ + use \\l to list available timeseries by name + " + ), + Ok(name) => { + if let Some(schema) = client.schema_for_timeseries(&name).await? { + let (cols, types) = prepare_columns(&schema); + let mut builder = tabled::builder::Builder::default(); + builder.push_record(cols); // first record is the header + builder.push_record(types); + println!( + "{}", + builder.build().with(tabled::settings::Style::psql()) + ); + } else { + eprintln!("No such timeseries: {timeseries}"); + } + } + } + Ok(()) +} + +/// Print help for a specific OxQL operation. +fn print_oxql_operation_help(op: &str) { + match op { + "get" => { + const HELP: &str = r#"get "); + +Get instances of a timeseries by name"#; + println!("{HELP}"); + } + "filter" => { + const HELP: &str = r#"filter "); + +Filter timeseries based on their attributes. + can be a logical combination of filtering +\"atoms\", such as `field_foo > 0`. Expressions +may use any of the usual comparison operators, and +can be nested and combined with && or ||. + +Expressions must refer to the name of a field +for a timeseries at this time, and must compare +against literals. For example, `some_field > 0` +is supported, but `some_field > other_field` is not."#; + println!("{HELP}"); + } + "group_by" => { + const HELP: &str = r#"group_by [, ... ] +group_by [, ... ], + +Group timeseries by the named fields, optionally +specifying a reducer to use when aggregating the +timeseries within each group. If no reducer is +specified, `mean` is used, averaging the values +within each group. + +Current supported reducers: + - mean + - sum"#; + println!("{HELP}"); + } + "join" => { + const HELP: &str = r#"join + +Combine 2 or more tables by peforming a natural +inner join, matching up those with fields of the +same value. Currently, joining does not take into +account the timestamps, and does not align the outputs +directly."#; + println!("{HELP}"); + } + _ => eprintln!("unrecognized OxQL operation: '{op}'"), + } +} + +/// Print help for the basic OxQL commands. +fn print_basic_commands() { + println!("Basic commands:"); + println!(" \\?, \\h, help - Print this help"); + println!(" \\q, quit, exit, ^D - Exit the shell"); + println!(" \\l - List timeseries"); + println!(" \\d - Describe a timeseries"); + println!(" \\ql [] - Get OxQL help about an operation"); + println!(); + println!("Or try entering an OxQL `get` query"); +} + +/// Print high-level information about OxQL. +fn print_general_oxql_help() { + const HELP: &str = r#"Oximeter Query Language + +The Oximeter Query Language (OxQL) implements queries as +as sequence of operations. Each of these takes zero or more +timeseries as inputs, and produces zero or more timeseries +as outputs. Operations are chained together with the pipe +operator, "|". + +All queries start with a `get` operation, which selects a +timeseries from the database, by name. For example: + +`get physical_data_link:bytes_received` + +The supported timeseries operations are: + +- get: Select a timeseries by name +- filter: Filter timeseries by field or sample values +- group_by: Group timeseries by fields, applying a reducer. +- join: Join two or more timeseries together + +Run `\ql ` to get specific help about that operation. + "#; + println!("{HELP}"); +} + fn print_query_summary( result: &OxqlResult, print_elapsed: bool, diff --git a/oximeter/db/src/bin/oxdb/sql.rs b/oximeter/db/src/shells/sql.rs similarity index 96% rename from oximeter/db/src/bin/oxdb/sql.rs rename to oximeter/db/src/shells/sql.rs index 44780592fc..389ebd5019 100644 --- a/oximeter/db/src/bin/oxdb/sql.rs +++ b/oximeter/db/src/shells/sql.rs @@ -2,20 +2,16 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -//! SQL shell subcommand for `oxdb`. +//! SQL shell implementation. // Copyright 2024 Oxide Computer Company -use super::oxql; -use crate::make_client; +use super::{make_client, prepare_columns}; +use crate::sql::{function_allow_list, QueryResult, Table}; +use crate::{Client, QuerySummary}; use clap::Args; use dropshot::EmptyScanParams; use dropshot::WhichPage; -use oximeter_db::sql::function_allow_list; -use oximeter_db::sql::QueryResult; -use oximeter_db::sql::Table; -use oximeter_db::Client; -use oximeter_db::QuerySummary; use reedline::DefaultPrompt; use reedline::DefaultPromptSegment; use reedline::Reedline; @@ -23,63 +19,7 @@ use reedline::Signal; use slog::Logger; use std::net::IpAddr; -fn print_basic_commands() { - println!("Basic commands:"); - println!(" \\?, \\h, help - Print this help"); - println!(" \\q, quit, exit, ^D - Exit the shell"); - println!(" \\l - List tables"); - println!(" \\d - Describe a table"); - println!( - " \\f - List or describe ClickHouse SQL functions" - ); - println!(); - println!("Or try entering a SQL `SELECT` statement"); -} - -async fn list_virtual_tables(client: &Client) -> anyhow::Result<()> { - let mut page = WhichPage::First(EmptyScanParams {}); - let limit = 100.try_into().unwrap(); - loop { - let results = client.timeseries_schema_list(&page, limit).await?; - for schema in results.items.iter() { - println!("{}", schema.timeseries_name); - } - if results.next_page.is_some() { - if let Some(last) = results.items.last() { - page = WhichPage::Next(last.timeseries_name.clone()); - } else { - return Ok(()); - } - } else { - return Ok(()); - } - } -} - -async fn describe_virtual_table( - client: &Client, - table: &str, -) -> anyhow::Result<()> { - match table.parse() { - Err(_) => println!("Invalid timeseries name: {table}"), - Ok(name) => { - if let Some(schema) = client.schema_for_timeseries(&name).await? { - let (cols, types) = oxql::prepare_columns(&schema); - let mut builder = tabled::builder::Builder::default(); - builder.push_record(cols); // first record is the header - builder.push_record(types); - println!( - "{}", - builder.build().with(tabled::settings::Style::psql()) - ); - } else { - println!("No such timeseries: {table}"); - } - } - } - Ok(()) -} - +/// Options for the SQL shell. #[derive(Clone, Debug, Args)] pub struct ShellOptions { /// Print query metadata. @@ -107,48 +47,8 @@ impl Default for ShellOptions { } } -fn list_supported_functions() { - println!("Subset of ClickHouse SQL functions currently supported"); - println!( - "See https://clickhouse.com/docs/en/sql-reference/functions for more" - ); - println!(); - for func in function_allow_list().iter() { - println!(" {func}"); - } -} - -fn show_supported_function(name: &str) { - if let Some(func) = function_allow_list().iter().find(|f| f.name == name) { - println!("{}", func.name); - println!(" {}", func.usage); - println!(" {}", func.description); - } else { - println!("No supported function '{name}'"); - } -} - -fn print_sql_query(query: &str) { - println!( - "{}", - sqlformat::format( - &query, - &sqlformat::QueryParams::None, - sqlformat::FormatOptions { uppercase: true, ..Default::default() } - ) - ); - println!(); -} - -fn print_query_summary(table: &Table, summary: &QuerySummary) { - println!("Summary"); - println!(" Query ID: {}", summary.id); - println!(" Result rows: {}", table.rows.len()); - println!(" Time: {:?}", summary.elapsed); - println!(" Read: {}\n", summary.io_summary.read); -} - -pub async fn sql_shell( +/// Run/execute the SQL shell. +pub async fn shell( address: IpAddr, port: u16, log: Logger, @@ -261,3 +161,101 @@ pub async fn sql_shell( } } } + +fn print_basic_commands() { + println!("Basic commands:"); + println!(" \\?, \\h, help - Print this help"); + println!(" \\q, quit, exit, ^D - Exit the shell"); + println!(" \\l - List tables"); + println!(" \\d
- Describe a table"); + println!( + " \\f - List or describe ClickHouse SQL functions" + ); + println!(); + println!("Or try entering a SQL `SELECT` statement"); +} + +async fn list_virtual_tables(client: &Client) -> anyhow::Result<()> { + let mut page = WhichPage::First(EmptyScanParams {}); + let limit = 100.try_into().unwrap(); + loop { + let results = client.timeseries_schema_list(&page, limit).await?; + for schema in results.items.iter() { + println!("{}", schema.timeseries_name); + } + if results.next_page.is_some() { + if let Some(last) = results.items.last() { + page = WhichPage::Next(last.timeseries_name.clone()); + } else { + return Ok(()); + } + } else { + return Ok(()); + } + } +} + +async fn describe_virtual_table( + client: &Client, + table: &str, +) -> anyhow::Result<()> { + match table.parse() { + Err(_) => println!("Invalid timeseries name: {table}"), + Ok(name) => { + if let Some(schema) = client.schema_for_timeseries(&name).await? { + let (cols, types) = prepare_columns(&schema); + let mut builder = tabled::builder::Builder::default(); + builder.push_record(cols); // first record is the header + builder.push_record(types); + println!( + "{}", + builder.build().with(tabled::settings::Style::psql()) + ); + } else { + println!("No such timeseries: {table}"); + } + } + } + Ok(()) +} + +fn list_supported_functions() { + println!("Subset of ClickHouse SQL functions currently supported"); + println!( + "See https://clickhouse.com/docs/en/sql-reference/functions for more" + ); + println!(); + for func in function_allow_list().iter() { + println!(" {func}"); + } +} + +fn show_supported_function(name: &str) { + if let Some(func) = function_allow_list().iter().find(|f| f.name == name) { + println!("{}", func.name); + println!(" {}", func.usage); + println!(" {}", func.description); + } else { + println!("No supported function '{name}'"); + } +} + +fn print_sql_query(query: &str) { + println!( + "{}", + sqlformat::format( + &query, + &sqlformat::QueryParams::None, + sqlformat::FormatOptions { uppercase: true, ..Default::default() } + ) + ); + println!(); +} + +fn print_query_summary(table: &Table, summary: &QuerySummary) { + println!("Summary"); + println!(" Query ID: {}", summary.id); + println!(" Result rows: {}", table.rows.len()); + println!(" Time: {:?}", summary.elapsed); + println!(" Read: {}\n", summary.io_summary.read); +} diff --git a/test-utils/src/dev/test_cmds.rs b/test-utils/src/dev/test_cmds.rs index 3c675ddfd9..5d6b9a152e 100644 --- a/test-utils/src/dev/test_cmds.rs +++ b/test-utils/src/dev/test_cmds.rs @@ -126,8 +126,8 @@ pub fn error_for_enoent() -> String { /// /// This allows use to use expectorate to verify the shape of the CLI output. pub fn redact_variable(input: &str) -> String { - // Replace TCP port numbers. We include the localhost characters to avoid - // catching any random sequence of numbers. + // Replace TCP port numbers. We include the localhost + // characters to avoid catching any random sequence of numbers. let s = regex::Regex::new(r"\[::1\]:\d{4,5}") .unwrap() .replace_all(&input, "[::1]:REDACTED_PORT") @@ -189,6 +189,13 @@ pub fn redact_variable(input: &str) -> String { .replace_all(&s, "s ago") .to_string(); + // Replace interval (s). + let s = regex::Regex::new(r"\d+s") + .unwrap() + .replace_all(&s, "s") + .to_string(); + + // Replace interval (ms). let s = regex::Regex::new(r"\d+ms") .unwrap() .replace_all(&s, "ms")