Skip to content

Commit

Permalink
Add support for Kafka Connect
Browse files Browse the repository at this point in the history
  • Loading branch information
ryannedolan committed Jun 7, 2023
1 parent fe766f1 commit e3a718b
Show file tree
Hide file tree
Showing 27 changed files with 754 additions and 22 deletions.
2 changes: 2 additions & 0 deletions deploy/kafkaconnectors.crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ spec:
connectorConfig:
description: Connector configuration.
type: object
additionalProperties:
type: string
required:
- connectorName
- clusterBaseUrl
Expand Down
2 changes: 1 addition & 1 deletion deploy/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ metadata:
name: hoptimator-controller
rules:
- apiGroups: ["hoptimator.linkedin.com"]
resources: ["sqljobs", "kafkatopics", "subscriptions"]
resources: ["sqljobs", "kafkatopics", "kafkaconnectors", "subscriptions"]
verbs: ["get", "watch", "list", "update", "create"]
- apiGroups: ["flink.apache.org"]
resources: ["flinkdeployments"]
Expand Down
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ flinkConnectorKafka = "org.apache.flink:flink-sql-connector-kafka:1.17.0"
flinkConnectorMySqlCdc = "com.ververica:flink-sql-connector-mysql-cdc:2.3.0"
gson = "com.google.code.gson:gson:2.9.0"
jackson = "com.fasterxml.jackson.core:jackson-core:2.14.1"
jacksonDatabind = "com.fasterxml.jackson.core:jackson-databind:2.14.1"
jacksonYaml = "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.14.1"
javaxAnnotationApi = "javax.annotation:javax.annotation-api:1.3.2"
junit = "junit:junit:4.12"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,6 @@ public boolean echoToFile() {
}
}

<<<<<<< HEAD
private class TestCommandHandler implements CommandHandler {

@Override
Expand Down
1 change: 1 addition & 0 deletions hoptimator-kafka-adapter/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dependencies {
implementation libs.kafkaClients
implementation libs.kubernetesClient
implementation libs.kubernetesExtendedClient
implementation libs.jacksonDatabind

testImplementation libs.junit
testImplementation libs.assertj
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import okhttp3.RequestBody;
import okhttp3.Response;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -30,6 +32,8 @@ public class KafkaConnectorReconciler implements Reconciler {
private final static Logger log = LoggerFactory.getLogger(KafkaConnectorReconciler.class);
private final static String KAFKACONNECTOR = "hoptimator.linkedin.com/v1alpha1/KafkaConnector";
private final static MediaType JSON = MediaType.get("application/json; charset=utf-8");
private final ObjectMapper objectMapper = new ObjectMapper();
private final OkHttpClient httpClient = new OkHttpClient.Builder().build();

private final Operator operator;

Expand All @@ -38,10 +42,10 @@ public KafkaConnectorReconciler(Operator operator) {
}

private int putConfig(String url, Map<String, String> config) throws IOException {
String json = operator.objectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(config);
String json = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(config);
RequestBody body = RequestBody.create(json, JSON);
okhttp3.Request request = new okhttp3.Request.Builder().url(url).put(body).build();
try (Response response = operator.httpClient().newCall(request).execute()) {
try (Response response = httpClient.newCall(request).execute()) {
log.info("Response: {}.", response.body().string());
return response.code();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@
import com.linkedin.hoptimator.operator.Operator;
import com.linkedin.hoptimator.models.V1alpha1KafkaTopic;
import com.linkedin.hoptimator.models.V1alpha1KafkaTopicList;
import com.linkedin.hoptimator.models.V1alpha1KafkaConnector;
import com.linkedin.hoptimator.models.V1alpha1KafkaConnectorList;

import io.kubernetes.client.extended.controller.Controller;
import io.kubernetes.client.extended.controller.builder.ControllerBuilder;
import io.kubernetes.client.extended.controller.reconciler.Reconciler;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

Expand All @@ -20,14 +23,25 @@ public Collection<Controller> controllers(Operator operator) {
operator.registerApi("KafkaTopic", "kafkatopic", "kafkatopics", "hoptimator.linkedin.com",
"v1alpha1", V1alpha1KafkaTopic.class, V1alpha1KafkaTopicList.class);

Reconciler reconciler = new KafkaTopicReconciler(operator);
Controller controller = ControllerBuilder.defaultBuilder(operator.informerFactory())
.withReconciler(reconciler)
operator.registerApi("KafkaConnector", "kafkaconnector", "kafkaconnectors", "hoptimator.linkedin.com",
"v1alpha1", V1alpha1KafkaConnector.class, V1alpha1KafkaConnectorList.class);

Reconciler topicReconciler = new KafkaTopicReconciler(operator);
Controller topicController = ControllerBuilder.defaultBuilder(operator.informerFactory())
.withReconciler(topicReconciler)
.withName("kafka-topic-controller")
.withWorkerCount(1)
.watch(x -> ControllerBuilder.controllerWatchBuilder(V1alpha1KafkaTopic.class, x).build())
.build();

return Collections.singleton(controller);
Reconciler connectorReconciler = new KafkaConnectorReconciler(operator);
Controller connectorController = ControllerBuilder.defaultBuilder(operator.informerFactory())
.withReconciler(connectorReconciler)
.withName("kafka-connector-controller")
.withWorkerCount(1)
.watch(x -> ControllerBuilder.controllerWatchBuilder(V1alpha1KafkaConnector.class, x).build())
.build();

return Arrays.asList(new Controller[]{topicController, connectorController});
}
}
1 change: 1 addition & 0 deletions hoptimator-models/generate-models.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ docker run \
ghcr.io/kubernetes-client/java/crd-model-gen:v1.0.6 \
/generate.sh -o "$(pwd)/hoptimator-models" -n "" -p "com.linkedin.hoptimator" \
-u "$(pwd)/deploy/kafkatopics.crd.yaml" \
-u "$(pwd)/deploy/kafkaconnectors.crd.yaml" \
-u "$(pwd)/deploy/subscriptions.crd.yaml" \
-u "$(pwd)/deploy/sqljobs.crd.yaml" \
&& echo "done."
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/*
* Kubernetes
* No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
*
* The version of the OpenAPI document: v1.21.1
*
*
* NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
* https://openapi-generator.tech
* Do not edit the class manually.
*/


package com.linkedin.hoptimator.models;

import java.util.Objects;
import java.util.Arrays;
import com.google.gson.TypeAdapter;
import com.google.gson.annotations.JsonAdapter;
import com.google.gson.annotations.SerializedName;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import com.linkedin.hoptimator.models.V1alpha1KafkaConnectorSpec;
import com.linkedin.hoptimator.models.V1alpha1KafkaConnectorStatus;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.io.IOException;

/**
* Kafka Connect connector
*/
@ApiModel(description = "Kafka Connect connector")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-07T04:20:03.484Z[Etc/UTC]")
public class V1alpha1KafkaConnector implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
private String apiVersion;

public static final String SERIALIZED_NAME_KIND = "kind";
@SerializedName(SERIALIZED_NAME_KIND)
private String kind;

public static final String SERIALIZED_NAME_METADATA = "metadata";
@SerializedName(SERIALIZED_NAME_METADATA)
private V1ObjectMeta metadata = null;

public static final String SERIALIZED_NAME_SPEC = "spec";
@SerializedName(SERIALIZED_NAME_SPEC)
private V1alpha1KafkaConnectorSpec spec;

public static final String SERIALIZED_NAME_STATUS = "status";
@SerializedName(SERIALIZED_NAME_STATUS)
private V1alpha1KafkaConnectorStatus status;


public V1alpha1KafkaConnector apiVersion(String apiVersion) {

this.apiVersion = apiVersion;
return this;
}

/**
* APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
* @return apiVersion
**/
@javax.annotation.Nullable
@ApiModelProperty(value = "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources")

public String getApiVersion() {
return apiVersion;
}


public void setApiVersion(String apiVersion) {
this.apiVersion = apiVersion;
}


public V1alpha1KafkaConnector kind(String kind) {

this.kind = kind;
return this;
}

/**
* Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
* @return kind
**/
@javax.annotation.Nullable
@ApiModelProperty(value = "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds")

public String getKind() {
return kind;
}


public void setKind(String kind) {
this.kind = kind;
}


public V1alpha1KafkaConnector metadata(V1ObjectMeta metadata) {

this.metadata = metadata;
return this;
}

/**
* Get metadata
* @return metadata
**/
@javax.annotation.Nullable
@ApiModelProperty(value = "")

public V1ObjectMeta getMetadata() {
return metadata;
}


public void setMetadata(V1ObjectMeta metadata) {
this.metadata = metadata;
}


public V1alpha1KafkaConnector spec(V1alpha1KafkaConnectorSpec spec) {

this.spec = spec;
return this;
}

/**
* Get spec
* @return spec
**/
@javax.annotation.Nullable
@ApiModelProperty(value = "")

public V1alpha1KafkaConnectorSpec getSpec() {
return spec;
}


public void setSpec(V1alpha1KafkaConnectorSpec spec) {
this.spec = spec;
}


public V1alpha1KafkaConnector status(V1alpha1KafkaConnectorStatus status) {

this.status = status;
return this;
}

/**
* Get status
* @return status
**/
@javax.annotation.Nullable
@ApiModelProperty(value = "")

public V1alpha1KafkaConnectorStatus getStatus() {
return status;
}


public void setStatus(V1alpha1KafkaConnectorStatus status) {
this.status = status;
}


@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
V1alpha1KafkaConnector v1alpha1KafkaConnector = (V1alpha1KafkaConnector) o;
return Objects.equals(this.apiVersion, v1alpha1KafkaConnector.apiVersion) &&
Objects.equals(this.kind, v1alpha1KafkaConnector.kind) &&
Objects.equals(this.metadata, v1alpha1KafkaConnector.metadata) &&
Objects.equals(this.spec, v1alpha1KafkaConnector.spec) &&
Objects.equals(this.status, v1alpha1KafkaConnector.status);
}

@Override
public int hashCode() {
return Objects.hash(apiVersion, kind, metadata, spec, status);
}


@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("class V1alpha1KafkaConnector {\n");
sb.append(" apiVersion: ").append(toIndentedString(apiVersion)).append("\n");
sb.append(" kind: ").append(toIndentedString(kind)).append("\n");
sb.append(" metadata: ").append(toIndentedString(metadata)).append("\n");
sb.append(" spec: ").append(toIndentedString(spec)).append("\n");
sb.append(" status: ").append(toIndentedString(status)).append("\n");
sb.append("}");
return sb.toString();
}

/**
* Convert the given object to string with each line indented by 4 spaces
* (except the first line).
*/
private String toIndentedString(Object o) {
if (o == null) {
return "null";
}
return o.toString().replace("\n", "\n ");
}

}

Loading

0 comments on commit e3a718b

Please sign in to comment.