From 47f62a67aa8a033d8a81dc16104018369325897d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 2 Oct 2015 13:20:41 +0300 Subject: [PATCH] eth/downloader: match capabilities when querying idle peers --- eth/downloader/downloader.go | 4 +-- eth/downloader/downloader_test.go | 49 +++++++++++++++++++++++++++++-- eth/downloader/peer.go | 8 +++-- 3 files changed, 53 insertions(+), 8 deletions(-) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index d1a716c5f..64fb1b57b 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -816,7 +816,7 @@ func (d *Downloader) fetchBlocks61(from uint64) error { } // Send a download request to all idle peers, until throttled throttled := false - for _, peer := range d.peers.IdlePeers() { + for _, peer := range d.peers.IdlePeers(eth61) { // Short circuit if throttling activated if d.queue.Throttle() { throttled = true @@ -1255,7 +1255,7 @@ func (d *Downloader) fetchBodies(from uint64) error { } // Send a download request to all idle peers, until throttled queuedEmptyBlocks, throttled := false, false - for _, peer := range d.peers.IdlePeers() { + for _, peer := range d.peers.IdlePeers(eth62) { // Short circuit if throttling activated if d.queue.Throttle() { throttled = true diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 885fab8bd..96096527e 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -205,9 +205,17 @@ func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Ha dl.lock.Lock() defer dl.lock.Unlock() - err := dl.downloader.RegisterPeer(id, version, hashes[0], - dl.peerGetRelHashesFn(id, delay), dl.peerGetAbsHashesFn(id, delay), dl.peerGetBlocksFn(id, delay), - dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay)) + var err error + switch version { + case 61: + err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHashesFn(id, delay), dl.peerGetAbsHashesFn(id, delay), dl.peerGetBlocksFn(id, delay), nil, nil, nil) + case 62: + err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay)) + case 63: + err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay)) + case 64: + err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay)) + } if err == nil { // Assign the owned hashes and blocks to the peer (deep copy) dl.peerHashes[id] = make([]common.Hash, len(hashes)) @@ -618,6 +626,41 @@ func testMultiSynchronisation(t *testing.T, protocol int) { } } +// Tests that synchronisations behave well in multi-version protocol environments +// and not wreak havok on other nodes in the network. +func TestMultiProtocolSynchronisation61(t *testing.T) { testMultiProtocolSynchronisation(t, 61) } +func TestMultiProtocolSynchronisation62(t *testing.T) { testMultiProtocolSynchronisation(t, 62) } +func TestMultiProtocolSynchronisation63(t *testing.T) { testMultiProtocolSynchronisation(t, 63) } +func TestMultiProtocolSynchronisation64(t *testing.T) { testMultiProtocolSynchronisation(t, 64) } + +func testMultiProtocolSynchronisation(t *testing.T, protocol int) { + // Create a small enough block chain to download + targetBlocks := blockCacheLimit - 15 + hashes, blocks := makeChain(targetBlocks, 0, genesis) + + // Create peers of every type + tester := newTester() + tester.newPeer("peer 61", 61, hashes, blocks) + tester.newPeer("peer 62", 62, hashes, blocks) + tester.newPeer("peer 63", 63, hashes, blocks) + tester.newPeer("peer 64", 64, hashes, blocks) + + // Synchronise with the requestd peer and make sure all blocks were retrieved + if err := tester.sync(fmt.Sprintf("peer %d", protocol), nil); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + if imported := len(tester.ownBlocks); imported != targetBlocks+1 { + t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1) + } + // Check that no peers have been dropped off + for _, version := range []int{61, 62, 63, 64} { + peer := fmt.Sprintf("peer %d", version) + if _, ok := tester.peerHashes[peer]; !ok { + t.Errorf("%s dropped", peer) + } + } +} + // Tests that if a block is empty (i.e. header only), no body request should be // made, and instead the header should be assembled into a whole block in itself. func TestEmptyBlockShortCircuit62(t *testing.T) { testEmptyBlockShortCircuit(t, 62) } diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 8fd1f9a99..c1d20ac61 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -312,14 +312,16 @@ func (ps *peerSet) AllPeers() []*peer { // IdlePeers retrieves a flat list of all the currently idle peers within the // active peer set, ordered by their reputation. -func (ps *peerSet) IdlePeers() []*peer { +func (ps *peerSet) IdlePeers(version int) []*peer { ps.lock.RLock() defer ps.lock.RUnlock() list := make([]*peer, 0, len(ps.peers)) for _, p := range ps.peers { - if atomic.LoadInt32(&p.idle) == 0 { - list = append(list, p) + if (version == eth61 && p.version == eth61) || (version >= eth62 && p.version >= eth62) { + if atomic.LoadInt32(&p.idle) == 0 { + list = append(list, p) + } } } for i := 0; i < len(list); i++ {