From b7369a834e584d3e949e0ebdfb77b33c7e0fe32b Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Sat, 26 Oct 2024 18:37:25 +0200 Subject: [PATCH] LATEST impl with blanks to fill --- sable_history/src/pg_history_service.rs | 121 +++++++++++++++++++++--- sable_network/src/history/service.rs | 4 +- 2 files changed, 107 insertions(+), 18 deletions(-) diff --git a/sable_history/src/pg_history_service.rs b/sable_history/src/pg_history_service.rs index 89590d43..ab65d19d 100644 --- a/sable_history/src/pg_history_service.rs +++ b/sable_history/src/pg_history_service.rs @@ -3,14 +3,15 @@ use std::collections::HashMap; use diesel::dsl::sql; use diesel::prelude::*; -use diesel_async::{AsyncConnection, AsyncPgConnection}; -use futures::stream::StreamExt; +use diesel_async::{AsyncConnection, AsyncPgConnection, RunQueryDsl}; +use futures::stream::{StreamExt, TryStreamExt}; use tokio::sync::Mutex; use uuid::Uuid; use sable_network::prelude::*; use crate::schema::channels; +use crate::schema::messages; /// Implementation of [`HistoryService`] backed PostgreSQL pub struct PgHistoryService<'a> { @@ -28,22 +29,22 @@ impl<'a> PgHistoryService<'a> { impl<'a> HistoryService for PgHistoryService<'a> { async fn list_targets( &self, - user: UserId, - after_ts: Option, - before_ts: Option, - limit: Option, + _user: UserId, + _after_ts: Option, + _before_ts: Option, + _limit: Option, ) -> HashMap { // TODO: access control - match diesel_async::RunQueryDsl::load_stream( - channels::dsl::channels.select(( + // TODO: after_ts, before_ts, limit + match channels::dsl::channels + .select(( channels::dsl::id, sql::( "SELECT MAX(id) FROM messages WHERE target_channel=channels.id", ), - )), - &mut *self.database_connection.lock().await, - ) - .await + )) + .load_stream(&mut *self.database_connection.lock().await) + .await { Err(e) => { tracing::error!("Could not get history channels: {e}"); @@ -75,11 +76,101 @@ impl<'a> HistoryService for PgHistoryService<'a> { async fn get_entries( &self, - user: UserId, + _user: UserId, target: TargetId, request: HistoryRequest, ) -> Result, HistoryError> { - todo!(); - Ok(vec![]) + // TODO: access control + let TargetId::Channel(channel_id) = target else { + // TODO: PMs + return Err(HistoryError::InvalidTarget(target)); + }; + + let mut connection_lock = self.database_connection.lock().await; + + let db_channel_id = i64::try_from(channel_id.as_u64()).expect("channel id overflows u64"); + if channels::dsl::channels + .find(db_channel_id) + .select(crate::models::Channel::as_select()) + .first(&mut *connection_lock) + .await + .optional() + .expect("Could not check if channel exists") + .is_none() + { + return Err(HistoryError::InvalidTarget(target)); + } + + match request { + HistoryRequest::Latest { to_ts, limit } => { + let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX)); + Ok(match to_ts { + Some(to_ts) => { + // Lowest UUIDv7 corresponding to the timestamp + let to_uuid = uuid::Builder::from_unix_timestamp_millis( + u64::try_from(to_ts) + .unwrap_or(u64::MIN) // floor timestamps to Epoch + .saturating_mul(1000), + &[u8::MIN; 10], + ) + .into_uuid(); + Box::new( + messages::dsl::messages + .filter(messages::dsl::target_channel.eq(db_channel_id)) + .filter(messages::dsl::id.lt(to_uuid)) + .order(messages::dsl::id.desc()) + .limit(limit) + .load_stream(&mut *connection_lock), + ) + } + None => Box::new( + messages::dsl::messages + .filter(messages::dsl::target_channel.eq(db_channel_id)) + .order(messages::dsl::id.desc()) + .limit(limit) + .load_stream(&mut *connection_lock), + ), + } + .await + .expect("could not query messages") + .map(|row| { + let crate::models::Message { + id, + source_user, + target_channel, + text, + } = row?; + Ok(HistoryLogEntry { + id: (), + details: NetworkStateChange::NewMessage(update::NewMessage { + message: (), + source: (), + target: (), + }), + source_event: (), + timestamp: (), + }) + }) + .try_collect::>() + .await + .expect("could not parse all records")) + } + HistoryRequest::Before { from_ts, limit } => { + todo!("before") + } + HistoryRequest::After { start_ts, limit } => { + todo!("after") + } + HistoryRequest::Around { around_ts, limit } => { + todo!("around") + } + HistoryRequest::Between { + start_ts, + end_ts, + limit, + } => { + todo!("between") + } + } } } diff --git a/sable_network/src/history/service.rs b/sable_network/src/history/service.rs index 6707851e..f34d1e75 100644 --- a/sable_network/src/history/service.rs +++ b/sable_network/src/history/service.rs @@ -99,7 +99,5 @@ pub trait HistoryService { user: UserId, target: TargetId, request: HistoryRequest, - ) -> impl Future, HistoryError>> - + Send - + Sync; + ) -> impl Future, HistoryError>> + Send; }