Save block before apply; track stale app hash

This commit is contained in:
Ethan Buchman 2016-09-11 13:16:23 -04:00
parent fb9735ef46
commit 8ec1839f5d
4 changed files with 159 additions and 117 deletions

View File

@ -1255,33 +1255,34 @@ func (cs *ConsensusState) finalizeCommit(height int) {
"height", block.Height, "hash", block.Hash(), "root", block.AppHash)
log.Info(Fmt("%v", block))
// Fire off event for new block.
// TODO: Handle app failure. See #177
types.FireEventNewBlock(cs.evsw, types.EventDataNewBlock{block})
types.FireEventNewBlockHeader(cs.evsw, types.EventDataNewBlockHeader{block.Header})
// Create a copy of the state for staging
stateCopy := cs.state.Copy()
// event cache for txs
eventCache := types.NewEventCache(cs.evsw)
// Execute and commit the block
// NOTE: All calls to the proxyAppConn should come here
stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool)
// txs committed, bad ones removed from mempool; fire events
// NOTE: the block.AppHash won't reflect these txs until the next block
eventCache.Flush()
// Save to blockStore.
if cs.blockStore.Height() < block.Height {
precommits := cs.Votes.Precommits(cs.CommitRound)
seenCommit := precommits.MakeCommit()
log.Notice("save block", "height", block.Height)
cs.blockStore.SaveBlock(block, blockParts, seenCommit)
} else {
log.Warn("Why are we finalizeCommitting a block height we already have?", "height", block.Height)
}
// Create a copy of the state for staging
// and an event cache for txs
stateCopy := cs.state.Copy()
// event cache for txs
eventCache := types.NewEventCache(cs.evsw)
// Execute and commit the block, and update the mempool.
// All calls to the proxyAppConn should come here.
// NOTE: the block.AppHash won't reflect these txs until the next block
stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool)
// Fire off event for new block.
// TODO: Handle app failure. See #177
types.FireEventNewBlock(cs.evsw, types.EventDataNewBlock{block})
types.FireEventNewBlockHeader(cs.evsw, types.EventDataNewBlockHeader{block.Header})
eventCache.Flush()
// Save the state.
log.Notice("save state", "height", stateCopy.LastBlockHeight, "hash", stateCopy.AppHash)
stateCopy.Save()

View File

@ -282,8 +282,7 @@ func (mem *Mempool) collectTxs(maxTxs int) []types.Tx {
// NOTE: this should be called *after* block is committed by consensus.
// NOTE: unsafe; Lock/Unlock must be managed by caller
func (mem *Mempool) Update(height int, txs []types.Tx) {
// mem.proxyMtx.Lock()
// defer mem.proxyMtx.Unlock()
// TODO: check err ?
mem.proxyAppConn.FlushSync() // To flush async resCb calls e.g. from CheckTx
// First, create a lookup map of txns in new txs.

View File

@ -2,7 +2,6 @@ package state
import (
"errors"
"fmt"
. "github.com/tendermint/go-common"
"github.com/tendermint/tendermint/proxy"
@ -10,10 +9,13 @@ import (
tmsp "github.com/tendermint/tmsp/types"
)
// ValidateBlock is the exported entry point for block validation. It hands
// block to validateBlock and returns that result unchanged.
func (s *State) ValidateBlock(block *types.Block) error {
	err := s.validateBlock(block)
	return err
}
//--------------------------------------------------
// Execute the block
// ErrInvalidBlock wraps the error ExecBlock returns when block validation
// fails.
//
// NOTE(review): the underlying type is the error interface itself, so a type
// assertion or type switch on ErrInvalidBlock succeeds for any non-nil error;
// it cannot be told apart from ErrProxyAppConn at runtime.
type ErrInvalidBlock error

// ErrProxyAppConn wraps the error ExecBlock returns when running the block on
// the proxy app fails. The same caveat as for ErrInvalidBlock applies.
type ErrProxyAppConn error
// Execute the block to mutate State.
// Validates block and then executes Data.Txs in the block.
@ -22,7 +24,7 @@ func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnC
// Validate the block.
err := s.validateBlock(block)
if err != nil {
return err
return ErrInvalidBlock(err)
}
// Update the validator set
@ -37,7 +39,7 @@ func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnC
if err != nil {
// There was some error in proxyApp
// TODO Report error and wait for proxyApp to be available.
return err
return ErrProxyAppConn(err)
}
// All good!
@ -45,6 +47,10 @@ func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnC
nextValSet.IncrementAccum(1)
s.SetBlockAndValidators(block.Header, blockPartsHeader, valSet, nextValSet)
// save state with updated height/blockhash/validators
// but stale apphash, in case we fail between Commit and Save
s.Save()
return nil
}
@ -113,33 +119,6 @@ func (s *State) execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn prox
return nil
}
// validateBlock checks block against the current state.
//
// It first runs block.ValidateBasic with the state's chain ID, last block
// height/ID/time and app hash, then checks block.LastCommit:
//   - at height 1 the commit must carry no precommits;
//   - at any other height the number of precommits must equal the size of
//     s.LastValidators, and the commit must pass LastValidators.VerifyCommit
//     for height block.Height-1.
//
// It returns nil when every check passes, otherwise the first error hit.
func (s *State) validateBlock(block *types.Block) error {
	// Basic block validation.
	if err := block.ValidateBasic(s.ChainID, s.LastBlockHeight, s.LastBlockID, s.LastBlockTime, s.AppHash); err != nil {
		return err
	}

	// Validate block LastCommit.
	numPrecommits := len(block.LastCommit.Precommits)
	switch {
	case block.Height == 1:
		// The first block has no predecessor, hence nothing to commit to.
		if numPrecommits != 0 {
			return errors.New("Block at height 1 (first block) should have no LastCommit precommits")
		}
	case numPrecommits != s.LastValidators.Size():
		return fmt.Errorf("Invalid block commit size. Expected %v, got %v",
			s.LastValidators.Size(), numPrecommits)
	default:
		if err := s.LastValidators.VerifyCommit(
			s.ChainID, s.LastBlockID, block.Height-1, block.LastCommit); err != nil {
			return err
		}
	}
	return nil
}
// Updates the LastCommitHeight of the validators in valSet, in place.
// Assumes that lastValSet matches the valset of block.LastCommit
// CONTRACT: lastValSet is not mutated.
@ -168,18 +147,62 @@ func updateValidatorsWithBlock(lastValSet *types.ValidatorSet, valSet *types.Val
}
//-----------------------------------------------------------------------------
//-----------------------------------------------------
// Validate block
type InvalidTxError struct {
Tx types.Tx
Code tmsp.CodeType
func (s *State) ValidateBlock(block *types.Block) error {
return s.validateBlock(block)
}
func (txErr InvalidTxError) Error() string {
return Fmt("Invalid tx: [%v] code: [%v]", txErr.Tx, txErr.Code)
func (s *State) validateBlock(block *types.Block) error {
// Basic block validation.
err := block.ValidateBasic(s.ChainID, s.LastBlockHeight, s.LastBlockID, s.LastBlockTime, s.AppHash)
if err != nil {
return err
}
// Validate block LastCommit.
if block.Height == 1 {
if len(block.LastCommit.Precommits) != 0 {
return errors.New("Block at height 1 (first block) should have no LastCommit precommits")
}
} else {
if len(block.LastCommit.Precommits) != s.LastValidators.Size() {
return errors.New(Fmt("Invalid block commit size. Expected %v, got %v",
s.LastValidators.Size(), len(block.LastCommit.Precommits)))
}
err := s.LastValidators.VerifyCommit(
s.ChainID, s.LastBlockID, block.Height-1, block.LastCommit)
if err != nil {
return err
}
}
return nil
}
//-----------------------------------------------------------------------------
// ApplyBlock runs block against the state and the application in two steps:
//
//  1. ExecBlock updates the validator sets and runs the txs on proxyAppConn.
//  2. CommitStateUpdateMempool locks the mempool, commits the state and
//     updates the mempool.
//
// A failure in either step is returned as a new error whose message names the
// failing step; nil is returned when both succeed.
func (s *State) ApplyBlock(eventCache events.Fireable, proxyAppConn proxy.AppConnConsensus,
	block *types.Block, partsHeader types.PartSetHeader, mempool Mempool) error {

	// Step 1: execute the block on the state and the app.
	err := s.ExecBlock(eventCache, proxyAppConn, block, partsHeader)
	if err != nil {
		return errors.New(Fmt("Exec failed for application: %v", err))
	}

	// Step 2: lock mempool, commit state, update mempool.
	if err = s.CommitStateUpdateMempool(proxyAppConn, block, mempool); err != nil {
		return errors.New(Fmt("Commit failed for application: %v", err))
	}

	return nil
}
// mempool must be locked during commit and update
// because state is typically reset on Commit and old txs must be replayed
@ -188,9 +211,6 @@ func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, bl
mempool.Lock()
defer mempool.Unlock()
// flush out any CheckTx that have already started
// cs.proxyAppConn.FlushSync() // ?! XXX
// Commit block, get hash back
res := proxyAppConn.CommitSync()
if res.IsErr() {
@ -210,25 +230,40 @@ func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, bl
return nil
}
// Execute and commit block against app, save block and state
func (s *State) ApplyBlock(eventCache events.Fireable, proxyAppConn proxy.AppConnConsensus,
block *types.Block, partsHeader types.PartSetHeader, mempool Mempool) {
// Updates to the mempool need to be synchronized with committing a block
// so apps can reset their transient state on Commit
type Mempool interface {
Lock()
Unlock()
Update(height int, txs []types.Tx)
}
// Run the block on the State:
// + update validator sets
// + run txs on the proxyAppConn
err := s.ExecBlock(eventCache, proxyAppConn, block, partsHeader)
if err != nil {
// TODO: handle this gracefully.
PanicQ(Fmt("Exec failed for application: %v", err))
}
type mockMempool struct {
}
// lock mempool, commit state, update mempool
err = s.CommitStateUpdateMempool(proxyAppConn, block, mempool)
if err != nil {
// TODO: handle this gracefully.
PanicQ(Fmt("Commit failed for application: %v", err))
}
func (m mockMempool) Lock() {}
func (m mockMempool) Unlock() {}
func (m mockMempool) Update(height int, txs []types.Tx) {}
//----------------------------------------------------------------
// Replay blocks to sync app to latest state of core
// ErrAppBlockHeightTooHigh reports that the app's block height is ahead of
// the height known to core. Field order matters: values are built with
// positional literals (core height first, app height second).
type ErrAppBlockHeightTooHigh struct {
	coreHeight, appHeight int
}

// Error implements the error interface, printing the app height followed by
// the core height.
func (e ErrAppBlockHeightTooHigh) Error() string {
	app, core := e.appHeight, e.coreHeight
	return Fmt("App block height (%d) is higher than core (%d)", app, core)
}
// ErrStateMismatch reports that the state computed by replaying blocks
// differs from the saved state. Field order matters: values are built with
// positional literals (replayed state first, saved state second).
type ErrStateMismatch struct {
	got, expected *State
}

// Error implements the error interface, printing both states.
func (e ErrStateMismatch) Error() string {
	const layout = "State after replay does not match saved state. Got ----\n%v\nExpected ----\n%v\n"
	return Fmt(layout, e.got, e.expected)
}
// Replay all blocks after blockHeight and ensure the result matches the current state.
@ -241,34 +276,45 @@ func (s *State) ReplayBlocks(appHash []byte, header *types.Header, partsHeader t
// it should save all eg. valset changes before calling Commit.
// then, if tm state is behind app state, the only thing missing can be app hash
// fresh state to work on
// get a fresh state and reset to the apps latest
stateCopy := s.Copy()
// reset to this height (do nothing if its 0)
var blockHeight int
if header != nil {
blockHeight = header.Height
// TODO: put validators in iavl tree so we can set the state with an older validator set
lastVals, nextVals := stateCopy.GetValidators()
stateCopy.SetBlockAndValidators(header, partsHeader, lastVals, nextVals)
stateCopy.Stale = false
stateCopy.AppHash = appHash
}
// run the transactions
var eventCache events.Fireable // nil
appBlockHeight := stateCopy.LastBlockHeight
coreBlockHeight := blockStore.Height()
if coreBlockHeight < appBlockHeight {
return ErrAppBlockHeightTooHigh{coreBlockHeight, appBlockHeight}
// replay all blocks starting with blockHeight+1
for i := blockHeight + 1; i <= blockStore.Height(); i++ {
blockMeta := blockStore.LoadBlockMeta(i)
block := blockStore.LoadBlock(i)
panicOnNilBlock(i, blockStore.Height(), block, blockMeta) // XXX
} else if coreBlockHeight == appBlockHeight {
// if we crashed between Commit and SaveState,
// the state's app hash is stale
if s.Stale {
s.Stale = false
s.AppHash = appHash
}
stateCopy.ApplyBlock(eventCache, appConnConsensus, block, blockMeta.PartsHeader, mockMempool{})
} else {
// the app is behind.
// replay all blocks starting with appBlockHeight+1
for i := appBlockHeight + 1; i <= coreBlockHeight; i++ {
blockMeta := blockStore.LoadBlockMeta(i)
block := blockStore.LoadBlock(i)
panicOnNilBlock(i, coreBlockHeight, block, blockMeta) // XXX
var eventCache events.Fireable // nil
stateCopy.ApplyBlock(eventCache, appConnConsensus, block, blockMeta.PartsHeader, mockMempool{})
}
}
// The computed state and the previously set state should be identical
if !s.Equals(stateCopy) {
return fmt.Errorf("State after replay does not match saved state. Got ----\n%v\nExpected ----\n%v\n", stateCopy, s)
return ErrStateMismatch{stateCopy, s}
}
return nil
}
@ -284,20 +330,3 @@ BlockMeta: %v
}
}
//------------------------------------------------
// Mempool is the subset of mempool behaviour the state needs when committing
// a block.
//
// Updates to the mempool need to be synchronized with committing a block
// so apps can reset their transient state on Commit
type Mempool interface {
// Lock and Unlock bracket the commit-and-update sequence.
Lock()
Unlock()
// Update is given the height and txs of the committed block.
Update(height int, txs []types.Tx)
}
// mockMempool is a do-nothing Mempool, passed where a block is applied
// without a real mempool (for example when replaying blocks).
type mockMempool struct{}

// Lock is a no-op.
func (mockMempool) Lock() {}

// Unlock is a no-op.
func (mockMempool) Unlock() {}

// Update is a no-op; both arguments are ignored.
func (mockMempool) Update(int, []types.Tx) {}

View File

@ -21,16 +21,25 @@ var (
// NOTE: not goroutine-safe.
type State struct {
mtx sync.Mutex
db dbm.DB
GenesisDoc *types.GenesisDoc
ChainID string
// mtx for writing to db
mtx sync.Mutex
db dbm.DB
// should not change
GenesisDoc *types.GenesisDoc
ChainID string
// updated at end of ExecBlock
LastBlockHeight int // Genesis state has this set to 0. So, Block(H=0) does not exist.
LastBlockID types.BlockID
LastBlockTime time.Time
Validators *types.ValidatorSet
LastValidators *types.ValidatorSet
AppHash []byte
// AppHash is updated after Commit;
// it's stale after ExecBlock and before Commit
Stale bool
AppHash []byte
}
func LoadState(db dbm.DB) *State {
@ -60,6 +69,7 @@ func (s *State) Copy() *State {
LastBlockTime: s.LastBlockTime,
Validators: s.Validators.Copy(),
LastValidators: s.LastValidators.Copy(),
Stale: s.Stale, // but really state shouldnt be copied while its stale
AppHash: s.AppHash,
}
}
@ -84,12 +94,15 @@ func (s *State) Bytes() []byte {
}
// SetBlockAndValidators mutates the state to match a newly executed block.
//
// It records the header's height and time, rebuilds LastBlockID from the
// header hash and blockPartsHeader, and installs prevValSet as LastValidators
// and nextValSet as Validators. AppHash is not touched here: the new hash is
// not known yet, so the state is flagged Stale.
func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader, prevValSet, nextValSet *types.ValidatorSet) {
	s.LastBlockHeight = header.Height
	// Hash the header we were given; no `block` identifier is in scope here.
	// NOTE(review): assumes *types.Header exposes Hash() and that it matches
	// the block hash — confirm against the types package.
	s.LastBlockID = types.BlockID{header.Hash(), blockPartsHeader}
	s.LastBlockTime = header.Time
	s.Validators = nextValSet
	s.LastValidators = prevValSet
	// AppHash now lags behind the block fields set above.
	s.Stale = true
}
func (s *State) GetValidators() (*types.ValidatorSet, *types.ValidatorSet) {