diff --git a/channeldb/channel.go b/channeldb/channel.go index 56278e17..a04146ea 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -789,6 +789,12 @@ const ( // BreachClose indicates that one peer attempted to broadcast a prior // _revoked_ channel state. BreachClose + + // FundingCanceled indicates that the channel never was fully opened before it + // was marked as closed in the database. This can happen if we or the remote + // fail at some point during the opening workflow, or we timeout waiting for + // the funding transaction to be confirmed. + FundingCanceled ) // ChannelCloseSummary contains the final state of a channel at the point it diff --git a/fundingmanager.go b/fundingmanager.go index dfed0f2c..94877782 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -3,12 +3,14 @@ package main import ( "bytes" "encoding/binary" + "fmt" "sync" "sync/atomic" "time" "golang.org/x/crypto/salsa20" + "github.com/boltdb/bolt" "github.com/davecgh/go-spew/spew" "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/chainntnfs" @@ -38,6 +40,11 @@ const ( // // TODO(roasbeef): add command line param to modify maxFundingAmount = btcutil.Amount(1 << 24) + + // maxWaitNumBlocksFundingConf is the maximum number of blocks to wait + // for the funding transaction to be confirmed before forgetting about + // the channel. 288 blocks is ~48 hrs + maxWaitNumBlocksFundingConf = 288 ) // reservationWithCtx encapsulates a pending channel reservation. This wrapper @@ -283,6 +290,35 @@ type fundingManager struct { wg sync.WaitGroup } +// channelOpeningState represents the different states a channel can be in +// between the funding transaction has been confirmed and the channel is +// announced to the network and ready to be used. +type channelOpeningState uint8 + +const ( + // markedOpen is the opening state of a channel if the funding + // transaction is confirmed on-chain, but fundingLocked is not yet + // successfully sent to the other peer. + markedOpen channelOpeningState = iota + + // fundingLockedSent is the opening state of a channel if the + // fundingLocked message has successfully been sent to the other peer, + // but we still haven't announced the channel to the network. + fundingLockedSent +) + +var ( + // channelOpeningStateBucket is the database bucket used to store the + // channelOpeningState for each channel that is currently in the process + // of being opened. + channelOpeningStateBucket = []byte("channelOpeningState") + + // ErrChannelNotFound is returned when we are looking for a specific + // channel opening state in the FundingManager's internal database, but + // the channel in question is not considered being in an opening state. + ErrChannelNotFound = fmt.Errorf("channel not found in db") +) + // newFundingManager creates and initializes a new instance of the // fundingManager. func newFundingManager(cfg fundingConfig) (*fundingManager, error) { @@ -335,7 +371,101 @@ func (f *fundingManager) Start() error { f.localDiscoverySignals[chanID] = make(chan struct{}) doneChan := make(chan struct{}) - go f.waitForFundingConfirmation(channel, doneChan) + timeoutChan := make(chan struct{}) + + go func(ch *channeldb.OpenChannel) { + go f.waitForFundingWithTimeout(ch, doneChan, timeoutChan) + + select { + case <-timeoutChan: + // Timeout waiting for the funding transaction + // to confirm, so we forget the channel and + // delete it from the database. + closeInfo := &channeldb.ChannelCloseSummary{ + ChanPoint: ch.FundingOutpoint, + RemotePub: ch.IdentityPub, + CloseType: channeldb.FundingCanceled, + } + if err := ch.CloseChannel(closeInfo); err != nil { + fndgLog.Errorf("Failed closing channel "+ + "%v: %v", ch.FundingOutpoint, err) + } + case <-f.quit: + // The fundingManager is shutting down, and will + // resume wait on startup. + case <-doneChan: + // Success, funding transaction was confirmed. + } + }(channel) + } + + // Fetch all our open channels, and make sure they all finalized the + // opening process. + // TODO(halseth): this check is only done on restart atm, but should + // also be done if a peer that disappeared during the opening process + // reconnects. + openChannels, err := f.cfg.Wallet.Cfg.Database.FetchAllChannels() + if err != nil { + return err + } + + for _, channel := range openChannels { + channelState, shortChanID, err := f.getChannelOpeningState( + &channel.FundingOutpoint) + if err == ErrChannelNotFound { + // Channel not in fundingManager's opening database, + // meaning it was successully announced to the network. + continue + } else if err != nil { + return err + } + + fndgLog.Debugf("channel with opening state %v found", + channelState) + + chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint) + f.localDiscoverySignals[chanID] = make(chan struct{}) + + // If we did find the channel in the opening state database, we + // have seen the funding transaction being confirmed, but we + // did not finish the rest of the setup procedure before we shut + // down. We handle the remaining steps of this setup by + // continuing the procedure where we left off. + switch channelState { + case markedOpen: + // The funding transaction was confirmed, but we did not + // successfully send the fundingLocked message to the + // peer, so let's do that now. + f.wg.Add(1) + go func() { + defer f.wg.Done() + f.sendFundingLockedAndAnnounceChannel(channel, + shortChanID) + }() + + case fundingLockedSent: + // fundingLocked was sent to peer, but the channel + // announcement was not sent. + f.wg.Add(1) + go func() { + defer f.wg.Done() + + lnChannel, err := lnwallet.NewLightningChannel( + nil, nil, f.cfg.FeeEstimator, channel) + if err != nil { + fndgLog.Errorf("error creating "+ + "lightning channel: %v", err) + } + defer lnChannel.Stop() + + f.sendChannelAnnouncement(channel, lnChannel, + shortChanID) + }() + + default: + fndgLog.Errorf("undefined channelState: %v", + channelState) + } } f.wg.Add(1) // TODO(roasbeef): tune @@ -877,6 +1007,8 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { // With all the necessary data available, attempt to advance the // funding workflow to the next stage. If this succeeds then the // funding transaction will broadcast after our next message. + // CompleteReservationSingle will also mark the channel as 'IsPending' + // in the database. commitSig := fmsg.msg.CommitSig.Serialize() completeChan, err := resCtx.reservation.CompleteReservationSingle( &fundingOut, commitSig) @@ -887,6 +1019,22 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { return } + // If something goes wrong before the funding transaction is confirmed, + // we use this convenience method to delete the pending OpenChannel + // from the database. + deleteFromDatabase := func() { + closeInfo := &channeldb.ChannelCloseSummary{ + ChanPoint: completeChan.FundingOutpoint, + RemotePub: completeChan.IdentityPub, + CloseType: channeldb.FundingCanceled, + } + + if err := completeChan.CloseChannel(closeInfo); err != nil { + fndgLog.Errorf("Failed closing channel %v: %v", + completeChan.FundingOutpoint, err) + } + } + // A new channel has almost finished the funding process. In order to // properly synchronize with the writeHandler goroutine, we add a new // channel to the barriers map which will be closed once the channel is @@ -910,6 +1058,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { if err != nil { fndgLog.Errorf("unable to parse signature: %v", err) cancelReservation() + deleteFromDatabase() return } @@ -920,6 +1069,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { if err := f.cfg.SendToPeer(peerKey, fundingSigned); err != nil { fndgLog.Errorf("unable to send FundingSigned message: %v", err) cancelReservation() + deleteFromDatabase() return } @@ -933,12 +1083,37 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { // With this last message, our job as the responder is now complete. // We'll wait for the funding transaction to reach the specified number // of confirmations, then start normal operations. + // + // When we get to this point we have sent the signComplete message to + // the channel funder, and BOLT#2 specifies that we MUST remember the + // channel for reconnection. The channel is already marked + // as pending in the database, so in case of a disconnect or restart, + // we will continue waiting for the confirmation the next time we start + // the funding manager. In case the funding transaction never appears + // on the blockchain, we must forget this channel. We therefore + // completely forget about this channel if we haven't seen the funding + // transaction in 288 blocks (~ 48 hrs), by canceling the reservation + // and canceling the wait for the funding confirmation. go func() { doneChan := make(chan struct{}) - go f.waitForFundingConfirmation(completeChan, doneChan) + timeoutChan := make(chan struct{}) + go f.waitForFundingWithTimeout(completeChan, doneChan, + timeoutChan) - <-doneChan - f.deleteReservationCtx(peerKey, fmsg.msg.PendingChannelID) + select { + case <-timeoutChan: + // We did not see the funding confirmation before + // timeout, so we forget the channel. + cancelReservation() + deleteFromDatabase() + case <-f.quit: + // The fundingManager is shutting down, will resume + // wait for funding transaction on startup. + case <-doneChan: + // Success, funding transaction was confirmed. + f.deleteReservationCtx(peerKey, + fmsg.msg.PendingChannelID) + } }() } @@ -1021,7 +1196,18 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { go func() { doneChan := make(chan struct{}) - go f.waitForFundingConfirmation(completeChan, doneChan) + cancelChan := make(chan struct{}) + + // In case the fundingManager is stopped at some point during + // the remaining part of the opening process, we must wait for + // this process to finish (either successully or with some + // error), before the fundingManager can be shut down. + f.wg.Add(1) + go func() { + defer f.wg.Done() + f.waitForFundingConfirmation(completeChan, cancelChan, + doneChan) + }() select { case <-f.quit: @@ -1047,13 +1233,75 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { }() } +// waitForFundingWithTimeout is a wrapper around waitForFundingConfirmation that +// will cancel the wait for confirmation if maxWaitNumBlocksFundingConf has +// passed from bestHeight. In the case of timeout, the timeoutChan will be +// closed. In case of confirmation or error, doneChan will be closed. +func (f *fundingManager) waitForFundingWithTimeout(completeChan *channeldb.OpenChannel, + doneChan chan<- struct{}, timeoutChan chan<- struct{}) { + + epochClient, err := f.cfg.Notifier.RegisterBlockEpochNtfn() + if err != nil { + fndgLog.Errorf("unable to register for epoch notification: %v", + err) + close(doneChan) + return + } + + waitingDoneChan := make(chan struct{}) + cancelChan := make(chan struct{}) + + // Add this goroutine to wait group so we can be sure that it is + // properly stopped before the funding manager can be shut down. + f.wg.Add(1) + go func() { + defer f.wg.Done() + f.waitForFundingConfirmation(completeChan, cancelChan, + waitingDoneChan) + }() + + // On block maxHeight we will cancel the funding confirmation wait. + maxHeight := completeChan.FundingBroadcastHeight + maxWaitNumBlocksFundingConf + for { + select { + case epoch, ok := <-epochClient.Epochs: + if !ok { + fndgLog.Warnf("Epoch client shutting down") + return + } + + if uint32(epoch.Height) >= maxHeight { + fndgLog.Warnf("waited for %v blocks without "+ + "seeing funding transaction confirmed,"+ + " cancelling.", maxWaitNumBlocksFundingConf) + + // Cancel the waitForFundingConfirmation + // goroutine. + close(cancelChan) + + // Notify the caller of the timeout. + close(timeoutChan) + return + } + case <-f.quit: + // The fundingManager is shutting down, will resume + // waiting for the funding transaction on startup. + return + case <-waitingDoneChan: + close(doneChan) + return + } + } +} + // waitForFundingConfirmation handles the final stages of the channel funding // process once the funding transaction has been broadcast. The primary // function of waitForFundingConfirmation is to wait for blockchain // confirmation, and then to notify the other systems that must be notified // when a channel has become active for lightning transactions. +// The wait can be canceled by closing the cancelChan. func (f *fundingManager) waitForFundingConfirmation(completeChan *channeldb.OpenChannel, - doneChan chan struct{}) { + cancelChan <-chan struct{}, doneChan chan<- struct{}) { defer close(doneChan) @@ -1072,9 +1320,25 @@ func (f *fundingManager) waitForFundingConfirmation(completeChan *channeldb.Open fndgLog.Infof("Waiting for funding tx (%v) to reach %v confirmations", txid, numConfs) + var confDetails *chainntnfs.TxConfirmation + var ok bool + // Wait until the specified number of confirmations has been reached, - // or the wallet signals a shutdown. - confDetails, ok := <-confNtfn.Confirmed + // we get a cancel signal, or the wallet signals a shutdown. + select { + case confDetails, ok = <-confNtfn.Confirmed: + // fallthrough + case <-cancelChan: + fndgLog.Warnf("canceled waiting for funding confirmation, "+ + "stopping funding flow for ChannelPoint(%v)", + completeChan.FundingOutpoint) + return + case <-f.quit: + fndgLog.Warnf("fundingManager shutting down, stopping funding "+ + "flow for ChannelPoint(%v)", completeChan.FundingOutpoint) + return + } + if !ok { fndgLog.Warnf("ChainNotifier shutting down, cannot complete "+ "funding flow for ChannelPoint(%v)", @@ -1110,6 +1374,35 @@ func (f *fundingManager) waitForFundingConfirmation(completeChan *channeldb.Open // TODO(roasbeef): ideally persistent state update for chan above // should be abstracted + // The funding transaction now being confirmed, we add this channel to + // the fundingManager's internal persistant state machine that we use + // to track the remaining process of the channel opening. This is useful + // to resume the opening process in case of restarts. + // + // TODO(halseth): make the two db transactions (MarkChannelAsOpen and + // saveChannelOpeningState) atomic by doing them in the same transaction. + // Needed to be properly fault-tolerant. + err = f.saveChannelOpeningState(&completeChan.FundingOutpoint, markedOpen, + &shortChanID) + if err != nil { + fndgLog.Errorf("error setting channel state to markedOpen: %v", + err) + return + } + + // Now that the funding transaction has the required number of + // confirmations, we send the fundingLocked message to the peer. + f.sendFundingLockedAndAnnounceChannel(completeChan, &shortChanID) +} + +// sendFundingLockedAndAnnounceChannel creates and sends the fundingLocked +// message, and then the channel announcement. This should be called after the +// funding transaction has been confirmed, and the channelState is 'markedOpen'. +func (f *fundingManager) sendFundingLockedAndAnnounceChannel( + completeChan *channeldb.OpenChannel, shortChanID *lnwire.ShortChannelID) { + + chanID := lnwire.NewChanIDFromOutPoint(&completeChan.FundingOutpoint) + // With the channel marked open, we'll create the state-machine object // which wraps the database state. channel, err := lnwallet.NewLightningChannel(nil, nil, @@ -1130,16 +1423,60 @@ func (f *fundingManager) waitForFundingConfirmation(completeChan *channeldb.Open return } fundingLockedMsg := lnwire.NewFundingLocked(chanID, nextRevocation) - f.cfg.SendToPeer(completeChan.IdentityPub, fundingLockedMsg) - fndgLog.Infof("Announcing ChannelPoint(%v), short_chan_id=%v", fundingPoint, - spew.Sdump(shortChanID)) + err = f.cfg.SendToPeer(completeChan.IdentityPub, fundingLockedMsg) + if err != nil { + fndgLog.Errorf("unable to send fundingLocked to peer: %v", err) + return + } + + // As the fundingLocked message is now sent to the peer, the channel is + // moved to the next state of the state machine. It will be moved to the + // last state (actually deleted from the database) after the channel is + // finally announced. + err = f.saveChannelOpeningState(&completeChan.FundingOutpoint, fundingLockedSent, + shortChanID) + if err != nil { + fndgLog.Errorf("error setting channel state to "+ + "fundingLockedSent: %v", err) + return + } + + f.sendChannelAnnouncement(completeChan, channel, shortChanID) +} + +// sendChannelAnnouncement broadcast the neccessary channel announcement +// messages to the network. Should be called after the fundingLocked message is +// sent (channelState is 'fundingLockedSent') and the channel is ready to be +// used. +func (f *fundingManager) sendChannelAnnouncement(completeChan *channeldb.OpenChannel, + channel *lnwallet.LightningChannel, shortChanID *lnwire.ShortChannelID) { + + chanID := lnwire.NewChanIDFromOutPoint(&completeChan.FundingOutpoint) + fundingPoint := completeChan.FundingOutpoint + + fndgLog.Infof("Announcing ChannelPoint(%v), short_chan_id=%v", + &fundingPoint, spew.Sdump(shortChanID)) // Register the new link with the L3 routing manager so this new // channel can be utilized during path finding. - go f.announceChannel(f.cfg.IDKey, completeChan.IdentityPub, + err := f.announceChannel(f.cfg.IDKey, completeChan.IdentityPub, channel.LocalFundingKey, channel.RemoteFundingKey, - shortChanID, chanID) + *shortChanID, chanID) + if err != nil { + fndgLog.Errorf("channel announcement failed: %v", err) + return + } + + // After the channel is successully announced from the fundingManager, + // we delete the channel from our internal database. We can do this + // because we assume the AuthenticatedGossiper queues the announcement + // messages, and persists them in case of a daemon shutdown. + err = f.deleteChannelOpeningState(&completeChan.FundingOutpoint) + if err != nil { + fndgLog.Errorf("error deleting channel state: %v", err) + return + } // Finally, as the local channel discovery has been fully processed, // we'll trigger the signal indicating that it's safe for any funding @@ -1380,9 +1717,11 @@ func (f *fundingManager) newChanAnnouncement(localPubKey, remotePubKey *btcec.Pu // the network to recognize the legitimacy of the channel. The crafted // announcements are then sent to the channel router to handle broadcasting to // the network during its next trickle. +// This method is synchronous and will return when all the network requests +// finish, either successfully or with an error. func (f *fundingManager) announceChannel(localIDKey, remoteIDKey, localFundingKey, remoteFundingKey *btcec.PublicKey, shortChanID lnwire.ShortChannelID, - chanID lnwire.ChannelID) { + chanID lnwire.ChannelID) error { // First, we'll create the batch of announcements to be sent upon // initial channel creation. This includes the channel announcement @@ -1392,7 +1731,7 @@ func (f *fundingManager) announceChannel(localIDKey, remoteIDKey, localFundingKe localFundingKey, remoteFundingKey, shortChanID, chanID) if err != nil { fndgLog.Errorf("can't generate channel announcement: %v", err) - return + return err } // With the announcements crafted, we'll now send the announcements to @@ -1400,9 +1739,21 @@ func (f *fundingManager) announceChannel(localIDKey, remoteIDKey, localFundingKe // // TODO(roasbeef): add flag that indicates if should be announced or // not - f.cfg.SendAnnouncement(ann.chanAnn) - f.cfg.SendAnnouncement(ann.chanUpdateAnn) - f.cfg.SendAnnouncement(ann.chanProof) + + // The announcement message consists of three distinct messages: + // 1. channel announcement 2. channel update 3. channel proof + // We must wait for them all to be successfully announced to the + // network, and/ if either fails we consider the announcement + // unsuccessful. + if err = f.cfg.SendAnnouncement(ann.chanAnn); err != nil { + return err + } + if err = f.cfg.SendAnnouncement(ann.chanUpdateAnn); err != nil { + return err + } + if err = f.cfg.SendAnnouncement(ann.chanProof); err != nil { + return err + } // Now that the channel is announced to the network, we will also // obtain and send a node announcement. This is done since a node @@ -1411,9 +1762,13 @@ func (f *fundingManager) announceChannel(localIDKey, remoteIDKey, localFundingKe nodeAnn, err := f.cfg.CurrentNodeAnnouncement() if err != nil { fndgLog.Errorf("can't generate node announcement: %v", err) - return + return err } - f.cfg.SendAnnouncement(&nodeAnn) + + if err = f.cfg.SendAnnouncement(&nodeAnn); err != nil { + return err + } + return nil } // initFundingWorkflow sends a message to the funding manager instructing it @@ -1657,3 +2012,90 @@ func copyPubKey(pub *btcec.PublicKey) *btcec.PublicKey { Y: pub.Y, } } + +// saveChannelOpeningState saves the channelOpeningState for the provided +// chanPoint to the channelOpeningStateBucket. +func (f *fundingManager) saveChannelOpeningState(chanPoint *wire.OutPoint, + state channelOpeningState, shortChanID *lnwire.ShortChannelID) error { + return f.cfg.Wallet.Cfg.Database.Update(func(tx *bolt.Tx) error { + + bucket, err := tx.CreateBucketIfNotExists(channelOpeningStateBucket) + if err != nil { + return err + } + + var outpointBytes bytes.Buffer + if err = writeOutpoint(&outpointBytes, chanPoint); err != nil { + return err + } + + // Save state and the uint64 representation of the shortChanID + // for later use. + scratch := make([]byte, 10) + byteOrder.PutUint16(scratch[:2], uint16(state)) + byteOrder.PutUint64(scratch[2:], shortChanID.ToUint64()) + + if err = bucket.Put(outpointBytes.Bytes(), scratch); err != nil { + return err + } + return nil + }) +} + +// getChannelOpeningState fetches the channelOpeningState for the provided +// chanPoint from the database, or returns ErrChannelNotFound if the channel +// is not found. +func (f *fundingManager) getChannelOpeningState(chanPoint *wire.OutPoint) ( + channelOpeningState, *lnwire.ShortChannelID, error) { + + var state channelOpeningState + var shortChanID lnwire.ShortChannelID + err := f.cfg.Wallet.Cfg.Database.View(func(tx *bolt.Tx) error { + + bucket := tx.Bucket(channelOpeningStateBucket) + if bucket == nil { + // If the bucket does not exist, it means we never added + // a channel to the db, so return ErrChannelNotFound. + return ErrChannelNotFound + } + + var outpointBytes bytes.Buffer + if err := writeOutpoint(&outpointBytes, chanPoint); err != nil { + return err + } + + value := bucket.Get(outpointBytes.Bytes()) + if value == nil { + return ErrChannelNotFound + } + + state = channelOpeningState(byteOrder.Uint16(value[:2])) + shortChanID = lnwire.NewShortChanIDFromInt(byteOrder.Uint64(value[2:])) + return nil + }) + if err != nil { + return 0, nil, err + } + + return state, &shortChanID, nil +} + +// deleteChannelOpeningState removes any state for chanPoint from the database. +func (f *fundingManager) deleteChannelOpeningState(chanPoint *wire.OutPoint) error { + return f.cfg.Wallet.Cfg.Database.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket(channelOpeningStateBucket) + if bucket == nil { + return fmt.Errorf("Bucket not found") + } + + var outpointBytes bytes.Buffer + if err := writeOutpoint(&outpointBytes, chanPoint); err != nil { + return err + } + + if err := bucket.Delete(outpointBytes.Bytes()); err != nil { + return err + } + return nil + }) +} diff --git a/fundingmanager_test.go b/fundingmanager_test.go new file mode 100644 index 00000000..06f3eb93 --- /dev/null +++ b/fundingmanager_test.go @@ -0,0 +1,1155 @@ +package main + +import ( + "fmt" + "io/ioutil" + "net" + "os" + "path/filepath" + "testing" + "time" + + "github.com/btcsuite/btclog" + "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/lnwallet" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcd/chaincfg" + "github.com/roasbeef/btcd/chaincfg/chainhash" + _ "github.com/roasbeef/btcwallet/walletdb/bdb" + + "github.com/roasbeef/btcd/btcec" + "github.com/roasbeef/btcd/txscript" + "github.com/roasbeef/btcd/wire" + "github.com/roasbeef/btcutil" +) + +// The block height returned by the mock BlockChainIO's GetBestBlock. +const fundingBroadcastHeight = 123 + +var ( + privPass = []byte("dummy-pass") + + // Use hard-coded keys for Alice and Bob, the two FundingManagers that + // we will test the interaction between. + alicePrivKeyBytes = [32]byte{ + 0xb7, 0x94, 0x38, 0x5f, 0x2d, 0x1e, 0xf7, 0xab, + 0x4d, 0x92, 0x73, 0xd1, 0x90, 0x63, 0x81, 0xb4, + 0x4f, 0x2f, 0x6f, 0x25, 0x88, 0xa3, 0xef, 0xb9, + 0x6a, 0x49, 0x18, 0x83, 0x31, 0x98, 0x47, 0x53, + } + + alicePrivKey, alicePubKey = btcec.PrivKeyFromBytes(btcec.S256(), + alicePrivKeyBytes[:]) + + aliceTCPAddr, _ = net.ResolveTCPAddr("tcp", "10.0.0.2:9001") + + aliceAddr = &lnwire.NetAddress{ + IdentityKey: alicePubKey, + Address: aliceTCPAddr, + } + + bobPrivKeyBytes = [32]byte{ + 0x81, 0xb6, 0x37, 0xd8, 0xfc, 0xd2, 0xc6, 0xda, + 0x63, 0x59, 0xe6, 0x96, 0x31, 0x13, 0xa1, 0x17, + 0xd, 0xe7, 0x95, 0xe4, 0xb7, 0x25, 0xb8, 0x4d, + 0x1e, 0xb, 0x4c, 0xfd, 0x9e, 0xc5, 0x8c, 0xe9, + } + + bobPrivKey, bobPubKey = btcec.PrivKeyFromBytes(btcec.S256(), + bobPrivKeyBytes[:]) + + bobTCPAddr, _ = net.ResolveTCPAddr("tcp", "10.0.0.2:9000") + + bobAddr = &lnwire.NetAddress{ + IdentityKey: bobPubKey, + Address: bobTCPAddr, + } +) + +// mockWalletController is used by the LightningWallet, and let us mock the +// interaction with the bitcoin network. +type mockWalletController struct { + rootKey *btcec.PrivateKey + prevAddres btcutil.Address + publishedTransactions chan *wire.MsgTx +} + +// FetchInputInfo will be called to get info about the inputs to the funding +// transaction. +func (*mockWalletController) FetchInputInfo( + prevOut *wire.OutPoint) (*wire.TxOut, error) { + txOut := &wire.TxOut{ + Value: int64(10 * btcutil.SatoshiPerBitcoin), + PkScript: []byte("dummy"), + } + return txOut, nil +} +func (*mockWalletController) ConfirmedBalance(confs int32, + witness bool) (btcutil.Amount, error) { + return 0, nil +} + +// NewAddress is called to get new addresses for delivery, change etc. +func (m *mockWalletController) NewAddress(addrType lnwallet.AddressType, + change bool) (btcutil.Address, error) { + addr, _ := btcutil.NewAddressPubKey( + m.rootKey.PubKey().SerializeCompressed(), &chaincfg.MainNetParams) + return addr, nil +} +func (*mockWalletController) GetPrivKey(a btcutil.Address) (*btcec.PrivateKey, error) { + return nil, nil +} + +// NewRawKey will be called to get keys to be used for the funding tx and the +// commitment tx. +func (m *mockWalletController) NewRawKey() (*btcec.PublicKey, error) { + return m.rootKey.PubKey(), nil +} + +// FetchRootKey will be called to provide the wallet with a root key. +func (m *mockWalletController) FetchRootKey() (*btcec.PrivateKey, error) { + return m.rootKey, nil +} +func (*mockWalletController) SendOutputs(outputs []*wire.TxOut) (*chainhash.Hash, error) { + return nil, nil +} + +// ListUnspentWitness is called by the wallet when doing coin selection. We just +// need one unspent for the funding transaction. +func (*mockWalletController) ListUnspentWitness(confirms int32) ([]*lnwallet.Utxo, error) { + utxo := &lnwallet.Utxo{ + Value: btcutil.Amount(10 * btcutil.SatoshiPerBitcoin), + OutPoint: wire.OutPoint{ + Hash: chainhash.Hash{}, + Index: 0, + }, + } + var ret []*lnwallet.Utxo + ret = append(ret, utxo) + return ret, nil +} +func (*mockWalletController) ListTransactionDetails() ([]*lnwallet.TransactionDetail, error) { + return nil, nil +} +func (*mockWalletController) LockOutpoint(o wire.OutPoint) {} +func (*mockWalletController) UnlockOutpoint(o wire.OutPoint) {} +func (m *mockWalletController) PublishTransaction(tx *wire.MsgTx) error { + m.publishedTransactions <- tx + return nil +} +func (*mockWalletController) SubscribeTransactions() (lnwallet.TransactionSubscription, error) { + return nil, nil +} +func (*mockWalletController) IsSynced() (bool, error) { + return true, nil +} +func (*mockWalletController) Start() error { + return nil +} +func (*mockWalletController) Stop() error { + return nil +} + +type mockSigner struct { + key *btcec.PrivateKey +} + +func (m *mockSigner) SignOutputRaw(tx *wire.MsgTx, + signDesc *lnwallet.SignDescriptor) ([]byte, error) { + amt := signDesc.Output.Value + witnessScript := signDesc.WitnessScript + privKey := m.key + + sig, err := txscript.RawTxInWitnessSignature(tx, signDesc.SigHashes, + signDesc.InputIndex, amt, witnessScript, txscript.SigHashAll, + privKey) + if err != nil { + return nil, err + } + + return sig[:len(sig)-1], nil +} + +func (m *mockSigner) ComputeInputScript(tx *wire.MsgTx, + signDesc *lnwallet.SignDescriptor) (*lnwallet.InputScript, error) { + witnessScript, err := txscript.WitnessScript(tx, signDesc.SigHashes, + signDesc.InputIndex, signDesc.Output.Value, + signDesc.Output.PkScript, txscript.SigHashAll, m.key, true) + if err != nil { + return nil, err + } + + return &lnwallet.InputScript{ + Witness: witnessScript, + }, nil +} + +type mockChainIO struct{} + +func (*mockChainIO) GetBestBlock() (*chainhash.Hash, int32, error) { + return activeNetParams.GenesisHash, fundingBroadcastHeight, nil +} + +func (*mockChainIO) GetUtxo(op *wire.OutPoint, + heightHint uint32) (*wire.TxOut, error) { + return nil, nil +} + +func (*mockChainIO) GetBlockHash(blockHeight int64) (*chainhash.Hash, error) { + return nil, nil +} + +func (*mockChainIO) GetBlock(blockHash *chainhash.Hash) (*wire.MsgBlock, error) { + return nil, nil +} + +type mockNotifier struct { + confChannel chan *chainntnfs.TxConfirmation + epochChan chan *chainntnfs.BlockEpoch +} + +func (m *mockNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, numConfs, + heightHint uint32) (*chainntnfs.ConfirmationEvent, error) { + return &chainntnfs.ConfirmationEvent{ + Confirmed: m.confChannel, + }, nil +} +func (m *mockNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { + return &chainntnfs.BlockEpochEvent{ + Epochs: m.epochChan, + Cancel: func() {}, + }, nil +} + +func (m *mockNotifier) Start() error { + return nil +} + +func (m *mockNotifier) Stop() error { + return nil +} +func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, + heightHint uint32) (*chainntnfs.SpendEvent, error) { + return &chainntnfs.SpendEvent{ + Spend: make(chan *chainntnfs.SpendDetail), + Cancel: func() {}, + }, nil +} + +type testNode struct { + privKey *btcec.PrivateKey + msgChan chan lnwire.Message + announceChan chan lnwire.Message + publTxChan chan *wire.MsgTx + fundingMgr *fundingManager + mockNotifier *mockNotifier + testDir string +} + +func disableLogger(t *testing.T) { + channeldb.UseLogger(btclog.Disabled) + lnwallet.UseLogger(btclog.Disabled) + fndgLog = btclog.Disabled +} + +func createTestWallet(tempTestDir string, netParams *chaincfg.Params, + notifier chainntnfs.ChainNotifier, wc lnwallet.WalletController, + signer lnwallet.Signer, bio lnwallet.BlockChainIO, + estimator lnwallet.FeeEstimator) (*lnwallet.LightningWallet, error) { + + dbDir := filepath.Join(tempTestDir, "cdb") + cdb, err := channeldb.Open(dbDir) + if err != nil { + return nil, err + } + + wallet, err := lnwallet.NewLightningWallet(lnwallet.Config{ + Database: cdb, + Notifier: notifier, + WalletController: wc, + Signer: signer, + ChainIO: bio, + FeeEstimator: estimator, + NetParams: *netParams, + }) + if err != nil { + return nil, err + } + + if err := wallet.Startup(); err != nil { + return nil, err + } + + return wallet, nil +} + +func createTestFundingManager(t *testing.T, pubKey *btcec.PublicKey, + tempTestDir string, hdSeed []byte, netParams *chaincfg.Params, + chainNotifier chainntnfs.ChainNotifier, estimator lnwallet.FeeEstimator, + sentMessages chan lnwire.Message, sentAnnouncements chan lnwire.Message, + publTxChan chan *wire.MsgTx, shutdownChan chan struct{}) (*fundingManager, error) { + + wc := &mockWalletController{ + rootKey: alicePrivKey, + publishedTransactions: publTxChan, + } + signer := &mockSigner{ + key: alicePrivKey, + } + bio := &mockChainIO{} + + lnw, err := createTestWallet(tempTestDir, netParams, + chainNotifier, wc, signer, bio, estimator) + if err != nil { + t.Fatalf("unable to create test ln wallet: %v", err) + } + + arbiterChan := make(chan *lnwallet.LightningChannel) + var chanIDSeed [32]byte + + f, err := newFundingManager(fundingConfig{ + IDKey: pubKey, + Wallet: lnw, + Notifier: chainNotifier, + FeeEstimator: estimator, + SignMessage: func(pubKey *btcec.PublicKey, msg []byte) (*btcec.Signature, error) { + return nil, nil + }, + SendAnnouncement: func(msg lnwire.Message) error { + select { + case sentAnnouncements <- msg: + case <-shutdownChan: + return fmt.Errorf("shutting down") + } + return nil + }, + CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) { + return lnwire.NodeAnnouncement{}, nil + }, + ArbiterChan: arbiterChan, + SendToPeer: func(target *btcec.PublicKey, msgs ...lnwire.Message) error { + select { + case sentMessages <- msgs[0]: + case <-shutdownChan: + return fmt.Errorf("shutting down") + } + return nil + }, + FindPeer: func(peerKey *btcec.PublicKey) (*peer, error) { + return nil, nil + }, + TempChanIDSeed: chanIDSeed, + FindChannel: func(chanID lnwire.ChannelID) (*lnwallet.LightningChannel, error) { + // This is not expected to be used in the current tests. + // Add an implementation if that changes. + t.Fatal("did not expect FindChannel to be called") + return nil, nil + }, + NumRequiredConfs: func(chanAmt btcutil.Amount, pushAmt btcutil.Amount) uint16 { + return uint16(cfg.DefaultNumChanConfs) + }, + RequiredRemoteDelay: func(amt btcutil.Amount) uint16 { + return 4 + }, + }) + if err != nil { + t.Fatalf("failed creating fundingManager: %v", err) + } + + return f, nil +} + +func recreateAliceFundingManager(t *testing.T, alice *testNode) { + // Stop the old fundingManager before creating a new one. + if err := alice.fundingMgr.Stop(); err != nil { + t.Fatalf("unable to stop old fundingManager: %v", err) + } + + aliceMsgChan := make(chan lnwire.Message) + aliceAnnounceChan := make(chan lnwire.Message) + + oldCfg := alice.fundingMgr.cfg + + f, err := newFundingManager(fundingConfig{ + IDKey: oldCfg.IDKey, + Wallet: oldCfg.Wallet, + Notifier: oldCfg.Notifier, + FeeEstimator: oldCfg.FeeEstimator, + SignMessage: func(pubKey *btcec.PublicKey, + msg []byte) (*btcec.Signature, error) { + return nil, nil + }, + SendAnnouncement: func(msg lnwire.Message) error { + aliceAnnounceChan <- msg + return nil + }, + CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement, error) { + return lnwire.NodeAnnouncement{}, nil + }, + ArbiterChan: oldCfg.ArbiterChan, + SendToPeer: func(target *btcec.PublicKey, + msgs ...lnwire.Message) error { + aliceMsgChan <- msgs[0] + return nil + }, + FindPeer: func(peerKey *btcec.PublicKey) (*peer, error) { + return nil, nil + }, + TempChanIDSeed: oldCfg.TempChanIDSeed, + FindChannel: oldCfg.FindChannel, + }) + if err != nil { + t.Fatalf("failed recreating aliceFundingManager: %v", err) + } + + alice.fundingMgr = f + alice.msgChan = aliceMsgChan + alice.announceChan = aliceAnnounceChan + + if err = f.Start(); err != nil { + t.Fatalf("failed starting fundingManager: %v", err) + } +} + +func setupFundingManagers(t *testing.T, shutdownChannel chan struct{}) (*testNode, *testNode) { + // We need to set the global config, as fundingManager uses + // MaxPendingChannels, and it is usually set in lndMain(). + cfg = &config{ + MaxPendingChannels: defaultMaxPendingChannels, + } + + netParams := activeNetParams.Params + estimator := lnwallet.StaticFeeEstimator{FeeRate: 250} + + aliceMockNotifier := &mockNotifier{ + confChannel: make(chan *chainntnfs.TxConfirmation, 1), + epochChan: make(chan *chainntnfs.BlockEpoch, 1), + } + + aliceTestDir, err := ioutil.TempDir("", "alicelnwallet") + if err != nil { + t.Fatalf("unable to create temp directory: %v", err) + } + + aliceMsgChan := make(chan lnwire.Message) + aliceAnnounceChan := make(chan lnwire.Message) + alicePublTxChan := make(chan *wire.MsgTx, 1) + + aliceFundingMgr, err := createTestFundingManager(t, alicePubKey, + aliceTestDir, alicePrivKeyBytes[:], netParams, aliceMockNotifier, + estimator, aliceMsgChan, aliceAnnounceChan, alicePublTxChan, + shutdownChannel) + if err != nil { + t.Fatalf("failed creating fundingManager: %v", err) + } + + if err = aliceFundingMgr.Start(); err != nil { + t.Fatalf("failed starting fundingManager: %v", err) + } + + alice := &testNode{ + privKey: alicePrivKey, + msgChan: aliceMsgChan, + announceChan: aliceAnnounceChan, + publTxChan: alicePublTxChan, + fundingMgr: aliceFundingMgr, + mockNotifier: aliceMockNotifier, + testDir: aliceTestDir, + } + + bobMockNotifier := &mockNotifier{ + confChannel: make(chan *chainntnfs.TxConfirmation, 1), + epochChan: make(chan *chainntnfs.BlockEpoch, 1), + } + + bobTestDir, err := ioutil.TempDir("", "boblnwallet") + if err != nil { + t.Fatalf("unable to create temp directory: %v", err) + } + + bobMsgChan := make(chan lnwire.Message) + bobAnnounceChan := make(chan lnwire.Message) + bobPublTxChan := make(chan *wire.MsgTx, 1) + bobFundingMgr, err := createTestFundingManager(t, bobPubKey, bobTestDir, + bobPrivKeyBytes[:], netParams, bobMockNotifier, estimator, + bobMsgChan, bobAnnounceChan, bobPublTxChan, shutdownChannel) + if err != nil { + t.Fatalf("failed creating fundingManager: %v", err) + } + + if err = bobFundingMgr.Start(); err != nil { + t.Fatalf("failed starting fundingManager: %v", err) + } + + bob := &testNode{ + privKey: bobPrivKey, + msgChan: bobMsgChan, + announceChan: bobAnnounceChan, + publTxChan: bobPublTxChan, + fundingMgr: bobFundingMgr, + mockNotifier: bobMockNotifier, + testDir: bobTestDir, + } + + return alice, bob +} + +func tearDownFundingManagers(t *testing.T, a, b *testNode, shutdownChannel chan struct{}) { + close(shutdownChannel) + + if err := a.fundingMgr.Stop(); err != nil { + t.Fatalf("unable to stop fundingManager: %v", err) + } + if err := b.fundingMgr.Stop(); err != nil { + t.Fatalf("unable to stop fundingManager: %v", err) + } + os.RemoveAll(a.testDir) + os.RemoveAll(b.testDir) +} + +// openChannel takes the funding process to the point where the funding +// transaction is confirmed on-chain. Returns the funding out point. +func openChannel(t *testing.T, alice, bob *testNode, localFundingAmt, + pushAmt btcutil.Amount, numConfs uint32, + updateChan chan *lnrpc.OpenStatusUpdate) *wire.OutPoint { + // Create a funding request and start the workflow. + errChan := make(chan error, 1) + initReq := &openChanReq{ + targetPeerID: int32(1), + targetPubkey: bob.privKey.PubKey(), + chainHash: *activeNetParams.GenesisHash, + localFundingAmt: localFundingAmt, + pushAmt: pushAmt, + updates: updateChan, + err: errChan, + } + + alice.fundingMgr.initFundingWorkflow(bobAddr, initReq) + + // Alice should have sent the OpenChannel message to Bob. + var aliceMsg lnwire.Message + select { + case aliceMsg = <-alice.msgChan: + case err := <-initReq.err: + t.Fatalf("error init funding workflow: %v", err) + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send OpenChannel message") + } + + openChannelReq, ok := aliceMsg.(*lnwire.OpenChannel) + if !ok { + errorMsg, gotError := aliceMsg.(*lnwire.Error) + if gotError { + t.Fatalf("expected OpenChannel to be sent "+ + "from bob, instead got error: (%v) %v", + errorMsg.Code, string(errorMsg.Data)) + } + t.Fatalf("expected OpenChannel to be sent from "+ + "alice, instead got %T", aliceMsg) + } + + // Let Bob handle the init message. + bob.fundingMgr.processFundingOpen(openChannelReq, aliceAddr) + + // Bob should answer with an AcceptChannel. + var bobMsg lnwire.Message + select { + case bobMsg = <-bob.msgChan: + case <-time.After(time.Second * 5): + t.Fatalf("bob did not send AcceptChannel message") + } + + acceptChannelResponse, ok := bobMsg.(*lnwire.AcceptChannel) + if !ok { + errorMsg, gotError := bobMsg.(*lnwire.Error) + if gotError { + t.Fatalf("expected AcceptChannel to be sent "+ + "from bob, instead got error: (%v) %v", + errorMsg.Code, string(errorMsg.Data)) + } + t.Fatalf("expected AcceptChannel to be sent from bob, "+ + "instead got %T", bobMsg) + } + + // Forward the response to Alice. + alice.fundingMgr.processFundingAccept(acceptChannelResponse, bobAddr) + + // Alice responds with a FundingCreated messages. + select { + case aliceMsg = <-alice.msgChan: + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send FundingCreated message") + } + fundingCreated, ok := aliceMsg.(*lnwire.FundingCreated) + if !ok { + errorMsg, gotError := aliceMsg.(*lnwire.Error) + if gotError { + t.Fatalf("expected FundingCreated to be sent "+ + "from bob, instead got error: (%v) %v", + errorMsg.Code, string(errorMsg.Data)) + } + t.Fatalf("expected FundingCreated to be sent from "+ + "alice, instead got %T", aliceMsg) + } + + // Give the message to Bob. + bob.fundingMgr.processFundingCreated(fundingCreated, aliceAddr) + + // Finally, Bob should send the FundingSigned message. + select { + case bobMsg = <-bob.msgChan: + case <-time.After(time.Second * 5): + t.Fatalf("bob did not send FundingSigned message") + } + + fundingSigned, ok := bobMsg.(*lnwire.FundingSigned) + if !ok { + errorMsg, gotError := bobMsg.(*lnwire.Error) + if gotError { + t.Fatalf("expected FundingSigned to be "+ + "sent from bob, instead got error: (%v) %v", + errorMsg.Code, string(errorMsg.Data)) + } + t.Fatalf("expected FundingSigned to be sent from "+ + "bob, instead got %T", bobMsg) + } + + // Forward the signature to Alice. + alice.fundingMgr.processFundingSigned(fundingSigned, bobAddr) + + // After Alice processes the singleFundingSignComplete message, she will + // broadcast the funding transaction to the network. We expect to get a + // channel update saying the channel is pending. + var pendingUpdate *lnrpc.OpenStatusUpdate + select { + case pendingUpdate = <-updateChan: + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send OpenStatusUpdate_ChanPending") + } + + _, ok = pendingUpdate.Update.(*lnrpc.OpenStatusUpdate_ChanPending) + if !ok { + t.Fatal("OpenStatusUpdate was not OpenStatusUpdate_ChanPending") + } + + // Get and return the transaction Alice published to the network. + var publ *wire.MsgTx + select { + case publ = <-alice.publTxChan: + case <-time.After(time.Second * 5): + t.Fatalf("alice did not publish funding tx") + } + + fundingOutPoint := &wire.OutPoint{ + Hash: publ.TxHash(), + Index: 0, + } + return fundingOutPoint +} + +func TestFundingManagerNormalWorkflow(t *testing.T) { + disableLogger(t) + + shutdownChannel := make(chan struct{}) + + alice, bob := setupFundingManagers(t, shutdownChannel) + defer tearDownFundingManagers(t, alice, bob, shutdownChannel) + + // We will consume the channel updates as we go, so no buffering is needed. + updateChan := make(chan *lnrpc.OpenStatusUpdate) + + // Run through the process of opening the channel, up until the funding + // transaction is broadcasted. + fundingOutPoint := openChannel(t, alice, bob, 500000, 0, 1, updateChan) + + // Notify that transaction was mined + alice.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{} + bob.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{} + + // Give fundingManager time to process the newly mined tx and write + //state to database. + time.Sleep(300 * time.Millisecond) + + // The funding transaction was mined, so assert that both funding + // managers now have the state of this channel 'markedOpen' in their + // internal state machine. + state, _, err := alice.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != nil { + t.Fatalf("unable to get channel state: %v", err) + } + + if state != markedOpen { + t.Fatalf("expected state to be markedOpen, was %v", state) + } + state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != nil { + t.Fatalf("unable to get channel state: %v", err) + } + + if state != markedOpen { + t.Fatalf("expected state to be markedOpen, was %v", state) + } + + // After the funding transaction is mined, Alice will send + // fundingLocked to Bob. + var fundingLockedAlice lnwire.Message + select { + case fundingLockedAlice = <-alice.msgChan: + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send fundingLocked") + } + if fundingLockedAlice.MsgType() != lnwire.MsgFundingLocked { + t.Fatalf("expected fundingLocked sent from Alice, "+ + "instead got %T", fundingLockedAlice) + } + + // And similarly Bob will send funding locked to Alice. + var fundingLockedBob lnwire.Message + select { + case fundingLockedBob = <-bob.msgChan: + case <-time.After(time.Second * 5): + t.Fatalf("bob did not send fundingLocked") + } + + if fundingLockedBob.MsgType() != lnwire.MsgFundingLocked { + t.Fatalf("expected fundingLocked sent from Bob, "+ + "instead got %T", fundingLockedBob) + } + + // Sleep to make sure database write is finished. + time.Sleep(300 * time.Millisecond) + + // Check that the state machine is updated accordingly + state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != nil { + t.Fatalf("unable to get channel state: %v", err) + } + + if state != fundingLockedSent { + t.Fatalf("expected state to be fundingLockedSent, was %v", state) + } + state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != nil { + t.Fatalf("unable to get channel state: %v", err) + } + + if state != fundingLockedSent { + t.Fatalf("expected state to be fundingLockedSent, was %v", state) + } + + // After the FundingLocked message is sent, the channel will be announced. + // A chanAnnouncement consists of three distinct messages: + // 1) ChannelAnnouncement + // 2) ChannelUpdate + // 3) AnnounceSignatures + // that will be announced in no particular order. + // A node announcement will also be sent. + announcements := make([]lnwire.Message, 4) + for i := 0; i < len(announcements); i++ { + select { + case announcements[i] = <-alice.announceChan: + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send announcement %v", i) + } + } + + gotChannelAnnouncement := false + gotChannelUpdate := false + gotAnnounceSignatures := false + gotNodeAnnouncement := false + + for _, msg := range announcements { + switch msg.(type) { + case *lnwire.ChannelAnnouncement: + gotChannelAnnouncement = true + case *lnwire.ChannelUpdate: + gotChannelUpdate = true + case *lnwire.AnnounceSignatures: + gotAnnounceSignatures = true + case *lnwire.NodeAnnouncement: + gotNodeAnnouncement = true + } + } + + if !gotChannelAnnouncement { + t.Fatalf("did not get ChannelAnnouncement from Alice") + } + if !gotChannelUpdate { + t.Fatalf("did not get ChannelUpdate from Alice") + } + if !gotAnnounceSignatures { + t.Fatalf("did not get AnnounceSignatures from Alice") + } + if !gotNodeAnnouncement { + t.Fatalf("did not get NodeAnnouncement from Alice") + } + + // Do the check for Bob as well. + for i := 0; i < len(announcements); i++ { + select { + case announcements[i] = <-bob.announceChan: + case <-time.After(time.Second * 5): + t.Fatalf("bob did not send announcement %v", i) + } + } + + gotChannelAnnouncement = false + gotChannelUpdate = false + gotAnnounceSignatures = false + gotNodeAnnouncement = false + + for _, msg := range announcements { + switch msg.(type) { + case *lnwire.ChannelAnnouncement: + gotChannelAnnouncement = true + case *lnwire.ChannelUpdate: + gotChannelUpdate = true + case *lnwire.AnnounceSignatures: + gotAnnounceSignatures = true + case *lnwire.NodeAnnouncement: + gotNodeAnnouncement = true + } + } + + if !gotChannelAnnouncement { + t.Fatalf("did not get ChannelAnnouncement from Bob") + } + if !gotChannelUpdate { + t.Fatalf("did not get ChannelUpdate from Bob") + } + if !gotAnnounceSignatures { + t.Fatalf("did not get AnnounceSignatures from Bob") + } + if !gotNodeAnnouncement { + t.Fatalf("did not get NodeAnnouncement from Bob") + } + + // The funding process is now finished, wait for the + // OpenStatusUpdate_ChanOpen update + var openUpdate *lnrpc.OpenStatusUpdate + select { + case openUpdate = <-updateChan: + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send OpenStatusUpdate") + } + + _, ok := openUpdate.Update.(*lnrpc.OpenStatusUpdate_ChanOpen) + if !ok { + t.Fatal("OpenStatusUpdate was not OpenStatusUpdate_ChanOpen") + } + + // The internal state-machine should now have deleted the channelStates + // from the database, as the channel is announced. + state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != ErrChannelNotFound { + t.Fatalf("expected to not find channel state, but got: %v", state) + } + + // Need to give bob time to update database. + time.Sleep(300 * time.Millisecond) + + state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != ErrChannelNotFound { + t.Fatalf("expected to not find channel state, but got: %v", state) + } + +} + +func TestFundingManagerRestartBehavior(t *testing.T) { + disableLogger(t) + + shutdownChannel := make(chan struct{}) + + alice, bob := setupFundingManagers(t, shutdownChannel) + defer tearDownFundingManagers(t, alice, bob, shutdownChannel) + + // Run through the process of opening the channel, up until the funding + // transaction is broadcasted. + updateChan := make(chan *lnrpc.OpenStatusUpdate) + fundingOutPoint := openChannel(t, alice, bob, 500000, 0, 1, updateChan) + + // After the funding transaction gets mined, both nodes will send the + // fundingLocked message to the other peer. If the funding node fails + // before this message has been successfully sent, it should retry + // sending it on restart. We mimic this behavior by letting the + // SendToPeer method return an error, as if the message was not + // successfully sent. We then the fundingManager and make sure + // it continues the process as expected. + alice.fundingMgr.cfg.SendToPeer = func(target *btcec.PublicKey, + msgs ...lnwire.Message) error { + return fmt.Errorf("intentional error in SendToPeer") + } + + // Notify that transaction was mined + alice.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{} + bob.mockNotifier.confChannel <- &chainntnfs.TxConfirmation{} + + // Give fundingManager time to process the newly mined tx and write to + // the database. + time.Sleep(500 * time.Millisecond) + + // The funding transaction was mined, so assert that both funding + // managers now have the state of this channel 'markedOpen' in their + // internal state machine. + state, _, err := alice.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != nil { + t.Fatalf("unable to get channel state: %v", err) + } + + if state != markedOpen { + t.Fatalf("expected state to be markedOpen, was %v", state) + } + state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != nil { + t.Fatalf("unable to get channel state: %v", err) + } + + if state != markedOpen { + t.Fatalf("expected state to be markedOpen, was %v", state) + } + + // After the funding transaction was mined, Bob should have successfully + // sent the fundingLocked message, while Alice failed sending it. In + // Alice's case this means that there should be no messages for Bob, and + // the channel should still be in state 'markedOpen' + + select { + case msg := <-alice.msgChan: + t.Fatalf("did not expect any message from Alice: %v", msg) + default: + // Expected. + } + + // Bob will send funding locked to Alice + fundingLockedBob := <-bob.msgChan + if fundingLockedBob.MsgType() != lnwire.MsgFundingLocked { + t.Fatalf("expected fundingLocked sent from Bob, "+ + "instead got %T", fundingLockedBob) + } + + // Sleep to make sure database write is finished. + time.Sleep(300 * time.Millisecond) + + // Alice should still be markedOpen + state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != nil { + t.Fatalf("unable to get channel state: %v", err) + } + + if state != markedOpen { + t.Fatalf("expected state to be markedOpen, was %v", state) + } + + // While Bob successfully sent fundingLocked. + state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != nil { + t.Fatalf("unable to get channel state: %v", err) + } + + if state != fundingLockedSent { + t.Fatalf("expected state to be fundingLockedSent, was %v", state) + } + + // We now recreate Alice's fundingManager, and expect it to retry + // sending the fundingLocked message. + recreateAliceFundingManager(t, alice) + time.Sleep(300 * time.Millisecond) + + // Intetionally make the next channel announcement fail + alice.fundingMgr.cfg.SendAnnouncement = func(msg lnwire.Message) error { + return fmt.Errorf("intentional error in SendAnnouncement") + } + + fundingLockedAlice := <-alice.msgChan + if fundingLockedAlice.MsgType() != lnwire.MsgFundingLocked { + t.Fatalf("expected fundingLocked sent from Alice, "+ + "instead got %T", fundingLockedAlice) + } + + // Sleep to make sure database write is finished. + time.Sleep(500 * time.Millisecond) + + // The state should now be fundingLockedSent + state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != nil { + t.Fatalf("unable to get channel state: %v", err) + } + + if state != fundingLockedSent { + t.Fatalf("expected state to be fundingLockedSent, was %v", state) + } + + // Check that the channel announcements were never sent + select { + case ann := <-alice.announceChan: + t.Fatalf("unexpectedly got channel announcement message: %v", ann) + default: + // Expected + } + + // Bob, however, should send the announcements + announcements := make([]lnwire.Message, 4) + for i := 0; i < len(announcements); i++ { + select { + case announcements[i] = <-bob.announceChan: + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send announcement %v", i) + } + } + + gotChannelAnnouncement := false + gotChannelUpdate := false + gotAnnounceSignatures := false + gotNodeAnnouncement := false + + for _, msg := range announcements { + switch msg.(type) { + case *lnwire.ChannelAnnouncement: + gotChannelAnnouncement = true + case *lnwire.ChannelUpdate: + gotChannelUpdate = true + case *lnwire.AnnounceSignatures: + gotAnnounceSignatures = true + case *lnwire.NodeAnnouncement: + gotNodeAnnouncement = true + } + } + + if !gotChannelAnnouncement { + t.Fatalf("did not get ChannelAnnouncement from Bob") + } + if !gotChannelUpdate { + t.Fatalf("did not get ChannelUpdate from Bob") + } + if !gotAnnounceSignatures { + t.Fatalf("did not get AnnounceSignatures from Bob") + } + if !gotNodeAnnouncement { + t.Fatalf("did not get NodeAnnouncement from Bob") + } + + // Next up, we check that the Alice rebroadcasts the announcement + // messages on restart. + recreateAliceFundingManager(t, alice) + time.Sleep(300 * time.Millisecond) + for i := 0; i < len(announcements); i++ { + select { + case announcements[i] = <-alice.announceChan: + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send announcement %v", i) + } + } + + gotChannelAnnouncement = false + gotChannelUpdate = false + gotAnnounceSignatures = false + gotNodeAnnouncement = false + + for _, msg := range announcements { + switch msg.(type) { + case *lnwire.ChannelAnnouncement: + gotChannelAnnouncement = true + case *lnwire.ChannelUpdate: + gotChannelUpdate = true + case *lnwire.AnnounceSignatures: + gotAnnounceSignatures = true + case *lnwire.NodeAnnouncement: + gotNodeAnnouncement = true + } + } + + if !gotChannelAnnouncement { + t.Fatalf("did not get ChannelAnnouncement from Alice after restart") + } + if !gotChannelUpdate { + t.Fatalf("did not get ChannelUpdate from Alice after restart") + } + if !gotAnnounceSignatures { + t.Fatalf("did not get AnnounceSignatures from Alice after restart") + } + if !gotNodeAnnouncement { + t.Fatalf("did not get NodeAnnouncement from Alice after restart") + } + + // The funding process is now finished. Since we recreated the + // fundingManager, we don't have an update channel to synchronize on, + // so a small sleep makes sure the database writing is finished. + time.Sleep(300 * time.Millisecond) + + // The internal state-machine should now have deleted them from the + // internal database, as the channel is announced. + state, _, err = alice.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != ErrChannelNotFound { + t.Fatalf("expected to not find channel state, but got: %v", state) + } + + state, _, err = bob.fundingMgr.getChannelOpeningState(fundingOutPoint) + if err != ErrChannelNotFound { + t.Fatalf("expected to not find channel state, but got: %v", state) + } + +} + +func TestFundingManagerFundingTimeout(t *testing.T) { + disableLogger(t) + + shutdownChannel := make(chan struct{}) + + alice, bob := setupFundingManagers(t, shutdownChannel) + defer tearDownFundingManagers(t, alice, bob, shutdownChannel) + + // We will consume the channel updates as we go, so no buffering is needed. + updateChan := make(chan *lnrpc.OpenStatusUpdate) + + // Run through the process of opening the channel, up until the funding + // transaction is broadcasted. + _ = openChannel(t, alice, bob, 500000, 0, 1, updateChan) + + // Bob will at this point be waiting for the funding transaction to be + // confirmed, so the channel should be considered pending. + pendingChannels, err := bob.fundingMgr.cfg.Wallet.Cfg.Database.FetchPendingChannels() + if err != nil { + t.Fatalf("unable to fetch pending channels: %v", err) + } + if len(pendingChannels) != 1 { + t.Fatalf("Expected Bob to have 1 pending channel, had %v", + len(pendingChannels)) + } + + // We expect Bob to forget the channel after 288 blocks (48 hours), so + // mine 287, and check that it is still pending. + bob.mockNotifier.epochChan <- &chainntnfs.BlockEpoch{ + Height: fundingBroadcastHeight + 287, + } + + time.Sleep(300 * time.Millisecond) + + // Bob should still be waiting for the channel to open. + pendingChannels, err = bob.fundingMgr.cfg.Wallet.Cfg.Database.FetchPendingChannels() + if err != nil { + t.Fatalf("unable to fetch pending channels: %v", err) + } + if len(pendingChannels) != 1 { + t.Fatalf("Expected Bob to have 1 pending channel, had %v", + len(pendingChannels)) + } + + bob.mockNotifier.epochChan <- &chainntnfs.BlockEpoch{ + Height: fundingBroadcastHeight + 288, + } + + // It takes some time for Bob to update the database, so sleep for + // some time. + time.Sleep(300 * time.Millisecond) + + pendingChannels, err = bob.fundingMgr.cfg.Wallet.Cfg.Database.FetchPendingChannels() + if err != nil { + t.Fatalf("unable to fetch pending channels: %v", err) + } + if len(pendingChannels) != 0 { + t.Fatalf("Expected Bob to have 0 pending channel, had %v", + len(pendingChannels)) + } +} diff --git a/lnd.go b/lnd.go index f45cefd8..c9bc9234 100644 --- a/lnd.go +++ b/lnd.go @@ -140,9 +140,9 @@ func lndMain() error { return server.genNodeAnnouncement(true) }, SendAnnouncement: func(msg lnwire.Message) error { - server.discoverSrv.ProcessLocalAnnouncement(msg, + errChan := server.discoverSrv.ProcessLocalAnnouncement(msg, idPrivKey.PubKey()) - return nil + return <-errChan }, ArbiterChan: server.breachArbiter.newContracts, SendToPeer: server.sendToPeer,