Skip to content

Commit

Permalink
fixes after merge
Browse files Browse the repository at this point in the history
  • Loading branch information
miiu96 committed Apr 30, 2024
1 parent fa7dfd0 commit 6ba76b3
Show file tree
Hide file tree
Showing 12 changed files with 393 additions and 1 deletion.
20 changes: 20 additions & 0 deletions client/elasticClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,26 @@ func (ec *elasticClient) DoMultiGet(ctx context.Context, ids []string, index str
return nil
}

// DoSearchRequest executes a search request with the given body against the
// provided index and unmarshals the response into resBody.
func (ec *elasticClient) DoSearchRequest(ctx context.Context, index string, buff *bytes.Buffer, resBody interface{}) error {
	searchRes, err := ec.client.Search(
		ec.client.Search.WithContext(ctx),
		ec.client.Search.WithIndex(index),
		ec.client.Search.WithBody(buff),
	)
	if err != nil {
		return err
	}

	if parseErr := parseResponse(searchRes, &resBody, elasticDefaultErrorResponseHandler); parseErr != nil {
		log.Warn("elasticClient.DoSearchRequest",
			"error parsing response", parseErr.Error())
		return parseErr
	}

	return nil
}

// DoQueryRemove will do a query remove to elasticsearch server
func (ec *elasticClient) DoQueryRemove(ctx context.Context, index string, body *bytes.Buffer) error {
err := ec.doRefresh(index)
Expand Down
6 changes: 6 additions & 0 deletions migrations/dtos/consts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package dtos

const (
	// MigrationInProgress is the status persisted while a migration is still running.
	MigrationInProgress = "in_progress"
	// MigrationCompleted is the status persisted once a migration has finished.
	MigrationCompleted = "completed"
)
23 changes: 23 additions & 0 deletions migrations/dtos/dtos.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package dtos

import "github.com/multiversx/mx-chain-es-indexer-go/data"

// ClusterSettings holds the connection settings for an Elasticsearch cluster.
type ClusterSettings struct {
	URL      string
	User     string
	Password string
}

// MigrationInfo describes the persisted state of a migration: its status
// string and the timestamp of the last processed document.
type MigrationInfo struct {
	Status    string `json:"status"`
	Timestamp uint64 `json:"timestamp"`
}

// ResponseLogsSearch mirrors the relevant portion of an Elasticsearch search
// response over the logs index (document id plus the decoded log source).
type ResponseLogsSearch struct {
	Hits struct {
		Hits []struct {
			ID     string     `json:"_id"`
			Source *data.Logs `json:"_source"`
		} `json:"hits"`
	} `json:"hits"`
}
9 changes: 9 additions & 0 deletions migrations/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package migrations

// MigrationHandler defines a component able to execute a single named migration.
type MigrationHandler interface {
	DoMigration(migrationName string) error
}

// MigrationProcessor defines a component able to run a set of named migrations.
type MigrationProcessor interface {
	StartMigrations(migrationNames []string) error
}
12 changes: 12 additions & 0 deletions migrations/process/migrationProcessor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package process

// migrationProcessor coordinates the execution of index migrations.
type migrationProcessor struct {
}

// NewMigrationProcessor creates a new migrations processor instance.
func NewMigrationProcessor() (*migrationProcessor, error) {
	mp := &migrationProcessor{}

	return mp, nil
}

// StartMigrations runs the provided migrations.
// NOTE(review): currently a no-op placeholder — it always returns nil.
func (mp *migrationProcessor) StartMigrations(migrations []string) error {
	return nil
}
228 changes: 228 additions & 0 deletions migrations/split/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
package split

import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch/v7"
"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/pubkeyConverter"
"github.com/multiversx/mx-chain-core-go/core/sharding"
"github.com/multiversx/mx-chain-es-indexer-go/client"
"github.com/multiversx/mx-chain-es-indexer-go/data"
"github.com/multiversx/mx-chain-es-indexer-go/migrations/dtos"
"github.com/multiversx/mx-chain-es-indexer-go/process/dataindexer"
logger "github.com/multiversx/mx-chain-logger-go"
"github.com/tidwall/gjson"
)

var log = logger.GetOrCreate("split-logs")

// ArgsEventsProc holds the settings of the source and destination
// Elasticsearch clusters used by the events processor.
type ArgsEventsProc struct {
	SourceCluster      dtos.ClusterSettings
	DestinationCluster dtos.ClusterSettings
}

// eventsProcessor reads logs documents from the source cluster, splits them
// into individual event documents and indexes those in the destination cluster.
type eventsProcessor struct {
	sourceESClient      EsClient
	destinationESClient EsClient

	// count tracks how many search/index iterations were processed (logging only)
	count            int
	addressConverter core.PubkeyConverter
}

func NewEventsProcessor(args ArgsEventsProc) (*eventsProcessor, error) {
sourceClient, err := client.NewElasticClient(elasticsearch.Config{
Addresses: []string{args.SourceCluster.URL},
Username: args.SourceCluster.User,
Password: args.SourceCluster.Password,
})
if err != nil {
return nil, err
}
destinationClient, err := client.NewElasticClient(elasticsearch.Config{

Check failure on line 45 in migrations/split/events.go

View workflow job for this annotation

GitHub Actions / golangci linter

ineffectual assignment to err (ineffassign)
Addresses: []string{args.DestinationCluster.URL},
Username: args.DestinationCluster.User,
Password: args.DestinationCluster.Password,
})

pubKeyConverter, err := pubkeyConverter.NewBech32PubkeyConverter(32, "erd")
if err != nil {
return nil, err
}
return &eventsProcessor{
sourceESClient: sourceClient,
destinationESClient: destinationClient,
addressConverter: pubKeyConverter,
}, nil
}

// SplitLogIndexInEvents migrates every document of the logs index into the
// events index, page by page, resuming from the timestamp recorded under the
// given migrationID in the destination cluster.
func (ep *eventsProcessor) SplitLogIndexInEvents(migrationID, sourceIndex, destinationIndex string) error {
	migrationInfo, err := ep.checkStatusOfSplit(migrationID)
	if err != nil {
		return err
	}

	currentTimestamp := migrationInfo.Timestamp
	for finished := false; !finished; {
		searchQuery := computeQueryBasedOnTimestamp(currentTimestamp)
		searchResponse := &dtos.ResponseLogsSearch{}
		err = ep.sourceESClient.DoSearchRequest(context.Background(), sourceIndex, bytes.NewBuffer(searchQuery), searchResponse)
		if err != nil {
			return err
		}

		currentTimestamp, finished, err = ep.prepareAndIndexEventsFromLogs(migrationID, searchResponse, destinationIndex)
		if err != nil {
			return err
		}

		ep.count++
		log.Info("indexing events", "bulk-count", ep.count, "current-timestamp", currentTimestamp)
	}

	return nil
}

// prepareAndIndexEventsFromLogs converts the fetched logs into events, bulk-indexes
// them into the destination index and persists the migration status. It returns the
// timestamp of the last processed log, whether the migration is done, and an error.
func (ep *eventsProcessor) prepareAndIndexEventsFromLogs(splitID string, logsResponse *dtos.ResponseLogsSearch, destinationIndex string) (uint64, bool, error) {
	hits := logsResponse.Hits.Hits
	// if we get fewer documents than the search size it means the migration is done
	migrationDone := len(hits) != searchSIZE
	if len(hits) == 0 {
		// nothing left to process; previously this path panicked with an
		// index-out-of-range on logEvents[len(logEvents)-1]
		return 0, true, nil
	}

	logEvents := make([]*data.LogEvent, 0)
	for _, dbLog := range hits {
		dbLogEvents, err := ep.createEventsFromLog(dbLog.ID, dbLog.Source)
		if err != nil {
			return 0, false, err
		}

		logEvents = append(logEvents, dbLogEvents...)
	}

	buffSlice := data.NewBufferSlice(0)
	for _, event := range logEvents {
		if err := serializeLogEvent(event, buffSlice); err != nil {
			return 0, false, err
		}
	}

	err := ep.doBulkRequests(destinationIndex, buffSlice.Buffers())
	if err != nil {
		return 0, false, err
	}

	// take the checkpoint timestamp from the last fetched log document; this
	// also works when the logs in this page produced no events at all
	lastTimestamp := uint64(hits[len(hits)-1].Source.Timestamp)
	err = ep.writeStatusOfSplit(splitID, lastTimestamp, migrationDone)
	if err != nil {
		return 0, false, err
	}

	return lastTimestamp, migrationDone, nil
}

// doBulkRequests sends each prepared buffer to the destination cluster as a
// separate bulk request, stopping at the first failure.
func (ep *eventsProcessor) doBulkRequests(index string, buffSlice []*bytes.Buffer) error {
	for _, buff := range buffSlice {
		if err := ep.destinationESClient.DoBulkRequest(context.Background(), buff, index); err != nil {
			return err
		}
	}

	return nil
}

// createEventsFromLog expands one logs document into one LogEvent per contained
// event. The event ID is "<txHash>-<shardID>-<order>", so re-running the split
// overwrites the same documents instead of duplicating them.
// The parameter was renamed from "log" to "dbLog" because it shadowed the
// package-level logger.
func (ep *eventsProcessor) createEventsFromLog(txHash string, dbLog *data.Logs) ([]*data.LogEvent, error) {
	// the log address is the same for every event, so decode it once instead of
	// per event (previously decoded inside the loop)
	addressBytes, err := ep.addressConverter.Decode(dbLog.Address)
	if err != nil {
		return nil, err
	}

	// NOTE(review): the number of shards (3) is hard-coded — confirm it should not be configurable
	eventShardID := sharding.ComputeShardID(addressBytes, 3)

	logEvents := make([]*data.LogEvent, 0, len(dbLog.Events))
	for _, event := range dbLog.Events {
		logEvents = append(logEvents, &data.LogEvent{
			ID:             fmt.Sprintf("%s-%d-%d", txHash, eventShardID, event.Order),
			TxHash:         txHash,
			OriginalTxHash: dbLog.OriginalTxHash,
			LogAddress:     dbLog.Address,
			Address:        event.Address,
			Identifier:     event.Identifier,
			Data:           hex.EncodeToString(event.Data),
			AdditionalData: hexEncodeSlice(event.AdditionalData),
			Topics:         hexEncodeSlice(event.Topics),
			Order:          event.Order,
			ShardID:        eventShardID,
			Timestamp:      dbLog.Timestamp,
		})
	}

	return logEvents, nil
}

// writeStatusOfSplit persists the migration status document (status plus last
// processed timestamp) under the splitID document id in the values index.
func (ep *eventsProcessor) writeStatusOfSplit(splitID string, timestamp uint64, isDone bool) error {
	status := dtos.MigrationInProgress
	if isDone {
		status = dtos.MigrationCompleted
	}

	migrationInfoBytes, err := json.Marshal(dtos.MigrationInfo{
		Status:    status,
		Timestamp: timestamp,
	})
	if err != nil {
		return err
	}

	meta := []byte(fmt.Sprintf(`{ "index" : { "_index":"%s", "_id" : "%s" } }%s`, dataindexer.ValuesIndex, splitID, "\n"))
	buffSlice := data.NewBufferSlice(0)
	if err = buffSlice.PutData(meta, migrationInfoBytes); err != nil {
		return err
	}

	return ep.destinationESClient.DoBulkRequest(context.Background(), buffSlice.Buffers()[0], "")
}

// checkStatusOfSplit fetches the persisted migration status for splitID from
// the values index. When no status document exists yet, a zero-value
// MigrationInfo is returned so the migration starts from timestamp 0.
func (ep *eventsProcessor) checkStatusOfSplit(splitID string) (*dtos.MigrationInfo, error) {
	var response json.RawMessage
	err := ep.destinationESClient.DoMultiGet(context.Background(), []string{splitID}, dataindexer.ValuesIndex, true, &response)
	if err != nil {
		return nil, err
	}

	numOfDocs := gjson.Get(string(response), "docs.#").Int()
	if numOfDocs == 0 {
		// previously returned (nil, nil), which made the caller dereference a
		// nil pointer (migrationInfo.Timestamp); return an empty status instead
		return &dtos.MigrationInfo{}, nil
	}

	found := gjson.Get(string(response), "docs.0.found")
	if !found.Bool() {
		return &dtos.MigrationInfo{}, nil
	}

	responseBytes := []byte(gjson.Get(string(response), "docs.0._source").String())

	migrationInfo := &dtos.MigrationInfo{}
	err = json.Unmarshal(responseBytes, migrationInfo)
	return migrationInfo, err
}

// hexEncodeSlice hex-encodes every element of input. It returns nil (rather
// than an empty slice) when there is nothing to encode, so empty results are
// omitted by JSON "omitempty" tags.
func hexEncodeSlice(input [][]byte) []string {
	if len(input) == 0 {
		return nil
	}

	hexEncoded := make([]string, 0, len(input))
	for _, item := range input {
		hexEncoded = append(hexEncoded, hex.EncodeToString(item))
	}

	return hexEncoded
}
28 changes: 28 additions & 0 deletions migrations/split/events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package split

import (
"github.com/multiversx/mx-chain-es-indexer-go/migrations/dtos"
"github.com/stretchr/testify/require"
"testing"
)

// TestWriteStatusOfSplit is an integration test: it requires a reachable
// public source cluster and a destination cluster on localhost:9200, so it is
// skipped in short mode instead of failing CI with network errors.
// NOTE(review): the name says "write status" but it exercises the full
// SplitLogIndexInEvents flow — consider renaming.
func TestWriteStatusOfSplit(t *testing.T) {
	if testing.Short() {
		t.Skip("integration test: needs live Elasticsearch clusters; skipped in -short mode")
	}

	ep, err := NewEventsProcessor(ArgsEventsProc{
		SourceCluster: dtos.ClusterSettings{
			URL: "https://index.multiversx.com",
		},
		DestinationCluster: dtos.ClusterSettings{
			URL: "http://127.0.0.1:9200",
		},
	})
	require.NoError(t, err)

	err = ep.SplitLogIndexInEvents("splitEvent", "logs", "events")
	require.NoError(t, err)

	migrationInfo, err := ep.checkStatusOfSplit("splitEvent")
	require.NoError(t, err)

	// TODO(review): 12345 looks like a placeholder — confirm the expected final timestamp
	timestamp := uint64(12345)
	require.Equal(t, timestamp, migrationInfo.Timestamp)
}
12 changes: 12 additions & 0 deletions migrations/split/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package split

import (
"bytes"
"context"
)

// EsClient defines the subset of Elasticsearch client operations needed by the
// split migration (bulk indexing, searching and multi-get).
type EsClient interface {
	DoBulkRequest(ctx context.Context, buff *bytes.Buffer, index string) error
	DoSearchRequest(ctx context.Context, index string, buff *bytes.Buffer, resBody interface{}) error
	DoMultiGet(ctx context.Context, ids []string, index string, withSource bool, res interface{}) error
}
36 changes: 36 additions & 0 deletions migrations/split/queries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package split

import "fmt"

const searchSIZE = 9999

func computeQueryBasedOnTimestamp(timestamp uint64) []byte {
if timestamp == 0 {
query := fmt.Sprintf(`{
"size": %d,
"query": {
"match_all": {}
},
"sort": [
{"timestamp": "asc"}
]
}`, searchSIZE)

return []byte(query)
}

query := fmt.Sprintf(`
{
"size": %d,
"query": {
"match_all": {}
},
"sort": [
{"timestamp": "asc"}
],
"search_after": ["%d"]
}
`, searchSIZE, timestamp)

return []byte(query)
}
Loading

0 comments on commit 6ba76b3

Please sign in to comment.