SimpleDB works except for persisted

This commit is contained in:
Ethan Frey 2017-07-26 19:26:06 -04:00
parent 84e2fa64f1
commit f1785e312d
4 changed files with 183 additions and 79 deletions

View File

@ -1,66 +1,140 @@
package state
import "container/list"
import "errors"
// KVCache is a cache that enforces deterministic sync order.
type KVCache struct {
store KVStore
cache map[string]kvCacheValue
keys *list.List
// MemKVCache is designed to wrap MemKVStore as a cache
type MemKVCache struct {
store SimpleDB
cache *MemKVStore
}
type kvCacheValue struct {
v []byte // The value of some key
e *list.Element // The KVCache.keys element
}
var _ SimpleDB = NewMemKVStore()
// NOTE: If store is nil, creates a new MemKVStore
func NewKVCache(store KVStore) *KVCache {
// NewMemKVCache wraps a cache around MemKVStore
//
// You probably don't want to use directly, but rather
// via MemKVCache.Checkpoint()
func NewMemKVCache(store SimpleDB) *MemKVCache {
if store == nil {
store = NewMemKVStore()
panic("wtf")
}
return (&KVCache{
return &MemKVCache{
store: store,
}).Reset()
}
func (kvc *KVCache) Reset() *KVCache {
kvc.cache = make(map[string]kvCacheValue)
kvc.keys = list.New()
return kvc
}
func (kvc *KVCache) Set(key []byte, value []byte) {
cacheValue, ok := kvc.cache[string(key)]
if ok {
kvc.keys.MoveToBack(cacheValue.e)
} else {
cacheValue.e = kvc.keys.PushBack(key)
cache: NewMemKVStore(),
}
cacheValue.v = value
kvc.cache[string(key)] = cacheValue
}
func (kvc *KVCache) Get(key []byte) (value []byte) {
cacheValue, ok := kvc.cache[string(key)]
if ok {
return cacheValue.v
} else {
value := kvc.store.Get(key)
kvc.cache[string(key)] = kvCacheValue{
v: value,
e: kvc.keys.PushBack(key),
func (c *MemKVCache) Set(key []byte, value []byte) {
c.cache.Set(key, value)
}
func (c *MemKVCache) Get(key []byte) (value []byte) {
value, ok := c.cache.m[string(key)]
if !ok {
value = c.store.Get(key)
c.cache.Set(key, value)
}
return value
}
func (c *MemKVCache) Has(key []byte) bool {
value := c.Get(key)
return value != nil
}
// Remove uses nil value as a flag to delete... not ideal but good enough
// for testing
func (c *MemKVCache) Remove(key []byte) (value []byte) {
value = c.Get(key)
c.cache.Set(key, nil)
return value
}
// List is also inefficiently implemented...
func (c *MemKVCache) List(start, end []byte, limit int) []Model {
orig := c.store.List(start, end, 0)
cached := c.cache.List(start, end, 0)
keys := c.combineLists(orig, cached)
// apply limit (too late)
if limit > 0 && len(keys) > 0 {
if limit > len(keys) {
limit = len(keys)
}
keys = keys[:limit]
}
return keys
}
func (c *MemKVCache) combineLists(orig, cache []Model) []Model {
store := NewMemKVStore()
for _, m := range orig {
store.Set(m.Key, m.Value)
}
for _, m := range cache {
if m.Value == nil {
store.Remove([]byte(m.Key))
} else {
store.Set([]byte(m.Key), m.Value)
}
}
return store.List(nil, nil, 0)
}
// First is done with List, but could be much more efficient
func (c *MemKVCache) First(start, end []byte) Model {
data := c.List(start, end, 0)
if len(data) == 0 {
return Model{}
}
return data[0]
}
// Last is done with List, but could be much more efficient
func (c *MemKVCache) Last(start, end []byte) Model {
data := c.List(start, end, 0)
if len(data) == 0 {
return Model{}
}
return data[len(data)-1]
}
// Checkpoint returns the same state, but where writes
// are buffered and don't affect the parent
func (c *MemKVCache) Checkpoint() SimpleDB {
return NewMemKVCache(c)
}
// Commit will take all changes from the checkpoint and write
// them to the parent.
// Returns an error if this is not a child of this one
func (c *MemKVCache) Commit(sub SimpleDB) error {
cache, ok := sub.(*MemKVCache)
if !ok {
return errors.New("sub is not a cache")
}
// TODO: see if it points to us
// apply the cached data to us
cache.applyCache()
return nil
}
// applyCache will apply all the cache methods to the underlying store
func (c *MemKVCache) applyCache() {
for k, v := range c.cache.m {
if v == nil {
c.store.Remove([]byte(k))
} else {
c.store.Set([]byte(k), v)
}
return value
}
}
//Update the store with the values from the cache
func (kvc *KVCache) Sync() {
for e := kvc.keys.Front(); e != nil; e = e.Next() {
key := e.Value.([]byte)
value := kvc.cache[string(key)]
kvc.store.Set(key, value.v)
}
kvc.Reset()
// Discard will remove reference to this
func (c *MemKVCache) Discard() {
c.cache = NewMemKVStore()
}

View File

@ -68,7 +68,7 @@ func TestCache(t *testing.T) {
db.Set(s.Key, s.Value)
}
for k, g := range tc.toGet {
msg := fmt.Sprintf("%d/%d/%d", i, j, k)
msg := fmt.Sprintf("%d/%d/%d: %#v", i, j, k, g)
checkGet(db, g, msg)
}
for k, lq := range tc.toList {
@ -84,12 +84,12 @@ func TestCache(t *testing.T) {
}
for k, r := range tc.removeCache {
val := cache.Remove(r.Key)
assert.EqualValues(r.Value, val, "%d/%d/%d", i, j, k)
assert.EqualValues(r.Value, val, "%d/%d/%d: %#v", i, j, k, r)
}
// make sure data is in cache
for k, g := range tc.getCache {
msg := fmt.Sprintf("%d/%d/%d", i, j, k)
msg := fmt.Sprintf("%d/%d/%d: %#v", i, j, k, g)
checkGet(cache, g, msg)
}
for k, lq := range tc.listCache {
@ -99,7 +99,7 @@ func TestCache(t *testing.T) {
// data not in basic store
for k, g := range tc.toGet {
msg := fmt.Sprintf("%d/%d/%d", i, j, k)
msg := fmt.Sprintf("%d/%d/%d: %#v", i, j, k, g)
checkGet(db, g, msg)
}
for k, lq := range tc.toList {

View File

@ -1,6 +1,7 @@
package state
import (
"errors"
"sort"
"github.com/tendermint/go-wire/data"
@ -56,7 +57,7 @@ type MemKVStore struct {
m map[string][]byte
}
// var _ SimpleDB = NewMemKVStore()
var _ SimpleDB = NewMemKVStore()
// NewMemKVStore initializes a MemKVStore
func NewMemKVStore() *MemKVStore {
@ -65,28 +66,27 @@ func NewMemKVStore() *MemKVStore {
}
}
func (mkv *MemKVStore) Set(key []byte, value []byte) {
mkv.m[string(key)] = value
func (m *MemKVStore) Set(key []byte, value []byte) {
m.m[string(key)] = value
}
func (mkv *MemKVStore) Get(key []byte) (value []byte) {
return mkv.m[string(key)]
func (m *MemKVStore) Get(key []byte) (value []byte) {
return m.m[string(key)]
}
func (mkv *MemKVStore) Has(key []byte) (has bool) {
_, ok := mkv.m[string(key)]
func (m *MemKVStore) Has(key []byte) (has bool) {
_, ok := m.m[string(key)]
return ok
}
func (mkv *MemKVStore) Remove(key []byte) (value []byte) {
val := mkv.m[string(key)]
delete(mkv.m, string(key))
func (m *MemKVStore) Remove(key []byte) (value []byte) {
val := m.m[string(key)]
delete(m.m, string(key))
return val
}
func (mkv *MemKVStore) List(start, end []byte, limit int) []Model {
keys := mkv.keysInRange(start, end)
sort.Strings(keys)
func (m *MemKVStore) List(start, end []byte, limit int) []Model {
keys := m.keysInRange(start, end)
if limit > 0 && len(keys) > 0 {
if limit > len(keys) {
limit = len(keys)
@ -98,16 +98,16 @@ func (mkv *MemKVStore) List(start, end []byte, limit int) []Model {
for i, k := range keys {
res[i] = Model{
Key: []byte(k),
Value: mkv.m[k],
Value: m.m[k],
}
}
return res
}
// First iterates through all keys to find the one that matches
func (mkv *MemKVStore) First(start, end []byte) Model {
func (m *MemKVStore) First(start, end []byte) Model {
key := ""
for _, k := range mkv.keysInRange(start, end) {
for _, k := range m.keysInRange(start, end) {
if key == "" || k < key {
key = k
}
@ -117,13 +117,13 @@ func (mkv *MemKVStore) First(start, end []byte) Model {
}
return Model{
Key: []byte(key),
Value: mkv.m[key],
Value: m.m[key],
}
}
func (mkv *MemKVStore) Last(start, end []byte) Model {
func (m *MemKVStore) Last(start, end []byte) Model {
key := ""
for _, k := range mkv.keysInRange(start, end) {
for _, k := range m.keysInRange(start, end) {
if key == "" || k > key {
key = k
}
@ -133,20 +133,39 @@ func (mkv *MemKVStore) Last(start, end []byte) Model {
}
return Model{
Key: []byte(key),
Value: mkv.m[key],
Value: m.m[key],
}
}
func (mkv *MemKVStore) Discard() {
mkv.m = make(map[string][]byte, 0)
func (m *MemKVStore) Discard() {
m.m = make(map[string][]byte, 0)
}
func (mkv *MemKVStore) keysInRange(start, end []byte) (res []string) {
func (m *MemKVStore) Checkpoint() SimpleDB {
return NewMemKVCache(m)
}
func (m *MemKVStore) Commit(sub SimpleDB) error {
cache, ok := sub.(*MemKVCache)
if !ok {
return errors.New("sub is not a cache")
}
// TODO: see if it points to us
// apply the cached data to us
cache.applyCache()
return nil
}
func (m *MemKVStore) keysInRange(start, end []byte) (res []string) {
s, e := string(start), string(end)
for k := range mkv.m {
if k >= s && k < e {
for k := range m.m {
afterStart := s == "" || k >= s
beforeEnd := e == "" || k < e
if afterStart && beforeEnd {
res = append(res, k)
}
}
sort.Strings(res)
return
}

View File

@ -4,13 +4,24 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/tendermint/merkleeyes/iavl"
// dbm "github.com/tendermint/tmlibs/db"
)
func GetDBs() []SimpleDB {
// // tree with persistence....
// tmpDir, err := ioutil.TempDir("", "state-tests")
// if err != nil {
// panic(err)
// }
// db := dbm.NewDB("test-get-dbs", dbm.LevelDBBackendStr, tmpDir)
// persist := iavl.NewIAVLTree(500, db)
return []SimpleDB{
// NewMemKVStore(),
NewMemKVStore(),
NewBonsai(iavl.NewIAVLTree(0, nil)),
// NewBonsai(persist),
}
}