Skip to content

Commit

Permalink
feat: add silent option
Browse files Browse the repository at this point in the history
  • Loading branch information
jigarkhwar committed Apr 30, 2024
1 parent b7d75c1 commit be1a6f0
Show file tree
Hide file tree
Showing 17 changed files with 157 additions and 55 deletions.
9 changes: 5 additions & 4 deletions publish.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ ThisBuild / versionScheme := Some("semver-spec")
ThisBuild / organization := "org.galaxio"
ThisBuild / organizationName := "Galaxio Team"
ThisBuild / organizationHomepage := Some(url("https://github.com/galax-io"))
ThisBuild / description := "Plugin to support kafka performance testing in Gatling."
ThisBuild / homepage := Some(url("https://github.com/galax-io/gatling-kafka-plugin"))
ThisBuild / scmInfo := Some(

ThisBuild / homepage := Some(url("https://github.com/galax-io/gatling-kafka-plugin"))
ThisBuild / scmInfo := Some(
ScmInfo(
url("https://github.com/galax-io/gatling-kafka-plugin"),
"[email protected]:galax-io/gatling-kafka-plugin.git",
Expand All @@ -21,7 +21,8 @@ ThisBuild / developers := List(
)

// Remove all additional repository other than Maven Central from POM
ThisBuild / publishTo := {
ThisBuild / pomIncludeRepository := { _ => false }
ThisBuild / publishTo := {
val nexus = "https://s01.oss.sonatype.org/"
if (isSnapshot.value) Some("snapshots" at nexus + "content/repositories/snapshots")
else Some("releases" at nexus + "service/local/staging/deploy/maven2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,10 @@ public KafkaRequestBuilderBase(org.galaxio.gatling.kafka.request.builder.KafkaRe

public <K, V> RequestBuilder<?, ?> send(V payload, Headers headers) {
return new RequestBuilder<>(
wrapped.send(
null,
wrapped.send(null,
calculateExpression(payload),
toStaticValueExpression(headers),
Sender.noSchemaSender()
));
Sender.noSchemaSender()));
}

public ReqRepBase requestReply() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,24 @@

import io.gatling.javaapi.core.ActionBuilder;

import java.util.function.Function;

public class RequestBuilder<K, V> implements ActionBuilder {

private final org.galaxio.gatling.kafka.request.builder.RequestBuilder<K, V> wrapped;

public RequestBuilder(org.galaxio.gatling.kafka.request.builder.RequestBuilder<K,V> wrapped) {
public RequestBuilder(org.galaxio.gatling.kafka.request.builder.RequestBuilder<K, V> wrapped) {
this.wrapped = wrapped;
}

protected RequestBuilder<K, V> make(Function<
org.galaxio.gatling.kafka.request.builder.RequestBuilder<K, V>,
org.galaxio.gatling.kafka.request.builder.RequestBuilder<K, V>>
f) {
return new RequestBuilder<>(f.apply(wrapped));

}

@Override
public io.gatling.core.action.builder.ActionBuilder asScala() {
return wrapped.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import org.galaxio.gatling.kafka.request.builder.KafkaAttributes

class KafkaRequestAction[K, V](
val producer: KafkaProducer[K, V],
val attr: KafkaAttributes[K, V],
val attributes: KafkaAttributes[K, V],
val coreComponents: CoreComponents,
val kafkaProtocol: KafkaProtocol,
val throttled: Boolean,
Expand All @@ -27,9 +27,9 @@ class KafkaRequestAction[K, V](

override def execute(session: Session): Unit = recover(session) {

attr requestName session flatMap { requestName =>
attributes requestName session flatMap { requestName =>
val outcome =
sendRequest(requestName, producer, attr, throttled, session)
sendRequest(requestName, producer, attributes, throttled, session)

outcome.onFailure(errorMessage =>
statsEngine.reportUnbuildableRequest(session.scenario, session.groups, requestName, errorMessage),
Expand Down Expand Up @@ -58,6 +58,8 @@ class KafkaRequestAction[K, V](
.map(h => h(session).toOption.get)
.orNull

val silent: Boolean = kafkaAttributes.silent.getOrElse(false)

val record = new ProducerRecord[K, V](
kafkaProtocol.producerTopic,
null,
Expand All @@ -74,16 +76,17 @@ class KafkaRequestAction[K, V](

val requestEndDate = clock.nowMillis

statsEngine.logResponse(
session.scenario,
session.groups,
requestName,
startTimestamp = requestStartDate,
endTimestamp = requestEndDate,
if (e == null) OK else KO,
None,
if (e == null) None else Some(e.getMessage),
)
if (!silent)
statsEngine.logResponse(
session.scenario,
session.groups,
requestName,
startTimestamp = requestStartDate,
endTimestamp = requestEndDate,
if (e == null) OK else KO,
None,
if (e == null) None else Some(e.getMessage),
)

coreComponents.throttler match {
case Some(th) if throttled => th.throttle(session.scenario, () => next ! session)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.galaxio.gatling.kafka.actions

import com.softwaremill.quicklens._
import io.gatling.core.action.Action
import io.gatling.core.action.builder.ActionBuilder
import io.gatling.core.structure.ScenarioContext
Expand All @@ -9,7 +10,7 @@ import org.galaxio.gatling.kafka.request.builder.KafkaAttributes

import scala.jdk.CollectionConverters._

class KafkaRequestActionBuilder[K, V](attr: KafkaAttributes[K, V]) extends ActionBuilder {
case class KafkaRequestActionBuilder[K, V](attributes: KafkaAttributes[K, V]) extends ActionBuilder {

override def build(ctx: ScenarioContext, next: Action): Action = {

Expand All @@ -24,7 +25,7 @@ class KafkaRequestActionBuilder[K, V](attr: KafkaAttributes[K, V]) extends Actio

new KafkaRequestAction(
producer,
attr,
attributes,
coreComponents,
kafkaComponents.kafkaProtocol,
throttled,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.galaxio.gatling.kafka.actions

import com.softwaremill.quicklens._
import io.gatling.core.action.Action
import io.gatling.core.action.builder.ActionBuilder
import io.gatling.core.structure.ScenarioContext
Expand All @@ -10,8 +11,11 @@ import org.galaxio.gatling.kafka.protocol.KafkaProtocol
import org.galaxio.gatling.kafka.request.builder.Avro4sAttributes

import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag

case class KafkaRequestAvro4sActionBuilder[K: ClassTag, V: ClassTag](attributes: Avro4sAttributes[K, V])
extends ActionBuilder with NameGen {

class KafkaRequestAvro4sActionBuilder[K, V](attr: Avro4sAttributes[K, V]) extends ActionBuilder with NameGen {
override def build(ctx: ScenarioContext, next: Action): Action = {
import ctx._

Expand All @@ -23,7 +27,7 @@ class KafkaRequestAvro4sActionBuilder[K, V](attr: Avro4sAttributes[K, V]) extend

new KafkaAvro4sRequestAction(
producer,
attr,
attributes,
coreComponents,
kafkaComponents.kafkaProtocol,
throttled,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
package org.galaxio.gatling.kafka.actions

import com.softwaremill.quicklens.ModifyPimp
import com.softwaremill.quicklens._
import io.gatling.core.action.Action
import io.gatling.core.action.builder.ActionBuilder
import io.gatling.core.structure.ScenarioContext
import io.gatling.core.util.NameGen
import org.galaxio.gatling.kafka.KafkaCheck
import org.galaxio.gatling.kafka.protocol.KafkaProtocol
import org.galaxio.gatling.kafka.request.builder.KafkaRequestReplyAttributes
import org.galaxio.gatling.kafka.request.builder.{KafkaRequestBuilder, KafkaRequestReplyAttributes}

import scala.reflect.ClassTag

case class KafkaRequestReplyActionBuilder[K: ClassTag, V: ClassTag](attributes: KafkaRequestReplyAttributes[K, V])
extends ActionBuilder {
extends ActionBuilder with NameGen {

def silent: KafkaRequestReplyActionBuilder[K, V] = this.modify(_.attributes.silent).setTo(Some(true))

def notSilent: KafkaRequestReplyActionBuilder[K, V] = this.modify(_.attributes.silent).setTo(Some(false))

def check(checks: KafkaCheck*): KafkaRequestReplyActionBuilder[K, V] =
this.modify(_.attributes.checks).using(_ ::: checks.toList)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ case class Avro4sAttributes[K, V](
format: RecordFormat[V],
fromRecord: FromRecord[V],
headers: Option[Expression[Headers]],
silent: Option[Boolean],
)
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ case class KafkaAttributes[K, V](
key: Option[Expression[K]],
payload: Expression[V],
headers: Option[Expression[Headers]],
silent: Option[Boolean],
)
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
package org.galaxio.gatling.kafka.request.builder

import com.softwaremill.quicklens.ModifyPimp
import io.gatling.core.action.builder.ActionBuilder
import org.galaxio.gatling.kafka.actions.KafkaRequestAvro4sActionBuilder

case class KafkaAvro4sRequestBuilder[K, V](attr: Avro4sAttributes[K, V]) extends RequestBuilder[K, V] {
import scala.reflect.ClassTag

def build: ActionBuilder = new KafkaRequestAvro4sActionBuilder(attr)
case class KafkaAvro4sRequestBuilder[K: ClassTag, V: ClassTag](attributes: Avro4sAttributes[K, V])
extends RequestBuilder[K, V] {

def build: ActionBuilder = KafkaRequestAvro4sActionBuilder(attributes)

override def silent: RequestBuilder[K, V] = this.modify(_.attributes.silent).setTo(Some(true))

override def notSilent: RequestBuilder[K, V] = this.modify(_.attributes.silent).setTo(Some(false))
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package org.galaxio.gatling.kafka.request.builder

import com.softwaremill.quicklens.ModifyPimp
import io.gatling.core.action.builder.ActionBuilder
import org.galaxio.gatling.kafka.actions.KafkaRequestActionBuilder

case class KafkaRequestBuilder[K, V](attr: KafkaAttributes[K, V]) extends RequestBuilder[K, V] {
case class KafkaRequestBuilder[K, V](attributes: KafkaAttributes[K, V]) extends RequestBuilder[K, V] {

def build: ActionBuilder = new KafkaRequestActionBuilder(attr)
def silent: KafkaRequestBuilder[K, V] = this.modify(_.attributes.silent).setTo(Some(true))

def notSilent: KafkaRequestBuilder[K, V] = this.modify(_.attributes.silent).setTo(Some(false))

def build: ActionBuilder = KafkaRequestActionBuilder(attributes)

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ case class KafkaRequestBuilderBase(requestName: Expression[String]) {
sender.send(requestName, Some(key), payload, Some(headers))
}

def send[V](payload: Expression[V])(implicit sender: Sender[Nothing, V]): RequestBuilder[_, V] =
sender.send(requestName, None, payload)
def send[V](payload: Expression[V])(implicit
sender: Sender[Nothing, V],
): RequestBuilder[_, V] =
sender.send(requestName = requestName, key = None, payload = payload, headers = None)

def requestReply: ReqRepBase.type = ReqRepBase

Expand All @@ -37,22 +39,24 @@ case class KafkaRequestBuilderBase(requestName: Expression[String]) {
): KafkaRequestReplyActionBuilder[K, V] = {
KafkaRequestReplyActionBuilder[K, V](
new KafkaRequestReplyAttributes[K, V](
requestName,
inputTopic,
outputTopic,
key,
payload,
Some(headers),
implicitly[Serde[K]].serializer(),
implicitly[Serde[V]].serializer(),
List.empty,
requestName = requestName,
inputTopic = inputTopic,
outputTopic = outputTopic,
key = key,
value = payload,
headers = Some(headers),
keySerializer = implicitly[Serde[K]].serializer(),
valueSerializer = implicitly[Serde[V]].serializer(),
checks = List.empty,
silent = None,
),
)
}
}

case class RRInTopicStep(inputTopic: Expression[String]) {
def replyTopic(outputTopic: Expression[String]): RROutTopicStep = RROutTopicStep(inputTopic, outputTopic)

}
def requestTopic(rt: Expression[String]): RRInTopicStep = RRInTopicStep(rt)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ case class KafkaRequestReplyAttributes[K, V](
keySerializer: Serializer[K],
valueSerializer: Serializer[V],
checks: List[KafkaCheck],
silent: Option[Boolean],
)
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,28 @@ trait LowPriorSender {
new Sender[K, V] {

override def send(requestName: Expression[String], payload: Expression[V]): RequestBuilder[Nothing, V] =
KafkaRequestBuilder[Nothing, V](KafkaAttributes(requestName, None, payload, None))
KafkaRequestBuilder[Nothing, V](
KafkaAttributes(requestName = requestName, key = None, payload = payload, headers = None, silent = None),
)

override def send(
requestName: Expression[String],
key: Option[Expression[K]],
payload: Expression[V],
): RequestBuilder[K, V] =
KafkaRequestBuilder[K, V](KafkaAttributes(requestName, key, payload, None))
KafkaRequestBuilder[K, V](
KafkaAttributes(requestName = requestName, key = key, payload = payload, headers = None, silent = None),
)

override def send(
requestName: Expression[String],
key: Option[Expression[K]],
payload: Expression[V],
headers: Option[Expression[Headers]],
): RequestBuilder[K, V] =
KafkaRequestBuilder[K, V](KafkaAttributes(requestName, key, payload, headers))
KafkaRequestBuilder[K, V](
KafkaAttributes(requestName = requestName, key = key, payload = payload, headers = headers, silent = None),
)

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,9 @@ import io.gatling.core.action.builder.ActionBuilder
trait RequestBuilder[K, V] {

def build: ActionBuilder

def silent: RequestBuilder[K, V]

def notSilent: RequestBuilder[K, V]

}
Loading

0 comments on commit be1a6f0

Please sign in to comment.