Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set own ExecutionContext in Twitter future #860

Closed
politrons opened this issue Jun 24, 2020 · 15 comments
Closed

Set own ExecutionContext in Twitter future #860

politrons opened this issue Jun 24, 2020 · 15 comments

Comments

@politrons
Copy link

Hi I´m reading the documentation regarding the Futures and Context where to execute them https://twitter.github.io/finagle/guide/Contexts.html

But I cannot find a way to pass to finagle to use my own ExecutionContext as you can do with Scala future.
There's any way to do that?.

Regards.

@hamdiallam
Copy link
Contributor

hamdiallam commented Jun 24, 2020

Hey, I believe what you're looking for is a FuturePool, https://twitter.github.io/util/docs/com/twitter/util/FuturePool$.html

You can create a FuturePool backed by a supplied ExecutorService. The work in the apply will be scheduled by the Executor along with the continuations on the returned future.

@politrons
Copy link
Author

How is this FuturePool used once I created?. Finagle Service does not apply a function param for this FuturePool. shall I defined as implicit like Scala Future allow and that's it?. Sorry I´m little bit lost here.

@hamdiallam
Copy link
Contributor

No problem, I understand the confusion.

Some clarity around what you are trying to accomplish would be helpful. In terms of Finagle Service, is your intention to use your own Scheduler for finagle threads? Or are you trying to simply trying to perform some computation asynchronously using a provided Executor?

By default, all of Finagle's futures are scheduled using the FinagleScheduler. This can be configured or entirely swapped if this is what you're looking to configure (i do not recommend). Scheduler.setUnsafe

If you're simply looking to do some computation into another thread pool that's backed by your own Executor, this is where FuturePools come in. For example

val pool = FuturePool(Executors.newFixedThreadPool(2))
val interruptibleThreadPool = FuturePool.interruptible(Executors.defaultThreadPool())

val myResult: Future[SomeTypeGoesHere] = pool {
 someReallyExpensiveComputation() // will be scheduled on the executor backing `pool`
}

FuturePool already comes with a default executor if you're just looked to just offload some computations

val fooBarResult: Future[fooBarReturnType] = FuturePool.unboundedPool {
  fooBar() // scheduled using the default executor
}

However, the global Scheduler is what is responsible for scheduling the submitted Runnables to corresponding threads (FinagleScheduler or one that you set).
The com.twitter.finagle.exp.scheduler flags provides some options for schedulers provided by Finagle

@politrons
Copy link
Author

politrons commented Jun 25, 2020

How would you use a FuturePool with a Finagle Service created?.

What I want is to make the request of the Finagle Service running in a particular ExecutionContext

FuturePool(Executors.newFixedThreadPool(100)).Future().flatMap(_ => service(request) this is terrible 😅

I made several experiments with and without Twitter Future

with Twitter Future

val promise = Promise[Unit]()

          FuturePool(Executors.newFixedThreadPool(100)){
            TwitterFuture {
              Thread.sleep(500)
              promise.success(Unit)
            }
          }

I can only reach 41 TPS

with Scala which use a Execution context of 100

val promise = Promise[Unit]()

          ScalaFuture{
              Thread.sleep(500)
              promise.success(Unit)
            }

I can have 61 TPS.

As you can see the difference of performance is huge, and is because the TwitterFuture which emulate a Finagle Service(request) which is a TwitterFuture seems like it's working in default ExecutionContext same Threads as core of machine. Which in our use case is not a good default config since we do blocking tasks

Any suggestion how to force Service(request) to run in a specific ExecutionContext ?

@hamdiallam
Copy link
Contributor

I see. Have you looked using the Offloading functionality in Finagle, https://twitter.github.io/finagle/guide/ThreadingModel.html#offloading?

The threads used by Finagle are IO threads. Blocking these threads will impact performance. Using the offload filter to run your application code in a dedicated thread pool would mitigate this.

For example, 14 worker (application) threads and 14 IO threads.
-com.twitter.finagle.offload.numWorkers=14 -com.twitter.finagle.netty4.numWorkers=10

I believe this would give you the behavior you're looking for if I'm not mistaken

@politrons
Copy link
Author

I´m afriad I cannot use offloading in our mandatory version 18.11.0 😒

@hamdiallam
Copy link
Contributor

hamdiallam commented Jun 25, 2020

Ah, I didn't realize you also posted #861

The offload filter can still be applied manually to your client/server stacks. Did you get a chance to see my latest comment there?

@politrons
Copy link
Author

politrons commented Jun 25, 2020 via email

@hamdiallam
Copy link
Contributor

hamdiallam commented Jun 25, 2020

yep!

It would look something like this

val offloadWorkerPool = FuturePool(Executors.newCachedThreadPool(14) // application worker pool
val offloadClientFilter = new OffloadFilter.Client(offloadWorkerPool)
val rawHttpClient = Http.client.newService("twitter.com:80")
val client = offloadClientFilter.andThen(rawHttpClient)

client(Request("/")).map { resp =>
  // this code will not be running on an IO Finagle thread
  // and instead one of the offload threads
  handleResp()
}

Be sure to share the offloadWorkerPool so that the same worker pool is used across your various clients. A similar process can be used for the offloading your server code.

The number of IO threads by default is the number of cores on the machine. If you'd like to change that, this flag should still be available to you
-com.twitter.finagle.netty4.numWorkers=<num io threads>

@politrons
Copy link
Author

Yep I just create my own Filter and I add it the Service

private val pool: ExecutorServiceFuturePool = FuturePool(executorService)

def withCustomExecutionContext: () => Filter[Request, Response, Request, Response] = () =>
  (request: Request, service: Service[Request, Response]) => {
    val response = service(request)
    val shifted = Promise.interrupts[Response](response)
    response.respond { tryResponse =>
      pool(shifted.update(tryResponse))
    }
    shifted
  }

filters.withCustomExecutionContext())
    .andThen(getService(requestInfo))

Unfortunately no improvement so far. I will continue investigating.
Thanks a lot for all the support

@hamdiallam
Copy link
Contributor

hm interesting. I would also recommend profiling your service/code to find blocking threads, making sure they aren't finagle threads

@politrons
Copy link
Author

politrons commented Jun 26, 2020

I´ve been playing with visualVM and I can see the 100 threads of the FuturePool that I created, but they are park, and I can only see the 24 threads of finagle/netty4 running.

Checking the API of future

respond(k: (Try[A]) ⇒ Unit): Future[A]
When the computation completes, invoke the given callback function.

It that's means that it will be executed in the Finagle main thread until receive response?, that would not fix then our problem

  def withCustomExecutionContext: () => Filter[Request, Response, Request, Response] = () =>
    (request: Request, service: Service[Request, Response]) => {
      pool(() => {
        "hello world"
      })
      val response = service(request)
      val shifted = Promise.interrupts[Response](response)
      response.respond { tryResponse =>  //**Too late the whole request/response happens in the Main Finagle thread**
        pool(shifted.update(tryResponse))
      }
      shifted
    }

I dont think the interrupt and shifted is working as we expect. 😔 I will try to do a very simple example to see if that filter works out of my whole project.

@politrons
Copy link
Author

politrons commented Jun 27, 2020

Hi again I made a very simple example, and watching threads with VisualVM I cannot being able to make the Future to being interrupted and passed the execution into one Thread of the FuturePool . You can always see in the print the Thread name main

object TestFuturePool extends App {

  private val pool: ExecutorServiceFuturePool = FuturePool(Executors.newFixedThreadPool(100))

  while (true) {

    val future: Future[String] = Future {
      Thread.sleep(500)
      println(Thread.currentThread().getName)
      "Hello world!"
    }.interruptible()

    val shifted: Promise[String] = Promise.interrupts[String](future)

    future.respond(tryResponse => {
      pool(shifted.update(tryResponse))
    })

    Thread.sleep(200)

  }

}

@politrons
Copy link
Author

politrons commented Jun 27, 2020

I cannot make work this code to make the Finagle service run in FuturePool threads, always is running in Netty-finagle-X threads

  private val executorService = Executors.newFixedThreadPool(100)
  private val pool: ExecutorServiceFuturePool = FuturePool(executorService)

  private def shiftThread[T](response: Future[T]): Future[T] = {
    val shifted = Promise.interrupts[T](response)
    response.respond { t =>
      log.info(null, s"FINAGLE RESPONSE $t IN THREAD ${Thread.currentThread().getName}")
      pool(shifted.update(t))
    }
  }

But the only thing that I achieve is to move the response from the request in the FuturePool what I would like to do is make the request in the FuturePool already

Here as you can see the request/response is done in the finagle/netty thread pool

FINAGLE RESPONSE Return(Response("HTTP/1.1 Status(200)")) IN THREAD finagle/netty4-1-10

@hamdiallam
Copy link
Contributor

Closing in favor of #862

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

No branches or pull requests

2 participants