Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[APR-278] Add config refresher to update API key #351

Merged
merged 19 commits into from
Dec 4, 2024
110 changes: 110 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 8 additions & 2 deletions bin/agent-data-plane/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#![deny(missing_docs)]
use std::{
future::pending,
sync::Arc,
time::{Duration, Instant},
};

Expand All @@ -19,7 +20,7 @@ use saluki_components::{
AggregateConfiguration, ChainedConfiguration, HostEnrichmentConfiguration, OriginEnrichmentConfiguration,
},
};
use saluki_config::{ConfigurationLoader, GenericConfiguration};
use saluki_config::{ConfigurationLoader, GenericConfiguration, RefreshableConfiguration, RefresherConfiguration};
use saluki_core::topology::TopologyBlueprint;
use saluki_error::{ErrorContext as _, GenericError};
use saluki_health::HealthRegistry;
Expand Down Expand Up @@ -176,8 +177,13 @@ fn create_topology(
.with_transform_builder(host_enrichment_config)
.with_transform_builder(origin_enrichment_config);
let internal_metrics_remap_config = AgentTelemetryRemapperConfiguration::new();
let dd_metrics_config = DatadogMetricsConfiguration::from_configuration(configuration)

let refresher_configuration = RefresherConfiguration::from_configuration(configuration)?;
let refreshable_configuration: Arc<RefreshableConfiguration> = refresher_configuration.build()?;

let mut dd_metrics_config = DatadogMetricsConfiguration::from_configuration(configuration)
.error_context("Failed to configure Datadog Metrics destination.")?;
dd_metrics_config.add_refreshable_configuration(refreshable_configuration.clone());
rayz marked this conversation as resolved.
Show resolved Hide resolved
let events_service_checks_config = DatadogEventsServiceChecksConfiguration::from_configuration(configuration)
.error_context("Failed to configure Datadog Events/Service Checks destination.")?;

Expand Down
36 changes: 30 additions & 6 deletions lib/saluki-components/src/destinations/datadog_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use http_body::Body;
use http_body_util::BodyExt as _;
use hyper::body::Incoming;
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use saluki_config::GenericConfiguration;
use saluki_config::{GenericConfiguration, RefreshableConfiguration};
use saluki_core::{
components::{destinations::*, ComponentContext},
pooling::{FixedSizeObjectPool, ObjectPool},
Expand Down Expand Up @@ -141,6 +141,9 @@ pub struct DatadogMetricsConfiguration {
/// Defaults to empty.
#[serde(default, rename = "additional_endpoints")]
additional_endpoints: AdditionalEndpoints,

#[serde(skip)]
config_refresher: Arc<RefreshableConfiguration>,
}

fn default_request_timeout_secs() -> u64 {
Expand Down Expand Up @@ -172,6 +175,11 @@ impl DatadogMetricsConfiguration {
pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
Ok(config.as_typed()?)
}

/// Add option to retrieve configuration values from a `RefreshableConfiguration`.
pub fn add_refreshable_configuration(&mut self, refresher: Arc<RefreshableConfiguration>) {
self.config_refresher = refresher;
}
}

#[async_trait]
Expand Down Expand Up @@ -211,11 +219,14 @@ impl DestinationBuilder for DatadogMetricsConfiguration {
let series_request_builder = RequestBuilder::new(MetricsEndpoint::Series, rb_buffer_pool.clone()).await?;
let sketches_request_builder = RequestBuilder::new(MetricsEndpoint::Sketches, rb_buffer_pool).await?;

let refresher = self.config_refresher.clone();

Ok(Box::new(DatadogMetrics {
service,
series_request_builder,
sketches_request_builder,
endpoints,
refresher,
}))
}
}
Expand Down Expand Up @@ -259,6 +270,7 @@ where
series_request_builder: RequestBuilder<O>,
sketches_request_builder: RequestBuilder<O>,
endpoints: Vec<ResolvedEndpoint>,
refresher: Arc<RefreshableConfiguration>,
}

#[allow(unused)]
Expand All @@ -276,6 +288,7 @@ where
mut sketches_request_builder,
service,
endpoints,
refresher,
} = *self;

let mut health = context.take_health_handle();
Expand All @@ -291,6 +304,7 @@ where
telemetry.clone(),
context.component_context(),
endpoints,
refresher,
));

health.mark_ready();
Expand Down Expand Up @@ -430,6 +444,7 @@ where
async fn run_io_loop<S, B>(
mut requests_rx: mpsc::Receiver<(usize, Request<B>)>, io_shutdown_tx: oneshot::Sender<()>, service: S,
telemetry: ComponentTelemetry, component_context: ComponentContext, resolved_endpoints: Vec<ResolvedEndpoint>,
refresher: Arc<RefreshableConfiguration>,
) where
S: Service<Request<B>, Response = hyper::Response<Incoming>> + Clone + Send + 'static,
S::Future: Send,
Expand All @@ -455,6 +470,7 @@ async fn run_io_loop<S, B>(
telemetry.clone(),
component_context.clone(),
resolved_endpoint,
refresher.clone(),
));

endpoint_txs.push((endpoint_url, endpoint_tx));
Expand Down Expand Up @@ -489,6 +505,7 @@ async fn run_io_loop<S, B>(
async fn run_endpoint_io_loop<S, B>(
mut requests_rx: mpsc::Receiver<(usize, Request<B>)>, task_barrier: Arc<Barrier>, service: S,
telemetry: ComponentTelemetry, context: ComponentContext, endpoint: ResolvedEndpoint,
refresher: Arc<RefreshableConfiguration>,
) where
S: Service<Request<B>, Response = hyper::Response<Incoming>> + Send + 'static,
S::Future: Send,
Expand All @@ -504,7 +521,7 @@ async fn run_endpoint_io_loop<S, B>(
// of the URI, adding the API key as a header, and so on.
let mut service = ServiceBuilder::new()
// Set the request's URI to the endpoint's URI, and add the API key as a header.
.map_request(for_resolved_endpoint::<B>(endpoint))
.map_request(for_resolved_endpoint::<B>(endpoint, refresher.clone()))
// Set the User-Agent and DD-Agent-Version headers indicating the version of the data plane sending the request.
.map_request(with_version_info::<B>())
// Add telemetry about the requests.
Expand Down Expand Up @@ -552,14 +569,21 @@ async fn run_endpoint_io_loop<S, B>(
task_barrier.wait().await;
}

fn for_resolved_endpoint<B>(endpoint: ResolvedEndpoint) -> impl Fn(Request<B>) -> Request<B> + Clone {
#[allow(unused)]
rayz marked this conversation as resolved.
Show resolved Hide resolved
fn for_resolved_endpoint<B>(
endpoint: ResolvedEndpoint, refresher: Arc<RefreshableConfiguration>,
) -> impl Fn(Request<B>) -> Request<B> + Clone {
let new_uri_authority = Authority::try_from(endpoint.endpoint().authority())
.expect("should not fail to construct new endpoint authority");
let new_uri_scheme =
Scheme::try_from(endpoint.endpoint().scheme()).expect("should not fail to construct new endpoint scheme");
let api_key_value =
HeaderValue::from_str(endpoint.api_key()).expect("should not fail to construct API key header value");

let mut api_key: String;
if let Ok(refresher_api_key) = refresher.get_typed::<String>("api_key") {
api_key = refresher_api_key.clone();
} else {
api_key = endpoint.api_key().to_string();
}
rayz marked this conversation as resolved.
Show resolved Hide resolved
let api_key_value = HeaderValue::from_str(&api_key).expect("should not fail to construct API key header value");
move |mut request| {
// Build an updated URI by taking the endpoint URL and slapping the request's URI path on the end of it.
let new_uri = Uri::builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ impl MetricsEndpoint {
}
}

#[allow(unused)]
rayz marked this conversation as resolved.
Show resolved Hide resolved
pub struct RequestBuilder<O>
where
O: ObjectPool<Item = BytesBuffer> + 'static,
Expand All @@ -128,7 +129,6 @@ where
pub async fn new(endpoint: MetricsEndpoint, buffer_pool: O) -> Result<Self, RequestBuilderError> {
let chunked_buffer_pool = ChunkedBytesBufferObjectPool::new(buffer_pool);
let compressor = create_compressor(&chunked_buffer_pool).await;

Ok(Self {
endpoint,
endpoint_uri: endpoint.endpoint_uri(),
Expand Down
4 changes: 4 additions & 0 deletions lib/saluki-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@ license = { workspace = true }
repository = { workspace = true }

[dependencies]
arc-swap = { workspace = true }
figment = { workspace = true, features = ["env", "json", "yaml"] }
http = { workspace = true }
reqwest = { workspace = true, features = ["default-tls", "json"] }
saluki-error = { workspace = true }
saluki-io = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
snafu = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions lib/saluki-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ use snafu::{ResultExt as _, Snafu};
use tracing::debug;

mod provider;
mod refresher;
mod secrets;
use self::provider::ResolvedProvider;
pub use self::refresher::{RefreshableConfiguration, RefresherConfiguration};

/// A configuration error.
#[derive(Debug, Snafu)]
Expand Down
Loading
Loading