diff --git a/docs/release-notes/release-notes-0.19.0.md b/docs/release-notes/release-notes-0.19.0.md index f7d2fe594e..4429f5cf5c 100644 --- a/docs/release-notes/release-notes-0.19.0.md +++ b/docs/release-notes/release-notes-0.19.0.md @@ -67,6 +67,10 @@ signal relay was added. This signal has *no impact* on routing, and is deployed experimentally to assist ongoing channel jamming research. +* Add initial support for [quiescence](https://github.com/lightningnetwork/lnd/pull/8270). + This is a protocol gadget required for Dynamic Commitments and Splicing that + will be added later. + ## Functional Enhancements * [Add ability](https://github.com/lightningnetwork/lnd/pull/8998) to paginate wallet transactions. @@ -216,6 +220,7 @@ The underlying functionality between those two options remain the same. * Elle Mouton * George Tsagkarelis * hieblmi +* Keagan McClelland * Oliver Gugger * Pins * Viktor Tigerström diff --git a/feature/default_sets.go b/feature/default_sets.go index 616abc8ba3..9aee982ca7 100644 --- a/feature/default_sets.go +++ b/feature/default_sets.go @@ -84,6 +84,10 @@ var defaultSetDesc = setDesc{ SetNodeAnn: {}, // N SetInvoice: {}, // 9 }, + lnwire.QuiescenceOptional: { + SetInit: {}, // I + SetNodeAnn: {}, // N + }, lnwire.ShutdownAnySegwitOptional: { SetInit: {}, // I SetNodeAnn: {}, // N diff --git a/feature/manager.go b/feature/manager.go index e0bcfc96bb..9538832163 100644 --- a/feature/manager.go +++ b/feature/manager.go @@ -63,6 +63,9 @@ type Config struct { // NoRouteBlinding unsets route blinding feature bits. NoRouteBlinding bool + // NoQuiescence unsets quiescence feature bits. + NoQuiescence bool + // NoTaprootOverlay unsets the taproot overlay channel feature bits. NoTaprootOverlay bool @@ -199,6 +202,9 @@ func newManager(cfg Config, desc setDesc) (*Manager, error) { raw.Unset(lnwire.Bolt11BlindedPathsOptional) raw.Unset(lnwire.Bolt11BlindedPathsRequired) } + if cfg.NoQuiescence { + raw.Unset(lnwire.QuiescenceOptional) + } if cfg.NoTaprootOverlay { raw.Unset(lnwire.SimpleTaprootOverlayChansOptional) raw.Unset(lnwire.SimpleTaprootOverlayChansRequired) diff --git a/go.mod b/go.mod index 6e2bd9f77d..d0ff97ac7c 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( github.com/lightningnetwork/lightning-onion v1.2.1-0.20240712235311-98bd56499dfb github.com/lightningnetwork/lnd/cert v1.2.2 github.com/lightningnetwork/lnd/clock v1.1.1 - github.com/lightningnetwork/lnd/fn v1.2.3 + github.com/lightningnetwork/lnd/fn v1.2.5 github.com/lightningnetwork/lnd/healthcheck v1.2.6 github.com/lightningnetwork/lnd/kvdb v1.4.11 github.com/lightningnetwork/lnd/queue v1.1.1 diff --git a/go.sum b/go.sum index 86c1c8a21a..2ea42fd8ce 100644 --- a/go.sum +++ b/go.sum @@ -456,8 +456,8 @@ github.com/lightningnetwork/lnd/cert v1.2.2 h1:71YK6hogeJtxSxw2teq3eGeuy4rHGKcFf github.com/lightningnetwork/lnd/cert v1.2.2/go.mod h1:jQmFn/Ez4zhDgq2hnYSw8r35bqGVxViXhX6Cd7HXM6U= github.com/lightningnetwork/lnd/clock v1.1.1 h1:OfR3/zcJd2RhH0RU+zX/77c0ZiOnIMsDIBjgjWdZgA0= github.com/lightningnetwork/lnd/clock v1.1.1/go.mod h1:mGnAhPyjYZQJmebS7aevElXKTFDuO+uNFFfMXK1W8xQ= -github.com/lightningnetwork/lnd/fn v1.2.3 h1:Q1OrgNSgQynVheBNa16CsKVov1JI5N2AR6G07x9Mles= -github.com/lightningnetwork/lnd/fn v1.2.3/go.mod h1:SyFohpVrARPKH3XVAJZlXdVe+IwMYc4OMAvrDY32kw0= +github.com/lightningnetwork/lnd/fn v1.2.5 h1:pGMz0BDUxrhvOtShD4FIysdVy+ulfFAnFvTKjZO5Pp8= +github.com/lightningnetwork/lnd/fn v1.2.5/go.mod h1:SyFohpVrARPKH3XVAJZlXdVe+IwMYc4OMAvrDY32kw0= github.com/lightningnetwork/lnd/healthcheck v1.2.6 h1:1sWhqr93GdkWy4+6U7JxBfcyZIE78MhIHTJZfPx7qqI= github.com/lightningnetwork/lnd/healthcheck v1.2.6/go.mod h1:Mu02um4CWY/zdTOvFje7WJgJcHyX2zq/FG3MhOAiGaQ= github.com/lightningnetwork/lnd/kvdb v1.4.11 h1:fk1HMVFrsVK3xqU7q+JWHRgBltw/a2qIg1E3zazMb/8= diff --git a/htlcswitch/interfaces.go b/htlcswitch/interfaces.go index 3dd70247d2..a3f4ff9a9a 100644 --- a/htlcswitch/interfaces.go +++ b/htlcswitch/interfaces.go @@ -170,6 +170,18 @@ type ChannelUpdateHandler interface { // will only ever be called once. If no CommitSig is owed in the // argument's LinkDirection, then we will call this hook immediately. OnCommitOnce(LinkDirection, func()) + + // InitStfu allows us to initiate quiescence on this link. It returns + // a receive only channel that will block until quiescence has been + // achieved, or definitively fails. The return value is the + // ChannelParty who holds the role of initiator or Err if the operation + // fails. + // + // This operation has been added to allow channels to be quiesced via + // RPC. It may be removed or reworked in the future as RPC initiated + // quiescence is a holdover until we have downstream protocols that use + // it. + InitStfu() <-chan fn.Result[lntypes.ChannelParty] } // CommitHookID is a value that is used to uniquely identify hooks in the diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 344bf77a4a..3eb398c1af 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -283,6 +283,10 @@ type ChannelLinkConfig struct { // invalid. DisallowRouteBlinding bool + // DisallowQuiescence is a flag that can be used to disable the + // quiescence protocol. + DisallowQuiescence bool + // MaxFeeExposure is the threshold in milli-satoshis after which we'll // restrict the flow of HTLCs and fee updates. MaxFeeExposure lnwire.MilliSatoshi @@ -392,6 +396,15 @@ type channelLink struct { // our next CommitSig. incomingCommitHooks hookMap + // quiescer is the state machine that tracks where this channel is with + // respect to the quiescence protocol. + quiescer Quiescer + + // quiescenceReqs is a queue of requests to quiesce this link. The + // members of the queue are send-only channels we should call back with + // the result. + quiescenceReqs chan StfuReq + // ContextGuard is a helper that encapsulates a wait group and quit // channel and allows contexts that either block or cancel on those // depending on the use case. @@ -467,6 +480,29 @@ func NewChannelLink(cfg ChannelLinkConfig, cfg.MaxFeeExposure = DefaultMaxFeeExposure } + var qsm Quiescer + if !cfg.DisallowQuiescence { + qsm = NewQuiescer(QuiescerCfg{ + chanID: lnwire.NewChanIDFromOutPoint( + channel.ChannelPoint(), + ), + channelInitiator: channel.Initiator(), + sendMsg: func(s lnwire.Stfu) error { + return cfg.Peer.SendMessage(false, &s) + }, + timeoutDuration: defaultQuiescenceTimeout, + onTimeout: func() { + cfg.Peer.Disconnect(ErrQuiescenceTimeout) + }, + }) + } else { + qsm = &quiescerNoop{} + } + + quiescenceReqs := make( + chan fn.Req[fn.Unit, fn.Result[lntypes.ChannelParty]], 1, + ) + return &channelLink{ cfg: cfg, channel: channel, @@ -476,6 +512,8 @@ func NewChannelLink(cfg ChannelLinkConfig, flushHooks: newHookMap(), outgoingCommitHooks: newHookMap(), incomingCommitHooks: newHookMap(), + quiescer: qsm, + quiescenceReqs: quiescenceReqs, ContextGuard: fn.NewContextGuard(), } } @@ -628,20 +666,37 @@ func (l *channelLink) WaitForShutdown() { // actively accept requests to forward HTLC's. We're able to forward HTLC's if // we are eligible to update AND the channel isn't currently flushing the // outgoing half of the channel. +// +// NOTE: MUST NOT be called from the main event loop. func (l *channelLink) EligibleToForward() bool { - return l.EligibleToUpdate() && - !l.IsFlushing(Outgoing) + l.RLock() + defer l.RUnlock() + + return l.eligibleToForward() +} + +// eligibleToForward returns a bool indicating if the channel is able to +// actively accept requests to forward HTLC's. We're able to forward HTLC's if +// we are eligible to update AND the channel isn't currently flushing the +// outgoing half of the channel. +// +// NOTE: MUST be called from the main event loop. +func (l *channelLink) eligibleToForward() bool { + return l.eligibleToUpdate() && !l.IsFlushing(Outgoing) } -// EligibleToUpdate returns a bool indicating if the channel is able to update +// eligibleToUpdate returns a bool indicating if the channel is able to update // channel state. We're able to update channel state if we know the remote // party's next revocation point. Otherwise, we can't initiate new channel // state. We also require that the short channel ID not be the all-zero source // ID, meaning that the channel has had its ID finalized. -func (l *channelLink) EligibleToUpdate() bool { +// +// NOTE: MUST be called from the main event loop. +func (l *channelLink) eligibleToUpdate() bool { return l.channel.RemoteNextRevocation() != nil && - l.ShortChanID() != hop.Source && - l.isReestablished() + l.channel.ShortChanID() != hop.Source && + l.isReestablished() && + l.quiescer.CanSendUpdates() } // EnableAdds sets the ChannelUpdateHandler state to allow UpdateAddHtlc's in @@ -705,6 +760,27 @@ func (l *channelLink) OnCommitOnce(direction LinkDirection, hook func()) { } } +// InitStfu allows us to initiate quiescence on this link. It returns a receive +// only channel that will block until quiescence has been achieved, or +// definitively fails. +// +// This operation has been added to allow channels to be quiesced via RPC. It +// may be removed or reworked in the future as RPC initiated quiescence is a +// holdover until we have downstream protocols that use it. +func (l *channelLink) InitStfu() <-chan fn.Result[lntypes.ChannelParty] { + req, out := fn.NewReq[fn.Unit, fn.Result[lntypes.ChannelParty]]( + fn.Unit{}, + ) + + select { + case l.quiescenceReqs <- req: + case <-l.Quit: + req.Resolve(fn.Err[lntypes.ChannelParty](ErrLinkShuttingDown)) + } + + return out +} + // isReestablished returns true if the link has successfully completed the // channel reestablishment dance. func (l *channelLink) isReestablished() bool { @@ -1450,6 +1526,20 @@ func (l *channelLink) htlcManager() { ) } + case qReq := <-l.quiescenceReqs: + l.quiescer.InitStfu(qReq) + + if l.noDanglingUpdates(lntypes.Local) { + err := l.quiescer.SendOwedStfu() + if err != nil { + l.stfuFailf( + "SendOwedStfu: %s", err.Error(), + ) + res := fn.Err[lntypes.ChannelParty](err) + qReq.Resolve(res) + } + } + case <-l.Quit: return } @@ -1584,13 +1674,14 @@ func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) error { return errors.New("not an UpdateAddHTLC packet") } - // If we are flushing the link in the outgoing direction we can't add - // new htlcs to the link and we need to bounce it - if l.IsFlushing(Outgoing) { + // If we are flushing the link in the outgoing direction or we have + // already sent Stfu, then we can't add new htlcs to the link and we + // need to bounce it. + if l.IsFlushing(Outgoing) || !l.quiescer.CanSendUpdates() { l.mailBox.FailAdd(pkt) return NewDetailedLinkError( - &lnwire.FailPermanentChannelFailure{}, + &lnwire.FailTemporaryChannelFailure{}, OutgoingFailureLinkNotEligible, ) } @@ -1693,6 +1784,15 @@ func (l *channelLink) handleDownstreamUpdateAdd(pkt *htlcPacket) error { // // TODO(roasbeef): add sync ntfn to ensure switch always has consistent view? func (l *channelLink) handleDownstreamPkt(pkt *htlcPacket) { + if pkt.htlc.MsgType().IsChannelUpdate() && + !l.quiescer.CanSendUpdates() { + + l.log.Warnf("unable to process channel update. "+ + "ChannelID=%v is quiescent.", l.ChanID) + + return + } + switch htlc := pkt.htlc.(type) { case *lnwire.UpdateAddHTLC: // Handle add message. The returned error can be ignored, @@ -1937,6 +2037,13 @@ func (l *channelLink) cleanupSpuriousResponse(pkt *htlcPacket) { // updates from the upstream peer. The upstream peer is the peer whom we have a // direct channel with, updating our respective commitment chains. func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { + // First check if the message is an update and we are capable of + // receiving updates right now. + if msg.MsgType().IsChannelUpdate() && !l.quiescer.CanRecvUpdates() { + l.stfuFailf("update received after stfu: %T", msg) + return + } + switch msg := msg.(type) { case *lnwire.UpdateAddHTLC: if l.IsFlushing(Incoming) { @@ -2325,6 +2432,15 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { } } + // If we need to send out an Stfu, this would be the time to do + // so. + if l.noDanglingUpdates(lntypes.Local) { + err = l.quiescer.SendOwedStfu() + if err != nil { + l.stfuFailf("sendOwedStfu: %v", err.Error()) + } + } + // Now that we have finished processing the incoming CommitSig // and sent out our RevokeAndAck, we invoke the flushHooks if // the channel state is clean. @@ -2391,8 +2507,22 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { } } + // If we can send updates then we can process adds in case we + // are the exit hop and need to send back resolutions, or in + // case there are validity issues with the packets. Otherwise + // we defer the action until resume. + // + // We are free to process the settles and fails without this + // check since processing those can't result in further updates + // to this channel link. + if l.quiescer.CanSendUpdates() { + l.processRemoteAdds(fwdPkg) + } else { + l.quiescer.OnResume(func() { + l.processRemoteAdds(fwdPkg) + }) + } l.processRemoteSettleFails(fwdPkg) - l.processRemoteAdds(fwdPkg) // If the link failed during processing the adds, we must // return to ensure we won't attempted to update the state @@ -2458,6 +2588,12 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // Update the mailbox's feerate as well. l.mailBox.SetFeeRate(fee) + case *lnwire.Stfu: + err := l.handleStfu(msg) + if err != nil { + l.stfuFailf("handleStfu: %v", err.Error()) + } + // In the case where we receive a warning message from our peer, just // log it and move on. We choose not to disconnect from our peer, // although we "MAY" do so according to the specification. @@ -2490,6 +2626,50 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { } +// handleStfu implements the top-level logic for handling the Stfu message from +// our peer. +func (l *channelLink) handleStfu(stfu *lnwire.Stfu) error { + if !l.noDanglingUpdates(lntypes.Remote) { + return ErrPendingRemoteUpdates + } + err := l.quiescer.RecvStfu(*stfu) + if err != nil { + return err + } + + // If we can immediately send an Stfu response back, we will. + if l.noDanglingUpdates(lntypes.Local) { + return l.quiescer.SendOwedStfu() + } + + return nil +} + +// stfuFailf fails the link in the case where the requirements of the quiescence +// protocol are violated. In all cases we opt to drop the connection as only +// link state (as opposed to channel state) is affected. +func (l *channelLink) stfuFailf(format string, args ...interface{}) { + l.failf(LinkFailureError{ + code: ErrStfuViolation, + FailureAction: LinkFailureDisconnect, + PermanentFailure: false, + Warning: true, + }, format, args...) +} + +// noDanglingUpdates returns true when there are 0 updates that were originally +// issued by whose on either the Local or Remote commitment transaction. +func (l *channelLink) noDanglingUpdates(whose lntypes.ChannelParty) bool { + pendingOnLocal := l.channel.NumPendingUpdates( + whose, lntypes.Local, + ) + pendingOnRemote := l.channel.NumPendingUpdates( + whose, lntypes.Remote, + ) + + return pendingOnLocal == 0 && pendingOnRemote == 0 +} + // ackDownStreamPackets is responsible for removing htlcs from a link's mailbox // for packets delivered from server, and cleaning up any circuits closed by // signing a previous commitment txn. This method ensures that the circuits are @@ -3293,7 +3473,7 @@ func (l *channelLink) updateChannelFee(feePerKw chainfee.SatPerKWeight) error { // We skip sending the UpdateFee message if the channel is not // currently eligible to forward messages. - if !l.EligibleToUpdate() { + if !l.eligibleToUpdate() { l.log.Debugf("skipping fee update for inactive channel") return nil } diff --git a/htlcswitch/link_test.go b/htlcswitch/link_test.go index c72a255384..7259723403 100644 --- a/htlcswitch/link_test.go +++ b/htlcswitch/link_test.go @@ -7540,3 +7540,92 @@ func TestLinkFlushHooksCalled(t *testing.T) { ctx.receiveRevAndAckAliceToBob() assertHookCalled(true) } + +// TestLinkQuiescenceExitHopProcessingDeferred ensures that we do not send back +// htlc resolution messages in the case where the link is quiescent AND we are +// the exit hop. This is needed because we handle exit hop processing in the +// link instead of the switch and we process htlc resolutions when we receive +// a RevokeAndAck. Because of this we need to ensure that we hold off on +// processing the remote adds when we are quiescent. Later, when the channel +// update traffic is allowed to resume, we will need to verify that the actions +// we didn't run during the initial RevokeAndAck are run. +func TestLinkQuiescenceExitHopProcessingDeferred(t *testing.T) { + t.Parallel() + + // Initialize two channel state machines for testing. + alice, bob, err := createMirroredChannel( + t, btcutil.SatoshiPerBitcoin, btcutil.SatoshiPerBitcoin, + ) + require.NoError(t, err) + + // Build a single edge network to test channel quiescence. + network := newTwoHopNetwork( + t, alice.channel, bob.channel, testStartingHeight, + ) + aliceLink := network.aliceChannelLink + bobLink := network.bobChannelLink + + // Generate an invoice for Bob so that Alice can pay him. + htlcID := uint64(0) + htlc, invoice := generateHtlcAndInvoice(t, htlcID) + err = network.bobServer.registry.AddInvoice( + nil, *invoice, htlc.PaymentHash, + ) + require.NoError(t, err) + + // Establish a payment circuit for Alice + circuit := &PaymentCircuit{ + Incoming: CircuitKey{ + HtlcID: htlcID, + }, + PaymentHash: htlc.PaymentHash, + } + circuitMap := network.aliceServer.htlcSwitch.circuits + _, err = circuitMap.CommitCircuits(circuit) + require.NoError(t, err) + + // Add a switch packet to Alice's switch so that she can initialize the + // payment attempt. + err = aliceLink.handleSwitchPacket(&htlcPacket{ + incomingHTLCID: htlcID, + htlc: htlc, + circuit: circuit, + }) + require.NoError(t, err) + + // give alice enough time to fire the update_add + // TODO(proofofkeags): make this not depend on a flakey sleep. + <-time.After(time.Millisecond) + + // bob initiates stfu which he can do immediately since he doesn't have + // local updates + <-bobLink.InitStfu() + + // wait for other possible messages to play out + <-time.After(1 * time.Second) + + ensureNoUpdateAfterStfu := func(t *testing.T, trace []lnwire.Message) { + stfuReceived := false + for _, msg := range trace { + if msg.MsgType() == lnwire.MsgStfu { + stfuReceived = true + continue + } + + if stfuReceived && msg.MsgType().IsChannelUpdate() { + t.Fatalf("channel update after stfu: %v", + msg.MsgType()) + } + } + } + + network.aliceServer.protocolTraceMtx.Lock() + ensureNoUpdateAfterStfu(t, network.aliceServer.protocolTrace) + network.aliceServer.protocolTraceMtx.Unlock() + + network.bobServer.protocolTraceMtx.Lock() + ensureNoUpdateAfterStfu(t, network.bobServer.protocolTrace) + network.bobServer.protocolTraceMtx.Unlock() + + // TODO(proofofkeags): make sure these actions are run on resume. +} diff --git a/htlcswitch/linkfailure.go b/htlcswitch/linkfailure.go index 47f8065f76..495bd46fc2 100644 --- a/htlcswitch/linkfailure.go +++ b/htlcswitch/linkfailure.go @@ -51,6 +51,12 @@ const ( // circuit map. This is non-fatal and will resolve itself (usually // within several minutes). ErrCircuitError + + // ErrStfuViolation indicates that the quiescence protocol has been + // violated, either because Stfu has been sent/received at an invalid + // time, or that an update has been sent/received while the channel is + // quiesced. + ErrStfuViolation ) // LinkFailureAction is an enum-like type that describes the action that should @@ -122,6 +128,8 @@ func (e LinkFailureError) Error() string { return "unable to resume channel, recovery required" case ErrCircuitError: return "non-fatal circuit map error" + case ErrStfuViolation: + return "quiescence protocol executed improperly" default: return "unknown error" } diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index 6de60b38a1..0a3364ae27 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -153,8 +153,10 @@ type mockServer struct { t testing.TB - name string - messages chan lnwire.Message + name string + messages chan lnwire.Message + protocolTraceMtx sync.Mutex + protocolTrace []lnwire.Message id [33]byte htlcSwitch *Switch @@ -289,6 +291,10 @@ func (s *mockServer) Start() error { for { select { case msg := <-s.messages: + s.protocolTraceMtx.Lock() + s.protocolTrace = append(s.protocolTrace, msg) + s.protocolTraceMtx.Unlock() + var shouldSkip bool for _, interceptor := range s.interceptorFuncs { @@ -627,6 +633,8 @@ func (s *mockServer) readHandler(message lnwire.Message) error { targetChan = msg.ChanID case *lnwire.UpdateFee: targetChan = msg.ChanID + case *lnwire.Stfu: + targetChan = msg.ChanID default: return fmt.Errorf("unknown message type: %T", msg) } @@ -950,6 +958,14 @@ func (f *mockChannelLink) OnFlushedOnce(func()) { func (f *mockChannelLink) OnCommitOnce(LinkDirection, func()) { // TODO(proofofkeags): Implement } +func (f *mockChannelLink) InitStfu() <-chan fn.Result[lntypes.ChannelParty] { + // TODO(proofofkeags): Implement + c := make(chan fn.Result[lntypes.ChannelParty], 1) + + c <- fn.Errf[lntypes.ChannelParty]("InitStfu not implemented") + + return c +} func (f *mockChannelLink) FundingCustomBlob() fn.Option[tlv.Blob] { return fn.None[tlv.Blob]() diff --git a/htlcswitch/quiescer.go b/htlcswitch/quiescer.go new file mode 100644 index 0000000000..5a76221576 --- /dev/null +++ b/htlcswitch/quiescer.go @@ -0,0 +1,575 @@ +package htlcswitch + +import ( + "fmt" + "sync" + "time" + + "github.com/btcsuite/btclog/v2" + "github.com/lightningnetwork/lnd/build" + "github.com/lightningnetwork/lnd/fn" + "github.com/lightningnetwork/lnd/lntypes" + "github.com/lightningnetwork/lnd/lnwire" +) + +var ( + // ErrInvalidStfu indicates that the Stfu we have received is invalid. + // This can happen in instances where we have not sent Stfu but we have + // received one with the initiator field set to false. + ErrInvalidStfu = fmt.Errorf("stfu received is invalid") + + // ErrStfuAlreadySent indicates that this channel has already sent an + // Stfu message for this negotiation. + ErrStfuAlreadySent = fmt.Errorf("stfu already sent") + + // ErrStfuAlreadyRcvd indicates that this channel has already received + // an Stfu message for this negotiation. + ErrStfuAlreadyRcvd = fmt.Errorf("stfu already received") + + // ErrNoQuiescenceInitiator indicates that the caller has requested the + // quiescence initiator for a channel that is not yet quiescent. + ErrNoQuiescenceInitiator = fmt.Errorf( + "indeterminate quiescence initiator: channel is not quiescent", + ) + + // ErrPendingRemoteUpdates indicates that we have received an Stfu while + // the remote party has issued updates that are not yet bilaterally + // committed. + ErrPendingRemoteUpdates = fmt.Errorf( + "stfu received with pending remote updates", + ) + + // ErrPendingLocalUpdates indicates that we are attempting to send an + // Stfu while we have issued updates that are not yet bilaterally + // committed. + ErrPendingLocalUpdates = fmt.Errorf( + "stfu send attempted with pending local updates", + ) + + // ErrQuiescenceTimeout indicates that the quiescer has been quiesced + // beyond the allotted time. + ErrQuiescenceTimeout = fmt.Errorf( + "quiescence timeout", + ) +) + +const defaultQuiescenceTimeout = 30 * time.Second + +type StfuReq = fn.Req[fn.Unit, fn.Result[lntypes.ChannelParty]] + +// Quiescer is the public interface of the quiescence mechanism. Callers of the +// quiescence API should not need any methods besides the ones detailed here. +type Quiescer interface { + // IsQuiescent returns true if the state machine has been driven all the + // way to completion. If this returns true, processes that depend on + // channel quiescence may proceed. + IsQuiescent() bool + + // QuiescenceInitiator determines which ChannelParty is the initiator of + // quiescence for the purposes of downstream protocols. If the channel + // is not currently quiescent, this method will return + // ErrNoDownstreamLeader. + QuiescenceInitiator() fn.Result[lntypes.ChannelParty] + + // InitStfu instructs the quiescer that we intend to begin a quiescence + // negotiation where we are the initiator. We don't yet send stfu yet + // because we need to wait for the link to give us a valid opportunity + // to do so. + InitStfu(req StfuReq) + + // RecvStfu is called when we receive an Stfu message from the remote. + RecvStfu(stfu lnwire.Stfu) error + + // CanRecvUpdates returns true if we haven't yet received an Stfu which + // would mark the end of the remote's ability to send updates. + CanRecvUpdates() bool + + // CanSendUpdates returns true if we haven't yet sent an Stfu which + // would mark the end of our ability to send updates. + CanSendUpdates() bool + + // SendOwedStfu sends Stfu if it owes one. It returns an error if the + // state machine is in an invalid state. + SendOwedStfu() error + + // OnResume accepts a no return closure that will run when the quiescer + // is resumed. + OnResume(hook func()) + + // Resume runs all of the deferred actions that have accumulated while + // the channel has been quiescent and then resets the quiescer state to + // its initial state. + Resume() +} + +// QuiescerCfg is a config structure used to initialize a quiescer giving it the +// appropriate functionality to interact with the channel state that the +// quiescer must syncrhonize with. +type QuiescerCfg struct { + // chanID marks what channel we are managing the state machine for. This + // is important because the quiescer needs to know the ChannelID to + // construct the Stfu message. + chanID lnwire.ChannelID + + // channelInitiator indicates which ChannelParty originally opened the + // channel. This is used to break ties when both sides of the channel + // send Stfu claiming to be the initiator. + channelInitiator lntypes.ChannelParty + + // sendMsg is a function that can be used to send an Stfu message over + // the wire. + sendMsg func(lnwire.Stfu) error + + // timeoutDuration is the Duration that we will wait from the moment the + // channel is considered quiescent before we call the onTimeout function + timeoutDuration time.Duration + + // onTimeout is a function that will be called in the event that the + // Quiescer has not been resumed before the timeout is reached. If + // Quiescer.Resume is called before the timeout has been raeached, then + // onTimeout will not be called until the quiescer reaches a quiescent + // state again. + onTimeout func() +} + +// QuiescerLive is a state machine that tracks progression through the +// quiescence protocol. +type QuiescerLive struct { + cfg QuiescerCfg + + // log is a quiescer-scoped logging instance. + log btclog.Logger + + // localInit indicates whether our path through this state machine was + // initiated by our node. This can be true or false independently of + // remoteInit. + localInit bool + + // remoteInit indicates whether we received Stfu from our peer where the + // message indicated that the remote node believes it was the initiator. + // This can be true or false independently of localInit. + remoteInit bool + + // sent tracks whether or not we have emitted Stfu for sending. + sent bool + + // received tracks whether or not we have received Stfu from our peer. + received bool + + // activeQuiescenceRequest is a possibly None Request that we should + // resolve when we complete quiescence. + activeQuiescenceReq fn.Option[StfuReq] + + // resumeQueue is a slice of hooks that will be called when the quiescer + // is resumed. These are actions that needed to be deferred while the + // channel was quiescent. + resumeQueue []func() + + // timeoutTimer is a field that is used to hold onto the timeout job + // when we reach quiescence. + timeoutTimer *time.Timer + + sync.RWMutex +} + +// NewQuiescer creates a new quiescer for the given channel. +func NewQuiescer(cfg QuiescerCfg) Quiescer { + logPrefix := fmt.Sprintf("Quiescer(%v):", cfg.chanID) + + return &QuiescerLive{ + cfg: cfg, + log: build.NewPrefixLog(logPrefix, log), + } +} + +// RecvStfu is called when we receive an Stfu message from the remote. +func (q *QuiescerLive) RecvStfu(msg lnwire.Stfu) error { + q.Lock() + defer q.Unlock() + + return q.recvStfu(msg) +} + +// recvStfu is called when we receive an Stfu message from the remote. +func (q *QuiescerLive) recvStfu(msg lnwire.Stfu) error { + // At the time of this writing, this check that we have already received + // an Stfu is not strictly necessary, according to the specification. + // However, it is fishy if we do and it is unclear how we should handle + // such a case so we will err on the side of caution. + if q.received { + return fmt.Errorf("%w for channel %v", ErrStfuAlreadyRcvd, + q.cfg.chanID) + } + + // We need to check that the Stfu we are receiving is valid. + if !q.sent && !msg.Initiator { + return fmt.Errorf("%w for channel %v", ErrInvalidStfu, + q.cfg.chanID) + } + + if !q.canRecvStfu() { + return fmt.Errorf("%w for channel %v", ErrPendingRemoteUpdates, + q.cfg.chanID) + } + + q.received = true + + // If the remote party sets the initiator bit to true then we will + // remember that they are making a claim to the initiator role. This + // does not necessarily mean they will get it, though. + q.remoteInit = msg.Initiator + + // Since we just received an Stfu, we may have a newly quiesced state. + // If so, we will try to resolve any outstanding StfuReqs. + q.tryResolveStfuReq() + + if q.isQuiescent() { + q.startTimeout() + } + + return nil +} + +// MakeStfu is called when we are ready to send an Stfu message. It returns the +// Stfu message to be sent. +func (q *QuiescerLive) MakeStfu() fn.Result[lnwire.Stfu] { + q.RLock() + defer q.RUnlock() + + return q.makeStfu() +} + +// makeStfu is called when we are ready to send an Stfu message. It returns the +// Stfu message to be sent. +func (q *QuiescerLive) makeStfu() fn.Result[lnwire.Stfu] { + if q.sent { + return fn.Errf[lnwire.Stfu]("%w for channel %v", + ErrStfuAlreadySent, q.cfg.chanID) + } + + if !q.canSendStfu() { + return fn.Errf[lnwire.Stfu]("%w for channel %v", + ErrPendingLocalUpdates, q.cfg.chanID) + } + + stfu := lnwire.Stfu{ + ChanID: q.cfg.chanID, + Initiator: q.localInit, + } + + return fn.Ok(stfu) +} + +// OweStfu returns true if we owe the other party an Stfu. We owe the remote an +// Stfu when we have received but not yet sent an Stfu, or we are the initiator +// but have not yet sent an Stfu. +func (q *QuiescerLive) OweStfu() bool { + q.RLock() + defer q.RUnlock() + + return q.oweStfu() +} + +// oweStfu returns true if we owe the other party an Stfu. We owe the remote an +// Stfu when we have received but not yet sent an Stfu, or we are the initiator +// but have not yet sent an Stfu. +func (q *QuiescerLive) oweStfu() bool { + return (q.received || q.localInit) && !q.sent +} + +// NeedStfu returns true if the remote owes us an Stfu. They owe us an Stfu when +// we have sent but not yet received an Stfu. +func (q *QuiescerLive) NeedStfu() bool { + q.RLock() + defer q.RUnlock() + + return q.needStfu() +} + +// needStfu returns true if the remote owes us an Stfu. They owe us an Stfu when +// we have sent but not yet received an Stfu. +func (q *QuiescerLive) needStfu() bool { + q.RLock() + defer q.RUnlock() + + return q.sent && !q.received +} + +// IsQuiescent returns true if the state machine has been driven all the way to +// completion. If this returns true, processes that depend on channel quiescence +// may proceed. +func (q *QuiescerLive) IsQuiescent() bool { + q.RLock() + defer q.RUnlock() + + return q.isQuiescent() +} + +// isQuiescent returns true if the state machine has been driven all the way to +// completion. If this returns true, processes that depend on channel quiescence +// may proceed. +func (q *QuiescerLive) isQuiescent() bool { + return q.sent && q.received +} + +// QuiescenceInitiator determines which ChannelParty is the initiator of +// quiescence for the purposes of downstream protocols. If the channel is not +// currently quiescent, this method will return ErrNoQuiescenceInitiator. +func (q *QuiescerLive) QuiescenceInitiator() fn.Result[lntypes.ChannelParty] { + q.RLock() + defer q.RUnlock() + + return q.quiescenceInitiator() +} + +// quiescenceInitiator determines which ChannelParty is the initiator of +// quiescence for the purposes of downstream protocols. If the channel is not +// currently quiescent, this method will return ErrNoQuiescenceInitiator. +func (q *QuiescerLive) quiescenceInitiator() fn.Result[lntypes.ChannelParty] { + switch { + case !q.isQuiescent(): + return fn.Err[lntypes.ChannelParty](ErrNoQuiescenceInitiator) + + case q.localInit && q.remoteInit: + // In the case of a tie, the channel initiator wins. + return fn.Ok(q.cfg.channelInitiator) + + case q.localInit: + return fn.Ok(lntypes.Local) + + case q.remoteInit: + return fn.Ok(lntypes.Remote) + } + + // unreachable + return fn.Err[lntypes.ChannelParty](ErrNoQuiescenceInitiator) +} + +// CanSendUpdates returns true if we haven't yet sent an Stfu which would mark +// the end of our ability to send updates. +func (q *QuiescerLive) CanSendUpdates() bool { + q.RLock() + defer q.RUnlock() + + return q.canSendUpdates() +} + +// canSendUpdates returns true if we haven't yet sent an Stfu which would mark +// the end of our ability to send updates. +func (q *QuiescerLive) canSendUpdates() bool { + return !q.sent && !q.localInit +} + +// CanRecvUpdates returns true if we haven't yet received an Stfu which would +// mark the end of the remote's ability to send updates. +func (q *QuiescerLive) CanRecvUpdates() bool { + q.RLock() + defer q.RUnlock() + + return q.canRecvUpdates() +} + +// canRecvUpdates returns true if we haven't yet received an Stfu which would +// mark the end of the remote's ability to send updates. +func (q *QuiescerLive) canRecvUpdates() bool { + return !q.received +} + +// CanSendStfu returns true if we can send an Stfu. +func (q *QuiescerLive) CanSendStfu(numPendingLocalUpdates uint64) bool { + q.RLock() + defer q.RUnlock() + + return q.canSendStfu() +} + +// canSendStfu returns true if we can send an Stfu. +func (q *QuiescerLive) canSendStfu() bool { + return !q.sent +} + +// CanRecvStfu returns true if we can receive an Stfu. +func (q *QuiescerLive) CanRecvStfu() bool { + q.RLock() + defer q.RUnlock() + + return q.canRecvStfu() +} + +// canRecvStfu returns true if we can receive an Stfu. +func (q *QuiescerLive) canRecvStfu() bool { + return !q.received +} + +// SendOwedStfu sends Stfu if it owes one. It returns an error if the state +// machine is in an invalid state. +func (q *QuiescerLive) SendOwedStfu() error { + q.Lock() + defer q.Unlock() + + return q.sendOwedStfu() +} + +// sendOwedStfu sends Stfu if it owes one. It returns an error if the state +// machine is in an invalid state. +func (q *QuiescerLive) sendOwedStfu() error { + if !q.oweStfu() || !q.canSendStfu() { + return nil + } + + err := q.makeStfu().Sink(q.cfg.sendMsg) + + if err == nil { + q.sent = true + + // Since we just sent an Stfu, we may have a newly quiesced + // state. If so, we will try to resolve any outstanding + // StfuReqs. + q.tryResolveStfuReq() + + if q.isQuiescent() { + q.startTimeout() + } + } + + return err +} + +// TryResolveStfuReq attempts to resolve the active quiescence request if the +// state machine has reached a quiescent state. +func (q *QuiescerLive) TryResolveStfuReq() { + q.Lock() + defer q.Unlock() + + q.tryResolveStfuReq() +} + +// tryResolveStfuReq attempts to resolve the active quiescence request if the +// state machine has reached a quiescent state. +func (q *QuiescerLive) tryResolveStfuReq() { + q.activeQuiescenceReq.WhenSome( + func(req StfuReq) { + if q.isQuiescent() { + req.Resolve(q.quiescenceInitiator()) + q.activeQuiescenceReq = fn.None[StfuReq]() + } + }, + ) +} + +// InitStfu instructs the quiescer that we intend to begin a quiescence +// negotiation where we are the initiator. We don't yet send stfu yet because +// we need to wait for the link to give us a valid opportunity to do so. +func (q *QuiescerLive) InitStfu(req StfuReq) { + q.Lock() + defer q.Unlock() + + q.initStfu(req) +} + +// initStfu instructs the quiescer that we intend to begin a quiescence +// negotiation where we are the initiator. We don't yet send stfu yet because +// we need to wait for the link to give us a valid opportunity to do so. +func (q *QuiescerLive) initStfu(req StfuReq) { + if q.localInit { + req.Resolve(fn.Errf[lntypes.ChannelParty]( + "quiescence already requested", + )) + + return + } + + q.localInit = true + q.activeQuiescenceReq = fn.Some(req) +} + +// OnResume accepts a no return closure that will run when the quiescer is +// resumed. +func (q *QuiescerLive) OnResume(hook func()) { + q.Lock() + defer q.Unlock() + + q.onResume(hook) +} + +// onResume accepts a no return closure that will run when the quiescer is +// resumed. +func (q *QuiescerLive) onResume(hook func()) { + q.resumeQueue = append(q.resumeQueue, hook) +} + +// Resume runs all of the deferred actions that have accumulated while the +// channel has been quiescent and then resets the quiescer state to its initial +// state. +func (q *QuiescerLive) Resume() { + q.Lock() + defer q.Unlock() + + q.resume() +} + +// resume runs all of the deferred actions that have accumulated while the +// channel has been quiescent and then resets the quiescer state to its initial +// state. +func (q *QuiescerLive) resume() { + q.log.Debug("quiescence terminated, resuming htlc traffic") + + // since we are resuming we want to cancel the quiescence timeout + // action. + q.cancelTimeout() + + for _, hook := range q.resumeQueue { + hook() + } + q.localInit = false + q.remoteInit = false + q.sent = false + q.received = false + q.resumeQueue = nil +} + +// startTimeout starts the timeout function that fires if the quiescer remains +// in a quiesced state for too long. If this function is called multiple times +// only the last one will have an effect. +func (q *QuiescerLive) startTimeout() { + if q.cfg.onTimeout == nil { + return + } + + old := q.timeoutTimer + + q.timeoutTimer = time.AfterFunc(q.cfg.timeoutDuration, q.cfg.onTimeout) + + if old != nil { + old.Stop() + } +} + +// cancelTimeout cancels the timeout function that would otherwise fire if the +// quiescer remains in a quiesced state too long. If this function is called +// before startTimeout or after another call to cancelTimeout, the effect will +// be a noop. +func (q *QuiescerLive) cancelTimeout() { + if q.timeoutTimer != nil { + q.timeoutTimer.Stop() + q.timeoutTimer = nil + } +} + +type quiescerNoop struct{} + +var _ Quiescer = (*quiescerNoop)(nil) + +func (q *quiescerNoop) InitStfu(req StfuReq) { + req.Resolve(fn.Errf[lntypes.ChannelParty]("quiescence not supported")) +} +func (q *quiescerNoop) RecvStfu(_ lnwire.Stfu) error { return nil } +func (q *quiescerNoop) CanRecvUpdates() bool { return true } +func (q *quiescerNoop) CanSendUpdates() bool { return true } +func (q *quiescerNoop) SendOwedStfu() error { return nil } +func (q *quiescerNoop) IsQuiescent() bool { return false } +func (q *quiescerNoop) OnResume(hook func()) { hook() } +func (q *quiescerNoop) Resume() {} +func (q *quiescerNoop) QuiescenceInitiator() fn.Result[lntypes.ChannelParty] { + return fn.Err[lntypes.ChannelParty](ErrNoQuiescenceInitiator) +} diff --git a/htlcswitch/quiescer_test.go b/htlcswitch/quiescer_test.go new file mode 100644 index 0000000000..da08909d57 --- /dev/null +++ b/htlcswitch/quiescer_test.go @@ -0,0 +1,374 @@ +package htlcswitch + +import ( + "bytes" + "testing" + "time" + + "github.com/lightningnetwork/lnd/fn" + "github.com/lightningnetwork/lnd/lntypes" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/stretchr/testify/require" +) + +var cid = lnwire.ChannelID(bytes.Repeat([]byte{0x00}, 32)) + +type quiescerTestHarness struct { + quiescer *QuiescerLive + conn <-chan lnwire.Stfu +} + +func initQuiescerTestHarness( + channelInitiator lntypes.ChannelParty) *quiescerTestHarness { + + conn := make(chan lnwire.Stfu, 1) + harness := &quiescerTestHarness{ + conn: conn, + } + + quiescer, _ := NewQuiescer(QuiescerCfg{ + chanID: cid, + channelInitiator: channelInitiator, + sendMsg: func(msg lnwire.Stfu) error { + conn <- msg + return nil + }, + }).(*QuiescerLive) + + harness.quiescer = quiescer + + return harness +} + +// TestQuiescerDoubleRecvInvalid ensures that we get an error response when we +// receive the Stfu message twice during the lifecycle of the quiescer. +func TestQuiescerDoubleRecvInvalid(t *testing.T) { + t.Parallel() + + harness := initQuiescerTestHarness(lntypes.Local) + + msg := lnwire.Stfu{ + ChanID: cid, + Initiator: true, + } + + err := harness.quiescer.RecvStfu(msg) + require.NoError(t, err) + err = harness.quiescer.RecvStfu(msg) + require.Error(t, err, ErrStfuAlreadyRcvd) +} + +// TestQuiescenceRemoteInit ensures that we can successfully traverse the state +// graph of quiescence beginning with the Remote party initiating quiescence. +func TestQuiescenceRemoteInit(t *testing.T) { + t.Parallel() + + harness := initQuiescerTestHarness(lntypes.Local) + + msg := lnwire.Stfu{ + ChanID: cid, + Initiator: true, + } + + err := harness.quiescer.RecvStfu(msg) + require.NoError(t, err) + + err = harness.quiescer.SendOwedStfu() + require.NoError(t, err) + + select { + case msg := <-harness.conn: + require.False(t, msg.Initiator) + default: + t.Fatalf("stfu not sent when expected") + } +} + +func TestQuiescenceLocalInit(t *testing.T) { + t.Parallel() + + harness := initQuiescerTestHarness(lntypes.Local) + + msg := lnwire.Stfu{ + ChanID: cid, + Initiator: true, + } + + stfuReq, stfuRes := fn.NewReq[fn.Unit, fn.Result[lntypes.ChannelParty]]( + fn.Unit{}, + ) + harness.quiescer.InitStfu(stfuReq) + + err := harness.quiescer.SendOwedStfu() + require.NoError(t, err) + + select { + case msg := <-harness.conn: + require.True(t, msg.Initiator) + default: + t.Fatalf("stfu not sent when expected") + } + + err = harness.quiescer.RecvStfu(msg) + require.NoError(t, err) + + select { + case party := <-stfuRes: + require.Equal(t, fn.Ok(lntypes.Local), party) + default: + t.Fatalf("quiescence request not resolved") + } +} + +// TestQuiescenceInitiator ensures that the quiescenceInitiator is the Remote +// party when we have a receive first traversal of the quiescer's state graph. +func TestQuiescenceInitiator(t *testing.T) { + t.Parallel() + + // Remote Initiated + harness := initQuiescerTestHarness(lntypes.Local) + require.True(t, harness.quiescer.QuiescenceInitiator().IsErr()) + + // Receive + msg := lnwire.Stfu{ + ChanID: cid, + Initiator: true, + } + require.NoError(t, harness.quiescer.RecvStfu(msg)) + require.True(t, harness.quiescer.QuiescenceInitiator().IsErr()) + + // Send + require.NoError(t, harness.quiescer.SendOwedStfu()) + require.Equal( + t, harness.quiescer.QuiescenceInitiator(), + fn.Ok(lntypes.Remote), + ) + + // Local Initiated + harness = initQuiescerTestHarness(lntypes.Local) + require.True(t, harness.quiescer.quiescenceInitiator().IsErr()) + + req, res := fn.NewReq[fn.Unit, fn.Result[lntypes.ChannelParty]]( + fn.Unit{}, + ) + harness.quiescer.initStfu(req) + req2, res2 := fn.NewReq[fn.Unit, fn.Result[lntypes.ChannelParty]]( + fn.Unit{}, + ) + harness.quiescer.initStfu(req2) + select { + case initiator := <-res2: + require.True(t, initiator.IsErr()) + default: + t.Fatal("quiescence request not resolved") + } + + require.NoError( + t, harness.quiescer.sendOwedStfu(), + ) + require.True(t, harness.quiescer.quiescenceInitiator().IsErr()) + + msg = lnwire.Stfu{ + ChanID: cid, + Initiator: false, + } + require.NoError(t, harness.quiescer.recvStfu(msg)) + require.True(t, harness.quiescer.quiescenceInitiator().IsOk()) + + select { + case initiator := <-res: + require.Equal(t, fn.Ok(lntypes.Local), initiator) + default: + t.Fatal("quiescence request not resolved") + } +} + +// TestQuiescenceCantReceiveUpdatesAfterStfu tests that we can receive channel +// updates prior to but not after we receive Stfu. +func TestQuiescenceCantReceiveUpdatesAfterStfu(t *testing.T) { + t.Parallel() + + harness := initQuiescerTestHarness(lntypes.Local) + require.True(t, harness.quiescer.CanRecvUpdates()) + + msg := lnwire.Stfu{ + ChanID: cid, + Initiator: true, + } + require.NoError(t, harness.quiescer.RecvStfu(msg)) + require.False(t, harness.quiescer.CanRecvUpdates()) +} + +// TestQuiescenceCantSendUpdatesAfterStfu tests that we can send channel updates +// prior to but not after we send Stfu. +func TestQuiescenceCantSendUpdatesAfterStfu(t *testing.T) { + t.Parallel() + + harness := initQuiescerTestHarness(lntypes.Local) + require.True(t, harness.quiescer.CanSendUpdates()) + + msg := lnwire.Stfu{ + ChanID: cid, + Initiator: true, + } + + err := harness.quiescer.RecvStfu(msg) + require.NoError(t, err) + + err = harness.quiescer.SendOwedStfu() + require.NoError(t, err) + + require.False(t, harness.quiescer.CanSendUpdates()) +} + +// TestQuiescenceStfuNotNeededAfterRecv tests that after we receive an Stfu we +// do not needStfu either before or after receiving it if we do not initiate +// quiescence. +func TestQuiescenceStfuNotNeededAfterRecv(t *testing.T) { + t.Parallel() + + harness := initQuiescerTestHarness(lntypes.Local) + + msg := lnwire.Stfu{ + ChanID: cid, + Initiator: true, + } + require.False(t, harness.quiescer.NeedStfu()) + + require.NoError(t, harness.quiescer.RecvStfu(msg)) + + require.False(t, harness.quiescer.NeedStfu()) +} + +// TestQuiescenceInappropriateMakeStfuReturnsErr ensures that we cannot call +// makeStfu at times when it would be a protocol violation to send it. +func TestQuiescenceInappropriateMakeStfuReturnsErr(t *testing.T) { + t.Parallel() + + harness := initQuiescerTestHarness(lntypes.Local) + + msg := lnwire.Stfu{ + ChanID: cid, + Initiator: true, + } + require.NoError(t, harness.quiescer.RecvStfu(msg)) + require.True(t, harness.quiescer.MakeStfu().IsOk()) + + require.NoError(t, harness.quiescer.SendOwedStfu()) + require.True(t, harness.quiescer.MakeStfu().IsErr()) +} + +// TestQuiescerTieBreaker ensures that if both parties attempt to claim the +// initiator role that the result of the negotiation breaks the tie using the +// channel initiator. +func TestQuiescerTieBreaker(t *testing.T) { + t.Parallel() + + for _, initiator := range []lntypes.ChannelParty{ + lntypes.Local, lntypes.Remote, + } { + harness := initQuiescerTestHarness(initiator) + + msg := lnwire.Stfu{ + ChanID: cid, + Initiator: true, + } + + req, res := fn.NewReq[fn.Unit, fn.Result[lntypes.ChannelParty]]( + fn.Unit{}, + ) + + harness.quiescer.InitStfu(req) + require.NoError(t, harness.quiescer.RecvStfu(msg)) + require.NoError(t, harness.quiescer.SendOwedStfu()) + + select { + case party := <-res: + require.Equal(t, fn.Ok(initiator), party) + default: + t.Fatal("quiescence party unavailable") + } + } +} + +// TestQuiescerResume ensures that the hooks that are attached to the quiescer +// are called when we call the resume method and no earlier. +func TestQuiescerResume(t *testing.T) { + t.Parallel() + + harness := initQuiescerTestHarness(lntypes.Local) + + msg := lnwire.Stfu{ + ChanID: cid, + Initiator: true, + } + + require.NoError(t, harness.quiescer.RecvStfu(msg)) + require.NoError(t, harness.quiescer.SendOwedStfu()) + + require.True(t, harness.quiescer.IsQuiescent()) + var resumeHooksCalled = false + harness.quiescer.OnResume(func() { + resumeHooksCalled = true + }) + require.False(t, resumeHooksCalled) + + harness.quiescer.Resume() + require.True(t, resumeHooksCalled) + require.False(t, harness.quiescer.IsQuiescent()) +} + +func TestQuiescerTimeoutTriggers(t *testing.T) { + t.Parallel() + + harness := initQuiescerTestHarness(lntypes.Local) + + msg := lnwire.Stfu{ + ChanID: cid, + Initiator: true, + } + + timeoutGate := make(chan struct{}) + + harness.quiescer.cfg.timeoutDuration = time.Second + harness.quiescer.cfg.onTimeout = func() { close(timeoutGate) } + + err := harness.quiescer.RecvStfu(msg) + require.NoError(t, err) + err = harness.quiescer.SendOwedStfu() + require.NoError(t, err) + + select { + case <-timeoutGate: + case <-time.After(2 * harness.quiescer.cfg.timeoutDuration): + t.Fatal("quiescence timeout did not trigger") + } +} + +func TestQuiescerTimeoutAborts(t *testing.T) { + t.Parallel() + + harness := initQuiescerTestHarness(lntypes.Local) + + msg := lnwire.Stfu{ + ChanID: cid, + Initiator: true, + } + + timeoutGate := make(chan struct{}) + + harness.quiescer.cfg.timeoutDuration = time.Second + harness.quiescer.cfg.onTimeout = func() { close(timeoutGate) } + + err := harness.quiescer.RecvStfu(msg) + require.NoError(t, err) + err = harness.quiescer.SendOwedStfu() + require.NoError(t, err) + harness.quiescer.Resume() + + select { + case <-timeoutGate: + t.Fatal("quiescence timeout triggered despite being resumed") + case <-time.After(2 * harness.quiescer.cfg.timeoutDuration): + } +} diff --git a/itest/list_on_test.go b/itest/list_on_test.go index 7560fb16be..fe6c373855 100644 --- a/itest/list_on_test.go +++ b/itest/list_on_test.go @@ -710,4 +710,8 @@ var allTestCases = []*lntest.TestCase{ Name: "experimental endorsement", TestFunc: testExperimentalEndorsement, }, + { + Name: "quiescence", + TestFunc: testQuiescence, + }, } diff --git a/itest/lnd_quiescence_test.go b/itest/lnd_quiescence_test.go new file mode 100644 index 0000000000..7c2c274a21 --- /dev/null +++ b/itest/lnd_quiescence_test.go @@ -0,0 +1,48 @@ +package itest + +import ( + "github.com/btcsuite/btcd/btcutil" + "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/lnrpc/devrpc" + "github.com/lightningnetwork/lnd/lnrpc/routerrpc" + "github.com/lightningnetwork/lnd/lntest" + "github.com/stretchr/testify/require" +) + +// testQuiescence tests whether we can come to agreement on quiescence of a +// channel. We initiate quiescence via RPC and if it succeeds we verify that +// the expected initiator is the resulting initiator. +// +// NOTE FOR REVIEW: this could be improved by blasting the channel with HTLC +// traffic on both sides to increase the surface area of the change under test. +func testQuiescence(ht *lntest.HarnessTest) { + alice, bob := ht.Alice, ht.Bob + + chanPoint := ht.OpenChannel(bob, alice, lntest.OpenChannelParams{ + Amt: btcutil.Amount(1000000), + }) + defer ht.CloseChannel(bob, chanPoint) + + res := alice.RPC.Quiesce(&devrpc.QuiescenceRequest{ + ChanId: chanPoint, + }) + + require.True(ht, res.Initiator) + + req := &routerrpc.SendPaymentRequest{ + Dest: ht.Alice.PubKey[:], + Amt: 100, + PaymentHash: ht.Random32Bytes(), + FinalCltvDelta: finalCltvDelta, + TimeoutSeconds: 60, + FeeLimitMsat: noFeeLimitMsat, + } + + ht.SendPaymentAssertFail( + ht.Bob, req, + // This fails with insufficient balance because the bandwidth + // manager reports 0 bandwidth if a link is not eligible for + // forwarding, which is the case during quiescence. + lnrpc.PaymentFailureReason_FAILURE_REASON_INSUFFICIENT_BALANCE, + ) +} diff --git a/lncfg/protocol.go b/lncfg/protocol.go index 80809f49d6..5852d032e0 100644 --- a/lncfg/protocol.go +++ b/lncfg/protocol.go @@ -141,6 +141,11 @@ func (l *ProtocolOptions) NoExperimentalEndorsement() bool { return l.NoExperimentalEndorsementOption } +// NoQuiescence returns true if quiescence is disabled. +func (l *ProtocolOptions) NoQuiescence() bool { + return true +} + // CustomMessageOverrides returns the set of protocol messages that we override // to allow custom handling. func (p ProtocolOptions) CustomMessageOverrides() []uint16 { diff --git a/lncfg/protocol_integration.go b/lncfg/protocol_integration.go index 52cc658c3b..e9f32d9dfb 100644 --- a/lncfg/protocol_integration.go +++ b/lncfg/protocol_integration.go @@ -73,6 +73,9 @@ type ProtocolOptions struct { // NoExperimentalEndorsementOption disables experimental endorsement. NoExperimentalEndorsementOption bool `long:"no-experimental-endorsement" description:"do not forward experimental endorsement signals"` + // NoQuiescenceOption disables quiescence for all channels. + NoQuiescenceOption bool `long:"no-quiescence" description:"do not allow or advertise quiescence for any channel"` + // CustomMessage allows the custom message APIs to handle messages with // the provided protocol numbers, which fall outside the custom message // number range. @@ -136,6 +139,11 @@ func (l *ProtocolOptions) NoExperimentalEndorsement() bool { return l.NoExperimentalEndorsementOption } +// NoQuiescence returns true if quiescence is disabled. +func (l *ProtocolOptions) NoQuiescence() bool { + return l.NoQuiescenceOption +} + // CustomMessageOverrides returns the set of protocol messages that we override // to allow custom handling. func (l ProtocolOptions) CustomMessageOverrides() []uint16 { diff --git a/lnrpc/devrpc/config_active.go b/lnrpc/devrpc/config_active.go index 6fc274f2e9..da5cd5be97 100644 --- a/lnrpc/devrpc/config_active.go +++ b/lnrpc/devrpc/config_active.go @@ -6,6 +6,7 @@ package devrpc import ( "github.com/btcsuite/btcd/chaincfg" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/htlcswitch" ) // Config is the primary configuration struct for the DEV RPC server. It @@ -16,4 +17,5 @@ import ( type Config struct { ActiveNetParams *chaincfg.Params GraphDB *channeldb.ChannelGraph + Switch *htlcswitch.Switch } diff --git a/lnrpc/devrpc/dev.pb.go b/lnrpc/devrpc/dev.pb.go index 890f4e3171..d8de47fc8a 100644 --- a/lnrpc/devrpc/dev.pb.go +++ b/lnrpc/devrpc/dev.pb.go @@ -59,6 +59,103 @@ func (*ImportGraphResponse) Descriptor() ([]byte, []int) { return file_devrpc_dev_proto_rawDescGZIP(), []int{0} } +type QuiescenceRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The channel point of the channel we wish to quiesce + ChanId *lnrpc.ChannelPoint `protobuf:"bytes,1,opt,name=chan_id,json=chanId,proto3" json:"chan_id,omitempty"` +} + +func (x *QuiescenceRequest) Reset() { + *x = QuiescenceRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_devrpc_dev_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *QuiescenceRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*QuiescenceRequest) ProtoMessage() {} + +func (x *QuiescenceRequest) ProtoReflect() protoreflect.Message { + mi := &file_devrpc_dev_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use QuiescenceRequest.ProtoReflect.Descriptor instead. +func (*QuiescenceRequest) Descriptor() ([]byte, []int) { + return file_devrpc_dev_proto_rawDescGZIP(), []int{1} +} + +func (x *QuiescenceRequest) GetChanId() *lnrpc.ChannelPoint { + if x != nil { + return x.ChanId + } + return nil +} + +type QuiescenceResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Indicates whether or not we hold the initiator role or not once the + // negotiation completes + Initiator bool `protobuf:"varint,1,opt,name=initiator,proto3" json:"initiator,omitempty"` +} + +func (x *QuiescenceResponse) Reset() { + *x = QuiescenceResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_devrpc_dev_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *QuiescenceResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*QuiescenceResponse) ProtoMessage() {} + +func (x *QuiescenceResponse) ProtoReflect() protoreflect.Message { + mi := &file_devrpc_dev_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use QuiescenceResponse.ProtoReflect.Descriptor instead. +func (*QuiescenceResponse) Descriptor() ([]byte, []int) { + return file_devrpc_dev_proto_rawDescGZIP(), []int{2} +} + +func (x *QuiescenceResponse) GetInitiator() bool { + if x != nil { + return x.Initiator + } + return false +} + var File_devrpc_dev_proto protoreflect.FileDescriptor var file_devrpc_dev_proto_rawDesc = []byte{ @@ -66,15 +163,26 @@ var file_devrpc_dev_proto_rawDesc = []byte{ 0x74, 0x6f, 0x12, 0x06, 0x64, 0x65, 0x76, 0x72, 0x70, 0x63, 0x1a, 0x0f, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x6e, 0x69, 0x6e, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x15, 0x0a, 0x13, 0x49, 0x6d, 0x70, 0x6f, 0x72, 0x74, 0x47, 0x72, 0x61, 0x70, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x32, 0x46, 0x0a, 0x03, 0x44, 0x65, 0x76, 0x12, 0x3f, 0x0a, 0x0b, 0x49, 0x6d, 0x70, - 0x6f, 0x72, 0x74, 0x47, 0x72, 0x61, 0x70, 0x68, 0x12, 0x13, 0x2e, 0x6c, 0x6e, 0x72, 0x70, 0x63, - 0x2e, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x47, 0x72, 0x61, 0x70, 0x68, 0x1a, 0x1b, 0x2e, - 0x64, 0x65, 0x76, 0x72, 0x70, 0x63, 0x2e, 0x49, 0x6d, 0x70, 0x6f, 0x72, 0x74, 0x47, 0x72, 0x61, - 0x70, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x2e, 0x5a, 0x2c, 0x67, 0x69, - 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x6e, 0x69, - 0x6e, 0x67, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x2f, 0x6c, 0x6e, 0x64, 0x2f, 0x6c, 0x6e, - 0x72, 0x70, 0x63, 0x2f, 0x64, 0x65, 0x76, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x73, 0x65, 0x22, 0x41, 0x0a, 0x11, 0x51, 0x75, 0x69, 0x65, 0x73, 0x63, 0x65, 0x6e, 0x63, 0x65, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x07, 0x63, 0x68, 0x61, 0x6e, 0x5f, + 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x6c, 0x6e, 0x72, 0x70, 0x63, + 0x2e, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x50, 0x6f, 0x69, 0x6e, 0x74, 0x52, 0x06, 0x63, + 0x68, 0x61, 0x6e, 0x49, 0x64, 0x22, 0x32, 0x0a, 0x12, 0x51, 0x75, 0x69, 0x65, 0x73, 0x63, 0x65, + 0x6e, 0x63, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x69, + 0x6e, 0x69, 0x74, 0x69, 0x61, 0x74, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x09, + 0x69, 0x6e, 0x69, 0x74, 0x69, 0x61, 0x74, 0x6f, 0x72, 0x32, 0x88, 0x01, 0x0a, 0x03, 0x44, 0x65, + 0x76, 0x12, 0x3f, 0x0a, 0x0b, 0x49, 0x6d, 0x70, 0x6f, 0x72, 0x74, 0x47, 0x72, 0x61, 0x70, 0x68, + 0x12, 0x13, 0x2e, 0x6c, 0x6e, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, + 0x47, 0x72, 0x61, 0x70, 0x68, 0x1a, 0x1b, 0x2e, 0x64, 0x65, 0x76, 0x72, 0x70, 0x63, 0x2e, 0x49, + 0x6d, 0x70, 0x6f, 0x72, 0x74, 0x47, 0x72, 0x61, 0x70, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x40, 0x0a, 0x07, 0x51, 0x75, 0x69, 0x65, 0x73, 0x63, 0x65, 0x12, 0x19, 0x2e, + 0x64, 0x65, 0x76, 0x72, 0x70, 0x63, 0x2e, 0x51, 0x75, 0x69, 0x65, 0x73, 0x63, 0x65, 0x6e, 0x63, + 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x64, 0x65, 0x76, 0x72, 0x70, + 0x63, 0x2e, 0x51, 0x75, 0x69, 0x65, 0x73, 0x63, 0x65, 0x6e, 0x63, 0x65, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x2e, 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x6e, 0x69, 0x6e, 0x67, 0x6e, 0x65, 0x74, 0x77, + 0x6f, 0x72, 0x6b, 0x2f, 0x6c, 0x6e, 0x64, 0x2f, 0x6c, 0x6e, 0x72, 0x70, 0x63, 0x2f, 0x64, 0x65, + 0x76, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -89,19 +197,25 @@ func file_devrpc_dev_proto_rawDescGZIP() []byte { return file_devrpc_dev_proto_rawDescData } -var file_devrpc_dev_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_devrpc_dev_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_devrpc_dev_proto_goTypes = []interface{}{ (*ImportGraphResponse)(nil), // 0: devrpc.ImportGraphResponse - (*lnrpc.ChannelGraph)(nil), // 1: lnrpc.ChannelGraph + (*QuiescenceRequest)(nil), // 1: devrpc.QuiescenceRequest + (*QuiescenceResponse)(nil), // 2: devrpc.QuiescenceResponse + (*lnrpc.ChannelPoint)(nil), // 3: lnrpc.ChannelPoint + (*lnrpc.ChannelGraph)(nil), // 4: lnrpc.ChannelGraph } var file_devrpc_dev_proto_depIdxs = []int32{ - 1, // 0: devrpc.Dev.ImportGraph:input_type -> lnrpc.ChannelGraph - 0, // 1: devrpc.Dev.ImportGraph:output_type -> devrpc.ImportGraphResponse - 1, // [1:2] is the sub-list for method output_type - 0, // [0:1] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name + 3, // 0: devrpc.QuiescenceRequest.chan_id:type_name -> lnrpc.ChannelPoint + 4, // 1: devrpc.Dev.ImportGraph:input_type -> lnrpc.ChannelGraph + 1, // 2: devrpc.Dev.Quiesce:input_type -> devrpc.QuiescenceRequest + 0, // 3: devrpc.Dev.ImportGraph:output_type -> devrpc.ImportGraphResponse + 2, // 4: devrpc.Dev.Quiesce:output_type -> devrpc.QuiescenceResponse + 3, // [3:5] is the sub-list for method output_type + 1, // [1:3] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name } func init() { file_devrpc_dev_proto_init() } @@ -122,6 +236,30 @@ func file_devrpc_dev_proto_init() { return nil } } + file_devrpc_dev_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*QuiescenceRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_devrpc_dev_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*QuiescenceResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -129,7 +267,7 @@ func file_devrpc_dev_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_devrpc_dev_proto_rawDesc, NumEnums: 0, - NumMessages: 1, + NumMessages: 3, NumExtensions: 0, NumServices: 1, }, diff --git a/lnrpc/devrpc/dev.pb.gw.go b/lnrpc/devrpc/dev.pb.gw.go index d702804cd6..10b0a60c0d 100644 --- a/lnrpc/devrpc/dev.pb.gw.go +++ b/lnrpc/devrpc/dev.pb.gw.go @@ -66,6 +66,40 @@ func local_request_Dev_ImportGraph_0(ctx context.Context, marshaler runtime.Mars } +func request_Dev_Quiesce_0(ctx context.Context, marshaler runtime.Marshaler, client DevClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq QuiescenceRequest + var metadata runtime.ServerMetadata + + newReader, berr := utilities.IOReaderFactory(req.Body) + if berr != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", berr) + } + if err := marshaler.NewDecoder(newReader()).Decode(&protoReq); err != nil && err != io.EOF { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + + msg, err := client.Quiesce(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) + return msg, metadata, err + +} + +func local_request_Dev_Quiesce_0(ctx context.Context, marshaler runtime.Marshaler, server DevServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq QuiescenceRequest + var metadata runtime.ServerMetadata + + newReader, berr := utilities.IOReaderFactory(req.Body) + if berr != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", berr) + } + if err := marshaler.NewDecoder(newReader()).Decode(&protoReq); err != nil && err != io.EOF { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + + msg, err := server.Quiesce(ctx, &protoReq) + return msg, metadata, err + +} + // RegisterDevHandlerServer registers the http handlers for service Dev to "mux". // UnaryRPC :call DevServer directly. // StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906. @@ -95,6 +129,29 @@ func RegisterDevHandlerServer(ctx context.Context, mux *runtime.ServeMux, server }) + mux.Handle("POST", pattern_Dev_Quiesce_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req, "/devrpc.Dev/Quiesce", runtime.WithHTTPPathPattern("/v2/dev/quiesce")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := local_request_Dev_Quiesce_0(rctx, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) + ctx = runtime.NewServerMetadataContext(ctx, md) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + + forward_Dev_Quiesce_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + return nil } @@ -156,13 +213,37 @@ func RegisterDevHandlerClient(ctx context.Context, mux *runtime.ServeMux, client }) + mux.Handle("POST", pattern_Dev_Quiesce_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + rctx, err := runtime.AnnotateContext(ctx, mux, req, "/devrpc.Dev/Quiesce", runtime.WithHTTPPathPattern("/v2/dev/quiesce")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_Dev_Quiesce_0(rctx, inboundMarshaler, client, req, pathParams) + ctx = runtime.NewServerMetadataContext(ctx, md) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + + forward_Dev_Quiesce_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + return nil } var ( pattern_Dev_ImportGraph_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"v2", "dev", "importgraph"}, "")) + + pattern_Dev_Quiesce_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"v2", "dev", "quiesce"}, "")) ) var ( forward_Dev_ImportGraph_0 = runtime.ForwardResponseMessage + + forward_Dev_Quiesce_0 = runtime.ForwardResponseMessage ) diff --git a/lnrpc/devrpc/dev.pb.json.go b/lnrpc/devrpc/dev.pb.json.go index 954917a1a3..2163a13de6 100644 --- a/lnrpc/devrpc/dev.pb.json.go +++ b/lnrpc/devrpc/dev.pb.json.go @@ -46,4 +46,29 @@ func RegisterDevJSONCallbacks(registry map[string]func(ctx context.Context, } callback(string(respBytes), nil) } + + registry["devrpc.Dev.Quiesce"] = func(ctx context.Context, + conn *grpc.ClientConn, reqJSON string, callback func(string, error)) { + + req := &QuiescenceRequest{} + err := marshaler.Unmarshal([]byte(reqJSON), req) + if err != nil { + callback("", err) + return + } + + client := NewDevClient(conn) + resp, err := client.Quiesce(ctx, req) + if err != nil { + callback("", err) + return + } + + respBytes, err := marshaler.Marshal(resp) + if err != nil { + callback("", err) + return + } + callback(string(respBytes), nil) + } } diff --git a/lnrpc/devrpc/dev.proto b/lnrpc/devrpc/dev.proto index 502fbadc8b..4b4fe778fd 100644 --- a/lnrpc/devrpc/dev.proto +++ b/lnrpc/devrpc/dev.proto @@ -30,7 +30,25 @@ service Dev { used for development. */ rpc ImportGraph (lnrpc.ChannelGraph) returns (ImportGraphResponse); + + /* + Quiesce instructs a channel to initiate the quiescence (stfu) protocol. This + RPC is for testing purposes only. The commit that adds it will be removed + once interop is confirmed. + */ + rpc Quiesce (QuiescenceRequest) returns (QuiescenceResponse); } message ImportGraphResponse { } + +message QuiescenceRequest { + // The channel point of the channel we wish to quiesce + lnrpc.ChannelPoint chan_id = 1; +} + +message QuiescenceResponse { + // Indicates whether or not we hold the initiator role or not once the + // negotiation completes + bool initiator = 1; +} diff --git a/lnrpc/devrpc/dev.swagger.json b/lnrpc/devrpc/dev.swagger.json index 16e16d7be8..b540a8e4ce 100644 --- a/lnrpc/devrpc/dev.swagger.json +++ b/lnrpc/devrpc/dev.swagger.json @@ -48,12 +48,63 @@ "Dev" ] } + }, + "/v2/dev/quiesce": { + "post": { + "summary": "Quiesce instructs a channel to initiate the quiescence (stfu) protocol. This\nRPC is for testing purposes only. The commit that adds it will be removed\nonce interop is confirmed.", + "operationId": "Dev_Quiesce", + "responses": { + "200": { + "description": "A successful response.", + "schema": { + "$ref": "#/definitions/devrpcQuiescenceResponse" + } + }, + "default": { + "description": "An unexpected error response.", + "schema": { + "$ref": "#/definitions/rpcStatus" + } + } + }, + "parameters": [ + { + "name": "body", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/devrpcQuiescenceRequest" + } + } + ], + "tags": [ + "Dev" + ] + } } }, "definitions": { "devrpcImportGraphResponse": { "type": "object" }, + "devrpcQuiescenceRequest": { + "type": "object", + "properties": { + "chan_id": { + "$ref": "#/definitions/lnrpcChannelPoint", + "title": "The channel point of the channel we wish to quiesce" + } + } + }, + "devrpcQuiescenceResponse": { + "type": "object", + "properties": { + "initiator": { + "type": "boolean", + "title": "Indicates whether or not we hold the initiator role or not once the\nnegotiation completes" + } + } + }, "lnrpcChannelEdge": { "type": "object", "properties": { @@ -116,6 +167,25 @@ }, "description": "Returns a new instance of the directed channel graph." }, + "lnrpcChannelPoint": { + "type": "object", + "properties": { + "funding_txid_bytes": { + "type": "string", + "format": "byte", + "description": "Txid of the funding transaction. When using REST, this field must be\nencoded as base64." + }, + "funding_txid_str": { + "type": "string", + "description": "Hex-encoded string representing the byte-reversed hash of the funding\ntransaction." + }, + "output_index": { + "type": "integer", + "format": "int64", + "title": "The index of the output of the funding transaction" + } + } + }, "lnrpcFeature": { "type": "object", "properties": { diff --git a/lnrpc/devrpc/dev.yaml b/lnrpc/devrpc/dev.yaml index 18c4e26b40..cb849dc638 100644 --- a/lnrpc/devrpc/dev.yaml +++ b/lnrpc/devrpc/dev.yaml @@ -6,3 +6,6 @@ http: - selector: devrpc.Dev.ImportGraph post: "/v2/dev/importgraph" body: "*" + - selector: devrpc.Dev.Quiesce + post: "/v2/dev/quiesce" + body: "*" diff --git a/lnrpc/devrpc/dev_grpc.pb.go b/lnrpc/devrpc/dev_grpc.pb.go index 1744c12a3b..1eb6266fbe 100644 --- a/lnrpc/devrpc/dev_grpc.pb.go +++ b/lnrpc/devrpc/dev_grpc.pb.go @@ -23,6 +23,10 @@ type DevClient interface { // ImportGraph imports a ChannelGraph into the graph database. Should only be // used for development. ImportGraph(ctx context.Context, in *lnrpc.ChannelGraph, opts ...grpc.CallOption) (*ImportGraphResponse, error) + // Quiesce instructs a channel to initiate the quiescence (stfu) protocol. This + // RPC is for testing purposes only. The commit that adds it will be removed + // once interop is confirmed. + Quiesce(ctx context.Context, in *QuiescenceRequest, opts ...grpc.CallOption) (*QuiescenceResponse, error) } type devClient struct { @@ -42,6 +46,15 @@ func (c *devClient) ImportGraph(ctx context.Context, in *lnrpc.ChannelGraph, opt return out, nil } +func (c *devClient) Quiesce(ctx context.Context, in *QuiescenceRequest, opts ...grpc.CallOption) (*QuiescenceResponse, error) { + out := new(QuiescenceResponse) + err := c.cc.Invoke(ctx, "/devrpc.Dev/Quiesce", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // DevServer is the server API for Dev service. // All implementations must embed UnimplementedDevServer // for forward compatibility @@ -50,6 +63,10 @@ type DevServer interface { // ImportGraph imports a ChannelGraph into the graph database. Should only be // used for development. ImportGraph(context.Context, *lnrpc.ChannelGraph) (*ImportGraphResponse, error) + // Quiesce instructs a channel to initiate the quiescence (stfu) protocol. This + // RPC is for testing purposes only. The commit that adds it will be removed + // once interop is confirmed. + Quiesce(context.Context, *QuiescenceRequest) (*QuiescenceResponse, error) mustEmbedUnimplementedDevServer() } @@ -60,6 +77,9 @@ type UnimplementedDevServer struct { func (UnimplementedDevServer) ImportGraph(context.Context, *lnrpc.ChannelGraph) (*ImportGraphResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method ImportGraph not implemented") } +func (UnimplementedDevServer) Quiesce(context.Context, *QuiescenceRequest) (*QuiescenceResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Quiesce not implemented") +} func (UnimplementedDevServer) mustEmbedUnimplementedDevServer() {} // UnsafeDevServer may be embedded to opt out of forward compatibility for this service. @@ -91,6 +111,24 @@ func _Dev_ImportGraph_Handler(srv interface{}, ctx context.Context, dec func(int return interceptor(ctx, in, info, handler) } +func _Dev_Quiesce_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(QuiescenceRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DevServer).Quiesce(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/devrpc.Dev/Quiesce", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DevServer).Quiesce(ctx, req.(*QuiescenceRequest)) + } + return interceptor(ctx, in, info, handler) +} + // Dev_ServiceDesc is the grpc.ServiceDesc for Dev service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -102,6 +140,10 @@ var Dev_ServiceDesc = grpc.ServiceDesc{ MethodName: "ImportGraph", Handler: _Dev_ImportGraph_Handler, }, + { + MethodName: "Quiesce", + Handler: _Dev_Quiesce_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "devrpc/dev.proto", diff --git a/lnrpc/devrpc/dev_server.go b/lnrpc/devrpc/dev_server.go index 662c0d08d9..ad135e8dfb 100644 --- a/lnrpc/devrpc/dev_server.go +++ b/lnrpc/devrpc/dev_server.go @@ -18,8 +18,10 @@ import ( "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb/models" + "github.com/lightningnetwork/lnd/fn" "github.com/lightningnetwork/lnd/lncfg" "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lnwire" "google.golang.org/grpc" "gopkg.in/macaroon-bakery.v2/bakery" @@ -40,6 +42,10 @@ var ( Entity: "offchain", Action: "write", }}, + "/devrpc.Dev/Quiesce": {{ + Entity: "offchain", + Action: "write", + }}, } ) @@ -56,6 +62,7 @@ type ServerShell struct { type Server struct { started int32 // To be used atomically. shutdown int32 // To be used atomically. + quit chan struct{} // Required by the grpc-gateway/v2 library for forward compatibility. // Must be after the atomically used variables to not break struct @@ -78,7 +85,8 @@ func New(cfg *Config) (*Server, lnrpc.MacaroonPerms, error) { // We don't create any new macaroons for this subserver, instead reuse // existing onchain/offchain permissions. server := &Server{ - cfg: cfg, + quit: make(chan struct{}), + cfg: cfg, } return server, macPermissions, nil @@ -103,6 +111,8 @@ func (s *Server) Stop() error { return nil } + close(s.quit) + return nil } @@ -342,3 +352,33 @@ func (s *Server) ImportGraph(ctx context.Context, return &ImportGraphResponse{}, nil } + +// Quiesce initiates the quiescence process for the channel with the given +// channel ID. This method will block until the channel is fully quiesced. +func (s *Server) Quiesce(_ context.Context, in *QuiescenceRequest) ( + *QuiescenceResponse, error) { + + txid, err := lnrpc.GetChanPointFundingTxid(in.ChanId) + if err != nil { + return nil, err + } + + op := wire.NewOutPoint(txid, in.ChanId.OutputIndex) + cid := lnwire.NewChanIDFromOutPoint(*op) + ln, err := s.cfg.Switch.GetLink(cid) + if err != nil { + return nil, err + } + + select { + case result := <-ln.InitStfu(): + mkResp := func(b lntypes.ChannelParty) *QuiescenceResponse { + return &QuiescenceResponse{Initiator: b.IsLocal()} + } + + return fn.MapOk(mkResp)(result).Unpack() + + case <-s.quit: + return nil, fmt.Errorf("server shutting down") + } +} diff --git a/lntest/rpc/harness_rpc.go b/lntest/rpc/harness_rpc.go index 0640581dcd..2e08a84947 100644 --- a/lntest/rpc/harness_rpc.go +++ b/lntest/rpc/harness_rpc.go @@ -43,7 +43,7 @@ type HarnessRPC struct { ChainKit chainrpc.ChainKitClient NeutrinoKit neutrinorpc.NeutrinoKitClient Peer peersrpc.PeersClient - DevRPC devrpc.DevClient + Dev devrpc.DevClient // Name is the HarnessNode's name. Name string @@ -75,7 +75,7 @@ func NewHarnessRPC(ctxt context.Context, t *testing.T, c *grpc.ClientConn, ChainKit: chainrpc.NewChainKitClient(c), NeutrinoKit: neutrinorpc.NewNeutrinoKitClient(c), Peer: peersrpc.NewPeersClient(c), - DevRPC: devrpc.NewDevClient(c), + Dev: devrpc.NewDevClient(c), Name: name, } diff --git a/lntest/rpc/lnd.go b/lntest/rpc/lnd.go index f0ed52fd88..1657caac8d 100644 --- a/lntest/rpc/lnd.go +++ b/lntest/rpc/lnd.go @@ -5,6 +5,7 @@ import ( "strings" "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/lnrpc/devrpc" "github.com/stretchr/testify/require" ) @@ -730,3 +731,17 @@ func (h *HarnessRPC) LookupHtlcResolutionAssertErr( return err } + +// Quiesce makes an RPC call to the node's Quiesce method and returns the +// response. +func (h *HarnessRPC) Quiesce( + req *devrpc.QuiescenceRequest) *devrpc.QuiescenceResponse { + + ctx, cancel := context.WithTimeout(h.runCtx, DefaultTimeout) + defer cancel() + + res, err := h.Dev.Quiesce(ctx, req) + h.NoError(err, "Quiesce returned an error") + + return res +} diff --git a/lnwire/features.go b/lnwire/features.go index bc6204f424..eb00d86906 100644 --- a/lnwire/features.go +++ b/lnwire/features.go @@ -171,6 +171,16 @@ const ( // sender-generated preimages according to BOLT XX. AMPOptional FeatureBit = 31 + // QuiescenceRequired is a required feature bit that denotes that a + // connection established with this node must support the quiescence + // protocol if it wants to have a channel relationship. + QuiescenceRequired FeatureBit = 34 + + // QuiescenceOptional is an optional feature bit that denotes that a + // connection established with this node is permitted to use the + // quiescence protocol. + QuiescenceOptional FeatureBit = 35 + // ExplicitChannelTypeRequired is a required bit that denotes that a // connection established with this node is to use explicit channel // commitment types for negotiation instead of the existing implicit @@ -335,6 +345,8 @@ var Features = map[FeatureBit]string{ WumboChannelsOptional: "wumbo-channels", AMPRequired: "amp", AMPOptional: "amp", + QuiescenceRequired: "quiescence", + QuiescenceOptional: "quiescence", PaymentMetadataOptional: "payment-metadata", PaymentMetadataRequired: "payment-metadata", ExplicitChannelTypeOptional: "explicit-commitment-type", diff --git a/lnwire/message.go b/lnwire/message.go index a758db000d..68b09692e5 100644 --- a/lnwire/message.go +++ b/lnwire/message.go @@ -63,6 +63,25 @@ const ( MsgKickoffSig = 777 ) +// IsChannelUpdate is a filter function that discerns channel update messages +// from the other messages in the Lightning Network Protocol. +func (t MessageType) IsChannelUpdate() bool { + switch t { + case MsgUpdateAddHTLC: + return true + case MsgUpdateFulfillHTLC: + return true + case MsgUpdateFailHTLC: + return true + case MsgUpdateFailMalformedHTLC: + return true + case MsgUpdateFee: + return true + default: + return false + } +} + // ErrorEncodeMessage is used when failed to encode the message payload. func ErrorEncodeMessage(err error) error { return fmt.Errorf("failed to encode message to buffer, got %w", err) diff --git a/peer/brontide.go b/peer/brontide.go index 17bef3234e..3bae1be1bc 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -411,6 +411,10 @@ type Config struct { // invalid. DisallowRouteBlinding bool + // DisallowQuiescence is a flag that indicates whether the Brontide + // should have the quiescence feature disabled. + DisallowQuiescence bool + // MaxFeeExposure limits the number of outstanding fees in a channel. // This value will be passed to created links. MaxFeeExposure lnwire.MilliSatoshi @@ -1324,6 +1328,8 @@ func (p *Brontide) addLink(chanPoint *wire.OutPoint, DisallowRouteBlinding: p.cfg.DisallowRouteBlinding, MaxFeeExposure: p.cfg.MaxFeeExposure, ShouldFwdExpEndorsement: p.cfg.ShouldFwdExpEndorsement, + DisallowQuiescence: p.cfg.DisallowQuiescence || + !p.remoteFeatures.HasFeature(lnwire.QuiescenceOptional), } // Before adding our new link, purge the switch of any pending or live diff --git a/peer/test_utils.go b/peer/test_utils.go index 1a62355908..9034bb5a96 100644 --- a/peer/test_utils.go +++ b/peer/test_utils.go @@ -24,6 +24,7 @@ import ( "github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/lntest/channels" "github.com/lightningnetwork/lnd/lntest/mock" + "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet/chainfee" "github.com/lightningnetwork/lnd/lnwire" @@ -477,6 +478,14 @@ func (m *mockUpdateHandler) OnCommitOnce( hook() } +func (m *mockUpdateHandler) InitStfu() <-chan fn.Result[lntypes.ChannelParty] { + // TODO(proofofkeags): Implement + c := make(chan fn.Result[lntypes.ChannelParty], 1) + + c <- fn.Errf[lntypes.ChannelParty]("InitStfu not yet implemented") + + return c +} func newMockConn(t *testing.T, expectedMessages int) *mockMessageConn { return &mockMessageConn{ diff --git a/server.go b/server.go index 3b8224f5b0..c186d36560 100644 --- a/server.go +++ b/server.go @@ -587,6 +587,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, NoTaprootOverlay: !cfg.ProtocolOptions.TaprootOverlayChans, NoRouteBlinding: cfg.ProtocolOptions.NoRouteBlinding(), NoExperimentalEndorsement: cfg.ProtocolOptions.NoExperimentalEndorsement(), + NoQuiescence: cfg.ProtocolOptions.NoQuiescence(), }) if err != nil { return nil, err @@ -4214,6 +4215,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, RequestAlias: s.aliasMgr.RequestAlias, AddLocalAlias: s.aliasMgr.AddLocalAlias, DisallowRouteBlinding: s.cfg.ProtocolOptions.NoRouteBlinding(), + DisallowQuiescence: s.cfg.ProtocolOptions.NoQuiescence(), MaxFeeExposure: thresholdMSats, Quit: s.quit, AuxLeafStore: s.implCfg.AuxLeafStore, diff --git a/subrpcserver_config.go b/subrpcserver_config.go index a4ee6d1a16..9e9295931b 100644 --- a/subrpcserver_config.go +++ b/subrpcserver_config.go @@ -346,6 +346,10 @@ func (s *subRPCServerConfigs) PopulateDependencies(cfg *Config, reflect.ValueOf(graphDB), ) + subCfgValue.FieldByName("Switch").Set( + reflect.ValueOf(htlcSwitch), + ) + case *peersrpc.Config: subCfgValue := extractReflectValue(subCfg)