From 96547d0ca846b1cd784033d534c46911bba1f745 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sun, 10 Jan 2016 16:33:52 -0500 Subject: [PATCH] ws fixes; rpc tests --- config/tendermint_test/config.go | 69 ++++------ node/node.go | 23 +++- rpc/client/http_client.go | 108 ++++++++++++++-- rpc/core/events.go | 4 +- rpc/server/handlers.go | 2 +- rpc/test/client_test.go | 143 +++++++++++++++++++++ rpc/test/config.go | 18 +++ rpc/test/helpers.go | 212 +++++++++++++++++++++++++++++++ 8 files changed, 515 insertions(+), 64 deletions(-) create mode 100644 rpc/test/client_test.go create mode 100644 rpc/test/config.go create mode 100644 rpc/test/helpers.go diff --git a/config/tendermint_test/config.go b/config/tendermint_test/config.go index 024a5c97..9820ba24 100644 --- a/config/tendermint_test/config.go +++ b/config/tendermint_test/config.go @@ -30,6 +30,7 @@ func initTMRoot(rootDir string) { configFilePath := path.Join(rootDir, "config.toml") genesisFilePath := path.Join(rootDir, "genesis.json") + privFilePath := path.Join(rootDir, "priv_validator.json") // Write default config file if missing. if !FileExists(configFilePath) { @@ -40,6 +41,9 @@ func initTMRoot(rootDir string) { if !FileExists(genesisFilePath) { MustWriteFile(genesisFilePath, []byte(defaultGenesis), 0644) } + if !FileExists(privFilePath) { + MustWriteFile(privFilePath, []byte(defaultPrivValidator), 0644) + } } func GetConfig(rootDir string) cfg.Config { @@ -58,7 +62,7 @@ func GetConfig(rootDir string) cfg.Config { } mapConfig.SetDefault("chain_id", "tendermint_test") mapConfig.SetDefault("genesis_file", rootDir+"/genesis.json") - mapConfig.SetDefault("proxy_app", "tcp://127.0.0.1:36658") + mapConfig.SetDefault("proxy_app", "local") mapConfig.SetDefault("moniker", "anonymous") mapConfig.SetDefault("node_laddr", "0.0.0.0:36656") mapConfig.SetDefault("fast_sync", false) @@ -78,7 +82,7 @@ func GetConfig(rootDir string) cfg.Config { var defaultConfigTmpl = `# This is a TOML config file. # For more information, see https://github.com/toml-lang/toml -proxy_app = "tcp://127.0.0.1:36658" +proxy_app = "local" moniker = "__MONIKER__" node_laddr = "0.0.0.0:36656" seeds = "" @@ -93,50 +97,33 @@ func defaultConfig(moniker string) (defaultConfig string) { return } -// priv keys generated deterministically eg rpc/tests/helpers.go var defaultGenesis = `{ - "chain_id" : "tendermint_test", - "accounts": [ - { - "address": "E9B5D87313356465FAE33C406CE2C2979DE60BCB", - "amount": 200000000 - }, - { - "address": "DFE4AFFA4CEE17CD01CB9E061D77C3ECED29BD88", - "amount": 200000000 - }, - { - "address": "F60D30722E7B497FA532FB3207C3FB29C31B1992", - "amount": 200000000 - }, - { - "address": "336CB40A5EB92E496E19B74FDFF2BA017C877FD6", - "amount": 200000000 - }, - { - "address": "D218F0F439BF0384F6F5EF8D0F8B398D941BD1DC", - "amount": 200000000 - } - ], + "genesis_time": "0001-01-01T00:00:00.000Z", + "chain_id": "tendermint_test", "validators": [ { - "pub_key": [1, "583779C3BFA3F6C7E23C7D830A9C3D023A216B55079AD38BFED1207B94A19548"], - "amount": 1000000, - "unbond_to": [ - { - "address": "E9B5D87313356465FAE33C406CE2C2979DE60BCB", - "amount": 100000 - } - ] + "pub_key": [ + 1, + "3B3069C422E19688B45CBFAE7BB009FC0FA1B1EA86593519318B7214853803C8" + ], + "amount": 10, + "name": "" } - ] + ], + "app_hash": "" }` var defaultPrivValidator = `{ - "address": "1D7A91CB32F758A02EBB9BE1FB6F8DEE56F90D42", - "pub_key": [1,"06FBAC4E285285D1D91FCBC7E91C780ADA11516F67462340B3980CE2B94940E8"], - "priv_key": [1,"C453604BD6480D5538B4C6FD2E3E314B5BCE518D75ADE4DA3DA85AB8ADFD819606FBAC4E285285D1D91FCBC7E91C780ADA11516F67462340B3980CE2B94940E8"], - "last_height":0, - "last_round":0, - "last_step":0 + "address": "D028C9981F7A87F3093672BF0D5B0E2A1B3ED456", + "pub_key": [ + 1, + "3B3069C422E19688B45CBFAE7BB009FC0FA1B1EA86593519318B7214853803C8" + ], + "priv_key": [ + 1, + "27F82582AEFAE7AB151CFB01C48BB6C1A0DA78F9BDDA979A9F70A84D074EB07D3B3069C422E19688B45CBFAE7BB009FC0FA1B1EA86593519318B7214853803C8" + ], + "last_height": 0, + "last_round": 0, + "last_step": 0 }` diff --git a/node/node.go b/node/node.go index 077e11d3..09dc5886 100644 --- a/node/node.go +++ b/node/node.go @@ -24,6 +24,7 @@ import ( "github.com/tendermint/tendermint/rpc/server" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" + "github.com/tendermint/tmsp/example/golang" ) import _ "net/http/pprof" @@ -320,14 +321,22 @@ func getState() *sm.State { // Get a connection to the proxyAppConn addr. // Check the current hash, and panic if it doesn't match. -func getProxyApp(addr string, hash []byte) proxy.AppConn { - proxyConn, err := Connect(addr) - if err != nil { - Exit(Fmt("Failed to connect to proxy for mempool: %v", err)) - } - proxyAppConn := proxy.NewRemoteAppConn(proxyConn, 1024) +func getProxyApp(addr string, hash []byte) (proxyAppCtx proxy.AppContext) { + // use local app (for testing) + if addr == "local" { + app := example.NewCounterApplication(true) + appCtx := app.Open() + proxyAppCtx = proxy.NewLocalAppContext(appCtx) + } else { + proxyConn, err := Connect(addr) + if err != nil { + Exit(Fmt("Failed to connect to proxy for mempool: %v", err)) + } + proxyAppCtx := proxy.NewRemoteAppContext(proxyConn, 1024) - proxyAppConn.Start() + proxyAppCtx.Start() + + } // Check the hash currentHash, err := proxyAppConn.GetHashSync() diff --git a/rpc/client/http_client.go b/rpc/client/http_client.go index 6cc275d0..9ff72629 100644 --- a/rpc/client/http_client.go +++ b/rpc/client/http_client.go @@ -2,17 +2,47 @@ package rpcclient import ( "bytes" - "encoding/json" "errors" "io/ioutil" "net/http" + "net/url" + "strings" . "github.com/tendermint/go-common" "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/rpc/types" ) -func CallHTTP(remote string, method string, params []interface{}, dest interface{}) (interface{}, error) { +// JSON rpc takes params as a slice +type ClientJSONRPC struct { + remote string +} + +func NewClientJSONRPC(remote string) *ClientJSONRPC { + return &ClientJSONRPC{remote} +} + +func (c *ClientJSONRPC) Call(method string, params []interface{}) (interface{}, error) { + return CallHTTP_JSONRPC(c.remote, method, params) +} + +// URI takes params as a map +type ClientURI struct { + remote string +} + +func NewClientURI(remote string) *ClientURI { + if !strings.HasSuffix(remote, "/") { + remote = remote + "/" + } + return &ClientURI{remote} +} + +func (c *ClientURI) Call(method string, params map[string]interface{}) (interface{}, error) { + return CallHTTP_URI(c.remote, method, params) +} + +func CallHTTP_JSONRPC(remote string, method string, params []interface{}) (interface{}, error) { // Make request and get responseBytes request := rpctypes.RPCRequest{ JSONRPC: "2.0", @@ -25,27 +55,79 @@ func CallHTTP(remote string, method string, params []interface{}, dest interface log.Info(Fmt("RPC request to %v: %v", remote, string(requestBytes))) httpResponse, err := http.Post(remote, "text/json", requestBuf) if err != nil { - return dest, err + return nil, err } defer httpResponse.Body.Close() responseBytes, err := ioutil.ReadAll(httpResponse.Body) if err != nil { - return dest, err + return nil, err } log.Info(Fmt("RPC response: %v", string(responseBytes))) + return unmarshalResponseBytes(responseBytes) +} - // Parse response into JSONResponse - response := rpctypes.RPCResponse{} - err = json.Unmarshal(responseBytes, &response) +func CallHTTP_URI(remote string, method string, params map[string]interface{}) (interface{}, error) { + values, err := argsToURLValues(params) if err != nil { - return dest, err + return nil, err + } + log.Info(Fmt("URI request to %v: %v", remote, values)) + resp, err := http.PostForm(remote+method, values) + if err != nil { + return nil, err + } + defer resp.Body.Close() + responseBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + return unmarshalResponseBytes(responseBytes) +} + +//------------------------------------------------ + +func unmarshalResponseBytes(responseBytes []byte) (interface{}, error) { + // read response + // if rpc/core/types is imported, the result will unmarshal + // into the correct type + var err error + response := &rpctypes.RPCResponse{} + wire.ReadJSON(response, responseBytes, &err) + if err != nil { + return nil, err } - // Parse response into dest - resultJSONObject := response.Result errorStr := response.Error if errorStr != "" { - return dest, errors.New(errorStr) + return nil, errors.New(errorStr) } - dest = wire.ReadJSONObject(dest, resultJSONObject, &err) - return dest, err + return response.Result, err +} + +func argsToURLValues(args map[string]interface{}) (url.Values, error) { + values := make(url.Values) + if len(args) == 0 { + return values, nil + } + err := argsToJson(args) + if err != nil { + return nil, err + } + for key, val := range args { + values.Set(key, val.(string)) + } + return values, nil +} + +func argsToJson(args map[string]interface{}) error { + var n int + var err error + for k, v := range args { + buf := new(bytes.Buffer) + wire.WriteJSON(v, buf, &n, &err) + if err != nil { + return err + } + args[k] = buf.String() + } + return nil } diff --git a/rpc/core/events.go b/rpc/core/events.go index 8907e0bb..83092fd9 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -11,7 +11,7 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscri wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg types.EventData) { // NOTE: EventSwitch callbacks must be nonblocking // NOTE: RPCResponses of subscribed events have id suffix "#event" - wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", ctypes.ResultEvent{event, msg}, "")) + wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &ctypes.ResultEvent{event, msg}, "")) }) return &ctypes.ResultSubscribe{}, nil } @@ -21,7 +21,7 @@ func Unsubscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultUnsub wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg types.EventData) { // NOTE: EventSwitch callbacks must be nonblocking // NOTE: RPCResponses of subscribed events have id suffix "#event" - wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", ctypes.ResultEvent{event, msg}, "")) + wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &ctypes.ResultEvent{event, msg}, "")) }) return &ctypes.ResultUnsubscribe{}, nil } diff --git a/rpc/server/handlers.go b/rpc/server/handlers.go index a28277f7..923544be 100644 --- a/rpc/server/handlers.go +++ b/rpc/server/handlers.go @@ -153,7 +153,7 @@ func jsonParamsToArgs(rpcFunc *RPCFunc, params []interface{}) ([]reflect.Value, // Same as above, but with the first param the websocket connection func jsonParamsToArgsWS(rpcFunc *RPCFunc, params []interface{}, wsCtx WSRPCContext) ([]reflect.Value, error) { - if len(rpcFunc.argNames)-1 != len(params) { + if len(rpcFunc.argNames) != len(params) { return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)", len(rpcFunc.argNames)-1, rpcFunc.argNames[1:], len(params), params)) } diff --git a/rpc/test/client_test.go b/rpc/test/client_test.go new file mode 100644 index 00000000..ff04ce27 --- /dev/null +++ b/rpc/test/client_test.go @@ -0,0 +1,143 @@ +package rpctest + +import ( + "fmt" + "testing" + + _ "github.com/tendermint/tendermint/config/tendermint_test" + ctypes "github.com/tendermint/tendermint/rpc/core/types" + "github.com/tendermint/tendermint/types" +) + +//-------------------------------------------------------------------------------- +// Test the HTTP client + +func TestURIStatus(t *testing.T) { + result, err := clientURI.Call("status", map[string]interface{}{}) + if err != nil { + t.Fatal(err) + } + testStatus(t, result) +} + +func TestJSONStatus(t *testing.T) { + result, err := clientJSON.Call("status", []interface{}{}) + if err != nil { + t.Fatal(err) + } + testStatus(t, result) +} + +func testStatus(t *testing.T, result interface{}) { + status := result.(*ctypes.ResultStatus) + if status.NodeInfo.Network != chainID { + t.Fatal(fmt.Errorf("ChainID mismatch: got %s expected %s", + status.NodeInfo.Network, chainID)) + } +} + +/*func TestURIBroadcastTx(t *testing.T) { + testBroadcastTx(t, "HTTP") +}*/ + +/*func TestJSONBroadcastTx(t *testing.T) { + testBroadcastTx(t, "JSONRPC") +}*/ + +// TODO +/* +func testBroadcastTx(t *testing.T, typ string) { + amt := int64(100) + toAddr := user[1].Address + tx := makeDefaultSendTxSigned(t, typ, toAddr, amt) + receipt := broadcastTx(t, typ, tx) + if receipt.CreatesContract > 0 { + t.Fatal("This tx does not create a contract") + } + if len(receipt.TxHash) == 0 { + t.Fatal("Failed to compute tx hash") + } + pool := node.MempoolReactor().Mempool + txs := pool.GetProposalTxs() + if len(txs) != mempoolCount { + t.Fatalf("The mem pool has %d txs. Expected %d", len(txs), mempoolCount) + } + tx2 := txs[mempoolCount-1].(*types.SendTx) + n, err := new(int64), new(error) + buf1, buf2 := new(bytes.Buffer), new(bytes.Buffer) + tx.WriteSignBytes(chainID, buf1, n, err) + tx2.WriteSignBytes(chainID, buf2, n, err) + if bytes.Compare(buf1.Bytes(), buf2.Bytes()) != 0 { + t.Fatal("inconsistent hashes for mempool tx and sent tx") + } +}*/ + +//-------------------------------------------------------------------------------- +// Test the websocket service + +var wsTyp = "JSONRPC" + +// make a simple connection to the server +func TestWSConnect(t *testing.T) { + con := newWSCon(t) + con.Close() +} + +// receive a new block message +func TestWSNewBlock(t *testing.T) { + con := newWSCon(t) + eid := types.EventStringNewBlock() + subscribe(t, con, eid) + defer func() { + unsubscribe(t, con, eid) + con.Close() + }() + waitForEvent(t, con, eid, true, func() {}, func(eid string, b []byte) error { + fmt.Println("Check:", string(b)) + return nil + }) +} + +// receive a few new block messages in a row, with increasing height +func TestWSBlockchainGrowth(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + con := newWSCon(t) + eid := types.EventStringNewBlock() + subscribe(t, con, eid) + defer func() { + unsubscribe(t, con, eid) + con.Close() + }() + // listen for NewBlock, ensure height increases by 1 + unmarshalValidateBlockchain(t, con, eid) +} + +/* TODO: this with dummy app.. +func TestWSDoubleFire(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + con := newWSCon(t) + eid := types.EventStringAccInput(user[0].Address) + subscribe(t, con, eid) + defer func() { + unsubscribe(t, con, eid) + con.Close() + }() + amt := int64(100) + toAddr := user[1].Address + // broadcast the transaction, wait to hear about it + waitForEvent(t, con, eid, true, func() { + tx := makeDefaultSendTxSigned(t, wsTyp, toAddr, amt) + broadcastTx(t, wsTyp, tx) + }, func(eid string, b []byte) error { + return nil + }) + // but make sure we don't hear about it twice + waitForEvent(t, con, eid, false, func() { + }, func(eid string, b []byte) error { + return nil + }) +}*/ diff --git a/rpc/test/config.go b/rpc/test/config.go new file mode 100644 index 00000000..78769097 --- /dev/null +++ b/rpc/test/config.go @@ -0,0 +1,18 @@ +package rpctest + +import ( + cfg "github.com/tendermint/tendermint/config" + tmcfg "github.com/tendermint/tendermint/config/tendermint_test" +) + +var config cfg.Config = nil + +func initConfig() { + + cfg.OnConfig(func(newConfig cfg.Config) { + config = newConfig + }) + + c := tmcfg.GetConfig("") + cfg.ApplyConfig(c) // Notify modules of new config +} diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go new file mode 100644 index 00000000..a417cbea --- /dev/null +++ b/rpc/test/helpers.go @@ -0,0 +1,212 @@ +package rpctest + +import ( + "fmt" + "net/http" + "testing" + "time" + + "github.com/gorilla/websocket" + . "github.com/tendermint/go-common" + "github.com/tendermint/go-p2p" + "github.com/tendermint/go-wire" + + _ "github.com/tendermint/tendermint/config/tendermint_test" + nm "github.com/tendermint/tendermint/node" + client "github.com/tendermint/tendermint/rpc/client" + ctypes "github.com/tendermint/tendermint/rpc/core/types" + "github.com/tendermint/tendermint/rpc/types" + "github.com/tendermint/tendermint/types" +) + +// global variables for use across all tests +var ( + rpcAddr = "127.0.0.1:36657" // Not 46657 + requestAddr = "http://" + rpcAddr + websocketAddr = "ws://" + rpcAddr + "/websocket" + + node *nm.Node + + mempoolCount = 0 + + chainID string + + clientURI = client.NewClientURI(requestAddr) + clientJSON = client.NewClientJSONRPC(requestAddr) +) + +// create a new node and sleep forever +func newNode(ready chan struct{}) { + // Create & start node + node = nm.NewNode() + l := p2p.NewDefaultListener("tcp", config.GetString("node_laddr"), true) + node.AddListener(l) + node.Start() + + // Run the RPC server. + node.StartRPC() + ready <- struct{}{} + + // Sleep forever + ch := make(chan struct{}) + <-ch +} + +// initialize config and create new node +func init() { + initConfig() + chainID = config.GetString("chain_id") + + // TODO: change consensus/state.go timeouts to be shorter + + // start a node + ready := make(chan struct{}) + go newNode(ready) + <-ready +} + +//-------------------------------------------------------------------------------- +// Utilities for testing the websocket service + +// create a new connection +func newWSCon(t *testing.T) *websocket.Conn { + dialer := websocket.DefaultDialer + rHeader := http.Header{} + con, r, err := dialer.Dial(websocketAddr, rHeader) + fmt.Println("response", r) + if err != nil { + t.Fatal(err) + } + return con +} + +// subscribe to an event +func subscribe(t *testing.T, con *websocket.Conn, eventid string) { + err := con.WriteJSON(rpctypes.RPCRequest{ + JSONRPC: "2.0", + ID: "", + Method: "subscribe", + Params: []interface{}{eventid}, + }) + if err != nil { + t.Fatal(err) + } +} + +// unsubscribe from an event +func unsubscribe(t *testing.T, con *websocket.Conn, eventid string) { + err := con.WriteJSON(rpctypes.RPCRequest{ + JSONRPC: "2.0", + ID: "", + Method: "unsubscribe", + Params: []interface{}{eventid}, + }) + if err != nil { + t.Fatal(err) + } +} + +// wait for an event; do things that might trigger events, and check them when they are received +// the check function takes an event id and the byte slice read off the ws +func waitForEvent(t *testing.T, con *websocket.Conn, eventid string, dieOnTimeout bool, f func(), check func(string, []byte) error) { + // go routine to wait for webscoket msg + goodCh := make(chan []byte) + errCh := make(chan error) + quitCh := make(chan struct{}) + defer close(quitCh) + + // Read message + go func() { + for { + _, p, err := con.ReadMessage() + if err != nil { + errCh <- err + break + } else { + // if the event id isnt what we're waiting on + // ignore it + var response rpctypes.RPCResponse + var err error + wire.ReadJSON(&response, p, &err) + if err != nil { + errCh <- err + break + } + event, ok := response.Result.(*ctypes.ResultEvent) + if ok && event.Event == eventid { + goodCh <- p + break + } + } + } + }() + + // do stuff (transactions) + f() + + // wait for an event or timeout + timeout := time.NewTimer(10 * time.Second) + select { + case <-timeout.C: + if dieOnTimeout { + con.Close() + t.Fatalf("%s event was not received in time", eventid) + } + // else that's great, we didn't hear the event + // and we shouldn't have + case p := <-goodCh: + if dieOnTimeout { + // message was received and expected + // run the check + err := check(eventid, p) + if err != nil { + t.Fatal(err) + panic(err) // Show the stack trace. + } + } else { + con.Close() + t.Fatalf("%s event was not expected", eventid) + } + case err := <-errCh: + t.Fatal(err) + panic(err) // Show the stack trace. + } +} + +//-------------------------------------------------------------------------------- + +func unmarshalResponseNewBlock(b []byte) (*types.Block, error) { + // unmarshall and assert somethings + var response rpctypes.RPCResponse + var err error + wire.ReadJSON(&response, b, &err) + if err != nil { + return nil, err + } + if response.Error != "" { + return nil, fmt.Errorf(response.Error) + } + block := response.Result.(*ctypes.ResultEvent).Data.(types.EventDataNewBlock).Block + return block, nil +} + +func unmarshalValidateBlockchain(t *testing.T, con *websocket.Conn, eid string) { + var initBlockN int + for i := 0; i < 2; i++ { + waitForEvent(t, con, eid, true, func() {}, func(eid string, b []byte) error { + block, err := unmarshalResponseNewBlock(b) + if err != nil { + return err + } + if i == 0 { + initBlockN = block.Header.Height + } else { + if block.Header.Height != initBlockN+i { + return fmt.Errorf("Expected block %d, got block %d", i, block.Header.Height) + } + } + + return nil + }) + } +}