diff --git a/peer.go b/peer.go index 28a01589..85cf793e 100644 --- a/peer.go +++ b/peer.go @@ -3,6 +3,8 @@ package main import ( "bytes" "container/list" + "crypto/rand" + "encoding/binary" "encoding/hex" "fmt" "net" @@ -10,6 +12,7 @@ import ( "sync/atomic" "time" + "github.com/BitfuryLightning/tools/rt" "github.com/BitfuryLightning/tools/rt/graph" "github.com/btcsuite/fastsha256" "github.com/davecgh/go-spew/spew" @@ -53,9 +56,10 @@ type chanSnapshotReq struct { } // peer is an active peer on the Lightning Network. This struct is responsible -// for managing any channel state related to this peer. To do so, it has several -// helper goroutines to handle events such as HTLC timeouts, new funding -// workflow, and detecting an uncooperative closure of any active channels. +// for managing any channel state related to this peer. To do so, it has +// several helper goroutines to handle events such as HTLC timeouts, new +// funding workflow, and detecting an uncooperative closure of any active +// channels. // TODO(roasbeef): proper reconnection logic type peer struct { // MUST be used atomically. @@ -118,8 +122,8 @@ type peer struct { // channels to the source peer which handled the funding workflow. newChannels chan *lnwallet.LightningChannel - // localCloseChanReqs is a channel in which any local requests to - // close a particular channel are sent over. + // localCloseChanReqs is a channel in which any local requests to close + // a particular channel are sent over. localCloseChanReqs chan *closeLinkReq // remoteCloseChanReqs is a channel in which any remote requests @@ -131,7 +135,7 @@ type peer struct { // next pending channel. Pending channels are tracked by this id // throughout their lifetime until they become active channels, or are // cancelled. Channels id's initiated by an outbound node start from 0, - // while channels inititaed by an inbound node start from 2^63. In + // while channels initiated by an inbound node start from 2^63. In // either case, this value is always monotonically increasing. nextPendingChannelID uint64 pendingChannelMtx sync.RWMutex @@ -241,7 +245,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { } // Start starts all helper goroutines the peer needs for normal operations. -// In the case this peer has already beeen started, then this function is a +// In the case this peer has already been started, then this function is a // noop. func (p *peer) Start() error { if atomic.AddInt32(&p.started, 1) != 1 { @@ -250,11 +254,12 @@ func (p *peer) Start() error { peerLog.Tracef("peer %v starting", p) - p.wg.Add(4) + p.wg.Add(5) go p.readHandler() go p.queueHandler() go p.writeHandler() go p.channelManager() + go p.pingHandler() return nil } @@ -347,6 +352,9 @@ out: var targetChan *wire.OutPoint switch msg := nextMsg.(type) { + case *lnwire.Ping: + p.queueMsg(lnwire.NewPong(msg.Nonce), nil) + // TODO(roasbeef): consolidate into predicate (single vs dual) case *lnwire.SingleFundingRequest: p.server.fundingMgr.processFundingRequest(msg, p) @@ -457,10 +465,6 @@ func (p *peer) writeMessage(msg lnwire.Message) error { // // NOTE: This method MUST be run as a goroutine. func (p *peer) writeHandler() { - // pingTicker is used to periodically send pings to the remote peer. - pingTicker := time.NewTicker(pingInterval) - defer pingTicker.Stop() - out: for { select { @@ -477,8 +481,6 @@ out: // Synchronize with the writeHandler. p.sendQueueSync <- struct{}{} - case <-pingTicker.C: - // TODO(roasbeef): move ping to time.AfterFunc case <-p.quit: break out } @@ -546,14 +548,50 @@ out: p.wg.Done() } +// pingHandler is responsible for periodically sending ping messages to the +// remote peer in order to keep the connection alive and/or determine if the +// connection is still active. +// +// NOTE: This method MUST be run as a goroutine. +func (p *peer) pingHandler() { + pingTicker := time.NewTicker(pingInterval) + defer pingTicker.Stop() + + var pingBuf [8]byte + +out: + for { + select { + case <-pingTicker.C: + // Fill the ping buffer with fresh randomness. If we're + // unable to read enough bytes, then we simply defer + // sending the ping to the next interval. + if _, err := rand.Read(pingBuf[:]); err != nil { + peerLog.Errorf("unable to send ping to %v: %v", p, + err) + continue + } + + // Convert the bytes read into a uint64, and queue the + // message for sending. + nonce := binary.BigEndian.Uint64(pingBuf[:]) + p.queueMsg(lnwire.NewPing(nonce), nil) + case <-p.quit: + break out + } + } + + p.wg.Done() +} + // queueMsg queues a new lnwire.Message to be eventually sent out on the // wire. func (p *peer) queueMsg(msg lnwire.Message, doneChan chan struct{}) { p.outgoingQueue <- outgoinMsg{msg, doneChan} } -// ChannelSnapshots returns a slice of channel snapshots detaling all currently -// active channels maintained with the remote peer. +// ChannelSnapshots returns a slice of channel snapshots detailing all +// currently active channels maintained with the remote peer. func (p *peer) ChannelSnapshots() []*channeldb.ChannelSnapshot { resp := make(chan []*channeldb.ChannelSnapshot, 1) p.chanSnapshotReqs <- &chanSnapshotReq{resp} @@ -1003,7 +1041,7 @@ out: // Otherwise, attempt to extend the remote commitment // chain including all the currently pending entries. - // If the send was unsuccesful, then abaondon the + // If the send was unsuccessful, then abandon the // update, waiting for the revocation window to open // up. if sent, err := p.updateCommitTx(state); err != nil {