This ADR defines a set of changes to enable listening to state changes of individual KVStores and exposing these data to consumers.
## Context
Currently, KVStore data can be remotely accessed through [Queries](https://github.com/cosmos/cosmos-sdk/blob/master/docs/building-modules/messages-and-queries.md#queries)
which proceed either through Tendermint and the ABCI, or through the gRPC server.
In addition to these request/response queries, it would be beneficial to have a means of listening to state changes as they occur in real time.
## Decision
We will modify the `MultiStore` interface and its concrete (`rootmulti` and `cachemulti`) implementations and introduce a new `listenkv.Store` to allow listening to state changes in underlying KVStores.
We will also introduce the tooling for writing these state changes out to files and configuring this service.
### Listening interface
In a new file, `store/types/listening.go`, we will create a `WriteListener` interface for streaming out state changes from a KVStore.
```go
// WriteListener interface for streaming data out from a listenkv.Store
type WriteListener interface {
// if value is nil then it was deleted
// storeKey indicates the source KVStore, to facilitate using the same WriteListener across separate KVStores
// set bool indicates if it was a set; true: set, false: delete
```
We will introduce a new `StreamingService` interface for exposing `WriteListener` data streams to external consumers.
```go
// Hook interface used to hook into the ABCI message processing of the BaseApp
type Hook interface {
	ListenBeginBlock(ctx sdk.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) // update the streaming service with the latest BeginBlock messages
	ListenEndBlock(ctx sdk.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock)       // update the streaming service with the latest EndBlock messages
	ListenDeliverTx(ctx sdk.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx)    // update the streaming service with the latest DeliverTx messages
}
// StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks
filePrefix string // optional prefix for each of the generated files
writeDir string // directory to write files into
dstFile *os.File // the current write output file
marshaller codec.BinaryMarshaler // marshaller used for re-marshalling the ABCI messages to write them out to the destination files
stateCache [][]byte // cache the protobuf binary encoded StoreKVPairs in the order they are received
}
```
This streaming service uses a single instance of a simple intermediate `io.Writer` as the underlying `io.Writer` for its single `StoreKVPairWriteListener`.
It collects KV pairs from every KVStore synchronously from the same channel, caches them in the order they are received, and then writes
them out to a file generated in response to an ABCI message hook. Files are named as outlined above, with optional prefixes to avoid potential naming collisions
across separate instances.
```go
// intermediateWriter is used so that we do not need to update the underlying io.Writer inside the StoreKVPairWriteListener
// every time we begin writing to a new file
type intermediateWriter struct {
	outChan chan<- []byte
}
// NewIntermediateWriter creates an instance of an intermediateWriter that sends to the provided channel
```
Writing to a file is the simplest approach for streaming the data out to consumers.
This approach is also persistent and durable: the files can be read directly,
or an auxiliary streaming service can read from the files and serve the data over a remote interface.
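The channel-backed writer and ordered cache described in this section can be sketched in isolation. This is a minimal illustration, not the ADR's implementation: `chanWriter`, `collector`, and `collectAndFlush` are hypothetical names, the payloads are plain strings rather than protobuf-encoded `StoreKVPair`s, and a `bytes.Buffer` stands in for the destination file.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// chanWriter is a hypothetical stand-in for the intermediate writer:
// an io.Writer that forwards each write to a channel instead of a file.
type chanWriter struct {
	out chan<- []byte
}

// Write copies the payload (the caller may reuse its buffer) and sends it on.
func (w *chanWriter) Write(b []byte) (int, error) {
	cp := make([]byte, len(b))
	copy(cp, b)
	w.out <- cp
	return len(b), nil
}

// collector caches payloads in arrival order until they are flushed.
type collector struct {
	cache [][]byte
}

// drain moves everything currently buffered in the channel into the cache.
func (c *collector) drain(in <-chan []byte) {
	for {
		select {
		case b := <-in:
			c.cache = append(c.cache, b)
		default:
			return
		}
	}
}

// flush writes the cached payloads to dst in order and resets the cache.
func (c *collector) flush(dst io.Writer) error {
	for _, b := range c.cache {
		if _, err := dst.Write(b); err != nil {
			return err
		}
	}
	c.cache = nil
	return nil
}

// collectAndFlush pushes payloads through the writer, caches them,
// and flushes them to an in-memory destination.
func collectAndFlush(payloads ...string) (string, error) {
	ch := make(chan []byte, len(payloads)) // buffered so writes never block here
	w := &chanWriter{out: ch}
	for _, p := range payloads {
		if _, err := w.Write([]byte(p)); err != nil {
			return "", err
		}
	}
	c := &collector{}
	c.drain(ch)
	var buf bytes.Buffer
	if err := c.flush(&buf); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	out, err := collectAndFlush("a=1;", "b=2;", "a=<deleted>;")
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

Because the listener only ever sees the channel-backed writer, the destination can change on every ABCI hook without touching the listener itself.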
#### Auxiliary streaming service
We will create a separate standalone process that reads and internally queues the state as it is written out to these files
and serves the data over a gRPC API. This API will allow filtering of requested data, e.g. by block number, block/tx hash, ABCI message type,
whether a DeliverTx message failed or succeeded, etc. In addition to unary RPC endpoints, this service will expose `stream` RPC endpoints for real-time subscriptions.
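The filtering described above could look roughly like the following. This is a sketch under stated assumptions: the `Record` and `Filter` types and their fields are hypothetical and do not correspond to any defined gRPC message, and only a few of the listed criteria (block number, ABCI message type, DeliverTx success) are modelled.

```go
package main

import "fmt"

// Record is a hypothetical unit of streamed state, tagged with the
// ABCI message that produced it.
type Record struct {
	BlockHeight int64
	MsgType     string // "BeginBlock", "DeliverTx", or "EndBlock"
	TxFailed    bool   // only meaningful when MsgType is "DeliverTx"
}

// Filter describes what a subscriber wants; zero values mean "any".
type Filter struct {
	MinHeight         int64
	MaxHeight         int64
	MsgType           string
	OnlySuccessfulTxs bool
}

// Match reports whether r satisfies every constraint set on f.
func (f Filter) Match(r Record) bool {
	if f.MinHeight != 0 && r.BlockHeight < f.MinHeight {
		return false
	}
	if f.MaxHeight != 0 && r.BlockHeight > f.MaxHeight {
		return false
	}
	if f.MsgType != "" && r.MsgType != f.MsgType {
		return false
	}
	if f.OnlySuccessfulTxs && r.MsgType == "DeliverTx" && r.TxFailed {
		return false
	}
	return true
}

// Select returns the records that match f, preserving their order.
func Select(f Filter, recs []Record) []Record {
	var out []Record
	for _, r := range recs {
		if f.Match(r) {
			out = append(out, r)
		}
	}
	return out
}

func main() {
	recs := []Record{
		{BlockHeight: 1, MsgType: "BeginBlock"},
		{BlockHeight: 1, MsgType: "DeliverTx", TxFailed: true},
		{BlockHeight: 2, MsgType: "DeliverTx"},
	}
	f := Filter{MsgType: "DeliverTx", OnlySuccessfulTxs: true}
	fmt.Println(len(Select(f, recs)))
}
```

The same predicate could back both the unary and the `stream` endpoints, applied to queued records in the first case and to incoming records in the second.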
#### File pruning
Without pruning, the number of files can grow indefinitely. This may need to be managed by
the developer in an application-specific or even module-specific manner (e.g. log rotation).
The file naming schema facilitates pruning by block number and/or ABCI message.
The gRPC auxiliary streaming service introduced above will include an option to remove the files as it consumes their data.
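As a rough illustration of pruning by block number, the sketch below removes files whose block height falls below a threshold. The file-name shape `[prefix-]block-<N>-<suffix>` is an assumption made for this example, and `blockHeight` and `pruneBefore` are hypothetical helpers, not part of the proposed tooling.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strconv"
	"strings"
)

// blockHeight extracts N from a name shaped like "[prefix-]block-N-suffix".
// The naming shape is an assumption made for this sketch.
func blockHeight(name string) (int64, bool) {
	idx := strings.LastIndex(name, "block-")
	if idx < 0 {
		return 0, false
	}
	rest := name[idx+len("block-"):]
	if end := strings.IndexByte(rest, '-'); end >= 0 {
		rest = rest[:end]
	}
	n, err := strconv.ParseInt(rest, 10, 64)
	if err != nil {
		return 0, false
	}
	return n, true
}

// pruneBefore removes files in dir whose block height is below keepFrom
// and returns the names it removed. Unrecognised files are left alone.
func pruneBefore(dir string, keepFrom int64) ([]string, error) {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return nil, err
	}
	var removed []string
	for _, e := range entries {
		if e.IsDir() {
			continue
		}
		h, ok := blockHeight(e.Name())
		if !ok || h >= keepFrom {
			continue
		}
		if err := os.Remove(filepath.Join(dir, e.Name())); err != nil {
			return removed, err
		}
		removed = append(removed, e.Name())
	}
	return removed, nil
}

func main() {
	dir, err := os.MkdirTemp("", "prune-demo")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	for _, n := range []string{"block-1-begin", "block-1-end", "block-2-begin", "notes.txt"} {
		if err := os.WriteFile(filepath.Join(dir, n), nil, 0o600); err != nil {
			panic(err)
		}
	}
	removed, err := pruneBefore(dir, 2)
	if err != nil {
		panic(err)
	}
	fmt.Println(removed)
}
```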
### Configuration
We will provide detailed documentation on how to configure a `FileStreamingService` from within an app's `AppCreator`,
using the provided `AppOptions` and TOML configuration fields.
#### BaseApp registration
We will add a new method to the `BaseApp` to enable the registration of `StreamingService`s:
```go
// RegisterStreamingService is used to register a streaming service with the BaseApp
// register the streaming service hooks within the BaseApp
// BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context using these hooks
app.hooks = append(app.hooks, s)
}
```
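To show how registered hooks might be invoked, here is a stripped-down sketch that uses no SDK types. The `hook` interface, the `app` struct, and the `recorder` type are hypothetical simplifications: the real hook methods take an `sdk.Context` plus the ABCI request and response, which are reduced to a block height here.

```go
package main

import "fmt"

// hook is a simplified stand-in for the Hook interface: only the
// BeginBlock listener is modelled, and it receives just a height.
type hook interface {
	ListenBeginBlock(height int64)
}

// app is a toy application holding the registered hooks.
type app struct {
	hooks []hook
}

// RegisterStreamingService appends a hook, mirroring the registration step.
func (a *app) RegisterStreamingService(h hook) {
	a.hooks = append(a.hooks, h)
}

// BeginBlock would do its normal processing and then notify every hook
// in registration order.
func (a *app) BeginBlock(height int64) {
	// ... regular BeginBlock processing would happen here ...
	for _, h := range a.hooks {
		h.ListenBeginBlock(height)
	}
}

// recorder is a hook that remembers the heights it was notified about.
type recorder struct {
	heights []int64
}

func (r *recorder) ListenBeginBlock(height int64) {
	r.heights = append(r.heights, height)
}

func main() {
	a := &app{}
	r := &recorder{}
	a.RegisterStreamingService(r)
	a.BeginBlock(7)
	fmt.Println(r.heights)
}
```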
We will also modify the `BeginBlock`, `EndBlock`, and `DeliverTx` methods to pass ABCI requests and responses to any registered streaming service hooks.
```go
// register the streaming service with the BaseApp
bApp.RegisterStreamingService(streamingService)
// waitgroup and quit channel for optional shutdown coordination of the streaming service
wg := new(sync.WaitGroup)
quitChan := make(chan struct{})
// kick off the background streaming service loop
streamingService.Stream(wg, quitChan) // maybe this should be done from inside BaseApp instead?
}
...
return app
}
```
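The wait group and quit channel used above can coordinate shutdown of a background loop along the following lines. This is a minimal sketch: `stream` and `runAndStop` are hypothetical functions, and the loop consumes raw byte slices rather than the service's actual data.

```go
package main

import (
	"fmt"
	"sync"
)

// stream starts a background loop that handles payloads from in until
// quit is closed. The wait group lets the caller block until it exits.
func stream(wg *sync.WaitGroup, quit <-chan struct{}, in <-chan []byte, handle func([]byte)) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-quit:
				return
			case b := <-in:
				handle(b)
			}
		}
	}()
}

// runAndStop sends n payloads, signals shutdown, waits for the loop to
// exit, and returns how many payloads were handled.
func runAndStop(n int) int {
	var wg sync.WaitGroup
	quit := make(chan struct{})
	in := make(chan []byte) // unbuffered: a send returns only once the loop has received it
	handled := 0
	stream(&wg, quit, in, func([]byte) { handled++ })
	for i := 0; i < n; i++ {
		in <- []byte{byte(i)}
	}
	close(quit) // signal the loop to stop
	wg.Wait()   // block until it has actually stopped
	return handled
}

func main() {
	fmt.Println(runAndStop(3))
}
```

Closing the quit channel rather than sending on it means any number of background loops can observe the same shutdown signal.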
## Consequences
These changes will provide a means of subscribing to KVStore state changes in real time.
### Backwards Compatibility
- This ADR changes the `MultiStore`, `CacheWrap`, and `CacheWrapper` interfaces. Implementations that support the previous versions of these interfaces will not satisfy the new ones.
### Positive
- Ability to listen to KVStore state changes in real time and expose these events to external consumers
### Negative
- Changes `MultiStore`, `CacheWrap`, and `CacheWrapper` interfaces
### Neutral
- Introduces additional, but optional, complexity to configuring and running a Cosmos application
- If an application developer opts to use these features to expose data, they need to be aware of the ramifications/risks of that data exposure as it pertains to the specifics of their application