From 07656137780412a2341ec21ed1466647b656944a Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 20 Feb 2017 19:52:36 -0500 Subject: [PATCH] move handshake to consensus package --- consensus/replay.go | 205 +++++++++++++++++++++++++++++++++++++++ consensus/replay_file.go | 2 +- consensus/replay_test.go | 2 +- consensus/state.go | 29 ------ node/node.go | 2 +- state/errors.go | 32 +++--- state/execution.go | 181 +--------------------------------- 7 files changed, 228 insertions(+), 225 deletions(-) diff --git a/consensus/replay.go b/consensus/replay.go index d534827b..2ab84dc2 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -1,6 +1,7 @@ package consensus import ( + "bytes" "errors" "fmt" "io" @@ -9,13 +10,26 @@ import ( "strings" "time" + abci "github.com/tendermint/abci/types" auto "github.com/tendermint/go-autofile" . "github.com/tendermint/go-common" + cfg "github.com/tendermint/go-config" "github.com/tendermint/go-wire" + "github.com/tendermint/tendermint/proxy" + sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" ) +// Functionality to replay blocks and messages on recovery from a crash. +// There are two general failure scenarios: failure during consensus, and failure while applying the block. +// The former is handled by the WAL, the latter by the proxyApp Handshake on restart, +// which ultimately hands off the work to the WAL. + +//----------------------------------------- +// recover from failure during consensus +// by replaying messages from the WAL + // Unmarshal and apply a single message to the consensus state // as if it were received in receiveRoutine // Lines that start with "#" are ignored. @@ -154,3 +168,194 @@ func makeHeightSearchFunc(height int) auto.SearchFunc { } } } + +//---------------------------------------------- +// Recover from failure during block processing +// by handshaking with the app to figure out where +// we were last and using the WAL to recover there + +// Replay the last block through the consensus and return the AppHash from after Commit. +func replayLastBlock(config cfg.Config, state *sm.State, proxyApp proxy.AppConnConsensus, blockStore sm.BlockStore) ([]byte, error) { + mempool := sm.MockMempool{} + cs := NewConsensusState(config, state, proxyApp, blockStore, mempool) + + evsw := types.NewEventSwitch() + evsw.Start() + defer evsw.Stop() + cs.SetEventSwitch(evsw) + newBlockCh := subscribeToEvent(evsw, "consensus-replay", types.EventStringNewBlock(), 1) + + // run through the WAL, commit new block, stop + cs.Start() + <-newBlockCh // TODO: use a timeout and return err? + cs.Stop() + + return cs.state.AppHash, nil +} + +type Handshaker struct { + config cfg.Config + state *sm.State + store sm.BlockStore + + nBlocks int // number of blocks applied to the state +} + +func NewHandshaker(config cfg.Config, state *sm.State, store sm.BlockStore) *Handshaker { + return &Handshaker{config, state, store, 0} +} + +func (h *Handshaker) NBlocks() int { + return h.nBlocks +} + +// TODO: retry the handshake/replay if it fails ? +func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { + // handshake is done via info request on the query conn + res, err := proxyApp.Query().InfoSync() + if err != nil { + return errors.New(Fmt("Error calling Info: %v", err)) + } + + blockHeight := int(res.LastBlockHeight) // XXX: beware overflow + appHash := res.LastBlockAppHash + + log.Notice("ABCI Handshake", "appHeight", blockHeight, "appHash", appHash) + + // TODO: check version + + // replay blocks up to the latest in the blockstore + _, err = h.ReplayBlocks(appHash, blockHeight, proxyApp) + if err != nil { + return errors.New(Fmt("Error on replay: %v", err)) + } + + // TODO: (on restart) replay mempool + + return nil +} + +// Replay all blocks since appBlockHeight and ensure the result matches the current state. +// Returns the final AppHash or an error +func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, proxyApp proxy.AppConns) ([]byte, error) { + + storeBlockHeight := h.store.Height() + stateBlockHeight := h.state.LastBlockHeight + log.Notice("ABCI Replay Blocks", "appHeight", appBlockHeight, "storeHeight", storeBlockHeight, "stateHeight", stateBlockHeight) + + // First handle edge cases and constraints on the storeBlockHeight + if storeBlockHeight == 0 { + return appHash, h.checkAppHash(appHash) + + } else if storeBlockHeight < appBlockHeight { + // the app should never be ahead of the store (but this is under app's control) + return appHash, sm.ErrAppBlockHeightTooHigh{storeBlockHeight, appBlockHeight} + + } else if storeBlockHeight < stateBlockHeight { + // the state should never be ahead of the store (this is under tendermint's control) + PanicSanity(Fmt("StateBlockHeight (%d) > StoreBlockHeight (%d)", stateBlockHeight, storeBlockHeight)) + + } else if storeBlockHeight > stateBlockHeight+1 { + // store should be at most one ahead of the state (this is under tendermint's control) + PanicSanity(Fmt("StoreBlockHeight (%d) > StateBlockHeight + 1 (%d)", storeBlockHeight, stateBlockHeight+1)) + } + + // Now either store is equal to state, or one ahead. + // For each, consider all cases of where the app could be, given app <= store + if storeBlockHeight == stateBlockHeight { + // Tendermint ran Commit and saved the state. + // Either the app is asking for replay, or we're all synced up. + if appBlockHeight < storeBlockHeight { + // the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store) + return h.replayBlocks(proxyApp, appBlockHeight, storeBlockHeight, false) + + } else if appBlockHeight == storeBlockHeight { + // we're good! + return appHash, h.checkAppHash(appHash) + } + + } else if storeBlockHeight == stateBlockHeight+1 { + // We saved the block in the store but haven't updated the state, + // so we'll need to replay a block using the WAL. + if appBlockHeight < stateBlockHeight { + // the app is further behind than it should be, so replay blocks + // but leave the last block to go through the WAL + return h.replayBlocks(proxyApp, appBlockHeight, storeBlockHeight, true) + + } else if appBlockHeight == stateBlockHeight { + // We haven't run Commit (both the state and app are one block behind), + // so run through consensus with the real app + log.Info("Replay last block using real app") + return replayLastBlock(h.config, h.state, proxyApp.Consensus(), h.store) + + } else if appBlockHeight == storeBlockHeight { + // We ran Commit, but didn't save the state, so run through consensus with mock app + mockApp := newMockProxyApp(appHash) + log.Info("Replay last block using mock app") + return replayLastBlock(h.config, h.state, mockApp, h.store) + } + + } + + PanicSanity("Should never happen") + return nil, nil +} + +func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int, useReplayFunc bool) ([]byte, error) { + // App is further behind than it should be, so we need to replay blocks. + // We replay all blocks from appBlockHeight+1 to storeBlockHeight-1, + // and let the final block be replayed through ReplayBlocks. + // Note that we don't have an old version of the state, + // so we by-pass state validation using applyBlock here. + + var appHash []byte + var err error + finalBlock := storeBlockHeight + if useReplayFunc { + finalBlock -= 1 + } + for i := appBlockHeight + 1; i <= finalBlock; i++ { + log.Info("Applying block", "height", i) + h.nBlocks += 1 + block := h.store.LoadBlock(i) + appHash, err = sm.ApplyBlock(proxyApp.Consensus(), block) + if err != nil { + return nil, err + } + } + + if useReplayFunc { + // sync the final block + appHash, err = h.ReplayBlocks(appHash, finalBlock, proxyApp) + if err != nil { + return appHash, err + } + } + + return appHash, h.checkAppHash(appHash) +} + +func (h *Handshaker) checkAppHash(appHash []byte) error { + if !bytes.Equal(h.state.AppHash, appHash) { + return errors.New(Fmt("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, h.state.AppHash)) + } + return nil +} + +//-------------------------------------------------------------------------------- + +func newMockProxyApp(appHash []byte) proxy.AppConnConsensus { + clientCreator := proxy.NewLocalClientCreator(&mockProxyApp{appHash: appHash}) + cli, _ := clientCreator.NewABCIClient() + return proxy.NewAppConnConsensus(cli) +} + +type mockProxyApp struct { + abci.BaseApplication + + appHash []byte +} + +func (mock *mockProxyApp) Commit() abci.Result { + return abci.NewResultOK(mock.appHash, "") +} diff --git a/consensus/replay_file.go b/consensus/replay_file.go index 5d674785..6ff38088 100644 --- a/consensus/replay_file.go +++ b/consensus/replay_file.go @@ -248,7 +248,7 @@ func newConsensusStateForReplay(config cfg.Config) *ConsensusState { state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) // Create proxyAppConn connection (consensus, mempool, query) - proxyApp := proxy.NewAppConns(config, proxy.DefaultClientCreator(config), sm.NewHandshaker(config, state, blockStore, ReplayLastBlock)) + proxyApp := proxy.NewAppConns(config, proxy.DefaultClientCreator(config), NewHandshaker(config, state, blockStore)) _, err := proxyApp.Start() if err != nil { Exit(Fmt("Error starting proxy app conns: %v", err)) diff --git a/consensus/replay_test.go b/consensus/replay_test.go index e8aa9e90..3016f104 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -346,7 +346,7 @@ func testHandshakeReplay(t *testing.T, nBlocks int) { } // now start the app using the handshake - it should sync - handshaker := sm.NewHandshaker(config, state, store, ReplayLastBlock) + handshaker := NewHandshaker(config, state, store) proxyApp = proxy.NewAppConns(config, clientCreator2, handshaker) if _, err := proxyApp.Start(); err != nil { t.Fatalf("Error starting proxy app connections: %v", err) diff --git a/consensus/state.go b/consensus/state.go index 8e721ed3..dc1324e3 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -253,25 +253,6 @@ type ConsensusState struct { done chan struct{} } -// Replay the last block through the consensus and return the AppHash from after Commit. -func ReplayLastBlock(config cfg.Config, state *sm.State, proxyApp proxy.AppConnConsensus, blockStore sm.BlockStore) ([]byte, error) { - mempool := sm.MockMempool{} - cs := NewConsensusState(config, state, proxyApp, blockStore, mempool) - - evsw := types.NewEventSwitch() - evsw.Start() - defer evsw.Stop() - cs.SetEventSwitch(evsw) - newBlockCh := subscribeToEvent(evsw, "consensus-replay", types.EventStringNewBlock(), 1) - - // run through the WAL, commit new block, stop - cs.Start() - <-newBlockCh // TODO: use a timeout and return err? - cs.Stop() - - return cs.state.AppHash, nil -} - func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore sm.BlockStore, mempool sm.Mempool) *ConsensusState { cs := &ConsensusState{ config: config, @@ -624,11 +605,6 @@ func (cs *ConsensusState) newStep() { //----------------------------------------- // the main go routines -// a nice idea but probably more trouble than its worth -func (cs *ConsensusState) stopTimer() { - cs.timeoutTicker.Stop() -} - // receiveRoutine handles messages which may cause state transitions. // it's argument (n) is the number of messages to process before exiting - use 0 to run forever // It keeps the RoundState and is the only thing that updates it. @@ -767,7 +743,6 @@ func (cs *ConsensusState) enterNewRound(height int, round int) { if now := time.Now(); cs.StartTime.After(now) { log.Warn("Need to set a buffer and log.Warn() here for sanity.", "startTime", cs.StartTime, "now", now) } - // cs.stopTimer() log.Notice(Fmt("enterNewRound(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) @@ -947,8 +922,6 @@ func (cs *ConsensusState) enterPrevote(height int, round int) { // TODO: catchup event? } - // cs.stopTimer() - log.Info(Fmt("enterPrevote(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) // Sign and broadcast vote as necessary @@ -1022,8 +995,6 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) { return } - // cs.stopTimer() - log.Info(Fmt("enterPrecommit(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) defer func() { diff --git a/node/node.go b/node/node.go index 755658b7..deaa9e16 100644 --- a/node/node.go +++ b/node/node.go @@ -69,7 +69,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato // Create the proxyApp, which manages connections (consensus, mempool, query) // and sync tendermint and the app by replaying any necessary blocks - proxyApp := proxy.NewAppConns(config, clientCreator, sm.NewHandshaker(config, state, blockStore, consensus.ReplayLastBlock)) + proxyApp := proxy.NewAppConns(config, clientCreator, consensus.NewHandshaker(config, state, blockStore)) if _, err := proxyApp.Start(); err != nil { cmn.Exit(cmn.Fmt("Error starting proxy app connections: %v", err)) } diff --git a/state/errors.go b/state/errors.go index 0d0eae14..32a9351c 100644 --- a/state/errors.go +++ b/state/errors.go @@ -9,47 +9,47 @@ type ( ErrProxyAppConn error ErrUnknownBlock struct { - height int + Height int } ErrBlockHashMismatch struct { - coreHash []byte - appHash []byte - height int + CoreHash []byte + AppHash []byte + Height int } ErrAppBlockHeightTooHigh struct { - coreHeight int - appHeight int + CoreHeight int + AppHeight int } ErrLastStateMismatch struct { - height int - core []byte - app []byte + Height int + Core []byte + App []byte } ErrStateMismatch struct { - got *State - expected *State + Got *State + Expected *State } ) func (e ErrUnknownBlock) Error() string { - return Fmt("Could not find block #%d", e.height) + return Fmt("Could not find block #%d", e.Height) } func (e ErrBlockHashMismatch) Error() string { - return Fmt("App block hash (%X) does not match core block hash (%X) for height %d", e.appHash, e.coreHash, e.height) + return Fmt("App block hash (%X) does not match core block hash (%X) for height %d", e.AppHash, e.CoreHash, e.Height) } func (e ErrAppBlockHeightTooHigh) Error() string { - return Fmt("App block height (%d) is higher than core (%d)", e.appHeight, e.coreHeight) + return Fmt("App block height (%d) is higher than core (%d)", e.AppHeight, e.CoreHeight) } func (e ErrLastStateMismatch) Error() string { - return Fmt("Latest tendermint block (%d) LastAppHash (%X) does not match app's AppHash (%X)", e.height, e.core, e.app) + return Fmt("Latest tendermint block (%d) LastAppHash (%X) does not match app's AppHash (%X)", e.Height, e.Core, e.App) } func (e ErrStateMismatch) Error() string { - return Fmt("State after replay does not match saved state. Got ----\n%v\nExpected ----\n%v\n", e.got, e.expected) + return Fmt("State after replay does not match saved state. Got ----\n%v\nExpected ----\n%v\n", e.Got, e.Expected) } diff --git a/state/execution.go b/state/execution.go index c1471bf8..3e7ad91b 100644 --- a/state/execution.go +++ b/state/execution.go @@ -1,14 +1,12 @@ package state import ( - "bytes" "errors" "github.com/ebuchman/fail-test" abci "github.com/tendermint/abci/types" . "github.com/tendermint/go-common" - cfg "github.com/tendermint/go-config" "github.com/tendermint/go-crypto" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" @@ -272,7 +270,7 @@ func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, bl // Apply and commit a block, but without all the state validation. // Returns the application root hash (result of abci.Commit) -func applyBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block) ([]byte, error) { +func ApplyBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block) ([]byte, error) { var eventCache types.Fireable // nil _, err := execBlockOnProxyApp(eventCache, appConnConsensus, block) if err != nil { @@ -291,6 +289,9 @@ func applyBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block) ([] return res.Data, nil } +//------------------------------------------------------ +// blockchain services types + // Updates to the mempool need to be synchronized with committing a block // so apps can reset their transient state on Commit type Mempool interface { @@ -311,9 +312,6 @@ func (m MockMempool) CheckTx(tx types.Tx, cb func(*abci.Response)) error { retur func (m MockMempool) Reap(n int) types.Txs { return types.Txs{} } func (m MockMempool) Update(height int, txs types.Txs) {} -//---------------------------------------------------------------- -// Handshake with app to sync to latest state of core by replaying blocks - // TODO: Should we move blockchain/store.go to its own package? type BlockStore interface { Height() int @@ -327,174 +325,3 @@ type BlockStore interface { LoadBlockCommit(height int) *types.Commit LoadSeenCommit(height int) *types.Commit } - -// returns the apphash from Commit -type blockReplayFunc func(cfg.Config, *State, proxy.AppConnConsensus, BlockStore) ([]byte, error) - -type Handshaker struct { - config cfg.Config - state *State - store BlockStore - replayLastBlock blockReplayFunc - - nBlocks int // number of blocks applied to the state -} - -func NewHandshaker(config cfg.Config, state *State, store BlockStore, f blockReplayFunc) *Handshaker { - return &Handshaker{config, state, store, f, 0} -} - -func (h *Handshaker) NBlocks() int { - return h.nBlocks -} - -// TODO: retry the handshake/replay if it fails ? -func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { - // handshake is done via info request on the query conn - res, err := proxyApp.Query().InfoSync() - if err != nil { - return errors.New(Fmt("Error calling Info: %v", err)) - } - - blockHeight := int(res.LastBlockHeight) // XXX: beware overflow - appHash := res.LastBlockAppHash - - log.Notice("ABCI Handshake", "appHeight", blockHeight, "appHash", appHash) - - // TODO: check version - - // replay blocks up to the latest in the blockstore - _, err = h.ReplayBlocks(appHash, blockHeight, proxyApp) - if err != nil { - return errors.New(Fmt("Error on replay: %v", err)) - } - - // TODO: (on restart) replay mempool - - return nil -} - -// Replay all blocks since appBlockHeight and ensure the result matches the current state. -// Returns the final AppHash or an error -func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, proxyApp proxy.AppConns) ([]byte, error) { - - storeBlockHeight := h.store.Height() - stateBlockHeight := h.state.LastBlockHeight - log.Notice("ABCI Replay Blocks", "appHeight", appBlockHeight, "storeHeight", storeBlockHeight, "stateHeight", stateBlockHeight) - - // First handle edge cases and constraints on the storeBlockHeight - if storeBlockHeight == 0 { - return appHash, h.checkAppHash(appHash) - - } else if storeBlockHeight < appBlockHeight { - // the app should never be ahead of the store (but this is under app's control) - return appHash, ErrAppBlockHeightTooHigh{storeBlockHeight, appBlockHeight} - - } else if storeBlockHeight < stateBlockHeight { - // the state should never be ahead of the store (this is under tendermint's control) - PanicSanity(Fmt("StateBlockHeight (%d) > StoreBlockHeight (%d)", stateBlockHeight, storeBlockHeight)) - - } else if storeBlockHeight > stateBlockHeight+1 { - // store should be at most one ahead of the state (this is under tendermint's control) - PanicSanity(Fmt("StoreBlockHeight (%d) > StateBlockHeight + 1 (%d)", storeBlockHeight, stateBlockHeight+1)) - } - - // Now either store is equal to state, or one ahead. - // For each, consider all cases of where the app could be, given app <= store - if storeBlockHeight == stateBlockHeight { - // Tendermint ran Commit and saved the state. - // Either the app is asking for replay, or we're all synced up. - if appBlockHeight < storeBlockHeight { - // the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store) - return h.replayBlocks(proxyApp, appBlockHeight, storeBlockHeight, false) - - } else if appBlockHeight == storeBlockHeight { - // we're good! - return appHash, h.checkAppHash(appHash) - } - - } else if storeBlockHeight == stateBlockHeight+1 { - // We saved the block in the store but haven't updated the state, - // so we'll need to replay a block using the WAL. - if appBlockHeight < stateBlockHeight { - // the app is further behind than it should be, so replay blocks - // but leave the last block to go through the WAL - return h.replayBlocks(proxyApp, appBlockHeight, storeBlockHeight, true) - - } else if appBlockHeight == stateBlockHeight { - // We haven't run Commit (both the state and app are one block behind), - // so run through consensus with the real app - log.Info("Replay last block using real app") - return h.replayLastBlock(h.config, h.state, proxyApp.Consensus(), h.store) - - } else if appBlockHeight == storeBlockHeight { - // We ran Commit, but didn't save the state, so run through consensus with mock app - mockApp := newMockProxyApp(appHash) - log.Info("Replay last block using mock app") - return h.replayLastBlock(h.config, h.state, mockApp, h.store) - } - - } - - PanicSanity("Should never happen") - return nil, nil -} - -func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int, useReplayFunc bool) ([]byte, error) { - // App is further behind than it should be, so we need to replay blocks. - // We replay all blocks from appBlockHeight+1 to storeBlockHeight-1, - // and let the final block be replayed through ReplayBlocks. - // Note that we don't have an old version of the state, - // so we by-pass state validation using applyBlock here. - - var appHash []byte - var err error - finalBlock := storeBlockHeight - if useReplayFunc { - finalBlock -= 1 - } - for i := appBlockHeight + 1; i <= finalBlock; i++ { - log.Info("Applying block", "height", i) - h.nBlocks += 1 - block := h.store.LoadBlock(i) - appHash, err = applyBlock(proxyApp.Consensus(), block) - if err != nil { - return nil, err - } - } - - if useReplayFunc { - // sync the final block - appHash, err = h.ReplayBlocks(appHash, finalBlock, proxyApp) - if err != nil { - return appHash, err - } - } - - return appHash, h.checkAppHash(appHash) -} - -func (h *Handshaker) checkAppHash(appHash []byte) error { - if !bytes.Equal(h.state.AppHash, appHash) { - return errors.New(Fmt("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, h.state.AppHash)) - } - return nil -} - -//-------------------------------------------------------------------------------- - -func newMockProxyApp(appHash []byte) proxy.AppConnConsensus { - clientCreator := proxy.NewLocalClientCreator(&mockProxyApp{appHash: appHash}) - cli, _ := clientCreator.NewABCIClient() - return proxy.NewAppConnConsensus(cli) -} - -type mockProxyApp struct { - abci.BaseApplication - - appHash []byte -} - -func (mock *mockProxyApp) Commit() abci.Result { - return abci.NewResultOK(mock.appHash, "") -}