Skip to content

Commit

Permalink
Modify some functions
Browse files Browse the repository at this point in the history
Signed-off-by: obaydullahmhs <[email protected]>
  • Loading branch information
obaydullahmhs committed Dec 21, 2023
1 parent dd488a2 commit 06c468e
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 13 deletions.
11 changes: 5 additions & 6 deletions kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type MessageMetadata struct {
}

func (c *Client) IsDBConnected() (bool, error) {
// TODO: try using refreshcontroller
controller, err := c.RefreshController()
if err != nil || controller == nil {
klog.Error(err, "Failed to Get kafka controller")
Expand All @@ -61,9 +60,9 @@ func (c *Client) IsDBConnected() (bool, error) {
return false, err
}
if connected {
klog.Info(fmt.Sprintf("Successfully connected broker: %s", controller.Addr()))
klog.V(5).Info(fmt.Sprintf("Successfully connected broker: %s", controller.Addr()))
} else {
klog.Info(fmt.Sprintf("Failed to connect broker: %s", controller.Addr()))
klog.Error(fmt.Sprintf("Failed to connect broker: %s", controller.Addr()))
}

return connected, nil
Expand Down Expand Up @@ -121,7 +120,7 @@ func (a *AdminClient) EnsureKafkaTopic(topic string, topicConfig map[string]*str
}
return nil
}
func (c *Client) DeleteTopic(topics ...string) {
func (c *Client) DeleteTopics(topics ...string) {
broker, err := c.RefreshController()
if err != nil {
klog.Error(err, "Failed to refresh controller for kafka-health topic")
Expand Down Expand Up @@ -164,7 +163,7 @@ func (p *ProducerClient) SendMessageWithProducer(partition int32, topic, key, me
return &msgMetadata, nil
}

func (c ConsumerClient) ConsumeMessages(partition int32, topic string, offset int64, signal *chan bool, message *chan MessageMetadata) error {
func (c *ConsumerClient) ConsumeMessages(partition int32, topic string, offset int64, signal *chan bool, message *chan MessageMetadata) error {
var err error
var partitionConsumer kafkago.PartitionConsumer
partitionConsumer, err = c.ConsumePartition(topic, partition, offset)
Expand All @@ -179,7 +178,7 @@ func (c ConsumerClient) ConsumeMessages(partition int32, topic string, offset in
case <-*signal:
return nil
case err := <-partitionConsumer.Errors():
klog.Info(fmt.Sprintf("could not process message, err: %s", err.Error()))
klog.Error(fmt.Sprintf("could not process message, err: %s", err.Error()))
return err
case msg := <-partitionConsumer.Messages():
msgMetadata := MessageMetadata{
Expand Down
10 changes: 3 additions & 7 deletions kafka/kubedb_client_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (o *KubeDBClientBuilder) GetConfig() (*kafkago.Config, error) {
clientConfig := kafkago.NewConfig()
if !o.db.Spec.DisableSecurity {
if o.db.Spec.AuthSecret == nil {
klog.Info("Authsecret not set")
klog.Info("Auth-secret not set")
return nil, errors.New("auth-secret is not set")
}

Expand All @@ -77,10 +77,10 @@ func (o *KubeDBClientBuilder) GetConfig() (*kafkago.Config, error) {
}, authSecret)
if err != nil {
if kerr.IsNotFound(err) {
klog.Error(err, "Authsecret not found")
klog.Error(err, "Auth-secret not found")
return nil, errors.New("auth-secret is not found")
}
klog.Error(err, "Failed to get authsecret")
klog.Error(err, "Failed to get auth-secret")
return nil, err
}

Expand Down Expand Up @@ -136,7 +136,6 @@ func (o *KubeDBClientBuilder) GetConfig() (*kafkago.Config, error) {
}

func (o *KubeDBClientBuilder) GetKafkaClient() (*Client, error) {

clientConfig, err := o.GetConfig()
if err != nil {
return nil, err
Expand All @@ -155,7 +154,6 @@ func (o *KubeDBClientBuilder) GetKafkaClient() (*Client, error) {
}

func (o *KubeDBClientBuilder) GetKafkaProducerClient() (*ProducerClient, error) {

clientConfig, err := o.GetConfig()
if err != nil {
return nil, err
Expand All @@ -174,7 +172,6 @@ func (o *KubeDBClientBuilder) GetKafkaProducerClient() (*ProducerClient, error)
}

func (o *KubeDBClientBuilder) GetKafkaAdminClient() (*AdminClient, error) {

clientConfig, err := o.GetConfig()
if err != nil {
return nil, err
Expand All @@ -193,7 +190,6 @@ func (o *KubeDBClientBuilder) GetKafkaAdminClient() (*AdminClient, error) {
}

func (o *KubeDBClientBuilder) GetKafkaConsumerClient() (*ConsumerClient, error) {

clientConfig, err := o.GetConfig()
if err != nil {
return nil, err
Expand Down

0 comments on commit 06c468e

Please sign in to comment.