Skip to content

Commit

Permalink
fix the build
Browse files Browse the repository at this point in the history
  • Loading branch information
Andriy Onyshchuk committed Oct 18, 2023
1 parent 1c7e961 commit 83ebf74
Show file tree
Hide file tree
Showing 11 changed files with 56 additions and 31 deletions.
2 changes: 1 addition & 1 deletion project/ScalaVer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ object ScalaVer {

val values: Seq[ScalaVer] = Set(_211, _212, _213, _300).toSeq

val default: ScalaVer = _212
val default: ScalaVer = _300

def fromEnv: Option[ScalaVer] = sys.env.get("SCALA_VER") flatMap fromString

Expand Down
14 changes: 9 additions & 5 deletions src/main/scala/com/github/andyglow/websocket/Websocket.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,19 @@ trait Websocket {
*/
private[websocket] class WebsocketImpl(ch: Channel) extends Websocket {

override def ![T: MessageAdapter](msg: T): Unit =
ch writeAndFlush { MessageAdapter[T] format msg }
override def ![T: MessageAdapter](msg: T): Unit = {
ch.writeAndFlush(MessageAdapter[T].format(msg))
()
}

override def close()(implicit ec: ExecutionContext): Future[Unit] = {
ch writeAndFlush new CloseWebSocketFrame
ch.writeAndFlush(new CloseWebSocketFrame)
val f = NettyFuture(ch.closeFuture())
f map { _ => () }
f.map { _ => () }
}

override def ping()(implicit ec: ExecutionContext): Unit =
override def ping()(implicit ec: ExecutionContext): Unit = {
ch.writeAndFlush(new PingWebSocketFrame())
()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ class WebsocketClient[T: MessageAdapter] private (
new websocketx.WebSocketFrameAggregator(Int.MaxValue),
nettyHandler
)

()
}
}

Expand All @@ -114,7 +116,10 @@ class WebsocketClient[T: MessageAdapter] private (

/** Runs though shutdown process synchronously
*/
def shutdownSync(): Unit = shutdown().syncUninterruptibly()
def shutdownSync(): Unit = {
shutdown().syncUninterruptibly()
()
}

/** Executes shutdown returning Future that is going to be resolved once shutdown process is completed or if error has
* happened.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package com.github.andyglow.websocket

import io.netty.buffer.ByteBufHolder
import io.netty.channel.ChannelFuture
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelPromise
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http.FullHttpResponse
import io.netty.handler.codec.http.websocketx._
import io.netty.util.CharsetUtil
import scala.annotation.nowarn
import scala.concurrent.stm._
import scala.util.control.NonFatal

Expand Down Expand Up @@ -61,15 +59,18 @@ private[websocket] class WebsocketNettyHandler[T](

override def channelActive(ctx: ChannelHandlerContext): Unit = {
handshaker.handshake(ctx.channel())
()
}

override def channelRead0(ctx: ChannelHandlerContext, msg: ByteBufHolder): Unit = {
val ch = ctx.channel()
def controls: PartialFunction[ByteBufHolder, ChannelFuture] = {
def controls: PartialFunction[ByteBufHolder, Unit] = {
case _: CloseWebSocketFrame =>
try { handler.onClose(()) }
catch { case NonFatal(ex) => handler.reportFailure(ex) }
ch.close()
()

case msg: FullHttpResponse =>
if (!handshaker.isHandshakeComplete) {
try {
Expand All @@ -81,6 +82,7 @@ private[websocket] class WebsocketNettyHandler[T](
case ex: WebSocketHandshakeException =>
atomic { implicit txn => handshakeFuture().setFailure(ex) }
}
()
} else {
throw new IllegalStateException(
s"Unexpected FullHttpResponse (status=${msg.status}, content=${msg.content().toString(CharsetUtil.UTF_8)})"
Expand All @@ -92,13 +94,14 @@ private[websocket] class WebsocketNettyHandler[T](
}

@Override
@nowarn
override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
ctx.close()
atomic { implicit txn =>
val f = handshakeFuture()
if (!f.isDone) f.setFailure(cause)
else handler.reportFailure(cause)
}

()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ import scala.concurrent.Promise
object NettyFuture {

def apply[T](f: NFuture[T]): Future[T] = {
val p = Promise[T]()
val l = new GenericFutureListener[NFuture[T]]() {
val promise = Promise[T]()
val futureListener = new GenericFutureListener[NFuture[T]]() {
override def operationComplete(future: NFuture[T]): Unit = {
if (future.isSuccess) p.success(future.getNow) else p.failure(future.cause())
if (future.isSuccess) promise.success(future.getNow) else promise.failure(future.cause())
()
}
}
f addListener l
p.future
f.addListener(futureListener)
promise.future
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,21 @@ package com.github.andyglow.websocket.testserver

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.UpgradeToWebSocket
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.Materializer

import scala.concurrent.Future

trait CrossScalaVersionAkkaApi {

def launchServer(
@inline def launchServer(
interface: String,
port: Int,
handler: HttpRequest => HttpResponse)(implicit system: ActorSystem, mat: Materializer): Future[Http.ServerBinding] = {

Http().bindAndHandleSync(handler, interface, port)
}

@inline def websocketAttribution(x: HttpRequest): Option[UpgradeToWebSocket] = x.header[UpgradeToWebSocket]
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,21 @@ package com.github.andyglow.websocket.testserver

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.model.ws.WebSocketUpgrade
import akka.http.scaladsl.model.{AttributeKeys, HttpRequest, HttpResponse}
import akka.stream.Materializer

import scala.concurrent.Future

trait CrossScalaVersionAkkaApi {

def launchServer(
@inline def launchServer(
interface: String,
port: Int,
handler: HttpRequest => HttpResponse)(implicit system: ActorSystem, mat: Materializer): Future[Http.ServerBinding] = {

Http().newServerAt(interface, port).bindSync(handler)
}

@inline def websocketAttribution(x: HttpRequest): Option[WebSocketUpgrade] = x.attribute(AttributeKeys.webSocketUpgrade)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package com.github.andyglow.websocket.testserver

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.model.ws.WebSocketUpgrade
import akka.http.scaladsl.model.{AttributeKeys, HttpRequest, HttpResponse}
import akka.stream.Materializer

import scala.concurrent.Future
Expand All @@ -16,4 +17,6 @@ trait CrossScalaVersionAkkaApi {

Http().newServerAt(interface, port).bindSync(handler)
}

@inline def websocketAttribution(x: HttpRequest): Option[WebSocketUpgrade] = x.attribute(AttributeKeys.webSocketUpgrade)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,21 @@ package com.github.andyglow.websocket.testserver

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.model.ws.WebSocketUpgrade
import akka.http.scaladsl.model.{AttributeKeys, HttpRequest, HttpResponse}
import akka.stream.Materializer

import scala.concurrent.Future

trait CrossScalaVersionAkkaApi {

def launchServer(
@inline def launchServer(
interface: String,
port: Int,
handler: HttpRequest => HttpResponse)(implicit system: ActorSystem, mat: Materializer): Future[Http.ServerBinding] = {

Http().newServerAt(interface, port).bindSync(handler)
}

@inline def websocketAttribution(x: HttpRequest): Option[WebSocketUpgrade] = x.attribute(AttributeKeys.webSocketUpgrade)
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package com.github.andyglow.websocket

import io.netty.buffer.{ByteBuf, ByteBufHolder, Unpooled}
import io.netty.buffer.{ByteBufHolder, Unpooled}
import io.netty.channel.{Channel, ChannelFuture}
import io.netty.handler.codec.http.websocketx.{BinaryWebSocketFrame, CloseWebSocketFrame, PingWebSocketFrame, TextWebSocketFrame}
import org.scalatest.funsuite.AnyFunSuite
import org.mockito.Mockito._
import org.scalactic.source.Position
import org.scalatest.funsuite.AnyFunSuite

import java.nio.{ByteBuffer, CharBuffer}
import java.util.concurrent.{ExecutorService, Executors}
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

class WebsocketSpec extends AnyFunSuite {
Expand Down Expand Up @@ -41,7 +41,7 @@ class WebsocketSpec extends AnyFunSuite {
)
}

private def examine(wsRun: Websocket => Unit, verifyRun: Channel => Unit)(implicit pos: Position): Unit = {
private def examine(wsRun: Websocket => Any, verifyRun: Channel => Any)(implicit pos: Position): Unit = {
val ch = mock(classOf[Channel])
val closeFut = mock(classOf[ChannelFuture])
when(ch.closeFuture()).thenReturn(closeFut)
Expand All @@ -50,6 +50,7 @@ class WebsocketSpec extends AnyFunSuite {
wsRun(ws)
try {
verifyRun(ch)
()
} catch {
case th: Throwable => fail(th)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package com.github.andyglow.websocket.testserver

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage, UpgradeToWebSocket, WebSocketUpgrade}
import akka.http.scaladsl.model.{AttributeKeys, HttpRequest, HttpResponse}
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream._
import akka.stream.scaladsl.{Flow, Sink}
import akka.stream.scaladsl.Flow


object TestServer extends CrossScalaVersionAkkaApi {
Expand All @@ -25,7 +24,7 @@ object TestServer extends CrossScalaVersionAkkaApi {

val requestHandler: HttpRequest => HttpResponse = {
(req: HttpRequest) =>
req.attribute(AttributeKeys.webSocketUpgrade) match {
websocketAttribution(req) match {
case Some(upgrade) => upgrade handleMessages service
case None => HttpResponse(400, entity = "Not a valid websocket request!")
}
Expand Down

0 comments on commit 83ebf74

Please sign in to comment.