iavl tree LRU cache

This commit is contained in:
Jae Kwon 2014-10-06 00:15:37 -07:00
parent 877dd02e28
commit 839301b03c
7 changed files with 113 additions and 116 deletions

View File

@ -2,31 +2,27 @@ package common
import (
"container/heap"
"sync"
)
/*
Example usage:
func main() {
h := NewHeap()
h.Push(String("msg1"), 1)
h.Push(String("msg3"), 3)
h.Push(String("msg2"), 2)
fmt.Println(h.Pop())
fmt.Println(h.Pop())
fmt.Println(h.Pop())
}
*/
type Comparable interface {
Less(o interface{}) bool
}
//-----------------------------------------------------------------------------
/*
Example usage:
h := NewHeap()
h.Push(String("msg1"), 1)
h.Push(String("msg3"), 3)
h.Push(String("msg2"), 2)
fmt.Println(h.Pop())
fmt.Println(h.Pop())
fmt.Println(h.Pop())
*/
type Heap struct {
pq priorityQueue
}
@ -57,45 +53,6 @@ func (h *Heap) Pop() interface{} {
//-----------------------------------------------------------------------------
type CHeap struct {
mtx sync.Mutex
pq priorityQueue
}
func NewCHeap() *CHeap {
return &CHeap{pq: make([]*pqItem, 0)}
}
func (h *CHeap) Len() int64 {
h.mtx.Lock()
defer h.mtx.Unlock()
return int64(len(h.pq))
}
func (h *CHeap) Push(value interface{}, priority Comparable) {
h.mtx.Lock()
defer h.mtx.Unlock()
heap.Push(&h.pq, &pqItem{value: value, priority: priority})
}
func (h *CHeap) Peek() interface{} {
h.mtx.Lock()
defer h.mtx.Unlock()
if len(h.pq) == 0 {
return nil
}
return h.pq[0].value
}
func (h *CHeap) Pop() interface{} {
h.mtx.Lock()
defer h.mtx.Unlock()
item := heap.Pop(&h.pq).(*pqItem)
return item.value
}
//-----------------------------------------------------------------------------
///////////////////////
// From: http://golang.org/pkg/container/heap/#example__priorityQueue

View File

@ -38,25 +38,25 @@ func ReadIAVLNode(r io.Reader, n *int64, err *error) *IAVLNode {
node := &IAVLNode{}
// node header & key
node.height = ReadUInt8(r, &n, &err)
node.size = ReadUInt64(r, &n, &err)
node.key = ReadByteSlice(r, &n, &err)
if err != nil {
panic(err)
node.height = ReadUInt8(r, n, err)
node.size = ReadUInt64(r, n, err)
node.key = ReadByteSlice(r, n, err)
if *err != nil {
panic(*err)
}
// node value or children.
if node.height == 0 {
// value
node.value = ReadByteSlice(r, &n, &err)
node.value = ReadByteSlice(r, n, err)
} else {
// left
node.leftHash = ReadByteSlice(r, &n, &err)
node.leftHash = ReadByteSlice(r, n, err)
// right
node.rightHash = ReadByteSlice(r, &n, &err)
node.rightHash = ReadByteSlice(r, n, err)
}
if err != nil {
panic(err)
if *err != nil {
panic(*err)
}
return node
}
@ -134,8 +134,7 @@ func (self *IAVLNode) HashWithCount() ([]byte, uint64) {
func (self *IAVLNode) Save(ndb *IAVLNodeDB) []byte {
if self.hash == nil {
hash, _ := self.HashWithCount()
self.hash = hash
self.hash, _ = self.HashWithCount()
}
if self.persisted {
return self.hash
@ -197,7 +196,8 @@ func (self *IAVLNode) set(ndb *IAVLNodeDB, key []byte, value []byte) (_ *IAVLNod
// newKey: new leftmost leaf key for tree after successfully removing 'key' if changed.
// only one of newSelfHash or newSelf is returned.
func (self *IAVLNode) remove(ndb *IAVLNodeDB, key []byte) (newSelfHash []byte, newSelf *IAVLNode, newKey []byte, value []byte, err error) {
func (self *IAVLNode) remove(ndb *IAVLNodeDB, key []byte) (
newSelfHash []byte, newSelf *IAVLNode, newKey []byte, value []byte, err error) {
if self.height == 0 {
if bytes.Equal(self.key, key) {
return nil, nil, nil, self.value, nil
@ -258,14 +258,14 @@ func (self *IAVLNode) saveToCountHashes(w io.Writer) (n int64, hashCount uint64,
} else {
// left
if self.leftCached != nil {
leftHash, leftCount := self.left.HashWithCount()
leftHash, leftCount := self.leftCached.HashWithCount()
self.leftHash = leftHash
hashCount += leftCount
}
WriteByteSlice(w, self.leftHash, &n, &err)
// right
if self.rightCached != nil {
rightHash, rightCount := self.right.HashWithCount()
rightHash, rightCount := self.rightCached.HashWithCount()
self.rightHash = rightHash
hashCount += rightCount
}
@ -278,7 +278,7 @@ func (self *IAVLNode) getLeft(ndb *IAVLNodeDB) *IAVLNode {
if self.leftCached != nil {
return self.leftCached
} else {
return ndb.Get(leftHash)
return ndb.Get(self.leftHash)
}
}
@ -286,7 +286,7 @@ func (self *IAVLNode) getRight(ndb *IAVLNodeDB) *IAVLNode {
if self.rightCached != nil {
return self.rightCached
} else {
return ndb.Get(rightHash)
return ndb.Get(self.rightHash)
}
}

View File

@ -37,9 +37,9 @@ func TestUnit(t *testing.T) {
}
n := &IAVLNode{
key: right.lmd(nil).key,
left: left,
right: right,
key: right.lmd(nil).key,
leftCached: left,
rightCached: right,
}
n.calcHeightAndSize(nil)
n.HashWithCount()
@ -52,7 +52,7 @@ func TestUnit(t *testing.T) {
if n.height == 0 {
return fmt.Sprintf("%v", n.key[0])
} else {
return fmt.Sprintf("(%v %v)", P(n.left), P(n.right))
return fmt.Sprintf("(%v %v)", P(n.leftCached), P(n.rightCached))
}
}
@ -86,7 +86,7 @@ func TestUnit(t *testing.T) {
}
expectRemove := func(n *IAVLNode, i int, repr string, hashCount uint64) {
n2, _, value, err := n.remove(nil, []byte{byte(i)})
_, n2, _, value, err := n.remove(nil, []byte{byte(i)})
// ensure node was added & structure is as expected.
if value != nil || err != nil || P(n2) != repr {
t.Fatalf("Removing %v from %v:\nExpected %v\nUnexpectedly got %v value:%v err:%v",
@ -223,7 +223,7 @@ func TestPersistence(t *testing.T) {
hash, _ := t1.HashWithCount()
// Load a tree
t2 := NewIAVLTreeFromHash(db, hash)
t2 := LoadIAVLTreeFromHash(db, hash)
for key, value := range records {
t2value := t2.Get([]byte(key))
if string(t2value) != value {

View File

@ -1,6 +1,11 @@
package merkle
const HASH_BYTE_SIZE int = 4 + 32
import (
"bytes"
"container/list"
)
const defaultCacheCapacity = 1000 // TODO make configurable.
/*
Immutable AVL Tree (wraps the Node root)
@ -9,19 +14,19 @@ This tree is not concurrency safe.
You must wrap your calls with your own mutex.
*/
type IAVLTree struct {
ndb IAVLNodeDB
ndb *IAVLNodeDB
root *IAVLNode
}
func NewIAVLTree(db DB) *IAVLTree {
return &IAVLTree{
ndb: NewIAVLNodeDB(db),
ndb: NewIAVLNodeDB(defaultCacheCapacity, db),
root: nil,
}
}
func LoadIAVLTreeFromHash(db DB, hash []byte) *IAVLTree {
ndb := NewIAVLNodeDB(db)
ndb := NewIAVLNodeDB(defaultCacheCapacity, db)
root := ndb.Get(hash)
if root == nil {
return nil
@ -82,15 +87,6 @@ func (t *IAVLTree) Save() {
t.root.Save(t.ndb)
}
func (t *IAVLTree) SaveKey(key string) {
if t.root == nil {
return
}
hash, _ := t.root.HashWithCount()
t.root.Save(t.ndb)
t.ndb.Set([]byte(key), hash)
}
func (t *IAVLTree) Get(key []byte) (value []byte) {
if t.root == nil {
return nil
@ -120,37 +116,78 @@ func (t *IAVLTree) Copy() Tree {
//-----------------------------------------------------------------------------
type nodeElement struct {
node *IAVLNode
elem *list.Element
}
type IAVLNodeDB struct {
db DB
cache map[string]*IAVLNode
// XXX expire entries
capacity int
db DB
cache map[string]nodeElement
queue *list.List
}
func NewIAVLNodeDB(capacity int, db DB) *IAVLNodeDB {
return &IAVLNodeDB{
capacity: capacity,
db: db,
cache: make(map[string]nodeElement),
queue: list.New(),
}
}
func (ndb *IAVLNodeDB) Get(hash []byte) *IAVLNode {
buf := ndb.db.Get(hash)
r := bytes.NewReader(buf)
var n int64
var err error
node := ReadIAVLNode(r, &n, &err)
if err != nil {
panic(err)
// Check the cache.
nodeElem, ok := ndb.cache[string(hash)]
if ok {
// Already exists. Move to back of queue.
ndb.queue.MoveToBack(nodeElem.elem)
return nodeElem.node
} else {
// Doesn't exist, load.
buf := ndb.db.Get(hash)
r := bytes.NewReader(buf)
var n int64
var err error
node := ReadIAVLNode(r, &n, &err)
if err != nil {
panic(err)
}
node.persisted = true
ndb.cacheNode(node)
return node
}
node.persisted = true
ndb.cache[string(hash)] = node
return node
}
func (ndb *IAVLNodeDB) Save(node *IAVLNode) {
hash := node.hash
if hash != nil {
if node.hash == nil {
panic("Expected to find node.hash, but none found.")
}
if node.persisted {
panic("Shouldn't be calling save on an already persisted node.")
}
if _, ok := ndb.cache[string(node.hash)]; ok {
panic("Shouldn't be calling save on an already cached node.")
}
// Save node bytes to db
buf := bytes.NewBuffer(nil)
_, err := self.WriteTo(buf)
_, err := node.WriteTo(buf)
if err != nil {
panic(err)
}
ndb.db.Set(node.hash, buf.Bytes())
node.persisted = true
ndb.cache[string(hash)] = node
ndb.db.Set(hash, buf.Bytes())
ndb.cacheNode(node)
}
func (ndb *IAVLNodeDB) cacheNode(node *IAVLNode) {
// Create entry in cache and append to queue.
elem := ndb.queue.PushBack(node.hash)
ndb.cache[string(node.hash)] = nodeElement{node, elem}
// Maybe expire an item.
if ndb.queue.Len() > ndb.capacity {
hash := ndb.queue.Remove(ndb.queue.Front()).([]byte)
delete(ndb.cache, string(hash))
}
}

View File

@ -17,7 +17,6 @@ type Tree interface {
HashWithCount() ([]byte, uint64)
Hash() []byte
Save()
SaveKey(string)
Set(key []byte, vlaue []byte) bool
Remove(key []byte) ([]byte, error)
Copy() Tree

View File

@ -239,14 +239,18 @@ func printIAVLNode(node *IAVLNode, indent int) {
indentPrefix += " "
}
if node.right != nil {
printIAVLNode(node.rightFilled(nil), indent+1)
if node.rightCached != nil {
printIAVLNode(node.rightCached, indent+1)
} else if node.rightHash != nil {
fmt.Printf("%s %X\n", indentPrefix, node.rightHash)
}
fmt.Printf("%s%v:%v\n", indentPrefix, node.key, node.height)
if node.left != nil {
printIAVLNode(node.leftFilled(nil), indent+1)
if node.leftCached != nil {
printIAVLNode(node.leftCached, indent+1)
} else if node.leftHash != nil {
fmt.Printf("%s %X\n", indentPrefix, node.leftHash)
}
}

View File

@ -68,7 +68,7 @@ func LoadState(db DB) *State {
s.commitTime = ReadTime(reader, &n, &err)
s.blockHash = ReadByteSlice(reader, &n, &err)
accountsMerkleRoot := ReadByteSlice(reader, &n, &err)
s.accounts = merkle.NewIAVLTreeFromHash(db, accountsMerkleRoot)
s.accounts = merkle.LoadIAVLTreeFromHash(db, accountsMerkleRoot)
var validators = map[uint64]*Validator{}
for reader.Len() > 0 {
validator := ReadValidator(reader, &n, &err)