diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Consumer.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Consumer.scala index 480994af..70fe937c 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Consumer.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/Consumer.scala @@ -289,6 +289,7 @@ object Consumer { rebalanceListener.onPartitionsAssigned(consumer, assigned) ) .getOrThrowFiberFailure() + .run() } } diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/EventLoop.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/EventLoop.scala index d5aaa3b5..7c81308e 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/EventLoop.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/EventLoop.scala @@ -222,8 +222,10 @@ object EventLoop { } yield delayedRebalanceEffect } - override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): UIO[Any] = - partitionsAssigned.succeed(partitions) + override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])( + implicit trace: Trace + ): UIO[DelayedRebalanceEffect] = + partitionsAssigned.succeed(partitions).as(DelayedRebalanceEffect.unit) } } diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/RebalanceListener.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/RebalanceListener.scala index e08433ca..00bfd8cf 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/RebalanceListener.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/RebalanceListener.scala @@ -5,7 +5,7 @@ import zio.{Tag, Trace, UIO, URIO, ZEnvironment, ZIO, ZLayer} trait RebalanceListener[-R] { self => def onPartitionsRevoked(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): URIO[R, DelayedRebalanceEffect] - def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): URIO[R, Any] + def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): URIO[R, DelayedRebalanceEffect] def *>[R1](other: RebalanceListener[R1]) = new RebalanceListener[R with R1] { override def onPartitionsRevoked(consumer: Consumer, partitions: Set[TopicPartition])( @@ -16,7 +16,9 @@ trait RebalanceListener[-R] { self => ef2 <- other.onPartitionsRevoked(consumer, partitions) } yield ef1 *> ef2 - override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): URIO[R with R1, Any] = + override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])( + implicit trace: Trace + ): URIO[R with R1, DelayedRebalanceEffect] = self.onPartitionsAssigned(consumer, partitions) *> other.onPartitionsAssigned(consumer, partitions) } @@ -25,7 +27,9 @@ trait RebalanceListener[-R] { self => implicit trace: Trace ): URIO[Any, DelayedRebalanceEffect] = self.onPartitionsRevoked(consumer, partitions).provide(ZLayer.succeed(r)) - override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): URIO[Any, Any] = + override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])( + implicit trace: Trace + ): URIO[Any, DelayedRebalanceEffect] = self.onPartitionsAssigned(consumer, partitions).provide(ZLayer.succeed(r)) } @@ -34,7 +38,9 @@ trait RebalanceListener[-R] { self => implicit trace: Trace ): URIO[Any, DelayedRebalanceEffect] = self.onPartitionsRevoked(consumer, partitions).provideEnvironment(r) - override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): URIO[Any, Any] = + override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])( + implicit trace: Trace + ): URIO[Any, DelayedRebalanceEffect] = self.onPartitionsAssigned(consumer, partitions).provideEnvironment(r) } } @@ -89,8 +95,10 @@ object RebalanceListener { implicit trace: Trace ): URIO[R, DelayedRebalanceEffect] = onRevoked(consumer, partitions).as(DelayedRebalanceEffect.unit) - override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): URIO[R, Any] = - onAssigned(consumer, partitions) + override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])( + implicit trace: Trace + ): URIO[R, DelayedRebalanceEffect] = + onAssigned(consumer, partitions).as(DelayedRebalanceEffect.unit) } def apply( @@ -102,8 +110,9 @@ object RebalanceListener { implicit trace: Trace ): UIO[DelayedRebalanceEffect] = onRevoked(partitions).as(DelayedRebalanceEffect.unit) - override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): UIO[Any] = onAssigned( - partitions - ) + override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])( + implicit trace: Trace + ): UIO[DelayedRebalanceEffect] = + onAssigned(partitions).as(DelayedRebalanceEffect.unit) } } diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/RecordConsumer.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/RecordConsumer.scala index 19c05ee1..9ea37db3 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/RecordConsumer.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/RecordConsumer.scala @@ -168,12 +168,12 @@ object RecordConsumer { override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])( implicit trace: Trace - ): URIO[R1, Any] = + ): URIO[R1, DelayedRebalanceEffect] = for { allAssigned <- assigned.updateAndGet(_ => partitions) _ <- consumerSubscriptionRef.set(subscription) _ <- promise.succeed(allAssigned) - } yield () + } yield DelayedRebalanceEffect.unit } _ <- subscribe[R1](subscription, rebalanceListener)(consumer) diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/ReportingConsumer.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/ReportingConsumer.scala index d6f122a8..8a260a9a 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/ReportingConsumer.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/ReportingConsumer.scala @@ -50,7 +50,7 @@ case class ReportingConsumer(clientId: ClientId, group: Group, internal: Consume } .map(_._2)).provideEnvironment(r) - override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): UIO[Any] = + override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): UIO[DelayedRebalanceEffect] = (report(PartitionsAssigned(clientId, group, partitions, config.consumerAttributes)) *> rebalanceListener.onPartitionsAssigned(consumer, partitions)).provideEnvironment(r) } diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/batched/BatchConsumer.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/batched/BatchConsumer.scala index 053439a6..3d92e85a 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/batched/BatchConsumer.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/batched/BatchConsumer.scala @@ -91,12 +91,12 @@ object BatchConsumer { override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])( implicit trace: Trace - ): URIO[R1, Any] = + ): URIO[R1, DelayedRebalanceEffect] = for { allAssigned <- assigned.updateAndGet(_ => partitions) _ <- consumerSubscriptionRef.set(subscription) _ <- promise.succeed(allAssigned) - } yield () + } yield DelayedRebalanceEffect.unit } _ <- subscribe[R1](subscription, rebalanceListener)(consumer) resubscribeTimeout = config.resubscribeTimeout @@ -157,8 +157,10 @@ object BatchConsumer { ): URIO[Any, DelayedRebalanceEffect] = assignments.update(_ -- partitions).as(DelayedRebalanceEffect.unit) - override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): URIO[Any, Any] = - assignments.update(_ ++ partitions) + override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])( + implicit trace: Trace + ): URIO[Any, DelayedRebalanceEffect] = + assignments.update(_ ++ partitions).as(DelayedRebalanceEffect.unit) } } } diff --git a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/batched/BatchEventLoop.scala b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/batched/BatchEventLoop.scala index 02c3564c..b166fbf4 100644 --- a/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/batched/BatchEventLoop.scala +++ b/core/src/main/scala/com/wixpress/dst/greyhound/core/consumer/batched/BatchEventLoop.scala @@ -243,8 +243,8 @@ object BatchEventLoop { state.partitionsRevoked(partitions).as(DelayedRebalanceEffect.unit) } - override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): UIO[Any] = - partitionsAssigned.succeed(()) + override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): UIO[DelayedRebalanceEffect] = + partitionsAssigned.succeed(()).as(DelayedRebalanceEffect.unit) } } diff --git a/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/RebalanceListenerTest.scala b/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/RebalanceListenerTest.scala index d1ad0e44..3e028df3 100644 --- a/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/RebalanceListenerTest.scala +++ b/core/src/test/scala/com/wixpress/dst/greyhound/core/consumer/RebalanceListenerTest.scala @@ -24,8 +24,8 @@ class RebalanceListenerTest extends JUnitRunnableSpec { log(s"$id.revoke $partitions").as(DelayedRebalanceEffect(unsafeLog(s"$id.revoke.tle $partitions"))) override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])( implicit trace: Trace - ): URIO[Any, Any] = - log(s"$id.assigned $partitions") + ): URIO[Any, DelayedRebalanceEffect] = + log(s"$id.assigned $partitions").as(DelayedRebalanceEffect(unsafeLog(s"$id.assigned.tle $partitions"))) } l1l2 = listener("l1") *> listener("l2") partitions = Set(TopicPartition("topic", 0))