fundingManager: persist state in opening process.

Persists the state of a channel opening process after funding
transaction is confirmed. This tracks the messages sent to
the peer such that the process can be continued in case of a
restart. Also introduces that the receiver side forgets about
channel if funding transaction is not confirmed in 48hrs.
This commit is contained in:
Johan T. Halseth 2017-06-08 19:48:07 +02:00 committed by Olaoluwa Osuntokun
parent 6858b1e1b2
commit 849d0b93b1
4 changed files with 1625 additions and 22 deletions

View File

@ -789,6 +789,12 @@ const (
// BreachClose indicates that one peer attempted to broadcast a prior
// _revoked_ channel state.
BreachClose
// FundingCanceled indicates that the channel never was fully opened before it
// was marked as closed in the database. This can happen if we or the remote
// fail at some point during the opening workflow, or we timeout waiting for
// the funding transaction to be confirmed.
FundingCanceled
)
// ChannelCloseSummary contains the final state of a channel at the point it

View File

@ -3,12 +3,14 @@ package main
import (
"bytes"
"encoding/binary"
"fmt"
"sync"
"sync/atomic"
"time"
"golang.org/x/crypto/salsa20"
"github.com/boltdb/bolt"
"github.com/davecgh/go-spew/spew"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/chainntnfs"
@ -38,6 +40,11 @@ const (
//
// TODO(roasbeef): add command line param to modify
maxFundingAmount = btcutil.Amount(1 << 24)
// maxWaitNumBlocksFundingConf is the maximum number of blocks to wait
// for the funding transaction to be confirmed before forgetting about
// the channel. 288 blocks is ~48 hrs
maxWaitNumBlocksFundingConf = 288
)
// reservationWithCtx encapsulates a pending channel reservation. This wrapper
@ -283,6 +290,35 @@ type fundingManager struct {
wg sync.WaitGroup
}
// channelOpeningState represents the different states a channel can be in
// between the funding transaction has been confirmed and the channel is
// announced to the network and ready to be used.
type channelOpeningState uint8
const (
// markedOpen is the opening state of a channel if the funding
// transaction is confirmed on-chain, but fundingLocked is not yet
// successfully sent to the other peer.
markedOpen channelOpeningState = iota
// fundingLockedSent is the opening state of a channel if the
// fundingLocked message has successfully been sent to the other peer,
// but we still haven't announced the channel to the network.
fundingLockedSent
)
var (
// channelOpeningStateBucket is the database bucket used to store the
// channelOpeningState for each channel that is currently in the process
// of being opened.
channelOpeningStateBucket = []byte("channelOpeningState")
// ErrChannelNotFound is returned when we are looking for a specific
// channel opening state in the FundingManager's internal database, but
// the channel in question is not considered being in an opening state.
ErrChannelNotFound = fmt.Errorf("channel not found in db")
)
// newFundingManager creates and initializes a new instance of the
// fundingManager.
func newFundingManager(cfg fundingConfig) (*fundingManager, error) {
@ -335,7 +371,101 @@ func (f *fundingManager) Start() error {
f.localDiscoverySignals[chanID] = make(chan struct{})
doneChan := make(chan struct{})
go f.waitForFundingConfirmation(channel, doneChan)
timeoutChan := make(chan struct{})
go func(ch *channeldb.OpenChannel) {
go f.waitForFundingWithTimeout(ch, doneChan, timeoutChan)
select {
case <-timeoutChan:
// Timeout waiting for the funding transaction
// to confirm, so we forget the channel and
// delete it from the database.
closeInfo := &channeldb.ChannelCloseSummary{
ChanPoint: ch.FundingOutpoint,
RemotePub: ch.IdentityPub,
CloseType: channeldb.FundingCanceled,
}
if err := ch.CloseChannel(closeInfo); err != nil {
fndgLog.Errorf("Failed closing channel "+
"%v: %v", ch.FundingOutpoint, err)
}
case <-f.quit:
// The fundingManager is shutting down, and will
// resume wait on startup.
case <-doneChan:
// Success, funding transaction was confirmed.
}
}(channel)
}
// Fetch all our open channels, and make sure they all finalized the
// opening process.
// TODO(halseth): this check is only done on restart atm, but should
// also be done if a peer that disappeared during the opening process
// reconnects.
openChannels, err := f.cfg.Wallet.Cfg.Database.FetchAllChannels()
if err != nil {
return err
}
for _, channel := range openChannels {
channelState, shortChanID, err := f.getChannelOpeningState(
&channel.FundingOutpoint)
if err == ErrChannelNotFound {
// Channel not in fundingManager's opening database,
// meaning it was successully announced to the network.
continue
} else if err != nil {
return err
}
fndgLog.Debugf("channel with opening state %v found",
channelState)
chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint)
f.localDiscoverySignals[chanID] = make(chan struct{})
// If we did find the channel in the opening state database, we
// have seen the funding transaction being confirmed, but we
// did not finish the rest of the setup procedure before we shut
// down. We handle the remaining steps of this setup by
// continuing the procedure where we left off.
switch channelState {
case markedOpen:
// The funding transaction was confirmed, but we did not
// successfully send the fundingLocked message to the
// peer, so let's do that now.
f.wg.Add(1)
go func() {
defer f.wg.Done()
f.sendFundingLockedAndAnnounceChannel(channel,
shortChanID)
}()
case fundingLockedSent:
// fundingLocked was sent to peer, but the channel
// announcement was not sent.
f.wg.Add(1)
go func() {
defer f.wg.Done()
lnChannel, err := lnwallet.NewLightningChannel(
nil, nil, f.cfg.FeeEstimator, channel)
if err != nil {
fndgLog.Errorf("error creating "+
"lightning channel: %v", err)
}
defer lnChannel.Stop()
f.sendChannelAnnouncement(channel, lnChannel,
shortChanID)
}()
default:
fndgLog.Errorf("undefined channelState: %v",
channelState)
}
}
f.wg.Add(1) // TODO(roasbeef): tune
@ -877,6 +1007,8 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
// With all the necessary data available, attempt to advance the
// funding workflow to the next stage. If this succeeds then the
// funding transaction will broadcast after our next message.
// CompleteReservationSingle will also mark the channel as 'IsPending'
// in the database.
commitSig := fmsg.msg.CommitSig.Serialize()
completeChan, err := resCtx.reservation.CompleteReservationSingle(
&fundingOut, commitSig)
@ -887,6 +1019,22 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
return
}
// If something goes wrong before the funding transaction is confirmed,
// we use this convenience method to delete the pending OpenChannel
// from the database.
deleteFromDatabase := func() {
closeInfo := &channeldb.ChannelCloseSummary{
ChanPoint: completeChan.FundingOutpoint,
RemotePub: completeChan.IdentityPub,
CloseType: channeldb.FundingCanceled,
}
if err := completeChan.CloseChannel(closeInfo); err != nil {
fndgLog.Errorf("Failed closing channel %v: %v",
completeChan.FundingOutpoint, err)
}
}
// A new channel has almost finished the funding process. In order to
// properly synchronize with the writeHandler goroutine, we add a new
// channel to the barriers map which will be closed once the channel is
@ -910,6 +1058,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
if err != nil {
fndgLog.Errorf("unable to parse signature: %v", err)
cancelReservation()
deleteFromDatabase()
return
}
@ -920,6 +1069,7 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
if err := f.cfg.SendToPeer(peerKey, fundingSigned); err != nil {
fndgLog.Errorf("unable to send FundingSigned message: %v", err)
cancelReservation()
deleteFromDatabase()
return
}
@ -933,12 +1083,37 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
// With this last message, our job as the responder is now complete.
// We'll wait for the funding transaction to reach the specified number
// of confirmations, then start normal operations.
//
// When we get to this point we have sent the signComplete message to
// the channel funder, and BOLT#2 specifies that we MUST remember the
// channel for reconnection. The channel is already marked
// as pending in the database, so in case of a disconnect or restart,
// we will continue waiting for the confirmation the next time we start
// the funding manager. In case the funding transaction never appears
// on the blockchain, we must forget this channel. We therefore
// completely forget about this channel if we haven't seen the funding
// transaction in 288 blocks (~ 48 hrs), by canceling the reservation
// and canceling the wait for the funding confirmation.
go func() {
doneChan := make(chan struct{})
go f.waitForFundingConfirmation(completeChan, doneChan)
timeoutChan := make(chan struct{})
go f.waitForFundingWithTimeout(completeChan, doneChan,
timeoutChan)
<-doneChan
f.deleteReservationCtx(peerKey, fmsg.msg.PendingChannelID)
select {
case <-timeoutChan:
// We did not see the funding confirmation before
// timeout, so we forget the channel.
cancelReservation()
deleteFromDatabase()
case <-f.quit:
// The fundingManager is shutting down, will resume
// wait for funding transaction on startup.
case <-doneChan:
// Success, funding transaction was confirmed.
f.deleteReservationCtx(peerKey,
fmsg.msg.PendingChannelID)
}
}()
}
@ -1021,7 +1196,18 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
go func() {
doneChan := make(chan struct{})
go f.waitForFundingConfirmation(completeChan, doneChan)
cancelChan := make(chan struct{})
// In case the fundingManager is stopped at some point during
// the remaining part of the opening process, we must wait for
// this process to finish (either successully or with some
// error), before the fundingManager can be shut down.
f.wg.Add(1)
go func() {
defer f.wg.Done()
f.waitForFundingConfirmation(completeChan, cancelChan,
doneChan)
}()
select {
case <-f.quit:
@ -1047,13 +1233,75 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
}()
}
// waitForFundingWithTimeout is a wrapper around waitForFundingConfirmation that
// will cancel the wait for confirmation if maxWaitNumBlocksFundingConf has
// passed from bestHeight. In the case of timeout, the timeoutChan will be
// closed. In case of confirmation or error, doneChan will be closed.
func (f *fundingManager) waitForFundingWithTimeout(completeChan *channeldb.OpenChannel,
doneChan chan<- struct{}, timeoutChan chan<- struct{}) {
epochClient, err := f.cfg.Notifier.RegisterBlockEpochNtfn()
if err != nil {
fndgLog.Errorf("unable to register for epoch notification: %v",
err)
close(doneChan)
return
}
waitingDoneChan := make(chan struct{})
cancelChan := make(chan struct{})
// Add this goroutine to wait group so we can be sure that it is
// properly stopped before the funding manager can be shut down.
f.wg.Add(1)
go func() {
defer f.wg.Done()
f.waitForFundingConfirmation(completeChan, cancelChan,
waitingDoneChan)
}()
// On block maxHeight we will cancel the funding confirmation wait.
maxHeight := completeChan.FundingBroadcastHeight + maxWaitNumBlocksFundingConf
for {
select {
case epoch, ok := <-epochClient.Epochs:
if !ok {
fndgLog.Warnf("Epoch client shutting down")
return
}
if uint32(epoch.Height) >= maxHeight {
fndgLog.Warnf("waited for %v blocks without "+
"seeing funding transaction confirmed,"+
" cancelling.", maxWaitNumBlocksFundingConf)
// Cancel the waitForFundingConfirmation
// goroutine.
close(cancelChan)
// Notify the caller of the timeout.
close(timeoutChan)
return
}
case <-f.quit:
// The fundingManager is shutting down, will resume
// waiting for the funding transaction on startup.
return
case <-waitingDoneChan:
close(doneChan)
return
}
}
}
// waitForFundingConfirmation handles the final stages of the channel funding
// process once the funding transaction has been broadcast. The primary
// function of waitForFundingConfirmation is to wait for blockchain
// confirmation, and then to notify the other systems that must be notified
// when a channel has become active for lightning transactions.
// The wait can be canceled by closing the cancelChan.
func (f *fundingManager) waitForFundingConfirmation(completeChan *channeldb.OpenChannel,
doneChan chan struct{}) {
cancelChan <-chan struct{}, doneChan chan<- struct{}) {
defer close(doneChan)
@ -1072,9 +1320,25 @@ func (f *fundingManager) waitForFundingConfirmation(completeChan *channeldb.Open
fndgLog.Infof("Waiting for funding tx (%v) to reach %v confirmations",
txid, numConfs)
var confDetails *chainntnfs.TxConfirmation
var ok bool
// Wait until the specified number of confirmations has been reached,
// or the wallet signals a shutdown.
confDetails, ok := <-confNtfn.Confirmed
// we get a cancel signal, or the wallet signals a shutdown.
select {
case confDetails, ok = <-confNtfn.Confirmed:
// fallthrough
case <-cancelChan:
fndgLog.Warnf("canceled waiting for funding confirmation, "+
"stopping funding flow for ChannelPoint(%v)",
completeChan.FundingOutpoint)
return
case <-f.quit:
fndgLog.Warnf("fundingManager shutting down, stopping funding "+
"flow for ChannelPoint(%v)", completeChan.FundingOutpoint)
return
}
if !ok {
fndgLog.Warnf("ChainNotifier shutting down, cannot complete "+
"funding flow for ChannelPoint(%v)",
@ -1110,6 +1374,35 @@ func (f *fundingManager) waitForFundingConfirmation(completeChan *channeldb.Open
// TODO(roasbeef): ideally persistent state update for chan above
// should be abstracted
// The funding transaction now being confirmed, we add this channel to
// the fundingManager's internal persistant state machine that we use
// to track the remaining process of the channel opening. This is useful
// to resume the opening process in case of restarts.
//
// TODO(halseth): make the two db transactions (MarkChannelAsOpen and
// saveChannelOpeningState) atomic by doing them in the same transaction.
// Needed to be properly fault-tolerant.
err = f.saveChannelOpeningState(&completeChan.FundingOutpoint, markedOpen,
&shortChanID)
if err != nil {
fndgLog.Errorf("error setting channel state to markedOpen: %v",
err)
return
}
// Now that the funding transaction has the required number of
// confirmations, we send the fundingLocked message to the peer.
f.sendFundingLockedAndAnnounceChannel(completeChan, &shortChanID)
}
// sendFundingLockedAndAnnounceChannel creates and sends the fundingLocked
// message, and then the channel announcement. This should be called after the
// funding transaction has been confirmed, and the channelState is 'markedOpen'.
func (f *fundingManager) sendFundingLockedAndAnnounceChannel(
completeChan *channeldb.OpenChannel, shortChanID *lnwire.ShortChannelID) {
chanID := lnwire.NewChanIDFromOutPoint(&completeChan.FundingOutpoint)
// With the channel marked open, we'll create the state-machine object
// which wraps the database state.
channel, err := lnwallet.NewLightningChannel(nil, nil,
@ -1130,16 +1423,60 @@ func (f *fundingManager) waitForFundingConfirmation(completeChan *channeldb.Open
return
}
fundingLockedMsg := lnwire.NewFundingLocked(chanID, nextRevocation)
f.cfg.SendToPeer(completeChan.IdentityPub, fundingLockedMsg)
fndgLog.Infof("Announcing ChannelPoint(%v), short_chan_id=%v", fundingPoint,
spew.Sdump(shortChanID))
err = f.cfg.SendToPeer(completeChan.IdentityPub, fundingLockedMsg)
if err != nil {
fndgLog.Errorf("unable to send fundingLocked to peer: %v", err)
return
}
// As the fundingLocked message is now sent to the peer, the channel is
// moved to the next state of the state machine. It will be moved to the
// last state (actually deleted from the database) after the channel is
// finally announced.
err = f.saveChannelOpeningState(&completeChan.FundingOutpoint, fundingLockedSent,
shortChanID)
if err != nil {
fndgLog.Errorf("error setting channel state to "+
"fundingLockedSent: %v", err)
return
}
f.sendChannelAnnouncement(completeChan, channel, shortChanID)
}
// sendChannelAnnouncement broadcast the neccessary channel announcement
// messages to the network. Should be called after the fundingLocked message is
// sent (channelState is 'fundingLockedSent') and the channel is ready to be
// used.
func (f *fundingManager) sendChannelAnnouncement(completeChan *channeldb.OpenChannel,
channel *lnwallet.LightningChannel, shortChanID *lnwire.ShortChannelID) {
chanID := lnwire.NewChanIDFromOutPoint(&completeChan.FundingOutpoint)
fundingPoint := completeChan.FundingOutpoint
fndgLog.Infof("Announcing ChannelPoint(%v), short_chan_id=%v",
&fundingPoint, spew.Sdump(shortChanID))
// Register the new link with the L3 routing manager so this new
// channel can be utilized during path finding.
go f.announceChannel(f.cfg.IDKey, completeChan.IdentityPub,
err := f.announceChannel(f.cfg.IDKey, completeChan.IdentityPub,
channel.LocalFundingKey, channel.RemoteFundingKey,
shortChanID, chanID)
*shortChanID, chanID)
if err != nil {
fndgLog.Errorf("channel announcement failed: %v", err)
return
}
// After the channel is successully announced from the fundingManager,
// we delete the channel from our internal database. We can do this
// because we assume the AuthenticatedGossiper queues the announcement
// messages, and persists them in case of a daemon shutdown.
err = f.deleteChannelOpeningState(&completeChan.FundingOutpoint)
if err != nil {
fndgLog.Errorf("error deleting channel state: %v", err)
return
}
// Finally, as the local channel discovery has been fully processed,
// we'll trigger the signal indicating that it's safe for any funding
@ -1380,9 +1717,11 @@ func (f *fundingManager) newChanAnnouncement(localPubKey, remotePubKey *btcec.Pu
// the network to recognize the legitimacy of the channel. The crafted
// announcements are then sent to the channel router to handle broadcasting to
// the network during its next trickle.
// This method is synchronous and will return when all the network requests
// finish, either successfully or with an error.
func (f *fundingManager) announceChannel(localIDKey, remoteIDKey, localFundingKey,
remoteFundingKey *btcec.PublicKey, shortChanID lnwire.ShortChannelID,
chanID lnwire.ChannelID) {
chanID lnwire.ChannelID) error {
// First, we'll create the batch of announcements to be sent upon
// initial channel creation. This includes the channel announcement
@ -1392,7 +1731,7 @@ func (f *fundingManager) announceChannel(localIDKey, remoteIDKey, localFundingKe
localFundingKey, remoteFundingKey, shortChanID, chanID)
if err != nil {
fndgLog.Errorf("can't generate channel announcement: %v", err)
return
return err
}
// With the announcements crafted, we'll now send the announcements to
@ -1400,9 +1739,21 @@ func (f *fundingManager) announceChannel(localIDKey, remoteIDKey, localFundingKe
//
// TODO(roasbeef): add flag that indicates if should be announced or
// not
f.cfg.SendAnnouncement(ann.chanAnn)
f.cfg.SendAnnouncement(ann.chanUpdateAnn)
f.cfg.SendAnnouncement(ann.chanProof)
// The announcement message consists of three distinct messages:
// 1. channel announcement 2. channel update 3. channel proof
// We must wait for them all to be successfully announced to the
// network, and/ if either fails we consider the announcement
// unsuccessful.
if err = f.cfg.SendAnnouncement(ann.chanAnn); err != nil {
return err
}
if err = f.cfg.SendAnnouncement(ann.chanUpdateAnn); err != nil {
return err
}
if err = f.cfg.SendAnnouncement(ann.chanProof); err != nil {
return err
}
// Now that the channel is announced to the network, we will also
// obtain and send a node announcement. This is done since a node
@ -1411,9 +1762,13 @@ func (f *fundingManager) announceChannel(localIDKey, remoteIDKey, localFundingKe
nodeAnn, err := f.cfg.CurrentNodeAnnouncement()
if err != nil {
fndgLog.Errorf("can't generate node announcement: %v", err)
return
return err
}
f.cfg.SendAnnouncement(&nodeAnn)
if err = f.cfg.SendAnnouncement(&nodeAnn); err != nil {
return err
}
return nil
}
// initFundingWorkflow sends a message to the funding manager instructing it
@ -1657,3 +2012,90 @@ func copyPubKey(pub *btcec.PublicKey) *btcec.PublicKey {
Y: pub.Y,
}
}
// saveChannelOpeningState saves the channelOpeningState for the provided
// chanPoint to the channelOpeningStateBucket.
func (f *fundingManager) saveChannelOpeningState(chanPoint *wire.OutPoint,
state channelOpeningState, shortChanID *lnwire.ShortChannelID) error {
return f.cfg.Wallet.Cfg.Database.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(channelOpeningStateBucket)
if err != nil {
return err
}
var outpointBytes bytes.Buffer
if err = writeOutpoint(&outpointBytes, chanPoint); err != nil {
return err
}
// Save state and the uint64 representation of the shortChanID
// for later use.
scratch := make([]byte, 10)
byteOrder.PutUint16(scratch[:2], uint16(state))
byteOrder.PutUint64(scratch[2:], shortChanID.ToUint64())
if err = bucket.Put(outpointBytes.Bytes(), scratch); err != nil {
return err
}
return nil
})
}
// getChannelOpeningState fetches the channelOpeningState for the provided
// chanPoint from the database, or returns ErrChannelNotFound if the channel
// is not found.
func (f *fundingManager) getChannelOpeningState(chanPoint *wire.OutPoint) (
channelOpeningState, *lnwire.ShortChannelID, error) {
var state channelOpeningState
var shortChanID lnwire.ShortChannelID
err := f.cfg.Wallet.Cfg.Database.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(channelOpeningStateBucket)
if bucket == nil {
// If the bucket does not exist, it means we never added
// a channel to the db, so return ErrChannelNotFound.
return ErrChannelNotFound
}
var outpointBytes bytes.Buffer
if err := writeOutpoint(&outpointBytes, chanPoint); err != nil {
return err
}
value := bucket.Get(outpointBytes.Bytes())
if value == nil {
return ErrChannelNotFound
}
state = channelOpeningState(byteOrder.Uint16(value[:2]))
shortChanID = lnwire.NewShortChanIDFromInt(byteOrder.Uint64(value[2:]))
return nil
})
if err != nil {
return 0, nil, err
}
return state, &shortChanID, nil
}
// deleteChannelOpeningState removes any state for chanPoint from the database.
func (f *fundingManager) deleteChannelOpeningState(chanPoint *wire.OutPoint) error {
return f.cfg.Wallet.Cfg.Database.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(channelOpeningStateBucket)
if bucket == nil {
return fmt.Errorf("Bucket not found")
}
var outpointBytes bytes.Buffer
if err := writeOutpoint(&outpointBytes, chanPoint); err != nil {
return err
}
if err := bucket.Delete(outpointBytes.Bytes()); err != nil {
return err
}
return nil
})
}

1155
fundingmanager_test.go Normal file

File diff suppressed because it is too large Load Diff

4
lnd.go
View File

@ -140,9 +140,9 @@ func lndMain() error {
return server.genNodeAnnouncement(true)
},
SendAnnouncement: func(msg lnwire.Message) error {
server.discoverSrv.ProcessLocalAnnouncement(msg,
errChan := server.discoverSrv.ProcessLocalAnnouncement(msg,
idPrivKey.PubKey())
return nil
return <-errChan
},
ArbiterChan: server.breachArbiter.newContracts,
SendToPeer: server.sendToPeer,