Skip to content

Commit

Permalink
Merge pull request #16 from tompro/aggregation-query
Browse files Browse the repository at this point in the history
Aggregation query
  • Loading branch information
tompro authored Dec 17, 2022
2 parents 758f532 + 671ac39 commit fbebaaf
Show file tree
Hide file tree
Showing 6 changed files with 424 additions and 201 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "redis_ts"
version = "0.4.2"
version = "0.5.0"
authors = ["protom <[email protected]>"]
keywords = ["redis", "database"]
description = "API for Redis time series types."
Expand Down
13 changes: 9 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
# redis_ts

[![crates.io](https://img.shields.io/badge/crates.io-v0.4.2-orange)](https://crates.io/crates/redis_ts)
[![crates.io](https://img.shields.io/badge/crates.io-v0.5.0-orange)](https://crates.io/crates/redis_ts)
![Continuous integration](https://github.com/tompro/redis_ts/workflows/Continuous%20integration/badge.svg)

Versions >= 0.5 contain a breaking change in the argument list of range queries. With some recent additions in the
Redis time series module the number of arguments for ts_range, ts_revrange, ts_mrange and ts_mrevrange have simply
grown to long. All existing and the new arguments are now replaced by a single `TsRangeQuery` struct for which there
is also a builder available.

redis_ts provides a small trait with extension functions for the
[redis](https://docs.rs/redis) crate to allow
working with redis time series data that can be installed as
a [redis module](https://oss.redislabs.com/redistimeseries). Time
series commands are available as synchronous and asynchronous versions.

The crate is called `redis_ts` and you can depend on it via cargo. You will
also need redis in your dependencies. It has been tested agains redis 0.22.1
also need redis in your dependencies. It has been tested against redis 0.22.1
but should work with versions higher than that.

```ini
[dependencies]
redis = "0.22.1"
redis_ts = "0.4.2"
redis_ts = "0.5.0"
```

Or via git:
Expand All @@ -30,7 +35,7 @@ With async feature inherited from the [redis](https://docs.rs/redis) crate (eith
```ini
[dependencies]
redis = "0.22.1"
redis_ts = { version = "0.4.2", features = ['tokio-comp'] }
redis_ts = { version = "0.5.0", features = ['tokio-comp'] }
```

## Synchronous usage
Expand Down
134 changes: 19 additions & 115 deletions src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,155 +196,62 @@ pub trait TsCommands: ConnectionLike + Sized {
}

#[doc(hidden)]
fn range<
K: ToRedisArgs,
FTS: ToRedisArgs,
TTS: ToRedisArgs,
C: ToRedisArgs,
TS: std::marker::Copy + FromRedisValue,
V: std::marker::Copy + FromRedisValue,
>(
fn range<K: ToRedisArgs, TS: Copy + FromRedisValue, V: Copy + FromRedisValue>(
&mut self,
command: &str,
key: K,
from_timestamp: FTS,
to_timestamp: TTS,
count: Option<C>,
aggregation_type: Option<TsAggregationType>,
query: TsRangeQuery,
) -> RedisResult<TsRange<TS, V>> {
let mut c = cmd(command);
c.arg(key).arg(from_timestamp).arg(to_timestamp);
if let Some(ct) = count {
c.arg("COUNT").arg(ct);
}
c.arg(aggregation_type).query(self)
c.arg(key).arg(query).query(self)
}

/// Executes a redis time series range query.
fn ts_range<
K: ToRedisArgs,
FTS: ToRedisArgs,
TTS: ToRedisArgs,
C: ToRedisArgs,
TS: std::marker::Copy + FromRedisValue,
V: std::marker::Copy + FromRedisValue,
>(
fn ts_range<K: ToRedisArgs, TS: Copy + FromRedisValue, V: Copy + FromRedisValue>(
&mut self,
key: K,
from_timestamp: FTS,
to_timestamp: TTS,
count: Option<C>,
aggregation_type: Option<TsAggregationType>,
query: TsRangeQuery,
) -> RedisResult<TsRange<TS, V>> {
self.range(
"TS.RANGE",
key,
from_timestamp,
to_timestamp,
count,
aggregation_type,
)
self.range("TS.RANGE", key, query)
}

/// Executes a redis time series revrange query.
fn ts_revrange<
K: ToRedisArgs,
FTS: ToRedisArgs,
TTS: ToRedisArgs,
C: ToRedisArgs,
TS: std::marker::Copy + FromRedisValue,
V: std::marker::Copy + FromRedisValue,
>(
fn ts_revrange<K: ToRedisArgs, TS: Copy + FromRedisValue, V: Copy + FromRedisValue>(
&mut self,
key: K,
from_timestamp: FTS,
to_timestamp: TTS,
count: Option<C>,
aggregation_type: Option<TsAggregationType>,
query: TsRangeQuery,
) -> RedisResult<TsRange<TS, V>> {
self.range(
"TS.REVRANGE",
key,
from_timestamp,
to_timestamp,
count,
aggregation_type,
)
self.range("TS.REVRANGE", key, query)
}

#[doc(hidden)]
fn mrange<
FTS: ToRedisArgs,
TTS: ToRedisArgs,
C: ToRedisArgs,
TS: std::default::Default + FromRedisValue + Copy,
V: std::default::Default + FromRedisValue + Copy,
>(
fn mrange<TS: Default + FromRedisValue + Copy, V: Default + FromRedisValue + Copy>(
&mut self,
command: &str,
from_timestamp: FTS,
to_timestamp: TTS,
count: Option<C>,
aggregation_type: Option<TsAggregationType>,
query: TsRangeQuery,
filter_options: TsFilterOptions,
) -> RedisResult<TsMrange<TS, V>> {
let mut c = cmd(command);
c.arg(from_timestamp).arg(to_timestamp);
if let Some(ct) = count {
c.arg("COUNT").arg(ct);
}
c.arg(aggregation_type).arg(filter_options);
c.arg(query).arg(filter_options);
c.query(self)
}

/// Executes multiple redis time series range queries.
fn ts_mrange<
FTS: ToRedisArgs,
TTS: ToRedisArgs,
C: ToRedisArgs,
TS: std::default::Default + FromRedisValue + Copy,
V: std::default::Default + FromRedisValue + Copy,
>(
fn ts_mrange<TS: Default + FromRedisValue + Copy, V: Default + FromRedisValue + Copy>(
&mut self,
from_timestamp: FTS,
to_timestamp: TTS,
count: Option<C>,
aggregation_type: Option<TsAggregationType>,
query: TsRangeQuery,
filter_options: TsFilterOptions,
) -> RedisResult<TsMrange<TS, V>> {
self.mrange(
"TS.MRANGE",
from_timestamp,
to_timestamp,
count,
aggregation_type,
filter_options,
)
self.mrange("TS.MRANGE", query, filter_options)
}

/// Executes multiple redis time series revrange queries.
fn ts_mrevrange<
FTS: ToRedisArgs,
TTS: ToRedisArgs,
C: ToRedisArgs,
TS: std::default::Default + FromRedisValue + Copy,
V: std::default::Default + FromRedisValue + Copy,
>(
fn ts_mrevrange<TS: Default + FromRedisValue + Copy, V: Default + FromRedisValue + Copy>(
&mut self,
from_timestamp: FTS,
to_timestamp: TTS,
count: Option<C>,
aggregation_type: Option<TsAggregationType>,
query: TsRangeQuery,
filter_options: TsFilterOptions,
) -> RedisResult<TsMrange<TS, V>> {
self.mrange(
"TS.MREVRANGE",
from_timestamp,
to_timestamp,
count,
aggregation_type,
filter_options,
)
self.mrange("TS.MREVRANGE", query, filter_options)
}

/// Returns the latest (current) value in a redis time series.
Expand All @@ -356,10 +263,7 @@ pub trait TsCommands: ConnectionLike + Sized {
}

/// Returns the latest (current) value from multiple redis time series.
fn ts_mget<
TS: std::default::Default + FromRedisValue,
V: std::default::Default + FromRedisValue,
>(
fn ts_mget<TS: Default + FromRedisValue, V: Default + FromRedisValue>(
&mut self,
filter_options: TsFilterOptions,
) -> RedisResult<TsMget<TS, V>> {
Expand Down
37 changes: 22 additions & 15 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
//! series commands are available as synchronous and asynchronous versions.
//!
//! The crate is called `redis_ts` and you can depend on it via cargo. You will
//! also need redis in your dependencies. It has been tested agains redis 0.22.1
//! also need redis in your dependencies. It has been tested against redis 0.22.1
//! but should work with versions higher than that.
//!
//! ```ini
//! [dependencies]
//! redis = "0.22.1"
//! redis_ts = "0.4.2"
//! redis_ts = "0.5.0"
//! ```
//!
//! Or via git:
Expand All @@ -26,7 +26,7 @@
//! ```ini
//! [dependencies]
//! redis = "0.22.1"
//! redis_ts = { version = "0.4.2", features = ['tokio-comp'] }
//! redis_ts = { version = "0.5.0", features = ['tokio-comp'] }
//! ```
//!
//! # Synchronous usage
Expand All @@ -35,7 +35,7 @@
//! redis_ts::TsCommands into the scope. All redis time series
//! commands will then be available on your redis connection.
//!
//!
//!
//! ```rust,no_run
//! # fn run() -> redis::RedisResult<()> {
//! use redis::Commands;
Expand Down Expand Up @@ -237,21 +237,26 @@
//! Query for a range of time series data.
//!
//! ```rust,no_run
//! # fn run() -> redis::RedisResult<()> {
//! fn run() -> redis::RedisResult<()> {
//! # use redis::Commands;
//! # use redis_ts::{TsCommands, TsRange, TsAggregationType};
//! # use redis_ts::{TsCommands, TsRange, TsAggregationType, TsRangeQuery};
//! # let client = redis::Client::open("redis://127.0.0.1/")?;
//! # let mut con = client.get_connection()?;
//! let first_three_avg:TsRange<u64,f64> = con.ts_range(
//! "my_engine", "-", "+", Some(3), Some(TsAggregationType::Avg(5000))
//! "my_engine",
//! TsRangeQuery::default()
//! .count(3)
//! .aggregation_type(TsAggregationType::Avg(5000))
//! )?;
//!
//! let range_raw:TsRange<u64,f64> = con.ts_range(
//! "my_engine", 1234, 5678, None::<usize>, None
//! "my_engine",
//! TsRangeQuery::default().from(1234).to(5678)
//! )?;
//!
//! let rev_range_raw:TsRange<u64,f64> = con.ts_revrange(
//! "my_engine", 1234, 5678, None::<usize>, None
//! "my_engine",
//! TsRangeQuery::default().from(1234).to(5678)
//! )?;
//! # Ok(()) }
//! ```
Expand All @@ -262,21 +267,21 @@
//! ```rust,no_run
//! # fn run() -> redis::RedisResult<()> {
//! # use redis::Commands;
//! # use redis_ts::{TsCommands, TsMrange, TsAggregationType, TsFilterOptions};
//! # use redis_ts::{TsCommands, TsMrange, TsAggregationType, TsFilterOptions, TsRangeQuery};
//! # let client = redis::Client::open("redis://127.0.0.1/")?;
//! # let mut con = client.get_connection()?;
//! let first_three_avg:TsMrange<u64,f64> = con.ts_mrange(
//! "-", "+", Some(3), Some(TsAggregationType::Avg(5000)),
//! TsRangeQuery::default().count(3).aggregation_type(TsAggregationType::Avg(5000)),
//! TsFilterOptions::default().equals("sensor", "temperature")
//! )?;
//!
//! let range_raw:TsMrange<u64,f64> = con.ts_mrange(
//! 1234, 5678, None::<usize>, None,
//! TsRangeQuery::default().from(1234).to(5678),
//! TsFilterOptions::default().equals("sensor", "temperature")
//! )?;
//!
//! let rev_range_raw:TsMrange<u64,f64> = con.ts_mrevrange(
//! 1234, 5678, None::<usize>, None,
//! TsRangeQuery::default().from(1234).to(5678),
//! TsFilterOptions::default().equals("sensor", "temperature")
//! )?;
//! # Ok(()) }
Expand Down Expand Up @@ -338,14 +343,16 @@
//! # Ok(()) }
//! ```
//!
extern crate core;

#[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
pub use crate::async_commands::AsyncTsCommands;

pub use crate::commands::TsCommands;

pub use crate::types::{
TsAggregationType, TsDuplicatePolicy, TsFilterOptions, TsInfo, TsMget, TsMrange, TsOptions,
TsRange,
TsAggregationType, TsAlign, TsBucketTimestamp, TsDuplicatePolicy, TsFilterOptions, TsInfo,
TsMget, TsMrange, TsOptions, TsRange, TsRangeQuery,
};

#[cfg(any(feature = "tokio-comp", feature = "async-std-comp"))]
Expand Down
Loading

0 comments on commit fbebaaf

Please sign in to comment.