Skip to content

Commit

Permalink
feat(postgres): Improve primary filters when multiple databases exist
Browse files Browse the repository at this point in the history
  • Loading branch information
gabe565 committed Jan 10, 2024
1 parent 5662bbe commit 2919fdc
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 92 deletions.
6 changes: 3 additions & 3 deletions internal/database/detect_dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ import (
var ErrDatabaseNotFound = errors.New("could not detect a database")

func DetectDialect(ctx context.Context, client kubernetes.KubeClient) (config.Database, []v1.Pod, error) {
pods, err := client.GetNamespacedPods(ctx)
podList, err := client.GetNamespacedPods(ctx)
if err != nil {
return nil, []v1.Pod{}, err
}

for _, g := range All() {
pods, err := client.FilterPodList(pods, g.PodLabels())
if err == nil {
pods := kubernetes.FilterPodList(podList.Items, g.PodLabels())
if len(pods) != 0 {
return g, pods, nil
}
}
Expand Down
82 changes: 50 additions & 32 deletions internal/database/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@ import (
v1 "k8s.io/api/core/v1"
)

var (
postgresqlHaQuery = kubernetes.LabelQueryAnd{
{Name: "app.kubernetes.io/name", Value: "postgresql-ha"},
{Name: "app.kubernetes.io/component", Value: "postgresql"},
}

cnpgQuery = kubernetes.LabelQuery{Name: "cnpg.io/cluster", Operator: selection.Exists}

zalandoQuery = kubernetes.LabelQuery{Name: "application", Value: "spilo"}
)

type Postgres struct{}

func (Postgres) Name() string {
Expand Down Expand Up @@ -77,23 +88,22 @@ func (Postgres) PodLabels() []kubernetes.LabelQueryable {
{Name: "app.kubernetes.io/name", Value: "postgresql"},
{Name: "app.kubernetes.io/component", Value: "primary"},
},
kubernetes.LabelQueryAnd{
{Name: "app.kubernetes.io/name", Value: "postgresql-ha"},
{Name: "app.kubernetes.io/component", Value: "postgresql"},
},
kubernetes.LabelQuery{Name: "cnpg.io/cluster", Operator: selection.Exists},
kubernetes.LabelQuery{Name: "application", Value: "spilo"},
postgresqlHaQuery,
cnpgQuery,
zalandoQuery,
kubernetes.LabelQuery{Name: "app", Value: "postgresql"},
}
}

func (Postgres) FilterPods(ctx context.Context, client kubernetes.KubeClient, pods []v1.Pod) ([]v1.Pod, error) {
if len(pods) == 0 {
if len(pods) <= 1 {
return pods, nil
}

var leaderName string
if pods[0].Labels["app.kubernetes.io/name"] == "postgresql-ha" {
preferred := make([]v1.Pod, 0, len(pods))

// bitnami/postgresql-ha
if matched := postgresqlHaQuery.FindPods(pods); len(matched) != 0 {
// HA chart. Need to detect primary.
log.Debug("querying Bitnami repmgr for primary instance")
cmd := command.NewBuilder(
Expand All @@ -106,14 +116,15 @@ func (Postgres) FilterPods(ctx context.Context, client kubernetes.KubeClient, po
var buf bytes.Buffer
var errBuf strings.Builder
if err := client.Exec(ctx, kubernetes.ExecOptions{
Pod: pods[0],
Pod: matched[0],
Cmd: cmd.String(),
Stdout: &buf,
Stderr: &errBuf,
}); err != nil {
return pods, fmt.Errorf("%w: %s", err, errBuf.String())
}

var primaryName string
r := csv.NewReader(&buf)
for {
row, err := r.Read()
Expand All @@ -124,28 +135,40 @@ func (Postgres) FilterPods(ctx context.Context, client kubernetes.KubeClient, po
return pods, err
}
if row[2] == "primary" {
leaderName = row[1]
primaryName = row[1]
break
}
}
} else if _, ok := pods[0].Labels["cnpg.io/cluster"]; ok {
log.Debug("filtering CloudNativePG Pods for Leader")

for key, pod := range pods {
if role, ok := pod.Labels["cnpg.io/instanceRole"]; ok {
if role == "primary" {
return pods[key : key+1], nil
if primaryName != "" {
for _, pod := range matched {
if pod.Name == primaryName {
preferred = append(preferred, pod)
}
}
}
} else if pods[0].Labels["application"] == "spilo" {
}

// CloudNativePG
if matched := cnpgQuery.FindPods(pods); len(matched) != 0 {
log.Debug("filtering CloudNativePG Pods for Leader")

for _, pod := range matched {
if role, ok := pod.Labels["cnpg.io/instanceRole"]; ok && role == "primary" {
preferred = append(preferred, pod)
}
}
}

// Zalando Postgres Operator
if matched := zalandoQuery.FindPods(pods); len(matched) != 0 {
log.Debug("querying Patroni for primary instance")
cmd := command.NewBuilder("patronictl", "list", "--format=json")

var buf bytes.Buffer
var errBuf strings.Builder
if err := client.Exec(ctx, kubernetes.ExecOptions{
Pod: pods[0],
Pod: matched[0],
Cmd: cmd.String(),
Stdout: &buf,
Stderr: &errBuf,
Expand All @@ -159,24 +182,19 @@ func (Postgres) FilterPods(ctx context.Context, client kubernetes.KubeClient, po
}

for _, member := range data {
if role, ok := member["Role"]; ok && role == "Leader" {
if name, ok := member["Member"].(string); ok {
leaderName = name
break
if role, ok := member["Role"]; ok && role != "Leader" {
if leaderName, ok := member["Member"].(string); ok {
for _, pod := range matched {
if pod.Name == leaderName {
preferred = append(preferred, pod)
}
}
}
}
}
}

if leaderName != "" {
for key, pod := range pods {
if pod.Name == leaderName {
return pods[key : key+1], nil
}
}
}

return pods, nil
return preferred, nil
}

func (db Postgres) PasswordEnvNames(c config.Global) kubernetes.ConfigFinders {
Expand Down
13 changes: 6 additions & 7 deletions internal/kubernetes/label_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,14 @@ func (query LabelQuery) Matches(pod v1.Pod) bool {
return false
}

func (query LabelQuery) FindPods(list *v1.PodList) (pods []v1.Pod, err error) {
for _, pod := range list.Items {
func (query LabelQuery) FindPods(pods []v1.Pod) []v1.Pod {
matched := make([]v1.Pod, 0, len(pods))

for _, pod := range pods {
if query.Matches(pod) {
pods = append(pods, pod)
matched = append(matched, pod)
}
}

if len(pods) == 0 {
err = ErrPodNotFound
}
return pods, err
return matched
}
13 changes: 6 additions & 7 deletions internal/kubernetes/label_query_and.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@ func (queries LabelQueryAnd) Matches(pod v1.Pod) bool {
return true
}

func (queries LabelQueryAnd) FindPods(list *v1.PodList) (pods []v1.Pod, err error) {
for _, pod := range list.Items {
func (queries LabelQueryAnd) FindPods(pods []v1.Pod) []v1.Pod {
matched := make([]v1.Pod, 0, len(pods))

for _, pod := range pods {
if queries.Matches(pod) {
pods = append(pods, pod)
matched = append(matched, pod)
}
}

if len(pods) == 0 {
err = ErrPodNotFound
}
return pods, err
return matched
}
19 changes: 5 additions & 14 deletions internal/kubernetes/label_query_and_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,45 +9,36 @@ import (

func TestLabelQueryAnd_FindPods(t *testing.T) {
type args struct {
list *v1.PodList
list []v1.Pod
}
tests := []struct {
name string
queries LabelQueryAnd
args args
wantPods []v1.Pod
wantErr bool
}{
{
"1 found",
LabelQueryAnd{
{Name: "key", Value: "value"},
{Name: "key2", Value: "value2"},
},
args{&podList},
args{pods},
[]v1.Pod{pod},
false,
},
{
"0 found",
LabelQueryAnd{
{Name: "key", Value: "value"},
{Name: "key2", Value: "wrong"},
},
args{&podList},
nil,
true,
args{pods},
[]v1.Pod{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotPods, err := tt.queries.FindPods(tt.args.list)
if tt.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
assert.Equal(t, tt.wantPods, gotPods)
assert.Equal(t, tt.wantPods, tt.queries.FindPods(tt.args.list))
})
}
}
Expand Down
17 changes: 5 additions & 12 deletions internal/kubernetes/label_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,39 +17,32 @@ var pod = v1.Pod{
},
}

var podList = v1.PodList{Items: []v1.Pod{pod}}
var pods = []v1.Pod{pod}

func TestLabelQuery_FindPods(t *testing.T) {
type fields struct {
Name string
Value string
}
type args struct {
list *v1.PodList
list []v1.Pod
}
tests := []struct {
name string
fields fields
args args
wantPods []v1.Pod
wantErr bool
}{
{"1 found", fields{"key", "value"}, args{&podList}, []v1.Pod{pod}, false},
{"0 found", fields{"key", "wrong"}, args{&podList}, nil, true},
{"1 found", fields{"key", "value"}, args{pods}, []v1.Pod{pod}},
{"0 found", fields{"key", "wrong"}, args{pods}, []v1.Pod{}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
query := LabelQuery{
Name: tt.fields.Name,
Value: tt.fields.Value,
}
gotPods, err := query.FindPods(tt.args.list)
if tt.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
assert.Equal(t, tt.wantPods, gotPods)
assert.Equal(t, tt.wantPods, query.FindPods(tt.args.list))
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/kubernetes/label_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ import v1 "k8s.io/api/core/v1"

type LabelQueryable interface {
Matches(pod v1.Pod) bool
FindPods(list *v1.PodList) ([]v1.Pod, error)
FindPods(pods []v1.Pod) []v1.Pod
}
26 changes: 10 additions & 16 deletions internal/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,34 +100,28 @@ func (client KubeClient) Exec(ctx context.Context, opt ExecOptions) error {
}

func (client KubeClient) GetPodsFiltered(ctx context.Context, queries []LabelQueryable) ([]v1.Pod, error) {
pods, err := client.GetNamespacedPods(ctx)
podList, err := client.GetNamespacedPods(ctx)
if err != nil {
return []v1.Pod{}, err
}
return client.FilterPodList(pods, queries)
return FilterPodList(podList.Items, queries), nil
}

func (client KubeClient) FilterPodList(pods *v1.PodList, queries []LabelQueryable) (foundPods []v1.Pod, err error) {
func FilterPodList(pods []v1.Pod, queries []LabelQueryable) []v1.Pod {
matched := make([]v1.Pod, 0, len(pods))

for _, query := range queries {
var p []v1.Pod
p, err = query.FindPods(pods)
if errors.Is(err, ErrPodNotFound) {
log.WithField("query", query).Trace(err)
p := query.FindPods(pods)
if len(p) == 0 {
log.WithField("query", query).Trace(ErrPodNotFound)
continue
}
log.WithFields(log.Fields{
"query": query,
"count": len(p),
}).Trace("query returned podlist")
foundPods = append(foundPods, p...)
matched = append(matched, p...)
}

if len(foundPods) == 0 {
if errors.Is(err, ErrPodNotFound) {
err = ErrPodNotFound
}

return []v1.Pod{}, err
}
return foundPods, nil
return matched
}
4 changes: 4 additions & 0 deletions internal/util/cmd_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ func DefaultSetup(cmd *cobra.Command, conf *config.Global, opts SetupOptions) (e
if err != nil {
return err
}

if len(pods) == 0 {
return kubernetes.ErrPodNotFound
}
}
}

Expand Down

0 comments on commit 2919fdc

Please sign in to comment.