Skip to content

Commit

Permalink
testing for policy_watcher
Browse files Browse the repository at this point in the history
Signed-off-by: ilrudie <[email protected]>
  • Loading branch information
ilrudie committed Jan 24, 2024
1 parent 7ca1321 commit e34a9aa
Showing 1 changed file with 77 additions and 14 deletions.
91 changes: 77 additions & 14 deletions src/proxy/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,6 @@ impl ConnectionManager {
// potentially large copy under read lock, could require optomization
self.drains.read().await.keys().cloned().collect()
}

#[allow(dead_code)]
pub async fn drain_all(self) {
let mut drains = self.drains.write_owned().await;
for (_conn, cd) in drains.drain() {
cd.drain().await;
}
}
}

pub async fn policy_watcher(
Expand Down Expand Up @@ -120,13 +112,20 @@ pub async fn policy_watcher(
}

#[cfg(test)]
mod test {
mod tests {
use drain::Watch;
use std::net::{Ipv4Addr, SocketAddrV4};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::sync::watch;
use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};

use super::ConnectionManager;
use crate::rbac::Connection;
use crate::state::{DemandProxyState, ProxyState};
use crate::xds::istio::security::{Action, Authorization, Scope};
use crate::xds::ProxyStateUpdateMutator;

use super::ConnectionManager;

#[tokio::test]
async fn test_connection_manager() {
Expand Down Expand Up @@ -173,8 +172,8 @@ mod test {
assert_eq!(connections, vec![conn1.clone(), conn2.clone()]);

// spawn tasks to assert that we close in a timely manner for conn1
tokio::spawn(async_close_assert(close1));
tokio::spawn(async_close_assert(another_close1));
tokio::spawn(assert_close(close1));
tokio::spawn(assert_close(another_close1));
// close conn1
connection_manager.close(&conn1).await;
// ensure drains contains exactly 1 item
Expand All @@ -183,16 +182,80 @@ mod test {
assert_eq!(connection_manager.connections().await, vec!(conn2.clone()));

// spawn a task to assert that we close in a timely manner for conn2
tokio::spawn(async_close_assert(close2));
tokio::spawn(assert_close(close2));
// close conn2
connection_manager.close(&conn2).await;
// assert that drains is empty again
assert_eq!(connection_manager.drains.read().await.len(), 0);
assert_eq!(connection_manager.connections().await.len(), 0);
}

#[tokio::test]
async fn test_policy_watcher_closes() {
// preamble: setup an environment
let state = Arc::new(RwLock::new(ProxyState::default()));
let dstate = DemandProxyState::new(
state.clone(),
None,
ResolverConfig::default(),
ResolverOpts::default(),
);
let connection_manager = ConnectionManager::new();
let parent_proxy = "test";
let (stop_tx, stop_rx) = watch::channel(());
let state_mutator = ProxyStateUpdateMutator::new_no_fetch();

// clones to move into spawned task
let ds = dstate.clone();
let cm = connection_manager.clone();
// spawn a task which watches policy and asserts that the policy watcher stop correctly
tokio::spawn(async move {
let res = tokio::time::timeout(
Duration::from_secs(1),
super::policy_watcher(ds, stop_rx, cm, parent_proxy),
)
.await;
assert!(res.is_ok())
});

// create a test connection
let conn1 = Connection {
src_identity: None,
src_ip: std::net::IpAddr::V4(Ipv4Addr::new(192, 168, 0, 1)),
dst_network: "".to_string(),
dst: std::net::SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(192, 168, 0, 2), 8080)),
};
// watch the connection
let close1 = connection_manager.clone().track(&conn1).await;

// generate policy which denies everything
let auth = Authorization {
name: "allow-nothing".to_string(),
action: Action::Deny as i32,
scope: Scope::Global as i32,
namespace: "default".to_string(),
rules: vec![],
};

// spawn an assertion that our connection close is received
tokio::spawn(assert_close(close1));

// update our state
let mut s = state
.write()
.expect("test fails if we're unable to get a write lock on state");
let res = state_mutator.insert_authorization(&mut s, auth);
// assert that the update was OK
assert!(res.is_ok());
// release lock
drop(s);

// send the signal which stops policy watcher
stop_tx.send_replace(());
}

// small helper to assert that the Watches are working in a timely manner
async fn async_close_assert(c: Watch) {
async fn assert_close(c: Watch) {
let result = tokio::time::timeout(Duration::from_secs(1), c.signaled()).await;
assert!(result.is_ok())
}
Expand Down

0 comments on commit e34a9aa

Please sign in to comment.