Skip to content

Commit

Permalink
feat: add silent option
Browse files Browse the repository at this point in the history
  • Loading branch information
jigarkhwar committed Apr 30, 2024
1 parent b7d75c1 commit 4c20c02
Show file tree
Hide file tree
Showing 16 changed files with 137 additions and 39 deletions.
3 changes: 2 additions & 1 deletion publish.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ ThisBuild / versionScheme := Some("semver-spec")
ThisBuild / organization := "org.galaxio"
ThisBuild / organizationName := "Galaxio Team"
ThisBuild / organizationHomepage := Some(url("https://github.com/galax-io"))
ThisBuild / description := "Plugin to support kafka performance testing in Gatling."

ThisBuild / homepage := Some(url("https://github.com/galax-io/gatling-kafka-plugin"))
ThisBuild / scmInfo := Some(
ScmInfo(
Expand All @@ -21,6 +21,7 @@ ThisBuild / developers := List(
)

// Remove all additional repository other than Maven Central from POM
ThisBuild / pomIncludeRepository := { _ => false }
ThisBuild / publishTo := {
val nexus = "https://s01.oss.sonatype.org/"
if (isSnapshot.value) Some("snapshots" at nexus + "content/repositories/snapshots")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,10 @@ public KafkaRequestBuilderBase(org.galaxio.gatling.kafka.request.builder.KafkaRe

public <K, V> RequestBuilder<?, ?> send(V payload, Headers headers) {
return new RequestBuilder<>(
wrapped.send(
null,
wrapped.send(null,
calculateExpression(payload),
toStaticValueExpression(headers),
Sender.noSchemaSender()
));
Sender.noSchemaSender()));
}

public ReqRepBase requestReply() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,24 @@

import io.gatling.javaapi.core.ActionBuilder;

import java.util.function.Function;

public class RequestBuilder<K, V> implements ActionBuilder {

private final org.galaxio.gatling.kafka.request.builder.RequestBuilder<K, V> wrapped;

public RequestBuilder(org.galaxio.gatling.kafka.request.builder.RequestBuilder<K,V> wrapped) {
public RequestBuilder(org.galaxio.gatling.kafka.request.builder.RequestBuilder<K, V> wrapped) {
this.wrapped = wrapped;
}

protected RequestBuilder<K, V> make(Function<
org.galaxio.gatling.kafka.request.builder.RequestBuilder<K, V>,
org.galaxio.gatling.kafka.request.builder.RequestBuilder<K, V>>
f) {
return new RequestBuilder<>(f.apply(wrapped));

}

@Override
public io.gatling.core.action.builder.ActionBuilder asScala() {
return wrapped.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.galaxio.gatling.kafka.actions

import com.softwaremill.quicklens._
import io.gatling.core.action.Action
import io.gatling.core.action.builder.ActionBuilder
import io.gatling.core.structure.ScenarioContext
Expand All @@ -9,7 +10,7 @@ import org.galaxio.gatling.kafka.request.builder.KafkaAttributes

import scala.jdk.CollectionConverters._

class KafkaRequestActionBuilder[K, V](attr: KafkaAttributes[K, V]) extends ActionBuilder {
case class KafkaRequestActionBuilder[K, V](attributes: KafkaAttributes[K, V]) extends ActionBuilder {

override def build(ctx: ScenarioContext, next: Action): Action = {

Expand All @@ -24,7 +25,7 @@ class KafkaRequestActionBuilder[K, V](attr: KafkaAttributes[K, V]) extends Actio

new KafkaRequestAction(
producer,
attr,
attributes,
coreComponents,
kafkaComponents.kafkaProtocol,
throttled,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.galaxio.gatling.kafka.actions

import com.softwaremill.quicklens._
import io.gatling.core.action.Action
import io.gatling.core.action.builder.ActionBuilder
import io.gatling.core.structure.ScenarioContext
Expand All @@ -10,8 +11,10 @@ import org.galaxio.gatling.kafka.protocol.KafkaProtocol
import org.galaxio.gatling.kafka.request.builder.Avro4sAttributes

import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag

case class KafkaRequestAvro4sActionBuilder[K: ClassTag, V: ClassTag](attributes: Avro4sAttributes[K, V]) extends ActionBuilder with NameGen {

class KafkaRequestAvro4sActionBuilder[K, V](attr: Avro4sAttributes[K, V]) extends ActionBuilder with NameGen {
override def build(ctx: ScenarioContext, next: Action): Action = {
import ctx._

Expand All @@ -23,7 +26,7 @@ class KafkaRequestAvro4sActionBuilder[K, V](attr: Avro4sAttributes[K, V]) extend

new KafkaAvro4sRequestAction(
producer,
attr,
attributes,
coreComponents,
kafkaComponents.kafkaProtocol,
throttled,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
package org.galaxio.gatling.kafka.actions

import com.softwaremill.quicklens.ModifyPimp
import com.softwaremill.quicklens._
import io.gatling.core.action.Action
import io.gatling.core.action.builder.ActionBuilder
import io.gatling.core.structure.ScenarioContext
import io.gatling.core.util.NameGen
import org.galaxio.gatling.kafka.KafkaCheck
import org.galaxio.gatling.kafka.protocol.KafkaProtocol
import org.galaxio.gatling.kafka.request.builder.KafkaRequestReplyAttributes
import org.galaxio.gatling.kafka.request.builder.{KafkaRequestBuilder, KafkaRequestReplyAttributes}

import scala.reflect.ClassTag

case class KafkaRequestReplyActionBuilder[K: ClassTag, V: ClassTag](attributes: KafkaRequestReplyAttributes[K, V])
extends ActionBuilder {
extends ActionBuilder with NameGen {

def silent: KafkaRequestReplyActionBuilder[K, V] = this.modify(_.attributes.silent).setTo(Some(true))

def notSilent: KafkaRequestReplyActionBuilder[K, V] = this.modify(_.attributes.silent).setTo(Some(false))

def check(checks: KafkaCheck*): KafkaRequestReplyActionBuilder[K, V] =
this.modify(_.attributes.checks).using(_ ::: checks.toList)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ case class Avro4sAttributes[K, V](
format: RecordFormat[V],
fromRecord: FromRecord[V],
headers: Option[Expression[Headers]],
silent: Option[Boolean],
)
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ case class KafkaAttributes[K, V](
key: Option[Expression[K]],
payload: Expression[V],
headers: Option[Expression[Headers]],
silent: Option[Boolean],
)
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
package org.galaxio.gatling.kafka.request.builder

import com.softwaremill.quicklens.ModifyPimp
import io.gatling.core.action.builder.ActionBuilder
import org.galaxio.gatling.kafka.actions.KafkaRequestAvro4sActionBuilder

case class KafkaAvro4sRequestBuilder[K, V](attr: Avro4sAttributes[K, V]) extends RequestBuilder[K, V] {
import scala.reflect.ClassTag

def build: ActionBuilder = new KafkaRequestAvro4sActionBuilder(attr)
case class KafkaAvro4sRequestBuilder[K: ClassTag, V: ClassTag](attributes: Avro4sAttributes[K, V])
extends RequestBuilder[K, V] {

def build: ActionBuilder = KafkaRequestAvro4sActionBuilder(attributes)

override def silent: RequestBuilder[K, V] = this.modify(_.attributes.silent).setTo(Some(true))

override def notSilent: RequestBuilder[K, V] = this.modify(_.attributes.silent).setTo(Some(false))
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package org.galaxio.gatling.kafka.request.builder

import com.softwaremill.quicklens.ModifyPimp
import io.gatling.core.action.builder.ActionBuilder
import org.galaxio.gatling.kafka.actions.KafkaRequestActionBuilder

case class KafkaRequestBuilder[K, V](attr: KafkaAttributes[K, V]) extends RequestBuilder[K, V] {
case class KafkaRequestBuilder[K, V](attributes: KafkaAttributes[K, V]) extends RequestBuilder[K, V] {

def build: ActionBuilder = new KafkaRequestActionBuilder(attr)
def silent: KafkaRequestBuilder[K, V] = this.modify(_.attributes.silent).setTo(Some(true))

def notSilent: KafkaRequestBuilder[K, V] = this.modify(_.attributes.silent).setTo(Some(false))

def build: ActionBuilder = KafkaRequestActionBuilder(attributes)

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ case class KafkaRequestBuilderBase(requestName: Expression[String]) {
sender.send(requestName, Some(key), payload, Some(headers))
}

def send[V](payload: Expression[V])(implicit sender: Sender[Nothing, V]): RequestBuilder[_, V] =
sender.send(requestName, None, payload)
def send[V](payload: Expression[V])(implicit
sender: Sender[Nothing, V],
): RequestBuilder[_, V] =
sender.send(requestName = requestName, key = None, payload = payload, headers = None)

def requestReply: ReqRepBase.type = ReqRepBase

Expand All @@ -37,22 +39,24 @@ case class KafkaRequestBuilderBase(requestName: Expression[String]) {
): KafkaRequestReplyActionBuilder[K, V] = {
KafkaRequestReplyActionBuilder[K, V](
new KafkaRequestReplyAttributes[K, V](
requestName,
inputTopic,
outputTopic,
key,
payload,
Some(headers),
implicitly[Serde[K]].serializer(),
implicitly[Serde[V]].serializer(),
List.empty,
requestName = requestName,
inputTopic = inputTopic,
outputTopic = outputTopic,
key = key,
value = payload,
headers = Some(headers),
keySerializer = implicitly[Serde[K]].serializer(),
valueSerializer = implicitly[Serde[V]].serializer(),
checks = List.empty,
silent = None,
),
)
}
}

case class RRInTopicStep(inputTopic: Expression[String]) {
def replyTopic(outputTopic: Expression[String]): RROutTopicStep = RROutTopicStep(inputTopic, outputTopic)

}
def requestTopic(rt: Expression[String]): RRInTopicStep = RRInTopicStep(rt)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ case class KafkaRequestReplyAttributes[K, V](
keySerializer: Serializer[K],
valueSerializer: Serializer[V],
checks: List[KafkaCheck],
silent: Option[Boolean],
)
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,28 @@ trait LowPriorSender {
new Sender[K, V] {

override def send(requestName: Expression[String], payload: Expression[V]): RequestBuilder[Nothing, V] =
KafkaRequestBuilder[Nothing, V](KafkaAttributes(requestName, None, payload, None))
KafkaRequestBuilder[Nothing, V](
KafkaAttributes(requestName = requestName, key = None, payload = payload, headers = None, silent = None),
)

override def send(
requestName: Expression[String],
key: Option[Expression[K]],
payload: Expression[V],
): RequestBuilder[K, V] =
KafkaRequestBuilder[K, V](KafkaAttributes(requestName, key, payload, None))
KafkaRequestBuilder[K, V](
KafkaAttributes(requestName = requestName, key = key, payload = payload, headers = None, silent = None),
)

override def send(
requestName: Expression[String],
key: Option[Expression[K]],
payload: Expression[V],
headers: Option[Expression[Headers]],
): RequestBuilder[K, V] =
KafkaRequestBuilder[K, V](KafkaAttributes(requestName, key, payload, headers))
KafkaRequestBuilder[K, V](
KafkaAttributes(requestName = requestName, key = key, payload = payload, headers = headers, silent = None),
)

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,9 @@ import io.gatling.core.action.builder.ActionBuilder
trait RequestBuilder[K, V] {

def build: ActionBuilder

def silent: RequestBuilder[K, V]

def notSilent: RequestBuilder[K, V]

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import com.sksamuel.avro4s.{FromRecord, RecordFormat, SchemaFor}
import io.gatling.core.session.Expression
import org.apache.kafka.common.header.Headers

import scala.reflect.ClassTag

trait Sender[K, V] {

def send(requestName: Expression[String], payload: Expression[V]): RequestBuilder[Nothing, V]
Expand All @@ -21,30 +23,64 @@ trait Sender[K, V] {

object Sender extends LowPriorSender {

implicit def Avro4sSender[K, V](implicit
implicit def Avro4sSender[K: ClassTag, V: ClassTag](implicit
schema: SchemaFor[V],
format: RecordFormat[V],
fromRecord: FromRecord[V],
headers: Headers,
): Sender[K, V] = new Sender[K, V] {

override def send(requestName: Expression[String], payload: Expression[V]): RequestBuilder[Nothing, V] =
new KafkaAvro4sRequestBuilder[Nothing, V](Avro4sAttributes(requestName, None, payload, schema, format, fromRecord, None))
new KafkaAvro4sRequestBuilder[Nothing, V](
Avro4sAttributes(
requestName = requestName,
key = None,
payload = payload,
schema = schema,
format = format,
fromRecord = fromRecord,
headers = None,
silent = None,
),
)

override def send(
requestName: Expression[String],
key: Option[Expression[K]],
payload: Expression[V],
): RequestBuilder[K, V] =
new KafkaAvro4sRequestBuilder[K, V](Avro4sAttributes(requestName, key, payload, schema, format, fromRecord, None))
new KafkaAvro4sRequestBuilder[K, V](
Avro4sAttributes(
requestName = requestName,
key = key,
payload = payload,
schema = schema,
format = format,
fromRecord = fromRecord,
headers = None,
silent = None,
),
)

override def send(
requestName: Expression[String],
key: Option[Expression[K]],
payload: Expression[V],
headers: Option[Expression[Headers]],
): RequestBuilder[K, V] =
new KafkaAvro4sRequestBuilder[K, V](Avro4sAttributes(requestName, key, payload, schema, format, fromRecord, headers))
new KafkaAvro4sRequestBuilder[K, V](
Avro4sAttributes(
requestName = requestName,
key = key,
payload = payload,
schema = schema,
format = format,
fromRecord = fromRecord,
headers = headers,
silent = None,
),
)

}

}
Loading

0 comments on commit 4c20c02

Please sign in to comment.