Skip to content

Commit

Permalink
Merge pull request #3 from OpenMPDK/wip-nkv-minio-multipart-fix
Browse files Browse the repository at this point in the history
Pull request for S3 mutipart PUT/GET fix
  • Loading branch information
benixon authored Jan 12, 2023
2 parents 34b3130 + 763c0fb commit c3c0969
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 8 deletions.
21 changes: 17 additions & 4 deletions cmd/kv-storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,11 @@ func (k *KVStorage) loadVolumes() (*kvVolumes, error) {

value, err := k.kv.Get(kvVolumesKey, *bufp)
if err != nil {
fmt.Println (" ### loadVolumes failed during get = ", err, k.path)
return volumes, nil
}
if err = json.Unmarshal(value, volumes); err != nil {
fmt.Println (" ### loadVolumes failed during unmarshal = ", err, k.path, value)
return nil, err
}
return volumes, nil
Expand All @@ -161,10 +163,12 @@ func (k *KVStorage) SyncVolumes () (err error) {
}

func (k *KVStorage) MakeVol(volume string) (err error) {
//fmt.Println (" ### MakeVol = ", volume, k.path)
k.volumesMu.Lock()
defer k.volumesMu.Unlock()
volumes, err := k.loadVolumes()
if err != nil {
fmt.Println (" ### MakeVol::loadVolumes failed = ", volume, err, k.path)
return err
}

Expand All @@ -177,10 +181,13 @@ func (k *KVStorage) MakeVol(volume string) (err error) {
volumes.VolInfos = append(volumes.VolInfos, VolInfo{volume, time.Now()})
b, err := json.Marshal(volumes)
if err != nil {
fmt.Println (" ### MakeVol failed during marshal = ", volume, err, k.path)
return err
}
//fmt.Println (" ### MakeVol volume data = ", volume, k.path, b)
err = k.kv.Put(kvVolumesKey, b)
if err != nil {
fmt.Println (" ### MakeVol failed during put = ", volume, err, k.path)
return err
}
k.volumes = volumes
Expand Down Expand Up @@ -300,15 +307,15 @@ func (k *KVStorage) getKVNSEntry(nskey string, buffer []byte) (val []byte, entry
continue
}
}
if entry.Key != nskey {
/*if entry.Key != nskey {
fmt.Printf("##### key mismatch, requested: %s, got: %s\n", nskey, entry.Key)
tries--
if tries == 0 {
fmt.Printf("##### key mismatch after 10 retries, requested: %s, got: %s\n", nskey, entry.Key)
os.Exit(0)
}
continue
}
}*/
return nil, entry, nil
}
}
Expand Down Expand Up @@ -981,6 +988,7 @@ func (k *KVStorage) ReadFileStream(volume, filePath string, offset, length int64
meta_op_no_stat = true
}
if (!meta_op_no_stat || !use_custome_reader) {
//fmt.Println("### In ReadFileStream :: ", volume, filePath, length)
bufp := kvValuePoolMeta.Get().(*[]byte)
defer kvValuePoolMeta.Put(bufp)

Expand Down Expand Up @@ -1410,8 +1418,13 @@ func (k *KVStorage) ReadAll(volume string, filePath string) (buf []byte, err err

if is_meta || strings.Contains(nskey, ".minio.sys") {
//bufp = kvValuePoolMeta.Get().(*[]byte)
//defer kvValuePoolMeta.Put(bufp)
length = 8192
//defer kvValuePoolMeta.Put(bufp)
/*if (strings.Contains(nskey, ".minio.sys/multipart/")) {
length = 32768
} else {
length = 8192
}*/
length = int64 (kvMaxMetaSize)
newBuf := make([]byte, length)
buf, err = k.kv.Get(nskey, newBuf)
//buf, err = k.kv.Get(nskey, *bufp)
Expand Down
21 changes: 17 additions & 4 deletions cmd/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,19 @@ func getKVMaxValueSize() int {
return valSize
}

var kvMaxMetaSize = getKVMaxMetaSize()

func getKVMaxMetaSize() int {
str := os.Getenv("MINIO_NKV_MAX_META_SIZE")
if str == "" {
return 8192
}
valSize, err := strconv.Atoi(str)
logger.FatalIf(err, "parsing MINIO_NKV_MAX_VALUE_SIZE")
return valSize
}


var kvChecksum = os.Getenv("MINIO_NKV_CHECKSUM") != ""
var use_custome_reader = os.Getenv("MINIO_NKV_USE_CUSTOM_READER") != ""
var track_minio_stats = os.Getenv("MINIO_ENABLE_STATS") != ""
Expand Down Expand Up @@ -474,7 +487,7 @@ var kvValuePoolNoEC = sync.Pool{

var kvValuePoolMeta = sync.Pool{
New: func() interface{} {
b := make([]byte, 8192)
b := make([]byte, kvMaxMetaSize)
return &b
},
}
Expand Down Expand Up @@ -699,7 +712,7 @@ func (k *KV) Put(keyStr string, value []byte) error {
if k.sync {
cstatus := C.minio_nkv_put(&k.handle, unsafe.Pointer(&key[0]), C.int(len(key)), valuePtr, C.int(len(value)))
status = int(cstatus)
//fmt.Println("##### Put happened, ", k.path, keyStr, len(value))
//fmt.Println("##### Put happened, ", k.path, keyStr, len(value), status)
} else {
ch := make(chan asyncKVLoopResponse, 1)
var response asyncKVLoopResponse
Expand Down Expand Up @@ -789,7 +802,7 @@ func (k *KV) Get(keyStr string, value []byte) ([]byte, error) {
cstatus := C.minio_nkv_get(&k.handle, unsafe.Pointer(&key[0]), C.int(len(key)), unsafe.Pointer(&value[0]), C.int(len(value)), &actualLengthCint)
status = int(cstatus)
actualLength = int(actualLengthCint)
//fmt.Println("##### GET returned, key, length = ", keyStr, len(value), actualLength, k.path)
//fmt.Println("##### GET returned, key, length = ", keyStr, len(value), actualLength, k.path, status)
} else {
ch := make(chan asyncKVLoopResponse, 1)
var response asyncKVLoopResponse
Expand Down Expand Up @@ -882,7 +895,7 @@ func (k *KV) Delete(keyStr string) error {
if k.sync {
cstatus := C.minio_nkv_delete(&k.handle, unsafe.Pointer(&key[0]), C.int(len(key)))
status = int(cstatus)
//fmt.Println("##### Del happened, ", k.path, keyStr)
//fmt.Println("##### Del happened, ", k.path, keyStr, status)
} else {
ch := make(chan asyncKVLoopResponse, 1)
var response asyncKVLoopResponse
Expand Down

0 comments on commit c3c0969

Please sign in to comment.