Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ServerState and Client::current_info #63

Merged
merged 17 commits into from
May 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ tokio = { version = "1.35.1", features = [
tokio-native-tls = { version = "0.3.1", optional = true }
tokio-rustls = { version = "0.25.0", optional = true }
url = "2"
semver = { version = "1.0.23", features = ["serde"] }

[dev-dependencies]
rustls-pki-types = "1.0.1"
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ faktory/tls:

.PHONY: faktory/tls/kill
faktory/tls/kill:
docker compose -f docker/compose.yml down
docker compose -f docker/compose.yml down -v

.PHONY: test
test:
Expand Down
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ mod proto;
mod worker;

pub use crate::error::Error;
pub use crate::proto::{Client, Job, JobBuilder, JobId, Reconnect, WorkerId};
pub use crate::proto::{
Client, DataSnapshot, FaktoryState, Job, JobBuilder, JobId, Reconnect, ServerSnapshot, WorkerId,
};
pub use crate::worker::{JobRunner, Worker, WorkerBuilder};

#[cfg(feature = "ent")]
Expand Down
4 changes: 2 additions & 2 deletions src/proto/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,10 +358,10 @@ where
Ok((jobs_count - errors.len(), Some(errors)))
}

/// Retrieve information about the running server.
/// Retrieve [information](crate::ServerSnapshot) about the running server.
///
/// The returned value is the result of running the `INFO` command on the server.
pub async fn info(&mut self) -> Result<serde_json::Value, Error> {
pub async fn current_info(&mut self) -> Result<single::FaktoryState, Error> {
self.issue(&Info)
.await?
.read_json()
Expand Down
2 changes: 1 addition & 1 deletion src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub(crate) use client::{ClientOptions, HeartbeatStatus, EXPECTED_PROTOCOL_VERSIO

mod single;

pub use single::{Job, JobBuilder, JobId, WorkerId};
pub use single::{DataSnapshot, FaktoryState, Job, JobBuilder, JobId, ServerSnapshot, WorkerId};

pub(crate) use single::{Ack, Fail, Info, Push, PushBulk, QueueAction, QueueControl};

Expand Down
4 changes: 2 additions & 2 deletions src/proto/single/ent/progress.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::proto::single::JobId;

use super::utils;
use crate::proto::single::JobId;
use chrono::{DateTime, Utc};
use derive_builder::Builder;

/// Info on job execution progress (sent).
///
/// In Enterprise Faktory, a client executing a job can report on the execution
Expand Down
112 changes: 108 additions & 4 deletions src/proto/single/resp.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use super::utils;
use crate::error::{self, Error};
use chrono::{DateTime, Utc};
use std::collections::BTreeMap;
use std::time::Duration;
use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt};

#[cfg(feature = "ent")]
use crate::ent::BatchId;

use crate::error::{self, Error};
use tokio::io::AsyncBufRead;

pub fn bad(expected: &'static str, got: &RawResponse) -> error::Protocol {
let stringy = match *got {
RawResponse::String(ref s) => Some(&**s),
Expand Down Expand Up @@ -118,6 +122,107 @@ pub async fn read_ok<R: AsyncBufRead + Unpin>(r: R) -> Result<(), Error> {
Err(bad("server ok", &rr).into())
}

// ----------------------------------------------

/// Faktory service information.
///
/// This holds information on the registered [queues](DataSnapshot::queues) as well as
/// some aggregated data, e.g. total number of jobs [processed](DataSnapshot::total_processed),
/// total number of jobs [enqueued](DataSnapshot::total_enqueued), etc.
#[derive(Serialize, Deserialize, Debug, Clone)]
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
#[non_exhaustive]
pub struct DataSnapshot {
/// Total number of job failures.
pub total_failures: u64,

/// Total number of processed jobs.
pub total_processed: u64,

/// Total number of enqueued jobs.
pub total_enqueued: u64,

/// Total number of queues.
pub total_queues: u64,

/// Queues stats.
///
/// A mapping between a queue name and its size (number of jobs on the queue).
/// The keys of this map effectively make up a list of queues that are currently
/// registered in the Faktory service.
pub queues: BTreeMap<String, u64>,

/// ***Deprecated***. Faktory's task runner stats.
///
/// Note that this is exposed as a "generic" `serde_json::Value` since this info
/// belongs to deep implementation details of the Faktory service.
#[deprecated(
note = "marked as deprecated in the Faktory source code and is likely to be completely removed in the future, so please do not rely on this data"
)]
pub tasks: serde_json::Value,
}

/// Faktory's server process information.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ServerSnapshot {
/// Description of the server process (e.g. "Faktory").
pub description: String,

/// Faktory's version as semver.
#[serde(rename = "faktory_version")]
pub version: semver::Version,

/// Faktory server process uptime in seconds.
#[serde(deserialize_with = "utils::deser_duration")]
#[serde(serialize_with = "utils::ser_duration")]
pub uptime: Duration,

/// Number of clients connected to the server.
pub connections: u64,

/// Number of executed commands.
pub command_count: u64,

/// Faktory server process memory usage.
pub used_memory_mb: u64,
}

/// Current server state.
///
/// Contains such details as how many queues there are on the server, statistics on the jobs,
/// as well as some specific info on server process memory usage, uptime, etc.
///
/// Here is an example of the simplest way to fetch info on the server state.
/// ```no_run
/// # tokio_test::block_on(async {
/// use faktory::Client;
///
/// let mut client = Client::connect(None).await.unwrap();
/// let _server_state = client.current_info().await.unwrap();
/// # });
/// ```
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FaktoryState {
/// Server time.
pub now: DateTime<Utc>,

/// Server time (naive representation).
///
/// Faktory sends it as a string formatted as "%H:%M:%S UTC" (e.g. "19:47:39 UTC")
/// and it is being parsed as `NaiveTime`.
///
/// Most of the time, though, you will want to use [`FaktoryState::now`] instead.
#[serde(deserialize_with = "utils::deser_server_time")]
#[serde(serialize_with = "utils::ser_server_time")]
pub server_utc_time: chrono::naive::NaiveTime,

/// Faktory service information.
#[serde(rename = "faktory")]
pub data: DataSnapshot,

/// Faktory's server process information.
pub server: ServerSnapshot,
}

// ----------------------------------------------
//
// below is the implementation of the Redis RESP protocol
Expand All @@ -132,7 +237,6 @@ pub enum RawResponse {
Null,
}

use tokio::io::{AsyncBufReadExt, AsyncReadExt};
async fn read<R>(mut r: R) -> Result<RawResponse, Error>
where
R: AsyncBufRead + Unpin,
Expand Down
80 changes: 80 additions & 0 deletions src/proto/single/utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use chrono::naive::NaiveTime;
use rand::{thread_rng, Rng};
use serde::{de::Deserializer, Deserialize, Serializer};
use std::time::Duration;

const JOB_ID_LENGTH: usize = 16;
const WORKER_ID_LENGTH: usize = 32;
Expand All @@ -19,6 +22,44 @@ pub(crate) fn gen_random_wid() -> String {
gen_random_id(WORKER_ID_LENGTH)
}

pub(crate) fn ser_duration<S>(value: &Duration, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let secs = value.as_secs();
serializer.serialize_u64(secs)
}

pub(crate) fn deser_duration<'de, D>(value: D) -> Result<Duration, D::Error>
where
D: Deserializer<'de>,
{
let secs = u64::deserialize(value)?;
Ok(Duration::from_secs(secs))
}

pub(crate) fn ser_server_time<S>(value: &NaiveTime, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&format!("{} UTC", value))
}

pub(crate) fn deser_server_time<'de, D>(value: D) -> Result<NaiveTime, D::Error>
where
D: Deserializer<'de>,
{
let naive_time_str = String::deserialize(value)?;
let naive_time_str = naive_time_str
.strip_suffix(" UTC")
.ok_or(serde::de::Error::custom(
"Expected a naive time string that ends with ' UTC'",
))?;
let naive_time =
NaiveTime::parse_from_str(naive_time_str, "%H:%M:%S").map_err(serde::de::Error::custom)?;
Ok(naive_time)
}

#[cfg(test)]
mod test {
use super::*;
Expand Down Expand Up @@ -49,4 +90,43 @@ mod test {
}
assert_eq!(ids.len(), 1_000_000);
}

#[test]
fn test_ser_deser_duration() {
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct FaktoryServer {
#[serde(deserialize_with = "deser_duration")]
#[serde(serialize_with = "ser_duration")]
uptime: Duration,
}

let server = FaktoryServer {
uptime: Duration::from_secs(2024),
};

let serialized = serde_json::to_string(&server).expect("serialized ok");
let deserialized = serde_json::from_str(&serialized).expect("deserialized ok");
assert_eq!(server, deserialized);
}

#[test]
fn test_ser_deser_server_time() {
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct FaktoryServer {
/// Server time as a string formatted as "%H:%M:%S UTC" (e.g. "19:47:39 UTC").
#[serde(deserialize_with = "deser_server_time")]
#[serde(serialize_with = "ser_server_time")]
pub server_utc_time: NaiveTime,
}

let server = FaktoryServer {
server_utc_time: NaiveTime::from_hms_opt(19, 47, 39).expect("valid"),
};

let serialized = serde_json::to_string(&server).expect("serialized ok");
assert_eq!(serialized, "{\"server_utc_time\":\"19:47:39 UTC\"}");

let deserialized = serde_json::from_str(&serialized).expect("deserialized ok");
assert_eq!(server, deserialized);
}
}
Loading
Loading