diff --git a/CHANGELOG.md b/CHANGELOG.md index dce56e1..13244dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,15 @@ # 变更日志 | Change log +### 0.4.0 + +- 破坏性变更: 使 NamingService 和 ConfigService impl Send + Sync +- 破坏性变更: 默认 async,去掉 sync api,需要的话建议 `futures::executor::block_on(future_fn)` + +--- + +- Change: make NamingService and ConfigService Send + Sync +- Change: all async API; If you need sync, maybe `futures::executor::block_on(future_fn)` + ### 0.3.6 - 文档: 补充说明 `NamingService` 和 `ConfigService` 需要全局的生命周期 diff --git a/Cargo.toml b/Cargo.toml index 73675f4..3bf2fdc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,7 +43,6 @@ config = [] naming = [] tls = ["reqwest/default-tls"] auth-by-http = ["reqwest"] -async = [] [dependencies] nacos-macro = { version = "0.1.0", path = "nacos-macro" } diff --git a/README.md b/README.md index 3169c94..1570dc2 100644 --- a/README.md +++ b/README.md @@ -22,8 +22,8 @@ Add the dependency in `Cargo.toml`: ```toml [dependencies] -# If you need async API, which can be enabled via `features = ["async"]` -nacos-sdk = { version = "0.3", features = ["default"] } +# If you need sync API, maybe `futures::executor::block_on(future_fn)` +nacos-sdk = { version = "0.4", features = ["default"] } ``` ### Usage of Config @@ -32,7 +32,7 @@ nacos-sdk = { version = "0.3", features = ["default"] } // 因为它内部会初始化与服务端的长链接,后续的数据交互及变更订阅,都是实时地通过长链接告知客户端的。 let config_service = ConfigServiceBuilder::new( ClientProps::new() - .server_addr("0.0.0.0:8848") + .server_addr("127.0.0.1:8848") // Attention! "public" is "", it is recommended to customize the namespace with clear meaning. .namespace("") .app_name("simple_app"), @@ -43,7 +43,7 @@ nacos-sdk = { version = "0.3", features = ["default"] } .build()?; // example get a config - let config_resp = config_service.get_config("todo-data-id".to_string(), "todo-group".to_string()); + let config_resp = config_service.get_config("todo-data-id".to_string(), "todo-group".to_string()).await; match config_resp { Ok(config_resp) => tracing::info!("get the config {}", config_resp), Err(err) => tracing::error!("get the config {:?}", err), @@ -62,7 +62,7 @@ nacos-sdk = { version = "0.3", features = ["default"] } "todo-data-id".to_string(), "todo-group".to_string(), Arc::new(ExampleConfigChangeListener {}), - ); + ).await; match _listen { Ok(_) => tracing::info!("listening the config success"), Err(err) => tracing::error!("listen config error {:?}", err), @@ -75,7 +75,7 @@ nacos-sdk = { version = "0.3", features = ["default"] } // 因为它内部会初始化与服务端的长链接,后续的数据交互及变更订阅,都是实时地通过长链接告知客户端的。 let naming_service = NamingServiceBuilder::new( ClientProps::new() - .server_addr("0.0.0.0:8848") + .server_addr("127.0.0.1:8848") // Attention! "public" is "", it is recommended to customize the namespace with clear meaning. .namespace("") .app_name("simple_app"), @@ -100,7 +100,7 @@ nacos-sdk = { version = "0.3", features = ["default"] } Some(constants::DEFAULT_GROUP.to_string()), Vec::default(), subscriber, - ); + ).await; // example naming register instances let service_instance1 = ServiceInstance { @@ -112,7 +112,7 @@ nacos-sdk = { version = "0.3", features = ["default"] } "test-service".to_string(), Some(constants::DEFAULT_GROUP.to_string()), vec![service_instance1], - ); + ).await; ``` ### Props diff --git a/examples/simple_app.rs b/examples/simple_app.rs index 4cb88e8..f9b5dfa 100644 --- a/examples/simple_app.rs +++ b/examples/simple_app.rs @@ -7,8 +7,6 @@ use nacos_sdk::api::naming::{ }; use nacos_sdk::api::props::ClientProps; -const NACOS_ADDRESS: &str = "127.0.0.1:8848"; - /// enable https auth run with command: /// cargo run --example simple_app --features default,tls #[tokio::main] @@ -23,7 +21,7 @@ async fn main() -> Result<(), Box> { .init(); let client_props = ClientProps::new() - .server_addr(NACOS_ADDRESS) + .server_addr(constants::DEFAULT_SERVER_ADDR) // .remote_grpc_port(9838) // Attention! "public" is "", it is recommended to customize the namespace with clear meaning. .namespace("") @@ -38,17 +36,21 @@ async fn main() -> Result<(), Box> { let config_service = ConfigServiceBuilder::new(client_props.clone()) .enable_auth_plugin_http() // TODO You can choose not to enable auth .build()?; - let config_resp = config_service.get_config("todo-data-id".to_string(), "LOVE".to_string()); + let config_resp = config_service + .get_config("todo-data-id".to_string(), "LOVE".to_string()) + .await; match config_resp { Ok(config_resp) => tracing::info!("get the config {}", config_resp), Err(err) => tracing::error!("get the config {:?}", err), } - let _listen = config_service.add_listener( - "todo-data-id".to_string(), - "LOVE".to_string(), - std::sync::Arc::new(SimpleConfigChangeListener {}), - ); + let _listen = config_service + .add_listener( + "todo-data-id".to_string(), + "LOVE".to_string(), + std::sync::Arc::new(SimpleConfigChangeListener {}), + ) + .await; match _listen { Ok(_) => tracing::info!("listening the config success"), Err(err) => tracing::error!("listen config error {:?}", err), @@ -62,31 +64,37 @@ async fn main() -> Result<(), Box> { .build()?; let listener = std::sync::Arc::new(SimpleInstanceChangeListener); - let _subscribe_ret = naming_service.subscribe( - "test-service".to_string(), - Some(constants::DEFAULT_GROUP.to_string()), - Vec::default(), - listener, - ); + let _subscribe_ret = naming_service + .subscribe( + "test-service".to_string(), + Some(constants::DEFAULT_GROUP.to_string()), + Vec::default(), + listener, + ) + .await; let service_instance1 = ServiceInstance { ip: "127.0.0.1".to_string(), port: 9090, ..Default::default() }; - let _register_instance_ret = naming_service.batch_register_instance( - "test-service".to_string(), - Some(constants::DEFAULT_GROUP.to_string()), - vec![service_instance1], - ); + let _register_instance_ret = naming_service + .batch_register_instance( + "test-service".to_string(), + Some(constants::DEFAULT_GROUP.to_string()), + vec![service_instance1], + ) + .await; tokio::time::sleep(tokio::time::Duration::from_millis(666)).await; - let instances_ret = naming_service.get_all_instances( - "test-service".to_string(), - Some(constants::DEFAULT_GROUP.to_string()), - Vec::default(), - false, - ); + let instances_ret = naming_service + .get_all_instances( + "test-service".to_string(), + Some(constants::DEFAULT_GROUP.to_string()), + Vec::default(), + false, + ) + .await; match instances_ret { Ok(instances) => tracing::info!("get_all_instances {:?}", instances), Err(err) => tracing::error!("naming get_all_instances error {:?}", err), diff --git a/src/api/config.rs b/src/api/config.rs index da897ed..a855876 100644 --- a/src/api/config.rs +++ b/src/api/config.rs @@ -10,7 +10,7 @@ use crate::api::{error, plugin, props}; /// ```ignore /// let mut config_service = nacos_sdk::api::config::ConfigServiceBuilder::new( /// nacos_sdk::api::props::ClientProps::new() -/// .server_addr("0.0.0.0:8848") +/// .server_addr("127.0.0.1:8848") /// // Attention! "public" is "", it is recommended to customize the namespace with clear meaning. /// .namespace("") /// .app_name("todo-your-app-name"), @@ -18,74 +18,6 @@ use crate::api::{error, plugin, props}; /// .build()?; /// ``` #[doc(alias("config", "sdk", "api"))] -#[cfg(not(feature = "async"))] -pub trait ConfigService { - /// Get config, return the content. - /// - /// Attention to [`error::Error::ConfigNotFound`], [`error::Error::ConfigQueryConflict`] - fn get_config(&self, data_id: String, group: String) -> error::Result; - - /// Publish config, return true/false. - fn publish_config( - &self, - data_id: String, - group: String, - content: String, - content_type: Option, - ) -> error::Result; - - /// Cas publish config with cas_md5 (prev content's md5), return true/false. - fn publish_config_cas( - &self, - data_id: String, - group: String, - content: String, - content_type: Option, - cas_md5: String, - ) -> error::Result; - - /// Beta publish config, return true/false. - fn publish_config_beta( - &self, - data_id: String, - group: String, - content: String, - content_type: Option, - beta_ips: String, - ) -> error::Result; - - /// Publish config with params (see keys [`constants::*`]), return true/false. - fn publish_config_param( - &self, - data_id: String, - group: String, - content: String, - content_type: Option, - cas_md5: Option, - params: HashMap, - ) -> error::Result; - - /// Remove config, return true/false. - fn remove_config(&self, data_id: String, group: String) -> error::Result; - - /// Listen the config change. - fn add_listener( - &self, - data_id: String, - group: String, - listener: Arc, - ) -> error::Result<()>; - - /// Remove a Listener. - fn remove_listener( - &self, - data_id: String, - group: String, - listener: Arc, - ) -> error::Result<()>; -} - -#[cfg(feature = "async")] #[async_trait::async_trait] pub trait ConfigService: Send + Sync { /// Get config, return the content. @@ -256,7 +188,7 @@ pub mod constants { /// ```ignore /// let mut config_service = nacos_sdk::api::config::ConfigServiceBuilder::new( /// nacos_sdk::api::props::ClientProps::new() -/// .server_addr("0.0.0.0:8848") +/// .server_addr("127.0.0.1:8848") /// // Attention! "public" is "", it is recommended to customize the namespace with clear meaning. /// .namespace("") /// .app_name("todo-your-app-name"), @@ -368,21 +300,26 @@ mod tests { "test_api_config_service".to_string(), Some("text".to_string()), ) + .await .unwrap(); // sleep for config sync in server sleep(Duration::from_millis(111)).await; - let config = config_service.get_config(data_id.clone(), group.clone()); + let config = config_service + .get_config(data_id.clone(), group.clone()) + .await; match config { Ok(config) => tracing::info!("get the config {}", config), Err(err) => tracing::error!("get the config {:?}", err), } - let _listen = config_service.add_listener( - data_id.clone(), - group.clone(), - std::sync::Arc::new(TestConfigChangeListener {}), - ); + let _listen = config_service + .add_listener( + data_id.clone(), + group.clone(), + std::sync::Arc::new(TestConfigChangeListener {}), + ) + .await; match _listen { Ok(_) => tracing::info!("listening the config success"), Err(err) => tracing::error!("listen config error {:?}", err), @@ -396,22 +333,26 @@ mod tests { "test_api_config_service_for_listener".to_string(), Some("text".to_string()), ) + .await .unwrap(); // example get a config not exit - let config_resp = - config_service.get_config("todo-data-id".to_string(), "todo-group".to_string()); + let config_resp = config_service + .get_config("todo-data-id".to_string(), "todo-group".to_string()) + .await; match config_resp { Ok(config_resp) => tracing::info!("get the config {}", config_resp), Err(err) => tracing::error!("get the config {:?}", err), } // example add a listener with config not exit - let _listen = config_service.add_listener( - "todo-data-id".to_string(), - "todo-group".to_string(), - std::sync::Arc::new(TestConfigChangeListener {}), - ); + let _listen = config_service + .add_listener( + "todo-data-id".to_string(), + "todo-group".to_string(), + std::sync::Arc::new(TestConfigChangeListener {}), + ) + .await; match _listen { Ok(_) => tracing::info!("listening the config success"), Err(err) => tracing::error!("listen config error {:?}", err), @@ -431,8 +372,9 @@ mod tests { let config_service = ConfigServiceBuilder::default().build().unwrap(); // remove a config not exit - let remove_resp = - config_service.remove_config("todo-data-id".to_string(), "todo-group".to_string()); + let remove_resp = config_service + .remove_config("todo-data-id".to_string(), "todo-group".to_string()) + .await; match remove_resp { Ok(result) => tracing::info!("remove a config not exit: {}", result), Err(err) => tracing::error!("remove a config not exit: {:?}", err), @@ -456,6 +398,7 @@ mod tests { "test_api_config_service_publish_config".to_string(), Some("text".to_string()), ) + .await .unwrap(); tracing::info!("publish a config: {}", publish_resp); assert_eq!(true, publish_resp); @@ -485,6 +428,7 @@ mod tests { None, params, ) + .await .unwrap(); tracing::info!("publish a config with param: {}", publish_resp); assert_eq!(true, publish_resp); @@ -508,6 +452,7 @@ mod tests { None, "127.0.0.1,192.168.0.1".to_string(), ) + .await .unwrap(); tracing::info!("publish a config with beta: {}", publish_resp); assert_eq!(true, publish_resp); @@ -532,6 +477,7 @@ mod tests { "test_api_config_service_publish_config_cas".to_string(), None, ) + .await .unwrap(); assert_eq!(true, publish_resp); @@ -541,6 +487,7 @@ mod tests { // get a config let config_resp = config_service .get_config(data_id.clone(), group.clone()) + .await .unwrap(); // publish a config with cas @@ -554,19 +501,22 @@ mod tests { None, config_resp.md5().to_string(), ) + .await .unwrap(); tracing::info!("publish a config with cas: {}", publish_resp); assert_eq!(true, publish_resp); // publish a config with cas md5 not right let content_cas_md5_not_right = "test_api_config_service_publish_config_cas_md5_not_right"; - let publish_resp = config_service.publish_config_cas( - data_id.clone(), - group.clone(), - content_cas_md5_not_right.to_string(), - None, - config_resp.md5().to_string(), - ); + let publish_resp = config_service + .publish_config_cas( + data_id.clone(), + group.clone(), + content_cas_md5_not_right.to_string(), + None, + config_resp.md5().to_string(), + ) + .await; match publish_resp { Ok(result) => tracing::info!("publish a config with cas: {}", result), Err(err) => tracing::error!("publish a config with cas: {:?}", err), @@ -575,6 +525,7 @@ mod tests { let config_resp = config_service .get_config(data_id.clone(), group.clone()) + .await .unwrap(); assert_ne!(content_cas_md5_not_right, config_resp.content().as_str()); assert_eq!(content_cas_md5.as_str(), config_resp.content().as_str()); diff --git a/src/api/constants.rs b/src/api/constants.rs index 72d6a51..872a6bf 100644 --- a/src/api/constants.rs +++ b/src/api/constants.rs @@ -1,4 +1,4 @@ -pub const DEFAULT_SERVER_ADDR: &str = "0.0.0.0:8848"; +pub const DEFAULT_SERVER_ADDR: &str = "127.0.0.1:8848"; pub const DEFAULT_SERVER_PORT: u32 = 8848; diff --git a/src/api/naming.rs b/src/api/naming.rs index f13ee0d..9ec89fa 100644 --- a/src/api/naming.rs +++ b/src/api/naming.rs @@ -135,7 +135,7 @@ pub trait NamingEventListener: Send + Sync + 'static { /// ```ignore /// let mut naming_service = nacos_sdk::api::naming::NamingServiceBuilder::new( /// nacos_sdk::api::props::ClientProps::new() -/// .server_addr("0.0.0.0:8848") +/// .server_addr("127.0.0.1:8848") /// // Attention! "public" is "", it is recommended to customize the namespace with clear meaning. /// .namespace("") /// .app_name("todo-your-app-name"), @@ -143,79 +143,6 @@ pub trait NamingEventListener: Send + Sync + 'static { /// .build()?; /// ``` #[doc(alias("naming", "sdk", "api"))] -#[cfg(not(feature = "async"))] -pub trait NamingService { - fn register_instance( - &self, - service_name: String, - group_name: Option, - service_instance: ServiceInstance, - ) -> Result<()>; - - fn deregister_instance( - &self, - service_name: String, - group_name: Option, - service_instance: ServiceInstance, - ) -> Result<()>; - - fn batch_register_instance( - &self, - service_name: String, - group_name: Option, - service_instances: Vec, - ) -> Result<()>; - - fn get_all_instances( - &self, - service_name: String, - group_name: Option, - clusters: Vec, - subscribe: bool, - ) -> Result>; - - fn select_instances( - &self, - service_name: String, - group_name: Option, - clusters: Vec, - subscribe: bool, - healthy: bool, - ) -> Result>; - - fn select_one_healthy_instance( - &self, - service_name: String, - group_name: Option, - clusters: Vec, - subscribe: bool, - ) -> Result; - - fn get_service_list( - &self, - page_no: i32, - page_size: i32, - group_name: Option, - ) -> Result<(Vec, i32)>; - - fn subscribe( - &self, - service_name: String, - group_name: Option, - clusters: Vec, - event_listener: Arc, - ) -> Result<()>; - - fn unsubscribe( - &self, - service_name: String, - group_name: Option, - clusters: Vec, - event_listener: Arc, - ) -> Result<()>; -} - -#[cfg(feature = "async")] #[async_trait::async_trait] pub trait NamingService: Send + Sync { async fn register_instance( @@ -295,7 +222,7 @@ pub trait NamingService: Send + Sync { /// ```ignore /// let mut naming_service = nacos_sdk::api::naming::NamingServiceBuilder::new( /// nacos_sdk::api::props::ClientProps::new() -/// .server_addr("0.0.0.0:8848") +/// .server_addr("127.0.0.1:8848") /// // Attention! "public" is "", it is recommended to customize the namespace with clear meaning. /// .namespace("") /// .app_name("todo-your-app-name"), diff --git a/src/api/plugin/auth/auth_by_http.rs b/src/api/plugin/auth/auth_by_http.rs index b53273f..7d3ffcd 100644 --- a/src/api/plugin/auth/auth_by_http.rs +++ b/src/api/plugin/auth/auth_by_http.rs @@ -122,7 +122,7 @@ mod tests { .init(); let http_auth_plugin = HttpLoginAuthPlugin::default(); - let server_list = vec!["0.0.0.0:8848".to_string()]; + let server_list = vec!["127.0.0.1:8848".to_string()]; let auth_context = AuthContext::default() .add_param(crate::api::plugin::USERNAME, "nacos") diff --git a/src/config/mod.rs b/src/config/mod.rs index d7ff80f..0227851 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -50,115 +50,6 @@ impl NacosConfigService { } } -#[cfg(not(feature = "async"))] -impl ConfigService for NacosConfigService { - #[instrument(fields(client_id = &self.client_id, group = group, data_id = data_id), skip_all)] - fn get_config( - &self, - data_id: String, - group: String, - ) -> crate::api::error::Result { - let future = self.client_worker.get_config(data_id, group); - futures::executor::block_on(future) - } - - #[instrument(fields(client_id = &self.client_id, group = group, data_id = data_id), skip_all)] - fn publish_config( - &self, - data_id: String, - group: String, - content: String, - content_type: Option, - ) -> crate::api::error::Result { - let future = self - .client_worker - .publish_config(data_id, group, content, content_type); - futures::executor::block_on(future) - } - - #[instrument(fields(client_id = &self.client_id, group = group, data_id = data_id), skip_all)] - fn publish_config_cas( - &self, - data_id: String, - group: String, - content: String, - content_type: Option, - cas_md5: String, - ) -> crate::api::error::Result { - let future = - self.client_worker - .publish_config_cas(data_id, group, content, content_type, cas_md5); - futures::executor::block_on(future) - } - - #[instrument(fields(client_id = &self.client_id, group = group, data_id = data_id), skip_all)] - fn publish_config_beta( - &self, - data_id: String, - group: String, - content: String, - content_type: Option, - beta_ips: String, - ) -> crate::api::error::Result { - let future = - self.client_worker - .publish_config_beta(data_id, group, content, content_type, beta_ips); - futures::executor::block_on(future) - } - - #[instrument(fields(client_id = &self.client_id, group = group, data_id = data_id), skip_all)] - fn publish_config_param( - &self, - data_id: String, - group: String, - content: String, - content_type: Option, - cas_md5: Option, - params: std::collections::HashMap, - ) -> crate::api::error::Result { - let future = self.client_worker.publish_config_param( - data_id, - group, - content, - content_type, - cas_md5, - params, - ); - futures::executor::block_on(future) - } - - #[instrument(fields(client_id = &self.client_id, group = group, data_id = data_id), skip_all)] - fn remove_config(&self, data_id: String, group: String) -> crate::api::error::Result { - let future = self.client_worker.remove_config(data_id, group); - futures::executor::block_on(future) - } - - #[instrument(fields(client_id = &self.client_id, group = group, data_id = data_id), skip_all)] - fn add_listener( - &self, - data_id: String, - group: String, - listener: std::sync::Arc, - ) -> crate::api::error::Result<()> { - let future = self.client_worker.add_listener(data_id, group, listener); - futures::executor::block_on(future); - Ok(()) - } - - #[instrument(fields(client_id = &self.client_id, group = group, data_id = data_id), skip_all)] - fn remove_listener( - &self, - data_id: String, - group: String, - listener: std::sync::Arc, - ) -> crate::api::error::Result<()> { - let future = self.client_worker.remove_listener(data_id, group, listener); - futures::executor::block_on(future); - Ok(()) - } -} - -#[cfg(feature = "async")] #[async_trait::async_trait] impl ConfigService for NacosConfigService { #[instrument(fields(client_id = &self.client_id, group = group, data_id = data_id), skip_all)] diff --git a/src/lib.rs b/src/lib.rs index 8876406..80efc80 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,10 +25,10 @@ //! ## Add Dependency //! //! Add the dependency in `Cargo.toml`: -//! - If you need async API, which can be enabled via `features = ["async"]` +//! - If you need sync API, maybe `futures::executor::block_on(future_fn)` //! ```toml //! [dependencies] -//! nacos-sdk = { version = "0.3", features = ["default"] } +//! nacos-sdk = { version = "0.4", features = ["default"] } //! ``` //! //! ## General Configurations and Initialization @@ -40,7 +40,7 @@ //! ```ignore //! let config_service = nacos_sdk::api::config::ConfigServiceBuilder::new( //! nacos_sdk::api::props::ClientProps::new() -//! .server_addr("0.0.0.0:8848") +//! .server_addr("127.0.0.1:8848") //! // Attention! "public" is "", it is recommended to customize the namespace with clear meaning. //! .namespace("") //! .app_name("todo-your-app-name"), @@ -53,7 +53,7 @@ //! ```ignore //! let naming_service = nacos_sdk::api::naming::NamingServiceBuilder::new( //! nacos_sdk::api::props::ClientProps::new() -//! .server_addr("0.0.0.0:8848") +//! .server_addr("127.0.0.1:8848") //! // Attention! "public" is "", it is recommended to customize the namespace with clear meaning. //! .namespace("") //! .app_name("todo-your-app-name"), diff --git a/src/naming/mod.rs b/src/naming/mod.rs index d0a9274..f2b3d06 100644 --- a/src/naming/mod.rs +++ b/src/naming/mod.rs @@ -643,142 +643,24 @@ impl NacosNamingService { } } -#[cfg(not(feature = "async"))] +#[async_trait::async_trait] impl NamingService for NacosNamingService { #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] - fn deregister_instance( - &self, - service_name: String, - group_name: Option, - service_instance: ServiceInstance, - ) -> Result<()> { - if service_instance.ephemeral { - let future = self.deregister_ephemeral_instance_async( - service_name, - group_name, - service_instance, - ); - futures::executor::block_on(future) - } else { - let future = self.deregister_persistent_instance_async( - service_name, - group_name, - service_instance, - ); - futures::executor::block_on(future) - } - } - - #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] - fn batch_register_instance( - &self, - service_name: String, - group_name: Option, - service_instances: Vec, - ) -> Result<()> { - let future = - self.batch_register_instance_async(service_name, group_name, service_instances); - futures::executor::block_on(future) - } - - #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] - fn get_all_instances( - &self, - service_name: String, - group_name: Option, - clusters: Vec, - subscribe: bool, - ) -> Result> { - let future = self.get_all_instances_async(service_name, group_name, clusters, subscribe); - futures::executor::block_on(future) - } - - #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] - fn select_one_healthy_instance( - &self, - service_name: String, - group_name: Option, - clusters: Vec, - subscribe: bool, - ) -> Result { - let future = - self.select_one_healthy_instance_async(service_name, group_name, clusters, subscribe); - futures::executor::block_on(future) - } - - #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] - fn get_service_list( - &self, - page_no: i32, - page_size: i32, - group_name: Option, - ) -> Result<(Vec, i32)> { - let future = self.get_service_list_async(page_no, page_size, group_name); - futures::executor::block_on(future) - } - - #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] - fn subscribe( - &self, - service_name: String, - group_name: Option, - clusters: Vec, - event_listener: Arc, - ) -> Result<()> { - let future = self.subscribe_async(service_name, group_name, clusters, Some(event_listener)); - let _ = futures::executor::block_on(future); - Ok(()) - } - - #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] - fn unsubscribe( - &self, - service_name: String, - group_name: Option, - clusters: Vec, - event_listener: Arc, - ) -> Result<()> { - let future = - self.unsubscribe_async(service_name, group_name, clusters, Some(event_listener)); - futures::executor::block_on(future) - } - - #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] - fn register_instance( + async fn register_instance( &self, service_name: String, group_name: Option, service_instance: ServiceInstance, ) -> Result<()> { if service_instance.ephemeral { - let future = - self.register_ephemeral_instance_async(service_name, group_name, service_instance); - futures::executor::block_on(future) + self.register_ephemeral_instance_async(service_name, group_name, service_instance) + .await } else { - let future = - self.register_persistent_instance_async(service_name, group_name, service_instance); - futures::executor::block_on(future) + self.register_persistent_instance_async(service_name, group_name, service_instance) + .await } } - #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] - fn select_instances( - &self, - service_name: String, - group_name: Option, - clusters: Vec, - subscribe: bool, - healthy: bool, - ) -> Result> { - let future = - self.select_instances_async(service_name, group_name, clusters, subscribe, healthy); - futures::executor::block_on(future) - } -} - -#[cfg(feature = "async")] -#[async_trait::async_trait] -impl NamingService for NacosNamingService { #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] async fn deregister_instance( &self, @@ -818,6 +700,19 @@ impl NamingService for NacosNamingService { .await } + #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] + async fn select_instances( + &self, + service_name: String, + group_name: Option, + clusters: Vec, + subscribe: bool, + healthy: bool, + ) -> Result> { + self.select_instances_async(service_name, group_name, clusters, subscribe, healthy) + .await + } + #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] async fn select_one_healthy_instance( &self, @@ -866,42 +761,13 @@ impl NamingService for NacosNamingService { self.unsubscribe_async(service_name, group_name, clusters, Some(event_listener)) .await } - - #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] - async fn register_instance( - &self, - service_name: String, - group_name: Option, - service_instance: ServiceInstance, - ) -> Result<()> { - if service_instance.ephemeral { - self.register_ephemeral_instance_async(service_name, group_name, service_instance) - .await - } else { - self.register_persistent_instance_async(service_name, group_name, service_instance) - .await - } - } - - #[instrument(fields(client_id = &self.client_id, group = group_name), skip_all)] - async fn select_instances( - &self, - service_name: String, - group_name: Option, - clusters: Vec, - subscribe: bool, - healthy: bool, - ) -> Result> { - self.select_instances_async(service_name, group_name, clusters, subscribe, healthy) - .await - } } #[cfg(test)] pub(crate) mod tests { use core::time; - use std::{collections::HashMap, thread}; + use std::collections::HashMap; use tracing::{info, metadata::LevelFilter}; @@ -909,9 +775,9 @@ pub(crate) mod tests { use super::*; - #[test] + #[tokio::test] #[ignore] - fn test_ephemeral_register_service() -> Result<()> { + async fn test_ephemeral_register_service() { tracing_subscriber::fmt() .with_thread_names(true) .with_file(true) @@ -927,7 +793,8 @@ pub(crate) mod tests { metadata.insert("netType".to_string(), "external".to_string()); metadata.insert("version".to_string(), "2.0".to_string()); - let naming_service = NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default()))?; + let naming_service = + NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default())).unwrap(); let service_instance = ServiceInstance { ip: "127.0.0.1".to_string(), port: 9090, @@ -935,21 +802,18 @@ pub(crate) mod tests { ..Default::default() }; - let ret = naming_service.register_instance( - "test-ephemeral-service".to_string(), - None, - service_instance, - ); + let ret = naming_service + .register_instance("test-ephemeral-service".to_string(), None, service_instance) + .await; info!("response. {ret:?}"); - let ten_millis = time::Duration::from_secs(100); - thread::sleep(ten_millis); - Ok(()) + let ten_millis = time::Duration::from_secs(1); + tokio::time::sleep(ten_millis).await; } - #[test] + #[tokio::test] #[ignore] - fn test_persistent_register_service() -> Result<()> { + async fn test_persistent_register_service() { tracing_subscriber::fmt() .with_thread_names(true) .with_file(true) @@ -965,7 +829,8 @@ pub(crate) mod tests { metadata.insert("netType".to_string(), "external".to_string()); metadata.insert("version".to_string(), "2.0".to_string()); - let naming_service = NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default()))?; + let naming_service = + NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default())).unwrap(); let service_instance = ServiceInstance { ip: "127.0.0.1".to_string(), port: 8848, @@ -974,21 +839,22 @@ pub(crate) mod tests { ..Default::default() }; - let ret = naming_service.register_instance( - "test-persistent-service".to_string(), - None, - service_instance, - ); + let ret = naming_service + .register_instance( + "test-persistent-service".to_string(), + None, + service_instance, + ) + .await; info!("response. {ret:?}"); - let ten_millis = time::Duration::from_secs(100); - thread::sleep(ten_millis); - Ok(()) + let ten_millis = time::Duration::from_secs(1); + tokio::time::sleep(ten_millis).await; } - #[test] + #[tokio::test] #[ignore] - fn test_register_and_deregister_persistent_service() -> Result<()> { + async fn test_register_and_deregister_persistent_service() { tracing_subscriber::fmt() .with_thread_names(true) .with_file(true) @@ -1004,7 +870,8 @@ pub(crate) mod tests { metadata.insert("netType".to_string(), "external".to_string()); metadata.insert("version".to_string(), "2.0".to_string()); - let naming_service = NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default()))?; + let naming_service = + NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default())).unwrap(); let service_instance = ServiceInstance { ip: "127.0.0.1".to_string(), port: 8848, @@ -1013,31 +880,34 @@ pub(crate) mod tests { ..Default::default() }; - let ret = naming_service.register_instance( - "test-persistent-service".to_string(), - None, - service_instance.clone(), - ); + let ret = naming_service + .register_instance( + "test-persistent-service".to_string(), + None, + service_instance.clone(), + ) + .await; info!("response. {ret:?}"); let ten_millis = time::Duration::from_secs(30); - thread::sleep(ten_millis); + tokio::time::sleep(ten_millis).await; - let ret = naming_service.deregister_instance( - "test-persistent-service".to_string(), - Some(crate::api::constants::DEFAULT_GROUP.to_string()), - service_instance, - ); + let ret = naming_service + .deregister_instance( + "test-persistent-service".to_string(), + Some(crate::api::constants::DEFAULT_GROUP.to_string()), + service_instance, + ) + .await; info!("response. {ret:?}"); let ten_millis = time::Duration::from_secs(30); - thread::sleep(ten_millis); - Ok(()) + tokio::time::sleep(ten_millis).await; } - #[test] + #[tokio::test] #[ignore] - fn test_register_and_deregister_ephemeral_service() -> Result<()> { + async fn test_register_and_deregister_ephemeral_service() { tracing_subscriber::fmt() .with_thread_names(true) .with_file(true) @@ -1053,7 +923,8 @@ pub(crate) mod tests { metadata.insert("netType".to_string(), "external".to_string()); metadata.insert("version".to_string(), "2.0".to_string()); - let naming_service = NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default()))?; + let naming_service = + NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default())).unwrap(); let service_instance = ServiceInstance { ip: "127.0.0.1".to_string(), port: 9090, @@ -1061,31 +932,34 @@ pub(crate) mod tests { ..Default::default() }; - let ret = naming_service.register_instance( - "test-ephemeral-service".to_string(), - None, - service_instance.clone(), - ); + let ret = naming_service + .register_instance( + "test-ephemeral-service".to_string(), + None, + service_instance.clone(), + ) + .await; info!("response. {ret:?}"); let ten_millis = time::Duration::from_secs(30); - thread::sleep(ten_millis); + tokio::time::sleep(ten_millis).await; - let ret = naming_service.deregister_instance( - "test-ephemeral-service".to_string(), - Some(crate::api::constants::DEFAULT_GROUP.to_string()), - service_instance, - ); + let ret = naming_service + .deregister_instance( + "test-ephemeral-service".to_string(), + Some(crate::api::constants::DEFAULT_GROUP.to_string()), + service_instance, + ) + .await; info!("response. {ret:?}"); let ten_millis = time::Duration::from_secs(30); - thread::sleep(ten_millis); - Ok(()) + tokio::time::sleep(ten_millis).await; } - #[test] + #[tokio::test] #[ignore] - fn test_batch_register_service() -> Result<()> { + async fn test_batch_register_service() { tracing_subscriber::fmt() .with_thread_names(true) .with_file(true) @@ -1101,7 +975,8 @@ pub(crate) mod tests { metadata.insert("netType".to_string(), "external".to_string()); metadata.insert("version".to_string(), "2.0".to_string()); - let naming_service = NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default()))?; + let naming_service = + NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default())).unwrap(); let service_instance1 = ServiceInstance { ip: "127.0.0.1".to_string(), port: 9090, @@ -1125,21 +1000,22 @@ pub(crate) mod tests { let instance_vec = vec![service_instance1, service_instance2, service_instance3]; - let ret = naming_service.batch_register_instance( - "test-service".to_string(), - Some(crate::api::constants::DEFAULT_GROUP.to_string()), - instance_vec, - ); + let ret = naming_service + .batch_register_instance( + "test-service".to_string(), + Some(crate::api::constants::DEFAULT_GROUP.to_string()), + instance_vec, + ) + .await; info!("response. {ret:?}"); - let ten_millis = time::Duration::from_secs(300); - thread::sleep(ten_millis); - Ok(()) + let ten_millis = time::Duration::from_secs(3); + tokio::time::sleep(ten_millis).await; } - #[test] + #[tokio::test] #[ignore] - fn test_batch_register_service_and_query_all_instances() -> Result<()> { + async fn test_batch_register_service_and_query_all_instances() { tracing_subscriber::fmt() .with_thread_names(true) .with_file(true) @@ -1155,7 +1031,8 @@ pub(crate) mod tests { metadata.insert("netType".to_string(), "external".to_string()); metadata.insert("version".to_string(), "2.0".to_string()); - let naming_service = NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default()))?; + let naming_service = + NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default())).unwrap(); let service_instance1 = ServiceInstance { ip: "127.0.0.1".to_string(), port: 9090, @@ -1178,31 +1055,34 @@ pub(crate) mod tests { }; let instance_vec = vec![service_instance1, service_instance2, service_instance3]; - let ret = naming_service.batch_register_instance( - "test-service".to_string(), - Some(crate::api::constants::DEFAULT_GROUP.to_string()), - instance_vec, - ); + let ret = naming_service + .batch_register_instance( + "test-service".to_string(), + Some(crate::api::constants::DEFAULT_GROUP.to_string()), + instance_vec, + ) + .await; info!("response. {ret:?}"); let ten_millis = time::Duration::from_secs(10); - thread::sleep(ten_millis); + tokio::time::sleep(ten_millis).await; - let all_instances = naming_service.get_all_instances( - "test-service".to_string(), - Some(crate::api::constants::DEFAULT_GROUP.to_string()), - Vec::default(), - false, - ); + let all_instances = naming_service + .get_all_instances( + "test-service".to_string(), + Some(crate::api::constants::DEFAULT_GROUP.to_string()), + Vec::default(), + false, + ) + .await; info!("response. {all_instances:?}"); - thread::sleep(ten_millis); - Ok(()) + tokio::time::sleep(ten_millis).await; } - #[test] + #[tokio::test] #[ignore] - fn test_select_instance() -> Result<()> { + async fn test_select_instance() { tracing_subscriber::fmt() .with_thread_names(true) .with_file(true) @@ -1218,7 +1098,8 @@ pub(crate) mod tests { metadata.insert("netType".to_string(), "external".to_string()); metadata.insert("version".to_string(), "2.0".to_string()); - let naming_service = NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default()))?; + let naming_service = + NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default())).unwrap(); let service_instance1 = ServiceInstance { ip: "127.0.0.1".to_string(), port: 9090, @@ -1241,32 +1122,35 @@ pub(crate) mod tests { }; let instance_vec = vec![service_instance1, service_instance2, service_instance3]; - let ret = naming_service.batch_register_instance( - "test-service".to_string(), - Some(crate::api::constants::DEFAULT_GROUP.to_string()), - instance_vec, - ); + let ret = naming_service + .batch_register_instance( + "test-service".to_string(), + Some(crate::api::constants::DEFAULT_GROUP.to_string()), + instance_vec, + ) + .await; info!("response. {ret:?}"); let ten_millis = time::Duration::from_secs(10); - thread::sleep(ten_millis); + tokio::time::sleep(ten_millis).await; - let all_instances = naming_service.select_instances( - "test-service".to_string(), - Some(crate::api::constants::DEFAULT_GROUP.to_string()), - Vec::default(), - false, - true, - ); + let all_instances = naming_service + .select_instances( + "test-service".to_string(), + Some(crate::api::constants::DEFAULT_GROUP.to_string()), + Vec::default(), + false, + true, + ) + .await; info!("response. {all_instances:?}"); - thread::sleep(ten_millis); - Ok(()) + tokio::time::sleep(ten_millis).await; } - #[test] + #[tokio::test] #[ignore] - fn test_select_one_healthy_instance() -> Result<()> { + async fn test_select_one_healthy_instance() { tracing_subscriber::fmt() .with_thread_names(true) .with_file(true) @@ -1282,7 +1166,8 @@ pub(crate) mod tests { metadata.insert("netType".to_string(), "external".to_string()); metadata.insert("version".to_string(), "2.0".to_string()); - let naming_service = NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default()))?; + let naming_service = + NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default())).unwrap(); let service_instance1 = ServiceInstance { ip: "127.0.0.1".to_string(), port: 9090, @@ -1305,33 +1190,36 @@ pub(crate) mod tests { }; let instance_vec = vec![service_instance1, service_instance2, service_instance3]; - let ret = naming_service.batch_register_instance( - "test-service".to_string(), - Some(crate::api::constants::DEFAULT_GROUP.to_string()), - instance_vec, - ); + let ret = naming_service + .batch_register_instance( + "test-service".to_string(), + Some(crate::api::constants::DEFAULT_GROUP.to_string()), + instance_vec, + ) + .await; info!("response. {ret:?}"); let ten_millis = time::Duration::from_secs(10); - thread::sleep(ten_millis); + tokio::time::sleep(ten_millis).await; for _ in 0..3 { - let all_instances = naming_service.select_one_healthy_instance( - "test-service".to_string(), - Some(crate::api::constants::DEFAULT_GROUP.to_string()), - Vec::default(), - false, - ); + let all_instances = naming_service + .select_one_healthy_instance( + "test-service".to_string(), + Some(crate::api::constants::DEFAULT_GROUP.to_string()), + Vec::default(), + false, + ) + .await; info!("response. {all_instances:?}"); } - thread::sleep(ten_millis); - Ok(()) + tokio::time::sleep(ten_millis).await; } - #[test] + #[tokio::test] #[ignore] - fn test_get_service_list() -> Result<()> { + async fn test_get_service_list() { tracing_subscriber::fmt() .with_thread_names(true) .with_file(true) @@ -1347,7 +1235,8 @@ pub(crate) mod tests { metadata.insert("netType".to_string(), "external".to_string()); metadata.insert("version".to_string(), "2.0".to_string()); - let naming_service = NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default()))?; + let naming_service = + NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default())).unwrap(); let service_instance1 = ServiceInstance { ip: "127.0.0.1".to_string(), port: 9090, @@ -1370,21 +1259,22 @@ pub(crate) mod tests { }; let instance_vec = vec![service_instance1, service_instance2, service_instance3]; - let ret = naming_service.batch_register_instance( - "test-service".to_string(), - Some(crate::api::constants::DEFAULT_GROUP.to_string()), - instance_vec, - ); + let ret = naming_service + .batch_register_instance( + "test-service".to_string(), + Some(crate::api::constants::DEFAULT_GROUP.to_string()), + instance_vec, + ) + .await; info!("response. {ret:?}"); let ten_millis = time::Duration::from_secs(10); - thread::sleep(ten_millis); + tokio::time::sleep(ten_millis).await; - let service_list = naming_service.get_service_list(1, 50, None); + let service_list = naming_service.get_service_list(1, 50, None).await; info!("response. {service_list:?}"); - thread::sleep(ten_millis); - Ok(()) + tokio::time::sleep(ten_millis).await; } #[derive(Hash, PartialEq)] @@ -1396,9 +1286,9 @@ pub(crate) mod tests { } } - #[test] + #[tokio::test] #[ignore] - fn test_service_push() -> Result<()> { + async fn test_service_push() { tracing_subscriber::fmt() .with_thread_names(true) .with_file(true) @@ -1414,7 +1304,8 @@ pub(crate) mod tests { metadata.insert("netType".to_string(), "external".to_string()); metadata.insert("version".to_string(), "2.0".to_string()); - let naming_service = NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default()))?; + let naming_service = + NacosNamingService::new(props, Arc::new(NoopAuthPlugin::default())).unwrap(); let service_instance1 = ServiceInstance { ip: "127.0.0.1".to_string(), port: 9090, @@ -1437,25 +1328,28 @@ pub(crate) mod tests { }; let instance_vec = vec![service_instance1, service_instance2, service_instance3]; - let ret = naming_service.batch_register_instance( - "test-service".to_string(), - Some(crate::api::constants::DEFAULT_GROUP.to_string()), - instance_vec, - ); + let ret = naming_service + .batch_register_instance( + "test-service".to_string(), + Some(crate::api::constants::DEFAULT_GROUP.to_string()), + instance_vec, + ) + .await; info!("response. {ret:?}"); let listener = Arc::new(InstancesChangeEventListener); - let ret = naming_service.subscribe( - "test-service".to_string(), - Some(crate::api::constants::DEFAULT_GROUP.to_string()), - Vec::default(), - listener, - ); + let ret = naming_service + .subscribe( + "test-service".to_string(), + Some(crate::api::constants::DEFAULT_GROUP.to_string()), + Vec::default(), + listener, + ) + .await; info!("response. {ret:?}"); let ten_millis = time::Duration::from_secs(3000); - thread::sleep(ten_millis); - Ok(()) + tokio::time::sleep(ten_millis).await; } }