Skip to content

Commit

Permalink
chore: Add Kafka testcontainers support
Browse files Browse the repository at this point in the history
  • Loading branch information
christophd committed Mar 13, 2023
1 parent f0f7bd2 commit caa44e8
Show file tree
Hide file tree
Showing 10 changed files with 399 additions and 3 deletions.
37 changes: 37 additions & 0 deletions examples/kafka/kafka-source-binding.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# ---------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ---------------------------------------------------------------------------

apiVersion: camel.apache.org/v1alpha1
kind: KameletBinding
metadata:
name: kafka-source-binding
spec:
source:
ref:
kind: Kamelet
apiVersion: camel.apache.org/v1alpha1
name: kafka-source
properties:
bootstrapServers: ${YAKS_TESTCONTAINERS_KAFKA_LOCAL_BOOTSTRAP_SERVERS}
user: ${user}
password: ${password}
topic: ${topic}
securityProtocol: ${securityProtocol}
deserializeHeaders: ${deserializeHeaders}
sink:
uri: yaks:resolveURL('test-service')/result

63 changes: 63 additions & 0 deletions examples/kafka/kafka-source.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# ---------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ---------------------------------------------------------------------------

Feature: Kafka Kamelet source

Background:
Given variable user is ""
Given variable password is ""
Given variables
| securityProtocol | PLAINTEXT |
| deserializeHeaders | true |
| topic | my-topic |
| source | Kafka Kamelet source |
| message | Camel K rocks! |
Given Kafka topic: ${topic}
Given Kafka topic partition: 0
Given HTTP server timeout is 15000 ms
Given HTTP server "test-service"

Scenario: Create infrastructure
Given start Kafka container
Given create Kubernetes service test-service with target port 8080

Scenario: Create Kamelet binding
When load KameletBinding kafka-source-binding.yaml
Then Camel K integration kafka-source-binding should be running
And Camel K integration kafka-source-binding should print Subscribing ${topic}-Thread 0 to topic ${topic}
And sleep 10sec

Scenario: Send message to Kafka topic and verify sink output
Given variable key is "citrus:randomNumber(4)"
Given Kafka connection
| url | ${YAKS_TESTCONTAINERS_KAFKA_LOCAL_BOOTSTRAP_SERVERS} |
Given Kafka message key: ${key}
When send Kafka message with body and headers: ${message}
| event-source | ${source} |
Then expect HTTP request body: ${message}
Then expect HTTP request headers
| event-source | ${source} |
| kafka.TOPIC | ${topic} |
| kafka.KEY | ${key} |
| kafka.PARTITION | 0 |
And receive POST /result
And send HTTP 200 OK

Scenario: Remove resources
Given delete KameletBinding kafka-source-binding
And delete Kubernetes service test-service
And stop Kafka container
42 changes: 42 additions & 0 deletions examples/kafka/yaks-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# ---------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ---------------------------------------------------------------------------

config:
runtime:
env:
- name: YAKS_CAMELK_AUTO_REMOVE_RESOURCES
value: false
- name: YAKS_KUBERNETES_AUTO_REMOVE_RESOURCES
value: false
- name: YAKS_JBANG_CAMEL_DUMP_INTEGRATION_OUTPUT
value: true
- name: YAKS_TESTCONTAINERS_AUTO_REMOVE_RESOURCES
value: false
- name: CITRUS_TYPE_CONVERTER
value: camel
settings:
loggers:
- name: INTEGRATION_STATUS
level: INFO
- name: INTEGRATION_LOGS
level: INFO
resources:
- kafka-source-binding.yaml
post:
- name: print dump
if: env:CI=true && failure()
run: kamel dump
5 changes: 5 additions & 0 deletions java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,11 @@
<artifactId>redpanda</artifactId>
<version>${testcontainers.version}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>${testcontainers.version}</version>
</dependency>

<!-- JMS connection factory dependencies in provided scope -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ public void setConnection(DataTable properties) {
kafkaEndpoint.getEndpointConfiguration().setConsumerGroup(context.replaceDynamicContentInString(consumerGroup));
}

@Given("^new (?:Kafka|kafka) connection$")
public void createConnection(DataTable properties) {
setConnection(properties);
kafkaEndpoint = new KafkaEndpoint(kafkaEndpoint.getEndpointConfiguration());
citrus.getCitrusContext().getReferenceResolver().bind(endpointName, kafkaEndpoint);
}

@Given("^(?:Kafka|kafka) producer configuration$")
public void setProducerConfig(DataTable properties) {
Map<String, Object> producerProperties = properties.asMap(String.class, Object.class);
Expand Down
4 changes: 4 additions & 0 deletions java/steps/yaks-testcontainers/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@
<groupId>org.testcontainers</groupId>
<artifactId>redpanda</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
</dependency>

<!-- AWS Localstack -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.citrusframework.yaks.testcontainers;

/**
* @author Christoph Deppisch
*/
public class KafkaSettings {

private static final String KAFKA_PROPERTY_PREFIX = TestContainersSettings.TESTCONTAINERS_PROPERTY_PREFIX + "kafka.";
private static final String KAFKA_ENV_PREFIX = TestContainersSettings.TESTCONTAINERS_ENV_PREFIX + "KAFKA_";

private static final String KAFKA_VERSION_PROPERTY = KAFKA_PROPERTY_PREFIX + "version";
private static final String KAFKA_VERSION_ENV = KAFKA_ENV_PREFIX + "KAFKA_VERSION";
private static final String KAFKA_VERSION_DEFAULT = "7.0.1";

private static final String KAFKA_IMAGE_NAME_PROPERTY = KAFKA_PROPERTY_PREFIX + "image.name";
private static final String KAFKA_IMAGE_NAME_ENV = KAFKA_ENV_PREFIX + "KAFKA_IMAGE_NAME";
private static final String KAFKA_IMAGE_NAME_DEFAULT = "confluentinc/cp-kafka";

private static final String STARTUP_TIMEOUT_PROPERTY = KAFKA_PROPERTY_PREFIX + "startup.timeout";
private static final String STARTUP_TIMEOUT_ENV = KAFKA_ENV_PREFIX + "STARTUP_TIMEOUT";
private static final String STARTUP_TIMEOUT_DEFAULT = "180";

private KafkaSettings() {
// prevent instantiation of utility class
}

/**
* Kafka version setting.
* @return
*/
public static String getImageName() {
return System.getProperty(KAFKA_IMAGE_NAME_PROPERTY,
System.getenv(KAFKA_IMAGE_NAME_ENV) != null ? System.getenv(KAFKA_IMAGE_NAME_ENV) : KAFKA_IMAGE_NAME_DEFAULT);
}

/**
* Kafka version setting.
* @return
*/
public static String getVersion() {
return System.getProperty(KAFKA_VERSION_PROPERTY,
System.getenv(KAFKA_VERSION_ENV) != null ? System.getenv(KAFKA_VERSION_ENV) : KAFKA_VERSION_DEFAULT);
}

/**
* Time in seconds to wait for the container to startup and accept connections.
* @return
*/
public static int getStartupTimeout() {
return Integer.parseInt(System.getProperty(STARTUP_TIMEOUT_PROPERTY,
System.getenv(STARTUP_TIMEOUT_ENV) != null ? System.getenv(STARTUP_TIMEOUT_ENV) : STARTUP_TIMEOUT_DEFAULT));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.citrusframework.yaks.testcontainers;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import com.consol.citrus.Citrus;
import com.consol.citrus.TestCaseRunner;
import com.consol.citrus.annotations.CitrusFramework;
import com.consol.citrus.annotations.CitrusResource;
import com.consol.citrus.context.TestContext;
import io.cucumber.datatable.DataTable;
import io.cucumber.java.Before;
import io.cucumber.java.Scenario;
import io.cucumber.java.en.Given;
import org.citrusframework.yaks.YaksSettings;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import static com.consol.citrus.container.FinallySequence.Builder.doFinally;
import static java.time.temporal.ChronoUnit.SECONDS;

public class KafkaSteps {

@CitrusFramework
private Citrus citrus;

@CitrusResource
private TestCaseRunner runner;

@CitrusResource
private TestContext context;

private String kafkaVersion = KafkaSettings.getVersion();

private KafkaContainer kafkaContainer;

private int startupTimeout = KafkaSettings.getStartupTimeout();

private Map<String, String> env = new HashMap<>();

@Before
public void before(Scenario scenario) {
if (kafkaContainer == null && citrus.getCitrusContext().getReferenceResolver().isResolvable(KafkaContainer.class)) {
kafkaContainer = citrus.getCitrusContext().getReferenceResolver().resolve("kafkaContainer", KafkaContainer.class);
setConnectionSettings(kafkaContainer, context);
}
}

@Given("^Kafka container version (^\\s+)$")
public void setKafkaVersion(String version) {
this.kafkaVersion = version;
}

@Given("^Kafka container startup timeout is (\\d+)(?: s| seconds)$")
public void setStartupTimeout(int timeout) {
this.startupTimeout = timeout;
}

@Given("^Kafka container env settings$")
public void setEnvSettings(DataTable settings) {
this.env.putAll(settings.asMap());
}

@Given("^start Kafka container$")
public void startKafka() {
kafkaContainer = new KafkaContainer(DockerImageName.parse(KafkaSettings.getImageName()).withTag(kafkaVersion))
.withLabel("app", "yaks")
.withLabel("app.kubernetes.io/name", "kafka")
.withLabel("app.kubernetes.io/part-of", TestContainersSettings.getTestName())
.withLabel("app.openshift.io/connects-to", TestContainersSettings.getTestId())
.withNetworkAliases("kafka")
.withEnv(env)
.withStartupTimeout(Duration.of(startupTimeout, SECONDS));

kafkaContainer.start();

citrus.getCitrusContext().bind("kafkaContainer", kafkaContainer);

setConnectionSettings(kafkaContainer, context);

if (TestContainersSteps.autoRemoveResources) {
runner.run(doFinally()
.actions(context -> kafkaContainer.stop()));
}
}

@Given("^stop Kafka container$")
public void stopKafka() {
if (kafkaContainer != null) {
kafkaContainer.stop();
}

env = new HashMap<>();
}

/**
* Sets the connection settings in current test context in the form of test variables.
* @param kafkaContainer
* @param context
*/
private void setConnectionSettings(KafkaContainer kafkaContainer, TestContext context) {
if (!kafkaContainer.isRunning()) {
return;
}

String containerId = kafkaContainer.getContainerId().substring(0, 12);

context.setVariable(TestContainersSteps.TESTCONTAINERS_VARIABLE_PREFIX + "KAFKA_HOST", kafkaContainer.getHost());
context.setVariable(TestContainersSteps.TESTCONTAINERS_VARIABLE_PREFIX + "KAFKA_CONTAINER_IP", kafkaContainer.getHost());
context.setVariable(TestContainersSteps.TESTCONTAINERS_VARIABLE_PREFIX + "KAFKA_CONTAINER_ID", containerId);
context.setVariable(TestContainersSteps.TESTCONTAINERS_VARIABLE_PREFIX + "KAFKA_CONTAINER_NAME", kafkaContainer.getContainerName());
context.setVariable(TestContainersSteps.TESTCONTAINERS_VARIABLE_PREFIX + "KAFKA_SERVICE_PORT", String.valueOf(kafkaContainer.getMappedPort(KafkaContainer.KAFKA_PORT)));
context.setVariable(TestContainersSteps.TESTCONTAINERS_VARIABLE_PREFIX + "KAFKA_PORT", String.valueOf(kafkaContainer.getMappedPort(KafkaContainer.KAFKA_PORT)));
context.setVariable(TestContainersSteps.TESTCONTAINERS_VARIABLE_PREFIX + "KAFKA_LOCAL_BOOTSTRAP_SERVERS", kafkaContainer.getBootstrapServers());

if (YaksSettings.isLocal()) {
context.setVariable(TestContainersSteps.TESTCONTAINERS_VARIABLE_PREFIX + "KAFKA_SERVICE_NAME", "kafka");
context.setVariable(TestContainersSteps.TESTCONTAINERS_VARIABLE_PREFIX + "KAFKA_BOOTSTRAP_SERVERS", kafkaContainer.getBootstrapServers());
} else {
context.setVariable(TestContainersSteps.TESTCONTAINERS_VARIABLE_PREFIX + "KAFKA_SERVICE_NAME", String.format("kd-%s", containerId));
context.setVariable(TestContainersSteps.TESTCONTAINERS_VARIABLE_PREFIX + "KAFKA_BOOTSTRAP_SERVERS", String.format("kd-%s:%s", containerId, kafkaContainer.getMappedPort(KafkaContainer.KAFKA_PORT)));
}

context.setVariable(TestContainersSteps.TESTCONTAINERS_VARIABLE_PREFIX + "KAFKA_KUBE_DOCK_HOST", String.format("kd-%s", containerId));
}
}
Loading

0 comments on commit caa44e8

Please sign in to comment.