From 60c8257c3c251b841c476d09f64b800cbac6282d Mon Sep 17 00:00:00 2001
From: Olaoluwa Osuntokun
Date: Mon, 12 Mar 2018 16:34:03 -0700
Subject: [PATCH] peer: modify the msgStream to not buffer messages off the
 wire indefinitely
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

In this commit, we modify the msgStream struct to ensure that it has a
cap on the number of messages it’ll buffer off the wire. Currently we
have two msgStream structs per peer: the first for the discovery
messages, and the second for any messages that modify channel state.

Due to inefficiencies in the current protocol for reconciling graph
state upon connection (just dump the entire damn thing), when a node
first starts up, this can lead to very high memory usage as all peers
will concurrently send their initial message dump, which can be in the
thousands of messages on testnet.

Our fix is simple: make the message stream into a _bounded_ message
stream. The newMsgStream function now has a new argument: bufSize.
Internally, we’ll take this bufSize and create what is effectively an
internal semaphore for the producer. Each time the producer gets a new
message, it’ll try to read an item from the channel. If the queue
still has free slots, then this will succeed immediately. If not,
then we’ll block until the consumer actually finishes processing a
message and then signals by sending a new item into the channel.

We choose an initial value of 1000, as there’s already a max limit on
outstanding adds on the commitment, and a value of 1000 should allow
any incoming messages to be safely flushed and processed by the
gossiper.
---
 peer.go | 54 ++++++++++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 46 insertions(+), 8 deletions(-)

diff --git a/peer.go b/peer.go
index a0a5cc8b..9dbabf79 100644
--- a/peer.go
+++ b/peer.go
@@ -500,24 +500,39 @@ type msgStream struct {
 	mtx sync.Mutex
 
+	bufSize      uint32
+	producerSema chan struct{}
+
 	wg   sync.WaitGroup
 	quit chan struct{}
 }
 
 // newMsgStream creates a new instance of a chanMsgStream for a particular
-// channel identified by its channel ID.
-func newMsgStream(p *peer, startMsg, stopMsg string,
+// channel identified by its channel ID. bufSize is the max number of messages
+// that should be buffered in the internal queue. Callers should set this to a
+// sane value that avoids blocking unnecessarily, but doesn't allow an
+// unbounded amount of memory to be allocated to buffer incoming messages.
+func newMsgStream(p *peer, startMsg, stopMsg string, bufSize uint32,
 	apply func(lnwire.Message)) *msgStream {
 
 	stream := &msgStream{
-		peer:     p,
-		apply:    apply,
-		startMsg: startMsg,
-		stopMsg:  stopMsg,
-		quit:     make(chan struct{}),
+		peer:         p,
+		apply:        apply,
+		startMsg:     startMsg,
+		stopMsg:      stopMsg,
+		producerSema: make(chan struct{}, bufSize),
+		quit:         make(chan struct{}),
 	}
 	stream.msgCond = sync.NewCond(&stream.mtx)
 
+	// Before we return the active stream, we'll populate the producer's
+	// semaphore channel. We'll use this to ensure that the producer won't
+	// attempt to allocate memory in the queue for an item until it has
+	// sufficient extra space.
+	for i := uint32(0); i < bufSize; i++ {
+		stream.producerSema <- struct{}{}
+	}
+
 	return stream
 }
 
@@ -580,13 +595,34 @@ func (ms *msgStream) msgConsumer() {
 		ms.msgCond.L.Unlock()
 
 		ms.apply(msg)
+
+		// We've just successfully processed an item, so we'll signal
+		// to the producer that a new slot in the buffer has opened up.
+		// We'll use this to bound the size of the buffer to avoid
+		// allowing it to grow indefinitely.
+		select {
+		case ms.producerSema <- struct{}{}:
+		case <-ms.quit:
+			return
+		}
 	}
 }
 
 // AddMsg adds a new message to the msgStream. This function is safe for
 // concurrent access.
 func (ms *msgStream) AddMsg(msg lnwire.Message) {
-	// First, we'll lock the condition, and add the message to the end of
+	// First, we'll attempt to receive from the producerSema struct. This
+	// acts as a semaphore to prevent us from indefinitely buffering
+	// incoming items from the wire. Either the msg queue isn't full, and
+	// we'll not block, or the queue is full, and we'll block until either
+	// we're signalled to quit, or a slot is freed up.
+	select {
+	case <-ms.producerSema:
+	case <-ms.quit:
+		return
+	}
+
+	// Next, we'll lock the condition, and add the message to the end of
 	// the message queue.
 	ms.msgCond.L.Lock()
 	ms.msgs = append(ms.msgs, msg)
@@ -610,6 +646,7 @@ func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream {
 	return newMsgStream(p,
 		fmt.Sprintf("Update stream for ChannelID(%x) created", cid[:]),
 		fmt.Sprintf("Update stream for ChannelID(%x) exiting", cid[:]),
+		1000,
 		func(msg lnwire.Message) {
 			_, isChanSycMsg := msg.(*lnwire.ChannelReestablish)
@@ -653,6 +690,7 @@ func newDiscMsgStream(p *peer) *msgStream {
 	return newMsgStream(p,
 		"Update stream for gossiper created",
 		"Update stream for gossiper exited",
+		1000,
 		func(msg lnwire.Message) {
 			p.server.authGossiper.ProcessRemoteAnnouncement(msg,
 				p.addr.IdentityKey)
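
Note for reviewers: the pattern above is the standard Go idiom of using a
pre-filled buffered channel as a counting semaphore. Below is a minimal
standalone sketch of the same producer/consumer bounding, using a plain
buffered channel in place of msgStream's condition-variable queue for
brevity; all names (boundedStream, addMsg, consume) are illustrative and
not part of this patch:

    package main

    import "fmt"

    // boundedStream bounds how many unprocessed messages a producer may
    // queue, mirroring the producerSema idea from the patch.
    type boundedStream struct {
    	producerSema chan struct{} // counting semaphore, capacity = bufSize
    	msgs         chan string   // stands in for msgStream's msg queue
    	quit         chan struct{}
    }

    func newBoundedStream(bufSize int) *boundedStream {
    	s := &boundedStream{
    		producerSema: make(chan struct{}, bufSize),
    		msgs:         make(chan string, bufSize),
    		quit:         make(chan struct{}),
    	}

    	// Pre-fill the semaphore so the producer can enqueue up to
    	// bufSize messages before it blocks.
    	for i := 0; i < bufSize; i++ {
    		s.producerSema <- struct{}{}
    	}

    	return s
    }

    // addMsg is the producer side: it blocks once bufSize messages are
    // outstanding, or returns early on quit.
    func (s *boundedStream) addMsg(msg string) {
    	select {
    	case <-s.producerSema:
    	case <-s.quit:
    		return
    	}
    	s.msgs <- msg
    }

    // consume is the consumer side: it processes one message and only
    // then hands a slot back to the producer.
    func (s *boundedStream) consume(apply func(string)) {
    	msg := <-s.msgs
    	apply(msg)

    	select {
    	case s.producerSema <- struct{}{}:
    	case <-s.quit:
    	}
    }

    func main() {
    	s := newBoundedStream(2)

    	go func() {
    		for i := 0; i < 5; i++ {
    			s.addMsg(fmt.Sprintf("msg-%d", i))
    		}
    	}()

    	for i := 0; i < 5; i++ {
    		s.consume(func(m string) { fmt.Println("processed", m) })
    	}
    }

The key design point, in both the sketch and the patch, is that the
consumer returns a slot to the semaphore only after apply has finished,
so the bound is on unprocessed messages rather than on raw queue length.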