From b5db0279469c0b22ee29c267ec824248d72c5387 Mon Sep 17 00:00:00 2001 From: CeerDecy <1748788674@qq.com> Date: Mon, 21 Oct 2024 13:48:41 +0800 Subject: [PATCH] feat: reuse etcd-mutex instance Signed-off-by: CeerDecy <1748788674@qq.com> --- providers/etcd-mutex/examples/examples.yaml | 2 +- providers/etcd-mutex/mutex.go | 15 +++- providers/etcd-mutex/mutex_test.go | 95 +++++++++++++++++++++ 3 files changed, 109 insertions(+), 3 deletions(-) create mode 100644 providers/etcd-mutex/mutex_test.go diff --git a/providers/etcd-mutex/examples/examples.yaml b/providers/etcd-mutex/examples/examples.yaml index ecdc5e91..422280c0 100644 --- a/providers/etcd-mutex/examples/examples.yaml +++ b/providers/etcd-mutex/examples/examples.yaml @@ -1,5 +1,5 @@ etcd: - endpoints: "https://127.0.0.1:2379" + endpoints: "http://127.0.0.1:2379" # tls: # cert_file: "etcd-client.pem" # cert_key_file: "etcd-client-key.pem" diff --git a/providers/etcd-mutex/mutex.go b/providers/etcd-mutex/mutex.go index 1b9c3191..bf93dba2 100644 --- a/providers/etcd-mutex/mutex.go +++ b/providers/etcd-mutex/mutex.go @@ -59,6 +59,7 @@ type provider struct { etcd etcd.Interface instances map[string]Mutex inProcMutex *inProcMutex + lock *sync.Mutex } // Init . @@ -76,7 +77,7 @@ func (p *provider) NewWithTTL(ctx context.Context, key string, ttl time.Duration if seconds > 0 { opts = append(opts, concurrency.WithTTL(seconds)) } - return &etcdMutex{ + mutex := &etcdMutex{ log: p.Log, key: filepath.Clean(filepath.Join(p.Cfg.RootPath, key)), client: p.etcd.Client(), @@ -84,11 +85,20 @@ func (p *provider) NewWithTTL(ctx context.Context, key string, ttl time.Duration inProcLock: make(chan struct{}, 1), ctx: ctx, cancel: cancel, - }, nil + } + p.lock.Lock() + defer p.lock.Unlock() + p.instances[key] = mutex + return mutex, nil } // New . func (p *provider) New(ctx context.Context, key string) (Mutex, error) { + p.lock.Lock() + if ins, ok := p.instances[key]; ok { + return ins, nil + } + p.lock.Unlock() return p.NewWithTTL(ctx, key, time.Duration(0)) } @@ -293,6 +303,7 @@ func init() { return &provider{ instances: make(map[string]Mutex), inProcMutex: &inProcMutex{lock: make(chan struct{}, 1)}, + lock: &sync.Mutex{}, } }, }) diff --git a/providers/etcd-mutex/mutex_test.go b/providers/etcd-mutex/mutex_test.go new file mode 100644 index 00000000..0b3c9181 --- /dev/null +++ b/providers/etcd-mutex/mutex_test.go @@ -0,0 +1,95 @@ +package mutex + +import ( + "context" + _ "embed" + "fmt" + "os" + "path/filepath" + "sync" + "testing" + "time" + + "github.com/erda-project/erda-infra/base/servicehub" +) + +type provider2 struct { + Mutex Interface // autowired + Lock Mutex `mutex-key:"test-key"` // autowired +} + +func (p *provider2) Run(ctx context.Context) error { + fmt.Println("running ...") + go func() { + time.Sleep(10000 * time.Second) + err := p.Lock.Close() + if err != nil { + fmt.Println("Close err: ", err) + return + } + err = p.Lock.Close() + if err != nil { + fmt.Println("Close err: ", err) + return + } + }() + //ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() + mu, err := p.Mutex.New(ctx, "keyAAAA") + if err != nil { + return err + } + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := mu.Lock(ctx) + if err != nil { + return + } + defer mu.Unlock(ctx) + for i := 0; i < 10; i++ { + fmt.Printf("A wait %vs\n", i+1) + time.Sleep(1 * time.Second) + } + fmt.Println("AAA===>") + }() + time.Sleep(1 * time.Second) + ctx, cancelFunc = context.WithCancel(context.Background()) + defer cancelFunc() + mu, err = p.Mutex.New(ctx, "keyAAAA") + if err != nil { + return err + } + wg.Add(1) + go func() { + defer wg.Done() + err := mu.Lock(ctx) + if err != nil { + return + } + defer mu.Unlock(ctx) + time.Sleep(5 * time.Second) + fmt.Println("BBB===>") + }() + wg.Wait() + return nil +} + +func init() { + servicehub.Register("example", &servicehub.Spec{ + Services: []string{"example"}, + Dependencies: []string{"etcd-mutex"}, + Description: "example", + Creator: func() servicehub.Provider { + return &provider2{} + }, + }) +} + +func TestLock(t *testing.T) { + dir, _ := os.Getwd() + hub := servicehub.New() + hub.Run("examples", filepath.Join(dir, "examples", "examples.yaml"), os.Args...) +}