From 973d2e04c5a70072beaca8ea5acb2ae983f4681a Mon Sep 17 00:00:00 2001 From: 4t145 Date: Mon, 30 Dec 2024 10:35:20 +0800 Subject: [PATCH] event: filter expired message --- Cargo.toml | 6 +++--- .../src/extension/notification.rs | 6 +++--- backend/gateways/spacegate-plugins/src/lib.rs | 3 ++- backend/gateways/spacegate-plugins/src/plugin.rs | 2 +- .../event/src/serv/event_message_serv.rs | 15 ++++++++++----- 5 files changed, 19 insertions(+), 13 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2c210dccf..162b3c672 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,9 +67,9 @@ strum = { version = "0.26", features = ["derive"] } # tardis = { version = "0.2.0", path = "../tardis/tardis" } tardis = { git = "https://github.com/ideal-world/tardis.git", rev = "66d4c63" } # asteroid-mq = { git = "https://github.com/4t145/asteroid-mq.git", rev = "d59c64d" } -asteroid-mq = { git = "https://github.com/4t145/asteroid-mq.git", rev = "f9c96f3" } +asteroid-mq = { git = "https://github.com/4t145/asteroid-mq.git", rev = "726c8dd" } # asteroid-mq = { version = "0.1.0-alpha.5" } -asteroid-mq-sdk = { git = "https://github.com/4t145/asteroid-mq.git", rev = "f9c96f3" } +asteroid-mq-sdk = { git = "https://github.com/4t145/asteroid-mq.git", rev = "726c8dd" } # asteroid-mq-sdk = { version = "0.1.0-alpha.5" } #spacegate @@ -78,7 +78,7 @@ asteroid-mq-sdk = { git = "https://github.com/4t145/asteroid-mq.git", rev = "f9c # "k8s", # "ext-axum", # ] } -spacegate-shell = { git = "https://github.com/ideal-world/spacegate.git", rev="40bb693", features = [ +spacegate-shell = { git = "https://github.com/ideal-world/spacegate.git", rev="fe747e4", features = [ "cache", "k8s", "ext-axum", diff --git a/backend/gateways/spacegate-plugins/src/extension/notification.rs b/backend/gateways/spacegate-plugins/src/extension/notification.rs index 9de8e4199..840686c55 100644 --- a/backend/gateways/spacegate-plugins/src/extension/notification.rs +++ b/backend/gateways/spacegate-plugins/src/extension/notification.rs @@ -1,7 +1,7 @@ use std::{borrow::Cow, collections::HashMap, sync::Arc}; use http::{header::CONTENT_TYPE, HeaderName, HeaderValue, Uri}; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use spacegate_shell::{kernel::backend_service::http_client_service::HttpClient, SgBody, SgRequest}; use tardis::{log as tracing, serde_json}; @@ -43,7 +43,7 @@ impl NotificationContext { } } -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct ReachMsgSendReq { pub scene_code: String, pub receives: Vec, @@ -51,7 +51,7 @@ pub struct ReachMsgSendReq { pub replace: HashMap, } -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct ReachMsgReceive { pub receive_group_code: String, pub receive_kind: String, diff --git a/backend/gateways/spacegate-plugins/src/lib.rs b/backend/gateways/spacegate-plugins/src/lib.rs index 276639f6f..69435faa0 100644 --- a/backend/gateways/spacegate-plugins/src/lib.rs +++ b/backend/gateways/spacegate-plugins/src/lib.rs @@ -8,7 +8,7 @@ mod marker; mod plugin; pub const PACKAGE_NAME: &str = "spacegate_lib"; -use plugin::op_redis_publisher; +use plugin::{notify, op_redis_publisher}; use spacegate_shell::plugin::PluginRepository; pub fn register_lib_plugins(repo: &PluginRepository) { repo.register::(); @@ -18,4 +18,5 @@ pub fn register_lib_plugins(repo: &PluginRepository) { repo.register::(); repo.register::(); repo.register::(); + repo.register::(); } diff --git a/backend/gateways/spacegate-plugins/src/plugin.rs b/backend/gateways/spacegate-plugins/src/plugin.rs index f335721f0..e531f6731 100644 --- a/backend/gateways/spacegate-plugins/src/plugin.rs +++ b/backend/gateways/spacegate-plugins/src/plugin.rs @@ -5,4 +5,4 @@ pub mod auth; pub mod ip_time; pub mod notify; pub mod op_redis_publisher; -pub mod rewrite_ns_b_ip; +pub mod rewrite_ns_b_ip; \ No newline at end of file diff --git a/backend/middlewares/event/src/serv/event_message_serv.rs b/backend/middlewares/event/src/serv/event_message_serv.rs index c8c889c8e..be00f8dab 100644 --- a/backend/middlewares/event/src/serv/event_message_serv.rs +++ b/backend/middlewares/event/src/serv/event_message_serv.rs @@ -3,9 +3,7 @@ use asteroid_mq::{ protocol::{node::raft::proposal::MessageStateUpdate, topic::durable_message::DurableMessageQuery}, }; use tardis::{ - basic::{error::TardisError, result::TardisResult}, - db::sea_orm::{ActiveModelTrait, ColumnTrait, DbErr, EntityTrait, IntoActiveModel, QueryFilter, QuerySelect, Set, Unchanged}, - TardisFunsInst, + basic::{error::TardisError, result::TardisResult}, chrono::Utc, db::sea_orm::{ActiveModelTrait, ColumnTrait, DbErr, EntityTrait, IntoActiveModel, QueryFilter, QuerySelect, Set, Unchanged}, TardisFunsInst }; use crate::domain::event_message::{ActiveModel, Column, Entity, Model}; @@ -39,14 +37,21 @@ impl EventMessageServ { } pub async fn batch_retrieve(&self, topic: TopicCode, query: DurableMessageQuery, funs: &TardisFunsInst) -> TardisResult> { let DurableMessageQuery { limit, offset, .. } = query; - let select = Entity::find().filter(Column::Archived.eq(false)).filter(Column::Topic.eq(topic.to_string())).limit(Some(limit as u64)).offset(Some(offset as u64)); + let select = Entity::find() + .filter(Column::Archived.eq(false)) + .filter(Column::Time.gte(Utc::now())) + .filter(Column::Topic.eq(topic.to_string())) + .limit(Some(limit as u64)) + .offset(Some(offset as u64)); let conn = funs.reldb().conn(); let raw_conn = conn.raw_conn(); let models = select.all(raw_conn).await?; models.into_iter().map(|model| model.try_into_durable_message()).collect::>>() } pub async fn retrieve(&self, topic: TopicCode, message_id: MessageId, funs: &TardisFunsInst) -> TardisResult { - let select = Entity::find().filter(Column::Archived.eq(false)).filter(Column::Topic.eq(topic.to_string())).filter(Column::MessageId.eq(message_id.to_base64())); + let select = Entity::find().filter(Column::Archived.eq(false)) + .filter(Column::Time.gte(Utc::now())) + .filter(Column::Topic.eq(topic.to_string())).filter(Column::MessageId.eq(message_id.to_base64())); let conn = funs.reldb().conn(); let raw_conn = conn.raw_conn(); let model = select.one(raw_conn).await?;