Skip to content

Commit

Permalink
tcproute: add support for round-robin load balancing (#148)
Browse files Browse the repository at this point in the history
Add support for round-robin load balancing for `TCPRoute`. This enables
mutliple backend references in a UDPRoute object, with traffic being
distributed to each backend in a round-robin fashion.

A new BPF map `TCP_CONNECTIONS` was introduced to help keep track of
active TCP connections, with its key storing the clien <<ip:port>>
identifier and the value storing the backend's <<ip:port>> along with
the state of the connection.

Fixes #119 

Follow up items:
* modify `TCP_CONNECTIONS` to be more generic; get rid of
`BLIXT_CONNTRACK` and refactor udp ingress and icmp egress to use the
new generic map
  • Loading branch information
k8s-ci-robot authored Dec 11, 2023
2 parents 940f6aa + 4acd90b commit fa647b5
Show file tree
Hide file tree
Showing 15 changed files with 635 additions and 117 deletions.
18 changes: 10 additions & 8 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,16 @@ jobs:
BLIXT_UDP_SERVER_IMAGE: "ghcr.io/kubernetes-sigs/blixt-udp-test-server"
TAG: "integration-tests"

- name: run integration tests with bpfd
run: make test.integration
env:
BLIXT_CONTROLPLANE_IMAGE: "ghcr.io/kubernetes-sigs/blixt-controlplane"
BLIXT_DATAPLANE_IMAGE: "ghcr.io/kubernetes-sigs/blixt-dataplane"
BLIXT_UDP_SERVER_IMAGE: "ghcr.io/kubernetes-sigs/blixt-udp-test-server"
BLIXT_USE_BPFD: true
TAG: "integration-tests"
# temporarily disabled due to upstream changes in bpfman (previously bpfd)
# ref: https://github.com/kubernetes-sigs/blixt/issues/152
# - name: run integration tests with bpfd
# run: make test.integration
# env:
# BLIXT_CONTROLPLANE_IMAGE: "ghcr.io/kubernetes-sigs/blixt-controlplane"
# BLIXT_DATAPLANE_IMAGE: "ghcr.io/kubernetes-sigs/blixt-dataplane"
# BLIXT_UDP_SERVER_IMAGE: "ghcr.io/kubernetes-sigs/blixt-udp-test-server"
# BLIXT_USE_BPFD: true
# TAG: "integration-tests"

## Upload diagnostics if integration test step failed.
- name: upload diagnostics
Expand Down
6 changes: 2 additions & 4 deletions config/samples/tcproute/server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ spec:
spec:
containers:
- name: server
image: ghcr.io/shaneutt/malutki
image: istio/tcp-echo-server:1.1
imagePullPolicy: IfNotPresent
env:
- name: LISTEN_PORT
value: "8080"
args: [ "8080", "blixt-tcproute-sample:" ]
ports:
- containerPort: 8080
protocol: TCP
Expand Down
7 changes: 7 additions & 0 deletions config/tests/tcproute-rr/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
- ../../samples/tcproute
- server.yaml
patches:
- path: patch.yaml
17 changes: 17 additions & 0 deletions config/tests/tcproute-rr/patch.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
apiVersion: gateway.networking.k8s.io/v1alpha2
kind: TCPRoute
metadata:
name: blixt-tcproute-sample
spec:
parentRefs:
- name: blixt-tcproute-sample
port: 8080
rules:
- backendRefs:
- name: blixt-tcproute-sample
port: 8080
- backendRefs:
- name: tcproute-rr-v1
port: 8080
- name: tcproute-rr-v2
port: 8080
76 changes: 76 additions & 0 deletions config/tests/tcproute-rr/server.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: tcproute-rr-v1
labels:
app: tcproute-rr-v1
spec:
selector:
matchLabels:
app: tcproute-rr-v1
template:
metadata:
labels:
app: tcproute-rr-v1
spec:
containers:
- name: tcp-echo
image: istio/tcp-echo-server:1.1
imagePullPolicy: IfNotPresent
args: [ "8080", "tcproute-rr-v1:" ]
ports:
- containerPort: 8080
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: tcproute-rr-v2
labels:
app: tcproute-rr-v2
spec:
selector:
matchLabels:
app: tcproute-rr-v2
template:
metadata:
labels:
app: tcproute-rr-v2
spec:
containers:
- name: tcp-echo
image: istio/tcp-echo-server:1.1
imagePullPolicy: IfNotPresent
args: [ "8080", "tcproute-rr-v2:" ]
ports:
- containerPort: 8080
---
apiVersion: v1
kind: Service
metadata:
labels:
app: tcproute-rr-v1
name: tcproute-rr-v1
spec:
ports:
- name: tcp
port: 8080
protocol: TCP
selector:
app: tcproute-rr-v1
type: ClusterIP
---
apiVersion: v1
kind: Service
metadata:
labels:
app: tcproute-rr-v2
name: tcproute-rr-v2
spec:
ports:
- name: tcp
port: 8080
protocol: TCP
selector:
app: tcproute-rr-v2
type: ClusterIP
5 changes: 3 additions & 2 deletions dataplane/api-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@ use aya::maps::{HashMap, MapData};
use tonic::transport::Server;

use backends::backends_server::BackendsServer;
use common::{BackendKey, BackendList};
use common::{BackendKey, BackendList, ClientKey, TCPBackend};

pub async fn start(
addr: Ipv4Addr,
port: u16,
backends_map: HashMap<MapData, BackendKey, BackendList>,
gateway_indexes_map: HashMap<MapData, BackendKey, u16>,
tcp_conns_map: HashMap<MapData, ClientKey, TCPBackend>,
) -> Result<(), Error> {
let server = server::BackendService::new(backends_map, gateway_indexes_map);
let server = server::BackendService::new(backends_map, gateway_indexes_map, tcp_conns_map);
// TODO: mTLS https://github.com/Kong/blixt/issues/50
Server::builder()
.add_service(BackendsServer::new(server))
Expand Down
41 changes: 37 additions & 4 deletions dataplane/api-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,31 @@ use std::net::Ipv4Addr;
use std::sync::Arc;

use anyhow::Error;
use aya::maps::{HashMap, MapData};
use aya::maps::{HashMap, MapData, MapError};
use tokio::sync::Mutex;
use tonic::{Request, Response, Status};

use crate::backends::backends_server::Backends;
use crate::backends::{Confirmation, InterfaceIndexConfirmation, PodIp, Targets, Vip};
use crate::netutils::{if_name_for_routing_ip, if_nametoindex};
use common::{Backend, BackendKey, BackendList, BACKENDS_ARRAY_CAPACITY};
use common::{Backend, BackendKey, BackendList, ClientKey, TCPBackend, BACKENDS_ARRAY_CAPACITY};

pub struct BackendService {
backends_map: Arc<Mutex<HashMap<MapData, BackendKey, BackendList>>>,
gateway_indexes_map: Arc<Mutex<HashMap<MapData, BackendKey, u16>>>,
tcp_conns_map: Arc<Mutex<HashMap<MapData, ClientKey, TCPBackend>>>,
}

impl BackendService {
pub fn new(
backends_map: HashMap<MapData, BackendKey, BackendList>,
gateway_indexes_map: HashMap<MapData, BackendKey, u16>,
tcp_conns_map: HashMap<MapData, ClientKey, TCPBackend>,
) -> BackendService {
BackendService {
backends_map: Arc::new(Mutex::new(backends_map)),
gateway_indexes_map: Arc::new(Mutex::new(gateway_indexes_map)),
tcp_conns_map: Arc::new(Mutex::new(tcp_conns_map)),
}
}

Expand All @@ -51,6 +54,35 @@ impl BackendService {
backends_map.remove(&key)?;
let mut gateway_indexes_map = self.gateway_indexes_map.lock().await;
gateway_indexes_map.remove(&key)?;

// Delete all entries in our tcp connection tracking map that this backend
// key was related to. This is needed because the TCPRoute might have been
// deleted with TCP connection(s) still open, so without the below logic
// they'll hang around forever.
// Its better to do this rather than maintain a reverse index because the index
// would need to be updated with each new connection. With remove being a less
// frequently used operation, the performance cost is less visible.
let mut tcp_conns_map = self.tcp_conns_map.lock().await;
for item in tcp_conns_map
.iter()
.collect::<Vec<Result<(ClientKey, TCPBackend), MapError>>>()
{
match item {
Ok((
client_key,
TCPBackend {
backend: _,
backend_key,
state: _,
},
)) => {
if backend_key == key {
tcp_conns_map.remove(&client_key)?;
};
}
Err(err) => return Err(err.into()),
};
}
Ok(())
}
}
Expand Down Expand Up @@ -144,9 +176,10 @@ impl Backends for BackendService {
match self.insert_and_reset_index(key, backend_list).await {
Ok(_) => Ok(Response::new(Confirmation {
confirmation: format!(
"success, vip {}:{} was updated",
"success, vip {}:{} was updated with {} backends",
Ipv4Addr::from(vip.ip),
vip.port
vip.port,
count,
),
})),
Err(err) => Err(Status::internal(format!("failure: {}", err))),
Expand Down
40 changes: 39 additions & 1 deletion dataplane/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub struct Backend {
#[cfg(feature = "user")]
unsafe impl aya::Pod for Backend {}

#[derive(Copy, Clone, Debug)]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
#[repr(C)]
pub struct BackendKey {
pub ip: u32,
Expand All @@ -40,3 +40,41 @@ pub struct BackendList {

#[cfg(feature = "user")]
unsafe impl aya::Pod for BackendList {}

#[derive(Copy, Clone, Debug)]
#[repr(C)]
pub struct ClientKey {
pub ip: u32,
pub port: u32,
}

#[cfg(feature = "user")]
unsafe impl aya::Pod for ClientKey {}

// TCPState contains variants that represent the current phase of the TCP connection at a point in
// time during the connection's termination.
#[derive(Copy, Clone, Debug, Default)]
#[repr(C)]
pub enum TCPState {
#[default]
Established,
FinWait1,
FinWait2,
Closing,
TimeWait,
Closed,
}

#[cfg(feature = "user")]
unsafe impl aya::Pod for TCPState {}

#[derive(Copy, Clone, Debug)]
#[repr(C)]
pub struct TCPBackend {
pub backend: Backend,
pub backend_key: BackendKey,
pub state: TCPState,
}

#[cfg(feature = "user")]
unsafe impl aya::Pod for TCPBackend {}
45 changes: 31 additions & 14 deletions dataplane/ebpf/src/egress/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ use aya_bpf::{
programs::TcContext,
};
use aya_log_ebpf::info;
use common::ClientKey;
use network_types::{eth::EthHdr, ip::Ipv4Hdr, tcp::TcpHdr};

use crate::{
utils::{csum_fold_helper, ptr_at},
BLIXT_CONNTRACK,
utils::{csum_fold_helper, ptr_at, update_tcp_conns},
TCP_CONNECTIONS,
};

pub fn handle_tcp_egress(ctx: TcContext) -> Result<i32, i64> {
Expand All @@ -29,25 +30,30 @@ pub fn handle_tcp_egress(ctx: TcContext) -> Result<i32, i64> {

// capture some IP and port information
let client_addr = unsafe { (*ip_hdr).dst_addr };
let dest_port = unsafe { (*tcp_hdr).dest.to_be() };
let ip_port_tuple = unsafe { BLIXT_CONNTRACK.get(&client_addr) }.ok_or(TC_ACT_PIPE)?;

// verify traffic destination
if ip_port_tuple.1 as u16 != dest_port {
return Ok(TC_ACT_PIPE);
}
let dest_port = unsafe { (*tcp_hdr).dest };
// The source identifier
let client_key = ClientKey {
ip: u32::from_be(client_addr),
port: u16::from_be(dest_port) as u32,
};
let tcp_backend = unsafe { TCP_CONNECTIONS.get(&client_key) }.ok_or(TC_ACT_PIPE)?;

info!(
&ctx,
"Received TCP packet destined for tracked IP {:i}:{} setting source IP to VIP {:i}",
"Received TCP packet destined for tracked IP {:i}:{} setting source IP to VIP {:i}:{}",
u32::from_be(client_addr),
ip_port_tuple.1 as u16,
u32::from_be(ip_port_tuple.0),
u16::from_be(dest_port),
tcp_backend.backend_key.ip,
tcp_backend.backend_key.port,
);

// TODO: connection tracking cleanup https://github.com/kubernetes-sigs/blixt/issues/85
// SNAT the ip address
unsafe {
(*ip_hdr).src_addr = ip_port_tuple.0;
(*ip_hdr).src_addr = tcp_backend.backend_key.ip.to_be();
};
// SNAT the port
unsafe { (*tcp_hdr).source = u16::from_be(tcp_backend.backend_key.port as u16) };

if (ctx.data() + EthHdr::LEN + Ipv4Hdr::LEN) > ctx.data_end() {
info!(&ctx, "Iphdr is out of bounds");
Expand All @@ -67,7 +73,18 @@ pub fn handle_tcp_egress(ctx: TcContext) -> Result<i32, i64> {
unsafe { (*ip_hdr).check = csum_fold_helper(full_cksum) };
unsafe { (*tcp_hdr).check = 0 };

// TODO: connection tracking cleanup https://github.com/kubernetes-sigs/blixt/issues/85
let tcp_hdr_ref = unsafe { tcp_hdr.as_ref().ok_or(TC_ACT_OK)? };

// If the packet has the RST flag set, it means the connection is being terminated, so remove it
// from our map.
if tcp_hdr_ref.rst() == 1 {
unsafe {
TCP_CONNECTIONS.remove(&client_key)?;
}
}

let mut tcp_bk = *tcp_backend;
update_tcp_conns(tcp_hdr_ref, &client_key, &mut tcp_bk)?;

Ok(TC_ACT_PIPE)
}
Loading

0 comments on commit fa647b5

Please sign in to comment.