Skip to content

Commit

Permalink
Merge pull request moby#47302 from akerouanton/libnet-ds-PersistConne…
Browse files Browse the repository at this point in the history
…ction

libnet: boltdb: remove PersistConnection
  • Loading branch information
thaJeztah authored Feb 2, 2024
2 parents 7c4828f + 83af50a commit 701dd98
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 98 deletions.
1 change: 0 additions & 1 deletion libnetwork/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ func DefaultScope(dataDir string) ScopeCfg {
Config: &store.Config{
Bucket: "libnetwork",
ConnectionTimeout: time.Minute,
PersistConnection: true,
},
},
}
Expand Down
96 changes: 15 additions & 81 deletions libnetwork/internal/kvstore/boltdb/boltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,6 @@ type BoltDB struct {
dbIndex uint64
path string
timeout time.Duration
// By default libkv opens and closes the bolt DB connection for every
// get/put operation. This allows multiple apps to use a Bolt DB at the
// same time.
// PersistConnection flag provides an option to override ths behavior.
// ie: open the connection in New and use it till Close is called.
PersistConnection bool
}

const (
Expand All @@ -53,15 +47,11 @@ func New(endpoint string, options *store.Config) (store.Store, error) {
return nil, err
}

var db *bolt.DB
if options.PersistConnection {
var err error
db, err = bolt.Open(endpoint, filePerm, &bolt.Options{
Timeout: options.ConnectionTimeout,
})
if err != nil {
return nil, err
}
db, err := bolt.Open(endpoint, filePerm, &bolt.Options{
Timeout: options.ConnectionTimeout,
})
if err != nil {
return nil, err
}

timeout := transientTimeout
Expand All @@ -70,50 +60,21 @@ func New(endpoint string, options *store.Config) (store.Store, error) {
}

b := &BoltDB{
client: db,
path: endpoint,
boltBucket: []byte(options.Bucket),
timeout: timeout,
PersistConnection: options.PersistConnection,
client: db,
path: endpoint,
boltBucket: []byte(options.Bucket),
timeout: timeout,
}

return b, nil
}

func (b *BoltDB) reset() {
b.path = ""
b.boltBucket = []byte{}
}

func (b *BoltDB) getDBhandle() (*bolt.DB, error) {
if !b.PersistConnection {
db, err := bolt.Open(b.path, filePerm, &bolt.Options{Timeout: b.timeout})
if err != nil {
return nil, err
}
b.client = db
}
return b.client, nil
}

func (b *BoltDB) releaseDBhandle() {
if !b.PersistConnection {
b.client.Close()
}
}

// Put the key, value pair. index number metadata is prepended to the value
func (b *BoltDB) Put(key string, value []byte) error {
b.mu.Lock()
defer b.mu.Unlock()

db, err := b.getDBhandle()
if err != nil {
return err
}
defer b.releaseDBhandle()

return db.Update(func(tx *bolt.Tx) error {
return b.client.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(b.boltBucket)
if err != nil {
return err
Expand All @@ -133,14 +94,8 @@ func (b *BoltDB) Exists(key string) (bool, error) {
b.mu.Lock()
defer b.mu.Unlock()

db, err := b.getDBhandle()
if err != nil {
return false, err
}
defer b.releaseDBhandle()

var exists bool
err = db.View(func(tx *bolt.Tx) error {
err := b.client.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
return store.ErrKeyNotFound
Expand All @@ -163,14 +118,8 @@ func (b *BoltDB) List(keyPrefix string) ([]*store.KVPair, error) {
b.mu.Lock()
defer b.mu.Unlock()

db, err := b.getDBhandle()
if err != nil {
return nil, err
}
defer b.releaseDBhandle()

var kv []*store.KVPair
err = db.View(func(tx *bolt.Tx) error {
err := b.client.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
return store.ErrKeyNotFound
Expand Down Expand Up @@ -212,13 +161,8 @@ func (b *BoltDB) AtomicDelete(key string, previous *store.KVPair) error {
if previous == nil {
return store.ErrPreviousNotSpecified
}
db, err := b.getDBhandle()
if err != nil {
return err
}
defer b.releaseDBhandle()

return db.Update(func(tx *bolt.Tx) error {
return b.client.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
return store.ErrKeyNotFound
Expand All @@ -242,15 +186,9 @@ func (b *BoltDB) AtomicPut(key string, value []byte, previous *store.KVPair) (*s
b.mu.Lock()
defer b.mu.Unlock()

db, err := b.getDBhandle()
if err != nil {
return nil, err
}
defer b.releaseDBhandle()

var dbIndex uint64
dbval := make([]byte, libkvmetadatalen)
err = db.Update(func(tx *bolt.Tx) error {
err := b.client.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(b.boltBucket)
if bucket == nil {
if previous != nil {
Expand Down Expand Up @@ -293,9 +231,5 @@ func (b *BoltDB) Close() {
b.mu.Lock()
defer b.mu.Unlock()

if !b.PersistConnection {
b.reset()
} else {
b.client.Close()
}
b.client.Close()
}
1 change: 0 additions & 1 deletion libnetwork/internal/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ var (
type Config struct {
ConnectionTimeout time.Duration
Bucket string
PersistConnection bool
}

// Store represents the backend K/V storage
Expand Down
15 changes: 0 additions & 15 deletions libnetwork/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,3 @@ func OptionBoltdbWithRandomDBFile(t *testing.T) config.Option {
c.Scope.Client.Config = &store.Config{Bucket: "testBackend"}
}
}

func TestMultipleControllersWithSameStore(t *testing.T) {
cfgOptions := OptionBoltdbWithRandomDBFile(t)
ctrl1, err := New(cfgOptions)
if err != nil {
t.Fatalf("Error new controller: %v", err)
}
defer ctrl1.Stop()
// Use the same boltdb file without closing the previous controller
ctrl2, err := New(cfgOptions)
if err != nil {
t.Fatalf("Local store must support concurrent controllers")
}
ctrl2.Stop()
}

0 comments on commit 701dd98

Please sign in to comment.