Skip to content

Commit

Permalink
[greyhound] change onPartitionsAssigned result to DelayedRebalanceEff…
Browse files Browse the repository at this point in the history
…ect (#36206)

GitOrigin-RevId: 43388b3fc6c16782373261553029c86e5111286d
  • Loading branch information
ben-wattelman authored and wix-oss committed Oct 5, 2023
1 parent afe453c commit 728af6e
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ object Consumer {
rebalanceListener.onPartitionsAssigned(consumer, assigned)
)
.getOrThrowFiberFailure()
.run()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,10 @@ object EventLoop {
} yield delayedRebalanceEffect
}

override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): UIO[Any] =
partitionsAssigned.succeed(partitions)
override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(
implicit trace: Trace
): UIO[DelayedRebalanceEffect] =
partitionsAssigned.succeed(partitions).as(DelayedRebalanceEffect.unit)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import zio.{Tag, Trace, UIO, URIO, ZEnvironment, ZIO, ZLayer}

trait RebalanceListener[-R] { self =>
def onPartitionsRevoked(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): URIO[R, DelayedRebalanceEffect]
def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): URIO[R, Any]
def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): URIO[R, DelayedRebalanceEffect]

def *>[R1](other: RebalanceListener[R1]) = new RebalanceListener[R with R1] {
override def onPartitionsRevoked(consumer: Consumer, partitions: Set[TopicPartition])(
Expand All @@ -16,7 +16,9 @@ trait RebalanceListener[-R] { self =>
ef2 <- other.onPartitionsRevoked(consumer, partitions)
} yield ef1 *> ef2

override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): URIO[R with R1, Any] =
override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(
implicit trace: Trace
): URIO[R with R1, DelayedRebalanceEffect] =
self.onPartitionsAssigned(consumer, partitions) *> other.onPartitionsAssigned(consumer, partitions)
}

Expand All @@ -25,7 +27,9 @@ trait RebalanceListener[-R] { self =>
implicit trace: Trace
): URIO[Any, DelayedRebalanceEffect] =
self.onPartitionsRevoked(consumer, partitions).provide(ZLayer.succeed(r))
override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): URIO[Any, Any] =
override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(
implicit trace: Trace
): URIO[Any, DelayedRebalanceEffect] =
self.onPartitionsAssigned(consumer, partitions).provide(ZLayer.succeed(r))
}

Expand All @@ -34,7 +38,9 @@ trait RebalanceListener[-R] { self =>
implicit trace: Trace
): URIO[Any, DelayedRebalanceEffect] =
self.onPartitionsRevoked(consumer, partitions).provideEnvironment(r)
override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): URIO[Any, Any] =
override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(
implicit trace: Trace
): URIO[Any, DelayedRebalanceEffect] =
self.onPartitionsAssigned(consumer, partitions).provideEnvironment(r)
}
}
Expand Down Expand Up @@ -89,8 +95,10 @@ object RebalanceListener {
implicit trace: Trace
): URIO[R, DelayedRebalanceEffect] =
onRevoked(consumer, partitions).as(DelayedRebalanceEffect.unit)
override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): URIO[R, Any] =
onAssigned(consumer, partitions)
override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(
implicit trace: Trace
): URIO[R, DelayedRebalanceEffect] =
onAssigned(consumer, partitions).as(DelayedRebalanceEffect.unit)
}

def apply(
Expand All @@ -102,8 +110,9 @@ object RebalanceListener {
implicit trace: Trace
): UIO[DelayedRebalanceEffect] =
onRevoked(partitions).as(DelayedRebalanceEffect.unit)
override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): UIO[Any] = onAssigned(
partitions
)
override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(
implicit trace: Trace
): UIO[DelayedRebalanceEffect] =
onAssigned(partitions).as(DelayedRebalanceEffect.unit)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,12 @@ object RecordConsumer {

override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(
implicit trace: Trace
): URIO[R1, Any] =
): URIO[R1, DelayedRebalanceEffect] =
for {
allAssigned <- assigned.updateAndGet(_ => partitions)
_ <- consumerSubscriptionRef.set(subscription)
_ <- promise.succeed(allAssigned)
} yield ()
} yield DelayedRebalanceEffect.unit
}

_ <- subscribe[R1](subscription, rebalanceListener)(consumer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ case class ReportingConsumer(clientId: ClientId, group: Group, internal: Consume
}
.map(_._2)).provideEnvironment(r)

override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): UIO[Any] =
override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): UIO[DelayedRebalanceEffect] =
(report(PartitionsAssigned(clientId, group, partitions, config.consumerAttributes)) *>
rebalanceListener.onPartitionsAssigned(consumer, partitions)).provideEnvironment(r)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ object BatchConsumer {

override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(
implicit trace: Trace
): URIO[R1, Any] =
): URIO[R1, DelayedRebalanceEffect] =
for {
allAssigned <- assigned.updateAndGet(_ => partitions)
_ <- consumerSubscriptionRef.set(subscription)
_ <- promise.succeed(allAssigned)
} yield ()
} yield DelayedRebalanceEffect.unit
}
_ <- subscribe[R1](subscription, rebalanceListener)(consumer)
resubscribeTimeout = config.resubscribeTimeout
Expand Down Expand Up @@ -157,8 +157,10 @@ object BatchConsumer {
): URIO[Any, DelayedRebalanceEffect] =
assignments.update(_ -- partitions).as(DelayedRebalanceEffect.unit)

override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): URIO[Any, Any] =
assignments.update(_ ++ partitions)
override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(
implicit trace: Trace
): URIO[Any, DelayedRebalanceEffect] =
assignments.update(_ ++ partitions).as(DelayedRebalanceEffect.unit)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,8 @@ object BatchEventLoop {
state.partitionsRevoked(partitions).as(DelayedRebalanceEffect.unit)
}

override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): UIO[Any] =
partitionsAssigned.succeed(())
override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(implicit trace: Trace): UIO[DelayedRebalanceEffect] =
partitionsAssigned.succeed(()).as(DelayedRebalanceEffect.unit)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ class RebalanceListenerTest extends JUnitRunnableSpec {
log(s"$id.revoke $partitions").as(DelayedRebalanceEffect(unsafeLog(s"$id.revoke.tle $partitions")))
override def onPartitionsAssigned(consumer: Consumer, partitions: Set[TopicPartition])(
implicit trace: Trace
): URIO[Any, Any] =
log(s"$id.assigned $partitions")
): URIO[Any, DelayedRebalanceEffect] =
log(s"$id.assigned $partitions").as(DelayedRebalanceEffect(unsafeLog(s"$id.assigned.tle $partitions")))
}
l1l2 = listener("l1") *> listener("l2")
partitions = Set(TopicPartition("topic", 0))
Expand Down

0 comments on commit 728af6e

Please sign in to comment.