From 79a6782c1c4a056b27d2c242f656dfcf5633ae3f Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Mon, 13 Apr 2015 17:06:19 +0200 Subject: [PATCH 1/5] p2p: fix goroutine leak when handshake read fails This regression was introduced in b3c058a9e4e9. --- p2p/handshake.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/handshake.go b/p2p/handshake.go index 43361364f..79395f23f 100644 --- a/p2p/handshake.go +++ b/p2p/handshake.go @@ -115,7 +115,7 @@ func setupOutboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, // returning the handshake read error. If the remote side // disconnects us early with a valid reason, we should return it // as the error so it can be tracked elsewhere. - werr := make(chan error) + werr := make(chan error, 1) go func() { werr <- Send(rw, handshakeMsg, our) }() rhs, err := readProtocolHandshake(rw, secrets.RemoteID, our) if err != nil { From 995fab2ebc3b145a1dbb85841c5241cd024362ac Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Mon, 13 Apr 2015 17:34:08 +0200 Subject: [PATCH 2/5] p2p: fix yet another disconnect hang Peer.readLoop will only terminate if the connection is closed. Fix the hang by closing the connection before waiting for readLoop to terminate. This also removes the british disconnect procedure where we're waiting for the remote end to close the connection. I have confirmed with @subtly that cpp-ethereum doesn't adhere to it either. --- p2p/peer.go | 30 ++++++++---------------------- 1 file changed, 8 insertions(+), 22 deletions(-) diff --git a/p2p/peer.go b/p2p/peer.go index 7bc4f9cf6..1262ba64a 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "io" - "io/ioutil" "net" "sort" "sync" @@ -20,8 +19,7 @@ const ( baseProtocolLength = uint64(16) baseProtocolMaxMsgSize = 10 * 1024 * 1024 - pingInterval = 15 * time.Second - disconnectGracePeriod = 2 * time.Second + pingInterval = 15 * time.Second ) const ( @@ -129,39 +127,27 @@ func (p *Peer) run() DiscReason { case err := <-readErr: if r, ok := err.(DiscReason); ok { reason = r - break + } else { + // Note: We rely on protocols to abort if there is a write + // error. It might be more robust to handle them here as well. + p.DebugDetailf("Read error: %v\n", err) + reason = DiscNetworkError } - // Note: We rely on protocols to abort if there is a write - // error. It might be more robust to handle them here as well. - p.DebugDetailf("Read error: %v\n", err) - p.conn.Close() - reason = DiscNetworkError case err := <-p.protoErr: reason = discReasonForError(err) case reason = <-p.disc: } close(p.closed) + p.politeDisconnect(reason) p.wg.Wait() - if reason != DiscNetworkError { - p.politeDisconnect(reason) - } p.Debugf("Disconnected: %v\n", reason) return reason } func (p *Peer) politeDisconnect(reason DiscReason) { - done := make(chan struct{}) - go func() { + if reason != DiscNetworkError { SendItems(p.rw, discMsg, uint(reason)) - // Wait for the other side to close the connection. - // Discard any data that they send until then. - io.Copy(ioutil.Discard, p.conn) - close(done) - }() - select { - case <-done: - case <-time.After(disconnectGracePeriod): } p.conn.Close() } From b9929d289df8efd68e9226fc35b14cd5d469c6e8 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Mon, 13 Apr 2015 17:37:32 +0200 Subject: [PATCH 3/5] p2p: fix unsynchronized map access during Server shutdown removePeer can be called even after listenLoop and dialLoop have returned. --- p2p/server.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/p2p/server.go b/p2p/server.go index 5cd3dc2ad..61e0d71e9 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -260,9 +260,11 @@ func (srv *Server) Stop() { // No new peers can be added at this point because dialLoop and // listenLoop are down. It is safe to call peerWG.Wait because // peerWG.Add is not called outside of those loops. + srv.lock.Lock() for _, peer := range srv.peers { peer.Disconnect(DiscQuitting) } + srv.lock.Unlock() srv.peerWG.Wait() } From b8aeb04f6f55b03edb6998107f8480031e23b22f Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Mon, 13 Apr 2015 17:44:14 +0200 Subject: [PATCH 4/5] p2p/discover: remove unused field Node.activeStamp --- p2p/discover/node.go | 15 --------------- p2p/discover/table.go | 1 - 2 files changed, 16 deletions(-) diff --git a/p2p/discover/node.go b/p2p/discover/node.go index 99cb549a5..6662a6cb7 100644 --- a/p2p/discover/node.go +++ b/p2p/discover/node.go @@ -14,8 +14,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" - "time" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto/secp256k1" @@ -31,9 +29,6 @@ type Node struct { DiscPort int // UDP listening port for discovery protocol TCPPort int // TCP listening port for RLPx - - // this must be set/read using atomic load and store. - activeStamp int64 } func newNode(id NodeID, addr *net.UDPAddr) *Node { @@ -50,16 +45,6 @@ func (n *Node) isValid() bool { return !n.IP.IsMulticast() && !n.IP.IsUnspecified() && n.TCPPort != 0 && n.DiscPort != 0 } -func (n *Node) bumpActive() { - stamp := time.Now().Unix() - atomic.StoreInt64(&n.activeStamp, stamp) -} - -func (n *Node) active() time.Time { - stamp := atomic.LoadInt64(&n.activeStamp) - return time.Unix(stamp, 0) -} - func (n *Node) addr() *net.UDPAddr { return &net.UDPAddr{IP: n.IP, Port: n.DiscPort} } diff --git a/p2p/discover/table.go b/p2p/discover/table.go index dbf86c084..e2e846456 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -326,7 +326,6 @@ outer: func (b *bucket) bump(n *Node) bool { for i := range b.entries { if b.entries[i].ID == n.ID { - n.bumpActive() // move it to the front copy(b.entries[1:], b.entries[:i]) b.entries[0] = n From 0217652d1b7e8f0c1c3002837d9f1277de27ef46 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Mon, 13 Apr 2015 18:08:11 +0200 Subject: [PATCH 5/5] p2p/discover: improve timer handling for reply timeouts --- p2p/discover/udp.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go index d37260e7d..61a0abed9 100644 --- a/p2p/discover/udp.go +++ b/p2p/discover/udp.go @@ -267,11 +267,12 @@ func (t *udp) loop() { defer timeout.Stop() rearmTimeout := func() { - if len(pending) == 0 || nextDeadline == pending[0].deadline { + now := time.Now() + if len(pending) == 0 || now.Before(nextDeadline) { return } nextDeadline = pending[0].deadline - timeout.Reset(nextDeadline.Sub(time.Now())) + timeout.Reset(nextDeadline.Sub(now)) } for {