Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge 0.23.13 -> 0.24 #126

Merged
merged 16 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,36 +55,39 @@ private[jetty] object JettyLifeCycle {
* internally, e.g. due to some internal error occurring.
*/
private[this] def stopLifeCycle[F[_]](lifeCycle: LifeCycle)(implicit F: Async[F]): F[Unit] =
F.async_[Unit] { cb =>
lifeCycle.addEventListener(
new LifeCycle.Listener {
override def lifeCycleStopped(a: LifeCycle): Unit =
cb(Right(()))
override def lifeCycleFailure(a: LifeCycle, error: Throwable): Unit =
cb(Left(error))
}
)
F.async[Unit] { cb =>
F.delay {
val listener =
new LifeCycle.Listener {
override def lifeCycleStopped(a: LifeCycle): Unit =
cb(Right(()))
override def lifeCycleFailure(a: LifeCycle, error: Throwable): Unit =
cb(Left(error))
}
lifeCycle.addEventListener(listener)

// In the general case, it is not sufficient to merely call stop(). For
// example, the concrete implementation of stop() for the canonical
// Jetty Server instance will shortcut to a `return` call taking no
// action if the server is "stopping". This method _can't_ return until
// we are _actually stopped_, so we have to check three different states
// here.
// In the general case, it is not sufficient to merely call stop(). For
// example, the concrete implementation of stop() for the canonical
// Jetty Server instance will shortcut to a `return` call taking no
// action if the server is "stopping". This method _can't_ return until
// we are _actually stopped_, so we have to check three different states
// here.

if (lifeCycle.isStopped) {
// If the first case, we are already stopped, so our listener won't be
// called and we just return.
cb(Right(()))
} else if (lifeCycle.isStopping()) {
// If it is stopping, we need to wait for our listener to get invoked.
()
} else {
// If it is neither stopped nor stopping, we need to request a stop
// and then wait for the event. It is imperative that we add the
// listener beforehand here. Otherwise we have some very annoying race
// conditions.
lifeCycle.stop()
if (lifeCycle.isStopped) {
// If the first case, we are already stopped, so our listener won't be
// called and we just return.
cb(Right(()))
} else if (lifeCycle.isStopping()) {
// If it is stopping, we need to wait for our listener to get invoked.
()
} else {
// If it is neither stopped nor stopping, we need to request a stop
// and then wait for the event. It is imperative that we add the
// listener beforehand here. Otherwise we have some very annoying race
// conditions.
lifeCycle.stop()
}
Some(F.delay(lifeCycle.removeEventListener(listener)).void)
}
}

Expand All @@ -95,51 +98,53 @@ private[jetty] object JettyLifeCycle {
* (or starting) this will fail.
*/
private[this] def startLifeCycle[F[_]](lifeCycle: LifeCycle)(implicit F: Async[F]): F[Unit] =
F.async_[Unit] { cb =>
lifeCycle.addEventListener(
new LifeCycle.Listener {
F.async[Unit] { cb =>
F.delay {
val listener = new LifeCycle.Listener {
override def lifeCycleStarted(a: LifeCycle): Unit =
cb(Right(()))
override def lifeCycleFailure(a: LifeCycle, error: Throwable): Unit =
cb(Left(error))
}
)
lifeCycle.addEventListener(listener)

// Sanity check to ensure the LifeCycle component is not already
// started. A couple of notes here.
//
// - There is _always_ going to be a small chance of a race condition
// here in the final branch where we invoke `lifeCycle.start()` _if_
// something else has a reference to the `LifeCycle`
// value. Thankfully, unlike the stopLifeCycle function, this is
// completely in the control of the caller. As long as the caller
// doesn't leak the reference (or call .start() themselves) nothing
// internally to Jetty should ever invoke .start().
// - Jetty components allow for reuse in many cases, unless the
// .destroy() method is invoked (and the particular type implements
// `Destroyable`, it's not part of `LifeCycle`). Jetty uses this for
// "soft" resets of the `LifeCycle` component. Thus it is possible
// that this `LifeCycle` component has been started before, though I
// don't recommend this and we don't (at this time) do that in the
// http4s codebase.
if (lifeCycle.isStarted) {
cb(
Left(
new IllegalStateException(
"Attempting to start Jetty LifeCycle component, but it is already started."
// Sanity check to ensure the LifeCycle component is not already
// started. A couple of notes here.
//
// - There is _always_ going to be a small chance of a race condition
// here in the final branch where we invoke `lifeCycle.start()` _if_
// something else has a reference to the `LifeCycle`
// value. Thankfully, unlike the stopLifeCycle function, this is
// completely in the control of the caller. As long as the caller
// doesn't leak the reference (or call .start() themselves) nothing
// internally to Jetty should ever invoke .start().
// - Jetty components allow for reuse in many cases, unless the
// .destroy() method is invoked (and the particular type implements
// `Destroyable`, it's not part of `LifeCycle`). Jetty uses this for
// "soft" resets of the `LifeCycle` component. Thus it is possible
// that this `LifeCycle` component has been started before, though I
// don't recommend this and we don't (at this time) do that in the
// http4s codebase.
if (lifeCycle.isStarted) {
cb(
Left(
new IllegalStateException(
"Attempting to start Jetty LifeCycle component, but it is already started."
)
)
)
)
} else if (lifeCycle.isStarting) {
cb(
Left(
new IllegalStateException(
"Attempting to start Jetty LifeCycle component, but it is already starting."
} else if (lifeCycle.isStarting) {
cb(
Left(
new IllegalStateException(
"Attempting to start Jetty LifeCycle component, but it is already starting."
)
)
)
)
} else {
lifeCycle.start()
} else {
lifeCycle.start()
}
Some(F.delay(lifeCycle.removeEventListener(listener)).void)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ import cats.effect.Temporal
import munit.CatsEffectSuite
import org.eclipse.jetty.client.HttpClient
import org.eclipse.jetty.client.api.Request
import org.eclipse.jetty.client.api.Response
import org.eclipse.jetty.client.api.Result
import org.eclipse.jetty.client.util.BufferingResponseListener
import org.eclipse.jetty.client.util.StringRequestContent
import org.http4s.dsl.io._
import org.http4s.server.Server
Expand Down Expand Up @@ -62,7 +59,7 @@ class JettyServerSuite extends CatsEffectSuite {
Ok(req.body)

case GET -> Root / "never" =>
IO.never
IO.async(_ => IO.pure(Some(IO.unit)))

case GET -> Root / "slow" =>
Temporal[IO].sleep(50.millis) *> Ok("slow")
Expand All @@ -74,16 +71,7 @@ class JettyServerSuite extends CatsEffectSuite {
private val jettyServer = ResourceFixture[Server](serverR)

private def fetchBody(req: Request): IO[String] =
IO.async_ { cb =>
val listener = new BufferingResponseListener() {
override def onFailure(resp: Response, t: Throwable) =
cb(Left(t))

override def onComplete(result: Result) =
cb(Right(getContentAsString))
}
req.send(listener)
}
IO.interruptible(req.send().getContentAsString())

private def get(server: Server, path: String): IO[String] = {
val req = client().newRequest(s"http://127.0.0.1:${server.address.getPort}$path")
Expand Down
Loading