feat: ADR-038 Part 2: StreamingService interface, file writing implementation, and configuration (#8664)

<!-- < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < ☺
v                               ✰  Thanks for creating a PR! ✰
v    Before smashing the submit button please review the checkboxes.
v    If a checkbox is n/a - please still include it but + a little note why
☺ > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >  -->

## Description

<!-- Add a description of the changes that this PR introduces and the files that
are the most critical to review.
-->

Hello 👋 this PR introduces the second stage of changes to support [ADR-038](https://github.com/cosmos/cosmos-sdk/pull/8012) state listening. This is rebased on top of the [first segment](https://github.com/cosmos/cosmos-sdk/pull/8551), which introduces the low level changes to the MultiStore and KVStore interfaces and implementations, the new WriteListener types, and the new listen.KVStore type.

In this segment we introduce the StreamingService interface, an implementation that writes out to files, and it's integration and configuration at the BaseApp level.

The idea was to have the first segment reviewed independently first but if people think it is easier/more time efficient to review both at the same time then we could start here.

Thanks!



This work is towards satisfying [ADR-038](https://github.com/cosmos/cosmos-sdk/blob/master/docs/architecture/adr-038-state-listening.md)

---

Before we can merge this PR, please make sure that all the following items have been
checked off. If any of the checklist items are not applicable, please leave them but
write a little note why.

- [x] Targeted PR against correct branch (see [CONTRIBUTING.md](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#pr-targeting))
- [x] Linked to Github issue with discussion and accepted design OR link to spec that describes this work.
- [x] Code follows the [module structure standards](https://github.com/cosmos/cosmos-sdk/blob/master/docs/building-modules/structure.md).
- [x] Wrote unit and integration [tests](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#testing)
- [x] Updated relevant documentation (`docs/`) or specification (`x/<module>/spec/`)
- [x] Added relevant `godoc` [comments](https://blog.golang.org/godoc-documenting-go-code).
- [x] Added a relevant changelog entry to the `Unreleased` section in `CHANGELOG.md`
- [x] Re-reviewed `Files changed` in the Github PR explorer
- [x] Review `Codecov Report` in the comment section below once CI passes
This commit is contained in:
Ian Norden 2021-10-24 16:37:37 -05:00 committed by GitHub
parent 8d70efa799
commit 1326fa2a7d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 1222 additions and 86 deletions

View File

@ -46,6 +46,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* [\#9776](https://github.com/cosmos/cosmos-sdk/pull/9776) Add flag `staking-bond-denom` to specify the staking bond denomination value when initializing a new chain.
* [\#9533](https://github.com/cosmos/cosmos-sdk/pull/9533) Added a new gRPC method, `DenomOwners`, in `x/bank` to query for all account holders of a specific denomination.
* (bank) [\#9618](https://github.com/cosmos/cosmos-sdk/pull/9618) Update bank.Metadata: add URI and URIHash attributes.
* (store) [\#8664](https://github.com/cosmos/cosmos-sdk/pull/8664) Implementation of ADR-038 file StreamingService
* [\#9837](https://github.com/cosmos/cosmos-sdk/issues/9837) `--generate-only` flag will accept the keyname now.
* [\#10045](https://github.com/cosmos/cosmos-sdk/pull/10045) Revert [#8549](https://github.com/cosmos/cosmos-sdk/pull/8549). Do not route grpc queries through Tendermint.
* [\#10326](https://github.com/cosmos/cosmos-sdk/pull/10326) `x/authz` add query all grants by granter query.

View File

@ -195,6 +195,14 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg
}
// set the signed validators for addition to context in deliverTx
app.voteInfos = req.LastCommitInfo.GetVotes()
// call the hooks with the BeginBlock messages
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenBeginBlock(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("BeginBlock listening hook failed", "height", req.Header.Height, "err", err)
}
}
return res
}
@ -215,6 +223,13 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc
res.ConsensusParamUpdates = cp
}
// call the streaming service hooks with the EndBlock messages
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenEndBlock(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("EndBlock listening hook failed", "height", req.Height, "err", err)
}
}
return res
}
@ -262,15 +277,25 @@ func (app *BaseApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx {
defer telemetry.MeasureSince(time.Now(), "abci", "deliver_tx")
var res abci.ResponseDeliverTx
defer func() {
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenDeliverTx(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("DeliverTx listening hook failed", "err", err)
}
}
}()
tx, err := app.txDecoder(req.Tx)
if err != nil {
return sdkerrors.ResponseDeliverTx(err, 0, 0, app.trace)
res = sdkerrors.ResponseDeliverTx(err, 0, 0, app.trace)
return res
}
ctx := app.getContextForTx(runTxModeDeliver, req.Tx)
res, err := app.txHandler.DeliverTx(ctx, tx, req)
res, err = app.txHandler.DeliverTx(ctx, tx, req)
if err != nil {
return sdkerrors.ResponseDeliverTx(err, uint64(res.GasUsed), uint64(res.GasWanted), app.trace)
res = sdkerrors.ResponseDeliverTx(err, uint64(res.GasUsed), uint64(res.GasWanted), app.trace)
return res
}
return res

View File

@ -125,6 +125,10 @@ type BaseApp struct { // nolint: maligned
// indexEvents defines the set of events in the form {eventType}.{attributeKey},
// which informs Tendermint what to index. If empty, all events will be indexed.
indexEvents map[string]struct{}
// abciListeners for hooking into the ABCI message processing of the BaseApp
// and exposing the requests and responses to external consumers
abciListeners []ABCIListener
}
// NewBaseApp returns a reference to an initialized BaseApp. It accepts a

View File

@ -229,3 +229,14 @@ func (app *BaseApp) SetInterfaceRegistry(registry types.InterfaceRegistry) {
app.interfaceRegistry = registry
app.grpcQueryRouter.SetInterfaceRegistry(registry)
}
// SetStreamingService is used to set a streaming service into the BaseApp hooks and load the listeners into the multistore
func (app *BaseApp) SetStreamingService(s StreamingService) {
// add the listeners for each StoreKey
for key, lis := range s.Listeners() {
app.cms.AddListeners(key, lis)
}
// register the StreamingService within the BaseApp
// BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context
app.abciListeners = append(app.abciListeners, s)
}

33
baseapp/streaming.go Normal file
View File

@ -0,0 +1,33 @@
package baseapp
import (
"io"
"sync"
abci "github.com/tendermint/tendermint/abci/types"
store "github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/cosmos-sdk/types"
)
// ABCIListener interface used to hook into the ABCI message processing of the BaseApp
type ABCIListener interface {
// ListenBeginBlock updates the streaming service with the latest BeginBlock messages
ListenBeginBlock(ctx types.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error
// ListenEndBlock updates the steaming service with the latest EndBlock messages
ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error
// ListenDeliverTx updates the steaming service with the latest DeliverTx messages
ListenDeliverTx(ctx types.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error
}
// StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks
type StreamingService interface {
// Stream is the streaming service loop, awaits kv pairs and writes them to some destination stream or file
Stream(wg *sync.WaitGroup) error
// Listeners returns the streaming service's listeners for the BaseApp to register
Listeners() map[store.StoreKey][]store.WriteListener
// ABCIListener interface for hooking into the ABCI messages from inside the BaseApp
ABCIListener
// Closer interface
io.Closer
}

View File

@ -32,7 +32,7 @@ In a new file, `store/types/listening.go`, we will create a `WriteListener` inte
type WriteListener interface {
// if value is nil then it was deleted
// storeKey indicates the source KVStore, to facilitate using the the same WriteListener across separate KVStores
// set bool indicates if it was a set; true: set, false: delete
// delete bool indicates if it was a delete; true: delete, false: set
OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error
}
```
@ -205,20 +205,30 @@ func (rs *Store) CacheMultiStore() types.CacheMultiStore {
### Exposing the data
We will introduce a new `StreamingService` interface for exposing `WriteListener` data streams to external consumers.
In addition to streaming state changes as `StoreKVPair`s, the interface satisfies an `ABCIListener` interface that plugs into the BaseApp
and relays ABCI requests and responses so that the service can group the state changes with the ABCI requests that affected them and the ABCI responses they affected.
```go
// Hook interface used to hook into the ABCI message processing of the BaseApp
type Hook interface {
ListenBeginBlock(ctx sdk.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) // update the streaming service with the latest BeginBlock messages
ListenEndBlock(ctx sdk.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) // update the steaming service with the latest EndBlock messages
ListenDeliverTx(ctx sdk.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) // update the steaming service with the latest DeliverTx messages
// ABCIListener interface used to hook into the ABCI message processing of the BaseApp
type ABCIListener interface {
// ListenBeginBlock updates the streaming service with the latest BeginBlock messages
ListenBeginBlock(ctx types.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error
// ListenEndBlock updates the steaming service with the latest EndBlock messages
ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error
// ListenDeliverTx updates the steaming service with the latest DeliverTx messages
ListenDeliverTx(ctx types.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error
}
// StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks
type StreamingService interface {
Stream(wg *sync.WaitGroup, quitChan <-chan struct{}) // streaming service loop, awaits kv pairs and writes them to some destination stream or file
Listeners() map[sdk.StoreKey][]storeTypes.WriteListener // returns the streaming service's listeners for the BaseApp to register
Hook
// Stream is the streaming service loop, awaits kv pairs and writes them to some destination stream or file
Stream(wg *sync.WaitGroup) error
// Listeners returns the streaming service's listeners for the BaseApp to register
Listeners() map[types.StoreKey][]store.WriteListener
// ABCIListener interface for hooking into the ABCI messages from inside the BaseApp
ABCIListener
// Closer interface
io.Closer
}
```
@ -228,18 +238,45 @@ We will introduce an implementation of `StreamingService` which writes state cha
This service uses the same `StoreKVPairWriteListener` for every KVStore, writing all the KV pairs from every KVStore
out to the same files, relying on the `StoreKey` field in the `StoreKVPair` protobuf message to later distinguish the source for each pair.
The file naming schema is as such:
Writing to a file is the simplest approach for streaming the data out to consumers.
This approach also provides the advantages of being persistent and durable, and the files can be read directly,
or an auxiliary streaming services can read from the files and serve the data over a remote interface.
* After every `BeginBlock` request a new file is created with the name `block-{N}-begin`, where N is the block number. All
subsequent state changes are written out to this file until the first `DeliverTx` request is received. At the head of these files,
the length-prefixed protobuf encoded `BeginBlock` request is written, and the response is written at the tail.
* After every `DeliverTx` request a new file is created with the name `block-{N}-tx-{M}` where N is the block number and M
is the tx number in the block (i.e. 0, 1, 2...). All subsequent state changes are written out to this file until the next
`DeliverTx` request is received or an `EndBlock` request is received. At the head of these files, the length-prefixed protobuf
encoded `DeliverTx` request is written, and the response is written at the tail.
* After every `EndBlock` request a new file is created with the name `block-{N}-end`, where N is the block number. All
subsequent state changes are written out to this file until the next `BeginBlock` request is received. At the head of these files,
the length-prefixed protobuf encoded `EndBlock` request is written, and the response is written at the tail.
##### Encoding
For each pair of `BeginBlock` requests and responses, a file is created and named `block-{N}-begin`, where N is the block number.
At the head of this file the length-prefixed protobuf encoded `BeginBlock` request is written.
At the tail of this file the length-prefixed protobuf encoded `BeginBlock` response is written.
In between these two encoded messages, the state changes that occurred due to the `BeginBlock` request are written chronologically as
a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores the service
is configured to listen to.
For each pair of `DeliverTx` requests and responses, a file is created and named `block-{N}-tx-{M}` where N is the block number and M
is the tx number in the block (i.e. 0, 1, 2...).
At the head of this file the length-prefixed protobuf encoded `DeliverTx` request is written.
At the tail of this file the length-prefixed protobuf encoded `DeliverTx` response is written.
In between these two encoded messages, the state changes that occurred due to the `DeliverTx` request are written chronologically as
a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores the service
is configured to listen to.
For each pair of `EndBlock` requests and responses, a file is created and named `block-{N}-end`, where N is the block number.
At the head of this file the length-prefixed protobuf encoded `EndBlock` request is written.
At the tail of this file the length-prefixed protobuf encoded `EndBlock` response is written.
In between these two encoded messages, the state changes that occurred due to the `EndBlock` request are written chronologically as
a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores the service
is configured to listen to.
##### Decoding
To decode the files written in the above format we read all the bytes from a given file into memory and segment them into proto
messages based on the length-prefixing of each message. Once segmented, it is known that the first message is the ABCI request,
the last message is the ABCI response, and that every message in between is a `StoreKVPair`. This enables us to decode each segment into
the appropriate message type.
The type of ABCI req/res, the block height, and the transaction index (where relevant) is known
from the file name, and the KVStore each `StoreKVPair` originates from is known since the `StoreKey` is included as a field in the proto message.
##### Implementation example
```go
// FileStreamingService is a concrete implementation of StreamingService that writes state changes out to a file
@ -357,10 +394,6 @@ func (fss *FileStreamingService) Stream(wg *sync.WaitGroup, quitChan <-chan stru
}
```
Writing to a file is the simplest approach for streaming the data out to consumers.
This approach also provides the advantages of being persistent and durable, and the files can be read directly,
or an auxiliary streaming services can read from the files and serve the data over a remote interface.
#### Auxiliary streaming service
We will create a separate standalone process that reads and internally queues the state as it is written out to these files
@ -384,8 +417,8 @@ using the provided `AppOptions` and TOML configuration fields.
We will add a new method to the `BaseApp` to enable the registration of `StreamingService`s:
```go
// RegisterStreamingService is used to register a streaming service with the BaseApp
func (app *BaseApp) RegisterHooks(s StreamingService) {
// SetStreamingService is used to register a streaming service with the BaseApp
func (app *BaseApp) SetStreamingService(s StreamingService) {
// set the listeners for each StoreKey
for key, lis := range s.Listeners() {
app.cms.AddListeners(key, lis)
@ -474,28 +507,30 @@ Note: the actual namespace is TBD.
[streamers]
[streamers.file]
keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"]
writeDir = "path to the write directory"
write_dir = "path to the write directory"
prefix = "optional prefix to prepend to the generated file names"
```
We will also provide a mapping of the TOML `store.streamers` "file" configuration option to a helper functions for constructing the specified
streaming service. In the future, as other streaming services are added, their constructors will be added here as well.
```go
// StreamingServiceConstructor is used to construct a streaming service
type StreamingServiceConstructor func(opts servertypes.AppOptions, keys []sdk.StoreKey) (StreamingService, error)
Each configured streamer will receive the
// StreamingServiceType enum for specifying the type of StreamingService
type StreamingServiceType int
```go
// ServiceConstructor is used to construct a streaming service
type ServiceConstructor func(opts serverTypes.AppOptions, keys []sdk.StoreKey, marshaller codec.BinaryMarshaler) (sdk.StreamingService, error)
// ServiceType enum for specifying the type of StreamingService
type ServiceType int
const (
Unknown StreamingServiceType = iota
Unknown ServiceType = iota
File
// add more in the future
)
// NewStreamingServiceType returns the StreamingServiceType corresponding to the provided name
func NewStreamingServiceType(name string) StreamingServiceType {
// NewStreamingServiceType returns the streaming.ServiceType corresponding to the provided name
func NewStreamingServiceType(name string) ServiceType {
switch strings.ToLower(name) {
case "file", "f":
return File
@ -504,8 +539,8 @@ func NewStreamingServiceType(name string) StreamingServiceType {
}
}
// String returns the string name of a StreamingServiceType
func (sst StreamingServiceType) String() string {
// String returns the string name of a streaming.ServiceType
func (sst ServiceType) String() string {
switch sst {
case File:
return "file"
@ -514,28 +549,28 @@ func (sst StreamingServiceType) String() string {
}
}
// StreamingServiceConstructorLookupTable is a mapping of StreamingServiceTypes to StreamingServiceConstructors
var StreamingServiceConstructorLookupTable = map[StreamingServiceType]StreamingServiceConstructor{
File: FileStreamingConstructor,
// ServiceConstructorLookupTable is a mapping of streaming.ServiceTypes to streaming.ServiceConstructors
var ServiceConstructorLookupTable = map[ServiceType]ServiceConstructor{
File: NewFileStreamingService,
}
// NewStreamingServiceConstructor returns the StreamingServiceConstructor corresponding to the provided name
func NewStreamingServiceConstructor(name string) (StreamingServiceConstructor, error) {
// ServiceTypeFromString returns the streaming.ServiceConstructor corresponding to the provided name
func ServiceTypeFromString(name string) (ServiceConstructor, error) {
ssType := NewStreamingServiceType(name)
if ssType == Unknown {
return nil, fmt.Errorf("unrecognized streaming service name %s", name)
}
if constructor, ok := StreamingServiceConstructorLookupTable[ssType]; ok {
if constructor, ok := ServiceConstructorLookupTable[ssType]; ok {
return constructor, nil
}
return nil, fmt.Errorf("streaming service constructor of type %s not found", ssType.String())
}
// FileStreamingConstructor is the StreamingServiceConstructor function for creating a FileStreamingService
func FileStreamingConstructor(opts servertypes.AppOptions, keys []sdk.StoreKey) (StreamingService, error) {
// NewFileStreamingService is the streaming.ServiceConstructor function for creating a FileStreamingService
func NewFileStreamingService(opts serverTypes.AppOptions, keys []sdk.StoreKey, marshaller codec.BinaryMarshaler) (sdk.StreamingService, error) {
filePrefix := cast.ToString(opts.Get("streamers.file.prefix"))
fileDir := cast.ToString(opts.Get("streamers.file.writeDir"))
return streaming.NewFileStreamingService(fileDir, filePrefix, keys), nil
fileDir := cast.ToString(opts.Get("streamers.file.write_dir"))
return file.NewStreamingService(fileDir, filePrefix, keys, marshaller)
}
```
@ -563,21 +598,32 @@ func NewSimApp(
// configure state listening capabilities using AppOptions
listeners := cast.ToStringSlice(appOpts.Get("store.streamers"))
for _, listenerName := range listeners {
// get the store keys allowed to be exposed for this streaming service/state listeners
exposeKeyStrs := cast.ToStringSlice(appOpts.Get(fmt.Sprintf("streamers.%s.keys", listenerName))
exposeStoreKeys = make([]storeTypes.StoreKey, 0, len(exposeKeyStrs))
// get the store keys allowed to be exposed for this streaming service
exposeKeyStrs := cast.ToStringSlice(appOpts.Get(fmt.Sprintf("streamers.%s.keys", streamerName)))
var exposeStoreKeys []sdk.StoreKey
if exposeAll(exposeKeyStrs) { // if list contains `*`, expose all StoreKeys
exposeStoreKeys = make([]sdk.StoreKey, 0, len(keys))
for _, storeKey := range keys {
exposeStoreKeys = append(exposeStoreKeys, storeKey)
}
} else {
exposeStoreKeys = make([]sdk.StoreKey, 0, len(exposeKeyStrs))
for _, keyStr := range exposeKeyStrs {
if storeKey, ok := keys[keyStr]; ok {
exposeStoreKeys = append(exposeStoreKeys, storeKey)
}
}
}
if len(exposeStoreKeys) == 0 { // short circuit if we are not exposing anything
continue
}
// get the constructor for this listener name
constructor, err := baseapp.NewStreamingServiceConstructor(listenerName)
if err != nil {
tmos.Exit(err.Error()) // or continue?
}
// generate the streaming service using the constructor, appOptions, and the StoreKeys we want to expose
streamingService, err := constructor(appOpts, exposeStoreKeys)
streamingService, err := constructor(appOpts, exposeStoreKeys, appCodec)
if err != nil {
tmos.Exit(err.Error())
}
@ -585,7 +631,7 @@ func NewSimApp(
bApp.RegisterStreamingService(streamingService)
// waitgroup and quit channel for optional shutdown coordination of the streaming service
wg := new(sync.WaitGroup)
quitChan := new(chan struct{}))
quitChan := make(chan struct{}))
// kick off the background streaming service loop
streamingService.Stream(wg, quitChan) // maybe this should be done from inside BaseApp instead?
}

View File

@ -222,6 +222,16 @@ When `Store.{Get, Set}()` is called, the store forwards the call to its parent,
When `Store.Iterator()` is called, it does not simply prefix the `Store.prefix`, since it does not work as intended. In that case, some of the elements are traversed even they are not starting with the prefix.
### `ListenKv` Store
`listenkv.Store` is a wrapper `KVStore` which provides state listening capabilities over the underlying `KVStore`.
It is applied automatically by the Cosmos SDK on any `KVStore` whose `StoreKey` is specified during state streaming configuration.
Additional information about state streaming configuration can be found in the [store/streaming/README.md](../../store/streaming/README.md).
+++ https://github.com/cosmos/cosmos-sdk/blob/v0.44.1/store/listenkv/store.go#L11-L18
When `KVStore.Set` or `KVStore.Delete` methods are called, `listenkv.Store` automatically writes the operations to the set of `Store.listeners`.
## New Store package (`store/v2`)
The SDK is in the process of transitioning to use the types listed here as the default interface for state storage. At the time of writing, these cannot be used within an application and are not directly compatible with the `CommitMultiStore` and related types.

View File

@ -25,6 +25,7 @@ import (
"github.com/cosmos/cosmos-sdk/server/config"
servertypes "github.com/cosmos/cosmos-sdk/server/types"
simappparams "github.com/cosmos/cosmos-sdk/simapp/params"
"github.com/cosmos/cosmos-sdk/store/streaming"
"github.com/cosmos/cosmos-sdk/testutil/testdata"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/module"
@ -215,6 +216,12 @@ func NewSimApp(
// not include this key.
memKeys := sdk.NewMemoryStoreKeys(capabilitytypes.MemStoreKey, "testingkey")
// configure state listening capabilities using AppOptions
// we are doing nothing with the returned streamingServices and waitGroup in this case
if _, _, err := streaming.LoadStreamingServices(bApp, appOpts, appCodec, keys); err != nil {
tmos.Exit(err.Error())
}
app := &SimApp{
BaseApp: bApp,
legacyAmino: legacyAmino,

View File

@ -8,6 +8,8 @@ import (
"github.com/cosmos/cosmos-sdk/store/cachekv"
"github.com/cosmos/cosmos-sdk/store/dbadapter"
"github.com/cosmos/cosmos-sdk/store/listenkv"
"github.com/cosmos/cosmos-sdk/store/tracekv"
"github.com/cosmos/cosmos-sdk/store/types"
)
@ -49,17 +51,13 @@ func NewFromKVStore(
}
for key, store := range stores {
var cacheWrapped types.CacheWrap
if cms.TracingEnabled() {
cacheWrapped = store.CacheWrapWithTrace(cms.traceWriter, cms.traceContext)
} else {
cacheWrapped = store.CacheWrap()
store = tracekv.NewStore(store.(types.KVStore), cms.traceWriter, cms.traceContext)
}
if cms.ListeningEnabled(key) {
cms.stores[key] = cacheWrapped.CacheWrapWithListeners(key, cms.listeners[key])
} else {
cms.stores[key] = cacheWrapped
store = listenkv.NewStore(store.(types.KVStore), key, listeners[key])
}
cms.stores[key] = cachekv.NewStore(store.(types.KVStore))
}
return cms

67
store/streaming/README.md Normal file
View File

@ -0,0 +1,67 @@
# State Streaming Service
This package contains the constructors for the `StreamingService`s used to write state changes out from individual KVStores to a
file or stream, as described in [ADR-038](../../docs/architecture/adr-038-state-listening.md) and defined in [types/streaming.go](../../baseapp/streaming.go).
The child directories contain the implementations for specific output destinations.
Currently, a `StreamingService` implementation that writes state changes out to files is supported, in the future support for additional
output destinations can be added.
The `StreamingService` is configured from within an App using the `AppOptions` loaded from the app.toml file:
```toml
[store]
streamers = [ # if len(streamers) > 0 we are streaming
"file", # name of the streaming service, used by constructor
]
[streamers]
[streamers.file]
keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"]
write_dir = "path to the write directory"
prefix = "optional prefix to prepend to the generated file names"
```
`store.streamers` contains a list of the names of the `StreamingService` implementations to employ which are used by `ServiceTypeFromString`
to return the `ServiceConstructor` for that particular implementation:
```go
listeners := cast.ToStringSlice(appOpts.Get("store.streamers"))
for _, listenerName := range listeners {
constructor, err := ServiceTypeFromString(listenerName)
if err != nil {
// handle error
}
}
```
`streamers` contains a mapping of the specific `StreamingService` implementation name to the configuration parameters for that specific service.
`streamers.x.keys` contains the list of `StoreKey` names for the KVStores to expose using this service and is required by every type of `StreamingService`.
In order to expose *all* KVStores, we can include `*` in this list. An empty list is equivalent to turning the service off.
Additional configuration parameters are optional and specific to the implementation.
In the case of the file streaming service, `streamers.file.write_dir` contains the path to the
directory to write the files to, and `streamers.file.prefix` contains an optional prefix to prepend to the output files to prevent potential collisions
with other App `StreamingService` output files.
The `ServiceConstructor` accepts `AppOptions`, the store keys collected using `streamers.x.keys`, a `BinaryMarshaller` and
returns a `StreamingService` implementation. The `AppOptions` are passed in to provide access to any implementation specific configuration options,
e.g. in the case of the file streaming service the `streamers.file.write_dir` and `streamers.file.prefix`.
```go
streamingService, err := constructor(appOpts, exposeStoreKeys, appCodec)
if err != nil {
// handler error
}
```
The returned `StreamingService` is loaded into the BaseApp using the BaseApp's `SetStreamingService` method.
The `Stream` method is called on the service to begin the streaming process. Depending on the implementation this process
may be synchronous or asynchronous with the message processing of the state machine.
```go
bApp.SetStreamingService(streamingService)
wg := new(sync.WaitGroup)
quitChan := make(chan struct{})
streamingService.Stream(wg, quitChan)
```

View File

@ -0,0 +1,137 @@
package streaming
import (
"fmt"
"strings"
"sync"
"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/cosmos/cosmos-sdk/codec"
serverTypes "github.com/cosmos/cosmos-sdk/server/types"
"github.com/cosmos/cosmos-sdk/store/streaming/file"
"github.com/cosmos/cosmos-sdk/store/types"
"github.com/spf13/cast"
)
// ServiceConstructor is used to construct a streaming service
type ServiceConstructor func(opts serverTypes.AppOptions, keys []types.StoreKey, marshaller codec.BinaryCodec) (baseapp.StreamingService, error)
// ServiceType enum for specifying the type of StreamingService
type ServiceType int
const (
Unknown ServiceType = iota
File
// add more in the future
)
// ServiceTypeFromString returns the streaming.ServiceType corresponding to the provided name
func ServiceTypeFromString(name string) ServiceType {
switch strings.ToLower(name) {
case "file", "f":
return File
default:
return Unknown
}
}
// String returns the string name of a streaming.ServiceType
func (sst ServiceType) String() string {
switch sst {
case File:
return "file"
default:
return "unknown"
}
}
// ServiceConstructorLookupTable is a mapping of streaming.ServiceTypes to streaming.ServiceConstructors
var ServiceConstructorLookupTable = map[ServiceType]ServiceConstructor{
File: NewFileStreamingService,
}
// NewServiceConstructor returns the streaming.ServiceConstructor corresponding to the provided name
func NewServiceConstructor(name string) (ServiceConstructor, error) {
ssType := ServiceTypeFromString(name)
if ssType == Unknown {
return nil, fmt.Errorf("unrecognized streaming service name %s", name)
}
if constructor, ok := ServiceConstructorLookupTable[ssType]; ok && constructor != nil {
return constructor, nil
}
return nil, fmt.Errorf("streaming service constructor of type %s not found", ssType.String())
}
// NewFileStreamingService is the streaming.ServiceConstructor function for creating a FileStreamingService
func NewFileStreamingService(opts serverTypes.AppOptions, keys []types.StoreKey, marshaller codec.BinaryCodec) (baseapp.StreamingService, error) {
filePrefix := cast.ToString(opts.Get("streamers.file.prefix"))
fileDir := cast.ToString(opts.Get("streamers.file.write_dir"))
return file.NewStreamingService(fileDir, filePrefix, keys, marshaller)
}
// LoadStreamingServices is a function for loading StreamingServices onto the BaseApp using the provided AppOptions, codec, and keys
// It returns the WaitGroup and quit channel used to synchronize with the streaming services and any error that occurs during the setup
func LoadStreamingServices(bApp *baseapp.BaseApp, appOpts serverTypes.AppOptions, appCodec codec.BinaryCodec, keys map[string]*types.KVStoreKey) ([]baseapp.StreamingService, *sync.WaitGroup, error) {
// waitgroup and quit channel for optional shutdown coordination of the streaming service(s)
wg := new(sync.WaitGroup)
// configure state listening capabilities using AppOptions
streamers := cast.ToStringSlice(appOpts.Get("store.streamers"))
activeStreamers := make([]baseapp.StreamingService, 0, len(streamers))
for _, streamerName := range streamers {
// get the store keys allowed to be exposed for this streaming service
exposeKeyStrs := cast.ToStringSlice(appOpts.Get(fmt.Sprintf("streamers.%s.keys", streamerName)))
var exposeStoreKeys []types.StoreKey
if exposeAll(exposeKeyStrs) { // if list contains `*`, expose all StoreKeys
exposeStoreKeys = make([]types.StoreKey, 0, len(keys))
for _, storeKey := range keys {
exposeStoreKeys = append(exposeStoreKeys, storeKey)
}
} else {
exposeStoreKeys = make([]types.StoreKey, 0, len(exposeKeyStrs))
for _, keyStr := range exposeKeyStrs {
if storeKey, ok := keys[keyStr]; ok {
exposeStoreKeys = append(exposeStoreKeys, storeKey)
}
}
}
if len(exposeStoreKeys) == 0 { // short circuit if we are not exposing anything
continue
}
// get the constructor for this streamer name
constructor, err := NewServiceConstructor(streamerName)
if err != nil {
// close any services we may have already spun up before hitting the error on this one
for _, activeStreamer := range activeStreamers {
activeStreamer.Close()
}
return nil, nil, err
}
// generate the streaming service using the constructor, appOptions, and the StoreKeys we want to expose
streamingService, err := constructor(appOpts, exposeStoreKeys, appCodec)
if err != nil {
// close any services we may have already spun up before hitting the error on this one
for _, activeStreamer := range activeStreamers {
activeStreamer.Close()
}
return nil, nil, err
}
// register the streaming service with the BaseApp
bApp.SetStreamingService(streamingService)
// kick off the background streaming service loop
streamingService.Stream(wg)
// add to the list of active streamers
activeStreamers = append(activeStreamers, streamingService)
}
// if there are no active streamers, activeStreamers is empty (len == 0) and the waitGroup is not waiting on anything
return activeStreamers, wg, nil
}
func exposeAll(list []string) bool {
for _, ele := range list {
if ele == "*" {
return true
}
}
return false
}

View File

@ -0,0 +1,43 @@
package streaming
import (
"testing"
"github.com/cosmos/cosmos-sdk/codec"
codecTypes "github.com/cosmos/cosmos-sdk/codec/types"
"github.com/cosmos/cosmos-sdk/store/streaming/file"
"github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/stretchr/testify/require"
)
type fakeOptions struct{}
func (f *fakeOptions) Get(string) interface{} { return nil }
var (
mockOptions = new(fakeOptions)
mockKeys = []types.StoreKey{sdk.NewKVStoreKey("mockKey1"), sdk.NewKVStoreKey("mockKey2")}
interfaceRegistry = codecTypes.NewInterfaceRegistry()
testMarshaller = codec.NewProtoCodec(interfaceRegistry)
)
func TestStreamingServiceConstructor(t *testing.T) {
_, err := NewServiceConstructor("unexpectedName")
require.NotNil(t, err)
constructor, err := NewServiceConstructor("file")
require.Nil(t, err)
var expectedType ServiceConstructor
require.IsType(t, expectedType, constructor)
serv, err := constructor(mockOptions, mockKeys, testMarshaller)
require.Nil(t, err)
require.IsType(t, &file.StreamingService{}, serv)
listeners := serv.Listeners()
for _, key := range mockKeys {
_, ok := listeners[key]
require.True(t, ok)
}
}

View File

@ -0,0 +1,64 @@
# File Streaming Service
This pkg contains an implementation of the [StreamingService](../../../baseapp/streaming.go) that writes
the data stream out to files on the local filesystem. This process is performed synchronously with the message processing
of the state machine.
## Configuration
The `file.StreamingService` is configured from within an App using the `AppOptions` loaded from the app.toml file:
```toml
[store]
streamers = [ # if len(streamers) > 0 we are streaming
"file", # name of the streaming service, used by constructor
]
[streamers]
[streamers.file]
keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"]
write_dir = "path to the write directory"
prefix = "optional prefix to prepend to the generated file names"
```
We turn the service on by adding its name, "file", to `store.streamers`- the list of streaming services for this App to employ.
In `streamers.file` we include three configuration parameters for the file streaming service:
1. `streamers.x.keys` contains the list of `StoreKey` names for the KVStores to expose using this service.
In order to expose *all* KVStores, we can include `*` in this list. An empty list is equivalent to turning the service off.
2. `streamers.file.write_dir` contains the path to the directory to write the files to.
3. `streamers.file.prefix` contains an optional prefix to prepend to the output files to prevent potential collisions
with other App `StreamingService` output files.
##### Encoding
For each pair of `BeginBlock` requests and responses, a file is created and named `block-{N}-begin`, where N is the block number.
At the head of this file the length-prefixed protobuf encoded `BeginBlock` request is written.
At the tail of this file the length-prefixed protobuf encoded `BeginBlock` response is written.
In between these two encoded messages, the state changes that occurred due to the `BeginBlock` request are written chronologically as
a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores the service
is configured to listen to.
For each pair of `DeliverTx` requests and responses, a file is created and named `block-{N}-tx-{M}` where N is the block number and M
is the tx number in the block (i.e. 0, 1, 2...).
At the head of this file the length-prefixed protobuf encoded `DeliverTx` request is written.
At the tail of this file the length-prefixed protobuf encoded `DeliverTx` response is written.
In between these two encoded messages, the state changes that occurred due to the `DeliverTx` request are written chronologically as
a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores the service
is configured to listen to.
For each pair of `EndBlock` requests and responses, a file is created and named `block-{N}-end`, where N is the block number.
At the head of this file the length-prefixed protobuf encoded `EndBlock` request is written.
At the tail of this file the length-prefixed protobuf encoded `EndBlock` response is written.
In between these two encoded messages, the state changes that occurred due to the `EndBlock` request are written chronologically as
a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores the service
is configured to listen to.
##### Decoding
To decode the files written in the above format we read all the bytes from a given file into memory and segment them into proto
messages based on the length-prefixing of each message. Once segmented, it is known that the first message is the ABCI request,
the last message is the ABCI response, and that every message in between is a `StoreKVPair`. This enables us to decode each segment into
the appropriate message type.
The type of ABCI req/res, the block height, and the transaction index (where relevant) is known
from the file name, and the KVStore each `StoreKVPair` originates from is known since the `StoreKey` is included as a field in the proto message.

View File

@ -0,0 +1,10 @@
[store]
streamers = [ # if len(streamers) > 0 we are streaming
"file", # name of the streaming service, used by constructor
]
[streamers]
[streamers.file]
keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"]
write_dir = "path to the write directory"
prefix = "optional prefix to prepend to the generated file names"

View File

@ -0,0 +1,279 @@
package file
import (
"errors"
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"sync"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
)
var _ baseapp.StreamingService = &StreamingService{}
// StreamingService is a concrete implementation of StreamingService that writes state changes out to files
type StreamingService struct {
listeners map[types.StoreKey][]types.WriteListener // the listeners that will be initialized with BaseApp
srcChan <-chan []byte // the channel that all the WriteListeners write their data out to
filePrefix string // optional prefix for each of the generated files
writeDir string // directory to write files into
codec codec.BinaryCodec // marshaller used for re-marshalling the ABCI messages to write them out to the destination files
stateCache [][]byte // cache the protobuf binary encoded StoreKVPairs in the order they are received
stateCacheLock *sync.Mutex // mutex for the state cache
currentBlockNumber int64 // the current block number
currentTxIndex int64 // the index of the current tx
quitChan chan struct{} // channel to synchronize closure
}
// IntermediateWriter is used so that we do not need to update the underlying io.Writer
// inside the StoreKVPairWriteListener everytime we begin writing to a new file
type IntermediateWriter struct {
outChan chan<- []byte
}
// NewIntermediateWriter create an instance of an intermediateWriter that sends to the provided channel
func NewIntermediateWriter(outChan chan<- []byte) *IntermediateWriter {
return &IntermediateWriter{
outChan: outChan,
}
}
// Write satisfies io.Writer
func (iw *IntermediateWriter) Write(b []byte) (int, error) {
iw.outChan <- b
return len(b), nil
}
// NewStreamingService creates a new StreamingService for the provided writeDir, (optional) filePrefix, and storeKeys
func NewStreamingService(writeDir, filePrefix string, storeKeys []types.StoreKey, c codec.BinaryCodec) (*StreamingService, error) {
listenChan := make(chan []byte)
iw := NewIntermediateWriter(listenChan)
listener := types.NewStoreKVPairWriteListener(iw, c)
listeners := make(map[types.StoreKey][]types.WriteListener, len(storeKeys))
// in this case, we are using the same listener for each Store
for _, key := range storeKeys {
listeners[key] = append(listeners[key], listener)
}
// check that the writeDir exists and is writeable so that we can catch the error here at initialization if it is not
// we don't open a dstFile until we receive our first ABCI message
if err := isDirWriteable(writeDir); err != nil {
return nil, err
}
return &StreamingService{
listeners: listeners,
srcChan: listenChan,
filePrefix: filePrefix,
writeDir: writeDir,
codec: c,
stateCache: make([][]byte, 0),
stateCacheLock: new(sync.Mutex),
}, nil
}
// Listeners satisfies the baseapp.StreamingService interface
// It returns the StreamingService's underlying WriteListeners
// Use for registering the underlying WriteListeners with the BaseApp
func (fss *StreamingService) Listeners() map[types.StoreKey][]types.WriteListener {
return fss.listeners
}
// ListenBeginBlock satisfies the baseapp.ABCIListener interface
// It writes the received BeginBlock request and response and the resulting state changes
// out to a file as described in the above the naming schema
func (fss *StreamingService) ListenBeginBlock(ctx sdk.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error {
// generate the new file
dstFile, err := fss.openBeginBlockFile(req)
if err != nil {
return err
}
// write req to file
lengthPrefixedReqBytes, err := fss.codec.MarshalLengthPrefixed(&req)
if err != nil {
return err
}
if _, err = dstFile.Write(lengthPrefixedReqBytes); err != nil {
return err
}
// write all state changes cached for this stage to file
fss.stateCacheLock.Lock()
for _, stateChange := range fss.stateCache {
if _, err = dstFile.Write(stateChange); err != nil {
fss.stateCache = nil
fss.stateCacheLock.Unlock()
return err
}
}
// reset cache
fss.stateCache = nil
fss.stateCacheLock.Unlock()
// write res to file
lengthPrefixedResBytes, err := fss.codec.MarshalLengthPrefixed(&res)
if err != nil {
return err
}
if _, err = dstFile.Write(lengthPrefixedResBytes); err != nil {
return err
}
// close file
return dstFile.Close()
}
func (fss *StreamingService) openBeginBlockFile(req abci.RequestBeginBlock) (*os.File, error) {
fss.currentBlockNumber = req.GetHeader().Height
fss.currentTxIndex = 0
fileName := fmt.Sprintf("block-%d-begin", fss.currentBlockNumber)
if fss.filePrefix != "" {
fileName = fmt.Sprintf("%s-%s", fss.filePrefix, fileName)
}
return os.OpenFile(filepath.Join(fss.writeDir, fileName), os.O_CREATE|os.O_WRONLY, 0600)
}
// ListenDeliverTx satisfies the baseapp.ABCIListener interface
// It writes the received DeliverTx request and response and the resulting state changes
// out to a file as described in the above the naming schema
func (fss *StreamingService) ListenDeliverTx(ctx sdk.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error {
// generate the new file
dstFile, err := fss.openDeliverTxFile()
if err != nil {
return err
}
// write req to file
lengthPrefixedReqBytes, err := fss.codec.MarshalLengthPrefixed(&req)
if err != nil {
return err
}
if _, err = dstFile.Write(lengthPrefixedReqBytes); err != nil {
return err
}
// write all state changes cached for this stage to file
fss.stateCacheLock.Lock()
for _, stateChange := range fss.stateCache {
if _, err = dstFile.Write(stateChange); err != nil {
fss.stateCache = nil
fss.stateCacheLock.Unlock()
return err
}
}
// reset cache
fss.stateCache = nil
fss.stateCacheLock.Unlock()
// write res to file
lengthPrefixedResBytes, err := fss.codec.MarshalLengthPrefixed(&res)
if err != nil {
return err
}
if _, err = dstFile.Write(lengthPrefixedResBytes); err != nil {
return err
}
// close file
return dstFile.Close()
}
func (fss *StreamingService) openDeliverTxFile() (*os.File, error) {
fileName := fmt.Sprintf("block-%d-tx-%d", fss.currentBlockNumber, fss.currentTxIndex)
if fss.filePrefix != "" {
fileName = fmt.Sprintf("%s-%s", fss.filePrefix, fileName)
}
fss.currentTxIndex++
return os.OpenFile(filepath.Join(fss.writeDir, fileName), os.O_CREATE|os.O_WRONLY, 0600)
}
// ListenEndBlock satisfies the baseapp.ABCIListener interface
// It writes the received EndBlock request and response and the resulting state changes
// out to a file as described in the above the naming schema
func (fss *StreamingService) ListenEndBlock(ctx sdk.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error {
// generate the new file
dstFile, err := fss.openEndBlockFile()
if err != nil {
return err
}
// write req to file
lengthPrefixedReqBytes, err := fss.codec.MarshalLengthPrefixed(&req)
if err != nil {
return err
}
if _, err = dstFile.Write(lengthPrefixedReqBytes); err != nil {
return err
}
// write all state changes cached for this stage to file
fss.stateCacheLock.Lock()
for _, stateChange := range fss.stateCache {
if _, err = dstFile.Write(stateChange); err != nil {
fss.stateCache = nil
fss.stateCacheLock.Unlock()
return err
}
}
// reset cache
fss.stateCache = nil
fss.stateCacheLock.Unlock()
// write res to file
lengthPrefixedResBytes, err := fss.codec.MarshalLengthPrefixed(&res)
if err != nil {
return err
}
if _, err = dstFile.Write(lengthPrefixedResBytes); err != nil {
return err
}
// close file
return dstFile.Close()
}
func (fss *StreamingService) openEndBlockFile() (*os.File, error) {
fileName := fmt.Sprintf("block-%d-end", fss.currentBlockNumber)
if fss.filePrefix != "" {
fileName = fmt.Sprintf("%s-%s", fss.filePrefix, fileName)
}
return os.OpenFile(filepath.Join(fss.writeDir, fileName), os.O_CREATE|os.O_WRONLY, 0600)
}
// Stream satisfies the baseapp.StreamingService interface
// It spins up a goroutine select loop which awaits length-prefixed binary encoded KV pairs
// and caches them in the order they were received
// returns an error if it is called twice
func (fss *StreamingService) Stream(wg *sync.WaitGroup) error {
if fss.quitChan != nil {
return errors.New("`Stream` has already been called. The stream needs to be closed before it can be started again")
}
fss.quitChan = make(chan struct{})
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-fss.quitChan:
fss.quitChan = nil
return
case by := <-fss.srcChan:
fss.stateCacheLock.Lock()
fss.stateCache = append(fss.stateCache, by)
fss.stateCacheLock.Unlock()
}
}
}()
return nil
}
// Close satisfies the io.Closer interface, which satisfies the baseapp.StreamingService interface
func (fss *StreamingService) Close() error {
close(fss.quitChan)
return nil
}
// isDirWriteable checks if dir is writable by writing and removing a file
// to dir. It returns nil if dir is writable.
func isDirWriteable(dir string) error {
f := path.Join(dir, ".touch")
if err := ioutil.WriteFile(f, []byte(""), 0600); err != nil {
return err
}
return os.Remove(f)
}

View File

@ -0,0 +1,401 @@
package file
import (
"encoding/binary"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"
"testing"
"github.com/cosmos/cosmos-sdk/codec"
codecTypes "github.com/cosmos/cosmos-sdk/codec/types"
"github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
types1 "github.com/tendermint/tendermint/proto/tendermint/types"
)
var (
interfaceRegistry = codecTypes.NewInterfaceRegistry()
testMarshaller = codec.NewProtoCodec(interfaceRegistry)
testStreamingService *StreamingService
testListener1, testListener2 types.WriteListener
emptyContext = sdk.Context{}
// test abci message types
mockHash = []byte{1, 2, 3, 4, 5, 6, 7, 8, 9}
testBeginBlockReq = abci.RequestBeginBlock{
Header: types1.Header{
Height: 1,
},
ByzantineValidators: []abci.Evidence{},
Hash: mockHash,
LastCommitInfo: abci.LastCommitInfo{
Round: 1,
Votes: []abci.VoteInfo{},
},
}
testBeginBlockRes = abci.ResponseBeginBlock{
Events: []abci.Event{
{
Type: "testEventType1",
},
{
Type: "testEventType2",
},
},
}
testEndBlockReq = abci.RequestEndBlock{
Height: 1,
}
testEndBlockRes = abci.ResponseEndBlock{
Events: []abci.Event{},
ConsensusParamUpdates: &abci.ConsensusParams{},
ValidatorUpdates: []abci.ValidatorUpdate{},
}
mockTxBytes1 = []byte{9, 8, 7, 6, 5, 4, 3, 2, 1}
testDeliverTxReq1 = abci.RequestDeliverTx{
Tx: mockTxBytes1,
}
mockTxBytes2 = []byte{8, 7, 6, 5, 4, 3, 2}
testDeliverTxReq2 = abci.RequestDeliverTx{
Tx: mockTxBytes2,
}
mockTxResponseData1 = []byte{1, 3, 5, 7, 9}
testDeliverTxRes1 = abci.ResponseDeliverTx{
Events: []abci.Event{},
Code: 1,
Codespace: "mockCodeSpace",
Data: mockTxResponseData1,
GasUsed: 2,
GasWanted: 3,
Info: "mockInfo",
Log: "mockLog",
}
mockTxResponseData2 = []byte{1, 3, 5, 7, 9}
testDeliverTxRes2 = abci.ResponseDeliverTx{
Events: []abci.Event{},
Code: 1,
Codespace: "mockCodeSpace",
Data: mockTxResponseData2,
GasUsed: 2,
GasWanted: 3,
Info: "mockInfo",
Log: "mockLog",
}
// mock store keys
mockStoreKey1 = sdk.NewKVStoreKey("mockStore1")
mockStoreKey2 = sdk.NewKVStoreKey("mockStore2")
// file stuff
testPrefix = "testPrefix"
testDir = "./.test"
// mock state changes
mockKey1 = []byte{1, 2, 3}
mockValue1 = []byte{3, 2, 1}
mockKey2 = []byte{2, 3, 4}
mockValue2 = []byte{4, 3, 2}
mockKey3 = []byte{3, 4, 5}
mockValue3 = []byte{5, 4, 3}
)
func TestIntermediateWriter(t *testing.T) {
outChan := make(chan []byte, 0)
iw := NewIntermediateWriter(outChan)
require.IsType(t, &IntermediateWriter{}, iw)
testBytes := []byte{1, 2, 3, 4, 5}
var length int
var err error
waitChan := make(chan struct{}, 0)
go func() {
length, err = iw.Write(testBytes)
waitChan <- struct{}{}
}()
receivedBytes := <-outChan
<-waitChan
require.Equal(t, len(testBytes), length)
require.Equal(t, testBytes, receivedBytes)
require.Nil(t, err)
}
func TestFileStreamingService(t *testing.T) {
if os.Getenv("CI") != "" {
t.Skip("Skipping TestFileStreamingService in CI environment")
}
err := os.Mkdir(testDir, 0700)
require.Nil(t, err)
defer os.RemoveAll(testDir)
testKeys := []types.StoreKey{mockStoreKey1, mockStoreKey2}
testStreamingService, err = NewStreamingService(testDir, testPrefix, testKeys, testMarshaller)
require.Nil(t, err)
require.IsType(t, &StreamingService{}, testStreamingService)
require.Equal(t, testPrefix, testStreamingService.filePrefix)
require.Equal(t, testDir, testStreamingService.writeDir)
require.Equal(t, testMarshaller, testStreamingService.codec)
testListener1 = testStreamingService.listeners[mockStoreKey1][0]
testListener2 = testStreamingService.listeners[mockStoreKey2][0]
wg := new(sync.WaitGroup)
testStreamingService.Stream(wg)
testListenBeginBlock(t)
testListenDeliverTx1(t)
testListenDeliverTx2(t)
testListenEndBlock(t)
testStreamingService.Close()
wg.Wait()
}
func testListenBeginBlock(t *testing.T) {
expectedBeginBlockReqBytes, err := testMarshaller.Marshal(&testBeginBlockReq)
require.Nil(t, err)
expectedBeginBlockResBytes, err := testMarshaller.Marshal(&testBeginBlockRes)
require.Nil(t, err)
// write state changes
testListener1.OnWrite(mockStoreKey1, mockKey1, mockValue1, false)
testListener2.OnWrite(mockStoreKey2, mockKey2, mockValue2, false)
testListener1.OnWrite(mockStoreKey1, mockKey3, mockValue3, false)
// expected KV pairs
expectedKVPair1, err := testMarshaller.Marshal(&types.StoreKVPair{
StoreKey: mockStoreKey1.Name(),
Key: mockKey1,
Value: mockValue1,
Delete: false,
})
require.Nil(t, err)
expectedKVPair2, err := testMarshaller.Marshal(&types.StoreKVPair{
StoreKey: mockStoreKey2.Name(),
Key: mockKey2,
Value: mockValue2,
Delete: false,
})
require.Nil(t, err)
expectedKVPair3, err := testMarshaller.Marshal(&types.StoreKVPair{
StoreKey: mockStoreKey1.Name(),
Key: mockKey3,
Value: mockValue3,
Delete: false,
})
require.Nil(t, err)
// send the ABCI messages
err = testStreamingService.ListenBeginBlock(emptyContext, testBeginBlockReq, testBeginBlockRes)
require.Nil(t, err)
// load the file, checking that it was created with the expected name
fileName := fmt.Sprintf("%s-block-%d-begin", testPrefix, testBeginBlockReq.GetHeader().Height)
fileBytes, err := readInFile(fileName)
require.Nil(t, err)
// segment the file into the separate gRPC messages and check the correctness of each
segments, err := segmentBytes(fileBytes)
require.Nil(t, err)
require.Equal(t, 5, len(segments))
require.Equal(t, expectedBeginBlockReqBytes, segments[0])
require.Equal(t, expectedKVPair1, segments[1])
require.Equal(t, expectedKVPair2, segments[2])
require.Equal(t, expectedKVPair3, segments[3])
require.Equal(t, expectedBeginBlockResBytes, segments[4])
}
func testListenDeliverTx1(t *testing.T) {
expectedDeliverTxReq1Bytes, err := testMarshaller.Marshal(&testDeliverTxReq1)
require.Nil(t, err)
expectedDeliverTxRes1Bytes, err := testMarshaller.Marshal(&testDeliverTxRes1)
require.Nil(t, err)
// write state changes
testListener1.OnWrite(mockStoreKey1, mockKey1, mockValue1, false)
testListener2.OnWrite(mockStoreKey2, mockKey2, mockValue2, false)
testListener1.OnWrite(mockStoreKey2, mockKey3, mockValue3, false)
// expected KV pairs
expectedKVPair1, err := testMarshaller.Marshal(&types.StoreKVPair{
StoreKey: mockStoreKey1.Name(),
Key: mockKey1,
Value: mockValue1,
Delete: false,
})
require.Nil(t, err)
expectedKVPair2, err := testMarshaller.Marshal(&types.StoreKVPair{
StoreKey: mockStoreKey2.Name(),
Key: mockKey2,
Value: mockValue2,
Delete: false,
})
require.Nil(t, err)
expectedKVPair3, err := testMarshaller.Marshal(&types.StoreKVPair{
StoreKey: mockStoreKey2.Name(),
Key: mockKey3,
Value: mockValue3,
Delete: false,
})
require.Nil(t, err)
// send the ABCI messages
err = testStreamingService.ListenDeliverTx(emptyContext, testDeliverTxReq1, testDeliverTxRes1)
require.Nil(t, err)
// load the file, checking that it was created with the expected name
fileName := fmt.Sprintf("%s-block-%d-tx-%d", testPrefix, testBeginBlockReq.GetHeader().Height, 0)
fileBytes, err := readInFile(fileName)
require.Nil(t, err)
// segment the file into the separate gRPC messages and check the correctness of each
segments, err := segmentBytes(fileBytes)
require.Nil(t, err)
require.Equal(t, 5, len(segments))
require.Equal(t, expectedDeliverTxReq1Bytes, segments[0])
require.Equal(t, expectedKVPair1, segments[1])
require.Equal(t, expectedKVPair2, segments[2])
require.Equal(t, expectedKVPair3, segments[3])
require.Equal(t, expectedDeliverTxRes1Bytes, segments[4])
}
func testListenDeliverTx2(t *testing.T) {
expectedDeliverTxReq2Bytes, err := testMarshaller.Marshal(&testDeliverTxReq2)
require.Nil(t, err)
expectedDeliverTxRes2Bytes, err := testMarshaller.Marshal(&testDeliverTxRes2)
require.Nil(t, err)
// write state changes
testListener1.OnWrite(mockStoreKey2, mockKey1, mockValue1, false)
testListener2.OnWrite(mockStoreKey1, mockKey2, mockValue2, false)
testListener1.OnWrite(mockStoreKey2, mockKey3, mockValue3, false)
// expected KV pairs
expectedKVPair1, err := testMarshaller.Marshal(&types.StoreKVPair{
StoreKey: mockStoreKey2.Name(),
Key: mockKey1,
Value: mockValue1,
Delete: false,
})
require.Nil(t, err)
expectedKVPair2, err := testMarshaller.Marshal(&types.StoreKVPair{
StoreKey: mockStoreKey1.Name(),
Key: mockKey2,
Value: mockValue2,
Delete: false,
})
require.Nil(t, err)
expectedKVPair3, err := testMarshaller.Marshal(&types.StoreKVPair{
StoreKey: mockStoreKey2.Name(),
Key: mockKey3,
Value: mockValue3,
Delete: false,
})
require.Nil(t, err)
// send the ABCI messages
err = testStreamingService.ListenDeliverTx(emptyContext, testDeliverTxReq2, testDeliverTxRes2)
require.Nil(t, err)
// load the file, checking that it was created with the expected name
fileName := fmt.Sprintf("%s-block-%d-tx-%d", testPrefix, testBeginBlockReq.GetHeader().Height, 1)
fileBytes, err := readInFile(fileName)
require.Nil(t, err)
// segment the file into the separate gRPC messages and check the correctness of each
segments, err := segmentBytes(fileBytes)
require.Nil(t, err)
require.Equal(t, 5, len(segments))
require.Equal(t, expectedDeliverTxReq2Bytes, segments[0])
require.Equal(t, expectedKVPair1, segments[1])
require.Equal(t, expectedKVPair2, segments[2])
require.Equal(t, expectedKVPair3, segments[3])
require.Equal(t, expectedDeliverTxRes2Bytes, segments[4])
}
func testListenEndBlock(t *testing.T) {
expectedEndBlockReqBytes, err := testMarshaller.Marshal(&testEndBlockReq)
require.Nil(t, err)
expectedEndBlockResBytes, err := testMarshaller.Marshal(&testEndBlockRes)
require.Nil(t, err)
// write state changes
testListener1.OnWrite(mockStoreKey1, mockKey1, mockValue1, false)
testListener2.OnWrite(mockStoreKey1, mockKey2, mockValue2, false)
testListener1.OnWrite(mockStoreKey2, mockKey3, mockValue3, false)
// expected KV pairs
expectedKVPair1, err := testMarshaller.Marshal(&types.StoreKVPair{
StoreKey: mockStoreKey1.Name(),
Key: mockKey1,
Value: mockValue1,
Delete: false,
})
require.Nil(t, err)
expectedKVPair2, err := testMarshaller.Marshal(&types.StoreKVPair{
StoreKey: mockStoreKey1.Name(),
Key: mockKey2,
Value: mockValue2,
Delete: false,
})
require.Nil(t, err)
expectedKVPair3, err := testMarshaller.Marshal(&types.StoreKVPair{
StoreKey: mockStoreKey2.Name(),
Key: mockKey3,
Value: mockValue3,
Delete: false,
})
require.Nil(t, err)
// send the ABCI messages
err = testStreamingService.ListenEndBlock(emptyContext, testEndBlockReq, testEndBlockRes)
require.Nil(t, err)
// load the file, checking that it was created with the expected name
fileName := fmt.Sprintf("%s-block-%d-end", testPrefix, testEndBlockReq.Height)
fileBytes, err := readInFile(fileName)
require.Nil(t, err)
// segment the file into the separate gRPC messages and check the correctness of each
segments, err := segmentBytes(fileBytes)
require.Nil(t, err)
require.Equal(t, 5, len(segments))
require.Equal(t, expectedEndBlockReqBytes, segments[0])
require.Equal(t, expectedKVPair1, segments[1])
require.Equal(t, expectedKVPair2, segments[2])
require.Equal(t, expectedKVPair3, segments[3])
require.Equal(t, expectedEndBlockResBytes, segments[4])
}
func readInFile(name string) ([]byte, error) {
path := filepath.Join(testDir, name)
return ioutil.ReadFile(path)
}
// Returns all of the protobuf messages contained in the byte array as an array of byte arrays
// The messages have their length prefix removed
func segmentBytes(bz []byte) ([][]byte, error) {
var err error
segments := make([][]byte, 0)
for len(bz) > 0 {
var segment []byte
segment, bz, err = getHeadSegment(bz)
if err != nil {
return nil, err
}
segments = append(segments, segment)
}
return segments, nil
}
// Returns the bytes for the leading protobuf object in the byte array (removing the length prefix) and returns the remainder of the byte array
func getHeadSegment(bz []byte) ([]byte, []byte, error) {
size, prefixSize := binary.Uvarint(bz)
if prefixSize < 0 {
return nil, nil, fmt.Errorf("invalid number of bytes read from length-prefixed encoding: %d", prefixSize)
}
if size > uint64(len(bz)-prefixSize) {
return nil, nil, fmt.Errorf("not enough bytes to read; want: %v, got: %v", size, len(bz)-prefixSize)
}
return bz[prefixSize:(uint64(prefixSize) + size)], bz[uint64(prefixSize)+size:], nil
}