Merge pull request #1585 from tendermint/345-mempool-cache
Mempool cache
This commit is contained in:
commit
de8d4325de
|
@ -335,6 +335,7 @@ type MempoolConfig struct {
|
||||||
RecheckEmpty bool `mapstructure:"recheck_empty"`
|
RecheckEmpty bool `mapstructure:"recheck_empty"`
|
||||||
Broadcast bool `mapstructure:"broadcast"`
|
Broadcast bool `mapstructure:"broadcast"`
|
||||||
WalPath string `mapstructure:"wal_dir"`
|
WalPath string `mapstructure:"wal_dir"`
|
||||||
|
Size int `mapstructure:"size"`
|
||||||
CacheSize int `mapstructure:"cache_size"`
|
CacheSize int `mapstructure:"cache_size"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -345,6 +346,7 @@ func DefaultMempoolConfig() *MempoolConfig {
|
||||||
RecheckEmpty: true,
|
RecheckEmpty: true,
|
||||||
Broadcast: true,
|
Broadcast: true,
|
||||||
WalPath: filepath.Join(defaultDataDir, "mempool.wal"),
|
WalPath: filepath.Join(defaultDataDir, "mempool.wal"),
|
||||||
|
Size: 100000,
|
||||||
CacheSize: 100000,
|
CacheSize: 100000,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -179,6 +179,12 @@ recheck_empty = {{ .Mempool.RecheckEmpty }}
|
||||||
broadcast = {{ .Mempool.Broadcast }}
|
broadcast = {{ .Mempool.Broadcast }}
|
||||||
wal_dir = "{{ .Mempool.WalPath }}"
|
wal_dir = "{{ .Mempool.WalPath }}"
|
||||||
|
|
||||||
|
# size of the mempool
|
||||||
|
size = {{ .Mempool.Size }}
|
||||||
|
|
||||||
|
# size of the cache (used to filter transactions we saw earlier)
|
||||||
|
cache_size = {{ .Mempool.CacheSize }}
|
||||||
|
|
||||||
##### consensus configuration options #####
|
##### consensus configuration options #####
|
||||||
[consensus]
|
[consensus]
|
||||||
|
|
||||||
|
|
|
@ -178,21 +178,22 @@ connection, to query the local state of the app.
|
||||||
Mempool Connection
|
Mempool Connection
|
||||||
~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
The mempool connection is used *only* for CheckTx requests. Transactions
|
The mempool connection is used *only* for CheckTx requests.
|
||||||
are run using CheckTx in the same order they were received by the
|
Transactions are run using CheckTx in the same order they were
|
||||||
validator. If the CheckTx returns ``OK``, the transaction is kept in
|
received by the validator. If the CheckTx returns ``OK``, the
|
||||||
memory and relayed to other peers in the same order it was received.
|
transaction is kept in memory and relayed to other peers in the same
|
||||||
Otherwise, it is discarded.
|
order it was received. Otherwise, it is discarded.
|
||||||
|
|
||||||
CheckTx requests run concurrently with block processing; so they should
|
CheckTx requests run concurrently with block processing; so they
|
||||||
run against a copy of the main application state which is reset after
|
should run against a copy of the main application state which is reset
|
||||||
every block. This copy is necessary to track transitions made by a
|
after every block. This copy is necessary to track transitions made by
|
||||||
sequence of CheckTx requests before they are included in a block. When a
|
a sequence of CheckTx requests before they are included in a block.
|
||||||
block is committed, the application must ensure to reset the mempool
|
When a block is committed, the application must ensure to reset the
|
||||||
state to the latest committed state. Tendermint Core will then filter
|
mempool state to the latest committed state. Tendermint Core will then
|
||||||
through all transactions in the mempool, removing any that were included
|
filter through all transactions in the mempool, removing any that were
|
||||||
in the block, and re-run the rest using CheckTx against the post-Commit
|
included in the block, and re-run the rest using CheckTx against the
|
||||||
mempool state.
|
post-Commit mempool state (this behaviour can be turned off with
|
||||||
|
``[mempool] recheck = false``).
|
||||||
|
|
||||||
.. container:: toggle
|
.. container:: toggle
|
||||||
|
|
||||||
|
@ -226,6 +227,23 @@ mempool state.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Replay Protection
|
||||||
|
^^^^^^^^^^^^^^^^^
|
||||||
|
To prevent old transactions from being replayed, CheckTx must
|
||||||
|
implement replay protection.
|
||||||
|
|
||||||
|
Tendermint provides the first defence layer by keeping a lightweight
|
||||||
|
in-memory cache of 100k (``[mempool] cache_size``) last transactions in
|
||||||
|
the mempool. If Tendermint is just started or the clients sent more
|
||||||
|
than 100k transactions, old transactions may be sent to the
|
||||||
|
application. So it is important CheckTx implements some logic to
|
||||||
|
handle them.
|
||||||
|
|
||||||
|
There are cases where a transaction will (or may) become valid in some
|
||||||
|
future state, in which case you probably want to disable Tendermint's
|
||||||
|
cache. You can do that by setting ``[mempool] cache_size = 0`` in the
|
||||||
|
config.
|
||||||
|
|
||||||
Consensus Connection
|
Consensus Connection
|
||||||
~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
|
|
|
@ -136,6 +136,12 @@ like the file below, however, double check by inspecting the
|
||||||
broadcast = true
|
broadcast = true
|
||||||
wal_dir = "data/mempool.wal"
|
wal_dir = "data/mempool.wal"
|
||||||
|
|
||||||
|
# size of the mempool
|
||||||
|
size = 100000
|
||||||
|
|
||||||
|
# size of the cache (used to filter transactions we saw earlier)
|
||||||
|
cache_size = 100000
|
||||||
|
|
||||||
##### consensus configuration options #####
|
##### consensus configuration options #####
|
||||||
[consensus]
|
[consensus]
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,27 @@
|
||||||
|
Transactional Semantics
|
||||||
|
=======================
|
||||||
|
|
||||||
|
In `"Using
|
||||||
|
Tendermint"<./specification/using-tendermint.html#broadcast-api>`__ we
|
||||||
|
discussed different API endpoints for sending transactions and
|
||||||
|
differences between them.
|
||||||
|
|
||||||
|
What we have not yet covered is transactional semantics.
|
||||||
|
|
||||||
|
When you send a transaction using one of the available methods, it
|
||||||
|
first goes to the mempool. Currently, it does not provide strong
|
||||||
|
guarantees like "if the transaction were accepted, it would be
|
||||||
|
eventually included in a block (given CheckTx passes)."
|
||||||
|
|
||||||
|
For instance a tx could enter the mempool, but before it can be sent
|
||||||
|
to peers the node crashes.
|
||||||
|
|
||||||
|
We are planning to provide such guarantees by using a WAL and
|
||||||
|
replaying transactions (See
|
||||||
|
`GH#248<https://github.com/tendermint/tendermint/issues/248>`__), but
|
||||||
|
it's non-trivial to do this all efficiently.
|
||||||
|
|
||||||
|
The temporary solution is for clients to monitor the node and resubmit
|
||||||
|
transaction(s) or/and send them to more nodes at once, so the
|
||||||
|
probability of all of them crashing at the same time and losing the
|
||||||
|
msg decreases substantially.
|
|
@ -214,7 +214,7 @@ Broadcast API
|
||||||
Earlier, we used the ``broadcast_tx_commit`` endpoint to send a
|
Earlier, we used the ``broadcast_tx_commit`` endpoint to send a
|
||||||
transaction. When a transaction is sent to a Tendermint node, it will
|
transaction. When a transaction is sent to a Tendermint node, it will
|
||||||
run via ``CheckTx`` against the application. If it passes ``CheckTx``,
|
run via ``CheckTx`` against the application. If it passes ``CheckTx``,
|
||||||
it will be included in the mempool, broadcast to other peers, and
|
it will be included in the mempool, broadcasted to other peers, and
|
||||||
eventually included in a block.
|
eventually included in a block.
|
||||||
|
|
||||||
Since there are multiple phases to processing a transaction, we offer
|
Since there are multiple phases to processing a transaction, we offer
|
||||||
|
|
|
@ -49,7 +49,13 @@ TODO: Better handle abci client errors. (make it automatically handle connection
|
||||||
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
var ErrTxInCache = errors.New("Tx already exists in cache")
|
var (
|
||||||
|
// ErrTxInCache is returned to the client if we saw tx earlier
|
||||||
|
ErrTxInCache = errors.New("Tx already exists in cache")
|
||||||
|
|
||||||
|
// ErrMempoolIsFull means Tendermint & an application can't handle that much load
|
||||||
|
ErrMempoolIsFull = errors.New("Mempool is full")
|
||||||
|
)
|
||||||
|
|
||||||
// Mempool is an ordered in-memory pool for transactions before they are proposed in a consensus
|
// Mempool is an ordered in-memory pool for transactions before they are proposed in a consensus
|
||||||
// round. Transaction validity is checked using the CheckTx abci message before the transaction is
|
// round. Transaction validity is checked using the CheckTx abci message before the transaction is
|
||||||
|
@ -80,7 +86,6 @@ type Mempool struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewMempool returns a new Mempool with the given configuration and connection to an application.
|
// NewMempool returns a new Mempool with the given configuration and connection to an application.
|
||||||
// TODO: Extract logger into arguments.
|
|
||||||
func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool, height int64) *Mempool {
|
func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool, height int64) *Mempool {
|
||||||
mempool := &Mempool{
|
mempool := &Mempool{
|
||||||
config: config,
|
config: config,
|
||||||
|
@ -202,11 +207,14 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
|
||||||
mem.proxyMtx.Lock()
|
mem.proxyMtx.Lock()
|
||||||
defer mem.proxyMtx.Unlock()
|
defer mem.proxyMtx.Unlock()
|
||||||
|
|
||||||
|
if mem.Size() >= mem.config.Size {
|
||||||
|
return ErrMempoolIsFull
|
||||||
|
}
|
||||||
|
|
||||||
// CACHE
|
// CACHE
|
||||||
if mem.cache.Exists(tx) {
|
if !mem.cache.Push(tx) {
|
||||||
return ErrTxInCache
|
return ErrTxInCache
|
||||||
}
|
}
|
||||||
mem.cache.Push(tx)
|
|
||||||
// END CACHE
|
// END CACHE
|
||||||
|
|
||||||
// WAL
|
// WAL
|
||||||
|
@ -264,8 +272,6 @@ func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) {
|
||||||
|
|
||||||
// remove from cache (it might be good later)
|
// remove from cache (it might be good later)
|
||||||
mem.cache.Remove(tx)
|
mem.cache.Remove(tx)
|
||||||
|
|
||||||
// TODO: handle other retcodes
|
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
// ignore other messages
|
// ignore other messages
|
||||||
|
@ -463,14 +469,6 @@ func (cache *txCache) Reset() {
|
||||||
cache.mtx.Unlock()
|
cache.mtx.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Exists returns true if the given tx is cached.
|
|
||||||
func (cache *txCache) Exists(tx types.Tx) bool {
|
|
||||||
cache.mtx.Lock()
|
|
||||||
_, exists := cache.map_[string(tx)]
|
|
||||||
cache.mtx.Unlock()
|
|
||||||
return exists
|
|
||||||
}
|
|
||||||
|
|
||||||
// Push adds the given tx to the txCache. It returns false if tx is already in the cache.
|
// Push adds the given tx to the txCache. It returns false if tx is already in the cache.
|
||||||
func (cache *txCache) Push(tx types.Tx) bool {
|
func (cache *txCache) Push(tx types.Tx) bool {
|
||||||
cache.mtx.Lock()
|
cache.mtx.Lock()
|
||||||
|
|
Loading…
Reference in New Issue