diff --git a/peer.go b/peer.go index d3e653a5..4fa3600c 100644 --- a/peer.go +++ b/peer.go @@ -394,6 +394,9 @@ func (p *peer) readNextMessage() (lnwire.Message, []byte, error) { // // NOTE: This method MUST be run as a goroutine. func (p *peer) readHandler() { + var activeChanMtx sync.Mutex + activeChanStreams := make(map[lnwire.ChannelID]struct{}) + out: for atomic.LoadInt32(&p.disconnect) == 0 { nextMsg, _, err := p.readNextMessage() @@ -481,18 +484,51 @@ out: } if isChanUpdate { - p.server.fundingMgr.waitUntilChannelOpen(targetChan) - // Dispatch the commitment update message to the proper - // active goroutine dedicated to this channel. - p.htlcManMtx.Lock() - channel, ok := p.htlcManagers[targetChan] - p.htlcManMtx.Unlock() - if !ok { - peerLog.Errorf("recv'd update for unknown "+ - "channel %v from %v", targetChan, p) + sendUpdate := func() { + // Dispatch the commitment update message to + // the proper active goroutine dedicated to + // this channel. + p.htlcManMtx.RLock() + channel, ok := p.htlcManagers[targetChan] + p.htlcManMtx.RUnlock() + if !ok { + peerLog.Errorf("recv'd update for unknown "+ + "channel %v from %v", targetChan, p) + return + } + + channel <- nextMsg + } + + // Check the map of active channel streams, if this map + // has an entry, then this means the channel is fully + // open. In this case, we can send the channel update + // directly without any further waiting. + activeChanMtx.Lock() + _, ok := activeChanStreams[targetChan] + activeChanMtx.Unlock() + if ok { + sendUpdate() continue } - channel <- nextMsg + + // Otherwise, we'll launch a goroutine to synchronize + // the processing of this message, with the opening of + // the channel as marked by the funding manage. + go func() { + // Block until the channel is marked open. + p.server.fundingMgr.waitUntilChannelOpen(targetChan) + + // Once the channel is open, we'll mark the + // stream as active and send the update to the + // channel. Marking the stream lets us take the + // fast path above, skipping the check to the + // funding manager. + activeChanMtx.Lock() + activeChanStreams[targetChan] = struct{}{} + sendUpdate() + activeChanMtx.Unlock() + }() } }