From 0264a20191d0d28173b9bb1cab6e076934e3f320 Mon Sep 17 00:00:00 2001 From: Kushagra Udai Date: Mon, 24 Jun 2024 23:40:22 -0700 Subject: [PATCH] Upgrade rs-consul to latest http, hyper, opentelemetry --- .github/workflows/format-code.yml | 2 +- .github/workflows/lint.yml | 2 +- .github/workflows/main.yml | 4 +- .github/workflows/publish.yml | 6 +- CHANGELOG.md | 9 +++ Cargo.toml | 12 ++-- rust-toolchain | 2 +- src/hyper_wrapper.rs | 1 - src/lib.rs | 92 ++++++++++++++++++++----------- 9 files changed, 84 insertions(+), 46 deletions(-) diff --git a/.github/workflows/format-code.yml b/.github/workflows/format-code.yml index 1c5251f..42693b3 100644 --- a/.github/workflows/format-code.yml +++ b/.github/workflows/format-code.yml @@ -8,7 +8,7 @@ on: jobs: format-code: runs-on: "ubuntu-latest" - container: rust:1.77 + container: rust:1.79 steps: - name: Checkout the code on merge diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 297fb62..9e0d1d5 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -9,7 +9,7 @@ on: jobs: lint: runs-on: "ubuntu-latest" - container: rust:1.77 + container: rust:1.79 steps: - uses: actions/checkout@v2 diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 911ca6f..5d87f53 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -13,7 +13,7 @@ jobs: matrix: features: ["", "--no-default-features --features rustls-native"] runs-on: "ubuntu-latest" - container: rust:1.74 + container: rust:1.79 steps: - uses: actions/checkout@v2 @@ -26,7 +26,7 @@ jobs: matrix: features: ["", "--no-default-features --features rustls-native"] runs-on: "ubuntu-latest" - container: rust:1.77 + container: rust:1.79 services: consul: image: consul:1.11.11 diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 9841d9e..e2a6b3d 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -6,7 +6,7 @@ on: jobs: test: runs-on: ubuntu-latest - container: rust:1.77 + container: rust:1.79 services: consul: image: consul:1.11.11 @@ -25,7 +25,7 @@ jobs: dry-run: runs-on: ubuntu-latest - container: rust:1.77 + container: rust:1.79 steps: - uses: actions/checkout@v2 @@ -36,7 +36,7 @@ jobs: publish: needs: [test, dry-run] runs-on: ubuntu-latest - container: rust:1.74 + container: rust:1.79 environment: crates-publish steps: diff --git a/CHANGELOG.md b/CHANGELOG.md index 8afc7bc..be5de48 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,15 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## Unreleased +## 0.7.0 - 2024-06-25 + +### Changed + +- `opentelemetry` updated to version `0.23` from `0.22`. +- `http` updated to version `1.0` from `0.2`. +- `hyper` updated to version `1.0` from `0.14`. +- `hyper-rustls` updated to version `0.27` from `0.24`. + ## 0.6.0 - 2024-04-01 ### Changed diff --git a/Cargo.toml b/Cargo.toml index c8a9841..c1c15fb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rs-consul" -version = "0.6.0" +version = "0.7.0" authors = ["Roblox"] edition = "2021" description = "This crate provides access to a set of strongly typed apis to interact with consul (https://www.consul.io/)" @@ -20,11 +20,13 @@ trace = ["dep:opentelemetry"] [dependencies] base64 = "0.22" futures = "0.3" -http = "0.2" -hyper = { version = "0.14", features = ["full"] } -hyper-rustls = { version = "0.24" } +http = "1" +http-body-util = "0.1" +hyper = { version = "1", features = ["full"] } +hyper-rustls = { version = "0.27" } +hyper-util = { version = "0.1", features = ["client", "client-legacy", "tokio", "http2"] } lazy_static = { version = "1", optional = true } -opentelemetry = { version = "0.22", optional = true } +opentelemetry = { version = "0.23", optional = true } prometheus = { version = "0.13", optional = true } quick-error = "2" serde = { version = "1.0", features = ["derive"] } diff --git a/rust-toolchain b/rust-toolchain index f23daf4..c408301 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -1.77 \ No newline at end of file +1.79 \ No newline at end of file diff --git a/src/hyper_wrapper.rs b/src/hyper_wrapper.rs index b51425f..9688ede 100644 --- a/src/hyper_wrapper.rs +++ b/src/hyper_wrapper.rs @@ -21,7 +21,6 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ -#![cfg(feature = "trace")] use hyper::Version; use opentelemetry::{ global::{BoxedSpan, BoxedTracer}, diff --git a/src/lib.rs b/src/lib.rs index 343bd64..a8d21d0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,12 +27,18 @@ SOFTWARE. //! This crate provides access to a set of strongly typed apis to interact with consul (https://www.consul.io/) #![deny(missing_docs)] +use http_body_util::BodyExt; use std::collections::HashMap; +use std::convert::Infallible; use std::time::{Duration, Instant}; use std::{env, str::Utf8Error}; use base64::Engine; -use hyper::{body::Buf, client::HttpConnector, Body, Method}; +use http_body_util::combinators::BoxBody; +use http_body_util::{Empty, Full}; +use hyper::body::Bytes; +use hyper::{body::Buf, Method}; +use hyper_util::client::legacy::{connect::HttpConnector, Builder, Client}; #[cfg(any(feature = "rustls-native", feature = "rustls-webpki"))] #[cfg(feature = "metrics")] use lazy_static::lazy_static; @@ -66,7 +72,7 @@ quick_error! { /// The request was invalid and could not be converted into a proper http request. RequestError(err: http::Error) {} /// The consul server response could not be converted into a proper http response. - ResponseError(err: hyper::Error) {} + ResponseError(err: hyper_util::client::legacy::Error) {} /// The consul server response was invalid. InvalidResponse(err: hyper::Error) {} /// The consul server response could not be deserialized from json. @@ -151,7 +157,7 @@ const GET_SESSION_METHOD_NAME: &str = "get_session"; pub(crate) type Result = std::result::Result; /// The config necessary to create a new consul client. -#[derive(Clone, Debug, Default, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Config { /// The address of the consul server. This must include the protocol to connect over eg. http or https. pub address: String, @@ -160,7 +166,12 @@ pub struct Config { /// The hyper builder for the internal http client. #[serde(skip)] - pub hyper_builder: hyper::client::Builder, + #[serde(default = "default_builder")] + pub hyper_builder: hyper_util::client::legacy::Builder, +} + +fn default_builder() -> Builder { + Builder::new(hyper_util::rt::TokioExecutor::new()) } impl Config { @@ -176,7 +187,7 @@ impl Config { Config { address: addr, token: Some(token), - hyper_builder: Default::default(), + hyper_builder: Builder::new(hyper_util::rt::TokioExecutor::new()), } } } @@ -224,26 +235,26 @@ impl Drop for Lock<'_> { #[derive(Debug)] /// This struct defines the consul client and allows access to the consul api via method syntax. pub struct Consul { - https_client: hyper::Client, Body>, + https_client: Client, BoxBody>, config: Config, #[cfg(feature = "trace")] tracer: BoxedTracer, } -fn https_connector() -> hyper_rustls::HttpsConnector { +fn https_connector() -> Result> { #[cfg(feature = "rustls-webpki")] - return hyper_rustls::HttpsConnectorBuilder::new() + return Ok(hyper_rustls::HttpsConnectorBuilder::new() .with_webpki_roots() .https_or_http() .enable_http1() - .build(); + .build()); #[allow(unreachable_code)] // Clippy doesn't realize if the feature is disabled, this code would execute. - hyper_rustls::HttpsConnectorBuilder::new() - .with_native_roots() + Ok(hyper_rustls::HttpsConnectorBuilder::new() + .with_native_roots()? .https_or_http() .enable_http1() - .build() + .build()) } impl Consul { @@ -251,15 +262,17 @@ impl Consul { /// This is the entry point for this crate. /// #Arguments: /// - [Config](consul::Config) - pub fn new(config: Config) -> Self { - let https = https_connector(); - let https_client = config.hyper_builder.build::<_, hyper::Body>(https); - Consul { + pub fn new(config: Config) -> Result { + let https = https_connector()?; + let https_client = config + .hyper_builder + .build::<_, BoxBody>(https); + Ok(Consul { https_client, config, #[cfg(feature = "trace")] tracer: global::tracer("consul"), - } + }) } /// Reads a key from Consul's KV store. See the [consul docs](https://www.consul.io/api-docs/kv#read-key) for more information. @@ -270,7 +283,12 @@ impl Consul { pub async fn read_key(&self, request: ReadKeyRequest<'_>) -> Result> { let req = self.build_read_key_req(request); let (mut response_body, _index) = self - .execute_request(req, hyper::Body::empty(), None, READ_KEY_METHOD_NAME) + .execute_request( + req, + BoxBody::new(http_body_util::Empty::::new()), + None, + READ_KEY_METHOD_NAME, + ) .await?; let bytes = response_body.copy_to_bytes(response_body.remaining()); serde_json::from_slice::>(&bytes) @@ -310,7 +328,7 @@ impl Consul { let (mut response_body, index) = self .execute_request( req, - Body::from(value), + BoxBody::new(Full::::new(Bytes::from(value))), None, CREATE_OR_UPDATE_KEY_METHOD_NAME, ) @@ -398,7 +416,12 @@ impl Consul { url = add_namespace_and_datacenter(url, request.namespace, request.datacenter); req = req.uri(url); let (mut response_body, _index) = self - .execute_request(req, hyper::Body::empty(), None, DELETE_KEY_METHOD_NAME) + .execute_request( + req, + BoxBody::new(Empty::::new()), + None, + DELETE_KEY_METHOD_NAME, + ) .await?; let bytes = response_body.copy_to_bytes(response_body.remaining()); serde_json::from_slice(&bytes).map_err(ConsulError::ResponseDeserializationFailed) @@ -444,7 +467,7 @@ impl Consul { let (_watch, index) = self .execute_request( lock_index_req, - hyper::Body::empty(), + BoxBody::new(http_body_util::Empty::::new()), None, GET_LOCK_METHOD_NAME, ) @@ -486,7 +509,7 @@ impl Consul { let payload = serde_json::to_string(payload).map_err(ConsulError::InvalidRequest)?; self.execute_request( request, - payload.into(), + BoxBody::new(Full::::new(Bytes::from(payload.into_bytes()))), Some(Duration::from_secs(5)), REGISTER_ENTITY_METHOD_NAME, ) @@ -506,7 +529,7 @@ impl Consul { let payload = serde_json::to_string(payload).map_err(ConsulError::InvalidRequest)?; self.execute_request( request, - payload.into(), + BoxBody::new(Full::::new(Bytes::from(payload.into_bytes()))), Some(Duration::from_secs(5)), DEREGISTER_ENTITY_METHOD_NAME, ) @@ -534,7 +557,7 @@ impl Consul { let (mut response_body, index) = self .execute_request( request, - hyper::Body::empty(), + BoxBody::new(Empty::::new()), query_opts.timeout, GET_ALL_REGISTERED_SERVICE_NAMES_METHOD_NAME, ) @@ -566,7 +589,7 @@ impl Consul { let (mut response_body, index) = self .execute_request( req, - hyper::Body::empty(), + BoxBody::new(Empty::::new()), query_opts.timeout, GET_SERVICE_NODES_METHOD_NAME, ) @@ -684,7 +707,9 @@ impl Consul { let (mut response_body, _index) = self .execute_request( req, - hyper::Body::from(create_session_json), + BoxBody::new(Full::::new(Bytes::from( + create_session_json.into_bytes(), + ))), None, GET_SESSION_METHOD_NAME, ) @@ -718,7 +743,7 @@ impl Consul { async fn execute_request<'a>( &self, req: http::request::Builder, - body: hyper::Body, + body: BoxBody, duration: Option, request_name: &str, ) -> Result<(Box, u64)> { @@ -764,9 +789,12 @@ impl Consul { if status != hyper::StatusCode::OK { record_failure_metric_if_enabled(&method, request_name); - let mut response_body = hyper::body::aggregate(response.into_body()) + let mut response_body = response + .into_body() + .collect() .await - .map_err(|e| ConsulError::UnexpectedResponseCode(status, e.to_string()))?; + .map_err(|e| ConsulError::UnexpectedResponseCode(status, e.to_string()))? + .aggregate(); let bytes = response_body.copy_to_bytes(response_body.remaining()); let resp = std::str::from_utf8(&bytes) .map_err(|e| ConsulError::UnexpectedResponseCode(status, e.to_string()))?; @@ -780,7 +808,7 @@ impl Consul { None => 0, }; - match hyper::body::aggregate(response.into_body()).await { + match response.into_body().collect().await.map(|b| b.aggregate()) { Ok(body) => Ok((Box::new(body), index)), Err(e) => { record_failure_metric_if_enabled(&method, request_name); @@ -974,7 +1002,7 @@ mod tests { .iter() .map(|sn| sn.service.address.clone()) .collect(); - let expected_addresses = vec![ + let expected_addresses = [ "1.1.1.1".to_string(), "2.2.2.2".to_string(), "3.3.3.3".to_string(), @@ -1261,7 +1289,7 @@ mod tests { fn get_client() -> Consul { let conf: Config = Config::from_env(); - Consul::new(conf) + Consul::new(conf).unwrap() } async fn create_or_update_key_value(