diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 9803ae534..f71c16237 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -122,7 +122,14 @@ func (dl *downloadTester) insertChain(blocks types.Blocks) (int, error) { // newPeer registers a new block download source into the downloader. func (dl *downloadTester) newPeer(id string, hashes []common.Hash, blocks map[common.Hash]*types.Block) error { - err := dl.downloader.RegisterPeer(id, hashes[0], dl.peerGetHashesFn(id), dl.peerGetBlocksFn(id)) + return dl.newSlowPeer(id, hashes, blocks, 0) +} + +// newSlowPeer registers a new block download source into the downloader, with a +// specific delay time on processing the network packets sent to it, simulating +// potentially slow network IO. +func (dl *downloadTester) newSlowPeer(id string, hashes []common.Hash, blocks map[common.Hash]*types.Block, delay time.Duration) error { + err := dl.downloader.RegisterPeer(id, hashes[0], dl.peerGetHashesFn(id, delay), dl.peerGetBlocksFn(id, delay)) if err == nil { // Assign the owned hashes and blocks to the peer (deep copy) dl.peerHashes[id] = make([]common.Hash, len(hashes)) @@ -147,8 +154,10 @@ func (dl *downloadTester) dropPeer(id string) { // peerGetBlocksFn constructs a getHashes function associated with a particular // peer in the download tester. The returned function can be used to retrieve // batches of hashes from the particularly requested peer. -func (dl *downloadTester) peerGetHashesFn(id string) func(head common.Hash) error { +func (dl *downloadTester) peerGetHashesFn(id string, delay time.Duration) func(head common.Hash) error { return func(head common.Hash) error { + time.Sleep(delay) + limit := MaxHashFetch if dl.maxHashFetch > 0 { limit = dl.maxHashFetch @@ -178,8 +187,10 @@ func (dl *downloadTester) peerGetHashesFn(id string) func(head common.Hash) erro // peerGetBlocksFn constructs a getBlocks function associated with a particular // peer in the download tester. The returned function can be used to retrieve // batches of blocks from the particularly requested peer. -func (dl *downloadTester) peerGetBlocksFn(id string) func([]common.Hash) error { +func (dl *downloadTester) peerGetBlocksFn(id string, delay time.Duration) func([]common.Hash) error { return func(hashes []common.Hash) error { + time.Sleep(delay) + blocks := dl.peerBlocks[id] result := make([]*types.Block, 0, len(hashes)) for _, hash := range hashes { @@ -340,6 +351,37 @@ func TestMultiSynchronisation(t *testing.T) { } } +// Tests that synchronising with a peer who's very slow at network IO does not +// stall the other peers in the system. +func TestSlowSynchronisation(t *testing.T) { + tester := newTester() + + // Create a batch of blocks, with a slow and a full speed peer + targetCycles := 2 + targetBlocks := targetCycles*blockCacheLimit - 15 + targetIODelay := 500 * time.Millisecond + + hashes := createHashes(targetBlocks, knownHash) + blocks := createBlocksFromHashes(hashes) + + tester.newSlowPeer("fast", hashes, blocks, 0) + tester.newSlowPeer("slow", hashes, blocks, targetIODelay) + + // Try to sync with the peers (pull hashes from fast) + start := time.Now() + if err := tester.sync("fast"); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + if imported := len(tester.ownBlocks); imported != targetBlocks+1 { + t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1) + } + // Check that the slow peer got hit at most once per block-cache-size import + limit := time.Duration(targetCycles+1) * targetIODelay + if delay := time.Since(start); delay >= limit { + t.Fatalf("synchronisation exceeded delay limit: have %v, want %v", delay, limit) + } +} + // Tests that if a peer returns an invalid chain with a block pointing to a non- // existing parent, it is correctly detected and handled. func TestNonExistingParentAttack(t *testing.T) { diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 9614a6951..f36e133e4 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -74,7 +74,7 @@ func (p *peer) Fetch(request *fetchRequest) error { for hash, _ := range request.Hashes { hashes = append(hashes, hash) } - p.getBlocks(hashes) + go p.getBlocks(hashes) return nil } diff --git a/eth/sync.go b/eth/sync.go index 88a76805c..917fc0fce 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -171,7 +171,7 @@ func (pm *ProtocolManager) fetcher() { // Send out all block requests for peer, hashes := range request { glog.V(logger.Debug).Infof("Explicitly fetching %d blocks from %s", len(hashes), peer.id) - peer.requestBlocks(hashes) + go peer.requestBlocks(hashes) } request = make(map[*peer][]common.Hash)