cosmos-sdk/store/v2/multi/store.go

package multi
import (
"errors"
"fmt"
"io"
"math"
"strings"
"sync"
abci "github.com/tendermint/tendermint/abci/types"
dbm "github.com/cosmos/cosmos-sdk/db"
prefixdb "github.com/cosmos/cosmos-sdk/db/prefix"
util "github.com/cosmos/cosmos-sdk/internal"
snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"
sdkmaps "github.com/cosmos/cosmos-sdk/store/internal/maps"
"github.com/cosmos/cosmos-sdk/store/listenkv"
"github.com/cosmos/cosmos-sdk/store/prefix"
"github.com/cosmos/cosmos-sdk/store/tracekv"
types "github.com/cosmos/cosmos-sdk/store/v2"
"github.com/cosmos/cosmos-sdk/store/v2/mem"
"github.com/cosmos/cosmos-sdk/store/v2/smt"
"github.com/cosmos/cosmos-sdk/store/v2/transient"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
"github.com/cosmos/cosmos-sdk/types/kv"
protoio "github.com/gogo/protobuf/io"
)
var (
_ types.Queryable = (*Store)(nil)
_ types.CommitMultiStore = (*Store)(nil)
_ types.CacheMultiStore = (*cacheStore)(nil)
_ types.BasicMultiStore = (*viewStore)(nil)
_ types.KVStore = (*substore)(nil)
)
var (
// Root prefixes
merkleRootKey = []byte{0} // Key for root hash of namespace tree
schemaPrefix = []byte{1} // Prefix for store keys (namespaces)
contentPrefix = []byte{2} // Prefix for store contents
// Per-substore prefixes
substoreMerkleRootKey = []byte{0} // Key for root hashes of Merkle trees
dataPrefix = []byte{1} // Prefix for state mappings
indexPrefix = []byte{2} // Prefix for Store reverse index
smtPrefix = []byte{3} // Prefix for SMT data
ErrVersionDoesNotExist = errors.New("version does not exist")
ErrMaximumHeight = errors.New("maximum block height reached")
)
func ErrStoreNotFound(skey string) error {
return fmt.Errorf("store does not exist for key: %s", skey)
}
// StoreConfig is used to define a schema and other options and pass them to the MultiStore constructor.
type StoreConfig struct {
// Version pruning options for backing DBs.
Pruning types.PruningOptions
// The minimum allowed version number.
InitialVersion uint64
// The backing DB to use for the state commitment Merkle tree data.
// If nil, Merkle data is stored in the state storage DB under a separate prefix.
StateCommitmentDB dbm.DBConnection
prefixRegistry
PersistentCache types.MultiStorePersistentCache
Upgrades []types.StoreUpgrades
*traceListenMixin
}
// StoreSchema defines a mapping of substore keys to store types
type StoreSchema map[string]types.StoreType
// Store is the main persistent store type implementing CommitMultiStore.
// Substores consist of an SMT-based state commitment store and state storage.
// Substores must be reserved in the StoreConfig or defined as part of a StoreUpgrade in order to be valid.
// Note:
// The state commitment data and proof are structured in the same basic pattern as the MultiStore, but use an SMT rather than IAVL tree:
// * The state commitment store of each substore consists of an independent SMT.
// * The state commitment of the root store consists of a Merkle map of all registered persistent substore names to the root hash of their corresponding SMTs
type Store struct {
stateDB dbm.DBConnection
stateTxn dbm.DBReadWriter
StateCommitmentDB dbm.DBConnection
stateCommitmentTxn dbm.DBReadWriter
schema StoreSchema
mem *mem.Store
tran *transient.Store
mtx sync.RWMutex
// Copied from StoreConfig
Pruning types.PruningOptions
InitialVersion uint64 // if nonzero, the minimum allowed version number
*traceListenMixin
PersistentCache types.MultiStorePersistentCache
substoreCache map[string]*substore
}
type substore struct {
root *Store
name string
dataBucket dbm.DBReadWriter
indexBucket dbm.DBReadWriter
stateCommitmentStore *smt.Store
}
// Branched state
type cacheStore struct {
source types.BasicMultiStore
substores map[string]types.CacheKVStore
*traceListenMixin
}
// Read-only store for querying past versions
type viewStore struct {
stateView dbm.DBReader
stateCommitmentView dbm.DBReader
substoreCache map[string]*viewSubstore
schema StoreSchema
}
type viewSubstore struct {
root *viewStore
name string
dataBucket dbm.DBReader
indexBucket dbm.DBReader
stateCommitmentStore *smt.Store
}
// Builder type used to create a valid schema with no prefix conflicts
type prefixRegistry struct {
StoreSchema
reserved []string
}
// Mixin type used to compose trace & listen state into each root store variant type
type traceListenMixin struct {
listeners map[string][]types.WriteListener
TraceWriter io.Writer
TraceContext types.TraceContext
}
func newTraceListenMixin() *traceListenMixin {
return &traceListenMixin{listeners: map[string][]types.WriteListener{}}
}
// DefaultStoreConfig returns a MultiStore config with an empty schema, a single backing DB,
// pruning with PruneDefault, no listeners and no tracer.
func DefaultStoreConfig() StoreConfig {
return StoreConfig{
Pruning: types.PruneDefault,
prefixRegistry: prefixRegistry{
StoreSchema: StoreSchema{},
},
traceListenMixin: newTraceListenMixin(),
}
}
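// Example (illustrative sketch, not from this file): a config with one persistent
// substore registered before the store is opened. The memdb package path is an
// assumption for the sketch.
//
//	// import "github.com/cosmos/cosmos-sdk/db/memdb" // assumed in-memory backend
//	opts := DefaultStoreConfig()
//	if err := opts.RegisterSubstore("bank", types.StoreTypePersistent); err != nil {
//		panic(err)
//	}
//	store, err := NewStore(memdb.NewDB(), opts)
//	if err != nil {
//		panic(err)
//	}
//	defer store.Close()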
// Returns true for store types that are valid in a MultiStore schema
func validSubStoreType(sst types.StoreType) bool {
switch sst {
case types.StoreTypePersistent:
return true
case types.StoreTypeMemory:
return true
case types.StoreTypeTransient:
return true
default:
return false
}
}
// Returns true iff both schema maps match exactly (including mem/tran stores)
func (this StoreSchema) equal(that StoreSchema) bool {
if len(this) != len(that) {
return false
}
for key, val := range that {
myval, has := this[key]
if !has {
return false
}
if val != myval {
return false
}
}
return true
}
// Parses a schema from the DB
func readSavedSchema(bucket dbm.DBReader) (*prefixRegistry, error) {
ret := prefixRegistry{StoreSchema: StoreSchema{}}
it, err := bucket.Iterator(nil, nil)
if err != nil {
return nil, err
}
for it.Next() {
value := it.Value()
if len(value) != 1 || !validSubStoreType(types.StoreType(value[0])) {
return nil, fmt.Errorf("invalid mapping for store key: %v => %v", it.Key(), value)
}
ret.StoreSchema[string(it.Key())] = types.StoreType(value[0])
ret.reserved = append(ret.reserved, string(it.Key())) // assume iter yields keys sorted
}
if err = it.Close(); err != nil {
return nil, err
}
return &ret, nil
}
// NewStore constructs a MultiStore directly from a database.
// Creates a new store if no data exists; otherwise loads existing data.
func NewStore(db dbm.DBConnection, opts StoreConfig) (ret *Store, err error) {
versions, err := db.Versions()
if err != nil {
return
}
// If the DB is not empty, attempt to load existing data
if saved := versions.Count(); saved != 0 {
if opts.InitialVersion != 0 && versions.Last() < opts.InitialVersion {
return nil, fmt.Errorf("latest saved version is less than initial version: %v < %v",
versions.Last(), opts.InitialVersion)
}
}
// To abide by atomicity constraints, revert the DB to the last saved version, in case it contains
// committed data in the "working" version.
// This should only happen if Store.Commit previously failed.
err = db.Revert()
if err != nil {
return
}
stateTxn := db.ReadWriter()
defer func() {
if err != nil {
err = util.CombineErrors(err, stateTxn.Discard(), "stateTxn.Discard also failed")
}
}()
stateCommitmentTxn := stateTxn
if opts.StateCommitmentDB != nil {
var scVersions dbm.VersionSet
scVersions, err = opts.StateCommitmentDB.Versions()
if err != nil {
return
}
// Version sets of each DB must match
if !versions.Equal(scVersions) {
err = fmt.Errorf("Storage and StateCommitment DB have different version history") //nolint:stylecheck
return
}
err = opts.StateCommitmentDB.Revert()
if err != nil {
return
}
stateCommitmentTxn = opts.StateCommitmentDB.ReadWriter()
}
ret = &Store{
stateDB: db,
stateTxn: stateTxn,
StateCommitmentDB: opts.StateCommitmentDB,
stateCommitmentTxn: stateCommitmentTxn,
mem: mem.NewStore(),
tran: transient.NewStore(),
substoreCache: map[string]*substore{},
traceListenMixin: opts.traceListenMixin,
PersistentCache: opts.PersistentCache,
Pruning: opts.Pruning,
InitialVersion: opts.InitialVersion,
}
// Now load the substore schema
schemaView := prefixdb.NewPrefixReader(ret.stateDB.Reader(), schemaPrefix)
defer func() {
if err != nil {
err = util.CombineErrors(err, schemaView.Discard(), "schemaView.Discard also failed")
err = util.CombineErrors(err, ret.Close(), "base.Close also failed")
}
}()
reg, err := readSavedSchema(schemaView)
if err != nil {
return
}
// If the loaded schema is empty (for new store), just copy the config schema;
// Otherwise, verify it is identical to the config schema
if len(reg.StoreSchema) == 0 {
for k, v := range opts.StoreSchema {
reg.StoreSchema[k] = v
}
reg.reserved = make([]string, len(opts.reserved))
copy(reg.reserved, opts.reserved)
} else {
if !reg.equal(opts.StoreSchema) {
err = errors.New("loaded schema does not match configured schema")
return
}
}
// Apply migrations, then clear old schema and write the new one
for _, upgrades := range opts.Upgrades {
err = reg.migrate(ret, upgrades)
if err != nil {
return
}
}
schemaWriter := prefixdb.NewPrefixWriter(ret.stateTxn, schemaPrefix)
it, err := schemaView.Iterator(nil, nil)
if err != nil {
return
}
for it.Next() {
err = schemaWriter.Delete(it.Key())
if err != nil {
return
}
}
err = it.Close()
if err != nil {
return
}
err = schemaView.Discard()
if err != nil {
return
}
// NB. the migrated contents and schema are not committed until the next store.Commit
for skey, typ := range reg.StoreSchema {
err = schemaWriter.Set([]byte(skey), []byte{byte(typ)})
if err != nil {
return
}
}
ret.schema = reg.StoreSchema
return
}
func (s *Store) Close() error {
err := s.stateTxn.Discard()
if s.StateCommitmentDB != nil {
err = util.CombineErrors(err, s.stateCommitmentTxn.Discard(), "stateCommitmentTxn.Discard also failed")
}
return err
}
// Applies store upgrades to the DB contents.
func (pr *prefixRegistry) migrate(store *Store, upgrades types.StoreUpgrades) error {
// Get a view of current state to allow mutation while iterating
reader := store.stateDB.Reader()
scReader := reader
if store.StateCommitmentDB != nil {
scReader = store.StateCommitmentDB.Reader()
}
for _, key := range upgrades.Deleted {
sst, ix, err := pr.storeInfo(key)
if err != nil {
return err
}
if sst != types.StoreTypePersistent {
return fmt.Errorf("prefix is for non-persistent substore: %v (%v)", key, sst)
}
pr.reserved = append(pr.reserved[:ix], pr.reserved[ix+1:]...)
delete(pr.StoreSchema, key)
pfx := substorePrefix(key)
subReader := prefixdb.NewPrefixReader(reader, pfx)
it, err := subReader.Iterator(nil, nil)
if err != nil {
return err
}
for it.Next() {
store.stateTxn.Delete(it.Key())
}
it.Close()
if store.StateCommitmentDB != nil {
subReader = prefixdb.NewPrefixReader(scReader, pfx)
it, err = subReader.Iterator(nil, nil)
if err != nil {
return err
}
for it.Next() {
store.stateCommitmentTxn.Delete(it.Key())
}
it.Close()
}
}
for _, rename := range upgrades.Renamed {
sst, ix, err := pr.storeInfo(rename.OldKey)
if err != nil {
return err
}
if sst != types.StoreTypePersistent {
return fmt.Errorf("prefix is for non-persistent substore: %v (%v)", rename.OldKey, sst)
}
pr.reserved = append(pr.reserved[:ix], pr.reserved[ix+1:]...)
delete(pr.StoreSchema, rename.OldKey)
err = pr.RegisterSubstore(rename.NewKey, types.StoreTypePersistent)
if err != nil {
return err
}
oldPrefix := substorePrefix(rename.OldKey)
newPrefix := substorePrefix(rename.NewKey)
subReader := prefixdb.NewPrefixReader(reader, oldPrefix)
subWriter := prefixdb.NewPrefixWriter(store.stateTxn, newPrefix)
it, err := subReader.Iterator(nil, nil)
if err != nil {
return err
}
for it.Next() {
subWriter.Set(it.Key(), it.Value())
}
it.Close()
if store.StateCommitmentDB != nil {
subReader = prefixdb.NewPrefixReader(scReader, oldPrefix)
subWriter = prefixdb.NewPrefixWriter(store.stateCommitmentTxn, newPrefix)
it, err = subReader.Iterator(nil, nil)
if err != nil {
return err
}
for it.Next() {
subWriter.Set(it.Key(), it.Value())
}
it.Close()
}
}
for _, key := range upgrades.Added {
err := pr.RegisterSubstore(key, types.StoreTypePersistent)
if err != nil {
return err
}
}
return nil
}
func substorePrefix(key string) []byte {
return append(contentPrefix, key...)
}
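// Illustrative layout (sketch; assumes the prefixdb wrappers simply prepend their
// prefix bytes): a data entry for key "k" in substore "bank" lives in the state DB
// under
//
//	contentPrefix + "bank" + dataPrefix + "k"  =  {2} || "bank" || {1} || "k"
//
// while its SMT nodes live under contentPrefix + "bank" + smtPrefix in the state
// commitment txn (the same DB when StateCommitmentDB is nil).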
// GetKVStore implements BasicMultiStore.
func (rs *Store) GetKVStore(skey types.StoreKey) types.KVStore {
key := skey.Name()
var parent types.KVStore
typ, has := rs.schema[key]
if !has {
panic(ErrStoreNotFound(key))
}
switch typ {
case types.StoreTypeMemory:
parent = rs.mem
case types.StoreTypeTransient:
parent = rs.tran
case types.StoreTypePersistent:
default:
panic(fmt.Errorf("StoreType not supported: %v", typ)) // should never happen
}
var ret types.KVStore
if parent != nil { // store is non-persistent
ret = prefix.NewStore(parent, []byte(key))
} else { // store is persistent
sub, err := rs.getSubstore(key)
if err != nil {
panic(err)
}
rs.substoreCache[key] = sub
ret = sub
}
// Wrap with trace/listen if needed. Note: we don't cache this, so users must get a new substore after
// modifying tracers/listeners.
return rs.wrapTraceListen(ret, skey)
}
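// Illustrative usage (sketch): the store key must name a substore already present
// in the schema; NewKVStoreKey is assumed to be the usual store-key constructor.
//
//	skey := types.NewKVStoreKey("bank") // assumed helper
//	kvs := rs.GetKVStore(skey)
//	kvs.Set([]byte("balance"), []byte("100"))
//	_ = kvs.Get([]byte("balance"))
//
// GetKVStore panics if the key is not in the schema.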
// Gets a persistent substore. This reads, but does not update the substore cache.
// Use it in cases where we need to access a store internally (e.g. read/write Merkle keys, queries)
func (rs *Store) getSubstore(key string) (*substore, error) {
if cached, has := rs.substoreCache[key]; has {
return cached, nil
}
pfx := substorePrefix(key)
stateRW := prefixdb.NewPrefixReadWriter(rs.stateTxn, pfx)
stateCommitmentRW := prefixdb.NewPrefixReadWriter(rs.stateCommitmentTxn, pfx)
var stateCommitmentStore *smt.Store
rootHash, err := stateRW.Get(substoreMerkleRootKey)
if err != nil {
return nil, err
}
if rootHash != nil {
stateCommitmentStore = loadSMT(stateCommitmentRW, rootHash)
} else {
smtdb := prefixdb.NewPrefixReadWriter(stateCommitmentRW, smtPrefix)
stateCommitmentStore = smt.NewStore(smtdb)
}
return &substore{
root: rs,
name: key,
dataBucket: prefixdb.NewPrefixReadWriter(stateRW, dataPrefix),
indexBucket: prefixdb.NewPrefixReadWriter(stateRW, indexPrefix),
stateCommitmentStore: stateCommitmentStore,
}, nil
}
// Resets a substore's state after commit (because root stateTxn has been discarded)
func (s *substore) refresh(rootHash []byte) {
pfx := substorePrefix(s.name)
stateRW := prefixdb.NewPrefixReadWriter(s.root.stateTxn, pfx)
stateCommitmentRW := prefixdb.NewPrefixReadWriter(s.root.stateCommitmentTxn, pfx)
s.dataBucket = prefixdb.NewPrefixReadWriter(stateRW, dataPrefix)
s.indexBucket = prefixdb.NewPrefixReadWriter(stateRW, indexPrefix)
s.stateCommitmentStore = loadSMT(stateCommitmentRW, rootHash)
}
// Commit implements Committer.
func (s *Store) Commit() types.CommitID {
// Substores read-lock this mutex; lock to prevent racy invalidation of the underlying txns
s.mtx.Lock()
defer s.mtx.Unlock()
// Determine the target version
versions, err := s.stateDB.Versions()
if err != nil {
panic(err)
}
target := versions.Last() + 1
if target > math.MaxInt64 {
panic(ErrMaximumHeight)
}
// Fast forward to initial version if needed
if s.InitialVersion != 0 && target < s.InitialVersion {
target = s.InitialVersion
}
cid, err := s.commit(target)
if err != nil {
panic(err)
}
// Prune if necessary
previous := cid.Version - 1
if s.Pruning.Interval != 0 && cid.Version%int64(s.Pruning.Interval) == 0 {
// The range of newly prunable versions
lastPrunable := previous - int64(s.Pruning.KeepRecent)
firstPrunable := lastPrunable - int64(s.Pruning.Interval)
for version := firstPrunable; version <= lastPrunable; version++ {
s.stateDB.DeleteVersion(uint64(version))
if s.StateCommitmentDB != nil {
s.StateCommitmentDB.DeleteVersion(uint64(version))
}
}
}
s.tran.Commit()
return *cid
}
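// Illustrative commit flow (sketch): buffered writes become part of the next version,
// and the returned CommitID carries that version and the new root hash.
//
//	kvs := store.GetKVStore(skey)
//	kvs.Set([]byte("k"), []byte("v"))
//	cid := store.Commit()
//	_ = cid.Version // newly saved version
//	_ = cid.Hash    // root hash over the substore SMT roots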
func (s *Store) getMerkleRoots() (ret map[string][]byte, err error) {
ret = map[string][]byte{}
for key := range s.schema {
sub, has := s.substoreCache[key]
if !has {
sub, err = s.getSubstore(key)
if err != nil {
return
}
}
ret[key] = sub.stateCommitmentStore.Root()
}
return
}
// Calculates root hashes and commits to DB. Does not verify target version or perform pruning.
func (s *Store) commit(target uint64) (id *types.CommitID, err error) {
storeHashes, err := s.getMerkleRoots()
if err != nil {
return
}
// Update substore Merkle roots
for key, storeHash := range storeHashes {
pfx := substorePrefix(key)
stateW := prefixdb.NewPrefixReadWriter(s.stateTxn, pfx)
if err = stateW.Set(substoreMerkleRootKey, storeHash); err != nil {
return
}
}
rootHash := sdkmaps.HashFromMap(storeHashes)
if err = s.stateTxn.Set(merkleRootKey, rootHash); err != nil {
return
}
if err = s.stateTxn.Commit(); err != nil {
return
}
defer func() {
if err != nil {
err = util.CombineErrors(err, s.stateDB.Revert(), "stateDB.Revert also failed")
}
}()
err = s.stateDB.SaveVersion(target)
if err != nil {
return
}
stateTxn := s.stateDB.ReadWriter()
defer func() {
if err != nil {
err = util.CombineErrors(err, stateTxn.Discard(), "stateTxn.Discard also failed")
}
}()
stateCommitmentTxn := stateTxn
// If DBs are not separate, StateCommitment state has been committed & snapshotted
if s.StateCommitmentDB != nil {
// if any error is encountered henceforth, we must revert the state and SC dbs
defer func() {
if err != nil {
if delerr := s.stateDB.DeleteVersion(target); delerr != nil {
err = fmt.Errorf("%w: commit rollback failed: %v", err, delerr)
}
}
}()
err = s.stateCommitmentTxn.Commit()
if err != nil {
return
}
defer func() {
if err != nil {
err = util.CombineErrors(err, s.StateCommitmentDB.Revert(), "stateCommitmentDB.Revert also failed")
}
}()
err = s.StateCommitmentDB.SaveVersion(target)
if err != nil {
return
}
stateCommitmentTxn = s.StateCommitmentDB.ReadWriter()
}
s.stateTxn = stateTxn
s.stateCommitmentTxn = stateCommitmentTxn
// the state of all live substores must be refreshed
for key, sub := range s.substoreCache {
sub.refresh(storeHashes[key])
}
return &types.CommitID{Version: int64(target), Hash: rootHash}, nil
}
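// The commitment written above is two-level (sketch of the relationship):
//
//	storeHashes[name] = root of that substore's SMT
//	rootHash          = sdkmaps.HashFromMap(storeHashes)
//
// Each substore root is stored under substoreMerkleRootKey within its substore prefix,
// and rootHash is stored under merkleRootKey before the transactions are committed.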
// LastCommitID implements Committer.
func (s *Store) LastCommitID() types.CommitID {
versions, err := s.stateDB.Versions()
if err != nil {
panic(err)
}
last := versions.Last()
if last == 0 {
return types.CommitID{}
}
// Latest Merkle root is the one currently stored
hash, err := s.stateTxn.Get(merkleRootKey)
if err != nil {
panic(err)
}
return types.CommitID{Version: int64(last), Hash: hash}
}
// SetInitialVersion implements CommitMultiStore.
func (rs *Store) SetInitialVersion(version uint64) error {
rs.InitialVersion = version
return nil
}
// GetVersion implements CommitMultiStore.
func (rs *Store) GetVersion(version int64) (types.BasicMultiStore, error) {
return rs.getView(version)
}
// CacheMultiStore implements BasicMultiStore.
func (rs *Store) CacheMultiStore() types.CacheMultiStore {
return &cacheStore{
source: rs,
substores: map[string]types.CacheKVStore{},
traceListenMixin: newTraceListenMixin(),
}
}
// parsePath expects a format like /<storeName>[/<subpath>]
// Must start with /, subpath may be empty
// Returns error if it doesn't start with /
func parsePath(path string) (storeName string, subpath string, err error) {
if !strings.HasPrefix(path, "/") {
return storeName, subpath, sdkerrors.Wrapf(sdkerrors.ErrUnknownRequest, "invalid path: %s", path)
}
paths := strings.SplitN(path[1:], "/", 2)
storeName = paths[0]
if len(paths) == 2 {
subpath = "/" + paths[1]
}
return storeName, subpath, nil
}
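// Examples (per the format above):
//
//	parsePath("/bank/key") => ("bank", "/key", nil)
//	parsePath("/bank")     => ("bank", "", nil)
//	parsePath("bank/key")  => error (missing leading slash)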
// Query implements the Queryable interface and handles ABCI queries.
//
// By default the query is answered at (latest height - 1), since Merkle proofs
// are available for that height immediately (header height = data height + 1).
// If latest-1 is not present, the latest height is used (which must be present).
// To see the result of the most recent transactions, the caller must explicitly
// set the height it wants to query.
func (rs *Store) Query(req abci.RequestQuery) (res abci.ResponseQuery) {
if len(req.Data) == 0 {
return sdkerrors.QueryResult(sdkerrors.Wrap(sdkerrors.ErrTxDecode, "query cannot be zero length"), false)
}
// if height is 0, use the latest height
height := req.Height
if height == 0 {
versions, err := rs.stateDB.Versions()
if err != nil {
return sdkerrors.QueryResult(errors.New("failed to get version info"), false)
}
latest := versions.Last()
if versions.Exists(latest - 1) {
height = int64(latest - 1)
} else {
height = int64(latest)
}
}
if height < 0 {
return sdkerrors.QueryResult(fmt.Errorf("height overflow: %v", height), false)
}
res.Height = height
storeName, subpath, err := parsePath(req.Path)
if err != nil {
return sdkerrors.QueryResult(sdkerrors.Wrapf(err, "failed to parse path"), false)
}
view, err := rs.getView(height)
if err != nil {
if errors.Is(err, dbm.ErrVersionDoesNotExist) {
err = sdkerrors.ErrInvalidHeight
}
return sdkerrors.QueryResult(sdkerrors.Wrapf(err, "failed to access height"), false)
}
if _, has := rs.schema[storeName]; !has {
return sdkerrors.QueryResult(sdkerrors.Wrapf(sdkerrors.ErrUnknownRequest, "no such store: %s", storeName), false)
}
substore, err := view.getSubstore(storeName)
if err != nil {
return sdkerrors.QueryResult(sdkerrors.Wrapf(err, "failed to access store: %s", storeName), false)
}
switch subpath {
case "/key":
var err error
res.Key = req.Data // data holds the key bytes
res.Value = substore.Get(res.Key)
if !req.Prove {
break
}
// TODO: actual IBC compatible proof. This is a placeholder so unit tests can pass
res.ProofOps, err = substore.GetProof(res.Key)
if err != nil {
return sdkerrors.QueryResult(fmt.Errorf("Merkle proof creation failed for key: %v", res.Key), false) //nolint: stylecheck // proper name
}
case "/subspace":
res.Key = req.Data // data holds the subspace prefix
pairs := kv.Pairs{
Pairs: make([]kv.Pair, 0),
}
iterator := substore.Iterator(res.Key, types.PrefixEndBytes(res.Key))
for ; iterator.Valid(); iterator.Next() {
pairs.Pairs = append(pairs.Pairs, kv.Pair{Key: iterator.Key(), Value: iterator.Value()})
}
iterator.Close()
bz, err := pairs.Marshal()
if err != nil {
panic(fmt.Errorf("failed to marshal KV pairs: %w", err))
}
res.Value = bz
default:
return sdkerrors.QueryResult(sdkerrors.Wrapf(sdkerrors.ErrUnknownRequest, "unexpected query path: %v", req.Path), false)
}
return res
}
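// Illustrative query (sketch): fetch a key from the "bank" substore at the default
// provable height, requesting the (placeholder) proof described above.
//
//	res := rs.Query(abci.RequestQuery{
//		Path:  "/bank/key",
//		Data:  []byte("balance"),
//		Prove: true,
//	})
//	_ = res.Value
//	_ = res.ProofOps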
func loadSMT(stateCommitmentTxn dbm.DBReadWriter, root []byte) *smt.Store {
smtdb := prefixdb.NewPrefixReadWriter(stateCommitmentTxn, smtPrefix)
return smt.LoadStore(smtdb, root)
}
// Returns closest index and whether it's a match
func binarySearch(hay []string, ndl string) (int, bool) {
var mid int
from, to := 0, len(hay)-1
for from <= to {
mid = (from + to) / 2
switch strings.Compare(hay[mid], ndl) {
case -1:
from = mid + 1
case 1:
to = mid - 1
default:
return mid, true
}
}
return from, false
}
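// Examples:
//
//	binarySearch([]string{"acc", "bank", "staking"}, "bank") => (1, true)
//	binarySearch([]string{"acc", "bank", "staking"}, "gov")  => (2, false) // insertion index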
func (pr *prefixRegistry) storeInfo(key string) (sst types.StoreType, ix int, err error) {
ix, has := binarySearch(pr.reserved, key)
if !has {
err = fmt.Errorf("prefix does not exist: %v", key)
return
}
sst, has = pr.StoreSchema[key]
if !has {
err = fmt.Errorf("prefix is registered but not in schema: %v", key)
}
return
}
func (pr *prefixRegistry) RegisterSubstore(key string, typ types.StoreType) error {
if !validSubStoreType(typ) {
return fmt.Errorf("StoreType not supported: %v", typ)
}
// Find the neighboring reserved prefix, and check for duplicates and conflicts
i, has := binarySearch(pr.reserved, key)
if has {
return fmt.Errorf("prefix already exists: %v", key)
}
if i > 0 && strings.HasPrefix(key, pr.reserved[i-1]) {
return fmt.Errorf("prefix conflict: '%v' exists, cannot add '%v'", pr.reserved[i-1], key)
}
if i < len(pr.reserved) && strings.HasPrefix(pr.reserved[i], key) {
return fmt.Errorf("prefix conflict: '%v' exists, cannot add '%v'", pr.reserved[i], key)
}
reserved := pr.reserved[:i]
reserved = append(reserved, key)
pr.reserved = append(reserved, pr.reserved[i:]...)
pr.StoreSchema[key] = typ
return nil
}
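// Examples (sketch): once "bank" is reserved, "bank2" is rejected because an existing
// name is a prefix of the new one, while an unrelated name is fine.
//
//	_ = pr.RegisterSubstore("bank", types.StoreTypePersistent)  // ok
//	_ = pr.RegisterSubstore("bank2", types.StoreTypePersistent) // prefix conflict error
//	_ = pr.RegisterSubstore("acc", types.StoreTypePersistent)   // ok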
func (tlm *traceListenMixin) AddListeners(skey types.StoreKey, listeners []types.WriteListener) {
key := skey.Name()
tlm.listeners[key] = append(tlm.listeners[key], listeners...)
}
// ListeningEnabled returns whether listening is enabled for a specific KVStore
func (tlm *traceListenMixin) ListeningEnabled(key types.StoreKey) bool {
if ls, has := tlm.listeners[key.Name()]; has {
return len(ls) != 0
}
return false
}
func (tlm *traceListenMixin) TracingEnabled() bool {
return tlm.TraceWriter != nil
}
func (tlm *traceListenMixin) SetTracer(w io.Writer) {
tlm.TraceWriter = w
}
func (tlm *traceListenMixin) SetTraceContext(tc types.TraceContext) {
tlm.TraceContext = tc
}
func (tlm *traceListenMixin) wrapTraceListen(store types.KVStore, skey types.StoreKey) types.KVStore {
if tlm.TracingEnabled() {
store = tracekv.NewStore(store, tlm.TraceWriter, tlm.TraceContext)
}
if tlm.ListeningEnabled(skey) {
store = listenkv.NewStore(store, skey, tlm.listeners[skey.Name()])
}
return store
}
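// Illustrative setup (sketch): tracing writes an operation log to the given writer,
// and registered listeners receive writes for their store key. The buffer and
// listener value here are assumptions for the sketch.
//
//	var buf bytes.Buffer
//	store.SetTracer(&buf)
//	store.SetTraceContext(types.TraceContext{"blockHeight": 1})
//	store.AddListeners(skey, []types.WriteListener{listener})
//	kvs := store.GetKVStore(skey) // returned store is wrapped for trace/listen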
func (s *Store) GetPruning() types.PruningOptions { return s.Pruning }
func (s *Store) SetPruning(po types.PruningOptions) { s.Pruning = po }
func (rs *Store) Restore(
height uint64, format uint32, protoReader protoio.Reader,
) (snapshottypes.SnapshotItem, error) {
return snapshottypes.SnapshotItem{}, nil
}
func (rs *Store) Snapshot(height uint64, protoWriter protoio.Writer) error {
return nil
}