Cleanup multistore

This commit is contained in:
Jae Kwon 2017-12-01 08:52:54 -08:00
parent ec31313bc2
commit 3474efc599
3 changed files with 228 additions and 146 deletions

70
store/cachemultistore.go Normal file
View File

@ -0,0 +1,70 @@
package store
import dbm "github.com/tendermint/tmlibs/db"
//----------------------------------------
// cacheMultiStore
type cwWriter interface {
Write()
}
// cacheMultiStore holds many CacheWrap'd stores.
// Implements MultiStore.
type cacheMultiStore struct {
db dbm.DB
version int64
lastCommitID CommitID
substores map[string]cwWriter
}
func newCacheMultiStore(rs *rootMultiStore) cacheMultiStore {
cms := cacheMultiStore{
db: db.CacheWrap(),
version: rs.version,
lastCommitID: rs.lastCommitID,
substores: make(map[string]cwwWriter), len(rs.substores),
}
for name, substore := range rs.substores {
cms.substores[name] = substore.CacheWrap().(cwWriter)
}
return cms
}
// Implements CacheMultiStore
func (cms cacheMultiStore) LastCommitID() CommitID {
return cms.lastCommitID
}
// Implements CacheMultiStore
func (cms cacheMultiStore) CurrentVersion() int64 {
return cms.version
}
// Implements CacheMultiStore
func (cms cacheMultiStore) Write() {
cms.db.Write()
for substore := range rs.substores {
substore.(cwWriter).Write()
}
}
// Implements CacheMultiStore
func (rs cacheMultiStore) CacheMultiStore() CacheMultiStore {
return newCacheMultiStore(rs)
}
// Implements CacheMultiStore
func (rs cacheMultiStore) GetCommitter(name string) Committer {
return rs.store[name]
}
// Implements CacheMultiStore
func (rs cacheMultiStore) GetKVStore(name string) KVStore {
return rs.store[name].(KVStore)
}
// Implements CacheMultiStore
func (rs cacheMultiStore) GetIterKVStore(name string) IterKVStore {
return rs.store[name].(IterKVStore)
}

View File

@ -11,12 +11,20 @@ import (
) )
const ( const (
msLatestKey = "s/latest" latestVersionKey = "s/latest"
msStateKeyFmt = "s/%d" // s/<version> commitStateKeyFmt = "s/%d" // s/<version>
) )
type MultiStore interface { type MultiStore interface {
// Last commit, or the zero CommitID.
// If not zero, CommitID.Version is CurrentVersion()-1.
LastCommitID() CommitID
// Current version being worked on now, not yet committed.
// Should be greater than 0.
CurrentVersion() int64
// Cache wrap MultiStore. // Cache wrap MultiStore.
// NOTE: Caller should probably not call .Write() on each, but // NOTE: Caller should probably not call .Write() on each, but
// call CacheMultiStore.Write(). // call CacheMultiStore.Write().
@ -36,10 +44,13 @@ type CacheMultiStore interface {
//---------------------------------------- //----------------------------------------
// rootMultiStore is composed of many Committers. // rootMultiStore is composed of many Committers.
// Name contrasts with cacheMultiStore which is for cache-wrapping
// other MultiStores.
// Implements MultiStore. // Implements MultiStore.
type rootMultiStore struct { type rootMultiStore struct {
db dbm.DB db dbm.DB
version int64 curVersion int64
lastHash []byte
storeLoaders map[string]CommitterLoader storeLoaders map[string]CommitterLoader
substores map[string]Committer substores map[string]Committer
} }
@ -47,7 +58,8 @@ type rootMultiStore struct {
func NewMultiStore(db dbm.DB) *rootMultiStore { func NewMultiStore(db dbm.DB) *rootMultiStore {
return &rootMultiStore{ return &rootMultiStore{
db: db, db: db,
version: 0, curVersion: 0,
lastHash: nil,
storeLoaders: make(map[string]CommitterLoader), storeLoaders: make(map[string]CommitterLoader),
substores: make(map[string]Committer), substores: make(map[string]Committer),
} }
@ -60,70 +72,15 @@ func (rs *rootMultiStore) SetCommitterLoader(name string, loader CommitterLoader
rs.storeLoaders[name] = loader rs.storeLoaders[name] = loader
} }
//----------------------------------------
// rootMultiStore state
type msState struct {
Substores []substore
}
func (ms *msState) Sort() {
ms.Substores.Sort()
}
func (ms *msState) Hash() []byte {
m := make(map[string]interface{}, len(ms.Substores))
for _, substore := range ms.Substores {
m[substore.name] = substore.subState
}
return merkle.SimpleHashFromMap(m)
}
//----------------------------------------
// substore state
type substore struct {
name string
subState
}
// This gets serialized by go-wire
type subState struct {
CommitID CommitID
// ... maybe add more state
}
func (ss subState) Hash() []byte {
ssBytes, _ := wire.Marshal(ss) // Does not error
hasher := ripemd160.New()
hasher.Write(ssBytes)
return hasher.Sum(nil)
}
//----------------------------------------
// Call once after all calls to SetCommitterLoader are complete. // Call once after all calls to SetCommitterLoader are complete.
func (rs *rootMultiStore) LoadLatestVersion() error { func (rs *rootMultiStore) LoadLatestVersion() error {
ver := rs.getLatestVersion() ver := getLatestVersion(rs.db)
rs.LoadVersion(ver) rs.LoadVersion(ver)
} }
func (rs *rootMultiStore) getLatestVersion() int64 {
var latest int64
latestBytes := rs.db.Get(msLatestKey)
if latestBytes == nil {
return 0
}
err := wire.Unmarshal(latestBytes, &latest)
if err != nil {
panic(err)
}
return latest
}
// NOTE: Returns 0 unless LoadVersion() or LoadLatestVersion() is called. // NOTE: Returns 0 unless LoadVersion() or LoadLatestVersion() is called.
func (rs *rootMultiStore) GetVersion() int64 { func (rs *rootMultiStore) GetCurrentVersion() int64 {
return rs.version return rs.curVersion
} }
func (rs *rootMultiStore) LoadVersion(ver int64) error { func (rs *rootMultiStore) LoadVersion(ver int64) error {
@ -131,26 +88,20 @@ func (rs *rootMultiStore) LoadVersion(ver int64) error {
// Special logic for version 0 // Special logic for version 0
if ver == 0 { if ver == 0 {
for name, storeLoader := range rs.storeLoaders { for name, storeLoader := range rs.storeLoaders {
store, err := storeLoader(CommitID{Version: 0}) store, err := storeLoader(CommitID{})
if err != nil { if err != nil {
return fmt.Errorf("Failed to load rootMultiStore: %v", err) return fmt.Errorf("Failed to load rootMultiStore: %v", err)
} }
rs.curVersion = 1
rs.lastHash = nil
rs.substores[name] = store rs.substores[name] = store
} }
return nil return nil
} }
// Otherwise, version is 1 or greater // Otherwise, version is 1 or greater
msStateKey := fmt.Sprintf(msStateKeyFmt, ver) // Load commitState
stateBytes := rs.db.Get(msStateKey, ver) var state commitState = loadCommitState(rs.db, ver)
if bz == nil {
return fmt.Errorf("Failed to load rootMultiStore: no data")
}
var state msState
err := wire.Unmarshal(stateBytes, &state)
if err != nil {
return fmt.Errorf("Failed to load rootMultiStore: %v", err)
}
// Load each Substore // Load each Substore
var newSubstores = make(map[string]Committer) var newSubstores = make(map[string]Committer)
@ -175,42 +126,86 @@ func (rs *rootMultiStore) LoadVersion(ver int64) error {
} }
// Success. // Success.
rs.version = ver rs.curVersion = ver + 1
rs.lastHash = state.LastHash
rs.substores = newSubstores rs.substores = newSubstores
return nil return nil
} }
// Implements Committer // Commits each substore and gets commitState.
func (rs *rootMultiStore) Commit() CommitID { func (rs *RootMultiStore) doCommit() commitState {
version := rs.curVersion
lastHash := rs.LastHash
substores := make([]substore, len(rs.substores))
// Needs to be transactional
batch := rs.db.NewBatch()
// Save msState
var state msState
for name, store := range rs.substores { for name, store := range rs.substores {
// Commit
commitID := store.Commit() commitID := store.Commit()
state.Substores = append(state.Substores,
subState{ // Record CommitID
substores = append(substores,
substore{
Name: name, Name: name,
CommitID: commitID, CommitID: commitID,
}, },
) )
} }
state.Sort()
// Incr curVersion
rs.curVersion += 1
return commitState{
Version: version,
LastHash: lastHash,
Substores: substores,
}
}
//----------------------------------------
// Implements Committer
func (rs *rootMultiStore) Commit() CommitID {
version := rs.version
// Needs to be transactional
batch := rs.db.NewBatch()
// Commit each substore and get commitState
state := rs.doCommit()
stateBytes, err := wire.Marshal(state) stateBytes, err := wire.Marshal(state)
if err != nil { if err != nil {
panic(err) panic(err)
} }
msStateKey := fmt.Sprintf(msStateKeyFmt, rs.version) commitStateKey := fmt.Sprintf(commitStateKeyFmt, rs.version)
batch.Set(msStateKey, stateBytes) batch.Set(commitStateKey, stateBytes)
// Save msLatest // Save the latest version
latestBytes, _ := wire.Marshal(rs.version) // Does not error latestBytes, _ := wire.Marshal(rs.version) // Does not error
batch.Set(msLatestKey, latestBytes) batch.Set(latestVersionKey, latestBytes)
batch.Write() batch.Write()
batch.version += 1 rs.version += 1
return CommitID{
Version: version,
Hash: state.Hash(),
}
}
// Get the last committed CommitID
func (rs *rootMultiStore) LastCommitID() CommitID {
// If we haven't committed yet, return a zero CommitID
if rs.curVersion == 0 {
return CommitID{}
}
return CommitID{
Version: rs.curVersion - 1,
Hash: rs.LastHash,
}
} }
// Implements MultiStore // Implements MultiStore
@ -234,74 +229,87 @@ func (rs *rootMultiStore) GetIterKVStore(name string) IterKVStore {
} }
//---------------------------------------- //----------------------------------------
// subStates // commitState
type subStates []subState // NOTE: Keep commitState a simple immutable struct.
type commitState struct {
func (ssz subStates) Len() int { return len(ssz) } // Version
func (ssz subStates) Less(i, j int) bool { return ssz[i].Key < ssz[j].Key } Version int64
func (ssz subStates) Swap(i, j int) { ssz[i], ssz[j] = ssz[j], ssz[i] }
func (ssz subStates) Sort() { sort.Sort(ssz) }
func (ssz subStates) Hash() []byte { // Last hash (memoization)
hz := make([]merkle.Hashable, len(ssz)) LastHash []byte
for i, ss := range ssz {
hz[i] = ss // Substore info for
Substores []substore
}
// loads commitState from disk.
func loadCommitState(db dbm.DB, ver int64) (commitState, error) {
// Load from DB.
commitStateKey := fmt.Sprintf(commitStateKeyFmt, ver)
stateBytes := db.Get(commitStateKey, ver)
if bz == nil {
return commitState{}, fmt.Errorf("Failed to load rootMultiStore: no data")
}
// Parse bytes.
var state commitState
err := wire.Unmarshal(stateBytes, &state)
if err != nil {
return commitState{}, fmt.Errorf("Failed to load rootMultiStore: %v", err)
}
return state, nil
}
func (cs commitState) Hash() []byte {
// TODO cache to cs.hash []byte
m := make(map[string]interface{}, len(cs.Substores))
for _, substore := range cs.Substores {
m[substore.Name] = substore
}
return merkle.SimpleHashFromMap(m)
}
func (cs commitState) CommitID() CommitID {
return CommitID{
Version: cs.Version,
Hash: cs.Hash(),
} }
return merkle.SimpleHashFromHashables(hz)
} }
//---------------------------------------- //----------------------------------------
// cacheMultiStore // substore state
type cwWriter interface { type substore struct {
Write() Name string
substoreCore
} }
// cacheMultiStore holds many CacheWrap'd stores. type substoreCore struct {
// Implements MultiStore. CommitID CommitID
type cacheMultiStore struct { // ... maybe add more state
db dbm.DB
version int64
substores map[string]cwWriter
} }
func newCacheMultiStore(rs *rootMultiStore) cacheMultiStore { func (sc substoreCore) Hash() []byte {
cms := cacheMultiStore{ scBytes, _ := wire.Marshal(sc) // Does not error
db: db.CacheWrap(), hasher := ripemd160.New()
version: rs.version, hasher.Write(scBytes)
substores: make(map[string]cwwWriter), len(rs.substores), return hasher.Sum(nil)
}
//----------------------------------------
func getLatestVersion(db dbm.DB) int64 {
var latest int64
latestBytes := db.Get(latestVersionKey)
if latestBytes == nil {
return 0
} }
for name, substore := range rs.substores { err := wire.Unmarshal(latestBytes, &latest)
cms.substores[name] = substore.CacheWrap().(cwWriter) if err != nil {
panic(err)
} }
return cms return latest
}
// Implements CacheMultiStore
func (cms cacheMultiStore) Write() {
cms.db.Write()
for substore := range rs.substores {
substore.(cwWriter).Write()
}
}
// Implements CacheMultiStore
func (rs cacheMultiStore) CacheMultiStore() CacheMultiStore {
return newCacheMultiStore(rs)
}
// Implements CacheMultiStore
func (rs cacheMultiStore) GetCommitter(name string) Committer {
return rs.store[name]
}
// Implements CacheMultiStore
func (rs cacheMultiStore) GetKVStore(name string) KVStore {
return rs.store[name].(KVStore)
}
// Implements CacheMultiStore
func (rs cacheMultiStore) GetIterKVStore(name string) IterKVStore {
return rs.store[name].(IterKVStore)
} }

View File

@ -5,10 +5,14 @@ import (
) )
type CommitID struct { type CommitID struct {
Version uint64 Version int64
Hash []byte Hash []byte
} }
type (cid CommitID) IsZero() bool {
return cid.Version == 0 && len(cid.Hash) == 0
}
type Committer interface { type Committer interface {
// Commit persists the state to disk. // Commit persists the state to disk.