Skip to content

Commit

Permalink
Add Kafka Schema Registry Client (#110)
Browse files Browse the repository at this point in the history
* Add Kafka Schema Registry Client

Signed-off-by: obaydullahmhs <[email protected]>
  • Loading branch information
obaydullahmhs authored May 31, 2024
1 parent 9c578ab commit ed82b00
Show file tree
Hide file tree
Showing 38 changed files with 14,948 additions and 6 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
k8s.io/klog/v2 v2.120.1
kmodules.xyz/client-go v0.29.14
kmodules.xyz/custom-resources v0.29.1
kubedb.dev/apimachinery v0.45.2-0.20240530120824-5e6b27ed36e9
kubedb.dev/apimachinery v0.45.2-0.20240531110354-049490369476
sigs.k8s.io/controller-runtime v0.17.4
xorm.io/xorm v1.3.6
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -609,8 +609,8 @@ kmodules.xyz/monitoring-agent-api v0.29.0 h1:gpFl6OZrlMLb/ySMHdREI9EwGtnJ91oZBn9
kmodules.xyz/monitoring-agent-api v0.29.0/go.mod h1:iNbvaMTgVFOI5q2LJtGK91j4Dmjv4ZRiRdasGmWLKQI=
kmodules.xyz/offshoot-api v0.29.2 h1:akXmvkNqFz1n9p1STVs9iP7ODYET0S7BhcYCMXEjK4A=
kmodules.xyz/offshoot-api v0.29.2/go.mod h1:Wv7Xo8wbvznI+8bhaylRFHFjkt30xRDOUOnqV8kOAxM=
kubedb.dev/apimachinery v0.45.2-0.20240530120824-5e6b27ed36e9 h1:uidPG7sU5MT1sgKSgcyhK6H6Slyj+Y1gIqJRavST1UY=
kubedb.dev/apimachinery v0.45.2-0.20240530120824-5e6b27ed36e9/go.mod h1:LBoGqbBHFeIRLv1KtkkmVxjK/g9CmQETVpwszFp9Ly0=
kubedb.dev/apimachinery v0.45.2-0.20240531110354-049490369476 h1:suwRPKSUTPJXBLRopWmgxE4U9S7POo5iMROR4tnlQlU=
kubedb.dev/apimachinery v0.45.2-0.20240531110354-049490369476/go.mod h1:LBoGqbBHFeIRLv1KtkkmVxjK/g9CmQETVpwszFp9Ly0=
kubeops.dev/petset v0.0.5 h1:VVXi39JhjondlbHyZ98z0MLp6VCmiCMinL59K48Y2zA=
kubeops.dev/petset v0.0.5/go.mod h1:ijtKT1HlAht2vBEZj5LW7C00XEs3B0d1VdCQgd5V4cA=
lukechampine.com/uint128 v1.1.1/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk=
Expand Down
96 changes: 96 additions & 0 deletions kafka/schemaregistry/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
Copyright AppsCode Inc. and Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package schemaregistry

import (
"encoding/json"
"io"
"net/http"

"github.com/go-resty/resty/v2"
"github.com/pkg/errors"
"k8s.io/klog/v2"
)

type Client struct {
*resty.Client
Config *Config
}

type Response struct {
Code int
Header http.Header
Body io.ReadCloser
}

type ResponseBody struct {
Status string `json:"status"`
Checks []Check `json:"checks"`
}

type Check struct {
Name string `json:"name"`
Status string `json:"status"`
Data map[string]interface{} `json:"data,omitempty"`
}

type Config struct {
host string
api string
connectionScheme string
transport *http.Transport
}

func (cc *Client) GetSchemaRegistryHealth() (*Response, error) {
req := cc.Client.R().SetDoNotParseResponse(true)
res, err := req.Get(cc.Config.api)
if err != nil {
klog.Error(err, "Failed to send http request")
return nil, err
}
response := &Response{
Code: res.StatusCode(),
Header: res.Header(),
Body: res.RawBody(),
}

return response, nil
}

// IsSchemaRegistryHealthy parse health response in json from server and
// return overall status of the server
func (cc *Client) IsSchemaRegistryHealthy(response *Response) (bool, error) {
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
err1 := errors.Wrap(err, "failed to parse response body")
if err1 != nil {
return
}
return
}
}(response.Body)

var responseBody ResponseBody
body, _ := io.ReadAll(response.Body)
err := json.Unmarshal(body, &responseBody)
if err != nil {
return false, errors.Wrap(err, "Failed to parse response body")
}

return responseBody.Status == "UP", nil
}
96 changes: 96 additions & 0 deletions kafka/schemaregistry/kubedb_client_builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
Copyright AppsCode Inc. and Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package schemaregistry

import (
"context"
"fmt"
"net"
"net/http"
"time"

"github.com/go-resty/resty/v2"
kapi "kubedb.dev/apimachinery/apis/kafka/v1alpha1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type KubeDBClientBuilder struct {
kc client.Client
dbSchema *kapi.SchemaRegistry
url string
path string
podName string
ctx context.Context
}

func NewKubeDBClientBuilder(kc client.Client, dbSchema *kapi.SchemaRegistry) *KubeDBClientBuilder {
return &KubeDBClientBuilder{
kc: kc,
dbSchema: dbSchema,
url: GetConnectionURL(dbSchema),
}
}

func (o *KubeDBClientBuilder) WithPod(podName string) *KubeDBClientBuilder {
o.podName = podName
return o
}

func (o *KubeDBClientBuilder) WithURL(url string) *KubeDBClientBuilder {
o.url = url
return o
}

func (o *KubeDBClientBuilder) WithPath(path string) *KubeDBClientBuilder {
o.path = path
return o
}

func (o *KubeDBClientBuilder) WithContext(ctx context.Context) *KubeDBClientBuilder {
o.ctx = ctx
return o
}

func (o *KubeDBClientBuilder) GetSchemaRegistryClient() (*Client, error) {
config := Config{
host: o.url,
api: o.path,
transport: &http.Transport{
IdleConnTimeout: time.Second * 3,
DialContext: (&net.Dialer{
Timeout: time.Second * 30,
}).DialContext,
},
connectionScheme: o.dbSchema.GetConnectionScheme(),
}

newClient := resty.New()
newClient.SetTransport(config.transport).SetScheme(config.connectionScheme).SetBaseURL(config.host)
newClient.SetHeader("Accept", "application/json")
newClient.SetTimeout(time.Second * 30)
newClient.SetDisableWarn(true)

return &Client{
Client: newClient,
Config: &config,
}, nil
}

func GetConnectionURL(registry *kapi.SchemaRegistry) string {
return fmt.Sprintf("%s://%s.%s.svc:%d", registry.GetConnectionScheme(), registry.ServiceName(), registry.Namespace, kapi.ApicurioRegistryRESTPort)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
Copyright AppsCode Inc. and Contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1

import (
"fmt"

"kubedb.dev/apimachinery/apis"
"kubedb.dev/apimachinery/apis/catalog"
"kubedb.dev/apimachinery/crds"

"kmodules.xyz/client-go/apiextensions"
)

func (_ ClickHouseVersion) CustomResourceDefinition() *apiextensions.CustomResourceDefinition {
return crds.MustCustomResourceDefinition(SchemeGroupVersion.WithResource(ResourcePluralClickHouseVersion))
}

var _ apis.ResourceInfo = &ClickHouseVersion{}

func (r ClickHouseVersion) ResourceFQN() string {
return fmt.Sprintf("%s.%s", ResourcePluralClickHouseVersion, catalog.GroupName)
}

func (r ClickHouseVersion) ResourceShortCode() string {
return ResourceCodeClickHouseVersion
}

func (r ClickHouseVersion) ResourceKind() string {
return ResourceKindClickHouseVersion
}

func (r ClickHouseVersion) ResourceSingular() string {
return ResourceSingularClickHouseVersion
}

func (r ClickHouseVersion) ResourcePlural() string {
return ResourcePluralClickHouseVersion
}

func (r ClickHouseVersion) ValidateSpecs() error {
if r.Spec.Version == "" ||
r.Spec.DB.Image == "" {
return fmt.Errorf(`atleast one of the following specs is not set for ClickHouseVersion "%v":
spec.version,
spec.db.image`, r.Name)
}
return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
ResourceCodeClickHouseVersion = "chversion"
ResourceKindClickHouseVersion = "ClickHouseVersion"
ResourceSingularClickHouseVersion = "clickhouseversion"
ResourcePluralClickHouseVersion = "clickhouseversions"
)

// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.

// ClickHouseVersion defines a ClickHouse database version.

// +genclient
// +genclient:nonNamespaced
// +genclient:skipVerbs=updateStatus
// +k8s:openapi-gen=true
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// +kubebuilder:object:root=true
// +kubebuilder:resource:path=clickhouseversions,singular=clickhouseversion,scope=Cluster,shortName=chversion,categories={datastore,kubedb,appscode}
// +kubebuilder:printcolumn:name="Version",type="string",JSONPath=".spec.version"
// +kubebuilder:printcolumn:name="DB_IMAGE",type="string",JSONPath=".spec.db.image"
// +kubebuilder:printcolumn:name="Deprecated",type="boolean",JSONPath=".spec.deprecated"
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
type ClickHouseVersion struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

Spec ClickHouseVersionSpec `json:"spec,omitempty"`
Status ClickHouseVersionStatus `json:"status,omitempty"`
}

// ClickHouseVersionSpec defines the desired state of ClickHouseVersion
type ClickHouseVersionSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file

// Version
Version string `json:"version"`

// Database Image
DB ClickHouseVersionDatabase `json:"db"`

// Database Image
InitContainer ClickHouseInitContainer `json:"initContainer"`

// SecurityContext is for the additional config for the DB container
// +optional
SecurityContext SecurityContext `json:"securityContext"`
}

// ClickHouseVersionDatabase is the ClickHouse Database image
type ClickHouseVersionDatabase struct {
Image string `json:"image"`
}

// ClickHouseInitContainer is the ClickHouse init Container image
type ClickHouseInitContainer struct {
Image string `json:"image"`
}

// ClickHouseVersionStatus defines the observed state of ClickHouseVersion
type ClickHouseVersionStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// ClickHouseVersionList contains a list of ClickHouseVersion
type ClickHouseVersionList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []ClickHouseVersion `json:"items"`
}
Loading

0 comments on commit ed82b00

Please sign in to comment.