From 704ae00bfc5d7db43b8b158ddf1b13d4f0a4e7a7 Mon Sep 17 00:00:00 2001 From: 4t145 Date: Fri, 10 Nov 2023 18:03:19 +0800 Subject: [PATCH 1/6] modify send and recieve logic --- tardis/Cargo.toml | 3 +- tardis/src/basic/tracing.rs | 2 +- tardis/src/cluster.rs | 2 + tardis/src/cluster/cluster_processor.rs | 384 +++++++++++-------- tardis/src/cluster/cluster_publish.rs | 191 +++++++++ tardis/src/cluster/cluster_receive.rs | 210 ++++++++++ tardis/src/cluster/cluster_watch_by_cache.rs | 21 +- tardis/src/cluster/cluster_watch_by_k8s.rs | 10 +- tardis/src/lib.rs | 32 +- tardis/src/utils/tardis_static.rs | 20 + tardis/src/web/ws_client.rs | 191 ++++++--- tardis/tests/test_cluster.rs | 115 ++++-- 12 files changed, 916 insertions(+), 265 deletions(-) create mode 100644 tardis/src/cluster/cluster_publish.rs create mode 100644 tardis/src/cluster/cluster_receive.rs diff --git a/tardis/Cargo.toml b/tardis/Cargo.toml index 81e3766d..659c2120 100644 --- a/tardis/Cargo.toml +++ b/tardis/Cargo.toml @@ -216,11 +216,12 @@ testcontainers-modules = { version = "0.1.2", features = [ [dev-dependencies] # Common -tokio = { version = "1", features = ["time", "rt", "macros", "sync"] } +tokio = { version = "1", features = ["time", "rt", "macros", "sync", "process"] } criterion = { version = "0.5" } poem-grpc-build = "0.2.22" prost = "0.11" strip-ansi-escapes = "0.2.0" +portpicker = "0.1.1" [[test]] name = "test_config" diff --git a/tardis/src/basic/tracing.rs b/tardis/src/basic/tracing.rs index e1452c19..19991dce 100644 --- a/tardis/src/basic/tracing.rs +++ b/tardis/src/basic/tracing.rs @@ -192,7 +192,7 @@ where L: SubscriberInitExt + 'static, { /// Initialize tardis tracing, this will set the global tardis tracing instance. - pub(crate) fn init(self) -> Arc { + pub fn init(self) -> Arc { static INITIALIZED: Once = Once::new(); let configer_list = self.configers; if INITIALIZED.is_completed() { diff --git a/tardis/src/cluster.rs b/tardis/src/cluster.rs index 2d8ea7c3..1e0b2e39 100644 --- a/tardis/src/cluster.rs +++ b/tardis/src/cluster.rs @@ -1,4 +1,6 @@ pub mod cluster_processor; +pub mod cluster_publish; +pub mod cluster_receive; mod cluster_watch_by_cache; #[cfg(feature = "k8s")] mod cluster_watch_by_k8s; diff --git a/tardis/src/cluster/cluster_processor.rs b/tardis/src/cluster/cluster_processor.rs index 15841183..5d2ab115 100644 --- a/tardis/src/cluster/cluster_processor.rs +++ b/tardis/src/cluster/cluster_processor.rs @@ -1,53 +1,170 @@ -use std::collections::HashMap; -use std::sync::Arc; +use std::borrow::Cow; +use std::collections::{HashMap, HashSet}; +use std::net::SocketAddr; +use std::sync::{Arc, OnceLock}; +use std::time::Duration; use futures_util::future::join_all; use futures_util::{SinkExt, StreamExt}; use poem::web::websocket::{BoxWebSocketUpgraded, Message, WebSocket}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use tokio::sync::{broadcast, RwLock}; +use tokio::sync::{broadcast, mpsc, oneshot, RwLock}; use tracing::{debug, error, info, trace, warn}; use crate::basic::error::TardisError; +use crate::cluster::cluster_publish::{do_publish_event, publish_event_one_response, ClusterEvent}; +use crate::cluster::cluster_receive::init_response_dispatcher; use crate::cluster::cluster_watch_by_cache; #[cfg(feature = "k8s")] use crate::cluster::cluster_watch_by_k8s; use crate::config::config_dto::FrameworkConfig; +use crate::tardis_static; use crate::web::web_server::TardisWebServer; use crate::web::ws_client::TardisWSClient; use crate::{basic::result::TardisResult, TardisFuns}; use async_trait::async_trait; pub const CLUSTER_NODE_WHOAMI: &str = "__cluster_node_who_am_i__"; +pub const EVENT_PING: &str = "tardis/ping"; pub const CLUSTER_MESSAGE_CACHE_SIZE: usize = 10000; +pub const WHOIAM_TIMEOUT: Duration = Duration::from_secs(30); -lazy_static! { - static ref SUBSCRIBES: Arc>>> = Arc::new(RwLock::new(HashMap::new())); - static ref CLIENT_MESSAGE_RESPONDER: Arc>>> = Arc::new(RwLock::new(None)); - static ref CLUSTER_CACHE_NODES: Arc>> = Arc::new(RwLock::new(Vec::new())); - static ref CLUSTER_CURRENT_NODE_ID: Arc> = Arc::new(RwLock::new(String::new())); +type StaticCowStr = Cow<'static, str>; +// static LOCAL_NODE_ID_SETTER: OnceLock = OnceLock::new(); +// static LOCAL_SOCKET_ADDR: OnceLock = OnceLock::new(); +tardis_static! { + pub async set local_socket_addr: SocketAddr; + pub async set local_node_id: String; + pub async set responsor_dispatcher: mpsc::Sender; + pub(crate) cache_nodes: Arc>>; + subscribers: Arc>>>; } +/// clone the cache_nodes_info at current time +pub async fn load_cache_nodes_info() -> HashMap { + cache_nodes().read().await.clone() +} + +#[derive(Debug, Clone, Eq, Hash, PartialEq)] +pub enum ClusterRemoteNodeKey { + SocketAddr(SocketAddr), + NodeId(String), +} + +impl From for ClusterRemoteNodeKey { + fn from(val: SocketAddr) -> Self { + ClusterRemoteNodeKey::SocketAddr(val) + } +} + +impl From for ClusterRemoteNodeKey { + fn from(val: String) -> Self { + ClusterRemoteNodeKey::NodeId(val) + } +} + +impl ClusterRemoteNodeKey { + pub fn as_socket_addr(&self) -> Option { + match self { + ClusterRemoteNodeKey::SocketAddr(socket_addr) => Some(*socket_addr), + _ => None, + } + } + pub fn as_node_id(&self) -> Option { + match self { + ClusterRemoteNodeKey::NodeId(node_id) => Some(node_id.clone()), + _ => None, + } + } +} + +impl std::fmt::Display for ClusterRemoteNodeKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ClusterRemoteNodeKey::SocketAddr(socket_addr) => write!(f, "{}", socket_addr), + ClusterRemoteNodeKey::NodeId(node_id) => write!(f, "[id]{}", node_id), + } + } +} + +pub type ClusterMessageId = String; + #[async_trait] pub trait TardisClusterSubscriber: Send + Sync + 'static { + fn event_name(&self) -> Cow<'static, str>; async fn subscribe(&self, message_req: TardisClusterMessageReq) -> TardisResult>; } -struct ClusterSubscriberWhoAmI; +#[derive(Debug, Clone, Default)] +pub enum ClusterEventTarget { + #[default] + /// broadcast to all known nodes that id is known + Broadcast, + /// to single remote node + Single(ClusterRemoteNodeKey), + /// to multi nodes + Multi(Vec), + /// raw client + Client(Arc), +} + +impl ClusterEventTarget { + pub fn multi, I: IntoIterator>(iter: I) -> Self { + ClusterEventTarget::Multi(iter.into_iter().map(|v| v.into()).collect()) + } +} + +impl From for ClusterEventTarget { + fn from(val: SocketAddr) -> Self { + ClusterEventTarget::Single(ClusterRemoteNodeKey::SocketAddr(val)) + } +} + +impl From for ClusterEventTarget { + fn from(val: String) -> Self { + ClusterEventTarget::Single(ClusterRemoteNodeKey::NodeId(val)) + } +} + +impl<'s> From<&'s str> for ClusterEventTarget { + fn from(val: &'s str) -> Self { + ClusterEventTarget::Single(ClusterRemoteNodeKey::NodeId(val.to_string())) + } +} + +impl> From> for ClusterEventTarget { + fn from(val: Vec) -> Self { + ClusterEventTarget::Multi(val.into_iter().map(|id| ClusterRemoteNodeKey::NodeId(id.into())).collect::>()) + } +} + +impl From> for ClusterEventTarget { + fn from(val: Arc) -> Self { + ClusterEventTarget::Client(val) + } +} + +struct EventPing; #[async_trait] -impl TardisClusterSubscriber for ClusterSubscriberWhoAmI { +impl TardisClusterSubscriber for EventPing { + fn event_name(&self) -> Cow<'static, str> { + EVENT_PING.into() + } async fn subscribe(&self, _message_req: TardisClusterMessageReq) -> TardisResult> { - Ok(Some(Value::String(CLUSTER_CURRENT_NODE_ID.read().await.to_string()))) + Ok(Some(serde_json::to_value(local_node_id().await).expect("spec always be a valid json value"))) } } pub async fn init_by_conf(conf: &FrameworkConfig, cluster_server: &TardisWebServer) -> TardisResult<()> { if let Some(cluster_config) = &conf.cluster { let web_server_config = conf.web_server.as_ref().expect("missing web server config"); + let access_host = web_server_config.access_host.unwrap_or(web_server_config.host); + let access_port = web_server_config.access_port.unwrap_or(web_server_config.port); + let access_addr = SocketAddr::new(access_host, access_port); info!("[Tardis.Cluster] Initializing cluster"); - init_node(cluster_server).await?; + init_node(cluster_server, access_addr).await?; match cluster_config.watch_kind.to_lowercase().as_str() { #[cfg(feature = "k8s")] "k8s" => { @@ -65,53 +182,75 @@ pub async fn init_by_conf(conf: &FrameworkConfig, cluster_server: &TardisWebServ Ok(()) } -async fn init_node(cluster_server: &TardisWebServer) -> TardisResult<()> { +async fn init_node(cluster_server: &TardisWebServer, access_addr: SocketAddr) -> TardisResult<()> { info!("[Tardis.Cluster] Initializing node"); - if CLUSTER_CURRENT_NODE_ID.read().await.is_empty() { - *CLUSTER_CURRENT_NODE_ID.write().await = TardisFuns::field.nanoid(); - } - if !CLIENT_MESSAGE_RESPONDER.read().await.is_some() { - *CLIENT_MESSAGE_RESPONDER.write().await = Some(broadcast::channel::(CLUSTER_MESSAGE_CACHE_SIZE).0); - } + set_local_node_id(TardisFuns::field.nanoid()); + set_local_socket_addr(access_addr); + debug!("[Tardis.Cluster] Initializing response dispathcer"); + set_responsor_dispatcher(init_response_dispatcher()); debug!("[Tardis.Cluster] Register exchange route"); cluster_server.add_route(ClusterAPI).await; debug!("[Tardis.Cluster] Register default events"); - subscribe_event(CLUSTER_NODE_WHOAMI, Box::new(ClusterSubscriberWhoAmI {})).await; - + subscribe(EventPing).await; info!("[Tardis.Cluster] Initialized node"); Ok(()) } -pub async fn set_node_id(node_id: &str) { - *CLUSTER_CURRENT_NODE_ID.write().await = node_id.to_string(); -} - -pub async fn refresh_nodes(active_nodes: Vec<(String, u16)>) -> TardisResult<()> { +pub async fn refresh_nodes(active_nodes: &HashSet) -> TardisResult<()> { trace!("[Tardis.Cluster] Refreshing nodes"); trace!("[Tardis.Cluster] Find all active nodes: {:?}", active_nodes); - let mut cache_nodes = CLUSTER_CACHE_NODES.write().await; + let mut cache_nodes = cache_nodes().write().await; + let socket_set = cache_nodes.keys().filter_map(ClusterRemoteNodeKey::as_socket_addr).collect::>(); + // remove inactive nodes trace!("[Tardis.Cluster] Try remove inactive nodes from cache"); - cache_nodes.retain(|cache_node| active_nodes.iter().any(|(active_node_ip, active_node_port)| cache_node.ip == *active_node_ip && cache_node.port == *active_node_port)); + for inactive_node in socket_set.difference(active_nodes) { + if let Some(remote) = cache_nodes.remove(&ClusterRemoteNodeKey::SocketAddr(*inactive_node)) { + // load_cache_nodes_info() + info!("[Tardis.Cluster] remove inactive node {remote:?} from cache"); + cache_nodes.remove(&ClusterRemoteNodeKey::NodeId(remote.node_id)); + // TODO + // be nice to the server, close the connection + // remote.client + } + } + // add new nodes trace!("[Tardis.Cluster] Try add new active nodes to cache"); - let added_active_nodes = active_nodes - .iter() - .filter(|(active_node_ip, active_node_port)| !cache_nodes.iter().any(|cache_node| cache_node.ip == *active_node_ip && cache_node.port == *active_node_port)) - .collect::>(); - for (active_node_ip, active_node_port) in added_active_nodes { - cache_nodes.push(add_node(active_node_ip, *active_node_port).await?); + for new_nodes_addr in active_nodes.difference(&socket_set) { + if local_socket_addr().await == new_nodes_addr { + // skip local node + continue; + } + let remote = add_remote_node(*new_nodes_addr).await?; + info!("[Tardis.Cluster] New remote nodes: {remote:?}"); + + cache_nodes.insert(ClusterRemoteNodeKey::SocketAddr(*new_nodes_addr), remote.clone()); + cache_nodes.insert(ClusterRemoteNodeKey::NodeId(remote.node_id.clone()), remote); } trace!("[Tardis.Cluster] Refreshed nodes"); + let mut table = String::new(); + for (k, v) in cache_nodes.iter() { + use std::fmt::Write; + writeln!(&mut table, "{k:20} | {v:40} ").expect("shouldn't fail"); + } + trace!("[Tardis.Cluster] cache nodes table \n{table}"); Ok(()) } -async fn add_node(node_ip: &str, node_port: u16) -> TardisResult { - debug!("[Tardis.Cluster] Connect node: {node_ip}:{node_port}"); - let client = TardisFuns::ws_client(&format!("ws://{node_ip}:{node_port}/tardis/cluster/ws/exchange"), move |message| async move { +async fn add_remote_node(socket_addr: SocketAddr) -> TardisResult { + if *local_socket_addr().await == socket_addr { + return Err(TardisError::wrap( + &format!("[Tardis.Cluster] [Client] add remote node {socket_addr}: can't add local node"), + "-1-tardis-cluster-add-remote-node-error", + )); + } + debug!("[Tardis.Cluster] Connect node: {socket_addr}"); + // is this node + let client = TardisFuns::ws_client(&format!("ws://{socket_addr}/tardis/cluster/ws/exchange"), move |message| async move { if let tokio_tungstenite::tungstenite::Message::Text(message) = message { match TardisFuns::json.str_to_obj::(&message) { Ok(message_resp) => { - if let Err(error) = CLIENT_MESSAGE_RESPONDER.read().await.as_ref().expect("Global variable [CLIENT_MESSAGE_RESPONDER] doesn't exist").send(message_resp) { + if let Err(error) = responsor_dispatcher().await.send(message_resp).await { error!("[Tardis.Cluster] [Client] response message {message}: {error}"); } } @@ -121,117 +260,28 @@ async fn add_node(node_ip: &str, node_port: u16) -> TardisResult) { - info!("[Tardis.Cluster] [Server] subscribe event {event}"); - SUBSCRIBES.write().await.insert(event.to_string(), sub_fun); -} - -pub async fn publish_event(event: &str, message: Value, node_ids: Option>) -> TardisResult { - trace!("[Tardis.Cluster] [Client] publish event {event} , message {message} , to {node_ids:?}"); - let cache_nodes = CLUSTER_CACHE_NODES.read().await; - if !cache_nodes.iter().any(|cache_node| !cache_node.current) { - return Err(TardisError::not_found( - &format!("[Tardis.Cluster] [Client] publish event {event} , message {message} : no active nodes found"), - "404-tardis-cluster-publish-message-node-not-exit", - )); - } - trace!( - "[Tardis.Cluster] [Client] cache nodes {}", - cache_nodes - .iter() - .map(|cache_node| format!("[node_id={} , {}:{} , current={}]", cache_node.id, cache_node.ip, cache_node.port, cache_node.current)) - .collect::>() - .join(" ") - ); - let node_clients = cache_nodes - .iter() - .filter(|cache_node| cache_node.client.is_some() && node_ids.as_ref().map(|node_ids| node_ids.contains(&cache_node.id.as_str())).unwrap_or(true)) - .map(|cache_node| (cache_node.client.as_ref().expect("ignore"), &cache_node.id)) - .collect::>(); - if let Some(node_ids) = node_ids { - if node_clients.len() != node_ids.len() { - let not_found_node_ids = node_ids.into_iter().filter(|node_id| !node_clients.iter().any(|node_client| node_client.1 == node_id)).collect::>().join(","); - return Err(TardisError::not_found( - &format!("[Tardis.Cluster] [Client] publish event {event} , message {message} to [{not_found_node_ids}] not found"), - "404-tardis-cluster-publish-message-node-not-exit", - )); - } - } - do_publish_event(event, message, node_clients.iter().map(|n| n.0).collect::>()).await -} - -async fn do_publish_event(event: &str, message: Value, node_clients: Vec<&TardisWSClient>) -> TardisResult { - let message_req = TardisClusterMessageReq::new(message.clone(), event.to_string(), CLUSTER_CURRENT_NODE_ID.read().await.to_string()); - let ws_message = tokio_tungstenite::tungstenite::Message::Text(TardisFuns::json.obj_to_string(&message_req)?); - let publish_result = join_all(node_clients.iter().map(|client| client.send_raw_with_retry(ws_message.clone()))).await; - - if publish_result - .iter() - .filter(|result| { - if let Err(error) = result { - error!("[Tardis.Cluster] [Client] publish event {event} , message {message}: {error}"); - true - } else { - false - } - }) - .count() - != 0 - { - Err(TardisError::wrap( - &format!("[Tardis.Cluster] [Client] publish event {event} , message {message} error"), - "-1-tardis-cluster-publish-message-error", - )) - } else { - Ok(message_req.msg_id()) - } + let client = Arc::new(client); + let resp = ClusterEvent::new(EVENT_PING).target(client.clone()).one_response(Some(WHOIAM_TIMEOUT)).publish_one_response().await?; + let resp_node_id = resp.resp_node_id; + let remote = TardisClusterNodeRemote { node_id: resp_node_id, client }; + Ok(remote) } -async fn publish_event_wait_resp(msg_id: &str) -> TardisResult { - loop { - match CLIENT_MESSAGE_RESPONDER.read().await.as_ref().expect("Global variable [CLIENT_MESSAGE_RESPONDER] doesn't exist").subscribe().recv().await { - Ok(message_response) => { - if message_response.msg_id == msg_id { - return Ok(message_response); - } - } - Err(error) => { - error!("[Tardis.Cluster] [Client] receive message id {msg_id}: {error}"); - return Err(TardisError::wrap( - &format!("[Tardis.Cluster] [Client] receive message id {msg_id}: {error}"), - "-1-tardis-cluster-receive-message-error", - )); - } - } - } +pub async fn subscribe_boxed(subscriber: Box) { + let event_name = subscriber.event_name(); + info!("[Tardis.Cluster] [Server] subscribe event {event_name}"); + subscribers().write().await.insert(event_name, subscriber); } -pub async fn publish_event_and_wait_resp(event: &str, message: Value, node_id: &str) -> TardisResult { - trace!("[Tardis.Cluster] [Client] publish and wait resp, event {event} , message {message} , to {node_id}"); - let msg_id = publish_event(event, message, Some(vec![node_id])).await?; - publish_event_wait_resp(&msg_id).await +pub async fn subscribe(subscriber: S) { + let event_name = subscriber.event_name(); + info!("[Tardis.Cluster] [Server] subscribe event {event_name}"); + subscribers().write().await.insert(event_name, Box::new(subscriber)); } #[derive(Deserialize, Serialize, Clone, Debug)] pub struct TardisClusterMessageReq { - msg_id: String, + pub(crate) msg_id: String, pub req_node_id: String, pub msg: Value, pub event: String, @@ -254,7 +304,7 @@ impl TardisClusterMessageReq { #[derive(Deserialize, Serialize, Clone, Debug)] pub struct TardisClusterMessageResp { - msg_id: String, + pub(crate) msg_id: String, pub resp_node_id: String, pub msg: Value, } @@ -268,15 +318,45 @@ impl TardisClusterMessageResp { self.msg_id.to_string() } } - -pub struct TardisClusterNode { +#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)] +pub struct TardisClusterNodeSpecifier { pub id: String, - pub ip: String, - pub port: u16, - pub current: bool, - pub client: Option, + pub socket_addr: SocketAddr, +} + +pub struct TardisClusterNodeLocal { + pub spec: TardisClusterNodeSpecifier, } +#[derive(Debug, Clone)] +pub struct TardisClusterNodeRemote { + pub node_id: String, + pub client: Arc, +} + +impl std::fmt::Display for TardisClusterNodeRemote { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{is_online} / {node_id} / {url}", + is_online = if self.client.is_connected() { "online" } else { "offline" }, + node_id = self.node_id, + url = self.client.url + ) + } +} +pub enum TardisClusterNode { + Local(TardisClusterNodeLocal), + Remote(TardisClusterNodeRemote), +} + +impl TardisClusterNode {} + +use std::hash::Hash; + +use super::cluster_receive::listen::{self, Listener}; +use super::cluster_receive::listen_reply; + #[derive(Debug, Clone)] struct ClusterAPI; @@ -292,18 +372,14 @@ impl ClusterAPI { trace!("[Tardis.Cluster] [Server] receive message {ws_message}"); match TardisFuns::json.str_to_obj::(&ws_message) { Ok(message_req) => { - if let Some(subscriber) = SUBSCRIBES.read().await.get(&message_req.event) { + if let Some(subscriber) = subscribers().read().await.get(&Cow::Owned(message_req.event.to_string())) { let msg_id = message_req.msg_id(); match subscriber.subscribe(message_req).await { Ok(Some(message_resp)) => { if let Err(error) = socket .send(Message::Text( TardisFuns::json - .obj_to_string(&TardisClusterMessageResp::new( - message_resp.clone(), - msg_id, - CLUSTER_CURRENT_NODE_ID.read().await.to_string(), - )) + .obj_to_string(&TardisClusterMessageResp::new(message_resp.clone(), msg_id, local_node_id().await.to_string())) .expect("ignore"), )) .await diff --git a/tardis/src/cluster/cluster_publish.rs b/tardis/src/cluster/cluster_publish.rs new file mode 100644 index 00000000..0a532c02 --- /dev/null +++ b/tardis/src/cluster/cluster_publish.rs @@ -0,0 +1,191 @@ +use std::borrow::Cow; +use std::sync::Arc; +use std::time::Duration; + +use super::cluster_processor::{TardisClusterMessageReq, TardisClusterMessageResp}; +use super::cluster_receive::{listen::*, listen_reply}; +use crate::cluster::cluster_processor::{cache_nodes, local_node_id, ClusterEventTarget, ClusterRemoteNodeKey}; +use crate::{ + basic::{error::TardisError, result::TardisResult}, + web::ws_client::TardisWSClient, + TardisFuns, +}; +use futures::future::join_all; +use serde::Serialize; +use serde_json::Value; +use tokio::sync::broadcast; +use tracing::{error, trace}; +pub struct EventResponse { + message_id: String, + rx: broadcast::Receiver, +} + +#[derive(Debug, Clone)] +pub struct ClusterEvent { + event: Cow<'static, str>, + message: Value, + target: ClusterEventTarget, + listener: L, +} + +impl ClusterEvent { + pub fn new(event: impl Into>) -> Self { + Self { + event: event.into(), + message: Value::Null, + target: ClusterEventTarget::Broadcast, + listener: Once { timeout: None }, + } + } +} + +impl ClusterEvent { + pub fn listener(self, listener: L2) -> ClusterEvent { + ClusterEvent { + event: self.event, + message: self.message, + target: self.target, + listener, + } + } + pub fn one_response(self, timeout: Option) -> ClusterEvent { + ClusterEvent { + event: self.event, + message: self.message, + target: self.target, + listener: Once { timeout }, + } + } + pub fn no_response(self) -> ClusterEvent { + ClusterEvent { + event: self.event, + message: self.message, + target: self.target, + listener: Never, + } + } + pub fn message(self, message: &T) -> TardisResult { + Ok(Self { + message: crate::TardisFuns::json.obj_to_json(message)?, + ..self + }) + } + pub fn json_message(self, message: Value) -> Self { + Self { message, ..self } + } + pub fn target(self, target: impl Into) -> Self { + Self { target: target.into(), ..self } + } +} + +impl ClusterEvent { + pub async fn publish_one_response(self) -> TardisResult { + publish_event_with_listener(self.event, self.message, self.target, self.listener).await?.await.map_err(|e| { + let error_info = format!("[Tardis.Cluster] [Client] Oneshot receive error: {e}, this may caused by timeout"); + tracing::error!("{error_info}"); + TardisError::wrap(&error_info, "-1-tardis-cluster-receive-message-error") + }) + } +} + +impl ClusterEvent { + pub async fn publish(self) -> TardisResult { + publish_event_with_listener(self.event, self.message, self.target, self.listener).await + } +} + +pub async fn publish_event_no_response(event: impl Into>, message: Value, target: impl Into) -> TardisResult { + publish_event_with_listener(event, message, target, Never).await +} + +pub async fn publish_event_one_response( + event: impl Into>, + message: Value, + target: impl Into, + timeout: Option, +) -> TardisResult { + publish_event_with_listener(event, message, target, Once { timeout }).await?.await.map_err(|e| { + let error_info = format!("[Tardis.Cluster] [Client] Oneshot receive error: {e}, this may caused by timeout"); + tracing::error!("{error_info}"); + TardisError::wrap(&error_info, "-1-tardis-cluster-receive-message-error") + }) +} + +pub async fn publish_event_with_listener(event: impl Into>, message: Value, target: impl Into, listener: S) -> TardisResult { + let node_id = local_node_id().await.to_string(); + let event = event.into(); + let target = target.into(); + let target_debug = format!("{target:?}"); + trace!("[Tardis.Cluster] [Client] publish event {event} , message {message} , to {target_debug}"); + + let nodes: Vec<_> = match target { + ClusterEventTarget::Broadcast => { + cache_nodes().read().await.iter().filter(|(key, _)| matches!(key, ClusterRemoteNodeKey::NodeId(_))).map(|(_, val)| val.client.clone()).collect() + } + ClusterEventTarget::Single(ref addr) => cache_nodes().read().await.get(addr).map(|node| node.client.clone()).into_iter().collect(), + ClusterEventTarget::Multi(ref multi) => { + let cache_nodes = cache_nodes().read().await; + multi.iter().filter_map(|addr| cache_nodes.get(addr).map(|node| node.client.clone())).collect() + } + ClusterEventTarget::Client(client) => vec![client], + }; + if nodes.is_empty() { + error!( + "[Tardis.Cluster] [Client] publish event {event} , message {message} , to {target} error: can't find any target node", + event = event, + message = message, + target = target_debug + ); + return Err(TardisError::wrap( + &format!( + "[Tardis.Cluster] [Client] publish event {event} , message {message} , to {target} error: can't find any target node", + event = event, + message = message, + target = target_debug + ), + "-1-tardis-cluster-publish-message-error", + )); + } + let message_req = TardisClusterMessageReq::new(message.clone(), event.to_string(), node_id); + let message_id = message_req.msg_id.clone(); + let reply = listen_reply(listener, message_id).await; + do_publish_event(message_req, nodes).await?; + Ok(reply) +} + +pub async fn do_publish_event(message_req: TardisClusterMessageReq, clients: impl IntoIterator>) -> TardisResult<()> { + let ws_message = tokio_tungstenite::tungstenite::Message::Text(TardisFuns::json.obj_to_string(&message_req)?); + let publish_result = join_all(clients.into_iter().map(|client| { + let ws_message = ws_message.clone(); + async move { client.send_raw_with_retry(ws_message).await } + })) + .await; + if publish_result + .iter() + .filter(|result| { + if let Err(error) = result { + error!( + "[Tardis.Cluster] [Client] publish event {event} , message {message}: {error}", + event = message_req.event, + message = message_req.msg + ); + true + } else { + false + } + }) + .count() + != 0 + { + Err(TardisError::wrap( + &format!( + "[Tardis.Cluster] [Client] publish event {event} , message {message} error", + event = message_req.event, + message = message_req.msg + ), + "-1-tardis-cluster-publish-message-error", + )) + } else { + Ok(()) + } +} diff --git a/tardis/src/cluster/cluster_receive.rs b/tardis/src/cluster/cluster_receive.rs new file mode 100644 index 00000000..f0bcc1ab --- /dev/null +++ b/tardis/src/cluster/cluster_receive.rs @@ -0,0 +1,210 @@ +use std::{collections::HashMap, pin::Pin}; + +use async_trait::async_trait; +use tokio::sync::{broadcast, mpsc, oneshot, RwLock}; + +use crate::tardis_static; + +use self::listen::Listener; + +use super::cluster_processor::{TardisClusterMessageResp, CLUSTER_MESSAGE_CACHE_SIZE}; + +enum ResponseFn { + Once(Box), + Multitime(Box bool + Send + Sync>), +} +tardis_static! { + responsor_subscribers: RwLock>; +} + +pub async fn listen_reply(strategy: S, id: String) -> S::Reply { + strategy.subscribe(id).await +} + +pub(crate) fn init_response_dispatcher() -> mpsc::Sender { + let (tx, mut rx) = mpsc::channel::(CLUSTER_MESSAGE_CACHE_SIZE); + // rx is for ws connections + // tx is for response dispatcher + let dispatch_task = async move { + while let Some(resp) = rx.recv().await { + let id = resp.msg_id.clone(); + tracing::trace!( + "[Tardis.Cluster] dispatching received response: {id} from {node_id}, message: {resp:?}", + id = id, + node_id = resp.resp_node_id, + resp = resp + ); + if let Some(subscriber) = responsor_subscribers().read().await.get(&id) { + match subscriber { + ResponseFn::Once(_) => { + tokio::spawn(async move { + if let Some(ResponseFn::Once(f)) = responsor_subscribers().write().await.remove(&id) { + f(resp) + } + }); + } + ResponseFn::Multitime(f) => { + let drop_me = f(resp); + if drop_me { + tokio::spawn(async move { + responsor_subscribers().write().await.remove(&id); + }); + } + } + } + } else { + tracing::trace!("[Tardis.Cluster] no subscriber found for message_id: {id}.", id = id); + } + } + }; + tokio::spawn(dispatch_task); + tx +} + +pub mod listen { + use std::{collections::HashMap, time::Duration}; + + use async_trait::async_trait; + use tokio::sync::{broadcast, mpsc, oneshot}; + + use crate::{ + basic::{error::TardisError, result::TardisResult}, + cluster::cluster_processor::TardisClusterMessageResp, + tardis_static, + }; + + use super::ResponseFn; + #[async_trait] + pub trait Listener { + type Reply; + async fn subscribe(self, id: String) -> Self::Reply; + } + + /// The message will be received only once. + #[derive(Debug, Default, Clone, Copy)] + pub struct Once { + pub(crate) timeout: Option, + } + + impl Once { + pub fn with_timeout(timeout: Duration) -> Self { + Self { timeout: Some(timeout) } + } + } + + #[async_trait] + impl Listener for Once { + type Reply = oneshot::Receiver; + + async fn subscribe(self, id: String) -> Self::Reply { + let (tx, rx) = oneshot::channel(); + let timeout_handle = { + let id = id.clone(); + self.timeout.map(|timeout| { + tokio::spawn(async move { + tokio::time::sleep(timeout).await; + + // super::responsor_subscribers().write().await.remove(&id); + // tracing::trace!("[Tardis.Cluster] message {id} timeout"); + + if let Some(_task) = super::responsor_subscribers().write().await.remove(&id) { + tracing::trace!("[Tardis.Cluster] message {id} timeout"); + } + }) + }) + }; + super::responsor_subscribers().write().await.insert( + id, + ResponseFn::Once(Box::new(move |resp| { + tracing::trace!("[Tardis.Cluster] Once listener receive resp {resp:?}"); + // cleanup timeout callback + if let Some(ref timeout_handle) = timeout_handle { + timeout_handle.abort(); + } + if let Err(e) = tx.send(resp) { + tracing::debug!("[Tardis.Cluster] message {e:?} missing receiver"); + } + })), + ); + rx + } + } + + #[derive(Debug, Default, Clone, Copy)] + /// send a message and receive all the responses until the receiver is dropped. + pub struct Stream {} + + #[async_trait] + impl Listener for Stream { + type Reply = mpsc::Receiver; + + async fn subscribe(self, id: String) -> Self::Reply { + let (tx, rx) = mpsc::channel(100); + { + let tx = tx.clone(); + let id = id.clone(); + tokio::spawn(async move { + tx.closed().await; + super::responsor_subscribers().write().await.remove(&id); + }); + } + super::responsor_subscribers().write().await.insert( + id, + ResponseFn::Multitime(Box::new(move |resp| { + if tx.is_closed() { + true + } else { + let tx = tx.clone(); + tokio::spawn(async move { tx.send(resp).await }); + false + } + })), + ); + rx + } + } + + /// Send a message and ignore the response. + #[derive(Debug, Default, Clone, Copy)] + pub struct Never; + + #[async_trait] + impl Listener for Never { + type Reply = String; + + async fn subscribe(self, id: String) -> Self::Reply { + id + } + } + + #[derive(Debug, Default, Clone, Copy)] + pub struct Broadcast {} + + #[async_trait] + impl Listener for Broadcast { + type Reply = broadcast::Receiver; + + async fn subscribe(self, id: String) -> Self::Reply { + let (tx, rx) = broadcast::channel(100); + { + let tx = tx.clone(); + let id = id.clone(); + tokio::spawn(async move { + if tx.receiver_count() == 0 { + super::responsor_subscribers().write().await.remove(&id); + } else { + tokio::task::yield_now().await; + } + }); + } + super::responsor_subscribers().write().await.insert( + id, + ResponseFn::Multitime(Box::new(move |resp| { + let _ = tx.send(resp); + tx.receiver_count() == 0 + })), + ); + rx + } + } +} diff --git a/tardis/src/cluster/cluster_watch_by_cache.rs b/tardis/src/cluster/cluster_watch_by_cache.rs index 93abc679..67f4b122 100644 --- a/tardis/src/cluster/cluster_watch_by_cache.rs +++ b/tardis/src/cluster/cluster_watch_by_cache.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{net::SocketAddr, time::Duration, collections::HashSet}; use chrono::Utc; use tokio::time; @@ -16,16 +16,18 @@ pub const CACHE_NODE_INFO_KEY: &str = "tardis:cluster:node"; pub const CACHE_NODE_ALIVE_CHECK_DELAYED_TIMES: i8 = 3; pub async fn init(cluster_config: &ClusterConfig, web_server_config: &WebServerConfig) -> TardisResult<()> { - let access_host = web_server_config.access_host.as_ref().unwrap_or(&web_server_config.host).to_string(); + let access_host = web_server_config.access_host.unwrap_or(web_server_config.host); let access_port = web_server_config.access_port.unwrap_or(web_server_config.port); let cache_check_interval_sec = cluster_config.cache_check_interval_sec.unwrap_or(10); + let access_addr = SocketAddr::new(access_host, access_port); + // heart beat tokio::spawn(async move { let client = TardisFuns::cache(); let mut interval = time::interval(Duration::from_secs(cache_check_interval_sec as u64)); loop { { trace!("[Tardis.Cluster] [Client] heartbeat..."); - if let Err(error) = client.hset(CACHE_NODE_INFO_KEY, &format!("{},{}", access_host, access_port), &format!("{}", Utc::now().timestamp())).await { + if let Err(error) = client.hset(CACHE_NODE_INFO_KEY, &access_addr.to_string(), &Utc::now().timestamp().to_string()).await { error!("[Tardis.Cluster] [Client] heartbeat error: {}", error); } } @@ -53,14 +55,11 @@ async fn watch(client: &TardisCacheClient, cache_check_interval_sec: i32) -> Tar let active_ts = Utc::now().timestamp() - cache_check_interval_sec as i64 * CACHE_NODE_ALIVE_CHECK_DELAYED_TIMES as i64 - 1; let active_nodes = all_nodes .iter() - .filter(|(_, active_node_ts)| active_node_ts.parse::().unwrap_or(0) > active_ts) - .map(|(active_node_key, _)| { - let node_info = active_node_key.split(',').collect::>(); - (node_info[0].to_string(), node_info[1].parse::().unwrap_or(0)) - }) - .collect::>(); - cluster_processor::refresh_nodes(active_nodes).await?; - let inactive_nodes = all_nodes.iter().filter(|(_, active_node_ts)| active_node_ts.parse::().unwrap_or(0) <= active_ts).collect::>(); + .filter_map(|(active_node_key, active_node_ts)| (active_node_ts.parse::().unwrap_or(i64::MIN) > active_ts).then_some(active_node_key)) + .filter_map(|active_node_key| active_node_key.parse::().ok()) + .collect::>(); + cluster_processor::refresh_nodes(&active_nodes).await?; + let inactive_nodes = all_nodes.iter().filter(|(_, active_node_ts)| active_node_ts.parse::().unwrap_or(i64::MIN) <= active_ts).collect::>(); for (inactive_node_key, _) in inactive_nodes { client.hdel(CACHE_NODE_INFO_KEY, inactive_node_key).await?; } diff --git a/tardis/src/cluster/cluster_watch_by_k8s.rs b/tardis/src/cluster/cluster_watch_by_k8s.rs index 31e45339..547542b0 100644 --- a/tardis/src/cluster/cluster_watch_by_k8s.rs +++ b/tardis/src/cluster/cluster_watch_by_k8s.rs @@ -1,3 +1,8 @@ +use std::{ + collections::HashSet, + net::{IpAddr, SocketAddr}, +}; + use k8s_openapi::api::core::v1::{Endpoints, Service}; use kube::{api::WatchParams, Api, Client}; use tracing::{error, trace}; @@ -74,8 +79,9 @@ async fn refresh(k8s_svc: &str, web_server_port: u16) -> TardisResult<()> { .flat_map(|subset| subset.addresses.as_ref().map(|addresses| addresses.iter().map(|address| address.ip.to_string()).collect::>()).unwrap_or_default()) }) .map(|ip: String| (ip, port_mapping)) - .collect::>(); - cluster_processor::refresh_nodes(active_nodes).await?; + .filter_map(|(ip, port)| ip.parse::().map(|ip_addr| SocketAddr::new(ip_addr, port)).ok()) + .collect::>(); + cluster_processor::refresh_nodes(&active_nodes).await?; Ok(()) } diff --git a/tardis/src/lib.rs b/tardis/src/lib.rs index 62b5347a..31f2190c 100644 --- a/tardis/src/lib.rs +++ b/tardis/src/lib.rs @@ -152,15 +152,15 @@ pub use tokio; pub use tracing as log; // we still need to pub use tracing for some macros // in tracing relies on `$crate` witch infers `tracing`. -pub use tracing; -pub use url; - use basic::error::TardisErrorWithExt; use basic::result::TardisResult; use basic::tracing::TardisTracing; +pub use paste; #[cfg(feature = "tardis-macros")] #[cfg(any(feature = "reldb-postgres", feature = "reldb-mysql"))] pub use tardis_macros::{TardisCreateEntity, TardisCreateIndex, TardisCreateTable, TardisEmptyBehavior, TardisEmptyRelation}; +pub use tracing; +pub use url; use crate::basic::field::TardisField; use crate::basic::json::TardisJson; @@ -954,18 +954,32 @@ impl TardisFuns { } #[cfg(feature = "cluster")] - pub async fn cluster_subscribe_event(event: &str, sub_fun: Box) { - cluster::cluster_processor::subscribe_event(event, sub_fun).await; + pub async fn cluster_subscribe_event_boxed(subscriber: Box) { + cluster::cluster_processor::subscribe_boxed(subscriber).await; + } + + #[cfg(feature = "cluster")] + pub async fn cluster_subscribe_event(subscriber: S) { + cluster::cluster_processor::subscribe(subscriber).await; } #[cfg(feature = "cluster")] - pub async fn cluster_publish_event(event: &str, message: serde_json::Value, node_ids: Option>) -> TardisResult { - cluster::cluster_processor::publish_event(event, message, node_ids).await + pub async fn cluster_publish_event( + event: impl Into>, + message: serde_json::Value, + target: impl Into, + ) -> TardisResult { + use cluster::cluster_publish::ClusterEvent; + ClusterEvent::new(event).json_message(message).target(target).no_response().publish().await } #[cfg(feature = "cluster")] - pub async fn cluster_publish_event_and_wait_resp(event: &str, message: serde_json::Value, node_id: &str) -> TardisResult { - cluster::cluster_processor::publish_event_and_wait_resp(event, message, node_id).await + pub async fn cluster_publish_event_one_resp( + event: impl Into>, + message: serde_json::Value, + node_id: &str, + ) -> TardisResult { + cluster::cluster_publish::publish_event_one_response(event, message, node_id, None).await } /// # Parameters diff --git a/tardis/src/utils/tardis_static.rs b/tardis/src/utils/tardis_static.rs index 210a468e..a5d3297c 100644 --- a/tardis/src/utils/tardis_static.rs +++ b/tardis/src/utils/tardis_static.rs @@ -30,6 +30,24 @@ macro_rules! tardis_static { () => { }; + ($(#[$attr:meta])* $vis:vis async set $ident:ident :$Type:ty; $($rest: tt)*) => { + $crate::paste::paste! { + static [<__ $ident:upper _SYNC>]: OnceLock<$Type> = OnceLock::new(); + $vis fn [](init: $Type) { + [<__ $ident:upper _SYNC>].get_or_init(|| init); + } + $(#[$attr])* + $vis async fn $ident() -> &'static $Type { + loop { + match [<__ $ident:upper _SYNC>].get() { + Some(val) => break val, + None => { $crate::tokio::task::yield_now().await; } + } + } + } + $crate::tardis_static!($($rest)*); + } + }; ($(#[$attr:meta])* $vis:vis async $ident:ident :$Type:ty = $init: expr; $($rest: tt)*) => { $(#[$attr])* $vis async fn $ident() -> &'static $Type { @@ -55,6 +73,8 @@ macro_rules! tardis_static { ($(#[$attr:meta])* $vis:vis $ident:ident :$Type:ty; $($rest: tt)*) => { $crate::tardis_static!($(#[$attr])* $vis $ident: $Type = Default::default(); $($rest)*); }; + + } #[cfg(test)] diff --git a/tardis/src/web/ws_client.rs b/tardis/src/web/ws_client.rs index 9faa75af..2d459b8b 100644 --- a/tardis/src/web/ws_client.rs +++ b/tardis/src/web/ws_client.rs @@ -9,10 +9,11 @@ use native_tls::TlsConnector; use serde::de::Deserialize; use serde::Serialize; use serde_json::Value; -use tokio::{net::TcpStream, sync::Mutex}; +use tokio::sync::{mpsc, Notify, OwnedSemaphorePermit, RwLock, Semaphore}; +use tokio::{net::TcpStream, sync::watch}; use tokio_tungstenite::tungstenite::{self, Error, Message}; use tokio_tungstenite::{Connector, MaybeTlsStream, WebSocketStream}; -use tracing::info; +use tracing::{debug, info}; use tracing::{trace, warn}; use url::Url; @@ -20,31 +21,56 @@ use crate::basic::error::TardisError; use crate::basic::result::TardisResult; use crate::TardisFuns; -type Fun = Arc Pin> + Send + Sync>> + Send + Sync>; +type OnMsgCbk = Arc Pin> + Send + Sync>> + Send + Sync>; +// with a callback function to handle inbound messages, but never handle inbound messages positively. +// and then, we should also send messages through this client. + +#[derive(Clone)] pub struct TardisWSClient { - str_url: String, - fun: Fun, - write: Mutex>, Message>>>>, + pub(crate) url: Url, + on_message: OnMsgCbk, + sender: Arc>>, + connection_semaphore: Arc, +} + +impl std::fmt::Debug for TardisWSClient { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TardisWSClient").field("url", &self.url.to_string()).field("connected", &self.is_connected()).finish() + } } impl TardisWSClient { - pub async fn connect(str_url: &str, fun: F) -> TardisResult + pub async fn connect(str_url: &str, on_message: F) -> TardisResult where F: Fn(Message) -> T + Send + Sync + Copy + 'static, T: Future> + Send + Sync + 'static, { - Self::do_connect(str_url, Arc::new(move |m| Box::pin(fun(m))), false).await + let url = Url::parse(str_url).map_err(|_| TardisError::format_error(&format!("[Tardis.WSClient] Invalid url {str_url}"), "406-tardis-ws-url-error"))?; + + let connection_semaphore = Arc::new(Semaphore::const_new(1)); + let permit = connection_semaphore.clone().acquire_owned().await.expect("newly created semaphore should not fail"); + let tx = Self::do_connect(&url, Arc::new(move |m| Box::pin(on_message(m))), false, permit).await?; + let sender = Arc::new(RwLock::new(tx)); + Ok(TardisWSClient { + url, + on_message: Arc::new(move |m| Box::pin(on_message(m))), + sender, + connection_semaphore, + }) } - async fn do_connect(str_url: &str, fun: Fun, retry: bool) -> TardisResult { - let url = Url::parse(str_url).map_err(|_| TardisError::format_error(&format!("[Tardis.WSClient] Invalid url {str_url}"), "406-tardis-ws-url-error"))?; + pub fn is_connected(&self) -> bool { + self.connection_semaphore.available_permits() == 0 + } + + async fn do_connect(url: &Url, on_message: OnMsgCbk, retry: bool, permit: OwnedSemaphorePermit) -> TardisResult> { info!( "[Tardis.WSClient] {}, host:{}, port:{}", if retry { "Re-initializing" } else { "Initializing" }, url.host_str().unwrap_or(""), url.port().unwrap_or(0) ); - let connect = if !str_url.starts_with("wss") { + let connect = if url.scheme() != "wss" { tokio_tungstenite::connect_async(url.clone()).await } else { tokio_tungstenite::connect_async_tls_with_config( @@ -60,11 +86,11 @@ impl TardisWSClient { ) .await }; - let (client, _) = connect.map_err(|error| { + let (stream, _) = connect.map_err(|error| { if !retry { - TardisError::format_error(&format!("[Tardis.WSClient] Failed to connect {str_url} {error}"), "500-tardis-ws-client-connect-error") + TardisError::format_error(&format!("[Tardis.WSClient] Failed to connect {url} {error}"), "500-tardis-ws-client-connect-error") } else { - TardisError::format_error(&format!("[Tardis.WSClient] Failed to reconnect {str_url} {error}"), "500-tardis-ws-client-reconnect-error") + TardisError::format_error(&format!("[Tardis.WSClient] Failed to reconnect {url} {error}"), "500-tardis-ws-client-reconnect-error") } })?; info!( @@ -73,27 +99,74 @@ impl TardisWSClient { url.host_str().unwrap_or(""), url.port().unwrap_or(0) ); - let (write, mut read) = client.split(); - let write = Arc::new(Mutex::new(write)); - let reply = write.clone(); - let fun_clone = fun.clone(); - tokio::spawn(async move { - while let Some(Ok(message)) = read.next().await { - trace!("[Tardis.WSClient] WS receive: {}", message); - if let Some(resp) = fun_clone(message).await { - trace!("[Tardis.WSClient] WS send: {}", resp); - if let Err(error) = reply.lock().await.send(resp).await { - warn!("[Tardis.WSClient] Failed to send message : {error}"); - break; + let (mut ws_tx, mut ws_rx) = stream.split(); + // let ws_tx = Arc::new(Mutex::new(ws_tx)); + // let reply = ws_tx.clone(); + // let (outbound_tx, mut outbound_rx) = mpsc::unbounded_channel::(); + + let (outbound_quene_tx, mut outbound_quene_rx) = mpsc::unbounded_channel::(); + + // there should be two quene: + // 1. out to client quene + // 2. client to remote quene + + // outbound side + let ob_handle = { + let url = url.clone(); + tokio::spawn(async move { + while let Some(message) = outbound_quene_rx.recv().await { + if let Err(e) = ws_tx.send(message).await { + debug!("[Tardis.WSClient] client: {url} error when send to websocket: {e}") + // websocket was closed + } + } + }) + }; + + // inbound side + let ib_handle = { + let on_message = on_message.clone(); + + let outbound_quene_tx = outbound_quene_tx.clone(); + let url = url.clone(); + tokio::spawn(async move { + // stream would be owned by one single task and + // 1. outbound messages would be sent by the task, and can be forwarded from other tasks + // 2. inbound messages would be received by the task, and be dropped in this task. + while let Some(message) = ws_rx.next().await { + match message { + Ok(message) => { + trace!("[Tardis.WSClient] WS receive: {}", message); + let fut_response = on_message(message); + let outbound_quene_tx = outbound_quene_tx.clone(); + let url = url.clone(); + tokio::spawn(async move { + if let Some(resp) = fut_response.await { + trace!("[Tardis.WSClient] WS send: {}", resp); + if let Err(e) = outbound_quene_tx.send(resp) { + debug!("[Tardis.WSClient] client: {url} error when send to outbound message quene: {e}") + // outbound channel was closed + } + } + }); + } + Err(e) => { + warn!("[Tardis.WSClient] client: {url} error when receive from websocket: {e}") + } } } + }) + }; + tokio::spawn(async move { + let permit = permit; + tokio::select! { + _ = ib_handle => {}, + _ = ob_handle => {} } + drop(permit) }); - Ok(TardisWSClient { - str_url: str_url.to_string(), - fun, - write: Mutex::new(write), - }) + + Ok(outbound_quene_tx) } pub async fn send_obj(&self, msg: &E) -> TardisResult<()> { @@ -107,36 +180,46 @@ impl TardisWSClient { } pub async fn send_raw_with_retry(&self, message: Message) -> TardisResult<()> { - if let Err(error) = self.send_raw(message.clone()).await { - warn!("[Tardis.WSClient] Failed to send message {}: {}", message.clone(), error); - match error { - Error::AlreadyClosed | Error::Io(_) => { - if let Err(error) = self.reconnect().await { - Err(error) - } else { - self.send_raw(message.clone()) - .await - .map_err(|error| TardisError::format_error(&format!("[Tardis.WSClient] Failed to send message {message}: {error}"), "500-tardis-ws-client-send-error")) - } - } - _ => Err(TardisError::format_error( - &format!("[Tardis.WSClient] Failed to send message {message}: {error}"), - "500-tardis-ws-client-send-error", - )), + // wait until the client is ready + const MAX_RETRY_TIME: usize = 1; + let mut retry_time = 0; + while retry_time < MAX_RETRY_TIME { + let connected = self.send_raw(message.clone()).await?; + if !connected { + self.reconnect().await?; + retry_time += 1; + continue; + } else { + return Ok(()); } - } else { - Ok(()) } + Err(TardisError::format_error( + &format!("[Tardis.WSClient] Failed to send message {message}: exceed max retry time {MAX_RETRY_TIME}"), + "500-tardis-ws-client-send-error", + )) } - pub async fn send_raw(&self, message: Message) -> Result<(), tungstenite::Error> { - self.write.lock().await.lock().await.send(message).await + /// Send a message to the websocket server. + /// if the client is not ready or disconnected, a `Ok(false)` value would be returned. + pub async fn send_raw(&self, message: Message) -> TardisResult { + if !self.is_connected() { + return Ok(false); + } + match self.sender.read().await.send(message) { + Ok(_) => Ok(true), + Err(_) => Err(TardisError::format_error( + &format!("[Tardis.WSClient] Client {url} failed to send message", url = self.url), + "500-tardis-ws-client-send-error", + )), + } } async fn reconnect(&self) -> TardisResult<()> { - let new_fun = self.fun.clone(); - let new_client = Self::do_connect(&self.str_url, new_fun, true).await?; - *self.write.lock().await = new_client.write.lock().await.clone(); + if let Ok(permit) = self.connection_semaphore.clone().try_acquire_owned() { + info!("[Tardis.WSClient] trying to reconnect {url}", url = self.url); + let sender = Self::do_connect(&self.url, self.on_message.clone(), true, permit).await?; + *self.sender.write().await = sender; + } Ok(()) } } diff --git a/tardis/tests/test_cluster.rs b/tardis/tests/test_cluster.rs index 85918604..a0ff578f 100644 --- a/tardis/tests/test_cluster.rs +++ b/tardis/tests/test_cluster.rs @@ -1,4 +1,5 @@ use std::{ + borrow::Cow, env, ffi::OsStr, path::Path, @@ -10,26 +11,30 @@ use async_trait::async_trait; use futures_util::future::join_all; use serde_json::{json, Value}; use tardis::{ - basic::result::TardisResult, - cluster::cluster_processor::{self, TardisClusterMessageReq, TardisClusterSubscriber}, - config::config_dto::{CacheModuleConfig, ClusterConfig, FrameworkConfig, TardisConfig, WebServerCommonConfig, WebServerConfig, WebServerModuleConfig}, + basic::{result::TardisResult, tracing::TardisTracing}, + cluster::{ + cluster_processor::{self, ClusterEventTarget, TardisClusterMessageReq, TardisClusterSubscriber}, + cluster_publish::publish_event_one_response, + }, + config::config_dto::{CacheModuleConfig, ClusterConfig, FrameworkConfig, LogConfig, TardisConfig, WebServerCommonConfig, WebServerConfig, WebServerModuleConfig}, consts::IP_LOCALHOST, test::test_container::TardisTestContainer, TardisFuns, }; use testcontainers::clients; -use tokio::{process::Command, time::sleep}; +use tokio::{io::AsyncReadExt, process::Command, time::sleep}; use tracing::info; +use tracing_subscriber::filter::Directive; #[tokio::test(flavor = "multi_thread")] async fn test_cluster() -> TardisResult<()> { env::set_var("RUST_LOG", "info,tardis=debug"); let cluster_url = env::var("cluster_url"); - if let Ok(cluster_url) = cluster_url { start_node(cluster_url, &env::var("node_id").unwrap()).await?; } else { - let program = env::args().next().as_ref().map(Path::new).and_then(Path::file_name).and_then(OsStr::to_str).map(String::from).unwrap(); + // let program = env::args().next().as_ref().map(Path::new).and_then(Path::file_name).and_then(OsStr::to_str).map(String::from).unwrap(); + let program = env::current_exe()?; let docker = clients::Cli::default(); let redis_container = TardisTestContainer::redis_custom(&docker); @@ -47,16 +52,17 @@ async fn test_cluster() -> TardisResult<()> { Ok(()) } -async fn invoke_node(cluster_url: &str, node_id: &str, program: &str) -> TardisResult { - let output = if cfg!(target_os = "windows") { +async fn invoke_node(cluster_url: &str, node_id: &str, program: &Path) -> TardisResult { + let mut child = if cfg!(target_os = "windows") { Command::new("cmd") .env("cluster_url", cluster_url) .env("node_id", node_id) .env("LS_COLORS", "rs=0:di=38;5;27:mh=44;38;5;15") - .args(["/C", program]) - .output() - .await - .expect("failed to execute process") + .arg("/C") + .arg(program) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn()? } else { Command::new("sh") .env("cluster_url", cluster_url) @@ -64,24 +70,51 @@ async fn invoke_node(cluster_url: &str, node_id: &str, program: &str) -> TardisR .env("LS_COLORS", "rs=0:di=38;5;27:mh=44;38;5;15") .arg("-c") .arg(program) - .output() - .await - .expect("failed to execute process") + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn()? }; - let output_msg = String::from_utf8(strip_ansi_escapes::strip(output.stdout)).unwrap(); - println!("{node_id} stdout:"); - output_msg.lines().for_each(|line| println!("{line}")); - - Ok(!output_msg.contains("test result: FAILED")) + let mut buf = [0; 1024]; + let mut err_buf = [0; 1024]; + let mut stdout = child.stdout.take().unwrap(); + let mut stderr = child.stderr.take().unwrap(); + loop { + tokio::select! { + result = stdout.read(&mut buf) => { + let size = result?; + if size != 0 { + println!("node[{node_id}]/stdout:"); + println!("{}", String::from_utf8_lossy(&buf[..size])); + } + } + result = stderr.read(&mut err_buf) => { + let size = result?; + if size != 0 { + println!("node[{node_id}]/stdout:"); + println!("{}", String::from_utf8_lossy(&err_buf[..size])); + } + } + exit_code = child.wait() => { + if let Ok(exit_code) = exit_code { + return Ok(exit_code.success()) + } else { + return Ok(false) + } + } + + }; + } } async fn start_node(cluster_url: String, node_id: &str) -> TardisResult<()> { - cluster_processor::set_node_id(&format!("node_{node_id}")).await; + cluster_processor::set_local_node_id(format!("node_{node_id}")); + let port = portpicker::pick_unused_port().unwrap(); + TardisTracing::initializer().with_fmt_layer().with_env_layer().init(); TardisFuns::init_conf(TardisConfig { cs: Default::default(), fw: FrameworkConfig::builder() .web_server( - WebServerConfig::builder().common(WebServerCommonConfig::builder().access_host(IP_LOCALHOST).port(80).build()).default(WebServerModuleConfig::default()).build(), + WebServerConfig::builder().common(WebServerCommonConfig::builder().access_host(IP_LOCALHOST).port(port).build()).default(WebServerModuleConfig::default()).build(), ) .cache(CacheModuleConfig::builder().url(cluster_url.parse().unwrap()).build()) .cluster(ClusterConfig { @@ -89,6 +122,16 @@ async fn start_node(cluster_url: String, node_id: &str) -> TardisResult<()> { k8s_svc: None, cache_check_interval_sec: Some(1), }) + .log( + LogConfig::builder() + .level("info".parse::().unwrap_or_default()) + .directives(if node_id == "2" { + ["tardis=trace".parse::().unwrap()] + } else { + ["tardis=debug".parse::().unwrap()] + }) + .build(), + ) .build(), }) .await @@ -118,24 +161,24 @@ async fn start_node(cluster_url: String, node_id: &str) -> TardisResult<()> { static PING_COUNTER: AtomicUsize = AtomicUsize::new(0); async fn test_ping(node_id: &str) -> TardisResult<()> { - TardisFuns::cluster_subscribe_event("ping", Box::new(ClusterSubscriberPingTest {})).await; + TardisFuns::cluster_subscribe_event(ClusterSubscriberPingTest).await; if node_id == "1" { // expect hit 0 times - let result = TardisFuns::cluster_publish_event("ping", json!(1000), None).await; + let result = TardisFuns::cluster_publish_event("ping", json!(1000), ClusterEventTarget::Broadcast).await; assert!(result.is_err()); sleep(Duration::from_secs(5)).await; // expect hit 2 times (to node_2, node_3) - TardisFuns::cluster_publish_event("ping", json!(400), None).await?; + TardisFuns::cluster_publish_event("ping", json!(400), ClusterEventTarget::Broadcast).await?; sleep(Duration::from_secs(5)).await; assert_eq!(PING_COUNTER.load(Ordering::SeqCst), 50 + 4); } else if node_id == "2" { // expect hit 1 times (to node_1) - TardisFuns::cluster_publish_event("ping", json!(50), None).await?; + TardisFuns::cluster_publish_event("ping", json!(50), ClusterEventTarget::Broadcast).await?; sleep(Duration::from_secs(5)).await; assert_eq!(PING_COUNTER.load(Ordering::SeqCst), 400 + 4); } else { // expect hit 2 times (to node_1, node_2) - TardisFuns::cluster_publish_event("ping", json!(4), None).await?; + TardisFuns::cluster_publish_event("ping", json!(4), ClusterEventTarget::Broadcast).await?; sleep(Duration::from_secs(5)).await; assert_eq!(PING_COUNTER.load(Ordering::SeqCst), 400); } @@ -146,6 +189,9 @@ struct ClusterSubscriberPingTest; #[async_trait] impl TardisClusterSubscriber for ClusterSubscriberPingTest { + fn event_name(&self) -> Cow<'static, str> { + "ping".into() + } async fn subscribe(&self, message_req: TardisClusterMessageReq) -> TardisResult> { info!("message_req:{message_req:?}"); PING_COUNTER.fetch_add(message_req.msg.as_i64().unwrap() as usize, Ordering::SeqCst); @@ -157,6 +203,9 @@ struct ClusterSubscriberEchoTest; #[async_trait] impl TardisClusterSubscriber for ClusterSubscriberEchoTest { + fn event_name(&self) -> Cow<'static, str> { + "echo".into() + } async fn subscribe(&self, message_req: TardisClusterMessageReq) -> TardisResult> { info!("message_req:{message_req:?}"); Ok(Some(serde_json::Value::String(format!("echo {}", message_req.req_node_id)))) @@ -164,17 +213,17 @@ impl TardisClusterSubscriber for ClusterSubscriberEchoTest { } async fn test_echo(node_id: &str) -> TardisResult<()> { - TardisFuns::cluster_subscribe_event("echo", Box::new(ClusterSubscriberEchoTest {})).await; + TardisFuns::cluster_subscribe_event(ClusterSubscriberEchoTest).await; if node_id == "1" { - let resp = TardisFuns::cluster_publish_event_and_wait_resp("echo", serde_json::Value::String("hi".to_string()), "node_3").await?; + let resp = TardisFuns::cluster_publish_event_one_resp("echo", serde_json::Value::String("hi".to_string()), "node_3").await?; assert_eq!(resp.msg.as_str().unwrap(), &format!("echo node_{node_id}")); assert_eq!(&resp.resp_node_id, "node_3"); } else if node_id == "2" { - let resp = TardisFuns::cluster_publish_event_and_wait_resp("echo", serde_json::Value::String("hi".to_string()), "node_3").await?; - assert_eq!(resp.msg.as_str().unwrap(), &format!("echo node_{node_id}")); - assert_eq!(&resp.resp_node_id, "node_3"); + let resp = publish_event_one_response("echo", serde_json::Value::String("hi".to_string()), "node_3", Some(Duration::from_secs(1))).await; + // should time out + assert!(resp.is_err()); } else { - let resp = TardisFuns::cluster_publish_event_and_wait_resp("echo", serde_json::Value::String("hi".to_string()), "node_3").await; + let resp = TardisFuns::cluster_publish_event_one_resp("echo", serde_json::Value::String("hi".to_string()), "node_3").await; assert!(resp.is_err()); } Ok(()) From 8b6bb6886a14a15af91cc27310e2bc97912f49dc Mon Sep 17 00:00:00 2001 From: 4t145 Date: Mon, 13 Nov 2023 10:35:36 +0800 Subject: [PATCH 2/6] update --- tardis/src/lib.rs | 4 ++-- tardis/src/web/ws_processor.rs | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/tardis/src/lib.rs b/tardis/src/lib.rs index 31f2190c..64b6ad2f 100644 --- a/tardis/src/lib.rs +++ b/tardis/src/lib.rs @@ -795,12 +795,12 @@ impl TardisFuns { } #[cfg(feature = "ws-client")] - pub async fn ws_client(str_url: &str, fun: F) -> TardisResult + pub async fn ws_client(str_url: &str, on_message: F) -> TardisResult where F: Fn(tokio_tungstenite::tungstenite::Message) -> T + Send + Sync + Copy + 'static, T: futures::Future> + Send + Sync + 'static, { - web::ws_client::TardisWSClient::connect(str_url, fun).await + web::ws_client::TardisWSClient::connect(str_url, on_message).await } /// Use the distributed cache feature / 使用分布式缓存功能 diff --git a/tardis/src/web/ws_processor.rs b/tardis/src/web/ws_processor.rs index 2ffe51bf..ac466b4c 100644 --- a/tardis/src/web/ws_processor.rs +++ b/tardis/src/web/ws_processor.rs @@ -116,6 +116,7 @@ where let mut inner_receiver = inner_sender.subscribe(); websocket .on_upgrade(move |socket| async move { + // a connection is a instance let inst_id = TardisFuns::field.nanoid(); let current_receive_inst_id = inst_id.clone(); { From 20f3e61eb23992fa7b33ea21586ad40a51978b70 Mon Sep 17 00:00:00 2001 From: 4t145 Date: Fri, 24 Nov 2023 18:10:22 +0800 Subject: [PATCH 3/6] update --- tardis/Cargo.toml | 1 + tardis/src/cache/cache_client.rs | 24 +++ tardis/src/cluster.rs | 2 + tardis/src/cluster/cluster_broadcast.rs | 110 ++++++++++++ tardis/src/cluster/cluster_hashmap.rs | 164 ++++++++++++++++++ tardis/src/cluster/cluster_processor.rs | 19 +- tardis/src/cluster/cluster_publish.rs | 14 +- tardis/src/cluster/cluster_receive.rs | 2 +- tardis/src/web/ws_processor.rs | 88 +++++++--- .../src/web/ws_processor/cluster_protocol.rs | 81 +++++++++ 10 files changed, 470 insertions(+), 35 deletions(-) create mode 100644 tardis/src/cluster/cluster_broadcast.rs create mode 100644 tardis/src/cluster/cluster_hashmap.rs create mode 100644 tardis/src/web/ws_processor/cluster_protocol.rs diff --git a/tardis/Cargo.toml b/tardis/Cargo.toml index 659c2120..f773973c 100644 --- a/tardis/Cargo.toml +++ b/tardis/Cargo.toml @@ -165,6 +165,7 @@ poem = { version = "1.3", features = [ "websocket", "multipart", "tempfile", + "session", ], optional = true } poem-grpc = { version = "0.2.22", optional = true } diff --git a/tardis/src/cache/cache_client.rs b/tardis/src/cache/cache_client.rs index c35da7e6..e36f898c 100644 --- a/tardis/src/cache/cache_client.rs +++ b/tardis/src/cache/cache_client.rs @@ -65,6 +65,10 @@ impl TardisCacheClient { self.pool.get().await.map_err(|error| RedisError::from((ErrorKind::IoError, "Get connection error", error.to_string()))) } + pub async fn exec() { + + } + pub async fn set(&self, key: &str, value: &str) -> RedisResult<()> { trace!("[Tardis.CacheClient] set, key:{}, value:{}", key, value); self.get_connection().await?.set(key, value).await @@ -158,6 +162,26 @@ impl TardisCacheClient { self.get_connection().await?.llen(key).await } + pub async fn lrem(&self, key: &str, count: isize, value: &str) -> RedisResult { + trace!("[Tardis.CacheClient] lrem, key:{}", key); + self.get_connection().await?.lrem(key, count, value).await + } + + pub async fn linsert_after(&self, key: &str, count: isize, value: &str) -> RedisResult { + trace!("[Tardis.CacheClient] linsert_after, key:{}", key); + self.get_connection().await?.linsert_after(key, count, value).await + } + + pub async fn linsert_before(&self, key: &str, count: isize, value: &str) -> RedisResult { + trace!("[Tardis.CacheClient] linsert_after, key:{}", key); + self.get_connection().await?.linsert_before(key, count, value).await + } + + pub async fn lset(&self, key: &str, count: isize, value: &str) -> RedisResult { + trace!("[Tardis.CacheClient] lset, key:{}", key); + self.get_connection().await?.lset(key, count, value).await + } + // hash operations pub async fn hget(&self, key: &str, field: &str) -> RedisResult> { diff --git a/tardis/src/cluster.rs b/tardis/src/cluster.rs index 1e0b2e39..85771616 100644 --- a/tardis/src/cluster.rs +++ b/tardis/src/cluster.rs @@ -4,3 +4,5 @@ pub mod cluster_receive; mod cluster_watch_by_cache; #[cfg(feature = "k8s")] mod cluster_watch_by_k8s; +pub mod cluster_broadcast; +pub mod cluster_hashmap; \ No newline at end of file diff --git a/tardis/src/cluster/cluster_broadcast.rs b/tardis/src/cluster/cluster_broadcast.rs new file mode 100644 index 00000000..a699391a --- /dev/null +++ b/tardis/src/cluster/cluster_broadcast.rs @@ -0,0 +1,110 @@ +use std::{ + borrow::Cow, + ops::Deref, + sync::{Arc, Weak}, +}; + +use async_trait::async_trait; +use serde_json::{Value, json}; +use tokio::sync::broadcast; +use tracing::subscriber; + +use crate::basic::result::TardisResult; + +use super::{ + cluster_processor::{subscribe, unsubscribe, ClusterEventTarget, TardisClusterMessageReq, TardisClusterSubscriber}, + cluster_publish::publish_event_no_response, +}; + + +pub struct ClusterBroadcastChannel +where + T: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned, +{ + pub ident: String, + pub local_broadcast_channel: broadcast::Sender, +} + +impl ClusterBroadcastChannel +where + T: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned, +{ + pub fn event_name(&self) -> String { + format!("tardis/broadcast/{}", self.ident) + } + pub fn send(&self, message: T) { + let _ = self.local_broadcast_channel.send(message.clone()); + let event = format!("tardis/broadcast/{}", self.ident); + tokio::spawn(async move { + if let Ok(json_value) = serde_json::to_value(message) { + let json = json_value; + let _ = publish_event_no_response(event, json, ClusterEventTarget::Broadcast).await; + } + }); + } + pub fn new(ident: impl Into, capacity: usize) -> Arc { + let sender = broadcast::Sender::new(capacity); + let cluster_chan = Arc::new(Self { + ident: ident.into(), + local_broadcast_channel: sender, + }); + + let subscriber = BroadcastChannelSubscriber { + channel: Arc::downgrade(&cluster_chan), + event_name: cluster_chan.event_name(), + }; + tokio::spawn(subscribe(subscriber)); + cluster_chan + } +} + +impl Drop for ClusterBroadcastChannel +where + T: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned, +{ + fn drop(&mut self) { + let event_name = self.event_name(); + tokio::spawn(async move { + unsubscribe(&event_name).await; + }); + } +} + +impl std::ops::Deref for ClusterBroadcastChannel +where + T: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned, +{ + type Target = broadcast::Sender; + + fn deref(&self) -> &Self::Target { + &self.local_broadcast_channel + } +} + +pub struct BroadcastChannelSubscriber +where + T: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned, +{ + event_name: String, + channel: Weak>, +} + +#[async_trait] +impl TardisClusterSubscriber for BroadcastChannelSubscriber +where + T: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned, +{ + fn event_name(&self) -> Cow<'static, str> { + self.event_name.to_string().into() + } + async fn subscribe(&self, message_req: TardisClusterMessageReq) -> TardisResult> { + if let Ok(message) = serde_json::from_value(message_req.msg) { + if let Some(chan) = self.channel.upgrade() { + let _ = chan.send(message); + } else { + unsubscribe(&self.event_name()).await; + } + } + Ok(None) + } +} diff --git a/tardis/src/cluster/cluster_hashmap.rs b/tardis/src/cluster/cluster_hashmap.rs new file mode 100644 index 00000000..9d90b70c --- /dev/null +++ b/tardis/src/cluster/cluster_hashmap.rs @@ -0,0 +1,164 @@ +use std::{ + borrow::Cow, + collections::HashMap, + sync::Arc, + time::{Duration, Instant}, +}; + +use crate::basic::{json::TardisJson, result::TardisResult}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use serde_json::Value; +use std::hash::Hash; +use tokio::sync::{Mutex, RwLock}; + +use super::{ + cluster_processor::{peer_count, ClusterEventTarget, TardisClusterMessageReq, TardisClusterSubscriber}, + cluster_publish::{publish_event_no_response, ClusterEvent}, + cluster_receive::listen::Stream, +}; + +// Cshm = ClusterStaticHashMap +#[derive(Debug, Clone)] +pub struct ClusterStaticHashMap { + pub map: Arc>>, + pub ident: &'static str, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +enum CshmEvent { + Insert(Vec<(K, V)>), + Remove { keys: Vec }, + Get { key: K }, +} + +impl ClusterStaticHashMap +where + K: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned + Hash + Eq, + V: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned, +{ + pub fn new(ident: &'static str) -> Self { + Self { + map: Arc::new(RwLock::new(HashMap::new())), + ident, + } + } + pub fn event_name(&self) -> String { + format!("tardis/hashmap/{ident}", ident = self.ident) + } + pub fn local(&self) -> &RwLock> { + &self.map + } + pub async fn insert(&self, key: K, value: V) -> TardisResult<()> { + self.map.write().await.insert(key.clone(), value.clone()); + let event = CshmEvent::::Insert(vec![(key, value)]); + let json = TardisJson.obj_to_json(&event)?; + dbg!(&json); + let _result = publish_event_no_response(self.event_name(), json, ClusterEventTarget::Broadcast).await; + Ok(()) + } + pub async fn batch_insert(&self, pairs: Vec<(K, V)>) -> TardisResult<()> { + { + let mut wg = self.map.write().await; + for (key, value) in pairs.iter() { + wg.insert(key.clone(), value.clone()); + } + } + let event = CshmEvent::::Insert(pairs); + let json = TardisJson.obj_to_json(&event)?; + let _result = publish_event_no_response(self.event_name(), json, ClusterEventTarget::Broadcast).await; + Ok(()) + } + pub async fn remove(&self, key: K) -> TardisResult<()> { + self.map.write().await.remove(&key); + let event = CshmEvent::::Remove { keys: vec![key] }; + let json = TardisJson.obj_to_json(&event)?; + let _result = publish_event_no_response(self.event_name(), json, ClusterEventTarget::Broadcast).await; + Ok(()) + } + pub async fn batch_remove(&self, keys: Vec) -> TardisResult<()> { + { + let mut wg = self.map.write().await; + for key in keys.iter() { + wg.remove(key); + } + } + let event = CshmEvent::::Remove { keys }; + let json = TardisJson.obj_to_json(&event)?; + let _result = publish_event_no_response(self.event_name(), json, ClusterEventTarget::Broadcast).await; + Ok(()) + } + pub async fn get(&self, key: K) -> TardisResult> { + if let Some(v) = self.map.read().await.get(&key) { + Ok(Some(v.clone())) + } else { + self.get_remote(key.clone()).await + } + } + async fn get_remote(&self, key: K) -> TardisResult> { + let peer_count = peer_count().await; + if peer_count == 0 { + return Ok(None); + } + let Ok(mut receiver) = ClusterEvent::new(self.event_name()) + .message(&CshmEvent::::Get { key }) + .expect("not valid json value") + .listener(Stream) + .target(ClusterEventTarget::Broadcast) + .publish() + .await + else { + return Ok(None); + }; + + let create_time = Instant::now(); + let mut count = 0; + while let Some(resp) = receiver.recv().await { + if let Ok(Some(v)) = TardisJson.json_to_obj::>(resp.msg) { + return Ok(Some(v)); + } + count += 1; + if count >= peer_count { + return Ok(None); + } + if create_time.elapsed() > Duration::from_secs(1) { + return Ok(None); + } + } + Ok(None) + } +} + +#[async_trait::async_trait] +impl TardisClusterSubscriber for ClusterStaticHashMap +where + K: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned + Hash + Eq, + V: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned, +{ + async fn subscribe(&self, message: TardisClusterMessageReq) -> TardisResult> { + let event: CshmEvent = TardisJson.json_to_obj(message.msg)?; + match event { + CshmEvent::Insert(pairs) => { + let mut wg = self.map.write().await; + for (key, value) in pairs { + wg.insert(key, value); + } + Ok(None) + } + CshmEvent::Remove { keys } => { + let mut wg = self.map.write().await; + for key in keys { + wg.remove(&key); + } + Ok(None) + } + CshmEvent::Get { key } => { + let rg = self.map.read().await; + let value = rg.get(&key); + Ok(Some(TardisJson.obj_to_json(&value)?)) + } + } + } + fn event_name(&self) -> Cow<'static, str> { + ClusterStaticHashMap::event_name(self).into() + } +} diff --git a/tardis/src/cluster/cluster_processor.rs b/tardis/src/cluster/cluster_processor.rs index 5d2ab115..021db2b8 100644 --- a/tardis/src/cluster/cluster_processor.rs +++ b/tardis/src/cluster/cluster_processor.rs @@ -22,6 +22,7 @@ use crate::config::config_dto::FrameworkConfig; use crate::tardis_static; use crate::web::web_server::TardisWebServer; use crate::web::ws_client::TardisWSClient; +use crate::web::ws_processor::cluster_protocol::Avatar; use crate::{basic::result::TardisResult, TardisFuns}; use async_trait::async_trait; @@ -46,6 +47,10 @@ pub async fn load_cache_nodes_info() -> HashMap usize { + cache_nodes().read().await.keys().filter(|k|matches!(k, ClusterRemoteNodeKey::NodeId(_))).count() +} + #[derive(Debug, Clone, Eq, Hash, PartialEq)] pub enum ClusterRemoteNodeKey { SocketAddr(SocketAddr), @@ -96,6 +101,7 @@ pub trait TardisClusterSubscriber: Send + Sync + 'static { async fn subscribe(&self, message_req: TardisClusterMessageReq) -> TardisResult>; } + #[derive(Debug, Clone, Default)] pub enum ClusterEventTarget { #[default] @@ -193,6 +199,11 @@ async fn init_node(cluster_server: &TardisWebServer, access_addr: SocketAddr) -> debug!("[Tardis.Cluster] Register default events"); subscribe(EventPing).await; + #[cfg(feature = "web-server")] + { + subscribe(Avatar).await; + } + info!("[Tardis.Cluster] Initialized node"); Ok(()) } @@ -279,6 +290,11 @@ pub async fn subscribe(subscriber: S) { subscribers().write().await.insert(event_name, Box::new(subscriber)); } +pub async fn unsubscribe(event_name: &str) { + info!("[Tardis.Cluster] [Server] unsubscribe event {event_name}"); + subscribers().write().await.remove(event_name); +} + #[derive(Deserialize, Serialize, Clone, Debug)] pub struct TardisClusterMessageReq { pub(crate) msg_id: String, @@ -354,9 +370,6 @@ impl TardisClusterNode {} use std::hash::Hash; -use super::cluster_receive::listen::{self, Listener}; -use super::cluster_receive::listen_reply; - #[derive(Debug, Clone)] struct ClusterAPI; diff --git a/tardis/src/cluster/cluster_publish.rs b/tardis/src/cluster/cluster_publish.rs index 0a532c02..2503d2aa 100644 --- a/tardis/src/cluster/cluster_publish.rs +++ b/tardis/src/cluster/cluster_publish.rs @@ -111,8 +111,14 @@ pub async fn publish_event_one_response( }) } -pub async fn publish_event_with_listener(event: impl Into>, message: Value, target: impl Into, listener: S) -> TardisResult { +pub async fn publish_event_with_listener( + event: impl Into>, + message: Value, + target: impl Into, + listener: S, +) -> TardisResult { let node_id = local_node_id().await.to_string(); + dbg!(&node_id); let event = event.into(); let target = target.into(); let target_debug = format!("{target:?}"); @@ -130,12 +136,6 @@ pub async fn publish_event_with_listener(event: impl Into vec![client], }; if nodes.is_empty() { - error!( - "[Tardis.Cluster] [Client] publish event {event} , message {message} , to {target} error: can't find any target node", - event = event, - message = message, - target = target_debug - ); return Err(TardisError::wrap( &format!( "[Tardis.Cluster] [Client] publish event {event} , message {message} , to {target} error: can't find any target node", diff --git a/tardis/src/cluster/cluster_receive.rs b/tardis/src/cluster/cluster_receive.rs index f0bcc1ab..fc7df064 100644 --- a/tardis/src/cluster/cluster_receive.rs +++ b/tardis/src/cluster/cluster_receive.rs @@ -132,7 +132,7 @@ pub mod listen { #[derive(Debug, Default, Clone, Copy)] /// send a message and receive all the responses until the receiver is dropped. - pub struct Stream {} + pub struct Stream; #[async_trait] impl Listener for Stream { diff --git a/tardis/src/web/ws_processor.rs b/tardis/src/web/ws_processor.rs index ac466b4c..623d2089 100644 --- a/tardis/src/web/ws_processor.rs +++ b/tardis/src/web/ws_processor.rs @@ -1,3 +1,5 @@ +pub mod cluster_protocol; + use std::sync::Arc; use std::{collections::HashMap, num::NonZeroUsize}; @@ -10,7 +12,8 @@ use tokio::sync::{broadcast::Sender, Mutex, RwLock}; use tracing::trace; use tracing::warn; -use crate::TardisFuns; +use crate::cluster::cluster_broadcast::ClusterBroadcastChannel; +use crate::{TardisFuns, tardis_static}; pub const WS_SYSTEM_EVENT_INFO: &str = "__sys_info__"; pub const WS_SYSTEM_EVENT_AVATAR_ADD: &str = "__sys_avatar_add__"; @@ -22,11 +25,13 @@ pub const WS_SYSTEM_EVENT_ERROR: &str = "__sys_error__"; #[allow(clippy::undocumented_unsafe_blocks)] pub const WS_SENDER_CACHE_SIZE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1000000) }; +tardis_static! { + // Websocket instance Id -> Avatars + ws_insts_mapping_avatars: Arc>>>; +} lazy_static! { // Single instance reply guard static ref REPLY_ONCE_GUARD: Arc>> = Arc::new(Mutex::new(LruCache::new(WS_SENDER_CACHE_SIZE))); - // Websocket instance Id -> Avatars - static ref WS_INSTS_MAPPING_AVATARS: Arc>>> = Arc::new(RwLock::new(HashMap::new())); } pub fn ws_echo(avatars: String, ext: HashMap, websocket: WebSocket, process_fun: PF, close_fun: CF) -> BoxWebSocketUpgraded @@ -69,19 +74,16 @@ where .boxed() } -fn ws_send_to_channel(send_msg: TardisWebsocketMgrMessage, inner_sender: &Sender) -> bool { +fn ws_send_to_channel(send_msg: TardisWebsocketMgrMessage, inner_sender: &TX) +where TX: WsBroadcastSender +{ inner_sender - .send(send_msg.clone()) - .map_err(|error| { - warn!( - "[Tardis.WebServer] WS message send to channel: {} to {:?} ignore {:?} failed: {error}", - send_msg.msg, send_msg.to_avatars, send_msg.ignore_avatars - ); - }) - .is_ok() + .send(send_msg.clone()); } -pub fn ws_send_error_to_channel(req_message: &str, error_message: &str, from_avatar: &str, from_inst_id: &str, inner_sender: &Sender) -> bool { +pub fn ws_send_error_to_channel(req_message: &str, error_message: &str, from_avatar: &str, from_inst_id: &str, inner_sender: &TX) +where TX: WsBroadcastSender +{ let send_msg = TardisWebsocketMgrMessage { id: TardisFuns::field.nanoid(), msg: json!(error_message), @@ -97,13 +99,54 @@ pub fn ws_send_error_to_channel(req_message: &str, error_message: &str, from_ava ws_send_to_channel(send_msg, inner_sender) } +pub trait WsBroadcastSender: Send + Sync + 'static { + fn subscribe(&self) -> tokio::sync::broadcast::Receiver; + // irresponsable to return error + fn send(&self, msg: TardisWebsocketMgrMessage); +} + +impl WsBroadcastSender for tokio::sync::broadcast::Sender { + fn subscribe(&self) -> tokio::sync::broadcast::Receiver { + self.subscribe() + } + + fn send(&self, msg: TardisWebsocketMgrMessage) { + if let Err(err) = self.send(msg) { + let msg = err.0; + warn!( + "[Tardis.WebServer] WS message send to channel: {} to {:?} ignore {:?} failed", + msg.msg, msg.to_avatars, msg.ignore_avatars + ); + } + } +} + +impl WsBroadcastSender for ClusterBroadcastChannel { + fn subscribe(&self) -> tokio::sync::broadcast::Receiver { + self.local_broadcast_channel.subscribe() + } + + fn send(&self, msg: TardisWebsocketMgrMessage) { + self.send(msg); + } +} +impl WsBroadcastSender for Arc> { + fn subscribe(&self) -> tokio::sync::broadcast::Receiver { + self.local_broadcast_channel.subscribe() + } + + fn send(&self, msg: TardisWebsocketMgrMessage) { + ClusterBroadcastChannel::send(&self, msg); + } +} + pub async fn ws_broadcast( avatars: Vec, mgr_node: bool, subscribe_mode: bool, ext: HashMap, websocket: WebSocket, - inner_sender: Sender, + inner_sender: impl WsBroadcastSender, process_fun: PF, close_fun: CF, ) -> BoxWebSocketUpgraded @@ -116,16 +159,17 @@ where let mut inner_receiver = inner_sender.subscribe(); websocket .on_upgrade(move |socket| async move { - // a connection is a instance + // corresponed to the current ws connection let inst_id = TardisFuns::field.nanoid(); let current_receive_inst_id = inst_id.clone(); { - WS_INSTS_MAPPING_AVATARS.write().await.insert(inst_id.clone(), avatars); + ws_insts_mapping_avatars().write().await.insert(inst_id.clone(), avatars); } let (mut ws_sink, mut ws_stream) = socket.split(); - let insts_in_send = WS_INSTS_MAPPING_AVATARS.clone(); + let insts_in_send = ws_insts_mapping_avatars().clone(); tokio::spawn(async move { + // message inbound while let Some(Ok(message)) = ws_stream.next().await { match message { Message::Text(text) => { @@ -190,9 +234,7 @@ where from_inst_id: if let Some(spec_inst_id) = req_msg.spec_inst_id { spec_inst_id } else { inst_id.clone() }, echo: true, }; - if !ws_send_to_channel(send_msg, &inner_sender) { - break; - } + ws_send_to_channel(send_msg, &inner_sender); continue; // For security reasons, adding an avatar needs to be handled by the management node } else if mgr_node && req_msg.event == Some(WS_SYSTEM_EVENT_AVATAR_ADD.to_string()) { @@ -250,9 +292,7 @@ where from_inst_id: if let Some(spec_inst_id) = req_msg.spec_inst_id { spec_inst_id } else { inst_id.clone() }, echo: false, }; - if !ws_send_to_channel(send_msg, &inner_sender) { - break; - } + ws_send_to_channel(send_msg, &inner_sender); } } } @@ -275,7 +315,7 @@ where }); let reply_once_guard = REPLY_ONCE_GUARD.clone(); - let insts_in_receive = WS_INSTS_MAPPING_AVATARS.clone(); + let insts_in_receive = ws_insts_mapping_avatars().clone(); tokio::spawn(async move { while let Ok(mgr_message) = inner_receiver.recv().await { diff --git a/tardis/src/web/ws_processor/cluster_protocol.rs b/tardis/src/web/ws_processor/cluster_protocol.rs new file mode 100644 index 00000000..410c245e --- /dev/null +++ b/tardis/src/web/ws_processor/cluster_protocol.rs @@ -0,0 +1,81 @@ +//! +//! +//! +//! +//! Protocol: +//! +//! 1. Route +//! 2. Avatar +//! 3. Forward + +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::{borrow::Cow, collections::HashMap}; + +use crate::{ + basic::result::TardisResult, + cluster::cluster_processor::{TardisClusterMessageReq, TardisClusterSubscriber}, +}; + +use super::ws_insts_mapping_avatars; + +pub const EVENT_AVATAR: &str = "tardis/avatar"; + +pub(crate) struct Avatar; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub(crate) enum AvatarMessage { + Sync { table: HashMap> }, + +} + +#[async_trait::async_trait] +impl TardisClusterSubscriber for Avatar { + fn event_name(&self) -> Cow<'static, str> { + EVENT_AVATAR.into() + } + + async fn subscribe(&self, message_req: TardisClusterMessageReq) -> TardisResult> { + // let from_node = message_req.req_node_id; + if let Ok(message) = serde_json::from_value(message_req.msg) { + match message { + AvatarMessage::Sync { table } => { + let mut routes = ws_insts_mapping_avatars().write().await; + for (k, v) in table { + routes.insert(k, v); + } + } + } + } + Ok(None) + } +} + +// pub(crate) struct Forward {} + +// #[derive(Debug, Clone, Serialize, Deserialize)] +// pub(crate) enum ForwardMessage { +// Forward { +// to_inst: String, +// message: String, +// }, +// } + +// #[async_trait::async_trait] +// impl TardisClusterSubscriber for Forward { +// fn event_name(&self) -> Cow<'static, str> { +// "cluster/forward".into() +// } + +// async fn subscribe(&self, message_req: TardisClusterMessageReq) -> TardisResult> { +// // let from_node = message_req.req_node_id; +// if let Ok(message) = serde_json::from_value(message_req.msg) { +// match message { +// ForwardMessage::Forward { to_inst, message } => { + +// } +// } +// } +// Ok(None) +// } +// } \ No newline at end of file From e863bb607f38903f010e67a7645675b8603cb963 Mon Sep 17 00:00:00 2001 From: 4t145 Date: Sat, 25 Nov 2023 16:01:05 +0800 Subject: [PATCH 4/6] cleanup useless codes. format, --- tardis/src/cache/cache_client.rs | 4 -- tardis/src/cluster.rs | 4 +- tardis/src/cluster/cluster_broadcast.rs | 5 +- tardis/src/cluster/cluster_hashmap.rs | 4 +- tardis/src/cluster/cluster_processor.rs | 8 +-- tardis/src/cluster/cluster_publish.rs | 5 -- tardis/src/cluster/cluster_receive.rs | 17 ++---- tardis/src/cluster/cluster_watch_by_cache.rs | 2 +- .../config/config_dto/component/web_server.rs | 8 +-- tardis/src/utils/tardis_component.rs | 2 +- tardis/src/utils/tardis_static.rs | 2 +- tardis/src/web/ws_client.rs | 9 +-- tardis/src/web/ws_processor.rs | 36 +++-------- .../src/web/ws_processor/cluster_protocol.rs | 61 +++++++------------ tardis/tests/test_cluster.rs | 1 - 15 files changed, 55 insertions(+), 113 deletions(-) diff --git a/tardis/src/cache/cache_client.rs b/tardis/src/cache/cache_client.rs index e36f898c..8afc0d0c 100644 --- a/tardis/src/cache/cache_client.rs +++ b/tardis/src/cache/cache_client.rs @@ -65,10 +65,6 @@ impl TardisCacheClient { self.pool.get().await.map_err(|error| RedisError::from((ErrorKind::IoError, "Get connection error", error.to_string()))) } - pub async fn exec() { - - } - pub async fn set(&self, key: &str, value: &str) -> RedisResult<()> { trace!("[Tardis.CacheClient] set, key:{}, value:{}", key, value); self.get_connection().await?.set(key, value).await diff --git a/tardis/src/cluster.rs b/tardis/src/cluster.rs index 85771616..43537cae 100644 --- a/tardis/src/cluster.rs +++ b/tardis/src/cluster.rs @@ -1,8 +1,8 @@ +pub mod cluster_broadcast; +pub mod cluster_hashmap; pub mod cluster_processor; pub mod cluster_publish; pub mod cluster_receive; mod cluster_watch_by_cache; #[cfg(feature = "k8s")] mod cluster_watch_by_k8s; -pub mod cluster_broadcast; -pub mod cluster_hashmap; \ No newline at end of file diff --git a/tardis/src/cluster/cluster_broadcast.rs b/tardis/src/cluster/cluster_broadcast.rs index a699391a..664b32af 100644 --- a/tardis/src/cluster/cluster_broadcast.rs +++ b/tardis/src/cluster/cluster_broadcast.rs @@ -1,13 +1,11 @@ use std::{ borrow::Cow, - ops::Deref, sync::{Arc, Weak}, }; use async_trait::async_trait; -use serde_json::{Value, json}; +use serde_json::Value; use tokio::sync::broadcast; -use tracing::subscriber; use crate::basic::result::TardisResult; @@ -16,7 +14,6 @@ use super::{ cluster_publish::publish_event_no_response, }; - pub struct ClusterBroadcastChannel where T: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned, diff --git a/tardis/src/cluster/cluster_hashmap.rs b/tardis/src/cluster/cluster_hashmap.rs index 9d90b70c..c8d338f1 100644 --- a/tardis/src/cluster/cluster_hashmap.rs +++ b/tardis/src/cluster/cluster_hashmap.rs @@ -6,10 +6,10 @@ use std::{ }; use crate::basic::{json::TardisJson, result::TardisResult}; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use serde::{Deserialize, Serialize}; use serde_json::Value; use std::hash::Hash; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::RwLock; use super::{ cluster_processor::{peer_count, ClusterEventTarget, TardisClusterMessageReq, TardisClusterSubscriber}, diff --git a/tardis/src/cluster/cluster_processor.rs b/tardis/src/cluster/cluster_processor.rs index 021db2b8..e0c1208d 100644 --- a/tardis/src/cluster/cluster_processor.rs +++ b/tardis/src/cluster/cluster_processor.rs @@ -4,16 +4,15 @@ use std::net::SocketAddr; use std::sync::{Arc, OnceLock}; use std::time::Duration; -use futures_util::future::join_all; use futures_util::{SinkExt, StreamExt}; use poem::web::websocket::{BoxWebSocketUpgraded, Message, WebSocket}; use serde::{Deserialize, Serialize}; use serde_json::Value; -use tokio::sync::{broadcast, mpsc, oneshot, RwLock}; +use tokio::sync::{mpsc, RwLock}; use tracing::{debug, error, info, trace, warn}; use crate::basic::error::TardisError; -use crate::cluster::cluster_publish::{do_publish_event, publish_event_one_response, ClusterEvent}; +use crate::cluster::cluster_publish::ClusterEvent; use crate::cluster::cluster_receive::init_response_dispatcher; use crate::cluster::cluster_watch_by_cache; #[cfg(feature = "k8s")] @@ -48,7 +47,7 @@ pub async fn load_cache_nodes_info() -> HashMap usize { - cache_nodes().read().await.keys().filter(|k|matches!(k, ClusterRemoteNodeKey::NodeId(_))).count() + cache_nodes().read().await.keys().filter(|k| matches!(k, ClusterRemoteNodeKey::NodeId(_))).count() } #[derive(Debug, Clone, Eq, Hash, PartialEq)] @@ -101,7 +100,6 @@ pub trait TardisClusterSubscriber: Send + Sync + 'static { async fn subscribe(&self, message_req: TardisClusterMessageReq) -> TardisResult>; } - #[derive(Debug, Clone, Default)] pub enum ClusterEventTarget { #[default] diff --git a/tardis/src/cluster/cluster_publish.rs b/tardis/src/cluster/cluster_publish.rs index 2503d2aa..a7f9d38e 100644 --- a/tardis/src/cluster/cluster_publish.rs +++ b/tardis/src/cluster/cluster_publish.rs @@ -13,12 +13,7 @@ use crate::{ use futures::future::join_all; use serde::Serialize; use serde_json::Value; -use tokio::sync::broadcast; use tracing::{error, trace}; -pub struct EventResponse { - message_id: String, - rx: broadcast::Receiver, -} #[derive(Debug, Clone)] pub struct ClusterEvent { diff --git a/tardis/src/cluster/cluster_receive.rs b/tardis/src/cluster/cluster_receive.rs index fc7df064..a5bde0d7 100644 --- a/tardis/src/cluster/cluster_receive.rs +++ b/tardis/src/cluster/cluster_receive.rs @@ -1,7 +1,6 @@ -use std::{collections::HashMap, pin::Pin}; +use std::collections::HashMap; -use async_trait::async_trait; -use tokio::sync::{broadcast, mpsc, oneshot, RwLock}; +use tokio::sync::{mpsc, RwLock}; use crate::tardis_static; @@ -62,16 +61,12 @@ pub(crate) fn init_response_dispatcher() -> mpsc::Sender, - #[builder(default = Some(String::from("ui")), setter(strip_option, into))] + #[builder(default = None, setter(strip_option, into))] /// ``OpenAPI`` UI path / 模``OpenAPI`` UI路径 pub ui_path: Option, - #[builder(default = Some(String::from("spec")), setter(strip_option, into))] + #[builder(default = None, setter(strip_option, into))] /// ``OpenAPI`` information path / ``OpenAPI`` 信息路径 pub spec_path: Option, #[builder(default = true)] diff --git a/tardis/src/utils/tardis_component.rs b/tardis/src/utils/tardis_component.rs index e377d488..84edf166 100644 --- a/tardis/src/utils/tardis_component.rs +++ b/tardis/src/utils/tardis_component.rs @@ -133,7 +133,7 @@ impl TardisComponentMapInner { } impl TardisComponentMapInner { - const LOCK_EXPECT: &str = "encounter an poisoned lock when trying to lock component"; + const LOCK_EXPECT: &'static str = "encounter an poisoned lock when trying to lock component"; pub fn new() -> Self { Self { diff --git a/tardis/src/utils/tardis_static.rs b/tardis/src/utils/tardis_static.rs index a5d3297c..46b3d16e 100644 --- a/tardis/src/utils/tardis_static.rs +++ b/tardis/src/utils/tardis_static.rs @@ -43,7 +43,7 @@ macro_rules! tardis_static { Some(val) => break val, None => { $crate::tokio::task::yield_now().await; } } - } + } } $crate::tardis_static!($($rest)*); } diff --git a/tardis/src/web/ws_client.rs b/tardis/src/web/ws_client.rs index 2d459b8b..c7f9451f 100644 --- a/tardis/src/web/ws_client.rs +++ b/tardis/src/web/ws_client.rs @@ -1,18 +1,15 @@ use std::pin::Pin; use std::sync::Arc; -#[cfg(feature = "future")] -use futures::stream::SplitSink; #[cfg(feature = "future")] use futures::{Future, SinkExt, StreamExt}; use native_tls::TlsConnector; use serde::de::Deserialize; use serde::Serialize; use serde_json::Value; -use tokio::sync::{mpsc, Notify, OwnedSemaphorePermit, RwLock, Semaphore}; -use tokio::{net::TcpStream, sync::watch}; -use tokio_tungstenite::tungstenite::{self, Error, Message}; -use tokio_tungstenite::{Connector, MaybeTlsStream, WebSocketStream}; +use tokio::sync::{mpsc, OwnedSemaphorePermit, RwLock, Semaphore}; +use tokio_tungstenite::tungstenite::Message; +use tokio_tungstenite::Connector; use tracing::{debug, info}; use tracing::{trace, warn}; use url::Url; diff --git a/tardis/src/web/ws_processor.rs b/tardis/src/web/ws_processor.rs index 623d2089..228b25fd 100644 --- a/tardis/src/web/ws_processor.rs +++ b/tardis/src/web/ws_processor.rs @@ -1,3 +1,4 @@ +#[cfg(feature = "cluster")] pub mod cluster_protocol; use std::sync::Arc; @@ -8,12 +9,11 @@ use lru::LruCache; use poem::web::websocket::{BoxWebSocketUpgraded, CloseCode, Message, WebSocket}; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; -use tokio::sync::{broadcast::Sender, Mutex, RwLock}; +use tokio::sync::{Mutex, RwLock}; use tracing::trace; use tracing::warn; -use crate::cluster::cluster_broadcast::ClusterBroadcastChannel; -use crate::{TardisFuns, tardis_static}; +use crate::{tardis_static, TardisFuns}; pub const WS_SYSTEM_EVENT_INFO: &str = "__sys_info__"; pub const WS_SYSTEM_EVENT_AVATAR_ADD: &str = "__sys_avatar_add__"; @@ -74,15 +74,16 @@ where .boxed() } -fn ws_send_to_channel(send_msg: TardisWebsocketMgrMessage, inner_sender: &TX) -where TX: WsBroadcastSender +fn ws_send_to_channel(send_msg: TardisWebsocketMgrMessage, inner_sender: &TX) +where + TX: WsBroadcastSender, { - inner_sender - .send(send_msg.clone()); + inner_sender.send(send_msg.clone()); } pub fn ws_send_error_to_channel(req_message: &str, error_message: &str, from_avatar: &str, from_inst_id: &str, inner_sender: &TX) -where TX: WsBroadcastSender +where + TX: WsBroadcastSender, { let send_msg = TardisWebsocketMgrMessage { id: TardisFuns::field.nanoid(), @@ -121,25 +122,6 @@ impl WsBroadcastSender for tokio::sync::broadcast::Sender { - fn subscribe(&self) -> tokio::sync::broadcast::Receiver { - self.local_broadcast_channel.subscribe() - } - - fn send(&self, msg: TardisWebsocketMgrMessage) { - self.send(msg); - } -} -impl WsBroadcastSender for Arc> { - fn subscribe(&self) -> tokio::sync::broadcast::Receiver { - self.local_broadcast_channel.subscribe() - } - - fn send(&self, msg: TardisWebsocketMgrMessage) { - ClusterBroadcastChannel::send(&self, msg); - } -} - pub async fn ws_broadcast( avatars: Vec, mgr_node: bool, diff --git a/tardis/src/web/ws_processor/cluster_protocol.rs b/tardis/src/web/ws_processor/cluster_protocol.rs index 410c245e..da6c81ad 100644 --- a/tardis/src/web/ws_processor/cluster_protocol.rs +++ b/tardis/src/web/ws_processor/cluster_protocol.rs @@ -1,23 +1,16 @@ -//! -//! -//! -//! -//! Protocol: -//! -//! 1. Route -//! 2. Avatar -//! 3. Forward - use serde::{Deserialize, Serialize}; use serde_json::Value; -use std::{borrow::Cow, collections::HashMap}; +use std::{borrow::Cow, collections::HashMap, sync::Arc}; use crate::{ basic::result::TardisResult, - cluster::cluster_processor::{TardisClusterMessageReq, TardisClusterSubscriber}, + cluster::{ + cluster_broadcast::ClusterBroadcastChannel, + cluster_processor::{TardisClusterMessageReq, TardisClusterSubscriber}, + }, }; -use super::ws_insts_mapping_avatars; +use super::{ws_insts_mapping_avatars, TardisWebsocketMgrMessage, WsBroadcastSender}; pub const EVENT_AVATAR: &str = "tardis/avatar"; @@ -26,7 +19,6 @@ pub(crate) struct Avatar; #[derive(Debug, Clone, Serialize, Deserialize)] pub(crate) enum AvatarMessage { Sync { table: HashMap> }, - } #[async_trait::async_trait] @@ -51,31 +43,22 @@ impl TardisClusterSubscriber for Avatar { } } -// pub(crate) struct Forward {} - -// #[derive(Debug, Clone, Serialize, Deserialize)] -// pub(crate) enum ForwardMessage { -// Forward { -// to_inst: String, -// message: String, -// }, -// } +impl WsBroadcastSender for ClusterBroadcastChannel { + fn subscribe(&self) -> tokio::sync::broadcast::Receiver { + self.local_broadcast_channel.subscribe() + } -// #[async_trait::async_trait] -// impl TardisClusterSubscriber for Forward { -// fn event_name(&self) -> Cow<'static, str> { -// "cluster/forward".into() -// } + fn send(&self, msg: TardisWebsocketMgrMessage) { + self.send(msg); + } +} -// async fn subscribe(&self, message_req: TardisClusterMessageReq) -> TardisResult> { -// // let from_node = message_req.req_node_id; -// if let Ok(message) = serde_json::from_value(message_req.msg) { -// match message { -// ForwardMessage::Forward { to_inst, message } => { +impl WsBroadcastSender for Arc> { + fn subscribe(&self) -> tokio::sync::broadcast::Receiver { + self.local_broadcast_channel.subscribe() + } -// } -// } -// } -// Ok(None) -// } -// } \ No newline at end of file + fn send(&self, msg: TardisWebsocketMgrMessage) { + ClusterBroadcastChannel::send(self, msg); + } +} diff --git a/tardis/tests/test_cluster.rs b/tardis/tests/test_cluster.rs index a0ff578f..eba78807 100644 --- a/tardis/tests/test_cluster.rs +++ b/tardis/tests/test_cluster.rs @@ -1,7 +1,6 @@ use std::{ borrow::Cow, env, - ffi::OsStr, path::Path, sync::atomic::{AtomicUsize, Ordering}, time::Duration, From 6cf61f9a09ca9ae9e76663e42d7b413f9dd24094 Mon Sep 17 00:00:00 2001 From: 4t145 Date: Sat, 25 Nov 2023 16:32:29 +0800 Subject: [PATCH 5/6] fix version conflict --- tardis/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tardis/Cargo.toml b/tardis/Cargo.toml index f773973c..00ecda92 100644 --- a/tardis/Cargo.toml +++ b/tardis/Cargo.toml @@ -205,7 +205,7 @@ rust-s3 = { version = "0.33", optional = true } anyhow = { version = "1.0", optional = true } # K8s -kube = { version = "0.86", features = ["runtime"], optional = true } +kube = { version = "0.87", features = ["runtime"], optional = true } k8s-openapi = { version = "0.20", features = ["earliest"], optional = true } # Test From 1bd54633059f6ecd22caec715f8917b5a19d58e9 Mon Sep 17 00:00:00 2001 From: 4t145 Date: Sat, 25 Nov 2023 17:38:04 +0800 Subject: [PATCH 6/6] resolve tardis grpc version --- tardis/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tardis/Cargo.toml b/tardis/Cargo.toml index 00ecda92..a8c4be82 100644 --- a/tardis/Cargo.toml +++ b/tardis/Cargo.toml @@ -167,7 +167,7 @@ poem = { version = "1.3", features = [ "tempfile", "session", ], optional = true } -poem-grpc = { version = "0.2.22", optional = true } +poem-grpc = { version = "=0.2.22", optional = true } # Web Client reqwest = { version = "0.11", features = [