Skip to content

Commit

Permalink
Introduce TCP keepalive configuration (#186)
Browse files Browse the repository at this point in the history
* Add logging to proxy::tcp

* update rust version in dockerfile

* Introduce TCP keepalive configuration

In some network environments, peers may silently drop connections such
that the proxy cannot detect that the peer's socket has been closed.

The [TCP keepalive socket options][tcp-keepalive] configure the kernel
to actively probe connections to ensure connectivity and prevent idle
timeouts.

This change adds stack modules that attempt to configure accept and
connect sockets' TCP keepalive socket options. There are four new
environment configurations the proxy supports:

- `LINKERD2_PROXY_INBOUND_ACCEPT_KEEPALIVE`
- `LINKERD2_PROXY_OUTBOUND_ACCEPT_KEEPALIVE`
- `LINKERD2_PROXY_INBOUND_CONNECT_KEEPALIVE`
- `LINKERD2_PROXY_OUTBOUND_CONNECT_KEEPALIVE`

When an environment variable is unset, no keepalive is set on the
corresponding sockets. Otherwise, its value is parsed as a duration.
OSes may or may not understand subsecond values.

It is recommended to only set the inbound-accept and outbound-connect
keepalive values, as keepalives shouldn't be necessary on localhost.

Relates to linkerd/linkerd2#1949 linkerd/linkerd2#2182

[tcp-keepalive]: http://www.tldp.org/HOWTO/TCP-Keepalive-HOWTO/overview.html

* Use smarter controller keepalives

For the controller's pods, it may not make sense to use the outbound
keepalive when communicating with the proxy API, because this API may be
served on localhost.

If the controller's address is localhost/loopback, then use the
inbound connect keepalive instead.
  • Loading branch information
olix0r authored Feb 4, 2019
1 parent ccf13a2 commit 0fc35e2
Show file tree
Hide file tree
Showing 14 changed files with 375 additions and 62 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
# This reduces build time and produces binaries with debug symbols, at the expense of
# runtime performance.

ARG RUST_IMAGE=rust:1.30.0
ARG RUST_IMAGE=rust:1.32.0
ARG RUNTIME_IMAGE=gcr.io/linkerd-io/base:2017-10-30.01

## Builds the proxy as incrementally as possible.
Expand Down
42 changes: 42 additions & 0 deletions src/app/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,18 @@ pub struct Config {
/// The maximum amount of time to wait for a connection to a remote peer.
pub outbound_connect_timeout: Duration,

// TCP Keepalive set on accepted inbound connections.
pub inbound_accept_keepalive: Option<Duration>,

// TCP Keepalive set on accepted outbound connections.
pub outbound_accept_keepalive: Option<Duration>,

// TCP Keepalive set on inbound connections to the local application.
pub inbound_connect_keepalive: Option<Duration>,

// TCP Keepalive set on outbound connections to the remote peers.
pub outbound_connect_keepalive: Option<Duration>,

pub inbound_ports_disable_protocol_detection: IndexSet<u16>,

pub outbound_ports_disable_protocol_detection: IndexSet<u16>,
Expand Down Expand Up @@ -179,6 +191,12 @@ pub const ENV_METRICS_RETAIN_IDLE: &str = "LINKERD2_PROXY_METRICS_RETAIN_IDLE";
const ENV_INBOUND_CONNECT_TIMEOUT: &str = "LINKERD2_PROXY_INBOUND_CONNECT_TIMEOUT";
const ENV_OUTBOUND_CONNECT_TIMEOUT: &str = "LINKERD2_PROXY_OUTBOUND_CONNECT_TIMEOUT";

const ENV_INBOUND_ACCEPT_KEEPALIVE: &str = "LINKERD2_PROXY_INBOUND_ACCEPT_KEEPALIVE";
const ENV_OUTBOUND_ACCEPT_KEEPALIVE: &str = "LINKERD2_PROXY_OUTBOUND_ACCEPT_KEEPALIVE";

const ENV_INBOUND_CONNECT_KEEPALIVE: &str = "LINKERD2_PROXY_INBOUND_CONNECT_KEEPALIVE";
const ENV_OUTBOUND_CONNECT_KEEPALIVE: &str = "LINKERD2_PROXY_OUTBOUND_CONNECT_KEEPALIVE";

pub const DEPRECATED_ENV_PRIVATE_LISTENER: &str = "LINKERD2_PROXY_PRIVATE_LISTENER";
pub const DEPRECATED_ENV_PRIVATE_FORWARD: &str = "LINKERD2_PROXY_PRIVATE_FORWARD";
const DEPRECATED_ENV_PUBLIC_LISTENER: &str = "LINKERD2_PROXY_PUBLIC_LISTENER";
Expand Down Expand Up @@ -321,35 +339,53 @@ impl<'a> TryFrom<&'a Strings> for Config {
strings, ENV_INBOUND_LISTENER, DEPRECATED_ENV_PUBLIC_LISTENER, parse_addr);
let control_listener_addr = parse(strings, ENV_CONTROL_LISTENER, parse_addr);
let metrics_listener_addr = parse(strings, ENV_METRICS_LISTENER, parse_addr);

let inbound_forward = parse_deprecated(
strings, ENV_INBOUND_FORWARD, DEPRECATED_ENV_PRIVATE_FORWARD, parse_addr);

let inbound_connect_timeout = parse_deprecated(
strings, ENV_INBOUND_CONNECT_TIMEOUT, DEPRECATED_ENV_PRIVATE_CONNECT_TIMEOUT, parse_duration);
let outbound_connect_timeout = parse_deprecated(
strings, ENV_OUTBOUND_CONNECT_TIMEOUT, DEPRECATED_ENV_PUBLIC_CONNECT_TIMEOUT, parse_duration);

let inbound_accept_keepalive = parse(strings, ENV_INBOUND_ACCEPT_KEEPALIVE, parse_duration);
let outbound_accept_keepalive = parse(strings, ENV_OUTBOUND_ACCEPT_KEEPALIVE, parse_duration);

let inbound_connect_keepalive = parse(strings, ENV_INBOUND_CONNECT_KEEPALIVE, parse_duration);
let outbound_connect_keepalive = parse(strings, ENV_OUTBOUND_CONNECT_KEEPALIVE, parse_duration);

let inbound_disable_ports = parse(strings, ENV_INBOUND_PORTS_DISABLE_PROTOCOL_DETECTION, parse_port_set);
let outbound_disable_ports = parse(strings, ENV_OUTBOUND_PORTS_DISABLE_PROTOCOL_DETECTION, parse_port_set);

let inbound_router_capacity = parse(strings, ENV_INBOUND_ROUTER_CAPACITY, parse_number);
let outbound_router_capacity = parse(strings, ENV_OUTBOUND_ROUTER_CAPACITY, parse_number);

let inbound_router_max_idle_age = parse(strings, ENV_INBOUND_ROUTER_MAX_IDLE_AGE, parse_duration);
let outbound_router_max_idle_age = parse(strings, ENV_OUTBOUND_ROUTER_MAX_IDLE_AGE, parse_duration);

let destination_concurrency_limit =
parse(strings, ENV_DESTINATION_CLIENT_CONCURRENCY_LIMIT, parse_number);
let destination_get_suffixes =
parse(strings, ENV_DESTINATION_GET_SUFFIXES, parse_dns_suffixes);
let destination_profile_suffixes =
parse(strings, ENV_DESTINATION_PROFILE_SUFFIXES, parse_dns_suffixes);

let tls_trust_anchors = parse(strings, ENV_TLS_TRUST_ANCHORS, parse_path);
let tls_end_entity_cert = parse(strings, ENV_TLS_CERT, parse_path);
let tls_private_key = parse(strings, ENV_TLS_PRIVATE_KEY, parse_path);
let tls_pod_identity_template = strings.get(ENV_TLS_POD_IDENTITY);
let tls_controller_identity = strings.get(ENV_TLS_CONTROLLER_IDENTITY);

let resolv_conf_path = strings.get(ENV_RESOLV_CONF);

let metrics_retain_idle = parse(strings, ENV_METRICS_RETAIN_IDLE, parse_duration);

let dns_min_ttl = parse(strings, ENV_DNS_MIN_TTL, parse_duration);
let dns_max_ttl = parse(strings, ENV_DNS_MAX_TTL, parse_duration);

let dns_canonicalize_timeout = parse(strings, ENV_DNS_CANONICALIZE_TIMEOUT, parse_duration)?
.unwrap_or(DEFAULT_DNS_CANONICALIZE_TIMEOUT);

let pod_namespace = strings.get(ENV_POD_NAMESPACE).and_then(|maybe_value| {
// There cannot be a default pod namespace, and the pod namespace is required.
maybe_value.ok_or_else(|| {
Expand Down Expand Up @@ -462,6 +498,12 @@ impl<'a> TryFrom<&'a Strings> for Config {
outbound_connect_timeout: outbound_connect_timeout?
.unwrap_or(DEFAULT_OUTBOUND_CONNECT_TIMEOUT),

inbound_accept_keepalive: inbound_accept_keepalive?,
outbound_accept_keepalive: outbound_accept_keepalive?,

inbound_connect_keepalive: inbound_connect_keepalive?,
outbound_connect_keepalive: outbound_connect_keepalive?,

inbound_ports_disable_protocol_detection: inbound_disable_ports?
.unwrap_or_else(|| default_disable_ports_protocol_detection()),
outbound_ports_disable_protocol_detection: outbound_disable_ports?
Expand Down
7 changes: 0 additions & 7 deletions src/app/control.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::fmt;
use std::time::Duration;

use svc;
use transport::tls;
Expand All @@ -10,23 +9,17 @@ pub struct Config {
addr: Addr,
tls_server_identity: Conditional<tls::Identity, tls::ReasonForNoTls>,
tls_config: tls::ConditionalClientConfig,
backoff: Duration,
connect_timeout: Duration,
}

impl Config {
pub fn new(
addr: Addr,
tls_server_identity: Conditional<tls::Identity, tls::ReasonForNoTls>,
backoff: Duration,
connect_timeout: Duration,
) -> Self {
Self {
addr,
tls_server_identity,
tls_config: Conditional::None(tls::ReasonForNoTls::Disabled),
backoff,
connect_timeout,
}
}

Expand Down
52 changes: 32 additions & 20 deletions src/app/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use svc::{
use tap;
use task;
use telemetry;
use transport::{self, connect, tls, BoundPort, Connection, GetOriginalDst};
use transport::{self, connect, keepalive, tls, BoundPort, Connection, GetOriginalDst};
use {Addr, Conditional};

use super::config::Config;
Expand Down Expand Up @@ -239,16 +239,18 @@ where
.as_ref()
.and_then(|s| s.controller_identity.clone().map(|id| id));

let control_config = control_host_and_port.map(|host_and_port| {
control::Config::new(
host_and_port,
tls_server_identity,
config.control_backoff_delay,
config.control_connect_timeout,
)
// If the controller is on localhost, use the inbound keepalive.
// If the controller is remote, use the outbound keepalive.
let keepalive = control_host_and_port.as_ref().and_then(|a| {
if a.is_loopback() {
config.inbound_connect_keepalive
} else {
config.outbound_connect_keepalive
}
});

let stack = connect::Stack::new()
.push(keepalive::connect::layer(keepalive))
.push(control::client::layer())
.push(control::resolve::layer(dns_resolver.clone()))
.push(reconnect::layer().with_fixed_backoff(config.control_backoff_delay))
Expand All @@ -267,10 +269,10 @@ where
// spawn a task on an executor when `make` is called. This is done
// lazily so that a default executor is available to spawn the
// background buffering task.
future::lazy(move || match control_config {
future::lazy(move || match control_host_and_port {
None => Ok(None),
Some(config) => stack
.make(&config)
Some(addr) => stack
.make(&control::Config::new(addr, tls_server_identity))
.map(Some)
.map_err(|e| error!("failed to build controller: {}", e)),
})
Expand All @@ -297,7 +299,9 @@ where
let profiles_client = ProfilesClient::new(controller, Duration::from_secs(3));

let outbound = {
use super::outbound::{discovery::Resolve, orig_proto_upgrade, server_id, Endpoint};
use super::outbound::{
discovery::Resolve, orig_proto_upgrade, server_id, Endpoint,
};
use proxy::{
canonicalize,
http::{balance, header_from_target, metrics, retry},
Expand All @@ -315,6 +319,7 @@ where
// Establishes connections to remote peers (for both TCP
// forwarding and HTTP proxying).
let connect = connect::Stack::new()
.push(keepalive::connect::layer(config.outbound_connect_keepalive))
.push(svc::timeout::layer(config.outbound_connect_timeout))
.push(transport_metrics.connect("outbound"));

Expand Down Expand Up @@ -363,7 +368,9 @@ where
// is retryable.
let dst_route_layer = phantom_data::layer()
.push(insert_target::layer())
.push(metrics::layer::<_, classify::Response>(retry_http_metrics.clone()))
.push(metrics::layer::<_, classify::Response>(
retry_http_metrics.clone(),
))
.push(retry::layer(retry_http_metrics))
.push(proxy::http::timeout::layer())
.push(metrics::layer::<_, classify::Response>(route_http_metrics))
Expand Down Expand Up @@ -461,7 +468,9 @@ where

// Instantiated for each TCP connection received from the local
// application (including HTTP connections).
let accept = transport_metrics.accept("outbound").bind(());
let accept = keepalive::accept::layer(config.outbound_accept_keepalive)
.push(transport_metrics.accept("outbound"))
.bind(());

serve(
"out",
Expand Down Expand Up @@ -489,6 +498,7 @@ where
// Establishes connections to the local application (for both
// TCP forwarding and HTTP proxying).
let connect = connect::Stack::new()
.push(keepalive::connect::layer(config.inbound_connect_keepalive))
.push(svc::timeout::layer(config.inbound_connect_timeout))
.push(transport_metrics.connect("inbound"))
.push(rewrite_loopback_addr::layer());
Expand Down Expand Up @@ -600,7 +610,9 @@ where

// As the inbound proxy accepts connections, we don't do any
// special transport-level handling.
let accept = transport_metrics.accept("inbound").bind(());
let accept = keepalive::accept::layer(config.inbound_accept_keepalive)
.push(transport_metrics.accept("inbound"))
.bind(());

serve(
"in",
Expand Down Expand Up @@ -683,17 +695,17 @@ fn serve<A, C, R, B, G>(
where
A: svc::Stack<proxy::server::Source, Error = Never> + Send + Clone + 'static,
A::Value: proxy::Accept<Connection>,
<A::Value as proxy::Accept<Connection>>::Io: Send + transport::Peek + 'static,
<A::Value as proxy::Accept<Connection>>::Io: fmt::Debug + Send + transport::Peek + 'static,
C: svc::Stack<connect::Target, Error = Never> + Send + Clone + 'static,
C::Value: connect::Connect + Send,
<C::Value as connect::Connect>::Connected: Send + 'static,
<C::Value as connect::Connect>::Connected: fmt::Debug + Send + 'static,
<C::Value as connect::Connect>::Future: Send + 'static,
<C::Value as connect::Connect>::Error: fmt::Debug + 'static,
R: svc::Stack<proxy::server::Source, Error = Never> + Send + Clone + 'static,
R::Value:
svc::Service<http::Request<proxy::http::Body>, Response = http::Response<B>>,
R::Value: svc::Service<http::Request<proxy::http::Body>, Response = http::Response<B>>,
R::Value: Send + 'static,
<R::Value as svc::Service<http::Request<proxy::http::Body>>>::Error: error::Error + Send + Sync + 'static,
<R::Value as svc::Service<http::Request<proxy::http::Body>>>::Error:
error::Error + Send + Sync + 'static,
<R::Value as svc::Service<http::Request<proxy::http::Body>>>::Future: Send + 'static,
B: hyper::body::Payload + Default + Send + 'static,
G: GetOriginalDst + Send + 'static,
Expand Down
2 changes: 1 addition & 1 deletion src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use tokio::io::{AsyncRead, AsyncWrite};

pub mod buffer;
pub mod canonicalize;
pub mod http;
pub mod grpc;
pub mod http;
pub mod limit;
mod protocol;
pub mod reconnect;
Expand Down
4 changes: 2 additions & 2 deletions src/proxy/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,10 @@ impl<A, C, R, B> Server<A, C, R, B>
where
A: Stack<Source, Error = Never> + Clone,
A::Value: Accept<Connection>,
<A::Value as Accept<Connection>>::Io: Send + Peek + 'static,
<A::Value as Accept<Connection>>::Io: fmt::Debug + Send + Peek + 'static,
C: Stack<connect::Target, Error = Never> + Clone,
C::Value: connect::Connect,
<C::Value as connect::Connect>::Connected: Send + 'static,
<C::Value as connect::Connect>::Connected: fmt::Debug + Send + 'static,
<C::Value as connect::Connect>::Future: Send + 'static,
<C::Value as connect::Connect>::Error: fmt::Debug + 'static,
R: Stack<Source, Error = Never> + Clone,
Expand Down
Loading

0 comments on commit 0fc35e2

Please sign in to comment.