package file import ( "errors" "fmt" "io/ioutil" "os" "path" "path/filepath" "sync" abci "github.com/tendermint/tendermint/abci/types" "github.com/cosmos/cosmos-sdk/baseapp" "github.com/cosmos/cosmos-sdk/codec" "github.com/cosmos/cosmos-sdk/store/types" sdk "github.com/cosmos/cosmos-sdk/types" ) var _ baseapp.StreamingService = &StreamingService{} // StreamingService is a concrete implementation of StreamingService that writes state changes out to files type StreamingService struct { listeners map[types.StoreKey][]types.WriteListener // the listeners that will be initialized with BaseApp srcChan <-chan []byte // the channel that all the WriteListeners write their data out to filePrefix string // optional prefix for each of the generated files writeDir string // directory to write files into codec codec.BinaryCodec // marshaller used for re-marshalling the ABCI messages to write them out to the destination files stateCache [][]byte // cache the protobuf binary encoded StoreKVPairs in the order they are received stateCacheLock *sync.Mutex // mutex for the state cache currentBlockNumber int64 // the current block number currentTxIndex int64 // the index of the current tx quitChan chan struct{} // channel to synchronize closure } // IntermediateWriter is used so that we do not need to update the underlying io.Writer // inside the StoreKVPairWriteListener everytime we begin writing to a new file type IntermediateWriter struct { outChan chan<- []byte } // NewIntermediateWriter create an instance of an intermediateWriter that sends to the provided channel func NewIntermediateWriter(outChan chan<- []byte) *IntermediateWriter { return &IntermediateWriter{ outChan: outChan, } } // Write satisfies io.Writer func (iw *IntermediateWriter) Write(b []byte) (int, error) { iw.outChan <- b return len(b), nil } // NewStreamingService creates a new StreamingService for the provided writeDir, (optional) filePrefix, and storeKeys func NewStreamingService(writeDir, filePrefix string, storeKeys []types.StoreKey, c codec.BinaryCodec) (*StreamingService, error) { listenChan := make(chan []byte) iw := NewIntermediateWriter(listenChan) listener := types.NewStoreKVPairWriteListener(iw, c) listeners := make(map[types.StoreKey][]types.WriteListener, len(storeKeys)) // in this case, we are using the same listener for each Store for _, key := range storeKeys { listeners[key] = append(listeners[key], listener) } // check that the writeDir exists and is writeable so that we can catch the error here at initialization if it is not // we don't open a dstFile until we receive our first ABCI message if err := isDirWriteable(writeDir); err != nil { return nil, err } return &StreamingService{ listeners: listeners, srcChan: listenChan, filePrefix: filePrefix, writeDir: writeDir, codec: c, stateCache: make([][]byte, 0), stateCacheLock: new(sync.Mutex), }, nil } // Listeners satisfies the baseapp.StreamingService interface // It returns the StreamingService's underlying WriteListeners // Use for registering the underlying WriteListeners with the BaseApp func (fss *StreamingService) Listeners() map[types.StoreKey][]types.WriteListener { return fss.listeners } // ListenBeginBlock satisfies the baseapp.ABCIListener interface // It writes the received BeginBlock request and response and the resulting state changes // out to a file as described in the above the naming schema func (fss *StreamingService) ListenBeginBlock(ctx sdk.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error { // generate the new file dstFile, err := fss.openBeginBlockFile(req) if err != nil { return err } // write req to file lengthPrefixedReqBytes, err := fss.codec.MarshalLengthPrefixed(&req) if err != nil { return err } if _, err = dstFile.Write(lengthPrefixedReqBytes); err != nil { return err } // write all state changes cached for this stage to file fss.stateCacheLock.Lock() for _, stateChange := range fss.stateCache { if _, err = dstFile.Write(stateChange); err != nil { fss.stateCache = nil fss.stateCacheLock.Unlock() return err } } // reset cache fss.stateCache = nil fss.stateCacheLock.Unlock() // write res to file lengthPrefixedResBytes, err := fss.codec.MarshalLengthPrefixed(&res) if err != nil { return err } if _, err = dstFile.Write(lengthPrefixedResBytes); err != nil { return err } // close file return dstFile.Close() } func (fss *StreamingService) openBeginBlockFile(req abci.RequestBeginBlock) (*os.File, error) { fss.currentBlockNumber = req.GetHeader().Height fss.currentTxIndex = 0 fileName := fmt.Sprintf("block-%d-begin", fss.currentBlockNumber) if fss.filePrefix != "" { fileName = fmt.Sprintf("%s-%s", fss.filePrefix, fileName) } return os.OpenFile(filepath.Join(fss.writeDir, fileName), os.O_CREATE|os.O_WRONLY, 0600) } // ListenDeliverTx satisfies the baseapp.ABCIListener interface // It writes the received DeliverTx request and response and the resulting state changes // out to a file as described in the above the naming schema func (fss *StreamingService) ListenDeliverTx(ctx sdk.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error { // generate the new file dstFile, err := fss.openDeliverTxFile() if err != nil { return err } // write req to file lengthPrefixedReqBytes, err := fss.codec.MarshalLengthPrefixed(&req) if err != nil { return err } if _, err = dstFile.Write(lengthPrefixedReqBytes); err != nil { return err } // write all state changes cached for this stage to file fss.stateCacheLock.Lock() for _, stateChange := range fss.stateCache { if _, err = dstFile.Write(stateChange); err != nil { fss.stateCache = nil fss.stateCacheLock.Unlock() return err } } // reset cache fss.stateCache = nil fss.stateCacheLock.Unlock() // write res to file lengthPrefixedResBytes, err := fss.codec.MarshalLengthPrefixed(&res) if err != nil { return err } if _, err = dstFile.Write(lengthPrefixedResBytes); err != nil { return err } // close file return dstFile.Close() } func (fss *StreamingService) openDeliverTxFile() (*os.File, error) { fileName := fmt.Sprintf("block-%d-tx-%d", fss.currentBlockNumber, fss.currentTxIndex) if fss.filePrefix != "" { fileName = fmt.Sprintf("%s-%s", fss.filePrefix, fileName) } fss.currentTxIndex++ return os.OpenFile(filepath.Join(fss.writeDir, fileName), os.O_CREATE|os.O_WRONLY, 0600) } // ListenEndBlock satisfies the baseapp.ABCIListener interface // It writes the received EndBlock request and response and the resulting state changes // out to a file as described in the above the naming schema func (fss *StreamingService) ListenEndBlock(ctx sdk.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error { // generate the new file dstFile, err := fss.openEndBlockFile() if err != nil { return err } // write req to file lengthPrefixedReqBytes, err := fss.codec.MarshalLengthPrefixed(&req) if err != nil { return err } if _, err = dstFile.Write(lengthPrefixedReqBytes); err != nil { return err } // write all state changes cached for this stage to file fss.stateCacheLock.Lock() for _, stateChange := range fss.stateCache { if _, err = dstFile.Write(stateChange); err != nil { fss.stateCache = nil fss.stateCacheLock.Unlock() return err } } // reset cache fss.stateCache = nil fss.stateCacheLock.Unlock() // write res to file lengthPrefixedResBytes, err := fss.codec.MarshalLengthPrefixed(&res) if err != nil { return err } if _, err = dstFile.Write(lengthPrefixedResBytes); err != nil { return err } // close file return dstFile.Close() } func (fss *StreamingService) openEndBlockFile() (*os.File, error) { fileName := fmt.Sprintf("block-%d-end", fss.currentBlockNumber) if fss.filePrefix != "" { fileName = fmt.Sprintf("%s-%s", fss.filePrefix, fileName) } return os.OpenFile(filepath.Join(fss.writeDir, fileName), os.O_CREATE|os.O_WRONLY, 0600) } // Stream satisfies the baseapp.StreamingService interface // It spins up a goroutine select loop which awaits length-prefixed binary encoded KV pairs // and caches them in the order they were received // returns an error if it is called twice func (fss *StreamingService) Stream(wg *sync.WaitGroup) error { if fss.quitChan != nil { return errors.New("`Stream` has already been called. The stream needs to be closed before it can be started again") } fss.quitChan = make(chan struct{}) wg.Add(1) go func() { defer wg.Done() for { select { case <-fss.quitChan: fss.quitChan = nil return case by := <-fss.srcChan: fss.stateCacheLock.Lock() fss.stateCache = append(fss.stateCache, by) fss.stateCacheLock.Unlock() } } }() return nil } // Close satisfies the io.Closer interface, which satisfies the baseapp.StreamingService interface func (fss *StreamingService) Close() error { close(fss.quitChan) return nil } // isDirWriteable checks if dir is writable by writing and removing a file // to dir. It returns nil if dir is writable. func isDirWriteable(dir string) error { f := path.Join(dir, ".touch") if err := ioutil.WriteFile(f, []byte(""), 0600); err != nil { return err } return os.Remove(f) }