From 32ffbc6e5c1d203aba8057f42f05c986f53159a9 Mon Sep 17 00:00:00 2001 From: Pavel Procopiuc Date: Mon, 2 Dec 2024 11:54:08 +0100 Subject: [PATCH] [COLIB-522]: Fix the trace propagation for engine gRPC calls --- Cargo.toml | 3 +++ src/lib.rs | 14 +++++------ src/request/grpc.rs | 58 +++++++++++++++++++++++++++++++++++++++++++++ src/request/mod.rs | 2 ++ src/request/otel.rs | 4 +++- 5 files changed, 73 insertions(+), 8 deletions(-) create mode 100644 src/request/grpc.rs diff --git a/Cargo.toml b/Cargo.toml index 51150fa..c9fa79d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ auth0 = [ "dashmap", "tracing", ] +grpc = ["tonic"] gzip = ["reqwest/gzip"] redis-tls = ["redis/tls", "redis/tokio-native-tls-comp"] tracing_opentelemetry = ["tracing_opentelemetry_0_27"] @@ -97,6 +98,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "2" tokio = { version = "1.16", features = ["macros", "rt-multi-thread", "fs"] } +tonic = { version = "0.12", default-features = false, optional = true } tracing = { version = "0.1", optional = true } uuid = { version = ">=0.7.0, <2.0.0", features = ["serde", "v4"] } chacha20poly1305 = { version = "0.10.1", features = ["std"], optional = true } @@ -130,6 +132,7 @@ tracing-opentelemetry_0_28_pkg = { package = "tracing-opentelemetry", version = flate2 = "1.0" mockito = "1.0" tokio = { version = "1.16", features = ["macros", "rt-multi-thread"] } +tonic = "0.12" [profile.release] codegen-units = 1 diff --git a/src/lib.rs b/src/lib.rs index 2c142c0..85df8df 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,6 @@ #![cfg_attr(docsrs, feature(doc_cfg))] -//! This crate gives an high level API to execute external HTTP requests. +//! This crate gives a high level API to execute external HTTP requests. //! //! It is supposed to give the basics building blocks for building bridges to other services //! while abstracting the low level stuff like adding custom headers and request tracing. @@ -21,12 +21,11 @@ //! * `auth0` - enable auth0 integration, allowing bridge.rs to retrieve tokens from auth0 for authentication //! * `gzip` - provides response body gzip decompression. //! * `redis-tls` - add support for connecting to redis with tls -//! * `tracing-opentelemetry` adds support for integration with opentelemetry. -//! This feature is an alias for the `tracing_opentelemetry_0_21` feature. -//! `tracing_opentelemetry_0_20` is also available as to support the 0.20 opentelemetry -//! libraries. -//! -//! We are going to support at least the last 3 versions of opentelemetry. After that we mightremove support for older otel version without it being a breaking change. +//! * `grpc` - provides the [GrpcOtelInterceptor] for adding the opentelemetry context to the gRPC requests +//! * `tracing_opentelemetry` - adds support for integration with opentelemetry. +//! This feature is an alias for the latest `tracing_opentelemetry_x_xx` feature. +//! * `tracing_opentelemetry_x_xx` (e.g. `tracing_opentelemetry_0_27`) - adds support for integration with a particular opentelemetry version. +//! We are going to support at least the last 3 versions of opentelemetry. After that we might remove support for older otel version without it being a breaking change. use errors::PrimaBridgeError; use http::{header::HeaderName, HeaderValue, Method}; @@ -36,6 +35,7 @@ use sealed::Sealed; pub use self::{ builder::BridgeBuilder, redirect::RedirectPolicy, + request::grpc::{GrpcOtelInterceptedService, GrpcOtelInterceptor}, request::{ Body, DeliverableRequest, GraphQLMultipart, GraphQLRequest, MultipartFile, MultipartFormFileField, Request, RestMultipart, RestRequest, diff --git a/src/request/grpc.rs b/src/request/grpc.rs new file mode 100644 index 0000000..bd16e22 --- /dev/null +++ b/src/request/grpc.rs @@ -0,0 +1,58 @@ +use std::str::FromStr; + +use tonic::metadata::{MetadataKey, MetadataMap, MetadataValue}; +use tonic::service::interceptor::InterceptedService; +use tonic::service::Interceptor; +use tonic::{Request, Status}; + +use super::otel::{inject_context, Injector}; + +/// A gRPC interceptor that injects the current tracing context into the request metadata. +/// +/// Use this interceptor to make sure that DataDog traces are connected between the services when calling the gRPC +/// service on another server. +/// +/// # Example +/// +/// ``` +/// # struct QuotePreviewServiceClient { _d: std::marker::PhantomData } +/// # impl QuotePreviewServiceClient> { +/// # pub fn with_interceptor(endpoint: Channel, interceptor: GrpcOtelInterceptor) -> Self { Self { _d: std::marker::PhantomData } } +/// # } +/// use tonic::transport::{Endpoint, Channel}; +/// +/// use prima_bridge::{GrpcOtelInterceptor, GrpcOtelInterceptedService}; +/// +/// async fn make_grpc_service() -> Result>, Box> { +/// let url = "http://..."; +/// let channel = Endpoint::new(url)?.connect().await?; +/// // QuotePreviewServiceClient is a tonic-generated gRPC client from [https://github.com/primait/es-engine-schema] +/// Ok(QuotePreviewServiceClient::with_interceptor(channel, GrpcOtelInterceptor)) +/// } +/// ``` +#[derive(Clone)] +pub struct GrpcOtelInterceptor; + +/// Convenience type alias for a long type that's returned from the `with_interceptor()` function of the tonic-generated +/// gRPC client when used with [GrpcOtelInterceptor]. +pub type GrpcOtelInterceptedService = InterceptedService; + +impl Interceptor for GrpcOtelInterceptor { + fn call(&mut self, mut request: Request<()>) -> Result, Status> { + inject_context(&mut MetadataMapInjector(request.metadata_mut())); + Ok(request) + } +} + +pub struct MetadataMapInjector<'h>(&'h mut MetadataMap); + +impl Injector for MetadataMapInjector<'_> { + fn set(&mut self, key: &str, value: String) { + let key_value = MetadataKey::from_str(key) + .ok() + .and_then(|key| Some((key, MetadataValue::from_str(&value).ok()?))); + if let Some((key, value)) = key_value { + self.0.insert(key, value); + } + } +} diff --git a/src/request/mod.rs b/src/request/mod.rs index 1f6362d..38e42c9 100644 --- a/src/request/mod.rs +++ b/src/request/mod.rs @@ -17,6 +17,8 @@ use crate::{BridgeClient, BridgeImpl, Response}; mod body; mod request_type; +#[cfg(all(feature = "grpc", feature = "_any_otel_version"))] +pub mod grpc; #[cfg(feature = "_any_otel_version")] mod otel; diff --git a/src/request/otel.rs b/src/request/otel.rs index 0a1bd6d..ca4f4e8 100644 --- a/src/request/otel.rs +++ b/src/request/otel.rs @@ -49,7 +49,9 @@ mod otel_crates { use otel_crates::*; -use opentelemetry::propagation::{Injector, TextMapPropagator}; +pub use opentelemetry::propagation::Injector; + +use opentelemetry::propagation::TextMapPropagator; use opentelemetry_sdk::propagation::TraceContextPropagator; use tracing_opentelemetry::OpenTelemetrySpanExt;