Merge PR #4382: Support height queries for queriers

This commit is contained in:
Alexander Bezobchuk 2019-05-28 20:58:33 -04:00 committed by GitHub
parent 61d0f888b7
commit 8b1d75caa2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 309 additions and 31 deletions

View File

@ -0,0 +1,2 @@
#4318 Support height queries. Queries against nodes that have the queried
height pruned will return an error.

View File

@ -536,6 +536,15 @@ func handleQueryCustom(app *BaseApp, path []string, req abci.RequestQuery) (res
app.cms.CacheMultiStore(), app.checkState.ctx.BlockHeader(), true, app.logger,
).WithMinGasPrices(app.minGasPrices)
if req.Height > 0 {
cacheMS, err := app.cms.CacheMultiStoreWithVersion(req.Height)
if err != nil {
return sdk.ErrInternal(fmt.Sprintf("failed to load state at height %d; %s", req.Height, err)).QueryResult()
}
ctx = ctx.WithMultiStore(cacheMS)
}
// Passes the rest of the path as an argument to the querier.
//
// For example, in the path "custom/gov/proposal/test", the gov querier gets

View File

@ -71,7 +71,9 @@ func GetCommands(cmds ...*cobra.Command) []*cobra.Command {
c.Flags().Bool(FlagIndentResponse, false, "Add indent to JSON response")
c.Flags().Bool(FlagTrustNode, false, "Trust connected full node (don't verify proofs for responses)")
c.Flags().Bool(FlagUseLedger, false, "Use a connected Ledger device")
c.Flags().String(FlagNode, "tcp://localhost:26657", "<host>:<port> to tendermint rpc interface for this chain")
c.Flags().String(FlagNode, "tcp://localhost:26657", "<host>:<port> to Tendermint RPC interface for this chain")
c.Flags().Int64(FlagHeight, 0, "Use a specific height to query state at (this can error if the node is pruning state)")
viper.BindPFlag(FlagTrustNode, c.Flags().Lookup(FlagTrustNode))
viper.BindPFlag(FlagUseLedger, c.Flags().Lookup(FlagUseLedger))
viper.BindPFlag(FlagNode, c.Flags().Lookup(FlagNode))

View File

@ -18,6 +18,10 @@ func (ms multiStore) CacheMultiStore() sdk.CacheMultiStore {
panic("not implemented")
}
func (kv multiStore) CacheMultiStoreWithVersion(_ int64) (sdk.CacheMultiStore, error) {
panic("not implemented")
}
func (ms multiStore) CacheWrap() sdk.CacheWrap {
panic("not implemented")
}

View File

@ -124,6 +124,15 @@ func (cms Store) CacheMultiStore() types.CacheMultiStore {
return newCacheMultiStoreFromCMS(cms)
}
// CacheMultiStoreWithVersion implements the MultiStore interface. It will panic
// as an already cached multi-store cannot load previous versions.
//
// TODO: The store implementation can possibly be modified to support this as it
// seems safe to load previous versions (heights).
func (cms Store) CacheMultiStoreWithVersion(_ int64) (types.CacheMultiStore, error) {
panic("cannot cache-wrap cached multi-store with a version")
}
// GetStore returns an underlying Store by key.
func (cms Store) GetStore(key types.StoreKey) types.Store {
return cms.stores[key].(types.Store)

View File

@ -24,12 +24,15 @@ const (
// load the iavl store
func LoadStore(db dbm.DB, id types.CommitID, pruning types.PruningOptions) (types.CommitStore, error) {
tree := iavl.NewMutableTree(db, defaultIAVLCacheSize)
_, err := tree.LoadVersion(id.Version)
if err != nil {
return nil, err
}
iavl := UnsafeNewStore(tree, int64(0), int64(0))
iavl.SetPruning(pruning)
return iavl, nil
}
@ -41,8 +44,7 @@ var _ types.Queryable = (*Store)(nil)
// Store Implements types.KVStore and CommitStore.
type Store struct {
// The underlying tree.
tree *iavl.MutableTree
tree Tree
// How many old versions we hold onto.
// A value of 0 means keep no recent states.
@ -68,6 +70,28 @@ func UnsafeNewStore(tree *iavl.MutableTree, numRecent int64, storeEvery int64) *
return st
}
// GetImmutable returns a reference to a new store backed by an immutable IAVL
// tree at a specific version (height) without any pruning options. This should
// be used for querying and iteration only. If the version does not exist or has
// been pruned, an error will be returned. Any mutable operations executed will
// result in a panic.
func (st *Store) GetImmutable(version int64) (*Store, error) {
if !st.VersionExists(version) {
return nil, iavl.ErrVersionDoesNotExist
}
iTree, err := st.tree.GetImmutable(version)
if err != nil {
return nil, err
}
return &Store{
tree: &immutableTree{iTree},
numRecent: 0,
storeEvery: 0,
}, nil
}
// Implements Committer.
func (st *Store) Commit() types.CommitID {
// Save a new version.
@ -153,16 +177,34 @@ func (st *Store) Delete(key []byte) {
// Implements types.KVStore.
func (st *Store) Iterator(start, end []byte) types.Iterator {
return newIAVLIterator(st.tree.ImmutableTree, start, end, true)
var iTree *iavl.ImmutableTree
switch tree := st.tree.(type) {
case *immutableTree:
iTree = tree.ImmutableTree
case *iavl.MutableTree:
iTree = tree.ImmutableTree
}
return newIAVLIterator(iTree, start, end, true)
}
// Implements types.KVStore.
func (st *Store) ReverseIterator(start, end []byte) types.Iterator {
return newIAVLIterator(st.tree.ImmutableTree, start, end, false)
var iTree *iavl.ImmutableTree
switch tree := st.tree.(type) {
case *immutableTree:
iTree = tree.ImmutableTree
case *iavl.MutableTree:
iTree = tree.ImmutableTree
}
return newIAVLIterator(iTree, start, end, false)
}
// Handle gatest the latest height, if height is 0
func getHeight(tree *iavl.MutableTree, req abci.RequestQuery) int64 {
func getHeight(tree Tree, req abci.RequestQuery) int64 {
height := req.Height
if height == 0 {
latest := tree.Version()

View File

@ -45,6 +45,59 @@ func newAlohaTree(t *testing.T, db dbm.DB) (*iavl.MutableTree, types.CommitID) {
return tree, types.CommitID{ver, hash}
}
func TestGetImmutable(t *testing.T) {
db := dbm.NewMemDB()
tree, cID := newAlohaTree(t, db)
store := UnsafeNewStore(tree, 10, 10)
require.True(t, tree.Set([]byte("hello"), []byte("adios")))
hash, ver, err := tree.SaveVersion()
cID = types.CommitID{ver, hash}
require.Nil(t, err)
_, err = store.GetImmutable(cID.Version + 1)
require.Error(t, err)
newStore, err := store.GetImmutable(cID.Version - 1)
require.NoError(t, err)
require.Equal(t, newStore.Get([]byte("hello")), []byte("goodbye"))
newStore, err = store.GetImmutable(cID.Version)
require.NoError(t, err)
require.Equal(t, newStore.Get([]byte("hello")), []byte("adios"))
res := newStore.Query(abci.RequestQuery{Data: []byte("hello"), Height: cID.Version, Path: "/key", Prove: true})
require.Equal(t, res.Value, []byte("adios"))
require.NotNil(t, res.Proof)
require.Panics(t, func() { newStore.Set(nil, nil) })
require.Panics(t, func() { newStore.Delete(nil) })
require.Panics(t, func() { newStore.Commit() })
}
func TestTestGetImmutableIterator(t *testing.T) {
db := dbm.NewMemDB()
tree, cID := newAlohaTree(t, db)
store := UnsafeNewStore(tree, 10, 10)
newStore, err := store.GetImmutable(cID.Version)
require.NoError(t, err)
iter := newStore.Iterator([]byte("aloha"), []byte("hellz"))
expected := []string{"aloha", "hello"}
var i int
for i = 0; iter.Valid(); iter.Next() {
expectedKey := expected[i]
key, value := iter.Key(), iter.Value()
require.EqualValues(t, key, expectedKey)
require.EqualValues(t, value, treeData[expectedKey])
i++
}
require.Equal(t, len(expected), i)
}
func TestIAVLStoreGetSetHasDelete(t *testing.T) {
db := dbm.NewMemDB()
tree, _ := newAlohaTree(t, db)

84
store/iavl/tree.go Normal file
View File

@ -0,0 +1,84 @@
package iavl
import (
"fmt"
"github.com/tendermint/iavl"
)
var (
_ Tree = (*immutableTree)(nil)
_ Tree = (*iavl.MutableTree)(nil)
)
type (
// Tree defines an interface that both mutable and immutable IAVL trees
// must implement. For mutable IAVL trees, the interface is directly
// implemented by an iavl.MutableTree. For an immutable IAVL tree, a wrapper
// must be made.
Tree interface {
Has(key []byte) bool
Get(key []byte) (index int64, value []byte)
Set(key, value []byte) bool
Remove(key []byte) ([]byte, bool)
SaveVersion() ([]byte, int64, error)
DeleteVersion(version int64) error
Version() int64
Hash() []byte
VersionExists(version int64) bool
GetVersioned(key []byte, version int64) (int64, []byte)
GetVersionedWithProof(key []byte, version int64) ([]byte, *iavl.RangeProof, error)
GetImmutable(version int64) (*iavl.ImmutableTree, error)
}
// immutableTree is a simple wrapper around a reference to an iavl.ImmutableTree
// that implements the Tree interface. It should only be used for querying
// and iteration, specifically at previous heights.
immutableTree struct {
*iavl.ImmutableTree
}
)
func (it *immutableTree) Set(_, _ []byte) bool {
panic("cannot call 'Set' on an immutable IAVL tree")
}
func (it *immutableTree) Remove(_ []byte) ([]byte, bool) {
panic("cannot call 'Remove' on an immutable IAVL tree")
}
func (it *immutableTree) SaveVersion() ([]byte, int64, error) {
panic("cannot call 'SaveVersion' on an immutable IAVL tree")
}
func (it *immutableTree) DeleteVersion(_ int64) error {
panic("cannot call 'DeleteVersion' on an immutable IAVL tree")
}
func (it *immutableTree) VersionExists(version int64) bool {
return it.Version() == version
}
func (it *immutableTree) GetVersioned(key []byte, version int64) (int64, []byte) {
if it.Version() != version {
return -1, nil
}
return it.Get(key)
}
func (it *immutableTree) GetVersionedWithProof(key []byte, version int64) ([]byte, *iavl.RangeProof, error) {
if it.Version() != version {
return nil, nil, fmt.Errorf("version mismatch on immutable IAVL tree; got: %d, expected: %d", version, it.Version())
}
return it.GetWithProof(key)
}
func (it *immutableTree) GetImmutable(version int64) (*iavl.ImmutableTree, error) {
if it.Version() != version {
return nil, fmt.Errorf("version mismatch on immutable IAVL tree; got: %d, expected: %d", version, it.Version())
}
return it.ImmutableTree, nil
}

View File

@ -102,39 +102,38 @@ func (rs *Store) LoadLatestVersion() error {
// Implements CommitMultiStore.
func (rs *Store) LoadVersion(ver int64) error {
// Special logic for version 0
if ver == 0 {
// Special logic for version 0 where there is no need to get commit
// information.
for key, storeParams := range rs.storesParams {
id := types.CommitID{}
store, err := rs.loadCommitStoreFromParams(key, id, storeParams)
store, err := rs.loadCommitStoreFromParams(key, types.CommitID{}, storeParams)
if err != nil {
return fmt.Errorf("failed to load Store: %v", err)
}
rs.stores[key] = store
}
rs.lastCommitID = types.CommitID{}
return nil
}
// Otherwise, version is 1 or greater
// Get commitInfo
cInfo, err := getCommitInfo(rs.db, ver)
if err != nil {
return err
}
// Convert StoreInfos slice to map
// convert StoreInfos slice to map
infos := make(map[types.StoreKey]storeInfo)
for _, storeInfo := range cInfo.StoreInfos {
infos[rs.nameToKey(storeInfo.Name)] = storeInfo
}
// Load each Store
// load each Store
var newStores = make(map[types.StoreKey]types.CommitStore)
for key, storeParams := range rs.storesParams {
var id types.CommitID
info, ok := infos[key]
if ok {
id = info.Core.CommitID
@ -144,12 +143,13 @@ func (rs *Store) LoadVersion(ver int64) error {
if err != nil {
return fmt.Errorf("failed to load Store: %v", err)
}
newStores[key] = store
}
// Success.
rs.lastCommitID = cInfo.CommitID()
rs.stores = newStores
return nil
}
@ -231,9 +231,36 @@ func (rs *Store) CacheMultiStore() types.CacheMultiStore {
for k, v := range rs.stores {
stores[k] = v
}
return cachemulti.NewStore(rs.db, stores, rs.keysByName, rs.traceWriter, rs.traceContext)
}
// CacheMultiStoreWithVersion is analogous to CacheMultiStore except that it
// attempts to load stores at a given version (height). An error is returned if
// any store cannot be loaded. This should only be used for querying and
// iterating at past heights.
func (rs *Store) CacheMultiStoreWithVersion(version int64) (types.CacheMultiStore, error) {
cachedStores := make(map[types.StoreKey]types.CacheWrapper)
for key, store := range rs.stores {
switch store.GetStoreType() {
case types.StoreTypeIAVL:
// Attempt to lazy-load an already saved IAVL store version. If the
// version does not exist or is pruned, an error should be returned.
iavlStore, err := store.(*iavl.Store).GetImmutable(version)
if err != nil {
return nil, err
}
cachedStores[key] = iavlStore
default:
cachedStores[key] = store
}
}
return cachemulti.NewStore(rs.db, cachedStores, rs.keysByName, rs.traceWriter, rs.traceContext), nil
}
// Implements MultiStore.
// If the store does not exist, panics.
func (rs *Store) GetStore(key types.StoreKey) types.Store {
@ -292,6 +319,7 @@ func (rs *Store) Query(req abci.RequestQuery) abci.ResponseQuery {
msg := fmt.Sprintf("no such store: %s", storeName)
return errors.ErrUnknownRequest(msg).QueryResult()
}
queryable, ok := store.(types.Queryable)
if !ok {
msg := fmt.Sprintf("store %s doesn't support queries", storeName)
@ -349,30 +377,31 @@ func parsePath(path string) (storeName string, subpath string, err errors.Error)
func (rs *Store) loadCommitStoreFromParams(key types.StoreKey, id types.CommitID, params storeParams) (store types.CommitStore, err error) {
var db dbm.DB
if params.db != nil {
db = dbm.NewPrefixDB(params.db, []byte("s/_/"))
} else {
db = dbm.NewPrefixDB(rs.db, []byte("s/k:"+params.key.Name()+"/"))
}
switch params.typ {
case types.StoreTypeMulti:
panic("recursive MultiStores not yet supported")
// TODO: id?
// return NewCommitMultiStore(db, id)
case types.StoreTypeIAVL:
store, err = iavl.LoadStore(db, id, rs.pruningOpts)
return
return iavl.LoadStore(db, id, rs.pruningOpts)
case types.StoreTypeDB:
store = commitDBStoreAdapter{dbadapter.Store{db}}
return
return commitDBStoreAdapter{dbadapter.Store{db}}, nil
case types.StoreTypeTransient:
_, ok := key.(*types.TransientStoreKey)
if !ok {
err = fmt.Errorf("invalid StoreKey for StoreTypeTransient: %s", key.String())
return
return store, fmt.Errorf("invalid StoreKey for StoreTypeTransient: %s", key.String())
}
store = transient.NewStore()
return
return transient.NewStore(), nil
default:
panic(fmt.Sprintf("unrecognized store type %v", params.typ))
}

View File

@ -37,6 +37,46 @@ func TestStoreMount(t *testing.T) {
require.Panics(t, func() { store.MountStoreWithDB(dup1, types.StoreTypeIAVL, db) })
}
func TestCacheMultiStoreWithVersion(t *testing.T) {
var db dbm.DB = dbm.NewMemDB()
if useDebugDB {
db = dbm.NewDebugDB("CMS", db)
}
ms := newMultiStoreWithMounts(db)
err := ms.LoadLatestVersion()
require.Nil(t, err)
commitID := types.CommitID{}
checkStore(t, ms, commitID, commitID)
k, v := []byte("wind"), []byte("blows")
store1 := ms.getStoreByName("store1").(types.KVStore)
store1.Set(k, v)
cID := ms.Commit()
require.Equal(t, int64(1), cID.Version)
// require failure when given an invalid or pruned version
_, err = ms.CacheMultiStoreWithVersion(cID.Version + 1)
require.Error(t, err)
// require a valid version can be cache-loaded
cms, err := ms.CacheMultiStoreWithVersion(cID.Version)
require.NoError(t, err)
// require a valid key lookup yields the correct value
kvStore := cms.GetKVStore(ms.keysByName["store1"])
require.NotNil(t, kvStore)
require.Equal(t, kvStore.Get(k), v)
// require we cannot commit (write) to a cache-versioned multi-store
require.Panics(t, func() {
kvStore.Set(k, []byte("newValue"))
cms.Write()
})
}
func TestMultistoreCommitLoad(t *testing.T) {
var db dbm.DB = dbm.NewMemDB()
if useDebugDB {

View File

@ -46,6 +46,10 @@ type MultiStore interface { //nolint
// call CacheMultiStore.Write().
CacheMultiStore() CacheMultiStore
// CacheMultiStoreWithVersion cache-wraps the underlying MultiStore where
// each stored is loaded at a specific version (height).
CacheMultiStoreWithVersion(version int64) (CacheMultiStore, error)
// Convenience for fetching substores.
// If the store does not exist, panics.
GetStore(StoreKey) Store
@ -86,14 +90,14 @@ type CommitMultiStore interface {
// Panics on a nil key.
GetCommitKVStore(key StoreKey) CommitKVStore
// Load the latest persisted version. Called once after all
// calls to Mount*Store() are complete.
// Load the latest persisted version. Called once after all calls to
// Mount*Store() are complete.
LoadLatestVersion() error
// Load a specific persisted version. When you load an old
// version, or when the last commit attempt didn't complete,
// the next commit after loading must be idempotent (return the
// same commit id). Otherwise the behavior is undefined.
// Load a specific persisted version. When you load an old version, or when
// the last commit attempt didn't complete, the next commit after loading
// must be idempotent (return the same commit id). Otherwise the behavior is
// undefined.
LoadVersion(ver int64) error
}