diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 6338b366..2c8bdeb4 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -139,6 +139,13 @@ type AuthenticatedGossiper struct { // TODO(roasbeef): limit premature networkMsgs to N prematureAnnouncements map[uint32][]*networkMsg + // prematureChannelUpdates is a map of ChannelUpdates we have + // received that wasn't associated with any channel we know about. + // We store them temporarily, such that we can reprocess them when + // a ChannelAnnouncement for the channel is received. + prematureChannelUpdates map[uint64][]*networkMsg + pChanUpdMtx sync.Mutex + // waitingProofs is a persistent storage of partial channel proof // announcement messages. We use it to buffer half of the material // needed to reconstruct a full authenticated channel announcement. Once @@ -174,13 +181,14 @@ func New(cfg Config, selfKey *btcec.PublicKey) (*AuthenticatedGossiper, error) { } return &AuthenticatedGossiper{ - selfKey: selfKey, - cfg: &cfg, - networkMsgs: make(chan *networkMsg), - quit: make(chan struct{}), - feeUpdates: make(chan *feeUpdateRequest), - prematureAnnouncements: make(map[uint32][]*networkMsg), - waitingProofs: storage, + selfKey: selfKey, + cfg: &cfg, + networkMsgs: make(chan *networkMsg), + quit: make(chan struct{}), + feeUpdates: make(chan *feeUpdateRequest), + prematureAnnouncements: make(map[uint32][]*networkMsg), + prematureChannelUpdates: make(map[uint64][]*networkMsg), + waitingProofs: storage, }, nil } @@ -1013,6 +1021,60 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l return nil } + // If we earlier received any ChannelUpdates for this channel, + // we can now process them, as the channel is added to the + // graph. + shortChanID := msg.ShortChannelID.ToUint64() + var channelUpdates []*networkMsg + + d.pChanUpdMtx.Lock() + for _, cu := range d.prematureChannelUpdates[shortChanID] { + channelUpdates = append(channelUpdates, cu) + } + + // Now delete the premature ChannelUpdates, since we added them + // all to the queue of network messages. + + delete(d.prematureChannelUpdates, shortChanID) + d.pChanUpdMtx.Unlock() + + // Launch a new goroutine to handle each ChannelUpdate, + // this to ensure we don't block here, as we can handle + // only one announcement at a time. + for _, cu := range channelUpdates { + go func(nMsg *networkMsg) { + switch msg := nMsg.msg.(type) { + + case *lnwire.ChannelUpdate: + // We can safely wait for the error to + // be returned, as in case of shutdown, + // the gossiper will return an error. + var err error + if nMsg.isRemote { + err = <-d.ProcessRemoteAnnouncement( + msg, nMsg.peer) + } else { + err = <-d.ProcessLocalAnnouncement( + msg, nMsg.peer) + } + if err != nil { + log.Errorf("Failed reprocessing"+ + " ChannelUpdate for "+ + "shortChanID=%v: %v", + msg.ShortChannelID.ToUint64(), + err) + return + } + + // We don't expect any other message type + // than ChannelUpdate to be in this map. + default: + log.Errorf("Unsupported message type "+ + "found among ChannelUpdates: %T", msg) + } + }(cu) + } + // Channel announcement was successfully proceeded and know it // might be broadcast to other connected nodes if it was // announcement with proof (remote). @@ -1063,12 +1125,44 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l // verify message signature. chanInfo, _, _, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID) if err != nil { - err := errors.Errorf("unable to validate "+ - "channel update short_chan_id=%v: %v", - shortChanID, err) - log.Error(err) - nMsg.err <- err - return nil + switch err { + case channeldb.ErrGraphNotFound: + fallthrough + case channeldb.ErrGraphNoEdgesFound: + fallthrough + case channeldb.ErrEdgeNotFound: + // If the edge corresponding to this + // ChannelUpdate was not found in the graph, + // this might be a channel in the process of + // being opened, and we haven't processed our + // own ChannelAnnouncement yet, hence it is not + // found in the graph. This usually gets + // resolved after the channel proofs are + // exchanged and the channel is broadcasted to + // the rest of the network, but in case this + // is a private channel this won't ever happen. + // Because of this, we temporarily add it to a + // map, and reprocess it after our own + // ChannelAnnouncement has been processed. + d.pChanUpdMtx.Lock() + d.prematureChannelUpdates[shortChanID] = append( + d.prematureChannelUpdates[shortChanID], + nMsg) + d.pChanUpdMtx.Unlock() + log.Infof("Got ChannelUpdate for edge not "+ + "found in graph(shortChanID=%v), "+ + "saving for reprocessing later", + shortChanID) + nMsg.err <- nil + return nil + default: + err := errors.Errorf("unable to validate "+ + "channel update short_chan_id=%v: %v", + shortChanID, err) + log.Error(err) + nMsg.err <- err + return nil + } } // The least-significant bit in the flag on the channel update @@ -1117,11 +1211,11 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l return nil } - // If this is a local ChannelUpdate without an AuthProof, it means - // it is an update to a channel that is not (yet) supposed to be - // announced to the greater network. However, our channel counter - // party will need to be given the update, so we'll try sending - // the update directly to the remote peer. + // If this is a local ChannelUpdate without an AuthProof, it + // means it is an update to a channel that is not (yet) + // supposed to be announced to the greater network. However, + // our channel counter party will need to be given the update, + // so we'll try sending the update directly to the remote peer. if !nMsg.isRemote && chanInfo.AuthProof == nil { // Get our peer's public key. var remotePeer *btcec.PublicKey