-
Notifications
You must be signed in to change notification settings - Fork 0
/
merge.go
311 lines (283 loc) · 7.48 KB
/
merge.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
package GoKeeper
import (
"GoKeeper/data"
"GoKeeper/util"
"io"
"os"
"path"
"path/filepath"
"sort"
"strconv"
)
const (
mergeSuffixName = "-merge"
mergerFinishedKey = "merge.finished"
)
// Merge 清理无效数据生成 Hint 文件
func (db *DB) Merge() error {
// 活跃文件为空,直接返回
if db.activeFile == nil {
return nil
}
db.lock.Lock()
if db.isMerging {
// 正在 merge 中
// 释放锁
db.lock.Unlock()
return ErrMergeIsRunning
}
// 查看可以 merge 的数据量是否达到了阈值
size, err := util.DirSize(db.options.DirPath)
if err != nil {
db.lock.Unlock()
return err
}
if float32(size)/float32(db.options.DataFileSize) < db.options.MergeThreshold {
// 数据量未达到阈值,直接返回
db.lock.Unlock()
return ErrMergeNotExceedThreshold
}
// 查看剩余空间容量是否可以容纳 merge 之后的数据量
// todo 获取剩余磁盘空间的方法没法跨平台,暂时不是先
//availableDiskSize, err := util.AvailableDiskSize()
//if err != nil {
// db.lock.Unlock()
// return err
//}
//if uint64(size-db.reclaimSize) >= availableDiskSize {
// db.lock.Unlock()
// return ErrDiskSpaceNotEnough
//}
db.isMerging = true
defer func() {
db.isMerging = false
}()
// 正式开始 merge 流程
// 持久化当前活跃文件
if err := db.activeFile.Sync(); err != nil {
return err
}
// 将当前活跃文件转换为旧的数据文件
db.olderFiles[db.activeFile.FileID] = db.activeFile
// 打开新的活跃文件
if err := db.setActiveDataFile(); err != nil {
return err
}
// 记录最近没有参与 merge 的文件id
nonMergeFileId := db.activeFile.FileID
// 取出所有需要 merge 的文件
mergeFiles := make([]*data.DataFile, 0, 10) // 预先分配空间
for _, file := range db.olderFiles {
mergeFiles = append(mergeFiles, file)
}
// 释放锁, 现在可以用户可以进行 Put,Get,Delete 操作
db.lock.Unlock()
// 将 merge 的文件从小到大进行排序
sort.Slice(mergeFiles, func(i, j int) bool {
return mergeFiles[i].FileID < mergeFiles[j].FileID
})
// 获取 mergePath
mergePath := db.getMergePath()
// 如果目录存在,说明发生过 merge, 删除掉
if _, err := os.Stat(mergePath); err == nil {
if err = os.RemoveAll(mergePath); err != nil {
return err
}
}
// 新建merge 目录
if err := os.Mkdir(mergePath, os.ModePerm); err != nil {
return err
}
// 打开一个新的临时的 bitcask 实例
mergeOptions := db.options
mergeOptions.DirPath = mergePath
// 打开每次都 Sync, merge 速度会下降
mergeOptions.SyncWrites = false
mergeDB, err := Open(mergeOptions)
if err != nil {
return err
}
// 打开 Hint 文件,存储索引
hintFile, err := data.OpenHintFile(mergePath)
if err != nil {
return err
}
// 遍历每个数据文件,读取每一条记录
for _, dataFile := range mergeFiles {
var offset int64 = 0
for {
record, n, err := dataFile.ReadLogRecord(offset)
if err != nil {
if err == io.EOF {
break
}
return err
}
// 解析拿到实际的 Key
realKey, _ := parseLogRecordKey(record.Key)
logRecordPos := db.index.Get(realKey)
// 和内存中的索引位置
if logRecordPos != nil &&
logRecordPos.Fid == dataFile.FileID &&
logRecordPos.Offset == offset {
// 清除事务标记
record.Key = logRecordKeyWithSeq(realKey, nonTransactionKey)
// 将数据重写到数据文件中
pos, err := mergeDB.appendLogRecord(record)
if err != nil {
return err
}
// 将位置索引写到 hint 文件中
if err := hintFile.WriteHintRecord(realKey, pos); err != nil {
return err
}
}
// 移动到下一条记录
offset += n
}
}
// 持久化 hint 文件和数据文件
if err = hintFile.Sync(); err != nil {
return err
}
if err = mergeDB.Sync(); err != nil {
return err
}
// 写标识 merge 完成的文件
mergeFinishFile, err := data.OpenFinishedFileName(mergePath)
if err != nil {
return err
}
mergeFinishRecord := &data.LogRecord{
Key: []byte(mergerFinishedKey),
Value: []byte(strconv.Itoa(int(nonMergeFileId))),
}
encodeRecord, _ := data.EncodeLogRecord(mergeFinishRecord)
if err = mergeFinishFile.Write(encodeRecord); err != nil {
return err
}
if err = mergeFinishFile.Sync(); err != nil {
return err
}
return nil
}
// 拿到数据目录的路径
// eg. 数据目录 /tmp/goKeeper
//
// merge目录 /tmp/gooKeeper-merge
func (db *DB) getMergePath() string {
// 1.首先清理路径中的冗余部分
// 2.其次获取数据目录的父级路径
dir := path.Dir(path.Clean(db.options.DirPath))
base := path.Base(dir)
return filepath.Join(dir, base, mergeSuffixName)
}
// 加载 merge 目录
func (db *DB) loadMergeFiles() error {
mergePath := db.getMergePath()
// 判断目录是否存在,不存在则直接返回
if _, err := os.Stat(mergePath); os.IsNotExist(err) {
return nil
}
defer func() {
_ = os.RemoveAll(mergePath)
}()
// 读取目录中的所有文件
dir, err := os.ReadDir(mergePath)
if err != nil {
return err
}
// 查找 merge 完成的文件,判断 merge 是否处理完毕
var isMergeFinished bool
mergeFileNames := make([]string, 0, 20)
for _, entry := range dir {
if entry.Name() == data.MergeFinishedFileName {
isMergeFinished = true
}
// 没必要拿到事务序列号文件
if entry.Name() == data.SeqNoFileName {
continue
}
mergeFileNames = append(mergeFileNames, entry.Name())
}
// merge没有完成则直接返回
// todo 这里之后可以优化在控制台打印数据,或将数据写入日志文件
if !isMergeFinished {
return nil
}
// merge 完成的处理
// 将旧的数据文件删除,用merge目录下的文件替代
// 获取没有参与 Merge 的文件ID
nonMergeFileId, err := db.getNonMergeFileId(mergePath)
if err != nil {
return err
}
// 删除它文件id小的文件
var fileId uint32 = 0
for ; fileId < nonMergeFileId; fileId++ {
fileName := data.GetDataFileName(db.options.DirPath, fileId)
if _, err = os.Stat(fileName); err == nil {
if err = os.Remove(fileName); err != nil {
return err
}
} else if os.IsNotExist(err) {
return err
}
}
// 将新的数据文件移动到数据目录
for _, fileName := range mergeFileNames {
srcPath := filepath.Join(mergePath, fileName)
dstPath := filepath.Join(db.options.DirPath, fileName)
if err = os.Rename(srcPath, dstPath); err != nil {
return err
}
}
return nil
}
// 获取没有参与 Merge 的文件ID
func (db *DB) getNonMergeFileId(dirPath string) (uint32, error) {
mergeFinishFile, err := data.OpenFinishedFileName(dirPath)
if err != nil {
return 0, err
}
record, _, err := mergeFinishFile.ReadLogRecord(0)
if err != nil {
return 0, err
}
nonMergeFileId, err := strconv.Atoi(string(record.Value))
if err != nil {
return 0, err
}
return uint32(nonMergeFileId), nil
}
// 从 hint 文件中加载索引
func (db *DB) loadIndexFromHintFile() error {
// 查看 hint 索引文件是否存在
hintFileName := filepath.Join(db.options.DirPath, data.HintFileName)
if _, err := os.Stat(hintFileName); os.IsNotExist(err) {
return nil
}
// 打开 hint 索引文件
hintFile, err := data.OpenHintFile(db.options.DirPath)
if err != nil {
return err
}
// 加载索引
var offset int64 = 0
for {
record, n, err := hintFile.ReadLogRecord(offset)
if err != nil {
if err == io.EOF {
break
}
return err
}
// 拿到位置索引
pos := data.DecodeLogRecordPos(record.Value)
// 加入到内存索引中
db.index.Put(record.Key, pos)
// 移动到下一条记录
offset += n
}
return nil
}