Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[APR-278] Add config refresher to update API key #351

Merged
merged 19 commits into from
Dec 4, 2024
109 changes: 109 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 9 additions & 2 deletions bin/agent-data-plane/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#![deny(missing_docs)]
use std::{
future::pending,
sync::Arc,
time::{Duration, Instant},
};

Expand All @@ -19,7 +20,7 @@ use saluki_components::{
AggregateConfiguration, ChainedConfiguration, HostEnrichmentConfiguration, OriginEnrichmentConfiguration,
},
};
use saluki_config::{ConfigurationLoader, GenericConfiguration};
use saluki_config::{ConfigRefresher, ConfigRefresherConfiguration, ConfigurationLoader, GenericConfiguration};
use saluki_core::topology::TopologyBlueprint;
use saluki_error::{ErrorContext as _, GenericError};
use saluki_health::HealthRegistry;
Expand Down Expand Up @@ -174,8 +175,14 @@ fn create_topology(
.with_transform_builder(host_enrichment_config)
.with_transform_builder(origin_enrichment_config);
let internal_metrics_remap_config = AgentTelemetryRemapperConfiguration::new();
let dd_metrics_config = DatadogMetricsConfiguration::from_configuration(configuration)

let config_refresher_configuration = ConfigRefresherConfiguration::from_configuration(&configuration)?;
let config_refresher: Arc<ConfigRefresher> = Arc::new(config_refresher_configuration.build()?);
config_refresher.clone().spawn_refresh_task();

let mut dd_metrics_config = DatadogMetricsConfiguration::from_configuration(configuration)
.error_context("Failed to configure Datadog Metrics destination.")?;
dd_metrics_config.set_config_refresher(config_refresher);
let events_service_checks_config = DatadogEventsServiceChecksConfiguration::from_configuration(configuration)
.error_context("Failed to configure Datadog Events/Service Checks destination.")?;

Expand Down
14 changes: 12 additions & 2 deletions lib/saluki-components/src/destinations/datadog_metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::time::Duration;
use std::{sync::Arc, time::Duration};

use async_trait::async_trait;
use http::{HeaderValue, Method, Request, Uri};
use http_body_util::BodyExt;
use hyper::body::Incoming;
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use metrics::Counter;
use saluki_config::GenericConfiguration;
use saluki_config::{ConfigRefresher, GenericConfiguration};
use saluki_core::{
components::{destinations::*, ComponentContext, MetricsBuilder},
pooling::{FixedSizeObjectPool, ObjectPool},
Expand Down Expand Up @@ -188,6 +188,9 @@ pub struct DatadogMetricsConfiguration {
#[serde_as(as = "PickFirst<(DisplayFromStr, _)>")]
#[serde(default, rename = "additional_endpoints")]
endpoints: AdditionalEndpoints,

#[serde(skip)]
config_refresher: Arc<ConfigRefresher>,
}

fn default_request_timeout_secs() -> u64 {
Expand Down Expand Up @@ -223,6 +226,11 @@ impl DatadogMetricsConfiguration {
fn api_base(&self) -> Result<Uri, GenericError> {
calculate_api_endpoint(self.dd_url.as_deref(), &self.site)
}

/// Add option to refresh config with a remote source
pub fn set_config_refresher(&mut self, refresher: Arc<ConfigRefresher>) {
self.config_refresher = refresher;
}
}

#[async_trait]
Expand Down Expand Up @@ -253,13 +261,15 @@ impl DestinationBuilder for DatadogMetricsConfiguration {
api_base.clone(),
MetricsEndpoint::Series,
rb_buffer_pool.clone(),
self.config_refresher.clone(),
)
.await?;
let sketches_request_builder = RequestBuilder::new(
self.api_key.clone(),
api_base,
MetricsEndpoint::Sketches,
rb_buffer_pool,
self.config_refresher.clone(),
)
.await?;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::sync::Arc;
use std::{io, num::NonZeroU64};

use datadog_protos::metrics::{self as proto, Resource};
use ddsketch_agent::DDSketch;
use http::{Method, Request, Uri};
use protobuf::CodedOutputStream;
use saluki_config::ConfigRefresher;
use saluki_core::pooling::ObjectPool;
use saluki_event::metric::*;
use saluki_io::net::client::replay::ReplayBody;
Expand Down Expand Up @@ -110,6 +112,7 @@ impl MetricsEndpoint {
}
}

#[allow(unused)]
rayz marked this conversation as resolved.
Show resolved Hide resolved
pub struct RequestBuilder<O>
where
O: ObjectPool + 'static,
Expand All @@ -125,6 +128,7 @@ where
uncompressed_len: usize,
metrics_written: usize,
scratch_buf_lens: Vec<usize>,
refresher: Arc<ConfigRefresher>,
}

impl<O> RequestBuilder<O>
Expand All @@ -134,12 +138,11 @@ where
{
/// Creates a new `RequestBuilder` for the given endpoint, using the specified API key and base URI.
pub async fn new(
api_key: String, api_base_uri: Uri, endpoint: MetricsEndpoint, buffer_pool: O,
api_key: String, api_base_uri: Uri, endpoint: MetricsEndpoint, buffer_pool: O, refresher: Arc<ConfigRefresher>,
) -> Result<Self, RequestBuilderError> {
let chunked_buffer_pool = ChunkedBufferObjectPool::new(buffer_pool);
let compressor = create_compressor(&chunked_buffer_pool).await;
let api_uri = build_uri_for_endpoint(api_base_uri, endpoint)?;

Ok(Self {
api_key,
api_uri,
Expand All @@ -151,6 +154,7 @@ where
uncompressed_len: 0,
metrics_written: 0,
scratch_buf_lens: Vec::new(),
refresher,
})
}

Expand Down Expand Up @@ -354,7 +358,7 @@ where
.uri(self.api_uri.clone())
.header("Content-Type", "application/x-protobuf")
.header("Content-Encoding", "deflate")
.header("DD-API-KEY", self.api_key.clone())
.header("DD-API-KEY", self.refresher.api_key())
// TODO: We can't access the version number of the package being built that _includes_ this library, so
// using CARGO_PKG_VERSION or something like that would always be the version of `saluki-components`, which
// isn't what we want... maybe we can figure out some way to shove it in a global somewhere or something?
Expand Down
3 changes: 3 additions & 0 deletions lib/saluki-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ repository = { workspace = true }

[dependencies]
figment = { workspace = true, features = ["env", "json", "yaml"] }
http = { workspace = true }
reqwest = { workspace = true , features = ["default-tls", "json"]}
saluki-io = { workspace = true }
saluki-error = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions lib/saluki-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ use snafu::{ResultExt as _, Snafu};
use tracing::debug;

mod provider;
mod refresher;
mod secrets;
use self::provider::ResolvedProvider;
pub use self::refresher::{ConfigRefresher, ConfigRefresherConfiguration};

/// A configuration error.
#[derive(Debug, Snafu)]
Expand Down
Loading