From 298a25747fbae3102ef4ca9c24930e841b94b9e1 Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Sat, 26 Oct 2024 17:00:25 +0200 Subject: [PATCH 01/22] [WIP] Query history from history server --- Cargo.lock | 2 + sable_history/Cargo.toml | 3 +- sable_history/src/lib.rs | 2 + sable_history/src/pg_history_service.rs | 85 +++++++++++++ sable_history/src/server/mod.rs | 64 +++++++++- sable_history/src/server/sync.rs | 12 ++ sable_history/src/server/update_handler.rs | 1 + sable_ircd/src/capability/account_tag.rs | 1 + .../src/command/handlers/chathistory.rs | 4 +- sable_ircd/src/command/handlers/register.rs | 2 +- .../src/command/plumbing/argument_wrappers.rs | 2 +- sable_ircd/src/messages/send_history.rs | 2 + sable_ircd/src/server/server_type.rs | 9 +- sable_network/Cargo.toml | 3 +- sable_network/src/history/local_service.rs | 24 ++-- sable_network/src/history/log.rs | 1 + sable_network/src/history/mod.rs | 6 +- sable_network/src/history/remote_service.rs | 112 ++++++++++++++++++ sable_network/src/history/service.rs | 5 +- sable_network/src/history/tiered_service.rs | 84 +++++++++++++ sable_network/src/network/event/details.rs | 6 +- .../src/network/network/accessors.rs | 16 ++- .../src/network/network/account_state.rs | 4 +- .../src/network/network/history_state.rs | 16 +++ sable_network/src/network/network/mod.rs | 6 +- .../src/network/network/server_state.rs | 8 ++ sable_network/src/network/update.rs | 4 + sable_network/src/node/mod.rs | 11 ++ sable_network/src/node/update_receiver.rs | 5 + sable_network/src/policy/mod.rs | 2 + sable_network/src/rpc/network_message.rs | 44 +++++++ sable_server/src/server.rs | 2 +- sable_server/src/server_type.rs | 5 +- sable_services/src/server/mod.rs | 14 ++- sable_services/src/server/sync.rs | 2 +- 35 files changed, 529 insertions(+), 40 deletions(-) create mode 100644 sable_history/src/pg_history_service.rs create mode 100644 sable_history/src/server/sync.rs create mode 100644 sable_network/src/history/remote_service.rs create mode 100644 sable_network/src/history/tiered_service.rs create mode 100644 sable_network/src/network/network/history_state.rs diff --git a/Cargo.lock b/Cargo.lock index 757dbe23..9fa4a232 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2379,6 +2379,7 @@ dependencies = [ "clap 4.5.20", "diesel", "diesel-async", + "futures", "parking_lot 0.12.3", "sable_network", "sable_server", @@ -2454,6 +2455,7 @@ dependencies = [ "ambassador", "anyhow", "arrayvec", + "async-trait", "backoff", "bitflags 1.3.2", "built", diff --git a/sable_history/Cargo.toml b/sable_history/Cargo.toml index a096bbd1..34d29732 100644 --- a/sable_history/Cargo.toml +++ b/sable_history/Cargo.toml @@ -11,6 +11,7 @@ built = { version = "0.5", features = [ "git2" ] } sable_network = { path = "../sable_network" } sable_server = { path = "../sable_server" } +futures = "0.3" tokio = { version = "1.14", features = [ "full" ] } serde = { version = "1", features = [ "derive" ] } serde_with = "1.11" @@ -25,4 +26,4 @@ chrono = "0.4" uuid = { version = "1.9.1", features = ["v7", "fast-rng", "serde"] } diesel = { version = "2.2", features = [ "postgres", "chrono", "uuid" ] } -diesel-async = { version = "0.5", features = [ "postgres" ] } \ No newline at end of file +diesel-async = { version = "0.5", features = [ "postgres" ] } diff --git a/sable_history/src/lib.rs b/sable_history/src/lib.rs index 440bcb46..36c941cc 100644 --- a/sable_history/src/lib.rs +++ b/sable_history/src/lib.rs @@ -1,3 +1,5 @@ +mod pg_history_service; +pub use pg_history_service::PgHistoryService; mod server; 
pub use server::*; diff --git a/sable_history/src/pg_history_service.rs b/sable_history/src/pg_history_service.rs new file mode 100644 index 00000000..89590d43 --- /dev/null +++ b/sable_history/src/pg_history_service.rs @@ -0,0 +1,85 @@ +use std::borrow::BorrowMut; +use std::collections::HashMap; + +use diesel::dsl::sql; +use diesel::prelude::*; +use diesel_async::{AsyncConnection, AsyncPgConnection}; +use futures::stream::StreamExt; +use tokio::sync::Mutex; +use uuid::Uuid; + +use sable_network::prelude::*; + +use crate::schema::channels; + +/// Implementation of [`HistoryService`] backed PostgreSQL +pub struct PgHistoryService<'a> { + database_connection: &'a Mutex, +} + +impl<'a> PgHistoryService<'a> { + pub fn new(database_connection: &'a Mutex) -> Self { + Self { + database_connection, + } + } +} + +impl<'a> HistoryService for PgHistoryService<'a> { + async fn list_targets( + &self, + user: UserId, + after_ts: Option, + before_ts: Option, + limit: Option, + ) -> HashMap { + // TODO: access control + match diesel_async::RunQueryDsl::load_stream( + channels::dsl::channels.select(( + channels::dsl::id, + sql::( + "SELECT MAX(id) FROM messages WHERE target_channel=channels.id", + ), + )), + &mut *self.database_connection.lock().await, + ) + .await + { + Err(e) => { + tracing::error!("Could not get history channels: {e}"); + return HashMap::new(); + } + Ok(rows) => { + rows.map(|row| row.expect("Could not deserialize row")) + .map( + |(channel_id, max_message_id): (i64, Uuid)| -> (TargetId, i64) { + let (seconds, _) = max_message_id + .get_timestamp() + .expect("messages.id is not a UUID7") + .to_unix(); + ( + TargetId::Channel(ChannelId::from(Snowflake::from( + u64::try_from(channel_id).expect("channel id is negative"), + ))), + seconds + .try_into() + .expect("message's UNIX timestamp is negative"), + ) + }, + ) + .collect() + .await + } + } + } + + async fn get_entries( + &self, + user: UserId, + target: TargetId, + request: HistoryRequest, + ) -> Result, HistoryError> { + todo!(); + Ok(vec![]) + } +} diff --git a/sable_history/src/server/mod.rs b/sable_history/src/server/mod.rs index 24de94cd..e9eb11c7 100644 --- a/sable_history/src/server/mod.rs +++ b/sable_history/src/server/mod.rs @@ -5,11 +5,13 @@ use sable_network::prelude::*; use sable_server::ServerType; use serde::Deserialize; use tokio::sync::{mpsc::UnboundedReceiver, Mutex}; +use tracing::instrument; use std::sync::Arc; use diesel_async::{AsyncConnection, AsyncPgConnection}; +mod sync; mod update_handler; #[derive(Debug, Clone, Deserialize)] @@ -20,7 +22,7 @@ pub struct HistoryServerConfig { pub struct HistoryServer { node: Arc, history_receiver: Mutex>, - database_connection: Mutex, + database_connection: Mutex, // TODO: use a connection pool } impl ServerType for HistoryServer { @@ -69,6 +71,14 @@ impl ServerType for HistoryServer { { let Some(update) = update else { break; }; + if let NetworkStateChange::NewServer(new_server) = &update.change + { + if new_server.server == self.node.id() + { + self.burst_to_network().await; + } + } + if let Err(error) = self.handle_history_update(update).await { tracing::error!(?error, "Error return handling history update"); } @@ -94,10 +104,56 @@ impl ServerType for HistoryServer { unimplemented!("history servers can't hot-upgrade"); } - fn handle_remote_command( + #[instrument(skip_all)] + async fn handle_remote_command( &self, - _request: sable_network::rpc::RemoteServerRequestType, + req: sable_network::rpc::RemoteServerRequestType, ) -> sable_network::rpc::RemoteServerResponse { - 
todo!() + tracing::debug!(?req, "Got remote request"); + + use crate::server::rpc::RemoteServerRequestType::*; + use sable_network::rpc::RemoteServerResponse; + + match req { + History(req) => { + use crate::server::rpc::RemoteHistoryServerRequestType::*; + use crate::server::rpc::RemoteHistoryServerResponse::*; + + let history_service = crate::PgHistoryService::new(&self.database_connection); + match req { + ListTargets { + user, + after_ts, + before_ts, + limit, + } => TargetList( + history_service + .list_targets(user, after_ts, before_ts, limit) + .await + .into_iter() + .collect(), + ), + GetEntries { + user, + target, + request, + } => Entries( + history_service + .get_entries(user, target, request) + .await + .map(|entries| entries.into_iter().collect()), + ), + } + .into() + } + Services(_) => { + tracing::warn!(?req, "Got unsupported request (services)"); + RemoteServerResponse::NotSupported + } + Ping => { + tracing::warn!(?req, "Got unsupported request (ping)"); + RemoteServerResponse::NotSupported + } + } } } diff --git a/sable_history/src/server/sync.rs b/sable_history/src/server/sync.rs new file mode 100644 index 00000000..651983c5 --- /dev/null +++ b/sable_history/src/server/sync.rs @@ -0,0 +1,12 @@ +use sable_network::network::wrapper::ObjectWrapper; + +use super::*; +use crate::server::event::IntroduceHistoryServer; + +impl HistoryServer { + pub(super) async fn burst_to_network(&self) { + // Set ourselves as the active history node + self.node + .submit_event(self.node.id(), IntroduceHistoryServer {}); + } +} diff --git a/sable_history/src/server/update_handler.rs b/sable_history/src/server/update_handler.rs index 23f5480f..3e4e6800 100644 --- a/sable_history/src/server/update_handler.rs +++ b/sable_history/src/server/update_handler.rs @@ -34,6 +34,7 @@ impl HistoryServer { | NetworkStateChange::ServerQuit(_) | NetworkStateChange::NewAuditLogEntry(_) | NetworkStateChange::UserLoginChange(_) + | NetworkStateChange::HistoryServerUpdate(_) | NetworkStateChange::ServicesUpdate(_) | NetworkStateChange::EventComplete(_) => Ok(()), } diff --git a/sable_ircd/src/capability/account_tag.rs b/sable_ircd/src/capability/account_tag.rs index 30af9595..f182dfac 100644 --- a/sable_ircd/src/capability/account_tag.rs +++ b/sable_ircd/src/capability/account_tag.rs @@ -27,6 +27,7 @@ fn account_for_tag(update: &NetworkStateChange, net: &Network) -> Option NetworkStateChange::NewServer(_) => None, NetworkStateChange::ServerQuit(_) => None, NetworkStateChange::NewAuditLogEntry(_) => None, + NetworkStateChange::HistoryServerUpdate(_) => None, NetworkStateChange::ServicesUpdate(_) => None, NetworkStateChange::EventComplete(_) => None, }?; diff --git a/sable_ircd/src/command/handlers/chathistory.rs b/sable_ircd/src/command/handlers/chathistory.rs index bf46a4ee..3b5a81f7 100644 --- a/sable_ircd/src/command/handlers/chathistory.rs +++ b/sable_ircd/src/command/handlers/chathistory.rs @@ -136,7 +136,7 @@ async fn handle_chathistory( } }; - let history_service = LocalHistoryService::new(server.node()); + let history_service = server.node().history_service(); match history_service .get_entries(source.id(), target_id, request) .await @@ -160,7 +160,7 @@ async fn list_targets<'a>( to_ts: Option, limit: Option, ) { - let history_service = LocalHistoryService::new(server.node()); + let history_service = server.node().history_service(); let found_targets = history_service .list_targets(source.id(), to_ts, from_ts, limit) diff --git a/sable_ircd/src/command/handlers/register.rs 
b/sable_ircd/src/command/handlers/register.rs index 06ab5304..d7b1f3a2 100644 --- a/sable_ircd/src/command/handlers/register.rs +++ b/sable_ircd/src/command/handlers/register.rs @@ -35,7 +35,7 @@ async fn do_register_user( _email: &str, password: &str, ) -> CommandResult { - let Some(services_name) = network.current_services_name() else { + let Some(services_name) = network.current_services_server_name() else { response_to.send(message::Fail::new( "REGISTER", "TEMPORARILY_UNAVAILABLE", diff --git a/sable_ircd/src/command/plumbing/argument_wrappers.rs b/sable_ircd/src/command/plumbing/argument_wrappers.rs index 73d93603..3adf34b9 100644 --- a/sable_ircd/src/command/plumbing/argument_wrappers.rs +++ b/sable_ircd/src/command/plumbing/argument_wrappers.rs @@ -18,7 +18,7 @@ impl<'a> AmbientArgument<'a> for ServicesTarget<'a> { Ok(Self { name: ctx .network() - .current_services_name() + .current_services_server_name() .ok_or(CommandError::ServicesNotAvailable)?, server: ctx.server(), }) diff --git a/sable_ircd/src/messages/send_history.rs b/sable_ircd/src/messages/send_history.rs index 282ca9c1..10c245c9 100644 --- a/sable_ircd/src/messages/send_history.rs +++ b/sable_ircd/src/messages/send_history.rs @@ -50,6 +50,7 @@ impl SendHistoryItem for ClientServer { | NetworkStateChange::ServerQuit(_) | NetworkStateChange::NewAuditLogEntry(_) | NetworkStateChange::UserLoginChange(_) + | NetworkStateChange::HistoryServerUpdate(_) | NetworkStateChange::ServicesUpdate(_) | NetworkStateChange::EventComplete(_) => Ok(()), } @@ -86,6 +87,7 @@ impl SendHistoryItem for ClientServer { | NetworkStateChange::ServerQuit(_) | NetworkStateChange::NewAuditLogEntry(_) | NetworkStateChange::UserLoginChange(_) + | NetworkStateChange::HistoryServerUpdate(_) | NetworkStateChange::ServicesUpdate(_) | NetworkStateChange::EventComplete(_) => Ok(()), } diff --git a/sable_ircd/src/server/server_type.rs b/sable_ircd/src/server/server_type.rs index 7114773b..1ef7a65d 100644 --- a/sable_ircd/src/server/server_type.rs +++ b/sable_ircd/src/server/server_type.rs @@ -1,9 +1,11 @@ -use super::*; -use crate::connection_collection::ConnectionCollectionState; use anyhow::Context; +use tracing::instrument; + use client_listener::SavedListenerCollection; use sable_server::ServerSaveError; +use super::*; +use crate::connection_collection::ConnectionCollectionState; use crate::monitor::MonitorSet; /// Saved state of a [`ClientServer`] for later resumption @@ -168,7 +170,8 @@ impl sable_server::ServerType for ClientServer { } } - fn handle_remote_command(&self, cmd: RemoteServerRequestType) -> RemoteServerResponse { + #[instrument(skip_all)] + async fn handle_remote_command(&self, cmd: RemoteServerRequestType) -> RemoteServerResponse { match cmd { RemoteServerRequestType::Ping => RemoteServerResponse::Success, _ => RemoteServerResponse::NotSupported, diff --git a/sable_network/Cargo.toml b/sable_network/Cargo.toml index 5df64d42..310e65c8 100644 --- a/sable_network/Cargo.toml +++ b/sable_network/Cargo.toml @@ -21,6 +21,7 @@ pretty_assertions = "1.4" [dependencies] sable_macros = { path = "../sable_macros" } +async-trait = "0.1.83" tracing = "0.1" thiserror = "1" serde_json = "1" @@ -42,7 +43,7 @@ rand = "0.8" arrayvec = { version = "0.7", features = [ "serde" ] } hashers = "1" serde_with = "1.11" -parking_lot = { version = "0.12", features = [ "serde" ] } +parking_lot = { version = "0.12.2", features = [ "serde", "arc_lock" ] } wildmatch = "2.1" concurrent_log = { version = "0.2.4", features = [ "serde" ] } ipnet = { version = "2", features = 
[ "serde" ] } diff --git a/sable_network/src/history/local_service.rs b/sable_network/src/history/local_service.rs index 7d91f3ef..e61b0f5b 100644 --- a/sable_network/src/history/local_service.rs +++ b/sable_network/src/history/local_service.rs @@ -1,5 +1,7 @@ use std::collections::HashMap; +use tracing::instrument; + use crate::network::state::HistoricMessageTargetId; use crate::prelude::*; @@ -20,12 +22,12 @@ fn target_id_for_entry(for_user: UserId, entry: &HistoryLogEntry) -> Option { - node: &'a NetworkNode, +pub struct LocalHistoryService<'a, NetworkPolicy: policy::PolicyService> { + node: &'a NetworkNode, } -impl<'a> LocalHistoryService<'a> { - pub fn new(node: &'a NetworkNode) -> Self { +impl<'a, NetworkPolicy: policy::PolicyService> LocalHistoryService<'a, NetworkPolicy> { + pub fn new(node: &'a NetworkNode) -> Self { LocalHistoryService { node } } @@ -116,7 +118,10 @@ impl<'a> LocalHistoryService<'a> { } } -impl<'a> HistoryService for LocalHistoryService<'a> { +impl<'a, NetworkPolicy: policy::PolicyService> HistoryService + for LocalHistoryService<'a, NetworkPolicy> +{ + #[instrument(skip(self))] async fn list_targets( &self, user: UserId, @@ -147,16 +152,19 @@ impl<'a> HistoryService for LocalHistoryService<'a> { } } + tracing::trace!("list_targets local response: {found_targets:?}"); + found_targets } + #[instrument(skip(self))] async fn get_entries( &self, user: UserId, target: TargetId, request: HistoryRequest, ) -> Result, HistoryError> { - match request { + let res = match request { #[rustfmt::skip] HistoryRequest::Latest { to_ts, limit } => self.get_history_for_target( user, @@ -222,6 +230,8 @@ impl<'a> HistoryService for LocalHistoryService<'a> { ) } } - } + }; + tracing::trace!("get_entries local response: {}", res.is_ok()); + res } } diff --git a/sable_network/src/history/log.rs b/sable_network/src/history/log.rs index 9d714bf1..1d6668c6 100644 --- a/sable_network/src/history/log.rs +++ b/sable_network/src/history/log.rs @@ -165,6 +165,7 @@ impl NetworkHistoryLog { | NewAuditLogEntry(_) | UserLoginChange(_) | ServicesUpdate(_) + | HistoryServerUpdate(_) | EventComplete(_) => None, UserNickChange(_) diff --git a/sable_network/src/history/mod.rs b/sable_network/src/history/mod.rs index 74aa1716..2d5dc38d 100644 --- a/sable_network/src/history/mod.rs +++ b/sable_network/src/history/mod.rs @@ -3,9 +3,13 @@ pub use log::*; mod service; pub use service::*; mod local_service; +pub use local_service::LocalHistoryService; +mod remote_service; +pub use remote_service::RemoteHistoryService; +mod tiered_service; +pub use tiered_service::TieredHistoryService; use crate::network::NetworkStateChange; -pub use local_service::LocalHistoryService; /// Implemented by types that provide metadata for a historic state change pub trait HistoryItem { diff --git a/sable_network/src/history/remote_service.rs b/sable_network/src/history/remote_service.rs new file mode 100644 index 00000000..bbbf4e04 --- /dev/null +++ b/sable_network/src/history/remote_service.rs @@ -0,0 +1,112 @@ +use std::collections::HashMap; + +use tracing::instrument; + +use crate::prelude::*; +use crate::rpc::*; + +/// Implementation of [`HistoryService`] that forwards requests to a [`HistoryServer`] +/// through the RPC. 
+pub struct RemoteHistoryService<'a, NetworkPolicy: policy::PolicyService> { + node: &'a NetworkNode, + remote_server_name: ServerName, +} + +impl<'a, NetworkPolicy: policy::PolicyService> RemoteHistoryService<'a, NetworkPolicy> { + pub fn new(node: &'a NetworkNode, remote_server_name: ServerName) -> Self { + RemoteHistoryService { + node, + remote_server_name, + } + } +} + +impl<'a, NetworkPolicy: policy::PolicyService> HistoryService + for RemoteHistoryService<'a, NetworkPolicy> +{ + #[instrument(skip(self))] + async fn list_targets( + &self, + user: UserId, + after_ts: Option, + before_ts: Option, + limit: Option, + ) -> HashMap { + let res = self + .node + .sync_log() + .send_remote_request( + self.remote_server_name, + RemoteHistoryServerRequestType::ListTargets { + user, + after_ts, + before_ts, + limit, + } + .into(), + ) + .await; + tracing::trace!("list_targets RPC response: {res:?}"); + match res { + Ok(RemoteServerResponse::History(RemoteHistoryServerResponse::TargetList( + targets, + ))) => targets.into_iter().collect(), + Ok(RemoteServerResponse::History(_)) + | Ok(RemoteServerResponse::Services(_)) + // This request should never error + | Ok(RemoteServerResponse::Error(_)) + | Ok(RemoteServerResponse::NotSupported) + | Ok(RemoteServerResponse::Success) => { + tracing::error!("Got unexpected response to ListTargets request: {res:?}"); + HashMap::new() + }, + Err(e) => { + tracing::error!("ListTargets request failed: {e:?}"); + HashMap::new() + } + } + } + + #[instrument(skip(self))] + async fn get_entries( + &self, + user: UserId, + target: TargetId, + request: HistoryRequest, + ) -> Result, HistoryError> { + let res = self + .node + .sync_log() + .send_remote_request( + self.remote_server_name, + rpc::RemoteHistoryServerRequestType::GetEntries { + user, + target, + request, + } + .into(), + ) + .await; + match res { + Ok(RemoteServerResponse::History(RemoteHistoryServerResponse::Entries( + entries, + ))) => { + tracing::trace!("get_entries RPC response: {}", entries.is_ok()); + entries + }, + Ok(RemoteServerResponse::History(_)) + | Ok(RemoteServerResponse::Services(_)) + // Errors while processing this request would return Entries(Err(_)) + | Ok(RemoteServerResponse::Error(_)) + | Ok(RemoteServerResponse::NotSupported) + | Ok(RemoteServerResponse::Success) => { + tracing::error!("Got unexpected response to GetEntries request: {res:?}"); + Ok(Vec::new()) + }, + Err(e) => { + tracing::error!("GetEntries request failed: {e:?}"); + Ok(Vec::new()) + } + } + } +} diff --git a/sable_network/src/history/service.rs b/sable_network/src/history/service.rs index e0b8f21e..6707851e 100644 --- a/sable_network/src/history/service.rs +++ b/sable_network/src/history/service.rs @@ -9,7 +9,7 @@ use crate::history::HistoryLogEntry; use crate::network::state::{HistoricMessageSourceId, HistoricMessageTargetId}; use crate::prelude::*; -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] pub enum TargetId { User(UserId), Channel(ChannelId), @@ -50,6 +50,7 @@ impl TryFrom<&HistoricMessageTargetId> for TargetId { } } +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum HistoryRequest { Latest { to_ts: Option, @@ -74,7 +75,7 @@ pub enum HistoryRequest { }, } -#[derive(Error, Debug)] +#[derive(Error, Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum HistoryError { #[error("invalid target: {0:?}")] InvalidTarget(TargetId), diff --git a/sable_network/src/history/tiered_service.rs 
b/sable_network/src/history/tiered_service.rs new file mode 100644 index 00000000..4fe7ab23 --- /dev/null +++ b/sable_network/src/history/tiered_service.rs @@ -0,0 +1,84 @@ +use std::collections::HashMap; + +use tracing::instrument; + +use crate::prelude::*; + +/// Implementation of [`HistoryService`] backed by two other services, one fast and short-lived and +/// the other slower and longer-lived. +/// +/// This is used to query [`HistoryService`] when possible, and fall-back to a remote server when +/// events expired locally. +pub struct TieredHistoryService< + FastService: HistoryService + Send + Sync, + SlowService: HistoryService + Send + Sync, +> { + fast_service: Option, + slow_service: Option, +} + +impl + TieredHistoryService +{ + pub fn new(fast_service: Option, slow_service: Option) -> Self { + Self { + fast_service, + slow_service, + } + } +} + +impl + HistoryService for TieredHistoryService +{ + #[instrument(skip(self))] + async fn list_targets( + &self, + user: UserId, + after_ts: Option, + before_ts: Option, + limit: Option, + ) -> HashMap { + match (&self.fast_service, &self.slow_service) { + (_, Some(slow_service)) => { + tracing::info!("list_target slow"); + // TODO: implement fallback + slow_service + .list_targets(user, after_ts, before_ts, limit) + .await + } + (Some(fast_service), None) => { + tracing::info!("list_target fast"); + fast_service + .list_targets(user, after_ts, before_ts, limit) + .await + } + (None, None) => HashMap::new(), + } + } + + #[instrument(skip(self))] + async fn get_entries( + &self, + user: UserId, + target: TargetId, + request: HistoryRequest, + ) -> Result, HistoryError> { + // It's tempting to return Box here instead of collecting into a + // temporary Vec, but we can't because IntoIterator::IntoIter potentially differs + match (&self.fast_service, &self.slow_service) { + (_, Some(slow_service)) => { + // TODO: implement fallback + tracing::info!("get_entries slow"); + let entries = slow_service.get_entries(user, target, request).await?; + Ok(entries.into_iter().collect()) + } + (Some(fast_service), None) => { + tracing::info!("get_entries fast"); + let entries = fast_service.get_entries(user, target, request).await?; + Ok(entries.into_iter().collect()) + } + (None, None) => Ok(Vec::new()), + } + } +} diff --git a/sable_network/src/network/event/details.rs b/sable_network/src/network/event/details.rs index 342c8f5a..24ea911b 100644 --- a/sable_network/src/network/event/details.rs +++ b/sable_network/src/network/event/details.rs @@ -190,10 +190,14 @@ EventDetails => { } #[target_type(ServerId)] - struct IntroduceServices { + struct IntroduceServicesServer { pub sasl_mechanisms: Vec, } + #[target_type(ServerId)] + struct IntroduceHistoryServer { + } + #[target_type(AccountId)] struct AccountUpdate { pub data: Option, diff --git a/sable_network/src/network/network/accessors.rs b/sable_network/src/network/network/accessors.rs index cb9db3d0..f7b36bef 100644 --- a/sable_network/src/network/network/accessors.rs +++ b/sable_network/src/network/network/accessors.rs @@ -286,11 +286,17 @@ impl Network { } /// Retrieve the server name of the current active services - pub fn current_services_name(&self) -> Option { - self.current_services - .as_ref() - .and_then(|state| self.servers.get(&state.server_id)) - .map(|s| s.name) + pub fn current_history_server_name(&self) -> Option { + Some(self.servers.get(&self.current_history_server_id?)?.name) + } + + /// Retrieve the server name of the current active services + pub fn 
current_services_server_name(&self) -> Option { + Some( + self.servers + .get(&self.current_services.as_ref()?.server_id)? + .name, + ) } /// Retrieve the current services data diff --git a/sable_network/src/network/network/account_state.rs b/sable_network/src/network/network/account_state.rs index 6acd79ac..f0e3cb06 100644 --- a/sable_network/src/network/network/account_state.rs +++ b/sable_network/src/network/network/account_state.rs @@ -4,11 +4,11 @@ use crate::network::update::*; use crate::prelude::*; impl Network { - pub(super) fn introduce_services( + pub(super) fn introduce_services_server( &mut self, target: ServerId, event: &Event, - update: &IntroduceServices, + update: &IntroduceServicesServer, updates: &dyn NetworkUpdateReceiver, ) { self.current_services = Some(state::ServicesData { diff --git a/sable_network/src/network/network/history_state.rs b/sable_network/src/network/network/history_state.rs new file mode 100644 index 00000000..475e39e9 --- /dev/null +++ b/sable_network/src/network/network/history_state.rs @@ -0,0 +1,16 @@ +use super::Network; +use crate::network::event::*; +use crate::network::update::*; +use crate::prelude::*; + +impl Network { + pub(super) fn introduce_history_server( + &mut self, + target: ServerId, + _event: &Event, + _update: &IntroduceHistoryServer, + _updates: &dyn NetworkUpdateReceiver, + ) { + self.current_history_server_id = Some(target); + } +} diff --git a/sable_network/src/network/network/mod.rs b/sable_network/src/network/network/mod.rs index 8c2e3e07..0b3f5a87 100644 --- a/sable_network/src/network/network/mod.rs +++ b/sable_network/src/network/network/mod.rs @@ -89,6 +89,7 @@ pub struct Network { channel_roles: HashMap, current_services: Option, + current_history_server_id: Option, config: config::NetworkConfig, clock: EventClock, @@ -130,6 +131,7 @@ impl Network { channel_roles: HashMap::new(), current_services: None, + current_history_server_id: None, config, clock: EventClock::new(), @@ -207,7 +209,8 @@ impl Network { NewAuditLogEntry => self.new_audit_log, EnablePersistentSession => self.enable_persistent_session, DisablePersistentSession => self.disable_persistent_session, - IntroduceServices => self.introduce_services, + IntroduceServicesServer => self.introduce_services_server, + IntroduceHistoryServer => self.introduce_history_server, AccountUpdate => self.update_account, NickRegistrationUpdate => self.update_nick_registration, ChannelRegistrationUpdate => self.update_channel_registration, @@ -286,6 +289,7 @@ mod audit_log; mod ban_state; mod channel_state; mod config_state; +mod history_state; mod message_state; mod oper_state; mod server_state; diff --git a/sable_network/src/network/network/server_state.rs b/sable_network/src/network/network/server_state.rs index 4b84d117..3e48fa33 100644 --- a/sable_network/src/network/network/server_state.rs +++ b/sable_network/src/network/network/server_state.rs @@ -71,6 +71,14 @@ impl Network { } } + // ditto for the history server + if let Some(history_server_id) = self.current_history_server_id { + if removed.id == history_server_id { + self.current_history_server_id = None; + updates.notify(update::HistoryServerUpdate {}, event); + } + } + // Collect all the user connections that were on the departing server let removed_connections: Vec<_> = self .user_connections diff --git a/sable_network/src/network/update.rs b/sable_network/src/network/update.rs index fe527ded..ff1b931d 100644 --- a/sable_network/src/network/update.rs +++ b/sable_network/src/network/update.rs @@ -184,6 +184,10 @@ 
NetworkStateChange => { struct ServicesUpdate { } + /// The current history node has changed + struct HistoryServerUpdate { + } + /// A delimiter event to denote that an Event has been completely processed struct EventComplete { } }); diff --git a/sable_network/src/node/mod.rs b/sable_network/src/node/mod.rs index acb75474..554517d9 100644 --- a/sable_network/src/node/mod.rs +++ b/sable_network/src/node/mod.rs @@ -148,6 +148,17 @@ impl NetworkNode { self.history_log.read() } + /// Access the long-term history, potentially backed by RPC calls to the + /// history server + pub fn history_service(&self) -> impl HistoryService + use<'_, Policy> { + let local_service = LocalHistoryService::new(self); + let remote_service = self + .network() + .current_history_server_name() + .map(|history_server_name| RemoteHistoryService::new(self, history_server_name)); + TieredHistoryService::new(Some(local_service), remote_service) + } + /// Access the event log. pub fn event_log(&self) -> std::sync::RwLockReadGuard { self.event_log.event_log() diff --git a/sable_network/src/node/update_receiver.rs b/sable_network/src/node/update_receiver.rs index 91b908b7..7c29c470 100644 --- a/sable_network/src/node/update_receiver.rs +++ b/sable_network/src/node/update_receiver.rs @@ -247,6 +247,10 @@ impl NetworkNode { Ok(Vec::new()) } + fn handle_history_server_update(&self, _detail: &update::HistoryServerUpdate) -> HandleResult { + Ok(Vec::new()) + } + fn handle_services_update(&self, _detail: &update::ServicesUpdate) -> HandleResult { Ok(Vec::new()) } @@ -284,6 +288,7 @@ impl NetworkUpdateReceiver for NetworkNode ServerQuit(detail) => self.handle_server_quit(detail), NewAuditLogEntry(detail) => self.report_audit_entry(detail), UserLoginChange(detail) => self.handle_user_login(detail), + HistoryServerUpdate(detail) => self.handle_history_server_update(detail), ServicesUpdate(detail) => self.handle_services_update(detail), // We don't need to do anything with EventComplete, just pass it along to the subscriber EventComplete(_) => Ok(Vec::new()), diff --git a/sable_network/src/policy/mod.rs b/sable_network/src/policy/mod.rs index 55a1ead6..881f2fe9 100644 --- a/sable_network/src/policy/mod.rs +++ b/sable_network/src/policy/mod.rs @@ -50,6 +50,8 @@ pub trait PolicyService: + OperAuthenticationService + OperPolicyService + RegistrationPolicyService + + Sync + + Send { } diff --git a/sable_network/src/rpc/network_message.rs b/sable_network/src/rpc/network_message.rs index d45b765f..b3f4809b 100644 --- a/sable_network/src/rpc/network_message.rs +++ b/sable_network/src/rpc/network_message.rs @@ -1,4 +1,5 @@ use crate::{ + history::{HistoryError, HistoryLogEntry, HistoryRequest}, id::*, network::{event::*, state::ChannelAccessSet, Network}, validated::*, @@ -38,7 +39,10 @@ pub struct RemoteServerRequest { pub enum RemoteServerRequestType { /// Simple ping for communication tests Ping, + /// A message to be handled by a services node Services(RemoteServicesServerRequestType), + /// A message to be handled by a history node + History(RemoteHistoryServerRequestType), } impl From for RemoteServerRequestType { @@ -47,6 +51,12 @@ impl From for RemoteServerRequestType { } } +impl From for RemoteServerRequestType { + fn from(req: RemoteHistoryServerRequestType) -> Self { + RemoteServerRequestType::History(req) + } +} + /// A message to be handled by a services node #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum RemoteServicesServerRequestType { @@ -89,6 +99,23 @@ pub enum RemoteServicesServerRequestType { 
RemoveAccountFingerprint(AccountId, String), } +/// A message to be handled by a services node +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum RemoteHistoryServerRequestType { + ListTargets { + user: UserId, + after_ts: Option, + before_ts: Option, + limit: Option, + }, + + GetEntries { + user: UserId, + target: crate::history::TargetId, + request: HistoryRequest, + }, +} + /// A SASL authentication response #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum AuthenticateStatus { @@ -113,6 +140,8 @@ pub enum RemoteServerResponse { Error(String), /// Response type specific to services servers Services(RemoteServicesServerResponse), + /// Response type specific to history servers + History(RemoteHistoryServerResponse), } impl From for RemoteServerResponse { @@ -121,6 +150,12 @@ impl From for RemoteServerResponse { } } +impl From for RemoteServerResponse { + fn from(resp: RemoteHistoryServerResponse) -> Self { + RemoteServerResponse::History(resp) + } +} + /// Remote services server response type #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum RemoteServicesServerResponse { @@ -139,3 +174,12 @@ pub enum RemoteServicesServerResponse { /// Channel isn't registered ChannelNotRegistered, } + +/// Remote history server response type +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum RemoteHistoryServerResponse { + /// TODO: switch to HashMap when we move away from JSON as the wire format, + /// to be consistent with [`HistoryService`] + TargetList(Vec<(crate::history::TargetId, i64)>), + Entries(Result, HistoryError>), +} diff --git a/sable_server/src/server.rs b/sable_server/src/server.rs index 0a6aa6b9..5b284046 100644 --- a/sable_server/src/server.rs +++ b/sable_server/src/server.rs @@ -236,7 +236,7 @@ where { if let Some(request) = request { - let response = server.handle_remote_command(request.req); + let response = server.handle_remote_command(request.req).await; if let Err(e) = request.response.send(response) { tracing::error!(?e, "Couldn't send response to remote command"); diff --git a/sable_server/src/server_type.rs b/sable_server/src/server_type.rs index 5077e669..003852c1 100644 --- a/sable_server/src/server_type.rs +++ b/sable_server/src/server_type.rs @@ -64,5 +64,8 @@ pub trait ServerType: Send + Sync + Sized + 'static { ) -> std::io::Result; /// Handle a request originating from a remote server - fn handle_remote_command(&self, request: RemoteServerRequestType) -> RemoteServerResponse; + fn handle_remote_command( + &self, + request: RemoteServerRequestType, + ) -> impl Future + Send; } diff --git a/sable_services/src/server/mod.rs b/sable_services/src/server/mod.rs index 524ada23..cbbfa253 100644 --- a/sable_services/src/server/mod.rs +++ b/sable_services/src/server/mod.rs @@ -22,11 +22,10 @@ use sable_server::ServerType; use std::{collections::HashMap, convert::Infallible, sync::Arc}; use anyhow::Context; +use dashmap::DashMap; use serde::Deserialize; - use tokio::sync::{broadcast, mpsc::UnboundedReceiver, Mutex}; - -use dashmap::DashMap; +use tracing::instrument; mod command; mod roles; @@ -136,7 +135,8 @@ where unimplemented!("services can't hot-upgrade"); } - fn handle_remote_command(&self, req: RemoteServerRequestType) -> RemoteServerResponse { + #[instrument(skip_all)] + async fn handle_remote_command(&self, req: RemoteServerRequestType) -> RemoteServerResponse { tracing::debug!(?req, "Got remote request"); use RemoteServerRequestType::*; @@ -205,8 +205,12 @@ where self.user_del_fp(acc, 
fp) } }, + History(_) => { + tracing::warn!(?req, "Got unsupported request (history)"); + Ok(RemoteServerResponse::NotSupported) + } Ping => { - tracing::warn!(?req, "Got unsupported request"); + tracing::warn!(?req, "Got unsupported request (ping)"); Ok(RemoteServerResponse::NotSupported) } }; diff --git a/sable_services/src/server/sync.rs b/sable_services/src/server/sync.rs index 0822fe85..73d45c1b 100644 --- a/sable_services/src/server/sync.rs +++ b/sable_services/src/server/sync.rs @@ -160,7 +160,7 @@ where // Finally, set ourselves as the active services node self.node.submit_event( self.node.id(), - IntroduceServices { + IntroduceServicesServer { sasl_mechanisms: vec!["PLAIN".to_owned()], }, ); From b7369a834e584d3e949e0ebdfb77b33c7e0fe32b Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Sat, 26 Oct 2024 18:37:25 +0200 Subject: [PATCH 02/22] LATEST impl with blanks to fill --- sable_history/src/pg_history_service.rs | 121 +++++++++++++++++++++--- sable_network/src/history/service.rs | 4 +- 2 files changed, 107 insertions(+), 18 deletions(-) diff --git a/sable_history/src/pg_history_service.rs b/sable_history/src/pg_history_service.rs index 89590d43..ab65d19d 100644 --- a/sable_history/src/pg_history_service.rs +++ b/sable_history/src/pg_history_service.rs @@ -3,14 +3,15 @@ use std::collections::HashMap; use diesel::dsl::sql; use diesel::prelude::*; -use diesel_async::{AsyncConnection, AsyncPgConnection}; -use futures::stream::StreamExt; +use diesel_async::{AsyncConnection, AsyncPgConnection, RunQueryDsl}; +use futures::stream::{StreamExt, TryStreamExt}; use tokio::sync::Mutex; use uuid::Uuid; use sable_network::prelude::*; use crate::schema::channels; +use crate::schema::messages; /// Implementation of [`HistoryService`] backed PostgreSQL pub struct PgHistoryService<'a> { @@ -28,22 +29,22 @@ impl<'a> PgHistoryService<'a> { impl<'a> HistoryService for PgHistoryService<'a> { async fn list_targets( &self, - user: UserId, - after_ts: Option, - before_ts: Option, - limit: Option, + _user: UserId, + _after_ts: Option, + _before_ts: Option, + _limit: Option, ) -> HashMap { // TODO: access control - match diesel_async::RunQueryDsl::load_stream( - channels::dsl::channels.select(( + // TODO: after_ts, before_ts, limit + match channels::dsl::channels + .select(( channels::dsl::id, sql::( "SELECT MAX(id) FROM messages WHERE target_channel=channels.id", ), - )), - &mut *self.database_connection.lock().await, - ) - .await + )) + .load_stream(&mut *self.database_connection.lock().await) + .await { Err(e) => { tracing::error!("Could not get history channels: {e}"); @@ -75,11 +76,101 @@ impl<'a> HistoryService for PgHistoryService<'a> { async fn get_entries( &self, - user: UserId, + _user: UserId, target: TargetId, request: HistoryRequest, ) -> Result, HistoryError> { - todo!(); - Ok(vec![]) + // TODO: access control + let TargetId::Channel(channel_id) = target else { + // TODO: PMs + return Err(HistoryError::InvalidTarget(target)); + }; + + let mut connection_lock = self.database_connection.lock().await; + + let db_channel_id = i64::try_from(channel_id.as_u64()).expect("channel id overflows u64"); + if channels::dsl::channels + .find(db_channel_id) + .select(crate::models::Channel::as_select()) + .first(&mut *connection_lock) + .await + .optional() + .expect("Could not check if channel exists") + .is_none() + { + return Err(HistoryError::InvalidTarget(target)); + } + + match request { + HistoryRequest::Latest { to_ts, limit } => { + let limit = i64::min(10000, 
i64::try_from(limit).unwrap_or(i64::MAX)); + Ok(match to_ts { + Some(to_ts) => { + // Lowest UUIDv7 corresponding to the timestamp + let to_uuid = uuid::Builder::from_unix_timestamp_millis( + u64::try_from(to_ts) + .unwrap_or(u64::MIN) // floor timestamps to Epoch + .saturating_mul(1000), + &[u8::MIN; 10], + ) + .into_uuid(); + Box::new( + messages::dsl::messages + .filter(messages::dsl::target_channel.eq(db_channel_id)) + .filter(messages::dsl::id.lt(to_uuid)) + .order(messages::dsl::id.desc()) + .limit(limit) + .load_stream(&mut *connection_lock), + ) + } + None => Box::new( + messages::dsl::messages + .filter(messages::dsl::target_channel.eq(db_channel_id)) + .order(messages::dsl::id.desc()) + .limit(limit) + .load_stream(&mut *connection_lock), + ), + } + .await + .expect("could not query messages") + .map(|row| { + let crate::models::Message { + id, + source_user, + target_channel, + text, + } = row?; + Ok(HistoryLogEntry { + id: (), + details: NetworkStateChange::NewMessage(update::NewMessage { + message: (), + source: (), + target: (), + }), + source_event: (), + timestamp: (), + }) + }) + .try_collect::>() + .await + .expect("could not parse all records")) + } + HistoryRequest::Before { from_ts, limit } => { + todo!("before") + } + HistoryRequest::After { start_ts, limit } => { + todo!("after") + } + HistoryRequest::Around { around_ts, limit } => { + todo!("around") + } + HistoryRequest::Between { + start_ts, + end_ts, + limit, + } => { + todo!("between") + } + } } } diff --git a/sable_network/src/history/service.rs b/sable_network/src/history/service.rs index 6707851e..f34d1e75 100644 --- a/sable_network/src/history/service.rs +++ b/sable_network/src/history/service.rs @@ -99,7 +99,5 @@ pub trait HistoryService { user: UserId, target: TargetId, request: HistoryRequest, - ) -> impl Future, HistoryError>> - + Send - + Sync; + ) -> impl Future, HistoryError>> + Send; } From 2c0eaadf47d89c3bf219cef03f33af3d78423b30 Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Sun, 27 Oct 2024 08:59:05 +0100 Subject: [PATCH 03/22] [WIP] decouple send_item --- sable_history/src/pg_history_service.rs | 8 + sable_ircd/src/messages/send_history.rs | 237 +++++++++++++----------- sable_network/src/history/mod.rs | 4 + 3 files changed, 139 insertions(+), 110 deletions(-) diff --git a/sable_history/src/pg_history_service.rs b/sable_history/src/pg_history_service.rs index ab65d19d..05d67722 100644 --- a/sable_history/src/pg_history_service.rs +++ b/sable_history/src/pg_history_service.rs @@ -134,12 +134,18 @@ impl<'a> HistoryService for PgHistoryService<'a> { .await .expect("could not query messages") .map(|row| { + let row: crate::models::Message = match row { + Ok(row) => row, + Err(e) => return Err(e), + }; + /* let crate::models::Message { id, source_user, target_channel, text, } = row?; + } Ok(HistoryLogEntry { id: (), details: NetworkStateChange::NewMessage(update::NewMessage { @@ -150,6 +156,8 @@ impl<'a> HistoryService for PgHistoryService<'a> { source_event: (), timestamp: (), }) + */ + Ok((|| -> HistoryLogEntry { todo!() })()) }) .try_collect::>() .await diff --git a/sable_ircd/src/messages/send_history.rs b/sable_ircd/src/messages/send_history.rs index 10c245c9..af1bb41d 100644 --- a/sable_ircd/src/messages/send_history.rs +++ b/sable_ircd/src/messages/send_history.rs @@ -1,6 +1,6 @@ use crate::capability::ClientCapability; use crate::capability::WithSupportedTags; -use crate::errors::HandleResult; +use crate::errors::{HandleResult, HandlerError}; use crate::messages::MessageSink; use 
crate::prelude::numeric; use crate::server::ClientServer; @@ -10,8 +10,12 @@ use sable_network::utils::*; use super::message; -/// Extension trait to translate a network history entry into client protocol messages +type Result = std::result::Result; + +/// Extension trait to translate a network history entry into an intermediate representation +/// ([`HistoryMessage`]) then to client protocol messages pub(crate) trait SendHistoryItem { + /// Shorthand for [`Self::make_history_message`] followed by [`Self::send_history_message`] fn send_item( &self, item: &Item, @@ -20,6 +24,45 @@ pub(crate) trait SendHistoryItem { ) -> HandleResult; } +impl SendHistoryItem for ClientServer +where + ClientServer: MakeHistoryMessage, +{ + fn send_item( + &self, + item: &Item, + conn: impl MessageSink, + from_entry: &impl HistoryItem, + ) -> HandleResult { + for message in self.make_history_messages(item, &conn, from_entry)? { + self.send_history_message(message, &conn)?; + } + + Ok(()) + } +} + +/// Extension trait to translate a network history entry into an intermediate representation +/// ([`HistoryMessage`]) +pub(crate) trait MakeHistoryMessage { + fn make_history_messages( + &self, + item: &Item, + conn: &impl MessageSink, + from_entry: &impl HistoryItem, + ) -> Result>; +} + +impl ClientServer { + fn send_history_message( + &self, + message: HistoryMessage, + conn: &impl MessageSink, + ) -> HandleResult { + match message {} + } +} + impl SendHistoryItem for ClientServer { fn send_item( &self, @@ -61,7 +104,7 @@ impl SendHistoryItem for ClientServer { fn send_item( &self, item: &HistoryLogEntry, - conn: impl MessageSink, + conn: &impl MessageSink, _from_entry: &impl HistoryItem, ) -> HandleResult { match &item.details { @@ -94,13 +137,13 @@ impl SendHistoryItem for ClientServer { } } -impl SendHistoryItem for ClientServer { - fn send_item( +impl MakeHistoryMessage for ClientServer { + fn make_history_messages( &self, item: &update::UserAwayChange, - conn: impl MessageSink, + conn: &impl MessageSink, from_entry: &impl HistoryItem, - ) -> HandleResult { + ) -> Result> { let net = self.network(); let source = net.historic_user(item.user)?; @@ -110,45 +153,40 @@ impl SendHistoryItem for ClientServer { None => numeric::Unaway::new(), Some(_) => numeric::NowAway::new(), }; - conn.send(message.format_for(self, source)); + Ok([message.format_for(self, source)]) } else { // Tell other users sharing a channel if they enabled away-notify let message = match item.new_reason { None => message::Unaway::new(source), Some(reason) => message::Away::new(source, reason.value()), }; - let message = message.with_tags_from(from_entry, &net); - conn.send(message.with_required_capabilities(ClientCapability::AwayNotify)); + Ok([message.with_tags_from(from_entry, &net)]) } - - Ok(()) } } -impl SendHistoryItem for ClientServer { - fn send_item( +impl MakeHistoryMessage for ClientServer { + fn make_history_messages( &self, item: &update::UserNickChange, - conn: impl MessageSink, + conn: &impl MessageSink, from_entry: &impl HistoryItem, - ) -> HandleResult { + ) -> Result> { let net = self.network(); let source = net.historic_user(item.user)?; let message = message::Nick::new(source, &item.new_nick).with_tags_from(from_entry, &net); - conn.send(message); - - Ok(()) + Ok([message]) } } -impl SendHistoryItem for ClientServer { - fn send_item( +impl MakeHistoryMessage for ClientServer { + fn make_history_messages( &self, item: &update::UserModeChange, - conn: impl MessageSink, + conn: &impl MessageSink, from_entry: &impl HistoryItem, 
- ) -> HandleResult { + ) -> Result> { let net = self.network(); let source = net.historic_user(item.user)?; let message = message::Mode::new( @@ -158,36 +196,32 @@ impl SendHistoryItem for ClientServer { ) .with_tags_from(from_entry, &net); - conn.send(message); - - Ok(()) + Ok([message]) } } -impl SendHistoryItem for ClientServer { - fn send_item( +impl MakeHistoryMessage for ClientServer { + fn make_history_messages( &self, item: &update::UserQuit, - conn: impl MessageSink, + conn: &impl MessageSink, from_entry: &impl HistoryItem, - ) -> HandleResult { + ) -> Result> { let net = self.network(); let source = net.historic_user(item.user)?; let message = message::Quit::new(source, &item.message).with_tags_from(from_entry, &net); - conn.send(message); - - Ok(()) + Ok([message]) } } -impl SendHistoryItem for ClientServer { - fn send_item( +impl MakeHistoryMessage for ClientServer { + fn make_history_messages( &self, item: &update::ChannelModeChange, - conn: impl MessageSink, + conn: &impl MessageSink, from_entry: &impl HistoryItem, - ) -> HandleResult { + ) -> Result> { let net = self.network(); let source = net.message_source(&item.changed_by)?; let channel = net.channel(item.channel)?; @@ -201,19 +235,17 @@ impl SendHistoryItem for ClientServer { let message = message::Mode::new(&source, &channel, &changes).with_tags_from(from_entry, &net); - conn.send(message); - - Ok(()) + Ok([message]) } } -impl SendHistoryItem for ClientServer { - fn send_item( +impl MakeHistoryMessage for ClientServer { + fn make_history_messages( &self, item: &update::ChannelTopicChange, - conn: impl MessageSink, + conn: &impl MessageSink, from_entry: &impl HistoryItem, - ) -> HandleResult { + ) -> Result> { let net = self.network(); let source = net.message_source(&item.setter)?; let channel = net.channel(item.channel)?; @@ -221,55 +253,51 @@ impl SendHistoryItem for ClientServer { let message = message::Topic::new(&source, &channel.name(), &item.new_text) .with_tags_from(from_entry, &net); - conn.send(message); - - Ok(()) + Ok([message]) } } -impl SendHistoryItem for ClientServer { - fn send_item( +impl MakeHistoryMessage for ClientServer { + fn make_history_messages( &self, item: &update::ListModeAdded, - conn: impl MessageSink, + conn: &impl MessageSink, from_entry: &impl HistoryItem, - ) -> HandleResult { + ) -> Result> { let net = self.network(); let source = net.message_source(&item.set_by)?; let channel = net.channel(item.channel)?; let text = format!("+{} {}", item.list_type.mode_char(), item.pattern); let message = message::Mode::new(&source, &channel, &text).with_tags_from(from_entry, &net); - conn.send(message); - Ok(()) + Ok([message]) } } -impl SendHistoryItem for ClientServer { - fn send_item( +impl MakeHistoryMessage for ClientServer { + fn make_history_messages( &self, item: &update::ListModeRemoved, - conn: impl MessageSink, + conn: &impl MessageSink, from_entry: &impl HistoryItem, - ) -> HandleResult { + ) -> Result> { let net = self.network(); let source = net.message_source(&item.removed_by)?; let channel = net.channel(item.channel)?; let text = format!("-{} {}", item.list_type.mode_char(), item.pattern); let message = message::Mode::new(&source, &channel, &text).with_tags_from(from_entry, &net); - conn.send(message); - Ok(()) + Ok([message]) } } -impl SendHistoryItem for ClientServer { - fn send_item( +impl MakeHistoryMessage for ClientServer { + fn make_history_messages( &self, item: &update::MembershipFlagChange, - conn: impl MessageSink, + conn: &impl MessageSink, from_entry: &impl 
HistoryItem, - ) -> HandleResult { + ) -> Result> { let net = self.network(); let source = net.message_source(&item.changed_by)?; let user = net.historic_user(item.user)?; @@ -284,27 +312,24 @@ impl SendHistoryItem for ClientServer { let message = message::Mode::new(&source, &channel, &changes).with_tags_from(from_entry, &net); - conn.send(message); - - Ok(()) + Ok([message]) } } -impl SendHistoryItem for ClientServer { - fn send_item( +impl MakeHistoryMessage for ClientServer { + fn make_history_messages( &self, item: &update::ChannelJoin, - conn: impl MessageSink, + conn: &impl MessageSink, from_entry: &impl HistoryItem, - ) -> HandleResult { + ) -> Result> { let net = self.network(); let user = net.historic_user(item.user)?; let membership = net.membership(item.membership)?; let channel = membership.channel()?; - let message = message::Join::new(user, &channel.name()).with_tags_from(from_entry, &net); - - conn.send(message); + let mut messages = + vec![message::Join::new(user, &channel.name()).with_tags_from(from_entry, &net)]; if !membership.permissions().is_empty() { let (mut changes, args) = format_channel_perm_changes( @@ -317,27 +342,27 @@ impl SendHistoryItem for ClientServer { changes += &args.join(" "); let msg = message::Mode::new(user, &channel, &changes); - conn.send(msg); + messages.push(msg); } if let Some(away_reason) = user.away_reason() { let message = message::Away::new(user, away_reason.value()).with_tags_from(from_entry, &net); - conn.send(message.with_required_capabilities(ClientCapability::AwayNotify)); + messages.push(message.with_required_capabilities(ClientCapability::AwayNotify)); } - Ok(()) + Ok(message) } } -impl SendHistoryItem for ClientServer { - fn send_item( +impl MakeHistoryMessage for ClientServer { + fn make_history_messages( &self, item: &update::ChannelKick, - conn: impl MessageSink, + conn: &impl MessageSink, from_entry: &impl HistoryItem, - ) -> HandleResult { + ) -> Result> { let net = self.network(); let source = net.message_source(&item.source)?; let user = net.historic_user(item.user)?; @@ -346,19 +371,17 @@ impl SendHistoryItem for ClientServer { let message = message::Kick::new(&source, user, &channel.name(), &item.message) .with_tags_from(from_entry, &net); - conn.send(message); - - Ok(()) + Ok([message]) } } -impl SendHistoryItem for ClientServer { - fn send_item( +impl MakeHistoryMessage for ClientServer { + fn make_history_messages( &self, item: &update::ChannelPart, - conn: impl MessageSink, + conn: &impl MessageSink, from_entry: &impl HistoryItem, - ) -> HandleResult { + ) -> Result> { let net = self.network(); let user = net.historic_user(item.user)?; let channel = net.channel(item.membership.channel)?; @@ -368,19 +391,17 @@ impl SendHistoryItem for ClientServer { let message = message::Part::new(user, &channel.name(), &item.message) .with_tags_from(from_entry, &net); - conn.send(message); - - Ok(()) + Ok([message]) } } -impl SendHistoryItem for ClientServer { - fn send_item( +impl MakeHistoryMessage for ClientServer { + fn make_history_messages( &self, item: &update::ChannelInvite, - conn: impl MessageSink, + conn: &impl MessageSink, from_entry: &impl HistoryItem, - ) -> HandleResult { + ) -> Result> { let net = self.network(); let source = net.message_source(&item.source)?; let user = net.historic_user(item.user)?; @@ -389,32 +410,30 @@ impl SendHistoryItem for ClientServer { let message = message::Invite::new(&source, user, &channel.name()).with_tags_from(from_entry, &net); - conn.send(message); - - Ok(()) + Ok([message]) } } -impl 
SendHistoryItem for ClientServer { - fn send_item( +impl MakeHistoryMessage for ClientServer { + fn make_history_messages( &self, _item: &update::ChannelRename, - _conn: impl MessageSink, + _conn: &impl MessageSink, _from_entry: &impl HistoryItem, - ) -> HandleResult { + ) -> Result> { // Not part of history, so it is handled entirely in send_realtime.rs. // See https://github.com/ircv3/ircv3-specifications/issues/532 Ok(()) } } -impl SendHistoryItem for ClientServer { - fn send_item( +impl MakeHistoryMessage for ClientServer { + fn make_history_messages( &self, item: &update::NewMessage, - conn: impl MessageSink, + conn: &impl MessageSink, from_entry: &impl HistoryItem, - ) -> HandleResult { + ) -> Result> { let net = self.network(); let source = net.message_source(&item.source)?; let target = net.message_target(&item.target)?; @@ -431,14 +450,12 @@ impl SendHistoryItem for ClientServer { if conn.user_id() == Some(*user.user()) && item.target.user().map(|id| id.user()) != Some(user.user()) { - conn.send(message.with_required_capabilities(ClientCapability::EchoMessage)); + Ok([message.with_required_capabilities(ClientCapability::EchoMessage)]) } else { - conn.send(message); + Ok([message]) } } - _ => conn.send(message), + _ => Ok([message]), } - - Ok(()) } } diff --git a/sable_network/src/history/mod.rs b/sable_network/src/history/mod.rs index 2d5dc38d..886603b5 100644 --- a/sable_network/src/history/mod.rs +++ b/sable_network/src/history/mod.rs @@ -26,3 +26,7 @@ impl HistoryItem for HistoryLogEntry { &self.details } } + +/// A more concrete representation of [`HistoryItem`], with all its fields inflated +/// to strings that will be sent to the client +pub enum HistoryMessage {} From 2ef9b6a36c80b74862ce1e955a388149696b42f6 Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Sun, 27 Oct 2024 15:09:23 +0100 Subject: [PATCH 04/22] Make HistoryService return an inflated event instead of HistoryLogEntry --- sable_history/diesel.toml | 1 + sable_history/src/lib.rs | 1 + sable_history/src/models/message.rs | 1 + sable_history/src/pg_history_service.rs | 7 +- sable_history/src/schema.rs | 10 + sable_history/src/server/sync.rs | 2 - sable_history/src/server/update_handler.rs | 1 + .../src/command/handlers/chathistory.rs | 51 +++- sable_ircd/src/messages/send_history.rs | 237 ++++++++---------- sable_ircd/src/messages/source_target.rs | 6 + sable_network/src/history/local_service.rs | 30 ++- sable_network/src/history/remote_service.rs | 2 +- sable_network/src/history/service.rs | 17 +- sable_network/src/history/tiered_service.rs | 2 +- sable_network/src/rpc/network_message.rs | 4 +- 15 files changed, 218 insertions(+), 154 deletions(-) diff --git a/sable_history/diesel.toml b/sable_history/diesel.toml index a0d61bf4..06043aa5 100644 --- a/sable_history/diesel.toml +++ b/sable_history/diesel.toml @@ -4,6 +4,7 @@ [print_schema] file = "src/schema.rs" custom_type_derives = ["diesel::query_builder::QueryId", "Clone"] +import_types = ["crate::type::*"] [migrations_directory] dir = "migrations" diff --git a/sable_history/src/lib.rs b/sable_history/src/lib.rs index 36c941cc..c89921ea 100644 --- a/sable_history/src/lib.rs +++ b/sable_history/src/lib.rs @@ -5,3 +5,4 @@ pub use server::*; mod models; mod schema; +mod types; diff --git a/sable_history/src/models/message.rs b/sable_history/src/models/message.rs index 7934b523..aa6b3fc2 100644 --- a/sable_history/src/models/message.rs +++ b/sable_history/src/models/message.rs @@ -10,4 +10,5 @@ pub struct Message { pub source_user: i32, pub target_channel: i64, pub 
text: String, + pub message_type: crate::types::MessageType, } diff --git a/sable_history/src/pg_history_service.rs b/sable_history/src/pg_history_service.rs index 05d67722..504428c6 100644 --- a/sable_history/src/pg_history_service.rs +++ b/sable_history/src/pg_history_service.rs @@ -1,9 +1,8 @@ -use std::borrow::BorrowMut; use std::collections::HashMap; use diesel::dsl::sql; use diesel::prelude::*; -use diesel_async::{AsyncConnection, AsyncPgConnection, RunQueryDsl}; +use diesel_async::{AsyncPgConnection, RunQueryDsl}; use futures::stream::{StreamExt, TryStreamExt}; use tokio::sync::Mutex; use uuid::Uuid; @@ -79,7 +78,7 @@ impl<'a> HistoryService for PgHistoryService<'a> { _user: UserId, target: TargetId, request: HistoryRequest, - ) -> Result, HistoryError> { + ) -> Result, HistoryError> { // TODO: access control let TargetId::Channel(channel_id) = target else { // TODO: PMs @@ -157,7 +156,7 @@ impl<'a> HistoryService for PgHistoryService<'a> { timestamp: (), }) */ - Ok((|| -> HistoryLogEntry { todo!() })()) + Ok((|| -> HistoricalEvent { todo!() })()) }) .try_collect::>() .await diff --git a/sable_history/src/schema.rs b/sable_history/src/schema.rs index 2823831b..4346acf7 100644 --- a/sable_history/src/schema.rs +++ b/sable_history/src/schema.rs @@ -1,5 +1,11 @@ // @generated automatically by Diesel CLI. +pub mod sql_types { + #[derive(diesel::query_builder::QueryId, Clone, diesel::sql_types::SqlType)] + #[diesel(postgres_type(name = "Message_Type"))] + pub struct MessageType; +} + diesel::table! { channels (id) { id -> Int8, @@ -21,11 +27,15 @@ diesel::table! { } diesel::table! { + use diesel::sql_types::*; + use super::sql_types::MessageType; + messages (id) { id -> Uuid, source_user -> Int4, target_channel -> Int8, text -> Varchar, + message_type -> MessageType, } } diff --git a/sable_history/src/server/sync.rs b/sable_history/src/server/sync.rs index 651983c5..c293e2f4 100644 --- a/sable_history/src/server/sync.rs +++ b/sable_history/src/server/sync.rs @@ -1,5 +1,3 @@ -use sable_network::network::wrapper::ObjectWrapper; - use super::*; use crate::server::event::IntroduceHistoryServer; diff --git a/sable_history/src/server/update_handler.rs b/sable_history/src/server/update_handler.rs index 3e4e6800..05566342 100644 --- a/sable_history/src/server/update_handler.rs +++ b/sable_history/src/server/update_handler.rs @@ -156,6 +156,7 @@ impl HistoryServer { id: **net_message.id(), source_user: db_source.id, target_channel: db_channel.id, + message_type: net_message.message_type().into(), text: net_message.text().to_string(), }; diff --git a/sable_ircd/src/command/handlers/chathistory.rs b/sable_ircd/src/command/handlers/chathistory.rs index 3b5a81f7..cf8a4fa3 100644 --- a/sable_ircd/src/command/handlers/chathistory.rs +++ b/sable_ircd/src/command/handlers/chathistory.rs @@ -1,9 +1,10 @@ -use super::*; -use crate::{capability::ClientCapability, utils}; -use messages::send_history::SendHistoryItem; +use std::cmp::{max, min}; + use sable_network::history::{HistoryError, HistoryRequest, HistoryService, TargetId}; -use std::cmp::{max, min}; +use super::*; +use crate::capability::server_time; +use crate::{capability::ClientCapability, utils}; fn parse_msgref(subcommand: &str, target: Option<&str>, msgref: &str) -> Result { match msgref.split_once('=') { @@ -197,20 +198,48 @@ async fn list_targets<'a>( } fn send_history_entries<'a>( - server: &ClientServer, - into: impl MessageSink, + _server: &ClientServer, + conn: impl MessageSink, target: &str, - entries: impl IntoIterator, + entries: impl 
IntoIterator, ) -> CommandResult { - let batch = into + let batch = conn .batch("chathistory", ClientCapability::Batch) .with_arguments(&[target]) .start(); for entry in entries { - // Ignore errors here; it's possible that a message has been expired out of network state - // but a reference to it still exists in the history log - let _ = server.send_item(&entry, &batch, &entry); + match entry { + HistoricalEvent::Message { + id, + source, + source_account, + target: _, // assume it's the same as the one we got as parameter + message_type, + text, + } => { + let (timestamp, _) = id + .get_timestamp() + .expect("message has non-v7 UUID") + .to_unix(); + let msg = message::Message::new(&source, &target, message_type, &text) + .with_tag(server_time::server_time_tag( + i64::try_from(timestamp).unwrap_or(i64::MAX), + )) + .with_tag(OutboundMessageTag::new( + "msgid", + Some(id.to_string()), + ClientCapability::MessageTags, + )) + .with_tag(OutboundMessageTag::new( + "account", + source_account, + ClientCapability::AccountTag, + )); + + batch.send(msg); + } + } } Ok(()) diff --git a/sable_ircd/src/messages/send_history.rs b/sable_ircd/src/messages/send_history.rs index af1bb41d..10c245c9 100644 --- a/sable_ircd/src/messages/send_history.rs +++ b/sable_ircd/src/messages/send_history.rs @@ -1,6 +1,6 @@ use crate::capability::ClientCapability; use crate::capability::WithSupportedTags; -use crate::errors::{HandleResult, HandlerError}; +use crate::errors::HandleResult; use crate::messages::MessageSink; use crate::prelude::numeric; use crate::server::ClientServer; @@ -10,12 +10,8 @@ use sable_network::utils::*; use super::message; -type Result = std::result::Result; - -/// Extension trait to translate a network history entry into an intermediate representation -/// ([`HistoryMessage`]) then to client protocol messages +/// Extension trait to translate a network history entry into client protocol messages pub(crate) trait SendHistoryItem { - /// Shorthand for [`Self::make_history_message`] followed by [`Self::send_history_message`] fn send_item( &self, item: &Item, @@ -24,45 +20,6 @@ pub(crate) trait SendHistoryItem { ) -> HandleResult; } -impl SendHistoryItem for ClientServer -where - ClientServer: MakeHistoryMessage, -{ - fn send_item( - &self, - item: &Item, - conn: impl MessageSink, - from_entry: &impl HistoryItem, - ) -> HandleResult { - for message in self.make_history_messages(item, &conn, from_entry)? 
{ - self.send_history_message(message, &conn)?; - } - - Ok(()) - } -} - -/// Extension trait to translate a network history entry into an intermediate representation -/// ([`HistoryMessage`]) -pub(crate) trait MakeHistoryMessage { - fn make_history_messages( - &self, - item: &Item, - conn: &impl MessageSink, - from_entry: &impl HistoryItem, - ) -> Result>; -} - -impl ClientServer { - fn send_history_message( - &self, - message: HistoryMessage, - conn: &impl MessageSink, - ) -> HandleResult { - match message {} - } -} - impl SendHistoryItem for ClientServer { fn send_item( &self, @@ -104,7 +61,7 @@ impl SendHistoryItem for ClientServer { fn send_item( &self, item: &HistoryLogEntry, - conn: &impl MessageSink, + conn: impl MessageSink, _from_entry: &impl HistoryItem, ) -> HandleResult { match &item.details { @@ -137,13 +94,13 @@ impl SendHistoryItem for ClientServer { } } -impl MakeHistoryMessage for ClientServer { - fn make_history_messages( +impl SendHistoryItem for ClientServer { + fn send_item( &self, item: &update::UserAwayChange, - conn: &impl MessageSink, + conn: impl MessageSink, from_entry: &impl HistoryItem, - ) -> Result> { + ) -> HandleResult { let net = self.network(); let source = net.historic_user(item.user)?; @@ -153,40 +110,45 @@ impl MakeHistoryMessage for ClientServer { None => numeric::Unaway::new(), Some(_) => numeric::NowAway::new(), }; - Ok([message.format_for(self, source)]) + conn.send(message.format_for(self, source)); } else { // Tell other users sharing a channel if they enabled away-notify let message = match item.new_reason { None => message::Unaway::new(source), Some(reason) => message::Away::new(source, reason.value()), }; - Ok([message.with_tags_from(from_entry, &net)]) + let message = message.with_tags_from(from_entry, &net); + conn.send(message.with_required_capabilities(ClientCapability::AwayNotify)); } + + Ok(()) } } -impl MakeHistoryMessage for ClientServer { - fn make_history_messages( +impl SendHistoryItem for ClientServer { + fn send_item( &self, item: &update::UserNickChange, - conn: &impl MessageSink, + conn: impl MessageSink, from_entry: &impl HistoryItem, - ) -> Result> { + ) -> HandleResult { let net = self.network(); let source = net.historic_user(item.user)?; let message = message::Nick::new(source, &item.new_nick).with_tags_from(from_entry, &net); - Ok([message]) + conn.send(message); + + Ok(()) } } -impl MakeHistoryMessage for ClientServer { - fn make_history_messages( +impl SendHistoryItem for ClientServer { + fn send_item( &self, item: &update::UserModeChange, - conn: &impl MessageSink, + conn: impl MessageSink, from_entry: &impl HistoryItem, - ) -> Result> { + ) -> HandleResult { let net = self.network(); let source = net.historic_user(item.user)?; let message = message::Mode::new( @@ -196,32 +158,36 @@ impl MakeHistoryMessage for ClientServer { ) .with_tags_from(from_entry, &net); - Ok([message]) + conn.send(message); + + Ok(()) } } -impl MakeHistoryMessage for ClientServer { - fn make_history_messages( +impl SendHistoryItem for ClientServer { + fn send_item( &self, item: &update::UserQuit, - conn: &impl MessageSink, + conn: impl MessageSink, from_entry: &impl HistoryItem, - ) -> Result> { + ) -> HandleResult { let net = self.network(); let source = net.historic_user(item.user)?; let message = message::Quit::new(source, &item.message).with_tags_from(from_entry, &net); - Ok([message]) + conn.send(message); + + Ok(()) } } -impl MakeHistoryMessage for ClientServer { - fn make_history_messages( +impl SendHistoryItem for ClientServer { + fn 
send_item( &self, item: &update::ChannelModeChange, - conn: &impl MessageSink, + conn: impl MessageSink, from_entry: &impl HistoryItem, - ) -> Result> { + ) -> HandleResult { let net = self.network(); let source = net.message_source(&item.changed_by)?; let channel = net.channel(item.channel)?; @@ -235,17 +201,19 @@ impl MakeHistoryMessage for ClientServer { let message = message::Mode::new(&source, &channel, &changes).with_tags_from(from_entry, &net); - Ok([message]) + conn.send(message); + + Ok(()) } } -impl MakeHistoryMessage for ClientServer { - fn make_history_messages( +impl SendHistoryItem for ClientServer { + fn send_item( &self, item: &update::ChannelTopicChange, - conn: &impl MessageSink, + conn: impl MessageSink, from_entry: &impl HistoryItem, - ) -> Result> { + ) -> HandleResult { let net = self.network(); let source = net.message_source(&item.setter)?; let channel = net.channel(item.channel)?; @@ -253,51 +221,55 @@ impl MakeHistoryMessage for ClientServer { let message = message::Topic::new(&source, &channel.name(), &item.new_text) .with_tags_from(from_entry, &net); - Ok([message]) + conn.send(message); + + Ok(()) } } -impl MakeHistoryMessage for ClientServer { - fn make_history_messages( +impl SendHistoryItem for ClientServer { + fn send_item( &self, item: &update::ListModeAdded, - conn: &impl MessageSink, + conn: impl MessageSink, from_entry: &impl HistoryItem, - ) -> Result> { + ) -> HandleResult { let net = self.network(); let source = net.message_source(&item.set_by)?; let channel = net.channel(item.channel)?; let text = format!("+{} {}", item.list_type.mode_char(), item.pattern); let message = message::Mode::new(&source, &channel, &text).with_tags_from(from_entry, &net); - Ok([message]) + conn.send(message); + Ok(()) } } -impl MakeHistoryMessage for ClientServer { - fn make_history_messages( +impl SendHistoryItem for ClientServer { + fn send_item( &self, item: &update::ListModeRemoved, - conn: &impl MessageSink, + conn: impl MessageSink, from_entry: &impl HistoryItem, - ) -> Result> { + ) -> HandleResult { let net = self.network(); let source = net.message_source(&item.removed_by)?; let channel = net.channel(item.channel)?; let text = format!("-{} {}", item.list_type.mode_char(), item.pattern); let message = message::Mode::new(&source, &channel, &text).with_tags_from(from_entry, &net); - Ok([message]) + conn.send(message); + Ok(()) } } -impl MakeHistoryMessage for ClientServer { - fn make_history_messages( +impl SendHistoryItem for ClientServer { + fn send_item( &self, item: &update::MembershipFlagChange, - conn: &impl MessageSink, + conn: impl MessageSink, from_entry: &impl HistoryItem, - ) -> Result> { + ) -> HandleResult { let net = self.network(); let source = net.message_source(&item.changed_by)?; let user = net.historic_user(item.user)?; @@ -312,24 +284,27 @@ impl MakeHistoryMessage for ClientServer { let message = message::Mode::new(&source, &channel, &changes).with_tags_from(from_entry, &net); - Ok([message]) + conn.send(message); + + Ok(()) } } -impl MakeHistoryMessage for ClientServer { - fn make_history_messages( +impl SendHistoryItem for ClientServer { + fn send_item( &self, item: &update::ChannelJoin, - conn: &impl MessageSink, + conn: impl MessageSink, from_entry: &impl HistoryItem, - ) -> Result> { + ) -> HandleResult { let net = self.network(); let user = net.historic_user(item.user)?; let membership = net.membership(item.membership)?; let channel = membership.channel()?; - let mut messages = - vec![message::Join::new(user, 
&channel.name()).with_tags_from(from_entry, &net)]; + let message = message::Join::new(user, &channel.name()).with_tags_from(from_entry, &net); + + conn.send(message); if !membership.permissions().is_empty() { let (mut changes, args) = format_channel_perm_changes( @@ -342,27 +317,27 @@ impl MakeHistoryMessage for ClientServer { changes += &args.join(" "); let msg = message::Mode::new(user, &channel, &changes); - messages.push(msg); + conn.send(msg); } if let Some(away_reason) = user.away_reason() { let message = message::Away::new(user, away_reason.value()).with_tags_from(from_entry, &net); - messages.push(message.with_required_capabilities(ClientCapability::AwayNotify)); + conn.send(message.with_required_capabilities(ClientCapability::AwayNotify)); } - Ok(message) + Ok(()) } } -impl MakeHistoryMessage for ClientServer { - fn make_history_messages( +impl SendHistoryItem for ClientServer { + fn send_item( &self, item: &update::ChannelKick, - conn: &impl MessageSink, + conn: impl MessageSink, from_entry: &impl HistoryItem, - ) -> Result> { + ) -> HandleResult { let net = self.network(); let source = net.message_source(&item.source)?; let user = net.historic_user(item.user)?; @@ -371,17 +346,19 @@ impl MakeHistoryMessage for ClientServer { let message = message::Kick::new(&source, user, &channel.name(), &item.message) .with_tags_from(from_entry, &net); - Ok([message]) + conn.send(message); + + Ok(()) } } -impl MakeHistoryMessage for ClientServer { - fn make_history_messages( +impl SendHistoryItem for ClientServer { + fn send_item( &self, item: &update::ChannelPart, - conn: &impl MessageSink, + conn: impl MessageSink, from_entry: &impl HistoryItem, - ) -> Result> { + ) -> HandleResult { let net = self.network(); let user = net.historic_user(item.user)?; let channel = net.channel(item.membership.channel)?; @@ -391,17 +368,19 @@ impl MakeHistoryMessage for ClientServer { let message = message::Part::new(user, &channel.name(), &item.message) .with_tags_from(from_entry, &net); - Ok([message]) + conn.send(message); + + Ok(()) } } -impl MakeHistoryMessage for ClientServer { - fn make_history_messages( +impl SendHistoryItem for ClientServer { + fn send_item( &self, item: &update::ChannelInvite, - conn: &impl MessageSink, + conn: impl MessageSink, from_entry: &impl HistoryItem, - ) -> Result> { + ) -> HandleResult { let net = self.network(); let source = net.message_source(&item.source)?; let user = net.historic_user(item.user)?; @@ -410,30 +389,32 @@ impl MakeHistoryMessage for ClientServer { let message = message::Invite::new(&source, user, &channel.name()).with_tags_from(from_entry, &net); - Ok([message]) + conn.send(message); + + Ok(()) } } -impl MakeHistoryMessage for ClientServer { - fn make_history_messages( +impl SendHistoryItem for ClientServer { + fn send_item( &self, _item: &update::ChannelRename, - _conn: &impl MessageSink, + _conn: impl MessageSink, _from_entry: &impl HistoryItem, - ) -> Result> { + ) -> HandleResult { // Not part of history, so it is handled entirely in send_realtime.rs. 
// See https://github.com/ircv3/ircv3-specifications/issues/532 Ok(()) } } -impl MakeHistoryMessage for ClientServer { - fn make_history_messages( +impl SendHistoryItem for ClientServer { + fn send_item( &self, item: &update::NewMessage, - conn: &impl MessageSink, + conn: impl MessageSink, from_entry: &impl HistoryItem, - ) -> Result> { + ) -> HandleResult { let net = self.network(); let source = net.message_source(&item.source)?; let target = net.message_target(&item.target)?; @@ -450,12 +431,14 @@ impl MakeHistoryMessage for ClientServer { if conn.user_id() == Some(*user.user()) && item.target.user().map(|id| id.user()) != Some(user.user()) { - Ok([message.with_required_capabilities(ClientCapability::EchoMessage)]) + conn.send(message.with_required_capabilities(ClientCapability::EchoMessage)); } else { - Ok([message]) + conn.send(message); } } - _ => Ok([message]), + _ => conn.send(message), } + + Ok(()) } } diff --git a/sable_ircd/src/messages/source_target.rs b/sable_ircd/src/messages/source_target.rs index 51da5e06..22d1cfca 100644 --- a/sable_ircd/src/messages/source_target.rs +++ b/sable_ircd/src/messages/source_target.rs @@ -103,6 +103,12 @@ impl MessageTarget for String { } } +impl MessageTarget for &str { + fn format(&self) -> String { + self.to_string() + } +} + impl MessageTarget for state::HistoricUser { fn format(&self) -> String { self.nickname.to_string() diff --git a/sable_network/src/history/local_service.rs b/sable_network/src/history/local_service.rs index e61b0f5b..f2dad556 100644 --- a/sable_network/src/history/local_service.rs +++ b/sable_network/src/history/local_service.rs @@ -39,7 +39,7 @@ impl<'a, NetworkPolicy: policy::PolicyService> LocalHistoryService<'a, NetworkPo to_ts: Option, backward_limit: usize, forward_limit: usize, - ) -> Result, HistoryError> { + ) -> Result, HistoryError> { let mut backward_entries = Vec::new(); let mut forward_entries = Vec::new(); let mut target_exists = false; @@ -105,13 +105,37 @@ impl<'a, NetworkPolicy: policy::PolicyService> LocalHistoryService<'a, NetworkPo } if target_exists { + use crate::network::update; + // "The order of returned messages within the batch is implementation-defined, but SHOULD be // ascending time order or some approximation thereof, regardless of the subcommand used." 
// -- https://ircv3.net/specs/extensions/chathistory#returned-message-notes Ok(backward_entries .into_iter() .rev() - .chain(forward_entries.into_iter())) + .chain(forward_entries.into_iter()) + .flat_map( + move |HistoryLogEntry { + id: _, + timestamp: _, + source_event: _, + details, + }| match details { + NetworkStateChange::NewMessage(update::NewMessage { + message, + source, + target: _, // assume it's the same as the argument we got + }) => Some(HistoricalEvent::Message { + id: *message, + message_type: crate::network::state::MessageType::Notice, // TODO + source: "".to_string(), // TODO + source_account: None, // TODO + target, + text: "".to_string(), // TODO + }), + _ => None, + }, + )) } else { Err(HistoryError::InvalidTarget(target)) } @@ -163,7 +187,7 @@ impl<'a, NetworkPolicy: policy::PolicyService> HistoryService user: UserId, target: TargetId, request: HistoryRequest, - ) -> Result, HistoryError> { + ) -> Result, HistoryError> { let res = match request { #[rustfmt::skip] HistoryRequest::Latest { to_ts, limit } => self.get_history_for_target( diff --git a/sable_network/src/history/remote_service.rs b/sable_network/src/history/remote_service.rs index bbbf4e04..ed9dd860 100644 --- a/sable_network/src/history/remote_service.rs +++ b/sable_network/src/history/remote_service.rs @@ -73,7 +73,7 @@ impl<'a, NetworkPolicy: policy::PolicyService> HistoryService user: UserId, target: TargetId, request: HistoryRequest, - ) -> Result, HistoryError> { + ) -> Result, HistoryError> { let res = self .node .sync_log() diff --git a/sable_network/src/history/service.rs b/sable_network/src/history/service.rs index f34d1e75..afea8bfa 100644 --- a/sable_network/src/history/service.rs +++ b/sable_network/src/history/service.rs @@ -5,8 +5,7 @@ use std::future::Future; use thiserror::Error; -use crate::history::HistoryLogEntry; -use crate::network::state::{HistoricMessageSourceId, HistoricMessageTargetId}; +use crate::network::state::{HistoricMessageSourceId, HistoricMessageTargetId, MessageType}; use crate::prelude::*; #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] @@ -99,5 +98,17 @@ pub trait HistoryService { user: UserId, target: TargetId, request: HistoryRequest, - ) -> impl Future, HistoryError>> + Send; + ) -> impl Future, HistoryError>> + Send; +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum HistoricalEvent { + Message { + id: Uuid7, + source: String, + source_account: Option, + target: TargetId, + message_type: MessageType, + text: String, + }, } diff --git a/sable_network/src/history/tiered_service.rs b/sable_network/src/history/tiered_service.rs index 4fe7ab23..859e727f 100644 --- a/sable_network/src/history/tiered_service.rs +++ b/sable_network/src/history/tiered_service.rs @@ -63,7 +63,7 @@ impl Result, HistoryError> { + ) -> Result, HistoryError> { // It's tempting to return Box here instead of collecting into a // temporary Vec, but we can't because IntoIterator::IntoIter potentially differs match (&self.fast_service, &self.slow_service) { diff --git a/sable_network/src/rpc/network_message.rs b/sable_network/src/rpc/network_message.rs index b3f4809b..c8beafd5 100644 --- a/sable_network/src/rpc/network_message.rs +++ b/sable_network/src/rpc/network_message.rs @@ -1,5 +1,5 @@ use crate::{ - history::{HistoryError, HistoryLogEntry, HistoryRequest}, + history::{HistoricalEvent, HistoryError, HistoryRequest}, id::*, network::{event::*, state::ChannelAccessSet, Network}, validated::*, @@ -181,5 +181,5 @@ pub enum 
RemoteHistoryServerResponse { /// TODO: switch to HashMap when we move away from JSON as the wire format, /// to be consistent with [`HistoryService`] TargetList(Vec<(crate::history::TargetId, i64)>), - Entries(Result, HistoryError>), + Entries(Result, HistoryError>), } From e5aa0e0d32bacfe2a894e94a35d2325d0a809ffe Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Sun, 27 Oct 2024 15:58:38 +0100 Subject: [PATCH 05/22] Finish HistoryRequest::Latest in pg_history_service --- sable_history/src/pg_history_service.rs | 71 ++++++++++++++----------- sable_network/src/id.rs | 12 +++++ 2 files changed, 53 insertions(+), 30 deletions(-) diff --git a/sable_history/src/pg_history_service.rs b/sable_history/src/pg_history_service.rs index 504428c6..e83f946f 100644 --- a/sable_history/src/pg_history_service.rs +++ b/sable_history/src/pg_history_service.rs @@ -9,8 +9,7 @@ use uuid::Uuid; use sable_network::prelude::*; -use crate::schema::channels; -use crate::schema::messages; +use crate::schema::{channels, historic_users, messages}; /// Implementation of [`HistoryService`] backed PostgreSQL pub struct PgHistoryService<'a> { @@ -100,6 +99,18 @@ impl<'a> HistoryService for PgHistoryService<'a> { return Err(HistoryError::InvalidTarget(target)); } + let base_query = messages::dsl::messages + .inner_join(historic_users::dsl::historic_users) + .select(( + messages::dsl::id, + messages::dsl::message_type, + messages::dsl::text, + historic_users::dsl::nick, + historic_users::dsl::ident, + historic_users::dsl::vhost, + historic_users::dsl::account_name, + )) + .filter(messages::dsl::target_channel.eq(db_channel_id)); match request { HistoryRequest::Latest { to_ts, limit } => { let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX)); @@ -114,8 +125,7 @@ impl<'a> HistoryService for PgHistoryService<'a> { ) .into_uuid(); Box::new( - messages::dsl::messages - .filter(messages::dsl::target_channel.eq(db_channel_id)) + base_query .filter(messages::dsl::id.lt(to_uuid)) .order(messages::dsl::id.desc()) .limit(limit) @@ -123,8 +133,7 @@ impl<'a> HistoryService for PgHistoryService<'a> { ) } None => Box::new( - messages::dsl::messages - .filter(messages::dsl::target_channel.eq(db_channel_id)) + base_query .order(messages::dsl::id.desc()) .limit(limit) .load_stream(&mut *connection_lock), @@ -133,30 +142,32 @@ impl<'a> HistoryService for PgHistoryService<'a> { .await .expect("could not query messages") .map(|row| { - let row: crate::models::Message = match row { - Ok(row) => row, - Err(e) => return Err(e), - }; - /* - let crate::models::Message { - id, - source_user, - target_channel, - text, - } = row?; - } - Ok(HistoryLogEntry { - id: (), - details: NetworkStateChange::NewMessage(update::NewMessage { - message: (), - source: (), - target: (), - }), - source_event: (), - timestamp: (), - }) - */ - Ok((|| -> HistoricalEvent { todo!() })()) + row.map( + |( + id, + message_type, + text, + source_nick, + source_ident, + source_vhost, + source_account, + ): ( + uuid::Uuid, + crate::types::MessageType, + String, + String, + String, + String, + _, + )| HistoricalEvent::Message { + id: id.try_into().expect("Message id is a non-v7 UUID"), + source: format!("{}!{}@{}", source_nick, source_ident, source_vhost), + source_account, + message_type: message_type.into(), + target, // assume it's the same + text, + }, + ) }) .try_collect::>() .await diff --git a/sable_network/src/id.rs b/sable_network/src/id.rs index 2ed2de2b..a91e5861 100644 --- a/sable_network/src/id.rs +++ b/sable_network/src/id.rs @@ -25,6 +25,18 @@ impl 
Uuid7 { } } +impl TryFrom for Uuid7 { + type Error = (); + + fn try_from(uuid: Uuid) -> Result { + if uuid.get_version() == Some(uuid::Version::SortRand) { + Ok(Self(uuid)) + } else { + Err(()) + } + } +} + #[derive(Debug, Error)] #[error("Mismatched object ID type for event")] pub struct WrongIdTypeError; From 682bbf78a7ac96a709e9e0097ddc06dc622f1aae Mon Sep 17 00:00:00 2001 From: Stephen Bennett Date: Sun, 27 Oct 2024 14:46:50 +0000 Subject: [PATCH 06/22] Partial fill-in-the-blanks for message translation --- sable_network/src/history/local_service.rs | 47 ++++++++++++---------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/sable_network/src/history/local_service.rs b/sable_network/src/history/local_service.rs index f2dad556..2743890d 100644 --- a/sable_network/src/history/local_service.rs +++ b/sable_network/src/history/local_service.rs @@ -47,6 +47,7 @@ impl<'a, NetworkPolicy: policy::PolicyService> LocalHistoryService<'a, NetworkPo // Keep the lock on the NetworkHistoryLog between the backward and the forward // search to make sure both have a consistent state let log = self.node.history(); + let net = self.node.network(); if backward_limit != 0 { let from_ts = if forward_limit == 0 { @@ -114,32 +115,34 @@ impl<'a, NetworkPolicy: policy::PolicyService> LocalHistoryService<'a, NetworkPo .into_iter() .rev() .chain(forward_entries.into_iter()) - .flat_map( - move |HistoryLogEntry { - id: _, - timestamp: _, - source_event: _, - details, - }| match details { - NetworkStateChange::NewMessage(update::NewMessage { - message, - source, - target: _, // assume it's the same as the argument we got - }) => Some(HistoricalEvent::Message { - id: *message, - message_type: crate::network::state::MessageType::Notice, // TODO - source: "".to_string(), // TODO - source_account: None, // TODO - target, - text: "".to_string(), // TODO - }), - _ => None, - }, - )) + .flat_map(move |entry| Self::translate_log_entry(entry, &net))) } else { Err(HistoryError::InvalidTarget(target)) } } + + fn translate_log_entry(entry: HistoryLogEntry, net: &Network) -> Option { + match entry.details { + NetworkStateChange::NewMessage(update::NewMessage { + message, + source, + target, + }) => { + let message = net.message(message).ok()?; + let source = message.source().ok()?; + + Some(HistoricalEvent::Message { + id: *message.id(), + message_type: message.message_type(), + source: source.nuh(), + source_account: source.account_name().map(|n| n.to_string()), + target: todo!(), + text: message.text().to_string(), + }) + } + _ => None, + } + } } impl<'a, NetworkPolicy: policy::PolicyService> HistoryService From a8874b6529427d0de9ba9308a6dd433d86f9f355 Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Sun, 27 Oct 2024 16:02:19 +0100 Subject: [PATCH 07/22] Add missing files --- .../down.sql | 4 ++ .../up.sql | 4 ++ sable_history/src/types.rs | 54 +++++++++++++++++++ 3 files changed, 62 insertions(+) create mode 100644 sable_history/migrations/2024-10-27-125826_reproducible_messages/down.sql create mode 100644 sable_history/migrations/2024-10-27-125826_reproducible_messages/up.sql create mode 100644 sable_history/src/types.rs diff --git a/sable_history/migrations/2024-10-27-125826_reproducible_messages/down.sql b/sable_history/migrations/2024-10-27-125826_reproducible_messages/down.sql new file mode 100644 index 00000000..51aa81a7 --- /dev/null +++ b/sable_history/migrations/2024-10-27-125826_reproducible_messages/down.sql @@ -0,0 +1,4 @@ +ALTER TABLE + DROP COLUMN message_type MessageType NOT NULL; + +DROP TYPE 
"MessageType"; diff --git a/sable_history/migrations/2024-10-27-125826_reproducible_messages/up.sql b/sable_history/migrations/2024-10-27-125826_reproducible_messages/up.sql new file mode 100644 index 00000000..0aba669c --- /dev/null +++ b/sable_history/migrations/2024-10-27-125826_reproducible_messages/up.sql @@ -0,0 +1,4 @@ +CREATE TYPE "Message_Type" AS ENUM ('privmsg', 'notice'); + +ALTER TABLE messages + ADD COLUMN message_type "Message_Type" NOT NULL; diff --git a/sable_history/src/types.rs b/sable_history/src/types.rs new file mode 100644 index 00000000..c941720a --- /dev/null +++ b/sable_history/src/types.rs @@ -0,0 +1,54 @@ +use std::io::Write; + +use diesel::pg::{Pg, PgValue}; +use diesel::{deserialize, serialize}; +use diesel::{AsExpression, FromSqlRow}; + +use crate::schema::sql_types::MessageType as SqlMessageType; + +#[derive(Debug, PartialEq, FromSqlRow, AsExpression, Eq)] +#[diesel(sql_type = SqlMessageType)] +pub enum MessageType { + Privmsg, + Notice, +} + +impl serialize::ToSql for MessageType { + fn to_sql<'b>(&'b self, out: &mut serialize::Output<'b, '_, Pg>) -> serialize::Result { + match *self { + MessageType::Privmsg => out.write_all(b"privmsg")?, + MessageType::Notice => out.write_all(b"notice")?, + } + Ok(serialize::IsNull::No) + } +} + +impl deserialize::FromSql for MessageType { + fn from_sql(bytes: PgValue<'_>) -> deserialize::Result { + match bytes.as_bytes() { + b"privmsg" => Ok(MessageType::Privmsg), + b"notice" => Ok(MessageType::Notice), + _ => Err("Unrecognized enum variant for MessageType".into()), + } + } +} + +impl From for MessageType { + fn from(value: sable_network::network::state::MessageType) -> Self { + use sable_network::network::state::MessageType::*; + match value { + Privmsg => MessageType::Privmsg, + Notice => MessageType::Notice, + } + } +} + +impl From for sable_network::network::state::MessageType { + fn from(value: MessageType) -> Self { + use sable_network::network::state::MessageType::*; + match value { + MessageType::Privmsg => Privmsg, + MessageType::Notice => Notice, + } + } +} From f0ac38dee1ade1dbf9412c5ad4ae1434388f9454 Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Sun, 27 Oct 2024 16:12:18 +0100 Subject: [PATCH 08/22] Make target a String in case it expired --- sable_history/src/pg_history_service.rs | 9 ++++----- sable_network/src/history/local_service.rs | 9 ++++----- sable_network/src/history/service.rs | 2 +- sable_network/src/network/wrapper/message.rs | 9 +++++++++ 4 files changed, 18 insertions(+), 11 deletions(-) diff --git a/sable_history/src/pg_history_service.rs b/sable_history/src/pg_history_service.rs index e83f946f..42185f95 100644 --- a/sable_history/src/pg_history_service.rs +++ b/sable_history/src/pg_history_service.rs @@ -87,17 +87,16 @@ impl<'a> HistoryService for PgHistoryService<'a> { let mut connection_lock = self.database_connection.lock().await; let db_channel_id = i64::try_from(channel_id.as_u64()).expect("channel id overflows u64"); - if channels::dsl::channels + let Some(channel) = channels::dsl::channels .find(db_channel_id) .select(crate::models::Channel::as_select()) .first(&mut *connection_lock) .await .optional() .expect("Could not check if channel exists") - .is_none() - { + else { return Err(HistoryError::InvalidTarget(target)); - } + }; let base_query = messages::dsl::messages .inner_join(historic_users::dsl::historic_users) @@ -164,7 +163,7 @@ impl<'a> HistoryService for PgHistoryService<'a> { source: format!("{}!{}@{}", source_nick, source_ident, source_vhost), source_account, 
message_type: message_type.into(), - target, // assume it's the same + target: channel.name.clone(), // assume it's the same text, }, ) diff --git a/sable_network/src/history/local_service.rs b/sable_network/src/history/local_service.rs index 2743890d..7dd5fd82 100644 --- a/sable_network/src/history/local_service.rs +++ b/sable_network/src/history/local_service.rs @@ -106,8 +106,6 @@ impl<'a, NetworkPolicy: policy::PolicyService> LocalHistoryService<'a, NetworkPo } if target_exists { - use crate::network::update; - // "The order of returned messages within the batch is implementation-defined, but SHOULD be // ascending time order or some approximation thereof, regardless of the subcommand used." // -- https://ircv3.net/specs/extensions/chathistory#returned-message-notes @@ -125,18 +123,19 @@ impl<'a, NetworkPolicy: policy::PolicyService> LocalHistoryService<'a, NetworkPo match entry.details { NetworkStateChange::NewMessage(update::NewMessage { message, - source, - target, + source: _, + target: _, }) => { let message = net.message(message).ok()?; let source = message.source().ok()?; + let target = message.target().ok()?; Some(HistoricalEvent::Message { id: *message.id(), message_type: message.message_type(), source: source.nuh(), source_account: source.account_name().map(|n| n.to_string()), - target: todo!(), + target: target.to_string(), text: message.text().to_string(), }) } diff --git a/sable_network/src/history/service.rs b/sable_network/src/history/service.rs index afea8bfa..f9fa12a1 100644 --- a/sable_network/src/history/service.rs +++ b/sable_network/src/history/service.rs @@ -107,7 +107,7 @@ pub enum HistoricalEvent { id: Uuid7, source: String, source_account: Option, - target: TargetId, + target: String, message_type: MessageType, text: String, }, diff --git a/sable_network/src/network/wrapper/message.rs b/sable_network/src/network/wrapper/message.rs index 8922187c..411bd439 100644 --- a/sable_network/src/network/wrapper/message.rs +++ b/sable_network/src/network/wrapper/message.rs @@ -56,6 +56,15 @@ impl MessageTarget<'_> { } } +impl ToString for MessageTarget<'_> { + fn to_string(&self) -> String { + match self { + Self::User(u) => u.nuh(), + Self::Channel(c) => c.name().to_string(), + } + } +} + /// A wrapper around a [`state::Message`] pub struct Message<'a> { network: &'a Network, From 9b88fca4d62828605ee6ebb9aaad23bd7e93941c Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Sun, 27 Oct 2024 16:14:54 +0100 Subject: [PATCH 09/22] Make id a MessageId --- sable_history/src/pg_history_service.rs | 2 +- sable_network/src/history/local_service.rs | 2 +- sable_network/src/history/service.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sable_history/src/pg_history_service.rs b/sable_history/src/pg_history_service.rs index 42185f95..cc9d5b34 100644 --- a/sable_history/src/pg_history_service.rs +++ b/sable_history/src/pg_history_service.rs @@ -159,7 +159,7 @@ impl<'a> HistoryService for PgHistoryService<'a> { String, _, )| HistoricalEvent::Message { - id: id.try_into().expect("Message id is a non-v7 UUID"), + id: MessageId::new(id.try_into().expect("Message id is a non-v7 UUID")), source: format!("{}!{}@{}", source_nick, source_ident, source_vhost), source_account, message_type: message_type.into(), diff --git a/sable_network/src/history/local_service.rs b/sable_network/src/history/local_service.rs index 7dd5fd82..0f45ac25 100644 --- a/sable_network/src/history/local_service.rs +++ b/sable_network/src/history/local_service.rs @@ -131,7 +131,7 @@ impl<'a, 
NetworkPolicy: policy::PolicyService> LocalHistoryService<'a, NetworkPo let target = message.target().ok()?; Some(HistoricalEvent::Message { - id: *message.id(), + id: message.id(), message_type: message.message_type(), source: source.nuh(), source_account: source.account_name().map(|n| n.to_string()), diff --git a/sable_network/src/history/service.rs b/sable_network/src/history/service.rs index f9fa12a1..b70f7265 100644 --- a/sable_network/src/history/service.rs +++ b/sable_network/src/history/service.rs @@ -104,7 +104,7 @@ pub trait HistoryService { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum HistoricalEvent { Message { - id: Uuid7, + id: MessageId, source: String, source_account: Option, target: String, From 96661e9cdbf9e7faf143f0a989eff9c6f8e17fe6 Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Sun, 27 Oct 2024 17:39:16 +0100 Subject: [PATCH 10/22] sable_history: Add support for running migrations on startup --- Cargo.lock | 88 ++++++++++++++++++++++++++++++++- sable_history/Cargo.toml | 4 +- sable_history/build.rs | 3 ++ sable_history/src/server/mod.rs | 49 ++++++++++++++++-- 4 files changed, 137 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9fa4a232..a874521c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -441,7 +441,7 @@ checksum = "031718ddb8f78aa5def78a09e90defe30151d1f6c672f937af4dd916429ed996" dependencies = [ "semver", "serde", - "toml", + "toml 0.5.11", "url", ] @@ -907,6 +907,17 @@ dependencies = [ "syn 2.0.79", ] +[[package]] +name = "diesel_migrations" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a73ce704bad4231f001bff3314d91dce4aba0770cee8b233991859abc15c1f6" +dependencies = [ + "diesel", + "migrations_internals", + "migrations_macros", +] + [[package]] name = "diesel_table_macro_syntax" version = "0.2.0" @@ -1705,6 +1716,27 @@ dependencies = [ "autocfg", ] +[[package]] +name = "migrations_internals" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd01039851e82f8799046eabbb354056283fb265c8ec0996af940f4e85a380ff" +dependencies = [ + "serde", + "toml 0.8.19", +] + +[[package]] +name = "migrations_macros" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffb161cc72176cb37aa47f1fc520d3ef02263d67d661f44f05d05a079e1237fd" +dependencies = [ + "migrations_internals", + "proc-macro2", + "quote", +] + [[package]] name = "mime" version = "0.3.17" @@ -2379,7 +2411,9 @@ dependencies = [ "clap 4.5.20", "diesel", "diesel-async", + "diesel_migrations", "futures", + "itertools", "parking_lot 0.12.3", "sable_network", "sable_server", @@ -2618,6 +2652,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_spanned" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87607cb1398ed59d48732e575a4c28a7a8ebf2454b964fe3f224f2afc07909e1" +dependencies = [ + "serde", +] + [[package]] name = "serde_with" version = "1.14.0" @@ -3143,6 +3186,40 @@ dependencies = [ "serde", ] +[[package]] +name = "toml" +version = "0.8.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1ed1f98e3fdc28d6d910e6737ae6ab1a93bf1985935a1193e68f93eeb68d24e" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime", + "toml_edit", +] + +[[package]] +name = "toml_datetime" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0dd7358ecb8fc2f8d014bf86f6f638ce72ba252a2c3a2572f2a795f1d23efb41" 
+dependencies = [ + "serde", +] + +[[package]] +name = "toml_edit" +version = "0.22.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ae48d6208a266e853d946088ed816055e556cc6028c5e8e2b84d9fa5dd7c7f5" +dependencies = [ + "indexmap 2.6.0", + "serde", + "serde_spanned", + "toml_datetime", + "winnow", +] + [[package]] name = "tonic" version = "0.9.2" @@ -3671,6 +3748,15 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "winnow" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36c1fec1a2bb5866f07c25f68c26e565c4c200aebb96d7e55710c19d3e8ac49b" +dependencies = [ + "memchr", +] + [[package]] name = "winreg" version = "0.6.2" diff --git a/sable_history/Cargo.toml b/sable_history/Cargo.toml index 34d29732..c65abbd5 100644 --- a/sable_history/Cargo.toml +++ b/sable_history/Cargo.toml @@ -23,7 +23,9 @@ tracing = "0.1" anyhow = "1.0" clap = { version = "4.5", features = [ "derive" ] } chrono = "0.4" +itertools = "0.10" uuid = { version = "1.9.1", features = ["v7", "fast-rng", "serde"] } diesel = { version = "2.2", features = [ "postgres", "chrono", "uuid" ] } -diesel-async = { version = "0.5", features = [ "postgres" ] } +diesel-async = { version = "0.5", features = [ "postgres", "tokio", "async-connection-wrapper" ] } +diesel_migrations = "2.2.0" diff --git a/sable_history/build.rs b/sable_history/build.rs index d8f91cb9..ea6041ba 100644 --- a/sable_history/build.rs +++ b/sable_history/build.rs @@ -1,3 +1,6 @@ fn main() { + // https://docs.rs/diesel_migrations/2.2.0/diesel_migrations/macro.embed_migrations.html#automatic-rebuilds + println!("cargo:rerun-if-changed=migrations/"); + built::write_built_file().expect("Failed to acquire build-time information"); } diff --git a/sable_history/src/server/mod.rs b/sable_history/src/server/mod.rs index e9eb11c7..0e039a8d 100644 --- a/sable_history/src/server/mod.rs +++ b/sable_history/src/server/mod.rs @@ -1,22 +1,29 @@ use std::convert::Infallible; +use std::sync::Arc; use anyhow::Context; -use sable_network::prelude::*; -use sable_server::ServerType; +use diesel::migration::MigrationSource; +use diesel::prelude::*; +use diesel_async::async_connection_wrapper::AsyncConnectionWrapper; +use diesel_async::{AsyncConnection, AsyncPgConnection}; +use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; +use itertools::Itertools; use serde::Deserialize; use tokio::sync::{mpsc::UnboundedReceiver, Mutex}; use tracing::instrument; -use std::sync::Arc; - -use diesel_async::{AsyncConnection, AsyncPgConnection}; +use sable_network::prelude::*; +use sable_server::ServerType; mod sync; mod update_handler; +pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations"); + #[derive(Debug, Clone, Deserialize)] pub struct HistoryServerConfig { pub database: String, + pub auto_run_migrations: bool, } pub struct HistoryServer { @@ -46,6 +53,38 @@ impl ServerType for HistoryServer { sable_network::rpc::NetworkHistoryUpdate, >, ) -> anyhow::Result { + let database = config.database.clone(); + if config.auto_run_migrations { + tokio::task::spawn_blocking(move || -> anyhow::Result<()> { + // run_pending_migrations only support sync connections + let mut conn = AsyncConnectionWrapper::::establish(&database) + .context("Couldn't connect to database")?; + tracing::info!("Running database migrations"); + tracing::trace!( + "Required 
migrations: {}", + MIGRATIONS + .migrations() + .map_err(|e| anyhow::anyhow!("Couldn't get migrations: {e}"))? + .iter() + .map(diesel::migration::Migration::::name) + .join(", ") + ); + let migrations = conn + .run_pending_migrations(MIGRATIONS) + .map_err(|e| anyhow::anyhow!("Database migrations failed: {e}"))?; + if migrations.is_empty() { + tracing::info!("No database migrations to run"); + } else { + tracing::info!( + "Applied database migrations: {}", + migrations.iter().map(ToString::to_string).join(", ") + ) + } + Ok(()) + }) + .await + .context("Couldn't join migration task")??; + } Ok(Self { node, history_receiver: Mutex::new(history_receiver), From 984de4d0945eb32861250110875b0922d78ab261 Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Sun, 27 Oct 2024 17:39:35 +0100 Subject: [PATCH 11/22] pg_history_service: Fix LATEST logic --- sable_history/src/pg_history_service.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/sable_history/src/pg_history_service.rs b/sable_history/src/pg_history_service.rs index cc9d5b34..ee57dac3 100644 --- a/sable_history/src/pg_history_service.rs +++ b/sable_history/src/pg_history_service.rs @@ -115,17 +115,18 @@ impl<'a> HistoryService for PgHistoryService<'a> { let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX)); Ok(match to_ts { Some(to_ts) => { - // Lowest UUIDv7 corresponding to the timestamp + // Highest UUIDv7 corresponding to the timestamp let to_uuid = uuid::Builder::from_unix_timestamp_millis( u64::try_from(to_ts) .unwrap_or(u64::MIN) // floor timestamps to Epoch - .saturating_mul(1000), - &[u8::MIN; 10], + .saturating_mul(1000) + .saturating_add(999), + &[u8::MAX; 10], ) .into_uuid(); Box::new( base_query - .filter(messages::dsl::id.lt(to_uuid)) + .filter(messages::dsl::id.gt(to_uuid)) .order(messages::dsl::id.desc()) .limit(limit) .load_stream(&mut *connection_lock), @@ -170,7 +171,10 @@ impl<'a> HistoryService for PgHistoryService<'a> { }) .try_collect::>() .await - .expect("could not parse all records")) + .expect("could not parse all records") + .into_iter() + .rev() // need to reverse *after* applying the SQL LIMIT + .collect::>()) } HistoryRequest::Before { from_ts, limit } => { todo!("before") From d56db83b2b385dc211a590685b3e1263016f9a91 Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Sun, 27 Oct 2024 21:17:11 +0100 Subject: [PATCH 12/22] Make sure server-time is consistent between echoes and CHATHISTORY --- .../up.sql | 6 ++++- sable_history/src/models/message.rs | 9 +++++++ sable_history/src/pg_history_service.rs | 25 ++++++++++--------- sable_history/src/schema.rs | 1 + sable_history/src/server/update_handler.rs | 22 ++++++++++++---- .../src/command/handlers/chathistory.rs | 5 +--- sable_network/src/history/local_service.rs | 1 + sable_network/src/history/service.rs | 1 + 8 files changed, 48 insertions(+), 22 deletions(-) diff --git a/sable_history/migrations/2024-10-27-125826_reproducible_messages/up.sql b/sable_history/migrations/2024-10-27-125826_reproducible_messages/up.sql index 0aba669c..990a57c3 100644 --- a/sable_history/migrations/2024-10-27-125826_reproducible_messages/up.sql +++ b/sable_history/migrations/2024-10-27-125826_reproducible_messages/up.sql @@ -1,4 +1,8 @@ CREATE TYPE "Message_Type" AS ENUM ('privmsg', 'notice'); ALTER TABLE messages - ADD COLUMN message_type "Message_Type" NOT NULL; + ADD COLUMN message_type "Message_Type" NOT NULL, + ADD COLUMN timestamp TIMESTAMP NOT NULL; + +CREATE INDEX messages_by_timestamp ON messages USING BRIN (timestamp, id); 
+COMMENT ON INDEX messages_by_timestamp IS 'Includes the id in order to be a consistent total order across requests';
diff --git a/sable_history/src/models/message.rs b/sable_history/src/models/message.rs
index aa6b3fc2..d1f2d01f 100644
--- a/sable_history/src/models/message.rs
+++ b/sable_history/src/models/message.rs
@@ -5,10 +5,19 @@ use super::*;
 #[diesel(check_for_backend(diesel::pg::Pg))]
 #[diesel(belongs_to(Channel, foreign_key = target_channel))]
 #[diesel(belongs_to(HistoricUser, foreign_key = source_user))]
+#[derive(Debug)]
 pub struct Message {
     pub id: Uuid,
     pub source_user: i32,
     pub target_channel: i64,
     pub text: String,
     pub message_type: crate::types::MessageType,
+    /// Timestamp of the *update* introducing the message.
+    ///
+    /// This is usually the same second as the one in [`id`] (a UUIDv7), but is
+    /// occasionally 1 second later, because the message id is created before being
+    /// pushed to the log.
+    /// It can also be significantly different, because both are based on the
+    /// system clock, which can change arbitrarily.
+    pub timestamp: chrono::NaiveDateTime,
 }
diff --git a/sable_history/src/pg_history_service.rs b/sable_history/src/pg_history_service.rs
index ee57dac3..cf60410b 100644
--- a/sable_history/src/pg_history_service.rs
+++ b/sable_history/src/pg_history_service.rs
@@ -1,5 +1,6 @@
 use std::collections::HashMap;
 
+use chrono::{DateTime, NaiveDateTime, Utc};
 use diesel::dsl::sql;
 use diesel::prelude::*;
 use diesel_async::{AsyncPgConnection, RunQueryDsl};
@@ -102,6 +103,7 @@ impl<'a> HistoryService for PgHistoryService<'a> {
             .inner_join(historic_users::dsl::historic_users)
             .select((
                 messages::dsl::id,
+                messages::dsl::timestamp,
                 messages::dsl::message_type,
                 messages::dsl::text,
                 historic_users::dsl::nick,
@@ -115,26 +117,22 @@ impl<'a> HistoryService for PgHistoryService<'a> {
                 let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX));
                 Ok(match to_ts {
                     Some(to_ts) => {
-                        // Highest UUIDv7 corresponding to the timestamp
-                        let to_uuid = uuid::Builder::from_unix_timestamp_millis(
-                            u64::try_from(to_ts)
-                                .unwrap_or(u64::MIN) // floor timestamps to Epoch
-                                .saturating_mul(1000)
-                                .saturating_add(999),
-                            &[u8::MAX; 10],
-                        )
-                        .into_uuid();
+                        let to_ts = DateTime::from_timestamp(to_ts, 999_999)
+                            .unwrap_or(DateTime::<Utc>::MIN_UTC)
+                            .naive_utc();
                         Box::new(
                             base_query
-                                .filter(messages::dsl::id.gt(to_uuid))
-                                .order(messages::dsl::id.desc())
+                                .filter(messages::dsl::timestamp.gt(to_ts))
+                                // total order, consistent across requests
+                                .order((messages::dsl::timestamp.desc(), messages::dsl::id.desc()))
                                 .limit(limit)
                                 .load_stream(&mut *connection_lock),
                         )
                     }
                     None => Box::new(
                         base_query
-                            .order(messages::dsl::id.desc())
+                            // total order, consistent across requests
+                            .order((messages::dsl::timestamp.desc(), messages::dsl::id.desc()))
                            .limit(limit)
                            .load_stream(&mut *connection_lock),
                    ),
@@ -145,6 +143,7 @@ impl<'a> HistoryService for PgHistoryService<'a> {
                 row.map(
                     |(
                         id,
+                        timestamp,
                         message_type,
                         text,
                         source_nick,
@@ -153,6 +152,7 @@ impl<'a> HistoryService for PgHistoryService<'a> {
                         source_account,
                     ): (
                         uuid::Uuid,
+                        NaiveDateTime,
                         crate::types::MessageType,
                         String,
                         String,
@@ -161,6 +161,7 @@ impl<'a> HistoryService for PgHistoryService<'a> {
                     )| HistoricalEvent::Message {
                         id: MessageId::new(id.try_into().expect("Message id is a non-v7 UUID")),
+                        timestamp: timestamp.and_utc().timestamp(),
                         source: format!("{}!{}@{}", source_nick, source_ident, source_vhost),
                         source_account,
                         message_type: message_type.into(),
diff --git a/sable_history/src/schema.rs
b/sable_history/src/schema.rs index 4346acf7..1f34b6ce 100644 --- a/sable_history/src/schema.rs +++ b/sable_history/src/schema.rs @@ -36,6 +36,7 @@ diesel::table! { target_channel -> Int8, text -> Varchar, message_type -> MessageType, + timestamp -> Timestamp, } } diff --git a/sable_history/src/server/update_handler.rs b/sable_history/src/server/update_handler.rs index 05566342..f7f7dd44 100644 --- a/sable_history/src/server/update_handler.rs +++ b/sable_history/src/server/update_handler.rs @@ -1,3 +1,6 @@ +use chrono::DateTime; +use diesel_async::RunQueryDsl; + use super::*; use crate::models::HistoricUser; @@ -5,13 +8,13 @@ use rpc::NetworkHistoryUpdate; use state::HistoricMessageSourceId; use wrapper::HistoricMessageTarget; -use diesel::prelude::*; -use diesel_async::RunQueryDsl; - impl HistoryServer { pub async fn handle_history_update(&self, update: NetworkHistoryUpdate) -> anyhow::Result<()> { + let update_timestamp = update.timestamp(); match update.change { - NetworkStateChange::NewMessage(detail) => self.handle_new_message(detail).await, + NetworkStateChange::NewMessage(detail) => { + self.handle_new_message(detail, update_timestamp).await + } NetworkStateChange::NewUser(_) | NetworkStateChange::UserNickChange(_) @@ -132,7 +135,11 @@ impl HistoryServer { } } - async fn handle_new_message(&self, new_message: update::NewMessage) -> anyhow::Result<()> { + async fn handle_new_message( + &self, + new_message: update::NewMessage, + update_timestamp: i64, + ) -> anyhow::Result<()> { use crate::schema::messages::dsl::*; let net = self.node.network(); @@ -154,6 +161,9 @@ impl HistoryServer { let db_message = crate::models::Message { id: **net_message.id(), + timestamp: DateTime::from_timestamp(update_timestamp, 0) + .context("Timestamp overflowed")? 
+ .naive_utc(), // may differ from the message's timestamp source_user: db_source.id, target_channel: db_channel.id, message_type: net_message.message_type().into(), @@ -166,6 +176,8 @@ impl HistoryServer { .execute(&mut *connection_lock) .await?; + tracing::trace!("Persisted message: {db_message:?}"); + Ok(()) } } diff --git a/sable_ircd/src/command/handlers/chathistory.rs b/sable_ircd/src/command/handlers/chathistory.rs index cf8a4fa3..315539e8 100644 --- a/sable_ircd/src/command/handlers/chathistory.rs +++ b/sable_ircd/src/command/handlers/chathistory.rs @@ -212,16 +212,13 @@ fn send_history_entries<'a>( match entry { HistoricalEvent::Message { id, + timestamp, source, source_account, target: _, // assume it's the same as the one we got as parameter message_type, text, } => { - let (timestamp, _) = id - .get_timestamp() - .expect("message has non-v7 UUID") - .to_unix(); let msg = message::Message::new(&source, &target, message_type, &text) .with_tag(server_time::server_time_tag( i64::try_from(timestamp).unwrap_or(i64::MAX), diff --git a/sable_network/src/history/local_service.rs b/sable_network/src/history/local_service.rs index 0f45ac25..a769e866 100644 --- a/sable_network/src/history/local_service.rs +++ b/sable_network/src/history/local_service.rs @@ -132,6 +132,7 @@ impl<'a, NetworkPolicy: policy::PolicyService> LocalHistoryService<'a, NetworkPo Some(HistoricalEvent::Message { id: message.id(), + timestamp: entry.timestamp(), // update's timestamp, may differ from the message's timestamp message_type: message.message_type(), source: source.nuh(), source_account: source.account_name().map(|n| n.to_string()), diff --git a/sable_network/src/history/service.rs b/sable_network/src/history/service.rs index b70f7265..66d3e47a 100644 --- a/sable_network/src/history/service.rs +++ b/sable_network/src/history/service.rs @@ -105,6 +105,7 @@ pub trait HistoryService { pub enum HistoricalEvent { Message { id: MessageId, + timestamp: i64, source: String, source_account: Option, target: String, From 71ae9c187c4efae831097d2d1807fabff2eb7082 Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Sun, 27 Oct 2024 21:25:15 +0100 Subject: [PATCH 13/22] Update down.sql --- .../2024-10-27-125826_reproducible_messages/down.sql | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sable_history/migrations/2024-10-27-125826_reproducible_messages/down.sql b/sable_history/migrations/2024-10-27-125826_reproducible_messages/down.sql index 51aa81a7..12136b7f 100644 --- a/sable_history/migrations/2024-10-27-125826_reproducible_messages/down.sql +++ b/sable_history/migrations/2024-10-27-125826_reproducible_messages/down.sql @@ -1,4 +1,7 @@ +DROP INDEX messages_by_timestamp; + ALTER TABLE - DROP COLUMN message_type MessageType NOT NULL; + DROP COLUMN message_type, + DROP COLUMN timestamp; DROP TYPE "MessageType"; From b87c630da48d8a5daf93ec378a555aa64bf2e66a Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Mon, 28 Oct 2024 19:14:01 +0100 Subject: [PATCH 14/22] Split make_historical_event out of get_entries --- sable_history/src/pg_history_service.rs | 56 +++++++++++-------------- 1 file changed, 25 insertions(+), 31 deletions(-) diff --git a/sable_history/src/pg_history_service.rs b/sable_history/src/pg_history_service.rs index cf60410b..b2f39171 100644 --- a/sable_history/src/pg_history_service.rs +++ b/sable_history/src/pg_history_service.rs @@ -139,37 +139,7 @@ impl<'a> HistoryService for PgHistoryService<'a> { } .await .expect("could not query messages") - .map(|row| { - row.map( - |( - id, - 
timestamp, - message_type, - text, - source_nick, - source_ident, - source_vhost, - source_account, - ): ( - uuid::Uuid, - NaiveDateTime, - crate::types::MessageType, - String, - String, - String, - String, - _, - )| HistoricalEvent::Message { - id: MessageId::new(id.try_into().expect("Message id is a non-v7 UUID")), - timestamp: timestamp.and_utc().timestamp(), - source: format!("{}!{}@{}", source_nick, source_ident, source_vhost), - source_account, - message_type: message_type.into(), - target: channel.name.clone(), // assume it's the same - text, - }, - ) - }) + .map_ok(|row| make_historical_event(&channel, row)) .try_collect::>() .await .expect("could not parse all records") @@ -196,3 +166,27 @@ impl<'a> HistoryService for PgHistoryService<'a> { } } } + +fn make_historical_event( + channel: &crate::models::Channel, + (id, timestamp, message_type, text, source_nick, source_ident, source_vhost, source_account): ( + uuid::Uuid, + NaiveDateTime, + crate::types::MessageType, + String, + String, + String, + String, + Option, + ), +) -> HistoricalEvent { + HistoricalEvent::Message { + id: MessageId::new(id.try_into().expect("Message id is a non-v7 UUID")), + timestamp: timestamp.and_utc().timestamp(), + source: format!("{}!{}@{}", source_nick, source_ident, source_vhost), + source_account, + message_type: message_type.into(), + target: channel.name.clone(), // assume it's the same + text, + } +} From 82d4546c6af56a4b7e9a51f28353ed05ada82966 Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Mon, 28 Oct 2024 19:36:06 +0100 Subject: [PATCH 15/22] Implement BEFORE/AFTER/BETWEEN --- sable_history/src/pg_history_service.rs | 100 ++++++++++++++++++++++-- 1 file changed, 93 insertions(+), 7 deletions(-) diff --git a/sable_history/src/pg_history_service.rs b/sable_history/src/pg_history_service.rs index b2f39171..009f8430 100644 --- a/sable_history/src/pg_history_service.rs +++ b/sable_history/src/pg_history_service.rs @@ -112,10 +112,10 @@ impl<'a> HistoryService for PgHistoryService<'a> { historic_users::dsl::account_name, )) .filter(messages::dsl::target_channel.eq(db_channel_id)); - match request { + Ok(match request { HistoryRequest::Latest { to_ts, limit } => { let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX)); - Ok(match to_ts { + match to_ts { Some(to_ts) => { let to_ts = DateTime::from_timestamp(to_ts, 999_999) .unwrap_or(DateTime::::MIN_UTC) @@ -145,13 +145,50 @@ impl<'a> HistoryService for PgHistoryService<'a> { .expect("could not parse all records") .into_iter() .rev() // need to reverse *after* applying the SQL LIMIT - .collect::>()) + .collect::>() } HistoryRequest::Before { from_ts, limit } => { - todo!("before") + let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX)); + let from_ts = DateTime::from_timestamp(from_ts, 0) + .unwrap_or(DateTime::::MAX_UTC) + .naive_utc(); + Box::new( + base_query + .filter(messages::dsl::timestamp.lt(from_ts)) + // total order, consistent across requests + .order((messages::dsl::timestamp.desc(), messages::dsl::id.desc())) + .limit(limit) + .load_stream(&mut *connection_lock), + ) + .await + .expect("could not query messages") + .map_ok(|row| make_historical_event(&channel, row)) + .try_collect::>() + .await + .expect("could not parse all records") + .into_iter() + .rev() // need to reverse *after* applying the SQL LIMIT + .collect::>() } HistoryRequest::After { start_ts, limit } => { - todo!("after") + let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX)); + let start_ts = 
DateTime::from_timestamp(start_ts, 999_999) + .unwrap_or(DateTime::::MIN_UTC) + .naive_utc(); + Box::new( + base_query + .filter(messages::dsl::timestamp.gt(start_ts)) + // total order, consistent across requests + .order((messages::dsl::timestamp, messages::dsl::id)) + .limit(limit) + .load_stream(&mut *connection_lock), + ) + .await + .expect("could not query messages") + .map_ok(|row| make_historical_event(&channel, row)) + .try_collect::>() + .await + .expect("could not parse all records") } HistoryRequest::Around { around_ts, limit } => { todo!("around") @@ -161,9 +198,58 @@ impl<'a> HistoryService for PgHistoryService<'a> { end_ts, limit, } => { - todo!("between") + if start_ts <= end_ts { + let start_ts = DateTime::from_timestamp(start_ts, 999_999) + .unwrap_or(DateTime::::MIN_UTC) + .naive_utc(); + let end_ts = DateTime::from_timestamp(end_ts, 0) + .unwrap_or(DateTime::::MAX_UTC) + .naive_utc(); + let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX)); + Box::new( + base_query + .filter(messages::dsl::timestamp.gt(start_ts)) + .filter(messages::dsl::timestamp.lt(end_ts)) + // total order, consistent across requests + .order((messages::dsl::timestamp, messages::dsl::id)) + .limit(limit) + .load_stream(&mut *connection_lock), + ) + .await + .expect("could not query messages") + .map_ok(|row| make_historical_event(&channel, row)) + .try_collect::>() + .await + .expect("could not parse all records") + } else { + let start_ts = DateTime::from_timestamp(start_ts, 0) + .unwrap_or(DateTime::::MAX_UTC) + .naive_utc(); + let end_ts = DateTime::from_timestamp(end_ts, 999_999) + .unwrap_or(DateTime::::MIN_UTC) + .naive_utc(); + let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX)); + Box::new( + base_query + .filter(messages::dsl::timestamp.gt(end_ts)) + .filter(messages::dsl::timestamp.lt(start_ts)) + // total order, consistent across requests + .order((messages::dsl::timestamp.desc(), messages::dsl::id.desc())) + .limit(limit) + .load_stream(&mut *connection_lock), + ) + .await + .expect("could not query messages") + .map_ok(|row| make_historical_event(&channel, row)) + .try_collect::>() + .await + .expect("could not parse all records") + .into_iter() + .rev() // need to reverse *after* applying the SQL LIMIT + .collect::>() + } } - } + }) } } From 60a6facb0dc73ffca7208744d2687dd98fa57749 Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Mon, 28 Oct 2024 20:23:39 +0100 Subject: [PATCH 16/22] Implement TieredHistoryService::list_targets --- sable_network/src/history/tiered_service.rs | 32 +++++++++++++++++---- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/sable_network/src/history/tiered_service.rs b/sable_network/src/history/tiered_service.rs index 859e727f..a4120a10 100644 --- a/sable_network/src/history/tiered_service.rs +++ b/sable_network/src/history/tiered_service.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::hash_map::{Entry, HashMap}; use tracing::instrument; @@ -40,15 +40,37 @@ impl, ) -> HashMap { match (&self.fast_service, &self.slow_service) { - (_, Some(slow_service)) => { - tracing::info!("list_target slow"); - // TODO: implement fallback + (Some(fast_service), Some(slow_service)) => { + let (mut targets1, mut targets2) = futures::join!( + slow_service.list_targets(user, after_ts, before_ts, limit), + fast_service.list_targets(user, after_ts, before_ts, limit) + ); + + // merge targets, taking the most recent timestamp for those present + // in both backends + if targets1.len() < targets2.len() { + 
(targets1, targets2) = (targets2, targets1); + } + for (target, ts) in targets2.drain() { + match targets1.entry(target) { + Entry::Occupied(mut entry) => { + if *entry.get() < ts { + entry.insert(ts); + } + } + Entry::Vacant(entry) => { + entry.insert(ts); + } + } + } + targets1 + } + (None, Some(slow_service)) => { slow_service .list_targets(user, after_ts, before_ts, limit) .await } (Some(fast_service), None) => { - tracing::info!("list_target fast"); fast_service .list_targets(user, after_ts, before_ts, limit) .await From bdcc1e684ea6388c36377ea208cbce76b8807fb4 Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Sat, 2 Nov 2024 12:08:22 +0100 Subject: [PATCH 17/22] Finish implementing TieredHistoryService --- sable_network/src/history/service.rs | 3 +- sable_network/src/history/tiered_service.rs | 74 +++++++++++++++++++-- 2 files changed, 72 insertions(+), 5 deletions(-) diff --git a/sable_network/src/history/service.rs b/sable_network/src/history/service.rs index 66d3e47a..0a56dd4d 100644 --- a/sable_network/src/history/service.rs +++ b/sable_network/src/history/service.rs @@ -98,7 +98,8 @@ pub trait HistoryService { user: UserId, target: TargetId, request: HistoryRequest, - ) -> impl Future, HistoryError>> + Send; + ) -> impl Future + Send, HistoryError>> + + Send; } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] diff --git a/sable_network/src/history/tiered_service.rs b/sable_network/src/history/tiered_service.rs index a4120a10..a7a5fb01 100644 --- a/sable_network/src/history/tiered_service.rs +++ b/sable_network/src/history/tiered_service.rs @@ -1,5 +1,6 @@ use std::collections::hash_map::{Entry, HashMap}; +use futures::TryFutureExt; use tracing::instrument; use crate::prelude::*; @@ -88,15 +89,80 @@ impl Result, HistoryError> { // It's tempting to return Box here instead of collecting into a // temporary Vec, but we can't because IntoIterator::IntoIter potentially differs + + macro_rules! get_entries { + ($service:expr, $user:expr, $target:expr, $request:expr) => { + $service + .get_entries($user, $target, $request) + .map_ok(|entries| -> Vec<_> { entries.into_iter().collect() }) + .await + }; + } + match (&self.fast_service, &self.slow_service) { - (_, Some(slow_service)) => { - // TODO: implement fallback - tracing::info!("get_entries slow"); + (Some(fast_service), Some(slow_service)) => { + match request { + HistoryRequest::Latest { limit, .. } | HistoryRequest::Before { limit, .. } => { + let mut entries = get_entries!(fast_service, user, target, request.clone()) + .unwrap_or_else(|e| { + tracing::error!("Could not get history from fast service: {e}"); + vec![] + }); + if entries.len() < limit { + // TODO: send a BEFORE request, and merge lists together + entries = get_entries!(slow_service, user, target, request)?; + } + Ok(entries) + } + HistoryRequest::After { start_ts, .. 
} => { + // Check if the fast-but-shortlived backend still has messages up to that + // timestamp + match fast_service + .get_entries( + user, + target, + HistoryRequest::Before { + from_ts: start_ts, + limit: 1, + }, + ) + .await + { + Ok(entries) => { + if entries.into_iter().count() > 0 { + // Yes, it does, so we don't need the slow_service to fulfill + // the request + match get_entries!(fast_service, user, target, request.clone()) + { + Ok(entries) => Ok(entries), + Err(e) => { + tracing::error!( + "Could not get history from fast service: {e}" + ); + get_entries!(slow_service, user, target, request) + } + } + } else { + get_entries!(slow_service, user, target, request) + } + } + Err(e) => { + tracing::error!("Could not get history from fast service: {e}"); + get_entries!(slow_service, user, target, request) + } + } + } + HistoryRequest::Around { .. } | HistoryRequest::Between { .. } => { + // TODO: try to use the fast_service when possible + get_entries!(slow_service, user, target, request) + } + } + } + (None, Some(slow_service)) => { let entries = slow_service.get_entries(user, target, request).await?; Ok(entries.into_iter().collect()) } (Some(fast_service), None) => { - tracing::info!("get_entries fast"); let entries = fast_service.get_entries(user, target, request).await?; Ok(entries.into_iter().collect()) } From 89143daacb37def37e104b1edc1a464acda27b47 Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Sat, 2 Nov 2024 13:44:41 +0100 Subject: [PATCH 18/22] Remove unused dependency on async-trait --- Cargo.lock | 1 - sable_network/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a874521c..2f9df6fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2489,7 +2489,6 @@ dependencies = [ "ambassador", "anyhow", "arrayvec", - "async-trait", "backoff", "bitflags 1.3.2", "built", diff --git a/sable_network/Cargo.toml b/sable_network/Cargo.toml index 310e65c8..d25615a2 100644 --- a/sable_network/Cargo.toml +++ b/sable_network/Cargo.toml @@ -21,7 +21,6 @@ pretty_assertions = "1.4" [dependencies] sable_macros = { path = "../sable_macros" } -async-trait = "0.1.83" tracing = "0.1" thiserror = "1" serde_json = "1" From 0b245bf9e0953013319d5ce870590688956df62b Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Sat, 2 Nov 2024 13:44:53 +0100 Subject: [PATCH 19/22] Remove unused enum HistoryMessage --- sable_network/src/history/mod.rs | 4 ---- sable_network/src/history/service.rs | 2 ++ 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/sable_network/src/history/mod.rs b/sable_network/src/history/mod.rs index 886603b5..2d5dc38d 100644 --- a/sable_network/src/history/mod.rs +++ b/sable_network/src/history/mod.rs @@ -26,7 +26,3 @@ impl HistoryItem for HistoryLogEntry { &self.details } } - -/// A more concrete representation of [`HistoryItem`], with all its fields inflated -/// to strings that will be sent to the client -pub enum HistoryMessage {} diff --git a/sable_network/src/history/service.rs b/sable_network/src/history/service.rs index 0a56dd4d..fcf3702c 100644 --- a/sable_network/src/history/service.rs +++ b/sable_network/src/history/service.rs @@ -102,6 +102,8 @@ pub trait HistoryService { + Send; } +/// A more concrete representation of `sable_ircd`'s `HistoryItem`, with all its fields +/// inflated to strings that will be sent to the client #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum HistoricalEvent { Message { From 61f78eba22ef622054bad804f72ecc465697ab8d Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Sat, 2 Nov 2024 
16:09:11 +0100 Subject: [PATCH 20/22] Remove panics --- sable_history/src/pg_history_service.rs | 109 ++++++++++++------ .../src/command/handlers/chathistory.rs | 6 + sable_network/src/history/service.rs | 2 + 3 files changed, 81 insertions(+), 36 deletions(-) diff --git a/sable_history/src/pg_history_service.rs b/sable_history/src/pg_history_service.rs index 009f8430..dc97a70b 100644 --- a/sable_history/src/pg_history_service.rs +++ b/sable_history/src/pg_history_service.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; +use anyhow::{bail, Result}; use chrono::{DateTime, NaiveDateTime, Utc}; use diesel::dsl::sql; use diesel::prelude::*; @@ -49,27 +50,27 @@ impl<'a> HistoryService for PgHistoryService<'a> { tracing::error!("Could not get history channels: {e}"); return HashMap::new(); } - Ok(rows) => { - rows.map(|row| row.expect("Could not deserialize row")) - .map( - |(channel_id, max_message_id): (i64, Uuid)| -> (TargetId, i64) { - let (seconds, _) = max_message_id - .get_timestamp() - .expect("messages.id is not a UUID7") - .to_unix(); - ( - TargetId::Channel(ChannelId::from(Snowflake::from( - u64::try_from(channel_id).expect("channel id is negative"), - ))), - seconds - .try_into() - .expect("message's UNIX timestamp is negative"), - ) - }, - ) - .collect() - .await - } + Ok(rows) => rows + .map(|row| -> Result<(TargetId, i64)> { + let (channel_id, max_message_id): (i64, Uuid) = row?; + let channel = + TargetId::Channel(ChannelId::from(Snowflake::from(channel_id as u64))); + + let Some(ts) = max_message_id.get_timestamp() else { + bail!("messages.id should be a UUID7, not {max_message_id}"); + }; + let (seconds, _) = ts.to_unix(); + let Ok(seconds) = seconds.try_into() else { + bail!("message {max_message_id}'s UNIX timestamp is negative"); + }; + Ok((channel, seconds)) + }) + .try_collect() + .await + .unwrap_or_else(|e| { + tracing::error!("Could not read rows: {e}"); + HashMap::new() + }), } } @@ -87,16 +88,22 @@ impl<'a> HistoryService for PgHistoryService<'a> { let mut connection_lock = self.database_connection.lock().await; - let db_channel_id = i64::try_from(channel_id.as_u64()).expect("channel id overflows u64"); - let Some(channel) = channels::dsl::channels + let db_channel_id = channel_id.as_u64() as i64; + let channel = match channels::dsl::channels .find(db_channel_id) .select(crate::models::Channel::as_select()) .first(&mut *connection_lock) .await .optional() - .expect("Could not check if channel exists") - else { - return Err(HistoryError::InvalidTarget(target)); + { + Ok(Some(channel)) => channel, + Ok(None) => return Err(HistoryError::InvalidTarget(target)), + Err(e) => { + tracing::error!("Could not check if channel exists: {e}"); + return Err(HistoryError::InternalError( + "Could not check if channel exists".to_string(), + )); + } }; let base_query = messages::dsl::messages @@ -138,11 +145,17 @@ impl<'a> HistoryService for PgHistoryService<'a> { ), } .await - .expect("could not query messages") + .map_err(|e| { + tracing::error!("Could not query messages: {e}"); + HistoryError::InternalError("Could not query messages".to_string()) + })? .map_ok(|row| make_historical_event(&channel, row)) .try_collect::>() .await - .expect("could not parse all records") + .map_err(|e| { + tracing::error!("Could not parse messages: {e}"); + HistoryError::InternalError("Could not parse message".to_string()) + })? 
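+            // the DESC ordering above means the SQL LIMIT keeps only the newest rows;
+            // they are flipped back to chronological order below before being returned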
.into_iter() .rev() // need to reverse *after* applying the SQL LIMIT .collect::>() @@ -161,11 +174,17 @@ impl<'a> HistoryService for PgHistoryService<'a> { .load_stream(&mut *connection_lock), ) .await - .expect("could not query messages") + .map_err(|e| { + tracing::error!("Could not query messages: {e}"); + HistoryError::InternalError("Could not query messages".to_string()) + })? .map_ok(|row| make_historical_event(&channel, row)) .try_collect::>() .await - .expect("could not parse all records") + .map_err(|e| { + tracing::error!("Could not parse messages: {e}"); + HistoryError::InternalError("Could not parse message".to_string()) + })? .into_iter() .rev() // need to reverse *after* applying the SQL LIMIT .collect::>() @@ -184,11 +203,17 @@ impl<'a> HistoryService for PgHistoryService<'a> { .load_stream(&mut *connection_lock), ) .await - .expect("could not query messages") + .map_err(|e| { + tracing::error!("Could not query messages: {e}"); + HistoryError::InternalError("Could not query messages".to_string()) + })? .map_ok(|row| make_historical_event(&channel, row)) .try_collect::>() .await - .expect("could not parse all records") + .map_err(|e| { + tracing::error!("Could not parse messages: {e}"); + HistoryError::InternalError("Could not parse message".to_string()) + })? } HistoryRequest::Around { around_ts, limit } => { todo!("around") @@ -216,11 +241,17 @@ impl<'a> HistoryService for PgHistoryService<'a> { .load_stream(&mut *connection_lock), ) .await - .expect("could not query messages") + .map_err(|e| { + tracing::error!("Could not query messages: {e}"); + HistoryError::InternalError("Could not query messages".to_string()) + })? .map_ok(|row| make_historical_event(&channel, row)) .try_collect::>() .await - .expect("could not parse all records") + .map_err(|e| { + tracing::error!("Could not parse messages: {e}"); + HistoryError::InternalError("Could not parse message".to_string()) + })? } else { let start_ts = DateTime::from_timestamp(start_ts, 0) .unwrap_or(DateTime::::MAX_UTC) @@ -239,11 +270,17 @@ impl<'a> HistoryService for PgHistoryService<'a> { .load_stream(&mut *connection_lock), ) .await - .expect("could not query messages") + .map_err(|e| { + tracing::error!("Could not query messages: {e}"); + HistoryError::InternalError("Could not query messages".to_string()) + })? .map_ok(|row| make_historical_event(&channel, row)) .try_collect::>() .await - .expect("could not parse all records") + .map_err(|e| { + tracing::error!("Could not parse messages: {e}"); + HistoryError::InternalError("Could not parse message".to_string()) + })? 
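+                    // start_ts > end_ts in this branch: rows were selected newest-first,
+                    // from just below start_ts back towards end_ts, then reversed below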
.into_iter() .rev() // need to reverse *after* applying the SQL LIMIT .collect::>() diff --git a/sable_ircd/src/command/handlers/chathistory.rs b/sable_ircd/src/command/handlers/chathistory.rs index 315539e8..0d33d92f 100644 --- a/sable_ircd/src/command/handlers/chathistory.rs +++ b/sable_ircd/src/command/handlers/chathistory.rs @@ -144,6 +144,12 @@ async fn handle_chathistory( { Ok(entries) => send_history_entries(server, response, target, entries)?, Err(HistoryError::InvalidTarget(_)) => Err(invalid_target_error())?, + Err(HistoryError::InternalError(e)) => Err(CommandError::Fail { + command: "CHATHISTORY", + code: "MESSAGE_ERROR", + context: format!("{} {}", subcommand, target), + description: e, + })?, }; } } diff --git a/sable_network/src/history/service.rs b/sable_network/src/history/service.rs index fcf3702c..4af67f04 100644 --- a/sable_network/src/history/service.rs +++ b/sable_network/src/history/service.rs @@ -78,6 +78,8 @@ pub enum HistoryRequest { pub enum HistoryError { #[error("invalid target: {0:?}")] InvalidTarget(TargetId), + #[error("internal server error: {0:?}")] + InternalError(String), } /// A backend implementation of [IRCv3 CHATHISTORY](https://ircv3.net/specs/extensions/chathistory) From a8f711b28921d0046392573fcd37af0ca139a4c4 Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Sat, 9 Nov 2024 15:45:46 +0100 Subject: [PATCH 21/22] Deduplicate query collection --- sable_history/src/pg_history_service.rs | 180 +++++++++++------------- 1 file changed, 81 insertions(+), 99 deletions(-) diff --git a/sable_history/src/pg_history_service.rs b/sable_history/src/pg_history_service.rs index dc97a70b..d9d43e26 100644 --- a/sable_history/src/pg_history_service.rs +++ b/sable_history/src/pg_history_service.rs @@ -119,7 +119,7 @@ impl<'a> HistoryService for PgHistoryService<'a> { historic_users::dsl::account_name, )) .filter(messages::dsl::target_channel.eq(db_channel_id)); - Ok(match request { + match request { HistoryRequest::Latest { to_ts, limit } => { let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX)); match to_ts { @@ -127,93 +127,65 @@ impl<'a> HistoryService for PgHistoryService<'a> { let to_ts = DateTime::from_timestamp(to_ts, 999_999) .unwrap_or(DateTime::::MIN_UTC) .naive_utc(); - Box::new( + collect_query( + connection_lock, + &channel, + true, // reverse base_query .filter(messages::dsl::timestamp.gt(to_ts)) // total order, consistent across requests .order((messages::dsl::timestamp.desc(), messages::dsl::id.desc())) - .limit(limit) - .load_stream(&mut *connection_lock), + .limit(limit), ) + .await + } + None => { + collect_query( + connection_lock, + &channel, + true, // reverse + base_query + // total order, consistent across requests + .order((messages::dsl::timestamp.desc(), messages::dsl::id.desc())) + .limit(limit), + ) + .await } - None => Box::new( - base_query - // total order, consistent across requests - .order((messages::dsl::timestamp.desc(), messages::dsl::id.desc())) - .limit(limit) - .load_stream(&mut *connection_lock), - ), } - .await - .map_err(|e| { - tracing::error!("Could not query messages: {e}"); - HistoryError::InternalError("Could not query messages".to_string()) - })? - .map_ok(|row| make_historical_event(&channel, row)) - .try_collect::>() - .await - .map_err(|e| { - tracing::error!("Could not parse messages: {e}"); - HistoryError::InternalError("Could not parse message".to_string()) - })? 
- .into_iter() - .rev() // need to reverse *after* applying the SQL LIMIT - .collect::>() } HistoryRequest::Before { from_ts, limit } => { let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX)); let from_ts = DateTime::from_timestamp(from_ts, 0) .unwrap_or(DateTime::::MAX_UTC) .naive_utc(); - Box::new( + collect_query( + connection_lock, + &channel, + true, // reverse base_query .filter(messages::dsl::timestamp.lt(from_ts)) // total order, consistent across requests .order((messages::dsl::timestamp.desc(), messages::dsl::id.desc())) - .limit(limit) - .load_stream(&mut *connection_lock), + .limit(limit), ) .await - .map_err(|e| { - tracing::error!("Could not query messages: {e}"); - HistoryError::InternalError("Could not query messages".to_string()) - })? - .map_ok(|row| make_historical_event(&channel, row)) - .try_collect::>() - .await - .map_err(|e| { - tracing::error!("Could not parse messages: {e}"); - HistoryError::InternalError("Could not parse message".to_string()) - })? - .into_iter() - .rev() // need to reverse *after* applying the SQL LIMIT - .collect::>() } HistoryRequest::After { start_ts, limit } => { let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX)); let start_ts = DateTime::from_timestamp(start_ts, 999_999) .unwrap_or(DateTime::::MIN_UTC) .naive_utc(); - Box::new( + collect_query( + connection_lock, + &channel, + false, // don't reverse base_query .filter(messages::dsl::timestamp.gt(start_ts)) // total order, consistent across requests .order((messages::dsl::timestamp, messages::dsl::id)) - .limit(limit) - .load_stream(&mut *connection_lock), + .limit(limit), ) .await - .map_err(|e| { - tracing::error!("Could not query messages: {e}"); - HistoryError::InternalError("Could not query messages".to_string()) - })? - .map_ok(|row| make_historical_event(&channel, row)) - .try_collect::>() - .await - .map_err(|e| { - tracing::error!("Could not parse messages: {e}"); - HistoryError::InternalError("Could not parse message".to_string()) - })? } HistoryRequest::Around { around_ts, limit } => { todo!("around") @@ -231,27 +203,18 @@ impl<'a> HistoryService for PgHistoryService<'a> { .unwrap_or(DateTime::::MAX_UTC) .naive_utc(); let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX)); - Box::new( + collect_query( + connection_lock, + &channel, + false, // don't reverse base_query .filter(messages::dsl::timestamp.gt(start_ts)) .filter(messages::dsl::timestamp.lt(end_ts)) // total order, consistent across requests .order((messages::dsl::timestamp, messages::dsl::id)) - .limit(limit) - .load_stream(&mut *connection_lock), + .limit(limit), ) .await - .map_err(|e| { - tracing::error!("Could not query messages: {e}"); - HistoryError::InternalError("Could not query messages".to_string()) - })? - .map_ok(|row| make_historical_event(&channel, row)) - .try_collect::>() - .await - .map_err(|e| { - tracing::error!("Could not parse messages: {e}"); - HistoryError::InternalError("Could not parse message".to_string()) - })? 
                } else {
                    let start_ts = DateTime::from_timestamp(start_ts, 0)
                        .unwrap_or(DateTime::<Utc>::MAX_UTC)
                        .naive_utc();
                    let end_ts = DateTime::from_timestamp(end_ts, 999_999)
                        .unwrap_or(DateTime::<Utc>::MIN_UTC)
                        .naive_utc();
                    let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX));
-                    Box::new(
+                    collect_query(
+                        connection_lock,
+                        &channel,
+                        true, // reverse
                        base_query
                            .filter(messages::dsl::timestamp.gt(end_ts))
                            .filter(messages::dsl::timestamp.lt(start_ts))
                            // total order, consistent across requests
                            .order((messages::dsl::timestamp.desc(), messages::dsl::id.desc()))
-                            .limit(limit)
-                            .load_stream(&mut *connection_lock),
+                                .limit(limit),
                    )
                    .await
-                    .map_err(|e| {
-                        tracing::error!("Could not query messages: {e}");
-                        HistoryError::InternalError("Could not query messages".to_string())
-                    })?
-                    .map_ok(|row| make_historical_event(&channel, row))
-                    .try_collect::<Vec<_>>()
-                    .await
-                    .map_err(|e| {
-                        tracing::error!("Could not parse messages: {e}");
-                        HistoryError::InternalError("Could not parse message".to_string())
-                    })?
-                    .into_iter()
-                    .rev() // need to reverse *after* applying the SQL LIMIT
-                    .collect::<Vec<_>>()
                }
            }
-        })
+        }
    }
}

+type JoinedMessageRow = (
+    uuid::Uuid,
+    NaiveDateTime,
+    crate::types::MessageType,
+    String,
+    String,
+    String,
+    String,
+    Option<String>,
+);
+
+async fn collect_query<'query>(
+    mut connection: tokio::sync::MutexGuard<'_, AsyncPgConnection>,
+    channel: &crate::models::Channel,
+    reverse: bool,
+    query: impl diesel_async::RunQueryDsl<AsyncPgConnection>
+        + diesel_async::methods::LoadQuery<'query, AsyncPgConnection, JoinedMessageRow>
+        + 'query,
+) -> Result<Vec<HistoricalEvent>, HistoryError> {
+    let events = query
+        .load_stream(&mut *connection)
+        .await
+        .map_err(|e| {
+            tracing::error!("Could not query messages: {e}");
+            HistoryError::InternalError("Could not query messages".to_string())
+        })?
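+        // convert each joined row into a HistoricalEvent as the result set is streamed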
+ .map_ok(|row| make_historical_event(channel, row)) + .try_collect::>() + .await + .map_err(|e| { + tracing::error!("Could not parse messages: {e}"); + HistoryError::InternalError("Could not parse message".to_string()) + })?; + Ok(if reverse { + events.into_iter().rev().collect() + } else { + events + }) +} + fn make_historical_event( channel: &crate::models::Channel, - (id, timestamp, message_type, text, source_nick, source_ident, source_vhost, source_account): ( - uuid::Uuid, - NaiveDateTime, - crate::types::MessageType, - String, - String, - String, - String, - Option, - ), + (id, timestamp, message_type, text, source_nick, source_ident, source_vhost, source_account): JoinedMessageRow, ) -> HistoricalEvent { HistoricalEvent::Message { id: MessageId::new(id.try_into().expect("Message id is a non-v7 UUID")), From ffd8f835e780340cf9d4f0f4452a09d296c8d4bf Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Sun, 10 Nov 2024 11:05:19 +0100 Subject: [PATCH 22/22] Implement Around --- sable_history/src/pg_history_service.rs | 31 ++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/sable_history/src/pg_history_service.rs b/sable_history/src/pg_history_service.rs index d9d43e26..b151ad27 100644 --- a/sable_history/src/pg_history_service.rs +++ b/sable_history/src/pg_history_service.rs @@ -188,7 +188,36 @@ impl<'a> HistoryService for PgHistoryService<'a> { .await } HistoryRequest::Around { around_ts, limit } => { - todo!("around") + let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX)); + let around_ts = DateTime::from_timestamp(around_ts, 0) + .unwrap_or(DateTime::::MIN_UTC) + .naive_utc(); + collect_query( + connection_lock, + &channel, + false, // don't reverse + CombineDsl::union( + base_query + .filter(messages::dsl::timestamp.le(around_ts)) + // total order, consistent across requests + .order((messages::dsl::timestamp.desc(), messages::dsl::id.desc())) + .limit(limit), + base_query + .filter(messages::dsl::timestamp.gt(around_ts)) + // total order, consistent across requests + .order((messages::dsl::timestamp, messages::dsl::id)) + .limit(limit), + ), + ) + .await + .map(|mut events| { + // TODO: make postgresql sort it, it may be able to do it directly from + // the index scan instead of sorting after the union + events.sort_unstable_by_key(|event| match event { + HistoricalEvent::Message { id, timestamp, .. } => (*timestamp, *id), + }); + events + }) } HistoryRequest::Between { start_ts,