Started implementing SimpleDB for MemKVStore
This commit is contained in:
parent
caff0ad01b
commit
28ebfd64dd
|
@ -0,0 +1,111 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"fmt"
|
||||
|
||||
cmn "github.com/tendermint/tmlibs/common"
|
||||
)
|
||||
|
||||
// KVCache is a cache that enforces deterministic sync order.
|
||||
type KVCache struct {
|
||||
store KVStore
|
||||
cache map[string]kvCacheValue
|
||||
keys *list.List
|
||||
logging bool
|
||||
logLines []string
|
||||
}
|
||||
|
||||
type kvCacheValue struct {
|
||||
v []byte // The value of some key
|
||||
e *list.Element // The KVCache.keys element
|
||||
}
|
||||
|
||||
// NOTE: If store is nil, creates a new MemKVStore
|
||||
func NewKVCache(store KVStore) *KVCache {
|
||||
if store == nil {
|
||||
store = NewMemKVStore()
|
||||
}
|
||||
return (&KVCache{
|
||||
store: store,
|
||||
}).Reset()
|
||||
}
|
||||
|
||||
func (kvc *KVCache) SetLogging() {
|
||||
kvc.logging = true
|
||||
}
|
||||
|
||||
func (kvc *KVCache) GetLogLines() []string {
|
||||
return kvc.logLines
|
||||
}
|
||||
|
||||
func (kvc *KVCache) ClearLogLines() {
|
||||
kvc.logLines = nil
|
||||
}
|
||||
|
||||
func (kvc *KVCache) Reset() *KVCache {
|
||||
kvc.cache = make(map[string]kvCacheValue)
|
||||
kvc.keys = list.New()
|
||||
return kvc
|
||||
}
|
||||
|
||||
func (kvc *KVCache) Set(key []byte, value []byte) {
|
||||
if kvc.logging {
|
||||
line := fmt.Sprintf("Set %v = %v", LegibleBytes(key), LegibleBytes(value))
|
||||
kvc.logLines = append(kvc.logLines, line)
|
||||
}
|
||||
cacheValue, ok := kvc.cache[string(key)]
|
||||
if ok {
|
||||
kvc.keys.MoveToBack(cacheValue.e)
|
||||
} else {
|
||||
cacheValue.e = kvc.keys.PushBack(key)
|
||||
}
|
||||
cacheValue.v = value
|
||||
kvc.cache[string(key)] = cacheValue
|
||||
}
|
||||
|
||||
func (kvc *KVCache) Get(key []byte) (value []byte) {
|
||||
cacheValue, ok := kvc.cache[string(key)]
|
||||
if ok {
|
||||
if kvc.logging {
|
||||
line := fmt.Sprintf("Get (hit) %v = %v", LegibleBytes(key), LegibleBytes(cacheValue.v))
|
||||
kvc.logLines = append(kvc.logLines, line)
|
||||
}
|
||||
return cacheValue.v
|
||||
} else {
|
||||
value := kvc.store.Get(key)
|
||||
kvc.cache[string(key)] = kvCacheValue{
|
||||
v: value,
|
||||
e: kvc.keys.PushBack(key),
|
||||
}
|
||||
if kvc.logging {
|
||||
line := fmt.Sprintf("Get (miss) %v = %v", LegibleBytes(key), LegibleBytes(value))
|
||||
kvc.logLines = append(kvc.logLines, line)
|
||||
}
|
||||
return value
|
||||
}
|
||||
}
|
||||
|
||||
//Update the store with the values from the cache
|
||||
func (kvc *KVCache) Sync() {
|
||||
for e := kvc.keys.Front(); e != nil; e = e.Next() {
|
||||
key := e.Value.([]byte)
|
||||
value := kvc.cache[string(key)]
|
||||
kvc.store.Set(key, value.v)
|
||||
}
|
||||
kvc.Reset()
|
||||
}
|
||||
|
||||
//----------------------------------------
|
||||
|
||||
func LegibleBytes(data []byte) string {
|
||||
s := ""
|
||||
for _, b := range data {
|
||||
if 0x21 <= b && b < 0x7F {
|
||||
s += cmn.Green(string(b))
|
||||
} else {
|
||||
s += cmn.Blue(cmn.Fmt("%02X", b))
|
||||
}
|
||||
}
|
||||
return s
|
||||
}
|
|
@ -0,0 +1,70 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestKVCache(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
//stores to be tested
|
||||
ms := NewMemKVStore()
|
||||
store := NewMemKVStore()
|
||||
kvc := NewKVCache(store)
|
||||
|
||||
//key value pairs to be tested within the system
|
||||
var keyvalue = []struct {
|
||||
key string
|
||||
value string
|
||||
}{
|
||||
{"foo", "snake"},
|
||||
{"bar", "mouse"},
|
||||
}
|
||||
|
||||
//set the kvc to have all the key value pairs
|
||||
setRecords := func(kv KVStore) {
|
||||
for _, n := range keyvalue {
|
||||
kv.Set([]byte(n.key), []byte(n.value))
|
||||
}
|
||||
}
|
||||
|
||||
//store has all the key value pairs
|
||||
storeHasAll := func(kv KVStore) bool {
|
||||
for _, n := range keyvalue {
|
||||
if !bytes.Equal(kv.Get([]byte(n.key)), []byte(n.value)) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
//test read/write for MemKVStore
|
||||
setRecords(ms)
|
||||
assert.True(storeHasAll(ms), "MemKVStore doesn't retrieve after Set")
|
||||
|
||||
//test read/write for KVCache
|
||||
setRecords(kvc)
|
||||
assert.True(storeHasAll(kvc), "KVCache doesn't retrieve after Set")
|
||||
|
||||
//test reset
|
||||
kvc.Reset()
|
||||
assert.False(storeHasAll(kvc), "KVCache retrieving after reset")
|
||||
|
||||
//test sync
|
||||
setRecords(kvc)
|
||||
assert.False(storeHasAll(store), "store retrieving before synced")
|
||||
kvc.Sync()
|
||||
assert.True(storeHasAll(store), "store isn't retrieving after synced")
|
||||
|
||||
//test logging
|
||||
assert.Zero(len(kvc.GetLogLines()), "logging events existed before using SetLogging")
|
||||
kvc.SetLogging()
|
||||
setRecords(kvc)
|
||||
assert.Equal(len(kvc.GetLogLines()), 2, "incorrect number of logging events recorded")
|
||||
kvc.ClearLogLines()
|
||||
assert.Zero(len(kvc.GetLogLines()), "logging events still exists after ClearLogLines")
|
||||
|
||||
}
|
187
state/kvstore.go
187
state/kvstore.go
|
@ -1,13 +1,12 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"fmt"
|
||||
"sort"
|
||||
|
||||
"github.com/tendermint/go-wire/data"
|
||||
. "github.com/tendermint/tmlibs/common"
|
||||
)
|
||||
|
||||
// KVStore is a simple interface to get/set data
|
||||
type KVStore interface {
|
||||
Set(key, value []byte)
|
||||
Get(key []byte) (value []byte)
|
||||
|
@ -15,30 +14,34 @@ type KVStore interface {
|
|||
|
||||
//----------------------------------------
|
||||
|
||||
// Model grabs together key and value to allow easier return values
|
||||
type Model struct {
|
||||
Key data.Bytes
|
||||
Value data.Bytes
|
||||
}
|
||||
|
||||
// What I wished to have...
|
||||
// SimpleDB allows us to do some basic range queries on a db
|
||||
type SimpleDB interface {
|
||||
KVStore
|
||||
|
||||
Has(key []byte) (has bool)
|
||||
Remove(key []byte) (value []byte) // returns old value if there was one
|
||||
|
||||
// Start is inclusive, End is exclusive...
|
||||
// Thus List ([]byte{12, 13}, []byte{12, 14}) will return anything with
|
||||
// the prefix []byte{12, 13}
|
||||
List(start, end []byte, limit int) []Model
|
||||
First(start, end []byte) Model
|
||||
Last(start, end []byte) Model
|
||||
|
||||
// Checkpoint returns the same state, but where writes
|
||||
// are buffered and don't affect the parent
|
||||
Checkpoint() SimpleDB
|
||||
// // Checkpoint returns the same state, but where writes
|
||||
// // are buffered and don't affect the parent
|
||||
// Checkpoint() SimpleDB
|
||||
|
||||
// Commit will take all changes from the checkpoint and write
|
||||
// them to the parent.
|
||||
// Returns an error if this is not a child of this one
|
||||
Commit(SimpleDB) error
|
||||
// // Commit will take all changes from the checkpoint and write
|
||||
// // them to the parent.
|
||||
// // Returns an error if this is not a child of this one
|
||||
// Commit(SimpleDB) error
|
||||
|
||||
// Discard will remove reference to this
|
||||
Discard()
|
||||
|
@ -50,6 +53,8 @@ type MemKVStore struct {
|
|||
m map[string][]byte
|
||||
}
|
||||
|
||||
var _ SimpleDB = NewMemKVStore()
|
||||
|
||||
func NewMemKVStore() *MemKVStore {
|
||||
return &MemKVStore{
|
||||
m: make(map[string][]byte, 0),
|
||||
|
@ -64,107 +69,75 @@ func (mkv *MemKVStore) Get(key []byte) (value []byte) {
|
|||
return mkv.m[string(key)]
|
||||
}
|
||||
|
||||
//----------------------------------------
|
||||
|
||||
// A Cache that enforces deterministic sync order.
|
||||
type KVCache struct {
|
||||
store KVStore
|
||||
cache map[string]kvCacheValue
|
||||
keys *list.List
|
||||
logging bool
|
||||
logLines []string
|
||||
func (mkv *MemKVStore) Has(key []byte) (has bool) {
|
||||
_, ok := mkv.m[string(key)]
|
||||
return ok
|
||||
}
|
||||
|
||||
type kvCacheValue struct {
|
||||
v []byte // The value of some key
|
||||
e *list.Element // The KVCache.keys element
|
||||
func (mkv *MemKVStore) Remove(key []byte) (value []byte) {
|
||||
val := mkv.m[string(key)]
|
||||
delete(mkv.m, string(key))
|
||||
return val
|
||||
}
|
||||
|
||||
// NOTE: If store is nil, creates a new MemKVStore
|
||||
func NewKVCache(store KVStore) *KVCache {
|
||||
if store == nil {
|
||||
store = NewMemKVStore()
|
||||
}
|
||||
return (&KVCache{
|
||||
store: store,
|
||||
}).Reset()
|
||||
}
|
||||
func (mkv *MemKVStore) List(start, end []byte, limit int) []Model {
|
||||
keys := mkv.keysInRange(start, end)
|
||||
sort.Strings(keys)
|
||||
keys = keys[:limit]
|
||||
|
||||
func (kvc *KVCache) SetLogging() {
|
||||
kvc.logging = true
|
||||
}
|
||||
|
||||
func (kvc *KVCache) GetLogLines() []string {
|
||||
return kvc.logLines
|
||||
}
|
||||
|
||||
func (kvc *KVCache) ClearLogLines() {
|
||||
kvc.logLines = nil
|
||||
}
|
||||
|
||||
func (kvc *KVCache) Reset() *KVCache {
|
||||
kvc.cache = make(map[string]kvCacheValue)
|
||||
kvc.keys = list.New()
|
||||
return kvc
|
||||
}
|
||||
|
||||
func (kvc *KVCache) Set(key []byte, value []byte) {
|
||||
if kvc.logging {
|
||||
line := fmt.Sprintf("Set %v = %v", LegibleBytes(key), LegibleBytes(value))
|
||||
kvc.logLines = append(kvc.logLines, line)
|
||||
}
|
||||
cacheValue, ok := kvc.cache[string(key)]
|
||||
if ok {
|
||||
kvc.keys.MoveToBack(cacheValue.e)
|
||||
} else {
|
||||
cacheValue.e = kvc.keys.PushBack(key)
|
||||
}
|
||||
cacheValue.v = value
|
||||
kvc.cache[string(key)] = cacheValue
|
||||
}
|
||||
|
||||
func (kvc *KVCache) Get(key []byte) (value []byte) {
|
||||
cacheValue, ok := kvc.cache[string(key)]
|
||||
if ok {
|
||||
if kvc.logging {
|
||||
line := fmt.Sprintf("Get (hit) %v = %v", LegibleBytes(key), LegibleBytes(cacheValue.v))
|
||||
kvc.logLines = append(kvc.logLines, line)
|
||||
}
|
||||
return cacheValue.v
|
||||
} else {
|
||||
value := kvc.store.Get(key)
|
||||
kvc.cache[string(key)] = kvCacheValue{
|
||||
v: value,
|
||||
e: kvc.keys.PushBack(key),
|
||||
}
|
||||
if kvc.logging {
|
||||
line := fmt.Sprintf("Get (miss) %v = %v", LegibleBytes(key), LegibleBytes(value))
|
||||
kvc.logLines = append(kvc.logLines, line)
|
||||
}
|
||||
return value
|
||||
}
|
||||
}
|
||||
|
||||
//Update the store with the values from the cache
|
||||
func (kvc *KVCache) Sync() {
|
||||
for e := kvc.keys.Front(); e != nil; e = e.Next() {
|
||||
key := e.Value.([]byte)
|
||||
value := kvc.cache[string(key)]
|
||||
kvc.store.Set(key, value.v)
|
||||
}
|
||||
kvc.Reset()
|
||||
}
|
||||
|
||||
//----------------------------------------
|
||||
|
||||
func LegibleBytes(data []byte) string {
|
||||
s := ""
|
||||
for _, b := range data {
|
||||
if 0x21 <= b && b < 0x7F {
|
||||
s += Green(string(b))
|
||||
} else {
|
||||
s += Blue(Fmt("%02X", b))
|
||||
res := make([]Model, len(keys))
|
||||
for i, k := range keys {
|
||||
res[i] = Model{
|
||||
Key: []byte(k),
|
||||
Value: mkv.m[k],
|
||||
}
|
||||
}
|
||||
return s
|
||||
return res
|
||||
}
|
||||
|
||||
// First iterates through all keys to find the one that matches
|
||||
func (mkv *MemKVStore) First(start, end []byte) Model {
|
||||
key := ""
|
||||
for _, k := range mkv.keysInRange(start, end) {
|
||||
if key == "" || k < key {
|
||||
key = k
|
||||
}
|
||||
}
|
||||
if key == "" {
|
||||
return Model{}
|
||||
}
|
||||
return Model{
|
||||
Key: []byte(key),
|
||||
Value: mkv.m[key],
|
||||
}
|
||||
}
|
||||
|
||||
func (mkv *MemKVStore) Last(start, end []byte) Model {
|
||||
key := ""
|
||||
for _, k := range mkv.keysInRange(start, end) {
|
||||
if key == "" || k > key {
|
||||
key = k
|
||||
}
|
||||
}
|
||||
if key == "" {
|
||||
return Model{}
|
||||
}
|
||||
return Model{
|
||||
Key: []byte(key),
|
||||
Value: mkv.m[key],
|
||||
}
|
||||
}
|
||||
|
||||
func (mkv *MemKVStore) Discard() {
|
||||
mkv.m = make(map[string][]byte, 0)
|
||||
}
|
||||
|
||||
func (mkv *MemKVStore) keysInRange(start, end []byte) (res []string) {
|
||||
s, e := string(start), string(end)
|
||||
for k := range mkv.m {
|
||||
if k >= s && k < e {
|
||||
res = append(res, k)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue