Skip to content

Commit

Permalink
AutoCommit polling logic (monix#94)
Browse files Browse the repository at this point in the history
  • Loading branch information
voidconductor committed Sep 11, 2019
1 parent 7884fd9 commit a287fc3
Showing 1 changed file with 6 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,16 @@ final class KafkaConsumerObservableAutoCommit[K, V] private[kafka] (
s.executeAsync { () =>
val ackFuture =
try consumer.synchronized {
val assignment = consumer.assignment()
if (cancelable.isCanceled) Stop
else {
consumer.resume(assignment)
val next = blocking(consumer.poll(pollTimeoutMillis))
consumer.pause(assignment)
if (shouldCommitBefore) consumerCommit(consumer)
// Feeding the observer happens on the Subscriber's scheduler
// if any asynchronous boundaries happen
isAcked.set(false)
Observer.feed(out, next.asScala)(out.scheduler)
}
} catch {
Expand All @@ -83,6 +87,7 @@ final class KafkaConsumerObservableAutoCommit[K, V] private[kafka] (

ackFuture.syncOnComplete {
case Success(ack) =>
isAcked.set(true)
// The `streamError` flag protects against contract violations
// (i.e. onSuccess/onError should happen only once).
// Not really required, but we don't want to depend on the
Expand All @@ -106,6 +111,7 @@ final class KafkaConsumerObservableAutoCommit[K, V] private[kafka] (
}

case Failure(ex) =>
isAcked.set(true)
asyncCb.onError(ex)
}
}
Expand Down

0 comments on commit a287fc3

Please sign in to comment.