Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge master in rc/v1.7.0 #272

Merged
merged 7 commits into from
Apr 15, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
get write index
miiu96 committed Apr 9, 2024
commit d37c354d35fd5c01c9a1d80a2cb860a48fd86536
47 changes: 46 additions & 1 deletion client/elasticClient.go
Original file line number Diff line number Diff line change
@@ -153,8 +153,14 @@ func (ec *elasticClient) DoQueryRemove(ctx context.Context, index string, body *
log.Warn("elasticClient.doRefresh", "cannot do refresh", err)
}

writeIndex, err := ec.getWriteIndex(index)
if err != nil {
log.Warn("elasticClient.getWriteIndex", "cannot do get write index", err)
return err
}

res, err := ec.client.DeleteByQuery(
[]string{index},
[]string{writeIndex},
body,
ec.client.DeleteByQuery.WithIgnoreUnavailable(true),
ec.client.DeleteByQuery.WithConflicts(esConflictsPolicy),
@@ -323,6 +329,45 @@ func (ec *elasticClient) createAlias(alias string, index string) error {
return parseResponse(res, nil, elasticDefaultErrorResponseHandler)
}

func (ec *elasticClient) getWriteIndex(alias string) (string, error) {
res, err := ec.client.Indices.GetAlias(
ec.client.Indices.GetAlias.WithIndex(alias),
)
if err != nil {
return "", err
}

var indexData map[string]struct {
Aliases map[string]struct {
IsWriteIndex bool `json:"is_write_index"`
} `json:"aliases"`
}

err = parseResponse(res, &indexData, elasticDefaultErrorResponseHandler)
if err != nil {
return "", err
}

// Iterate over the map and find the write index
var writeIndex string
for index, details := range indexData {
if len(indexData) == 1 {
return index, nil
}

for _, indexAlias := range details.Aliases {
if indexAlias.IsWriteIndex {
writeIndex = index
break
}
}
if writeIndex != "" {
break
}
}
return writeIndex, nil
}

// UpdateByQuery will update all the documents that match the provided query from the provided index
func (ec *elasticClient) UpdateByQuery(ctx context.Context, index string, buff *bytes.Buffer) error {
reader := bytes.NewReader(buff.Bytes())
52 changes: 50 additions & 2 deletions client/elasticClient_test.go
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@ package client

import (
"context"
"io/ioutil"
"io"
"net/http"
"net/http/httptest"
"os"
@@ -53,7 +53,7 @@ func TestElasticClient_DoMultiGet(t *testing.T) {
jsonFile, err := os.Open("./testsData/response-multi-get.json")
require.Nil(t, err)

byteValue, _ := ioutil.ReadAll(jsonFile)
byteValue, _ := io.ReadAll(jsonFile)
_, _ = w.Write(byteValue)
}

@@ -75,3 +75,51 @@ func TestElasticClient_DoMultiGet(t *testing.T) {
_, ok := resMap["docs"]
require.True(t, ok)
}

func TestElasticClient_GetWriteIndexMultipleIndicesBehind(t *testing.T) {
handler := http.NotFound
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handler(w, r)
}))
defer ts.Close()

handler = func(w http.ResponseWriter, r *http.Request) {
jsonFile, err := os.Open("./testsData/response-get-alias.json")
require.Nil(t, err)

byteValue, _ := io.ReadAll(jsonFile)
_, _ = w.Write(byteValue)
}

esClient, _ := NewElasticClient(elasticsearch.Config{
Addresses: []string{ts.URL},
Logger: &logging.CustomLogger{},
})
res, err := esClient.getWriteIndex("blocks")
require.Nil(t, err)
require.Equal(t, "blocks-000004", res)
}

func TestElasticClient_GetWriteIndexOneIndex(t *testing.T) {
handler := http.NotFound
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handler(w, r)
}))
defer ts.Close()

handler = func(w http.ResponseWriter, r *http.Request) {
jsonFile, err := os.Open("./testsData/response-get-alias-only-one-index.json")
require.Nil(t, err)

byteValue, _ := io.ReadAll(jsonFile)
_, _ = w.Write(byteValue)
}

esClient, _ := NewElasticClient(elasticsearch.Config{
Addresses: []string{ts.URL},
Logger: &logging.CustomLogger{},
})
res, err := esClient.getWriteIndex("delegators")
require.Nil(t, err)
require.Equal(t, "delegators-000001", res)
}
7 changes: 7 additions & 0 deletions client/testsData/response-get-alias-only-one-index.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"delegators-000001" : {
"aliases" : {
"delegators" : { }
}
}
}
30 changes: 30 additions & 0 deletions client/testsData/response-get-alias.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"blocks-000003": {
"aliases": {
"blocks": {
"is_write_index": false
}
}
},
"blocks-000004": {
"aliases": {
"blocks": {
"is_write_index": true
}
}
},
"blocks-000002": {
"aliases": {
"blocks": {
"is_write_index": false
}
}
},
"blocks-000001": {
"aliases": {
"blocks": {
"is_write_index": false
}
}
}
}