Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove mut self reference from LogExporter::export() method. #2380

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion opentelemetry-appender-tracing/benches/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ struct NoopExporter {

#[async_trait]
impl LogExporter for NoopExporter {
async fn export(&mut self, _: LogBatch<'_>) -> LogResult<()> {
async fn export(&self, _: LogBatch<'_>) -> LogResult<()> {
LogResult::Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-appender-tracing/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@

#[async_trait]
impl LogExporter for ReentrantLogExporter {
async fn export(&mut self, _batch: LogBatch<'_>) -> LogResult<()> {
async fn export(&self, _batch: LogBatch<'_>) -> LogResult<()> {

Check warning on line 250 in opentelemetry-appender-tracing/src/layer.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-appender-tracing/src/layer.rs#L250

Added line #L250 was not covered by tests
// This will cause a deadlock as the export itself creates a log
// while still within the lock of the SimpleLogProcessor.
warn!(name: "my-event-name", target: "reentrant", event_id = 20, user_name = "otel", user_email = "[email protected]");
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

#[async_trait]
impl LogExporter for OtlpHttpClient {
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> {
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {

Check warning on line 12 in opentelemetry-otlp/src/exporter/http/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/logs.rs#L12

Added line #L12 was not covered by tests
let client = self
.client
.lock()
Expand Down
11 changes: 7 additions & 4 deletions opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;

use super::BoxInterceptor;
use tokio::sync::Mutex;

pub(crate) struct TonicLogsClient {
inner: Option<ClientInner>,
Expand All @@ -20,7 +21,7 @@

struct ClientInner {
client: LogsServiceClient<Channel>,
interceptor: BoxInterceptor,
interceptor: Mutex<BoxInterceptor>,
}

impl fmt::Debug for TonicLogsClient {
Expand All @@ -45,7 +46,7 @@
TonicLogsClient {
inner: Some(ClientInner {
client,
interceptor,
interceptor: Mutex::new(interceptor),

Check warning on line 49 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L49

Added line #L49 was not covered by tests
}),
resource: Default::default(),
}
Expand All @@ -54,11 +55,13 @@

#[async_trait]
impl LogExporter for TonicLogsClient {
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> {
let (mut client, metadata, extensions) = match &mut self.inner {
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {
let (mut client, metadata, extensions) = match &self.inner {

Check warning on line 59 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L58-L59

Added lines #L58 - L59 were not covered by tests
Some(inner) => {
let (m, e, _) = inner
.interceptor
.lock()
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here

Check warning on line 64 in opentelemetry-otlp/src/exporter/tonic/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/logs.rs#L63-L64

Added lines #L63 - L64 were not covered by tests
.call(Request::new(()))
.map_err(|e| LogError::Other(Box::new(e)))?
.into_parts();
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@

#[async_trait]
impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> {
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {

Check warning on line 127 in opentelemetry-otlp/src/logs.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/logs.rs#L127

Added line #L127 was not covered by tests
self.client.export(batch).await
}

Expand Down
7 changes: 7 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
- `ResourceDetector.detect()` no longer supports timeout option.
- `opentelemetry::global::shutdown_tracer_provider()` Removed from the API, should now use `tracer_provider.shutdown()` see [#2369](https://github.com/open-telemetry/opentelemetry-rust/pull/2369) for a migration example. "Tracer provider" is cheaply cloneable, so users are encouraged to set a clone of it as the global (ex: `global::set_tracer_provider(provider.clone()))`, so that instrumentations and other components can obtain tracers from `global::tracer()`. The tracer_provider must be kept around to call shutdown on it at the end of application (ex: `tracer_provider.shutdown()`)

- *Breaking* The LogExporter::export() method no longer requires a mutable reference to self.:
Before:
async fn export(&mut self, _batch: LogBatch<'_>) -> LogResult<()>
After:
async fn export(&self, _batch: LogBatch<'_>) -> LogResult<()>
Custom exporters will need to internally synchronize any mutable state, if applicable.

## 0.27.1

Released 2024-Nov-27
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/export/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub trait LogExporter: Send + Sync + Debug {
/// A `LogResult<()>`, which is a result type indicating either a successful export (with
/// `Ok(())`) or an error (`Err(LogError)`) if the export operation failed.
///
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()>;
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()>;
/// Shuts down the exporter.
fn shutdown(&mut self) {}
#[cfg(feature = "spec_unstable_logs_enabled")]
Expand Down
6 changes: 3 additions & 3 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@
.exporter
.lock()
.map_err(|_| LogError::MutexPoisoned("SimpleLogProcessor".into()))
.and_then(|mut exporter| {
.and_then(|exporter| {
let log_tuple = &[(record as &LogRecord, instrumentation)];
futures_executor::block_on(exporter.export(LogBatch::new(log_tuple)))
});
Expand Down Expand Up @@ -586,7 +586,7 @@

#[async_trait]
impl LogExporter for MockLogExporter {
async fn export(&mut self, _batch: LogBatch<'_>) -> LogResult<()> {
async fn export(&self, _batch: LogBatch<'_>) -> LogResult<()> {

Check warning on line 589 in opentelemetry-sdk/src/logs/log_processor.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/logs/log_processor.rs#L589

Added line #L589 was not covered by tests
Ok(())
}

Expand Down Expand Up @@ -1093,7 +1093,7 @@

#[async_trait::async_trait]
impl LogExporter for LogExporterThatRequiresTokio {
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> {
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {
// Simulate minimal dependency on tokio by sleeping asynchronously for a short duration
tokio::time::sleep(Duration::from_millis(50)).await;

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ impl InMemoryLogExporter {

#[async_trait]
impl LogExporter for InMemoryLogExporter {
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> {
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {
let mut logs_guard = self.logs.lock().map_err(LogError::from)?;
for (log_record, instrumentation) in batch.iter() {
let owned_log = OwnedLogData {
Expand Down
14 changes: 9 additions & 5 deletions opentelemetry-stdout/src/logs/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,21 @@
use opentelemetry_sdk::logs::LogResult;
use opentelemetry_sdk::Resource;
use std::sync::atomic;
use std::sync::atomic::Ordering;

/// An OpenTelemetry exporter that writes Logs to stdout on export.
pub struct LogExporter {
resource: Resource,
is_shutdown: atomic::AtomicBool,
resource_emitted: bool,
resource_emitted: atomic::AtomicBool,
}

impl Default for LogExporter {
fn default() -> Self {
LogExporter {
resource: Resource::default(),
is_shutdown: atomic::AtomicBool::new(false),
resource_emitted: false,
resource_emitted: atomic::AtomicBool::new(false),

Check warning on line 22 in opentelemetry-stdout/src/logs/exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/logs/exporter.rs#L22

Added line #L22 was not covered by tests
}
}
}
Expand All @@ -32,15 +33,18 @@
#[async_trait]
impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
/// Export spans to stdout
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> {
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {

Check warning on line 36 in opentelemetry-stdout/src/logs/exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/logs/exporter.rs#L36

Added line #L36 was not covered by tests
if self.is_shutdown.load(atomic::Ordering::SeqCst) {
return Err("exporter is shut down".into());
} else {
println!("Logs");
if self.resource_emitted {
if self
.resource_emitted
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_err()
{

Check warning on line 45 in opentelemetry-stdout/src/logs/exporter.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-stdout/src/logs/exporter.rs#L41-L45

Added lines #L41 - L45 were not covered by tests
print_logs(batch);
} else {
self.resource_emitted = true;
println!("Resource");
if let Some(schema_url) = self.resource.schema_url() {
println!("\t Resource SchemaUrl: {:?}", schema_url);
Expand Down
2 changes: 2 additions & 0 deletions stress/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ tracing-subscriber = { workspace = true, features = ["registry", "std"] }
num-format = "0.4.4"
sysinfo = { version = "0.32", optional = true }
libc = "=0.2.164" # https://github.com/GuillaumeGomez/sysinfo/issues/1392
async-trait = "0.1.51"
futures-executor = { workspace = true }

[features]
stats = ["sysinfo"]
32 changes: 23 additions & 9 deletions stress/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,38 @@
~31 M/sec

Hardware: AMD EPYC 7763 64-Core Processor - 2.44 GHz, 16vCPUs,
~44 M /sec
~40 M /sec
*/

use opentelemetry::InstrumentationScope;
use opentelemetry_appender_tracing::layer;
use opentelemetry_sdk::logs::{LogProcessor, LoggerProvider};
use opentelemetry_sdk::export::logs::{LogBatch, LogExporter};
use opentelemetry_sdk::logs::{LogProcessor, LogRecord, LogResult, LoggerProvider};
use tracing::error;
use tracing_subscriber::prelude::*;

mod throughput;
use async_trait::async_trait;

#[derive(Debug, Clone)]
struct NoopExporter;

#[async_trait]
impl LogExporter for NoopExporter {
async fn export(&self, _: LogBatch<'_>) -> LogResult<()> {
LogResult::Ok(())
}
}

#[derive(Debug)]
pub struct NoOpLogProcessor;
pub struct NoOpLogProcessor {
exporter: NoopExporter,
}

impl LogProcessor for NoOpLogProcessor {
fn emit(
&self,
_record: &mut opentelemetry_sdk::logs::LogRecord,
_scope: &InstrumentationScope,
) {
fn emit(&self, record: &mut opentelemetry_sdk::logs::LogRecord, scope: &InstrumentationScope) {
let log_tuple = &[(record as &LogRecord, scope)];
let _ = futures_executor::block_on(self.exporter.export(LogBatch::new(log_tuple)));
}

fn force_flush(&self) -> opentelemetry_sdk::logs::LogResult<()> {
Expand All @@ -40,7 +52,9 @@ impl LogProcessor for NoOpLogProcessor {
fn main() {
// LoggerProvider with a no-op processor.
let provider: LoggerProvider = LoggerProvider::builder()
.with_log_processor(NoOpLogProcessor {})
.with_log_processor(NoOpLogProcessor {
exporter: NoopExporter {},
})
.build();

// Use the OpenTelemetryTracingBridge to test the throughput of the appender-tracing.
Expand Down
Loading