Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ryannedolan committed Jun 7, 2023
1 parent 0a5a4ee commit fe766f1
Show file tree
Hide file tree
Showing 8 changed files with 237 additions and 0 deletions.
29 changes: 29 additions & 0 deletions deploy/dev/connect-cluster.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Strimzi-managed Kafka Connect cluster for the dev environment.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  namespace: kafka
spec:
  # Custom image bundling Hoptimator connector plugins (see hoptimator-connect-runtime/Dockerfile).
  image: hoptimator-connect-runtime
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  config:
    group.id: my-connect-cluster
    offset.storage.topic: my-connect-cluster-offsets
    config.storage.topic: my-connect-cluster-configs
    status.storage.topic: my-connect-cluster-status
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: true
    value.converter.schemas.enable: true
    # Single-broker dev cluster, so internal topics use replication factor 1.
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
  resources:
    requests:
      cpu: "1"
      memory: 2Gi
    limits:
      cpu: "2"
      memory: 2Gi

4 changes: 4 additions & 0 deletions deploy/dev/flink.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# ServiceAccount for Flink workloads in the dev environment.
# NOTE(review): no namespace is set, so this lands in whatever namespace
# is active at apply time — confirm whether it should be pinned (e.g. kafka/flink).
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink
58 changes: 58 additions & 0 deletions deploy/kafkaconnectors.crd.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# CRD for KafkaConnector resources reconciled by the Hoptimator operator
# (see KafkaConnectorReconciler), which PUTs spec.connectorConfig to the
# Connect REST API at spec.clusterBaseUrl.
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: kafkaconnectors.hoptimator.linkedin.com
spec:
  group: hoptimator.linkedin.com
  names:
    kind: KafkaConnector
    listKind: KafkaConnectorList
    plural: kafkaconnectors
    singular: kafkaconnector
    shortNames:
    - kc
  preserveUnknownFields: false
  scope: Namespaced
  versions:
  - name: v1alpha1
    served: true
    storage: true
    schema:
      openAPIV3Schema:
        description: Kafka Connect connector
        type: object
        properties:
          apiVersion:
            type: string
          kind:
            type: string
          metadata:
            type: object
          spec:
            description: Desired Kafka connector.
            type: object
            properties:
              connectorName:
                description: The connector name.
                type: string
              clusterBaseUrl:
                description: URL of the Connect cluster.
                type: string
              connectorConfig:
                description: Connector configuration.
                type: object
                # apiextensions.k8s.io/v1 structural schemas prune unknown
                # fields. Without additionalProperties, every key under
                # connectorConfig would be silently dropped by the API server
                # before the operator ever sees it. The reconciler consumes
                # this as a Map<String, String>, hence string values.
                additionalProperties:
                  type: string
            required:
            - connectorName
            - clusterBaseUrl
          status:
            description: Current state of the connector.
            type: object
            properties:
              ready:
                description: Whether the requested connector has been created.
                type: boolean
              message:
                description: Error or success message, for information only.
                type: string
    subresources:
      status: {}
14 changes: 14 additions & 0 deletions deploy/samples/kafkaconnectors.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Sample KafkaConnector: deploys a MirrorMaker2 heartbeat connector via the
# Hoptimator operator, which PUTs connectorConfig to the Connect REST API.
apiVersion: hoptimator.linkedin.com/v1alpha1
kind: KafkaConnector
metadata:
  name: heartbeat-connector
spec:
  connectorName: heartbeat
  # Connect REST endpoint exposed by the Strimzi KafkaConnect cluster.
  clusterBaseUrl: http://my-connect-cluster-connect-api.kafka.svc:8083
  connectorConfig:
    connector.class: "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector"
    heartbeats.topic.replication.factor: "1"
    # Source and target both point at the same dev cluster — heartbeats
    # are mirrored back into the cluster they came from.
    source.cluster.alias: "source"
    target.cluster.alias: "target"
    source.cluster.bootstrap.servers: my-cluster-kafka-bootstrap.kafka.svc:9092
    target.cluster.bootstrap.servers: my-cluster-kafka-bootstrap.kafka.svc:9092
4 changes: 4 additions & 0 deletions hoptimator-connect-runtime/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Connect runtime image: Strimzi Kafka base plus Hoptimator connector plugins.
FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
# Become root only long enough to copy the plugin jar into Kafka's plugin path.
USER root:root
COPY ./build/libs/hoptimator-connect-runtime-all.jar /opt/kafka/plugins/
# Drop back to the unprivileged uid the Strimzi base image runs as.
USER 1001
25 changes: 25 additions & 0 deletions hoptimator-connect-runtime/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Build for the Connect runtime fat jar copied into the Docker image
// (see Dockerfile: hoptimator-connect-runtime-all.jar).
plugins {
  id 'com.github.johnrengelman.shadow' version '8.1.1'
  id 'java'
  id 'application'
  id 'idea'
}

// Target Java 8 bytecode so the plugin jar loads in the Connect runtime.
tasks.withType(JavaCompile) {
  options.release = 8
}

dependencies {
  implementation libs.debeziumConnectorMysql
}

idea {
  module {
    downloadJavadoc = true
    downloadSources = true
  }
}

shadowJar {
  // Placeholder: the jar is a Connect plugin bundle with no entry point,
  // but the application plugin requires a main class to be set.
  // NOTE(review): looks intentional — confirm the application plugin is
  // actually needed here, since nothing is meant to be runnable.
  mainClassName = 'no-main-class'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package com.linkedin.hoptimator.operator.kafka;

import com.linkedin.hoptimator.operator.Operator;
import com.linkedin.hoptimator.models.V1alpha1KafkaConnector;

import io.kubernetes.client.extended.controller.Controller;
import io.kubernetes.client.extended.controller.builder.ControllerBuilder;
import io.kubernetes.client.extended.controller.reconciler.Reconciler;
import io.kubernetes.client.extended.controller.reconciler.Request;
import io.kubernetes.client.extended.controller.reconciler.Result;

import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.RequestBody;
import okhttp3.Response;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Reconciles KafkaConnector custom resources by pushing the desired connector
 * configuration to a Kafka Connect cluster's REST API.
 *
 * <p>For each resource, the reconciler PUTs {@code spec.connectorConfig} as JSON
 * to {@code {clusterBaseUrl}/connectors/{connectorName}/config}, which creates
 * the connector if absent (201) or updates it in place (200).
 */
public class KafkaConnectorReconciler implements Reconciler {
  private static final Logger log = LoggerFactory.getLogger(KafkaConnectorReconciler.class);
  private static final String KAFKACONNECTOR = "hoptimator.linkedin.com/v1alpha1/KafkaConnector";
  private static final MediaType JSON = MediaType.get("application/json; charset=utf-8");

  private final Operator operator;

  public KafkaConnectorReconciler(Operator operator) {
    this.operator = operator;
  }

  /**
   * PUTs the connector config as a JSON document to the given URL.
   *
   * @param url Connect REST endpoint for the connector's config
   * @param config connector configuration key-value pairs
   * @return the HTTP status code of the response
   * @throws IOException if serialization or the HTTP call fails
   */
  private int putConfig(String url, Map<String, String> config) throws IOException {
    String json = operator.objectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(config);
    RequestBody body = RequestBody.create(json, JSON);
    okhttp3.Request request = new okhttp3.Request.Builder().url(url).put(body).build();
    try (Response response = operator.httpClient().newCall(request).execute()) {
      // Guard against a null body: an NPE here would surface as a generic
      // reconcile failure and hide the actual HTTP status code.
      okhttp3.ResponseBody responseBody = response.body();
      log.info("Response: {}.", responseBody == null ? "(no body)" : responseBody.string());
      return response.code();
    }
  }

  /**
   * Reconciles one KafkaConnector resource: fetches the object, PUTs its config
   * to the Connect cluster, and schedules a retry on any failure.
   *
   * @param request the resource to reconcile
   * @return a non-requeueing Result on success or deletion; a delayed retry on failure
   */
  @Override
  public Result reconcile(Request request) {
    log.info("Reconciling request {}", request);
    String name = request.getName();
    String namespace = request.getNamespace();

    try {
      V1alpha1KafkaConnector object = operator.<V1alpha1KafkaConnector>fetch(KAFKACONNECTOR, namespace, name);

      if (object == null) {
        // Resource was deleted; nothing to tear down (connector removal is not
        // implemented here), so simply stop reconciling it.
        log.info("Object {}/{} deleted. Skipping.", namespace, name);
        return new Result(false);
      }

      String connectorName = object.getSpec().getConnectorName();
      String baseUrl = object.getSpec().getClusterBaseUrl();
      String url = baseUrl + "/connectors/" + connectorName + "/config";
      Map<String, String> connectorConfig = object.getSpec().getConnectorConfig();

      // Connect's PUT /connectors/{name}/config returns 201 on create, 200 on update.
      int status = putConfig(url, connectorConfig);
      switch (status) {
        case 201:
          log.info("Created new connector {} with config {}.", connectorName, connectorConfig);
          break;
        case 200:
          log.info("Updated existing connector {} with config {}.", connectorName, connectorConfig);
          break;
        default:
          log.error("{} PUT {}.", status, url);
          return new Result(true, operator.failureRetryDuration());
      }
    } catch (Exception e) {
      log.error("Encountered exception while reconciling KafkaConnector {}/{}", namespace, name, e);
      return new Result(true, operator.failureRetryDuration());
    }
    log.info("Done reconciling {}/{}", namespace, name);
    return new Result(false);
  }

  /**
   * Builds the controller that drives this reconciler.
   *
   * @param operator shared operator context (informers, HTTP client, mapper)
   * @return a controller watching KafkaConnector resources with a single worker
   */
  public static Controller controller(Operator operator) {
    Reconciler reconciler = new KafkaConnectorReconciler(operator);
    return ControllerBuilder.defaultBuilder(operator.informerFactory())
        .withReconciler(reconciler)
        .withName("kafka-connector-controller")
        .withWorkerCount(1)
        //.withReadyFunc(resourceInformer::hasSynced) // optional, only starts controller when the
        // cache has synced up
        //.withWorkQueue(resourceWorkQueue)
        //.watch()
        .watch(x -> ControllerBuilder.controllerWatchBuilder(V1alpha1KafkaConnector.class, x).build())
        .build();
  }
}

Binary file not shown.

0 comments on commit fe766f1

Please sign in to comment.