tendermint/consensus/replay.go

package consensus

import (
	"bytes"
	"fmt"
	"hash/crc32"
	"io"
	"reflect"
	//"strconv"
	//"strings"
	"time"

	abci "github.com/tendermint/abci/types"
	//auto "github.com/tendermint/tmlibs/autofile"
	cmn "github.com/tendermint/tmlibs/common"
	dbm "github.com/tendermint/tmlibs/db"
	"github.com/tendermint/tmlibs/log"

	"github.com/tendermint/tendermint/proxy"
	sm "github.com/tendermint/tendermint/state"
	"github.com/tendermint/tendermint/types"
	"github.com/tendermint/tendermint/version"
)

var crc32c = crc32.MakeTable(crc32.Castagnoli)

// Functionality to replay blocks and messages on recovery from a crash.
// There are two general failure scenarios: failure during consensus, and failure while applying the block.
// The former is handled by the WAL, the latter by the proxyApp Handshake on restart,
// which ultimately hands off the work to the WAL.

//-----------------------------------------
// recover from failure during consensus
// by replaying messages from the WAL

// Unmarshal and apply a single message to the consensus state
// as if it were received in receiveRoutine.
// Lines that start with "#" are ignored.
// NOTE: receiveRoutine should not be running.
func (cs *ConsensusState) readReplayMessage(msg *TimedWALMessage, newStepCh chan interface{}) error {
	// skip meta messages
	if _, ok := msg.Msg.(EndHeightMessage); ok {
		return nil
	}

	// for logging
	switch m := msg.Msg.(type) {
	case types.EventDataRoundState:
		cs.Logger.Info("Replay: New Step", "height", m.Height, "round", m.Round, "step", m.Step)
		// these are playback checks
		ticker := time.After(time.Second * 2)
		if newStepCh != nil {
			select {
			case mi := <-newStepCh:
				m2 := mi.(types.EventDataRoundState)
				if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step {
					return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m)
				}
			case <-ticker:
				return fmt.Errorf("Failed to read off newStepCh")
			}
		}
	case msgInfo:
		peerID := m.PeerID
		if peerID == "" {
			peerID = "local"
		}
		switch msg := m.Msg.(type) {
		case *ProposalMessage:
			p := msg.Proposal
			cs.Logger.Info("Replay: Proposal", "height", p.Height, "round", p.Round, "header",
				p.BlockPartsHeader, "pol", p.POLRound, "peer", peerID)
		case *BlockPartMessage:
			cs.Logger.Info("Replay: BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerID)
		case *VoteMessage:
			v := msg.Vote
			cs.Logger.Info("Replay: Vote", "height", v.Height, "round", v.Round, "type", v.Type,
				"blockID", v.BlockID, "peer", peerID)
		}

		cs.handleMsg(m)
	case timeoutInfo:
		cs.Logger.Info("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration)
		cs.handleTimeout(m, cs.RoundState)
	default:
		return fmt.Errorf("Replay: Unknown TimedWALMessage type: %v", reflect.TypeOf(msg.Msg))
	}
	return nil
}
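
// Illustrative only: a minimal sketch of how readReplayMessage's newStepCh is
// meant to be used for playback verification. The caller feeds a channel of
// types.EventDataRoundState values so each replayed message is checked against
// the live state's new (height, round, step). The subscription helper and the
// walMessages slice below are assumptions for illustration, not part of this file.
/*
	stepCh := subscribeToNewRoundStep(cs) // hypothetical helper returning chan interface{}
	for _, msg := range walMessages {     // messages previously decoded from the WAL
		if err := cs.readReplayMessage(msg, stepCh); err != nil {
			return err
		}
	}
*/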

// Replay only those messages since the last block.
// timeoutRoutine should run concurrently to read off tickChan.
func (cs *ConsensusState) catchupReplay(csHeight int64) error {
	// set replayMode
	cs.replayMode = true
	defer func() { cs.replayMode = false }()

	// Ensure that ENDHEIGHT for this height doesn't exist.
	// NOTE: This is just a sanity check. As far as we know things work fine
	// without it, and Handshake could reuse ConsensusState if it weren't for
	// this check (since we can crash after writing ENDHEIGHT).
	//
	// Ignore data corruption errors since this is a sanity check.
	gr, found, err := cs.wal.SearchForEndHeight(csHeight, &WALSearchOptions{IgnoreDataCorruptionErrors: true})
	if err != nil {
		return err
	}
	if gr != nil {
		if err := gr.Close(); err != nil {
			return err
		}
	}
	if found {
		return fmt.Errorf("WAL should not contain #ENDHEIGHT %d.", csHeight)
	}

	// Search for last height marker
	//
	// Ignore data corruption errors in previous heights because we only care about last height
	gr, found, err = cs.wal.SearchForEndHeight(csHeight-1, &WALSearchOptions{IgnoreDataCorruptionErrors: true})
	if err == io.EOF {
		cs.Logger.Error("Replay: wal.group.Search returned EOF", "#ENDHEIGHT", csHeight-1)
	} else if err != nil {
		return err
	}
	if !found {
		return fmt.Errorf("Cannot replay height %d. WAL does not contain #ENDHEIGHT for %d.", csHeight, csHeight-1)
	}
	defer gr.Close() // nolint: errcheck

	cs.Logger.Info("Catchup by replaying consensus messages", "height", csHeight)

	var msg *TimedWALMessage
	dec := WALDecoder{gr}

	for {
		msg, err = dec.Decode()
		if err == io.EOF {
			break
		} else if IsDataCorruptionError(err) {
			cs.Logger.Debug("data has been corrupted in last height of consensus WAL", "err", err, "height", csHeight)
			panic(fmt.Sprintf("data has been corrupted (%v) in last height %d of consensus WAL", err, csHeight))
		} else if err != nil {
			return err
		}

		// NOTE: since the priv key is set when the msgs are received
		// it will attempt to e.g. double sign, but we can just ignore it
		// since the votes will be replayed and we'll get to the next step
		if err := cs.readReplayMessage(msg, nil); err != nil {
			return err
		}
	}
	cs.Logger.Info("Replay: Done")
	return nil
}
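
// A rough sketch, for illustration only, of how catchupReplay fits into node
// restart: the WAL is started first so that messages produced while catching up
// are appended to the same file being replayed, then catchupReplay is called
// with the height the state machine is about to work on. The surrounding wiring
// shown here is an assumption, not a verbatim copy of ConsensusState's startup code.
/*
	if err := cs.wal.Start(); err != nil {
		return err
	}
	// Replay everything written after the last #ENDHEIGHT marker so the
	// in-memory RoundState matches where we were when we crashed.
	if err := cs.catchupReplay(cs.Height); err != nil {
		return fmt.Errorf("Error on catchup replay: %v", err)
	}
*/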

//--------------------------------------------------------------------------------
// Parses marker lines of the form:
// #ENDHEIGHT: 12345
/*
func makeHeightSearchFunc(height int64) auto.SearchFunc {
	return func(line string) (int, error) {
		line = strings.TrimRight(line, "\n")
		parts := strings.Split(line, " ")
		if len(parts) != 2 {
			return -1, errors.New("Line did not have 2 parts")
		}
		i, err := strconv.Atoi(parts[1])
		if err != nil {
			return -1, errors.New("Failed to parse INFO: " + err.Error())
		}
		if height < i {
			return 1, nil
		} else if height == i {
			return 0, nil
		} else {
			return -1, nil
		}
	}
}*/

//----------------------------------------------
// Recover from failure during block processing
// by handshaking with the app to figure out where
// we were last and using the WAL to recover there

type Handshaker struct {
	stateDB      dbm.DB
	initialState sm.State
	store        types.BlockStore
	logger       log.Logger

	nBlocks int // number of blocks applied to the state
}

func NewHandshaker(stateDB dbm.DB, state sm.State, store types.BlockStore) *Handshaker {
	return &Handshaker{stateDB, state, store, log.NewNopLogger(), 0}
}

func (h *Handshaker) SetLogger(l log.Logger) {
	h.logger = l
}

func (h *Handshaker) NBlocks() int {
	return h.nBlocks
}

// TODO: retry the handshake/replay if it fails ?
func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
	// handshake is done via info request on the query conn
	res, err := proxyApp.Query().InfoSync(abci.RequestInfo{version.Version})
	if err != nil {
		return fmt.Errorf("Error calling Info: %v", err)
	}

	blockHeight := int64(res.LastBlockHeight)
	if blockHeight < 0 {
		return fmt.Errorf("Got a negative last block height (%d) from the app", blockHeight)
	}
	appHash := res.LastBlockAppHash

	h.logger.Info("ABCI Handshake", "appHeight", blockHeight, "appHash", fmt.Sprintf("%X", appHash))

	// TODO: check version

	// replay blocks up to the latest in the blockstore
	_, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp)
	if err != nil {
		return fmt.Errorf("Error on replay: %v", err)
	}

	h.logger.Info("Completed ABCI Handshake - Tendermint and App are synced", "appHeight", blockHeight, "appHash", fmt.Sprintf("%X", appHash))

	// TODO: (on restart) replay mempool

	return nil
}
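
// For illustration only: a sketch of how a node might wire the Handshaker in at
// startup, under the assumption that it is handed to the proxy layer and that
// Handshake runs when the app connections start. The identifiers stateDB, state,
// blockStore, clientCreator and logger are placeholders, not defined in this file.
/*
	handshaker := NewHandshaker(stateDB, state, blockStore)
	handshaker.SetLogger(logger.With("module", "consensus"))

	// The proxy layer is assumed to call handshaker.Handshake(proxyApp) as part
	// of starting the app connections, so a failed replay surfaces as a start error.
	proxyApp := proxy.NewAppConns(clientCreator, handshaker)
	if err := proxyApp.Start(); err != nil {
		return fmt.Errorf("Error starting proxy app connections: %v", err)
	}
*/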

// Replay all blocks since appBlockHeight and ensure the result matches the current state.
// Returns the final AppHash or an error.
func (h *Handshaker) ReplayBlocks(state sm.State, appHash []byte, appBlockHeight int64, proxyApp proxy.AppConns) ([]byte, error) {

	storeBlockHeight := h.store.Height()
	stateBlockHeight := state.LastBlockHeight
	h.logger.Info("ABCI Replay Blocks", "appHeight", appBlockHeight, "storeHeight", storeBlockHeight, "stateHeight", stateBlockHeight)

	// If appBlockHeight == 0 it means that we are at genesis and hence should send InitChain
	if appBlockHeight == 0 {
		validators := types.TM2PB.Validators(state.Validators)
		if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{validators}); err != nil {
			return nil, err
		}
	}

	// First handle edge cases and constraints on the storeBlockHeight
	if storeBlockHeight == 0 {
		return appHash, checkAppHash(state, appHash)

	} else if storeBlockHeight < appBlockHeight {
		// the app should never be ahead of the store (but this is under app's control)
		return appHash, sm.ErrAppBlockHeightTooHigh{storeBlockHeight, appBlockHeight}

	} else if storeBlockHeight < stateBlockHeight {
		// the state should never be ahead of the store (this is under tendermint's control)
		cmn.PanicSanity(cmn.Fmt("StateBlockHeight (%d) > StoreBlockHeight (%d)", stateBlockHeight, storeBlockHeight))

	} else if storeBlockHeight > stateBlockHeight+1 {
		// store should be at most one ahead of the state (this is under tendermint's control)
		cmn.PanicSanity(cmn.Fmt("StoreBlockHeight (%d) > StateBlockHeight + 1 (%d)", storeBlockHeight, stateBlockHeight+1))
	}

	var err error
	// Now either store is equal to state, or one ahead.
	// For each, consider all cases of where the app could be, given app <= store
	if storeBlockHeight == stateBlockHeight {
		// Tendermint ran Commit and saved the state.
		// Either the app is asking for replay, or we're all synced up.
		if appBlockHeight < storeBlockHeight {
			// the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store)
			return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, false)

		} else if appBlockHeight == storeBlockHeight {
			// We're good!
			return appHash, checkAppHash(state, appHash)
		}

	} else if storeBlockHeight == stateBlockHeight+1 {
		// We saved the block in the store but haven't updated the state,
		// so we'll need to replay a block using the WAL.
		if appBlockHeight < stateBlockHeight {
			// the app is further behind than it should be, so replay blocks
			// but leave the last block to go through the WAL
			return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, true)

		} else if appBlockHeight == stateBlockHeight {
			// We haven't run Commit (both the state and app are one block behind),
			// so replayBlock with the real app.
			// NOTE: We could instead use the cs.WAL on cs.Start,
			// but we'd have to allow the WAL to replay a block that wrote its ENDHEIGHT
			h.logger.Info("Replay last block using real app")
			state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus())
			return state.AppHash, err

		} else if appBlockHeight == storeBlockHeight {
			// We ran Commit, but didn't save the state, so replayBlock with mock app
			abciResponses, err := sm.LoadABCIResponses(h.stateDB, storeBlockHeight)
			if err != nil {
				return nil, err
			}
			mockApp := newMockProxyApp(appHash, abciResponses)
			h.logger.Info("Replay last block using mock app")
			state, err = h.replayBlock(state, storeBlockHeight, mockApp)
			return state.AppHash, err
		}

	}

	cmn.PanicSanity("Should never happen")
	return nil, nil
}
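
// Worked example (illustrative heights, not from any real chain): suppose
// storeBlockHeight = 10, stateBlockHeight = 9 and appBlockHeight = 7. The store
// is exactly one ahead of the state and the app is behind the state, so
// ReplayBlocks calls h.replayBlocks(state, proxyApp, 7, 10, true): blocks 8 and 9
// are replayed against the app via sm.ExecCommitBlock (no state mutation), then
// block 10 is applied with h.replayBlock, which saves the resulting state before
// the final AppHash is checked.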

func (h *Handshaker) replayBlocks(state sm.State, proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int64, mutateState bool) ([]byte, error) {
	// App is further behind than it should be, so we need to replay blocks.
	// We replay all blocks from appBlockHeight+1.
	//
	// Note that we don't have an old version of the state,
	// so we by-pass state validation/mutation using sm.ExecCommitBlock.
	// This also means we won't be saving validator sets if they change during this period.
	// TODO: Load the historical information to fix this and just use state.ApplyBlock
	//
	// If mutateState == true, the final block is replayed with h.replayBlock()

	var appHash []byte
	var err error
	finalBlock := storeBlockHeight
	if mutateState {
		finalBlock -= 1
	}
	for i := appBlockHeight + 1; i <= finalBlock; i++ {
		h.logger.Info("Applying block", "height", i)
		block := h.store.LoadBlock(i)
		appHash, err = sm.ExecCommitBlock(proxyApp.Consensus(), block, h.logger)
		if err != nil {
			return nil, err
		}

		h.nBlocks += 1
	}

	if mutateState {
		// sync the final block
		state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus())
		if err != nil {
			return nil, err
		}
		appHash = state.AppHash
	}

	return appHash, checkAppHash(state, appHash)
}

// ApplyBlock on the proxyApp with the last block.
func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.AppConnConsensus) (sm.State, error) {
	block := h.store.LoadBlock(height)
	meta := h.store.LoadBlockMeta(height)

	blockExec := sm.NewBlockExecutor(h.stateDB, h.logger, proxyApp, types.MockMempool{}, types.MockEvidencePool{})

	var err error
	state, err = blockExec.ApplyBlock(state, meta.BlockID, block)
	if err != nil {
		return sm.State{}, err
	}

	h.nBlocks += 1

	return state, nil
}

func checkAppHash(state sm.State, appHash []byte) error {
	if !bytes.Equal(state.AppHash, appHash) {
		panic(fmt.Errorf("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, state.AppHash).Error())
	}
	return nil
}

//--------------------------------------------------------------------------------
// mockProxyApp uses ABCIResponses to give the right results.
// Useful because we don't want to call Commit() twice for the same block on the real app.

func newMockProxyApp(appHash []byte, abciResponses *sm.ABCIResponses) proxy.AppConnConsensus {
	clientCreator := proxy.NewLocalClientCreator(&mockProxyApp{
		appHash:       appHash,
		abciResponses: abciResponses,
	})
	cli, _ := clientCreator.NewABCIClient()
	err := cli.Start()
	if err != nil {
		panic(err)
	}
	return proxy.NewAppConnConsensus(cli)
}

type mockProxyApp struct {
	abci.BaseApplication

	appHash       []byte
	txCount       int
	abciResponses *sm.ABCIResponses
}

func (mock *mockProxyApp) DeliverTx(tx []byte) abci.ResponseDeliverTx {
	r := mock.abciResponses.DeliverTx[mock.txCount]
	mock.txCount += 1
	return *r
}

func (mock *mockProxyApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock {
	mock.txCount = 0
	return *mock.abciResponses.EndBlock
}

func (mock *mockProxyApp) Commit() abci.ResponseCommit {
	return abci.ResponseCommit{Code: abci.CodeTypeOK, Data: mock.appHash}
}