Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use a hash for metadata lock name #357

Merged
merged 3 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 18 additions & 13 deletions pkg/dbconn/metadatalock.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"time"

"github.com/cashapp/spirit/pkg/table"

"github.com/cashapp/spirit/pkg/dbconn/sqlescape"
"github.com/siddontang/loggers"
)
Expand All @@ -23,12 +25,9 @@ type MetadataLock struct {
refreshInterval time.Duration
}

func NewMetadataLock(ctx context.Context, dsn string, lockName string, logger loggers.Advanced, optionFns ...func(*MetadataLock)) (*MetadataLock, error) {
if len(lockName) == 0 {
return nil, errors.New("metadata lock name is empty")
}
if len(lockName) > 64 {
return nil, fmt.Errorf("metadata lock name is too long: %d, max length is 64", len(lockName))
func NewMetadataLock(ctx context.Context, dsn string, table *table.TableInfo, logger loggers.Advanced, optionFns ...func(*MetadataLock)) (*MetadataLock, error) {
if table == nil {
return nil, errors.New("metadata lock table info is nil")
}

mdl := &MetadataLock{
Expand All @@ -52,28 +51,34 @@ func NewMetadataLock(ctx context.Context, dsn string, lockName string, logger lo
getLock := func() error {
// https://dev.mysql.com/doc/refman/8.0/en/locking-functions.html#function_get-lock
var answer int
stmt := sqlescape.MustEscapeSQL("SELECT GET_LOCK(%?, %?)", lockName, getLockTimeout.Seconds())
// Using the table name alone as the lock name is subject to the maximum lock name length
// (64 characters) and causes conflicts between tables with the same name in different schemas.
// Instead, we build the lock name from the schema name, the table name and a hash of both.
// The schema name is truncated to 20 characters, the table name to 32 and the hash to 8, so the result stays within the maximum length.
// bizarrely_long_schema_name.thisisareallylongtablenamethisisareallylongtablename60charac ==>
// bizarrely_long_schem.thisisareallylongtablenamethisis-66fec116
stmt := sqlescape.MustEscapeSQL("SELECT GET_LOCK( concat(left(%?,20),'.',left(%?,32),'-',left(sha1(concat(%?,%?)),8)), %?)", table.SchemaName, table.TableName, table.SchemaName, table.TableName, getLockTimeout.Seconds())
if err := dbConn.QueryRowContext(ctx, stmt).Scan(&answer); err != nil {
return fmt.Errorf("could not acquire metadata lock: %s", err)
}
if answer == 0 {
// 0 means the lock is held by another connection
// TODO: we could lookup the connection that holds the lock and report details about it
return fmt.Errorf("could not acquire metadata lock: %s, lock is held by another connection", lockName)
return fmt.Errorf("could not acquire metadata lock for %s.%s, lock is held by another connection", table.SchemaName, table.TableName)
} else if answer != 1 {
// probably we never get here, but just in case
return fmt.Errorf("could not acquire metadata lock: %s, GET_LOCK returned: %d", lockName, answer)
return fmt.Errorf("could not acquire metadata lock for %s.%s, GET_LOCK returned: %d", table.SchemaName, table.TableName, answer)
}
return nil
}

// Acquire the lock or return an error immediately
// We only Infof the initial acquisition.
logger.Infof("attempting to acquire metadata lock: %s", lockName)
logger.Infof("attempting to acquire metadata lock for %s.%s", table.SchemaName, table.TableName)
if err = getLock(); err != nil {
return nil, err
}
logger.Infof("acquired metadata lock: %s", lockName)
logger.Infof("acquired metadata lock: %s.%s", table.SchemaName, table.TableName)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be worth logging the lock name too, in one of the log messages.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not easy to do, because the lock name is computed server-side inside the GET_LOCK function call. For now let's leave it like this; if we have a real need to log the lock name in the future, I think we could add another MySQL round-trip to compute the lock name first and then pass it to GET_LOCK.


// Setup background refresh runner
ctx, mdl.cancel = context.WithCancel(ctx)
Expand All @@ -85,7 +90,7 @@ func NewMetadataLock(ctx context.Context, dsn string, lockName string, logger lo
select {
case <-ctx.Done():
// Close the dedicated connection to release the lock
logger.Warnf("releasing metadata lock: %s", lockName)
logger.Warnf("releasing metadata lock for %s.%s", table.SchemaName, table.TableName)
mdl.closeCh <- dbConn.Close()
return
case <-ticker.C:
Expand All @@ -97,7 +102,7 @@ func NewMetadataLock(ctx context.Context, dsn string, lockName string, logger lo
// and we can try again on the next tick interval.
logger.Warnf("could not refresh metadata lock: %s", err)
} else {
logger.Debugf("refreshed metadata lock: %s", lockName)
logger.Debugf("refreshed metadata lock for %s.%s", table.SchemaName, table.TableName)
}
}
}
Expand Down
33 changes: 18 additions & 15 deletions pkg/dbconn/metadatalock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,39 @@ import (
"testing"
"time"

"github.com/cashapp/spirit/pkg/table"

"github.com/cashapp/spirit/pkg/testutils"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
)

func TestMetadataLock(t *testing.T) {
lockName := "test"
lockTableInfo := table.TableInfo{SchemaName: "test", TableName: "test"}
logger := logrus.New()
mdl, err := NewMetadataLock(context.Background(), testutils.DSN(), lockName, logger)
mdl, err := NewMetadataLock(context.Background(), testutils.DSN(), &lockTableInfo, logger)
assert.NoError(t, err)
assert.NotNil(t, mdl)

// Confirm a second lock cannot be acquired
_, err = NewMetadataLock(context.Background(), testutils.DSN(), lockName, logger)
_, err = NewMetadataLock(context.Background(), testutils.DSN(), &lockTableInfo, logger)
assert.ErrorContains(t, err, "lock is held by another connection")

// Close the original mdl
assert.NoError(t, mdl.Close())

// Confirm a new lock can be acquired
mdl3, err := NewMetadataLock(context.Background(), testutils.DSN(), lockName, logger)
mdl3, err := NewMetadataLock(context.Background(), testutils.DSN(), &lockTableInfo, logger)
assert.NoError(t, err)
assert.NoError(t, mdl3.Close())
}

func TestMetadataLockContextCancel(t *testing.T) {
lockName := "test-cancel"
lockTableInfo := table.TableInfo{SchemaName: "test", TableName: "test-cancel"}

logger := logrus.New()
ctx, cancel := context.WithCancel(context.Background())
mdl, err := NewMetadataLock(ctx, testutils.DSN(), lockName, logger)
mdl, err := NewMetadataLock(ctx, testutils.DSN(), &lockTableInfo, logger)
assert.NoError(t, err)
assert.NotNil(t, mdl)

Expand All @@ -46,16 +48,16 @@ func TestMetadataLockContextCancel(t *testing.T) {
<-mdl.closeCh

// Confirm the lock is released by acquiring a new one
mdl2, err := NewMetadataLock(context.Background(), testutils.DSN(), lockName, logger)
mdl2, err := NewMetadataLock(context.Background(), testutils.DSN(), &lockTableInfo, logger)
assert.NoError(t, err)
assert.NotNil(t, mdl2)
assert.NoError(t, mdl2.Close())
}

func TestMetadataLockRefresh(t *testing.T) {
lockName := "test-refresh"
lockTableInfo := table.TableInfo{SchemaName: "test", TableName: "test-refresh"}
logger := logrus.New()
mdl, err := NewMetadataLock(context.Background(), testutils.DSN(), lockName, logger, func(mdl *MetadataLock) {
mdl, err := NewMetadataLock(context.Background(), testutils.DSN(), &lockTableInfo, logger, func(mdl *MetadataLock) {
// override the refresh interval for faster testing
mdl.refreshInterval = 2 * time.Second
})
Expand All @@ -66,22 +68,23 @@ func TestMetadataLockRefresh(t *testing.T) {
time.Sleep(5 * time.Second)

// Confirm the lock is still held
_, err = NewMetadataLock(context.Background(), testutils.DSN(), lockName, logger)
_, err = NewMetadataLock(context.Background(), testutils.DSN(), &lockTableInfo, logger)
assert.ErrorContains(t, err, "lock is held by another connection")

// Close the lock
assert.NoError(t, mdl.Close())
}

func TestMetadataLockLength(t *testing.T) {
long := "thisisareallylongtablenamethisisareallylongtablenamethisisareallylongtablename"
empty := ""
lockTableInfo := table.TableInfo{SchemaName: "test", TableName: "thisisareallylongtablenamethisisareallylongtablenamethisisareallylongtablename"}
var empty *table.TableInfo

logger := logrus.New()

_, err := NewMetadataLock(context.Background(), testutils.DSN(), long, logger)
assert.ErrorContains(t, err, "metadata lock name is too long")
_, err := NewMetadataLock(context.Background(), testutils.DSN(), &lockTableInfo, logger)
// No error anymore after using a hash of the table name
assert.NoError(t, err)

_, err = NewMetadataLock(context.Background(), testutils.DSN(), empty, logger)
assert.ErrorContains(t, err, "metadata lock name is empty")
assert.ErrorContains(t, err, "metadata lock table info is nil")
}
2 changes: 1 addition & 1 deletion pkg/migration/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (r *Runner) Run(originalCtx context.Context) error {
}

// Take a metadata lock to prevent other migrations from running concurrently.
r.metadataLock, err = dbconn.NewMetadataLock(ctx, r.dsn(), r.table.TableName, r.logger)
r.metadataLock, err = dbconn.NewMetadataLock(ctx, r.dsn(), r.table, r.logger)
if err != nil {
return err
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/migration/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2711,6 +2711,8 @@ func TestResumeFromCheckpointE2EWithManualSentinel(t *testing.T) {
statusInterval = 500 * time.Millisecond

tableName := `resume_checkpoint_e2e_w_sentinel`
tableInfo := table.TableInfo{SchemaName: "test", TableName: tableName}

testutils.RunSQL(t, fmt.Sprintf(`DROP TABLE IF EXISTS %s, _%s_old, _%s_chkpnt, _%s_sentinel`, tableName, tableName, tableName, tableName))
table := fmt.Sprintf(`CREATE TABLE %s (
id int(11) NOT NULL AUTO_INCREMENT,
Expand Down Expand Up @@ -2766,8 +2768,8 @@ func TestResumeFromCheckpointE2EWithManualSentinel(t *testing.T) {
// Test that it's not possible to acquire metadata lock with name
// as tablename while the migration is running.
lock, err := dbconn.NewMetadataLock(ctx, testutils.DSN(),
tableName, &testLogger{})
assert.ErrorContains(t, err, "could not acquire metadata lock: resume_checkpoint_e2e_w_sentinel, lock is held by another connection")
&tableInfo, &testLogger{})
assert.ErrorContains(t, err, "could not acquire metadata lock for test.resume_checkpoint_e2e_w_sentinel, lock is held by another connection")
assert.Nil(t, lock)
break
}
Expand Down
Loading