abci: localClient improvements & bugfixes & pubsub Unsubscribe issues (#2748)

* use READ lock/unlock in ConsensusState#GetLastHeight

Refs #2721

* do not use defers when there's no need

* fix peer formatting (output its address instead of the pointer)

```
[54310]: E[11-02|11:59:39.851] Connection failed @ sendRoutine              module=p2p peer=0xb78f00 conn=MConn{74.207.236.148:26656} err="pong timeout"
```

https://github.com/tendermint/tendermint/issues/2721#issuecomment-435326581
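The idea, as a minimal self-contained sketch (the `peerConn` type and its fields are hypothetical stand-ins, not the real p2p types): implement `fmt.Stringer` so `%v` prints the remote address instead of a struct pointer.

```go
package main

import (
	"fmt"
	"net"
)

// peerConn is a hypothetical stand-in for the p2p peer type.
type peerConn struct {
	addr net.Addr
}

// String makes %v print the remote address instead of a pointer like 0xb78f00.
func (p *peerConn) String() string {
	return fmt.Sprintf("Peer{%v}", p.addr)
}

func main() {
	p := &peerConn{addr: &net.TCPAddr{IP: net.IPv4(74, 207, 236, 148), Port: 26656}}
	fmt.Printf("Connection failed @ sendRoutine peer=%v\n", p)
	// Output: Connection failed @ sendRoutine peer=Peer{74.207.236.148:26656}
}
```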

* panic if peer has no state

https://github.com/tendermint/tendermint/issues/2721#issuecomment-435347165

It's confusing that sometimes we check if a peer has a state, but most of
the time we expect it to be there

1. add79700b5/mempool/reactor.go (L138)
2. add79700b5/rpc/core/consensus.go (L196)

I will change everything to always assume a peer has a state and panic
otherwise.

That should help identify issues earlier.
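The pattern being applied, as a self-contained sketch with stand-in types (the real ones are p2p.Peer and the consensus PeerState; see the consensus/reactor.go diff below):

```go
package main

import "fmt"

// Stand-ins for p2p.Peer and the consensus PeerState.
type Peer interface {
	Get(key string) interface{}
}

type PeerState struct{ Height int64 }

const PeerStateKey = "ConsensusReactor.peerState"

// peerStateOrPanic asserts that the peer state is present and fails loudly
// if it is not, so the bug surfaces at the point of the mistake.
func peerStateOrPanic(peer Peer) *PeerState {
	ps, ok := peer.Get(PeerStateKey).(*PeerState)
	if !ok {
		panic(fmt.Sprintf("Peer %v has no state", peer))
	}
	return ps
}

type fakePeer struct{ data map[string]interface{} }

func (p fakePeer) Get(key string) interface{} { return p.data[key] }

func main() {
	p := fakePeer{data: map[string]interface{}{PeerStateKey: &PeerState{Height: 10}}}
	fmt.Println(peerStateOrPanic(p).Height) // prints 10; a peer without state panics
}
```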

* abci/localclient: extend lock on app callback

The app callback should be protected by the lock as well (note this was
already done for InitChainAsync; why not for the others?). Otherwise, when we
execute a block, a tx might come in and invoke the callback at the same
time we're updating it in execBlockOnProxyApp => DATA RACE
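A self-contained sketch of the race with simplified types (the real signatures live in abci/client/local_client.go; see the diff below). With the lock extended over the callback invocation, `go run -race` is clean; in the old code, `c.cb` was read after Unlock, racing with the write in SetResponseCallback.

```go
package main

import (
	"fmt"
	"sync"
)

type Callback func(req, res string)

// localClient sketch: cb is shared mutable state between the mempool
// connection (CheckTxAsync) and the consensus connection (SetResponseCallback).
type localClient struct {
	mtx sync.Mutex
	cb  Callback
}

// SetResponseCallback is what execBlockOnProxyApp calls before executing a block.
func (c *localClient) SetResponseCallback(cb Callback) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	c.cb = cb
}

// CheckTxAsync is what an incoming tx triggers. The fix is that the lock now
// also covers reading and invoking c.cb.
func (c *localClient) CheckTxAsync(tx string) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	if c.cb != nil {
		c.cb("CheckTx:"+tx, "code=0")
	}
}

func main() {
	c := &localClient{}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); c.SetResponseCallback(func(req, res string) { fmt.Println(req, res) }) }()
	go func() { defer wg.Done(); c.CheckTxAsync("deadbeef") }()
	wg.Wait() // with the extended lock, `go run -race` reports no race
}
```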

Fixes #2721

Consensus state is locked

```
goroutine 113333 [semacquire, 309 minutes]:
sync.runtime_SemacquireMutex(0xc00180009c, 0xc0000c7e00)
        /usr/local/go/src/runtime/sema.go:71 +0x3d
sync.(*RWMutex).RLock(0xc001800090)
        /usr/local/go/src/sync/rwmutex.go:50 +0x4e
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).GetRoundState(0xc001800000, 0x0)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:218 +0x46
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusReactor).queryMaj23Routine(0xc0017def80, 0x11104a0, 0xc0072488f0, 0xc0072489c0)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/reactor.go:735 +0x16d
created by github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusReactor).AddPeer
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/reactor.go:172 +0x236
```

because localClient is locked

```
goroutine 1899 [semacquire, 309 minutes]:
sync.runtime_SemacquireMutex(0xc00003363c, 0xc0000cb500)
        /usr/local/go/src/runtime/sema.go:71 +0x3d
sync.(*Mutex).Lock(0xc000033638)
        /usr/local/go/src/sync/mutex.go:134 +0xff
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/abci/client.(*localClient).SetResponseCallback(0xc0001fb560, 0xc007868540)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/abci/client/local_client.go:32 +0x33
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/proxy.(*appConnConsensus).SetResponseCallback(0xc00002f750, 0xc007868540)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/proxy/app_conn.go:57 +0x40
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/state.execBlockOnProxyApp(0x1104e20, 0xc002ca0ba0, 0x11092a0, 0xc00002f750, 0xc0001fe960, 0xc000bfc660, 0x110cfe0, 0xc000090330, 0xc9d12, 0xc000d9d5a0, ...)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/state/execution.go:230 +0x1fd
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/state.(*BlockExecutor).ApplyBlock(0xc002c2a230, 0x7, 0x0, 0xc000eae880, 0x6, 0xc002e52c60, 0x16, 0x1f927, 0xc9d12, 0xc000d9d5a0, ...)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/state/execution.go:96 +0x142
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).finalizeCommit(0xc001800000, 0x1f928)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1339 +0xa3e
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).tryFinalizeCommit(0xc001800000, 0x1f928)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1270 +0x451
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).enterCommit.func1(0xc001800000, 0x0, 0x1f928)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1218 +0x90
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).enterCommit(0xc001800000, 0x1f928, 0x0)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1247 +0x6b8
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).addVote(0xc001800000, 0xc003d8dea0, 0xc000cf4cc0, 0x28, 0xf1, 0xc003bc7ad0, 0xc003bc7b10)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1659 +0xbad
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).tryAddVote(0xc001800000, 0xc003d8dea0, 0xc000cf4cc0, 0x28, 0xf1, 0xf1, 0xf1)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1517 +0x59
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).handleMsg(0xc001800000, 0xd98200, 0xc0070dbed0, 0xc000cf4cc0, 0x28)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:660 +0x64b
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).receiveRoutine(0xc001800000, 0x0)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:617 +0x670
created by github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).OnStart
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:311 +0x132
```

a tx comes in and CheckTx is executed right as we execute the block

```
goroutine 111044 [semacquire, 309 minutes]:
sync.runtime_SemacquireMutex(0xc00003363c, 0x0)
        /usr/local/go/src/runtime/sema.go:71 +0x3d
sync.(*Mutex).Lock(0xc000033638)
        /usr/local/go/src/sync/mutex.go:134 +0xff
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/abci/client.(*localClient).CheckTxAsync(0xc0001fb0e0, 0xc002d94500, 0x13f, 0x280, 0x0)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/abci/client/local_client.go:85 +0x47
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/proxy.(*appConnMempool).CheckTxAsync(0xc00002f720, 0xc002d94500, 0x13f, 0x280, 0x1)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/proxy/app_conn.go:114 +0x51
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/mempool.(*Mempool).CheckTx(0xc002d3a320, 0xc002d94500, 0x13f, 0x280, 0xc0072355f0, 0x0, 0x0)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/mempool/mempool.go:316 +0x17b
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/core.BroadcastTxSync(0xc002d94500, 0x13f, 0x280, 0x0, 0x0, 0x0)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/core/mempool.go:93 +0xb8
reflect.Value.call(0xd85560, 0x10326c0, 0x13, 0xec7b8b, 0x4, 0xc00663f180, 0x1, 0x1, 0xc00663f180, 0xc00663f188, ...)
        /usr/local/go/src/reflect/value.go:447 +0x449
reflect.Value.Call(0xd85560, 0x10326c0, 0x13, 0xc00663f180, 0x1, 0x1, 0x0, 0x0, 0xc005cc9344)
        /usr/local/go/src/reflect/value.go:308 +0xa4
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server.makeHTTPHandler.func2(0x1102060, 0xc00663f100, 0xc0082d7900)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server/handlers.go:269 +0x188
net/http.HandlerFunc.ServeHTTP(0xc002c81f20, 0x1102060, 0xc00663f100, 0xc0082d7900)
        /usr/local/go/src/net/http/server.go:1964 +0x44
net/http.(*ServeMux).ServeHTTP(0xc002c81b60, 0x1102060, 0xc00663f100, 0xc0082d7900)
        /usr/local/go/src/net/http/server.go:2361 +0x127
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server.maxBytesHandler.ServeHTTP(0x10f8a40, 0xc002c81b60, 0xf4240, 0x1102060, 0xc00663f100, 0xc0082d7900)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server/http_server.go:219 +0xcf
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server.RecoverAndLogHandler.func1(0x1103220, 0xc00121e620, 0xc0082d7900)
        /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server/http_server.go:192 +0x394
net/http.HandlerFunc.ServeHTTP(0xc002c06ea0, 0x1103220, 0xc00121e620, 0xc0082d7900)
        /usr/local/go/src/net/http/server.go:1964 +0x44
net/http.serverHandler.ServeHTTP(0xc001a1aa90, 0x1103220, 0xc00121e620, 0xc0082d7900)
        /usr/local/go/src/net/http/server.go:2741 +0xab
net/http.(*conn).serve(0xc00785a3c0, 0x11041a0, 0xc000f844c0)
        /usr/local/go/src/net/http/server.go:1847 +0x646
created by net/http.(*Server).Serve
        /usr/local/go/src/net/http/server.go:2851 +0x2f5
```

* consensus: use read lock in Receive#VoteMessage

* use defer to unlock mutex because application might panic
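Why defer matters here, as a runnable sketch: if the application panics mid-call, a trailing Unlock never executes and the mutex stays held forever even if someone recovers upstream; a deferred Unlock runs during unwinding, so the next caller is not deadlocked (this is what the changelog entry below refers to).

```go
package main

import (
	"fmt"
	"sync"
)

var mtx sync.Mutex

// callApp holds the mutex around the application call. Because the unlock is
// deferred, it runs even when app() panics (e.g. on a malicious tx).
func callApp(app func()) {
	mtx.Lock()
	defer mtx.Unlock()
	app()
}

func main() {
	func() {
		defer func() { fmt.Println("recovered:", recover()) }()
		callApp(func() { panic("malicious tx") })
	}()

	// The deferred Unlock ran during panic unwinding, so the mutex is free
	// and the next caller does not deadlock.
	callApp(func() { fmt.Println("next call still works") })
}
```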

* use defer in every method of the localClient

* add a changelog entry

* drain channels before Unsubscribe(All)

Read 55362ed766/libs/pubsub/pubsub.go (L13)
for a detailed explanation of the issue.

We'll need to fix it someday. Make sure to keep an eye on
https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-033-pubsub.md
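The workaround in runnable form (the drain loop is the same one added throughout this commit; `out` stands in for the subscription channel): empty the channel non-blockingly before unsubscribing, because the pubsub goroutine may be blocked sending to it while holding the mutex Unsubscribe needs.

```go
package main

import "fmt"

func main() {
	out := make(chan string, 2)
	out <- "event1"
	out <- "event2"

	// drain out to make sure we don't block
LOOP:
	for {
		select {
		case msg := <-out:
			fmt.Println("drained:", msg)
		default:
			break LOOP
		}
	}
	// ...now it is safe to call s.UnsubscribeAll(ctx, sub).
}
```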

* retry instead of panic when a peer has no state in reactors other than consensus

in the /dump_consensus_state RPC endpoint, skip a peer with no state
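A runnable sketch of the retry (getPeerState here simulates peer.Get(types.PeerStateKey) returning nil until the consensus reactor's AddPeer has run; the constant mirrors the one used in the mempool reactor):

```go
package main

import (
	"fmt"
	"time"
)

const peerCatchupSleepIntervalMS = 100

// getPeerState simulates peer.Get(types.PeerStateKey): it returns nil until
// the consensus reactor has attached a state to the peer.
func getPeerState(attempt int) interface{} {
	if attempt < 3 {
		return nil
	}
	return struct{ Height int64 }{Height: 10}
}

func main() {
	for attempt := 0; ; attempt++ {
		ps := getPeerState(attempt)
		if ps == nil {
			// Reactors are added in map-iteration order, so this reactor may
			// run before the consensus reactor sets the peer state. Sleep a
			// few milliseconds and retry instead of panicking.
			time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
			continue
		}
		fmt.Println("peer state ready:", ps)
		break
	}
}
```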

* rpc/core/mempool: simplify error messages

* rpc/core/mempool: use time.After instead of timer

also, do not log DeliverTx result (to be consistent with other methods)
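The difference in a runnable sketch: for a one-shot wait inside a select, time.After(d) is equivalent to time.NewTimer(d).C without having to keep the *Timer around (the 10s value mirrors the new deliverTxTimeout in the diff below).

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	deliverTxResCh := make(chan string, 1)
	go func() {
		time.Sleep(50 * time.Millisecond) // pretend the tx lands in a block
		deliverTxResCh <- "tx committed"
	}()

	deliverTxTimeout := 10 * time.Second // TODO in the diff: make configurable
	select {
	case res := <-deliverTxResCh:
		fmt.Println(res)
	case <-time.After(deliverTxTimeout):
		// fires only if nothing arrives within the timeout
		fmt.Println("timed out waiting for tx to be included in a block")
	}
}
```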

* unlock before calling the callback in reqRes#SetCallback
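Why unlock first, as a runnable sketch mirroring the fixed ReqRes#SetCallback (simplified types): the callback is arbitrary user code and may grab the same mutex, so invoking it while still holding the lock can deadlock.

```go
package main

import (
	"fmt"
	"sync"
)

type reqRes struct {
	mtx      sync.Mutex
	done     bool
	response string
	cb       func(string)
}

// SetCallback mirrors the fix: if the response already arrived, release the
// mutex *before* invoking the callback; otherwise store it under the lock.
func (r *reqRes) SetCallback(cb func(string)) {
	r.mtx.Lock()

	if r.done {
		r.mtx.Unlock() // unlock first, then call out
		cb(r.response)
		return
	}

	defer r.mtx.Unlock()
	r.cb = cb
}

func main() {
	r := &reqRes{done: true, response: "res"}
	r.SetCallback(func(res string) {
		// This would deadlock if SetCallback still held r.mtx.
		r.mtx.Lock()
		defer r.mtx.Unlock()
		fmt.Println("callback got", res)
	})
}
```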
Anton Kaliaev, 2018-11-13 20:32:51 +04:00 (committed by Ethan Buchman)
parent fb10209a96, commit 5a6822c8ac
18 changed files with 217 additions and 96 deletions

CHANGELOG_PENDING.md

```
@@ -26,3 +26,6 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
 ### BUG FIXES:
+- [abci] unlock mutex in localClient so even when app panics (e.g. during CheckTx), consensus continue working
+- [abci] fix DATA RACE in localClient
+- [rpc] drain channel before calling Unsubscribe(All) in /broadcast_tx_commit
```

abci/client/client.go

```
@@ -105,8 +105,8 @@ func (reqRes *ReqRes) SetCallback(cb func(res *types.Response)) {
 		return
 	}
+	defer reqRes.mtx.Unlock()
 	reqRes.cb = cb
-	reqRes.mtx.Unlock()
 }
 
 func (reqRes *ReqRes) GetCallback() func(*types.Response) {
```

abci/client/grpc_client.go

```
@@ -111,8 +111,8 @@ func (cli *grpcClient) Error() error {
 // NOTE: callback may get internally generated flush responses.
 func (cli *grpcClient) SetResponseCallback(resCb Callback) {
 	cli.mtx.Lock()
+	defer cli.mtx.Unlock()
 	cli.resCb = resCb
-	cli.mtx.Unlock()
 }
 
 //----------------------------------------
```

abci/client/local_client.go

```
@@ -9,8 +9,13 @@ import (
 
 var _ Client = (*localClient)(nil)
 
+// NOTE: use defer to unlock mutex because Application might panic (e.g., in
+// case of malicious tx or query). It only makes sense for publicly exposed
+// methods like CheckTx (/broadcast_tx_* RPC endpoint) or Query (/abci_query
+// RPC endpoint), but defers are used everywhere for the sake of consistency.
 type localClient struct {
 	cmn.BaseService
+
 	mtx *sync.Mutex
 	types.Application
 	Callback
@@ -30,8 +35,8 @@ func NewLocalClient(mtx *sync.Mutex, app types.Application) *localClient {
 
 func (app *localClient) SetResponseCallback(cb Callback) {
 	app.mtx.Lock()
+	defer app.mtx.Unlock()
 	app.Callback = cb
-	app.mtx.Unlock()
 }
 
 // TODO: change types.Application to include Error()?
@@ -45,6 +50,9 @@ func (app *localClient) FlushAsync() *ReqRes {
 }
 
 func (app *localClient) EchoAsync(msg string) *ReqRes {
+	app.mtx.Lock()
+	defer app.mtx.Unlock()
+
 	return app.callback(
 		types.ToRequestEcho(msg),
 		types.ToResponseEcho(msg),
@@ -53,8 +61,9 @@ func (app *localClient) EchoAsync(msg string) *ReqRes {
 
 func (app *localClient) InfoAsync(req types.RequestInfo) *ReqRes {
 	app.mtx.Lock()
+	defer app.mtx.Unlock()
+
 	res := app.Application.Info(req)
-	app.mtx.Unlock()
 	return app.callback(
 		types.ToRequestInfo(req),
 		types.ToResponseInfo(res),
@@ -63,8 +72,9 @@ func (app *localClient) InfoAsync(req types.RequestInfo) *ReqRes {
 
 func (app *localClient) SetOptionAsync(req types.RequestSetOption) *ReqRes {
 	app.mtx.Lock()
+	defer app.mtx.Unlock()
+
 	res := app.Application.SetOption(req)
-	app.mtx.Unlock()
 	return app.callback(
 		types.ToRequestSetOption(req),
 		types.ToResponseSetOption(res),
@@ -73,8 +83,9 @@ func (app *localClient) SetOptionAsync(req types.RequestSetOption) *ReqRes {
 
 func (app *localClient) DeliverTxAsync(tx []byte) *ReqRes {
 	app.mtx.Lock()
+	defer app.mtx.Unlock()
+
 	res := app.Application.DeliverTx(tx)
-	app.mtx.Unlock()
 	return app.callback(
 		types.ToRequestDeliverTx(tx),
 		types.ToResponseDeliverTx(res),
@@ -83,8 +94,9 @@ func (app *localClient) DeliverTxAsync(tx []byte) *ReqRes {
 
 func (app *localClient) CheckTxAsync(tx []byte) *ReqRes {
 	app.mtx.Lock()
+	defer app.mtx.Unlock()
+
 	res := app.Application.CheckTx(tx)
-	app.mtx.Unlock()
 	return app.callback(
 		types.ToRequestCheckTx(tx),
 		types.ToResponseCheckTx(res),
@@ -93,8 +105,9 @@ func (app *localClient) CheckTxAsync(tx []byte) *ReqRes {
 
 func (app *localClient) QueryAsync(req types.RequestQuery) *ReqRes {
 	app.mtx.Lock()
+	defer app.mtx.Unlock()
+
 	res := app.Application.Query(req)
-	app.mtx.Unlock()
 	return app.callback(
 		types.ToRequestQuery(req),
 		types.ToResponseQuery(res),
@@ -103,8 +116,9 @@ func (app *localClient) QueryAsync(req types.RequestQuery) *ReqRes {
 
 func (app *localClient) CommitAsync() *ReqRes {
 	app.mtx.Lock()
+	defer app.mtx.Unlock()
+
 	res := app.Application.Commit()
-	app.mtx.Unlock()
 	return app.callback(
 		types.ToRequestCommit(),
 		types.ToResponseCommit(res),
@@ -113,19 +127,20 @@ func (app *localClient) CommitAsync() *ReqRes {
 
 func (app *localClient) InitChainAsync(req types.RequestInitChain) *ReqRes {
 	app.mtx.Lock()
+	defer app.mtx.Unlock()
+
 	res := app.Application.InitChain(req)
-	reqRes := app.callback(
+	return app.callback(
 		types.ToRequestInitChain(req),
 		types.ToResponseInitChain(res),
 	)
-	app.mtx.Unlock()
-	return reqRes
 }
 
 func (app *localClient) BeginBlockAsync(req types.RequestBeginBlock) *ReqRes {
 	app.mtx.Lock()
+	defer app.mtx.Unlock()
+
 	res := app.Application.BeginBlock(req)
-	app.mtx.Unlock()
 	return app.callback(
 		types.ToRequestBeginBlock(req),
 		types.ToResponseBeginBlock(res),
@@ -134,8 +149,9 @@ func (app *localClient) BeginBlockAsync(req types.RequestBeginBlock) *ReqRes {
 
 func (app *localClient) EndBlockAsync(req types.RequestEndBlock) *ReqRes {
 	app.mtx.Lock()
+	defer app.mtx.Unlock()
+
 	res := app.Application.EndBlock(req)
-	app.mtx.Unlock()
 	return app.callback(
 		types.ToRequestEndBlock(req),
 		types.ToResponseEndBlock(res),
@@ -154,64 +170,73 @@ func (app *localClient) EchoSync(msg string) (*types.ResponseEcho, error) {
 
 func (app *localClient) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) {
 	app.mtx.Lock()
+	defer app.mtx.Unlock()
+
 	res := app.Application.Info(req)
-	app.mtx.Unlock()
 	return &res, nil
 }
 
 func (app *localClient) SetOptionSync(req types.RequestSetOption) (*types.ResponseSetOption, error) {
 	app.mtx.Lock()
+	defer app.mtx.Unlock()
+
 	res := app.Application.SetOption(req)
-	app.mtx.Unlock()
 	return &res, nil
 }
 
 func (app *localClient) DeliverTxSync(tx []byte) (*types.ResponseDeliverTx, error) {
 	app.mtx.Lock()
+	defer app.mtx.Unlock()
+
 	res := app.Application.DeliverTx(tx)
-	app.mtx.Unlock()
 	return &res, nil
 }
 
 func (app *localClient) CheckTxSync(tx []byte) (*types.ResponseCheckTx, error) {
 	app.mtx.Lock()
+	defer app.mtx.Unlock()
+
 	res := app.Application.CheckTx(tx)
-	app.mtx.Unlock()
 	return &res, nil
 }
 
 func (app *localClient) QuerySync(req types.RequestQuery) (*types.ResponseQuery, error) {
 	app.mtx.Lock()
+	defer app.mtx.Unlock()
+
 	res := app.Application.Query(req)
-	app.mtx.Unlock()
 	return &res, nil
 }
 
 func (app *localClient) CommitSync() (*types.ResponseCommit, error) {
 	app.mtx.Lock()
+	defer app.mtx.Unlock()
+
 	res := app.Application.Commit()
-	app.mtx.Unlock()
 	return &res, nil
 }
 
 func (app *localClient) InitChainSync(req types.RequestInitChain) (*types.ResponseInitChain, error) {
 	app.mtx.Lock()
+	defer app.mtx.Unlock()
+
 	res := app.Application.InitChain(req)
-	app.mtx.Unlock()
 	return &res, nil
 }
 
 func (app *localClient) BeginBlockSync(req types.RequestBeginBlock) (*types.ResponseBeginBlock, error) {
 	app.mtx.Lock()
+	defer app.mtx.Unlock()
+
 	res := app.Application.BeginBlock(req)
-	app.mtx.Unlock()
 	return &res, nil
 }
 
 func (app *localClient) EndBlockSync(req types.RequestEndBlock) (*types.ResponseEndBlock, error) {
 	app.mtx.Lock()
+	defer app.mtx.Unlock()
+
 	res := app.Application.EndBlock(req)
-	app.mtx.Unlock()
 	return &res, nil
 }
```

abci/client/socket_client.go

```
@@ -118,8 +118,8 @@ func (cli *socketClient) Error() error {
 // NOTE: callback may get internally generated flush responses.
 func (cli *socketClient) SetResponseCallback(resCb Callback) {
 	cli.mtx.Lock()
+	defer cli.mtx.Unlock()
 	cli.resCb = resCb
-	cli.mtx.Unlock()
 }
 
 //----------------------------------------
```

consensus/reactor.go

```
@@ -183,7 +183,11 @@ func (conR *ConsensusReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
 		return
 	}
 	// TODO
-	//peer.Get(PeerStateKey).(*PeerState).Disconnect()
+	// ps, ok := peer.Get(PeerStateKey).(*PeerState)
+	// if !ok {
+	// 	panic(fmt.Sprintf("Peer %v has no state", peer))
+	// }
+	// ps.Disconnect()
 }
 
 // Receive implements Reactor
@@ -214,7 +218,10 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
 	conR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
 
 	// Get peer states
-	ps := src.Get(types.PeerStateKey).(*PeerState)
+	ps, ok := src.Get(types.PeerStateKey).(*PeerState)
+	if !ok {
+		panic(fmt.Sprintf("Peer %v has no state", src))
+	}
 
 	switch chID {
 	case StateChannel:
@@ -293,9 +300,9 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
 		switch msg := msg.(type) {
 		case *VoteMessage:
 			cs := conR.conS
-			cs.mtx.Lock()
+			cs.mtx.RLock()
 			height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size()
-			cs.mtx.Unlock()
+			cs.mtx.RUnlock()
 			ps.EnsureVoteBitArrays(height, valSize)
 			ps.EnsureVoteBitArrays(height-1, lastCommitSize)
 			ps.SetHasVote(msg.Vote)
@@ -428,7 +435,10 @@ func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote) {
 	/*
 		// TODO: Make this broadcast more selective.
 		for _, peer := range conR.Switch.Peers().List() {
-			ps := peer.Get(PeerStateKey).(*PeerState)
+			ps, ok := peer.Get(PeerStateKey).(*PeerState)
+			if !ok {
+				panic(fmt.Sprintf("Peer %v has no state", peer))
+			}
 			prs := ps.GetRoundState()
 			if prs.Height == vote.Height {
 				// TODO: Also filter on round?
@@ -826,7 +836,10 @@ func (conR *ConsensusReactor) peerStatsRoutine() {
 				continue
 			}
 			// Get peer state
-			ps := peer.Get(types.PeerStateKey).(*PeerState)
+			ps, ok := peer.Get(types.PeerStateKey).(*PeerState)
+			if !ok {
+				panic(fmt.Sprintf("Peer %v has no state", peer))
+			}
 
 			switch msg.Msg.(type) {
 			case *VoteMessage:
 				if numVotes := ps.RecordVote(); numVotes%votesToContributeToBecomeGoodPeer == 0 {
@@ -859,7 +872,10 @@ func (conR *ConsensusReactor) StringIndented(indent string) string {
 	s := "ConsensusReactor{\n"
 	s += indent + "  " + conR.conS.StringIndented(indent+"  ") + "\n"
 	for _, peer := range conR.Switch.Peers().List() {
-		ps := peer.Get(types.PeerStateKey).(*PeerState)
+		ps, ok := peer.Get(types.PeerStateKey).(*PeerState)
+		if !ok {
+			panic(fmt.Sprintf("Peer %v has no state", peer))
+		}
 		s += indent + "  " + ps.StringIndented(indent+"  ") + "\n"
 	}
 	s += indent + "}"
```

consensus/replay_file.go

```
@@ -58,7 +58,18 @@ func (cs *ConsensusState) ReplayFile(file string, console bool) error {
 	if err != nil {
 		return errors.Errorf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep)
 	}
-	defer cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep)
+	defer func() {
+		// drain newStepCh to make sure we don't block
+	LOOP:
+		for {
+			select {
+			case <-newStepCh:
+			default:
+				break LOOP
+			}
+		}
+		cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep)
+	}()
 
 	// just open the file for reading, no need to use wal
 	fp, err := os.OpenFile(file, os.O_RDONLY, 0600)
@@ -221,7 +232,18 @@ func (pb *playback) replayConsoleLoop() int {
 			if err != nil {
 				cmn.Exit(fmt.Sprintf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep))
 			}
-			defer pb.cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep)
+			defer func() {
+				// drain newStepCh to make sure we don't block
+			LOOP:
+				for {
+					select {
+					case <-newStepCh:
+					default:
+						break LOOP
+					}
+				}
+				pb.cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep)
+			}()
 
 			if len(tokens) == 1 {
 				if err := pb.replayReset(1, newStepCh); err != nil {
```

consensus/state.go

```
@@ -207,18 +207,16 @@ func (cs *ConsensusState) GetState() sm.State {
 // GetLastHeight returns the last height committed.
 // If there were no blocks, returns 0.
 func (cs *ConsensusState) GetLastHeight() int64 {
-	cs.mtx.Lock()
-	defer cs.mtx.Unlock()
+	cs.mtx.RLock()
+	defer cs.mtx.RUnlock()
 	return cs.RoundState.Height - 1
 }
 
 // GetRoundState returns a shallow copy of the internal consensus state.
 func (cs *ConsensusState) GetRoundState() *cstypes.RoundState {
 	cs.mtx.RLock()
+	defer cs.mtx.RUnlock()
 	rs := cs.RoundState // copy
-	cs.mtx.RUnlock()
 	return &rs
 }
@@ -226,7 +224,6 @@ func (cs *ConsensusState) GetRoundState() *cstypes.RoundState {
 func (cs *ConsensusState) GetRoundStateJSON() ([]byte, error) {
 	cs.mtx.RLock()
 	defer cs.mtx.RUnlock()
-
 	return cdc.MarshalJSON(cs.RoundState)
 }
@@ -234,7 +231,6 @@ func (cs *ConsensusState) GetRoundStateJSON() ([]byte, error) {
 func (cs *ConsensusState) GetRoundStateSimpleJSON() ([]byte, error) {
 	cs.mtx.RLock()
 	defer cs.mtx.RUnlock()
-
 	return cdc.MarshalJSON(cs.RoundState.RoundStateSimple())
 }
@@ -248,15 +244,15 @@ func (cs *ConsensusState) GetValidators() (int64, []*types.Validator) {
 
 // SetPrivValidator sets the private validator account for signing votes.
 func (cs *ConsensusState) SetPrivValidator(priv types.PrivValidator) {
 	cs.mtx.Lock()
+	defer cs.mtx.Unlock()
 	cs.privValidator = priv
-	cs.mtx.Unlock()
 }
 
 // SetTimeoutTicker sets the local timer. It may be useful to overwrite for testing.
 func (cs *ConsensusState) SetTimeoutTicker(timeoutTicker TimeoutTicker) {
 	cs.mtx.Lock()
+	defer cs.mtx.Unlock()
 	cs.timeoutTicker = timeoutTicker
-	cs.mtx.Unlock()
 }
 
 // LoadCommit loads the commit for a given height.
```

evidence/reactor.go

```
@@ -160,12 +160,15 @@ func (evR *EvidenceReactor) broadcastEvidenceRoutine(peer p2p.Peer) {
 // Returns the message to send the peer, or nil if the evidence is invalid for the peer.
 // If message is nil, return true if we should sleep and try again.
 func (evR EvidenceReactor) checkSendEvidenceMessage(peer p2p.Peer, ev types.Evidence) (msg EvidenceMessage, retry bool) {
-
 	// make sure the peer is up to date
 	evHeight := ev.Height()
 	peerState, ok := peer.Get(types.PeerStateKey).(PeerState)
 	if !ok {
-		evR.Logger.Info("Found peer without PeerState", "peer", peer)
+		// Peer does not have a state yet. We set it in the consensus reactor, but
+		// when we add peer in Switch, the order we call reactors#AddPeer is
+		// different every time due to us using a map. Sometimes other reactors
+		// will be initialized before the consensus reactor. We should wait a few
+		// milliseconds and retry.
 		return nil, true
 	}
```

evidence/reactor_test.go

```
@@ -165,6 +165,16 @@ func TestReactorSelectiveBroadcast(t *testing.T) {
 	// make reactors from statedb
 	reactors := makeAndConnectEvidenceReactors(config, []dbm.DB{stateDB1, stateDB2})
 
+	// set the peer height on each reactor
+	for _, r := range reactors {
+		for _, peer := range r.Switch.Peers().List() {
+			ps := peerState{height1}
+			peer.Set(types.PeerStateKey, ps)
+		}
+	}
+
+	// update the first reactor peer's height to be very small
 	peer := reactors[0].Switch.Peers().List()[0]
 	ps := peerState{height2}
 	peer.Set(types.PeerStateKey, ps)
```

libs/pubsub/pubsub.go

```
@@ -30,9 +30,15 @@
 //
 //	s.Subscribe(ctx, sub, qry, out)
 //	defer func() {
-//		for range out {
-//			// drain out to make sure we don't block
-//		}
+//		// drain out to make sure we don't block
+//	LOOP:
+//		for {
+//			select {
+//			case <-out:
+//			default:
+//				break LOOP
+//			}
+//		}
 //		s.UnsubscribeAll(ctx, sub)
 //	}()
 //	for msg := range out {
```

mempool/mempool.go

```
@@ -300,6 +300,7 @@ func (mem *Mempool) TxsWaitChan() <-chan struct{} {
 // CONTRACT: Either cb will get called, or err returned.
 func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
 	mem.proxyMtx.Lock()
+	// use defer to unlock mutex because application (*local client*) might panic
 	defer mem.proxyMtx.Unlock()
 
 	if mem.Size() >= mem.config.Size {
```

mempool/reactor.go

```
@@ -133,16 +133,23 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) {
 		}
 		memTx := next.Value.(*mempoolTx)
 		// make sure the peer is up to date
-		height := memTx.Height()
-		if peerState_i := peer.Get(types.PeerStateKey); peerState_i != nil {
-			peerState := peerState_i.(PeerState)
-			peerHeight := peerState.GetHeight()
-			if peerHeight < height-1 { // Allow for a lag of 1 block
-				time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
-				continue
-			}
+		peerState, ok := peer.Get(types.PeerStateKey).(PeerState)
+		if !ok {
+			// Peer does not have a state yet. We set it in the consensus reactor, but
+			// when we add peer in Switch, the order we call reactors#AddPeer is
+			// different every time due to us using a map. Sometimes other reactors
+			// will be initialized before the consensus reactor. We should wait a few
+			// milliseconds and retry.
+			time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
+			continue
 		}
+		if peerState.GetHeight() < memTx.Height()-1 { // Allow for a lag of 1 block
+			time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
+			continue
+		}
+
 		// send memTx
 		msg := &TxMessage{Tx: memTx.tx}
 		success := peer.Send(MempoolChannel, cdc.MustMarshalBinaryBare(msg))
```

mempool/reactor_test.go

```
@@ -21,6 +21,14 @@ import (
 	"github.com/tendermint/tendermint/types"
 )
 
+type peerState struct {
+	height int64
+}
+
+func (ps peerState) GetHeight() int64 {
+	return ps.height
+}
+
 // mempoolLogger is a TestingLogger which uses a different
 // color for each validator ("validator" key must exist).
 func mempoolLogger() log.Logger {
@@ -107,6 +115,11 @@ func TestReactorBroadcastTxMessage(t *testing.T) {
 			r.Stop()
 		}
 	}()
+	for _, r := range reactors {
+		for _, peer := range r.Switch.Peers().List() {
+			peer.Set(types.PeerStateKey, peerState{1})
+		}
+	}
 
 	// send a bunch of txs to the first reactor's mempool
 	// and wait for them all to be received in the others
```

p2p/pex/addrbook.go

```
@@ -162,10 +162,10 @@ func (a *addrBook) FilePath() string {
 // AddOurAddress one of our addresses.
 func (a *addrBook) AddOurAddress(addr *p2p.NetAddress) {
+	a.mtx.Lock()
+	defer a.mtx.Unlock()
 	a.Logger.Info("Add our address to book", "addr", addr)
-	a.mtx.Lock()
 	a.ourAddrs[addr.String()] = struct{}{}
-	a.mtx.Unlock()
 }
 
 // OurAddress returns true if it is our address.
@@ -178,10 +178,10 @@ func (a *addrBook) OurAddress(addr *p2p.NetAddress) bool {
 func (a *addrBook) AddPrivateIDs(IDs []string) {
 	a.mtx.Lock()
+	defer a.mtx.Unlock()
 	for _, id := range IDs {
 		a.privateIDs[p2p.ID(id)] = struct{}{}
 	}
-	a.mtx.Unlock()
 }
 
 // AddAddress implements AddrBook
@@ -202,7 +202,7 @@ func (a *addrBook) RemoveAddress(addr *p2p.NetAddress) {
 	if ka == nil {
 		return
 	}
-	a.Logger.Info("Remove address from book", "addr", ka.Addr, "ID", ka.ID())
+	a.Logger.Info("Remove address from book", "addr", addr)
 	a.removeFromAllBuckets(ka)
 }
 
@@ -217,8 +217,8 @@ func (a *addrBook) IsGood(addr *p2p.NetAddress) bool {
 // HasAddress returns true if the address is in the book.
 func (a *addrBook) HasAddress(addr *p2p.NetAddress) bool {
 	a.mtx.Lock()
+	defer a.mtx.Unlock()
 	ka := a.addrLookup[addr.ID]
-	a.mtx.Unlock()
 	return ka != nil
 }
 
@@ -461,13 +461,12 @@ ADDRS_LOOP:
 // ListOfKnownAddresses returns the new and old addresses.
 func (a *addrBook) ListOfKnownAddresses() []*knownAddress {
+	a.mtx.Lock()
+	defer a.mtx.Unlock()
 	addrs := []*knownAddress{}
-	a.mtx.Lock()
 	for _, addr := range a.addrLookup {
		addrs = append(addrs, addr.copy())
 	}
-	a.mtx.Unlock()
 	return addrs
 }
```

rpc/client/helpers.go

```
@@ -69,7 +69,18 @@ func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (type
 	}
 
 	// make sure to unregister after the test is over
-	defer c.UnsubscribeAll(ctx, subscriber)
+	defer func() {
+		// drain evts to make sure we don't block
+	LOOP:
+		for {
+			select {
+			case <-evts:
+			default:
+				break LOOP
+			}
+		}
+		c.UnsubscribeAll(ctx, subscriber)
+	}()
 
 	select {
 	case evt := <-evts:
```

rpc/core/consensus.go

```
@@ -193,7 +193,10 @@ func DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) {
 	peers := p2pPeers.Peers().List()
 	peerStates := make([]ctypes.PeerStateInfo, len(peers))
 	for i, peer := range peers {
-		peerState := peer.Get(types.PeerStateKey).(*cm.PeerState)
+		peerState, ok := peer.Get(types.PeerStateKey).(*cm.PeerState)
+		if !ok { // peer does not have a state yet
+			continue
+		}
 		peerStateJSON, err := peerState.ToJSON()
 		if err != nil {
 			return nil, err
```

rpc/core/mempool.go

```
@@ -8,7 +8,6 @@ import (
 	"github.com/pkg/errors"
 
 	abci "github.com/tendermint/tendermint/abci/types"
-	cmn "github.com/tendermint/tendermint/libs/common"
 	ctypes "github.com/tendermint/tendermint/rpc/core/types"
 	"github.com/tendermint/tendermint/types"
 )
@@ -51,7 +50,7 @@ import (
 func BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
 	err := mempool.CheckTx(tx, nil)
 	if err != nil {
-		return nil, fmt.Errorf("Error broadcasting transaction: %v", err)
+		return nil, err
 	}
 	return &ctypes.ResultBroadcastTx{Hash: tx.Hash()}, nil
 }
@@ -94,7 +93,7 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
 		resCh <- res
 	})
 	if err != nil {
-		return nil, fmt.Errorf("Error broadcasting transaction: %v", err)
+		return nil, err
 	}
 	res := <-resCh
 	r := res.GetCheckTx()
@@ -106,8 +105,9 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
 	}, nil
 }
 
-// CONTRACT: only returns error if mempool.BroadcastTx errs (ie. problem with the app)
-// or if we timeout waiting for tx to commit.
+// CONTRACT: only returns error if mempool.CheckTx() errs or if we timeout
+// waiting for tx to commit.
+//
 // If CheckTx or DeliverTx fail, no error will be returned, but the returned result
 // will contain a non-OK ABCI code.
 //
@@ -150,20 +150,31 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
 // |-----------+------+---------+----------+-----------------|
 // | tx        | Tx   | nil     | true     | The transaction |
 func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
-	// subscribe to tx being committed in block
+	// Subscribe to tx being committed in block.
 	ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
 	defer cancel()
-	deliverTxResCh := make(chan interface{})
+	deliverTxResCh := make(chan interface{}, 1)
 	q := types.EventQueryTxFor(tx)
 	err := eventBus.Subscribe(ctx, "mempool", q, deliverTxResCh)
 	if err != nil {
 		err = errors.Wrap(err, "failed to subscribe to tx")
-		logger.Error("Error on broadcastTxCommit", "err", err)
-		return nil, fmt.Errorf("Error on broadcastTxCommit: %v", err)
+		logger.Error("Error on broadcast_tx_commit", "err", err)
+		return nil, err
 	}
-	defer eventBus.Unsubscribe(context.Background(), "mempool", q)
+	defer func() {
+		// drain deliverTxResCh to make sure we don't block
+	LOOP:
+		for {
+			select {
+			case <-deliverTxResCh:
+			default:
+				break LOOP
+			}
+		}
+		eventBus.Unsubscribe(context.Background(), "mempool", q)
+	}()
 
-	// broadcast the tx and register checktx callback
+	// Broadcast tx and wait for CheckTx result
 	checkTxResCh := make(chan *abci.Response, 1)
 	err = mempool.CheckTx(tx, func(res *abci.Response) {
 		checkTxResCh <- res
@@ -172,40 +183,35 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
 		logger.Error("Error on broadcastTxCommit", "err", err)
 		return nil, fmt.Errorf("Error on broadcastTxCommit: %v", err)
 	}
-	checkTxRes := <-checkTxResCh
-	checkTxR := checkTxRes.GetCheckTx()
-	if checkTxR.Code != abci.CodeTypeOK {
-		// CheckTx failed!
+	checkTxResMsg := <-checkTxResCh
+	checkTxRes := checkTxResMsg.GetCheckTx()
+	if checkTxRes.Code != abci.CodeTypeOK {
 		return &ctypes.ResultBroadcastTxCommit{
-			CheckTx:   *checkTxR,
+			CheckTx:   *checkTxRes,
 			DeliverTx: abci.ResponseDeliverTx{},
 			Hash:      tx.Hash(),
 		}, nil
 	}
 
-	// Wait for the tx to be included in a block,
-	// timeout after something reasonable.
-	// TODO: configurable?
-	timer := time.NewTimer(60 * 2 * time.Second)
+	// Wait for the tx to be included in a block or timeout.
+	var deliverTxTimeout = 10 * time.Second // TODO: configurable?
 	select {
-	case deliverTxResMsg := <-deliverTxResCh:
+	case deliverTxResMsg := <-deliverTxResCh: // The tx was included in a block.
 		deliverTxRes := deliverTxResMsg.(types.EventDataTx)
-		// The tx was included in a block.
-		deliverTxR := deliverTxRes.Result
-		logger.Info("DeliverTx passed ", "tx", cmn.HexBytes(tx), "response", deliverTxR)
 		return &ctypes.ResultBroadcastTxCommit{
-			CheckTx:   *checkTxR,
-			DeliverTx: deliverTxR,
+			CheckTx:   *checkTxRes,
+			DeliverTx: deliverTxRes.Result,
 			Hash:      tx.Hash(),
 			Height:    deliverTxRes.Height,
 		}, nil
-	case <-timer.C:
-		logger.Error("failed to include tx")
+	case <-time.After(deliverTxTimeout):
+		err = errors.New("Timed out waiting for tx to be included in a block")
+		logger.Error("Error on broadcastTxCommit", "err", err)
		return &ctypes.ResultBroadcastTxCommit{
-			CheckTx:   *checkTxR,
+			CheckTx:   *checkTxRes,
 			DeliverTx: abci.ResponseDeliverTx{},
 			Hash:      tx.Hash(),
-		}, fmt.Errorf("Timed out waiting for transaction to be included in a block")
+		}, err
 	}
 }
```