Skip to content

Commit

Permalink
aggregation query for async commands + tests
Browse files Browse the repository at this point in the history
  • Loading branch information
avaziman committed Mar 4, 2023
1 parent 4fa3bf0 commit 9bbad43
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 131 deletions.
127 changes: 29 additions & 98 deletions src/async_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,9 @@ pub trait AsyncTsCommands: ConnectionLike + Send + Sized {

/// Returns the latest (current) value from multiple redis time series.
fn ts_mget<
TS: std::default::Default + FromRedisValue,
V: std::default::Default + FromRedisValue,
'a,
TS: Default + FromRedisValue + 'a,
V: Default + FromRedisValue + 'a,
>(
&mut self,
filter_options: TsFilterOptions,
Expand All @@ -325,158 +326,88 @@ pub trait AsyncTsCommands: ConnectionLike + Send + Sized {
fn range<
'a,
K: ToRedisArgs + Send + Sync + 'a,
FTS: ToRedisArgs + Send + Sync + 'a,
TTS: ToRedisArgs + Send + Sync + 'a,
C: ToRedisArgs + Send + Sync + 'a,
TS: std::marker::Copy + FromRedisValue,
V: std::marker::Copy + FromRedisValue,
TS: Default + FromRedisValue + Copy,
V: Default + FromRedisValue + Copy,
>(
&'a mut self,
command: &str,
key: K,
from_timestamp: FTS,
to_timestamp: TTS,
count: Option<C>,
aggregation_type: Option<TsAggregationType>,
query: TsRangeQuery,
) -> RedisFuture<TsRange<TS, V>> {
let mut c = cmd(command);
c.arg(key).arg(from_timestamp).arg(to_timestamp);
if let Some(ct) = count {
c.arg("COUNT").arg(ct);
}
Box::pin(async move { c.arg(aggregation_type).query_async(self).await })
c.arg(key).arg(query);
Box::pin(async move { c.query_async(self).await })
}

/// Executes a redis time series range query.
fn ts_range<
'a,
K: ToRedisArgs + Send + Sync + 'a,
FTS: ToRedisArgs + Send + Sync + 'a,
TTS: ToRedisArgs + Send + Sync + 'a,
C: ToRedisArgs + Send + Sync + 'a,
TS: std::marker::Copy + FromRedisValue,
V: std::marker::Copy + FromRedisValue,
TS: Default + FromRedisValue + Copy,
V: Default + FromRedisValue + Copy,
>(
&'a mut self,
key: K,
from_timestamp: FTS,
to_timestamp: TTS,
count: Option<C>,
aggregation_type: Option<TsAggregationType>,
query: TsRangeQuery,
) -> RedisFuture<TsRange<TS, V>> {
self.range(
"TS.RANGE",
key,
from_timestamp,
to_timestamp,
count,
aggregation_type,
)
self.range("TS.RANGE", key, query)
}

/// Executes a redis time series revrange query.
fn ts_revrange<
'a,
K: ToRedisArgs + Send + Sync + 'a,
FTS: ToRedisArgs + Send + Sync + 'a,
TTS: ToRedisArgs + Send + Sync + 'a,
C: ToRedisArgs + Send + Sync + 'a,
TS: std::marker::Copy + FromRedisValue,
V: std::marker::Copy + FromRedisValue,
TS: Default + FromRedisValue + Copy,
V: Default + FromRedisValue + Copy,
>(
&'a mut self,
key: K,
from_timestamp: FTS,
to_timestamp: TTS,
count: Option<C>,
aggregation_type: Option<TsAggregationType>,
query: TsRangeQuery,
) -> RedisFuture<TsRange<TS, V>> {
self.range(
"TS.REVRANGE",
key,
from_timestamp,
to_timestamp,
count,
aggregation_type,
)
self.range("TS.REVRANGE", key, query)
}

#[doc(hidden)]
fn mrange<
'a,
FTS: ToRedisArgs + Send + Sync + 'a,
TTS: ToRedisArgs + Send + Sync + 'a,
C: ToRedisArgs + Send + Sync + 'a,
TS: std::default::Default + FromRedisValue + Copy,
V: std::default::Default + FromRedisValue + Copy,
TS: Default + FromRedisValue + Copy,
V: Default + FromRedisValue + Copy,
>(
&mut self,
command: &str,
from_timestamp: FTS,
to_timestamp: TTS,
count: Option<C>,
aggregation_type: Option<TsAggregationType>,
query: TsRangeQuery,
filter_options: TsFilterOptions,
) -> RedisFuture<TsMrange<TS, V>> {
let mut c = cmd(command);
c.arg(from_timestamp).arg(to_timestamp);
if let Some(ct) = count {
c.arg("COUNT").arg(ct);
}
c.arg(aggregation_type).arg(filter_options);
c.arg(query).arg(filter_options);

Box::pin(async move { c.query_async(self).await })
}

/// Executes multiple redis time series range queries.
fn ts_mrange<
'a,
FTS: ToRedisArgs + Send + Sync + 'a,
TTS: ToRedisArgs + Send + Sync + 'a,
C: ToRedisArgs + Send + Sync + 'a,
TS: std::default::Default + FromRedisValue + Copy,
V: std::default::Default + FromRedisValue + Copy,
TS: Default + FromRedisValue + Copy,
V: Default + FromRedisValue + Copy,
>(
&mut self,
from_timestamp: FTS,
to_timestamp: TTS,
count: Option<C>,
aggregation_type: Option<TsAggregationType>,
query: TsRangeQuery,
filter_options: TsFilterOptions,
) -> RedisFuture<TsMrange<TS, V>> {
self.mrange(
"TS.MRANGE",
from_timestamp,
to_timestamp,
count,
aggregation_type,
filter_options,
)
self.mrange("TS.MRANGE", query, filter_options)
}

/// Executes multiple redis time series revrange queries.
fn ts_mrevrange<
'a,
FTS: ToRedisArgs + Send + Sync + 'a,
TTS: ToRedisArgs + Send + Sync + 'a,
C: ToRedisArgs + Send + Sync + 'a,
TS: std::default::Default + FromRedisValue + Copy,
V: std::default::Default + FromRedisValue + Copy,
TS: Default + FromRedisValue + Copy,
V: Default + FromRedisValue + Copy,
>(
&mut self,
from_timestamp: FTS,
to_timestamp: TTS,
count: Option<C>,
aggregation_type: Option<TsAggregationType>,
query: TsRangeQuery,
filter_options: TsFilterOptions,
) -> RedisFuture<TsMrange<TS, V>> {
self.mrange(
"TS.MREVRANGE",
from_timestamp,
to_timestamp,
count,
aggregation_type,
filter_options,
)
self.mrange("TS.MREVRANGE", query, filter_options)
}

/// Returns a filtered list of redis time series keys.
Expand Down
57 changes: 24 additions & 33 deletions tests/async_command_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use redis::AsyncCommands;
use redis_ts::AsyncTsCommands;
use redis_ts::{
TsAggregationType, TsDuplicatePolicy, TsFilterOptions, TsInfo, TsMget, TsMrange, TsOptions,
TsRange,
TsRange, TsRangeQuery
};
use std::env;
use std::thread;
Expand Down Expand Up @@ -349,35 +349,34 @@ pub async fn ts_range(name: &str) {
.await
.unwrap();

let query = TsRangeQuery::default();

let res: TsRange<u64, f64> = con
.ts_range(name, "-", "+", None::<usize>, None)
.ts_range(name, query.clone())
.await
.unwrap();
assert_eq!(res.values, vec![(12, 1.0), (123, 2.0), (1234, 3.0)]);

let one_res: TsRange<u64, f64> = con.ts_range(name, "-", "+", Some(1), None).await.unwrap();
let one_res: TsRange<u64, f64> = con.ts_range(name, query.clone().count(1)).await.unwrap();
assert_eq!(one_res.values, vec![(12, 1.0)]);

let range_res: TsRange<u64, f64> = con
.ts_range(name, 12, 123, None::<usize>, None)
.ts_range(name, query.clone().filter_by_ts(vec![12, 123]))
.await
.unwrap();
assert_eq!(range_res.values, vec![(12, 1.0), (123, 2.0)]);

let sum: TsRange<u64, f64> = con
.ts_range(
name,
12,
123,
None::<usize>,
Some(TsAggregationType::Sum(10000)),
query.clone().filter_by_ts(vec![12, 123]).aggregation_type(TsAggregationType::Sum(10000))
)
.await
.unwrap();
assert_eq!(sum.values, vec![(0, 3.0)]);

let res: TsRange<u64, f64> = con
.ts_range(name2, "-", "+", None::<usize>, None)
.ts_range(name2, query.clone())
.await
.unwrap();
assert_eq!(res.values, vec![]);
Expand All @@ -392,38 +391,38 @@ pub async fn ts_revrange(name: &str) {
.await
.unwrap();

let query = TsRangeQuery::default();


let res: TsRange<u64, f64> = con
.ts_revrange(name, "-", "+", None::<usize>, None)
.ts_revrange(name, query.clone())
.await
.unwrap();
assert_eq!(res.values, vec![(1234, 3.0), (123, 2.0), (12, 1.0)]);

let one_res: TsRange<u64, f64> = con
.ts_revrange(name, "-", "+", Some(1), None)
.ts_revrange(name, query.clone().count(1))
.await
.unwrap();
assert_eq!(one_res.values, vec![(1234, 3.0)]);

let range_res: TsRange<u64, f64> = con
.ts_revrange(name, 12, 123, None::<usize>, None)
.ts_revrange(name, query.clone().filter_by_ts(vec![12, 123]))
.await
.unwrap();
assert_eq!(range_res.values, vec![(123, 2.0), (12, 1.0)]);

let sum: TsRange<u64, f64> = con
.ts_revrange(
name,
12,
123,
None::<usize>,
Some(TsAggregationType::Sum(10000)),
query.clone().filter_by_ts(vec![12, 123]).aggregation_type(TsAggregationType::Sum(10000))
)
.await
.unwrap();
assert_eq!(sum.values, vec![(0, 3.0)]);

let res: TsRange<u64, f64> = con
.ts_revrange(name2, "-", "+", None::<usize>, None)
.ts_revrange(name2, query.clone())
.await
.unwrap();
assert_eq!(res.values, vec![]);
Expand Down Expand Up @@ -451,12 +450,11 @@ pub async fn ts_mrange(name: &str) {
.await
.unwrap();

let query = TsRangeQuery::default();

let res: TsMrange<u64, f64> = con
.ts_mrange(
"-",
"+",
None::<usize>,
None,
query.clone(),
TsFilterOptions::default()
.equals("l", label)
.with_labels(true),
Expand All @@ -477,10 +475,7 @@ pub async fn ts_mrange(name: &str) {

let res2: TsMrange<u64, f64> = con
.ts_mrange(
"-",
"+",
None::<usize>,
None,
query.clone(),
TsFilterOptions::default()
.equals("none", "existing")
.with_labels(true),
Expand Down Expand Up @@ -512,12 +507,11 @@ pub async fn ts_mrevrange(name: &str) {
.await
.unwrap();

let query = TsRangeQuery::default();

let res: TsMrange<u64, f64> = con
.ts_mrevrange(
"-",
"+",
None::<usize>,
None,
query.clone(),
TsFilterOptions::default()
.equals("l", label)
.with_labels(true),
Expand All @@ -538,10 +532,7 @@ pub async fn ts_mrevrange(name: &str) {

let res2: TsMrange<u64, f64> = con
.ts_mrevrange(
"-",
"+",
None::<usize>,
None,
query.clone(),
TsFilterOptions::default()
.equals("none", "existing")
.with_labels(true),
Expand Down

0 comments on commit 9bbad43

Please sign in to comment.