From c6e4dda41fb37b1407c733a00194949ebcdada6d Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Tue, 9 Jul 2024 16:29:39 +0200 Subject: [PATCH 01/27] poem gateway experimentation --- src/gateway/mod.rs | 54 ++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 9 ++++---- 2 files changed, 59 insertions(+), 4 deletions(-) diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index e69de29..dde9e26 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -0,0 +1,54 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use futures::{SinkExt, StreamExt}; +use log::{debug, info}; +use poem::listener::TcpListener; +use poem::web::websocket::{Message, WebSocket}; +use poem::web::Data; +use poem::{get, handler, EndpointExt, IntoResponse, Route, Server}; + +use crate::errors::Error; + +#[handler] +fn ws(ws: WebSocket, sender: Data<&tokio::sync::broadcast::Sender>) -> impl IntoResponse { + let sender = sender.clone(); + let mut receiver = sender.subscribe(); + ws.on_upgrade(move |socket| async move { + let (mut sink, mut stream) = socket.split(); + + tokio::spawn(async move { + while let Some(Ok(msg)) = stream.next().await { + if let Message::Text(text) = msg { + if sender.send(Message::text(text)).is_err() { + break; + } + } + } + }); + + tokio::spawn(async move { + while let Ok(msg) = receiver.recv().await { + if sink.send(msg).await.is_err() { + break; + } + } + }); + }) +} + +pub async fn start_gateway() -> Result<(), Error> { + info!(target: "symfonia::gateway", "Starting gateway server"); + let ws_app = Route::new().at( + "/", + get(ws.data(tokio::sync::mpsc::channel::(32).0)), + ); + let bind = std::env::var("API_BIND").unwrap_or_else(|_| String::from("localhost:3001")); + debug!(target: "symfonia::gateway", "Binding to {}", bind); + Server::new(TcpListener::bind("0.0.0.0:3000")) + .run(ws_app) + .await?; + debug!(target: "symfonia::gateway", "Gateway server started"); + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index 3da7acd..9478575 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,20 +1,20 @@ use clap::Parser; +use log::LevelFilter; use log4rs::{ append::{ console::{ConsoleAppender, Target}, rolling_file::{ policy::compound::{ - CompoundPolicy, roll::delete::DeleteRoller, trigger::size::SizeTrigger, + roll::delete::DeleteRoller, trigger::size::SizeTrigger, CompoundPolicy, }, RollingFileAppender, }, }, config::{Appender, Logger, Root}, - Config, encode::pattern::PatternEncoder, filter::Filter, + Config, }; -use log::LevelFilter; mod api; mod cdn; @@ -124,7 +124,7 @@ async fn main() { .logger( Logger::builder() .appender("gateway") - .build("symfonia::gateway", LevelFilter::Info), + .build("symfonia::gateway", LevelFilter::Debug), ) .build(Root::builder().appender("stdout").build({ let mode = std::env::var("MODE").unwrap_or("DEBUG".to_string()); @@ -141,6 +141,7 @@ async fn main() { log::info!(target: "symfonia", "Starting up Symfonia"); + gateway::start_gateway().await.unwrap(); // TODO: This should be near api::start... log::info!(target: "symfonia::db", "Establishing database connection"); let db = database::establish_connection() .await From 76e3e303480cec20d1f45b9c99b112719307644a Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Tue, 9 Jul 2024 22:42:17 +0200 Subject: [PATCH 02/27] create `start_server`, moving server starting logic to main.rs from api module --- src/main.rs | 55 +++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 53 insertions(+), 2 deletions(-) diff --git a/src/main.rs b/src/main.rs index 9478575..cf03cfc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,6 +15,12 @@ use log4rs::{ filter::Filter, Config, }; +use poem::listener::TcpListener; +use poem::middleware::{NormalizePath, TrailingSlash}; +use poem::web::Json; +use poem::{EndpointExt, IntoResponse, Route, Server}; +use serde_json::json; +use sqlx::MySqlPool; mod api; mod cdn; @@ -23,6 +29,8 @@ mod errors; mod gateway; mod util; +pub type PathRouteTuple = (String, Route); + #[derive(Debug)] struct LogFilter; @@ -141,7 +149,6 @@ async fn main() { log::info!(target: "symfonia", "Starting up Symfonia"); - gateway::start_gateway().await.unwrap(); // TODO: This should be near api::start... log::info!(target: "symfonia::db", "Establishing database connection"); let db = database::establish_connection() .await @@ -181,6 +188,50 @@ async fn main() { .await .expect("Failed to seed config"); } + let bind = std::env::var("API_BIND").unwrap_or_else(|_| String::from("localhost:3001")); + let api_route = api::setup_api(); + let gateway_route = gateway::setup_gateway(); + start_server(vec![api_route, gateway_route], &bind, db) + .await + .expect("Failed to start server") +} + +async fn start_server( + routes: Vec<(impl AsRef, Route)>, + addr: &impl ToString, + db: MySqlPool, +) -> Result<(), crate::errors::Error> { + let mut app_routes = Route::new(); + let config = crate::database::entities::Config::init(&db).await?; + if config.sentry.enabled { + let _guard = sentry::init(( + "https://241c6fb08adb469da1bb82522b25c99f@sentry.quartzinc.space/3", + sentry::ClientOptions { + release: sentry::release_name!(), + traces_sample_rate: config.sentry.trace_sample_rate as f32, + ..Default::default() + }, + )); + } + for (path, route) in routes.into_iter() { + app_routes = app_routes.nest(path, route); + } + let app = app_routes + .data(db) + .data(config) + .with(NormalizePath::new(TrailingSlash::Trim)) + .catch_all_error(custom_error); + log::info!(target: "symfonia::api", "Starting HTTP Server"); + Server::new(TcpListener::bind(addr.to_string())) + .run(app) + .await?; + Ok(()) +} - api::start_api(db).await.unwrap(); +async fn custom_error(err: poem::Error) -> impl IntoResponse { + Json(json! ({ + "success": false, + "message": err.to_string(), + })) + .with_status(err.status()) } From f46abc2b1512bf0cf783b9f05cd149878f9e99b7 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Tue, 9 Jul 2024 22:42:28 +0200 Subject: [PATCH 03/27] Move poem::Server starting logic --- src/api/mod.rs | 59 ++++++-------------------------------------------- 1 file changed, 7 insertions(+), 52 deletions(-) diff --git a/src/api/mod.rs b/src/api/mod.rs index 138049b..0bc9494 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -1,41 +1,15 @@ -use poem::{ - EndpointExt, - IntoResponse, - listener::TcpListener, - middleware::{NormalizePath, TrailingSlash}, Route, Server, web::Json, -}; -use serde_json::json; -use sqlx::MySqlPool; +use poem::{EndpointExt, Route}; -use crate::{ - api::{ - middleware::{ - authentication::AuthenticationMiddleware, current_user::CurrentUserMiddleware, - }, - routes::{auth, channels, guilds, users}, - }, - database::entities::Config, - errors::Error, +use crate::api::{ + middleware::{authentication::AuthenticationMiddleware, current_user::CurrentUserMiddleware}, + routes::{auth, channels, guilds, users}, }; +use crate::PathRouteTuple; mod middleware; mod routes; -pub async fn start_api(db: MySqlPool) -> Result<(), Error> { - log::info!(target: "symfonia::api::cfg", "Loading configuration"); - let config = Config::init(&db).await?; - - if config.sentry.enabled { - let _guard = sentry::init(( - "https://241c6fb08adb469da1bb82522b25c99f@sentry.quartzinc.space/3", - sentry::ClientOptions { - release: sentry::release_name!(), - traces_sample_rate: config.sentry.trace_sample_rate as f32, - ..Default::default() - }, - )); - } - +pub fn setup_api() -> PathRouteTuple { let routes = Route::new() .nest("/auth", auth::setup_routes()) .nest( @@ -65,24 +39,5 @@ pub async fn start_api(db: MySqlPool) -> Result<(), Error> { .nest("/policies", routes::policies::setup_routes()) .nest("/-", routes::health::setup_routes()); - let v9_api = Route::new() - .nest("/api/v9", routes) - .data(db) - .data(config) - .with(NormalizePath::new(TrailingSlash::Trim)) - .catch_all_error(custom_error); - - let bind = std::env::var("API_BIND").unwrap_or_else(|_| String::from("localhost:3001")); - - log::info!(target: "symfonia::api", "Starting HTTP Server"); - Server::new(TcpListener::bind(bind)).run(v9_api).await?; - Ok(()) -} - -async fn custom_error(err: poem::Error) -> impl IntoResponse { - Json(json! ({ - "success": false, - "message": err.to_string(), - })) - .with_status(err.status()) + ("/api/v9".to_string(), routes) } From 62ab1f3ec330b98967a3ba98749087a28b1fe4b9 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Tue, 9 Jul 2024 22:42:49 +0200 Subject: [PATCH 04/27] Change start_gateway() to setup_gateway() --- src/gateway/mod.rs | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index dde9e26..cf0c2bf 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -10,6 +10,7 @@ use poem::web::Data; use poem::{get, handler, EndpointExt, IntoResponse, Route, Server}; use crate::errors::Error; +use crate::PathRouteTuple; #[handler] fn ws(ws: WebSocket, sender: Data<&tokio::sync::broadcast::Sender>) -> impl IntoResponse { @@ -38,17 +39,10 @@ fn ws(ws: WebSocket, sender: Data<&tokio::sync::broadcast::Sender>) -> }) } -pub async fn start_gateway() -> Result<(), Error> { - info!(target: "symfonia::gateway", "Starting gateway server"); - let ws_app = Route::new().at( +pub fn setup_gateway() -> PathRouteTuple { + let ws_route = Route::new().at( "/", get(ws.data(tokio::sync::mpsc::channel::(32).0)), ); - let bind = std::env::var("API_BIND").unwrap_or_else(|_| String::from("localhost:3001")); - debug!(target: "symfonia::gateway", "Binding to {}", bind); - Server::new(TcpListener::bind("0.0.0.0:3000")) - .run(ws_app) - .await?; - debug!(target: "symfonia::gateway", "Gateway server started"); - Ok(()) + ("/".to_string(), ws_route) } From cef98f42fa00a726abd2a0a4db76fb318e93b534 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Wed, 10 Jul 2024 17:00:32 +0200 Subject: [PATCH 05/27] Revert "Change start_gateway() to setup_gateway()" This reverts commit 62ab1f3ec330b98967a3ba98749087a28b1fe4b9. --- src/gateway/mod.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index cf0c2bf..dde9e26 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -10,7 +10,6 @@ use poem::web::Data; use poem::{get, handler, EndpointExt, IntoResponse, Route, Server}; use crate::errors::Error; -use crate::PathRouteTuple; #[handler] fn ws(ws: WebSocket, sender: Data<&tokio::sync::broadcast::Sender>) -> impl IntoResponse { @@ -39,10 +38,17 @@ fn ws(ws: WebSocket, sender: Data<&tokio::sync::broadcast::Sender>) -> }) } -pub fn setup_gateway() -> PathRouteTuple { - let ws_route = Route::new().at( +pub async fn start_gateway() -> Result<(), Error> { + info!(target: "symfonia::gateway", "Starting gateway server"); + let ws_app = Route::new().at( "/", get(ws.data(tokio::sync::mpsc::channel::(32).0)), ); - ("/".to_string(), ws_route) + let bind = std::env::var("API_BIND").unwrap_or_else(|_| String::from("localhost:3001")); + debug!(target: "symfonia::gateway", "Binding to {}", bind); + Server::new(TcpListener::bind("0.0.0.0:3000")) + .run(ws_app) + .await?; + debug!(target: "symfonia::gateway", "Gateway server started"); + Ok(()) } From ce38cad6b632abb93125710d75820bdca74c881c Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Wed, 10 Jul 2024 17:00:36 +0200 Subject: [PATCH 06/27] Revert "Move poem::Server starting logic" This reverts commit f46abc2b1512bf0cf783b9f05cd149878f9e99b7. --- src/api/mod.rs | 59 ++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 52 insertions(+), 7 deletions(-) diff --git a/src/api/mod.rs b/src/api/mod.rs index 0bc9494..138049b 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -1,15 +1,41 @@ -use poem::{EndpointExt, Route}; +use poem::{ + EndpointExt, + IntoResponse, + listener::TcpListener, + middleware::{NormalizePath, TrailingSlash}, Route, Server, web::Json, +}; +use serde_json::json; +use sqlx::MySqlPool; -use crate::api::{ - middleware::{authentication::AuthenticationMiddleware, current_user::CurrentUserMiddleware}, - routes::{auth, channels, guilds, users}, +use crate::{ + api::{ + middleware::{ + authentication::AuthenticationMiddleware, current_user::CurrentUserMiddleware, + }, + routes::{auth, channels, guilds, users}, + }, + database::entities::Config, + errors::Error, }; -use crate::PathRouteTuple; mod middleware; mod routes; -pub fn setup_api() -> PathRouteTuple { +pub async fn start_api(db: MySqlPool) -> Result<(), Error> { + log::info!(target: "symfonia::api::cfg", "Loading configuration"); + let config = Config::init(&db).await?; + + if config.sentry.enabled { + let _guard = sentry::init(( + "https://241c6fb08adb469da1bb82522b25c99f@sentry.quartzinc.space/3", + sentry::ClientOptions { + release: sentry::release_name!(), + traces_sample_rate: config.sentry.trace_sample_rate as f32, + ..Default::default() + }, + )); + } + let routes = Route::new() .nest("/auth", auth::setup_routes()) .nest( @@ -39,5 +65,24 @@ pub fn setup_api() -> PathRouteTuple { .nest("/policies", routes::policies::setup_routes()) .nest("/-", routes::health::setup_routes()); - ("/api/v9".to_string(), routes) + let v9_api = Route::new() + .nest("/api/v9", routes) + .data(db) + .data(config) + .with(NormalizePath::new(TrailingSlash::Trim)) + .catch_all_error(custom_error); + + let bind = std::env::var("API_BIND").unwrap_or_else(|_| String::from("localhost:3001")); + + log::info!(target: "symfonia::api", "Starting HTTP Server"); + Server::new(TcpListener::bind(bind)).run(v9_api).await?; + Ok(()) +} + +async fn custom_error(err: poem::Error) -> impl IntoResponse { + Json(json! ({ + "success": false, + "message": err.to_string(), + })) + .with_status(err.status()) } From 9129141209348a6b3d48cf6c5cf7655a893aa63a Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Wed, 10 Jul 2024 17:00:48 +0200 Subject: [PATCH 07/27] Revert "create `start_server`, moving server starting logic to main.rs from api module" This reverts commit 76e3e303480cec20d1f45b9c99b112719307644a. --- src/main.rs | 55 ++--------------------------------------------------- 1 file changed, 2 insertions(+), 53 deletions(-) diff --git a/src/main.rs b/src/main.rs index cf03cfc..9478575 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,12 +15,6 @@ use log4rs::{ filter::Filter, Config, }; -use poem::listener::TcpListener; -use poem::middleware::{NormalizePath, TrailingSlash}; -use poem::web::Json; -use poem::{EndpointExt, IntoResponse, Route, Server}; -use serde_json::json; -use sqlx::MySqlPool; mod api; mod cdn; @@ -29,8 +23,6 @@ mod errors; mod gateway; mod util; -pub type PathRouteTuple = (String, Route); - #[derive(Debug)] struct LogFilter; @@ -149,6 +141,7 @@ async fn main() { log::info!(target: "symfonia", "Starting up Symfonia"); + gateway::start_gateway().await.unwrap(); // TODO: This should be near api::start... log::info!(target: "symfonia::db", "Establishing database connection"); let db = database::establish_connection() .await @@ -188,50 +181,6 @@ async fn main() { .await .expect("Failed to seed config"); } - let bind = std::env::var("API_BIND").unwrap_or_else(|_| String::from("localhost:3001")); - let api_route = api::setup_api(); - let gateway_route = gateway::setup_gateway(); - start_server(vec![api_route, gateway_route], &bind, db) - .await - .expect("Failed to start server") -} - -async fn start_server( - routes: Vec<(impl AsRef, Route)>, - addr: &impl ToString, - db: MySqlPool, -) -> Result<(), crate::errors::Error> { - let mut app_routes = Route::new(); - let config = crate::database::entities::Config::init(&db).await?; - if config.sentry.enabled { - let _guard = sentry::init(( - "https://241c6fb08adb469da1bb82522b25c99f@sentry.quartzinc.space/3", - sentry::ClientOptions { - release: sentry::release_name!(), - traces_sample_rate: config.sentry.trace_sample_rate as f32, - ..Default::default() - }, - )); - } - for (path, route) in routes.into_iter() { - app_routes = app_routes.nest(path, route); - } - let app = app_routes - .data(db) - .data(config) - .with(NormalizePath::new(TrailingSlash::Trim)) - .catch_all_error(custom_error); - log::info!(target: "symfonia::api", "Starting HTTP Server"); - Server::new(TcpListener::bind(addr.to_string())) - .run(app) - .await?; - Ok(()) -} -async fn custom_error(err: poem::Error) -> impl IntoResponse { - Json(json! ({ - "success": false, - "message": err.to_string(), - })) - .with_status(err.status()) + api::start_api(db).await.unwrap(); } From 5aea0534c9530541c437e58e84e694201c6b46f8 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Fri, 12 Jul 2024 23:00:28 +0200 Subject: [PATCH 08/27] Add `tokio-tungstenite` dependency, remove `openssl` where possible in favor of `rustls` --- Cargo.toml | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ac88f53..b8d498a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,23 +28,35 @@ log4rs = { version = "1.3.0", features = [ num-bigint = "0.4.5" num-traits = "0.2.19" openssl = "0.10.64" -poem = { version = "3.0.1", features = ["websocket"] } +poem = "3.0.1" utoipa = { version = "5.0.0-alpha.0", features = [] } rand = "0.8.5" regex = "1.10.4" -reqwest = "0.12.4" +reqwest = { version = "0.12.5", default-features = false, features = [ + "http2", + "macos-system-configuration", + "charset", + "rustls-tls-webpki-roots", +] } serde = { version = "1.0.203", features = ["derive"] } serde_json = { version = "1.0.117", features = ["raw_value"] } sqlx = { version = "0.7.4", features = [ "json", "chrono", "ipnetwork", - "runtime-tokio-native-tls", + "runtime-tokio-rustls", "any", ] } thiserror = "1.0.61" tokio = { version = "1.38.0", features = ["full"] } -sentry = "0.33.0" +sentry = { version = "0.34.0", default-features = false, features = [ + "backtrace", + "contexts", + "debug-images", + "panic", + "reqwest", + "rustls", +] } clap = { version = "4.5.4", features = ["derive"] } chorus = { git = "http://github.com/polyphony-chat/chorus", rev = "d591616", features = [ @@ -54,3 +66,6 @@ serde_path_to_error = "0.1.16" percent-encoding = "2.3.1" hex = "0.4.3" itertools = "0.13.0" +tokio-tungstenite = { version = "0.23.1", features = [ + "rustls-tls-webpki-roots", +] } From 43ca8e2223caadb0fc56b170d027f8ecc9c5b844 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Fri, 12 Jul 2024 23:00:52 +0200 Subject: [PATCH 09/27] Revert poem gateway implementation changes --- src/gateway/mod.rs | 47 ++-------------------------------------------- 1 file changed, 2 insertions(+), 45 deletions(-) diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index dde9e26..a729002 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -1,54 +1,11 @@ +use log::info; + // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. - -use futures::{SinkExt, StreamExt}; -use log::{debug, info}; -use poem::listener::TcpListener; -use poem::web::websocket::{Message, WebSocket}; -use poem::web::Data; -use poem::{get, handler, EndpointExt, IntoResponse, Route, Server}; - use crate::errors::Error; -#[handler] -fn ws(ws: WebSocket, sender: Data<&tokio::sync::broadcast::Sender>) -> impl IntoResponse { - let sender = sender.clone(); - let mut receiver = sender.subscribe(); - ws.on_upgrade(move |socket| async move { - let (mut sink, mut stream) = socket.split(); - - tokio::spawn(async move { - while let Some(Ok(msg)) = stream.next().await { - if let Message::Text(text) = msg { - if sender.send(Message::text(text)).is_err() { - break; - } - } - } - }); - - tokio::spawn(async move { - while let Ok(msg) = receiver.recv().await { - if sink.send(msg).await.is_err() { - break; - } - } - }); - }) -} - pub async fn start_gateway() -> Result<(), Error> { info!(target: "symfonia::gateway", "Starting gateway server"); - let ws_app = Route::new().at( - "/", - get(ws.data(tokio::sync::mpsc::channel::(32).0)), - ); - let bind = std::env::var("API_BIND").unwrap_or_else(|_| String::from("localhost:3001")); - debug!(target: "symfonia::gateway", "Binding to {}", bind); - Server::new(TcpListener::bind("0.0.0.0:3000")) - .run(ws_app) - .await?; - debug!(target: "symfonia::gateway", "Gateway server started"); Ok(()) } From 972d26e64eec97c835e161b8736dbac3ea793205 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Fri, 12 Jul 2024 23:01:31 +0200 Subject: [PATCH 10/27] Revert "poem gateway experimentation" This reverts commit c6e4dda41fb37b1407c733a00194949ebcdada6d. --- src/main.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/main.rs b/src/main.rs index 9478575..3da7acd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,20 +1,20 @@ use clap::Parser; -use log::LevelFilter; use log4rs::{ append::{ console::{ConsoleAppender, Target}, rolling_file::{ policy::compound::{ - roll::delete::DeleteRoller, trigger::size::SizeTrigger, CompoundPolicy, + CompoundPolicy, roll::delete::DeleteRoller, trigger::size::SizeTrigger, }, RollingFileAppender, }, }, config::{Appender, Logger, Root}, + Config, encode::pattern::PatternEncoder, filter::Filter, - Config, }; +use log::LevelFilter; mod api; mod cdn; @@ -124,7 +124,7 @@ async fn main() { .logger( Logger::builder() .appender("gateway") - .build("symfonia::gateway", LevelFilter::Debug), + .build("symfonia::gateway", LevelFilter::Info), ) .build(Root::builder().appender("stdout").build({ let mode = std::env::var("MODE").unwrap_or("DEBUG".to_string()); @@ -141,7 +141,6 @@ async fn main() { log::info!(target: "symfonia", "Starting up Symfonia"); - gateway::start_gateway().await.unwrap(); // TODO: This should be near api::start... log::info!(target: "symfonia::db", "Establishing database connection"); let db = database::establish_connection() .await From e41a6d561315f154be12803b71dfaf29b7b53ea4 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Sun, 14 Jul 2024 17:39:17 +0200 Subject: [PATCH 11/27] Add pubserve dependency --- Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.toml b/Cargo.toml index b8d498a..1cf206a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,3 +69,4 @@ itertools = "0.13.0" tokio-tungstenite = { version = "0.23.1", features = [ "rustls-tls-webpki-roots", ] } +pubserve = { version = "1.1.0", features = ["async", "send"] } From 7939f47325c06d407de5cd065e6178bf1145c838 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Sun, 14 Jul 2024 17:39:52 +0200 Subject: [PATCH 12/27] Add HashMap of Emitters --- src/main.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/main.rs b/src/main.rs index 3da7acd..72b33bd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,20 +1,26 @@ +use std::collections::HashMap; +use std::rc::Rc; + use clap::Parser; + +use gateway::{EventEmitter, Events}; +use log::LevelFilter; use log4rs::{ append::{ console::{ConsoleAppender, Target}, rolling_file::{ policy::compound::{ - CompoundPolicy, roll::delete::DeleteRoller, trigger::size::SizeTrigger, + roll::delete::DeleteRoller, trigger::size::SizeTrigger, CompoundPolicy, }, RollingFileAppender, }, }, config::{Appender, Logger, Root}, - Config, encode::pattern::PatternEncoder, filter::Filter, + Config, }; -use log::LevelFilter; +use pubserve::Publisher; mod api; mod cdn; @@ -181,5 +187,6 @@ async fn main() { .expect("Failed to seed config"); } + let mut emitters: HashMap>> = HashMap::new(); api::start_api(db).await.unwrap(); } From ea64e171a1665353060185dc026bdd3f94880ce7 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Sun, 14 Jul 2024 17:40:26 +0200 Subject: [PATCH 13/27] Make Channels have Event Emitters --- src/api/routes/guilds/id/channels.rs | 12 +++++++++--- src/database/entities/channel.rs | 21 ++++++++++++++++----- 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/src/api/routes/guilds/id/channels.rs b/src/api/routes/guilds/id/channels.rs index 8d46ef3..ef8f6bc 100644 --- a/src/api/routes/guilds/id/channels.rs +++ b/src/api/routes/guilds/id/channels.rs @@ -1,10 +1,10 @@ use chorus::types::{ - ChannelModifySchema, ChannelType, jwt::Claims, ModifyChannelPositionsSchema, Snowflake, + jwt::Claims, ChannelModifySchema, ChannelType, ModifyChannelPositionsSchema, Snowflake, }; use poem::{ handler, - IntoResponse, - Response, web::{Data, Json, Path}, + web::{Data, Json, Path}, + IntoResponse, Response, }; use reqwest::StatusCode; use sqlx::MySqlPool; @@ -139,6 +139,7 @@ mod tests { position: Some(0), ..Default::default() }, + ..Default::default() }, Channel { inner: chorus::types::Channel { @@ -146,6 +147,7 @@ mod tests { position: Some(1), ..Default::default() }, + ..Default::default() }, Channel { inner: chorus::types::Channel { @@ -153,6 +155,7 @@ mod tests { position: Some(2), ..Default::default() }, + ..Default::default() }, Channel { inner: chorus::types::Channel { @@ -160,6 +163,7 @@ mod tests { position: Some(3), ..Default::default() }, + ..Default::default() }, Channel { inner: chorus::types::Channel { @@ -167,6 +171,7 @@ mod tests { position: Some(4), ..Default::default() }, + ..Default::default() }, Channel { inner: chorus::types::Channel { @@ -174,6 +179,7 @@ mod tests { position: Some(5), ..Default::default() }, + ..Default::default() }, ]; diff --git a/src/database/entities/channel.rs b/src/database/entities/channel.rs index b6c0c8f..777d4ef 100644 --- a/src/database/entities/channel.rs +++ b/src/database/entities/channel.rs @@ -1,25 +1,35 @@ use std::ops::{Deref, DerefMut}; use chorus::types::{ - ChannelMessagesAnchor, ChannelModifySchema, ChannelType, CreateChannelInviteSchema, InviteType, - MessageSendSchema, PermissionOverwrite, Snowflake, + ChannelDelete, ChannelMessagesAnchor, ChannelModifySchema, ChannelType, ChannelUpdate, + CreateChannelInviteSchema, InviteType, MessageSendSchema, PermissionOverwrite, Snowflake, }; use itertools::Itertools; +use pubserve::Publisher; use serde::{Deserialize, Serialize}; -use sqlx::{MySqlPool, types::Json}; +use sqlx::{types::Json, MySqlPool}; use crate::{ database::entities::{ - GuildMember, invite::Invite, message::Message, read_state::ReadState, recipient::Recipient, + invite::Invite, message::Message, read_state::ReadState, recipient::Recipient, GuildMember, User, Webhook, }, errors::{ChannelError, Error, GuildError, UserError}, }; -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, sqlx::FromRow)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, sqlx::FromRow, Default)] pub struct Channel { #[sqlx(flatten)] pub(crate) inner: chorus::types::Channel, + #[sqlx(skip)] + #[serde(skip)] + pub publisher: ChannelEventPublisher, +} + +#[derive(Debug, Default, Clone, PartialEq)] +pub struct ChannelEventPublisher { + pub update: Publisher, + pub delete: Publisher, } impl Deref for Channel { @@ -87,6 +97,7 @@ impl Channel { guild_id, ..Default::default() }, + ..Default::default() }; sqlx::query("INSERT INTO channels (id, type, name, nsfw, guild_id, parent_id, flags, permission_overwrites, default_thread_rate_limit_per_user, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, NOW())") From de2a31b6c222b8b1d36cc8b293acf97db96556e8 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Sun, 14 Jul 2024 17:40:48 +0200 Subject: [PATCH 14/27] Have user store list of subscribed_events --- src/database/entities/user.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/database/entities/user.rs b/src/database/entities/user.rs index dae2ed1..d3a2452 100644 --- a/src/database/entities/user.rs +++ b/src/database/entities/user.rs @@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; use sqlx::{FromRow, MySqlPool, Row}; +use crate::gateway::EventEmitter; use crate::{ database::entities::{Config, Guild, GuildMember, UserSettings}, errors::{Error, GuildError}, @@ -28,6 +29,9 @@ pub struct User { #[sqlx(skip)] pub settings: UserSettings, pub extended_settings: sqlx::types::Json, + #[sqlx(rename = "subscribedEvents")] + /// A list of [EventEmitter]s that the server has determined the user should be subscribed to. + pub subscribed_events: sqlx::types::Json>, } impl Deref for User { From 9343c2e9b154b2aba0125c0a4686f72f9c965ef6 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Sun, 14 Jul 2024 17:41:03 +0200 Subject: [PATCH 15/27] Add some Gateway events types --- src/gateway/events.rs | 22 +++++++++ src/gateway/mod.rs | 16 +++++++ src/gateway/types.rs | 103 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 141 insertions(+) create mode 100644 src/gateway/events.rs create mode 100644 src/gateway/types.rs diff --git a/src/gateway/events.rs b/src/gateway/events.rs new file mode 100644 index 0000000..984560d --- /dev/null +++ b/src/gateway/events.rs @@ -0,0 +1,22 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use chorus::types::{ + ChannelCreate, ChannelDelete, ChannelUpdate, GuildCreate, GuildDelete, GuildUpdate, + RelationshipAdd, RelationshipRemove, UserUpdate, +}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Deserialize, Serialize)] +pub enum Events { + ChannelCreate(ChannelCreate), + ChannelUpdate(ChannelUpdate), + ChannelDelete(ChannelDelete), + GuildCreate(GuildCreate), + GuildUpdate(GuildUpdate), + GuildDelete(GuildDelete), + RelationshipAdd(RelationshipAdd), + RelationshipRemove(RelationshipRemove), + UserUpdate(UserUpdate), +} diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index a729002..854fd6b 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -1,3 +1,12 @@ +mod events; +mod types; + +use chorus::types::Snowflake; +use serde::{Deserialize, Serialize}; + +pub use events::*; +pub use types::*; + use log::info; // This Source Code Form is subject to the terms of the Mozilla Public @@ -9,3 +18,10 @@ pub async fn start_gateway() -> Result<(), Error> { info!(target: "symfonia::gateway", "Starting gateway server"); Ok(()) } + +#[derive(Hash, PartialEq, Eq, Debug, Clone, Copy, Deserialize, Serialize)] +/// Identifies a unique event emitter with an event type and a snowflake ID. +pub struct EventEmitter { + pub event_type: EventType, + pub id: Snowflake, +} diff --git a/src/gateway/types.rs b/src/gateway/types.rs new file mode 100644 index 0000000..c44b3a4 --- /dev/null +++ b/src/gateway/types.rs @@ -0,0 +1,103 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use serde::{Deserialize, Serialize}; + +#[derive(Hash, PartialEq, Eq, Clone, Copy, Debug, Deserialize, Serialize)] +pub enum EventType { + Ready, + ReadySupplemental, + Resumed, + AuthSessionChange, + AuthenticatorCreate, + AuthenticatorUpdate, + AuthenticatorDelete, + ApplicationCommandPermissionsUpdate, + AutoModerationRuleCreate, + AutoModerationRuleUpdate, + AutoModerationRuleDelete, + AutoModerationActionExecution, + AutoModerationMentionRaidDetection, + CallCreate, + CallUpdate, + CallDelete, + ChannelCreate, + ChannelUpdate, + ChannelDelete, + ChannelStatuses, + VoiceChannelStatusUpdate, + ChannelPinsUpdate, + ChannelRecipientAdd, + ChannelRecipientRemove, + ThreadCreate, + ThreadUpdate, + ThreadDelete, + ThreadListSync, + ThreadMemberUpdate, + ThreadMembersUpdate, + FriendSuggestionCreate, + FriendSuggestionDelete, + GuildCreate, + GuildUpdate, + GuildDelete, + GuildAuditLogEntryCreate, + GuildBanAdd, + GuildBanRemove, + GuildEmojisUpdate, + GuildStickersUpdate, + GuildJoinRequestCreate, + GuildJoinRequestUpdate, + GuildJoinRequestDelete, + GuildMemberAdd, + GuildMemberRemove, + GuildMemberUpdate, + GuildMembersChunk, + GuildRoleCreate, + GuildRoleUpdate, + GuildRoleDelete, + GuildScheduledEventCreate, + GuildScheduledEventUpdate, + GuildScheduledEventDelete, + GuildScheduledEventUserAdd, + GuildScheduledEventUserRemove, + GuildSoundboardSoundCreate, + GuildSoundboardSoundUpdate, + GuildSoundboardSoundDelete, + SoundboardSounds, + GuildIntegrationsUpdate, + IntegrationCreate, + IntegrationUpdate, + IntegrationDelete, + InteractionCreate, + InviteCreate, + InviteDelete, + MessageCreate, + MessageUpdate, + MessageDelete, + MessageDeleteBulk, + MessagePollVoteAdd, + MessagePollVoteRemove, + MessageReactionAdd, + MessageReactionAddMany, + MessageReactionRemove, + MessageReactionRemoveAll, + MessageReactionRemoveEmoji, + RecentMentionDelete, + LastMessages, + PresenceUpdate, + RelationshipAdd, + RelationshipUpdate, + RelationshipRemove, + StageInstanceCreate, + StageInstanceUpdate, + StageInstanceDelete, + TypingStart, + UserUpdate, + UserNoteUpdate, + UserRequiredActionUpdate, + VoiceStateUpdate, + VoiceServerUpdate, + VoiceChannelEffectSend, + WebhooksUpdate, +} From 94c9d1d11e640e4a6e8fc3385f8d58a66063904b Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Sun, 14 Jul 2024 21:31:04 +0200 Subject: [PATCH 16/27] Move HashMap of Emitters to API, where it belongs --- src/api/mod.rs | 17 ++++++++++++++--- src/database/entities/channel.rs | 13 ++++++++++--- src/main.rs | 6 ------ 3 files changed, 24 insertions(+), 12 deletions(-) diff --git a/src/api/mod.rs b/src/api/mod.rs index 138049b..b1f342b 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -1,12 +1,17 @@ +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; + use poem::{ - EndpointExt, - IntoResponse, listener::TcpListener, - middleware::{NormalizePath, TrailingSlash}, Route, Server, web::Json, + middleware::{NormalizePath, TrailingSlash}, + web::Json, + EndpointExt, IntoResponse, Route, Server, }; +use pubserve::Publisher; use serde_json::json; use sqlx::MySqlPool; +use crate::gateway::{EventEmitter, Events}; use crate::{ api::{ middleware::{ @@ -23,6 +28,11 @@ mod routes; pub async fn start_api(db: MySqlPool) -> Result<(), Error> { log::info!(target: "symfonia::api::cfg", "Loading configuration"); + + // To avoid having to load all entities from disk every time we want to subscribe a newly + // connected user to their events, we store the emitters in a HashMap. + let emitters: HashMap>>> = HashMap::new(); + let config = Config::init(&db).await?; if config.sentry.enabled { @@ -69,6 +79,7 @@ pub async fn start_api(db: MySqlPool) -> Result<(), Error> { .nest("/api/v9", routes) .data(db) .data(config) + .data(emitters) .with(NormalizePath::new(TrailingSlash::Trim)) .catch_all_error(custom_error); diff --git a/src/database/entities/channel.rs b/src/database/entities/channel.rs index 777d4ef..0937a30 100644 --- a/src/database/entities/channel.rs +++ b/src/database/entities/channel.rs @@ -1,4 +1,5 @@ use std::ops::{Deref, DerefMut}; +use std::sync::{Arc, Mutex}; use chorus::types::{ ChannelDelete, ChannelMessagesAnchor, ChannelModifySchema, ChannelType, ChannelUpdate, @@ -26,10 +27,16 @@ pub struct Channel { pub publisher: ChannelEventPublisher, } -#[derive(Debug, Default, Clone, PartialEq)] +#[derive(Debug, Default, Clone)] pub struct ChannelEventPublisher { - pub update: Publisher, - pub delete: Publisher, + pub update: Arc>>, + pub delete: Arc>>, +} + +impl PartialEq for ChannelEventPublisher { + fn eq(&self, other: &Self) -> bool { + Arc::ptr_eq(&self.delete, &other.delete) && Arc::ptr_eq(&self.update, &other.update) + } } impl Deref for Channel { diff --git a/src/main.rs b/src/main.rs index 72b33bd..5ea6f28 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,5 @@ -use std::collections::HashMap; -use std::rc::Rc; - use clap::Parser; -use gateway::{EventEmitter, Events}; use log::LevelFilter; use log4rs::{ append::{ @@ -20,7 +16,6 @@ use log4rs::{ filter::Filter, Config, }; -use pubserve::Publisher; mod api; mod cdn; @@ -187,6 +182,5 @@ async fn main() { .expect("Failed to seed config"); } - let mut emitters: HashMap>> = HashMap::new(); api::start_api(db).await.unwrap(); } From 2d80b69a17460366ef294935ec20e0e699e23544 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Sun, 21 Jul 2024 16:53:11 +0200 Subject: [PATCH 17/27] Update Events Types --- src/gateway/events.rs | 22 -------- src/gateway/types.rs | 127 +++++++++++++++++++++++++----------------- 2 files changed, 76 insertions(+), 73 deletions(-) delete mode 100644 src/gateway/events.rs diff --git a/src/gateway/events.rs b/src/gateway/events.rs deleted file mode 100644 index 984560d..0000000 --- a/src/gateway/events.rs +++ /dev/null @@ -1,22 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at https://mozilla.org/MPL/2.0/. - -use chorus::types::{ - ChannelCreate, ChannelDelete, ChannelUpdate, GuildCreate, GuildDelete, GuildUpdate, - RelationshipAdd, RelationshipRemove, UserUpdate, -}; -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Deserialize, Serialize)] -pub enum Events { - ChannelCreate(ChannelCreate), - ChannelUpdate(ChannelUpdate), - ChannelDelete(ChannelDelete), - GuildCreate(GuildCreate), - GuildUpdate(GuildUpdate), - GuildDelete(GuildDelete), - RelationshipAdd(RelationshipAdd), - RelationshipRemove(RelationshipRemove), - UserUpdate(UserUpdate), -} diff --git a/src/gateway/types.rs b/src/gateway/types.rs index c44b3a4..5e647bb 100644 --- a/src/gateway/types.rs +++ b/src/gateway/types.rs @@ -2,53 +2,46 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -use serde::{Deserialize, Serialize}; +use ::serde::{Deserialize, Serialize}; +use chorus::types::*; -#[derive(Hash, PartialEq, Eq, Clone, Copy, Debug, Deserialize, Serialize)] +#[derive( + Debug, + ::serde::Deserialize, + ::serde::Serialize, + Clone, + PartialEq, + PartialOrd, + Eq, + Ord, + Copy, + Hash, +)] +/// Enum representing all possible* event types that can be received from or sent to the gateway. +/// +/// TODO: This is only temporary. Replace with this enum from chorus, when it is ready. pub enum EventType { + Hello, Ready, - ReadySupplemental, Resumed, - AuthSessionChange, - AuthenticatorCreate, - AuthenticatorUpdate, - AuthenticatorDelete, - ApplicationCommandPermissionsUpdate, - AutoModerationRuleCreate, - AutoModerationRuleUpdate, - AutoModerationRuleDelete, - AutoModerationActionExecution, - AutoModerationMentionRaidDetection, - CallCreate, - CallUpdate, - CallDelete, + InvalidSession, ChannelCreate, ChannelUpdate, ChannelDelete, - ChannelStatuses, - VoiceChannelStatusUpdate, ChannelPinsUpdate, - ChannelRecipientAdd, - ChannelRecipientRemove, ThreadCreate, ThreadUpdate, ThreadDelete, ThreadListSync, ThreadMemberUpdate, ThreadMembersUpdate, - FriendSuggestionCreate, - FriendSuggestionDelete, GuildCreate, GuildUpdate, GuildDelete, - GuildAuditLogEntryCreate, GuildBanAdd, GuildBanRemove, GuildEmojisUpdate, - GuildStickersUpdate, - GuildJoinRequestCreate, - GuildJoinRequestUpdate, - GuildJoinRequestDelete, + GuildIntegrationsUpdate, GuildMemberAdd, GuildMemberRemove, GuildMemberUpdate, @@ -56,16 +49,6 @@ pub enum EventType { GuildRoleCreate, GuildRoleUpdate, GuildRoleDelete, - GuildScheduledEventCreate, - GuildScheduledEventUpdate, - GuildScheduledEventDelete, - GuildScheduledEventUserAdd, - GuildScheduledEventUserRemove, - GuildSoundboardSoundCreate, - GuildSoundboardSoundUpdate, - GuildSoundboardSoundDelete, - SoundboardSounds, - GuildIntegrationsUpdate, IntegrationCreate, IntegrationUpdate, IntegrationDelete, @@ -76,28 +59,70 @@ pub enum EventType { MessageUpdate, MessageDelete, MessageDeleteBulk, - MessagePollVoteAdd, - MessagePollVoteRemove, MessageReactionAdd, - MessageReactionAddMany, MessageReactionRemove, MessageReactionRemoveAll, MessageReactionRemoveEmoji, - RecentMentionDelete, - LastMessages, PresenceUpdate, - RelationshipAdd, - RelationshipUpdate, - RelationshipRemove, - StageInstanceCreate, - StageInstanceUpdate, - StageInstanceDelete, TypingStart, UserUpdate, - UserNoteUpdate, - UserRequiredActionUpdate, VoiceStateUpdate, VoiceServerUpdate, - VoiceChannelEffectSend, WebhooksUpdate, + StageInstanceCreate, + StageInstanceUpdate, + StageInstanceDelete, + RequestMembers, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +/// Enum representing all possible* events that can be received from or sent to the gateway. +/// +/// TODO: This is only temporary. Replace with this enum from chorus, when it is ready. +#[serde(rename_all = "PascalCase")] +pub enum Event { + Hello(GatewayHello), + Ready(GatewayReady), + Resumed(GatewayResume), + InvalidSession(GatewayInvalidSession), + ChannelCreate(ChannelCreate), + ChannelUpdate(ChannelUpdate), + ChannelDelete(ChannelDelete), + ThreadCreate(ThreadCreate), + ThreadUpdate(ThreadUpdate), + ThreadDelete(ThreadDelete), + ThreadListSync(ThreadListSync), + ThreadMemberUpdate(ThreadMemberUpdate), + ThreadMembersUpdate(ThreadMembersUpdate), + GuildCreate(GuildCreate), + GuildUpdate(GuildUpdate), + GuildDelete(GuildDelete), + GuildBanAdd(GuildBanAdd), + GuildBanRemove(GuildBanRemove), + GuildEmojisUpdate(GuildEmojisUpdate), + GuildIntegrationsUpdate(GuildIntegrationsUpdate), + GuildMemberAdd(GuildMemberAdd), + GuildMemberRemove(GuildMemberRemove), + GuildMemberUpdate(GuildMemberUpdate), + GuildMembersChunk(GuildMembersChunk), + InteractionCreate(InteractionCreate), + InviteCreate(InviteCreate), + InviteDelete(InviteDelete), + MessageCreate(MessageCreate), + MessageUpdate(MessageUpdate), + MessageDelete(MessageDelete), + MessageDeleteBulk(MessageDeleteBulk), + MessageReactionAdd(MessageReactionAdd), + MessageReactionRemove(MessageReactionRemove), + MessageReactionRemoveAll(MessageReactionRemoveAll), + MessageReactionRemoveEmoji(MessageReactionRemoveEmoji), + PresenceUpdate(PresenceUpdate), + TypingStart(TypingStartEvent), + UserUpdate(UserUpdate), + VoiceStateUpdate(VoiceStateUpdate), + VoiceServerUpdate(VoiceServerUpdate), + WebhooksUpdate(WebhooksUpdate), + StageInstanceCreate(StageInstanceCreate), + StageInstanceUpdate(StageInstanceUpdate), + StageInstanceDelete(StageInstanceDelete), } From b13906ffaa29d4d5b17544c0598f1a0a6483564f Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Sun, 21 Jul 2024 16:53:30 +0200 Subject: [PATCH 18/27] Remove EventEmitter --- src/api/mod.rs | 8 +------- src/database/entities/user.rs | 4 ---- src/gateway/mod.rs | 18 ++++-------------- 3 files changed, 5 insertions(+), 25 deletions(-) diff --git a/src/api/mod.rs b/src/api/mod.rs index b1f342b..d7541ad 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -1,17 +1,12 @@ -use std::collections::HashMap; -use std::sync::{Arc, Mutex}; - use poem::{ listener::TcpListener, middleware::{NormalizePath, TrailingSlash}, web::Json, EndpointExt, IntoResponse, Route, Server, }; -use pubserve::Publisher; use serde_json::json; use sqlx::MySqlPool; -use crate::gateway::{EventEmitter, Events}; use crate::{ api::{ middleware::{ @@ -31,7 +26,6 @@ pub async fn start_api(db: MySqlPool) -> Result<(), Error> { // To avoid having to load all entities from disk every time we want to subscribe a newly // connected user to their events, we store the emitters in a HashMap. - let emitters: HashMap>>> = HashMap::new(); let config = Config::init(&db).await?; @@ -75,11 +69,11 @@ pub async fn start_api(db: MySqlPool) -> Result<(), Error> { .nest("/policies", routes::policies::setup_routes()) .nest("/-", routes::health::setup_routes()); + // TODO: Add emitters here let v9_api = Route::new() .nest("/api/v9", routes) .data(db) .data(config) - .data(emitters) .with(NormalizePath::new(TrailingSlash::Trim)) .catch_all_error(custom_error); diff --git a/src/database/entities/user.rs b/src/database/entities/user.rs index d3a2452..dae2ed1 100644 --- a/src/database/entities/user.rs +++ b/src/database/entities/user.rs @@ -9,7 +9,6 @@ use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; use sqlx::{FromRow, MySqlPool, Row}; -use crate::gateway::EventEmitter; use crate::{ database::entities::{Config, Guild, GuildMember, UserSettings}, errors::{Error, GuildError}, @@ -29,9 +28,6 @@ pub struct User { #[sqlx(skip)] pub settings: UserSettings, pub extended_settings: sqlx::types::Json, - #[sqlx(rename = "subscribedEvents")] - /// A list of [EventEmitter]s that the server has determined the user should be subscribed to. - pub subscribed_events: sqlx::types::Json>, } impl Deref for User { diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 854fd6b..292bffa 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -1,27 +1,17 @@ -mod events; mod types; -use chorus::types::Snowflake; -use serde::{Deserialize, Serialize}; - -pub use events::*; -pub use types::*; - use log::info; +use pubserve::Publisher; +pub use types::*; // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. use crate::errors::Error; +pub type EventPublisher = Publisher; + pub async fn start_gateway() -> Result<(), Error> { info!(target: "symfonia::gateway", "Starting gateway server"); Ok(()) } - -#[derive(Hash, PartialEq, Eq, Debug, Clone, Copy, Deserialize, Serialize)] -/// Identifies a unique event emitter with an event type and a snowflake ID. -pub struct EventEmitter { - pub event_type: EventType, - pub id: Snowflake, -} From 88a78ab000d6f587b95f2c490b206693609894b6 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Sun, 21 Jul 2024 16:53:53 +0200 Subject: [PATCH 19/27] Add publisher to application --- src/database/entities/application.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/database/entities/application.rs b/src/database/entities/application.rs index 0e05cc7..9bd2f2d 100644 --- a/src/database/entities/application.rs +++ b/src/database/entities/application.rs @@ -1,12 +1,13 @@ use std::ops::{Deref, DerefMut}; +use std::sync::{Arc, RwLock}; -use bitflags::Flags; use chorus::types::{ApplicationFlags, Snowflake}; use serde::{Deserialize, Serialize}; -use sqlx::{FromRow, MySqlPool}; +use sqlx::MySqlPool; +use crate::gateway::Event; use crate::{ - database::entities::{Config, user::User}, + database::entities::{user::User, Config}, errors::Error, }; @@ -17,6 +18,9 @@ pub struct Application { pub owner_id: Snowflake, pub bot_user_id: Option, pub team_id: Option, + #[sqlx(skip)] + #[serde(skip)] + pub publisher: Arc>>, } impl Deref for Application { @@ -62,6 +66,7 @@ impl Application { owner_id: owner_id.to_owned(), bot_user_id, team_id: None, + publisher: Arc::new(RwLock::new(pubserve::Publisher::new())), }; let _res = sqlx::query("INSERT INTO applications (id, name, summary, hook, bot_public, verify_key, owner_id, flags, integration_public, discoverability_state, discovery_eligibility_flags) VALUES (?, ?, ?, true, true, ?, ?, ?, true, 1, 2240)") From 68bedff42ca8672b6167605ea2a8a3da32bca97b Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Sun, 21 Jul 2024 22:28:36 +0200 Subject: [PATCH 20/27] Typedefs and dependency injection into api::start_api/gateway::start_gateway --- src/main.rs | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/src/main.rs b/src/main.rs index 5ea6f28..385e5be 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,10 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use chorus::types::Snowflake; use clap::Parser; +use gateway::Event; use log::LevelFilter; use log4rs::{ append::{ @@ -16,6 +21,7 @@ use log4rs::{ filter::Filter, Config, }; +use parking_lot::RwLock; mod api; mod cdn; @@ -24,6 +30,10 @@ mod errors; mod gateway; mod util; +pub type SharedPublisher = Arc>>; +pub type PublisherMap = HashMap; +pub type SharedPublisherMap = Arc>; + #[derive(Debug)] struct LogFilter; @@ -181,6 +191,11 @@ async fn main() { .await .expect("Failed to seed config"); } - - api::start_api(db).await.unwrap(); + let shared_publisher_map = Arc::new(RwLock::new(HashMap::new())); + api::start_api(db.clone(), shared_publisher_map.clone()) + .await + .unwrap(); + gateway::start_gateway(db.clone(), shared_publisher_map.clone()) + .await + .unwrap(); } From 4fda8f84e93ce153ec54717b49f444263c0835eb Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Sun, 21 Jul 2024 22:28:55 +0200 Subject: [PATCH 21/27] Add publisher_map to start_api data --- src/api/mod.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/api/mod.rs b/src/api/mod.rs index d7541ad..ed3eac6 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -7,6 +7,7 @@ use poem::{ use serde_json::json; use sqlx::MySqlPool; +use crate::SharedPublisherMap; use crate::{ api::{ middleware::{ @@ -21,12 +22,9 @@ use crate::{ mod middleware; mod routes; -pub async fn start_api(db: MySqlPool) -> Result<(), Error> { +pub async fn start_api(db: MySqlPool, publisher_map: SharedPublisherMap) -> Result<(), Error> { log::info!(target: "symfonia::api::cfg", "Loading configuration"); - // To avoid having to load all entities from disk every time we want to subscribe a newly - // connected user to their events, we store the emitters in a HashMap. - let config = Config::init(&db).await?; if config.sentry.enabled { @@ -69,11 +67,11 @@ pub async fn start_api(db: MySqlPool) -> Result<(), Error> { .nest("/policies", routes::policies::setup_routes()) .nest("/-", routes::health::setup_routes()); - // TODO: Add emitters here let v9_api = Route::new() .nest("/api/v9", routes) .data(db) .data(config) + .data(publisher_map) .with(NormalizePath::new(TrailingSlash::Trim)) .catch_all_error(custom_error); From 586836020100d331ac72beea7dc14cd29879a886 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Sun, 21 Jul 2024 22:29:03 +0200 Subject: [PATCH 22/27] Add parking_lot --- Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.toml b/Cargo.toml index 1cf206a..124fda0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,3 +70,4 @@ tokio-tungstenite = { version = "0.23.1", features = [ "rustls-tls-webpki-roots", ] } pubserve = { version = "1.1.0", features = ["async", "send"] } +parking_lot = { version = "0.12.3", features = ["deadlock_detection"] } From 51eac9e9056c4b43745467c6d9870405df4d46c4 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Sun, 21 Jul 2024 22:29:25 +0200 Subject: [PATCH 23/27] Use parking_lot::RwLock instead of std::sync::RwLock --- src/database/entities/application.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/database/entities/application.rs b/src/database/entities/application.rs index 9bd2f2d..8febf2c 100644 --- a/src/database/entities/application.rs +++ b/src/database/entities/application.rs @@ -1,7 +1,8 @@ use std::ops::{Deref, DerefMut}; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use chorus::types::{ApplicationFlags, Snowflake}; +use parking_lot::RwLock; use serde::{Deserialize, Serialize}; use sqlx::MySqlPool; From ac8fe255d8f4ab494dd4866a51b2eded00e4c30c Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Sun, 21 Jul 2024 22:29:37 +0200 Subject: [PATCH 24/27] Remove unused imports --- src/database/entities/guild.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/database/entities/guild.rs b/src/database/entities/guild.rs index 1ebe129..dd02fa3 100644 --- a/src/database/entities/guild.rs +++ b/src/database/entities/guild.rs @@ -1,11 +1,8 @@ -use std::{ - ops::{Deref, DerefMut}, - sync::{Arc, RwLock}, -}; +use std::ops::{Deref, DerefMut}; use chorus::types::{ - ChannelType, NSFWLevel, PermissionFlags, PremiumTier, - PublicUser, Snowflake, SystemChannelFlags, types::guild_configuration::GuildFeaturesList, WelcomeScreenObject, + types::guild_configuration::GuildFeaturesList, ChannelType, NSFWLevel, PermissionFlags, + PremiumTier, PublicUser, Snowflake, SystemChannelFlags, WelcomeScreenObject, }; use serde::{Deserialize, Serialize}; use sqlx::{FromRow, MySqlPool, QueryBuilder, Row}; From 4f10db765b4d1cf6f29abb6212e6c2d2c6566e91 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Sun, 21 Jul 2024 22:29:54 +0200 Subject: [PATCH 25/27] inject MySqlPool, SharedPublisherMap into start_gateway() --- src/gateway/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 292bffa..cc7d288 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -1,17 +1,17 @@ mod types; use log::info; -use pubserve::Publisher; +use sqlx::MySqlPool; pub use types::*; // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. use crate::errors::Error; +use crate::SharedPublisherMap; -pub type EventPublisher = Publisher; - -pub async fn start_gateway() -> Result<(), Error> { +pub async fn start_gateway(db: MySqlPool, publisher_map: SharedPublisherMap) -> Result<(), Error> { info!(target: "symfonia::gateway", "Starting gateway server"); + // `publishers` will live for the lifetime of the gateway server, in the main gateway thread Ok(()) } From 8864c2e618d17bb84e083a0fa3fa2b533599e7b7 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Tue, 23 Jul 2024 11:07:14 +0200 Subject: [PATCH 26/27] Rename SharedPublisher to SharedEventPublisher --- src/api/mod.rs | 4 ++-- src/main.rs | 7 ++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/api/mod.rs b/src/api/mod.rs index ed3eac6..ad85afa 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -7,7 +7,7 @@ use poem::{ use serde_json::json; use sqlx::MySqlPool; -use crate::SharedPublisherMap; +use crate::SharedEventPublisherMap; use crate::{ api::{ middleware::{ @@ -22,7 +22,7 @@ use crate::{ mod middleware; mod routes; -pub async fn start_api(db: MySqlPool, publisher_map: SharedPublisherMap) -> Result<(), Error> { +pub async fn start_api(db: MySqlPool, publisher_map: SharedEventPublisherMap) -> Result<(), Error> { log::info!(target: "symfonia::api::cfg", "Loading configuration"); let config = Config::init(&db).await?; diff --git a/src/main.rs b/src/main.rs index 385e5be..16dce4d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -22,6 +22,7 @@ use log4rs::{ Config, }; use parking_lot::RwLock; +use pubserve::Publisher; mod api; mod cdn; @@ -30,9 +31,9 @@ mod errors; mod gateway; mod util; -pub type SharedPublisher = Arc>>; -pub type PublisherMap = HashMap; -pub type SharedPublisherMap = Arc>; +pub type SharedEventPublisher = Arc>>; +pub type EventPublisherMap = HashMap; +pub type SharedEventPublisherMap = Arc>; #[derive(Debug)] struct LogFilter; From cbe11b2379f22a8321b316c117b79d2258f01ff3 Mon Sep 17 00:00:00 2001 From: bitfl0wer Date: Tue, 23 Jul 2024 11:07:37 +0200 Subject: [PATCH 27/27] Add SharedEventPublisher to relevant entities --- src/database/entities/application.rs | 5 +++-- src/database/entities/channel.rs | 19 +++---------------- src/database/entities/guild.rs | 5 +++++ src/database/entities/mod.rs | 2 ++ src/database/entities/role.rs | 5 +++++ src/database/entities/user.rs | 5 +++++ src/gateway/mod.rs | 7 +++++-- 7 files changed, 28 insertions(+), 20 deletions(-) diff --git a/src/database/entities/application.rs b/src/database/entities/application.rs index 8febf2c..591e972 100644 --- a/src/database/entities/application.rs +++ b/src/database/entities/application.rs @@ -1,3 +1,5 @@ +use super::*; + use std::ops::{Deref, DerefMut}; use std::sync::Arc; @@ -6,7 +8,6 @@ use parking_lot::RwLock; use serde::{Deserialize, Serialize}; use sqlx::MySqlPool; -use crate::gateway::Event; use crate::{ database::entities::{user::User, Config}, errors::Error, @@ -21,7 +22,7 @@ pub struct Application { pub team_id: Option, #[sqlx(skip)] #[serde(skip)] - pub publisher: Arc>>, + pub publisher: SharedEventPublisher, } impl Deref for Application { diff --git a/src/database/entities/channel.rs b/src/database/entities/channel.rs index 0937a30..f01ada1 100644 --- a/src/database/entities/channel.rs +++ b/src/database/entities/channel.rs @@ -1,6 +1,4 @@ -use std::ops::{Deref, DerefMut}; -use std::sync::{Arc, Mutex}; - +use super::*; use chorus::types::{ ChannelDelete, ChannelMessagesAnchor, ChannelModifySchema, ChannelType, ChannelUpdate, CreateChannelInviteSchema, InviteType, MessageSendSchema, PermissionOverwrite, Snowflake, @@ -9,6 +7,7 @@ use itertools::Itertools; use pubserve::Publisher; use serde::{Deserialize, Serialize}; use sqlx::{types::Json, MySqlPool}; +use std::ops::{Deref, DerefMut}; use crate::{ database::entities::{ @@ -24,19 +23,7 @@ pub struct Channel { pub(crate) inner: chorus::types::Channel, #[sqlx(skip)] #[serde(skip)] - pub publisher: ChannelEventPublisher, -} - -#[derive(Debug, Default, Clone)] -pub struct ChannelEventPublisher { - pub update: Arc>>, - pub delete: Arc>>, -} - -impl PartialEq for ChannelEventPublisher { - fn eq(&self, other: &Self) -> bool { - Arc::ptr_eq(&self.delete, &other.delete) && Arc::ptr_eq(&self.update, &other.update) - } + pub publisher: SharedEventPublisher, } impl Deref for Channel { diff --git a/src/database/entities/guild.rs b/src/database/entities/guild.rs index dd02fa3..978905c 100644 --- a/src/database/entities/guild.rs +++ b/src/database/entities/guild.rs @@ -1,3 +1,5 @@ +use super::*; + use std::ops::{Deref, DerefMut}; use chorus::types::{ @@ -28,6 +30,9 @@ pub struct Guild { pub parent: Option, pub template_id: Option, pub nsfw: bool, + #[sqlx(skip)] + #[serde(skip)] + pub publisher: SharedEventPublisher, } impl Deref for Guild { diff --git a/src/database/entities/mod.rs b/src/database/entities/mod.rs index fdb7c32..896fc70 100644 --- a/src/database/entities/mod.rs +++ b/src/database/entities/mod.rs @@ -16,6 +16,8 @@ pub use user_settings::*; pub use voice_state::*; pub use webhook::*; +use crate::SharedEventPublisher; + mod application; mod attachment; mod audit_log; diff --git a/src/database/entities/role.rs b/src/database/entities/role.rs index 183defc..9de6b27 100644 --- a/src/database/entities/role.rs +++ b/src/database/entities/role.rs @@ -1,3 +1,5 @@ +use super::*; + use std::ops::{Deref, DerefMut}; use chorus::types::{PermissionFlags, Snowflake}; @@ -11,6 +13,9 @@ pub struct Role { #[sqlx(flatten)] inner: chorus::types::RoleObject, pub guild_id: Snowflake, + #[sqlx(skip)] + #[serde(skip)] + pub publisher: SharedEventPublisher, } impl Deref for Role { diff --git a/src/database/entities/user.rs b/src/database/entities/user.rs index dae2ed1..a20cf23 100644 --- a/src/database/entities/user.rs +++ b/src/database/entities/user.rs @@ -1,3 +1,5 @@ +use super::*; + use std::{ default::Default, ops::{Deref, DerefMut}, @@ -28,6 +30,9 @@ pub struct User { #[sqlx(skip)] pub settings: UserSettings, pub extended_settings: sqlx::types::Json, + #[sqlx(skip)] + #[serde(skip)] + pub publisher: SharedEventPublisher, } impl Deref for User { diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index cc7d288..f7f1656 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -8,9 +8,12 @@ pub use types::*; // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. use crate::errors::Error; -use crate::SharedPublisherMap; +use crate::SharedEventPublisherMap; -pub async fn start_gateway(db: MySqlPool, publisher_map: SharedPublisherMap) -> Result<(), Error> { +pub async fn start_gateway( + db: MySqlPool, + publisher_map: SharedEventPublisherMap, +) -> Result<(), Error> { info!(target: "symfonia::gateway", "Starting gateway server"); // `publishers` will live for the lifetime of the gateway server, in the main gateway thread Ok(())