Skip to content

Commit

Permalink
feat: add streaming kafka-to-http sample (#108)
Browse files Browse the repository at this point in the history
* feat: add streaming kafka-to-http sample

Co-authored-by: ndr_brt <[email protected]>

* fix: gitignore removed from streaming samples and changed kafka image

* fix: string serializer class

* fix: kafka container config

* refactor: versioning and timeout

* fix: versioning

* refactor: updated edc version

* refactor: added executor to produce scheduled messages

---------

Co-authored-by: ndr_brt <[email protected]>
  • Loading branch information
yurimssilva and ndr-brt authored Sep 25, 2023
1 parent edd7719 commit d083cea
Show file tree
Hide file tree
Showing 16 changed files with 477 additions and 1 deletion.
6 changes: 6 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ okhttp-mockwebserver = "5.0.0-alpha.11"
openTelemetry = "1.18.0"
restAssured = "5.3.2"
rsApi = "3.1.0"
kafkaClients = "3.5.1"
testContainers = "1.18.3"


[libraries]
Expand All @@ -33,6 +35,7 @@ edc-data-plane-azure-storage = { module = "org.eclipse.edc:data-plane-azure-stor
edc-data-plane-client = { module = "org.eclipse.edc:data-plane-client", version.ref = "edc" }
edc-data-plane-core = { module = "org.eclipse.edc:data-plane-core", version.ref = "edc" }
edc-data-plane-http = { module = "org.eclipse.edc:data-plane-http", version.ref = "edc" }
edc-data-plane-kafka = { module = "org.eclipse.edc:data-plane-kafka", version.ref = "edc" }
edc-data-plane-selector-api = { module = "org.eclipse.edc:data-plane-selector-api", version.ref = "edc" }
edc-data-plane-selector-client = { module = "org.eclipse.edc:data-plane-selector-client", version.ref = "edc" }
edc-data-plane-selector-core = { module = "org.eclipse.edc:data-plane-selector-core", version.ref = "edc" }
Expand Down Expand Up @@ -66,6 +69,9 @@ junit-pioneer = { module = "org.junit-pioneer:junit-pioneer", version.ref = "jun
okhttp-mockwebserver = { module = "com.squareup.okhttp3:mockwebserver", version.ref = "okhttp-mockwebserver" }
opentelemetry-annotations = { module = "io.opentelemetry:opentelemetry-extension-annotations", version.ref = "openTelemetry" }
restAssured = { module = "io.rest-assured:rest-assured", version.ref = "restAssured" }
kafka-clients = { module = "org.apache.kafka:kafka-clients", version.ref = "kafkaClients" }
testcontainers-kafka = { module = "org.testcontainers:kafka", version.ref = "testContainers" }
testcontainers-junit = { module = "org.testcontainers:junit-jupiter", version.ref = "testContainers" }

[plugins]
shadow = { id = "com.github.johnrengelman.shadow", version = "8.1.1" }
1 change: 1 addition & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ include("transfer:transfer-06-consumer-pull-http:http-pull-connector")
include("transfer:transfer-07-provider-push-http:http-push-connector")

include("transfer:streaming:streaming-01-http-to-http:streaming-01-runtime")
include("transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime")

include("util:http-request-logger")

Expand Down
4 changes: 4 additions & 0 deletions system-tests/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ dependencies {
testImplementation(libs.awaitility)
testImplementation(libs.okhttp.mockwebserver)
testImplementation(libs.restAssured)
testImplementation(libs.testcontainers.junit)
testImplementation(libs.testcontainers.kafka)
testImplementation(libs.kafka.clients)

// runtimes
testCompileOnly(project(":basic:basic-01-basic-connector"))
Expand All @@ -34,4 +37,5 @@ dependencies {
testCompileOnly(project(":transfer:transfer-02-file-transfer-listener:file-transfer-listener-consumer"))
testCompileOnly(project(":transfer:transfer-03-modify-transferprocess:modify-transferprocess-consumer"))
testCompileOnly(project(":transfer:streaming:streaming-01-http-to-http:streaming-01-runtime"))
testCompileOnly(project(":transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime"))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial test implementation for sample
*
*/

package org.eclipse.edc.samples.transfer.streaming;

import jakarta.json.Json;
import okhttp3.mockwebserver.MockWebServer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.edc.junit.annotations.EndToEndTest;
import org.eclipse.edc.junit.extensions.EdcRuntimeExtension;
import org.eclipse.edc.junit.testfixtures.TestUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;

import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTED;
import static org.eclipse.edc.samples.transfer.FileTransferSampleTestCommon.getFileContentFromRelativePath;
import static org.eclipse.edc.samples.transfer.FileTransferSampleTestCommon.getFileFromRelativePath;

@Testcontainers
@EndToEndTest
public class Streaming02KafkaToHttpTest {

private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.4.0";
private static final String TOPIC = "kafka-stream-topic";
private static final String MAX_DURATION = "PT30S";
private static final String SAMPLE_FOLDER = "transfer/streaming/streaming-02-kafka-to-http";
private static final Duration TIMEOUT = Duration.ofSeconds(30);
private static final Participant PROVIDER = Participant.Builder.newInstance()
.name("provider")
.id("provider")
.managementEndpoint(new Participant.Endpoint(URI.create("http://localhost:18181/management")))
.protocolEndpoint(new Participant.Endpoint(URI.create("http://localhost:18182/protocol")))
.controlEndpoint(new Participant.Endpoint(URI.create("http://localhost:18183/control")))
.build();
private static final Participant CONSUMER = Participant.Builder.newInstance()
.name("consumer")
.id("consumer")
.managementEndpoint(new Participant.Endpoint(URI.create("http://localhost:28181/management")))
.protocolEndpoint(new Participant.Endpoint(URI.create("http://localhost:28182/protocol")))
.controlEndpoint(new Participant.Endpoint(URI.create("http://localhost:28183/control")))
.build();

@Container
static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
.withKraft()
.withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
.withEnv("KAFKA_CREATE_TOPICS", TOPIC.concat(":1:1"));

@RegisterExtension
static EdcRuntimeExtension providerConnector = new EdcRuntimeExtension(
":transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime",
"provider",
Map.of(
"edc.fs.config",
getFileFromRelativePath(SAMPLE_FOLDER + "/streaming-02-runtime/provider.properties")
.getAbsolutePath()
)
);

@RegisterExtension
static EdcRuntimeExtension consumerConnector = new EdcRuntimeExtension(
":transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime",
"consumer",
Map.of(
"edc.fs.config",
getFileFromRelativePath(SAMPLE_FOLDER + "/streaming-02-runtime/consumer.properties")
.getAbsolutePath()
)
);
private final int httpReceiverPort = TestUtils.getFreePort();
private final MockWebServer consumerReceiverServer = new MockWebServer();

@BeforeEach
void setUp() throws IOException {
consumerReceiverServer.start(httpReceiverPort);
}

@Test
void streamData() {

PROVIDER.registerDataPlane(List.of("Kafka"), List.of("HttpData"));

PROVIDER.createAsset(getFileContentFromRelativePath(SAMPLE_FOLDER + "/1-asset.json")
.replace("{{bootstrap.servers}}", kafkaContainer.getBootstrapServers())
.replace("{{max.duration}}", MAX_DURATION)
.replace("{{topic}}", TOPIC));
PROVIDER.createPolicyDefinition(getFileContentFromRelativePath(SAMPLE_FOLDER + "/2-policy-definition.json"));
PROVIDER.createContractDefinition(
getFileContentFromRelativePath(SAMPLE_FOLDER + "/3-contract-definition.json"));

var destination = Json.createObjectBuilder()
.add("type", "HttpData")
.add("baseUrl", "http://localhost:" + httpReceiverPort)
.build();

var transferProcessId = CONSUMER.requestAsset(PROVIDER, "kafka-stream-asset",
Json.createObjectBuilder().build(), destination);

await().atMost(TIMEOUT).untilAsserted(() -> {
String state = CONSUMER.getTransferProcessState(transferProcessId);
assertThat(state).isEqualTo(STARTED.name());
});

var producer = createKafkaProducer();
var message = "message";
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> producer
.send(new ProducerRecord<>(TOPIC, "key", message)), 0L, 100L, MICROSECONDS);

await().atMost(TIMEOUT).untilAsserted(() -> {
var request = consumerReceiverServer.takeRequest();
assertThat(request).isNotNull();
assertThat(request.getBody().readByteArray()).isEqualTo(message.getBytes());
});

producer.close();
}

private Producer<String, String> createKafkaProducer() {
var props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new KafkaProducer<>(props);
}

}
1 change: 0 additions & 1 deletion transfer/streaming/streaming-01-http-to-http/.gitignore

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"@context": {
"edc": "https://w3id.org/edc/v0.0.1/ns/"
},
"@id": "http-pull-provider-dataplane",
"url": "http://localhost:19192/control/transfer",
"allowedSourceTypes": [ "Kafka" ],
"allowedDestTypes": [ "HttpData" ]
}
12 changes: 12 additions & 0 deletions transfer/streaming/streaming-02-kafka-to-http/1-asset.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" },
"@id": "kafka-stream-asset",
"properties": {
},
"dataAddress": {
"type": "Kafka",
"kafka.bootstrap.servers": "{{bootstrap.servers}}",
"maxDuration": "{{max.duration}}",
"topic": "{{topic}}"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/",
"odrl": "http://www.w3.org/ns/odrl/2/"
},
"@id": "no-constraint-policy",
"policy": {
"@type": "odrl:use"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" },
"@id": "contract-definition",
"accessPolicyId": "no-constraint-policy",
"contractPolicyId": "no-constraint-policy",
"assetsSelector": []
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" },
"@type": "DatasetRequest",
"@id": "kafka-stream-asset",
"counterPartyAddress": "http://localhost:18182/protocol",
"protocol": "dataspace-protocol-http"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"@context": {
"edc": "https://w3id.org/edc/v0.0.1/ns/",
"odrl": "http://www.w3.org/ns/odrl/2/"
},
"@type": "NegotiationInitiateRequestDto",
"connectorAddress": "http://localhost:18182/protocol",
"counterPartyAddress": "http://localhost:18182/protocol",
"providerId": "provider",
"protocol": "dataspace-protocol-http",
"offer": {
"offerId": "{{offerId}}",
"assetId": "kafka-stream-asset",
"policy": {
"@id": "{{offerId}}",
"@type": "use",
"odrl:permission": [],
"odrl:prohibition": [],
"odrl:obligation": [],
"odrl:target": "kafka-stream-asset"
}
}
}
15 changes: 15 additions & 0 deletions transfer/streaming/streaming-02-kafka-to-http/6-transfer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"@context": {
"edc": "https://w3id.org/edc/v0.0.1/ns/"
},
"@type": "TransferRequest",
"dataDestination": {
"type": "HttpData",
"baseUrl": "http://localhost:4000"
},
"protocol": "dataspace-protocol-http",
"assetId": "stream-asset",
"contractId": "{{contract-agreement-id}}",
"connectorId": "provider",
"connectorAddress": "http://localhost:18182/protocol"
}
Loading

0 comments on commit d083cea

Please sign in to comment.