Merge pull request #892 from tendermint/mempool-Stop-method
mempool: implement Mempool.CloseWAL
This commit is contained in:
commit
1b120466ee
|
@ -110,6 +110,26 @@ func (mem *Mempool) SetLogger(l log.Logger) {
|
||||||
mem.logger = l
|
mem.logger = l
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CloseWAL closes and discards the underlying WAL file.
|
||||||
|
// Any further writes will not be relayed to disk.
|
||||||
|
func (mem *Mempool) CloseWAL() bool {
|
||||||
|
if mem == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
mem.proxyMtx.Lock()
|
||||||
|
defer mem.proxyMtx.Unlock()
|
||||||
|
|
||||||
|
if mem.wal == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if err := mem.wal.Close(); err != nil && mem.logger != nil {
|
||||||
|
mem.logger.Error("Mempool.CloseWAL", "err", err)
|
||||||
|
}
|
||||||
|
mem.wal = nil
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
func (mem *Mempool) initWAL() {
|
func (mem *Mempool) initWAL() {
|
||||||
walDir := mem.config.WalDir()
|
walDir := mem.config.WalDir()
|
||||||
if walDir != "" {
|
if walDir != "" {
|
||||||
|
|
|
@ -1,8 +1,13 @@
|
||||||
package mempool
|
package mempool
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/md5"
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -13,6 +18,8 @@ import (
|
||||||
cfg "github.com/tendermint/tendermint/config"
|
cfg "github.com/tendermint/tendermint/config"
|
||||||
"github.com/tendermint/tendermint/proxy"
|
"github.com/tendermint/tendermint/proxy"
|
||||||
"github.com/tendermint/tendermint/types"
|
"github.com/tendermint/tendermint/types"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func newMempoolWithApp(cc proxy.ClientCreator) *Mempool {
|
func newMempoolWithApp(cc proxy.ClientCreator) *Mempool {
|
||||||
|
@ -57,7 +64,7 @@ func checkTxs(t *testing.T, mempool *Mempool, count int) types.Txs {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
if err := mempool.CheckTx(txBytes, nil); err != nil {
|
if err := mempool.CheckTx(txBytes, nil); err != nil {
|
||||||
t.Fatal("Error after CheckTx: %v", err)
|
t.Fatalf("Error after CheckTx: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return txs
|
return txs
|
||||||
|
@ -127,7 +134,7 @@ func TestSerialReap(t *testing.T) {
|
||||||
binary.BigEndian.PutUint64(txBytes, uint64(i))
|
binary.BigEndian.PutUint64(txBytes, uint64(i))
|
||||||
err := mempool.CheckTx(txBytes, nil)
|
err := mempool.CheckTx(txBytes, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Error after CheckTx: %v", err)
|
t.Fatalf("Error after CheckTx: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// This will fail because not serial (incrementing)
|
// This will fail because not serial (incrementing)
|
||||||
|
@ -135,7 +142,7 @@ func TestSerialReap(t *testing.T) {
|
||||||
// It just won't show up on Reap().
|
// It just won't show up on Reap().
|
||||||
err = mempool.CheckTx(txBytes, nil)
|
err = mempool.CheckTx(txBytes, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal("Error after CheckTx: %v", err)
|
t.Fatalf("Error after CheckTx: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -211,3 +218,63 @@ func TestSerialReap(t *testing.T) {
|
||||||
// We should have 600 now.
|
// We should have 600 now.
|
||||||
reapCheck(600)
|
reapCheck(600)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMempoolCloseWAL(t *testing.T) {
|
||||||
|
// 1. Create the temporary directory for mempool and WAL testing.
|
||||||
|
rootDir, err := ioutil.TempDir("", "mempool-test")
|
||||||
|
require.Nil(t, err, "expecting successful tmpdir creation")
|
||||||
|
defer os.RemoveAll(rootDir)
|
||||||
|
|
||||||
|
// 2. Ensure that it doesn't contain any elements -- Sanity check
|
||||||
|
m1, err := filepath.Glob(filepath.Join(rootDir, "*"))
|
||||||
|
require.Nil(t, err, "successful globbing expected")
|
||||||
|
require.Equal(t, 0, len(m1), "no matches yet")
|
||||||
|
|
||||||
|
// 3. Create the mempool
|
||||||
|
wcfg := *(cfg.DefaultMempoolConfig())
|
||||||
|
wcfg.RootDir = rootDir
|
||||||
|
app := dummy.NewDummyApplication()
|
||||||
|
cc := proxy.NewLocalClientCreator(app)
|
||||||
|
appConnMem, _ := cc.NewABCIClient()
|
||||||
|
mempool := NewMempool(&wcfg, appConnMem, 10)
|
||||||
|
|
||||||
|
// 4. Ensure that the directory contains the WAL file
|
||||||
|
m2, err := filepath.Glob(filepath.Join(rootDir, "*"))
|
||||||
|
require.Nil(t, err, "successful globbing expected")
|
||||||
|
require.Equal(t, 1, len(m2), "expecting the wal match in")
|
||||||
|
|
||||||
|
// 5. Write some contents to the WAL
|
||||||
|
mempool.CheckTx(types.Tx([]byte("foo")), nil)
|
||||||
|
walFilepath := mempool.wal.Path
|
||||||
|
sum1 := checksumFile(walFilepath, t)
|
||||||
|
|
||||||
|
// 6. Sanity check to ensure that the written TX matches the expectation.
|
||||||
|
require.Equal(t, sum1, checksumIt([]byte("foo\n")), "foo with a newline should be written")
|
||||||
|
|
||||||
|
// 7. Invoke CloseWAL() and ensure it discards the
|
||||||
|
// WAL thus any other write won't go through.
|
||||||
|
require.True(t, mempool.CloseWAL(), "CloseWAL should CloseWAL")
|
||||||
|
mempool.CheckTx(types.Tx([]byte("bar")), nil)
|
||||||
|
sum2 := checksumFile(walFilepath, t)
|
||||||
|
require.Equal(t, sum1, sum2, "expected no change to the WAL after invoking CloseWAL() since it was discarded")
|
||||||
|
|
||||||
|
// 8. Second CloseWAL should do nothing
|
||||||
|
require.False(t, mempool.CloseWAL(), "CloseWAL should CloseWAL")
|
||||||
|
|
||||||
|
// 9. Sanity check to ensure that the WAL file still exists
|
||||||
|
m3, err := filepath.Glob(filepath.Join(rootDir, "*"))
|
||||||
|
require.Nil(t, err, "successful globbing expected")
|
||||||
|
require.Equal(t, 1, len(m3), "expecting the wal match in")
|
||||||
|
}
|
||||||
|
|
||||||
|
func checksumIt(data []byte) string {
|
||||||
|
h := md5.New()
|
||||||
|
h.Write(data)
|
||||||
|
return fmt.Sprintf("%x", h.Sum(nil))
|
||||||
|
}
|
||||||
|
|
||||||
|
func checksumFile(p string, t *testing.T) string {
|
||||||
|
data, err := ioutil.ReadFile(p)
|
||||||
|
require.Nil(t, err, "expecting successful read of %q", p)
|
||||||
|
return checksumIt(data)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue