Skip to content

Commit

Permalink
fix: silent option for request/reply
Browse files Browse the repository at this point in the history
  • Loading branch information
red-bashmak authored and jigarkhwar committed May 9, 2024
1 parent 18ba78c commit f130728
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 18 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,5 @@ project/plugins/project/

/.idea/
/.bsp/
.DS_Store
.DS_Store
results/
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class KafkaRequestReplyAction[K: ClassTag, V: ClassTag](
logMessage(s"Record sent user=${session.userId} key=${new String(msg.key)} topic=${rm.topic()}", msg)
}
val id = components.kafkaProtocol.messageMatcher.requestMatch(msg)

components.trackersPool
.tracker(msg.inputTopic, msg.outputTopic, components.kafkaProtocol.messageMatcher, None)
.track(
Expand All @@ -98,6 +99,7 @@ class KafkaRequestReplyAction[K: ClassTag, V: ClassTag](
session,
next,
requestNameString,
attributes.silent.getOrElse(false)
)
},
e => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class KafkaMessageTracker(actor: ActorRef) {
session: Session,
next: Action,
requestName: String,
silentRequest: Boolean
): Unit =
actor ! MessagePublished(
matchId,
Expand All @@ -25,5 +26,6 @@ class KafkaMessageTracker(actor: ActorRef) {
session,
next,
requestName,
silentRequest
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ object KafkaMessageTrackerActor {
session: Session,
next: Action,
requestName: String,
silentRequest: Boolean
)

case class MessageConsumed(
Expand All @@ -36,15 +37,15 @@ object KafkaMessageTrackerActor {
message: KafkaProtocolMessage,
)

case object TimeoutScan
private case object TimeoutScan

private def makeKeyForSentMessages(m: Array[Byte]): String =
Option(m).map(java.util.Base64.getEncoder.encodeToString).getOrElse("")
}

class KafkaMessageTrackerActor(statsEngine: StatsEngine, clock: Clock) extends Actor with Timers with LazyLogging {
import KafkaMessageTrackerActor._
def triggerPeriodicTimeoutScan(
private def triggerPeriodicTimeoutScan(
periodicTimeoutScanTriggered: Boolean,
sentMessages: mutable.HashMap[String, MessagePublished],
timedOutMessages: mutable.ArrayBuffer[MessagePublished],
Expand All @@ -70,18 +71,21 @@ class KafkaMessageTrackerActor(statsEngine: StatsEngine, clock: Clock) extends A
requestName: String,
responseCode: Option[String],
message: Option[String],
silentRequest: Boolean
): Unit = {
statsEngine.logResponse(
session.scenario,
session.groups,
requestName,
sent,
received,
status,
responseCode,
message,
)
next ! session.logGroupRequestTimings(sent, received)
if(! silentRequest) {
statsEngine.logResponse(
session.scenario,
session.groups,
requestName,
sent,
received,
status,
responseCode,
message,
)
next ! session.logGroupRequestTimings(sent, received)
} else next ! session
}

/** Processes a matched message
Expand All @@ -94,6 +98,7 @@ class KafkaMessageTrackerActor(statsEngine: StatsEngine, clock: Clock) extends A
message: KafkaProtocolMessage,
next: Action,
requestName: String,
silentRequest: Boolean,
): Unit = {
val (newSession, error) = Check.check(message, session, checks)
error match {
Expand All @@ -107,9 +112,10 @@ class KafkaMessageTrackerActor(statsEngine: StatsEngine, clock: Clock) extends A
requestName,
message.responseCode,
Some(errorMessage),
silentRequest
)
case _ =>
executeNext(newSession, sent, received, OK, next, requestName, None, None)
executeNext(newSession, sent, received, OK, next, requestName, None, None, silentRequest)
}
}

Expand All @@ -130,8 +136,8 @@ class KafkaMessageTrackerActor(statsEngine: StatsEngine, clock: Clock) extends A
case MessageConsumed(replyId, received, message) =>
// if key is missing, message was already acked and is a dup, or request timeout
val key = makeKeyForSentMessages(replyId)
sentMessages.remove(key).foreach { case MessagePublished(_, sent, _, checks, session, next, requestName) =>
processMessage(session, sent, received, checks, message, next, requestName)
sentMessages.remove(key).foreach { case MessagePublished(_, sent, _, checks, session, next, requestName, silentRequest) =>
processMessage(session, sent, received, checks, message, next, requestName, silentRequest)
}

case TimeoutScan =>
Expand All @@ -142,7 +148,7 @@ class KafkaMessageTrackerActor(statsEngine: StatsEngine, clock: Clock) extends A
timedOutMessages += messagePublished
}
}
for (MessagePublished(matchId, sent, receivedTimeout, _, session, next, requestName) <- timedOutMessages) {
for (MessagePublished(matchId, sent, receivedTimeout, _, session, next, requestName, silentRequest) <- timedOutMessages) {
sentMessages.remove(makeKeyForSentMessages(matchId))
executeNext(
session.markAsFailed,
Expand All @@ -153,6 +159,7 @@ class KafkaMessageTrackerActor(statsEngine: StatsEngine, clock: Clock) extends A
requestName,
None,
Some(s"Reply timeout after $receivedTimeout ms"),
silentRequest
)
}
timedOutMessages.clear()
Expand Down

0 comments on commit f130728

Please sign in to comment.