Commit
Make Client::info return ServerState struct
rustworthy committed Apr 18, 2024
1 parent c49f972 commit 8cb472e
Showing 6 changed files with 160 additions and 13 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -34,7 +34,7 @@ faktory/tls:

.PHONY: faktory/tls/kill
faktory/tls/kill:
docker compose -f docker/compose.yml down
docker compose -f docker/compose.yml down -v

.PHONY: test
test:
5 changes: 4 additions & 1 deletion src/lib.rs
@@ -74,7 +74,10 @@ mod proto;
mod worker;

pub use crate::error::Error;
pub use crate::proto::{Client, Job, JobBuilder, JobId, Reconnect, WorkerId};
pub use crate::proto::{
Client, FaktoryServerProcessStats, FaktoryServiceStats, Job, JobBuilder, JobId, Reconnect,
ServerState, WorkerId,
};
pub use crate::worker::{JobRunner, Worker, WorkerBuilder};

#[cfg(feature = "ent")]
4 changes: 2 additions & 2 deletions src/proto/client/mod.rs
@@ -373,10 +373,10 @@ where
Ok((jobs_count - errors.len(), Some(errors)))
}

/// Retrieve information about the running server.
/// Retrieve [information](crate::ServerState) about the running server.
///
/// The returned value is the result of running the `INFO` command on the server.
pub async fn info(&mut self) -> Result<serde_json::Value, Error> {
pub async fn info(&mut self) -> Result<single::ServerState, Error> {
self.issue(&Info)
.await?
.read_json()
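For orientation, here is a minimal sketch (not part of the commit) of how calling code consumes the new return type: `info()` now yields a typed `ServerState` instead of a raw `serde_json::Value`, so fields are accessed directly rather than via `.get(..)` chains. Field names come from the structs introduced in `src/proto/single/resp.rs` below; the `tokio` runtime attribute and error handling are simplified for brevity.

```rust
use faktory::Client;

#[tokio::main]
async fn main() -> Result<(), faktory::Error> {
    // `None` lets the client fall back to its default connection settings.
    let mut client = Client::connect(None).await?;

    // `info()` issues the `INFO` command and now parses the reply into `ServerState`.
    let state = client.info().await?;

    println!("server uptime: {}s", state.server.uptime);
    println!("total enqueued: {}", state.faktory.total_enqueued);
    for (queue, size) in &state.faktory.queues {
        println!("queue {queue}: {size} pending job(s)");
    }
    Ok(())
}
```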
4 changes: 3 additions & 1 deletion src/proto/mod.rs
@@ -9,7 +9,9 @@ pub(crate) use client::{ClientOptions, HeartbeatStatus, EXPECTED_PROTOCOL_VERSIO

mod single;

pub use single::{Job, JobBuilder, JobId, WorkerId};
pub use single::{
FaktoryServerProcessStats, FaktoryServiceStats, Job, JobBuilder, JobId, ServerState, WorkerId,
};

pub(crate) use single::{Ack, Fail, Info, Push, PushBulk, QueueAction, QueueControl};

85 changes: 85 additions & 0 deletions src/proto/single/resp.rs
@@ -1,7 +1,10 @@
use std::collections::HashMap;

#[cfg(feature = "ent")]
use crate::ent::BatchId;

use crate::error::{self, Error};
use chrono::{DateTime, Utc};
use tokio::io::{AsyncBufReadExt, AsyncReadExt};

pub fn bad(expected: &'static str, got: &RawResponse) -> error::Protocol {
@@ -117,6 +120,88 @@ pub async fn read_ok<R: AsyncBufReadExt + Unpin>(r: R) -> Result<(), Error> {
Err(bad("server ok", &rr).into())
}

// ----------------------------------------------

/// Faktory service stats.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FaktoryServiceStats {
/// Total number of job failures.
pub total_failures: u64,

/// Total number of processed jobs.
pub total_processed: u64,

/// Total number of enqueued jobs.
pub total_enqueued: u64,

/// Total number of queues.
pub total_queues: u64,

/// Queues stats.
///
/// A mapping between a queue name and its size (number of jobs on the queue).
/// The keys of this map effectively make up a list of queues that are currently
/// registered in the Faktory service.
pub queues: HashMap<String, u64>,

/// Faktory's task runner stats.
///
/// Note that this is exposed as a "generic" `serde_json::Value`, since this info
/// reflects deep implementation details of the Faktory service.
pub tasks: serde_json::Value,
}

/// Faktory's server process stats.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FaktoryServerProcessStats {
/// Faktory's description (e.g. "Faktory").
pub description: String,

/// Faktory's version as semver (e.g. "1.8.0").
pub faktory_version: String,

/// Faktory server process uptime in seconds.
pub uptime: u64,

/// Number of clients connected to the server.
pub connections: u64,

/// Number of executed commands.
pub command_count: u64,

/// Faktory server process memory usage in MB.
pub used_memory_mb: u64,
}

/// Current server state.
///
/// Contains such details as how many queues there are on the server, statistics on the jobs,
/// as well as some specific info on server process memory usage, uptime, etc.
///
/// Here is an example of the simplest way to fetch info on the server state.
/// ```no_run
/// # tokio_test::block_on(async {
/// use faktory::Client;
///
/// let mut client = Client::connect(None).await.unwrap();
/// let _server_state = client.info().await.unwrap();
/// # });
/// ```
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ServerState {
/// Server time.
pub now: DateTime<Utc>,

/// Server time as a string formatted as "%H:%M:%S UTC" (e.g. "19:47:39 UTC").
pub server_utc_time: String,

/// Faktory service stats.
pub faktory: FaktoryServiceStats,

/// Faktory's server process stats.
pub server: FaktoryServerProcessStats,
}

// ----------------------------------------------
//
// below is the implementation of the Redis RESP protocol
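As a sanity check on the shape these structs expect, here is an illustrative sketch (hand-written payload, not captured from a real server) that deserializes an `INFO`-style JSON document into `ServerState`. The real response from Faktory may carry additional fields, which serde ignores by default when deserializing; `serde_json` is assumed to be available as a dependency of the calling crate.

```rust
use faktory::ServerState;

fn main() -> Result<(), serde_json::Error> {
    // Hand-written payload covering only the fields the structs above capture.
    let payload = serde_json::json!({
        "now": "2024-04-18T19:47:39Z",
        "server_utc_time": "19:47:39 UTC",
        "faktory": {
            "total_failures": 0,
            "total_processed": 12,
            "total_enqueued": 3,
            "total_queues": 2,
            "queues": { "default": 3 },
            "tasks": {}
        },
        "server": {
            "description": "Faktory",
            "faktory_version": "1.8.0",
            "uptime": 42,
            "connections": 2,
            "command_count": 7,
            "used_memory_mb": 16
        }
    });

    let state: ServerState = serde_json::from_value(payload)?;
    assert_eq!(*state.faktory.queues.get("default").unwrap(), 3);
    assert_eq!(state.server.faktory_version, "1.8.0");
    Ok(())
}
```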
73 changes: 65 additions & 8 deletions tests/real/community.rs
@@ -61,6 +61,63 @@ async fn roundtrip() {
assert!(drained);
}

#[tokio::test(flavor = "multi_thread")]
async fn server_state() {
skip_check!();

let local = "server_state";

// prepare a worker
let mut w = WorkerBuilder::default();
w.register(local, move |_| async move { Ok::<(), io::Error>(()) });
let mut w = w.connect(None).await.unwrap();

// prepare a producing client
let mut client = Client::connect(None).await.unwrap();

// examine server state before pushing anything
let server_state = client.info().await.unwrap();
assert_eq!(*server_state.faktory.queues.get(local).unwrap(), 0);

// the following two assertions are not super-helpful, but there is not
// much info we could make meaningful assertions about anyway
// (like memory usage, server description string, version, etc.)
assert!(server_state.server.connections >= 2); // at least two clients from the current test
assert!(server_state.server.uptime > 0); // if IPC is happening, this should hold :)

// push 1 job
client
.enqueue(
JobBuilder::new(local)
.args(vec!["abc"])
.queue(local)
.build(),
)
.await
.unwrap();

// we only pushed 1 job on this queue
let server_state = client.info().await.unwrap();
assert_eq!(*server_state.faktory.queues.get(local).unwrap(), 1);
assert!(server_state.faktory.total_enqueued >= 1); // at least 1 job from this test
assert!(server_state.faktory.total_queues >= 1); // at least 1 queue from this test

// let's now consume that job ...
assert!(w.run_one(0, &[local]).await.unwrap());

// ... and verify the queue now has 0 pending jobs
//
// NB! If this is not passing locally, make sure to launch a fresh Faktory container,
// because if you have not pruned its volume _AND_ there happened to be a pending job
// on this local queue, the test will fail. Generally though, by consuming
// the jobs from the local queue, we get a clean-up for free and there is normally
// no need to purge docker volumes between test runs.
// Also note that on CI we always start afresh.
let server_state = client.info().await.unwrap();
assert_eq!(*server_state.faktory.queues.get(local).unwrap(), 0);
assert!(server_state.faktory.total_processed >= 1); // at least 1 job from this test
}

#[tokio::test(flavor = "multi_thread")]
async fn multi() {
skip_check!();
@@ -222,9 +279,9 @@ async fn queue_control_actions() {

// let's inspect the server state
let server_state = client.info().await.unwrap();
let queues = server_state.get("faktory").unwrap().get("queues").unwrap();
assert_eq!(queues.get(local_1).unwrap(), 1); // 1 job remaining
assert_eq!(queues.get(local_2).unwrap(), 1); // also 1 job remaining
let queues = &server_state.faktory.queues;
assert_eq!(*queues.get(local_1).unwrap(), 1); // 1 job remaining
assert_eq!(*queues.get(local_2).unwrap(), 1); // also 1 job remaining

// let's now remove the queues
client.queue_remove(&[local_1, local_2]).await.unwrap();
@@ -237,7 +294,7 @@

// let's inspect the server state again
let server_state = client.info().await.unwrap();
let queues = server_state.get("faktory").unwrap().get("queues").unwrap();
let queues = &server_state.faktory.queues;
// our queues are not even mentioned in the server report:
assert!(queues.get(local_1).is_none());
assert!(queues.get(local_2).is_none());
@@ -303,9 +360,9 @@ async fn queue_control_actions_wildcard() {

// let's inspect the server state
let server_state = client.info().await.unwrap();
let queues = server_state.get("faktory").unwrap().get("queues").unwrap();
assert_eq!(queues.get(local_1).unwrap(), 1); // 1 job remaining
assert_eq!(queues.get(local_2).unwrap(), 1); // also 1 job remaining
let queues = &server_state.faktory.queues;
assert_eq!(*queues.get(local_1).unwrap(), 1); // 1 job remaining
assert_eq!(*queues.get(local_2).unwrap(), 1); // also 1 job remaining

// let's now remove all the queues
client.queue_remove_all().await.unwrap();
@@ -318,7 +375,7 @@

// let's inspect the server state again
let server_state = client.info().await.unwrap();
let queues = server_state.get("faktory").unwrap().get("queues").unwrap();
let queues = &server_state.faktory.queues;
// our queues are not even mentioned in the server report:
assert!(queues.get(local_1).is_none());
assert!(queues.get(local_2).is_none());
