Merge pull request #1151 from fjl/parallel-nonce-2

core: re-add parallel nonce checks
This commit is contained in:
Jeffrey Wilcke 2015-06-03 09:12:06 -07:00
commit 71d9367edc
1 changed files with 49 additions and 67 deletions

View File

@ -522,13 +522,14 @@ type queueEvent struct {
} }
func (self *ChainManager) procFutureBlocks() { func (self *ChainManager) procFutureBlocks() {
blocks := []*types.Block{} var blocks []*types.Block
self.futureBlocks.Each(func(i int, block *types.Block) { self.futureBlocks.Each(func(i int, block *types.Block) {
blocks = append(blocks, block) blocks = append(blocks, block)
}) })
if len(blocks) > 0 {
types.BlockBy(types.Number).Sort(blocks) types.BlockBy(types.Number).Sort(blocks)
self.InsertChain(blocks) self.InsertChain(blocks)
}
} }
// InsertChain will attempt to insert the given chain in to the canonical chain or, otherwise, create a fork. It an error is returned // InsertChain will attempt to insert the given chain in to the canonical chain or, otherwise, create a fork. It an error is returned
@ -540,17 +541,34 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
self.chainmu.Lock() self.chainmu.Lock()
defer self.chainmu.Unlock() defer self.chainmu.Unlock()
// A queued approach to delivering events. This is generally faster than direct delivery and requires much less mutex acquiring. // A queued approach to delivering events. This is generally
// faster than direct delivery and requires much less mutex
// acquiring.
var ( var (
queue = make([]interface{}, len(chain)) queue = make([]interface{}, len(chain))
queueEvent = queueEvent{queue: queue} queueEvent = queueEvent{queue: queue}
stats struct{ queued, processed, ignored int } stats struct{ queued, processed, ignored int }
tstart = time.Now() tstart = time.Now()
nonceDone = make(chan nonceResult, len(chain))
nonceQuit = make(chan struct{})
nonceChecked = make([]bool, len(chain))
) )
// Start the parallel nonce verifier.
go verifyNonces(self.pow, chain, nonceQuit, nonceDone)
defer close(nonceQuit)
for i, block := range chain { for i, block := range chain {
if block == nil { // Wait for block i's nonce to be verified before processing
continue // its state transition.
for nonceChecked[i] {
r := <-nonceDone
nonceChecked[r.i] = true
if !r.valid {
block := chain[i]
return i, ValidationError("Block (#%v / %x) nonce is invalid (= %x)", block.Number(), block.Hash(), block.Nonce)
}
} }
if BadHashes[block.Hash()] { if BadHashes[block.Hash()] {
@ -559,10 +577,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
return i, err return i, err
} }
// create a nonce channel for parallisation of the nonce check
nonceErrCh := make(chan error)
go verifyBlockNonce(self.pow, block, nonceErrCh)
// Setting block.Td regardless of error (known for example) prevents errors down the line // Setting block.Td regardless of error (known for example) prevents errors down the line
// in the protocol handler // in the protocol handler
block.Td = new(big.Int).Set(CalcTD(block, self.GetBlock(block.ParentHash()))) block.Td = new(big.Int).Set(CalcTD(block, self.GetBlock(block.ParentHash())))
@ -571,9 +585,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
// all others will fail too (unless a known block is returned). // all others will fail too (unless a known block is returned).
logs, err := self.processor.Process(block) logs, err := self.processor.Process(block)
if err != nil { if err != nil {
// empty the nonce channel
<-nonceErrCh
if IsKnownBlockErr(err) { if IsKnownBlockErr(err) {
stats.ignored++ stats.ignored++
continue continue
@ -597,11 +608,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
return i, err return i, err
} }
// Wait and check nonce channel and make sure it checks out fine
// otherwise return the error
if err := <-nonceErrCh; err != nil {
return i, err
}
cblock := self.currentBlock cblock := self.currentBlock
// Compare the TD of the last known block in the canonical chain to make sure it's greater. // Compare the TD of the last known block in the canonical chain to make sure it's greater.
@ -776,66 +782,42 @@ func blockErr(block *types.Block, err error) {
h := block.Header() h := block.Header()
glog.V(logger.Error).Infof("Bad block #%v (%x)\n", h.Number, h.Hash().Bytes()) glog.V(logger.Error).Infof("Bad block #%v (%x)\n", h.Number, h.Hash().Bytes())
glog.V(logger.Error).Infoln(err) glog.V(logger.Error).Infoln(err)
glog.V(logger.Debug).Infoln(block) glog.V(logger.Debug).Infoln(verifyNonces)
} }
// verifyNonces verifies nonces of the given blocks in parallel and returns type nonceResult struct {
i int
valid bool
}
// block verifies nonces of the given blocks in parallel and returns
// an error if one of the blocks nonce verifications failed. // an error if one of the blocks nonce verifications failed.
func verifyNonces(pow pow.PoW, blocks []*types.Block) error { func verifyNonces(pow pow.PoW, blocks []*types.Block, quit <-chan struct{}, done chan<- nonceResult) {
// Spawn a few workers. They listen for blocks on the in channel // Spawn a few workers. They listen for blocks on the in channel
// and send results on done. The workers will exit in the // and send results on done. The workers will exit in the
// background when in is closed. // background when in is closed.
var ( var (
in = make(chan *types.Block) in = make(chan int)
done = make(chan error, runtime.GOMAXPROCS(0)) nworkers = runtime.GOMAXPROCS(0)
) )
defer close(in) defer close(in)
for i := 0; i < cap(done); i++ { if len(blocks) < nworkers {
go verifyNonce(pow, in, done) nworkers = len(blocks)
} }
// Feed blocks to the workers, aborting at the first invalid nonce. for i := 0; i < nworkers; i++ {
var ( go func() {
running, i int for i := range in {
block *types.Block done <- nonceResult{i: i, valid: pow.Verify(blocks[i])}
sendin = in
)
for i < len(blocks) || running > 0 {
if i == len(blocks) {
// Disable sending to in.
sendin = nil
} else {
block = blocks[i]
i++
}
select {
case sendin <- block:
running++
case err := <-done:
running--
if err != nil {
return err
} }
} }()
} }
return nil // Feed block indices to the workers.
} for i := range blocks {
select {
// verifyNonce is a worker for the verifyNonces method. It will run until case in <- i:
// in is closed. continue
func verifyNonce(pow pow.PoW, in <-chan *types.Block, done chan<- error) { case <-quit:
for block := range in { return
if !pow.Verify(block) {
done <- ValidationError("Block (#%v / %x) nonce is invalid (= %x)", block.Number(), block.Hash(), block.Nonce)
} else {
done <- nil
} }
} }
} }
func verifyBlockNonce(pow pow.PoW, block *types.Block, done chan<- error) {
if !pow.Verify(block) {
done <- ValidationError("Block (#%v / %x) nonce is invalid (= %x)", block.Number(), block.Hash(), block.Nonce)
} else {
done <- nil
}
}