diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 207e5d27..9a68ad88 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -8,7 +8,6 @@ import ( "time" . "github.com/tendermint/go-common" - "github.com/tendermint/go-events" "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/proxy" @@ -52,7 +51,7 @@ type BlockchainReactor struct { timeoutsCh chan string lastBlock *types.Block - evsw *events.EventSwitch + evsw types.EventSwitch } func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConnConsensus, store *BlockStore, fastSync bool) *BlockchainReactor { @@ -268,7 +267,7 @@ func (bcR *BlockchainReactor) BroadcastStatusRequest() error { } // implements events.Eventable -func (bcR *BlockchainReactor) SetEventSwitch(evsw *events.EventSwitch) { +func (bcR *BlockchainReactor) SetEventSwitch(evsw types.EventSwitch) { bcR.evsw = evsw } diff --git a/consensus/common.go b/consensus/common.go index ec1a0fa9..2e4a4dba 100644 --- a/consensus/common.go +++ b/consensus/common.go @@ -1,14 +1,14 @@ package consensus import ( - "github.com/tendermint/go-events" + "github.com/tendermint/tendermint/types" ) // NOTE: this is blocking -func subscribeToEvent(evsw *events.EventSwitch, receiver, eventID string, chanCap int) chan interface{} { +func subscribeToEvent(evsw types.EventSwitch, receiver, eventID string, chanCap int) chan interface{} { // listen for new round ch := make(chan interface{}, chanCap) - evsw.AddListenerForEvent(receiver, eventID, func(data events.EventData) { + types.AddListenerForEvent(evsw, receiver, eventID, func(data types.TMEventData) { ch <- data }) return ch diff --git a/consensus/common_test.go b/consensus/common_test.go index 3e05d256..12eeeb56 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -10,7 +10,6 @@ import ( cfg "github.com/tendermint/go-config" dbm "github.com/tendermint/go-db" - "github.com/tendermint/go-events" bc "github.com/tendermint/tendermint/blockchain" mempl "github.com/tendermint/tendermint/mempool" sm "github.com/tendermint/tendermint/state" @@ -338,7 +337,7 @@ func newConsensusState(state *sm.State, pv *types.PrivValidator, app tmsp.Applic cs := NewConsensusState(config, state, proxyAppConnCon, blockStore, mempool) cs.SetPrivValidator(pv) - evsw := events.NewEventSwitch() + evsw := types.NewEventSwitch() cs.SetEventSwitch(evsw) evsw.Start() return cs diff --git a/consensus/reactor.go b/consensus/reactor.go index 49dc73c2..be29f18e 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -9,7 +9,6 @@ import ( "time" . "github.com/tendermint/go-common" - "github.com/tendermint/go-events" "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" bc "github.com/tendermint/tendermint/blockchain" @@ -34,7 +33,7 @@ type ConsensusReactor struct { blockStore *bc.BlockStore conS *ConsensusState fastSync bool - evsw *events.EventSwitch + evsw types.EventSwitch } func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, fastSync bool) *ConsensusReactor { @@ -225,7 +224,7 @@ func (conR *ConsensusReactor) SetPrivValidator(priv *types.PrivValidator) { } // implements events.Eventable -func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) { +func (conR *ConsensusReactor) SetEventSwitch(evsw types.EventSwitch) { conR.evsw = evsw conR.conS.SetEventSwitch(evsw) } @@ -236,12 +235,12 @@ func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) { // broadcasting the result to peers func (conR *ConsensusReactor) registerEventCallbacks() { - conR.evsw.AddListenerForEvent("conR", types.EventStringNewRoundStep(), func(data events.EventData) { + types.AddListenerForEvent(conR.evsw, "conR", types.EventStringNewRoundStep(), func(data types.TMEventData) { rs := data.(types.EventDataRoundState).RoundState.(*RoundState) conR.broadcastNewRoundStep(rs) }) - conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data events.EventData) { + types.AddListenerForEvent(conR.evsw, "conR", types.EventStringVote(), func(data types.TMEventData) { edv := data.(types.EventDataVote) conR.broadcastHasVoteMessage(edv.Vote, edv.Index) }) diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 654cec9c..db7d7546 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -9,7 +9,6 @@ import ( "time" . "github.com/tendermint/go-common" - "github.com/tendermint/go-events" "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/types" ) @@ -170,7 +169,7 @@ func TestReplayCrashBeforeWritePropose(t *testing.T) { func TestReplayCrashBeforeWritePrevote(t *testing.T) { cs, newBlockCh, voteMsg, f := setupReplayTest(5, false) // prevote - cs.evsw.AddListenerForEvent("tester", types.EventStringCompleteProposal(), func(data events.EventData) { + types.AddListenerForEvent(cs.evsw, "tester", types.EventStringCompleteProposal(), func(data types.TMEventData) { // Set LastSig var err error var msg ConsensusLogMessage @@ -187,7 +186,7 @@ func TestReplayCrashBeforeWritePrevote(t *testing.T) { func TestReplayCrashBeforeWritePrecommit(t *testing.T) { cs, newBlockCh, voteMsg, f := setupReplayTest(7, false) // precommit - cs.evsw.AddListenerForEvent("tester", types.EventStringPolka(), func(data events.EventData) { + types.AddListenerForEvent(cs.evsw, "tester", types.EventStringPolka(), func(data types.TMEventData) { // Set LastSig var err error var msg ConsensusLogMessage diff --git a/consensus/state.go b/consensus/state.go index 51fd8864..b393bc30 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -10,7 +10,6 @@ import ( . "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" - "github.com/tendermint/go-events" "github.com/tendermint/go-wire" bc "github.com/tendermint/tendermint/blockchain" mempl "github.com/tendermint/tendermint/mempool" @@ -231,7 +230,7 @@ type ConsensusState struct { tockChan chan timeoutInfo // timeouts are relayed on tockChan to the receiveRoutine timeoutParams *TimeoutParams // parameters and functions for timeout intervals - evsw *events.EventSwitch + evsw types.EventSwitch wal *WAL replayMode bool // so we don't log signing errors during replay @@ -264,7 +263,7 @@ func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.Ap // Public interface // implements events.Eventable -func (cs *ConsensusState) SetEventSwitch(evsw *events.EventSwitch) { +func (cs *ConsensusState) SetEventSwitch(evsw types.EventSwitch) { cs.evsw = evsw } @@ -545,7 +544,7 @@ func (cs *ConsensusState) newStep() { cs.nSteps += 1 // newStep is called by updateToStep in NewConsensusState before the evsw is set! if cs.evsw != nil { - cs.evsw.FireEvent(types.EventStringNewRoundStep(), rs) + types.FireEventNewRoundStep(cs.evsw, rs) } } @@ -719,13 +718,13 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) { // XXX: should we fire timeout here? cs.enterNewRound(ti.Height, 0) case RoundStepPropose: - cs.evsw.FireEvent(types.EventStringTimeoutPropose(), cs.RoundStateEvent()) + types.FireEventTimeoutPropose(cs.evsw, cs.RoundStateEvent()) cs.enterPrevote(ti.Height, ti.Round) case RoundStepPrevoteWait: - cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent()) + types.FireEventTimeoutWait(cs.evsw, cs.RoundStateEvent()) cs.enterPrecommit(ti.Height, ti.Round) case RoundStepPrecommitWait: - cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent()) + types.FireEventTimeoutWait(cs.evsw, cs.RoundStateEvent()) cs.enterNewRound(ti.Height, ti.Round+1) default: panic(Fmt("Invalid timeout step: %v", ti.Step)) @@ -777,7 +776,7 @@ func (cs *ConsensusState) enterNewRound(height int, round int) { } cs.Votes.SetRound(round + 1) // also track next round (round+1) to allow round-skipping - cs.evsw.FireEvent(types.EventStringNewRound(), cs.RoundStateEvent()) + types.FireEventNewRound(cs.evsw, cs.RoundStateEvent()) // Immediately go to enterPropose. cs.enterPropose(height, round) @@ -942,7 +941,7 @@ func (cs *ConsensusState) enterPrevote(height int, round int) { // fire event for how we got here if cs.isProposalComplete() { - cs.evsw.FireEvent(types.EventStringCompleteProposal(), cs.RoundStateEvent()) + types.FireEventCompleteProposal(cs.evsw, cs.RoundStateEvent()) } else { // we received +2/3 prevotes for a future round // TODO: catchup event? @@ -1047,7 +1046,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) { } // At this point +2/3 prevoted for a particular block or nil - cs.evsw.FireEvent(types.EventStringPolka(), cs.RoundStateEvent()) + types.FireEventPolka(cs.evsw, cs.RoundStateEvent()) // the latest POLRound should be this round if cs.Votes.POLRound() < round { @@ -1063,7 +1062,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) { cs.LockedRound = 0 cs.LockedBlock = nil cs.LockedBlockParts = nil - cs.evsw.FireEvent(types.EventStringUnlock(), cs.RoundStateEvent()) + types.FireEventUnlock(cs.evsw, cs.RoundStateEvent()) } cs.signAddVote(types.VoteTypePrecommit, nil, types.PartSetHeader{}) return @@ -1075,7 +1074,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) { if cs.LockedBlock.HashesTo(hash) { log.Notice("enterPrecommit: +2/3 prevoted locked block. Relocking") cs.LockedRound = round - cs.evsw.FireEvent(types.EventStringRelock(), cs.RoundStateEvent()) + types.FireEventRelock(cs.evsw, cs.RoundStateEvent()) cs.signAddVote(types.VoteTypePrecommit, hash, partsHeader) return } @@ -1090,7 +1089,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) { cs.LockedRound = round cs.LockedBlock = cs.ProposalBlock cs.LockedBlockParts = cs.ProposalBlockParts - cs.evsw.FireEvent(types.EventStringLock(), cs.RoundStateEvent()) + types.FireEventLock(cs.evsw, cs.RoundStateEvent()) cs.signAddVote(types.VoteTypePrecommit, hash, partsHeader) return } @@ -1106,7 +1105,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) { cs.ProposalBlock = nil cs.ProposalBlockParts = types.NewPartSetFromHeader(partsHeader) } - cs.evsw.FireEvent(types.EventStringUnlock(), cs.RoundStateEvent()) + types.FireEventUnlock(cs.evsw, cs.RoundStateEvent()) cs.signAddVote(types.VoteTypePrecommit, nil, types.PartSetHeader{}) return } @@ -1226,14 +1225,14 @@ func (cs *ConsensusState) finalizeCommit(height int) { // Fire off event for new block. // TODO: Handle app failure. See #177 - cs.evsw.FireEvent(types.EventStringNewBlock(), types.EventDataNewBlock{block}) - cs.evsw.FireEvent(types.EventStringNewBlockHeader(), types.EventDataNewBlockHeader{block.Header}) + types.FireEventNewBlock(cs.evsw, types.EventDataNewBlock{block}) + types.FireEventNewBlockHeader(cs.evsw, types.EventDataNewBlockHeader{block.Header}) // Create a copy of the state for staging stateCopy := cs.state.Copy() // event cache for txs - eventCache := events.NewEventCache(cs.evsw) + eventCache := types.NewEventCache(cs.evsw) // Run the block on the State: // + update validator sets @@ -1423,7 +1422,7 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string added, address, err = cs.LastCommit.AddByIndex(valIndex, vote) if added { log.Info(Fmt("Added to lastPrecommits: %v", cs.LastCommit.StringShort())) - cs.evsw.FireEvent(types.EventStringVote(), types.EventDataVote{valIndex, address, vote}) + types.FireEventVote(cs.evsw, types.EventDataVote{valIndex, address, vote}) } return @@ -1434,7 +1433,7 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string height := cs.Height added, address, err = cs.Votes.AddByIndex(valIndex, vote, peerKey) if added { - cs.evsw.FireEvent(types.EventStringVote(), types.EventDataVote{valIndex, address, vote}) + types.FireEventVote(cs.evsw, types.EventDataVote{valIndex, address, vote}) switch vote.Type { case types.VoteTypePrevote: @@ -1452,7 +1451,7 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string cs.LockedRound = 0 cs.LockedBlock = nil cs.LockedBlockParts = nil - cs.evsw.FireEvent(types.EventStringUnlock(), cs.RoundStateEvent()) + types.FireEventUnlock(cs.evsw, cs.RoundStateEvent()) } } if cs.Round <= vote.Round && prevotes.HasTwoThirdsAny() { diff --git a/consensus/state_test.go b/consensus/state_test.go index d890c4e9..a09ab16f 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -6,9 +6,8 @@ import ( "testing" "time" - "github.com/tendermint/tendermint/config/tendermint_test" - //"github.com/tendermint/go-events" . "github.com/tendermint/go-common" + "github.com/tendermint/tendermint/config/tendermint_test" "github.com/tendermint/tendermint/types" ) diff --git a/glide.lock b/glide.lock index 722c82a7..39ead086 100644 --- a/glide.lock +++ b/glide.lock @@ -64,17 +64,17 @@ imports: - name: github.com/tendermint/go-db version: 31fdd21c7eaeed53e0ea7ca597fb1e960e2988a5 - name: github.com/tendermint/go-events - version: 48fa21511b259278b871a37b6951da2d5bef698d + version: 1652dc8b3f7780079aa98c3ce20a83ee90b9758b - name: github.com/tendermint/go-logger version: cefb3a45c0bf3c493a04e9bcd9b1540528be59f2 - name: github.com/tendermint/go-merkle version: 05042c6ab9cad51d12e4cecf717ae68e3b1409a8 - name: github.com/tendermint/go-p2p - version: f508f3f20b5bb36f03d3bc83647b7a92425139d1 + version: 1eb390680d33299ba0e3334490eca587efd18414 subpackages: - upnp - name: github.com/tendermint/go-rpc - version: 479510be0e80dd9e5d6b1f941adad168df0af85f + version: 855255d73eecd25097288be70f3fb208a5817d80 subpackages: - client - server @@ -86,7 +86,7 @@ imports: subpackages: - term - name: github.com/tendermint/tmsp - version: ead192adbbbf85ac581cf775b18ae70d59f86457 + version: 5d3eb0328a615ba55b580ce871033e605aa8b97d subpackages: - client - example/counter diff --git a/mempool/reactor.go b/mempool/reactor.go index 25fe454f..92db544d 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -9,7 +9,6 @@ import ( "github.com/tendermint/go-clist" . "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" - "github.com/tendermint/go-events" "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/types" @@ -28,7 +27,7 @@ type MempoolReactor struct { p2p.BaseReactor config cfg.Config Mempool *Mempool - evsw *events.EventSwitch + evsw types.EventSwitch } func NewMempoolReactor(config cfg.Config, mempool *Mempool) *MempoolReactor { @@ -143,7 +142,7 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer Peer) { } // implements events.Eventable -func (memR *MempoolReactor) SetEventSwitch(evsw *events.EventSwitch) { +func (memR *MempoolReactor) SetEventSwitch(evsw types.EventSwitch) { memR.evsw = evsw } diff --git a/node/node.go b/node/node.go index e8099c7f..8da813ba 100644 --- a/node/node.go +++ b/node/node.go @@ -12,7 +12,6 @@ import ( cfg "github.com/tendermint/go-config" "github.com/tendermint/go-crypto" dbm "github.com/tendermint/go-db" - "github.com/tendermint/go-events" "github.com/tendermint/go-p2p" "github.com/tendermint/go-rpc" "github.com/tendermint/go-rpc/server" @@ -32,7 +31,7 @@ import _ "net/http/pprof" type Node struct { config cfg.Config sw *p2p.Switch - evsw *events.EventSwitch + evsw types.EventSwitch blockStore *bc.BlockStore bcReactor *bc.BlockchainReactor mempoolReactor *mempl.MempoolReactor @@ -80,7 +79,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato privKey := crypto.GenPrivKeyEd25519() // Make event switch - eventSwitch := events.NewEventSwitch() + eventSwitch := types.NewEventSwitch() _, err := eventSwitch.Start() if err != nil { Exit(Fmt("Failed to start switch: %v", err)) @@ -187,7 +186,7 @@ func (n *Node) Stop() { } // Add the event switch to reactors, mempool, etc. -func SetEventSwitch(evsw *events.EventSwitch, eventables ...events.Eventable) { +func SetEventSwitch(evsw types.EventSwitch, eventables ...types.Eventable) { for _, e := range eventables { e.SetEventSwitch(evsw) } @@ -252,7 +251,7 @@ func (n *Node) MempoolReactor() *mempl.MempoolReactor { return n.mempoolReactor } -func (n *Node) EventSwitch() *events.EventSwitch { +func (n *Node) EventSwitch() types.EventSwitch { return n.evsw } @@ -401,7 +400,7 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState { config.Set("chain_id", state.ChainID) // Make event switch - eventSwitch := events.NewEventSwitch() + eventSwitch := types.NewEventSwitch() _, err := eventSwitch.Start() if err != nil { Exit(Fmt("Failed to start event switch: %v", err)) diff --git a/rpc/core/events.go b/rpc/core/events.go index ab6fd35e..7dc3c7c3 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -1,7 +1,6 @@ package core import ( - "github.com/tendermint/go-events" "github.com/tendermint/go-rpc/types" ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" @@ -9,10 +8,10 @@ import ( func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscribe, error) { log.Notice("Subscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event) - wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg events.EventData) { + types.AddListenerForEvent(wsCtx.GetEventSwitch(), wsCtx.GetRemoteAddr(), event, func(msg types.TMEventData) { // NOTE: EventSwitch callbacks must be nonblocking // NOTE: RPCResponses of subscribed events have id suffix "#event" - tmResult := ctypes.TMResult(&ctypes.ResultEvent{event, types.TMEventData(msg)}) + tmResult := ctypes.TMResult(&ctypes.ResultEvent{event, msg}) wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &tmResult, "")) }) return &ctypes.ResultSubscribe{}, nil diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index bef85c79..8705f867 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -4,7 +4,6 @@ import ( "fmt" "time" - "github.com/tendermint/go-events" ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" tmsp "github.com/tendermint/tmsp/types" @@ -52,9 +51,9 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { // subscribe to tx being committed in block - appendTxResCh := make(chan *tmsp.Response, 1) - eventSwitch.AddListenerForEvent("rpc", types.EventStringTx(tx), func(data events.EventData) { - appendTxResCh <- data.(*tmsp.Response) + appendTxResCh := make(chan types.EventDataTx, 1) + types.AddListenerForEvent(eventSwitch, "rpc", types.EventStringTx(tx), func(data types.TMEventData) { + appendTxResCh <- data.(types.EventDataTx) }) // broadcast the tx and register checktx callback @@ -84,11 +83,10 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { // The tx was included in a block. // NOTE we don't return an error regardless of the AppendTx code; // clients must check this to see if they need to send a new tx! - r := appendTxRes.GetAppendTx() return &ctypes.ResultBroadcastTx{ - Code: r.Code, - Data: r.Data, - Log: r.Log, + Code: appendTxRes.Code, + Data: appendTxRes.Result, + Log: appendTxRes.Log, }, nil case <-timer.C: r := checkTxR diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index 90febf0b..8839c4f7 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -4,7 +4,6 @@ import ( cfg "github.com/tendermint/go-config" "github.com/tendermint/go-p2p" - "github.com/tendermint/go-events" bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/consensus" mempl "github.com/tendermint/tendermint/mempool" @@ -12,7 +11,7 @@ import ( "github.com/tendermint/tendermint/types" ) -var eventSwitch *events.EventSwitch +var eventSwitch types.EventSwitch var blockStore *bc.BlockStore var consensusState *consensus.ConsensusState var consensusReactor *consensus.ConsensusReactor @@ -28,7 +27,7 @@ func SetConfig(c cfg.Config) { config = c } -func SetEventSwitch(evsw *events.EventSwitch) { +func SetEventSwitch(evsw types.EventSwitch) { eventSwitch = evsw } diff --git a/rpc/test/client_test.go b/rpc/test/client_test.go index 345049d5..ddc84ef0 100644 --- a/rpc/test/client_test.go +++ b/rpc/test/client_test.go @@ -257,6 +257,40 @@ func TestWSBlockchainGrowth(t *testing.T) { } } +func TestWSTxEvent(t *testing.T) { + wsc := newWSClient(t) + tx := randBytes() + + // listen for the tx I am about to submit + eid := types.EventStringTx(types.Tx(tx)) + subscribe(t, wsc, eid) + defer func() { + unsubscribe(t, wsc, eid) + wsc.Stop() + }() + + // send an tx + tmResult := new(ctypes.TMResult) + _, err := clientJSON.Call("broadcast_tx_sync", []interface{}{tx}, tmResult) + if err != nil { + t.Fatal("Error submitting event") + } + + waitForEvent(t, wsc, eid, true, func() {}, func(eid string, b interface{}) error { + evt, ok := b.(types.EventDataTx) + if !ok { + t.Fatal("Got wrong event type", b) + } + if bytes.Compare([]byte(evt.Tx), tx) != 0 { + t.Error("Event returned different tx") + } + if evt.Code != tmsp.CodeType_OK { + t.Error("Event returned tx error code", evt.Code) + } + return nil + }) +} + /* TODO: this with dummy app.. func TestWSDoubleFire(t *testing.T) { if testing.Short() { diff --git a/scripts/glide/status.sh b/scripts/glide/status.sh index 374fe38f..46ca5279 100644 --- a/scripts/glide/status.sh +++ b/scripts/glide/status.sh @@ -31,6 +31,11 @@ for lib in "${LIBS[@]}"; do echo "Vendored: $VENDORED" echo "Master: $MASTER" fi + elif [[ "$VENDORED" != "$HEAD" ]]; then + echo "" + echo "Vendored version of $lib matches origin/master but differs from HEAD" + echo "Vendored: $VENDORED" + echo "Head: $HEAD" fi done diff --git a/scripts/glide/update.sh b/scripts/glide/update.sh index 64dd8cda..59d4de33 100644 --- a/scripts/glide/update.sh +++ b/scripts/glide/update.sh @@ -16,4 +16,4 @@ cd $GOPATH/src/github.com/tendermint/$LIB NEW_COMMIT=$(git rev-parse HEAD) cd $PWD -sed -i "s/$OLD_COMMIT/$NEW_COMMIT/g" $GLIDE +sed -i "" "s/$OLD_COMMIT/$NEW_COMMIT/g" $GLIDE diff --git a/state/execution.go b/state/execution.go index 14ea965c..804cecfc 100644 --- a/state/execution.go +++ b/state/execution.go @@ -5,7 +5,6 @@ import ( "fmt" . "github.com/tendermint/go-common" - "github.com/tendermint/go-events" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" tmsp "github.com/tendermint/tmsp/types" @@ -18,7 +17,7 @@ func (s *State) ValidateBlock(block *types.Block) error { // Execute the block to mutate State. // Validates block and then executes Data.Txs in the block. -func (s *State) ExecBlock(eventCache events.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, blockPartsHeader types.PartSetHeader) error { +func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, blockPartsHeader types.PartSetHeader) error { // Validate the block. err := s.validateBlock(block) @@ -55,7 +54,7 @@ func (s *State) ExecBlock(eventCache events.Fireable, proxyAppConn proxy.AppConn // Executes block's transactions on proxyAppConn. // TODO: Generate a bitmap or otherwise store tx validity in state. -func (s *State) execBlockOnProxyApp(eventCache events.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) error { +func (s *State) execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) error { var validTxs, invalidTxs = 0, 0 @@ -67,15 +66,25 @@ func (s *State) execBlockOnProxyApp(eventCache events.Fireable, proxyAppConn pro // TODO: make use of this info // Blocks may include invalid txs. // reqAppendTx := req.(tmsp.RequestAppendTx) - if r.AppendTx.Code == tmsp.CodeType_OK { + txError := "" + apTx := r.AppendTx + if apTx.Code == tmsp.CodeType_OK { validTxs += 1 } else { log.Debug("Invalid tx", "code", r.AppendTx.Code, "log", r.AppendTx.Log) invalidTxs += 1 + txError = apTx.Code.String() } // NOTE: if we count we can access the tx from the block instead of // pulling it from the req - eventCache.FireEvent(types.EventStringTx(req.GetAppendTx().Tx), res) + event := types.EventDataTx{ + Tx: req.GetAppendTx().Tx, + Result: apTx.Data, + Code: apTx.Code, + Log: apTx.Log, + Error: txError, + } + types.FireEventTx(eventCache, event) } } proxyAppConn.SetResponseCallback(proxyCb) diff --git a/types/events.go b/types/events.go index 68313ff2..4fe4d407 100644 --- a/types/events.go +++ b/types/events.go @@ -5,6 +5,7 @@ import ( . "github.com/tendermint/go-common" "github.com/tendermint/go-events" "github.com/tendermint/go-wire" + tmsp "github.com/tendermint/tmsp/types" ) // Functions to generate eventId strings @@ -35,7 +36,7 @@ func EventStringVote() string { return "Vote" } // implements events.EventData type TMEventData interface { events.EventData - // AssertIsTMEventData() + AssertIsTMEventData() } const ( @@ -72,10 +73,11 @@ type EventDataNewBlockHeader struct { // All txs fire EventDataTx type EventDataTx struct { - Tx Tx `json:"tx"` - Result []byte `json:"result"` - Log string `json:"log"` - Error string `json:"error"` + Tx Tx `json:"tx"` + Result []byte `json:"result"` + Log string `json:"log"` + Code tmsp.CodeType `json:"code"` + Error string `json:"error"` } // NOTE: This goes into the replay WAL @@ -99,3 +101,99 @@ func (_ EventDataNewBlockHeader) AssertIsTMEventData() {} func (_ EventDataTx) AssertIsTMEventData() {} func (_ EventDataRoundState) AssertIsTMEventData() {} func (_ EventDataVote) AssertIsTMEventData() {} + +//---------------------------------------- +// Wrappers for type safety + +type Fireable interface { + events.Fireable +} + +type Eventable interface { + SetEventSwitch(EventSwitch) +} + +type EventSwitch interface { + events.EventSwitch +} + +type EventCache interface { + Fireable + Flush() +} + +func NewEventSwitch() EventSwitch { + return events.NewEventSwitch() +} + +func NewEventCache(evsw EventSwitch) EventCache { + return events.NewEventCache(evsw) +} + +// All events should be based on this FireEvent to ensure they are TMEventData +func fireEvent(fireable events.Fireable, event string, data TMEventData) { + fireable.FireEvent(event, data) +} + +func AddListenerForEvent(evsw EventSwitch, id, event string, cb func(data TMEventData)) { + evsw.AddListenerForEvent(id, event, func(data events.EventData) { + cb(data.(TMEventData)) + }) + +} + +//--- block, tx, and vote events + +func FireEventNewBlock(fireable events.Fireable, block EventDataNewBlock) { + fireEvent(fireable, EventStringNewBlock(), block) +} + +func FireEventNewBlockHeader(fireable events.Fireable, header EventDataNewBlockHeader) { + fireEvent(fireable, EventStringNewBlockHeader(), header) +} + +func FireEventVote(fireable events.Fireable, vote EventDataVote) { + fireEvent(fireable, EventStringVote(), vote) +} + +func FireEventTx(fireable events.Fireable, tx EventDataTx) { + fireEvent(fireable, EventStringTx(tx.Tx), tx) +} + +//--- EventDataRoundState events + +func FireEventNewRoundStep(fireable events.Fireable, rs EventDataRoundState) { + fireEvent(fireable, EventStringNewRoundStep(), rs) +} + +func FireEventTimeoutPropose(fireable events.Fireable, rs EventDataRoundState) { + fireEvent(fireable, EventStringTimeoutPropose(), rs) +} + +func FireEventTimeoutWait(fireable events.Fireable, rs EventDataRoundState) { + fireEvent(fireable, EventStringTimeoutWait(), rs) +} + +func FireEventNewRound(fireable events.Fireable, rs EventDataRoundState) { + fireEvent(fireable, EventStringNewRound(), rs) +} + +func FireEventCompleteProposal(fireable events.Fireable, rs EventDataRoundState) { + fireEvent(fireable, EventStringCompleteProposal(), rs) +} + +func FireEventPolka(fireable events.Fireable, rs EventDataRoundState) { + fireEvent(fireable, EventStringPolka(), rs) +} + +func FireEventUnlock(fireable events.Fireable, rs EventDataRoundState) { + fireEvent(fireable, EventStringUnlock(), rs) +} + +func FireEventRelock(fireable events.Fireable, rs EventDataRoundState) { + fireEvent(fireable, EventStringRelock(), rs) +} + +func FireEventLock(fireable events.Fireable, rs EventDataRoundState) { + fireEvent(fireable, EventStringLock(), rs) +}