diff --git a/cmd/kv-storage.go b/cmd/kv-storage.go index d0917824..ca390a93 100644 --- a/cmd/kv-storage.go +++ b/cmd/kv-storage.go @@ -142,9 +142,11 @@ func (k *KVStorage) loadVolumes() (*kvVolumes, error) { value, err := k.kv.Get(kvVolumesKey, *bufp) if err != nil { + fmt.Println (" ### loadVolumes failed during get = ", err, k.path) return volumes, nil } if err = json.Unmarshal(value, volumes); err != nil { + fmt.Println (" ### loadVolumes failed during unmarshal = ", err, k.path, value) return nil, err } return volumes, nil @@ -161,10 +163,12 @@ func (k *KVStorage) SyncVolumes () (err error) { } func (k *KVStorage) MakeVol(volume string) (err error) { + //fmt.Println (" ### MakeVol = ", volume, k.path) k.volumesMu.Lock() defer k.volumesMu.Unlock() volumes, err := k.loadVolumes() if err != nil { + fmt.Println (" ### MakeVol::loadVolumes failed = ", volume, err, k.path) return err } @@ -177,10 +181,13 @@ func (k *KVStorage) MakeVol(volume string) (err error) { volumes.VolInfos = append(volumes.VolInfos, VolInfo{volume, time.Now()}) b, err := json.Marshal(volumes) if err != nil { + fmt.Println (" ### MakeVol failed during marshal = ", volume, err, k.path) return err } + //fmt.Println (" ### MakeVol volume data = ", volume, k.path, b) err = k.kv.Put(kvVolumesKey, b) if err != nil { + fmt.Println (" ### MakeVol failed during put = ", volume, err, k.path) return err } k.volumes = volumes @@ -300,7 +307,7 @@ func (k *KVStorage) getKVNSEntry(nskey string, buffer []byte) (val []byte, entry continue } } - if entry.Key != nskey { + /*if entry.Key != nskey { fmt.Printf("##### key mismatch, requested: %s, got: %s\n", nskey, entry.Key) tries-- if tries == 0 { @@ -308,7 +315,7 @@ func (k *KVStorage) getKVNSEntry(nskey string, buffer []byte) (val []byte, entry os.Exit(0) } continue - } + }*/ return nil, entry, nil } } @@ -981,6 +988,7 @@ func (k *KVStorage) ReadFileStream(volume, filePath string, offset, length int64 meta_op_no_stat = true } if (!meta_op_no_stat || !use_custome_reader) { + //fmt.Println("### In ReadFileStream :: ", volume, filePath, length) bufp := kvValuePoolMeta.Get().(*[]byte) defer kvValuePoolMeta.Put(bufp) @@ -1410,8 +1418,13 @@ func (k *KVStorage) ReadAll(volume string, filePath string) (buf []byte, err err if is_meta || strings.Contains(nskey, ".minio.sys") { //bufp = kvValuePoolMeta.Get().(*[]byte) - //defer kvValuePoolMeta.Put(bufp) - length = 8192 + //defer kvValuePoolMeta.Put(bufp) + /*if (strings.Contains(nskey, ".minio.sys/multipart/")) { + length = 32768 + } else { + length = 8192 + }*/ + length = int64 (kvMaxMetaSize) newBuf := make([]byte, length) buf, err = k.kv.Get(nskey, newBuf) //buf, err = k.kv.Get(nskey, *bufp) diff --git a/cmd/kv.go b/cmd/kv.go index c1ac1a6b..1cf16bce 100644 --- a/cmd/kv.go +++ b/cmd/kv.go @@ -327,6 +327,19 @@ func getKVMaxValueSize() int { return valSize } +var kvMaxMetaSize = getKVMaxMetaSize() + +func getKVMaxMetaSize() int { + str := os.Getenv("MINIO_NKV_MAX_META_SIZE") + if str == "" { + return 8192 + } + valSize, err := strconv.Atoi(str) + logger.FatalIf(err, "parsing MINIO_NKV_MAX_VALUE_SIZE") + return valSize +} + + var kvChecksum = os.Getenv("MINIO_NKV_CHECKSUM") != "" var use_custome_reader = os.Getenv("MINIO_NKV_USE_CUSTOM_READER") != "" var track_minio_stats = os.Getenv("MINIO_ENABLE_STATS") != "" @@ -474,7 +487,7 @@ var kvValuePoolNoEC = sync.Pool{ var kvValuePoolMeta = sync.Pool{ New: func() interface{} { - b := make([]byte, 8192) + b := make([]byte, kvMaxMetaSize) return &b }, } @@ -699,7 +712,7 @@ func (k *KV) Put(keyStr string, value []byte) error { if k.sync { cstatus := C.minio_nkv_put(&k.handle, unsafe.Pointer(&key[0]), C.int(len(key)), valuePtr, C.int(len(value))) status = int(cstatus) - //fmt.Println("##### Put happened, ", k.path, keyStr, len(value)) + //fmt.Println("##### Put happened, ", k.path, keyStr, len(value), status) } else { ch := make(chan asyncKVLoopResponse, 1) var response asyncKVLoopResponse @@ -789,7 +802,7 @@ func (k *KV) Get(keyStr string, value []byte) ([]byte, error) { cstatus := C.minio_nkv_get(&k.handle, unsafe.Pointer(&key[0]), C.int(len(key)), unsafe.Pointer(&value[0]), C.int(len(value)), &actualLengthCint) status = int(cstatus) actualLength = int(actualLengthCint) - //fmt.Println("##### GET returned, key, length = ", keyStr, len(value), actualLength, k.path) + //fmt.Println("##### GET returned, key, length = ", keyStr, len(value), actualLength, k.path, status) } else { ch := make(chan asyncKVLoopResponse, 1) var response asyncKVLoopResponse @@ -882,7 +895,7 @@ func (k *KV) Delete(keyStr string) error { if k.sync { cstatus := C.minio_nkv_delete(&k.handle, unsafe.Pointer(&key[0]), C.int(len(key))) status = int(cstatus) - //fmt.Println("##### Del happened, ", k.path, keyStr) + //fmt.Println("##### Del happened, ", k.path, keyStr, status) } else { ch := make(chan asyncKVLoopResponse, 1) var response asyncKVLoopResponse