Skip to content

Commit

Permalink
Merge pull request #51 from dfinity/igornovg/new-agent
Browse files Browse the repository at this point in the history
chore(BOUN-1299): bump ic-agent
  • Loading branch information
blind-oracle authored Nov 20, 2024
2 parents f3c6d4d + ebb2669 commit 7892a8c
Show file tree
Hide file tree
Showing 15 changed files with 461 additions and 913 deletions.
502 changes: 280 additions & 222 deletions Cargo.lock

Large diffs are not rendered by default.

20 changes: 10 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ clickhouse = { version = "0.13.1", features = [
"inserter",
"rustls-tls",
] }
console-subscriber = "0.4.1"
console-subscriber = { version = "0.4.1", optional = true }
ctrlc = { version = "3.4.5", features = ["termination"] }
derive-new = "0.7.0"
fqdn = "0.4.1"
Expand All @@ -44,16 +44,17 @@ http-body = "1.0.1"
http-body-util = "0.1.2"
humantime = "2.1.0"
hyper-util = "0.1.10"
ic-agent = { version = "0.37.1", features = ["reqwest"] }
ic-agent = { version = "0.39.1", features = [
"ring",
"_internal_dynamic-routing",
] }
ic-bn-lib = { git = "https://github.com/dfinity/ic-bn-lib", rev = "526d34d15cfbf369d8baf2dae9932aa18d570a1d" }
ic-http-gateway = { git = "https://github.com/dfinity/http-gateway", tag = "0.1.0-b0" }
ic-http-gateway = { git = "https://github.com/dfinity/http-gateway", tag = "0.1.0-b2" }
itertools = "0.13.0"
lazy_static = "1.5.0"
maxminddb = "0.24.0"
mockall = "0.13.0"
moka = { version = "0.12.8", features = ["sync", "future"] }
ocsp-stapler = "0.4.1"
once_cell = "1.20.2"
prometheus = "0.13.4"
rand = "0.8.5"
regex = "1.11.1"
Expand Down Expand Up @@ -98,7 +99,11 @@ webpki-roots = "0.26.6"
x509-parser = "0.16.0"
zstd = "0.13.2"

[features]
tokio_console = ["console-subscriber"]

[dev-dependencies]
mockall = "0.13.0"
hex-literal = "0.4.1"
hyper = "1.5.0"
criterion = { version = "0.5.1", features = ["async_tokio"] }
Expand All @@ -113,8 +118,3 @@ panic = "abort"

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }

[patch.crates-io]
ic-agent = { package = "ic-agent", git = "https://github.com/dfinity/agent-rs", branch = "dynamic_route", features = [
"reqwest",
] }
21 changes: 18 additions & 3 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,24 @@ pub struct Ic {
#[clap(env, long)]
pub ic_root_key: Option<PathBuf>,

/// Maximum number of request retries for connection failures.
/// Maximum number of request retries for connection failures and HTTP code 429.
/// First attempt is not counted.
#[clap(env, long, default_value = "5")]
pub ic_max_request_retries: u32,
pub ic_request_retries: usize,

/// How long to wait between retries.
/// With each retry this duration will be doubled.
/// E.g. first delay 25ms, next 50ms and so on.
#[clap(env, long, default_value = "25ms", value_parser = parse_duration)]
pub ic_request_retry_interval: Duration,

/// Max request body size to allow from the client
#[clap(env, long, default_value = "10MB", value_parser = parse_size_usize)]
pub ic_request_max_size: usize,

/// Max response size to allow from the IC
#[clap(env, long, default_value = "3MB", value_parser = parse_size_usize)]
pub ic_response_max_size: usize,

/// Disable response verification for the IC requests.
#[clap(env, long)]
Expand Down Expand Up @@ -300,7 +315,7 @@ pub struct Log {

/// Enables the Tokio console.
/// It's listening on 127.0.0.1:6669
#[cfg(tokio_unstable)]
#[cfg(all(tokio_unstable, feature = "tokio_console"))]
#[clap(env, long)]
pub log_tokio_console: bool,

Expand Down
2 changes: 1 addition & 1 deletion src/log/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ pub fn setup_logging(cli: &Log) -> Result<(), Error> {
None
};

#[cfg(tokio_unstable)]
#[cfg(all(tokio_unstable, feature = "tokio_console"))]
let tokio_console_layer = if cli.log_tokio_console {
Some(console_subscriber::spawn())
} else {
Expand Down
7 changes: 3 additions & 4 deletions src/routing/ic/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use ic_http_gateway::{CanisterRequest, HttpGatewayClient, HttpGatewayRequestArgs
use crate::routing::{
error_cause::ErrorCause,
ic::{
transport::{Context, CONTEXT},
http_service::{Context, CONTEXT},
IcResponseStatus,
},
middleware::request_id::RequestId,
Expand All @@ -22,13 +22,12 @@ use crate::routing::{

use super::{BNRequestMetadata, BNResponseMetadata};

const MAX_REQUEST_BODY_SIZE: usize = 10 * 1_048_576;

#[derive(derive_new::new)]
pub struct HandlerState {
client: HttpGatewayClient,
verify_response: bool,
body_read_timeout: Duration,
request_max_size: usize,
}

// Main HTTP->IC request handler
Expand All @@ -46,7 +45,7 @@ pub async fn handler(

let (parts, body) = request.into_parts();

let body = buffer_body(body, MAX_REQUEST_BODY_SIZE, state.body_read_timeout).await;
let body = buffer_body(body, state.request_max_size, state.body_read_timeout).await;
let body = match body {
Ok(v) => v,
Err(e) => {
Expand Down
2 changes: 1 addition & 1 deletion src/routing/ic/health_check.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use async_trait::async_trait;
use http::{Method, StatusCode};
use ic_agent::agent::http_transport::dynamic_routing::{
use ic_agent::agent::route_provider::dynamic_routing::{
dynamic_route_provider::DynamicRouteProviderError,
health_check::{HealthCheck, HealthCheckStatus},
node::Node,
Expand Down
111 changes: 111 additions & 0 deletions src/routing/ic/http_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
use std::{cell::RefCell, sync::Arc, time::Duration};

use async_trait::async_trait;
use http::StatusCode;
use ic_agent::{agent::HttpService, AgentError};
use ic_bn_lib::http::Client as HttpClient;
use reqwest::{
header::{HeaderMap, HeaderValue},
Request, Response,
};
use tokio::task_local;

pub struct Context {
pub hostname: Option<String>,
pub headers_in: HeaderMap<HeaderValue>,
pub headers_out: HeaderMap<HeaderValue>,
}

impl Context {
pub fn new() -> RefCell<Self> {
RefCell::new(Self {
hostname: None,
headers_in: HeaderMap::new(),
headers_out: HeaderMap::new(),
})
}
}

task_local! {
pub static CONTEXT: RefCell<Context>;
}

/// Service that executes requests on IC-Agent's behalf
#[derive(Debug, derive_new::new)]
pub struct AgentHttpService {
client: Arc<dyn HttpClient>,
retry_interval: Duration,
}

impl AgentHttpService {
async fn execute(&self, mut request: Request) -> Result<Response, reqwest::Error> {
let read_state = request.url().path().ends_with("/read_state");

// Add HTTP headers if requested
let _ = CONTEXT.try_with(|x| {
let mut ctx = x.borrow_mut();
ctx.hostname = Some(request.url().authority().to_string());

for (k, v) in &ctx.headers_out {
request.headers_mut().insert(k, v.clone());
}
});

let response = self.client.execute(request).await?;

// Add response headers.
// Don't do it for the read_state calls because for a single incoming request
// the agent can do several outgoing requests (e.g. read_state to get keys and then query)
// and we need only one set of response headers.
if !read_state {
let _ = CONTEXT.try_with(|x| {
let mut ctx = x.borrow_mut();

for (k, v) in response.headers() {
ctx.headers_in.insert(k, v.clone());
}
});
}

Ok(response)
}
}

#[async_trait]
impl HttpService for AgentHttpService {
async fn call<'a>(
&'a self,
req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
max_retries: usize,
) -> Result<Response, AgentError> {
let mut retry = 0;
let mut interval = self.retry_interval;

loop {
// TODO should we retry on Agent's request generation failure?
let request = req()?;

match self.execute(request).await {
Ok(v) => {
// Retry only on 429 for now
let should_retry = v.status() == StatusCode::TOO_MANY_REQUESTS && retry < max_retries;
if !should_retry {
return Ok(v);
}
}

Err(e) => {
// Don't retry on any errors except connect for now
if !e.is_connect() || retry >= max_retries {
return Err(AgentError::TransportError(e));
}
}
}

// Wait & backoff
tokio::time::sleep(interval).await;
retry += 1;
interval *= 2;
}
}
}
28 changes: 17 additions & 11 deletions src/routing/ic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@

pub mod handler;
pub mod health_check;
pub mod http_service;
pub mod nodes_fetcher;
pub mod route_provider;
pub mod transport;

use std::{fs, sync::Arc};

use anyhow::{Context, Error};
use http::{header::HeaderName, HeaderMap};
use http_body_util::Either;
use ic_agent::agent::http_transport::route_provider::RouteProvider;
use ic_agent::agent::route_provider::RouteProvider;
use ic_bn_lib::http::{
headers::{
X_IC_CACHE_BYPASS_REASON, X_IC_CACHE_STATUS, X_IC_CANISTER_ID_CBOR, X_IC_ERROR_CAUSE,
Expand All @@ -21,7 +21,9 @@ use ic_bn_lib::http::{
},
Client as HttpClient,
};
use ic_http_gateway::{HttpGatewayClient, HttpGatewayResponse, HttpGatewayResponseMetadata};
use ic_http_gateway::{
HttpGatewayClient, HttpGatewayClientBuilder, HttpGatewayResponse, HttpGatewayResponseMetadata,
};

use crate::Cli;

Expand Down Expand Up @@ -99,25 +101,29 @@ pub fn setup(
http_client: Arc<dyn HttpClient>,
route_provider: Arc<dyn RouteProvider>,
) -> Result<HttpGatewayClient, Error> {
let transport = transport::ReqwestTransport::create_with_client_route(
route_provider,
let http_service = Arc::new(http_service::AgentHttpService::new(
http_client,
cli.ic.ic_max_request_retries,
);
cli.ic.ic_request_retry_interval,
));

let agent = ic_agent::Agent::builder()
.with_transport(transport)
.with_arc_http_middleware(http_service)
.with_max_response_body_size(cli.ic.ic_response_max_size)
.with_max_tcp_error_retries(cli.ic.ic_request_retries)
.with_arc_route_provider(route_provider)
.with_verify_query_signatures(cli.ic.ic_enable_replica_signed_queries)
.build()?;
.build()
.context("unable to build Agent")?;

if let Some(v) = &cli.ic.ic_root_key {
let key = fs::read(v).context("unable to read IC root key")?;
agent.set_root_key(key);
}

let client = ic_http_gateway::HttpGatewayClientBuilder::new()
let client = HttpGatewayClientBuilder::new()
.with_agent(agent)
.build()?;
.build()
.context("unable to build HTTP gateway client")?;

Ok(client)
}
19 changes: 7 additions & 12 deletions src/routing/ic/nodes_fetcher.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use async_trait::async_trait;
use candid::Principal;
use ic_agent::{
agent::http_transport::{
dynamic_routing::{
dynamic_route_provider::DynamicRouteProviderError, node::Node, nodes_fetch::Fetch,
},
ReqwestTransport,
agent::route_provider::dynamic_routing::{
dynamic_route_provider::DynamicRouteProviderError, node::Node, nodes_fetch::Fetch,
},
Agent,
};
Expand Down Expand Up @@ -39,23 +36,20 @@ impl NodesFetcher {
#[async_trait]
impl Fetch for NodesFetcher {
async fn fetch(&self, url: Url) -> Result<Vec<Node>, DynamicRouteProviderError> {
let transport = ReqwestTransport::create_with_client(url, self.http_client.clone())
.map_err(|err| {
DynamicRouteProviderError::NodesFetchError(format!(
"Failed to build transport: {err}"
))
})?;
let agent = Agent::builder()
.with_transport(transport)
.with_http_client(self.http_client.clone())
.with_url(url)
.build()
.map_err(|err| {
DynamicRouteProviderError::NodesFetchError(format!(
"Failed to build the agent: {err}"
))
})?;

if let Some(key) = self.root_key.clone() {
agent.set_root_key(key);
}

let api_bns = agent
.fetch_api_boundary_nodes_by_subnet_id(self.subnet_id)
.await
Expand All @@ -64,6 +58,7 @@ impl Fetch for NodesFetcher {
"Failed to fetch API nodes: {err}"
))
})?;

// If some API BNs have invalid domain names, they are discarded.
let nodes = api_bns
.iter()
Expand Down
4 changes: 2 additions & 2 deletions src/routing/ic/route_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use std::sync::Arc;
use anyhow::anyhow;
use candid::Principal;
use ic_agent::agent::http_transport::reqwest_transport::reqwest::Client as AgentClient;
use ic_agent::agent::http_transport::{
use ic_agent::agent::route_provider::{
dynamic_routing::{
dynamic_route_provider::DynamicRouteProviderBuilder, node::Node,
snapshot::latency_based_routing::LatencyRoutingSnapshot,
},
route_provider::{RoundRobinRouteProvider, RouteProvider},
RoundRobinRouteProvider, RouteProvider,
};
use tracing::info;
use url::Url;
Expand Down
Loading

0 comments on commit 7892a8c

Please sign in to comment.