Skip to content

Commit

Permalink
Merge pull request #888 from 4t145/update-mq-reversion
Browse files Browse the repository at this point in the history
event: filter expired message
  • Loading branch information
4t145 authored Dec 30, 2024
2 parents 32f97fb + 973d2e0 commit 1314d38
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 13 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ strum = { version = "0.26", features = ["derive"] }
# tardis = { version = "0.2.0", path = "../tardis/tardis" }
tardis = { git = "https://github.com/ideal-world/tardis.git", rev = "66d4c63" }
# asteroid-mq = { git = "https://github.com/4t145/asteroid-mq.git", rev = "d59c64d" }
asteroid-mq = { git = "https://github.com/4t145/asteroid-mq.git", rev = "f9c96f3" }
asteroid-mq = { git = "https://github.com/4t145/asteroid-mq.git", rev = "726c8dd" }
# asteroid-mq = { version = "0.1.0-alpha.5" }
asteroid-mq-sdk = { git = "https://github.com/4t145/asteroid-mq.git", rev = "f9c96f3" }
asteroid-mq-sdk = { git = "https://github.com/4t145/asteroid-mq.git", rev = "726c8dd" }
# asteroid-mq-sdk = { version = "0.1.0-alpha.5" }
#spacegate

Expand All @@ -78,7 +78,7 @@ asteroid-mq-sdk = { git = "https://github.com/4t145/asteroid-mq.git", rev = "f9c
# "k8s",
# "ext-axum",
# ] }
spacegate-shell = { git = "https://github.com/ideal-world/spacegate.git", rev="40bb693", features = [
spacegate-shell = { git = "https://github.com/ideal-world/spacegate.git", rev="fe747e4", features = [
"cache",
"k8s",
"ext-axum",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{borrow::Cow, collections::HashMap, sync::Arc};

use http::{header::CONTENT_TYPE, HeaderName, HeaderValue, Uri};
use serde::Serialize;
use serde::{Deserialize, Serialize};
use spacegate_shell::{kernel::backend_service::http_client_service::HttpClient, SgBody, SgRequest};
use tardis::{log as tracing, serde_json};

Expand Down Expand Up @@ -43,15 +43,15 @@ impl NotificationContext {
}
}

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Deserialize)]
pub struct ReachMsgSendReq {
pub scene_code: String,
pub receives: Vec<ReachMsgReceive>,
pub rel_item_id: String,
pub replace: HashMap<String, String>,
}

#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Deserialize)]
pub struct ReachMsgReceive {
pub receive_group_code: String,
pub receive_kind: String,
Expand Down
3 changes: 2 additions & 1 deletion backend/gateways/spacegate-plugins/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod marker;
mod plugin;

pub const PACKAGE_NAME: &str = "spacegate_lib";
use plugin::op_redis_publisher;
use plugin::{notify, op_redis_publisher};
use spacegate_shell::plugin::PluginRepository;
pub fn register_lib_plugins(repo: &PluginRepository) {
repo.register::<ip_time::IpTimePlugin>();
Expand All @@ -18,4 +18,5 @@ pub fn register_lib_plugins(repo: &PluginRepository) {
repo.register::<audit_log::AuditLogPlugin>();
repo.register::<auth::AuthPlugin>();
repo.register::<op_redis_publisher::RedisPublisherPlugin>();
repo.register::<notify::NotifyPlugin>();
}
2 changes: 1 addition & 1 deletion backend/gateways/spacegate-plugins/src/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ pub mod auth;
pub mod ip_time;
pub mod notify;
pub mod op_redis_publisher;
pub mod rewrite_ns_b_ip;
pub mod rewrite_ns_b_ip;
15 changes: 10 additions & 5 deletions backend/middlewares/event/src/serv/event_message_serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ use asteroid_mq::{
protocol::{node::raft::proposal::MessageStateUpdate, topic::durable_message::DurableMessageQuery},
};
use tardis::{
basic::{error::TardisError, result::TardisResult},
db::sea_orm::{ActiveModelTrait, ColumnTrait, DbErr, EntityTrait, IntoActiveModel, QueryFilter, QuerySelect, Set, Unchanged},
TardisFunsInst,
basic::{error::TardisError, result::TardisResult}, chrono::Utc, db::sea_orm::{ActiveModelTrait, ColumnTrait, DbErr, EntityTrait, IntoActiveModel, QueryFilter, QuerySelect, Set, Unchanged}, TardisFunsInst
};

use crate::domain::event_message::{ActiveModel, Column, Entity, Model};
Expand Down Expand Up @@ -39,14 +37,21 @@ impl EventMessageServ {
}
pub async fn batch_retrieve(&self, topic: TopicCode, query: DurableMessageQuery, funs: &TardisFunsInst) -> TardisResult<Vec<DurableMessage>> {
let DurableMessageQuery { limit, offset, .. } = query;
let select = Entity::find().filter(Column::Archived.eq(false)).filter(Column::Topic.eq(topic.to_string())).limit(Some(limit as u64)).offset(Some(offset as u64));
let select = Entity::find()
.filter(Column::Archived.eq(false))
.filter(Column::Time.gte(Utc::now()))
.filter(Column::Topic.eq(topic.to_string()))
.limit(Some(limit as u64))
.offset(Some(offset as u64));
let conn = funs.reldb().conn();
let raw_conn = conn.raw_conn();
let models = select.all(raw_conn).await?;
models.into_iter().map(|model| model.try_into_durable_message()).collect::<TardisResult<Vec<DurableMessage>>>()
}
pub async fn retrieve(&self, topic: TopicCode, message_id: MessageId, funs: &TardisFunsInst) -> TardisResult<DurableMessage> {
let select = Entity::find().filter(Column::Archived.eq(false)).filter(Column::Topic.eq(topic.to_string())).filter(Column::MessageId.eq(message_id.to_base64()));
let select = Entity::find().filter(Column::Archived.eq(false))
.filter(Column::Time.gte(Utc::now()))
.filter(Column::Topic.eq(topic.to_string())).filter(Column::MessageId.eq(message_id.to_base64()));
let conn = funs.reldb().conn();
let raw_conn = conn.raw_conn();
let model = select.one(raw_conn).await?;
Expand Down

0 comments on commit 1314d38

Please sign in to comment.