From 2d8f3a2fabd7b99048188f4e9750929eaf222c95 Mon Sep 17 00:00:00 2001 From: kannar Date: Tue, 27 Aug 2024 16:17:17 +0200 Subject: [PATCH 1/4] pulsar: v3.3.1 --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 825e52b..88ba0c4 100644 --- a/build.sbt +++ b/build.sbt @@ -17,7 +17,7 @@ val Log4jVersion = "2.22.1" val MonixVersion = "3.4.1" val PekkoStreamVersion = "1.0.2" val PlayJsonVersion = "2.10.4" -val PulsarVersion = "3.2.0" +val PulsarVersion = "3.3.1" val ReactiveStreamsVersion = "1.0.2" val FunctionalStreamsVersion = "3.9.4" val Json4sVersion = "4.0.7" From d0bbafc271707311d163fb8ff4c4a96f33e8c8b5 Mon Sep 17 00:00:00 2001 From: kannar Date: Tue, 27 Aug 2024 16:18:03 +0200 Subject: [PATCH 2/4] add missing impl --- .../src/main/scala/com/sksamuel/pulsar4s/MessageId.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/MessageId.scala b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/MessageId.scala index eedc26a..e01dc7c 100644 --- a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/MessageId.scala +++ b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/MessageId.scala @@ -35,14 +35,17 @@ private case class Pulsar4sMessageIdImpl(underlying: JMessageId) extends Message } override def ledgerId: Option[Long] = underlying match { case m: MessageIdImpl => Option(m.getLedgerId) + case m: TopicMessageIdImpl => Option(m.getLedgerId) case _ => None } override def entryId: Option[Long] = underlying match { case m: MessageIdImpl => Option(m.getEntryId) + case m: TopicMessageIdImpl => Option(m.getEntryId) case _ => None } override def partitionIndex: Option[Int] = underlying match { case m: MessageIdImpl => Some(m.getPartitionIndex) + case m: TopicMessageIdImpl => Option(m.getPartitionIndex) case _ => None } } From 105d2d48a9696b236a4d89a79eb8962e7c7772cd Mon Sep 17 00:00:00 2001 From: kannar Date: Tue, 27 Aug 2024 16:18:28 +0200 Subject: [PATCH 3/4] add batchindex to messageid --- .../com/sksamuel/pulsar4s/cats/CatsAsyncHandlerTest.scala | 1 + .../src/main/scala/com/sksamuel/pulsar4s/MessageId.scala | 6 ++++++ .../sksamuel/pulsar4s/monixs/MonixAsyncHandlerTest.scala | 1 + .../sksamuel/pulsar4s/scalaz/ScalazAsyncHandlerTest.scala | 1 + .../com/sksamuel/pulsar4s/zio/ZioAsyncHandlerTest.scala | 1 + 5 files changed, 10 insertions(+) diff --git a/pulsar4s-cats-effect/src/test/scala/com/sksamuel/pulsar4s/cats/CatsAsyncHandlerTest.scala b/pulsar4s-cats-effect/src/test/scala/com/sksamuel/pulsar4s/cats/CatsAsyncHandlerTest.scala index 29633b3..5aa3681 100644 --- a/pulsar4s-cats-effect/src/test/scala/com/sksamuel/pulsar4s/cats/CatsAsyncHandlerTest.scala +++ b/pulsar4s-cats-effect/src/test/scala/com/sksamuel/pulsar4s/cats/CatsAsyncHandlerTest.scala @@ -54,6 +54,7 @@ class CatsAsyncHandlerTest extends AnyFunSuite with Matchers with BeforeAndAfter val r = t.unsafeRunSync() r.entryId shouldBe value.messageId.entryId r.partitionIndex shouldBe value.messageId.partitionIndex + r.batchIndex shouldBe value.messageId.batchIndex consumer.close() } diff --git a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/MessageId.scala b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/MessageId.scala index e01dc7c..719e20a 100644 --- a/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/MessageId.scala +++ b/pulsar4s-core/src/main/scala/com/sksamuel/pulsar4s/MessageId.scala @@ -18,6 +18,7 @@ sealed trait MessageId { def ledgerId: Option[Long] def entryId: Option[Long] def partitionIndex: Option[Int] + def batchIndex: Option[Int] } private case class Pulsar4sMessageIdImpl(underlying: JMessageId) extends MessageId { @@ -48,6 +49,11 @@ private case class Pulsar4sMessageIdImpl(underlying: JMessageId) extends Message case m: TopicMessageIdImpl => Option(m.getPartitionIndex) case _ => None } + override def batchIndex: Option[Int] = underlying match { + case m: MessageIdImpl => Some(m.getBatchIndex) + case m: TopicMessageIdImpl => Option(m.getBatchIndex) + case _ => None + } } object MessageId { diff --git a/pulsar4s-monix/src/test/scala/com/sksamuel/pulsar4s/monixs/MonixAsyncHandlerTest.scala b/pulsar4s-monix/src/test/scala/com/sksamuel/pulsar4s/monixs/MonixAsyncHandlerTest.scala index 583472e..54faff1 100644 --- a/pulsar4s-monix/src/test/scala/com/sksamuel/pulsar4s/monixs/MonixAsyncHandlerTest.scala +++ b/pulsar4s-monix/src/test/scala/com/sksamuel/pulsar4s/monixs/MonixAsyncHandlerTest.scala @@ -56,6 +56,7 @@ class MonixAsyncHandlerTest extends AnyFunSuite with Matchers with BeforeAndAfte val r = Await.result(rFuture, Duration.Inf) r.entryId shouldBe value.messageId.entryId r.partitionIndex shouldBe value.messageId.partitionIndex + r.batchIndex shouldBe value.messageId.batchIndex consumer.close() } diff --git a/pulsar4s-scalaz/src/test/scala/com/sksamuel/pulsar4s/scalaz/ScalazAsyncHandlerTest.scala b/pulsar4s-scalaz/src/test/scala/com/sksamuel/pulsar4s/scalaz/ScalazAsyncHandlerTest.scala index 6712c1e..313ae00 100644 --- a/pulsar4s-scalaz/src/test/scala/com/sksamuel/pulsar4s/scalaz/ScalazAsyncHandlerTest.scala +++ b/pulsar4s-scalaz/src/test/scala/com/sksamuel/pulsar4s/scalaz/ScalazAsyncHandlerTest.scala @@ -47,6 +47,7 @@ class ScalazAsyncHandlerTest extends AnyFunSuite with Matchers with BeforeAndAft val r = t.unsafePerformSync r.entryId shouldBe value.messageId.entryId r.partitionIndex shouldBe value.messageId.partitionIndex + r.batchIndex shouldBe value.messageId.batchIndex consumer.close() } } diff --git a/pulsar4s-zio/src/test/scala/com/sksamuel/pulsar4s/zio/ZioAsyncHandlerTest.scala b/pulsar4s-zio/src/test/scala/com/sksamuel/pulsar4s/zio/ZioAsyncHandlerTest.scala index e02e370..43819d1 100644 --- a/pulsar4s-zio/src/test/scala/com/sksamuel/pulsar4s/zio/ZioAsyncHandlerTest.scala +++ b/pulsar4s-zio/src/test/scala/com/sksamuel/pulsar4s/zio/ZioAsyncHandlerTest.scala @@ -53,6 +53,7 @@ class ZioAsyncHandlerTest extends AnyFunSuite with Matchers with BeforeAndAfterA val r = Unsafe.unsafe(implicit unsafe => zio.Runtime.default.unsafe.run(t.either.map(_.toOption.get)).getOrThrowFiberFailure()) r.entryId shouldBe value.messageId.entryId r.partitionIndex shouldBe value.messageId.partitionIndex + r.batchIndex shouldBe value.messageId.batchIndex consumer.close() } From a96ab9bc49186ca0b0fe6f7f4ad8c07831d8df95 Mon Sep 17 00:00:00 2001 From: Julien Durillon Date: Tue, 27 Aug 2024 16:51:25 +0200 Subject: [PATCH 4/4] Bump deps --- build.sbt | 26 +++++++++++++------------- project/build.properties | 2 +- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/build.sbt b/build.sbt index 88ba0c4..97806a2 100644 --- a/build.sbt +++ b/build.sbt @@ -8,34 +8,34 @@ def publishVersion = if (isRelease) releaseVersion else if (isGithubActions) "2. val org = "com.clever-cloud.pulsar4s" val AkkaStreamVersion = "2.6.20" // compatible with Akka 2.5.x and 2.6.x -val CatsEffectVersion = "3.5.3" +val CatsEffectVersion = "3.5.4" val CirceVersion = "0.14.6" val CommonsIoVersion = "2.4" val ExtsVersion = "1.61.1" val JacksonVersion = "2.14.3" -val Log4jVersion = "2.22.1" +val Log4jVersion = "2.23.1" val MonixVersion = "3.4.1" val PekkoStreamVersion = "1.0.2" -val PlayJsonVersion = "2.10.4" +val PlayJsonVersion = "2.10.6" val PulsarVersion = "3.3.1" val ReactiveStreamsVersion = "1.0.2" -val FunctionalStreamsVersion = "3.9.4" +val FunctionalStreamsVersion = "3.10.2" val Json4sVersion = "4.0.7" // Version of Avro4s for Scala 2.X -val Avro4sVersionFor2 = "4.1.1" +val Avro4sVersionFor2 = "4.1.2" // Version of Avro4s for Scala 3.X -val Avro4sVersionFor3 = "5.0.9" -val ScalaVersion = "3.3.1" -val ScalatestVersion = "3.2.17" -val ScalazVersion = "7.2.35" -val Slf4jVersion = "2.0.11" +val Avro4sVersionFor3 = "5.0.13" +val ScalaVersion = "3.3.3" +val ScalatestVersion = "3.2.19" +val ScalazVersion = "7.2.36" +val Slf4jVersion = "2.0.16" val SprayJsonVersion = "1.3.6" -val ZIOVersion = "2.0.21" -val ZIOInteropCatsVersion = "23.0.03" +val ZIOVersion = "2.0.22" +val ZIOInteropCatsVersion = "23.1.0.3" lazy val commonScalaVersionSettings = Seq( scalaVersion := ScalaVersion, - crossScalaVersions := Seq("2.12.18", "2.13.12", ScalaVersion) + crossScalaVersions := Seq("2.12.19", "2.13.14", ScalaVersion) ) lazy val warnUnusedImport = Seq( diff --git a/project/build.properties b/project/build.properties index abbbce5..04267b1 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.9.8 +sbt.version=1.9.9