diff --git a/htlcswitch/interfaces.go b/htlcswitch/interfaces.go index 8ca1d634..497d85aa 100644 --- a/htlcswitch/interfaces.go +++ b/htlcswitch/interfaces.go @@ -99,5 +99,5 @@ type Peer interface { // Disconnect disconnects with peer if we have error which we can't // properly handle. - Disconnect() + Disconnect(reason error) } diff --git a/htlcswitch/link.go b/htlcswitch/link.go index acfee55a..6efc7097 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -314,9 +314,7 @@ out: } if err := l.updateCommitTx(); err != nil { - log.Errorf("unable to update commitment: %v", - err) - l.cfg.Peer.Disconnect() + l.fail("unable to update commitment: %v", err) break out } @@ -333,9 +331,7 @@ out: // update, waiting for the revocation window to open // up. if err := l.updateCommitTx(); err != nil { - log.Errorf("unable to update "+ - "commitment: %v", err) - l.cfg.Peer.Disconnect() + l.fail("unable to update commitment: %v", err) break out } @@ -454,9 +450,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) { logIndex, err := l.channel.SettleHTLC(pre) if err != nil { // TODO(roasbeef): broadcast on-chain - log.Errorf("settle for incoming HTLC "+ - "rejected: %v", err) - l.cfg.Peer.Disconnect() + l.fail("unable to settle incoming HTLC: %v", err) return } @@ -499,9 +493,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) { // this is a settle request, then initiate an update. if l.batchCounter >= 10 || isSettle { if err := l.updateCommitTx(); err != nil { - log.Errorf("unable to update "+ - "commitment: %v", err) - l.cfg.Peer.Disconnect() + l.fail("unable to update commitment: %v", err) return } } @@ -518,10 +510,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // "settle" list in the event that we know the preimage. index, err := l.channel.ReceiveHTLC(msg) if err != nil { - // TODO(roasbeef): fail channel - log.Errorf("unable to handle upstream add HTLC: %v", - err) - l.cfg.Peer.Disconnect() + l.fail("unable to handle upstream add HTLC: %v", err) return } log.Tracef("Receive upstream htlc with payment hash(%x), "+ @@ -540,9 +529,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { idx := msg.ID if err := l.channel.ReceiveHTLCSettle(pre, idx); err != nil { // TODO(roasbeef): broadcast on-chain - log.Errorf("unable to handle upstream settle "+ - "HTLC: %v", err) - l.cfg.Peer.Disconnect() + l.fail("unable to handle upstream settle HTLC: %v", err) return } @@ -551,9 +538,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { case *lnwire.UpdateFailHTLC: idx := msg.ID if err := l.channel.ReceiveFailHTLC(idx); err != nil { - log.Errorf("unable to handle upstream fail HTLC: "+ - "%v", err) - l.cfg.Peer.Disconnect() + l.fail("unable to handle upstream fail HTLC: %v", err) return } @@ -566,8 +551,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // TODO(roasbeef): redundant re-serialization sig := msg.CommitSig.Serialize() if err := l.channel.ReceiveNewCommitment(sig); err != nil { - log.Errorf("unable to accept new commitment: %v", err) - l.cfg.Peer.Disconnect() + l.fail("unable to accept new commitment: %v", err) return } @@ -607,8 +591,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // so we'll reply with a signature to provide them with their // version of the latest commitment l. if err := l.updateCommitTx(); err != nil { - log.Errorf("unable to update commitment: %v", err) - l.cfg.Peer.Disconnect() + l.fail("unable to update commitment: %v", err) return } @@ -618,8 +601,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // revocation window. htlcs, err := l.channel.ReceiveRevocation(msg) if err != nil { - log.Errorf("unable to accept revocation: %v", err) - l.cfg.Peer.Disconnect() + l.fail("unable to accept revocation: %v", err) return } @@ -643,8 +625,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // channel, if not we will apply the update. fee := msg.FeePerKw if err := l.channel.ReceiveUpdateFee(fee); err != nil { - log.Errorf("error receiving fee update: %v", err) - l.cfg.Peer.Disconnect() + l.fail("error receiving fee update: %v", err) return } } @@ -999,8 +980,7 @@ func (l *channelLink) processLockedInHtlcs( preimage := invoice.Terms.PaymentPreimage logIndex, err := l.channel.SettleHTLC(preimage) if err != nil { - log.Errorf("unable to settle htlc: %v", err) - l.cfg.Peer.Disconnect() + l.fail("unable to settle htlc: %v", err) return nil } @@ -1009,8 +989,7 @@ func (l *channelLink) processLockedInHtlcs( // update. err = l.cfg.Registry.SettleInvoice(invoiceHash) if err != nil { - log.Errorf("unable to settle invoice: %v", err) - l.cfg.Peer.Disconnect() + l.fail("unable to settle invoice: %v", err) return nil } @@ -1133,8 +1112,7 @@ func (l *channelLink) processLockedInHtlcs( // remote htlc logs, initiate a state transition by updating // the remote commitment chain. if err := l.updateCommitTx(); err != nil { - log.Errorf("unable to update commitment: %v", err) - l.cfg.Peer.Disconnect() + l.fail("unable to update commitment: %v", err) return nil } } @@ -1159,3 +1137,11 @@ func (l *channelLink) sendHTLCError(rHash [32]byte, Reason: reason, }) } + +// fail helper function which is used to encapsulate the action neccessary +// for proper disconnect. +func (l *channelLink) fail(format string, a ...interface{}) { + reason := errors.Errorf(format, a...) + log.Error(reason) + l.cfg.Peer.Disconnect(reason) +} \ No newline at end of file diff --git a/htlcswitch/mock.go b/htlcswitch/mock.go index 48bf3d8f..e7cd6688 100644 --- a/htlcswitch/mock.go +++ b/htlcswitch/mock.go @@ -260,7 +260,7 @@ func (s *mockServer) PubKey() [33]byte { return s.id } -func (s *mockServer) Disconnect() { +func (s *mockServer) Disconnect(reason error) { s.Stop() s.t.Fatalf("server %v was disconnected", s.name) } diff --git a/peer.go b/peer.go index 6c37423d..bcf2a1ac 100644 --- a/peer.go +++ b/peer.go @@ -338,12 +338,12 @@ func (p *peer) WaitForDisconnect() { // Disconnect terminates the connection with the remote peer. Additionally, a // signal is sent to the server and htlcSwitch indicating the resources // allocated to the peer can now be cleaned up. -func (p *peer) Disconnect() { +func (p *peer) Disconnect(reason error) { if !atomic.CompareAndSwapInt32(&p.disconnect, 0, 1) { return } - peerLog.Tracef("Disconnecting %s", p) + peerLog.Tracef("Disconnecting %s, reason: %v", p, reason) // Ensure that the TCP connection is properly closed before continuing. p.conn.Close() @@ -534,7 +534,7 @@ out: } } - p.Disconnect() + p.Disconnect(errors.New("read handler closed")) p.wg.Done() peerLog.Tracef("readHandler for peer %v done", p) @@ -641,9 +641,8 @@ func (p *peer) writeHandler() { } if err != nil { - peerLog.Errorf("unable to write message: %v", - err) - p.Disconnect() + p.Disconnect(errors.Errorf("unable to write message: %v", + err)) return } diff --git a/server.go b/server.go index 99d678fd..2588b097 100644 --- a/server.go +++ b/server.go @@ -682,8 +682,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, inbound // Attempt to start the peer, if we're unable to do so, then disconnect // this peer. if err := p.Start(); err != nil { - srvrLog.Errorf("unable to start peer: %v", err) - p.Disconnect() + p.Disconnect(errors.Errorf("unable to start peer: %v", err)) return } @@ -746,7 +745,7 @@ func (s *server) inboundPeerConnected(conn net.Conn) { // peer to the peer garbage collection goroutine. srvrLog.Debugf("Disconnecting stale connection to %v", connectedPeer) - connectedPeer.Disconnect() + connectedPeer.Disconnect(errors.New("remove stale connection")) s.donePeers <- connectedPeer } @@ -825,7 +824,7 @@ func (s *server) outboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) // server for garbage collection. srvrLog.Debugf("Disconnecting stale connection to %v", connectedPeer) - connectedPeer.Disconnect() + connectedPeer.Disconnect(errors.New("remove stale connection")) s.donePeers <- connectedPeer } @@ -841,7 +840,7 @@ func (s *server) addPeer(p *peer) { // Ignore new peers if we're shutting down. if atomic.LoadInt32(&s.shutdown) != 0 { - p.Disconnect() + p.Disconnect(errors.New("server is shutting down")) return } @@ -889,7 +888,7 @@ func (s *server) removePeer(p *peer) { // As the peer is now finished, ensure that the TCP connection is // closed and all of its related goroutines have exited. - p.Disconnect() + p.Disconnect(errors.New("remove peer")) // Ignore deleting peers if we're shutting down. if atomic.LoadInt32(&s.shutdown) != 0 { @@ -1144,7 +1143,7 @@ func (s *server) handleDisconnectPeer(msg *disconnectPeerMsg) { // Now that we know the peer is actually connected, we'll disconnect // from the peer. srvrLog.Infof("Disconnecting from %v", peer) - peer.Disconnect() + peer.Disconnect(errors.New("received user command to disconnect the peer")) msg.err <- nil }