From b4374436f331903ae1a19879aac0f37678b65f0e Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 30 Sep 2015 05:01:49 +0200 Subject: [PATCH] p2p/discover: fix race involving the seed node iterator nodeDB.querySeeds was not safe for concurrent use but could be called concurrenty on multiple goroutines in the following case: - the table was empty - a timed refresh started - a lookup was started and initiated refresh These conditions are unlikely to coincide during normal use, but are much more likely to occur all at once when the user's machine just woke from sleep. The root cause of the issue is that querySeeds reused the same leveldb iterator until it was exhausted. This commit moves the refresh scheduling logic into its own goroutine (so only one refresh is ever active) and changes querySeeds to not use a persistent iterator. The seed node selection is now more random and ignores nodes that have not been contacted in the last 5 days. --- p2p/discover/database.go | 100 +++++++++++--------- p2p/discover/database_test.go | 103 ++++++++------------- p2p/discover/table.go | 169 ++++++++++++++++++++++------------ p2p/discover/table_test.go | 3 - p2p/discover/udp.go | 7 -- 5 files changed, 204 insertions(+), 178 deletions(-) diff --git a/p2p/discover/database.go b/p2p/discover/database.go index d5c594364..e8e3371ff 100644 --- a/p2p/discover/database.go +++ b/p2p/discover/database.go @@ -21,6 +21,7 @@ package discover import ( "bytes" + "crypto/rand" "encoding/binary" "os" "sync" @@ -46,11 +47,8 @@ var ( // nodeDB stores all nodes we know about. type nodeDB struct { - lvl *leveldb.DB // Interface to the database itself - seeder iterator.Iterator // Iterator for fetching possible seed nodes - - self NodeID // Own node id to prevent adding it into the database - + lvl *leveldb.DB // Interface to the database itself + self NodeID // Own node id to prevent adding it into the database runner sync.Once // Ensures we can start at most one expirer quit chan struct{} // Channel to signal the expiring thread to stop } @@ -302,52 +300,70 @@ func (db *nodeDB) updateFindFails(id NodeID, fails int) error { return db.storeInt64(makeKey(id, nodeDBDiscoverFindFails), int64(fails)) } -// querySeeds retrieves a batch of nodes to be used as potential seed servers -// during bootstrapping the node into the network. -// -// Ideal seeds are the most recently seen nodes (highest probability to be still -// alive), but yet untried. However, since leveldb only supports dumb iteration -// we will instead start pulling in potential seeds that haven't been yet pinged -// since the start of the boot procedure. -// -// If the database runs out of potential seeds, we restart the startup counter -// and start iterating over the peers again. -func (db *nodeDB) querySeeds(n int) []*Node { - // Create a new seed iterator if none exists - if db.seeder == nil { - db.seeder = db.lvl.NewIterator(nil, nil) - } - // Iterate over the nodes and find suitable seeds - nodes := make([]*Node, 0, n) - for len(nodes) < n && db.seeder.Next() { - // Iterate until a discovery node is found - id, field := splitKey(db.seeder.Key()) - if field != nodeDBDiscoverRoot { - continue +// querySeeds retrieves random nodes to be used as potential seed nodes +// for bootstrapping. +func (db *nodeDB) querySeeds(n int, maxAge time.Duration) []*Node { + var ( + now = time.Now() + nodes = make([]*Node, 0, n) + it = db.lvl.NewIterator(nil, nil) + id NodeID + ) + defer it.Release() + +seek: + for seeks := 0; len(nodes) < n && seeks < n*5; seeks++ { + // Seek to a random entry. The first byte is incremented by a + // random amount each time in order to increase the likelihood + // of hitting all existing nodes in very small databases. + ctr := id[0] + rand.Read(id[:]) + id[0] = ctr + id[0]%16 + it.Seek(makeKey(id, nodeDBDiscoverRoot)) + + n := nextNode(it) + if n == nil { + id[0] = 0 + continue seek // iterator exhausted } - // Dump it if its a self reference - if bytes.Compare(id[:], db.self[:]) == 0 { - db.deleteNode(id) - continue + if n.ID == db.self { + continue seek } - // Load it as a potential seed - if node := db.node(id); node != nil { - nodes = append(nodes, node) + if now.Sub(db.lastPong(n.ID)) > maxAge { + continue seek } - } - // Release the iterator if we reached the end - if len(nodes) == 0 { - db.seeder.Release() - db.seeder = nil + for i := range nodes { + if nodes[i].ID == n.ID { + continue seek // duplicate + } + } + nodes = append(nodes, n) } return nodes } +// reads the next node record from the iterator, skipping over other +// database entries. +func nextNode(it iterator.Iterator) *Node { + for end := false; !end; end = !it.Next() { + id, field := splitKey(it.Key()) + if field != nodeDBDiscoverRoot { + continue + } + var n Node + if err := rlp.DecodeBytes(it.Value(), &n); err != nil { + if glog.V(logger.Warn) { + glog.Errorf("invalid node %x: %v", id, err) + } + continue + } + return &n + } + return nil +} + // close flushes and closes the database files. func (db *nodeDB) close() { - if db.seeder != nil { - db.seeder.Release() - } close(db.quit) db.lvl.Close() } diff --git a/p2p/discover/database_test.go b/p2p/discover/database_test.go index 569585903..80c1a6ff2 100644 --- a/p2p/discover/database_test.go +++ b/p2p/discover/database_test.go @@ -162,9 +162,33 @@ var nodeDBSeedQueryNodes = []struct { node *Node pong time.Time }{ + // This one should not be in the result set because its last + // pong time is too far in the past. { node: newNode( - MustHexID("0x01d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + MustHexID("0x84d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + net.IP{127, 0, 0, 3}, + 30303, + 30303, + ), + pong: time.Now().Add(-3 * time.Hour), + }, + // This one shouldn't be in in the result set because its + // nodeID is the local node's ID. + { + node: newNode( + MustHexID("0x57d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + net.IP{127, 0, 0, 3}, + 30303, + 30303, + ), + pong: time.Now().Add(-4 * time.Second), + }, + + // These should be in the result set. + { + node: newNode( + MustHexID("0x22d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), net.IP{127, 0, 0, 1}, 30303, 30303, @@ -173,7 +197,7 @@ var nodeDBSeedQueryNodes = []struct { }, { node: newNode( - MustHexID("0x02d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + MustHexID("0x44d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), net.IP{127, 0, 0, 2}, 30303, 30303, @@ -182,7 +206,7 @@ var nodeDBSeedQueryNodes = []struct { }, { node: newNode( - MustHexID("0x03d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), + MustHexID("0xe2d9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439"), net.IP{127, 0, 0, 3}, 30303, 30303, @@ -192,7 +216,7 @@ var nodeDBSeedQueryNodes = []struct { } func TestNodeDBSeedQuery(t *testing.T) { - db, _ := newNodeDB("", Version, NodeID{}) + db, _ := newNodeDB("", Version, nodeDBSeedQueryNodes[1].node.ID) defer db.close() // Insert a batch of nodes for querying @@ -200,20 +224,24 @@ func TestNodeDBSeedQuery(t *testing.T) { if err := db.updateNode(seed.node); err != nil { t.Fatalf("node %d: failed to insert: %v", i, err) } + if err := db.updateLastPong(seed.node.ID, seed.pong); err != nil { + t.Fatalf("node %d: failed to insert lastPong: %v", i, err) + } } + // Retrieve the entire batch and check for duplicates - seeds := db.querySeeds(2 * len(nodeDBSeedQueryNodes)) - if len(seeds) != len(nodeDBSeedQueryNodes) { - t.Errorf("seed count mismatch: have %v, want %v", len(seeds), len(nodeDBSeedQueryNodes)) - } + seeds := db.querySeeds(len(nodeDBSeedQueryNodes)*2, time.Hour) have := make(map[NodeID]struct{}) for _, seed := range seeds { have[seed.ID] = struct{}{} } want := make(map[NodeID]struct{}) - for _, seed := range nodeDBSeedQueryNodes { + for _, seed := range nodeDBSeedQueryNodes[2:] { want[seed.node.ID] = struct{}{} } + if len(seeds) != len(want) { + t.Errorf("seed count mismatch: have %v, want %v", len(seeds), len(want)) + } for id, _ := range have { if _, ok := want[id]; !ok { t.Errorf("extra seed: %v", id) @@ -224,63 +252,6 @@ func TestNodeDBSeedQuery(t *testing.T) { t.Errorf("missing seed: %v", id) } } - // Make sure the next batch is empty (seed EOF) - seeds = db.querySeeds(2 * len(nodeDBSeedQueryNodes)) - if len(seeds) != 0 { - t.Errorf("seed count mismatch: have %v, want %v", len(seeds), 0) - } -} - -func TestNodeDBSeedQueryContinuation(t *testing.T) { - db, _ := newNodeDB("", Version, NodeID{}) - defer db.close() - - // Insert a batch of nodes for querying - for i, seed := range nodeDBSeedQueryNodes { - if err := db.updateNode(seed.node); err != nil { - t.Fatalf("node %d: failed to insert: %v", i, err) - } - } - // Iteratively retrieve the batch, checking for an empty batch on reset - for i := 0; i < len(nodeDBSeedQueryNodes); i++ { - if seeds := db.querySeeds(1); len(seeds) != 1 { - t.Errorf("1st iteration %d: seed count mismatch: have %v, want %v", i, len(seeds), 1) - } - } - if seeds := db.querySeeds(1); len(seeds) != 0 { - t.Errorf("reset: seed count mismatch: have %v, want %v", len(seeds), 0) - } - for i := 0; i < len(nodeDBSeedQueryNodes); i++ { - if seeds := db.querySeeds(1); len(seeds) != 1 { - t.Errorf("2nd iteration %d: seed count mismatch: have %v, want %v", i, len(seeds), 1) - } - } -} - -func TestNodeDBSelfSeedQuery(t *testing.T) { - // Assign a node as self to verify evacuation - self := nodeDBSeedQueryNodes[0].node.ID - db, _ := newNodeDB("", Version, self) - defer db.close() - - // Insert a batch of nodes for querying - for i, seed := range nodeDBSeedQueryNodes { - if err := db.updateNode(seed.node); err != nil { - t.Fatalf("node %d: failed to insert: %v", i, err) - } - } - // Retrieve the entire batch and check that self was evacuated - seeds := db.querySeeds(2 * len(nodeDBSeedQueryNodes)) - if len(seeds) != len(nodeDBSeedQueryNodes)-1 { - t.Errorf("seed count mismatch: have %v, want %v", len(seeds), len(nodeDBSeedQueryNodes)-1) - } - have := make(map[NodeID]struct{}) - for _, seed := range seeds { - have[seed.ID] = struct{}{} - } - if _, ok := have[self]; ok { - t.Errorf("self not evacuated") - } } func TestNodeDBPersistency(t *testing.T) { diff --git a/p2p/discover/table.go b/p2p/discover/table.go index 972bc1077..66afa52ea 100644 --- a/p2p/discover/table.go +++ b/p2p/discover/table.go @@ -44,6 +44,10 @@ const ( maxBondingPingPongs = 16 maxFindnodeFailures = 5 + + autoRefreshInterval = 1 * time.Hour + seedCount = 30 + seedMaxAge = 5 * 24 * time.Hour ) type Table struct { @@ -52,6 +56,10 @@ type Table struct { nursery []*Node // bootstrap nodes db *nodeDB // database of known nodes + refreshReq chan struct{} + closeReq chan struct{} + closed chan struct{} + bondmu sync.Mutex bonding map[NodeID]*bondproc bondslots chan struct{} // limits total number of active bonding processes @@ -93,11 +101,14 @@ func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string db, _ = newNodeDB("", Version, ourID) } tab := &Table{ - net: t, - db: db, - self: newNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port)), - bonding: make(map[NodeID]*bondproc), - bondslots: make(chan struct{}, maxBondingPingPongs), + net: t, + db: db, + self: newNode(ourID, ourAddr.IP, uint16(ourAddr.Port), uint16(ourAddr.Port)), + bonding: make(map[NodeID]*bondproc), + bondslots: make(chan struct{}, maxBondingPingPongs), + refreshReq: make(chan struct{}), + closeReq: make(chan struct{}), + closed: make(chan struct{}), } for i := 0; i < cap(tab.bondslots); i++ { tab.bondslots <- struct{}{} @@ -105,6 +116,7 @@ func newTable(t transport, ourID NodeID, ourAddr *net.UDPAddr, nodeDBPath string for i := range tab.buckets { tab.buckets[i] = new(bucket) } + go tab.refreshLoop() return tab } @@ -163,10 +175,12 @@ func randUint(max uint32) uint32 { // Close terminates the network listener and flushes the node database. func (tab *Table) Close() { - if tab.net != nil { - tab.net.close() + select { + case <-tab.closed: + // already closed. + case tab.closeReq <- struct{}{}: + <-tab.closed // wait for refreshLoop to end. } - tab.db.close() } // Bootstrap sets the bootstrap nodes. These nodes are used to connect @@ -183,7 +197,7 @@ func (tab *Table) Bootstrap(nodes []*Node) { tab.nursery = append(tab.nursery, &cpy) } tab.mutex.Unlock() - tab.refresh() + tab.requestRefresh() } // Lookup performs a network search for nodes close @@ -210,9 +224,9 @@ func (tab *Table) Lookup(targetID NodeID) []*Node { result := tab.closest(target, bucketSize) tab.mutex.Unlock() - // If the result set is empty, all nodes were dropped, refresh + // If the result set is empty, all nodes were dropped, refresh. if len(result.entries) == 0 { - tab.refresh() + tab.requestRefresh() return nil } @@ -257,56 +271,86 @@ func (tab *Table) Lookup(targetID NodeID) []*Node { return result.entries } -// refresh performs a lookup for a random target to keep buckets full, or seeds -// the table if it is empty (initial bootstrap or discarded faulty peers). -func (tab *Table) refresh() { - seed := true +func (tab *Table) requestRefresh() { + select { + case tab.refreshReq <- struct{}{}: + case <-tab.closed: + } +} - // If the discovery table is empty, seed with previously known nodes - tab.mutex.Lock() - for _, bucket := range tab.buckets { - if len(bucket.entries) > 0 { - seed = false - break +func (tab *Table) refreshLoop() { + defer func() { + tab.db.close() + if tab.net != nil { + tab.net.close() + } + close(tab.closed) + }() + + timer := time.NewTicker(autoRefreshInterval) + var done chan struct{} + for { + select { + case <-timer.C: + if done == nil { + done = make(chan struct{}) + go tab.doRefresh(done) + } + case <-tab.refreshReq: + if done == nil { + done = make(chan struct{}) + go tab.doRefresh(done) + } + case <-done: + done = nil + case <-tab.closeReq: + if done != nil { + <-done + } + return } } +} + +// doRefresh performs a lookup for a random target to keep buckets +// full. seed nodes are inserted if the table is empty (initial +// bootstrap or discarded faulty peers). +func (tab *Table) doRefresh(done chan struct{}) { + defer close(done) + + // The Kademlia paper specifies that the bucket refresh should + // perform a lookup in the least recently used bucket. We cannot + // adhere to this because the findnode target is a 512bit value + // (not hash-sized) and it is not easily possible to generate a + // sha3 preimage that falls into a chosen bucket. + // We perform a lookup with a random target instead. + var target NodeID + rand.Read(target[:]) + result := tab.Lookup(target) + if len(result) > 0 { + return + } + + // The table is empty. Load nodes from the database and insert + // them. This should yield a few previously seen nodes that are + // (hopefully) still alive. + seeds := tab.db.querySeeds(seedCount, seedMaxAge) + seeds = tab.bondall(append(seeds, tab.nursery...)) + if glog.V(logger.Debug) { + if len(seeds) == 0 { + glog.Infof("no seed nodes found") + } + for _, n := range seeds { + age := time.Since(tab.db.lastPong(n.ID)) + glog.Infof("seed node (age %v): %v", age, n) + } + } + tab.mutex.Lock() + tab.stuff(seeds) tab.mutex.Unlock() - // If the table is not empty, try to refresh using the live entries - if !seed { - // The Kademlia paper specifies that the bucket refresh should - // perform a refresh in the least recently used bucket. We cannot - // adhere to this because the findnode target is a 512bit value - // (not hash-sized) and it is not easily possible to generate a - // sha3 preimage that falls into a chosen bucket. - // - // We perform a lookup with a random target instead. - var target NodeID - rand.Read(target[:]) - - result := tab.Lookup(target) - if len(result) == 0 { - // Lookup failed, seed after all - seed = true - } - } - - if seed { - // Pick a batch of previously know seeds to lookup with - seeds := tab.db.querySeeds(10) - for _, seed := range seeds { - glog.V(logger.Debug).Infoln("Seeding network with", seed) - } - nodes := append(tab.nursery, seeds...) - - // Bond with all the seed nodes (will pingpong only if failed recently) - bonded := tab.bondall(nodes) - if len(bonded) > 0 { - tab.Lookup(tab.self.ID) - } - // TODO: the Kademlia paper says that we're supposed to perform - // random lookups in all buckets further away than our closest neighbor. - } + // Finally, do a self lookup to fill up the buckets. + tab.Lookup(tab.self.ID) } // closest returns the n nodes in the table that are closest to the @@ -373,8 +417,9 @@ func (tab *Table) bond(pinged bool, id NodeID, addr *net.UDPAddr, tcpPort uint16 } // If the node is unknown (non-bonded) or failed (remotely unknown), bond from scratch var result error - if node == nil || fails > 0 { - glog.V(logger.Detail).Infof("Bonding %x: known=%v, fails=%v", id[:8], node != nil, fails) + age := time.Since(tab.db.lastPong(id)) + if node == nil || fails > 0 || age > nodeDBNodeExpiration { + glog.V(logger.Detail).Infof("Bonding %x: known=%t, fails=%d age=%v", id[:8], node != nil, fails, age) tab.bondmu.Lock() w := tab.bonding[id] @@ -435,13 +480,17 @@ func (tab *Table) pingpong(w *bondproc, pinged bool, id NodeID, addr *net.UDPAdd // ping a remote endpoint and wait for a reply, also updating the node // database accordingly. func (tab *Table) ping(id NodeID, addr *net.UDPAddr) error { - // Update the last ping and send the message tab.db.updateLastPing(id, time.Now()) if err := tab.net.ping(id, addr); err != nil { return err } - // Pong received, update the database and return tab.db.updateLastPong(id, time.Now()) + + // Start the background expiration goroutine after the first + // successful communication. Subsequent calls have no effect if it + // is already running. We do this here instead of somewhere else + // so that the search for seed nodes also considers older nodes + // that would otherwise be removed by the expiration. tab.db.ensureExpirer() return nil } diff --git a/p2p/discover/table_test.go b/p2p/discover/table_test.go index 426f4e9cc..84962a1a5 100644 --- a/p2p/discover/table_test.go +++ b/p2p/discover/table_test.go @@ -514,9 +514,6 @@ func (tn *preminedTestnet) findnode(toid NodeID, toaddr *net.UDPAddr, target Nod if toaddr.Port == 0 { panic("query to node at distance 0") } - if target != tn.target { - panic("findnode with wrong target") - } next := uint16(toaddr.Port) - 1 var result []*Node for i, id := range tn.dists[toaddr.Port] { diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go index afb31ee69..8f62598f2 100644 --- a/p2p/discover/udp.go +++ b/p2p/discover/udp.go @@ -52,8 +52,6 @@ const ( respTimeout = 500 * time.Millisecond sendTimeout = 500 * time.Millisecond expiration = 20 * time.Second - - refreshInterval = 1 * time.Hour ) // RPC packet types @@ -312,10 +310,8 @@ func (t *udp) loop() { plist = list.New() timeout = time.NewTimer(0) nextTimeout *pending // head of plist when timeout was last reset - refresh = time.NewTicker(refreshInterval) ) <-timeout.C // ignore first timeout - defer refresh.Stop() defer timeout.Stop() resetTimeout := func() { @@ -344,9 +340,6 @@ func (t *udp) loop() { resetTimeout() select { - case <-refresh.C: - go t.refresh() - case <-t.closing: for el := plist.Front(); el != nil; el = el.Next() { el.Value.(*pending).errc <- errClosed