AB#154 feat: add http request forwarding in EventToHttpConverter job
giovannibaratta committed Mar 11, 2024
1 parent 7be0999 commit 6bcf141
Showing 7 changed files with 169 additions and 10 deletions.
@@ -0,0 +1,60 @@
package it.giovannibaratta;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

class AsyncHttpClient extends RichAsyncFunction<KafkaEvent, KafkaEvent> {

  // Maps each Kafka topic to the HTTP endpoint its events are forwarded to
  private Map<String, String> eventRouting;
  private HttpClient client;

  public AsyncHttpClient(Map<String, String> eventRouting) {
    this.eventRouting = eventRouting;
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    client = HttpClient.newHttpClient();
  }

  @Override
  public void asyncInvoke(KafkaEvent event, final ResultFuture<KafkaEvent> resultFuture) throws Exception {

    String destination = eventRouting.get(event.topic);

    // POST the raw event payload to the endpoint configured for its topic
    HttpRequest request = HttpRequest.newBuilder()
        .POST(HttpRequest.BodyPublishers.ofString(event.value))
        .uri(URI.create(destination))
        .build();

    CompletableFuture<HttpResponse<String>> responseFuture = client.sendAsync(request,
        HttpResponse.BodyHandlers.ofString());

    CompletableFuture.supplyAsync(new Supplier<KafkaEvent>() {
      @Override
      public KafkaEvent get() {
        try {
          HttpResponse<String> result = responseFuture.get();
          System.out.println("Sent request to " + destination + ". Result status code " + result.statusCode());
          return event;
        } catch (InterruptedException | ExecutionException e) {
          System.out.println("Error while performing request");
          return null;
        }
      }
    }).thenAccept((KafkaEvent value) -> {
      resultFuture.complete(Collections.singleton(value));
    });
  }
}
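The supplyAsync wrapper above parks a ForkJoinPool thread on responseFuture.get() for every in-flight request, and a failed request emits null into the stream. A minimal non-blocking sketch of asyncInvoke — reusing the fields and imports of the class above, not what the commit ships — chains on the response future instead and surfaces failures through the ResultFuture:

  @Override
  public void asyncInvoke(KafkaEvent event, final ResultFuture<KafkaEvent> resultFuture) {
    String destination = eventRouting.get(event.topic);

    HttpRequest request = HttpRequest.newBuilder()
        .POST(HttpRequest.BodyPublishers.ofString(event.value))
        .uri(URI.create(destination))
        .build();

    // Chain on the response future instead of blocking a pool thread with get()
    client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
        .whenComplete((response, error) -> {
          if (error != null) {
            // Surface the failure to Flink instead of emitting null
            resultFuture.completeExceptionally(error);
          } else {
            resultFuture.complete(Collections.singleton(event));
          }
        });
  }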
@@ -1,10 +1,14 @@
package it.giovannibaratta;

+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

@@ -15,21 +19,41 @@ public static void main(String[] args) throws Exception {
    ParameterTool parameters = ParameterTool.fromArgs(args);

    String kafkaBootstrapServers = parameters.getRequired("kafkaBootstrapServers");
-    String kafkaTopic = parameters.getRequired("kafkaTopic");
+    String[] kafkaTopics = parameters.getRequired("kafkaTopics").split(",");

+    if (kafkaTopics.length < 1) {
+      throw new IllegalArgumentException("At least one Kafka topic must be provided");
+    }
+
+    Map<String, String> kafkaTopicDestination = new HashMap<String, String>();
+
+    for (String topic : kafkaTopics) {
+      String destination = parameters.getRequired(topic);
+
+      if (destination == null || destination.isEmpty()) {
+        throw new IllegalArgumentException("Destination for topic " + topic + " must be provided");
+      }
+
+      kafkaTopicDestination.put(topic, destination);
+    }
+
+    AsyncHttpClient httpClient = new AsyncHttpClient(kafkaTopicDestination);
+
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

-    KafkaSource<String> source = KafkaSource.<String>builder()
+    KafkaSource<KafkaEvent> source = KafkaSource.<KafkaEvent>builder()
        .setBootstrapServers(kafkaBootstrapServers)
-        .setTopics(kafkaTopic)
-        .setGroupId("flink")
-        .setStartingOffsets(OffsetsInitializer.earliest())
-        .setValueOnlyDeserializer(new SimpleStringSchema())
+        .setTopics(kafkaTopics)
+        .setStartingOffsets(OffsetsInitializer.latest())
+        .setDeserializer(new KafkaDeserializer())
        .build();

-    DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
+    DataStream<KafkaEvent> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

+    // The provided transformation does not retry in case of failed requests
+    AsyncDataStream.unorderedWait(stream, httpClient, 1000, TimeUnit.MILLISECONDS, 100);
+
-    // Print each event to the console
+    // Print messages for debugging
    stream.print();

    env.execute("Kafka events to HTTP requests");
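As the comment notes, unorderedWait does not retry failed requests. A sketch of a retrying variant, assuming Flink 1.16+ (where AsyncDataStream.unorderedWaitWithRetry and AsyncRetryStrategies are available) and an asyncInvoke that completes exceptionally on failure, as sketched earlier:

  import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
  import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
  import org.apache.flink.streaming.util.retryable.RetryPredicates;

  // Retry up to 3 times with a fixed 100 ms delay whenever the request failed
  AsyncRetryStrategy<KafkaEvent> retryStrategy =
      new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<KafkaEvent>(3, 100L)
          .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
          .build();

  AsyncDataStream.unorderedWaitWithRetry(stream, httpClient, 1000, TimeUnit.MILLISECONDS, 100, retryStrategy);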
@@ -0,0 +1,25 @@
package it.giovannibaratta;

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class KafkaDeserializer implements KafkaRecordDeserializationSchema<KafkaEvent> {

  @Override
  public TypeInformation<KafkaEvent> getProducedType() {
    return TypeInformation.of(KafkaEvent.class);
  }

  // Given a Kafka record, extract the topic and the value and build a KafkaEvent
  @Override
  public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<KafkaEvent> out) {
    String topic = record.topic();
    String value = new String(record.value(), StandardCharsets.UTF_8);
    KafkaEvent event = new KafkaEvent(topic, value);
    out.collect(event);
  }
}
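The schema is easy to exercise in isolation. A hypothetical harness (topic name taken from the compose file, payload invented for illustration) that feeds one record through deserialize:

  ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
      "run-status-changed", 0, 0L, null, "{\"runId\":42}".getBytes(StandardCharsets.UTF_8));

  new KafkaDeserializer().deserialize(record, new Collector<KafkaEvent>() {
    @Override
    public void collect(KafkaEvent event) {
      // Prints: KafkaEvent{topic='run-status-changed', value='{"runId":42}'}
      System.out.println(event);
    }

    @Override
    public void close() {
    }
  });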
@@ -0,0 +1,20 @@
package it.giovannibaratta;

public class KafkaEvent {

  public final String topic;
  public final String value;

  public KafkaEvent(String topic, String value) {
    this.topic = topic;
    this.value = value;
  }

  @Override
  public String toString() {
    return "KafkaEvent{" +
        "topic='" + topic + '\'' +
        ", value='" + value + '\'' +
        '}';
  }
}
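Because topic and value are final and there is no no-arg constructor, Flink cannot treat KafkaEvent as a POJO and falls back to Kryo serialization. If that overhead matters, a POJO-compliant variant would look like the sketch below (a design alternative, not what the commit ships):

  public class KafkaEvent {

    // Public mutable fields plus a public no-arg constructor satisfy Flink's
    // POJO rules, so the faster POJO serializer is used instead of Kryo
    public String topic;
    public String value;

    public KafkaEvent() {
    }

    public KafkaEvent(String topic, String value) {
      this.topic = topic;
      this.value = value;
    }
  }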
22 changes: 21 additions & 1 deletion service/dev-external-deps/docker-compose.yaml
@@ -73,8 +73,17 @@ services:
      # Custom properties required by the job
      - --kafkaBootstrapServers
      - "message-broker:9092"
-      - --kafkaTopic
+      # To forward a new topic, append its name to the comma-separated kafkaTopics list below
+      # and add two extra arguments using the following schema:
+      # - --<topic-name>
+      # - "<http-endpoint>"
+      # Example:
+      # - --my-custom-topic
+      # - "http://example.local:8080/my-custom-topic"
+      - --kafkaTopics
      - "run-status-changed"
+      - --run-status-changed
+      - "http://backend-proxy:9000/runs/status-changed"
    environment:
      - |
        FLINK_PROPERTIES=
@@ -111,6 +120,17 @@
      - default
      - host-network

+  backend-proxy:
+    image: nginx
+    container_name: backend-proxy
+    volumes:
+      - type: bind
+        source: ./nginx/nginx.conf
+        target: /etc/nginx/conf.d/default.conf
+        read_only: true
+    ports:
+      - 9000:9000
+
networks:
  host-network:
    # Pre-existing network created during docker installation
9 changes: 9 additions & 0 deletions service/dev-external-deps/nginx/nginx.conf
@@ -0,0 +1,9 @@
server {
    listen 9000;
    server_name backend-proxy;

    location / {
        # host.docker.internal points to the host machine from within a docker container
        proxy_pass http://host.docker.internal:3000;
        proxy_http_version 1.1;
    }
}
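With the proxy in place, the Flink job posts to http://backend-proxy:9000/runs/status-changed inside the compose network and nginx forwards it to port 3000 on the host. A hypothetical smoke test (class name and payload invented), run from the host against the published port, can stand in for the job:

  import java.net.URI;
  import java.net.http.HttpClient;
  import java.net.http.HttpRequest;
  import java.net.http.HttpResponse;

  public class ProxySmokeTest {
    public static void main(String[] args) throws Exception {
      // Hypothetical payload; the real job forwards raw Kafka record values
      HttpRequest request = HttpRequest.newBuilder()
          .POST(HttpRequest.BodyPublishers.ofString("{\"status\":\"COMPLETED\"}"))
          .uri(URI.create("http://localhost:9000/runs/status-changed"))
          .build();

      HttpResponse<String> response = HttpClient.newHttpClient()
          .send(request, HttpResponse.BodyHandlers.ofString());

      // Expect whatever the service listening on host port 3000 returns
      System.out.println(response.statusCode());
    }
  }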
1 change: 1 addition & 0 deletions service/package.json
@@ -5,6 +5,7 @@
"deps:start": "docker-compose -f dev-external-deps/docker-compose.yaml up -d",
"deps:stop": "docker-compose -f dev-external-deps/docker-compose.yaml stop",
"deps:down": "docker-compose -f dev-external-deps/docker-compose.yaml down",
"deps:rebuild": "docker-compose -f dev-external-deps/docker-compose.yaml up -d --build",
"deps:update-schema": "yarn deps:start && yarn prisma:pull && yarn prisma:generate",
"db:migrate": "yarn deps:start && yarn liquibase:update",
"liquibase:update": "yarn liquibase:update:dev && yarn liquibase:update:test",
