NodeDB & Copy() added back to Tree

This commit is contained in:
Jae Kwon 2014-10-11 20:39:13 -07:00
parent cd4ef5d01f
commit 02d1e7853e
4 changed files with 105 additions and 56 deletions

View File

@ -123,6 +123,7 @@ func (node *IAVLNode) getByIndex(t *IAVLTree, index uint64) (key interface{}, va
} }
} }
// NOTE: sets hashes recursively
func (node *IAVLNode) hashWithCount(t *IAVLTree) ([]byte, uint64) { func (node *IAVLNode) hashWithCount(t *IAVLTree) ([]byte, uint64) {
if node.hash != nil { if node.hash != nil {
return node.hash, 0 return node.hash, 0
@ -138,6 +139,8 @@ func (node *IAVLNode) hashWithCount(t *IAVLTree) ([]byte, uint64) {
return node.hash, hashCount + 1 return node.hash, hashCount + 1
} }
// NOTE: sets hashes recursively
// NOTE: clears leftNode/rightNode recursively
func (node *IAVLNode) save(t *IAVLTree) []byte { func (node *IAVLNode) save(t *IAVLTree) []byte {
if node.hash == nil { if node.hash == nil {
node.hash, _ = node.hashWithCount(t) node.hash, _ = node.hashWithCount(t)
@ -157,7 +160,7 @@ func (node *IAVLNode) save(t *IAVLTree) []byte {
} }
// save node // save node
t.saveNode(node) t.ndb.SaveNode(t, node)
return node.hash return node.hash
} }
@ -247,6 +250,7 @@ func (node *IAVLNode) remove(t *IAVLTree, key interface{}) (
} }
} }
// NOTE: sets hashes recursively
func (node *IAVLNode) writeToCountHashes(t *IAVLTree, w io.Writer) (n int64, hashCount uint64, err error) { func (node *IAVLNode) writeToCountHashes(t *IAVLTree, w io.Writer) (n int64, hashCount uint64, err error) {
// height & size & key // height & size & key
WriteUInt8(w, node.height, &n, &err) WriteUInt8(w, node.height, &n, &err)
@ -288,7 +292,7 @@ func (node *IAVLNode) getLeftNode(t *IAVLTree) *IAVLNode {
if node.leftNode != nil { if node.leftNode != nil {
return node.leftNode return node.leftNode
} else { } else {
return t.getNode(node.leftHash) return t.ndb.GetNode(t, node.leftHash)
} }
} }
@ -296,7 +300,7 @@ func (node *IAVLNode) getRightNode(t *IAVLTree) *IAVLNode {
if node.rightNode != nil { if node.rightNode != nil {
return node.rightNode return node.rightNode
} else { } else {
return t.getNode(node.rightHash) return t.ndb.GetNode(t, node.rightHash)
} }
} }
@ -328,6 +332,7 @@ func (node *IAVLNode) rotateLeft(t *IAVLTree) *IAVLNode {
return sr return sr
} }
// NOTE: mutates height and size
func (node *IAVLNode) calcHeightAndSize(t *IAVLTree) { func (node *IAVLNode) calcHeightAndSize(t *IAVLTree) {
node.height = maxUint8(node.getLeftNode(t).height, node.getRightNode(t).height) + 1 node.height = maxUint8(node.getLeftNode(t).height, node.getRightNode(t).height) + 1
node.size = node.getLeftNode(t).size + node.getRightNode(t).size node.size = node.getLeftNode(t).size + node.getRightNode(t).size

View File

@ -231,7 +231,8 @@ func TestPersistence(t *testing.T) {
hash, _ := t1.HashWithCount() hash, _ := t1.HashWithCount()
// Load a tree // Load a tree
t2 := LoadIAVLTreeFromHash(BasicCodec, BasicCodec, 0, db, hash) t2 := NewIAVLTree(BasicCodec, BasicCodec, 0, db)
t2.Load(hash)
for key, value := range records { for key, value := range records {
_, t2value := t2.Get(key) _, t2value := t2.Get(key)
if t2value != value { if t2value != value {

View File

@ -3,49 +3,62 @@ package merkle
import ( import (
"bytes" "bytes"
"container/list" "container/list"
"sync"
. "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/db" . "github.com/tendermint/tendermint/db"
) )
const defaultCacheCapacity = 1000 // TODO make configurable.
/* /*
Immutable AVL Tree (wraps the Node root) Immutable AVL Tree (wraps the Node root)
This tree is not goroutine safe.
This tree is not concurrency safe.
You must wrap your calls with your own mutex.
*/ */
type IAVLTree struct { type IAVLTree struct {
keyCodec Codec keyCodec Codec
valueCodec Codec valueCodec Codec
root *IAVLNode root *IAVLNode
ndb *nodeDB
// Cache
cache map[string]nodeElement
cacheSize int
queue *list.List
// Persistence
db DB
} }
func NewIAVLTree(keyCodec, valueCodec Codec, cacheSize int, db DB) *IAVLTree { func NewIAVLTree(keyCodec, valueCodec Codec, cacheSize int, db DB) *IAVLTree {
return &IAVLTree{ if db == nil {
keyCodec: keyCodec, // In-memory IAVLTree
valueCodec: valueCodec, return &IAVLTree{
root: nil, keyCodec: keyCodec,
cache: make(map[string]nodeElement), valueCodec: valueCodec,
cacheSize: cacheSize, }
queue: list.New(), } else {
db: db, // Persistent IAVLTree
return &IAVLTree{
keyCodec: keyCodec,
valueCodec: valueCodec,
ndb: newNodeDB(cacheSize, db),
}
} }
} }
func LoadIAVLTreeFromHash(keyCodec, valueCodec Codec, cacheSize int, db DB, hash []byte) *IAVLTree { // The returned tree and the original tree are goroutine independent.
t := NewIAVLTree(keyCodec, valueCodec, cacheSize, db) // That is, they can each run in their own goroutine.
t.root = t.getNode(hash) func (t *IAVLTree) Copy() *IAVLTree {
return t if t.ndb != nil && !t.root.persisted {
panic("It is unsafe to Copy() an unpersisted tree.")
// Saving a tree finalizes all the nodes.
// It sets all the hashes recursively,
// clears all the leftNode/rightNode values recursively,
// and all the .persisted flags get set.
// On the other hand, in-memory trees (ndb == nil)
// don't mutate
} else if t.ndb == nil && t.root.hash == nil {
panic("An in-memory IAVLTree must be hashed first")
// An in-memory IAVLTree is finalized when the hashes are
// calculated.
}
return &IAVLTree{
keyCodec: t.keyCodec,
valueCodec: t.valueCodec,
root: t.root,
ndb: t.ndb,
}
} }
func (t *IAVLTree) Size() uint64 { func (t *IAVLTree) Size() uint64 {
@ -100,6 +113,10 @@ func (t *IAVLTree) Save() []byte {
return t.root.save(t) return t.root.save(t)
} }
func (t *IAVLTree) Load(hash []byte) {
t.root = t.ndb.GetNode(t, hash)
}
func (t *IAVLTree) Get(key interface{}) (index uint64, value interface{}) { func (t *IAVLTree) Get(key interface{}) (index uint64, value interface{}) {
if t.root == nil { if t.root == nil {
return 0, nil return 0, nil
@ -123,36 +140,59 @@ func (t *IAVLTree) Remove(key interface{}) (value interface{}, removed bool) {
return nil, false return nil, false
} }
if newRoot == nil && newRootHash != nil { if newRoot == nil && newRootHash != nil {
t.root = t.getNode(newRootHash) t.root = t.ndb.GetNode(t, newRootHash)
} else { } else {
t.root = newRoot t.root = newRoot
} }
return value, true return value, true
} }
func (t *IAVLTree) Checkpoint() interface{} { func (t *IAVLTree) Iterate(fn func(key interface{}, value interface{}) bool) (stopped bool) {
return t.root return t.root.traverse(t, func(node *IAVLNode) bool {
if node.height == 0 {
return fn(node.key, node.value)
} else {
return false
}
})
} }
func (t *IAVLTree) Restore(checkpoint interface{}) { //-----------------------------------------------------------------------------
t.root = checkpoint.(*IAVLNode)
}
type nodeElement struct { type nodeElement struct {
node *IAVLNode node *IAVLNode
elem *list.Element elem *list.Element
} }
func (t *IAVLTree) getNode(hash []byte) *IAVLNode { type nodeDB struct {
mtx sync.Mutex
cache map[string]nodeElement
cacheSize int
cacheQueue *list.List
db DB
}
func newNodeDB(cacheSize int, db DB) *nodeDB {
return &nodeDB{
cache: make(map[string]nodeElement),
cacheSize: cacheSize,
cacheQueue: list.New(),
db: db,
}
}
func (ndb *nodeDB) GetNode(t *IAVLTree, hash []byte) *IAVLNode {
ndb.mtx.Lock()
defer ndb.mtx.Unlock()
// Check the cache. // Check the cache.
nodeElem, ok := t.cache[string(hash)] nodeElem, ok := ndb.cache[string(hash)]
if ok { if ok {
// Already exists. Move to back of queue. // Already exists. Move to back of cacheQueue.
t.queue.MoveToBack(nodeElem.elem) ndb.cacheQueue.MoveToBack(nodeElem.elem)
return nodeElem.node return nodeElem.node
} else { } else {
// Doesn't exist, load. // Doesn't exist, load.
buf := t.db.Get(hash) buf := ndb.db.Get(hash)
r := bytes.NewReader(buf) r := bytes.NewReader(buf)
var n int64 var n int64
var err error var err error
@ -161,30 +201,21 @@ func (t *IAVLTree) getNode(hash []byte) *IAVLNode {
panic(err) panic(err)
} }
node.persisted = true node.persisted = true
t.cacheNode(node) ndb.cacheNode(node)
return node return node
} }
} }
func (t *IAVLTree) cacheNode(node *IAVLNode) { func (ndb *nodeDB) SaveNode(t *IAVLTree, node *IAVLNode) {
// Create entry in cache and append to queue. ndb.mtx.Lock()
elem := t.queue.PushBack(node.hash) defer ndb.mtx.Unlock()
t.cache[string(node.hash)] = nodeElement{node, elem}
// Maybe expire an item.
if t.queue.Len() > t.cacheSize {
hash := t.queue.Remove(t.queue.Front()).([]byte)
delete(t.cache, string(hash))
}
}
func (t *IAVLTree) saveNode(node *IAVLNode) {
if node.hash == nil { if node.hash == nil {
panic("Expected to find node.hash, but none found.") panic("Expected to find node.hash, but none found.")
} }
if node.persisted { if node.persisted {
panic("Shouldn't be calling save on an already persisted node.") panic("Shouldn't be calling save on an already persisted node.")
} }
if _, ok := t.cache[string(node.hash)]; ok { if _, ok := ndb.cache[string(node.hash)]; ok {
panic("Shouldn't be calling save on an already cached node.") panic("Shouldn't be calling save on an already cached node.")
} }
// Save node bytes to db // Save node bytes to db
@ -193,7 +224,18 @@ func (t *IAVLTree) saveNode(node *IAVLNode) {
if err != nil { if err != nil {
panic(err) panic(err)
} }
t.db.Set(node.hash, buf.Bytes()) ndb.db.Set(node.hash, buf.Bytes())
node.persisted = true node.persisted = true
t.cacheNode(node) ndb.cacheNode(node)
}
func (ndb *nodeDB) cacheNode(node *IAVLNode) {
// Create entry in cache and append to cacheQueue.
elem := ndb.cacheQueue.PushBack(node.hash)
ndb.cache[string(node.hash)] = nodeElement{node, elem}
// Maybe expire an item.
if ndb.cacheQueue.Len() > ndb.cacheSize {
hash := ndb.cacheQueue.Remove(ndb.cacheQueue.Front()).([]byte)
delete(ndb.cache, string(hash))
}
} }

View File

@ -13,6 +13,7 @@ type Tree interface {
Save() (hash []byte) Save() (hash []byte)
Checkpoint() (checkpoint interface{}) Checkpoint() (checkpoint interface{})
Restore(checkpoint interface{}) Restore(checkpoint interface{})
Iterate(func(key interface{}, value interface{}) (stop bool)) (stopped bool)
} }
type Hashable interface { type Hashable interface {