Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance cluster and websocket client #90

Merged
merged 6 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion tardis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ poem = { version = "1.3", features = [
"websocket",
"multipart",
"tempfile",
"session",
], optional = true }
poem-grpc = { version = "0.2.22", optional = true }

Expand Down Expand Up @@ -216,11 +217,12 @@ testcontainers-modules = { version = "0.1.2", features = [

[dev-dependencies]
# Common
tokio = { version = "1", features = ["time", "rt", "macros", "sync"] }
tokio = { version = "1", features = ["time", "rt", "macros", "sync", "process"] }
criterion = { version = "0.5" }
poem-grpc-build = "0.2.22"
prost = "0.11"
strip-ansi-escapes = "0.2.0"
portpicker = "0.1.1"

[[test]]
name = "test_config"
Expand Down
2 changes: 1 addition & 1 deletion tardis/src/basic/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ where
L: SubscriberInitExt + 'static,
{
/// Initialize tardis tracing, this will set the global tardis tracing instance.
pub(crate) fn init(self) -> Arc<TardisTracing> {
pub fn init(self) -> Arc<TardisTracing> {
static INITIALIZED: Once = Once::new();
let configer_list = self.configers;
if INITIALIZED.is_completed() {
Expand Down
20 changes: 20 additions & 0 deletions tardis/src/cache/cache_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,26 @@ impl TardisCacheClient {
self.get_connection().await?.llen(key).await
}

pub async fn lrem(&self, key: &str, count: isize, value: &str) -> RedisResult<usize> {
trace!("[Tardis.CacheClient] lrem, key:{}", key);
self.get_connection().await?.lrem(key, count, value).await
}

pub async fn linsert_after(&self, key: &str, count: isize, value: &str) -> RedisResult<usize> {
trace!("[Tardis.CacheClient] linsert_after, key:{}", key);
self.get_connection().await?.linsert_after(key, count, value).await
}

pub async fn linsert_before(&self, key: &str, count: isize, value: &str) -> RedisResult<usize> {
trace!("[Tardis.CacheClient] linsert_after, key:{}", key);
self.get_connection().await?.linsert_before(key, count, value).await
}

pub async fn lset(&self, key: &str, count: isize, value: &str) -> RedisResult<usize> {
trace!("[Tardis.CacheClient] lset, key:{}", key);
self.get_connection().await?.lset(key, count, value).await
}

// hash operations

pub async fn hget(&self, key: &str, field: &str) -> RedisResult<Option<String>> {
Expand Down
4 changes: 4 additions & 0 deletions tardis/src/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
pub mod cluster_broadcast;
pub mod cluster_hashmap;
pub mod cluster_processor;
pub mod cluster_publish;
pub mod cluster_receive;
mod cluster_watch_by_cache;
#[cfg(feature = "k8s")]
mod cluster_watch_by_k8s;
107 changes: 107 additions & 0 deletions tardis/src/cluster/cluster_broadcast.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
use std::{
borrow::Cow,
sync::{Arc, Weak},
};

use async_trait::async_trait;
use serde_json::Value;
use tokio::sync::broadcast;

use crate::basic::result::TardisResult;

use super::{
cluster_processor::{subscribe, unsubscribe, ClusterEventTarget, TardisClusterMessageReq, TardisClusterSubscriber},
cluster_publish::publish_event_no_response,
};

pub struct ClusterBroadcastChannel<T>
where
T: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned,
{
pub ident: String,
pub local_broadcast_channel: broadcast::Sender<T>,
}

impl<T> ClusterBroadcastChannel<T>
where
T: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned,
{
pub fn event_name(&self) -> String {
format!("tardis/broadcast/{}", self.ident)
}
pub fn send(&self, message: T) {
let _ = self.local_broadcast_channel.send(message.clone());
let event = format!("tardis/broadcast/{}", self.ident);
tokio::spawn(async move {
if let Ok(json_value) = serde_json::to_value(message) {
let json = json_value;
let _ = publish_event_no_response(event, json, ClusterEventTarget::Broadcast).await;
}
});
}
pub fn new(ident: impl Into<String>, capacity: usize) -> Arc<Self> {
let sender = broadcast::Sender::new(capacity);
let cluster_chan = Arc::new(Self {
ident: ident.into(),
local_broadcast_channel: sender,
});

let subscriber = BroadcastChannelSubscriber {
channel: Arc::downgrade(&cluster_chan),
event_name: cluster_chan.event_name(),
};
tokio::spawn(subscribe(subscriber));
cluster_chan
}
}

impl<T> Drop for ClusterBroadcastChannel<T>
where
T: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned,
{
fn drop(&mut self) {
let event_name = self.event_name();
tokio::spawn(async move {
unsubscribe(&event_name).await;
});
}
}

impl<T> std::ops::Deref for ClusterBroadcastChannel<T>
where
T: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned,
{
type Target = broadcast::Sender<T>;

fn deref(&self) -> &Self::Target {
&self.local_broadcast_channel
}
}

pub struct BroadcastChannelSubscriber<T>
where
T: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned,
{
event_name: String,
channel: Weak<ClusterBroadcastChannel<T>>,
}

#[async_trait]
impl<T> TardisClusterSubscriber for BroadcastChannelSubscriber<T>
where
T: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned,
{
fn event_name(&self) -> Cow<'static, str> {
self.event_name.to_string().into()
}
async fn subscribe(&self, message_req: TardisClusterMessageReq) -> TardisResult<Option<Value>> {
if let Ok(message) = serde_json::from_value(message_req.msg) {
if let Some(chan) = self.channel.upgrade() {
let _ = chan.send(message);
} else {
unsubscribe(&self.event_name()).await;
}
}
Ok(None)
}
}
164 changes: 164 additions & 0 deletions tardis/src/cluster/cluster_hashmap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
use std::{
borrow::Cow,
collections::HashMap,
sync::Arc,
time::{Duration, Instant},
};

use crate::basic::{json::TardisJson, result::TardisResult};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::hash::Hash;
use tokio::sync::RwLock;

use super::{
cluster_processor::{peer_count, ClusterEventTarget, TardisClusterMessageReq, TardisClusterSubscriber},
cluster_publish::{publish_event_no_response, ClusterEvent},
cluster_receive::listen::Stream,
};

// Cshm = ClusterStaticHashMap
#[derive(Debug, Clone)]
pub struct ClusterStaticHashMap<K, V> {
pub map: Arc<RwLock<HashMap<K, V>>>,
pub ident: &'static str,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
enum CshmEvent<K, V> {
Insert(Vec<(K, V)>),
Remove { keys: Vec<K> },
Get { key: K },
}

impl<K, V> ClusterStaticHashMap<K, V>
where
K: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned + Hash + Eq,
V: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned,
{
pub fn new(ident: &'static str) -> Self {
Self {
map: Arc::new(RwLock::new(HashMap::new())),
ident,
}
}
pub fn event_name(&self) -> String {
format!("tardis/hashmap/{ident}", ident = self.ident)
}
pub fn local(&self) -> &RwLock<HashMap<K, V>> {
&self.map
}
pub async fn insert(&self, key: K, value: V) -> TardisResult<()> {
self.map.write().await.insert(key.clone(), value.clone());
let event = CshmEvent::<K, V>::Insert(vec![(key, value)]);
let json = TardisJson.obj_to_json(&event)?;
dbg!(&json);
let _result = publish_event_no_response(self.event_name(), json, ClusterEventTarget::Broadcast).await;
Ok(())
}
pub async fn batch_insert(&self, pairs: Vec<(K, V)>) -> TardisResult<()> {
{
let mut wg = self.map.write().await;
for (key, value) in pairs.iter() {
wg.insert(key.clone(), value.clone());
}
}
let event = CshmEvent::<K, V>::Insert(pairs);
let json = TardisJson.obj_to_json(&event)?;
let _result = publish_event_no_response(self.event_name(), json, ClusterEventTarget::Broadcast).await;
Ok(())
}
pub async fn remove(&self, key: K) -> TardisResult<()> {
self.map.write().await.remove(&key);
let event = CshmEvent::<K, V>::Remove { keys: vec![key] };
let json = TardisJson.obj_to_json(&event)?;
let _result = publish_event_no_response(self.event_name(), json, ClusterEventTarget::Broadcast).await;
Ok(())
}
pub async fn batch_remove(&self, keys: Vec<K>) -> TardisResult<()> {
{
let mut wg = self.map.write().await;
for key in keys.iter() {
wg.remove(key);
}
}
let event = CshmEvent::<K, V>::Remove { keys };
let json = TardisJson.obj_to_json(&event)?;
let _result = publish_event_no_response(self.event_name(), json, ClusterEventTarget::Broadcast).await;
Ok(())
}
pub async fn get(&self, key: K) -> TardisResult<Option<V>> {
if let Some(v) = self.map.read().await.get(&key) {
Ok(Some(v.clone()))
} else {
self.get_remote(key.clone()).await
}
}
async fn get_remote(&self, key: K) -> TardisResult<Option<V>> {
let peer_count = peer_count().await;
if peer_count == 0 {
return Ok(None);
}
let Ok(mut receiver) = ClusterEvent::new(self.event_name())
.message(&CshmEvent::<K, V>::Get { key })
.expect("not valid json value")
.listener(Stream)
.target(ClusterEventTarget::Broadcast)
.publish()
.await
else {
return Ok(None);
};

let create_time = Instant::now();
let mut count = 0;
while let Some(resp) = receiver.recv().await {
if let Ok(Some(v)) = TardisJson.json_to_obj::<Option<V>>(resp.msg) {
return Ok(Some(v));
}
count += 1;
if count >= peer_count {
return Ok(None);
}
if create_time.elapsed() > Duration::from_secs(1) {
return Ok(None);
}
}
Ok(None)
}
}

#[async_trait::async_trait]
impl<K, V> TardisClusterSubscriber for ClusterStaticHashMap<K, V>
where
K: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned + Hash + Eq,
V: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned,
{
async fn subscribe(&self, message: TardisClusterMessageReq) -> TardisResult<Option<Value>> {
let event: CshmEvent<K, V> = TardisJson.json_to_obj(message.msg)?;
match event {
CshmEvent::Insert(pairs) => {
let mut wg = self.map.write().await;
for (key, value) in pairs {
wg.insert(key, value);
}
Ok(None)
}
CshmEvent::Remove { keys } => {
let mut wg = self.map.write().await;
for key in keys {
wg.remove(&key);
}
Ok(None)
}
CshmEvent::Get { key } => {
let rg = self.map.read().await;
let value = rg.get(&key);
Ok(Some(TardisJson.obj_to_json(&value)?))
}
}
}
fn event_name(&self) -> Cow<'static, str> {
ClusterStaticHashMap::event_name(self).into()
}
}
Loading
Loading