Skip to content

Commit

Permalink
feat: reuse etcd-mutex instance
Browse files Browse the repository at this point in the history
Signed-off-by: CeerDecy <[email protected]>
  • Loading branch information
CeerDecy committed Oct 21, 2024
1 parent 4509125 commit b5db027
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 3 deletions.
2 changes: 1 addition & 1 deletion providers/etcd-mutex/examples/examples.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
etcd:
endpoints: "https://127.0.0.1:2379"
endpoints: "http://127.0.0.1:2379"
# tls:
# cert_file: "etcd-client.pem"
# cert_key_file: "etcd-client-key.pem"
Expand Down
15 changes: 13 additions & 2 deletions providers/etcd-mutex/mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type provider struct {
etcd etcd.Interface
instances map[string]Mutex
inProcMutex *inProcMutex
lock *sync.Mutex
}

// Init .
Expand All @@ -76,19 +77,28 @@ func (p *provider) NewWithTTL(ctx context.Context, key string, ttl time.Duration
if seconds > 0 {
opts = append(opts, concurrency.WithTTL(seconds))
}
return &etcdMutex{
mutex := &etcdMutex{
log: p.Log,
key: filepath.Clean(filepath.Join(p.Cfg.RootPath, key)),
client: p.etcd.Client(),
opts: opts,
inProcLock: make(chan struct{}, 1),
ctx: ctx,
cancel: cancel,
}, nil
}
p.lock.Lock()
defer p.lock.Unlock()
p.instances[key] = mutex
return mutex, nil
}

// New .
func (p *provider) New(ctx context.Context, key string) (Mutex, error) {
p.lock.Lock()
if ins, ok := p.instances[key]; ok {
return ins, nil
}
p.lock.Unlock()
return p.NewWithTTL(ctx, key, time.Duration(0))
}

Expand Down Expand Up @@ -293,6 +303,7 @@ func init() {
return &provider{
instances: make(map[string]Mutex),
inProcMutex: &inProcMutex{lock: make(chan struct{}, 1)},
lock: &sync.Mutex{},
}
},
})
Expand Down
95 changes: 95 additions & 0 deletions providers/etcd-mutex/mutex_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package mutex

import (
"context"
_ "embed"
"fmt"
"os"
"path/filepath"
"sync"
"testing"
"time"

"github.com/erda-project/erda-infra/base/servicehub"
)

type provider2 struct {
Mutex Interface // autowired
Lock Mutex `mutex-key:"test-key"` // autowired
}

func (p *provider2) Run(ctx context.Context) error {
fmt.Println("running ...")
go func() {
time.Sleep(10000 * time.Second)
err := p.Lock.Close()
if err != nil {
fmt.Println("Close err: ", err)
return
}
err = p.Lock.Close()
if err != nil {
fmt.Println("Close err: ", err)
return
}
}()
//ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
mu, err := p.Mutex.New(ctx, "keyAAAA")
if err != nil {
return err
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
err := mu.Lock(ctx)
if err != nil {
return
}
defer mu.Unlock(ctx)
for i := 0; i < 10; i++ {
fmt.Printf("A wait %vs\n", i+1)
time.Sleep(1 * time.Second)
}
fmt.Println("AAA===>")
}()
time.Sleep(1 * time.Second)
ctx, cancelFunc = context.WithCancel(context.Background())
defer cancelFunc()
mu, err = p.Mutex.New(ctx, "keyAAAA")
if err != nil {
return err
}
wg.Add(1)
go func() {
defer wg.Done()
err := mu.Lock(ctx)
if err != nil {
return
}
defer mu.Unlock(ctx)
time.Sleep(5 * time.Second)
fmt.Println("BBB===>")
}()
wg.Wait()
return nil
}

func init() {
servicehub.Register("example", &servicehub.Spec{
Services: []string{"example"},
Dependencies: []string{"etcd-mutex"},
Description: "example",
Creator: func() servicehub.Provider {
return &provider2{}
},
})
}

func TestLock(t *testing.T) {
dir, _ := os.Getwd()
hub := servicehub.New()
hub.Run("examples", filepath.Join(dir, "examples", "examples.yaml"), os.Args...)
}

0 comments on commit b5db027

Please sign in to comment.