Merge pull request #1141 from obscuren/parallelisation-issue

Parallelisation issue
This commit is contained in:
Jeffrey Wilcke 2015-05-28 07:37:37 -07:00
commit b6e137b2b4
2 changed files with 58 additions and 25 deletions

View File

@ -6,4 +6,5 @@ import "github.com/ethereum/go-ethereum/common"
var BadHashes = map[common.Hash]bool{ var BadHashes = map[common.Hash]bool{
common.HexToHash("f269c503aed286caaa0d114d6a5320e70abbc2febe37953207e76a2873f2ba79"): true, common.HexToHash("f269c503aed286caaa0d114d6a5320e70abbc2febe37953207e76a2873f2ba79"): true,
common.HexToHash("38f5bbbffd74804820ffa4bab0cd540e9de229725afb98c1a7e57936f4a714bc"): true, common.HexToHash("38f5bbbffd74804820ffa4bab0cd540e9de229725afb98c1a7e57936f4a714bc"): true,
common.HexToHash("7064455b364775a16afbdecd75370e912c6e2879f202eda85b9beae547fff3ac"): true,
} }

View File

@ -548,18 +548,21 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
tstart = time.Now() tstart = time.Now()
) )
// check the nonce in parallel to the block processing
// this speeds catching up significantly
nonceErrCh := make(chan error)
go func() {
nonceErrCh <- verifyNonces(self.pow, chain)
}()
for i, block := range chain { for i, block := range chain {
if block == nil { if block == nil {
continue continue
} }
if BadHashes[block.Hash()] {
err := fmt.Errorf("Found known bad hash in chain %x", block.Hash())
blockErr(block, err)
return i, err
}
// create a nonce channel for parallisation of the nonce check
nonceErrCh := make(chan error)
go verifyBlockNonce(self.pow, block, nonceErrCh)
// Setting block.Td regardless of error (known for example) prevents errors down the line // Setting block.Td regardless of error (known for example) prevents errors down the line
// in the protocol handler // in the protocol handler
block.Td = new(big.Int).Set(CalcTD(block, self.GetBlock(block.ParentHash()))) block.Td = new(big.Int).Set(CalcTD(block, self.GetBlock(block.ParentHash())))
@ -568,13 +571,14 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
// all others will fail too (unless a known block is returned). // all others will fail too (unless a known block is returned).
logs, err := self.processor.Process(block) logs, err := self.processor.Process(block)
if err != nil { if err != nil {
// empty the nonce channel
<-nonceErrCh
if IsKnownBlockErr(err) { if IsKnownBlockErr(err) {
stats.ignored++ stats.ignored++
continue continue
} }
// Do not penelise on future block. We'll need a block queue eventually that will queue
// future block for future use
if err == BlockFutureErr { if err == BlockFutureErr {
block.SetQueued(true) block.SetQueued(true)
self.futureBlocks.Push(block) self.futureBlocks.Push(block)
@ -593,18 +597,23 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
return i, err return i, err
} }
// Wait and check nonce channel and make sure it checks out fine
// otherwise return the error
if err := <-nonceErrCh; err != nil {
return i, err
}
cblock := self.currentBlock cblock := self.currentBlock
// Write block to database. Eventually we'll have to improve on this and throw away blocks that are
// not in the canonical chain.
self.write(block)
// Compare the TD of the last known block in the canonical chain to make sure it's greater. // Compare the TD of the last known block in the canonical chain to make sure it's greater.
// At this point it's possible that a different chain (fork) becomes the new canonical chain. // At this point it's possible that a different chain (fork) becomes the new canonical chain.
if block.Td.Cmp(self.td) > 0 { if block.Td.Cmp(self.td) > 0 {
// chain fork // chain fork
if block.ParentHash() != cblock.Hash() { if block.ParentHash() != cblock.Hash() {
// during split we merge two different chains and create the new canonical chain // during split we merge two different chains and create the new canonical chain
self.merge(cblock, block) err := self.merge(cblock, block)
if err != nil {
return i, err
}
queue[i] = ChainSplitEvent{block, logs} queue[i] = ChainSplitEvent{block, logs}
queueEvent.splitCount++ queueEvent.splitCount++
@ -637,19 +646,16 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
queue[i] = ChainSideEvent{block, logs} queue[i] = ChainSideEvent{block, logs}
queueEvent.sideCount++ queueEvent.sideCount++
} }
// Write block to database. Eventually we'll have to improve on this and throw away blocks that are
// not in the canonical chain.
self.write(block)
// Delete from future blocks
self.futureBlocks.Delete(block.Hash()) self.futureBlocks.Delete(block.Hash())
stats.processed++ stats.processed++
} }
// check and wait for the nonce error channel and
// make sure no nonce error was thrown in the process
err := <-nonceErrCh
if err != nil {
return 0, err
}
if (stats.queued > 0 || stats.processed > 0 || stats.ignored > 0) && bool(glog.V(logger.Info)) { if (stats.queued > 0 || stats.processed > 0 || stats.ignored > 0) && bool(glog.V(logger.Info)) {
tend := time.Since(tstart) tend := time.Since(tstart)
start, end := chain[0], chain[len(chain)-1] start, end := chain[0], chain[len(chain)-1]
@ -663,7 +669,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
// diff takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them // diff takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them
// to be part of the new canonical chain. // to be part of the new canonical chain.
func (self *ChainManager) diff(oldBlock, newBlock *types.Block) types.Blocks { func (self *ChainManager) diff(oldBlock, newBlock *types.Block) (types.Blocks, error) {
var ( var (
newChain types.Blocks newChain types.Blocks
commonBlock *types.Block commonBlock *types.Block
@ -675,10 +681,17 @@ func (self *ChainManager) diff(oldBlock, newBlock *types.Block) types.Blocks {
if oldBlock.NumberU64() > newBlock.NumberU64() { if oldBlock.NumberU64() > newBlock.NumberU64() {
// reduce old chain // reduce old chain
for oldBlock = oldBlock; oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = self.GetBlock(oldBlock.ParentHash()) { for oldBlock = oldBlock; oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = self.GetBlock(oldBlock.ParentHash()) {
if oldBlock == nil {
return nil, fmt.Errorf("Invalid old chain")
}
} }
} else { } else {
// reduce new chain and append new chain blocks for inserting later on // reduce new chain and append new chain blocks for inserting later on
for newBlock = newBlock; newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = self.GetBlock(newBlock.ParentHash()) { for newBlock = newBlock; newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = self.GetBlock(newBlock.ParentHash()) {
if newBlock == nil {
return nil, fmt.Errorf("Invalid new chain")
}
newChain = append(newChain, newBlock) newChain = append(newChain, newBlock)
} }
} }
@ -692,6 +705,12 @@ func (self *ChainManager) diff(oldBlock, newBlock *types.Block) types.Blocks {
newChain = append(newChain, newBlock) newChain = append(newChain, newBlock)
oldBlock, newBlock = self.GetBlock(oldBlock.ParentHash()), self.GetBlock(newBlock.ParentHash()) oldBlock, newBlock = self.GetBlock(oldBlock.ParentHash()), self.GetBlock(newBlock.ParentHash())
if oldBlock == nil {
return nil, fmt.Errorf("Invalid old chain")
}
if newBlock == nil {
return nil, fmt.Errorf("Invalid new chain")
}
} }
if glog.V(logger.Info) { if glog.V(logger.Info) {
@ -699,17 +718,22 @@ func (self *ChainManager) diff(oldBlock, newBlock *types.Block) types.Blocks {
glog.Infof("Fork detected @ %x. Reorganising chain from #%v %x to %x", commonHash[:4], numSplit, oldStart.Hash().Bytes()[:4], newStart.Hash().Bytes()[:4]) glog.Infof("Fork detected @ %x. Reorganising chain from #%v %x to %x", commonHash[:4], numSplit, oldStart.Hash().Bytes()[:4], newStart.Hash().Bytes()[:4])
} }
return newChain return newChain, nil
} }
// merge merges two different chain to the new canonical chain // merge merges two different chain to the new canonical chain
func (self *ChainManager) merge(oldBlock, newBlock *types.Block) { func (self *ChainManager) merge(oldBlock, newBlock *types.Block) error {
newChain := self.diff(oldBlock, newBlock) newChain, err := self.diff(oldBlock, newBlock)
if err != nil {
return fmt.Errorf("chain reorg failed: %v", err)
}
// insert blocks. Order does not matter. Last block will be written in ImportChain itself which creates the new head properly // insert blocks. Order does not matter. Last block will be written in ImportChain itself which creates the new head properly
for _, block := range newChain { for _, block := range newChain {
self.insert(block) self.insert(block)
} }
return nil
} }
func (self *ChainManager) update() { func (self *ChainManager) update() {
@ -802,9 +826,17 @@ func verifyNonces(pow pow.PoW, blocks []*types.Block) error {
func verifyNonce(pow pow.PoW, in <-chan *types.Block, done chan<- error) { func verifyNonce(pow pow.PoW, in <-chan *types.Block, done chan<- error) {
for block := range in { for block := range in {
if !pow.Verify(block) { if !pow.Verify(block) {
done <- ValidationError("Block(#%v) nonce is invalid (= %x)", block.Number(), block.Nonce) done <- ValidationError("Block (#%v / %x) nonce is invalid (= %x)", block.Number(), block.Hash(), block.Nonce)
} else { } else {
done <- nil done <- nil
} }
} }
} }
func verifyBlockNonce(pow pow.PoW, block *types.Block, done chan<- error) {
if !pow.Verify(block) {
done <- ValidationError("Block (#%v / %x) nonce is invalid (= %x)", block.Number(), block.Hash(), block.Nonce)
} else {
done <- nil
}
}