From 96547d0ca846b1cd784033d534c46911bba1f745 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sun, 10 Jan 2016 16:33:52 -0500 Subject: [PATCH 1/8] ws fixes; rpc tests --- config/tendermint_test/config.go | 69 ++++------ node/node.go | 23 +++- rpc/client/http_client.go | 108 ++++++++++++++-- rpc/core/events.go | 4 +- rpc/server/handlers.go | 2 +- rpc/test/client_test.go | 143 +++++++++++++++++++++ rpc/test/config.go | 18 +++ rpc/test/helpers.go | 212 +++++++++++++++++++++++++++++++ 8 files changed, 515 insertions(+), 64 deletions(-) create mode 100644 rpc/test/client_test.go create mode 100644 rpc/test/config.go create mode 100644 rpc/test/helpers.go diff --git a/config/tendermint_test/config.go b/config/tendermint_test/config.go index 024a5c97..9820ba24 100644 --- a/config/tendermint_test/config.go +++ b/config/tendermint_test/config.go @@ -30,6 +30,7 @@ func initTMRoot(rootDir string) { configFilePath := path.Join(rootDir, "config.toml") genesisFilePath := path.Join(rootDir, "genesis.json") + privFilePath := path.Join(rootDir, "priv_validator.json") // Write default config file if missing. if !FileExists(configFilePath) { @@ -40,6 +41,9 @@ func initTMRoot(rootDir string) { if !FileExists(genesisFilePath) { MustWriteFile(genesisFilePath, []byte(defaultGenesis), 0644) } + if !FileExists(privFilePath) { + MustWriteFile(privFilePath, []byte(defaultPrivValidator), 0644) + } } func GetConfig(rootDir string) cfg.Config { @@ -58,7 +62,7 @@ func GetConfig(rootDir string) cfg.Config { } mapConfig.SetDefault("chain_id", "tendermint_test") mapConfig.SetDefault("genesis_file", rootDir+"/genesis.json") - mapConfig.SetDefault("proxy_app", "tcp://127.0.0.1:36658") + mapConfig.SetDefault("proxy_app", "local") mapConfig.SetDefault("moniker", "anonymous") mapConfig.SetDefault("node_laddr", "0.0.0.0:36656") mapConfig.SetDefault("fast_sync", false) @@ -78,7 +82,7 @@ func GetConfig(rootDir string) cfg.Config { var defaultConfigTmpl = `# This is a TOML config file. # For more information, see https://github.com/toml-lang/toml -proxy_app = "tcp://127.0.0.1:36658" +proxy_app = "local" moniker = "__MONIKER__" node_laddr = "0.0.0.0:36656" seeds = "" @@ -93,50 +97,33 @@ func defaultConfig(moniker string) (defaultConfig string) { return } -// priv keys generated deterministically eg rpc/tests/helpers.go var defaultGenesis = `{ - "chain_id" : "tendermint_test", - "accounts": [ - { - "address": "E9B5D87313356465FAE33C406CE2C2979DE60BCB", - "amount": 200000000 - }, - { - "address": "DFE4AFFA4CEE17CD01CB9E061D77C3ECED29BD88", - "amount": 200000000 - }, - { - "address": "F60D30722E7B497FA532FB3207C3FB29C31B1992", - "amount": 200000000 - }, - { - "address": "336CB40A5EB92E496E19B74FDFF2BA017C877FD6", - "amount": 200000000 - }, - { - "address": "D218F0F439BF0384F6F5EF8D0F8B398D941BD1DC", - "amount": 200000000 - } - ], + "genesis_time": "0001-01-01T00:00:00.000Z", + "chain_id": "tendermint_test", "validators": [ { - "pub_key": [1, "583779C3BFA3F6C7E23C7D830A9C3D023A216B55079AD38BFED1207B94A19548"], - "amount": 1000000, - "unbond_to": [ - { - "address": "E9B5D87313356465FAE33C406CE2C2979DE60BCB", - "amount": 100000 - } - ] + "pub_key": [ + 1, + "3B3069C422E19688B45CBFAE7BB009FC0FA1B1EA86593519318B7214853803C8" + ], + "amount": 10, + "name": "" } - ] + ], + "app_hash": "" }` var defaultPrivValidator = `{ - "address": "1D7A91CB32F758A02EBB9BE1FB6F8DEE56F90D42", - "pub_key": [1,"06FBAC4E285285D1D91FCBC7E91C780ADA11516F67462340B3980CE2B94940E8"], - "priv_key": [1,"C453604BD6480D5538B4C6FD2E3E314B5BCE518D75ADE4DA3DA85AB8ADFD819606FBAC4E285285D1D91FCBC7E91C780ADA11516F67462340B3980CE2B94940E8"], - "last_height":0, - "last_round":0, - "last_step":0 + "address": "D028C9981F7A87F3093672BF0D5B0E2A1B3ED456", + "pub_key": [ + 1, + "3B3069C422E19688B45CBFAE7BB009FC0FA1B1EA86593519318B7214853803C8" + ], + "priv_key": [ + 1, + "27F82582AEFAE7AB151CFB01C48BB6C1A0DA78F9BDDA979A9F70A84D074EB07D3B3069C422E19688B45CBFAE7BB009FC0FA1B1EA86593519318B7214853803C8" + ], + "last_height": 0, + "last_round": 0, + "last_step": 0 }` diff --git a/node/node.go b/node/node.go index 077e11d3..09dc5886 100644 --- a/node/node.go +++ b/node/node.go @@ -24,6 +24,7 @@ import ( "github.com/tendermint/tendermint/rpc/server" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" + "github.com/tendermint/tmsp/example/golang" ) import _ "net/http/pprof" @@ -320,14 +321,22 @@ func getState() *sm.State { // Get a connection to the proxyAppConn addr. // Check the current hash, and panic if it doesn't match. -func getProxyApp(addr string, hash []byte) proxy.AppConn { - proxyConn, err := Connect(addr) - if err != nil { - Exit(Fmt("Failed to connect to proxy for mempool: %v", err)) - } - proxyAppConn := proxy.NewRemoteAppConn(proxyConn, 1024) +func getProxyApp(addr string, hash []byte) (proxyAppCtx proxy.AppContext) { + // use local app (for testing) + if addr == "local" { + app := example.NewCounterApplication(true) + appCtx := app.Open() + proxyAppCtx = proxy.NewLocalAppContext(appCtx) + } else { + proxyConn, err := Connect(addr) + if err != nil { + Exit(Fmt("Failed to connect to proxy for mempool: %v", err)) + } + proxyAppCtx := proxy.NewRemoteAppContext(proxyConn, 1024) - proxyAppConn.Start() + proxyAppCtx.Start() + + } // Check the hash currentHash, err := proxyAppConn.GetHashSync() diff --git a/rpc/client/http_client.go b/rpc/client/http_client.go index 6cc275d0..9ff72629 100644 --- a/rpc/client/http_client.go +++ b/rpc/client/http_client.go @@ -2,17 +2,47 @@ package rpcclient import ( "bytes" - "encoding/json" "errors" "io/ioutil" "net/http" + "net/url" + "strings" . "github.com/tendermint/go-common" "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/rpc/types" ) -func CallHTTP(remote string, method string, params []interface{}, dest interface{}) (interface{}, error) { +// JSON rpc takes params as a slice +type ClientJSONRPC struct { + remote string +} + +func NewClientJSONRPC(remote string) *ClientJSONRPC { + return &ClientJSONRPC{remote} +} + +func (c *ClientJSONRPC) Call(method string, params []interface{}) (interface{}, error) { + return CallHTTP_JSONRPC(c.remote, method, params) +} + +// URI takes params as a map +type ClientURI struct { + remote string +} + +func NewClientURI(remote string) *ClientURI { + if !strings.HasSuffix(remote, "/") { + remote = remote + "/" + } + return &ClientURI{remote} +} + +func (c *ClientURI) Call(method string, params map[string]interface{}) (interface{}, error) { + return CallHTTP_URI(c.remote, method, params) +} + +func CallHTTP_JSONRPC(remote string, method string, params []interface{}) (interface{}, error) { // Make request and get responseBytes request := rpctypes.RPCRequest{ JSONRPC: "2.0", @@ -25,27 +55,79 @@ func CallHTTP(remote string, method string, params []interface{}, dest interface log.Info(Fmt("RPC request to %v: %v", remote, string(requestBytes))) httpResponse, err := http.Post(remote, "text/json", requestBuf) if err != nil { - return dest, err + return nil, err } defer httpResponse.Body.Close() responseBytes, err := ioutil.ReadAll(httpResponse.Body) if err != nil { - return dest, err + return nil, err } log.Info(Fmt("RPC response: %v", string(responseBytes))) + return unmarshalResponseBytes(responseBytes) +} - // Parse response into JSONResponse - response := rpctypes.RPCResponse{} - err = json.Unmarshal(responseBytes, &response) +func CallHTTP_URI(remote string, method string, params map[string]interface{}) (interface{}, error) { + values, err := argsToURLValues(params) if err != nil { - return dest, err + return nil, err + } + log.Info(Fmt("URI request to %v: %v", remote, values)) + resp, err := http.PostForm(remote+method, values) + if err != nil { + return nil, err + } + defer resp.Body.Close() + responseBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + return unmarshalResponseBytes(responseBytes) +} + +//------------------------------------------------ + +func unmarshalResponseBytes(responseBytes []byte) (interface{}, error) { + // read response + // if rpc/core/types is imported, the result will unmarshal + // into the correct type + var err error + response := &rpctypes.RPCResponse{} + wire.ReadJSON(response, responseBytes, &err) + if err != nil { + return nil, err } - // Parse response into dest - resultJSONObject := response.Result errorStr := response.Error if errorStr != "" { - return dest, errors.New(errorStr) + return nil, errors.New(errorStr) } - dest = wire.ReadJSONObject(dest, resultJSONObject, &err) - return dest, err + return response.Result, err +} + +func argsToURLValues(args map[string]interface{}) (url.Values, error) { + values := make(url.Values) + if len(args) == 0 { + return values, nil + } + err := argsToJson(args) + if err != nil { + return nil, err + } + for key, val := range args { + values.Set(key, val.(string)) + } + return values, nil +} + +func argsToJson(args map[string]interface{}) error { + var n int + var err error + for k, v := range args { + buf := new(bytes.Buffer) + wire.WriteJSON(v, buf, &n, &err) + if err != nil { + return err + } + args[k] = buf.String() + } + return nil } diff --git a/rpc/core/events.go b/rpc/core/events.go index 8907e0bb..83092fd9 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -11,7 +11,7 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscri wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg types.EventData) { // NOTE: EventSwitch callbacks must be nonblocking // NOTE: RPCResponses of subscribed events have id suffix "#event" - wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", ctypes.ResultEvent{event, msg}, "")) + wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &ctypes.ResultEvent{event, msg}, "")) }) return &ctypes.ResultSubscribe{}, nil } @@ -21,7 +21,7 @@ func Unsubscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultUnsub wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg types.EventData) { // NOTE: EventSwitch callbacks must be nonblocking // NOTE: RPCResponses of subscribed events have id suffix "#event" - wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", ctypes.ResultEvent{event, msg}, "")) + wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &ctypes.ResultEvent{event, msg}, "")) }) return &ctypes.ResultUnsubscribe{}, nil } diff --git a/rpc/server/handlers.go b/rpc/server/handlers.go index a28277f7..923544be 100644 --- a/rpc/server/handlers.go +++ b/rpc/server/handlers.go @@ -153,7 +153,7 @@ func jsonParamsToArgs(rpcFunc *RPCFunc, params []interface{}) ([]reflect.Value, // Same as above, but with the first param the websocket connection func jsonParamsToArgsWS(rpcFunc *RPCFunc, params []interface{}, wsCtx WSRPCContext) ([]reflect.Value, error) { - if len(rpcFunc.argNames)-1 != len(params) { + if len(rpcFunc.argNames) != len(params) { return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)", len(rpcFunc.argNames)-1, rpcFunc.argNames[1:], len(params), params)) } diff --git a/rpc/test/client_test.go b/rpc/test/client_test.go new file mode 100644 index 00000000..ff04ce27 --- /dev/null +++ b/rpc/test/client_test.go @@ -0,0 +1,143 @@ +package rpctest + +import ( + "fmt" + "testing" + + _ "github.com/tendermint/tendermint/config/tendermint_test" + ctypes "github.com/tendermint/tendermint/rpc/core/types" + "github.com/tendermint/tendermint/types" +) + +//-------------------------------------------------------------------------------- +// Test the HTTP client + +func TestURIStatus(t *testing.T) { + result, err := clientURI.Call("status", map[string]interface{}{}) + if err != nil { + t.Fatal(err) + } + testStatus(t, result) +} + +func TestJSONStatus(t *testing.T) { + result, err := clientJSON.Call("status", []interface{}{}) + if err != nil { + t.Fatal(err) + } + testStatus(t, result) +} + +func testStatus(t *testing.T, result interface{}) { + status := result.(*ctypes.ResultStatus) + if status.NodeInfo.Network != chainID { + t.Fatal(fmt.Errorf("ChainID mismatch: got %s expected %s", + status.NodeInfo.Network, chainID)) + } +} + +/*func TestURIBroadcastTx(t *testing.T) { + testBroadcastTx(t, "HTTP") +}*/ + +/*func TestJSONBroadcastTx(t *testing.T) { + testBroadcastTx(t, "JSONRPC") +}*/ + +// TODO +/* +func testBroadcastTx(t *testing.T, typ string) { + amt := int64(100) + toAddr := user[1].Address + tx := makeDefaultSendTxSigned(t, typ, toAddr, amt) + receipt := broadcastTx(t, typ, tx) + if receipt.CreatesContract > 0 { + t.Fatal("This tx does not create a contract") + } + if len(receipt.TxHash) == 0 { + t.Fatal("Failed to compute tx hash") + } + pool := node.MempoolReactor().Mempool + txs := pool.GetProposalTxs() + if len(txs) != mempoolCount { + t.Fatalf("The mem pool has %d txs. Expected %d", len(txs), mempoolCount) + } + tx2 := txs[mempoolCount-1].(*types.SendTx) + n, err := new(int64), new(error) + buf1, buf2 := new(bytes.Buffer), new(bytes.Buffer) + tx.WriteSignBytes(chainID, buf1, n, err) + tx2.WriteSignBytes(chainID, buf2, n, err) + if bytes.Compare(buf1.Bytes(), buf2.Bytes()) != 0 { + t.Fatal("inconsistent hashes for mempool tx and sent tx") + } +}*/ + +//-------------------------------------------------------------------------------- +// Test the websocket service + +var wsTyp = "JSONRPC" + +// make a simple connection to the server +func TestWSConnect(t *testing.T) { + con := newWSCon(t) + con.Close() +} + +// receive a new block message +func TestWSNewBlock(t *testing.T) { + con := newWSCon(t) + eid := types.EventStringNewBlock() + subscribe(t, con, eid) + defer func() { + unsubscribe(t, con, eid) + con.Close() + }() + waitForEvent(t, con, eid, true, func() {}, func(eid string, b []byte) error { + fmt.Println("Check:", string(b)) + return nil + }) +} + +// receive a few new block messages in a row, with increasing height +func TestWSBlockchainGrowth(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + con := newWSCon(t) + eid := types.EventStringNewBlock() + subscribe(t, con, eid) + defer func() { + unsubscribe(t, con, eid) + con.Close() + }() + // listen for NewBlock, ensure height increases by 1 + unmarshalValidateBlockchain(t, con, eid) +} + +/* TODO: this with dummy app.. +func TestWSDoubleFire(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + con := newWSCon(t) + eid := types.EventStringAccInput(user[0].Address) + subscribe(t, con, eid) + defer func() { + unsubscribe(t, con, eid) + con.Close() + }() + amt := int64(100) + toAddr := user[1].Address + // broadcast the transaction, wait to hear about it + waitForEvent(t, con, eid, true, func() { + tx := makeDefaultSendTxSigned(t, wsTyp, toAddr, amt) + broadcastTx(t, wsTyp, tx) + }, func(eid string, b []byte) error { + return nil + }) + // but make sure we don't hear about it twice + waitForEvent(t, con, eid, false, func() { + }, func(eid string, b []byte) error { + return nil + }) +}*/ diff --git a/rpc/test/config.go b/rpc/test/config.go new file mode 100644 index 00000000..78769097 --- /dev/null +++ b/rpc/test/config.go @@ -0,0 +1,18 @@ +package rpctest + +import ( + cfg "github.com/tendermint/tendermint/config" + tmcfg "github.com/tendermint/tendermint/config/tendermint_test" +) + +var config cfg.Config = nil + +func initConfig() { + + cfg.OnConfig(func(newConfig cfg.Config) { + config = newConfig + }) + + c := tmcfg.GetConfig("") + cfg.ApplyConfig(c) // Notify modules of new config +} diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go new file mode 100644 index 00000000..a417cbea --- /dev/null +++ b/rpc/test/helpers.go @@ -0,0 +1,212 @@ +package rpctest + +import ( + "fmt" + "net/http" + "testing" + "time" + + "github.com/gorilla/websocket" + . "github.com/tendermint/go-common" + "github.com/tendermint/go-p2p" + "github.com/tendermint/go-wire" + + _ "github.com/tendermint/tendermint/config/tendermint_test" + nm "github.com/tendermint/tendermint/node" + client "github.com/tendermint/tendermint/rpc/client" + ctypes "github.com/tendermint/tendermint/rpc/core/types" + "github.com/tendermint/tendermint/rpc/types" + "github.com/tendermint/tendermint/types" +) + +// global variables for use across all tests +var ( + rpcAddr = "127.0.0.1:36657" // Not 46657 + requestAddr = "http://" + rpcAddr + websocketAddr = "ws://" + rpcAddr + "/websocket" + + node *nm.Node + + mempoolCount = 0 + + chainID string + + clientURI = client.NewClientURI(requestAddr) + clientJSON = client.NewClientJSONRPC(requestAddr) +) + +// create a new node and sleep forever +func newNode(ready chan struct{}) { + // Create & start node + node = nm.NewNode() + l := p2p.NewDefaultListener("tcp", config.GetString("node_laddr"), true) + node.AddListener(l) + node.Start() + + // Run the RPC server. + node.StartRPC() + ready <- struct{}{} + + // Sleep forever + ch := make(chan struct{}) + <-ch +} + +// initialize config and create new node +func init() { + initConfig() + chainID = config.GetString("chain_id") + + // TODO: change consensus/state.go timeouts to be shorter + + // start a node + ready := make(chan struct{}) + go newNode(ready) + <-ready +} + +//-------------------------------------------------------------------------------- +// Utilities for testing the websocket service + +// create a new connection +func newWSCon(t *testing.T) *websocket.Conn { + dialer := websocket.DefaultDialer + rHeader := http.Header{} + con, r, err := dialer.Dial(websocketAddr, rHeader) + fmt.Println("response", r) + if err != nil { + t.Fatal(err) + } + return con +} + +// subscribe to an event +func subscribe(t *testing.T, con *websocket.Conn, eventid string) { + err := con.WriteJSON(rpctypes.RPCRequest{ + JSONRPC: "2.0", + ID: "", + Method: "subscribe", + Params: []interface{}{eventid}, + }) + if err != nil { + t.Fatal(err) + } +} + +// unsubscribe from an event +func unsubscribe(t *testing.T, con *websocket.Conn, eventid string) { + err := con.WriteJSON(rpctypes.RPCRequest{ + JSONRPC: "2.0", + ID: "", + Method: "unsubscribe", + Params: []interface{}{eventid}, + }) + if err != nil { + t.Fatal(err) + } +} + +// wait for an event; do things that might trigger events, and check them when they are received +// the check function takes an event id and the byte slice read off the ws +func waitForEvent(t *testing.T, con *websocket.Conn, eventid string, dieOnTimeout bool, f func(), check func(string, []byte) error) { + // go routine to wait for webscoket msg + goodCh := make(chan []byte) + errCh := make(chan error) + quitCh := make(chan struct{}) + defer close(quitCh) + + // Read message + go func() { + for { + _, p, err := con.ReadMessage() + if err != nil { + errCh <- err + break + } else { + // if the event id isnt what we're waiting on + // ignore it + var response rpctypes.RPCResponse + var err error + wire.ReadJSON(&response, p, &err) + if err != nil { + errCh <- err + break + } + event, ok := response.Result.(*ctypes.ResultEvent) + if ok && event.Event == eventid { + goodCh <- p + break + } + } + } + }() + + // do stuff (transactions) + f() + + // wait for an event or timeout + timeout := time.NewTimer(10 * time.Second) + select { + case <-timeout.C: + if dieOnTimeout { + con.Close() + t.Fatalf("%s event was not received in time", eventid) + } + // else that's great, we didn't hear the event + // and we shouldn't have + case p := <-goodCh: + if dieOnTimeout { + // message was received and expected + // run the check + err := check(eventid, p) + if err != nil { + t.Fatal(err) + panic(err) // Show the stack trace. + } + } else { + con.Close() + t.Fatalf("%s event was not expected", eventid) + } + case err := <-errCh: + t.Fatal(err) + panic(err) // Show the stack trace. + } +} + +//-------------------------------------------------------------------------------- + +func unmarshalResponseNewBlock(b []byte) (*types.Block, error) { + // unmarshall and assert somethings + var response rpctypes.RPCResponse + var err error + wire.ReadJSON(&response, b, &err) + if err != nil { + return nil, err + } + if response.Error != "" { + return nil, fmt.Errorf(response.Error) + } + block := response.Result.(*ctypes.ResultEvent).Data.(types.EventDataNewBlock).Block + return block, nil +} + +func unmarshalValidateBlockchain(t *testing.T, con *websocket.Conn, eid string) { + var initBlockN int + for i := 0; i < 2; i++ { + waitForEvent(t, con, eid, true, func() {}, func(eid string, b []byte) error { + block, err := unmarshalResponseNewBlock(b) + if err != nil { + return err + } + if i == 0 { + initBlockN = block.Header.Height + } else { + if block.Header.Height != initBlockN+i { + return fmt.Errorf("Expected block %d, got block %d", i, block.Header.Height) + } + } + + return nil + }) + } +} From 0be13d1d2774eacdfe469746f22889cfa6b68317 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 12 Jan 2016 16:54:27 -0500 Subject: [PATCH 2/8] move alert, events, rpc into own repos --- alert/alert.go | 64 ----- alert/config.go | 14 - alert/email.go | 176 ------------ alert/log.go | 7 - benchmarks/simu/counter.go | 4 +- blockchain/reactor.go | 2 +- consensus/common_test.go | 2 +- consensus/reactor.go | 6 +- consensus/state.go | 2 +- consensus/state_test.go | 2 +- events/event_cache.go | 45 --- events/events.go | 215 -------------- events/log.go | 7 - mempool/reactor.go | 2 +- node/node.go | 12 +- node/node_test.go | 18 -- rpc/client/http_client.go | 133 --------- rpc/client/log.go | 7 - rpc/client/ws_client.go | 116 -------- rpc/core/events.go | 8 +- rpc/core/routes.go | 2 +- rpc/core/types/responses.go | 8 +- rpc/server/handlers.go | 553 ------------------------------------ rpc/server/http_params.go | 89 ------ rpc/server/http_server.go | 115 -------- rpc/server/log.go | 7 - rpc/test/helpers.go | 49 ++-- rpc/types/types.go | 71 ----- rpc/version.go | 3 - state/state.go | 1 + types/events.go | 8 +- 31 files changed, 57 insertions(+), 1691 deletions(-) delete mode 100644 alert/alert.go delete mode 100644 alert/config.go delete mode 100644 alert/email.go delete mode 100644 alert/log.go delete mode 100644 events/event_cache.go delete mode 100644 events/events.go delete mode 100644 events/log.go delete mode 100644 rpc/client/http_client.go delete mode 100644 rpc/client/log.go delete mode 100644 rpc/client/ws_client.go delete mode 100644 rpc/server/handlers.go delete mode 100644 rpc/server/http_params.go delete mode 100644 rpc/server/http_server.go delete mode 100644 rpc/server/log.go delete mode 100644 rpc/types/types.go delete mode 100644 rpc/version.go diff --git a/alert/alert.go b/alert/alert.go deleted file mode 100644 index c4763309..00000000 --- a/alert/alert.go +++ /dev/null @@ -1,64 +0,0 @@ -package alert - -import ( - "fmt" - "time" - - "github.com/sfreiberg/gotwilio" -) - -var lastAlertUnix int64 = 0 -var alertCountSince int = 0 - -// Sends a critical alert message to administrators. -func Alert(message string) { - log.Error(" ALERT \n" + message) - now := time.Now().Unix() - if now-lastAlertUnix > int64(config.GetInt("alert_min_interval")) { - message = fmt.Sprintf("%v:%v", config.GetString("chain_id"), message) - if alertCountSince > 0 { - message = fmt.Sprintf("%v (+%v more since)", message, alertCountSince) - alertCountSince = 0 - } - if len(config.GetString("alert_twilio_sid")) > 0 { - go sendTwilio(message) - } - if len(config.GetString("alert_email_recipients")) > 0 { - go sendEmail(message) - } - } else { - alertCountSince++ - } -} - -func sendTwilio(message string) { - defer func() { - if err := recover(); err != nil { - log.Error("sendTwilio error", "error", err) - } - }() - if len(message) > 50 { - message = message[:50] - } - twilio := gotwilio.NewTwilioClient(config.GetString("alert_twilio_sid"), config.GetString("alert_twilio_token")) - res, exp, err := twilio.SendSMS(config.GetString("alert_twilio_from"), config.GetString("alert_twilio_to"), message, "", "") - if exp != nil || err != nil { - log.Error("sendTwilio error", "res", res, "exp", exp, "error", err) - } -} - -func sendEmail(message string) { - defer func() { - if err := recover(); err != nil { - log.Error("sendEmail error", "error", err) - } - }() - subject := message - if len(subject) > 80 { - subject = subject[:80] - } - err := SendEmail(subject, message, config.GetStringSlice("alert_email_recipients")) - if err != nil { - log.Error("sendEmail error", "error", err, "message", message) - } -} diff --git a/alert/config.go b/alert/config.go deleted file mode 100644 index 4a7bf771..00000000 --- a/alert/config.go +++ /dev/null @@ -1,14 +0,0 @@ - -package alert - -import ( - cfg "github.com/tendermint/go-config" -) - -var config cfg.Config = nil - -func init() { - cfg.OnConfig(func(newConfig cfg.Config) { - config = newConfig - }) -} diff --git a/alert/email.go b/alert/email.go deleted file mode 100644 index c183f1d5..00000000 --- a/alert/email.go +++ /dev/null @@ -1,176 +0,0 @@ -// Forked from github.com/SlyMarbo/gmail -package alert - -import ( - "bytes" - "crypto/tls" - "encoding/base64" - "errors" - "fmt" - "io/ioutil" - "net/smtp" - "path/filepath" - "regexp" - "strings" -) - -// Convenience function -func SendEmail(subject, body string, tos []string) error { - email := Compose(subject, body) - email.From = config.GetString("smtp_user") - email.ContentType = "text/html; charset=utf-8" - email.AddRecipients(tos...) - err := email.Send() - return err -} - -// Email represents a single message, which may contain -// attachments. -type Email struct { - From string - To []string - Subject string - ContentType string - Body string - Attachments map[string][]byte -} - -// Compose begins a new email, filling the subject and body, -// and allocating memory for the list of recipients and the -// attachments. -func Compose(Subject, Body string) *Email { - out := new(Email) - out.To = make([]string, 0, 1) - out.Subject = Subject - out.Body = Body - out.Attachments = make(map[string][]byte) - return out -} - -// Attach takes a filename and adds this to the message. -// Note that since only the filename is stored (and not -// its path, for privacy reasons), multiple files in -// different directories but with the same filename and -// extension cannot be sent. -func (e *Email) Attach(Filename string) error { - b, err := ioutil.ReadFile(Filename) - if err != nil { - return err - } - - _, fname := filepath.Split(Filename) - e.Attachments[fname] = b - return nil -} - -// AddRecipient adds a single recipient. -func (e *Email) AddRecipient(Recipient string) { - e.To = append(e.To, Recipient) -} - -// AddRecipients adds one or more recipients. -func (e *Email) AddRecipients(Recipients ...string) { - e.To = append(e.To, Recipients...) -} - -// Send sends the email, returning any error encountered. -func (e *Email) Send() error { - if e.From == "" { - return errors.New("Error: No sender specified. Please set the Email.From field.") - } - if e.To == nil || len(e.To) == 0 { - return errors.New("Error: No recipient specified. Please set the Email.To field.") - } - - auth := smtp.PlainAuth( - "", - config.GetString("smtp_user"), - config.GetString("smtp_password"), - config.GetString("smtp_host"), - ) - - conn, err := smtp.Dial(fmt.Sprintf("%v:%v", config.GetString("smtp_host"), config.GetString("smtp_port"))) - if err != nil { - return err - } - - err = conn.StartTLS(&tls.Config{}) - if err != nil { - return err - } - - err = conn.Auth(auth) - if err != nil { - return err - } - - err = conn.Mail(e.From) - if err != nil { - if strings.Contains(err.Error(), "530 5.5.1") { - return errors.New("Error: Authentication failure. Your username or password is incorrect.") - } - return err - } - - for _, recipient := range e.To { - err = conn.Rcpt(recipient) - if err != nil { - return err - } - } - - wc, err := conn.Data() - if err != nil { - return err - } - defer wc.Close() - _, err = wc.Write(e.Bytes()) - if err != nil { - return err - } - - return nil -} - -func (e *Email) Bytes() []byte { - buf := bytes.NewBuffer(nil) - - var subject = e.Subject - subject = regexp.MustCompile("\n+").ReplaceAllString(subject, " ") - subject = regexp.MustCompile(" +").ReplaceAllString(subject, " ") - buf.WriteString("Subject: " + subject + "\n") - buf.WriteString("To: <" + strings.Join(e.To, ">,<") + ">\n") - buf.WriteString("MIME-Version: 1.0\n") - - // Boundary is used by MIME to separate files. - boundary := "f46d043c813270fc6b04c2d223da" - - if len(e.Attachments) > 0 { - buf.WriteString("Content-Type: multipart/mixed; boundary=" + boundary + "\n") - buf.WriteString("--" + boundary + "\n") - } - - if e.ContentType == "" { - e.ContentType = "text/plain; charset=utf-8" - } - buf.WriteString(fmt.Sprintf("Content-Type: %s\n\n", e.ContentType)) - buf.WriteString(e.Body) - - if len(e.Attachments) > 0 { - for k, v := range e.Attachments { - buf.WriteString("\n\n--" + boundary + "\n") - buf.WriteString("Content-Type: application/octet-stream\n") - buf.WriteString("Content-Transfer-Encoding: base64\n") - buf.WriteString("Content-Disposition: attachment; filename=\"" + k + "\"\n\n") - - b := make([]byte, base64.StdEncoding.EncodedLen(len(v))) - base64.StdEncoding.Encode(b, v) - buf.Write(b) - buf.WriteString("\n--" + boundary) - } - - buf.WriteString("--") - } - - return buf.Bytes() -} diff --git a/alert/log.go b/alert/log.go deleted file mode 100644 index 58055cf2..00000000 --- a/alert/log.go +++ /dev/null @@ -1,7 +0,0 @@ -package alert - -import ( - "github.com/tendermint/go-logger" -) - -var log = logger.New("module", "alert") diff --git a/benchmarks/simu/counter.go b/benchmarks/simu/counter.go index 307f29d0..44e00a0c 100644 --- a/benchmarks/simu/counter.go +++ b/benchmarks/simu/counter.go @@ -8,10 +8,10 @@ import ( "github.com/gorilla/websocket" . "github.com/tendermint/go-common" + "github.com/tendermint/go-rpc/client" + "github.com/tendermint/go-rpc/types" "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/rpc/client" _ "github.com/tendermint/tendermint/rpc/core/types" // Register RPCResponse > Result types - "github.com/tendermint/tendermint/rpc/types" ) func main() { diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 7bcd1cb8..06168903 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -10,7 +10,7 @@ import ( . "github.com/tendermint/go-common" "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/events" + "github.com/tendermint/go-events" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" diff --git a/consensus/common_test.go b/consensus/common_test.go index 9f7bc669..7effed68 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -11,7 +11,7 @@ import ( dbm "github.com/tendermint/go-db" bc "github.com/tendermint/tendermint/blockchain" _ "github.com/tendermint/tendermint/config/tendermint_test" - "github.com/tendermint/tendermint/events" + "github.com/tendermint/go-events" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" diff --git a/consensus/reactor.go b/consensus/reactor.go index f0e34cc4..d9f36d92 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -9,10 +9,10 @@ import ( "time" . "github.com/tendermint/go-common" + "github.com/tendermint/go-events" "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" bc "github.com/tendermint/tendermint/blockchain" - "github.com/tendermint/tendermint/events" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" ) @@ -234,12 +234,12 @@ func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) { // broadcasting the result to peers func (conR *ConsensusReactor) registerEventCallbacks() { - conR.evsw.AddListenerForEvent("conR", types.EventStringNewRoundStep(), func(data types.EventData) { + conR.evsw.AddListenerForEvent("conR", types.EventStringNewRoundStep(), func(data events.EventData) { rs := data.(*types.EventDataRoundState).RoundState().(*RoundState) conR.broadcastNewRoundStep(rs) }) - conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data types.EventData) { + conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data events.EventData) { edv := data.(*types.EventDataVote) conR.broadcastHasVoteMessage(edv.Vote, edv.Index) }) diff --git a/consensus/state.go b/consensus/state.go index dd7ec555..8cee3dbd 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -9,9 +9,9 @@ import ( "time" . "github.com/tendermint/go-common" + "github.com/tendermint/go-events" "github.com/tendermint/go-wire" bc "github.com/tendermint/tendermint/blockchain" - "github.com/tendermint/tendermint/events" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" diff --git a/consensus/state_test.go b/consensus/state_test.go index fbe6a3db..da8d4c90 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -7,7 +7,7 @@ import ( "time" _ "github.com/tendermint/tendermint/config/tendermint_test" - //"github.com/tendermint/tendermint/events" + //"github.com/tendermint/go-events" "github.com/tendermint/tendermint/types" ) diff --git a/events/event_cache.go b/events/event_cache.go deleted file mode 100644 index d0109ae1..00000000 --- a/events/event_cache.go +++ /dev/null @@ -1,45 +0,0 @@ -package events - -import ( - "github.com/tendermint/tendermint/types" -) - -const ( - eventsBufferSize = 1000 -) - -// An EventCache buffers events for a Fireable -// All events are cached. Filtering happens on Flush -type EventCache struct { - evsw Fireable - events []eventInfo -} - -// Create a new EventCache with an EventSwitch as backend -func NewEventCache(evsw Fireable) *EventCache { - return &EventCache{ - evsw: evsw, - events: make([]eventInfo, eventsBufferSize), - } -} - -// a cached event -type eventInfo struct { - event string - data types.EventData -} - -// Cache an event to be fired upon finality. -func (evc *EventCache) FireEvent(event string, data types.EventData) { - // append to list - evc.events = append(evc.events, eventInfo{event, data}) -} - -// Fire events by running evsw.FireEvent on all cached events. Blocks. -// Clears cached events -func (evc *EventCache) Flush() { - for _, ei := range evc.events { - evc.evsw.FireEvent(ei.event, ei.data) - } - evc.events = make([]eventInfo, eventsBufferSize) -} diff --git a/events/events.go b/events/events.go deleted file mode 100644 index 647522d0..00000000 --- a/events/events.go +++ /dev/null @@ -1,215 +0,0 @@ -package events - -import ( - "sync" - - . "github.com/tendermint/go-common" - "github.com/tendermint/tendermint/types" -) - -// reactors and other modules should export -// this interface to become eventable -type Eventable interface { - SetEventSwitch(evsw *EventSwitch) -} - -// an event switch or cache implements fireable -type Fireable interface { - FireEvent(event string, data types.EventData) -} - -type EventSwitch struct { - BaseService - - mtx sync.RWMutex - eventCells map[string]*eventCell - listeners map[string]*eventListener -} - -func NewEventSwitch() *EventSwitch { - evsw := &EventSwitch{} - evsw.BaseService = *NewBaseService(log, "EventSwitch", evsw) - return evsw -} - -func (evsw *EventSwitch) OnStart() error { - evsw.BaseService.OnStart() - evsw.eventCells = make(map[string]*eventCell) - evsw.listeners = make(map[string]*eventListener) - return nil -} - -func (evsw *EventSwitch) OnStop() { - evsw.BaseService.OnStop() - evsw.eventCells = nil - evsw.listeners = nil -} - -func (evsw *EventSwitch) AddListenerForEvent(listenerID, event string, cb eventCallback) { - // Get/Create eventCell and listener - evsw.mtx.Lock() - eventCell := evsw.eventCells[event] - if eventCell == nil { - eventCell = newEventCell() - evsw.eventCells[event] = eventCell - } - listener := evsw.listeners[listenerID] - if listener == nil { - listener = newEventListener(listenerID) - evsw.listeners[listenerID] = listener - } - evsw.mtx.Unlock() - - // Add event and listener - eventCell.AddListener(listenerID, cb) - listener.AddEvent(event) -} - -func (evsw *EventSwitch) RemoveListener(listenerID string) { - // Get and remove listener - evsw.mtx.RLock() - listener := evsw.listeners[listenerID] - delete(evsw.listeners, listenerID) - evsw.mtx.RUnlock() - - if listener == nil { - return - } - - // Remove callback for each event. - listener.SetRemoved() - for _, event := range listener.GetEvents() { - evsw.RemoveListenerForEvent(event, listenerID) - } -} - -func (evsw *EventSwitch) RemoveListenerForEvent(event string, listenerID string) { - // Get eventCell - evsw.mtx.Lock() - eventCell := evsw.eventCells[event] - evsw.mtx.Unlock() - - if eventCell == nil { - return - } - - // Remove listenerID from eventCell - numListeners := eventCell.RemoveListener(listenerID) - - // Maybe garbage collect eventCell. - if numListeners == 0 { - // Lock again and double check. - evsw.mtx.Lock() // OUTER LOCK - eventCell.mtx.Lock() // INNER LOCK - if len(eventCell.listeners) == 0 { - delete(evsw.eventCells, event) - } - eventCell.mtx.Unlock() // INNER LOCK - evsw.mtx.Unlock() // OUTER LOCK - } -} - -func (evsw *EventSwitch) FireEvent(event string, data types.EventData) { - // Get the eventCell - evsw.mtx.RLock() - eventCell := evsw.eventCells[event] - evsw.mtx.RUnlock() - - if eventCell == nil { - return - } - - // Fire event for all listeners in eventCell - eventCell.FireEvent(data) -} - -func (evsw *EventSwitch) SubscribeToEvent(receiver, eventID string, chanCap int) chan interface{} { - // listen for new round - ch := make(chan interface{}, chanCap) - evsw.AddListenerForEvent(receiver, eventID, func(data types.EventData) { - // NOTE: in production, evsw callbacks should be nonblocking. - ch <- data - }) - return ch -} - -//----------------------------------------------------------------------------- - -// eventCell handles keeping track of listener callbacks for a given event. -type eventCell struct { - mtx sync.RWMutex - listeners map[string]eventCallback -} - -func newEventCell() *eventCell { - return &eventCell{ - listeners: make(map[string]eventCallback), - } -} - -func (cell *eventCell) AddListener(listenerID string, cb eventCallback) { - cell.mtx.Lock() - cell.listeners[listenerID] = cb - cell.mtx.Unlock() -} - -func (cell *eventCell) RemoveListener(listenerID string) int { - cell.mtx.Lock() - delete(cell.listeners, listenerID) - numListeners := len(cell.listeners) - cell.mtx.Unlock() - return numListeners -} - -func (cell *eventCell) FireEvent(data types.EventData) { - cell.mtx.RLock() - for _, listener := range cell.listeners { - listener(data) - } - cell.mtx.RUnlock() -} - -//----------------------------------------------------------------------------- - -type eventCallback func(data types.EventData) - -type eventListener struct { - id string - - mtx sync.RWMutex - removed bool - events []string -} - -func newEventListener(id string) *eventListener { - return &eventListener{ - id: id, - removed: false, - events: nil, - } -} - -func (evl *eventListener) AddEvent(event string) { - evl.mtx.Lock() - defer evl.mtx.Unlock() - - if evl.removed { - return - } - evl.events = append(evl.events, event) -} - -func (evl *eventListener) GetEvents() []string { - evl.mtx.RLock() - defer evl.mtx.RUnlock() - - events := make([]string, len(evl.events)) - copy(events, evl.events) - return events -} - -func (evl *eventListener) SetRemoved() { - evl.mtx.Lock() - defer evl.mtx.Unlock() - evl.removed = true -} diff --git a/events/log.go b/events/log.go deleted file mode 100644 index 52546229..00000000 --- a/events/log.go +++ /dev/null @@ -1,7 +0,0 @@ -package events - -import ( - "github.com/tendermint/go-logger" -) - -var log = logger.New("module", "events") diff --git a/mempool/reactor.go b/mempool/reactor.go index a9de9726..51be9bea 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -10,7 +10,7 @@ import ( . "github.com/tendermint/go-common" "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/events" + "github.com/tendermint/go-events" "github.com/tendermint/tendermint/types" ) diff --git a/node/node.go b/node/node.go index 09dc5886..124a8773 100644 --- a/node/node.go +++ b/node/node.go @@ -12,16 +12,16 @@ import ( . "github.com/tendermint/go-common" "github.com/tendermint/go-crypto" dbm "github.com/tendermint/go-db" + "github.com/tendermint/go-events" "github.com/tendermint/go-p2p" + "github.com/tendermint/go-rpc" + "github.com/tendermint/go-rpc/server" "github.com/tendermint/go-wire" bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/consensus" - "github.com/tendermint/tendermint/events" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/proxy" - "github.com/tendermint/tendermint/rpc" "github.com/tendermint/tendermint/rpc/core" - "github.com/tendermint/tendermint/rpc/server" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" "github.com/tendermint/tmsp/example/golang" @@ -332,10 +332,10 @@ func getProxyApp(addr string, hash []byte) (proxyAppCtx proxy.AppContext) { if err != nil { Exit(Fmt("Failed to connect to proxy for mempool: %v", err)) } - proxyAppCtx := proxy.NewRemoteAppContext(proxyConn, 1024) - - proxyAppCtx.Start() + remoteApp := proxy.NewRemoteAppContext(proxyConn, 1024) + remoteApp.Start() + proxyAppCtx = remoteApp } // Check the hash diff --git a/node/node_test.go b/node/node_test.go index d171bdc1..4374e514 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -4,30 +4,12 @@ import ( "testing" "time" - . "github.com/tendermint/go-common" "github.com/tendermint/go-p2p" _ "github.com/tendermint/tendermint/config/tendermint_test" - "github.com/tendermint/tendermint/types" - "github.com/tendermint/tmsp/example/golang" - "github.com/tendermint/tmsp/server" ) func TestNodeStartStop(t *testing.T) { - // Start a dummy app - go func() { - _, err := server.StartListener(config.GetString("proxy_app"), example.NewDummyApplication()) - if err != nil { - Exit(err.Error()) - } - }() - // wait for the server - time.Sleep(time.Second * 2) - - // Get PrivValidator - privValidatorFile := config.GetString("priv_validator_file") - privValidator := types.LoadOrGenPrivValidator(privValidatorFile) - // Create & start node n := NewNode(privValidator) l := p2p.NewDefaultListener("tcp", config.GetString("node_laddr"), config.GetBool("skip_upnp")) diff --git a/rpc/client/http_client.go b/rpc/client/http_client.go deleted file mode 100644 index 9ff72629..00000000 --- a/rpc/client/http_client.go +++ /dev/null @@ -1,133 +0,0 @@ -package rpcclient - -import ( - "bytes" - "errors" - "io/ioutil" - "net/http" - "net/url" - "strings" - - . "github.com/tendermint/go-common" - "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/rpc/types" -) - -// JSON rpc takes params as a slice -type ClientJSONRPC struct { - remote string -} - -func NewClientJSONRPC(remote string) *ClientJSONRPC { - return &ClientJSONRPC{remote} -} - -func (c *ClientJSONRPC) Call(method string, params []interface{}) (interface{}, error) { - return CallHTTP_JSONRPC(c.remote, method, params) -} - -// URI takes params as a map -type ClientURI struct { - remote string -} - -func NewClientURI(remote string) *ClientURI { - if !strings.HasSuffix(remote, "/") { - remote = remote + "/" - } - return &ClientURI{remote} -} - -func (c *ClientURI) Call(method string, params map[string]interface{}) (interface{}, error) { - return CallHTTP_URI(c.remote, method, params) -} - -func CallHTTP_JSONRPC(remote string, method string, params []interface{}) (interface{}, error) { - // Make request and get responseBytes - request := rpctypes.RPCRequest{ - JSONRPC: "2.0", - Method: method, - Params: params, - ID: "", - } - requestBytes := wire.JSONBytes(request) - requestBuf := bytes.NewBuffer(requestBytes) - log.Info(Fmt("RPC request to %v: %v", remote, string(requestBytes))) - httpResponse, err := http.Post(remote, "text/json", requestBuf) - if err != nil { - return nil, err - } - defer httpResponse.Body.Close() - responseBytes, err := ioutil.ReadAll(httpResponse.Body) - if err != nil { - return nil, err - } - log.Info(Fmt("RPC response: %v", string(responseBytes))) - return unmarshalResponseBytes(responseBytes) -} - -func CallHTTP_URI(remote string, method string, params map[string]interface{}) (interface{}, error) { - values, err := argsToURLValues(params) - if err != nil { - return nil, err - } - log.Info(Fmt("URI request to %v: %v", remote, values)) - resp, err := http.PostForm(remote+method, values) - if err != nil { - return nil, err - } - defer resp.Body.Close() - responseBytes, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, err - } - return unmarshalResponseBytes(responseBytes) -} - -//------------------------------------------------ - -func unmarshalResponseBytes(responseBytes []byte) (interface{}, error) { - // read response - // if rpc/core/types is imported, the result will unmarshal - // into the correct type - var err error - response := &rpctypes.RPCResponse{} - wire.ReadJSON(response, responseBytes, &err) - if err != nil { - return nil, err - } - errorStr := response.Error - if errorStr != "" { - return nil, errors.New(errorStr) - } - return response.Result, err -} - -func argsToURLValues(args map[string]interface{}) (url.Values, error) { - values := make(url.Values) - if len(args) == 0 { - return values, nil - } - err := argsToJson(args) - if err != nil { - return nil, err - } - for key, val := range args { - values.Set(key, val.(string)) - } - return values, nil -} - -func argsToJson(args map[string]interface{}) error { - var n int - var err error - for k, v := range args { - buf := new(bytes.Buffer) - wire.WriteJSON(v, buf, &n, &err) - if err != nil { - return err - } - args[k] = buf.String() - } - return nil -} diff --git a/rpc/client/log.go b/rpc/client/log.go deleted file mode 100644 index 8b33e2f1..00000000 --- a/rpc/client/log.go +++ /dev/null @@ -1,7 +0,0 @@ -package rpcclient - -import ( - "github.com/tendermint/log15" -) - -var log = log15.New("module", "rpcclient") diff --git a/rpc/client/ws_client.go b/rpc/client/ws_client.go deleted file mode 100644 index 8124b63b..00000000 --- a/rpc/client/ws_client.go +++ /dev/null @@ -1,116 +0,0 @@ -package rpcclient - -import ( - "net/http" - "time" - - "github.com/gorilla/websocket" - . "github.com/tendermint/go-common" - "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/rpc/types" -) - -const ( - wsResultsChannelCapacity = 10 - wsWriteTimeoutSeconds = 10 -) - -type WSClient struct { - QuitService - Address string - *websocket.Conn - ResultsCh chan rpctypes.Result // closes upon WSClient.Stop() -} - -// create a new connection -func NewWSClient(addr string) *WSClient { - wsClient := &WSClient{ - Address: addr, - Conn: nil, - ResultsCh: make(chan rpctypes.Result, wsResultsChannelCapacity), - } - wsClient.QuitService = *NewQuitService(log, "WSClient", wsClient) - return wsClient -} - -func (wsc *WSClient) OnStart() error { - wsc.QuitService.OnStart() - err := wsc.dial() - if err != nil { - return err - } - go wsc.receiveEventsRoutine() - return nil -} - -func (wsc *WSClient) dial() error { - // Dial - dialer := websocket.DefaultDialer - rHeader := http.Header{} - con, _, err := dialer.Dial(wsc.Address, rHeader) - if err != nil { - return err - } - // Set the ping/pong handlers - con.SetPingHandler(func(m string) error { - // NOTE: https://github.com/gorilla/websocket/issues/97 - go con.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds)) - return nil - }) - con.SetPongHandler(func(m string) error { - // NOTE: https://github.com/gorilla/websocket/issues/97 - return nil - }) - wsc.Conn = con - return nil -} - -func (wsc *WSClient) OnStop() { - wsc.QuitService.OnStop() - // ResultsCh is closed in receiveEventsRoutine. -} - -func (wsc *WSClient) receiveEventsRoutine() { - for { - _, data, err := wsc.ReadMessage() - if err != nil { - log.Info("WSClient failed to read message", "error", err, "data", string(data)) - wsc.Stop() - break - } else { - var response rpctypes.RPCResponse - wire.ReadJSON(&response, data, &err) - if err != nil { - log.Info("WSClient failed to parse message", "error", err) - wsc.Stop() - break - } - wsc.ResultsCh <- response.Result - } - } - - // Cleanup - close(wsc.ResultsCh) -} - -// subscribe to an event -func (wsc *WSClient) Subscribe(eventid string) error { - err := wsc.WriteJSON(rpctypes.RPCRequest{ - JSONRPC: "2.0", - ID: "", - Method: "subscribe", - Params: []interface{}{eventid}, - }) - return err -} - -// unsubscribe from an event -func (wsc *WSClient) Unsubscribe(eventid string) error { - err := wsc.WriteJSON(rpctypes.RPCRequest{ - JSONRPC: "2.0", - ID: "", - Method: "unsubscribe", - Params: []interface{}{eventid}, - }) - return err -} diff --git a/rpc/core/events.go b/rpc/core/events.go index 83092fd9..7365377d 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -1,14 +1,14 @@ package core import ( + "github.com/tendermint/go-events" + "github.com/tendermint/go-rpc/types" ctypes "github.com/tendermint/tendermint/rpc/core/types" - "github.com/tendermint/tendermint/rpc/types" - "github.com/tendermint/tendermint/types" ) func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscribe, error) { log.Notice("Subscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event) - wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg types.EventData) { + wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg events.EventData) { // NOTE: EventSwitch callbacks must be nonblocking // NOTE: RPCResponses of subscribed events have id suffix "#event" wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &ctypes.ResultEvent{event, msg}, "")) @@ -18,7 +18,7 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscri func Unsubscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultUnsubscribe, error) { log.Notice("Unsubscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event) - wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg types.EventData) { + wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg events.EventData) { // NOTE: EventSwitch callbacks must be nonblocking // NOTE: RPCResponses of subscribed events have id suffix "#event" wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &ctypes.ResultEvent{event, msg}, "")) diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 4b743336..765c03b2 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -1,7 +1,7 @@ package core import ( - rpc "github.com/tendermint/tendermint/rpc/server" + rpc "github.com/tendermint/go-rpc/server" ) // TODO: eliminate redundancy between here and reading code from core/ diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index 7ca47240..ac6737cf 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -2,9 +2,10 @@ package core_types import ( "github.com/tendermint/go-crypto" + "github.com/tendermint/go-events" "github.com/tendermint/go-p2p" + "github.com/tendermint/go-rpc/types" "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/rpc/types" "github.com/tendermint/tendermint/types" ) @@ -67,9 +68,10 @@ type ResultSubscribe struct { type ResultUnsubscribe struct { } +// TODO: something about this type ResultEvent struct { - Event string `json:"event"` - Data types.EventData `json:"data"` + Event string `json:"event"` + Data events.EventData `json:"data"` } //---------------------------------------- diff --git a/rpc/server/handlers.go b/rpc/server/handlers.go deleted file mode 100644 index 923544be..00000000 --- a/rpc/server/handlers.go +++ /dev/null @@ -1,553 +0,0 @@ -package rpcserver - -import ( - "bytes" - "encoding/json" - "errors" - "fmt" - "io/ioutil" - "net/http" - "reflect" - "sort" - "time" - - "github.com/gorilla/websocket" - . "github.com/tendermint/go-common" - "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/events" - . "github.com/tendermint/tendermint/rpc/types" -) - -func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) { - // HTTP endpoints - for funcName, rpcFunc := range funcMap { - mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc)) - } - - // JSONRPC endpoints - mux.HandleFunc("/", makeJSONRPCHandler(funcMap)) -} - -//------------------------------------- -// function introspection - -// holds all type information for each function -type RPCFunc struct { - f reflect.Value // underlying rpc function - args []reflect.Type // type of each function arg - returns []reflect.Type // type of each return arg - argNames []string // name of each argument - ws bool // websocket only -} - -// wraps a function for quicker introspection -func NewRPCFunc(f interface{}, args []string) *RPCFunc { - return &RPCFunc{ - f: reflect.ValueOf(f), - args: funcArgTypes(f), - returns: funcReturnTypes(f), - argNames: args, - ws: false, - } -} - -func NewWSRPCFunc(f interface{}, args []string) *RPCFunc { - return &RPCFunc{ - f: reflect.ValueOf(f), - args: funcArgTypes(f), - returns: funcReturnTypes(f), - argNames: args, - ws: true, - } -} - -// return a function's argument types -func funcArgTypes(f interface{}) []reflect.Type { - t := reflect.TypeOf(f) - n := t.NumIn() - typez := make([]reflect.Type, n) - for i := 0; i < n; i++ { - typez[i] = t.In(i) - } - return typez -} - -// return a function's return types -func funcReturnTypes(f interface{}) []reflect.Type { - t := reflect.TypeOf(f) - n := t.NumOut() - typez := make([]reflect.Type, n) - for i := 0; i < n; i++ { - typez[i] = t.Out(i) - } - return typez -} - -// function introspection -//----------------------------------------------------------------------------- -// rpc.json - -// jsonrpc calls grab the given method's function info and runs reflect.Call -func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - b, _ := ioutil.ReadAll(r.Body) - // if its an empty request (like from a browser), - // just display a list of functions - if len(b) == 0 { - writeListOfEndpoints(w, r, funcMap) - return - } - - var request RPCRequest - err := json.Unmarshal(b, &request) - if err != nil { - WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error())) - return - } - if len(r.URL.Path) > 1 { - WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path))) - return - } - rpcFunc := funcMap[request.Method] - if rpcFunc == nil { - WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method)) - return - } - if rpcFunc.ws { - WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, "RPC method is only for websockets: "+request.Method)) - return - } - args, err := jsonParamsToArgs(rpcFunc, request.Params) - if err != nil { - WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, err.Error())) - return - } - returns := rpcFunc.f.Call(args) - log.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns) - result, err := unreflectResult(returns) - if err != nil { - WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, err.Error())) - return - } - WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, result, "")) - } -} - -// Convert a list of interfaces to properly typed values -func jsonParamsToArgs(rpcFunc *RPCFunc, params []interface{}) ([]reflect.Value, error) { - if len(rpcFunc.argNames) != len(params) { - return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)", - len(rpcFunc.argNames), rpcFunc.argNames, len(params), params)) - } - values := make([]reflect.Value, len(params)) - for i, p := range params { - ty := rpcFunc.args[i] - v, err := _jsonObjectToArg(ty, p) - if err != nil { - return nil, err - } - values[i] = v - } - return values, nil -} - -// Same as above, but with the first param the websocket connection -func jsonParamsToArgsWS(rpcFunc *RPCFunc, params []interface{}, wsCtx WSRPCContext) ([]reflect.Value, error) { - if len(rpcFunc.argNames) != len(params) { - return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)", - len(rpcFunc.argNames)-1, rpcFunc.argNames[1:], len(params), params)) - } - values := make([]reflect.Value, len(params)+1) - values[0] = reflect.ValueOf(wsCtx) - for i, p := range params { - ty := rpcFunc.args[i+1] - v, err := _jsonObjectToArg(ty, p) - if err != nil { - return nil, err - } - values[i+1] = v - } - return values, nil -} - -func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error) { - var err error - v := reflect.New(ty) - wire.ReadJSONObjectPtr(v.Interface(), object, &err) - if err != nil { - return v, err - } - v = v.Elem() - return v, nil -} - -// rpc.json -//----------------------------------------------------------------------------- -// rpc.http - -// convert from a function name to the http handler -func makeHTTPHandler(rpcFunc *RPCFunc) func(http.ResponseWriter, *http.Request) { - // Exception for websocket endpoints - if rpcFunc.ws { - return func(w http.ResponseWriter, r *http.Request) { - WriteRPCResponseHTTP(w, NewRPCResponse("", nil, "This RPC method is only for websockets")) - } - } - // All other endpoints - return func(w http.ResponseWriter, r *http.Request) { - args, err := httpParamsToArgs(rpcFunc, r) - if err != nil { - WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error())) - return - } - returns := rpcFunc.f.Call(args) - log.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns) - result, err := unreflectResult(returns) - if err != nil { - WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error())) - return - } - WriteRPCResponseHTTP(w, NewRPCResponse("", result, "")) - } -} - -// Covert an http query to a list of properly typed values. -// To be properly decoded the arg must be a concrete type from tendermint (if its an interface). -func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error) { - argTypes := rpcFunc.args - argNames := rpcFunc.argNames - - var err error - values := make([]reflect.Value, len(argNames)) - for i, name := range argNames { - ty := argTypes[i] - arg := GetParam(r, name) - values[i], err = _jsonStringToArg(ty, arg) - if err != nil { - return nil, err - } - } - return values, nil -} - -func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) { - var err error - v := reflect.New(ty) - wire.ReadJSONPtr(v.Interface(), []byte(arg), &err) - if err != nil { - return v, err - } - v = v.Elem() - return v, nil -} - -// rpc.http -//----------------------------------------------------------------------------- -// rpc.websocket - -const ( - writeChanCapacity = 1000 - wsWriteTimeoutSeconds = 30 // each write times out after this - wsReadTimeoutSeconds = 30 // connection times out if we haven't received *anything* in this long, not even pings. - wsPingTickerSeconds = 10 // send a ping every PingTickerSeconds. -) - -// a single websocket connection -// contains listener id, underlying ws connection, -// and the event switch for subscribing to events -type wsConnection struct { - QuitService - - remoteAddr string - baseConn *websocket.Conn - writeChan chan RPCResponse - readTimeout *time.Timer - pingTicker *time.Ticker - - funcMap map[string]*RPCFunc - evsw *events.EventSwitch -} - -// new websocket connection wrapper -func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw *events.EventSwitch) *wsConnection { - wsc := &wsConnection{ - remoteAddr: baseConn.RemoteAddr().String(), - baseConn: baseConn, - writeChan: make(chan RPCResponse, writeChanCapacity), // error when full. - funcMap: funcMap, - evsw: evsw, - } - wsc.QuitService = *NewQuitService(log, "wsConnection", wsc) - return wsc -} - -// wsc.Start() blocks until the connection closes. -func (wsc *wsConnection) OnStart() error { - wsc.QuitService.OnStart() - - // Read subscriptions/unsubscriptions to events - go wsc.readRoutine() - - // Custom Ping handler to touch readTimeout - wsc.readTimeout = time.NewTimer(time.Second * wsReadTimeoutSeconds) - wsc.pingTicker = time.NewTicker(time.Second * wsPingTickerSeconds) - wsc.baseConn.SetPingHandler(func(m string) error { - // NOTE: https://github.com/gorilla/websocket/issues/97 - go wsc.baseConn.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds)) - wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds) - return nil - }) - wsc.baseConn.SetPongHandler(func(m string) error { - // NOTE: https://github.com/gorilla/websocket/issues/97 - wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds) - return nil - }) - go wsc.readTimeoutRoutine() - - // Write responses, BLOCKING. - wsc.writeRoutine() - return nil -} - -func (wsc *wsConnection) OnStop() { - wsc.QuitService.OnStop() - wsc.evsw.RemoveListener(wsc.remoteAddr) - wsc.readTimeout.Stop() - wsc.pingTicker.Stop() - // The write loop closes the websocket connection - // when it exits its loop, and the read loop - // closes the writeChan -} - -func (wsc *wsConnection) readTimeoutRoutine() { - select { - case <-wsc.readTimeout.C: - log.Notice("Stopping connection due to read timeout") - wsc.Stop() - case <-wsc.Quit: - return - } -} - -// Implements WSRPCConnection -func (wsc *wsConnection) GetRemoteAddr() string { - return wsc.remoteAddr -} - -// Implements WSRPCConnection -func (wsc *wsConnection) GetEventSwitch() *events.EventSwitch { - return wsc.evsw -} - -// Implements WSRPCConnection -// Blocking write to writeChan until service stops. -func (wsc *wsConnection) WriteRPCResponse(resp RPCResponse) { - select { - case <-wsc.Quit: - return - case wsc.writeChan <- resp: - } -} - -// Implements WSRPCConnection -// Nonblocking write. -func (wsc *wsConnection) TryWriteRPCResponse(resp RPCResponse) bool { - select { - case <-wsc.Quit: - return false - case wsc.writeChan <- resp: - return true - default: - return false - } -} - -// Read from the socket and subscribe to or unsubscribe from events -func (wsc *wsConnection) readRoutine() { - // Do not close writeChan, to allow WriteRPCResponse() to fail. - // defer close(wsc.writeChan) - - for { - select { - case <-wsc.Quit: - return - default: - var in []byte - // Do not set a deadline here like below: - // wsc.baseConn.SetReadDeadline(time.Now().Add(time.Second * wsReadTimeoutSeconds)) - // The client may not send anything for a while. - // We use `readTimeout` to handle read timeouts. - _, in, err := wsc.baseConn.ReadMessage() - if err != nil { - log.Notice("Failed to read from connection", "remote", wsc.remoteAddr) - // an error reading the connection, - // kill the connection - wsc.Stop() - return - } - var request RPCRequest - err = json.Unmarshal(in, &request) - if err != nil { - errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error()) - wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, errStr)) - continue - } - - // Now, fetch the RPCFunc and execute it. - - rpcFunc := wsc.funcMap[request.Method] - if rpcFunc == nil { - wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method)) - continue - } - var args []reflect.Value - if rpcFunc.ws { - wsCtx := WSRPCContext{Request: request, WSRPCConnection: wsc} - args, err = jsonParamsToArgsWS(rpcFunc, request.Params, wsCtx) - } else { - args, err = jsonParamsToArgs(rpcFunc, request.Params) - } - if err != nil { - wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, err.Error())) - continue - } - returns := rpcFunc.f.Call(args) - log.Info("WSJSONRPC", "method", request.Method, "args", args, "returns", returns) - result, err := unreflectResult(returns) - if err != nil { - wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, err.Error())) - continue - } else { - wsc.WriteRPCResponse(NewRPCResponse(request.ID, result, "")) - continue - } - - } - } -} - -// receives on a write channel and writes out on the socket -func (wsc *wsConnection) writeRoutine() { - defer wsc.baseConn.Close() - var n, err = int(0), error(nil) - for { - select { - case <-wsc.Quit: - return - case <-wsc.pingTicker.C: - err := wsc.baseConn.WriteMessage(websocket.PingMessage, []byte{}) - if err != nil { - log.Error("Failed to write ping message on websocket", "error", err) - wsc.Stop() - return - } - case msg := <-wsc.writeChan: - buf := new(bytes.Buffer) - wire.WriteJSON(msg, buf, &n, &err) - if err != nil { - log.Error("Failed to marshal RPCResponse to JSON", "error", err) - } else { - wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * wsWriteTimeoutSeconds)) - bufBytes := buf.Bytes() - if err = wsc.baseConn.WriteMessage(websocket.TextMessage, bufBytes); err != nil { - log.Warn("Failed to write response on websocket", "error", err) - wsc.Stop() - return - } - } - } - } -} - -//---------------------------------------- - -// Main manager for all websocket connections -// Holds the event switch -// NOTE: The websocket path is defined externally, e.g. in node/node.go -type WebsocketManager struct { - websocket.Upgrader - funcMap map[string]*RPCFunc - evsw *events.EventSwitch -} - -func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw *events.EventSwitch) *WebsocketManager { - return &WebsocketManager{ - funcMap: funcMap, - evsw: evsw, - Upgrader: websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: func(r *http.Request) bool { - // TODO - return true - }, - }, - } -} - -// Upgrade the request/response (via http.Hijack) and starts the wsConnection. -func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) { - wsConn, err := wm.Upgrade(w, r, nil) - if err != nil { - // TODO - return http error - log.Error("Failed to upgrade to websocket connection", "error", err) - return - } - - // register connection - con := NewWSConnection(wsConn, wm.funcMap, wm.evsw) - log.Notice("New websocket connection", "remote", con.remoteAddr) - con.Start() // Blocking -} - -// rpc.websocket -//----------------------------------------------------------------------------- - -// returns is result struct and error. If error is not nil, return it -func unreflectResult(returns []reflect.Value) (interface{}, error) { - errV := returns[1] - if errV.Interface() != nil { - return nil, fmt.Errorf("%v", errV.Interface()) - } - return returns[0].Interface(), nil -} - -// writes a list of available rpc endpoints as an html page -func writeListOfEndpoints(w http.ResponseWriter, r *http.Request, funcMap map[string]*RPCFunc) { - noArgNames := []string{} - argNames := []string{} - for name, funcData := range funcMap { - if len(funcData.args) == 0 { - noArgNames = append(noArgNames, name) - } else { - argNames = append(argNames, name) - } - } - sort.Strings(noArgNames) - sort.Strings(argNames) - buf := new(bytes.Buffer) - buf.WriteString("") - buf.WriteString("
Available endpoints:
") - - for _, name := range noArgNames { - link := fmt.Sprintf("http://%s/%s", r.Host, name) - buf.WriteString(fmt.Sprintf("%s
", link, link)) - } - - buf.WriteString("
Endpoints that require arguments:
") - for _, name := range argNames { - link := fmt.Sprintf("http://%s/%s?", r.Host, name) - funcData := funcMap[name] - for i, argName := range funcData.argNames { - link += argName + "=_" - if i < len(funcData.argNames)-1 { - link += "&" - } - } - buf.WriteString(fmt.Sprintf("%s
", link, link)) - } - buf.WriteString("") - w.Header().Set("Content-Type", "text/html") - w.WriteHeader(200) - w.Write(buf.Bytes()) -} diff --git a/rpc/server/http_params.go b/rpc/server/http_params.go deleted file mode 100644 index acf5b4c8..00000000 --- a/rpc/server/http_params.go +++ /dev/null @@ -1,89 +0,0 @@ -package rpcserver - -import ( - "encoding/hex" - "fmt" - "net/http" - "regexp" - "strconv" -) - -var ( - // Parts of regular expressions - atom = "[A-Z0-9!#$%&'*+\\-/=?^_`{|}~]+" - dotAtom = atom + `(?:\.` + atom + `)*` - domain = `[A-Z0-9.-]+\.[A-Z]{2,4}` - - RE_HEX = regexp.MustCompile(`^(?i)[a-f0-9]+$`) - RE_EMAIL = regexp.MustCompile(`^(?i)(` + dotAtom + `)@(` + dotAtom + `)$`) - RE_ADDRESS = regexp.MustCompile(`^(?i)[a-z0-9]{25,34}$`) - RE_HOST = regexp.MustCompile(`^(?i)(` + domain + `)$`) - - //RE_ID12 = regexp.MustCompile(`^[a-zA-Z0-9]{12}$`) -) - -func GetParam(r *http.Request, param string) string { - s := r.URL.Query().Get(param) - if s == "" { - s = r.FormValue(param) - } - return s -} - -func GetParamByteSlice(r *http.Request, param string) ([]byte, error) { - s := GetParam(r, param) - return hex.DecodeString(s) -} - -func GetParamInt64(r *http.Request, param string) (int64, error) { - s := GetParam(r, param) - i, err := strconv.ParseInt(s, 10, 64) - if err != nil { - return 0, fmt.Errorf(param, err.Error()) - } - return i, nil -} - -func GetParamInt32(r *http.Request, param string) (int32, error) { - s := GetParam(r, param) - i, err := strconv.ParseInt(s, 10, 32) - if err != nil { - return 0, fmt.Errorf(param, err.Error()) - } - return int32(i), nil -} - -func GetParamUint64(r *http.Request, param string) (uint64, error) { - s := GetParam(r, param) - i, err := strconv.ParseUint(s, 10, 64) - if err != nil { - return 0, fmt.Errorf(param, err.Error()) - } - return i, nil -} - -func GetParamUint(r *http.Request, param string) (uint, error) { - s := GetParam(r, param) - i, err := strconv.ParseUint(s, 10, 64) - if err != nil { - return 0, fmt.Errorf(param, err.Error()) - } - return uint(i), nil -} - -func GetParamRegexp(r *http.Request, param string, re *regexp.Regexp) (string, error) { - s := GetParam(r, param) - if !re.MatchString(s) { - return "", fmt.Errorf(param, "Did not match regular expression %v", re.String()) - } - return s, nil -} - -func GetParamFloat64(r *http.Request, param string) (float64, error) { - s := GetParam(r, param) - f, err := strconv.ParseFloat(s, 64) - if err != nil { - return 0, fmt.Errorf(param, err.Error()) - } - return f, nil -} diff --git a/rpc/server/http_server.go b/rpc/server/http_server.go deleted file mode 100644 index fd35d0df..00000000 --- a/rpc/server/http_server.go +++ /dev/null @@ -1,115 +0,0 @@ -// Commons for HTTP handling -package rpcserver - -import ( - "bufio" - "fmt" - "net" - "net/http" - "runtime/debug" - "time" - - . "github.com/tendermint/go-common" - "github.com/tendermint/go-wire" - "github.com/tendermint/tendermint/alert" - . "github.com/tendermint/tendermint/rpc/types" -) - -func StartHTTPServer(listenAddr string, handler http.Handler) (net.Listener, error) { - log.Notice(Fmt("Starting RPC HTTP server on %v", listenAddr)) - listener, err := net.Listen("tcp", listenAddr) - if err != nil { - return nil, fmt.Errorf("Failed to listen to %v", listenAddr) - } - go func() { - res := http.Serve( - listener, - RecoverAndLogHandler(handler), - ) - log.Crit("RPC HTTP server stopped", "result", res) - }() - return listener, nil -} - -func WriteRPCResponseHTTP(w http.ResponseWriter, res RPCResponse) { - jsonBytes := wire.JSONBytesPretty(res) - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(200) - w.Write(jsonBytes) -} - -//----------------------------------------------------------------------------- - -// Wraps an HTTP handler, adding error logging. -// If the inner function panics, the outer function recovers, logs, sends an -// HTTP 500 error response. -func RecoverAndLogHandler(handler http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // Wrap the ResponseWriter to remember the status - rww := &ResponseWriterWrapper{-1, w} - begin := time.Now() - - // Common headers - origin := r.Header.Get("Origin") - rww.Header().Set("Access-Control-Allow-Origin", origin) - rww.Header().Set("Access-Control-Allow-Credentials", "true") - rww.Header().Set("Access-Control-Expose-Headers", "X-Server-Time") - rww.Header().Set("X-Server-Time", fmt.Sprintf("%v", begin.Unix())) - - defer func() { - // Send a 500 error if a panic happens during a handler. - // Without this, Chrome & Firefox were retrying aborted ajax requests, - // at least to my localhost. - if e := recover(); e != nil { - - // If RPCResponse - if res, ok := e.(RPCResponse); ok { - WriteRPCResponseHTTP(rww, res) - } else { - // For the rest, - log.Error("Panic in RPC HTTP handler", "error", e, "stack", string(debug.Stack())) - rww.WriteHeader(http.StatusInternalServerError) - WriteRPCResponseHTTP(rww, NewRPCResponse("", nil, Fmt("Internal Server Error: %v", e))) - } - } - - // Finally, log. - durationMS := time.Since(begin).Nanoseconds() / 1000000 - if rww.Status == -1 { - rww.Status = 200 - } - log.Info("Served RPC HTTP response", - "method", r.Method, "url", r.URL, - "status", rww.Status, "duration", durationMS, - "remoteAddr", r.RemoteAddr, - ) - }() - - handler.ServeHTTP(rww, r) - }) -} - -// Remember the status for logging -type ResponseWriterWrapper struct { - Status int - http.ResponseWriter -} - -func (w *ResponseWriterWrapper) WriteHeader(status int) { - w.Status = status - w.ResponseWriter.WriteHeader(status) -} - -// implements http.Hijacker -func (w *ResponseWriterWrapper) Hijack() (net.Conn, *bufio.ReadWriter, error) { - return w.ResponseWriter.(http.Hijacker).Hijack() -} - -// Stick it as a deferred statement in gouroutines to prevent the program from crashing. -func Recover(daemonName string) { - if e := recover(); e != nil { - stack := string(debug.Stack()) - errorString := fmt.Sprintf("[%s] %s\n%s", daemonName, e, stack) - alert.Alert(errorString) - } -} diff --git a/rpc/server/log.go b/rpc/server/log.go deleted file mode 100644 index 704e22e3..00000000 --- a/rpc/server/log.go +++ /dev/null @@ -1,7 +0,0 @@ -package rpcserver - -import ( - "github.com/tendermint/log15" -) - -var log = log15.New("module", "rpcserver") diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index a417cbea..98ff0d96 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -11,30 +11,48 @@ import ( "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" + client "github.com/tendermint/go-rpc/client" + "github.com/tendermint/go-rpc/types" _ "github.com/tendermint/tendermint/config/tendermint_test" nm "github.com/tendermint/tendermint/node" - client "github.com/tendermint/tendermint/rpc/client" ctypes "github.com/tendermint/tendermint/rpc/core/types" - "github.com/tendermint/tendermint/rpc/types" "github.com/tendermint/tendermint/types" ) // global variables for use across all tests var ( - rpcAddr = "127.0.0.1:36657" // Not 46657 - requestAddr = "http://" + rpcAddr - websocketAddr = "ws://" + rpcAddr + "/websocket" - node *nm.Node mempoolCount = 0 chainID string - clientURI = client.NewClientURI(requestAddr) - clientJSON = client.NewClientJSONRPC(requestAddr) + rpcAddr, requestAddr, websocketAddr string + + clientURI *client.ClientURI + clientJSON *client.ClientJSONRPC ) +// initialize config and create new node +func init() { + initConfig() + + chainID = config.GetString("chain_id") + rpcAddr = config.GetString("rpc_laddr") + requestAddr = "http://" + rpcAddr + websocketAddr = "ws://" + rpcAddr + "/websocket" + + clientURI = client.NewClientURI(requestAddr) + clientJSON = client.NewClientJSONRPC(requestAddr) + + // TODO: change consensus/state.go timeouts to be shorter + + // start a node + ready := make(chan struct{}) + go newNode(ready) + <-ready +} + // create a new node and sleep forever func newNode(ready chan struct{}) { // Create & start node @@ -52,19 +70,6 @@ func newNode(ready chan struct{}) { <-ch } -// initialize config and create new node -func init() { - initConfig() - chainID = config.GetString("chain_id") - - // TODO: change consensus/state.go timeouts to be shorter - - // start a node - ready := make(chan struct{}) - go newNode(ready) - <-ready -} - //-------------------------------------------------------------------------------- // Utilities for testing the websocket service @@ -192,7 +197,7 @@ func unmarshalResponseNewBlock(b []byte) (*types.Block, error) { func unmarshalValidateBlockchain(t *testing.T, con *websocket.Conn, eid string) { var initBlockN int - for i := 0; i < 2; i++ { + for i := 0; i < 3; i++ { waitForEvent(t, con, eid, true, func() {}, func(eid string, b []byte) error { block, err := unmarshalResponseNewBlock(b) if err != nil { diff --git a/rpc/types/types.go b/rpc/types/types.go deleted file mode 100644 index f7bc98d5..00000000 --- a/rpc/types/types.go +++ /dev/null @@ -1,71 +0,0 @@ -package rpctypes - -import ( - "github.com/tendermint/tendermint/events" -) - -type RPCRequest struct { - JSONRPC string `json:"jsonrpc"` - ID string `json:"id"` - Method string `json:"method"` - Params []interface{} `json:"params"` -} - -func NewRPCRequest(id string, method string, params []interface{}) RPCRequest { - return RPCRequest{ - JSONRPC: "2.0", - ID: id, - Method: method, - Params: params, - } -} - -//---------------------------------------- - -/* -Result is a generic interface. -Applications should register type-bytes like so: - -var _ = wire.RegisterInterface( - struct{ Result }{}, - wire.ConcreteType{&ResultGenesis{}, ResultTypeGenesis}, - wire.ConcreteType{&ResultBlockchainInfo{}, ResultTypeBlockchainInfo}, - ... -) -*/ -type Result interface { -} - -//---------------------------------------- - -type RPCResponse struct { - JSONRPC string `json:"jsonrpc"` - ID string `json:"id"` - Result Result `json:"result"` - Error string `json:"error"` -} - -func NewRPCResponse(id string, res Result, err string) RPCResponse { - return RPCResponse{ - JSONRPC: "2.0", - ID: id, - Result: res, - Error: err, - } -} - -//---------------------------------------- - -// *wsConnection implements this interface. -type WSRPCConnection interface { - GetRemoteAddr() string - GetEventSwitch() *events.EventSwitch - WriteRPCResponse(resp RPCResponse) - TryWriteRPCResponse(resp RPCResponse) bool -} - -// websocket-only RPCFuncs take this as the first parameter. -type WSRPCContext struct { - Request RPCRequest - WSRPCConnection -} diff --git a/rpc/version.go b/rpc/version.go deleted file mode 100644 index 2982824d..00000000 --- a/rpc/version.go +++ /dev/null @@ -1,3 +0,0 @@ -package rpc - -const Version = "0.4.0" diff --git a/state/state.go b/state/state.go index 798e8ce7..ff05ebfa 100644 --- a/state/state.go +++ b/state/state.go @@ -8,6 +8,7 @@ import ( . "github.com/tendermint/go-common" dbm "github.com/tendermint/go-db" + "github.com/tendermint/go-events" "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/types" ) diff --git a/types/events.go b/types/events.go index 436e02de..0bcd2fcc 100644 --- a/types/events.go +++ b/types/events.go @@ -1,6 +1,8 @@ package types import ( + // for registering EventData + "github.com/tendermint/go-events" "github.com/tendermint/go-wire" ) @@ -38,12 +40,8 @@ const ( EventDataTypeVote = byte(0x12) ) -type EventData interface { - AssertIsEventData() -} - var _ = wire.RegisterInterface( - struct{ EventData }{}, + struct{ events.EventData }{}, wire.ConcreteType{EventDataNewBlock{}, EventDataTypeNewBlock}, // wire.ConcreteType{EventDataFork{}, EventDataTypeFork }, wire.ConcreteType{EventDataTx{}, EventDataTypeTx}, From fb59255095fd44bec2b2da7debefbbce4d6e1dc1 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 12 Jan 2016 18:04:06 -0500 Subject: [PATCH 3/8] use TendermintResult for rpctypes.Result --- node/node.go | 10 ++++ rpc/core/events.go | 4 +- rpc/core/routes.go | 112 ++++++++++++++++++++++++++++++++---- rpc/core/types/responses.go | 10 +++- rpc/test/client_test.go | 2 +- rpc/test/helpers.go | 4 +- 6 files changed, 124 insertions(+), 18 deletions(-) diff --git a/node/node.go b/node/node.go index 124a8773..2e072aec 100644 --- a/node/node.go +++ b/node/node.go @@ -16,12 +16,14 @@ import ( "github.com/tendermint/go-p2p" "github.com/tendermint/go-rpc" "github.com/tendermint/go-rpc/server" + "github.com/tendermint/go-rpc/types" "github.com/tendermint/go-wire" bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/consensus" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/rpc/core" + ctypes "github.com/tendermint/tendermint/rpc/core/types" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" "github.com/tendermint/tmsp/example/golang" @@ -181,6 +183,14 @@ func (n *Node) StartRPC() (net.Listener, error) { listenAddr := config.GetString("rpc_laddr") + // register the result objects with wire + // so consumers of tendermint rpc will not have + // conflicts with their own rpc + wire.RegisterInterface( + struct{ rpctypes.Result }{}, + wire.ConcreteType{&ctypes.TendermintResult{}, 0x1}, + ) + mux := http.NewServeMux() wm := rpcserver.NewWebsocketManager(core.Routes, n.evsw) mux.HandleFunc("/websocket", wm.WebsocketHandler) diff --git a/rpc/core/events.go b/rpc/core/events.go index 7365377d..c9164e09 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -11,7 +11,7 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscri wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg events.EventData) { // NOTE: EventSwitch callbacks must be nonblocking // NOTE: RPCResponses of subscribed events have id suffix "#event" - wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &ctypes.ResultEvent{event, msg}, "")) + wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &ctypes.TendermintResult{&ctypes.ResultEvent{event, msg}}, "")) }) return &ctypes.ResultSubscribe{}, nil } @@ -21,7 +21,7 @@ func Unsubscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultUnsub wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg events.EventData) { // NOTE: EventSwitch callbacks must be nonblocking // NOTE: RPCResponses of subscribed events have id suffix "#event" - wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &ctypes.ResultEvent{event, msg}, "")) + wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &ctypes.TendermintResult{&ctypes.ResultEvent{event, msg}}, "")) }) return &ctypes.ResultUnsubscribe{}, nil } diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 765c03b2..185625d0 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -2,20 +2,110 @@ package core import ( rpc "github.com/tendermint/go-rpc/server" + "github.com/tendermint/go-rpc/types" + ctypes "github.com/tendermint/tendermint/rpc/core/types" ) // TODO: eliminate redundancy between here and reading code from core/ var Routes = map[string]*rpc.RPCFunc{ - "subscribe": rpc.NewWSRPCFunc(Subscribe, []string{"event"}), - "unsubscribe": rpc.NewWSRPCFunc(Unsubscribe, []string{"event"}), - "status": rpc.NewRPCFunc(Status, []string{}), - "net_info": rpc.NewRPCFunc(NetInfo, []string{}), - "blockchain": rpc.NewRPCFunc(BlockchainInfo, []string{"minHeight", "maxHeight"}), - "genesis": rpc.NewRPCFunc(Genesis, []string{}), - "get_block": rpc.NewRPCFunc(GetBlock, []string{"height"}), - "list_validators": rpc.NewRPCFunc(ListValidators, []string{}), - "dump_consensus_state": rpc.NewRPCFunc(DumpConsensusState, []string{}), - "broadcast_tx": rpc.NewRPCFunc(BroadcastTx, []string{"tx"}), - "list_unconfirmed_txs": rpc.NewRPCFunc(ListUnconfirmedTxs, []string{}), + "subscribe": rpc.NewWSRPCFunc(SubscribeResult, "event"), + "unsubscribe": rpc.NewWSRPCFunc(UnsubscribeResult, "event"), + "status": rpc.NewRPCFunc(StatusResult, ""), + "net_info": rpc.NewRPCFunc(NetInfoResult, ""), + "blockchain": rpc.NewRPCFunc(BlockchainInfoResult, "minHeight,maxHeight"), + "genesis": rpc.NewRPCFunc(GenesisResult, ""), + "get_block": rpc.NewRPCFunc(GetBlockResult, "height"), + "list_validators": rpc.NewRPCFunc(ListValidatorsResult, ""), + "dump_consensus_state": rpc.NewRPCFunc(DumpConsensusStateResult, ""), + "broadcast_tx": rpc.NewRPCFunc(BroadcastTxResult, "tx"), + "list_unconfirmed_txs": rpc.NewRPCFunc(ListUnconfirmedTxsResult, ""), // subscribe/unsubscribe are reserved for websocket events. } + +func SubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.TendermintResult, error) { + if r, err := Subscribe(wsCtx, event); err != nil { + return nil, err + } else { + return &ctypes.TendermintResult{r}, nil + } +} + +func UnsubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.TendermintResult, error) { + if r, err := Unsubscribe(wsCtx, event); err != nil { + return nil, err + } else { + return &ctypes.TendermintResult{r}, nil + } +} + +func StatusResult() (*ctypes.TendermintResult, error) { + if r, err := Status(); err != nil { + return nil, err + } else { + return &ctypes.TendermintResult{r}, nil + } +} + +func NetInfoResult() (*ctypes.TendermintResult, error) { + if r, err := NetInfo(); err != nil { + return nil, err + } else { + return &ctypes.TendermintResult{r}, nil + } +} + +func BlockchainInfoResult(min, max int) (*ctypes.TendermintResult, error) { + if r, err := BlockchainInfo(min, max); err != nil { + return nil, err + } else { + return &ctypes.TendermintResult{r}, nil + } +} + +func GenesisResult() (*ctypes.TendermintResult, error) { + if r, err := Genesis(); err != nil { + return nil, err + } else { + return &ctypes.TendermintResult{r}, nil + } +} + +func GetBlockResult(height int) (*ctypes.TendermintResult, error) { + if r, err := GetBlock(height); err != nil { + return nil, err + } else { + return &ctypes.TendermintResult{r}, nil + } +} + +func ListValidatorsResult() (*ctypes.TendermintResult, error) { + if r, err := ListValidators(); err != nil { + return nil, err + } else { + return &ctypes.TendermintResult{r}, nil + } +} + +func DumpConsensusStateResult() (*ctypes.TendermintResult, error) { + if r, err := DumpConsensusState(); err != nil { + return nil, err + } else { + return &ctypes.TendermintResult{r}, nil + } +} + +func ListUnconfirmedTxsResult() (*ctypes.TendermintResult, error) { + if r, err := ListUnconfirmedTxs(); err != nil { + return nil, err + } else { + return &ctypes.TendermintResult{r}, nil + } +} + +func BroadcastTxResult(tx []byte) (*ctypes.TendermintResult, error) { + if r, err := BroadcastTx(tx); err != nil { + return nil, err + } else { + return &ctypes.TendermintResult{r}, nil + } +} diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index ac6737cf..dad02f10 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -4,7 +4,6 @@ import ( "github.com/tendermint/go-crypto" "github.com/tendermint/go-events" "github.com/tendermint/go-p2p" - "github.com/tendermint/go-rpc/types" "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/types" ) @@ -92,9 +91,16 @@ const ( ResultTypeEvent = byte(0x0C) ) +type TendermintResultInterface interface{} + +// NOTE: up to the application to register this as rpctypes.Result +type TendermintResult struct { + Result TendermintResultInterface +} + // for wire.readReflect var _ = wire.RegisterInterface( - struct{ rpctypes.Result }{}, + struct{ TendermintResultInterface }{}, wire.ConcreteType{&ResultGenesis{}, ResultTypeGenesis}, wire.ConcreteType{&ResultBlockchainInfo{}, ResultTypeBlockchainInfo}, wire.ConcreteType{&ResultGetBlock{}, ResultTypeGetBlock}, diff --git a/rpc/test/client_test.go b/rpc/test/client_test.go index ff04ce27..00f8c2b0 100644 --- a/rpc/test/client_test.go +++ b/rpc/test/client_test.go @@ -29,7 +29,7 @@ func TestJSONStatus(t *testing.T) { } func testStatus(t *testing.T, result interface{}) { - status := result.(*ctypes.ResultStatus) + status := result.(*ctypes.TendermintResult).Result.(*ctypes.ResultStatus) if status.NodeInfo.Network != chainID { t.Fatal(fmt.Errorf("ChainID mismatch: got %s expected %s", status.NodeInfo.Network, chainID)) diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index 98ff0d96..09e6aa23 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -137,7 +137,7 @@ func waitForEvent(t *testing.T, con *websocket.Conn, eventid string, dieOnTimeou errCh <- err break } - event, ok := response.Result.(*ctypes.ResultEvent) + event, ok := response.Result.(*ctypes.TendermintResult).Result.(*ctypes.ResultEvent) if ok && event.Event == eventid { goodCh <- p break @@ -191,7 +191,7 @@ func unmarshalResponseNewBlock(b []byte) (*types.Block, error) { if response.Error != "" { return nil, fmt.Errorf(response.Error) } - block := response.Result.(*ctypes.ResultEvent).Data.(types.EventDataNewBlock).Block + block := response.Result.(*ctypes.TendermintResult).Result.(*ctypes.ResultEvent).Data.(types.EventDataNewBlock).Block return block, nil } From ee449a94c8cd7d350250b34f82fdb8821315840e Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 12 Jan 2016 18:09:19 -0500 Subject: [PATCH 4/8] move ResultEvent to go-events --- node/node.go | 3 ++- rpc/core/events.go | 4 ++-- rpc/core/types/responses.go | 9 --------- rpc/test/helpers.go | 6 +++--- 4 files changed, 7 insertions(+), 15 deletions(-) diff --git a/node/node.go b/node/node.go index 2e072aec..8e85706e 100644 --- a/node/node.go +++ b/node/node.go @@ -188,7 +188,8 @@ func (n *Node) StartRPC() (net.Listener, error) { // conflicts with their own rpc wire.RegisterInterface( struct{ rpctypes.Result }{}, - wire.ConcreteType{&ctypes.TendermintResult{}, 0x1}, + wire.ConcreteType{&events.EventResult{}, 0x1}, + wire.ConcreteType{&ctypes.TendermintResult{}, 0x2}, ) mux := http.NewServeMux() diff --git a/rpc/core/events.go b/rpc/core/events.go index c9164e09..cd9d31d3 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -11,7 +11,7 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscri wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg events.EventData) { // NOTE: EventSwitch callbacks must be nonblocking // NOTE: RPCResponses of subscribed events have id suffix "#event" - wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &ctypes.TendermintResult{&ctypes.ResultEvent{event, msg}}, "")) + wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &events.EventResult{event, msg}, "")) }) return &ctypes.ResultSubscribe{}, nil } @@ -21,7 +21,7 @@ func Unsubscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultUnsub wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg events.EventData) { // NOTE: EventSwitch callbacks must be nonblocking // NOTE: RPCResponses of subscribed events have id suffix "#event" - wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &ctypes.TendermintResult{&ctypes.ResultEvent{event, msg}}, "")) + wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &events.EventResult{event, msg}, "")) }) return &ctypes.ResultUnsubscribe{}, nil } diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index dad02f10..7101f0aa 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -2,7 +2,6 @@ package core_types import ( "github.com/tendermint/go-crypto" - "github.com/tendermint/go-events" "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/types" @@ -67,12 +66,6 @@ type ResultSubscribe struct { type ResultUnsubscribe struct { } -// TODO: something about this -type ResultEvent struct { - Event string `json:"event"` - Data events.EventData `json:"data"` -} - //---------------------------------------- // response & result types @@ -88,7 +81,6 @@ const ( ResultTypeListUnconfirmedTxs = byte(0x09) ResultTypeSubscribe = byte(0x0A) ResultTypeUnsubscribe = byte(0x0B) - ResultTypeEvent = byte(0x0C) ) type TendermintResultInterface interface{} @@ -112,5 +104,4 @@ var _ = wire.RegisterInterface( wire.ConcreteType{&ResultListUnconfirmedTxs{}, ResultTypeListUnconfirmedTxs}, wire.ConcreteType{&ResultSubscribe{}, ResultTypeSubscribe}, wire.ConcreteType{&ResultUnsubscribe{}, ResultTypeUnsubscribe}, - wire.ConcreteType{&ResultEvent{}, ResultTypeEvent}, ) diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index 09e6aa23..ac7c900e 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -11,11 +11,11 @@ import ( "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" + "github.com/tendermint/go-events" client "github.com/tendermint/go-rpc/client" "github.com/tendermint/go-rpc/types" _ "github.com/tendermint/tendermint/config/tendermint_test" nm "github.com/tendermint/tendermint/node" - ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" ) @@ -137,7 +137,7 @@ func waitForEvent(t *testing.T, con *websocket.Conn, eventid string, dieOnTimeou errCh <- err break } - event, ok := response.Result.(*ctypes.TendermintResult).Result.(*ctypes.ResultEvent) + event, ok := response.Result.(*events.EventResult) if ok && event.Event == eventid { goodCh <- p break @@ -191,7 +191,7 @@ func unmarshalResponseNewBlock(b []byte) (*types.Block, error) { if response.Error != "" { return nil, fmt.Errorf(response.Error) } - block := response.Result.(*ctypes.TendermintResult).Result.(*ctypes.ResultEvent).Data.(types.EventDataNewBlock).Block + block := response.Result.(*events.EventResult).Data.(types.EventDataNewBlock).Block return block, nil } From 799efb0629f126af5a28d34fa73caf49337c5ca5 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 12 Jan 2016 19:30:31 -0500 Subject: [PATCH 5/8] merge/rebase fixes --- node/node.go | 11 ++++++----- node/node_test.go | 5 +++++ rpc/test/helpers.go | 4 +++- state/execution.go | 4 ++-- state/state.go | 1 - types/priv_validator.go | 2 +- 6 files changed, 17 insertions(+), 10 deletions(-) diff --git a/node/node.go b/node/node.go index 8e85706e..e7965171 100644 --- a/node/node.go +++ b/node/node.go @@ -7,6 +7,7 @@ import ( "net" "net/http" "strings" + "sync" "time" . "github.com/tendermint/go-common" @@ -332,21 +333,21 @@ func getState() *sm.State { // Get a connection to the proxyAppConn addr. // Check the current hash, and panic if it doesn't match. -func getProxyApp(addr string, hash []byte) (proxyAppCtx proxy.AppContext) { +func getProxyApp(addr string, hash []byte) (proxyAppConn proxy.AppConn) { // use local app (for testing) if addr == "local" { app := example.NewCounterApplication(true) - appCtx := app.Open() - proxyAppCtx = proxy.NewLocalAppContext(appCtx) + mtx := new(sync.Mutex) + proxyAppConn = proxy.NewLocalAppConn(mtx, app) } else { proxyConn, err := Connect(addr) if err != nil { Exit(Fmt("Failed to connect to proxy for mempool: %v", err)) } - remoteApp := proxy.NewRemoteAppContext(proxyConn, 1024) + remoteApp := proxy.NewRemoteAppConn(proxyConn, 1024) remoteApp.Start() - proxyAppCtx = remoteApp + proxyAppConn = remoteApp } // Check the hash diff --git a/node/node_test.go b/node/node_test.go index 4374e514..4619de5f 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -6,10 +6,15 @@ import ( "github.com/tendermint/go-p2p" _ "github.com/tendermint/tendermint/config/tendermint_test" + "github.com/tendermint/tendermint/types" ) func TestNodeStartStop(t *testing.T) { + // Get PrivValidator + privValidatorFile := config.GetString("priv_validator_file") + privValidator := types.LoadOrGenPrivValidator(privValidatorFile) + // Create & start node n := NewNode(privValidator) l := p2p.NewDefaultListener("tcp", config.GetString("node_laddr"), config.GetBool("skip_upnp")) diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index ac7c900e..3932ff6a 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -56,7 +56,9 @@ func init() { // create a new node and sleep forever func newNode(ready chan struct{}) { // Create & start node - node = nm.NewNode() + privValidatorFile := config.GetString("priv_validator_file") + privValidator := types.LoadOrGenPrivValidator(privValidatorFile) + node = nm.NewNode(privValidator) l := p2p.NewDefaultListener("tcp", config.GetString("node_laddr"), true) node.AddListener(l) node.Start() diff --git a/state/execution.go b/state/execution.go index 1578af4b..37e3426b 100644 --- a/state/execution.go +++ b/state/execution.go @@ -5,7 +5,7 @@ import ( "fmt" . "github.com/tendermint/go-common" - "github.com/tendermint/tendermint/events" + "github.com/tendermint/go-events" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" tmsp "github.com/tendermint/tmsp/types" @@ -90,7 +90,7 @@ func (s *State) execBlockOnProxyApp(evsw *events.EventSwitch, proxyAppConn proxy log.Warn("Error computing proxyAppConn hash", "error", err) return err } - log.Info("ExecBlock got %v valid txs and %v invalid txs", validTxs, invalidTxs) + log.Info(Fmt("ExecBlock got %v valid txs and %v invalid txs", validTxs, invalidTxs)) // Set the state's new AppHash s.AppHash = hash diff --git a/state/state.go b/state/state.go index ff05ebfa..798e8ce7 100644 --- a/state/state.go +++ b/state/state.go @@ -8,7 +8,6 @@ import ( . "github.com/tendermint/go-common" dbm "github.com/tendermint/go-db" - "github.com/tendermint/go-events" "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/types" ) diff --git a/types/priv_validator.go b/types/priv_validator.go index 78b95c6a..9b01c8a6 100644 --- a/types/priv_validator.go +++ b/types/priv_validator.go @@ -43,7 +43,7 @@ type PrivValidator struct { // PrivKey should be empty if a Signer other than the default is being used. PrivKey crypto.PrivKey `json:"priv_key"` - Signer + Signer `json:"-"` // For persistence. // Overloaded for testing. From 3fdb4c03ab389aa16cf3580a9efaea267bd1306e Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 13 Jan 2016 18:38:55 -0500 Subject: [PATCH 6/8] rpc: TMResult and TMEventData --- node/node.go | 11 ---------- rpc/core/events.go | 10 ++++----- rpc/core/routes.go | 44 ++++++++++++++++++------------------- rpc/core/types/responses.go | 17 +++++++++----- rpc/test/client_test.go | 15 ++++++++----- rpc/test/helpers.go | 31 +++++++++++++++++++------- types/events.go | 20 +++++++++++------ 7 files changed, 82 insertions(+), 66 deletions(-) diff --git a/node/node.go b/node/node.go index e7965171..ab06d48c 100644 --- a/node/node.go +++ b/node/node.go @@ -17,14 +17,12 @@ import ( "github.com/tendermint/go-p2p" "github.com/tendermint/go-rpc" "github.com/tendermint/go-rpc/server" - "github.com/tendermint/go-rpc/types" "github.com/tendermint/go-wire" bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/consensus" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/rpc/core" - ctypes "github.com/tendermint/tendermint/rpc/core/types" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" "github.com/tendermint/tmsp/example/golang" @@ -184,15 +182,6 @@ func (n *Node) StartRPC() (net.Listener, error) { listenAddr := config.GetString("rpc_laddr") - // register the result objects with wire - // so consumers of tendermint rpc will not have - // conflicts with their own rpc - wire.RegisterInterface( - struct{ rpctypes.Result }{}, - wire.ConcreteType{&events.EventResult{}, 0x1}, - wire.ConcreteType{&ctypes.TendermintResult{}, 0x2}, - ) - mux := http.NewServeMux() wm := rpcserver.NewWebsocketManager(core.Routes, n.evsw) mux.HandleFunc("/websocket", wm.WebsocketHandler) diff --git a/rpc/core/events.go b/rpc/core/events.go index cd9d31d3..0a3edd39 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -4,6 +4,7 @@ import ( "github.com/tendermint/go-events" "github.com/tendermint/go-rpc/types" ctypes "github.com/tendermint/tendermint/rpc/core/types" + "github.com/tendermint/tendermint/types" ) func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscribe, error) { @@ -11,17 +12,14 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscri wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg events.EventData) { // NOTE: EventSwitch callbacks must be nonblocking // NOTE: RPCResponses of subscribed events have id suffix "#event" - wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &events.EventResult{event, msg}, "")) + tmResult := ctypes.TMResult(&ctypes.ResultEvent{event, types.TMEventData(msg)}) + wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &tmResult, "")) }) return &ctypes.ResultSubscribe{}, nil } func Unsubscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultUnsubscribe, error) { log.Notice("Unsubscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event) - wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg events.EventData) { - // NOTE: EventSwitch callbacks must be nonblocking - // NOTE: RPCResponses of subscribed events have id suffix "#event" - wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &events.EventResult{event, msg}, "")) - }) + wsCtx.GetEventSwitch().RemoveListener(event) return &ctypes.ResultUnsubscribe{}, nil } diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 185625d0..8e9c252d 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -22,90 +22,90 @@ var Routes = map[string]*rpc.RPCFunc{ // subscribe/unsubscribe are reserved for websocket events. } -func SubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.TendermintResult, error) { +func SubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (ctypes.TMResult, error) { if r, err := Subscribe(wsCtx, event); err != nil { return nil, err } else { - return &ctypes.TendermintResult{r}, nil + return r, nil } } -func UnsubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.TendermintResult, error) { +func UnsubscribeResult(wsCtx rpctypes.WSRPCContext, event string) (ctypes.TMResult, error) { if r, err := Unsubscribe(wsCtx, event); err != nil { return nil, err } else { - return &ctypes.TendermintResult{r}, nil + return r, nil } } -func StatusResult() (*ctypes.TendermintResult, error) { +func StatusResult() (ctypes.TMResult, error) { if r, err := Status(); err != nil { return nil, err } else { - return &ctypes.TendermintResult{r}, nil + return r, nil } } -func NetInfoResult() (*ctypes.TendermintResult, error) { +func NetInfoResult() (ctypes.TMResult, error) { if r, err := NetInfo(); err != nil { return nil, err } else { - return &ctypes.TendermintResult{r}, nil + return r, nil } } -func BlockchainInfoResult(min, max int) (*ctypes.TendermintResult, error) { +func BlockchainInfoResult(min, max int) (ctypes.TMResult, error) { if r, err := BlockchainInfo(min, max); err != nil { return nil, err } else { - return &ctypes.TendermintResult{r}, nil + return r, nil } } -func GenesisResult() (*ctypes.TendermintResult, error) { +func GenesisResult() (ctypes.TMResult, error) { if r, err := Genesis(); err != nil { return nil, err } else { - return &ctypes.TendermintResult{r}, nil + return r, nil } } -func GetBlockResult(height int) (*ctypes.TendermintResult, error) { +func GetBlockResult(height int) (ctypes.TMResult, error) { if r, err := GetBlock(height); err != nil { return nil, err } else { - return &ctypes.TendermintResult{r}, nil + return r, nil } } -func ListValidatorsResult() (*ctypes.TendermintResult, error) { +func ListValidatorsResult() (ctypes.TMResult, error) { if r, err := ListValidators(); err != nil { return nil, err } else { - return &ctypes.TendermintResult{r}, nil + return r, nil } } -func DumpConsensusStateResult() (*ctypes.TendermintResult, error) { +func DumpConsensusStateResult() (ctypes.TMResult, error) { if r, err := DumpConsensusState(); err != nil { return nil, err } else { - return &ctypes.TendermintResult{r}, nil + return r, nil } } -func ListUnconfirmedTxsResult() (*ctypes.TendermintResult, error) { +func ListUnconfirmedTxsResult() (ctypes.TMResult, error) { if r, err := ListUnconfirmedTxs(); err != nil { return nil, err } else { - return &ctypes.TendermintResult{r}, nil + return r, nil } } -func BroadcastTxResult(tx []byte) (*ctypes.TendermintResult, error) { +func BroadcastTxResult(tx []byte) (ctypes.TMResult, error) { if r, err := BroadcastTx(tx); err != nil { return nil, err } else { - return &ctypes.TendermintResult{r}, nil + return r, nil } } diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index 7101f0aa..09b7a702 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -3,6 +3,7 @@ package core_types import ( "github.com/tendermint/go-crypto" "github.com/tendermint/go-p2p" + "github.com/tendermint/go-rpc/types" "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/types" ) @@ -66,6 +67,11 @@ type ResultSubscribe struct { type ResultUnsubscribe struct { } +type ResultEvent struct { + Name string `json:"name"` + Data types.TMEventData `json:"data"` +} + //---------------------------------------- // response & result types @@ -81,18 +87,16 @@ const ( ResultTypeListUnconfirmedTxs = byte(0x09) ResultTypeSubscribe = byte(0x0A) ResultTypeUnsubscribe = byte(0x0B) + ResultTypeEvent = byte(0x0C) ) -type TendermintResultInterface interface{} - -// NOTE: up to the application to register this as rpctypes.Result -type TendermintResult struct { - Result TendermintResultInterface +type TMResult interface { + rpctypes.Result } // for wire.readReflect var _ = wire.RegisterInterface( - struct{ TendermintResultInterface }{}, + struct{ TMResult }{}, wire.ConcreteType{&ResultGenesis{}, ResultTypeGenesis}, wire.ConcreteType{&ResultBlockchainInfo{}, ResultTypeBlockchainInfo}, wire.ConcreteType{&ResultGetBlock{}, ResultTypeGetBlock}, @@ -104,4 +108,5 @@ var _ = wire.RegisterInterface( wire.ConcreteType{&ResultListUnconfirmedTxs{}, ResultTypeListUnconfirmedTxs}, wire.ConcreteType{&ResultSubscribe{}, ResultTypeSubscribe}, wire.ConcreteType{&ResultUnsubscribe{}, ResultTypeUnsubscribe}, + wire.ConcreteType{&ResultEvent{}, ResultTypeEvent}, ) diff --git a/rpc/test/client_test.go b/rpc/test/client_test.go index 00f8c2b0..d10f8c72 100644 --- a/rpc/test/client_test.go +++ b/rpc/test/client_test.go @@ -13,23 +13,26 @@ import ( // Test the HTTP client func TestURIStatus(t *testing.T) { - result, err := clientURI.Call("status", map[string]interface{}{}) + tmResult := new(ctypes.TMResult) + _, err := clientURI.Call("status", map[string]interface{}{}, tmResult) if err != nil { t.Fatal(err) } - testStatus(t, result) + testStatus(t, tmResult) } func TestJSONStatus(t *testing.T) { - result, err := clientJSON.Call("status", []interface{}{}) + tmResult := new(ctypes.TMResult) + _, err := clientJSON.Call("status", []interface{}{}, tmResult) if err != nil { t.Fatal(err) } - testStatus(t, result) + testStatus(t, tmResult) } -func testStatus(t *testing.T, result interface{}) { - status := result.(*ctypes.TendermintResult).Result.(*ctypes.ResultStatus) +func testStatus(t *testing.T, statusI interface{}) { + tmRes := statusI.(*ctypes.TMResult) + status := (*tmRes).(*ctypes.ResultStatus) if status.NodeInfo.Network != chainID { t.Fatal(fmt.Errorf("ChainID mismatch: got %s expected %s", status.NodeInfo.Network, chainID)) diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index 3932ff6a..dfccdb1e 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -1,6 +1,7 @@ package rpctest import ( + "encoding/json" "fmt" "net/http" "testing" @@ -11,11 +12,11 @@ import ( "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" - "github.com/tendermint/go-events" client "github.com/tendermint/go-rpc/client" "github.com/tendermint/go-rpc/types" _ "github.com/tendermint/tendermint/config/tendermint_test" nm "github.com/tendermint/tendermint/node" + ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" ) @@ -133,14 +134,24 @@ func waitForEvent(t *testing.T, con *websocket.Conn, eventid string, dieOnTimeou // if the event id isnt what we're waiting on // ignore it var response rpctypes.RPCResponse - var err error - wire.ReadJSON(&response, p, &err) + if err := json.Unmarshal(p, &response); err != nil { + errCh <- err + break + } + if response.Error != "" { + errCh <- fmt.Errorf(response.Error) + break + } + + result := new(ctypes.TMResult) + fmt.Println("RESULT:", string(*response.Result)) + wire.ReadJSONPtr(result, *response.Result, &err) if err != nil { errCh <- err break } - event, ok := response.Result.(*events.EventResult) - if ok && event.Event == eventid { + event, ok := (*result).(*ctypes.ResultEvent) + if ok && event.Name == eventid { goodCh <- p break } @@ -186,14 +197,18 @@ func unmarshalResponseNewBlock(b []byte) (*types.Block, error) { // unmarshall and assert somethings var response rpctypes.RPCResponse var err error - wire.ReadJSON(&response, b, &err) - if err != nil { + if err := json.Unmarshal(b, &response); err != nil { return nil, err } if response.Error != "" { return nil, fmt.Errorf(response.Error) } - block := response.Result.(*events.EventResult).Data.(types.EventDataNewBlock).Block + var result ctypes.TMResult + wire.ReadJSONPtr(&result, *response.Result, &err) + if err != nil { + return nil, err + } + block := result.(*ctypes.ResultEvent).Data.(types.EventDataNewBlock).Block return block, nil } diff --git a/types/events.go b/types/events.go index 0bcd2fcc..26f29f41 100644 --- a/types/events.go +++ b/types/events.go @@ -1,7 +1,7 @@ package types import ( - // for registering EventData + // for registering TMEventData as events.EventData "github.com/tendermint/go-events" "github.com/tendermint/go-wire" ) @@ -30,6 +30,12 @@ func EventStringApp() string { return "App" } //---------------------------------------- +// implements events.EventData +type TMEventData interface { + events.EventData + // AssertIsTMEventData() +} + const ( EventDataTypeNewBlock = byte(0x01) EventDataTypeFork = byte(0x02) @@ -41,7 +47,7 @@ const ( ) var _ = wire.RegisterInterface( - struct{ events.EventData }{}, + struct{ TMEventData }{}, wire.ConcreteType{EventDataNewBlock{}, EventDataTypeNewBlock}, // wire.ConcreteType{EventDataFork{}, EventDataTypeFork }, wire.ConcreteType{EventDataTx{}, EventDataTypeTx}, @@ -92,8 +98,8 @@ type EventDataVote struct { Vote *Vote } -func (_ EventDataNewBlock) AssertIsEventData() {} -func (_ EventDataTx) AssertIsEventData() {} -func (_ EventDataApp) AssertIsEventData() {} -func (_ EventDataRoundState) AssertIsEventData() {} -func (_ EventDataVote) AssertIsEventData() {} +func (_ EventDataNewBlock) AssertIsTMEventData() {} +func (_ EventDataTx) AssertIsTMEventData() {} +func (_ EventDataApp) AssertIsTMEventData() {} +func (_ EventDataRoundState) AssertIsTMEventData() {} +func (_ EventDataVote) AssertIsTMEventData() {} From 8e776a252b10fe21b9921374f94a9a7371a9aa55 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 13 Jan 2016 21:20:25 -0500 Subject: [PATCH 7/8] rpc: test cleanup --- rpc/test/client_test.go | 42 +++++++++---- rpc/test/helpers.go | 130 +++++++++------------------------------- 2 files changed, 58 insertions(+), 114 deletions(-) diff --git a/rpc/test/client_test.go b/rpc/test/client_test.go index d10f8c72..64f55758 100644 --- a/rpc/test/client_test.go +++ b/rpc/test/client_test.go @@ -82,21 +82,21 @@ var wsTyp = "JSONRPC" // make a simple connection to the server func TestWSConnect(t *testing.T) { - con := newWSCon(t) - con.Close() + wsc := newWSClient(t) + wsc.Stop() } // receive a new block message func TestWSNewBlock(t *testing.T) { - con := newWSCon(t) + wsc := newWSClient(t) eid := types.EventStringNewBlock() - subscribe(t, con, eid) + subscribe(t, wsc, eid) defer func() { - unsubscribe(t, con, eid) - con.Close() + unsubscribe(t, wsc, eid) + wsc.Stop() }() - waitForEvent(t, con, eid, true, func() {}, func(eid string, b []byte) error { - fmt.Println("Check:", string(b)) + waitForEvent(t, wsc, eid, true, func() {}, func(eid string, b interface{}) error { + fmt.Println("Check:", b) return nil }) } @@ -106,15 +106,31 @@ func TestWSBlockchainGrowth(t *testing.T) { if testing.Short() { t.Skip("skipping test in short mode.") } - con := newWSCon(t) + wsc := newWSClient(t) eid := types.EventStringNewBlock() - subscribe(t, con, eid) + subscribe(t, wsc, eid) defer func() { - unsubscribe(t, con, eid) - con.Close() + unsubscribe(t, wsc, eid) + wsc.Stop() }() + // listen for NewBlock, ensure height increases by 1 - unmarshalValidateBlockchain(t, con, eid) + + var initBlockN int + for i := 0; i < 3; i++ { + waitForEvent(t, wsc, eid, true, func() {}, func(eid string, eventData interface{}) error { + block := eventData.(types.EventDataNewBlock).Block + if i == 0 { + initBlockN = block.Header.Height + } else { + if block.Header.Height != initBlockN+i { + return fmt.Errorf("Expected block %d, got block %d", initBlockN+i, block.Header.Height) + } + } + + return nil + }) + } } /* TODO: this with dummy app.. diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index dfccdb1e..21de52fa 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -1,19 +1,14 @@ package rpctest import ( - "encoding/json" - "fmt" - "net/http" "testing" "time" - "github.com/gorilla/websocket" . "github.com/tendermint/go-common" "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" client "github.com/tendermint/go-rpc/client" - "github.com/tendermint/go-rpc/types" _ "github.com/tendermint/tendermint/config/tendermint_test" nm "github.com/tendermint/tendermint/node" ctypes "github.com/tendermint/tendermint/rpc/core/types" @@ -77,84 +72,58 @@ func newNode(ready chan struct{}) { // Utilities for testing the websocket service // create a new connection -func newWSCon(t *testing.T) *websocket.Conn { - dialer := websocket.DefaultDialer - rHeader := http.Header{} - con, r, err := dialer.Dial(websocketAddr, rHeader) - fmt.Println("response", r) - if err != nil { +func newWSClient(t *testing.T) *client.WSClient { + wsc := client.NewWSClient(websocketAddr) + if _, err := wsc.Start(); err != nil { t.Fatal(err) } - return con + return wsc } // subscribe to an event -func subscribe(t *testing.T, con *websocket.Conn, eventid string) { - err := con.WriteJSON(rpctypes.RPCRequest{ - JSONRPC: "2.0", - ID: "", - Method: "subscribe", - Params: []interface{}{eventid}, - }) - if err != nil { +func subscribe(t *testing.T, wsc *client.WSClient, eventid string) { + if err := wsc.Subscribe(eventid); err != nil { t.Fatal(err) } } // unsubscribe from an event -func unsubscribe(t *testing.T, con *websocket.Conn, eventid string) { - err := con.WriteJSON(rpctypes.RPCRequest{ - JSONRPC: "2.0", - ID: "", - Method: "unsubscribe", - Params: []interface{}{eventid}, - }) - if err != nil { +func unsubscribe(t *testing.T, wsc *client.WSClient, eventid string) { + if err := wsc.Unsubscribe(eventid); err != nil { t.Fatal(err) } } // wait for an event; do things that might trigger events, and check them when they are received // the check function takes an event id and the byte slice read off the ws -func waitForEvent(t *testing.T, con *websocket.Conn, eventid string, dieOnTimeout bool, f func(), check func(string, []byte) error) { +func waitForEvent(t *testing.T, wsc *client.WSClient, eventid string, dieOnTimeout bool, f func(), check func(string, interface{}) error) { // go routine to wait for webscoket msg - goodCh := make(chan []byte) + goodCh := make(chan interface{}) errCh := make(chan error) - quitCh := make(chan struct{}) - defer close(quitCh) // Read message go func() { + var err error + LOOP: for { - _, p, err := con.ReadMessage() - if err != nil { - errCh <- err - break - } else { - // if the event id isnt what we're waiting on - // ignore it - var response rpctypes.RPCResponse - if err := json.Unmarshal(p, &response); err != nil { - errCh <- err - break - } - if response.Error != "" { - errCh <- fmt.Errorf(response.Error) - break - } - + select { + case r := <-wsc.ResultsCh: result := new(ctypes.TMResult) - fmt.Println("RESULT:", string(*response.Result)) - wire.ReadJSONPtr(result, *response.Result, &err) + wire.ReadJSONPtr(result, r, &err) if err != nil { errCh <- err - break + break LOOP } event, ok := (*result).(*ctypes.ResultEvent) if ok && event.Name == eventid { - goodCh <- p - break + goodCh <- event.Data + break LOOP } + case err := <-wsc.ErrorsCh: + errCh <- err + break LOOP + case <-wsc.Quit: + break LOOP } } }() @@ -167,68 +136,27 @@ func waitForEvent(t *testing.T, con *websocket.Conn, eventid string, dieOnTimeou select { case <-timeout.C: if dieOnTimeout { - con.Close() + wsc.Stop() t.Fatalf("%s event was not received in time", eventid) } // else that's great, we didn't hear the event // and we shouldn't have - case p := <-goodCh: + case eventData := <-goodCh: if dieOnTimeout { // message was received and expected // run the check - err := check(eventid, p) - if err != nil { - t.Fatal(err) - panic(err) // Show the stack trace. + if err := check(eventid, eventData); err != nil { + t.Fatal(err) // Show the stack trace. } } else { - con.Close() + wsc.Stop() t.Fatalf("%s event was not expected", eventid) } case err := <-errCh: t.Fatal(err) panic(err) // Show the stack trace. + } } //-------------------------------------------------------------------------------- - -func unmarshalResponseNewBlock(b []byte) (*types.Block, error) { - // unmarshall and assert somethings - var response rpctypes.RPCResponse - var err error - if err := json.Unmarshal(b, &response); err != nil { - return nil, err - } - if response.Error != "" { - return nil, fmt.Errorf(response.Error) - } - var result ctypes.TMResult - wire.ReadJSONPtr(&result, *response.Result, &err) - if err != nil { - return nil, err - } - block := result.(*ctypes.ResultEvent).Data.(types.EventDataNewBlock).Block - return block, nil -} - -func unmarshalValidateBlockchain(t *testing.T, con *websocket.Conn, eid string) { - var initBlockN int - for i := 0; i < 3; i++ { - waitForEvent(t, con, eid, true, func() {}, func(eid string, b []byte) error { - block, err := unmarshalResponseNewBlock(b) - if err != nil { - return err - } - if i == 0 { - initBlockN = block.Header.Height - } else { - if block.Header.Height != initBlockN+i { - return fmt.Errorf("Expected block %d, got block %d", i, block.Header.Height) - } - } - - return nil - }) - } -} From f100404362f3cbb7ba14c418f5d70d6919acc637 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Thu, 14 Jan 2016 11:07:31 -0800 Subject: [PATCH 8/8] Make EventDataRoundState use json:"-" instead of Getter/Setter --- consensus/reactor.go | 2 +- consensus/state.go | 8 ++++---- consensus/state_test.go | 28 ++++++++++++++-------------- types/events.go | 10 +--------- 4 files changed, 20 insertions(+), 28 deletions(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index d9f36d92..41ff77cc 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -235,7 +235,7 @@ func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) { func (conR *ConsensusReactor) registerEventCallbacks() { conR.evsw.AddListenerForEvent("conR", types.EventStringNewRoundStep(), func(data events.EventData) { - rs := data.(*types.EventDataRoundState).RoundState().(*RoundState) + rs := data.(*types.EventDataRoundState).RoundState.(*RoundState) conR.broadcastNewRoundStep(rs) }) diff --git a/consensus/state.go b/consensus/state.go index 8cee3dbd..721f7157 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -99,11 +99,11 @@ type RoundState struct { func (rs *RoundState) RoundStateEvent() *types.EventDataRoundState { edrs := &types.EventDataRoundState{ - Height: rs.Height, - Round: rs.Round, - Step: rs.Step.String(), + Height: rs.Height, + Round: rs.Round, + Step: rs.Step.String(), + RoundState: rs, } - edrs.SetRoundState(rs) return edrs } diff --git a/consensus/state_test.go b/consensus/state_test.go index da8d4c90..f4d8f548 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -243,7 +243,7 @@ func TestFullRound1(t *testing.T) { // grab proposal re := <-propCh - propBlockHash := re.(*types.EventDataRoundState).RoundState().(*RoundState).ProposalBlock.Hash() + propBlockHash := re.(*types.EventDataRoundState).RoundState.(*RoundState).ProposalBlock.Hash() <-voteCh // wait for prevote validatePrevote(t, cs, round, vss[0], propBlockHash) @@ -336,7 +336,7 @@ func TestLockNoPOL(t *testing.T) { cs1.startRoutines(0) re := <-proposalCh - rs := re.(*types.EventDataRoundState).RoundState().(*RoundState) + rs := re.(*types.EventDataRoundState).RoundState.(*RoundState) theBlockHash := rs.ProposalBlock.Hash() <-voteCh // prevote @@ -376,7 +376,7 @@ func TestLockNoPOL(t *testing.T) { // now we're on a new round and not the proposer, so wait for timeout re = <-timeoutProposeCh - rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) + rs = re.(*types.EventDataRoundState).RoundState.(*RoundState) if rs.ProposalBlock != nil { t.Fatal("Expected proposal block to be nil") @@ -420,7 +420,7 @@ func TestLockNoPOL(t *testing.T) { incrementRound(cs2) re = <-proposalCh - rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) + rs = re.(*types.EventDataRoundState).RoundState.(*RoundState) // now we're on a new round and are the proposer if !bytes.Equal(rs.ProposalBlock.Hash(), rs.LockedBlock.Hash()) { @@ -505,7 +505,7 @@ func TestLockPOLRelock(t *testing.T) { <-newRoundCh re := <-proposalCh - rs := re.(*types.EventDataRoundState).RoundState().(*RoundState) + rs := re.(*types.EventDataRoundState).RoundState.(*RoundState) theBlockHash := rs.ProposalBlock.Hash() <-voteCh // prevote @@ -576,7 +576,7 @@ func TestLockPOLRelock(t *testing.T) { be := <-newBlockCh b := be.(types.EventDataNewBlock) re = <-newRoundCh - rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) + rs = re.(*types.EventDataRoundState).RoundState.(*RoundState) if rs.Height != 2 { t.Fatal("Expected height to increment") } @@ -610,7 +610,7 @@ func TestLockPOLUnlock(t *testing.T) { startTestRound(cs1, cs1.Height, 0) <-newRoundCh re := <-proposalCh - rs := re.(*types.EventDataRoundState).RoundState().(*RoundState) + rs := re.(*types.EventDataRoundState).RoundState.(*RoundState) theBlockHash := rs.ProposalBlock.Hash() <-voteCh // prevote @@ -634,7 +634,7 @@ func TestLockPOLUnlock(t *testing.T) { // timeout to new round re = <-timeoutWaitCh - rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) + rs = re.(*types.EventDataRoundState).RoundState.(*RoundState) lockedBlockHash := rs.LockedBlock.Hash() //XXX: this isnt gauranteed to get there before the timeoutPropose ... @@ -692,7 +692,7 @@ func TestLockPOLSafety1(t *testing.T) { startTestRound(cs1, cs1.Height, 0) <-newRoundCh re := <-proposalCh - rs := re.(*types.EventDataRoundState).RoundState().(*RoundState) + rs := re.(*types.EventDataRoundState).RoundState.(*RoundState) propBlock := rs.ProposalBlock <-voteCh // prevote @@ -740,7 +740,7 @@ func TestLockPOLSafety1(t *testing.T) { re = <-proposalCh } - rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) + rs = re.(*types.EventDataRoundState).RoundState.(*RoundState) if rs.LockedBlock != nil { t.Fatal("we should not be locked!") @@ -903,7 +903,7 @@ func TestSlashingPrevotes(t *testing.T) { re := <-proposalCh <-voteCh // prevote - rs := re.(*types.EventDataRoundState).RoundState().(*RoundState) + rs := re.(*types.EventDataRoundState).RoundState.(*RoundState) // we should now be stuck in limbo forever, waiting for more prevotes // add one for a different block should cause us to go into prevote wait @@ -981,7 +981,7 @@ func TestHalt1(t *testing.T) { startTestRound(cs1, cs1.Height, 0) <-newRoundCh re := <-proposalCh - rs := re.(*types.EventDataRoundState).RoundState().(*RoundState) + rs := re.(*types.EventDataRoundState).RoundState.(*RoundState) propBlock := rs.ProposalBlock propBlockParts := propBlock.MakePartSet() @@ -1004,7 +1004,7 @@ func TestHalt1(t *testing.T) { // timeout to new round <-timeoutWaitCh re = <-newRoundCh - rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) + rs = re.(*types.EventDataRoundState).RoundState.(*RoundState) log.Notice("### ONTO ROUND 1") /*Round2 @@ -1022,7 +1022,7 @@ func TestHalt1(t *testing.T) { // receiving that precommit should take us straight to commit <-newBlockCh re = <-newRoundCh - rs = re.(*types.EventDataRoundState).RoundState().(*RoundState) + rs = re.(*types.EventDataRoundState).RoundState.(*RoundState) if rs.Height != 2 { t.Fatal("expected height to increment") diff --git a/types/events.go b/types/events.go index 26f29f41..a316cfc5 100644 --- a/types/events.go +++ b/types/events.go @@ -81,15 +81,7 @@ type EventDataRoundState struct { Step string `json:"step"` // private, not exposed to websockets - rs interface{} -} - -func (edrs *EventDataRoundState) RoundState() interface{} { - return edrs.rs -} - -func (edrs *EventDataRoundState) SetRoundState(rs interface{}) { - edrs.rs = rs + RoundState interface{} `json:"-"` } type EventDataVote struct {