diff --git a/Cargo.lock b/Cargo.lock index 2f78d2c..9f649f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4339,7 +4339,7 @@ checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" [[package]] name = "logs-collector" -version = "1.0.11" +version = "1.0.12" dependencies = [ "anyhow", "clap", @@ -4347,6 +4347,7 @@ dependencies = [ "env_logger", "futures", "log", + "rand 0.8.5", "serde_json", "sqd-contract-client", "sqd-messages", diff --git a/crates/logs-collector/Cargo.toml b/crates/logs-collector/Cargo.toml index 261e262..34388ea 100644 --- a/crates/logs-collector/Cargo.toml +++ b/crates/logs-collector/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "logs-collector" -version = "1.0.11" +version = "1.0.12" edition = "2021" [dependencies] @@ -9,6 +9,7 @@ clap = { version = "4", features = ["derive", "env"] } env_logger = "0.11" futures = "0.3" log = "0.4" +rand = "0.8.5" serde_json = "1" tokio = { version = "1", features = ["full"] } diff --git a/crates/logs-collector/src/server.rs b/crates/logs-collector/src/server.rs index df4fe0a..c7df584 100644 --- a/crates/logs-collector/src/server.rs +++ b/crates/logs-collector/src/server.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use std::time::Duration; use futures::{Stream, StreamExt}; +use rand::seq::IteratorRandom; use tokio::signal::unix::{signal, SignalKind}; use tokio::sync::RwLock; @@ -130,6 +131,12 @@ where Ok(seq_nums) => seq_nums, Err(e) => return log::error!("Error saving logs to storage: {e:?}"), }; + // The pubsub message limit size prevents us from sending all sequence numbers + let sequence_numbers = sequence_numbers + .into_iter() + .choose_multiple(&mut rand::thread_rng(), 900) + .into_iter() + .collect(); let logs_collected = LogsCollected { sequence_numbers }; if transport_handle.logs_collected(logs_collected).is_err() { log::error!("Error broadcasting logs collected: queue full");