Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[APR-278] Add config refresher to update API key #351

Merged
merged 19 commits into from
Dec 4, 2024
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 16 additions & 2 deletions bin/agent-data-plane/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use saluki_components::{
AggregateConfiguration, ChainedConfiguration, HostEnrichmentConfiguration, OriginEnrichmentConfiguration,
},
};
use saluki_config::{ConfigurationLoader, GenericConfiguration};
use saluki_config::{ConfigurationLoader, GenericConfiguration, RefreshableConfiguration, RefresherConfiguration};
use saluki_core::topology::TopologyBlueprint;
use saluki_error::{ErrorContext as _, GenericError};
use saluki_health::HealthRegistry;
Expand Down Expand Up @@ -176,8 +176,22 @@ fn create_topology(
.with_transform_builder(host_enrichment_config)
.with_transform_builder(origin_enrichment_config);
let internal_metrics_remap_config = AgentTelemetryRemapperConfiguration::new();
let dd_metrics_config = DatadogMetricsConfiguration::from_configuration(configuration)

let mut dd_metrics_config = DatadogMetricsConfiguration::from_configuration(configuration)
.error_context("Failed to configure Datadog Metrics destination.")?;

match RefresherConfiguration::from_configuration(configuration) {
Ok(refresher_configuration) => {
let refreshable_configuration: RefreshableConfiguration = refresher_configuration.build()?;
dd_metrics_config.add_refreshable_configuration(refreshable_configuration);
}
Err(_) => {
info!(
"Dynamic configuration refreshing will be unavailable due to failure to configure refresher configuration."
)
}
}

let events_service_checks_config = DatadogEventsServiceChecksConfiguration::from_configuration(configuration)
.error_context("Failed to configure Datadog Events/Service Checks destination.")?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{
};

use regex::Regex;
use saluki_config::RefreshableConfiguration;
use saluki_metadata;
use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr, OneOrMany, PickFirst};
Expand Down Expand Up @@ -82,6 +83,7 @@ impl AdditionalEndpoints {
resolved.push(ResolvedEndpoint {
endpoint: endpoint.clone(),
api_key: trimmed_api_key.to_string(),
config: None,
});
}
}
Expand All @@ -98,6 +100,7 @@ impl AdditionalEndpoints {
pub struct ResolvedEndpoint {
endpoint: Url,
api_key: String,
config: Option<RefreshableConfiguration>,
}

impl ResolvedEndpoint {
Expand All @@ -113,16 +116,42 @@ impl ResolvedEndpoint {
Ok(Self {
endpoint,
api_key: api_key.to_string(),
config: None,
})
}

/// Creates a new `ResolvedEndpoint` instance from an existing `ResolvedEndpoint`, adding an optional `RefreshableConfiguration`.
pub fn with_refreshable_configuration(self, config: Option<RefreshableConfiguration>) -> Self {
Self {
endpoint: self.endpoint,
api_key: self.api_key,
config,
}
}

/// Returns the endpoint of the resolver.
pub fn endpoint(&self) -> &Url {
&self.endpoint
}

/// Returns the API key associated with the endpoint.
rayz marked this conversation as resolved.
Show resolved Hide resolved
pub fn api_key(&self) -> &str {
///
/// If a refreshable configuration has been configured, the API key will be queried from the
/// configuration and stored if it has been updated since the last time `api_key` was called.
pub fn api_key(&mut self) -> &str {
if let Some(config) = &self.config {
match config.try_get_typed::<String>("api_key") {
Ok(Some(api_key)) => {
if !api_key.is_empty() && self.api_key != api_key {
debug!(endpoint = %self.endpoint, "Refreshed API key.");
self.api_key = api_key;
}
}
Ok(None) | Err(_) => {
debug!("Failed to retrieve API key from remote source (missing or wrong type). Continuing with last known valid API key.");
}
}
}
self.api_key.as_str()
}
}
Expand Down
21 changes: 14 additions & 7 deletions lib/saluki-components/src/destinations/datadog_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use http_body::Body;
use http_body_util::BodyExt as _;
use hyper::body::Incoming;
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use saluki_config::GenericConfiguration;
use saluki_config::{GenericConfiguration, RefreshableConfiguration};
use saluki_core::{
components::{destinations::*, ComponentContext},
pooling::{FixedSizeObjectPool, ObjectPool},
Expand Down Expand Up @@ -141,6 +141,9 @@ pub struct DatadogMetricsConfiguration {
/// Defaults to empty.
#[serde(default, rename = "additional_endpoints")]
additional_endpoints: AdditionalEndpoints,

#[serde(skip)]
config_refresher: Option<RefreshableConfiguration>,
}

fn default_request_timeout_secs() -> u64 {
Expand Down Expand Up @@ -172,6 +175,11 @@ impl DatadogMetricsConfiguration {
pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
Ok(config.as_typed()?)
}

/// Add option to retrieve configuration values from a `RefreshableConfiguration`.
pub fn add_refreshable_configuration(&mut self, refresher: RefreshableConfiguration) {
self.config_refresher = Some(refresher);
}
}

#[async_trait]
Expand All @@ -196,7 +204,8 @@ impl DestinationBuilder for DatadogMetricsConfiguration {

// Resolve all endpoints that we'll be forwarding metrics to.
let primary_endpoint = calculate_resolved_endpoint(self.dd_url.as_deref(), &self.site, &self.api_key)
.error_context("Failed parsing/resolving the primary destination endpoint.")?;
.error_context("Failed parsing/resolving the primary destination endpoint.")?
.with_refreshable_configuration(self.config_refresher.clone());

let additional_endpoints = self
.additional_endpoints
Expand Down Expand Up @@ -552,14 +561,11 @@ async fn run_endpoint_io_loop<S, B>(
task_barrier.wait().await;
}

fn for_resolved_endpoint<B>(endpoint: ResolvedEndpoint) -> impl Fn(Request<B>) -> Request<B> + Clone {
fn for_resolved_endpoint<B>(mut endpoint: ResolvedEndpoint) -> impl FnMut(Request<B>) -> Request<B> + Clone {
let new_uri_authority = Authority::try_from(endpoint.endpoint().authority())
.expect("should not fail to construct new endpoint authority");
let new_uri_scheme =
Scheme::try_from(endpoint.endpoint().scheme()).expect("should not fail to construct new endpoint scheme");
let api_key_value =
HeaderValue::from_str(endpoint.api_key()).expect("should not fail to construct API key header value");

move |mut request| {
// Build an updated URI by taking the endpoint URL and slapping the request's URI path on the end of it.
let new_uri = Uri::builder()
Expand All @@ -568,7 +574,8 @@ fn for_resolved_endpoint<B>(endpoint: ResolvedEndpoint) -> impl Fn(Request<B>) -
.path_and_query(request.uri().path_and_query().expect("request path must exist").clone())
.build()
.expect("should not fail to construct new URI");

let api_key = endpoint.api_key();
let api_key_value = HeaderValue::from_str(api_key).expect("should not fail to construct API key header value");
*request.uri_mut() = new_uri;

// Add the API key as a header.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ where
pub async fn new(endpoint: MetricsEndpoint, buffer_pool: O) -> Result<Self, RequestBuilderError> {
let chunked_buffer_pool = ChunkedBytesBufferObjectPool::new(buffer_pool);
let compressor = create_compressor(&chunked_buffer_pool).await;

Ok(Self {
endpoint,
endpoint_uri: endpoint.endpoint_uri(),
Expand Down
4 changes: 4 additions & 0 deletions lib/saluki-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@ license = { workspace = true }
repository = { workspace = true }

[dependencies]
arc-swap = { workspace = true }
figment = { workspace = true, features = ["env", "json", "yaml"] }
http = { workspace = true }
reqwest = { workspace = true, features = ["rustls-tls-native-roots-no-provider", "json"] }
saluki-error = { workspace = true }
saluki-io = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
snafu = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions lib/saluki-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ use snafu::{ResultExt as _, Snafu};
use tracing::debug;

mod provider;
mod refresher;
mod secrets;
use self::provider::ResolvedProvider;
pub use self::refresher::{RefreshableConfiguration, RefresherConfiguration};

/// A configuration error.
#[derive(Debug, Snafu)]
Expand Down
Loading
Loading