Skip to content

Commit

Permalink
Add NettyFuture spec (#247)
Browse files Browse the repository at this point in the history
- improve NettyFuture in when it comes to resolved Futures
- add NettyFuture Spec
  • Loading branch information
andyglow authored Oct 18, 2023
1 parent 83ebf74 commit 27d63d1
Show file tree
Hide file tree
Showing 10 changed files with 121 additions and 33 deletions.
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,6 @@ lazy val root = (project in file("."))
scalaStm,
slf4jApi,
slf4jSimple,
akkaHttp(scalaV.value).cross(CrossVersion.for3Use2_13),
akkaStream(scalaV.value).cross(CrossVersion.for3Use2_13)))
akkaHttp(scalaVersion.value).cross(CrossVersion.binary),
akkaStream(scalaVersion.value).cross(CrossVersion.binary)))

18 changes: 9 additions & 9 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import sbt._
import sbt.*

object Dependencies {
lazy val Examples = config("example") extend Compile
Expand All @@ -14,10 +14,10 @@ object Dependencies {

object akkaHttp {

def apply(scalaVer: ScalaVer) = {
val v = scalaVer match {
case ScalaVer._211 => "10.1.9"
case _ => "10.2.6"
def apply(scalaVer: String): ModuleID = {
val v = ScalaVer.fromString(scalaVer) match {
case Some(ScalaVer._211) => "10.1.9"
case _ => "10.5.3"

}

Expand All @@ -26,10 +26,10 @@ object Dependencies {
}

object akkaStream {
def apply(scalaVer: ScalaVer) = {
val v = scalaVer match {
case ScalaVer._211 => "2.5.32"
case _ => "2.6.16"
def apply(scalaVer: String): ModuleID = {
val v = ScalaVer.fromString(scalaVer) match {
case Some(ScalaVer._211) => "2.5.32"
case _ => "2.8.5"
}

"com.typesafe.akka" %% "akka-stream" % v % Test
Expand Down
19 changes: 10 additions & 9 deletions project/ScalaVer.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import sbt.Keys.{crossScalaVersions, scalaVersion}
import sbt.settingKey
import sbt.librarymanagement.CrossVersion
import sbt.{Def, settingKey}

sealed abstract class ScalaVer(val full: String)

Expand All @@ -15,21 +16,21 @@ object ScalaVer {

val values: Seq[ScalaVer] = Set(_211, _212, _213, _300).toSeq

val default: ScalaVer = _300
val default: ScalaVer = _211

def fromEnv: Option[ScalaVer] = sys.env.get("SCALA_VER") flatMap fromString

def fromString(full: String): Option[ScalaVer] = full match {
case x if x startsWith "2.11" => Some(_211)
case x if x startsWith "2.12" => Some(_212)
case x if x startsWith "2.13" => Some(_213)
case x if x startsWith "3.0" => Some(_300)
case _ => None
def fromString(full: String): Option[ScalaVer] = CrossVersion.partialVersion(full) match {
case Some((2, 11)) => Some(_211)
case Some((2, 12)) => Some(_212)
case Some((2, 13)) => Some(_213)
case Some((3, _)) => Some(_300)
case _ => None
}

lazy val scalaV = settingKey[ScalaVer]("Current Scala Version")

def settings = Seq(
def settings: Seq[Def.Setting[? >: String & Seq[String] & ScalaVer <: Object]] = Seq(
scalaVersion := (ScalaVer.fromEnv getOrElse ScalaVer.default).full,
crossScalaVersions := ScalaVer.values.map(_.full),
scalaV := ScalaVer.fromString(scalaVersion.value) getOrElse ScalaVer.default)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,18 @@ import scala.concurrent.Promise
object NettyFuture {

def apply[T](f: NFuture[T]): Future[T] = {
val promise = Promise[T]()
val futureListener = new GenericFutureListener[NFuture[T]]() {
override def operationComplete(future: NFuture[T]): Unit = {
if (future.isSuccess) promise.success(future.getNow) else promise.failure(future.cause())
()
if (f.isDone) {
if (f.isSuccess) Future.successful(f.getNow) else Future.failed(f.cause())
} else {
val promise = Promise[T]()
val futureListener = new GenericFutureListener[NFuture[T]]() {
override def operationComplete(future: NFuture[T]): Unit = {
if (future.isSuccess) promise.success(future.getNow) else promise.failure(future.cause())
()
}
}
f.addListener(futureListener)
promise.future
}
f.addListener(futureListener)
promise.future
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import akka.http.scaladsl.model.ws.UpgradeToWebSocket
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.Materializer

import java.util.concurrent.Callable
import scala.concurrent.Future

trait CrossScalaVersionAkkaApi {
trait PlatformDependent {

@inline def launchServer(
interface: String,
Expand All @@ -20,3 +21,10 @@ trait CrossScalaVersionAkkaApi {

@inline def websocketAttribution(x: HttpRequest): Option[UpgradeToWebSocket] = x.header[UpgradeToWebSocket]
}

object PlatformDependent {

@inline def callable[T](fn: () => T): Callable[T] = new Callable[T] {
def call(): T = fn()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import akka.http.scaladsl.model.ws.WebSocketUpgrade
import akka.http.scaladsl.model.{AttributeKeys, HttpRequest, HttpResponse}
import akka.stream.Materializer

import java.util.concurrent.Callable
import scala.concurrent.Future

trait CrossScalaVersionAkkaApi {
trait PlatformDependent {

@inline def launchServer(
interface: String,
Expand All @@ -20,3 +21,8 @@ trait CrossScalaVersionAkkaApi {

@inline def websocketAttribution(x: HttpRequest): Option[WebSocketUpgrade] = x.attribute(AttributeKeys.webSocketUpgrade)
}

object PlatformDependent {

@inline def callable[T](fn: () => T): Callable[T] = () => fn()
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import akka.http.scaladsl.model.ws.WebSocketUpgrade
import akka.http.scaladsl.model.{AttributeKeys, HttpRequest, HttpResponse}
import akka.stream.Materializer

import java.util.concurrent.Callable
import scala.concurrent.Future

trait CrossScalaVersionAkkaApi {
trait PlatformDependent {

@inline def launchServer(
interface: String,
Expand All @@ -20,3 +21,8 @@ trait CrossScalaVersionAkkaApi {

@inline def websocketAttribution(x: HttpRequest): Option[WebSocketUpgrade] = x.attribute(AttributeKeys.webSocketUpgrade)
}

object PlatformDependent {

@inline def callable[T](fn: () => T): Callable[T] = () => fn()
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import akka.http.scaladsl.model.ws.WebSocketUpgrade
import akka.http.scaladsl.model.{AttributeKeys, HttpRequest, HttpResponse}
import akka.stream.Materializer

import java.util.concurrent.Callable
import scala.concurrent.Future

trait CrossScalaVersionAkkaApi {
trait PlatformDependent {

def launchServer(
@inline def launchServer(
interface: String,
port: Int,
handler: HttpRequest => HttpResponse)(implicit system: ActorSystem, mat: Materializer): Future[Http.ServerBinding] = {
Expand All @@ -20,3 +21,8 @@ trait CrossScalaVersionAkkaApi {

@inline def websocketAttribution(x: HttpRequest): Option[WebSocketUpgrade] = x.attribute(AttributeKeys.webSocketUpgrade)
}

object PlatformDependent {

@inline def callable[T](fn: () => T): Callable[T] = () => fn()
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import akka.stream._
import akka.stream.scaladsl.Flow


object TestServer extends CrossScalaVersionAkkaApi {
object TestServer extends PlatformDependent {

def main(args: Array[String]): Unit = {
implicit val system: ActorSystem = ActorSystem()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.github.andyglow.websocket.util

import io.netty.channel.nio.NioEventLoopGroup
import io.netty.util.concurrent.EventExecutorGroup
import org.scalatest.BeforeAndAfterAll
import org.scalatest.OptionValues._
import org.scalatest.TryValues._
import org.scalatest.matchers.should.Matchers._
import org.scalatest.wordspec.AnyWordSpec

import scala.concurrent.Await
import scala.concurrent.duration._
import com.github.andyglow.websocket.testserver.PlatformDependent._

class NettyFutureSpec extends AnyWordSpec with BeforeAndAfterAll {

private var executors: EventExecutorGroup = _

override def beforeAll(): Unit = {
executors = new NioEventLoopGroup()
}

override def afterAll(): Unit = {
executors.shutdownGracefully().syncUninterruptibly()
()
}

"NettyFuture" should {

"handled resolved successful futures" in {
val fut = NettyFuture(executors.submit(callable(() => "foo")).await())
fut.value.isDefined shouldBe true
fut.value.value.isSuccess shouldBe true
fut.value.value.get shouldBe "foo"
}

"handled un-resolved successful futures" in {
val fut = NettyFuture(executors.submit(callable(() => "foo")))
val v = Await.result(fut, 1.second)
v shouldBe "foo"
}

"handled resolved failed futures" in {
val fut = NettyFuture(executors.submit(callable[String](() => throw new IllegalStateException("err"))).await())
fut.value.isDefined shouldBe true
fut.value.value.isSuccess shouldBe false
fut.value.value.failure.exception shouldBe a [IllegalStateException]
fut.value.value.failure.exception.getMessage shouldBe "err"
}

"handled un-resolved failed futures" in {
val fut = NettyFuture(executors.submit(callable[String](() => throw new IllegalStateException("err"))))
val v = the [IllegalStateException] thrownBy { Await.result(fut, 1.second) }
v.getMessage shouldBe "err"
}
}
}

0 comments on commit 27d63d1

Please sign in to comment.