Skip to content

Commit

Permalink
chore: add metrics to new storage queue impl
Browse files Browse the repository at this point in the history
  • Loading branch information
Horusiath committed Nov 16, 2024
1 parent 2112d95 commit a366053
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 26 deletions.
1 change: 1 addition & 0 deletions services/appflowy-collaborate/src/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
collab_storage_access_control,
snapshot_control,
rt_cmd_tx,
metrics.collab_metrics.clone(),
));
let app_state = AppState {
config: Arc::new(config.clone()),
Expand Down
30 changes: 23 additions & 7 deletions services/appflowy-collaborate/src/collab/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub struct CollabStorageImpl<AC> {
snapshot_control: SnapshotControl,
rt_cmd_sender: CLCommandSender,
queue: Sender<PendingCollabWrite>,
metrics: Arc<CollabMetrics>,
}

impl<AC> CollabStorageImpl<AC>
Expand All @@ -74,23 +75,30 @@ where
access_control: AC,
snapshot_control: SnapshotControl,
rt_cmd_sender: CLCommandSender,
metrics: Arc<CollabMetrics>,
) -> Self {
let (queue, reader) = channel(1000);
{
let cache = cache.clone();
tokio::spawn(Self::periodic_write_task(cache, reader));
}
tokio::spawn(Self::periodic_write_task(
cache.clone(),
metrics.clone(),
reader,
));
Self {
cache,
access_control,
snapshot_control,
rt_cmd_sender,
queue,
metrics,
}
}

const PENDING_WRITE_BUF_CAPACITY: usize = 100;
async fn periodic_write_task(cache: CollabCache, mut reader: Receiver<PendingCollabWrite>) {
async fn periodic_write_task(
cache: CollabCache,
metrics: Arc<CollabMetrics>,
mut reader: Receiver<PendingCollabWrite>,
) {
let mut buf = Vec::with_capacity(Self::PENDING_WRITE_BUF_CAPACITY);
loop {
let n = reader
Expand All @@ -100,14 +108,15 @@ where
break;
}
let pending = buf.drain(..n);
if let Err(e) = Self::persist(&cache, pending).await {
if let Err(e) = Self::persist(&cache, &metrics, pending).await {
tracing::error!("failed to persist {} collabs: {}", n, e);
}
}
}

async fn persist(
cache: &CollabCache,
metrics: &CollabMetrics,
records: impl ExactSizeIterator<Item = PendingCollabWrite>,
) -> Result<(), AppError> {
// Start a database transaction
Expand All @@ -118,7 +127,8 @@ where
.context("Failed to acquire transaction for writing pending collaboration data")
.map_err(AppError::from)?;

tracing::trace!("persisting {} collabs", records.len());
let total_records = records.len();
let mut successful_writes = 0;
// Insert each record into the database within the transaction context
let mut action_description = String::new();
for (index, record) in records.into_iter().enumerate() {
Expand All @@ -137,9 +147,13 @@ where
sqlx::query(&format!("ROLLBACK TO SAVEPOINT {}", savepoint_name))
.execute(transaction.deref_mut())
.await?;
} else {
successful_writes += 1;
}
}

metrics.record_write_collab(successful_writes, total_records as _);

// Commit the transaction to finalize all writes
match tokio::time::timeout(Duration::from_secs(10), transaction.commit()).await {
Ok(result) => {
Expand Down Expand Up @@ -305,6 +319,8 @@ where
let pending = PendingCollabWrite::new(workspace_id.into(), *uid, params);
if let Err(e) = self.queue.send(pending).await {
tracing::error!("Failed to queue insert collab doc state: {}", e);
} else {
self.metrics.record_queue_collab(1);
}
Ok(())
}
Expand Down
30 changes: 11 additions & 19 deletions services/appflowy-collaborate/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::sync::Arc;
use std::time::Duration;

use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::metrics::histogram::Histogram;
use prometheus_client::registry::Registry;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::interval;

use database::collab::CollabStorage;
Expand Down Expand Up @@ -146,10 +146,9 @@ where
pub struct CollabMetrics {
success_write_snapshot_count: Gauge,
total_write_snapshot_count: Gauge,
success_write_collab_count: Gauge,
total_write_collab_count: Gauge,
total_queue_collab_count: Gauge,
success_queue_collab_count: Gauge,
success_write_collab_count: Counter,
total_write_collab_count: Counter,
success_queue_collab_count: Counter,
}

impl CollabMetrics {
Expand All @@ -159,7 +158,6 @@ impl CollabMetrics {
total_write_snapshot_count: Default::default(),
success_write_collab_count: Default::default(),
total_write_collab_count: Default::default(),
total_queue_collab_count: Default::default(),
success_queue_collab_count: Default::default(),
}
}
Expand Down Expand Up @@ -192,11 +190,6 @@ impl CollabMetrics {
"success queue collab",
metrics.success_queue_collab_count.clone(),
);
realtime_registry.register(
"total_queue_collab_count",
"total queue pending collab",
metrics.total_queue_collab_count.clone(),
);

metrics
}
Expand All @@ -206,13 +199,12 @@ impl CollabMetrics {
self.total_write_snapshot_count.set(total_attempt);
}

pub fn record_write_collab(&self, success_attempt: i64, total_attempt: i64) {
self.success_write_collab_count.set(success_attempt);
self.total_write_collab_count.set(total_attempt);
pub fn record_write_collab(&self, success_attempt: u64, total_attempt: u64) {
self.success_write_collab_count.inc_by(success_attempt);
self.total_write_collab_count.inc_by(total_attempt);
}

pub fn record_queue_collab(&self, success_attempt: i64, total_attempt: i64) {
self.success_queue_collab_count.set(success_attempt);
self.total_queue_collab_count.set(total_attempt);
pub fn record_queue_collab(&self, attempt: u64) {
self.success_queue_collab_count.inc_by(attempt);
}
}
1 change: 1 addition & 0 deletions src/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
collab_storage_access_control,
snapshot_control,
rt_cmd_tx,
metrics.collab_metrics.clone(),
));

info!(
Expand Down

0 comments on commit a366053

Please sign in to comment.