diff --git a/sable_history/src/pg_history_service.rs b/sable_history/src/pg_history_service.rs index b2f39171..009f8430 100644 --- a/sable_history/src/pg_history_service.rs +++ b/sable_history/src/pg_history_service.rs @@ -112,10 +112,10 @@ impl<'a> HistoryService for PgHistoryService<'a> { historic_users::dsl::account_name, )) .filter(messages::dsl::target_channel.eq(db_channel_id)); - match request { + Ok(match request { HistoryRequest::Latest { to_ts, limit } => { let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX)); - Ok(match to_ts { + match to_ts { Some(to_ts) => { let to_ts = DateTime::from_timestamp(to_ts, 999_999) .unwrap_or(DateTime::::MIN_UTC) @@ -145,13 +145,50 @@ impl<'a> HistoryService for PgHistoryService<'a> { .expect("could not parse all records") .into_iter() .rev() // need to reverse *after* applying the SQL LIMIT - .collect::>()) + .collect::>() } HistoryRequest::Before { from_ts, limit } => { - todo!("before") + let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX)); + let from_ts = DateTime::from_timestamp(from_ts, 0) + .unwrap_or(DateTime::::MAX_UTC) + .naive_utc(); + Box::new( + base_query + .filter(messages::dsl::timestamp.lt(from_ts)) + // total order, consistent across requests + .order((messages::dsl::timestamp.desc(), messages::dsl::id.desc())) + .limit(limit) + .load_stream(&mut *connection_lock), + ) + .await + .expect("could not query messages") + .map_ok(|row| make_historical_event(&channel, row)) + .try_collect::>() + .await + .expect("could not parse all records") + .into_iter() + .rev() // need to reverse *after* applying the SQL LIMIT + .collect::>() } HistoryRequest::After { start_ts, limit } => { - todo!("after") + let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX)); + let start_ts = DateTime::from_timestamp(start_ts, 999_999) + .unwrap_or(DateTime::::MIN_UTC) + .naive_utc(); + Box::new( + base_query + .filter(messages::dsl::timestamp.gt(start_ts)) + // total order, consistent across requests + .order((messages::dsl::timestamp, messages::dsl::id)) + .limit(limit) + .load_stream(&mut *connection_lock), + ) + .await + .expect("could not query messages") + .map_ok(|row| make_historical_event(&channel, row)) + .try_collect::>() + .await + .expect("could not parse all records") } HistoryRequest::Around { around_ts, limit } => { todo!("around") @@ -161,9 +198,58 @@ impl<'a> HistoryService for PgHistoryService<'a> { end_ts, limit, } => { - todo!("between") + if start_ts <= end_ts { + let start_ts = DateTime::from_timestamp(start_ts, 999_999) + .unwrap_or(DateTime::::MIN_UTC) + .naive_utc(); + let end_ts = DateTime::from_timestamp(end_ts, 0) + .unwrap_or(DateTime::::MAX_UTC) + .naive_utc(); + let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX)); + Box::new( + base_query + .filter(messages::dsl::timestamp.gt(start_ts)) + .filter(messages::dsl::timestamp.lt(end_ts)) + // total order, consistent across requests + .order((messages::dsl::timestamp, messages::dsl::id)) + .limit(limit) + .load_stream(&mut *connection_lock), + ) + .await + .expect("could not query messages") + .map_ok(|row| make_historical_event(&channel, row)) + .try_collect::>() + .await + .expect("could not parse all records") + } else { + let start_ts = DateTime::from_timestamp(start_ts, 0) + .unwrap_or(DateTime::::MAX_UTC) + .naive_utc(); + let end_ts = DateTime::from_timestamp(end_ts, 999_999) + .unwrap_or(DateTime::::MIN_UTC) + .naive_utc(); + let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX)); + Box::new( + base_query + .filter(messages::dsl::timestamp.gt(end_ts)) + .filter(messages::dsl::timestamp.lt(start_ts)) + // total order, consistent across requests + .order((messages::dsl::timestamp.desc(), messages::dsl::id.desc())) + .limit(limit) + .load_stream(&mut *connection_lock), + ) + .await + .expect("could not query messages") + .map_ok(|row| make_historical_event(&channel, row)) + .try_collect::>() + .await + .expect("could not parse all records") + .into_iter() + .rev() // need to reverse *after* applying the SQL LIMIT + .collect::>() + } } - } + }) } }