Merge PR #4724: Allow substore migrations upon multistore loading
This commit is contained in:
parent
865d473eb4
commit
1f8cdeed55
|
@ -0,0 +1,4 @@
|
||||||
|
#4724 Multistore supports substore migrations upon load.
|
||||||
|
New `rootmulti.Store.LoadLatestVersionAndUpgrade` method
|
||||||
|
Baseapp supports `StoreLoader` to enable various upgrade strategies
|
||||||
|
No longer panics if the store to load contains substores that we didn't explicitly mount.
|
|
@ -1,8 +1,9 @@
|
||||||
package baseapp
|
package baseapp
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
|
@ -20,6 +21,7 @@ import (
|
||||||
|
|
||||||
"github.com/cosmos/cosmos-sdk/codec"
|
"github.com/cosmos/cosmos-sdk/codec"
|
||||||
"github.com/cosmos/cosmos-sdk/store"
|
"github.com/cosmos/cosmos-sdk/store"
|
||||||
|
storetypes "github.com/cosmos/cosmos-sdk/store/types"
|
||||||
sdk "github.com/cosmos/cosmos-sdk/types"
|
sdk "github.com/cosmos/cosmos-sdk/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -41,6 +43,12 @@ const (
|
||||||
MainStoreKey = "main"
|
MainStoreKey = "main"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// StoreLoader defines a customizable function to control how we load the CommitMultiStore
|
||||||
|
// from disk. This is useful for state migration, when loading a datastore written with
|
||||||
|
// an older version of the software. In particular, if a module changed the substore key name
|
||||||
|
// (or removed a substore) between two versions of the software.
|
||||||
|
type StoreLoader func(ms sdk.CommitMultiStore) error
|
||||||
|
|
||||||
// BaseApp reflects the ABCI application implementation.
|
// BaseApp reflects the ABCI application implementation.
|
||||||
type BaseApp struct {
|
type BaseApp struct {
|
||||||
// initialized on creation
|
// initialized on creation
|
||||||
|
@ -48,6 +56,7 @@ type BaseApp struct {
|
||||||
name string // application name from abci.Info
|
name string // application name from abci.Info
|
||||||
db dbm.DB // common DB backend
|
db dbm.DB // common DB backend
|
||||||
cms sdk.CommitMultiStore // Main (uncached) state
|
cms sdk.CommitMultiStore // Main (uncached) state
|
||||||
|
storeLoader StoreLoader // function to handle store loading, may be overridden with SetStoreLoader()
|
||||||
router sdk.Router // handle any kind of message
|
router sdk.Router // handle any kind of message
|
||||||
queryRouter sdk.QueryRouter // router for redirecting query calls
|
queryRouter sdk.QueryRouter // router for redirecting query calls
|
||||||
txDecoder sdk.TxDecoder // unmarshal []byte into sdk.Tx
|
txDecoder sdk.TxDecoder // unmarshal []byte into sdk.Tx
|
||||||
|
@ -106,6 +115,7 @@ func NewBaseApp(
|
||||||
name: name,
|
name: name,
|
||||||
db: db,
|
db: db,
|
||||||
cms: store.NewCommitMultiStore(db),
|
cms: store.NewCommitMultiStore(db),
|
||||||
|
storeLoader: DefaultStoreLoader,
|
||||||
router: NewRouter(),
|
router: NewRouter(),
|
||||||
queryRouter: NewQueryRouter(),
|
queryRouter: NewQueryRouter(),
|
||||||
txDecoder: txDecoder,
|
txDecoder: txDecoder,
|
||||||
|
@ -133,12 +143,6 @@ func (app *BaseApp) Logger() log.Logger {
|
||||||
return app.logger
|
return app.logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetCommitMultiStoreTracer sets the store tracer on the BaseApp's underlying
|
|
||||||
// CommitMultiStore.
|
|
||||||
func (app *BaseApp) SetCommitMultiStoreTracer(w io.Writer) {
|
|
||||||
app.cms.SetTracer(w)
|
|
||||||
}
|
|
||||||
|
|
||||||
// MountStores mounts all IAVL or DB stores to the provided keys in the BaseApp
|
// MountStores mounts all IAVL or DB stores to the provided keys in the BaseApp
|
||||||
// multistore.
|
// multistore.
|
||||||
func (app *BaseApp) MountStores(keys ...sdk.StoreKey) {
|
func (app *BaseApp) MountStores(keys ...sdk.StoreKey) {
|
||||||
|
@ -197,13 +201,74 @@ func (app *BaseApp) MountStore(key sdk.StoreKey, typ sdk.StoreType) {
|
||||||
// LoadLatestVersion loads the latest application version. It will panic if
|
// LoadLatestVersion loads the latest application version. It will panic if
|
||||||
// called more than once on a running BaseApp.
|
// called more than once on a running BaseApp.
|
||||||
func (app *BaseApp) LoadLatestVersion(baseKey *sdk.KVStoreKey) error {
|
func (app *BaseApp) LoadLatestVersion(baseKey *sdk.KVStoreKey) error {
|
||||||
err := app.cms.LoadLatestVersion()
|
err := app.storeLoader(app.cms)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return app.initFromMainStore(baseKey)
|
return app.initFromMainStore(baseKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DefaultStoreLoader will be used by default and loads the latest version
|
||||||
|
func DefaultStoreLoader(ms sdk.CommitMultiStore) error {
|
||||||
|
return ms.LoadLatestVersion()
|
||||||
|
}
|
||||||
|
|
||||||
|
// StoreLoaderWithUpgrade is used to prepare baseapp with a fixed StoreLoader
|
||||||
|
// pattern. This is useful in test cases, or with custom upgrade loading logic.
|
||||||
|
func StoreLoaderWithUpgrade(upgrades *storetypes.StoreUpgrades) StoreLoader {
|
||||||
|
return func(ms sdk.CommitMultiStore) error {
|
||||||
|
return ms.LoadLatestVersionAndUpgrade(upgrades)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpgradeableStoreLoader can be configured by SetStoreLoader() to check for the
|
||||||
|
// existence of a given upgrade file - json encoded StoreUpgrades data.
|
||||||
|
//
|
||||||
|
// If not file is present, it will peform the default load (no upgrades to store).
|
||||||
|
//
|
||||||
|
// If the file is present, it will parse the file and execute those upgrades
|
||||||
|
// (rename or delete stores), while loading the data. It will also delete the
|
||||||
|
// upgrade file upon successful load, so that the upgrade is only applied once,
|
||||||
|
// and not re-applied on next restart
|
||||||
|
//
|
||||||
|
// This is useful for in place migrations when a store key is renamed between
|
||||||
|
// two versions of the software. (TODO: this code will move to x/upgrades
|
||||||
|
// when PR #4233 is merged, here mainly to help test the design)
|
||||||
|
func UpgradeableStoreLoader(upgradeInfoPath string) StoreLoader {
|
||||||
|
return func(ms sdk.CommitMultiStore) error {
|
||||||
|
_, err := os.Stat(upgradeInfoPath)
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
return DefaultStoreLoader(ms)
|
||||||
|
} else if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// there is a migration file, let's execute
|
||||||
|
data, err := ioutil.ReadFile(upgradeInfoPath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Cannot read upgrade file %s: %v", upgradeInfoPath, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var upgrades storetypes.StoreUpgrades
|
||||||
|
err = json.Unmarshal(data, &upgrades)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Cannot parse upgrade file: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = ms.LoadLatestVersionAndUpgrade(&upgrades)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Load and upgrade database: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// if we have a successful load, we delete the file
|
||||||
|
err = os.Remove(upgradeInfoPath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("deleting upgrade file %s: %v", upgradeInfoPath, err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// LoadVersion loads the BaseApp application version. It will panic if called
|
// LoadVersion loads the BaseApp application version. It will panic if called
|
||||||
// more than once on a running baseapp.
|
// more than once on a running baseapp.
|
||||||
func (app *BaseApp) LoadVersion(version int64, baseKey *sdk.KVStoreKey) error {
|
func (app *BaseApp) LoadVersion(version int64, baseKey *sdk.KVStoreKey) error {
|
||||||
|
|
|
@ -4,11 +4,10 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
store "github.com/cosmos/cosmos-sdk/store/types"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
@ -17,6 +16,8 @@ import (
|
||||||
dbm "github.com/tendermint/tm-db"
|
dbm "github.com/tendermint/tm-db"
|
||||||
|
|
||||||
"github.com/cosmos/cosmos-sdk/codec"
|
"github.com/cosmos/cosmos-sdk/codec"
|
||||||
|
"github.com/cosmos/cosmos-sdk/store/rootmulti"
|
||||||
|
store "github.com/cosmos/cosmos-sdk/store/types"
|
||||||
sdk "github.com/cosmos/cosmos-sdk/types"
|
sdk "github.com/cosmos/cosmos-sdk/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -130,6 +131,143 @@ func TestLoadVersion(t *testing.T) {
|
||||||
testLoadVersionHelper(t, app, int64(2), commitID2)
|
testLoadVersionHelper(t, app, int64(2), commitID2)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func useDefaultLoader(app *BaseApp) {
|
||||||
|
app.SetStoreLoader(DefaultStoreLoader)
|
||||||
|
}
|
||||||
|
|
||||||
|
func useUpgradeLoader(upgrades *store.StoreUpgrades) func(*BaseApp) {
|
||||||
|
return func(app *BaseApp) {
|
||||||
|
app.SetStoreLoader(StoreLoaderWithUpgrade(upgrades))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func useFileUpgradeLoader(upgradeInfoPath string) func(*BaseApp) {
|
||||||
|
return func(app *BaseApp) {
|
||||||
|
app.SetStoreLoader(UpgradeableStoreLoader(upgradeInfoPath))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func initStore(t *testing.T, db dbm.DB, storeKey string, k, v []byte) {
|
||||||
|
rs := rootmulti.NewStore(db)
|
||||||
|
rs.SetPruning(store.PruneSyncable)
|
||||||
|
key := sdk.NewKVStoreKey(storeKey)
|
||||||
|
rs.MountStoreWithDB(key, store.StoreTypeIAVL, nil)
|
||||||
|
err := rs.LoadLatestVersion()
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.Equal(t, int64(0), rs.LastCommitID().Version)
|
||||||
|
|
||||||
|
// write some data in substore
|
||||||
|
kv, _ := rs.GetStore(key).(store.KVStore)
|
||||||
|
require.NotNil(t, kv)
|
||||||
|
kv.Set(k, v)
|
||||||
|
commitID := rs.Commit()
|
||||||
|
require.Equal(t, int64(1), commitID.Version)
|
||||||
|
}
|
||||||
|
|
||||||
|
func checkStore(t *testing.T, db dbm.DB, ver int64, storeKey string, k, v []byte) {
|
||||||
|
rs := rootmulti.NewStore(db)
|
||||||
|
rs.SetPruning(store.PruneSyncable)
|
||||||
|
key := sdk.NewKVStoreKey(storeKey)
|
||||||
|
rs.MountStoreWithDB(key, store.StoreTypeIAVL, nil)
|
||||||
|
err := rs.LoadLatestVersion()
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.Equal(t, ver, rs.LastCommitID().Version)
|
||||||
|
|
||||||
|
// query data in substore
|
||||||
|
kv, _ := rs.GetStore(key).(store.KVStore)
|
||||||
|
require.NotNil(t, kv)
|
||||||
|
require.Equal(t, v, kv.Get(k))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that we can make commits and then reload old versions.
|
||||||
|
// Test that LoadLatestVersion actually does.
|
||||||
|
func TestSetLoader(t *testing.T) {
|
||||||
|
// write a renamer to a file
|
||||||
|
f, err := ioutil.TempFile("", "upgrade-*.json")
|
||||||
|
require.NoError(t, err)
|
||||||
|
data := []byte(`{"renamed":[{"old_key": "bnk", "new_key": "banker"}]}`)
|
||||||
|
_, err = f.Write(data)
|
||||||
|
require.NoError(t, err)
|
||||||
|
configName := f.Name()
|
||||||
|
require.NoError(t, f.Close())
|
||||||
|
|
||||||
|
// make sure it exists before running everything
|
||||||
|
_, err = os.Stat(configName)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
cases := map[string]struct {
|
||||||
|
setLoader func(*BaseApp)
|
||||||
|
origStoreKey string
|
||||||
|
loadStoreKey string
|
||||||
|
}{
|
||||||
|
"don't set loader": {
|
||||||
|
origStoreKey: "foo",
|
||||||
|
loadStoreKey: "foo",
|
||||||
|
},
|
||||||
|
"default loader": {
|
||||||
|
setLoader: useDefaultLoader,
|
||||||
|
origStoreKey: "foo",
|
||||||
|
loadStoreKey: "foo",
|
||||||
|
},
|
||||||
|
"rename with inline opts": {
|
||||||
|
setLoader: useUpgradeLoader(&store.StoreUpgrades{
|
||||||
|
Renamed: []store.StoreRename{{
|
||||||
|
OldKey: "foo",
|
||||||
|
NewKey: "bar",
|
||||||
|
}},
|
||||||
|
}),
|
||||||
|
origStoreKey: "foo",
|
||||||
|
loadStoreKey: "bar",
|
||||||
|
},
|
||||||
|
"file loader with missing file": {
|
||||||
|
setLoader: useFileUpgradeLoader(configName + "randomchars"),
|
||||||
|
origStoreKey: "bnk",
|
||||||
|
loadStoreKey: "bnk",
|
||||||
|
},
|
||||||
|
"file loader with existing file": {
|
||||||
|
setLoader: useFileUpgradeLoader(configName),
|
||||||
|
origStoreKey: "bnk",
|
||||||
|
loadStoreKey: "banker",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
k := []byte("key")
|
||||||
|
v := []byte("value")
|
||||||
|
|
||||||
|
for name, tc := range cases {
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
// prepare a db with some data
|
||||||
|
db := dbm.NewMemDB()
|
||||||
|
initStore(t, db, tc.origStoreKey, k, v)
|
||||||
|
|
||||||
|
// load the app with the existing db
|
||||||
|
opts := []func(*BaseApp){SetPruning(store.PruneSyncable)}
|
||||||
|
if tc.setLoader != nil {
|
||||||
|
opts = append(opts, tc.setLoader)
|
||||||
|
}
|
||||||
|
app := NewBaseApp(t.Name(), defaultLogger(), db, nil, opts...)
|
||||||
|
capKey := sdk.NewKVStoreKey(MainStoreKey)
|
||||||
|
app.MountStores(capKey)
|
||||||
|
app.MountStores(sdk.NewKVStoreKey(tc.loadStoreKey))
|
||||||
|
err := app.LoadLatestVersion(capKey)
|
||||||
|
require.Nil(t, err)
|
||||||
|
|
||||||
|
// "execute" one block
|
||||||
|
app.BeginBlock(abci.RequestBeginBlock{Header: abci.Header{Height: 2}})
|
||||||
|
res := app.Commit()
|
||||||
|
require.NotNil(t, res.Data)
|
||||||
|
|
||||||
|
// check db is properly updated
|
||||||
|
checkStore(t, db, 2, tc.loadStoreKey, k, v)
|
||||||
|
checkStore(t, db, 2, tc.loadStoreKey, []byte("foo"), nil)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// ensure config file was deleted
|
||||||
|
_, err = os.Stat(configName)
|
||||||
|
require.True(t, os.IsNotExist(err))
|
||||||
|
}
|
||||||
|
|
||||||
func TestAppVersionSetterGetter(t *testing.T) {
|
func TestAppVersionSetterGetter(t *testing.T) {
|
||||||
logger := defaultLogger()
|
logger := defaultLogger()
|
||||||
pruningOpt := SetPruning(store.PruneSyncable)
|
pruningOpt := SetPruning(store.PruneSyncable)
|
||||||
|
@ -995,7 +1133,6 @@ func TestMaxBlockGasLimits(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, tc := range testCases {
|
for i, tc := range testCases {
|
||||||
fmt.Printf("debug i: %v\n", i)
|
|
||||||
tx := tc.tx
|
tx := tc.tx
|
||||||
|
|
||||||
// reset the block gas
|
// reset the block gas
|
||||||
|
|
|
@ -3,6 +3,7 @@ package baseapp
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
|
|
||||||
dbm "github.com/tendermint/tm-db"
|
dbm "github.com/tendermint/tm-db"
|
||||||
|
|
||||||
|
@ -110,3 +111,17 @@ func (app *BaseApp) SetFauxMerkleMode() {
|
||||||
}
|
}
|
||||||
app.fauxMerkleMode = true
|
app.fauxMerkleMode = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetCommitMultiStoreTracer sets the store tracer on the BaseApp's underlying
|
||||||
|
// CommitMultiStore.
|
||||||
|
func (app *BaseApp) SetCommitMultiStoreTracer(w io.Writer) {
|
||||||
|
app.cms.SetTracer(w)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetStoreLoader allows us to customize the rootMultiStore initialization.
|
||||||
|
func (app *BaseApp) SetStoreLoader(loader StoreLoader) {
|
||||||
|
if app.sealed {
|
||||||
|
panic("SetStoreLoader() on sealed BaseApp")
|
||||||
|
}
|
||||||
|
app.storeLoader = loader
|
||||||
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
|
|
||||||
dbm "github.com/tendermint/tm-db"
|
dbm "github.com/tendermint/tm-db"
|
||||||
|
|
||||||
|
store "github.com/cosmos/cosmos-sdk/store/types"
|
||||||
sdk "github.com/cosmos/cosmos-sdk/types"
|
sdk "github.com/cosmos/cosmos-sdk/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -70,6 +71,14 @@ func (ms multiStore) LoadLatestVersion() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ms multiStore) LoadLatestVersionAndUpgrade(upgrades *store.StoreUpgrades) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ms multiStore) LoadVersionAndUpgrade(ver int64, upgrades *store.StoreUpgrades) error {
|
||||||
|
panic("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
func (ms multiStore) LoadVersion(ver int64) error {
|
func (ms multiStore) LoadVersion(ver int64) error {
|
||||||
panic("not implemented")
|
panic("not implemented")
|
||||||
}
|
}
|
||||||
|
|
|
@ -100,65 +100,126 @@ func (rs *Store) GetCommitKVStore(key types.StoreKey) types.CommitKVStore {
|
||||||
return rs.stores[key].(types.CommitKVStore)
|
return rs.stores[key].(types.CommitKVStore)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements CommitMultiStore.
|
// LoadLatestVersionAndUpgrade implements CommitMultiStore
|
||||||
func (rs *Store) LoadLatestVersion() error {
|
func (rs *Store) LoadLatestVersionAndUpgrade(upgrades *types.StoreUpgrades) error {
|
||||||
ver := getLatestVersion(rs.db)
|
ver := getLatestVersion(rs.db)
|
||||||
return rs.LoadVersion(ver)
|
return rs.loadVersion(ver, upgrades)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements CommitMultiStore.
|
// LoadVersionAndUpgrade allows us to rename substores while loading an older version
|
||||||
|
func (rs *Store) LoadVersionAndUpgrade(ver int64, upgrades *types.StoreUpgrades) error {
|
||||||
|
return rs.loadVersion(ver, upgrades)
|
||||||
|
}
|
||||||
|
|
||||||
|
// LoadLatestVersion implements CommitMultiStore.
|
||||||
|
func (rs *Store) LoadLatestVersion() error {
|
||||||
|
ver := getLatestVersion(rs.db)
|
||||||
|
return rs.loadVersion(ver, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// LoadVersion implements CommitMultiStore.
|
||||||
func (rs *Store) LoadVersion(ver int64) error {
|
func (rs *Store) LoadVersion(ver int64) error {
|
||||||
if ver == 0 {
|
return rs.loadVersion(ver, nil)
|
||||||
// Special logic for version 0 where there is no need to get commit
|
}
|
||||||
// information.
|
|
||||||
for key, storeParams := range rs.storesParams {
|
|
||||||
store, err := rs.loadCommitStoreFromParams(key, types.CommitID{}, storeParams)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to load Store: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
rs.stores[key] = store
|
func (rs *Store) loadVersion(ver int64, upgrades *types.StoreUpgrades) error {
|
||||||
}
|
infos := make(map[string]storeInfo)
|
||||||
|
var lastCommitID types.CommitID
|
||||||
rs.lastCommitID = types.CommitID{}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// load old data if we are not version 0
|
||||||
|
if ver != 0 {
|
||||||
cInfo, err := getCommitInfo(rs.db, ver)
|
cInfo, err := getCommitInfo(rs.db, ver)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// convert StoreInfos slice to map
|
// convert StoreInfos slice to map
|
||||||
infos := make(map[types.StoreKey]storeInfo)
|
|
||||||
for _, storeInfo := range cInfo.StoreInfos {
|
for _, storeInfo := range cInfo.StoreInfos {
|
||||||
infos[rs.nameToKey(storeInfo.Name)] = storeInfo
|
infos[storeInfo.Name] = storeInfo
|
||||||
|
}
|
||||||
|
lastCommitID = cInfo.CommitID()
|
||||||
}
|
}
|
||||||
|
|
||||||
// load each Store
|
// load each Store (note this doesn't panic on unmounted keys now)
|
||||||
var newStores = make(map[types.StoreKey]types.CommitStore)
|
var newStores = make(map[types.StoreKey]types.CommitStore)
|
||||||
for key, storeParams := range rs.storesParams {
|
for key, storeParams := range rs.storesParams {
|
||||||
var id types.CommitID
|
|
||||||
|
|
||||||
info, ok := infos[key]
|
// Load it
|
||||||
if ok {
|
store, err := rs.loadCommitStoreFromParams(key, rs.getCommitID(infos, key.Name()), storeParams)
|
||||||
id = info.Core.CommitID
|
|
||||||
}
|
|
||||||
|
|
||||||
store, err := rs.loadCommitStoreFromParams(key, id, storeParams)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to load Store: %v", err)
|
return fmt.Errorf("failed to load Store: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
newStores[key] = store
|
newStores[key] = store
|
||||||
|
|
||||||
|
// If it was deleted, remove all data
|
||||||
|
if upgrades.IsDeleted(key.Name()) {
|
||||||
|
if err := deleteKVStore(store.(types.KVStore)); err != nil {
|
||||||
|
return fmt.Errorf("failed to delete store %s: %v", key.Name(), err)
|
||||||
|
}
|
||||||
|
} else if oldName := upgrades.RenamedFrom(key.Name()); oldName != "" {
|
||||||
|
// handle renames specially
|
||||||
|
// make an unregistered key to satify loadCommitStore params
|
||||||
|
oldKey := types.NewKVStoreKey(oldName)
|
||||||
|
oldParams := storeParams
|
||||||
|
oldParams.key = oldKey
|
||||||
|
|
||||||
|
// load from the old name
|
||||||
|
oldStore, err := rs.loadCommitStoreFromParams(oldKey, rs.getCommitID(infos, oldName), oldParams)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to load old Store '%s': %v", oldName, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
rs.lastCommitID = cInfo.CommitID()
|
// move all data
|
||||||
|
if err := moveKVStoreData(oldStore.(types.KVStore), store.(types.KVStore)); err != nil {
|
||||||
|
return fmt.Errorf("failed to move store %s -> %s: %v", oldName, key.Name(), err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rs.lastCommitID = lastCommitID
|
||||||
rs.stores = newStores
|
rs.stores = newStores
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (rs *Store) getCommitID(infos map[string]storeInfo, name string) types.CommitID {
|
||||||
|
info, ok := infos[name]
|
||||||
|
if !ok {
|
||||||
|
return types.CommitID{}
|
||||||
|
}
|
||||||
|
return info.Core.CommitID
|
||||||
|
}
|
||||||
|
|
||||||
|
func deleteKVStore(kv types.KVStore) error {
|
||||||
|
// Note that we cannot write while iterating, so load all keys here, delete below
|
||||||
|
var keys [][]byte
|
||||||
|
itr := kv.Iterator(nil, nil)
|
||||||
|
for itr.Valid() {
|
||||||
|
keys = append(keys, itr.Key())
|
||||||
|
itr.Next()
|
||||||
|
}
|
||||||
|
itr.Close()
|
||||||
|
|
||||||
|
for _, k := range keys {
|
||||||
|
kv.Delete(k)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// we simulate move by a copy and delete
|
||||||
|
func moveKVStoreData(oldDB types.KVStore, newDB types.KVStore) error {
|
||||||
|
// we read from one and write to another
|
||||||
|
itr := oldDB.Iterator(nil, nil)
|
||||||
|
for itr.Valid() {
|
||||||
|
newDB.Set(itr.Key(), itr.Value())
|
||||||
|
itr.Next()
|
||||||
|
}
|
||||||
|
itr.Close()
|
||||||
|
|
||||||
|
// then delete the old store
|
||||||
|
return deleteKVStore(oldDB)
|
||||||
|
}
|
||||||
|
|
||||||
// SetTracer sets the tracer for the MultiStore that the underlying
|
// SetTracer sets the tracer for the MultiStore that the underlying
|
||||||
// stores will utilize to trace operations. A MultiStore is returned.
|
// stores will utilize to trace operations. A MultiStore is returned.
|
||||||
func (rs *Store) SetTracer(w io.Writer) types.MultiStore {
|
func (rs *Store) SetTracer(w io.Writer) types.MultiStore {
|
||||||
|
@ -380,14 +441,15 @@ func parsePath(path string) (storeName string, subpath string, err errors.Error)
|
||||||
}
|
}
|
||||||
|
|
||||||
//----------------------------------------
|
//----------------------------------------
|
||||||
|
// Note: why do we use key and params.key in different places. Seems like there should be only one key used.
|
||||||
func (rs *Store) loadCommitStoreFromParams(key types.StoreKey, id types.CommitID, params storeParams) (store types.CommitStore, err error) {
|
func (rs *Store) loadCommitStoreFromParams(key types.StoreKey, id types.CommitID, params storeParams) (store types.CommitStore, err error) {
|
||||||
var db dbm.DB
|
var db dbm.DB
|
||||||
|
|
||||||
if params.db != nil {
|
if params.db != nil {
|
||||||
db = dbm.NewPrefixDB(params.db, []byte("s/_/"))
|
db = dbm.NewPrefixDB(params.db, []byte("s/_/"))
|
||||||
} else {
|
} else {
|
||||||
db = dbm.NewPrefixDB(rs.db, []byte("s/k:"+params.key.Name()+"/"))
|
prefix := "s/k:" + params.key.Name() + "/"
|
||||||
|
db = dbm.NewPrefixDB(rs.db, []byte(prefix))
|
||||||
}
|
}
|
||||||
|
|
||||||
switch params.typ {
|
switch params.typ {
|
||||||
|
@ -413,15 +475,6 @@ func (rs *Store) loadCommitStoreFromParams(key types.StoreKey, id types.CommitID
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rs *Store) nameToKey(name string) types.StoreKey {
|
|
||||||
for key := range rs.storesParams {
|
|
||||||
if key.Name() == name {
|
|
||||||
return key
|
|
||||||
}
|
|
||||||
}
|
|
||||||
panic("Unknown name " + name)
|
|
||||||
}
|
|
||||||
|
|
||||||
//----------------------------------------
|
//----------------------------------------
|
||||||
// storeParams
|
// storeParams
|
||||||
|
|
||||||
|
|
|
@ -156,6 +156,101 @@ func TestMultistoreCommitLoad(t *testing.T) {
|
||||||
checkStore(t, store, commitID, commitID)
|
checkStore(t, store, commitID, commitID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMultistoreLoadWithUpgrade(t *testing.T) {
|
||||||
|
var db dbm.DB = dbm.NewMemDB()
|
||||||
|
store := newMultiStoreWithMounts(db)
|
||||||
|
err := store.LoadLatestVersion()
|
||||||
|
require.Nil(t, err)
|
||||||
|
|
||||||
|
// write some data in all stores
|
||||||
|
k1, v1 := []byte("first"), []byte("store")
|
||||||
|
s1, _ := store.getStoreByName("store1").(types.KVStore)
|
||||||
|
require.NotNil(t, s1)
|
||||||
|
s1.Set(k1, v1)
|
||||||
|
|
||||||
|
k2, v2 := []byte("second"), []byte("restore")
|
||||||
|
s2, _ := store.getStoreByName("store2").(types.KVStore)
|
||||||
|
require.NotNil(t, s2)
|
||||||
|
s2.Set(k2, v2)
|
||||||
|
|
||||||
|
k3, v3 := []byte("third"), []byte("dropped")
|
||||||
|
s3, _ := store.getStoreByName("store3").(types.KVStore)
|
||||||
|
require.NotNil(t, s3)
|
||||||
|
s3.Set(k3, v3)
|
||||||
|
|
||||||
|
// do one commit
|
||||||
|
commitID := store.Commit()
|
||||||
|
expectedCommitID := getExpectedCommitID(store, 1)
|
||||||
|
checkStore(t, store, expectedCommitID, commitID)
|
||||||
|
|
||||||
|
ci, err := getCommitInfo(db, 1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, int64(1), ci.Version)
|
||||||
|
require.Equal(t, 3, len(ci.StoreInfos))
|
||||||
|
checkContains(t, ci.StoreInfos, []string{"store1", "store2", "store3"})
|
||||||
|
|
||||||
|
// Load without changes and make sure it is sensible
|
||||||
|
store = newMultiStoreWithMounts(db)
|
||||||
|
err = store.LoadLatestVersion()
|
||||||
|
require.Nil(t, err)
|
||||||
|
commitID = getExpectedCommitID(store, 1)
|
||||||
|
checkStore(t, store, commitID, commitID)
|
||||||
|
|
||||||
|
// let's query data to see it was saved properly
|
||||||
|
s2, _ = store.getStoreByName("store2").(types.KVStore)
|
||||||
|
require.NotNil(t, s2)
|
||||||
|
require.Equal(t, v2, s2.Get(k2))
|
||||||
|
|
||||||
|
// now, let's load with upgrades...
|
||||||
|
restore, upgrades := newMultiStoreWithModifiedMounts(db)
|
||||||
|
err = restore.LoadLatestVersionAndUpgrade(upgrades)
|
||||||
|
require.Nil(t, err)
|
||||||
|
|
||||||
|
// s1 was not changed
|
||||||
|
s1, _ = restore.getStoreByName("store1").(types.KVStore)
|
||||||
|
require.NotNil(t, s1)
|
||||||
|
require.Equal(t, v1, s1.Get(k1))
|
||||||
|
|
||||||
|
// store3 is mounted, but data deleted are gone
|
||||||
|
s3, _ = restore.getStoreByName("store3").(types.KVStore)
|
||||||
|
require.NotNil(t, s3)
|
||||||
|
require.Nil(t, s3.Get(k3)) // data was deleted
|
||||||
|
|
||||||
|
// store2 is no longer mounted
|
||||||
|
st2 := restore.getStoreByName("store2")
|
||||||
|
require.Nil(t, st2)
|
||||||
|
|
||||||
|
// restore2 has the old data
|
||||||
|
rs2, _ := restore.getStoreByName("restore2").(types.KVStore)
|
||||||
|
require.NotNil(t, rs2)
|
||||||
|
require.Equal(t, v2, rs2.Get(k2))
|
||||||
|
|
||||||
|
// store this migrated data, and load it again without migrations
|
||||||
|
migratedID := restore.Commit()
|
||||||
|
require.Equal(t, migratedID.Version, int64(2))
|
||||||
|
|
||||||
|
reload, _ := newMultiStoreWithModifiedMounts(db)
|
||||||
|
err = reload.LoadLatestVersion()
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.Equal(t, migratedID, reload.LastCommitID())
|
||||||
|
|
||||||
|
// query this new store
|
||||||
|
rl1, _ := reload.getStoreByName("store1").(types.KVStore)
|
||||||
|
require.NotNil(t, rl1)
|
||||||
|
require.Equal(t, v1, rl1.Get(k1))
|
||||||
|
|
||||||
|
rl2, _ := reload.getStoreByName("restore2").(types.KVStore)
|
||||||
|
require.NotNil(t, rl2)
|
||||||
|
require.Equal(t, v2, rl2.Get(k2))
|
||||||
|
|
||||||
|
// check commitInfo in storage
|
||||||
|
ci, err = getCommitInfo(db, 2)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, int64(2), ci.Version)
|
||||||
|
require.Equal(t, 3, len(ci.StoreInfos), ci.StoreInfos)
|
||||||
|
checkContains(t, ci.StoreInfos, []string{"store1", "restore2", "store3"})
|
||||||
|
}
|
||||||
|
|
||||||
func TestParsePath(t *testing.T) {
|
func TestParsePath(t *testing.T) {
|
||||||
_, _, err := parsePath("foo")
|
_, _, err := parsePath("foo")
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
|
@ -262,12 +357,52 @@ func newMultiStoreWithMounts(db dbm.DB) *Store {
|
||||||
return store
|
return store
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// store2 -> restore2
|
||||||
|
// store3 dropped data (but mount still there to test)
|
||||||
|
func newMultiStoreWithModifiedMounts(db dbm.DB) (*Store, *types.StoreUpgrades) {
|
||||||
|
store := NewStore(db)
|
||||||
|
store.pruningOpts = types.PruneSyncable
|
||||||
|
store.MountStoreWithDB(
|
||||||
|
types.NewKVStoreKey("store1"), types.StoreTypeIAVL, nil)
|
||||||
|
store.MountStoreWithDB(
|
||||||
|
types.NewKVStoreKey("restore2"), types.StoreTypeIAVL, nil)
|
||||||
|
store.MountStoreWithDB(
|
||||||
|
types.NewKVStoreKey("store3"), types.StoreTypeIAVL, nil)
|
||||||
|
|
||||||
|
upgrades := &types.StoreUpgrades{
|
||||||
|
Renamed: []types.StoreRename{{
|
||||||
|
OldKey: "store2",
|
||||||
|
NewKey: "restore2",
|
||||||
|
}},
|
||||||
|
Deleted: []string{"store3"},
|
||||||
|
}
|
||||||
|
return store, upgrades
|
||||||
|
}
|
||||||
|
|
||||||
func checkStore(t *testing.T, store *Store, expect, got types.CommitID) {
|
func checkStore(t *testing.T, store *Store, expect, got types.CommitID) {
|
||||||
require.Equal(t, expect, got)
|
require.Equal(t, expect, got)
|
||||||
require.Equal(t, expect, store.LastCommitID())
|
require.Equal(t, expect, store.LastCommitID())
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func checkContains(t testing.TB, info []storeInfo, wanted []string) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
for _, want := range wanted {
|
||||||
|
checkHas(t, info, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func checkHas(t testing.TB, info []storeInfo, want string) {
|
||||||
|
t.Helper()
|
||||||
|
for _, i := range info {
|
||||||
|
if i.Name == want {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
t.Fatalf("storeInfo doesn't contain %s", want)
|
||||||
|
}
|
||||||
|
|
||||||
func getExpectedCommitID(store *Store, ver int64) types.CommitID {
|
func getExpectedCommitID(store *Store, ver int64) types.CommitID {
|
||||||
return types.CommitID{
|
return types.CommitID{
|
||||||
Version: ver,
|
Version: ver,
|
||||||
|
|
|
@ -38,6 +38,48 @@ type Queryable interface {
|
||||||
//----------------------------------------
|
//----------------------------------------
|
||||||
// MultiStore
|
// MultiStore
|
||||||
|
|
||||||
|
// StoreUpgrades defines a series of transformations to apply the multistore db upon load
|
||||||
|
type StoreUpgrades struct {
|
||||||
|
Renamed []StoreRename `json:"renamed"`
|
||||||
|
Deleted []string `json:"deleted"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// StoreRename defines a name change of a sub-store.
|
||||||
|
// All data previously under a PrefixStore with OldKey will be copied
|
||||||
|
// to a PrefixStore with NewKey, then deleted from OldKey store.
|
||||||
|
type StoreRename struct {
|
||||||
|
OldKey string `json:"old_key"`
|
||||||
|
NewKey string `json:"new_key"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsDeleted returns true if the given key should be deleted
|
||||||
|
func (s *StoreUpgrades) IsDeleted(key string) bool {
|
||||||
|
if s == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
for _, d := range s.Deleted {
|
||||||
|
if d == key {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// RenamedFrom returns the oldKey if it was renamed
|
||||||
|
// Returns "" if it was not renamed
|
||||||
|
func (s *StoreUpgrades) RenamedFrom(key string) string {
|
||||||
|
if s == nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
for _, re := range s.Renamed {
|
||||||
|
if re.NewKey == key {
|
||||||
|
return re.OldKey
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
type MultiStore interface { //nolint
|
type MultiStore interface { //nolint
|
||||||
Store
|
Store
|
||||||
|
|
||||||
|
@ -94,6 +136,16 @@ type CommitMultiStore interface {
|
||||||
// Mount*Store() are complete.
|
// Mount*Store() are complete.
|
||||||
LoadLatestVersion() error
|
LoadLatestVersion() error
|
||||||
|
|
||||||
|
// LoadLatestVersionAndUpgrade will load the latest version, but also
|
||||||
|
// rename/delete/create sub-store keys, before registering all the keys
|
||||||
|
// in order to handle breaking formats in migrations
|
||||||
|
LoadLatestVersionAndUpgrade(upgrades *StoreUpgrades) error
|
||||||
|
|
||||||
|
// LoadVersionAndUpgrade will load the named version, but also
|
||||||
|
// rename/delete/create sub-store keys, before registering all the keys
|
||||||
|
// in order to handle breaking formats in migrations
|
||||||
|
LoadVersionAndUpgrade(ver int64, upgrades *StoreUpgrades) error
|
||||||
|
|
||||||
// Load a specific persisted version. When you load an old version, or when
|
// Load a specific persisted version. When you load an old version, or when
|
||||||
// the last commit attempt didn't complete, the next commit after loading
|
// the last commit attempt didn't complete, the next commit after loading
|
||||||
// must be idempotent (return the same commit id). Otherwise the behavior is
|
// must be idempotent (return the same commit id). Otherwise the behavior is
|
||||||
|
|
|
@ -0,0 +1,56 @@
|
||||||
|
package types
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestStoreUpgrades(t *testing.T) {
|
||||||
|
type toDelete struct {
|
||||||
|
key string
|
||||||
|
delete bool
|
||||||
|
}
|
||||||
|
type toRename struct {
|
||||||
|
newkey string
|
||||||
|
result string
|
||||||
|
}
|
||||||
|
|
||||||
|
cases := map[string]struct {
|
||||||
|
upgrades *StoreUpgrades
|
||||||
|
expectDelete []toDelete
|
||||||
|
expectRename []toRename
|
||||||
|
}{
|
||||||
|
"empty upgrade": {
|
||||||
|
expectDelete: []toDelete{{"foo", false}},
|
||||||
|
expectRename: []toRename{{"foo", ""}},
|
||||||
|
},
|
||||||
|
"simple matches": {
|
||||||
|
upgrades: &StoreUpgrades{
|
||||||
|
Deleted: []string{"foo"},
|
||||||
|
Renamed: []StoreRename{{"bar", "baz"}},
|
||||||
|
},
|
||||||
|
expectDelete: []toDelete{{"foo", true}, {"bar", false}, {"baz", false}},
|
||||||
|
expectRename: []toRename{{"foo", ""}, {"bar", ""}, {"baz", "bar"}},
|
||||||
|
},
|
||||||
|
"many data points": {
|
||||||
|
upgrades: &StoreUpgrades{
|
||||||
|
Deleted: []string{"one", "two", "three", "four", "five"},
|
||||||
|
Renamed: []StoreRename{{"old", "new"}, {"white", "blue"}, {"black", "orange"}, {"fun", "boring"}},
|
||||||
|
},
|
||||||
|
expectDelete: []toDelete{{"four", true}, {"six", false}, {"baz", false}},
|
||||||
|
expectRename: []toRename{{"white", ""}, {"blue", "white"}, {"boring", "fun"}, {"missing", ""}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, tc := range cases {
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
for _, d := range tc.expectDelete {
|
||||||
|
assert.Equal(t, tc.upgrades.IsDeleted(d.key), d.delete)
|
||||||
|
}
|
||||||
|
for _, r := range tc.expectRename {
|
||||||
|
assert.Equal(t, tc.upgrades.RenamedFrom(r.newkey), r.result)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue