Skip to content

Commit

Permalink
Merge pull request #632 from CleverCloud/addMissingMessageIdImplAndBu…
Browse files Browse the repository at this point in the history
…mpClient

add missing messageId impl, add batchIndex, pulsar: v3.3.1
  • Loading branch information
judu authored Aug 27, 2024
2 parents 16d4f91 + 49ef2f3 commit e256484
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 15 deletions.
28 changes: 14 additions & 14 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,34 @@ def publishVersion = if (isRelease) releaseVersion else if (isGithubActions) "2.

val org = "com.clever-cloud.pulsar4s"
val AkkaStreamVersion = "2.6.20" // compatible with Akka 2.5.x and 2.6.x
val CatsEffectVersion = "3.5.3"
val CatsEffectVersion = "3.5.4"
val CirceVersion = "0.14.6"
val CommonsIoVersion = "2.4"
val ExtsVersion = "1.61.1"
val JacksonVersion = "2.17.2"
val Log4jVersion = "2.22.1"
val Log4jVersion = "2.23.1"
val MonixVersion = "3.4.1"
val PekkoStreamVersion = "1.0.2"
val PlayJsonVersion = "2.10.4"
val PulsarVersion = "3.2.0"
val PlayJsonVersion = "2.10.6"
val PulsarVersion = "3.3.1"
val ReactiveStreamsVersion = "1.0.2"
val FunctionalStreamsVersion = "3.9.4"
val FunctionalStreamsVersion = "3.10.2"
val Json4sVersion = "4.0.7"
// Version of Avro4s for Scala 2.X
val Avro4sVersionFor2 = "4.1.1"
val Avro4sVersionFor2 = "4.1.2"
// Version of Avro4s for Scala 3.X
val Avro4sVersionFor3 = "5.0.9"
val ScalaVersion = "3.3.1"
val ScalatestVersion = "3.2.17"
val ScalazVersion = "7.2.35"
val Slf4jVersion = "2.0.11"
val Avro4sVersionFor3 = "5.0.13"
val ScalaVersion = "3.3.3"
val ScalatestVersion = "3.2.19"
val ScalazVersion = "7.2.36"
val Slf4jVersion = "2.0.16"
val SprayJsonVersion = "1.3.6"
val ZIOVersion = "2.0.21"
val ZIOInteropCatsVersion = "23.0.03"
val ZIOVersion = "2.0.22"
val ZIOInteropCatsVersion = "23.1.0.3"

lazy val commonScalaVersionSettings = Seq(
scalaVersion := ScalaVersion,
crossScalaVersions := Seq("2.12.18", "2.13.12", ScalaVersion)
crossScalaVersions := Seq("2.12.19", "2.13.14", ScalaVersion)
)

lazy val warnUnusedImport = Seq(
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.9.8
sbt.version=1.9.9
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class CatsAsyncHandlerTest extends AnyFunSuite with Matchers with BeforeAndAfter
val r = t.unsafeRunSync()
r.entryId shouldBe value.messageId.entryId
r.partitionIndex shouldBe value.messageId.partitionIndex
r.batchIndex shouldBe value.messageId.batchIndex
consumer.close()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ sealed trait MessageId {
def ledgerId: Option[Long]
def entryId: Option[Long]
def partitionIndex: Option[Int]
def batchIndex: Option[Int]
}

private case class Pulsar4sMessageIdImpl(underlying: JMessageId) extends MessageId {
Expand All @@ -35,14 +36,22 @@ private case class Pulsar4sMessageIdImpl(underlying: JMessageId) extends Message
}
override def ledgerId: Option[Long] = underlying match {
case m: MessageIdImpl => Option(m.getLedgerId)
case m: TopicMessageIdImpl => Option(m.getLedgerId)
case _ => None
}
override def entryId: Option[Long] = underlying match {
case m: MessageIdImpl => Option(m.getEntryId)
case m: TopicMessageIdImpl => Option(m.getEntryId)
case _ => None
}
override def partitionIndex: Option[Int] = underlying match {
case m: MessageIdImpl => Some(m.getPartitionIndex)
case m: TopicMessageIdImpl => Option(m.getPartitionIndex)
case _ => None
}
override def batchIndex: Option[Int] = underlying match {
case m: MessageIdImpl => Some(m.getBatchIndex)
case m: TopicMessageIdImpl => Option(m.getBatchIndex)
case _ => None
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class MonixAsyncHandlerTest extends AnyFunSuite with Matchers with BeforeAndAfte
val r = Await.result(rFuture, Duration.Inf)
r.entryId shouldBe value.messageId.entryId
r.partitionIndex shouldBe value.messageId.partitionIndex
r.batchIndex shouldBe value.messageId.batchIndex
consumer.close()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class ScalazAsyncHandlerTest extends AnyFunSuite with Matchers with BeforeAndAft
val r = t.unsafePerformSync
r.entryId shouldBe value.messageId.entryId
r.partitionIndex shouldBe value.messageId.partitionIndex
r.batchIndex shouldBe value.messageId.batchIndex
consumer.close()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class ZioAsyncHandlerTest extends AnyFunSuite with Matchers with BeforeAndAfterA
val r = Unsafe.unsafe(implicit unsafe => zio.Runtime.default.unsafe.run(t.either.map(_.toOption.get)).getOrThrowFiberFailure())
r.entryId shouldBe value.messageId.entryId
r.partitionIndex shouldBe value.messageId.partitionIndex
r.batchIndex shouldBe value.messageId.batchIndex
consumer.close()
}

Expand Down

0 comments on commit e256484

Please sign in to comment.