diff --git a/project/ScalaVer.scala b/project/ScalaVer.scala index 70dfe23..6d118dd 100644 --- a/project/ScalaVer.scala +++ b/project/ScalaVer.scala @@ -15,7 +15,7 @@ object ScalaVer { val values: Seq[ScalaVer] = Set(_211, _212, _213, _300).toSeq - val default: ScalaVer = _212 + val default: ScalaVer = _300 def fromEnv: Option[ScalaVer] = sys.env.get("SCALA_VER") flatMap fromString diff --git a/src/main/scala/com/github/andyglow/websocket/Websocket.scala b/src/main/scala/com/github/andyglow/websocket/Websocket.scala index 5343d2d..a1dce24 100644 --- a/src/main/scala/com/github/andyglow/websocket/Websocket.scala +++ b/src/main/scala/com/github/andyglow/websocket/Websocket.scala @@ -52,15 +52,19 @@ trait Websocket { */ private[websocket] class WebsocketImpl(ch: Channel) extends Websocket { - override def ![T: MessageAdapter](msg: T): Unit = - ch writeAndFlush { MessageAdapter[T] format msg } + override def ![T: MessageAdapter](msg: T): Unit = { + ch.writeAndFlush(MessageAdapter[T].format(msg)) + () + } override def close()(implicit ec: ExecutionContext): Future[Unit] = { - ch writeAndFlush new CloseWebSocketFrame + ch.writeAndFlush(new CloseWebSocketFrame) val f = NettyFuture(ch.closeFuture()) - f map { _ => () } + f.map { _ => () } } - override def ping()(implicit ec: ExecutionContext): Unit = + override def ping()(implicit ec: ExecutionContext): Unit = { ch.writeAndFlush(new PingWebSocketFrame()) + () + } } diff --git a/src/main/scala/com/github/andyglow/websocket/WebsocketClient.scala b/src/main/scala/com/github/andyglow/websocket/WebsocketClient.scala index eee64c9..b2fa12f 100644 --- a/src/main/scala/com/github/andyglow/websocket/WebsocketClient.scala +++ b/src/main/scala/com/github/andyglow/websocket/WebsocketClient.scala @@ -90,6 +90,8 @@ class WebsocketClient[T: MessageAdapter] private ( new websocketx.WebSocketFrameAggregator(Int.MaxValue), nettyHandler ) + + () } } @@ -114,7 +116,10 @@ class WebsocketClient[T: MessageAdapter] private ( /** Runs though shutdown process synchronously */ - def shutdownSync(): Unit = shutdown().syncUninterruptibly() + def shutdownSync(): Unit = { + shutdown().syncUninterruptibly() + () + } /** Executes shutdown returning Future that is going to be resolved once shutdown process is completed or if error has * happened. diff --git a/src/main/scala/com/github/andyglow/websocket/WebsocketNettyHandler.scala b/src/main/scala/com/github/andyglow/websocket/WebsocketNettyHandler.scala index 0634b3a..783456f 100644 --- a/src/main/scala/com/github/andyglow/websocket/WebsocketNettyHandler.scala +++ b/src/main/scala/com/github/andyglow/websocket/WebsocketNettyHandler.scala @@ -1,14 +1,12 @@ package com.github.andyglow.websocket import io.netty.buffer.ByteBufHolder -import io.netty.channel.ChannelFuture import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelPromise import io.netty.channel.SimpleChannelInboundHandler import io.netty.handler.codec.http.FullHttpResponse import io.netty.handler.codec.http.websocketx._ import io.netty.util.CharsetUtil -import scala.annotation.nowarn import scala.concurrent.stm._ import scala.util.control.NonFatal @@ -61,15 +59,18 @@ private[websocket] class WebsocketNettyHandler[T]( override def channelActive(ctx: ChannelHandlerContext): Unit = { handshaker.handshake(ctx.channel()) + () } override def channelRead0(ctx: ChannelHandlerContext, msg: ByteBufHolder): Unit = { val ch = ctx.channel() - def controls: PartialFunction[ByteBufHolder, ChannelFuture] = { + def controls: PartialFunction[ByteBufHolder, Unit] = { case _: CloseWebSocketFrame => try { handler.onClose(()) } catch { case NonFatal(ex) => handler.reportFailure(ex) } ch.close() + () + case msg: FullHttpResponse => if (!handshaker.isHandshakeComplete) { try { @@ -81,6 +82,7 @@ private[websocket] class WebsocketNettyHandler[T]( case ex: WebSocketHandshakeException => atomic { implicit txn => handshakeFuture().setFailure(ex) } } + () } else { throw new IllegalStateException( s"Unexpected FullHttpResponse (status=${msg.status}, content=${msg.content().toString(CharsetUtil.UTF_8)})" @@ -92,7 +94,6 @@ private[websocket] class WebsocketNettyHandler[T]( } @Override - @nowarn override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = { ctx.close() atomic { implicit txn => @@ -100,5 +101,7 @@ private[websocket] class WebsocketNettyHandler[T]( if (!f.isDone) f.setFailure(cause) else handler.reportFailure(cause) } + + () } } diff --git a/src/main/scala/com/github/andyglow/websocket/util/NettyFuture.scala b/src/main/scala/com/github/andyglow/websocket/util/NettyFuture.scala index a6f14c9..ec3a4a9 100644 --- a/src/main/scala/com/github/andyglow/websocket/util/NettyFuture.scala +++ b/src/main/scala/com/github/andyglow/websocket/util/NettyFuture.scala @@ -10,13 +10,14 @@ import scala.concurrent.Promise object NettyFuture { def apply[T](f: NFuture[T]): Future[T] = { - val p = Promise[T]() - val l = new GenericFutureListener[NFuture[T]]() { + val promise = Promise[T]() + val futureListener = new GenericFutureListener[NFuture[T]]() { override def operationComplete(future: NFuture[T]): Unit = { - if (future.isSuccess) p.success(future.getNow) else p.failure(future.cause()) + if (future.isSuccess) promise.success(future.getNow) else promise.failure(future.cause()) + () } } - f addListener l - p.future + f.addListener(futureListener) + promise.future } } diff --git a/src/test/scala-2.11/com/github/andyglow/websocket/testserver/CrossScalaVersionAkkaApi.scala b/src/test/scala-2.11/com/github/andyglow/websocket/testserver/CrossScalaVersionAkkaApi.scala index bdb7423..f865252 100644 --- a/src/test/scala-2.11/com/github/andyglow/websocket/testserver/CrossScalaVersionAkkaApi.scala +++ b/src/test/scala-2.11/com/github/andyglow/websocket/testserver/CrossScalaVersionAkkaApi.scala @@ -2,6 +2,7 @@ package com.github.andyglow.websocket.testserver import akka.actor.ActorSystem import akka.http.scaladsl.Http +import akka.http.scaladsl.model.ws.UpgradeToWebSocket import akka.http.scaladsl.model.{HttpRequest, HttpResponse} import akka.stream.Materializer @@ -9,11 +10,13 @@ import scala.concurrent.Future trait CrossScalaVersionAkkaApi { - def launchServer( + @inline def launchServer( interface: String, port: Int, handler: HttpRequest => HttpResponse)(implicit system: ActorSystem, mat: Materializer): Future[Http.ServerBinding] = { Http().bindAndHandleSync(handler, interface, port) } + + @inline def websocketAttribution(x: HttpRequest): Option[UpgradeToWebSocket] = x.header[UpgradeToWebSocket] } diff --git a/src/test/scala-2.12/com/github/andyglow/websocket/testserver/CrossScalaVersionAkkaApi.scala b/src/test/scala-2.12/com/github/andyglow/websocket/testserver/CrossScalaVersionAkkaApi.scala index 289c9bb..7f0ee17 100644 --- a/src/test/scala-2.12/com/github/andyglow/websocket/testserver/CrossScalaVersionAkkaApi.scala +++ b/src/test/scala-2.12/com/github/andyglow/websocket/testserver/CrossScalaVersionAkkaApi.scala @@ -2,18 +2,21 @@ package com.github.andyglow.websocket.testserver import akka.actor.ActorSystem import akka.http.scaladsl.Http -import akka.http.scaladsl.model.{HttpRequest, HttpResponse} +import akka.http.scaladsl.model.ws.WebSocketUpgrade +import akka.http.scaladsl.model.{AttributeKeys, HttpRequest, HttpResponse} import akka.stream.Materializer import scala.concurrent.Future trait CrossScalaVersionAkkaApi { - def launchServer( + @inline def launchServer( interface: String, port: Int, handler: HttpRequest => HttpResponse)(implicit system: ActorSystem, mat: Materializer): Future[Http.ServerBinding] = { Http().newServerAt(interface, port).bindSync(handler) } + + @inline def websocketAttribution(x: HttpRequest): Option[WebSocketUpgrade] = x.attribute(AttributeKeys.webSocketUpgrade) } diff --git a/src/test/scala-2.13/com/github/andyglow/websocket/testserver/CrossScalaVersionAkkaApi.scala b/src/test/scala-2.13/com/github/andyglow/websocket/testserver/CrossScalaVersionAkkaApi.scala index 289c9bb..2624b52 100644 --- a/src/test/scala-2.13/com/github/andyglow/websocket/testserver/CrossScalaVersionAkkaApi.scala +++ b/src/test/scala-2.13/com/github/andyglow/websocket/testserver/CrossScalaVersionAkkaApi.scala @@ -2,7 +2,8 @@ package com.github.andyglow.websocket.testserver import akka.actor.ActorSystem import akka.http.scaladsl.Http -import akka.http.scaladsl.model.{HttpRequest, HttpResponse} +import akka.http.scaladsl.model.ws.WebSocketUpgrade +import akka.http.scaladsl.model.{AttributeKeys, HttpRequest, HttpResponse} import akka.stream.Materializer import scala.concurrent.Future @@ -16,4 +17,6 @@ trait CrossScalaVersionAkkaApi { Http().newServerAt(interface, port).bindSync(handler) } + + @inline def websocketAttribution(x: HttpRequest): Option[WebSocketUpgrade] = x.attribute(AttributeKeys.webSocketUpgrade) } diff --git a/src/test/scala-3/com/github/andyglow/websocket/testserver/CrossScalaVersionAkkaApi.scala b/src/test/scala-3/com/github/andyglow/websocket/testserver/CrossScalaVersionAkkaApi.scala index 289c9bb..7f0ee17 100644 --- a/src/test/scala-3/com/github/andyglow/websocket/testserver/CrossScalaVersionAkkaApi.scala +++ b/src/test/scala-3/com/github/andyglow/websocket/testserver/CrossScalaVersionAkkaApi.scala @@ -2,18 +2,21 @@ package com.github.andyglow.websocket.testserver import akka.actor.ActorSystem import akka.http.scaladsl.Http -import akka.http.scaladsl.model.{HttpRequest, HttpResponse} +import akka.http.scaladsl.model.ws.WebSocketUpgrade +import akka.http.scaladsl.model.{AttributeKeys, HttpRequest, HttpResponse} import akka.stream.Materializer import scala.concurrent.Future trait CrossScalaVersionAkkaApi { - def launchServer( + @inline def launchServer( interface: String, port: Int, handler: HttpRequest => HttpResponse)(implicit system: ActorSystem, mat: Materializer): Future[Http.ServerBinding] = { Http().newServerAt(interface, port).bindSync(handler) } + + @inline def websocketAttribution(x: HttpRequest): Option[WebSocketUpgrade] = x.attribute(AttributeKeys.webSocketUpgrade) } diff --git a/src/test/scala/com/github/andyglow/websocket/WebsocketSpec.scala b/src/test/scala/com/github/andyglow/websocket/WebsocketSpec.scala index 9e546b7..41bc250 100644 --- a/src/test/scala/com/github/andyglow/websocket/WebsocketSpec.scala +++ b/src/test/scala/com/github/andyglow/websocket/WebsocketSpec.scala @@ -1,14 +1,14 @@ package com.github.andyglow.websocket -import io.netty.buffer.{ByteBuf, ByteBufHolder, Unpooled} +import io.netty.buffer.{ByteBufHolder, Unpooled} import io.netty.channel.{Channel, ChannelFuture} import io.netty.handler.codec.http.websocketx.{BinaryWebSocketFrame, CloseWebSocketFrame, PingWebSocketFrame, TextWebSocketFrame} -import org.scalatest.funsuite.AnyFunSuite import org.mockito.Mockito._ import org.scalactic.source.Position +import org.scalatest.funsuite.AnyFunSuite import java.nio.{ByteBuffer, CharBuffer} -import java.util.concurrent.{ExecutorService, Executors} +import java.util.concurrent.Executors import scala.concurrent.ExecutionContext class WebsocketSpec extends AnyFunSuite { @@ -41,7 +41,7 @@ class WebsocketSpec extends AnyFunSuite { ) } - private def examine(wsRun: Websocket => Unit, verifyRun: Channel => Unit)(implicit pos: Position): Unit = { + private def examine(wsRun: Websocket => Any, verifyRun: Channel => Any)(implicit pos: Position): Unit = { val ch = mock(classOf[Channel]) val closeFut = mock(classOf[ChannelFuture]) when(ch.closeFuture()).thenReturn(closeFut) @@ -50,6 +50,7 @@ class WebsocketSpec extends AnyFunSuite { wsRun(ws) try { verifyRun(ch) + () } catch { case th: Throwable => fail(th) } diff --git a/src/test/scala/com/github/andyglow/websocket/testserver/TestServer.scala b/src/test/scala/com/github/andyglow/websocket/testserver/TestServer.scala index 990defc..26369ca 100644 --- a/src/test/scala/com/github/andyglow/websocket/testserver/TestServer.scala +++ b/src/test/scala/com/github/andyglow/websocket/testserver/TestServer.scala @@ -1,11 +1,10 @@ package com.github.andyglow.websocket.testserver import akka.actor.ActorSystem -import akka.http.scaladsl.Http -import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage, UpgradeToWebSocket, WebSocketUpgrade} -import akka.http.scaladsl.model.{AttributeKeys, HttpRequest, HttpResponse} +import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage} +import akka.http.scaladsl.model.{HttpRequest, HttpResponse} import akka.stream._ -import akka.stream.scaladsl.{Flow, Sink} +import akka.stream.scaladsl.Flow object TestServer extends CrossScalaVersionAkkaApi { @@ -25,7 +24,7 @@ object TestServer extends CrossScalaVersionAkkaApi { val requestHandler: HttpRequest => HttpResponse = { (req: HttpRequest) => - req.attribute(AttributeKeys.webSocketUpgrade) match { + websocketAttribution(req) match { case Some(upgrade) => upgrade handleMessages service case None => HttpResponse(400, entity = "Not a valid websocket request!") }