diff --git a/internal/filechannel/filechannel.go b/internal/filechannel/filechannel.go index a7ef201..ce1877e 100644 --- a/internal/filechannel/filechannel.go +++ b/internal/filechannel/filechannel.go @@ -43,6 +43,10 @@ import ( "github.com/risingwavelabs/filechannel/internal/utils" ) +// The flag to control the debug output. Debug outputs should print the non-critical messages. All +// unexpected error messages must be printed without this flag. +var verbose, _ = strconv.ParseBool(os.Getenv("DEBUG")) + var ( ErrChecksumMismatch = errors.New("channel corrupted: checksum mismatch") ErrChannelClosed = errors.New("channel closed") @@ -1124,8 +1128,13 @@ func (fc *FileChannel) compress(ctx context.Context, index uint32) error { default: } + // Pin the segment. If failed, it means the watermark has moved forward and beyond the segment index. + // So we can just return immediately. if !fc.segmentManager.Pin(index) { - return fmt.Errorf("failed to pin segment index %d", index) + if verbose { + _, _ = fmt.Fprintf(os.Stderr, "failed to pin segment index %d\n", index) + } + return nil } defer fc.segmentManager.Unpin(index) @@ -1210,8 +1219,10 @@ func (fc *FileChannel) flushAndNotifyLoop(ctx context.Context) { case <-ctx.Done(): return case <-time.After(fc.flushInterval): - // FIXME: log the error - _ = fc.flushAndNotifyWithLock() + err := fc.flushAndNotifyWithLock() + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "failed to flush: %v\n", err) + } } } }