diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 85de78ebf..cc75c3014 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -150,16 +150,9 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error { return d.syncWithPeer(p, hash) } -// TakeBlocks takes blocks from the queue and yields them to the blockTaker handler -// it's possible it yields no blocks +// TakeBlocks takes blocks from the queue and yields them to the caller. func (d *Downloader) TakeBlocks() types.Blocks { - // Check that there are blocks available and its parents are known - head := d.queue.GetHeadBlock() - if head == nil || !d.hasBlock(head.ParentHash()) { - return nil - } - // Retrieve a full batch of blocks - return d.queue.TakeBlocks(head) + return d.queue.TakeBlocks() } func (d *Downloader) Has(hash common.Hash) bool { diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 78eff011a..cfa6257a3 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -10,7 +10,10 @@ import ( "github.com/ethereum/go-ethereum/core/types" ) -var knownHash = common.Hash{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} +var ( + knownHash = common.Hash{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} + unknownHash = common.Hash{9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9} +) func createHashes(start, amount int) (hashes []common.Hash) { hashes = make([]common.Hash, amount+1) @@ -27,7 +30,7 @@ func createBlock(i int, prevHash, hash common.Hash) *types.Block { header := &types.Header{Number: big.NewInt(int64(i))} block := types.NewBlockWithHeader(header) block.HeaderHash = hash - block.ParentHeaderHash = knownHash + block.ParentHeaderHash = prevHash return block } @@ -42,9 +45,12 @@ func createBlocksFromHashes(hashes []common.Hash) map[common.Hash]*types.Block { } type downloadTester struct { - downloader *Downloader - hashes []common.Hash - blocks map[common.Hash]*types.Block + downloader *Downloader + + hashes []common.Hash // Chain of hashes simulating + blocks map[common.Hash]*types.Block // Blocks associated with the hashes + chain []common.Hash // Block-chain being constructed + t *testing.T pcount int done chan bool @@ -52,7 +58,15 @@ type downloadTester struct { } func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types.Block) *downloadTester { - tester := &downloadTester{t: t, hashes: hashes, blocks: blocks, done: make(chan bool)} + tester := &downloadTester{ + t: t, + + hashes: hashes, + blocks: blocks, + chain: []common.Hash{knownHash}, + + done: make(chan bool), + } downloader := New(tester.hasBlock, tester.getBlock) tester.downloader = downloader @@ -64,9 +78,17 @@ func (dl *downloadTester) sync(peerId string, hash common.Hash) error { return dl.downloader.Synchronise(peerId, hash) } +func (dl *downloadTester) insertBlocks(blocks types.Blocks) { + for _, block := range blocks { + dl.chain = append(dl.chain, block.Hash()) + } +} + func (dl *downloadTester) hasBlock(hash common.Hash) bool { - if knownHash == hash { - return true + for _, h := range dl.chain { + if h == hash { + return true + } } return false } @@ -175,10 +197,9 @@ func TestTaking(t *testing.T) { if err != nil { t.Error("download error", err) } - - bs1 := tester.downloader.TakeBlocks() - if len(bs1) != 1000 { - t.Error("expected to take 1000, got", len(bs1)) + bs := tester.downloader.TakeBlocks() + if len(bs) != targetBlocks { + t.Error("retrieved block mismatch: have %v, want %v", len(bs), targetBlocks) } } @@ -248,17 +269,17 @@ func TestThrottling(t *testing.T) { done := make(chan struct{}) took := []*types.Block{} go func() { - for { + for running := true; running; { select { case <-done: - took = append(took, tester.downloader.TakeBlocks()...) - done <- struct{}{} - return + running = false default: - took = append(took, tester.downloader.TakeBlocks()...) time.Sleep(time.Millisecond) } + // Take a batch of blocks and accumulate + took = append(took, tester.downloader.TakeBlocks()...) } + done <- struct{}{} }() // Synchronise the two threads and verify @@ -273,3 +294,43 @@ func TestThrottling(t *testing.T) { t.Fatalf("downloaded block mismatch: have %v, want %v", len(took), targetBlocks) } } + +// Tests that if a peer returns an invalid chain with a block pointing to a non- +// existing parent, it is correctly detected and handled. +func TestNonExistingParentAttack(t *testing.T) { + // Forge a single-link chain with a forged header + hashes := createHashes(0, 1) + blocks := createBlocksFromHashes(hashes) + + forged := blocks[hashes[0]] + forged.ParentHeaderHash = unknownHash + + // Try and sync with the malicious node and check that it fails + tester := newTester(t, hashes, blocks) + tester.newPeer("attack", big.NewInt(10000), hashes[0]) + if err := tester.sync("attack", hashes[0]); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + bs := tester.downloader.TakeBlocks() + if len(bs) != 1 { + t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1) + } + if tester.hasBlock(bs[0].ParentHash()) { + t.Fatalf("tester knows about the unknown hash") + } + tester.downloader.Cancel() + + // Reconstruct a valid chain, and try to synchronize with it + forged.ParentHeaderHash = knownHash + tester.newPeer("valid", big.NewInt(20000), hashes[0]) + if err := tester.sync("valid", hashes[0]); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + bs = tester.downloader.TakeBlocks() + if len(bs) != 1 { + t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1) + } + if !tester.hasBlock(bs[0].ParentHash()) { + t.Fatalf("tester doesn't know about the origin hash") + } +} diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 40749698c..6ad915757 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -172,17 +172,11 @@ func (q *queue) GetBlock(hash common.Hash) *types.Block { } // TakeBlocks retrieves and permanently removes a batch of blocks from the cache. -// The head parameter is required to prevent a race condition where concurrent -// takes may fail parent verifications. -func (q *queue) TakeBlocks(head *types.Block) types.Blocks { +func (q *queue) TakeBlocks() types.Blocks { q.lock.Lock() defer q.lock.Unlock() - // Short circuit if the head block's different - if len(q.blockCache) == 0 || q.blockCache[0] != head { - return nil - } - // Otherwise accumulate all available blocks + // Accumulate all available blocks var blocks types.Blocks for _, block := range q.blockCache { if block == nil { diff --git a/eth/sync.go b/eth/sync.go index 00b571782..c89f34596 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -2,6 +2,7 @@ package eth import ( "math" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/eth/downloader" @@ -14,6 +15,7 @@ import ( func (pm *ProtocolManager) update() { forceSync := time.Tick(forceSyncCycle) blockProc := time.Tick(blockProcCycle) + blockProcPend := int32(0) for { select { @@ -36,7 +38,12 @@ func (pm *ProtocolManager) update() { } case <-blockProc: // Try to pull some blocks from the downloaded - go pm.processBlocks() + if atomic.CompareAndSwapInt32(&blockProcPend, 0, 1) { + go func() { + pm.processBlocks() + atomic.StoreInt32(&blockProcPend, 0) + }() + } case <-pm.quitSync: return @@ -52,7 +59,7 @@ func (pm *ProtocolManager) processBlocks() error { pm.wg.Add(1) defer pm.wg.Done() - // Take a batch of blocks (will return nil if a previous batch has not reached the chain yet) + // Short circuit if no blocks are available for insertion blocks := pm.downloader.TakeBlocks() if len(blocks) == 0 { return nil @@ -63,9 +70,8 @@ func (pm *ProtocolManager) processBlocks() error { max := int(math.Min(float64(len(blocks)), float64(blockProcAmount))) _, err := pm.chainman.InsertChain(blocks[:max]) if err != nil { - // cancel download process + glog.V(logger.Warn).Infof("Block insertion failed: %v", err) pm.downloader.Cancel() - return err } blocks = blocks[max:]