Skip to content

Commit

Permalink
multi-tenant consumer proxy redesign - initial commit (#35244)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: e7a2532372964be70c34350951ca99bfc52f7684
  • Loading branch information
berman7 authored and wix-oss committed Oct 5, 2023
1 parent 64f7743 commit 370e44a
Show file tree
Hide file tree
Showing 10 changed files with 217 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ case class TopicPartition(topic: Topic, partition: Partition) {
}

object TopicPartition {
def fromKafka(topicPartition: KafkaTopicPartition): TopicPartition =
TopicPartition(topicPartition.topic, topicPartition.partition)
def apply(topicPartition: KafkaTopicPartition): TopicPartition =
TopicPartition(topicPartition.topic, topicPartition.partition)
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ trait Consumer {

def assignment(implicit trace: Trace): Task[Set[TopicPartition]]

def assign(tps: Set[TopicPartition])(implicit trace: Trace): Task[Unit] = ZIO.fail(new IllegalStateException("Not implemented"))

def config(implicit trace: Trace): ConsumerConfig

def listTopics(implicit trace: Trace): RIO[Any, Map[Topic, List[PartitionInfo]]]
Expand Down Expand Up @@ -124,7 +126,7 @@ object Consumer {
override def poll(timeout: Duration)(implicit trace: Trace): RIO[Any, Records] =
withConsumerM { c =>
rewindPositionsOnError(c) {
attemptBlocking(c.poll(time.Duration.ofMillis(timeout.toMillis)).asScala.map(ConsumerRecord(_)))
attemptBlocking(c.poll(time.Duration.ofMillis(timeout.toMillis)).asScala.map(rec => ConsumerRecord(rec, config.groupId)))
.flatMap(ZIO.foreach(_)(cfg.decryptor.decrypt))
}
}
Expand Down Expand Up @@ -208,6 +210,9 @@ object Consumer {
withConsumer(_.assignment().asScala.toSet.map(TopicPartition.apply(_: org.apache.kafka.common.TopicPartition)))
}

override def assign(tps: Set[TopicPartition])(implicit trace: Trace): Task[Unit] =
withConsumer(_.assign(kafkaPartitions(tps)))

private def allPositionsUnsafe = attemptBlocking {
consumer
.assignment()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ case class ConsumerRecord[+K, +V](
value: V,
pollTime: Long,
bytesTotal: Long,
producedTimestamp: Long
producedTimestamp: Long,
consumerGroupId: ConsumerGroupId
) {
def id: String = s"$topic:$partition:$offset"

Expand All @@ -30,7 +31,8 @@ case class ConsumerRecord[+K, +V](
value = fv(value),
pollTime = pollTime,
bytesTotal = bytesTotal,
producedTimestamp = producedTimestamp
producedTimestamp = producedTimestamp,
consumerGroupId = consumerGroupId
)

def bimapM[R, E, K2, V2](fk: K => ZIO[R, E, K2], fv: V => ZIO[R, E, V2]): ZIO[R, E, ConsumerRecord[K2, V2]] =
Expand All @@ -46,7 +48,8 @@ case class ConsumerRecord[+K, +V](
value = value2,
pollTime = pollTime,
bytesTotal = bytesTotal,
producedTimestamp = producedTimestamp
producedTimestamp = producedTimestamp,
consumerGroupId = consumerGroupId
)

def mapKey[K2](f: K => K2): ConsumerRecord[K2, V] = bimap(f, identity)
Expand All @@ -56,7 +59,7 @@ case class ConsumerRecord[+K, +V](
}

object ConsumerRecord {
def apply[K, V](record: KafkaConsumerRecord[K, V]): ConsumerRecord[K, V] =
def apply[K, V](record: KafkaConsumerRecord[K, V], consumerGroupId: ConsumerGroupId): ConsumerRecord[K, V] =
ConsumerRecord(
topic = record.topic,
partition = record.partition,
Expand All @@ -67,6 +70,7 @@ object ConsumerRecord {
pollTime = System.currentTimeMillis,
producedTimestamp = record.timestamp,
bytesTotal = record.serializedValueSize() + record.serializedKeySize() +
record.headers().toArray.map(h => h.key.length + h.value.length).sum
record.headers().toArray.map(h => h.key.length + h.value.length).sum,
consumerGroupId = consumerGroupId
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ object EventLoopTest {
val partition = 0
val offset = 0L
val record: ConsumerRecord[Chunk[Byte], Chunk[Byte]] =
ConsumerRecord(topic, partition, offset, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L)
ConsumerRecord(topic, partition, offset, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L, "")
val exception = new RuntimeException("oops")

def recordsFrom(records: ConsumerRecord[Chunk[Byte], Chunk[Byte]]*): Records = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ object TestSupport {
Chunk.fromArray(payload.getBytes),
0L,
payload.getBytes.length,
0L
0L,
""
)

def records(topicCount: Int = 4, partitions: Int = 4, perPartition: Int = 3, hint: String = "") = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,12 @@ class DispatcherTest extends BaseTest[TestMetrics with TestClock] {
_ <- ZIO.foreachDiscard(0 to (highWatermark + 1)) { offset =>
submit(
dispatcher,
ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, offset, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L)
ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, offset, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L, "")
)
}
_ <- submit(
dispatcher,
ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, 6L, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L)
ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, 6L, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L, "")
) // Will be dropped
_ <- eventuallyZ(dispatcher.resumeablePartitions(Set(topicPartition)))(_.isEmpty)
_ <- ZIO.foreachDiscard(1 to 4)(_ => queue.take)
Expand Down Expand Up @@ -195,13 +195,13 @@ class DispatcherTest extends BaseTest[TestMetrics with TestClock] {
_ <- ZIO.foreachDiscard(0 to (highWatermark + 1)) { offset =>
submit(
dispatcher,
ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, offset, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L)
ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, offset, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L, "")
)
}
overCapacitySubmitResult <-
submit(
dispatcher,
ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, 6L, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L)
ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, 6L, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L, "")
) // Will be dropped
resumeablePartitionsWhenInHighWatermark <- dispatcher.resumeablePartitions(Set(topicPartition))
_ <- ZIO.foreachDiscard(1 to 4)(_ => queue.take)
Expand All @@ -212,13 +212,13 @@ class DispatcherTest extends BaseTest[TestMetrics with TestClock] {
_ <- ZIO.foreachDiscard(0 to 3) { offset =>
submit(
dispatcher,
ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, offset, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L)
ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, offset, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L, "")
)
}
overCapacitySubmitResult2 <-
submit(
dispatcher,
ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, 16L, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L)
ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, 16L, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L, "")
) // Will be dropped
_ <- ZIO.foreachDiscard(1 to 4)(_ => queue.take)
_ <- TestClock.adjust(1.second)
Expand Down Expand Up @@ -332,7 +332,7 @@ class DispatcherTest extends BaseTest[TestMetrics with TestClock] {
val topic = "topic"
val partition = 0
val topicPartition = TopicPartition(topic, partition)
val record = ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, 0L, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L)
val record = ConsumerRecord[Chunk[Byte], Chunk[Byte]](topic, partition, 0L, Headers.Empty, None, Chunk.empty, 0L, 0L, 0L, "")

def getKeys(numKeys: Int) = (0 until numKeys).map(i => Some(Chunk.fromArray(s"key$i".getBytes)))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class BlockingStateResolverTest extends BaseTest[TestEnvironment with GreyhoundM
resolver = BlockingStateResolver(blockingState)
_ <- blockingState.set(Map(TopicPartitionTarget(TopicPartition(topic, partition)) -> state))

shouldBlock <- resolver.resolve(ConsumerRecord(topic, partition, offset, Headers.Empty, Some(key), value, 0L, 0L, 0L))
shouldBlock <- resolver.resolve(ConsumerRecord(topic, partition, offset, Headers.Empty, Some(key), value, 0L, 0L, 0L, ""))
} yield shouldBlock === expectedShouldBlock
}
}
Expand All @@ -48,7 +48,7 @@ class BlockingStateResolverTest extends BaseTest[TestEnvironment with GreyhoundM
resolver = BlockingStateResolver(blockingState)
_ <- blockingState.set(Map(TopicTarget(topic) -> state))

shouldBlock <- resolver.resolve(ConsumerRecord(topic, partition, offset, Headers.Empty, Some(key), value, 0L, 0L, 0L))
shouldBlock <- resolver.resolve(ConsumerRecord(topic, partition, offset, Headers.Empty, Some(key), value, 0L, 0L, 0L, ""))
} yield shouldBlock === expectedShouldBlock
}
}
Expand All @@ -62,7 +62,7 @@ class BlockingStateResolverTest extends BaseTest[TestEnvironment with GreyhoundM

resolver = BlockingStateResolver(blockingState)

shouldBlock <- resolver.resolve(ConsumerRecord(missingTopic, partition, offset, Headers.Empty, Some(key), value, 0L, 0L, 0L))
shouldBlock <- resolver.resolve(ConsumerRecord(missingTopic, partition, offset, Headers.Empty, Some(key), value, 0L, 0L, 0L, ""))
} yield shouldBlock === true
}

Expand All @@ -77,7 +77,7 @@ class BlockingStateResolverTest extends BaseTest[TestEnvironment with GreyhoundM

resolver = BlockingStateResolver(blockingState)

record = ConsumerRecord(topic, partition, offset, headers, Some(key), value, 0L, 0L, 0L)
record = ConsumerRecord(topic, partition, offset, headers, Some(key), value, 0L, 0L, 0L, "")
shouldBlock <- resolver.resolve(record)
updatedStateMap <- blockingState.get
updatedState = updatedStateMap(TopicPartitionTarget(tpartition))
Expand All @@ -95,7 +95,7 @@ class BlockingStateResolverTest extends BaseTest[TestEnvironment with GreyhoundM

resolver = BlockingStateResolver(blockingState)

record = ConsumerRecord(topic, partition, offset, headers, Some(key), value, 0L, 0L, 0L)
record = ConsumerRecord(topic, partition, offset, headers, Some(key), value, 0L, 0L, 0L, "")
shouldBlock <- resolver.resolve(record)
updatedStateMap <- blockingState.get
updatedState = updatedStateMap(TopicPartitionTarget(tpartition))
Expand All @@ -111,7 +111,7 @@ class BlockingStateResolverTest extends BaseTest[TestEnvironment with GreyhoundM

resolver = BlockingStateResolver(blockingState)

shouldBlock <- resolver.resolve(ConsumerRecord(topic, partition, offset, Headers.Empty, Some(key), value, 0L, 0L, 0L))
shouldBlock <- resolver.resolve(ConsumerRecord(topic, partition, offset, Headers.Empty, Some(key), value, 0L, 0L, 0L, ""))
updatedStateMap <- blockingState.get
updatedState = updatedStateMap(TopicTarget(topic))
} yield shouldBlock === true and updatedState === InternalBlocking
Expand Down Expand Up @@ -150,7 +150,7 @@ class BlockingStateResolverTest extends BaseTest[TestEnvironment with GreyhoundM
)
)

shouldBlock <- resolver.resolve(ConsumerRecord(topic, partition, offset, Headers.Empty, Some(key), value, 0L, 0L, 0L))
shouldBlock <- resolver.resolve(ConsumerRecord(topic, partition, offset, Headers.Empty, Some(key), value, 0L, 0L, 0L, ""))
} yield shouldBlock === expectedShouldBlock
}
}
Expand All @@ -170,8 +170,8 @@ class BlockingStateResolverTest extends BaseTest[TestEnvironment with GreyhoundM

resolver = BlockingStateResolver(blockingState)

record = ConsumerRecord(topic, partition, offset, headers, Some(key), value, 0L, 0L, 0L)
record2 = ConsumerRecord(anotherTopic, partition, offset, headers, Some(key), value, 0L, 0L, 0L)
record = ConsumerRecord(topic, partition, offset, headers, Some(key), value, 0L, 0L, 0L, "")
record2 = ConsumerRecord(anotherTopic, partition, offset, headers, Some(key), value, 0L, 0L, 0L, "")
shouldBlockBefore <- resolver.resolve(record)
shouldBlockBefore2 <- resolver.resolve(record2)
_ <- resolver.setBlockingState(BlockErrors(topic))
Expand Down Expand Up @@ -219,7 +219,7 @@ class BlockingStateResolverTest extends BaseTest[TestEnvironment with GreyhoundM

resolver = BlockingStateResolver(blockingState)

record = ConsumerRecord(topic, partition, offset, Headers.Empty, Some(key), value, 0L, 0L, 0L)
record = ConsumerRecord(topic, partition, offset, Headers.Empty, Some(key), value, 0L, 0L, 0L, "")
shouldBlock <- resolver.resolve(record)
updatedStateMap <- blockingState.get
updatedStateTopic = updatedStateMap(TopicTarget(topic))
Expand All @@ -228,7 +228,7 @@ class BlockingStateResolverTest extends BaseTest[TestEnvironment with GreyhoundM
}
}

final val BlockedMessageState = Blocked(ConsumerRecord("", 0, 0, Headers.Empty, None, "", 0L, 0L, 0L))
final val BlockedMessageState = Blocked(ConsumerRecord("", 0, 0, Headers.Empty, None, "", 0L, 0L, 0L, ""))
}

case class Foo(message: String)
Loading

0 comments on commit 370e44a

Please sign in to comment.