260 lines
7.4 KiB
Go
260 lines
7.4 KiB
Go
package snapshots
|
|
|
|
import (
|
|
"bytes"
|
|
"crypto/sha256"
|
|
"io"
|
|
"io/ioutil"
|
|
"sync"
|
|
|
|
"github.com/cosmos/cosmos-sdk/snapshots/types"
|
|
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
|
|
)
|
|
|
|
// operation names a long-running Manager operation. A Manager runs at most one operation
// at a time.
type operation string

// Operations a Manager can be running. opNone means the manager is idle.
const (
	opNone     operation = ""
	opSnapshot operation = "snapshot"
	opPrune    operation = "prune"
	opRestore  operation = "restore"
)

// chunkBufferSize is the capacity of the channel used to hand chunks to a running restore.
const chunkBufferSize = 4
|
|
|
|
// restoreDone carries the final outcome of a restore operation.
type restoreDone struct {
	// complete is true if the restore completed successfully (not prematurely).
	complete bool
	// err is non-nil if the restore errored.
	err error
}
|
|
|
|
// Manager manages snapshot and restore operations for an app, making sure only a single
|
|
// long-running operation is in progress at any given time, and provides convenience methods
|
|
// mirroring the ABCI interface.
|
|
//
|
|
// Although the ABCI interface (and this manager) passes chunks as byte slices, the internal
|
|
// snapshot/restore APIs use IO streams (i.e. chan io.ReadCloser), for two reasons:
|
|
//
|
|
// 1) In the future, ABCI should support streaming. Consider e.g. InitChain during chain
|
|
// upgrades, which currently passes the entire chain state as an in-memory byte slice.
|
|
// https://github.com/tendermint/tendermint/issues/5184
|
|
//
|
|
// 2) io.ReadCloser streams automatically propagate IO errors, and can pass arbitrary
|
|
// errors via io.Pipe.CloseWithError().
|
|
type Manager struct {
|
|
store *Store
|
|
target types.Snapshotter
|
|
|
|
mtx sync.Mutex
|
|
operation operation
|
|
chRestore chan<- io.ReadCloser
|
|
chRestoreDone <-chan restoreDone
|
|
restoreChunkHashes [][]byte
|
|
restoreChunkIndex uint32
|
|
}
|
|
|
|
// NewManager creates a new manager.
|
|
func NewManager(store *Store, target types.Snapshotter) *Manager {
|
|
return &Manager{
|
|
store: store,
|
|
target: target,
|
|
}
|
|
}
|
|
|
|
// begin starts an operation, or errors if one is in progress. It manages the mutex itself.
|
|
func (m *Manager) begin(op operation) error {
|
|
m.mtx.Lock()
|
|
defer m.mtx.Unlock()
|
|
return m.beginLocked(op)
|
|
}
|
|
|
|
// beginLocked begins an operation while already holding the mutex.
|
|
func (m *Manager) beginLocked(op operation) error {
|
|
if op == opNone {
|
|
return sdkerrors.Wrap(sdkerrors.ErrLogic, "can't begin a none operation")
|
|
}
|
|
if m.operation != opNone {
|
|
return sdkerrors.Wrapf(sdkerrors.ErrConflict, "a %v operation is in progress", m.operation)
|
|
}
|
|
m.operation = op
|
|
return nil
|
|
}
|
|
|
|
// end ends the current operation.
|
|
func (m *Manager) end() {
|
|
m.mtx.Lock()
|
|
defer m.mtx.Unlock()
|
|
m.endLocked()
|
|
}
|
|
|
|
// endLocked ends the current operation while already holding the mutex.
|
|
func (m *Manager) endLocked() {
|
|
m.operation = opNone
|
|
if m.chRestore != nil {
|
|
close(m.chRestore)
|
|
m.chRestore = nil
|
|
}
|
|
m.chRestoreDone = nil
|
|
m.restoreChunkHashes = nil
|
|
m.restoreChunkIndex = 0
|
|
}
|
|
|
|
// Create creates a snapshot and returns its metadata.
|
|
func (m *Manager) Create(height uint64) (*types.Snapshot, error) {
|
|
if m == nil {
|
|
return nil, sdkerrors.Wrap(sdkerrors.ErrLogic, "no snapshot store configured")
|
|
}
|
|
err := m.begin(opSnapshot)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer m.end()
|
|
|
|
latest, err := m.store.GetLatest()
|
|
if err != nil {
|
|
return nil, sdkerrors.Wrap(err, "failed to examine latest snapshot")
|
|
}
|
|
if latest != nil && latest.Height >= height {
|
|
return nil, sdkerrors.Wrapf(sdkerrors.ErrConflict,
|
|
"a more recent snapshot already exists at height %v", latest.Height)
|
|
}
|
|
|
|
chunks, err := m.target.Snapshot(height, types.CurrentFormat)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return m.store.Save(height, types.CurrentFormat, chunks)
|
|
}
|
|
|
|
// List lists snapshots, mirroring ABCI ListSnapshots. It can be concurrent with other operations.
|
|
func (m *Manager) List() ([]*types.Snapshot, error) {
|
|
return m.store.List()
|
|
}
|
|
|
|
// LoadChunk loads a chunk into a byte slice, mirroring ABCI LoadChunk. It can be called
|
|
// concurrently with other operations. If the chunk does not exist, nil is returned.
|
|
func (m *Manager) LoadChunk(height uint64, format uint32, chunk uint32) ([]byte, error) {
|
|
reader, err := m.store.LoadChunk(height, format, chunk)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if reader == nil {
|
|
return nil, nil
|
|
}
|
|
defer reader.Close()
|
|
|
|
return ioutil.ReadAll(reader)
|
|
}
|
|
|
|
// Prune prunes snapshots, if no other operations are in progress.
|
|
func (m *Manager) Prune(retain uint32) (uint64, error) {
|
|
err := m.begin(opPrune)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
defer m.end()
|
|
return m.store.Prune(retain)
|
|
}
|
|
|
|
// Restore begins an async snapshot restoration, mirroring ABCI OfferSnapshot. Chunks must be fed
|
|
// via RestoreChunk() until the restore is complete or a chunk fails.
|
|
func (m *Manager) Restore(snapshot types.Snapshot) error {
|
|
if snapshot.Chunks == 0 {
|
|
return sdkerrors.Wrap(types.ErrInvalidMetadata, "no chunks")
|
|
}
|
|
if uint32(len(snapshot.Metadata.ChunkHashes)) != snapshot.Chunks {
|
|
return sdkerrors.Wrapf(types.ErrInvalidMetadata, "snapshot has %v chunk hashes, but %v chunks",
|
|
uint32(len(snapshot.Metadata.ChunkHashes)),
|
|
snapshot.Chunks)
|
|
}
|
|
m.mtx.Lock()
|
|
defer m.mtx.Unlock()
|
|
err := m.beginLocked(opRestore)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Start an asynchronous snapshot restoration, passing chunks and completion status via channels.
|
|
chChunks := make(chan io.ReadCloser, chunkBufferSize)
|
|
chReady := make(chan struct{}, 1)
|
|
chDone := make(chan restoreDone, 1)
|
|
go func() {
|
|
err := m.target.Restore(snapshot.Height, snapshot.Format, chChunks, chReady)
|
|
chDone <- restoreDone{
|
|
complete: err == nil,
|
|
err: err,
|
|
}
|
|
close(chDone)
|
|
}()
|
|
|
|
// Check for any initial errors from the restore, before any chunks are fed.
|
|
select {
|
|
case done := <-chDone:
|
|
m.endLocked()
|
|
if done.err != nil {
|
|
return done.err
|
|
}
|
|
return sdkerrors.Wrap(sdkerrors.ErrLogic, "restore ended unexpectedly")
|
|
case <-chReady:
|
|
}
|
|
|
|
m.chRestore = chChunks
|
|
m.chRestoreDone = chDone
|
|
m.restoreChunkHashes = snapshot.Metadata.ChunkHashes
|
|
m.restoreChunkIndex = 0
|
|
return nil
|
|
}
|
|
|
|
// RestoreChunk adds a chunk to an active snapshot restoration, mirroring ABCI ApplySnapshotChunk.
|
|
// Chunks must be given until the restore is complete, returning true, or a chunk errors.
|
|
func (m *Manager) RestoreChunk(chunk []byte) (bool, error) {
|
|
m.mtx.Lock()
|
|
defer m.mtx.Unlock()
|
|
if m.operation != opRestore {
|
|
return false, sdkerrors.Wrap(sdkerrors.ErrLogic, "no restore operation in progress")
|
|
}
|
|
|
|
if int(m.restoreChunkIndex) >= len(m.restoreChunkHashes) {
|
|
return false, sdkerrors.Wrap(sdkerrors.ErrLogic, "received unexpected chunk")
|
|
}
|
|
|
|
// Check if any errors have occurred yet.
|
|
select {
|
|
case done := <-m.chRestoreDone:
|
|
m.endLocked()
|
|
if done.err != nil {
|
|
return false, done.err
|
|
}
|
|
return false, sdkerrors.Wrap(sdkerrors.ErrLogic, "restore ended unexpectedly")
|
|
default:
|
|
}
|
|
|
|
// Verify the chunk hash.
|
|
hash := sha256.Sum256(chunk)
|
|
expected := m.restoreChunkHashes[m.restoreChunkIndex]
|
|
if !bytes.Equal(hash[:], expected) {
|
|
return false, sdkerrors.Wrapf(types.ErrChunkHashMismatch,
|
|
"expected %x, got %x", hash, expected)
|
|
}
|
|
|
|
// Pass the chunk to the restore, and wait for completion if it was the final one.
|
|
m.chRestore <- ioutil.NopCloser(bytes.NewReader(chunk))
|
|
m.restoreChunkIndex++
|
|
|
|
if int(m.restoreChunkIndex) >= len(m.restoreChunkHashes) {
|
|
close(m.chRestore)
|
|
m.chRestore = nil
|
|
done := <-m.chRestoreDone
|
|
m.endLocked()
|
|
if done.err != nil {
|
|
return false, done.err
|
|
}
|
|
if !done.complete {
|
|
return false, sdkerrors.Wrap(sdkerrors.ErrLogic, "restore ended prematurely")
|
|
}
|
|
return true, nil
|
|
}
|
|
return false, nil
|
|
}
|