Detecting Consumer Failures #20

Open
jedossa opened this issue Jan 4, 2018 · 2 comments
Comments


jedossa commented Jan 4, 2018

I have a case where the processing time of messages varies unpredictably. Following the Kafka consumer documentation, I modified KafkaConsumerObservable.runLoop as follows:

def runLoop(consumer: KafkaConsumer[K, V]): Task[Unit] = {
  val ackTask: Task[Ack] = Task.unsafeCreate { (context, cb) =>
    implicit val s = context.scheduler
    s.executeAsync { () =>
      context.frameRef.reset()
      val ackFuture =
        try consumer.synchronized {
          if (context.connection.isCanceled) Stop
          else {
            val next = blocking(consumer.poll(pollTimeoutMillis))
            // Pause all assigned partitions while the batch is pushed downstream,
            // so further polls keep the consumer alive without fetching new records
            blocking(consumer.pause(consumer.assignment()))
            Observer.feed(out, next.asScala)(out.scheduler)
          }
        } catch {
          case NonFatal(ex) =>
            Future.failed(ex)
        }

      ackFuture.syncOnComplete {
        case Success(ack) =>
          var streamErrors = true
          try consumer.synchronized {
            if (context.connection.isCanceled) {
              streamErrors = false
              cb.asyncOnSuccess(Stop)
            } else {
              // Resume the paused partitions and commit offsets only after
              // downstream has acknowledged the batch
              consumer.resume(consumer.assignment())
              consumerCommit(consumer)
              streamErrors = false
              cb.asyncOnSuccess(ack)
            }
          } catch {
            case NonFatal(ex) =>
              if (streamErrors) cb.asyncOnError(ex)
              else s.reportFailure(ex)
          }

        case Failure(ex) =>
          cb.asyncOnError(ex)
      }
    }
  }

  ackTask.flatMap {
    case Stop     => Task.unit
    case Continue => runLoop(consumer)
  }
}

Is it possible to handle this use case without modifying the exposed observable?
Is it worth creating a PR for this case?

Thanks!

leandrob13 (Collaborator) commented

@jedossa Can you elaborate a bit more on the justification of these changes?


jedossa commented Feb 8, 2018

I just want to explore an option for use cases where message processing time varies unpredictably.
In these cases, according to the Kafka docs, it is recommended to move message processing to another thread, keep calling poll, disable automatic commits, and pause the partition.
But I am not 100% sure that these changes are the solution for this use case.
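For reference, this is a minimal sketch of that pattern against the raw KafkaConsumer API (not monix-kafka internals), roughly as the consumer docs describe it. processBatch and the 100 ms poll timeout are illustrative placeholders, and enable.auto.commit is assumed to be set to false in the consumer config.

import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}

import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}

object PausedPollingSketch {
  // Stand-in for the slow, unpredictable downstream work (illustrative only).
  def processBatch(records: Seq[ConsumerRecord[String, String]])
                  (implicit ec: ExecutionContext): Future[Unit] = ???

  def run(consumer: KafkaConsumer[String, String])
         (implicit ec: ExecutionContext): Unit = {
    while (true) {
      val records = consumer.poll(100L).asScala.toSeq
      if (records.nonEmpty) {
        // Hand the batch off to another thread, then pause all assigned
        // partitions so subsequent poll() calls fetch no new records.
        val processing = processBatch(records)
        consumer.pause(consumer.assignment())

        // Keep calling poll() while processing runs, so the consumer stays
        // within its poll interval and its partitions are not reassigned.
        while (!processing.isCompleted)
          consumer.poll(100L)

        // Resume fetching and commit offsets only after the batch succeeded.
        consumer.resume(consumer.assignment())
        consumer.commitSync()
      }
    }
  }
}

Note that poll, pause, resume, and commitSync all stay on the polling thread here, since KafkaConsumer is not thread-safe; only the processing itself runs elsewhere.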
