From 3ec83cc82f8cf8f787ea7ab4809ffc71af613aad Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 19 Jan 2018 17:23:38 -0800 Subject: [PATCH] peer+contractcourt: delegate watching for co-op closes to the chainWatcher MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In this commit, we modify the interaction between the chanCloser sub-system and the chain notifier all together. This fixes a series of bugs as before this commit, we wouldn’t be able to detect if the remote party actually broadcasted *any* of the transactions that we signed off upon. This would be rejected to the user by having a “zombie” channel close that would never actually be resolved. Rather than the chanCloser watching for on-chain closes, we’ll now open up a co-op close context to the chainWatcher (via a layer of indirection via the ChainArbitrator), and report to it all possible closes that we’ve signed. The chainWatcher will then be able to launch a goroutine to properly update the database state once any of the possible closure transactions confirms. --- chancloser.go | 35 +++++++-- contractcourt/chain_watcher.go | 136 +++++++++++++++++++++++++++++++++ fundingmanager_test.go | 5 +- peer.go | 41 ++++++---- peer_test.go | 16 ++-- 5 files changed, 201 insertions(+), 32 deletions(-) diff --git a/chancloser.go b/chancloser.go index a64e81f4..ea257014 100644 --- a/chancloser.go +++ b/chancloser.go @@ -6,6 +6,7 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" @@ -141,6 +142,8 @@ type channelCloser struct { // TODO(roasbeef): abstract away closeReq *htlcswitch.ChanClose + closeCtx *contractcourt.CooperativeCloseCtx + // localDeliveryScript is the script that we'll send our settled // channel funds to. localDeliveryScript []byte @@ -155,7 +158,8 @@ type channelCloser struct { // only be populated iff, we're the initiator of this closing request. func newChannelCloser(cfg chanCloseCfg, deliveryScript []byte, idealFeePerkw btcutil.Amount, negotiationHeight uint32, - closeReq *htlcswitch.ChanClose) *channelCloser { + closeReq *htlcswitch.ChanClose, + closeCtx *contractcourt.CooperativeCloseCtx) *channelCloser { // Given the target fee-per-kw, we'll compute what our ideal _total_ // fee will be starting at for this fee negotiation. @@ -191,6 +195,7 @@ func newChannelCloser(cfg chanCloseCfg, deliveryScript []byte, cfg: cfg, negotiationHeight: negotiationHeight, idealFeeSat: idealFeeSat, + closeCtx: closeCtx, localDeliveryScript: deliveryScript, priorFeeOffers: make(map[btcutil.Amount]*lnwire.ClosingSigned), } @@ -459,19 +464,20 @@ func (c *channelCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message, b // being closed within the database. closingTxid := closeTx.TxHash() chanInfo := c.cfg.channel.StateSnapshot() - closeSummary := &channeldb.ChannelCloseSummary{ + c.closeCtx.Finalize(&channeldb.ChannelCloseSummary{ ChanPoint: c.chanPoint, ChainHash: chanInfo.ChainHash, ClosingTXID: closingTxid, + CloseHeight: c.negotiationHeight, RemotePub: &chanInfo.RemoteIdentity, Capacity: chanInfo.Capacity, SettledBalance: finalLocalBalance, CloseType: channeldb.CooperativeClose, + ShortChanID: c.cfg.channel.ShortChanID(), IsPending: true, - } - if err := c.cfg.channel.DeleteState(closeSummary); err != nil { - return nil, false, err - } + }) + + // TODO(roasbeef): don't need, ChainWatcher will handle c.state = closeFinished @@ -507,7 +513,8 @@ func (c *channelCloser) ProcessCloseMsg(msg lnwire.Message) ([]lnwire.Message, b // transaction for a channel based on the prior fee negotiations and our // current compromise fee. func (c *channelCloser) proposeCloseSigned(fee btcutil.Amount) (*lnwire.ClosingSigned, error) { - rawSig, err := c.cfg.channel.CreateCloseProposal( + + rawSig, txid, localAmt, err := c.cfg.channel.CreateCloseProposal( fee, c.localDeliveryScript, c.remoteDeliveryScript, ) if err != nil { @@ -535,6 +542,20 @@ func (c *channelCloser) proposeCloseSigned(fee btcutil.Amount) (*lnwire.ClosingS // accepts our offer. This way, we don't have to re-sign. c.priorFeeOffers[fee] = closeSignedMsg + chanInfo := c.cfg.channel.StateSnapshot() + c.closeCtx.LogPotentialClose(&channeldb.ChannelCloseSummary{ + ChanPoint: c.chanPoint, + ChainHash: chanInfo.ChainHash, + ClosingTXID: *txid, + CloseHeight: c.negotiationHeight, + RemotePub: &chanInfo.RemoteIdentity, + Capacity: chanInfo.Capacity, + SettledBalance: localAmt, + CloseType: channeldb.CooperativeClose, + ShortChanID: c.cfg.channel.ShortChanID(), + IsPending: true, + }) + return closeSignedMsg, nil } diff --git a/contractcourt/chain_watcher.go b/contractcourt/chain_watcher.go index 7af988f8..34d1d677 100644 --- a/contractcourt/chain_watcher.go +++ b/contractcourt/chain_watcher.go @@ -617,3 +617,139 @@ func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail return c.chanState.CloseChannel(&closeSummary) } + +// CooperativeCloseContext is a transactional object that's used by external +// parties to initiate a cooperative closure negotiation. During the +// negotiation, we sign multiple versions of a closing transaction, either of +// which may be counter signed and broadcast by the remote party at any time. +// As a result, we'll need to watch the chain to see if any of these confirm, +// only afterwards will we mark the channel as fully closed. +type CooperativeCloseCtx struct { + // potentialCloses is a channel will be used by the party negotiating + // the cooperative closure to send possible closing states to the chain + // watcher to ensure we detect all on-chain spends. + potentialCloses chan *channeldb.ChannelCloseSummary + + activeCloses map[chainhash.Hash]struct{} + + watchCancel chan struct{} + + watcher *chainWatcher + + sync.Mutex +} + +// BeginCooperativeClose should be called by the party negotiating the +// cooperative closure before the first signature is sent to the remote party. +// This will return a context that should be used to communicate possible +// closing states so we can act on them. +func (c *chainWatcher) BeginCooperativeClose() *CooperativeCloseCtx { + // We'll simply return a new close context that will be used be the + // caller to notify us of potential closes. + return &CooperativeCloseCtx{ + potentialCloses: make(chan *channeldb.ChannelCloseSummary), + watchCancel: make(chan struct{}), + activeCloses: make(map[chainhash.Hash]struct{}), + watcher: c, + } +} + +// LogPotentialClose should be called by the party negotiating the cooperative +// closure once they signed a new state, but *before* they transmit it to the +// remote party. This will ensure that the chain watcher is able to log the new +// state it should watch the chain for. +func (c *CooperativeCloseCtx) LogPotentialClose(potentialClose *channeldb.ChannelCloseSummary) { + c.Lock() + defer c.Unlock() + + // We'll check to see if we're already watching for a close of this + // channel, if so, then we'll exit early to avoid launching a duplicate + // goroutine. + if _, ok := c.activeCloses[potentialClose.ClosingTXID]; ok { + return + } + + // Otherwise, we'll mark this txid as currently being watched. + c.activeCloses[potentialClose.ClosingTXID] = struct{}{} + + // We'll take this potential close, and launch a goroutine which will + // wait until it's confirmed, then update the database state. When a + // potential close gets confirmed, we'll cancel out all other launched + // goroutines. + go func() { + confNtfn, err := c.watcher.notifier.RegisterConfirmationsNtfn( + &potentialClose.ClosingTXID, 1, + uint32(potentialClose.CloseHeight), + ) + if err != nil { + log.Errorf("unable to register for conf: %v", err) + return + } + + log.Infof("Waiting for txid=%v to close ChannelPoint(%v) on chain", + potentialClose.ClosingTXID, c.watcher.chanState.FundingOutpoint) + + select { + case confInfo, ok := <-confNtfn.Confirmed: + if !ok { + log.Errorf("notifier exiting") + return + } + + log.Infof("ChannelPoint(%v) is fully closed, at "+ + "height: %v", c.watcher.chanState.FundingOutpoint, + confInfo.BlockHeight) + + close(c.watchCancel) + + c.watcher.Lock() + for _, sub := range c.watcher.clientSubscriptions { + select { + case sub.CooperativeClosure <- struct{}{}: + case <-c.watcher.quit: + } + } + c.watcher.Unlock() + + err := c.watcher.chanState.CloseChannel(potentialClose) + if err != nil { + log.Warnf("unable to update latest close for "+ + "ChannelPoint(%v)", + c.watcher.chanState.FundingOutpoint) + } + + err = c.watcher.markChanClosed() + if err != nil { + log.Errorf("unable to mark chan fully "+ + "closed: %v", err) + return + } + + case <-c.watchCancel: + log.Debugf("Exiting watch for close of txid=%v for "+ + "ChannelPoint(%v)", potentialClose.ClosingTXID, + c.watcher.chanState.FundingOutpoint) + + case <-c.watcher.quit: + return + } + }() +} + +// Finalize should be called once both parties agree on a final transaction to +// close out the channel. This method will immediately mark the channel as +// pending closed in the database, then launch a goroutine to mark the channel +// fully closed upon confirmation. +func (c *CooperativeCloseCtx) Finalize(preferredClose *channeldb.ChannelCloseSummary) error { + log.Infof("Finalizing chan close for ChannelPoint(%v)", + c.watcher.chanState.FundingOutpoint) + + err := c.watcher.chanState.CloseChannel(preferredClose) + if err != nil { + return err + } + + go c.LogPotentialClose(preferredClose) + + return nil +} diff --git a/fundingmanager_test.go b/fundingmanager_test.go index 68a4fb0c..c0eebfe6 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -230,7 +230,6 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) { return lnwire.NodeAnnouncement{}, nil }, - ArbiterChan: arbiterChan, SendToPeer: func(target *btcec.PublicKey, msgs ...lnwire.Message) error { select { case sentMessages <- msgs[0]: @@ -257,7 +256,6 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, return lnwallet.NewLightningChannel( signer, nil, - nil, channel) } } @@ -271,7 +269,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, RequiredRemoteDelay: func(amt btcutil.Amount) uint16 { return 4 }, - ArbitrateNewChan: func(*channeldb.OpenChannel) error { + WatchNewChannel: func(*channeldb.OpenChannel) error { return nil }, }) @@ -330,7 +328,6 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) { CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) { return lnwire.NodeAnnouncement{}, nil }, - ArbiterChan: oldCfg.ArbiterChan, SendToPeer: func(target *btcec.PublicKey, msgs ...lnwire.Message) error { select { diff --git a/peer.go b/peer.go index eef29233..7436bc20 100644 --- a/peer.go +++ b/peer.go @@ -1425,6 +1425,18 @@ func (p *peer) fetchActiveChanCloser(chanID lnwire.ChannelID) (*channelCloser, e return nil, err } + // Before we create the chan closer, we'll start a new + // cooperative channel closure transaction from the chain arb. + // Wtih this context, we'll ensure that we're able to respond + // if *any* of the transactions we sign off on are ever + // braodacast. + closeCtx, err := p.server.chainArb.BeginCoopChanClose( + *channel.ChannelPoint(), + ) + if err != nil { + return nil, err + } + chanCloser = newChannelCloser( chanCloseCfg{ channel: channel, @@ -1437,6 +1449,7 @@ func (p *peer) fetchActiveChanCloser(chanID lnwire.ChannelID) (*channelCloser, e targetFeePerKw, uint32(startingHeight), nil, + closeCtx, ) p.activeChanCloses[chanID] = chanCloser } @@ -1479,7 +1492,14 @@ func (p *peer) handleLocalCloseReq(req *htlcswitch.ChanClose) { return } - _, startingHeight, err := p.server.cc.chainIO.GetBestBlock() + // Before we create the chan closer, we'll start a new + // cooperative channel closure transaction from the chain arb. + // Wtih this context, we'll ensure that we're able to respond + // if *any* of the transactions we sign off on are ever + // braodacast. + closeCtx, err := p.server.chainArb.BeginCoopChanClose( + *channel.ChannelPoint(), + ) if err != nil { peerLog.Errorf(err.Error()) req.Err <- err @@ -1488,6 +1508,12 @@ func (p *peer) handleLocalCloseReq(req *htlcswitch.ChanClose) { // Next, we'll create a new channel closer state machine to // handle the close negotiation. + _, startingHeight, err := p.server.cc.chainIO.GetBestBlock() + if err != nil { + peerLog.Errorf(err.Error()) + req.Err <- err + return + } chanCloser := newChannelCloser( chanCloseCfg{ channel: channel, @@ -1500,6 +1526,7 @@ func (p *peer) handleLocalCloseReq(req *htlcswitch.ChanClose) { req.TargetFeePerKw, uint32(startingHeight), req, + closeCtx, ) p.activeChanCloses[chanID] = chanCloser @@ -1591,18 +1618,6 @@ func (p *peer) finalizeChanClosure(chanCloser *channelCloser) { go waitForChanToClose(chanCloser.negotiationHeight, notifier, errChan, chanPoint, &closingTxid, func() { - - // First, we'll mark the database as being fully closed - // so we'll no longer watch for its ultimate closure - // upon startup. - err := p.server.chanDB.MarkChanFullyClosed(chanPoint) - if err != nil { - if closeReq != nil { - closeReq.Err <- err - } - return - } - // Respond to the local subsystem which requested the // channel closure. if closeReq != nil { diff --git a/peer_test.go b/peer_test.go index ad7c8fbc..c963b054 100644 --- a/peer_test.go +++ b/peer_test.go @@ -88,7 +88,7 @@ func TestPeerChannelClosureAcceptFeeResponder(t *testing.T) { // We accept the fee, and send a ClosingSigned with the same fee back, // so she knows we agreed. peerFee := responderClosingSigned.FeeSatoshis - initiatorSig, err := initiatorChan.CreateCloseProposal( + initiatorSig, _, _, err := initiatorChan.CreateCloseProposal( peerFee, dummyDeliveryScript, respDeliveryScript, ) if err != nil { @@ -178,7 +178,7 @@ func TestPeerChannelClosureAcceptFeeInitiator(t *testing.T) { t.Fatalf("unable to query fee estimator: %v", err) } fee := btcutil.Amount(responderChan.CalcFee(uint64(feeRate * 1000))) - closeSig, err := responderChan.CreateCloseProposal(fee, + closeSig, _, _, err := responderChan.CreateCloseProposal(fee, dummyDeliveryScript, initiatorDeliveryScript) if err != nil { t.Fatalf("unable to create close proposal: %v", err) @@ -287,7 +287,7 @@ func TestPeerChannelClosureFeeNegotiationsResponder(t *testing.T) { // We don't agree with the fee, and will send back one that's 2.5x. preferredRespFee := responderClosingSigned.FeeSatoshis increasedFee := btcutil.Amount(float64(preferredRespFee) * 2.5) - initiatorSig, err := initiatorChan.CreateCloseProposal( + initiatorSig, _, _, err := initiatorChan.CreateCloseProposal( increasedFee, dummyDeliveryScript, respDeliveryScript, ) if err != nil { @@ -331,7 +331,7 @@ func TestPeerChannelClosureFeeNegotiationsResponder(t *testing.T) { // We try negotiating a 2.1x fee, which should also be rejected. increasedFee = btcutil.Amount(float64(preferredRespFee) * 2.1) - initiatorSig, err = initiatorChan.CreateCloseProposal( + initiatorSig, _, _, err = initiatorChan.CreateCloseProposal( increasedFee, dummyDeliveryScript, respDeliveryScript, ) if err != nil { @@ -376,7 +376,7 @@ func TestPeerChannelClosureFeeNegotiationsResponder(t *testing.T) { // Finally, we'll accept the fee by echoing back the same fee that they // sent to us. - initiatorSig, err = initiatorChan.CreateCloseProposal( + initiatorSig, _, _, err = initiatorChan.CreateCloseProposal( peerFee, dummyDeliveryScript, respDeliveryScript, ) if err != nil { @@ -471,7 +471,7 @@ func TestPeerChannelClosureFeeNegotiationsInitiator(t *testing.T) { uint64(initiatorIdealFeeRate * 1000), ) increasedFee := btcutil.Amount(float64(initiatorIdealFee) * 2.5) - closeSig, err := responderChan.CreateCloseProposal( + closeSig, _, _, err := responderChan.CreateCloseProposal( increasedFee, dummyDeliveryScript, initiatorDeliveryScript, ) if err != nil { @@ -536,7 +536,7 @@ func TestPeerChannelClosureFeeNegotiationsInitiator(t *testing.T) { // We try negotiating a 2.1x fee, which should also be rejected. increasedFee = btcutil.Amount(float64(initiatorIdealFee) * 2.1) - responderSig, err := responderChan.CreateCloseProposal( + responderSig, _, _, err := responderChan.CreateCloseProposal( increasedFee, dummyDeliveryScript, initiatorDeliveryScript, ) if err != nil { @@ -582,7 +582,7 @@ func TestPeerChannelClosureFeeNegotiationsInitiator(t *testing.T) { // At this point, we'll accept their fee by sending back a CloseSigned // message with an identical fee. - responderSig, err = responderChan.CreateCloseProposal( + responderSig, _, _, err = responderChan.CreateCloseProposal( peerFee, dummyDeliveryScript, initiatorDeliveryScript, ) if err != nil {