feat!: Add hooks to allow app modules to add things to state-sync (backport #10961) (#11267)

* feat!: Add hooks to allow app modules to add things to state-sync (#10961)

## Description

Closes: #7340

- Support registering multiple snapshotters in snapshot manager.
- Append the extension snapshotters to existing snapshot stream.

~TODO: testing.~
- existing tests are fixed

---

### Author Checklist

*All items are required. Please add a note to the item if the item is not applicable and
please add links to any relevant follow up issues.*

I have...

- [ ] included the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title
- [ ] added `!` to the type prefix if API or client breaking change
- [ ] targeted the correct branch (see [PR Targeting](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#pr-targeting))
- [ ] provided a link to the relevant issue or specification
- [ ] followed the guidelines for [building modules](https://github.com/cosmos/cosmos-sdk/blob/master/docs/building-modules)
- [ ] included the necessary unit and integration [tests](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#testing)
- [ ] added a changelog entry to `CHANGELOG.md`
- [ ] included comments for [documenting Go code](https://blog.golang.org/godoc)
- [ ] updated the relevant documentation or specification
- [ ] reviewed "Files changed" and left comments if necessary
- [ ] confirmed all CI checks have passed

### Reviewers Checklist

*All items are required. Please add a note if the item is not applicable and please add
your handle next to the items reviewed if you only reviewed selected items.*

I have...

- [ ] confirmed the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title
- [ ] confirmed `!` in the type prefix if API or client breaking change
- [ ] confirmed all author checklist items have been addressed
- [ ] reviewed state machine logic
- [ ] reviewed API design and naming
- [ ] reviewed documentation is accurate
- [ ] reviewed tests and test coverage
- [ ] manually tested (if applicable)

(cherry picked from commit 7e18e9f1bf)

# Conflicts:
#	api/cosmos/base/snapshots/v1beta1/snapshot.pulsar.go
#	server/mock/store.go
#	snapshots/helpers_test.go
#	snapshots/manager.go
#	store/rootmulti/store_test.go
#	store/v2/multi/store.go

* fix conflicts

* avoid api breakage

* changelog

* fix: rootmulti's Restore don't return the next unknown item as expected (#11286)

## Description
Solution:
- return the next unknown item and add a unit test to ensure that.





---

### Author Checklist

*All items are required. Please add a note to the item if the item is not applicable and
please add links to any relevant follow up issues.*

I have...

- [ ] included the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title
- [ ] added `!` to the type prefix if API or client breaking change
- [ ] targeted the correct branch (see [PR Targeting](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#pr-targeting))
- [ ] provided a link to the relevant issue or specification
- [ ] followed the guidelines for [building modules](https://github.com/cosmos/cosmos-sdk/blob/master/docs/building-modules)
- [ ] included the necessary unit and integration [tests](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#testing)
- [ ] added a changelog entry to `CHANGELOG.md`
- [ ] included comments for [documenting Go code](https://blog.golang.org/godoc)
- [ ] updated the relevant documentation or specification
- [ ] reviewed "Files changed" and left comments if necessary
- [ ] confirmed all CI checks have passed

### Reviewers Checklist

*All items are required. Please add a note if the item is not applicable and please add
your handle next to the items reviewed if you only reviewed selected items.*

I have...

- [ ] confirmed the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title
- [ ] confirmed `!` in the type prefix if API or client breaking change
- [ ] confirmed all author checklist items have been addressed 
- [ ] reviewed state machine logic
- [ ] reviewed API design and naming
- [ ] reviewed documentation is accurate
- [ ] reviewed tests and test coverage
- [ ] manually tested (if applicable)

Co-authored-by: yihuang <huang@crypto.com>
This commit is contained in:
mergify[bot] 2022-03-02 12:18:23 +01:00 committed by GitHub
parent 470014e3d3
commit 811cb8a8cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 2217 additions and 1521 deletions

View File

@ -55,6 +55,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* [\#9576](https://github.com/cosmos/cosmos-sdk/pull/9576) Add debug error message to query result when enabled
* (types) [\#11200](https://github.com/cosmos/cosmos-sdk/pull/11200) Added `Min()` and `Max()` operations on sdk.Coins.
* [#11267](https://github.com/cosmos/cosmos-sdk/pull/11267) Add hooks to allow app modules to add things to state-sync (backport #10961).
## [v0.45.1](https://github.com/cosmos/cosmos-sdk/releases/tag/v0.45.1) - 2022-02-03

View File

@ -273,6 +273,22 @@ func DefaultStoreLoader(ms sdk.CommitMultiStore) error {
return ms.LoadLatestVersion()
}
// CommitMultiStore returns the root multi-store.
// App constructor can use this to access the `cms`.
// UNSAFE: only safe to use during app initialization.
func (app *BaseApp) CommitMultiStore() sdk.CommitMultiStore {
if app.sealed {
panic("cannot call CommitMultiStore() after baseapp is sealed")
}
return app.cms
}
// SnapshotManager returns the snapshot manager.
// application use this to register extra extension snapshotters.
func (app *BaseApp) SnapshotManager() *snapshots.Manager {
return app.snapshotManager
}
// LoadVersion loads the BaseApp application version. It will panic if called
// more than once on a running baseapp.
func (app *BaseApp) LoadVersion(version int64) error {

View File

@ -17,4 +17,41 @@ message Snapshot {
// Metadata contains SDK-specific snapshot metadata.
message Metadata {
repeated bytes chunk_hashes = 1; // SHA-256 chunk hashes
}
}
// SnapshotItem is an item contained in a rootmulti.Store snapshot.
message SnapshotItem {
// item is the specific type of snapshot item.
oneof item {
SnapshotStoreItem store = 1;
SnapshotIAVLItem iavl = 2 [(gogoproto.customname) = "IAVL"];
SnapshotExtensionMeta extension = 3;
SnapshotExtensionPayload extension_payload = 4;
}
}
// SnapshotStoreItem contains metadata about a snapshotted store.
message SnapshotStoreItem {
string name = 1;
}
// SnapshotIAVLItem is an exported IAVL node.
message SnapshotIAVLItem {
bytes key = 1;
bytes value = 2;
// version is block height
int64 version = 3;
// height is depth of the tree.
int32 height = 4;
}
// SnapshotExtensionMeta contains metadata about an external snapshotter.
message SnapshotExtensionMeta {
string name = 1;
uint32 format = 2;
}
// SnapshotExtensionPayload contains payloads of an external snapshotter.
message SnapshotExtensionPayload {
bytes payload = 1;
}

View File

@ -1,28 +0,0 @@
syntax = "proto3";
package cosmos.base.store.v1beta1;
import "gogoproto/gogo.proto";
option go_package = "github.com/cosmos/cosmos-sdk/store/types";
// SnapshotItem is an item contained in a rootmulti.Store snapshot.
message SnapshotItem {
// item is the specific type of snapshot item.
oneof item {
SnapshotStoreItem store = 1;
SnapshotIAVLItem iavl = 2 [(gogoproto.customname) = "IAVL"];
}
}
// SnapshotStoreItem contains metadata about a snapshotted store.
message SnapshotStoreItem {
string name = 1;
}
// SnapshotIAVLItem is an exported IAVL node.
message SnapshotIAVLItem {
bytes key = 1;
bytes value = 2;
int64 version = 3;
int32 height = 4;
}

View File

@ -3,8 +3,10 @@ package mock
import (
"io"
protoio "github.com/gogo/protobuf/io"
dbm "github.com/tendermint/tm-db"
snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"
store "github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
)
@ -122,13 +124,13 @@ func (ms multiStore) SetInitialVersion(version int64) error {
panic("not implemented")
}
func (ms multiStore) Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, error) {
func (ms multiStore) Snapshot(height uint64, protoWriter protoio.Writer) error {
panic("not implemented")
}
func (ms multiStore) Restore(
height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{},
) error {
height uint64, format uint32, protoReader protoio.Reader,
) (snapshottypes.SnapshotItem, error) {
panic("not implemented")
}

View File

@ -1,7 +1,9 @@
package snapshots_test
import (
"bufio"
"bytes"
"compress/zlib"
"crypto/sha256"
"errors"
"io"
@ -10,11 +12,14 @@ import (
"testing"
"time"
protoio "github.com/gogo/protobuf/io"
"github.com/stretchr/testify/require"
db "github.com/tendermint/tm-db"
"github.com/cosmos/cosmos-sdk/snapshots"
"github.com/cosmos/cosmos-sdk/snapshots/types"
snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)
func checksums(slice [][]byte) [][]byte {
@ -57,45 +62,85 @@ func readChunks(chunks <-chan io.ReadCloser) [][]byte {
return bodies
}
// snapshotItems serialize a array of bytes as SnapshotItem_ExtensionPayload, and return the chunks.
func snapshotItems(items [][]byte) [][]byte {
// copy the same parameters from the code
snapshotChunkSize := uint64(10e6)
snapshotBufferSize := int(snapshotChunkSize)
ch := make(chan io.ReadCloser)
go func() {
chunkWriter := snapshots.NewChunkWriter(ch, snapshotChunkSize)
bufWriter := bufio.NewWriterSize(chunkWriter, snapshotBufferSize)
zWriter, _ := zlib.NewWriterLevel(bufWriter, 7)
protoWriter := protoio.NewDelimitedWriter(zWriter)
for _, item := range items {
types.WriteExtensionItem(protoWriter, item)
}
protoWriter.Close()
zWriter.Close()
bufWriter.Flush()
chunkWriter.Close()
}()
var chunks [][]byte
for chunkBody := range ch {
chunk, err := io.ReadAll(chunkBody)
if err != nil {
panic(err)
}
chunks = append(chunks, chunk)
}
return chunks
}
type mockSnapshotter struct {
chunks [][]byte
items [][]byte
}
func (m *mockSnapshotter) Restore(
height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{},
) error {
height uint64, format uint32, protoReader protoio.Reader,
) (snapshottypes.SnapshotItem, error) {
if format == 0 {
return types.ErrUnknownFormat
return snapshottypes.SnapshotItem{}, types.ErrUnknownFormat
}
if m.chunks != nil {
return errors.New("already has contents")
}
if ready != nil {
close(ready)
if m.items != nil {
return snapshottypes.SnapshotItem{}, errors.New("already has contents")
}
m.chunks = [][]byte{}
for reader := range chunks {
chunk, err := ioutil.ReadAll(reader)
if err != nil {
m.items = [][]byte{}
for {
item := &snapshottypes.SnapshotItem{}
err := protoReader.ReadMsg(item)
if err == io.EOF {
break
} else if err != nil {
return snapshottypes.SnapshotItem{}, sdkerrors.Wrap(err, "invalid protobuf message")
}
payload := item.GetExtensionPayload()
if payload == nil {
return snapshottypes.SnapshotItem{}, sdkerrors.Wrap(err, "invalid protobuf message")
}
m.items = append(m.items, payload.Payload)
}
return snapshottypes.SnapshotItem{}, nil
}
func (m *mockSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error {
for _, item := range m.items {
if err := types.WriteExtensionItem(protoWriter, item); err != nil {
return err
}
m.chunks = append(m.chunks, chunk)
}
return nil
}
func (m *mockSnapshotter) Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, error) {
if format == 0 {
return nil, types.ErrUnknownFormat
}
ch := make(chan io.ReadCloser, len(m.chunks))
for _, chunk := range m.chunks {
ch <- ioutil.NopCloser(bytes.NewReader(chunk))
}
close(ch)
return ch, nil
func (m *mockSnapshotter) SnapshotFormat() uint32 {
return 1
}
func (m *mockSnapshotter) SupportedFormats() []uint32 {
return []uint32{1}
}
// setupBusyManager creates a manager with an empty store that is busy creating a snapshot at height 1.
@ -138,15 +183,13 @@ func (m *hungSnapshotter) Close() {
close(m.ch)
}
func (m *hungSnapshotter) Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, error) {
func (m *hungSnapshotter) Snapshot(height uint64, protoWriter protoio.Writer) error {
<-m.ch
ch := make(chan io.ReadCloser, 1)
ch <- ioutil.NopCloser(bytes.NewReader([]byte{}))
return ch, nil
return nil
}
func (m *hungSnapshotter) Restore(
height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{},
) error {
height uint64, format uint32, protoReader protoio.Reader,
) (snapshottypes.SnapshotItem, error) {
panic("not implemented")
}

View File

@ -3,8 +3,11 @@ package snapshots
import (
"bytes"
"crypto/sha256"
"fmt"
"io"
"io/ioutil"
"math"
"sort"
"sync"
"github.com/cosmos/cosmos-sdk/snapshots/types"
@ -18,6 +21,8 @@ const (
opRestore operation = "restore"
chunkBufferSize = 4
snapshotMaxItemSize = int(64e6) // SDK has no key/value size limit, so we set an arbitrary limit
)
// operation represents a Manager operation. Only one operation can be in progress at a time.
@ -43,8 +48,9 @@ type restoreDone struct {
// 2) io.ReadCloser streams automatically propagate IO errors, and can pass arbitrary
// errors via io.Pipe.CloseWithError().
type Manager struct {
store *Store
target types.Snapshotter
store *Store
multistore types.Snapshotter
extensions map[string]types.ExtensionSnapshotter
mtx sync.Mutex
operation operation
@ -55,13 +61,38 @@ type Manager struct {
}
// NewManager creates a new manager.
func NewManager(store *Store, target types.Snapshotter) *Manager {
func NewManager(store *Store, multistore types.Snapshotter) *Manager {
return &Manager{
store: store,
target: target,
store: store,
multistore: multistore,
extensions: make(map[string]types.ExtensionSnapshotter),
}
}
// NewManagerWithExtensions creates a new manager.
func NewManagerWithExtensions(store *Store, multistore types.Snapshotter, extensions map[string]types.ExtensionSnapshotter) *Manager {
return &Manager{
store: store,
multistore: multistore,
extensions: extensions,
}
}
// RegisterExtensions register extension snapshotters to manager
func (m *Manager) RegisterExtensions(extensions ...types.ExtensionSnapshotter) error {
for _, extension := range extensions {
name := extension.SnapshotName()
if _, ok := m.extensions[name]; ok {
return fmt.Errorf("duplicated snapshotter name: %s", name)
}
if !IsFormatSupported(extension, extension.SnapshotFormat()) {
return fmt.Errorf("snapshotter don't support it's own snapshot format: %s %d", name, extension.SnapshotFormat())
}
m.extensions[name] = extension
}
return nil
}
// begin starts an operation, or errors if one is in progress. It manages the mutex itself.
func (m *Manager) begin(op operation) error {
m.mtx.Lock()
@ -100,6 +131,17 @@ func (m *Manager) endLocked() {
m.restoreChunkIndex = 0
}
// sortedExtensionNames sort extension names for deterministic iteration.
func (m *Manager) sortedExtensionNames() []string {
names := make([]string, 0, len(m.extensions))
for name := range m.extensions {
names = append(names, name)
}
sort.Strings(names)
return names
}
// Create creates a snapshot and returns its metadata.
func (m *Manager) Create(height uint64) (*types.Snapshot, error) {
if m == nil {
@ -120,11 +162,45 @@ func (m *Manager) Create(height uint64) (*types.Snapshot, error) {
"a more recent snapshot already exists at height %v", latest.Height)
}
chunks, err := m.target.Snapshot(height, types.CurrentFormat)
if err != nil {
return nil, err
// Spawn goroutine to generate snapshot chunks and pass their io.ReadClosers through a channel
ch := make(chan io.ReadCloser)
go m.createSnapshot(height, ch)
return m.store.Save(height, types.CurrentFormat, ch)
}
// createSnapshot do the heavy work of snapshotting after the validations of request are done
// the produced chunks are written to the channel.
func (m *Manager) createSnapshot(height uint64, ch chan<- io.ReadCloser) {
streamWriter := NewStreamWriter(ch)
if streamWriter == nil {
return
}
defer streamWriter.Close()
if err := m.multistore.Snapshot(height, streamWriter); err != nil {
streamWriter.CloseWithError(err)
return
}
for _, name := range m.sortedExtensionNames() {
extension := m.extensions[name]
// write extension metadata
err := streamWriter.WriteMsg(&types.SnapshotItem{
Item: &types.SnapshotItem_Extension{
Extension: &types.SnapshotExtensionMeta{
Name: name,
Format: extension.SnapshotFormat(),
},
},
})
if err != nil {
streamWriter.CloseWithError(err)
return
}
if err := extension.Snapshot(height, streamWriter); err != nil {
streamWriter.CloseWithError(err)
return
}
}
return m.store.Save(height, types.CurrentFormat, chunks)
}
// List lists snapshots, mirroring ABCI ListSnapshots. It can be concurrent with other operations.
@ -170,6 +246,19 @@ func (m *Manager) Restore(snapshot types.Snapshot) error {
}
m.mtx.Lock()
defer m.mtx.Unlock()
// check multistore supported format preemptive
if snapshot.Format != types.CurrentFormat {
return sdkerrors.Wrapf(types.ErrUnknownFormat, "snapshot format %v", snapshot.Format)
}
if snapshot.Height == 0 {
return sdkerrors.Wrap(sdkerrors.ErrLogic, "cannot restore snapshot at height 0")
}
if snapshot.Height > uint64(math.MaxInt64) {
return sdkerrors.Wrapf(types.ErrInvalidMetadata,
"snapshot height %v cannot exceed %v", snapshot.Height, int64(math.MaxInt64))
}
err := m.beginLocked(opRestore)
if err != nil {
return err
@ -177,10 +266,10 @@ func (m *Manager) Restore(snapshot types.Snapshot) error {
// Start an asynchronous snapshot restoration, passing chunks and completion status via channels.
chChunks := make(chan io.ReadCloser, chunkBufferSize)
chReady := make(chan struct{}, 1)
chDone := make(chan restoreDone, 1)
go func() {
err := m.target.Restore(snapshot.Height, snapshot.Format, chChunks, chReady)
err := m.restoreSnapshot(snapshot, chChunks)
chDone <- restoreDone{
complete: err == nil,
err: err,
@ -188,17 +277,6 @@ func (m *Manager) Restore(snapshot types.Snapshot) error {
close(chDone)
}()
// Check for any initial errors from the restore, before any chunks are fed.
select {
case done := <-chDone:
m.endLocked()
if done.err != nil {
return done.err
}
return sdkerrors.Wrap(sdkerrors.ErrLogic, "restore ended unexpectedly")
case <-chReady:
}
m.chRestore = chChunks
m.chRestoreDone = chDone
m.restoreChunkHashes = snapshot.Metadata.ChunkHashes
@ -206,6 +284,42 @@ func (m *Manager) Restore(snapshot types.Snapshot) error {
return nil
}
// restoreSnapshot do the heavy work of snapshot restoration after preliminary checks on request have passed.
func (m *Manager) restoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.ReadCloser) error {
streamReader, err := NewStreamReader(chChunks)
if err != nil {
return err
}
defer streamReader.Close()
next, err := m.multistore.Restore(snapshot.Height, snapshot.Format, streamReader)
if err != nil {
return sdkerrors.Wrap(err, "multistore restore")
}
for {
if next.Item == nil {
// end of stream
break
}
metadata := next.GetExtension()
if metadata == nil {
return sdkerrors.Wrapf(sdkerrors.ErrLogic, "unknown snapshot item %T", next.Item)
}
extension, ok := m.extensions[metadata.Name]
if !ok {
return sdkerrors.Wrapf(sdkerrors.ErrLogic, "unknown extension snapshotter %s", metadata.Name)
}
if !IsFormatSupported(extension, metadata.Format) {
return sdkerrors.Wrapf(types.ErrUnknownFormat, "format %v for extension %s", metadata.Format, metadata.Name)
}
next, err = extension.Restore(snapshot.Height, metadata.Format, streamReader)
if err != nil {
return sdkerrors.Wrapf(err, "extension %s restore", metadata.Name)
}
}
return nil
}
// RestoreChunk adds a chunk to an active snapshot restoration, mirroring ABCI ApplySnapshotChunk.
// Chunks must be given until the restore is complete, returning true, or a chunk errors.
func (m *Manager) RestoreChunk(chunk []byte) (bool, error) {
@ -257,3 +371,13 @@ func (m *Manager) RestoreChunk(chunk []byte) (bool, error) {
}
return false, nil
}
// IsFormatSupported returns if the snapshotter supports restoration from given format.
func IsFormatSupported(snapshotter types.ExtensionSnapshotter, format uint32) bool {
for _, i := range snapshotter.SupportedFormats() {
if i == format {
return true
}
}
return false
}

View File

@ -53,13 +53,15 @@ func TestManager_LoadChunk(t *testing.T) {
func TestManager_Take(t *testing.T) {
store := setupStore(t)
snapshotter := &mockSnapshotter{
chunks: [][]byte{
{1, 2, 3},
{4, 5, 6},
{7, 8, 9},
},
items := [][]byte{
{1, 2, 3},
{4, 5, 6},
{7, 8, 9},
}
snapshotter := &mockSnapshotter{
items: items,
}
expectChunks := snapshotItems(items)
manager := snapshots.NewManager(store, snapshotter)
// nil manager should return error
@ -75,19 +77,18 @@ func TestManager_Take(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, &types.Snapshot{
Height: 5,
Format: types.CurrentFormat,
Chunks: 3,
Hash: []uint8{0x47, 0xe4, 0xee, 0x7f, 0x21, 0x1f, 0x73, 0x26, 0x5d, 0xd1, 0x76, 0x58, 0xf6, 0xe2, 0x1c, 0x13, 0x18, 0xbd, 0x6c, 0x81, 0xf3, 0x75, 0x98, 0xe2, 0xa, 0x27, 0x56, 0x29, 0x95, 0x42, 0xef, 0xcf},
Format: snapshotter.SnapshotFormat(),
Chunks: 1,
Hash: []uint8{0xcd, 0x17, 0x9e, 0x7f, 0x28, 0xb6, 0x82, 0x90, 0xc7, 0x25, 0xf3, 0x42, 0xac, 0x65, 0x73, 0x50, 0xaa, 0xa0, 0x10, 0x5c, 0x40, 0x8c, 0xd5, 0x1, 0xed, 0x82, 0xb5, 0xca, 0x8b, 0xe0, 0x83, 0xa2},
Metadata: types.Metadata{
ChunkHashes: checksums([][]byte{
{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}),
ChunkHashes: checksums(expectChunks),
},
}, snapshot)
storeSnapshot, chunks, err := store.Load(snapshot.Height, snapshot.Format)
require.NoError(t, err)
assert.Equal(t, snapshot, storeSnapshot)
assert.Equal(t, [][]byte{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}, readChunks(chunks))
assert.Equal(t, expectChunks, readChunks(chunks))
// creating a snapshot while a different snapshot is being created should error
manager = setupBusyManager(t)
@ -118,12 +119,14 @@ func TestManager_Restore(t *testing.T) {
target := &mockSnapshotter{}
manager := snapshots.NewManager(store, target)
chunks := [][]byte{
expectItems := [][]byte{
{1, 2, 3},
{4, 5, 6},
{7, 8, 9},
}
chunks := snapshotItems(expectItems)
// Restore errors on invalid format
err := manager.Restore(types.Snapshot{
Height: 3,
@ -133,7 +136,7 @@ func TestManager_Restore(t *testing.T) {
Metadata: types.Metadata{ChunkHashes: checksums(chunks)},
})
require.Error(t, err)
require.Equal(t, types.ErrUnknownFormat, err)
require.ErrorIs(t, err, types.ErrUnknownFormat)
// Restore errors on no chunks
err = manager.Restore(types.Snapshot{Height: 3, Format: 1, Hash: []byte{1, 2, 3}})
@ -154,7 +157,7 @@ func TestManager_Restore(t *testing.T) {
Height: 3,
Format: 1,
Hash: []byte{1, 2, 3},
Chunks: 3,
Chunks: 1,
Metadata: types.Metadata{ChunkHashes: checksums(chunks)},
})
require.NoError(t, err)
@ -182,7 +185,7 @@ func TestManager_Restore(t *testing.T) {
}
}
assert.Equal(t, chunks, target.chunks)
assert.Equal(t, expectItems, target.items)
// Starting a new restore should fail now, because the target already has contents.
err = manager.Restore(types.Snapshot{
@ -197,12 +200,12 @@ func TestManager_Restore(t *testing.T) {
// But if we clear out the target we should be able to start a new restore. This time we'll
// fail it with a checksum error. That error should stop the operation, so that we can do
// a prune operation right after.
target.chunks = nil
target.items = nil
err = manager.Restore(types.Snapshot{
Height: 3,
Format: 1,
Hash: []byte{1, 2, 3},
Chunks: 3,
Chunks: 1,
Metadata: types.Metadata{ChunkHashes: checksums(chunks)},
})
require.NoError(t, err)

109
snapshots/stream.go Normal file
View File

@ -0,0 +1,109 @@
package snapshots
import (
"bufio"
"compress/zlib"
"io"
protoio "github.com/gogo/protobuf/io"
"github.com/gogo/protobuf/proto"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)
const (
// Do not change chunk size without new snapshot format (must be uniform across nodes)
snapshotChunkSize = uint64(10e6)
snapshotBufferSize = int(snapshotChunkSize)
// Do not change compression level without new snapshot format (must be uniform across nodes)
snapshotCompressionLevel = 7
)
// StreamWriter set up a stream pipeline to serialize snapshot nodes:
// Exported Items -> delimited Protobuf -> zlib -> buffer -> chunkWriter -> chan io.ReadCloser
type StreamWriter struct {
chunkWriter *ChunkWriter
bufWriter *bufio.Writer
zWriter *zlib.Writer
protoWriter protoio.WriteCloser
}
// NewStreamWriter set up a stream pipeline to serialize snapshot DB records.
func NewStreamWriter(ch chan<- io.ReadCloser) *StreamWriter {
chunkWriter := NewChunkWriter(ch, snapshotChunkSize)
bufWriter := bufio.NewWriterSize(chunkWriter, snapshotBufferSize)
zWriter, err := zlib.NewWriterLevel(bufWriter, snapshotCompressionLevel)
if err != nil {
chunkWriter.CloseWithError(sdkerrors.Wrap(err, "zlib failure"))
return nil
}
protoWriter := protoio.NewDelimitedWriter(zWriter)
return &StreamWriter{
chunkWriter: chunkWriter,
bufWriter: bufWriter,
zWriter: zWriter,
protoWriter: protoWriter,
}
}
// WriteMsg implements protoio.Write interface
func (sw *StreamWriter) WriteMsg(msg proto.Message) error {
return sw.protoWriter.WriteMsg(msg)
}
// Close implements io.Closer interface
func (sw *StreamWriter) Close() error {
if err := sw.protoWriter.Close(); err != nil {
sw.chunkWriter.CloseWithError(err)
return err
}
if err := sw.zWriter.Close(); err != nil {
sw.chunkWriter.CloseWithError(err)
return err
}
if err := sw.bufWriter.Flush(); err != nil {
sw.chunkWriter.CloseWithError(err)
return err
}
return sw.chunkWriter.Close()
}
// CloseWithError pass error to chunkWriter
func (sw *StreamWriter) CloseWithError(err error) {
sw.chunkWriter.CloseWithError(err)
}
// StreamReader set up a restore stream pipeline
// chan io.ReadCloser -> chunkReader -> zlib -> delimited Protobuf -> ExportNode
type StreamReader struct {
chunkReader *ChunkReader
zReader io.ReadCloser
protoReader protoio.ReadCloser
}
// NewStreamReader set up a restore stream pipeline.
func NewStreamReader(chunks <-chan io.ReadCloser) (*StreamReader, error) {
chunkReader := NewChunkReader(chunks)
zReader, err := zlib.NewReader(chunkReader)
if err != nil {
return nil, sdkerrors.Wrap(err, "zlib failure")
}
protoReader := protoio.NewDelimitedReader(zReader, snapshotMaxItemSize)
return &StreamReader{
chunkReader: chunkReader,
zReader: zReader,
protoReader: protoReader,
}, nil
}
// ReadMsg implements protoio.Reader interface
func (sr *StreamReader) ReadMsg(msg proto.Message) error {
return sr.protoReader.ReadMsg(msg)
}
// Close implements io.Closer interface
func (sr *StreamReader) Close() error {
sr.protoReader.Close()
sr.zReader.Close()
return sr.chunkReader.Close()
}

File diff suppressed because it is too large Load Diff

View File

@ -1,16 +1,35 @@
package types
import "io"
import (
protoio "github.com/gogo/protobuf/io"
)
// Snapshotter is something that can create and restore snapshots, consisting of streamed binary
// chunks - all of which must be read from the channel and closed. If an unsupported format is
// given, it must return ErrUnknownFormat (possibly wrapped with fmt.Errorf).
type Snapshotter interface {
// Snapshot creates a state snapshot, returning a channel of snapshot chunk readers.
Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, error)
// Snapshot writes snapshot items into the protobuf writer.
Snapshot(height uint64, protoWriter protoio.Writer) error
// Restore restores a state snapshot, taking snapshot chunk readers as input.
// Restore restores a state snapshot from the protobuf items read from the reader.
// If the ready channel is non-nil, it returns a ready signal (by being closed) once the
// restorer is ready to accept chunks.
Restore(height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{}) error
Restore(height uint64, format uint32, protoReader protoio.Reader) (SnapshotItem, error)
}
// ExtensionSnapshotter is an extension Snapshotter that is appended to the snapshot stream.
// ExtensionSnapshotter has an unique name and manages it's own internal formats.
type ExtensionSnapshotter interface {
Snapshotter
// SnapshotName returns the name of snapshotter, it should be unique in the manager.
SnapshotName() string
// SnapshotFormat returns the default format the extension snapshotter use to encode the
// payloads when taking a snapshot.
// It's defined within the extension, different from the global format for the whole state-sync snapshot.
SnapshotFormat() uint32
// SupportedFormats returns a list of formats it can restore from.
SupportedFormats() []uint32
}

16
snapshots/types/util.go Normal file
View File

@ -0,0 +1,16 @@
package types
import (
protoio "github.com/gogo/protobuf/io"
)
// WriteExtensionItem writes an item payload for current extention snapshotter.
func WriteExtensionItem(protoWriter protoio.Writer, item []byte) error {
return protoWriter.WriteMsg(&SnapshotItem{
Item: &SnapshotItem_ExtensionPayload{
ExtensionPayload: &SnapshotExtensionPayload{
Payload: item,
},
},
})
}

View File

@ -0,0 +1,308 @@
package rootmulti_test
import (
"crypto/sha256"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"io"
"math/rand"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/cosmos/cosmos-sdk/snapshots"
snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"
"github.com/cosmos/cosmos-sdk/store/iavl"
"github.com/cosmos/cosmos-sdk/store/rootmulti"
"github.com/cosmos/cosmos-sdk/store/types"
dbm "github.com/tendermint/tm-db"
)
func newMultiStoreWithGeneratedData(db dbm.DB, stores uint8, storeKeys uint64) *rootmulti.Store {
multiStore := rootmulti.NewStore(db)
r := rand.New(rand.NewSource(49872768940)) // Fixed seed for deterministic tests
keys := []*types.KVStoreKey{}
for i := uint8(0); i < stores; i++ {
key := types.NewKVStoreKey(fmt.Sprintf("store%v", i))
multiStore.MountStoreWithDB(key, types.StoreTypeIAVL, nil)
keys = append(keys, key)
}
multiStore.LoadLatestVersion()
for _, key := range keys {
store := multiStore.GetCommitKVStore(key).(*iavl.Store)
for i := uint64(0); i < storeKeys; i++ {
k := make([]byte, 8)
v := make([]byte, 1024)
binary.BigEndian.PutUint64(k, i)
_, err := r.Read(v)
if err != nil {
panic(err)
}
store.Set(k, v)
}
}
multiStore.Commit()
multiStore.LoadLatestVersion()
return multiStore
}
func newMultiStoreWithMixedMounts(db dbm.DB) *rootmulti.Store {
store := rootmulti.NewStore(db)
store.MountStoreWithDB(types.NewKVStoreKey("iavl1"), types.StoreTypeIAVL, nil)
store.MountStoreWithDB(types.NewKVStoreKey("iavl2"), types.StoreTypeIAVL, nil)
store.MountStoreWithDB(types.NewKVStoreKey("iavl3"), types.StoreTypeIAVL, nil)
store.MountStoreWithDB(types.NewTransientStoreKey("trans1"), types.StoreTypeTransient, nil)
store.LoadLatestVersion()
return store
}
func newMultiStoreWithMixedMountsAndBasicData(db dbm.DB) *rootmulti.Store {
store := newMultiStoreWithMixedMounts(db)
store1 := store.GetStoreByName("iavl1").(types.CommitKVStore)
store2 := store.GetStoreByName("iavl2").(types.CommitKVStore)
trans1 := store.GetStoreByName("trans1").(types.KVStore)
store1.Set([]byte("a"), []byte{1})
store1.Set([]byte("b"), []byte{1})
store2.Set([]byte("X"), []byte{255})
store2.Set([]byte("A"), []byte{101})
trans1.Set([]byte("x1"), []byte{91})
store.Commit()
store1.Set([]byte("b"), []byte{2})
store1.Set([]byte("c"), []byte{3})
store2.Set([]byte("B"), []byte{102})
store.Commit()
store2.Set([]byte("C"), []byte{103})
store2.Delete([]byte("X"))
trans1.Set([]byte("x2"), []byte{92})
store.Commit()
return store
}
func assertStoresEqual(t *testing.T, expect, actual types.CommitKVStore, msgAndArgs ...interface{}) {
assert.Equal(t, expect.LastCommitID(), actual.LastCommitID())
expectIter := expect.Iterator(nil, nil)
expectMap := map[string][]byte{}
for ; expectIter.Valid(); expectIter.Next() {
expectMap[string(expectIter.Key())] = expectIter.Value()
}
require.NoError(t, expectIter.Error())
actualIter := expect.Iterator(nil, nil)
actualMap := map[string][]byte{}
for ; actualIter.Valid(); actualIter.Next() {
actualMap[string(actualIter.Key())] = actualIter.Value()
}
require.NoError(t, actualIter.Error())
assert.Equal(t, expectMap, actualMap, msgAndArgs...)
}
func TestMultistoreSnapshot_Checksum(t *testing.T) {
// Chunks from different nodes must fit together, so all nodes must produce identical chunks.
// This checksum test makes sure that the byte stream remains identical. If the test fails
// without having changed the data (e.g. because the Protobuf or zlib encoding changes),
// snapshottypes.CurrentFormat must be bumped.
store := newMultiStoreWithGeneratedData(dbm.NewMemDB(), 5, 10000)
version := uint64(store.LastCommitID().Version)
testcases := []struct {
format uint32
chunkHashes []string
}{
{1, []string{
"503e5b51b657055b77e88169fadae543619368744ad15f1de0736c0a20482f24",
"e1a0daaa738eeb43e778aefd2805e3dd720798288a410b06da4b8459c4d8f72e",
"aa048b4ee0f484965d7b3b06822cf0772cdcaad02f3b1b9055e69f2cb365ef3c",
"7921eaa3ed4921341e504d9308a9877986a879fe216a099c86e8db66fcba4c63",
"a4a864e6c02c9fca5837ec80dc84f650b25276ed7e4820cf7516ced9f9901b86",
"ca2879ac6e7205d257440131ba7e72bef784cd61642e32b847729e543c1928b9",
}},
}
for _, tc := range testcases {
tc := tc
t.Run(fmt.Sprintf("Format %v", tc.format), func(t *testing.T) {
ch := make(chan io.ReadCloser)
go func() {
streamWriter := snapshots.NewStreamWriter(ch)
defer streamWriter.Close()
require.NotNil(t, streamWriter)
err := store.Snapshot(version, streamWriter)
require.NoError(t, err)
}()
hashes := []string{}
hasher := sha256.New()
for chunk := range ch {
hasher.Reset()
_, err := io.Copy(hasher, chunk)
require.NoError(t, err)
hashes = append(hashes, hex.EncodeToString(hasher.Sum(nil)))
}
assert.Equal(t, tc.chunkHashes, hashes,
"Snapshot output for format %v has changed", tc.format)
})
}
}
func TestMultistoreSnapshot_Errors(t *testing.T) {
store := newMultiStoreWithMixedMountsAndBasicData(dbm.NewMemDB())
testcases := map[string]struct {
height uint64
expectType error
}{
"0 height": {0, nil},
"unknown height": {9, nil},
}
for name, tc := range testcases {
tc := tc
t.Run(name, func(t *testing.T) {
err := store.Snapshot(tc.height, nil)
require.Error(t, err)
if tc.expectType != nil {
assert.True(t, errors.Is(err, tc.expectType))
}
})
}
}
func TestMultistoreSnapshotRestore(t *testing.T) {
source := newMultiStoreWithMixedMountsAndBasicData(dbm.NewMemDB())
target := newMultiStoreWithMixedMounts(dbm.NewMemDB())
version := uint64(source.LastCommitID().Version)
require.EqualValues(t, 3, version)
dummyExtensionItem := snapshottypes.SnapshotItem{
Item: &snapshottypes.SnapshotItem_Extension{
Extension: &snapshottypes.SnapshotExtensionMeta{
Name: "test",
Format: 1,
},
},
}
chunks := make(chan io.ReadCloser, 100)
go func() {
streamWriter := snapshots.NewStreamWriter(chunks)
require.NotNil(t, streamWriter)
defer streamWriter.Close()
err := source.Snapshot(version, streamWriter)
require.NoError(t, err)
// write an extension metadata
err = streamWriter.WriteMsg(&dummyExtensionItem)
require.NoError(t, err)
}()
streamReader, err := snapshots.NewStreamReader(chunks)
require.NoError(t, err)
nextItem, err := target.Restore(version, snapshottypes.CurrentFormat, streamReader)
require.NoError(t, err)
require.Equal(t, *dummyExtensionItem.GetExtension(), *nextItem.GetExtension())
assert.Equal(t, source.LastCommitID(), target.LastCommitID())
for key, sourceStore := range source.GetStores() {
targetStore := target.GetStoreByName(key.Name()).(types.CommitKVStore)
switch sourceStore.GetStoreType() {
case types.StoreTypeTransient:
assert.False(t, targetStore.Iterator(nil, nil).Valid(),
"transient store %v not empty", key.Name())
default:
assertStoresEqual(t, sourceStore, targetStore, "store %q not equal", key.Name())
}
}
}
func benchmarkMultistoreSnapshot(b *testing.B, stores uint8, storeKeys uint64) {
b.Skip("Noisy with slow setup time, please see https://github.com/cosmos/cosmos-sdk/issues/8855.")
b.ReportAllocs()
b.StopTimer()
source := newMultiStoreWithGeneratedData(dbm.NewMemDB(), stores, storeKeys)
version := source.LastCommitID().Version
require.EqualValues(b, 1, version)
b.StartTimer()
for i := 0; i < b.N; i++ {
target := rootmulti.NewStore(dbm.NewMemDB())
for key := range source.GetStores() {
target.MountStoreWithDB(key, types.StoreTypeIAVL, nil)
}
err := target.LoadLatestVersion()
require.NoError(b, err)
require.EqualValues(b, 0, target.LastCommitID().Version)
chunks := make(chan io.ReadCloser)
go func() {
streamWriter := snapshots.NewStreamWriter(chunks)
require.NotNil(b, streamWriter)
err := source.Snapshot(uint64(version), streamWriter)
require.NoError(b, err)
}()
for reader := range chunks {
_, err := io.Copy(io.Discard, reader)
require.NoError(b, err)
err = reader.Close()
require.NoError(b, err)
}
}
}
func benchmarkMultistoreSnapshotRestore(b *testing.B, stores uint8, storeKeys uint64) {
b.Skip("Noisy with slow setup time, please see https://github.com/cosmos/cosmos-sdk/issues/8855.")
b.ReportAllocs()
b.StopTimer()
source := newMultiStoreWithGeneratedData(dbm.NewMemDB(), stores, storeKeys)
version := uint64(source.LastCommitID().Version)
require.EqualValues(b, 1, version)
b.StartTimer()
for i := 0; i < b.N; i++ {
target := rootmulti.NewStore(dbm.NewMemDB())
for key := range source.GetStores() {
target.MountStoreWithDB(key, types.StoreTypeIAVL, nil)
}
err := target.LoadLatestVersion()
require.NoError(b, err)
require.EqualValues(b, 0, target.LastCommitID().Version)
chunks := make(chan io.ReadCloser)
go func() {
writer := snapshots.NewStreamWriter(chunks)
require.NotNil(b, writer)
err := source.Snapshot(version, writer)
require.NoError(b, err)
}()
reader, err := snapshots.NewStreamReader(chunks)
require.NoError(b, err)
_, err = target.Restore(version, snapshottypes.CurrentFormat, reader)
require.NoError(b, err)
require.Equal(b, source.LastCommitID(), target.LastCommitID())
}
}
func BenchmarkMultistoreSnapshot100K(b *testing.B) {
benchmarkMultistoreSnapshot(b, 10, 10000)
}
func BenchmarkMultistoreSnapshot1M(b *testing.B) {
benchmarkMultistoreSnapshot(b, 10, 100000)
}
func BenchmarkMultistoreSnapshotRestore100K(b *testing.B) {
benchmarkMultistoreSnapshotRestore(b, 10, 10000)
}
func BenchmarkMultistoreSnapshotRestore1M(b *testing.B) {
benchmarkMultistoreSnapshotRestore(b, 10, 100000)
}

View File

@ -1,8 +1,6 @@
package rootmulti
import (
"bufio"
"compress/zlib"
"encoding/binary"
"fmt"
"io"
@ -18,7 +16,6 @@ import (
abci "github.com/tendermint/tendermint/abci/types"
dbm "github.com/tendermint/tm-db"
"github.com/cosmos/cosmos-sdk/snapshots"
snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"
"github.com/cosmos/cosmos-sdk/store/cachemulti"
"github.com/cosmos/cosmos-sdk/store/dbadapter"
@ -35,11 +32,6 @@ const (
latestVersionKey = "s/latest"
pruneHeightsKey = "s/pruneheights"
commitInfoKeyFmt = "s/%d" // s/<version>
// Do not change chunk size without new snapshot format (must be uniform across nodes)
snapshotChunkSize = uint64(10e6)
snapshotBufferSize = int(snapshotChunkSize)
snapshotMaxItemSize = int(64e6) // SDK has no key/value size limit, so we set an arbitrary limit
)
// Store is composed of many CommitStores. Name contrasts with
@ -154,6 +146,11 @@ func (rs *Store) GetCommitKVStore(key types.StoreKey) types.CommitKVStore {
return rs.stores[key]
}
// GetStores returns mounted stores
func (rs *Store) GetStores() map[types.StoreKey]types.CommitKVStore {
return rs.stores
}
// LoadLatestVersionAndUpgrade implements CommitMultiStore
func (rs *Store) LoadLatestVersionAndUpgrade(upgrades *types.StoreUpgrades) error {
ver := getLatestVersion(rs.db)
@ -549,11 +546,11 @@ func (rs *Store) GetKVStore(key types.StoreKey) types.KVStore {
return store
}
// getStoreByName performs a lookup of a StoreKey given a store name typically
// GetStoreByName performs a lookup of a StoreKey given a store name typically
// provided in a path. The StoreKey is then used to perform a lookup and return
// a Store. If the Store is wrapped in an inter-block cache, it will be unwrapped
// prior to being returned. If the StoreKey does not exist, nil is returned.
func (rs *Store) getStoreByName(name string) types.Store {
func (rs *Store) GetStoreByName(name string) types.Store {
key := rs.keysByName[name]
if key == nil {
return nil
@ -573,7 +570,7 @@ func (rs *Store) Query(req abci.RequestQuery) abci.ResponseQuery {
return sdkerrors.QueryResult(err)
}
store := rs.getStoreByName(storeName)
store := rs.GetStoreByName(storeName)
if store == nil {
return sdkerrors.QueryResult(sdkerrors.Wrapf(sdkerrors.ErrUnknownRequest, "no such store: %s", storeName))
}
@ -658,15 +655,12 @@ func parsePath(path string) (storeName string, subpath string, err error) {
// identical across nodes such that chunks from different sources fit together. If the output for a
// given format changes (at the byte level), the snapshot format must be bumped - see
// TestMultistoreSnapshot_Checksum test.
func (rs *Store) Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, error) {
if format != snapshottypes.CurrentFormat {
return nil, sdkerrors.Wrapf(snapshottypes.ErrUnknownFormat, "format %v", format)
}
func (rs *Store) Snapshot(height uint64, protoWriter protoio.Writer) error {
if height == 0 {
return nil, sdkerrors.Wrap(sdkerrors.ErrLogic, "cannot snapshot height 0")
return sdkerrors.Wrap(sdkerrors.ErrLogic, "cannot snapshot height 0")
}
if height > uint64(rs.LastCommitID().Version) {
return nil, sdkerrors.Wrapf(sdkerrors.ErrLogic, "cannot snapshot future height %v", height)
return sdkerrors.Wrapf(sdkerrors.ErrLogic, "cannot snapshot future height %v", height)
}
// Collect stores to snapshot (only IAVL stores are supported)
@ -683,7 +677,7 @@ func (rs *Store) Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, e
// Non-persisted stores shouldn't be snapshotted
continue
default:
return nil, sdkerrors.Wrapf(sdkerrors.ErrLogic,
return sdkerrors.Wrapf(sdkerrors.ErrLogic,
"don't know how to snapshot store %q of type %T", key.Name(), store)
}
}
@ -691,160 +685,99 @@ func (rs *Store) Snapshot(height uint64, format uint32) (<-chan io.ReadCloser, e
return strings.Compare(stores[i].name, stores[j].name) == -1
})
// Spawn goroutine to generate snapshot chunks and pass their io.ReadClosers through a channel
ch := make(chan io.ReadCloser)
go func() {
// Set up a stream pipeline to serialize snapshot nodes:
// ExportNode -> delimited Protobuf -> zlib -> buffer -> chunkWriter -> chan io.ReadCloser
chunkWriter := snapshots.NewChunkWriter(ch, snapshotChunkSize)
defer chunkWriter.Close()
bufWriter := bufio.NewWriterSize(chunkWriter, snapshotBufferSize)
defer func() {
if err := bufWriter.Flush(); err != nil {
chunkWriter.CloseWithError(err)
}
}()
zWriter, err := zlib.NewWriterLevel(bufWriter, 7)
// Export each IAVL store. Stores are serialized as a stream of SnapshotItem Protobuf
// messages. The first item contains a SnapshotStore with store metadata (i.e. name),
// and the following messages contain a SnapshotNode (i.e. an ExportNode). Store changes
// are demarcated by new SnapshotStore items.
for _, store := range stores {
exporter, err := store.Export(int64(height))
if err != nil {
chunkWriter.CloseWithError(sdkerrors.Wrap(err, "zlib failure"))
return
return err
}
defer exporter.Close()
err = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{
Item: &snapshottypes.SnapshotItem_Store{
Store: &snapshottypes.SnapshotStoreItem{
Name: store.name,
},
},
})
if err != nil {
return err
}
defer func() {
if err := zWriter.Close(); err != nil {
chunkWriter.CloseWithError(err)
}
}()
protoWriter := protoio.NewDelimitedWriter(zWriter)
defer func() {
if err := protoWriter.Close(); err != nil {
chunkWriter.CloseWithError(err)
}
}()
// Export each IAVL store. Stores are serialized as a stream of SnapshotItem Protobuf
// messages. The first item contains a SnapshotStore with store metadata (i.e. name),
// and the following messages contain a SnapshotNode (i.e. an ExportNode). Store changes
// are demarcated by new SnapshotStore items.
for _, store := range stores {
exporter, err := store.Export(int64(height))
if err != nil {
chunkWriter.CloseWithError(err)
return
for {
node, err := exporter.Next()
if err == iavltree.ExportDone {
break
} else if err != nil {
return err
}
defer exporter.Close()
err = protoWriter.WriteMsg(&types.SnapshotItem{
Item: &types.SnapshotItem_Store{
Store: &types.SnapshotStoreItem{
Name: store.name,
err = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{
Item: &snapshottypes.SnapshotItem_IAVL{
IAVL: &snapshottypes.SnapshotIAVLItem{
Key: node.Key,
Value: node.Value,
Height: int32(node.Height),
Version: node.Version,
},
},
})
if err != nil {
chunkWriter.CloseWithError(err)
return
return err
}
for {
node, err := exporter.Next()
if err == iavltree.ExportDone {
break
} else if err != nil {
chunkWriter.CloseWithError(err)
return
}
err = protoWriter.WriteMsg(&types.SnapshotItem{
Item: &types.SnapshotItem_IAVL{
IAVL: &types.SnapshotIAVLItem{
Key: node.Key,
Value: node.Value,
Height: int32(node.Height),
Version: node.Version,
},
},
})
if err != nil {
chunkWriter.CloseWithError(err)
return
}
}
exporter.Close()
}
}()
exporter.Close()
}
return ch, nil
return nil
}
// Restore implements snapshottypes.Snapshotter.
// returns next snapshot item and error.
func (rs *Store) Restore(
height uint64, format uint32, chunks <-chan io.ReadCloser, ready chan<- struct{},
) error {
if format != snapshottypes.CurrentFormat {
return sdkerrors.Wrapf(snapshottypes.ErrUnknownFormat, "format %v", format)
}
if height == 0 {
return sdkerrors.Wrap(sdkerrors.ErrLogic, "cannot restore snapshot at height 0")
}
if height > uint64(math.MaxInt64) {
return sdkerrors.Wrapf(snapshottypes.ErrInvalidMetadata,
"snapshot height %v cannot exceed %v", height, int64(math.MaxInt64))
}
// Signal readiness. Must be done before the readers below are set up, since the zlib
// reader reads from the stream on initialization, potentially causing deadlocks.
if ready != nil {
close(ready)
}
// Set up a restore stream pipeline
// chan io.ReadCloser -> chunkReader -> zlib -> delimited Protobuf -> ExportNode
chunkReader := snapshots.NewChunkReader(chunks)
defer chunkReader.Close()
zReader, err := zlib.NewReader(chunkReader)
if err != nil {
return sdkerrors.Wrap(err, "zlib failure")
}
defer zReader.Close()
protoReader := protoio.NewDelimitedReader(zReader, snapshotMaxItemSize)
defer protoReader.Close()
height uint64, format uint32, protoReader protoio.Reader,
) (snapshottypes.SnapshotItem, error) {
// Import nodes into stores. The first item is expected to be a SnapshotItem containing
// a SnapshotStoreItem, telling us which store to import into. The following items will contain
// SnapshotNodeItem (i.e. ExportNode) until we reach the next SnapshotStoreItem or EOF.
var importer *iavltree.Importer
var snapshotItem snapshottypes.SnapshotItem
loop:
for {
item := &types.SnapshotItem{}
err := protoReader.ReadMsg(item)
snapshotItem = snapshottypes.SnapshotItem{}
err := protoReader.ReadMsg(&snapshotItem)
if err == io.EOF {
break
} else if err != nil {
return sdkerrors.Wrap(err, "invalid protobuf message")
return snapshottypes.SnapshotItem{}, sdkerrors.Wrap(err, "invalid protobuf message")
}
switch item := item.Item.(type) {
case *types.SnapshotItem_Store:
switch item := snapshotItem.Item.(type) {
case *snapshottypes.SnapshotItem_Store:
if importer != nil {
err = importer.Commit()
if err != nil {
return sdkerrors.Wrap(err, "IAVL commit failed")
return snapshottypes.SnapshotItem{}, sdkerrors.Wrap(err, "IAVL commit failed")
}
importer.Close()
}
store, ok := rs.getStoreByName(item.Store.Name).(*iavl.Store)
store, ok := rs.GetStoreByName(item.Store.Name).(*iavl.Store)
if !ok || store == nil {
return sdkerrors.Wrapf(sdkerrors.ErrLogic, "cannot import into non-IAVL store %q", item.Store.Name)
return snapshottypes.SnapshotItem{}, sdkerrors.Wrapf(sdkerrors.ErrLogic, "cannot import into non-IAVL store %q", item.Store.Name)
}
importer, err = store.Import(int64(height))
if err != nil {
return sdkerrors.Wrap(err, "import failed")
return snapshottypes.SnapshotItem{}, sdkerrors.Wrap(err, "import failed")
}
defer importer.Close()
case *types.SnapshotItem_IAVL:
case *snapshottypes.SnapshotItem_IAVL:
if importer == nil {
return sdkerrors.Wrap(sdkerrors.ErrLogic, "received IAVL node item before store item")
return snapshottypes.SnapshotItem{}, sdkerrors.Wrap(sdkerrors.ErrLogic, "received IAVL node item before store item")
}
if item.IAVL.Height > math.MaxInt8 {
return sdkerrors.Wrapf(sdkerrors.ErrLogic, "node height %v cannot exceed %v",
return snapshottypes.SnapshotItem{}, sdkerrors.Wrapf(sdkerrors.ErrLogic, "node height %v cannot exceed %v",
item.IAVL.Height, math.MaxInt8)
}
node := &iavltree.ExportNode{
@ -863,24 +796,24 @@ func (rs *Store) Restore(
}
err := importer.Add(node)
if err != nil {
return sdkerrors.Wrap(err, "IAVL node import failed")
return snapshottypes.SnapshotItem{}, sdkerrors.Wrap(err, "IAVL node import failed")
}
default:
return sdkerrors.Wrapf(sdkerrors.ErrLogic, "unknown snapshot item %T", item)
break loop
}
}
if importer != nil {
err := importer.Commit()
if err != nil {
return sdkerrors.Wrap(err, "IAVL commit failed")
return snapshottypes.SnapshotItem{}, sdkerrors.Wrap(err, "IAVL commit failed")
}
importer.Close()
}
flushMetadata(rs.db, int64(height), rs.buildCommitInfo(int64(height)), []int64{})
return rs.LoadLatestVersion()
return snapshotItem, rs.LoadLatestVersion()
}
func (rs *Store) loadCommitStoreFromParams(key types.StoreKey, id types.CommitID, params storeParams) (types.CommitKVStore, error) {

View File

@ -2,25 +2,16 @@ package rootmulti
import (
"bytes"
"crypto/sha256"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"io"
"io/ioutil"
"math/rand"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
dbm "github.com/tendermint/tm-db"
"github.com/cosmos/cosmos-sdk/codec"
codecTypes "github.com/cosmos/cosmos-sdk/codec/types"
snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types"
"github.com/cosmos/cosmos-sdk/store/cachemulti"
"github.com/cosmos/cosmos-sdk/store/iavl"
sdkmaps "github.com/cosmos/cosmos-sdk/store/internal/maps"
@ -87,7 +78,7 @@ func TestCacheMultiStoreWithVersion(t *testing.T) {
k, v := []byte("wind"), []byte("blows")
store1 := ms.getStoreByName("store1").(types.KVStore)
store1 := ms.GetStoreByName("store1").(types.KVStore)
store1.Set(k, v)
cID := ms.Commit()
@ -124,7 +115,7 @@ func TestHashStableWithEmptyCommit(t *testing.T) {
k, v := []byte("wind"), []byte("blows")
store1 := ms.getStoreByName("store1").(types.KVStore)
store1 := ms.GetStoreByName("store1").(types.KVStore)
store1.Set(k, v)
cID := ms.Commit()
@ -148,11 +139,11 @@ func TestMultistoreCommitLoad(t *testing.T) {
checkStore(t, store, commitID, commitID)
// Make sure we can get stores by name.
s1 := store.getStoreByName("store1")
s1 := store.GetStoreByName("store1")
require.NotNil(t, s1)
s3 := store.getStoreByName("store3")
s3 := store.GetStoreByName("store3")
require.NotNil(t, s3)
s77 := store.getStoreByName("store77")
s77 := store.GetStoreByName("store77")
require.Nil(t, s77)
// Make a few commits and check them.
@ -192,21 +183,21 @@ func TestMultistoreLoadWithUpgrade(t *testing.T) {
// write some data in all stores
k1, v1 := []byte("first"), []byte("store")
s1, _ := store.getStoreByName("store1").(types.KVStore)
s1, _ := store.GetStoreByName("store1").(types.KVStore)
require.NotNil(t, s1)
s1.Set(k1, v1)
k2, v2 := []byte("second"), []byte("restore")
s2, _ := store.getStoreByName("store2").(types.KVStore)
s2, _ := store.GetStoreByName("store2").(types.KVStore)
require.NotNil(t, s2)
s2.Set(k2, v2)
k3, v3 := []byte("third"), []byte("dropped")
s3, _ := store.getStoreByName("store3").(types.KVStore)
s3, _ := store.GetStoreByName("store3").(types.KVStore)
require.NotNil(t, s3)
s3.Set(k3, v3)
s4, _ := store.getStoreByName("store4").(types.KVStore)
s4, _ := store.GetStoreByName("store4").(types.KVStore)
require.Nil(t, s4)
// do one commit
@ -229,7 +220,7 @@ func TestMultistoreLoadWithUpgrade(t *testing.T) {
checkStore(t, store, commitID, commitID)
// let's query data to see it was saved properly
s2, _ = store.getStoreByName("store2").(types.KVStore)
s2, _ = store.GetStoreByName("store2").(types.KVStore)
require.NotNil(t, s2)
require.Equal(t, v2, s2.Get(k2))
@ -239,17 +230,17 @@ func TestMultistoreLoadWithUpgrade(t *testing.T) {
require.Nil(t, err)
// s1 was not changed
s1, _ = restore.getStoreByName("store1").(types.KVStore)
s1, _ = restore.GetStoreByName("store1").(types.KVStore)
require.NotNil(t, s1)
require.Equal(t, v1, s1.Get(k1))
// store3 is mounted, but data deleted are gone
s3, _ = restore.getStoreByName("store3").(types.KVStore)
s3, _ = restore.GetStoreByName("store3").(types.KVStore)
require.NotNil(t, s3)
require.Nil(t, s3.Get(k3)) // data was deleted
// store4 is mounted, with empty data
s4, _ = restore.getStoreByName("store4").(types.KVStore)
s4, _ = restore.GetStoreByName("store4").(types.KVStore)
require.NotNil(t, s4)
iterator := s4.Iterator(nil, nil)
@ -267,11 +258,11 @@ func TestMultistoreLoadWithUpgrade(t *testing.T) {
s4.Set(k4, v4)
// store2 is no longer mounted
st2 := restore.getStoreByName("store2")
st2 := restore.GetStoreByName("store2")
require.Nil(t, st2)
// restore2 has the old data
rs2, _ := restore.getStoreByName("restore2").(types.KVStore)
rs2, _ := restore.GetStoreByName("restore2").(types.KVStore)
require.NotNil(t, rs2)
require.Equal(t, v2, rs2.Get(k2))
@ -285,15 +276,15 @@ func TestMultistoreLoadWithUpgrade(t *testing.T) {
require.Equal(t, migratedID, reload.LastCommitID())
// query this new store
rl1, _ := reload.getStoreByName("store1").(types.KVStore)
rl1, _ := reload.GetStoreByName("store1").(types.KVStore)
require.NotNil(t, rl1)
require.Equal(t, v1, rl1.Get(k1))
rl2, _ := reload.getStoreByName("restore2").(types.KVStore)
rl2, _ := reload.GetStoreByName("restore2").(types.KVStore)
require.NotNil(t, rl2)
require.Equal(t, v2, rl2.Get(k2))
rl4, _ := reload.getStoreByName("store4").(types.KVStore)
rl4, _ := reload.GetStoreByName("store4").(types.KVStore)
require.NotNil(t, rl4)
require.Equal(t, v4, rl4.Get(k4))
@ -345,15 +336,15 @@ func TestMultiStoreRestart(t *testing.T) {
for i := 1; i < 3; i++ {
// Set and commit data in one store.
store1 := multi.getStoreByName("store1").(types.KVStore)
store1 := multi.GetStoreByName("store1").(types.KVStore)
store1.Set([]byte(k), []byte(fmt.Sprintf("%s:%d", v, i)))
// ... and another.
store2 := multi.getStoreByName("store2").(types.KVStore)
store2 := multi.GetStoreByName("store2").(types.KVStore)
store2.Set([]byte(k2), []byte(fmt.Sprintf("%s:%d", v2, i)))
// ... and another.
store3 := multi.getStoreByName("store3").(types.KVStore)
store3 := multi.GetStoreByName("store3").(types.KVStore)
store3.Set([]byte(k3), []byte(fmt.Sprintf("%s:%d", v3, i)))
multi.Commit()
@ -364,11 +355,11 @@ func TestMultiStoreRestart(t *testing.T) {
}
// Set and commit data in one store.
store1 := multi.getStoreByName("store1").(types.KVStore)
store1 := multi.GetStoreByName("store1").(types.KVStore)
store1.Set([]byte(k), []byte(fmt.Sprintf("%s:%d", v, 3)))
// ... and another.
store2 := multi.getStoreByName("store2").(types.KVStore)
store2 := multi.GetStoreByName("store2").(types.KVStore)
store2.Set([]byte(k2), []byte(fmt.Sprintf("%s:%d", v2, 3)))
multi.Commit()
@ -378,7 +369,7 @@ func TestMultiStoreRestart(t *testing.T) {
require.NotEqual(t, initCid, flushedCinfo, "CID is different after flush to disk")
// ... and another.
store3 := multi.getStoreByName("store3").(types.KVStore)
store3 := multi.GetStoreByName("store3").(types.KVStore)
store3.Set([]byte(k3), []byte(fmt.Sprintf("%s:%d", v3, 3)))
multi.Commit()
@ -395,16 +386,16 @@ func TestMultiStoreRestart(t *testing.T) {
require.Equal(t, int64(4), reloadedCid.Version, "Reloaded CID is not the same as last flushed CID")
// Check that store1 and store2 retained date from 3rd commit
store1 = multi.getStoreByName("store1").(types.KVStore)
store1 = multi.GetStoreByName("store1").(types.KVStore)
val := store1.Get([]byte(k))
require.Equal(t, []byte(fmt.Sprintf("%s:%d", v, 3)), val, "Reloaded value not the same as last flushed value")
store2 = multi.getStoreByName("store2").(types.KVStore)
store2 = multi.GetStoreByName("store2").(types.KVStore)
val2 := store2.Get([]byte(k2))
require.Equal(t, []byte(fmt.Sprintf("%s:%d", v2, 3)), val2, "Reloaded value not the same as last flushed value")
// Check that store3 still has data from last commit even though update happened on 2nd commit
store3 = multi.getStoreByName("store3").(types.KVStore)
store3 = multi.GetStoreByName("store3").(types.KVStore)
val3 := store3.Get([]byte(k3))
require.Equal(t, []byte(fmt.Sprintf("%s:%d", v3, 3)), val3, "Reloaded value not the same as last flushed value")
}
@ -422,15 +413,15 @@ func TestMultiStoreQuery(t *testing.T) {
cid := multi.Commit()
// Make sure we can get by name.
garbage := multi.getStoreByName("bad-name")
garbage := multi.GetStoreByName("bad-name")
require.Nil(t, garbage)
// Set and commit data in one store.
store1 := multi.getStoreByName("store1").(types.KVStore)
store1 := multi.GetStoreByName("store1").(types.KVStore)
store1.Set(k, v)
// ... and another.
store2 := multi.getStoreByName("store2").(types.KVStore)
store2 := multi.GetStoreByName("store2").(types.KVStore)
store2.Set(k2, v2)
// Commit the multistore.
@ -553,121 +544,6 @@ func TestMultiStore_PruningRestart(t *testing.T) {
}
}
func TestMultistoreSnapshot_Checksum(t *testing.T) {
// Chunks from different nodes must fit together, so all nodes must produce identical chunks.
// This checksum test makes sure that the byte stream remains identical. If the test fails
// without having changed the data (e.g. because the Protobuf or zlib encoding changes),
// snapshottypes.CurrentFormat must be bumped.
store := newMultiStoreWithGeneratedData(dbm.NewMemDB(), 5, 10000)
version := uint64(store.LastCommitID().Version)
testcases := []struct {
format uint32
chunkHashes []string
}{
{1, []string{
"503e5b51b657055b77e88169fadae543619368744ad15f1de0736c0a20482f24",
"e1a0daaa738eeb43e778aefd2805e3dd720798288a410b06da4b8459c4d8f72e",
"aa048b4ee0f484965d7b3b06822cf0772cdcaad02f3b1b9055e69f2cb365ef3c",
"7921eaa3ed4921341e504d9308a9877986a879fe216a099c86e8db66fcba4c63",
"a4a864e6c02c9fca5837ec80dc84f650b25276ed7e4820cf7516ced9f9901b86",
"ca2879ac6e7205d257440131ba7e72bef784cd61642e32b847729e543c1928b9",
}},
}
for _, tc := range testcases {
tc := tc
t.Run(fmt.Sprintf("Format %v", tc.format), func(t *testing.T) {
chunks, err := store.Snapshot(version, tc.format)
require.NoError(t, err)
hashes := []string{}
hasher := sha256.New()
for chunk := range chunks {
hasher.Reset()
_, err := io.Copy(hasher, chunk)
require.NoError(t, err)
hashes = append(hashes, hex.EncodeToString(hasher.Sum(nil)))
}
assert.Equal(t, tc.chunkHashes, hashes,
"Snapshot output for format %v has changed", tc.format)
})
}
}
func TestMultistoreSnapshot_Errors(t *testing.T) {
store := newMultiStoreWithMixedMountsAndBasicData(dbm.NewMemDB())
testcases := map[string]struct {
height uint64
format uint32
expectType error
}{
"0 height": {0, snapshottypes.CurrentFormat, nil},
"0 format": {1, 0, snapshottypes.ErrUnknownFormat},
"unknown height": {9, snapshottypes.CurrentFormat, nil},
"unknown format": {1, 9, snapshottypes.ErrUnknownFormat},
}
for name, tc := range testcases {
tc := tc
t.Run(name, func(t *testing.T) {
_, err := store.Snapshot(tc.height, tc.format)
require.Error(t, err)
if tc.expectType != nil {
assert.True(t, errors.Is(err, tc.expectType))
}
})
}
}
func TestMultistoreRestore_Errors(t *testing.T) {
store := newMultiStoreWithMixedMounts(dbm.NewMemDB())
testcases := map[string]struct {
height uint64
format uint32
expectType error
}{
"0 height": {0, snapshottypes.CurrentFormat, nil},
"0 format": {1, 0, snapshottypes.ErrUnknownFormat},
"unknown format": {1, 9, snapshottypes.ErrUnknownFormat},
}
for name, tc := range testcases {
tc := tc
t.Run(name, func(t *testing.T) {
err := store.Restore(tc.height, tc.format, nil, nil)
require.Error(t, err)
if tc.expectType != nil {
assert.True(t, errors.Is(err, tc.expectType))
}
})
}
}
func TestMultistoreSnapshotRestore(t *testing.T) {
source := newMultiStoreWithMixedMountsAndBasicData(dbm.NewMemDB())
target := newMultiStoreWithMixedMounts(dbm.NewMemDB())
version := uint64(source.LastCommitID().Version)
require.EqualValues(t, 3, version)
chunks, err := source.Snapshot(version, snapshottypes.CurrentFormat)
require.NoError(t, err)
ready := make(chan struct{})
err = target.Restore(version, snapshottypes.CurrentFormat, chunks, ready)
require.NoError(t, err)
assert.EqualValues(t, struct{}{}, <-ready)
assert.Equal(t, source.LastCommitID(), target.LastCommitID())
for key, sourceStore := range source.stores {
targetStore := target.getStoreByName(key.Name()).(types.CommitKVStore)
switch sourceStore.GetStoreType() {
case types.StoreTypeTransient:
assert.False(t, targetStore.Iterator(nil, nil).Valid(),
"transient store %v not empty", key.Name())
default:
assertStoresEqual(t, sourceStore, targetStore, "store %q not equal", key.Name())
}
}
}
func TestSetInitialVersion(t *testing.T) {
db := dbm.NewMemDB()
multi := newMultiStoreWithMounts(db, types.PruneNothing)
@ -855,79 +731,6 @@ func TestTraceConcurrency(t *testing.T) {
stopW <- struct{}{}
}
func BenchmarkMultistoreSnapshot100K(b *testing.B) {
benchmarkMultistoreSnapshot(b, 10, 10000)
}
func BenchmarkMultistoreSnapshot1M(b *testing.B) {
benchmarkMultistoreSnapshot(b, 10, 100000)
}
func BenchmarkMultistoreSnapshotRestore100K(b *testing.B) {
benchmarkMultistoreSnapshotRestore(b, 10, 10000)
}
func BenchmarkMultistoreSnapshotRestore1M(b *testing.B) {
benchmarkMultistoreSnapshotRestore(b, 10, 100000)
}
func benchmarkMultistoreSnapshot(b *testing.B, stores uint8, storeKeys uint64) {
b.Skip("Noisy with slow setup time, please see https://github.com/cosmos/cosmos-sdk/issues/8855.")
b.ReportAllocs()
b.StopTimer()
source := newMultiStoreWithGeneratedData(dbm.NewMemDB(), stores, storeKeys)
version := source.LastCommitID().Version
require.EqualValues(b, 1, version)
b.StartTimer()
for i := 0; i < b.N; i++ {
target := NewStore(dbm.NewMemDB())
for key := range source.stores {
target.MountStoreWithDB(key, types.StoreTypeIAVL, nil)
}
err := target.LoadLatestVersion()
require.NoError(b, err)
require.EqualValues(b, 0, target.LastCommitID().Version)
chunks, err := source.Snapshot(uint64(version), snapshottypes.CurrentFormat)
require.NoError(b, err)
for reader := range chunks {
_, err := io.Copy(ioutil.Discard, reader)
require.NoError(b, err)
err = reader.Close()
require.NoError(b, err)
}
}
}
func benchmarkMultistoreSnapshotRestore(b *testing.B, stores uint8, storeKeys uint64) {
b.Skip("Noisy with slow setup time, please see https://github.com/cosmos/cosmos-sdk/issues/8855.")
b.ReportAllocs()
b.StopTimer()
source := newMultiStoreWithGeneratedData(dbm.NewMemDB(), stores, storeKeys)
version := uint64(source.LastCommitID().Version)
require.EqualValues(b, 1, version)
b.StartTimer()
for i := 0; i < b.N; i++ {
target := NewStore(dbm.NewMemDB())
for key := range source.stores {
target.MountStoreWithDB(key, types.StoreTypeIAVL, nil)
}
err := target.LoadLatestVersion()
require.NoError(b, err)
require.EqualValues(b, 0, target.LastCommitID().Version)
chunks, err := source.Snapshot(version, snapshottypes.CurrentFormat)
require.NoError(b, err)
err = target.Restore(version, snapshottypes.CurrentFormat, chunks, nil)
require.NoError(b, err)
require.Equal(b, source.LastCommitID(), target.LastCommitID())
}
}
//-----------------------------------------------------------------------
// utils
@ -948,75 +751,6 @@ func newMultiStoreWithMounts(db dbm.DB, pruningOpts types.PruningOptions) *Store
return store
}
func newMultiStoreWithMixedMounts(db dbm.DB) *Store {
store := NewStore(db)
store.MountStoreWithDB(types.NewKVStoreKey("iavl1"), types.StoreTypeIAVL, nil)
store.MountStoreWithDB(types.NewKVStoreKey("iavl2"), types.StoreTypeIAVL, nil)
store.MountStoreWithDB(types.NewKVStoreKey("iavl3"), types.StoreTypeIAVL, nil)
store.MountStoreWithDB(types.NewTransientStoreKey("trans1"), types.StoreTypeTransient, nil)
store.LoadLatestVersion()
return store
}
func newMultiStoreWithMixedMountsAndBasicData(db dbm.DB) *Store {
store := newMultiStoreWithMixedMounts(db)
store1 := store.getStoreByName("iavl1").(types.CommitKVStore)
store2 := store.getStoreByName("iavl2").(types.CommitKVStore)
trans1 := store.getStoreByName("trans1").(types.KVStore)
store1.Set([]byte("a"), []byte{1})
store1.Set([]byte("b"), []byte{1})
store2.Set([]byte("X"), []byte{255})
store2.Set([]byte("A"), []byte{101})
trans1.Set([]byte("x1"), []byte{91})
store.Commit()
store1.Set([]byte("b"), []byte{2})
store1.Set([]byte("c"), []byte{3})
store2.Set([]byte("B"), []byte{102})
store.Commit()
store2.Set([]byte("C"), []byte{103})
store2.Delete([]byte("X"))
trans1.Set([]byte("x2"), []byte{92})
store.Commit()
return store
}
func newMultiStoreWithGeneratedData(db dbm.DB, stores uint8, storeKeys uint64) *Store {
multiStore := NewStore(db)
r := rand.New(rand.NewSource(49872768940)) // Fixed seed for deterministic tests
keys := []*types.KVStoreKey{}
for i := uint8(0); i < stores; i++ {
key := types.NewKVStoreKey(fmt.Sprintf("store%v", i))
multiStore.MountStoreWithDB(key, types.StoreTypeIAVL, nil)
keys = append(keys, key)
}
multiStore.LoadLatestVersion()
for _, key := range keys {
store := multiStore.stores[key].(*iavl.Store)
for i := uint64(0); i < storeKeys; i++ {
k := make([]byte, 8)
v := make([]byte, 1024)
binary.BigEndian.PutUint64(k, i)
_, err := r.Read(v)
if err != nil {
panic(err)
}
store.Set(k, v)
}
}
multiStore.Commit()
multiStore.LoadLatestVersion()
return multiStore
}
func newMultiStoreWithModifiedMounts(db dbm.DB, pruningOpts types.PruningOptions) (*Store, *types.StoreUpgrades) {
store := NewStore(db)
store.pruningOpts = pruningOpts
@ -1038,25 +772,6 @@ func newMultiStoreWithModifiedMounts(db dbm.DB, pruningOpts types.PruningOptions
return store, upgrades
}
func assertStoresEqual(t *testing.T, expect, actual types.CommitKVStore, msgAndArgs ...interface{}) {
assert.Equal(t, expect.LastCommitID(), actual.LastCommitID())
expectIter := expect.Iterator(nil, nil)
expectMap := map[string][]byte{}
for ; expectIter.Valid(); expectIter.Next() {
expectMap[string(expectIter.Key())] = expectIter.Value()
}
require.NoError(t, expectIter.Error())
actualIter := expect.Iterator(nil, nil)
actualMap := map[string][]byte{}
for ; actualIter.Valid(); actualIter.Next() {
actualMap[string(actualIter.Key())] = actualIter.Value()
}
require.NoError(t, actualIter.Error())
assert.Equal(t, expectMap, actualMap, msgAndArgs...)
}
func checkStore(t *testing.T, store *Store, expect, got types.CommitID) {
require.Equal(t, expect, got)
require.Equal(t, expect, store.LastCommitID())

View File

@ -1,944 +0,0 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: cosmos/base/store/v1beta1/snapshot.proto
package types
import (
fmt "fmt"
_ "github.com/gogo/protobuf/gogoproto"
proto "github.com/gogo/protobuf/proto"
io "io"
math "math"
math_bits "math/bits"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
// SnapshotItem is an item contained in a rootmulti.Store snapshot.
type SnapshotItem struct {
// item is the specific type of snapshot item.
//
// Types that are valid to be assigned to Item:
// *SnapshotItem_Store
// *SnapshotItem_IAVL
Item isSnapshotItem_Item `protobuf_oneof:"item"`
}
func (m *SnapshotItem) Reset() { *m = SnapshotItem{} }
func (m *SnapshotItem) String() string { return proto.CompactTextString(m) }
func (*SnapshotItem) ProtoMessage() {}
func (*SnapshotItem) Descriptor() ([]byte, []int) {
return fileDescriptor_9c55879db4cc4502, []int{0}
}
func (m *SnapshotItem) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *SnapshotItem) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_SnapshotItem.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *SnapshotItem) XXX_Merge(src proto.Message) {
xxx_messageInfo_SnapshotItem.Merge(m, src)
}
func (m *SnapshotItem) XXX_Size() int {
return m.Size()
}
func (m *SnapshotItem) XXX_DiscardUnknown() {
xxx_messageInfo_SnapshotItem.DiscardUnknown(m)
}
var xxx_messageInfo_SnapshotItem proto.InternalMessageInfo
type isSnapshotItem_Item interface {
isSnapshotItem_Item()
MarshalTo([]byte) (int, error)
Size() int
}
type SnapshotItem_Store struct {
Store *SnapshotStoreItem `protobuf:"bytes,1,opt,name=store,proto3,oneof" json:"store,omitempty"`
}
type SnapshotItem_IAVL struct {
IAVL *SnapshotIAVLItem `protobuf:"bytes,2,opt,name=iavl,proto3,oneof" json:"iavl,omitempty"`
}
func (*SnapshotItem_Store) isSnapshotItem_Item() {}
func (*SnapshotItem_IAVL) isSnapshotItem_Item() {}
func (m *SnapshotItem) GetItem() isSnapshotItem_Item {
if m != nil {
return m.Item
}
return nil
}
func (m *SnapshotItem) GetStore() *SnapshotStoreItem {
if x, ok := m.GetItem().(*SnapshotItem_Store); ok {
return x.Store
}
return nil
}
func (m *SnapshotItem) GetIAVL() *SnapshotIAVLItem {
if x, ok := m.GetItem().(*SnapshotItem_IAVL); ok {
return x.IAVL
}
return nil
}
// XXX_OneofWrappers is for the internal use of the proto package.
func (*SnapshotItem) XXX_OneofWrappers() []interface{} {
return []interface{}{
(*SnapshotItem_Store)(nil),
(*SnapshotItem_IAVL)(nil),
}
}
// SnapshotStoreItem contains metadata about a snapshotted store.
type SnapshotStoreItem struct {
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
}
func (m *SnapshotStoreItem) Reset() { *m = SnapshotStoreItem{} }
func (m *SnapshotStoreItem) String() string { return proto.CompactTextString(m) }
func (*SnapshotStoreItem) ProtoMessage() {}
func (*SnapshotStoreItem) Descriptor() ([]byte, []int) {
return fileDescriptor_9c55879db4cc4502, []int{1}
}
func (m *SnapshotStoreItem) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *SnapshotStoreItem) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_SnapshotStoreItem.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *SnapshotStoreItem) XXX_Merge(src proto.Message) {
xxx_messageInfo_SnapshotStoreItem.Merge(m, src)
}
func (m *SnapshotStoreItem) XXX_Size() int {
return m.Size()
}
func (m *SnapshotStoreItem) XXX_DiscardUnknown() {
xxx_messageInfo_SnapshotStoreItem.DiscardUnknown(m)
}
var xxx_messageInfo_SnapshotStoreItem proto.InternalMessageInfo
func (m *SnapshotStoreItem) GetName() string {
if m != nil {
return m.Name
}
return ""
}
// SnapshotIAVLItem is an exported IAVL node.
type SnapshotIAVLItem struct {
Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
Version int64 `protobuf:"varint,3,opt,name=version,proto3" json:"version,omitempty"`
Height int32 `protobuf:"varint,4,opt,name=height,proto3" json:"height,omitempty"`
}
func (m *SnapshotIAVLItem) Reset() { *m = SnapshotIAVLItem{} }
func (m *SnapshotIAVLItem) String() string { return proto.CompactTextString(m) }
func (*SnapshotIAVLItem) ProtoMessage() {}
func (*SnapshotIAVLItem) Descriptor() ([]byte, []int) {
return fileDescriptor_9c55879db4cc4502, []int{2}
}
func (m *SnapshotIAVLItem) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *SnapshotIAVLItem) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_SnapshotIAVLItem.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *SnapshotIAVLItem) XXX_Merge(src proto.Message) {
xxx_messageInfo_SnapshotIAVLItem.Merge(m, src)
}
func (m *SnapshotIAVLItem) XXX_Size() int {
return m.Size()
}
func (m *SnapshotIAVLItem) XXX_DiscardUnknown() {
xxx_messageInfo_SnapshotIAVLItem.DiscardUnknown(m)
}
var xxx_messageInfo_SnapshotIAVLItem proto.InternalMessageInfo
func (m *SnapshotIAVLItem) GetKey() []byte {
if m != nil {
return m.Key
}
return nil
}
func (m *SnapshotIAVLItem) GetValue() []byte {
if m != nil {
return m.Value
}
return nil
}
func (m *SnapshotIAVLItem) GetVersion() int64 {
if m != nil {
return m.Version
}
return 0
}
func (m *SnapshotIAVLItem) GetHeight() int32 {
if m != nil {
return m.Height
}
return 0
}
func init() {
proto.RegisterType((*SnapshotItem)(nil), "cosmos.base.store.v1beta1.SnapshotItem")
proto.RegisterType((*SnapshotStoreItem)(nil), "cosmos.base.store.v1beta1.SnapshotStoreItem")
proto.RegisterType((*SnapshotIAVLItem)(nil), "cosmos.base.store.v1beta1.SnapshotIAVLItem")
}
func init() {
proto.RegisterFile("cosmos/base/store/v1beta1/snapshot.proto", fileDescriptor_9c55879db4cc4502)
}
var fileDescriptor_9c55879db4cc4502 = []byte{
// 324 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x91, 0xc1, 0x4a, 0xc3, 0x30,
0x18, 0xc7, 0x1b, 0xd7, 0x4d, 0xfd, 0xdc, 0x61, 0x86, 0x21, 0xd5, 0x43, 0x1d, 0xbb, 0x58, 0x50,
0x13, 0xa6, 0x4f, 0x60, 0xf1, 0xb0, 0xa1, 0xa7, 0x0c, 0x3c, 0x78, 0x4b, 0x67, 0x68, 0xcb, 0xd6,
0x65, 0x2c, 0x59, 0x61, 0x6f, 0xe1, 0x6b, 0xf8, 0x26, 0x1e, 0x77, 0xf4, 0x24, 0xd2, 0xbd, 0x88,
0x24, 0xe9, 0x2e, 0x8a, 0xe0, 0xa9, 0xdf, 0xbf, 0xfc, 0xfe, 0xbf, 0x7c, 0xf0, 0x41, 0x34, 0x91,
0xaa, 0x90, 0x8a, 0x26, 0x5c, 0x09, 0xaa, 0xb4, 0x5c, 0x0a, 0x5a, 0x0e, 0x12, 0xa1, 0xf9, 0x80,
0xaa, 0x39, 0x5f, 0xa8, 0x4c, 0x6a, 0xb2, 0x58, 0x4a, 0x2d, 0xf1, 0xa9, 0x23, 0x89, 0x21, 0x89,
0x25, 0x49, 0x4d, 0x9e, 0x75, 0x53, 0x99, 0x4a, 0x4b, 0x51, 0x33, 0xb9, 0x42, 0xff, 0x0d, 0x41,
0x7b, 0x5c, 0x3b, 0x46, 0x5a, 0x14, 0xf8, 0x1e, 0x9a, 0xb6, 0x17, 0xa0, 0x1e, 0x8a, 0x8e, 0x6e,
0xae, 0xc8, 0x9f, 0x46, 0xb2, 0xeb, 0x8d, 0xcd, 0x5f, 0x53, 0x1e, 0x7a, 0xcc, 0x95, 0xf1, 0x03,
0xf8, 0x39, 0x2f, 0x67, 0xc1, 0x9e, 0x95, 0x5c, 0xfe, 0x43, 0x32, 0xba, 0x7b, 0x7a, 0x34, 0x8e,
0xf8, 0xa0, 0xfa, 0x3c, 0xf7, 0x4d, 0x1a, 0x7a, 0xcc, 0x4a, 0xe2, 0x16, 0xf8, 0xb9, 0x16, 0x45,
0xff, 0x02, 0x8e, 0x7f, 0x3d, 0x89, 0x31, 0xf8, 0x73, 0x5e, 0xb8, 0x75, 0x0f, 0x99, 0x9d, 0xfb,
0x33, 0xe8, 0xfc, 0xd4, 0xe2, 0x0e, 0x34, 0xa6, 0x62, 0x6d, 0xb1, 0x36, 0x33, 0x23, 0xee, 0x42,
0xb3, 0xe4, 0xb3, 0x95, 0xb0, 0x4b, 0xb6, 0x99, 0x0b, 0x38, 0x80, 0xfd, 0x52, 0x2c, 0x55, 0x2e,
0xe7, 0x41, 0xa3, 0x87, 0xa2, 0x06, 0xdb, 0x45, 0x7c, 0x02, 0xad, 0x4c, 0xe4, 0x69, 0xa6, 0x03,
0xbf, 0x87, 0xa2, 0x26, 0xab, 0x53, 0x1c, 0xbf, 0x57, 0x21, 0xda, 0x54, 0x21, 0xfa, 0xaa, 0x42,
0xf4, 0xba, 0x0d, 0xbd, 0xcd, 0x36, 0xf4, 0x3e, 0xb6, 0xa1, 0xf7, 0x1c, 0xa5, 0xb9, 0xce, 0x56,
0x09, 0x99, 0xc8, 0x82, 0xd6, 0x27, 0x74, 0x9f, 0x6b, 0xf5, 0x32, 0xad, 0x0f, 0xa9, 0xd7, 0x0b,
0xa1, 0x92, 0x96, 0xbd, 0xc6, 0xed, 0x77, 0x00, 0x00, 0x00, 0xff, 0xff, 0x75, 0x87, 0x24, 0x7b,
0xea, 0x01, 0x00, 0x00,
}
func (m *SnapshotItem) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *SnapshotItem) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *SnapshotItem) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.Item != nil {
{
size := m.Item.Size()
i -= size
if _, err := m.Item.MarshalTo(dAtA[i:]); err != nil {
return 0, err
}
}
}
return len(dAtA) - i, nil
}
func (m *SnapshotItem_Store) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *SnapshotItem_Store) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
if m.Store != nil {
{
size, err := m.Store.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintSnapshot(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func (m *SnapshotItem_IAVL) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *SnapshotItem_IAVL) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
if m.IAVL != nil {
{
size, err := m.IAVL.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintSnapshot(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x12
}
return len(dAtA) - i, nil
}
func (m *SnapshotStoreItem) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *SnapshotStoreItem) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *SnapshotStoreItem) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.Name) > 0 {
i -= len(m.Name)
copy(dAtA[i:], m.Name)
i = encodeVarintSnapshot(dAtA, i, uint64(len(m.Name)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func (m *SnapshotIAVLItem) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *SnapshotIAVLItem) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *SnapshotIAVLItem) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.Height != 0 {
i = encodeVarintSnapshot(dAtA, i, uint64(m.Height))
i--
dAtA[i] = 0x20
}
if m.Version != 0 {
i = encodeVarintSnapshot(dAtA, i, uint64(m.Version))
i--
dAtA[i] = 0x18
}
if len(m.Value) > 0 {
i -= len(m.Value)
copy(dAtA[i:], m.Value)
i = encodeVarintSnapshot(dAtA, i, uint64(len(m.Value)))
i--
dAtA[i] = 0x12
}
if len(m.Key) > 0 {
i -= len(m.Key)
copy(dAtA[i:], m.Key)
i = encodeVarintSnapshot(dAtA, i, uint64(len(m.Key)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func encodeVarintSnapshot(dAtA []byte, offset int, v uint64) int {
offset -= sovSnapshot(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func (m *SnapshotItem) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Item != nil {
n += m.Item.Size()
}
return n
}
func (m *SnapshotItem_Store) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Store != nil {
l = m.Store.Size()
n += 1 + l + sovSnapshot(uint64(l))
}
return n
}
func (m *SnapshotItem_IAVL) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.IAVL != nil {
l = m.IAVL.Size()
n += 1 + l + sovSnapshot(uint64(l))
}
return n
}
func (m *SnapshotStoreItem) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.Name)
if l > 0 {
n += 1 + l + sovSnapshot(uint64(l))
}
return n
}
func (m *SnapshotIAVLItem) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.Key)
if l > 0 {
n += 1 + l + sovSnapshot(uint64(l))
}
l = len(m.Value)
if l > 0 {
n += 1 + l + sovSnapshot(uint64(l))
}
if m.Version != 0 {
n += 1 + sovSnapshot(uint64(m.Version))
}
if m.Height != 0 {
n += 1 + sovSnapshot(uint64(m.Height))
}
return n
}
func sovSnapshot(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
func sozSnapshot(x uint64) (n int) {
return sovSnapshot(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *SnapshotItem) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSnapshot
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: SnapshotItem: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: SnapshotItem: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Store", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSnapshot
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthSnapshot
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthSnapshot
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
v := &SnapshotStoreItem{}
if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
m.Item = &SnapshotItem_Store{v}
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field IAVL", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSnapshot
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthSnapshot
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthSnapshot
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
v := &SnapshotIAVLItem{}
if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
m.Item = &SnapshotItem_IAVL{v}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipSnapshot(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthSnapshot
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *SnapshotStoreItem) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSnapshot
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: SnapshotStoreItem: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: SnapshotStoreItem: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSnapshot
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthSnapshot
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthSnapshot
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Name = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipSnapshot(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthSnapshot
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *SnapshotIAVLItem) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSnapshot
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: SnapshotIAVLItem: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: SnapshotIAVLItem: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSnapshot
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthSnapshot
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthSnapshot
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Key = append(m.Key[:0], dAtA[iNdEx:postIndex]...)
if m.Key == nil {
m.Key = []byte{}
}
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSnapshot
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthSnapshot
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthSnapshot
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Value = append(m.Value[:0], dAtA[iNdEx:postIndex]...)
if m.Value == nil {
m.Value = []byte{}
}
iNdEx = postIndex
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Version", wireType)
}
m.Version = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSnapshot
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Version |= int64(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 4:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Height", wireType)
}
m.Height = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowSnapshot
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Height |= int32(b&0x7F) << shift
if b < 0x80 {
break
}
}
default:
iNdEx = preIndex
skippy, err := skipSnapshot(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthSnapshot
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipSnapshot(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
depth := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowSnapshot
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowSnapshot
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
case 1:
iNdEx += 8
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowSnapshot
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if length < 0 {
return 0, ErrInvalidLengthSnapshot
}
iNdEx += length
case 3:
depth++
case 4:
if depth == 0 {
return 0, ErrUnexpectedEndOfGroupSnapshot
}
depth--
case 5:
iNdEx += 4
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
if iNdEx < 0 {
return 0, ErrInvalidLengthSnapshot
}
if depth == 0 {
return iNdEx, nil
}
}
return 0, io.ErrUnexpectedEOF
}
var (
ErrInvalidLengthSnapshot = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowSnapshot = fmt.Errorf("proto: integer overflow")
ErrUnexpectedEndOfGroupSnapshot = fmt.Errorf("proto: unexpected end of group")
)