diff --git a/examples/kafka/kafka-source-binding.yaml b/examples/kafka/kafka-source-binding.yaml new file mode 100644 index 00000000..c32d6851 --- /dev/null +++ b/examples/kafka/kafka-source-binding.yaml @@ -0,0 +1,37 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +apiVersion: camel.apache.org/v1alpha1 +kind: KameletBinding +metadata: + name: kafka-source-binding +spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1alpha1 + name: kafka-source + properties: + bootstrapServers: ${YAKS_TESTCONTAINERS_KAFKA_LOCAL_BOOTSTRAP_SERVERS} + user: ${user} + password: ${password} + topic: ${topic} + securityProtocol: ${securityProtocol} + deserializeHeaders: ${deserializeHeaders} + sink: + uri: yaks:resolveURL('test-service')/result + diff --git a/examples/kafka/kafka-source.feature b/examples/kafka/kafka-source.feature new file mode 100644 index 00000000..065bccfa --- /dev/null +++ b/examples/kafka/kafka-source.feature @@ -0,0 +1,63 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +Feature: Kafka Kamelet source + + Background: + Given variable user is "" + Given variable password is "" + Given variables + | securityProtocol | PLAINTEXT | + | deserializeHeaders | true | + | topic | my-topic | + | source | Kafka Kamelet source | + | message | Camel K rocks! | + Given Kafka topic: ${topic} + Given Kafka topic partition: 0 + Given HTTP server timeout is 15000 ms + Given HTTP server "test-service" + + Scenario: Create infrastructure + Given start Kafka container + Given create Kubernetes service test-service with target port 8080 + + Scenario: Create Kamelet binding + When load KameletBinding kafka-source-binding.yaml + Then Camel K integration kafka-source-binding should be running + And Camel K integration kafka-source-binding should print Subscribing ${topic}-Thread 0 to topic ${topic} + And sleep 10sec + + Scenario: Send message to Kafka topic and verify sink output + Given variable key is "citrus:randomNumber(4)" + Given Kafka connection + | url | ${YAKS_TESTCONTAINERS_KAFKA_LOCAL_BOOTSTRAP_SERVERS} | + Given Kafka message key: ${key} + When send Kafka message with body and headers: ${message} + | event-source | ${source} | + Then expect HTTP request body: ${message} + Then expect HTTP request headers + | event-source | ${source} | + | kafka.TOPIC | ${topic} | + | kafka.KEY | ${key} | + | kafka.PARTITION | 0 | + And receive POST /result + And send HTTP 200 OK + + Scenario: Remove resources + Given delete KameletBinding kafka-source-binding + And delete Kubernetes service test-service + And stop Kafka container diff --git a/examples/kafka/yaks-config.yaml b/examples/kafka/yaks-config.yaml new file mode 100644 index 00000000..907a6022 --- /dev/null +++ b/examples/kafka/yaks-config.yaml @@ -0,0 +1,42 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +config: + runtime: + env: + - name: YAKS_CAMELK_AUTO_REMOVE_RESOURCES + value: false + - name: YAKS_KUBERNETES_AUTO_REMOVE_RESOURCES + value: false + - name: YAKS_JBANG_CAMEL_DUMP_INTEGRATION_OUTPUT + value: true + - name: YAKS_TESTCONTAINERS_AUTO_REMOVE_RESOURCES + value: false + - name: CITRUS_TYPE_CONVERTER + value: camel + settings: + loggers: + - name: INTEGRATION_STATUS + level: INFO + - name: INTEGRATION_LOGS + level: INFO + resources: + - kafka-source-binding.yaml +post: + - name: print dump + if: env:CI=true && failure() + run: kamel dump diff --git a/java/pom.xml b/java/pom.xml index 4dc1e2ab..88bd2319 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -313,6 +313,11 @@ redpanda ${testcontainers.version} + + org.testcontainers + kafka + ${testcontainers.version} + diff --git a/java/steps/yaks-kafka/src/main/java/org/citrusframework/yaks/kafka/KafkaSteps.java b/java/steps/yaks-kafka/src/main/java/org/citrusframework/yaks/kafka/KafkaSteps.java index 5c8b8321..13b7beed 100644 --- a/java/steps/yaks-kafka/src/main/java/org/citrusframework/yaks/kafka/KafkaSteps.java +++ b/java/steps/yaks-kafka/src/main/java/org/citrusframework/yaks/kafka/KafkaSteps.java @@ -106,6 +106,13 @@ public void setConnection(DataTable properties) { kafkaEndpoint.getEndpointConfiguration().setConsumerGroup(context.replaceDynamicContentInString(consumerGroup)); } + @Given("^new (?:Kafka|kafka) connection$") + public void createConnection(DataTable properties) { + setConnection(properties); + kafkaEndpoint = new KafkaEndpoint(kafkaEndpoint.getEndpointConfiguration()); + citrus.getCitrusContext().getReferenceResolver().bind(endpointName, kafkaEndpoint); + } + @Given("^(?:Kafka|kafka) producer configuration$") public void setProducerConfig(DataTable properties) { Map producerProperties = properties.asMap(String.class, Object.class); diff --git a/java/steps/yaks-testcontainers/pom.xml b/java/steps/yaks-testcontainers/pom.xml index b8b1354c..216adbf6 100644 --- a/java/steps/yaks-testcontainers/pom.xml +++ b/java/steps/yaks-testcontainers/pom.xml @@ -62,6 +62,10 @@ org.testcontainers redpanda + + org.testcontainers + kafka + diff --git a/java/steps/yaks-testcontainers/src/main/java/org/citrusframework/yaks/testcontainers/KafkaSettings.java b/java/steps/yaks-testcontainers/src/main/java/org/citrusframework/yaks/testcontainers/KafkaSettings.java new file mode 100644 index 00000000..9f91a247 --- /dev/null +++ b/java/steps/yaks-testcontainers/src/main/java/org/citrusframework/yaks/testcontainers/KafkaSettings.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.citrusframework.yaks.testcontainers; + +/** + * @author Christoph Deppisch + */ +public class KafkaSettings { + + private static final String KAFKA_PROPERTY_PREFIX = TestContainersSettings.TESTCONTAINERS_PROPERTY_PREFIX + "kafka."; + private static final String KAFKA_ENV_PREFIX = TestContainersSettings.TESTCONTAINERS_ENV_PREFIX + "KAFKA_"; + + private static final String KAFKA_VERSION_PROPERTY = KAFKA_PROPERTY_PREFIX + "version"; + private static final String KAFKA_VERSION_ENV = KAFKA_ENV_PREFIX + "KAFKA_VERSION"; + private static final String KAFKA_VERSION_DEFAULT = "7.0.1"; + + private static final String KAFKA_IMAGE_NAME_PROPERTY = KAFKA_PROPERTY_PREFIX + "image.name"; + private static final String KAFKA_IMAGE_NAME_ENV = KAFKA_ENV_PREFIX + "KAFKA_IMAGE_NAME"; + private static final String KAFKA_IMAGE_NAME_DEFAULT = "confluentinc/cp-kafka"; + + private static final String STARTUP_TIMEOUT_PROPERTY = KAFKA_PROPERTY_PREFIX + "startup.timeout"; + private static final String STARTUP_TIMEOUT_ENV = KAFKA_ENV_PREFIX + "STARTUP_TIMEOUT"; + private static final String STARTUP_TIMEOUT_DEFAULT = "180"; + + private KafkaSettings() { + // prevent instantiation of utility class + } + + /** + * Kafka version setting. + * @return + */ + public static String getImageName() { + return System.getProperty(KAFKA_IMAGE_NAME_PROPERTY, + System.getenv(KAFKA_IMAGE_NAME_ENV) != null ? System.getenv(KAFKA_IMAGE_NAME_ENV) : KAFKA_IMAGE_NAME_DEFAULT); + } + + /** + * Kafka version setting. + * @return + */ + public static String getVersion() { + return System.getProperty(KAFKA_VERSION_PROPERTY, + System.getenv(KAFKA_VERSION_ENV) != null ? System.getenv(KAFKA_VERSION_ENV) : KAFKA_VERSION_DEFAULT); + } + + /** + * Time in seconds to wait for the container to startup and accept connections. + * @return + */ + public static int getStartupTimeout() { + return Integer.parseInt(System.getProperty(STARTUP_TIMEOUT_PROPERTY, + System.getenv(STARTUP_TIMEOUT_ENV) != null ? System.getenv(STARTUP_TIMEOUT_ENV) : STARTUP_TIMEOUT_DEFAULT)); + } +} diff --git a/java/steps/yaks-testcontainers/src/main/java/org/citrusframework/yaks/testcontainers/KafkaSteps.java b/java/steps/yaks-testcontainers/src/main/java/org/citrusframework/yaks/testcontainers/KafkaSteps.java new file mode 100644 index 00000000..e9a08ee8 --- /dev/null +++ b/java/steps/yaks-testcontainers/src/main/java/org/citrusframework/yaks/testcontainers/KafkaSteps.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.citrusframework.yaks.testcontainers; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; + +import com.consol.citrus.Citrus; +import com.consol.citrus.TestCaseRunner; +import com.consol.citrus.annotations.CitrusFramework; +import com.consol.citrus.annotations.CitrusResource; +import com.consol.citrus.context.TestContext; +import io.cucumber.datatable.DataTable; +import io.cucumber.java.Before; +import io.cucumber.java.Scenario; +import io.cucumber.java.en.Given; +import org.citrusframework.yaks.YaksSettings; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.utility.DockerImageName; + +import static com.consol.citrus.container.FinallySequence.Builder.doFinally; +import static java.time.temporal.ChronoUnit.SECONDS; + +public class KafkaSteps { + + @CitrusFramework + private Citrus citrus; + + @CitrusResource + private TestCaseRunner runner; + + @CitrusResource + private TestContext context; + + private String kafkaVersion = KafkaSettings.getVersion(); + + private KafkaContainer kafkaContainer; + + private int startupTimeout = KafkaSettings.getStartupTimeout(); + + private Map env = new HashMap<>(); + + @Before + public void before(Scenario scenario) { + if (kafkaContainer == null && citrus.getCitrusContext().getReferenceResolver().isResolvable(KafkaContainer.class)) { + kafkaContainer = citrus.getCitrusContext().getReferenceResolver().resolve("kafkaContainer", KafkaContainer.class); + setConnectionSettings(kafkaContainer, context); + } + } + + @Given("^Kafka container version (^\\s+)$") + public void setKafkaVersion(String version) { + this.kafkaVersion = version; + } + + @Given("^Kafka container startup timeout is (\\d+)(?: s| seconds)$") + public void setStartupTimeout(int timeout) { + this.startupTimeout = timeout; + } + + @Given("^Kafka container env settings$") + public void setEnvSettings(DataTable settings) { + this.env.putAll(settings.asMap()); + } + + @Given("^start Kafka container$") + public void startKafka() { + kafkaContainer = new KafkaContainer(DockerImageName.parse(KafkaSettings.getImageName()).withTag(kafkaVersion)) + .withLabel("app", "yaks") + .withLabel("app.kubernetes.io/name", "kafka") + .withLabel("app.kubernetes.io/part-of", TestContainersSettings.getTestName()) + .withLabel("app.openshift.io/connects-to", TestContainersSettings.getTestId()) + .withNetworkAliases("kafka") + .withEnv(env) + .withStartupTimeout(Duration.of(startupTimeout, SECONDS)); + + kafkaContainer.start(); + + citrus.getCitrusContext().bind("kafkaContainer", kafkaContainer); + + setConnectionSettings(kafkaContainer, context); + + if (TestContainersSteps.autoRemoveResources) { + runner.run(doFinally() + .actions(context -> kafkaContainer.stop())); + } + } + + @Given("^stop Kafka container$") + public void stopKafka() { + if (kafkaContainer != null) { + kafkaContainer.stop(); + } + + env = new HashMap<>(); + } + + /** + * Sets the connection settings in current test context in the form of test variables. + * @param kafkaContainer + * @param context + */ + private void setConnectionSettings(KafkaContainer kafkaContainer, TestContext context) { + if (!kafkaContainer.isRunning()) { + return; + } + + String containerId = kafkaContainer.getContainerId().substring(0, 12); + + context.setVariable(TestContainersSteps.TESTCONTAINERS_VARIABLE_PREFIX + "KAFKA_HOST", kafkaContainer.getHost()); + context.setVariable(TestContainersSteps.TESTCONTAINERS_VARIABLE_PREFIX + "KAFKA_CONTAINER_IP", kafkaContainer.getHost()); + context.setVariable(TestContainersSteps.TESTCONTAINERS_VARIABLE_PREFIX + "KAFKA_CONTAINER_ID", containerId); + context.setVariable(TestContainersSteps.TESTCONTAINERS_VARIABLE_PREFIX + "KAFKA_CONTAINER_NAME", kafkaContainer.getContainerName()); + context.setVariable(TestContainersSteps.TESTCONTAINERS_VARIABLE_PREFIX + "KAFKA_SERVICE_PORT", String.valueOf(kafkaContainer.getMappedPort(KafkaContainer.KAFKA_PORT))); + context.setVariable(TestContainersSteps.TESTCONTAINERS_VARIABLE_PREFIX + "KAFKA_PORT", String.valueOf(kafkaContainer.getMappedPort(KafkaContainer.KAFKA_PORT))); + context.setVariable(TestContainersSteps.TESTCONTAINERS_VARIABLE_PREFIX + "KAFKA_LOCAL_BOOTSTRAP_SERVERS", kafkaContainer.getBootstrapServers()); + + if (YaksSettings.isLocal()) { + context.setVariable(TestContainersSteps.TESTCONTAINERS_VARIABLE_PREFIX + "KAFKA_SERVICE_NAME", "kafka"); + context.setVariable(TestContainersSteps.TESTCONTAINERS_VARIABLE_PREFIX + "KAFKA_BOOTSTRAP_SERVERS", kafkaContainer.getBootstrapServers()); + } else { + context.setVariable(TestContainersSteps.TESTCONTAINERS_VARIABLE_PREFIX + "KAFKA_SERVICE_NAME", String.format("kd-%s", containerId)); + context.setVariable(TestContainersSteps.TESTCONTAINERS_VARIABLE_PREFIX + "KAFKA_BOOTSTRAP_SERVERS", String.format("kd-%s:%s", containerId, kafkaContainer.getMappedPort(KafkaContainer.KAFKA_PORT))); + } + + context.setVariable(TestContainersSteps.TESTCONTAINERS_VARIABLE_PREFIX + "KAFKA_KUBE_DOCK_HOST", String.format("kd-%s", containerId)); + } +} diff --git a/java/steps/yaks-testcontainers/src/test/resources/org/citrusframework/yaks/testcontainers/kafka.feature b/java/steps/yaks-testcontainers/src/test/resources/org/citrusframework/yaks/testcontainers/kafka.feature new file mode 100644 index 00000000..5caf56b3 --- /dev/null +++ b/java/steps/yaks-testcontainers/src/test/resources/org/citrusframework/yaks/testcontainers/kafka.feature @@ -0,0 +1,24 @@ +Feature: Kafka + + Background: + Given Disable auto removal of Testcontainers resources + Given Kafka consumer timeout is 5000 milliseconds + Given Kafka topic: hello + + Scenario: Start Kafka container + Given start Kafka container + + Scenario: Send and receive Kafka message + Given variables + | key | citrus:randomNumber(4) | + | message | Hello Kafka | + Given new Kafka connection + | url | ${YAKS_TESTCONTAINERS_KAFKA_LOCAL_BOOTSTRAP_SERVERS} | + And Kafka message key: ${key} + When send Kafka message with body and headers: ${message} + | messageId | ${key} | + Then expect Kafka message with body and headers: ${message} + | messageId | ${key} | + + Scenario: Stop Kafka container + Given stop Kafka container diff --git a/java/steps/yaks-testcontainers/src/test/resources/org/citrusframework/yaks/testcontainers/redpanda.feature b/java/steps/yaks-testcontainers/src/test/resources/org/citrusframework/yaks/testcontainers/redpanda.feature index 369e408f..a47d27b5 100644 --- a/java/steps/yaks-testcontainers/src/test/resources/org/citrusframework/yaks/testcontainers/redpanda.feature +++ b/java/steps/yaks-testcontainers/src/test/resources/org/citrusframework/yaks/testcontainers/redpanda.feature @@ -5,14 +5,14 @@ Feature: Redpanda Given Kafka consumer timeout is 5000 milliseconds Given Kafka topic: hello - Scenario: Start container + Scenario: Start Redpanda container Given start Redpanda container Scenario: Send and receive Kafka message Given variables | key | citrus:randomNumber(4) | | message | Hello Redpanda | - Given Kafka connection + Given new Kafka connection | url | ${YAKS_TESTCONTAINERS_REDPANDA_LOCAL_BOOTSTRAP_SERVERS} | And Kafka message key: ${key} When send Kafka message with body and headers: ${message} @@ -20,5 +20,5 @@ Feature: Redpanda Then expect Kafka message with body and headers: ${message} | messageId | ${key} | - Scenario: Stop container + Scenario: Stop Redpanda container Given stop Redpanda container