Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose stats #1

Merged
merged 2 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@ jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v4
uses: actions/setup-go@v5
with:
go-version: '1.21'
go-version: '1.22'
cache: false

- name: Run golangci-lint
uses: golangci/golangci-lint-action@v3.7.0
uses: golangci/golangci-lint-action@v4

- name: Build
run: go build -v ./...
Expand Down
30 changes: 30 additions & 0 deletions filechannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

// Sender sends bytes to file channel.
type Sender interface {
SenderStats

// Send bytes to file channel. Data will be finally persistent on disk.
Send(context.Context, []byte) error

Expand All @@ -32,6 +34,8 @@ type Sender interface {

// Receiver receives bytes from file channel in the sending order.
type Receiver interface {
ReceiverStats

// Recv bytes from file channel.
Recv(context.Context) ([]byte, error)

Expand All @@ -51,6 +55,8 @@ type AckReceiver interface {

// FileChannel is the interface for a file-based persistent channel.
type FileChannel interface {
Stats

// Tx creates a Sender. Sender is thread safe.
// It's possible to have multiple senders at the same time.
Tx() Sender
Expand Down Expand Up @@ -89,6 +95,7 @@ var (

// Compiler fence.
var _ AckFileChannel = &fileChannel{}
var _ Stats = &fileChannel{}

type fileChannel struct {
wRefLock sync.Mutex
Expand All @@ -98,6 +105,14 @@ type fileChannel struct {
inner *filechannel.FileChannel
}

func (f *fileChannel) FlushOffset() uint64 {
return f.inner.FlushOffset()
}

func (f *fileChannel) DiskUsage() (uint64, error) {
return f.inner.DiskUsage()
}

func (f *fileChannel) Close() error {
f.wRefLock.Lock()
defer f.wRefLock.Unlock()
Expand All @@ -110,6 +125,10 @@ func (f *fileChannel) Close() error {
return err
}

func (f *fileChannel) writeOffset() uint64 {
return f.inner.WriteOffset()
}

func (f *fileChannel) send(bytes []byte) error {
f.wLock.Lock()
defer f.wLock.Unlock()
Expand Down Expand Up @@ -175,11 +194,16 @@ func OpenAckFileChannel(dir string, opts ...Option) (AckFileChannel, error) {

// Compiler fence.
var _ Sender = &fileChannelSender{}
var _ SenderStats = &fileChannelSender{}

type fileChannelSender struct {
inner *fileChannel
}

func (s *fileChannelSender) WriteOffset() uint64 {
return s.inner.writeOffset()
}

func (s *fileChannelSender) Close() error {
if s.inner != nil {
s.inner.closeTx()
Expand All @@ -200,11 +224,16 @@ func (s *fileChannelSender) Send(ctx context.Context, p []byte) error {

// Compiler fence.
var _ Receiver = &fileChannelReceiver{}
var _ ReceiverStats = &fileChannelReceiver{}

type fileChannelReceiver struct {
inner *filechannel.Iterator
}

func (r *fileChannelReceiver) ReadOffset() uint64 {
return r.inner.Offset()
}

func (r *fileChannelReceiver) Close() error {
return r.inner.Close()
}
Expand All @@ -215,6 +244,7 @@ func (r *fileChannelReceiver) Recv(ctx context.Context) ([]byte, error) {

// Compiler fence.
var _ AckReceiver = &fileChannelAckReceiver{}
var _ ReceiverStats = &fileChannelAckReceiver{}

type fileChannelAckReceiver struct {
fileChannelReceiver
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.21.4
require (
github.com/elliotchance/orderedmap/v2 v2.2.0
github.com/gofrs/flock v0.8.1
github.com/klauspost/compress v1.17.3
github.com/klauspost/compress v1.17.8
github.com/stretchr/testify v1.8.4
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ github.com/elliotchance/orderedmap/v2 v2.2.0 h1:7/2iwO98kYT4XkOjA9mBEIwvi4KpGB4c
github.com/elliotchance/orderedmap/v2 v2.2.0/go.mod h1:85lZyVbpGaGvHvnKa7Qhx7zncAdBIBq6u56Hb1PRU5Q=
github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw=
github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/klauspost/compress v1.17.3 h1:qkRjuerhUU1EmXLYGkSH6EZL+vPSxIrYjLNAK4slzwA=
github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
Expand Down
30 changes: 29 additions & 1 deletion internal/filechannel/filechannel.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 RisingWave Labs
// Copyright 2023-2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,7 @@ import (
"math"
"os"
"path"
"path/filepath"
"regexp"
"slices"
"strconv"
Expand Down Expand Up @@ -160,6 +161,10 @@ type Iterator struct {
buf *bytes.Buffer
}

func (it *Iterator) Offset() uint64 {
return it.offset
}

func (it *Iterator) updateOffset(offset uint64) error {
if it.offset == math.MaxUint64 {
it.offset = offset
Expand Down Expand Up @@ -1306,3 +1311,26 @@ func (fc *FileChannel) IteratorAcknowledgable() *Iterator {
}
return NewIterator(fc.segmentManager, fc.position, false)
}

func (fc *FileChannel) WriteOffset() uint64 {
return fc.currentOffset
}

func (fc *FileChannel) FlushOffset() uint64 {
return fc.position.Get()
}

func (fc *FileChannel) DiskUsage() (uint64, error) {
var total uint64
err := filepath.Walk(fc.dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() || !segmentFilePattern.MatchString(info.Name()) {
return nil
}
total += uint64(info.Size())
return nil
})
return total, err
}
48 changes: 48 additions & 0 deletions stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package filechannel

// ReceiverStats is the interface for getting the stats of a receiver.
type ReceiverStats interface {
// ReadOffset returns the offset of the last read message.
// Note that the offset is local to the receiver. It will only change
// when the receiver reads a message.
//
// The initial offset is math.MaxUint64 to indicate that no message has
// been read.
ReadOffset() uint64
}

// SenderStats is the interface for getting the stats of a sender.
type SenderStats interface {
// WriteOffset returns the offset of the last written message.
// Note that the offset is not the byte offset in the file. It's the
// offset of the message in the channel. The offset will change no matter
// a message is written by the sender, or the other senders of the same
// channel.
WriteOffset() uint64
}

// Stats is the interface for getting the stats of a file channel.
type Stats interface {
// DiskUsage returns the disk usage of the file channel.
// Note that calling DiskUsage() is an expensive operation.
DiskUsage() (uint64, error)

// FlushOffset returns the offset of the last flushed message.
// Messages with offset less than the flush offset are guaranteed to be
// seen by the readers.
FlushOffset() uint64
}
91 changes: 91 additions & 0 deletions stats_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package filechannel

import (
"context"
"math"
"os"
"testing"

"github.com/stretchr/testify/assert"
)

func TestFileChannel_Stats(t *testing.T) {
tmpDir := mkdirTemp(t)
defer os.RemoveAll(tmpDir)

fch, err := OpenFileChannel(tmpDir)
if !assert.NoError(t, err) {
t.FailNow()
}
defer fch.Close()

// Assert disk usage == 64 (header size).
usage, err := fch.DiskUsage()
if !assert.NoError(t, err) {
t.FailNow()
}
assert.Equal(t, uint64(64), usage)

msg := []byte("Hello world!")

tx := fch.Tx()
defer tx.Close()

// Assert sender offset == 0.
assert.Equal(t, uint64(0), tx.WriteOffset())

err = tx.Send(context.Background(), msg)
if !assert.NoError(t, err) {
t.FailNow()
}

// Assert sender offset != 0.
assert.NotEqual(t, uint64(0), tx.WriteOffset())

rx := fch.Rx()
defer rx.Close()

// Assert reader offset == math.MaxUint64.
assert.Equal(t, uint64(math.MaxUint64), rx.ReadOffset())

p, err := rx.Recv(context.Background())
if !assert.NoError(t, err) {
t.FailNow()
}
if !assert.Equal(t, msg, p) {
t.FailNow()
}

// Assert reader offset != 0.
assert.NotEqual(t, uint64(0), rx.ReadOffset())

// Assert reader offset == sender offset.
assert.Equal(t, tx.WriteOffset(), rx.ReadOffset())

// Recv happened means the file channel has flushed.
// Now we can examine the disk usage and flush offset.

// Assert flush offset != 0.
assert.NotEqual(t, uint64(0), fch.FlushOffset())

// Assert disk usage > 64.
usage, err = fch.DiskUsage()
if !assert.NoError(t, err) {
t.FailNow()
}
assert.NotEqual(t, uint64(64), usage)
}