Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add support for Kafka Connect #28

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions deploy/dev/connect-cluster.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: my-connect-cluster
namespace: kafka
spec:
image: hoptimator-connect-runtime
replicas: 1
bootstrapServers: my-cluster-kafka-bootstrap:9092
config:
group.id: my-connect-cluster
offset.storage.topic: my-connect-cluster-offsets
config.storage.topic: my-connect-cluster-configs
status.storage.topic: my-connect-cluster-status
key.converter: org.apache.kafka.connect.json.JsonConverter
value.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: true
value.converter.schemas.enable: true
config.storage.replication.factor: 1
offset.storage.replication.factor: 1
status.storage.replication.factor: 1
resources:
requests:
cpu: "1"
memory: 2Gi
limits:
cpu: "2"
memory: 2Gi

4 changes: 4 additions & 0 deletions deploy/dev/flink.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
apiVersion: v1
kind: ServiceAccount
metadata:
name: flink
60 changes: 60 additions & 0 deletions deploy/kafkaconnectors.crd.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: kafkaconnectors.hoptimator.linkedin.com
spec:
group: hoptimator.linkedin.com
names:
kind: KafkaConnector
listKind: KafkaConnectorList
plural: kafkaconnectors
singular: kafkaconnector
shortNames:
- kc
preserveUnknownFields: false
scope: Namespaced
versions:
- name: v1alpha1
served: true
storage: true
schema:
openAPIV3Schema:
description: Kafka Connect connector
type: object
properties:
apiVersion:
type: string
kind:
type: string
metadata:
type: object
spec:
description: Desired Kafka connector.
type: object
properties:
connectorName:
description: The connector name.
type: string
clusterBaseUrl:
description: URL of the Connect cluster.
type: string
connectorConfig:
description: Connector configuration.
type: object
additionalProperties:
type: string
required:
- connectorName
- clusterBaseUrl
status:
description: Current state of the connector.
type: object
properties:
ready:
description: Whether the requested connector has been created.
type: boolean
message:
description: Error or success message, for information only.
type: string
subresources:
status: {}
2 changes: 1 addition & 1 deletion deploy/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ metadata:
name: hoptimator-controller
rules:
- apiGroups: ["hoptimator.linkedin.com"]
resources: ["sqljobs", "kafkatopics", "subscriptions"]
resources: ["sqljobs", "kafkatopics", "kafkaconnectors", "subscriptions"]
verbs: ["get", "watch", "list", "update", "create"]
- apiGroups: ["flink.apache.org"]
resources: ["flinkdeployments"]
Expand Down
14 changes: 14 additions & 0 deletions deploy/samples/kafkaconnectors.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
apiVersion: hoptimator.linkedin.com/v1alpha1
kind: KafkaConnector
metadata:
name: heartbeat-connector
spec:
connectorName: heartbeat
clusterBaseUrl: http://my-connect-cluster-connect-api.kafka.svc:8083
connectorConfig:
connector.class: "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector"
heartbeats.topic.replication.factor: "1"
source.cluster.alias: "source"
target.cluster.alias: "target"
source.cluster.bootstrap.servers: my-cluster-kafka-bootstrap.kafka.svc:9092
target.cluster.bootstrap.servers: my-cluster-kafka-bootstrap.kafka.svc:9092
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ flinkConnectorKafka = "org.apache.flink:flink-sql-connector-kafka:1.17.0"
flinkConnectorMySqlCdc = "com.ververica:flink-sql-connector-mysql-cdc:2.3.0"
gson = "com.google.code.gson:gson:2.9.0"
jackson = "com.fasterxml.jackson.core:jackson-core:2.14.1"
jacksonDatabind = "com.fasterxml.jackson.core:jackson-databind:2.14.1"
jacksonYaml = "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.14.1"
javaxAnnotationApi = "javax.annotation:javax.annotation-api:1.3.2"
junit = "junit:junit:4.12"
Expand Down
4 changes: 4 additions & 0 deletions hoptimator-connect-runtime/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
COPY ./build/libs/hoptimator-connect-runtime-all.jar /opt/kafka/plugins/
USER 1001
25 changes: 25 additions & 0 deletions hoptimator-connect-runtime/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
plugins {
id 'com.github.johnrengelman.shadow' version '8.1.1'
id 'java'
id 'application'
id 'idea'
}

tasks.withType(JavaCompile) {
options.release = 8
}

dependencies {
implementation libs.debeziumConnectorMysql
}

idea {
module {
downloadJavadoc = true
downloadSources = true
}
}

shadowJar {
mainClassName = 'no-main-class'
}
1 change: 1 addition & 0 deletions hoptimator-kafka-adapter/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dependencies {
implementation libs.kafkaClients
implementation libs.kubernetesClient
implementation libs.kubernetesExtendedClient
implementation libs.jacksonDatabind

testImplementation libs.junit
testImplementation libs.assertj
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package com.linkedin.hoptimator.operator.kafka;

import com.linkedin.hoptimator.operator.Operator;
import com.linkedin.hoptimator.models.V1alpha1KafkaConnector;

import io.kubernetes.client.extended.controller.Controller;
import io.kubernetes.client.extended.controller.builder.ControllerBuilder;
import io.kubernetes.client.extended.controller.reconciler.Reconciler;
import io.kubernetes.client.extended.controller.reconciler.Request;
import io.kubernetes.client.extended.controller.reconciler.Result;

import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.RequestBody;
import okhttp3.Response;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaConnectorReconciler implements Reconciler {
private final static Logger log = LoggerFactory.getLogger(KafkaConnectorReconciler.class);
private final static String KAFKACONNECTOR = "hoptimator.linkedin.com/v1alpha1/KafkaConnector";
private final static MediaType JSON = MediaType.get("application/json; charset=utf-8");
private final ObjectMapper objectMapper = new ObjectMapper();
private final OkHttpClient httpClient = new OkHttpClient.Builder().build();

private final Operator operator;

public KafkaConnectorReconciler(Operator operator) {
this.operator = operator;
}

private int putConfig(String url, Map<String, String> config) throws IOException {
String json = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(config);
RequestBody body = RequestBody.create(json, JSON);
okhttp3.Request request = new okhttp3.Request.Builder().url(url).put(body).build();
try (Response response = httpClient.newCall(request).execute()) {
log.info("Response: {}.", response.body().string());
return response.code();
}
}

@Override
public Result reconcile(Request request) {
log.info("Reconciling request {}", request);
String name = request.getName();
String namespace = request.getNamespace();

try {
V1alpha1KafkaConnector object = operator.<V1alpha1KafkaConnector>fetch(KAFKACONNECTOR, namespace, name);

if (object == null) {
log.info("Object {}/{} deleted. Skipping.", namespace, name);
return new Result(false);
}

String connectorName = object.getSpec().getConnectorName();
String baseUrl = object.getSpec().getClusterBaseUrl();
String url = baseUrl + "/connectors/" + connectorName + "/config";
Map<String, String> connectorConfig = object.getSpec().getConnectorConfig();

int status = putConfig(url, connectorConfig);
switch(status) {
case 201:
log.info("Created new connector {} with config {}.", connectorName, connectorConfig);
break;
case 200:
log.info("Updated existing connector {} with config {}.", connectorName, connectorConfig);
break;
default:
log.error("{} PUT {}.", status, url);
return new Result(true, operator.failureRetryDuration());
}
} catch (Exception e) {
log.error("Encountered exception while reconciling KafkaConnector {}/{}", namespace, name, e);
return new Result(true, operator.failureRetryDuration());
}
log.info("Done reconciling {}/{}", namespace, name);
return new Result(false);
}

public static Controller controller(Operator operator) {
Reconciler reconciler = new KafkaConnectorReconciler(operator);
return ControllerBuilder.defaultBuilder(operator.informerFactory())
.withReconciler(reconciler)
.withName("kafka-connector-controller")
.withWorkerCount(1)
//.withReadyFunc(resourceInformer::hasSynced) // optional, only starts controller when the
// cache has synced up
//.withWorkQueue(resourceWorkQueue)
//.watch()
.watch(x -> ControllerBuilder.controllerWatchBuilder(V1alpha1KafkaConnector.class, x).build())
.build();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@
import com.linkedin.hoptimator.operator.Operator;
import com.linkedin.hoptimator.models.V1alpha1KafkaTopic;
import com.linkedin.hoptimator.models.V1alpha1KafkaTopicList;
import com.linkedin.hoptimator.models.V1alpha1KafkaConnector;
import com.linkedin.hoptimator.models.V1alpha1KafkaConnectorList;

import io.kubernetes.client.extended.controller.Controller;
import io.kubernetes.client.extended.controller.builder.ControllerBuilder;
import io.kubernetes.client.extended.controller.reconciler.Reconciler;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

Expand All @@ -20,14 +23,25 @@ public Collection<Controller> controllers(Operator operator) {
operator.registerApi("KafkaTopic", "kafkatopic", "kafkatopics", "hoptimator.linkedin.com",
"v1alpha1", V1alpha1KafkaTopic.class, V1alpha1KafkaTopicList.class);

Reconciler reconciler = new KafkaTopicReconciler(operator);
Controller controller = ControllerBuilder.defaultBuilder(operator.informerFactory())
.withReconciler(reconciler)
operator.registerApi("KafkaConnector", "kafkaconnector", "kafkaconnectors", "hoptimator.linkedin.com",
"v1alpha1", V1alpha1KafkaConnector.class, V1alpha1KafkaConnectorList.class);

Reconciler topicReconciler = new KafkaTopicReconciler(operator);
Controller topicController = ControllerBuilder.defaultBuilder(operator.informerFactory())
.withReconciler(topicReconciler)
.withName("kafka-topic-controller")
.withWorkerCount(1)
.watch(x -> ControllerBuilder.controllerWatchBuilder(V1alpha1KafkaTopic.class, x).build())
.build();

return Collections.singleton(controller);
Reconciler connectorReconciler = new KafkaConnectorReconciler(operator);
Controller connectorController = ControllerBuilder.defaultBuilder(operator.informerFactory())
.withReconciler(connectorReconciler)
.withName("kafka-connector-controller")
.withWorkerCount(1)
.watch(x -> ControllerBuilder.controllerWatchBuilder(V1alpha1KafkaConnector.class, x).build())
.build();

return Arrays.asList(new Controller[]{topicController, connectorController});
}
}
1 change: 1 addition & 0 deletions hoptimator-models/generate-models.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ docker run \
ghcr.io/kubernetes-client/java/crd-model-gen:v1.0.6 \
/generate.sh -o "$(pwd)/hoptimator-models" -n "" -p "com.linkedin.hoptimator" \
-u "$(pwd)/deploy/kafkatopics.crd.yaml" \
-u "$(pwd)/deploy/kafkaconnectors.crd.yaml" \
-u "$(pwd)/deploy/subscriptions.crd.yaml" \
-u "$(pwd)/deploy/sqljobs.crd.yaml" \
&& echo "done."
Loading