feat: ADR-040: Implement RocksDB backend (#9851)
## Description
Partially resolves: https://github.com/vulcanize/cosmos-sdk/issues/14
Implements a [RocksDB](https://github.com/facebook/rocksdb)-based backend for the DB interface introduced by https://github.com/cosmos/cosmos-sdk/pull/9573 and specified by [ADR-040](eb7d939f86/docs/architecture/adr-040-storage-and-smt-state-commitments.md).
* Historical versioning is implemented with [Checkpoints](https://github.com/facebook/rocksdb/wiki/Checkpoints).
* Uses `OptimisticTransactionDB` to allow concurrent transactions with write conflict detection. This depends on some additional CGo bindings - see https://github.com/tecbot/gorocksdb/pull/216, https://github.com/facebook/rocksdb/pull/8526. We'll need to use a replacement for the `gorocksdb` module until these changes are merged upstream.
---
### Author Checklist
*All items are required. Please add a note to the item if the item is not applicable and
please add links to any relevant follow up issues.*
I have...
- [x] included the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title
- [x] added `!` to the type prefix if API or client breaking change
- [x] targeted the correct branch (see [PR Targeting](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#pr-targeting))
- [x] provided a link to the relevant issue or specification
- [ ] followed the guidelines for [building modules](https://github.com/cosmos/cosmos-sdk/blob/master/docs/building-modules) - n/a
- [x] included the necessary unit and integration [tests](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#testing)
- [x] added a changelog entry to `CHANGELOG.md`
- [x] included comments for [documenting Go code](https://blog.golang.org/godoc)
- [x] updated the relevant documentation or specification
- [x] reviewed "Files changed" and left comments if necessary
- [ ] confirmed all CI checks have passed
### Reviewers Checklist
*All items are required. Please add a note if the item is not applicable and please add
your handle next to the items reviewed if you only reviewed selected items.*
I have...
- [ ] confirmed the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title
- [ ] confirmed `!` in the type prefix if API or client breaking change
- [ ] confirmed all author checklist items have been addressed
- [ ] reviewed state machine logic
- [ ] reviewed API design and naming
- [ ] reviewed documentation is accurate
- [ ] reviewed tests and test coverage
- [ ] manually tested (if applicable)
This commit is contained in:
parent
9094794478
commit
33c8314efe
|
@ -188,6 +188,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
|
|||
* [\#9573](https://github.com/cosmos/cosmos-sdk/pull/9573) ADR 040 implementation: New DB interface
|
||||
* [\#9952](https://github.com/cosmos/cosmos-sdk/pull/9952) ADR 040: Implement in-memory DB backend
|
||||
* [\#9848](https://github.com/cosmos/cosmos-sdk/pull/9848) ADR-040: Implement BadgerDB backend
|
||||
* [\#9851](https://github.com/cosmos/cosmos-sdk/pull/9851) ADR-040: Implement RocksDB backend
|
||||
|
||||
|
||||
### Client Breaking Changes
|
||||
|
|
|
@ -62,3 +62,6 @@ tx2.Set(key, []byte("b"))
|
|||
tx1.Commit() // ok
|
||||
tx2.Commit() // ok
|
||||
```
|
||||
### RocksDB
|
||||
|
||||
A [RocksDB](https://github.com/facebook/rocksdb)-based backend. Internally this uses [`OptimisticTransactionDB`](https://github.com/facebook/rocksdb/wiki/Transactions#optimistictransactiondb) to allow concurrent transactions with write conflict detection. Historical versioning is internally implemented with [Checkpoints](https://github.com/facebook/rocksdb/wiki/Checkpoints).
|
||||
|
|
|
@ -3,6 +3,7 @@ package badgerdb
|
|||
import (
|
||||
"bytes"
|
||||
"encoding/csv"
|
||||
"errors"
|
||||
"math"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
@ -11,6 +12,7 @@ import (
|
|||
"sync/atomic"
|
||||
|
||||
dbm "github.com/cosmos/cosmos-sdk/db"
|
||||
dbutil "github.com/cosmos/cosmos-sdk/db/internal"
|
||||
|
||||
"github.com/dgraph-io/badger/v3"
|
||||
)
|
||||
|
@ -41,6 +43,7 @@ type badgerTxn struct {
|
|||
|
||||
type badgerWriter struct {
|
||||
badgerTxn
|
||||
discarded bool
|
||||
}
|
||||
|
||||
type badgerIterator struct {
|
||||
|
@ -48,6 +51,7 @@ type badgerIterator struct {
|
|||
start, end []byte
|
||||
iter *badger.Iterator
|
||||
lastErr error
|
||||
// Whether iterator has been advanced to the first element (is fully initialized)
|
||||
primed bool
|
||||
}
|
||||
|
||||
|
@ -168,9 +172,9 @@ func (b *BadgerDB) ReaderAt(version uint64) (dbm.DBReader, error) {
|
|||
func (b *BadgerDB) ReadWriter() dbm.DBReadWriter {
|
||||
atomic.AddInt32(&b.openWriters, 1)
|
||||
b.mtx.RLock()
|
||||
ts := b.vmgr.lastCommitTs()
|
||||
ts := b.vmgr.lastTs
|
||||
b.mtx.RUnlock()
|
||||
return &badgerWriter{badgerTxn{txn: b.db.NewTransactionAt(ts, true), db: b}}
|
||||
return &badgerWriter{badgerTxn{txn: b.db.NewTransactionAt(ts, true), db: b}, false}
|
||||
}
|
||||
|
||||
func (b *BadgerDB) Writer() dbm.DBWriter {
|
||||
|
@ -261,11 +265,8 @@ func (tx *badgerTxn) Has(key []byte) (bool, error) {
|
|||
}
|
||||
|
||||
func (tx *badgerWriter) Set(key, value []byte) error {
|
||||
if len(key) == 0 {
|
||||
return dbm.ErrKeyEmpty
|
||||
}
|
||||
if value == nil {
|
||||
return dbm.ErrValueNil
|
||||
if err := dbutil.ValidateKv(key, value); err != nil {
|
||||
return err
|
||||
}
|
||||
return tx.txn.Set(key, value)
|
||||
}
|
||||
|
@ -277,20 +278,31 @@ func (tx *badgerWriter) Delete(key []byte) error {
|
|||
return tx.txn.Delete(key)
|
||||
}
|
||||
|
||||
func (tx *badgerWriter) Commit() error {
|
||||
func (tx *badgerWriter) Commit() (err error) {
|
||||
if tx.discarded {
|
||||
return errors.New("transaction has been discarded")
|
||||
}
|
||||
defer func() { err = dbutil.CombineErrors(err, tx.Discard(), "Discard also failed") }()
|
||||
// Commit to the current commit TS, after ensuring it is > ReadTs
|
||||
tx.db.mtx.RLock()
|
||||
tx.db.vmgr.updateCommitTs(tx.txn.ReadTs())
|
||||
defer tx.Discard()
|
||||
return tx.txn.CommitAt(tx.db.vmgr.lastCommitTs(), nil)
|
||||
ts := tx.db.vmgr.lastTs
|
||||
tx.db.mtx.RUnlock()
|
||||
err = tx.txn.CommitAt(ts, nil)
|
||||
return
|
||||
}
|
||||
|
||||
func (tx *badgerTxn) Discard() {
|
||||
func (tx *badgerTxn) Discard() error {
|
||||
tx.txn.Discard()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tx *badgerWriter) Discard() {
|
||||
func (tx *badgerWriter) Discard() error {
|
||||
if !tx.discarded {
|
||||
defer atomic.AddInt32(&tx.db.openWriters, -1)
|
||||
tx.badgerTxn.Discard()
|
||||
tx.discarded = true
|
||||
}
|
||||
return tx.badgerTxn.Discard()
|
||||
}
|
||||
|
||||
func (tx *badgerTxn) iteratorOpts(start, end []byte, opts badger.IteratorOptions) (*badgerIterator, error) {
|
||||
|
@ -393,9 +405,11 @@ func (vm *versionManager) Copy() *versionManager {
|
|||
}
|
||||
}
|
||||
|
||||
// updateCommitTs atomically increments the lastTs if equal to readts.
|
||||
// updateCommitTs increments the lastTs if equal to readts.
|
||||
func (vm *versionManager) updateCommitTs(readts uint64) {
|
||||
atomic.CompareAndSwapUint64(&vm.lastTs, readts, readts+1)
|
||||
if vm.lastTs == readts {
|
||||
vm.lastTs += 1
|
||||
}
|
||||
}
|
||||
func (vm *versionManager) Save(target uint64) (uint64, error) {
|
||||
id, err := vm.VersionManager.Save(target)
|
||||
|
|
|
@ -69,7 +69,7 @@ func DoTestGetSetHasDelete(t *testing.T, load Loader) {
|
|||
err = txn.Set([]byte("b"), []byte{0x02})
|
||||
require.NoError(t, err)
|
||||
|
||||
view.Discard()
|
||||
require.NoError(t, view.Discard())
|
||||
require.NoError(t, txn.Commit())
|
||||
|
||||
txn = db.ReadWriter()
|
||||
|
@ -145,7 +145,7 @@ func DoTestIterators(t *testing.T, load Loader) {
|
|||
for ; iter.Next(); i++ {
|
||||
expectedValue := expected[i]
|
||||
value := iter.Value()
|
||||
require.EqualValues(t, expectedValue, string(value), "i=%v", i)
|
||||
require.Equal(t, expectedValue, string(value), "i=%v", i)
|
||||
}
|
||||
require.Equal(t, len(expected), i)
|
||||
}
|
||||
|
@ -189,7 +189,7 @@ func DoTestIterators(t *testing.T, load Loader) {
|
|||
it.Close()
|
||||
}
|
||||
|
||||
view.Discard()
|
||||
require.NoError(t, view.Discard())
|
||||
require.NoError(t, db.Close())
|
||||
}
|
||||
|
||||
|
@ -246,6 +246,7 @@ func DoTestVersioning(t *testing.T, load Loader) {
|
|||
require.NoError(t, err)
|
||||
has, err := view.Has([]byte("2"))
|
||||
require.False(t, has)
|
||||
require.NoError(t, view.Discard())
|
||||
|
||||
view, err = db.ReaderAt(v2)
|
||||
require.NoError(t, err)
|
||||
|
@ -258,13 +259,12 @@ func DoTestVersioning(t *testing.T, load Loader) {
|
|||
require.NoError(t, err)
|
||||
has, err = view.Has([]byte("1"))
|
||||
require.False(t, has)
|
||||
require.NoError(t, view.Discard())
|
||||
|
||||
// Try to read an invalid version
|
||||
view, err = db.ReaderAt(versions.Last() + 1)
|
||||
require.Equal(t, dbm.ErrVersionDoesNotExist, err)
|
||||
require.Equal(t, dbm.ErrVersionDoesNotExist, err, "should fail to read a nonexistent version")
|
||||
|
||||
require.NoError(t, db.DeleteVersion(v2))
|
||||
// Try to read a deleted version
|
||||
require.NoError(t, db.DeleteVersion(v2), "should delete version v2")
|
||||
view, err = db.ReaderAt(v2)
|
||||
require.Equal(t, dbm.ErrVersionDoesNotExist, err)
|
||||
|
||||
|
@ -283,6 +283,14 @@ func DoTestVersioning(t *testing.T, load Loader) {
|
|||
prev = ver
|
||||
}
|
||||
|
||||
// Open multiple readers for the same past version
|
||||
view, err = db.ReaderAt(v3)
|
||||
require.NoError(t, err)
|
||||
view2, err := db.ReaderAt(v3)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, view.Discard())
|
||||
require.NoError(t, view2.Discard())
|
||||
|
||||
require.NoError(t, db.Close())
|
||||
}
|
||||
|
||||
|
@ -300,23 +308,37 @@ func DoTestTransactions(t *testing.T, load Loader, multipleWriters bool) {
|
|||
t.Run("no commit", func(t *testing.T) {
|
||||
t.Helper()
|
||||
view := db.Reader()
|
||||
defer view.Discard()
|
||||
tx := getWriter()
|
||||
defer tx.Discard()
|
||||
require.NoError(t, tx.Set([]byte("0"), []byte("a")))
|
||||
v, err := view.Get([]byte("0"))
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, v)
|
||||
require.NoError(t, view.Discard())
|
||||
require.NoError(t, tx.Discard())
|
||||
})
|
||||
|
||||
// Try to commit version with open txns
|
||||
t.Run("open transactions", func(t *testing.T) {
|
||||
t.Run("cannot save with open transactions", func(t *testing.T) {
|
||||
t.Helper()
|
||||
tx := getWriter()
|
||||
tx.Set([]byte("2"), []byte("a"))
|
||||
require.NoError(t, tx.Set([]byte("0"), []byte("a")))
|
||||
_, err := db.SaveNextVersion()
|
||||
require.Equal(t, dbm.ErrOpenTransactions, err)
|
||||
tx.Discard()
|
||||
require.NoError(t, tx.Discard())
|
||||
})
|
||||
|
||||
// Try to use a transaction after closing
|
||||
t.Run("cannot reuse transaction", func(t *testing.T) {
|
||||
t.Helper()
|
||||
tx := getWriter()
|
||||
require.NoError(t, tx.Commit())
|
||||
require.Error(t, tx.Set([]byte("0"), []byte("a")))
|
||||
require.NoError(t, tx.Discard()) // redundant discard is fine
|
||||
|
||||
tx = getWriter()
|
||||
require.NoError(t, tx.Discard())
|
||||
require.Error(t, tx.Set([]byte("0"), []byte("a")))
|
||||
require.NoError(t, tx.Discard())
|
||||
})
|
||||
|
||||
// Continue only if the backend supports multiple concurrent writers
|
||||
|
@ -353,13 +375,129 @@ func DoTestTransactions(t *testing.T, load Loader, multipleWriters bool) {
|
|||
}
|
||||
wg.Wait()
|
||||
view := db.Reader()
|
||||
defer view.Discard()
|
||||
v, err := view.Get(ikey(0))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, ival(0), v)
|
||||
require.NoError(t, view.Discard())
|
||||
})
|
||||
|
||||
}
|
||||
// Try to reuse a reader txn
|
||||
view := db.Reader()
|
||||
require.NoError(t, view.Discard())
|
||||
_, err := view.Get([]byte("0"))
|
||||
require.Error(t, err)
|
||||
require.NoError(t, view.Discard()) // redundant discard is fine
|
||||
|
||||
require.NoError(t, db.Close())
|
||||
}
|
||||
|
||||
// Test that Revert works as intended, optionally closing and
|
||||
// reloading the DB both before and after reverting
|
||||
func DoTestRevert(t *testing.T, load Loader, reload bool) {
|
||||
t.Helper()
|
||||
dirname := t.TempDir()
|
||||
db := load(t, dirname)
|
||||
var txn dbm.DBWriter
|
||||
|
||||
txn = db.Writer()
|
||||
require.NoError(t, txn.Set([]byte{2}, []byte{2}))
|
||||
require.NoError(t, txn.Commit())
|
||||
|
||||
txn = db.Writer()
|
||||
for i := byte(6); i < 10; i++ {
|
||||
require.NoError(t, txn.Set([]byte{i}, []byte{i}))
|
||||
}
|
||||
require.NoError(t, txn.Delete([]byte{2}))
|
||||
require.NoError(t, txn.Delete([]byte{3}))
|
||||
require.NoError(t, txn.Commit())
|
||||
|
||||
require.Error(t, db.Revert()) // can't revert with no versions
|
||||
|
||||
_, err := db.SaveNextVersion()
|
||||
require.NoError(t, err)
|
||||
|
||||
// get snapshot of db state
|
||||
state := map[string][]byte{}
|
||||
view := db.Reader()
|
||||
it, err := view.Iterator(nil, nil)
|
||||
require.NoError(t, err)
|
||||
for it.Next() {
|
||||
state[string(it.Key())] = it.Value()
|
||||
}
|
||||
require.NoError(t, it.Close())
|
||||
view.Discard()
|
||||
|
||||
checkContents := func() {
|
||||
view = db.Reader()
|
||||
count := 0
|
||||
it, err = view.Iterator(nil, nil)
|
||||
require.NoError(t, err)
|
||||
for it.Next() {
|
||||
val, has := state[string(it.Key())]
|
||||
require.True(t, has, "key should not be present: %v => %v", it.Key(), it.Value())
|
||||
require.Equal(t, val, it.Value())
|
||||
count++
|
||||
}
|
||||
require.NoError(t, it.Close())
|
||||
require.Equal(t, len(state), count)
|
||||
view.Discard()
|
||||
}
|
||||
|
||||
changeContents := func() {
|
||||
txn = db.Writer()
|
||||
require.NoError(t, txn.Set([]byte{3}, []byte{15}))
|
||||
require.NoError(t, txn.Set([]byte{7}, []byte{70}))
|
||||
require.NoError(t, txn.Delete([]byte{8}))
|
||||
require.NoError(t, txn.Delete([]byte{9}))
|
||||
require.NoError(t, txn.Set([]byte{10}, []byte{0}))
|
||||
require.NoError(t, txn.Commit())
|
||||
|
||||
txn = db.Writer()
|
||||
require.NoError(t, txn.Set([]byte{3}, []byte{30}))
|
||||
require.NoError(t, txn.Set([]byte{8}, []byte{8}))
|
||||
require.NoError(t, txn.Delete([]byte{9}))
|
||||
require.NoError(t, txn.Commit())
|
||||
}
|
||||
|
||||
changeContents()
|
||||
|
||||
if reload {
|
||||
db.Close()
|
||||
db = load(t, dirname)
|
||||
}
|
||||
|
||||
txn = db.Writer()
|
||||
require.Error(t, db.Revert()) // can't revert with open writers
|
||||
txn.Discard()
|
||||
require.NoError(t, db.Revert())
|
||||
|
||||
if reload {
|
||||
db.Close()
|
||||
db = load(t, dirname)
|
||||
}
|
||||
|
||||
checkContents()
|
||||
|
||||
// With intermediate versions added & deleted, revert again to v1
|
||||
changeContents()
|
||||
v2, _ := db.SaveNextVersion()
|
||||
|
||||
txn = db.Writer()
|
||||
require.NoError(t, txn.Delete([]byte{6}))
|
||||
require.NoError(t, txn.Set([]byte{8}, []byte{9}))
|
||||
require.NoError(t, txn.Set([]byte{11}, []byte{11}))
|
||||
txn.Commit()
|
||||
v3, _ := db.SaveNextVersion()
|
||||
|
||||
txn = db.Writer()
|
||||
require.NoError(t, txn.Set([]byte{12}, []byte{12}))
|
||||
txn.Commit()
|
||||
|
||||
db.DeleteVersion(v2)
|
||||
db.DeleteVersion(v3)
|
||||
db.Revert()
|
||||
checkContents()
|
||||
|
||||
require.NoError(t, db.Close())
|
||||
}
|
||||
|
||||
|
@ -389,9 +527,12 @@ func DoTestReloadDB(t *testing.T, load Loader) {
|
|||
require.NoError(t, err)
|
||||
|
||||
txn = db.Writer()
|
||||
require.NoError(t, txn.Set(ikey(100), ival(100)))
|
||||
require.NoError(t, txn.Set([]byte("working-version"), ival(100)))
|
||||
require.NoError(t, txn.Commit())
|
||||
|
||||
txn = db.Writer()
|
||||
require.NoError(t, txn.Set([]byte("uncommitted"), ival(200)))
|
||||
|
||||
// Reload and check each saved version
|
||||
db.Close()
|
||||
db = load(t, dirname)
|
||||
|
@ -401,19 +542,13 @@ func DoTestReloadDB(t *testing.T, load Loader) {
|
|||
require.NoError(t, err)
|
||||
require.Equal(t, last, vset.Last())
|
||||
|
||||
txn = db.Writer()
|
||||
for i := 10; i < 15; i++ {
|
||||
require.NoError(t, txn.Set(ikey(i), ival(i+10)))
|
||||
}
|
||||
require.NoError(t, txn.Commit())
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
view, err := db.ReaderAt(firstVersions[i])
|
||||
require.NoError(t, err)
|
||||
val, err := view.Get(ikey(i))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, ival(i), val)
|
||||
view.Discard()
|
||||
require.NoError(t, view.Discard())
|
||||
}
|
||||
|
||||
view, err := db.ReaderAt(last)
|
||||
|
@ -427,14 +562,18 @@ func DoTestReloadDB(t *testing.T, load Loader) {
|
|||
require.Equal(t, ival(i), v)
|
||||
}
|
||||
}
|
||||
view.Discard()
|
||||
require.NoError(t, view.Discard())
|
||||
|
||||
// Load working version
|
||||
view = db.Reader()
|
||||
val, err := view.Get(ikey(100))
|
||||
val, err := view.Get([]byte("working-version"))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, ival(100), val)
|
||||
view.Discard()
|
||||
|
||||
val, err = view.Get([]byte("uncommitted"))
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, val)
|
||||
|
||||
require.NoError(t, view.Discard())
|
||||
require.NoError(t, db.Close())
|
||||
}
|
||||
|
|
22
db/go.mod
22
db/go.mod
|
@ -6,10 +6,30 @@ require (
|
|||
github.com/dgraph-io/badger/v3 v3.2103.1
|
||||
github.com/google/btree v1.0.0
|
||||
github.com/stretchr/testify v1.7.0
|
||||
github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/davecgh/go-spew v1.1.0 // indirect
|
||||
github.com/cespare/xxhash v1.1.0 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.1.1 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/dgraph-io/ristretto v0.1.0 // indirect
|
||||
github.com/dustin/go-humanize v1.0.0 // indirect
|
||||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect
|
||||
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
|
||||
github.com/golang/protobuf v1.3.1 // indirect
|
||||
github.com/golang/snappy v0.0.3 // indirect
|
||||
github.com/google/flatbuffers v1.12.0 // indirect
|
||||
github.com/klauspost/compress v1.12.3 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
go.opencensus.io v0.22.5 // indirect
|
||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974 // indirect
|
||||
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c // indirect
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
|
||||
)
|
||||
|
||||
// FIXME: gorocksdb bindings for OptimisticTransactionDB are not merged upstream, so we use a fork
|
||||
// See https://github.com/tecbot/gorocksdb/pull/216
|
||||
replace github.com/tecbot/gorocksdb => github.com/roysc/gorocksdb v1.1.0
|
||||
|
|
|
@ -23,6 +23,12 @@ github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczC
|
|||
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
|
||||
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
|
||||
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
|
||||
github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c h1:8ISkoahWXwZR41ois5lSJBSVw4D0OV19Ht/JSTzvSv0=
|
||||
github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c/go.mod h1:Yg+htXGokKKdzcwhuNDwVvN+uBxDGXJ7G/VN1d8fa64=
|
||||
github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 h1:JWuenKqqX8nojtoVVWjGfOF9635RETekkoH6Cc9SX0A=
|
||||
github.com/facebookgo/stack v0.0.0-20160209184415-751773369052/go.mod h1:UbMTZqLaRiH3MsBH8va0n7s1pQYcu3uTb8G4tygF4Zg=
|
||||
github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 h1:7HZCaLC5+BZpmbhCOZJ293Lz68O7PYrF2EzeiFMwCLk=
|
||||
github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4/go.mod h1:5tD+neXqOorC30/tWg0LCSkrqj/AR6gu8yY8/fpw1q0=
|
||||
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
|
||||
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
|
||||
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
|
||||
|
@ -61,6 +67,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
|||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/roysc/gorocksdb v1.1.0 h1:+bPfxli0I3UhzJpghp/L8Y9oGR6SoDj4dFV5lPPHUvs=
|
||||
github.com/roysc/gorocksdb v1.1.0/go.mod h1:b/U29r/CtguX3TF7mKG1Jjn4APDqh4wECshxXdiWHpA=
|
||||
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
|
||||
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
|
||||
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
package util
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
dbm "github.com/cosmos/cosmos-sdk/db"
|
||||
)
|
||||
|
||||
func ValidateKv(key, value []byte) error {
|
||||
if len(key) == 0 {
|
||||
return dbm.ErrKeyEmpty
|
||||
}
|
||||
if value == nil {
|
||||
return dbm.ErrValueNil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// CombineErrors merges a secondary error `also` into a primary error `ret`.
// If both are non-nil, the result wraps ret (so errors.Is still matches it)
// and appends desc plus the secondary error's message.
// If only one is non-nil, that error is returned unchanged; nil if neither.
func CombineErrors(ret error, also error, desc string) error {
	switch {
	case also == nil:
		return ret
	case ret == nil:
		return also
	default:
		return fmt.Errorf("%w; %v: %v", ret, desc, also)
	}
}
|
|
@ -7,6 +7,7 @@ import (
|
|||
"sync/atomic"
|
||||
|
||||
dbm "github.com/cosmos/cosmos-sdk/db"
|
||||
dbutil "github.com/cosmos/cosmos-sdk/db/internal"
|
||||
"github.com/google/btree"
|
||||
)
|
||||
|
||||
|
@ -160,6 +161,9 @@ func (db *MemDB) DeleteVersion(target uint64) error {
|
|||
|
||||
// Get implements DBReader.
|
||||
func (tx *dbTxn) Get(key []byte) ([]byte, error) {
|
||||
if tx.btree == nil {
|
||||
return nil, dbm.ErrTransactionClosed
|
||||
}
|
||||
if len(key) == 0 {
|
||||
return nil, dbm.ErrKeyEmpty
|
||||
}
|
||||
|
@ -172,6 +176,9 @@ func (tx *dbTxn) Get(key []byte) ([]byte, error) {
|
|||
|
||||
// Has implements DBReader.
|
||||
func (tx *dbTxn) Has(key []byte) (bool, error) {
|
||||
if tx.btree == nil {
|
||||
return false, dbm.ErrTransactionClosed
|
||||
}
|
||||
if len(key) == 0 {
|
||||
return false, dbm.ErrKeyEmpty
|
||||
}
|
||||
|
@ -180,11 +187,11 @@ func (tx *dbTxn) Has(key []byte) (bool, error) {
|
|||
|
||||
// Set implements DBWriter.
|
||||
func (tx *dbWriter) Set(key []byte, value []byte) error {
|
||||
if len(key) == 0 {
|
||||
return dbm.ErrKeyEmpty
|
||||
if tx.btree == nil {
|
||||
return dbm.ErrTransactionClosed
|
||||
}
|
||||
if value == nil {
|
||||
return dbm.ErrValueNil
|
||||
if err := dbutil.ValidateKv(key, value); err != nil {
|
||||
return err
|
||||
}
|
||||
tx.btree.ReplaceOrInsert(newPair(key, value))
|
||||
return nil
|
||||
|
@ -192,6 +199,9 @@ func (tx *dbWriter) Set(key []byte, value []byte) error {
|
|||
|
||||
// Delete implements DBWriter.
|
||||
func (tx *dbWriter) Delete(key []byte) error {
|
||||
if tx.btree == nil {
|
||||
return dbm.ErrTransactionClosed
|
||||
}
|
||||
if len(key) == 0 {
|
||||
return dbm.ErrKeyEmpty
|
||||
}
|
||||
|
@ -202,6 +212,9 @@ func (tx *dbWriter) Delete(key []byte) error {
|
|||
// Iterator implements DBReader.
|
||||
// Takes out a read-lock on the database until the iterator is closed.
|
||||
func (tx *dbTxn) Iterator(start, end []byte) (dbm.Iterator, error) {
|
||||
if tx.btree == nil {
|
||||
return nil, dbm.ErrTransactionClosed
|
||||
}
|
||||
if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) {
|
||||
return nil, dbm.ErrKeyEmpty
|
||||
}
|
||||
|
@ -211,6 +224,9 @@ func (tx *dbTxn) Iterator(start, end []byte) (dbm.Iterator, error) {
|
|||
// ReverseIterator implements DBReader.
|
||||
// Takes out a read-lock on the database until the iterator is closed.
|
||||
func (tx *dbTxn) ReverseIterator(start, end []byte) (dbm.Iterator, error) {
|
||||
if tx.btree == nil {
|
||||
return nil, dbm.ErrTransactionClosed
|
||||
}
|
||||
if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) {
|
||||
return nil, dbm.ErrKeyEmpty
|
||||
}
|
||||
|
@ -219,17 +235,29 @@ func (tx *dbTxn) ReverseIterator(start, end []byte) (dbm.Iterator, error) {
|
|||
|
||||
// Commit implements DBWriter.
|
||||
func (tx *dbWriter) Commit() error {
|
||||
if tx.btree == nil {
|
||||
return dbm.ErrTransactionClosed
|
||||
}
|
||||
tx.db.mtx.Lock()
|
||||
defer tx.db.mtx.Unlock()
|
||||
defer tx.Discard()
|
||||
tx.db.btree = tx.btree
|
||||
return tx.Discard()
|
||||
}
|
||||
|
||||
// Discard implements DBReader.
|
||||
func (tx *dbTxn) Discard() error {
|
||||
if tx.btree != nil {
|
||||
tx.btree = nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Discard implements DBReader and DBWriter.
|
||||
func (tx *dbTxn) Discard() {}
|
||||
func (tx *dbWriter) Discard() {
|
||||
atomic.AddInt32(&tx.db.openWriters, -1)
|
||||
// Discard implements DBWriter.
|
||||
func (tx *dbWriter) Discard() error {
|
||||
if tx.btree != nil {
|
||||
defer atomic.AddInt32(&tx.db.openWriters, -1)
|
||||
}
|
||||
return tx.dbTxn.Discard()
|
||||
}
|
||||
|
||||
// Print prints the database contents.
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
package rocksdb
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
|
||||
dbm "github.com/cosmos/cosmos-sdk/db"
|
||||
dbutil "github.com/cosmos/cosmos-sdk/db/internal"
|
||||
"github.com/tecbot/gorocksdb"
|
||||
)
|
||||
|
||||
type rocksDBBatch struct {
|
||||
batch *gorocksdb.WriteBatch
|
||||
mgr *dbManager
|
||||
}
|
||||
|
||||
var _ dbm.DBWriter = (*rocksDBBatch)(nil)
|
||||
|
||||
func (mgr *dbManager) newRocksDBBatch() *rocksDBBatch {
|
||||
return &rocksDBBatch{
|
||||
batch: gorocksdb.NewWriteBatch(),
|
||||
mgr: mgr,
|
||||
}
|
||||
}
|
||||
|
||||
// Set implements DBWriter.
|
||||
func (b *rocksDBBatch) Set(key, value []byte) error {
|
||||
if err := dbutil.ValidateKv(key, value); err != nil {
|
||||
return err
|
||||
}
|
||||
if b.batch == nil {
|
||||
return dbm.ErrTransactionClosed
|
||||
}
|
||||
b.batch.Put(key, value)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete implements DBWriter.
|
||||
func (b *rocksDBBatch) Delete(key []byte) error {
|
||||
if len(key) == 0 {
|
||||
return dbm.ErrKeyEmpty
|
||||
}
|
||||
if b.batch == nil {
|
||||
return dbm.ErrTransactionClosed
|
||||
}
|
||||
b.batch.Delete(key)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Write implements DBWriter.
|
||||
func (b *rocksDBBatch) Commit() (err error) {
|
||||
if b.batch == nil {
|
||||
return dbm.ErrTransactionClosed
|
||||
}
|
||||
defer func() { err = dbutil.CombineErrors(err, b.Discard(), "Discard also failed") }()
|
||||
err = b.mgr.current.Write(b.mgr.opts.wo, b.batch)
|
||||
return
|
||||
}
|
||||
|
||||
// Close implements DBWriter.
|
||||
func (b *rocksDBBatch) Discard() error {
|
||||
if b.batch != nil {
|
||||
defer atomic.AddInt32(&b.mgr.openWriters, -1)
|
||||
b.batch.Destroy()
|
||||
b.batch = nil
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,483 @@
|
|||
package rocksdb
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
dbm "github.com/cosmos/cosmos-sdk/db"
|
||||
dbutil "github.com/cosmos/cosmos-sdk/db/internal"
|
||||
"github.com/tecbot/gorocksdb"
|
||||
)
|
||||
|
||||
var (
|
||||
currentDBFileName string = "current.db"
|
||||
checkpointFileFormat string = "%020d.db"
|
||||
)
|
||||
|
||||
var (
|
||||
_ dbm.DBConnection = (*RocksDB)(nil)
|
||||
_ dbm.DBReader = (*dbTxn)(nil)
|
||||
_ dbm.DBWriter = (*dbWriter)(nil)
|
||||
_ dbm.DBReadWriter = (*dbWriter)(nil)
|
||||
)
|
||||
|
||||
// RocksDB is a connection to a RocksDB key-value database.
|
||||
type RocksDB = dbManager
|
||||
|
||||
type dbManager struct {
|
||||
current *dbConnection
|
||||
dir string
|
||||
opts dbOptions
|
||||
vmgr *dbm.VersionManager
|
||||
mtx sync.RWMutex
|
||||
// Track open DBWriters
|
||||
openWriters int32
|
||||
cpCache checkpointCache
|
||||
}
|
||||
|
||||
type dbConnection = gorocksdb.OptimisticTransactionDB
|
||||
|
||||
type checkpointCache struct {
|
||||
cache map[uint64]*cpCacheEntry
|
||||
mtx sync.RWMutex
|
||||
}
|
||||
|
||||
type cpCacheEntry struct {
|
||||
cxn *dbConnection
|
||||
openCount uint
|
||||
}
|
||||
|
||||
type dbTxn struct {
|
||||
txn *gorocksdb.Transaction
|
||||
mgr *dbManager
|
||||
version uint64
|
||||
}
|
||||
type dbWriter struct{ dbTxn }
|
||||
|
||||
type dbOptions struct {
|
||||
dbo *gorocksdb.Options
|
||||
txo *gorocksdb.OptimisticTransactionOptions
|
||||
ro *gorocksdb.ReadOptions
|
||||
wo *gorocksdb.WriteOptions
|
||||
}
|
||||
|
||||
// NewDB creates a new RocksDB key-value database inside the given directory.
// If dir does not exist, it will be created.
//
// Layout: the working database lives at dir/current.db and versioned
// checkpoints live under dir/checkpoints/. If current.db is missing but
// checkpoints exist (e.g. after Revert or a crash), the working DB is
// restored from the latest checkpoint before opening.
func NewDB(dir string) (*dbManager, error) {
	if err := os.MkdirAll(dir, 0755); err != nil {
		return nil, err
	}

	// default rocksdb option, good enough for most cases, including heavy workloads.
	// 1GB table cache, 512MB write buffer(may use 50% more on heavy workloads).
	// compression: snappy as default, need to -lsnappy to enable.
	bbto := gorocksdb.NewDefaultBlockBasedTableOptions()
	bbto.SetBlockCache(gorocksdb.NewLRUCache(1 << 30))
	bbto.SetFilterPolicy(gorocksdb.NewBloomFilter(10))
	dbo := gorocksdb.NewDefaultOptions()
	dbo.SetBlockBasedTableFactory(bbto)
	dbo.SetCreateIfMissing(true)
	dbo.IncreaseParallelism(runtime.NumCPU())
	// 1.5GB maximum memory use for writebuffer.
	dbo.OptimizeLevelStyleCompaction(1<<30 + 512<<20)

	opts := dbOptions{
		dbo: dbo,
		txo: gorocksdb.NewDefaultOptimisticTransactionOptions(),
		ro:  gorocksdb.NewDefaultReadOptions(),
		wo:  gorocksdb.NewDefaultWriteOptions(),
	}
	mgr := &dbManager{
		dir:     dir,
		opts:    opts,
		cpCache: checkpointCache{cache: map[uint64]*cpCacheEntry{}},
	}

	err := os.MkdirAll(mgr.checkpointsDir(), 0755)
	if err != nil {
		return nil, err
	}
	// Discover saved versions from the checkpoint file names.
	if mgr.vmgr, err = readVersions(mgr.checkpointsDir()); err != nil {
		return nil, err
	}
	dbPath := filepath.Join(dir, currentDBFileName)
	// if the current db file is missing but there are checkpoints, restore it
	if mgr.vmgr.Count() > 0 {
		if _, err = os.Stat(dbPath); os.IsNotExist(err) {
			err = mgr.restoreFromCheckpoint(mgr.vmgr.Last(), dbPath)
			if err != nil {
				return nil, err
			}
		}
	}
	mgr.current, err = gorocksdb.OpenOptimisticTransactionDb(dbo, dbPath)
	if err != nil {
		return nil, err
	}
	return mgr, nil
}
|
||||
|
||||
func (mgr *dbManager) checkpointsDir() string {
|
||||
return filepath.Join(mgr.dir, "checkpoints")
|
||||
}
|
||||
|
||||
// Reads directory for checkpoints files
|
||||
func readVersions(dir string) (*dbm.VersionManager, error) {
|
||||
files, err := os.ReadDir(dir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var versions []uint64
|
||||
for _, f := range files {
|
||||
var version uint64
|
||||
if _, err := fmt.Sscanf(f.Name(), checkpointFileFormat, &version); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
versions = append(versions, version)
|
||||
}
|
||||
return dbm.NewVersionManager(versions), nil
|
||||
}
|
||||
|
||||
func (mgr *dbManager) checkpointPath(version uint64) (string, error) {
|
||||
dbPath := filepath.Join(mgr.checkpointsDir(), fmt.Sprintf(checkpointFileFormat, version))
|
||||
if stat, err := os.Stat(dbPath); err != nil {
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
err = dbm.ErrVersionDoesNotExist
|
||||
}
|
||||
return "", err
|
||||
} else if !stat.IsDir() {
|
||||
return "", dbm.ErrVersionDoesNotExist
|
||||
}
|
||||
return dbPath, nil
|
||||
}
|
||||
|
||||
func (mgr *dbManager) openCheckpoint(version uint64) (*dbConnection, error) {
|
||||
mgr.cpCache.mtx.Lock()
|
||||
defer mgr.cpCache.mtx.Unlock()
|
||||
cp, has := mgr.cpCache.cache[version]
|
||||
if has {
|
||||
cp.openCount += 1
|
||||
return cp.cxn, nil
|
||||
}
|
||||
dbPath, err := mgr.checkpointPath(version)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
db, err := gorocksdb.OpenOptimisticTransactionDb(mgr.opts.dbo, dbPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
mgr.cpCache.cache[version] = &cpCacheEntry{cxn: db, openCount: 1}
|
||||
return db, nil
|
||||
}
|
||||
|
||||
func (mgr *dbManager) Reader() dbm.DBReader {
|
||||
mgr.mtx.RLock()
|
||||
defer mgr.mtx.RUnlock()
|
||||
return &dbTxn{
|
||||
// Note: oldTransaction could be passed here as a small optimization to
|
||||
// avoid allocating a new object.
|
||||
txn: mgr.current.TransactionBegin(mgr.opts.wo, mgr.opts.txo, nil),
|
||||
mgr: mgr,
|
||||
}
|
||||
}
|
||||
|
||||
func (mgr *dbManager) ReaderAt(version uint64) (dbm.DBReader, error) {
|
||||
mgr.mtx.RLock()
|
||||
defer mgr.mtx.RUnlock()
|
||||
db, err := mgr.openCheckpoint(version)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &dbTxn{
|
||||
txn: db.TransactionBegin(mgr.opts.wo, mgr.opts.txo, nil),
|
||||
mgr: mgr,
|
||||
version: version,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (mgr *dbManager) ReadWriter() dbm.DBReadWriter {
|
||||
mgr.mtx.RLock()
|
||||
defer mgr.mtx.RUnlock()
|
||||
atomic.AddInt32(&mgr.openWriters, 1)
|
||||
return &dbWriter{dbTxn{
|
||||
txn: mgr.current.TransactionBegin(mgr.opts.wo, mgr.opts.txo, nil),
|
||||
mgr: mgr,
|
||||
}}
|
||||
}
|
||||
|
||||
// Writer implements DBConnection, returning a write-only batch on the
// current working database.
func (mgr *dbManager) Writer() dbm.DBWriter {
	mgr.mtx.RLock()
	defer mgr.mtx.RUnlock()
	// Track open writers so save/Revert can refuse to run mid-transaction.
	atomic.AddInt32(&mgr.openWriters, 1)
	return mgr.newRocksDBBatch()
}
|
||||
|
||||
func (mgr *dbManager) Versions() (dbm.VersionSet, error) {
|
||||
mgr.mtx.RLock()
|
||||
defer mgr.mtx.RUnlock()
|
||||
return mgr.vmgr, nil
|
||||
}
|
||||
|
||||
// SaveNextVersion implements DBConnection.
// A target of 0 tells save to allocate the next consecutive version number.
func (mgr *dbManager) SaveNextVersion() (uint64, error) {
	return mgr.save(0)
}
|
||||
|
||||
// SaveVersion implements DBConnection.
|
||||
func (mgr *dbManager) SaveVersion(target uint64) error {
|
||||
if target == 0 {
|
||||
return dbm.ErrInvalidVersion
|
||||
}
|
||||
_, err := mgr.save(target)
|
||||
return err
|
||||
}
|
||||
|
||||
func (mgr *dbManager) save(target uint64) (uint64, error) {
|
||||
mgr.mtx.Lock()
|
||||
defer mgr.mtx.Unlock()
|
||||
if mgr.openWriters > 0 {
|
||||
return 0, dbm.ErrOpenTransactions
|
||||
}
|
||||
newVmgr := mgr.vmgr.Copy()
|
||||
target, err := newVmgr.Save(target)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
cp, err := mgr.current.NewCheckpoint()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
dir := filepath.Join(mgr.checkpointsDir(), fmt.Sprintf(checkpointFileFormat, target))
|
||||
if err := cp.CreateCheckpoint(dir, 0); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
cp.Destroy()
|
||||
mgr.vmgr = newVmgr
|
||||
return target, nil
|
||||
}
|
||||
|
||||
func (mgr *dbManager) DeleteVersion(ver uint64) error {
|
||||
if mgr.cpCache.has(ver) {
|
||||
return dbm.ErrOpenTransactions
|
||||
}
|
||||
mgr.mtx.Lock()
|
||||
defer mgr.mtx.Unlock()
|
||||
dbPath, err := mgr.checkpointPath(ver)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mgr.vmgr = mgr.vmgr.Copy()
|
||||
mgr.vmgr.Delete(ver)
|
||||
return os.RemoveAll(dbPath)
|
||||
}
|
||||
|
||||
func (mgr *dbManager) Revert() (err error) {
|
||||
mgr.mtx.RLock()
|
||||
defer mgr.mtx.RUnlock()
|
||||
if mgr.openWriters > 0 {
|
||||
return dbm.ErrOpenTransactions
|
||||
}
|
||||
last := mgr.vmgr.Last()
|
||||
if last == 0 {
|
||||
return dbm.ErrInvalidVersion
|
||||
}
|
||||
// Close current connection and replace it with a checkpoint (created from the last checkpoint)
|
||||
mgr.current.Close()
|
||||
dbPath := filepath.Join(mgr.dir, currentDBFileName)
|
||||
err = os.RemoveAll(dbPath)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = mgr.restoreFromCheckpoint(last, dbPath)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
mgr.current, err = gorocksdb.OpenOptimisticTransactionDb(mgr.opts.dbo, dbPath)
|
||||
return
|
||||
}
|
||||
|
||||
func (mgr *dbManager) restoreFromCheckpoint(version uint64, path string) error {
|
||||
cxn, err := mgr.openCheckpoint(version)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer mgr.cpCache.decrement(version)
|
||||
cp, err := cxn.NewCheckpoint()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = cp.CreateCheckpoint(path, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cp.Destroy()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close implements DBConnection.
|
||||
func (mgr *dbManager) Close() error {
|
||||
mgr.current.Close()
|
||||
mgr.opts.destroy()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stats implements DBConnection.
|
||||
func (mgr *dbManager) Stats() map[string]string {
|
||||
keys := []string{"rocksdb.stats"}
|
||||
stats := make(map[string]string, len(keys))
|
||||
for _, key := range keys {
|
||||
stats[key] = mgr.current.GetProperty(key)
|
||||
}
|
||||
return stats
|
||||
}
|
||||
|
||||
// Get implements DBReader.
|
||||
func (tx *dbTxn) Get(key []byte) ([]byte, error) {
|
||||
if tx.txn == nil {
|
||||
return nil, dbm.ErrTransactionClosed
|
||||
}
|
||||
if len(key) == 0 {
|
||||
return nil, dbm.ErrKeyEmpty
|
||||
}
|
||||
res, err := tx.txn.Get(tx.mgr.opts.ro, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return moveSliceToBytes(res), nil
|
||||
}
|
||||
|
||||
// Get implements DBReader.
|
||||
func (tx *dbWriter) Get(key []byte) ([]byte, error) {
|
||||
if tx.txn == nil {
|
||||
return nil, dbm.ErrTransactionClosed
|
||||
}
|
||||
if len(key) == 0 {
|
||||
return nil, dbm.ErrKeyEmpty
|
||||
}
|
||||
res, err := tx.txn.GetForUpdate(tx.mgr.opts.ro, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return moveSliceToBytes(res), nil
|
||||
}
|
||||
|
||||
// Has implements DBReader.
|
||||
func (tx *dbTxn) Has(key []byte) (bool, error) {
|
||||
bytes, err := tx.Get(key)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return bytes != nil, nil
|
||||
}
|
||||
|
||||
// Set implements DBWriter.
|
||||
func (tx *dbWriter) Set(key []byte, value []byte) error {
|
||||
if tx.txn == nil {
|
||||
return dbm.ErrTransactionClosed
|
||||
}
|
||||
if err := dbutil.ValidateKv(key, value); err != nil {
|
||||
return err
|
||||
}
|
||||
return tx.txn.Put(key, value)
|
||||
}
|
||||
|
||||
// Delete implements DBWriter.
|
||||
func (tx *dbWriter) Delete(key []byte) error {
|
||||
if tx.txn == nil {
|
||||
return dbm.ErrTransactionClosed
|
||||
}
|
||||
if len(key) == 0 {
|
||||
return dbm.ErrKeyEmpty
|
||||
}
|
||||
return tx.txn.Delete(key)
|
||||
}
|
||||
|
||||
func (tx *dbWriter) Commit() (err error) {
|
||||
if tx.txn == nil {
|
||||
return dbm.ErrTransactionClosed
|
||||
}
|
||||
defer func() { err = dbutil.CombineErrors(err, tx.Discard(), "Discard also failed") }()
|
||||
err = tx.txn.Commit()
|
||||
return
|
||||
}
|
||||
|
||||
func (tx *dbTxn) Discard() error {
|
||||
if tx.txn == nil {
|
||||
return nil // Discard() is idempotent
|
||||
}
|
||||
defer func() { tx.txn.Destroy(); tx.txn = nil }()
|
||||
if tx.version == 0 {
|
||||
return nil
|
||||
}
|
||||
if !tx.mgr.cpCache.decrement(tx.version) {
|
||||
return fmt.Errorf("transaction has no corresponding checkpoint cache entry: %v", tx.version)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tx *dbWriter) Discard() error {
|
||||
if tx.txn != nil {
|
||||
defer atomic.AddInt32(&tx.mgr.openWriters, -1)
|
||||
}
|
||||
return tx.dbTxn.Discard()
|
||||
}
|
||||
|
||||
// Iterator implements DBReader.
|
||||
func (tx *dbTxn) Iterator(start, end []byte) (dbm.Iterator, error) {
|
||||
if tx.txn == nil {
|
||||
return nil, dbm.ErrTransactionClosed
|
||||
}
|
||||
if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) {
|
||||
return nil, dbm.ErrKeyEmpty
|
||||
}
|
||||
itr := tx.txn.NewIterator(tx.mgr.opts.ro)
|
||||
return newRocksDBIterator(itr, start, end, false), nil
|
||||
}
|
||||
|
||||
// ReverseIterator implements DBReader.
|
||||
func (tx *dbTxn) ReverseIterator(start, end []byte) (dbm.Iterator, error) {
|
||||
if tx.txn == nil {
|
||||
return nil, dbm.ErrTransactionClosed
|
||||
}
|
||||
if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) {
|
||||
return nil, dbm.ErrKeyEmpty
|
||||
}
|
||||
itr := tx.txn.NewIterator(tx.mgr.opts.ro)
|
||||
return newRocksDBIterator(itr, start, end, true), nil
|
||||
}
|
||||
|
||||
func (o dbOptions) destroy() {
|
||||
o.ro.Destroy()
|
||||
o.wo.Destroy()
|
||||
o.txo.Destroy()
|
||||
o.dbo.Destroy()
|
||||
}
|
||||
|
||||
func (cpc *checkpointCache) has(ver uint64) bool {
|
||||
cpc.mtx.RLock()
|
||||
defer cpc.mtx.RUnlock()
|
||||
_, has := cpc.cache[ver]
|
||||
return has
|
||||
}
|
||||
|
||||
func (cpc *checkpointCache) decrement(ver uint64) bool {
|
||||
cpc.mtx.Lock()
|
||||
defer cpc.mtx.Unlock()
|
||||
cp, has := cpc.cache[ver]
|
||||
if !has {
|
||||
return false
|
||||
}
|
||||
cp.openCount -= 1
|
||||
if cp.openCount == 0 {
|
||||
cp.cxn.Close()
|
||||
delete(cpc.cache, ver)
|
||||
}
|
||||
return true
|
||||
}
|
|
@ -0,0 +1,63 @@
|
|||
package rocksdb
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
dbm "github.com/cosmos/cosmos-sdk/db"
|
||||
"github.com/cosmos/cosmos-sdk/db/dbtest"
|
||||
)
|
||||
|
||||
// load opens (or reopens) a DB connection at dir, failing the test on error.
// It matches the loader signature expected by the shared dbtest suite.
func load(t *testing.T, dir string) dbm.DBConnection {
	db, err := NewDB(dir)
	require.NoError(t, err)
	return db
}
|
||||
|
||||
// TestGetSetHasDelete runs the shared backend CRUD suite against RocksDB.
func TestGetSetHasDelete(t *testing.T) {
	dbtest.DoTestGetSetHasDelete(t, load)
}
|
||||
|
||||
// TestIterators runs the shared backend iterator suite against RocksDB.
func TestIterators(t *testing.T) {
	dbtest.DoTestIterators(t, load)
}
|
||||
|
||||
// TestTransactions runs the shared transaction suite against RocksDB,
// including the concurrent-writer cases (second argument true).
func TestTransactions(t *testing.T) {
	dbtest.DoTestTransactions(t, load, true)
}
|
||||
|
||||
// TestVersioning runs the shared save/delete-version suite against RocksDB.
func TestVersioning(t *testing.T) {
	dbtest.DoTestVersioning(t, load)
}
|
||||
|
||||
// TestRevert runs the shared revert suite against RocksDB, both without and
// with reopening the database in between.
func TestRevert(t *testing.T) {
	dbtest.DoTestRevert(t, load, false)
	dbtest.DoTestRevert(t, load, true)
}
|
||||
|
||||
// TestReloadDB runs the shared close-and-reopen suite against RocksDB.
func TestReloadDB(t *testing.T) {
	dbtest.DoTestReloadDB(t, load)
}
|
||||
|
||||
// Test that the DB can be reloaded after a failed Revert
func TestRevertRecovery(t *testing.T) {
	dir := t.TempDir()
	db, err := NewDB(dir)
	require.NoError(t, err)
	// Save a version so Revert has a checkpoint to restore from.
	_, err = db.SaveNextVersion()
	require.NoError(t, err)
	// Commit some data on top of the saved version, so Revert has work to do.
	txn := db.Writer()
	require.NoError(t, txn.Set([]byte{1}, []byte{1}))
	require.NoError(t, txn.Set([]byte{2}, []byte{2}))
	require.NoError(t, txn.Commit())

	// make checkpoints dir temporarily unreadable to trigger an error
	// NOTE(review): permission-based failure injection does not trigger when
	// tests run as root (e.g. in some CI containers) — confirm CI setup.
	require.NoError(t, os.Chmod(db.checkpointsDir(), 0000))
	require.Error(t, db.Revert())

	// Restore permissions; reopening must succeed despite the failed Revert.
	require.NoError(t, os.Chmod(db.checkpointsDir(), 0755))
	db, err = NewDB(dir)
	require.NoError(t, err)
}
|
|
@ -0,0 +1,147 @@
|
|||
package rocksdb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
dbm "github.com/cosmos/cosmos-sdk/db"
|
||||
"github.com/tecbot/gorocksdb"
|
||||
)
|
||||
|
||||
// rocksDBIterator adapts a raw gorocksdb iterator to the DB interface's
// Iterator contract: half-open [start, end) bounds and "call Next once to
// prime" semantics.
type rocksDBIterator struct {
	source     *gorocksdb.Iterator // underlying RocksDB iterator
	start, end []byte              // domain bounds; nil means unbounded
	isReverse  bool                // iterate from end towards start
	isInvalid  bool                // latched true once the iterator leaves its domain
	// Whether iterator has been advanced to the first element (is fully initialized)
	primed bool
}

// Compile-time check that rocksDBIterator satisfies the Iterator interface.
var _ dbm.Iterator = (*rocksDBIterator)(nil)
|
||||
|
||||
// newRocksDBIterator positions source at the first entry of the half-open
// range [start, end) (or at its last entry when isReverse) and wraps it in a
// rocksDBIterator. The iterator is returned unprimed: callers must invoke
// Next once before reading Key or Value.
func newRocksDBIterator(source *gorocksdb.Iterator, start, end []byte, isReverse bool) *rocksDBIterator {
	if isReverse {
		if end == nil {
			source.SeekToLast()
		} else {
			// Seek lands on the first key >= end; end is exclusive, so step
			// back one entry unless Seek landed strictly before end.
			source.Seek(end)
			if source.Valid() {
				eoakey := moveSliceToBytes(source.Key()) // end or after key
				if bytes.Compare(end, eoakey) <= 0 {
					source.Prev()
				}
			} else {
				// All keys are < end, so the range ends at the last key.
				source.SeekToLast()
			}
		}
	} else {
		if start == nil {
			source.SeekToFirst()
		} else {
			// Seek lands on the first key >= start, which is in-range since
			// start is inclusive.
			source.Seek(start)
		}
	}
	return &rocksDBIterator{
		source:    source,
		start:     start,
		end:       end,
		isReverse: isReverse,
		isInvalid: false,
		primed:    false,
	}
}
|
||||
|
||||
// Domain implements Iterator.
|
||||
func (itr *rocksDBIterator) Domain() ([]byte, []byte) {
|
||||
return itr.start, itr.end
|
||||
}
|
||||
|
||||
// Valid implements Iterator. The result is latched: once the iterator has
// been found to be outside its domain it remains invalid.
func (itr *rocksDBIterator) Valid() bool {
	// Not yet primed by a first call to Next.
	if !itr.primed {
		return false
	}

	// Previously latched invalid.
	if itr.isInvalid {
		return false
	}

	// Underlying iterator exhausted: latch invalid.
	if !itr.source.Valid() {
		itr.isInvalid = true
		return false
	}

	var (
		start = itr.start
		end   = itr.end
		key   = moveSliceToBytes(itr.source.Key())
	)
	// If key is end or past it, invalid.
	if itr.isReverse {
		// Reverse iteration stops when the key falls below the inclusive
		// start bound.
		if start != nil && bytes.Compare(key, start) < 0 {
			itr.isInvalid = true
			return false
		}
	} else {
		// Forward iteration stops at the exclusive end bound.
		if end != nil && bytes.Compare(key, end) >= 0 {
			itr.isInvalid = true
			return false
		}
	}
	return true
}
|
||||
|
||||
// Key implements Iterator.
|
||||
func (itr *rocksDBIterator) Key() []byte {
|
||||
itr.assertIsValid()
|
||||
return moveSliceToBytes(itr.source.Key())
|
||||
}
|
||||
|
||||
// Value implements Iterator.
|
||||
func (itr *rocksDBIterator) Value() []byte {
|
||||
itr.assertIsValid()
|
||||
return moveSliceToBytes(itr.source.Value())
|
||||
}
|
||||
|
||||
// Next implements Iterator.
|
||||
func (itr *rocksDBIterator) Next() bool {
|
||||
if !itr.primed {
|
||||
itr.primed = true
|
||||
} else {
|
||||
if itr.isReverse {
|
||||
itr.source.Prev()
|
||||
} else {
|
||||
itr.source.Next()
|
||||
}
|
||||
}
|
||||
return itr.Valid()
|
||||
}
|
||||
|
||||
// Error implements Iterator, surfacing any error reported by the underlying
// RocksDB iterator.
func (itr *rocksDBIterator) Error() error {
	return itr.source.Err()
}
|
||||
|
||||
// Close implements Iterator, releasing the underlying RocksDB iterator.
// It always returns nil.
func (itr *rocksDBIterator) Close() error {
	itr.source.Close()
	return nil
}
|
||||
|
||||
func (itr *rocksDBIterator) assertIsValid() {
|
||||
if !itr.Valid() {
|
||||
panic("iterator is invalid")
|
||||
}
|
||||
}
|
||||
|
||||
// moveSliceToBytes will free the slice and copy out a go []byte
|
||||
// This function can be applied on *Slice returned from Key() and Value()
|
||||
// of an Iterator, because they are marked as freed.
|
||||
func moveSliceToBytes(s *gorocksdb.Slice) []byte {
|
||||
defer s.Free()
|
||||
if !s.Exists() {
|
||||
return nil
|
||||
}
|
||||
v := make([]byte, s.Size())
|
||||
copy(v, s.Data())
|
||||
return v
|
||||
}
|
13
db/types.go
13
db/types.go
|
@ -3,8 +3,8 @@ package db
|
|||
import "errors"
|
||||
|
||||
var (
|
||||
// ErrBatchClosed is returned when a closed or written batch is used.
|
||||
ErrBatchClosed = errors.New("batch has been written or closed")
|
||||
// ErrTransactionClosed is returned when a closed or written transaction is used.
|
||||
ErrTransactionClosed = errors.New("transaction has been written or closed")
|
||||
|
||||
// ErrKeyEmpty is returned when attempting to use an empty or nil key.
|
||||
ErrKeyEmpty = errors.New("key cannot be empty")
|
||||
|
@ -61,6 +61,11 @@ type DBConnection interface {
|
|||
// Deletes a saved version. Returns ErrVersionDoesNotExist for invalid versions.
|
||||
DeleteVersion(uint64) error
|
||||
|
||||
// Reverts the DB state to the last saved version.
|
||||
// Returns an error if no saved versions exist.
|
||||
// Returns an error if any open DBWriter transactions exist.
|
||||
Revert() error
|
||||
|
||||
// Close closes the database connection.
|
||||
Close() error
|
||||
}
|
||||
|
@ -97,7 +102,7 @@ type DBReader interface {
|
|||
ReverseIterator(start, end []byte) (Iterator, error)
|
||||
|
||||
// Discards the transaction, invalidating any future operations on it.
|
||||
Discard()
|
||||
Discard() error
|
||||
}
|
||||
|
||||
// DBWriter is a write-only transaction interface.
|
||||
|
@ -117,7 +122,7 @@ type DBWriter interface {
|
|||
Commit() error
|
||||
|
||||
// Discards the transaction, invalidating any future operations on it.
|
||||
Discard()
|
||||
Discard() error
|
||||
}
|
||||
|
||||
// DBReadWriter is a transaction interface that allows both reading and writing.
|
||||
|
|
1
go.mod
1
go.mod
|
@ -11,6 +11,7 @@ require (
|
|||
github.com/confio/ics23/go v0.6.6
|
||||
github.com/cosmos/btcutil v1.0.4
|
||||
github.com/cosmos/cosmos-proto v0.0.0-20210914142853-23ed61ac79ce
|
||||
github.com/cosmos/cosmos-sdk/db v0.0.0-20210831080937-2c31451a55b5
|
||||
github.com/cosmos/go-bip39 v1.0.0
|
||||
github.com/cosmos/iavl v0.17.1
|
||||
github.com/cosmos/ledger-cosmos-go v0.11.1
|
||||
|
|
2
go.sum
2
go.sum
|
@ -168,6 +168,8 @@ github.com/cosmos/btcutil v1.0.4 h1:n7C2ngKXo7UC9gNyMNLbzqz7Asuf+7Qv4gnX/rOdQ44=
|
|||
github.com/cosmos/btcutil v1.0.4/go.mod h1:Ffqc8Hn6TJUdDgHBwIZLtrLQC1KdJ9jGJl/TvgUaxbU=
|
||||
github.com/cosmos/cosmos-proto v0.0.0-20210914142853-23ed61ac79ce h1:nin7WtIMETZ8LezEYa5e9/iqyEgQka1x0cQYqgUeTGM=
|
||||
github.com/cosmos/cosmos-proto v0.0.0-20210914142853-23ed61ac79ce/go.mod h1:g2Q3cd94kOBVRAv7ahdtO27yUc4cpNuHGnI40qanl1k=
|
||||
github.com/cosmos/cosmos-sdk/db v0.0.0-20210831080937-2c31451a55b5 h1:GdtDczrd06rDuZ02iprBUfo0JkeauWHBQSDqoDYShf4=
|
||||
github.com/cosmos/cosmos-sdk/db v0.0.0-20210831080937-2c31451a55b5/go.mod h1:eAiR2sIGn3oIrcDiEUIqmH8UvPdIvN67Ui0XeKuTDnI=
|
||||
github.com/cosmos/go-bip39 v0.0.0-20180819234021-555e2067c45d/go.mod h1:tSxLoYXyBmiFeKpvmq4dzayMdCjCnu8uqmCysIGBT2Y=
|
||||
github.com/cosmos/go-bip39 v1.0.0 h1:pcomnQdrdH22njcAatO0yWojsUnCO3y2tNoV1cb6hHY=
|
||||
github.com/cosmos/go-bip39 v1.0.0/go.mod h1:RNJv0H/pOIVgxw6KS7QeX2a0Uo0aKUlfhZ4xuwvCdJw=
|
||||
|
|
Loading…
Reference in New Issue