Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: etcd lease handlining #801

Merged
merged 1 commit into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/backend/runtime/omni/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ func BuildEtcdPersistentState(ctx context.Context, params *config.Params, logger
return buildEtcdPersistentState(ctx, params, logger, f)
}

func GetEmbeddedEtcdClient(ctx context.Context, params *config.EtcdParams, logger *zap.Logger, f func(context.Context, *clientv3.Client) error) error {
return getEmbeddedEtcdClient(ctx, params, logger, f)
func GetEmbeddedEtcdClientWithServer(ctx context.Context, params *config.EtcdParams, logger *zap.Logger, f func(context.Context, *clientv3.Client, func() error) error) error {
return getEmbeddedEtcdClientWithServerCloser(ctx, params, logger, f)
}

func EtcdElections(ctx context.Context, client *clientv3.Client, electionKey string, logger *zap.Logger, f func(ctx context.Context, client *clientv3.Client) error) error {
Expand Down
13 changes: 10 additions & 3 deletions internal/backend/runtime/omni/state_etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"net"
"net/url"
"os"
"sync"
"time"

"github.com/cosi-project/runtime/pkg/resource"
Expand Down Expand Up @@ -137,6 +138,12 @@ func getEtcdClient(ctx context.Context, params *config.EtcdParams, logger *zap.L
}

func getEmbeddedEtcdClient(ctx context.Context, params *config.EtcdParams, logger *zap.Logger, f func(context.Context, *clientv3.Client) error) error {
return getEmbeddedEtcdClientWithServerCloser(ctx, params, logger, func(ctx context.Context, cli *clientv3.Client, _ func() error) error {
return f(ctx, cli)
})
}

func getEmbeddedEtcdClientWithServerCloser(ctx context.Context, params *config.EtcdParams, logger *zap.Logger, f func(ctx context.Context, cli *clientv3.Client, closer func() error) error) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand Down Expand Up @@ -222,7 +229,7 @@ func getEmbeddedEtcdClient(ctx context.Context, params *config.EtcdParams, logge
return err
}

closer := func() error {
closer := sync.OnceValue(func() error {
if err = cli.Close(); err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("error closing client: %w", err)
}
Expand All @@ -236,15 +243,15 @@ func getEmbeddedEtcdClient(ctx context.Context, params *config.EtcdParams, logge
}

return nil
}
})

defer func() {
if err = closer(); err != nil {
logger.Error("error stopping embedded etcd", zap.Error(err))
}
}()

return f(ctx, cli)
return f(ctx, cli, closer)
}

func getExternalEtcdClient(ctx context.Context, params *config.EtcdParams, logger *zap.Logger, f func(context.Context, *clientv3.Client) error) error {
Expand Down
26 changes: 17 additions & 9 deletions internal/backend/runtime/omni/state_etcd_election.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package omni

import (
"context"
"errors"
"fmt"
"path"
"time"
Expand Down Expand Up @@ -76,30 +77,37 @@ campaignLoop:
logger.Info("resigned from the etcd election campaign", zap.Error(resignErr))
}()

ctx, cancel := context.WithCancel(ctx)
defer cancel()
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(nil)

panichandler.Go(func() {
observe := election.Observe(ctx)

observeLoop:
for {
select {
case <-sess.Done():
logger.Info("etcd session closed")
logger.Error("etcd session closed")

break observeLoop
cancel(errors.New("etcd session closed"))

return
case <-ctx.Done():
break observeLoop
return
case resp, ok := <-observe:
if !ok {
break observeLoop
logger.Error("etcd observe channel closed")

cancel(errors.New("etcd observe channel closed"))

return
}

if string(resp.Kvs[0].Value) != campaignKey {
logger.Info("detected new leader", zap.ByteString("leader", resp.Kvs[0].Value))
logger.Error("detected new leader", zap.ByteString("leader", resp.Kvs[0].Value))

cancel(errors.New("etcd detected new leader"))

break observeLoop
return
}
}
}
Expand Down
51 changes: 48 additions & 3 deletions internal/backend/runtime/omni/state_etcd_election_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap/zaptest"
Expand All @@ -32,22 +33,66 @@ func mockRunner(id int, started chan<- int, closed <-chan error) func(ctx contex
case err := <-closed:
return err
case <-ctx.Done():
return ctx.Err()
return context.Cause(ctx)
}
}
}

func TestEtcdElectionsLost(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

logger := zaptest.NewLogger(t)

require.NoError(t, omni.GetEmbeddedEtcdClientWithServer(ctx, &config.EtcdParams{
Embedded: true,
EmbeddedDBPath: t.TempDir(),
Endpoints: []string{"http://localhost:0"},
}, logger, func(ctx context.Context, client *clientv3.Client, serverCloser func() error) error {
started := make(chan int)
closed := make(chan error)
errCh := make(chan error)
electionKey := uuid.NewString()

// run mock runner, it should win the elections and keep running
go func() {
errCh <- omni.EtcdElections(ctx, client, electionKey, logger, mockRunner(1, started, closed))
}()

select {
case id := <-started:
require.Equal(t, 1, id)
case <-ctx.Done():
t.Fatal("runner didn't start")
}

// abort etcd, that aborts the election campaign
assert.NoError(t, serverCloser())

// at this point the runner should stop
select {
case err := <-errCh:
require.Error(t, err)
require.Contains(t, err.Error(), "etcd session closed")
case <-ctx.Done():
t.Fatal("runner didn't stop")
}

return nil
}))
}

func TestEtcdElections(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

logger := zaptest.NewLogger(t)

require.NoError(t, omni.GetEmbeddedEtcdClient(ctx, &config.EtcdParams{
require.NoError(t, omni.GetEmbeddedEtcdClientWithServer(ctx, &config.EtcdParams{
Embedded: true,
EmbeddedDBPath: t.TempDir(),
Endpoints: []string{"http://localhost:0"},
}, logger, func(ctx context.Context, client *clientv3.Client) error {
}, logger, func(ctx context.Context, client *clientv3.Client, _ func() error) error {
started := make(chan int)
closed := make(chan error)
errCh := make(chan error)
Expand Down