Skip to content

Commit

Permalink
Upgrade rs-consul to latest http, hyper, opentelemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
kushudai committed Jun 25, 2024
1 parent e18ce70 commit 0264a20
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 46 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/format-code.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ on:
jobs:
format-code:
runs-on: "ubuntu-latest"
container: rust:1.77
container: rust:1.79

steps:
- name: Checkout the code on merge
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:
jobs:
lint:
runs-on: "ubuntu-latest"
container: rust:1.77
container: rust:1.79

steps:
- uses: actions/checkout@v2
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
matrix:
features: ["", "--no-default-features --features rustls-native"]
runs-on: "ubuntu-latest"
container: rust:1.74
container: rust:1.79

steps:
- uses: actions/checkout@v2
Expand All @@ -26,7 +26,7 @@ jobs:
matrix:
features: ["", "--no-default-features --features rustls-native"]
runs-on: "ubuntu-latest"
container: rust:1.77
container: rust:1.79
services:
consul:
image: consul:1.11.11
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ on:
jobs:
test:
runs-on: ubuntu-latest
container: rust:1.77
container: rust:1.79
services:
consul:
image: consul:1.11.11
Expand All @@ -25,7 +25,7 @@ jobs:

dry-run:
runs-on: ubuntu-latest
container: rust:1.77
container: rust:1.79

steps:
- uses: actions/checkout@v2
Expand All @@ -36,7 +36,7 @@ jobs:
publish:
needs: [test, dry-run]
runs-on: ubuntu-latest
container: rust:1.74
container: rust:1.79
environment: crates-publish

steps:
Expand Down
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## Unreleased

## 0.7.0 - 2024-06-25

### Changed

- `opentelemetry` updated to version `0.23` from `0.22`.
- `http` updated to version `1.0` from `0.2`.
- `hyper` updated to version `1.0` from `0.14`.
- `hyper-rustls` updated to version `0.27` from `0.24`.

## 0.6.0 - 2024-04-01

### Changed
Expand Down
12 changes: 7 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rs-consul"
version = "0.6.0"
version = "0.7.0"
authors = ["Roblox"]
edition = "2021"
description = "This crate provides access to a set of strongly typed apis to interact with consul (https://www.consul.io/)"
Expand All @@ -20,11 +20,13 @@ trace = ["dep:opentelemetry"]
[dependencies]
base64 = "0.22"
futures = "0.3"
http = "0.2"
hyper = { version = "0.14", features = ["full"] }
hyper-rustls = { version = "0.24" }
http = "1"
http-body-util = "0.1"
hyper = { version = "1", features = ["full"] }
hyper-rustls = { version = "0.27" }
hyper-util = { version = "0.1", features = ["client", "client-legacy", "tokio", "http2"] }
lazy_static = { version = "1", optional = true }
opentelemetry = { version = "0.22", optional = true }
opentelemetry = { version = "0.23", optional = true }
prometheus = { version = "0.13", optional = true }
quick-error = "2"
serde = { version = "1.0", features = ["derive"] }
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.77
1.79
1 change: 0 additions & 1 deletion src/hyper_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#![cfg(feature = "trace")]
use hyper::Version;
use opentelemetry::{
global::{BoxedSpan, BoxedTracer},
Expand Down
92 changes: 60 additions & 32 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,18 @@ SOFTWARE.
//! This crate provides access to a set of strongly typed apis to interact with consul (https://www.consul.io/)
#![deny(missing_docs)]

use http_body_util::BodyExt;
use std::collections::HashMap;
use std::convert::Infallible;
use std::time::{Duration, Instant};
use std::{env, str::Utf8Error};

use base64::Engine;
use hyper::{body::Buf, client::HttpConnector, Body, Method};
use http_body_util::combinators::BoxBody;
use http_body_util::{Empty, Full};
use hyper::body::Bytes;
use hyper::{body::Buf, Method};
use hyper_util::client::legacy::{connect::HttpConnector, Builder, Client};
#[cfg(any(feature = "rustls-native", feature = "rustls-webpki"))]
#[cfg(feature = "metrics")]
use lazy_static::lazy_static;
Expand Down Expand Up @@ -66,7 +72,7 @@ quick_error! {
/// The request was invalid and could not be converted into a proper http request.
RequestError(err: http::Error) {}
/// The consul server response could not be converted into a proper http response.
ResponseError(err: hyper::Error) {}
ResponseError(err: hyper_util::client::legacy::Error) {}
/// The consul server response was invalid.
InvalidResponse(err: hyper::Error) {}
/// The consul server response could not be deserialized from json.
Expand Down Expand Up @@ -151,7 +157,7 @@ const GET_SESSION_METHOD_NAME: &str = "get_session";
pub(crate) type Result<T> = std::result::Result<T, ConsulError>;

/// The config necessary to create a new consul client.
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Config {
/// The address of the consul server. This must include the protocol to connect over eg. http or https.
pub address: String,
Expand All @@ -160,7 +166,12 @@ pub struct Config {

/// The hyper builder for the internal http client.
#[serde(skip)]
pub hyper_builder: hyper::client::Builder,
#[serde(default = "default_builder")]
pub hyper_builder: hyper_util::client::legacy::Builder,
}

fn default_builder() -> Builder {
Builder::new(hyper_util::rt::TokioExecutor::new())
}

impl Config {
Expand All @@ -176,7 +187,7 @@ impl Config {
Config {
address: addr,
token: Some(token),
hyper_builder: Default::default(),
hyper_builder: Builder::new(hyper_util::rt::TokioExecutor::new()),
}
}
}
Expand Down Expand Up @@ -224,42 +235,44 @@ impl Drop for Lock<'_> {
#[derive(Debug)]
/// This struct defines the consul client and allows access to the consul api via method syntax.
pub struct Consul {
https_client: hyper::Client<hyper_rustls::HttpsConnector<HttpConnector>, Body>,
https_client: Client<hyper_rustls::HttpsConnector<HttpConnector>, BoxBody<Bytes, Infallible>>,
config: Config,
#[cfg(feature = "trace")]
tracer: BoxedTracer,
}

fn https_connector() -> hyper_rustls::HttpsConnector<HttpConnector> {
fn https_connector() -> Result<hyper_rustls::HttpsConnector<HttpConnector>> {
#[cfg(feature = "rustls-webpki")]
return hyper_rustls::HttpsConnectorBuilder::new()
return Ok(hyper_rustls::HttpsConnectorBuilder::new()
.with_webpki_roots()
.https_or_http()
.enable_http1()
.build();
.build());
#[allow(unreachable_code)]
// Clippy doesn't realize if the feature is disabled, this code would execute.
hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots()
Ok(hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots()?
.https_or_http()
.enable_http1()
.build()
.build())
}

impl Consul {
/// Creates a new instance of [`Consul`](consul::Consul).
/// This is the entry point for this crate.
/// #Arguments:
/// - [Config](consul::Config)
pub fn new(config: Config) -> Self {
let https = https_connector();
let https_client = config.hyper_builder.build::<_, hyper::Body>(https);
Consul {
pub fn new(config: Config) -> Result<Self> {
let https = https_connector()?;
let https_client = config
.hyper_builder
.build::<_, BoxBody<Bytes, Infallible>>(https);
Ok(Consul {
https_client,
config,
#[cfg(feature = "trace")]
tracer: global::tracer("consul"),
}
})
}

/// Reads a key from Consul's KV store. See the [consul docs](https://www.consul.io/api-docs/kv#read-key) for more information.
Expand All @@ -270,7 +283,12 @@ impl Consul {
pub async fn read_key(&self, request: ReadKeyRequest<'_>) -> Result<Vec<ReadKeyResponse>> {
let req = self.build_read_key_req(request);
let (mut response_body, _index) = self
.execute_request(req, hyper::Body::empty(), None, READ_KEY_METHOD_NAME)
.execute_request(
req,
BoxBody::new(http_body_util::Empty::<Bytes>::new()),
None,
READ_KEY_METHOD_NAME,
)
.await?;
let bytes = response_body.copy_to_bytes(response_body.remaining());
serde_json::from_slice::<Vec<ReadKeyResponse>>(&bytes)
Expand Down Expand Up @@ -310,7 +328,7 @@ impl Consul {
let (mut response_body, index) = self
.execute_request(
req,
Body::from(value),
BoxBody::new(Full::<Bytes>::new(Bytes::from(value))),
None,
CREATE_OR_UPDATE_KEY_METHOD_NAME,
)
Expand Down Expand Up @@ -398,7 +416,12 @@ impl Consul {
url = add_namespace_and_datacenter(url, request.namespace, request.datacenter);
req = req.uri(url);
let (mut response_body, _index) = self
.execute_request(req, hyper::Body::empty(), None, DELETE_KEY_METHOD_NAME)
.execute_request(
req,
BoxBody::new(Empty::<Bytes>::new()),
None,
DELETE_KEY_METHOD_NAME,
)
.await?;
let bytes = response_body.copy_to_bytes(response_body.remaining());
serde_json::from_slice(&bytes).map_err(ConsulError::ResponseDeserializationFailed)
Expand Down Expand Up @@ -444,7 +467,7 @@ impl Consul {
let (_watch, index) = self
.execute_request(
lock_index_req,
hyper::Body::empty(),
BoxBody::new(http_body_util::Empty::<Bytes>::new()),
None,
GET_LOCK_METHOD_NAME,
)
Expand Down Expand Up @@ -486,7 +509,7 @@ impl Consul {
let payload = serde_json::to_string(payload).map_err(ConsulError::InvalidRequest)?;
self.execute_request(
request,
payload.into(),
BoxBody::new(Full::<Bytes>::new(Bytes::from(payload.into_bytes()))),
Some(Duration::from_secs(5)),
REGISTER_ENTITY_METHOD_NAME,
)
Expand All @@ -506,7 +529,7 @@ impl Consul {
let payload = serde_json::to_string(payload).map_err(ConsulError::InvalidRequest)?;
self.execute_request(
request,
payload.into(),
BoxBody::new(Full::<Bytes>::new(Bytes::from(payload.into_bytes()))),
Some(Duration::from_secs(5)),
DEREGISTER_ENTITY_METHOD_NAME,
)
Expand Down Expand Up @@ -534,7 +557,7 @@ impl Consul {
let (mut response_body, index) = self
.execute_request(
request,
hyper::Body::empty(),
BoxBody::new(Empty::<Bytes>::new()),
query_opts.timeout,
GET_ALL_REGISTERED_SERVICE_NAMES_METHOD_NAME,
)
Expand Down Expand Up @@ -566,7 +589,7 @@ impl Consul {
let (mut response_body, index) = self
.execute_request(
req,
hyper::Body::empty(),
BoxBody::new(Empty::<Bytes>::new()),
query_opts.timeout,
GET_SERVICE_NODES_METHOD_NAME,
)
Expand Down Expand Up @@ -684,7 +707,9 @@ impl Consul {
let (mut response_body, _index) = self
.execute_request(
req,
hyper::Body::from(create_session_json),
BoxBody::new(Full::<Bytes>::new(Bytes::from(
create_session_json.into_bytes(),
))),
None,
GET_SESSION_METHOD_NAME,
)
Expand Down Expand Up @@ -718,7 +743,7 @@ impl Consul {
async fn execute_request<'a>(
&self,
req: http::request::Builder,
body: hyper::Body,
body: BoxBody<Bytes, Infallible>,
duration: Option<std::time::Duration>,
request_name: &str,
) -> Result<(Box<dyn Buf>, u64)> {
Expand Down Expand Up @@ -764,9 +789,12 @@ impl Consul {
if status != hyper::StatusCode::OK {
record_failure_metric_if_enabled(&method, request_name);

let mut response_body = hyper::body::aggregate(response.into_body())
let mut response_body = response
.into_body()
.collect()
.await
.map_err(|e| ConsulError::UnexpectedResponseCode(status, e.to_string()))?;
.map_err(|e| ConsulError::UnexpectedResponseCode(status, e.to_string()))?
.aggregate();
let bytes = response_body.copy_to_bytes(response_body.remaining());
let resp = std::str::from_utf8(&bytes)
.map_err(|e| ConsulError::UnexpectedResponseCode(status, e.to_string()))?;
Expand All @@ -780,7 +808,7 @@ impl Consul {
None => 0,
};

match hyper::body::aggregate(response.into_body()).await {
match response.into_body().collect().await.map(|b| b.aggregate()) {
Ok(body) => Ok((Box::new(body), index)),
Err(e) => {
record_failure_metric_if_enabled(&method, request_name);
Expand Down Expand Up @@ -974,7 +1002,7 @@ mod tests {
.iter()
.map(|sn| sn.service.address.clone())
.collect();
let expected_addresses = vec![
let expected_addresses = [
"1.1.1.1".to_string(),
"2.2.2.2".to_string(),
"3.3.3.3".to_string(),
Expand Down Expand Up @@ -1261,7 +1289,7 @@ mod tests {

fn get_client() -> Consul {
let conf: Config = Config::from_env();
Consul::new(conf)
Consul::new(conf).unwrap()
}

async fn create_or_update_key_value(
Expand Down

0 comments on commit 0264a20

Please sign in to comment.