diff --git a/docs/architecture/adr-038-state-listening.md b/docs/architecture/adr-038-state-listening.md index f774ffecf..7e5e49d4f 100644 --- a/docs/architecture/adr-038-state-listening.md +++ b/docs/architecture/adr-038-state-listening.md @@ -21,7 +21,7 @@ In addition to these request/response queries, it would be beneficial to have a ## Decision We will modify the `MultiStore` interface and its concrete (`rootmulti` and `cachemulti`) implementations and introduce a new `listenkv.Store` to allow listening to state changes in underlying KVStores. -We will also introduce the tooling for writing these state changes out to files and configuring this service. +We will introduce a plugin system for configuring and running streaming services that write these state changes and their surrounding ABCI message context to different destinations. ### Listening interface @@ -31,9 +31,9 @@ In a new file, `store/types/listening.go`, we will create a `WriteListener` inte // WriteListener interface for streaming data out from a listenkv.Store type WriteListener interface { // if value is nil then it was deleted - // storeKey indicates the source KVStore, to facilitate using the the same WriteListener across separate KVStores + // storeKey indicates the source KVStore, to facilitate using the same WriteListener across separate KVStores // delete bool indicates if it was a delete; true: delete, false: set - OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error + OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error } ``` @@ -125,7 +125,7 @@ func (s *Store) Delete(key []byte) { s.onWrite(true, key, nil) } -// onWrite writes a KVStore operation to all of the WriteListeners +// onWrite writes a KVStore operation to all the WriteListeners func (s *Store) onWrite(delete bool, key, value []byte) { for _, l := range s.listeners { if err := l.OnWrite(s.parentStoreKey, key, value, delete); err != nil { @@ -204,9 +204,14 @@ func (rs *Store) CacheMultiStore() types.CacheMultiStore { ### Exposing the data +#### Streaming service + We will introduce a new `StreamingService` interface for exposing `WriteListener` data streams to external consumers. -In addition to streaming state changes as `StoreKVPair`s, the interface satisfies an `ABCIListener` interface that plugs into the BaseApp -and relays ABCI requests and responses so that the service can group the state changes with the ABCI requests that affected them and the ABCI responses they affected. +In addition to streaming state changes as `StoreKVPair`s, the interface satisfies an `ABCIListener` interface that plugs +into the BaseApp and relays ABCI requests and responses so that the service can group the state changes with the ABCI +requests that affected them and the ABCI responses they affected. The `ABCIListener` interface also exposes a +`ListenSuccess` method which is (optionally) used by the `BaseApp` to await positive acknowledgement of message +receipt from the `StreamingService`. ```go // ABCIListener interface used to hook into the ABCI message processing of the BaseApp @@ -217,11 +222,14 @@ type ABCIListener interface { ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error // ListenDeliverTx updates the steaming service with the latest DeliverTx messages ListenDeliverTx(ctx types.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error + // ListenSuccess returns a chan that is used to acknowledge successful receipt of messages by the external service + // after some configurable delay, `false` is sent to this channel from the service to signify failure of receipt + ListenSuccess() <-chan bool } // StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks type StreamingService interface { - // Stream is the streaming service loop, awaits kv pairs and writes them to some destination stream or file + // Stream is the streaming service loop, awaits kv pairs and writes them to a destination stream or file Stream(wg *sync.WaitGroup) error // Listeners returns the streaming service's listeners for the BaseApp to register Listeners() map[types.StoreKey][]store.WriteListener @@ -232,200 +240,29 @@ type StreamingService interface { } ``` -#### Writing state changes to files - -We will introduce an implementation of `StreamingService` which writes state changes out to files as length-prefixed protobuf encoded `StoreKVPair`s. -This service uses the same `StoreKVPairWriteListener` for every KVStore, writing all the KV pairs from every KVStore -out to the same files, relying on the `StoreKey` field in the `StoreKVPair` protobuf message to later distinguish the source for each pair. - -Writing to a file is the simplest approach for streaming the data out to consumers. -This approach also provides the advantages of being persistent and durable, and the files can be read directly, -or an auxiliary streaming services can read from the files and serve the data over a remote interface. - -##### Encoding - -For each pair of `BeginBlock` requests and responses, a file is created and named `block-{N}-begin`, where N is the block number. -At the head of this file the length-prefixed protobuf encoded `BeginBlock` request is written. -At the tail of this file the length-prefixed protobuf encoded `BeginBlock` response is written. -In between these two encoded messages, the state changes that occurred due to the `BeginBlock` request are written chronologically as -a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores the service -is configured to listen to. - -For each pair of `DeliverTx` requests and responses, a file is created and named `block-{N}-tx-{M}` where N is the block number and M -is the tx number in the block (i.e. 0, 1, 2...). -At the head of this file the length-prefixed protobuf encoded `DeliverTx` request is written. -At the tail of this file the length-prefixed protobuf encoded `DeliverTx` response is written. -In between these two encoded messages, the state changes that occurred due to the `DeliverTx` request are written chronologically as -a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores the service -is configured to listen to. - -For each pair of `EndBlock` requests and responses, a file is created and named `block-{N}-end`, where N is the block number. -At the head of this file the length-prefixed protobuf encoded `EndBlock` request is written. -At the tail of this file the length-prefixed protobuf encoded `EndBlock` response is written. -In between these two encoded messages, the state changes that occurred due to the `EndBlock` request are written chronologically as -a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores the service -is configured to listen to. - -##### Decoding - -To decode the files written in the above format we read all the bytes from a given file into memory and segment them into proto -messages based on the length-prefixing of each message. Once segmented, it is known that the first message is the ABCI request, -the last message is the ABCI response, and that every message in between is a `StoreKVPair`. This enables us to decode each segment into -the appropriate message type. - -The type of ABCI req/res, the block height, and the transaction index (where relevant) is known -from the file name, and the KVStore each `StoreKVPair` originates from is known since the `StoreKey` is included as a field in the proto message. - -##### Implementation example - -```go -// FileStreamingService is a concrete implementation of StreamingService that writes state changes out to a file -type FileStreamingService struct { - listeners map[sdk.StoreKey][]storeTypes.WriteListener // the listeners that will be initialized with BaseApp - srcChan <-chan []byte // the channel that all of the WriteListeners write their data out to - filePrefix string // optional prefix for each of the generated files - writeDir string // directory to write files into - dstFile *os.File // the current write output file - marshaller codec.BinaryCodec // marshaller used for re-marshalling the ABCI messages to write them out to the destination files - stateCache [][]byte // cache the protobuf binary encoded StoreKVPairs in the order they are received -} -``` - -This streaming service uses a single instance of a simple intermediate `io.Writer` as the underlying `io.Writer` for its single `StoreKVPairWriteListener`, -It collects KV pairs from every KVStore synchronously off of the same channel, caching them in the order they are received, and then writing -them out to a file generated in response to an ABCI message hook. Files are named as outlined above, with optional prefixes to avoid potential naming collisions -across separate instances. - -```go -// intermediateWriter is used so that we do not need to update the underlying io.Writer inside the StoreKVPairWriteListener -// everytime we begin writing to a new file -type intermediateWriter struct { - outChan chan <-[]byte -} - -// NewIntermediateWriter create an instance of an intermediateWriter that sends to the provided channel -func NewIntermediateWriter(outChan chan <-[]byte) *intermediateWriter { - return &intermediateWriter{ - outChan: outChan, - } -} - -// Write satisfies io.Writer -func (iw *intermediateWriter) Write(b []byte) (int, error) { - iw.outChan <- b - return len(b), nil -} - -// NewFileStreamingService creates a new FileStreamingService for the provided writeDir, (optional) filePrefix, and storeKeys -func NewFileStreamingService(writeDir, filePrefix string, storeKeys []sdk.StoreKey, m codec.BinaryCodec) (*FileStreamingService, error) { - listenChan := make(chan []byte, 0) - iw := NewIntermediateWriter(listenChan) - listener := listen.NewStoreKVPairWriteListener(iw, m) - listners := make(map[sdk.StoreKey][]storeTypes.WriteListener, len(storeKeys)) - // in this case, we are using the same listener for each Store - for _, key := range storeKeys { - listeners[key] = listener - } - // check that the writeDir exists and is writeable so that we can catch the error here at initialization if it is not - // we don't open a dstFile until we receive our first ABCI message - if err := fileutil.IsDirWriteable(writeDir); err != nil { - return nil, err - } - return &FileStreamingService{ - listeners: listeners, - srcChan: listenChan, - filePrefix: filePrefix, - writeDir: writeDir, - marshaller: m, - stateCache: make([][]byte, 0), - }, nil -} - -// Listeners returns the StreamingService's underlying WriteListeners, use for registering them with the BaseApp -func (fss *FileStreamingService) Listeners() map[sdk.StoreKey][]storeTypes.WriteListener { - return fss.listeners -} - -func (fss *FileStreamingService) ListenBeginBlock(ctx sdk.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) { - // NOTE: this could either be done synchronously or asynchronously - // create a new file with the req info according to naming schema - // write req to file - // write all state changes cached for this stage to file - // reset cache - // write res to file - // close file -} - -func (fss *FileStreamingService) ListenEndBlock(ctx sdk.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) { - // NOTE: this could either be done synchronously or asynchronously - // create a new file with the req info according to naming schema - // write req to file - // write all state changes cached for this stage to file - // reset cache - // write res to file - // close file -} - -func (fss *FileStreamingService) ListenDeliverTx(ctx sdk.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) { - // NOTE: this could either be done synchronously or asynchronously - // create a new file with the req info according to naming schema - // NOTE: if the tx failed, handle accordingly - // write req to file - // write all state changes cached for this stage to file - // reset cache - // write res to file - // close file -} - -// Stream spins up a goroutine select loop which awaits length-prefixed binary encoded KV pairs and caches them in the order they were received -func (fss *FileStreamingService) Stream(wg *sync.WaitGroup, quitChan <-chan struct{}) { - wg.Add(1) - go func() { - defer wg.Done() - for { - select { - case <-quitChan: - return - case by := <-fss.srcChan: - fss.stateCache = append(fss.stateCache, by) - } - } - }() -} -``` - -#### Auxiliary streaming service - -We will create a separate standalone process that reads and internally queues the state as it is written out to these files -and serves the data over a gRPC API. This API will allow filtering of requested data, e.g. by block number, block/tx hash, ABCI message type, -whether a DeliverTx message failed or succeeded, etc. In addition to unary RPC endpoints this service will expose `stream` RPC endpoints for realtime subscriptions. - -#### File pruning - -Without pruning the number of files can grow indefinitely, this may need to be managed by -the developer in an application or even module-specific manner (e.g. log rotation). -The file naming schema facilitates pruning by block number and/or ABCI message. -The gRPC auxiliary streaming service introduced above will include an option to remove the files as it consumes their data. - -### Configuration - -We will provide detailed documentation on how to configure a `FileStreamingService` from within an app's `AppCreator`, -using the provided `AppOptions` and TOML configuration fields. - #### BaseApp registration We will add a new method to the `BaseApp` to enable the registration of `StreamingService`s: ```go -// SetStreamingService is used to register a streaming service with the BaseApp +// SetStreamingService is used to set a streaming service into the BaseApp hooks and load the listeners into the multistore func (app *BaseApp) SetStreamingService(s StreamingService) { - // set the listeners for each StoreKey + // add the listeners for each StoreKey for key, lis := range s.Listeners() { app.cms.AddListeners(key, lis) } - // register the streaming service hooks within the BaseApp - // BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context using these hooks - app.hooks = append(app.hooks, s) + // register the StreamingService within the BaseApp + // BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context + app.abciListeners = append(app.abciListeners, s) +} +``` + +We will add a new method to the `BaseApp` that is used to configure a global wait limit for receiving positive acknowledgement +of message receipt from the integrated `StreamingService`s. + +```go +func (app *BaseApp) SetGlobalWaitLimit(t time.Duration) { + app.globalWaitLimit = t } ``` @@ -438,8 +275,8 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg ... // Call the streaming service hooks with the BeginBlock messages - for _, hook := range app.hooks { - hook.ListenBeginBlock(app.deliverState.ctx, req, res) + for _, listener := range app.abciListeners { + listener.ListenBeginBlock(app.deliverState.ctx, req, res) } return res @@ -452,8 +289,8 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc ... // Call the streaming service hooks with the EndBlock messages - for _, hook := range app.hooks { - hook.ListenEndBlock(app.deliverState.ctx, req, res) + for _, listener := range app.abciListeners { + listener.ListenEndBlock(app.deliverState.ctx, req, res) } return res @@ -469,9 +306,9 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx if err != nil { resultStr = "failed" res := sdkerrors.ResponseDeliverTx(err, gInfo.GasWanted, gInfo.GasUsed, app.trace) - // If we throw and error, be sure to still call the streaming service's hook - for _, hook := range app.hooks { - hook.ListenDeliverTx(app.deliverState.ctx, req, res) + // If we throw an error, be sure to still call the streaming service's hook + for _, listener := range app.abciListeners { + listener.ListenDeliverTx(app.deliverState.ctx, req, res) } return res } @@ -485,99 +322,116 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx } // Call the streaming service hooks with the DeliverTx messages - for _, hook := range app.hooks { - hook.ListenDeliverTx(app.deliverState.ctx, req, res) + for _, listener := range app.abciListeners { + listener.ListenDeliverTx(app.deliverState.ctx, req, res) } return res } ``` -#### TOML Configuration - -We will provide standard TOML configuration options for configuring a `FileStreamingService` for specific `Store`s. -Note: the actual namespace is TBD. - -```toml -[store] - streamers = [ # if len(streamers) > 0 we are streaming - "file", - ] - -[streamers] - [streamers.file] - keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"] - write_dir = "path to the write directory" - prefix = "optional prefix to prepend to the generated file names" -``` - -We will also provide a mapping of the TOML `store.streamers` "file" configuration option to a helper functions for constructing the specified -streaming service. In the future, as other streaming services are added, their constructors will be added here as well. - -Each configured streamer will receive the +We will also modify the `Commit` method to process `success/failure` signals from the integrated `StreamingService`s using +the `ABCIListener.ListenSuccess()` method. Each `StreamingService` has an internal wait threshold after which it sends +`false` to the `ListenSuccess()` channel, and the BaseApp also imposes a configurable global wait limit. +If the `StreamingService` is operating in a "fire-and-forget" mode, `ListenSuccess()` should immediately return `true` +off the channel despite the success status of the service. ```go -// ServiceConstructor is used to construct a streaming service -type ServiceConstructor func(opts serverTypes.AppOptions, keys []sdk.StoreKey, marshaller codec.BinaryMarshaler) (sdk.StreamingService, error) +func (app *BaseApp) Commit() (res abci.ResponseCommit) { + + ... -// ServiceType enum for specifying the type of StreamingService -type ServiceType int + var halt bool -const ( - Unknown ServiceType = iota - File - // add more in the future -) + switch { + case app.haltHeight > 0 && uint64(header.Height) >= app.haltHeight: + halt = true -// NewStreamingServiceType returns the streaming.ServiceType corresponding to the provided name -func NewStreamingServiceType(name string) ServiceType { - switch strings.ToLower(name) { - case "file", "f": - return File - default: - return Unknown - } -} + case app.haltTime > 0 && header.Time.Unix() >= int64(app.haltTime): + halt = true + } -// String returns the string name of a streaming.ServiceType -func (sst ServiceType) String() string { - switch sst { - case File: - return "file" - default: - return "" - } -} + // each listener has an internal wait threshold after which it sends `false` to the ListenSuccess() channel + // but the BaseApp also imposes a global wait limit + maxWait := time.NewTicker(app.globalWaitLimit) + for _, lis := range app.abciListeners { + select { + case success := <- lis.ListenSuccess(): + if success == false { + halt = true + break + } + case <- maxWait.C: + halt = true + break + } + } -// ServiceConstructorLookupTable is a mapping of streaming.ServiceTypes to streaming.ServiceConstructors -var ServiceConstructorLookupTable = map[ServiceType]ServiceConstructor{ - File: NewFileStreamingService, -} + if halt { + // Halt the binary and allow Tendermint to receive the ResponseCommit + // response with the commit ID hash. This will allow the node to successfully + // restart and process blocks assuming the halt configuration has been + // reset or moved to a more distant value. + app.halt() + } -// ServiceTypeFromString returns the streaming.ServiceConstructor corresponding to the provided name -func ServiceTypeFromString(name string) (ServiceConstructor, error) { - ssType := NewStreamingServiceType(name) - if ssType == Unknown { - return nil, fmt.Errorf("unrecognized streaming service name %s", name) - } - if constructor, ok := ServiceConstructorLookupTable[ssType]; ok { - return constructor, nil - } - return nil, fmt.Errorf("streaming service constructor of type %s not found", ssType.String()) -} + ... -// NewFileStreamingService is the streaming.ServiceConstructor function for creating a FileStreamingService -func NewFileStreamingService(opts serverTypes.AppOptions, keys []sdk.StoreKey, marshaller codec.BinaryMarshaler) (sdk.StreamingService, error) { - filePrefix := cast.ToString(opts.Get("streamers.file.prefix")) - fileDir := cast.ToString(opts.Get("streamers.file.write_dir")) - return file.NewStreamingService(fileDir, filePrefix, keys, marshaller) } ``` -#### Example configuration +#### Plugin system -As a demonstration, we will implement the state watching features as part of SimApp. -For example, the below is a very rudimentary integration of the state listening features into the SimApp `AppCreator` function: +We propose a plugin architecture to load and run `StreamingService` implementations. We will introduce a plugin +loading/preloading system that is used to load, initialize, inject, run, and stop Cosmos-SDK plugins. Each plugin +must implement the following interface: + +```go +// Plugin is the base interface for all kinds of cosmos-sdk plugins +// It will be included in interfaces of different Plugins +type Plugin interface { + // Name should return unique name of the plugin + Name() string + + // Version returns current version of the plugin + Version() string + + // Init is called once when the Plugin is being loaded + // The plugin is passed the AppOptions for configuration + // A plugin will not necessarily have a functional Init + Init(env serverTypes.AppOptions) error + + // Closer interface for shutting down the plugin process + io.Closer +} +``` + +The `Name` method returns a plugin's name. +The `Version` method returns a plugin's version. +The `Init` method initializes a plugin with the provided `AppOptions`. +The io.Closer is used to shut down the plugin service. + +For the purposes of this ADR we introduce a single kind of plugin- a state streaming plugin. +We will define a `StateStreamingPlugin` interface which extends the above `Plugin` interface to support a state streaming service. + +```go +// StateStreamingPlugin interface for plugins that load a baseapp.StreamingService onto a baseapp.BaseApp +type StateStreamingPlugin interface { + // Register configures and registers the plugin streaming service with the BaseApp + Register(bApp *baseapp.BaseApp, marshaller codec.BinaryCodec, keys map[string]*types.KVStoreKey) error + + // Start starts the background streaming process of the plugin streaming service + Start(wg *sync.WaitGroup) + + // Plugin is the base Plugin interface + Plugin +} +``` + +The `Register` method is used during App construction to register the plugin's streaming service with an App's BaseApp using the BaseApp's `SetStreamingService` method. +The `Start` method is used during App construction to start the registered plugin streaming services and maintain synchronization with them. + +e.g. in `NewSimApp`: ```go func NewSimApp( @@ -588,6 +442,17 @@ func NewSimApp( ... + // this loads the preloaded and any plugins found in `plugins.dir` + pluginLoader, err := loader.NewPluginLoader(appOpts, logger) + if err != nil { + // handle error + } + + // initialize the loaded plugins + if err := pluginLoader.Initialize(); err != nil { + // hanlde error + } + keys := sdk.NewKVStoreKeys( authtypes.StoreKey, banktypes.StoreKey, stakingtypes.StoreKey, minttypes.StoreKey, distrtypes.StoreKey, slashingtypes.StoreKey, @@ -595,46 +460,16 @@ func NewSimApp( evidencetypes.StoreKey, ibctransfertypes.StoreKey, capabilitytypes.StoreKey, ) - // configure state listening capabilities using AppOptions - listeners := cast.ToStringSlice(appOpts.Get("store.streamers")) - for _, listenerName := range listeners { - // get the store keys allowed to be exposed for this streaming service - exposeKeyStrs := cast.ToStringSlice(appOpts.Get(fmt.Sprintf("streamers.%s.keys", streamerName))) - var exposeStoreKeys []sdk.StoreKey - if exposeAll(exposeKeyStrs) { // if list contains `*`, expose all StoreKeys - exposeStoreKeys = make([]sdk.StoreKey, 0, len(keys)) - for _, storeKey := range keys { - exposeStoreKeys = append(exposeStoreKeys, storeKey) - } - } else { - exposeStoreKeys = make([]sdk.StoreKey, 0, len(exposeKeyStrs)) - for _, keyStr := range exposeKeyStrs { - if storeKey, ok := keys[keyStr]; ok { - exposeStoreKeys = append(exposeStoreKeys, storeKey) - } - } - } - if len(exposeStoreKeys) == 0 { // short circuit if we are not exposing anything - continue - } - // get the constructor for this listener name - constructor, err := baseapp.NewStreamingServiceConstructor(listenerName) - if err != nil { - tmos.Exit(err.Error()) // or continue? - } - // generate the streaming service using the constructor, appOptions, and the StoreKeys we want to expose - streamingService, err := constructor(appOpts, exposeStoreKeys, appCodec) - if err != nil { - tmos.Exit(err.Error()) - } - // register the streaming service with the BaseApp - bApp.RegisterStreamingService(streamingService) - // waitgroup and quit channel for optional shutdown coordination of the streaming service - wg := new(sync.WaitGroup) - quitChan := make(chan struct{})) - // kick off the background streaming service loop - streamingService.Stream(wg, quitChan) // maybe this should be done from inside BaseApp instead? - } + // register the plugin(s) with the BaseApp + if err := pluginLoader.Inject(bApp, appCodec, keys); err != nil { + // handle error + } + + // start the plugin services, optionally use wg to synchronize shutdown using io.Closer + wg := new(sync.WaitGroup) + if err := pluginLoader.Start(wg); err != nil { + // handler error + } ... @@ -642,6 +477,58 @@ func NewSimApp( } ``` + +#### Configuration + +The plugin system will be configured within an app's app.toml file. + +```toml +[plugins] + on = false # turn the plugin system, as a whole, on or off + disabled = ["list", "of", "plugin", "names", "to", "disable"] + dir = "the directory to load non-preloaded plugins from; defaults to cosmos-sdk/plugin/plugins" +``` + +There will be three parameters for configuring the plugin system: `plugins.on`, `plugins.disabled` and `plugins.dir`. +`plugins.on` is a bool that turns on or off the plugin system at large, `plugins.dir` directs the system to a directory +to load plugins from, and `plugins.disabled` is a list of names for the plugins we want to disable (useful for disabling preloaded plugins). + +Configuration of a given plugin is ultimately specific to the plugin, but we will introduce some standards here: + +Plugin TOML configuration should be split into separate sub-tables for each kind of plugin (e.g. `plugins.streaming`). +Within these sub-tables, the parameters for a specific plugin of that kind are included in another sub-table (e.g. `plugins.streaming.file`). +It is generally expected, but not required, that a streaming service plugin can be configured with a set of store keys +(e.g. `plugins.streaming.file.keys`) for the stores it listens to and a mode (e.g. `plugins.streaming.file.mode`) +that signifies whether the service operates in a fire-and-forget capacity (`faf`) or the BaseApp should require positive +acknowledgement of message receipt by the service (`ack`). + +e.g. + +```toml +[plugins] + on = false # turn the plugin system, as a whole, on or off + disabled = ["list", "of", "plugin", "names", "to", "disable"] + dir = "the directory to load non-preloaded plugins from; defaults to " + [plugins.streaming] # a mapping of plugin-specific streaming service parameters, mapped to their plugin name + [plugins.streaming.file] # the specific parameters for the file streaming service plugin + keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"] + writeDir = "path to the write directory" + prefix = "optional prefix to prepend to the generated file names" + mode = "faf" # faf == fire-and-forget; ack == require positive acknowledge of receipt + [plugins.streaming.kafka] + ... + [plugins.modules] + ... +``` + +#### Encoding and decoding streams + +ADR-038 introduces the interfaces and types for streaming state changes out from KVStores, associating this +data with their related ABCI requests and responses, and registering a service for consuming this data and streaming it to some destination in a final format. +Instead of prescribing a final data format in this ADR, it is left to a specific plugin implementation to define and document this format. +We take this approach because flexibility in the final format is necessary to support a wide range of streaming service plugins. For example, +the data format for a streaming service that writes the data out to a set of files will differ from the data format that is written to a Kafka topic. + ## Consequences These changes will provide a means of subscribing to KVStore state changes in real time.