Skip to content

Commit

Permalink
feat: INFRA-508 Added support for groups management with etcd
Browse files Browse the repository at this point in the history
  • Loading branch information
Magnitus- committed Dec 21, 2023
1 parent aa7769b commit bbfb330
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 0 deletions.
74 changes: 74 additions & 0 deletions client/group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package client

import (
"errors"
"fmt"
)

type Group struct {
KeyPrefix string
Id string
}

func (cli *EtcdClient) JoinGroup(gr Group) (error) {
_, err := cli.PutKey(fmt.Sprintf("%s%s", gr.KeyPrefix, gr.Id), fmt.Sprintf("%s", gr.Id))
return err
}

func (cli *EtcdClient) LeaveGroup(gr Group) (error) {
return cli.DeleteKey(fmt.Sprintf("%s%s", gr.KeyPrefix, gr.Id))
}

func (cli *EtcdClient) GetGroupMembers(gr Group) (map[string]string, int64, error) {
info, err := cli.GetPrefix(gr.KeyPrefix)
if err != nil {
return nil, -1, err
}

return info.Keys.ToValueMap(gr.KeyPrefix), info.Revision, nil
}

func (cli *EtcdClient) WaitGroupCountThreshold(gr Group, threshold int64, doneCh <-chan struct{}) <-chan error {
errCh := make(chan error)
go func() {
members, rev, err := cli.GetGroupMembers(gr)
if err != nil {
errCh <- err
close(errCh)
return
}

if int64(len(members)) >= threshold {
close(errCh)
return
}

wcCh := cli.Watch(gr.KeyPrefix, WatchOptions{IsPrefix: true, TrimPrefix: true, Revision: rev + 1})
for true {
select {
case res, ok := <-wcCh:
if !ok {
errCh <- errors.New("Watch stopped before reaching threshold")
close(errCh)
return
}

if res.Error != nil {
errCh <- res.Error
close(errCh)
return
}

res.Changes.ApplyOn(members)
if int64(len(members)) >= threshold {
close(errCh)
return
}
case <-doneCh:
close(errCh)
return
}
}
}()
return errCh
}
13 changes: 13 additions & 0 deletions client/key-watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@ type WatchInfo struct {
Deletions []string
}

/*
Apply the changes of a WatchInfo structure to a map of strings
*/
func (info *WatchInfo) ApplyOn(dest map[string]string) {
for key, val := range info.Upserts {
dest[key] = val.Value
}

for _, key := range info.Deletions {
delete(dest, key)
}
}

/*
Events returned by the watch function.
It can report either a change or an error.
Expand Down

0 comments on commit bbfb330

Please sign in to comment.