Skip to content

Commit

Permalink
AB#154 feat: Add Apache Flink job to print Kafka events to console
Browse files Browse the repository at this point in the history
  • Loading branch information
giovannibaratta committed Mar 10, 2024
1 parent 94404b7 commit 7be0999
Show file tree
Hide file tree
Showing 17 changed files with 538 additions and 2 deletions.
9 changes: 7 additions & 2 deletions .devcontainer/install_dependencies.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ lefthook_dependencies=(

liquibase_dependencies=(
liquibase=4.25.1
openjdk-8-jre
openjdk-17-jre
)

apache_flink_dependencies=(
openjdk-17-jdk
)

rm /tmp/installation-completed 2> /dev/null || true
Expand All @@ -41,7 +45,8 @@ sudo apt-get install -y \
"${jekyll_dependencies[@]}" \
"${parallel_dependencies[@]}" \
"${lefthook_dependencies[@]}" \
"${liquibase_dependencies[@]}"
"${liquibase_dependencies[@]}" \
"${apache_flink_dependencies[@]}"

# Notify other scripts that all the dependencies are installed
touch /tmp/installation-completed
2 changes: 2 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
"jest.jestCommandLine": "yarn test",
"jest.rootPath": "core",

"java.jdt.ls.java.home": "/usr/lib/jvm/java-17-openjdk-amd64/",

"terminal.integrated.copyOnSelection": true,
"terminal.integrated.cursorBlinking": true
}
5 changes: 5 additions & 0 deletions service/dev-external-deps/apache-flink/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
.gradle
gradlew.bat
README.md
app/build
app/bin
9 changes: 9 additions & 0 deletions service/dev-external-deps/apache-flink/.gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#
# https://help.github.com/articles/dealing-with-line-endings/
#
# Linux start script should use lf
/gradlew text eol=lf

# These are Windows script files and should use crlf
*.bat text eol=crlf

5 changes: 5 additions & 0 deletions service/dev-external-deps/apache-flink/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Ignore Gradle project-specific cache directory
.gradle

# Ignore Gradle build output directory
build
27 changes: 27 additions & 0 deletions service/dev-external-deps/apache-flink/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
FROM amazoncorretto:17 as builder

WORKDIR /workdir

COPY gradle gradle
COPY gradlew settings.gradle.kts ./

# Force gradle initialization to download gradle binaries and cache the layer
RUN ./gradlew --version

COPY app app

RUN ./gradlew jar


FROM flink:1.18.1-scala_2.12-java17

# Download the Kafka dependencies
RUN wget \
https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/3.1.0-1.18/flink-connector-kafka-3.1.0-1.18.jar \
-O /opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar

RUN wget \
https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.1.0/kafka-clients-3.1.0.jar \
-O /opt/flink/lib/kafka-clients-3.1.0.jar

COPY --from=builder /workdir/app/build/libs/app.jar /opt/flink/usrlib/EventToHttpConverter.jar
3 changes: 3 additions & 0 deletions service/dev-external-deps/apache-flink/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# EventToHttpConverted

This is a simple Flink job that reads events from a Kafka topic and sends them to a HTTP endpoint.
2 changes: 2 additions & 0 deletions service/dev-external-deps/apache-flink/app/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
build
bin
35 changes: 35 additions & 0 deletions service/dev-external-deps/apache-flink/app/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
plugins {
application
}

repositories {
mavenCentral()
}

dependencies {
implementation(libs.guava)
compileOnly("org.apache.flink:flink-core:1.18.1")
compileOnly("org.apache.flink:flink-streaming-java:1.18.1")
implementation("org.apache.flink:flink-connector-kafka:3.1.0-1.18")
implementation("org.apache.flink:flink-connector-base:1.18.1")
implementation("org.apache.kafka:kafka_2.13:3.1.0")
implementation("org.apache.kafka:kafka-clients:3.1.0")
}

java {
toolchain {
languageVersion = JavaLanguageVersion.of(17)
}
}

application {
mainClass = "it.giovannibaratta.EventToHttpConverter"
}

tasks.jar {
manifest {
attributes(
"Main-Class" to "it.giovannibaratta.EventToHttpConverter"
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package it.giovannibaratta;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EventToHttpConverter {

public static void main(String[] args) throws Exception {

ParameterTool parameters = ParameterTool.fromArgs(args);

String kafkaBootstrapServers = parameters.getRequired("kafkaBootstrapServers");
String kafkaTopic = parameters.getRequired("kafkaTopic");

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(kafkaBootstrapServers)
.setTopics(kafkaTopic)
.setGroupId("flink")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();

DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

// Print each event to the console
stream.print();

env.execute("Kafka events to HTTP requests");
}
}
10 changes: 10 additions & 0 deletions service/dev-external-deps/apache-flink/gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# This file was generated by the Gradle 'init' task.
# https://docs.gradle.org/current/userguide/platforms.html#sub::toml-dependencies-format

[versions]
guava = "32.1.3-jre"
junit-jupiter = "5.10.0"

[libraries]
guava = { module = "com.google.guava:guava", version.ref = "guava" }
junit-jupiter = { module = "org.junit.jupiter:junit-jupiter", version.ref = "junit-jupiter" }
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.6-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
org.gradle.caching=true
Loading

0 comments on commit 7be0999

Please sign in to comment.