forked from rivian/delta-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
checkpoint.go
515 lines (470 loc) · 19.3 KB
/
checkpoint.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
// Copyright 2023 Rivian Automotive, Inc.
// Licensed under the Apache License, Version 2.0 (the “License”);
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an “AS IS” BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package delta contains the resources required to interact with a Delta table.
package delta
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"sync/atomic"
"time"
"github.com/apache/arrow/go/v14/parquet"
"github.com/apache/arrow/go/v14/parquet/compress"
"github.com/chelseajonesr/rfarrow"
"github.com/google/uuid"
"github.com/rivian/delta-go/storage"
"golang.org/x/sync/errgroup"
)
// CheckPoint holds the metadata for a checkpoint file.
// This gets written out to _last_checkpoint.
type CheckPoint struct {
/// Delta table version
Version int64 `json:"version"`
// The number of actions in the checkpoint. -1 if not available.
Size int64 `json:"size"`
// The number of parts if the checkpoint has multiple parts. Omit if single part.
Parts *int32 `json:"parts,omitempty"`
// Size of the checkpoint in bytes
SizeInBytes int64 `json:"sizeInBytes"`
NumOfAddFiles int64 `json:"numOfAddFiles"`
}
// CheckpointEntry contains a single entry in the checkpoint Parquet file
// All but one of the pointers should be nil
type CheckpointEntry struct {
Txn *Txn `parquet:"name=txn"`
Add *Add `parquet:"name=add"`
Remove *Remove `parquet:"name=remove"`
MetaData *MetaData `parquet:"name=metaData"`
Protocol *Protocol `parquet:"name=protocol"`
Cdc *CDC `parquet:"-"` // CDC not implemented yet
}
// CheckpointConfiguration contains additional configuration for checkpointing
type CheckpointConfiguration struct {
// Maximum numbers of rows to include in each multi-part checkpoint part
// Current default 50k
MaxRowsPerPart int
// Allow checkpointing even if the table reader version or writer version is greater than supported
// by this client. Defaults to false.
// **WARNING** If you set this to true and the table being checkpointed uses features that are not supported by this
// client, the resulting checkpoint might fail unpredictably and silently; this could cause data loss or corruption
UnsafeIgnoreUnsupportedReaderWriterVersionErrors bool
// Disable any cleanup after checkpointing, even if it was enabled in the table configuration.
// Defaults to false.
DisableCleanup bool
// Configure use of on-disk intermediate storage to reduce memory requirements
ReadWriteConfiguration OptimizeCheckpointConfiguration
}
// OptimizeCheckpointConfiguration holds settings for optimizing checkpoint read and write operations
type OptimizeCheckpointConfiguration struct {
// Use an intermediate on-disk storage location to reduce memory
OnDiskOptimization bool
WorkingStore storage.ObjectStore
WorkingFolder storage.Path
// If these are > 1, checkpoint read and write operations will use this many goroutines
ConcurrentCheckpointRead int
ConcurrentCheckpointWrite int
}
// NewCheckpointConfiguration returns the default configuration for creating checkpoints
func NewCheckpointConfiguration() *CheckpointConfiguration {
checkpointConfiguration := new(CheckpointConfiguration)
// From inspection of Spark generated checkpoint files
checkpointConfiguration.MaxRowsPerPart = 50000
checkpointConfiguration.UnsafeIgnoreUnsupportedReaderWriterVersionErrors = false
checkpointConfiguration.DisableCleanup = false
return checkpointConfiguration
}
// NewOptimizeCheckpointConfiguration returns a default enabled optimization configuration
// with a working folder in the table store's _delta_log/.tmp/ folder
// but no concurrency enabled
func NewOptimizeCheckpointConfiguration(store storage.ObjectStore, version int64) (*OptimizeCheckpointConfiguration, error) {
optimizeCheckpointConfiguration := new(OptimizeCheckpointConfiguration)
optimizeCheckpointConfiguration.OnDiskOptimization = true
optimizeCheckpointConfiguration.WorkingStore = store
optimizeCheckpointConfiguration.WorkingFolder = storage.NewPath(fmt.Sprintf("_delta_log/.tmp/checkpoint-v%d-%s", version, uuid.NewString()))
return optimizeCheckpointConfiguration, nil
}
var (
// ErrCheckpointAlreadyExists is returned when trying to create a checkpoint but it already exists
ErrCheckpointAlreadyExists error = errors.New("checkpoint already exists")
// ErrCheckpointRowCountMismatch is returned when the checkpoint is generated with a different row count
// than expected from the table state. This indicates an internal error.
ErrCheckpointRowCountMismatch error = errors.New("checkpoint generated with unexpected row count")
// ErrCheckpointIncomplete is returned when trying to read a multi-part checkpoint but not all parts exist
ErrCheckpointIncomplete error = errors.New("checkpoint is missing parts")
// ErrCheckpointInvalidMultipartFileName is returned when a multi-part checkpoint file has the wrong number of parts in the filename
ErrCheckpointInvalidMultipartFileName error = errors.New("checkpoint file name is invalid")
// ErrCheckpointAddZeroSize is returned if there is an Add action with size 0
// because including this would cause subsequent Optimize operations to fail.
ErrCheckpointAddZeroSize error = errors.New("zero size in add not allowed")
// ErrCheckpointEntryMultipleActions is returned if a checkpoint entry has more than one non-null action
ErrCheckpointEntryMultipleActions error = errors.New("checkpoint entry contains multiple actions")
// ErrCheckpointOptimizationWorkingFolder is returned if there is a problem with the optimization working folder
ErrCheckpointOptimizationWorkingFolder error = errors.New("error using checkpoint optimization working folder")
)
func checkpointFromBytes(bytes []byte) (*CheckPoint, error) {
checkpoint := new(CheckPoint)
err := json.Unmarshal(bytes, checkpoint)
if err != nil {
return nil, err
}
return checkpoint, nil
}
func lastCheckpointPath() storage.Path {
path := storage.PathFromIter([]string{"_delta_log", "_last_checkpoint"})
return path
}
// Return the checkpoint version and total parts, and the current part index if the URI is a valid checkpoint filename
// If the checkpoint is single-part then part and checkpoint.Parts will both be zero
// If the URI is not a valid checkpoint filename then checkpoint will be nil
func checkpointInfoFromURI(path storage.Path) (checkpoint *CheckPoint, part int32, parseErr error) {
// Check for a single-part checkpoint
groups := checkpointRegex.FindStringSubmatch(path.Base())
if len(groups) == 2 {
var version int64
version, parseErr = strconv.ParseInt(groups[1], 10, 64)
if parseErr != nil {
return
}
checkpoint = new(CheckPoint)
checkpoint.Version = version
checkpoint.Size = 0
part = 0
return
}
// Check for a multi part checkpoint
groups = checkpointPartsRegex.FindStringSubmatch(path.Base())
if len(groups) == 4 {
var version int64
version, parseErr = strconv.ParseInt(groups[1], 10, 64)
if parseErr != nil {
return
}
var partInt64 int64
var partsInt64 int64
partInt64, parseErr = strconv.ParseInt(groups[2], 10, 32)
if parseErr != nil {
return
}
part = int32(partInt64)
partsInt64, parseErr = strconv.ParseInt(groups[3], 10, 32)
if parseErr != nil {
return
}
parts := int32(partsInt64)
checkpoint = new(CheckPoint)
checkpoint.Version = version
checkpoint.Size = 0
checkpoint.Parts = &parts
}
return
}
// DoesCheckpointVersionExist returns true if the given checkpoint version exists, either as a single- or multi-part checkpoint
func DoesCheckpointVersionExist(store storage.ObjectStore, version int64, validateAllPartsExist bool) (bool, error) {
// List all files starting with the version prefix. This will also find commit logs and possible crc files
str := fmt.Sprintf("%020d", version)
path := storage.PathFromIter([]string{"_delta_log", str})
possibleCheckpointFiles, err := store.ListAll(path)
if err != nil {
return false, err
}
// Multi-part validation
partsFound := make(map[int32]bool, 10)
totalParts := int32(0)
for _, possibleCheckpointFile := range possibleCheckpointFiles.Objects {
checkpoint, currentPart, err := checkpointInfoFromURI(possibleCheckpointFile.Location)
if err != nil {
return false, err
}
if checkpoint != nil {
if checkpoint.Parts == nil || !validateAllPartsExist {
// If it's single-part or we're not validating multi-part, then we're done
return true, nil
}
if totalParts > 0 && *checkpoint.Parts != totalParts {
return false, errors.Join(ErrCheckpointInvalidMultipartFileName, fmt.Errorf("different number of total parts found between checkpoint files for version %d", version))
}
totalParts = *checkpoint.Parts
partsFound[currentPart] = true
}
}
// Found a multi-part checkpoint and we want to validate that all parts exist
if len(partsFound) > 0 {
for i := int32(0); i < totalParts; i++ {
found, ok := partsFound[i+1]
if !ok || !found {
return false, ErrCheckpointIncomplete
}
}
return true, nil
}
return false, nil
}
// Create a checkpoint for the given state in the given store
// Assumes that checkpointing is locked such that no other process is currently trying to write a checkpoint for the same version
// Applies tombstone expiration first
func createCheckpointFor(tableState *TableState, store storage.ObjectStore, checkpointConfiguration *CheckpointConfiguration) error {
checkpointExists, err := DoesCheckpointVersionExist(store, tableState.Version, false)
if err != nil {
return err
}
if checkpointExists {
return ErrCheckpointAlreadyExists
}
if err := tableState.prepareStateForCheckpoint(&checkpointConfiguration.ReadWriteConfiguration); err != nil {
return errors.Join(errors.New("failed to prepare state for checkpoint"), err)
}
totalRows := tableState.FileCount() + tableState.TombstoneCount() + len(tableState.AppTransactionVersion) + 2
numParts := int32(((totalRows - 1) / checkpointConfiguration.MaxRowsPerPart) + 1)
// From https://github.com/delta-io/delta/blob/master/PROTOCOL.md#checkpoints:
// When writing multi-part checkpoints, the data must be clustered (either through hash or range partitioning)
// by the 'path' of an added or removed file, or null otherwise. This ensures deterministic content in each
// part file in case of multiple attempts to write the files.
//
// We are not doing this, because we are using a separate checkpointing lock so only one writer can checkpoint
// at a time. (Note that this does not apply to external writers such as Spark.)
var totalBytes int64 = 0
var rowsWritten int32 = 0
generatePart := func(part int) error {
partOffsetRow := part * checkpointConfiguration.MaxRowsPerPart
checkpointEntries, err := checkpointRows(tableState, partOffsetRow, checkpointConfiguration)
if err != nil {
return err
}
atomic.AddInt32(&rowsWritten, int32(len(checkpointEntries)))
buf := new(bytes.Buffer)
props := parquet.NewWriterProperties(
parquet.WithCompression(compress.Codecs.Snappy),
)
err = rfarrow.WriteGoStructsToParquet(checkpointEntries, buf, props)
if err != nil {
return err
}
parquetBytes := buf.Bytes()
var checkpointFileName string
if numParts == 1 {
checkpointFileName = fmt.Sprintf("%020d.checkpoint.parquet", tableState.Version)
} else {
checkpointFileName = fmt.Sprintf("%020d.checkpoint.%010d.%010d.parquet", tableState.Version, part+1, numParts)
}
checkpointPath := storage.PathFromIter([]string{"_delta_log", checkpointFileName})
_, err = store.Head(checkpointPath)
if !errors.Is(err, storage.ErrObjectDoesNotExist) {
return errors.Join(ErrCheckpointAlreadyExists, fmt.Errorf("checkpoint file %s", checkpointPath.Raw))
}
err = store.Put(checkpointPath, parquetBytes)
if err != nil {
return err
}
atomic.AddInt64(&totalBytes, int64(len(parquetBytes)))
return nil
}
// Optional concurrency support
if checkpointConfiguration.ReadWriteConfiguration.ConcurrentCheckpointWrite > 1 {
g, ctx := errgroup.WithContext(context.Background())
partIndexChannel := make(chan int)
for i := 0; i < checkpointConfiguration.ReadWriteConfiguration.ConcurrentCheckpointWrite; i++ {
g.Go(func() error {
for part := range partIndexChannel {
err := generatePart(part)
if err != nil {
return err
}
}
return nil
})
}
g.Go(func() error {
defer close(partIndexChannel)
done := ctx.Done()
for part := 0; part < int(numParts); part++ {
if err := ctx.Err(); err != nil {
return err
}
select {
case partIndexChannel <- part:
continue
case <-done:
return ctx.Err()
}
}
return ctx.Err()
})
err := g.Wait()
if err != nil {
return err
}
} else {
for part := 0; part < int(numParts); part++ {
err := generatePart(part)
if err != nil {
return err
}
}
}
if int(rowsWritten) != totalRows {
return errors.Join(ErrCheckpointRowCountMismatch, fmt.Errorf("expected %d rows, got %d rows", totalRows, rowsWritten))
}
var reportedParts *int32
if numParts > 1 {
// Only multipart checkpoints list the parts
reportedParts = &numParts
}
checkpoint := CheckPoint{
Version: tableState.Version,
Size: int64(totalRows),
SizeInBytes: totalBytes,
Parts: reportedParts,
NumOfAddFiles: int64(tableState.FileCount()),
}
checkpointBytes, err := json.Marshal(checkpoint)
if err != nil {
return err
}
err = store.Put(lastCheckpointPath(), checkpointBytes)
if err != nil {
return err
}
return nil
}
// Generate an Add action for a checkpoint (with additional fields) from a basic Add action
// Note that parsed stats and partition have been removed during the parquet library change.
// TODO add them back
func checkpointAdd(add *Add) (*Add, error) {
// stats, err := StatsFromJson([]byte(add.Stats))
// if err != nil {
// return nil, err
// }
// parsedStats, err := statsAsGenericStats[RowType](stats)
// if err != nil {
// return nil, err
// }
addDataChange := false
checkpointAdd := new(Add)
switch typedAdd := any(checkpointAdd).(type) {
case *Add:
// *typedAdd = *add
typedAdd.DataChange = addDataChange
typedAdd.ModificationTime = add.ModificationTime
typedAdd.PartitionValues = add.PartitionValues
typedAdd.Path = add.Path
typedAdd.Size = add.Size
if typedAdd.Size == 0 {
return nil, errors.Join(ErrCheckpointAddZeroSize, fmt.Errorf("zero size add for path %s", add.Path))
}
typedAdd.Stats = add.Stats
typedAdd.Tags = add.Tags
// typedAdd.StatsParsed = *parsedStats
// partitionValuesParsed, err := partitionValuesAsGeneric[PartitionType](add.PartitionValues)
// if err != nil {
// return checkpointAdd, err
// }
// typedAdd.PartitionValuesParsed = *partitionValuesParsed
}
return checkpointAdd, nil
}
type deletionCandidate struct {
Version int64
Meta storage.ObjectMeta
}
// If the maybeToDelete files are safe to delete, delete them. Otherwise, clear them
// "Safe to delete" is determined by the version and timestamp of the last file in the maybeToDelete list.
// For more details see BufferingLogDeletionIterator() in https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala
// Returns the number of files deleted.
func flushDeleteFiles(store storage.ObjectStore, maybeToDelete []deletionCandidate, beforeVersion int64, maxTimestamp time.Time) (int, error) {
deleted := 0
if len(maybeToDelete) > 0 {
lastMaybeToDelete := maybeToDelete[len(maybeToDelete)-1]
if lastMaybeToDelete.Version < beforeVersion && lastMaybeToDelete.Meta.LastModified.UnixMilli() <= maxTimestamp.UnixMilli() {
for _, deleteFile := range maybeToDelete {
err := store.Delete(deleteFile.Meta.Location)
if err != nil {
return deleted, err
}
deleted++
}
}
}
return deleted, nil
}
// *** The caller MUST validate that there is a checkpoint at or after beforeVersion before calling this ***
// Remove any logs and checkpoints that have a last updated date before maxTimestamp and a version before beforeVersion
// Last updated timestamps are required to be monotonically increasing, so there may be some time adjustment required
// For more detail see BufferingLogDeletionIterator() in https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala
func removeExpiredLogsAndCheckpoints(beforeVersion int64, maxTimestamp time.Time, store storage.ObjectStore) (int, error) {
if !store.IsListOrdered() {
// Currently all object stores return list results sorted
return 0, errors.Join(ErrNotImplemented, errors.New("removing expired logs is not implemented for this object store"))
}
candidatesForDeletion := make([]deletionCandidate, 0, 200)
logIterator := storage.NewListIterator(BaseCommitURI(), store)
// First collect all the logs/checkpoints that might be eligible for deletion
for {
meta, err := logIterator.Next()
if errors.Is(err, storage.ErrObjectDoesNotExist) {
break
}
isValid, version := CommitOrCheckpointVersionFromURI(meta.Location)
// Spark and Rust clients also use the file's last updated timestamp rather than opening the commit and using internal state
if isValid && version < beforeVersion && meta.LastModified.Before(maxTimestamp) {
candidatesForDeletion = append(candidatesForDeletion, deletionCandidate{Version: version, Meta: *meta})
}
if version >= beforeVersion {
break
}
}
// Now look for actually deletable ones based on adjusted timestamp
maybeToDelete := make([]deletionCandidate, 0, len(candidatesForDeletion))
deletedCount := 0
var lastFile, currentFile deletionCandidate
if len(candidatesForDeletion) > 0 {
lastFile = candidatesForDeletion[0]
candidatesForDeletion = candidatesForDeletion[1:]
maybeToDelete = append(maybeToDelete, lastFile)
}
for {
if len(candidatesForDeletion) == 0 {
deleted, err := flushDeleteFiles(store, maybeToDelete, beforeVersion, maxTimestamp)
deletedCount += deleted
return deletedCount, err
}
currentFile = candidatesForDeletion[0]
candidatesForDeletion = candidatesForDeletion[1:]
if lastFile.Version < currentFile.Version && lastFile.Meta.LastModified.UnixMilli() >= currentFile.Meta.LastModified.UnixMilli() {
// The last version is earlier than the current, but the last timestamp is >= current: current needs time adjustment
currentFile = deletionCandidate{Version: currentFile.Version, Meta: storage.ObjectMeta{
Location: currentFile.Meta.Location,
Size: currentFile.Meta.Size,
LastModified: lastFile.Meta.LastModified.Add(1 * time.Millisecond)}}
// Then stick it on the end of the "maybe" list
maybeToDelete = append(maybeToDelete, currentFile)
} else {
// No time adjustment needed. Delete the contents of maybeToDelete if we can.
// There is always at least one file in maybeToDelete here.
deleted, err := flushDeleteFiles(store, maybeToDelete, beforeVersion, maxTimestamp)
deletedCount += deleted
if err != nil {
return deletedCount, err
}
// If we were not able to delete the contents of maybeToDelete then we are done
if deleted == 0 {
return deletedCount, nil
}
maybeToDelete = maybeToDelete[:0]
maybeToDelete = append(maybeToDelete, currentFile)
}
lastFile = currentFile
}
}