Merge pull request #408 from tendermint/mock_app
handshake replay through consensus using mockApp
This commit is contained in:
commit
bed86da8ae
|
@ -242,7 +242,7 @@ FOR_LOOP:
|
|||
// NOTE: we could improve performance if we
|
||||
// didn't make the app commit to disk every block
|
||||
// ... but we would need a way to get the hash without it persisting
|
||||
err := bcR.state.ApplyBlock(bcR.evsw, bcR.proxyAppConn, first, firstPartsHeader, sm.MockMempool{})
|
||||
err := bcR.state.ApplyBlock(bcR.evsw, bcR.proxyAppConn, first, firstPartsHeader, types.MockMempool{})
|
||||
if err != nil {
|
||||
// TODO This is bad, are we zombie?
|
||||
PanicQ(Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
|
||||
|
|
|
@ -6,15 +6,15 @@ import (
|
|||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// NOTE: this is totally unsafe.
|
||||
// XXX: this is totally unsafe.
|
||||
// it's only suitable for testnets.
|
||||
func reset_all() {
|
||||
reset_priv_validator()
|
||||
os.RemoveAll(config.GetString("db_dir"))
|
||||
os.RemoveAll(config.GetString("cs_wal_dir"))
|
||||
os.Remove(config.GetString("cs_wal_file"))
|
||||
}
|
||||
|
||||
// NOTE: this is totally unsafe.
|
||||
// XXX: this is totally unsafe.
|
||||
// it's only suitable for testnets.
|
||||
func reset_priv_validator() {
|
||||
// Get PrivValidator
|
||||
|
|
|
@ -72,7 +72,7 @@ func GetConfig(rootDir string) cfg.Config {
|
|||
mapConfig.SetDefault("grpc_laddr", "")
|
||||
mapConfig.SetDefault("prof_laddr", "")
|
||||
mapConfig.SetDefault("revision_file", rootDir+"/revision")
|
||||
mapConfig.SetDefault("cs_wal_dir", rootDir+"/data/cs.wal")
|
||||
mapConfig.SetDefault("cs_wal_file", rootDir+"/data/cs.wal/wal")
|
||||
mapConfig.SetDefault("cs_wal_light", false)
|
||||
mapConfig.SetDefault("filter_peers", false)
|
||||
|
||||
|
|
|
@ -86,7 +86,7 @@ func ResetConfig(localPath string) cfg.Config {
|
|||
mapConfig.SetDefault("grpc_laddr", "tcp://0.0.0.0:36658")
|
||||
mapConfig.SetDefault("prof_laddr", "")
|
||||
mapConfig.SetDefault("revision_file", rootDir+"/revision")
|
||||
mapConfig.SetDefault("cs_wal_dir", rootDir+"/data/cs.wal")
|
||||
mapConfig.SetDefault("cs_wal_file", rootDir+"/data/cs.wal/wal")
|
||||
mapConfig.SetDefault("cs_wal_light", false)
|
||||
mapConfig.SetDefault("filter_peers", false)
|
||||
|
||||
|
|
|
@ -11,6 +11,8 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
abcicli "github.com/tendermint/abci/client"
|
||||
abci "github.com/tendermint/abci/types"
|
||||
. "github.com/tendermint/go-common"
|
||||
cfg "github.com/tendermint/go-config"
|
||||
dbm "github.com/tendermint/go-db"
|
||||
|
@ -20,8 +22,6 @@ import (
|
|||
mempl "github.com/tendermint/tendermint/mempool"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
abcicli "github.com/tendermint/abci/client"
|
||||
abci "github.com/tendermint/abci/types"
|
||||
|
||||
"github.com/tendermint/abci/example/counter"
|
||||
"github.com/tendermint/abci/example/dummy"
|
||||
|
@ -320,7 +320,7 @@ func randConsensusNet(nValidators int, testName string, tickerFunc func() Timeou
|
|||
state := sm.MakeGenesisState(db, genDoc)
|
||||
state.Save()
|
||||
thisConfig := tendermint_test.ResetConfig(Fmt("%s_%d", testName, i))
|
||||
ensureDir(thisConfig.GetString("cs_wal_dir"), 0700) // dir for wal
|
||||
ensureDir(path.Dir(thisConfig.GetString("cs_wal_file")), 0700) // dir for wal
|
||||
css[i] = newConsensusStateWithConfig(thisConfig, state, privVals[i], appFunc())
|
||||
css[i].SetTimeoutTicker(tickerFunc())
|
||||
}
|
||||
|
@ -336,7 +336,7 @@ func randConsensusNetWithPeers(nValidators, nPeers int, testName string, tickerF
|
|||
state := sm.MakeGenesisState(db, genDoc)
|
||||
state.Save()
|
||||
thisConfig := tendermint_test.ResetConfig(Fmt("%s_%d", testName, i))
|
||||
ensureDir(thisConfig.GetString("cs_wal_dir"), 0700) // dir for wal
|
||||
ensureDir(path.Dir(thisConfig.GetString("cs_wal_file")), 0700) // dir for wal
|
||||
var privVal *types.PrivValidator
|
||||
if i < nValidators {
|
||||
privVal = privVals[i]
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package consensus
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
|
@ -9,13 +10,26 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
abci "github.com/tendermint/abci/types"
|
||||
auto "github.com/tendermint/go-autofile"
|
||||
. "github.com/tendermint/go-common"
|
||||
cfg "github.com/tendermint/go-config"
|
||||
"github.com/tendermint/go-wire"
|
||||
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// Functionality to replay blocks and messages on recovery from a crash.
|
||||
// There are two general failure scenarios: failure during consensus, and failure while applying the block.
|
||||
// The former is handled by the WAL, the latter by the proxyApp Handshake on restart,
|
||||
// which ultimately hands off the work to the WAL.
|
||||
|
||||
//-----------------------------------------
|
||||
// recover from failure during consensus
|
||||
// by replaying messages from the WAL
|
||||
|
||||
// Unmarshal and apply a single message to the consensus state
|
||||
// as if it were received in receiveRoutine
|
||||
// Lines that start with "#" are ignored.
|
||||
|
@ -154,3 +168,195 @@ func makeHeightSearchFunc(height int) auto.SearchFunc {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
//----------------------------------------------
|
||||
// Recover from failure during block processing
|
||||
// by handshaking with the app to figure out where
|
||||
// we were last and using the WAL to recover there
|
||||
|
||||
type Handshaker struct {
|
||||
config cfg.Config
|
||||
state *sm.State
|
||||
store types.BlockStore
|
||||
|
||||
nBlocks int // number of blocks applied to the state
|
||||
}
|
||||
|
||||
func NewHandshaker(config cfg.Config, state *sm.State, store types.BlockStore) *Handshaker {
|
||||
return &Handshaker{config, state, store, 0}
|
||||
}
|
||||
|
||||
func (h *Handshaker) NBlocks() int {
|
||||
return h.nBlocks
|
||||
}
|
||||
|
||||
// TODO: retry the handshake/replay if it fails ?
|
||||
func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
|
||||
// handshake is done via info request on the query conn
|
||||
res, err := proxyApp.Query().InfoSync()
|
||||
if err != nil {
|
||||
return errors.New(Fmt("Error calling Info: %v", err))
|
||||
}
|
||||
|
||||
blockHeight := int(res.LastBlockHeight) // XXX: beware overflow
|
||||
appHash := res.LastBlockAppHash
|
||||
|
||||
log.Notice("ABCI Handshake", "appHeight", blockHeight, "appHash", appHash)
|
||||
|
||||
// TODO: check version
|
||||
|
||||
// replay blocks up to the latest in the blockstore
|
||||
_, err = h.ReplayBlocks(appHash, blockHeight, proxyApp)
|
||||
if err != nil {
|
||||
return errors.New(Fmt("Error on replay: %v", err))
|
||||
}
|
||||
|
||||
// TODO: (on restart) replay mempool
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Replay all blocks since appBlockHeight and ensure the result matches the current state.
|
||||
// Returns the final AppHash or an error
|
||||
func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, proxyApp proxy.AppConns) ([]byte, error) {
|
||||
|
||||
storeBlockHeight := h.store.Height()
|
||||
stateBlockHeight := h.state.LastBlockHeight
|
||||
log.Notice("ABCI Replay Blocks", "appHeight", appBlockHeight, "storeHeight", storeBlockHeight, "stateHeight", stateBlockHeight)
|
||||
|
||||
// First handle edge cases and constraints on the storeBlockHeight
|
||||
if storeBlockHeight == 0 {
|
||||
return appHash, h.checkAppHash(appHash)
|
||||
|
||||
} else if storeBlockHeight < appBlockHeight {
|
||||
// the app should never be ahead of the store (but this is under app's control)
|
||||
return appHash, sm.ErrAppBlockHeightTooHigh{storeBlockHeight, appBlockHeight}
|
||||
|
||||
} else if storeBlockHeight < stateBlockHeight {
|
||||
// the state should never be ahead of the store (this is under tendermint's control)
|
||||
PanicSanity(Fmt("StateBlockHeight (%d) > StoreBlockHeight (%d)", stateBlockHeight, storeBlockHeight))
|
||||
|
||||
} else if storeBlockHeight > stateBlockHeight+1 {
|
||||
// store should be at most one ahead of the state (this is under tendermint's control)
|
||||
PanicSanity(Fmt("StoreBlockHeight (%d) > StateBlockHeight + 1 (%d)", storeBlockHeight, stateBlockHeight+1))
|
||||
}
|
||||
|
||||
// Now either store is equal to state, or one ahead.
|
||||
// For each, consider all cases of where the app could be, given app <= store
|
||||
if storeBlockHeight == stateBlockHeight {
|
||||
// Tendermint ran Commit and saved the state.
|
||||
// Either the app is asking for replay, or we're all synced up.
|
||||
if appBlockHeight < storeBlockHeight {
|
||||
// the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store)
|
||||
return h.replayBlocks(proxyApp, appBlockHeight, storeBlockHeight, false)
|
||||
|
||||
} else if appBlockHeight == storeBlockHeight {
|
||||
// we're good!
|
||||
return appHash, h.checkAppHash(appHash)
|
||||
}
|
||||
|
||||
} else if storeBlockHeight == stateBlockHeight+1 {
|
||||
// We saved the block in the store but haven't updated the state,
|
||||
// so we'll need to replay a block using the WAL.
|
||||
if appBlockHeight < stateBlockHeight {
|
||||
// the app is further behind than it should be, so replay blocks
|
||||
// but leave the last block to go through the WAL
|
||||
return h.replayBlocks(proxyApp, appBlockHeight, storeBlockHeight, true)
|
||||
|
||||
} else if appBlockHeight == stateBlockHeight {
|
||||
// We haven't run Commit (both the state and app are one block behind),
|
||||
// so run through consensus with the real app
|
||||
log.Info("Replay last block using real app")
|
||||
return h.replayLastBlock(proxyApp.Consensus())
|
||||
|
||||
} else if appBlockHeight == storeBlockHeight {
|
||||
// We ran Commit, but didn't save the state, so run through consensus with mock app
|
||||
mockApp := newMockProxyApp(appHash)
|
||||
log.Info("Replay last block using mock app")
|
||||
return h.replayLastBlock(mockApp)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
PanicSanity("Should never happen")
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int, useReplayFunc bool) ([]byte, error) {
|
||||
// App is further behind than it should be, so we need to replay blocks.
|
||||
// We replay all blocks from appBlockHeight+1 to storeBlockHeight-1,
|
||||
// and let the final block be replayed through ReplayBlocks.
|
||||
// Note that we don't have an old version of the state,
|
||||
// so we by-pass state validation using applyBlock here.
|
||||
|
||||
var appHash []byte
|
||||
var err error
|
||||
finalBlock := storeBlockHeight
|
||||
if useReplayFunc {
|
||||
finalBlock -= 1
|
||||
}
|
||||
for i := appBlockHeight + 1; i <= finalBlock; i++ {
|
||||
log.Info("Applying block", "height", i)
|
||||
block := h.store.LoadBlock(i)
|
||||
appHash, err = sm.ApplyBlock(proxyApp.Consensus(), block)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
h.nBlocks += 1
|
||||
}
|
||||
|
||||
if useReplayFunc {
|
||||
// sync the final block
|
||||
return h.ReplayBlocks(appHash, finalBlock, proxyApp)
|
||||
}
|
||||
|
||||
return appHash, h.checkAppHash(appHash)
|
||||
}
|
||||
|
||||
// Replay the last block through the consensus and return the AppHash from after Commit.
|
||||
func (h *Handshaker) replayLastBlock(proxyApp proxy.AppConnConsensus) ([]byte, error) {
|
||||
mempool := types.MockMempool{}
|
||||
cs := NewConsensusState(h.config, h.state, proxyApp, h.store, mempool)
|
||||
|
||||
evsw := types.NewEventSwitch()
|
||||
evsw.Start()
|
||||
defer evsw.Stop()
|
||||
cs.SetEventSwitch(evsw)
|
||||
newBlockCh := subscribeToEvent(evsw, "consensus-replay", types.EventStringNewBlock(), 1)
|
||||
|
||||
// run through the WAL, commit new block, stop
|
||||
cs.Start()
|
||||
<-newBlockCh // TODO: use a timeout and return err?
|
||||
cs.Stop()
|
||||
|
||||
h.nBlocks += 1
|
||||
|
||||
return cs.state.AppHash, nil
|
||||
}
|
||||
|
||||
func (h *Handshaker) checkAppHash(appHash []byte) error {
|
||||
if !bytes.Equal(h.state.AppHash, appHash) {
|
||||
panic(errors.New(Fmt("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, h.state.AppHash)).Error())
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
//--------------------------------------------------------------------------------
|
||||
|
||||
func newMockProxyApp(appHash []byte) proxy.AppConnConsensus {
|
||||
clientCreator := proxy.NewLocalClientCreator(&mockProxyApp{appHash: appHash})
|
||||
cli, _ := clientCreator.NewABCIClient()
|
||||
return proxy.NewAppConnConsensus(cli)
|
||||
}
|
||||
|
||||
type mockProxyApp struct {
|
||||
abci.BaseApplication
|
||||
|
||||
appHash []byte
|
||||
}
|
||||
|
||||
func (mock *mockProxyApp) Commit() abci.Result {
|
||||
return abci.NewResultOK(mock.appHash, "")
|
||||
}
|
||||
|
|
|
@ -248,7 +248,7 @@ func newConsensusStateForReplay(config cfg.Config) *ConsensusState {
|
|||
state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
|
||||
|
||||
// Create proxyAppConn connection (consensus, mempool, query)
|
||||
proxyApp := proxy.NewAppConns(config, proxy.DefaultClientCreator(config), sm.NewHandshaker(config, state, blockStore))
|
||||
proxyApp := proxy.NewAppConns(config, proxy.DefaultClientCreator(config), NewHandshaker(config, state, blockStore))
|
||||
_, err := proxyApp.Start()
|
||||
if err != nil {
|
||||
Exit(Fmt("Error starting proxy app conns: %v", err))
|
||||
|
|
|
@ -1,7 +1,10 @@
|
|||
package consensus
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
|
@ -11,8 +14,14 @@ import (
|
|||
|
||||
"github.com/tendermint/tendermint/config/tendermint_test"
|
||||
|
||||
. "github.com/tendermint/go-common"
|
||||
"github.com/tendermint/abci/example/dummy"
|
||||
cmn "github.com/tendermint/go-common"
|
||||
cfg "github.com/tendermint/go-config"
|
||||
"github.com/tendermint/go-crypto"
|
||||
dbm "github.com/tendermint/go-db"
|
||||
"github.com/tendermint/go-wire"
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
|
@ -20,14 +29,23 @@ func init() {
|
|||
config = tendermint_test.ResetConfig("consensus_replay_test")
|
||||
}
|
||||
|
||||
// TODO: these tests ensure we can always recover from any state of the wal,
|
||||
// assuming it comes with a correct related state for the priv_validator.json.
|
||||
// It would be better to verify explicitly which states we can recover from without the wal
|
||||
// These tests ensure we can always recover from failure at any part of the consensus process.
|
||||
// There are two general failure scenarios: failure during consensus, and failure while applying the block.
|
||||
// Only the latter interacts with the app and store,
|
||||
// but the former has to deal with restrictions on re-use of priv_validator keys.
|
||||
// The `WAL Tests` are for failures during the consensus;
|
||||
// the `Handshake Tests` are for failures in applying the block.
|
||||
// With the help of the WAL, we can recover from it all!
|
||||
|
||||
var data_dir = path.Join(cmn.GoPath, "src/github.com/tendermint/tendermint/consensus", "test_data")
|
||||
|
||||
//------------------------------------------------------------------------------------------
|
||||
// WAL Tests
|
||||
|
||||
// TODO: It would be better to verify explicitly which states we can recover from without the wal
|
||||
// and which ones we need the wal for - then we'd also be able to only flush the
|
||||
// wal writer when we need to, instead of with every message.
|
||||
|
||||
var data_dir = path.Join(GoPath, "src/github.com/tendermint/tendermint/consensus", "test_data")
|
||||
|
||||
// the priv validator changes step at these lines for a block with 1 val and 1 part
|
||||
var baseStepChanges = []int{3, 6, 8}
|
||||
|
||||
|
@ -50,7 +68,7 @@ type testCase struct {
|
|||
|
||||
func newTestCase(name string, stepChanges []int) *testCase {
|
||||
if len(stepChanges) != 3 {
|
||||
panic(Fmt("a full wal has 3 step changes! Got array %v", stepChanges))
|
||||
panic(cmn.Fmt("a full wal has 3 step changes! Got array %v", stepChanges))
|
||||
}
|
||||
return &testCase{
|
||||
name: name,
|
||||
|
@ -85,18 +103,19 @@ func readWAL(p string) string {
|
|||
|
||||
func writeWAL(walMsgs string) string {
|
||||
tempDir := os.TempDir()
|
||||
walDir := tempDir + "/wal" + RandStr(12)
|
||||
walDir := path.Join(tempDir, "/wal"+cmn.RandStr(12))
|
||||
walFile := path.Join(walDir, "wal")
|
||||
// Create WAL directory
|
||||
err := EnsureDir(walDir, 0700)
|
||||
err := cmn.EnsureDir(walDir, 0700)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
// Write the needed WAL to file
|
||||
err = WriteFile(walDir+"/wal", []byte(walMsgs), 0600)
|
||||
err = cmn.WriteFile(walFile, []byte(walMsgs), 0600)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return walDir
|
||||
return walFile
|
||||
}
|
||||
|
||||
func waitForBlock(newBlockCh chan interface{}, thisCase *testCase, i int) {
|
||||
|
@ -104,14 +123,14 @@ func waitForBlock(newBlockCh chan interface{}, thisCase *testCase, i int) {
|
|||
select {
|
||||
case <-newBlockCh:
|
||||
case <-after:
|
||||
panic(Fmt("Timed out waiting for new block for case '%s' line %d", thisCase.name, i))
|
||||
panic(cmn.Fmt("Timed out waiting for new block for case '%s' line %d", thisCase.name, i))
|
||||
}
|
||||
}
|
||||
|
||||
func runReplayTest(t *testing.T, cs *ConsensusState, walDir string, newBlockCh chan interface{},
|
||||
func runReplayTest(t *testing.T, cs *ConsensusState, walFile string, newBlockCh chan interface{},
|
||||
thisCase *testCase, i int) {
|
||||
|
||||
cs.config.Set("cs_wal_dir", walDir)
|
||||
cs.config.Set("cs_wal_file", walFile)
|
||||
cs.Start()
|
||||
// Wait to make a new block.
|
||||
// This is just a signal that we haven't halted; its not something contained in the WAL itself.
|
||||
|
@ -137,7 +156,7 @@ func toPV(pv PrivValidator) *types.PrivValidator {
|
|||
|
||||
func setupReplayTest(thisCase *testCase, nLines int, crashAfter bool) (*ConsensusState, chan interface{}, string, string) {
|
||||
fmt.Println("-------------------------------------")
|
||||
log.Notice(Fmt("Starting replay test %v (of %d lines of WAL). Crash after = %v", thisCase.name, nLines, crashAfter))
|
||||
log.Notice(cmn.Fmt("Starting replay test %v (of %d lines of WAL). Crash after = %v", thisCase.name, nLines, crashAfter))
|
||||
|
||||
lineStep := nLines
|
||||
if crashAfter {
|
||||
|
@ -148,7 +167,7 @@ func setupReplayTest(thisCase *testCase, nLines int, crashAfter bool) (*Consensu
|
|||
lastMsg := split[nLines]
|
||||
|
||||
// we write those lines up to (not including) one with the signature
|
||||
walDir := writeWAL(strings.Join(split[:nLines], "\n") + "\n")
|
||||
walFile := writeWAL(strings.Join(split[:nLines], "\n") + "\n")
|
||||
|
||||
cs := fixedConsensusStateDummy()
|
||||
|
||||
|
@ -160,7 +179,7 @@ func setupReplayTest(thisCase *testCase, nLines int, crashAfter bool) (*Consensu
|
|||
|
||||
newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1)
|
||||
|
||||
return cs, newBlockCh, lastMsg, walDir
|
||||
return cs, newBlockCh, lastMsg, walFile
|
||||
}
|
||||
|
||||
func readTimedWALMessage(t *testing.T, walMsg string) TimedWALMessage {
|
||||
|
@ -177,12 +196,12 @@ func readTimedWALMessage(t *testing.T, walMsg string) TimedWALMessage {
|
|||
// Test the log at every iteration, and set the privVal last step
|
||||
// as if the log was written after signing, before the crash
|
||||
|
||||
func TestReplayCrashAfterWrite(t *testing.T) {
|
||||
func TestWALCrashAfterWrite(t *testing.T) {
|
||||
for _, thisCase := range testCases {
|
||||
split := strings.Split(thisCase.log, "\n")
|
||||
for i := 0; i < len(split)-1; i++ {
|
||||
cs, newBlockCh, _, walDir := setupReplayTest(thisCase, i+1, true)
|
||||
runReplayTest(t, cs, walDir, newBlockCh, thisCase, i+1)
|
||||
cs, newBlockCh, _, walFile := setupReplayTest(thisCase, i+1, true)
|
||||
runReplayTest(t, cs, walFile, newBlockCh, thisCase, i+1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -191,27 +210,27 @@ func TestReplayCrashAfterWrite(t *testing.T) {
|
|||
// Test the log as if we crashed after signing but before writing.
|
||||
// This relies on privValidator.LastSignature being set
|
||||
|
||||
func TestReplayCrashBeforeWritePropose(t *testing.T) {
|
||||
func TestWALCrashBeforeWritePropose(t *testing.T) {
|
||||
for _, thisCase := range testCases {
|
||||
lineNum := thisCase.proposeLine
|
||||
// setup replay test where last message is a proposal
|
||||
cs, newBlockCh, proposalMsg, walDir := setupReplayTest(thisCase, lineNum, false)
|
||||
cs, newBlockCh, proposalMsg, walFile := setupReplayTest(thisCase, lineNum, false)
|
||||
msg := readTimedWALMessage(t, proposalMsg)
|
||||
proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage)
|
||||
// Set LastSig
|
||||
toPV(cs.privValidator).LastSignBytes = types.SignBytes(cs.state.ChainID, proposal.Proposal)
|
||||
toPV(cs.privValidator).LastSignature = proposal.Proposal.Signature
|
||||
runReplayTest(t, cs, walDir, newBlockCh, thisCase, lineNum)
|
||||
runReplayTest(t, cs, walFile, newBlockCh, thisCase, lineNum)
|
||||
}
|
||||
}
|
||||
|
||||
func TestReplayCrashBeforeWritePrevote(t *testing.T) {
|
||||
func TestWALCrashBeforeWritePrevote(t *testing.T) {
|
||||
for _, thisCase := range testCases {
|
||||
testReplayCrashBeforeWriteVote(t, thisCase, thisCase.prevoteLine, types.EventStringCompleteProposal())
|
||||
}
|
||||
}
|
||||
|
||||
func TestReplayCrashBeforeWritePrecommit(t *testing.T) {
|
||||
func TestWALCrashBeforeWritePrecommit(t *testing.T) {
|
||||
for _, thisCase := range testCases {
|
||||
testReplayCrashBeforeWriteVote(t, thisCase, thisCase.precommitLine, types.EventStringPolka())
|
||||
}
|
||||
|
@ -219,7 +238,7 @@ func TestReplayCrashBeforeWritePrecommit(t *testing.T) {
|
|||
|
||||
func testReplayCrashBeforeWriteVote(t *testing.T, thisCase *testCase, lineNum int, eventString string) {
|
||||
// setup replay test where last message is a vote
|
||||
cs, newBlockCh, voteMsg, walDir := setupReplayTest(thisCase, lineNum, false)
|
||||
cs, newBlockCh, voteMsg, walFile := setupReplayTest(thisCase, lineNum, false)
|
||||
types.AddListenerForEvent(cs.evsw, "tester", eventString, func(data types.TMEventData) {
|
||||
msg := readTimedWALMessage(t, voteMsg)
|
||||
vote := msg.Msg.(msgInfo).Msg.(*VoteMessage)
|
||||
|
@ -227,5 +246,396 @@ func testReplayCrashBeforeWriteVote(t *testing.T, thisCase *testCase, lineNum in
|
|||
toPV(cs.privValidator).LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote)
|
||||
toPV(cs.privValidator).LastSignature = vote.Vote.Signature
|
||||
})
|
||||
runReplayTest(t, cs, walDir, newBlockCh, thisCase, lineNum)
|
||||
runReplayTest(t, cs, walFile, newBlockCh, thisCase, lineNum)
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------------------
|
||||
// Handshake Tests
|
||||
|
||||
var (
|
||||
NUM_BLOCKS = 6 // number of blocks in the test_data/many_blocks.cswal
|
||||
mempool = types.MockMempool{}
|
||||
|
||||
testPartSize int
|
||||
)
|
||||
|
||||
//---------------------------------------
|
||||
// Test handshake/replay
|
||||
|
||||
// 0 - all synced up
|
||||
// 1 - saved block but app and state are behind
|
||||
// 2 - save block and committed but state is behind
|
||||
var modes = []uint{0, 1, 2}
|
||||
|
||||
// Sync from scratch
|
||||
func TestHandshakeReplayAll(t *testing.T) {
|
||||
for _, m := range modes {
|
||||
testHandshakeReplay(t, 0, m)
|
||||
}
|
||||
}
|
||||
|
||||
// Sync many, not from scratch
|
||||
func TestHandshakeReplaySome(t *testing.T) {
|
||||
for _, m := range modes {
|
||||
testHandshakeReplay(t, 1, m)
|
||||
}
|
||||
}
|
||||
|
||||
// Sync from lagging by one
|
||||
func TestHandshakeReplayOne(t *testing.T) {
|
||||
for _, m := range modes {
|
||||
testHandshakeReplay(t, NUM_BLOCKS-1, m)
|
||||
}
|
||||
}
|
||||
|
||||
// Sync from caught up
|
||||
func TestHandshakeReplayNone(t *testing.T) {
|
||||
for _, m := range modes {
|
||||
testHandshakeReplay(t, NUM_BLOCKS, m)
|
||||
}
|
||||
}
|
||||
|
||||
// Make some blocks. Start a fresh app and apply nBlocks blocks. Then restart the app and sync it up with the remaining blocks
|
||||
func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
|
||||
config := tendermint_test.ResetConfig("proxy_test_")
|
||||
|
||||
// copy the many_blocks file
|
||||
walBody, err := cmn.ReadFile(path.Join(data_dir, "many_blocks.cswal"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
walFile := writeWAL(string(walBody))
|
||||
config.Set("cs_wal_file", walFile)
|
||||
|
||||
privVal := types.LoadPrivValidator(config.GetString("priv_validator_file"))
|
||||
testPartSize = config.GetInt("block_part_size")
|
||||
|
||||
wal, err := NewWAL(walFile, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
chain, commits, err := makeBlockchainFromWAL(wal)
|
||||
if err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
|
||||
state, store := stateAndStore(config, privVal.PubKey)
|
||||
store.chain = chain
|
||||
store.commits = commits
|
||||
|
||||
// run the chain through state.ApplyBlock to build up the tendermint state
|
||||
latestAppHash := buildTMStateFromChain(config, state, chain, mode)
|
||||
|
||||
// make a new client creator
|
||||
dummyApp := dummy.NewPersistentDummyApplication(path.Join(config.GetString("db_dir"), "2"))
|
||||
clientCreator2 := proxy.NewLocalClientCreator(dummyApp)
|
||||
if nBlocks > 0 {
|
||||
// run nBlocks against a new client to build up the app state.
|
||||
// use a throwaway tendermint state
|
||||
proxyApp := proxy.NewAppConns(config, clientCreator2, nil)
|
||||
state, _ := stateAndStore(config, privVal.PubKey)
|
||||
buildAppStateFromChain(proxyApp, state, chain, nBlocks, mode)
|
||||
}
|
||||
|
||||
// now start the app using the handshake - it should sync
|
||||
handshaker := NewHandshaker(config, state, store)
|
||||
proxyApp := proxy.NewAppConns(config, clientCreator2, handshaker)
|
||||
if _, err := proxyApp.Start(); err != nil {
|
||||
t.Fatalf("Error starting proxy app connections: %v", err)
|
||||
}
|
||||
|
||||
// get the latest app hash from the app
|
||||
res, err := proxyApp.Query().InfoSync()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// the app hash should be synced up
|
||||
if !bytes.Equal(latestAppHash, res.LastBlockAppHash) {
|
||||
t.Fatalf("Expected app hashes to match after handshake/replay. got %X, expected %X", res.LastBlockAppHash, latestAppHash)
|
||||
}
|
||||
|
||||
expectedBlocksToSync := NUM_BLOCKS - nBlocks
|
||||
if nBlocks == NUM_BLOCKS && mode > 0 {
|
||||
expectedBlocksToSync += 1
|
||||
} else if nBlocks > 0 && mode == 1 {
|
||||
expectedBlocksToSync += 1
|
||||
}
|
||||
|
||||
if handshaker.NBlocks() != expectedBlocksToSync {
|
||||
t.Fatalf("Expected handshake to sync %d blocks, got %d", expectedBlocksToSync, handshaker.NBlocks())
|
||||
}
|
||||
}
|
||||
|
||||
func applyBlock(st *sm.State, blk *types.Block, proxyApp proxy.AppConns) {
|
||||
err := st.ApplyBlock(nil, proxyApp.Consensus(), blk, blk.MakePartSet(testPartSize).Header(), mempool)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func buildAppStateFromChain(proxyApp proxy.AppConns,
|
||||
state *sm.State, chain []*types.Block, nBlocks int, mode uint) {
|
||||
// start a new app without handshake, play nBlocks blocks
|
||||
if _, err := proxyApp.Start(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer proxyApp.Stop()
|
||||
switch mode {
|
||||
case 0:
|
||||
for i := 0; i < nBlocks; i++ {
|
||||
block := chain[i]
|
||||
applyBlock(state, block, proxyApp)
|
||||
}
|
||||
case 1, 2:
|
||||
for i := 0; i < nBlocks-1; i++ {
|
||||
block := chain[i]
|
||||
applyBlock(state, block, proxyApp)
|
||||
}
|
||||
|
||||
if mode == 2 {
|
||||
// update the dummy height and apphash
|
||||
// as if we ran commit but not
|
||||
applyBlock(state, chain[nBlocks-1], proxyApp)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func buildTMStateFromChain(config cfg.Config, state *sm.State, chain []*types.Block, mode uint) []byte {
|
||||
// run the whole chain against this client to build up the tendermint state
|
||||
clientCreator := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.GetString("db_dir"), "1")))
|
||||
proxyApp := proxy.NewAppConns(config, clientCreator, nil) // sm.NewHandshaker(config, state, store, ReplayLastBlock))
|
||||
if _, err := proxyApp.Start(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer proxyApp.Stop()
|
||||
|
||||
var latestAppHash []byte
|
||||
|
||||
switch mode {
|
||||
case 0:
|
||||
// sync right up
|
||||
for _, block := range chain {
|
||||
applyBlock(state, block, proxyApp)
|
||||
}
|
||||
|
||||
latestAppHash = state.AppHash
|
||||
case 1, 2:
|
||||
// sync up to the penultimate as if we stored the block.
|
||||
// whether we commit or not depends on the appHash
|
||||
for _, block := range chain[:len(chain)-1] {
|
||||
applyBlock(state, block, proxyApp)
|
||||
}
|
||||
|
||||
// apply the final block to a state copy so we can
|
||||
// get the right next appHash but keep the state back
|
||||
stateCopy := state.Copy()
|
||||
applyBlock(stateCopy, chain[len(chain)-1], proxyApp)
|
||||
latestAppHash = stateCopy.AppHash
|
||||
}
|
||||
|
||||
return latestAppHash
|
||||
}
|
||||
|
||||
//--------------------------
|
||||
// utils for making blocks
|
||||
|
||||
func makeBlockchainFromWAL(wal *WAL) ([]*types.Block, []*types.Commit, error) {
|
||||
// Search for height marker
|
||||
gr, found, err := wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(1))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if !found {
|
||||
return nil, nil, errors.New(cmn.Fmt("WAL does not contain height %d.", 1))
|
||||
}
|
||||
defer gr.Close()
|
||||
|
||||
log.Notice("Build a blockchain by reading from the WAL")
|
||||
|
||||
var blockParts *types.PartSet
|
||||
var blocks []*types.Block
|
||||
var commits []*types.Commit
|
||||
for {
|
||||
line, err := gr.ReadLine()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
} else {
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
piece, err := readPieceFromWAL([]byte(line))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if piece == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
switch p := piece.(type) {
|
||||
case *types.PartSetHeader:
|
||||
// if its not the first one, we have a full block
|
||||
if blockParts != nil {
|
||||
var n int
|
||||
block := wire.ReadBinary(&types.Block{}, blockParts.GetReader(), types.MaxBlockSize, &n, &err).(*types.Block)
|
||||
blocks = append(blocks, block)
|
||||
}
|
||||
blockParts = types.NewPartSetFromHeader(*p)
|
||||
case *types.Part:
|
||||
_, err := blockParts.AddPart(p, false)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
case *types.Vote:
|
||||
if p.Type == types.VoteTypePrecommit {
|
||||
commit := &types.Commit{
|
||||
BlockID: p.BlockID,
|
||||
Precommits: []*types.Vote{p},
|
||||
}
|
||||
commits = append(commits, commit)
|
||||
}
|
||||
}
|
||||
}
|
||||
// grab the last block too
|
||||
var n int
|
||||
block := wire.ReadBinary(&types.Block{}, blockParts.GetReader(), types.MaxBlockSize, &n, &err).(*types.Block)
|
||||
blocks = append(blocks, block)
|
||||
return blocks, commits, nil
|
||||
}
|
||||
|
||||
func readPieceFromWAL(msgBytes []byte) (interface{}, error) {
|
||||
// Skip over empty and meta lines
|
||||
if len(msgBytes) == 0 || msgBytes[0] == '#' {
|
||||
return nil, nil
|
||||
}
|
||||
var err error
|
||||
var msg TimedWALMessage
|
||||
wire.ReadJSON(&msg, msgBytes, &err)
|
||||
if err != nil {
|
||||
fmt.Println("MsgBytes:", msgBytes, string(msgBytes))
|
||||
return nil, fmt.Errorf("Error reading json data: %v", err)
|
||||
}
|
||||
|
||||
// for logging
|
||||
switch m := msg.Msg.(type) {
|
||||
case msgInfo:
|
||||
switch msg := m.Msg.(type) {
|
||||
case *ProposalMessage:
|
||||
return &msg.Proposal.BlockPartsHeader, nil
|
||||
case *BlockPartMessage:
|
||||
return msg.Part, nil
|
||||
case *VoteMessage:
|
||||
return msg.Vote, nil
|
||||
}
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// make some bogus txs
|
||||
func txsFunc(blockNum int) (txs []types.Tx) {
|
||||
for i := 0; i < 10; i++ {
|
||||
txs = append(txs, types.Tx([]byte{byte(blockNum), byte(i)}))
|
||||
}
|
||||
return txs
|
||||
}
|
||||
|
||||
// sign a commit vote
|
||||
func signCommit(chainID string, privVal *types.PrivValidator, height, round int, hash []byte, header types.PartSetHeader) *types.Vote {
|
||||
vote := &types.Vote{
|
||||
ValidatorIndex: 0,
|
||||
ValidatorAddress: privVal.Address,
|
||||
Height: height,
|
||||
Round: round,
|
||||
Type: types.VoteTypePrecommit,
|
||||
BlockID: types.BlockID{hash, header},
|
||||
}
|
||||
|
||||
sig := privVal.Sign(types.SignBytes(chainID, vote))
|
||||
vote.Signature = sig
|
||||
return vote
|
||||
}
|
||||
|
||||
// make a blockchain with one validator
|
||||
func makeBlockchain(t *testing.T, chainID string, nBlocks int, privVal *types.PrivValidator, proxyApp proxy.AppConns, state *sm.State) (blockchain []*types.Block, commits []*types.Commit) {
|
||||
|
||||
prevHash := state.LastBlockID.Hash
|
||||
lastCommit := new(types.Commit)
|
||||
prevParts := types.PartSetHeader{}
|
||||
valHash := state.Validators.Hash()
|
||||
prevBlockID := types.BlockID{prevHash, prevParts}
|
||||
|
||||
for i := 1; i < nBlocks+1; i++ {
|
||||
block, parts := types.MakeBlock(i, chainID, txsFunc(i), lastCommit,
|
||||
prevBlockID, valHash, state.AppHash, testPartSize)
|
||||
fmt.Println(i)
|
||||
fmt.Println(block.LastBlockID)
|
||||
err := state.ApplyBlock(nil, proxyApp.Consensus(), block, block.MakePartSet(testPartSize).Header(), mempool)
|
||||
if err != nil {
|
||||
t.Fatal(i, err)
|
||||
}
|
||||
|
||||
voteSet := types.NewVoteSet(chainID, i, 0, types.VoteTypePrecommit, state.Validators)
|
||||
vote := signCommit(chainID, privVal, i, 0, block.Hash(), parts.Header())
|
||||
_, err = voteSet.AddVote(vote)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
prevHash = block.Hash()
|
||||
prevParts = parts.Header()
|
||||
lastCommit = voteSet.MakeCommit()
|
||||
prevBlockID = types.BlockID{prevHash, prevParts}
|
||||
|
||||
blockchain = append(blockchain, block)
|
||||
commits = append(commits, lastCommit)
|
||||
}
|
||||
return blockchain, commits
|
||||
}
|
||||
|
||||
// fresh state and mock store
|
||||
func stateAndStore(config cfg.Config, pubKey crypto.PubKey) (*sm.State, *mockBlockStore) {
|
||||
stateDB := dbm.NewMemDB()
|
||||
return sm.MakeGenesisState(stateDB, &types.GenesisDoc{
|
||||
ChainID: config.GetString("chain_id"),
|
||||
Validators: []types.GenesisValidator{
|
||||
types.GenesisValidator{pubKey, 10000, "test"},
|
||||
},
|
||||
AppHash: nil,
|
||||
}), NewMockBlockStore(config)
|
||||
}
|
||||
|
||||
//----------------------------------
|
||||
// mock block store
|
||||
|
||||
type mockBlockStore struct {
|
||||
config cfg.Config
|
||||
chain []*types.Block
|
||||
commits []*types.Commit
|
||||
}
|
||||
|
||||
// TODO: NewBlockStore(db.NewMemDB) ...
|
||||
func NewMockBlockStore(config cfg.Config) *mockBlockStore {
|
||||
return &mockBlockStore{config, nil, nil}
|
||||
}
|
||||
|
||||
func (bs *mockBlockStore) Height() int { return len(bs.chain) }
|
||||
func (bs *mockBlockStore) LoadBlock(height int) *types.Block { return bs.chain[height-1] }
|
||||
func (bs *mockBlockStore) LoadBlockMeta(height int) *types.BlockMeta {
|
||||
block := bs.chain[height-1]
|
||||
return &types.BlockMeta{
|
||||
BlockID: types.BlockID{block.Hash(), block.MakePartSet(bs.config.GetInt("block_part_size")).Header()},
|
||||
Header: block.Header,
|
||||
}
|
||||
}
|
||||
func (bs *mockBlockStore) LoadBlockPart(height int, index int) *types.Part { return nil }
|
||||
func (bs *mockBlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) {
|
||||
}
|
||||
func (bs *mockBlockStore) LoadBlockCommit(height int) *types.Commit {
|
||||
return bs.commits[height-1]
|
||||
}
|
||||
func (bs *mockBlockStore) LoadSeenCommit(height int) *types.Commit {
|
||||
return bs.commits[height-1]
|
||||
}
|
||||
|
|
|
@ -4,7 +4,7 @@ import (
|
|||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"path"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -14,8 +14,6 @@ import (
|
|||
. "github.com/tendermint/go-common"
|
||||
cfg "github.com/tendermint/go-config"
|
||||
"github.com/tendermint/go-wire"
|
||||
bc "github.com/tendermint/tendermint/blockchain"
|
||||
mempl "github.com/tendermint/tendermint/mempool"
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
|
@ -226,8 +224,8 @@ type ConsensusState struct {
|
|||
|
||||
config cfg.Config
|
||||
proxyAppConn proxy.AppConnConsensus
|
||||
blockStore *bc.BlockStore
|
||||
mempool *mempl.Mempool
|
||||
blockStore types.BlockStore
|
||||
mempool types.Mempool
|
||||
|
||||
privValidator PrivValidator // for signing votes
|
||||
|
||||
|
@ -255,7 +253,7 @@ type ConsensusState struct {
|
|||
done chan struct{}
|
||||
}
|
||||
|
||||
func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore *bc.BlockStore, mempool *mempl.Mempool) *ConsensusState {
|
||||
func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore types.BlockStore, mempool types.Mempool) *ConsensusState {
|
||||
cs := &ConsensusState{
|
||||
config: config,
|
||||
proxyAppConn: proxyAppConn,
|
||||
|
@ -342,35 +340,18 @@ func (cs *ConsensusState) LoadCommit(height int) *types.Commit {
|
|||
func (cs *ConsensusState) OnStart() error {
|
||||
cs.BaseService.OnStart()
|
||||
|
||||
walDir := cs.config.GetString("cs_wal_dir")
|
||||
err := EnsureDir(walDir, 0700)
|
||||
walFile := cs.config.GetString("cs_wal_file")
|
||||
err := EnsureDir(path.Dir(walFile), 0700)
|
||||
if err != nil {
|
||||
log.Error("Error ensuring ConsensusState wal dir", "error", err.Error())
|
||||
return err
|
||||
}
|
||||
err = cs.OpenWAL(walDir)
|
||||
err = cs.OpenWAL(walFile)
|
||||
if err != nil {
|
||||
log.Error("Error loading ConsensusState wal", "error", err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
// If the latest block was applied in the abci handshake,
|
||||
// we may not have written the current height to the wal,
|
||||
// so check here and write it if not found.
|
||||
// TODO: remove this and run the handhsake/replay
|
||||
// through the consensus state with a mock app
|
||||
gr, found, err := cs.wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(cs.Height))
|
||||
if (err == io.EOF || !found) && cs.Step == RoundStepNewHeight {
|
||||
log.Warn("Height not found in wal. Writing new height", "height", cs.Height)
|
||||
rs := cs.RoundStateEvent()
|
||||
cs.wal.Save(rs)
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
if gr != nil {
|
||||
gr.Close()
|
||||
}
|
||||
|
||||
// we need the timeoutRoutine for replay so
|
||||
// we don't block on the tick chan.
|
||||
// NOTE: we will get a build up of garbage go routines
|
||||
|
@ -420,10 +401,10 @@ func (cs *ConsensusState) Wait() {
|
|||
}
|
||||
|
||||
// Open file to log all consensus messages and timeouts for deterministic accountability
|
||||
func (cs *ConsensusState) OpenWAL(walDir string) (err error) {
|
||||
func (cs *ConsensusState) OpenWAL(walFile string) (err error) {
|
||||
cs.mtx.Lock()
|
||||
defer cs.mtx.Unlock()
|
||||
wal, err := NewWAL(walDir, cs.config.GetBool("cs_wal_light"))
|
||||
wal, err := NewWAL(walFile, cs.config.GetBool("cs_wal_light"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -569,7 +550,6 @@ func (cs *ConsensusState) updateToState(state *sm.State) {
|
|||
|
||||
// Reset fields based on state.
|
||||
validators := state.Validators
|
||||
height := state.LastBlockHeight + 1 // Next desired block height
|
||||
lastPrecommits := (*types.VoteSet)(nil)
|
||||
if cs.CommitRound > -1 && cs.Votes != nil {
|
||||
if !cs.Votes.Precommits(cs.CommitRound).HasTwoThirdsMajority() {
|
||||
|
@ -578,6 +558,9 @@ func (cs *ConsensusState) updateToState(state *sm.State) {
|
|||
lastPrecommits = cs.Votes.Precommits(cs.CommitRound)
|
||||
}
|
||||
|
||||
// Next desired block height
|
||||
height := state.LastBlockHeight + 1
|
||||
|
||||
// RoundState fields
|
||||
cs.updateHeight(height)
|
||||
cs.updateRoundStep(0, RoundStepNewHeight)
|
||||
|
@ -622,11 +605,6 @@ func (cs *ConsensusState) newStep() {
|
|||
//-----------------------------------------
|
||||
// the main go routines
|
||||
|
||||
// a nice idea but probably more trouble than its worth
|
||||
func (cs *ConsensusState) stopTimer() {
|
||||
cs.timeoutTicker.Stop()
|
||||
}
|
||||
|
||||
// receiveRoutine handles messages which may cause state transitions.
|
||||
// it's argument (n) is the number of messages to process before exiting - use 0 to run forever
|
||||
// It keeps the RoundState and is the only thing that updates it.
|
||||
|
@ -765,7 +743,6 @@ func (cs *ConsensusState) enterNewRound(height int, round int) {
|
|||
if now := time.Now(); cs.StartTime.After(now) {
|
||||
log.Warn("Need to set a buffer and log.Warn() here for sanity.", "startTime", cs.StartTime, "now", now)
|
||||
}
|
||||
// cs.stopTimer()
|
||||
|
||||
log.Notice(Fmt("enterNewRound(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))
|
||||
|
||||
|
@ -945,8 +922,6 @@ func (cs *ConsensusState) enterPrevote(height int, round int) {
|
|||
// TODO: catchup event?
|
||||
}
|
||||
|
||||
// cs.stopTimer()
|
||||
|
||||
log.Info(Fmt("enterPrevote(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))
|
||||
|
||||
// Sign and broadcast vote as necessary
|
||||
|
@ -1020,8 +995,6 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) {
|
|||
return
|
||||
}
|
||||
|
||||
// cs.stopTimer()
|
||||
|
||||
log.Info(Fmt("enterPrecommit(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))
|
||||
|
||||
defer func() {
|
||||
|
@ -1235,7 +1208,8 @@ func (cs *ConsensusState) finalizeCommit(height int) {
|
|||
seenCommit := precommits.MakeCommit()
|
||||
cs.blockStore.SaveBlock(block, blockParts, seenCommit)
|
||||
} else {
|
||||
log.Warn("Why are we finalizeCommitting a block height we already have?", "height", block.Height)
|
||||
// Happens during replay if we already saved the block but didn't commit
|
||||
log.Info("Calling finalizeCommit on already stored block", "height", block.Height)
|
||||
}
|
||||
|
||||
fail.Fail() // XXX
|
||||
|
@ -1250,7 +1224,8 @@ func (cs *ConsensusState) finalizeCommit(height int) {
|
|||
// NOTE: the block.AppHash wont reflect these txs until the next block
|
||||
err := stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool)
|
||||
if err != nil {
|
||||
// TODO!
|
||||
log.Error("Error on ApplyBlock. Did the application crash? Please restart tendermint", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
fail.Fail() // XXX
|
||||
|
|
|
@ -1,12 +1,13 @@
|
|||
#! /bin/bash
|
||||
|
||||
# XXX: removes tendermint dir
|
||||
|
||||
cd $GOPATH/src/github.com/tendermint/tendermint
|
||||
|
||||
# specify a dir to copy
|
||||
# NOTE: eventually we should replace with `tendermint init --test`
|
||||
# TODO: eventually we should replace with `tendermint init --test`
|
||||
DIR=$HOME/.tendermint_test/consensus_state_test
|
||||
|
||||
# XXX: remove tendermint dir
|
||||
rm -rf $HOME/.tendermint
|
||||
cp -r $DIR $HOME/.tendermint
|
||||
|
||||
|
@ -18,6 +19,7 @@ function reset(){
|
|||
reset
|
||||
|
||||
# empty block
|
||||
function empty_block(){
|
||||
tendermint node --proxy_app=dummy &> /dev/null &
|
||||
sleep 5
|
||||
killall tendermint
|
||||
|
@ -28,21 +30,40 @@ killall tendermint
|
|||
sed '/HEIGHT: 2/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/empty_block.cswal
|
||||
|
||||
reset
|
||||
}
|
||||
|
||||
# small block 1
|
||||
# many blocks
|
||||
function many_blocks(){
|
||||
bash scripts/txs/random.sh 1000 36657 &> /dev/null &
|
||||
PID=$!
|
||||
tendermint node --proxy_app=dummy &> /dev/null &
|
||||
sleep 5
|
||||
sleep 7
|
||||
killall tendermint
|
||||
kill -9 $PID
|
||||
|
||||
sed '/HEIGHT: 7/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/many_blocks.cswal
|
||||
|
||||
reset
|
||||
}
|
||||
|
||||
|
||||
# small block 1
|
||||
function small_block1(){
|
||||
bash scripts/txs/random.sh 1000 36657 &> /dev/null &
|
||||
PID=$!
|
||||
tendermint node --proxy_app=dummy &> /dev/null &
|
||||
sleep 10
|
||||
killall tendermint
|
||||
kill -9 $PID
|
||||
|
||||
sed '/HEIGHT: 2/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/small_block1.cswal
|
||||
|
||||
reset
|
||||
}
|
||||
|
||||
|
||||
# small block 2 (part size = 512)
|
||||
function small_block2(){
|
||||
echo "" >> ~/.tendermint/config.toml
|
||||
echo "block_part_size = 512" >> ~/.tendermint/config.toml
|
||||
bash scripts/txs/random.sh 1000 36657 &> /dev/null &
|
||||
|
@ -55,4 +76,28 @@ kill -9 $PID
|
|||
sed '/HEIGHT: 2/Q' ~/.tendermint/data/cs.wal/wal > consensus/test_data/small_block2.cswal
|
||||
|
||||
reset
|
||||
}
|
||||
|
||||
|
||||
|
||||
case "$1" in
|
||||
"small_block1")
|
||||
small_block1
|
||||
;;
|
||||
"small_block2")
|
||||
small_block2
|
||||
;;
|
||||
"empty_block")
|
||||
empty_block
|
||||
;;
|
||||
"many_blocks")
|
||||
many_blocks
|
||||
;;
|
||||
*)
|
||||
small_block1
|
||||
small_block2
|
||||
empty_block
|
||||
many_blocks
|
||||
esac
|
||||
|
||||
|
||||
|
|
File diff suppressed because one or more lines are too long
|
@ -40,8 +40,8 @@ type WAL struct {
|
|||
light bool // ignore block parts
|
||||
}
|
||||
|
||||
func NewWAL(walDir string, light bool) (*WAL, error) {
|
||||
group, err := auto.OpenGroup(walDir + "/wal")
|
||||
func NewWAL(walFile string, light bool) (*WAL, error) {
|
||||
group, err := auto.OpenGroup(walFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -7,13 +7,13 @@ import (
|
|||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
abci "github.com/tendermint/abci/types"
|
||||
auto "github.com/tendermint/go-autofile"
|
||||
"github.com/tendermint/go-clist"
|
||||
. "github.com/tendermint/go-common"
|
||||
cfg "github.com/tendermint/go-config"
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
abci "github.com/tendermint/abci/types"
|
||||
)
|
||||
|
||||
/*
|
||||
|
@ -249,7 +249,7 @@ func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) {
|
|||
|
||||
// Get the valid transactions remaining
|
||||
// If maxTxs is -1, there is no cap on returned transactions.
|
||||
func (mem *Mempool) Reap(maxTxs int) []types.Tx {
|
||||
func (mem *Mempool) Reap(maxTxs int) types.Txs {
|
||||
mem.proxyMtx.Lock()
|
||||
defer mem.proxyMtx.Unlock()
|
||||
|
||||
|
@ -263,7 +263,7 @@ func (mem *Mempool) Reap(maxTxs int) []types.Tx {
|
|||
}
|
||||
|
||||
// maxTxs: -1 means uncapped, 0 means none
|
||||
func (mem *Mempool) collectTxs(maxTxs int) []types.Tx {
|
||||
func (mem *Mempool) collectTxs(maxTxs int) types.Txs {
|
||||
if maxTxs == 0 {
|
||||
return []types.Tx{}
|
||||
} else if maxTxs < 0 {
|
||||
|
@ -281,7 +281,7 @@ func (mem *Mempool) collectTxs(maxTxs int) []types.Tx {
|
|||
// Mempool will discard these txs.
|
||||
// NOTE: this should be called *after* block is committed by consensus.
|
||||
// NOTE: unsafe; Lock/Unlock must be managed by caller
|
||||
func (mem *Mempool) Update(height int, txs []types.Tx) {
|
||||
func (mem *Mempool) Update(height int, txs types.Txs) {
|
||||
// TODO: check err ?
|
||||
mem.proxyAppConn.FlushSync() // To flush async resCb calls e.g. from CheckTx
|
||||
|
||||
|
|
12
node/node.go
12
node/node.go
|
@ -63,15 +63,19 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato
|
|||
stateDB := dbm.NewDB("state", config.GetString("db_backend"), config.GetString("db_dir"))
|
||||
state := sm.GetState(config, stateDB)
|
||||
|
||||
// add the chainid and number of validators to the global config
|
||||
config.Set("chain_id", state.ChainID)
|
||||
config.Set("num_vals", state.Validators.Size())
|
||||
|
||||
// Create the proxyApp, which manages connections (consensus, mempool, query)
|
||||
proxyApp := proxy.NewAppConns(config, clientCreator, sm.NewHandshaker(config, state, blockStore))
|
||||
// and sync tendermint and the app by replaying any necessary blocks
|
||||
proxyApp := proxy.NewAppConns(config, clientCreator, consensus.NewHandshaker(config, state, blockStore))
|
||||
if _, err := proxyApp.Start(); err != nil {
|
||||
cmn.Exit(cmn.Fmt("Error starting proxy app connections: %v", err))
|
||||
}
|
||||
|
||||
// add the chainid and number of validators to the global config
|
||||
config.Set("chain_id", state.ChainID)
|
||||
config.Set("num_vals", state.Validators.Size())
|
||||
// reload the state (it may have been updated by the handshake)
|
||||
state = sm.LoadState(stateDB)
|
||||
|
||||
// Generate node PrivKey
|
||||
privKey := crypto.GenPrivKeyEd25519()
|
||||
|
|
|
@ -5,36 +5,19 @@ import (
|
|||
"github.com/tendermint/go-crypto"
|
||||
"github.com/tendermint/go-p2p"
|
||||
|
||||
abci "github.com/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/consensus"
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
//-----------------------------------------------------
|
||||
// Interfaces for use by RPC
|
||||
// NOTE: these methods must be thread safe!
|
||||
|
||||
type BlockStore interface {
|
||||
Height() int
|
||||
LoadBlockMeta(height int) *types.BlockMeta
|
||||
LoadBlock(height int) *types.Block
|
||||
LoadSeenCommit(height int) *types.Commit
|
||||
LoadBlockCommit(height int) *types.Commit
|
||||
}
|
||||
//----------------------------------------------
|
||||
// These interfaces are used by RPC and must be thread safe
|
||||
|
||||
type Consensus interface {
|
||||
GetValidators() (int, []*types.Validator)
|
||||
GetRoundState() *consensus.RoundState
|
||||
}
|
||||
|
||||
type Mempool interface {
|
||||
Size() int
|
||||
CheckTx(types.Tx, func(*abci.Response)) error
|
||||
Reap(int) []types.Tx
|
||||
Flush()
|
||||
}
|
||||
|
||||
type P2P interface {
|
||||
Listeners() []p2p.Listener
|
||||
Peers() p2p.IPeerSet
|
||||
|
@ -44,16 +27,18 @@ type P2P interface {
|
|||
DialSeeds([]string)
|
||||
}
|
||||
|
||||
//----------------------------------------------
|
||||
|
||||
var (
|
||||
// external, thread safe interfaces
|
||||
eventSwitch types.EventSwitch
|
||||
proxyAppQuery proxy.AppConnQuery
|
||||
config cfg.Config
|
||||
|
||||
// interfaces defined above
|
||||
blockStore BlockStore
|
||||
// interfaces defined in types and above
|
||||
blockStore types.BlockStore
|
||||
mempool types.Mempool
|
||||
consensusState Consensus
|
||||
mempool Mempool
|
||||
p2pSwitch P2P
|
||||
|
||||
// objects
|
||||
|
@ -69,18 +54,18 @@ func SetEventSwitch(evsw types.EventSwitch) {
|
|||
eventSwitch = evsw
|
||||
}
|
||||
|
||||
func SetBlockStore(bs BlockStore) {
|
||||
func SetBlockStore(bs types.BlockStore) {
|
||||
blockStore = bs
|
||||
}
|
||||
|
||||
func SetMempool(mem types.Mempool) {
|
||||
mempool = mem
|
||||
}
|
||||
|
||||
func SetConsensusState(cs Consensus) {
|
||||
consensusState = cs
|
||||
}
|
||||
|
||||
func SetMempool(mem Mempool) {
|
||||
mempool = mem
|
||||
}
|
||||
|
||||
func SetSwitch(sw P2P) {
|
||||
p2pSwitch = sw
|
||||
}
|
||||
|
|
|
@ -9,47 +9,47 @@ type (
|
|||
ErrProxyAppConn error
|
||||
|
||||
ErrUnknownBlock struct {
|
||||
height int
|
||||
Height int
|
||||
}
|
||||
|
||||
ErrBlockHashMismatch struct {
|
||||
coreHash []byte
|
||||
appHash []byte
|
||||
height int
|
||||
CoreHash []byte
|
||||
AppHash []byte
|
||||
Height int
|
||||
}
|
||||
|
||||
ErrAppBlockHeightTooHigh struct {
|
||||
coreHeight int
|
||||
appHeight int
|
||||
CoreHeight int
|
||||
AppHeight int
|
||||
}
|
||||
|
||||
ErrLastStateMismatch struct {
|
||||
height int
|
||||
core []byte
|
||||
app []byte
|
||||
Height int
|
||||
Core []byte
|
||||
App []byte
|
||||
}
|
||||
|
||||
ErrStateMismatch struct {
|
||||
got *State
|
||||
expected *State
|
||||
Got *State
|
||||
Expected *State
|
||||
}
|
||||
)
|
||||
|
||||
func (e ErrUnknownBlock) Error() string {
|
||||
return Fmt("Could not find block #%d", e.height)
|
||||
return Fmt("Could not find block #%d", e.Height)
|
||||
}
|
||||
|
||||
func (e ErrBlockHashMismatch) Error() string {
|
||||
return Fmt("App block hash (%X) does not match core block hash (%X) for height %d", e.appHash, e.coreHash, e.height)
|
||||
return Fmt("App block hash (%X) does not match core block hash (%X) for height %d", e.AppHash, e.CoreHash, e.Height)
|
||||
}
|
||||
|
||||
func (e ErrAppBlockHeightTooHigh) Error() string {
|
||||
return Fmt("App block height (%d) is higher than core (%d)", e.appHeight, e.coreHeight)
|
||||
return Fmt("App block height (%d) is higher than core (%d)", e.AppHeight, e.CoreHeight)
|
||||
}
|
||||
func (e ErrLastStateMismatch) Error() string {
|
||||
return Fmt("Latest tendermint block (%d) LastAppHash (%X) does not match app's AppHash (%X)", e.height, e.core, e.app)
|
||||
return Fmt("Latest tendermint block (%d) LastAppHash (%X) does not match app's AppHash (%X)", e.Height, e.Core, e.App)
|
||||
}
|
||||
|
||||
func (e ErrStateMismatch) Error() string {
|
||||
return Fmt("State after replay does not match saved state. Got ----\n%v\nExpected ----\n%v\n", e.got, e.expected)
|
||||
return Fmt("State after replay does not match saved state. Got ----\n%v\nExpected ----\n%v\n", e.Got, e.Expected)
|
||||
}
|
||||
|
|
|
@ -1,14 +1,12 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
|
||||
"github.com/ebuchman/fail-test"
|
||||
|
||||
abci "github.com/tendermint/abci/types"
|
||||
. "github.com/tendermint/go-common"
|
||||
cfg "github.com/tendermint/go-config"
|
||||
"github.com/tendermint/go-crypto"
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
|
@ -54,10 +52,6 @@ func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnC
|
|||
nextValSet.IncrementAccum(1)
|
||||
s.SetBlockAndValidators(block.Header, blockPartsHeader, valSet, nextValSet)
|
||||
|
||||
// save state with updated height/blockhash/validators
|
||||
// but stale apphash, in case we fail between Commit and Save
|
||||
s.SaveIntermediate()
|
||||
|
||||
fail.Fail() // XXX
|
||||
|
||||
return nil
|
||||
|
@ -229,7 +223,7 @@ func (s *State) validateBlock(block *types.Block) error {
|
|||
|
||||
// Execute and commit block against app, save block and state
|
||||
func (s *State) ApplyBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus,
|
||||
block *types.Block, partsHeader types.PartSetHeader, mempool Mempool) error {
|
||||
block *types.Block, partsHeader types.PartSetHeader, mempool types.Mempool) error {
|
||||
|
||||
// Run the block on the State:
|
||||
// + update validator sets
|
||||
|
@ -250,7 +244,7 @@ func (s *State) ApplyBlock(eventCache types.Fireable, proxyAppConn proxy.AppConn
|
|||
// mempool must be locked during commit and update
|
||||
// because state is typically reset on Commit and old txs must be replayed
|
||||
// against committed state before new txs are run in the mempool, lest they be invalid
|
||||
func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, block *types.Block, mempool Mempool) error {
|
||||
func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, block *types.Block, mempool types.Mempool) error {
|
||||
mempool.Lock()
|
||||
defer mempool.Unlock()
|
||||
|
||||
|
@ -264,6 +258,7 @@ func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, bl
|
|||
log.Debug("Commit.Log: " + res.Log)
|
||||
}
|
||||
|
||||
log.Info("Committed state", "hash", res.Data)
|
||||
// Set the state's new AppHash
|
||||
s.AppHash = res.Data
|
||||
|
||||
|
@ -273,165 +268,23 @@ func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, bl
|
|||
return nil
|
||||
}
|
||||
|
||||
// Updates to the mempool need to be synchronized with committing a block
|
||||
// so apps can reset their transient state on Commit
|
||||
type Mempool interface {
|
||||
Lock()
|
||||
Unlock()
|
||||
Update(height int, txs []types.Tx)
|
||||
}
|
||||
|
||||
type MockMempool struct {
|
||||
}
|
||||
|
||||
func (m MockMempool) Lock() {}
|
||||
func (m MockMempool) Unlock() {}
|
||||
func (m MockMempool) Update(height int, txs []types.Tx) {}
|
||||
|
||||
//----------------------------------------------------------------
|
||||
// Handshake with app to sync to latest state of core by replaying blocks
|
||||
|
||||
// TODO: Should we move blockchain/store.go to its own package?
|
||||
type BlockStore interface {
|
||||
Height() int
|
||||
LoadBlock(height int) *types.Block
|
||||
LoadBlockMeta(height int) *types.BlockMeta
|
||||
}
|
||||
|
||||
type Handshaker struct {
|
||||
config cfg.Config
|
||||
state *State
|
||||
store BlockStore
|
||||
|
||||
nBlocks int // number of blocks applied to the state
|
||||
}
|
||||
|
||||
func NewHandshaker(config cfg.Config, state *State, store BlockStore) *Handshaker {
|
||||
return &Handshaker{config, state, store, 0}
|
||||
}
|
||||
|
||||
// TODO: retry the handshake/replay if it fails ?
|
||||
func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
|
||||
// handshake is done via info request on the query conn
|
||||
res, err := proxyApp.Query().InfoSync()
|
||||
// Apply and commit a block, but without all the state validation.
|
||||
// Returns the application root hash (result of abci.Commit)
|
||||
func ApplyBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block) ([]byte, error) {
|
||||
var eventCache types.Fireable // nil
|
||||
_, err := execBlockOnProxyApp(eventCache, appConnConsensus, block)
|
||||
if err != nil {
|
||||
return errors.New(Fmt("Error calling Info: %v", err))
|
||||
log.Warn("Error executing block on proxy app", "height", block.Height, "err", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
blockHeight := int(res.LastBlockHeight) // XXX: beware overflow
|
||||
appHash := res.LastBlockAppHash
|
||||
|
||||
log.Notice("ABCI Handshake", "appHeight", blockHeight, "appHash", appHash)
|
||||
|
||||
// TODO: check version
|
||||
|
||||
// replay blocks up to the latest in the blockstore
|
||||
err = h.ReplayBlocks(appHash, blockHeight, proxyApp.Consensus())
|
||||
if err != nil {
|
||||
return errors.New(Fmt("Error on replay: %v", err))
|
||||
// Commit block, get hash back
|
||||
res := appConnConsensus.CommitSync()
|
||||
if res.IsErr() {
|
||||
log.Warn("Error in proxyAppConn.CommitSync", "error", res)
|
||||
return nil, res
|
||||
}
|
||||
|
||||
// Save the state
|
||||
h.state.Save()
|
||||
|
||||
// TODO: (on restart) replay mempool
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Replay all blocks after blockHeight and ensure the result matches the current state.
|
||||
func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, appConnConsensus proxy.AppConnConsensus) error {
|
||||
|
||||
storeBlockHeight := h.store.Height()
|
||||
stateBlockHeight := h.state.LastBlockHeight
|
||||
log.Notice("ABCI Replay Blocks", "appHeight", appBlockHeight, "storeHeight", storeBlockHeight, "stateHeight", stateBlockHeight)
|
||||
|
||||
if storeBlockHeight == 0 {
|
||||
return nil
|
||||
} else if storeBlockHeight < appBlockHeight {
|
||||
// if the app is ahead, there's nothing we can do
|
||||
return ErrAppBlockHeightTooHigh{storeBlockHeight, appBlockHeight}
|
||||
|
||||
} else if storeBlockHeight == appBlockHeight {
|
||||
// We ran Commit, but if we crashed before state.Save(),
|
||||
// load the intermediate state and update the state.AppHash.
|
||||
// NOTE: If ABCI allowed rollbacks, we could just replay the
|
||||
// block even though it's been committed
|
||||
stateAppHash := h.state.AppHash
|
||||
lastBlockAppHash := h.store.LoadBlock(storeBlockHeight).AppHash
|
||||
|
||||
if bytes.Equal(stateAppHash, appHash) {
|
||||
// we're all synced up
|
||||
log.Debug("ABCI RelpayBlocks: Already synced")
|
||||
} else if bytes.Equal(stateAppHash, lastBlockAppHash) {
|
||||
// we crashed after commit and before saving state,
|
||||
// so load the intermediate state and update the hash
|
||||
h.state.LoadIntermediate()
|
||||
h.state.AppHash = appHash
|
||||
log.Debug("ABCI RelpayBlocks: Loaded intermediate state and updated state.AppHash")
|
||||
} else {
|
||||
PanicSanity(Fmt("Unexpected state.AppHash: state.AppHash %X; app.AppHash %X, lastBlock.AppHash %X", stateAppHash, appHash, lastBlockAppHash))
|
||||
|
||||
}
|
||||
return nil
|
||||
|
||||
} else if storeBlockHeight == appBlockHeight+1 &&
|
||||
storeBlockHeight == stateBlockHeight+1 {
|
||||
// We crashed after saving the block
|
||||
// but before Commit (both the state and app are behind),
|
||||
// so just replay the block
|
||||
|
||||
// check that the lastBlock.AppHash matches the state apphash
|
||||
block := h.store.LoadBlock(storeBlockHeight)
|
||||
if !bytes.Equal(block.Header.AppHash, appHash) {
|
||||
return ErrLastStateMismatch{storeBlockHeight, block.Header.AppHash, appHash}
|
||||
}
|
||||
|
||||
blockMeta := h.store.LoadBlockMeta(storeBlockHeight)
|
||||
|
||||
h.nBlocks += 1
|
||||
var eventCache types.Fireable // nil
|
||||
|
||||
// replay the latest block
|
||||
return h.state.ApplyBlock(eventCache, appConnConsensus, block, blockMeta.BlockID.PartsHeader, MockMempool{})
|
||||
} else if storeBlockHeight != stateBlockHeight {
|
||||
// unless we failed before committing or saving state (previous 2 case),
|
||||
// the store and state should be at the same height!
|
||||
PanicSanity(Fmt("Expected storeHeight (%d) and stateHeight (%d) to match.", storeBlockHeight, stateBlockHeight))
|
||||
} else {
|
||||
// store is more than one ahead,
|
||||
// so app wants to replay many blocks
|
||||
|
||||
// replay all blocks starting with appBlockHeight+1
|
||||
var eventCache types.Fireable // nil
|
||||
|
||||
// TODO: use stateBlockHeight instead and let the consensus state
|
||||
// do the replay
|
||||
|
||||
var appHash []byte
|
||||
for i := appBlockHeight + 1; i <= storeBlockHeight; i++ {
|
||||
h.nBlocks += 1
|
||||
block := h.store.LoadBlock(i)
|
||||
_, err := execBlockOnProxyApp(eventCache, appConnConsensus, block)
|
||||
if err != nil {
|
||||
log.Warn("Error executing block on proxy app", "height", i, "err", err)
|
||||
return err
|
||||
}
|
||||
// Commit block, get hash back
|
||||
res := appConnConsensus.CommitSync()
|
||||
if res.IsErr() {
|
||||
log.Warn("Error in proxyAppConn.CommitSync", "error", res)
|
||||
return res
|
||||
}
|
||||
if res.Log != "" {
|
||||
log.Info("Commit.Log: " + res.Log)
|
||||
}
|
||||
appHash = res.Data
|
||||
}
|
||||
if !bytes.Equal(h.state.AppHash, appHash) {
|
||||
return errors.New(Fmt("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, h.state.AppHash))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
if res.Log != "" {
|
||||
log.Info("Commit.Log: " + res.Log)
|
||||
}
|
||||
return res.Data, nil
|
||||
}
|
||||
|
|
|
@ -1,209 +0,0 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"path"
|
||||
"testing"
|
||||
|
||||
"github.com/tendermint/tendermint/config/tendermint_test"
|
||||
// . "github.com/tendermint/go-common"
|
||||
"github.com/tendermint/abci/example/dummy"
|
||||
cfg "github.com/tendermint/go-config"
|
||||
"github.com/tendermint/go-crypto"
|
||||
dbm "github.com/tendermint/go-db"
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
var (
|
||||
privKey = crypto.GenPrivKeyEd25519FromSecret([]byte("handshake_test"))
|
||||
chainID = "handshake_chain"
|
||||
nBlocks = 5
|
||||
mempool = MockMempool{}
|
||||
testPartSize = 65536
|
||||
)
|
||||
|
||||
//---------------------------------------
|
||||
// Test block execution
|
||||
|
||||
func TestExecBlock(t *testing.T) {
|
||||
// TODO
|
||||
}
|
||||
|
||||
//---------------------------------------
|
||||
// Test handshake/replay
|
||||
|
||||
// Sync from scratch
|
||||
func TestHandshakeReplayAll(t *testing.T) {
|
||||
testHandshakeReplay(t, 0)
|
||||
}
|
||||
|
||||
// Sync many, not from scratch
|
||||
func TestHandshakeReplaySome(t *testing.T) {
|
||||
testHandshakeReplay(t, 1)
|
||||
}
|
||||
|
||||
// Sync from lagging by one
|
||||
func TestHandshakeReplayOne(t *testing.T) {
|
||||
testHandshakeReplay(t, nBlocks-1)
|
||||
}
|
||||
|
||||
// Sync from caught up
|
||||
func TestHandshakeReplayNone(t *testing.T) {
|
||||
testHandshakeReplay(t, nBlocks)
|
||||
}
|
||||
|
||||
// Make some blocks. Start a fresh app and apply n blocks. Then restart the app and sync it up with the remaining blocks
|
||||
func testHandshakeReplay(t *testing.T, n int) {
|
||||
config := tendermint_test.ResetConfig("proxy_test_")
|
||||
|
||||
state, store := stateAndStore(config)
|
||||
clientCreator := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.GetString("db_dir"), "1")))
|
||||
clientCreator2 := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.GetString("db_dir"), "2")))
|
||||
proxyApp := proxy.NewAppConns(config, clientCreator, NewHandshaker(config, state, store))
|
||||
if _, err := proxyApp.Start(); err != nil {
|
||||
t.Fatalf("Error starting proxy app connections: %v", err)
|
||||
}
|
||||
chain := makeBlockchain(t, proxyApp, state)
|
||||
store.chain = chain //
|
||||
latestAppHash := state.AppHash
|
||||
proxyApp.Stop()
|
||||
|
||||
if n > 0 {
|
||||
// start a new app without handshake, play n blocks
|
||||
proxyApp = proxy.NewAppConns(config, clientCreator2, nil)
|
||||
if _, err := proxyApp.Start(); err != nil {
|
||||
t.Fatalf("Error starting proxy app connections: %v", err)
|
||||
}
|
||||
state2, _ := stateAndStore(config)
|
||||
for i := 0; i < n; i++ {
|
||||
block := chain[i]
|
||||
err := state2.ApplyBlock(nil, proxyApp.Consensus(), block, block.MakePartSet(testPartSize).Header(), mempool)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
proxyApp.Stop()
|
||||
}
|
||||
|
||||
// now start it with the handshake
|
||||
handshaker := NewHandshaker(config, state, store)
|
||||
proxyApp = proxy.NewAppConns(config, clientCreator2, handshaker)
|
||||
if _, err := proxyApp.Start(); err != nil {
|
||||
t.Fatalf("Error starting proxy app connections: %v", err)
|
||||
}
|
||||
|
||||
// get the latest app hash from the app
|
||||
res, err := proxyApp.Query().InfoSync()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// the app hash should be synced up
|
||||
if !bytes.Equal(latestAppHash, res.LastBlockAppHash) {
|
||||
t.Fatalf("Expected app hashes to match after handshake/replay. got %X, expected %X", res.LastBlockAppHash, latestAppHash)
|
||||
}
|
||||
|
||||
if handshaker.nBlocks != nBlocks-n {
|
||||
t.Fatalf("Expected handshake to sync %d blocks, got %d", nBlocks-n, handshaker.nBlocks)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//--------------------------
|
||||
// utils for making blocks
|
||||
|
||||
// make some bogus txs
|
||||
func txsFunc(blockNum int) (txs []types.Tx) {
|
||||
for i := 0; i < 10; i++ {
|
||||
txs = append(txs, types.Tx([]byte{byte(blockNum), byte(i)}))
|
||||
}
|
||||
return txs
|
||||
}
|
||||
|
||||
// sign a commit vote
|
||||
func signCommit(height, round int, hash []byte, header types.PartSetHeader) *types.Vote {
|
||||
vote := &types.Vote{
|
||||
ValidatorIndex: 0,
|
||||
ValidatorAddress: privKey.PubKey().Address(),
|
||||
Height: height,
|
||||
Round: round,
|
||||
Type: types.VoteTypePrecommit,
|
||||
BlockID: types.BlockID{hash, header},
|
||||
}
|
||||
|
||||
sig := privKey.Sign(types.SignBytes(chainID, vote))
|
||||
vote.Signature = sig
|
||||
return vote
|
||||
}
|
||||
|
||||
// make a blockchain with one validator
|
||||
func makeBlockchain(t *testing.T, proxyApp proxy.AppConns, state *State) (blockchain []*types.Block) {
|
||||
|
||||
prevHash := state.LastBlockID.Hash
|
||||
lastCommit := new(types.Commit)
|
||||
prevParts := types.PartSetHeader{}
|
||||
valHash := state.Validators.Hash()
|
||||
prevBlockID := types.BlockID{prevHash, prevParts}
|
||||
|
||||
for i := 1; i < nBlocks+1; i++ {
|
||||
block, parts := types.MakeBlock(i, chainID, txsFunc(i), lastCommit,
|
||||
prevBlockID, valHash, state.AppHash, testPartSize)
|
||||
fmt.Println(i)
|
||||
fmt.Println(prevBlockID)
|
||||
fmt.Println(block.LastBlockID)
|
||||
err := state.ApplyBlock(nil, proxyApp.Consensus(), block, block.MakePartSet(testPartSize).Header(), mempool)
|
||||
if err != nil {
|
||||
t.Fatal(i, err)
|
||||
}
|
||||
|
||||
voteSet := types.NewVoteSet(chainID, i, 0, types.VoteTypePrecommit, state.Validators)
|
||||
vote := signCommit(i, 0, block.Hash(), parts.Header())
|
||||
_, err = voteSet.AddVote(vote)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
blockchain = append(blockchain, block)
|
||||
prevHash = block.Hash()
|
||||
prevParts = parts.Header()
|
||||
lastCommit = voteSet.MakeCommit()
|
||||
prevBlockID = types.BlockID{prevHash, prevParts}
|
||||
}
|
||||
return blockchain
|
||||
}
|
||||
|
||||
// fresh state and mock store
|
||||
func stateAndStore(config cfg.Config) (*State, *mockBlockStore) {
|
||||
stateDB := dbm.NewMemDB()
|
||||
return MakeGenesisState(stateDB, &types.GenesisDoc{
|
||||
ChainID: chainID,
|
||||
Validators: []types.GenesisValidator{
|
||||
types.GenesisValidator{privKey.PubKey(), 10000, "test"},
|
||||
},
|
||||
AppHash: nil,
|
||||
}), NewMockBlockStore(config, nil)
|
||||
}
|
||||
|
||||
//----------------------------------
|
||||
// mock block store
|
||||
|
||||
type mockBlockStore struct {
|
||||
config cfg.Config
|
||||
chain []*types.Block
|
||||
}
|
||||
|
||||
func NewMockBlockStore(config cfg.Config, chain []*types.Block) *mockBlockStore {
|
||||
return &mockBlockStore{config, chain}
|
||||
}
|
||||
|
||||
func (bs *mockBlockStore) Height() int { return len(bs.chain) }
|
||||
func (bs *mockBlockStore) LoadBlock(height int) *types.Block { return bs.chain[height-1] }
|
||||
func (bs *mockBlockStore) LoadBlockMeta(height int) *types.BlockMeta {
|
||||
block := bs.chain[height-1]
|
||||
return &types.BlockMeta{
|
||||
BlockID: types.BlockID{block.Hash(), block.MakePartSet(bs.config.GetInt("block_part_size")).Header()},
|
||||
Header: block.Header,
|
||||
}
|
||||
}
|
|
@ -14,8 +14,7 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
stateKey = []byte("stateKey")
|
||||
stateIntermediateKey = []byte("stateIntermediateKey")
|
||||
stateKey = []byte("stateKey")
|
||||
)
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
|
@ -82,35 +81,6 @@ func (s *State) Save() {
|
|||
s.db.SetSync(stateKey, s.Bytes())
|
||||
}
|
||||
|
||||
func (s *State) SaveIntermediate() {
|
||||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
s.db.SetSync(stateIntermediateKey, s.Bytes())
|
||||
}
|
||||
|
||||
// Load the intermediate state into the current state
|
||||
// and do some sanity checks
|
||||
func (s *State) LoadIntermediate() {
|
||||
s2 := loadState(s.db, stateIntermediateKey)
|
||||
if s.ChainID != s2.ChainID {
|
||||
PanicSanity(Fmt("State mismatch for ChainID. Got %v, Expected %v", s2.ChainID, s.ChainID))
|
||||
}
|
||||
|
||||
if s.LastBlockHeight+1 != s2.LastBlockHeight {
|
||||
PanicSanity(Fmt("State mismatch for LastBlockHeight. Got %v, Expected %v", s2.LastBlockHeight, s.LastBlockHeight+1))
|
||||
}
|
||||
|
||||
if !bytes.Equal(s.Validators.Hash(), s2.LastValidators.Hash()) {
|
||||
PanicSanity(Fmt("State mismatch for LastValidators. Got %X, Expected %X", s2.LastValidators.Hash(), s.Validators.Hash()))
|
||||
}
|
||||
|
||||
if !bytes.Equal(s.AppHash, s2.AppHash) {
|
||||
PanicSanity(Fmt("State mismatch for AppHash. Got %X, Expected %X", s2.AppHash, s.AppHash))
|
||||
}
|
||||
|
||||
s.setBlockAndValidators(s2.LastBlockHeight, s2.LastBlockID, s2.LastBlockTime, s2.Validators.Copy(), s2.LastValidators.Copy())
|
||||
}
|
||||
|
||||
func (s *State) Equals(s2 *State) bool {
|
||||
return bytes.Equal(s.Bytes(), s2.Bytes())
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
#! /bin/bash
|
||||
|
||||
cd $GOPATH/src/github.com/tendermint/tendermint
|
||||
cd "$GOPATH/src/github.com/tendermint/tendermint"
|
||||
|
||||
bash ./test/persist/test_failure_indices.sh
|
||||
|
|
|
@ -3,30 +3,46 @@
|
|||
|
||||
export TMROOT=$HOME/.tendermint_persist
|
||||
|
||||
rm -rf $TMROOT
|
||||
rm -rf "$TMROOT"
|
||||
tendermint init
|
||||
|
||||
TM_CMD="tendermint node --log_level=debug" # &> tendermint_${name}.log"
|
||||
DUMMY_CMD="dummy --persist $TMROOT/dummy" # &> dummy_${name}.log"
|
||||
|
||||
|
||||
function start_procs(){
|
||||
name=$1
|
||||
indexToFail=$2
|
||||
echo "Starting persistent dummy and tendermint"
|
||||
dummy --persist $TMROOT/dummy &> "dummy_${name}.log" &
|
||||
if [[ "$CIRCLECI" == true ]]; then
|
||||
$DUMMY_CMD &
|
||||
else
|
||||
$DUMMY_CMD &> "dummy_${name}.log" &
|
||||
fi
|
||||
PID_DUMMY=$!
|
||||
if [[ "$indexToFail" == "" ]]; then
|
||||
# run in background, dont fail
|
||||
tendermint node --log_level=debug &> tendermint_${name}.log &
|
||||
if [[ "$CIRCLECI" == true ]]; then
|
||||
$TM_CMD &
|
||||
else
|
||||
$TM_CMD &> "tendermint_${name}.log" &
|
||||
fi
|
||||
PID_TENDERMINT=$!
|
||||
else
|
||||
# run in foreground, fail
|
||||
FAIL_TEST_INDEX=$indexToFail tendermint node --log_level=debug &> tendermint_${name}.log
|
||||
if [[ "$CIRCLECI" == true ]]; then
|
||||
FAIL_TEST_INDEX=$indexToFail $TM_CMD
|
||||
else
|
||||
FAIL_TEST_INDEX=$indexToFail $TM_CMD &> "tendermint_${name}.log"
|
||||
fi
|
||||
PID_TENDERMINT=$!
|
||||
fi
|
||||
}
|
||||
|
||||
function kill_procs(){
|
||||
kill -9 $PID_DUMMY $PID_TENDERMINT
|
||||
wait $PID_DUMMY
|
||||
wait $PID_TENDERMINT
|
||||
kill -9 "$PID_DUMMY" "$PID_TENDERMINT"
|
||||
wait "$PID_DUMMY"
|
||||
wait "$PID_TENDERMINT"
|
||||
}
|
||||
|
||||
|
||||
|
@ -43,10 +59,10 @@ function send_txs(){
|
|||
|
||||
# send a bunch of txs over a few blocks
|
||||
echo "Node is up, sending txs"
|
||||
for i in `seq 1 5`; do
|
||||
for j in `seq 1 100`; do
|
||||
tx=`head -c 8 /dev/urandom | hexdump -ve '1/1 "%.2X"'`
|
||||
curl -s $addr/broadcast_tx_async?tx=0x$tx &> /dev/null
|
||||
for i in $(seq 1 5); do
|
||||
for _ in $(seq 1 100); do
|
||||
tx=$(head -c 8 /dev/urandom | hexdump -ve '1/1 "%.2X"')
|
||||
curl -s "$addr/broadcast_tx_async?tx=0x$tx" &> /dev/null
|
||||
done
|
||||
sleep 1
|
||||
done
|
||||
|
@ -54,33 +70,33 @@ function send_txs(){
|
|||
|
||||
|
||||
failsStart=0
|
||||
fails=`grep -r "fail.Fail" --include \*.go . | wc -l`
|
||||
failsEnd=$(($fails-1))
|
||||
fails=$(grep -r "fail.Fail" --include \*.go . | wc -l)
|
||||
failsEnd=$((fails-1))
|
||||
|
||||
for failIndex in `seq $failsStart $failsEnd`; do
|
||||
for failIndex in $(seq $failsStart $failsEnd); do
|
||||
echo ""
|
||||
echo "* Test FailIndex $failIndex"
|
||||
# test failure at failIndex
|
||||
|
||||
send_txs &
|
||||
start_procs 1 $failIndex
|
||||
start_procs 1 "$failIndex"
|
||||
|
||||
# tendermint should fail when it hits the fail index
|
||||
kill -9 $PID_DUMMY
|
||||
wait $PID_DUMMY
|
||||
kill -9 "$PID_DUMMY"
|
||||
wait "$PID_DUMMY"
|
||||
|
||||
start_procs 2
|
||||
|
||||
# wait for node to handshake and make a new block
|
||||
addr="localhost:46657"
|
||||
curl -s $addr/status > /dev/null
|
||||
curl -s "$addr/status" > /dev/null
|
||||
ERR=$?
|
||||
i=0
|
||||
while [ "$ERR" != 0 ]; do
|
||||
sleep 1
|
||||
curl -s $addr/status > /dev/null
|
||||
curl -s "$addr/status" > /dev/null
|
||||
ERR=$?
|
||||
i=$(($i + 1))
|
||||
i=$((i + 1))
|
||||
if [[ $i == 10 ]]; then
|
||||
echo "Timed out waiting for tendermint to start"
|
||||
exit 1
|
||||
|
@ -88,11 +104,11 @@ for failIndex in `seq $failsStart $failsEnd`; do
|
|||
done
|
||||
|
||||
# wait for a new block
|
||||
h1=`curl -s $addr/status | jq .result[1].latest_block_height`
|
||||
h1=$(curl -s $addr/status | jq .result[1].latest_block_height)
|
||||
h2=$h1
|
||||
while [ "$h2" == "$h1" ]; do
|
||||
sleep 1
|
||||
h2=`curl -s $addr/status | jq .result[1].latest_block_height`
|
||||
h2=$(curl -s $addr/status | jq .result[1].latest_block_height)
|
||||
done
|
||||
|
||||
kill_procs
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
#! /bin/bash
|
||||
set -e
|
||||
|
||||
echo `pwd`
|
||||
pwd
|
||||
|
||||
BRANCH=`git rev-parse --abbrev-ref HEAD`
|
||||
BRANCH=$(git rev-parse --abbrev-ref HEAD)
|
||||
echo "Current branch: $BRANCH"
|
||||
|
||||
bash test/test_cover.sh
|
||||
|
|
|
@ -5,7 +5,7 @@ PKGS=$(go list github.com/tendermint/tendermint/... | grep -v /vendor/)
|
|||
set -e
|
||||
echo "mode: atomic" > coverage.txt
|
||||
for pkg in ${PKGS[@]}; do
|
||||
go test -v -timeout 30m -race -coverprofile=profile.out -covermode=atomic $pkg
|
||||
go test -v -timeout 30m -race -coverprofile=profile.out -covermode=atomic "$pkg"
|
||||
if [ -f profile.out ]; then
|
||||
tail -n +2 profile.out >> coverage.txt;
|
||||
rm profile.out
|
||||
|
|
|
@ -18,30 +18,30 @@ LIBS_MAKE_TEST=(go-rpc go-wire abci)
|
|||
for lib in "${LIBS_GO_TEST[@]}"; do
|
||||
|
||||
# checkout vendored version of lib
|
||||
bash scripts/glide/checkout.sh $GLIDE $lib
|
||||
bash scripts/glide/checkout.sh "$GLIDE" "$lib"
|
||||
|
||||
echo "Testing $lib ..."
|
||||
go test -v --race github.com/tendermint/$lib/...
|
||||
go test -v --race "github.com/tendermint/$lib/..."
|
||||
if [[ "$?" != 0 ]]; then
|
||||
echo "FAIL"
|
||||
exit 1
|
||||
fi
|
||||
done
|
||||
|
||||
DIR=`pwd`
|
||||
DIR=$(pwd)
|
||||
for lib in "${LIBS_MAKE_TEST[@]}"; do
|
||||
|
||||
# checkout vendored version of lib
|
||||
bash scripts/glide/checkout.sh $GLIDE $lib
|
||||
bash scripts/glide/checkout.sh "$GLIDE" "$lib"
|
||||
|
||||
echo "Testing $lib ..."
|
||||
cd $GOPATH/src/github.com/tendermint/$lib
|
||||
cd "$GOPATH/src/github.com/tendermint/$lib"
|
||||
make test
|
||||
if [[ "$?" != 0 ]]; then
|
||||
echo "FAIL"
|
||||
exit 1
|
||||
fi
|
||||
cd $DIR
|
||||
cd "$DIR"
|
||||
done
|
||||
|
||||
echo ""
|
||||
|
|
|
@ -0,0 +1,56 @@
|
|||
package types
|
||||
|
||||
import (
|
||||
abci "github.com/tendermint/abci/types"
|
||||
)
|
||||
|
||||
//------------------------------------------------------
|
||||
// blockchain services types
|
||||
// NOTE: Interfaces used by RPC must be thread safe!
|
||||
//------------------------------------------------------
|
||||
|
||||
//------------------------------------------------------
|
||||
// mempool
|
||||
|
||||
// Updates to the mempool need to be synchronized with committing a block
|
||||
// so apps can reset their transient state on Commit
|
||||
type Mempool interface {
|
||||
Lock()
|
||||
Unlock()
|
||||
|
||||
Size() int
|
||||
CheckTx(Tx, func(*abci.Response)) error
|
||||
Reap(int) Txs
|
||||
Update(height int, txs Txs)
|
||||
Flush()
|
||||
}
|
||||
|
||||
type MockMempool struct {
|
||||
}
|
||||
|
||||
func (m MockMempool) Lock() {}
|
||||
func (m MockMempool) Unlock() {}
|
||||
func (m MockMempool) Size() int { return 0 }
|
||||
func (m MockMempool) CheckTx(tx Tx, cb func(*abci.Response)) error { return nil }
|
||||
func (m MockMempool) Reap(n int) Txs { return Txs{} }
|
||||
func (m MockMempool) Update(height int, txs Txs) {}
|
||||
func (m MockMempool) Flush() {}
|
||||
|
||||
//------------------------------------------------------
|
||||
// blockstore
|
||||
|
||||
type BlockStoreRPC interface {
|
||||
Height() int
|
||||
|
||||
LoadBlockMeta(height int) *BlockMeta
|
||||
LoadBlock(height int) *Block
|
||||
LoadBlockPart(height int, index int) *Part
|
||||
|
||||
LoadBlockCommit(height int) *Commit
|
||||
LoadSeenCommit(height int) *Commit
|
||||
}
|
||||
|
||||
type BlockStore interface {
|
||||
BlockStoreRPC
|
||||
SaveBlock(block *Block, blockParts *PartSet, seenCommit *Commit)
|
||||
}
|
Loading…
Reference in New Issue