Merge pull request #4 from tendermint/develop

C LevelDB
This commit is contained in:
Ethan Buchman 2017-01-12 21:37:57 -05:00 committed by GitHub
commit 996c483f23
7 changed files with 438 additions and 106 deletions

134
c_level_db.go Normal file
View File

@ -0,0 +1,134 @@
// +build gcc
package db
import (
"fmt"
"path"
"github.com/jmhodges/levigo"
. "github.com/tendermint/go-common"
)
func init() {
dbCreator := func(name string, dir string) (DB, error) {
return NewCLevelDB(name, dir)
}
registerDBCreator(LevelDBBackendStr, dbCreator, true)
registerDBCreator(CLevelDBBackendStr, dbCreator, false)
}
type CLevelDB struct {
db *levigo.DB
ro *levigo.ReadOptions
wo *levigo.WriteOptions
woSync *levigo.WriteOptions
}
func NewCLevelDB(name string, dir string) (*CLevelDB, error) {
dbPath := path.Join(dir, name+".db")
opts := levigo.NewOptions()
opts.SetCache(levigo.NewLRUCache(1 << 30))
opts.SetCreateIfMissing(true)
db, err := levigo.Open(dbPath, opts)
if err != nil {
return nil, err
}
ro := levigo.NewReadOptions()
wo := levigo.NewWriteOptions()
woSync := levigo.NewWriteOptions()
woSync.SetSync(true)
database := &CLevelDB{
db: db,
ro: ro,
wo: wo,
woSync: woSync,
}
return database, nil
}
func (db *CLevelDB) Get(key []byte) []byte {
res, err := db.db.Get(db.ro, key)
if err != nil {
PanicCrisis(err)
}
return res
}
func (db *CLevelDB) Set(key []byte, value []byte) {
err := db.db.Put(db.wo, key, value)
if err != nil {
PanicCrisis(err)
}
}
func (db *CLevelDB) SetSync(key []byte, value []byte) {
err := db.db.Put(db.woSync, key, value)
if err != nil {
PanicCrisis(err)
}
}
func (db *CLevelDB) Delete(key []byte) {
err := db.db.Delete(db.wo, key)
if err != nil {
PanicCrisis(err)
}
}
func (db *CLevelDB) DeleteSync(key []byte) {
err := db.db.Delete(db.woSync, key)
if err != nil {
PanicCrisis(err)
}
}
func (db *CLevelDB) DB() *levigo.DB {
return db.db
}
func (db *CLevelDB) Close() {
db.db.Close()
db.ro.Close()
db.wo.Close()
db.woSync.Close()
}
func (db *CLevelDB) Print() {
iter := db.db.NewIterator(db.ro)
defer iter.Close()
for iter.Seek(nil); iter.Valid(); iter.Next() {
key := iter.Key()
value := iter.Value()
fmt.Printf("[%X]:\t[%X]\n", key, value)
}
}
func (db *CLevelDB) NewBatch() Batch {
batch := levigo.NewWriteBatch()
return &cLevelDBBatch{db, batch}
}
//--------------------------------------------------------------------------------
type cLevelDBBatch struct {
db *CLevelDB
batch *levigo.WriteBatch
}
func (mBatch *cLevelDBBatch) Set(key, value []byte) {
mBatch.batch.Put(key, value)
}
func (mBatch *cLevelDBBatch) Delete(key []byte) {
mBatch.batch.Delete(key)
}
func (mBatch *cLevelDBBatch) Write() {
err := mBatch.db.db.Write(mBatch.db.wo, mBatch.batch)
if err != nil {
PanicCrisis(err)
}
}

86
c_level_db_test.go Normal file
View File

@ -0,0 +1,86 @@
// +build gcc
package db
import (
"bytes"
"fmt"
"testing"
. "github.com/tendermint/go-common"
)
func BenchmarkRandomReadsWrites2(b *testing.B) {
b.StopTimer()
numItems := int64(1000000)
internal := map[int64]int64{}
for i := 0; i < int(numItems); i++ {
internal[int64(i)] = int64(0)
}
db, err := NewGoLevelDB(Fmt("test_%x", RandStr(12)), "")
if err != nil {
b.Fatal(err.Error())
return
}
fmt.Println("ok, starting")
b.StartTimer()
for i := 0; i < b.N; i++ {
// Write something
{
idx := (int64(RandInt()) % numItems)
internal[idx] += 1
val := internal[idx]
idxBytes := int642Bytes(int64(idx))
valBytes := int642Bytes(int64(val))
//fmt.Printf("Set %X -> %X\n", idxBytes, valBytes)
db.Set(
idxBytes,
valBytes,
)
}
// Read something
{
idx := (int64(RandInt()) % numItems)
val := internal[idx]
idxBytes := int642Bytes(int64(idx))
valBytes := db.Get(idxBytes)
//fmt.Printf("Get %X -> %X\n", idxBytes, valBytes)
if val == 0 {
if !bytes.Equal(valBytes, nil) {
b.Errorf("Expected %X for %v, got %X",
nil, idx, valBytes)
break
}
} else {
if len(valBytes) != 8 {
b.Errorf("Expected length 8 for %v, got %X",
idx, valBytes)
break
}
valGot := bytes2Int64(valBytes)
if val != valGot {
b.Errorf("Expected %v for %v, got %v",
val, idx, valGot)
break
}
}
}
}
db.Close()
}
/*
func int642Bytes(i int64) []byte {
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, uint64(i))
return buf
}
func bytes2Int64(buf []byte) int64 {
return int64(binary.BigEndian.Uint64(buf))
}
*/

51
db.go
View File

@ -1,10 +1,6 @@
package db package db
import ( import . "github.com/tendermint/go-common"
"path"
. "github.com/tendermint/go-common"
)
type DB interface { type DB interface {
Get([]byte) []byte Get([]byte) []byte
@ -13,30 +9,43 @@ type DB interface {
Delete([]byte) Delete([]byte)
DeleteSync([]byte) DeleteSync([]byte)
Close() Close()
NewBatch() Batch
// For debugging // For debugging
Print() Print()
} }
type Batch interface {
Set(key, value []byte)
Delete(key []byte)
Write()
}
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// Database types const (
const DBBackendMemDB = "memdb" LevelDBBackendStr = "leveldb" // legacy, defaults to goleveldb.
const DBBackendLevelDB = "leveldb" CLevelDBBackendStr = "cleveldb"
GoLevelDBBackendStr = "goleveldb"
MemDBBackendStr = "memdb"
)
type dbCreator func(name string, dir string) (DB, error)
var backends = map[string]dbCreator{}
func registerDBCreator(backend string, creator dbCreator, force bool) {
_, ok := backends[backend]
if !force && ok {
return
}
backends[backend] = creator
}
func NewDB(name string, backend string, dir string) DB { func NewDB(name string, backend string, dir string) DB {
switch backend { db, err := backends[backend](name, dir)
case DBBackendMemDB: if err != nil {
db := NewMemDB() PanicSanity(Fmt("Error initializing DB: %v", err))
return db
case DBBackendLevelDB:
db, err := NewLevelDB(path.Join(dir, name+".db"))
if err != nil {
PanicCrisis(err)
}
return db
default:
PanicSanity(Fmt("Unknown DB backend: %v", backend))
} }
return nil return db
} }

118
go_level_db.go Normal file
View File

@ -0,0 +1,118 @@
package db
import (
"fmt"
"path"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/opt"
. "github.com/tendermint/go-common"
)
func init() {
dbCreator := func(name string, dir string) (DB, error) {
return NewGoLevelDB(name, dir)
}
registerDBCreator(LevelDBBackendStr, dbCreator, false)
registerDBCreator(GoLevelDBBackendStr, dbCreator, false)
}
type GoLevelDB struct {
db *leveldb.DB
}
func NewGoLevelDB(name string, dir string) (*GoLevelDB, error) {
dbPath := path.Join(dir, name+".db")
db, err := leveldb.OpenFile(dbPath, nil)
if err != nil {
return nil, err
}
database := &GoLevelDB{db: db}
return database, nil
}
func (db *GoLevelDB) Get(key []byte) []byte {
res, err := db.db.Get(key, nil)
if err != nil {
if err == errors.ErrNotFound {
return nil
} else {
PanicCrisis(err)
}
}
return res
}
func (db *GoLevelDB) Set(key []byte, value []byte) {
err := db.db.Put(key, value, nil)
if err != nil {
PanicCrisis(err)
}
}
func (db *GoLevelDB) SetSync(key []byte, value []byte) {
err := db.db.Put(key, value, &opt.WriteOptions{Sync: true})
if err != nil {
PanicCrisis(err)
}
}
func (db *GoLevelDB) Delete(key []byte) {
err := db.db.Delete(key, nil)
if err != nil {
PanicCrisis(err)
}
}
func (db *GoLevelDB) DeleteSync(key []byte) {
err := db.db.Delete(key, &opt.WriteOptions{Sync: true})
if err != nil {
PanicCrisis(err)
}
}
func (db *GoLevelDB) DB() *leveldb.DB {
return db.db
}
func (db *GoLevelDB) Close() {
db.db.Close()
}
func (db *GoLevelDB) Print() {
iter := db.db.NewIterator(nil, nil)
for iter.Next() {
key := iter.Key()
value := iter.Value()
fmt.Printf("[%X]:\t[%X]\n", key, value)
}
}
func (db *GoLevelDB) NewBatch() Batch {
batch := new(leveldb.Batch)
return &goLevelDBBatch{db, batch}
}
//--------------------------------------------------------------------------------
type goLevelDBBatch struct {
db *GoLevelDB
batch *leveldb.Batch
}
func (mBatch *goLevelDBBatch) Set(key, value []byte) {
mBatch.batch.Put(key, value)
}
func (mBatch *goLevelDBBatch) Delete(key []byte) {
mBatch.batch.Delete(key)
}
func (mBatch *goLevelDBBatch) Write() {
err := mBatch.db.db.Write(mBatch.batch, nil)
if err != nil {
PanicCrisis(err)
}
}

View File

@ -17,7 +17,7 @@ func BenchmarkRandomReadsWrites(b *testing.B) {
for i := 0; i < int(numItems); i++ { for i := 0; i < int(numItems); i++ {
internal[int64(i)] = int64(0) internal[int64(i)] = int64(0)
} }
db, err := NewLevelDB(Fmt("test_%x", RandStr(12))) db, err := NewCLevelDB(Fmt("test_%x", RandStr(12)), "")
if err != nil { if err != nil {
b.Fatal(err.Error()) b.Fatal(err.Error())
return return

View File

@ -1,83 +0,0 @@
package db
import (
"fmt"
"path"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/opt"
. "github.com/tendermint/go-common"
)
type LevelDB struct {
db *leveldb.DB
}
func NewLevelDB(name string) (*LevelDB, error) {
dbPath := path.Join(name)
db, err := leveldb.OpenFile(dbPath, nil)
if err != nil {
return nil, err
}
database := &LevelDB{db: db}
return database, nil
}
func (db *LevelDB) Get(key []byte) []byte {
res, err := db.db.Get(key, nil)
if err != nil {
if err == errors.ErrNotFound {
return nil
} else {
PanicCrisis(err)
}
}
return res
}
func (db *LevelDB) Set(key []byte, value []byte) {
err := db.db.Put(key, value, nil)
if err != nil {
PanicCrisis(err)
}
}
func (db *LevelDB) SetSync(key []byte, value []byte) {
err := db.db.Put(key, value, &opt.WriteOptions{Sync: true})
if err != nil {
PanicCrisis(err)
}
}
func (db *LevelDB) Delete(key []byte) {
err := db.db.Delete(key, nil)
if err != nil {
PanicCrisis(err)
}
}
func (db *LevelDB) DeleteSync(key []byte) {
err := db.db.Delete(key, &opt.WriteOptions{Sync: true})
if err != nil {
PanicCrisis(err)
}
}
func (db *LevelDB) DB() *leveldb.DB {
return db.db
}
func (db *LevelDB) Close() {
db.db.Close()
}
func (db *LevelDB) Print() {
iter := db.db.NewIterator(nil, nil)
for iter.Next() {
key := iter.Key()
value := iter.Value()
fmt.Printf("[%X]:\t[%X]\n", key, value)
}
}

View File

@ -2,10 +2,18 @@ package db
import ( import (
"fmt" "fmt"
"sync"
) )
func init() {
registerDBCreator(MemDBBackendStr, func(name string, dir string) (DB, error) {
return NewMemDB(), nil
}, false)
}
type MemDB struct { type MemDB struct {
db map[string][]byte mtx sync.Mutex
db map[string][]byte
} }
func NewMemDB() *MemDB { func NewMemDB() *MemDB {
@ -14,31 +22,91 @@ func NewMemDB() *MemDB {
} }
func (db *MemDB) Get(key []byte) []byte { func (db *MemDB) Get(key []byte) []byte {
db.mtx.Lock()
defer db.mtx.Unlock()
return db.db[string(key)] return db.db[string(key)]
} }
func (db *MemDB) Set(key []byte, value []byte) { func (db *MemDB) Set(key []byte, value []byte) {
db.mtx.Lock()
defer db.mtx.Unlock()
db.db[string(key)] = value db.db[string(key)] = value
} }
func (db *MemDB) SetSync(key []byte, value []byte) { func (db *MemDB) SetSync(key []byte, value []byte) {
db.mtx.Lock()
defer db.mtx.Unlock()
db.db[string(key)] = value db.db[string(key)] = value
} }
func (db *MemDB) Delete(key []byte) { func (db *MemDB) Delete(key []byte) {
db.mtx.Lock()
defer db.mtx.Unlock()
delete(db.db, string(key)) delete(db.db, string(key))
} }
func (db *MemDB) DeleteSync(key []byte) { func (db *MemDB) DeleteSync(key []byte) {
db.mtx.Lock()
defer db.mtx.Unlock()
delete(db.db, string(key)) delete(db.db, string(key))
} }
func (db *MemDB) Close() { func (db *MemDB) Close() {
db.mtx.Lock()
defer db.mtx.Unlock()
db = nil db = nil
} }
func (db *MemDB) Print() { func (db *MemDB) Print() {
db.mtx.Lock()
defer db.mtx.Unlock()
for key, value := range db.db { for key, value := range db.db {
fmt.Printf("[%X]:\t[%X]\n", []byte(key), value) fmt.Printf("[%X]:\t[%X]\n", []byte(key), value)
} }
} }
func (db *MemDB) NewBatch() Batch {
return &memDBBatch{db, nil}
}
//--------------------------------------------------------------------------------
type memDBBatch struct {
db *MemDB
ops []operation
}
type opType int
const (
opTypeSet = 1
opTypeDelete = 2
)
type operation struct {
opType
key []byte
value []byte
}
func (mBatch *memDBBatch) Set(key, value []byte) {
mBatch.ops = append(mBatch.ops, operation{opTypeSet, key, value})
}
func (mBatch *memDBBatch) Delete(key []byte) {
mBatch.ops = append(mBatch.ops, operation{opTypeDelete, key, nil})
}
func (mBatch *memDBBatch) Write() {
mBatch.db.mtx.Lock()
defer mBatch.db.mtx.Unlock()
for _, op := range mBatch.ops {
if op.opType == opTypeSet {
mBatch.db.db[string(op.key)] = op.value
} else if op.opType == opTypeDelete {
delete(mBatch.db.db, string(op.key))
}
}
}