From 553cbd2131114f0a6bd9a494bf6bd3ff9b9d1a5d Mon Sep 17 00:00:00 2001 From: Alexey Rykhalskiy Date: Tue, 10 Dec 2024 17:52:54 +0200 Subject: [PATCH] -- kafka and avro --- ...nsumeStreamAvroWithoutSchemaRegistry.scala | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 ce3/src/main/scala/fss101/d13kafka/avro/Fs2ConsumeStreamAvroWithoutSchemaRegistry.scala diff --git a/ce3/src/main/scala/fss101/d13kafka/avro/Fs2ConsumeStreamAvroWithoutSchemaRegistry.scala b/ce3/src/main/scala/fss101/d13kafka/avro/Fs2ConsumeStreamAvroWithoutSchemaRegistry.scala new file mode 100644 index 00000000..272ae90f --- /dev/null +++ b/ce3/src/main/scala/fss101/d13kafka/avro/Fs2ConsumeStreamAvroWithoutSchemaRegistry.scala @@ -0,0 +1,45 @@ +package fss101.d13kafka.avro + +import _root_.vulcan.Codec +import cats.effect.IO +import cats.effect.IOApp +import fs2.kafka._ +import fss101.d13kafka.KafkaConfiguration +import fss101.d13kafka.vulkan.Car +import java.util + +object Fs2ConsumeStreamAvroWithoutSchemaRegistry extends IOApp.Simple with KafkaConfiguration { + + /** this deserializer will strip 5 magic bytes + * from the Kafka payload + * and decode the content as a plain avro message + */ + implicit val carDeserializer: ValueDeserializer[IO, Car] = + GenericDeserializer.instance[IO, Car] { + (topic, headers, bytes) => + IO.delay { + val payload = util.Arrays.copyOf(bytes, 5) + Codec[Car].schema + .flatMap(schema => Codec.fromBinary[Car](payload, schema)) + .fold(e => IO.raiseError[Car](new RuntimeException(e.toString())), IO(_)) + }.flatten + } + + val consumerSettings = ConsumerSettings[IO, String, Car] + .withAutoOffsetReset(AutoOffsetReset.Earliest) + .withBootstrapServers(kafkaIp) + .withGroupId(consumerGroupId) + + val consumerSubscribed = + KafkaConsumer + .stream(consumerSettings) + .evalTap(_.subscribeTo(topicAvro)) + .flatMap(_.stream) + + override def run: IO[Unit] = + consumerSubscribed + .evalTap(x => IO.blocking(println(x))) + .compile + .drain + +}