From 5f3048bd09112219b1a870c781633dfc74f0df5f Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 23 Jan 2018 16:56:14 +0400 Subject: [PATCH] call FlushSync before calling CommitSync if we call it after, we might receive a "fresh" transaction from `broadcast_tx_sync` before old transactions (which were not committed). Refs #1091 ``` Commit is called with a lock on the mempool, meaning no calls to CheckTx can start. However, since CheckTx is called async in the mempool connection, some CheckTx might have already "sailed", when the lock is released in the mempool and Commit proceeds. Then, that spurious CheckTx has not yet "begun" in the ABCI app (stuck in transport?). Instead, ABCI app manages to start to process the Commit. Next, the spurious, "sailed" CheckTx happens in the wrong place. ``` --- mempool/mempool.go | 9 ++++++--- state/execution.go | 8 ++++++++ types/services.go | 2 ++ 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/mempool/mempool.go b/mempool/mempool.go index 04dbe50a..0cdd1dee 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -159,6 +159,12 @@ func (mem *Mempool) Size() int { return mem.txs.Len() } +// Flushes the mempool connection to ensure async resCb calls are done e.g. +// from CheckTx. +func (mem *Mempool) FlushAppConn() error { + return mem.proxyAppConn.FlushSync() +} + // Flush removes all transactions from the mempool and cache func (mem *Mempool) Flush() { mem.proxyMtx.Lock() @@ -347,9 +353,6 @@ func (mem *Mempool) collectTxs(maxTxs int) types.Txs { // NOTE: this should be called *after* block is committed by consensus. // NOTE: unsafe; Lock/Unlock must be managed by caller func (mem *Mempool) Update(height int64, txs types.Txs) error { - if err := mem.proxyAppConn.FlushSync(); err != nil { // To flush async resCb calls e.g. from CheckTx - return err - } // First, create a lookup map of txns in new txs. txsMap := make(map[string]struct{}) for _, tx := range txs { diff --git a/state/execution.go b/state/execution.go index 921799b8..6eb50f2f 100644 --- a/state/execution.go +++ b/state/execution.go @@ -127,6 +127,14 @@ func (blockExec *BlockExecutor) Commit(block *types.Block) ([]byte, error) { blockExec.mempool.Lock() defer blockExec.mempool.Unlock() + // while mempool is Locked, flush to ensure all async requests have completed + // in the ABCI app before Commit. + err := blockExec.mempool.FlushAppConn() + if err != nil { + blockExec.logger.Error("Client error during mempool.FlushAppConn", "err", err) + return nil, err + } + // Commit block, get hash back res, err := blockExec.proxyApp.CommitSync() if err != nil { diff --git a/types/services.go b/types/services.go index 6900fae7..6b2be8a5 100644 --- a/types/services.go +++ b/types/services.go @@ -27,6 +27,7 @@ type Mempool interface { Reap(int) Txs Update(height int64, txs Txs) error Flush() + FlushAppConn() error TxsAvailable() <-chan int64 EnableTxsAvailable() @@ -44,6 +45,7 @@ func (m MockMempool) CheckTx(tx Tx, cb func(*abci.Response)) error { return nil func (m MockMempool) Reap(n int) Txs { return Txs{} } func (m MockMempool) Update(height int64, txs Txs) error { return nil } func (m MockMempool) Flush() {} +func (m MockMempool) FlushAppConn() error { return nil } func (m MockMempool) TxsAvailable() <-chan int64 { return make(chan int64) } func (m MockMempool) EnableTxsAvailable() {}