diff --git a/Makefile b/Makefile
index 35cac22a..6ff9a9bd 100644
--- a/Makefile
+++ b/Makefile
@@ -34,7 +34,7 @@ faktory/tls:
 
 .PHONY: faktory/tls/kill
 faktory/tls/kill:
-	docker compose -f docker/compose.yml down
+	docker compose -f docker/compose.yml down -v
 
 .PHONY: test
 test:
diff --git a/src/lib.rs b/src/lib.rs
index 8f4617e0..f282e408 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -74,7 +74,10 @@ mod proto;
 mod worker;
 
 pub use crate::error::Error;
-pub use crate::proto::{Client, Job, JobBuilder, JobId, Reconnect, WorkerId};
+pub use crate::proto::{
+    Client, FaktoryServerProcessStats, FaktoryServiceStats, Job, JobBuilder, JobId, Reconnect,
+    ServerState, WorkerId,
+};
 pub use crate::worker::{JobRunner, Worker, WorkerBuilder};
 
 #[cfg(feature = "ent")]
diff --git a/src/proto/client/mod.rs b/src/proto/client/mod.rs
index 893f5bd7..ce6fa4e1 100644
--- a/src/proto/client/mod.rs
+++ b/src/proto/client/mod.rs
@@ -373,10 +373,10 @@ where
         Ok((jobs_count - errors.len(), Some(errors)))
     }
 
-    /// Retrieve information about the running server.
+    /// Retrieve [information](crate::ServerState) about the running server.
     ///
     /// The returned value is the result of running the `INFO` command on the server.
-    pub async fn info(&mut self) -> Result<serde_json::Value, Error> {
+    pub async fn info(&mut self) -> Result<ServerState, Error> {
         self.issue(&Info)
             .await?
             .read_json()
diff --git a/src/proto/mod.rs b/src/proto/mod.rs
index 5768b085..a92788d6 100644
--- a/src/proto/mod.rs
+++ b/src/proto/mod.rs
@@ -9,7 +9,9 @@ pub(crate) use client::{ClientOptions, HeartbeatStatus, EXPECTED_PROTOCOL_VERSIO
 
 mod single;
 
-pub use single::{Job, JobBuilder, JobId, WorkerId};
+pub use single::{
+    FaktoryServerProcessStats, FaktoryServiceStats, Job, JobBuilder, JobId, ServerState, WorkerId,
+};
 
 pub(crate) use single::{Ack, Fail, Info, Push, PushBulk, QueueAction, QueueControl};
diff --git a/src/proto/single/resp.rs b/src/proto/single/resp.rs
index 18e5a9b1..b24a9d72 100644
--- a/src/proto/single/resp.rs
+++ b/src/proto/single/resp.rs
@@ -1,7 +1,10 @@
+use std::collections::HashMap;
+
 #[cfg(feature = "ent")]
 use crate::ent::BatchId;
 use crate::error::{self, Error};
+use chrono::{DateTime, Utc};
 use tokio::io::{AsyncBufReadExt, AsyncReadExt};
 
 pub fn bad(expected: &'static str, got: &RawResponse) -> error::Protocol {
@@ -117,6 +120,88 @@ pub async fn read_ok(r: R) -> Result<(), Error> {
     Err(bad("server ok", &rr).into())
 }
 
+// ----------------------------------------------
+
+/// Faktory service stats.
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct FaktoryServiceStats {
+    /// Total number of job failures.
+    pub total_failures: u64,
+
+    /// Total number of processed jobs.
+    pub total_processed: u64,
+
+    /// Total number of enqueued jobs.
+    pub total_enqueued: u64,
+
+    /// Total number of queues.
+    pub total_queues: u64,
+
+    /// Queues stats.
+    ///
+    /// A mapping between a queue name and its size (number of jobs on the queue).
+    /// The keys of this map effectively make up a list of queues that are currently
+    /// registered in the Faktory service.
+    pub queues: HashMap<String, u64>,
+
+    /// Faktory's task runner stats.
+    ///
+    /// Note that this is exposed as a "generic" `serde_json::Value` since this info
+    /// belongs to deep implementation details of the Faktory service.
+    pub tasks: serde_json::Value,
+}
+
+/// Faktory's server process stats.
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct FaktoryServerProcessStats {
+    /// Faktory's description (e.g. "Faktory").
+    pub description: String,
+
+    /// Faktory's version as semver (e.g. "1.8.0").
+    pub faktory_version: String,
+
+    /// Faktory server process uptime in seconds.
+    pub uptime: u64,
+
+    /// Number of clients connected to the server.
+    pub connections: u64,
+
+    /// Number of executed commands.
+    pub command_count: u64,
+
+    /// Faktory server process memory usage in MB.
+    pub used_memory_mb: u64,
+}
+
+/// Current server state.
+///
+/// Contains such details as how many queues there are on the server, statistics on the jobs,
+/// as well as some specific info on server process memory usage, uptime, etc.
+///
+/// Here is an example of the simplest way to fetch info on the server state.
+/// ```no_run
+/// # tokio_test::block_on(async {
+/// use faktory::Client;
+///
+/// let mut client = Client::connect(None).await.unwrap();
+/// let _server_state = client.info().await.unwrap();
+/// # });
+/// ```
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct ServerState {
+    /// Server time.
+    pub now: DateTime<Utc>,
+
+    /// Server time as a string formatted as "%H:%M:%S UTC" (e.g. "19:47:39 UTC").
+    pub server_utc_time: String,
+
+    /// Faktory service stats.
+    pub faktory: FaktoryServiceStats,
+
+    /// Faktory's server process stats.
+    pub server: FaktoryServerProcessStats,
+}
+
 // ----------------------------------------------
 //
 // below is the implementation of the Redis RESP protocol
diff --git a/tests/real/community.rs b/tests/real/community.rs
index b473118f..9c1454bc 100644
--- a/tests/real/community.rs
+++ b/tests/real/community.rs
@@ -61,6 +61,63 @@ async fn roundtrip() {
     assert!(drained);
 }
 
+#[tokio::test(flavor = "multi_thread")]
+async fn server_state() {
+    skip_check!();
+
+    let local = "server_state";
+
+    // prepare a worker
+    let mut w = WorkerBuilder::default();
+    w.register(local, move |_| async move { Ok::<(), io::Error>(()) });
+    let mut w = w.connect(None).await.unwrap();
+
+    // prepare a producing client
+    let mut client = Client::connect(None).await.unwrap();
+
+    // examine server state before pushing anything
+    let server_state = client.info().await.unwrap();
+    assert_eq!(*server_state.faktory.queues.get(local).unwrap(), 0);
+
+    // the following two assertions are not super helpful, but
+    // there is not much info we can make meaningful assertions on anyway
+    // (like memory usage, server description string, version, etc.)
+    assert!(server_state.server.connections >= 2); // at least two clients from the current test
+    assert!(server_state.server.uptime > 0); // if IPC is happening, this should hold :)
+
+    // push 1 job
+    client
+        .enqueue(
+            JobBuilder::new(local)
+                .args(vec!["abc"])
+                .queue(local)
+                .build(),
+        )
+        .await
+        .unwrap();
+
+    // we only pushed 1 job on this queue
+    let server_state = client.info().await.unwrap();
+    assert_eq!(*server_state.faktory.queues.get(local).unwrap(), 1);
+    assert!(server_state.faktory.total_enqueued >= 1); // at least 1 job from this test
+    assert!(server_state.faktory.total_queues >= 1); // at least 1 queue from this test
+
+    // let's now consume that job ...
+    assert!(w.run_one(0, &[local]).await.unwrap());
+
+    // ... and verify the queue now has 0 pending jobs
+    //
+    // NB! If this is not passing locally, make sure to launch a fresh Faktory container,
+    // because if you have not pruned its volume _AND_ there somehow used to be a pending job
+    // on this local queue, you can imagine the test will fail. But generally, by consuming
+    // the jobs from the local queue, we are getting a clean-up for free and there is normally
+    // no need to purge docker volumes to perform the next test run.
+    // Also note that on CI we are always starting afresh.
+    let server_state = client.info().await.unwrap();
+    assert_eq!(*server_state.faktory.queues.get(local).unwrap(), 0);
+    assert!(server_state.faktory.total_processed >= 1); // at least 1 job from this test
+}
+
 #[tokio::test(flavor = "multi_thread")]
 async fn multi() {
     skip_check!();
@@ -222,9 +279,9 @@ async fn queue_control_actions() {
 
     // let's inspect the sever state
     let server_state = client.info().await.unwrap();
-    let queues = server_state.get("faktory").unwrap().get("queues").unwrap();
-    assert_eq!(queues.get(local_1).unwrap(), 1); // 1 job remaining
-    assert_eq!(queues.get(local_2).unwrap(), 1); // also 1 job remaining
+    let queues = &server_state.faktory.queues;
+    assert_eq!(*queues.get(local_1).unwrap(), 1); // 1 job remaining
+    assert_eq!(*queues.get(local_2).unwrap(), 1); // also 1 job remaining
 
     // let's now remove the queues
     client.queue_remove(&[local_1, local_2]).await.unwrap();
@@ -237,7 +294,7 @@ async fn queue_control_actions() {
 
     // let's inspect the sever state again
     let server_state = client.info().await.unwrap();
-    let queues = server_state.get("faktory").unwrap().get("queues").unwrap();
+    let queues = &server_state.faktory.queues;
     // our queue are not even mentioned in the server report:
     assert!(queues.get(local_1).is_none());
     assert!(queues.get(local_2).is_none());
@@ -303,9 +360,9 @@ async fn queue_control_actions_wildcard() {
 
     // let's inspect the sever state
    let server_state = client.info().await.unwrap();
-    let queues = server_state.get("faktory").unwrap().get("queues").unwrap();
-    assert_eq!(queues.get(local_1).unwrap(), 1); // 1 job remaining
-    assert_eq!(queues.get(local_2).unwrap(), 1); // also 1 job remaining
+    let queues = &server_state.faktory.queues;
+    assert_eq!(*queues.get(local_1).unwrap(), 1); // 1 job remaining
+    assert_eq!(*queues.get(local_2).unwrap(), 1); // also 1 job remaining
 
     // let's now remove all the queues
     client.queue_remove_all().await.unwrap();
@@ -318,7 +375,7 @@ async fn queue_control_actions_wildcard() {
 
     // let's inspect the sever state again
     let server_state = client.info().await.unwrap();
-    let queues = server_state.get("faktory").unwrap().get("queues").unwrap();
+    let queues = &server_state.faktory.queues;
     // our queue are not even mentioned in the server report:
     assert!(queues.get(local_1).is_none());
    assert!(queues.get(local_2).is_none());
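
As a quick illustration of what the typed `info()` result enables downstream, here is a minimal, hypothetical consumer sketch (not part of this change). It assumes a reachable Faktory instance and `tokio` with the `macros` feature enabled, and it only touches fields introduced in the diff above.

```rust
use faktory::Client;

#[tokio::main]
async fn main() -> Result<(), faktory::Error> {
    // `None` falls back to the client's default connection settings.
    let mut client = Client::connect(None).await?;

    // `info()` now yields a strongly typed `ServerState` instead of raw JSON.
    let state = client.info().await?;

    // Server process stats.
    println!(
        "{} v{} has been up for {}s and is serving {} connection(s)",
        state.server.description,
        state.server.faktory_version,
        state.server.uptime,
        state.server.connections,
    );

    // Queue depths come back as a plain map keyed by queue name.
    for (queue, depth) in &state.faktory.queues {
        println!("{queue}: {depth} pending job(s)");
    }

    Ok(())
}
```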