event cache and fireable interace

This commit is contained in:
Ethan Buchman 2015-04-15 23:40:27 -07:00
parent a1c5e32d76
commit d27e0bbad5
12 changed files with 105 additions and 59 deletions

View File

@ -42,7 +42,7 @@ type BlockchainReactor struct {
quit chan struct{}
running uint32
evsw *events.EventSwitch
evsw events.Fireable
}
func NewBlockchainReactor(state *sm.State, store *BlockStore, sync bool) *BlockchainReactor {
@ -242,7 +242,7 @@ func (bcR *BlockchainReactor) BroadcastStatus() error {
}
// implements events.Eventable
func (bcR *BlockchainReactor) SetEventSwitch(evsw *events.EventSwitch) {
func (bcR *BlockchainReactor) SetFireable(evsw events.Fireable) {
bcR.evsw = evsw
}

View File

@ -40,7 +40,7 @@ type ConsensusReactor struct {
blockStore *bc.BlockStore
conS *ConsensusState
evsw *events.EventSwitch
evsw events.Fireable
}
func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore) *ConsensusReactor {
@ -234,9 +234,9 @@ func (conR *ConsensusReactor) ResetToState(state *sm.State) {
}
// implements events.Eventable
func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) {
func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) {
conR.evsw = evsw
conR.conS.SetEventSwitch(evsw)
conR.conS.SetFireable(evsw)
}
//--------------------------------------

View File

@ -251,7 +251,8 @@ type ConsensusState struct {
stagedState *sm.State // Cache result of staged block.
lastCommitVoteHeight uint // Last called commitVoteBlock() or saveCommitVoteBlock() on.
evsw *events.EventSwitch
evsw events.Fireable
evc *events.EventCache // set in stageBlock and passed into state
}
func NewConsensusState(state *sm.State, blockStore *bc.BlockStore, mempoolReactor *mempl.MempoolReactor) *ConsensusState {
@ -443,9 +444,12 @@ ACTION_LOOP:
if cs.TryFinalizeCommit(rs.Height) {
// Now at new height
// cs.Step is at RoundStepNewHeight or RoundStepNewRound.
newBlock := cs.blockStore.LoadBlock(cs.state.LastBlockHeight)
cs.evsw.FireEvent(types.EventStringNewBlock(), newBlock)
// TODO: go fire events from event cache
// fire some events!
go func() {
newBlock := cs.blockStore.LoadBlock(cs.state.LastBlockHeight)
cs.evsw.FireEvent(types.EventStringNewBlock(), newBlock)
cs.evc.Flush()
}()
scheduleNextAction()
continue ACTION_LOOP
} else {
@ -1032,6 +1036,9 @@ func (cs *ConsensusState) stageBlock(block *types.Block, blockParts *types.PartS
// Create a copy of the state for staging
stateCopy := cs.state.Copy()
// reset the event cache and pass it into the state
cs.evc = events.NewEventCache(cs.evsw)
stateCopy.SetFireable(cs.evc)
// Commit block onto the copied state.
// NOTE: Basic validation is done in state.AppendBlock().
@ -1117,9 +1124,8 @@ func (cs *ConsensusState) saveCommitVoteBlock(block *types.Block, blockParts *ty
}
// implements events.Eventable
func (cs *ConsensusState) SetEventSwitch(evsw *events.EventSwitch) {
func (cs *ConsensusState) SetFireable(evsw events.Fireable) {
cs.evsw = evsw
cs.state.SetEventSwitch(evsw)
}
//-----------------------------------------------------------------------------

39
events/event_cache.go Normal file
View File

@ -0,0 +1,39 @@
package events
const (
eventsBufferSize = 1000
)
// An EventCache buffers events for a Fireable
// All events are cached. Filtering happens on Flush
type EventCache struct {
evsw Fireable
events []eventInfo
}
// Create a new EventCache with an EventSwitch as backend
func NewEventCache(evsw Fireable) *EventCache {
return &EventCache{
evsw: evsw,
events: make([]eventInfo, eventsBufferSize),
}
}
// a cached event
type eventInfo struct {
event string
msg interface{}
}
// Cache an event to be fired upon finality.
func (evc *EventCache) FireEvent(event string, msg interface{}) {
// append to list
evc.events = append(evc.events, eventInfo{event, msg})
}
// Fire events by running evsw.FireEvent on all cached events. Blocks.
func (evc *EventCache) Flush() {
for _, ei := range evc.events {
evc.evsw.FireEvent(ei.event, ei.msg)
}
}

View File

@ -8,7 +8,12 @@ import (
// reactors and other modules should export
// this interface to become eventable
type Eventable interface {
SetEventSwitch(*EventSwitch)
SetFireable(Fireable)
}
// an event switch or cache implements fireable
type Fireable interface {
FireEvent(event string, msg interface{})
}
type EventSwitch struct {

View File

@ -42,7 +42,7 @@ func (mem *Mempool) GetCache() *sm.BlockCache {
func (mem *Mempool) AddTx(tx types.Tx) (err error) {
mem.mtx.Lock()
defer mem.mtx.Unlock()
err = sm.ExecTx(mem.cache, tx, false, false)
err = sm.ExecTx(mem.cache, tx, false, nil)
if err != nil {
log.Debug("AddTx() error", "tx", tx, "error", err)
return err
@ -93,7 +93,7 @@ func (mem *Mempool) ResetForBlockAndState(block *types.Block, state *sm.State) {
// Next, filter all txs that aren't valid given new state.
validTxs := []types.Tx{}
for _, tx := range txs {
err := sm.ExecTx(mem.cache, tx, false, false)
err := sm.ExecTx(mem.cache, tx, false, nil)
if err == nil {
log.Debug("Filter in, valid", "tx", tx)
validTxs = append(validTxs, tx)

View File

@ -24,7 +24,7 @@ type MempoolReactor struct {
Mempool *Mempool
evsw *events.EventSwitch
evsw events.Fireable
}
func NewMempoolReactor(mempool *Mempool) *MempoolReactor {
@ -114,7 +114,7 @@ func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error {
}
// implements events.Eventable
func (memR *MempoolReactor) SetEventSwitch(evsw *events.EventSwitch) {
func (memR *MempoolReactor) SetFireable(evsw events.Fireable) {
memR.evsw = evsw
}

View File

@ -81,7 +81,7 @@ func NewNode() *Node {
// add the event switch to all services
// they should all satisfy events.Eventable
SetEventSwitch(eventSwitch, pexReactor, bcReactor, mempoolReactor, consensusReactor)
SetFireable(eventSwitch, pexReactor, bcReactor, mempoolReactor, consensusReactor)
return &Node{
sw: sw,
@ -115,9 +115,9 @@ func (n *Node) Stop() {
}
// Add the event switch to reactors, mempool, etc.
func SetEventSwitch(evsw *events.EventSwitch, eventables ...events.Eventable) {
func SetFireable(evsw *events.EventSwitch, eventables ...events.Eventable) {
for _, e := range eventables {
e.SetEventSwitch(evsw)
e.SetFireable(evsw)
}
}

View File

@ -33,7 +33,7 @@ type PEXReactor struct {
book *AddrBook
evsw *events.EventSwitch
evsw events.Fireable
}
func NewPEXReactor(book *AddrBook) *PEXReactor {
@ -211,7 +211,7 @@ func (pexR *PEXReactor) ensurePeers() {
}
// implements events.Eventable
func (pexR *PEXReactor) SetEventSwitch(evsw *events.EventSwitch) {
func (pexR *PEXReactor) SetFireable(evsw events.Fireable) {
pexR.evsw = evsw
}

View File

@ -6,6 +6,7 @@ import (
"github.com/tendermint/tendermint/account"
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/events"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/vm"
)
@ -13,7 +14,7 @@ import (
// NOTE: If an error occurs during block execution, state will be left
// at an invalid state. Copy the state before calling ExecBlock!
func ExecBlock(s *State, block *types.Block, blockPartsHeader types.PartSetHeader) error {
err := execBlock(s, block, blockPartsHeader, true)
err := execBlock(s, block, blockPartsHeader)
if err != nil {
return err
}
@ -29,7 +30,7 @@ func ExecBlock(s *State, block *types.Block, blockPartsHeader types.PartSetHeade
// executes transactions of a block, does not check block.StateHash
// NOTE: If an error occurs during block execution, state will be left
// at an invalid state. Copy the state before calling execBlock!
func execBlock(s *State, block *types.Block, blockPartsHeader types.PartSetHeader, fireEvents bool) error {
func execBlock(s *State, block *types.Block, blockPartsHeader types.PartSetHeader) error {
// Basic block validation.
err := block.ValidateBasic(s.LastBlockHeight, s.LastBlockHash, s.LastBlockParts, s.LastBlockTime)
if err != nil {
@ -111,7 +112,7 @@ func execBlock(s *State, block *types.Block, blockPartsHeader types.PartSetHeade
// Commit each tx
for _, tx := range block.Data.Txs {
err := ExecTx(blockCache, tx, true, fireEvents)
err := ExecTx(blockCache, tx, true, s.evc)
if err != nil {
return InvalidTxError{tx, err}
}
@ -291,13 +292,11 @@ func adjustByOutputs(accounts map[string]*account.Account, outs []*types.TxOutpu
// If the tx is invalid, an error will be returned.
// Unlike ExecBlock(), state will not be altered.
func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) error {
func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall bool, evc events.Fireable) error {
// TODO: do something with fees
fees := uint64(0)
_s := blockCache.State() // hack to access validators and event switch.
nilSwitch := _s.evsw == nil
fireEvents = fireEvents && !nilSwitch
// Exec tx
switch tx := tx_.(type) {
@ -328,16 +327,14 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro
blockCache.UpdateAccount(acc)
}
// If we're in a block (not mempool),
// fire event on all inputs and outputs
// see types/events.go for spec
if fireEvents {
// if the evc is nil, nothing will happen
if evc != nil {
for _, i := range tx.Inputs {
_s.evsw.FireEvent(types.EventStringAccInput(i.Address), tx)
evc.FireEvent(types.EventStringAccInput(i.Address), tx)
}
for _, o := range tx.Outputs {
_s.evsw.FireEvent(types.EventStringAccOutput(o.Address), tx)
evc.FireEvent(types.EventStringAccOutput(o.Address), tx)
}
}
return nil
@ -427,7 +424,7 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro
txCache.UpdateAccount(caller) // because we adjusted by input above, and bumped nonce maybe.
txCache.UpdateAccount(callee) // because we adjusted by input above.
vmach := vm.NewVM(txCache, params, caller.Address, account.HashSignBytes(tx))
vmach.SetEventSwitch(_s.evsw)
vmach.SetFireable(_s.evc)
// NOTE: Call() transfers the value from caller to callee iff call succeeds.
ret, err := vmach.Call(caller, callee, code, tx.Data, value, &gas)
@ -451,12 +448,11 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro
// Create a receipt from the ret and whether errored.
log.Info("VM call complete", "caller", caller, "callee", callee, "return", ret, "err", err)
if fireEvents {
// Fire Events for sender and receiver
// a separate event will be fired from vm for each
_s.evsw.FireEvent(types.EventStringAccInput(tx.Input.Address), types.EventMsgCallTx{tx, ret, exception})
_s.evsw.FireEvent(types.EventStringAccReceive(tx.Address), types.EventMsgCallTx{tx, ret, exception})
// Fire Events for sender and receiver
// a separate event will be fired from vm for each additional call
if evc != nil {
evc.FireEvent(types.EventStringAccInput(tx.Input.Address), types.EventMsgCallTx{tx, ret, exception})
evc.FireEvent(types.EventStringAccReceive(tx.Address), types.EventMsgCallTx{tx, ret, exception})
}
} else {
// The mempool does not call txs until
@ -525,8 +521,8 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro
if !added {
panic("Failed to add validator")
}
if fireEvents {
_s.evsw.FireEvent(types.EventStringBond(), tx)
if evc != nil {
evc.FireEvent(types.EventStringBond(), tx)
}
return nil
@ -550,8 +546,8 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro
// Good!
_s.unbondValidator(val)
if fireEvents {
_s.evsw.FireEvent(types.EventStringUnbond(), tx)
if evc != nil {
evc.FireEvent(types.EventStringUnbond(), tx)
}
return nil
@ -575,8 +571,8 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro
// Good!
_s.rebondValidator(val)
if fireEvents {
_s.evsw.FireEvent(types.EventStringRebond(), tx)
if evc != nil {
evc.FireEvent(types.EventStringRebond(), tx)
}
return nil
@ -621,8 +617,8 @@ func ExecTx(blockCache *BlockCache, tx_ types.Tx, runCall, fireEvents bool) erro
// Good! (Bad validator!)
_s.destroyValidator(accused)
if fireEvents {
_s.evsw.FireEvent(types.EventStringDupeout(), tx)
if evc != nil {
evc.FireEvent(types.EventStringDupeout(), tx)
}
return nil

View File

@ -36,7 +36,7 @@ type State struct {
accounts merkle.Tree // Shouldn't be accessed directly.
validatorInfos merkle.Tree // Shouldn't be accessed directly.
evsw *events.EventSwitch
evc events.Fireable // typically an events.EventCache
}
func LoadState(db dbm.DB) *State {
@ -101,7 +101,6 @@ func (s *State) Copy() *State {
UnbondingValidators: s.UnbondingValidators.Copy(), // copy the valSet lazily.
accounts: s.accounts.Copy(),
validatorInfos: s.validatorInfos.Copy(),
evsw: s.evsw,
}
}
@ -119,7 +118,8 @@ func (s *State) Hash() []byte {
// Mutates the block in place and updates it with new state hash.
func (s *State) SetBlockStateHash(block *types.Block) error {
sCopy := s.Copy()
err := execBlock(sCopy, block, types.PartSetHeader{}, false) // don't fire events
// sCopy has no event cache in it, so this won't fire events
err := execBlock(sCopy, block, types.PartSetHeader{})
if err != nil {
return err
}
@ -268,9 +268,9 @@ func (s *State) LoadStorage(hash []byte) (storage merkle.Tree) {
// State.storage
//-------------------------------------
// implements events.Eventable
func (s *State) SetEventSwitch(evsw *events.EventSwitch) {
s.evsw = evsw
// Implements events.Eventable. Typically uses events.EventCache
func (s *State) SetFireable(evc events.Fireable) {
s.evc = evc
}
//-----------------------------------------------------------------------------

View File

@ -49,7 +49,7 @@ type VM struct {
callDepth int
evsw *events.EventSwitch
evc events.Fireable
}
func NewVM(appState AppState, params Params, origin Word256, txid []byte) *VM {
@ -63,8 +63,8 @@ func NewVM(appState AppState, params Params, origin Word256, txid []byte) *VM {
}
// satisfies events.Eventable
func (vm *VM) SetEventSwitch(evsw *events.EventSwitch) {
vm.evsw = evsw
func (vm *VM) SetFireable(evc events.Fireable) {
vm.evc = evc
}
// CONTRACT appState is aware of caller and callee, so we can just mutate them.
@ -93,8 +93,8 @@ func (vm *VM) Call(caller, callee *Account, code, input []byte, value uint64, ga
}
// if callDepth is 0 the event is fired from ExecTx (along with the Input event)
// otherwise, we fire from here.
if vm.callDepth != 0 && vm.evsw != nil {
vm.evsw.FireEvent(types.EventStringAccReceive(callee.Address.Prefix(20)), types.EventMsgCall{
if vm.callDepth != 0 && vm.evc != nil {
vm.evc.FireEvent(types.EventStringAccReceive(callee.Address.Prefix(20)), types.EventMsgCall{
&types.CallData{caller.Address.Prefix(20), callee.Address.Prefix(20), input, value, *gas},
vm.origin.Prefix(20),
vm.txid,