Merge PR #5579: Fix Restart application issue

This commit is contained in:
Aditya 2020-02-06 12:58:32 -08:00 committed by GitHub
parent 53bf2271d5
commit dba80caec0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 394 additions and 122 deletions

View File

@ -44,11 +44,23 @@ balances or a single balance by denom when the `denom` query parameter is presen
### API Breaking Changes
* (types) [\#5579](https://github.com/cosmos/cosmos-sdk/pull/5579) The `keepRecent` field has been removed from the `PruningOptions` type.
The `PruningOptions` type now only includes fields `KeepEvery` and `SnapshotEvery`, where `KeepEvery`
determines which committed heights are flushed to disk and `SnapshotEvery` determines which of these
heights are kept after pruning. The `IsValid` method should be called whenever using these options. Methods
`SnapshotVersion` and `FlushVersion` accept a version arugment and determine if the version should be
flushed to disk or kept as a snapshot. Note, `KeepRecent` is automatically inferred from the options
and provided directly the IAVL store.
* (modules) [\#5555](https://github.com/cosmos/cosmos-sdk/pull/5555) Move x/auth/client/utils/ types and functions to x/auth/client/.
* (modules) [\#5572](https://github.com/cosmos/cosmos-sdk/pull/5572) Move account balance logic and APIs from `x/auth` to `x/bank`.
### Bug Fixes
* (types) [\#5579](https://github.com/cosmos/cosmos-sdk/pull/5579) The IAVL `Store#Commit` method has been refactored to
delete a flushed version if it is not a snapshot version. The root multi-store now keeps track of `commitInfo` instead
of `types.CommitID`. During `Commit` of the root multi-store, `lastCommitInfo` is updated from the saved state
and is only flushed to disk if it is a snapshot version. During `Query` of the root multi-store, if the request height
is the latest height, we'll use the store's `lastCommitInfo`. Otherwise, we fetch `commitInfo` from disk.
* (x/bank) [\#5531](https://github.com/cosmos/cosmos-sdk/issues/5531) Added missing amount event to MsgMultiSend, emitted for each output.
* (client) [\#5618](https://github.com/cosmos/cosmos-sdk/pull/5618) Fix crash on the client when the verifier is not set.

View File

@ -83,7 +83,7 @@ func TestMountStores(t *testing.T) {
// Test that LoadLatestVersion actually does.
func TestLoadVersion(t *testing.T) {
logger := defaultLogger()
pruningOpt := SetPruning(store.PruneSyncable)
pruningOpt := SetPruning(store.PruneNothing)
db := dbm.NewMemDB()
name := t.Name()
app := NewBaseApp(name, logger, db, nil, pruningOpt)
@ -293,7 +293,7 @@ func TestAppVersionSetterGetter(t *testing.T) {
func TestLoadVersionInvalid(t *testing.T) {
logger := log.NewNopLogger()
pruningOpt := SetPruning(store.PruneSyncable)
pruningOpt := SetPruning(store.PruneNothing)
db := dbm.NewMemDB()
name := t.Name()
app := NewBaseApp(name, logger, db, nil, pruningOpt)
@ -326,6 +326,88 @@ func TestLoadVersionInvalid(t *testing.T) {
require.Error(t, err)
}
func TestLoadVersionPruning(t *testing.T) {
logger := log.NewNopLogger()
pruningOptions := store.PruningOptions{
KeepEvery: 2,
SnapshotEvery: 6,
}
pruningOpt := SetPruning(pruningOptions)
db := dbm.NewMemDB()
name := t.Name()
app := NewBaseApp(name, logger, db, nil, pruningOpt)
// make a cap key and mount the store
capKey := sdk.NewKVStoreKey(MainStoreKey)
app.MountStores(capKey)
err := app.LoadLatestVersion(capKey) // needed to make stores non-nil
require.Nil(t, err)
emptyCommitID := sdk.CommitID{}
// fresh store has zero/empty last commit
lastHeight := app.LastBlockHeight()
lastID := app.LastCommitID()
require.Equal(t, int64(0), lastHeight)
require.Equal(t, emptyCommitID, lastID)
// execute a block
header := abci.Header{Height: 1}
app.BeginBlock(abci.RequestBeginBlock{Header: header})
res := app.Commit()
// execute a block, collect commit ID
header = abci.Header{Height: 2}
app.BeginBlock(abci.RequestBeginBlock{Header: header})
res = app.Commit()
commitID2 := sdk.CommitID{Version: 2, Hash: res.Data}
// execute a block
header = abci.Header{Height: 3}
app.BeginBlock(abci.RequestBeginBlock{Header: header})
res = app.Commit()
commitID3 := sdk.CommitID{Version: 3, Hash: res.Data}
// reload with LoadLatestVersion, check it loads last flushed version
app = NewBaseApp(name, logger, db, nil, pruningOpt)
app.MountStores(capKey)
err = app.LoadLatestVersion(capKey)
require.Nil(t, err)
testLoadVersionHelper(t, app, int64(2), commitID2)
// re-execute block 3 and check it is same CommitID
header = abci.Header{Height: 3}
app.BeginBlock(abci.RequestBeginBlock{Header: header})
res = app.Commit()
recommitID3 := sdk.CommitID{Version: 3, Hash: res.Data}
require.Equal(t, commitID3, recommitID3, "Commits of identical blocks not equal after reload")
// execute a block, collect commit ID
header = abci.Header{Height: 4}
app.BeginBlock(abci.RequestBeginBlock{Header: header})
res = app.Commit()
commitID4 := sdk.CommitID{Version: 4, Hash: res.Data}
// execute a block
header = abci.Header{Height: 5}
app.BeginBlock(abci.RequestBeginBlock{Header: header})
res = app.Commit()
// reload with LoadLatestVersion, check it loads last flushed version
app = NewBaseApp(name, logger, db, nil, pruningOpt)
app.MountStores(capKey)
err = app.LoadLatestVersion(capKey)
require.Nil(t, err)
testLoadVersionHelper(t, app, int64(4), commitID4)
// reload with LoadVersion of previous flushed version
// and check it fails since previous flush should be pruned
app = NewBaseApp(name, logger, db, nil, pruningOpt)
app.MountStores(capKey)
err = app.LoadVersion(2, capKey)
require.NotNil(t, err)
}
func testLoadVersionHelper(t *testing.T, app *BaseApp, expectedHeight int64, expectedID sdk.CommitID) {
lastHeight := app.LastBlockHeight()
lastID := app.LastCommitID()

View File

@ -43,7 +43,7 @@ default, the application will run with Tendermint in process.
Pruning options can be provided via the '--pruning' flag. The options are as follows:
syncable: only those states not needed for state syncing will be deleted (keeps last 100 + every 10000th)
syncable: only those states not needed for state syncing will be deleted (flushes every 100th to disk and keeps every 10000th)
nothing: all historic states will be saved, nothing will be deleted (i.e. archiving node)
everything: all saved states will be deleted, storing only the current state

View File

@ -20,7 +20,7 @@ func TestGetOrSetStoreCache(t *testing.T) {
sKey := types.NewKVStoreKey("test")
tree, err := iavl.NewMutableTree(db, 100)
require.NoError(t, err)
store := iavlstore.UnsafeNewStore(tree)
store := iavlstore.UnsafeNewStore(tree, types.PruneNothing)
store2 := mngr.GetStoreCache(sKey, store)
require.NotNil(t, store2)
@ -34,7 +34,7 @@ func TestUnwrap(t *testing.T) {
sKey := types.NewKVStoreKey("test")
tree, err := iavl.NewMutableTree(db, 100)
require.NoError(t, err)
store := iavlstore.UnsafeNewStore(tree)
store := iavlstore.UnsafeNewStore(tree, types.PruneNothing)
_ = mngr.GetStoreCache(sKey, store)
require.Equal(t, store, mngr.Unwrap(sKey))
@ -48,7 +48,7 @@ func TestStoreCache(t *testing.T) {
sKey := types.NewKVStoreKey("test")
tree, err := iavl.NewMutableTree(db, 100)
require.NoError(t, err)
store := iavlstore.UnsafeNewStore(tree)
store := iavlstore.UnsafeNewStore(tree, types.PruneNothing)
kvStore := mngr.GetStoreCache(sKey, store)
for i := uint(0); i < cache.DefaultCommitKVStoreCacheSize*2; i++ {

View File

@ -1,9 +1,11 @@
package iavl
import (
"fmt"
"io"
"sync"
"github.com/pkg/errors"
"github.com/tendermint/iavl"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto/merkle"
@ -29,18 +31,38 @@ var (
// Store Implements types.KVStore and CommitKVStore.
type Store struct {
tree Tree
tree Tree
pruning types.PruningOptions
}
// LoadStore returns an IAVL Store as a CommitKVStore. Internally it will load the
// LoadStore returns an IAVL Store as a CommitKVStore. Internally, it will load the
// store's version (id) from the provided DB. An error is returned if the version
// fails to load.
func LoadStore(db dbm.DB, id types.CommitID, pruning types.PruningOptions, lazyLoading bool) (types.CommitKVStore, error) {
if !pruning.IsValid() {
return nil, fmt.Errorf("pruning options are invalid: %v", pruning)
}
var keepRecent int64
// Determine the value of keepRecent based on the following:
//
// If KeepEvery = 1, keepRecent should be 0 since there is no need to keep
// latest version in a in-memory cache.
//
// If KeepEvery > 1, keepRecent should be 1 so that state changes in between
// flushed states can be saved in the in-memory latest tree.
if pruning.KeepEvery == 1 {
keepRecent = 0
} else {
keepRecent = 1
}
tree, err := iavl.NewMutableTreeWithOpts(
db,
dbm.NewMemDB(),
defaultIAVLCacheSize,
iavl.PruningOptions(pruning.KeepEvery(), pruning.KeepRecent()),
iavl.PruningOptions(pruning.KeepEvery, keepRecent),
)
if err != nil {
return nil, err
@ -56,15 +78,23 @@ func LoadStore(db dbm.DB, id types.CommitID, pruning types.PruningOptions, lazyL
return nil, err
}
return &Store{tree: tree}, nil
return &Store{
tree: tree,
pruning: pruning,
}, nil
}
// UnsafeNewStore returns a reference to a new IAVL Store with a given mutable
// IAVL tree reference.
// IAVL tree reference. It should only be used for testing purposes.
//
// CONTRACT: The IAVL tree should be fully loaded.
func UnsafeNewStore(tree *iavl.MutableTree) *Store {
return &Store{tree: tree}
// CONTRACT: PruningOptions passed in as argument must be the same as pruning options
// passed into iavl.MutableTree
func UnsafeNewStore(tree *iavl.MutableTree, po types.PruningOptions) *Store {
return &Store{
tree: tree,
pruning: po,
}
}
// GetImmutable returns a reference to a new store backed by an immutable IAVL
@ -82,18 +112,36 @@ func (st *Store) GetImmutable(version int64) (*Store, error) {
return nil, err
}
return &Store{tree: &immutableTree{iTree}}, nil
return &Store{
tree: &immutableTree{iTree},
pruning: st.pruning,
}, nil
}
// Implements Committer.
// Commit commits the current store state and returns a CommitID with the new
// version and hash.
func (st *Store) Commit() types.CommitID {
// Save a new version.
hash, version, err := st.tree.SaveVersion()
if err != nil {
// TODO: Do we want to extend Commit to allow returning errors?
panic(err)
}
// If the version we saved got flushed to disk, check if previous flushed
// version should be deleted.
if st.pruning.FlushVersion(version) {
previous := version - st.pruning.KeepEvery
// Previous flushed version should only be pruned if the previous version is
// not a snapshot version OR if snapshotting is disabled (SnapshotEvery == 0).
if previous != 0 && !st.pruning.SnapshotVersion(previous) {
err := st.tree.DeleteVersion(previous)
if errCause := errors.Cause(err); errCause != nil && errCause != iavl.ErrVersionDoesNotExist {
panic(err)
}
}
}
return types.CommitID{
Version: version,
Hash: hash,

View File

@ -52,7 +52,7 @@ func newAlohaTree(t *testing.T, db dbm.DB) (*iavl.MutableTree, types.CommitID) {
func TestGetImmutable(t *testing.T) {
db := dbm.NewMemDB()
tree, cID := newAlohaTree(t, db)
store := UnsafeNewStore(tree)
store := UnsafeNewStore(tree, types.PruneNothing)
require.True(t, tree.Set([]byte("hello"), []byte("adios")))
hash, ver, err := tree.SaveVersion()
@ -82,7 +82,7 @@ func TestGetImmutable(t *testing.T) {
func TestTestGetImmutableIterator(t *testing.T) {
db := dbm.NewMemDB()
tree, cID := newAlohaTree(t, db)
store := UnsafeNewStore(tree)
store := UnsafeNewStore(tree, types.PruneNothing)
newStore, err := store.GetImmutable(cID.Version)
require.NoError(t, err)
@ -105,7 +105,7 @@ func TestTestGetImmutableIterator(t *testing.T) {
func TestIAVLStoreGetSetHasDelete(t *testing.T) {
db := dbm.NewMemDB()
tree, _ := newAlohaTree(t, db)
iavlStore := UnsafeNewStore(tree)
iavlStore := UnsafeNewStore(tree, types.PruneNothing)
key := "hello"
@ -130,14 +130,14 @@ func TestIAVLStoreGetSetHasDelete(t *testing.T) {
func TestIAVLStoreNoNilSet(t *testing.T) {
db := dbm.NewMemDB()
tree, _ := newAlohaTree(t, db)
iavlStore := UnsafeNewStore(tree)
iavlStore := UnsafeNewStore(tree, types.PruneNothing)
require.Panics(t, func() { iavlStore.Set([]byte("key"), nil) }, "setting a nil value should panic")
}
func TestIAVLIterator(t *testing.T) {
db := dbm.NewMemDB()
tree, _ := newAlohaTree(t, db)
iavlStore := UnsafeNewStore(tree)
iavlStore := UnsafeNewStore(tree, types.PruneNothing)
iter := iavlStore.Iterator([]byte("aloha"), []byte("hellz"))
expected := []string{"aloha", "hello"}
var i int
@ -213,7 +213,7 @@ func TestIAVLReverseIterator(t *testing.T) {
tree, err := iavl.NewMutableTree(db, cacheSize)
require.NoError(t, err)
iavlStore := UnsafeNewStore(tree)
iavlStore := UnsafeNewStore(tree, types.PruneNothing)
iavlStore.Set([]byte{0x00}, []byte("0"))
iavlStore.Set([]byte{0x00, 0x00}, []byte("0 0"))
@ -246,7 +246,7 @@ func TestIAVLPrefixIterator(t *testing.T) {
tree, err := iavl.NewMutableTree(db, cacheSize)
require.NoError(t, err)
iavlStore := UnsafeNewStore(tree)
iavlStore := UnsafeNewStore(tree, types.PruneNothing)
iavlStore.Set([]byte("test1"), []byte("test1"))
iavlStore.Set([]byte("test2"), []byte("test2"))
@ -310,7 +310,7 @@ func TestIAVLReversePrefixIterator(t *testing.T) {
tree, err := iavl.NewMutableTree(db, cacheSize)
require.NoError(t, err)
iavlStore := UnsafeNewStore(tree)
iavlStore := UnsafeNewStore(tree, types.PruneNothing)
iavlStore.Set([]byte("test1"), []byte("test1"))
iavlStore.Set([]byte("test2"), []byte("test2"))
@ -375,7 +375,7 @@ func nextVersion(iavl *Store) {
func TestIAVLDefaultPruning(t *testing.T) {
//Expected stored / deleted version numbers for:
//numRecent = 5, storeEvery = 3
//numRecent = 5, storeEvery = 3, snapshotEvery = 5
var states = []pruneState{
{[]int64{}, []int64{}},
{[]int64{1}, []int64{}},
@ -383,23 +383,23 @@ func TestIAVLDefaultPruning(t *testing.T) {
{[]int64{1, 2, 3}, []int64{}},
{[]int64{1, 2, 3, 4}, []int64{}},
{[]int64{1, 2, 3, 4, 5}, []int64{}},
{[]int64{2, 3, 4, 5, 6}, []int64{1}},
{[]int64{3, 4, 5, 6, 7}, []int64{1, 2}},
{[]int64{3, 4, 5, 6, 7, 8}, []int64{1, 2}},
{[]int64{3, 5, 6, 7, 8, 9}, []int64{1, 2, 4}},
{[]int64{3, 6, 7, 8, 9, 10}, []int64{1, 2, 4, 5}},
{[]int64{3, 6, 7, 8, 9, 10, 11}, []int64{1, 2, 4, 5}},
{[]int64{3, 6, 8, 9, 10, 11, 12}, []int64{1, 2, 4, 5, 7}},
{[]int64{3, 6, 9, 10, 11, 12, 13}, []int64{1, 2, 4, 5, 7, 8}},
{[]int64{3, 6, 9, 10, 11, 12, 13, 14}, []int64{1, 2, 4, 5, 7, 8}},
{[]int64{3, 6, 9, 11, 12, 13, 14, 15}, []int64{1, 2, 4, 5, 7, 8, 10}},
{[]int64{2, 4, 5, 6}, []int64{1, 3}},
{[]int64{4, 5, 6, 7}, []int64{1, 2, 3}},
{[]int64{4, 5, 6, 7, 8}, []int64{1, 2, 3}},
{[]int64{5, 6, 7, 8, 9}, []int64{1, 2, 3, 4}},
{[]int64{6, 7, 8, 9, 10}, []int64{1, 2, 3, 4, 5}},
{[]int64{6, 7, 8, 9, 10, 11}, []int64{1, 2, 3, 4, 5}},
{[]int64{6, 8, 10, 11, 12}, []int64{1, 2, 3, 4, 5, 7, 9}},
{[]int64{6, 10, 11, 12, 13}, []int64{1, 2, 3, 4, 5, 7, 8, 9}},
{[]int64{6, 10, 11, 12, 13, 14}, []int64{1, 2, 3, 4, 5, 7, 8, 9}},
{[]int64{6, 11, 12, 13, 14, 15}, []int64{1, 2, 3, 4, 5, 7, 8, 9, 10}},
}
testPruning(t, int64(5), int64(3), states)
testPruning(t, int64(5), int64(3), int64(6), states)
}
func TestIAVLAlternativePruning(t *testing.T) {
//Expected stored / deleted version numbers for:
//numRecent = 3, storeEvery = 5
//numRecent = 3, storeEvery = 5, snapshotEvery = 10
var states = []pruneState{
{[]int64{}, []int64{}},
{[]int64{1}, []int64{}},
@ -411,14 +411,14 @@ func TestIAVLAlternativePruning(t *testing.T) {
{[]int64{5, 6, 7}, []int64{1, 2, 3, 4}},
{[]int64{5, 6, 7, 8}, []int64{1, 2, 3, 4}},
{[]int64{5, 7, 8, 9}, []int64{1, 2, 3, 4, 6}},
{[]int64{5, 8, 9, 10}, []int64{1, 2, 3, 4, 6, 7}},
{[]int64{5, 9, 10, 11}, []int64{1, 2, 3, 4, 6, 7, 8}},
{[]int64{5, 10, 11, 12}, []int64{1, 2, 3, 4, 6, 7, 8, 9}},
{[]int64{5, 10, 11, 12, 13}, []int64{1, 2, 3, 4, 6, 7, 8, 9}},
{[]int64{5, 10, 12, 13, 14}, []int64{1, 2, 3, 4, 6, 7, 8, 9, 11}},
{[]int64{5, 10, 13, 14, 15}, []int64{1, 2, 3, 4, 6, 7, 8, 9, 11, 12}},
{[]int64{8, 9, 10}, []int64{1, 2, 3, 4, 6, 7}},
{[]int64{9, 10, 11}, []int64{1, 2, 3, 4, 6, 7, 8}},
{[]int64{10, 11, 12}, []int64{1, 2, 3, 4, 5, 6, 7, 8, 9}},
{[]int64{10, 11, 12, 13}, []int64{1, 2, 3, 4, 5, 6, 7, 8, 9}},
{[]int64{10, 12, 13, 14}, []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 11}},
{[]int64{10, 13, 14, 15}, []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12}},
}
testPruning(t, int64(3), int64(5), states)
testPruning(t, int64(3), int64(5), int64(10), states)
}
type pruneState struct {
@ -426,26 +426,30 @@ type pruneState struct {
deleted []int64
}
func testPruning(t *testing.T, numRecent int64, storeEvery int64, states []pruneState) {
func testPruning(t *testing.T, numRecent int64, storeEvery int64, snapshotEvery int64, states []pruneState) {
db := dbm.NewMemDB()
pruningOpts := types.PruningOptions{
KeepEvery: storeEvery,
SnapshotEvery: snapshotEvery,
}
iavlOpts := iavl.PruningOptions(storeEvery, numRecent)
tree, err := iavl.NewMutableTreeWithOpts(db, dbm.NewMemDB(), cacheSize, iavlOpts)
require.NoError(t, err)
iavlStore := UnsafeNewStore(tree)
iavlStore := UnsafeNewStore(tree, pruningOpts)
for step, state := range states {
for _, ver := range state.stored {
require.True(t, iavlStore.VersionExists(ver),
"missing version %d with latest version %d; should save last %d and every %d",
ver, step, numRecent, storeEvery)
"missing version %d with latest version %d; should save last %d, store every %d, and snapshot every %d",
ver, step, numRecent, storeEvery, snapshotEvery)
}
for _, ver := range state.deleted {
require.False(t, iavlStore.VersionExists(ver),
"not pruned version %d with latest version %d; should prune all but last %d and every %d",
ver, step, numRecent, storeEvery)
"not pruned version %d with latest version %d; should prune all but last %d and every %d with intermediate flush interval %d",
ver, step, numRecent, snapshotEvery, storeEvery)
}
nextVersion(iavlStore)
@ -457,7 +461,7 @@ func TestIAVLNoPrune(t *testing.T) {
tree, err := iavl.NewMutableTree(db, cacheSize)
require.NoError(t, err)
iavlStore := UnsafeNewStore(tree)
iavlStore := UnsafeNewStore(tree, types.PruneNothing)
nextVersion(iavlStore)
for i := 1; i < 100; i++ {
@ -478,7 +482,7 @@ func TestIAVLPruneEverything(t *testing.T) {
tree, err := iavl.NewMutableTreeWithOpts(db, dbm.NewMemDB(), cacheSize, iavlOpts)
require.NoError(t, err)
iavlStore := UnsafeNewStore(tree)
iavlStore := UnsafeNewStore(tree, types.PruneEverything)
nextVersion(iavlStore)
for i := 1; i < 100; i++ {
@ -501,7 +505,7 @@ func TestIAVLStoreQuery(t *testing.T) {
tree, err := iavl.NewMutableTree(db, cacheSize)
require.NoError(t, err)
iavlStore := UnsafeNewStore(tree)
iavlStore := UnsafeNewStore(tree, types.PruneNothing)
k1, v1 := []byte("key1"), []byte("val1")
k2, v2 := []byte("key2"), []byte("val2")
@ -600,7 +604,7 @@ func BenchmarkIAVLIteratorNext(b *testing.B) {
tree.Set(key, value)
}
iavlStore := UnsafeNewStore(tree)
iavlStore := UnsafeNewStore(tree, types.PruneNothing)
iterators := make([]types.Iterator, b.N/treeSize)
for i := 0; i < len(iterators); i++ {

View File

@ -88,7 +88,7 @@ func TestIAVLStorePrefix(t *testing.T) {
db := dbm.NewMemDB()
tree, err := tiavl.NewMutableTree(db, cacheSize)
require.NoError(t, err)
iavlStore := iavl.UnsafeNewStore(tree)
iavlStore := iavl.UnsafeNewStore(tree, types.PruneNothing)
testPrefixStore(t, iavlStore, []byte("test"))
}

View File

@ -28,13 +28,13 @@ const (
// cacheMultiStore which is for cache-wrapping other MultiStores. It implements
// the CommitMultiStore interface.
type Store struct {
db dbm.DB
lastCommitID types.CommitID
pruningOpts types.PruningOptions
storesParams map[types.StoreKey]storeParams
stores map[types.StoreKey]types.CommitKVStore
keysByName map[string]types.StoreKey
lazyLoading bool
db dbm.DB
lastCommitInfo commitInfo
pruningOpts types.PruningOptions
storesParams map[types.StoreKey]storeParams
stores map[types.StoreKey]types.CommitKVStore
keysByName map[string]types.StoreKey
lazyLoading bool
traceWriter io.Writer
traceContext types.TraceContext
@ -146,11 +146,12 @@ func (rs *Store) LoadVersion(ver int64) error {
func (rs *Store) loadVersion(ver int64, upgrades *types.StoreUpgrades) error {
infos := make(map[string]storeInfo)
var lastCommitID types.CommitID
var cInfo commitInfo
// load old data if we are not version 0
if ver != 0 {
cInfo, err := getCommitInfo(rs.db, ver)
var err error
cInfo, err = getCommitInfo(rs.db, ver)
if err != nil {
return err
}
@ -159,7 +160,6 @@ func (rs *Store) loadVersion(ver int64, upgrades *types.StoreUpgrades) error {
for _, storeInfo := range cInfo.StoreInfos {
infos[storeInfo.Name] = storeInfo
}
lastCommitID = cInfo.CommitID()
}
// load each Store (note this doesn't panic on unmounted keys now)
@ -197,7 +197,7 @@ func (rs *Store) loadVersion(ver int64, upgrades *types.StoreUpgrades) error {
}
}
rs.lastCommitID = lastCommitID
rs.lastCommitInfo = cInfo
rs.stores = newStores
return nil
@ -281,29 +281,26 @@ func (rs *Store) TracingEnabled() bool {
// Implements Committer/CommitStore.
func (rs *Store) LastCommitID() types.CommitID {
return rs.lastCommitID
return rs.lastCommitInfo.CommitID()
}
// Implements Committer/CommitStore.
func (rs *Store) Commit() types.CommitID {
// Commit stores.
version := rs.lastCommitID.Version + 1
commitInfo := commitStores(version, rs.stores)
version := rs.lastCommitInfo.Version + 1
rs.lastCommitInfo = commitStores(version, rs.stores)
// Need to update atomically.
batch := rs.db.NewBatch()
defer batch.Close()
setCommitInfo(batch, version, commitInfo)
setLatestVersion(batch, version)
batch.Write()
// write CommitInfo to disk only if this version was flushed to disk
if rs.pruningOpts.FlushVersion(version) {
flushCommitInfo(rs.db, version, rs.lastCommitInfo)
}
// Prepare for next version.
commitID := types.CommitID{
Version: version,
Hash: commitInfo.Hash(),
Hash: rs.lastCommitInfo.Hash(),
}
rs.lastCommitID = commitID
return commitID
}
@ -412,7 +409,6 @@ func (rs *Store) getStoreByName(name string) types.Store {
// Ie. `req.Path` here is `/<substore>/<path>`, and trimmed to `/<path>` for the substore.
// TODO: add proof for `multistore -> substore`.
func (rs *Store) Query(req abci.RequestQuery) abci.ResponseQuery {
// Query just routes this to a substore.
path := req.Path
storeName, subpath, err := parsePath(path)
if err != nil {
@ -441,9 +437,18 @@ func (rs *Store) Query(req abci.RequestQuery) abci.ResponseQuery {
return sdkerrors.QueryResult(sdkerrors.Wrap(sdkerrors.ErrInvalidRequest, "proof is unexpectedly empty; ensure height has not been pruned"))
}
commitInfo, errMsg := getCommitInfo(rs.db, res.Height)
if errMsg != nil {
return sdkerrors.QueryResult(err)
// If the request's height is the latest height we've committed, then utilize
// the store's lastCommitInfo as this commit info may not be flushed to disk.
// Otherwise, we query for the commit info from disk.
var commitInfo commitInfo
if res.Height == rs.lastCommitInfo.Version {
commitInfo = rs.lastCommitInfo
} else {
commitInfo, err = getCommitInfo(rs.db, res.Height)
if err != nil {
return sdkerrors.QueryResult(err)
}
}
// Restore origin path and append proof op.
@ -626,26 +631,22 @@ func commitStores(version int64, storeMap map[types.StoreKey]types.CommitKVStore
storeInfos := make([]storeInfo, 0, len(storeMap))
for key, store := range storeMap {
// Commit
commitID := store.Commit()
if store.GetStoreType() == types.StoreTypeTransient {
continue
}
// Record CommitID
si := storeInfo{}
si.Name = key.Name()
si.Core.CommitID = commitID
// si.Core.StoreType = store.GetStoreType()
storeInfos = append(storeInfos, si)
}
ci := commitInfo{
return commitInfo{
Version: version,
StoreInfos: storeInfos,
}
return ci
}
// Gets commitInfo from disk.
@ -676,3 +677,14 @@ func setCommitInfo(batch dbm.Batch, version int64, cInfo commitInfo) {
cInfoKey := fmt.Sprintf(commitInfoKeyFmt, version)
batch.Set([]byte(cInfoKey), cInfoBytes)
}
// flushCommitInfo flushes a commitInfo for given version to the DB. Note, this
// needs to happen atomically.
func flushCommitInfo(db dbm.DB, version int64, cInfo commitInfo) {
batch := db.NewBatch()
defer batch.Close()
setCommitInfo(batch, version, cInfo)
setLatestVersion(batch, version)
batch.Write()
}

View File

@ -1,6 +1,7 @@
package rootmulti
import (
"fmt"
"testing"
"github.com/stretchr/testify/require"
@ -53,7 +54,7 @@ func TestStoreMount(t *testing.T) {
func TestCacheMultiStoreWithVersion(t *testing.T) {
var db dbm.DB = dbm.NewMemDB()
ms := newMultiStoreWithMounts(db, types.PruneSyncable)
ms := newMultiStoreWithMounts(db, types.PruneNothing)
err := ms.LoadLatestVersion()
require.Nil(t, err)
@ -90,7 +91,7 @@ func TestCacheMultiStoreWithVersion(t *testing.T) {
func TestHashStableWithEmptyCommit(t *testing.T) {
var db dbm.DB = dbm.NewMemDB()
ms := newMultiStoreWithMounts(db, types.PruneSyncable)
ms := newMultiStoreWithMounts(db, types.PruneNothing)
err := ms.LoadLatestVersion()
require.Nil(t, err)
@ -114,7 +115,7 @@ func TestHashStableWithEmptyCommit(t *testing.T) {
func TestMultistoreCommitLoad(t *testing.T) {
var db dbm.DB = dbm.NewMemDB()
store := newMultiStoreWithMounts(db, types.PruneSyncable)
store := newMultiStoreWithMounts(db, types.PruneNothing)
err := store.LoadLatestVersion()
require.Nil(t, err)
@ -139,7 +140,7 @@ func TestMultistoreCommitLoad(t *testing.T) {
}
// Load the latest multistore again and check version.
store = newMultiStoreWithMounts(db, types.PruneSyncable)
store = newMultiStoreWithMounts(db, types.PruneNothing)
err = store.LoadLatestVersion()
require.Nil(t, err)
commitID = getExpectedCommitID(store, nCommits)
@ -152,7 +153,7 @@ func TestMultistoreCommitLoad(t *testing.T) {
// Load an older multistore and check version.
ver := nCommits - 1
store = newMultiStoreWithMounts(db, types.PruneSyncable)
store = newMultiStoreWithMounts(db, types.PruneNothing)
err = store.LoadVersion(ver)
require.Nil(t, err)
commitID = getExpectedCommitID(store, ver)
@ -289,6 +290,88 @@ func TestParsePath(t *testing.T) {
}
func TestMultiStoreRestart(t *testing.T) {
db := dbm.NewMemDB()
pruning := types.PruningOptions{
KeepEvery: 3,
SnapshotEvery: 6,
}
multi := newMultiStoreWithMounts(db, pruning)
err := multi.LoadLatestVersion()
require.Nil(t, err)
initCid := multi.LastCommitID()
k, v := "wind", "blows"
k2, v2 := "water", "flows"
k3, v3 := "fire", "burns"
for i := 1; i < 3; i++ {
// Set and commit data in one store.
store1 := multi.getStoreByName("store1").(types.KVStore)
store1.Set([]byte(k), []byte(fmt.Sprintf("%s:%d", v, i)))
// ... and another.
store2 := multi.getStoreByName("store2").(types.KVStore)
store2.Set([]byte(k2), []byte(fmt.Sprintf("%s:%d", v2, i)))
// ... and another.
store3 := multi.getStoreByName("store3").(types.KVStore)
store3.Set([]byte(k3), []byte(fmt.Sprintf("%s:%d", v3, i)))
multi.Commit()
cinfo, err := getCommitInfo(multi.db, int64(i))
require.NotNil(t, err)
require.Equal(t, commitInfo{}, cinfo)
}
// Set and commit data in one store.
store1 := multi.getStoreByName("store1").(types.KVStore)
store1.Set([]byte(k), []byte(fmt.Sprintf("%s:%d", v, 3)))
// ... and another.
store2 := multi.getStoreByName("store2").(types.KVStore)
store2.Set([]byte(k2), []byte(fmt.Sprintf("%s:%d", v2, 3)))
multi.Commit()
flushedCinfo, err := getCommitInfo(multi.db, 3)
require.Nil(t, err)
require.NotEqual(t, initCid, flushedCinfo, "CID is different after flush to disk")
// ... and another.
store3 := multi.getStoreByName("store3").(types.KVStore)
store3.Set([]byte(k3), []byte(fmt.Sprintf("%s:%d", v3, 3)))
multi.Commit()
postFlushCinfo, err := getCommitInfo(multi.db, 4)
require.NotNil(t, err)
require.Equal(t, commitInfo{}, postFlushCinfo, "Commit changed after in-memory commit")
multi = newMultiStoreWithMounts(db, pruning)
err = multi.LoadLatestVersion()
require.Nil(t, err)
reloadedCid := multi.LastCommitID()
require.Equal(t, flushedCinfo.CommitID(), reloadedCid, "Reloaded CID is not the same as last flushed CID")
// Check that store1 and store2 retained date from 3rd commit
store1 = multi.getStoreByName("store1").(types.KVStore)
val := store1.Get([]byte(k))
require.Equal(t, []byte(fmt.Sprintf("%s:%d", v, 3)), val, "Reloaded value not the same as last flushed value")
store2 = multi.getStoreByName("store2").(types.KVStore)
val2 := store2.Get([]byte(k2))
require.Equal(t, []byte(fmt.Sprintf("%s:%d", v2, 3)), val2, "Reloaded value not the same as last flushed value")
// Check that store3 still has data from last commit even though update happened on 2nd commit
store3 = multi.getStoreByName("store3").(types.KVStore)
val3 := store3.Get([]byte(k3))
require.Equal(t, []byte(fmt.Sprintf("%s:%d", v3, 2)), val3, "Reloaded value not the same as last flushed value")
}
func TestMultiStoreQuery(t *testing.T) {
db := dbm.NewMemDB()
multi := newMultiStoreWithMounts(db, types.PruneNothing)

View File

@ -1,35 +1,66 @@
package types
// PruningStrategy specifies how old states will be deleted over time where
// keepRecent can be used with keepEvery to create a pruning "strategy".
type PruningOptions struct {
keepRecent int64
keepEvery int64
}
func NewPruningOptions(keepRecent, keepEvery int64) PruningOptions {
return PruningOptions{
keepRecent: keepRecent,
keepEvery: keepEvery,
}
}
// How much recent state will be kept. Older state will be deleted.
func (po PruningOptions) KeepRecent() int64 {
return po.keepRecent
}
// Keeps every N stated, deleting others.
func (po PruningOptions) KeepEvery() int64 {
return po.keepEvery
}
// default pruning strategies
var (
// PruneEverything means all saved states will be deleted, storing only the current state
PruneEverything = NewPruningOptions(1, 0)
// PruneNothing means all historic states will be saved, nothing will be deleted
PruneNothing = NewPruningOptions(0, 1)
// PruneSyncable means only those states not needed for state syncing will be deleted (keeps last 100 + every 10000th)
PruneSyncable = NewPruningOptions(100, 10000)
// PruneEverything defines a pruning strategy where all committed states will
// be deleted, persisting only the current state.
PruneEverything = PruningOptions{
KeepEvery: 1,
SnapshotEvery: 0,
}
// PruneNothing defines a pruning strategy where all committed states will be
// kept on disk, i.e. no states will be pruned.
PruneNothing = PruningOptions{
KeepEvery: 1,
SnapshotEvery: 1,
}
// PruneSyncable defines a pruning strategy where only those states not needed
// for state syncing will be pruned. It flushes every 100th state to disk and
// keeps every 10000th.
PruneSyncable = PruningOptions{
KeepEvery: 100,
SnapshotEvery: 10000,
}
)
// PruningOptions defines the specific pruning strategy every store in a multi-store
// will use when committing state, where keepEvery determines which committed
// heights are flushed to disk and snapshotEvery determines which of these heights
// are kept after pruning.
type PruningOptions struct {
KeepEvery int64
SnapshotEvery int64
}
// IsValid verifies if the pruning options are valid. It returns false if invalid
// and true otherwise. Pruning options are considered valid iff:
//
// - KeepEvery > 0
// - SnapshotEvery >= 0
// - SnapshotEvery % KeepEvery = 0
func (po PruningOptions) IsValid() bool {
// must flush at positive block interval
if po.KeepEvery <= 0 {
return false
}
// cannot snapshot negative intervals
if po.SnapshotEvery < 0 {
return false
}
return po.SnapshotEvery%po.KeepEvery == 0
}
// FlushVersion returns a boolean signaling if the provided version/height should
// be flushed to disk.
func (po PruningOptions) FlushVersion(ver int64) bool {
return po.KeepEvery != 0 && ver%po.KeepEvery == 0
}
// SnapshotVersion returns a boolean signaling if the provided version/height
// should be snapshotted (kept on disk).
func (po PruningOptions) SnapshotVersion(ver int64) bool {
return po.SnapshotEvery != 0 && ver%po.SnapshotEvery == 0
}