diff --git a/.circleci/config.yml b/.circleci/config.yml index 95cfdc29..4a8637a2 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -22,7 +22,7 @@ version: 2 jobs: build: docker: - - image: rust:1.26.0 + - image: rustlang/rust:nightly environment: RUSTFLAGS: -D warnings steps: @@ -33,6 +33,7 @@ jobs: - run: rustc --version > ~/rust-version - *RESTORE_DEPS - run: cargo test + - run: cargo test --no-default-features - *SAVE_DEPS workflows: diff --git a/.gitignore b/.gitignore index 9b16963c..582014fd 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ target/ Cargo.lock .vscode/ +.idea/ +*.iml diff --git a/Cargo.toml b/Cargo.toml index 991812cb..8e35318b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ name = "hyper-openssl" version = "0.7.1" authors = ["Steven Fackler "] +edition = "2018" description = "Hyper TLS support via OpenSSL" license = "MIT/Apache-2.0" repository = "https://github.com/sfackler/hyper-openssl" @@ -16,15 +17,14 @@ runtime = ["hyper/runtime"] [dependencies] antidote = "1.0.0" bytes = "0.4" -hyper = { version = "0.12.14", default-features = false } -lazy_static = "1.0" +hyper = { version = "=0.13.0-alpha.1", default-features = false } linked_hash_set = "0.1" +once_cell = "1.0" openssl = "0.10.19" openssl-sys = "0.9.26" -tokio-io = "0.1.2" -tokio-openssl = "0.3" -futures = "0.1.14" +tokio = "=0.2.0-alpha.4" +tokio-openssl = "=0.4.0-alpha.4" [dev-dependencies] -hyper = "0.12" -tokio = "0.1" +futures-preview = "=0.3.0-alpha.18" +hyper = "=0.13.0-alpha.1" diff --git a/src/lib.rs b/src/lib.rs index a4981d4f..d9f7d171 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,49 +2,38 @@ #![warn(missing_docs)] #![doc(html_root_url = "https://docs.rs/hyper-openssl/0.7")] -extern crate antidote; -extern crate bytes; -extern crate futures; -extern crate hyper; -extern crate linked_hash_set; -pub extern crate openssl; -extern crate tokio_io; -extern crate tokio_openssl; - -#[macro_use] -extern crate lazy_static; - -#[cfg(test)] -extern crate tokio; - use antidote::Mutex; use bytes::{Buf, BufMut}; -use futures::{Async, Future, Poll}; use hyper::client::connect::{Connect, Connected, Destination}; #[cfg(feature = "runtime")] use hyper::client::HttpConnector; use openssl::error::ErrorStack; use openssl::ex_data::Index; use openssl::ssl::{ - ConnectConfiguration, Ssl, SslConnector, SslConnectorBuilder, SslMethod, SslSessionCacheMode, + ConnectConfiguration, Ssl, SslConnector, SslConnectorBuilder, SslSessionCacheMode, }; +#[cfg(feature = "runtime")] +use openssl::ssl::SslMethod; use std::error::Error; use std::fmt::Debug; -use std::io::{self, Read, Write}; -use std::mem; +use std::io; use std::sync::Arc; -use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_openssl::{ConnectAsync, ConnectConfigurationExt, SslStream}; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_openssl::SslStream; use cache::{SessionCache, SessionKey}; +use std::future::Future; +use std::pin::Pin; +use std::task::{Poll, Context}; +use once_cell::sync::OnceCell; mod cache; #[cfg(test)] mod test; -lazy_static! { - // The unwrap here isn't great but this only fails on OOM - static ref KEY_INDEX: Index = Ssl::new_ex_index().unwrap(); +fn key_index() -> Result, ErrorStack> { + static IDX: OnceCell> = OnceCell::new(); + IDX.get_or_try_init(|| Ssl::new_ex_index()).map(|v| *v) } #[derive(Clone)] @@ -52,7 +41,7 @@ struct Inner { ssl: SslConnector, cache: Arc>, callback: Option< - Arc Result<(), ErrorStack> + Sync + Send>, + Arc Result<(), ErrorStack> + Sync + Send>, >, } @@ -75,7 +64,8 @@ impl Inner { } } - conf.set_ex_data(*KEY_INDEX, key); + let idx = key_index()?; + conf.set_ex_data(idx, key); Ok(conf) } @@ -96,8 +86,8 @@ impl HttpsConnector { /// HTTP/2 and HTTP/1.1. /// /// Requires the `runtime` Cargo feature. - pub fn new(threads: usize) -> Result, ErrorStack> { - let mut http = HttpConnector::new(threads); + pub fn new() -> Result, ErrorStack> { + let mut http = HttpConnector::new(); http.enforce_http(false); let mut ssl = SslConnector::builder(SslMethod::tls())?; // avoid unused_mut warnings when building against OpenSSL 1.0.1 @@ -129,7 +119,7 @@ where ssl.set_new_session_callback({ let cache = cache.clone(); move |ssl, session| { - if let Some(key) = ssl.ex_data(*KEY_INDEX) { + if let Some(key) = key_index().ok().and_then(|idx| ssl.ex_data(idx)) { cache.lock().insert(key.clone(), session); } } @@ -165,109 +155,47 @@ where impl Connect for HttpsConnector where T: Connect, - T::Transport: Debug + Sync + Send, + T::Transport: Debug + Sync, + T::Future: 'static, { type Transport = MaybeHttpsStream; - type Error = Box; - type Future = ConnectFuture; + type Error = Box; + type Future = + Pin> + Send>>; - fn connect(&self, destination: Destination) -> ConnectFuture { + fn connect(&self, destination: Destination) -> Self::Future { let tls_setup = if destination.scheme() == "https" { Some((self.inner.clone(), destination.clone())) } else { None }; - let conn = self.http.connect(destination); + let connect = self.http.connect(destination); - ConnectFuture(ConnectState::InnerConnect { conn, tls_setup }) - } -} + let f = async { + let (conn, mut connected) = connect.await.map_err(Into::into)?; -enum ConnectState -where - T: Connect, -{ - InnerConnect { - conn: T::Future, - tls_setup: Option<(Inner, Destination)>, - }, - Handshake { - handshake: ConnectAsync, - connected: Connected, - }, - Terminal, -} + let (inner, destination) = match tls_setup { + Some((inner, destination)) => (inner, destination), + None => return Ok((MaybeHttpsStream::Http(conn), connected)), + }; -/// A future connecting to a remote HTTP server. -pub struct ConnectFuture(ConnectState) -where - T: Connect; + let config = inner.setup_ssl(&destination)?; + let stream = tokio_openssl::connect(config, destination.host(), conn).await?; -impl Future for ConnectFuture -where - T: Connect, - T::Transport: Debug + Sync + Send, -{ - type Item = (MaybeHttpsStream, Connected); - type Error = Box; - - fn poll( - &mut self, - ) -> Poll<(MaybeHttpsStream, Connected), Box> { - loop { - match mem::replace(&mut self.0, ConnectState::Terminal) { - ConnectState::InnerConnect { - mut conn, - tls_setup, - } => match conn.poll() { - Ok(Async::Ready((stream, connected))) => match tls_setup { - Some((inner, destination)) => { - let ssl = inner.setup_ssl(&destination)?; - let handshake = ssl.connect_async(destination.host(), stream); - self.0 = ConnectState::Handshake { - handshake, - connected, - }; - } - None => { - return Ok(Async::Ready((MaybeHttpsStream::Http(stream), connected))); - } - }, - Ok(Async::NotReady) => { - self.0 = ConnectState::InnerConnect { conn, tls_setup }; - return Ok(Async::NotReady); - } - Err(e) => return Err(e.into()), - }, - ConnectState::Handshake { - mut handshake, - mut connected, - } => match handshake.poll() { - Ok(Async::Ready(stream)) => { - // avoid unused_mut warnings on OpenSSL 1.0.1 - connected = connected; - - #[cfg(ossl102)] - { - if let Some(b"h2") = stream.get_ref().ssl().selected_alpn_protocol() { - connected = connected.negotiated_h2(); - } - } - return Ok(Async::Ready((MaybeHttpsStream::Https(stream), connected))); - } - Ok(Async::NotReady) => { - self.0 = ConnectState::Handshake { - handshake, - connected, - }; - return Ok(Async::NotReady); - } - Err(e) => return Err(e.into()), - }, - ConnectState::Terminal => panic!("future polled after completion"), - }; - } + // Avoid unused_mut warnings on OpenSSL 1.0.1 + connected = connected; + #[cfg(ossl102)] + { + if let Some(b"h2") = stream.ssl().selected_alpn_protocol() { + connected = connected.negotiated_h2(); + } + } + + Ok((MaybeHttpsStream::Https(stream), connected)) + }; + + Box::pin(f) } } @@ -279,77 +207,67 @@ pub enum MaybeHttpsStream { Https(SslStream), } -impl Read for MaybeHttpsStream +impl AsyncRead for MaybeHttpsStream where - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Unpin, { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - match *self { - MaybeHttpsStream::Http(ref mut s) => s.read(buf), - MaybeHttpsStream::Https(ref mut s) => s.read(buf), + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + match &*self { + MaybeHttpsStream::Http(s) => s.prepare_uninitialized_buffer(buf), + MaybeHttpsStream::Https(s) => s.prepare_uninitialized_buffer(buf), } } -} -impl AsyncRead for MaybeHttpsStream -where - T: AsyncRead + AsyncWrite, -{ - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - match *self { - MaybeHttpsStream::Http(ref s) => s.prepare_uninitialized_buffer(buf), - MaybeHttpsStream::Https(ref s) => s.prepare_uninitialized_buffer(buf), + fn poll_read(mut self: Pin<&mut Self>, ctx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + match &mut *self { + MaybeHttpsStream::Http(s) => Pin::new(s).poll_read(ctx, buf), + MaybeHttpsStream::Https(s) => Pin::new(s).poll_read(ctx, buf), } } - fn read_buf(&mut self, buf: &mut B) -> Poll + fn poll_read_buf(mut self: Pin<&mut Self>, ctx: &mut Context<'_>, buf: &mut B) -> Poll> where B: BufMut, { - match *self { - MaybeHttpsStream::Http(ref mut s) => s.read_buf(buf), - MaybeHttpsStream::Https(ref mut s) => s.read_buf(buf), + match &mut *self { + MaybeHttpsStream::Http(s) => Pin::new(s).poll_read_buf(ctx, buf), + MaybeHttpsStream::Https(s) => Pin::new(s).poll_read_buf(ctx, buf), } } } -impl Write for MaybeHttpsStream +impl AsyncWrite for MaybeHttpsStream where - T: AsyncRead + AsyncWrite, + T: AsyncRead + AsyncWrite + Unpin, { - fn write(&mut self, buf: &[u8]) -> io::Result { - match *self { - MaybeHttpsStream::Http(ref mut s) => s.write(buf), - MaybeHttpsStream::Https(ref mut s) => s.write(buf), + fn poll_write(mut self: Pin<&mut Self>, ctx: &mut Context<'_>, buf: &[u8]) -> Poll> { + match &mut *self { + MaybeHttpsStream::Http(s) => Pin::new(s).poll_write(ctx, buf), + MaybeHttpsStream::Https(s) => Pin::new(s).poll_write(ctx, buf), } } - fn flush(&mut self) -> io::Result<()> { - match *self { - MaybeHttpsStream::Http(ref mut s) => s.flush(), - MaybeHttpsStream::Https(ref mut s) => s.flush(), + fn poll_flush(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { + match &mut *self { + MaybeHttpsStream::Http(s) => Pin::new(s).poll_flush(ctx), + MaybeHttpsStream::Https(s) => Pin::new(s).poll_flush(ctx), } } -} -impl AsyncWrite for MaybeHttpsStream -where - T: AsyncRead + AsyncWrite, -{ - fn shutdown(&mut self) -> Poll<(), io::Error> { - match *self { - MaybeHttpsStream::Http(ref mut s) => s.shutdown(), - MaybeHttpsStream::Https(ref mut s) => s.shutdown(), + fn poll_shutdown(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { + match &mut *self { + MaybeHttpsStream::Http(s) => Pin::new(s).poll_shutdown(ctx), + MaybeHttpsStream::Https(s) => Pin::new(s).poll_shutdown(ctx), } } - fn write_buf(&mut self, buf: &mut B) -> Poll + fn poll_write_buf(mut self: Pin<&mut Self>, ctx: &mut Context<'_>, buf: &mut B) -> Poll> where B: Buf, { - match *self { - MaybeHttpsStream::Http(ref mut s) => s.write_buf(buf), - MaybeHttpsStream::Https(ref mut s) => s.write_buf(buf), + match &mut *self { + MaybeHttpsStream::Http(s) => Pin::new(s).poll_write_buf(ctx, buf), + MaybeHttpsStream::Https(s) => Pin::new(s).poll_write_buf(ctx, buf), } } } diff --git a/src/test.rs b/src/test.rs index 9c356e01..aacef95a 100644 --- a/src/test.rs +++ b/src/test.rs @@ -1,75 +1,55 @@ -use futures::stream::Stream; use hyper::client::HttpConnector; +use futures::future; +use futures::stream::TryStreamExt; +use hyper::{service, Response}; +use tokio::net::TcpListener; use hyper::{Body, Client}; use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod}; -use std::thread; -use tokio::runtime::current_thread::Runtime; +use hyper::server::conn::Http; use super::*; -#[test] +#[tokio::test] #[cfg(feature = "runtime")] -fn google() { - let ssl = HttpsConnector::new(4).unwrap(); +async fn google() { + let ssl = HttpsConnector::new().unwrap(); let client = Client::builder().keep_alive(false).build::<_, Body>(ssl); - let mut runtime = Runtime::new().unwrap(); - - let f = client - .get("https://www.google.com".parse().unwrap()) - .and_then(|resp| { - assert!(resp.status().is_success(), "{}", resp.status()); - resp.into_body().for_each(|_| Ok(())) - }); - runtime.block_on(f).unwrap(); - - let f = client - .get("https://www.google.com".parse().unwrap()) - .and_then(|resp| { - assert!(resp.status().is_success(), "{}", resp.status()); - resp.into_body().for_each(|_| Ok(())) - }); - runtime.block_on(f).unwrap(); - - let f = client - .get("https://www.google.com".parse().unwrap()) - .and_then(|resp| { - assert!(resp.status().is_success(), "{}", resp.status()); - resp.into_body().for_each(|_| Ok(())) - }); - runtime.block_on(f).unwrap(); + for _ in 0..3 { + let resp = client.get("https://www.google.com".parse().unwrap()).await.unwrap(); + assert!(resp.status().is_success(), "{}", resp.status()); + resp.into_body().try_concat().await.unwrap(); + } } -#[test] -fn localhost() { - let listener = ::std::net::TcpListener::bind("127.0.0.1:15410").unwrap(); +#[tokio::test] +async fn localhost() { + let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let port = listener.local_addr().unwrap().port(); - let mut ctx = SslAcceptor::mozilla_modern(SslMethod::tls()).unwrap(); - #[cfg(ossl111)] - { - ctx.clear_options(openssl::ssl::SslOptions::NO_TLSV1_3); - } - - ctx.set_session_id_context(b"test").unwrap(); - ctx.set_certificate_chain_file("test/cert.pem").unwrap(); - ctx.set_private_key_file("test/key.pem", SslFiletype::PEM) - .unwrap(); - let ctx = ctx.build(); + let server = async move { + let mut acceptor = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap(); + acceptor.set_session_id_context(b"test").unwrap(); + acceptor.set_private_key_file("test/key.pem", SslFiletype::PEM).unwrap(); + acceptor.set_certificate_chain_file("test/cert.pem").unwrap(); + let acceptor = acceptor.build(); - let thread = thread::spawn(move || { for _ in 0..3 { - let stream = listener.accept().unwrap().0; - let mut stream = ctx.accept(stream).unwrap(); - stream.read_exact(&mut [0]).unwrap(); - stream - .write_all(b"HTTP/1.1 204 No Content\r\nConnection: close\r\n\r\n") + let stream = listener.accept().await.unwrap().0; + let stream = tokio_openssl::accept(&acceptor, stream).await.unwrap(); + + let service = service::service_fn(|_| future::ready(Ok::<_, io::Error>(Response::new(Body::empty())))); + + Http::new() + .keep_alive(false) + .serve_connection(stream, service) + .await .unwrap(); - stream.shutdown().unwrap(); } - }); + }; + tokio::spawn(server); - let mut connector = HttpConnector::new(1); + let mut connector = HttpConnector::new(); connector.enforce_http(false); let mut ssl = SslConnector::builder(SslMethod::tls()).unwrap(); ssl.set_ca_file("test/cert.pem").unwrap(); @@ -88,65 +68,46 @@ fn localhost() { let ssl = HttpsConnector::with_connector(connector, ssl).unwrap(); let client = Client::builder().build::<_, Body>(ssl); - let mut runtime = Runtime::new().unwrap(); - for _ in 0..3 { - let f = client - .get(format!("https://localhost:{}", port).parse().unwrap()) - .and_then(|resp| { - assert!(resp.status().is_success(), "{}", resp.status()); - resp.into_body().for_each(|_| Ok(())) - }); - runtime.block_on(f).unwrap(); + let resp = client.get(format!("https://localhost:{}", port).parse().unwrap()).await.unwrap(); + assert!(resp.status().is_success(), "{}", resp.status()); + resp.into_body().try_concat().await.unwrap(); } - - thread.join().unwrap(); } -#[test] +#[tokio::test] #[cfg(ossl102)] -fn alpn_h2() { - use futures::future; - use hyper::server::conn::Http; - use hyper::service; - use hyper::Response; +async fn alpn_h2() { use openssl::ssl::{self, AlpnError}; - use tokio::net::TcpListener; - use tokio_openssl::SslAcceptorExt; - let mut listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let mut listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let port = listener.local_addr().unwrap().port(); - let mut ctx = SslAcceptor::mozilla_modern(SslMethod::tls()).unwrap(); - ctx.set_certificate_chain_file("test/cert.pem").unwrap(); - ctx.set_private_key_file("test/key.pem", SslFiletype::PEM) - .unwrap(); - ctx.set_alpn_select_callback(|_, client| { - ssl::select_next_proto(b"\x02h2", client).ok_or(AlpnError::NOACK) - }); - let ctx = ctx.build(); - - let server = future::poll_fn(move || listener.poll_accept()) - .map_err(|e| panic!("tcp accept error: {}", e)) - .and_then(move |(stream, _)| ctx.accept_async(stream)) - .map_err(|e| panic!("tls accept error: {}", e)) - .and_then(|stream| { - assert_eq!( - stream.get_ref().ssl().selected_alpn_protocol().unwrap(), - b"h2" - ); - Http::new().http2_only(true).serve_connection( - stream, - service::service_fn_ok(|_| Response::new(Body::empty())), - ) - }) - .map(|_| ()) - .map_err(|e| panic!("http error: {}", e)); - - let mut runtime = Runtime::new().unwrap(); - runtime.spawn(server); - - let mut connector = HttpConnector::new(1); + let server = async move { + let mut acceptor = SslAcceptor::mozilla_modern(SslMethod::tls()).unwrap(); + acceptor.set_certificate_chain_file("test/cert.pem").unwrap(); + acceptor.set_private_key_file("test/key.pem", SslFiletype::PEM) + .unwrap(); + acceptor.set_alpn_select_callback(|_, client| { + ssl::select_next_proto(b"\x02h2", client).ok_or(AlpnError::NOACK) + }); + let acceptor = acceptor.build(); + + let stream = listener.accept().await.unwrap().0; + let stream = tokio_openssl::accept(&acceptor, stream).await.unwrap(); + assert_eq!(stream.ssl().selected_alpn_protocol().unwrap(), b"h2"); + + let service = service::service_fn(|_| future::ready(Ok::<_, io::Error>(Response::new(Body::empty())))); + + Http::new() + .http2_only(true) + .serve_connection(stream, service) + .await + .unwrap(); + }; + tokio::spawn(server); + + let mut connector = HttpConnector::new(); connector.enforce_http(false); let mut ssl = SslConnector::builder(SslMethod::tls()).unwrap(); ssl.set_ca_file("test/cert.pem").unwrap(); @@ -155,14 +116,7 @@ fn alpn_h2() { let ssl = HttpsConnector::with_connector(connector, ssl).unwrap(); let client = Client::builder().build::<_, Body>(ssl); - let f = client - .get(format!("https://localhost:{}", port).parse().unwrap()) - .and_then(|resp| { - assert!(resp.status().is_success(), "{}", resp.status()); - resp.into_body().for_each(|_| Ok(())) - }); - runtime.block_on(f).unwrap(); - drop(client); - - runtime.run().unwrap(); + let resp = client.get(format!("https://localhost:{}", port).parse().unwrap()).await.unwrap(); + assert!(resp.status().is_success(), "{}", resp.status()); + resp.into_body().try_concat().await.unwrap(); }