From 9bbad43bb9e3dba185cc51dfd89244805f0acf3c Mon Sep 17 00:00:00 2001 From: ISickMan Date: Sat, 4 Mar 2023 17:57:56 +0200 Subject: [PATCH] aggregation query for async commands + tests --- src/async_commands.rs | 127 +++++++------------------------ tests/async_command_tests/mod.rs | 57 ++++++-------- 2 files changed, 53 insertions(+), 131 deletions(-) diff --git a/src/async_commands.rs b/src/async_commands.rs index 8feb986..c041d69 100644 --- a/src/async_commands.rs +++ b/src/async_commands.rs @@ -312,8 +312,9 @@ pub trait AsyncTsCommands: ConnectionLike + Send + Sized { /// Returns the latest (current) value from multiple redis time series. fn ts_mget< - TS: std::default::Default + FromRedisValue, - V: std::default::Default + FromRedisValue, + 'a, + TS: Default + FromRedisValue + 'a, + V: Default + FromRedisValue + 'a, >( &mut self, filter_options: TsFilterOptions, @@ -325,158 +326,88 @@ pub trait AsyncTsCommands: ConnectionLike + Send + Sized { fn range< 'a, K: ToRedisArgs + Send + Sync + 'a, - FTS: ToRedisArgs + Send + Sync + 'a, - TTS: ToRedisArgs + Send + Sync + 'a, - C: ToRedisArgs + Send + Sync + 'a, - TS: std::marker::Copy + FromRedisValue, - V: std::marker::Copy + FromRedisValue, + TS: Default + FromRedisValue + Copy, + V: Default + FromRedisValue + Copy, >( &'a mut self, command: &str, key: K, - from_timestamp: FTS, - to_timestamp: TTS, - count: Option, - aggregation_type: Option, + query: TsRangeQuery, ) -> RedisFuture> { let mut c = cmd(command); - c.arg(key).arg(from_timestamp).arg(to_timestamp); - if let Some(ct) = count { - c.arg("COUNT").arg(ct); - } - Box::pin(async move { c.arg(aggregation_type).query_async(self).await }) + c.arg(key).arg(query); + Box::pin(async move { c.query_async(self).await }) } /// Executes a redis time series range query. fn ts_range< 'a, K: ToRedisArgs + Send + Sync + 'a, - FTS: ToRedisArgs + Send + Sync + 'a, - TTS: ToRedisArgs + Send + Sync + 'a, - C: ToRedisArgs + Send + Sync + 'a, - TS: std::marker::Copy + FromRedisValue, - V: std::marker::Copy + FromRedisValue, + TS: Default + FromRedisValue + Copy, + V: Default + FromRedisValue + Copy, >( &'a mut self, key: K, - from_timestamp: FTS, - to_timestamp: TTS, - count: Option, - aggregation_type: Option, + query: TsRangeQuery, ) -> RedisFuture> { - self.range( - "TS.RANGE", - key, - from_timestamp, - to_timestamp, - count, - aggregation_type, - ) + self.range("TS.RANGE", key, query) } /// Executes a redis time series revrange query. fn ts_revrange< 'a, K: ToRedisArgs + Send + Sync + 'a, - FTS: ToRedisArgs + Send + Sync + 'a, - TTS: ToRedisArgs + Send + Sync + 'a, - C: ToRedisArgs + Send + Sync + 'a, - TS: std::marker::Copy + FromRedisValue, - V: std::marker::Copy + FromRedisValue, + TS: Default + FromRedisValue + Copy, + V: Default + FromRedisValue + Copy, >( &'a mut self, key: K, - from_timestamp: FTS, - to_timestamp: TTS, - count: Option, - aggregation_type: Option, + query: TsRangeQuery, ) -> RedisFuture> { - self.range( - "TS.REVRANGE", - key, - from_timestamp, - to_timestamp, - count, - aggregation_type, - ) + self.range("TS.REVRANGE", key, query) } #[doc(hidden)] fn mrange< 'a, - FTS: ToRedisArgs + Send + Sync + 'a, - TTS: ToRedisArgs + Send + Sync + 'a, - C: ToRedisArgs + Send + Sync + 'a, - TS: std::default::Default + FromRedisValue + Copy, - V: std::default::Default + FromRedisValue + Copy, + TS: Default + FromRedisValue + Copy, + V: Default + FromRedisValue + Copy, >( &mut self, command: &str, - from_timestamp: FTS, - to_timestamp: TTS, - count: Option, - aggregation_type: Option, + query: TsRangeQuery, filter_options: TsFilterOptions, ) -> RedisFuture> { let mut c = cmd(command); - c.arg(from_timestamp).arg(to_timestamp); - if let Some(ct) = count { - c.arg("COUNT").arg(ct); - } - c.arg(aggregation_type).arg(filter_options); + c.arg(query).arg(filter_options); + Box::pin(async move { c.query_async(self).await }) } /// Executes multiple redis time series range queries. fn ts_mrange< 'a, - FTS: ToRedisArgs + Send + Sync + 'a, - TTS: ToRedisArgs + Send + Sync + 'a, - C: ToRedisArgs + Send + Sync + 'a, - TS: std::default::Default + FromRedisValue + Copy, - V: std::default::Default + FromRedisValue + Copy, + TS: Default + FromRedisValue + Copy, + V: Default + FromRedisValue + Copy, >( &mut self, - from_timestamp: FTS, - to_timestamp: TTS, - count: Option, - aggregation_type: Option, + query: TsRangeQuery, filter_options: TsFilterOptions, ) -> RedisFuture> { - self.mrange( - "TS.MRANGE", - from_timestamp, - to_timestamp, - count, - aggregation_type, - filter_options, - ) + self.mrange("TS.MRANGE", query, filter_options) } /// Executes multiple redis time series revrange queries. fn ts_mrevrange< 'a, - FTS: ToRedisArgs + Send + Sync + 'a, - TTS: ToRedisArgs + Send + Sync + 'a, - C: ToRedisArgs + Send + Sync + 'a, - TS: std::default::Default + FromRedisValue + Copy, - V: std::default::Default + FromRedisValue + Copy, + TS: Default + FromRedisValue + Copy, + V: Default + FromRedisValue + Copy, >( &mut self, - from_timestamp: FTS, - to_timestamp: TTS, - count: Option, - aggregation_type: Option, + query: TsRangeQuery, filter_options: TsFilterOptions, ) -> RedisFuture> { - self.mrange( - "TS.MREVRANGE", - from_timestamp, - to_timestamp, - count, - aggregation_type, - filter_options, - ) + self.mrange("TS.MREVRANGE", query, filter_options) } /// Returns a filtered list of redis time series keys. diff --git a/tests/async_command_tests/mod.rs b/tests/async_command_tests/mod.rs index ee85e2c..10cc166 100644 --- a/tests/async_command_tests/mod.rs +++ b/tests/async_command_tests/mod.rs @@ -7,7 +7,7 @@ use redis::AsyncCommands; use redis_ts::AsyncTsCommands; use redis_ts::{ TsAggregationType, TsDuplicatePolicy, TsFilterOptions, TsInfo, TsMget, TsMrange, TsOptions, - TsRange, + TsRange, TsRangeQuery }; use std::env; use std::thread; @@ -349,17 +349,19 @@ pub async fn ts_range(name: &str) { .await .unwrap(); + let query = TsRangeQuery::default(); + let res: TsRange = con - .ts_range(name, "-", "+", None::, None) + .ts_range(name, query.clone()) .await .unwrap(); assert_eq!(res.values, vec![(12, 1.0), (123, 2.0), (1234, 3.0)]); - let one_res: TsRange = con.ts_range(name, "-", "+", Some(1), None).await.unwrap(); + let one_res: TsRange = con.ts_range(name, query.clone().count(1)).await.unwrap(); assert_eq!(one_res.values, vec![(12, 1.0)]); let range_res: TsRange = con - .ts_range(name, 12, 123, None::, None) + .ts_range(name, query.clone().filter_by_ts(vec![12, 123])) .await .unwrap(); assert_eq!(range_res.values, vec![(12, 1.0), (123, 2.0)]); @@ -367,17 +369,14 @@ pub async fn ts_range(name: &str) { let sum: TsRange = con .ts_range( name, - 12, - 123, - None::, - Some(TsAggregationType::Sum(10000)), + query.clone().filter_by_ts(vec![12, 123]).aggregation_type(TsAggregationType::Sum(10000)) ) .await .unwrap(); assert_eq!(sum.values, vec![(0, 3.0)]); let res: TsRange = con - .ts_range(name2, "-", "+", None::, None) + .ts_range(name2, query.clone()) .await .unwrap(); assert_eq!(res.values, vec![]); @@ -392,20 +391,23 @@ pub async fn ts_revrange(name: &str) { .await .unwrap(); + let query = TsRangeQuery::default(); + + let res: TsRange = con - .ts_revrange(name, "-", "+", None::, None) + .ts_revrange(name, query.clone()) .await .unwrap(); assert_eq!(res.values, vec![(1234, 3.0), (123, 2.0), (12, 1.0)]); let one_res: TsRange = con - .ts_revrange(name, "-", "+", Some(1), None) + .ts_revrange(name, query.clone().count(1)) .await .unwrap(); assert_eq!(one_res.values, vec![(1234, 3.0)]); let range_res: TsRange = con - .ts_revrange(name, 12, 123, None::, None) + .ts_revrange(name, query.clone().filter_by_ts(vec![12, 123])) .await .unwrap(); assert_eq!(range_res.values, vec![(123, 2.0), (12, 1.0)]); @@ -413,17 +415,14 @@ pub async fn ts_revrange(name: &str) { let sum: TsRange = con .ts_revrange( name, - 12, - 123, - None::, - Some(TsAggregationType::Sum(10000)), + query.clone().filter_by_ts(vec![12, 123]).aggregation_type(TsAggregationType::Sum(10000)) ) .await .unwrap(); assert_eq!(sum.values, vec![(0, 3.0)]); let res: TsRange = con - .ts_revrange(name2, "-", "+", None::, None) + .ts_revrange(name2, query.clone()) .await .unwrap(); assert_eq!(res.values, vec![]); @@ -451,12 +450,11 @@ pub async fn ts_mrange(name: &str) { .await .unwrap(); + let query = TsRangeQuery::default(); + let res: TsMrange = con .ts_mrange( - "-", - "+", - None::, - None, + query.clone(), TsFilterOptions::default() .equals("l", label) .with_labels(true), @@ -477,10 +475,7 @@ pub async fn ts_mrange(name: &str) { let res2: TsMrange = con .ts_mrange( - "-", - "+", - None::, - None, + query.clone(), TsFilterOptions::default() .equals("none", "existing") .with_labels(true), @@ -512,12 +507,11 @@ pub async fn ts_mrevrange(name: &str) { .await .unwrap(); + let query = TsRangeQuery::default(); + let res: TsMrange = con .ts_mrevrange( - "-", - "+", - None::, - None, + query.clone(), TsFilterOptions::default() .equals("l", label) .with_labels(true), @@ -538,10 +532,7 @@ pub async fn ts_mrevrange(name: &str) { let res2: TsMrange = con .ts_mrevrange( - "-", - "+", - None::, - None, + query.clone(), TsFilterOptions::default() .equals("none", "existing") .with_labels(true),