Make sealing synchronous

Fix private tx receipt setting on tasks
Remove now unnecessary locking when writing state
Move delay creation outside of goroutine to mimic Clique sealing
Handle errors appropriately
This commit is contained in:
Peter Fox 2019-09-18 09:27:58 +01:00
parent d130ff6fb2
commit 2dd2a28f92
No known key found for this signature in database
GPG Key ID: FB5F75BD7CCBB092
3 changed files with 86 additions and 75 deletions

View File

@ -416,42 +416,45 @@ func (sb *backend) Seal(chain consensus.ChainReader, block *types.Block, results
return err
}
// wait for the timestamp of header, use this to adjust the block period
delay := time.Unix(block.Header().Time.Int64(), 0).Sub(now())
select {
case <-time.After(delay):
case <-stop:
results <- nil
return nil
}
delay := time.Unix(header.Time.Int64(), 0).Sub(now())
// get the proposed block hash and clear it if the seal() is completed.
sb.sealMu.Lock()
sb.proposedBlockHash = block.Hash()
clear := func() {
sb.proposedBlockHash = common.Hash{}
sb.sealMu.Unlock()
}
defer clear()
// post block into Istanbul engine
go sb.EventMux().Post(istanbul.RequestEvent{
Proposal: block,
})
for {
go func() {
// wait for the timestamp of header, use this to adjust the block period
select {
case result := <-sb.commitCh:
// if the block hash and the hash from channel are the same,
// return the result. Otherwise, keep waiting the next hash.
if result != nil && block.Hash() == result.Hash() {
results <- result
return nil
}
case <-time.After(delay):
case <-stop:
results <- nil
return nil
return
}
}
// get the proposed block hash and clear it if the seal() is completed.
sb.sealMu.Lock()
sb.proposedBlockHash = block.Hash()
defer func() {
sb.proposedBlockHash = common.Hash{}
sb.sealMu.Unlock()
}()
// post block into Istanbul engine
go sb.EventMux().Post(istanbul.RequestEvent{
Proposal: block,
})
for {
select {
case result := <-sb.commitCh:
// if the block hash and the hash from channel are the same,
// return the result. Otherwise, keep waiting the next hash.
if result != nil && block.Hash() == result.Hash() {
results <- result
return
}
case <-stop:
results <- nil
return
}
}
}()
return nil
}
// update timestamp and signature of the block based on its number of transactions

View File

@ -195,30 +195,44 @@ func TestSealCommittedOtherHash(t *testing.T) {
chain, engine := newBlockChain(4)
block := makeBlockWithoutSeal(chain, engine, chain.Genesis())
otherBlock := makeBlockWithoutSeal(chain, engine, block)
expectedCommittedSeal := append([]byte{1, 2, 3}, bytes.Repeat([]byte{0x00}, types.IstanbulExtraSeal-3)...)
eventSub := engine.EventMux().Subscribe(istanbul.RequestEvent{})
eventLoop := func() {
blockOutputChannel := make(chan *types.Block)
stopChannel := make(chan struct{})
go func() {
select {
case ev := <-eventSub.Chan():
_, ok := ev.Data.(istanbul.RequestEvent)
if !ok {
if _, ok := ev.Data.(istanbul.RequestEvent); !ok {
t.Errorf("unexpected event comes: %v", reflect.TypeOf(ev.Data))
}
engine.Commit(otherBlock, [][]byte{})
if err := engine.Commit(otherBlock, [][]byte{expectedCommittedSeal}); err != nil {
t.Error(err.Error())
}
}
eventSub.Unsubscribe()
}
go eventLoop()
seal := func() {
engine.Seal(chain, block, nil, make(chan struct{}))
t.Error("seal should not be completed")
}
go seal()
}()
go func() {
if err := engine.Seal(chain, block, blockOutputChannel, stopChannel); err != nil {
t.Error(err.Error())
}
}()
const timeoutDura = 2 * time.Second
timeout := time.NewTimer(timeoutDura)
select {
case <-timeout.C:
// wait 2 seconds to ensure we cannot get any blocks from Istanbul
case <-blockOutputChannel:
t.Error("Wrong block found!")
default:
//no block found, stop the sealing
close(stopChannel)
}
select {
case output := <-blockOutputChannel:
if output != nil {
t.Error("Block not nil!")
}
}
}

View File

@ -544,7 +544,10 @@ func (w *worker) taskLoop() {
w.pendingMu.Lock()
w.pendingTasks[w.engine.SealHash(task.block.Header())] = task
w.pendingMu.Unlock()
go w.seal(task.block, stopCh)
if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil {
log.Warn("Block sealing failed", "err", err)
}
case <-w.exitCh:
interrupt()
return
@ -552,12 +555,6 @@ func (w *worker) taskLoop() {
}
}
func (w *worker) seal(b *types.Block, stop <-chan struct{}) {
if err := w.engine.Seal(w.chain, b, w.resultCh, stop); err != nil {
log.Warn("Block sealing failed", "err", err)
}
}
// resultLoop is a standalone goroutine to handle sealing result submitting
// and flush relative data to the database.
func (w *worker) resultLoop() {
@ -585,39 +582,38 @@ func (w *worker) resultLoop() {
}
// Different block could share same sealhash, deep copy here to prevent write-write conflict.
var logs []*types.Log
work := w.current
for _, receipt := range append(work.receipts, work.privateReceipts...) {
for _, receipt := range append(task.receipts, task.privateReceipts...) {
// Update the block hash in all logs since it is now available and not when the
// receipt/log of individual transactions were created.
for _, log := range receipt.Logs {
log.BlockHash = hash
}
logs = append(logs, receipt.Logs...)
}
for _, log := range append(work.state.Logs(), work.privateState.Logs()...) {
log.BlockHash = hash
}
// write private transacions
privateStateRoot, _ := work.privateState.Commit(w.config.IsEIP158(block.Number()))
core.WritePrivateStateRoot(w.eth.ChainDb(), block.Root(), privateStateRoot)
allReceipts := mergeReceipts(work.receipts, work.privateReceipts)
// Commit block and state to database.
w.mu.Lock()
stat, err := w.chain.WriteBlockWithState(block, allReceipts, work.state, nil)
w.mu.Unlock()
// write private transactions
privateStateRoot, err := task.privateState.Commit(w.config.IsEIP158(block.Number()))
if err != nil {
log.Error("Failed writWriteBlockAndStating block to chain", "err", err)
log.Error("Failed committing private state root", "err", err)
continue
}
if err := core.WritePrivateStateRoot(w.eth.ChainDb(), block.Root(), privateStateRoot); err != nil {
log.Error("Failed writing private state root", "err", err)
continue
}
allReceipts := mergeReceipts(task.receipts, task.privateReceipts)
if err := core.WritePrivateBlockBloom(w.eth.ChainDb(), block.NumberU64(), work.privateReceipts); err != nil {
// Commit block and state to database.
stat, err := w.chain.WriteBlockWithState(block, allReceipts, task.state, nil)
if err != nil {
log.Error("Failed writing block to chain", "err", err)
continue
}
if err := core.WritePrivateBlockBloom(w.eth.ChainDb(), block.NumberU64(), task.privateReceipts); err != nil {
log.Error("Failed writing private block bloom", "err", err)
continue
}
log.Info("Successfully sealed new block", "number", block.Number(), "sealhash", sealhash, "hash", hash,
"elapsed", common.PrettyDuration(time.Since(task.createdAt)))
@ -625,8 +621,6 @@ func (w *worker) resultLoop() {
w.mux.Post(core.NewMinedBlockEvent{Block: block})
var events []interface{}
logs = append(work.state.Logs(), work.privateState.Logs()...)
switch stat {
case core.CanonStatTy:
events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
@ -1023,8 +1017,8 @@ func (w *worker) commit(uncles []*types.Header, interval func(), update bool, st
privateReceipts := make([]*types.Receipt, len(w.current.privateReceipts))
for i, l := range w.current.privateReceipts {
receipts[i] = new(types.Receipt)
*receipts[i] = *l
privateReceipts[i] = new(types.Receipt)
*privateReceipts[i] = *l
}
s := w.current.state.Copy()