Skip to content

Commit

Permalink
Fix MessageTooLarge error in LogsCollected broadcast
Browse files Browse the repository at this point in the history
  • Loading branch information
kalabukdima committed Sep 10, 2024
1 parent 636e978 commit 986baad
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 2 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/logs-collector/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "logs-collector"
version = "1.0.11"
version = "1.0.12"
edition = "2021"

[dependencies]
Expand All @@ -9,6 +9,7 @@ clap = { version = "4", features = ["derive", "env"] }
env_logger = "0.11"
futures = "0.3"
log = "0.4"
rand = "0.8.5"
serde_json = "1"
tokio = { version = "1", features = ["full"] }

Expand Down
7 changes: 7 additions & 0 deletions crates/logs-collector/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;
use std::time::Duration;

use futures::{Stream, StreamExt};
use rand::seq::IteratorRandom;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::RwLock;

Expand Down Expand Up @@ -130,6 +131,12 @@ where
Ok(seq_nums) => seq_nums,
Err(e) => return log::error!("Error saving logs to storage: {e:?}"),
};
// The pubsub message limit size prevents us from sending all sequence numbers
let sequence_numbers = sequence_numbers
.into_iter()
.choose_multiple(&mut rand::thread_rng(), 900)
.into_iter()
.collect();
let logs_collected = LogsCollected { sequence_numbers };
if transport_handle.logs_collected(logs_collected).is_err() {
log::error!("Error broadcasting logs collected: queue full");
Expand Down

0 comments on commit 986baad

Please sign in to comment.