From f130728ced0eb7d15f40db027585b24ff0dec950 Mon Sep 17 00:00:00 2001 From: red-bashmak <47944216+red-bashmak@users.noreply.github.com> Date: Thu, 9 May 2024 23:22:27 +0300 Subject: [PATCH] fix: silent option for request/reply --- .gitignore | 3 +- .../actions/KafkaRequestReplyAction.scala | 2 + .../kafka/client/KafkaMessageTracker.scala | 2 + .../client/KafkaMessageTrackerActor.scala | 41 +++++++++++-------- 4 files changed, 30 insertions(+), 18 deletions(-) diff --git a/.gitignore b/.gitignore index 47a08e8..3eb86d0 100644 --- a/.gitignore +++ b/.gitignore @@ -49,4 +49,5 @@ project/plugins/project/ /.idea/ /.bsp/ -.DS_Store \ No newline at end of file +.DS_Store +results/ \ No newline at end of file diff --git a/src/main/scala/org/galaxio/gatling/kafka/actions/KafkaRequestReplyAction.scala b/src/main/scala/org/galaxio/gatling/kafka/actions/KafkaRequestReplyAction.scala index 7eb51be..bb6d172 100644 --- a/src/main/scala/org/galaxio/gatling/kafka/actions/KafkaRequestReplyAction.scala +++ b/src/main/scala/org/galaxio/gatling/kafka/actions/KafkaRequestReplyAction.scala @@ -88,6 +88,7 @@ class KafkaRequestReplyAction[K: ClassTag, V: ClassTag]( logMessage(s"Record sent user=${session.userId} key=${new String(msg.key)} topic=${rm.topic()}", msg) } val id = components.kafkaProtocol.messageMatcher.requestMatch(msg) + components.trackersPool .tracker(msg.inputTopic, msg.outputTopic, components.kafkaProtocol.messageMatcher, None) .track( @@ -98,6 +99,7 @@ class KafkaRequestReplyAction[K: ClassTag, V: ClassTag]( session, next, requestNameString, + attributes.silent.getOrElse(false) ) }, e => { diff --git a/src/main/scala/org/galaxio/gatling/kafka/client/KafkaMessageTracker.scala b/src/main/scala/org/galaxio/gatling/kafka/client/KafkaMessageTracker.scala index 2ec0558..a8d04fb 100644 --- a/src/main/scala/org/galaxio/gatling/kafka/client/KafkaMessageTracker.scala +++ b/src/main/scala/org/galaxio/gatling/kafka/client/KafkaMessageTracker.scala @@ -16,6 +16,7 @@ class KafkaMessageTracker(actor: ActorRef) { session: Session, next: Action, requestName: String, + silentRequest: Boolean ): Unit = actor ! MessagePublished( matchId, @@ -25,5 +26,6 @@ class KafkaMessageTracker(actor: ActorRef) { session, next, requestName, + silentRequest ) } diff --git a/src/main/scala/org/galaxio/gatling/kafka/client/KafkaMessageTrackerActor.scala b/src/main/scala/org/galaxio/gatling/kafka/client/KafkaMessageTrackerActor.scala index c8c25e8..9edc1c6 100644 --- a/src/main/scala/org/galaxio/gatling/kafka/client/KafkaMessageTrackerActor.scala +++ b/src/main/scala/org/galaxio/gatling/kafka/client/KafkaMessageTrackerActor.scala @@ -28,6 +28,7 @@ object KafkaMessageTrackerActor { session: Session, next: Action, requestName: String, + silentRequest: Boolean ) case class MessageConsumed( @@ -36,7 +37,7 @@ object KafkaMessageTrackerActor { message: KafkaProtocolMessage, ) - case object TimeoutScan + private case object TimeoutScan private def makeKeyForSentMessages(m: Array[Byte]): String = Option(m).map(java.util.Base64.getEncoder.encodeToString).getOrElse("") @@ -44,7 +45,7 @@ object KafkaMessageTrackerActor { class KafkaMessageTrackerActor(statsEngine: StatsEngine, clock: Clock) extends Actor with Timers with LazyLogging { import KafkaMessageTrackerActor._ - def triggerPeriodicTimeoutScan( + private def triggerPeriodicTimeoutScan( periodicTimeoutScanTriggered: Boolean, sentMessages: mutable.HashMap[String, MessagePublished], timedOutMessages: mutable.ArrayBuffer[MessagePublished], @@ -70,18 +71,21 @@ class KafkaMessageTrackerActor(statsEngine: StatsEngine, clock: Clock) extends A requestName: String, responseCode: Option[String], message: Option[String], + silentRequest: Boolean ): Unit = { - statsEngine.logResponse( - session.scenario, - session.groups, - requestName, - sent, - received, - status, - responseCode, - message, - ) - next ! session.logGroupRequestTimings(sent, received) + if(! silentRequest) { + statsEngine.logResponse( + session.scenario, + session.groups, + requestName, + sent, + received, + status, + responseCode, + message, + ) + next ! session.logGroupRequestTimings(sent, received) + } else next ! session } /** Processes a matched message @@ -94,6 +98,7 @@ class KafkaMessageTrackerActor(statsEngine: StatsEngine, clock: Clock) extends A message: KafkaProtocolMessage, next: Action, requestName: String, + silentRequest: Boolean, ): Unit = { val (newSession, error) = Check.check(message, session, checks) error match { @@ -107,9 +112,10 @@ class KafkaMessageTrackerActor(statsEngine: StatsEngine, clock: Clock) extends A requestName, message.responseCode, Some(errorMessage), + silentRequest ) case _ => - executeNext(newSession, sent, received, OK, next, requestName, None, None) + executeNext(newSession, sent, received, OK, next, requestName, None, None, silentRequest) } } @@ -130,8 +136,8 @@ class KafkaMessageTrackerActor(statsEngine: StatsEngine, clock: Clock) extends A case MessageConsumed(replyId, received, message) => // if key is missing, message was already acked and is a dup, or request timeout val key = makeKeyForSentMessages(replyId) - sentMessages.remove(key).foreach { case MessagePublished(_, sent, _, checks, session, next, requestName) => - processMessage(session, sent, received, checks, message, next, requestName) + sentMessages.remove(key).foreach { case MessagePublished(_, sent, _, checks, session, next, requestName, silentRequest) => + processMessage(session, sent, received, checks, message, next, requestName, silentRequest) } case TimeoutScan => @@ -142,7 +148,7 @@ class KafkaMessageTrackerActor(statsEngine: StatsEngine, clock: Clock) extends A timedOutMessages += messagePublished } } - for (MessagePublished(matchId, sent, receivedTimeout, _, session, next, requestName) <- timedOutMessages) { + for (MessagePublished(matchId, sent, receivedTimeout, _, session, next, requestName, silentRequest) <- timedOutMessages) { sentMessages.remove(makeKeyForSentMessages(matchId)) executeNext( session.markAsFailed, @@ -153,6 +159,7 @@ class KafkaMessageTrackerActor(statsEngine: StatsEngine, clock: Clock) extends A requestName, None, Some(s"Reply timeout after $receivedTimeout ms"), + silentRequest ) } timedOutMessages.clear()