ADR-038 Part 1: WriteListener, listen.KVStore, MultiStore and KVStore updates (#8551)

* StoreKVPair protobuf message definition and generated go types

* store WriteListener

* update MultiStore, CacheWrap, CacheWrapper interfaces

* adjust KVStores to fit new CacheWrapper interface

* new ListenKVStore

* adjust multistores to fit new MultiStore interface and enable wrapping returned KVStores with the new ListenKVStore

* typo fixes in adr

* ListenKV Store test

* update server mock KVStore and MultiStore

* multistore unit test; fix multistore constructor

* update changelog

* fix bug identified in CI

* improve codecov, minor fixes/adjustments

* review fixes

* review updates; flip set to delete in KVStorePair, updated proto-docs from running 'make proto-gen'
This commit is contained in:
Ian Norden 2021-03-30 15:13:51 -05:00 committed by GitHub
parent f8f52c6df6
commit feed37dc56
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 1456 additions and 46 deletions

View File

@ -96,6 +96,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* (x/upgrade) [\#8743](https://github.com/cosmos/cosmos-sdk/pull/8743) Add tracking module versions as per ADR-041
* (types) [\#8962](https://github.com/cosmos/cosmos-sdk/issues/8962) Add `Abs()` method to `sdk.Int`.
* (x/bank) [\#8950](https://github.com/cosmos/cosmos-sdk/pull/8950) Improve efficiency on supply updates.
* (store) [\#8012](https://github.com/cosmos/cosmos-sdk/pull/8012) Implementation of ADR-038 WriteListener and listen.KVStore
### Bug Fixes

View File

@ -33,7 +33,7 @@ type WriteListener interface {
// if value is nil then it was deleted
// storeKey indicates the source KVStore, to facilitate using the the same WriteListener across separate KVStores
// set bool indicates if it was a set; true: set, false: delete
OnWrite(storeKey types.StoreKey, set bool, key []byte, value []byte)
OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error
}
```
@ -72,15 +72,20 @@ func NewStoreKVPairWriteListener(w io.Writer, m codec.BinaryMarshaler) *StoreKVP
}
// OnWrite satisfies the WriteListener interface by writing length-prefixed protobuf encoded StoreKVPairs
func (wl *StoreKVPairWriteListener) OnWrite(storeKey types.StoreKey, set bool, key []byte, value []byte) {
func (wl *StoreKVPairWriteListener) OnWrite(storeKey types.StoreKey, key []byte, value []byte, delete bool) error error {
kvPair := new(types.StoreKVPair)
kvPair.StoreKey = storeKey.Name()
kvPair.Set = set
kvPair.Delete = Delete
kvPair.Key = key
kvPair.Value = value
if by, err := wl.marshaller.MarshalBinaryLengthPrefixed(kvPair); err == nil {
wl.writer.Write(by)
by, err := wl.marshaller.MarshalBinaryLengthPrefixed(kvPair)
if err != nil {
return err
}
if _, err := wl.writer.Write(by); err != nil {
return err
}
return nil
}
```
@ -110,20 +115,22 @@ func NewStore(parent types.KVStore, psk types.StoreKey, listeners []types.WriteL
func (s *Store) Set(key []byte, value []byte) {
types.AssertValidKey(key)
s.parent.Set(key, value)
s.onWrite(true, key, value)
s.onWrite(false, key, value)
}
// Delete implements the KVStore interface. It traces a write operation and
// delegates the Delete call to the parent KVStore.
func (s *Store) Delete(key []byte) {
s.parent.Delete(key)
s.onWrite(false, key, nil)
s.onWrite(true, key, nil)
}
// onWrite writes a KVStore operation to all of the WriteListeners
func (s *Store) onWrite(set bool, key, value []byte) {
func (s *Store) onWrite(delete bool, key, value []byte) {
for _, l := range s.listeners {
l.OnWrite(s.parentStoreKey, set, key, value)
if err := l.OnWrite(s.parentStoreKey, key, value, delete); err != nil {
// log error
}
}
}
```
@ -140,9 +147,9 @@ type MultiStore interface {
// ListeningEnabled returns if listening is enabled for the KVStore belonging the provided StoreKey
ListeningEnabled(key StoreKey) bool
// SetListeners sets the WriteListeners for the KVStore belonging to the provided StoreKey
// AddListeners adds WriteListeners for the KVStore belonging to the provided StoreKey
// It appends the listeners to a current set, if one already exists
SetListeners(key StoreKey, listeners []WriteListener)
AddListeners(key StoreKey, listeners []WriteListener)
}
```
@ -342,7 +349,7 @@ func (fss *FileStreamingService) Stream(wg *sync.WaitGroup, quitChan <-chan stru
case <-quitChan:
return
case by := <-fss.srcChan:
append(fss.stateCache, by)
fss.stateCache = append(fss.stateCache, by)
}
}
}()
@ -380,7 +387,7 @@ We will add a new method to the `BaseApp` to enable the registration of `Streami
func (app *BaseApp) RegisterHooks(s StreamingService) {
// set the listeners for each StoreKey
for key, lis := range s.Listeners() {
app.cms.SetListeners(key, lis)
app.cms.AddListeners(key, lis)
}
// register the streaming service hooks within the BaseApp
// BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context using these hooks
@ -398,7 +405,7 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg
...
// Call the streaming service hooks with the BeginBlock messages
for _ hook := range app.hooks {
for _, hook := range app.hooks {
hook.ListenBeginBlock(app.deliverState.ctx, req, res)
}
@ -445,7 +452,7 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx
}
// Call the streaming service hooks with the DeliverTx messages
for _, hook := range app.hook {
for _, hook := range app.hooks {
hook.ListenDeliverTx(app.deliverState.ctx, req, res)
}

View File

@ -80,7 +80,6 @@
- [Output](#cosmos.bank.v1beta1.Output)
- [Params](#cosmos.bank.v1beta1.Params)
- [SendEnabled](#cosmos.bank.v1beta1.SendEnabled)
- [Supply](#cosmos.bank.v1beta1.Supply)
- [cosmos/bank/v1beta1/genesis.proto](#cosmos/bank/v1beta1/genesis.proto)
- [Balance](#cosmos.bank.v1beta1.Balance)
@ -133,6 +132,9 @@
- [CommitInfo](#cosmos.base.store.v1beta1.CommitInfo)
- [StoreInfo](#cosmos.base.store.v1beta1.StoreInfo)
- [cosmos/base/store/v1beta1/listening.proto](#cosmos/base/store/v1beta1/listening.proto)
- [StoreKVPair](#cosmos.base.store.v1beta1.StoreKVPair)
- [cosmos/base/store/v1beta1/snapshot.proto](#cosmos/base/store/v1beta1/snapshot.proto)
- [SnapshotIAVLItem](#cosmos.base.store.v1beta1.SnapshotIAVLItem)
- [SnapshotItem](#cosmos.base.store.v1beta1.SnapshotItem)
@ -1563,23 +1565,6 @@ sendable).
<a name="cosmos.bank.v1beta1.Supply"></a>
### Supply
Supply represents a struct that passively keeps track of the total supply
amounts in the network.
This message is deprecated now that supply is indexed by denom.
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| `total` | [cosmos.base.v1beta1.Coin](#cosmos.base.v1beta1.Coin) | repeated | |
<!-- end messages -->
<!-- end enums -->
@ -2200,6 +2185,41 @@ between a store name and the commit ID.
<!-- end messages -->
<!-- end enums -->
<!-- end HasExtensions -->
<!-- end services -->
<a name="cosmos/base/store/v1beta1/listening.proto"></a>
<p align="right"><a href="#top">Top</a></p>
## cosmos/base/store/v1beta1/listening.proto
<a name="cosmos.base.store.v1beta1.StoreKVPair"></a>
### StoreKVPair
StoreKVPair is a KVStore KVPair used for listening to state changes (Sets and Deletes)
It optionally includes the StoreKey for the originating KVStore and a Boolean flag to distinguish between Sets and Deletes
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| `store_key` | [string](#string) | | the store key for the KVStore this pair originates from |
| `delete` | [bool](#bool) | | true indicates a delete operation, false indicates a set operation |
| `key` | [bytes](#bytes) | | |
| `value` | [bytes](#bytes) | | |
<!-- end messages -->
<!-- end enums -->

View File

@ -0,0 +1,13 @@
syntax = "proto3";
package cosmos.base.store.v1beta1;
option go_package = "github.com/cosmos/cosmos-sdk/store/types";
// StoreKVPair is a KVStore KVPair used for listening to state changes (Sets and Deletes)
// It optionally includes the StoreKey for the originating KVStore and a Boolean flag to distinguish between Sets and Deletes
message StoreKVPair {
string store_key = 1; // the store key for the KVStore this pair originates from
bool delete = 2; // true indicates a delete operation, false indicates a set operation
bytes key = 3;
bytes value = 4;
}

View File

@ -31,6 +31,10 @@ func (ms multiStore) CacheWrapWithTrace(_ io.Writer, _ sdk.TraceContext) sdk.Cac
panic("not implemented")
}
func (ms multiStore) CacheWrapWithListeners(_ store.StoreKey, _ []store.WriteListener) store.CacheWrap {
panic("not implemented")
}
func (ms multiStore) TracingEnabled() bool {
panic("not implemented")
}
@ -43,6 +47,14 @@ func (ms multiStore) SetTracer(w io.Writer) sdk.MultiStore {
panic("not implemented")
}
func (ms multiStore) AddListeners(key store.StoreKey, listeners []store.WriteListener) {
panic("not implemented")
}
func (ms multiStore) ListeningEnabled(key store.StoreKey) bool {
panic("not implemented")
}
func (ms multiStore) Commit() sdk.CommitID {
panic("not implemented")
}
@ -131,6 +143,10 @@ func (kv kvStore) CacheWrapWithTrace(w io.Writer, tc sdk.TraceContext) sdk.Cache
panic("not implemented")
}
func (kv kvStore) CacheWrapWithListeners(_ store.StoreKey, _ []store.WriteListener) store.CacheWrap {
panic("not implemented")
}
func (kv kvStore) GetStoreType() sdk.StoreType {
panic("not implemented")
}

View File

@ -10,6 +10,7 @@ import (
dbm "github.com/tendermint/tm-db"
"github.com/cosmos/cosmos-sdk/internal/conv"
"github.com/cosmos/cosmos-sdk/store/listenkv"
"github.com/cosmos/cosmos-sdk/store/tracekv"
"github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/cosmos-sdk/telemetry"
@ -146,6 +147,11 @@ func (store *Store) CacheWrapWithTrace(w io.Writer, tc types.TraceContext) types
return NewStore(tracekv.NewStore(store, w, tc))
}
// CacheWrapWithListeners implements the CacheWrapper interface.
func (store *Store) CacheWrapWithListeners(storeKey types.StoreKey, listeners []types.WriteListener) types.CacheWrap {
return NewStore(listenkv.NewStore(store, storeKey, listeners))
}
//----------------------------------------
// Iteration

View File

@ -25,6 +25,8 @@ type Store struct {
traceWriter io.Writer
traceContext types.TraceContext
listeners map[types.StoreKey][]types.WriteListener
}
var _ types.CacheMultiStore = Store{}
@ -35,6 +37,7 @@ var _ types.CacheMultiStore = Store{}
func NewFromKVStore(
store types.KVStore, stores map[types.StoreKey]types.CacheWrapper,
keys map[string]types.StoreKey, traceWriter io.Writer, traceContext types.TraceContext,
listeners map[types.StoreKey][]types.WriteListener,
) Store {
cms := Store{
db: cachekv.NewStore(store),
@ -42,13 +45,20 @@ func NewFromKVStore(
keys: keys,
traceWriter: traceWriter,
traceContext: traceContext,
listeners: listeners,
}
for key, store := range stores {
var cacheWrapped types.CacheWrap
if cms.TracingEnabled() {
cms.stores[key] = store.CacheWrapWithTrace(cms.traceWriter, cms.traceContext)
cacheWrapped = store.CacheWrapWithTrace(cms.traceWriter, cms.traceContext)
} else {
cms.stores[key] = store.CacheWrap()
cacheWrapped = store.CacheWrap()
}
if cms.ListeningEnabled(key) {
cms.stores[key] = cacheWrapped.CacheWrapWithListeners(key, cms.listeners[key])
} else {
cms.stores[key] = cacheWrapped
}
}
@ -59,10 +69,10 @@ func NewFromKVStore(
// CacheWrapper objects. Each CacheWrapper store is a branched store.
func NewStore(
db dbm.DB, stores map[types.StoreKey]types.CacheWrapper, keys map[string]types.StoreKey,
traceWriter io.Writer, traceContext types.TraceContext,
traceWriter io.Writer, traceContext types.TraceContext, listeners map[types.StoreKey][]types.WriteListener,
) Store {
return NewFromKVStore(dbadapter.Store{DB: db}, stores, keys, traceWriter, traceContext)
return NewFromKVStore(dbadapter.Store{DB: db}, stores, keys, traceWriter, traceContext, listeners)
}
func newCacheMultiStoreFromCMS(cms Store) Store {
@ -71,7 +81,7 @@ func newCacheMultiStoreFromCMS(cms Store) Store {
stores[k] = v
}
return NewFromKVStore(cms.db, stores, nil, cms.traceWriter, cms.traceContext)
return NewFromKVStore(cms.db, stores, nil, cms.traceWriter, cms.traceContext, cms.listeners)
}
// SetTracer sets the tracer for the MultiStore that the underlying
@ -102,6 +112,23 @@ func (cms Store) TracingEnabled() bool {
return cms.traceWriter != nil
}
// AddListeners adds listeners for a specific KVStore
func (cms Store) AddListeners(key types.StoreKey, listeners []types.WriteListener) {
if ls, ok := cms.listeners[key]; ok {
cms.listeners[key] = append(ls, listeners...)
} else {
cms.listeners[key] = listeners
}
}
// ListeningEnabled returns if listening is enabled for a specific KVStore
func (cms Store) ListeningEnabled(key types.StoreKey) bool {
if ls, ok := cms.listeners[key]; ok {
return len(ls) != 0
}
return false
}
// GetStoreType returns the type of the store.
func (cms Store) GetStoreType() types.StoreType {
return types.StoreTypeMulti
@ -125,6 +152,11 @@ func (cms Store) CacheWrapWithTrace(_ io.Writer, _ types.TraceContext) types.Cac
return cms.CacheWrap()
}
// CacheWrapWithListeners implements the CacheWrapper interface.
func (cms Store) CacheWrapWithListeners(_ types.StoreKey, _ []types.WriteListener) types.CacheWrap {
return cms.CacheWrap()
}
// Implements MultiStore.
func (cms Store) CacheMultiStore() types.CacheMultiStore {
return newCacheMultiStoreFromCMS(cms)

View File

@ -6,6 +6,7 @@ import (
dbm "github.com/tendermint/tm-db"
"github.com/cosmos/cosmos-sdk/store/cachekv"
"github.com/cosmos/cosmos-sdk/store/listenkv"
"github.com/cosmos/cosmos-sdk/store/tracekv"
"github.com/cosmos/cosmos-sdk/store/types"
)
@ -85,5 +86,10 @@ func (dsa Store) CacheWrapWithTrace(w io.Writer, tc types.TraceContext) types.Ca
return cachekv.NewStore(tracekv.NewStore(dsa, w, tc))
}
// CacheWrapWithListeners implements the CacheWrapper interface.
func (dsa Store) CacheWrapWithListeners(storeKey types.StoreKey, listeners []types.WriteListener) types.CacheWrap {
return cachekv.NewStore(listenkv.NewStore(dsa, storeKey, listeners))
}
// dbm.DB implements KVStore so we can CacheKVStore it.
var _ types.KVStore = Store{}

View File

@ -5,6 +5,8 @@ import (
"errors"
"testing"
"github.com/cosmos/cosmos-sdk/store/cachekv"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
@ -71,3 +73,18 @@ func TestAccessors(t *testing.T) {
mockDB.EXPECT().ReverseIterator(gomock.Eq(start), gomock.Eq(end)).Times(1).Return(nil, errFoo)
require.Panics(t, func() { store.ReverseIterator(start, end) })
}
func TestCacheWraps(t *testing.T) {
mockCtrl := gomock.NewController(t)
mockDB := mocks.NewMockDB(mockCtrl)
store := dbadapter.Store{mockDB}
cacheWrapper := store.CacheWrap()
require.IsType(t, &cachekv.Store{}, cacheWrapper)
cacheWrappedWithTrace := store.CacheWrapWithTrace(nil, nil)
require.IsType(t, &cachekv.Store{}, cacheWrappedWithTrace)
cacheWrappedWithListeners := store.CacheWrapWithListeners(nil, nil)
require.IsType(t, &cachekv.Store{}, cacheWrappedWithListeners)
}

View File

@ -98,6 +98,11 @@ func (gs *Store) CacheWrapWithTrace(_ io.Writer, _ types.TraceContext) types.Cac
panic("cannot CacheWrapWithTrace a GasKVStore")
}
// CacheWrapWithListeners implements the CacheWrapper interface.
func (gs *Store) CacheWrapWithListeners(_ types.StoreKey, _ []types.WriteListener) types.CacheWrap {
panic("cannot CacheWrapWithListeners a GasKVStore")
}
func (gs *Store) iterator(start, end []byte, ascending bool) types.Iterator {
var parent types.Iterator
if ascending {

View File

@ -26,6 +26,7 @@ func TestGasKVStoreBasic(t *testing.T) {
require.Equal(t, types.StoreTypeDB, st.GetStoreType())
require.Panics(t, func() { st.CacheWrap() })
require.Panics(t, func() { st.CacheWrapWithTrace(nil, nil) })
require.Panics(t, func() { st.CacheWrapWithListeners(nil, nil) })
require.Panics(t, func() { st.Set(nil, []byte("value")) }, "setting a nil key should panic")
require.Panics(t, func() { st.Set([]byte(""), []byte("value")) }, "setting an empty key should panic")

View File

@ -14,6 +14,7 @@ import (
dbm "github.com/tendermint/tm-db"
"github.com/cosmos/cosmos-sdk/store/cachekv"
"github.com/cosmos/cosmos-sdk/store/listenkv"
"github.com/cosmos/cosmos-sdk/store/tracekv"
"github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/cosmos-sdk/telemetry"
@ -159,6 +160,11 @@ func (st *Store) CacheWrapWithTrace(w io.Writer, tc types.TraceContext) types.Ca
return cachekv.NewStore(tracekv.NewStore(st, w, tc))
}
// CacheWrapWithListeners implements the CacheWrapper interface.
func (st *Store) CacheWrapWithListeners(storeKey types.StoreKey, listeners []types.WriteListener) types.CacheWrap {
return cachekv.NewStore(listenkv.NewStore(st, storeKey, listeners))
}
// Implements types.KVStore.
func (st *Store) Set(key, value []byte) {
types.AssertValidKey(key)

View File

@ -5,6 +5,8 @@ import (
"fmt"
"testing"
"github.com/cosmos/cosmos-sdk/store/cachekv"
"github.com/cosmos/iavl"
"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
@ -634,3 +636,18 @@ func TestSetInitialVersion(t *testing.T) {
})
}
}
func TestCacheWraps(t *testing.T) {
db := dbm.NewMemDB()
tree, _ := newAlohaTree(t, db)
store := UnsafeNewStore(tree)
cacheWrapper := store.CacheWrap()
require.IsType(t, &cachekv.Store{}, cacheWrapper)
cacheWrappedWithTrace := store.CacheWrapWithTrace(nil, nil)
require.IsType(t, &cachekv.Store{}, cacheWrappedWithTrace)
cacheWrappedWithListeners := store.CacheWrapWithListeners(nil, nil)
require.IsType(t, &cachekv.Store{}, cacheWrappedWithListeners)
}

155
store/listenkv/store.go Normal file
View File

@ -0,0 +1,155 @@
package listenkv
import (
"io"
"github.com/cosmos/cosmos-sdk/store/types"
)
var _ types.KVStore = &Store{}
// Store implements the KVStore interface with listening enabled.
// Operations are traced on each core KVStore call and written to any of the
// underlying listeners with the proper key and operation permissions
type Store struct {
parent types.KVStore
listeners []types.WriteListener
parentStoreKey types.StoreKey
}
// NewStore returns a reference to a new traceKVStore given a parent
// KVStore implementation and a buffered writer.
func NewStore(parent types.KVStore, parentStoreKey types.StoreKey, listeners []types.WriteListener) *Store {
return &Store{parent: parent, listeners: listeners, parentStoreKey: parentStoreKey}
}
// Get implements the KVStore interface. It traces a read operation and
// delegates a Get call to the parent KVStore.
func (s *Store) Get(key []byte) []byte {
value := s.parent.Get(key)
return value
}
// Set implements the KVStore interface. It traces a write operation and
// delegates the Set call to the parent KVStore.
func (s *Store) Set(key []byte, value []byte) {
types.AssertValidKey(key)
s.parent.Set(key, value)
s.onWrite(false, key, value)
}
// Delete implements the KVStore interface. It traces a write operation and
// delegates the Delete call to the parent KVStore.
func (s *Store) Delete(key []byte) {
s.parent.Delete(key)
s.onWrite(true, key, nil)
}
// Has implements the KVStore interface. It delegates the Has call to the
// parent KVStore.
func (s *Store) Has(key []byte) bool {
return s.parent.Has(key)
}
// Iterator implements the KVStore interface. It delegates the Iterator call
// the to the parent KVStore.
func (s *Store) Iterator(start, end []byte) types.Iterator {
return s.iterator(start, end, true)
}
// ReverseIterator implements the KVStore interface. It delegates the
// ReverseIterator call the to the parent KVStore.
func (s *Store) ReverseIterator(start, end []byte) types.Iterator {
return s.iterator(start, end, false)
}
// iterator facilitates iteration over a KVStore. It delegates the necessary
// calls to it's parent KVStore.
func (s *Store) iterator(start, end []byte, ascending bool) types.Iterator {
var parent types.Iterator
if ascending {
parent = s.parent.Iterator(start, end)
} else {
parent = s.parent.ReverseIterator(start, end)
}
return newTraceIterator(parent, s.listeners)
}
type listenIterator struct {
parent types.Iterator
listeners []types.WriteListener
}
func newTraceIterator(parent types.Iterator, listeners []types.WriteListener) types.Iterator {
return &listenIterator{parent: parent, listeners: listeners}
}
// Domain implements the Iterator interface.
func (li *listenIterator) Domain() (start []byte, end []byte) {
return li.parent.Domain()
}
// Valid implements the Iterator interface.
func (li *listenIterator) Valid() bool {
return li.parent.Valid()
}
// Next implements the Iterator interface.
func (li *listenIterator) Next() {
li.parent.Next()
}
// Key implements the Iterator interface.
func (li *listenIterator) Key() []byte {
key := li.parent.Key()
return key
}
// Value implements the Iterator interface.
func (li *listenIterator) Value() []byte {
value := li.parent.Value()
return value
}
// Close implements the Iterator interface.
func (li *listenIterator) Close() error {
return li.parent.Close()
}
// Error delegates the Error call to the parent iterator.
func (li *listenIterator) Error() error {
return li.parent.Error()
}
// GetStoreType implements the KVStore interface. It returns the underlying
// KVStore type.
func (s *Store) GetStoreType() types.StoreType {
return s.parent.GetStoreType()
}
// CacheWrap implements the KVStore interface. It panics as a Store
// cannot be cache wrapped.
func (s *Store) CacheWrap() types.CacheWrap {
panic("cannot CacheWrap a ListenKVStore")
}
// CacheWrapWithTrace implements the KVStore interface. It panics as a
// Store cannot be cache wrapped.
func (s *Store) CacheWrapWithTrace(_ io.Writer, _ types.TraceContext) types.CacheWrap {
panic("cannot CacheWrapWithTrace a ListenKVStore")
}
// CacheWrapWithListeners implements the KVStore interface. It panics as a
// Store cannot be cache wrapped.
func (s *Store) CacheWrapWithListeners(_ types.StoreKey, _ []types.WriteListener) types.CacheWrap {
panic("cannot CacheWrapWithListeners a ListenKVStore")
}
// onWrite writes a KVStore operation to all of the WriteListeners
func (s *Store) onWrite(delete bool, key, value []byte) {
for _, l := range s.listeners {
l.OnWrite(s.parentStoreKey, key, value, delete)
}
}

View File

@ -0,0 +1,298 @@
package listenkv_test
import (
"bytes"
"fmt"
"io"
"testing"
"github.com/cosmos/cosmos-sdk/codec"
codecTypes "github.com/cosmos/cosmos-sdk/codec/types"
"github.com/cosmos/cosmos-sdk/store/dbadapter"
"github.com/cosmos/cosmos-sdk/store/listenkv"
"github.com/cosmos/cosmos-sdk/store/prefix"
"github.com/cosmos/cosmos-sdk/store/types"
"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db"
)
func bz(s string) []byte { return []byte(s) }
func keyFmt(i int) []byte { return bz(fmt.Sprintf("key%0.8d", i)) }
func valFmt(i int) []byte { return bz(fmt.Sprintf("value%0.8d", i)) }
var kvPairs = []types.KVPair{
{Key: keyFmt(1), Value: valFmt(1)},
{Key: keyFmt(2), Value: valFmt(2)},
{Key: keyFmt(3), Value: valFmt(3)},
}
var testStoreKey = types.NewKVStoreKey("listen_test")
var interfaceRegistry = codecTypes.NewInterfaceRegistry()
var testMarshaller = codec.NewProtoCodec(interfaceRegistry)
func newListenKVStore(w io.Writer) *listenkv.Store {
store := newEmptyListenKVStore(w)
for _, kvPair := range kvPairs {
store.Set(kvPair.Key, kvPair.Value)
}
return store
}
func newEmptyListenKVStore(w io.Writer) *listenkv.Store {
listener := types.NewStoreKVPairWriteListener(w, testMarshaller)
memDB := dbadapter.Store{DB: dbm.NewMemDB()}
return listenkv.NewStore(memDB, testStoreKey, []types.WriteListener{listener})
}
func TestListenKVStoreGet(t *testing.T) {
testCases := []struct {
key []byte
expectedValue []byte
}{
{
key: kvPairs[0].Key,
expectedValue: kvPairs[0].Value,
},
{
key: []byte("does-not-exist"),
expectedValue: nil,
},
}
for _, tc := range testCases {
var buf bytes.Buffer
store := newListenKVStore(&buf)
buf.Reset()
value := store.Get(tc.key)
require.Equal(t, tc.expectedValue, value)
}
}
func TestListenKVStoreSet(t *testing.T) {
testCases := []struct {
key []byte
value []byte
expectedOut *types.StoreKVPair
}{
{
key: kvPairs[0].Key,
value: kvPairs[0].Value,
expectedOut: &types.StoreKVPair{
Key: kvPairs[0].Key,
Value: kvPairs[0].Value,
StoreKey: testStoreKey.Name(),
Delete: false,
},
},
{
key: kvPairs[1].Key,
value: kvPairs[1].Value,
expectedOut: &types.StoreKVPair{
Key: kvPairs[1].Key,
Value: kvPairs[1].Value,
StoreKey: testStoreKey.Name(),
Delete: false,
},
},
{
key: kvPairs[2].Key,
value: kvPairs[2].Value,
expectedOut: &types.StoreKVPair{
Key: kvPairs[2].Key,
Value: kvPairs[2].Value,
StoreKey: testStoreKey.Name(),
Delete: false,
},
},
}
for _, tc := range testCases {
var buf bytes.Buffer
store := newEmptyListenKVStore(&buf)
buf.Reset()
store.Set(tc.key, tc.value)
storeKVPair := new(types.StoreKVPair)
testMarshaller.UnmarshalBinaryLengthPrefixed(buf.Bytes(), storeKVPair)
require.Equal(t, tc.expectedOut, storeKVPair)
}
var buf bytes.Buffer
store := newEmptyListenKVStore(&buf)
require.Panics(t, func() { store.Set([]byte(""), []byte("value")) }, "setting an empty key should panic")
require.Panics(t, func() { store.Set(nil, []byte("value")) }, "setting a nil key should panic")
}
func TestListenKVStoreDelete(t *testing.T) {
testCases := []struct {
key []byte
expectedOut *types.StoreKVPair
}{
{
key: kvPairs[0].Key,
expectedOut: &types.StoreKVPair{
Key: kvPairs[0].Key,
Value: nil,
StoreKey: testStoreKey.Name(),
Delete: true,
},
},
}
for _, tc := range testCases {
var buf bytes.Buffer
store := newListenKVStore(&buf)
buf.Reset()
store.Delete(tc.key)
storeKVPair := new(types.StoreKVPair)
testMarshaller.UnmarshalBinaryLengthPrefixed(buf.Bytes(), storeKVPair)
require.Equal(t, tc.expectedOut, storeKVPair)
}
}
func TestListenKVStoreHas(t *testing.T) {
testCases := []struct {
key []byte
expected bool
}{
{
key: kvPairs[0].Key,
expected: true,
},
}
for _, tc := range testCases {
var buf bytes.Buffer
store := newListenKVStore(&buf)
buf.Reset()
ok := store.Has(tc.key)
require.Equal(t, tc.expected, ok)
}
}
func TestTestListenKVStoreIterator(t *testing.T) {
var buf bytes.Buffer
store := newListenKVStore(&buf)
iterator := store.Iterator(nil, nil)
s, e := iterator.Domain()
require.Equal(t, []byte(nil), s)
require.Equal(t, []byte(nil), e)
testCases := []struct {
expectedKey []byte
expectedValue []byte
}{
{
expectedKey: kvPairs[0].Key,
expectedValue: kvPairs[0].Value,
},
{
expectedKey: kvPairs[1].Key,
expectedValue: kvPairs[1].Value,
},
{
expectedKey: kvPairs[2].Key,
expectedValue: kvPairs[2].Value,
},
}
for _, tc := range testCases {
ka := iterator.Key()
require.Equal(t, tc.expectedKey, ka)
va := iterator.Value()
require.Equal(t, tc.expectedValue, va)
iterator.Next()
}
require.False(t, iterator.Valid())
require.Panics(t, iterator.Next)
require.NoError(t, iterator.Close())
}
func TestTestListenKVStoreReverseIterator(t *testing.T) {
var buf bytes.Buffer
store := newListenKVStore(&buf)
iterator := store.ReverseIterator(nil, nil)
s, e := iterator.Domain()
require.Equal(t, []byte(nil), s)
require.Equal(t, []byte(nil), e)
testCases := []struct {
expectedKey []byte
expectedValue []byte
}{
{
expectedKey: kvPairs[2].Key,
expectedValue: kvPairs[2].Value,
},
{
expectedKey: kvPairs[1].Key,
expectedValue: kvPairs[1].Value,
},
{
expectedKey: kvPairs[0].Key,
expectedValue: kvPairs[0].Value,
},
}
for _, tc := range testCases {
ka := iterator.Key()
require.Equal(t, tc.expectedKey, ka)
va := iterator.Value()
require.Equal(t, tc.expectedValue, va)
iterator.Next()
}
require.False(t, iterator.Valid())
require.Panics(t, iterator.Next)
require.NoError(t, iterator.Close())
}
func TestListenKVStorePrefix(t *testing.T) {
store := newEmptyListenKVStore(nil)
pStore := prefix.NewStore(store, []byte("listen_prefix"))
require.IsType(t, prefix.Store{}, pStore)
}
func TestListenKVStoreGetStoreType(t *testing.T) {
memDB := dbadapter.Store{DB: dbm.NewMemDB()}
store := newEmptyListenKVStore(nil)
require.Equal(t, memDB.GetStoreType(), store.GetStoreType())
}
func TestListenKVStoreCacheWrap(t *testing.T) {
store := newEmptyListenKVStore(nil)
require.Panics(t, func() { store.CacheWrap() })
}
func TestListenKVStoreCacheWrapWithTrace(t *testing.T) {
store := newEmptyListenKVStore(nil)
require.Panics(t, func() { store.CacheWrapWithTrace(nil, nil) })
}
func TestListenKVStoreCacheWrapWithListeners(t *testing.T) {
store := newEmptyListenKVStore(nil)
require.Panics(t, func() { store.CacheWrapWithListeners(nil, nil) })
}

View File

@ -3,6 +3,8 @@ package mem_test
import (
"testing"
"github.com/cosmos/cosmos-sdk/store/cachekv"
"github.com/stretchr/testify/require"
"github.com/cosmos/cosmos-sdk/store/mem"
@ -25,6 +27,15 @@ func TestStore(t *testing.T) {
db.Delete(key)
require.Nil(t, db.Get(key))
cacheWrapper := db.CacheWrap()
require.IsType(t, &cachekv.Store{}, cacheWrapper)
cacheWrappedWithTrace := db.CacheWrapWithTrace(nil, nil)
require.IsType(t, &cachekv.Store{}, cacheWrappedWithTrace)
cacheWrappedWithListeners := db.CacheWrapWithListeners(nil, nil)
require.IsType(t, &cachekv.Store{}, cacheWrappedWithListeners)
}
func TestCommit(t *testing.T) {

View File

@ -7,6 +7,7 @@ import (
"github.com/cosmos/cosmos-sdk/store/cachekv"
"github.com/cosmos/cosmos-sdk/store/dbadapter"
"github.com/cosmos/cosmos-sdk/store/listenkv"
"github.com/cosmos/cosmos-sdk/store/tracekv"
"github.com/cosmos/cosmos-sdk/store/types"
)
@ -45,6 +46,11 @@ func (s Store) CacheWrapWithTrace(w io.Writer, tc types.TraceContext) types.Cach
return cachekv.NewStore(tracekv.NewStore(s, w, tc))
}
// CacheWrapWithListeners implements the CacheWrapper interface.
func (s Store) CacheWrapWithListeners(storeKey types.StoreKey, listeners []types.WriteListener) types.CacheWrap {
return cachekv.NewStore(listenkv.NewStore(s, storeKey, listeners))
}
// Commit performs a no-op as entries are persistent between commitments.
func (s *Store) Commit() (id types.CommitID) { return }

View File

@ -6,6 +6,7 @@ import (
"io"
"github.com/cosmos/cosmos-sdk/store/cachekv"
"github.com/cosmos/cosmos-sdk/store/listenkv"
"github.com/cosmos/cosmos-sdk/store/tracekv"
"github.com/cosmos/cosmos-sdk/store/types"
)
@ -57,6 +58,11 @@ func (s Store) CacheWrapWithTrace(w io.Writer, tc types.TraceContext) types.Cach
return cachekv.NewStore(tracekv.NewStore(s, w, tc))
}
// CacheWrapWithListeners implements the CacheWrapper interface.
func (s Store) CacheWrapWithListeners(storeKey types.StoreKey, listeners []types.WriteListener) types.CacheWrap {
return cachekv.NewStore(listenkv.NewStore(s, storeKey, listeners))
}
// Implements KVStore
func (s Store) Get(key []byte) []byte {
res := s.parent.Get(s.key(key))

View File

@ -4,6 +4,8 @@ import (
"crypto/rand"
"testing"
"github.com/cosmos/cosmos-sdk/store/cachekv"
"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db"
@ -426,3 +428,17 @@ func TestPrefixDBReverseIterator4(t *testing.T) {
checkInvalid(t, itr)
itr.Close()
}
func TestCacheWraps(t *testing.T) {
db := dbm.NewMemDB()
store := dbadapter.Store{DB: db}
cacheWrapper := store.CacheWrap()
require.IsType(t, &cachekv.Store{}, cacheWrapper)
cacheWrappedWithTrace := store.CacheWrapWithTrace(nil, nil)
require.IsType(t, &cachekv.Store{}, cacheWrappedWithTrace)
cacheWrappedWithListeners := store.CacheWrapWithListeners(nil, nil)
require.IsType(t, &cachekv.Store{}, cacheWrappedWithListeners)
}

View File

@ -22,6 +22,7 @@ import (
"github.com/cosmos/cosmos-sdk/store/cachemulti"
"github.com/cosmos/cosmos-sdk/store/dbadapter"
"github.com/cosmos/cosmos-sdk/store/iavl"
"github.com/cosmos/cosmos-sdk/store/listenkv"
"github.com/cosmos/cosmos-sdk/store/mem"
"github.com/cosmos/cosmos-sdk/store/tracekv"
"github.com/cosmos/cosmos-sdk/store/transient"
@ -58,6 +59,8 @@ type Store struct {
traceContext types.TraceContext
interBlockCache types.MultiStorePersistentCache
listeners map[types.StoreKey][]types.WriteListener
}
var (
@ -77,6 +80,7 @@ func NewStore(db dbm.DB) *Store {
stores: make(map[types.StoreKey]types.CommitKVStore),
keysByName: make(map[string]types.StoreKey),
pruneHeights: make([]int64, 0),
listeners: make(map[types.StoreKey][]types.WriteListener),
}
}
@ -312,6 +316,23 @@ func (rs *Store) TracingEnabled() bool {
return rs.traceWriter != nil
}
// AddListeners adds listeners for a specific KVStore
func (rs *Store) AddListeners(key types.StoreKey, listeners []types.WriteListener) {
if ls, ok := rs.listeners[key]; ok {
rs.listeners[key] = append(ls, listeners...)
} else {
rs.listeners[key] = listeners
}
}
// ListeningEnabled returns if listening is enabled for a specific KVStore
func (rs *Store) ListeningEnabled(key types.StoreKey) bool {
if ls, ok := rs.listeners[key]; ok {
return len(ls) != 0
}
return false
}
// LastCommitID implements Committer/CommitStore.
func (rs *Store) LastCommitID() types.CommitID {
if rs.lastCommitInfo == nil {
@ -404,6 +425,11 @@ func (rs *Store) CacheWrapWithTrace(_ io.Writer, _ types.TraceContext) types.Cac
return rs.CacheWrap()
}
// CacheWrapWithListeners implements the CacheWrapper interface.
func (rs *Store) CacheWrapWithListeners(_ types.StoreKey, _ []types.WriteListener) types.CacheWrap {
return rs.CacheWrap()
}
// CacheMultiStore creates ephemeral branch of the multi-store and returns a CacheMultiStore.
// It implements the MultiStore interface.
func (rs *Store) CacheMultiStore() types.CacheMultiStore {
@ -411,8 +437,7 @@ func (rs *Store) CacheMultiStore() types.CacheMultiStore {
for k, v := range rs.stores {
stores[k] = v
}
return cachemulti.NewStore(rs.db, stores, rs.keysByName, rs.traceWriter, rs.traceContext)
return cachemulti.NewStore(rs.db, stores, rs.keysByName, rs.traceWriter, rs.traceContext, rs.listeners)
}
// CacheMultiStoreWithVersion is analogous to CacheMultiStore except that it
@ -442,7 +467,7 @@ func (rs *Store) CacheMultiStoreWithVersion(version int64) (types.CacheMultiStor
}
}
return cachemulti.NewStore(rs.db, cachedStores, rs.keysByName, rs.traceWriter, rs.traceContext), nil
return cachemulti.NewStore(rs.db, cachedStores, rs.keysByName, rs.traceWriter, rs.traceContext, rs.listeners), nil
}
// GetStore returns a mounted Store for a given StoreKey. If the StoreKey does
@ -472,6 +497,9 @@ func (rs *Store) GetKVStore(key types.StoreKey) types.KVStore {
if rs.TracingEnabled() {
store = tracekv.NewStore(store, rs.traceWriter, rs.traceContext)
}
if rs.ListeningEnabled(key) {
store = listenkv.NewStore(store, key, rs.listeners[key])
}
return store
}

View File

@ -1,6 +1,7 @@
package rootmulti
import (
"bytes"
"crypto/sha256"
"encoding/binary"
"encoding/hex"
@ -16,9 +17,13 @@ import (
abci "github.com/tendermint/tendermint/abci/types"
dbm "github.com/tendermint/tm-db"
"github.com/cosmos/cosmos-sdk/codec"
codecTypes "github.com/cosmos/cosmos-sdk/codec/types"
snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"
"github.com/cosmos/cosmos-sdk/store/cachemulti"
"github.com/cosmos/cosmos-sdk/store/iavl"
sdkmaps "github.com/cosmos/cosmos-sdk/store/internal/maps"
"github.com/cosmos/cosmos-sdk/store/listenkv"
"github.com/cosmos/cosmos-sdk/store/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)
@ -62,6 +67,14 @@ func TestStoreMount(t *testing.T) {
require.Panics(t, func() { store.MountStoreWithDB(dup1, types.StoreTypeIAVL, db) })
}
func TestCacheMultiStore(t *testing.T) {
var db dbm.DB = dbm.NewMemDB()
ms := newMultiStoreWithMounts(db, types.PruneNothing)
cacheMulti := ms.CacheMultiStore()
require.IsType(t, cachemulti.Store{}, cacheMulti)
}
func TestCacheMultiStoreWithVersion(t *testing.T) {
var db dbm.DB = dbm.NewMemDB()
ms := newMultiStoreWithMounts(db, types.PruneNothing)
@ -672,6 +685,125 @@ func TestSetInitialVersion(t *testing.T) {
require.True(t, iavlStore.VersionExists(5))
}
func TestAddListenersAndListeningEnabled(t *testing.T) {
db := dbm.NewMemDB()
multi := newMultiStoreWithMounts(db, types.PruneNothing)
testKey := types.NewKVStoreKey("listening_test_key")
enabled := multi.ListeningEnabled(testKey)
require.False(t, enabled)
multi.AddListeners(testKey, []types.WriteListener{})
enabled = multi.ListeningEnabled(testKey)
require.False(t, enabled)
mockListener := types.NewStoreKVPairWriteListener(nil, nil)
multi.AddListeners(testKey, []types.WriteListener{mockListener})
wrongTestKey := types.NewKVStoreKey("wrong_listening_test_key")
enabled = multi.ListeningEnabled(wrongTestKey)
require.False(t, enabled)
enabled = multi.ListeningEnabled(testKey)
require.True(t, enabled)
}
var (
interfaceRegistry = codecTypes.NewInterfaceRegistry()
testMarshaller = codec.NewProtoCodec(interfaceRegistry)
testKey1 = []byte{1, 2, 3, 4, 5}
testValue1 = []byte{5, 4, 3, 2, 1}
testKey2 = []byte{2, 3, 4, 5, 6}
testValue2 = []byte{6, 5, 4, 3, 2}
)
func TestGetListenWrappedKVStore(t *testing.T) {
buf := new(bytes.Buffer)
var db dbm.DB = dbm.NewMemDB()
ms := newMultiStoreWithMounts(db, types.PruneNothing)
ms.LoadLatestVersion()
mockListeners := []types.WriteListener{types.NewStoreKVPairWriteListener(buf, testMarshaller)}
ms.AddListeners(testStoreKey1, mockListeners)
ms.AddListeners(testStoreKey2, mockListeners)
listenWrappedStore1 := ms.GetKVStore(testStoreKey1)
require.IsType(t, &listenkv.Store{}, listenWrappedStore1)
listenWrappedStore1.Set(testKey1, testValue1)
expectedOutputKVPairSet1, err := testMarshaller.MarshalBinaryLengthPrefixed(&types.StoreKVPair{
Key: testKey1,
Value: testValue1,
StoreKey: testStoreKey1.Name(),
Delete: false,
})
require.Nil(t, err)
kvPairSet1Bytes := buf.Bytes()
buf.Reset()
require.Equal(t, expectedOutputKVPairSet1, kvPairSet1Bytes)
listenWrappedStore1.Delete(testKey1)
expectedOutputKVPairDelete1, err := testMarshaller.MarshalBinaryLengthPrefixed(&types.StoreKVPair{
Key: testKey1,
Value: nil,
StoreKey: testStoreKey1.Name(),
Delete: true,
})
require.Nil(t, err)
kvPairDelete1Bytes := buf.Bytes()
buf.Reset()
require.Equal(t, expectedOutputKVPairDelete1, kvPairDelete1Bytes)
listenWrappedStore2 := ms.GetKVStore(testStoreKey2)
require.IsType(t, &listenkv.Store{}, listenWrappedStore2)
listenWrappedStore2.Set(testKey2, testValue2)
expectedOutputKVPairSet2, err := testMarshaller.MarshalBinaryLengthPrefixed(&types.StoreKVPair{
Key: testKey2,
Value: testValue2,
StoreKey: testStoreKey2.Name(),
Delete: false,
})
kvPairSet2Bytes := buf.Bytes()
buf.Reset()
require.Equal(t, expectedOutputKVPairSet2, kvPairSet2Bytes)
listenWrappedStore2.Delete(testKey2)
expectedOutputKVPairDelete2, err := testMarshaller.MarshalBinaryLengthPrefixed(&types.StoreKVPair{
Key: testKey2,
Value: nil,
StoreKey: testStoreKey2.Name(),
Delete: true,
})
kvPairDelete2Bytes := buf.Bytes()
buf.Reset()
require.Equal(t, expectedOutputKVPairDelete2, kvPairDelete2Bytes)
unwrappedStore := ms.GetKVStore(testStoreKey3)
require.IsType(t, &iavl.Store{}, unwrappedStore)
unwrappedStore.Set(testKey2, testValue2)
kvPairSet3Bytes := buf.Bytes()
buf.Reset()
require.Equal(t, []byte{}, kvPairSet3Bytes)
unwrappedStore.Delete(testKey2)
kvPairDelete3Bytes := buf.Bytes()
buf.Reset()
require.Equal(t, []byte{}, kvPairDelete3Bytes)
}
func TestCacheWraps(t *testing.T) {
db := dbm.NewMemDB()
multi := newMultiStoreWithMounts(db, types.PruneNothing)
cacheWrapper := multi.CacheWrap()
require.IsType(t, cachemulti.Store{}, cacheWrapper)
cacheWrappedWithTrace := multi.CacheWrapWithTrace(nil, nil)
require.IsType(t, cachemulti.Store{}, cacheWrappedWithTrace)
cacheWrappedWithListeners := multi.CacheWrapWithListeners(nil, nil)
require.IsType(t, cachemulti.Store{}, cacheWrappedWithListeners)
}
func BenchmarkMultistoreSnapshot100K(b *testing.B) {
benchmarkMultistoreSnapshot(b, 10, 10000)
}
@ -748,13 +880,19 @@ func benchmarkMultistoreSnapshotRestore(b *testing.B, stores uint8, storeKeys ui
//-----------------------------------------------------------------------
// utils
var (
testStoreKey1 = types.NewKVStoreKey("store1")
testStoreKey2 = types.NewKVStoreKey("store2")
testStoreKey3 = types.NewKVStoreKey("store3")
)
func newMultiStoreWithMounts(db dbm.DB, pruningOpts types.PruningOptions) *Store {
store := NewStore(db)
store.pruningOpts = pruningOpts
store.MountStoreWithDB(types.NewKVStoreKey("store1"), types.StoreTypeIAVL, nil)
store.MountStoreWithDB(types.NewKVStoreKey("store2"), types.StoreTypeIAVL, nil)
store.MountStoreWithDB(types.NewKVStoreKey("store3"), types.StoreTypeIAVL, nil)
store.MountStoreWithDB(testStoreKey1, types.StoreTypeIAVL, nil)
store.MountStoreWithDB(testStoreKey2, types.StoreTypeIAVL, nil)
store.MountStoreWithDB(testStoreKey3, types.StoreTypeIAVL, nil)
return store
}

View File

@ -164,13 +164,18 @@ func (tkv *Store) GetStoreType() types.StoreType {
// CacheWrap implements the KVStore interface. It panics because a Store
// cannot be branched.
func (tkv *Store) CacheWrap() types.CacheWrap {
panic("cannot CacheWrap a Store")
panic("cannot CacheWrap a TraceKVStore")
}
// CacheWrapWithTrace implements the KVStore interface. It panics as a
// Store cannot be branched.
func (tkv *Store) CacheWrapWithTrace(_ io.Writer, _ types.TraceContext) types.CacheWrap {
panic("cannot CacheWrapWithTrace a Store")
panic("cannot CacheWrapWithTrace a TraceKVStore")
}
// CacheWrapWithListeners implements the CacheWrapper interface.
func (tkv *Store) CacheWrapWithListeners(_ types.StoreKey, _ []types.WriteListener) types.CacheWrap {
panic("cannot CacheWrapWithListeners a TraceKVStore")
}
// writeOperation writes a KVStore operation to the underlying io.Writer as

View File

@ -286,7 +286,13 @@ func TestTraceKVStoreCacheWrap(t *testing.T) {
store := newEmptyTraceKVStore(nil)
require.Panics(t, func() { store.CacheWrap() })
}
func TestTraceKVStoreCacheWrapWithTrace(t *testing.T) {
store := newEmptyTraceKVStore(nil)
require.Panics(t, func() { store.CacheWrapWithTrace(nil, nil) })
}
func TestTraceKVStoreCacheWrapWithListeners(t *testing.T) {
store := newEmptyTraceKVStore(nil)
require.Panics(t, func() { store.CacheWrapWithListeners(nil, nil) })
}

47
store/types/listening.go Normal file
View File

@ -0,0 +1,47 @@
package types
import (
"io"
"github.com/cosmos/cosmos-sdk/codec"
)
// WriteListener interface for streaming data out from a listenkv.Store
type WriteListener interface {
// if value is nil then it was deleted
// storeKey indicates the source KVStore, to facilitate using the the same WriteListener across separate KVStores
// delete bool indicates if it was a delete; true: delete, false: set
OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error
}
// StoreKVPairWriteListener is used to configure listening to a KVStore by writing out length-prefixed
// protobuf encoded StoreKVPairs to an underlying io.Writer
type StoreKVPairWriteListener struct {
writer io.Writer
marshaller codec.BinaryMarshaler
}
// NewStoreKVPairWriteListener wraps creates a StoreKVPairWriteListener with a provdied io.Writer and codec.BinaryMarshaler
func NewStoreKVPairWriteListener(w io.Writer, m codec.BinaryMarshaler) *StoreKVPairWriteListener {
return &StoreKVPairWriteListener{
writer: w,
marshaller: m,
}
}
// OnWrite satisfies the WriteListener interface by writing length-prefixed protobuf encoded StoreKVPairs
func (wl *StoreKVPairWriteListener) OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error {
kvPair := new(StoreKVPair)
kvPair.StoreKey = storeKey.Name()
kvPair.Delete = delete
kvPair.Key = key
kvPair.Value = value
by, err := wl.marshaller.MarshalBinaryLengthPrefixed(kvPair)
if err != nil {
return err
}
if _, err := wl.writer.Write(by); err != nil {
return err
}
return nil
}

469
store/types/listening.pb.go Normal file
View File

@ -0,0 +1,469 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: cosmos/base/store/v1beta1/listening.proto
package types
import (
fmt "fmt"
proto "github.com/gogo/protobuf/proto"
io "io"
math "math"
math_bits "math/bits"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
// StoreKVPair is a KVStore KVPair used for listening to state changes (Sets and Deletes)
// It optionally includes the StoreKey for the originating KVStore and a Boolean flag to distinguish between Sets and Deletes
type StoreKVPair struct {
StoreKey string `protobuf:"bytes,1,opt,name=store_key,json=storeKey,proto3" json:"store_key,omitempty"`
Delete bool `protobuf:"varint,2,opt,name=delete,proto3" json:"delete,omitempty"`
Key []byte `protobuf:"bytes,3,opt,name=key,proto3" json:"key,omitempty"`
Value []byte `protobuf:"bytes,4,opt,name=value,proto3" json:"value,omitempty"`
}
func (m *StoreKVPair) Reset() { *m = StoreKVPair{} }
func (m *StoreKVPair) String() string { return proto.CompactTextString(m) }
func (*StoreKVPair) ProtoMessage() {}
func (*StoreKVPair) Descriptor() ([]byte, []int) {
return fileDescriptor_a5d350879fe4fecd, []int{0}
}
func (m *StoreKVPair) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *StoreKVPair) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_StoreKVPair.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *StoreKVPair) XXX_Merge(src proto.Message) {
xxx_messageInfo_StoreKVPair.Merge(m, src)
}
func (m *StoreKVPair) XXX_Size() int {
return m.Size()
}
func (m *StoreKVPair) XXX_DiscardUnknown() {
xxx_messageInfo_StoreKVPair.DiscardUnknown(m)
}
var xxx_messageInfo_StoreKVPair proto.InternalMessageInfo
func (m *StoreKVPair) GetStoreKey() string {
if m != nil {
return m.StoreKey
}
return ""
}
func (m *StoreKVPair) GetDelete() bool {
if m != nil {
return m.Delete
}
return false
}
func (m *StoreKVPair) GetKey() []byte {
if m != nil {
return m.Key
}
return nil
}
func (m *StoreKVPair) GetValue() []byte {
if m != nil {
return m.Value
}
return nil
}
func init() {
proto.RegisterType((*StoreKVPair)(nil), "cosmos.base.store.v1beta1.StoreKVPair")
}
func init() {
proto.RegisterFile("cosmos/base/store/v1beta1/listening.proto", fileDescriptor_a5d350879fe4fecd)
}
var fileDescriptor_a5d350879fe4fecd = []byte{
// 224 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xd2, 0x4c, 0xce, 0x2f, 0xce,
0xcd, 0x2f, 0xd6, 0x4f, 0x4a, 0x2c, 0x4e, 0xd5, 0x2f, 0x2e, 0xc9, 0x2f, 0x4a, 0xd5, 0x2f, 0x33,
0x4c, 0x4a, 0x2d, 0x49, 0x34, 0xd4, 0xcf, 0xc9, 0x2c, 0x2e, 0x49, 0xcd, 0xcb, 0xcc, 0x4b, 0xd7,
0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x92, 0x84, 0x28, 0xd5, 0x03, 0x29, 0xd5, 0x03, 0x2b, 0xd5,
0x83, 0x2a, 0x55, 0xca, 0xe2, 0xe2, 0x0e, 0x06, 0x09, 0x78, 0x87, 0x05, 0x24, 0x66, 0x16, 0x09,
0x49, 0x73, 0x71, 0x82, 0xe5, 0xe3, 0xb3, 0x53, 0x2b, 0x25, 0x18, 0x15, 0x18, 0x35, 0x38, 0x83,
0x38, 0xc0, 0x02, 0xde, 0xa9, 0x95, 0x42, 0x62, 0x5c, 0x6c, 0x29, 0xa9, 0x39, 0xa9, 0x25, 0xa9,
0x12, 0x4c, 0x0a, 0x8c, 0x1a, 0x1c, 0x41, 0x50, 0x9e, 0x90, 0x00, 0x17, 0x33, 0x48, 0x39, 0xb3,
0x02, 0xa3, 0x06, 0x4f, 0x10, 0x88, 0x29, 0x24, 0xc2, 0xc5, 0x5a, 0x96, 0x98, 0x53, 0x9a, 0x2a,
0xc1, 0x02, 0x16, 0x83, 0x70, 0x9c, 0x9c, 0x4e, 0x3c, 0x92, 0x63, 0xbc, 0xf0, 0x48, 0x8e, 0xf1,
0xc1, 0x23, 0x39, 0xc6, 0x09, 0x8f, 0xe5, 0x18, 0x2e, 0x3c, 0x96, 0x63, 0xb8, 0xf1, 0x58, 0x8e,
0x21, 0x4a, 0x23, 0x3d, 0xb3, 0x24, 0xa3, 0x34, 0x49, 0x2f, 0x39, 0x3f, 0x57, 0x1f, 0xea, 0x2d,
0x08, 0xa5, 0x5b, 0x9c, 0x92, 0x0d, 0xf5, 0x5c, 0x49, 0x65, 0x41, 0x6a, 0x71, 0x12, 0x1b, 0xd8,
0x47, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x2b, 0xe0, 0xb3, 0x51, 0xfe, 0x00, 0x00, 0x00,
}
func (m *StoreKVPair) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *StoreKVPair) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *StoreKVPair) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.Value) > 0 {
i -= len(m.Value)
copy(dAtA[i:], m.Value)
i = encodeVarintListening(dAtA, i, uint64(len(m.Value)))
i--
dAtA[i] = 0x22
}
if len(m.Key) > 0 {
i -= len(m.Key)
copy(dAtA[i:], m.Key)
i = encodeVarintListening(dAtA, i, uint64(len(m.Key)))
i--
dAtA[i] = 0x1a
}
if m.Delete {
i--
if m.Delete {
dAtA[i] = 1
} else {
dAtA[i] = 0
}
i--
dAtA[i] = 0x10
}
if len(m.StoreKey) > 0 {
i -= len(m.StoreKey)
copy(dAtA[i:], m.StoreKey)
i = encodeVarintListening(dAtA, i, uint64(len(m.StoreKey)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func encodeVarintListening(dAtA []byte, offset int, v uint64) int {
offset -= sovListening(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func (m *StoreKVPair) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.StoreKey)
if l > 0 {
n += 1 + l + sovListening(uint64(l))
}
if m.Delete {
n += 2
}
l = len(m.Key)
if l > 0 {
n += 1 + l + sovListening(uint64(l))
}
l = len(m.Value)
if l > 0 {
n += 1 + l + sovListening(uint64(l))
}
return n
}
func sovListening(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
func sozListening(x uint64) (n int) {
return sovListening(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *StoreKVPair) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowListening
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: StoreKVPair: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: StoreKVPair: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field StoreKey", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowListening
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthListening
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthListening
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.StoreKey = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Delete", wireType)
}
var v int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowListening
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
m.Delete = bool(v != 0)
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowListening
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthListening
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthListening
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Key = append(m.Key[:0], dAtA[iNdEx:postIndex]...)
if m.Key == nil {
m.Key = []byte{}
}
iNdEx = postIndex
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowListening
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthListening
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthListening
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Value = append(m.Value[:0], dAtA[iNdEx:postIndex]...)
if m.Value == nil {
m.Value = []byte{}
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipListening(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthListening
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipListening(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
depth := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowListening
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowListening
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
case 1:
iNdEx += 8
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowListening
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if length < 0 {
return 0, ErrInvalidLengthListening
}
iNdEx += length
case 3:
depth++
case 4:
if depth == 0 {
return 0, ErrUnexpectedEndOfGroupListening
}
depth--
case 5:
iNdEx += 4
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
if iNdEx < 0 {
return 0, ErrInvalidLengthListening
}
if depth == 0 {
return iNdEx, nil
}
}
return 0, io.ErrUnexpectedEOF
}
var (
ErrInvalidLengthListening = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowListening = fmt.Errorf("proto: integer overflow")
ErrUnexpectedEndOfGroupListening = fmt.Errorf("proto: unexpected end of group")
)

View File

@ -0,0 +1,65 @@
package types
import (
"bytes"
"testing"
"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/codec/types"
"github.com/stretchr/testify/require"
)
func TestNewStoreKVPairWriteListener(t *testing.T) {
testWriter := new(bytes.Buffer)
interfaceRegistry := types.NewInterfaceRegistry()
testMarshaller := codec.NewProtoCodec(interfaceRegistry)
wl := NewStoreKVPairWriteListener(testWriter, testMarshaller)
require.IsType(t, &StoreKVPairWriteListener{}, wl)
require.Equal(t, testWriter, wl.writer)
require.Equal(t, testMarshaller, wl.marshaller)
}
func TestOnWrite(t *testing.T) {
testWriter := new(bytes.Buffer)
interfaceRegistry := types.NewInterfaceRegistry()
testMarshaller := codec.NewProtoCodec(interfaceRegistry)
wl := NewStoreKVPairWriteListener(testWriter, testMarshaller)
testStoreKey := NewKVStoreKey("test_key")
testKey := []byte("testing123")
testValue := []byte("testing321")
// test set
err := wl.OnWrite(testStoreKey, testKey, testValue, false)
require.Nil(t, err)
outputBytes := testWriter.Bytes()
outputKVPair := new(StoreKVPair)
expectedOutputKVPair := &StoreKVPair{
Key: testKey,
Value: testValue,
StoreKey: testStoreKey.Name(),
Delete: false,
}
testMarshaller.UnmarshalBinaryLengthPrefixed(outputBytes, outputKVPair)
require.EqualValues(t, expectedOutputKVPair, outputKVPair)
testWriter.Reset()
// test delete
err = wl.OnWrite(testStoreKey, testKey, testValue, true)
require.Nil(t, err)
outputBytes = testWriter.Bytes()
outputKVPair = new(StoreKVPair)
expectedOutputKVPair = &StoreKVPair{
Key: testKey,
Value: testValue,
StoreKey: testStoreKey.Name(),
Delete: true,
}
testMarshaller.UnmarshalBinaryLengthPrefixed(outputBytes, outputKVPair)
require.EqualValues(t, expectedOutputKVPair, outputKVPair)
}

View File

@ -130,6 +130,13 @@ type MultiStore interface {
// implied that the caller should update the context when necessary between
// tracing operations. The modified MultiStore is returned.
SetTracingContext(TraceContext) MultiStore
// ListeningEnabled returns if listening is enabled for the KVStore belonging the provided StoreKey
ListeningEnabled(key StoreKey) bool
// AddListeners adds WriteListeners for the KVStore belonging to the provided StoreKey
// It appends the listeners to a current set, if one already exists
AddListeners(key StoreKey, listeners []WriteListener)
}
// From MultiStore.CacheMultiStore()....
@ -253,6 +260,9 @@ type CacheWrap interface {
// CacheWrapWithTrace recursively wraps again with tracing enabled.
CacheWrapWithTrace(w io.Writer, tc TraceContext) CacheWrap
// CacheWrapWithListeners recursively wraps again with listening enabled
CacheWrapWithListeners(storeKey StoreKey, listeners []WriteListener) CacheWrap
}
type CacheWrapper interface {
@ -261,6 +271,9 @@ type CacheWrapper interface {
// CacheWrapWithTrace branches a store with tracing enabled.
CacheWrapWithTrace(w io.Writer, tc TraceContext) CacheWrap
// CacheWrapWithListeners recursively wraps again with listening enabled
CacheWrapWithListeners(storeKey StoreKey, listeners []WriteListener) CacheWrap
}
func (cid CommitID) IsZero() bool {