diff --git a/breacharbiter.go b/breacharbiter.go
index 6db6ee0c..eb5c010d 100644
--- a/breacharbiter.go
+++ b/breacharbiter.go
@@ -26,6 +26,7 @@ type breachArbiter struct {
 	wallet     *lnwallet.LightningWallet
 	db         *channeldb.DB
 	notifier   chainntnfs.ChainNotifier
+	chainIO    lnwallet.BlockChainIO
 	htlcSwitch *htlcSwitch
 
 	// breachObservers is a map which tracks all the active breach
@@ -62,12 +63,14 @@ type breachArbiter struct {
 // newBreachArbiter creates a new instance of a breachArbiter initialized with
 // its dependent objects.
 func newBreachArbiter(wallet *lnwallet.LightningWallet, db *channeldb.DB,
-	notifier chainntnfs.ChainNotifier, h *htlcSwitch) *breachArbiter {
+	notifier chainntnfs.ChainNotifier, h *htlcSwitch,
+	chain lnwallet.BlockChainIO) *breachArbiter {
 
 	return &breachArbiter{
 		wallet:     wallet,
 		db:         db,
 		notifier:   notifier,
+		chainIO:    chain,
 		htlcSwitch: h,
 
 		breachObservers: make(map[wire.OutPoint]chan struct{}),
@@ -120,6 +123,12 @@ func (b *breachArbiter) Start() error {
 	b.wg.Add(1)
 	go b.contractObserver(channelsToWatch)
 
+	// TODO(roasbeef): instead use closure height of channel
+	_, currentHeight, err := b.chainIO.GetBestBlock()
+	if err != nil {
+		return err
+	}
+
 	// Additionally, we'll also want to retrieve any pending close or force
 	// close transactions so we can properly mark them as resolved in the
 	// database.
@@ -142,7 +151,9 @@ func (b *breachArbiter) Start() error {
 		chanPoint := &pendingClose.ChanPoint
 		closeTXID := &pendingClose.ClosingTXID
 
-		confNtfn, err := b.notifier.RegisterConfirmationsNtfn(closeTXID, 1)
+		confNtfn, err := b.notifier.RegisterConfirmationsNtfn(
+			closeTXID, 1, uint32(currentHeight),
+		)
 		if err != nil {
 			return err
 		}
@@ -208,17 +219,27 @@ func (b *breachArbiter) contractObserver(activeChannels []*lnwallet.LightningCha
 		go b.breachObserver(channel, settleSignal)
 	}
 
+	// TODO(roasbeef): need to ensure currentHeight passed in doesn't
+	// result in lost notification
+
 out:
 	for {
 		select {
 		case breachInfo := <-b.breachedContracts:
+			_, currentHeight, err := b.chainIO.GetBestBlock()
+			if err != nil {
+				brarLog.Errorf("unable to get best height: %v", err)
+			}
+
 			// A new channel contract has just been breached! We
 			// first register for a notification to be dispatched
 			// once the breach transaction (the revoked commitment
 			// transaction) has been confirmed in the chain to
 			// ensure we're not dealing with a moving target.
 			breachTXID := &breachInfo.commitHash
-			confChan, err := b.notifier.RegisterConfirmationsNtfn(breachTXID, 1)
+			confChan, err := b.notifier.RegisterConfirmationsNtfn(
+				breachTXID, 1, uint32(currentHeight),
+			)
 			if err != nil {
 				brarLog.Errorf("unable to register for conf for txid: %v",
 					breachTXID)
@@ -336,6 +357,12 @@ func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent,
 		return spew.Sdump(justiceTx)
 	}))
 
+	_, currentHeight, err := b.chainIO.GetBestBlock()
+	if err != nil {
+		brarLog.Errorf("unable to get current height: %v", err)
+		return
+	}
+
 	// Finally, broadcast the transaction, finalizing the channels'
 	// retribution against the cheating counterparty.
 	if err := b.wallet.PublishTransaction(justiceTx); err != nil {
@@ -349,7 +376,8 @@ func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent,
 	// notify the caller that initiated the retribution workflow that the
 	// deed has been done.
 	justiceTXID := justiceTx.TxHash()
-	confChan, err = b.notifier.RegisterConfirmationsNtfn(&justiceTXID, 1)
+	confChan, err = b.notifier.RegisterConfirmationsNtfn(&justiceTXID, 1,
+		uint32(currentHeight))
 	if err != nil {
 		brarLog.Errorf("unable to register for conf for txid: %v",
 			justiceTXID)
@@ -427,14 +455,16 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel,
 	//
 	// TODO(roasbeef): also notify utxoNursery, might've had
 	// outbound HTLC's in flight
-	go waitForChanToClose(b.notifier, nil, chanPoint, closeInfo.SpenderTxHash, func() {
-		brarLog.Infof("Force closed ChannelPoint(%v) is "+
-			"fully closed, updating DB", chanPoint)
-
-		if err := b.db.MarkChanFullyClosed(chanPoint); err != nil {
-			brarLog.Errorf("unable to mark chan as closed: %v", err)
-		}
-	})
+	go waitForChanToClose(uint32(closeInfo.SpendingHeight), b.notifier,
+		nil, chanPoint, closeInfo.SpenderTxHash, func() {
+
+			brarLog.Infof("Force closed ChannelPoint(%v) is "+
+				"fully closed, updating DB", chanPoint)
+
+			if err := b.db.MarkChanFullyClosed(chanPoint); err != nil {
+				brarLog.Errorf("unable to mark chan as closed: %v", err)
+			}
+		})
 
 	// A read from this channel indicates that a channel breach has been
 	// detected! So we notify the main coordination goroutine with the
diff --git a/peer.go b/peer.go
index 012fb3d1..b2902f02 100644
--- a/peer.go
+++ b/peer.go
@@ -921,30 +921,39 @@ func (p *peer) handleLocalClose(req *closeLinkReq) {
 		},
 	}
 
+	_, bestHeight, err := p.server.bio.GetBestBlock()
+	if err != nil {
+		req.err <- err
+		return
+	}
+
 	// Finally, launch a goroutine which will request to be notified by the
 	// ChainNotifier once the closure transaction obtains a single
 	// confirmation.
 	notifier := p.server.chainNotifier
-	go waitForChanToClose(notifier, req.err, req.chanPoint, closingTxid, func() {
-		// First, we'll mark the database as being fully closed so
-		// we'll no longer watch for its ultimate closure upon startup.
-		err := p.server.chanDB.MarkChanFullyClosed(req.chanPoint)
-		if err != nil {
-			req.err <- err
-			return
-		}
-
-		// Respond to the local subsystem which requested the channel
-		// closure.
-		req.updates <- &lnrpc.CloseStatusUpdate{
-			Update: &lnrpc.CloseStatusUpdate_ChanClose{
-				ChanClose: &lnrpc.ChannelCloseUpdate{
-					ClosingTxid: closingTxid[:],
-					Success:     true,
-				},
-			},
-		}
-	})
+	go waitForChanToClose(uint32(bestHeight), notifier, req.err,
+		req.chanPoint, closingTxid, func() {
+
+			// First, we'll mark the database as being fully closed
+			// so we'll no longer watch for its ultimate closure
+			// upon startup.
+			err := p.server.chanDB.MarkChanFullyClosed(req.chanPoint)
+			if err != nil {
+				req.err <- err
+				return
+			}
+
+			// Respond to the local subsystem which requested the
+			// channel closure.
+			req.updates <- &lnrpc.CloseStatusUpdate{
+				Update: &lnrpc.CloseStatusUpdate_ChanClose{
+					ChanClose: &lnrpc.ChannelCloseUpdate{
+						ClosingTxid: closingTxid[:],
+						Success:     true,
+					},
+				},
+			}
+		})
 }
 
 // handleRemoteClose completes a request for cooperative channel closure
@@ -978,6 +987,11 @@ func (p *peer) handleRemoteClose(req *lnwire.CloseRequest) {
 		newLogClosure(func() string {
 			return spew.Sdump(closeTx)
 		}))
+
+	_, bestHeight, err := p.server.bio.GetBestBlock()
+	if err != nil {
+		peerLog.Errorf("unable to get best height: %v", err)
+	}
 
 	// Finally, broadcast the closure transaction, to the network.
 	err = p.server.lnwallet.PublishTransaction(closeTx)
@@ -1022,8 +1036,8 @@ func (p *peer) handleRemoteClose(req *lnwire.CloseRequest) {
 	// confirmation of the closing transaction, and mark the channel as
 	// such within the database (once it's confirmed).
 	notifier := p.server.chainNotifier
-	go waitForChanToClose(notifier, nil, chanPoint, &closeTxid,
-		func() {
+	go waitForChanToClose(uint32(bestHeight), notifier, nil, chanPoint,
+		&closeTxid, func() {
 		// Now that the closing transaction has been confirmed,
 		// we'll mark the database as being fully closed so now
 		// that we no longer watch for its ultimate closure
@@ -1042,12 +1056,13 @@ func (p *peer) handleRemoteClose(req *lnwire.CloseRequest) {
 // following actions: the channel point will be sent over the settleChan, and
 // finally the callback will be executed. If any error is encountered within
 // the function, then it will be sent over the errChan.
-func waitForChanToClose(notifier chainntnfs.ChainNotifier,
+func waitForChanToClose(bestHeight uint32, notifier chainntnfs.ChainNotifier,
 	errChan chan error, chanPoint *wire.OutPoint,
 	closingTxID *chainhash.Hash, cb func()) {
 
 	// TODO(roasbeef): add param for num needed confs
-	confNtfn, err := notifier.RegisterConfirmationsNtfn(closingTxID, 1)
+	confNtfn, err := notifier.RegisterConfirmationsNtfn(closingTxID, 1,
+		bestHeight)
 	if err != nil && errChan != nil {
 		errChan <- err
 		return
diff --git a/rpcserver.go b/rpcserver.go
index ef80dca9..e6422a7b 100644
--- a/rpcserver.go
+++ b/rpcserver.go
@@ -502,6 +502,11 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest,
 		return err
 	}
 
+	_, bestHeight, err := r.server.bio.GetBestBlock()
+	if err != nil {
+		return err
+	}
+
 	// As we're force closing this channel, as a precaution, we'll
 	// ensure that the switch doesn't continue to see this channel
 	// as eligible for forwarding HTLC's. If the peer is online,
@@ -537,18 +542,19 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest,
 		errChan = make(chan error, 1)
 		notifier := r.server.chainNotifier
-		go waitForChanToClose(notifier, errChan, chanPoint, closingTxid, func() {
-			// Respond to the local subsystem which requested the
-			// channel closure.
-			updateChan <- &lnrpc.CloseStatusUpdate{
-				Update: &lnrpc.CloseStatusUpdate_ChanClose{
-					ChanClose: &lnrpc.ChannelCloseUpdate{
-						ClosingTxid: closingTxid[:],
-						Success:     true,
-					},
-				},
-			}
-		})
+		go waitForChanToClose(uint32(bestHeight), notifier, errChan, chanPoint,
+			closingTxid, func() {
+				// Respond to the local subsystem which
+				// requested the channel closure.
+				updateChan <- &lnrpc.CloseStatusUpdate{
+					Update: &lnrpc.CloseStatusUpdate_ChanClose{
+						ChanClose: &lnrpc.ChannelCloseUpdate{
+							ClosingTxid: closingTxid[:],
+							Success:     true,
+						},
+					},
+				}
+			})
 
 		// TODO(roasbeef): utxo nursery marks as fully closed
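
For reviewers, a minimal self-contained sketch of the calling convention this patch establishes: every RegisterConfirmationsNtfn call now carries a height hint, fetched via GetBestBlock just before registering, so the notifier can bound any historical rescan. The types below (blockChainIO, chainNotifier, waitForConf, fakeChain, fakeNotifier) are hypothetical, pared-down stand-ins for lnwallet.BlockChainIO and chainntnfs.ChainNotifier with simplified signatures, not the real lnd API; the real RegisterConfirmationsNtfn returns a *chainntnfs.ConfirmationEvent rather than a bare channel.

package main

import "fmt"

// blockChainIO is a sketch-only stand-in for the subset of
// lnwallet.BlockChainIO the patch uses.
type blockChainIO interface {
	// GetBestBlock returns the hash and height of the chain tip.
	GetBestBlock() (hash [32]byte, height int32, err error)
}

// chainNotifier is a sketch-only stand-in for chainntnfs.ChainNotifier
// after this change: the third argument is the caller's best-known
// height, used as a hint to bound the notifier's historical rescan.
type chainNotifier interface {
	RegisterConfirmationsNtfn(txid [32]byte, numConfs uint32,
		heightHint uint32) (<-chan struct{}, error)
}

// waitForConf mirrors the pattern applied at each call site in the patch:
// fetch the best height first, then pass it as the hint when registering.
func waitForConf(bio blockChainIO, notifier chainNotifier,
	txid [32]byte) error {

	_, bestHeight, err := bio.GetBestBlock()
	if err != nil {
		return fmt.Errorf("unable to get best height: %v", err)
	}

	confChan, err := notifier.RegisterConfirmationsNtfn(
		txid, 1, uint32(bestHeight),
	)
	if err != nil {
		return fmt.Errorf("unable to register for conf: %v", err)
	}

	// Block until the transaction reaches the requested depth.
	<-confChan
	return nil
}

// fakeChain and fakeNotifier are trivial in-memory fakes so the sketch runs.
type fakeChain struct{ height int32 }

func (f fakeChain) GetBestBlock() ([32]byte, int32, error) {
	return [32]byte{}, f.height, nil
}

type fakeNotifier struct{}

func (fakeNotifier) RegisterConfirmationsNtfn(txid [32]byte, numConfs uint32,
	heightHint uint32) (<-chan struct{}, error) {

	// Report confirmation immediately; a real notifier would watch the
	// chain from heightHint onward before dispatching.
	ch := make(chan struct{}, 1)
	ch <- struct{}{}
	return ch, nil
}

func main() {
	err := waitForConf(fakeChain{height: 500000}, fakeNotifier{}, [32]byte{})
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("confirmed")
}

Note the design choice this implies: the hint only narrows where a rescan may start, so a stale or zero height (as in the contractObserver path, which logs a GetBestBlock failure and proceeds) degrades to a wider rescan rather than a missed notification.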