miner, istanbul/backend : make seal async

This commit is contained in:
amalraj.manigmail.com 2018-10-05 15:28:50 +08:00
parent 2c4081d89f
commit dc116850b4
2 changed files with 39 additions and 34 deletions

View File

@ -424,35 +424,35 @@ func (sb *backend) Seal(chain consensus.ChainReader, block *types.Block, results
results <- nil results <- nil
return nil return nil
} }
go func() {
// get the proposed block hash and clear it if the seal() is completed.
sb.sealMu.Lock()
sb.proposedBlockHash = block.Hash()
clear := func() {
sb.proposedBlockHash = common.Hash{}
sb.sealMu.Unlock()
}
defer clear()
// post block into Istanbul engine // get the proposed block hash and clear it if the seal() is completed.
go sb.EventMux().Post(istanbul.RequestEvent{ sb.sealMu.Lock()
Proposal: block, sb.proposedBlockHash = block.Hash()
}) clear := func() {
for { sb.proposedBlockHash = common.Hash{}
select { sb.sealMu.Unlock()
case result := <-sb.commitCh: }
// if the block hash and the hash from channel are the same, defer clear()
// return the result. Otherwise, keep waiting the next hash.
if block.Hash() == result.Hash() { // post block into Istanbul engine
results <- result go sb.EventMux().Post(istanbul.RequestEvent{
return Proposal: block,
} })
case <-stop: for {
results <- nil select {
return case result := <-sb.commitCh:
// if the block hash and the hash from channel are the same,
// return the result. Otherwise, keep waiting the next hash.
if block.Hash() == result.Hash() {
results <- result
return nil
} }
case <-stop:
results <- nil
return nil
} }
}() }
return nil return nil
} }

View File

@ -544,9 +544,7 @@ func (w *worker) taskLoop() {
w.pendingMu.Lock() w.pendingMu.Lock()
w.pendingTasks[w.engine.SealHash(task.block.Header())] = task w.pendingTasks[w.engine.SealHash(task.block.Header())] = task
w.pendingMu.Unlock() w.pendingMu.Unlock()
if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil { go w.seal(task.block, stopCh)
log.Warn("Block sealing failed", "err", err)
}
case <-w.exitCh: case <-w.exitCh:
interrupt() interrupt()
return return
@ -554,6 +552,12 @@ func (w *worker) taskLoop() {
} }
} }
func (w *worker) seal(b *types.Block, stop <-chan struct{}) {
if err := w.engine.Seal(w.chain, b, w.resultCh, stop); err != nil {
log.Warn("Block sealing failed", "err", err)
}
}
// resultLoop is a standalone goroutine to handle sealing result submitting // resultLoop is a standalone goroutine to handle sealing result submitting
// and flush relative data to the database. // and flush relative data to the database.
func (w *worker) resultLoop() { func (w *worker) resultLoop() {
@ -581,8 +585,9 @@ func (w *worker) resultLoop() {
} }
// Different block could share same sealhash, deep copy here to prevent write-write conflict. // Different block could share same sealhash, deep copy here to prevent write-write conflict.
var logs []*types.Log var logs []*types.Log
work := w.current
for _, receipt := range append(task.receipts, task.privateReceipts...) { for _, receipt := range append(work.receipts, work.privateReceipts...) {
// Update the block hash in all logs since it is now available and not when the // Update the block hash in all logs since it is now available and not when the
// receipt/log of individual transactions were created. // receipt/log of individual transactions were created.
for _, log := range receipt.Logs { for _, log := range receipt.Logs {
@ -590,14 +595,14 @@ func (w *worker) resultLoop() {
} }
} }
for _, log := range append(task.state.Logs(), task.privateState.Logs()...) { for _, log := range append(work.state.Logs(), work.privateState.Logs()...) {
log.BlockHash = hash log.BlockHash = hash
} }
// write private transacions // write private transacions
privateStateRoot, _ := task.privateState.Commit(w.config.IsEIP158(block.Number())) privateStateRoot, _ := work.privateState.Commit(w.config.IsEIP158(block.Number()))
core.WritePrivateStateRoot(w.eth.ChainDb(), block.Root(), privateStateRoot) core.WritePrivateStateRoot(w.eth.ChainDb(), block.Root(), privateStateRoot)
allReceipts := mergeReceipts(task.receipts, task.privateReceipts) allReceipts := mergeReceipts(work.receipts, work.privateReceipts)
// Commit block and state to database. // Commit block and state to database.
@ -613,7 +618,7 @@ func (w *worker) resultLoop() {
w.mux.Post(core.NewMinedBlockEvent{Block: block}) w.mux.Post(core.NewMinedBlockEvent{Block: block})
var events []interface{} var events []interface{}
logs = append(task.state.Logs(), task.privateState.Logs()...) logs = append(work.state.Logs(), work.privateState.Logs()...)
switch stat { switch stat {
case core.CanonStatTy: case core.CanonStatTy: