20 KiB
ADR 038: KVStore state listening
Changelog
- 11/23/2020: Initial draft
Status
Proposed
Abstract
This ADR defines a set of changes to enable listening to state changes of individual KVStores and exposing these data to consumers.
Context
Currently, KVStore data can be remotely accessed through Queries which proceed either through Tendermint and the ABCI, or through the gRPC server. In addition to these request/response queries, it would be beneficial to have a means of listening to state changes as they occur in real time.
Decision
We will modify the MultiStore
interface and its concrete (rootmulti
and cachemulti
) implementations and introduce a new listenkv.Store
to allow listening to state changes in underlying KVStores.
We will introduce a plugin system for configuring and running streaming services that write these state changes and their surrounding ABCI message context to different destinations.
Listening interface
In a new file, store/types/listening.go
, we will create a WriteListener
interface for streaming out state changes from a KVStore.
// WriteListener interface for streaming data out from a listenkv.Store
type WriteListener interface {
// if value is nil then it was deleted
// storeKey indicates the source KVStore, to facilitate using the same WriteListener across separate KVStores
// delete bool indicates if it was a delete; true: delete, false: set
OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error
}
Listener type
We will create a concrete implementation of the WriteListener
interface in store/types/listening.go
, that writes out protobuf
encoded KV pairs to an underlying io.Writer
.
This will include defining a simple protobuf type for the KV pairs. In addition to the key and value fields this message will include the StoreKey for the originating KVStore so that we can write out from separate KVStores to the same stream/file and determine the source of each KV pair.
message StoreKVPair {
optional string store_key = 1; // the store key for the KVStore this pair originates from
required bool set = 2; // true indicates a set operation, false indicates a delete operation
required bytes key = 3;
required bytes value = 4;
}
// StoreKVPairWriteListener is used to configure listening to a KVStore by writing out length-prefixed
// protobuf encoded StoreKVPairs to an underlying io.Writer
type StoreKVPairWriteListener struct {
writer io.Writer
marshaller codec.BinaryCodec
}
// NewStoreKVPairWriteListener wraps creates a StoreKVPairWriteListener with a provdied io.Writer and codec.BinaryCodec
func NewStoreKVPairWriteListener(w io.Writer, m codec.BinaryCodec) *StoreKVPairWriteListener {
return &StoreKVPairWriteListener{
writer: w,
marshaller: m,
}
}
// OnWrite satisfies the WriteListener interface by writing length-prefixed protobuf encoded StoreKVPairs
func (wl *StoreKVPairWriteListener) OnWrite(storeKey types.StoreKey, key []byte, value []byte, delete bool) error error {
kvPair := new(types.StoreKVPair)
kvPair.StoreKey = storeKey.Name()
kvPair.Delete = Delete
kvPair.Key = key
kvPair.Value = value
by, err := wl.marshaller.MarshalBinaryLengthPrefixed(kvPair)
if err != nil {
return err
}
if _, err := wl.writer.Write(by); err != nil {
return err
}
return nil
}
ListenKVStore
We will create a new Store
type listenkv.Store
that the MultiStore
wraps around a KVStore
to enable state listening.
We can configure the Store
with a set of WriteListener
s which stream the output to specific destinations.
// Store implements the KVStore interface with listening enabled.
// Operations are traced on each core KVStore call and written to any of the
// underlying listeners with the proper key and operation permissions
type Store struct {
parent types.KVStore
listeners []types.WriteListener
parentStoreKey types.StoreKey
}
// NewStore returns a reference to a new traceKVStore given a parent
// KVStore implementation and a buffered writer.
func NewStore(parent types.KVStore, psk types.StoreKey, listeners []types.WriteListener) *Store {
return &Store{parent: parent, listeners: listeners, parentStoreKey: psk}
}
// Set implements the KVStore interface. It traces a write operation and
// delegates the Set call to the parent KVStore.
func (s *Store) Set(key []byte, value []byte) {
types.AssertValidKey(key)
s.parent.Set(key, value)
s.onWrite(false, key, value)
}
// Delete implements the KVStore interface. It traces a write operation and
// delegates the Delete call to the parent KVStore.
func (s *Store) Delete(key []byte) {
s.parent.Delete(key)
s.onWrite(true, key, nil)
}
// onWrite writes a KVStore operation to all the WriteListeners
func (s *Store) onWrite(delete bool, key, value []byte) {
for _, l := range s.listeners {
if err := l.OnWrite(s.parentStoreKey, key, value, delete); err != nil {
// log error
}
}
}
MultiStore interface updates
We will update the MultiStore
interface to allow us to wrap a set of listeners around a specific KVStore
.
Additionally, we will update the CacheWrap
and CacheWrapper
interfaces to enable listening in the caching layer.
type MultiStore interface {
...
// ListeningEnabled returns if listening is enabled for the KVStore belonging the provided StoreKey
ListeningEnabled(key StoreKey) bool
// AddListeners adds WriteListeners for the KVStore belonging to the provided StoreKey
// It appends the listeners to a current set, if one already exists
AddListeners(key StoreKey, listeners []WriteListener)
}
type CacheWrap interface {
...
// CacheWrapWithListeners recursively wraps again with listening enabled
CacheWrapWithListeners(storeKey types.StoreKey, listeners []WriteListener) CacheWrap
}
type CacheWrapper interface {
...
// CacheWrapWithListeners recursively wraps again with listening enabled
CacheWrapWithListeners(storeKey types.StoreKey, listeners []WriteListener) CacheWrap
}
MultiStore implementation updates
We will modify all of the Store
and MultiStore
implementations to satisfy these new interfaces, and adjust the rootmulti
GetKVStore
method
to wrap the returned KVStore
with a listenkv.Store
if listening is turned on for that Store
.
func (rs *Store) GetKVStore(key types.StoreKey) types.KVStore {
store := rs.stores[key].(types.KVStore)
if rs.TracingEnabled() {
store = tracekv.NewStore(store, rs.traceWriter, rs.traceContext)
}
if rs.ListeningEnabled(key) {
store = listenkv.NewStore(key, store, rs.listeners[key])
}
return store
}
We will also adjust the cachemulti
constructor methods and the rootmulti
CacheMultiStore
method to forward the listeners
to and enable listening in the cache layer.
func (rs *Store) CacheMultiStore() types.CacheMultiStore {
stores := make(map[types.StoreKey]types.CacheWrapper)
for k, v := range rs.stores {
stores[k] = v
}
return cachemulti.NewStore(rs.db, stores, rs.keysByName, rs.traceWriter, rs.traceContext, rs.listeners)
}
Exposing the data
Streaming service
We will introduce a new StreamingService
interface for exposing WriteListener
data streams to external consumers.
In addition to streaming state changes as StoreKVPair
s, the interface satisfies an ABCIListener
interface that plugs
into the BaseApp and relays ABCI requests and responses so that the service can group the state changes with the ABCI
requests that affected them and the ABCI responses they affected. The ABCIListener
interface also exposes a
ListenSuccess
method which is (optionally) used by the BaseApp
to await positive acknowledgement of message
receipt from the StreamingService
.
// ABCIListener interface used to hook into the ABCI message processing of the BaseApp
type ABCIListener interface {
// ListenBeginBlock updates the streaming service with the latest BeginBlock messages
ListenBeginBlock(ctx types.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error
// ListenEndBlock updates the steaming service with the latest EndBlock messages
ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error
// ListenDeliverTx updates the steaming service with the latest DeliverTx messages
ListenDeliverTx(ctx types.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error
// ListenSuccess returns a chan that is used to acknowledge successful receipt of messages by the external service
// after some configurable delay, `false` is sent to this channel from the service to signify failure of receipt
ListenSuccess() <-chan bool
}
// StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks
type StreamingService interface {
// Stream is the streaming service loop, awaits kv pairs and writes them to a destination stream or file
Stream(wg *sync.WaitGroup) error
// Listeners returns the streaming service's listeners for the BaseApp to register
Listeners() map[types.StoreKey][]store.WriteListener
// ABCIListener interface for hooking into the ABCI messages from inside the BaseApp
ABCIListener
// Closer interface
io.Closer
}
BaseApp registration
We will add a new method to the BaseApp
to enable the registration of StreamingService
s:
// SetStreamingService is used to set a streaming service into the BaseApp hooks and load the listeners into the multistore
func (app *BaseApp) SetStreamingService(s StreamingService) {
// add the listeners for each StoreKey
for key, lis := range s.Listeners() {
app.cms.AddListeners(key, lis)
}
// register the StreamingService within the BaseApp
// BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context
app.abciListeners = append(app.abciListeners, s)
}
We will add a new method to the BaseApp
that is used to configure a global wait limit for receiving positive acknowledgement
of message receipt from the integrated StreamingService
s.
func (app *BaseApp) SetGlobalWaitLimit(t time.Duration) {
app.globalWaitLimit = t
}
We will also modify the BeginBlock
, EndBlock
, and DeliverTx
methods to pass ABCI requests and responses to any streaming service hooks registered
with the BaseApp
.
func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeginBlock) {
...
// Call the streaming service hooks with the BeginBlock messages
for _, listener := range app.abciListeners {
listener.ListenBeginBlock(app.deliverState.ctx, req, res)
}
return res
}
func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBlock) {
...
// Call the streaming service hooks with the EndBlock messages
for _, listener := range app.abciListeners {
listener.ListenEndBlock(app.deliverState.ctx, req, res)
}
return res
}
func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx {
...
gInfo, result, err := app.runTx(runTxModeDeliver, req.Tx)
if err != nil {
resultStr = "failed"
res := sdkerrors.ResponseDeliverTx(err, gInfo.GasWanted, gInfo.GasUsed, app.trace)
// If we throw an error, be sure to still call the streaming service's hook
for _, listener := range app.abciListeners {
listener.ListenDeliverTx(app.deliverState.ctx, req, res)
}
return res
}
res := abci.ResponseDeliverTx{
GasWanted: int64(gInfo.GasWanted), // TODO: Should type accept unsigned ints?
GasUsed: int64(gInfo.GasUsed), // TODO: Should type accept unsigned ints?
Log: result.Log,
Data: result.Data,
Events: sdk.MarkEventsToIndex(result.Events, app.indexEvents),
}
// Call the streaming service hooks with the DeliverTx messages
for _, listener := range app.abciListeners {
listener.ListenDeliverTx(app.deliverState.ctx, req, res)
}
return res
}
We will also modify the Commit
method to process success/failure
signals from the integrated StreamingService
s using
the ABCIListener.ListenSuccess()
method. Each StreamingService
has an internal wait threshold after which it sends
false
to the ListenSuccess()
channel, and the BaseApp also imposes a configurable global wait limit.
If the StreamingService
is operating in a "fire-and-forget" mode, ListenSuccess()
should immediately return true
off the channel despite the success status of the service.
func (app *BaseApp) Commit() (res abci.ResponseCommit) {
...
var halt bool
switch {
case app.haltHeight > 0 && uint64(header.Height) >= app.haltHeight:
halt = true
case app.haltTime > 0 && header.Time.Unix() >= int64(app.haltTime):
halt = true
}
// each listener has an internal wait threshold after which it sends `false` to the ListenSuccess() channel
// but the BaseApp also imposes a global wait limit
maxWait := time.NewTicker(app.globalWaitLimit)
for _, lis := range app.abciListeners {
select {
case success := <- lis.ListenSuccess():
if success == false {
halt = true
break
}
case <- maxWait.C:
halt = true
break
}
}
if halt {
// Halt the binary and allow Tendermint to receive the ResponseCommit
// response with the commit ID hash. This will allow the node to successfully
// restart and process blocks assuming the halt configuration has been
// reset or moved to a more distant value.
app.halt()
}
...
}
Plugin system
We propose a plugin architecture to load and run StreamingService
implementations. We will introduce a plugin
loading/preloading system that is used to load, initialize, inject, run, and stop Cosmos-SDK plugins. Each plugin
must implement the following interface:
// Plugin is the base interface for all kinds of cosmos-sdk plugins
// It will be included in interfaces of different Plugins
type Plugin interface {
// Name should return unique name of the plugin
Name() string
// Version returns current version of the plugin
Version() string
// Init is called once when the Plugin is being loaded
// The plugin is passed the AppOptions for configuration
// A plugin will not necessarily have a functional Init
Init(env serverTypes.AppOptions) error
// Closer interface for shutting down the plugin process
io.Closer
}
The Name
method returns a plugin's name.
The Version
method returns a plugin's version.
The Init
method initializes a plugin with the provided AppOptions
.
The io.Closer is used to shut down the plugin service.
For the purposes of this ADR we introduce a single kind of plugin- a state streaming plugin.
We will define a StateStreamingPlugin
interface which extends the above Plugin
interface to support a state streaming service.
// StateStreamingPlugin interface for plugins that load a baseapp.StreamingService onto a baseapp.BaseApp
type StateStreamingPlugin interface {
// Register configures and registers the plugin streaming service with the BaseApp
Register(bApp *baseapp.BaseApp, marshaller codec.BinaryCodec, keys map[string]*types.KVStoreKey) error
// Start starts the background streaming process of the plugin streaming service
Start(wg *sync.WaitGroup)
// Plugin is the base Plugin interface
Plugin
}
The Register
method is used during App construction to register the plugin's streaming service with an App's BaseApp using the BaseApp's SetStreamingService
method.
The Start
method is used during App construction to start the registered plugin streaming services and maintain synchronization with them.
e.g. in NewSimApp
:
func NewSimApp(
logger log.Logger, db dbm.DB, traceStore io.Writer, loadLatest bool, encodingConfig simappparams.EncodingConfig, appOpts servertypes.AppOptions, baseAppOptions ...func(*baseapp.BaseApp),
) *SimApp {
...
// this loads the preloaded and any plugins found in `plugins.dir`
pluginLoader, err := loader.NewPluginLoader(appOpts, logger)
if err != nil {
// handle error
}
// initialize the loaded plugins
if err := pluginLoader.Initialize(); err != nil {
// hanlde error
}
keys := sdk.NewKVStoreKeys(
authtypes.StoreKey, banktypes.StoreKey, stakingtypes.StoreKey,
minttypes.StoreKey, distrtypes.StoreKey, slashingtypes.StoreKey,
govtypes.StoreKey, paramstypes.StoreKey, ibchost.StoreKey, upgradetypes.StoreKey,
evidencetypes.StoreKey, ibctransfertypes.StoreKey, capabilitytypes.StoreKey,
)
// register the plugin(s) with the BaseApp
if err := pluginLoader.Inject(bApp, appCodec, keys); err != nil {
// handle error
}
// start the plugin services, optionally use wg to synchronize shutdown using io.Closer
wg := new(sync.WaitGroup)
if err := pluginLoader.Start(wg); err != nil {
// handler error
}
...
return app
}
Configuration
The plugin system will be configured within an app's app.toml file.
[plugins]
on = false # turn the plugin system, as a whole, on or off
disabled = ["list", "of", "plugin", "names", "to", "disable"]
dir = "the directory to load non-preloaded plugins from; defaults to cosmos-sdk/plugin/plugins"
There will be three parameters for configuring the plugin system: plugins.on
, plugins.disabled
and plugins.dir
.
plugins.on
is a bool that turns on or off the plugin system at large, plugins.dir
directs the system to a directory
to load plugins from, and plugins.disabled
is a list of names for the plugins we want to disable (useful for disabling preloaded plugins).
Configuration of a given plugin is ultimately specific to the plugin, but we will introduce some standards here:
Plugin TOML configuration should be split into separate sub-tables for each kind of plugin (e.g. plugins.streaming
).
Within these sub-tables, the parameters for a specific plugin of that kind are included in another sub-table (e.g. plugins.streaming.file
).
It is generally expected, but not required, that a streaming service plugin can be configured with a set of store keys
(e.g. plugins.streaming.file.keys
) for the stores it listens to and a mode (e.g. plugins.streaming.file.mode
)
that signifies whether the service operates in a fire-and-forget capacity (faf
) or the BaseApp should require positive
acknowledgement of message receipt by the service (ack
).
e.g.
[plugins]
on = false # turn the plugin system, as a whole, on or off
disabled = ["list", "of", "plugin", "names", "to", "disable"]
dir = "the directory to load non-preloaded plugins from; defaults to "
[plugins.streaming] # a mapping of plugin-specific streaming service parameters, mapped to their plugin name
[plugins.streaming.file] # the specific parameters for the file streaming service plugin
keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"]
writeDir = "path to the write directory"
prefix = "optional prefix to prepend to the generated file names"
mode = "faf" # faf == fire-and-forget; ack == require positive acknowledge of receipt
[plugins.streaming.kafka]
...
[plugins.modules]
...
Encoding and decoding streams
ADR-038 introduces the interfaces and types for streaming state changes out from KVStores, associating this data with their related ABCI requests and responses, and registering a service for consuming this data and streaming it to some destination in a final format. Instead of prescribing a final data format in this ADR, it is left to a specific plugin implementation to define and document this format. We take this approach because flexibility in the final format is necessary to support a wide range of streaming service plugins. For example, the data format for a streaming service that writes the data out to a set of files will differ from the data format that is written to a Kafka topic.
Consequences
These changes will provide a means of subscribing to KVStore state changes in real time.
Backwards Compatibility
- This ADR changes the
MultiStore
,CacheWrap
, andCacheWrapper
interfaces, implementations supporting the previous version of these interfaces will not support the new ones
Positive
- Ability to listen to KVStore state changes in real time and expose these events to external consumers
Negative
- Changes
MultiStore
,CacheWrap
, andCacheWrapper
interfaces
Neutral
- Introduces additional- but optional- complexity to configuring and running a cosmos application
- If an application developer opts to use these features to expose data, they need to be aware of the ramifications/risks of that data exposure as it pertains to the specifics of their application