diff --git a/mempool/mempool.go b/mempool/mempool.go index 04dbe50a..0cdd1dee 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -159,6 +159,12 @@ func (mem *Mempool) Size() int { return mem.txs.Len() } +// Flushes the mempool connection to ensure async resCb calls are done e.g. +// from CheckTx. +func (mem *Mempool) FlushAppConn() error { + return mem.proxyAppConn.FlushSync() +} + // Flush removes all transactions from the mempool and cache func (mem *Mempool) Flush() { mem.proxyMtx.Lock() @@ -347,9 +353,6 @@ func (mem *Mempool) collectTxs(maxTxs int) types.Txs { // NOTE: this should be called *after* block is committed by consensus. // NOTE: unsafe; Lock/Unlock must be managed by caller func (mem *Mempool) Update(height int64, txs types.Txs) error { - if err := mem.proxyAppConn.FlushSync(); err != nil { // To flush async resCb calls e.g. from CheckTx - return err - } // First, create a lookup map of txns in new txs. txsMap := make(map[string]struct{}) for _, tx := range txs { diff --git a/state/execution.go b/state/execution.go index 921799b8..6eb50f2f 100644 --- a/state/execution.go +++ b/state/execution.go @@ -127,6 +127,14 @@ func (blockExec *BlockExecutor) Commit(block *types.Block) ([]byte, error) { blockExec.mempool.Lock() defer blockExec.mempool.Unlock() + // while mempool is Locked, flush to ensure all async requests have completed + // in the ABCI app before Commit. + err := blockExec.mempool.FlushAppConn() + if err != nil { + blockExec.logger.Error("Client error during mempool.FlushAppConn", "err", err) + return nil, err + } + // Commit block, get hash back res, err := blockExec.proxyApp.CommitSync() if err != nil { diff --git a/types/services.go b/types/services.go index 6900fae7..6b2be8a5 100644 --- a/types/services.go +++ b/types/services.go @@ -27,6 +27,7 @@ type Mempool interface { Reap(int) Txs Update(height int64, txs Txs) error Flush() + FlushAppConn() error TxsAvailable() <-chan int64 EnableTxsAvailable() @@ -44,6 +45,7 @@ func (m MockMempool) CheckTx(tx Tx, cb func(*abci.Response)) error { return nil func (m MockMempool) Reap(n int) Txs { return Txs{} } func (m MockMempool) Update(height int64, txs Txs) error { return nil } func (m MockMempool) Flush() {} +func (m MockMempool) FlushAppConn() error { return nil } func (m MockMempool) TxsAvailable() <-chan int64 { return make(chan int64) } func (m MockMempool) EnableTxsAvailable() {}