From 35596356fb4381a33587cec3ebab5ca9327c7bea Mon Sep 17 00:00:00 2001 From: raymond zhao Date: Mon, 25 Nov 2024 12:52:38 -0500 Subject: [PATCH 01/18] WIP --- Cargo.lock | 109 ++++++++++++++++++ bin/agent-data-plane/src/main.rs | 11 +- .../src/destinations/datadog_metrics/mod.rs | 14 ++- .../datadog_metrics/request_builder.rs | 10 +- lib/saluki-config/Cargo.toml | 3 + lib/saluki-config/src/lib.rs | 2 + lib/saluki-config/src/refresher.rs | 89 ++++++++++++++ 7 files changed, 231 insertions(+), 7 deletions(-) create mode 100644 lib/saluki-config/src/refresher.rs diff --git a/Cargo.lock b/Cargo.lock index 409602ac..17803a49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1081,6 +1081,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -1581,6 +1596,22 @@ dependencies = [ "tower-service", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper 1.5.0", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.10" @@ -2239,6 +2270,23 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" +[[package]] +name = "native-tls" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8614eb2c83d59d1c8cc974dd3f920198647674a0a035e1af1fa58707e317466" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "ndarray" version = "0.16.1" @@ -2372,12 +2420,50 @@ version = "11.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b410bbe7e14ab526a0e86877eb47c6996a2bd7746f027ba551028c925390e4e9" +[[package]] +name = "openssl" +version = "0.10.66" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9529f4786b70a3e8c61e11179af17ab6188ad8d0ded78c5529441ed39d4bd9c1" +dependencies = [ + "bitflags 2.6.0", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-sys" +version = "0.9.103" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f9e8deee91df40a943c71b917e5874b951d32a802526c85721ce3b776c929d6" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "opentelemetry" version = "0.18.0" @@ -3095,19 +3181,23 @@ dependencies = [ "http-body 1.0.1", "http-body-util", "hyper 1.5.0", + "hyper-tls", "hyper-util", "ipnet", "js-sys", "log", "mime", + "native-tls", "once_cell", "percent-encoding", "pin-project-lite", + "rustls-pemfile", "serde", "serde_json", "serde_urlencoded", "sync_wrapper 1.0.1", "tokio", + "tokio-native-tls", "tokio-util", "tower-service", "url", @@ -3350,7 +3440,10 @@ name = "saluki-config" version = "0.1.0" dependencies = [ "figment", + "http 1.1.0", + "reqwest", "saluki-error", + "saluki-io", "serde", "serde_json", "snafu", @@ -4165,6 +4258,16 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.0" @@ -4622,6 +4725,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.5" diff --git a/bin/agent-data-plane/src/main.rs b/bin/agent-data-plane/src/main.rs index 4ca7093d..0d3ac6db 100644 --- a/bin/agent-data-plane/src/main.rs +++ b/bin/agent-data-plane/src/main.rs @@ -7,6 +7,7 @@ #![deny(missing_docs)] use std::{ future::pending, + sync::Arc, time::{Duration, Instant}, }; @@ -19,7 +20,7 @@ use saluki_components::{ AggregateConfiguration, ChainedConfiguration, HostEnrichmentConfiguration, OriginEnrichmentConfiguration, }, }; -use saluki_config::{ConfigurationLoader, GenericConfiguration}; +use saluki_config::{ConfigRefresher, ConfigRefresherConfiguration, ConfigurationLoader, GenericConfiguration}; use saluki_core::topology::TopologyBlueprint; use saluki_error::{ErrorContext as _, GenericError}; use saluki_health::HealthRegistry; @@ -174,8 +175,14 @@ fn create_topology( .with_transform_builder(host_enrichment_config) .with_transform_builder(origin_enrichment_config); let internal_metrics_remap_config = AgentTelemetryRemapperConfiguration::new(); - let dd_metrics_config = DatadogMetricsConfiguration::from_configuration(configuration) + + let config_refresher_configuration = ConfigRefresherConfiguration::from_configuration(&configuration)?; + let config_refresher: Arc = Arc::new(config_refresher_configuration.build()?); + config_refresher.clone().spawn_refresh_task(); + + let mut dd_metrics_config = DatadogMetricsConfiguration::from_configuration(configuration) .error_context("Failed to configure Datadog Metrics destination.")?; + dd_metrics_config.set_config_refresher(config_refresher); let events_service_checks_config = DatadogEventsServiceChecksConfiguration::from_configuration(configuration) .error_context("Failed to configure Datadog Events/Service Checks destination.")?; diff --git a/lib/saluki-components/src/destinations/datadog_metrics/mod.rs b/lib/saluki-components/src/destinations/datadog_metrics/mod.rs index fce00831..33c5f5bd 100644 --- a/lib/saluki-components/src/destinations/datadog_metrics/mod.rs +++ b/lib/saluki-components/src/destinations/datadog_metrics/mod.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use async_trait::async_trait; use http::{HeaderValue, Method, Request, Uri}; @@ -6,7 +6,7 @@ use http_body_util::BodyExt; use hyper::body::Incoming; use memory_accounting::{MemoryBounds, MemoryBoundsBuilder}; use metrics::Counter; -use saluki_config::GenericConfiguration; +use saluki_config::{ConfigRefresher, GenericConfiguration}; use saluki_core::{ components::{destinations::*, ComponentContext, MetricsBuilder}, pooling::{FixedSizeObjectPool, ObjectPool}, @@ -188,6 +188,9 @@ pub struct DatadogMetricsConfiguration { #[serde_as(as = "PickFirst<(DisplayFromStr, _)>")] #[serde(default, rename = "additional_endpoints")] endpoints: AdditionalEndpoints, + + #[serde(skip)] + config_refresher: Arc, } fn default_request_timeout_secs() -> u64 { @@ -223,6 +226,11 @@ impl DatadogMetricsConfiguration { fn api_base(&self) -> Result { calculate_api_endpoint(self.dd_url.as_deref(), &self.site) } + + /// Add option to refresh config with a remote source + pub fn set_config_refresher(&mut self, refresher: Arc) { + self.config_refresher = refresher; + } } #[async_trait] @@ -253,6 +261,7 @@ impl DestinationBuilder for DatadogMetricsConfiguration { api_base.clone(), MetricsEndpoint::Series, rb_buffer_pool.clone(), + self.config_refresher.clone(), ) .await?; let sketches_request_builder = RequestBuilder::new( @@ -260,6 +269,7 @@ impl DestinationBuilder for DatadogMetricsConfiguration { api_base, MetricsEndpoint::Sketches, rb_buffer_pool, + self.config_refresher.clone(), ) .await?; diff --git a/lib/saluki-components/src/destinations/datadog_metrics/request_builder.rs b/lib/saluki-components/src/destinations/datadog_metrics/request_builder.rs index 5f9e560a..9fc0cd64 100644 --- a/lib/saluki-components/src/destinations/datadog_metrics/request_builder.rs +++ b/lib/saluki-components/src/destinations/datadog_metrics/request_builder.rs @@ -1,9 +1,11 @@ +use std::sync::Arc; use std::{io, num::NonZeroU64}; use datadog_protos::metrics::{self as proto, Resource}; use ddsketch_agent::DDSketch; use http::{Method, Request, Uri}; use protobuf::CodedOutputStream; +use saluki_config::ConfigRefresher; use saluki_core::pooling::ObjectPool; use saluki_event::metric::*; use saluki_io::net::client::replay::ReplayBody; @@ -110,6 +112,7 @@ impl MetricsEndpoint { } } +#[allow(unused)] pub struct RequestBuilder where O: ObjectPool + 'static, @@ -125,6 +128,7 @@ where uncompressed_len: usize, metrics_written: usize, scratch_buf_lens: Vec, + refresher: Arc, } impl RequestBuilder @@ -134,12 +138,11 @@ where { /// Creates a new `RequestBuilder` for the given endpoint, using the specified API key and base URI. pub async fn new( - api_key: String, api_base_uri: Uri, endpoint: MetricsEndpoint, buffer_pool: O, + api_key: String, api_base_uri: Uri, endpoint: MetricsEndpoint, buffer_pool: O, refresher: Arc, ) -> Result { let chunked_buffer_pool = ChunkedBufferObjectPool::new(buffer_pool); let compressor = create_compressor(&chunked_buffer_pool).await; let api_uri = build_uri_for_endpoint(api_base_uri, endpoint)?; - Ok(Self { api_key, api_uri, @@ -151,6 +154,7 @@ where uncompressed_len: 0, metrics_written: 0, scratch_buf_lens: Vec::new(), + refresher, }) } @@ -354,7 +358,7 @@ where .uri(self.api_uri.clone()) .header("Content-Type", "application/x-protobuf") .header("Content-Encoding", "deflate") - .header("DD-API-KEY", self.api_key.clone()) + .header("DD-API-KEY", self.refresher.api_key()) // TODO: We can't access the version number of the package being built that _includes_ this library, so // using CARGO_PKG_VERSION or something like that would always be the version of `saluki-components`, which // isn't what we want... maybe we can figure out some way to shove it in a global somewhere or something? diff --git a/lib/saluki-config/Cargo.toml b/lib/saluki-config/Cargo.toml index 09eb5822..c25857f3 100644 --- a/lib/saluki-config/Cargo.toml +++ b/lib/saluki-config/Cargo.toml @@ -7,6 +7,9 @@ repository = { workspace = true } [dependencies] figment = { workspace = true, features = ["env", "json", "yaml"] } +http = { workspace = true } +reqwest = { workspace = true , features = ["default-tls", "json"]} +saluki-io = { workspace = true } saluki-error = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/lib/saluki-config/src/lib.rs b/lib/saluki-config/src/lib.rs index 26a7ca5a..14513b9b 100644 --- a/lib/saluki-config/src/lib.rs +++ b/lib/saluki-config/src/lib.rs @@ -12,8 +12,10 @@ use snafu::{ResultExt as _, Snafu}; use tracing::debug; mod provider; +mod refresher; mod secrets; use self::provider::ResolvedProvider; +pub use self::refresher::{ConfigRefresher, ConfigRefresherConfiguration}; /// A configuration error. #[derive(Debug, Snafu)] diff --git a/lib/saluki-config/src/refresher.rs b/lib/saluki-config/src/refresher.rs new file mode 100644 index 00000000..5070e9b4 --- /dev/null +++ b/lib/saluki-config/src/refresher.rs @@ -0,0 +1,89 @@ +use std::sync::Arc; + +use crate::GenericConfiguration; +use reqwest; +use saluki_error::GenericError; +use serde::Deserialize; +use tokio::time::{sleep, Duration}; + +#[allow(dead_code)] +#[derive(Debug, Deserialize)] +struct ConfigResponse { + api_key: String, +} + +/// Configuration for setting up `ConfigRefresher`. +#[derive(Default, Deserialize)] +pub struct ConfigRefresherConfiguration { + api_key: String, + auth_token_file_path: String, +} + +/// The most recent configuration retrieved from the datadog-agent. +#[derive(Default)] +pub struct ConfigRefresher { + token: String, + api_key: String, +} + +#[allow(unused)] +impl ConfigRefresherConfiguration { + /// Creates a new `ConfigRefresherConfiguration` from the given configuration. + pub fn from_configuration(config: &GenericConfiguration) -> Result { + Ok(config.as_typed()?) + } + /// Create `ConfigRefresher` from `ConfigRefresherConfiguration` + pub fn build(&self) -> Result { + let raw_bearer_token = std::fs::read_to_string(&self.auth_token_file_path)?; + Ok(ConfigRefresher { + token: raw_bearer_token, + api_key: self.api_key.clone(), + }) + } +} +impl ConfigRefresher { + /// Start a task that queries the datadog-agent config endpoint every 15 seconds. + pub fn spawn_refresh_task(self: Arc) { + tokio::spawn(async move { + loop { + match self.query_agent().await { + Ok(_) => { + // todo()! + } + Err(_e) => { + // todo()! + } + } + sleep(Duration::from_secs(15)).await; // Wait for 15 seconds + } + }); + } + /// Query the datadog-agent config endpoint for the latest config + #[allow(unused)] + pub async fn query_agent(&self) -> Result<(), GenericError> { + let url = format!("https://localhost:5004/config/v1/"); + let client = reqwest::ClientBuilder::new() + .danger_accept_invalid_certs(true) // Allow invalid certificates + .build()?; + + let response = client + .get(&url) // Use the URL for the GET request + .header("Content-Type", "application/json") + .header("Authorization", format!("Bearer {}", self.token)) + .header("DD-Agent-Version", "0.1.0") + .header("User-Agent", "agent-data-plane/0.1.0") + .send() // Send the request + .await?; + + let config_response: ConfigResponse = response.json().await?; + + // self.api_key = config_response.api_key; + + Ok(()) + } + + /// Most recent `api_key` retrieved from the datadog-agent. + pub fn api_key(&self) -> &str { + &self.api_key + } +} From f73b58e657423bce852ecd1e76fa4f10f2e4e220 Mon Sep 17 00:00:00 2001 From: raymond zhao Date: Mon, 25 Nov 2024 13:00:20 -0500 Subject: [PATCH 02/18] use const for endpoint --- lib/saluki-config/src/refresher.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/lib/saluki-config/src/refresher.rs b/lib/saluki-config/src/refresher.rs index 5070e9b4..ed1e4155 100644 --- a/lib/saluki-config/src/refresher.rs +++ b/lib/saluki-config/src/refresher.rs @@ -6,6 +6,8 @@ use saluki_error::GenericError; use serde::Deserialize; use tokio::time::{sleep, Duration}; +const DATADOG_AGENT_CONFIG_ENDPOINT: &str = "https://localhost:5004/config/v1/"; + #[allow(dead_code)] #[derive(Debug, Deserialize)] struct ConfigResponse { @@ -61,18 +63,17 @@ impl ConfigRefresher { /// Query the datadog-agent config endpoint for the latest config #[allow(unused)] pub async fn query_agent(&self) -> Result<(), GenericError> { - let url = format!("https://localhost:5004/config/v1/"); let client = reqwest::ClientBuilder::new() .danger_accept_invalid_certs(true) // Allow invalid certificates .build()?; let response = client - .get(&url) // Use the URL for the GET request + .get(DATADOG_AGENT_CONFIG_ENDPOINT) .header("Content-Type", "application/json") .header("Authorization", format!("Bearer {}", self.token)) .header("DD-Agent-Version", "0.1.0") .header("User-Agent", "agent-data-plane/0.1.0") - .send() // Send the request + .send() .await?; let config_response: ConfigResponse = response.json().await?; From 2263ed09467d02ac317c50de2a19184505dacb17 Mon Sep 17 00:00:00 2001 From: raymond zhao Date: Tue, 26 Nov 2024 15:32:45 -0500 Subject: [PATCH 03/18] feedback --- Cargo.lock | 3 + bin/agent-data-plane/Cargo.toml | 1 + bin/agent-data-plane/src/main.rs | 8 +-- lib/saluki-components/Cargo.toml | 1 + .../src/destinations/datadog_metrics/mod.rs | 10 +-- .../datadog_metrics/request_builder.rs | 15 ++-- lib/saluki-config/Cargo.toml | 1 + lib/saluki-config/src/lib.rs | 2 +- lib/saluki-config/src/refresher.rs | 69 ++++++++++++------- 9 files changed, 70 insertions(+), 40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 17803a49..0c5e443d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -21,6 +21,7 @@ checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" name = "agent-data-plane" version = "0.1.0" dependencies = [ + "arc-swap", "async-trait", "bytesize", "chrono", @@ -3385,6 +3386,7 @@ name = "saluki-components" version = "0.1.0" dependencies = [ "ahash", + "arc-swap", "async-compression", "async-trait", "bitmask-enum", @@ -3439,6 +3441,7 @@ dependencies = [ name = "saluki-config" version = "0.1.0" dependencies = [ + "arc-swap", "figment", "http 1.1.0", "reqwest", diff --git a/bin/agent-data-plane/Cargo.toml b/bin/agent-data-plane/Cargo.toml index a3670d75..8e351c7b 100644 --- a/bin/agent-data-plane/Cargo.toml +++ b/bin/agent-data-plane/Cargo.toml @@ -10,6 +10,7 @@ default = [] fips = ["saluki-app/tls-fips"] [dependencies] +arc-swap = { workspace = true } async-trait = { workspace = true } bytesize = { workspace = true } memory-accounting = { workspace = true } diff --git a/bin/agent-data-plane/src/main.rs b/bin/agent-data-plane/src/main.rs index 0d3ac6db..8c0d8178 100644 --- a/bin/agent-data-plane/src/main.rs +++ b/bin/agent-data-plane/src/main.rs @@ -20,7 +20,7 @@ use saluki_components::{ AggregateConfiguration, ChainedConfiguration, HostEnrichmentConfiguration, OriginEnrichmentConfiguration, }, }; -use saluki_config::{ConfigRefresher, ConfigRefresherConfiguration, ConfigurationLoader, GenericConfiguration}; +use saluki_config::{ConfigurationLoader, GenericConfiguration, RefreshableConfiguration, RefresherConfiguration}; use saluki_core::topology::TopologyBlueprint; use saluki_error::{ErrorContext as _, GenericError}; use saluki_health::HealthRegistry; @@ -176,13 +176,13 @@ fn create_topology( .with_transform_builder(origin_enrichment_config); let internal_metrics_remap_config = AgentTelemetryRemapperConfiguration::new(); - let config_refresher_configuration = ConfigRefresherConfiguration::from_configuration(&configuration)?; - let config_refresher: Arc = Arc::new(config_refresher_configuration.build()?); + let config_refresher_configuration = RefresherConfiguration::from_configuration(&configuration)?; + let config_refresher: Arc = Arc::new(config_refresher_configuration.build()?); config_refresher.clone().spawn_refresh_task(); let mut dd_metrics_config = DatadogMetricsConfiguration::from_configuration(configuration) .error_context("Failed to configure Datadog Metrics destination.")?; - dd_metrics_config.set_config_refresher(config_refresher); + dd_metrics_config.set_config_refresher(config_refresher.clone()); let events_service_checks_config = DatadogEventsServiceChecksConfiguration::from_configuration(configuration) .error_context("Failed to configure Datadog Events/Service Checks destination.")?; diff --git a/lib/saluki-components/Cargo.toml b/lib/saluki-components/Cargo.toml index 42f1d87a..92772997 100644 --- a/lib/saluki-components/Cargo.toml +++ b/lib/saluki-components/Cargo.toml @@ -7,6 +7,7 @@ repository = { workspace = true } [dependencies] ahash = { workspace = true } +arc-swap = { workspace = true } async-compression = { workspace = true, features = ["tokio", "zlib"] } async-trait = { workspace = true } bitmask-enum = { workspace = true } diff --git a/lib/saluki-components/src/destinations/datadog_metrics/mod.rs b/lib/saluki-components/src/destinations/datadog_metrics/mod.rs index 33c5f5bd..287e1d8c 100644 --- a/lib/saluki-components/src/destinations/datadog_metrics/mod.rs +++ b/lib/saluki-components/src/destinations/datadog_metrics/mod.rs @@ -1,12 +1,10 @@ -use std::{sync::Arc, time::Duration}; - use async_trait::async_trait; use http::{HeaderValue, Method, Request, Uri}; use http_body_util::BodyExt; use hyper::body::Incoming; use memory_accounting::{MemoryBounds, MemoryBoundsBuilder}; use metrics::Counter; -use saluki_config::{ConfigRefresher, GenericConfiguration}; +use saluki_config::{GenericConfiguration, RefreshableConfiguration}; use saluki_core::{ components::{destinations::*, ComponentContext, MetricsBuilder}, pooling::{FixedSizeObjectPool, ObjectPool}, @@ -23,6 +21,8 @@ use saluki_io::{ }; use serde::Deserialize; use serde_with::{serde_as, DisplayFromStr, PickFirst}; +use std::sync::Arc; +use std::time::Duration; use tokio::{ select, sync::{mpsc, oneshot}, @@ -190,7 +190,7 @@ pub struct DatadogMetricsConfiguration { endpoints: AdditionalEndpoints, #[serde(skip)] - config_refresher: Arc, + config_refresher: Arc, } fn default_request_timeout_secs() -> u64 { @@ -228,7 +228,7 @@ impl DatadogMetricsConfiguration { } /// Add option to refresh config with a remote source - pub fn set_config_refresher(&mut self, refresher: Arc) { + pub fn set_config_refresher(&mut self, refresher: Arc) { self.config_refresher = refresher; } } diff --git a/lib/saluki-components/src/destinations/datadog_metrics/request_builder.rs b/lib/saluki-components/src/destinations/datadog_metrics/request_builder.rs index 9fc0cd64..d0c7a9d3 100644 --- a/lib/saluki-components/src/destinations/datadog_metrics/request_builder.rs +++ b/lib/saluki-components/src/destinations/datadog_metrics/request_builder.rs @@ -5,7 +5,7 @@ use datadog_protos::metrics::{self as proto, Resource}; use ddsketch_agent::DDSketch; use http::{Method, Request, Uri}; use protobuf::CodedOutputStream; -use saluki_config::ConfigRefresher; +use saluki_config::RefreshableConfiguration; use saluki_core::pooling::ObjectPool; use saluki_event::metric::*; use saluki_io::net::client::replay::ReplayBody; @@ -49,6 +49,7 @@ pub enum RequestBuilderError { source: http::Error, }, FailedToCreateReplayBody, + FailedToGetApiKey, } impl RequestBuilderError { @@ -128,7 +129,7 @@ where uncompressed_len: usize, metrics_written: usize, scratch_buf_lens: Vec, - refresher: Arc, + refresher: Arc, } impl RequestBuilder @@ -138,7 +139,8 @@ where { /// Creates a new `RequestBuilder` for the given endpoint, using the specified API key and base URI. pub async fn new( - api_key: String, api_base_uri: Uri, endpoint: MetricsEndpoint, buffer_pool: O, refresher: Arc, + api_key: String, api_base_uri: Uri, endpoint: MetricsEndpoint, buffer_pool: O, + refresher: Arc, ) -> Result { let chunked_buffer_pool = ChunkedBufferObjectPool::new(buffer_pool); let compressor = create_compressor(&chunked_buffer_pool).await; @@ -358,7 +360,12 @@ where .uri(self.api_uri.clone()) .header("Content-Type", "application/x-protobuf") .header("Content-Encoding", "deflate") - .header("DD-API-KEY", self.refresher.api_key()) + .header( + "DD-API-KEY", + self.refresher + .get_typed::("api_key") + .map_err(|_| RequestBuilderError::FailedToGetApiKey)?, + ) // TODO: We can't access the version number of the package being built that _includes_ this library, so // using CARGO_PKG_VERSION or something like that would always be the version of `saluki-components`, which // isn't what we want... maybe we can figure out some way to shove it in a global somewhere or something? diff --git a/lib/saluki-config/Cargo.toml b/lib/saluki-config/Cargo.toml index c25857f3..47d49b06 100644 --- a/lib/saluki-config/Cargo.toml +++ b/lib/saluki-config/Cargo.toml @@ -6,6 +6,7 @@ license = { workspace = true } repository = { workspace = true } [dependencies] +arc-swap = { workspace = true } figment = { workspace = true, features = ["env", "json", "yaml"] } http = { workspace = true } reqwest = { workspace = true , features = ["default-tls", "json"]} diff --git a/lib/saluki-config/src/lib.rs b/lib/saluki-config/src/lib.rs index 14513b9b..a5d9bddc 100644 --- a/lib/saluki-config/src/lib.rs +++ b/lib/saluki-config/src/lib.rs @@ -15,7 +15,7 @@ mod provider; mod refresher; mod secrets; use self::provider::ResolvedProvider; -pub use self::refresher::{ConfigRefresher, ConfigRefresherConfiguration}; +pub use self::refresher::{RefreshableConfiguration, RefresherConfiguration}; /// A configuration error. #[derive(Debug, Snafu)] diff --git a/lib/saluki-config/src/refresher.rs b/lib/saluki-config/src/refresher.rs index ed1e4155..a9fd7bf0 100644 --- a/lib/saluki-config/src/refresher.rs +++ b/lib/saluki-config/src/refresher.rs @@ -1,49 +1,43 @@ use std::sync::Arc; -use crate::GenericConfiguration; +use crate::{ConfigurationError, GenericConfiguration}; +use arc_swap::ArcSwap; use reqwest; use saluki_error::GenericError; -use serde::Deserialize; +use serde::{de::DeserializeOwned, Deserialize}; +use serde_json::Value; use tokio::time::{sleep, Duration}; const DATADOG_AGENT_CONFIG_ENDPOINT: &str = "https://localhost:5004/config/v1/"; -#[allow(dead_code)] -#[derive(Debug, Deserialize)] -struct ConfigResponse { - api_key: String, -} - -/// Configuration for setting up `ConfigRefresher`. +/// Configuration for setting up `RefreshableConfiguration`. #[derive(Default, Deserialize)] -pub struct ConfigRefresherConfiguration { - api_key: String, +pub struct RefresherConfiguration { auth_token_file_path: String, } /// The most recent configuration retrieved from the datadog-agent. #[derive(Default)] -pub struct ConfigRefresher { +pub struct RefreshableConfiguration { token: String, - api_key: String, + values: ArcSwap, } -#[allow(unused)] -impl ConfigRefresherConfiguration { +impl RefresherConfiguration { /// Creates a new `ConfigRefresherConfiguration` from the given configuration. pub fn from_configuration(config: &GenericConfiguration) -> Result { Ok(config.as_typed()?) } /// Create `ConfigRefresher` from `ConfigRefresherConfiguration` - pub fn build(&self) -> Result { + pub fn build(&self) -> Result { let raw_bearer_token = std::fs::read_to_string(&self.auth_token_file_path)?; - Ok(ConfigRefresher { + Ok(RefreshableConfiguration { token: raw_bearer_token, - api_key: self.api_key.clone(), + values: ArcSwap::from_pointee(serde_json::Value::Null), }) } } -impl ConfigRefresher { +impl RefreshableConfiguration { /// Start a task that queries the datadog-agent config endpoint every 15 seconds. pub fn spawn_refresh_task(self: Arc) { tokio::spawn(async move { @@ -61,7 +55,6 @@ impl ConfigRefresher { }); } /// Query the datadog-agent config endpoint for the latest config - #[allow(unused)] pub async fn query_agent(&self) -> Result<(), GenericError> { let client = reqwest::ClientBuilder::new() .danger_accept_invalid_certs(true) // Allow invalid certificates @@ -76,15 +69,39 @@ impl ConfigRefresher { .send() .await?; - let config_response: ConfigResponse = response.json().await?; - - // self.api_key = config_response.api_key; + let config_response: Value = response.json().await?; + self.values.store(Arc::new(config_response)); Ok(()) } - /// Most recent `api_key` retrieved from the datadog-agent. - pub fn api_key(&self) -> &str { - &self.api_key + /// Gets a configuration value by key. + /// + /// + /// ## Errors + /// + /// If the key does not exist in the configuration, or if the value could not be deserialized into `T`, an error + /// variant will be returned. + pub fn get_typed<'a, T>(&self, key: &str) -> Result + where + T: DeserializeOwned, + { + let values = self.values.load(); + match values.get(key) { + Some(value) => { + // Attempt to deserialize the value to type T + serde_json::from_value(value.clone()).map_err(|_| ConfigurationError::MissingField { + help_text: format!( + "RefreshableConfiguration could not convert key {} value into the proper type.", + key + ), + field: format!("{}", key).into(), + }) + } + None => Err(ConfigurationError::MissingField { + help_text: format!("RefreshableConfiguration missing key {}", key), + field: format!("{}", key).into(), + }), + } } } From d0ede29f563f332c19824704758b26b0a5b093e0 Mon Sep 17 00:00:00 2001 From: raymond zhao Date: Tue, 26 Nov 2024 15:42:07 -0500 Subject: [PATCH 04/18] update naming --- bin/agent-data-plane/src/main.rs | 2 +- lib/saluki-components/src/destinations/datadog_metrics/mod.rs | 4 ++-- lib/saluki-config/src/refresher.rs | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/bin/agent-data-plane/src/main.rs b/bin/agent-data-plane/src/main.rs index 8c0d8178..4f3398ec 100644 --- a/bin/agent-data-plane/src/main.rs +++ b/bin/agent-data-plane/src/main.rs @@ -182,7 +182,7 @@ fn create_topology( let mut dd_metrics_config = DatadogMetricsConfiguration::from_configuration(configuration) .error_context("Failed to configure Datadog Metrics destination.")?; - dd_metrics_config.set_config_refresher(config_refresher.clone()); + dd_metrics_config.add_refreshable_configuration(config_refresher.clone()); let events_service_checks_config = DatadogEventsServiceChecksConfiguration::from_configuration(configuration) .error_context("Failed to configure Datadog Events/Service Checks destination.")?; diff --git a/lib/saluki-components/src/destinations/datadog_metrics/mod.rs b/lib/saluki-components/src/destinations/datadog_metrics/mod.rs index 287e1d8c..d8ae11c9 100644 --- a/lib/saluki-components/src/destinations/datadog_metrics/mod.rs +++ b/lib/saluki-components/src/destinations/datadog_metrics/mod.rs @@ -227,8 +227,8 @@ impl DatadogMetricsConfiguration { calculate_api_endpoint(self.dd_url.as_deref(), &self.site) } - /// Add option to refresh config with a remote source - pub fn set_config_refresher(&mut self, refresher: Arc) { + /// Add option to retrieve config values from a `RefreshableConfiguration` + pub fn add_refreshable_configuration(&mut self, refresher: Arc) { self.config_refresher = refresher; } } diff --git a/lib/saluki-config/src/refresher.rs b/lib/saluki-config/src/refresher.rs index a9fd7bf0..a40df120 100644 --- a/lib/saluki-config/src/refresher.rs +++ b/lib/saluki-config/src/refresher.rs @@ -24,11 +24,11 @@ pub struct RefreshableConfiguration { } impl RefresherConfiguration { - /// Creates a new `ConfigRefresherConfiguration` from the given configuration. + /// Creates a new `RefresherConfiguration` from the given configuration. pub fn from_configuration(config: &GenericConfiguration) -> Result { Ok(config.as_typed()?) } - /// Create `ConfigRefresher` from `ConfigRefresherConfiguration` + /// Create `RefreshableConfiguration` from `RefresherConfiguration` pub fn build(&self) -> Result { let raw_bearer_token = std::fs::read_to_string(&self.auth_token_file_path)?; Ok(RefreshableConfiguration { From 7f80b3aabe5442bc6a10bad9f5adbcfff48d8703 Mon Sep 17 00:00:00 2001 From: raymond zhao Date: Tue, 26 Nov 2024 15:45:43 -0500 Subject: [PATCH 05/18] more renaming --- bin/agent-data-plane/src/main.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/bin/agent-data-plane/src/main.rs b/bin/agent-data-plane/src/main.rs index 4f3398ec..0ab0d10d 100644 --- a/bin/agent-data-plane/src/main.rs +++ b/bin/agent-data-plane/src/main.rs @@ -176,13 +176,13 @@ fn create_topology( .with_transform_builder(origin_enrichment_config); let internal_metrics_remap_config = AgentTelemetryRemapperConfiguration::new(); - let config_refresher_configuration = RefresherConfiguration::from_configuration(&configuration)?; - let config_refresher: Arc = Arc::new(config_refresher_configuration.build()?); - config_refresher.clone().spawn_refresh_task(); + let refresher_configuration = RefresherConfiguration::from_configuration(&configuration)?; + let refreshable_configuration: Arc = Arc::new(refresher_configuration.build()?); + refreshable_configuration.clone().spawn_refresh_task(); let mut dd_metrics_config = DatadogMetricsConfiguration::from_configuration(configuration) .error_context("Failed to configure Datadog Metrics destination.")?; - dd_metrics_config.add_refreshable_configuration(config_refresher.clone()); + dd_metrics_config.add_refreshable_configuration(refreshable_configuration.clone()); let events_service_checks_config = DatadogEventsServiceChecksConfiguration::from_configuration(configuration) .error_context("Failed to configure Datadog Events/Service Checks destination.")?; From aff7f990dacafe40a0e330344abd65990f9b55b3 Mon Sep 17 00:00:00 2001 From: raymond zhao Date: Tue, 26 Nov 2024 16:27:46 -0500 Subject: [PATCH 06/18] partial feedback 2 --- Cargo.lock | 2 - bin/agent-data-plane/Cargo.toml | 1 - bin/agent-data-plane/src/main.rs | 5 +- lib/saluki-components/Cargo.toml | 1 - .../src/destinations/datadog_metrics/mod.rs | 7 +- lib/saluki-config/Cargo.toml | 4 +- lib/saluki-config/src/refresher.rs | 65 ++++++++++++++----- 7 files changed, 56 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0c5e443d..d28da2c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -21,7 +21,6 @@ checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" name = "agent-data-plane" version = "0.1.0" dependencies = [ - "arc-swap", "async-trait", "bytesize", "chrono", @@ -3386,7 +3385,6 @@ name = "saluki-components" version = "0.1.0" dependencies = [ "ahash", - "arc-swap", "async-compression", "async-trait", "bitmask-enum", diff --git a/bin/agent-data-plane/Cargo.toml b/bin/agent-data-plane/Cargo.toml index 8e351c7b..a3670d75 100644 --- a/bin/agent-data-plane/Cargo.toml +++ b/bin/agent-data-plane/Cargo.toml @@ -10,7 +10,6 @@ default = [] fips = ["saluki-app/tls-fips"] [dependencies] -arc-swap = { workspace = true } async-trait = { workspace = true } bytesize = { workspace = true } memory-accounting = { workspace = true } diff --git a/bin/agent-data-plane/src/main.rs b/bin/agent-data-plane/src/main.rs index 0ab0d10d..5c2a26c4 100644 --- a/bin/agent-data-plane/src/main.rs +++ b/bin/agent-data-plane/src/main.rs @@ -176,9 +176,8 @@ fn create_topology( .with_transform_builder(origin_enrichment_config); let internal_metrics_remap_config = AgentTelemetryRemapperConfiguration::new(); - let refresher_configuration = RefresherConfiguration::from_configuration(&configuration)?; - let refreshable_configuration: Arc = Arc::new(refresher_configuration.build()?); - refreshable_configuration.clone().spawn_refresh_task(); + let refresher_configuration = RefresherConfiguration::from_configuration(configuration)?; + let refreshable_configuration: Arc = refresher_configuration.build()?; let mut dd_metrics_config = DatadogMetricsConfiguration::from_configuration(configuration) .error_context("Failed to configure Datadog Metrics destination.")?; diff --git a/lib/saluki-components/Cargo.toml b/lib/saluki-components/Cargo.toml index 92772997..42f1d87a 100644 --- a/lib/saluki-components/Cargo.toml +++ b/lib/saluki-components/Cargo.toml @@ -7,7 +7,6 @@ repository = { workspace = true } [dependencies] ahash = { workspace = true } -arc-swap = { workspace = true } async-compression = { workspace = true, features = ["tokio", "zlib"] } async-trait = { workspace = true } bitmask-enum = { workspace = true } diff --git a/lib/saluki-components/src/destinations/datadog_metrics/mod.rs b/lib/saluki-components/src/destinations/datadog_metrics/mod.rs index d8ae11c9..022c483c 100644 --- a/lib/saluki-components/src/destinations/datadog_metrics/mod.rs +++ b/lib/saluki-components/src/destinations/datadog_metrics/mod.rs @@ -1,3 +1,6 @@ +use std::sync::Arc; +use std::time::Duration; + use async_trait::async_trait; use http::{HeaderValue, Method, Request, Uri}; use http_body_util::BodyExt; @@ -21,8 +24,6 @@ use saluki_io::{ }; use serde::Deserialize; use serde_with::{serde_as, DisplayFromStr, PickFirst}; -use std::sync::Arc; -use std::time::Duration; use tokio::{ select, sync::{mpsc, oneshot}, @@ -227,7 +228,7 @@ impl DatadogMetricsConfiguration { calculate_api_endpoint(self.dd_url.as_deref(), &self.site) } - /// Add option to retrieve config values from a `RefreshableConfiguration` + /// Add option to retrieve configuration values from a `RefreshableConfiguration`. pub fn add_refreshable_configuration(&mut self, refresher: Arc) { self.config_refresher = refresher; } diff --git a/lib/saluki-config/Cargo.toml b/lib/saluki-config/Cargo.toml index 47d49b06..f74fbfb0 100644 --- a/lib/saluki-config/Cargo.toml +++ b/lib/saluki-config/Cargo.toml @@ -9,9 +9,9 @@ repository = { workspace = true } arc-swap = { workspace = true } figment = { workspace = true, features = ["env", "json", "yaml"] } http = { workspace = true } -reqwest = { workspace = true , features = ["default-tls", "json"]} -saluki-io = { workspace = true } +reqwest = { workspace = true, features = ["default-tls", "json"] } saluki-error = { workspace = true } +saluki-io = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } snafu = { workspace = true } diff --git a/lib/saluki-config/src/refresher.rs b/lib/saluki-config/src/refresher.rs index a40df120..9aa6c104 100644 --- a/lib/saluki-config/src/refresher.rs +++ b/lib/saluki-config/src/refresher.rs @@ -1,26 +1,36 @@ use std::sync::Arc; -use crate::{ConfigurationError, GenericConfiguration}; use arc_swap::ArcSwap; -use reqwest; +use reqwest::ClientBuilder; use saluki_error::GenericError; use serde::{de::DeserializeOwned, Deserialize}; use serde_json::Value; use tokio::time::{sleep, Duration}; +use tracing::{debug, error}; + +use crate::{ConfigurationError, GenericConfiguration}; const DATADOG_AGENT_CONFIG_ENDPOINT: &str = "https://localhost:5004/config/v1/"; +const DEFAULT_REFRESH_INTERVAL_SECONDS: u64 = 15; /// Configuration for setting up `RefreshableConfiguration`. #[derive(Default, Deserialize)] pub struct RefresherConfiguration { auth_token_file_path: String, + #[serde(default = "default_refresh_interval_seconds")] + refresh_interval_seconds: u64, } -/// The most recent configuration retrieved from the datadog-agent. +fn default_refresh_interval_seconds() -> u64 { + DEFAULT_REFRESH_INTERVAL_SECONDS +} + +/// A configuration whose values are refreshed from a remote source at runtime. #[derive(Default)] pub struct RefreshableConfiguration { token: String, values: ArcSwap, + refresh_interval_seconds: u64, } impl RefresherConfiguration { @@ -28,29 +38,50 @@ impl RefresherConfiguration { pub fn from_configuration(config: &GenericConfiguration) -> Result { Ok(config.as_typed()?) } - /// Create `RefreshableConfiguration` from `RefresherConfiguration` - pub fn build(&self) -> Result { + /// Create `RefreshableConfiguration` from `RefresherConfiguration`. + pub fn build(&self) -> Result, GenericError> { let raw_bearer_token = std::fs::read_to_string(&self.auth_token_file_path)?; - Ok(RefreshableConfiguration { + let refreshable_configuration = Arc::new(RefreshableConfiguration { token: raw_bearer_token, values: ArcSwap::from_pointee(serde_json::Value::Null), - }) + refresh_interval_seconds: self.refresh_interval_seconds, + }); + refreshable_configuration.clone().spawn_refresh_task(); + Ok(refreshable_configuration) } } impl RefreshableConfiguration { /// Start a task that queries the datadog-agent config endpoint every 15 seconds. - pub fn spawn_refresh_task(self: Arc) { + fn spawn_refresh_task(self: Arc) { tokio::spawn(async move { + let client = ClientBuilder::new() + .danger_accept_invalid_certs(true) // Allow invalid certificates + .build() + .expect("failed to create http client"); loop { - match self.query_agent().await { - Ok(_) => { - // todo()! + let response = client + .get(DATADOG_AGENT_CONFIG_ENDPOINT) + .header("Content-Type", "application/json") + .header("Authorization", format!("Bearer {}", self.token)) + .header("DD-Agent-Version", "0.1.0") + .header("User-Agent", "agent-data-plane/0.1.0") + .send() + .await; + match response { + Ok(response) => { + let config_response: Value = response + .json() + .await + .expect("failed to deserialize configuration into json"); + self.values.store(Arc::new(config_response)); + debug!("Retrieved configuration from datadog-agent."); } - Err(_e) => { - // todo()! + Err(e) => { + error!("Error retrieving configuration from datadog-agent: {}", e); } } - sleep(Duration::from_secs(15)).await; // Wait for 15 seconds + + sleep(Duration::from_secs(self.refresh_interval_seconds)).await; } }); } @@ -82,7 +113,7 @@ impl RefreshableConfiguration { /// /// If the key does not exist in the configuration, or if the value could not be deserialized into `T`, an error /// variant will be returned. - pub fn get_typed<'a, T>(&self, key: &str) -> Result + pub fn get_typed(&self, key: &str) -> Result where T: DeserializeOwned, { @@ -95,12 +126,12 @@ impl RefreshableConfiguration { "RefreshableConfiguration could not convert key {} value into the proper type.", key ), - field: format!("{}", key).into(), + field: key.to_string().into(), }) } None => Err(ConfigurationError::MissingField { help_text: format!("RefreshableConfiguration missing key {}", key), - field: format!("{}", key).into(), + field: key.to_string().into(), }), } } From 01a7b0fa6ad03ce64b4175a95c1949d5f14d9d69 Mon Sep 17 00:00:00 2001 From: raymond zhao Date: Tue, 26 Nov 2024 16:32:34 -0500 Subject: [PATCH 07/18] remove query_agent --- lib/saluki-config/src/refresher.rs | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/lib/saluki-config/src/refresher.rs b/lib/saluki-config/src/refresher.rs index 9aa6c104..bd80ffb9 100644 --- a/lib/saluki-config/src/refresher.rs +++ b/lib/saluki-config/src/refresher.rs @@ -85,26 +85,6 @@ impl RefreshableConfiguration { } }); } - /// Query the datadog-agent config endpoint for the latest config - pub async fn query_agent(&self) -> Result<(), GenericError> { - let client = reqwest::ClientBuilder::new() - .danger_accept_invalid_certs(true) // Allow invalid certificates - .build()?; - - let response = client - .get(DATADOG_AGENT_CONFIG_ENDPOINT) - .header("Content-Type", "application/json") - .header("Authorization", format!("Bearer {}", self.token)) - .header("DD-Agent-Version", "0.1.0") - .header("User-Agent", "agent-data-plane/0.1.0") - .send() - .await?; - - let config_response: Value = response.json().await?; - self.values.store(Arc::new(config_response)); - - Ok(()) - } /// Gets a configuration value by key. /// From 9cee99949f406df434479f73fe6f25b2e9b59095 Mon Sep 17 00:00:00 2001 From: raymond zhao Date: Wed, 27 Nov 2024 10:12:36 -0500 Subject: [PATCH 08/18] wrap Arc around ArcSwap value --- lib/saluki-config/src/refresher.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/saluki-config/src/refresher.rs b/lib/saluki-config/src/refresher.rs index bd80ffb9..454c9a79 100644 --- a/lib/saluki-config/src/refresher.rs +++ b/lib/saluki-config/src/refresher.rs @@ -29,7 +29,7 @@ fn default_refresh_interval_seconds() -> u64 { #[derive(Default)] pub struct RefreshableConfiguration { token: String, - values: ArcSwap, + values: Arc>, refresh_interval_seconds: u64, } @@ -43,7 +43,7 @@ impl RefresherConfiguration { let raw_bearer_token = std::fs::read_to_string(&self.auth_token_file_path)?; let refreshable_configuration = Arc::new(RefreshableConfiguration { token: raw_bearer_token, - values: ArcSwap::from_pointee(serde_json::Value::Null), + values: Arc::new(ArcSwap::from_pointee(serde_json::Value::Null)), refresh_interval_seconds: self.refresh_interval_seconds, }); refreshable_configuration.clone().spawn_refresh_task(); From 3cdaf4dd655e3b4009cfb94199e6f79fc9d804c6 Mon Sep 17 00:00:00 2001 From: raymond zhao Date: Wed, 27 Nov 2024 10:21:01 -0500 Subject: [PATCH 09/18] update doc --- lib/saluki-config/src/refresher.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/lib/saluki-config/src/refresher.rs b/lib/saluki-config/src/refresher.rs index 454c9a79..2f44de78 100644 --- a/lib/saluki-config/src/refresher.rs +++ b/lib/saluki-config/src/refresher.rs @@ -11,16 +11,29 @@ use tracing::{debug, error}; use crate::{ConfigurationError, GenericConfiguration}; const DATADOG_AGENT_CONFIG_ENDPOINT: &str = "https://localhost:5004/config/v1/"; +const DEFAULT_AUTH_TOKEN_FILE_PATH: &str = "/etc/datadog-agent/auth_token"; const DEFAULT_REFRESH_INTERVAL_SECONDS: u64 = 15; /// Configuration for setting up `RefreshableConfiguration`. #[derive(Default, Deserialize)] pub struct RefresherConfiguration { + /// The location of the auth token used by the datadog agent. + /// + /// Defaults to `/etc/datadog-agent/auth_token`.` + #[serde(default = "default_auth_token_file_path")] auth_token_file_path: String, + + /// The amount of time betweeen each request in seconds. + /// + /// Defaults to 15 seconds. #[serde(default = "default_refresh_interval_seconds")] refresh_interval_seconds: u64, } +fn default_auth_token_file_path() -> String { + DEFAULT_AUTH_TOKEN_FILE_PATH.to_owned() +} + fn default_refresh_interval_seconds() -> u64 { DEFAULT_REFRESH_INTERVAL_SECONDS } From 62558cb721c3d30473394f0c5a65703ba2b2fb6d Mon Sep 17 00:00:00 2001 From: raymond zhao Date: Wed, 27 Nov 2024 10:28:15 -0500 Subject: [PATCH 10/18] add derive clone --- lib/saluki-config/src/refresher.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/saluki-config/src/refresher.rs b/lib/saluki-config/src/refresher.rs index 2f44de78..94133e48 100644 --- a/lib/saluki-config/src/refresher.rs +++ b/lib/saluki-config/src/refresher.rs @@ -39,7 +39,7 @@ fn default_refresh_interval_seconds() -> u64 { } /// A configuration whose values are refreshed from a remote source at runtime. -#[derive(Default)] +#[derive(Clone, Default)] pub struct RefreshableConfiguration { token: String, values: Arc>, From 9f21c09d2f92b2f68f58f711fe3244e2b35f8392 Mon Sep 17 00:00:00 2001 From: raymond zhao Date: Tue, 3 Dec 2024 10:51:24 -0500 Subject: [PATCH 11/18] merge main --- .../src/destinations/datadog_metrics/mod.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/lib/saluki-components/src/destinations/datadog_metrics/mod.rs b/lib/saluki-components/src/destinations/datadog_metrics/mod.rs index 955545ee..4c659f0b 100644 --- a/lib/saluki-components/src/destinations/datadog_metrics/mod.rs +++ b/lib/saluki-components/src/destinations/datadog_metrics/mod.rs @@ -140,11 +140,10 @@ pub struct DatadogMetricsConfiguration { /// /// Defaults to empty. #[serde(default, rename = "additional_endpoints")] - endpoints: AdditionalEndpoints, + additional_endpoints: AdditionalEndpoints, #[serde(skip)] config_refresher: Arc, - additional_endpoints: AdditionalEndpoints, } fn default_request_timeout_secs() -> u64 { @@ -578,11 +577,12 @@ fn for_resolved_endpoint( .expect("should not fail to construct new endpoint authority"); let new_uri_scheme = Scheme::try_from(endpoint.endpoint().scheme()).expect("should not fail to construct new endpoint scheme"); - let api_key_value = - HeaderValue::from_str(endpoint.api_key()).expect("should not fail to construct API key header value"); - let api_key = refresher - .get_typed::("api_key") - .expect("should not fail to retrieve API key from refreshable configuration"); + let mut api_key: String; + if let Ok(refresher_api_key) = refresher.get_typed::("api_key") { + api_key = refresher_api_key.clone(); + } else { + api_key = endpoint.api_key().to_string(); + } let api_key_value = HeaderValue::from_str(&api_key).expect("should not fail to construct API key header value"); move |mut request| { // Build an updated URI by taking the endpoint URL and slapping the request's URI path on the end of it. From 8fcaa380e39d19faf7d6ce0c59c466dbf78d821a Mon Sep 17 00:00:00 2001 From: raymond zhao Date: Tue, 3 Dec 2024 13:46:37 -0500 Subject: [PATCH 12/18] feedback --- .../src/destinations/datadog_metrics/mod.rs | 3 +- .../datadog_metrics/request_builder.rs | 1 - lib/saluki-config/src/refresher.rs | 74 ++++++++++++++----- 3 files changed, 58 insertions(+), 20 deletions(-) diff --git a/lib/saluki-components/src/destinations/datadog_metrics/mod.rs b/lib/saluki-components/src/destinations/datadog_metrics/mod.rs index 4c659f0b..36abc3e4 100644 --- a/lib/saluki-components/src/destinations/datadog_metrics/mod.rs +++ b/lib/saluki-components/src/destinations/datadog_metrics/mod.rs @@ -569,7 +569,6 @@ async fn run_endpoint_io_loop( task_barrier.wait().await; } -#[allow(unused)] fn for_resolved_endpoint( endpoint: ResolvedEndpoint, refresher: Arc, ) -> impl Fn(Request) -> Request + Clone { @@ -577,7 +576,7 @@ fn for_resolved_endpoint( .expect("should not fail to construct new endpoint authority"); let new_uri_scheme = Scheme::try_from(endpoint.endpoint().scheme()).expect("should not fail to construct new endpoint scheme"); - let mut api_key: String; + let api_key: String; if let Ok(refresher_api_key) = refresher.get_typed::("api_key") { api_key = refresher_api_key.clone(); } else { diff --git a/lib/saluki-components/src/destinations/datadog_metrics/request_builder.rs b/lib/saluki-components/src/destinations/datadog_metrics/request_builder.rs index c81ea195..a630d8bb 100644 --- a/lib/saluki-components/src/destinations/datadog_metrics/request_builder.rs +++ b/lib/saluki-components/src/destinations/datadog_metrics/request_builder.rs @@ -105,7 +105,6 @@ impl MetricsEndpoint { } } -#[allow(unused)] pub struct RequestBuilder where O: ObjectPool + 'static, diff --git a/lib/saluki-config/src/refresher.rs b/lib/saluki-config/src/refresher.rs index 94133e48..71cba777 100644 --- a/lib/saluki-config/src/refresher.rs +++ b/lib/saluki-config/src/refresher.rs @@ -11,15 +11,16 @@ use tracing::{debug, error}; use crate::{ConfigurationError, GenericConfiguration}; const DATADOG_AGENT_CONFIG_ENDPOINT: &str = "https://localhost:5004/config/v1/"; +const DEFAULT_AGENT_IPC_HOST: &str = "localhost"; const DEFAULT_AUTH_TOKEN_FILE_PATH: &str = "/etc/datadog-agent/auth_token"; const DEFAULT_REFRESH_INTERVAL_SECONDS: u64 = 15; /// Configuration for setting up `RefreshableConfiguration`. #[derive(Default, Deserialize)] pub struct RefresherConfiguration { - /// The location of the auth token used by the datadog agent. + /// The location of the auth token used by the Datadog Agent. /// - /// Defaults to `/etc/datadog-agent/auth_token`.` + /// Defaults to `/etc/datadog-agent/auth_token`. #[serde(default = "default_auth_token_file_path")] auth_token_file_path: String, @@ -28,6 +29,18 @@ pub struct RefresherConfiguration { /// Defaults to 15 seconds. #[serde(default = "default_refresh_interval_seconds")] refresh_interval_seconds: u64, + + /// The IPC host used by the Datadog Agent. + /// + /// Defaults to `localhost`. + #[serde(default = "default_agent_ipc_host")] + agent_ipc_host: String, + + /// The IPC port used by the Datadog Agent. + /// + /// Defaults to `0`. + #[serde(default = "default_agent_ipc_port")] + agent_ipc_port: u64, } fn default_auth_token_file_path() -> String { @@ -38,9 +51,18 @@ fn default_refresh_interval_seconds() -> u64 { DEFAULT_REFRESH_INTERVAL_SECONDS } +fn default_agent_ipc_host() -> String { + DEFAULT_AGENT_IPC_HOST.to_owned() +} + +fn default_agent_ipc_port() -> u64 { + 0 +} + /// A configuration whose values are refreshed from a remote source at runtime. #[derive(Clone, Default)] pub struct RefreshableConfiguration { + endpoint: String, token: String, values: Arc>, refresh_interval_seconds: u64, @@ -51,21 +73,24 @@ impl RefresherConfiguration { pub fn from_configuration(config: &GenericConfiguration) -> Result { Ok(config.as_typed()?) } + /// Create `RefreshableConfiguration` from `RefresherConfiguration`. - pub fn build(&self) -> Result, GenericError> { + pub fn build(&self) -> Result { let raw_bearer_token = std::fs::read_to_string(&self.auth_token_file_path)?; - let refreshable_configuration = Arc::new(RefreshableConfiguration { + let endpoint = format!("https://{}:{}/config/v1", self.agent_ipc_host, self.agent_ipc_port); + let refreshable_configuration = RefreshableConfiguration { + endpoint, token: raw_bearer_token, values: Arc::new(ArcSwap::from_pointee(serde_json::Value::Null)), refresh_interval_seconds: self.refresh_interval_seconds, - }); - refreshable_configuration.clone().spawn_refresh_task(); + }; + refreshable_configuration.spawn_refresh_task(); Ok(refreshable_configuration) } } impl RefreshableConfiguration { /// Start a task that queries the datadog-agent config endpoint every 15 seconds. - fn spawn_refresh_task(self: Arc) { + fn spawn_refresh_task(self) { tokio::spawn(async move { let client = ClientBuilder::new() .danger_accept_invalid_certs(true) // Allow invalid certificates @@ -73,7 +98,7 @@ impl RefreshableConfiguration { .expect("failed to create http client"); loop { let response = client - .get(DATADOG_AGENT_CONFIG_ENDPOINT) + .get(self.endpoint.clone()) .header("Content-Type", "application/json") .header("Authorization", format!("Bearer {}", self.token)) .header("DD-Agent-Version", "0.1.0") @@ -87,10 +112,16 @@ impl RefreshableConfiguration { .await .expect("failed to deserialize configuration into json"); self.values.store(Arc::new(config_response)); - debug!("Retrieved configuration from datadog-agent."); + debug!( + remote_endpoint = self.endpoint, + "Retrieved configuration from remote source." + ); } Err(e) => { - error!("Error retrieving configuration from datadog-agent: {}", e); + error!( + remote_endpoint = self.endpoint, + "Failed to retrieve configuration from remote source: {}", e + ); } } @@ -114,18 +145,27 @@ impl RefreshableConfiguration { match values.get(key) { Some(value) => { // Attempt to deserialize the value to type T - serde_json::from_value(value.clone()).map_err(|_| ConfigurationError::MissingField { - help_text: format!( - "RefreshableConfiguration could not convert key {} value into the proper type.", - key - ), - field: key.to_string().into(), + serde_json::from_value(value.clone()).map_err(|_| ConfigurationError::InvalidFieldType { + field: key.to_string(), + expected_ty: std::any::type_name::().to_string(), + actual_ty: serde_json_value_type_name(value).to_string(), }) } None => Err(ConfigurationError::MissingField { - help_text: format!("RefreshableConfiguration missing key {}", key), + help_text: format!("Try validating remote source provides this field."), field: key.to_string().into(), }), } } } + +fn serde_json_value_type_name(value: &Value) -> &'static str { + match value { + Value::Null => "null", + Value::Bool(_) => "bool", + Value::Number(_) => "number", + Value::String(_) => "string", + Value::Array(_) => "array", + Value::Object(_) => "object", + } +} From 3bf6a42123c8c4e2bdd85b29f3b0ab005a2ad660 Mon Sep 17 00:00:00 2001 From: raymond zhao Date: Tue, 3 Dec 2024 15:52:24 -0500 Subject: [PATCH 13/18] Use Optional refresher config on primary endpoint --- bin/agent-data-plane/src/main.rs | 3 +- .../datadog_metrics/endpoint/endpoints.rs | 33 ++++++++++++++++-- .../src/destinations/datadog_metrics/mod.rs | 31 ++++++----------- lib/saluki-config/src/refresher.rs | 34 ++++++++++++++++--- 4 files changed, 72 insertions(+), 29 deletions(-) diff --git a/bin/agent-data-plane/src/main.rs b/bin/agent-data-plane/src/main.rs index 6a532d33..08775b5a 100644 --- a/bin/agent-data-plane/src/main.rs +++ b/bin/agent-data-plane/src/main.rs @@ -7,7 +7,6 @@ #![deny(missing_docs)] use std::{ future::pending, - sync::Arc, time::{Duration, Instant}, }; @@ -179,7 +178,7 @@ fn create_topology( let internal_metrics_remap_config = AgentTelemetryRemapperConfiguration::new(); let refresher_configuration = RefresherConfiguration::from_configuration(configuration)?; - let refreshable_configuration: Arc = refresher_configuration.build()?; + let refreshable_configuration: RefreshableConfiguration = refresher_configuration.build()?; let mut dd_metrics_config = DatadogMetricsConfiguration::from_configuration(configuration) .error_context("Failed to configure Datadog Metrics destination.")?; diff --git a/lib/saluki-components/src/destinations/datadog_metrics/endpoint/endpoints.rs b/lib/saluki-components/src/destinations/datadog_metrics/endpoint/endpoints.rs index ea431e5b..6632ffea 100644 --- a/lib/saluki-components/src/destinations/datadog_metrics/endpoint/endpoints.rs +++ b/lib/saluki-components/src/destinations/datadog_metrics/endpoint/endpoints.rs @@ -5,11 +5,12 @@ use std::{ }; use regex::Regex; +use saluki_config::RefreshableConfiguration; use saluki_metadata; use serde::Deserialize; use serde_with::{serde_as, DisplayFromStr, OneOrMany, PickFirst}; use snafu::{ResultExt, Snafu}; -use tracing::debug; +use tracing::{debug, error}; use url::Url; use crate::destinations::datadog_metrics::DEFAULT_SITE; @@ -82,6 +83,7 @@ impl AdditionalEndpoints { resolved.push(ResolvedEndpoint { endpoint: endpoint.clone(), api_key: trimmed_api_key.to_string(), + config: None, }); } } @@ -98,6 +100,7 @@ impl AdditionalEndpoints { pub struct ResolvedEndpoint { endpoint: Url, api_key: String, + config: Option, } impl ResolvedEndpoint { @@ -113,16 +116,42 @@ impl ResolvedEndpoint { Ok(Self { endpoint, api_key: api_key.to_string(), + config: None, }) } + /// Creates a new `ResolvedEndpoint` instance from an existing `ResolvedEndpoint`, adding an optional `RefreshableConfiguration`. + pub fn with_refreshable_configuration(self, config: Option) -> Self { + Self { + endpoint: self.endpoint, + api_key: self.api_key, + config, + } + } + /// Returns the endpoint of the resolver. pub fn endpoint(&self) -> &Url { &self.endpoint } /// Returns the API key associated with the endpoint. - pub fn api_key(&self) -> &str { + pub fn api_key(&mut self) -> &str { + if let Some(config) = &self.config { + match config.try_get_typed::("api_key") { + Ok(Some(api_key)) => { + if !api_key.is_empty() && self.api_key != api_key { + debug!("Refreshing api key."); + self.api_key = api_key; + } + } + Ok(None) => { + debug!("Failed to retrieve api key from remote source. Falling back to last known api key."); + } + Err(_) => { + error!("Failed to retrieve api key from remote source. Falling back to last known api key."); + } + } + } self.api_key.as_str() } } diff --git a/lib/saluki-components/src/destinations/datadog_metrics/mod.rs b/lib/saluki-components/src/destinations/datadog_metrics/mod.rs index 36abc3e4..bd863083 100644 --- a/lib/saluki-components/src/destinations/datadog_metrics/mod.rs +++ b/lib/saluki-components/src/destinations/datadog_metrics/mod.rs @@ -143,7 +143,7 @@ pub struct DatadogMetricsConfiguration { additional_endpoints: AdditionalEndpoints, #[serde(skip)] - config_refresher: Arc, + config_refresher: Option, } fn default_request_timeout_secs() -> u64 { @@ -177,8 +177,8 @@ impl DatadogMetricsConfiguration { } /// Add option to retrieve configuration values from a `RefreshableConfiguration`. - pub fn add_refreshable_configuration(&mut self, refresher: Arc) { - self.config_refresher = refresher; + pub fn add_refreshable_configuration(&mut self, refresher: RefreshableConfiguration) { + self.config_refresher = Some(refresher); } } @@ -204,7 +204,8 @@ impl DestinationBuilder for DatadogMetricsConfiguration { // Resolve all endpoints that we'll be forwarding metrics to. let primary_endpoint = calculate_resolved_endpoint(self.dd_url.as_deref(), &self.site, &self.api_key) - .error_context("Failed parsing/resolving the primary destination endpoint.")?; + .error_context("Failed parsing/resolving the primary destination endpoint.")? + .with_refreshable_configuration(self.config_refresher.clone()); let additional_endpoints = self .additional_endpoints @@ -270,7 +271,7 @@ where series_request_builder: RequestBuilder, sketches_request_builder: RequestBuilder, endpoints: Vec, - refresher: Arc, + refresher: Option, } #[allow(unused)] @@ -304,7 +305,6 @@ where telemetry.clone(), context.component_context(), endpoints, - refresher, )); health.mark_ready(); @@ -444,7 +444,6 @@ where async fn run_io_loop( mut requests_rx: mpsc::Receiver<(usize, Request)>, io_shutdown_tx: oneshot::Sender<()>, service: S, telemetry: ComponentTelemetry, component_context: ComponentContext, resolved_endpoints: Vec, - refresher: Arc, ) where S: Service, Response = hyper::Response> + Clone + Send + 'static, S::Future: Send, @@ -470,7 +469,6 @@ async fn run_io_loop( telemetry.clone(), component_context.clone(), resolved_endpoint, - refresher.clone(), )); endpoint_txs.push((endpoint_url, endpoint_tx)); @@ -505,7 +503,6 @@ async fn run_io_loop( async fn run_endpoint_io_loop( mut requests_rx: mpsc::Receiver<(usize, Request)>, task_barrier: Arc, service: S, telemetry: ComponentTelemetry, context: ComponentContext, endpoint: ResolvedEndpoint, - refresher: Arc, ) where S: Service, Response = hyper::Response> + Send + 'static, S::Future: Send, @@ -521,7 +518,7 @@ async fn run_endpoint_io_loop( // of the URI, adding the API key as a header, and so on. let mut service = ServiceBuilder::new() // Set the request's URI to the endpoint's URI, and add the API key as a header. - .map_request(for_resolved_endpoint::(endpoint, refresher.clone())) + .map_request(for_resolved_endpoint::(endpoint)) // Set the User-Agent and DD-Agent-Version headers indicating the version of the data plane sending the request. .map_request(with_version_info::()) // Add telemetry about the requests. @@ -569,20 +566,11 @@ async fn run_endpoint_io_loop( task_barrier.wait().await; } -fn for_resolved_endpoint( - endpoint: ResolvedEndpoint, refresher: Arc, -) -> impl Fn(Request) -> Request + Clone { +fn for_resolved_endpoint(mut endpoint: ResolvedEndpoint) -> impl FnMut(Request) -> Request + Clone { let new_uri_authority = Authority::try_from(endpoint.endpoint().authority()) .expect("should not fail to construct new endpoint authority"); let new_uri_scheme = Scheme::try_from(endpoint.endpoint().scheme()).expect("should not fail to construct new endpoint scheme"); - let api_key: String; - if let Ok(refresher_api_key) = refresher.get_typed::("api_key") { - api_key = refresher_api_key.clone(); - } else { - api_key = endpoint.api_key().to_string(); - } - let api_key_value = HeaderValue::from_str(&api_key).expect("should not fail to construct API key header value"); move |mut request| { // Build an updated URI by taking the endpoint URL and slapping the request's URI path on the end of it. let new_uri = Uri::builder() @@ -591,7 +579,8 @@ fn for_resolved_endpoint( .path_and_query(request.uri().path_and_query().expect("request path must exist").clone()) .build() .expect("should not fail to construct new URI"); - + let api_key = endpoint.api_key(); + let api_key_value = HeaderValue::from_str(api_key).expect("should not fail to construct API key header value"); *request.uri_mut() = new_uri; // Add the API key as a header. diff --git a/lib/saluki-config/src/refresher.rs b/lib/saluki-config/src/refresher.rs index 71cba777..0fd61bdd 100644 --- a/lib/saluki-config/src/refresher.rs +++ b/lib/saluki-config/src/refresher.rs @@ -10,7 +10,6 @@ use tracing::{debug, error}; use crate::{ConfigurationError, GenericConfiguration}; -const DATADOG_AGENT_CONFIG_ENDPOINT: &str = "https://localhost:5004/config/v1/"; const DEFAULT_AGENT_IPC_HOST: &str = "localhost"; const DEFAULT_AUTH_TOKEN_FILE_PATH: &str = "/etc/datadog-agent/auth_token"; const DEFAULT_REFRESH_INTERVAL_SECONDS: u64 = 15; @@ -60,7 +59,7 @@ fn default_agent_ipc_port() -> u64 { } /// A configuration whose values are refreshed from a remote source at runtime. -#[derive(Clone, Default)] +#[derive(Clone, Debug, Default)] pub struct RefreshableConfiguration { endpoint: String, token: String, @@ -84,7 +83,7 @@ impl RefresherConfiguration { values: Arc::new(ArcSwap::from_pointee(serde_json::Value::Null)), refresh_interval_seconds: self.refresh_interval_seconds, }; - refreshable_configuration.spawn_refresh_task(); + refreshable_configuration.clone().spawn_refresh_task(); Ok(refreshable_configuration) } } @@ -152,11 +151,38 @@ impl RefreshableConfiguration { }) } None => Err(ConfigurationError::MissingField { - help_text: format!("Try validating remote source provides this field."), + help_text: "Try validating remote source provides this field.".to_string(), field: key.to_string().into(), }), } } + + /// Gets a configuration value by key, if it exists. + /// + /// If the key exists in the configuration, and can be deserialized, `Ok(Some(value))` is returned. Otherwise, + /// `Ok(None)` will be returned. + /// + /// ## Errors + /// + /// If the value could not be deserialized into `T`, an error will be returned. + pub fn try_get_typed(&self, key: &str) -> Result, ConfigurationError> + where + T: DeserializeOwned, + { + let values = self.values.load(); + match values.get(key) { + Some(value) => { + serde_json::from_value(value.clone()) + .map(Some) + .map_err(|_| ConfigurationError::InvalidFieldType { + field: key.to_string(), + expected_ty: std::any::type_name::().to_string(), + actual_ty: serde_json_value_type_name(value).to_string(), + }) + } + None => Ok(None), + } + } } fn serde_json_value_type_name(value: &Value) -> &'static str { From e90bd3f459e85ce2a5f43065c93130d1f8366614 Mon Sep 17 00:00:00 2001 From: raymond zhao Date: Tue, 3 Dec 2024 16:28:38 -0500 Subject: [PATCH 14/18] attempt fix pipeline --- Cargo.lock | 196 ++++++++++++++++------------------- LICENSE-3rdparty.csv | 7 ++ lib/saluki-config/Cargo.toml | 2 +- 3 files changed, 96 insertions(+), 109 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3c6b6659..ce5fc8cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -429,7 +429,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "rustc-hash", + "rustc-hash 1.1.0", "shlex", "syn 2.0.87", "which", @@ -977,7 +977,7 @@ dependencies = [ "lazy_static", "mintex", "parking_lot", - "rustc-hash", + "rustc-hash 1.1.0", "serde", "serde_json", "thousands", @@ -1102,21 +1102,6 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" -[[package]] -name = "foreign-types" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" -dependencies = [ - "foreign-types-shared", -] - -[[package]] -name = "foreign-types-shared" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" - [[package]] name = "form_urlencoded" version = "1.2.1" @@ -1590,6 +1575,7 @@ dependencies = [ "tokio", "tokio-rustls", "tower-service", + "webpki-roots", ] [[package]] @@ -1617,22 +1603,6 @@ dependencies = [ "tower-service", ] -[[package]] -name = "hyper-tls" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" -dependencies = [ - "bytes", - "http-body-util", - "hyper 1.5.0", - "hyper-util", - "native-tls", - "tokio", - "tokio-native-tls", - "tower-service", -] - [[package]] name = "hyper-util" version = "0.1.10" @@ -2032,7 +2002,7 @@ dependencies = [ "prost 0.11.9", "rand", "rmp-serde", - "rustc-hash", + "rustc-hash 1.1.0", "serde", "serde_json", "serde_tuple", @@ -2282,23 +2252,6 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" -[[package]] -name = "native-tls" -version = "0.2.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8614eb2c83d59d1c8cc974dd3f920198647674a0a035e1af1fa58707e317466" -dependencies = [ - "libc", - "log", - "openssl", - "openssl-probe", - "openssl-sys", - "schannel", - "security-framework", - "security-framework-sys", - "tempfile", -] - [[package]] name = "ndarray" version = "0.16.1" @@ -2432,50 +2385,12 @@ version = "11.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b410bbe7e14ab526a0e86877eb47c6996a2bd7746f027ba551028c925390e4e9" -[[package]] -name = "openssl" -version = "0.10.66" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9529f4786b70a3e8c61e11179af17ab6188ad8d0ded78c5529441ed39d4bd9c1" -dependencies = [ - "bitflags 2.6.0", - "cfg-if", - "foreign-types", - "libc", - "once_cell", - "openssl-macros", - "openssl-sys", -] - -[[package]] -name = "openssl-macros" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.87", -] - [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" -[[package]] -name = "openssl-sys" -version = "0.9.103" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f9e8deee91df40a943c71b917e5874b951d32a802526c85721ce3b776c929d6" -dependencies = [ - "cc", - "libc", - "pkg-config", - "vcpkg", -] - [[package]] name = "opentelemetry" version = "0.18.0" @@ -3079,6 +2994,54 @@ dependencies = [ "parking_lot", ] +[[package]] +name = "quinn" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b22d8e7369034b9a7132bc2008cac12f2013c8132b45e0554e6e20e2617f2156" +dependencies = [ + "bytes", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash 2.1.0", + "rustls", + "socket2", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "quinn-proto" +version = "0.11.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba92fb39ec7ad06ca2582c0ca834dfeadcaf06ddfc8e635c80aa7e1c05315fdd" +dependencies = [ + "bytes", + "rand", + "ring", + "rustc-hash 2.1.0", + "rustls", + "slab", + "thiserror", + "tinyvec", + "tracing", +] + +[[package]] +name = "quinn-udp" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bffec3605b73c6f1754535084a85229fa8a30f86014e6c81aeec4abb68b0285" +dependencies = [ + "libc", + "once_cell", + "socket2", + "tracing", + "windows-sys 0.52.0", +] + [[package]] name = "quote" version = "1.0.37" @@ -3240,29 +3203,32 @@ dependencies = [ "http-body 1.0.1", "http-body-util", "hyper 1.5.0", - "hyper-tls", + "hyper-rustls", "hyper-util", "ipnet", "js-sys", "log", "mime", - "native-tls", "once_cell", "percent-encoding", "pin-project-lite", + "quinn", + "rustls", "rustls-pemfile", + "rustls-pki-types", "serde", "serde_json", "serde_urlencoded", "sync_wrapper 1.0.1", "tokio", - "tokio-native-tls", + "tokio-rustls", "tokio-util", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", "web-sys", + "webpki-roots", "windows-registry", ] @@ -3315,6 +3281,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +[[package]] +name = "rustc-hash" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7fb8039b3032c191086b10f11f319a6e99e1e82889c5cc6046f515c9db1d497" + [[package]] name = "rustix" version = "0.38.39" @@ -4294,6 +4266,21 @@ dependencies = [ "serde_json", ] +[[package]] +name = "tinyvec" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "445e881f4f6d382d5f27c034e25eb92edd7c784ceab92a0937db7f2e9471b938" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tokio" version = "1.41.1" @@ -4332,16 +4319,6 @@ dependencies = [ "syn 2.0.87", ] -[[package]] -name = "tokio-native-tls" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" -dependencies = [ - "native-tls", - "tokio", -] - [[package]] name = "tokio-rustls" version = "0.26.0" @@ -4804,12 +4781,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" -[[package]] -name = "vcpkg" -version = "0.2.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" - [[package]] name = "version_check" version = "0.9.5" @@ -4918,6 +4889,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-roots" +version = "0.26.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd7c23921eeb1713a4e851530e9b9756e4fb0e89978582942612524cf09f01cd" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "which" version = "4.4.2" diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index 93c6b6d4..daa510bb 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -211,6 +211,9 @@ protobuf,https://github.com/stepancheg/rust-protobuf,MIT,Stepan Koltsov quanta,https://github.com/metrics-rs/quanta,MIT,Toby Lawrence quick_cache,https://github.com/arthurprs/quick-cache,MIT,Arthur Silva +quinn,https://github.com/quinn-rs/quinn,MIT OR Apache-2.0,The quinn Authors +quinn-proto,https://github.com/quinn-rs/quinn,MIT OR Apache-2.0,The quinn-proto Authors +quinn-udp,https://github.com/quinn-rs/quinn,MIT OR Apache-2.0,The quinn-udp Authors quote,https://github.com/dtolnay/quote,MIT OR Apache-2.0,David Tolnay rand,https://github.com/rust-random/rand,MIT OR Apache-2.0,"The Rand Project Developers, The Rust Project Developers" rand_chacha,https://github.com/rust-random/rand,MIT OR Apache-2.0,"The Rand Project Developers, The Rust Project Developers, The CryptoCorrosion Contributors" @@ -230,6 +233,7 @@ rmp,https://github.com/3Hren/msgpack-rust,MIT,Evgeny Safronov rustc-demangle,https://github.com/rust-lang/rustc-demangle,MIT OR Apache-2.0,Alex Crichton rustc-hash,https://github.com/rust-lang-nursery/rustc-hash,Apache-2.0 OR MIT,The Rust Project Developers +rustc-hash,https://github.com/rust-lang/rustc-hash,Apache-2.0 OR MIT,The Rust Project Developers rustix,https://github.com/bytecodealliance/rustix,Apache-2.0 WITH LLVM-exception OR Apache-2.0 OR MIT,"Dan Gohman , Jakub Konka " rustls,https://github.com/rustls/rustls,Apache-2.0 OR ISC OR MIT,The rustls Authors rustls-native-certs,https://github.com/rustls/rustls-native-certs,Apache-2.0 OR ISC OR MIT,The rustls-native-certs Authors @@ -282,6 +286,8 @@ thread_local,https://github.com/Amanieu/thread_local-rs,MIT OR Apache-2.0,Amanie time,https://github.com/time-rs/time,MIT OR Apache-2.0,"Jacob Pratt , Time contributors" tinystr,https://github.com/unicode-org/icu4x,Unicode-3.0,The ICU4X Project Developers tinytemplate,https://github.com/bheisler/TinyTemplate,Apache-2.0 OR MIT,Brook Heisler +tinyvec,https://github.com/Lokathor/tinyvec,Zlib OR Apache-2.0 OR MIT,Lokathor +tinyvec_macros,https://github.com/Soveu/tinyvec_macros,MIT OR Apache-2.0 OR Zlib,Soveu tokio,https://github.com/tokio-rs/tokio,MIT,Tokio Contributors tokio-io-timeout,https://github.com/sfackler/tokio-io-timeout,MIT OR Apache-2.0,Steven Fackler tokio-rustls,https://github.com/rustls/tokio-rustls,MIT OR Apache-2.0,The tokio-rustls Authors @@ -322,6 +328,7 @@ wasm-bindgen-macro,https://github.com/rustwasm/wasm-bindgen/tree/master/crates/m wasm-bindgen-macro-support,https://github.com/rustwasm/wasm-bindgen/tree/master/crates/macro-support,MIT OR Apache-2.0,The wasm-bindgen Developers wasm-bindgen-shared,https://github.com/rustwasm/wasm-bindgen/tree/master/crates/shared,MIT OR Apache-2.0,The wasm-bindgen Developers web-sys,https://github.com/rustwasm/wasm-bindgen/tree/master/crates/web-sys,MIT OR Apache-2.0,The wasm-bindgen Developers +webpki-roots,https://github.com/rustls/webpki-roots,MPL-2.0,The webpki-roots Authors which,https://github.com/harryfei/which-rs,MIT,Harry Fei winapi,https://github.com/retep998/winapi-rs,MIT OR Apache-2.0,Peter Atashian winapi-util,https://github.com/BurntSushi/winapi-util,Unlicense OR MIT,Andrew Gallant diff --git a/lib/saluki-config/Cargo.toml b/lib/saluki-config/Cargo.toml index 2b226afa..ee0736da 100644 --- a/lib/saluki-config/Cargo.toml +++ b/lib/saluki-config/Cargo.toml @@ -9,7 +9,7 @@ repository = { workspace = true } arc-swap = { workspace = true } figment = { workspace = true, features = ["env", "json", "yaml"] } http = { workspace = true } -reqwest = { workspace = true, features = ["default-tls", "json"] } +reqwest = { workspace = true, features = ["rustls-tls", "json"] } saluki-error = { workspace = true } saluki-io = { workspace = true } serde = { workspace = true } From 98aa1dcffd3bb01f5e9a76b834c0444baf669078 Mon Sep 17 00:00:00 2001 From: raymond zhao Date: Wed, 4 Dec 2024 11:50:49 -0500 Subject: [PATCH 15/18] nits --- Cargo.lock | 88 +------------------ LICENSE-3rdparty.csv | 7 -- bin/agent-data-plane/src/main.rs | 2 +- .../datadog_metrics/endpoint/endpoints.rs | 14 +-- .../src/destinations/datadog_metrics/mod.rs | 5 -- lib/saluki-config/Cargo.toml | 2 +- lib/saluki-config/src/refresher.rs | 13 ++- 7 files changed, 24 insertions(+), 107 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ce5fc8cc..c4eb4b40 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -429,7 +429,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "rustc-hash 1.1.0", + "rustc-hash", "shlex", "syn 2.0.87", "which", @@ -977,7 +977,7 @@ dependencies = [ "lazy_static", "mintex", "parking_lot", - "rustc-hash 1.1.0", + "rustc-hash", "serde", "serde_json", "thousands", @@ -1575,7 +1575,6 @@ dependencies = [ "tokio", "tokio-rustls", "tower-service", - "webpki-roots", ] [[package]] @@ -2002,7 +2001,7 @@ dependencies = [ "prost 0.11.9", "rand", "rmp-serde", - "rustc-hash 1.1.0", + "rustc-hash", "serde", "serde_json", "serde_tuple", @@ -2994,54 +2993,6 @@ dependencies = [ "parking_lot", ] -[[package]] -name = "quinn" -version = "0.11.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b22d8e7369034b9a7132bc2008cac12f2013c8132b45e0554e6e20e2617f2156" -dependencies = [ - "bytes", - "pin-project-lite", - "quinn-proto", - "quinn-udp", - "rustc-hash 2.1.0", - "rustls", - "socket2", - "thiserror", - "tokio", - "tracing", -] - -[[package]] -name = "quinn-proto" -version = "0.11.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba92fb39ec7ad06ca2582c0ca834dfeadcaf06ddfc8e635c80aa7e1c05315fdd" -dependencies = [ - "bytes", - "rand", - "ring", - "rustc-hash 2.1.0", - "rustls", - "slab", - "thiserror", - "tinyvec", - "tracing", -] - -[[package]] -name = "quinn-udp" -version = "0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8bffec3605b73c6f1754535084a85229fa8a30f86014e6c81aeec4abb68b0285" -dependencies = [ - "libc", - "once_cell", - "socket2", - "tracing", - "windows-sys 0.52.0", -] - [[package]] name = "quote" version = "1.0.37" @@ -3212,8 +3163,8 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "quinn", "rustls", + "rustls-native-certs 0.8.0", "rustls-pemfile", "rustls-pki-types", "serde", @@ -3228,7 +3179,6 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "web-sys", - "webpki-roots", "windows-registry", ] @@ -3281,12 +3231,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" -[[package]] -name = "rustc-hash" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7fb8039b3032c191086b10f11f319a6e99e1e82889c5cc6046f515c9db1d497" - [[package]] name = "rustix" version = "0.38.39" @@ -4266,21 +4210,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "tinyvec" -version = "1.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "445e881f4f6d382d5f27c034e25eb92edd7c784ceab92a0937db7f2e9471b938" -dependencies = [ - "tinyvec_macros", -] - -[[package]] -name = "tinyvec_macros" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" - [[package]] name = "tokio" version = "1.41.1" @@ -4889,15 +4818,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "webpki-roots" -version = "0.26.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd7c23921eeb1713a4e851530e9b9756e4fb0e89978582942612524cf09f01cd" -dependencies = [ - "rustls-pki-types", -] - [[package]] name = "which" version = "4.4.2" diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index daa510bb..93c6b6d4 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -211,9 +211,6 @@ protobuf,https://github.com/stepancheg/rust-protobuf,MIT,Stepan Koltsov quanta,https://github.com/metrics-rs/quanta,MIT,Toby Lawrence quick_cache,https://github.com/arthurprs/quick-cache,MIT,Arthur Silva -quinn,https://github.com/quinn-rs/quinn,MIT OR Apache-2.0,The quinn Authors -quinn-proto,https://github.com/quinn-rs/quinn,MIT OR Apache-2.0,The quinn-proto Authors -quinn-udp,https://github.com/quinn-rs/quinn,MIT OR Apache-2.0,The quinn-udp Authors quote,https://github.com/dtolnay/quote,MIT OR Apache-2.0,David Tolnay rand,https://github.com/rust-random/rand,MIT OR Apache-2.0,"The Rand Project Developers, The Rust Project Developers" rand_chacha,https://github.com/rust-random/rand,MIT OR Apache-2.0,"The Rand Project Developers, The Rust Project Developers, The CryptoCorrosion Contributors" @@ -233,7 +230,6 @@ rmp,https://github.com/3Hren/msgpack-rust,MIT,Evgeny Safronov rustc-demangle,https://github.com/rust-lang/rustc-demangle,MIT OR Apache-2.0,Alex Crichton rustc-hash,https://github.com/rust-lang-nursery/rustc-hash,Apache-2.0 OR MIT,The Rust Project Developers -rustc-hash,https://github.com/rust-lang/rustc-hash,Apache-2.0 OR MIT,The Rust Project Developers rustix,https://github.com/bytecodealliance/rustix,Apache-2.0 WITH LLVM-exception OR Apache-2.0 OR MIT,"Dan Gohman , Jakub Konka " rustls,https://github.com/rustls/rustls,Apache-2.0 OR ISC OR MIT,The rustls Authors rustls-native-certs,https://github.com/rustls/rustls-native-certs,Apache-2.0 OR ISC OR MIT,The rustls-native-certs Authors @@ -286,8 +282,6 @@ thread_local,https://github.com/Amanieu/thread_local-rs,MIT OR Apache-2.0,Amanie time,https://github.com/time-rs/time,MIT OR Apache-2.0,"Jacob Pratt , Time contributors" tinystr,https://github.com/unicode-org/icu4x,Unicode-3.0,The ICU4X Project Developers tinytemplate,https://github.com/bheisler/TinyTemplate,Apache-2.0 OR MIT,Brook Heisler -tinyvec,https://github.com/Lokathor/tinyvec,Zlib OR Apache-2.0 OR MIT,Lokathor -tinyvec_macros,https://github.com/Soveu/tinyvec_macros,MIT OR Apache-2.0 OR Zlib,Soveu tokio,https://github.com/tokio-rs/tokio,MIT,Tokio Contributors tokio-io-timeout,https://github.com/sfackler/tokio-io-timeout,MIT OR Apache-2.0,Steven Fackler tokio-rustls,https://github.com/rustls/tokio-rustls,MIT OR Apache-2.0,The tokio-rustls Authors @@ -328,7 +322,6 @@ wasm-bindgen-macro,https://github.com/rustwasm/wasm-bindgen/tree/master/crates/m wasm-bindgen-macro-support,https://github.com/rustwasm/wasm-bindgen/tree/master/crates/macro-support,MIT OR Apache-2.0,The wasm-bindgen Developers wasm-bindgen-shared,https://github.com/rustwasm/wasm-bindgen/tree/master/crates/shared,MIT OR Apache-2.0,The wasm-bindgen Developers web-sys,https://github.com/rustwasm/wasm-bindgen/tree/master/crates/web-sys,MIT OR Apache-2.0,The wasm-bindgen Developers -webpki-roots,https://github.com/rustls/webpki-roots,MPL-2.0,The webpki-roots Authors which,https://github.com/harryfei/which-rs,MIT,Harry Fei winapi,https://github.com/retep998/winapi-rs,MIT OR Apache-2.0,Peter Atashian winapi-util,https://github.com/BurntSushi/winapi-util,Unlicense OR MIT,Andrew Gallant diff --git a/bin/agent-data-plane/src/main.rs b/bin/agent-data-plane/src/main.rs index 08775b5a..571486fe 100644 --- a/bin/agent-data-plane/src/main.rs +++ b/bin/agent-data-plane/src/main.rs @@ -182,7 +182,7 @@ fn create_topology( let mut dd_metrics_config = DatadogMetricsConfiguration::from_configuration(configuration) .error_context("Failed to configure Datadog Metrics destination.")?; - dd_metrics_config.add_refreshable_configuration(refreshable_configuration.clone()); + dd_metrics_config.add_refreshable_configuration(refreshable_configuration); let events_service_checks_config = DatadogEventsServiceChecksConfiguration::from_configuration(configuration) .error_context("Failed to configure Datadog Events/Service Checks destination.")?; diff --git a/lib/saluki-components/src/destinations/datadog_metrics/endpoint/endpoints.rs b/lib/saluki-components/src/destinations/datadog_metrics/endpoint/endpoints.rs index 6632ffea..0839b74c 100644 --- a/lib/saluki-components/src/destinations/datadog_metrics/endpoint/endpoints.rs +++ b/lib/saluki-components/src/destinations/datadog_metrics/endpoint/endpoints.rs @@ -10,7 +10,7 @@ use saluki_metadata; use serde::Deserialize; use serde_with::{serde_as, DisplayFromStr, OneOrMany, PickFirst}; use snafu::{ResultExt, Snafu}; -use tracing::{debug, error}; +use tracing::debug; use url::Url; use crate::destinations::datadog_metrics::DEFAULT_SITE; @@ -135,20 +135,20 @@ impl ResolvedEndpoint { } /// Returns the API key associated with the endpoint. + /// + /// If a refreshable configuration has been configured, the API key will be queried from the + /// configuration and stored if it has been updated since the last time `api_key` was called. pub fn api_key(&mut self) -> &str { if let Some(config) = &self.config { match config.try_get_typed::("api_key") { Ok(Some(api_key)) => { if !api_key.is_empty() && self.api_key != api_key { - debug!("Refreshing api key."); + debug!(endpoint = %self.endpoint, "Refreshed API key."); self.api_key = api_key; } } - Ok(None) => { - debug!("Failed to retrieve api key from remote source. Falling back to last known api key."); - } - Err(_) => { - error!("Failed to retrieve api key from remote source. Falling back to last known api key."); + Ok(None) | Err(_) => { + debug!("Failed to retrieve API key from remote source (missing or wrong type). Continuing with last known valid API key."); } } } diff --git a/lib/saluki-components/src/destinations/datadog_metrics/mod.rs b/lib/saluki-components/src/destinations/datadog_metrics/mod.rs index bd863083..0f28d651 100644 --- a/lib/saluki-components/src/destinations/datadog_metrics/mod.rs +++ b/lib/saluki-components/src/destinations/datadog_metrics/mod.rs @@ -220,14 +220,11 @@ impl DestinationBuilder for DatadogMetricsConfiguration { let series_request_builder = RequestBuilder::new(MetricsEndpoint::Series, rb_buffer_pool.clone()).await?; let sketches_request_builder = RequestBuilder::new(MetricsEndpoint::Sketches, rb_buffer_pool).await?; - let refresher = self.config_refresher.clone(); - Ok(Box::new(DatadogMetrics { service, series_request_builder, sketches_request_builder, endpoints, - refresher, })) } } @@ -271,7 +268,6 @@ where series_request_builder: RequestBuilder, sketches_request_builder: RequestBuilder, endpoints: Vec, - refresher: Option, } #[allow(unused)] @@ -289,7 +285,6 @@ where mut sketches_request_builder, service, endpoints, - refresher, } = *self; let mut health = context.take_health_handle(); diff --git a/lib/saluki-config/Cargo.toml b/lib/saluki-config/Cargo.toml index ee0736da..406515a8 100644 --- a/lib/saluki-config/Cargo.toml +++ b/lib/saluki-config/Cargo.toml @@ -9,7 +9,7 @@ repository = { workspace = true } arc-swap = { workspace = true } figment = { workspace = true, features = ["env", "json", "yaml"] } http = { workspace = true } -reqwest = { workspace = true, features = ["rustls-tls", "json"] } +reqwest = { workspace = true, features = ["rustls-tls-native-roots-no-provider", "json"] } saluki-error = { workspace = true } saluki-io = { workspace = true } serde = { workspace = true } diff --git a/lib/saluki-config/src/refresher.rs b/lib/saluki-config/src/refresher.rs index 0fd61bdd..583d3b30 100644 --- a/lib/saluki-config/src/refresher.rs +++ b/lib/saluki-config/src/refresher.rs @@ -26,7 +26,10 @@ pub struct RefresherConfiguration { /// The amount of time betweeen each request in seconds. /// /// Defaults to 15 seconds. - #[serde(default = "default_refresh_interval_seconds")] + #[serde( + rename = "agent_config_refresh_internal_seconds", + default = "default_refresh_interval_seconds" + )] refresh_interval_seconds: u64, /// The IPC host used by the Datadog Agent. @@ -68,7 +71,13 @@ pub struct RefreshableConfiguration { } impl RefresherConfiguration { - /// Creates a new `RefresherConfiguration` from the given configuration. + /// Builds a `RefreshableConfiguration`, spawning a background task to periodically pull + /// configuration data and update the configuration. + /// + /// # Errors + /// + /// If the authentication token be read from the configured authentication token file + /// path, an error will be returned. pub fn from_configuration(config: &GenericConfiguration) -> Result { Ok(config.as_typed()?) } From ffffbfa6273e2f9c33209068156dfbfdc4213b9d Mon Sep 17 00:00:00 2001 From: raymond zhao Date: Wed, 4 Dec 2024 11:58:32 -0500 Subject: [PATCH 16/18] doc --- lib/saluki-config/src/refresher.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/saluki-config/src/refresher.rs b/lib/saluki-config/src/refresher.rs index 583d3b30..d05103ee 100644 --- a/lib/saluki-config/src/refresher.rs +++ b/lib/saluki-config/src/refresher.rs @@ -71,6 +71,11 @@ pub struct RefreshableConfiguration { } impl RefresherConfiguration { + /// Creates a new `RefresherConfiguration` from the given configuration. + pub fn from_configuration(config: &GenericConfiguration) -> Result { + Ok(config.as_typed()?) + } + /// Builds a `RefreshableConfiguration`, spawning a background task to periodically pull /// configuration data and update the configuration. /// @@ -78,11 +83,6 @@ impl RefresherConfiguration { /// /// If the authentication token be read from the configured authentication token file /// path, an error will be returned. - pub fn from_configuration(config: &GenericConfiguration) -> Result { - Ok(config.as_typed()?) - } - - /// Create `RefreshableConfiguration` from `RefresherConfiguration`. pub fn build(&self) -> Result { let raw_bearer_token = std::fs::read_to_string(&self.auth_token_file_path)?; let endpoint = format!("https://{}:{}/config/v1", self.agent_ipc_host, self.agent_ipc_port); From 2ddd0a8a3b35c0ed10e2dc8f8d67e9756e0b8820 Mon Sep 17 00:00:00 2001 From: raymond zhao Date: Wed, 4 Dec 2024 12:26:12 -0500 Subject: [PATCH 17/18] remove default agent ipc port --- bin/agent-data-plane/src/main.rs | 17 +++++++++++++---- lib/saluki-config/src/refresher.rs | 9 +-------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/bin/agent-data-plane/src/main.rs b/bin/agent-data-plane/src/main.rs index 571486fe..41379e5d 100644 --- a/bin/agent-data-plane/src/main.rs +++ b/bin/agent-data-plane/src/main.rs @@ -177,12 +177,21 @@ fn create_topology( .with_transform_builder(origin_enrichment_config); let internal_metrics_remap_config = AgentTelemetryRemapperConfiguration::new(); - let refresher_configuration = RefresherConfiguration::from_configuration(configuration)?; - let refreshable_configuration: RefreshableConfiguration = refresher_configuration.build()?; - let mut dd_metrics_config = DatadogMetricsConfiguration::from_configuration(configuration) .error_context("Failed to configure Datadog Metrics destination.")?; - dd_metrics_config.add_refreshable_configuration(refreshable_configuration); + + match RefresherConfiguration::from_configuration(configuration) { + Ok(refresher_configuration) => { + let refreshable_configuration: RefreshableConfiguration = refresher_configuration.build()?; + dd_metrics_config.add_refreshable_configuration(refreshable_configuration); + } + Err(_) => { + info!( + "Dynamic configuration refreshing will be unable due to failure to configure refresher configuration." + ) + } + } + let events_service_checks_config = DatadogEventsServiceChecksConfiguration::from_configuration(configuration) .error_context("Failed to configure Datadog Events/Service Checks destination.")?; diff --git a/lib/saluki-config/src/refresher.rs b/lib/saluki-config/src/refresher.rs index d05103ee..042e3471 100644 --- a/lib/saluki-config/src/refresher.rs +++ b/lib/saluki-config/src/refresher.rs @@ -27,7 +27,7 @@ pub struct RefresherConfiguration { /// /// Defaults to 15 seconds. #[serde( - rename = "agent_config_refresh_internal_seconds", + rename = "agent_ipc_config_refresh_interval", default = "default_refresh_interval_seconds" )] refresh_interval_seconds: u64, @@ -39,9 +39,6 @@ pub struct RefresherConfiguration { agent_ipc_host: String, /// The IPC port used by the Datadog Agent. - /// - /// Defaults to `0`. - #[serde(default = "default_agent_ipc_port")] agent_ipc_port: u64, } @@ -57,10 +54,6 @@ fn default_agent_ipc_host() -> String { DEFAULT_AGENT_IPC_HOST.to_owned() } -fn default_agent_ipc_port() -> u64 { - 0 -} - /// A configuration whose values are refreshed from a remote source at runtime. #[derive(Clone, Debug, Default)] pub struct RefreshableConfiguration { From f2694a318d365d97f1862ab98640368586a4a520 Mon Sep 17 00:00:00 2001 From: raymond zhao Date: Wed, 4 Dec 2024 12:29:58 -0500 Subject: [PATCH 18/18] spelling --- bin/agent-data-plane/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/agent-data-plane/src/main.rs b/bin/agent-data-plane/src/main.rs index 41379e5d..ea5322e7 100644 --- a/bin/agent-data-plane/src/main.rs +++ b/bin/agent-data-plane/src/main.rs @@ -187,7 +187,7 @@ fn create_topology( } Err(_) => { info!( - "Dynamic configuration refreshing will be unable due to failure to configure refresher configuration." + "Dynamic configuration refreshing will be unavailable due to failure to configure refresher configuration." ) } }