From d083ceae49f026741301268d1c075d292bb860ea Mon Sep 17 00:00:00 2001
From: Yuri Silva <121974740+yurimssilva@users.noreply.github.com>
Date: Mon, 25 Sep 2023 16:11:56 +0100
Subject: [PATCH] feat: add streaming kafka-to-http sample (#108)

* feat: add streaming kafka-to-http sample

Co-authored-by: ndr_brt

* fix: gitignore removed from streaming samples and changed kafka image

* fix: string serializer class

* fix: kafka container config

* refactor: versioning and timeout

* fix: versioning

* refactor: updated edc version

* refactor: added executor to produce scheduled messages

---------

Co-authored-by: ndr_brt
---
 gradle/libs.versions.toml                     |   6 +
 settings.gradle.kts                           |   1 +
 system-tests/build.gradle.kts                 |   4 +
 .../streaming/Streaming02KafkaToHttpTest.java | 157 ++++++++++++++++++
 .../streaming-01-http-to-http/.gitignore      |   1 -
 .../0-dataplane.json                          |   9 +
 .../streaming-02-kafka-to-http/1-asset.json   |  12 ++
 .../2-policy-definition.json                  |  10 ++
 .../3-contract-definition.json                |   7 +
 .../4-get-dataset.json                        |   7 +
 .../5-negotiate-contract.json                 |  23 +++
 .../6-transfer.json                           |  15 ++
 .../streaming-02-kafka-to-http/README.md      | 154 +++++++++++++++++
 .../streaming-02-runtime/build.gradle.kts     |  49 ++++++
 .../streaming-02-runtime/consumer.properties  |  11 ++
 .../streaming-02-runtime/provider.properties  |  12 ++
 16 files changed, 477 insertions(+), 1 deletion(-)
 create mode 100644 system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming02KafkaToHttpTest.java
 delete mode 100644 transfer/streaming/streaming-01-http-to-http/.gitignore
 create mode 100644 transfer/streaming/streaming-02-kafka-to-http/0-dataplane.json
 create mode 100644 transfer/streaming/streaming-02-kafka-to-http/1-asset.json
 create mode 100644 transfer/streaming/streaming-02-kafka-to-http/2-policy-definition.json
 create mode 100644 transfer/streaming/streaming-02-kafka-to-http/3-contract-definition.json
 create mode 100644 transfer/streaming/streaming-02-kafka-to-http/4-get-dataset.json
 create mode 100644 transfer/streaming/streaming-02-kafka-to-http/5-negotiate-contract.json
 create mode 100644 transfer/streaming/streaming-02-kafka-to-http/6-transfer.json
 create mode 100644 transfer/streaming/streaming-02-kafka-to-http/README.md
 create mode 100644 transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/build.gradle.kts
 create mode 100644 transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/consumer.properties
 create mode 100644 transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/provider.properties

diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index e7b30a3a..6e779ba8 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -12,6 +12,8 @@ okhttp-mockwebserver = "5.0.0-alpha.11"
 openTelemetry = "1.18.0"
 restAssured = "5.3.2"
 rsApi = "3.1.0"
+kafkaClients = "3.5.1"
+testContainers = "1.18.3"
 
 
 [libraries]
@@ -33,6 +35,7 @@ edc-data-plane-azure-storage = { module = "org.eclipse.edc:data-plane-azure-stor
 edc-data-plane-client = { module = "org.eclipse.edc:data-plane-client", version.ref = "edc" }
 edc-data-plane-core = { module = "org.eclipse.edc:data-plane-core", version.ref = "edc" }
 edc-data-plane-http = { module = "org.eclipse.edc:data-plane-http", version.ref = "edc" }
+edc-data-plane-kafka = { module = "org.eclipse.edc:data-plane-kafka", version.ref = "edc" }
 edc-data-plane-selector-api = { module = "org.eclipse.edc:data-plane-selector-api", version.ref = "edc" }
 edc-data-plane-selector-client = { module = "org.eclipse.edc:data-plane-selector-client", version.ref = "edc" }
 edc-data-plane-selector-core = { module = "org.eclipse.edc:data-plane-selector-core", version.ref = "edc" }
@@ -66,6 +69,9 @@ junit-pioneer = { module = "org.junit-pioneer:junit-pioneer", version.ref = "jun
 okhttp-mockwebserver = { module = "com.squareup.okhttp3:mockwebserver", version.ref = "okhttp-mockwebserver" }
 opentelemetry-annotations = { module = "io.opentelemetry:opentelemetry-extension-annotations", version.ref = "openTelemetry" }
 restAssured = { module = "io.rest-assured:rest-assured", version.ref = "restAssured" }
+kafka-clients = { module = "org.apache.kafka:kafka-clients", version.ref = "kafkaClients" }
+testcontainers-kafka = { module = "org.testcontainers:kafka", version.ref = "testContainers" }
+testcontainers-junit = { module = "org.testcontainers:junit-jupiter", version.ref = "testContainers" }
 
 [plugins]
 shadow = { id = "com.github.johnrengelman.shadow", version = "8.1.1" }
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 5d3eb165..dbcb5126 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -57,6 +57,7 @@ include("transfer:transfer-06-consumer-pull-http:http-pull-connector")
 include("transfer:transfer-07-provider-push-http:http-push-connector")
 
 include("transfer:streaming:streaming-01-http-to-http:streaming-01-runtime")
+include("transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime")
 
 include("util:http-request-logger")
diff --git a/system-tests/build.gradle.kts b/system-tests/build.gradle.kts
index b59b966e..c5276dfa 100644
--- a/system-tests/build.gradle.kts
+++ b/system-tests/build.gradle.kts
@@ -24,6 +24,9 @@ dependencies {
     testImplementation(libs.awaitility)
     testImplementation(libs.okhttp.mockwebserver)
     testImplementation(libs.restAssured)
+    testImplementation(libs.testcontainers.junit)
+    testImplementation(libs.testcontainers.kafka)
+    testImplementation(libs.kafka.clients)
 
     // runtimes
     testCompileOnly(project(":basic:basic-01-basic-connector"))
@@ -34,4 +37,5 @@ dependencies {
     testCompileOnly(project(":transfer:transfer-02-file-transfer-listener:file-transfer-listener-consumer"))
     testCompileOnly(project(":transfer:transfer-03-modify-transferprocess:modify-transferprocess-consumer"))
     testCompileOnly(project(":transfer:streaming:streaming-01-http-to-http:streaming-01-runtime"))
+    testCompileOnly(project(":transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime"))
 }
diff --git a/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming02KafkaToHttpTest.java b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming02KafkaToHttpTest.java
new file mode 100644
index 00000000..21708542
--- /dev/null
+++ b/system-tests/src/test/java/org/eclipse/edc/samples/transfer/streaming/Streaming02KafkaToHttpTest.java
@@ -0,0 +1,157 @@
+/*
+ *  Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
+ *
+ *  This program and the accompanying materials are made available under the
+ *  terms of the Apache License, Version 2.0 which is available at
+ *  https://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *
+ *  Contributors:
+ *       Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial test implementation for sample
+ *
+ */
+
+package org.eclipse.edc.samples.transfer.streaming;
+
+import jakarta.json.Json;
+import okhttp3.mockwebserver.MockWebServer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.eclipse.edc.junit.annotations.EndToEndTest;
+import org.eclipse.edc.junit.extensions.EdcRuntimeExtension;
+import org.eclipse.edc.junit.testfixtures.TestUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.net.URI;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTED;
+import static org.eclipse.edc.samples.transfer.FileTransferSampleTestCommon.getFileContentFromRelativePath;
+import static org.eclipse.edc.samples.transfer.FileTransferSampleTestCommon.getFileFromRelativePath;
+
+@Testcontainers
+@EndToEndTest
+public class Streaming02KafkaToHttpTest {
+
+    private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.4.0";
+    private static final String TOPIC = "kafka-stream-topic";
+    private static final String MAX_DURATION = "PT30S";
+    private static final String SAMPLE_FOLDER = "transfer/streaming/streaming-02-kafka-to-http";
+    private static final Duration TIMEOUT = Duration.ofSeconds(30);
+    private static final Participant PROVIDER = Participant.Builder.newInstance()
+            .name("provider")
+            .id("provider")
+            .managementEndpoint(new Participant.Endpoint(URI.create("http://localhost:18181/management")))
+            .protocolEndpoint(new Participant.Endpoint(URI.create("http://localhost:18182/protocol")))
+            .controlEndpoint(new Participant.Endpoint(URI.create("http://localhost:18183/control")))
+            .build();
+    private static final Participant CONSUMER = Participant.Builder.newInstance()
+            .name("consumer")
+            .id("consumer")
+            .managementEndpoint(new Participant.Endpoint(URI.create("http://localhost:28181/management")))
+            .protocolEndpoint(new Participant.Endpoint(URI.create("http://localhost:28182/protocol")))
+            .controlEndpoint(new Participant.Endpoint(URI.create("http://localhost:28183/control")))
+            .build();
+
+    @Container
+    static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
+            .withKraft()
+            .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
+            .withEnv("KAFKA_CREATE_TOPICS", TOPIC.concat(":1:1"));
+
+    @RegisterExtension
+    static EdcRuntimeExtension providerConnector = new EdcRuntimeExtension(
+            ":transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime",
+            "provider",
+            Map.of(
+                    "edc.fs.config",
+                    getFileFromRelativePath(SAMPLE_FOLDER + "/streaming-02-runtime/provider.properties")
+                            .getAbsolutePath()
+            )
+    );
+
+    @RegisterExtension
+    static EdcRuntimeExtension consumerConnector = new EdcRuntimeExtension(
+            ":transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime",
+            "consumer",
+            Map.of(
+                    "edc.fs.config",
+                    getFileFromRelativePath(SAMPLE_FOLDER + "/streaming-02-runtime/consumer.properties")
+                            .getAbsolutePath()
+            )
+    );
+
+    private final int httpReceiverPort = TestUtils.getFreePort();
+    private final MockWebServer consumerReceiverServer = new MockWebServer();
+
+    @BeforeEach
+    void setUp() throws IOException {
+        consumerReceiverServer.start(httpReceiverPort);
+    }
+
+    @Test
+    void streamData() {
+
+        PROVIDER.registerDataPlane(List.of("Kafka"), List.of("HttpData"));
+
+        PROVIDER.createAsset(getFileContentFromRelativePath(SAMPLE_FOLDER + "/1-asset.json")
+                .replace("{{bootstrap.servers}}", kafkaContainer.getBootstrapServers())
+                .replace("{{max.duration}}", MAX_DURATION)
+                .replace("{{topic}}", TOPIC));
+        PROVIDER.createPolicyDefinition(getFileContentFromRelativePath(SAMPLE_FOLDER + "/2-policy-definition.json"));
+        PROVIDER.createContractDefinition(
+                getFileContentFromRelativePath(SAMPLE_FOLDER + "/3-contract-definition.json"));
+
+        var destination = Json.createObjectBuilder()
+                .add("type", "HttpData")
+                .add("baseUrl", "http://localhost:" + httpReceiverPort)
+                .build();
+
+        var transferProcessId = CONSUMER.requestAsset(PROVIDER, "kafka-stream-asset",
+                Json.createObjectBuilder().build(), destination);
+
+        await().atMost(TIMEOUT).untilAsserted(() -> {
+            var state = CONSUMER.getTransferProcessState(transferProcessId);
+            assertThat(state).isEqualTo(STARTED.name());
+        });
+
+        var producer = createKafkaProducer();
+        var message = "message";
+        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> producer
+                .send(new ProducerRecord<>(TOPIC, "key", message)), 0L, 100L, MICROSECONDS);
+
+        await().atMost(TIMEOUT).untilAsserted(() -> {
+            var request = consumerReceiverServer.takeRequest();
+            assertThat(request).isNotNull();
+            assertThat(request.getBody().readByteArray()).isEqualTo(message.getBytes());
+        });
+
+        producer.close();
+    }
+
+    private Producer<String, String> createKafkaProducer() {
+        var props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        return new KafkaProducer<>(props);
+    }
+
+}
diff --git a/transfer/streaming/streaming-01-http-to-http/.gitignore b/transfer/streaming/streaming-01-http-to-http/.gitignore
deleted file mode 100644
index 3db92cfa..00000000
--- a/transfer/streaming/streaming-01-http-to-http/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-source/*
diff --git a/transfer/streaming/streaming-02-kafka-to-http/0-dataplane.json b/transfer/streaming/streaming-02-kafka-to-http/0-dataplane.json
new file mode 100644
index 00000000..9f1285a1
--- /dev/null
+++ b/transfer/streaming/streaming-02-kafka-to-http/0-dataplane.json
@@ -0,0 +1,9 @@
+{
+  "@context": {
+    "edc": "https://w3id.org/edc/v0.0.1/ns/"
+  },
+  "@id": "kafka-streaming-provider-dataplane",
+  "url": "http://localhost:18183/control/transfer",
+  "allowedSourceTypes": [ "Kafka" ],
+  "allowedDestTypes": [ "HttpData" ]
+}
diff --git a/transfer/streaming/streaming-02-kafka-to-http/1-asset.json b/transfer/streaming/streaming-02-kafka-to-http/1-asset.json
new file mode 100644
index 00000000..675614c9
--- /dev/null
+++ b/transfer/streaming/streaming-02-kafka-to-http/1-asset.json
@@ -0,0 +1,12 @@
+{
+  "@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" },
+  "@id": "kafka-stream-asset",
+  "properties": {
+  },
+  "dataAddress": {
+    "type": "Kafka",
+    "kafka.bootstrap.servers": "{{bootstrap.servers}}",
+    "maxDuration": "{{max.duration}}",
+    "topic": "{{topic}}"
+  }
+}
diff --git a/transfer/streaming/streaming-02-kafka-to-http/2-policy-definition.json b/transfer/streaming/streaming-02-kafka-to-http/2-policy-definition.json
new file mode 100644
index 00000000..4919c71a
--- /dev/null
+++ b/transfer/streaming/streaming-02-kafka-to-http/2-policy-definition.json
@@ -0,0 +1,10 @@
+{
+  "@context": {
+    "@vocab": "https://w3id.org/edc/v0.0.1/ns/",
+    "odrl": "http://www.w3.org/ns/odrl/2/"
+  },
+  "@id": "no-constraint-policy",
+  "policy": {
+    "@type": "odrl:use"
+  }
+}
diff --git a/transfer/streaming/streaming-02-kafka-to-http/3-contract-definition.json b/transfer/streaming/streaming-02-kafka-to-http/3-contract-definition.json
new file mode 100644
index 00000000..d424ec90
--- /dev/null
+++ b/transfer/streaming/streaming-02-kafka-to-http/3-contract-definition.json
@@ -0,0 +1,7 @@
+{
+  "@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" },
+  "@id": "contract-definition",
+  "accessPolicyId": "no-constraint-policy",
+  "contractPolicyId": "no-constraint-policy",
+  "assetsSelector": []
+}
diff --git a/transfer/streaming/streaming-02-kafka-to-http/4-get-dataset.json b/transfer/streaming/streaming-02-kafka-to-http/4-get-dataset.json
new file mode 100644
index 00000000..0ec57558
--- /dev/null
+++ b/transfer/streaming/streaming-02-kafka-to-http/4-get-dataset.json
@@ -0,0 +1,7 @@
+{
+  "@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" },
+  "@type": "DatasetRequest",
+  "@id": "kafka-stream-asset",
+  "counterPartyAddress": "http://localhost:18182/protocol",
+  "protocol": "dataspace-protocol-http"
+}
diff --git a/transfer/streaming/streaming-02-kafka-to-http/5-negotiate-contract.json b/transfer/streaming/streaming-02-kafka-to-http/5-negotiate-contract.json
new file mode 100644
index 00000000..b525b894
--- /dev/null
+++ b/transfer/streaming/streaming-02-kafka-to-http/5-negotiate-contract.json
@@ -0,0 +1,23 @@
+{
+  "@context": {
+    "edc": "https://w3id.org/edc/v0.0.1/ns/",
+    "odrl": "http://www.w3.org/ns/odrl/2/"
+  },
+  "@type": "NegotiationInitiateRequestDto",
+  "connectorAddress": "http://localhost:18182/protocol",
+  "counterPartyAddress": "http://localhost:18182/protocol",
+  "providerId": "provider",
+  "protocol": "dataspace-protocol-http",
+  "offer": {
+    "offerId": "{{offerId}}",
+    "assetId": "kafka-stream-asset",
+    "policy": {
+      "@id": "{{offerId}}",
+      "@type": "use",
+      "odrl:permission": [],
+      "odrl:prohibition": [],
+      "odrl:obligation": [],
+      "odrl:target": "kafka-stream-asset"
+    }
+  }
+}
diff --git a/transfer/streaming/streaming-02-kafka-to-http/6-transfer.json b/transfer/streaming/streaming-02-kafka-to-http/6-transfer.json
new file mode 100644
index 00000000..fc7e3937
--- /dev/null
+++ b/transfer/streaming/streaming-02-kafka-to-http/6-transfer.json
@@ -0,0 +1,15 @@
+{
+  "@context": {
+    "edc": "https://w3id.org/edc/v0.0.1/ns/"
+  },
+  "@type": "TransferRequest",
+  "dataDestination": {
+    "type": "HttpData",
+    "baseUrl": "http://localhost:4000"
+  },
+  "protocol": "dataspace-protocol-http",
+  "assetId": "kafka-stream-asset",
+  "contractId": "{{contract-agreement-id}}",
+  "connectorId": "provider",
+  "connectorAddress": "http://localhost:18182/protocol"
+}
diff --git a/transfer/streaming/streaming-02-kafka-to-http/README.md b/transfer/streaming/streaming-02-kafka-to-http/README.md
new file mode 100644
index 00000000..aeac12c7
--- /dev/null
+++ b/transfer/streaming/streaming-02-kafka-to-http/README.md
@@ -0,0 +1,154 @@
+# Streaming Kafka to HTTP
+
+This sample demonstrates how to set up the EDC to stream messages from Kafka to HTTP.
+This code is only for demonstration purposes and should not be used in production.
+
+## Concept
+
+We will use the Kafka data-plane `DataSource` extension, which pulls event records from a Kafka topic and pushes them
+to every consumer that has started a `TransferProcess` for a related asset.
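+
+Conceptually, for each active transfer the data plane runs a small poll-and-push loop. The snippet below is an
+illustrative sketch of that idea only; the actual implementation lives in the `data-plane-kafka` extension, and the
+class and method names here are hypothetical:
+
+```java
+// Illustrative sketch, not the actual data-plane-kafka implementation.
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+
+class KafkaToHttpPump { // hypothetical name
+
+    void pump(Properties kafkaProperties, String topic, String destinationBaseUrl) throws Exception {
+        var httpClient = HttpClient.newHttpClient();
+        // Deserializers and bootstrap servers are expected in kafkaProperties.
+        try (var consumer = new KafkaConsumer<String, String>(kafkaProperties)) {
+            consumer.subscribe(List.of(topic));
+            while (true) {
+                // Pull event records from the source topic...
+                for (var record : consumer.poll(Duration.ofSeconds(1))) {
+                    // ...and push each one to the consumer's HttpData destination.
+                    var request = HttpRequest.newBuilder(URI.create(destinationBaseUrl))
+                            .POST(HttpRequest.BodyPublishers.ofString(record.value()))
+                            .build();
+                    httpClient.send(request, HttpResponse.BodyHandlers.discarding());
+                }
+            }
+        }
+    }
+}
+```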
+
+### Run
+
+Build the connector runtime, which will be used for both the provider and the consumer:
+```shell
+./gradlew :transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime:build
+```
+
+Run the provider and the consumer, which must be started from different terminal shells:
+```shell
+# provider
+export EDC_FS_CONFIG=transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/provider.properties
+java -jar transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/build/libs/connector.jar
+
+# consumer
+export EDC_FS_CONFIG=transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/consumer.properties
+java -jar transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/build/libs/connector.jar
+```
+
+### Register Data Plane on provider
+
+The provider connector needs to be aware of the Kafka streaming capabilities of the embedded data plane, which can be
+registered with this call:
+```shell
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/0-dataplane.json -X POST "http://localhost:18181/management/v2/dataplanes"
+```
+
+If you look at `0-dataplane.json`, you'll notice that the supported source type is `Kafka` and the supported sink type
+is `HttpData`.
+
+### Register Asset, Policy Definition and Contract Definition on provider
+
+First, create the "source" Kafka topic, from which the data plane will read the event records to be sent to the
+consumers. To do this, start a Kafka server with the source topic:
+```shell
+docker run -e "KAFKA_CREATE_TOPICS={{topic}}:1:1" -p 9092:9092 -d bashj79/kafka-kraft:3.0.0
+```
+
+Then, in the [1-asset.json](1-asset.json) file, replace the placeholders with the values of `kafka.bootstrap.servers`,
+`maxDuration` and `topic`:
+```json
+  "dataAddress": {
+    "type": "Kafka",
+    "kafka.bootstrap.servers": "{{bootstrap.servers}}",
+    "maxDuration": "{{max.duration}}",
+    "topic": "{{topic}}"
+  }
+```
+
+Then create the Asset, the Policy Definition and the Contract Definition with these three calls:
+```shell
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/1-asset.json -X POST "http://localhost:18181/management/v3/assets"
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/2-policy-definition.json -X POST "http://localhost:18181/management/v2/policydefinitions"
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/3-contract-definition.json -X POST "http://localhost:18181/management/v2/contractdefinitions"
+```
+
+### Negotiate the contract
+
+The typical flow requires fetching the catalog from the consumer side and using one of the contract offers it contains
+to negotiate a contract.
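+For reference, a minimal catalog request could look like the sketch below. The body shape is assumed from the
+management API of the EDC version used here; in other versions the counter-party field may be named differently
+(e.g. `providerUrl`):
+```shell
+curl -H 'Content-Type: application/json' \
+  -d '{
+        "@context": { "edc": "https://w3id.org/edc/v0.0.1/ns/" },
+        "@type": "CatalogRequest",
+        "counterPartyAddress": "http://localhost:18182/protocol",
+        "protocol": "dataspace-protocol-http"
+      }' \
+  -X POST "http://localhost:28181/management/v2/catalog/request" -s | jq
+```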
+
+However, in this sample case we already have the provider asset (`kafka-stream-asset`), so we can get the related
+dataset directly with this call:
+```shell
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/4-get-dataset.json -X POST "http://localhost:28181/management/v2/catalog/dataset/request" -s | jq
+```
+
+The output will be something like:
+```json
+{
+  "@id": "kafka-stream-asset",
+  "@type": "dcat:Dataset",
+  "odrl:hasPolicy": {
+    "@id": "Y29udHJhY3QtZGVmaW5pdGlvbg==:c3RyZWFtLWFzc2V0:NDlhYTUzZWEtMDUzMS00ZDkyLTg4Y2YtMGRjMTc4MmQ1NjY4",
+    "@type": "odrl:Set",
+    "odrl:permission": [],
+    "odrl:prohibition": [],
+    "odrl:obligation": [],
+    "odrl:target": "kafka-stream-asset"
+  },
+  "dcat:distribution": {
+    "@type": "dcat:Distribution",
+    "dct:format": {
+      "@id": "HttpData"
+    },
+    "dcat:accessService": "b24dfdbc-d17f-4d6e-9b5c-8fa71dacecfc"
+  },
+  "edc:id": "kafka-stream-asset",
+  "@context": {
+    "dct": "https://purl.org/dc/terms/",
+    "edc": "https://w3id.org/edc/v0.0.1/ns/",
+    "dcat": "https://www.w3.org/ns/dcat/",
+    "odrl": "http://www.w3.org/ns/odrl/2/",
+    "dspace": "https://w3id.org/dspace/v0.8/"
+  }
+}
+```
+
+Copy the `odrl:hasPolicy/@id` value into the [5-negotiate-contract.json](5-negotiate-contract.json) file, replacing
+the `{{offerId}}` placeholders, and request the contract negotiation:
+```shell
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/5-negotiate-contract.json -X POST "http://localhost:28181/management/v2/contractnegotiations" -s | jq
+```
+
+### Start the transfer
+
+First, we need to start the receiver server on the consumer side, which will receive a call for every new event. For
+this, open another terminal shell and run:
+```shell
+./gradlew util:http-request-logger:build
+HTTP_SERVER_PORT=4000 java -jar util/http-request-logger/build/libs/http-request-logger.jar
+```
+It will run on port 4000.
+
+At this point the contract agreement should already have been issued. To verify that, check the contract negotiation
+state with this call, replacing `{{contract-negotiation-id}}` with the id returned by the negotiate contract call:
+```shell
+curl "http://localhost:28181/management/v2/contractnegotiations/{{contract-negotiation-id}}" -s | jq
+```
+
+Once the `edc:contractAgreementId` is populated, it can be used to start the transfer: put it in place of the
+`{{contract-agreement-id}}` placeholder in the [6-transfer.json](6-transfer.json) file, and then call the connector
+with this command:
+```shell
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/6-transfer.json -X POST "http://localhost:28181/management/v2/transferprocesses" -s | jq
+```
+> Note that the destination address is `localhost:4000`, because that is where our HTTP server is listening.
+
+Wait until the transfer state is `STARTED` by executing this call, replacing `{{transfer-process-id}}` with the id
+returned by the start transfer call:
+```shell
+curl "http://localhost:28181/management/v2/transferprocesses/{{transfer-process-id}}" -s | jq
+```
+
+### Produce events
+
+With the Kafka server running in Docker, you can use the Kafka command-line producer `kafka-console-producer.sh` to
+produce a message. In a new terminal shell, execute:
+```shell
+docker exec -it {{docker-container-id}} /opt/kafka/bin/kafka-console-producer.sh --topic kafka-stream-topic --bootstrap-server localhost:9092
+```
+This command will open an interactive prompt for you to input your message. Once you've typed your message and pressed
+Enter, it will be produced, consumed, and pushed to the receiver server. You should observe the content being logged
+in the receiver's terminal shell:
+
+```
+Incoming request
+Method: POST
+Path: /
+Body:
+
+...
+```
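+
+Alternatively, you can produce events programmatically with the `kafka-clients` library, the same library the system
+test uses. A minimal sketch, assuming the broker and topic created in the steps above (the `SampleProducer` class name
+is made up for illustration):
+
+```java
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.util.Properties;
+
+public class SampleProducer {
+    public static void main(String[] args) {
+        var props = new Properties();
+        // The broker exposed by the docker run command above.
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+
+        // try-with-resources flushes and closes the producer on exit.
+        try (var producer = new KafkaProducer<String, String>(props)) {
+            // Every record sent to the source topic is pushed to the HTTP receiver.
+            producer.send(new ProducerRecord<>("kafka-stream-topic", "key", "message"));
+        }
+    }
+}
+```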
diff --git a/transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/build.gradle.kts b/transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/build.gradle.kts
new file mode 100644
index 00000000..745aa1ae
--- /dev/null
+++ b/transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/build.gradle.kts
@@ -0,0 +1,49 @@
+/*
+ *  Copyright (c) 2020, 2021 Microsoft Corporation
+ *
+ *  This program and the accompanying materials are made available under the
+ *  terms of the Apache License, Version 2.0 which is available at
+ *  https://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *
+ *  Contributors:
+ *       Microsoft Corporation - initial API and implementation
+ *       Fraunhofer Institute for Software and Systems Engineering - added dependencies
+ *       ZF Friedrichshafen AG - add dependency
+ *
+ */
+
+plugins {
+    `java-library`
+    id("application")
+    alias(libs.plugins.shadow)
+}
+
+dependencies {
+    implementation(libs.edc.control.plane.api.client)
+    implementation(libs.edc.control.plane.core)
+    implementation(libs.edc.data.plane.selector.core)
+    implementation(libs.edc.api.observability)
+    implementation(libs.edc.configuration.filesystem)
+    implementation(libs.edc.iam.mock)
+    implementation(libs.edc.management.api)
+    implementation(libs.edc.dsp)
+    implementation(libs.edc.data.plane.selector.api)
+    implementation(libs.edc.data.plane.selector.client)
+    implementation(libs.edc.transfer.data.plane)
+    implementation(libs.edc.data.plane.spi)
+    implementation(libs.edc.data.plane.core)
+    implementation(libs.edc.data.plane.http)
+    implementation(libs.edc.data.plane.kafka)
+}
+
+application {
+    mainClass.set("org.eclipse.edc.boot.system.runtime.BaseRuntime")
+}
+
+tasks.withType<com.github.johnrengelman.shadow.tasks.ShadowJar> {
+    mergeServiceFiles()
+    archiveFileName.set("connector.jar")
+}
diff --git a/transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/consumer.properties b/transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/consumer.properties
new file mode 100644
index 00000000..a778cd4d
--- /dev/null
+++ b/transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/consumer.properties
@@ -0,0 +1,11 @@
+web.http.port=28180
+web.http.path=/api
+web.http.management.port=28181
+web.http.management.path=/management
+web.http.protocol.port=28182
+web.http.protocol.path=/protocol
+web.http.control.port=28183
+web.http.control.path=/control
+edc.dsp.callback.address=http://localhost:28182/protocol
+edc.participant.id=consumer
+edc.ids.id=urn:connector:consumer
diff --git a/transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/provider.properties b/transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/provider.properties
new file mode 100644
index 00000000..a357378a
--- /dev/null
+++ b/transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/provider.properties
@@ -0,0 +1,12 @@
+web.http.port=18180
+web.http.path=/api
+web.http.management.port=18181
+web.http.management.path=/management
+web.http.protocol.port=18182
+web.http.protocol.path=/protocol
+web.http.control.port=18183
+web.http.control.path=/control
+edc.dsp.callback.address=http://localhost:18182/protocol
+edc.participant.id=provider
+edc.ids.id=urn:connector:provider
+edc.dataplane.http.sink.partition.size=1