Skip to content

Commit

Permalink
feat: honor serverless_logs (#475)
Browse files Browse the repository at this point in the history
* feat: honor serverless_logs

* fmt

---------

Co-authored-by: jordan gonzález <[email protected]>
  • Loading branch information
astuyve and duncanista authored Nov 25, 2024
1 parent 891936a commit 3b19fe7
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 2 deletions.
2 changes: 2 additions & 0 deletions bottlecap/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ pub struct Config {
pub capture_lambda_payload_max_depth: u32,
#[serde(deserialize_with = "deserialize_service_mapping")]
pub service_mapping: HashMap<String, String>,
pub serverless_logs_enabled: bool,
// Trace Propagation
#[serde(deserialize_with = "deserialize_trace_propagation_style")]
pub trace_propagation_style: Vec<TracePropagationStyle>,
Expand Down Expand Up @@ -115,6 +116,7 @@ impl Default for Config {
capture_lambda_payload: false,
capture_lambda_payload_max_depth: 10,
service_mapping: HashMap::new(),
serverless_logs_enabled: true,
// Trace Propagation
trace_propagation_style: vec![
TracePropagationStyle::Datadog,
Expand Down
44 changes: 42 additions & 2 deletions bottlecap/src/logs/lambda/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub struct LambdaProcessor {
ready_logs: Vec<String>,
// Main event bus
event_bus: Sender<Event>,
// Logs enabled
logs_enabled: bool,
}

const OOM_ERRORS: [&str; 7] = [
Expand Down Expand Up @@ -63,12 +65,14 @@ impl LambdaProcessor {
let function_arn = tags_provider.get_canonical_id().unwrap_or_default();

let processing_rules = &datadog_config.logs_config_processing_rules;
let logs_enabled = datadog_config.serverless_logs_enabled;
let rules = LambdaProcessor::compile_rules(processing_rules);
LambdaProcessor {
function_arn,
service,
tags,
rules,
logs_enabled,
invocation_context: InvocationContext::new(String::new(), 0.0, 0.0, 0, None),
orphan_logs: Vec::new(),
ready_logs: Vec::new(),
Expand Down Expand Up @@ -250,8 +254,8 @@ impl LambdaProcessor {

pub async fn process(&mut self, event: TelemetryEvent, aggregator: &Arc<Mutex<Aggregator>>) {
if let Ok(mut log) = self.make_log(event).await {
let should_send_log =
LambdaProcessor::apply_rules(&self.rules, &mut log.message.message);
let should_send_log = self.logs_enabled
&& LambdaProcessor::apply_rules(&self.rules, &mut log.message.message);
if should_send_log {
if let Ok(serialized_log) = serde_json::to_string(&log) {
// explicitly drop log so we don't accidentally re-use it and push
Expand Down Expand Up @@ -669,6 +673,42 @@ mod tests {
assert_eq!(batch, serialized_log.as_bytes());
}

#[tokio::test]
async fn test_process_logs_disabled() {
let aggregator = Arc::new(Mutex::new(Aggregator::default()));
let config = Arc::new(config::Config {
service: Some("test-service".to_string()),
tags: Some("test:tags".to_string()),
serverless_logs_enabled: false,
..config::Config::default()
});

let tags_provider = Arc::new(provider::Provider::new(
Arc::clone(&config),
LAMBDA_RUNTIME_SLUG.to_string(),
&HashMap::from([("function_arn".to_string(), "test-arn".to_string())]),
));

let (tx, _rx) = tokio::sync::mpsc::channel(2);

let mut processor =
LambdaProcessor::new(Arc::clone(&tags_provider), Arc::clone(&config), tx.clone());

let event = TelemetryEvent {
time: Utc.with_ymd_and_hms(2023, 1, 7, 3, 23, 47).unwrap(),
record: TelemetryRecord::PlatformStart {
request_id: "test-request-id".to_string(),
version: Some("test".to_string()),
},
};

processor.process(event.clone(), &aggregator).await;

let mut aggregator_lock = aggregator.lock().unwrap();
let batch = aggregator_lock.get_batch();
assert_eq!(batch.len(), 0);
}

#[tokio::test]
async fn test_process_log_with_no_request_id() {
let aggregator = Arc::new(Mutex::new(Aggregator::default()));
Expand Down

0 comments on commit 3b19fe7

Please sign in to comment.