From dc116850b4a87109d8fad15278bdba86e904363e Mon Sep 17 00:00:00 2001 From: "amalraj.manigmail.com" Date: Fri, 5 Oct 2018 15:28:50 +0800 Subject: [PATCH] miner, istanbul/backend : make seal async --- consensus/istanbul/backend/engine.go | 52 ++++++++++++++-------------- miner/worker.go | 21 ++++++----- 2 files changed, 39 insertions(+), 34 deletions(-) diff --git a/consensus/istanbul/backend/engine.go b/consensus/istanbul/backend/engine.go index 9ef64281b..47aba9d11 100644 --- a/consensus/istanbul/backend/engine.go +++ b/consensus/istanbul/backend/engine.go @@ -424,35 +424,35 @@ func (sb *backend) Seal(chain consensus.ChainReader, block *types.Block, results results <- nil return nil } - go func() { - // get the proposed block hash and clear it if the seal() is completed. - sb.sealMu.Lock() - sb.proposedBlockHash = block.Hash() - clear := func() { - sb.proposedBlockHash = common.Hash{} - sb.sealMu.Unlock() - } - defer clear() - // post block into Istanbul engine - go sb.EventMux().Post(istanbul.RequestEvent{ - Proposal: block, - }) - for { - select { - case result := <-sb.commitCh: - // if the block hash and the hash from channel are the same, - // return the result. Otherwise, keep waiting the next hash. - if block.Hash() == result.Hash() { - results <- result - return - } - case <-stop: - results <- nil - return + // get the proposed block hash and clear it if the seal() is completed. + sb.sealMu.Lock() + sb.proposedBlockHash = block.Hash() + clear := func() { + sb.proposedBlockHash = common.Hash{} + sb.sealMu.Unlock() + } + defer clear() + + // post block into Istanbul engine + go sb.EventMux().Post(istanbul.RequestEvent{ + Proposal: block, + }) + for { + select { + case result := <-sb.commitCh: + // if the block hash and the hash from channel are the same, + // return the result. Otherwise, keep waiting the next hash. + if block.Hash() == result.Hash() { + results <- result + return nil } + case <-stop: + results <- nil + return nil } - }() + } + return nil } diff --git a/miner/worker.go b/miner/worker.go index d5bf09444..5573569f1 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -544,9 +544,7 @@ func (w *worker) taskLoop() { w.pendingMu.Lock() w.pendingTasks[w.engine.SealHash(task.block.Header())] = task w.pendingMu.Unlock() - if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil { - log.Warn("Block sealing failed", "err", err) - } + go w.seal(task.block, stopCh) case <-w.exitCh: interrupt() return @@ -554,6 +552,12 @@ func (w *worker) taskLoop() { } } +func (w *worker) seal(b *types.Block, stop <-chan struct{}) { + if err := w.engine.Seal(w.chain, b, w.resultCh, stop); err != nil { + log.Warn("Block sealing failed", "err", err) + } +} + // resultLoop is a standalone goroutine to handle sealing result submitting // and flush relative data to the database. func (w *worker) resultLoop() { @@ -581,8 +585,9 @@ func (w *worker) resultLoop() { } // Different block could share same sealhash, deep copy here to prevent write-write conflict. var logs []*types.Log + work := w.current - for _, receipt := range append(task.receipts, task.privateReceipts...) { + for _, receipt := range append(work.receipts, work.privateReceipts...) { // Update the block hash in all logs since it is now available and not when the // receipt/log of individual transactions were created. for _, log := range receipt.Logs { @@ -590,14 +595,14 @@ func (w *worker) resultLoop() { } } - for _, log := range append(task.state.Logs(), task.privateState.Logs()...) { + for _, log := range append(work.state.Logs(), work.privateState.Logs()...) { log.BlockHash = hash } // write private transacions - privateStateRoot, _ := task.privateState.Commit(w.config.IsEIP158(block.Number())) + privateStateRoot, _ := work.privateState.Commit(w.config.IsEIP158(block.Number())) core.WritePrivateStateRoot(w.eth.ChainDb(), block.Root(), privateStateRoot) - allReceipts := mergeReceipts(task.receipts, task.privateReceipts) + allReceipts := mergeReceipts(work.receipts, work.privateReceipts) // Commit block and state to database. @@ -613,7 +618,7 @@ func (w *worker) resultLoop() { w.mux.Post(core.NewMinedBlockEvent{Block: block}) var events []interface{} - logs = append(task.state.Logs(), task.privateState.Logs()...) + logs = append(work.state.Logs(), work.privateState.Logs()...) switch stat { case core.CanonStatTy: