From 6078aa08ebf165e523cc00b67d89f0f946ba4fb5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 25 May 2015 15:04:40 +0300 Subject: [PATCH] p2p/discover: watch find failures, evacuate on too many, rebond if failed --- p2p/discover/table.go | 55 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 47 insertions(+), 8 deletions(-) diff --git a/p2p/discover/table.go b/p2p/discover/table.go index b523a0684..c45143307 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -27,6 +27,7 @@ const ( nBuckets = hashBits + 1 // Number of buckets maxBondingPingPongs = 16 + maxFindnodeFailures = 5 ) type Table struct { @@ -198,7 +199,19 @@ func (tab *Table) Lookup(targetID NodeID) []*Node { asked[n.ID] = true pendingQueries++ go func() { - r, _ := tab.net.findnode(n.ID, n.addr(), targetID) + // Find potential neighbors to bond with + r, err := tab.net.findnode(n.ID, n.addr(), targetID) + if err != nil { + // Bump the failure counter to detect and evacuate non-bonded entries + fails := tab.db.findFails(n.ID) + 1 + tab.db.updateFindFails(n.ID, fails) + glog.V(logger.Detail).Infof("Bumping failures for %x: %d", n.ID[:8], fails) + + if fails > maxFindnodeFailures { + glog.V(logger.Detail).Infof("Evacuating node %x: %d findnode failures", n.ID[:8], fails) + tab.del(n) + } + } reply <- tab.bondall(r) }() } @@ -305,8 +318,15 @@ func (tab *Table) bondall(nodes []*Node) (result []*Node) { // If pinged is true, the remote node has just pinged us and one half // of the process can be skipped. func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) (*Node, error) { - var n *Node - if n = tab.db.node(id); n == nil { + // Retrieve a previously known node and any recent findnode failures + node, fails := tab.db.node(id), 0 + if node != nil { + fails = tab.db.findFails(id) + } + // If the node is unknown (non-bonded) or failed (remotely unknown), bond from scratch + if node == nil || fails > 0 { + glog.V(logger.Detail).Infof("Bonding %x: known=%v, fails=%v", id[:8], node != nil, fails) + tab.bondmu.Lock() w := tab.bonding[id] if w != nil { @@ -325,18 +345,22 @@ func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16 delete(tab.bonding, id) tab.bondmu.Unlock() } - n = w.n + node = w.n if w.err != nil { return nil, w.err } } + // Bonding succeeded, add to the table and reset previous findnode failures tab.mutex.Lock() defer tab.mutex.Unlock() - b := tab.buckets[logdist(tab.self.sha, n.sha)] - if !b.bump(n) { - tab.pingreplace(n, b) + + b := tab.buckets[logdist(tab.self.sha, node.sha)] + if !b.bump(node) { + tab.pingreplace(node, b) } - return n, nil + tab.db.updateFindFails(id, 0) + + return node, nil } func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16) { @@ -414,6 +438,21 @@ outer: } } +// del removes an entry from the node table (used to evacuate failed/non-bonded +// discovery peers). +func (tab *Table) del(node *Node) { + tab.mutex.Lock() + defer tab.mutex.Unlock() + + bucket := tab.buckets[logdist(tab.self.sha, node.sha)] + for i := range bucket.entries { + if bucket.entries[i].ID == node.ID { + bucket.entries = append(bucket.entries[:i], bucket.entries[i+1:]...) + return + } + } +} + func (b *bucket) bump(n *Node) bool { for i := range b.entries { if b.entries[i].ID == n.ID {