iavlIterator using goroutine (#295)

This commit is contained in:
Jae Kwon 2017-12-10 00:24:55 -08:00 committed by GitHub
parent f4484ab3b5
commit 04fcc6193c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 182 additions and 32 deletions

2
glide.lock generated
View File

@ -136,7 +136,7 @@ imports:
- data - data
- data/base58 - data/base58
- name: github.com/tendermint/iavl - name: github.com/tendermint/iavl
version: 03e6c011329bce607eed69d60737c899519f1f70 version: ab22235a11524125a1df019e7b223d9797a88810
- name: github.com/tendermint/light-client - name: github.com/tendermint/light-client
version: 76313d625e662ed7b284d066d68ff71edd7a9fac version: 76313d625e662ed7b284d066d68ff71edd7a9fac
subpackages: subpackages:

View File

@ -1,20 +1,17 @@
package store package store
import ( import (
"path" "bytes"
"path/filepath" "sync"
"strings"
"github.com/pkg/errors"
"github.com/tendermint/iavl" "github.com/tendermint/iavl"
dbm "github.com/tendermint/tmlibs/db" dbm "github.com/tendermint/tmlibs/db"
) )
// NewIAVLStoreLoader returns a CommitStoreLoader that returns an iavlStore // NewIAVLStoreLoader returns a CommitStoreLoader that returns an iavlStore
func NewIAVLStoreLoader(dbName string, cacheSize int, numHistory int64) CommitStoreLoader { func NewIAVLStoreLoader(db dbm.DB, cacheSize int, numHistory int64) CommitStoreLoader {
l := iavlStoreLoader{ l := iavlStoreLoader{
dbName: dbName, db: db,
cacheSize: cacheSize, cacheSize: cacheSize,
numHistory: numHistory, numHistory: numHistory,
} }
@ -54,7 +51,7 @@ func (st *iavlStore) Commit() CommitID {
} }
// Release an old version of history // Release an old version of history
if st.numHistory < st.tree.Version() { if st.numHistory < st.tree.Version64() {
toRelease := version - st.numHistory toRelease := version - st.numHistory
st.tree.DeleteVersion(toRelease) st.tree.DeleteVersion(toRelease)
} }
@ -66,7 +63,12 @@ func (st *iavlStore) Commit() CommitID {
} }
// CacheWrap implements IterKVStore. // CacheWrap implements IterKVStore.
func (st *iavlStore) CacheWrap() CacheWriter { func (st *iavlStore) CacheWrap() CacheWrap {
return st.CacheIterKVStore()
}
// CacheKVStore implements IterKVStore.
func (st *iavlStore) CacheKVStore() CacheKVStore {
return st.CacheIterKVStore() return st.CacheIterKVStore()
} }
@ -101,68 +103,204 @@ func (st *iavlStore) Remove(key []byte) (prev []byte, removed bool) {
// Iterator implements IterKVStore. // Iterator implements IterKVStore.
func (st *iavlStore) Iterator(start, end []byte) Iterator { func (st *iavlStore) Iterator(start, end []byte) Iterator {
// XXX Create iavlIterator (without modifying tendermint/iavl) return newIAVLIterator(st.tree.Tree(), start, end, true)
return nil
} }
// ReverseIterator implements IterKVStore. // ReverseIterator implements IterKVStore.
func (st *iavlStore) ReverseIterator(start, end []byte) Iterator { func (st *iavlStore) ReverseIterator(start, end []byte) Iterator {
// XXX Create iavlIterator (without modifying tendermint/iavl) return newIAVLIterator(st.tree.Tree(), start, end, false)
return nil
} }
// First implements IterKVStore. // First implements IterKVStore.
func (is IAVLStore) First(start, end []byte) (kv KVPair, ok bool) { func (st *iavlStore) First(start, end []byte) (kv KVPair, ok bool) {
// XXX iter := st.Iterator(start, end)
return KVPair{}, false if !iter.Valid() {
return kv, false
}
defer iter.Release()
return KVPair{iter.Key(), iter.Value()}, true
} }
// Last implements IterKVStore. // Last implements IterKVStore.
func (is IAVLStore) Last(start, end []byte) (kv KVPair, ok bool) { func (st *iavlStore) Last(start, end []byte) (kv KVPair, ok bool) {
// XXX iter := st.ReverseIterator(end, start)
return KVPair{}, false if !iter.Valid() {
if v, ok := st.Get(start); ok {
return KVPair{cp(start), cp(v)}, true
} else {
return kv, false
}
}
defer iter.Release()
if bytes.Equal(iter.Key(), end) {
// Skip this one, end is exclusive.
iter.Next()
if !iter.Valid() {
return kv, false
}
}
return KVPair{iter.Key(), iter.Value()}, true
} }
//---------------------------------------- //----------------------------------------
type iavlIterator struct { type iavlIterator struct {
// TODO // Underlying store
tree *iavl.Tree
// Domain
start, end []byte
// Iteration order
ascending bool
// Channel to push iteration values.
iterCh chan KVPair
// Close this to release goroutine.
quitCh chan struct{}
// Close this to signal that state is initialized.
initCh chan struct{}
//----------------------------------------
// What follows are mutable state.
mtx sync.Mutex
invalid bool // True once, true forever
key []byte // The current key
value []byte // The current value
} }
var _ Iterator = (*iavlIterator)(nil) var _ Iterator = (*iavlIterator)(nil)
// newIAVLIterator will create a new iavlIterator.
// CONTRACT: Caller must release the iavlIterator, as each one creates a new
// goroutine.
func newIAVLIterator(t *iavl.Tree, start, end []byte, ascending bool) *iavlIterator {
itr := &iavlIterator{
tree: t,
start: cp(start),
end: cp(end),
ascending: ascending,
iterCh: make(chan KVPair, 0), // Set capacity > 0?
quitCh: make(chan struct{}),
initCh: make(chan struct{}),
}
go itr.iterateRoutine()
go itr.initRoutine()
return itr
}
// Run this to funnel items from the tree to iterCh.
func (ii *iavlIterator) iterateRoutine() {
ii.tree.IterateRange(
ii.start, ii.end, ii.ascending,
func(key, value []byte) bool {
select {
case <-ii.quitCh:
return true // done with iteration.
case ii.iterCh <- KVPair{key, value}:
return false // yay.
}
},
)
close(ii.iterCh) // done.
}
// Run this to fetch the first item.
func (ii *iavlIterator) initRoutine() {
ii.receiveNext()
close(ii.initCh)
}
// Domain implements Iterator // Domain implements Iterator
func (ii *iavlIterator) Domain() (start, end []byte) { func (ii *iavlIterator) Domain() (start, end []byte) {
// TODO return ii.start, ii.end
return nil, nil
} }
// Valid implements Iterator // Valid implements Iterator
func (ii *iavlIterator) Valid() bool { func (ii *iavlIterator) Valid() bool {
// TODO ii.waitInit()
return false ii.mtx.Lock()
defer ii.mtx.Unlock()
return !ii.invalid
} }
// Next implements Iterator // Next implements Iterator
func (ii *iavlIterator) Next() { func (ii *iavlIterator) Next() {
// TODO ii.waitInit()
ii.mtx.Lock()
defer ii.mtx.Unlock()
ii.assertIsValid()
ii.receiveNext()
} }
// Key implements Iterator // Key implements Iterator
func (ii *iavlIterator) Key() []byte { func (ii *iavlIterator) Key() []byte {
// TODO ii.waitInit()
return nil ii.mtx.Lock()
defer ii.mtx.Unlock()
ii.assertIsValid()
return ii.key
} }
// Value implements Iterator // Value implements Iterator
func (ii *iavlIterator) Value() []byte { func (ii *iavlIterator) Value() []byte {
// TODO ii.waitInit()
return nil ii.mtx.Lock()
defer ii.mtx.Unlock()
ii.assertIsValid()
return ii.value
} }
// Release implements Iterator // Release implements Iterator
func (ii *iavlIterator) Release() { func (ii *iavlIterator) Release() {
// TODO close(ii.quitCh)
}
//----------------------------------------
func (ii *iavlIterator) setNext(key, value []byte) {
ii.mtx.Lock()
defer ii.mtx.Unlock()
ii.assertIsValid()
ii.key = key
ii.value = value
}
func (ii *iavlIterator) setInvalid() {
ii.mtx.Lock()
defer ii.mtx.Unlock()
ii.assertIsValid()
ii.invalid = true
}
func (ii *iavlIterator) waitInit() {
<-ii.initCh
}
func (ii *iavlIterator) receiveNext() {
kvPair, ok := <-ii.iterCh
if ok {
ii.setNext(kvPair.Key, kvPair.Value)
} else {
ii.setInvalid()
}
}
func (ii *iavlIterator) assertIsValid() {
if ii.invalid {
panic("invalid iterator")
}
} }
//---------------------------------------- //----------------------------------------
@ -175,7 +313,7 @@ type iavlStoreLoader struct {
} }
// Load implements CommitLoader. // Load implements CommitLoader.
func (isl iavlLoader) Load(id CommitID) (CommitStore, error) { func (isl iavlStoreLoader) Load(id CommitID) (CommitStore, error) {
tree := iavl.NewVersionedTree(isl.db, isl.cacheSize) tree := iavl.NewVersionedTree(isl.db, isl.cacheSize)
err := tree.Load() err := tree.Load()
if err != nil { if err != nil {
@ -184,3 +322,11 @@ func (isl iavlLoader) Load(id CommitID) (CommitStore, error) {
store := newIAVLStore(tree, isl.numHistory) store := newIAVLStore(tree, isl.numHistory)
return store, nil return store, nil
} }
//----------------------------------------
func cp(bz []byte) (ret []byte) {
ret = make([]byte, len(bz))
copy(ret, bz)
return ret
}

View File

@ -79,7 +79,11 @@ type IterKVStore interface {
Iterator(start, end []byte) Iterator Iterator(start, end []byte) Iterator
ReverseIterator(start, end []byte) Iterator ReverseIterator(start, end []byte) Iterator
// Gets the first item.
First(start, end []byte) (kv KVPair, ok bool) First(start, end []byte) (kv KVPair, ok bool)
// Gets the last item (towards "end").
// End is exclusive.
Last(start, end []byte) (kv KVPair, ok bool) Last(start, end []byte) (kv KVPair, ok bool)
// CacheIterKVStore() wraps a thing with a cache. // CacheIterKVStore() wraps a thing with a cache.