Skip to content

Commit

Permalink
Add support for updating druid credential dynamically
Browse files Browse the repository at this point in the history
Signed-off-by: Tapajit Chandra Paul <[email protected]>
  • Loading branch information
tapojit047 committed Nov 19, 2024
1 parent e5ed4c5 commit 9670d3b
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 266 deletions.
41 changes: 28 additions & 13 deletions druid/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ package druid
import (
"encoding/json"
"fmt"
"net/http"
"time"

druidgo "github.com/grafadruid/go-druid"
"github.com/pkg/errors"
"k8s.io/klog/v2"
health "kmodules.xyz/client-go/tools/healthchecker"
)

type Client struct {
Expand All @@ -44,13 +44,12 @@ const (
DruidHealthCheckDataSource = "kubedb-datasource"
)

func (c *Client) CloseDruidClient(hcs *health.HealthCard) {
func (c *Client) CloseDruidClient() {
err := c.Close()
if err != nil {
klog.Error(err, "Failed to close druid middleManagers client")
return
}
hcs.ClientClosed()
}

func IsDBConnected(druidClients []*Client) (bool, error) {
Expand Down Expand Up @@ -102,7 +101,6 @@ func (c *Client) CheckNodeDiscoveryStatus() (bool, error) {
}

func (c *Client) CheckDataSourceExistence() (bool, error) {
method := "POST"
path := "druid/v2/sql"

data := map[string]interface{}{
Expand All @@ -116,7 +114,7 @@ func (c *Client) CheckDataSourceExistence() (bool, error) {
rawMessage := json.RawMessage(jsonData)

var result []map[string]interface{}
_, err = c.ExecuteRequest(method, path, rawMessage, &result)
_, err = c.ExecuteRequest(http.MethodPost, path, rawMessage, &result)
if err != nil {
klog.Error("Failed to execute request", err)
return false, err
Expand Down Expand Up @@ -193,7 +191,6 @@ func (c *Client) GetData() (string, error) {
}

func (c *Client) runSelectQuery() (string, error) {
method := "POST"
path := "druid/v2/sql"
data := map[string]interface{}{
"query": "SELECT * FROM \"kubedb-datasource\"",
Expand All @@ -206,7 +203,7 @@ func (c *Client) runSelectQuery() (string, error) {
rawMessage := json.RawMessage(jsonData)

var result []map[string]interface{}
_, err = c.ExecuteRequest(method, path, rawMessage, &result)
_, err = c.ExecuteRequest(http.MethodPost, path, rawMessage, &result)
if err != nil {
klog.Error("Failed to execute POST query request", err)
return "", err
Expand All @@ -228,7 +225,6 @@ func (c *Client) updateCoordinatorsWaitBeforeDeletingConfig(value int32) error {
}

func (c *Client) updateCoordinatorDynamicConfig(data map[string]interface{}) error {
method := "POST"
path := "druid/coordinator/v1/config"

jsonData, err := json.Marshal(data)
Expand All @@ -237,7 +233,7 @@ func (c *Client) updateCoordinatorDynamicConfig(data map[string]interface{}) err
}
rawMessage := json.RawMessage(jsonData)

_, err = c.ExecuteRequest(method, path, rawMessage, nil)
_, err = c.ExecuteRequest(http.MethodPost, path, rawMessage, nil)
if err != nil {
klog.Error("Failed to execute coordinator config update request", err)
return err
Expand Down Expand Up @@ -276,11 +272,10 @@ func (c *Client) submitTask(taskType DruidTaskType, dataSource string, data stri
task = GetKillTaskDefinition()
}
rawMessage := json.RawMessage(task)
method := "POST"
path := "druid/indexer/v1/task"

var result map[string]interface{}
_, err := c.ExecuteRequest(method, path, rawMessage, &result)
_, err := c.ExecuteRequest(http.MethodPost, path, rawMessage, &result)
if err != nil {
klog.Error("Failed to execute POST ingestion or kill task request", err)
return "", err
Expand Down Expand Up @@ -342,11 +337,10 @@ func GetKillTaskDefinition() string {
}

func (c *Client) CheckTaskStatus(taskID string) (bool, error) {
method := "GET"
path := fmt.Sprintf("druid/indexer/v1/task/%s/status", taskID)

var result map[string]interface{}
_, err := c.ExecuteRequest(method, path, nil, &result)
_, err := c.ExecuteRequest(http.MethodGet, path, nil, &result)
if err != nil {
klog.Error("Failed to execute GET task status request", err)
return false, err
Expand Down Expand Up @@ -376,3 +370,24 @@ func (c *Client) checkDBReadAccess(oldData string) error {
}
return errors.New("failed to read ingested data")
}

// Reference: https://druid.apache.org/docs/latest/development/extensions-core/druid-basic-security/#usercredential-management
func (c *Client) UpdateDruidPassword(password string) error {
path := "druid-ext/basic-security/authentication/db/basic/users/admin/credentials"

data := map[string]interface{}{
"password": password,
}
jsonData, err := json.Marshal(data)
if err != nil {
return err
}
rawMessage := json.RawMessage(jsonData)

_, err = c.ExecuteRequest(http.MethodPost, path, rawMessage, nil)
if err != nil {
klog.Error("Failed to execute coordinator config update request", err)
return err
}
return nil
}
14 changes: 13 additions & 1 deletion druid/kubedb_client_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type KubeDBClientBuilder struct {
url string
podName string
nodeRole olddbapi.DruidNodeRoleType
password string
ctx context.Context
}

Expand Down Expand Up @@ -76,6 +77,11 @@ func (o *KubeDBClientBuilder) WithNodeRole(nodeRole olddbapi.DruidNodeRoleType)
return o
}

func (o *KubeDBClientBuilder) WithPassword(password string) *KubeDBClientBuilder {
o.password = password
return o
}

func (o *KubeDBClientBuilder) GetDruidClient() (*Client, error) {
var druidOpts []druidgo.ClientOption
// Add druid auth credential to the client
Expand Down Expand Up @@ -120,8 +126,14 @@ func (o *KubeDBClientBuilder) getClientAuthOpts() (*druidgo.ClientOption, error)
}
return nil, err
}

var password string
userName := string(authSecret.Data[core.BasicAuthUsernameKey])
password := string(authSecret.Data[core.BasicAuthPasswordKey])
if o.password != "" {
password = o.password
} else {
password = string(authSecret.Data[core.BasicAuthPasswordKey])
}

druidAuthOpts := druidgo.WithBasicAuth(userName, password)
return &druidAuthOpts, nil
Expand Down
45 changes: 0 additions & 45 deletions vendor/kmodules.xyz/client-go/tools/healthchecker/const.go

This file was deleted.

78 changes: 0 additions & 78 deletions vendor/kmodules.xyz/client-go/tools/healthchecker/health_card.go

This file was deleted.

Loading

0 comments on commit 9670d3b

Please sign in to comment.