diff --git a/peer.go b/peer.go index 5305ff03..2057b3cc 100644 --- a/peer.go +++ b/peer.go @@ -473,41 +473,39 @@ func (p *peer) readNextMessage() (lnwire.Message, error) { return nextMsg, nil } -// chanMsgStream implements a goroutine-safe, in-order stream of messages to be -// delivered to an active channel. These messages MUST be in order due to the -// nature of the lightning channel commitment state machine. We utilize -// additional synchronization with the fundingManager to ensure we don't -// attempt to dispatch a message to a channel before it is fully active. -type chanMsgStream struct { - fundingMgr *fundingManager - htlcSwitch *htlcswitch.Switch - - cid lnwire.ChannelID - +// msgStream implements a goroutine-safe, in-order stream of messages to be +// delivered via closure to a receiver. These messages MUST be in order due to +// the nature of the lightning channel commitment and gossiper state machines. +// TODO(conner): use stream handler interface to abstract out stream +// state/logging +type msgStream struct { peer *peer + apply func(lnwire.Message) + + startMsg string + stopMsg string + msgCond *sync.Cond msgs []lnwire.Message - chanLink htlcswitch.ChannelLink - mtx sync.Mutex wg sync.WaitGroup quit chan struct{} } -// newChanMsgStream creates a new instance of a chanMsgStream for a particular +// newMsgStream creates a new instance of a chanMsgStream for a particular // channel identified by its channel ID. -func newChanMsgStream(f *fundingManager, h *htlcswitch.Switch, p *peer, - c lnwire.ChannelID) *chanMsgStream { +func newMsgStream(p *peer, startMsg, stopMsg string, + apply func(lnwire.Message)) *msgStream { - stream := &chanMsgStream{ - fundingMgr: f, - htlcSwitch: h, - peer: p, - cid: c, - quit: make(chan struct{}), + stream := &msgStream{ + peer: p, + apply: apply, + startMsg: startMsg, + stopMsg: stopMsg, + quit: make(chan struct{}), } stream.msgCond = sync.NewCond(&stream.mtx) @@ -515,45 +513,44 @@ func newChanMsgStream(f *fundingManager, h *htlcswitch.Switch, p *peer, } // Start starts the chanMsgStream. -func (c *chanMsgStream) Start() { - c.wg.Add(1) - go c.msgConsumer() +func (ms *msgStream) Start() { + ms.wg.Add(1) + go ms.msgConsumer() } // Stop stops the chanMsgStream. -func (c *chanMsgStream) Stop() { +func (ms *msgStream) Stop() { // TODO(roasbeef): signal too? - close(c.quit) + close(ms.quit) // Wake up the msgConsumer is we've been signalled to exit. - c.msgCond.Signal() + ms.msgCond.Signal() - c.wg.Wait() + ms.wg.Wait() } // msgConsumer is the main goroutine that streams messages from the peer's // readHandler directly to the target channel. -func (c *chanMsgStream) msgConsumer() { - defer c.wg.Done() +func (ms *msgStream) msgConsumer() { + defer ms.wg.Done() + defer peerLog.Tracef(ms.stopMsg) - peerLog.Tracef("Update stream for ChannelID(%x) created", c.cid[:]) + peerLog.Tracef(ms.startMsg) for { // First, we'll check our condition. If the queue of messages // is empty, then we'll wait until a new item is added. - c.msgCond.L.Lock() - for len(c.msgs) == 0 { - c.msgCond.Wait() + ms.msgCond.L.Lock() + for len(ms.msgs) == 0 { + ms.msgCond.Wait() // If we were woke up in order to exit, then we'll do // so. Otherwise, we'll check the message queue for any // new items. select { - case <-c.quit: - peerLog.Tracef("Update stream for "+ - "ChannelID(%x) exiting", c.cid[:]) - c.msgCond.L.Unlock() + case <-ms.quit: + ms.msgCond.L.Unlock() return default: } @@ -562,48 +559,81 @@ func (c *chanMsgStream) msgConsumer() { // Grab the message off the front of the queue, shifting the // slice's reference down one in order to remove the message // from the queue. - msg := c.msgs[0] - c.msgs[0] = nil // Set to nil to prevent GC leak. - c.msgs = c.msgs[1:] + msg := ms.msgs[0] + ms.msgs[0] = nil // Set to nil to prevent GC leak. + ms.msgs = ms.msgs[1:] - c.msgCond.L.Unlock() + ms.msgCond.L.Unlock() - // We'll send a message to the funding manager and wait iff an - // active funding process for this channel hasn't yet - // completed. We do this in order to account for the following - // scenario: we send the funding locked message to the other - // side, they immediately send a channel update message, but we - // haven't yet sent the channel to the channelManager. - c.fundingMgr.waitUntilChannelOpen(c.cid) - - // Dispatch the commitment update message to the proper active - // goroutine dedicated to this channel. - if c.chanLink == nil { - link, err := c.htlcSwitch.GetLink(c.cid) - if err != nil { - peerLog.Errorf("recv'd update for unknown "+ - "channel %v from %v", c.cid, c.peer) - continue - } - c.chanLink = link - } - - c.chanLink.HandleChannelUpdate(msg) + ms.apply(msg) } } -// AddMsg adds a new message to the chanMsgStream. This function is safe for +// AddMsg adds a new message to the msgStream. This function is safe for // concurrent access. -func (c *chanMsgStream) AddMsg(msg lnwire.Message) { +func (ms *msgStream) AddMsg(msg lnwire.Message) { // First, we'll lock the condition, and add the message to the end of // the message queue. - c.msgCond.L.Lock() - c.msgs = append(c.msgs, msg) - c.msgCond.L.Unlock() + ms.msgCond.L.Lock() + ms.msgs = append(ms.msgs, msg) + ms.msgCond.L.Unlock() // With the message added, we signal to the msgConsumer that there are // additional messages to consume. - c.msgCond.Signal() + ms.msgCond.Signal() +} + +// newChanMsgStream is used to create a msgStream between the peer and +// particular channel link in the htlcswitch. We utilize additional +// synchronization with the fundingManager to ensure we don't attempt to +// dispatch a message to a channel before it is fully active. A reference to the +// channel this stream forwards to his held in scope to prevent unnecessary +// lookups. +func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream { + + var chanLink htlcswitch.ChannelLink + + return newMsgStream(p, + fmt.Sprintf("Update stream for ChannelID(%x) created", cid), + fmt.Sprintf("Update stream for ChannelID(%x) exiting", cid), + func(msg lnwire.Message) { + // We'll send a message to the funding manager and wait iff an + // active funding process for this channel hasn't yet completed. + // We do this in order to account for the following scenario: we + // send the funding locked message to the other side, they + // immediately send a channel update message, but we haven't yet + // sent the channel to the channelManager. + p.server.fundingMgr.waitUntilChannelOpen(cid) + + // Dispatch the commitment update message to the proper active + // goroutine dedicated to this channel. + if chanLink == nil { + link, err := p.server.htlcSwitch.GetLink(cid) + if err != nil { + peerLog.Errorf("recv'd update for unknown "+ + "channel %v from %v", cid, p) + return + } + chanLink = link + } + + chanLink.HandleChannelUpdate(msg) + }, + ) +} + +// newDiscMsgStream is used to setup a msgStream between the peer and the +// authenticated gossiper. This stream should be used to forward all remote +// channel announcements. +func newDiscMsgStream(p *peer) *msgStream { + return newMsgStream(p, + "Update stream for gossiper created", + "Update stream for gossiper exited", + func(msg lnwire.Message) { + p.server.authGossiper.ProcessRemoteAnnouncement(msg, + p.addr.IdentityKey) + }, + ) } // readHandler is responsible for reading messages off the wire in series, then @@ -620,7 +650,11 @@ func (p *peer) readHandler() { p.Disconnect(err) }) - chanMsgStreams := make(map[lnwire.ChannelID]*chanMsgStream) + discStream := newDiscMsgStream(p) + discStream.Start() + defer discStream.Stop() + + chanMsgStreams := make(map[lnwire.ChannelID]*msgStream) out: for atomic.LoadInt32(&p.disconnect) == 0 { nextMsg, err := p.readNextMessage() @@ -717,8 +751,8 @@ out: *lnwire.NodeAnnouncement, *lnwire.AnnounceSignatures: - p.server.authGossiper.ProcessRemoteAnnouncement(msg, - p.addr.IdentityKey) + discStream.AddMsg(msg) + default: peerLog.Errorf("unknown message %v received from peer "+ "%v", uint16(msg.MsgType()), p) @@ -732,8 +766,7 @@ out: // If a stream hasn't yet been created, then // we'll do so, add it to the map, and finally // start it. - chanStream = newChanMsgStream(p.server.fundingMgr, - p.server.htlcSwitch, p, targetChan) + chanStream = newChanMsgStream(p, targetChan) chanMsgStreams[targetChan] = chanStream chanStream.Start() }