Merge pull request #1143 from tendermint/1091-race-condition

call FlushSync before calling CommitSync
This commit is contained in:
Ethan Buchman 2018-01-24 14:22:43 -05:00 committed by GitHub
commit 57cc8ab977
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 16 additions and 3 deletions

View File

@ -159,6 +159,12 @@ func (mem *Mempool) Size() int {
return mem.txs.Len()
}
// Flushes the mempool connection to ensure async resCb calls are done e.g.
// from CheckTx.
func (mem *Mempool) FlushAppConn() error {
return mem.proxyAppConn.FlushSync()
}
// Flush removes all transactions from the mempool and cache
func (mem *Mempool) Flush() {
mem.proxyMtx.Lock()
@ -347,9 +353,6 @@ func (mem *Mempool) collectTxs(maxTxs int) types.Txs {
// NOTE: this should be called *after* block is committed by consensus.
// NOTE: unsafe; Lock/Unlock must be managed by caller
func (mem *Mempool) Update(height int64, txs types.Txs) error {
if err := mem.proxyAppConn.FlushSync(); err != nil { // To flush async resCb calls e.g. from CheckTx
return err
}
// First, create a lookup map of txns in new txs.
txsMap := make(map[string]struct{})
for _, tx := range txs {

View File

@ -127,6 +127,14 @@ func (blockExec *BlockExecutor) Commit(block *types.Block) ([]byte, error) {
blockExec.mempool.Lock()
defer blockExec.mempool.Unlock()
// while mempool is Locked, flush to ensure all async requests have completed
// in the ABCI app before Commit.
err := blockExec.mempool.FlushAppConn()
if err != nil {
blockExec.logger.Error("Client error during mempool.FlushAppConn", "err", err)
return nil, err
}
// Commit block, get hash back
res, err := blockExec.proxyApp.CommitSync()
if err != nil {

View File

@ -27,6 +27,7 @@ type Mempool interface {
Reap(int) Txs
Update(height int64, txs Txs) error
Flush()
FlushAppConn() error
TxsAvailable() <-chan int64
EnableTxsAvailable()
@ -44,6 +45,7 @@ func (m MockMempool) CheckTx(tx Tx, cb func(*abci.Response)) error { return nil
func (m MockMempool) Reap(n int) Txs { return Txs{} }
func (m MockMempool) Update(height int64, txs Txs) error { return nil }
func (m MockMempool) Flush() {}
func (m MockMempool) FlushAppConn() error { return nil }
func (m MockMempool) TxsAvailable() <-chan int64 { return make(chan int64) }
func (m MockMempool) EnableTxsAvailable() {}