Skip to content

Commit

Permalink
Add basic Oximeter query language (#5273)
Browse files Browse the repository at this point in the history
- Add basic grammar for an Oximeter query language. Includes support for
numeric, string, boolean, UUID, timestamp, IP address, and duration
literals. Queries are constructed in a pipeline of "table operations",
each of which operates on a set of timeseries and produces another set.
- Implement temporal alignment, currently supporting one method that
generates output samples from the mean of the inputs over the alignment
period.
- Add basic subquery support, for fetching multiple timeseries and
joining them
- Implement filtering on fields and timestamps, both in the DB as much
as possible, and the query pipeline; and implement filtering on data
values in code.
- Implement group-by support, where we can currently reduce values
within a group by summing or computing the mean.
- Add public Nexus API endpoints for listing timeseries schema, and
running an OxQL query. Both are currently restricted to fleet readers,
until a more thorough authz process is fleshed out.
- This also reorganizes the internals of the `oximeter_db::client`
module, which were starting to get too unwieldy and littered with
conditional compilation directives.
  • Loading branch information
bnaecker authored Mar 29, 2024
1 parent 5f153c2 commit 17510a6
Show file tree
Hide file tree
Showing 40 changed files with 12,177 additions and 860 deletions.
32 changes: 32 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,6 @@ ipcc = { path = "ipcc" }
ipnet = "2.9"
itertools = "0.12.1"
internet-checksum = "0.2"
ipcc-key-value = { path = "ipcc-key-value" }
ipnetwork = { version = "0.20", features = ["schemars"] }
ispf = { git = "https://github.com/oxidecomputer/ispf" }
key-manager = { path = "key-manager" }
Expand Down Expand Up @@ -313,7 +312,6 @@ openapiv3 = "2.0.0"
# must match samael's crate!
openssl = "0.10"
openssl-sys = "0.9"
openssl-probe = "0.1.5"
opte-ioctl = { git = "https://github.com/oxidecomputer/opte", rev = "7ee353a470ea59529ee1b34729681da887aa88ce" }
oso = "0.27"
owo-colors = "4.0.0"
Expand All @@ -330,6 +328,7 @@ partial-io = { version = "0.5.4", features = ["proptest1", "tokio1"] }
parse-size = "1.0.0"
paste = "1.0.14"
percent-encoding = "2.3.1"
peg = "0.8.2"
pem = "3.0"
petgraph = "0.6.4"
postgres-protocol = "0.6.6"
Expand Down Expand Up @@ -368,7 +367,6 @@ schemars = "0.8.16"
secrecy = "0.8.0"
semver = { version = "1.0.22", features = ["std", "serde"] }
serde = { version = "1.0", default-features = false, features = [ "derive", "rc" ] }
serde_derive = "1.0"
serde_human_bytes = { git = "http://github.com/oxidecomputer/serde_human_bytes", branch = "main" }
serde_json = "1.0.114"
serde_path_to_error = "0.1.16"
Expand All @@ -394,12 +392,12 @@ slog-envlogger = "2.2"
slog-error-chain = { git = "https://github.com/oxidecomputer/slog-error-chain", branch = "main", features = ["derive"] }
slog-term = "2.9"
smf = "0.2"
snafu = "0.7"
socket2 = { version = "0.5", features = ["all"] }
sp-sim = { path = "sp-sim" }
sprockets-common = { git = "http://github.com/oxidecomputer/sprockets", rev = "77df31efa5619d0767ffc837ef7468101608aee9" }
sprockets-host = { git = "http://github.com/oxidecomputer/sprockets", rev = "77df31efa5619d0767ffc837ef7468101608aee9" }
sprockets-rot = { git = "http://github.com/oxidecomputer/sprockets", rev = "77df31efa5619d0767ffc837ef7468101608aee9" }
sqlformat = "0.2.3"
sqlparser = { version = "0.43.1", features = [ "visitor" ] }
static_assertions = "1.1.0"
# Please do not change the Steno version to a Git dependency. It makes it
Expand Down
85 changes: 84 additions & 1 deletion nexus/src/app/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ use nexus_db_queries::{
db::{fixed_data::FLEET_ID, lookup},
};
use omicron_common::api::external::{Error, InternalContext};
use oximeter_db::Measurement;
use oximeter_db::{
oxql, Measurement, TimeseriesSchema, TimeseriesSchemaPaginationParams,
};
use std::num::NonZeroU32;

impl super::Nexus {
Expand Down Expand Up @@ -96,4 +98,85 @@ impl super::Nexus {
)
.await
}

/// List available timeseries schema.
pub(crate) async fn timeseries_schema_list(
&self,
opctx: &OpContext,
pagination: &TimeseriesSchemaPaginationParams,
limit: NonZeroU32,
) -> Result<dropshot::ResultsPage<TimeseriesSchema>, Error> {
// Must be a fleet user to list timeseries schema.
//
// TODO-security: We need to figure out how to implement proper security
// checks here, letting less-privileged users fetch data for the
// resources they have access to.
opctx.authorize(authz::Action::Read, &authz::FLEET).await?;
self.timeseries_client
.get()
.await
.map_err(|e| {
Error::internal_error(&format!(
"Cannot access timeseries DB: {}",
e
))
})?
.timeseries_schema_list(&pagination.page, limit)
.await
.map_err(|e| match e {
oximeter_db::Error::DatabaseUnavailable(_) => {
Error::ServiceUnavailable {
internal_message: e.to_string(),
}
}
_ => Error::InternalError { internal_message: e.to_string() },
})
}

/// Run an OxQL query against the timeseries database.
pub(crate) async fn timeseries_query(
&self,
opctx: &OpContext,
query: impl AsRef<str>,
) -> Result<Vec<oxql::Table>, Error> {
// Must be a fleet user to list timeseries schema.
//
// TODO-security: We need to figure out how to implement proper security
// checks here, letting less-privileged users fetch data for the
// resources they have access to.
opctx.authorize(authz::Action::Read, &authz::FLEET).await?;
self.timeseries_client
.get()
.await
.map_err(|e| {
Error::internal_error(&format!(
"Cannot access timeseries DB: {}",
e
))
})?
.oxql_query(query)
.await
.map(|result| {
// TODO-observability: The query method returns information
// about the duration of the OxQL query and the database
// resource usage for each contained SQL query. We should
// publish this as a timeseries itself, so that we can track
// improvements to query processing.
//
// For now, simply return the tables alone.
result.tables
})
.map_err(|e| match e {
oximeter_db::Error::DatabaseUnavailable(_) => {
Error::ServiceUnavailable {
internal_message: e.to_string(),
}
}
oximeter_db::Error::Oxql(_)
| oximeter_db::Error::TimeseriesNotFound(_) => {
Error::invalid_request(e.to_string())
}
_ => Error::InternalError { internal_message: e.to_string() },
})
}
}
52 changes: 52 additions & 0 deletions nexus/src/external_api/http_entrypoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ pub(crate) fn external_api() -> NexusApiDescription {

api.register(system_metric)?;
api.register(silo_metric)?;
api.register(timeseries_schema_list)?;
api.register(timeseries_query)?;

api.register(system_update_put_repository)?;
api.register(system_update_get_repository)?;
Expand Down Expand Up @@ -5626,6 +5628,56 @@ async fn silo_metric(
apictx.external_latencies.instrument_dropshot_handler(&rqctx, handler).await
}

/// List available timeseries schema.
#[endpoint {
method = GET,
path = "/v1/timeseries/schema",
tags = ["metrics"],
}]
async fn timeseries_schema_list(
rqctx: RequestContext<Arc<ServerContext>>,
pag_params: Query<oximeter_db::TimeseriesSchemaPaginationParams>,
) -> Result<HttpResponseOk<ResultsPage<oximeter_db::TimeseriesSchema>>, HttpError>
{
let apictx = rqctx.context();
let handler = async {
let nexus = &apictx.nexus;
let opctx = crate::context::op_context_for_external_api(&rqctx).await?;
let pagination = pag_params.into_inner();
let limit = rqctx.page_limit(&pagination)?;
nexus
.timeseries_schema_list(&opctx, &pagination, limit)
.await
.map(HttpResponseOk)
.map_err(HttpError::from)
};
apictx.external_latencies.instrument_dropshot_handler(&rqctx, handler).await
}

/// Run a timeseries query, written OxQL.
#[endpoint {
method = POST,
path = "/v1/timeseries/query",
tags = ["metrics"],
}]
async fn timeseries_query(
rqctx: RequestContext<Arc<ServerContext>>,
body: TypedBody<params::TimeseriesQuery>,
) -> Result<HttpResponseOk<Vec<oximeter_db::oxql::Table>>, HttpError> {
let apictx = rqctx.context();
let handler = async {
let nexus = &apictx.nexus;
let opctx = crate::context::op_context_for_external_api(&rqctx).await?;
let query = body.into_inner().query;
nexus
.timeseries_query(&opctx, &query)
.await
.map(HttpResponseOk)
.map_err(HttpError::from)
};
apictx.external_latencies.instrument_dropshot_handler(&rqctx, handler).await
}

// Updates

/// Upload TUF repository
Expand Down
31 changes: 31 additions & 0 deletions nexus/tests/integration_tests/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,17 @@ pub static DEMO_SILO_METRICS_URL: Lazy<String> = Lazy::new(|| {
)
});

pub static TIMESERIES_LIST_URL: Lazy<String> =
Lazy::new(|| String::from("/v1/timeseries/schema"));

pub static TIMESERIES_QUERY_URL: Lazy<String> =
Lazy::new(|| String::from("/v1/timeseries/query"));

pub static DEMO_TIMESERIES_QUERY: Lazy<params::TimeseriesQuery> =
Lazy::new(|| params::TimeseriesQuery {
query: String::from("get http_service:request_latency_histogram"),
});

// Users
pub static DEMO_USER_CREATE: Lazy<params::UserCreate> =
Lazy::new(|| params::UserCreate {
Expand Down Expand Up @@ -2023,6 +2034,26 @@ pub static VERIFY_ENDPOINTS: Lazy<Vec<VerifyEndpoint>> = Lazy::new(|| {
],
},

VerifyEndpoint {
url: &TIMESERIES_LIST_URL,
visibility: Visibility::Public,
unprivileged_access: UnprivilegedAccess::None,
allowed_methods: vec![
AllowedMethod::Get,
],
},

VerifyEndpoint {
url: &TIMESERIES_QUERY_URL,
visibility: Visibility::Public,
unprivileged_access: UnprivilegedAccess::None,
allowed_methods: vec![
AllowedMethod::Post(
serde_json::to_value(&*DEMO_TIMESERIES_QUERY).unwrap()
),
],
},

/* Silo identity providers */

VerifyEndpoint {
Expand Down
25 changes: 25 additions & 0 deletions nexus/tests/integration_tests/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use nexus_test_utils::ControlPlaneTestContext;
use nexus_test_utils_macros::nexus_test;
use oximeter::types::Datum;
use oximeter::types::Measurement;
use oximeter::TimeseriesSchema;
use uuid::Uuid;

pub async fn query_for_metrics(
Expand Down Expand Up @@ -238,3 +239,27 @@ async fn test_metrics(
// project 1 unaffected by project 2's resources
assert_silo_metrics(&cptestctx, Some(project1_id), GIB, 4, GIB).await;
}

/// Test that we can correctly list some timeseries schema.
#[nexus_test]
async fn test_timeseries_schema_list(
cptestctx: &ControlPlaneTestContext<omicron_nexus::Server>,
) {
// We should be able to fetch the list of timeseries, and it should include
// Nexus's HTTP latency distribution. This is defined in Nexus itself, and
// should always exist after we've registered as a producer and start
// producing data. Force a collection to ensure that happens.
cptestctx.server.register_as_producer().await;
cptestctx.oximeter.force_collect().await;
let client = &cptestctx.external_client;
let url = "/v1/timeseries/schema";
let schema =
objects_list_page_authz::<TimeseriesSchema>(client, &url).await;
schema
.items
.iter()
.find(|sc| {
sc.timeseries_name == "http_service:request_latency_histogram"
})
.expect("Failed to find HTTP request latency histogram schema");
}
2 changes: 2 additions & 0 deletions nexus/tests/output/nexus_tags.txt
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ login_saml POST /login/{silo_name}/saml/{provi
API operations found with tag "metrics"
OPERATION ID METHOD URL PATH
silo_metric GET /v1/metrics/{metric_name}
timeseries_query POST /v1/timeseries/query
timeseries_schema_list GET /v1/timeseries/schema

API operations found with tag "policy"
OPERATION ID METHOD URL PATH
Expand Down
7 changes: 7 additions & 0 deletions nexus/types/src/external_api/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2055,3 +2055,10 @@ pub struct ProbeListSelector {
/// A name or id to use when selecting a probe.
pub name_or_id: Option<NameOrId>,
}

/// A timeseries query string, written in the Oximeter query language.
#[derive(Deserialize, JsonSchema, Serialize)]
pub struct TimeseriesQuery {
/// A timeseries query string, written in the Oximeter query language.
pub query: String,
}
Loading

0 comments on commit 17510a6

Please sign in to comment.