From 4eebc7c9940956e73019b53e2b9fb3015732328d Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 1 Feb 2017 17:01:33 -0800 Subject: [PATCH] peer: refactor the queueHandler to aggressively drain queue This commit refactors the queueHandler slightly to be more aggressive when attempting to drain the pending message queue. With this new logic the queueHandler will now attempt to _completely_ drain the pendingMsgs queue by sending it all to the writeHandler _before_ attempting to accept new messages from the outside sub-systems. The previous queueHandler was a bit more passive and would result in messages sitting the the pendingMessage queue longer than required. --- peer.go | 117 +++++++++++++++++++++++++++----------------------------- 1 file changed, 57 insertions(+), 60 deletions(-) diff --git a/peer.go b/peer.go index f8cbd3e1..1139e150 100644 --- a/peer.go +++ b/peer.go @@ -543,18 +543,19 @@ func (p *peer) writeMessage(msg lnwire.Message) error { // // NOTE: This method MUST be run as a goroutine. func (p *peer) writeHandler() { -out: + defer func() { + p.wg.Done() + peerLog.Tracef("writeHandler for peer %v done", p) + }() + for { select { case outMsg := <-p.sendQueue: - if err := p.writeMessage(outMsg.msg); err != nil { - peerLog.Errorf("unable to write message: %v", - err) - p.Disconnect() - break out - } - switch outMsg.msg.(type) { + // If we're about to send a ping message, then log the + // exact time in which we send the message so we can + // use the delay as a rough estimate of latency to the + // remote peer. case *lnwire.Ping: // TODO(roasbeef): do this before the write? // possibly account for processing within func? @@ -562,33 +563,26 @@ out: atomic.StoreInt64(&p.pingLastSend, now) } - // Synchronize with the writeHandler. - p.sendQueueSync <- struct{}{} - case <-p.quit: - break out - } - } - - // Wait for the queueHandler to finish so we can empty out all pending - // messages avoiding a possible deadlock somewhere. - <-p.queueQuit - - // Drain any lingering messages that we're meant to be sent. But since - // we're shutting down, just ignore them. -fin: - for { - select { - case msg := <-p.sendQueue: - if msg.sentChan != nil { - msg.sentChan <- struct{}{} + // Write out the message to the socket, closing the + // 'sentChan' if it's non-nil, The 'sentChan' allows + // callers to optionally synchronize sends with the + // writeHandler. + err := p.writeMessage(outMsg.msg) + if outMsg.sentChan != nil { + close(outMsg.sentChan) } - default: - break fin + + if err != nil { + peerLog.Errorf("unable to write message: %v", + err) + p.Disconnect() + return + } + + case <-p.quit: + return } } - - p.wg.Done() - peerLog.Tracef("writeHandler for peer %v done", p) } // queueHandler is responsible for accepting messages from outside subsystems @@ -596,40 +590,43 @@ fin: // // NOTE: This method MUST be run as a goroutine. func (p *peer) queueHandler() { - waitOnSync := false + defer p.wg.Done() + pendingMsgs := list.New() -out: for { - select { - case msg := <-p.outgoingQueue: - if !waitOnSync { - p.sendQueue <- msg - } else { - pendingMsgs.PushBack(msg) - } - waitOnSync = true - case <-p.sendQueueSync: - // If there aren't any more remaining messages in the - // queue, then we're no longer waiting to synchronize - // with the writeHandler. - next := pendingMsgs.Front() - if next == nil { - waitOnSync = false - continue + // Before add a queue'd message our pending message queue, + // we'll first try to aggressively empty out our pending list of + // messaging. + for { + // Examine the front of the queue. If this message is + // nil, then we've emptied out the queue and can accept + // new messages from outside sub-systems. + elem := pendingMsgs.Front() + if elem == nil { + break } - // Notify the writeHandler about the next item to - // asynchronously send. - val := pendingMsgs.Remove(next) - p.sendQueue <- val.(outgoinMsg) - // TODO(roasbeef): other sync stuffs - case <-p.quit: - break out + select { + case p.sendQueue <- elem.Value.(outgoinMsg): + pendingMsgs.Remove(elem) + case <-p.quit: + return + default: + break + } } - } - close(p.queueQuit) - p.wg.Done() + // If there weren't any messages to send, or the writehandler + // is still blocked, then we'll accept a new message into the + // queue from outside sub-systems. + select { + case <-p.quit: + return + case msg := <-p.outgoingQueue: + pendingMsgs.PushBack(msg) + } + + } } // pingHandler is responsible for periodically sending ping messages to the