From 44904e1e5efbd7076d3038874db5a4d3fc1f3abf Mon Sep 17 00:00:00 2001 From: Dmitriy Date: Mon, 24 Sep 2018 13:11:10 +0300 Subject: [PATCH] New mtn state. (#156) * [mtn] switch mtn state to boltdb --- .travis.yml | 1 + Godeps/Godeps.json | 5 + isolate/isolate.go | 1 + isolate/mtn.go | 383 +++++++++++++++++++++++++++++----------- isolate/porto/box.go | 5 +- vendor/go.etcd.io/bbolt | 1 + 6 files changed, 295 insertions(+), 101 deletions(-) create mode 160000 vendor/go.etcd.io/bbolt diff --git a/.travis.yml b/.travis.yml index 3f6f592..baa2a0f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,6 +15,7 @@ before_install: install: - go env + - go get go.etcd.io/bbolt/... script: - make test diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 5a73621..c8f5181 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -277,6 +277,11 @@ "ImportPath": "github.com/pkg/errors", "Comment": "v0.8.0-2-g248dadf", "Rev": "248dadf4e9068a0b3e79f02ed0a610d935de5302" + }, + { + "ImportPath":"go.etcd.io/bbolt", + "Comment":"v1.3.1-etcd.8", + "Rev":"7ee3ded59d4835e10f3e7d0f7603c42aa5e83820" } ] } diff --git a/isolate/isolate.go b/isolate/isolate.go index c7b7c7e..c7e3275 100644 --- a/isolate/isolate.go +++ b/isolate/isolate.go @@ -71,6 +71,7 @@ type ( Url string `json:"url,omitempty"` Label string `json:"label,omitempty"` Ident string `json:"ident,omitempty"` + DbPath string `json:"dbpath,omitempty"` Headers map[string]string `json:"headers,omitempty"` } `json:"mtn,omitempty"` } diff --git a/isolate/mtn.go b/isolate/mtn.go index 0eb1370..a38525c 100644 --- a/isolate/mtn.go +++ b/isolate/mtn.go @@ -5,14 +5,15 @@ import ( "context" "encoding/json" "fmt" + "io" "net/http" "os" "time" - "sync" - + bolt "go.etcd.io/bbolt" "github.com/noxiouz/stout/pkg/log" ) + type RawAlloc struct { Id string `json:"id"` Porto RawPorto `json:"porto"` @@ -27,21 +28,19 @@ type RawPorto struct { } type AllocError struct { - Type string `json:"type,omitempty"` - Message []string `json:"message"` - Cause 
// SaveRename moves the file at src to dst by copying its bytes and then
// deleting src. Unlike os.Rename it goes through a plain copy, so it does not
// depend on both paths living on the same filesystem.
//
// src is removed only after the copy has finished and dst has been closed
// without error, so a failed copy never destroys the only copy of the data.
// Any error from opening, creating, copying, closing or removing is returned
// unchanged.
func SaveRename(src, dst string) error {
	in, err := os.Open(src)
	if err != nil {
		return err
	}
	defer in.Close()

	out, err := os.Create(dst)
	if err != nil {
		return err
	}
	// io.Copy takes (dst, src): the destination comes first.
	if _, err := io.Copy(out, in); err != nil {
		out.Close()
		return err
	}
	// Close explicitly (not via defer) so that a failed flush is noticed
	// before the source file is deleted.
	if err := out.Close(); err != nil {
		return err
	}
	return os.Remove(src)
}
0666, &bolt.Options{Timeout: 10 * time.Second}) + if err != nil { + log.G(ctx).Errorf("Cant open db inside CfgInit() by calling bolt.Open(), returned: %s", err) + if s, err := os.Stat(c.Cfg.DbPath); os.IsNotExist(err) { + log.G(ctx).Errorf("DB file not exist and we cant create new. Err: %s", err) + return false + } else if err == nil { + fSize := s.Size() + if fSize > 0 { + if _, err := os.Stat(corruptedBackupPath); err == nil { + log.G(ctx).Errorf("Corrupted DB backup file exist, nothing to do there.") + return false + } + log.G(ctx).Errorf("DB file exist, size %d and cant be opened. Try to recreate.", fSize) + errMove := SaveRename(c.Cfg.DbPath, corruptedBackupPath) + if errMove != nil { + log.G(ctx).Errorf("Cant move corrupted db file, err: %s", errMove) + return false + } + } else { + log.G(ctx).Errorf("DB file exist, size %d and cant be opened. Try to delete old.", fSize) + err := os.Remove(c.Cfg.DbPath) + if err != nil { + log.G(ctx).Errorf("Cant delete old db file, err: %s", err) + return false + } + } + db, err = bolt.Open(c.Cfg.DbPath, 0666, &bolt.Options{Timeout: 10 * time.Second}) + if err != nil { + log.G(ctx).Errorf("Second try open db is failed, err: %s", err) + return false + } + } + errDb := db.Update(func(tx *bolt.Tx) error { + errChan := tx.Check() + select { + case errCheck := <-errChan: + return errCheck + default: + return nil + } + }) + if errDb != nil { + log.G(ctx).Errorf("DB fail consistency checks, err: %s", errDb) + return false + } + } + c.Db = db return true } @@ -113,27 +181,46 @@ func (c *MtnState) PoolInit(ctx context.Context) bool { if !c.Cfg.Enable { return true } - c.Pool = make(map[string]*IdState) allAllocs, err := c.GetAllocations(ctx) if err != nil { log.G(ctx).Errorf("Cant init pool inside PoolInit(), err: %s", err) return false } + + tx, err := c.Db.Begin(true) + if err != nil { + log.G(ctx).Errorf("Cant start transaction inside PoolInit(), err: %s", err) + return false + } + defer tx.Rollback() + for netId, allocs := range 
allAllocs { - cState := IdState{ - Reserved: 0, - Allocations: make(map[string]Allocation), - } for _, alloc := range allocs { - cState.Allocations[alloc.Id] = Allocation{alloc.Net, alloc.Hostname, alloc.Ip, alloc.Id, false} + b, err := tx.CreateBucketIfNotExists([]byte(netId)) + if err != nil { + log.G(ctx).Errorf("Cant continue transaction inside PoolInit(), err: %s", err) + return false + } + if b.Get([]byte(alloc.Id)) == nil { + if buf, err := json.Marshal(Allocation{alloc.Net, alloc.Hostname, alloc.Ip, alloc.Id, false}); err != nil { + log.G(ctx).Errorf("Cant continue transaction inside PoolInit(), err: %s", err) + return false + } else if err := b.Put([]byte(alloc.Id), buf); err != nil { + log.G(ctx).Errorf("Cant continue transaction inside PoolInit(), err: %s", err) + return false + } + } } - c.Pool[netId] = &cState + } + if err := tx.Commit(); err != nil { + log.G(ctx).Errorf("Cant commit transaction inside PoolInit(), err: %s", err) + return false } return true } func (c *MtnState) GetAllocations(logCtx context.Context) (map[string][]Allocation, error) { - ctx, cancel := context.WithTimeout(context.Background(), 20 * time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 30 * time.Second) defer cancel() req, nrErr := http.NewRequest("GET", c.Cfg.Url + "?scheduler=" + c.Cfg.SchedLabel, nil) if nrErr != nil { @@ -182,7 +269,7 @@ func (c *MtnState) GetAllocations(logCtx context.Context) (map[string][]Allocati func (c *MtnState) RequestAllocs(ctx context.Context, netid string) (map[string]Allocation, error) { r := make(map[string]Allocation) - ctx, cancel := context.WithTimeout(context.Background(), 20 * time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 30 * time.Second) defer cancel() jsonBody := PostAllocreq{netid, c.Cfg.Ident, c.Cfg.SchedLabel} txtBody, mrshErr := json.Marshal(jsonBody) @@ -204,7 +291,7 @@ func (c *MtnState) RequestAllocs(ctx context.Context, netid string) (map[string] return nil, doErr } jresp := 
RawAlloc{} - decoder := json.NewDecoder(rh.Body) + decoder := json.NewDecoder(rh.Body) rErr := decoder.Decode(&jresp) rh.Body.Close() if rErr != nil { @@ -216,91 +303,189 @@ func (c *MtnState) RequestAllocs(ctx context.Context, netid string) (map[string] return r, nil } +func (c *MtnState) DbAllocIsFree(ctx context.Context, value []byte) bool { + var a Allocation + if err := json.Unmarshal(value, &a); err != nil { + log.G(ctx).Errorf("DbAllocIsFree() failed on json.Unmarshal() with error: %s.", err) + return false + } + if a.Used { + return false + } + return true +} + +func (c *MtnState) GetDbAlloc(ctx context.Context, tx *bolt.Tx, netId string) (Allocation, error) { + b := tx.Bucket([]byte(netId)) + if b == nil { + return Allocation{}, fmt.Errorf("BUG inside GetDbAlloc()! Backet %s not exist!", netId) + } + var a Allocation + cr := b.Cursor() + for k, v := cr.First(); k != nil; k, v = cr.Next() { + if c.DbAllocIsFree(ctx, v) { + if err := json.Unmarshal(v, &a); err != nil { + return a, err + } + a.Used = true + id := a.Id + value, errMrsh := json.Marshal(a) + if errMrsh != nil { + return a, errMrsh + } + errPut := b.Put([]byte(id), value) + if errPut != nil { + return a, errPut + } + return a, nil + } + } + fcounter, errCnt := c.CountFreeAllocs(ctx, tx, netId) + log.G(ctx).Errorf("Normaly we must never be in GetDbAlloc() at that point. But ok, lets try fix situation. 
Free count for that netId %s is %d (possible counter error: %s).", netId, fcounter, errCnt) + allocs, errAllocs := c.RequestAllocs(ctx, netId) + if errAllocs != nil { + log.G(ctx).Errorf("Last hope in GetDbAlloc() failed.") + return a, errAllocs + } + gotcha := false + b, errCrBk := tx.CreateBucketIfNotExists([]byte(netId)) + if errCrBk != nil { + return a, errCrBk + } + for id, alloc := range allocs { + if !gotcha { + alloc.Used = true + a = alloc + gotcha = true + } + value, errMrsh := json.Marshal(alloc) + if errMrsh != nil { + return a, errMrsh + } + errPut := b.Put([]byte(id), value) + if errPut != nil { + return a, errPut + } + } + if gotcha { + return a, nil + } + return a, fmt.Errorf("BUG inside GetDbAlloc()... or somewhere! Cant get allocation from DB and cant request more. Clean allocaion: %s.", a) +} + +func (c *MtnState) FreeDbAlloc(ctx context.Context, netId string, id string) error { + tx, errTx := c.Db.Begin(true) + if errTx != nil { + log.G(ctx).Errorf("Cant start transaction inside FreeDbAlloc(), err: %s", errTx) + return errTx + } + defer tx.Rollback() + b := tx.Bucket([]byte(netId)) + if b == nil { + return fmt.Errorf("BUG inside FreeDbAlloc()! 
Bucket %s not exist!", netId) + } + v := b.Get([]byte(id)) + var a Allocation + if err := json.Unmarshal(v, &a); err != nil { + return err + } + a.Used = false + value, errMrsh := json.Marshal(a) + if errMrsh != nil { + return errMrsh + } + errPut := b.Put([]byte(id), value) + if errPut != nil { + return errPut + } + if errCommit := tx.Commit(); errCommit != nil { + return errCommit + } + return nil +} + +func (c *MtnState) CountFreeAllocs(ctx context.Context, tx *bolt.Tx, netId string) (int, error) { + b, errBk := tx.CreateBucketIfNotExists([]byte(netId)) + if errBk == nil { + return 0, errBk + } + counter := 0 + e := b.ForEach(func(_, v []byte) error { + if c.DbAllocIsFree(ctx, v) { + counter+=1 + } + return nil + }) + log.G(ctx).Debugf("CountFreeAllocs() ended for netId %s with count: %d.", netId, counter) + return counter, e +} + func (c *MtnState) BindAllocs(ctx context.Context, netId string) error { if len(netId) == 0 { return fmt.Errorf("Len(netId) is zero.") } - c.Lock() log.G(ctx).Debugf("BindAllocs() called with netId %s.", netId) - defer c.Unlock() - if _, p := c.Pool[netId]; p { - c.Pool[netId].Lock() - defer c.Pool[netId].Unlock() - if (len(c.Pool[netId].Allocations) - c.Pool[netId].Reserved) > c.Cfg.Allocbuffer { - c.Pool[netId].Reserved += c.Cfg.Allocbuffer - log.G(ctx).Debugf("BindAllocs() ended with c.Pool[netId]: %s.", c.Pool[netId]) - return nil - } else { - allocs, reqErr := c.RequestAllocs(ctx, netId) - if reqErr != nil { - return reqErr + + tx, errTx := c.Db.Begin(true) + if errTx != nil { + log.G(ctx).Errorf("Cant start transaction inside BindAllocs(), err: %s", errTx) + return errTx + } + defer tx.Rollback() + fCount, errCnt := c.CountFreeAllocs(ctx, tx, netId) + if errCnt != nil { + log.G(ctx).Errorf("Cant continue transaction inside BindAllocs(), err: %s", errCnt) + return errCnt + } + if c.Cfg.Allocbuffer > fCount { + allocs, err := c.RequestAllocs(ctx, netId) + if err != nil { + return err + } + b, errBk := 
tx.CreateBucketIfNotExists([]byte(netId)) + if errBk != nil { + return errBk + } + for id, alloc := range allocs { + value, errMrsh := json.Marshal(alloc) + if errMrsh != nil { + return errMrsh } - c.Pool[netId].Reserved += c.Cfg.Allocbuffer - for id, alloc := range allocs { - c.Pool[netId].Allocations[id] = alloc + errPut := b.Put([]byte(id), value) + if errPut != nil { + return errPut } - log.G(ctx).Debugf("BindAllocs() ended with c.Pool[netId]: %s.", c.Pool[netId]) - return nil } - } else { - allocs, reqErr := c.RequestAllocs(ctx, netId) - if reqErr != nil { - return reqErr - } - newPool := IdState{*new(sync.Mutex), c.Cfg.Allocbuffer, allocs} - c.Pool[netId] = &newPool - log.G(ctx).Debugf("BindAllocs() ended with c.Pool[netId]: %s.", c.Pool[netId]) - return nil } + if err := tx.Commit(); err != nil { + return err + } + return nil } func (c *MtnState) UseAlloc(ctx context.Context, netId string) (Allocation, error) { - c.Lock() - newPool := c.Pool[netId] - log.G(ctx).Debugf("UseAlloc() called with netId: %s; and c.Pool[netId] is: %s", netId, c.Pool[netId]) - newAllocs := make(map[string]Allocation) - found := false - var rId string - for id, alloc := range newPool.Allocations { - if found { - newAllocs[id] = alloc - continue - } - if !newPool.Allocations[id].Used { - alloc.Used = true - newAllocs[id] = alloc - rId = id - found = true - } + tx, errTx := c.Db.Begin(true) + if errTx != nil { + log.G(ctx).Errorf("Cant start transaction inside UseAlloc(), err: %s", errTx) + return Allocation{}, errTx + } + defer tx.Rollback() + a, e := c.GetDbAlloc(ctx, tx, netId) + log.G(ctx).Debugf("UseAlloc(): a, e: %s, %s.", a, e) + if e != nil { + return Allocation{}, e } - if found { - newPool.Allocations = newAllocs - c.Pool[netId] = newPool - log.G(ctx).Debugf("UseAlloc() successfull ended for netId: %s with c.Pool[netId]: %s", netId, c.Pool[netId]) - c.Unlock() - return c.Pool[netId].Allocations[rId], nil + if err := tx.Commit(); err != nil { + return a, err } - c.Unlock() - 
return Allocation{}, fmt.Errorf("BUG! Cant find free alloc in %s netid! newAllocs map is: %s", netId, newAllocs) + return a, nil } func (c *MtnState) UnuseAlloc(ctx context.Context, netId string, id string) { - c.Lock() - newPool := c.Pool[netId] - log.G(ctx).Debugf("UnuseAlloc() called with netId %s and id %s and c.Pool[netId]: %s.", netId, id, c.Pool[netId]) - newAllocs := make(map[string]Allocation) - for cId, alloc := range newPool.Allocations { - if cId != id { - newAllocs[cId] = alloc - continue - } - alloc.Used = false - newAllocs[id] = alloc + err := c.FreeDbAlloc(ctx, netId, id) + if err != nil { + log.G(ctx).Errorf("BUG inside FreeDbAlloc()! error returned: %s.", err) } - newPool.Allocations = newAllocs - c.Pool[netId] = newPool - log.G(ctx).Debugf("UnuseAlloc() ended with netId %s and id %s and c.Pool[netId]: %s.", netId, id, c.Pool[netId]) - c.Unlock() + log.G(ctx).Debugf("UnuseAlloc() successfuly for: %s %s.", netId, id) } - diff --git a/isolate/porto/box.go b/isolate/porto/box.go index 709e388..60f7c42 100644 --- a/isolate/porto/box.go +++ b/isolate/porto/box.go @@ -506,9 +506,9 @@ func (b *Box) Spool(ctx context.Context, name string, opts isolate.RawProfile) ( if b.GlobalState.Mtn.Cfg.Enable && profile.Network["mtn"] == "enable" { err := b.GlobalState.Mtn.BindAllocs(ctx, profile.Network["netid"]) if err != nil { - return fmt.Errorf("Cant bind mtn alllocaton at spool with profile: %s, and with state: %s, and error: %s", profile, b.GlobalState.Mtn, err) + return fmt.Errorf("Cant bind mtn alllocaton at spool with state: %s, and error: %s", b.GlobalState.Mtn, err) } - log.G(ctx).Debugf("Successfully call b.GlobalState.Mtn.BindAllocs() at spool %s with project id %s. 
GlobalState.Mtn is: %s", name, profile.Network["netid"], b.GlobalState.Mtn.Pool) + log.G(ctx).Debugf("Successfully call b.GlobalState.Mtn.BindAllocs() at spool %s with project id %s.", name, profile.Network["netid"]) } return nil } @@ -618,6 +618,7 @@ func (b *Box) Inspect(ctx context.Context, workeruuid string) ([]byte, error) { // Close releases all resources such as idle connections from http.Transport func (b *Box) Close() error { b.transport.CloseIdleConnections() + b.GlobalState.Mtn.Db.Close() b.onClose() return nil } diff --git a/vendor/go.etcd.io/bbolt b/vendor/go.etcd.io/bbolt new file mode 160000 index 0000000..7ee3ded --- /dev/null +++ b/vendor/go.etcd.io/bbolt @@ -0,0 +1 @@ +Subproject commit 7ee3ded59d4835e10f3e7d0f7603c42aa5e83820