Skip to content

Commit

Permalink
Merge pull request #7981 from ellemouton/handleRogueUpdates
Browse files Browse the repository at this point in the history
watchtower: handle rogue updates
  • Loading branch information
Roasbeef authored Sep 18, 2023
2 parents 7412482 + 12be6a3 commit 9f4a883
Show file tree
Hide file tree
Showing 9 changed files with 536 additions and 1,044 deletions.
6 changes: 5 additions & 1 deletion docs/release-notes/release-notes-0.17.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,14 @@ fails](https://github.com/lightningnetwork/lnd/pull/7876).
retried](https://github.com/lightningnetwork/lnd/pull/7927) with an
exponential back off.


* In the watchtower client, we [now explicitly
handle](https://github.com/lightningnetwork/lnd/pull/7981) the scenario where
a channel is closed while we still have an in-memory update for it.

* `lnd` [now properly handles a case where an erroneous force close attempt
would impeded start up](https://github.com/lightningnetwork/lnd/pull/7985).


# New Features
## Functional Enhancements

Expand Down
5 changes: 5 additions & 0 deletions lnrpc/wtclientrpc/wtclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,10 @@ func constructFunctionalOptions(includeSessions,
return opts, ackCounts, committedUpdateCounts
}

perNumRogueUpdates := func(s *wtdb.ClientSession, numUpdates uint16) {
ackCounts[s.ID] += numUpdates
}

perNumAckedUpdates := func(s *wtdb.ClientSession, id lnwire.ChannelID,
numUpdates uint16) {

Expand All @@ -405,6 +409,7 @@ func constructFunctionalOptions(includeSessions,
opts = []wtdb.ClientSessionListOption{
wtdb.WithPerNumAckedUpdates(perNumAckedUpdates),
wtdb.WithPerCommittedUpdate(perCommittedUpdate),
wtdb.WithPerRogueUpdateCount(perNumRogueUpdates),
}

if excludeExhaustedSessions {
Expand Down
13 changes: 13 additions & 0 deletions watchtower/wtclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,19 @@ func (c *TowerClient) handleClosableSessions(
// and handle it.
c.closableSessionQueue.Pop()

// Stop the session and remove it from the
// in-memory set.
err := c.activeSessions.StopAndRemove(
item.sessionID,
)
if err != nil {
c.log.Errorf("could not remove "+
"session(%s) from in-memory "+
"set: %v", item.sessionID, err)

return
}

// Fetch the session from the DB so that we can
// extract the Tower info.
sess, err := c.cfg.DB.GetClientSession(
Expand Down
162 changes: 157 additions & 5 deletions watchtower/wtclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lntest/wait"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
Expand Down Expand Up @@ -72,7 +73,7 @@ var (

addrScript, _ = txscript.PayToAddrScript(addr)

waitTime = 5 * time.Second
waitTime = 15 * time.Second

defaultTxPolicy = wtpolicy.TxPolicy{
BlobType: blob.TypeAltruistCommit,
Expand Down Expand Up @@ -398,7 +399,7 @@ type testHarness struct {
cfg harnessCfg
signer *wtmock.MockSigner
capacity lnwire.MilliSatoshi
clientDB *wtmock.ClientDB
clientDB *wtdb.ClientDB
clientCfg *wtclient.Config
client wtclient.Client
server *serverHarness
Expand Down Expand Up @@ -426,10 +427,26 @@ type harnessCfg struct {
noServerStart bool
}

func newClientDB(t *testing.T) *wtdb.ClientDB {
dbCfg := &kvdb.BoltConfig{
DBTimeout: kvdb.DefaultDBTimeout,
}

// Construct the ClientDB.
dir := t.TempDir()
bdb, err := wtdb.NewBoltBackendCreator(true, dir, "wtclient.db")(dbCfg)
require.NoError(t, err)

clientDB, err := wtdb.OpenClientDB(bdb)
require.NoError(t, err)

return clientDB
}

func newHarness(t *testing.T, cfg harnessCfg) *testHarness {
signer := wtmock.NewMockSigner()
mockNet := newMockNet()
clientDB := wtmock.NewClientDB()
clientDB := newClientDB(t)

server := newServerHarness(
t, mockNet, towerAddrStr, func(serverCfg *wtserver.Config) {
Expand Down Expand Up @@ -509,6 +526,7 @@ func newHarness(t *testing.T, cfg harnessCfg) *testHarness {
h.startClient()
t.Cleanup(func() {
require.NoError(t, h.client.Stop())
require.NoError(t, h.clientDB.Close())
})

h.makeChannel(0, h.cfg.localBalance, h.cfg.remoteBalance)
Expand Down Expand Up @@ -1342,7 +1360,7 @@ var clientTests = []clientTest{

// Wait for all the updates to be populated in the
// server's database.
h.server.waitForUpdates(hints, 3*time.Second)
h.server.waitForUpdates(hints, waitTime)
},
},
{
Expand Down Expand Up @@ -2053,7 +2071,7 @@ var clientTests = []clientTest{
// Now stop the client and reset its database.
require.NoError(h.t, h.client.Stop())

db := wtmock.NewClientDB()
db := newClientDB(h.t)
h.clientDB = db
h.clientCfg.DB = db

Expand Down Expand Up @@ -2398,6 +2416,140 @@ var clientTests = []clientTest{
server2.waitForUpdates(hints[numUpdates/2:], waitTime)
},
},
{
// This test shows that if a channel is closed while an update
// for that channel still exists in an in-memory queue
// somewhere then it is handled correctly by treating it as a
// rogue update.
name: "channel closed while update is un-acked",
cfg: harnessCfg{
localBalance: localBalance,
remoteBalance: remoteBalance,
policy: wtpolicy.Policy{
TxPolicy: defaultTxPolicy,
MaxUpdates: 5,
},
},
fn: func(h *testHarness) {
const (
numUpdates = 10
chanIDInt = 0
)

h.sendUpdatesOn = true

// Advance the channel with a few updates.
hints := h.advanceChannelN(chanIDInt, numUpdates)

// Backup a few these updates and wait for them to
// arrive at the server. Note that we back up enough
// updates to saturate the session so that the session
// is considered closable when the channel is deleted.
h.backupStates(chanIDInt, 0, numUpdates/2, nil)
h.server.waitForUpdates(hints[:numUpdates/2], waitTime)

// Now, restart the server in a state where it will not
// ack updates. This will allow us to wait for an
// update to be un-acked and persisted.
h.server.restart(func(cfg *wtserver.Config) {
cfg.NoAckUpdates = true
})

// Backup a few more of the update. These should remain
// in the client as un-acked.
h.backupStates(
chanIDInt, numUpdates/2, numUpdates-1, nil,
)

// Wait for the tasks to be bound to sessions.
fetchSessions := h.clientDB.FetchSessionCommittedUpdates
err := wait.Predicate(func() bool {
sessions, err := h.clientDB.ListClientSessions(
nil,
)
require.NoError(h.t, err)

var updates []wtdb.CommittedUpdate
for id := range sessions {
updates, err = fetchSessions(&id)
require.NoError(h.t, err)

if len(updates) != numUpdates-1 {
return true
}
}

return false
}, waitTime)
require.NoError(h.t, err)

// Now we close this channel while the update for it has
// not yet been acked.
h.closeChannel(chanIDInt, 1)

// Closable sessions should now be one.
err = wait.Predicate(func() bool {
cs, err := h.clientDB.ListClosableSessions()
require.NoError(h.t, err)

return len(cs) == 1
}, waitTime)
require.NoError(h.t, err)

// Now, restart the server and allow it to ack updates
// again.
h.server.restart(func(cfg *wtserver.Config) {
cfg.NoAckUpdates = false
})

// Mine a few blocks so that the session close range is
// surpassed.
h.mine(3)

// Wait for there to be no more closable sessions on the
// client side.
err = wait.Predicate(func() bool {
cs, err := h.clientDB.ListClosableSessions()
require.NoError(h.t, err)

return len(cs) == 0
}, waitTime)
require.NoError(h.t, err)

// Wait for channel to be "unregistered".
chanID := chanIDFromInt(chanIDInt)
err = wait.Predicate(func() bool {
err := h.client.BackupState(&chanID, 0)

return errors.Is(
err, wtclient.ErrUnregisteredChannel,
)
}, waitTime)
require.NoError(h.t, err)

// Show that the committed update for the closed channel
// is cleared from the DB.
err = wait.Predicate(func() bool {
sessions, err := h.clientDB.ListClientSessions(
nil,
)
require.NoError(h.t, err)

var updates []wtdb.CommittedUpdate
for id := range sessions {
updates, err = fetchSessions(&id)
require.NoError(h.t, err)

if len(updates) != 0 {
return false
}
}

return true
}, waitTime)
require.NoError(h.t, err)
},
},
}

// TestClient executes the client test suite, asserting the ability to backup
Expand Down
Loading

0 comments on commit 9f4a883

Please sign in to comment.