funding: adding to graph, private chans, chanAnn after 6 confs

This commit introduces some new interdependent functionality. As
soon as the fundingLocked message is sent, the channel is
immediately added to the ChannelRouter's internal topology.

Finally, channels are now only broadcasted to the greater
network after six confirmations on the funding transaction
has been reached.
This commit is contained in:
nsa 2017-11-13 15:55:50 -08:00 committed by Olaoluwa Osuntokun
parent dab7f31e44
commit c954ca1f21
1 changed files with 359 additions and 62 deletions

View File

@ -182,9 +182,10 @@ type fundingConfig struct {
// announcement from the backing Lighting Network node.
CurrentNodeAnnouncement func() (lnwire.NodeAnnouncement, error)
// SendAnnouncement is used by the FundingManager to announce newly
// created channels to the rest of the Lightning Network.
SendAnnouncement func(msg lnwire.Message) error
// SendLocalAnnouncement is used by the FundingManager to send
// announcement messages to the Gossiper to possibly broadcast
// to the greater network.
SendLocalAnnouncement func(msg lnwire.Message) error
// SendToPeer allows the FundingManager to send messages to the peer
// node during the multiple steps involved in the creation of the
@ -260,6 +261,10 @@ type fundingManager struct {
// funding workflows.
activeReservations map[serializedPubKey]pendingChannels
// pendingChanAnnPrefs is a map which stores a pending channel's id
// along with its announcement preference.
pendingChanAnnPrefs map[[32]byte]bool
// signedReservations is a utility map that maps the permanent channel
// ID of a funding reservation to its temporary channel ID. This is
// required as mid funding flow, we switch to referencing the channel
@ -315,6 +320,12 @@ const (
// fundingLocked message has successfully been sent to the other peer,
// but we still haven't announced the channel to the network.
fundingLockedSent
// addedToRouterGraph is the opening state of a channel if the
// channel has been successfully added to the router graph
// immediately after the fundingLocked message has been sent, but
// we still haven't announced the channel to the network.
addedToRouterGraph
)
var (
@ -323,6 +334,12 @@ var (
// of being opened.
channelOpeningStateBucket = []byte("channelOpeningState")
// openChanAnnPrefBucket is the database bucket used to store each
// channel's announcement preference signalled by the LSB of
// channel_flags of the open_channel message in the funding workflow.
// It only stores open channels' announcement preferences.
openChanAnnPrefBucket = []byte("open_chan_ann")
// ErrChannelNotFound is returned when we are looking for a specific
// channel opening state in the FundingManager's internal database, but
// the channel in question is not considered being in an opening state.
@ -342,6 +359,7 @@ func newFundingManager(cfg fundingConfig) (*fundingManager, error) {
fundingRequests: make(chan *initFundingMsg, msgBufferSize),
localDiscoverySignals: make(map[lnwire.ChannelID]chan struct{}),
handleFundingLockedBarriers: make(map[lnwire.ChannelID]struct{}),
pendingChanAnnPrefs: make(map[[32]byte]bool),
queries: make(chan interface{}, 1),
quit: make(chan struct{}),
}, nil
@ -457,11 +475,6 @@ func (f *fundingManager) Start() error {
f.newChanBarriers[chanID] = make(chan struct{})
f.barrierMtx.Unlock()
// Set up a localDiscoverySignals to make sure we finish sending
// our own fundingLocked and channel announcements before
// processing a received fundingLocked.
f.localDiscoverySignals[chanID] = make(chan struct{})
// If we did find the channel in the opening state database, we
// have seen the funding transaction being confirmed, but we
// did not finish the rest of the setup procedure before we shut
@ -486,17 +499,92 @@ func (f *fundingManager) Start() error {
case fundingLockedSent:
// fundingLocked was sent to peer, but the channel
// was not added to the router graph and the channel
// announcement was not sent.
f.wg.Add(1)
go func(dbChan *channeldb.OpenChannel) {
defer f.wg.Done()
err = f.sendChannelAnnouncement(dbChan, shortChanID)
// Retrieve channel announcement preference
private, err := f.getOpenChanAnnPref(
&dbChan.FundingOutpoint)
if err != nil {
fndgLog.Errorf("error sending channel "+
"announcement: %v", err)
fndgLog.Errorf("unable to retrieve channel "+
"announcement preference: %v", err)
return
}
lnChannel, err := lnwallet.NewLightningChannel(
nil, nil, f.cfg.FeeEstimator, dbChan)
if err != nil {
fndgLog.Errorf("error creating "+
"lightning channel: %v", err)
return
}
defer lnChannel.Stop()
err = f.addToRouterGraph(dbChan, lnChannel,
shortChanID, private)
if err != nil {
fndgLog.Errorf("failed adding to "+
"router graph: %v", err)
return
}
if !private {
err = f.annAfterSixConfs(dbChan,
lnChannel, shortChanID)
if err != nil {
fndgLog.Errorf("error sending channel "+
"announcement: %v", err)
return
}
}
}(channel)
case addedToRouterGraph:
// The channel was added to the Router's topology, but
// the channel announcement was not sent.
f.wg.Add(1)
go func(dbChan *channeldb.OpenChannel) {
defer f.wg.Done()
// Retrieve channel announcement preference
private, err := f.getOpenChanAnnPref(
&dbChan.FundingOutpoint)
if err != nil {
fndgLog.Errorf("unable to retrieve channel "+
"announcement preference: %v", err)
return
}
lnChannel, err := lnwallet.NewLightningChannel(
nil, nil, f.cfg.FeeEstimator, dbChan)
if err != nil {
fndgLog.Errorf("error creating "+
"lightning channel: %v", err)
return
}
defer lnChannel.Stop()
if private {
// We delete the channel from our internal
// database.
err := f.deleteChannelOpeningState(
&channel.FundingOutpoint)
if err != nil {
fndgLog.Errorf("error deleting "+
"channel state: %v", err)
return
}
} else {
err = f.annAfterSixConfs(channel,
lnChannel, shortChanID)
if err != nil {
fndgLog.Errorf("error sending channel "+
"announcement: %v", err)
return
}
}
}(channel)
default:
@ -641,6 +729,7 @@ func (f *fundingManager) failFundingFlow(peer *btcec.PublicKey,
return
}
delete(f.pendingChanAnnPrefs, tempChanID)
f.cancelReservationCtx(peer, tempChanID)
return
}
@ -737,7 +826,6 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
// number and send ErrorGeneric to remote peer if condition is
// violated.
peerIDKey := newSerializedKey(fmsg.peerAddress.IdentityKey)
msg := fmsg.msg
amt := msg.FundingAmount
@ -846,6 +934,16 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
}
f.resMtx.Unlock()
// Save the announcement preference of this pending channel
if msg.ChannelFlags&1 == 0 {
// This channel WILL be announced to the greater network later.
f.pendingChanAnnPrefs[msg.PendingChannelID] = false
} else {
// This channel WILL NOT be announced to the greater network
// later.
f.pendingChanAnnPrefs[msg.PendingChannelID] = true
}
// Using the RequiredRemoteDelay closure, we'll compute the remote CSV
// delay we require given the total amount of funds within the channel.
remoteCsvDelay := f.cfg.RequiredRemoteDelay(amt)
@ -1080,6 +1178,12 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
peerKey, pendingChanID[:])
return
}
private, ok := f.pendingChanAnnPrefs[pendingChanID]
if !ok {
fndgLog.Warnf("can't find channel announcement preference for"+
"chanID:%x", pendingChanID[:])
return
}
// The channel initiator has responded with the funding outpoint of the
// final funding transaction, as well as a signature for our version of
@ -1106,10 +1210,32 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
return
}
// A new channel has almost finished the funding process. In order to
// properly synchronize with the writeHandler goroutine, we add a new
// channel to the barriers map which will be closed once the channel is
// fully open.
f.barrierMtx.Lock()
channelID := lnwire.NewChanIDFromOutPoint(&fundingOut)
fndgLog.Debugf("Creating chan barrier for ChanID(%v)", channelID)
f.newChanBarriers[channelID] = make(chan struct{})
f.barrierMtx.Unlock()
// Store channel announcement preference in boltdb
if err = f.saveOpenChanAnnPref(&fundingOut, private); err != nil {
fndgLog.Errorf("unable to save channel announcement preference "+
"to db for chanID:%x", channelID)
}
// If something goes wrong before the funding transaction is confirmed,
// we use this convenience method to delete the pending OpenChannel
// from the database.
deleteFromDatabase := func() {
err := f.deleteOpenChanAnnPref(&completeChan.FundingOutpoint)
if err != nil {
fndgLog.Errorf("Failed to delete channel announcement "+
"preference: %v", err)
}
closeInfo := &channeldb.ChannelCloseSummary{
ChanPoint: completeChan.FundingOutpoint,
ChainHash: completeChan.ChainHash,
@ -1184,7 +1310,9 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
// completely forget about this channel if we haven't seen the funding
// transaction in 288 blocks (~ 48 hrs), by canceling the reservation
// and canceling the wait for the funding confirmation.
f.wg.Add(1)
go func() {
defer f.wg.Done()
confChan := make(chan *lnwire.ShortChannelID)
timeoutChan := make(chan struct{})
go f.waitForFundingWithTimeout(completeChan, confChan,
@ -1263,6 +1391,12 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
pendingChanID, []byte(err.Error()))
return
}
private, ok := f.pendingChanAnnPrefs[pendingChanID]
if !ok {
fndgLog.Warnf("can't find channel announcement preference for"+
"chanID:%x", pendingChanID[:])
return
}
// Create an entry in the local discovery map so we can ensure that we
// process the channel confirmation fully before we receive a funding
@ -1273,6 +1407,12 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
f.localDiscoverySignals[permChanID] = make(chan struct{})
f.localDiscoveryMtx.Unlock()
// Store channel announcement preference in boltdb
if err = f.saveOpenChanAnnPref(fundingPoint, private); err != nil {
fndgLog.Errorf("unable to save channel announcement preference "+
"to db for chanID:%x", permChanID)
}
// The remote peer has responded with a signature for our commitment
// transaction. We'll verify the signature for validity, then commit
// the state to disk as we can now open the channel.
@ -1531,15 +1671,31 @@ func (f *fundingManager) waitForFundingConfirmation(completeChan *channeldb.Open
case <-f.quit:
return
}
// This closes the discoverySignal channel, indicating to a separate
// goroutine that it is acceptable to process funding locked messages
// from the peer.
f.localDiscoveryMtx.Lock()
if discoverySignal, ok := f.localDiscoverySignals[chanID]; ok {
close(discoverySignal)
}
f.localDiscoveryMtx.Unlock()
}
// handleFundingConfirmation is a wrapper method for creating a new
// lnwallet.LightningChannel object, calling sendFundingLocked, and for calling
// sendChannelAnnouncement. This is called after the funding transaction is
// lnwallet.LightningChannel object, calling sendFundingLocked, addToRouterGraph,
// and annAfterSixConfs. This is called after the funding transaction is
// confirmed.
func (f *fundingManager) handleFundingConfirmation(completeChan *channeldb.OpenChannel,
shortChanID *lnwire.ShortChannelID) error {
// Retrieve channel announcement preference
private, err := f.getOpenChanAnnPref(&completeChan.FundingOutpoint)
if err != nil {
return fmt.Errorf("unable to retrieve channel announcement "+
"preference: %v", err)
}
// We create the state-machine object which wraps the database state.
lnChannel, err := lnwallet.NewLightningChannel(nil, nil, f.cfg.FeeEstimator,
completeChan)
@ -1559,10 +1715,23 @@ func (f *fundingManager) handleFundingConfirmation(completeChan *channeldb.OpenC
if err != nil {
return fmt.Errorf("failed sending fundingLocked: %v", err)
}
err = f.sendChannelAnnouncement(completeChan, shortChanID)
err = f.addToRouterGraph(completeChan, lnChannel, shortChanID, private)
if err != nil {
return fmt.Errorf("failed sending channel announcement: %v",
err)
return fmt.Errorf("failed adding to router graph: %v", err)
}
if private {
// We delete the channel from our internal database.
err := f.deleteChannelOpeningState(&completeChan.FundingOutpoint)
if err != nil {
return fmt.Errorf("error deleting channel state: %v", err)
}
} else {
err = f.annAfterSixConfs(completeChan, lnChannel, shortChanID)
if err != nil {
return fmt.Errorf("failed sending channel announcement: %v",
err)
}
}
return nil
@ -1644,17 +1813,82 @@ func (f *fundingManager) sendFundingLocked(completeChan *channeldb.OpenChannel,
return nil
}
// sendChannelAnnouncement broadcast the necessary channel announcement
// messages to the network. Should be called after the fundingLocked message
// is sent (channelState is 'fundingLockedSent') and the channel is ready to
// be used.
func (f *fundingManager) sendChannelAnnouncement(completeChan *channeldb.OpenChannel,
shortChanID *lnwire.ShortChannelID) error {
// TODO(eugene) wait for 6 confirmations here
// addToRouterGraph sends a private ChannelAnnouncement and a private
// ChannelUpdate to the gossiper so that it is added to the Router's internal
// graph before the announcement_signatures is sent in
// annAfterSixConfs. These private announcement messages are NOT
// broadcasted to the greater network.
func (f *fundingManager) addToRouterGraph(completeChan *channeldb.OpenChannel,
shortChanID *lnwire.ShortChannelID, private bool) error {
chanID := lnwire.NewChanIDFromOutPoint(&completeChan.FundingOutpoint)
ann, err := f.newChanAnnouncement(f.cfg.IDKey, completeChan.IdentityPub,
completeChan.LocalChanCfg.MultiSigKey,
completeChan.RemoteChanCfg.MultiSigKey, *shortChanID, chanID)
if err != nil {
return fmt.Errorf("error generating channel "+
"announcement: %v", err)
}
// Send ChannelAnnouncement and ChannelUpdate to the gossiper to add
// to the Router's topology.
if err = f.cfg.SendLocalAnnouncement(ann.chanAnn); err != nil {
return fmt.Errorf("error sending private channel "+
"announcement: %v", err)
}
if err = f.cfg.SendLocalAnnouncement(ann.chanUpdateAnn); err != nil {
return fmt.Errorf("error sending private channel "+
"update: %v", err)
}
// As the channel is now added to the ChannelRouter's topology, the
// channel is moved to the next state of the state machine. It will be
// moved to the last state (actually deleted from the database) after
// the channel is finally announced.
err = f.saveChannelOpeningState(&completeChan.FundingOutpoint, addedToRouterGraph,
shortChanID)
if err != nil {
return fmt.Errorf("error setting channel state to"+
" addedToRouterGraph: %v", err)
}
return nil
}
// annAfterSixConfs broadcasts the necessary channel announcement messages to
// the network after 6 confs. Should be called after the fundingLocked message
// is sent and the channel is added to the router graph (channelState is
// 'addedToRouterGraph') and the channel is ready to be used.
func (f *fundingManager) annAfterSixConfs(completeChan *channeldb.OpenChannel,
shortChanID *lnwire.ShortChannelID) error {
// Register with the ChainNotifier for a notification once the
// funding transaction reaches 6 confirmations.
txid := completeChan.FundingOutpoint.Hash
confNtfn, err := f.cfg.Notifier.RegisterConfirmationsNtfn(&txid, 6,
completeChan.FundingBroadcastHeight)
if err != nil {
return fmt.Errorf("Unable to register for confirmation of "+
"ChannelPoint(%v): %v", completeChan.FundingOutpoint, err)
}
// Wait until 6 confirmations has been reached or the wallet signals
// a shutdown.
select {
case _, ok := <-confNtfn.Confirmed:
if !ok {
return fmt.Errorf("ChainNotifier shutting down, cannot "+
"complete funding flow for ChannelPoint(%v)",
completeChan.FundingOutpoint)
}
case <-f.quit:
return fmt.Errorf("fundingManager shutting down, stopping funding "+
"flow for ChannelPoint(%v)", completeChan.FundingOutpoint)
}
fundingPoint := completeChan.FundingOutpoint
chanID := lnwire.NewChanIDFromOutPoint(&fundingPoint)
fndgLog.Infof("Announcing ChannelPoint(%v), short_chan_id=%v",
&fundingPoint, spew.Sdump(shortChanID))
@ -1679,20 +1913,11 @@ func (f *fundingManager) sendChannelAnnouncement(completeChan *channeldb.OpenCha
// we delete the channel from our internal database. We can do this
// because we assume the AuthenticatedGossiper queues the announcement
// messages, and persists them in case of a daemon shutdown.
err = f.deleteChannelOpeningState(&completeChan.FundingOutpoint)
err = f.deleteChannelOpeningState(&fundingPoint)
if err != nil {
return fmt.Errorf("error deleting channel state: %v", err)
}
// Finally, as the local channel discovery has been fully processed,
// we'll trigger the signal indicating that it's safe for any funding
// locked messages related to this channel to be processed.
f.localDiscoveryMtx.Lock()
if discoverySignal, ok := f.localDiscoverySignals[chanID]; ok {
close(discoverySignal)
}
f.localDiscoveryMtx.Unlock()
return nil
}
@ -2010,25 +2235,10 @@ func (f *fundingManager) announceChannel(localIDKey, remoteIDKey, localFundingKe
return err
}
// With the announcements crafted, we'll now send the announcements to
// the rest of the network.
//
// TODO(roasbeef): add flag that indicates if should be announced or
// not
// The announcement message consists of three distinct messages:
// 1. channel announcement 2. channel update 3. channel proof
// We must wait for them all to be successfully announced to the
// network, and/ if either fails we consider the announcement
// unsuccessful.
if err = f.cfg.SendAnnouncement(ann.chanAnn); err != nil {
fndgLog.Errorf("Unable to send channel announcement: %v", err)
return err
}
if err = f.cfg.SendAnnouncement(ann.chanUpdateAnn); err != nil {
fndgLog.Errorf("Unable to send channel update: %v", err)
return err
}
// We only send the channel proof announcement and the node announcement
// because addToRouterGraph previously send the ChannelAnnouncement and
// the ChannelUpdate announcement messages. The channel proof and node
// announcements are broadcast to the greater network.
if err = f.cfg.SendAnnouncement(ann.chanProof); err != nil {
fndgLog.Errorf("Unable to send channel proof: %v", err)
return err
@ -2148,6 +2358,18 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
fndgLog.Infof("Starting funding workflow with %v for pendingID(%x)",
msg.peerAddress.Address, chanID)
// Save the announcement preference of this pending channel
var channelFlags byte
if msg.openChanReq.private {
// This channel will be private
channelFlags = 1
f.pendingChanAnnPrefs[chanID] = true
} else {
// This channel will be publicly announced to the greater network.
channelFlags = 0
f.pendingChanAnnPrefs[chanID] = false
}
fundingOpen := lnwire.OpenChannel{
ChainHash: *f.cfg.Wallet.Cfg.NetParams.GenesisHash,
PendingChannelID: chanID,
@ -2248,6 +2470,7 @@ func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) {
)
}
delete(f.pendingChanAnnPrefs, chanID)
if _, err := f.cancelReservationCtx(peerKey, chanID); err != nil {
fndgLog.Warnf("unable to delete reservation: %v", err)
return
@ -2316,6 +2539,86 @@ func copyPubKey(pub *btcec.PublicKey) *btcec.PublicKey {
}
}
// saveOpenChanAnnPref saves an open channel's announcement preference.
func (f *fundingManager) saveOpenChanAnnPref(chanPoint *wire.OutPoint,
pref bool) error {
return f.cfg.Wallet.Cfg.Database.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(openChanAnnPrefBucket)
if err != nil {
return err
}
var outpointBytes bytes.Buffer
if err = writeOutpoint(&outpointBytes, chanPoint); err != nil {
return err
}
scratch := make([]byte, 0)
var b byte
if pref {
b = 1
} else {
b = 0
}
scratch = append(scratch[:], b)
return bucket.Put(outpointBytes.Bytes(), scratch[:])
})
}
// getOpenChanAnnPref retrives an open channel's announcement preference.
func (f *fundingManager) getOpenChanAnnPref(chanPoint *wire.OutPoint) (bool, error) {
var pref bool
err := f.cfg.Wallet.Cfg.Database.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(openChanAnnPrefBucket)
if bucket == nil {
return fmt.Errorf("Channel announcement preference " +
"not found")
}
var outpointBytes bytes.Buffer
if err := writeOutpoint(&outpointBytes, chanPoint); err != nil {
return err
}
value := bucket.Get(outpointBytes.Bytes())
if value == nil {
return fmt.Errorf("Channel announcement preference " +
"not found")
}
if value[0] == 1 {
pref = true
}
return nil
})
if err != nil {
return false, err
}
return pref, nil
}
// deleteOpenChanAnnPref deletes an open channel's announcement preference.
func (f *fundingManager) deleteOpenChanAnnPref(chanPoint *wire.OutPoint) error {
return f.cfg.Wallet.Cfg.Database.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(openChanAnnPrefBucket)
if bucket == nil {
return fmt.Errorf("Bucket not found")
}
var outpointBytes bytes.Buffer
if err := writeOutpoint(&outpointBytes, chanPoint); err != nil {
return err
}
return bucket.Delete(outpointBytes.Bytes())
})
}
// saveChannelOpeningState saves the channelOpeningState for the provided
// chanPoint to the channelOpeningStateBucket.
func (f *fundingManager) saveChannelOpeningState(chanPoint *wire.OutPoint,
@ -2338,10 +2641,7 @@ func (f *fundingManager) saveChannelOpeningState(chanPoint *wire.OutPoint,
byteOrder.PutUint16(scratch[:2], uint16(state))
byteOrder.PutUint64(scratch[2:], shortChanID.ToUint64())
if err = bucket.Put(outpointBytes.Bytes(), scratch); err != nil {
return err
}
return nil
return bucket.Put(outpointBytes.Bytes(), scratch)
})
}
@ -2396,9 +2696,6 @@ func (f *fundingManager) deleteChannelOpeningState(chanPoint *wire.OutPoint) err
return err
}
if err := bucket.Delete(outpointBytes.Bytes()); err != nil {
return err
}
return nil
return bucket.Delete(outpointBytes.Bytes())
})
}