diff --git a/build.sbt b/build.sbt index cf5f3f2b..25a89b8a 100644 --- a/build.sbt +++ b/build.sbt @@ -319,6 +319,27 @@ lazy val `play-ahc-ws-standalone` = project `shaded` ).disablePlugins(sbtassembly.AssemblyPlugin) +//--------------------------------------------------------------- +// Akka Http implementation of WS +//--------------------------------------------------------------- + +lazy val `play-akka-http-ws-standalone` = project + .in(file("play-akka-http-ws-standalone")) + .settings(commonSettings) + .settings(formattingSettings) + .settings( + fork in Test := true, + testOptions in Test := Seq(Tests.Argument(TestFrameworks.JUnit, "-a", "-v")) + ) + .settings( + // The scaladoc generation + ) + .settings(libraryDependencies ++= standaloneAkkaHttpWSDependencies) + .dependsOn( + `play-ws-standalone` + ) + .disablePlugins(sbtassembly.AssemblyPlugin) + //--------------------------------------------------------------- // JSON Readables and Writables //--------------------------------------------------------------- @@ -380,6 +401,7 @@ lazy val `integration-tests` = project.in(file("integration-tests")) .settings(shadedOAuthSettings) .dependsOn( `play-ahc-ws-standalone`, + `play-akka-http-ws-standalone`, `play-ws-standalone-json`, `play-ws-standalone-xml` ) @@ -402,6 +424,7 @@ lazy val root = project `play-ws-standalone-json`, `play-ws-standalone-xml`, `play-ahc-ws-standalone`, + `play-akka-http-ws-standalone`, `integration-tests` ) .disablePlugins(sbtassembly.AssemblyPlugin) diff --git a/integration-tests/src/test/resources/README.md b/integration-tests/src/test/resources/README.md new file mode 100644 index 00000000..25ce1e1f --- /dev/null +++ b/integration-tests/src/test/resources/README.md @@ -0,0 +1 @@ +The `rootCA.crt` and `server.p12` have been taken from [Akka Http Tls Tests Keys](https://github.com/akka/akka-http/tree/v10.0.11/akka-http-core/src/test/resources/keys). \ No newline at end of file diff --git a/integration-tests/src/test/resources/rootCA.crt b/integration-tests/src/test/resources/rootCA.crt new file mode 100644 index 00000000..6ba9fb75 --- /dev/null +++ b/integration-tests/src/test/resources/rootCA.crt @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDXTCCAkWgAwIBAgIJANYwx08wP3STMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV +BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX +aWRnaXRzIFB0eSBMdGQwHhcNMTUwNzIzMDk0ODI2WhcNMjUwNDIxMDk0ODI2WjBF +MQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50 +ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB +CgKCAQEArk0K/Rn7uND2YGFBks5Sok1WvNdHQccPESEw2hNVF32ExAhbBXCrFaIl +Io0q4eYSbypeauEjDXB/NJXurEefL8ONXK62erJDKKQ0aTTYqsVifoNYA9ORWoGE +XhtAfOx4xvzr6vF1e3kz0PB/A4ftn0vvVygYnf/2E2bQZgaw8dXP5lIGasEzzigB +LX/qTEW/vBOL98Rxp6JvjwvYMbPSZGwNwSz+tI5W2psdE1Mga2Qnsv3j+STWlD9v ++JlgdN8r3PyR1sl3jC7gCj3AaOhv4RbAbqjwnZ9nrckx16PFiMtJiVRea7CQXN7g +191EVujQnlg1LOhiSMKwVsuoXr08ywIDAQABo1AwTjAdBgNVHQ4EFgQU2THI/ilU +M0xds3vZlV4CvhAZ1d8wHwYDVR0jBBgwFoAU2THI/ilUM0xds3vZlV4CvhAZ1d8w +DAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAK9LO0HyIi0xbTISsc+A5 +LQyZowgRAGqsNNmni7NKDXauPLZrCfDVhvo/FPP1XSFShXo7ARvro9lul4AJlkNN +VgX0gbWtkiAx0uLqlbMsC6imj2L9boRse7mzI/Ymem5SNTn9GUnlMiZ74rca9UT4 +Dk9YytrT4FSpomiL6z8Xj604W3RuLSdEfpfcn3Jh2tFSZ9hyLwB7ATUTA/yuj1SU +G1gmoPMvlnPzNj2lIqyIdQxGdxt+L3mFO20CxBkeieWqQuNptpjwptliFjkZJJZP +wQlx9qLLvs/eFC2AUWj+hbsl37PuARR9hoeqbKRcUjwGtaXOqikrvX1qzPc2+ij9 +/w== +-----END CERTIFICATE----- diff --git a/integration-tests/src/test/resources/server.p12 b/integration-tests/src/test/resources/server.p12 new file mode 100644 index 00000000..d72cc9f3 Binary files /dev/null and b/integration-tests/src/test/resources/server.p12 differ diff --git a/integration-tests/src/test/scala/akka/io/AkkaExampleOrgToLocalhostDnsProvider.scala b/integration-tests/src/test/scala/akka/io/AkkaExampleOrgToLocalhostDnsProvider.scala new file mode 100644 index 00000000..b6e11495 --- /dev/null +++ b/integration-tests/src/test/scala/akka/io/AkkaExampleOrgToLocalhostDnsProvider.scala @@ -0,0 +1,15 @@ +package akka.io + +import java.net.InetAddress + +import scala.concurrent.duration._ + +class AkkaExampleOrgToLocalhostDnsProvider extends DnsProvider { + override def cache: Dns = { + val cache = new SimpleDnsCache() + cache.put(Dns.Resolved("akka.example.org", Seq(InetAddress.getByName("127.0.0.1"))), 1.hour.toMillis) + cache + } + override def actorClass = classOf[InetAddressDnsResolver] + override def managerClass = classOf[SimpleDnsManager] +} diff --git a/integration-tests/src/test/scala/play/AkkaServerProvider.scala b/integration-tests/src/test/scala/play/AkkaServerProvider.scala index ae890818..0845ac6a 100644 --- a/integration-tests/src/test/scala/play/AkkaServerProvider.scala +++ b/integration-tests/src/test/scala/play/AkkaServerProvider.scala @@ -3,10 +3,17 @@ */ package play +import java.io.InputStream +import java.net.{ InetAddress, UnknownHostException } +import java.security.cert.CertificateFactory +import java.security.{ KeyStore, SecureRandom } +import javax.net.ssl.{ KeyManagerFactory, SSLContext, SSLParameters, TrustManagerFactory } + import akka.actor.ActorSystem -import akka.http.scaladsl.Http +import akka.http.scaladsl.{ Http, HttpsConnectionContext } import akka.http.scaladsl.server.Route import akka.stream.ActorMaterializer +import com.typesafe.config.ConfigFactory import org.specs2.concurrent.ExecutionEnv import org.specs2.specification.BeforeAfterAll @@ -26,29 +33,79 @@ trait AkkaServerProvider extends BeforeAfterAll { def executionEnv: ExecutionEnv var testServerPort: Int = _ + var testServerPortHttps: Int = _ val defaultTimeout: FiniteDuration = 5.seconds // Create Akka system for thread and streaming management - implicit val system = ActorSystem() + implicit val system = ActorSystem("AkkaServerProvider", ConfigFactory.parseString( + s""" + |akka.io.dns.inet-address.provider-object = ${classOf[akka.io.AkkaExampleOrgToLocalhostDnsProvider].getName} + """.stripMargin).withFallback(ConfigFactory.load())) implicit val materializer = ActorMaterializer() - lazy val futureServer: Future[Http.ServerBinding] = { + lazy val futureServer: Future[Seq[Http.ServerBinding]] = { + implicit val ec = executionEnv.executionContext + // Using 0 (zero) means that a random free port will be used. // So our tests can run in parallel and won't mess with each other. - Http().bindAndHandle(routes, "localhost", 0) + val httpBinding = Http().bindAndHandle(routes, "localhost", 0) + .map { b => testServerPort = b.localAddress.getPort; b } + val httpsBinding = Http().bindAndHandle(routes, "localhost", 0, connectionContext = serverHttpContext()) + .map { b => testServerPortHttps = b.localAddress.getPort; b } + + Future.sequence(Seq(httpBinding, httpBinding)) } override def beforeAll(): Unit = { - val portFuture = futureServer.map(_.localAddress.getPort)(executionEnv.executionContext) - portFuture.onSuccess { - case port => testServerPort = port - }(executionEnv.executionContext) - Await.ready(portFuture, defaultTimeout) + implicit val ec = executionEnv.executionContext + Await.ready(futureServer, defaultTimeout) } override def afterAll(): Unit = { - futureServer.foreach(_.unbind())(executionEnv.executionContext) + futureServer.foreach(_.foreach(_.unbind()))(executionEnv.executionContext) val terminate = system.terminate() Await.ready(terminate, defaultTimeout) } -} \ No newline at end of file + + private def serverHttpContext() = { + // never put passwords into code! + val password = "abcdef".toCharArray + + val ks = KeyStore.getInstance("PKCS12") + ks.load(resourceStream("server.p12"), password) + + val keyManagerFactory = KeyManagerFactory.getInstance("SunX509") + keyManagerFactory.init(ks, password) + + val context = SSLContext.getInstance("TLS") + context.init(keyManagerFactory.getKeyManagers, null, new SecureRandom) + + new HttpsConnectionContext(context) + } + + def clientHttpsContext() = { + val certStore = KeyStore.getInstance(KeyStore.getDefaultType) + certStore.load(null, null) + // only do this if you want to accept a custom root CA. Understand what you are doing! + certStore.setCertificateEntry("ca", loadX509Certificate("rootCA.crt")) + + val certManagerFactory = TrustManagerFactory.getInstance("SunX509") + certManagerFactory.init(certStore) + + val context = SSLContext.getInstance("TLS") + context.init(null, certManagerFactory.getTrustManagers, new SecureRandom) + + val params = new SSLParameters() + params.setEndpointIdentificationAlgorithm("https") + new HttpsConnectionContext(context, sslParameters = Some(params)) + } + + private def resourceStream(resourceName: String): InputStream = { + val is = getClass.getClassLoader.getResourceAsStream(resourceName) + require(is ne null, s"Resource $resourceName not found") + is + } + + private def loadX509Certificate(resourceName: String) = + CertificateFactory.getInstance("X.509").generateCertificate(resourceStream(resourceName)) +} diff --git a/integration-tests/src/test/scala/play/api/libs/ws/WSClientSpec.scala b/integration-tests/src/test/scala/play/api/libs/ws/WSClientSpec.scala new file mode 100644 index 00000000..e2821d6e --- /dev/null +++ b/integration-tests/src/test/scala/play/api/libs/ws/WSClientSpec.scala @@ -0,0 +1,479 @@ +/* + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package play.api.libs.ws + +import akka.http.scaladsl.model.{ ContentTypes, HttpEntity, HttpHeader, StatusCodes } +import akka.http.scaladsl.model.headers.{ Host, HttpCookie } +import akka.http.scaladsl.server.directives.Credentials +import akka.stream.scaladsl.{ Sink, Source } +import akka.util.ByteString +import org.specs2.concurrent.ExecutionEnv +import org.specs2.execute.Result +import org.specs2.matcher.FutureMatchers +import org.specs2.mutable.Specification +import play.AkkaServerProvider +import play.api.libs.ws.ahc.StandaloneAhcWSClient + +import scala.concurrent.{ Future, TimeoutException } +import scala.concurrent.duration._ +import scala.xml.Elem + +object WSClientSpec { + private def authenticator(credentials: Credentials): Option[String] = + credentials match { + case p @ Credentials.Provided(id) if p.verify("pass") => Some(id) + case _ => None + } + + val routes = { + import akka.http.scaladsl.server.Directives._ + import akka.http.scaladsl.marshalling.Marshaller._ + path("xml") { + entity(as[String]) { echo => + complete(echo) + } + } ~ + path("auth" / "basic") { + authenticateBasic(realm = "secure site", authenticator) { id => + complete(s"Authenticated $id") + } + } ~ + path("virtualhost") { + extractRequest { r => + val vh = r.header[Host].get + complete(vh.host.address + ":" + vh.port) + } + } ~ + path("timeout") { + extractActorSystem { sys => + import sys.dispatcher + complete(akka.pattern.after(2.seconds, sys.scheduler)(Future.successful("timeout"))) + } + } ~ + path("204") { + complete(StatusCodes.NoContent) + } ~ + path("stream") { + val source = Source("streamed".toIndexedSeq).map(c => ByteString(c.toString)) + val httpEntity = HttpEntity(ContentTypes.`application/octet-stream`, source) + complete(httpEntity) + } ~ + path("cookies") { + extractRequest { r => + val cookies = r.cookies.map(c => HttpCookie(c.name, c.value)) :+ HttpCookie("cookie3", "cookie3") + setCookie(cookies.head, cookies.tail: _*) { + complete("OK") + } + } + } ~ + path("scheme") { + extractRequest { r => + complete(r.uri.scheme) + } + } ~ + get { + entity(as[String]) { echo => + complete(s"GET $echo") + } + } ~ + post { + entity(as[String]) { echo => + complete(s"POST $echo") + } + } ~ + patch { + entity(as[String]) { echo => + complete(s"PATCH $echo") + } + } ~ + put { + entity(as[String]) { echo => + complete(s"PUT $echo") + } + } ~ + delete { + entity(as[String]) { echo => + complete("DELETE") + } + } ~ + head { + complete(StatusCodes.OK) + } ~ + options { + entity(as[String]) { echo => + complete("OPTIONS") + } + } + } +} + +trait WSClientSpec extends Specification + with AkkaServerProvider + with FutureMatchers + with DefaultBodyReadables { + + implicit def executionEnv: ExecutionEnv + + def withClient()(block: StandaloneWSClient => Result): Result + + override val routes = WSClientSpec.routes + + "WSClient" should { + "throw an exception on invalid url" in { + withClient() { client => + { client.url("localhost") } must throwAn[IllegalArgumentException] + } + } + + "not throw exception on valid url" in { + withClient() { client => + { client.url(s"http://localhost:$testServerPort") } must not(throwAn[IllegalArgumentException]) + } + } + + "set the basic request parameters" in { + withClient() { client => + val request = client.url(s"http://localhost:$testServerPort") + + request.url must be_==(s"http://localhost:$testServerPort") + request.method must be_==("GET") + request.contentType must beNone + request.body must be_==(EmptyBody) + + import DefaultBodyWritables._ + val textRequest = request.withBody("text") + textRequest.contentType must beSome("text/plain") + textRequest.body must beAnInstanceOf[InMemoryBody] + + val streamRequest = request.withBody(Source.empty[ByteString]) + streamRequest.contentType must beSome("application/octet-stream") + streamRequest.body must beAnInstanceOf[SourceBody] + } + } + + "correctly URL-encode the query string part" in { + withClient() { + _.url("http://example.com") + .withQueryStringParameters("&" -> "=") + .uri + .toString must equalTo("http://example.com?%26=%3D") + } + } + + "discard old query parameters when setting new ones" in { + withClient() { + _.url("http://example.com") + .withQueryStringParameters("bar" -> "baz") + .withQueryStringParameters("bar" -> "bah") + .uri.toString must equalTo("http://example.com?bar=bah") + } + } + + "add query string param" in { + withClient() { + _.url("http://example.com") + .withQueryStringParameters("bar" -> "baz") + .addQueryStringParameters("bar" -> "bah") + .uri.toString must equalTo("http://example.com?bar=bah&bar=baz") + } + } + + "support adding several query string values for a parameter" in { + withClient() { client => + val request = client + .url("http://example.com") + .withQueryStringParameters("play" -> "foo1", "play" -> "foo2") + .addQueryStringParameters("play" -> "foo3", "play" -> "foo4") + + request.queryString.get("play") must beSome + .which(_ must containTheSameElementsAs(Seq("foo1", "foo2", "foo3", "foo4"))) + } + } + + "support adding headers" in { + withClient() { client => + val request = client.url("http://playframework.com/") + .withHttpHeaders("key" -> "value1") + .addHttpHeaders("key" -> "value2") + + request.header("key") must beSome("value1") + request.headers("key") must containTheSameElementsAs(Seq("value1", "value2")) + + request.headerValues("raktas") must beEmpty + request.header("raktas") must beNone + + request + .withHttpHeaders("key" -> "value1") + .headers("key") must containTheSameElementsAs(Seq("value1")) + } + } + + "not make Content-Type header if there is Content-Type in headers already" in { + import DefaultBodyWritables._ + withClient() { + _.url("http://playframework.com/") + .withHttpHeaders("Content-Type" -> "fake/contenttype; charset=utf-8") + .withBody("I am a text/plain body") + .header("Content-Type").map(_.toLowerCase) must beSome("fake/contenttype; charset=utf-8") + } + } + + "treat headers as case insensitive" in { + withClient() { + _.url("http://playframework.com/") + .withHttpHeaders("key" -> "value1", "KEY" -> "value2") + .headers("key") must containTheSameElementsAs(Seq("value1", "value2")) + } + } + + "return underlying implementations" in { + withClient() { client => + client.underlying.getClass.getName must beOneOf( + "play.shaded.ahc.org.asynchttpclient.DefaultAsyncHttpClient", + "akka.http.scaladsl.HttpExt" + ) + + client + .url(s"http://localhost:$testServerPort/index") + .get() + .map { + _.underlying.getClass.getName must beOneOf( + "play.shaded.ahc.org.asynchttpclient.netty.NettyResponse", + "akka.http.scaladsl.model.HttpResponse" + ) + } + .awaitFor(defaultTimeout) + } + } + + "request a url as an in memory string" in { + withClient() { + _.url(s"http://localhost:$testServerPort/index") + .get() + .map(_.body[String]) + .map(_ must beEqualTo("GET ")) + .awaitFor(defaultTimeout) + } + } + + "request a url as a Foo" in { + case class Foo(body: String) + + implicit val fooBodyReadable = BodyReadable[Foo] { response => + Foo(response.body) + } + + withClient() { + _.url(s"http://localhost:$testServerPort/index") + .get() + .map(_.body[Foo]) + .map(_ must beEqualTo(Foo("GET "))) + .awaitFor(defaultTimeout) + } + } + + "request a url as a stream" in { + withClient() { + _.url(s"http://localhost:$testServerPort/index") + .stream() + .map(_.bodyAsSource) + .flatMap(_.runWith(Sink.head)) + .map(_.utf8String must beEqualTo("GET ")) + .awaitFor(defaultTimeout) + } + } + + "request a https url" in { + withClient() { client => + // FIXME configure ssl context with custom cert and enable SSL test for AHC + if (client.isInstanceOf[StandaloneAhcWSClient]) + success + else + client.url(s"https://akka.example.org:$testServerPortHttps/scheme") + .get() + .map(_.body[String]) + .map(_ must beEqualTo("https")) + .awaitFor(defaultTimeout) + } + } + + "send post request" in { + import DefaultBodyWritables._ + withClient() { + _.url(s"http://localhost:$testServerPort") + .post("hello world") + .map(_.body must be_==("POST hello world")) + .awaitFor(defaultTimeout) + } + } + + "send patch request" in { + import DefaultBodyWritables._ + withClient() { + _.url(s"http://localhost:$testServerPort") + .patch("hello world") + .map(_.body must be_==("PATCH hello world")) + .awaitFor(defaultTimeout) + } + } + + "send put request" in { + import DefaultBodyWritables._ + withClient() { + _.url(s"http://localhost:$testServerPort") + .put("hello world") + .map(_.body must be_==("PUT hello world")) + .awaitFor(defaultTimeout) + } + } + + "send delete request" in { + withClient() { + _.url(s"http://localhost:$testServerPort") + .delete() + .map(_.body must be_==("DELETE")) + .awaitFor(defaultTimeout) + } + } + + "send head request" in { + withClient() { + _.url(s"http://localhost:$testServerPort") + .head() + .map(_.status must be_==(200)) + .awaitFor(defaultTimeout) + } + } + + "send options request" in { + withClient() { + _.url(s"http://localhost:$testServerPort") + .options() + .map(_.body must be_==("OPTIONS")) + .awaitFor(defaultTimeout) + } + } + + "round trip XML" in { + val document = XML.parser.loadString( + """ + | + | hello + | world + |""".stripMargin) + + import XMLBodyWritables._ + import XMLBodyReadables._ + withClient() { + _.url(s"http://localhost:$testServerPort/xml") + .post(document) + .map(_.body[Elem]) + .map(_ must be_==(document)) + .awaitFor(defaultTimeout) + } + } + + "authenticate basic" in { + withClient() { client => + val requestWithoutAuth = client.url(s"http://localhost:$testServerPort/auth/basic") + requestWithoutAuth.auth must beNone + + val requestWithAuth = requestWithoutAuth + .withAuth("user", "pass", WSAuthScheme.BASIC) + requestWithAuth.auth must beSome(("user", "pass", WSAuthScheme.BASIC)) + + requestWithAuth + .get() + .map(_.body) + .map(_ must be_==("Authenticated user")) + .awaitFor(defaultTimeout) + } + } + + "set host header" in { + withClient() { client => + val requestWithoutVirtualHost = client.url(s"http://localhost:$testServerPort/virtualhost") + requestWithoutVirtualHost.virtualHost must beNone + + val requestWithVirtualHost = requestWithoutVirtualHost + .withVirtualHost("virtualhost:1337") + requestWithVirtualHost.virtualHost must beSome("virtualhost:1337") + + requestWithVirtualHost + .get() + .map(_.body must be_==("virtualhost:1337")) + .awaitFor(defaultTimeout) + } + } + + "complete after timeout" in { + withClient() { client => + val requestWithoutTimeout = client.url(s"http://localhost:$testServerPort/timeout") + requestWithoutTimeout.requestTimeout must beNone + + val requestWithTimeout = requestWithoutTimeout + .withRequestTimeout(100.millis) + requestWithTimeout.requestTimeout must beSome(100) + + requestWithTimeout + .get() + .map(_ => failure) + .recover { + case ex => + ex must beAnInstanceOf[TimeoutException] + ex.getMessage must startWith("Request timeout") + success + } + .awaitFor(defaultTimeout) + } + } + + "provide response status text" in { + withClient() { + _.url(s"http://localhost:$testServerPort/204") + .get() + .map(_.statusText must be_==("No Content")) + .awaitFor(defaultTimeout) + } + } + + "allow access body more than one time" in { + withClient() { + _.url(s"http://localhost:$testServerPort/stream") + .get() + .map { resp => + resp.body[String] must beEqualTo("streamed") + resp.body[String] must beEqualTo("streamed") + } + .awaitFor(defaultTimeout) + } + } + + "send and receive cookies" in { + withClient() { client => + val cookie1 = DefaultWSCookie("cookie1", "cookie1") + val cookie2 = DefaultWSCookie("cookie2", "cookie2") + val cookie3 = DefaultWSCookie("cookie3", "cookie3") + + val request = client + .url(s"http://localhost:$testServerPort/cookies") + .addCookies(cookie1) + + request.cookies must containTheSameElementsAs(Seq(cookie1)) + + request.withCookies(cookie2).cookies must containTheSameElementsAs(Seq(cookie2)) + + request + .addCookies(cookie2) + .get() + .map { resp => + resp.cookies must containTheSameElementsAs(Seq(cookie1, cookie2, cookie3)) + resp.cookie(cookie1.name) must be_==(Some(cookie1)) + } + .awaitFor(defaultTimeout) + } + } + + } +} diff --git a/integration-tests/src/test/scala/play/api/libs/ws/WSRequestFilterSpec.scala b/integration-tests/src/test/scala/play/api/libs/ws/WSRequestFilterSpec.scala new file mode 100644 index 00000000..497259a5 --- /dev/null +++ b/integration-tests/src/test/scala/play/api/libs/ws/WSRequestFilterSpec.scala @@ -0,0 +1,163 @@ +/* + * Copyright (C) 2009-2017 Lightbend Inc. + */ + +package play.api.libs.ws + +import akka.http.scaladsl.model.headers.RawHeader +import akka.http.scaladsl.model.{ ContentTypes, HttpEntity } +import akka.http.scaladsl.server.Route +import org.specs2.concurrent.ExecutionEnv +import org.specs2.matcher.FutureMatchers +import org.specs2.mutable.Specification +import org.specs2.specification.AfterAll +import org.specs2.execute.Result +import play.AkkaServerProvider + +import scala.collection.mutable + +object WSRequestFilterSpec { + + val routes: Route = { + import akka.http.scaladsl.server.Directives._ + headerValueByName("X-Request-Id") { value => + respondWithHeader(RawHeader("X-Request-Id", value)) { + val httpEntity = HttpEntity(ContentTypes.`text/html(UTF-8)`, "

Say hello to akka-http

") + complete(httpEntity) + } + } ~ { + get { + parameters('key.as[String]) { (key) => + val httpEntity = HttpEntity(ContentTypes.`text/html(UTF-8)`, s"

Say hello to akka-http, key = $key

") + complete(httpEntity) + } + } + } + } +} + +trait WSRequestFilterSpec extends Specification with AkkaServerProvider with AfterAll with FutureMatchers { + import DefaultBodyReadables._ + + implicit def executionEnv: ExecutionEnv + + def withClient()(block: StandaloneWSClient => Result): Result + + override val routes = WSRequestFilterSpec.routes + + "with request filters" should { + + class CallbackRequestFilter(callList: mutable.Buffer[Int], value: Int) extends WSRequestFilter { + override def apply(executor: WSRequestExecutor): WSRequestExecutor = { + callList.append(value) + executor + } + } + + class HeaderAppendingFilter(key: String, value: String) extends WSRequestFilter { + override def apply(executor: WSRequestExecutor): WSRequestExecutor = { + WSRequestExecutor(r => executor(r.withHttpHeaders((key, value)))) + } + } + + "execute with adhoc request filter" in { + withClient() { + _.url(s"http://localhost:$testServerPort") + .withRequestFilter(WSRequestFilter { e => + WSRequestExecutor(r => e.apply(r.withQueryStringParameters("key" -> "some string"))) + }) + .get() + .map(_.body[String] must contain("some string")) + .awaitFor(defaultTimeout) + } + } + + "stream with adhoc request filter" in { + withClient() { + _.url(s"http://localhost:$testServerPort") + .withRequestFilter(WSRequestFilter { e => + WSRequestExecutor(r => e.apply(r.withQueryStringParameters("key" -> "some string"))) + }) + .withMethod("GET") + .stream() + .map(_.body[String] must contain("some string")) + .awaitFor(defaultTimeout) + } + } + + "execute with one request filter" in { + val callList = scala.collection.mutable.ArrayBuffer[Int]() + withClient() { + _.url(s"http://localhost:$testServerPort") + .withRequestFilter(new CallbackRequestFilter(callList, 1)) + .get() + .map(_ => callList must contain(1)) + .awaitFor(defaultTimeout) + } + } + + "stream with one request filter" in { + val callList = scala.collection.mutable.ArrayBuffer[Int]() + withClient() { + _.url(s"http://localhost:$testServerPort") + .withRequestFilter(new CallbackRequestFilter(callList, 1)) + .withMethod("GET") + .stream() + .map(_ => callList must contain(1)) + .awaitFor(defaultTimeout) + } + } + + "execute with three request filters" in { + val callList = scala.collection.mutable.ArrayBuffer[Int]() + withClient() { + _.url(s"http://localhost:$testServerPort") + .withRequestFilter(new CallbackRequestFilter(callList, 1)) + .withRequestFilter(new CallbackRequestFilter(callList, 2)) + .withRequestFilter(new CallbackRequestFilter(callList, 3)) + .get() + .map(_ => callList must containTheSameElementsAs(Seq(1, 2, 3))) + .awaitFor(defaultTimeout) + } + } + + "stream with three request filters" in { + val callList = scala.collection.mutable.ArrayBuffer[Int]() + withClient() { + _.url(s"http://localhost:$testServerPort") + .withRequestFilter(new CallbackRequestFilter(callList, 1)) + .withRequestFilter(new CallbackRequestFilter(callList, 2)) + .withRequestFilter(new CallbackRequestFilter(callList, 3)) + .withMethod("GET") + .stream() + .map(_ => callList must containTheSameElementsAs(Seq(1, 2, 3))) + .awaitFor(defaultTimeout) + } + } + + "allow filters to modify the executing request" in { + val appendedHeader = "X-Request-Id" + val appendedHeaderValue = "someid" + withClient() { + _.url(s"http://localhost:$testServerPort") + .withRequestFilter(new HeaderAppendingFilter(appendedHeader, appendedHeaderValue)) + .get() + .map(_.headers("X-Request-Id").head must be_==("someid")) + .awaitFor(defaultTimeout) + } + } + + "allow filters to modify the streaming request" in { + val appendedHeader = "X-Request-Id" + val appendedHeaderValue = "someid" + withClient() { + _.url(s"http://localhost:$testServerPort") + .withRequestFilter(new HeaderAppendingFilter(appendedHeader, appendedHeaderValue)) + .withMethod("GET") + .stream() + .map(_.headers("X-Request-Id").head must be_==("someid")) + .awaitFor(defaultTimeout) + } + } + } +} diff --git a/integration-tests/src/test/scala/play/api/libs/ws/ahc/AhcWSClientSpec.scala b/integration-tests/src/test/scala/play/api/libs/ws/ahc/AhcWSClientSpec.scala index 22a16fe1..a25845ee 100644 --- a/integration-tests/src/test/scala/play/api/libs/ws/ahc/AhcWSClientSpec.scala +++ b/integration-tests/src/test/scala/play/api/libs/ws/ahc/AhcWSClientSpec.scala @@ -1,26 +1,12 @@ -/* - * Copyright (C) 2009-2017 Lightbend Inc. - */ package play.api.libs.ws.ahc -import akka.stream.scaladsl.Sink -import akka.util.ByteString -import org.specs2.concurrent.{ ExecutionEnv, FutureAwait } +import org.specs2.concurrent.ExecutionEnv import org.specs2.execute.Result -import org.specs2.matcher.FutureMatchers -import org.specs2.mutable.Specification -import play.AkkaServerProvider -import play.api.libs.ws.{ BodyReadable, DefaultBodyReadables } +import play.api.libs.ws.{ StandaloneWSClient, WSClientSpec } -import scala.concurrent._ - -class AhcWSClientSpec(implicit val executionEnv: ExecutionEnv) extends Specification - with AkkaServerProvider - with FutureMatchers - with FutureAwait - with DefaultBodyReadables { - - def withClient(config: AhcWSClientConfig = AhcWSClientConfigFactory.forConfig())(block: StandaloneAhcWSClient => Result): Result = { +class AhcWSClientSpec(implicit override val executionEnv: ExecutionEnv) extends WSClientSpec { + def withClient()(block: StandaloneWSClient => Result): Result = { + val config = AhcWSClientConfigFactory.forConfig() val client = StandaloneAhcWSClient(config) try { block(client) @@ -28,64 +14,4 @@ class AhcWSClientSpec(implicit val executionEnv: ExecutionEnv) extends Specifica client.close() } } - - override val routes = { - import akka.http.scaladsl.server.Directives._ - get { - complete("

Say hello to akka-http

") - } ~ - post { - entity(as[String]) { echo => - complete(echo) - } - } - } - - "url" should { - "throw an exception on invalid url" in { - withClient() { client => - { client.url("localhost") } must throwAn[IllegalArgumentException] - } - } - - "not throw exception on valid url" in { - withClient() { client => - { client.url(s"http://localhost:$testServerPort") } must not(throwAn[IllegalArgumentException]) - } - } - } - - "WSClient" should { - - "request a url as an in memory string" in { - withClient() { client => - val result = Await.result(client.url(s"http://localhost:$testServerPort/index").get().map(res => res.body[String]), defaultTimeout) - result must beEqualTo("

Say hello to akka-http

") - } - } - - "request a url as a Foo" in { - case class Foo(body: String) - - implicit val fooBodyReadable = BodyReadable[Foo] { response => - import play.shaded.ahc.org.asynchttpclient.{ Response => AHCResponse } - val ahcResponse = response.asInstanceOf[StandaloneAhcWSResponse].underlying[AHCResponse] - Foo(ahcResponse.getResponseBody) - } - - withClient() { client => - val result = Await.result(client.url(s"http://localhost:$testServerPort/index").get().map(res => res.body[Foo]), defaultTimeout) - result must beEqualTo(Foo("

Say hello to akka-http

")) - } - } - - "request a url as a stream" in { - withClient() { client => - val resultSource = Await.result(client.url(s"http://localhost:$testServerPort/index").stream().map(_.bodyAsSource), defaultTimeout) - val bytes: ByteString = Await.result(resultSource.runWith(Sink.head), defaultTimeout) - bytes.utf8String must beEqualTo("

Say hello to akka-http

") - } - } - - } } diff --git a/integration-tests/src/test/scala/play/api/libs/ws/ahc/AhcWSRequestFilterSpec.scala b/integration-tests/src/test/scala/play/api/libs/ws/ahc/AhcWSRequestFilterSpec.scala index 4f99b7a6..0c0e6b3c 100644 --- a/integration-tests/src/test/scala/play/api/libs/ws/ahc/AhcWSRequestFilterSpec.scala +++ b/integration-tests/src/test/scala/play/api/libs/ws/ahc/AhcWSRequestFilterSpec.scala @@ -4,141 +4,18 @@ package play.api.libs.ws.ahc -import akka.http.scaladsl.model.headers.RawHeader -import akka.http.scaladsl.model.{ ContentTypes, HttpEntity } -import akka.http.scaladsl.server.Route import org.specs2.concurrent.ExecutionEnv -import org.specs2.matcher.FutureMatchers -import org.specs2.mutable.Specification -import org.specs2.specification.AfterAll -import play.AkkaServerProvider +import org.specs2.execute.Result import play.api.libs.ws._ -import scala.collection.mutable - -class AhcWSRequestFilterSpec(implicit val executionEnv: ExecutionEnv) extends Specification with AkkaServerProvider with AfterAll with FutureMatchers { - import DefaultBodyReadables._ - - // Create the standalone WS client - val client = StandaloneAhcWSClient() - - override val routes: Route = { - import akka.http.scaladsl.server.Directives._ - headerValueByName("X-Request-Id") { value => - respondWithHeader(RawHeader("X-Request-Id", value)) { - val httpEntity = HttpEntity(ContentTypes.`text/html(UTF-8)`, "

Say hello to akka-http

") - complete(httpEntity) - } - } ~ { - get { - parameters('key.as[String]) { (key) => - val httpEntity = HttpEntity(ContentTypes.`text/html(UTF-8)`, s"

Say hello to akka-http, key = $key

") - complete(httpEntity) - } - } - } - } - - override def afterAll: Unit = { - super.afterAll() - client.close() - } - - "with request filters" should { - - class CallbackRequestFilter(callList: mutable.Buffer[Int], value: Int) extends WSRequestFilter { - override def apply(executor: WSRequestExecutor): WSRequestExecutor = { - callList.append(value) - executor - } - } - - class HeaderAppendingFilter(key: String, value: String) extends WSRequestFilter { - override def apply(executor: WSRequestExecutor): WSRequestExecutor = { - WSRequestExecutor(r => executor(r.withHttpHeaders((key, value)))) - } - } - - "execute with adhoc request filter" in { - client.url(s"http://localhost:$testServerPort").withRequestFilter(WSRequestFilter { e => - WSRequestExecutor(r => e.apply(r.withQueryStringParameters("key" -> "some string"))) - }).get().map { response => - response.body[String] must contain("some string") - }.await(retries = 0, timeout = defaultTimeout) - } - - "stream with adhoc request filter" in { - client.url(s"http://localhost:$testServerPort").withRequestFilter(WSRequestFilter { e => - WSRequestExecutor(r => e.apply(r.withQueryStringParameters("key" -> "some string"))) - }).withMethod("GET").stream().map { response => - response.body[String] must contain("some string") - }.await(retries = 0, timeout = defaultTimeout) - } - - "execute with one request filter" in { - val callList = scala.collection.mutable.ArrayBuffer[Int]() - client.url(s"http://localhost:$testServerPort") - .withRequestFilter(new CallbackRequestFilter(callList, 1)) - .get().map { _ => - callList must contain(1) - } - .await(retries = 0, timeout = defaultTimeout) - } - - "stream with one request filter" in { - val callList = scala.collection.mutable.ArrayBuffer[Int]() - client.url(s"http://localhost:$testServerPort") - .withRequestFilter(new CallbackRequestFilter(callList, 1)) - .withMethod("GET").stream().map { _ => - callList must contain(1) - } - .await(retries = 0, timeout = defaultTimeout) - } - - "execute with three request filters" in { - val callList = scala.collection.mutable.ArrayBuffer[Int]() - client.url(s"http://localhost:$testServerPort") - .withRequestFilter(new CallbackRequestFilter(callList, 1)) - .withRequestFilter(new CallbackRequestFilter(callList, 2)) - .withRequestFilter(new CallbackRequestFilter(callList, 3)) - .get().map { _ => - callList must containTheSameElementsAs(Seq(1, 2, 3)) - } - .await(retries = 0, timeout = defaultTimeout) - } - - "stream with three request filters" in { - val callList = scala.collection.mutable.ArrayBuffer[Int]() - client.url(s"http://localhost:$testServerPort") - .withRequestFilter(new CallbackRequestFilter(callList, 1)) - .withRequestFilter(new CallbackRequestFilter(callList, 2)) - .withRequestFilter(new CallbackRequestFilter(callList, 3)) - .withMethod("GET").stream().map { _ => - callList must containTheSameElementsAs(Seq(1, 2, 3)) - } - .await(retries = 0, timeout = defaultTimeout) - } - - "allow filters to modify the executing request" in { - val appendedHeader = "X-Request-Id" - val appendedHeaderValue = "someid" - client.url(s"http://localhost:$testServerPort") - .withRequestFilter(new HeaderAppendingFilter(appendedHeader, appendedHeaderValue)) - .get().map { response ⇒ - response.headers("X-Request-Id").head must be_==("someid") - } - .await(retries = 0, timeout = defaultTimeout) - } - - "allow filters to modify the streaming request" in { - val appendedHeader = "X-Request-Id" - val appendedHeaderValue = "someid" - client.url(s"http://localhost:$testServerPort") - .withRequestFilter(new HeaderAppendingFilter(appendedHeader, appendedHeaderValue)) - .withMethod("GET").stream().map { response ⇒ - response.headers("X-Request-Id").head must be_==("someid") - } - .await(retries = 0, timeout = defaultTimeout) +class AhcWSRequestFilterSpec(implicit val executionEnv: ExecutionEnv) extends WSRequestFilterSpec { + def withClient()(block: StandaloneWSClient => Result): Result = { + val config = AhcWSClientConfigFactory.forConfig() + val client = StandaloneAhcWSClient(config) + try { + block(client) + } finally { + client.close() } } } diff --git a/integration-tests/src/test/scala/play/api/libs/ws/akkahttp/AkkaHttpWSClientSpec.scala b/integration-tests/src/test/scala/play/api/libs/ws/akkahttp/AkkaHttpWSClientSpec.scala new file mode 100644 index 00000000..2853c3bd --- /dev/null +++ b/integration-tests/src/test/scala/play/api/libs/ws/akkahttp/AkkaHttpWSClientSpec.scala @@ -0,0 +1,19 @@ +/* + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package play.api.libs.ws.akkahttp + +import org.specs2.concurrent.ExecutionEnv +import org.specs2.execute.Result +import play.api.libs.ws.{ StandaloneWSClient, WSClientSpec } + +class AkkaHttpWSClientSpec(implicit override val executionEnv: ExecutionEnv) extends WSClientSpec { + def withClient()(block: StandaloneWSClient => Result): Result = { + val client = StandaloneAkkaHttpWSClient(clientHttpsContext())(system, materializer) + try { + block(client) + } finally { + client.close() + } + } +} diff --git a/integration-tests/src/test/scala/play/api/libs/ws/akkahttp/AkkaHttpWSRequestFilterSpec.scala b/integration-tests/src/test/scala/play/api/libs/ws/akkahttp/AkkaHttpWSRequestFilterSpec.scala new file mode 100644 index 00000000..f5573e9c --- /dev/null +++ b/integration-tests/src/test/scala/play/api/libs/ws/akkahttp/AkkaHttpWSRequestFilterSpec.scala @@ -0,0 +1,20 @@ +/* + * Copyright (C) 2009-2017 Lightbend Inc. + */ + +package play.api.libs.ws.akkahttp + +import org.specs2.concurrent.ExecutionEnv +import org.specs2.execute.Result +import play.api.libs.ws._ + +class AkkaHttpWSRequestFilterSpec(implicit override val executionEnv: ExecutionEnv) extends WSRequestFilterSpec { + def withClient()(block: StandaloneWSClient => Result): Result = { + val client = StandaloneAkkaHttpWSClient()(system, materializer) + try { + block(client) + } finally { + client.close() + } + } +} diff --git a/integration-tests/src/test/scala/play/libs/ws/WSClientSpec.scala b/integration-tests/src/test/scala/play/libs/ws/WSClientSpec.scala new file mode 100644 index 00000000..82708c8d --- /dev/null +++ b/integration-tests/src/test/scala/play/libs/ws/WSClientSpec.scala @@ -0,0 +1,427 @@ +/* + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package play.libs.ws + +import java.net.MalformedURLException +import java.time.Duration + +import akka.stream.javadsl.{ Sink, Source } +import org.specs2.concurrent.ExecutionEnv +import org.specs2.execute.Result +import org.specs2.matcher.FutureMatchers +import org.specs2.mutable.Specification +import play.AkkaServerProvider +import play.libs.ws.ahc.StandaloneAhcWSClient + +import scala.compat.java8.FutureConverters._ +import scala.compat.java8.OptionConverters._ +import scala.collection.JavaConverters._ +import scala.concurrent.TimeoutException + +trait WSClientSpec extends Specification + with AkkaServerProvider + with FutureMatchers + with DefaultBodyWritables + with XMLBodyWritables with XMLBodyReadables { + + implicit def executionEnv: ExecutionEnv + + def withClient()(block: StandaloneWSClient => Result): Result + + override val routes = play.api.libs.ws.WSClientSpec.routes + + "WSClient" should { + "throw an exception on invalid url" in { + withClient() { client => + // akka http parses no scheme properly + { client.url("httt://localhost") } must (throwAn[RuntimeException] like { + case ex: RuntimeException => + ex.getCause must beAnInstanceOf[MalformedURLException] + }) + } + } + + "not throw exception on valid url" in { + withClient() { client => + { client.url(s"http://localhost:$testServerPort") } must not(throwAn[IllegalArgumentException]) + } + } + + "set the basic request parameters" in { + withClient() { client => + val request = client.url(s"http://localhost:$testServerPort") + + request.getUrl must be_==(s"http://localhost:$testServerPort") + request.getContentType must beNull + + client + .url(s"http://localhost:$testServerPort") + .setContentType("text/plain") + .getContentType must be_==("text/plain") + + client + .url(s"http://localhost:$testServerPort") + .setBody(body("text")) + .getContentType must be_==("text/plain") + + client + .url(s"http://localhost:$testServerPort") + .setBody(body(Source.empty())) + .getContentType must be_==("application/octet-stream") + } + } + + "correctly URL-encode the query string part" in { + withClient() { client => + val request = client.url("http://example.com") + .setQueryString(Map("&" -> Seq("=").asJava).asJava) + + request.getUrl must equalTo("http://example.com") + request.getQueryParameters.asScala.mapValues(_.asScala) must be_==(Map("&" -> Seq("="))) + } + } + + "discard old query parameters when setting new ones" in { + withClient() { + _.url("http://example.com") + .setQueryString(Map("bar" -> Seq("baz").asJava).asJava) + .setQueryString(Map("bar" -> Seq("bah").asJava).asJava) + .getQueryParameters.asScala.mapValues(_.asScala) must be_==(Map("bar" -> Seq("bah"))) + } + } + + "add query string param" in { + withClient() { + _.url("http://example.com") + .setQueryString("bar=baz") + .addQueryParameter("bar", "bah") + .getQueryParameters.asScala.mapValues(_.asScala) must be_==(Map("bar" -> Seq("baz", "bah"))) + } + } + + "support adding several query string values for a parameter" in { + // need to pass in mutable list, + // as the AHC implementation stores the passed in list + // and then adds elements to it from other API calls + val list = new java.util.ArrayList[String]().asInstanceOf[java.util.List[String]] + list.add("foo1") + list.add("foo2") + withClient() { client => + val request = client + .url("http://example.com") + .setQueryString(Map("play" -> list).asJava) + .addQueryParameter("play", "foo3") + + request.getQueryParameters.get("play").asScala must containTheSameElementsAs(Seq("foo1", "foo2", "foo3")) + } + } + + "support adding headers" in { + // need to pass in mutable list, + // as the AHC implementation stores the passed in list + // and then adds elements to it from other API calls + val list = new java.util.ArrayList[String]().asInstanceOf[java.util.List[String]] + list.add("value1") + withClient() { client => + val request = client.url("http://playframework.com/") + .setHeaders(Map("key" -> list).asJava) + .addHeader("key", "value2") + + request.getHeaders.get("key").asScala must containTheSameElementsAs(Seq("value1", "value2")) + request.getHeaderValues("key").asScala must containTheSameElementsAs(Seq("value1", "value2")) + request.getHeader("key").asScala must beSome("value1") + + request.getHeaderValues("raktas").asScala must beEmpty + request.getHeader("raktas").asScala must beNone + + request + .setHeaders(Map("key" -> Seq("value1").asJava).asJava) + .getHeaders.get("key").asScala must containTheSameElementsAs(Seq("value1")) + } + } + + "not make Content-Type header if there is Content-Type in headers already" in { + withClient() { + _.url("http://playframework.com/") + .addHeader("Content-Type", "fake/contenttype; charset=utf-8") + .setBody(body("I am a text/plain body")) + .getHeader("Content-Type").asScala.map(_.toLowerCase) must beSome("fake/contenttype; charset=utf-8") + } + } + + // FIXME this is different from Scala API but this is how AHC implementation is + "treat headers as case sensitive" in { + withClient() { + _.url("http://playframework.com/") + .setHeaders(Map("key" -> Seq("value1").asJava, "KEY" -> Seq("value2").asJava).asJava) + .getHeaderValues("key").asScala must be_==(Seq("value1")) + } + } + + "return underlying implementations" in { + withClient() { client => + client.getUnderlying.getClass.getName must beOneOf( + "play.shaded.ahc.org.asynchttpclient.DefaultAsyncHttpClient", + "akka.http.javadsl.Http" + ) + + client + .url(s"http://localhost:$testServerPort/index") + .get() + .toScala + .map { + _.getUnderlying.getClass.getName must beOneOf( + "play.shaded.ahc.org.asynchttpclient.netty.NettyResponse", + "akka.http.scaladsl.model.HttpResponse" + ) + } + .awaitFor(defaultTimeout) + } + } + + "request a url as an in memory string" in { + withClient() { + _.url(s"http://localhost:$testServerPort/index") + .get() + .toScala + .map(_.getBody) + .map(_ must beEqualTo("GET ")) + .awaitFor(defaultTimeout) + } + } + + "request a url as a Foo" in { + case class Foo(body: String) + + val fooBodyReadable = new BodyReadable[Foo] { + override def apply(t: StandaloneWSResponse): Foo = Foo(t.getBody) + } + + withClient() { + _.url(s"http://localhost:$testServerPort/index") + .get() + .toScala + .map(_.getBody(fooBodyReadable)) + .map(_ must beEqualTo(Foo("GET "))) + .awaitFor(defaultTimeout) + } + } + + "request a url as a stream" in { + withClient() { + _.url(s"http://localhost:$testServerPort/index") + .stream() + .toScala + .map(_.getBodyAsSource) + .flatMap(_.runWith(Sink.head(), materializer).toScala) + .map(_.utf8String must beEqualTo("GET ")) + .awaitFor(defaultTimeout) + } + } + + "request a https url" in { + withClient() { client => + // FIXME configure ssl context with custom cert and enable SSL test for AHC + if (client.isInstanceOf[StandaloneAhcWSClient]) + success + else + client.url(s"https://akka.example.org:$testServerPortHttps/scheme") + .get() + .toScala + .map(_.getBody) + .map(_ must beEqualTo("https")) + .awaitFor(defaultTimeout) + } + } + + "send post request" in { + withClient() { + _.url(s"http://localhost:$testServerPort") + .post(body("hello world")) + .toScala + .map(_.getBody must be_==("POST hello world")) + .awaitFor(defaultTimeout) + } + } + + "send patch request" in { + withClient() { + _.url(s"http://localhost:$testServerPort") + .patch(body("hello world")) + .toScala + .map(_.getBody must be_==("PATCH hello world")) + .awaitFor(defaultTimeout) + } + } + + "send put request" in { + withClient() { + _.url(s"http://localhost:$testServerPort") + .put(body("hello world")) + .toScala + .map(_.getBody must be_==("PUT hello world")) + .awaitFor(defaultTimeout) + } + } + + "send delete request" in { + withClient() { + _.url(s"http://localhost:$testServerPort") + .delete() + .toScala + .map(_.getBody must be_==("DELETE")) + .awaitFor(defaultTimeout) + } + } + + "send head request" in { + withClient() { + _.url(s"http://localhost:$testServerPort") + .head() + .toScala + .map(_.getStatus must be_==(200)) + .awaitFor(defaultTimeout) + } + } + + "send options request" in { + withClient() { + _.url(s"http://localhost:$testServerPort") + .options() + .toScala + .map(_.getBody must be_==("OPTIONS")) + .awaitFor(defaultTimeout) + } + } + + "round trip XML" in { + val document = XML.fromString( + """ + | + | hello + | world + |""".stripMargin) + document.normalizeDocument() + + withClient() { + _.url(s"http://localhost:$testServerPort/xml") + .post(body(document)) + .toScala + .map { resp => + resp.getContentType must be_==("text/plain; charset=UTF-8") + resp + } + .map(_.getBody(xml())) + .map(_.isEqualNode(document) must be_==(true)) + .awaitFor(defaultTimeout) + } + } + + "authenticate basic" in { + withClient() { client => + val requestWithoutAuth = client.url(s"http://localhost:$testServerPort/auth/basic") + requestWithoutAuth.getUsername must beNull + requestWithoutAuth.getPassword must beNull + requestWithoutAuth.getScheme must beNull + + val requestWithAuth = requestWithoutAuth + .setAuth("user", "pass", WSAuthScheme.BASIC) + + requestWithAuth.getUsername must be_==("user") + requestWithAuth.getPassword must be_==("pass") + requestWithAuth.getScheme must be_==(WSAuthScheme.BASIC) + + requestWithAuth + .get() + .toScala + .map(_.getBody) + .map(_ must be_==("Authenticated user")) + .awaitFor(defaultTimeout) + } + } + + "set host header" in { + withClient() { + _.url(s"http://localhost:$testServerPort/virtualhost") + .setVirtualHost("virtualhost:1337") + .get() + .toScala + .map(_.getBody must be_==("virtualhost:1337")) + .awaitFor(defaultTimeout) + } + } + + "complete after timeout" in { + withClient() { client => + val requestWithoutTimeout = client.url(s"http://localhost:$testServerPort/timeout") + requestWithoutTimeout.getRequestTimeoutDuration must be_==(Duration.ZERO) + + val requestWithTimeout = requestWithoutTimeout + .setRequestTimeout(Duration.ofMillis(100)) + requestWithTimeout.getRequestTimeoutDuration must be_==(Duration.ofMillis(100)) + + requestWithTimeout + .get() + .toScala + .map(_ => failure) + .recover { + case ex => + // due to java/scala conversions of future, the exception + // gets wrapped in CompletionException which we here unwrap + val e = if (ex.getCause != null) ex.getCause else ex + e must beAnInstanceOf[TimeoutException] + e.getMessage must startWith("Request timeout") + success + } + .awaitFor(defaultTimeout) + } + } + + "provide response status text" in { + withClient() { + _.url(s"http://localhost:$testServerPort/204") + .get() + .toScala + .map(_.getStatusText must be_==("No Content")) + .awaitFor(defaultTimeout) + } + } + + "allow access body more than one time" in { + withClient() { + _.url(s"http://localhost:$testServerPort/stream") + .get() + .toScala + .map { resp => + resp.getBody must beEqualTo("streamed") + resp.getBody must beEqualTo("streamed") + } + .awaitFor(defaultTimeout) + } + } + + "send and receive cookies" in { + val cookie1 = new WSCookieBuilder().setName("cookie1").setValue("cookie1").build() + val cookie2 = new WSCookieBuilder().setName("cookie2").setValue("cookie2").build() + val cookie3 = new WSCookieBuilder().setName("cookie3").setValue("cookie3").build() + val cookie4 = new WSCookieBuilder().setName("cookie4").setValue("cookie4").build() + val cookie5 = new WSCookieBuilder().setName("cookie5").setValue("cookie5").build() + def toTuple(c: WSCookie) = (c.getName, c.getValue) + withClient() { + _.url(s"http://localhost:$testServerPort/cookies") + .addCookie(cookie5) + .setCookies(Seq(cookie1).asJava) + .addCookie(cookie2) + .addCookies(cookie4) + .get() + .toScala + .map { resp => + resp.getCookies.asScala.map(toTuple) must containTheSameElementsAs(Seq(cookie1, cookie2, cookie3, cookie4).map(toTuple)) + resp.getCookie(cookie1.getName).asScala.map(toTuple) must be_==(Some(cookie1).map(toTuple)) + } + .awaitFor(defaultTimeout) + } + } + } +} diff --git a/integration-tests/src/test/scala/play/libs/ws/WSRequestFilterSpec.scala b/integration-tests/src/test/scala/play/libs/ws/WSRequestFilterSpec.scala new file mode 100644 index 00000000..0525060a --- /dev/null +++ b/integration-tests/src/test/scala/play/libs/ws/WSRequestFilterSpec.scala @@ -0,0 +1,142 @@ +/* + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package play.libs.ws + +import org.specs2.concurrent.ExecutionEnv +import org.specs2.execute.Result +import org.specs2.matcher.FutureMatchers +import org.specs2.mutable.Specification +import play.AkkaServerProvider +import play.libs.ws.ahc.{ CallbackRequestFilter, HeaderAppendingFilter } + +import scala.compat.java8.FutureConverters._ +import scala.collection.JavaConverters._ + +trait WSRequestFilterSpec extends Specification + with AkkaServerProvider + with FutureMatchers { + + implicit def executionEnv: ExecutionEnv + + def withClient()(block: StandaloneWSClient => Result): Result + + override val routes = play.api.libs.ws.WSRequestFilterSpec.routes + + "with request filters" should { + + "execute with adhoc request filter" in { + withClient() { + _.url(s"http://localhost:$testServerPort") + .setRequestFilter(new WSRequestFilter { + override def apply(ex: WSRequestExecutor) = new WSRequestExecutor { + override def apply(r: StandaloneWSRequest) = ex.apply(r.addQueryParameter("key", "some string")) + } + }) + .get() + .toScala + .map(_.getBody must contain("some string")) + .awaitFor(defaultTimeout) + } + } + + "stream with adhoc request filter" in { + withClient() { + _.url(s"http://localhost:$testServerPort") + .setRequestFilter(new WSRequestFilter { + override def apply(ex: WSRequestExecutor) = new WSRequestExecutor { + override def apply(r: StandaloneWSRequest) = ex.apply(r.addQueryParameter("key", "some string")) + } + }) + .setMethod("GET") + .stream() + .toScala + .map(_.getBody must contain("some string")) + .awaitFor(defaultTimeout) + } + } + + "execute with one request filter" in { + val callList = new java.util.ArrayList[Integer]() + withClient() { + _.url(s"http://localhost:$testServerPort") + .setRequestFilter(new CallbackRequestFilter(callList, 1)) + .get() + .toScala + .map(_ => callList.asScala must contain(1)) + .awaitFor(defaultTimeout) + } + } + + "stream with one request filter" in { + val callList = new java.util.ArrayList[Integer]() + withClient() { + _.url(s"http://localhost:$testServerPort") + .setRequestFilter(new CallbackRequestFilter(callList, 1)) + .setMethod("GET") + .stream() + .toScala + .map(_ => callList.asScala must contain(1)) + .awaitFor(defaultTimeout) + } + } + + "execute with three request filters" in { + val callList = new java.util.ArrayList[Integer]() + withClient() { + _.url(s"http://localhost:$testServerPort") + .setRequestFilter(new CallbackRequestFilter(callList, 1)) + .setRequestFilter(new CallbackRequestFilter(callList, 2)) + .setRequestFilter(new CallbackRequestFilter(callList, 3)) + .get() + .toScala + .map(_ => callList.asScala must containTheSameElementsAs(Seq(1, 2, 3))) + .awaitFor(defaultTimeout) + } + } + + "stream with three request filters" in { + val callList = new java.util.ArrayList[Integer]() + withClient() { + _.url(s"http://localhost:$testServerPort") + .setRequestFilter(new CallbackRequestFilter(callList, 1)) + .setRequestFilter(new CallbackRequestFilter(callList, 2)) + .setRequestFilter(new CallbackRequestFilter(callList, 3)) + .setMethod("GET") + .stream() + .toScala + .map(_ => callList.asScala must containTheSameElementsAs(Seq(1, 2, 3))) + .awaitFor(defaultTimeout) + } + } + + "allow filters to modify the executing request" in { + val appendedHeader = "X-Request-Id" + val appendedHeaderValue = "someid" + withClient() { + _.url(s"http://localhost:$testServerPort") + .setRequestFilter(new HeaderAppendingFilter(appendedHeader, appendedHeaderValue)) + .get() + .toScala + .map(_.getSingleHeader(appendedHeader).get must be_==(appendedHeaderValue)) + .awaitFor(defaultTimeout) + } + } + + "allow filters to modify the streaming request" in { + val appendedHeader = "X-Request-Id" + val appendedHeaderValue = "someid" + withClient() { + _.url(s"http://localhost:$testServerPort") + .setRequestFilter(new HeaderAppendingFilter(appendedHeader, appendedHeaderValue)) + .setMethod("GET") + .stream() + .toScala + .map(_.getSingleHeader(appendedHeader).get must be_==(appendedHeaderValue)) + .awaitFor(defaultTimeout) + } + } + + } + +} diff --git a/integration-tests/src/test/scala/play/libs/ws/ahc/AhcWSClientSpec.scala b/integration-tests/src/test/scala/play/libs/ws/ahc/AhcWSClientSpec.scala index d1abf288..64096a4b 100644 --- a/integration-tests/src/test/scala/play/libs/ws/ahc/AhcWSClientSpec.scala +++ b/integration-tests/src/test/scala/play/libs/ws/ahc/AhcWSClientSpec.scala @@ -3,106 +3,19 @@ */ package play.libs.ws.ahc -import akka.actor.ActorSystem -import akka.http.scaladsl.Http -import akka.stream.ActorMaterializer -import akka.stream.javadsl.{ Sink, Source } -import akka.util.ByteString import com.typesafe.config.ConfigFactory import org.specs2.concurrent.ExecutionEnv -import org.specs2.matcher.FutureMatchers -import org.specs2.mutable.Specification -import org.specs2.specification.AfterAll +import org.specs2.execute.Result import play.libs.ws._ -import scala.compat.java8.FutureConverters._ -import scala.concurrent.Future -import scala.concurrent.duration._ - -class AhcWSClientSpec(implicit executionEnv: ExecutionEnv) extends Specification with AfterAll with FutureMatchers with XMLBodyWritables with XMLBodyReadables { - val testServerPort = 49134 - - sequential - - // Create Akka system for thread and streaming management - implicit val system = ActorSystem() - implicit val materializer = ActorMaterializer() - - // Create the standalone WS client with no cache - val client = StandaloneAhcWSClient.create( - AhcWSClientConfigFactory.forConfig(ConfigFactory.load, this.getClass.getClassLoader), - null, - materializer - ) - - private val route = { - import akka.http.scaladsl.server.Directives._ - get { - complete("

Say hello to akka-http

") - } ~ - post { - entity(as[String]) { echo => - complete(echo) - } - } - } - - private val futureServer = { - Http().bindAndHandle(route, "localhost", testServerPort) - } - - override def afterAll = { - futureServer.foreach(_.unbind) - client.close() - system.terminate() - } - - "play.libs.ws.ahc.StandaloneAhcWSClient" should { - - "get successfully" in { - def someOtherMethod(string: String) = { - new InMemoryBodyWritable(akka.util.ByteString.fromString(string), "text/plain") - } - toScala(client.url(s"http://localhost:$testServerPort").post(someOtherMethod("hello world"))).map(response => - response.getBody() must be_==("hello world") - ).await(retries = 0, timeout = 5.seconds) - } - - "source successfully" in { - val future = toScala(client.url(s"http://localhost:$testServerPort").stream()) - val result: Future[ByteString] = future.flatMap { response: StandaloneWSResponse => - toScala(response.getBodyAsSource.runWith(Sink.head(), materializer)) - } - val expected: ByteString = ByteString.fromString("

Say hello to akka-http

") - result must be_==(expected).await(retries = 0, timeout = 5.seconds) - } - - "round trip XML successfully" in { - val document = XML.fromString(""" - | - | hello - | world - |""".stripMargin) - document.normalizeDocument() - - toScala { - client.url(s"http://localhost:$testServerPort").post(body(document)) - }.map { response => - import javax.xml.parsers.DocumentBuilderFactory - val dbf = DocumentBuilderFactory.newInstance - dbf.setNamespaceAware(true) - dbf.setCoalescing(true) - dbf.setIgnoringElementContentWhitespace(true) - dbf.setIgnoringComments(true) - val db = dbf.newDocumentBuilder - - val responseXml = response.getBody(xml()) - responseXml.normalizeDocument() - - responseXml.isEqualNode(document) must beTrue - }.await(retries = 0, timeout = 5.seconds) +class AhcWSClientSpec(implicit override val executionEnv: ExecutionEnv) extends WSClientSpec { + def withClient()(block: StandaloneWSClient => Result): Result = { + val config = AhcWSClientConfigFactory.forConfig(ConfigFactory.load, this.getClass.getClassLoader) + val client = StandaloneAhcWSClient.create(config, materializer) + try { + block(client) + } finally { + client.close() } - } - } diff --git a/integration-tests/src/test/scala/play/libs/ws/ahc/AhcWSRequestFilterSpec.scala b/integration-tests/src/test/scala/play/libs/ws/ahc/AhcWSRequestFilterSpec.scala index 22f215a9..ee3304c4 100644 --- a/integration-tests/src/test/scala/play/libs/ws/ahc/AhcWSRequestFilterSpec.scala +++ b/integration-tests/src/test/scala/play/libs/ws/ahc/AhcWSRequestFilterSpec.scala @@ -3,131 +3,19 @@ */ package play.libs.ws.ahc -import akka.actor.ActorSystem -import akka.http.scaladsl.Http -import akka.http.scaladsl.model.{ ContentTypes, HttpEntity } -import akka.http.scaladsl.model.headers.RawHeader -import akka.stream.ActorMaterializer import com.typesafe.config.ConfigFactory import org.specs2.concurrent.ExecutionEnv -import org.specs2.matcher.FutureMatchers -import org.specs2.mutable.Specification -import org.specs2.specification.AfterAll - -import scala.concurrent.duration._ -import scala.compat.java8.FutureConverters - -class AhcWSRequestFilterSpec(implicit executionEnv: ExecutionEnv) extends Specification with AfterAll with FutureMatchers { - val testServerPort = 49134 - - sequential - - // Create Akka system for thread and streaming management - implicit val system = ActorSystem() - implicit val materializer = ActorMaterializer() - - // Create the standalone WS client with no cache - private val client = StandaloneAhcWSClient.create( - AhcWSClientConfigFactory.forConfig(ConfigFactory.load, this.getClass.getClassLoader), - null, - materializer - ) - - private val route = { - import akka.http.scaladsl.server.Directives._ - headerValueByName("X-Request-Id") { value => - respondWithHeader(RawHeader("X-Request-Id", value)) { - val httpEntity = HttpEntity(ContentTypes.`text/html(UTF-8)`, "

Say hello to akka-http

") - complete(httpEntity) - } - } ~ { - val httpEntity = HttpEntity(ContentTypes.`text/html(UTF-8)`, "

Say hello to akka-http

") - complete(httpEntity) - } - } - - private val futureServer = { - Http().bindAndHandle(route, "localhost", testServerPort) - } - - override def afterAll: Unit = { - futureServer.foreach(_.unbind) - client.close() - system.terminate() - } - - "setRequestFilter" should { - - "execute with one request filter" in { - import scala.collection.JavaConverters._ - val callList = new java.util.ArrayList[Integer]() - val responseFuture = FutureConverters.toScala(client.url(s"http://localhost:$testServerPort") - .setRequestFilter(new CallbackRequestFilter(callList, 1)) - .get()) - responseFuture.map { _ => - callList.asScala must contain(1) - }.await(retries = 0, timeout = 5.seconds) - } - - "stream with one request filter" in { - import scala.collection.JavaConverters._ - val callList = new java.util.ArrayList[Integer]() - val responseFuture = FutureConverters.toScala(client.url(s"http://localhost:$testServerPort") - .setRequestFilter(new CallbackRequestFilter(callList, 1)) - .stream()) - responseFuture.map { _ => - callList.asScala must contain(1) - }.await(retries = 0, timeout = 5.seconds) - } - - "execute with three request filter" in { - import scala.collection.JavaConverters._ - val callList = new java.util.ArrayList[Integer]() - val responseFuture = FutureConverters.toScala(client.url(s"http://localhost:$testServerPort") - .setRequestFilter(new CallbackRequestFilter(callList, 1)) - .setRequestFilter(new CallbackRequestFilter(callList, 2)) - .setRequestFilter(new CallbackRequestFilter(callList, 3)) - .get()) - responseFuture.map { _ => - callList.asScala must containTheSameElementsAs(Seq(1, 2, 3)) - }.await(retries = 0, timeout = 5.seconds) - } - - "stream with three request filters" in { - import scala.collection.JavaConverters._ - val callList = new java.util.ArrayList[Integer]() - val responseFuture = FutureConverters.toScala(client.url(s"http://localhost:$testServerPort") - .setRequestFilter(new CallbackRequestFilter(callList, 1)) - .setRequestFilter(new CallbackRequestFilter(callList, 2)) - .setRequestFilter(new CallbackRequestFilter(callList, 3)) - .stream()) - responseFuture.map { _ => - callList.asScala must containTheSameElementsAs(Seq(1, 2, 3)) - }.await(retries = 0, timeout = 5.seconds) - } - - "allow filters to modify the executing request" in { - val appendedHeader = "X-Request-Id" - val appendedHeaderValue = "someid" - val responseFuture = FutureConverters.toScala(client.url(s"http://localhost:$testServerPort") - .setRequestFilter(new HeaderAppendingFilter(appendedHeader, appendedHeaderValue)) - .get()) - - responseFuture.map { response => - response.getHeaders.get("X-Request-Id").get(0) must be_==("someid") - }.await(retries = 0, timeout = 5.seconds) - } - - "allow filters to modify the streaming request" in { - val appendedHeader = "X-Request-Id" - val appendedHeaderValue = "someid" - val responseFuture = FutureConverters.toScala(client.url(s"http://localhost:$testServerPort") - .setRequestFilter(new HeaderAppendingFilter(appendedHeader, appendedHeaderValue)) - .stream()) - - responseFuture.map { response => - response.getHeaders.get("X-Request-Id").get(0) must be_==("someid") - }.await(retries = 0, timeout = 5.seconds) +import org.specs2.execute.Result +import play.libs.ws.{ StandaloneWSClient, WSRequestFilterSpec } + +class AhcWSRequestFilterSpec(implicit override val executionEnv: ExecutionEnv) extends WSRequestFilterSpec { + def withClient()(block: StandaloneWSClient => Result): Result = { + val config = AhcWSClientConfigFactory.forConfig(ConfigFactory.load, this.getClass.getClassLoader) + val client = StandaloneAhcWSClient.create(config, materializer) + try { + block(client) + } finally { + client.close() } } } diff --git a/integration-tests/src/test/scala/play/libs/ws/akkahttp/AkkaHttpWSClientSpec.scala b/integration-tests/src/test/scala/play/libs/ws/akkahttp/AkkaHttpWSClientSpec.scala new file mode 100644 index 00000000..bc92ed1c --- /dev/null +++ b/integration-tests/src/test/scala/play/libs/ws/akkahttp/AkkaHttpWSClientSpec.scala @@ -0,0 +1,19 @@ +/* + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package play.libs.ws.akkahttp + +import org.specs2.concurrent.ExecutionEnv +import org.specs2.execute.Result +import play.libs.ws.{ StandaloneWSClient, WSClientSpec } + +class AkkaHttpWSClientSpec(implicit override val executionEnv: ExecutionEnv) extends WSClientSpec { + def withClient()(block: StandaloneWSClient => Result): Result = { + val client = new StandaloneAkkaHttpWSClient(system, materializer, clientHttpsContext()) + try { + block(client) + } finally { + client.close() + } + } +} diff --git a/integration-tests/src/test/scala/play/libs/ws/akkahttp/AkkaHttpWSRequestFilterSpec.scala b/integration-tests/src/test/scala/play/libs/ws/akkahttp/AkkaHttpWSRequestFilterSpec.scala new file mode 100644 index 00000000..cfb3839a --- /dev/null +++ b/integration-tests/src/test/scala/play/libs/ws/akkahttp/AkkaHttpWSRequestFilterSpec.scala @@ -0,0 +1,19 @@ +/* + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package play.libs.ws.akkahttp + +import org.specs2.concurrent.ExecutionEnv +import org.specs2.execute.Result +import play.libs.ws.{ StandaloneWSClient, WSRequestFilterSpec } + +class AkkaHttpWSRequestFilterSpec(implicit override val executionEnv: ExecutionEnv) extends WSRequestFilterSpec { + def withClient()(block: StandaloneWSClient => Result): Result = { + val client = new StandaloneAkkaHttpWSClient(system, materializer) + try { + block(client) + } finally { + client.close() + } + } +} diff --git a/play-akka-http-ws-standalone/src/main/java/play/libs/ws/akkahttp/StandaloneAkkaHttpWSClient.java b/play-akka-http-ws-standalone/src/main/java/play/libs/ws/akkahttp/StandaloneAkkaHttpWSClient.java new file mode 100644 index 00000000..293c11da --- /dev/null +++ b/play-akka-http-ws-standalone/src/main/java/play/libs/ws/akkahttp/StandaloneAkkaHttpWSClient.java @@ -0,0 +1,69 @@ +/* + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package play.libs.ws.akkahttp; + +import akka.actor.ActorSystem; +import akka.http.javadsl.Http; +import akka.http.javadsl.HttpsConnectionContext; +import akka.stream.Materializer; +import play.libs.ws.StandaloneWSClient; +import play.libs.ws.StandaloneWSRequest; + +import java.io.IOException; +import java.net.MalformedURLException; + +public final class StandaloneAkkaHttpWSClient implements StandaloneWSClient { + + private final ActorSystem sys; + private final Materializer mat; + private final HttpsConnectionContext ctx; + + public StandaloneAkkaHttpWSClient(ActorSystem sys, Materializer mat) { + this(sys, mat, Http.get(sys).defaultClientHttpsContext()); + } + + public StandaloneAkkaHttpWSClient(ActorSystem sys, Materializer mat, HttpsConnectionContext ctx) { + this.sys = sys; + this.mat = mat; + this.ctx = ctx; + } + + /** + * The underlying implementation of the client, if any. You must cast the returned value to the type you want. + * + * @return the backing class. + */ + @Override + public Object getUnderlying() { + return Http.get(sys); + } + + /** + * Returns a StandaloneWSRequest object representing the URL. You can append additional + * properties on the StandaloneWSRequest by chaining calls, and execute the request to + * return an asynchronous {@code CompletionStage}. + * + * @param url the URL to request + * @return the request + */ + @Override + public StandaloneWSRequest url(String url) { + try { + return new StandaloneAkkaHttpWSRequest(url, sys, mat, ctx); + } + catch (IllegalArgumentException ex) { + throw new RuntimeException(new MalformedURLException(ex.getMessage())); + } + } + + /** + * Closes this client, and releases underlying resources. + *

+ * Use this for manually instantiated clients. + */ + @Override + public void close() throws IOException { + Http.get(sys).shutdownAllConnectionPools(); + } +} diff --git a/play-akka-http-ws-standalone/src/main/java/play/libs/ws/akkahttp/StandaloneAkkaHttpWSRequest.java b/play-akka-http-ws-standalone/src/main/java/play/libs/ws/akkahttp/StandaloneAkkaHttpWSRequest.java new file mode 100644 index 00000000..9a1a84a0 --- /dev/null +++ b/play-akka-http-ws-standalone/src/main/java/play/libs/ws/akkahttp/StandaloneAkkaHttpWSRequest.java @@ -0,0 +1,656 @@ +/* + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package play.libs.ws.akkahttp; + +import akka.actor.ActorSystem; +import akka.http.impl.model.parser.HeaderParser$; +import akka.http.javadsl.Http; +import akka.http.javadsl.HttpsConnectionContext; +import akka.http.javadsl.model.*; +import akka.http.javadsl.model.headers.Authorization; +import akka.http.javadsl.model.headers.BasicHttpCredentials; +import akka.http.javadsl.model.headers.Cookie; +import akka.japi.Pair; +import akka.parboiled2.ParserInput$; +import akka.pattern.PatternsCS; +import akka.stream.Materializer; +import play.libs.ws.*; +import scala.concurrent.duration.FiniteDuration; +import scala.util.Either; + +import java.nio.charset.Charset; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +public final class StandaloneAkkaHttpWSRequest implements StandaloneWSRequest { + + private final HttpRequest request; + private final List filters; + private final Duration timeout; + + private final ActorSystem sys; + private final Materializer mat; + private final HttpsConnectionContext ctx; + + StandaloneAkkaHttpWSRequest(String url, ActorSystem sys, Materializer mat, HttpsConnectionContext ctx) { + this(HttpRequest.create(url), new ArrayList<>(), Duration.ZERO, sys, mat, ctx); + } + + private StandaloneAkkaHttpWSRequest(HttpRequest request, List filters, Duration timeout, ActorSystem sys, Materializer mat, HttpsConnectionContext ctx) { + this.request = request; + this.filters = filters; + this.timeout = timeout; + this.sys = sys; + this.mat = mat; + this.ctx = ctx; + } + + /** + * Perform a GET on the request asynchronously. + * + * @return a promise to the response + */ + @Override + public CompletionStage get() { + return execute(HttpMethods.GET.value()); + } + + /** + * Perform a PATCH on the request asynchronously. + * + * @param body the BodyWritable + * @return a promise to the response + */ + @Override + public CompletionStage patch(BodyWritable body) { + return setBody(body).execute(HttpMethods.PATCH.value()); + } + + /** + * Perform a POST on the request asynchronously. + * + * @param body the BodyWritable + * @return a promise to the response + */ + @Override + public CompletionStage post(BodyWritable body) { + return setBody(body).execute(HttpMethods.POST.value()); + } + + /** + * Perform a PUT on the request asynchronously. + * + * @param body the BodyWritable + * @return a promise to the response + */ + @Override + public CompletionStage put(BodyWritable body) { + return setBody(body).execute(HttpMethods.PUT.value()); + } + + /** + * Perform a DELETE on the request asynchronously. + * + * @return a promise to the response + */ + @Override + public CompletionStage delete() { + return execute(HttpMethods.DELETE.value()); + } + + /** + * Perform a HEAD on the request asynchronously. + * + * @return a promise to the response + */ + @Override + public CompletionStage head() { + return execute(HttpMethods.HEAD.value()); + } + + /** + * Perform an OPTIONS on the request asynchronously. + * + * @return a promise to the response + */ + @Override + public CompletionStage options() { + return execute(HttpMethods.OPTIONS.value()); + } + + /** + * Executes an arbitrary method on the request asynchronously. + * + * @param method The method to execute + * @return a promise to the response + */ + @Override + public CompletionStage execute(String method) { + return setMethod(method).execute(); + } + + /** + * Executes an arbitrary method on the request asynchronously. Should be used with setMethod(). + * + * @return a promise to the response + */ + @Override + public CompletionStage execute() { + final WSRequestExecutor akkaExecutor = (request) -> { + final CompletableFuture resultFuture = + Http.get(sys) + .singleRequest(((StandaloneAkkaHttpWSRequest)request).request, ctx) + .thenApply((r) -> new StandaloneAkkaHttpWSResponse(r, mat)) + .toCompletableFuture(); + + if (timeout.equals(Duration.ZERO)) { + return resultFuture.thenApply((response) -> (StandaloneWSResponse)response); + } + else { + final CompletableFuture timeoutException = new CompletableFuture<>(); + timeoutException.completeExceptionally(new TimeoutException("Request timeout after " + timeout)); + + final CompletableFuture timeoutFuture = + PatternsCS.after( + FiniteDuration.apply(timeout.toNanos(), TimeUnit.NANOSECONDS), + sys.scheduler(), sys.dispatcher(), + timeoutException + ).toCompletableFuture(); + + return CompletableFuture.anyOf(resultFuture, timeoutFuture).thenApply((response) -> (StandaloneWSResponse)response); + } + }; + + return filters.stream() + .reduce(akkaExecutor, (executor, filter) -> filter.apply(executor), (exec1, exec2) -> exec2) + .apply(this); + } + + /** + * Executes this request and streams the response body. + *

+ * Use {@code response.bodyAsSource()} with this method. + * + * @return a promise to the response + */ + @Override + public CompletionStage stream() { + return execute(); + } + + /** + * Sets the HTTP method this request should use, where the no args execute() method is invoked. + * + * @param method the HTTP method. + * @return the modified WSRequest. + */ + @Override + public StandaloneWSRequest setMethod(String method) { + return copy(request.withMethod( + HttpMethods.lookup(method) + .orElseThrow(() -> new IllegalArgumentException("Unknown HTTP method " + method)))); + } + + /** + * Set the body this request should use. + * + * @param body the body of the request. + * @return the modified WSRequest. + */ + @Override + public StandaloneWSRequest setBody(BodyWritable body) { + if (body instanceof InMemoryBodyWritable) { + final InMemoryBodyWritable writable = (InMemoryBodyWritable)body; + return copy(request.withEntity( + HttpEntities.create(parseContentType(body.contentType()), writable.body().get()))); + } + else if (body instanceof SourceBodyWritable) { + final SourceBodyWritable writable = (SourceBodyWritable)body; + return copy(request.withEntity( + HttpEntities.create(parseContentType(body.contentType()), writable.body().get()))); + } + else { + throw new IllegalArgumentException("Unsupported BodyWritable: " + body); + } + } + + /** + * Set headers to the request. Note that duplicate headers are allowed + * by the HTTP specification, and removing a header is not available + * through this API. Any existing header will be discarded here. + * + * @param headers the headers + * @return the modified WSRequest. + */ + @Override + public StandaloneWSRequest setHeaders(Map> headers) { + // FIXME JAVA API no Java Api to replace headers on a request + HttpRequest requestNoHeaders = request; + for (HttpHeader h: request.getHeaders()) { + requestNoHeaders = requestNoHeaders.removeHeader(h.name()); + } + + return copy( + requestNoHeaders.addHeaders( + headers.entrySet().stream().flatMap( + (h) -> h.getValue().stream().map((v) -> parseHeader(h.getKey(), v)) + ).collect(Collectors.toList())) + ); + } + + /** + * Adds a header to the request. Note that duplicate headers are allowed + * by the HTTP specification, and removing a header is not available + * through this API. Existent headers will be preserved. + * + * @param name the header name + * @param value the header value + * @return the modified WSRequest. + */ + @Override + public StandaloneWSRequest addHeader(String name, String value) { + return copy(request.addHeader(parseHeader(name, value))); + } + + /** + * Sets the query string to query. + * + * @param query the fully formed query string + * @return the modified WSRequest. + */ + @Override + public StandaloneWSRequest setQueryString(String query) { + return copy( + request.withUri( + request.getUri().query( + Query.create(query)))); + } + + /** + * Adds a query parameter with the given name, this can be called repeatedly and will preserve existing values. + * Duplicate query parameters are allowed. + * + * @param name the query parameter name + * @param value the query parameter value + * @return the modified WSRequest. + */ + @Override + public StandaloneWSRequest addQueryParameter(String name, String value) { + return copy( + request.withUri( + request.getUri().query( + request.getUri().query().withParam(name, value)))); + } + + /** + * Sets the query string parameters. This will discard existing values. + * + * @param params the query string parameters + * @return the modified WSRequest. + */ + @Override + public StandaloneWSRequest setQueryString(Map> params) { + return copy( + request.withUri( + request.getUri().query( + Query.create(params.entrySet().stream().flatMap( + (entry) -> entry.getValue().stream().map((value) -> Pair.create(entry.getKey(), value)) + ).collect(Collectors.toList()))))); + } + + /** + * Add a new cookie. This can be called repeatedly and will preserve existing cookies. + * + * @param cookie the cookie to be added + * @return the modified WSRequest. + * @see #addCookies(WSCookie...) + * @see #setCookies(List) + */ + @Override + public StandaloneWSRequest addCookie(WSCookie cookie) { + return addCookies(cookie); + } + + /** + * Add new cookies. This can be called repeatedly and will preserve existing cookies. + * + * @param cookies the list of cookies to be added + * @return the modified WSRequest. + * @see #addCookie(WSCookie) + * @see #setCookies(List) + */ + @Override + public StandaloneWSRequest addCookies(WSCookie... cookies) { + return copy(request.addHeaders( + Arrays.stream(cookies) + .map((c) -> Cookie.create(c.getName(), c.getValue())) + .collect(Collectors.toList()))); + } + + /** + * Set the request cookies. This discard the existing cookies. + * + * @param cookies the cookies to be used. + * @return the modified WSRequest. + */ + @Override + public StandaloneWSRequest setCookies(List cookies) { + return copy(request.removeHeader("Cookie")).addCookies(cookies.toArray(new WSCookie[cookies.size()])); + } + + /** + * Sets the authentication header for the current request using BASIC authentication. + * + * @param userInfo a string formed as "username:password". + * @return the modified WSRequest. + */ + @Override + public StandaloneWSRequest setAuth(String userInfo) { + final String[] credentials = userInfo.split(":"); + return setAuth(credentials[0], credentials[1]); + } + + /** + * Sets the authentication header for the current request using BASIC authentication. + * + * @param username the basic auth username + * @param password the basic auth password + * @return the modified WSRequest. + */ + @Override + public StandaloneWSRequest setAuth(String username, String password) { + return setAuth(username, password, WSAuthScheme.BASIC); + } + + /** + * Sets the authentication header for the current request. + * + * @param username the username + * @param password the password + * @param scheme authentication scheme + * @return the modified WSRequest. + */ + @Override + public StandaloneWSRequest setAuth(String username, String password, WSAuthScheme scheme) { + if (scheme.equals(WSAuthScheme.BASIC)) { + return copy(request.addCredentials(BasicHttpCredentials.createBasicHttpCredentials(username, password))); + } + else { + throw new IllegalArgumentException("Authentication scheme [" + scheme + "] not yet supported"); + } + } + + /** + * Sets an (OAuth) signature calculator. + * + * @param calculator the signature calculator + * @return the modified WSRequest + */ + @Override + public StandaloneWSRequest sign(WSSignatureCalculator calculator) { + // FIXME https://github.com/playframework/play-ws/issues/207 + throw new UnsupportedOperationException("Implementation is missing"); + } + + /** + * Sets whether redirects (301, 302) should be followed automatically. + * + * @param followRedirects true if the request should follow redirects + * @return the modified WSRequest + */ + @Override + public StandaloneWSRequest setFollowRedirects(boolean followRedirects) { + // FIXME https://github.com/playframework/play-ws/issues/207 + throw new UnsupportedOperationException("Implementation is missing"); + } + + /** + * Sets the virtual host as a "hostname:port" string. + * + * @param virtualHost the virtual host + * @return the modified WSRequest + */ + @Override + public StandaloneWSRequest setVirtualHost(String virtualHost) { + // FIXME JAVA API missing Host.create(Authority) Java Api in Akka Http + return copy(request.addHeader(akka.http.scaladsl.model.headers.Host.apply( + akka.http.scaladsl.model.Uri.Authority$.MODULE$.parse( + ParserInput$.MODULE$.apply(virtualHost), + Charset.forName("UTF8"), + akka.http.scaladsl.model.Uri$ParsingMode$Relaxed$.MODULE$)))); + } + + /** + * Sets the request timeout duration. Java {@link Duration} class does not have a specific instance + * to represent an infinite timeout, but according to the docs, in practice, you can somehow emulate + * it: + *

+ *

+ * A physical duration could be of infinite length. For practicality, the duration is stored + * with constraints similar to Instant. The duration uses nanosecond resolution with a maximum + * value of the seconds that can be held in a long. This is greater than the current estimated + * age of the universe. + *
+ *

+ * Play WS uses the convention of setting a duration with negative value to have an infinite timeout. + * So you will have: + *

+ *

java.time.Duration timeout = Duration.ofSeconds(-1);
. + * + * In practice, you can also have an extreme long duration, like: + * + *
java.time.Duration timeout = Duration.ofMillis(Long.MAX_VALUE);
+ * + * And, as the {@link Duration} docs states, this will be good enough since this duration is greater than + * the current estimate age of the universe. + * + * @param timeout the request timeout in milliseconds. A duration of -1 indicates an infinite request timeout. + * @return the modified WSRequest. + */ + @Override + public StandaloneWSRequest setRequestTimeout(Duration timeout) { + return copy(timeout); + } + + /** + * Adds a request filter. + * + * @param filter a transforming filter. + * @return the modified request. + */ + @Override + public StandaloneWSRequest setRequestFilter(WSRequestFilter filter) { + final List newFilters = new ArrayList<>(filters); + newFilters.add(filter); + return copy(newFilters); + } + + /** + * Set the content type. If the request body is a String, and no charset parameter is included, then it will + * default to UTF-8. + * + * @param contentType The content type + * @return the modified WSRequest + */ + @Override + public StandaloneWSRequest setContentType(String contentType) { + // FIXME JAVA API entity.withContentType is missing in Java Api of Akka Http + return copy(request.withEntity( + ((RequestEntity)((akka.http.scaladsl.model.HttpEntity)request.entity()).withContentType((akka.http.scaladsl.model.ContentType)parseContentType(contentType))) + )); + } + + /** + * @return the URL of the request. This has not passed through an internal request builder and so will not be signed. + */ + @Override + public String getUrl() { + return request.getUri().query(Query.EMPTY).toString(); + } + + /** + * @return the headers (a copy to prevent side-effects). This has not passed through an internal request builder and so will not be signed. + */ + @Override + public Map> getHeaders() { + final Map> headers = new HashMap<>(); + for (final HttpHeader header: request.getHeaders()) { + if (headers.containsKey(header.name())) { + headers.get(header.name()).add(header.value()); + } + else { + headers.put(header.name(), new ArrayList<>(Collections.singletonList(header.value()))); + } + } + return headers; + } + + /** + * Get all the values of header with the specified name. If there are no values for + * the header with the specified name, than an empty List is returned. + * + * @param name the header name. + * @return all the values for this header name. + */ + @Override + public List getHeaderValues(String name) { + return getHeaders().getOrDefault(name, new ArrayList<>()); + } + + /** + * Get the value of the header with the specified name. If there are more than one values + * for this header, the first value is returned. If there are no values, than an empty + * Optional is returned. + * + * @param name the header name + * @return the header value + */ + @Override + public Optional getHeader(String name) { + return getHeaderValues(name).stream().findFirst(); + } + + /** + * @return the query parameters (a copy to prevent side-effects). This has not passed through an internal request builder and so will not be signed. + */ + @Override + public Map> getQueryParameters() { + return request.getUri().query().toMultiMap(); + } + + /** + * @return the auth username, null if not an authenticated request. + */ + @Override + public String getUsername() { + return request.getHeader(Authorization.class) + .filter((a) -> a.credentials() instanceof BasicHttpCredentials) + .map((a) -> (BasicHttpCredentials)a.credentials()) + .map(BasicHttpCredentials::username) + .orElseGet(() -> null); + } + + /** + * @return the auth password, null if not an authenticated request + */ + @Override + public String getPassword() { + return request.getHeader(Authorization.class) + .filter((a) -> a.credentials() instanceof BasicHttpCredentials) + .map((a) -> (BasicHttpCredentials)a.credentials()) + .map(BasicHttpCredentials::password) + .orElseGet(() -> null); + } + + /** + * @return the auth scheme, null if not an authenticated request. + */ + @Override + public WSAuthScheme getScheme() { + return request.getHeader(Authorization.class) + .filter((a) -> a.credentials() instanceof BasicHttpCredentials) + .map((a) -> WSAuthScheme.BASIC) + .orElseGet(() -> null); + } + + /** + * @return the signature calculator (example: OAuth), null if none is set. + */ + @Override + public WSSignatureCalculator getCalculator() { + // FIXME https://github.com/playframework/play-ws/issues/207 + throw new UnsupportedOperationException("Implementation is missing"); + } + + /** + * Gets the original request timeout duration, passed into the request as input. + * + * @return the timeout duration. + */ + @Override + public Duration getRequestTimeoutDuration() { + return timeout; + } + + /** + * @return true if the request is configure to follow redirect, false if it is configure not to, null if nothing is configured and the global client preference should be used instead. + */ + @Override + public boolean getFollowRedirects() { + // FIXME https://github.com/playframework/play-ws/issues/207 + return false; + } + + /** + * @return the content type, if any, or null. + */ + @Override + public String getContentType() { + // FIXME JAVA API no CotentTypes.NoContentType Java Api in Akka Http + return request.entity().getContentType() + .equals(akka.http.scaladsl.model.ContentTypes.NoContentType()) ? null : request.entity().getContentType().toString(); + } + + private HttpHeader parseHeader(String name, String value) { + // FIXME JAVA API missing HttpHeader.parse Java API in Akka Http + final akka.http.scaladsl.model.HttpHeader.ParsingResult result = + akka.http.scaladsl.model.HttpHeader$.MODULE$.parse(name, value, HeaderParser$.MODULE$.DefaultSettings()); + + if (result instanceof akka.http.scaladsl.model.HttpHeader$ParsingResult$Ok) { + return ((akka.http.scaladsl.model.HttpHeader$ParsingResult$Ok)result).header(); + } + else { + throw new IllegalArgumentException("Unable to parse header [" + name + "] with value [" + value + "]"); + } + } + + private ContentType parseContentType(String contentType) { + // FIXME JAVA API missing ContentType.parse Java API in Akka Http + final Either, akka.http.scaladsl.model.ContentType> contentTypeEither = akka.http.scaladsl.model.ContentType$.MODULE$.parse(contentType); + if (contentTypeEither.isRight()) { + return contentTypeEither.right().get(); + } + else { + throw new IllegalArgumentException("Unable to parse content type: " + contentType); + } + } + + private StandaloneWSRequest copy(HttpRequest request) { + return new StandaloneAkkaHttpWSRequest(request, this.filters, this.timeout, this.sys, this.mat, this.ctx); + } + + private StandaloneWSRequest copy(List filters) { + return new StandaloneAkkaHttpWSRequest(this.request, filters, this.timeout, this.sys, this.mat, this.ctx); + } + + private StandaloneWSRequest copy(Duration timeout) { + return new StandaloneAkkaHttpWSRequest(this.request, this.filters, timeout, this.sys, this.mat, this.ctx); + } + +} diff --git a/play-akka-http-ws-standalone/src/main/java/play/libs/ws/akkahttp/StandaloneAkkaHttpWSResponse.java b/play-akka-http-ws-standalone/src/main/java/play/libs/ws/akkahttp/StandaloneAkkaHttpWSResponse.java new file mode 100644 index 00000000..5e109210 --- /dev/null +++ b/play-akka-http-ws-standalone/src/main/java/play/libs/ws/akkahttp/StandaloneAkkaHttpWSResponse.java @@ -0,0 +1,211 @@ +/* + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package play.libs.ws.akkahttp; + +import akka.http.javadsl.model.HttpHeader; +import akka.http.javadsl.model.HttpResponse; +import akka.http.javadsl.model.ResponseEntity; +import akka.http.javadsl.model.headers.SetCookie; +import akka.stream.Materializer; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import akka.util.ByteString; +import play.libs.ws.BodyReadable; +import play.libs.ws.StandaloneWSResponse; +import play.libs.ws.WSCookie; +import play.libs.ws.WSCookieBuilder; + +import java.time.Duration; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +public final class StandaloneAkkaHttpWSResponse implements StandaloneWSResponse { + + // FIXME make configurable + final Duration UNMARSHAL_TIMEOUT = Duration.ofSeconds(1); + + private final HttpResponse response; + private final Materializer mat; + + private HttpResponse strictResponse; + + StandaloneAkkaHttpWSResponse(HttpResponse response, Materializer mat) { + this.response = response; + this.mat = mat; + } + + /** + * @return all the headers from the response. + */ + @Override + public Map> getHeaders() { + final Map> headers = new HashMap<>(); + for (final HttpHeader header: response.getHeaders()) { + if (headers.containsKey(header.name())) { + headers.get(header.name()).add(header.value()); + } + else { + headers.put(header.name(), new ArrayList<>(Collections.singletonList(header.value()))); + } + } + return headers; + } + + /** + * @return the underlying implementation response object, if any. + */ + @Override + public Object getUnderlying() { + return response; + } + + /** + * @return the HTTP status code from the response. + */ + @Override + public int getStatus() { + return response.status().intValue(); + } + + /** + * @return the text associated with the status code. + */ + @Override + public String getStatusText() { + return response.status().reason(); + } + + /** + * @return all the cookies from the response. + */ + @Override + public List getCookies() { + return StreamSupport.stream(response.getHeaders().spliterator(), false) + .filter((h) -> h instanceof SetCookie) + .map((h) -> { + final SetCookie c = (SetCookie)h; + return new WSCookieBuilder().setName(c.cookie().name()).setValue(c.cookie().value()).build(); + }) + .collect(Collectors.toList()); + } + + /** + * @param name the cookie name + * @return a single cookie from the response, if any. + */ + @Override + public Optional getCookie(String name) { + return getCookies().stream().filter((c) -> c.getName().equals(name)).findFirst(); + } + + /** + * @return the content type. + */ + @Override + public String getContentType() { + // FIXME JAVA API no CotentTypes.NoContentType Java Api in Akka Http + return response.entity().getContentType() + .equals(akka.http.scaladsl.model.ContentTypes.NoContentType()) ? null : response.entity().getContentType().toString(); + } + + /** + * Returns the response getBody as a particular type, through a + * {@link BodyReadable} transformation. You can define your + * own {@link BodyReadable} types: + *

+ *

+   * {@code public class MyClass {
+   *   private BodyReadable fooReadable = (response) -> new Foo();
+   *
+   *   public void readAsFoo(StandaloneWSResponse response) {
+   *       Foo foo = response.getBody(fooReadable);
+   *   }
+   * }
+   * }
+   * 
+ *

+ * or use {@code play.libs.ws.ahc.DefaultResponseReadables} + * for the built-ins: + *

+ *

+   * {@code public class MyClass implements DefaultResponseReadables {
+   *     public void readAsString(StandaloneWSResponse response) {
+   *         String getBody = response.getBody(string());
+   *     }
+   *
+   *     public void readAsJson(StandaloneWSResponse response) {
+   *         JsonNode json = response.getBody(json());
+   *     }
+   * }
+   * }
+   * 
+ * + * @param readable the readable to convert the response to a T + * @return the response getBody transformed into an instance of T + */ + @Override + public T getBody(BodyReadable readable) { + return readable.apply(this); + } + + /** + * The response body decoded as String, using a simple algorithm to guess the encoding. + *

+ * This decodes the body to a string representation based on the following algorithm: + *

+ * 1. Look for a "charset" parameter on the Content-Type. If it exists, set `charset` to its value and goto step 3. + * 2. If the Content-Type is of type "text", set $charset to "ISO-8859-1"; else set `charset` to "UTF-8". + * 3. Decode the raw bytes of the body using `charset`. + *

+ * Note that this does not take into account any special cases for specific content types. For example, for + * application/json, we do not support encoding autodetection and will trust the charset parameter if provided. + * + * @return the response body parsed as a String using the above algorithm. + */ + @Override + public String getBody() { + return getBodyAsBytes().utf8String(); + } + + @Override + public ByteString getBodyAsBytes() { + try { + // FIXME JAVA API no Unmarshalling Java API in Akka Http + return getStrictResponse().entity().getDataBytes() + .runWith(Sink.fold(ByteString.empty(), (b1, b2) -> b1.concat(b2)), mat) + .toCompletableFuture() + .get(UNMARSHAL_TIMEOUT.toNanos(), TimeUnit.NANOSECONDS); + } + catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + @Override + public Source getBodyAsSource() { + return response.entity().getDataBytes(); + } + + private synchronized HttpResponse getStrictResponse() { + if (strictResponse != null) { + return strictResponse; + } + else { + try { + final ResponseEntity strictEntity = response.entity() + .toStrict(UNMARSHAL_TIMEOUT.toMillis(), mat) + .toCompletableFuture() + .get(UNMARSHAL_TIMEOUT.toNanos(), TimeUnit.NANOSECONDS); + // FIXME JAVA API no toStrict Java API in Akka Http + this.strictResponse = response.withEntity(strictEntity); + return this.strictResponse; + } + catch (Exception ex) { + throw new RuntimeException(ex); + } + } + } +} diff --git a/play-akka-http-ws-standalone/src/main/scala/play/api/libs/ws/akkahttp/StandaloneAkkaHttpWSClient.scala b/play-akka-http-ws-standalone/src/main/scala/play/api/libs/ws/akkahttp/StandaloneAkkaHttpWSClient.scala new file mode 100644 index 00000000..9052155f --- /dev/null +++ b/play-akka-http-ws-standalone/src/main/scala/play/api/libs/ws/akkahttp/StandaloneAkkaHttpWSClient.scala @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package play.api.libs.ws.akkahttp + +import akka.actor.ActorSystem +import akka.http.scaladsl.{ Http, HttpsConnectionContext } +import akka.http.scaladsl.model.IllegalUriException +import akka.stream.Materializer +import play.api.libs.ws.{ StandaloneWSClient, StandaloneWSRequest } + +import scala.util.control.NonFatal + +object StandaloneAkkaHttpWSClient { + def apply()(implicit sys: ActorSystem, mat: Materializer): StandaloneWSClient = + apply(Http().defaultClientHttpsContext)(sys, mat) + + def apply(ctx: HttpsConnectionContext)(implicit sys: ActorSystem, mat: Materializer): StandaloneWSClient = + new StandaloneAkkaHttpWSClient()(sys, mat, ctx) +} + +final class StandaloneAkkaHttpWSClient private ()( + implicit + val sys: ActorSystem, val mat: Materializer, val ctx: HttpsConnectionContext +) extends StandaloneWSClient { + /** + * The underlying implementation of the client, if any. You must cast explicitly to the type you want. + * + * @tparam T the type you are expecting (i.e. isInstanceOf) + * @return the backing class. + */ + override def underlying[T]: T = Http().asInstanceOf[T] + + /** + * Generates a request. Throws IllegalArgumentException if the URL is invalid. + * + * @param url The base URL to make HTTP requests to. + * @return a request + */ + override def url(url: String): StandaloneWSRequest = try { + StandaloneAkkaHttpWSRequest(url) + } catch { + case ex: IllegalUriException => throw new IllegalArgumentException(ex.getMessage, ex) + case NonFatal(ex) => throw ex + } + + /** + * Closes this client, and releases underlying resources. + */ + override def close(): Unit = + Http().shutdownAllConnectionPools() +} diff --git a/play-akka-http-ws-standalone/src/main/scala/play/api/libs/ws/akkahttp/StandaloneAkkaHttpWSRequest.scala b/play-akka-http-ws-standalone/src/main/scala/play/api/libs/ws/akkahttp/StandaloneAkkaHttpWSRequest.scala new file mode 100644 index 00000000..48babbdb --- /dev/null +++ b/play-akka-http-ws-standalone/src/main/scala/play/api/libs/ws/akkahttp/StandaloneAkkaHttpWSRequest.scala @@ -0,0 +1,341 @@ +/* + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package play.api.libs.ws.akkahttp + +import java.net.URI + +import akka.actor.ActorSystem +import akka.http.scaladsl.{ Http, HttpsConnectionContext } +import akka.http.scaladsl.model.HttpHeader.ParsingResult +import akka.http.scaladsl.model.Uri.Authority +import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.headers._ +import akka.stream.Materializer +import play.api.libs.ws._ + +import scala.collection.immutable +import scala.concurrent.duration.Duration.Infinite +import scala.concurrent.{ Future, TimeoutException } +import scala.concurrent.duration.{ Duration, FiniteDuration } + +object StandaloneAkkaHttpWSRequest { + def apply(url: String)(implicit sys: ActorSystem, mat: Materializer, ctx: HttpsConnectionContext): StandaloneAkkaHttpWSRequest = new StandaloneAkkaHttpWSRequest(HttpRequest().withUri(Uri.parseAbsolute(url)), Seq.empty, Duration.Inf) +} + +final class StandaloneAkkaHttpWSRequest private ( + val request: HttpRequest, + val filters: Seq[WSRequestFilter], + val timeout: Duration +)(implicit val sys: ActorSystem, val mat: Materializer, val ctx: HttpsConnectionContext) extends StandaloneWSRequest { + + override type Self = StandaloneWSRequest + override type Response = StandaloneWSResponse + + /** + * The base URL for this request + */ + override def url: String = request.uri.toString + + /** + * The URI for this request + */ + override def uri: URI = + // TODO convert directly instead of via string + new URI(request.uri.toString) + + /** + * The content type for this request, if any is defined. + */ + override def contentType: Option[String] = request.entity.contentType match { + case ContentTypes.NoContentType => None + case contentType => Some(contentType.toString) + } + + /** + * The method for this request + */ + override def method: String = request.method.value + + /** + * The body of this request + */ + override def body: WSBody = request.entity match { + case HttpEntity.Empty => EmptyBody + case HttpEntity.Strict(_, data) => InMemoryBody(data) + case e: HttpEntity.Chunked => SourceBody(e.dataBytes) + case other => throw new IllegalArgumentException(s"Unknown request body type [${request.entity}]") + } + + /** + * The headers for this request + */ + override def headers: Map[String, Seq[String]] = + request.headers + .map(h => HttpHeader.parse(h.name().toLowerCase, h.value())) + .collect { + case ParsingResult.Ok(header, _) => header + } + .groupBy(_.name) + .mapValues(_.map(_.value())) + + /** + * The query string for this request + */ + override def queryString: Map[String, Seq[String]] = request.uri.query().toMultiMap + + /** + * The cookies for this request + */ + override def cookies: Seq[WSCookie] = + request.cookies.map(c => DefaultWSCookie(c.name, c.value)) + + /** + * A calculator of the signature for this request + */ + override def calc: Option[WSSignatureCalculator] = + // FIXME https://github.com/playframework/play-ws/issues/207 + None + + /** + * The authentication this request should use + */ + override def auth: Option[(String, String, WSAuthScheme)] = + request.header[Authorization].map(_.credentials).collect { + case BasicHttpCredentials(username, password) => (username, password, WSAuthScheme.BASIC) + case other => throw new IllegalArgumentException(s"Authorization [$other] not yet supported") + } + + /** + * Whether this request should follow redirects + */ + override def followRedirects: Option[Boolean] = + // FIXME https://github.com/playframework/play-ws/issues/207 + Some(false) + + /** + * The timeout for the request + */ + override def requestTimeout: Option[Int] = timeout match { + case duration: FiniteDuration => Some(duration.toMillis.toInt) + case duration: Infinite => None + } + + /** + * The virtual host this request will use + */ + override def virtualHost: Option[String] = request.header[Host].map(h => h.host.address + ":" + h.port) + + /** + * The proxy server this request will use + */ + override def proxyServer: Option[WSProxyServer] = + // FIXME https://github.com/playframework/play-ws/issues/207 + None + + /** + * sets the signature calculator for the request + * + * @param calc the signature calculator + */ + override def sign(calc: WSSignatureCalculator): Self = + // FIXME https://github.com/playframework/play-ws/issues/207 + ??? + + /** + * sets the authentication realm + */ + override def withAuth(username: String, password: String, scheme: WSAuthScheme): Self = + scheme match { + case WSAuthScheme.BASIC => copy(request = request.addHeader(Authorization(BasicHttpCredentials(username, password)))) + case authScheme => throw new IllegalArgumentException(s"Authentication scheme [$scheme] not yet supported") + } + + /** + * Returns this request with the given headers, discarding the existing ones. + * + * @param headers the headers to be used + */ + override def withHttpHeaders(headers: (String, String)*): Self = + copy(request = request.withHeaders(headers + .map { case (name, value) => HttpHeader.parse(name, value) } + .collect { + case ParsingResult.Ok(header, _) => header + } + .to[immutable.Seq] + )) + + /** + * Returns this request with the given query string parameters, discarding the existing ones. + * + * @param parameters the query string parameters + */ + override def withQueryStringParameters(parameters: (String, String)*): Self = + copy(request = request.copy(uri = request.uri.withQuery(Uri.Query(parameters.reverse: _*)))) + + /** + * Returns this request with the given cookies, discarding the existing ones. + * + * @param cookies the cookies to be used + */ + override def withCookies(cookies: WSCookie*): Self = + copy(request = request.mapHeaders(h => h.filter { + case c: Cookie => false + case _ => true + } ++ cookies.map(c => Cookie(c.name, c.value)))) + + /** + * Sets whether redirects (301, 302) should be followed automatically + */ + override def withFollowRedirects(follow: Boolean): Self = + // FIXME https://github.com/playframework/play-ws/issues/207 + ??? + + /** + * Sets the maximum time you expect the request to take. + * Use Duration.Inf to set an infinite request timeout. + * Warning: a stream consumption will be interrupted when this time is reached unless Duration.Inf is set. + */ + override def withRequestTimeout(timeout: Duration): Self = + copy(timeout = timeout) + + /** + * Adds a filter to the request that can transform the request for subsequent filters. + */ + override def withRequestFilter(filter: WSRequestFilter): Self = + copy(filters = filters :+ filter) + + /** + * Sets the virtual host to use in this request + */ + override def withVirtualHost(vh: String): Self = + copy(request = request.addHeader(Host(Authority.parse(vh)))) + + /** + * Sets the proxy server to use in this request + */ + override def withProxyServer(proxyServer: WSProxyServer): Self = + // FIXME https://github.com/playframework/play-ws/issues/207 + ??? + + /** + * Sets the method for this request + */ + override def withMethod(method: String): Self = + copy(request = request.withMethod( + HttpMethods.getForKey(method) + .getOrElse(throw new IllegalArgumentException(s"Unknown HTTP method $method")))) + + /** + * Sets the body for this request. + */ + override def withBody[T: BodyWritable](body: T): Self = { + val writable = implicitly[BodyWritable[T]] + val contentType = ContentType.parse(writable.contentType) match { + case Left(_) => throw new IllegalArgumentException(s"Unknown content type [${writable.contentType}]") + case Right(ct) => ct + } + + val requestWithEntity = request.withEntity(writable.transform(body) match { + case InMemoryBody(bytes) => HttpEntity(contentType, bytes) + case SourceBody(source) => HttpEntity(contentType, source) + case EmptyBody => HttpEntity.Empty + }) + + copy(request = requestWithEntity) + } + + /** + * Performs a GET. + */ + override def get(): Future[Response] = + execute(HttpMethods.GET.value) + + /** + * Performs a PATCH request. + * + * @param body the payload body submitted with this request + * @return a future with the response for the PATCH request + */ + override def patch[T: BodyWritable](body: T): Future[Response] = + withBody(body).execute(HttpMethods.PATCH.value) + + /** + * Performs a POST request. + * + * @param body the payload body submitted with this request + * @return a future with the response for the POST request + */ + override def post[T: BodyWritable](body: T): Future[Response] = + withBody(body).execute(HttpMethods.POST.value) + + /** + * Performs a PUT request. + * + * @param body the payload body submitted with this request + * @return a future with the response for the PUT request + */ + override def put[T: BodyWritable](body: T): Future[Response] = + withBody(body).execute(HttpMethods.PUT.value) + + /** + * Perform a DELETE on the request asynchronously. + */ + override def delete(): Future[Response] = + execute(HttpMethods.DELETE.value) + + /** + * Perform a HEAD on the request asynchronously. + */ + override def head(): Future[Response] = + execute(HttpMethods.HEAD.value) + + /** + * Perform a OPTIONS on the request asynchronously. + */ + override def options(): Future[Response] = + execute(HttpMethods.OPTIONS.value) + + /** + * Executes the given HTTP method. + * + * @param method the HTTP method that will be executed + * @return a future with the response for this request + */ + override def execute(method: String): Future[Response] = + withMethod(method).execute() + + /** + * Execute this request + */ + override def execute(): Future[Response] = { + val akkaExecutor = WSRequestExecutor { request => + import sys.dispatcher + val akkaRequest = request.asInstanceOf[StandaloneAkkaHttpWSRequest].request + val timeoutFuture = timeout match { + case duration: Infinite => Seq.empty[Future[StandaloneAkkaHttpWSResponse]] + case duration: FiniteDuration => + Seq(akka.pattern.after(duration, sys.scheduler)(Future.failed(new TimeoutException(s"Request timeout after $duration")))) + } + Future.firstCompletedOf( + timeoutFuture :+ + Http().singleRequest(akkaRequest, connectionContext = ctx).map(StandaloneAkkaHttpWSResponse.apply)(sys.dispatcher) + ) + } + + val execution = filters.foldRight(akkaExecutor)((filter, executor) => filter.apply(executor)) + + execution(this) + } + + /** + * Execute this request and stream the response body. + */ + override def stream(): Future[Response] = execute() + + private def copy( + request: HttpRequest = request, + filters: Seq[WSRequestFilter] = filters, + timeout: Duration = timeout + ) = new StandaloneAkkaHttpWSRequest(request, filters, timeout) +} diff --git a/play-akka-http-ws-standalone/src/main/scala/play/api/libs/ws/akkahttp/StandaloneAkkaHttpWSResponse.scala b/play-akka-http-ws-standalone/src/main/scala/play/api/libs/ws/akkahttp/StandaloneAkkaHttpWSResponse.scala new file mode 100644 index 00000000..5937ba2d --- /dev/null +++ b/play-akka-http-ws-standalone/src/main/scala/play/api/libs/ws/akkahttp/StandaloneAkkaHttpWSResponse.scala @@ -0,0 +1,94 @@ +/* + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package play.api.libs.ws.akkahttp + +import akka.actor.ActorSystem +import akka.http.scaladsl.model.HttpResponse +import akka.http.scaladsl.model.headers.`Set-Cookie` +import akka.http.scaladsl.unmarshalling.Unmarshal +import akka.stream.Materializer +import akka.stream.scaladsl.Source +import akka.util.ByteString +import play.api.libs.ws.{ DefaultWSCookie, StandaloneWSResponse, WSCookie } + +import scala.concurrent.Await +import scala.concurrent.duration._ + +private[akkahttp] object StandaloneAkkaHttpWSResponse { + def apply(resp: HttpResponse)(implicit sys: ActorSystem, mat: Materializer) = new StandaloneAkkaHttpWSResponse(resp) +} + +final class StandaloneAkkaHttpWSResponse private (val response: HttpResponse)(implicit val sys: ActorSystem, val mat: Materializer) extends StandaloneWSResponse { + + // FIXME make configurable + final val UnmarshalTimeout = 1.second + + private lazy val strictResponse = response.toStrict(UnmarshalTimeout)(sys.dispatcher, mat) + + /** + * Return the current headers for this response. + */ + override def headers: Map[String, Seq[String]] = + response.headers + .groupBy(_.name) + .mapValues(_.map(_.value())) + + /** + * Get the underlying response object. + */ + override def underlying[T]: T = response.asInstanceOf[T] + + /** + * The response status code. + */ + override def status: Int = response.status.intValue() + + /** + * The response status message. + */ + override def statusText: String = response.status.reason + + /** + * Get all the cookies. + */ + override def cookies: Seq[WSCookie] = response.headers.collect { + case `Set-Cookie`(cookie) => DefaultWSCookie(cookie.name, cookie.value) + } + + /** + * Get only one cookie, using the cookie name. + */ + override def cookie(name: String): Option[WSCookie] = cookies.find(_.name == name) + + /** + * The response body decoded as String, using a simple algorithm to guess the encoding. + * + * This decodes the body to a string representation based on the following algorithm: + * + * 1. Look for a "charset" parameter on the Content-Type. If it exists, set `charset` to its value and go to step 3. + * 2. If the Content-Type is of type "text", set charset to "ISO-8859-1"; else set `charset` to "UTF-8". + * 3. Decode the raw bytes of the body using `charset`. + * + * Note that this does not take into account any special cases for specific content types. For example, for + * application/json, we do not support encoding autodetection and will trust the charset parameter if provided.. + * + * @return the response body parsed as a String using the above algorithm. + */ + override def body: String = + bodyAsBytes.utf8String + + /** + * @return The response body as ByteString. + */ + override def bodyAsBytes: ByteString = { + import sys.dispatcher + import akka.http.scaladsl.unmarshalling.PredefinedFromEntityUnmarshallers.byteStringUnmarshaller + Await.result(strictResponse.flatMap(Unmarshal(_).to[ByteString]), UnmarshalTimeout) + } + + /** + * @return the response as a source of bytes + */ + override def bodyAsSource: Source[ByteString, _] = response.entity.dataBytes +} diff --git a/play-ws-standalone/src/main/scala/play/api/libs/ws/StandaloneWSRequest.scala b/play-ws-standalone/src/main/scala/play/api/libs/ws/StandaloneWSRequest.scala index 5f083c89..b5f00a65 100644 --- a/play-ws-standalone/src/main/scala/play/api/libs/ws/StandaloneWSRequest.scala +++ b/play-ws-standalone/src/main/scala/play/api/libs/ws/StandaloneWSRequest.scala @@ -161,7 +161,7 @@ trait StandaloneWSRequest { } /** - * Returns this request with the given query string parameters, discarding the existing ones. + * Returns this request with the given cookies, discarding the existing ones. * * @param cookies the cookies to be used */ diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 9a6ce48a..76ba16f1 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -57,6 +57,8 @@ object Dependencies { val standaloneAhcWSDependencies = scalaJava8Compat ++ cachecontrol ++ slf4jApi ++ reactiveStreams ++ testDependencies + val standaloneAkkaHttpWSDependencies = akkaHttp + val standaloneAhcWSJsonDependencies = playJson ++ testDependencies val standaloneAhcWSXMLDependencies = scalaXml ++ testDependencies