
Commit

LATEST impl with blanks to fill
progval committed Oct 26, 2024
1 parent 298a257 commit b7369a8
Showing 2 changed files with 107 additions and 18 deletions.
121 changes: 106 additions & 15 deletions sable_history/src/pg_history_service.rs
@@ -3,14 +3,15 @@ use std::collections::HashMap;

use diesel::dsl::sql;
use diesel::prelude::*;
use diesel_async::{AsyncConnection, AsyncPgConnection};
use futures::stream::StreamExt;
use diesel_async::{AsyncConnection, AsyncPgConnection, RunQueryDsl};

Check warning on line 6 in sable_history/src/pg_history_service.rs, GitHub Actions / Test (nightly): unused import: `AsyncConnection`
use futures::stream::{StreamExt, TryStreamExt};
use tokio::sync::Mutex;
use uuid::Uuid;

use sable_network::prelude::*;

use crate::schema::channels;
use crate::schema::messages;

/// Implementation of [`HistoryService`] backed by PostgreSQL
pub struct PgHistoryService<'a> {
@@ -28,22 +29,22 @@ impl<'a> PgHistoryService<'a> {
impl<'a> HistoryService for PgHistoryService<'a> {
async fn list_targets(
&self,
user: UserId,
after_ts: Option<i64>,
before_ts: Option<i64>,
limit: Option<usize>,
_user: UserId,
_after_ts: Option<i64>,
_before_ts: Option<i64>,
_limit: Option<usize>,
) -> HashMap<TargetId, i64> {
// TODO: access control
match diesel_async::RunQueryDsl::load_stream(
channels::dsl::channels.select((
// TODO: after_ts, before_ts, limit
match channels::dsl::channels
.select((
channels::dsl::id,
sql::<diesel::pg::sql_types::Uuid>(
"SELECT MAX(id) FROM messages WHERE target_channel=channels.id",
),
)),
&mut *self.database_connection.lock().await,
)
.await
))
.load_stream(&mut *self.database_connection.lock().await)
.await
{
Err(e) => {
tracing::error!("Could not get history channels: {e}");
@@ -75,11 +76,101 @@ impl<'a> HistoryService for PgHistoryService<'a> {

async fn get_entries(
&self,
user: UserId,
_user: UserId,
target: TargetId,
request: HistoryRequest,
) -> Result<impl IntoIterator<Item = HistoryLogEntry>, HistoryError> {
todo!();
Ok(vec![])
// TODO: access control
let TargetId::Channel(channel_id) = target else {
// TODO: PMs
return Err(HistoryError::InvalidTarget(target));
};

let mut connection_lock = self.database_connection.lock().await;

let db_channel_id = i64::try_from(channel_id.as_u64()).expect("channel id overflows i64");
if channels::dsl::channels
.find(db_channel_id)
.select(crate::models::Channel::as_select())
.first(&mut *connection_lock)
.await
.optional()
.expect("Could not check if channel exists")
.is_none()
{
return Err(HistoryError::InvalidTarget(target));
}

match request {
HistoryRequest::Latest { to_ts, limit } => {
let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX));
Ok(match to_ts {
Some(to_ts) => {
// Lowest UUIDv7 corresponding to the timestamp
let to_uuid = uuid::Builder::from_unix_timestamp_millis(
u64::try_from(to_ts)
.unwrap_or(u64::MIN) // floor timestamps to Epoch
.saturating_mul(1000),
&[u8::MIN; 10],
)
.into_uuid();
Box::new(
messages::dsl::messages
.filter(messages::dsl::target_channel.eq(db_channel_id))
.filter(messages::dsl::id.lt(to_uuid))
.order(messages::dsl::id.desc())
.limit(limit)
.load_stream(&mut *connection_lock),
)
}
None => Box::new(
messages::dsl::messages
.filter(messages::dsl::target_channel.eq(db_channel_id))
.order(messages::dsl::id.desc())
.limit(limit)
.load_stream(&mut *connection_lock),
),
}
.await
.expect("could not query messages")
.map(|row| {
let crate::models::Message {
id,
source_user,
target_channel,
text,
} = row?;
Ok(HistoryLogEntry {
id: (),
Check failures on lines 144, 146-148, 150-151 in sable_history/src/pg_history_service.rs, GitHub Actions / Test (nightly): mismatched types
details: NetworkStateChange::NewMessage(update::NewMessage {
message: (),
source: (),
target: (),
}),
source_event: (),
timestamp: (),
})
})
.try_collect::<Vec<_>>()
.await
.expect("could not parse all records"))
}
HistoryRequest::Before { from_ts, limit } => {
todo!("before")
}
HistoryRequest::After { start_ts, limit } => {
todo!("after")
}
HistoryRequest::Around { around_ts, limit } => {
todo!("around")
}
HistoryRequest::Between {
start_ts,
end_ts,
limit,
} => {
todo!("between")
}
}
}
}
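
Note on the `Latest { to_ts, .. }` branch above: message ids are UUIDv7s, so the smallest UUIDv7 carrying a given timestamp sorts before every id generated at or after that instant, which lets it serve as an exclusive bound for `messages::dsl::id.lt(to_uuid)`. A minimal standalone sketch of that conversion (the helper name is illustrative), assuming the `uuid` crate with its `v7` feature enabled and the same seconds-to-milliseconds scaling as in the diff:

use uuid::Uuid;

/// Lowest UUIDv7 for a Unix timestamp given in seconds: the 48-bit
/// millisecond timestamp field is filled in and the 74 random/counter
/// bits are zeroed, so the result sorts before any UUIDv7 minted at or
/// after that instant.
fn uuid_lower_bound(ts_seconds: u64) -> Uuid {
    uuid::Builder::from_unix_timestamp_millis(ts_seconds.saturating_mul(1000), &[u8::MIN; 10])
        .into_uuid()
}

fn main() {
    // 1_700_000_000 s is 2023-11-14T22:13:20Z
    println!("{}", uuid_lower_bound(1_700_000_000));
    // prints 018bcfe5-6800-7000-8000-000000000000
}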
4 changes: 1 addition & 3 deletions sable_network/src/history/service.rs
@@ -99,7 +99,5 @@ pub trait HistoryService {
user: UserId,
target: TargetId,
request: HistoryRequest,
) -> impl Future<Output = Result<impl IntoIterator<Item = HistoryLogEntry>, HistoryError>>
+ Send
+ Sync;
) -> impl Future<Output = Result<impl IntoIterator<Item = HistoryLogEntry>, HistoryError>> + Send;
}
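
The returned future is now only required to be `Send`, no longer `Sync`. As a hypothetical illustration (not code from this repository) of the difference: an async implementation that keeps a `!Sync` value alive across an `.await` yields a future that is still `Send` but not `Sync`, so it satisfies the new bound but not the old one:

use std::cell::Cell;
use std::future::ready;

// `Cell<u32>` is `Send` but `!Sync`; because `counter` is live across
// the await point, it is stored in the compiler-generated future, which
// therefore implements `Send` but not `Sync`.
async fn send_but_not_sync() -> u32 {
    let counter = Cell::new(0u32);
    ready(()).await;
    counter.set(counter.get() + 1);
    counter.get()
}

fn assert_send<T: Send>(_: &T) {}

fn main() {
    let fut = send_but_not_sync();
    assert_send(&fut); // compiles: the future is `Send`
    // A corresponding `assert_sync<T: Sync>` check would not compile.
}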

0 comments on commit b7369a8
