
Add the possibility to match producer and consumed messages using headers instead of just using the message Key. #100

Open
mmartin-paxos opened this issue Oct 14, 2022 · 6 comments

Comments

mmartin-paxos commented Oct 14, 2022

With the current solution, produced and consumed messages can only be matched by the key of the sent message. In our project, however, the key changes internally between the produced and the consumed message. As a result, the current implementation cannot bind a produced message to its consumed counterpart, so Gatling always reports that the flow did not complete successfully.

To solve this, consumed and produced messages could be matched on a customizable header, such as correlation-id.

With a configurable headers option, the test scenario would look like the snippet below:

  val scn: ScenarioBuilder = scenario("Test kafka")
    .feed(feeder)
    .exec(
      kafka("ReqRep")
        .requestReply
        .requestTopic("exp.inbound")
        .replyTopic("exp.outbound", Some("correlation-id"))
        .send[String, String]("test-key", "#{payload}", addHeader("correlation-id", "#{correlationId}"))
    )

We have already created a proof of concept for this configurable headers option.
You can have a look at the last 3 commits of this repo: https://github.com/imaculan/gatling-kafka-plugin/commits/header_extractor.

Thanks

@neoscaler

We would also need this feature, as the keys are not the same in our event flow.

3alster commented Feb 21, 2023

Since #101 you can provide your own matching function (to match input and output messages). Take a look at this example:

  def matchByOwnVal(message: KafkaProtocolMessage): Array[Byte] = {
    // do something with the message and extract the values you are interested in
    // method is called:
    // - for each message which will be sent out
    // - for each message which has been received
    "Custom Message".getBytes // just returning something
  }

  val kafkaProtocolMatchByMessage: KafkaProtocol = kafka.requestReply
    .producerSettings(
      Map(
        ProducerConfig.ACKS_CONFIG -> "1",
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
      ),
    )
    .consumeSettings(
      Map(
        "bootstrap.servers" -> "localhost:9092",
      ),
    )
    .timeout(5.seconds)
    .matchByMessage(matchByOwnVal)

You just need to implement matchByOwnVal so that it extracts a key from the message. That key is then used to match request and reply data.
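For the correlation-id case from the original request, such a function could look roughly like the sketch below. Note that SketchMessage is a hypothetical stand-in defined here so the example is self-contained; the plugin's real KafkaProtocolMessage may expose headers in a different shape, so treat the field access as an assumption.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical stand-in for the plugin's KafkaProtocolMessage (assumption:
// headers are modelled here as a plain Map, the real class may differ).
final case class SketchMessage(
    key: Array[Byte],
    value: Array[Byte],
    headers: Map[String, Array[Byte]]
)

object HeaderMatcher {
  // Returns the bytes of the "correlation-id" header as the match key.
  // Falls back to an empty array when the header is missing, which means
  // all header-less messages would share the same (empty) key.
  def matchByCorrelationId(message: SketchMessage): Array[Byte] =
    message.headers.getOrElse("correlation-id", Array.emptyByteArray)
}
```

The message key is ignored entirely here, so a request with key "k1" and a reply with key "k2" still yield the same match key as long as both carry the same correlation-id header.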

@DevArosan

@3alster Thanks for the suggestion. How does the matchByOwnVal function work? As I understand the implementation, the function is called every time a message is sent or received. But how can I check whether the received message matches the sent message? The matchByOwnVal function only returns an Array[Byte]. What I need is a check that returns true if the received message matches a sent message and false otherwise, in which case the test should fail.

3alster commented Mar 21, 2023

@DevArosan that function should return the key from the message. You should implement it in a way that it returns the same key for your input and output message.
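To illustrate why returning a key is enough and no true/false result is needed, here is a toy model of key-based matching. It is only a sketch and not the plugin's actual implementation: ToyTracker and its methods are hypothetical names. The idea is that the key extracted from each sent message is remembered, and a received message counts as the reply when its extracted key equals a remembered one.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.collection.mutable

// Illustrative only: a toy tracker, not the plugin's implementation.
final class ToyTracker(extractKey: Array[Byte] => Array[Byte]) {
  // Array[Byte] compares by reference, so keys are stored as Seq[Byte],
  // which compares element by element.
  private val pending = mutable.Map.empty[Seq[Byte], Long]

  // Called for every outgoing message: remember its key and send time.
  def onSend(payload: Array[Byte], sentAtMillis: Long): Unit =
    pending.update(extractKey(payload).toSeq, sentAtMillis)

  // Called for every incoming message: returns the elapsed time if the
  // extracted key belongs to a pending request, otherwise None.
  def onReceive(payload: Array[Byte], receivedAtMillis: Long): Option[Long] =
    pending.remove(extractKey(payload).toSeq).map(receivedAtMillis - _)
}
```

In this model a request whose key never appears on the reply side simply stays pending. Presumably that is the case the configured timeout turns into a failed request, but that part is an assumption about the plugin's behaviour.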

DevArosan commented Mar 21, 2023

@3alster Thanks for the quick answer. I don't quite understand. My use case is to match a property of my input message against a property of my output message, because the message keys differ and cannot be used for matching. How should that work? Could you give me an example of what that should look like?

3alster commented Mar 22, 2023

@DevArosan then just return the matching property of your message from matchByOwnVal. The plugin will do the rest.
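As a rough sketch of returning a payload property, assume both the input and the output message carry a flat JSON value with a shared string field. The field name orderId and the PayloadMatcher object are hypothetical and chosen only for illustration; a real test would more likely use a proper JSON parser than a regular expression.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object PayloadMatcher {
  // Hypothetical field name: matches "orderId":"<value>" in a flat JSON payload.
  private val OrderId = "\"orderId\"\\s*:\\s*\"([^\"]*)\"".r

  // Extracts the orderId value as bytes, or an empty array if it is absent.
  def matchByOrderId(value: Array[Byte]): Array[Byte] = {
    val text = new String(value, UTF_8)
    OrderId
      .findFirstMatchIn(text)
      .map(_.group(1).getBytes(UTF_8))
      .getOrElse(Array.emptyByteArray)
  }
}
```

Inside matchByOwnVal this would be applied to the value bytes of the message (assuming the message exposes them), so that the request and the reply produce the same key even though their Kafka keys differ.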
