*: move all metrics registration to the store level (#905)
* *: add test for duplicate metrics registration

This test verifies that a db and table can be reopened after being dropped
without causing a panic.

* *: add custom collector that exports pull-based db and table metrics

These were previously structured as GaugeFuncs registered on db and table
creation, which could cause a duplicate metrics registration panic when a db
or table was dropped and recreated. This commit therefore moves the export of
these metrics to the store level (a sketch of the collector pattern follows
this message).

* *: move all metrics registration to the store level

At component instantiation time (e.g. db, wal, table), existing metrics are
provided to the components with the correct variable label set. This moves the
metric registration lifecycle to the store level and avoids duplicate
registration panics when e.g. reopening a closed db.
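A minimal sketch of the collector approach described above, under stated assumptions: the store, db, and collector shapes below are hypothetical stand-ins, and only the metric name and help string are taken from the diff that follows.

package main

import (
	"sync"
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
)

// Hypothetical stand-ins for the store and db types.
type db struct{ highWatermark atomic.Uint64 }

type store struct {
	mtx sync.RWMutex
	dbs map[string]*db
}

// One Desc, created once; nothing is registered per db, so dropping and
// recreating a db cannot trigger a duplicate-registration panic.
var txHighWatermarkDesc = prometheus.NewDesc(
	"frostdb_tx_high_watermark",
	"The highest transaction number that has been released to be read",
	[]string{"db"}, nil,
)

type collector struct{ s *store }

func (c *collector) Describe(ch chan<- *prometheus.Desc) { ch <- txHighWatermarkDesc }

func (c *collector) Collect(ch chan<- prometheus.Metric) {
	c.s.mtx.RLock()
	defer c.s.mtx.RUnlock()
	// Values are read at scrape time, with the db name as a variable label.
	for name, d := range c.s.dbs {
		ch <- prometheus.MustNewConstMetric(
			txHighWatermarkDesc, prometheus.GaugeValue,
			float64(d.highWatermark.Load()), name,
		)
	}
}

func main() {
	s := &store{dbs: map[string]*db{"test": {}}}
	reg := prometheus.NewRegistry()
	reg.MustRegister(&collector{s: s}) // mirrors s.reg.MustRegister(&collector{s: s}) in the diff
}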
asubiotto authored Jun 19, 2024
1 parent 1d823f7 commit e93bdb8
Showing 11 changed files with 540 additions and 261 deletions.
91 changes: 32 additions & 59 deletions db.go
@@ -23,7 +23,6 @@ import (
"github.com/go-kit/log/level"
"github.com/oklog/ulid/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.opentelemetry.io/otel/trace"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
@@ -60,7 +59,7 @@ type ColumnStore struct {
enableWAL bool
manualBlockRotation bool
snapshotTriggerSize int64
metrics metrics
metrics globalMetrics
recoveryConcurrency int

// indexDegree is the degree of the btree index (default = 2)
@@ -79,16 +78,10 @@ type ColumnStore struct {
// testingOptions are options only used for testing purposes.
testingOptions struct {
disableReclaimDiskSpaceOnSnapshot bool
walTestingOptions []wal.TestingOption
walTestingOptions []wal.Option
}
}

type metrics struct {
shutdownDuration prometheus.Histogram
shutdownStarted prometheus.Counter
shutdownCompleted prometheus.Counter
}

type Option func(*ColumnStore) error

func New(
@@ -112,20 +105,9 @@ func New(
}
}

s.metrics = metrics{
shutdownDuration: promauto.With(s.reg).NewHistogram(prometheus.HistogramOpts{
Name: "frostdb_shutdown_duration",
Help: "time it takes for the columnarstore to complete a full shutdown.",
}),
shutdownStarted: promauto.With(s.reg).NewCounter(prometheus.CounterOpts{
Name: "frostdb_shutdown_started",
Help: "Indicates a shutdown of the columnarstore has started.",
}),
shutdownCompleted: promauto.With(s.reg).NewCounter(prometheus.CounterOpts{
Name: "frostdb_shutdown_completed",
Help: "Indicates a shutdown of the columnarstore has completed.",
}),
}
// Register metrics that are updated by the collector.
s.reg.MustRegister(&collector{s: s})
s.metrics = makeAndRegisterGlobalMetrics(s.reg)

if s.enableWAL && s.storagePath == "" {
return nil, fmt.Errorf("storage path must be configured if WAL is enabled")
@@ -351,14 +333,8 @@ func (s *ColumnStore) recoverDBsFromStorage(ctx context.Context) error {
return g.Wait()
}

type dbMetrics struct {
txHighWatermark prometheus.GaugeFunc
snapshotMetrics *snapshotMetrics
}

type DB struct {
columnStore *ColumnStore
reg prometheus.Registerer
logger log.Logger
tracer trace.Tracer
name string
@@ -387,7 +363,8 @@ type DB struct {

snapshotInProgress atomic.Bool

metrics *dbMetrics
metrics snapshotMetrics
metricsProvider tableMetricsProvider
}

// DataSinkSource is a convenience interface for a data source and sink.
@@ -474,20 +451,20 @@ func (s *ColumnStore) DB(ctx context.Context, name string, opts ...DBOption) (*D
s.mtx.Lock()
}

reg := prometheus.WrapRegistererWith(prometheus.Labels{"db": name}, s.reg)
logger := log.WithPrefix(s.logger, "db", name)
db = &DB{
columnStore: s,
name: name,
mtx: &sync.RWMutex{},
tables: map[string]*Table{},
roTables: map[string]*Table{},
reg: reg,
logger: logger,
tracer: s.tracer,
wal: &wal.NopWAL{},
sources: s.sources,
sinks: s.sinks,
columnStore: s,
name: name,
mtx: &sync.RWMutex{},
tables: map[string]*Table{},
roTables: map[string]*Table{},
logger: logger,
tracer: s.tracer,
wal: &wal.NopWAL{},
sources: s.sources,
sinks: s.sinks,
metrics: s.metrics.snapshotMetricsForDB(name),
metricsProvider: tableMetricsProvider{dbName: name, m: s.metrics},
}

if s.storagePath != "" {
@@ -544,7 +521,15 @@ func (s *ColumnStore) DB(ctx context.Context, name string, opts ...DBOption) (*D
delete(s.dbReplaysInProgress, name)
}()
var err error
db.wal, err = db.openWAL(ctx, s.testingOptions.walTestingOptions...)
db.wal, err = db.openWAL(
ctx,
append(
[]wal.Option{
wal.WithMetrics(s.metrics.metricsForFileWAL(name)),
wal.WithStoreMetrics(s.metrics.metricsForWAL(name)),
}, s.testingOptions.walTestingOptions...,
)...,
)
return err
}(); err != nil {
return err
@@ -563,17 +548,6 @@ func (s *ColumnStore) DB(ctx context.Context, name string, opts ...DBOption) (*D
}
}
}

// Register metrics last to avoid duplicate registration should any of the WAL or storage replay errors occur
db.metrics = &dbMetrics{
txHighWatermark: promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
Name: "frostdb_tx_high_watermark",
Help: "The highest transaction number that has been released to be read",
}, func() float64 {
return float64(db.highWatermark.Load())
}),
snapshotMetrics: newSnapshotMetrics(reg),
}
return nil
}(); dbSetupErr != nil {
level.Warn(s.logger).Log(
@@ -646,10 +620,9 @@ func (s *ColumnStore) DropDB(name string) error {
return os.RemoveAll(filepath.Join(s.DatabasesDir(), name))
}

func (db *DB) openWAL(ctx context.Context, opts ...wal.TestingOption) (WAL, error) {
func (db *DB) openWAL(ctx context.Context, opts ...wal.Option) (WAL, error) {
wal, err := wal.Open(
db.logger,
db.reg,
db.walDir(),
opts...,
)
@@ -812,7 +785,7 @@ func (db *DB) recover(ctx context.Context, wal WAL) error {
db,
tableName,
config,
db.reg,
db.metricsProvider.metricsForTable(tableName),
db.logger,
db.tracer,
wal,
@@ -1100,7 +1073,7 @@ func (db *DB) readOnlyTable(name string) (*Table, error) {
db,
name,
nil,
db.reg,
db.metricsProvider.metricsForTable(name),
db.logger,
db.tracer,
db.wal,
@@ -1175,7 +1148,7 @@ func (db *DB) table(name string, config *tablepb.TableConfig, id ulid.ULID) (*Ta
db,
name,
config,
db.reg,
db.metricsProvider.metricsForTable(name),
db.logger,
db.tracer,
db.wal,
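The db.go changes above thread pre-labeled metrics into components instead of handing each one a Registerer. A sketch of that provider pattern, assuming a hypothetical field set (the real globalMetrics and tableMetricsProvider bodies are not shown in this diff; the rowsInserted field and metric name are illustrative):

package main

import "github.com/prometheus/client_golang/prometheus"

// globalMetrics holds label-based vectors registered once at store level.
type globalMetrics struct {
	rowsInserted *prometheus.CounterVec
}

func makeAndRegisterGlobalMetrics(reg prometheus.Registerer) globalMetrics {
	m := globalMetrics{
		rowsInserted: prometheus.NewCounterVec(prometheus.CounterOpts{
			Name: "frostdb_table_rows_inserted_total", // hypothetical name
			Help: "Rows inserted per table.",
		}, []string{"db", "table"}),
	}
	reg.MustRegister(m.rowsInserted)
	return m
}

type tableMetrics struct {
	rowsInserted prometheus.Counter
}

// tableMetricsProvider curries the db label so tables only supply their name.
type tableMetricsProvider struct {
	dbName string
	m      globalMetrics
}

func (p tableMetricsProvider) metricsForTable(table string) tableMetrics {
	return tableMetrics{
		// WithLabelValues returns the shared child for this label set;
		// calling it again for a recreated table is safe and registers nothing.
		rowsInserted: p.m.rowsInserted.WithLabelValues(p.dbName, table),
	}
}

func main() {
	reg := prometheus.NewRegistry()
	p := tableMetricsProvider{dbName: "test", m: makeAndRegisterGlobalMetrics(reg)}
	p.metricsForTable("test").rowsInserted.Inc()
}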
44 changes: 35 additions & 9 deletions db_test.go
@@ -21,6 +21,7 @@ import (
"github.com/google/uuid"
"github.com/polarsignals/iceberg-go"
"github.com/polarsignals/iceberg-go/catalog"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
"golang.org/x/sync/errgroup"
@@ -1713,6 +1714,7 @@ func Test_DB_Limiter(t *testing.T) {
// DropStorage ensures that a database can continue on after drop storage is called.
func Test_DB_DropStorage(t *testing.T) {
logger := newTestLogger(t)
ctx := context.Background()
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(t, 0)

@@ -1722,8 +1724,11 @@

dir := t.TempDir()

// Use an actual prometheus registry to test duplicate metrics
// registration.
c, err := New(
WithLogger(logger),
WithRegistry(prometheus.NewRegistry()),
WithWAL(),
WithStoragePath(dir),
WithActiveMemorySize(1*MiB),
@@ -1732,15 +1737,14 @@
_ = c.Close()
}()
require.NoError(t, err)
const dbName = "test"
db, err := c.DB(context.Background(), dbName)
const dbAndTableName = "test"
db, err := c.DB(ctx, dbAndTableName)
require.NoError(t, err)
table, err := db.Table("test", config)
table, err := db.Table(dbAndTableName, config)
require.NoError(t, err)

samples := dynparquet.NewTestSamples()

ctx := context.Background()
for i := 0; i < 100; i++ {
r, err := samples.ToRecord()
require.NoError(t, err)
@@ -1751,7 +1755,7 @@
countRows := func(expected int) {
rows := 0
engine := query.NewEngine(mem, db.TableProvider())
err = engine.ScanTable("test").
err = engine.ScanTable(dbAndTableName).
Execute(context.Background(), func(ctx context.Context, r arrow.Record) error {
rows += int(r.NumRows())
return nil
@@ -1762,9 +1766,31 @@
countRows(300)

level.Debug(logger).Log("msg", "dropping storage")
require.NoError(t, c.DropDB(dbName))
require.NoError(t, c.DropDB(dbAndTableName))

// Getting the DB without creating it should return an error.
_, err = c.GetDB(dbAndTableName)
require.Error(t, err)

// Opening a new DB with the same name should be fine.
db, err = c.DB(ctx, dbAndTableName)
require.NoError(t, err)
// A table as well.
table, err = db.Table(dbAndTableName, config)
require.NoError(t, err)

// Insert a record into the recreated table.
r, err := samples.ToRecord()
require.NoError(t, err)
defer r.Release()
_, err = table.InsertRecord(ctx, r)
require.NoError(t, err)
// Expect three rows in the table.
countRows(3)

// Dropping twice should be valid as well.
require.NoError(t, c.DropDB(dbAndTableName))

// Open a new store against the dropped storage, and expect empty db.
c, err = New(
WithLogger(logger),
WithWAL(),
@@ -1776,9 +1802,9 @@
}()
require.NoError(t, err)
level.Debug(logger).Log("msg", "opening new db")
db, err = c.DB(context.Background(), "test")
db, err = c.DB(ctx, dbAndTableName)
require.NoError(t, err)
_, err = db.Table("test", config)
_, err = db.Table(dbAndTableName, config)
require.NoError(t, err)
countRows(0)
}
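The test above passes a real registry so that duplicate registration is actually enforced. A standalone sketch of the panic the old per-db GaugeFuncs could hit on drop-and-recreate; the openDB helper is hypothetical, while the wrapped registerer and metric mirror the lines deleted from db.go:

package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()
	openDB := func(name string) {
		// Per-db registration, as before this commit: the db label is baked
		// in via a wrapped registerer, and the GaugeFunc is registered anew.
		dbReg := prometheus.WrapRegistererWith(prometheus.Labels{"db": name}, reg)
		promauto.With(dbReg).NewGaugeFunc(prometheus.GaugeOpts{
			Name: "frostdb_tx_high_watermark",
			Help: "The highest transaction number that has been released to be read",
		}, func() float64 { return 0 })
	}
	openDB("test")
	// Dropping "test" and reopening it re-registers the same name and labels:
	openDB("test") // panics: duplicate metrics collector registration attempted
}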
2 changes: 1 addition & 1 deletion go.mod
@@ -17,7 +17,7 @@ require (
github.com/pingcap/tidb/parser v0.0.0-20231013125129-93a834a6bf8d
github.com/planetscale/vtprotobuf v0.6.0
github.com/polarsignals/iceberg-go v0.0.0-20240502213135-2ee70b71e76b
github.com/polarsignals/wal v0.0.0-20240514152147-1cd4b81c9b88
github.com/polarsignals/wal v0.0.0-20240619104840-9da940027f9c
github.com/prometheus/client_golang v1.19.1
github.com/spf13/cobra v1.8.0
github.com/stretchr/testify v1.9.0
4 changes: 2 additions & 2 deletions go.sum
@@ -118,8 +118,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/polarsignals/iceberg-go v0.0.0-20240502213135-2ee70b71e76b h1:Dbm5itapR0uYIMujR8OntWpDJ/nm5OM6JiaKauLcZ4Y=
github.com/polarsignals/iceberg-go v0.0.0-20240502213135-2ee70b71e76b/go.mod h1:5T9ChEZjRNhAGGLwH1cqzDA7wXB84SmU+WkXQr/ZAjo=
github.com/polarsignals/wal v0.0.0-20240514152147-1cd4b81c9b88 h1:FZvQW8MXcNjwLfWDRAatOA83Pof5+iKW7veuInygBXY=
github.com/polarsignals/wal v0.0.0-20240514152147-1cd4b81c9b88/go.mod h1:EVDHAAe+7GQ33A1/x+/gE+sBPN4toQ0XG5RoLD49xr8=
github.com/polarsignals/wal v0.0.0-20240619104840-9da940027f9c h1:ReFgEXqZ9/y+/9ZdNHOa1L62wqt8mWqoqrWutWj2x+A=
github.com/polarsignals/wal v0.0.0-20240619104840-9da940027f9c/go.mod h1:EVDHAAe+7GQ33A1/x+/gE+sBPN4toQ0XG5RoLD49xr8=
github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE=
github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho=
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
2 changes: 1 addition & 1 deletion index/lsm.go
@@ -63,7 +63,7 @@ type LSM struct {
type LSMMetrics struct {
Compactions *prometheus.CounterVec
LevelSize *prometheus.GaugeVec
CompactionDuration prometheus.Histogram
CompactionDuration prometheus.Observer
}

// LevelConfig is the configuration for a level in the LSM.
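The index/lsm.go change loosens CompactionDuration from prometheus.Histogram to prometheus.Observer, which is what lets the store own a single HistogramVec and hand each LSM a labeled child: HistogramVec.WithLabelValues returns a prometheus.Observer. A sketch, with a hypothetical metric name:

package main

import (
	"github.com/polarsignals/frostdb/index"
	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()
	// Registered once at the store level...
	compactionDuration := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name: "frostdb_compaction_duration_seconds", // hypothetical name
		Help: "Duration of LSM compactions.",
	}, []string{"db", "table"})
	reg.MustRegister(compactionDuration)

	// ...then curried per db and table; no new registration happens here, so
	// a dropped and recreated table reuses the same child observer.
	_ = index.LSMMetrics{
		CompactionDuration: compactionDuration.WithLabelValues("test", "test"),
	}
}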
(diffs for the remaining 6 changed files not shown)
