feat: ADR-040: Implement DBConnection.Revert (#10308)
Implements the `DBConnection.Revert` method, which reverts DB state to the last saved version. This will be needed to implement atomic commits with the KV store for https://github.com/cosmos/cosmos-sdk/pull/9892 (supports [ADR-040](eb7d939f86/docs/architecture/adr-040-storage-and-smt-state-commitments.md)).
Closes: https://github.com/cosmos/cosmos-sdk/pull/10308
---
### Author Checklist
*All items are required. Please add a note to the item if the item is not applicable and
please add links to any relevant follow up issues.*
I have...
- [x] included the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title
- [ ] added `!` to the type prefix if API or client breaking change
- [x] targeted the correct branch (see [PR Targeting](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#pr-targeting))
- [x] provided a link to the relevant issue or specification
- [ ] followed the guidelines for [building modules](https://github.com/cosmos/cosmos-sdk/blob/master/docs/building-modules)
- [x] included the necessary unit and integration [tests](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#testing)
- [x] added a changelog entry to `CHANGELOG.md`
- [x] included comments for [documenting Go code](https://blog.golang.org/godoc)
- [x] updated the relevant documentation or specification
- [x] reviewed "Files changed" and left comments if necessary
- [ ] confirmed all CI checks have passed
### Reviewers Checklist
*All items are required. Please add a note if the item is not applicable and please add
your handle next to the items reviewed if you only reviewed selected items.*
I have...
- [ ] confirmed the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title
- [ ] confirmed `!` in the type prefix if API or client breaking change
- [ ] confirmed all author checklist items have been addressed
- [ ] reviewed state machine logic
- [ ] reviewed API design and naming
- [ ] reviewed documentation is accurate
- [ ] reviewed tests and test coverage
- [ ] manually tested (if applicable)
This commit is contained in:
parent
53ac6d8510
commit
88f39ad8de
|
@ -189,6 +189,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
|
|||
* [\#9952](https://github.com/cosmos/cosmos-sdk/pull/9952) ADR 040: Implement in-memory DB backend
|
||||
* [\#9848](https://github.com/cosmos/cosmos-sdk/pull/9848) ADR-040: Implement BadgerDB backend
|
||||
* [\#9851](https://github.com/cosmos/cosmos-sdk/pull/9851) ADR-040: Implement RocksDB backend
|
||||
* [\#10308](https://github.com/cosmos/cosmos-sdk/pull/10308) ADR-040: Implement DBConnection.Revert
|
||||
|
||||
|
||||
### Client Breaking Changes
|
||||
|
|
|
@ -2,9 +2,9 @@ package badgerdb
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/csv"
|
||||
"errors"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
|
@ -15,6 +15,8 @@ import (
|
|||
dbutil "github.com/cosmos/cosmos-sdk/db/internal"
|
||||
|
||||
"github.com/dgraph-io/badger/v3"
|
||||
bpb "github.com/dgraph-io/badger/v3/pb"
|
||||
"github.com/dgraph-io/ristretto/z"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -57,10 +59,10 @@ type badgerIterator struct {
|
|||
|
||||
// Map our versions to Badger timestamps.
|
||||
//
|
||||
// A badger Txn's commit TS must be strictly greater than a record's "last-read"
|
||||
// TS in order to detect conflicts, and a Txn must be read at a TS after last
|
||||
// A badger Txn's commit timestamp must be strictly greater than a record's "last-read"
|
||||
// timestamp in order to detect conflicts, and a Txn must be read at a timestamp after last
|
||||
// commit to see current state. So we must use commit increments that are more
|
||||
// granular than our version interval, and map versions to the corresponding TS.
|
||||
// granular than our version interval, and map versions to the corresponding timestamp.
|
||||
type versionManager struct {
|
||||
*dbm.VersionManager
|
||||
vmap map[uint64]uint64
|
||||
|
@ -111,7 +113,10 @@ func readVersionsFile(path string) (*versionManager, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var versions []uint64
|
||||
var (
|
||||
versions []uint64
|
||||
lastTs uint64
|
||||
)
|
||||
vmap := map[uint64]uint64{}
|
||||
for _, row := range rows {
|
||||
version, err := strconv.ParseUint(row[0], 10, 64)
|
||||
|
@ -122,6 +127,9 @@ func readVersionsFile(path string) (*versionManager, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if version == 0 { // 0 maps to the latest timestamp
|
||||
lastTs = ts
|
||||
}
|
||||
versions = append(versions, version)
|
||||
vmap[version] = ts
|
||||
}
|
||||
|
@ -129,7 +137,7 @@ func readVersionsFile(path string) (*versionManager, error) {
|
|||
return &versionManager{
|
||||
VersionManager: vmgr,
|
||||
vmap: vmap,
|
||||
lastTs: vmgr.Last(),
|
||||
lastTs: lastTs,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -141,7 +149,9 @@ func writeVersionsFile(vm *versionManager, path string) error {
|
|||
}
|
||||
defer file.Close()
|
||||
w := csv.NewWriter(file)
|
||||
var rows [][]string
|
||||
rows := [][]string{
|
||||
[]string{"0", strconv.FormatUint(vm.lastTs, 10)},
|
||||
}
|
||||
for it := vm.Iterator(); it.Next(); {
|
||||
version := it.Value()
|
||||
ts, ok := vm.vmap[version]
|
||||
|
@ -157,16 +167,20 @@ func writeVersionsFile(vm *versionManager, path string) error {
|
|||
}
|
||||
|
||||
func (b *BadgerDB) Reader() dbm.DBReader {
|
||||
return &badgerTxn{txn: b.db.NewTransactionAt(math.MaxUint64, false), db: b}
|
||||
b.mtx.RLock()
|
||||
ts := b.vmgr.lastTs
|
||||
b.mtx.RUnlock()
|
||||
return &badgerTxn{txn: b.db.NewTransactionAt(ts, false), db: b}
|
||||
}
|
||||
|
||||
func (b *BadgerDB) ReaderAt(version uint64) (dbm.DBReader, error) {
|
||||
b.mtx.RLock()
|
||||
defer b.mtx.RUnlock()
|
||||
if !b.vmgr.Exists(version) {
|
||||
ts, has := b.vmgr.versionTs(version)
|
||||
if !has {
|
||||
return nil, dbm.ErrVersionDoesNotExist
|
||||
}
|
||||
return &badgerTxn{txn: b.db.NewTransactionAt(b.vmgr.versionTs(version), false), db: b}, nil
|
||||
return &badgerTxn{txn: b.db.NewTransactionAt(ts, false), db: b}, nil
|
||||
}
|
||||
|
||||
func (b *BadgerDB) ReadWriter() dbm.DBReadWriter {
|
||||
|
@ -232,6 +246,83 @@ func (b *BadgerDB) DeleteVersion(target uint64) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (b *BadgerDB) Revert() error {
|
||||
b.mtx.RLock()
|
||||
defer b.mtx.RUnlock()
|
||||
if b.openWriters > 0 {
|
||||
return dbm.ErrOpenTransactions
|
||||
}
|
||||
|
||||
// Revert from latest commit timestamp to last "saved" timestamp
|
||||
// if no versions exist, use 0 as it precedes any possible commit timestamp
|
||||
var target uint64
|
||||
last := b.vmgr.Last()
|
||||
if last == 0 {
|
||||
target = 0
|
||||
} else {
|
||||
var has bool
|
||||
if target, has = b.vmgr.versionTs(last); !has {
|
||||
return errors.New("bad version history")
|
||||
}
|
||||
}
|
||||
lastTs := b.vmgr.lastTs
|
||||
if target == lastTs {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Badger provides no way to rollback committed data, so we undo all changes
|
||||
// since the target version using the Stream API
|
||||
stream := b.db.NewStreamAt(lastTs)
|
||||
// Skips unchanged keys
|
||||
stream.ChooseKey = func(item *badger.Item) bool { return item.Version() > target }
|
||||
// Scans for value at target version
|
||||
stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
|
||||
kv := bpb.KV{Key: key}
|
||||
// advance down to <= target version
|
||||
itr.Next() // we have at least one newer version
|
||||
for itr.Valid() && bytes.Equal(key, itr.Item().Key()) && itr.Item().Version() > target {
|
||||
itr.Next()
|
||||
}
|
||||
if itr.Valid() && bytes.Equal(key, itr.Item().Key()) && !itr.Item().IsDeletedOrExpired() {
|
||||
var err error
|
||||
kv.Value, err = itr.Item().ValueCopy(nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return &bpb.KVList{Kv: []*bpb.KV{&kv}}, nil
|
||||
}
|
||||
txn := b.db.NewTransactionAt(lastTs, true)
|
||||
defer txn.Discard()
|
||||
stream.Send = func(buf *z.Buffer) error {
|
||||
kvl, err := badger.BufferToKVList(buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// nil Value indicates a deleted entry
|
||||
for _, kv := range kvl.Kv {
|
||||
if kv.Value == nil {
|
||||
err = txn.Delete(kv.Key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
err = txn.Set(kv.Key, kv.Value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
err := stream.Orchestrate(context.Background())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return txn.CommitAt(lastTs, nil)
|
||||
}
|
||||
|
||||
// Stats always returns nil.
func (b *BadgerDB) Stats() map[string]string {
	return nil
}
|
||||
|
||||
func (tx *badgerTxn) Get(key []byte) ([]byte, error) {
|
||||
|
@ -283,7 +374,7 @@ func (tx *badgerWriter) Commit() (err error) {
|
|||
return errors.New("transaction has been discarded")
|
||||
}
|
||||
defer func() { err = dbutil.CombineErrors(err, tx.Discard(), "Discard also failed") }()
|
||||
// Commit to the current commit TS, after ensuring it is > ReadTs
|
||||
// Commit to the current commit timestamp, after ensuring it is > ReadTs
|
||||
tx.db.mtx.RLock()
|
||||
tx.db.vmgr.updateCommitTs(tx.txn.ReadTs())
|
||||
ts := tx.db.vmgr.lastTs
|
||||
|
@ -385,14 +476,23 @@ func (i *badgerIterator) Value() []byte {
|
|||
return val
|
||||
}
|
||||
|
||||
func (vm *versionManager) versionTs(ver uint64) uint64 {
|
||||
return vm.vmap[ver]
|
||||
func (vm *versionManager) versionTs(ver uint64) (uint64, bool) {
|
||||
ts, has := vm.vmap[ver]
|
||||
return ts, has
|
||||
}
|
||||
|
||||
// updateCommitTs increments the lastTs if equal to readts.
|
||||
func (vm *versionManager) updateCommitTs(readts uint64) {
|
||||
if vm.lastTs == readts {
|
||||
vm.lastTs += 1
|
||||
}
|
||||
}
|
||||
|
||||
// Atomically accesses the last commit timestamp used as a version marker.
|
||||
func (vm *versionManager) lastCommitTs() uint64 {
|
||||
return atomic.LoadUint64(&vm.lastTs)
|
||||
}
|
||||
|
||||
func (vm *versionManager) Copy() *versionManager {
|
||||
vmap := map[uint64]uint64{}
|
||||
for ver, ts := range vm.vmap {
|
||||
|
@ -405,12 +505,6 @@ func (vm *versionManager) Copy() *versionManager {
|
|||
}
|
||||
}
|
||||
|
||||
// updateCommitTs increments the lastTs if equal to readts.
|
||||
func (vm *versionManager) updateCommitTs(readts uint64) {
|
||||
if vm.lastTs == readts {
|
||||
vm.lastTs += 1
|
||||
}
|
||||
}
|
||||
func (vm *versionManager) Save(target uint64) (uint64, error) {
|
||||
id, err := vm.VersionManager.Save(target)
|
||||
if err != nil {
|
||||
|
@ -419,3 +513,8 @@ func (vm *versionManager) Save(target uint64) (uint64, error) {
|
|||
vm.vmap[id] = vm.lastTs // non-atomic, already guarded by the vmgr mutex
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func (vm *versionManager) Delete(target uint64) {
|
||||
vm.VersionManager.Delete(target)
|
||||
delete(vm.vmap, target)
|
||||
}
|
||||
|
|
|
@ -31,6 +31,11 @@ func TestVersioning(t *testing.T) {
|
|||
dbtest.DoTestVersioning(t, load)
|
||||
}
|
||||
|
||||
func TestRevert(t *testing.T) {
|
||||
dbtest.DoTestRevert(t, load, false)
|
||||
dbtest.DoTestRevert(t, load, true)
|
||||
}
|
||||
|
||||
func TestReloadDB(t *testing.T) {
|
||||
dbtest.DoTestReloadDB(t, load)
|
||||
}
|
||||
|
|
|
@ -399,27 +399,37 @@ func DoTestRevert(t *testing.T, load Loader, reload bool) {
|
|||
db := load(t, dirname)
|
||||
var txn dbm.DBWriter
|
||||
|
||||
txn = db.Writer()
|
||||
require.NoError(t, txn.Set([]byte{2}, []byte{2}))
|
||||
require.NoError(t, txn.Commit())
|
||||
initContents := func() {
|
||||
txn = db.Writer()
|
||||
require.NoError(t, txn.Set([]byte{2}, []byte{2}))
|
||||
require.NoError(t, txn.Commit())
|
||||
|
||||
txn = db.Writer()
|
||||
for i := byte(6); i < 10; i++ {
|
||||
require.NoError(t, txn.Set([]byte{i}, []byte{i}))
|
||||
txn = db.Writer()
|
||||
for i := byte(6); i < 10; i++ {
|
||||
require.NoError(t, txn.Set([]byte{i}, []byte{i}))
|
||||
}
|
||||
require.NoError(t, txn.Delete([]byte{2}))
|
||||
require.NoError(t, txn.Delete([]byte{3}))
|
||||
require.NoError(t, txn.Commit())
|
||||
}
|
||||
require.NoError(t, txn.Delete([]byte{2}))
|
||||
require.NoError(t, txn.Delete([]byte{3}))
|
||||
require.NoError(t, txn.Commit())
|
||||
|
||||
require.Error(t, db.Revert()) // can't revert with no versions
|
||||
initContents()
|
||||
require.NoError(t, db.Revert())
|
||||
view := db.Reader()
|
||||
it, err := view.Iterator(nil, nil)
|
||||
require.NoError(t, err)
|
||||
require.False(t, it.Next()) // db is empty
|
||||
require.NoError(t, it.Close())
|
||||
require.NoError(t, view.Discard())
|
||||
|
||||
_, err := db.SaveNextVersion()
|
||||
initContents()
|
||||
_, err = db.SaveNextVersion()
|
||||
require.NoError(t, err)
|
||||
|
||||
// get snapshot of db state
|
||||
state := map[string][]byte{}
|
||||
view := db.Reader()
|
||||
it, err := view.Iterator(nil, nil)
|
||||
view = db.Reader()
|
||||
it, err = view.Iterator(nil, nil)
|
||||
require.NoError(t, err)
|
||||
for it.Next() {
|
||||
state[string(it.Key())] = it.Value()
|
||||
|
|
|
@ -159,6 +159,31 @@ func (db *MemDB) DeleteVersion(target uint64) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (db *MemDB) Revert() error {
|
||||
db.mtx.RLock()
|
||||
defer db.mtx.RUnlock()
|
||||
if db.openWriters > 0 {
|
||||
return dbm.ErrOpenTransactions
|
||||
}
|
||||
|
||||
last := db.vmgr.Last()
|
||||
if last == 0 {
|
||||
db.btree = btree.New(bTreeDegree)
|
||||
return nil
|
||||
}
|
||||
var has bool
|
||||
db.btree, has = db.saved[last]
|
||||
if !has {
|
||||
return fmt.Errorf("bad version history: version %v not saved", last)
|
||||
}
|
||||
for ver, _ := range db.saved {
|
||||
if ver > last {
|
||||
delete(db.saved, ver)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get implements DBReader.
|
||||
func (tx *dbTxn) Get(key []byte) ([]byte, error) {
|
||||
if tx.btree == nil {
|
||||
|
|
|
@ -44,6 +44,10 @@ func TestVersioning(t *testing.T) {
|
|||
dbtest.DoTestVersioning(t, load)
|
||||
}
|
||||
|
||||
func TestRevert(t *testing.T) {
|
||||
dbtest.DoTestRevert(t, load, false)
|
||||
}
|
||||
|
||||
func TestTransactions(t *testing.T) {
|
||||
dbtest.DoTestTransactions(t, load, false)
|
||||
}
|
||||
|
|
|
@ -284,10 +284,6 @@ func (mgr *dbManager) Revert() (err error) {
|
|||
if mgr.openWriters > 0 {
|
||||
return dbm.ErrOpenTransactions
|
||||
}
|
||||
last := mgr.vmgr.Last()
|
||||
if last == 0 {
|
||||
return dbm.ErrInvalidVersion
|
||||
}
|
||||
// Close current connection and replace it with a checkpoint (created from the last checkpoint)
|
||||
mgr.current.Close()
|
||||
dbPath := filepath.Join(mgr.dir, currentDBFileName)
|
||||
|
@ -295,9 +291,11 @@ func (mgr *dbManager) Revert() (err error) {
|
|||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = mgr.restoreFromCheckpoint(last, dbPath)
|
||||
if err != nil {
|
||||
return
|
||||
if last := mgr.vmgr.Last(); last != 0 {
|
||||
err = mgr.restoreFromCheckpoint(last, dbPath)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
mgr.current, err = gorocksdb.OpenOptimisticTransactionDb(mgr.opts.dbo, dbPath)
|
||||
return
|
||||
|
|
29
db/types.go
29
db/types.go
|
@ -31,38 +31,37 @@ var (
|
|||
// and read and write access.
|
||||
// Past versions are only accessible read-only.
|
||||
type DBConnection interface {
|
||||
// Opens a read-only transaction at the current working version.
|
||||
// Reader opens a read-only transaction at the current working version.
|
||||
Reader() DBReader
|
||||
|
||||
// Opens a read-only transaction at a specified version.
|
||||
// ReaderAt opens a read-only transaction at a specified version.
|
||||
// Returns ErrVersionDoesNotExist for invalid versions.
|
||||
ReaderAt(uint64) (DBReader, error)
|
||||
|
||||
// Opens a read-write transaction at the current version.
|
||||
// ReadWriter opens a read-write transaction at the current version.
|
||||
ReadWriter() DBReadWriter
|
||||
|
||||
// Opens a write-only transaction at the current version.
|
||||
// Writer opens a write-only transaction at the current version.
|
||||
Writer() DBWriter
|
||||
|
||||
// Returns all saved versions as an immutable set which is safe for concurrent access.
|
||||
// Versions returns all saved versions as an immutable set which is safe for concurrent access.
|
||||
Versions() (VersionSet, error)
|
||||
|
||||
// Saves the current contents of the database and returns the next version ID, which will be
|
||||
// `Versions().Last()+1`.
|
||||
// SaveNextVersion saves the current contents of the database and returns the next version ID,
|
||||
// which will be `Versions().Last()+1`.
|
||||
// Returns an error if any open DBWriter transactions exist.
|
||||
// TODO: rename to something more descriptive?
|
||||
SaveNextVersion() (uint64, error)
|
||||
|
||||
// Attempts to save database at a specific version ID, which must be greater than or equal to
|
||||
// what would be returned by `SaveNextVersion`.
|
||||
// SaveVersion attempts to save database at a specific version ID, which must be greater than or
|
||||
// equal to what would be returned by `SaveNextVersion`.
|
||||
// Returns an error if any open DBWriter transactions exist.
|
||||
SaveVersion(uint64) error
|
||||
|
||||
// Deletes a saved version. Returns ErrVersionDoesNotExist for invalid versions.
|
||||
// DeleteVersion deletes a saved version. Returns ErrVersionDoesNotExist for invalid versions.
|
||||
DeleteVersion(uint64) error
|
||||
|
||||
// Reverts the DB state to the last saved version.
|
||||
// Returns an error if no saved versions exist.
|
||||
// Revert reverts the DB state to the last saved version; if none exist, this clears the DB.
|
||||
// Returns an error if any open DBWriter transactions exist.
|
||||
Revert() error
|
||||
|
||||
|
@ -101,7 +100,7 @@ type DBReader interface {
|
|||
// TODO: replace with an extra argument to Iterator()?
|
||||
ReverseIterator(start, end []byte) (Iterator, error)
|
||||
|
||||
// Discards the transaction, invalidating any future operations on it.
|
||||
// Discard discards the transaction, invalidating any future operations on it.
|
||||
Discard() error
|
||||
}
|
||||
|
||||
|
@ -118,10 +117,10 @@ type DBWriter interface {
|
|||
// CONTRACT: key readonly []byte
|
||||
Delete([]byte) error
|
||||
|
||||
// Flushes pending writes and discards the transaction.
|
||||
// Commit flushes pending writes and discards the transaction.
|
||||
Commit() error
|
||||
|
||||
// Discards the transaction, invalidating any future operations on it.
|
||||
// Discard discards the transaction, invalidating any future operations on it.
|
||||
Discard() error
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue