From e4492afbad1af28c4ac33298681d368d66d37d66 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Thu, 5 Apr 2018 08:17:10 -0700 Subject: [PATCH] Merge --- Gopkg.lock | 7 +-- benchmarks/codec_test.go | 15 ++++-- blockchain/store.go | 2 +- cmd/tendermint/commands/show_validator.go | 8 ++-- cmd/tendermint/commands/wire.go | 12 +++++ consensus/common_test.go | 2 +- consensus/mempool_test.go | 2 +- consensus/reactor.go | 6 +-- consensus/reactor_test.go | 6 +-- consensus/state.go | 2 +- consensus/state_test.go | 28 ++++++------ consensus/wal_generator.go | 1 + consensus/wal_test.go | 2 +- lite/proxy/wrapper.go | 2 +- node/node.go | 14 +++--- rpc/client/event_test.go | 8 ++-- rpc/client/helpers.go | 4 +- rpc/core/events.go | 2 +- rpc/core/mempool.go | 2 +- rpc/test/helpers.go | 4 +- state/txindex/indexer_service.go | 2 +- types/event_bus.go | 28 ++++++------ types/event_bus_test.go | 2 +- types/events.go | 56 ++++------------------- 24 files changed, 97 insertions(+), 120 deletions(-) create mode 100644 cmd/tendermint/commands/wire.go diff --git a/Gopkg.lock b/Gopkg.lock index 19a15cc5..c0704d72 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -265,10 +265,7 @@ [[projects]] name = "github.com/tendermint/go-wire" - packages = [ - ".", - "data" - ] + packages = ["."] revision = "fa721242b042ecd4c6ed1a934ee740db4f74e45c" version = "v0.7.3" @@ -386,6 +383,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "8cd32c0a5faec4d8cbca3d9ea5d97b0e4e90bfc2a9f0e6b71a4846cb6030be94" + inputs-digest = "0dacd2eb1550ca01e0c64f77b721eda1a381dde1d246a56bfe5a2746b78b7bad" solver-name = "gps-cdcl" solver-version = 1 diff --git a/benchmarks/codec_test.go b/benchmarks/codec_test.go index 8ac62a24..b3441799 100644 --- a/benchmarks/codec_test.go +++ b/benchmarks/codec_test.go @@ -5,7 +5,6 @@ import ( "time" "github.com/tendermint/go-crypto" - "github.com/tendermint/go-wire" proto "github.com/tendermint/tendermint/benchmarks/proto" "github.com/tendermint/tendermint/p2p" @@ -33,7 +32,10 @@ func BenchmarkEncodeStatusWire(b *testing.B) { counter := 0 for i := 0; i < b.N; i++ { - jsonBytes := wire.JSONBytes(status) + jsonBytes, err := cdc.MarshalJSON(status) + if err != nil { + panic(err) + } counter += len(jsonBytes) } @@ -54,7 +56,10 @@ func BenchmarkEncodeNodeInfoWire(b *testing.B) { counter := 0 for i := 0; i < b.N; i++ { - jsonBytes := wire.JSONBytes(nodeInfo) + jsonBytes, err := cdc.MarshalJSON(nodeInfo) + if err != nil { + panic(err) + } counter += len(jsonBytes) } } @@ -74,7 +79,7 @@ func BenchmarkEncodeNodeInfoBinary(b *testing.B) { counter := 0 for i := 0; i < b.N; i++ { - jsonBytes := wire.BinaryBytes(nodeInfo) + jsonBytes := cdc.MustMarshalBinaryBare(nodeInfo) counter += len(jsonBytes) } @@ -82,7 +87,7 @@ func BenchmarkEncodeNodeInfoBinary(b *testing.B) { func BenchmarkEncodeNodeInfoProto(b *testing.B) { b.StopTimer() - pubKey := crypto.GenPrivKeyEd25519().PubKey().Unwrap().(crypto.PubKeyEd25519) + pubKey := crypto.GenPrivKeyEd25519().PubKey().(crypto.PubKeyEd25519) pubKey2 := &proto.PubKey{Ed25519: &proto.PubKeyEd25519{Bytes: pubKey[:]}} nodeInfo := proto.NodeInfo{ PubKey: pubKey2, diff --git a/blockchain/store.go b/blockchain/store.go index 8210f143..e322e414 100644 --- a/blockchain/store.go +++ b/blockchain/store.go @@ -69,7 +69,7 @@ func (bs *BlockStore) LoadBlock(height int64) *types.Block { part := bs.LoadBlockPart(height, i) buf = append(buf, part.Bytes...) } - err = cdc.UnmarshalBinaryBare(buf, block) + err = cdc.UnmarshalBinary(buf, block) if err != nil { // NOTE: The existence of meta should imply the existence of the // block. So, make sure meta is only saved after blocks are saved. diff --git a/cmd/tendermint/commands/show_validator.go b/cmd/tendermint/commands/show_validator.go index 458f11c2..6ead4bad 100644 --- a/cmd/tendermint/commands/show_validator.go +++ b/cmd/tendermint/commands/show_validator.go @@ -2,11 +2,9 @@ package commands import ( "fmt" - "github.com/spf13/cobra" - "github.com/tendermint/go-wire/data" - "github.com/tendermint/tendermint/types" + privval "github.com/tendermint/tendermint/types/priv_validator" ) // ShowValidatorCmd adds capabilities for showing the validator info. @@ -17,7 +15,7 @@ var ShowValidatorCmd = &cobra.Command{ } func showValidator(cmd *cobra.Command, args []string) { - privValidator := types.LoadOrGenPrivValidatorFS(config.PrivValidatorFile()) - pubKeyJSONBytes, _ := data.ToJSON(privValidator.PubKey) + privValidator := privval.LoadOrGenFilePV(config.PrivValidatorFile()) + pubKeyJSONBytes, _ := cdc.MarshalJSON(privValidator.GetPubKey()) fmt.Println(string(pubKeyJSONBytes)) } diff --git a/cmd/tendermint/commands/wire.go b/cmd/tendermint/commands/wire.go new file mode 100644 index 00000000..4c133a8c --- /dev/null +++ b/cmd/tendermint/commands/wire.go @@ -0,0 +1,12 @@ +package commands + +import ( + "github.com/tendermint/go-amino" + "github.com/tendermint/go-crypto" +) + +var cdc = amino.NewCodec() + +func init() { + crypto.RegisterAmino(cdc) +} diff --git a/consensus/common_test.go b/consensus/common_test.go index c8de4001..fe95210a 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -223,7 +223,7 @@ func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} { voteCh := make(chan interface{}) go func() { for v := range voteCh0 { - vote := v.(types.TMEventData).Unwrap().(types.EventDataVote) + vote := v.(types.EventDataVote) // we only fire for our own votes if bytes.Equal(addr, vote.Vote.ValidatorAddress) { voteCh <- v diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index d1714a74..823f9865 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -108,7 +108,7 @@ func TestMempoolTxConcurrentWithCommit(t *testing.T) { ticker := time.NewTicker(time.Second * 30) select { case b := <-newBlockCh: - evt := b.(types.TMEventData).Unwrap().(types.EventDataNewBlock) + evt := b.(types.EventDataNewBlock) nTxs += int(evt.Block.Header.NumTxs) case <-ticker.C: panic("Timed out waiting to commit blocks with transactions") diff --git a/consensus/reactor.go b/consensus/reactor.go index 3ab8995d..85943c21 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -372,17 +372,17 @@ func (conR *ConsensusReactor) startBroadcastRoutine() error { select { case data, ok := <-stepsCh: if ok { // a receive from a closed channel returns the zero value immediately - edrs := data.(types.TMEventData).Unwrap().(types.EventDataRoundState) + edrs := data.(types.EventDataRoundState) conR.broadcastNewRoundStep(edrs.RoundState.(*cstypes.RoundState)) } case data, ok := <-votesCh: if ok { - edv := data.(types.TMEventData).Unwrap().(types.EventDataVote) + edv := data.(types.EventDataVote) conR.broadcastHasVoteMessage(edv.Vote) } case data, ok := <-heartbeatsCh: if ok { - edph := data.(types.TMEventData).Unwrap().(types.EventDataProposalHeartbeat) + edph := data.(types.EventDataProposalHeartbeat) conR.broadcastProposalHeartbeatMessage(edph) } case <-conR.Quit(): diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index 8e96de2b..f6f94137 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -301,7 +301,7 @@ func waitForAndValidateBlock(t *testing.T, n int, activeVals map[string]struct{} if !ok { return } - newBlock := newBlockI.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block + newBlock := newBlockI.(types.EventDataNewBlock).Block css[j].Logger.Debug("waitForAndValidateBlock: Got block", "height", newBlock.Height) err := validateBlock(newBlock, activeVals) assert.Nil(t, err) @@ -322,7 +322,7 @@ func waitForAndValidateBlockWithTx(t *testing.T, n int, activeVals map[string]st if !ok { return } - newBlock := newBlockI.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block + newBlock := newBlockI.(types.EventDataNewBlock).Block css[j].Logger.Debug("waitForAndValidateBlockWithTx: Got block", "height", newBlock.Height) err := validateBlock(newBlock, activeVals) assert.Nil(t, err) @@ -354,7 +354,7 @@ func waitForBlockWithUpdatedValsAndValidateIt(t *testing.T, n int, updatedVals m if !ok { return } - newBlock = newBlockI.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block + newBlock = newBlockI.(types.EventDataNewBlock).Block if newBlock.LastCommit.Size() == len(updatedVals) { css[j].Logger.Debug("waitForBlockWithUpdatedValsAndValidateIt: Got block", "height", newBlock.Height) break LOOP diff --git a/consensus/state.go b/consensus/state.go index 42de5fec..0f086172 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -778,7 +778,7 @@ func (cs *ConsensusState) enterPropose(height int64, round int) { // if not a validator, we're done if !cs.Validators.HasAddress(cs.privValidator.GetAddress()) { - cs.Logger.Debug("This node is not a validator") + cs.Logger.Debug("This node is not a validator 2", cs.privValidator.GetAddress(), cs.Validators) return } cs.Logger.Debug("This node is a validator") diff --git a/consensus/state_test.go b/consensus/state_test.go index 6efc2120..0d7cad48 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -261,7 +261,7 @@ func TestStateFullRound1(t *testing.T) { // grab proposal re := <-propCh - propBlockHash := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*cstypes.RoundState).ProposalBlock.Hash() + propBlockHash := re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState).ProposalBlock.Hash() <-voteCh // wait for prevote validatePrevote(t, cs, round, vss[0], propBlockHash) @@ -356,7 +356,7 @@ func TestStateLockNoPOL(t *testing.T) { cs1.startRoutines(0) re := <-proposalCh - rs := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*cstypes.RoundState) + rs := re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState) theBlockHash := rs.ProposalBlock.Hash() <-voteCh // prevote @@ -396,7 +396,7 @@ func TestStateLockNoPOL(t *testing.T) { // now we're on a new round and not the proposer, so wait for timeout re = <-timeoutProposeCh - rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*cstypes.RoundState) + rs = re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState) if rs.ProposalBlock != nil { panic("Expected proposal block to be nil") @@ -440,7 +440,7 @@ func TestStateLockNoPOL(t *testing.T) { incrementRound(vs2) re = <-proposalCh - rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*cstypes.RoundState) + rs = re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState) // now we're on a new round and are the proposer if !bytes.Equal(rs.ProposalBlock.Hash(), rs.LockedBlock.Hash()) { @@ -529,7 +529,7 @@ func TestStateLockPOLRelock(t *testing.T) { <-newRoundCh re := <-proposalCh - rs := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*cstypes.RoundState) + rs := re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState) theBlockHash := rs.ProposalBlock.Hash() <-voteCh // prevote @@ -605,9 +605,9 @@ func TestStateLockPOLRelock(t *testing.T) { discardFromChan(voteCh, 2) be := <-newBlockCh - b := be.(types.TMEventData).Unwrap().(types.EventDataNewBlockHeader) + b := be.(types.EventDataNewBlockHeader) re = <-newRoundCh - rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*cstypes.RoundState) + rs = re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState) if rs.Height != 2 { panic("Expected height to increment") } @@ -643,7 +643,7 @@ func TestStateLockPOLUnlock(t *testing.T) { startTestRound(cs1, cs1.Height, 0) <-newRoundCh re := <-proposalCh - rs := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*cstypes.RoundState) + rs := re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState) theBlockHash := rs.ProposalBlock.Hash() <-voteCh // prevote @@ -669,7 +669,7 @@ func TestStateLockPOLUnlock(t *testing.T) { // timeout to new round re = <-timeoutWaitCh - rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*cstypes.RoundState) + rs = re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState) lockedBlockHash := rs.LockedBlock.Hash() //XXX: this isnt guaranteed to get there before the timeoutPropose ... @@ -731,7 +731,7 @@ func TestStateLockPOLSafety1(t *testing.T) { startTestRound(cs1, cs1.Height, 0) <-newRoundCh re := <-proposalCh - rs := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*cstypes.RoundState) + rs := re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState) propBlock := rs.ProposalBlock <-voteCh // prevote @@ -781,7 +781,7 @@ func TestStateLockPOLSafety1(t *testing.T) { re = <-proposalCh } - rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*cstypes.RoundState) + rs = re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState) if rs.LockedBlock != nil { panic("we should not be locked!") @@ -1033,7 +1033,7 @@ func TestStateHalt1(t *testing.T) { startTestRound(cs1, cs1.Height, 0) <-newRoundCh re := <-proposalCh - rs := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*cstypes.RoundState) + rs := re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState) propBlock := rs.ProposalBlock propBlockParts := propBlock.MakePartSet(partSize) @@ -1056,7 +1056,7 @@ func TestStateHalt1(t *testing.T) { // timeout to new round <-timeoutWaitCh re = <-newRoundCh - rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*cstypes.RoundState) + rs = re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState) t.Log("### ONTO ROUND 1") /*Round2 @@ -1074,7 +1074,7 @@ func TestStateHalt1(t *testing.T) { // receiving that precommit should take us straight to commit <-newBlockCh re = <-newRoundCh - rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*cstypes.RoundState) + rs = re.(types.EventDataRoundState).RoundState.(*cstypes.RoundState) if rs.Height != 2 { panic("expected height to increment") diff --git a/consensus/wal_generator.go b/consensus/wal_generator.go index bb0b5296..65de399d 100644 --- a/consensus/wal_generator.go +++ b/consensus/wal_generator.go @@ -72,6 +72,7 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) { consensusState := NewConsensusState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool) consensusState.SetLogger(logger) consensusState.SetEventBus(eventBus) + fmt.Println(">>privval", privValidator) if privValidator != nil { consensusState.SetPrivValidator(privValidator) } diff --git a/consensus/wal_test.go b/consensus/wal_test.go index 9925b691..45af36e1 100644 --- a/consensus/wal_test.go +++ b/consensus/wal_test.go @@ -35,7 +35,7 @@ func TestWALEncoderDecoder(t *testing.T) { decoded, err := dec.Decode() require.NoError(t, err) - assert.True(t, msg.Time.Equal(decoded.Time)) + assert.Equal(t, msg.Time.UTC(), decoded.Time) assert.Equal(t, msg.Msg, decoded.Msg) } } diff --git a/lite/proxy/wrapper.go b/lite/proxy/wrapper.go index e8aa011e..5fb12a40 100644 --- a/lite/proxy/wrapper.go +++ b/lite/proxy/wrapper.go @@ -146,7 +146,7 @@ func (w Wrapper) Commit(height *int64) (*ctypes.ResultCommit, error) { // } // // check to validate it if possible, and drop if not valid -// switch t := tm.Unwrap().(type) { +// switch t := tm.(type) { // case types.EventDataNewBlockHeader: // err := verifyHeader(s.client, t.Header) // if err != nil { diff --git a/node/node.go b/node/node.go index dffdb83e..be744f84 100644 --- a/node/node.go +++ b/node/node.go @@ -10,8 +10,8 @@ import ( "strings" abci "github.com/tendermint/abci/types" + amino "github.com/tendermint/go-amino" crypto "github.com/tendermint/go-crypto" - wire "github.com/tendermint/go-wire" cmn "github.com/tendermint/tmlibs/common" dbm "github.com/tendermint/tmlibs/db" "github.com/tendermint/tmlibs/log" @@ -34,7 +34,7 @@ import ( "github.com/tendermint/tendermint/state/txindex/kv" "github.com/tendermint/tendermint/state/txindex/null" "github.com/tendermint/tendermint/types" - priv_val "github.com/tendermint/tendermint/types/priv_validator" + privval "github.com/tendermint/tendermint/types/priv_validator" "github.com/tendermint/tendermint/version" _ "net/http/pprof" @@ -79,7 +79,7 @@ type NodeProvider func(*cfg.Config, log.Logger) (*Node, error) // It implements NodeProvider. func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) { return NewNode(config, - types.LoadOrGenPrivValidatorFS(config.PrivValidatorFile()), + privval.LoadOrGenFilePV(config.PrivValidatorFile()), proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()), DefaultGenesisDocProviderFunc(config), DefaultDBProvider, @@ -180,8 +180,8 @@ func NewNode(config *cfg.Config, // TODO: persist this key so external signer // can actually authenticate us privKey = crypto.GenPrivKeyEd25519() - pvsc = priv_val.NewSocketClient( - logger.With("module", "priv_val"), + pvsc = privval.NewSocketPV( + logger.With("module", "privval"), config.PrivValidatorListenAddr, privKey, ) @@ -445,7 +445,7 @@ func (n *Node) OnStop() { n.eventBus.Stop() n.indexerService.Stop() - if pvsc, ok := n.privValidator.(*priv_val.SocketClient); ok { + if pvsc, ok := n.privValidator.(*privval.SocketPV); ok { if err := pvsc.Stop(); err != nil { n.Logger.Error("Error stopping priv validator socket client", "err", err) } @@ -591,7 +591,7 @@ func (n *Node) makeNodeInfo(pubKey crypto.PubKey) p2p.NodeInfo { }, Moniker: n.config.Moniker, Other: []string{ - cmn.Fmt("wire_version=%v", wire.Version), + cmn.Fmt("amino_version=%v", amino.Version), cmn.Fmt("p2p_version=%v", p2p.Version), cmn.Fmt("consensus_version=%v", cs.Version), cmn.Fmt("rpc_version=%v/%v", rpc.Version, rpccore.Version), diff --git a/rpc/client/event_test.go b/rpc/client/event_test.go index 40a42c18..e90ace43 100644 --- a/rpc/client/event_test.go +++ b/rpc/client/event_test.go @@ -36,7 +36,7 @@ func TestHeaderEvents(t *testing.T) { evtTyp := types.EventNewBlockHeader evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout) require.Nil(err, "%d: %+v", i, err) - _, ok := evt.Unwrap().(types.EventDataNewBlockHeader) + _, ok := evt.(types.EventDataNewBlockHeader) require.True(ok, "%d: %#v", i, evt) // TODO: more checks... } @@ -59,7 +59,7 @@ func TestBlockEvents(t *testing.T) { evtTyp := types.EventNewBlock evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout) require.Nil(err, "%d: %+v", j, err) - blockEvent, ok := evt.Unwrap().(types.EventDataNewBlock) + blockEvent, ok := evt.(types.EventDataNewBlock) require.True(ok, "%d: %#v", j, evt) block := blockEvent.Block @@ -97,7 +97,7 @@ func TestTxEventsSentWithBroadcastTxAsync(t *testing.T) { evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout) require.Nil(err, "%d: %+v", i, err) // and make sure it has the proper info - txe, ok := evt.Unwrap().(types.EventDataTx) + txe, ok := evt.(types.EventDataTx) require.True(ok, "%d: %#v", i, evt) // make sure this is the proper tx require.EqualValues(tx, txe.Tx) @@ -129,7 +129,7 @@ func TestTxEventsSentWithBroadcastTxSync(t *testing.T) { evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout) require.Nil(err, "%d: %+v", i, err) // and make sure it has the proper info - txe, ok := evt.Unwrap().(types.EventDataTx) + txe, ok := evt.(types.EventDataTx) require.True(ok, "%d: %#v", i, evt) // make sure this is the proper tx require.EqualValues(tx, txe.Tx) diff --git a/rpc/client/helpers.go b/rpc/client/helpers.go index e7a84b6b..86c65919 100644 --- a/rpc/client/helpers.go +++ b/rpc/client/helpers.go @@ -65,7 +65,7 @@ func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (type query := types.QueryForEvent(evtTyp) err := c.Subscribe(ctx, subscriber, query, evts) if err != nil { - return types.TMEventData{}, errors.Wrap(err, "failed to subscribe") + return nil, errors.Wrap(err, "failed to subscribe") } // make sure to unregister after the test is over @@ -75,6 +75,6 @@ func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (type case evt := <-evts: return evt.(types.TMEventData), nil case <-ctx.Done(): - return types.TMEventData{}, errors.New("timed out waiting for event") + return nil, errors.New("timed out waiting for event") } } diff --git a/rpc/core/events.go b/rpc/core/events.go index 9353ace6..beeaf4c9 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -61,7 +61,7 @@ import ( // // go func() { // for e := range txs { -// fmt.Println("got ", e.(types.TMEventData).Unwrap().(types.EventDataTx)) +// fmt.Println("got ", e.(types.EventDataTx)) // } // }() // ``` diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 1dbdd801..77c8c844 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -189,7 +189,7 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { timer := time.NewTimer(60 * 2 * time.Second) select { case deliverTxResMsg := <-deliverTxResCh: - deliverTxRes := deliverTxResMsg.(types.TMEventData).Unwrap().(types.EventDataTx) + deliverTxRes := deliverTxResMsg.(types.EventDataTx) // The tx was included in a block. deliverTxR := deliverTxRes.Result logger.Info("DeliverTx passed ", "tx", cmn.HexBytes(tx), "response", deliverTxR) diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index b67d7634..c9dc6d21 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -18,7 +18,7 @@ import ( ctypes "github.com/tendermint/tendermint/rpc/core/types" core_grpc "github.com/tendermint/tendermint/rpc/grpc" rpcclient "github.com/tendermint/tendermint/rpc/lib/client" - "github.com/tendermint/tendermint/types" + privval "github.com/tendermint/tendermint/types/priv_validator" ) var globalConfig *cfg.Config @@ -113,7 +113,7 @@ func NewTendermint(app abci.Application) *nm.Node { logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout)) logger = log.NewFilter(logger, log.AllowError()) privValidatorFile := config.PrivValidatorFile() - privValidator := types.LoadOrGenPrivValidatorFS(privValidatorFile) + privValidator := privval.LoadOrGenFilePV(privValidatorFile) papp := proxy.NewLocalClientCreator(app) node, err := nm.NewNode(config, privValidator, papp, nm.DefaultGenesisDocProviderFunc(config), diff --git a/state/txindex/indexer_service.go b/state/txindex/indexer_service.go index 3e5fab12..f5420f63 100644 --- a/state/txindex/indexer_service.go +++ b/state/txindex/indexer_service.go @@ -34,7 +34,7 @@ func (is *IndexerService) OnStart() error { go func() { for event := range ch { // TODO: may be not perfomant to write one event at a time - txResult := event.(types.TMEventData).Unwrap().(types.EventDataTx).TxResult + txResult := event.(types.EventDataTx).TxResult is.idr.Index(&txResult) } }() diff --git a/types/event_bus.go b/types/event_bus.go index 4edaea58..3e16500b 100644 --- a/types/event_bus.go +++ b/types/event_bus.go @@ -74,15 +74,15 @@ func (b *EventBus) Publish(eventType string, eventData TMEventData) error { //--- block, tx, and vote events func (b *EventBus) PublishEventNewBlock(event EventDataNewBlock) error { - return b.Publish(EventNewBlock, TMEventData{event}) + return b.Publish(EventNewBlock, event) } func (b *EventBus) PublishEventNewBlockHeader(event EventDataNewBlockHeader) error { - return b.Publish(EventNewBlockHeader, TMEventData{event}) + return b.Publish(EventNewBlockHeader, event) } func (b *EventBus) PublishEventVote(event EventDataVote) error { - return b.Publish(EventVote, TMEventData{event}) + return b.Publish(EventVote, event) } // PublishEventTx publishes tx event with tags from Result. Note it will add @@ -114,50 +114,50 @@ func (b *EventBus) PublishEventTx(event EventDataTx) error { logIfTagExists(TxHeightKey, tags, b.Logger) tags[TxHeightKey] = event.Height - b.pubsub.PublishWithTags(ctx, TMEventData{event}, tags) + b.pubsub.PublishWithTags(ctx, event, tags) return nil } func (b *EventBus) PublishEventProposalHeartbeat(event EventDataProposalHeartbeat) error { - return b.Publish(EventProposalHeartbeat, TMEventData{event}) + return b.Publish(EventProposalHeartbeat, event) } //--- EventDataRoundState events func (b *EventBus) PublishEventNewRoundStep(event EventDataRoundState) error { - return b.Publish(EventNewRoundStep, TMEventData{event}) + return b.Publish(EventNewRoundStep, event) } func (b *EventBus) PublishEventTimeoutPropose(event EventDataRoundState) error { - return b.Publish(EventTimeoutPropose, TMEventData{event}) + return b.Publish(EventTimeoutPropose, event) } func (b *EventBus) PublishEventTimeoutWait(event EventDataRoundState) error { - return b.Publish(EventTimeoutWait, TMEventData{event}) + return b.Publish(EventTimeoutWait, event) } func (b *EventBus) PublishEventNewRound(event EventDataRoundState) error { - return b.Publish(EventNewRound, TMEventData{event}) + return b.Publish(EventNewRound, event) } func (b *EventBus) PublishEventCompleteProposal(event EventDataRoundState) error { - return b.Publish(EventCompleteProposal, TMEventData{event}) + return b.Publish(EventCompleteProposal, event) } func (b *EventBus) PublishEventPolka(event EventDataRoundState) error { - return b.Publish(EventPolka, TMEventData{event}) + return b.Publish(EventPolka, event) } func (b *EventBus) PublishEventUnlock(event EventDataRoundState) error { - return b.Publish(EventUnlock, TMEventData{event}) + return b.Publish(EventUnlock, event) } func (b *EventBus) PublishEventRelock(event EventDataRoundState) error { - return b.Publish(EventRelock, TMEventData{event}) + return b.Publish(EventRelock, event) } func (b *EventBus) PublishEventLock(event EventDataRoundState) error { - return b.Publish(EventLock, TMEventData{event}) + return b.Publish(EventLock, event) } func logIfTagExists(tag string, tags map[string]interface{}, logger log.Logger) { diff --git a/types/event_bus_test.go b/types/event_bus_test.go index aa97092f..9002b531 100644 --- a/types/event_bus_test.go +++ b/types/event_bus_test.go @@ -73,7 +73,7 @@ func benchmarkEventBus(numClients int, randQueries bool, randEvents bool, b *tes eventType = randEvent() } - eventBus.Publish(eventType, TMEventData{"Gamora"}) + eventBus.Publish(eventType, "Gamora") } } diff --git a/types/events.go b/types/events.go index d6f7b012..1323b65b 100644 --- a/types/events.go +++ b/types/events.go @@ -3,7 +3,7 @@ package types import ( "fmt" - "github.com/tendermint/go-wire/data" + "github.com/tendermint/go-amino" tmpubsub "github.com/tendermint/tmlibs/pubsub" tmquery "github.com/tendermint/tmlibs/pubsub/query" ) @@ -45,56 +45,20 @@ var ( ) // implements events.EventData -type TMEventDataInner interface { +type TMEventData interface { // empty interface } -type TMEventData struct { - TMEventDataInner `json:"unwrap"` +func RegisterEventDatas(cdc *amino.Codec) { + cdc.RegisterInterface((*TMEventData)(nil), nil) + cdc.RegisterConcrete(EventDataNewBlock{}, "tendermint/EventDataNameNewBlock", nil) + cdc.RegisterConcrete(EventDataNewBlockHeader{}, "tendermint/EventDataNameNewBlockHeader", nil) + cdc.RegisterConcrete(EventDataTx{}, "tendermint/EventDataNameTx", nil) + cdc.RegisterConcrete(EventDataRoundState{}, "tendermint/EventDataNameRoundState", nil) + cdc.RegisterConcrete(EventDataVote{}, "tendermint/EventDataNameVote", nil) + cdc.RegisterConcrete(EventDataProposalHeartbeat{}, "tendermint/EventDataNameProposalHeartbeat", nil) } -func (tmr TMEventData) MarshalJSON() ([]byte, error) { - return tmEventDataMapper.ToJSON(tmr.TMEventDataInner) -} - -func (tmr *TMEventData) UnmarshalJSON(data []byte) (err error) { - parsed, err := tmEventDataMapper.FromJSON(data) - if err == nil && parsed != nil { - tmr.TMEventDataInner = parsed.(TMEventDataInner) - } - return -} - -func (tmr TMEventData) Unwrap() TMEventDataInner { - tmrI := tmr.TMEventDataInner - for wrap, ok := tmrI.(TMEventData); ok; wrap, ok = tmrI.(TMEventData) { - tmrI = wrap.TMEventDataInner - } - return tmrI -} - -func (tmr TMEventData) Empty() bool { - return tmr.TMEventDataInner == nil -} - -const ( - EventDataTypeNewBlock = byte(0x01) - EventDataTypeFork = byte(0x02) - EventDataTypeTx = byte(0x03) - EventDataTypeNewBlockHeader = byte(0x04) - EventDataTypeRoundState = byte(0x11) - EventDataTypeVote = byte(0x12) - EventDataTypeProposalHeartbeat = byte(0x20) -) - -var tmEventDataMapper = data.NewMapper(TMEventData{}). - RegisterImplementation(EventDataNewBlock{}, EventDataNameNewBlock, EventDataTypeNewBlock). - RegisterImplementation(EventDataNewBlockHeader{}, EventDataNameNewBlockHeader, EventDataTypeNewBlockHeader). - RegisterImplementation(EventDataTx{}, EventDataNameTx, EventDataTypeTx). - RegisterImplementation(EventDataRoundState{}, EventDataNameRoundState, EventDataTypeRoundState). - RegisterImplementation(EventDataVote{}, EventDataNameVote, EventDataTypeVote). - RegisterImplementation(EventDataProposalHeartbeat{}, EventDataNameProposalHeartbeat, EventDataTypeProposalHeartbeat) - // Most event messages are basic types (a block, a transaction) // but some (an input to a call tx or a receive) are more exotic