diff --git a/node/node.go b/node/node.go index e7965171..ab06d48c 100644 --- a/node/node.go +++ b/node/node.go @@ -17,14 +17,12 @@ import ( "github.com/tendermint/go-p2p" "github.com/tendermint/go-rpc" "github.com/tendermint/go-rpc/server" - "github.com/tendermint/go-rpc/types" "github.com/tendermint/go-wire" bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/consensus" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/rpc/core" - ctypes "github.com/tendermint/tendermint/rpc/core/types" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" "github.com/tendermint/tmsp/example/golang" @@ -184,15 +182,6 @@ func (n *Node) StartRPC() (net.Listener, error) { listenAddr := config.GetString("rpc_laddr") - // register the result objects with wire - // so consumers of tendermint rpc will not have - // conflicts with their own rpc - wire.RegisterInterface( - struct{ rpctypes.Result }{}, - wire.ConcreteType{&events.EventResult{}, 0x1}, - wire.ConcreteType{&ctypes.TendermintResult{}, 0x2}, - ) - mux := http.NewServeMux() wm := rpcserver.NewWebsocketManager(core.Routes, n.evsw) mux.HandleFunc("/websocket", wm.WebsocketHandler) diff --git a/rpc/core/events.go b/rpc/core/events.go index cd9d31d3..0a3edd39 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -4,6 +4,7 @@ import ( "github.com/tendermint/go-events" "github.com/tendermint/go-rpc/types" ctypes "github.com/tendermint/tendermint/rpc/core/types" + "github.com/tendermint/tendermint/types" ) func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscribe, error) { @@ -11,17 +12,14 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscri wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg events.EventData) { // NOTE: EventSwitch callbacks must be nonblocking // NOTE: RPCResponses of subscribed events have id suffix "#event" - wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &events.EventResult{event, msg}, "")) + tmResult := ctypes.TMResult(&ctypes.ResultEvent{event, types.TMEventData(msg)}) + wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &tmResult, "")) }) return &ctypes.ResultSubscribe{}, nil } func Unsubscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultUnsubscribe, error) { log.Notice("Unsubscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event) - wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg events.EventData) { - // NOTE: EventSwitch callbacks must be nonblocking - // NOTE: RPCResponses of subscribed events have id suffix "#event" - wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &events.EventResult{event, msg}, "")) - }) + wsCtx.GetEventSwitch().RemoveListener(event) return &ctypes.ResultUnsubscribe{}, nil } diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 185625d0..8e9c252d 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -22,90 +22,90 @@ var Routes = map[string]*rpc.RPCFunc{ // subscribe/unsubscribe are reserved for websocket events. } -func SubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.TendermintResult, error) { +func SubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (ctypes.TMResult, error) { if r, err := Subscribe(wsCtx, event); err != nil { return nil, err } else { - return &ctypes.TendermintResult{r}, nil + return r, nil } } -func UnsubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.TendermintResult, error) { +func UnsubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (ctypes.TMResult, error) { if r, err := Unsubscribe(wsCtx, event); err != nil { return nil, err } else { - return &ctypes.TendermintResult{r}, nil + return r, nil } } -func StatusResult() (*ctypes.TendermintResult, error) { +func StatusResult() (ctypes.TMResult, error) { if r, err := Status(); err != nil { return nil, err } else { - return &ctypes.TendermintResult{r}, nil + return r, nil } } -func NetInfoResult() (*ctypes.TendermintResult, error) { +func NetInfoResult() (ctypes.TMResult, error) { if r, err := NetInfo(); err != nil { return nil, err } else { - return &ctypes.TendermintResult{r}, nil + return r, nil } } -func BlockchainInfoResult(min, max int) (*ctypes.TendermintResult, error) { +func BlockchainInfoResult(min, max int) (ctypes.TMResult, error) { if r, err := BlockchainInfo(min, max); err != nil { return nil, err } else { - return &ctypes.TendermintResult{r}, nil + return r, nil } } -func GenesisResult() (*ctypes.TendermintResult, error) { +func GenesisResult() (ctypes.TMResult, error) { if r, err := Genesis(); err != nil { return nil, err } else { - return &ctypes.TendermintResult{r}, nil + return r, nil } } -func GetBlockResult(height int) (*ctypes.TendermintResult, error) { +func GetBlockResult(height int) (ctypes.TMResult, error) { if r, err := GetBlock(height); err != nil { return nil, err } else { - return &ctypes.TendermintResult{r}, nil + return r, nil } } -func ListValidatorsResult() (*ctypes.TendermintResult, error) { +func ListValidatorsResult() (ctypes.TMResult, error) { if r, err := ListValidators(); err != nil { return nil, err } else { - return &ctypes.TendermintResult{r}, nil + return r, nil } } -func DumpConsensusStateResult() (*ctypes.TendermintResult, error) { +func DumpConsensusStateResult() (ctypes.TMResult, error) { if r, err := DumpConsensusState(); err != nil { return nil, err } else { - return &ctypes.TendermintResult{r}, nil + return r, nil } } -func ListUnconfirmedTxsResult() (*ctypes.TendermintResult, error) { +func ListUnconfirmedTxsResult() (ctypes.TMResult, error) { if r, err := ListUnconfirmedTxs(); err != nil { return nil, err } else { - return &ctypes.TendermintResult{r}, nil + return r, nil } } -func BroadcastTxResult(tx []byte) (*ctypes.TendermintResult, error) { +func BroadcastTxResult(tx []byte) (ctypes.TMResult, error) { if r, err := BroadcastTx(tx); err != nil { return nil, err } else { - return &ctypes.TendermintResult{r}, nil + return r, nil } } diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index 7101f0aa..09b7a702 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -3,6 +3,7 @@ package core_types import ( "github.com/tendermint/go-crypto" "github.com/tendermint/go-p2p" + "github.com/tendermint/go-rpc/types" "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/types" ) @@ -66,6 +67,11 @@ type ResultSubscribe struct { type ResultUnsubscribe struct { } +type ResultEvent struct { + Name string `json:"name"` + Data types.TMEventData `json:"data"` +} + //---------------------------------------- // response & result types @@ -81,18 +87,16 @@ const ( ResultTypeListUnconfirmedTxs = byte(0x09) ResultTypeSubscribe = byte(0x0A) ResultTypeUnsubscribe = byte(0x0B) + ResultTypeEvent = byte(0x0C) ) -type TendermintResultInterface interface{} - -// NOTE: up to the application to register this as rpctypes.Result -type TendermintResult struct { - Result TendermintResultInterface +type TMResult interface { + rpctypes.Result } // for wire.readReflect var _ = wire.RegisterInterface( - struct{ TendermintResultInterface }{}, + struct{ TMResult }{}, wire.ConcreteType{&ResultGenesis{}, ResultTypeGenesis}, wire.ConcreteType{&ResultBlockchainInfo{}, ResultTypeBlockchainInfo}, wire.ConcreteType{&ResultGetBlock{}, ResultTypeGetBlock}, @@ -104,4 +108,5 @@ var _ = wire.RegisterInterface( wire.ConcreteType{&ResultListUnconfirmedTxs{}, ResultTypeListUnconfirmedTxs}, wire.ConcreteType{&ResultSubscribe{}, ResultTypeSubscribe}, wire.ConcreteType{&ResultUnsubscribe{}, ResultTypeUnsubscribe}, + wire.ConcreteType{&ResultEvent{}, ResultTypeEvent}, ) diff --git a/rpc/test/client_test.go b/rpc/test/client_test.go index 00f8c2b0..d10f8c72 100644 --- a/rpc/test/client_test.go +++ b/rpc/test/client_test.go @@ -13,23 +13,26 @@ import ( // Test the HTTP client func TestURIStatus(t *testing.T) { - result, err := clientURI.Call("status", map[string]interface{}{}) + tmResult := new(ctypes.TMResult) + _, err := clientURI.Call("status", map[string]interface{}{}, tmResult) if err != nil { t.Fatal(err) } - testStatus(t, result) + testStatus(t, tmResult) } func TestJSONStatus(t *testing.T) { - result, err := clientJSON.Call("status", []interface{}{}) + tmResult := new(ctypes.TMResult) + _, err := clientJSON.Call("status", []interface{}{}, tmResult) if err != nil { t.Fatal(err) } - testStatus(t, result) + testStatus(t, tmResult) } -func testStatus(t *testing.T, result interface{}) { - status := result.(*ctypes.TendermintResult).Result.(*ctypes.ResultStatus) +func testStatus(t *testing.T, statusI interface{}) { + tmRes := statusI.(*ctypes.TMResult) + status := (*tmRes).(*ctypes.ResultStatus) if status.NodeInfo.Network != chainID { t.Fatal(fmt.Errorf("ChainID mismatch: got %s expected %s", status.NodeInfo.Network, chainID)) diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index 3932ff6a..dfccdb1e 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -1,6 +1,7 @@ package rpctest import ( + "encoding/json" "fmt" "net/http" "testing" @@ -11,11 +12,11 @@ import ( "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" - "github.com/tendermint/go-events" client "github.com/tendermint/go-rpc/client" "github.com/tendermint/go-rpc/types" _ "github.com/tendermint/tendermint/config/tendermint_test" nm "github.com/tendermint/tendermint/node" + ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" ) @@ -133,14 +134,24 @@ func waitForEvent(t *testing.T, con *websocket.Conn, eventid string, dieOnTimeou // if the event id isnt what we're waiting on // ignore it var response rpctypes.RPCResponse - var err error - wire.ReadJSON(&response, p, &err) + if err := json.Unmarshal(p, &response); err != nil { + errCh <- err + break + } + if response.Error != "" { + errCh <- fmt.Errorf(response.Error) + break + } + + result := new(ctypes.TMResult) + fmt.Println("RESULT:", string(*response.Result)) + wire.ReadJSONPtr(result, *response.Result, &err) if err != nil { errCh <- err break } - event, ok := response.Result.(*events.EventResult) - if ok && event.Event == eventid { + event, ok := (*result).(*ctypes.ResultEvent) + if ok && event.Name == eventid { goodCh <- p break } @@ -186,14 +197,18 @@ func unmarshalResponseNewBlock(b []byte) (*types.Block, error) { // unmarshall and assert somethings var response rpctypes.RPCResponse var err error - wire.ReadJSON(&response, b, &err) - if err != nil { + if err := json.Unmarshal(b, &response); err != nil { return nil, err } if response.Error != "" { return nil, fmt.Errorf(response.Error) } - block := response.Result.(*events.EventResult).Data.(types.EventDataNewBlock).Block + var result ctypes.TMResult + wire.ReadJSONPtr(&result, *response.Result, &err) + if err != nil { + return nil, err + } + block := result.(*ctypes.ResultEvent).Data.(types.EventDataNewBlock).Block return block, nil } diff --git a/types/events.go b/types/events.go index 0bcd2fcc..26f29f41 100644 --- a/types/events.go +++ b/types/events.go @@ -1,7 +1,7 @@ package types import ( - // for registering EventData + // for registering TMEventData as events.EventData "github.com/tendermint/go-events" "github.com/tendermint/go-wire" ) @@ -30,6 +30,12 @@ func EventStringApp() string { return "App" } //---------------------------------------- +// implements events.EventData +type TMEventData interface { + events.EventData + // AssertIsTMEventData() +} + const ( EventDataTypeNewBlock = byte(0x01) EventDataTypeFork = byte(0x02) @@ -41,7 +47,7 @@ const ( ) var _ = wire.RegisterInterface( - struct{ events.EventData }{}, + struct{ TMEventData }{}, wire.ConcreteType{EventDataNewBlock{}, EventDataTypeNewBlock}, // wire.ConcreteType{EventDataFork{}, EventDataTypeFork }, wire.ConcreteType{EventDataTx{}, EventDataTypeTx}, @@ -92,8 +98,8 @@ type EventDataVote struct { Vote *Vote } -func (_ EventDataNewBlock) AssertIsEventData() {} -func (_ EventDataTx) AssertIsEventData() {} -func (_ EventDataApp) AssertIsEventData() {} -func (_ EventDataRoundState) AssertIsEventData() {} -func (_ EventDataVote) AssertIsEventData() {} +func (_ EventDataNewBlock) AssertIsTMEventData() {} +func (_ EventDataTx) AssertIsTMEventData() {} +func (_ EventDataApp) AssertIsTMEventData() {} +func (_ EventDataRoundState) AssertIsTMEventData() {} +func (_ EventDataVote) AssertIsTMEventData() {}