docs: ADR-038 plugin proposal (#10482)
For #10096 This PR introduces the updates to the ADR-038 spec for the transition to plugin-based streaming services. These updates reflect the implementation approach taken in https://github.com/i-norden/cosmos-sdk/pull/1 (will be rebased, retargeted, and reopened). ### Author Checklist *All items are required. Please add a note to the item if the item is not applicable and please add links to any relevant follow up issues.* I have... - [x] included the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title - [x] added `!` to the type prefix if API or client breaking change - [x] targeted the correct branch (see [PR Targeting](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#pr-targeting)) - [x] provided a link to the relevant issue or specification - [x] followed the guidelines for [building modules](https://github.com/cosmos/cosmos-sdk/blob/master/docs/building-modules) - [x] included the necessary unit and integration [tests](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#testing) - [x] added a changelog entry to `CHANGELOG.md` - [x] included comments for [documenting Go code](https://blog.golang.org/godoc) - [x] updated the relevant documentation or specification - [ ] reviewed "Files changed" and left comments if necessary - [ ] confirmed all CI checks have passed ### Reviewers Checklist *All items are required. Please add a note if the item is not applicable and please add your handle next to the items reviewed if you only reviewed selected items.* I have... - [ ] confirmed the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title - [ ] confirmed `!` in the type prefix if API or client breaking change - [ ] confirmed all author checklist items have been addressed - [ ] reviewed state machine logic - [ ] reviewed API design and naming - [ ] reviewed documentation is accurate - [ ] reviewed tests and test coverage - [ ] manually tested (if applicable)
This commit is contained in:
parent
2dc88af7ba
commit
414fbd3341
|
@ -21,7 +21,7 @@ In addition to these request/response queries, it would be beneficial to have a
|
|||
## Decision
|
||||
|
||||
We will modify the `MultiStore` interface and its concrete (`rootmulti` and `cachemulti`) implementations and introduce a new `listenkv.Store` to allow listening to state changes in underlying KVStores.
|
||||
We will also introduce the tooling for writing these state changes out to files and configuring this service.
|
||||
We will introduce a plugin system for configuring and running streaming services that write these state changes and their surrounding ABCI message context to different destinations.
|
||||
|
||||
### Listening interface
|
||||
|
||||
|
@ -31,9 +31,9 @@ In a new file, `store/types/listening.go`, we will create a `WriteListener` inte
|
|||
// WriteListener interface for streaming data out from a listenkv.Store
|
||||
type WriteListener interface {
|
||||
// if value is nil then it was deleted
|
||||
// storeKey indicates the source KVStore, to facilitate using the the same WriteListener across separate KVStores
|
||||
// storeKey indicates the source KVStore, to facilitate using the same WriteListener across separate KVStores
|
||||
// delete bool indicates if it was a delete; true: delete, false: set
|
||||
OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error
|
||||
OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error
|
||||
}
|
||||
```
|
||||
|
||||
|
@ -125,7 +125,7 @@ func (s *Store) Delete(key []byte) {
|
|||
s.onWrite(true, key, nil)
|
||||
}
|
||||
|
||||
// onWrite writes a KVStore operation to all of the WriteListeners
|
||||
// onWrite writes a KVStore operation to all the WriteListeners
|
||||
func (s *Store) onWrite(delete bool, key, value []byte) {
|
||||
for _, l := range s.listeners {
|
||||
if err := l.OnWrite(s.parentStoreKey, key, value, delete); err != nil {
|
||||
|
@ -204,9 +204,14 @@ func (rs *Store) CacheMultiStore() types.CacheMultiStore {
|
|||
|
||||
### Exposing the data
|
||||
|
||||
#### Streaming service
|
||||
|
||||
We will introduce a new `StreamingService` interface for exposing `WriteListener` data streams to external consumers.
|
||||
In addition to streaming state changes as `StoreKVPair`s, the interface satisfies an `ABCIListener` interface that plugs into the BaseApp
|
||||
and relays ABCI requests and responses so that the service can group the state changes with the ABCI requests that affected them and the ABCI responses they affected.
|
||||
In addition to streaming state changes as `StoreKVPair`s, the interface satisfies an `ABCIListener` interface that plugs
|
||||
into the BaseApp and relays ABCI requests and responses so that the service can group the state changes with the ABCI
|
||||
requests that affected them and the ABCI responses they affected. The `ABCIListener` interface also exposes a
|
||||
`ListenSuccess` method which is (optionally) used by the `BaseApp` to await positive acknowledgement of message
|
||||
receipt from the `StreamingService`.
|
||||
|
||||
```go
|
||||
// ABCIListener interface used to hook into the ABCI message processing of the BaseApp
|
||||
|
@ -217,11 +222,14 @@ type ABCIListener interface {
|
|||
ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error
|
||||
// ListenDeliverTx updates the steaming service with the latest DeliverTx messages
|
||||
ListenDeliverTx(ctx types.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error
|
||||
// ListenSuccess returns a chan that is used to acknowledge successful receipt of messages by the external service
|
||||
// after some configurable delay, `false` is sent to this channel from the service to signify failure of receipt
|
||||
ListenSuccess() <-chan bool
|
||||
}
|
||||
|
||||
// StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks
|
||||
type StreamingService interface {
|
||||
// Stream is the streaming service loop, awaits kv pairs and writes them to some destination stream or file
|
||||
// Stream is the streaming service loop, awaits kv pairs and writes them to a destination stream or file
|
||||
Stream(wg *sync.WaitGroup) error
|
||||
// Listeners returns the streaming service's listeners for the BaseApp to register
|
||||
Listeners() map[types.StoreKey][]store.WriteListener
|
||||
|
@ -232,200 +240,29 @@ type StreamingService interface {
|
|||
}
|
||||
```
|
||||
|
||||
#### Writing state changes to files
|
||||
|
||||
We will introduce an implementation of `StreamingService` which writes state changes out to files as length-prefixed protobuf encoded `StoreKVPair`s.
|
||||
This service uses the same `StoreKVPairWriteListener` for every KVStore, writing all the KV pairs from every KVStore
|
||||
out to the same files, relying on the `StoreKey` field in the `StoreKVPair` protobuf message to later distinguish the source for each pair.
|
||||
|
||||
Writing to a file is the simplest approach for streaming the data out to consumers.
|
||||
This approach also provides the advantages of being persistent and durable, and the files can be read directly,
|
||||
or an auxiliary streaming services can read from the files and serve the data over a remote interface.
|
||||
|
||||
##### Encoding
|
||||
|
||||
For each pair of `BeginBlock` requests and responses, a file is created and named `block-{N}-begin`, where N is the block number.
|
||||
At the head of this file the length-prefixed protobuf encoded `BeginBlock` request is written.
|
||||
At the tail of this file the length-prefixed protobuf encoded `BeginBlock` response is written.
|
||||
In between these two encoded messages, the state changes that occurred due to the `BeginBlock` request are written chronologically as
|
||||
a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores the service
|
||||
is configured to listen to.
|
||||
|
||||
For each pair of `DeliverTx` requests and responses, a file is created and named `block-{N}-tx-{M}` where N is the block number and M
|
||||
is the tx number in the block (i.e. 0, 1, 2...).
|
||||
At the head of this file the length-prefixed protobuf encoded `DeliverTx` request is written.
|
||||
At the tail of this file the length-prefixed protobuf encoded `DeliverTx` response is written.
|
||||
In between these two encoded messages, the state changes that occurred due to the `DeliverTx` request are written chronologically as
|
||||
a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores the service
|
||||
is configured to listen to.
|
||||
|
||||
For each pair of `EndBlock` requests and responses, a file is created and named `block-{N}-end`, where N is the block number.
|
||||
At the head of this file the length-prefixed protobuf encoded `EndBlock` request is written.
|
||||
At the tail of this file the length-prefixed protobuf encoded `EndBlock` response is written.
|
||||
In between these two encoded messages, the state changes that occurred due to the `EndBlock` request are written chronologically as
|
||||
a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores the service
|
||||
is configured to listen to.
|
||||
|
||||
##### Decoding
|
||||
|
||||
To decode the files written in the above format we read all the bytes from a given file into memory and segment them into proto
|
||||
messages based on the length-prefixing of each message. Once segmented, it is known that the first message is the ABCI request,
|
||||
the last message is the ABCI response, and that every message in between is a `StoreKVPair`. This enables us to decode each segment into
|
||||
the appropriate message type.
|
||||
|
||||
The type of ABCI req/res, the block height, and the transaction index (where relevant) is known
|
||||
from the file name, and the KVStore each `StoreKVPair` originates from is known since the `StoreKey` is included as a field in the proto message.
|
||||
|
||||
##### Implementation example
|
||||
|
||||
```go
|
||||
// FileStreamingService is a concrete implementation of StreamingService that writes state changes out to a file
|
||||
type FileStreamingService struct {
|
||||
listeners map[sdk.StoreKey][]storeTypes.WriteListener // the listeners that will be initialized with BaseApp
|
||||
srcChan <-chan []byte // the channel that all of the WriteListeners write their data out to
|
||||
filePrefix string // optional prefix for each of the generated files
|
||||
writeDir string // directory to write files into
|
||||
dstFile *os.File // the current write output file
|
||||
marshaller codec.BinaryCodec // marshaller used for re-marshalling the ABCI messages to write them out to the destination files
|
||||
stateCache [][]byte // cache the protobuf binary encoded StoreKVPairs in the order they are received
|
||||
}
|
||||
```
|
||||
|
||||
This streaming service uses a single instance of a simple intermediate `io.Writer` as the underlying `io.Writer` for its single `StoreKVPairWriteListener`,
|
||||
It collects KV pairs from every KVStore synchronously off of the same channel, caching them in the order they are received, and then writing
|
||||
them out to a file generated in response to an ABCI message hook. Files are named as outlined above, with optional prefixes to avoid potential naming collisions
|
||||
across separate instances.
|
||||
|
||||
```go
|
||||
// intermediateWriter is used so that we do not need to update the underlying io.Writer inside the StoreKVPairWriteListener
|
||||
// everytime we begin writing to a new file
|
||||
type intermediateWriter struct {
|
||||
outChan chan <-[]byte
|
||||
}
|
||||
|
||||
// NewIntermediateWriter create an instance of an intermediateWriter that sends to the provided channel
|
||||
func NewIntermediateWriter(outChan chan <-[]byte) *intermediateWriter {
|
||||
return &intermediateWriter{
|
||||
outChan: outChan,
|
||||
}
|
||||
}
|
||||
|
||||
// Write satisfies io.Writer
|
||||
func (iw *intermediateWriter) Write(b []byte) (int, error) {
|
||||
iw.outChan <- b
|
||||
return len(b), nil
|
||||
}
|
||||
|
||||
// NewFileStreamingService creates a new FileStreamingService for the provided writeDir, (optional) filePrefix, and storeKeys
|
||||
func NewFileStreamingService(writeDir, filePrefix string, storeKeys []sdk.StoreKey, m codec.BinaryCodec) (*FileStreamingService, error) {
|
||||
listenChan := make(chan []byte, 0)
|
||||
iw := NewIntermediateWriter(listenChan)
|
||||
listener := listen.NewStoreKVPairWriteListener(iw, m)
|
||||
listners := make(map[sdk.StoreKey][]storeTypes.WriteListener, len(storeKeys))
|
||||
// in this case, we are using the same listener for each Store
|
||||
for _, key := range storeKeys {
|
||||
listeners[key] = listener
|
||||
}
|
||||
// check that the writeDir exists and is writeable so that we can catch the error here at initialization if it is not
|
||||
// we don't open a dstFile until we receive our first ABCI message
|
||||
if err := fileutil.IsDirWriteable(writeDir); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &FileStreamingService{
|
||||
listeners: listeners,
|
||||
srcChan: listenChan,
|
||||
filePrefix: filePrefix,
|
||||
writeDir: writeDir,
|
||||
marshaller: m,
|
||||
stateCache: make([][]byte, 0),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Listeners returns the StreamingService's underlying WriteListeners, use for registering them with the BaseApp
|
||||
func (fss *FileStreamingService) Listeners() map[sdk.StoreKey][]storeTypes.WriteListener {
|
||||
return fss.listeners
|
||||
}
|
||||
|
||||
func (fss *FileStreamingService) ListenBeginBlock(ctx sdk.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) {
|
||||
// NOTE: this could either be done synchronously or asynchronously
|
||||
// create a new file with the req info according to naming schema
|
||||
// write req to file
|
||||
// write all state changes cached for this stage to file
|
||||
// reset cache
|
||||
// write res to file
|
||||
// close file
|
||||
}
|
||||
|
||||
func (fss *FileStreamingService) ListenEndBlock(ctx sdk.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) {
|
||||
// NOTE: this could either be done synchronously or asynchronously
|
||||
// create a new file with the req info according to naming schema
|
||||
// write req to file
|
||||
// write all state changes cached for this stage to file
|
||||
// reset cache
|
||||
// write res to file
|
||||
// close file
|
||||
}
|
||||
|
||||
func (fss *FileStreamingService) ListenDeliverTx(ctx sdk.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) {
|
||||
// NOTE: this could either be done synchronously or asynchronously
|
||||
// create a new file with the req info according to naming schema
|
||||
// NOTE: if the tx failed, handle accordingly
|
||||
// write req to file
|
||||
// write all state changes cached for this stage to file
|
||||
// reset cache
|
||||
// write res to file
|
||||
// close file
|
||||
}
|
||||
|
||||
// Stream spins up a goroutine select loop which awaits length-prefixed binary encoded KV pairs and caches them in the order they were received
|
||||
func (fss *FileStreamingService) Stream(wg *sync.WaitGroup, quitChan <-chan struct{}) {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for {
|
||||
select {
|
||||
case <-quitChan:
|
||||
return
|
||||
case by := <-fss.srcChan:
|
||||
fss.stateCache = append(fss.stateCache, by)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
```
|
||||
|
||||
#### Auxiliary streaming service
|
||||
|
||||
We will create a separate standalone process that reads and internally queues the state as it is written out to these files
|
||||
and serves the data over a gRPC API. This API will allow filtering of requested data, e.g. by block number, block/tx hash, ABCI message type,
|
||||
whether a DeliverTx message failed or succeeded, etc. In addition to unary RPC endpoints this service will expose `stream` RPC endpoints for realtime subscriptions.
|
||||
|
||||
#### File pruning
|
||||
|
||||
Without pruning the number of files can grow indefinitely, this may need to be managed by
|
||||
the developer in an application or even module-specific manner (e.g. log rotation).
|
||||
The file naming schema facilitates pruning by block number and/or ABCI message.
|
||||
The gRPC auxiliary streaming service introduced above will include an option to remove the files as it consumes their data.
|
||||
|
||||
### Configuration
|
||||
|
||||
We will provide detailed documentation on how to configure a `FileStreamingService` from within an app's `AppCreator`,
|
||||
using the provided `AppOptions` and TOML configuration fields.
|
||||
|
||||
#### BaseApp registration
|
||||
|
||||
We will add a new method to the `BaseApp` to enable the registration of `StreamingService`s:
|
||||
|
||||
```go
|
||||
// SetStreamingService is used to register a streaming service with the BaseApp
|
||||
// SetStreamingService is used to set a streaming service into the BaseApp hooks and load the listeners into the multistore
|
||||
func (app *BaseApp) SetStreamingService(s StreamingService) {
|
||||
// set the listeners for each StoreKey
|
||||
// add the listeners for each StoreKey
|
||||
for key, lis := range s.Listeners() {
|
||||
app.cms.AddListeners(key, lis)
|
||||
}
|
||||
// register the streaming service hooks within the BaseApp
|
||||
// BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context using these hooks
|
||||
app.hooks = append(app.hooks, s)
|
||||
// register the StreamingService within the BaseApp
|
||||
// BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context
|
||||
app.abciListeners = append(app.abciListeners, s)
|
||||
}
|
||||
```
|
||||
|
||||
We will add a new method to the `BaseApp` that is used to configure a global wait limit for receiving positive acknowledgement
|
||||
of message receipt from the integrated `StreamingService`s.
|
||||
|
||||
```go
|
||||
func (app *BaseApp) SetGlobalWaitLimit(t time.Duration) {
|
||||
app.globalWaitLimit = t
|
||||
}
|
||||
```
|
||||
|
||||
|
@ -438,8 +275,8 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg
|
|||
...
|
||||
|
||||
// Call the streaming service hooks with the BeginBlock messages
|
||||
for _, hook := range app.hooks {
|
||||
hook.ListenBeginBlock(app.deliverState.ctx, req, res)
|
||||
for _, listener := range app.abciListeners {
|
||||
listener.ListenBeginBlock(app.deliverState.ctx, req, res)
|
||||
}
|
||||
|
||||
return res
|
||||
|
@ -452,8 +289,8 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc
|
|||
...
|
||||
|
||||
// Call the streaming service hooks with the EndBlock messages
|
||||
for _, hook := range app.hooks {
|
||||
hook.ListenEndBlock(app.deliverState.ctx, req, res)
|
||||
for _, listener := range app.abciListeners {
|
||||
listener.ListenEndBlock(app.deliverState.ctx, req, res)
|
||||
}
|
||||
|
||||
return res
|
||||
|
@ -469,9 +306,9 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx
|
|||
if err != nil {
|
||||
resultStr = "failed"
|
||||
res := sdkerrors.ResponseDeliverTx(err, gInfo.GasWanted, gInfo.GasUsed, app.trace)
|
||||
// If we throw and error, be sure to still call the streaming service's hook
|
||||
for _, hook := range app.hooks {
|
||||
hook.ListenDeliverTx(app.deliverState.ctx, req, res)
|
||||
// If we throw an error, be sure to still call the streaming service's hook
|
||||
for _, listener := range app.abciListeners {
|
||||
listener.ListenDeliverTx(app.deliverState.ctx, req, res)
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
@ -485,99 +322,116 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx
|
|||
}
|
||||
|
||||
// Call the streaming service hooks with the DeliverTx messages
|
||||
for _, hook := range app.hooks {
|
||||
hook.ListenDeliverTx(app.deliverState.ctx, req, res)
|
||||
for _, listener := range app.abciListeners {
|
||||
listener.ListenDeliverTx(app.deliverState.ctx, req, res)
|
||||
}
|
||||
|
||||
return res
|
||||
}
|
||||
```
|
||||
|
||||
#### TOML Configuration
|
||||
|
||||
We will provide standard TOML configuration options for configuring a `FileStreamingService` for specific `Store`s.
|
||||
Note: the actual namespace is TBD.
|
||||
|
||||
```toml
|
||||
[store]
|
||||
streamers = [ # if len(streamers) > 0 we are streaming
|
||||
"file",
|
||||
]
|
||||
|
||||
[streamers]
|
||||
[streamers.file]
|
||||
keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"]
|
||||
write_dir = "path to the write directory"
|
||||
prefix = "optional prefix to prepend to the generated file names"
|
||||
```
|
||||
|
||||
We will also provide a mapping of the TOML `store.streamers` "file" configuration option to a helper functions for constructing the specified
|
||||
streaming service. In the future, as other streaming services are added, their constructors will be added here as well.
|
||||
|
||||
Each configured streamer will receive the
|
||||
We will also modify the `Commit` method to process `success/failure` signals from the integrated `StreamingService`s using
|
||||
the `ABCIListener.ListenSuccess()` method. Each `StreamingService` has an internal wait threshold after which it sends
|
||||
`false` to the `ListenSuccess()` channel, and the BaseApp also imposes a configurable global wait limit.
|
||||
If the `StreamingService` is operating in a "fire-and-forget" mode, `ListenSuccess()` should immediately return `true`
|
||||
off the channel despite the success status of the service.
|
||||
|
||||
```go
|
||||
// ServiceConstructor is used to construct a streaming service
|
||||
type ServiceConstructor func(opts serverTypes.AppOptions, keys []sdk.StoreKey, marshaller codec.BinaryMarshaler) (sdk.StreamingService, error)
|
||||
func (app *BaseApp) Commit() (res abci.ResponseCommit) {
|
||||
|
||||
...
|
||||
|
||||
// ServiceType enum for specifying the type of StreamingService
|
||||
type ServiceType int
|
||||
var halt bool
|
||||
|
||||
const (
|
||||
Unknown ServiceType = iota
|
||||
File
|
||||
// add more in the future
|
||||
)
|
||||
switch {
|
||||
case app.haltHeight > 0 && uint64(header.Height) >= app.haltHeight:
|
||||
halt = true
|
||||
|
||||
// NewStreamingServiceType returns the streaming.ServiceType corresponding to the provided name
|
||||
func NewStreamingServiceType(name string) ServiceType {
|
||||
switch strings.ToLower(name) {
|
||||
case "file", "f":
|
||||
return File
|
||||
default:
|
||||
return Unknown
|
||||
}
|
||||
}
|
||||
case app.haltTime > 0 && header.Time.Unix() >= int64(app.haltTime):
|
||||
halt = true
|
||||
}
|
||||
|
||||
// String returns the string name of a streaming.ServiceType
|
||||
func (sst ServiceType) String() string {
|
||||
switch sst {
|
||||
case File:
|
||||
return "file"
|
||||
default:
|
||||
return ""
|
||||
}
|
||||
}
|
||||
// each listener has an internal wait threshold after which it sends `false` to the ListenSuccess() channel
|
||||
// but the BaseApp also imposes a global wait limit
|
||||
maxWait := time.NewTicker(app.globalWaitLimit)
|
||||
for _, lis := range app.abciListeners {
|
||||
select {
|
||||
case success := <- lis.ListenSuccess():
|
||||
if success == false {
|
||||
halt = true
|
||||
break
|
||||
}
|
||||
case <- maxWait.C:
|
||||
halt = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// ServiceConstructorLookupTable is a mapping of streaming.ServiceTypes to streaming.ServiceConstructors
|
||||
var ServiceConstructorLookupTable = map[ServiceType]ServiceConstructor{
|
||||
File: NewFileStreamingService,
|
||||
}
|
||||
if halt {
|
||||
// Halt the binary and allow Tendermint to receive the ResponseCommit
|
||||
// response with the commit ID hash. This will allow the node to successfully
|
||||
// restart and process blocks assuming the halt configuration has been
|
||||
// reset or moved to a more distant value.
|
||||
app.halt()
|
||||
}
|
||||
|
||||
// ServiceTypeFromString returns the streaming.ServiceConstructor corresponding to the provided name
|
||||
func ServiceTypeFromString(name string) (ServiceConstructor, error) {
|
||||
ssType := NewStreamingServiceType(name)
|
||||
if ssType == Unknown {
|
||||
return nil, fmt.Errorf("unrecognized streaming service name %s", name)
|
||||
}
|
||||
if constructor, ok := ServiceConstructorLookupTable[ssType]; ok {
|
||||
return constructor, nil
|
||||
}
|
||||
return nil, fmt.Errorf("streaming service constructor of type %s not found", ssType.String())
|
||||
}
|
||||
...
|
||||
|
||||
// NewFileStreamingService is the streaming.ServiceConstructor function for creating a FileStreamingService
|
||||
func NewFileStreamingService(opts serverTypes.AppOptions, keys []sdk.StoreKey, marshaller codec.BinaryMarshaler) (sdk.StreamingService, error) {
|
||||
filePrefix := cast.ToString(opts.Get("streamers.file.prefix"))
|
||||
fileDir := cast.ToString(opts.Get("streamers.file.write_dir"))
|
||||
return file.NewStreamingService(fileDir, filePrefix, keys, marshaller)
|
||||
}
|
||||
```
|
||||
|
||||
#### Example configuration
|
||||
#### Plugin system
|
||||
|
||||
As a demonstration, we will implement the state watching features as part of SimApp.
|
||||
For example, the below is a very rudimentary integration of the state listening features into the SimApp `AppCreator` function:
|
||||
We propose a plugin architecture to load and run `StreamingService` implementations. We will introduce a plugin
|
||||
loading/preloading system that is used to load, initialize, inject, run, and stop Cosmos-SDK plugins. Each plugin
|
||||
must implement the following interface:
|
||||
|
||||
```go
|
||||
// Plugin is the base interface for all kinds of cosmos-sdk plugins
|
||||
// It will be included in interfaces of different Plugins
|
||||
type Plugin interface {
|
||||
// Name should return unique name of the plugin
|
||||
Name() string
|
||||
|
||||
// Version returns current version of the plugin
|
||||
Version() string
|
||||
|
||||
// Init is called once when the Plugin is being loaded
|
||||
// The plugin is passed the AppOptions for configuration
|
||||
// A plugin will not necessarily have a functional Init
|
||||
Init(env serverTypes.AppOptions) error
|
||||
|
||||
// Closer interface for shutting down the plugin process
|
||||
io.Closer
|
||||
}
|
||||
```
|
||||
|
||||
The `Name` method returns a plugin's name.
|
||||
The `Version` method returns a plugin's version.
|
||||
The `Init` method initializes a plugin with the provided `AppOptions`.
|
||||
The io.Closer is used to shut down the plugin service.
|
||||
|
||||
For the purposes of this ADR we introduce a single kind of plugin- a state streaming plugin.
|
||||
We will define a `StateStreamingPlugin` interface which extends the above `Plugin` interface to support a state streaming service.
|
||||
|
||||
```go
|
||||
// StateStreamingPlugin interface for plugins that load a baseapp.StreamingService onto a baseapp.BaseApp
|
||||
type StateStreamingPlugin interface {
|
||||
// Register configures and registers the plugin streaming service with the BaseApp
|
||||
Register(bApp *baseapp.BaseApp, marshaller codec.BinaryCodec, keys map[string]*types.KVStoreKey) error
|
||||
|
||||
// Start starts the background streaming process of the plugin streaming service
|
||||
Start(wg *sync.WaitGroup)
|
||||
|
||||
// Plugin is the base Plugin interface
|
||||
Plugin
|
||||
}
|
||||
```
|
||||
|
||||
The `Register` method is used during App construction to register the plugin's streaming service with an App's BaseApp using the BaseApp's `SetStreamingService` method.
|
||||
The `Start` method is used during App construction to start the registered plugin streaming services and maintain synchronization with them.
|
||||
|
||||
e.g. in `NewSimApp`:
|
||||
|
||||
```go
|
||||
func NewSimApp(
|
||||
|
@ -588,6 +442,17 @@ func NewSimApp(
|
|||
|
||||
...
|
||||
|
||||
// this loads the preloaded and any plugins found in `plugins.dir`
|
||||
pluginLoader, err := loader.NewPluginLoader(appOpts, logger)
|
||||
if err != nil {
|
||||
// handle error
|
||||
}
|
||||
|
||||
// initialize the loaded plugins
|
||||
if err := pluginLoader.Initialize(); err != nil {
|
||||
// hanlde error
|
||||
}
|
||||
|
||||
keys := sdk.NewKVStoreKeys(
|
||||
authtypes.StoreKey, banktypes.StoreKey, stakingtypes.StoreKey,
|
||||
minttypes.StoreKey, distrtypes.StoreKey, slashingtypes.StoreKey,
|
||||
|
@ -595,46 +460,16 @@ func NewSimApp(
|
|||
evidencetypes.StoreKey, ibctransfertypes.StoreKey, capabilitytypes.StoreKey,
|
||||
)
|
||||
|
||||
// configure state listening capabilities using AppOptions
|
||||
listeners := cast.ToStringSlice(appOpts.Get("store.streamers"))
|
||||
for _, listenerName := range listeners {
|
||||
// get the store keys allowed to be exposed for this streaming service
|
||||
exposeKeyStrs := cast.ToStringSlice(appOpts.Get(fmt.Sprintf("streamers.%s.keys", streamerName)))
|
||||
var exposeStoreKeys []sdk.StoreKey
|
||||
if exposeAll(exposeKeyStrs) { // if list contains `*`, expose all StoreKeys
|
||||
exposeStoreKeys = make([]sdk.StoreKey, 0, len(keys))
|
||||
for _, storeKey := range keys {
|
||||
exposeStoreKeys = append(exposeStoreKeys, storeKey)
|
||||
}
|
||||
} else {
|
||||
exposeStoreKeys = make([]sdk.StoreKey, 0, len(exposeKeyStrs))
|
||||
for _, keyStr := range exposeKeyStrs {
|
||||
if storeKey, ok := keys[keyStr]; ok {
|
||||
exposeStoreKeys = append(exposeStoreKeys, storeKey)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(exposeStoreKeys) == 0 { // short circuit if we are not exposing anything
|
||||
continue
|
||||
}
|
||||
// get the constructor for this listener name
|
||||
constructor, err := baseapp.NewStreamingServiceConstructor(listenerName)
|
||||
if err != nil {
|
||||
tmos.Exit(err.Error()) // or continue?
|
||||
}
|
||||
// generate the streaming service using the constructor, appOptions, and the StoreKeys we want to expose
|
||||
streamingService, err := constructor(appOpts, exposeStoreKeys, appCodec)
|
||||
if err != nil {
|
||||
tmos.Exit(err.Error())
|
||||
}
|
||||
// register the streaming service with the BaseApp
|
||||
bApp.RegisterStreamingService(streamingService)
|
||||
// waitgroup and quit channel for optional shutdown coordination of the streaming service
|
||||
wg := new(sync.WaitGroup)
|
||||
quitChan := make(chan struct{}))
|
||||
// kick off the background streaming service loop
|
||||
streamingService.Stream(wg, quitChan) // maybe this should be done from inside BaseApp instead?
|
||||
}
|
||||
// register the plugin(s) with the BaseApp
|
||||
if err := pluginLoader.Inject(bApp, appCodec, keys); err != nil {
|
||||
// handle error
|
||||
}
|
||||
|
||||
// start the plugin services, optionally use wg to synchronize shutdown using io.Closer
|
||||
wg := new(sync.WaitGroup)
|
||||
if err := pluginLoader.Start(wg); err != nil {
|
||||
// handler error
|
||||
}
|
||||
|
||||
...
|
||||
|
||||
|
@ -642,6 +477,58 @@ func NewSimApp(
|
|||
}
|
||||
```
|
||||
|
||||
|
||||
#### Configuration
|
||||
|
||||
The plugin system will be configured within an app's app.toml file.
|
||||
|
||||
```toml
|
||||
[plugins]
|
||||
on = false # turn the plugin system, as a whole, on or off
|
||||
disabled = ["list", "of", "plugin", "names", "to", "disable"]
|
||||
dir = "the directory to load non-preloaded plugins from; defaults to cosmos-sdk/plugin/plugins"
|
||||
```
|
||||
|
||||
There will be three parameters for configuring the plugin system: `plugins.on`, `plugins.disabled` and `plugins.dir`.
|
||||
`plugins.on` is a bool that turns on or off the plugin system at large, `plugins.dir` directs the system to a directory
|
||||
to load plugins from, and `plugins.disabled` is a list of names for the plugins we want to disable (useful for disabling preloaded plugins).
|
||||
|
||||
Configuration of a given plugin is ultimately specific to the plugin, but we will introduce some standards here:
|
||||
|
||||
Plugin TOML configuration should be split into separate sub-tables for each kind of plugin (e.g. `plugins.streaming`).
|
||||
Within these sub-tables, the parameters for a specific plugin of that kind are included in another sub-table (e.g. `plugins.streaming.file`).
|
||||
It is generally expected, but not required, that a streaming service plugin can be configured with a set of store keys
|
||||
(e.g. `plugins.streaming.file.keys`) for the stores it listens to and a mode (e.g. `plugins.streaming.file.mode`)
|
||||
that signifies whether the service operates in a fire-and-forget capacity (`faf`) or the BaseApp should require positive
|
||||
acknowledgement of message receipt by the service (`ack`).
|
||||
|
||||
e.g.
|
||||
|
||||
```toml
|
||||
[plugins]
|
||||
on = false # turn the plugin system, as a whole, on or off
|
||||
disabled = ["list", "of", "plugin", "names", "to", "disable"]
|
||||
dir = "the directory to load non-preloaded plugins from; defaults to "
|
||||
[plugins.streaming] # a mapping of plugin-specific streaming service parameters, mapped to their plugin name
|
||||
[plugins.streaming.file] # the specific parameters for the file streaming service plugin
|
||||
keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"]
|
||||
writeDir = "path to the write directory"
|
||||
prefix = "optional prefix to prepend to the generated file names"
|
||||
mode = "faf" # faf == fire-and-forget; ack == require positive acknowledge of receipt
|
||||
[plugins.streaming.kafka]
|
||||
...
|
||||
[plugins.modules]
|
||||
...
|
||||
```
|
||||
|
||||
#### Encoding and decoding streams
|
||||
|
||||
ADR-038 introduces the interfaces and types for streaming state changes out from KVStores, associating this
|
||||
data with their related ABCI requests and responses, and registering a service for consuming this data and streaming it to some destination in a final format.
|
||||
Instead of prescribing a final data format in this ADR, it is left to a specific plugin implementation to define and document this format.
|
||||
We take this approach because flexibility in the final format is necessary to support a wide range of streaming service plugins. For example,
|
||||
the data format for a streaming service that writes the data out to a set of files will differ from the data format that is written to a Kafka topic.
|
||||
|
||||
## Consequences
|
||||
|
||||
These changes will provide a means of subscribing to KVStore state changes in real time.
|
||||
|
|
Loading…
Reference in New Issue