Merge pull request #807 from tendermint/45-change-common-start-signature

service#Start, service#Stop signatures were changed
This commit is contained in:
Ethan Buchman 2017-11-29 20:23:07 +00:00 committed by GitHub
commit f233cde9a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
39 changed files with 113 additions and 122 deletions

View File

@ -83,17 +83,17 @@ ensure_tools:
### Formatting, linting, and vetting ### Formatting, linting, and vetting
metalinter: metalinter:
@gometalinter --vendor --deadline=600s --enable-all --disable=lll ./... @gometalinter --vendor --deadline=600s --enable-all --disable=lll ./...
metalinter_test: metalinter_test:
@gometalinter --vendor --deadline=600s --disable-all \ @gometalinter --vendor --deadline=600s --disable-all \
--enable=deadcode \ --enable=deadcode \
--enable=gas \
--enable=misspell \ --enable=misspell \
--enable=safesql \ --enable=safesql \
./... ./...
# --enable=gas \
#--enable=maligned \ #--enable=maligned \
#--enable=dupl \ #--enable=dupl \
#--enable=errcheck \ #--enable=errcheck \

View File

@ -12,7 +12,7 @@ import (
func main() { func main() {
wsc := rpcclient.NewWSClient("127.0.0.1:46657", "/websocket") wsc := rpcclient.NewWSClient("127.0.0.1:46657", "/websocket")
_, err := wsc.Start() err := wsc.Start()
if err != nil { if err != nil {
cmn.Exit(err.Error()) cmn.Exit(err.Error())
} }

View File

@ -311,7 +311,7 @@ func (pool *BlockPool) makeNextRequester() {
pool.requesters[nextHeight] = request pool.requesters[nextHeight] = request
pool.numPending++ pool.numPending++
_, err := request.Start() err := request.Start()
if err != nil { if err != nil {
request.Logger.Error("Error starting request", "err", err) request.Logger.Error("Error starting request", "err", err)
} }

View File

@ -37,7 +37,7 @@ func TestBasic(t *testing.T) {
pool := NewBlockPool(start, requestsCh, timeoutsCh) pool := NewBlockPool(start, requestsCh, timeoutsCh)
pool.SetLogger(log.TestingLogger()) pool.SetLogger(log.TestingLogger())
_, err := pool.Start() err := pool.Start()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
@ -93,7 +93,7 @@ func TestTimeout(t *testing.T) {
requestsCh := make(chan BlockRequest, 100) requestsCh := make(chan BlockRequest, 100)
pool := NewBlockPool(start, requestsCh, timeoutsCh) pool := NewBlockPool(start, requestsCh, timeoutsCh)
pool.SetLogger(log.TestingLogger()) pool.SetLogger(log.TestingLogger())
_, err := pool.Start() err := pool.Start()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }

View File

@ -92,7 +92,7 @@ func (bcR *BlockchainReactor) OnStart() error {
return err return err
} }
if bcR.fastSync { if bcR.fastSync {
_, err := bcR.pool.Start() err := bcR.pool.Start()
if err != nil { if err != nil {
return err return err
} }

View File

@ -49,7 +49,7 @@ func NewRunNodeCmd(nodeProvider nm.NodeProvider) *cobra.Command {
return fmt.Errorf("Failed to create node: %v", err) return fmt.Errorf("Failed to create node: %v", err)
} }
if _, err := n.Start(); err != nil { if err := n.Start(); err != nil {
return fmt.Errorf("Failed to start node: %v", err) return fmt.Errorf("Failed to start node: %v", err)
} else { } else {
logger.Info("Started node", "nodeInfo", n.Switch().NodeInfo()) logger.Info("Started node", "nodeInfo", n.Switch().NodeInfo())

View File

@ -58,7 +58,7 @@ func TestByzantine(t *testing.T) {
eventBus := types.NewEventBus() eventBus := types.NewEventBus()
eventBus.SetLogger(logger.With("module", "events", "validator", i)) eventBus.SetLogger(logger.With("module", "events", "validator", i))
_, err := eventBus.Start() err := eventBus.Start()
require.NoError(t, err) require.NoError(t, err)
defer eventBus.Stop() defer eventBus.Stop()

View File

@ -464,12 +464,12 @@ type mockTicker struct {
fired bool fired bool
} }
func (m *mockTicker) Start() (bool, error) { func (m *mockTicker) Start() error {
return true, nil return nil
} }
func (m *mockTicker) Stop() bool { func (m *mockTicker) Stop() error {
return true return nil
} }
func (m *mockTicker) ScheduleTimeout(ti timeoutInfo) { func (m *mockTicker) ScheduleTimeout(ti timeoutInfo) {

View File

@ -65,7 +65,7 @@ func (conR *ConsensusReactor) OnStart() error {
} }
if !conR.FastSync() { if !conR.FastSync() {
_, err := conR.conS.Start() err := conR.conS.Start()
if err != nil { if err != nil {
return err return err
} }
@ -97,7 +97,7 @@ func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State, blocksSynced in
// dont bother with the WAL if we fast synced // dont bother with the WAL if we fast synced
conR.conS.doWALCatchup = false conR.conS.doWALCatchup = false
} }
_, err := conR.conS.Start() err := conR.conS.Start()
if err != nil { if err != nil {
conR.Logger.Error("Error starting conS", "err", err) conR.Logger.Error("Error starting conS", "err", err)
} }

View File

@ -41,7 +41,7 @@ func startConsensusNet(t *testing.T, css []*ConsensusState, N int) ([]*Consensus
eventBuses[i] = types.NewEventBus() eventBuses[i] = types.NewEventBus()
eventBuses[i].SetLogger(thisLogger.With("module", "events", "validator", i)) eventBuses[i].SetLogger(thisLogger.With("module", "events", "validator", i))
_, err := eventBuses[i].Start() err := eventBuses[i].Start()
require.NoError(t, err) require.NoError(t, err)
reactors[i].SetEventBus(eventBuses[i]) reactors[i].SetEventBus(eventBuses[i])

View File

@ -356,7 +356,6 @@ func (h *Handshaker) replayBlock(height int, proxyApp proxy.AppConnConsensus) ([
func (h *Handshaker) checkAppHash(appHash []byte) error { func (h *Handshaker) checkAppHash(appHash []byte) error {
if !bytes.Equal(h.state.AppHash, appHash) { if !bytes.Equal(h.state.AppHash, appHash) {
panic(errors.New(cmn.Fmt("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, h.state.AppHash)).Error()) panic(errors.New(cmn.Fmt("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, h.state.AppHash)).Error())
return nil
} }
return nil return nil
} }
@ -371,7 +370,7 @@ func newMockProxyApp(appHash []byte, abciResponses *sm.ABCIResponses) proxy.AppC
abciResponses: abciResponses, abciResponses: abciResponses,
}) })
cli, _ := clientCreator.NewABCIClient() cli, _ := clientCreator.NewABCIClient()
_, err := cli.Start() err := cli.Start()
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@ -292,13 +292,13 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
// Create proxyAppConn connection (consensus, mempool, query) // Create proxyAppConn connection (consensus, mempool, query)
clientCreator := proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()) clientCreator := proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir())
proxyApp := proxy.NewAppConns(clientCreator, NewHandshaker(state, blockStore)) proxyApp := proxy.NewAppConns(clientCreator, NewHandshaker(state, blockStore))
_, err = proxyApp.Start() err = proxyApp.Start()
if err != nil { if err != nil {
cmn.Exit(cmn.Fmt("Error starting proxy app conns: %v", err)) cmn.Exit(cmn.Fmt("Error starting proxy app conns: %v", err))
} }
eventBus := types.NewEventBus() eventBus := types.NewEventBus()
if _, err := eventBus.Start(); err != nil { if err := eventBus.Start(); err != nil {
cmn.Exit(cmn.Fmt("Failed to start event bus: %v", err)) cmn.Exit(cmn.Fmt("Failed to start event bus: %v", err))
} }

View File

@ -70,7 +70,7 @@ func startNewConsensusStateAndWaitForBlock(t *testing.T, lastBlockHeight int, bl
// fmt.Printf("====== WAL: \n\r%s\n", bytes) // fmt.Printf("====== WAL: \n\r%s\n", bytes)
t.Logf("====== WAL: \n\r%s\n", bytes) t.Logf("====== WAL: \n\r%s\n", bytes)
_, err := cs.Start() err := cs.Start()
require.NoError(t, err) require.NoError(t, err)
defer func() { defer func() {
cs.Stop() cs.Stop()
@ -171,7 +171,7 @@ LOOP:
cs.wal = crashingWal cs.wal = crashingWal
// start consensus state // start consensus state
_, err = cs.Start() err = cs.Start()
require.NoError(t, err) require.NoError(t, err)
i++ i++
@ -257,9 +257,9 @@ func (w *crashingWAL) SearchForEndHeight(height uint64) (gr *auto.GroupReader, f
return w.next.SearchForEndHeight(height) return w.next.SearchForEndHeight(height)
} }
func (w *crashingWAL) Start() (bool, error) { return w.next.Start() } func (w *crashingWAL) Start() error { return w.next.Start() }
func (w *crashingWAL) Stop() bool { return w.next.Stop() } func (w *crashingWAL) Stop() error { return w.next.Stop() }
func (w *crashingWAL) Wait() { w.next.Wait() } func (w *crashingWAL) Wait() { w.next.Wait() }
//------------------------------------------------------------------------------------------ //------------------------------------------------------------------------------------------
// Handshake Tests // Handshake Tests
@ -339,7 +339,7 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
t.Fatal(err) t.Fatal(err)
} }
wal.SetLogger(log.TestingLogger()) wal.SetLogger(log.TestingLogger())
if _, err := wal.Start(); err != nil { if err := wal.Start(); err != nil {
t.Fatal(err) t.Fatal(err)
} }
chain, commits, err := makeBlockchainFromWAL(wal) chain, commits, err := makeBlockchainFromWAL(wal)
@ -368,7 +368,7 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
// now start the app using the handshake - it should sync // now start the app using the handshake - it should sync
handshaker := NewHandshaker(state, store) handshaker := NewHandshaker(state, store)
proxyApp := proxy.NewAppConns(clientCreator2, handshaker) proxyApp := proxy.NewAppConns(clientCreator2, handshaker)
if _, err := proxyApp.Start(); err != nil { if err := proxyApp.Start(); err != nil {
t.Fatalf("Error starting proxy app connections: %v", err) t.Fatalf("Error starting proxy app connections: %v", err)
} }
@ -406,7 +406,7 @@ func applyBlock(st *sm.State, blk *types.Block, proxyApp proxy.AppConns) {
func buildAppStateFromChain(proxyApp proxy.AppConns, func buildAppStateFromChain(proxyApp proxy.AppConns,
state *sm.State, chain []*types.Block, nBlocks int, mode uint) { state *sm.State, chain []*types.Block, nBlocks int, mode uint) {
// start a new app without handshake, play nBlocks blocks // start a new app without handshake, play nBlocks blocks
if _, err := proxyApp.Start(); err != nil { if err := proxyApp.Start(); err != nil {
panic(err) panic(err)
} }
@ -441,7 +441,7 @@ func buildTMStateFromChain(config *cfg.Config, state *sm.State, chain []*types.B
// run the whole chain against this client to build up the tendermint state // run the whole chain against this client to build up the tendermint state
clientCreator := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.DBDir(), "1"))) clientCreator := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.DBDir(), "1")))
proxyApp := proxy.NewAppConns(clientCreator, nil) // sm.NewHandshaker(config, state, store, ReplayLastBlock)) proxyApp := proxy.NewAppConns(clientCreator, nil) // sm.NewHandshaker(config, state, store, ReplayLastBlock))
if _, err := proxyApp.Start(); err != nil { if err := proxyApp.Start(); err != nil {
panic(err) panic(err)
} }
defer proxyApp.Stop() defer proxyApp.Stop()

View File

@ -229,7 +229,7 @@ func (cs *ConsensusState) OnStart() error {
// NOTE: we will get a build up of garbage go routines // NOTE: we will get a build up of garbage go routines
// firing on the tockChan until the receiveRoutine is started // firing on the tockChan until the receiveRoutine is started
// to deal with them (by that point, at most one will be valid) // to deal with them (by that point, at most one will be valid)
_, err := cs.timeoutTicker.Start() err := cs.timeoutTicker.Start()
if err != nil { if err != nil {
return err return err
} }
@ -257,7 +257,7 @@ func (cs *ConsensusState) OnStart() error {
// timeoutRoutine: receive requests for timeouts on tickChan and fire timeouts on tockChan // timeoutRoutine: receive requests for timeouts on tickChan and fire timeouts on tockChan
// receiveRoutine: serializes processing of proposoals, block parts, votes; coordinates state transitions // receiveRoutine: serializes processing of proposoals, block parts, votes; coordinates state transitions
func (cs *ConsensusState) startRoutines(maxSteps int) { func (cs *ConsensusState) startRoutines(maxSteps int) {
_, err := cs.timeoutTicker.Start() err := cs.timeoutTicker.Start()
if err != nil { if err != nil {
cs.Logger.Error("Error starting timeout ticker", "err", err) cs.Logger.Error("Error starting timeout ticker", "err", err)
return return
@ -292,7 +292,7 @@ func (cs *ConsensusState) OpenWAL(walFile string) (WAL, error) {
return nil, err return nil, err
} }
wal.SetLogger(cs.Logger.With("wal", walFile)) wal.SetLogger(cs.Logger.With("wal", walFile))
if _, err := wal.Start(); err != nil { if err := wal.Start(); err != nil {
return nil, err return nil, err
} }
return wal, nil return wal, nil

View File

@ -15,8 +15,8 @@ var (
// conditional on the height/round/step in the timeoutInfo. // conditional on the height/round/step in the timeoutInfo.
// The timeoutInfo.Duration may be non-positive. // The timeoutInfo.Duration may be non-positive.
type TimeoutTicker interface { type TimeoutTicker interface {
Start() (bool, error) Start() error
Stop() bool Stop() error
Chan() <-chan timeoutInfo // on which to receive a timeout Chan() <-chan timeoutInfo // on which to receive a timeout
ScheduleTimeout(ti timeoutInfo) // reset the timer ScheduleTimeout(ti timeoutInfo) // reset the timer

View File

@ -54,8 +54,8 @@ type WAL interface {
Group() *auto.Group Group() *auto.Group
SearchForEndHeight(height uint64) (gr *auto.GroupReader, found bool, err error) SearchForEndHeight(height uint64) (gr *auto.GroupReader, found bool, err error)
Start() (bool, error) Start() error
Stop() bool Stop() error
Wait() Wait()
} }
@ -102,7 +102,7 @@ func (wal *baseWAL) OnStart() error {
} else if size == 0 { } else if size == 0 {
wal.Save(EndHeightMessage{0}) wal.Save(EndHeightMessage{0})
} }
_, err = wal.group.Start() err = wal.group.Start()
return err return err
} }
@ -307,6 +307,6 @@ func (nilWAL) Group() *auto.Group { return nil }
func (nilWAL) SearchForEndHeight(height uint64) (gr *auto.GroupReader, found bool, err error) { func (nilWAL) SearchForEndHeight(height uint64) (gr *auto.GroupReader, found bool, err error) {
return nil, false, nil return nil, false, nil
} }
func (nilWAL) Start() (bool, error) { return true, nil } func (nilWAL) Start() error { return nil }
func (nilWAL) Stop() bool { return true } func (nilWAL) Stop() error { return nil }
func (nilWAL) Wait() {} func (nilWAL) Wait() {}

2
glide.lock generated
View File

@ -123,7 +123,7 @@ imports:
subpackages: subpackages:
- iavl - iavl
- name: github.com/tendermint/tmlibs - name: github.com/tendermint/tmlibs
version: b854baa1fce7101c90b1d301b3359bb412f981c0 version: 1e12754b3a3b5f1c23bf44c2d882faae688fb2e8
subpackages: subpackages:
- autofile - autofile
- cli - cli

View File

@ -34,7 +34,7 @@ import:
subpackages: subpackages:
- iavl - iavl
- package: github.com/tendermint/tmlibs - package: github.com/tendermint/tmlibs
version: ~0.4.1 version: 1e12754b3a3b5f1c23bf44c2d882faae688fb2e8
subpackages: subpackages:
- autofile - autofile
- cli - cli

View File

@ -27,7 +27,7 @@ func newMempoolWithApp(cc proxy.ClientCreator) *Mempool {
appConnMem, _ := cc.NewABCIClient() appConnMem, _ := cc.NewABCIClient()
appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool")) appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool"))
_, err := appConnMem.Start() err := appConnMem.Start()
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -121,7 +121,7 @@ func TestSerialReap(t *testing.T) {
mempool := newMempoolWithApp(cc) mempool := newMempoolWithApp(cc)
appConnCon, _ := cc.NewABCIClient() appConnCon, _ := cc.NewABCIClient()
appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus")) appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus"))
if _, err := appConnCon.Start(); err != nil { if err := appConnCon.Start(); err != nil {
t.Fatalf("Error starting ABCI client: %v", err.Error()) t.Fatalf("Error starting ABCI client: %v", err.Error())
} }

View File

@ -165,7 +165,7 @@ func NewNode(config *cfg.Config,
handshaker.SetLogger(consensusLogger) handshaker.SetLogger(consensusLogger)
proxyApp := proxy.NewAppConns(clientCreator, handshaker) proxyApp := proxy.NewAppConns(clientCreator, handshaker)
proxyApp.SetLogger(logger.With("module", "proxy")) proxyApp.SetLogger(logger.With("module", "proxy"))
if _, err := proxyApp.Start(); err != nil { if err := proxyApp.Start(); err != nil {
return nil, fmt.Errorf("Error starting proxy app connections: %v", err) return nil, fmt.Errorf("Error starting proxy app connections: %v", err)
} }
@ -326,7 +326,7 @@ func NewNode(config *cfg.Config,
// OnStart starts the Node. It implements cmn.Service. // OnStart starts the Node. It implements cmn.Service.
func (n *Node) OnStart() error { func (n *Node) OnStart() error {
_, err := n.eventBus.Start() err := n.eventBus.Start()
if err != nil { if err != nil {
return err return err
} }
@ -349,7 +349,7 @@ func (n *Node) OnStart() error {
// Start the switch // Start the switch
n.sw.SetNodeInfo(n.makeNodeInfo()) n.sw.SetNodeInfo(n.makeNodeInfo())
n.sw.SetNodePrivKey(n.privKey) n.sw.SetNodePrivKey(n.privKey)
_, err = n.sw.Start() err = n.sw.Start()
if err != nil { if err != nil {
return err return err
} }

View File

@ -19,7 +19,7 @@ func TestNodeStartStop(t *testing.T) {
// create & start node // create & start node
n, err := DefaultNewNode(config, log.TestingLogger()) n, err := DefaultNewNode(config, log.TestingLogger())
assert.NoError(t, err, "expected no err on DefaultNewNode") assert.NoError(t, err, "expected no err on DefaultNewNode")
_, err1 := n.Start() err1 := n.Start()
if err1 != nil { if err1 != nil {
t.Error(err1) t.Error(err1)
} }

View File

@ -36,7 +36,7 @@ func TestMConnectionSend(t *testing.T) {
defer client.Close() // nolint: errcheck defer client.Close() // nolint: errcheck
mconn := createTestMConnection(client) mconn := createTestMConnection(client)
_, err := mconn.Start() err := mconn.Start()
require.Nil(err) require.Nil(err)
defer mconn.Stop() defer mconn.Stop()
@ -77,12 +77,12 @@ func TestMConnectionReceive(t *testing.T) {
errorsCh <- r errorsCh <- r
} }
mconn1 := createMConnectionWithCallbacks(client, onReceive, onError) mconn1 := createMConnectionWithCallbacks(client, onReceive, onError)
_, err := mconn1.Start() err := mconn1.Start()
require.Nil(err) require.Nil(err)
defer mconn1.Stop() defer mconn1.Stop()
mconn2 := createTestMConnection(server) mconn2 := createTestMConnection(server)
_, err = mconn2.Start() err = mconn2.Start()
require.Nil(err) require.Nil(err)
defer mconn2.Stop() defer mconn2.Stop()
@ -107,7 +107,7 @@ func TestMConnectionStatus(t *testing.T) {
defer client.Close() // nolint: errcheck defer client.Close() // nolint: errcheck
mconn := createTestMConnection(client) mconn := createTestMConnection(client)
_, err := mconn.Start() err := mconn.Start()
require.Nil(err) require.Nil(err)
defer mconn.Stop() defer mconn.Stop()
@ -132,7 +132,7 @@ func TestMConnectionStopsAndReturnsError(t *testing.T) {
errorsCh <- r errorsCh <- r
} }
mconn := createMConnectionWithCallbacks(client, onReceive, onError) mconn := createMConnectionWithCallbacks(client, onReceive, onError)
_, err := mconn.Start() err := mconn.Start()
require.Nil(err) require.Nil(err)
defer mconn.Stop() defer mconn.Stop()
@ -164,7 +164,7 @@ func newClientAndServerConnsForReadErrors(require *require.Assertions, chOnErr c
} }
mconnClient := NewMConnection(client, chDescs, onReceive, onError) mconnClient := NewMConnection(client, chDescs, onReceive, onError)
mconnClient.SetLogger(log.TestingLogger().With("module", "client")) mconnClient.SetLogger(log.TestingLogger().With("module", "client"))
_, err := mconnClient.Start() err := mconnClient.Start()
require.Nil(err) require.Nil(err)
// create server conn with 1 channel // create server conn with 1 channel
@ -175,7 +175,7 @@ func newClientAndServerConnsForReadErrors(require *require.Assertions, chOnErr c
} }
mconnServer := createMConnectionWithCallbacks(server, onReceive, onError) mconnServer := createMConnectionWithCallbacks(server, onReceive, onError)
mconnServer.SetLogger(serverLogger) mconnServer.SetLogger(serverLogger)
_, err = mconnServer.Start() err = mconnServer.Start()
require.Nil(err) require.Nil(err)
return mconnClient, mconnServer return mconnClient, mconnServer
} }
@ -288,7 +288,7 @@ func TestMConnectionTrySend(t *testing.T) {
defer client.Close() defer client.Close()
mconn := createTestMConnection(client) mconn := createTestMConnection(client)
_, err := mconn.Start() err := mconn.Start()
require.Nil(err) require.Nil(err)
defer mconn.Stop() defer mconn.Stop()

View File

@ -16,7 +16,7 @@ type Listener interface {
InternalAddress() *NetAddress InternalAddress() *NetAddress
ExternalAddress() *NetAddress ExternalAddress() *NetAddress
String() string String() string
Stop() bool Stop() error
} }
// Implements Listener // Implements Listener
@ -100,7 +100,7 @@ func NewDefaultListener(protocol string, lAddr string, skipUPNP bool, logger log
connections: make(chan net.Conn, numBufferedConnections), connections: make(chan net.Conn, numBufferedConnections),
} }
dl.BaseService = *cmn.NewBaseService(logger, "DefaultListener", dl) dl.BaseService = *cmn.NewBaseService(logger, "DefaultListener", dl)
_, err = dl.Start() // Started upon construction err = dl.Start() // Started upon construction
if err != nil { if err != nil {
logger.Error("Error starting base service", "err", err) logger.Error("Error starting base service", "err", err)
} }

View File

@ -235,7 +235,7 @@ func (p *peer) OnStart() error {
if err := p.BaseService.OnStart(); err != nil { if err := p.BaseService.OnStart(); err != nil {
return err return err
} }
_, err := p.mconn.Start() err := p.mconn.Start()
return err return err
} }

View File

@ -23,7 +23,7 @@ func TestPeerBasic(t *testing.T) {
p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), DefaultPeerConfig()) p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), DefaultPeerConfig())
require.Nil(err) require.Nil(err)
_, err = p.Start() err = p.Start()
require.Nil(err) require.Nil(err)
defer p.Stop() defer p.Stop()
@ -50,7 +50,7 @@ func TestPeerWithoutAuthEnc(t *testing.T) {
p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), config) p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), config)
require.Nil(err) require.Nil(err)
_, err = p.Start() err = p.Start()
require.Nil(err) require.Nil(err)
defer p.Stop() defer p.Stop()
@ -71,7 +71,7 @@ func TestPeerSend(t *testing.T) {
p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), config) p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), config)
require.Nil(err) require.Nil(err)
_, err = p.Start() err = p.Start()
require.Nil(err) require.Nil(err)
defer p.Stop() defer p.Stop()

View File

@ -69,8 +69,8 @@ func (r *PEXReactor) OnStart() error {
if err := r.BaseReactor.OnStart(); err != nil { if err := r.BaseReactor.OnStart(); err != nil {
return err return err
} }
_, err := r.book.Start() err := r.book.Start()
if err != nil { if err != nil && err != cmn.ErrAlreadyStarted {
return err return err
} }
go r.ensurePeersRoutine() go r.ensurePeersRoutine()

View File

@ -62,13 +62,11 @@ func TestPEXReactorAddRemovePeer(t *testing.T) {
} }
func TestPEXReactorRunning(t *testing.T) { func TestPEXReactorRunning(t *testing.T) {
require := require.New(t)
N := 3 N := 3
switches := make([]*Switch, N) switches := make([]*Switch, N)
dir, err := ioutil.TempDir("", "pex_reactor") dir, err := ioutil.TempDir("", "pex_reactor")
require.Nil(err) require.Nil(t, err)
defer os.RemoveAll(dir) // nolint: errcheck defer os.RemoveAll(dir) // nolint: errcheck
book := NewAddrBook(dir+"addrbook.json", false) book := NewAddrBook(dir+"addrbook.json", false)
book.SetLogger(log.TestingLogger()) book.SetLogger(log.TestingLogger())
@ -95,8 +93,8 @@ func TestPEXReactorRunning(t *testing.T) {
// start switches // start switches
for _, s := range switches { for _, s := range switches {
_, err := s.Start() // start switch and reactors err := s.Start() // start switch and reactors
require.Nil(err) require.Nil(t, err)
} }
assertSomePeersWithTimeout(t, switches, 10*time.Millisecond, 10*time.Second) assertSomePeersWithTimeout(t, switches, 10*time.Millisecond, 10*time.Second)

View File

@ -1,12 +1,13 @@
package p2p package p2p
import ( import (
"errors"
"fmt" "fmt"
"math/rand" "math/rand"
"net" "net"
"time" "time"
"github.com/pkg/errors"
crypto "github.com/tendermint/go-crypto" crypto "github.com/tendermint/go-crypto"
cfg "github.com/tendermint/tendermint/config" cfg "github.com/tendermint/tendermint/config"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
@ -174,17 +175,13 @@ func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
// OnStart implements BaseService. It starts all the reactors, peers, and listeners. // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
func (sw *Switch) OnStart() error { func (sw *Switch) OnStart() error {
if err := sw.BaseService.OnStart(); err != nil {
return err
}
// Start reactors // Start reactors
for _, reactor := range sw.reactors { for _, reactor := range sw.reactors {
_, err := reactor.Start() err := reactor.Start()
if err != nil { if err != nil {
return err return errors.Wrapf(err, "failed to start %v", reactor)
} }
} }
// Start listeners // Start listeners
for _, listener := range sw.listeners { for _, listener := range sw.listeners {
go sw.listenerRoutine(listener) go sw.listenerRoutine(listener)
@ -194,7 +191,6 @@ func (sw *Switch) OnStart() error {
// OnStop implements BaseService. It stops all listeners, peers, and reactors. // OnStop implements BaseService. It stops all listeners, peers, and reactors.
func (sw *Switch) OnStop() { func (sw *Switch) OnStop() {
sw.BaseService.OnStop()
// Stop listeners // Stop listeners
for _, listener := range sw.listeners { for _, listener := range sw.listeners {
listener.Stop() listener.Stop()
@ -289,7 +285,7 @@ func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) {
} }
func (sw *Switch) startInitPeer(peer *peer) { func (sw *Switch) startInitPeer(peer *peer) {
_, err := peer.Start() // spawn send/recv routines err := peer.Start() // spawn send/recv routines
if err != nil { if err != nil {
// Should never happen // Should never happen
sw.Logger.Error("Error starting peer", "peer", peer, "err", err) sw.Logger.Error("Error starting peer", "peer", peer, "err", err)
@ -547,7 +543,7 @@ func Connect2Switches(switches []*Switch, i, j int) {
// It returns the first encountered error. // It returns the first encountered error.
func StartSwitches(switches []*Switch) error { func StartSwitches(switches []*Switch) error {
for _, s := range switches { for _, s := range switches {
_, err := s.Start() // start switch and reactors err := s.Start() // start switch and reactors
if err != nil { if err != nil {
return err return err
} }

View File

@ -225,7 +225,7 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
assert, require := assert.New(t), require.New(t) assert, require := assert.New(t), require.New(t)
sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
_, err := sw.Start() err := sw.Start()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
@ -252,7 +252,7 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
assert, require := assert.New(t), require.New(t) assert, require := assert.New(t), require.New(t)
sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
_, err := sw.Start() err := sw.Start()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }

View File

@ -51,7 +51,7 @@ func TestEcho(t *testing.T) {
// Start server // Start server
s := server.NewSocketServer(sockPath, dummy.NewDummyApplication()) s := server.NewSocketServer(sockPath, dummy.NewDummyApplication())
s.SetLogger(log.TestingLogger().With("module", "abci-server")) s.SetLogger(log.TestingLogger().With("module", "abci-server"))
if _, err := s.Start(); err != nil { if err := s.Start(); err != nil {
t.Fatalf("Error starting socket server: %v", err.Error()) t.Fatalf("Error starting socket server: %v", err.Error())
} }
defer s.Stop() defer s.Stop()
@ -62,7 +62,7 @@ func TestEcho(t *testing.T) {
t.Fatalf("Error creating ABCI client: %v", err.Error()) t.Fatalf("Error creating ABCI client: %v", err.Error())
} }
cli.SetLogger(log.TestingLogger().With("module", "abci-client")) cli.SetLogger(log.TestingLogger().With("module", "abci-client"))
if _, err := cli.Start(); err != nil { if err := cli.Start(); err != nil {
t.Fatalf("Error starting ABCI client: %v", err.Error()) t.Fatalf("Error starting ABCI client: %v", err.Error())
} }
@ -85,7 +85,7 @@ func BenchmarkEcho(b *testing.B) {
// Start server // Start server
s := server.NewSocketServer(sockPath, dummy.NewDummyApplication()) s := server.NewSocketServer(sockPath, dummy.NewDummyApplication())
s.SetLogger(log.TestingLogger().With("module", "abci-server")) s.SetLogger(log.TestingLogger().With("module", "abci-server"))
if _, err := s.Start(); err != nil { if err := s.Start(); err != nil {
b.Fatalf("Error starting socket server: %v", err.Error()) b.Fatalf("Error starting socket server: %v", err.Error())
} }
defer s.Stop() defer s.Stop()
@ -96,7 +96,7 @@ func BenchmarkEcho(b *testing.B) {
b.Fatalf("Error creating ABCI client: %v", err.Error()) b.Fatalf("Error creating ABCI client: %v", err.Error())
} }
cli.SetLogger(log.TestingLogger().With("module", "abci-client")) cli.SetLogger(log.TestingLogger().With("module", "abci-client"))
if _, err := cli.Start(); err != nil { if err := cli.Start(); err != nil {
b.Fatalf("Error starting ABCI client: %v", err.Error()) b.Fatalf("Error starting ABCI client: %v", err.Error())
} }
@ -124,7 +124,7 @@ func TestInfo(t *testing.T) {
// Start server // Start server
s := server.NewSocketServer(sockPath, dummy.NewDummyApplication()) s := server.NewSocketServer(sockPath, dummy.NewDummyApplication())
s.SetLogger(log.TestingLogger().With("module", "abci-server")) s.SetLogger(log.TestingLogger().With("module", "abci-server"))
if _, err := s.Start(); err != nil { if err := s.Start(); err != nil {
t.Fatalf("Error starting socket server: %v", err.Error()) t.Fatalf("Error starting socket server: %v", err.Error())
} }
defer s.Stop() defer s.Stop()
@ -135,7 +135,7 @@ func TestInfo(t *testing.T) {
t.Fatalf("Error creating ABCI client: %v", err.Error()) t.Fatalf("Error creating ABCI client: %v", err.Error())
} }
cli.SetLogger(log.TestingLogger().With("module", "abci-client")) cli.SetLogger(log.TestingLogger().With("module", "abci-client"))
if _, err := cli.Start(); err != nil { if err := cli.Start(); err != nil {
t.Fatalf("Error starting ABCI client: %v", err.Error()) t.Fatalf("Error starting ABCI client: %v", err.Error())
} }

View File

@ -76,7 +76,7 @@ func (app *multiAppConn) OnStart() error {
return errors.Wrap(err, "Error creating ABCI client (query connection)") return errors.Wrap(err, "Error creating ABCI client (query connection)")
} }
querycli.SetLogger(app.Logger.With("module", "abci-client", "connection", "query")) querycli.SetLogger(app.Logger.With("module", "abci-client", "connection", "query"))
if _, err := querycli.Start(); err != nil { if err := querycli.Start(); err != nil {
return errors.Wrap(err, "Error starting ABCI client (query connection)") return errors.Wrap(err, "Error starting ABCI client (query connection)")
} }
app.queryConn = NewAppConnQuery(querycli) app.queryConn = NewAppConnQuery(querycli)
@ -87,7 +87,7 @@ func (app *multiAppConn) OnStart() error {
return errors.Wrap(err, "Error creating ABCI client (mempool connection)") return errors.Wrap(err, "Error creating ABCI client (mempool connection)")
} }
memcli.SetLogger(app.Logger.With("module", "abci-client", "connection", "mempool")) memcli.SetLogger(app.Logger.With("module", "abci-client", "connection", "mempool"))
if _, err := memcli.Start(); err != nil { if err := memcli.Start(); err != nil {
return errors.Wrap(err, "Error starting ABCI client (mempool connection)") return errors.Wrap(err, "Error starting ABCI client (mempool connection)")
} }
app.mempoolConn = NewAppConnMempool(memcli) app.mempoolConn = NewAppConnMempool(memcli)
@ -98,7 +98,7 @@ func (app *multiAppConn) OnStart() error {
return errors.Wrap(err, "Error creating ABCI client (consensus connection)") return errors.Wrap(err, "Error creating ABCI client (consensus connection)")
} }
concli.SetLogger(app.Logger.With("module", "abci-client", "connection", "consensus")) concli.SetLogger(app.Logger.With("module", "abci-client", "connection", "consensus"))
if _, err := concli.Start(); err != nil { if err := concli.Start(); err != nil {
return errors.Wrap(err, "Error starting ABCI client (consensus connection)") return errors.Wrap(err, "Error starting ABCI client (consensus connection)")
} }
app.consensusConn = NewAppConnConsensus(concli) app.consensusConn = NewAppConnConsensus(concli)

View File

@ -27,9 +27,8 @@ func TestHeaderEvents(t *testing.T) {
// start for this test it if it wasn't already running // start for this test it if it wasn't already running
if !c.IsRunning() { if !c.IsRunning() {
// if so, then we start it, listen, and stop it. // if so, then we start it, listen, and stop it.
st, err := c.Start() err := c.Start()
require.Nil(err, "%d: %+v", i, err) require.Nil(err, "%d: %+v", i, err)
require.True(st, "%d", i)
defer c.Stop() defer c.Stop()
} }
@ -48,9 +47,8 @@ func TestBlockEvents(t *testing.T) {
// start for this test it if it wasn't already running // start for this test it if it wasn't already running
if !c.IsRunning() { if !c.IsRunning() {
// if so, then we start it, listen, and stop it. // if so, then we start it, listen, and stop it.
st, err := c.Start() err := c.Start()
require.Nil(err, "%d: %+v", i, err) require.Nil(err, "%d: %+v", i, err)
require.True(st, "%d", i)
defer c.Stop() defer c.Stop()
} }
@ -80,9 +78,8 @@ func TestTxEventsSentWithBroadcastTxAsync(t *testing.T) {
// start for this test it if it wasn't already running // start for this test it if it wasn't already running
if !c.IsRunning() { if !c.IsRunning() {
// if so, then we start it, listen, and stop it. // if so, then we start it, listen, and stop it.
st, err := c.Start() err := c.Start()
require.Nil(err, "%d: %+v", i, err) require.Nil(err, "%d: %+v", i, err)
require.True(st, "%d", i)
defer c.Stop() defer c.Stop()
} }
@ -113,9 +110,8 @@ func TestTxEventsSentWithBroadcastTxSync(t *testing.T) {
// start for this test it if it wasn't already running // start for this test it if it wasn't already running
if !c.IsRunning() { if !c.IsRunning() {
// if so, then we start it, listen, and stop it. // if so, then we start it, listen, and stop it.
st, err := c.Start() err := c.Start()
require.Nil(err, "%d: %+v", i, err) require.Nil(err, "%d: %+v", i, err)
require.True(st, "%d", i)
defer c.Stop() defer c.Stop()
} }

View File

@ -215,26 +215,26 @@ func newWSEvents(remote, endpoint string) *WSEvents {
// Start is the only way I could think the extend OnStart from // Start is the only way I could think the extend OnStart from
// events.eventSwitch. If only it wasn't private... // events.eventSwitch. If only it wasn't private...
// BaseService.Start -> eventSwitch.OnStart -> WSEvents.Start // BaseService.Start -> eventSwitch.OnStart -> WSEvents.Start
func (w *WSEvents) Start() (bool, error) { func (w *WSEvents) Start() error {
ws := rpcclient.NewWSClient(w.remote, w.endpoint, rpcclient.OnReconnect(func() { ws := rpcclient.NewWSClient(w.remote, w.endpoint, rpcclient.OnReconnect(func() {
w.redoSubscriptions() w.redoSubscriptions()
})) }))
started, err := ws.Start() err := ws.Start()
if err == nil { if err == nil {
w.ws = ws w.ws = ws
go w.eventListener() go w.eventListener()
} }
return started, errors.Wrap(err, "StartWSEvent") return err
} }
// Stop wraps the BaseService/eventSwitch actions as Start does // Stop wraps the BaseService/eventSwitch actions as Start does
func (w *WSEvents) Stop() bool { func (w *WSEvents) Stop() error {
// send a message to quit to stop the eventListener // send a message to quit to stop the eventListener
w.quit <- true w.quit <- true
<-w.done <-w.done
w.ws.Stop() w.ws.Stop()
w.ws = nil w.ws = nil
return true return nil
} }
func (w *WSEvents) Subscribe(ctx context.Context, query string, out chan<- interface{}) error { func (w *WSEvents) Subscribe(ctx context.Context, query string, out chan<- interface{}) error {

View File

@ -47,10 +47,10 @@ type WSClient struct {
onReconnect func() onReconnect func()
// internal channels // internal channels
send chan types.RPCRequest // user requests send chan types.RPCRequest // user requests
backlog chan types.RPCRequest // stores a single user request received during a conn failure backlog chan types.RPCRequest // stores a single user request received during a conn failure
reconnectAfter chan error // reconnect requests reconnectAfter chan error // reconnect requests
readRoutineQuit chan struct{} // a way for readRoutine to close writeRoutine readRoutineQuit chan struct{} // a way for readRoutine to close writeRoutine
wg sync.WaitGroup wg sync.WaitGroup
@ -168,12 +168,14 @@ func (c *WSClient) OnStop() {}
// Stop overrides cmn.Service#Stop. There is no other way to wait until Quit // Stop overrides cmn.Service#Stop. There is no other way to wait until Quit
// channel is closed. // channel is closed.
func (c *WSClient) Stop() bool { func (c *WSClient) Stop() error {
success := c.BaseService.Stop() err := c.BaseService.Stop()
// only close user-facing channels when we can't write to them if err == nil {
c.wg.Wait() // only close user-facing channels when we can't write to them
close(c.ResponsesCh) c.wg.Wait()
return success close(c.ResponsesCh)
}
return err
} }
// IsReconnecting returns true if the client is reconnecting right now. // IsReconnecting returns true if the client is reconnecting right now.

View File

@ -196,7 +196,7 @@ func TestNotBlockingOnStop(t *testing.T) {
func startClient(t *testing.T, addr net.Addr) *WSClient { func startClient(t *testing.T, addr net.Addr) *WSClient {
c := NewWSClient(addr.String(), "/websocket") c := NewWSClient(addr.String(), "/websocket")
_, err := c.Start() err := c.Start()
require.Nil(t, err) require.Nil(t, err)
c.SetLogger(log.TestingLogger()) c.SetLogger(log.TestingLogger())
return c return c

View File

@ -278,7 +278,7 @@ func TestServersAndClientsBasic(t *testing.T) {
cl3 := client.NewWSClient(addr, websocketEndpoint) cl3 := client.NewWSClient(addr, websocketEndpoint)
cl3.SetLogger(log.TestingLogger()) cl3.SetLogger(log.TestingLogger())
_, err := cl3.Start() err := cl3.Start()
require.Nil(t, err) require.Nil(t, err)
fmt.Printf("=== testing server on %s using %v client", addr, cl3) fmt.Printf("=== testing server on %s using %v client", addr, cl3)
testWithWSClient(t, cl3) testWithWSClient(t, cl3)
@ -307,7 +307,7 @@ func TestQuotedStringArg(t *testing.T) {
func TestWSNewWSRPCFunc(t *testing.T) { func TestWSNewWSRPCFunc(t *testing.T) {
cl := client.NewWSClient(tcpAddr, websocketEndpoint) cl := client.NewWSClient(tcpAddr, websocketEndpoint)
cl.SetLogger(log.TestingLogger()) cl.SetLogger(log.TestingLogger())
_, err := cl.Start() err := cl.Start()
require.Nil(t, err) require.Nil(t, err)
defer cl.Stop() defer cl.Stop()
@ -332,7 +332,7 @@ func TestWSNewWSRPCFunc(t *testing.T) {
func TestWSHandlesArrayParams(t *testing.T) { func TestWSHandlesArrayParams(t *testing.T) {
cl := client.NewWSClient(tcpAddr, websocketEndpoint) cl := client.NewWSClient(tcpAddr, websocketEndpoint)
cl.SetLogger(log.TestingLogger()) cl.SetLogger(log.TestingLogger())
_, err := cl.Start() err := cl.Start()
require.Nil(t, err) require.Nil(t, err)
defer cl.Stop() defer cl.Stop()
@ -357,7 +357,7 @@ func TestWSHandlesArrayParams(t *testing.T) {
func TestWSClientPingPong(t *testing.T) { func TestWSClientPingPong(t *testing.T) {
cl := client.NewWSClient(tcpAddr, websocketEndpoint) cl := client.NewWSClient(tcpAddr, websocketEndpoint)
cl.SetLogger(log.TestingLogger()) cl.SetLogger(log.TestingLogger())
_, err := cl.Start() err := cl.Start()
require.Nil(t, err) require.Nil(t, err)
defer cl.Stop() defer cl.Stop()

View File

@ -723,7 +723,7 @@ func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Requ
con := NewWSConnection(wsConn, wm.funcMap, wm.wsConnOptions...) con := NewWSConnection(wsConn, wm.funcMap, wm.wsConnOptions...)
con.SetLogger(wm.logger.With("remote", wsConn.RemoteAddr())) con.SetLogger(wm.logger.With("remote", wsConn.RemoteAddr()))
wm.logger.Info("New websocket connection", "remote", con.remoteAddr) wm.logger.Info("New websocket connection", "remote", con.remoteAddr)
_, err = con.Start() // Blocking err = con.Start() // Blocking
if err != nil { if err != nil {
wm.logger.Error("Error starting connection", "err", err) wm.logger.Error("Error starting connection", "err", err)
} }

View File

@ -92,7 +92,7 @@ func GetGRPCClient() core_grpc.BroadcastAPIClient {
// StartTendermint starts a test tendermint server in a go routine and returns when it is initialized // StartTendermint starts a test tendermint server in a go routine and returns when it is initialized
func StartTendermint(app abci.Application) *nm.Node { func StartTendermint(app abci.Application) *nm.Node {
node := NewTendermint(app) node := NewTendermint(app)
_, err := node.Start() err := node.Start()
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@ -25,7 +25,7 @@ var (
func TestApplyBlock(t *testing.T) { func TestApplyBlock(t *testing.T) {
cc := proxy.NewLocalClientCreator(dummy.NewDummyApplication()) cc := proxy.NewLocalClientCreator(dummy.NewDummyApplication())
proxyApp := proxy.NewAppConns(cc, nil) proxyApp := proxy.NewAppConns(cc, nil)
_, err := proxyApp.Start() err := proxyApp.Start()
require.Nil(t, err) require.Nil(t, err)
defer proxyApp.Stop() defer proxyApp.Stop()