wip: tests and fixes for kvstore iteration

This commit is contained in:
Ethan Buchman 2017-12-13 01:17:20 -05:00 committed by Jae Kwon
parent 66e6225d13
commit 72b0ed004b
6 changed files with 527 additions and 76 deletions

View File

@ -3,63 +3,494 @@ package store
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
cmn "github.com/tendermint/tmlibs/common"
dbm "github.com/tendermint/tmlibs/db"
)
func newCacheKVStore() CacheKVStore {
mem := dbm.NewMemDB()
return NewCacheKVStore(mem)
}
func keyFmt(i int) []byte { return bz(cmn.Fmt("key%0.8d", i)) }
func valFmt(i int) []byte { return bz(cmn.Fmt("value%0.8d", i)) }
func TestCacheKVStore(t *testing.T) {
mem := dbm.NewMemDB()
st := NewCacheKVStore(mem)
require.Empty(t, st.Get(bz("key1")), "Expected `key1` to be empty")
require.Empty(t, st.Get(keyFmt(1)), "Expected `key1` to be empty")
mem.Set(bz("key1"), bz("value1"))
st.Set(bz("key1"), bz("value1"))
require.Equal(t, bz("value1"), st.Get(bz("key1")))
// put something in mem and in cache
mem.Set(keyFmt(1), valFmt(1))
st.Set(keyFmt(1), valFmt(1))
require.Equal(t, valFmt(1), st.Get(keyFmt(1)))
st.Set(bz("key1"), bz("value2"))
require.Equal(t, bz("value2"), st.Get(bz("key1")))
require.Equal(t, bz("value1"), mem.Get(bz("key1")))
// update it in cache, shoudn't change mem
st.Set(keyFmt(1), valFmt(2))
require.Equal(t, valFmt(2), st.Get(keyFmt(1)))
require.Equal(t, valFmt(1), mem.Get(keyFmt(1)))
// write it. should change mem
st.Write()
require.Equal(t, bz("value2"), mem.Get(bz("key1")))
require.Equal(t, valFmt(2), mem.Get(keyFmt(1)))
require.Equal(t, valFmt(2), st.Get(keyFmt(1)))
// more writes and checks
st.Write()
st.Write()
require.Equal(t, bz("value2"), mem.Get(bz("key1")))
require.Equal(t, valFmt(2), mem.Get(keyFmt(1)))
require.Equal(t, valFmt(2), st.Get(keyFmt(1)))
// make a new one, check it
st = NewCacheKVStore(mem)
st.Delete(bz("key1"))
require.Empty(t, st.Get(bz("key1")))
require.Equal(t, mem.Get(bz("key1")), bz("value2"))
require.Equal(t, valFmt(2), st.Get(keyFmt(1)))
// make a new one and delete - should not be removed from mem
st = NewCacheKVStore(mem)
st.Delete(keyFmt(1))
require.Empty(t, st.Get(keyFmt(1)))
require.Equal(t, mem.Get(keyFmt(1)), valFmt(2))
// Write. should now be removed from both
st.Write()
require.Empty(t, st.Get(bz("key1")), "Expected `key1` to be empty")
require.Empty(t, mem.Get(bz("key1")), "Expected `key1` to be empty")
require.Empty(t, st.Get(keyFmt(1)), "Expected `key1` to be empty")
require.Empty(t, mem.Get(keyFmt(1)), "Expected `key1` to be empty")
}
func TestCacheKVStoreNested(t *testing.T) {
mem := dbm.NewMemDB()
st := NewCacheKVStore(mem)
st.Set(bz("key1"), bz("value1"))
require.Empty(t, mem.Get(bz("key1")))
require.Equal(t, bz("value1"), st.Get(bz("key1")))
// set. check its there on st and not on mem.
st.Set(keyFmt(1), valFmt(1))
require.Empty(t, mem.Get(keyFmt(1)))
require.Equal(t, valFmt(1), st.Get(keyFmt(1)))
// make a new from st and check
st2 := NewCacheKVStore(st)
require.Equal(t, bz("value1"), st2.Get(bz("key1")))
require.Equal(t, valFmt(1), st2.Get(keyFmt(1)))
st2.Set(bz("key1"), bz("VALUE2"))
require.Equal(t, []byte(nil), mem.Get(bz("key1")))
require.Equal(t, bz("value1"), st.Get(bz("key1")))
require.Equal(t, bz("VALUE2"), st2.Get(bz("key1")))
// update the value on st2, check it only effects st2
st2.Set(keyFmt(1), valFmt(3))
require.Equal(t, []byte(nil), mem.Get(keyFmt(1)))
require.Equal(t, valFmt(1), st.Get(keyFmt(1)))
require.Equal(t, valFmt(3), st2.Get(keyFmt(1)))
// st2 writes to its parent, st. doesnt effect mem
st2.Write()
require.Equal(t, []byte(nil), mem.Get(bz("key1")))
require.Equal(t, bz("VALUE2"), st.Get(bz("key1")))
require.Equal(t, []byte(nil), mem.Get(keyFmt(1)))
require.Equal(t, valFmt(3), st.Get(keyFmt(1)))
// updates mem
st.Write()
require.Equal(t, bz("VALUE2"), mem.Get(bz("key1")))
require.Equal(t, valFmt(3), mem.Get(keyFmt(1)))
}
func TestCacheKVIteratorBounds(t *testing.T) {
st := newCacheKVStore()
// set some items
nItems := 5
for i := 0; i < nItems; i++ {
st.Set(keyFmt(i), valFmt(i))
}
// iterate over all of them
itr := st.Iterator(nil, nil)
var i = 0
for ; itr.Valid(); itr.Next() {
k, v := itr.Key(), itr.Value()
assert.Equal(t, keyFmt(i), k)
assert.Equal(t, valFmt(i), v)
i += 1
}
assert.Equal(t, nItems, i)
// iterate over none
itr = st.Iterator(bz("money"), nil)
i = 0
for ; itr.Valid(); itr.Next() {
i += 1
}
assert.Equal(t, 0, i)
// iterate over lower
itr = st.Iterator(keyFmt(0), keyFmt(3))
i = 0
for ; itr.Valid(); itr.Next() {
k, v := itr.Key(), itr.Value()
assert.Equal(t, keyFmt(i), k)
assert.Equal(t, valFmt(i), v)
i += 1
}
assert.Equal(t, 3, i)
// iterate over upper
itr = st.Iterator(keyFmt(2), keyFmt(4))
i = 2
for ; itr.Valid(); itr.Next() {
k, v := itr.Key(), itr.Value()
assert.Equal(t, keyFmt(i), k)
assert.Equal(t, valFmt(i), v)
i += 1
}
assert.Equal(t, 4, i)
}
func TestCacheKVMergeIteratorBasics(t *testing.T) {
st := newCacheKVStore()
// set and delete an item in the cache, iterator should be empty
k, v := keyFmt(0), valFmt(0)
st.Set(k, v)
st.Delete(k)
assertIterateDomain(t, st, 0)
// now set it and assert its there
st.Set(k, v)
assertIterateDomain(t, st, 1)
// write it and assert its there
st.Write()
assertIterateDomain(t, st, 1)
// remove it in cache and assert its not
st.Delete(k)
assertIterateDomain(t, st, 0)
// write the delete and assert its not there
st.Write()
assertIterateDomain(t, st, 0)
// add two keys and assert theyre there
k1, v1 := keyFmt(1), valFmt(1)
st.Set(k, v)
st.Set(k1, v1)
assertIterateDomain(t, st, 2)
// write it and assert theyre there
st.Write()
assertIterateDomain(t, st, 2)
// remove one in cache and assert its not
st.Delete(k1)
assertIterateDomain(t, st, 1)
// write the delete and assert its not there
st.Write()
assertIterateDomain(t, st, 1)
// delete the other key in cache and asserts its empty
st.Delete(k)
assertIterateDomain(t, st, 0)
}
func TestCacheKVMergeIteratorDeleteLast(t *testing.T) {
st := newCacheKVStore()
// set some items and write them
nItems := 5
for i := 0; i < nItems; i++ {
st.Set(keyFmt(i), valFmt(i))
}
st.Write()
// set some more items and leave dirty
for i := nItems; i < nItems*2; i++ {
st.Set(keyFmt(i), valFmt(i))
}
// iterate over all of them
assertIterateDomain(t, st, nItems*2)
// delete them all
for i := 0; i < nItems*2; i++ {
last := nItems*2 - 1 - i
st.Delete(keyFmt(last))
assertIterateDomain(t, st, last)
}
}
func TestCacheKVMergeIteratorDeletes(t *testing.T) {
st := newCacheKVStore()
truth := dbm.NewMemDB()
// set some items and write them
nItems := 10
for i := 0; i < nItems; i++ {
doOp(st, truth, opSet, i)
}
st.Write()
// delete every other item, starting from 0
for i := 0; i < nItems; i += 2 {
doOp(st, truth, opDel, i)
assertIterateDomainCompare(t, st, truth)
}
// reset
st = newCacheKVStore()
truth = dbm.NewMemDB()
// set some items and write them
for i := 0; i < nItems; i++ {
doOp(st, truth, opSet, i)
}
st.Write()
// delete every other item, starting from 1
for i := 1; i < nItems; i += 2 {
doOp(st, truth, opDel, i)
assertIterateDomainCompare(t, st, truth)
}
}
func TestCacheKVMergeIteratorChunks(t *testing.T) {
st := newCacheKVStore()
// Use the truth to check values on the merge iterator
truth := dbm.NewMemDB()
// sets to the parent
setRange(st, truth, 0, 20)
setRange(st, truth, 40, 60)
st.Write()
// sets to the cache
setRange(st, truth, 20, 40)
setRange(st, truth, 60, 80)
assertIterateDomainCheck(t, st, truth, []keyRange{{0, 80}})
// remove some parents and some cache
deleteRange(st, truth, 15, 25)
assertIterateDomainCheck(t, st, truth, []keyRange{{0, 15}, {25, 80}})
// remove some parents and some cache
deleteRange(st, truth, 35, 45)
assertIterateDomainCheck(t, st, truth, []keyRange{{0, 15}, {25, 35}, {45, 80}})
// write, add more to the cache, and delete some cache
st.Write()
setRange(st, truth, 38, 42)
deleteRange(st, truth, 40, 43)
assertIterateDomainCheck(t, st, truth, []keyRange{{0, 15}, {25, 35}, {38, 40}, {45, 80}})
}
func TestCacheKVMergeIteratorRandom(t *testing.T) {
st := newCacheKVStore()
truth := dbm.NewMemDB()
start, end := 25, 75
max := 100
setRange(st, truth, start, end)
// do an op, test the iterator
for i := 0; i < 2000; i++ {
doRandomOp(st, truth, max)
assertIterateDomainCompare(t, st, truth)
}
}
//-------------------------------------------------------------------------------------------
// do some random ops
const (
opSet = 0
opSetRange = 1
opDel = 2
opDelRange = 3
opWrite = 4
totalOps = 5 // number of possible operations
)
func randInt(n int) int {
return cmn.RandInt() % n
}
// useful for replaying a error case if we find one
func doOp(st CacheKVStore, truth dbm.DB, op int, args ...int) {
switch op {
case opSet:
k := args[0]
st.Set(keyFmt(k), valFmt(k))
truth.Set(keyFmt(k), valFmt(k))
case opSetRange:
start := args[0]
end := args[1]
setRange(st, truth, start, end)
case opDel:
k := args[0]
st.Delete(keyFmt(k))
truth.Delete(keyFmt(k))
case opDelRange:
start := args[0]
end := args[1]
deleteRange(st, truth, start, end)
case opWrite:
st.Write()
}
}
func doRandomOp(st CacheKVStore, truth dbm.DB, maxKey int) {
r := randInt(totalOps)
switch r {
case opSet:
k := randInt(maxKey)
st.Set(keyFmt(k), valFmt(k))
truth.Set(keyFmt(k), valFmt(k))
case opSetRange:
start := randInt(maxKey - 2)
end := randInt(maxKey-start) + start
setRange(st, truth, start, end)
case opDel:
k := randInt(maxKey)
st.Delete(keyFmt(k))
truth.Delete(keyFmt(k))
case opDelRange:
start := randInt(maxKey - 2)
end := randInt(maxKey-start) + start
deleteRange(st, truth, start, end)
case opWrite:
st.Write()
}
}
//-------------------------------------------------------------------------------------------
// iterate over whole domain
func assertIterateDomain(t *testing.T, st KVStore, expectedN int) {
itr := st.Iterator(nil, nil)
var i = 0
for ; itr.Valid(); itr.Next() {
k, v := itr.Key(), itr.Value()
assert.Equal(t, keyFmt(i), k)
assert.Equal(t, valFmt(i), v)
i += 1
}
assert.Equal(t, expectedN, i)
}
func assertIterateDomainCheck(t *testing.T, st KVStore, mem dbm.DB, r []keyRange) {
// iterate over each and check they match the other
itr := st.Iterator(nil, nil)
itr2 := mem.Iterator(nil, nil) // ground truth
krc := newKeyRangeCounter(r)
i := 0
for ; krc.valid(); krc.next() {
assert.True(t, itr.Valid())
assert.True(t, itr2.Valid())
// check the key/val matches the ground truth
k, v := itr.Key(), itr.Value()
k2, v2 := itr2.Key(), itr2.Value()
assert.Equal(t, k, k2)
assert.Equal(t, v, v2)
// check they match the counter
assert.Equal(t, k, keyFmt(krc.key()))
itr.Next()
itr2.Next()
i += 1
}
assert.False(t, itr.Valid())
assert.False(t, itr2.Valid())
}
func assertIterateDomainCompare(t *testing.T, st KVStore, mem dbm.DB) {
// iterate over each and check they match the other
itr := st.Iterator(nil, nil)
itr2 := mem.Iterator(nil, nil) // ground truth
checkIterators(t, itr, itr2)
checkIterators(t, itr2, itr)
}
func checkIterators(t *testing.T, itr, itr2 Iterator) {
for ; itr.Valid(); itr.Next() {
assert.True(t, itr2.Valid())
k, v := itr.Key(), itr.Value()
k2, v2 := itr2.Key(), itr2.Value()
assert.Equal(t, k, k2)
assert.Equal(t, v, v2)
itr2.Next()
}
assert.False(t, itr.Valid())
assert.False(t, itr2.Valid())
}
//--------------------------------------------------------
func setRange(st KVStore, mem dbm.DB, start, end int) {
for i := start; i < end; i++ {
st.Set(keyFmt(i), valFmt(i))
mem.Set(keyFmt(i), valFmt(i))
}
}
func deleteRange(st KVStore, mem dbm.DB, start, end int) {
for i := start; i < end; i++ {
st.Delete(keyFmt(i))
mem.Delete(keyFmt(i))
}
}
//--------------------------------------------------------
type keyRange struct {
start int
end int
}
func (kr keyRange) len() int {
return kr.end - kr.start
}
func newKeyRangeCounter(kr []keyRange) *keyRangeCounter {
return &keyRangeCounter{keyRanges: kr}
}
// we can iterate over this and make sure our real iterators have all the right keys
type keyRangeCounter struct {
rangeIdx int
idx int
keyRanges []keyRange
}
func (krc *keyRangeCounter) valid() bool {
maxRangeIdx := len(krc.keyRanges) - 1
maxRange := krc.keyRanges[maxRangeIdx]
// if we're not in the max range, we're valid
if krc.rangeIdx <= maxRangeIdx &&
krc.idx < maxRange.len() {
return true
}
return false
}
func (krc *keyRangeCounter) next() {
thisKeyRange := krc.keyRanges[krc.rangeIdx]
if krc.idx == thisKeyRange.len()-1 {
krc.rangeIdx += 1
krc.idx = 0
} else {
krc.idx += 1
}
}
func (krc *keyRangeCounter) key() int {
thisKeyRange := krc.keyRanges[krc.rangeIdx]
return thisKeyRange.start + krc.idx
}
//--------------------------------------------------------
func bz(s string) []byte { return []byte(s) }

View File

@ -1,6 +1,8 @@
package store
import "bytes"
import (
"bytes"
)
// cacheMergeIterator merges a parent Iterator and a cache Iterator.
// The cache iterator may return nil keys to signal that an item
@ -46,15 +48,7 @@ func (iter *cacheMergeIterator) Domain() (start, end []byte) {
// Valid implements Iterator.
func (iter *cacheMergeIterator) Valid() bool {
// If parent is valid, this is valid.
if iter.parent.Valid() {
return true
}
// Otherwise depends on child.
iter.skipCacheDeletes(nil)
return iter.cache.Valid()
return iter.skipUntilExistsOrInvalid()
}
// Next implements Iterator
@ -76,8 +70,7 @@ func (iter *cacheMergeIterator) Next() {
// Both are valid. Compare keys.
keyP, keyC := iter.parent.Key(), iter.cache.Key()
cmp := iter.compare(keyP, keyC)
switch cmp {
switch iter.compare(keyP, keyC) {
case -1: // parent < cache
iter.parent.Next()
case 0: // parent == cache
@ -148,10 +141,10 @@ func (iter *cacheMergeIterator) Value() []byte {
}
}
// Release implements Iterator
func (iter *cacheMergeIterator) Release() {
iter.parent.Release()
iter.cache.Release()
// Close implements Iterator
func (iter *cacheMergeIterator) Close() {
iter.parent.Close()
iter.cache.Close()
}
// Like bytes.Compare but opposite if not ascending.
@ -164,44 +157,46 @@ func (iter *cacheMergeIterator) compare(a, b []byte) int {
}
// Skip all delete-items from the cache w/ `key < until`. After this function,
// current item is a non-delete-item, or `until <= key`.
// If the current item is not a delete item, does noting.
// If `until` is nil, there is no limit.
// current cache item is a non-delete-item, or `until <= key`.
// If the current cache item is not a delete item, does nothing.
// If `until` is nil, there is no limit, and cache may end up invalid.
// CONTRACT: cache is valid.
func (iter *cacheMergeIterator) skipCacheDeletes(until []byte) {
for (until == nil || iter.compare(iter.cache.Key(), until) < 0) &&
iter.cache.Value() == nil {
for iter.cache.Valid() &&
iter.cache.Value() == nil &&
(until == nil || iter.compare(iter.cache.Key(), until) < 0) {
iter.cache.Next()
if !iter.cache.Valid() {
return
}
}
}
// Fast forwards cache (or parent+cache in case of deleted items) until current
// item exists, or until iterator becomes invalid.
func (iter *cacheMergeIterator) skipUntilExistsOrInvalid() {
// Returns whether the iterator is valid.
func (iter *cacheMergeIterator) skipUntilExistsOrInvalid() bool {
for {
// Invalid.
if !iter.Valid() {
return
// If parent is invalid, fast-forward cache.
if !iter.parent.Valid() {
iter.skipCacheDeletes(nil)
return iter.cache.Valid()
}
// Parent is valid.
// Parent and Cache items exist.
keyP, keyC := iter.parent.Key(), iter.cache.Key()
cmp := iter.compare(keyP, keyC)
switch cmp {
if !iter.cache.Valid() {
return true
}
// Parent is valid, cache is valid.
// parent < cache
case -1:
// Compare parent and cache.
keyP := iter.parent.Key()
keyC := iter.cache.Key()
switch iter.compare(keyP, keyC) {
// Parent exists.
return
case -1: // parent < cache.
return true
// parent == cache
case 0:
case 0: // parent == cache.
// Skip over if cache item is a delete.
valueC := iter.cache.Value()
@ -210,11 +205,11 @@ func (iter *cacheMergeIterator) skipUntilExistsOrInvalid() {
iter.cache.Next()
continue
}
// Child shadows parent.
return
// Cache is not a delete.
// parent > cache
case 1:
return true // cache exists.
case 1: // cache < parent
// Skip over if cache item is a delete.
valueC := iter.cache.Value()
@ -222,8 +217,9 @@ func (iter *cacheMergeIterator) skipUntilExistsOrInvalid() {
iter.skipCacheDeletes(keyP)
continue
}
// Child exists.
return
// Cache is not a delete.
return true // cache exists.
}
}
}

View File

@ -8,7 +8,7 @@ func First(st KVStore, start, end []byte) (kv KVPair, ok bool) {
if !iter.Valid() {
return kv, false
}
defer iter.Release()
defer iter.Close()
return KVPair{iter.Key(), iter.Value()}, true
}
@ -23,7 +23,7 @@ func Last(st KVStore, start, end []byte) (kv KVPair, ok bool) {
return kv, false
}
}
defer iter.Release()
defer iter.Close()
if bytes.Equal(iter.Key(), end) {
// Skip this one, end is exclusive.

View File

@ -234,8 +234,8 @@ func (iter *iavlIterator) Value() []byte {
return iter.value
}
// Release implements Iterator
func (iter *iavlIterator) Release() {
// Close implements Iterator
func (iter *iavlIterator) Close() {
close(iter.quitCh)
}

View File

@ -1,5 +1,9 @@
package store
import (
dbm "github.com/tendermint/tmlibs/db"
)
// Iterates over iterKVCache items.
// if key is nil, means it was deleted.
// Implements Iterator.
@ -9,10 +13,17 @@ type memIterator struct {
}
func newMemIterator(start, end []byte, items []KVPair) *memIterator {
itemsInDomain := make([]KVPair, 0)
for _, item := range items {
ascending := keyCompare(start, end) < 0
if dbm.IsKeyInDomain(item.Key, start, end, !ascending) {
itemsInDomain = append(itemsInDomain, item)
}
}
return &memIterator{
start: start,
end: end,
items: items,
items: itemsInDomain,
}
}
@ -45,7 +56,7 @@ func (mi *memIterator) Value() []byte {
return mi.items[0].Value
}
func (mi *memIterator) Release() {
func (mi *memIterator) Close() {
mi.start = nil
mi.end = nil
mi.items = nil

View File

@ -1,6 +1,8 @@
package store
import (
"bytes"
"github.com/tendermint/go-wire/data"
"github.com/tendermint/tmlibs/db"
)
@ -128,3 +130,14 @@ type CommitID struct {
func (cid CommitID) IsZero() bool {
return cid.Version == 0 && len(cid.Hash) == 0
}
// bytes.Compare but bounded on both sides by nil.
// both (k1, nil) and (nil, k2) return -1
func keyCompare(k1, k2 []byte) int {
if k1 == nil && k2 == nil {
return 0
} else if k1 == nil || k2 == nil {
return -1
}
return bytes.Compare(k1, k2)
}