Skip to content

Commit

Permalink
feat(bottlecap): add capture lambda payload (#454)
Browse files Browse the repository at this point in the history
* add `tag_span_from_value`

* add `capture_lambda_payload` config

* add unit testing for `tag_span_from_value`

* update listener `end_invocation_handler`

parsing should not be handled here

* add capture lambda payload feature

also parse body properly, and handle `statusCode`
  • Loading branch information
duncanista authored Nov 14, 2024
1 parent f639aae commit 56306a5
Show file tree
Hide file tree
Showing 4 changed files with 300 additions and 27 deletions.
5 changes: 4 additions & 1 deletion bottlecap/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ pub struct Config {
pub serverless_flush_strategy: FlushStrategy,
pub enhanced_metrics: bool,
pub https_proxy: Option<String>,
pub capture_lambda_payload: bool,
pub capture_lambda_payload_max_depth: u32,
// Trace Propagation
#[serde(deserialize_with = "deserialize_trace_propagation_style")]
pub trace_propagation_style: Vec<TracePropagationStyle>,
Expand Down Expand Up @@ -93,8 +95,9 @@ impl Default for Config {
logs_config_processing_rules: None,
// Metrics
enhanced_metrics: true,
// Failover
https_proxy: None,
capture_lambda_payload: false,
capture_lambda_payload_max_depth: 10,
// Trace Propagation
trace_propagation_style: vec![
TracePropagationStyle::Datadog,
Expand Down
243 changes: 243 additions & 0 deletions bottlecap/src/lifecycle/invocation/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,256 @@
use base64::{engine::general_purpose, DecodeError, Engine};
use datadog_trace_protobuf::pb::Span;
use serde_json::Value;
use tracing::debug;

pub mod context;
pub mod processor;
pub mod span_inferrer;
pub mod triggers;

const MAX_TAG_CHARS: usize = 4096;
const REDACTABLE_KEYS: [&str; 8] = [
"password",
"passwd",
"pwd",
"secret",
"token",
"authorization",
"x-authorization",
"api_key",
];

pub fn base64_to_string(base64_string: &str) -> Result<String, DecodeError> {
match general_purpose::STANDARD.decode(base64_string) {
Ok(bytes) => Ok(String::from_utf8_lossy(&bytes).to_string()),
Err(e) => Err(e),
}
}

pub fn tag_span_from_value(span: &mut Span, key: &str, value: &Value, depth: u32, max_depth: u32) {
// Null scenario
if value.is_null() {
span.meta.insert(key.to_string(), value.to_string());
return;
}

// Check max depth
if depth >= max_depth {
match serde_json::to_string(value) {
Ok(s) => {
let truncated = s.chars().take(MAX_TAG_CHARS).collect::<String>();
span.meta.insert(key.to_string(), truncated);
return;
}
Err(e) => {
debug!("Unable to serialize value for tagging {e}");
return;
}
}
}

let new_depth = depth + 1;
match value {
// Handle string case
Value::String(s) => {
if let Ok(p) = serde_json::from_str::<Value>(s) {
tag_span_from_value(span, key, &p, new_depth, max_depth);
} else {
let truncated = s.chars().take(MAX_TAG_CHARS).collect::<String>();
span.meta
.insert(key.to_string(), redact_value(key, truncated));
}
}

// Handle number case
Value::Number(n) => {
span.meta.insert(key.to_string(), n.to_string());
}

// Handle boolean case
Value::Bool(b) => {
span.meta.insert(key.to_string(), b.to_string());
}

// Handle object case
Value::Object(map) => {
for (k, v) in map {
let new_key = format!("{key}.{k}");
tag_span_from_value(span, &new_key, v, new_depth, max_depth);
}
}

Value::Array(a) => {
if a.is_empty() {
span.meta.insert(key.to_string(), "[]".to_string());
return;
}

for (i, v) in a.iter().enumerate() {
let new_key = format!("{key}.{i}");
tag_span_from_value(span, &new_key, v, new_depth, max_depth);
}
}
Value::Null => {}
}
}

fn redact_value(key: &str, value: String) -> String {
let split_key = key.split('.').last().unwrap_or_default();
if REDACTABLE_KEYS.contains(&split_key) {
String::from("redacted")
} else {
value
}
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use serde_json::json;

use super::*;

#[test]
fn test_simple_tagging() {
let mut span = Span::default();
let value = json!({ "request": { "simple": "value" } });

tag_span_from_value(&mut span, "payload", &value, 0, 10);

let expected = HashMap::from([("payload.request.simple".to_string(), "value".to_string())]);

assert_eq!(span.meta, expected);
}

#[test]
fn test_complex_object() {
let mut span = Span::default();
let value = json!({
"request": {
"simple": "value",
"obj": {
"arr": ["a", "b", "c"],
"boolean": true,
"nested": {
"value": "nested_value"
}
},
"empty": null,
"number": 1,
"boolean": true,
}
});

tag_span_from_value(&mut span, "payload", &value, 0, 10);

let expected = HashMap::from([
("payload.request.simple".to_string(), "value".to_string()),
("payload.request.obj.arr.0".to_string(), "a".to_string()),
("payload.request.obj.arr.1".to_string(), "b".to_string()),
("payload.request.obj.arr.2".to_string(), "c".to_string()),
(
"payload.request.obj.boolean".to_string(),
"true".to_string(),
),
(
"payload.request.obj.nested.value".to_string(),
"nested_value".to_string(),
),
("payload.request.empty".to_string(), "null".to_string()),
("payload.request.number".to_string(), "1".to_string()),
("payload.request.boolean".to_string(), "true".to_string()),
]);

assert_eq!(span.meta, expected);
}

#[test]
fn test_array_of_objects() {
let mut span = Span::default();
let value = json!({
"request": [
{ "simple": "value" },
{ "simple": "value" },
{ "simple": "value" },
]
});

tag_span_from_value(&mut span, "payload", &value, 0, 10);

let expected = HashMap::from([
("payload.request.0.simple".to_string(), "value".to_string()),
("payload.request.1.simple".to_string(), "value".to_string()),
("payload.request.2.simple".to_string(), "value".to_string()),
]);

assert_eq!(span.meta, expected);
}

#[test]
fn test_reach_max_depth() {
let mut span = Span::default();
let value = json!({
"hello": "world",
"empty": null,
"level1": {
"obj": {
"level3": 3
},
"arr": [null, true, "great", { "l3": "v3" }],
"boolean": true,
"number": 2,
"empty": null,
"empty_obj": {},
"empty_arr": []
},
"arr": [{ "a": "b" }, { "c": "d" }]
});

tag_span_from_value(&mut span, "payload", &value, 0, 2);

let expected = HashMap::from([
("payload.hello".to_string(), "world".to_string()),
("payload.empty".to_string(), "null".to_string()),
(
"payload.level1.obj".to_string(),
"{\"level3\":3}".to_string(),
),
(
"payload.level1.arr".to_string(),
"[null,true,\"great\",{\"l3\":\"v3\"}]".to_string(),
),
("payload.level1.boolean".to_string(), "true".to_string()),
("payload.level1.number".to_string(), "2".to_string()),
("payload.level1.empty".to_string(), "null".to_string()),
("payload.level1.empty_obj".to_string(), "{}".to_string()),
("payload.level1.empty_arr".to_string(), "[]".to_string()),
("payload.arr.0".to_string(), "{\"a\":\"b\"}".to_string()),
("payload.arr.1".to_string(), "{\"c\":\"d\"}".to_string()),
]);

assert_eq!(span.meta, expected);
}

#[test]
fn test_tag_redacts_key() {
let mut span = Span::default();
let value = json!({
"request": {
"headers": {
"authorization": "secret token",
}
}
});

tag_span_from_value(&mut span, "payload", &value, 0, 10);

let expected = HashMap::from([(
"payload.request.headers.authorization".to_string(),
"redacted".to_string(),
)]);

assert_eq!(span.meta, expected);
}
}
47 changes: 35 additions & 12 deletions bottlecap/src/lifecycle/invocation/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tracing::debug;
use crate::{
config::{self, AwsConfig},
lifecycle::invocation::{
base64_to_string, context::ContextBuffer, span_inferrer::SpanInferrer,
base64_to_string, context::ContextBuffer, span_inferrer::SpanInferrer, tag_span_from_value,
},
metrics::enhanced::lambda::{EnhancedMetricData, Lambda as EnhancedMetrics},
proc::{self, CPUData, NetworkData},
Expand Down Expand Up @@ -52,7 +52,7 @@ pub struct Processor {
enhanced_metrics: EnhancedMetrics,
aws_config: AwsConfig,
tracer_detected: bool,
enhanced_metrics_enabled: bool,
config: Arc<config::Config>,
}

impl Processor {
Expand Down Expand Up @@ -94,15 +94,15 @@ impl Processor {
enhanced_metrics: EnhancedMetrics::new(metrics_aggregator, Arc::clone(&config)),
aws_config: aws_config.clone(),
tracer_detected: false,
enhanced_metrics_enabled: config.enhanced_metrics,
config: Arc::clone(&config),
}
}

/// Given a `request_id`, creates the context and adds the enhanced metric offsets to the context buffer.
///
pub fn on_invoke_event(&mut self, request_id: String) {
self.context_buffer.create_context(request_id.clone());
if self.enhanced_metrics_enabled {
if self.config.enhanced_metrics {
// Collect offsets for network and cpu metrics
let network_offset: Option<NetworkData> = proc::get_network_data().ok();
let cpu_offset: Option<CPUData> = proc::get_cpu_data().ok();
Expand Down Expand Up @@ -293,6 +293,17 @@ impl Processor {
Err(_) => json!({}),
};

// Tag the invocation span with the request payload
if self.config.capture_lambda_payload {
tag_span_from_value(
&mut self.span,
"function.request",
&payload_value,
0,
self.config.capture_lambda_payload_max_depth,
);
}

self.inferrer.infer_span(&payload_value, &self.aws_config);
self.extracted_span_context = self.extract_span_context(&headers, &payload_value);

Expand Down Expand Up @@ -344,22 +355,34 @@ impl Processor {

/// Given trace context information, set it to the current span.
///
pub fn on_invocation_end(
&mut self,
headers: HashMap<String, String>,
status_code: Option<String>,
) {
if let Some(status_code) = status_code {
pub fn on_invocation_end(&mut self, headers: HashMap<String, String>, payload: Vec<u8>) {
let payload_value = match serde_json::from_slice::<Value>(&payload) {
Ok(value) => value,
Err(_) => json!({}),
};

// Tag the invocation span with the request payload
if self.config.capture_lambda_payload {
tag_span_from_value(
&mut self.span,
"function.response",
&payload_value,
0,
self.config.capture_lambda_payload_max_depth,
);
}

if let Some(status_code) = payload_value.get("statusCode").and_then(Value::as_str) {
self.span
.meta
.insert("http.status_code".to_string(), status_code.clone());
.insert("http.status_code".to_string(), status_code.to_string());

if status_code.len() == 3 && status_code.starts_with('5') {
self.span.error = 1;
}

// If we have an inferred span, set the status code to it
self.inferrer.set_status_code(status_code);
self.inferrer.set_status_code(status_code.to_string());
}

self.update_span_context_from_headers(&headers);
Expand Down
Loading

0 comments on commit 56306a5

Please sign in to comment.