Skip to content

Commit

Permalink
Log errors if metric buffer is full and drop calls rather than blocki…
Browse files Browse the repository at this point in the history
…ng (#611)
  • Loading branch information
Sushisource authored Sep 29, 2023
1 parent e9a6f06 commit 617612a
Showing 1 changed file with 22 additions and 7 deletions.
29 changes: 22 additions & 7 deletions core/src/telemetry/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,19 @@ where
I: BufferInstrumentRef,
{
calls_rx: crossbeam::channel::Receiver<MetricEvent<I>>,
calls_tx: crossbeam::channel::Sender<MetricEvent<I>>,
calls_tx: LogErrOnFullSender<MetricEvent<I>>,
}
#[derive(Clone, Debug)]
struct LogErrOnFullSender<I>(crossbeam::channel::Sender<I>);
impl<I> LogErrOnFullSender<I> {
fn send(&self, v: I) {
if let Err(crossbeam::channel::TrySendError::Full(_)) = self.0.try_send(v) {
error!(
"Core's metrics buffer is full! Dropping call to record metrics. \
Make sure you drain the metric buffer often!"
);
}
}
}

impl<I> MetricsCallBuffer<I>
Expand All @@ -643,11 +655,14 @@ where
/// Create a new buffer with the given capacity
pub fn new(buffer_size: usize) -> Self {
let (calls_tx, calls_rx) = crossbeam::channel::bounded(buffer_size);
MetricsCallBuffer { calls_rx, calls_tx }
MetricsCallBuffer {
calls_rx,
calls_tx: LogErrOnFullSender(calls_tx),
}
}
fn new_instrument(&self, params: MetricParameters, kind: MetricKind) -> BufferInstrument<I> {
let hole = LazyBufferInstrument::hole();
let _ = self.calls_tx.send(MetricEvent::Create {
self.calls_tx.send(MetricEvent::Create {
params,
kind,
populate_into: hole.clone(),
Expand All @@ -666,7 +681,7 @@ where
{
fn new_attributes(&self, opts: NewAttributes) -> MetricAttributes {
let ba = BufferAttributes::hole();
let _ = self.calls_tx.send(MetricEvent::CreateAttributes {
self.calls_tx.send(MetricEvent::CreateAttributes {
populate_into: ba.clone(),
append_from: None,
attributes: opts.attributes,
Expand All @@ -681,7 +696,7 @@ where
) -> MetricAttributes {
if let MetricAttributes::Buffer(ol) = existing {
let ba = BufferAttributes::hole();
let _ = self.calls_tx.send(MetricEvent::CreateAttributes {
self.calls_tx.send(MetricEvent::CreateAttributes {
populate_into: ba.clone(),
append_from: Some(ol),
attributes: attribs.attributes,
Expand Down Expand Up @@ -717,7 +732,7 @@ where
struct BufferInstrument<I: BufferInstrumentRef> {
kind: MetricKind,
instrument_ref: LazyBufferInstrument<I>,
tx: crossbeam::channel::Sender<MetricEvent<I>>,
tx: LogErrOnFullSender<MetricEvent<I>>,
}
impl<I> BufferInstrument<I>
where
Expand All @@ -728,7 +743,7 @@ where
MetricAttributes::Buffer(l) => l.clone(),
_ => panic!("MetricsCallBuffer only works with MetricAttributes::Lang"),
};
let _ = self.tx.send(MetricEvent::Update {
self.tx.send(MetricEvent::Update {
instrument: self.instrument_ref.clone(),
update: match self.kind {
MetricKind::Counter => MetricUpdateVal::Delta(value),
Expand Down

0 comments on commit 617612a

Please sign in to comment.