Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[APR-278] Add config refresher to update API key #351

Merged
merged 19 commits into from
Dec 4, 2024
112 changes: 112 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/agent-data-plane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ default = []
fips = ["saluki-app/tls-fips"]

[dependencies]
arc-swap = { workspace = true }
rayz marked this conversation as resolved.
Show resolved Hide resolved
async-trait = { workspace = true }
bytesize = { workspace = true }
memory-accounting = { workspace = true }
Expand Down
11 changes: 9 additions & 2 deletions bin/agent-data-plane/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#![deny(missing_docs)]
use std::{
future::pending,
sync::Arc,
time::{Duration, Instant},
};

Expand All @@ -19,7 +20,7 @@ use saluki_components::{
AggregateConfiguration, ChainedConfiguration, HostEnrichmentConfiguration, OriginEnrichmentConfiguration,
},
};
use saluki_config::{ConfigurationLoader, GenericConfiguration};
use saluki_config::{ConfigurationLoader, GenericConfiguration, RefreshableConfiguration, RefresherConfiguration};
use saluki_core::topology::TopologyBlueprint;
use saluki_error::{ErrorContext as _, GenericError};
use saluki_health::HealthRegistry;
Expand Down Expand Up @@ -174,8 +175,14 @@ fn create_topology(
.with_transform_builder(host_enrichment_config)
.with_transform_builder(origin_enrichment_config);
let internal_metrics_remap_config = AgentTelemetryRemapperConfiguration::new();
let dd_metrics_config = DatadogMetricsConfiguration::from_configuration(configuration)

let refresher_configuration = RefresherConfiguration::from_configuration(&configuration)?;
let refreshable_configuration: Arc<RefreshableConfiguration> = Arc::new(refresher_configuration.build()?);
refreshable_configuration.clone().spawn_refresh_task();

let mut dd_metrics_config = DatadogMetricsConfiguration::from_configuration(configuration)
.error_context("Failed to configure Datadog Metrics destination.")?;
dd_metrics_config.add_refreshable_configuration(refreshable_configuration.clone());
rayz marked this conversation as resolved.
Show resolved Hide resolved
let events_service_checks_config = DatadogEventsServiceChecksConfiguration::from_configuration(configuration)
.error_context("Failed to configure Datadog Events/Service Checks destination.")?;

Expand Down
1 change: 1 addition & 0 deletions lib/saluki-components/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ repository = { workspace = true }

[dependencies]
ahash = { workspace = true }
arc-swap = { workspace = true }
rayz marked this conversation as resolved.
Show resolved Hide resolved
async-compression = { workspace = true, features = ["tokio", "zlib"] }
async-trait = { workspace = true }
bitmask-enum = { workspace = true }
Expand Down
16 changes: 13 additions & 3 deletions lib/saluki-components/src/destinations/datadog_metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use std::time::Duration;

use async_trait::async_trait;
use http::{HeaderValue, Method, Request, Uri};
use http_body_util::BodyExt;
use hyper::body::Incoming;
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use metrics::Counter;
use saluki_config::GenericConfiguration;
use saluki_config::{GenericConfiguration, RefreshableConfiguration};
use saluki_core::{
components::{destinations::*, ComponentContext, MetricsBuilder},
pooling::{FixedSizeObjectPool, ObjectPool},
Expand All @@ -23,6 +21,8 @@ use saluki_io::{
};
use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr, PickFirst};
use std::sync::Arc;
use std::time::Duration;
use tokio::{
select,
sync::{mpsc, oneshot},
Expand Down Expand Up @@ -188,6 +188,9 @@ pub struct DatadogMetricsConfiguration {
#[serde_as(as = "PickFirst<(DisplayFromStr, _)>")]
#[serde(default, rename = "additional_endpoints")]
endpoints: AdditionalEndpoints,

#[serde(skip)]
config_refresher: Arc<RefreshableConfiguration>,
}

fn default_request_timeout_secs() -> u64 {
Expand Down Expand Up @@ -223,6 +226,11 @@ impl DatadogMetricsConfiguration {
fn api_base(&self) -> Result<Uri, GenericError> {
calculate_api_endpoint(self.dd_url.as_deref(), &self.site)
}

/// Add option to retrieve config values from a `RefreshableConfiguration`
rayz marked this conversation as resolved.
Show resolved Hide resolved
pub fn add_refreshable_configuration(&mut self, refresher: Arc<RefreshableConfiguration>) {
self.config_refresher = refresher;
}
}

#[async_trait]
Expand Down Expand Up @@ -253,13 +261,15 @@ impl DestinationBuilder for DatadogMetricsConfiguration {
api_base.clone(),
MetricsEndpoint::Series,
rb_buffer_pool.clone(),
self.config_refresher.clone(),
)
.await?;
let sketches_request_builder = RequestBuilder::new(
self.api_key.clone(),
api_base,
MetricsEndpoint::Sketches,
rb_buffer_pool,
self.config_refresher.clone(),
)
.await?;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::sync::Arc;
use std::{io, num::NonZeroU64};

use datadog_protos::metrics::{self as proto, Resource};
use ddsketch_agent::DDSketch;
use http::{Method, Request, Uri};
use protobuf::CodedOutputStream;
use saluki_config::RefreshableConfiguration;
use saluki_core::pooling::ObjectPool;
use saluki_event::metric::*;
use saluki_io::net::client::replay::ReplayBody;
Expand Down Expand Up @@ -47,6 +49,7 @@ pub enum RequestBuilderError {
source: http::Error,
},
FailedToCreateReplayBody,
FailedToGetApiKey,
}

impl RequestBuilderError {
Expand Down Expand Up @@ -110,6 +113,7 @@ impl MetricsEndpoint {
}
}

#[allow(unused)]
rayz marked this conversation as resolved.
Show resolved Hide resolved
pub struct RequestBuilder<O>
where
O: ObjectPool + 'static,
Expand All @@ -125,6 +129,7 @@ where
uncompressed_len: usize,
metrics_written: usize,
scratch_buf_lens: Vec<usize>,
refresher: Arc<RefreshableConfiguration>,
}

impl<O> RequestBuilder<O>
Expand All @@ -135,11 +140,11 @@ where
/// Creates a new `RequestBuilder` for the given endpoint, using the specified API key and base URI.
pub async fn new(
api_key: String, api_base_uri: Uri, endpoint: MetricsEndpoint, buffer_pool: O,
refresher: Arc<RefreshableConfiguration>,
) -> Result<Self, RequestBuilderError> {
let chunked_buffer_pool = ChunkedBufferObjectPool::new(buffer_pool);
let compressor = create_compressor(&chunked_buffer_pool).await;
let api_uri = build_uri_for_endpoint(api_base_uri, endpoint)?;

Ok(Self {
api_key,
api_uri,
Expand All @@ -151,6 +156,7 @@ where
uncompressed_len: 0,
metrics_written: 0,
scratch_buf_lens: Vec::new(),
refresher,
})
}

Expand Down Expand Up @@ -354,7 +360,12 @@ where
.uri(self.api_uri.clone())
.header("Content-Type", "application/x-protobuf")
.header("Content-Encoding", "deflate")
.header("DD-API-KEY", self.api_key.clone())
.header(
"DD-API-KEY",
self.refresher
.get_typed::<String>("api_key")
.map_err(|_| RequestBuilderError::FailedToGetApiKey)?,
)
// TODO: We can't access the version number of the package being built that _includes_ this library, so
// using CARGO_PKG_VERSION or something like that would always be the version of `saluki-components`, which
// isn't what we want... maybe we can figure out some way to shove it in a global somewhere or something?
Expand Down
4 changes: 4 additions & 0 deletions lib/saluki-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ license = { workspace = true }
repository = { workspace = true }

[dependencies]
arc-swap = { workspace = true }
figment = { workspace = true, features = ["env", "json", "yaml"] }
http = { workspace = true }
reqwest = { workspace = true , features = ["default-tls", "json"]}
saluki-io = { workspace = true }
saluki-error = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
Expand Down
Loading