Skip to content

Commit

Permalink
Add a query planner for OxQL
Browse files Browse the repository at this point in the history
- Add types to represent the plan tree and the currently-supported plan
  tree nodes. These mostly correspond to the existing query AST nodes,
  but include information about the expected schema for the input and
  output tables, along with the query AST nodes that "implement" that
  transformation.
- Add an explicit node for computing deltas from a cumulative
  timeseries, automatically after the node for fetching its data from
  the DB. This is currently implicitly done after fetching the data, but
  will be part of an explicit plan step going forward. The ultimate goal
  is to push that into the database itself where possible.
- Adds methods to optimize a query plan, which currently includes the
  predicate-pushdown and limit-pushdown tricks we already do to limit
  the amount of data we get from the database. Adds some tests to verify
  behavior of these optimizations, in particular that they don't change
  the _planned_ output of the query itself.
- Add pretty-printing of the plan tree, and include a way to show that
  in the OxQL shell.
- Add detection of full table scans. Use the planner in OxQL queries,
  _only to verify them_ and check that there are no scans. The queries
  themselves are executed in the original method today.
  • Loading branch information
bnaecker committed Nov 22, 2024
1 parent df13cc7 commit 20b4c6c
Show file tree
Hide file tree
Showing 30 changed files with 4,018 additions and 27 deletions.
9 changes: 8 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,7 @@ tar = "0.4"
tempfile = "3.10"
term = "0.7"
termios = "0.3"
termtree = "0.5.1"
textwrap = "0.16.1"
test-strategy = "0.3.1"
thiserror = "1.0"
Expand Down
1 change: 1 addition & 0 deletions oximeter/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ slog-async.workspace = true
slog-dtrace.workspace = true
slog-term.workspace = true
strum.workspace = true
termtree.workspace = true
thiserror.workspace = true
tokio-util.workspace = true
usdt.workspace = true
Expand Down
60 changes: 53 additions & 7 deletions oximeter/db/src/client/oxql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::oxql::ast::table_ops::filter;
use crate::oxql::ast::table_ops::filter::Filter;
use crate::oxql::ast::table_ops::limit::Limit;
use crate::oxql::ast::table_ops::limit::LimitKind;
use crate::oxql::Query;
use crate::query::field_table_name;
use crate::Error;
use crate::Metric;
Expand Down Expand Up @@ -113,6 +114,40 @@ struct ConsistentKeyGroup {
}

impl Client {
/// Build a query plan for the OxQL query.
pub async fn plan_oxql_query(
&self,
query: impl AsRef<str>,
) -> Result<oxql::plan::Plan, Error> {
let query = query.as_ref();
let parsed_query = oxql::Query::new(query)?;
self.build_query_plan(&parsed_query).await
}

/// Build a query plan for the OxQL query.
async fn build_query_plan(
&self,
query: &Query,
) -> Result<oxql::plan::Plan, Error> {
let referenced_timeseries = query.all_timeseries_names();
let schema = self
.schema
.lock()
.await
.iter()
.filter_map(|(name, schema)| {
if referenced_timeseries.contains(name) {
Some((name.clone(), schema.clone()))
} else {
None
}
})
.collect();
let plan =
oxql::plan::Plan::new(query.parsed_query().clone(), &schema)?;
Ok(plan)
}

/// Run a OxQL query.
pub async fn oxql_query(
&self,
Expand All @@ -132,6 +167,15 @@ impl Client {
// See https://github.com/oxidecomputer/omicron/issues/5298.
let query = query.as_ref();
let parsed_query = oxql::Query::new(query)?;
let plan = self.build_query_plan(&parsed_query).await?;
if plan.requires_full_table_scan() {
return Err(Error::Oxql(anyhow::anyhow!(
"This query requires at least one full table scan. \
Please rewrite the query to filter either the fields \
or timestamps, in order to reduce the amount of data \
fetched from the database."
)));
}
let query_id = Uuid::new_v4();
let query_log =
self.log.new(slog::o!("query_id" => query_id.to_string()));
Expand Down Expand Up @@ -837,12 +881,12 @@ impl Client {
// return.
//
// This is used to ensure that we never go above the limit in
// `MAX_RESULT_SIZE`. That restricts the _total_ number of rows we want
// to retch from the database. So we set our limit to be one more than
// the remainder on our allotment. If we get exactly as many as we set
// in the limit, then we fail the query because there are more rows that
// _would_ be returned. We don't know how many more, but there is at
// least 1 that pushes us over the limit. This prevents tricky
// `MAX_DATABASE_ROWS`. That restricts the _total_ number of rows we
// want to retch from the database. So we set our limit to be one more
// than the remainder on our allotment. If we get exactly as many as we
// set in the limit, then we fail the query because there are more row
// that _would_ be returned. We don't know how many more, but there is
// at least 1 that pushes us over the limit. This prevents tricky
// TOCTOU-like bugs where we need to check the limit twice, and improves
// performance, since we don't return much more than we could possibly
// handle.
Expand Down Expand Up @@ -1293,7 +1337,9 @@ mod tests {
#[tokio::test]
async fn test_get_entire_table() {
let ctx = setup_oxql_test("test_get_entire_table").await;
let query = "get some_target:some_metric";
// We need _some_ filter here to avoid a provable full-table scan.
let query =
"get some_target:some_metric | filter timestamp > @2020-01-01";
let result = ctx
.client
.oxql_query(query)
Expand Down
31 changes: 31 additions & 0 deletions oximeter/db/src/oxql/ast/literal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use chrono::DateTime;
use chrono::Utc;
use oximeter::FieldType;
use oximeter::FieldValue;
use oxql_types::point::DataType;
use regex::Regex;
use std::borrow::Borrow;
use std::fmt;
Expand All @@ -35,6 +36,20 @@ pub enum Literal {
}

impl Literal {
// Return the name of this literal's type as a string.
pub(crate) fn type_name(&self) -> &'static str {
match self {
Literal::Integer(_) => "Integer",
Literal::Double(_) => "Double",
Literal::String(_) => "String",
Literal::Boolean(_) => "Boolean",
Literal::Uuid(_) => "Uuid",
Literal::Duration(_) => "Duration",
Literal::Timestamp(_) => "Timestamp",
Literal::IpAddr(_) => "IpAddr",
}
}

// Format the literal as a safe, typed string for ClickHouse.
pub(crate) fn as_db_safe_string(&self) -> String {
match self {
Expand Down Expand Up @@ -93,6 +108,22 @@ impl Literal {
}
}

// Return true if this literal can be compared to a datum of the provided
// type.
pub(crate) fn is_compatible_with_datum(&self, data_type: DataType) -> bool {
match (self, data_type) {
(Literal::Integer(_), DataType::Integer)
| (Literal::Double(_), DataType::Double)
| (Literal::String(_), DataType::String)
| (Literal::Boolean(_), DataType::Boolean)
| (Literal::Duration(_), DataType::Integer)
| (Literal::Duration(_), DataType::Double)
| (Literal::Timestamp(_), DataType::Integer)
| (Literal::Timestamp(_), DataType::Double) => true,
(_, _) => false,
}
}

/// Apply the comparison op between self and the provided field.
///
/// Return None if the comparison cannot be applied, either because the type
Expand Down
49 changes: 49 additions & 0 deletions oximeter/db/src/oxql/ast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
// Copyright 2024 Oxide Computer Company

use std::collections::BTreeSet;
use std::fmt;

use chrono::DateTime;
use chrono::Utc;
use oximeter::TimeseriesName;
Expand All @@ -26,12 +29,32 @@ pub struct Query {
ops: Vec<TableOp>,
}

impl fmt::Display for Query {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let n_ops = self.ops.len();
for (i, op) in self.ops.iter().enumerate() {
write!(f, "{op}")?;
if i < n_ops - 1 {
write!(f, " | ")?;
}
}
Ok(())
}
}

impl Query {
// Return the first operation in the query, which is always a form of `get`.
fn first_op(&self) -> &TableOp {
self.ops.first().expect("Should have parsed at least 1 operation")
}

/// Iterate over the table operations.
pub(crate) fn table_ops(
&self,
) -> impl ExactSizeIterator<Item = &'_ TableOp> + '_ {
self.ops.iter()
}

pub(crate) fn timeseries_name(&self) -> &TimeseriesName {
match self.first_op() {
TableOp::Basic(BasicTableOp::Get(n)) => n,
Expand All @@ -42,6 +65,32 @@ impl Query {
}
}

/// Return _all_ timeseries names referred to by get table operations.
pub(crate) fn all_timeseries_names(&self) -> BTreeSet<&TimeseriesName> {
let mut set = BTreeSet::new();
self.all_timeseries_names_impl(&mut set);
set
}

fn all_timeseries_names_impl<'a>(
&'a self,
set: &mut BTreeSet<&'a TimeseriesName>,
) {
for op in self.ops.iter() {
match op {
TableOp::Basic(BasicTableOp::Get(name)) => {
set.insert(name);
}
TableOp::Basic(_) => {}
TableOp::Grouped(GroupedTableOp { ops }) => {
for query in ops.iter() {
query.all_timeseries_names_impl(set);
}
}
}
}
}

// Check that this query (and any subqueries) start with a get table op, and
// that there are no following get operations. I.e., we have:
//
Expand Down
24 changes: 22 additions & 2 deletions oximeter/db/src/oxql/ast/table_ops/align.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use oxql_types::point::Values;
use oxql_types::Alignment;
use oxql_types::Table;
use oxql_types::Timeseries;
use std::fmt;
use std::time::Duration;

// The maximum factor by which an alignment operation may upsample data.
Expand Down Expand Up @@ -68,7 +69,7 @@ fn verify_max_upsampling_ratio(
///
/// Alignment is used to produce data at the defined timestamps, so that samples
/// from multiple timeseries may be combined or correlated in meaningful ways.
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct Align {
/// The alignment method, used to describe how data over the input period
/// is used to generate an output sample.
Expand All @@ -87,6 +88,16 @@ pub struct Align {
pub period: Duration,
}

impl std::fmt::Display for Align {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
let name = match self.method {
AlignmentMethod::Interpolate => "interpolate",
AlignmentMethod::MeanWithin => "mean_within",
};
write!(f, "{}({:?})", name, self.period)
}
}

impl Align {
// Apply the alignment function to the set of tables.
pub(crate) fn apply(
Expand All @@ -108,7 +119,7 @@ impl Align {
}

/// An alignment method.
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum AlignmentMethod {
/// Alignment is done by interpolating the output data at the specified
/// period.
Expand All @@ -118,6 +129,15 @@ pub enum AlignmentMethod {
MeanWithin,
}

impl fmt::Display for AlignmentMethod {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
AlignmentMethod::Interpolate => write!(f, "interpolate"),
AlignmentMethod::MeanWithin => write!(f, "mean_within"),
}
}
}

// Align the timeseries in a table by computing the average within each output
// period.
fn align_mean_within(
Expand Down
Loading

0 comments on commit 20b4c6c

Please sign in to comment.