From 6bcf141a7ebaf2c87673bdd9759aaa88a91af36b Mon Sep 17 00:00:00 2001
From: Giovanni Baratta
Date: Sun, 10 Mar 2024 17:05:48 +0000
Subject: [PATCH] AB#154 feat: add http request forwarding in EventToHttpConverter job

---
 .../it/giovannibaratta/AsyncHttpClient.java   | 60 +++++++++++++++++++
 .../giovannibaratta/EventToHttpConverter.java | 42 ++++++++++---
 .../it/giovannibaratta/KafkaDeserializer.java | 25 ++++++++
 .../java/it/giovannibaratta/KafkaEvent.java   | 20 +++++++
 service/dev-external-deps/docker-compose.yaml | 22 ++++++-
 service/dev-external-deps/nginx/nginx.conf    |  9 +++
 service/package.json                          |  1 +
 7 files changed, 169 insertions(+), 10 deletions(-)
 create mode 100644 service/dev-external-deps/apache-flink/app/src/main/java/it/giovannibaratta/AsyncHttpClient.java
 create mode 100644 service/dev-external-deps/apache-flink/app/src/main/java/it/giovannibaratta/KafkaDeserializer.java
 create mode 100644 service/dev-external-deps/apache-flink/app/src/main/java/it/giovannibaratta/KafkaEvent.java
 create mode 100644 service/dev-external-deps/nginx/nginx.conf

diff --git a/service/dev-external-deps/apache-flink/app/src/main/java/it/giovannibaratta/AsyncHttpClient.java b/service/dev-external-deps/apache-flink/app/src/main/java/it/giovannibaratta/AsyncHttpClient.java
new file mode 100644
index 0000000..daba0c9
--- /dev/null
+++ b/service/dev-external-deps/apache-flink/app/src/main/java/it/giovannibaratta/AsyncHttpClient.java
@@ -0,0 +1,60 @@
+package it.giovannibaratta;
+
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
+
+class AsyncHttpClient extends RichAsyncFunction<KafkaEvent, KafkaEvent> {
+
+  private Map<String, String> eventRouting;
+  private transient HttpClient client;
+
+  public AsyncHttpClient(Map<String, String> eventRouting) {
+    this.eventRouting = eventRouting;
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    client = HttpClient.newHttpClient();
+  }
+
+  @Override
+  public void asyncInvoke(KafkaEvent event, final ResultFuture<KafkaEvent> resultFuture) throws Exception {
+
+    String destination = eventRouting.get(event.topic);
+
+    HttpRequest request = HttpRequest.newBuilder()
+        .POST(HttpRequest.BodyPublishers.ofString(event.value))
+        .uri(URI.create(destination))
+        .build();
+
+    CompletableFuture<HttpResponse<String>> responseFuture = client.sendAsync(request,
+        HttpResponse.BodyHandlers.ofString());
+
+    CompletableFuture.supplyAsync(new Supplier<KafkaEvent>() {
+      @Override
+      public KafkaEvent get() {
+        try {
+          HttpResponse<String> result = responseFuture.get();
+          System.out.println("Sent request to " + destination + ". Result status code " + result.statusCode());
+          return event;
+        } catch (InterruptedException | ExecutionException e) {
+          System.out.println("Error while performing request");
+          return null;
+        }
+      }
+    }).thenAccept((KafkaEvent value) -> {
+      resultFuture.complete(value == null ? Collections.emptyList() : Collections.singleton(value));
+    });
+  }
+}
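
Note: when a request fails, the supplier above yields null, and the complete(...)
call therefore guards against emitting a null element downstream. Separately, a
request that outlives the AsyncDataStream timeout configured in
EventToHttpConverter fails the whole job, because the default timeout() of
Flink's AsyncFunction interface throws. A minimal sketch of an override that
drops timed-out events instead (not part of this patch):

    @Override
    public void timeout(KafkaEvent input, ResultFuture<KafkaEvent> resultFuture) throws Exception {
      // Log and skip the event instead of failing the job on timeout
      System.out.println("Request for topic " + input.topic + " timed out, dropping event");
      resultFuture.complete(Collections.emptyList());
    }
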
diff --git a/service/dev-external-deps/apache-flink/app/src/main/java/it/giovannibaratta/EventToHttpConverter.java b/service/dev-external-deps/apache-flink/app/src/main/java/it/giovannibaratta/EventToHttpConverter.java
index 3e12e1e..b89ee3c 100644
--- a/service/dev-external-deps/apache-flink/app/src/main/java/it/giovannibaratta/EventToHttpConverter.java
+++ b/service/dev-external-deps/apache-flink/app/src/main/java/it/giovannibaratta/EventToHttpConverter.java
@@ -1,10 +1,14 @@
 package it.giovannibaratta;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
@@ -15,21 +19,41 @@
   public static void main(String[] args) throws Exception {
     ParameterTool parameters = ParameterTool.fromArgs(args);
     String kafkaBootstrapServers = parameters.getRequired("kafkaBootstrapServers");
-    String kafkaTopic = parameters.getRequired("kafkaTopic");
+    String[] kafkaTopics = parameters.getRequired("kafkaTopics").split(",");
+
+    if (kafkaTopics.length < 1) {
+      throw new IllegalArgumentException("At least one Kafka topic must be provided");
+    }
+
+    Map<String, String> kafkaTopicDestination = new HashMap<>();
+
+    for (String topic : kafkaTopics) {
+      String destination = parameters.getRequired(topic);
+
+      if (destination == null || destination.isEmpty()) {
+        throw new IllegalArgumentException("Destination for topic " + topic + " must be provided");
+      }
+
+      kafkaTopicDestination.put(topic, destination);
+    }
+
+    AsyncHttpClient httpClient = new AsyncHttpClient(kafkaTopicDestination);
 
     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-    KafkaSource<String> source = KafkaSource.<String>builder()
+    KafkaSource<KafkaEvent> source = KafkaSource.<KafkaEvent>builder()
         .setBootstrapServers(kafkaBootstrapServers)
-        .setTopics(kafkaTopic)
-        .setGroupId("flink")
-        .setStartingOffsets(OffsetsInitializer.earliest())
-        .setValueOnlyDeserializer(new SimpleStringSchema())
+        .setTopics(kafkaTopics)
+        .setStartingOffsets(OffsetsInitializer.latest())
+        .setDeserializer(new KafkaDeserializer())
         .build();
 
-    DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
+    DataStream<KafkaEvent> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
+
+    // The provided transformation does not retry in case of failed requests
+    AsyncDataStream.unorderedWait(stream, httpClient, 1000, TimeUnit.MILLISECONDS, 100);
 
-    // Print each event to the console
+    // Print messages for debugging
     stream.print();
 
     env.execute("Kafka events to HTTP requests");
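
Note: every topic listed in --kafkaTopics must come with a matching --<topic>
parameter that carries its destination URL, otherwise getRequired(topic) throws.
A hypothetical invocation with two topics (the second topic name and both URLs
are illustrative only):

    flink run <job-jar> \
      --kafkaBootstrapServers message-broker:9092 \
      --kafkaTopics run-status-changed,run-created \
      --run-status-changed http://backend-proxy:9000/runs/status-changed \
      --run-created http://backend-proxy:9000/runs/created
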
diff --git a/service/dev-external-deps/apache-flink/app/src/main/java/it/giovannibaratta/KafkaDeserializer.java b/service/dev-external-deps/apache-flink/app/src/main/java/it/giovannibaratta/KafkaDeserializer.java
new file mode 100644
index 0000000..5972b64
--- /dev/null
+++ b/service/dev-external-deps/apache-flink/app/src/main/java/it/giovannibaratta/KafkaDeserializer.java
@@ -0,0 +1,25 @@
+package it.giovannibaratta;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+public class KafkaDeserializer implements
+    KafkaRecordDeserializationSchema<KafkaEvent> {
+
+  @Override
+  public TypeInformation<KafkaEvent> getProducedType() {
+    return TypeInformation.of(KafkaEvent.class);
+  }
+
+  // Given a Kafka record, extract the topic and the value and build a KafkaEvent
+  @Override
+  public void deserialize(ConsumerRecord<byte[], byte[]> record, org.apache.flink.util.Collector<KafkaEvent> out) {
+    String topic = record.topic();
+    String value = new String(record.value(), StandardCharsets.UTF_8);
+    KafkaEvent event = new KafkaEvent(topic, value);
+    out.collect(event);
+  }
+}
diff --git a/service/dev-external-deps/apache-flink/app/src/main/java/it/giovannibaratta/KafkaEvent.java b/service/dev-external-deps/apache-flink/app/src/main/java/it/giovannibaratta/KafkaEvent.java
new file mode 100644
index 0000000..5a50f4d
--- /dev/null
+++ b/service/dev-external-deps/apache-flink/app/src/main/java/it/giovannibaratta/KafkaEvent.java
@@ -0,0 +1,20 @@
+package it.giovannibaratta;
+
+public class KafkaEvent {
+
+  public final String topic;
+  public final String value;
+
+  public KafkaEvent(String topic, String value) {
+    this.topic = topic;
+    this.value = value;
+  }
+
+  @Override
+  public String toString() {
+    return "KafkaEvent{" +
+        "topic='" + topic + '\'' +
+        ", value='" + value + '\'' +
+        '}';
+  }
+}
\ No newline at end of file
diff --git a/service/dev-external-deps/docker-compose.yaml b/service/dev-external-deps/docker-compose.yaml
index e2ce3f5..4796fd8 100644
--- a/service/dev-external-deps/docker-compose.yaml
+++ b/service/dev-external-deps/docker-compose.yaml
@@ -73,8 +73,17 @@ services:
       # Custom properties required by the job
       - --kafkaBootstrapServers
       - "message-broker:9092"
-      - --kafkaTopic
+      # If you need to add a new topic, add the topic name separated by a comma from the previous one
+      # and add two extra variables using the following schema
+      # - --<topic-name>
+      # - "<destination-url>"
+      # Example:
+      # - --my-custom-topic
+      # - "http://example.local:8080/my-custom-topic"
+      - --kafkaTopics
       - "run-status-changed"
+      - --run-status-changed
+      - "http://backend-proxy:9000/runs/status-changed"
     environment:
       - |
         FLINK_PROPERTIES=
@@ -111,6 +120,17 @@
       - default
       - host-network
 
+  backend-proxy:
+    image: nginx
+    container_name: backend-proxy
+    volumes:
+      - type: bind
+        source: ./nginx/nginx.conf
+        target: /etc/nginx/conf.d/default.conf
+        read_only: true
+    ports:
+      - 9000:9000
+
 networks:
   host-network:
     # Pre-existing network created during docker installation
diff --git a/service/dev-external-deps/nginx/nginx.conf b/service/dev-external-deps/nginx/nginx.conf
new file mode 100644
index 0000000..cbdc4ed
--- /dev/null
+++ b/service/dev-external-deps/nginx/nginx.conf
@@ -0,0 +1,9 @@
+server {
+  listen 9000;
+  server_name backend-proxy;
+  location / {
+    # host.docker.internal points to the host machine from within a docker container
+    proxy_pass http://host.docker.internal:3000;
+    proxy_http_version 1.1;
+  }
+}
\ No newline at end of file
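
Note: the proxy assumes the service under development listens on port 3000 on
the host, as the proxy_pass target implies. Once the compose stack is up, the
route can be smoke-tested from the host with a plain POST, for example:

    curl -X POST http://localhost:9000/runs/status-changed -d '{}'

nginx forwards the request to http://host.docker.internal:3000/runs/status-changed.
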
diff --git a/service/package.json b/service/package.json
index 2bd420a..9ff9bd4 100644
--- a/service/package.json
+++ b/service/package.json
@@ -5,6 +5,7 @@
     "deps:start": "docker-compose -f dev-external-deps/docker-compose.yaml up -d",
     "deps:stop": "docker-compose -f dev-external-deps/docker-compose.yaml stop",
    "deps:down": "docker-compose -f dev-external-deps/docker-compose.yaml down",
+    "deps:rebuild": "docker-compose -f dev-external-deps/docker-compose.yaml up -d --build",
     "deps:update-schema": "yarn deps:start && yarn prisma:pull && yarn prisma:generate",
     "db:migrate": "yarn deps:start && yarn liquibase:update",
     "liquibase:update": "yarn liquibase:update:dev && yarn liquibase:update:test",
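
Note: deps:rebuild complements the existing scripts by passing --build, so that
images built from local sources (such as the Flink job) are rebuilt before the
stack starts; a plain deps:start would keep running the previously built job.
To exercise the pipeline end to end, publish a message on the run-status-changed
topic (for example with the console producer shipped with most Kafka images,
pointed at message-broker:9092) and watch the job logs for the resulting POST
and its status code.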