Merge pull request #395 from cfromknecht/discovery-msg-stream

peer: add async queue for gossiper msgs
This commit is contained in:
Olaoluwa Osuntokun 2017-11-02 16:12:20 -07:00 committed by GitHub
commit 184b56f81e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 108 additions and 75 deletions

179
peer.go
View File

@ -473,40 +473,38 @@ func (p *peer) readNextMessage() (lnwire.Message, error) {
return nextMsg, nil return nextMsg, nil
} }
// chanMsgStream implements a goroutine-safe, in-order stream of messages to be // msgStream implements a goroutine-safe, in-order stream of messages to be
// delivered to an active channel. These messages MUST be in order due to the // delivered via closure to a receiver. These messages MUST be in order due to
// nature of the lightning channel commitment state machine. We utilize // the nature of the lightning channel commitment and gossiper state machines.
// additional synchronization with the fundingManager to ensure we don't // TODO(conner): use stream handler interface to abstract out stream
// attempt to dispatch a message to a channel before it is fully active. // state/logging
type chanMsgStream struct { type msgStream struct {
fundingMgr *fundingManager
htlcSwitch *htlcswitch.Switch
cid lnwire.ChannelID
peer *peer peer *peer
apply func(lnwire.Message)
startMsg string
stopMsg string
msgCond *sync.Cond msgCond *sync.Cond
msgs []lnwire.Message msgs []lnwire.Message
chanLink htlcswitch.ChannelLink
mtx sync.Mutex mtx sync.Mutex
wg sync.WaitGroup wg sync.WaitGroup
quit chan struct{} quit chan struct{}
} }
// newChanMsgStream creates a new instance of a chanMsgStream for a particular // newMsgStream creates a new instance of a chanMsgStream for a particular
// channel identified by its channel ID. // channel identified by its channel ID.
func newChanMsgStream(f *fundingManager, h *htlcswitch.Switch, p *peer, func newMsgStream(p *peer, startMsg, stopMsg string,
c lnwire.ChannelID) *chanMsgStream { apply func(lnwire.Message)) *msgStream {
stream := &chanMsgStream{ stream := &msgStream{
fundingMgr: f,
htlcSwitch: h,
peer: p, peer: p,
cid: c, apply: apply,
startMsg: startMsg,
stopMsg: stopMsg,
quit: make(chan struct{}), quit: make(chan struct{}),
} }
stream.msgCond = sync.NewCond(&stream.mtx) stream.msgCond = sync.NewCond(&stream.mtx)
@ -515,45 +513,44 @@ func newChanMsgStream(f *fundingManager, h *htlcswitch.Switch, p *peer,
} }
// Start starts the chanMsgStream. // Start starts the chanMsgStream.
func (c *chanMsgStream) Start() { func (ms *msgStream) Start() {
c.wg.Add(1) ms.wg.Add(1)
go c.msgConsumer() go ms.msgConsumer()
} }
// Stop stops the chanMsgStream. // Stop stops the chanMsgStream.
func (c *chanMsgStream) Stop() { func (ms *msgStream) Stop() {
// TODO(roasbeef): signal too? // TODO(roasbeef): signal too?
close(c.quit) close(ms.quit)
// Wake up the msgConsumer is we've been signalled to exit. // Wake up the msgConsumer is we've been signalled to exit.
c.msgCond.Signal() ms.msgCond.Signal()
c.wg.Wait() ms.wg.Wait()
} }
// msgConsumer is the main goroutine that streams messages from the peer's // msgConsumer is the main goroutine that streams messages from the peer's
// readHandler directly to the target channel. // readHandler directly to the target channel.
func (c *chanMsgStream) msgConsumer() { func (ms *msgStream) msgConsumer() {
defer c.wg.Done() defer ms.wg.Done()
defer peerLog.Tracef(ms.stopMsg)
peerLog.Tracef("Update stream for ChannelID(%x) created", c.cid[:]) peerLog.Tracef(ms.startMsg)
for { for {
// First, we'll check our condition. If the queue of messages // First, we'll check our condition. If the queue of messages
// is empty, then we'll wait until a new item is added. // is empty, then we'll wait until a new item is added.
c.msgCond.L.Lock() ms.msgCond.L.Lock()
for len(c.msgs) == 0 { for len(ms.msgs) == 0 {
c.msgCond.Wait() ms.msgCond.Wait()
// If we were woke up in order to exit, then we'll do // If we were woke up in order to exit, then we'll do
// so. Otherwise, we'll check the message queue for any // so. Otherwise, we'll check the message queue for any
// new items. // new items.
select { select {
case <-c.quit: case <-ms.quit:
peerLog.Tracef("Update stream for "+ ms.msgCond.L.Unlock()
"ChannelID(%x) exiting", c.cid[:])
c.msgCond.L.Unlock()
return return
default: default:
} }
@ -562,48 +559,81 @@ func (c *chanMsgStream) msgConsumer() {
// Grab the message off the front of the queue, shifting the // Grab the message off the front of the queue, shifting the
// slice's reference down one in order to remove the message // slice's reference down one in order to remove the message
// from the queue. // from the queue.
msg := c.msgs[0] msg := ms.msgs[0]
c.msgs[0] = nil // Set to nil to prevent GC leak. ms.msgs[0] = nil // Set to nil to prevent GC leak.
c.msgs = c.msgs[1:] ms.msgs = ms.msgs[1:]
c.msgCond.L.Unlock() ms.msgCond.L.Unlock()
// We'll send a message to the funding manager and wait iff an ms.apply(msg)
// active funding process for this channel hasn't yet
// completed. We do this in order to account for the following
// scenario: we send the funding locked message to the other
// side, they immediately send a channel update message, but we
// haven't yet sent the channel to the channelManager.
c.fundingMgr.waitUntilChannelOpen(c.cid)
// Dispatch the commitment update message to the proper active
// goroutine dedicated to this channel.
if c.chanLink == nil {
link, err := c.htlcSwitch.GetLink(c.cid)
if err != nil {
peerLog.Errorf("recv'd update for unknown "+
"channel %v from %v", c.cid, c.peer)
continue
}
c.chanLink = link
}
c.chanLink.HandleChannelUpdate(msg)
} }
} }
// AddMsg adds a new message to the chanMsgStream. This function is safe for // AddMsg adds a new message to the msgStream. This function is safe for
// concurrent access. // concurrent access.
func (c *chanMsgStream) AddMsg(msg lnwire.Message) { func (ms *msgStream) AddMsg(msg lnwire.Message) {
// First, we'll lock the condition, and add the message to the end of // First, we'll lock the condition, and add the message to the end of
// the message queue. // the message queue.
c.msgCond.L.Lock() ms.msgCond.L.Lock()
c.msgs = append(c.msgs, msg) ms.msgs = append(ms.msgs, msg)
c.msgCond.L.Unlock() ms.msgCond.L.Unlock()
// With the message added, we signal to the msgConsumer that there are // With the message added, we signal to the msgConsumer that there are
// additional messages to consume. // additional messages to consume.
c.msgCond.Signal() ms.msgCond.Signal()
}
// newChanMsgStream is used to create a msgStream between the peer and
// particular channel link in the htlcswitch. We utilize additional
// synchronization with the fundingManager to ensure we don't attempt to
// dispatch a message to a channel before it is fully active. A reference to the
// channel this stream forwards to his held in scope to prevent unnecessary
// lookups.
func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream {
var chanLink htlcswitch.ChannelLink
return newMsgStream(p,
fmt.Sprintf("Update stream for ChannelID(%x) created", cid),
fmt.Sprintf("Update stream for ChannelID(%x) exiting", cid),
func(msg lnwire.Message) {
// We'll send a message to the funding manager and wait iff an
// active funding process for this channel hasn't yet completed.
// We do this in order to account for the following scenario: we
// send the funding locked message to the other side, they
// immediately send a channel update message, but we haven't yet
// sent the channel to the channelManager.
p.server.fundingMgr.waitUntilChannelOpen(cid)
// Dispatch the commitment update message to the proper active
// goroutine dedicated to this channel.
if chanLink == nil {
link, err := p.server.htlcSwitch.GetLink(cid)
if err != nil {
peerLog.Errorf("recv'd update for unknown "+
"channel %v from %v", cid, p)
return
}
chanLink = link
}
chanLink.HandleChannelUpdate(msg)
},
)
}
// newDiscMsgStream is used to setup a msgStream between the peer and the
// authenticated gossiper. This stream should be used to forward all remote
// channel announcements.
func newDiscMsgStream(p *peer) *msgStream {
return newMsgStream(p,
"Update stream for gossiper created",
"Update stream for gossiper exited",
func(msg lnwire.Message) {
p.server.authGossiper.ProcessRemoteAnnouncement(msg,
p.addr.IdentityKey)
},
)
} }
// readHandler is responsible for reading messages off the wire in series, then // readHandler is responsible for reading messages off the wire in series, then
@ -620,7 +650,11 @@ func (p *peer) readHandler() {
p.Disconnect(err) p.Disconnect(err)
}) })
chanMsgStreams := make(map[lnwire.ChannelID]*chanMsgStream) discStream := newDiscMsgStream(p)
discStream.Start()
defer discStream.Stop()
chanMsgStreams := make(map[lnwire.ChannelID]*msgStream)
out: out:
for atomic.LoadInt32(&p.disconnect) == 0 { for atomic.LoadInt32(&p.disconnect) == 0 {
nextMsg, err := p.readNextMessage() nextMsg, err := p.readNextMessage()
@ -717,8 +751,8 @@ out:
*lnwire.NodeAnnouncement, *lnwire.NodeAnnouncement,
*lnwire.AnnounceSignatures: *lnwire.AnnounceSignatures:
p.server.authGossiper.ProcessRemoteAnnouncement(msg, discStream.AddMsg(msg)
p.addr.IdentityKey)
default: default:
peerLog.Errorf("unknown message %v received from peer "+ peerLog.Errorf("unknown message %v received from peer "+
"%v", uint16(msg.MsgType()), p) "%v", uint16(msg.MsgType()), p)
@ -732,8 +766,7 @@ out:
// If a stream hasn't yet been created, then // If a stream hasn't yet been created, then
// we'll do so, add it to the map, and finally // we'll do so, add it to the map, and finally
// start it. // start it.
chanStream = newChanMsgStream(p.server.fundingMgr, chanStream = newChanMsgStream(p, targetChan)
p.server.htlcSwitch, p, targetChan)
chanMsgStreams[targetChan] = chanStream chanMsgStreams[targetChan] = chanStream
chanStream.Start() chanStream.Start()
} }