cosmos-sdk/store/streaming/file/service.go

280 lines
9.4 KiB
Go

package file
import (
"errors"
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"sync"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
)
// Compile-time assertion that *StreamingService implements baseapp.StreamingService.
var _ baseapp.StreamingService = &StreamingService{}
// StreamingService is a concrete implementation of baseapp.StreamingService that writes state changes out to files.
// State changes arrive as byte slices on srcChan, are accumulated in stateCache by the goroutine started in Stream,
// and are flushed to a newly opened file by each of ListenBeginBlock, ListenDeliverTx and ListenEndBlock.
type StreamingService struct {
listeners map[types.StoreKey][]types.WriteListener // the listeners returned by Listeners, keyed by the store they are registered for
srcChan <-chan []byte // receive side of the channel that all the WriteListeners write their data out to
filePrefix string // optional prefix for each of the generated files; nothing is prepended when it is empty
writeDir string // directory to write files into
codec codec.BinaryCodec // marshaller used to length-prefix the ABCI requests and responses written to the destination files
stateCache [][]byte // the protobuf binary encoded StoreKVPairs received from srcChan, kept in arrival order until the next Listen* call flushes them
stateCacheLock *sync.Mutex // mutex guarding stateCache
currentBlockNumber int64 // block height taken from the header of the most recent BeginBlock request
currentTxIndex int64 // index used in the next DeliverTx file name; reset to 0 on BeginBlock and incremented per DeliverTx file
quitChan chan struct{} // closed by Close to stop the goroutine started by Stream; nil while no stream is running
}
// IntermediateWriter is an io.Writer that forwards every write to a channel.
// It is used so that we do not need to update the underlying io.Writer
// inside the StoreKVPairWriteListener every time we begin writing to a new file.
type IntermediateWriter struct {
	outChan chan<- []byte // send side of the channel each written payload is delivered to
}

// NewIntermediateWriter creates an IntermediateWriter that sends to the provided channel.
func NewIntermediateWriter(outChan chan<- []byte) *IntermediateWriter {
	return &IntermediateWriter{outChan: outChan}
}

// Write satisfies io.Writer.
// It sends a private copy of b on the output channel and reports len(b) bytes
// written with a nil error. The send blocks until the channel accepts the payload.
//
// The copy is required by the io.Writer contract, which forbids an
// implementation from retaining the caller's slice: without it, a caller that
// reuses its buffer after Write returns would silently alter data that is
// still queued for the receiver.
func (iw *IntermediateWriter) Write(b []byte) (int, error) {
	payload := make([]byte, len(b))
	copy(payload, b)
	iw.outChan <- payload
	return len(b), nil
}
// NewStreamingService builds a StreamingService that writes into writeDir, names its files with the
// (optional) filePrefix, and listens on every store in storeKeys.
// It returns an error when writeDir cannot be written to.
func NewStreamingService(writeDir, filePrefix string, storeKeys []types.StoreKey, c codec.BinaryCodec) (*StreamingService, error) {
	kvChan := make(chan []byte)
	// one listener, fed by one IntermediateWriter, is shared by all of the stores
	sharedListener := types.NewStoreKVPairWriteListener(NewIntermediateWriter(kvChan), c)
	byStore := make(map[types.StoreKey][]types.WriteListener, len(storeKeys))
	for i := range storeKeys {
		byStore[storeKeys[i]] = append(byStore[storeKeys[i]], sharedListener)
	}
	// no destination file is opened until the first ABCI message arrives, so probe the
	// directory now to surface an unwritable writeDir at initialization time
	err := isDirWriteable(writeDir)
	if err != nil {
		return nil, err
	}
	svc := &StreamingService{
		listeners:      byStore,
		srcChan:        kvChan,
		filePrefix:     filePrefix,
		writeDir:       writeDir,
		codec:          c,
		stateCache:     make([][]byte, 0),
		stateCacheLock: new(sync.Mutex),
	}
	return svc, nil
}
// Listeners satisfies the baseapp.StreamingService interface.
// It exposes the StreamingService's WriteListeners, keyed by store, so that
// they can be registered with the BaseApp.
func (fss *StreamingService) Listeners() map[types.StoreKey][]types.WriteListener {
	registered := fss.listeners
	return registered
}
// ListenBeginBlock satisfies the baseapp.ABCIListener interface.
// It writes, in order, the length-prefixed BeginBlock request, every state change
// cached since the previous flush, and the length-prefixed BeginBlock response
// to a new "block-{height}-begin" file (with the optional file prefix).
//
// The state cache is emptied once flushing has been attempted, whether or not
// it succeeded. The destination file is closed on every return path; the first
// error encountered (open, marshal, write or close) is returned.
func (fss *StreamingService) ListenBeginBlock(ctx sdk.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) (err error) {
	// generate the new file
	dstFile, err := fss.openBeginBlockFile(req)
	if err != nil {
		return err
	}
	// release the file handle on error paths too; a close failure is only
	// reported when nothing went wrong earlier, so the root cause is not masked
	defer func() {
		if closeErr := dstFile.Close(); err == nil {
			err = closeErr
		}
	}()
	// write req to file
	lengthPrefixedReqBytes, err := fss.codec.MarshalLengthPrefixed(&req)
	if err != nil {
		return err
	}
	if _, err = dstFile.Write(lengthPrefixedReqBytes); err != nil {
		return err
	}
	// write all state changes cached for this stage to file
	fss.stateCacheLock.Lock()
	for _, stateChange := range fss.stateCache {
		if _, err = dstFile.Write(stateChange); err != nil {
			break
		}
	}
	// reset cache, even if a write failed part-way through
	fss.stateCache = nil
	fss.stateCacheLock.Unlock()
	if err != nil {
		return err
	}
	// write res to file
	lengthPrefixedResBytes, err := fss.codec.MarshalLengthPrefixed(&res)
	if err != nil {
		return err
	}
	if _, err = dstFile.Write(lengthPrefixedResBytes); err != nil {
		return err
	}
	// the deferred close supplies the final error, if any
	return nil
}
// openBeginBlockFile records the height from the request header as the current
// block number, resets the tx index to 0, and opens the write-only
// "block-{height}-begin" file (prefixed with "{filePrefix}-" when a prefix is
// set) inside the write directory, creating it with mode 0600 if needed.
//
// The file is truncated on open so that re-writing an already existing file
// cannot leave stale trailing bytes behind the new contents.
func (fss *StreamingService) openBeginBlockFile(req abci.RequestBeginBlock) (*os.File, error) {
	fss.currentBlockNumber = req.GetHeader().Height
	fss.currentTxIndex = 0
	fileName := fmt.Sprintf("block-%d-begin", fss.currentBlockNumber)
	if fss.filePrefix != "" {
		fileName = fmt.Sprintf("%s-%s", fss.filePrefix, fileName)
	}
	return os.OpenFile(filepath.Join(fss.writeDir, fileName), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
}
// ListenDeliverTx satisfies the baseapp.ABCIListener interface.
// It writes, in order, the length-prefixed DeliverTx request, every state change
// cached since the previous flush, and the length-prefixed DeliverTx response
// to a new "block-{height}-tx-{index}" file (with the optional file prefix).
//
// The state cache is emptied once flushing has been attempted, whether or not
// it succeeded. The destination file is closed on every return path; the first
// error encountered (open, marshal, write or close) is returned.
func (fss *StreamingService) ListenDeliverTx(ctx sdk.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) (err error) {
	// generate the new file
	dstFile, err := fss.openDeliverTxFile()
	if err != nil {
		return err
	}
	// release the file handle on error paths too; a close failure is only
	// reported when nothing went wrong earlier, so the root cause is not masked
	defer func() {
		if closeErr := dstFile.Close(); err == nil {
			err = closeErr
		}
	}()
	// write req to file
	lengthPrefixedReqBytes, err := fss.codec.MarshalLengthPrefixed(&req)
	if err != nil {
		return err
	}
	if _, err = dstFile.Write(lengthPrefixedReqBytes); err != nil {
		return err
	}
	// write all state changes cached for this stage to file
	fss.stateCacheLock.Lock()
	for _, stateChange := range fss.stateCache {
		if _, err = dstFile.Write(stateChange); err != nil {
			break
		}
	}
	// reset cache, even if a write failed part-way through
	fss.stateCache = nil
	fss.stateCacheLock.Unlock()
	if err != nil {
		return err
	}
	// write res to file
	lengthPrefixedResBytes, err := fss.codec.MarshalLengthPrefixed(&res)
	if err != nil {
		return err
	}
	if _, err = dstFile.Write(lengthPrefixedResBytes); err != nil {
		return err
	}
	// the deferred close supplies the final error, if any
	return nil
}
// openDeliverTxFile opens the write-only "block-{height}-tx-{index}" file
// (prefixed with "{filePrefix}-" when a prefix is set) inside the write
// directory, creating it with mode 0600 if needed. The tx index is advanced
// before the open is attempted, so it moves on even when the open fails.
//
// The file is truncated on open so that re-writing an already existing file
// cannot leave stale trailing bytes behind the new contents.
func (fss *StreamingService) openDeliverTxFile() (*os.File, error) {
	fileName := fmt.Sprintf("block-%d-tx-%d", fss.currentBlockNumber, fss.currentTxIndex)
	if fss.filePrefix != "" {
		fileName = fmt.Sprintf("%s-%s", fss.filePrefix, fileName)
	}
	fss.currentTxIndex++
	return os.OpenFile(filepath.Join(fss.writeDir, fileName), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
}
// ListenEndBlock satisfies the baseapp.ABCIListener interface.
// It writes, in order, the length-prefixed EndBlock request, every state change
// cached since the previous flush, and the length-prefixed EndBlock response
// to a new "block-{height}-end" file (with the optional file prefix).
//
// The state cache is emptied once flushing has been attempted, whether or not
// it succeeded. The destination file is closed on every return path; the first
// error encountered (open, marshal, write or close) is returned.
func (fss *StreamingService) ListenEndBlock(ctx sdk.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) (err error) {
	// generate the new file
	dstFile, err := fss.openEndBlockFile()
	if err != nil {
		return err
	}
	// release the file handle on error paths too; a close failure is only
	// reported when nothing went wrong earlier, so the root cause is not masked
	defer func() {
		if closeErr := dstFile.Close(); err == nil {
			err = closeErr
		}
	}()
	// write req to file
	lengthPrefixedReqBytes, err := fss.codec.MarshalLengthPrefixed(&req)
	if err != nil {
		return err
	}
	if _, err = dstFile.Write(lengthPrefixedReqBytes); err != nil {
		return err
	}
	// write all state changes cached for this stage to file
	fss.stateCacheLock.Lock()
	for _, stateChange := range fss.stateCache {
		if _, err = dstFile.Write(stateChange); err != nil {
			break
		}
	}
	// reset cache, even if a write failed part-way through
	fss.stateCache = nil
	fss.stateCacheLock.Unlock()
	if err != nil {
		return err
	}
	// write res to file
	lengthPrefixedResBytes, err := fss.codec.MarshalLengthPrefixed(&res)
	if err != nil {
		return err
	}
	if _, err = dstFile.Write(lengthPrefixedResBytes); err != nil {
		return err
	}
	// the deferred close supplies the final error, if any
	return nil
}
// openEndBlockFile opens the write-only "block-{height}-end" file (prefixed
// with "{filePrefix}-" when a prefix is set) inside the write directory,
// creating it with mode 0600 if needed. The height is the current block number
// recorded by the most recent BeginBlock.
//
// The file is truncated on open so that re-writing an already existing file
// cannot leave stale trailing bytes behind the new contents.
func (fss *StreamingService) openEndBlockFile() (*os.File, error) {
	fileName := fmt.Sprintf("block-%d-end", fss.currentBlockNumber)
	if fss.filePrefix != "" {
		fileName = fmt.Sprintf("%s-%s", fss.filePrefix, fileName)
	}
	return os.OpenFile(filepath.Join(fss.writeDir, fileName), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
}
// Stream satisfies the baseapp.StreamingService interface.
// It registers one unit of work on wg and launches a goroutine that appends every
// byte slice received from the source channel to the state cache, in arrival order,
// until the quit channel is closed. On shutdown the goroutine clears the quit channel
// and marks its unit of work on wg as done.
// An error is returned if a stream is already running.
func (fss *StreamingService) Stream(wg *sync.WaitGroup) error {
	if fss.quitChan != nil {
		return errors.New("`Stream` has already been called. The stream needs to be closed before it can be started again")
	}
	fss.quitChan = make(chan struct{})
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case kvBytes := <-fss.srcChan:
				// cache the received state change under the lock
				fss.stateCacheLock.Lock()
				fss.stateCache = append(fss.stateCache, kvBytes)
				fss.stateCacheLock.Unlock()
			case <-fss.quitChan:
				// a nil quit channel is what lets Stream be called again
				fss.quitChan = nil
				return
			}
		}
	}()
	return nil
}
// Close satisfies the io.Closer interface, which satisfies the baseapp.StreamingService interface.
// It closes the quit channel, which tells the goroutine started by Stream to exit.
// When no stream is running (the quit channel is nil) Close is a no-op rather than
// panicking on the close of a nil channel. It always returns nil.
//
// NOTE: the streaming goroutine resets the quit channel to nil asynchronously, so a
// second Close issued before that goroutine has exited can still see the already
// closed channel; callers should Close a running stream only once.
func (fss *StreamingService) Close() error {
	if fss.quitChan == nil {
		// Stream was never started, or the stream has already shut down
		return nil
	}
	close(fss.quitChan)
	return nil
}
// isDirWriteable reports whether dir can be written to by creating an empty
// ".touch" file inside it and deleting that file again.
// It returns nil when both steps succeed, otherwise the error from the failing step.
func isDirWriteable(dir string) error {
	probe := path.Join(dir, ".touch")
	err := ioutil.WriteFile(probe, []byte(""), 0600)
	if err != nil {
		return err
	}
	// clean up the probe file; a failed removal is reported to the caller
	return os.Remove(probe)
}