diff --git a/peer.go b/peer.go index 37f87e75..3742a162 100644 --- a/peer.go +++ b/peer.go @@ -471,6 +471,8 @@ func (p *peer) readNextMessage() (lnwire.Message, error) { // TODO(conner): use stream handler interface to abstract out stream // state/logging type msgStream struct { + streamShutdown int32 + peer *peer apply func(lnwire.Message) @@ -516,8 +518,12 @@ func (ms *msgStream) Stop() { close(ms.quit) - // Wake up the msgConsumer is we've been signalled to exit. - ms.msgCond.Signal() + // Now that we've closed the channel, we'll repeatedly signal the msg + // consumer until we've detected that it has exited. + for atomic.LoadInt32(&ms.streamShutdown) == 0 { + ms.msgCond.Signal() + time.Sleep(time.Millisecond * 100) + } ms.wg.Wait() } @@ -537,12 +543,13 @@ func (ms *msgStream) msgConsumer() { for len(ms.msgs) == 0 { ms.msgCond.Wait() - // If we were woke up in order to exit, then we'll do - // so. Otherwise, we'll check the message queue for any - // new items. + // If we woke up in order to exit, then we'll do so. + // Otherwise, we'll check the message queue for any new + // items. select { case <-ms.quit: ms.msgCond.L.Unlock() + atomic.StoreInt32(&ms.streamShutdown, 1) return default: } @@ -586,8 +593,8 @@ func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream { var chanLink htlcswitch.ChannelLink return newMsgStream(p, - fmt.Sprintf("Update stream for ChannelID(%x) created", cid), - fmt.Sprintf("Update stream for ChannelID(%x) exiting", cid), + fmt.Sprintf("Update stream for ChannelID(%x) created", cid[:]), + fmt.Sprintf("Update stream for ChannelID(%x) exiting", cid[:]), func(msg lnwire.Message) { _, isChanSycMsg := msg.(*lnwire.ChannelReestablish)