Skip to content

Commit

Permalink
Store AST directly in provider mappings (#6114) (#6158)
Browse files Browse the repository at this point in the history
* Store AST directly in dynamic provider mappings

* Store AST directly in context provider mappings

(cherry picked from commit fe0f6b0)

Co-authored-by: Mikołaj Świątek <[email protected]>
  • Loading branch information
mergify[bot] and swiatekm authored Nov 27, 2024
1 parent 19f8412 commit ab5e07a
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 47 deletions.
3 changes: 3 additions & 0 deletions internal/pkg/agent/transpiler/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,9 @@ func (a *AST) HashStr() string {

// Equal check if two AST are equals by using the computed hash.
func (a *AST) Equal(other *AST) bool {
if a.root == nil || other.root == nil {
return a.root == other.root
}
return bytes.Equal(a.Hash(), other.Hash())
}

Expand Down
10 changes: 7 additions & 3 deletions internal/pkg/composable/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
package composable

import (
"maps"
"os"
"path/filepath"
"strings"
"testing"

"github.com/elastic/elastic-agent/internal/pkg/agent/transpiler"

"gopkg.in/yaml.v3"
"k8s.io/apimachinery/pkg/util/uuid"

Expand Down Expand Up @@ -54,7 +55,8 @@ func BenchmarkGenerateVars100Pods(b *testing.B) {
mappings: make(map[string]dynamicProviderMapping),
}
for i := 0; i < podCount; i++ {
podData := maps.Clone(providerMapping)
podData, err := transpiler.NewAST(providerMapping)
require.NoError(b, err)
podUID := uuid.NewUUID()
podMapping := dynamicProviderMapping{
mapping: podData,
Expand All @@ -63,8 +65,10 @@ func BenchmarkGenerateVars100Pods(b *testing.B) {
}
c.dynamicProviders[providerName] = providerState
} else {
providerAst, err := transpiler.NewAST(providerData[providerName])
require.NoError(b, err)
providerState := &contextProviderState{
mapping: providerData[providerName],
mapping: providerAst,
}
c.contextProviders[providerName] = providerState
}
Expand Down
60 changes: 16 additions & 44 deletions internal/pkg/composable/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,12 @@ func New(log *logger.Logger, c *config.Config, managed bool) (Controller, error)
if err != nil {
return nil, errors.New(err, fmt.Sprintf("failed to build provider '%s'", name), errors.TypeConfig, errors.M("provider", name))
}
emptyMapping, _ := transpiler.NewAST(nil)
contextProviders[name] = &contextProviderState{
// Safe for Context to be nil here because it will be filled in
// by (*controller).Run before the provider is started.
provider: provider,
mapping: emptyMapping,
}
}

Expand Down Expand Up @@ -275,20 +277,17 @@ func (c *controller) Close() {
func (c *controller) generateVars(fetchContextProviders mapstr.M) []*transpiler.Vars {
// build the vars list of mappings
vars := make([]*transpiler.Vars, 1)
mapping := map[string]interface{}{}
mapping, _ := transpiler.NewAST(map[string]any{})
for name, state := range c.contextProviders {
mapping[name] = state.Current()
_ = mapping.Insert(state.Current(), name)
}
// this is ensured not to error, by how the mappings states are verified
mappingAst, _ := transpiler.NewAST(mapping)
vars[0] = transpiler.NewVarsFromAst("", mappingAst, fetchContextProviders)
vars[0] = transpiler.NewVarsFromAst("", mapping, fetchContextProviders)

// add to the vars list for each dynamic providers mappings
for name, state := range c.dynamicProviders {
for _, mappings := range state.Mappings() {
local := mappingAst.ShallowClone()
dynamicAst, _ := transpiler.NewAST(mappings.mapping)
_ = local.Insert(dynamicAst, name)
local := mapping.ShallowClone()
_ = local.Insert(mappings.mapping, name)
id := fmt.Sprintf("%s-%s", name, mappings.id)
v := transpiler.NewVarsWithProcessorsFromAst(id, local, name, mappings.processors, fetchContextProviders)
vars = append(vars, v)
Expand All @@ -302,7 +301,7 @@ type contextProviderState struct {

provider corecomp.ContextProvider
lock sync.RWMutex
mapping map[string]interface{}
mapping *transpiler.AST
signal chan bool
}

Expand All @@ -324,30 +323,25 @@ func (c *contextProviderState) Signal() {
// Set sets the current mapping.
func (c *contextProviderState) Set(mapping map[string]interface{}) error {
var err error
mapping, err = cloneMap(mapping)
if err != nil {
return err
}
// ensure creating vars will not error
_, err = transpiler.NewVars("", mapping, nil)
ast, err := transpiler.NewAST(mapping)
if err != nil {
return err
}

c.lock.Lock()
defer c.lock.Unlock()

if reflect.DeepEqual(c.mapping, mapping) {
if c.mapping != nil && c.mapping.Equal(ast) {
// same mapping; no need to update and signal
return nil
}
c.mapping = mapping
c.mapping = ast
c.Signal()
return nil
}

// Current returns the current mapping.
func (c *contextProviderState) Current() map[string]interface{} {
func (c *contextProviderState) Current() *transpiler.AST {
c.lock.RLock()
defer c.lock.RUnlock()
return c.mapping
Expand All @@ -356,7 +350,7 @@ func (c *contextProviderState) Current() map[string]interface{} {
type dynamicProviderMapping struct {
id string
priority int
mapping map[string]interface{}
mapping *transpiler.AST
processors transpiler.Processors
}

Expand All @@ -376,31 +370,25 @@ type dynamicProviderState struct {
// to ensure that matching of variables occurs on the lower priority mappings first.
func (c *dynamicProviderState) AddOrUpdate(id string, priority int, mapping map[string]interface{}, processors []map[string]interface{}) error {
var err error
mapping, err = cloneMap(mapping)
if err != nil {
return err
}
processors, err = cloneMapArray(processors)
if err != nil {
return err
}
// ensure creating vars will not error
_, err = transpiler.NewVars("", mapping, nil)
ast, err := transpiler.NewAST(mapping)
if err != nil {
return err
}

c.lock.Lock()
defer c.lock.Unlock()
curr, ok := c.mappings[id]
if ok && reflect.DeepEqual(curr.mapping, mapping) && reflect.DeepEqual(curr.processors, processors) {
if ok && curr.mapping.Equal(ast) && reflect.DeepEqual(curr.processors, processors) {
// same mapping; no need to update and signal
return nil
}
c.mappings[id] = dynamicProviderMapping{
id: id,
priority: priority,
mapping: mapping,
mapping: ast,
processors: processors,
}

Expand Down Expand Up @@ -458,22 +446,6 @@ func (c *dynamicProviderState) Mappings() []dynamicProviderMapping {
return mappings
}

func cloneMap(source map[string]interface{}) (map[string]interface{}, error) {
if source == nil {
return nil, nil
}
bytes, err := json.Marshal(source)
if err != nil {
return nil, fmt.Errorf("failed to clone: %w", err)
}
var dest map[string]interface{}
err = json.Unmarshal(bytes, &dest)
if err != nil {
return nil, fmt.Errorf("failed to clone: %w", err)
}
return dest, nil
}

func cloneMapArray(source []map[string]interface{}) ([]map[string]interface{}, error) {
if source == nil {
return nil, nil
Expand Down

0 comments on commit ab5e07a

Please sign in to comment.