close connections which violate policy after updates #772

Merged (21 commits) on Feb 5, 2024

Commits:
2f36a49
initial impl for draining connections which are invalid after policy …
ilrudie Jan 16, 2024
eeef8b0
addressing early comments, fixing deadlocks, fixing drain logic
ilrudie Jan 17, 2024
07589b6
separate connection close and proxy drain into different channels
ilrudie Jan 22, 2024
2ac3e7a
fix cargo fmt error
ilrudie Jan 22, 2024
e042654
unit testing for ConnectionManager
ilrudie Jan 23, 2024
b7825f9
sort connections to remove flakiness from indeterminate vec ordering
ilrudie Jan 23, 2024
5080ea8
fix trait impls for Identity per clippy
ilrudie Jan 23, 2024
99dacd9
more clippy fixes
ilrudie Jan 23, 2024
e90bf03
impl closing denied connections for inbound_passthrough, DRY async fn…
ilrudie Jan 23, 2024
fa5dd8a
testing for policy_watcher
ilrudie Jan 24, 2024
59594d3
cleanup extra newline
ilrudie Jan 24, 2024
4808316
use hickory_resolver instead of trust_dns_resolver for connection_man…
ilrudie Jan 24, 2024
0ac3b25
enhancement to stop leaking connection channels in the connection man…
ilrudie Jan 24, 2024
79c12c8
remove unnecessary atomic
ilrudie Jan 30, 2024
55e6afa
use the map.entry... suggestion
ilrudie Jan 30, 2024
e4f4ce6
derive PartialOrd and Ord instead of implement
ilrudie Jan 30, 2024
f6d174b
alert policy subscribers at most once per xds update
ilrudie Jan 31, 2024
aadcc51
remove unneeded clone
ilrudie Jan 31, 2024
a840456
separate beginning to manage a conn from requesting a close receiver
ilrudie Feb 2, 2024
e2468da
register connections in outbound as well
ilrudie Feb 2, 2024
45c243f
switch to borrows for connection_manager, audit clone usage
ilrudie Feb 2, 2024
15 changes: 15 additions & 0 deletions src/identity/manager.rs
@@ -42,6 +42,21 @@ pub enum Identity {
},
}

impl Ord for Identity {
// Not sure this is a super legit compare but I think it should work for POC
Member:
Given this is no longer a POC, should we do a more robust approach?

Contributor Author:
Good call. I do want to consider whether this is adequate.

Contributor Author:
Using something like this would force us to consider how Identity::X should be compared to Identity::Spiffe if/when a new identity format is added.

impl Ord for Identity {
    fn cmp(&self, other: &Self) -> Ordering {
        let s = match self {
            Identity::Spiffe {
                trust_domain,
                namespace,
                service_account,
            } => trust_domain.to_owned() + namespace + service_account,
        };
        let o = match other {
            Identity::Spiffe {
                trust_domain,
                namespace,
                service_account,
            } => trust_domain.to_owned() + namespace + service_account,
        };
        s.cmp(&o)
    }
}

Member:

Why do we actually need Ord?

Contributor Author:

Never mind, they can be derived... that's the play.

Contributor Author:

Ord is required for sorting, which is used in the tests.

    fn cmp(&self, other: &Self) -> Ordering {
        let s = format!("{self}");
        let o = format!("{other}");
        s.cmp(&o)
    }
}

impl PartialOrd for Identity {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl EncodeLabelValue for Identity {
    fn encode(&self, writer: &mut LabelValueEncoder) -> Result<(), std::fmt::Error> {
        writer.write_str(&self.to_string())
1 change: 1 addition & 0 deletions src/proxy.rs
@@ -41,6 +41,7 @@ use crate::state::workload::{network_addr, Workload};
use crate::state::DemandProxyState;
use crate::{config, identity, socket, tls};

mod connection_manager;
mod inbound;
mod inbound_passthrough;
#[allow(non_camel_case_types)]
359 changes: 359 additions & 0 deletions src/proxy/connection_manager.rs
@@ -0,0 +1,359 @@
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::proxy::error;
use crate::rbac::Connection;
use crate::state::DemandProxyState;
use drain;
use std::collections::HashMap;
use std::sync::{atomic::AtomicUsize, Arc};
use tokio::sync::watch;
use tokio::sync::RwLock;
use tracing::{debug, info};

struct ConnectionDrain {
    tx: drain::Signal,
    rx: drain::Watch,
    count: AtomicUsize,
}

impl ConnectionDrain {
    fn new() -> Self {
        let (tx, rx) = drain::channel();
        let count = AtomicUsize::new(1);
        ConnectionDrain { tx, rx, count }
    }

    /// drain drops the internal reference to rx and then signals drain on the tx
    // always inlined; this is for convenience so that we don't forget to drop the rx, but there's really no reason for it to grow the stack
    #[inline(always)]
    async fn drain(self) {
        drop(self.rx); // very important: drain cannot complete if there are outstanding rx
        self.tx.drain().await;
    }
}

#[derive(Clone)]
pub struct ConnectionManager {
Member:
We already have a "pool" tracking connections. It seems like there, ideally, would be a single data structure tracking connections. I guess Pool tracks outer HBONE connections while this tracks inner user connections, though. Also I don't see a feasible way to use 1 structure anyways, so mostly hypothetical.

Contributor Author:
It's a good thought. I couldn't work out how to do it either though.

    drains: Arc<RwLock<HashMap<Connection, ConnectionDrain>>>,
}

impl ConnectionManager {
    pub fn new() -> Self {
        ConnectionManager {
            drains: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    // register a connection with the manager and get a channel to receive on
    pub async fn track(self, c: &Connection) -> drain::Watch {
        let cd = ConnectionDrain::new();
        let rx = cd.rx.clone();
        let mut drains = self.drains.write().await;
        if let Some(w) = drains.remove(c) {
Member:

nit: del+mutate+insert can be done with code like:

map.entry("poneyland")
   .and_modify(|e| { *e += 1 })
   .or_insert(42);

Member:

maybe not with the return rx aspect though

Contributor Author:

modified rust playground example

Seems like it could work. TY

Contributor Author (@ilrudie, Jan 30, 2024):

This could very well be a step too far and difficult to grok, but:

    // register a connection with the manager and get a channel to receive on
    pub async fn track(self, c: &Connection) -> drain::Watch {
        self.drains
            .write()
            .await
            .entry(c.to_owned())
            .and_modify(|cd| cd.count += 1)
            .or_insert(ConnectionDrain::new())
            .rx
            .clone()
    }

            w.count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
            let rx = w.rx.clone();
            drains.insert(c.clone(), w);
            debug!("there are {} tracked connections", drains.len());
            return rx;
        }
        drains.insert(c.clone(), cd);
        debug!("there are {} tracked connections", drains.len());
        rx
    }

    // releases tracking on a connection
    // uses a counter to determine if there are other tracked connections or not, so it may retain the tx/rx channels when necessary
    pub async fn release(self, c: &Connection) {
        let mut drains = self.drains.write().await;
        if let Some((k, v)) = drains.remove_entry(c) {
            if v.count.fetch_sub(1, std::sync::atomic::Ordering::SeqCst) > 1 {
Member:
nit: why do we need atomic operations if we have a write lock?

Contributor Author:
You're correct. I was going to try something a little fancier and ended up going with this without removing the atomic. A lot of this logic may be removed anyway, though, if we want to switch off the drain crate onto a different type of channel, as Ben suggested elsewhere.

Contributor Author:
Should be better now. Atomic removed

                // something else is tracking this connection, retain
                drains.insert(k, v);
Contributor (@bleggett, Jan 24, 2024):
We talked about this already - but drain-rs is a very light and rather inflexible wrapper around plain old tokio channels, which are considerably more flexible and not that much harder to use, and I suspect there might be other ways to accomplish this by using tokio channels directly (different channel type, or something like WeakSender), without the manual external atomic count.

But the effect would be the same so it is probably not worth it, this will do the job AFAICT - so just FYI if drain-rs gives you trouble and it's easier to drop or replace, I don't have a problem with ditching it.

Contributor Author:

It's a fair idea. I'm not sure we strictly need the bidirectional close functionality the crate offers for this case... in fact we may prefer fire-and-forget, since for our purposes right now we don't actually care whether the receiver gets a signal or an error. We're sending one bit of information which just means: do your thing. At any rate, this is certainly something to consider.

            }
        }
    }

    // signal all connections listening to this channel to take action (typically terminate traffic)
Member:
comment seems confusing, or I don't understand it. Signal all connections? But the input is 1 connection.

Is Connection representing the outer hbone connection, and drain() waits until all inner connects close?

Contributor Author:

Potentially a single rbac::Connection could have multiple connections handling traffic, I think. In that case they all need to be closed, and this code should handle that case if it arises.

    async fn close(&self, c: &Connection) {
        if let Some(cd) = self.drains.clone().write().await.remove(c) {
Member:
remove clone

            cd.drain().await;
        } else {
            // this is bad, possibly drain called twice
            error!("requested drain on a Connection which wasn't initialized");
        }
    }

    // get a list of all connections being tracked
    async fn connections(&self) -> Vec<Connection> {
        // potentially large copy under read lock, could require optimization
        self.drains.read().await.keys().cloned().collect()
    }
}

pub async fn policy_watcher(
    state: DemandProxyState,
    mut stop_rx: watch::Receiver<()>,
    connection_manager: ConnectionManager,
    parent_proxy: &str,
) {
    let mut policies_changed = state.read().policies.subscribe();
    loop {
        tokio::select! {
            _ = stop_rx.changed() => {
                break;
            }
            _ = policies_changed.changed() => {
                let connections = connection_manager.connections().await;
                for conn in connections {
                    if !state.assert_rbac(&conn).await {

Comment:

Sorry, I'm not a Rust expert - does this await individually or globally? Any time there's IO, you probably don't want to stall in a loop in a switch - you may overflow the pending events because IO is slow. It's best to fire-and-forget and then let someone reap the zombies.

Contributor Author:

This will await the individual assertion against the one connection, but await doesn't mean block (there is a specific block function if that's what you want, though). If the Future being awaited is not ready, it relinquishes control and will be polled later by the executor of the task/thread.

Rust's Futures are somewhat unique in that they are lazy. If nothing drives them to completion they will not do any work, so we can't totally fire and forget. We can either await them and let the executor/callback model poll them, or we could collect all the futures and join them later. If we do neither, no work would happen.

Comment:

I feel like you do want to block the loop, or at least debounce it, to avoid edge-triggered event spam in the runtime. But I have no idea how to express that in Rust. Maybe a separate actor that is "fast" in consuming policy events but "slow" in enacting them.

Contributor Author:

Yeah, I think having scoped events would be a solid optimization. IMO it would be better to keep this PR a little smaller and then iterate on some optimizations as we move forward, but if folks strongly disagree we can start optimizing now.

Contributor Author:

The code you are commenting on is sort of that actor (in concept at least). It's being spun up in its own task so it should be (at least somewhat) independent of the tasks which are actually moving data.

Member:

If I understand the event-spam concern, is it that we enqueue multiple times to trigger this code? Like, while we are looping here, we get changed() multiple times stacked up?

If so, I think it will have at most one pending. That being said, you could still hit some spam, I guess.

Contributor Author:

I've changed the implementation so that sends to policy subscribers are less frequent in cases where we've got batched updates. Should help.

Member:

There can be a race between a connection being tracked and a policy update, like:

t0: a connection is established with the old rbac
t1: the rbac is updated
t2: the policy watcher runs drain here
t3: the connection established at t0 gets tracked

The connection actually should be closed.

                        connection_manager.close(&conn).await;
                        info!("{parent_proxy} connection {conn} closed because it's no longer allowed after a policy update");
                    }
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use drain::Watch;
    use hickory_resolver::config::{ResolverConfig, ResolverOpts};
    use std::net::{Ipv4Addr, SocketAddrV4};
    use std::sync::{Arc, RwLock};
    use std::time::Duration;
    use tokio::sync::watch;

    use crate::rbac::Connection;
    use crate::state::{DemandProxyState, ProxyState};
    use crate::xds::istio::security::{Action, Authorization, Scope};
    use crate::xds::ProxyStateUpdateMutator;

    use super::ConnectionManager;

    #[tokio::test]
    async fn test_connection_manager_close() {
        // setup a new ConnectionManager
        let connection_manager = ConnectionManager::new();
        // ensure drains is empty
        assert_eq!(connection_manager.drains.read().await.len(), 0);
        assert_eq!(connection_manager.connections().await.len(), 0);

        // track a new connection
        let conn1 = Connection {
            src_identity: None,
            src_ip: std::net::IpAddr::V4(Ipv4Addr::new(192, 168, 0, 1)),
            dst_network: "".to_string(),
            dst: std::net::SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(192, 168, 0, 2), 8080)),
        };
        let close1 = connection_manager.clone().track(&conn1).await;
        // ensure drains contains exactly 1 item
        assert_eq!(connection_manager.drains.read().await.len(), 1);
        assert_eq!(connection_manager.connections().await.len(), 1);
        assert_eq!(connection_manager.connections().await, vec!(conn1.clone()));

        // setup a second track on the same connection
        let another_conn1 = conn1.clone();
        let another_close1 = connection_manager.clone().track(&another_conn1).await;
        // ensure drains still contains exactly 1 item
        assert_eq!(connection_manager.drains.read().await.len(), 1);
        assert_eq!(connection_manager.connections().await.len(), 1);
        assert_eq!(connection_manager.connections().await, vec!(conn1.clone()));

        // track a second connection
        let conn2 = Connection {
            src_identity: None,
            src_ip: std::net::IpAddr::V4(Ipv4Addr::new(192, 168, 0, 3)),
            dst_network: "".to_string(),
            dst: std::net::SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(192, 168, 0, 2), 8080)),
        };
        let close2 = connection_manager.clone().track(&conn2).await;
        // ensure drains contains exactly 2 items
        assert_eq!(connection_manager.drains.read().await.len(), 2);
        assert_eq!(connection_manager.connections().await.len(), 2);
        let mut connections = connection_manager.connections().await;
        connections.sort(); // ordering cannot be guaranteed without sorting
        assert_eq!(connections, vec![conn1.clone(), conn2.clone()]);

        // spawn tasks to assert that we close in a timely manner for conn1
        tokio::spawn(assert_close(close1));
        tokio::spawn(assert_close(another_close1));
        // close conn1
        connection_manager.close(&conn1).await;
        // ensure drains contains exactly 1 item
        assert_eq!(connection_manager.drains.read().await.len(), 1);
        assert_eq!(connection_manager.connections().await.len(), 1);
        assert_eq!(connection_manager.connections().await, vec!(conn2.clone()));

        // spawn a task to assert that we close in a timely manner for conn2
        tokio::spawn(assert_close(close2));
        // close conn2
        connection_manager.close(&conn2).await;
        // assert that drains is empty again
        assert_eq!(connection_manager.drains.read().await.len(), 0);
        assert_eq!(connection_manager.connections().await.len(), 0);
    }

    #[tokio::test]
    async fn test_connection_manager_release() {
        // setup a new ConnectionManager
        let connection_manager = ConnectionManager::new();
        // ensure drains is empty
        assert_eq!(connection_manager.drains.read().await.len(), 0);
        assert_eq!(connection_manager.connections().await.len(), 0);

        // create a new connection
        let conn1 = Connection {
            src_identity: None,
            src_ip: std::net::IpAddr::V4(Ipv4Addr::new(192, 168, 0, 1)),
            dst_network: "".to_string(),
            dst: std::net::SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(192, 168, 0, 2), 8080)),
        };

        // create a second connection
        let conn2 = Connection {
            src_identity: None,
            src_ip: std::net::IpAddr::V4(Ipv4Addr::new(192, 168, 0, 3)),
            dst_network: "".to_string(),
            dst: std::net::SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(192, 168, 0, 2), 8080)),
        };

        let another_conn1 = conn1.clone();

        // watch the connections
        let close1 = connection_manager.clone().track(&conn1).await;
        let another_close1 = connection_manager.clone().track(&another_conn1).await;
        // ensure drains contains exactly 1 item
        assert_eq!(connection_manager.drains.read().await.len(), 1);
        assert_eq!(connection_manager.connections().await.len(), 1);
        assert_eq!(connection_manager.connections().await, vec!(conn1.clone()));

        // release conn1's clone
        drop(another_close1);
        connection_manager.clone().release(&another_conn1).await;
        // ensure drains still contains exactly 1 item
        assert_eq!(connection_manager.drains.read().await.len(), 1);
        assert_eq!(connection_manager.connections().await.len(), 1);
        assert_eq!(connection_manager.connections().await, vec!(conn1.clone()));

        // track conn2
        let close2 = connection_manager.clone().track(&conn2).await;
        // ensure drains contains exactly 2 items
        assert_eq!(connection_manager.drains.read().await.len(), 2);
        assert_eq!(connection_manager.connections().await.len(), 2);
        let mut connections = connection_manager.connections().await;
        connections.sort(); // ordering cannot be guaranteed without sorting
        assert_eq!(connections, vec![conn1.clone(), conn2.clone()]);

        // release conn1
        drop(close1);
        connection_manager.clone().release(&conn1).await;
        // ensure drains contains exactly 1 item
        assert_eq!(connection_manager.drains.read().await.len(), 1);
        assert_eq!(connection_manager.connections().await.len(), 1);
        assert_eq!(connection_manager.connections().await, vec!(conn2.clone()));

        // clone conn2 and track it
        let another_conn2 = conn2.clone();
        let another_close2 = connection_manager.clone().track(&another_conn2).await;
        drop(close2);
        // release tracking on conn2
        connection_manager.clone().release(&conn2).await;
        // ensure drains still contains exactly 1 item
        assert_eq!(connection_manager.drains.read().await.len(), 1);
        assert_eq!(connection_manager.connections().await.len(), 1);
        assert_eq!(
            connection_manager.connections().await,
            vec!(another_conn2.clone())
        );

        // release tracking on conn2's clone
        drop(another_close2);
        connection_manager.clone().release(&another_conn2).await;
        // ensure drains contains exactly 0 items
        assert_eq!(connection_manager.drains.read().await.len(), 0);
        assert_eq!(connection_manager.connections().await.len(), 0);
    }

    #[tokio::test]
    async fn test_policy_watcher_lifecycle() {
        // preamble: setup an environment
        let state = Arc::new(RwLock::new(ProxyState::default()));
        let dstate = DemandProxyState::new(
            state.clone(),
            None,
            ResolverConfig::default(),
            ResolverOpts::default(),
        );
        let connection_manager = ConnectionManager::new();
        let parent_proxy = "test";
        let (stop_tx, stop_rx) = watch::channel(());
        let state_mutator = ProxyStateUpdateMutator::new_no_fetch();

        // clones to move into the spawned task
        let ds = dstate.clone();
        let cm = connection_manager.clone();
        // spawn a task which watches policy and asserts that the policy watcher stops correctly
        tokio::spawn(async move {
            let res = tokio::time::timeout(
                Duration::from_secs(1),
                super::policy_watcher(ds, stop_rx, cm, parent_proxy),
            )
            .await;
            assert!(res.is_ok())
        });

        // create a test connection
        let conn1 = Connection {
            src_identity: None,
            src_ip: std::net::IpAddr::V4(Ipv4Addr::new(192, 168, 0, 1)),
            dst_network: "".to_string(),
            dst: std::net::SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(192, 168, 0, 2), 8080)),
        };
        // watch the connection
        let close1 = connection_manager.clone().track(&conn1).await;

        // generate a policy which denies everything
        let auth = Authorization {
            name: "allow-nothing".to_string(),
            action: Action::Deny as i32,
            scope: Scope::Global as i32,
            namespace: "default".to_string(),
            rules: vec![],
        };

        // spawn an assertion that our connection close is received
        tokio::spawn(assert_close(close1));

        // update our state
        let mut s = state
            .write()
            .expect("test fails if we're unable to get a write lock on state");
        let res = state_mutator.insert_authorization(&mut s, auth);
        // assert that the update was OK
        assert!(res.is_ok());
        // release lock
        drop(s);

        // send the signal which stops the policy watcher
        stop_tx.send_replace(());
    }

    // small helper to assert that the Watches are working in a timely manner
    async fn assert_close(c: Watch) {
        let result = tokio::time::timeout(Duration::from_secs(1), c.signaled()).await;
        assert!(result.is_ok())
    }
}
}
Loading