Skip to content

Commit

Permalink
Sort fields when extracting timeseries schema (#4312)
Browse files Browse the repository at this point in the history
- Fields are reported in a sample via the implementations of `Target`
and `Metric`, which may or may not be sorted. They'll be in declaration
order, if folks derive the traits. When deriving a schema from the
sample, collect fields into a set to ignore order.
- Convert between a `DbFieldList` and `BTreeSet` when inserting /
reading fields from the nested tables in ClickHouse.
- Add sanity test that we're sorting field schema correctly.
- Errors for schema mismatches report entire schema, not just fields.
  • Loading branch information
bnaecker authored Oct 24, 2023
1 parent ca1b0ba commit 9daf99b
Show file tree
Hide file tree
Showing 4 changed files with 272 additions and 84 deletions.
169 changes: 109 additions & 60 deletions oximeter/db/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use std::collections::BTreeSet;
use std::convert::TryFrom;
use std::net::SocketAddr;
use std::num::NonZeroU32;
use std::sync::Mutex;
use tokio::sync::Mutex;
use uuid::Uuid;

#[usdt::provider(provider = "clickhouse__client")]
Expand Down Expand Up @@ -208,16 +208,12 @@ impl Client {
&self,
name: &TimeseriesName,
) -> Result<Option<TimeseriesSchema>, Error> {
{
let map = self.schema.lock().unwrap();
if let Some(s) = map.get(name) {
return Ok(Some(s.clone()));
}
let mut schema = self.schema.lock().await;
if let Some(s) = schema.get(name) {
return Ok(Some(s.clone()));
}
// `get_schema` acquires the lock internally, so the above scope is required to avoid
// deadlock.
self.get_schema().await?;
Ok(self.schema.lock().unwrap().get(name).map(Clone::clone))
self.get_schema_locked(&mut schema).await?;
Ok(schema.get(name).map(Clone::clone))
}

/// List timeseries schema, paginated.
Expand Down Expand Up @@ -384,30 +380,48 @@ impl Client {
&self,
sample: &Sample,
) -> Result<Option<String>, Error> {
let schema = model::schema_for(sample);
let name = schema.timeseries_name.clone();
let maybe_new_schema = match self.schema.lock().unwrap().entry(name) {
Entry::Vacant(entry) => Ok(Some(entry.insert(schema).clone())),
let sample_schema = model::schema_for(sample);
let name = sample_schema.timeseries_name.clone();
let mut schema = self.schema.lock().await;

// We've taken the lock before we do any checks for schema. First, we
// check if we've already got one in the cache. If not, we update all
// the schema from the database, and then check the map again. If we
// find a schema (which now either came from the cache or the latest
// read of the DB), then we check that the derived schema matches. If
// not, we can insert it in the cache and the DB.
if !schema.contains_key(&name) {
self.get_schema_locked(&mut schema).await?;
}
match schema.entry(name) {
Entry::Occupied(entry) => {
let existing_schema = entry.get();
if existing_schema == &schema {
if existing_schema == &sample_schema {
Ok(None)
} else {
let err =
error_for_schema_mismatch(&schema, &existing_schema);
error!(
self.log,
"timeseries schema mismatch, sample will be skipped: {}",
err
"timeseries schema mismatch, sample will be skipped";
"expected" => ?existing_schema,
"actual" => ?sample_schema,
"sample" => ?sample,
);
Err(err)
Err(Error::SchemaMismatch {
expected: existing_schema.clone(),
actual: sample_schema,
})
}
}
}?;
Ok(maybe_new_schema.map(|schema| {
serde_json::to_string(&model::DbTimeseriesSchema::from(schema))
.expect("Failed to convert schema to DB model")
}))
Entry::Vacant(entry) => {
entry.insert(sample_schema.clone());
Ok(Some(
serde_json::to_string(&model::DbTimeseriesSchema::from(
sample_schema,
))
.expect("Failed to convert schema to DB model"),
))
}
}
}

// Select the timeseries, including keys and field values, that match the given field-selection
Expand Down Expand Up @@ -503,10 +517,15 @@ impl Client {
response
}

async fn get_schema(&self) -> Result<(), Error> {
// Get timeseries schema from the database.
//
// Can only be called after acquiring the lock around `self.schema`.
async fn get_schema_locked(
&self,
schema: &mut BTreeMap<TimeseriesName, TimeseriesSchema>,
) -> Result<(), Error> {
debug!(self.log, "retrieving timeseries schema from database");
let sql = {
let schema = self.schema.lock().unwrap();
if schema.is_empty() {
format!(
"SELECT * FROM {db_name}.timeseries_schema FORMAT JSONEachRow;",
Expand Down Expand Up @@ -545,7 +564,7 @@ impl Client {
);
(schema.timeseries_name.clone(), schema)
});
self.schema.lock().unwrap().extend(new);
schema.extend(new);
}
Ok(())
}
Expand Down Expand Up @@ -593,7 +612,7 @@ impl DbWrite for Client {
}
Ok(schema) => {
if let Some(schema) = schema {
debug!(self.log, "new timeseries schema: {:?}", schema);
debug!(self.log, "new timeseries schema"; "schema" => ?schema);
new_schema.push(schema);
}
}
Expand Down Expand Up @@ -730,28 +749,6 @@ async fn handle_db_response(
}
}

// Generate an error describing a schema mismatch
fn error_for_schema_mismatch(
schema: &TimeseriesSchema,
existing_schema: &TimeseriesSchema,
) -> Error {
let expected = existing_schema
.field_schema
.iter()
.map(|field| (field.name.clone(), field.ty))
.collect();
let actual = schema
.field_schema
.iter()
.map(|field| (field.name.clone(), field.ty))
.collect();
Error::SchemaMismatch {
name: schema.timeseries_name.to_string(),
expected,
actual,
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -1599,7 +1596,7 @@ mod tests {
);

// Clear the internal caches of seen schema
client.schema.lock().unwrap().clear();
client.schema.lock().await.clear();

// Insert the new sample
client.insert_samples(&[sample.clone()]).await.unwrap();
Expand All @@ -1611,7 +1608,7 @@ mod tests {
let expected_schema = client
.schema
.lock()
.unwrap()
.await
.get(&timeseries_name)
.expect(
"After inserting a new sample, its schema should be included",
Expand Down Expand Up @@ -2484,13 +2481,13 @@ mod tests {
#[tokio::test]
async fn test_get_schema_no_new_values() {
let (mut db, client, _) = setup_filter_testcase().await;
let schema = &client.schema.lock().unwrap().clone();
client.get_schema().await.expect("Failed to get timeseries schema");
assert_eq!(
schema,
&*client.schema.lock().unwrap(),
"Schema shouldn't change"
);
let original_schema = client.schema.lock().await.clone();
let mut schema = client.schema.lock().await;
client
.get_schema_locked(&mut schema)
.await
.expect("Failed to get timeseries schema");
assert_eq!(&original_schema, &*schema, "Schema shouldn't change");
db.cleanup().await.expect("Failed to cleanup database");
}

Expand Down Expand Up @@ -2585,4 +2582,56 @@ mod tests {
);
db.cleanup().await.expect("Failed to cleanup database");
}

#[tokio::test]
async fn test_update_schema_cache_on_new_sample() {
usdt::register_probes().unwrap();
let logctx = test_setup_log("test_update_schema_cache_on_new_sample");
let log = &logctx.log;

// Let the OS assign a port and discover it after ClickHouse starts
let mut db = ClickHouseInstance::new_single_node(0)
.await
.expect("Failed to start ClickHouse");
let address = SocketAddr::new("::1".parse().unwrap(), db.port());

let client = Client::new(address, &log);
client
.init_single_node_db()
.await
.expect("Failed to initialize timeseries database");
let samples = [test_util::make_sample()];
client.insert_samples(&samples).await.unwrap();

// Get the count of schema directly from the DB, which should have just
// one.
let response = client.execute_with_body(
"SELECT COUNT() FROM oximeter.timeseries_schema FORMAT JSONEachRow;
").await.unwrap();
assert_eq!(response.lines().count(), 1, "Expected exactly 1 schema");
assert_eq!(client.schema.lock().await.len(), 1);

// Clear the internal cache, and insert the sample again.
//
// This should cause us to look up the schema in the DB again, but _not_
// insert a new one.
client.schema.lock().await.clear();
assert!(client.schema.lock().await.is_empty());

client.insert_samples(&samples).await.unwrap();

// Get the count of schema directly from the DB, which should still have
// only the one schema.
let response = client.execute_with_body(
"SELECT COUNT() FROM oximeter.timeseries_schema FORMAT JSONEachRow;
").await.unwrap();
assert_eq!(
response.lines().count(),
1,
"Expected exactly 1 schema again"
);
assert_eq!(client.schema.lock().await.len(), 1);
db.cleanup().await.expect("Failed to cleanup ClickHouse server");
logctx.cleanup_successful();
}
}
Loading

0 comments on commit 9daf99b

Please sign in to comment.