From 3595b5931a02fd22af6f0b2e756f0c1f437022f6 Mon Sep 17 00:00:00 2001 From: Emmanuel Odeke Date: Thu, 23 Nov 2017 17:18:05 -0700 Subject: [PATCH] mempool: implement Mempool.CloseWAL Fixes https://github.com/tendermint/tendermint/issues/890 Add a CloseWAL method to Mempool to close the underlying WAL file and then discard it so that further writes to it will have no effect. --- mempool/mempool.go | 20 +++++++++++ mempool/mempool_test.go | 73 +++++++++++++++++++++++++++++++++++++++-- 2 files changed, 90 insertions(+), 3 deletions(-) diff --git a/mempool/mempool.go b/mempool/mempool.go index d781500c..7ccea410 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -110,6 +110,26 @@ func (mem *Mempool) SetLogger(l log.Logger) { mem.logger = l } +// CloseWAL closes and discards the underlying WAL file. +// Any further writes will not be relayed to disk. +func (mem *Mempool) CloseWAL() bool { + if mem == nil { + return false + } + + mem.proxyMtx.Lock() + defer mem.proxyMtx.Unlock() + + if mem.wal == nil { + return false + } + if err := mem.wal.Close(); err != nil && mem.logger != nil { + mem.logger.Error("Mempool.CloseWAL", "err", err) + } + mem.wal = nil + return true +} + func (mem *Mempool) initWAL() { walDir := mem.config.WalDir() if walDir != "" { diff --git a/mempool/mempool_test.go b/mempool/mempool_test.go index 7773d9d7..8dea7f0d 100644 --- a/mempool/mempool_test.go +++ b/mempool/mempool_test.go @@ -1,8 +1,13 @@ package mempool import ( + "crypto/md5" "crypto/rand" "encoding/binary" + "fmt" + "io/ioutil" + "os" + "path/filepath" "testing" "time" @@ -13,6 +18,8 @@ import ( cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" + + "github.com/stretchr/testify/require" ) func newMempoolWithApp(cc proxy.ClientCreator) *Mempool { @@ -57,7 +64,7 @@ func checkTxs(t *testing.T, mempool *Mempool, count int) types.Txs { t.Error(err) } if err := mempool.CheckTx(txBytes, nil); err != nil { - t.Fatal("Error after CheckTx: %v", err) + t.Fatalf("Error after CheckTx: %v", err) } } return txs @@ -127,7 +134,7 @@ func TestSerialReap(t *testing.T) { binary.BigEndian.PutUint64(txBytes, uint64(i)) err := mempool.CheckTx(txBytes, nil) if err != nil { - t.Fatal("Error after CheckTx: %v", err) + t.Fatalf("Error after CheckTx: %v", err) } // This will fail because not serial (incrementing) @@ -135,7 +142,7 @@ func TestSerialReap(t *testing.T) { // It just won't show up on Reap(). err = mempool.CheckTx(txBytes, nil) if err != nil { - t.Fatal("Error after CheckTx: %v", err) + t.Fatalf("Error after CheckTx: %v", err) } } @@ -211,3 +218,63 @@ func TestSerialReap(t *testing.T) { // We should have 600 now. reapCheck(600) } + +func TestMempoolCloseWAL(t *testing.T) { + // 1. Create the temporary directory for mempool and WAL testing. + rootDir, err := ioutil.TempDir("", "mempool-test") + require.Nil(t, err, "expecting successful tmpdir creation") + defer os.RemoveAll(rootDir) + + // 2. Ensure that it doesn't contain any elements -- Sanity check + m1, err := filepath.Glob(filepath.Join(rootDir, "*")) + require.Nil(t, err, "successful globbing expected") + require.Equal(t, 0, len(m1), "no matches yet") + + // 3. Create the mempool + wcfg := *(cfg.DefaultMempoolConfig()) + wcfg.RootDir = rootDir + app := dummy.NewDummyApplication() + cc := proxy.NewLocalClientCreator(app) + appConnMem, _ := cc.NewABCIClient() + mempool := NewMempool(&wcfg, appConnMem, 10) + + // 4. Ensure that the directory contains the WAL file + m2, err := filepath.Glob(filepath.Join(rootDir, "*")) + require.Nil(t, err, "successful globbing expected") + require.Equal(t, 1, len(m2), "expecting the wal match in") + + // 5. Write some contents to the WAL + mempool.CheckTx(types.Tx([]byte("foo")), nil) + walFilepath := mempool.wal.Path + sum1 := checksumFile(walFilepath, t) + + // 6. Sanity check to ensure that the written TX matches the expectation. + require.Equal(t, sum1, checksumIt([]byte("foo\n")), "foo with a newline should be written") + + // 7. Invoke CloseWAL() and ensure it discards the + // WAL thus any other write won't go through. + require.True(t, mempool.CloseWAL(), "CloseWAL should CloseWAL") + mempool.CheckTx(types.Tx([]byte("bar")), nil) + sum2 := checksumFile(walFilepath, t) + require.Equal(t, sum1, sum2, "expected no change to the WAL after invoking CloseWAL() since it was discarded") + + // 8. Second CloseWAL should do nothing + require.False(t, mempool.CloseWAL(), "CloseWAL should CloseWAL") + + // 9. Sanity check to ensure that the WAL file still exists + m3, err := filepath.Glob(filepath.Join(rootDir, "*")) + require.Nil(t, err, "successful globbing expected") + require.Equal(t, 1, len(m3), "expecting the wal match in") +} + +func checksumIt(data []byte) string { + h := md5.New() + h.Write(data) + return fmt.Sprintf("%x", h.Sum(nil)) +} + +func checksumFile(p string, t *testing.T) string { + data, err := ioutil.ReadFile(p) + require.Nil(t, err, "expecting successful read of %q", p) + return checksumIt(data) +}