Plugin interface methods take store

This commit is contained in:
Jae Kwon 2016-05-01 13:52:08 -07:00
parent fefcbbf3b0
commit 324e72f36d
7 changed files with 170 additions and 159 deletions

View File

@ -3,7 +3,7 @@ package app
import ( import (
"strings" "strings"
"github.com/tendermint/basecoin/state" sm "github.com/tendermint/basecoin/state"
"github.com/tendermint/basecoin/types" "github.com/tendermint/basecoin/types"
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
@ -28,19 +28,21 @@ const (
type Basecoin struct { type Basecoin struct {
eyesCli *eyes.Client eyesCli *eyes.Client
govMint *gov.Governmint govMint *gov.Governmint
state *state.State state *sm.State
cacheState *sm.State
plugins *types.Plugins plugins *types.Plugins
} }
func NewBasecoin(eyesCli *eyes.Client) *Basecoin { func NewBasecoin(eyesCli *eyes.Client) *Basecoin {
govMint := gov.NewGovernmint(eyesCli) govMint := gov.NewGovernmint()
state_ := state.NewState(eyesCli) state := sm.NewState(eyesCli)
plugins := types.NewPlugins() plugins := types.NewPlugins()
plugins.RegisterPlugin(PluginTypeByteGov, PluginNameGov, govMint) plugins.RegisterPlugin(PluginTypeByteGov, PluginNameGov, govMint)
return &Basecoin{ return &Basecoin{
eyesCli: eyesCli, eyesCli: eyesCli,
govMint: govMint, govMint: govMint,
state: state_, state: state,
cacheState: nil,
plugins: plugins, plugins: plugins,
} }
} }
@ -59,7 +61,7 @@ func (app *Basecoin) SetOption(key string, value string) (log string) {
if plugin == nil { if plugin == nil {
return "Invalid plugin name: " + PluginName return "Invalid plugin name: " + PluginName
} }
return plugin.SetOption(key, value) return plugin.SetOption(app.state, key, value)
} else { } else {
// Set option on basecoin // Set option on basecoin
switch key { switch key {
@ -92,7 +94,7 @@ func (app *Basecoin) AppendTx(txBytes []byte) (res tmsp.Result) {
return tmsp.ErrBaseEncodingError.AppendLog("Error decoding tx: " + err.Error()) return tmsp.ErrBaseEncodingError.AppendLog("Error decoding tx: " + err.Error())
} }
// Validate and exec tx // Validate and exec tx
res = state.ExecTx(app.state, app.plugins, tx, false, nil) res = sm.ExecTx(app.state, app.plugins, tx, false, nil)
if res.IsErr() { if res.IsErr() {
return res.PrependLog("Error in AppendTx") return res.PrependLog("Error in AppendTx")
} }
@ -111,7 +113,7 @@ func (app *Basecoin) CheckTx(txBytes []byte) (res tmsp.Result) {
return tmsp.ErrBaseEncodingError.AppendLog("Error decoding tx: " + err.Error()) return tmsp.ErrBaseEncodingError.AppendLog("Error decoding tx: " + err.Error())
} }
// Validate tx // Validate tx
res = state.ExecTx(app.state, app.plugins, tx, true, nil) res = sm.ExecTx(app.cacheState, app.plugins, tx, true, nil)
if res.IsErr() { if res.IsErr() {
return res.PrependLog("Error in CheckTx") return res.PrependLog("Error in CheckTx")
} }
@ -130,8 +132,6 @@ func (app *Basecoin) Query(query []byte) (res tmsp.Result) {
return tmsp.OK.SetLog("This type of query not yet supported") return tmsp.OK.SetLog("This type of query not yet supported")
case PluginTypeByteEyes: case PluginTypeByteEyes:
return app.eyesCli.QuerySync(query) return app.eyesCli.QuerySync(query)
case PluginTypeByteGov:
return app.govMint.Query(query)
} }
return tmsp.ErrBaseUnknownPlugin.SetLog( return tmsp.ErrBaseUnknownPlugin.SetLog(
Fmt("Unknown plugin with type byte %X", typeByte)) Fmt("Unknown plugin with type byte %X", typeByte))
@ -139,14 +139,7 @@ func (app *Basecoin) Query(query []byte) (res tmsp.Result) {
// TMSP::Commit // TMSP::Commit
func (app *Basecoin) Commit() (res tmsp.Result) { func (app *Basecoin) Commit() (res tmsp.Result) {
// First, commit all the plugins // Commit eyes.
for _, plugin := range app.plugins.GetList() {
res = plugin.Commit()
if res.IsErr() {
PanicSanity(Fmt("Error committing plugin %v", plugin.Name))
}
}
// Then, commit eyes.
res = app.eyesCli.CommitSync() res = app.eyesCli.CommitSync()
if res.IsErr() { if res.IsErr() {
PanicSanity("Error getting hash: " + res.Error()) PanicSanity("Error getting hash: " + res.Error())
@ -157,32 +150,23 @@ func (app *Basecoin) Commit() (res tmsp.Result) {
// TMSP::InitChain // TMSP::InitChain
func (app *Basecoin) InitChain(validators []*tmsp.Validator) { func (app *Basecoin) InitChain(validators []*tmsp.Validator) {
for _, plugin := range app.plugins.GetList() { for _, plugin := range app.plugins.GetList() {
if _, ok := plugin.Plugin.(tmsp.BlockchainAware); ok { plugin.Plugin.InitChain(app.state, validators)
plugin.Plugin.(tmsp.BlockchainAware).InitChain(validators)
}
} }
} }
// TMSP::BeginBlock // TMSP::BeginBlock
func (app *Basecoin) BeginBlock(height uint64) { func (app *Basecoin) BeginBlock(height uint64) {
app.state.ResetCacheState()
for _, plugin := range app.plugins.GetList() { for _, plugin := range app.plugins.GetList() {
if _, ok := plugin.Plugin.(tmsp.BlockchainAware); ok { plugin.Plugin.BeginBlock(app.state, height)
plugin.Plugin.(tmsp.BlockchainAware).BeginBlock(height)
}
} }
app.cacheState = app.state.CacheWrap()
} }
// TMSP::EndBlock // TMSP::EndBlock
func (app *Basecoin) EndBlock(height uint64) (vals []*tmsp.Validator) { func (app *Basecoin) EndBlock(height uint64) (diffs []*tmsp.Validator) {
for _, plugin := range app.plugins.GetList() { for _, plugin := range app.plugins.GetList() {
if plugin.Plugin == app.govMint { moreDiffs := plugin.Plugin.EndBlock(app.state, height)
vals = plugin.Plugin.(tmsp.BlockchainAware).EndBlock(height) diffs = append(diffs, moreDiffs...)
} else {
if _, ok := plugin.Plugin.(tmsp.BlockchainAware); ok {
plugin.Plugin.(tmsp.BlockchainAware).EndBlock(height)
}
}
} }
return return
} }

View File

@ -8,20 +8,11 @@ import (
) )
// If the tx is invalid, a TMSP error will be returned. // If the tx is invalid, a TMSP error will be returned.
func ExecTx(s *State, pgz *types.Plugins, tx types.Tx, isCheckTx bool, evc events.Fireable) tmsp.Result { func ExecTx(state *State, pgz *types.Plugins, tx types.Tx, isCheckTx bool, evc events.Fireable) tmsp.Result {
// TODO: do something with fees // TODO: do something with fees
fees := types.Coins{} fees := types.Coins{}
chainID := s.GetChainID() chainID := state.GetChainID()
// Get the state. If isCheckTx, then we use a cache.
// The idea is to throw away this cache after every EndBlock().
var state types.AccountGetterSetter
if isCheckTx {
state = s.GetCheckCache()
} else {
state = s
}
// Exec tx // Exec tx
switch tx := tx.(type) { switch tx := tx.(type) {
@ -129,15 +120,16 @@ func ExecTx(s *State, pgz *types.Plugins, tx types.Tx, isCheckTx bool, evc event
} }
// Create inAcc checkpoint // Create inAcc checkpoint
inAccCopy := inAcc.Copy() inAccDeducted := inAcc.Copy()
// Run the tx. // Run the tx.
cache := types.NewAccountCache(state) // XXX cache := types.NewStateCache(state)
cache := state.CacheWrap()
cache.SetAccount(tx.Input.Address, inAcc) cache.SetAccount(tx.Input.Address, inAcc)
ctx := types.NewCallContext(cache, inAcc, coins) ctx := types.NewCallContext(tx.Input.Address, coins)
res = plugin.RunTx(ctx, tx.Data) res = plugin.RunTx(cache, ctx, tx.Data)
if res.IsOK() { if res.IsOK() {
cache.Sync() cache.CacheSync()
log.Info("Successful execution") log.Info("Successful execution")
// Fire events // Fire events
/* /*
@ -153,10 +145,10 @@ func ExecTx(s *State, pgz *types.Plugins, tx types.Tx, isCheckTx bool, evc event
} else { } else {
log.Info("AppTx failed", "error", res) log.Info("AppTx failed", "error", res)
// Just return the coins and return. // Just return the coins and return.
inAccCopy.Balance = inAccCopy.Balance.Plus(coins) inAccDeducted.Balance = inAccDeducted.Balance.Plus(coins)
// But take the gas // But take the gas
// TODO // TODO
state.SetAccount(tx.Input.Address, inAccCopy) state.SetAccount(tx.Input.Address, inAccDeducted)
} }
return res return res

View File

@ -4,26 +4,21 @@ import (
"github.com/tendermint/basecoin/types" "github.com/tendermint/basecoin/types"
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
eyes "github.com/tendermint/merkleeyes/client"
) )
// CONTRACT: State should be quick to copy.
// See CacheWrap().
type State struct { type State struct {
chainID string chainID string
eyesCli *eyes.Client store types.KVStore
checkCache *types.AccountCache cache *types.KVCache // optional
LastBlockHeight uint64
LastBlockHash []byte
GasLimit int64
} }
func NewState(eyesCli *eyes.Client) *State { func NewState(store types.KVStore) *State {
s := &State{ return &State{
chainID: "", chainID: "",
eyesCli: eyesCli, store: store,
} }
s.checkCache = types.NewAccountCache(s)
return s
} }
func (s *State) SetChainID(chainID string) { func (s *State) SetChainID(chainID string) {
@ -37,36 +32,34 @@ func (s *State) GetChainID() string {
return s.chainID return s.chainID
} }
func (s *State) Get(key []byte) (value []byte) {
return s.store.Get(key)
}
func (s *State) Set(key []byte, value []byte) {
s.store.Set(key, value)
}
func (s *State) GetAccount(addr []byte) *types.Account { func (s *State) GetAccount(addr []byte) *types.Account {
res := s.eyesCli.GetSync(AccountKey(addr)) return GetAccount(s.store, addr)
if res.IsErr() {
panic(Fmt("Error loading account addr %X error: %v", addr, res.Error()))
}
if len(res.Data) == 0 {
return nil
}
var acc *types.Account
err := wire.ReadBinaryBytes(res.Data, &acc)
if err != nil {
panic(Fmt("Error reading account %X error: %v", res.Data, err.Error()))
}
return acc
} }
func (s *State) SetAccount(addr []byte, acc *types.Account) { func (s *State) SetAccount(addr []byte, acc *types.Account) {
accBytes := wire.BinaryBytes(acc) SetAccount(s.store, addr, acc)
res := s.eyesCli.SetSync(AccountKey(addr), accBytes) }
if res.IsErr() {
panic(Fmt("Error storing account addr %X error: %v", addr, res.Error())) func (s *State) CacheWrap() *State {
cache := types.NewKVCache(s.store)
return &State{
chainID: s.chainID,
store: cache,
cache: cache,
} }
} }
func (s *State) GetCheckCache() *types.AccountCache { // NOTE: errors if s is not from CacheWrap()
return s.checkCache func (s *State) CacheSync() {
} s.cache.Sync()
func (s *State) ResetCacheState() {
s.checkCache = types.NewAccountCache(s)
} }
//---------------------------------------- //----------------------------------------
@ -74,3 +67,22 @@ func (s *State) ResetCacheState() {
func AccountKey(addr []byte) []byte { func AccountKey(addr []byte) []byte {
return append([]byte("base/a/"), addr...) return append([]byte("base/a/"), addr...)
} }
func GetAccount(store types.KVStore, addr []byte) *types.Account {
data := store.Get(AccountKey(addr))
if len(data) == 0 {
return nil
}
var acc *types.Account
err := wire.ReadBinaryBytes(data, &acc)
if err != nil {
panic(Fmt("Error reading account %X error: %v",
data, err.Error()))
}
return acc
}
func SetAccount(store types.KVStore, addr []byte, acc *types.Account) {
accBytes := wire.BinaryBytes(acc)
store.Set(AccountKey(addr), accBytes)
}

View File

@ -163,6 +163,7 @@ func testGov() {
} }
fmt.Println(res) fmt.Println(res)
// XXX Difficult to debug without a stacktrace...
// TODO more tests... // TODO more tests...
} }

View File

@ -1,57 +0,0 @@
package types
import (
"sort"
)
type AccountCache struct {
state AccountGetterSetter
accounts map[string]*Account
}
func NewAccountCache(state AccountGetterSetter) *AccountCache {
return &AccountCache{
state: state,
accounts: make(map[string]*Account),
}
}
func (cache *AccountCache) GetAccount(addr []byte) *Account {
acc, ok := cache.accounts[string(addr)]
if !ok {
acc = cache.state.GetAccount(addr)
cache.accounts[string(addr)] = acc
}
return acc
}
func (cache *AccountCache) SetAccount(addr []byte, acc *Account) {
cache.accounts[string(addr)] = acc
}
func (cache *AccountCache) Sync() {
// MUST BE DETERMINISTIC
// First, order the addrs.
addrs := []string{}
for addr := range cache.accounts {
addrs = append(addrs, string(addr))
}
sort.Strings(addrs)
// Set the accounts in order.
for _, addr := range addrs {
cache.state.SetAccount([]byte(addr), cache.accounts[addr])
}
// Reset accounts
cache.accounts = make(map[string]*Account)
}
//----------------------------------------
// NOT USED
type AccountCacher interface {
GetAccount(addr []byte) *Account
SetAccount(addr []byte, acc *Account)
Sync()
}

81
types/kvstore.go Normal file
View File

@ -0,0 +1,81 @@
package types
import (
"container/list"
)
type KVStore interface {
Set(key, value []byte)
Get(key []byte) (value []byte)
}
//----------------------------------------
type MemKVStore struct {
m map[string][]byte
}
func NewMemKVStore() *MemKVStore {
return &MemKVStore{
m: make(map[string][]byte, 0),
}
}
func (mkv *MemKVStore) Set(key []byte, value []byte) {
mkv.m[string(key)] = value
}
func (mkv *MemKVStore) Get(key []byte) (value []byte) {
return mkv.m[string(key)]
}
//----------------------------------------
// A Cache that enforces deterministic sync order.
type KVCache struct {
store KVStore
cache map[string]kvCacheValue
keys *list.List
}
type kvCacheValue struct {
v []byte // The value of some key
e *list.Element // The KVCache.keys element
}
func NewKVCache(store KVStore) *KVCache {
return (&KVCache{
store: store,
}).Reset()
}
func (kvc *KVCache) Reset() *KVCache {
kvc.cache = make(map[string]kvCacheValue)
kvc.keys = list.New()
return kvc
}
func (kvc *KVCache) Set(key []byte, value []byte) {
cacheValue, ok := kvc.cache[string(key)]
if ok {
kvc.keys.MoveToBack(cacheValue.e)
} else {
cacheValue.e = kvc.keys.PushBack(key)
}
cacheValue.v = value
kvc.cache[string(key)] = cacheValue
}
func (kvc *KVCache) Get(key []byte) (value []byte) {
cacheValue := kvc.cache[string(key)]
return cacheValue.v
}
func (kvc *KVCache) Sync() {
for e := kvc.keys.Front(); e != nil; e = e.Next() {
key := e.Value.([]byte)
value := kvc.cache[string(key)]
kvc.store.Set(key, value.v)
}
kvc.Reset()
}

View File

@ -4,12 +4,12 @@ import (
tmsp "github.com/tendermint/tmsp/types" tmsp "github.com/tendermint/tmsp/types"
) )
// Value is any floating value. It must be given to someone.
type Plugin interface { type Plugin interface {
SetOption(key string, value string) (log string) SetOption(store KVStore, key string, value string) (log string)
RunTx(ctx CallContext, txBytes []byte) (res tmsp.Result) RunTx(store KVStore, ctx CallContext, txBytes []byte) (res tmsp.Result)
Query(query []byte) (res tmsp.Result) InitChain(store KVStore, vals []*tmsp.Validator)
Commit() (res tmsp.Result) BeginBlock(store KVStore, height uint64)
EndBlock(store KVStore, height uint64) []*tmsp.Validator
} }
type NamedPlugin struct { type NamedPlugin struct {
@ -21,14 +21,12 @@ type NamedPlugin struct {
//---------------------------------------- //----------------------------------------
type CallContext struct { type CallContext struct {
Cache AccountCacher Caller []byte
Caller *Account
Coins Coins Coins Coins
} }
func NewCallContext(cache AccountCacher, caller *Account, coins Coins) CallContext { func NewCallContext(caller []byte, coins Coins) CallContext {
return CallContext{ return CallContext{
Cache: cache,
Caller: caller, Caller: caller,
Coins: coins, Coins: coins,
} }