Skip to content

Commit

Permalink
add comments
Browse files Browse the repository at this point in the history
  • Loading branch information
roseduan committed Sep 9, 2023
1 parent 2d051ec commit fb42623
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 38 deletions.
17 changes: 10 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
# diskhash
on-disk hash table index(mainly for WAL).

## When you will need it?
if you are using [WAL](https://github.com/rosedblabs/wal) to store your data,
## When will you need it?
If you are using [WAL](https://github.com/rosedblabs/wal) to store your data,

> wal: https://github.com/rosedblabs/wal
you will get the positions to get the data from WAL, the common way to store the positions is to use an in-memory index, but if you have a large amount of data, the index will be very large, and it will take a lot of time to load the index into memory when you restart the program.
you will get the positions to get the data from WAL, the common way to store the positions is to use an in-memory index(like rosedb).

so, you can use diskhash to store the index on disk.
But if you have a large amount of data, and it will take a lot of time to load the index into memory when you restart the system.

So, you can use diskhash to store the index on disk.

## Can be used as a general hash table index(without wal)?

Expand Down Expand Up @@ -74,9 +76,10 @@ func main() {

// put a key-value pair into the table.
// the MatchKey function will be called when the key is matched.
// Why we need the MatchKey function?
// because the key may be hashed to the same slot with another key(even though the probability is very low),
// so we need to check if the key is matched.
// When we store the data in the hash table, we only store the hash value of the key, and the raw value.
// So when we get the data from hash table, even if the hash value of the key matches, that doesn't mean
// the key matches because of hash collision.
// So we need to provide a function to determine whether the key of the slot matches the stored key.
err = table.Put([]byte("key1"), []byte(strings.Repeat("v", 10)), func(slot diskhash.Slot) (bool, error) {
return true, nil
})
Expand Down
43 changes: 25 additions & 18 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,17 @@ import (
"github.com/rosedblabs/diskhash/fs"
)

const slotsPerBucket = 31

// bucket is the basic unit of a file in diskhash.
// each file contains 31 slots at most.
type bucket struct {
slots [slotsPerBucket]Slot
offset int64
nextOffset int64
file fs.File
slots [slotsPerBucket]Slot // 31 slots now
offset int64 // the offset of the bucket in the file
nextOffset int64 // the offset of the next overflow bucket
file fs.File // the file that contains the bucket
bucketSize uint32
}

// bucketIterator is used to iterate all buckets in hash table.
type bucketIterator struct {
currentFile fs.File
overflowFile fs.File
Expand All @@ -26,9 +27,11 @@ type bucketIterator struct {
bucketSize uint32
}

// Slot is the basic unit of a bucket.
// each slot contains a key hash and a value.
type Slot struct {
Hash uint32
Value []byte
Hash uint32 // the hash of the key
Value []byte // raw value
}

type slotWriter struct {
Expand Down Expand Up @@ -71,6 +74,7 @@ func (bi *bucketIterator) next() (*bucket, error) {
return bucket, nil
}

// readBucket reads a bucket from the current file.
func (bi *bucketIterator) readBucket() (*bucket, error) {
// read an entire bucket with all slots
bucketBuf := make([]byte, bi.bucketSize)
Expand All @@ -81,22 +85,22 @@ func (bi *bucketIterator) readBucket() (*bucket, error) {
b := &bucket{file: bi.currentFile, offset: bi.offset, bucketSize: bi.bucketSize}
// parse and get slots in the bucket
for i := 0; i < slotsPerBucket; i++ {
_ = bucketBuf[4+bi.slotValueLen]
b.slots[i].Hash = binary.LittleEndian.Uint32(bucketBuf[:4])
_ = bucketBuf[hashLen+bi.slotValueLen]
b.slots[i].Hash = binary.LittleEndian.Uint32(bucketBuf[:hashLen])
if b.slots[i].Hash != 0 {
b.slots[i].Value = bucketBuf[4 : 4+bi.slotValueLen]
b.slots[i].Value = bucketBuf[hashLen : hashLen+bi.slotValueLen]
}
bucketBuf = bucketBuf[4+bi.slotValueLen:]
bucketBuf = bucketBuf[hashLen+bi.slotValueLen:]
}

// the last 8 bytes is the offset of next overflow bucket
b.nextOffset = int64(binary.LittleEndian.Uint64(bucketBuf[:8]))
b.nextOffset = int64(binary.LittleEndian.Uint64(bucketBuf[:nextOffLen]))

return b, nil
}

func (sw *slotWriter) insertSlot(sl Slot, t *Table) error {
// if we exeed the slotsPerBucket, we need to create a new overflow bucket
// if we exceed the slotsPerBucket, we need to create a new overflow bucket
// and link it to the current bucket
if sw.currentSlotIndex == slotsPerBucket {
nextBucket, err := t.createOverflowBucket()
Expand All @@ -123,26 +127,29 @@ func (sw *slotWriter) writeSlots() error {
return sw.currentBucket.write()
}

// write all slots in the bucket to the file.
func (b *bucket) write() error {
buf := make([]byte, b.bucketSize)
// write all slots to the buffer
var index = 0
for i := 0; i < slotsPerBucket; i++ {
slot := b.slots[i]

binary.LittleEndian.PutUint32(buf[index:index+4], slot.Hash)
copy(buf[index+4:index+4+len(slot.Value)], slot.Value)
binary.LittleEndian.PutUint32(buf[index:index+hashLen], slot.Hash)
copy(buf[index+hashLen:index+hashLen+len(slot.Value)], slot.Value)

index += 4 + len(slot.Value)
index += hashLen + len(slot.Value)
}

// write the offset of next overflow bucket
binary.LittleEndian.PutUint64(buf[len(buf)-8:], uint64(b.nextOffset))
binary.LittleEndian.PutUint64(buf[len(buf)-nextOffLen:], uint64(b.nextOffset))

_, err := b.file.WriteAt(buf, b.offset)
return err
}

// remove a slot from the bucket, and move all slots after it forward
// to fill the empty slot.
func (b *bucket) removeSlot(slotIndex int) {
i := slotIndex
for ; i < slotsPerBucket-1; i++ {
Expand Down
6 changes: 4 additions & 2 deletions examples/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ func main() {
// put a key-value pair into the table.
// the MatchKey function will be called when the key is matched.
// Why we need the MatchKey function?
// because the key may be hashed to the same slot with another key(even though the probability is very low),
// so we need to check if the key is matched.
// When we store the data in the hash table, we only store the hash value of the key, and the raw value.
// So when we get the data from hash table, even if the hash value of the key matches, that doesn't mean
// the key matches because of hash collision.
// So we need to provide a function to determine whether the key of the slot matches the stored key.
err = table.Put([]byte("key1"), []byte(strings.Repeat("v", 10)), func(slot diskhash.Slot) (bool, error) {
return true, nil
})
Expand Down
15 changes: 13 additions & 2 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,23 @@ package diskhash

import "os"

// Options is used to create a new diskhash table.
type Options struct {
DirPath string
// DirPath is the directory path to store the hash table files.
DirPath string

// SlotValueLength is the length of the value in each slot.
// Your value lenght must be equal to the value length you set when creating the table.
SlotValueLength uint32
LoadFactor float64

// LoadFactor is the load factor of the hash table.
// The load factor is the ratio of the number of elements in the hash table to the table size.
// If the ratio is greater than the load factor, the hash table will be expanded automatically.
// The default value is 0.7.
LoadFactor float64
}

// DefaultOptions is the default options.
var DefaultOptions = Options{
DirPath: os.TempDir(),
SlotValueLength: 0,
Expand Down
64 changes: 55 additions & 9 deletions table.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,38 @@ import (
)

const (
primaryFileName = "HASH.PRIMARY"
overflowFileName = "HASH.OVERFLOW"
metaFileName = "HASH.META"
bucketNextOffsetLen = 8
primaryFileName = "HASH.PRIMARY"
overflowFileName = "HASH.OVERFLOW"
metaFileName = "HASH.META"
slotsPerBucket = 31
nextOffLen = 8
hashLen = 4
)

// MatchKeyFunc is used to determine whether the key of the slot matches the stored key.
// And you must supply the function to the Put/Get/Delete methods.
//
// Why we need this function?
//
// When we store the data in the hash table, we only store the hash value of the key, and the raw value.
// So when we get the data from hash table, even if the hash value of the key matches, that doesn't mean
// the key matches because of hash collision.
// So we need to provide a function to determine whether the key of the slot matches the stored key.
type MatchKeyFunc func(Slot) (bool, error)

// Table is a hash table that stores data on disk.
// It consists of two files, the primary file and the overflow file.
// Each file is divided into multiple buckets, each bucket contains multiple slots.
type Table struct {
primaryFile fs.File
overflowFile fs.File
metaFile fs.File
metaFile fs.File // meta file stores the metadata of the hash table
meta *tableMeta
mu *sync.RWMutex
mu *sync.RWMutex // protect the table when multiple goroutines access it
options Options
}

// tableMeta is the metadata of the hash table.
type tableMeta struct {
Level uint8
SplitBucketIndex uint32
Expand All @@ -40,6 +55,9 @@ type tableMeta struct {
FreeBuckets []int64
}

// Open opens a hash table.
// If the hash table does not exist, it will be created automatically.
// It will open the primary file, the overflow file and the meta file.
func Open(options Options) (*Table, error) {
if err := checkOptions(options); err != nil {
return nil, err
Expand Down Expand Up @@ -99,6 +117,8 @@ func checkOptions(options Options) error {
return nil
}

// read the metadata info from the meta file.
// if the file is empty, init the metadata info.
func (t *Table) readMeta() error {
file, err := fs.Open(filepath.Join(t.options.DirPath, metaFileName), fs.OSFileSystem)
if err != nil {
Expand All @@ -111,7 +131,7 @@ func (t *Table) readMeta() error {
if file.Size() == 0 {
t.meta.NumBuckets = 1
t.meta.SlotValueLength = t.options.SlotValueLength
t.meta.BucketSize = slotsPerBucket*(4+t.meta.SlotValueLength) + bucketNextOffsetLen
t.meta.BucketSize = slotsPerBucket*(hashLen+t.meta.SlotValueLength) + nextOffLen
} else {
decoder := json.NewDecoder(t.metaFile)
if err := decoder.Decode(t.meta); err != nil {
Expand All @@ -127,11 +147,13 @@ func (t *Table) readMeta() error {
return nil
}

// write the metadata info to the meta file in json format.
func (t *Table) writeMeta() error {
encoder := json.NewEncoder(t.metaFile)
return encoder.Encode(t.meta)
}

// Close closes the files of the hash table.
func (t *Table) Close() error {
t.mu.Lock()
defer t.mu.Unlock()
Expand All @@ -146,6 +168,7 @@ func (t *Table) Close() error {
return nil
}

// Sync flushes the data of the hash table to disk.
func (t *Table) Sync() error {
t.mu.Lock()
defer t.mu.Unlock()
Expand All @@ -161,6 +184,8 @@ func (t *Table) Sync() error {
return nil
}

// Put puts a new ke/value pair to the hash table.
// the parameter matchKey is described in the MatchKeyFunc.
func (t *Table) Put(key, value []byte, matchKey MatchKeyFunc) error {
t.mu.Lock()
defer t.mu.Unlock()
Expand All @@ -173,7 +198,7 @@ func (t *Table) Put(key, value []byte, matchKey MatchKeyFunc) error {
// get the slot writer to write the new slot,
// it will get the corresponding bucket according to the key hash,
// and find an empty slot to insert.
// If there are no empty slots, a overflow bucket will be created.
// If there are no empty slots, an overflow bucket will be created.
keyHash := getKeyHash(key)
slot := &Slot{Hash: keyHash, Value: value}
sw, err := t.getSlotWriter(slot.Hash, matchKey)
Expand Down Expand Up @@ -205,6 +230,8 @@ func (t *Table) Put(key, value []byte, matchKey MatchKeyFunc) error {
return nil
}

// find a free slot position to insert the new slot.
// return the slot writer.
func (t *Table) getSlotWriter(keyHash uint32, matchKey MatchKeyFunc) (*slotWriter, error) {
sw := &slotWriter{}
bi := t.newBucketIterator(t.getKeyBucket(keyHash))
Expand All @@ -227,6 +254,8 @@ func (t *Table) getSlotWriter(keyHash uint32, matchKey MatchKeyFunc) (*slotWrite
sw.currentSlotIndex = i
return sw, nil
}
// if the slot hash value is not equal to the key hash value,
// which means the key will never be matched, so we can skip it.
if slot.Hash != keyHash {
continue
}
Expand All @@ -249,6 +278,8 @@ func (t *Table) getSlotWriter(keyHash uint32, matchKey MatchKeyFunc) (*slotWrite
}
}

// Get gets the value of the key from the hash table.
// the parameter matchKey is described in the MatchKeyFunc.
func (t *Table) Get(key []byte, matchKey MatchKeyFunc) error {
t.mu.RLock()
defer t.mu.RUnlock()
Expand All @@ -268,9 +299,15 @@ func (t *Table) Get(key []byte, matchKey MatchKeyFunc) error {
return err
}
for _, slot := range b.slots {
// if the slot hash value is 0, which means the subsequent slots are all empty,
// (why? when we write a new slot, we will iterate from the beginning of the bucket, find an empty slot to insert,
// when we remove a slot, we will move the subsequent slots forward, so all non-empty slots will be continuous)
// so we can skip the current bucket and move to the next bucket.
if slot.Hash == 0 {
break
}
// if the slot hash value is not equal to the key hash value,
// which means the key will never be matched, so we can skip it.
if slot.Hash != keyHash {
continue
}
Expand All @@ -281,12 +318,17 @@ func (t *Table) Get(key []byte, matchKey MatchKeyFunc) error {
}
}

// Delete deletes the key from the hash table.
// the parameter matchKey is described in the MatchKeyFunc.
func (t *Table) Delete(key []byte, matchKey MatchKeyFunc) error {
t.mu.Lock()
defer t.mu.Unlock()

// get the bucket according to the key hash
keyHash := getKeyHash(key)
bi := t.newBucketIterator(t.getKeyBucket(keyHash))
// iterate all slots in the bucket and the overflow buckets,
// find the slot to delete.
for {
b, err := bi.next()
if err == io.EOF {
Expand All @@ -295,6 +337,8 @@ func (t *Table) Delete(key []byte, matchKey MatchKeyFunc) error {
if err != nil {
return err
}

// the following code is similar to the Get method
for i, slot := range b.slots {
if slot.Hash == 0 {
break
Expand All @@ -320,6 +364,7 @@ func (t *Table) Delete(key []byte, matchKey MatchKeyFunc) error {
}
}

// Size returns the number of keys in the hash table.
func (t *Table) Size() uint32 {
t.mu.RLock()
defer t.mu.RUnlock()
Expand All @@ -345,7 +390,8 @@ func (t *Table) openFile(name string) (fs.File, error) {
if err != nil {
return nil, err
}
// init file header
// init (dummy) file header
// the first bucket size in the file is not used, so we just init it.
if file.Size() == 0 {
if err := file.Truncate(int64(t.meta.BucketSize)); err != nil {
_ = file.Close()
Expand Down

0 comments on commit fb42623

Please sign in to comment.