cosmos-sdk/store/v2/flat/store_test.go

587 lines
18 KiB
Go

package flat
import (
"errors"
"math"
"testing"
"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
dbm "github.com/cosmos/cosmos-sdk/db"
"github.com/cosmos/cosmos-sdk/db/memdb"
"github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/cosmos-sdk/types/kv"
)
// Shared fixtures for the tests in this file.
var (
// cacheSize is not referenced anywhere in the visible part of this file.
// NOTE(review): possibly used by another file in the package — confirm before removing.
cacheSize = 100
// alohaData is the key/value seed that newAlohaStore writes into a fresh store.
alohaData = map[string]string{
"hello": "goodbye",
"aloha": "shalom",
}
)
// newStoreWithData opens a Store on db using DefaultStoreConfig and writes
// every entry of storeData into it with Set. Nothing is committed here.
// The calling test fails immediately if NewStore returns an error.
func newStoreWithData(t *testing.T, db dbm.DBConnection, storeData map[string]string) *Store {
	// Report assertion failures at the caller's line, not inside this helper.
	t.Helper()
	store, err := NewStore(db, DefaultStoreConfig)
	require.NoError(t, err)
	for k, v := range storeData {
		store.Set([]byte(k), []byte(v))
	}
	return store
}
// newAlohaStore returns a Store on db that has been seeded with alohaData.
func newAlohaStore(t *testing.T, db dbm.DBConnection) *Store {
	// Report assertion failures at the caller's line, not inside this helper.
	t.Helper()
	return newStoreWithData(t, db, alohaData)
}
// TestGetSetHasDelete walks a seeded store through Has, Get, Set and Delete on
// one key, then asserts that nil/empty keys, nil values, and a failing index
// transaction all make the store panic.
func TestGetSetHasDelete(t *testing.T) {
	const key = "hello"
	store := newAlohaStore(t, memdb.NewDB())

	// The seeded key is present and carries its seeded value.
	require.True(t, store.Has([]byte(key)))
	require.EqualValues(t, []byte(alohaData[key]), store.Get([]byte(key)))

	// Overwriting replaces the value.
	const replacement = "notgoodbye"
	store.Set([]byte(key), []byte(replacement))
	require.EqualValues(t, replacement, store.Get([]byte(key)))

	// Deleting removes the key.
	store.Delete([]byte(key))
	require.False(t, store.Has([]byte(key)))

	// Invalid arguments must panic; the cases run in declaration order.
	invalidArgs := []struct {
		msg  string
		call func()
	}{
		{"Get(nil key) should panic", func() { store.Get(nil) }},
		{"Get(empty key) should panic", func() { store.Get([]byte{}) }},
		{"Has(nil key) should panic", func() { store.Has(nil) }},
		{"Has(empty key) should panic", func() { store.Has([]byte{}) }},
		{"Set(nil key) should panic", func() { store.Set(nil, []byte("value")) }},
		{"Set(empty key) should panic", func() { store.Set([]byte{}, []byte("value")) }},
		{"Set(nil value) should panic", func() { store.Set([]byte("key"), nil) }},
	}
	for _, c := range invalidArgs {
		require.Panics(t, c.call, c.msg)
	}

	// Swap in an index transaction whose CRUD operations all fail.
	store.indexTxn = rwCrudFails{store.indexTxn}
	require.Panics(t,
		func() { store.Set([]byte("key"), []byte("value")) },
		"Set() when index fails should panic")
}
// TestConstructors checks that NewStore can reopen a committed DB and that it
// returns an error for a series of unusable DB/config combinations.
func TestConstructors(t *testing.T) {
// Seed, commit and close a store, then reopen it on the same DB and read back a value.
db := memdb.NewDB()
store := newAlohaStore(t, db)
store.Commit()
require.NoError(t, store.Close())
store, err := NewStore(db, DefaultStoreConfig)
require.NoError(t, err)
value := store.Get([]byte("hello"))
require.Equal(t, []byte("goodbye"), value)
require.NoError(t, store.Close())
// Loading with an initial version beyond the lowest should error
opts := StoreConfig{InitialVersion: 5, Pruning: types.PruneNothing}
store, err = NewStore(db, opts)
require.Error(t, err)
db.Close()
// A DB whose Versions() call fails is rejected, both as state DB and as MerkleDB.
store, err = NewStore(dbVersionsFails{memdb.NewDB()}, DefaultStoreConfig)
require.Error(t, err)
// NOTE(review): db was closed just above, so this call passes a closed state DB;
// presumably the asserted error is meant to come from the failing MerkleDB — confirm.
store, err = NewStore(db, StoreConfig{MerkleDB: dbVersionsFails{memdb.NewDB()}})
require.Error(t, err)
// can't use a DB with open writers
db = memdb.NewDB()
merkledb := memdb.NewDB()
w := db.Writer()
store, err = NewStore(db, DefaultStoreConfig)
require.Error(t, err)
w.Discard()
w = merkledb.Writer()
store, err = NewStore(db, StoreConfig{MerkleDB: merkledb})
require.Error(t, err)
w.Discard()
// can't use DBs with different version history
// (the return value of SaveNextVersion is not checked here)
merkledb.SaveNextVersion()
store, err = NewStore(db, StoreConfig{MerkleDB: merkledb})
require.Error(t, err)
merkledb.Close()
// can't load existing store when we can't access the latest Merkle root hash
store, err = NewStore(db, DefaultStoreConfig)
require.NoError(t, err)
store.Commit()
require.NoError(t, store.Close())
// because root is missing
// (delete the stored root key directly and save that as a new DB version)
w = db.Writer()
w.Delete(merkleRootKey)
w.Commit()
db.SaveNextVersion()
store, err = NewStore(db, DefaultStoreConfig)
require.Error(t, err)
// or, because of an error
// (dbRWCrudFails hands out read-writers whose Get/Has/Set/Delete all fail)
store, err = NewStore(dbRWCrudFails{db}, DefaultStoreConfig)
require.Error(t, err)
}
// TestIterators checks forward, reverse and prefix iteration over a small
// fixed key set for a range of start/end bounds, and asserts that an empty
// (non-nil) bound makes Iterator and ReverseIterator panic.
func TestIterators(t *testing.T) {
	store := newStoreWithData(t, memdb.NewDB(), map[string]string{
		string([]byte{0x00}):       "0",
		string([]byte{0x00, 0x00}): "0 0",
		string([]byte{0x00, 0x01}): "0 1",
		string([]byte{0x00, 0x02}): "0 2",
		string([]byte{0x01}):       "1",
	})

	// testCase drains iter and requires that it yields exactly the values in
	// expected, in order. The iterator is always closed before returning so
	// no iterator resources outlive the check.
	testCase := func(t *testing.T, iter types.Iterator, expected []string) {
		t.Helper()
		defer func() { require.NoError(t, iter.Close()) }()
		i := 0
		for ; iter.Valid(); iter.Next() {
			// Fail cleanly instead of panicking with an index out of range
			// when the iterator yields more entries than expected.
			require.True(t, i < len(expected),
				"iterator yielded more than the %d expected entries", len(expected))
			require.EqualValues(t, expected[i], string(iter.Value()))
			i++
		}
		require.Equal(t, len(expected), i)
	}

	// Forward iteration.
	testCase(t, store.Iterator(nil, nil),
		[]string{"0", "0 0", "0 1", "0 2", "1"})
	testCase(t, store.Iterator([]byte{0}, nil),
		[]string{"0", "0 0", "0 1", "0 2", "1"})
	testCase(t, store.Iterator([]byte{0}, []byte{0, 1}),
		[]string{"0", "0 0"})
	testCase(t, store.Iterator([]byte{0}, []byte{1}),
		[]string{"0", "0 0", "0 1", "0 2"})
	testCase(t, store.Iterator([]byte{0, 1}, []byte{1}),
		[]string{"0 1", "0 2"})
	testCase(t, store.Iterator(nil, []byte{1}),
		[]string{"0", "0 0", "0 1", "0 2"})
	testCase(t, store.Iterator([]byte{0}, []byte{0}), []string{}) // start = end
	testCase(t, store.Iterator([]byte{1}, []byte{0}), []string{}) // start > end

	// Reverse iteration over the same bounds.
	testCase(t, store.ReverseIterator(nil, nil),
		[]string{"1", "0 2", "0 1", "0 0", "0"})
	testCase(t, store.ReverseIterator([]byte{0}, nil),
		[]string{"1", "0 2", "0 1", "0 0", "0"})
	testCase(t, store.ReverseIterator([]byte{0}, []byte{0, 1}),
		[]string{"0 0", "0"})
	testCase(t, store.ReverseIterator([]byte{0}, []byte{1}),
		[]string{"0 2", "0 1", "0 0", "0"})
	testCase(t, store.ReverseIterator([]byte{0, 1}, []byte{1}),
		[]string{"0 2", "0 1"})
	testCase(t, store.ReverseIterator(nil, []byte{1}),
		[]string{"0 2", "0 1", "0 0", "0"})
	testCase(t, store.ReverseIterator([]byte{0}, []byte{0}), []string{}) // start = end
	testCase(t, store.ReverseIterator([]byte{1}, []byte{0}), []string{}) // start > end

	// Prefix iteration helpers.
	testCase(t, types.KVStorePrefixIterator(store, []byte{0}),
		[]string{"0", "0 0", "0 1", "0 2"})
	testCase(t, types.KVStoreReversePrefixIterator(store, []byte{0}),
		[]string{"0 2", "0 1", "0 0", "0"})

	// An empty, non-nil bound must panic.
	require.Panics(t, func() { store.Iterator([]byte{}, nil) }, "Iterator(empty key) should panic")
	require.Panics(t, func() { store.Iterator(nil, []byte{}) }, "Iterator(empty key) should panic")
	require.Panics(t, func() { store.ReverseIterator([]byte{}, nil) }, "ReverseIterator(empty key) should panic")
	require.Panics(t, func() { store.ReverseIterator(nil, []byte{}) }, "ReverseIterator(empty key) should panic")
}
// TestCommit covers Store.Commit and LastCommitID: basic hash/version
// progression, behavior after injected storage failures, version overflow,
// and a configured initial version.
func TestCommit(t *testing.T) {
	// testBasic is a sanity test for Merkle hashing across successive commits.
	testBasic := func(opts StoreConfig) {
		store, err := NewStore(memdb.NewDB(), opts)
		require.NoError(t, err)
		require.Zero(t, store.LastCommitID())
		idNew := store.Commit()
		store.Set([]byte{0}, []byte{0})
		idOne := store.Commit()
		require.Equal(t, idNew.Version+1, idOne.Version)
		require.NotEqual(t, idNew.Hash, idOne.Hash)

		// Hash of emptied store is same as new store
		store.Delete([]byte{0})
		idEmptied := store.Commit()
		require.Equal(t, idNew.Hash, idEmptied.Hash)

		previd := idEmptied
		for i := byte(1); i < 5; i++ {
			store.Set([]byte{i}, []byte{i})
			id := store.Commit()
			lastid := store.LastCommitID()
			require.Equal(t, id.Hash, lastid.Hash)
			require.Equal(t, id.Version, lastid.Version)
			require.NotEqual(t, previd.Hash, id.Hash)
			require.NotEqual(t, previd.Version, id.Version)
			// Advance the baseline so each commit is compared with its
			// immediate predecessor rather than always with idEmptied.
			previd = id
		}
	}
	testBasic(StoreConfig{Pruning: types.PruneNothing})
	testBasic(StoreConfig{Pruning: types.PruneNothing, MerkleDB: memdb.NewDB()})

	// testFailedCommit writes one key, requires Commit to panic, and then
	// verifies that no version was saved in the state DB (or the MerkleDB, if
	// configured) and that a store reopened on db does not contain the key.
	// When db is nil, the store's own stateDB is used for the checks.
	testFailedCommit := func(t *testing.T, store *Store, db dbm.DBConnection) {
		t.Helper()
		opts := store.opts
		if db == nil {
			db = store.stateDB
		}

		store.Set([]byte{0}, []byte{0})
		require.Panics(t, func() { store.Commit() })
		require.NoError(t, store.Close())

		versions, err := db.Versions()
		require.NoError(t, err)
		require.Equal(t, 0, versions.Count())
		if opts.MerkleDB != nil {
			versions, err = opts.MerkleDB.Versions()
			require.NoError(t, err)
			require.Equal(t, 0, versions.Count())
		}

		store, err = NewStore(db, opts)
		require.NoError(t, err)
		require.Nil(t, store.Get([]byte{0}))
		require.NoError(t, store.Close())
	}

	// Ensure storage commit is rolled back in each failure case
	t.Run("recover after failed Commit", func(t *testing.T) {
		store, err := NewStore(
			dbRWCommitFails{memdb.NewDB()},
			StoreConfig{Pruning: types.PruneNothing})
		require.NoError(t, err)
		testFailedCommit(t, store, nil)
	})
	t.Run("recover after failed SaveVersion", func(t *testing.T) {
		store, err := NewStore(
			dbSaveVersionFails{memdb.NewDB()},
			StoreConfig{Pruning: types.PruneNothing})
		require.NoError(t, err)
		testFailedCommit(t, store, nil)
	})
	t.Run("recover after failed MerkleDB Commit", func(t *testing.T) {
		store, err := NewStore(memdb.NewDB(),
			StoreConfig{MerkleDB: dbRWCommitFails{memdb.NewDB()}, Pruning: types.PruneNothing})
		require.NoError(t, err)
		testFailedCommit(t, store, nil)
	})
	t.Run("recover after failed MerkleDB SaveVersion", func(t *testing.T) {
		store, err := NewStore(memdb.NewDB(),
			StoreConfig{MerkleDB: dbSaveVersionFails{memdb.NewDB()}, Pruning: types.PruneNothing})
		require.NoError(t, err)
		testFailedCommit(t, store, nil)
	})
	t.Run("recover after stateDB.Versions error triggers failure", func(t *testing.T) {
		db := memdb.NewDB()
		store, err := NewStore(db, DefaultStoreConfig)
		require.NoError(t, err)
		// Only the store sees the failing wrapper; the raw db is passed to the
		// helper so its version history can still be inspected.
		store.stateDB = dbVersionsFails{store.stateDB}
		testFailedCommit(t, store, db)
	})
	t.Run("recover after stateTxn.Set error triggers failure", func(t *testing.T) {
		store, err := NewStore(memdb.NewDB(), DefaultStoreConfig)
		require.NoError(t, err)
		store.stateTxn = rwCrudFails{store.stateTxn}
		testFailedCommit(t, store, nil)
	})

	t.Run("stateDB.DeleteVersion error triggers failure", func(t *testing.T) {
		store, err := NewStore(memdb.NewDB(), StoreConfig{MerkleDB: memdb.NewDB()})
		require.NoError(t, err)
		store.merkleTxn = rwCommitFails{store.merkleTxn}
		store.stateDB = dbDeleteVersionFails{store.stateDB}
		require.Panics(t, func() { store.Commit() })
	})
	t.Run("height overflow triggers failure", func(t *testing.T) {
		store, err := NewStore(memdb.NewDB(),
			StoreConfig{InitialVersion: math.MaxInt64, Pruning: types.PruneNothing})
		require.NoError(t, err)
		require.Equal(t, int64(math.MaxInt64), store.Commit().Version)
		require.Panics(t, func() { store.Commit() })
		require.Equal(t, int64(math.MaxInt64), store.LastCommitID().Version) // version history not modified
	})

	// setting initial version
	store, err := NewStore(memdb.NewDB(),
		StoreConfig{InitialVersion: 5, Pruning: types.PruneNothing, MerkleDB: memdb.NewDB()})
	require.NoError(t, err)
	require.Equal(t, int64(5), store.Commit().Version)

	// LastCommitID panics when the version set cannot be read.
	store, err = NewStore(memdb.NewDB(), StoreConfig{MerkleDB: memdb.NewDB()})
	require.NoError(t, err)
	store.Commit()
	store.stateDB = dbVersionsFails{store.stateDB}
	require.Panics(t, func() { store.LastCommitID() })

	// LastCommitID panics when reads through the state transaction fail.
	store, err = NewStore(memdb.NewDB(), StoreConfig{MerkleDB: memdb.NewDB()})
	require.NoError(t, err)
	store.Commit()
	store.stateTxn = rwCrudFails{store.stateTxn}
	require.Panics(t, func() { store.LastCommitID() })
}
func sliceToSet(slice []uint64) map[uint64]struct{} {
res := make(map[uint64]struct{})
for _, x := range slice {
res[x] = struct{}{}
}
return res
}
// TestPruning commits a run of versions under several pruning configurations
// and checks which versions remain in the backing DBs afterwards.
func TestPruning(t *testing.T) {
	// Part 1: save versions up to lastVersion and verify pruning at the final commit.
	const lastVersion = 10
	type pruningCase struct {
		types.PruningOptions
		kept []uint64
	}
	cases := []pruningCase{
		{types.PruningOptions{2, 4, 10}, []uint64{4, 8, 9, 10}},
		{types.PruningOptions{0, 4, 10}, []uint64{4, 8, 10}},
		{types.PruneEverything, []uint64{10}},
		{types.PruneNothing, []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}},
	}
	for caseIdx, c := range cases {
		stateDB, merkleDB := memdb.NewDB(), memdb.NewDB()
		store, err := NewStore(stateDB, StoreConfig{Pruning: c.PruningOptions, MerkleDB: merkleDB})
		require.NoError(t, err)

		for i := byte(1); i <= lastVersion; i++ {
			store.Set([]byte{i}, []byte{i})
			committed := store.Commit()
			require.Equal(t, uint64(i), uint64(committed.Version))
		}

		// Both the state DB and the Merkle DB must retain exactly the same versions.
		wantKept := sliceToSet(c.kept)
		for _, db := range []dbm.DBConnection{stateDB, merkleDB} {
			versions, err := db.Versions()
			require.NoError(t, err)
			for v := uint64(1); v <= lastVersion; v++ {
				_, want := wantKept[v]
				require.Equal(t, want, versions.Exists(v), "Version = %v; tc #%d", v, caseIdx)
			}
		}
	}

	// Part 2: pruning interval. Save up to the 20th version while checking
	// the retained history at specific version checkpoints.
	interval := types.PruningOptions{0, 5, 10}
	checkpoints := map[uint64][]uint64{
		5:  {1, 2, 3, 4, 5},
		10: {5, 10},
		15: {5, 10, 11, 12, 13, 14, 15},
		20: {5, 10, 15, 20},
	}
	db := memdb.NewDB()
	store, err := NewStore(db, StoreConfig{Pruning: interval})
	require.NoError(t, err)

	for i := byte(1); i <= 20; i++ {
		store.Set([]byte{i}, []byte{i})
		committed := store.Commit()
		latest := uint64(i)
		require.Equal(t, latest, uint64(committed.Version))

		// Only versions listed as checkpoints are inspected.
		if kept, isCheckpoint := checkpoints[latest]; isCheckpoint {
			versions, err := db.Versions()
			require.NoError(t, err)
			wantKept := sliceToSet(kept)
			for v := uint64(1); v <= latest; v++ {
				_, want := wantKept[v]
				require.Equal(t, want, versions.Exists(v), "Version = %v; tc #%d", v, i)
			}
		}
	}
}
// TestQuery exercises Store.Query on the "/key" and "/subspace" paths at
// explicit and default heights, then a set of error cases, and finally proof
// generation with a single DB and with a separate MerkleDB.
func TestQuery(t *testing.T) {
store := newStoreWithData(t, memdb.NewDB(), nil)
k1, v1 := []byte("key1"), []byte("val1")
k2, v2 := []byte("key2"), []byte("val2")
v3 := []byte("val3")
ksub := []byte("key")
// Expected "/subspace" payloads: empty, after the first write, and after k1 is modified.
KVs0 := kv.Pairs{}
KVs1 := kv.Pairs{
Pairs: []kv.Pair{
{Key: k1, Value: v1},
{Key: k2, Value: v2},
},
}
KVs2 := kv.Pairs{
Pairs: []kv.Pair{
{Key: k1, Value: v3},
{Key: k2, Value: v2},
},
}
valExpSubEmpty, err := KVs0.Marshal()
require.NoError(t, err)
valExpSub1, err := KVs1.Marshal()
require.NoError(t, err)
valExpSub2, err := KVs2.Marshal()
require.NoError(t, err)
// Commit the still-empty store so there is a version to query against.
cid := store.Commit()
ver := cid.Version
query := abci.RequestQuery{Path: "/key", Data: k1, Height: ver}
querySub := abci.RequestQuery{Path: "/subspace", Data: ksub, Height: ver}
// query subspace before anything set
qres := store.Query(querySub)
require.True(t, qres.IsOK())
require.Equal(t, valExpSubEmpty, qres.Value)
// set data
store.Set(k1, v1)
store.Set(k2, v2)
// set data without commit, doesn't show up
qres = store.Query(query)
require.True(t, qres.IsOK())
require.Nil(t, qres.Value)
// commit it, but still don't see on old version
cid = store.Commit()
qres = store.Query(query)
require.True(t, qres.IsOK())
require.Nil(t, qres.Value)
// but yes on the new version
query.Height = cid.Version
qres = store.Query(query)
require.True(t, qres.IsOK())
require.Equal(t, v1, qres.Value)
// and for the subspace
// NOTE(review): querySub.Height is still the first version (ver) here, yet the
// populated subspace is expected — confirm how "/subspace" treats Height.
qres = store.Query(querySub)
require.True(t, qres.IsOK())
require.Equal(t, valExpSub1, qres.Value)
// modify
store.Set(k1, v3)
cid = store.Commit()
// query will return old values, as height is fixed
qres = store.Query(query)
require.True(t, qres.IsOK())
require.Equal(t, v1, qres.Value)
// update to latest in the query and we are happy
query.Height = cid.Version
qres = store.Query(query)
require.True(t, qres.IsOK())
require.Equal(t, v3, qres.Value)
query2 := abci.RequestQuery{Path: "/key", Data: k2, Height: cid.Version}
qres = store.Query(query2)
require.True(t, qres.IsOK())
require.Equal(t, v2, qres.Value)
// and for the subspace
qres = store.Query(querySub)
require.True(t, qres.IsOK())
require.Equal(t, valExpSub2, qres.Value)
// default (height 0) will show latest -1
query0 := abci.RequestQuery{Path: "/key", Data: k1}
qres = store.Query(query0)
require.True(t, qres.IsOK())
require.Equal(t, v1, qres.Value)
// querying an empty store will fail
store2, err := NewStore(memdb.NewDB(), DefaultStoreConfig)
require.NoError(t, err)
qres = store2.Query(query0)
require.True(t, qres.IsErr())
// default shows latest, if latest-1 does not exist
store2.Set(k1, v1)
store2.Commit()
qres = store2.Query(query0)
require.True(t, qres.IsOK())
require.Equal(t, v1, qres.Value)
store2.Close()
// artificial error cases for coverage (should never happen with defined usage)
// ensure that height overflow triggers an error
// NOTE(review): store2 was closed just above and is still queried below; the
// NoError check re-tests the err left over from an earlier call — confirm both are intended.
require.NoError(t, err)
store2.stateDB = dbVersionsIs{store2.stateDB, dbm.NewVersionManager([]uint64{uint64(math.MaxInt64) + 1})}
qres = store2.Query(query0)
require.True(t, qres.IsErr())
// failure to access versions triggers an error
// NOTE(review): this wraps store.stateDB, not store2.stateDB — possibly a typo; confirm.
store2.stateDB = dbVersionsFails{store.stateDB}
qres = store2.Query(query0)
require.True(t, qres.IsErr())
store2.Close()
// query with a nil or empty key fails
badquery := abci.RequestQuery{Path: "/key", Data: []byte{}}
qres = store.Query(badquery)
require.True(t, qres.IsErr())
badquery.Data = nil
qres = store.Query(badquery)
require.True(t, qres.IsErr())
// querying an invalid height will fail
badquery = abci.RequestQuery{Path: "/key", Data: k1, Height: store.LastCommitID().Version + 1}
qres = store.Query(badquery)
require.True(t, qres.IsErr())
// or an invalid path
badquery = abci.RequestQuery{Path: "/badpath", Data: k1}
qres = store.Query(badquery)
require.True(t, qres.IsErr())
// test that proofs are generated with single and separate DBs
// testProve reads the enclosing store variable at call time, so it checks
// whichever store is currently assigned.
testProve := func() {
queryProve0 := abci.RequestQuery{Path: "/key", Data: k1, Prove: true}
// NOTE(review): the result of this first Query call is discarded — confirm it is intentional.
store.Query(queryProve0)
qres = store.Query(queryProve0)
require.True(t, qres.IsOK())
require.Equal(t, v1, qres.Value)
require.NotNil(t, qres.ProofOps)
}
testProve()
store.Close()
// Repeat the proof check on a store backed by a separate MerkleDB.
store, err = NewStore(memdb.NewDB(), StoreConfig{MerkleDB: memdb.NewDB()})
require.NoError(t, err)
store.Set(k1, v1)
store.Commit()
testProve()
store.Close()
}
// Fault-injection wrappers used by the tests. Each embeds a real DB or
// transaction and overrides selected methods to return an error (or, for
// dbVersionsIs, a fixed value).
type (
	// dbDeleteVersionFails is a DBConnection whose DeleteVersion always fails.
	dbDeleteVersionFails struct{ dbm.DBConnection }

	// dbRWCommitFails is a MemDB whose ReadWriter hands out rwCommitFails transactions.
	dbRWCommitFails struct{ *memdb.MemDB }

	// dbRWCrudFails is a DBConnection whose ReadWriter hands out rwCrudFails transactions.
	dbRWCrudFails struct{ dbm.DBConnection }

	// dbSaveVersionFails is a MemDB whose SaveVersion always fails.
	dbSaveVersionFails struct{ *memdb.MemDB }

	// dbVersionsIs is a DBConnection whose Versions reports the fixed set vset.
	dbVersionsIs struct {
		dbm.DBConnection
		vset dbm.VersionSet
	}

	// dbVersionsFails is a DBConnection whose Versions always fails.
	dbVersionsFails struct{ dbm.DBConnection }

	// rwCommitFails is a read-write transaction whose Commit always fails.
	rwCommitFails struct{ dbm.DBReadWriter }

	// rwCrudFails is a read-write transaction whose Get, Has, Set and Delete always fail.
	rwCrudFails struct{ dbm.DBReadWriter }
)
// Versions always returns a nil version set and a "dbVersionsFails" error.
func (dbVersionsFails) Versions() (dbm.VersionSet, error) {
	err := errors.New("dbVersionsFails")
	return nil, err
}
// Versions reports the fixed version set stored in the wrapper and never fails.
func (db dbVersionsIs) Versions() (dbm.VersionSet, error) {
	fixed := db.vset
	return fixed, nil
}
// ReadWriter opens a read-writer on the wrapped connection and returns it
// wrapped in rwCrudFails, so that its CRUD operations all fail.
func (db dbRWCrudFails) ReadWriter() dbm.DBReadWriter {
	inner := db.DBConnection.ReadWriter()
	return rwCrudFails{inner}
}
// SaveVersion ignores its argument and always returns a "dbSaveVersionFails" error.
func (dbSaveVersionFails) SaveVersion(uint64) error {
	return errors.New("dbSaveVersionFails")
}
// DeleteVersion ignores its argument and always returns a "dbDeleteVersionFails" error.
func (dbDeleteVersionFails) DeleteVersion(uint64) error {
	return errors.New("dbDeleteVersionFails")
}
// Commit never commits: it discards the wrapped transaction and reports a
// "rwCommitFails" error. The result of Discard is not inspected.
func (tx rwCommitFails) Commit() error {
	failure := errors.New("rwCommitFails")
	tx.Discard()
	return failure
}
// ReadWriter opens a read-writer on the wrapped MemDB and returns it wrapped
// in rwCommitFails, so that committing it always fails.
func (db dbRWCommitFails) ReadWriter() dbm.DBReadWriter {
	inner := db.MemDB.ReadWriter()
	return rwCommitFails{inner}
}
// Get ignores the key and always returns a nil value with a "rwCrudFails.Get" error.
func (rwCrudFails) Get([]byte) ([]byte, error) {
	return nil, errors.New("rwCrudFails.Get")
}
// Has ignores the key and always returns false with a "rwCrudFails.Has" error.
func (rwCrudFails) Has([]byte) (bool, error) {
	return false, errors.New("rwCrudFails.Has")
}
// Set ignores its arguments and always returns a "rwCrudFails.Set" error.
func (rwCrudFails) Set([]byte, []byte) error {
	return errors.New("rwCrudFails.Set")
}
// Delete ignores the key and always returns a "rwCrudFails.Delete" error.
func (rwCrudFails) Delete([]byte) error {
	return errors.New("rwCrudFails.Delete")
}