From f8922e104cffd83f8cce9152ed88615ea157569c Mon Sep 17 00:00:00 2001 From: Xin Luo <65529035+luoxiner@users.noreply.github.com> Date: Thu, 12 Sep 2024 22:45:05 +0800 Subject: [PATCH] feat: Support Aliyun ram AuthPlugin (#245) --- Cargo.toml | 6 + nacos-macro/src/message/request.rs | 37 +- src/api/config.rs | 5 + src/api/constants.rs | 6 + src/api/naming.rs | 5 + src/api/plugin/auth/auth_by_aliyun_ram.rs | 719 ++++++++++++++++++++ src/api/plugin/auth/auth_by_http.rs | 10 +- src/api/plugin/auth/mod.rs | 70 +- src/api/props.rs | 65 +- src/common/remote/grpc/layers/auth.rs | 185 ----- src/common/remote/grpc/layers/mod.rs | 1 - src/common/remote/grpc/message/mod.rs | 3 + src/common/remote/grpc/mod.rs | 1 - src/common/remote/grpc/nacos_grpc_client.rs | 41 +- src/config/worker.rs | 12 +- src/naming/mod.rs | 12 +- 16 files changed, 954 insertions(+), 224 deletions(-) create mode 100644 src/api/plugin/auth/auth_by_aliyun_ram.rs delete mode 100644 src/common/remote/grpc/layers/auth.rs delete mode 100644 src/common/remote/grpc/layers/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 0200ef0..1cb2ea4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ config = [] naming = [] tls = ["reqwest/default-tls"] auth-by-http = ["reqwest"] +auth-by-aliyun = ["ring", "base64", "chrono"] [dependencies] arc-swap = "1.7" @@ -64,6 +65,11 @@ rand = "0.8.5" # now only for feature="auth-by-http" reqwest = { version = "0.12", default-features = false, features = [], optional = true } +# only for aliyun-ram-auth +ring = { version = "0.17.8", default-features = false, optional = true } +base64 = { version = "0.22.1", default-features = false, optional = true } +chrono = { version = "0.4", features = ["now"] ,optional = true } + async-trait = "0.1" async-stream = "0.3.5" tonic = "0.12" diff --git a/nacos-macro/src/message/request.rs b/nacos-macro/src/message/request.rs index e9ba791..9381df3 100644 --- a/nacos-macro/src/message/request.rs +++ b/nacos-macro/src/message/request.rs @@ -24,6 +24,40 @@ pub(crate) fn grpc_request( } }; + let into_request_resource = match macro_args.module { + super::Module::Naming => { + quote! { + fn request_resource(&self) -> Option { + Some(crate::api::plugin::RequestResource { + request_type: "Naming".to_string(), + namespace: self.namespace.clone(), + group: self.group_name.clone(), + resource: self.service_name.clone() + }) + } + } + } + super::Module::Config => { + quote! { + fn request_resource(&self) -> Option { + Some(crate::api::plugin::RequestResource { + request_type: "Config".to_string(), + namespace: self.namespace.clone(), + group: self.group.clone(), + resource: self.data_id.clone() + }) + } + } + } + _ => { + quote! { + fn request_resource(&self) -> Option { + None + } + } + } + }; + // add derive GrpcRequestMessage let grpc_message_request = quote! { @@ -52,8 +86,9 @@ pub(crate) fn grpc_request( fn module(&self) -> &str { #module } - } + #into_request_resource + } }; // add field diff --git a/src/api/config.rs b/src/api/config.rs index a855876..0e356c5 100644 --- a/src/api/config.rs +++ b/src/api/config.rs @@ -226,6 +226,11 @@ impl ConfigServiceBuilder { self.with_auth_plugin(Arc::new(plugin::HttpLoginAuthPlugin::default())) } + #[cfg(feature = "auth-by-aliyun")] + pub fn enable_auth_plugin_aliyun(self) -> Self { + self.with_auth_plugin(Arc::new(plugin::AliyunRamAuthPlugin::default())) + } + /// Set [`plugin::AuthPlugin`] pub fn with_auth_plugin(mut self, auth_plugin: Arc) -> Self { self.auth_plugin = Some(auth_plugin); diff --git a/src/api/constants.rs b/src/api/constants.rs index 872a6bf..8965fcf 100644 --- a/src/api/constants.rs +++ b/src/api/constants.rs @@ -44,6 +44,12 @@ pub const ENV_NACOS_CLIENT_AUTH_USERNAME: &str = "NACOS_CLIENT_USERNAME"; pub const ENV_NACOS_CLIENT_AUTH_PASSWORD: &str = "NACOS_CLIENT_PASSWORD"; +pub const ENV_NACOS_CLIENT_AUTH_ACCESS_KEY: &str = "NACOS_CLIENT_ACCESS_KEY"; + +pub const ENV_NACOS_CLIENT_AUTH_ACCESS_SECRET: &str = "NACOS_CLIENT_SECRET_KEY"; + +pub const ENV_NACOS_CLIENT_SIGN_REGION_ID: &str = "NACOS_CLIENT_SIGN_REGION_ID"; + /// env `NACOS_CLIENT_NAMING_PUSH_EMPTY_PROTECTION`, default true pub const ENV_NACOS_CLIENT_NAMING_PUSH_EMPTY_PROTECTION: &str = "NACOS_CLIENT_NAMING_PUSH_EMPTY_PROTECTION"; diff --git a/src/api/naming.rs b/src/api/naming.rs index 9ec89fa..037d091 100644 --- a/src/api/naming.rs +++ b/src/api/naming.rs @@ -248,6 +248,11 @@ impl NamingServiceBuilder { self.with_auth_plugin(Arc::new(plugin::HttpLoginAuthPlugin::default())) } + #[cfg(feature = "auth-by-aliyun")] + pub fn enable_auth_plugin_aliyun(self) -> Self { + self.with_auth_plugin(Arc::new(plugin::AliyunRamAuthPlugin::default())) + } + /// Set [`plugin::AuthPlugin`] pub fn with_auth_plugin(mut self, auth_plugin: Arc) -> Self { self.auth_plugin = Some(auth_plugin); diff --git a/src/api/plugin/auth/auth_by_aliyun_ram.rs b/src/api/plugin/auth/auth_by_aliyun_ram.rs new file mode 100644 index 0000000..79b254b --- /dev/null +++ b/src/api/plugin/auth/auth_by_aliyun_ram.rs @@ -0,0 +1,719 @@ +use super::RequestResource; +use crate::api::plugin::{AuthContext, AuthPlugin, LoginIdentityContext}; +use arc_swap::ArcSwap; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; + +pub(crate) const ACCESS_KEY: &str = "accessKey"; +pub(crate) const ACCESS_SECRET: &str = "secretKey"; +pub(crate) const SIGN_REGION_ID: &str = "signature_region_id"; +pub const SIGNATURE_VERSION: &str = "signatureVersion"; +pub const GROUPED_NAME: &str = "groupedName"; +pub const ACCESS_KEY_HEADER: &str = "Spas-AccessKey"; +pub const V4: &str = "v4"; +pub const SIGNATURE_V4_METHOD: &str = "HmacSHA256"; +pub const SIGNATURE_V4_PRODUCE: &str = "mse-nacos"; +pub const PREFIX: &str = "aliyun_v4"; +pub const CONSTANT: &str = "aliyun_v4_request"; +pub const V4_SIGN_DATE_FORMATTER: &str = "%Y%m%d"; +pub const SIGNATURE_FILED: &str = "signature"; +pub const DATA_FILED: &str = "data"; +pub const AK_FILED: &str = "ak"; +pub const TIMESTAMP_HEADER: &str = "Timestamp"; +pub const SIGNATURE_HEADER: &str = "Spas-Signature"; +pub const GROUP_KEY: &str = "group"; +pub const TENANT_KEY: &str = "tenant"; +pub const SHA_ENCRYPT: &str = "HmacSHA1"; + +fn get_current_timestamp() -> u128 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() +} + +mod sign_utils { + use base64::Engine; + use ring::hmac::{self, Algorithm, HMAC_SHA1_FOR_LEGACY_USE_ONLY}; + + pub fn sign_with_alg(data: &[u8], key: &[u8], alg: Algorithm) -> Vec { + let key = hmac::Key::new(alg, key); + let tag = hmac::sign(&key, data); + tag.as_ref().to_vec() + } + + pub fn sign_to_base64_with_alg(data: &[u8], key: &[u8], alg: Algorithm) -> String { + base64::prelude::BASE64_STANDARD.encode(sign_with_alg(data, key, alg).as_slice()) + } + + pub fn sign_with_hmac_sha1(data: &str, key: &str) -> String { + sign_to_base64_with_alg( + data.as_bytes(), + key.as_bytes(), + HMAC_SHA1_FOR_LEGACY_USE_ONLY, + ) + } +} + +mod spas_adaptor { + use crate::api::plugin::auth::auth_by_aliyun_ram::{get_current_timestamp, sign_utils}; + use crate::api::plugin::{SIGNATURE_HEADER, TIMESTAMP_HEADER}; + use std::collections::HashMap; + + pub fn get_sign_header_for_resource( + resource: &str, + secret_key: &str, + ) -> HashMap { + let mut headers = HashMap::default(); + let timestamp = get_current_timestamp(); + headers.insert(TIMESTAMP_HEADER.to_string(), timestamp.to_string()); + if !secret_key.is_empty() { + let sign = if resource.is_empty() { + sign_utils::sign_with_hmac_sha1(×tamp.to_string(), secret_key) + } else { + sign_utils::sign_with_hmac_sha1( + format!("{}+{}", resource, timestamp).as_str(), + secret_key, + ) + }; + headers.insert(SIGNATURE_HEADER.to_string(), sign.to_string()); + } + headers + } +} + +mod calculate_v4_signing_key_util { + use crate::api::plugin::{PREFIX, V4_SIGN_DATE_FORMATTER}; + use chrono::Utc; + use ring::hmac::{self, Algorithm}; + + use super::{ + sign_utils::{self, sign_to_base64_with_alg}, + CONSTANT, SIGNATURE_V4_PRODUCE, + }; + + pub fn first_signing_key(secret: &str, date: &str, alg: Algorithm) -> Vec { + sign_utils::sign_with_alg( + date.as_bytes(), + format!("{}{}", PREFIX, secret).as_bytes(), + alg, + ) + } + + pub fn region_signing_key(secret: &str, date: &str, region: &str, alg: Algorithm) -> Vec { + let vec = first_signing_key(secret, date, alg); + sign_utils::sign_with_alg(region.as_bytes(), vec.as_slice(), alg) + } + + pub fn final_signing_key_string( + secret: &str, + date: &str, + region: &str, + product_code: &str, + alg: Algorithm, + ) -> String { + let second_sign_key = region_signing_key(secret, date, region, alg); + let third_signing_key = + sign_utils::sign_with_alg(product_code.as_bytes(), &second_sign_key, alg); + sign_to_base64_with_alg(CONSTANT.as_bytes(), &third_signing_key, alg) + } + + pub fn final_signing_key_string_with_default_info(secret: &str, region: &str) -> String { + let sign_date = Utc::now().format(V4_SIGN_DATE_FORMATTER).to_string(); + final_signing_key_string( + secret, + &sign_date, + region, + SIGNATURE_V4_PRODUCE, + hmac::HMAC_SHA256, + ) + } +} + +pub(crate) struct AliyunRamAuthPlugin { + access_key: ArcSwap>, + secret_key: ArcSwap>, + signature_region_id: ArcSwap>, + initialized: ArcSwap, +} + +impl AliyunRamAuthPlugin { + pub(crate) fn default() -> Self { + Self { + access_key: ArcSwap::from_pointee(None), + secret_key: ArcSwap::from_pointee(None), + signature_region_id: ArcSwap::from_pointee(None), + initialized: ArcSwap::from_pointee(false), + } + } + + fn get_login_identify_for_naming(&self, resource: RequestResource) -> LoginIdentityContext { + Self::get_sign_data(&resource) + .map(|sign_data| { + let mut res = LoginIdentityContext::default(); + let mut signature_key = self.secret_key.load().as_ref().as_ref().unwrap().clone(); + if let Some(region_id) = self.signature_region_id.load().as_ref() { + signature_key = + calculate_v4_signing_key_util::final_signing_key_string_with_default_info( + self.secret_key.load().as_ref().as_ref().unwrap(), + region_id, + ); + res = res.add_context(SIGNATURE_VERSION, V4); + } + let signature = sign_utils::sign_with_hmac_sha1(&sign_data, &signature_key); + res = res.add_context(SIGNATURE_FILED, signature); + res = res.add_context(DATA_FILED, sign_data); + res = res.add_context(AK_FILED, self.access_key.load().as_ref().as_ref().unwrap()); + res + }) + .unwrap_or_default() + } + + fn get_sign_data(resource: &RequestResource) -> Option { + Self::get_grouped_service_name(resource).map(|grouped_service_name| { + let timestamp = get_current_timestamp(); + if grouped_service_name.is_empty() { + format!("{}", timestamp) + } else { + format!("{}@@{}", timestamp, grouped_service_name) + } + }) + } + + fn get_grouped_service_name(resource: &RequestResource) -> Option { + resource.resource.as_ref().map(|service_name| { + if service_name.contains("@@") || resource.group.is_none() { + service_name.clone() + } else { + format!("{}@@{}", resource.group.as_ref().unwrap(), service_name) + } + }) + } + + fn get_login_identify_for_config(&self, resource: RequestResource) -> LoginIdentityContext { + let mut res = LoginIdentityContext::default(); + let mut signature_key = self.secret_key.load().as_ref().as_ref().unwrap().clone(); + if let Some(region_id) = self.signature_region_id.load().as_ref().as_ref() { + signature_key = + calculate_v4_signing_key_util::final_signing_key_string_with_default_info( + self.secret_key.load().as_ref().as_ref().unwrap(), + region_id, + ); + res = res.add_context(SIGNATURE_VERSION, V4); + } + res = res.add_context( + ACCESS_KEY_HEADER, + self.access_key.load().as_ref().as_ref().unwrap(), + ); + res = res.add_contexts(spas_adaptor::get_sign_header_for_resource( + Self::get_resource_for_config(&resource).as_str(), + signature_key.as_str(), + )); + res + } + + fn get_resource_for_config(resource: &RequestResource) -> String { + let namespace = resource + .namespace + .as_ref() + .filter(|value| !value.is_empty()); + let group = resource.group.as_ref().filter(|value| !value.is_empty()); + if namespace.is_some() && group.is_some() { + format!("{}+{}", namespace.unwrap(), group.unwrap()) + } else if let Some(g) = group { + g.to_string() + } else if let Some(n) = namespace { + n.to_string() + } else { + String::from("") + } + } +} + +#[async_trait::async_trait] +impl AuthPlugin for AliyunRamAuthPlugin { + /// no need to login + async fn login(&self, _: Vec, auth_context: AuthContext) { + if *self.initialized.load().as_ref() { + return; + } + + let ak = auth_context + .params + .get(ACCESS_KEY) + .expect("Init aliyun ram auth plugin error, missing accessKey") + .to_owned(); + let sk = auth_context + .params + .get(ACCESS_SECRET) + .expect("Init aliyun ram auth plugin error, missing secretKey") + .to_owned(); + tracing::info!("Initialize using ak: {}", ak); + self.access_key.store(Arc::new(Some(ak))); + self.secret_key.store(Arc::new(Some(sk))); + if let Some(signature_region_id) = auth_context.params.get(SIGN_REGION_ID) { + self.signature_region_id + .store(Arc::new(Some(signature_region_id.to_string()))); + tracing::info!( + "Initializing using signature region: {}", + signature_region_id + ); + } + self.initialized.store(Arc::new(true)); + } + + /// Get the [`LoginIdentityContext`]. + fn get_login_identity(&self, resource: RequestResource) -> LoginIdentityContext { + tracing::trace!("signature to resource {:?}", resource); + match resource.request_type.as_str() { + "Naming" => self.get_login_identify_for_naming(resource), + "Config" => self.get_login_identify_for_config(resource), + _ => LoginIdentityContext::default(), + } + } +} + +#[cfg(test)] +mod test { + use crate::api::config::{ConfigChangeListener, ConfigResponse, ConfigService}; + use crate::api::naming::{ + NamingChangeEvent, NamingEventListener, NamingService, ServiceInstance, + }; + use crate::api::plugin::auth::auth_by_aliyun_ram::{ + calculate_v4_signing_key_util, sign_utils, spas_adaptor, + }; + use crate::api::plugin::{ + AliyunRamAuthPlugin, AuthContext, AuthPlugin, RequestResource, ACCESS_KEY, + ACCESS_KEY_HEADER, ACCESS_SECRET, AK_FILED, DATA_FILED, SIGNATURE_FILED, SIGNATURE_HEADER, + SIGNATURE_VERSION, SIGN_REGION_ID, TIMESTAMP_HEADER, + }; + use crate::api::props::ClientProps; + use crate::config::NacosConfigService; + use crate::naming::NacosNamingService; + use arc_swap::ArcSwap; + use std::sync::Arc; + use std::time::Duration; + use tokio::time::sleep; + + #[test] + fn test_sign_with_hmac_sha1() { + let sign_data = sign_utils::sign_with_hmac_sha1("test", "test"); + // DJRRXBXlCVuKh6ULoN87847QX+Y= is signed data generated by + // com.alibaba.nacos.client.auth.ram.utils.SignUtil#sign + // method in nacos java sdk + assert_eq!(sign_data, "DJRRXBXlCVuKh6ULoN87847QX+Y="); + } + + #[test] + fn test_get_sign_header_for_resource() { + let mut sign_headers = spas_adaptor::get_sign_header_for_resource("test", ""); + assert_eq!(sign_headers.len(), 1); + assert!(sign_headers.contains_key(TIMESTAMP_HEADER)); + + sign_headers = spas_adaptor::get_sign_header_for_resource("", "test"); + assert_eq!(sign_headers.len(), 2); + let timestamp = sign_headers.get(TIMESTAMP_HEADER).unwrap(); + let sign_data = sign_headers.get(SIGNATURE_HEADER).unwrap(); + let expected_sign_data = sign_utils::sign_with_hmac_sha1(timestamp, "test"); + assert_eq!(&expected_sign_data, sign_data); + + sign_headers = spas_adaptor::get_sign_header_for_resource("test", "test"); + assert_eq!(sign_headers.len(), 2); + let timestamp = sign_headers.get(TIMESTAMP_HEADER).unwrap(); + let sign_data = sign_headers.get(SIGNATURE_HEADER).unwrap(); + let expected_sign_data = + sign_utils::sign_with_hmac_sha1(format!("{}+{}", "test", timestamp).as_str(), "test"); + assert_eq!(&expected_sign_data, sign_data); + } + + #[tokio::test] + async fn test_aliyun_ram_plugin_login() { + let aliyun_ram_auth_plugin = AliyunRamAuthPlugin::default(); + let mut context = AuthContext::default(); + context = context.add_param(ACCESS_KEY, "test"); + context = context.add_param(ACCESS_SECRET, "test"); + context = context.add_param(SIGN_REGION_ID, "cn-hangzhou"); + aliyun_ram_auth_plugin.login(Vec::new(), context).await; + assert!(*aliyun_ram_auth_plugin.initialized.load().as_ref()); + assert_eq!( + "test", + aliyun_ram_auth_plugin + .access_key + .load() + .as_ref() + .as_ref() + .unwrap() + ); + assert_eq!( + "test", + aliyun_ram_auth_plugin + .secret_key + .load() + .as_ref() + .as_ref() + .unwrap() + ); + assert_eq!( + "cn-hangzhou", + aliyun_ram_auth_plugin + .signature_region_id + .load() + .as_ref() + .as_ref() + .unwrap() + ); + } + + #[test] + fn test_get_resource_for_config() { + let mut resource = RequestResource::default(); + resource.group = Some("test-group".to_owned()); + let resource = AliyunRamAuthPlugin::get_resource_for_config(&resource); + assert_eq!("test-group", resource); + + let mut resource = RequestResource::default(); + resource.namespace = Some("test-ns".to_owned()); + let resource = AliyunRamAuthPlugin::get_resource_for_config(&resource); + assert_eq!("test-ns", resource); + + let mut resource = RequestResource::default(); + resource.namespace = Some("test-ns".to_owned()); + resource.group = Some("test-group".to_owned()); + let resource = AliyunRamAuthPlugin::get_resource_for_config(&resource); + assert_eq!("test-ns+test-group", resource); + + let resource = RequestResource::default(); + let resource = AliyunRamAuthPlugin::get_resource_for_config(&resource); + assert_eq!("", resource) + } + + #[test] + fn test_get_grouped_service() { + let mut resource = RequestResource::default(); + resource.resource = Some("test@@test".to_owned()); + let grouped_service_name = AliyunRamAuthPlugin::get_grouped_service_name(&resource); + assert_eq!("test@@test", grouped_service_name.unwrap()); + + let mut resource = RequestResource::default(); + resource.resource = Some("test".to_owned()); + let grouped_service_name = AliyunRamAuthPlugin::get_grouped_service_name(&resource); + assert_eq!("test", grouped_service_name.unwrap()); + + let mut resource = RequestResource::default(); + resource.resource = Some("test".to_owned()); + resource.group = Some("test".to_owned()); + let grouped_service_name = AliyunRamAuthPlugin::get_grouped_service_name(&resource); + assert_eq!("test@@test", grouped_service_name.unwrap()); + } + + #[test] + fn test_get_sign_data() { + let mut request = RequestResource::default(); + request.resource = Some("test".to_owned()); + let sign_data = AliyunRamAuthPlugin::get_sign_data(&request).unwrap(); + assert!(sign_data.contains("@@test")) + } + + #[tokio::test] + async fn test_get_login_identity_for_naming() { + let aliyun_ram_auth_plugin = AliyunRamAuthPlugin::default(); + let mut context = AuthContext::default(); + context = context.add_param(ACCESS_KEY, "test-ak"); + context = context.add_param(ACCESS_SECRET, "test-sk"); + context = context.add_param(SIGN_REGION_ID, "cn-hangzhou"); + aliyun_ram_auth_plugin.login(Vec::new(), context).await; + + let mut resource = RequestResource::default(); + resource.request_type = "Naming".to_owned(); + resource.namespace = Some("".to_owned()); + resource.group = Some("test-group".to_owned()); + resource.resource = Some("test-resource".to_owned()); + let identity_context = aliyun_ram_auth_plugin.get_login_identify_for_naming(resource); + assert_eq!(4, identity_context.contexts.len()); + assert_eq!( + "v4", + identity_context.contexts.get(SIGNATURE_VERSION).unwrap() + ); + assert_eq!("test-ak", identity_context.contexts.get(AK_FILED).unwrap()); + let data = identity_context.contexts.get(DATA_FILED).unwrap(); + let sign_data = identity_context + .contexts + .get(SIGNATURE_FILED) + .unwrap() + .clone(); + + let signature_key = + calculate_v4_signing_key_util::final_signing_key_string_with_default_info( + "test-sk", + "cn-hangzhou", + ); + assert_eq!( + sign_utils::sign_with_hmac_sha1(&data, &signature_key), + sign_data + ); + } + + #[tokio::test] + async fn test_get_login_identity_for_config() { + let aliyun_ram_auth_plugin = AliyunRamAuthPlugin::default(); + let mut context = AuthContext::default(); + context = context.add_param(ACCESS_KEY, "test-ak"); + context = context.add_param(ACCESS_SECRET, "test-sk"); + context = context.add_param(SIGN_REGION_ID, "cn-hangzhou"); + aliyun_ram_auth_plugin.login(Vec::new(), context).await; + + let mut resource = RequestResource::default(); + resource.request_type = "Config".to_owned(); + resource.namespace = Some("".to_owned()); + resource.group = Some("test-group".to_owned()); + resource.resource = Some("test-resource".to_owned()); + let identity_context = aliyun_ram_auth_plugin.get_login_identify_for_config(resource); + assert_eq!(4, identity_context.contexts.len()); + assert_eq!( + "v4", + identity_context.contexts.get(SIGNATURE_VERSION).unwrap() + ); + assert_eq!( + "test-ak", + identity_context.contexts.get(ACCESS_KEY_HEADER).unwrap() + ); + + let timestamp = identity_context.contexts.get(TIMESTAMP_HEADER).unwrap(); + let sign_data = identity_context.contexts.get(SIGNATURE_HEADER).unwrap(); + + let signature_key = + calculate_v4_signing_key_util::final_signing_key_string_with_default_info( + "test-sk", + "cn-hangzhou", + ); + assert_eq!( + &sign_utils::sign_with_hmac_sha1( + format!("{}+{}", "test-group", timestamp).as_str(), + &signature_key + ), + sign_data + ); + } + + #[test] + fn test_final_signing_key_string_with_default_info() { + let sign_data = calculate_v4_signing_key_util::final_signing_key_string_with_default_info( + "test", + "cn-hangzhou", + ); + assert_eq!("lHVX6NEPs3+EKxO3g2iklCwbseQnAWz5nLce9Lm0Po4=", sign_data) + } + + struct TestNamingEventListener { + instance_now: ArcSwap>, + } + + impl NamingEventListener for TestNamingEventListener { + fn event(&self, event: Arc) { + self.instance_now + .store(Arc::new(event.instances.as_ref().unwrap().clone())); + } + } + + impl TestNamingEventListener { + fn default() -> Self { + Self { + instance_now: ArcSwap::new(Arc::new(Vec::new())), + } + } + } + + fn init_ram_client_prop_from_env() -> ClientProps { + ClientProps::new() + .namespace(std::env::var("NAMESPACE").unwrap_or("".to_string())) + .server_addr(std::env::var("SERVER_ADDR").unwrap()) + .auth_ext(ACCESS_KEY, std::env::var("AK").unwrap()) + .auth_ext(ACCESS_SECRET, std::env::var("SK").unwrap()) + } + + fn make_service_instance(ip: &str, port: i32) -> ServiceInstance { + let mut instance = ServiceInstance::default(); + instance.ip = ip.to_string(); + instance.port = port; + instance + } + + #[ignore] + #[tokio::test] + async fn test_naming_ram() { + let props = init_ram_client_prop_from_env(); + let naming_client = + NacosNamingService::new(props, Arc::new(AliyunRamAuthPlugin::default())).unwrap(); + naming_client + .register_instance( + "test".to_owned(), + Some("test".to_owned()), + make_service_instance("1.1.1.1", 8080), + ) + .await + .unwrap(); + let instances = naming_client + .get_all_instances( + "test".to_string(), + Some("test".to_string()), + Vec::new(), + false, + ) + .await + .unwrap(); + assert_eq!(1, instances.len()); + assert_eq!("1.1.1.1", instances[0].ip); + assert_eq!(8080, instances[0].port); + + naming_client + .deregister_instance( + "test".to_owned(), + Some("test".to_owned()), + make_service_instance("1.1.1.1", 8080), + ) + .await + .unwrap(); + sleep(Duration::from_secs(5)).await; + let instance = naming_client + .get_all_instances( + "test".to_string(), + Some("test".to_string()), + Vec::new(), + false, + ) + .await + .unwrap(); + assert_eq!(0, instance.len()); + + let listener = Arc::new(TestNamingEventListener::default()); + naming_client + .subscribe("test".to_string(), None, Vec::new(), listener.clone()) + .await + .unwrap(); + + naming_client + .register_instance( + "test".to_owned(), + None, + make_service_instance("2.2.2.2", 80), + ) + .await + .unwrap(); + sleep(Duration::from_secs(5)).await; + assert_eq!(1, listener.instance_now.load().len()); + + naming_client + .unsubscribe("test".to_string(), None, Vec::new(), listener.clone()) + .await + .unwrap(); + naming_client + .batch_register_instance( + "test".to_owned(), + Some("test".to_owned()), + vec![ + make_service_instance("2.2.2.2", 8080), + make_service_instance("3.3.3.3", 8080), + ], + ) + .await + .unwrap(); + + sleep(Duration::from_secs(5)).await; + let instance = naming_client + .get_all_instances( + "test".to_string(), + Some("test".to_string()), + Vec::new(), + false, + ) + .await + .unwrap(); + assert_eq!(2, instance.len()); + } + + struct TestConfigListener { + current_content: ArcSwap, + } + + impl ConfigChangeListener for TestConfigListener { + fn notify(&self, config_resp: ConfigResponse) { + self.current_content + .store(Arc::new(config_resp.content().to_string())) + } + } + + impl Default for TestConfigListener { + fn default() -> Self { + Self { + current_content: Default::default(), + } + } + } + + #[ignore] + #[tokio::test] + async fn test_config_ram() { + let props = init_ram_client_prop_from_env(); + let config_client = + NacosConfigService::new(props, Arc::new(AliyunRamAuthPlugin::default()), Vec::new()) + .unwrap(); + config_client + .publish_config( + "test".to_string(), + "test".to_string(), + "test=test".to_string(), + Some("properties".to_string()), + ) + .await + .unwrap(); + sleep(Duration::from_secs(5)).await; + + let response = config_client + .get_config("test".to_string(), "test".to_string()) + .await + .unwrap(); + assert_eq!("test=test", response.content()); + assert_eq!("properties", response.content_type()); + assert_eq!("test", response.group()); + assert_eq!("test", response.data_id()); + + config_client + .remove_config("test".to_string(), "test".to_string()) + .await + .unwrap(); + sleep(Duration::from_secs(5)).await; + let result = config_client + .get_config("test".to_string(), "test".to_string()) + .await; + assert!(result.is_err()); + assert_eq!( + "config not found: error_code=300,message=config data not exist", + result.err().unwrap().to_string() + ); + + let listener = Arc::new(TestConfigListener::default()); + config_client + .add_listener("test-1".to_string(), "test".to_string(), listener.clone()) + .await + .unwrap(); + config_client + .publish_config( + "test-1".to_string(), + "test".to_string(), + "test=test".to_string(), + Some("properties".to_string()), + ) + .await + .unwrap(); + sleep(Duration::from_secs(5)).await; + let result = config_client + .get_config("test-1".to_string(), "test".to_string()) + .await; + assert_eq!(result.unwrap().content(), "test=test"); + + config_client + .remove_listener("test-1".to_string(), "test".to_string(), listener.clone()) + .await + .unwrap(); + } +} diff --git a/src/api/plugin/auth/auth_by_http.rs b/src/api/plugin/auth/auth_by_http.rs index 4048cca..8a3a90f 100644 --- a/src/api/plugin/auth/auth_by_http.rs +++ b/src/api/plugin/auth/auth_by_http.rs @@ -6,6 +6,8 @@ use tokio::time::{Duration, Instant}; use crate::api::plugin::{AuthContext, AuthPlugin, LoginIdentityContext}; +use super::RequestResource; + pub const USERNAME: &str = "username"; pub const PASSWORD: &str = "password"; @@ -99,7 +101,7 @@ impl AuthPlugin for HttpLoginAuthPlugin { } } - fn get_login_identity(&self) -> LoginIdentityContext { + fn get_login_identity(&self, _: RequestResource) -> LoginIdentityContext { self.login_identity.load().deref().deref().to_owned() } } @@ -113,7 +115,7 @@ struct HttpLoginResponse { #[cfg(test)] mod tests { - use crate::api::plugin::{AuthContext, AuthPlugin, HttpLoginAuthPlugin}; + use crate::api::plugin::{AuthContext, AuthPlugin, HttpLoginAuthPlugin, RequestResource}; #[tokio::test] #[ignore] @@ -132,13 +134,13 @@ mod tests { http_auth_plugin .login(server_list.clone(), auth_context.clone()) .await; - let login_identity_1 = http_auth_plugin.get_login_identity(); + let login_identity_1 = http_auth_plugin.get_login_identity(RequestResource::default()); assert_eq!(login_identity_1.contexts.len(), 1); tokio::time::sleep(tokio::time::Duration::from_millis(111)).await; http_auth_plugin.login(server_list, auth_context).await; - let login_identity_2 = http_auth_plugin.get_login_identity(); + let login_identity_2 = http_auth_plugin.get_login_identity(RequestResource::default()); assert_eq!(login_identity_1.contexts, login_identity_2.contexts) } } diff --git a/src/api/plugin/auth/mod.rs b/src/api/plugin/auth/mod.rs index 798ae65..b9173bf 100644 --- a/src/api/plugin/auth/mod.rs +++ b/src/api/plugin/auth/mod.rs @@ -3,7 +3,16 @@ mod auth_by_http; #[cfg(feature = "auth-by-http")] pub use auth_by_http::*; -use std::collections::HashMap; +#[cfg(feature = "auth-by-aliyun")] +mod auth_by_aliyun_ram; +#[cfg(feature = "auth-by-aliyun")] +pub use auth_by_aliyun_ram::*; + +use std::{collections::HashMap, sync::Arc, thread, time::Duration}; +use tokio::{sync::oneshot, time::sleep}; +use tracing::{debug, debug_span, info, Instrument}; + +use crate::common::executor; /// Auth plugin in Client. /// This api may change in the future, please forgive me if you customize the implementation. @@ -13,7 +22,7 @@ pub trait AuthPlugin: Send + Sync { async fn login(&self, server_list: Vec, auth_context: AuthContext); /// Get the [`LoginIdentityContext`]. - fn get_login_identity(&self) -> LoginIdentityContext; + fn get_login_identity(&self, resource: RequestResource) -> LoginIdentityContext; } #[derive(Clone, Default)] @@ -67,8 +76,63 @@ impl AuthPlugin for NoopAuthPlugin { // noop } - fn get_login_identity(&self) -> LoginIdentityContext { + fn get_login_identity(&self, _: RequestResource) -> LoginIdentityContext { // noop self.login_identity.clone() } } + +pub fn init_auth_plugin( + auth_plugin: Arc, + server_list: Vec, + auth_params: HashMap, + id: String, +) { + let (tx, rx) = oneshot::channel::<()>(); + executor::spawn( + async move { + info!("init auth task"); + let auth_context = AuthContext::default().add_params(auth_params); + auth_plugin + .login(server_list.clone(), auth_context.clone()) + .in_current_span() + .await; + info!("init auth finish"); + let _ = tx.send(()); + + info!("auth plugin task start."); + loop { + auth_plugin + .login(server_list.clone(), auth_context.clone()) + .in_current_span() + .await; + debug!("auth_plugin schedule at fixed delay"); + sleep(Duration::from_secs(30)).await; + } + } + .instrument(debug_span!("auth_task", id = id)), + ); + + let wait_ret = thread::spawn(move || rx.blocking_recv()); + + let _ = wait_ret.join().unwrap(); +} + +#[derive(Debug)] +pub struct RequestResource { + pub request_type: String, + pub namespace: Option, + pub group: Option, + pub resource: Option, +} + +impl RequestResource { + fn default() -> Self { + Self { + request_type: "".to_string(), + namespace: None, + group: None, + resource: None, + } + } +} diff --git a/src/api/props.rs b/src/api/props.rs index ef386fe..a575559 100644 --- a/src/api/props.rs +++ b/src/api/props.rs @@ -97,16 +97,41 @@ impl ClientProps { pub(crate) fn get_auth_context(&self) -> HashMap { let mut auth_context = self.auth_context.clone(); if self.env_first { - if let Some(u) = get_value_option(ENV_NACOS_CLIENT_AUTH_USERNAME) { - auth_context.insert(crate::api::plugin::USERNAME.into(), u); - } - if let Some(p) = get_value_option(ENV_NACOS_CLIENT_AUTH_PASSWORD) { - auth_context.insert(crate::api::plugin::PASSWORD.into(), p); - } + self.get_http_auth_context(&mut auth_context); + self.get_aliyun_auth_context(&mut auth_context); } auth_context } + #[cfg(feature = "auth-by-http")] + fn get_http_auth_context(&self, context: &mut HashMap) { + if let Some(u) = get_value_option(ENV_NACOS_CLIENT_AUTH_USERNAME) { + context.insert(crate::api::plugin::USERNAME.into(), u); + } + if let Some(p) = get_value_option(ENV_NACOS_CLIENT_AUTH_PASSWORD) { + context.insert(crate::api::plugin::PASSWORD.into(), p); + } + } + + #[cfg(not(feature = "auth-by-http"))] + fn get_http_auth_context(&self, context: &mut HashMap) {} + + #[cfg(feature = "auth-by-aliyun")] + fn get_aliyun_auth_context(&self, context: &mut HashMap) { + if let Some(ak) = get_value_option(ENV_NACOS_CLIENT_AUTH_ACCESS_KEY) { + context.insert(crate::api::plugin::ACCESS_KEY.into(), ak); + } + if let Some(sk) = get_value_option(ENV_NACOS_CLIENT_AUTH_ACCESS_SECRET) { + context.insert(crate::api::plugin::ACCESS_SECRET.into(), sk); + } + if let Some(sign_region_id) = get_value_option(ENV_NACOS_CLIENT_SIGN_REGION_ID) { + context.insert(crate::api::plugin::SIGN_REGION_ID.into(), sign_region_id); + } + } + + #[cfg(not(feature = "auth-by-aliyun"))] + fn get_aliyun_auth_context(&self, context: &mut HashMap) {} + pub(crate) fn get_server_list(&self) -> crate::api::error::Result> { let server_addr = self.get_server_addr(); if server_addr.trim().is_empty() { @@ -223,6 +248,34 @@ impl ClientProps { self } + /// Add access-key + #[cfg(feature = "auth-by-aliyun")] + pub fn auth_access_key(mut self, access_key: impl Into) -> Self { + self.auth_context + .insert(crate::api::plugin::ACCESS_KEY.into(), access_key.into()); + self + } + + /// Add access-secret + #[cfg(feature = "auth-by-aliyun")] + pub fn auth_access_secret(mut self, access_secret: impl Into) -> Self { + self.auth_context.insert( + crate::api::plugin::ACCESS_SECRET.into(), + access_secret.into(), + ); + self + } + + /// Add signature region id + #[cfg(feature = "auth-by-aliyun")] + pub fn auth_signature_region_id(mut self, signature_region_id: impl Into) -> Self { + self.auth_context.insert( + crate::api::plugin::SIGN_REGION_ID.into(), + signature_region_id.into(), + ); + self + } + /// Add auth ext params. pub fn auth_ext(mut self, key: impl Into, val: impl Into) -> Self { self.auth_context.insert(key.into(), val.into()); diff --git a/src/common/remote/grpc/layers/auth.rs b/src/common/remote/grpc/layers/auth.rs deleted file mode 100644 index 1a2b92a..0000000 --- a/src/common/remote/grpc/layers/auth.rs +++ /dev/null @@ -1,185 +0,0 @@ -use std::{collections::HashMap, pin::Pin, sync::Arc, task::Poll, thread, time::Duration}; - -use async_stream::stream; -use futures::Future; -use tokio::{sync::oneshot, time::sleep}; -use tower::{Layer, Service}; -use tracing::{debug, debug_span, info, Instrument}; - -use crate::{ - api::{ - error::Error, - plugin::{AuthContext, AuthPlugin}, - }, - common::{ - executor, - remote::grpc::nacos_grpc_service::{ - DynamicBiStreamingCallService, DynamicUnaryCallService, GrpcStream, - }, - }, - nacos_proto::v2::{Metadata, Payload}, -}; - -pub(crate) struct AuthLayer { - auth_plugin: Arc, -} - -impl AuthLayer { - pub(crate) fn new( - auth_plugin: Arc, - server_list: Vec, - auth_params: HashMap, - id: String, - ) -> Self { - Self::init(auth_plugin.clone(), server_list, auth_params, id); - Self { auth_plugin } - } - - fn init( - auth_plugin: Arc, - server_list: Vec, - auth_params: HashMap, - id: String, - ) { - let (tx, rx) = oneshot::channel::<()>(); - executor::spawn( - async move { - info!("init auth task"); - let auth_context = AuthContext::default().add_params(auth_params); - auth_plugin - .login(server_list.clone(), auth_context.clone()) - .in_current_span() - .await; - info!("init auth finish"); - let _ = tx.send(()); - - info!("auth plugin task start."); - loop { - auth_plugin - .login(server_list.clone(), auth_context.clone()) - .in_current_span() - .await; - debug!("auth_plugin schedule at fixed delay"); - sleep(Duration::from_secs(30)).await; - } - } - .instrument(debug_span!("auth_task", id = id)), - ); - - let wait_ret = thread::spawn(move || rx.blocking_recv()); - - let _ = wait_ret.join().unwrap(); - } -} - -struct AuthUnaryCallService { - auth_plugin: Arc, - inner: DynamicUnaryCallService, -} - -impl Service for AuthUnaryCallService { - type Response = Payload; - - type Error = Error; - - type Future = Pin> + Send + 'static>>; - - fn poll_ready( - &mut self, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.inner.poll_ready(cx) - } - - fn call(&mut self, mut req: Payload) -> Self::Future { - let contexts = self.auth_plugin.get_login_identity().contexts; - - let metadata = req.metadata.take(); - let metadata = if let Some(mut metadata) = metadata { - metadata.headers.extend(contexts); - Some(metadata) - } else { - let metadata = Metadata { - headers: contexts, - ..Default::default() - }; - Some(metadata) - }; - req.metadata = metadata; - - self.inner.call(req) - } -} - -struct AuthBiStreamingCallService { - auth_plugin: Arc, - inner: DynamicBiStreamingCallService, -} - -impl Service> for AuthBiStreamingCallService { - type Response = GrpcStream>; - - type Error = Error; - - type Future = Pin< - Box< - dyn Future>, Error>> + Send + 'static, - >, - >; - - fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { - self.inner.poll_ready(cx) - } - - fn call(&mut self, req: GrpcStream) -> Self::Future { - let auth_plugin = self.auth_plugin.clone(); - let stream = stream! { - for await mut value in req { - let contexts = auth_plugin.get_login_identity().contexts; - - let metadata = value.metadata.take(); - let metadata = if let Some(mut metadata) = metadata { - metadata.headers.extend(contexts); - Some(metadata) - } else { - let metadata = Metadata{ - headers: contexts, - ..Default::default() - }; - Some(metadata) - }; - value.metadata = metadata; - - yield value; - } - }; - - let stream = GrpcStream::new(Box::pin(stream)); - - self.inner.call(stream) - } -} - -impl Layer for AuthLayer { - type Service = DynamicUnaryCallService; - - fn layer(&self, inner: DynamicUnaryCallService) -> Self::Service { - let service = AuthUnaryCallService { - auth_plugin: self.auth_plugin.clone(), - inner, - }; - Box::new(service) - } -} - -impl Layer for AuthLayer { - type Service = DynamicBiStreamingCallService; - - fn layer(&self, inner: DynamicBiStreamingCallService) -> Self::Service { - let service = AuthBiStreamingCallService { - auth_plugin: self.auth_plugin.clone(), - inner, - }; - Box::new(service) - } -} diff --git a/src/common/remote/grpc/layers/mod.rs b/src/common/remote/grpc/layers/mod.rs deleted file mode 100644 index 6f8ff8b..0000000 --- a/src/common/remote/grpc/layers/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub(crate) mod auth; diff --git a/src/common/remote/grpc/message/mod.rs b/src/common/remote/grpc/message/mod.rs index d5e455a..6faa88f 100644 --- a/src/common/remote/grpc/message/mod.rs +++ b/src/common/remote/grpc/message/mod.rs @@ -8,6 +8,7 @@ use crate::api::error::Error::ErrResponse; use crate::api::error::Error::ErrResult; use crate::api::error::Error::Serialization; use crate::api::error::Result; +use crate::api::plugin::RequestResource; use crate::common::remote::grpc::message::response::ErrorResponse; use crate::nacos_proto::v2::{Metadata, Payload}; use std::fmt::Debug; @@ -204,6 +205,8 @@ pub(crate) trait GrpcRequestMessage: GrpcMessageData { fn request_id(&self) -> Option<&String>; fn module(&self) -> &str; + + fn request_resource(&self) -> Option; } pub(crate) trait GrpcResponseMessage: GrpcMessageData { diff --git a/src/common/remote/grpc/mod.rs b/src/common/remote/grpc/mod.rs index 5a3e9c5..3a7d8c9 100644 --- a/src/common/remote/grpc/mod.rs +++ b/src/common/remote/grpc/mod.rs @@ -1,6 +1,5 @@ pub(crate) mod config; pub(crate) mod handlers; -pub(crate) mod layers; pub(crate) mod message; pub(crate) mod nacos_grpc_client; pub(crate) mod nacos_grpc_connection; diff --git a/src/common/remote/grpc/nacos_grpc_client.rs b/src/common/remote/grpc/nacos_grpc_client.rs index 41bb044..b981146 100644 --- a/src/common/remote/grpc/nacos_grpc_client.rs +++ b/src/common/remote/grpc/nacos_grpc_client.rs @@ -3,6 +3,7 @@ use tower::layer::util::Stack; use tracing::{instrument, Instrument}; use crate::api::error::Error; +use crate::api::plugin::{init_auth_plugin, AuthPlugin, NoopAuthPlugin}; use crate::common::remote::grpc::message::{request::NacosClientAbilities, GrpcMessageData}; use crate::common::remote::grpc::message::{ GrpcMessage, GrpcMessageBuilder, GrpcRequestMessage, GrpcResponseMessage, @@ -24,6 +25,7 @@ const APP_FILED: &str = "app"; pub(crate) struct NacosGrpcClient { app_name: String, send_request: Arc, + auth_plugin: Arc, } impl NacosGrpcClient { @@ -36,7 +38,11 @@ impl NacosGrpcClient { Request: GrpcRequestMessage + 'static, Response: GrpcResponseMessage + 'static, { - let request_headers = request.take_headers(); + let mut request_headers = request.take_headers(); + if let Some(resource) = request.request_resource() { + let auth_context = self.auth_plugin.get_login_identity(resource); + request_headers.extend(auth_context.contexts); + } let grpc_request = GrpcMessageBuilder::new(request) .header(APP_FILED.to_owned(), self.app_name.clone()) @@ -72,6 +78,8 @@ pub(crate) struct NacosGrpcClientBuilder { disconnected_listener: Option, unary_call_layer: Option, bi_call_layer: Option, + auth_plugin: Arc, + auth_context: HashMap, max_retries: Option, } @@ -90,6 +98,8 @@ impl NacosGrpcClientBuilder { disconnected_listener: None, unary_call_layer: None, bi_call_layer: None, + auth_context: Default::default(), + auth_plugin: Arc::new(NoopAuthPlugin::default()), max_retries: None, } } @@ -284,6 +294,20 @@ impl NacosGrpcClientBuilder { } } + pub(crate) fn auth_plugin(self, auth_plugin: Arc) -> Self { + Self { + auth_plugin, + ..self + } + } + + pub(crate) fn auth_context(self, auth_context: HashMap) -> Self { + Self { + auth_context, + ..self + } + } + pub(crate) fn register_server_request_handler( mut self, handler: Arc, @@ -348,7 +372,7 @@ impl NacosGrpcClientBuilder { ); let send_request = { - let server_list = PollingServerListService::new(self.server_list); + let server_list = PollingServerListService::new(self.server_list.clone()); let mut tonic_builder = TonicBuilder::new(self.grpc_config, server_list); if let Some(layer) = self.unary_call_layer { tonic_builder = tonic_builder.unary_call_layer(layer); @@ -377,14 +401,24 @@ impl NacosGrpcClientBuilder { connection = connection.disconnected_listener(disconnected_listener); } - let failover_connection = connection.into_failover_connection(id); + let failover_connection = connection.into_failover_connection(id.clone()); Arc::new(failover_connection) as Arc }; + + init_auth_plugin( + self.auth_plugin.clone(), + self.server_list.clone(), + self.auth_context.clone(), + id, + ); + let app_name = self.app_name; + let auth_plugin = self.auth_plugin; NacosGrpcClient { app_name, send_request, + auth_plugin, } } } @@ -436,6 +470,7 @@ pub mod tests { let nacos_grpc_client = NacosGrpcClient { app_name: "test_app".to_string(), send_request: Arc::new(mock_send_request), + auth_plugin: Arc::new(NoopAuthPlugin::default()), }; let response = nacos_grpc_client diff --git a/src/config/worker.rs b/src/config/worker.rs index aab4709..8c65c15 100644 --- a/src/config/worker.rs +++ b/src/config/worker.rs @@ -1,7 +1,6 @@ use crate::api::config::ConfigResponse; use crate::api::plugin::{AuthPlugin, ConfigFilter, ConfigReq, ConfigResp}; use crate::api::props::ClientProps; -use crate::common::remote::grpc::layers::auth::AuthLayer; use crate::common::remote::grpc::message::GrpcResponseMessage; use crate::common::remote::grpc::{NacosGrpcClient, NacosGrpcClientBuilder}; use crate::config::cache::CacheData; @@ -37,13 +36,6 @@ impl ConfigWorker { let (notify_change_tx, notify_change_rx) = tokio::sync::mpsc::channel(16); let notify_change_tx_clone = notify_change_tx.clone(); - let auth_layer = Arc::new(AuthLayer::new( - auth_plugin.clone(), - client_props.get_server_list()?.to_vec(), - client_props.get_auth_context(), - client_id.clone(), - )); - let remote_client = NacosGrpcClientBuilder::new(client_props.get_server_list()?) .port(client_props.get_remote_grpc_port()) .namespace(client_props.get_namespace()) @@ -65,8 +57,8 @@ impl ConfigWorker { .register_server_request_handler::(Arc::new( ConfigChangeNotifyHandler { notify_change_tx }, )) - .unary_call_layer(auth_layer.clone()) - .bi_call_layer(auth_layer) + .auth_plugin(auth_plugin) + .auth_context(client_props.get_auth_context()) .max_retries(client_props.get_max_retries()) .build(client_id); diff --git a/src/naming/mod.rs b/src/naming/mod.rs index 7ebc502..d90c9a6 100644 --- a/src/naming/mod.rs +++ b/src/naming/mod.rs @@ -12,7 +12,6 @@ use crate::api::props::ClientProps; use crate::common::cache::{Cache, CacheBuilder}; use crate::common::executor; -use crate::common::remote::grpc::layers::auth::AuthLayer; use crate::common::remote::grpc::message::GrpcRequestMessage; use crate::common::remote::grpc::message::GrpcResponseMessage; use crate::common::remote::grpc::NacosGrpcClient; @@ -89,13 +88,6 @@ impl NacosNamingService { let server_request_handler = NamingPushRequestHandler::new(emitter.clone()); - let auth_layer = Arc::new(AuthLayer::new( - auth_plugin, - server_list.to_vec(), - client_props.get_auth_context(), - client_id.clone(), - )); - let nacos_grpc_client = NacosGrpcClientBuilder::new(server_list.to_vec()) .port(client_props.get_remote_grpc_port()) .namespace(namespace.clone()) @@ -131,8 +123,8 @@ impl NacosNamingService { redo.on_grpc_client_disconnect().await; }); }) - .unary_call_layer(auth_layer.clone()) - .bi_call_layer(auth_layer) + .auth_context(client_props.get_auth_context()) + .auth_plugin(auth_plugin) .max_retries(client_props.get_max_retries()) .build(client_id.clone());