comments and cleanup
This commit is contained in:
parent
5109746b1c
commit
935f70a346
|
@ -105,7 +105,7 @@ func (cs *ConsensusState) catchupReplay(csHeight int) error {
|
||||||
// and Handshake could reuse ConsensusState if it weren't for this check (since we can crash after writing ENDHEIGHT).
|
// and Handshake could reuse ConsensusState if it weren't for this check (since we can crash after writing ENDHEIGHT).
|
||||||
gr, found, err := cs.wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(csHeight))
|
gr, found, err := cs.wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(csHeight))
|
||||||
if found {
|
if found {
|
||||||
return errors.New(Fmt("WAL should not contain height %d.", csHeight))
|
return errors.New(Fmt("WAL should not contain #ENDHEIGHT %d.", csHeight))
|
||||||
}
|
}
|
||||||
if gr != nil {
|
if gr != nil {
|
||||||
gr.Close()
|
gr.Close()
|
||||||
|
@ -114,13 +114,13 @@ func (cs *ConsensusState) catchupReplay(csHeight int) error {
|
||||||
// Search for last height marker
|
// Search for last height marker
|
||||||
gr, found, err = cs.wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(csHeight-1))
|
gr, found, err = cs.wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(csHeight-1))
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
log.Warn("Replay: wal.group.Search returned EOF", "height", csHeight-1)
|
log.Warn("Replay: wal.group.Search returned EOF", "#ENDHEIGHT", csHeight-1)
|
||||||
return nil
|
return nil
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !found {
|
if !found {
|
||||||
return errors.New(Fmt("WAL does not contain height %d.", csHeight))
|
return errors.New(Fmt("Cannot replay height %d. WAL does not contain #ENDHEIGHT for %d.", csHeight, csHeight-1))
|
||||||
}
|
}
|
||||||
defer gr.Close()
|
defer gr.Close()
|
||||||
|
|
||||||
|
@ -275,18 +275,18 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, proxyApp p
|
||||||
|
|
||||||
} else if appBlockHeight == stateBlockHeight {
|
} else if appBlockHeight == stateBlockHeight {
|
||||||
// We haven't run Commit (both the state and app are one block behind),
|
// We haven't run Commit (both the state and app are one block behind),
|
||||||
// so ApplyBlock with the real app.
|
// so replayBlock with the real app.
|
||||||
// NOTE: We could instead use the cs.WAL on cs.Start,
|
// NOTE: We could instead use the cs.WAL on cs.Start,
|
||||||
// but we'd have to allow the WAL to replay a block that wrote it's ENDHEIGHT
|
// but we'd have to allow the WAL to replay a block that wrote it's ENDHEIGHT
|
||||||
log.Info("Replay last block using real app")
|
log.Info("Replay last block using real app")
|
||||||
return h.replayLastBlock(storeBlockHeight, proxyApp.Consensus())
|
return h.replayBlock(storeBlockHeight, proxyApp.Consensus())
|
||||||
|
|
||||||
} else if appBlockHeight == storeBlockHeight {
|
} else if appBlockHeight == storeBlockHeight {
|
||||||
// We ran Commit, but didn't save the state, so ApplyBlock with mock app
|
// We ran Commit, but didn't save the state, so replayBlock with mock app
|
||||||
abciResponses := h.state.LoadABCIResponses()
|
abciResponses := h.state.LoadABCIResponses()
|
||||||
mockApp := newMockProxyApp(appHash, abciResponses)
|
mockApp := newMockProxyApp(appHash, abciResponses)
|
||||||
log.Info("Replay last block using mock app")
|
log.Info("Replay last block using mock app")
|
||||||
return h.replayLastBlock(storeBlockHeight, mockApp)
|
return h.replayBlock(storeBlockHeight, mockApp)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -295,18 +295,18 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, proxyApp p
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int, useReplayFunc bool) ([]byte, error) {
|
func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int, mutateState bool) ([]byte, error) {
|
||||||
// App is further behind than it should be, so we need to replay blocks.
|
// App is further behind than it should be, so we need to replay blocks.
|
||||||
// We replay all blocks from appBlockHeight+1.
|
// We replay all blocks from appBlockHeight+1.
|
||||||
// If useReplayFunc == true, stop short of the last block
|
|
||||||
// so it can be replayed using the WAL in ReplayBlocks.
|
|
||||||
// Note that we don't have an old version of the state,
|
// Note that we don't have an old version of the state,
|
||||||
// so we by-pass state validation using sm.ApplyBlock.
|
// so we by-pass state validation/mutation using sm.ApplyBlock.
|
||||||
|
// If mutateState == true, stop short of the last block
|
||||||
|
// so it can be replayed with a real state.ApplyBlock
|
||||||
|
|
||||||
var appHash []byte
|
var appHash []byte
|
||||||
var err error
|
var err error
|
||||||
finalBlock := storeBlockHeight
|
finalBlock := storeBlockHeight
|
||||||
if useReplayFunc {
|
if mutateState {
|
||||||
finalBlock -= 1
|
finalBlock -= 1
|
||||||
}
|
}
|
||||||
for i := appBlockHeight + 1; i <= finalBlock; i++ {
|
for i := appBlockHeight + 1; i <= finalBlock; i++ {
|
||||||
|
@ -320,7 +320,7 @@ func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, store
|
||||||
h.nBlocks += 1
|
h.nBlocks += 1
|
||||||
}
|
}
|
||||||
|
|
||||||
if useReplayFunc {
|
if mutateState {
|
||||||
// sync the final block
|
// sync the final block
|
||||||
return h.ReplayBlocks(appHash, finalBlock, proxyApp)
|
return h.ReplayBlocks(appHash, finalBlock, proxyApp)
|
||||||
}
|
}
|
||||||
|
@ -329,7 +329,7 @@ func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, store
|
||||||
}
|
}
|
||||||
|
|
||||||
// ApplyBlock on the proxyApp with the last block.
|
// ApplyBlock on the proxyApp with the last block.
|
||||||
func (h *Handshaker) replayLastBlock(height int, proxyApp proxy.AppConnConsensus) ([]byte, error) {
|
func (h *Handshaker) replayBlock(height int, proxyApp proxy.AppConnConsensus) ([]byte, error) {
|
||||||
mempool := types.MockMempool{}
|
mempool := types.MockMempool{}
|
||||||
|
|
||||||
var eventCache types.Fireable // nil
|
var eventCache types.Fireable // nil
|
||||||
|
|
|
@ -1217,8 +1217,12 @@ func (cs *ConsensusState) finalizeCommit(height int) {
|
||||||
fail.Fail() // XXX
|
fail.Fail() // XXX
|
||||||
|
|
||||||
// Finish writing to the WAL for this height.
|
// Finish writing to the WAL for this height.
|
||||||
// NOTE: ConsensusState should not be started again
|
// NOTE: If we fail before writing this, we'll never write it,
|
||||||
// until we successfully call ApplyBlock (eg. in Handshake after restart)
|
// and just recover by running ApplyBlock in the Handshake.
|
||||||
|
// If we moved it before persisting the block, we'd have to allow
|
||||||
|
// WAL replay for blocks with an #ENDHEIGHT
|
||||||
|
// As is, ConsensusState should not be started again
|
||||||
|
// until we successfully call ApplyBlock (ie. here or in Handshake after restart)
|
||||||
if cs.wal != nil {
|
if cs.wal != nil {
|
||||||
cs.wal.writeEndHeight(height)
|
cs.wal.writeEndHeight(height)
|
||||||
}
|
}
|
||||||
|
@ -1244,9 +1248,10 @@ func (cs *ConsensusState) finalizeCommit(height int) {
|
||||||
// Fire event for new block.
|
// Fire event for new block.
|
||||||
// NOTE: If we fail before firing, these events will never fire
|
// NOTE: If we fail before firing, these events will never fire
|
||||||
//
|
//
|
||||||
// Some options (for which they may fire more than once. I guess that's fine):
|
// TODO: Either
|
||||||
// * Fire before persisting state, in ApplyBlock
|
// * Fire before persisting state, in ApplyBlock
|
||||||
// * Fire on start up if we haven't written any new WAL msgs
|
// * Fire on start up if we haven't written any new WAL msgs
|
||||||
|
// Both options mean we may fire more than once. Is that fine ?
|
||||||
types.FireEventNewBlock(cs.evsw, types.EventDataNewBlock{block})
|
types.FireEventNewBlock(cs.evsw, types.EventDataNewBlock{block})
|
||||||
types.FireEventNewBlockHeader(cs.evsw, types.EventDataNewBlockHeader{block.Header})
|
types.FireEventNewBlockHeader(cs.evsw, types.EventDataNewBlockHeader{block.Header})
|
||||||
eventCache.Flush()
|
eventCache.Flush()
|
||||||
|
@ -1256,6 +1261,8 @@ func (cs *ConsensusState) finalizeCommit(height int) {
|
||||||
// NewHeightStep!
|
// NewHeightStep!
|
||||||
cs.updateToState(stateCopy)
|
cs.updateToState(stateCopy)
|
||||||
|
|
||||||
|
fail.Fail() // XXX
|
||||||
|
|
||||||
// cs.StartTime is already set.
|
// cs.StartTime is already set.
|
||||||
// Schedule Round0 to start soon.
|
// Schedule Round0 to start soon.
|
||||||
cs.scheduleRound0(&cs.RoundState)
|
cs.scheduleRound0(&cs.RoundState)
|
||||||
|
|
|
@ -88,19 +88,14 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
fail.Fail() // XXX
|
|
||||||
|
|
||||||
// Run txs of block
|
// Run txs of block
|
||||||
for _, tx := range block.Txs {
|
for _, tx := range block.Txs {
|
||||||
fail.FailRand(len(block.Txs)) // XXX
|
|
||||||
proxyAppConn.DeliverTxAsync(tx)
|
proxyAppConn.DeliverTxAsync(tx)
|
||||||
if err := proxyAppConn.Error(); err != nil {
|
if err := proxyAppConn.Error(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fail.Fail() // XXX
|
|
||||||
|
|
||||||
// End block
|
// End block
|
||||||
abciResponses.EndBlock, err = proxyAppConn.EndBlockSync(uint64(block.Height))
|
abciResponses.EndBlock, err = proxyAppConn.EndBlockSync(uint64(block.Height))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -108,8 +103,6 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
fail.Fail() // XXX
|
|
||||||
|
|
||||||
valDiff := abciResponses.EndBlock.Diffs
|
valDiff := abciResponses.EndBlock.Diffs
|
||||||
|
|
||||||
log.Info("Executed block", "height", block.Height, "valid txs", validTxs, "invalid txs", invalidTxs)
|
log.Info("Executed block", "height", block.Height, "valid txs", validTxs, "invalid txs", invalidTxs)
|
||||||
|
@ -292,9 +285,8 @@ func (s *State) indexTxs(abciResponses *ABCIResponses) {
|
||||||
s.TxIndexer.Batch(batch)
|
s.TxIndexer.Batch(batch)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Apply and commit a block, but without all the state validation.
|
// Apply and commit a block on the proxyApp without validating or mutating the state
|
||||||
// Returns the application root hash (result of abci.Commit)
|
// Returns the application root hash (result of abci.Commit)
|
||||||
// TODO handle abciResponses
|
|
||||||
func ApplyBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block) ([]byte, error) {
|
func ApplyBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block) ([]byte, error) {
|
||||||
var eventCache types.Fireable // nil
|
var eventCache types.Fireable // nil
|
||||||
_, err := execBlockOnProxyApp(eventCache, appConnConsensus, block)
|
_, err := execBlockOnProxyApp(eventCache, appConnConsensus, block)
|
||||||
|
|
|
@ -186,7 +186,7 @@ type ABCIResponses struct {
|
||||||
DeliverTx []*abci.ResponseDeliverTx
|
DeliverTx []*abci.ResponseDeliverTx
|
||||||
EndBlock abci.ResponseEndBlock
|
EndBlock abci.ResponseEndBlock
|
||||||
|
|
||||||
txs types.Txs // for reference later
|
txs types.Txs // reference for indexing results by hash
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewABCIResponses(block *types.Block) *ABCIResponses {
|
func NewABCIResponses(block *types.Block) *ABCIResponses {
|
||||||
|
|
Loading…
Reference in New Issue