proxy: remove Handshaker from proxy pkg (#2437)

Handshaker was removed from proxy package so it can be called
independently of starting the abci app connections and can return a
result to the caller.
This commit is contained in:
Ethan Buchman 2018-09-19 09:35:09 -04:00 committed by Alexander Simmerl
parent 3e099f75c7
commit 91a8767083
6 changed files with 38 additions and 36 deletions

View File

@ -298,13 +298,18 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
// Create proxyAppConn connection (consensus, mempool, query) // Create proxyAppConn connection (consensus, mempool, query)
clientCreator := proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()) clientCreator := proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir())
proxyApp := proxy.NewAppConns(clientCreator, proxyApp := proxy.NewAppConns(clientCreator)
NewHandshaker(stateDB, state, blockStore, gdoc))
err = proxyApp.Start() err = proxyApp.Start()
if err != nil { if err != nil {
cmn.Exit(fmt.Sprintf("Error starting proxy app conns: %v", err)) cmn.Exit(fmt.Sprintf("Error starting proxy app conns: %v", err))
} }
handshaker := NewHandshaker(stateDB, state, blockStore, gdoc)
err = handshaker.Handshake(proxyApp)
if err != nil {
cmn.Exit(fmt.Sprintf("Error on handshake: %v", err))
}
eventBus := types.NewEventBus() eventBus := types.NewEventBus()
if err := eventBus.Start(); err != nil { if err := eventBus.Start(); err != nil {
cmn.Exit(fmt.Sprintf("Failed to start event bus: %v", err)) cmn.Exit(fmt.Sprintf("Failed to start event bus: %v", err))

View File

@ -351,7 +351,7 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
if nBlocks > 0 { if nBlocks > 0 {
// run nBlocks against a new client to build up the app state. // run nBlocks against a new client to build up the app state.
// use a throwaway tendermint state // use a throwaway tendermint state
proxyApp := proxy.NewAppConns(clientCreator2, nil) proxyApp := proxy.NewAppConns(clientCreator2)
stateDB, state, _ := stateAndStore(config, privVal.GetPubKey()) stateDB, state, _ := stateAndStore(config, privVal.GetPubKey())
buildAppStateFromChain(proxyApp, stateDB, state, chain, nBlocks, mode) buildAppStateFromChain(proxyApp, stateDB, state, chain, nBlocks, mode)
} }
@ -359,11 +359,14 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
// now start the app using the handshake - it should sync // now start the app using the handshake - it should sync
genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile()) genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile())
handshaker := NewHandshaker(stateDB, state, store, genDoc) handshaker := NewHandshaker(stateDB, state, store, genDoc)
proxyApp := proxy.NewAppConns(clientCreator2, handshaker) proxyApp := proxy.NewAppConns(clientCreator2)
if err := proxyApp.Start(); err != nil { if err := proxyApp.Start(); err != nil {
t.Fatalf("Error starting proxy app connections: %v", err) t.Fatalf("Error starting proxy app connections: %v", err)
} }
defer proxyApp.Stop() defer proxyApp.Stop()
if err := handshaker.Handshake(proxyApp); err != nil {
t.Fatalf("Error on abci handshake: %v", err)
}
// get the latest app hash from the app // get the latest app hash from the app
res, err := proxyApp.Query().InfoSync(abci.RequestInfo{Version: ""}) res, err := proxyApp.Query().InfoSync(abci.RequestInfo{Version: ""})
@ -439,7 +442,7 @@ func buildAppStateFromChain(proxyApp proxy.AppConns, stateDB dbm.DB,
func buildTMStateFromChain(config *cfg.Config, stateDB dbm.DB, state sm.State, chain []*types.Block, mode uint) sm.State { func buildTMStateFromChain(config *cfg.Config, stateDB dbm.DB, state sm.State, chain []*types.Block, mode uint) sm.State {
// run the whole chain against this client to build up the tendermint state // run the whole chain against this client to build up the tendermint state
clientCreator := proxy.NewLocalClientCreator(kvstore.NewPersistentKVStoreApplication(path.Join(config.DBDir(), "1"))) clientCreator := proxy.NewLocalClientCreator(kvstore.NewPersistentKVStoreApplication(path.Join(config.DBDir(), "1")))
proxyApp := proxy.NewAppConns(clientCreator, nil) // sm.NewHandshaker(config, state, store, ReplayLastBlock)) proxyApp := proxy.NewAppConns(clientCreator) // sm.NewHandshaker(config, state, store, ReplayLastBlock))
if err := proxyApp.Start(); err != nil { if err := proxyApp.Start(); err != nil {
panic(err) panic(err)
} }
@ -643,11 +646,14 @@ func TestInitChainUpdateValidators(t *testing.T) {
// now start the app using the handshake - it should sync // now start the app using the handshake - it should sync
genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile()) genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile())
handshaker := NewHandshaker(stateDB, state, store, genDoc) handshaker := NewHandshaker(stateDB, state, store, genDoc)
proxyApp := proxy.NewAppConns(clientCreator, handshaker) proxyApp := proxy.NewAppConns(clientCreator)
if err := proxyApp.Start(); err != nil { if err := proxyApp.Start(); err != nil {
t.Fatalf("Error starting proxy app connections: %v", err) t.Fatalf("Error starting proxy app connections: %v", err)
} }
defer proxyApp.Stop() defer proxyApp.Stop()
if err := handshaker.Handshake(proxyApp); err != nil {
t.Fatalf("Error on abci handshake: %v", err)
}
// reload the state, check the validator set was updated // reload the state, check the validator set was updated
state = sm.LoadState(stateDB) state = sm.LoadState(stateDB)

View File

@ -52,13 +52,13 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) {
return nil, errors.Wrap(err, "failed to make genesis state") return nil, errors.Wrap(err, "failed to make genesis state")
} }
blockStore := bc.NewBlockStore(blockStoreDB) blockStore := bc.NewBlockStore(blockStoreDB)
handshaker := NewHandshaker(stateDB, state, blockStore, genDoc) proxyApp := proxy.NewAppConns(proxy.NewLocalClientCreator(app))
proxyApp := proxy.NewAppConns(proxy.NewLocalClientCreator(app), handshaker)
proxyApp.SetLogger(logger.With("module", "proxy")) proxyApp.SetLogger(logger.With("module", "proxy"))
if err := proxyApp.Start(); err != nil { if err := proxyApp.Start(); err != nil {
return nil, errors.Wrap(err, "failed to start proxy app connections") return nil, errors.Wrap(err, "failed to start proxy app connections")
} }
defer proxyApp.Stop() defer proxyApp.Stop()
eventBus := types.NewEventBus() eventBus := types.NewEventBus()
eventBus.SetLogger(logger.With("module", "events")) eventBus.SetLogger(logger.With("module", "events"))
if err := eventBus.Start(); err != nil { if err := eventBus.Start(); err != nil {

View File

@ -190,18 +190,22 @@ func NewNode(config *cfg.Config,
return nil, err return nil, err
} }
// Create the proxyApp, which manages connections (consensus, mempool, query) // Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
// and sync tendermint and the app by performing a handshake proxyApp := proxy.NewAppConns(clientCreator)
// and replaying any necessary blocks
consensusLogger := logger.With("module", "consensus")
handshaker := cs.NewHandshaker(stateDB, state, blockStore, genDoc)
handshaker.SetLogger(consensusLogger)
proxyApp := proxy.NewAppConns(clientCreator, handshaker)
proxyApp.SetLogger(logger.With("module", "proxy")) proxyApp.SetLogger(logger.With("module", "proxy"))
if err := proxyApp.Start(); err != nil { if err := proxyApp.Start(); err != nil {
return nil, fmt.Errorf("Error starting proxy app connections: %v", err) return nil, fmt.Errorf("Error starting proxy app connections: %v", err)
} }
// Create the handshaker, which calls RequestInfo and replays any blocks
// as necessary to sync tendermint with the app.
consensusLogger := logger.With("module", "consensus")
handshaker := cs.NewHandshaker(stateDB, state, blockStore, genDoc)
handshaker.SetLogger(consensusLogger)
if err := handshaker.Handshake(proxyApp); err != nil {
return nil, fmt.Errorf("Error during handshake: %v", err)
}
// reload the state (it may have been updated by the handshake) // reload the state (it may have been updated by the handshake)
state = sm.LoadState(stateDB) state = sm.LoadState(stateDB)

View File

@ -17,26 +17,19 @@ type AppConns interface {
Query() AppConnQuery Query() AppConnQuery
} }
func NewAppConns(clientCreator ClientCreator, handshaker Handshaker) AppConns { func NewAppConns(clientCreator ClientCreator) AppConns {
return NewMultiAppConn(clientCreator, handshaker) return NewMultiAppConn(clientCreator)
} }
//----------------------------- //-----------------------------
// multiAppConn implements AppConns // multiAppConn implements AppConns
type Handshaker interface {
Handshake(AppConns) error
}
// a multiAppConn is made of a few appConns (mempool, consensus, query) // a multiAppConn is made of a few appConns (mempool, consensus, query)
// and manages their underlying abci clients, including the handshake // and manages their underlying abci clients
// which ensures the app and tendermint are synced.
// TODO: on app restart, clients must reboot together // TODO: on app restart, clients must reboot together
type multiAppConn struct { type multiAppConn struct {
cmn.BaseService cmn.BaseService
handshaker Handshaker
mempoolConn *appConnMempool mempoolConn *appConnMempool
consensusConn *appConnConsensus consensusConn *appConnConsensus
queryConn *appConnQuery queryConn *appConnQuery
@ -45,9 +38,8 @@ type multiAppConn struct {
} }
// Make all necessary abci connections to the application // Make all necessary abci connections to the application
func NewMultiAppConn(clientCreator ClientCreator, handshaker Handshaker) *multiAppConn { func NewMultiAppConn(clientCreator ClientCreator) *multiAppConn {
multiAppConn := &multiAppConn{ multiAppConn := &multiAppConn{
handshaker: handshaker,
clientCreator: clientCreator, clientCreator: clientCreator,
} }
multiAppConn.BaseService = *cmn.NewBaseService(nil, "multiAppConn", multiAppConn) multiAppConn.BaseService = *cmn.NewBaseService(nil, "multiAppConn", multiAppConn)
@ -103,10 +95,5 @@ func (app *multiAppConn) OnStart() error {
} }
app.consensusConn = NewAppConnConsensus(concli) app.consensusConn = NewAppConnConsensus(concli)
// ensure app is synced to the latest state
if app.handshaker != nil {
return app.handshaker.Handshake(app)
}
return nil return nil
} }

View File

@ -29,7 +29,7 @@ var (
func TestApplyBlock(t *testing.T) { func TestApplyBlock(t *testing.T) {
cc := proxy.NewLocalClientCreator(kvstore.NewKVStoreApplication()) cc := proxy.NewLocalClientCreator(kvstore.NewKVStoreApplication())
proxyApp := proxy.NewAppConns(cc, nil) proxyApp := proxy.NewAppConns(cc)
err := proxyApp.Start() err := proxyApp.Start()
require.Nil(t, err) require.Nil(t, err)
defer proxyApp.Stop() defer proxyApp.Stop()
@ -52,7 +52,7 @@ func TestApplyBlock(t *testing.T) {
func TestBeginBlockValidators(t *testing.T) { func TestBeginBlockValidators(t *testing.T) {
app := &testApp{} app := &testApp{}
cc := proxy.NewLocalClientCreator(app) cc := proxy.NewLocalClientCreator(app)
proxyApp := proxy.NewAppConns(cc, nil) proxyApp := proxy.NewAppConns(cc)
err := proxyApp.Start() err := proxyApp.Start()
require.Nil(t, err) require.Nil(t, err)
defer proxyApp.Stop() defer proxyApp.Stop()
@ -105,7 +105,7 @@ func TestBeginBlockValidators(t *testing.T) {
func TestBeginBlockByzantineValidators(t *testing.T) { func TestBeginBlockByzantineValidators(t *testing.T) {
app := &testApp{} app := &testApp{}
cc := proxy.NewLocalClientCreator(app) cc := proxy.NewLocalClientCreator(app)
proxyApp := proxy.NewAppConns(cc, nil) proxyApp := proxy.NewAppConns(cc)
err := proxyApp.Start() err := proxyApp.Start()
require.Nil(t, err) require.Nil(t, err)
defer proxyApp.Stop() defer proxyApp.Stop()
@ -239,7 +239,7 @@ func TestUpdateValidators(t *testing.T) {
func TestEndBlockValidatorUpdates(t *testing.T) { func TestEndBlockValidatorUpdates(t *testing.T) {
app := &testApp{} app := &testApp{}
cc := proxy.NewLocalClientCreator(app) cc := proxy.NewLocalClientCreator(app)
proxyApp := proxy.NewAppConns(cc, nil) proxyApp := proxy.NewAppConns(cc)
err := proxyApp.Start() err := proxyApp.Start()
require.Nil(t, err) require.Nil(t, err)
defer proxyApp.Stop() defer proxyApp.Stop()