Skip to content

Commit

Permalink
[APR-278] Add config refresher to update API key (#351)
Browse files Browse the repository at this point in the history
* WIP

* use const for endpoint

* feedback

* update naming

* more renaming

* partial feedback 2

* remove query_agent

* wrap Arc around ArcSwap value

* update doc

* add derive clone

* merge main

* feedback

* Use Optional refresher config on primary endpoint

* attempt fix pipeline

* nits

* doc

* remove default agent ipc port

* spelling
  • Loading branch information
rayz authored Dec 4, 2024
1 parent ccc6dad commit 5ffdc5e
Show file tree
Hide file tree
Showing 8 changed files with 275 additions and 11 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 16 additions & 2 deletions bin/agent-data-plane/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use saluki_components::{
AggregateConfiguration, ChainedConfiguration, HostEnrichmentConfiguration, OriginEnrichmentConfiguration,
},
};
use saluki_config::{ConfigurationLoader, GenericConfiguration};
use saluki_config::{ConfigurationLoader, GenericConfiguration, RefreshableConfiguration, RefresherConfiguration};
use saluki_core::topology::TopologyBlueprint;
use saluki_error::{ErrorContext as _, GenericError};
use saluki_health::HealthRegistry;
Expand Down Expand Up @@ -176,8 +176,22 @@ fn create_topology(
.with_transform_builder(host_enrichment_config)
.with_transform_builder(origin_enrichment_config);
let internal_metrics_remap_config = AgentTelemetryRemapperConfiguration::new();
let dd_metrics_config = DatadogMetricsConfiguration::from_configuration(configuration)

let mut dd_metrics_config = DatadogMetricsConfiguration::from_configuration(configuration)
.error_context("Failed to configure Datadog Metrics destination.")?;

match RefresherConfiguration::from_configuration(configuration) {
Ok(refresher_configuration) => {
let refreshable_configuration: RefreshableConfiguration = refresher_configuration.build()?;
dd_metrics_config.add_refreshable_configuration(refreshable_configuration);
}
Err(_) => {
info!(
"Dynamic configuration refreshing will be unavailable due to failure to configure refresher configuration."
)
}
}

let events_service_checks_config = DatadogEventsServiceChecksConfiguration::from_configuration(configuration)
.error_context("Failed to configure Datadog Events/Service Checks destination.")?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{
};

use regex::Regex;
use saluki_config::RefreshableConfiguration;
use saluki_metadata;
use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr, OneOrMany, PickFirst};
Expand Down Expand Up @@ -82,6 +83,7 @@ impl AdditionalEndpoints {
resolved.push(ResolvedEndpoint {
endpoint: endpoint.clone(),
api_key: trimmed_api_key.to_string(),
config: None,
});
}
}
Expand All @@ -98,6 +100,7 @@ impl AdditionalEndpoints {
pub struct ResolvedEndpoint {
endpoint: Url,
api_key: String,
config: Option<RefreshableConfiguration>,
}

impl ResolvedEndpoint {
Expand All @@ -113,16 +116,42 @@ impl ResolvedEndpoint {
Ok(Self {
endpoint,
api_key: api_key.to_string(),
config: None,
})
}

/// Creates a new `ResolvedEndpoint` instance from an existing `ResolvedEndpoint`, adding an optional `RefreshableConfiguration`.
pub fn with_refreshable_configuration(self, config: Option<RefreshableConfiguration>) -> Self {
Self {
endpoint: self.endpoint,
api_key: self.api_key,
config,
}
}

/// Returns the endpoint of the resolver.
pub fn endpoint(&self) -> &Url {
&self.endpoint
}

/// Returns the API key associated with the endpoint.
pub fn api_key(&self) -> &str {
///
/// If a refreshable configuration has been configured, the API key will be queried from the
/// configuration and stored if it has been updated since the last time `api_key` was called.
pub fn api_key(&mut self) -> &str {
if let Some(config) = &self.config {
match config.try_get_typed::<String>("api_key") {
Ok(Some(api_key)) => {
if !api_key.is_empty() && self.api_key != api_key {
debug!(endpoint = %self.endpoint, "Refreshed API key.");
self.api_key = api_key;
}
}
Ok(None) | Err(_) => {
debug!("Failed to retrieve API key from remote source (missing or wrong type). Continuing with last known valid API key.");
}
}
}
self.api_key.as_str()
}
}
Expand Down
21 changes: 14 additions & 7 deletions lib/saluki-components/src/destinations/datadog_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use http_body::Body;
use http_body_util::BodyExt as _;
use hyper::body::Incoming;
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use saluki_config::GenericConfiguration;
use saluki_config::{GenericConfiguration, RefreshableConfiguration};
use saluki_core::{
components::{destinations::*, ComponentContext},
observability::ComponentMetricsExt as _,
Expand Down Expand Up @@ -144,6 +144,9 @@ pub struct DatadogMetricsConfiguration {
/// Defaults to empty.
#[serde(default, rename = "additional_endpoints")]
additional_endpoints: AdditionalEndpoints,

#[serde(skip)]
config_refresher: Option<RefreshableConfiguration>,
}

fn default_request_timeout_secs() -> u64 {
Expand Down Expand Up @@ -175,6 +178,11 @@ impl DatadogMetricsConfiguration {
pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
Ok(config.as_typed()?)
}

/// Add option to retrieve configuration values from a `RefreshableConfiguration`.
pub fn add_refreshable_configuration(&mut self, refresher: RefreshableConfiguration) {
self.config_refresher = Some(refresher);
}
}

#[async_trait]
Expand Down Expand Up @@ -204,7 +212,8 @@ impl DestinationBuilder for DatadogMetricsConfiguration {

// Resolve all endpoints that we'll be forwarding metrics to.
let primary_endpoint = calculate_resolved_endpoint(self.dd_url.as_deref(), &self.site, &self.api_key)
.error_context("Failed parsing/resolving the primary destination endpoint.")?;
.error_context("Failed parsing/resolving the primary destination endpoint.")?
.with_refreshable_configuration(self.config_refresher.clone());

let additional_endpoints = self
.additional_endpoints
Expand Down Expand Up @@ -555,14 +564,11 @@ async fn run_endpoint_io_loop<S, B>(
task_barrier.wait().await;
}

fn for_resolved_endpoint<B>(endpoint: ResolvedEndpoint) -> impl Fn(Request<B>) -> Request<B> + Clone {
fn for_resolved_endpoint<B>(mut endpoint: ResolvedEndpoint) -> impl FnMut(Request<B>) -> Request<B> + Clone {
let new_uri_authority = Authority::try_from(endpoint.endpoint().authority())
.expect("should not fail to construct new endpoint authority");
let new_uri_scheme =
Scheme::try_from(endpoint.endpoint().scheme()).expect("should not fail to construct new endpoint scheme");
let api_key_value =
HeaderValue::from_str(endpoint.api_key()).expect("should not fail to construct API key header value");

move |mut request| {
// Build an updated URI by taking the endpoint URL and slapping the request's URI path on the end of it.
let new_uri = Uri::builder()
Expand All @@ -571,7 +577,8 @@ fn for_resolved_endpoint<B>(endpoint: ResolvedEndpoint) -> impl Fn(Request<B>) -
.path_and_query(request.uri().path_and_query().expect("request path must exist").clone())
.build()
.expect("should not fail to construct new URI");

let api_key = endpoint.api_key();
let api_key_value = HeaderValue::from_str(api_key).expect("should not fail to construct API key header value");
*request.uri_mut() = new_uri;

// Add the API key as a header.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ where
pub async fn new(endpoint: MetricsEndpoint, buffer_pool: O) -> Result<Self, RequestBuilderError> {
let chunked_buffer_pool = ChunkedBytesBufferObjectPool::new(buffer_pool);
let compressor = create_compressor(&chunked_buffer_pool).await;

Ok(Self {
endpoint,
endpoint_uri: endpoint.endpoint_uri(),
Expand Down
4 changes: 4 additions & 0 deletions lib/saluki-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@ license = { workspace = true }
repository = { workspace = true }

[dependencies]
arc-swap = { workspace = true }
figment = { workspace = true, features = ["env", "json", "yaml"] }
http = { workspace = true }
reqwest = { workspace = true, features = ["rustls-tls-native-roots-no-provider", "json"] }
saluki-error = { workspace = true }
saluki-io = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
snafu = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions lib/saluki-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ use snafu::{ResultExt as _, Snafu};
use tracing::debug;

mod provider;
mod refresher;
mod secrets;
use self::provider::ResolvedProvider;
pub use self::refresher::{RefreshableConfiguration, RefresherConfiguration};

/// A configuration error.
#[derive(Debug, Snafu)]
Expand Down
Loading

0 comments on commit 5ffdc5e

Please sign in to comment.