Skip to content

Commit

Permalink
feat: add silent option
Browse files Browse the repository at this point in the history
  • Loading branch information
jigarkhwar committed May 9, 2024
1 parent e0b414e commit 0a147f5
Show file tree
Hide file tree
Showing 11 changed files with 111 additions and 32 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@ jobs:
zookeeper:
image: zookeeper:latest
env:
ALLOW_ANONYMOUS_LOGIN: yes
ZOO_MY_ID: "1"
ZOO_PORT: "2181"
ZOO_SERVERS: server.1=zoo1:2888:3888
ports:
- '2181:2181'
kafka:
image: wurstmeister/kafka:2.13-2.6.3
image: bitnami/kafka:latest
env:
ALLOW_PLAINTEXT_LISTENER: yes
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_LISTENERS: BROKER://:9092,EXTERNAL://:9093
Expand Down
34 changes: 34 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
services:
zookeeper:
image: bitnami/zookeeper:latest
environment:
ALLOW_ANONYMOUS_LOGIN: yes
ZOO_MY_ID: "1"
ZOO_PORT: "2181"
ZOO_SERVERS: server.1=zoo1:2888:3888
ports:
- '2181:2181'
kafka:
image: bitnami/kafka:latest
environment:
ALLOW_PLAINTEXT_LISTENER: yes
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_LISTENERS: BROKER://:9092,EXTERNAL://:9093
KAFKA_ADVERTISED_LISTENERS: BROKER://kafka:9092,EXTERNAL://localhost:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
KAFKA_BROKER_ID: "1"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
KAFKA_CREATE_TOPICS: "myTopic1:1:1, test.t1:1:1, myTopic2:1:1, test.t2:1:1, myTopic3:1:1, test.t3:1:1"
ports:
- '9092:9092'
- '9093:9093'
schema-registry:
image: confluentinc/cp-schema-registry:7.2.1
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:9092,localhost:9093'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:9094
ports:
- '9094:9094'
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,18 @@ public KafkaRequestBuilderBase(org.galaxio.gatling.kafka.request.builder.KafkaRe
calculateExpression(key),
calculateExpression(payload),
toStaticValueExpression(new RecordHeaders()),
false,
Sender.noSchemaSender()
));
}

public <K, V> RequestBuilder<?, ?> send(K key, V payload, Headers headers) {
public <K, V> RequestBuilder<?, ?> send(K key, V payload, Headers headers, boolean silent) {
return new RequestBuilder<>(
wrapped.send(
calculateExpression(key),
calculateExpression(payload),
toStaticValueExpression(headers),
silent,
Sender.noSchemaSender()
));
}
Expand All @@ -57,11 +59,12 @@ public KafkaRequestBuilderBase(org.galaxio.gatling.kafka.request.builder.KafkaRe
Sender.noSchemaSender()));
}

public <K, V> RequestBuilder<?, ?> send(V payload, Headers headers) {
public <K, V> RequestBuilder<?, ?> send(V payload, Headers headers, boolean silent) {
return new RequestBuilder<>(
wrapped.send(null,
calculateExpression(payload),
toStaticValueExpression(headers),
silent,
Sender.noSchemaSender()));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
package org.galaxio.gatling.kafka.javaapi.request.builder;

import io.gatling.javaapi.core.ActionBuilder;
import org.checkerframework.checker.nullness.qual.NonNull;

import java.util.function.Function;

public class RequestBuilder<K, V> implements ActionBuilder {

private final org.galaxio.gatling.kafka.request.builder.RequestBuilder<K, V> wrapped;

@NonNull
public RequestBuilder<K, V> silent() {
return make(org.galaxio.gatling.kafka.request.builder.RequestBuilder::silent);
}

@NonNull
public RequestBuilder<K, V> notSilent() {
return make(org.galaxio.gatling.kafka.request.builder.RequestBuilder::silent);
}

public RequestBuilder(org.galaxio.gatling.kafka.request.builder.RequestBuilder<K, V> wrapped) {
this.wrapped = wrapped;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import io.gatling.core.structure.ScenarioContext
import io.gatling.core.util.NameGen
import org.galaxio.gatling.kafka.KafkaCheck
import org.galaxio.gatling.kafka.protocol.KafkaProtocol
import org.galaxio.gatling.kafka.request.builder.{KafkaRequestBuilder, KafkaRequestReplyAttributes}
import org.galaxio.gatling.kafka.request.builder.{KafkaRequestReplyAttributes, RequestBuilder}

import scala.reflect.ClassTag

Expand All @@ -32,4 +32,5 @@ case class KafkaRequestReplyActionBuilder[K: ClassTag, V: ClassTag](attributes:
ctx.coreComponents.throttler,
)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,44 +14,45 @@ case class KafkaRequestBuilderBase(requestName: Expression[String]) {
key: Expression[K],
payload: Expression[V],
headers: Expression[Headers] = List.empty[Header],
silent: Boolean = false,
)(implicit
sender: Sender[K, V],
): RequestBuilder[K, V] = {
if (key == null)
sender.send(requestName, None, payload, Some(headers))
sender.send(requestName, None, payload, Some(headers), Some(silent))
else
sender.send(requestName, Some(key), payload, Some(headers))
sender.send(requestName, Some(key), payload, Some(headers), Some(silent))
}

def send[V](payload: Expression[V])(implicit
sender: Sender[Nothing, V],
): RequestBuilder[_, V] =
sender.send(requestName = requestName, key = None, payload = payload, headers = None)
sender.send(requestName = requestName, key = None, payload = payload, headers = None, silent = None)

def requestReply: ReqRepBase.type = ReqRepBase

object ReqRepBase {

case class RROutTopicStep(inputTopic: Expression[String], outputTopic: Expression[String]) {

def send[K: Serde: ClassTag, V: Serde: ClassTag](
key: Expression[K],
payload: Expression[V],
headers: Expression[Headers] = List.empty[Header].expressionSuccess,
): KafkaRequestReplyActionBuilder[K, V] = {
KafkaRequestReplyActionBuilder[K, V](
new KafkaRequestReplyAttributes[K, V](
requestName = requestName,
inputTopic = inputTopic,
outputTopic = outputTopic,
key = key,
value = payload,
headers = Some(headers),
keySerializer = implicitly[Serde[K]].serializer(),
valueSerializer = implicitly[Serde[V]].serializer(),
checks = List.empty,
silent = None,
),
)
}
): KafkaRequestReplyActionBuilder[K, V] = KafkaRequestReplyActionBuilder[K, V](
new KafkaRequestReplyAttributes[K, V](
requestName = requestName,
inputTopic = inputTopic,
outputTopic = outputTopic,
key = key,
value = payload,
headers = Some(headers),
keySerializer = implicitly[Serde[K]].serializer(),
valueSerializer = implicitly[Serde[V]].serializer(),
checks = List.empty,
silent = None,
),
)
}

case class RRInTopicStep(inputTopic: Expression[String]) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ trait LowPriorSender {
key: Option[Expression[K]],
payload: Expression[V],
headers: Option[Expression[Headers]],
silent: Option[Boolean],
): RequestBuilder[K, V] =
KafkaRequestBuilder[K, V](
KafkaAttributes(requestName = requestName, key = key, payload = payload, headers = headers, silent = None),
KafkaAttributes(requestName = requestName, key = key, payload = payload, headers = headers, silent = silent),
)

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ trait Sender[K, V] {
key: Option[Expression[K]],
payload: Expression[V],
headers: Option[Expression[Headers]],
silent: Option[Boolean],
): RequestBuilder[K, V]

}
Expand Down Expand Up @@ -65,6 +66,7 @@ object Sender extends LowPriorSender {
key: Option[Expression[K]],
payload: Expression[V],
headers: Option[Expression[Headers]],
silent: Option[Boolean],
): RequestBuilder[K, V] =
new KafkaAvro4sRequestBuilder[K, V](
Avro4sAttributes(
Expand All @@ -75,7 +77,7 @@ object Sender extends LowPriorSender {
format = format,
fromRecord = fromRecord,
headers = headers,
silent = None,
silent = silent,
),
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ public class ProducerSimulation extends Simulation {

private final KafkaProtocolBuilder kafkaConsumerConf =
KafkaDsl.kafka().topic("test.topic")
.properties(Map.of(ProducerConfig.ACKS_CONFIG, "1"));
.properties(Map.of(ProducerConfig.ACKS_CONFIG, "1"));

private final ScenarioBuilder scn = scenario("Basic")
.exec(KafkaDsl.kafka("BasicRequest").send("foo"))
.exec(KafkaDsl.kafka("BasicRequest SILENT").send("foo")
.silent())
.exec(KafkaDsl.kafka("dld").send("true", 12.0));

}
Original file line number Diff line number Diff line change
Expand Up @@ -160,21 +160,22 @@ class KafkaGatlingTest extends Simulation {
.send[Any, String](null, "nullkey"),
)
.exec(
kafka("Request String")
kafka("Request String SILENT")
.send[String]("foo")
.silent,
)

val scn: ScenarioBuilder = scenario("Request String")
.exec(kafka("Request String 2").send[String, String]("testCheckJson", """{ "m": "dkf" }"""))
.exec(kafka("Request String 2 SILENT").send[String, String]("testCheckJson", """{ "m": "dkf" }""").silent)

val scn2: ScenarioBuilder = scenario("Request Byte")
.exec(
kafka("Request Byte")
.send[Array[Byte], Array[Byte]]("key".getBytes(), "tstBytes".getBytes()),
)
.exec(
kafka("Request Byte")
kafka("Request Byte SILENT")
.send[Array[Byte], Array[Byte]]("key".getBytes(), "tstBytes".getBytes())
.silent,
)
Expand All @@ -187,19 +188,27 @@ class KafkaGatlingTest extends Simulation {
.send[Array[Byte], Array[Byte]]("test".getBytes(), "tstBytes".getBytes())
.check(bodyBytes.is("tstBytes".getBytes()).saveAs("bodyInfo")),
)
.exec(
kafka("Request Reply Bytes SILENT").requestReply
.requestTopic("myTopic2")
.replyTopic("test.t2")
.send[Array[Byte], Array[Byte]]("test".getBytes(), "tstBytes".getBytes())
.silent
.check(bodyBytes.is("tstBytes".getBytes()).saveAs("bodyInfo")),
)

val scnAvro4s: ScenarioBuilder = scenario("Request Avro4s")
.exec(
kafka("Request Simple Avro4s")
.send(Ingredient("Cheese", 1d, 50d)),
.send[Ingredient](Ingredient("Cheese", 1d, 50d)),
)
.exec(
kafka("Request Avro4s")
.send[String, Ingredient]("key4s", Ingredient("Cheese", 0d, 70d)),
)
.exec(
kafka("Request Simple Avro4s")
.send(Ingredient("Cheese", 1d, 50d))
kafka("Request Simple Avro4s SILENT")
.send[Ingredient](Ingredient("Cheese", 1d, 50d))
.silent,
)

Expand All @@ -210,6 +219,13 @@ class KafkaGatlingTest extends Simulation {
.replyTopic("test.t2")
.send[Array[Byte], Array[Byte]]("testWO".getBytes(), "tstBytesWO".getBytes()),
)
.exec(
kafka("Request Reply Bytes wo SILENT").requestReply
.requestTopic("myTopic2")
.replyTopic("test.t2")
.send[Array[Byte], Array[Byte]]("testWO".getBytes(), "tstBytesWO".getBytes())
.silent,
)

setUp(
scnRR.inject(atOnceUsers(1)).protocols(kafkaProtocolRRString),
Expand All @@ -220,7 +236,7 @@ class KafkaGatlingTest extends Simulation {
scnRRwo.inject(atOnceUsers(1)).protocols(kafkaProtocolRRBytes2),
scnwokey.inject(nothingFor(1), atOnceUsers(1)).protocols(kafkaConfwoKey),
).assertions(
global.failedRequests.percent.lt(15.0),
global.failedRequests.percent.lt(17.0),
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ class KafkaJavaapiMethodsGatlingTest extends Simulation {
.send("testJavaWithHeadersWithoutKey", new RecordHeaders().add("test-header", "test_value".getBytes()))
.asScala(),
)
.exec(
kafka("Request String without headers and key SILENT")
.send("testJavaWithoutKeyAndHeaders")
.silent()
.asScala(),
)
.inject(nothingFor(1), atOnceUsers(1))
.protocols(kafkaConfwoKey),
)
Expand Down

0 comments on commit 0a147f5

Please sign in to comment.