ws fixes; rpc tests

This commit is contained in:
Ethan Buchman 2016-01-10 16:33:52 -05:00
parent 790cde028b
commit 96547d0ca8
8 changed files with 515 additions and 64 deletions

View File

@ -30,6 +30,7 @@ func initTMRoot(rootDir string) {
configFilePath := path.Join(rootDir, "config.toml")
genesisFilePath := path.Join(rootDir, "genesis.json")
privFilePath := path.Join(rootDir, "priv_validator.json")
// Write default config file if missing.
if !FileExists(configFilePath) {
@ -40,6 +41,9 @@ func initTMRoot(rootDir string) {
if !FileExists(genesisFilePath) {
MustWriteFile(genesisFilePath, []byte(defaultGenesis), 0644)
}
if !FileExists(privFilePath) {
MustWriteFile(privFilePath, []byte(defaultPrivValidator), 0644)
}
}
func GetConfig(rootDir string) cfg.Config {
@ -58,7 +62,7 @@ func GetConfig(rootDir string) cfg.Config {
}
mapConfig.SetDefault("chain_id", "tendermint_test")
mapConfig.SetDefault("genesis_file", rootDir+"/genesis.json")
mapConfig.SetDefault("proxy_app", "tcp://127.0.0.1:36658")
mapConfig.SetDefault("proxy_app", "local")
mapConfig.SetDefault("moniker", "anonymous")
mapConfig.SetDefault("node_laddr", "0.0.0.0:36656")
mapConfig.SetDefault("fast_sync", false)
@ -78,7 +82,7 @@ func GetConfig(rootDir string) cfg.Config {
var defaultConfigTmpl = `# This is a TOML config file.
# For more information, see https://github.com/toml-lang/toml
proxy_app = "tcp://127.0.0.1:36658"
proxy_app = "local"
moniker = "__MONIKER__"
node_laddr = "0.0.0.0:36656"
seeds = ""
@ -93,50 +97,33 @@ func defaultConfig(moniker string) (defaultConfig string) {
return
}
// priv keys generated deterministically eg rpc/tests/helpers.go
var defaultGenesis = `{
"chain_id" : "tendermint_test",
"accounts": [
{
"address": "E9B5D87313356465FAE33C406CE2C2979DE60BCB",
"amount": 200000000
},
{
"address": "DFE4AFFA4CEE17CD01CB9E061D77C3ECED29BD88",
"amount": 200000000
},
{
"address": "F60D30722E7B497FA532FB3207C3FB29C31B1992",
"amount": 200000000
},
{
"address": "336CB40A5EB92E496E19B74FDFF2BA017C877FD6",
"amount": 200000000
},
{
"address": "D218F0F439BF0384F6F5EF8D0F8B398D941BD1DC",
"amount": 200000000
}
],
"genesis_time": "0001-01-01T00:00:00.000Z",
"chain_id": "tendermint_test",
"validators": [
{
"pub_key": [1, "583779C3BFA3F6C7E23C7D830A9C3D023A216B55079AD38BFED1207B94A19548"],
"amount": 1000000,
"unbond_to": [
{
"address": "E9B5D87313356465FAE33C406CE2C2979DE60BCB",
"amount": 100000
}
]
"pub_key": [
1,
"3B3069C422E19688B45CBFAE7BB009FC0FA1B1EA86593519318B7214853803C8"
],
"amount": 10,
"name": ""
}
]
],
"app_hash": ""
}`
var defaultPrivValidator = `{
"address": "1D7A91CB32F758A02EBB9BE1FB6F8DEE56F90D42",
"pub_key": [1,"06FBAC4E285285D1D91FCBC7E91C780ADA11516F67462340B3980CE2B94940E8"],
"priv_key": [1,"C453604BD6480D5538B4C6FD2E3E314B5BCE518D75ADE4DA3DA85AB8ADFD819606FBAC4E285285D1D91FCBC7E91C780ADA11516F67462340B3980CE2B94940E8"],
"last_height":0,
"last_round":0,
"last_step":0
"address": "D028C9981F7A87F3093672BF0D5B0E2A1B3ED456",
"pub_key": [
1,
"3B3069C422E19688B45CBFAE7BB009FC0FA1B1EA86593519318B7214853803C8"
],
"priv_key": [
1,
"27F82582AEFAE7AB151CFB01C48BB6C1A0DA78F9BDDA979A9F70A84D074EB07D3B3069C422E19688B45CBFAE7BB009FC0FA1B1EA86593519318B7214853803C8"
],
"last_height": 0,
"last_round": 0,
"last_step": 0
}`

View File

@ -24,6 +24,7 @@ import (
"github.com/tendermint/tendermint/rpc/server"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tmsp/example/golang"
)
import _ "net/http/pprof"
@ -320,14 +321,22 @@ func getState() *sm.State {
// Get a connection to the proxyAppConn addr.
// Check the current hash, and panic if it doesn't match.
func getProxyApp(addr string, hash []byte) proxy.AppConn {
proxyConn, err := Connect(addr)
if err != nil {
Exit(Fmt("Failed to connect to proxy for mempool: %v", err))
}
proxyAppConn := proxy.NewRemoteAppConn(proxyConn, 1024)
func getProxyApp(addr string, hash []byte) (proxyAppCtx proxy.AppContext) {
// use local app (for testing)
if addr == "local" {
app := example.NewCounterApplication(true)
appCtx := app.Open()
proxyAppCtx = proxy.NewLocalAppContext(appCtx)
} else {
proxyConn, err := Connect(addr)
if err != nil {
Exit(Fmt("Failed to connect to proxy for mempool: %v", err))
}
proxyAppCtx := proxy.NewRemoteAppContext(proxyConn, 1024)
proxyAppConn.Start()
proxyAppCtx.Start()
}
// Check the hash
currentHash, err := proxyAppConn.GetHashSync()

View File

@ -2,17 +2,47 @@ package rpcclient
import (
"bytes"
"encoding/json"
"errors"
"io/ioutil"
"net/http"
"net/url"
"strings"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/rpc/types"
)
func CallHTTP(remote string, method string, params []interface{}, dest interface{}) (interface{}, error) {
// JSON rpc takes params as a slice
type ClientJSONRPC struct {
remote string
}
func NewClientJSONRPC(remote string) *ClientJSONRPC {
return &ClientJSONRPC{remote}
}
func (c *ClientJSONRPC) Call(method string, params []interface{}) (interface{}, error) {
return CallHTTP_JSONRPC(c.remote, method, params)
}
// URI takes params as a map
type ClientURI struct {
remote string
}
func NewClientURI(remote string) *ClientURI {
if !strings.HasSuffix(remote, "/") {
remote = remote + "/"
}
return &ClientURI{remote}
}
func (c *ClientURI) Call(method string, params map[string]interface{}) (interface{}, error) {
return CallHTTP_URI(c.remote, method, params)
}
func CallHTTP_JSONRPC(remote string, method string, params []interface{}) (interface{}, error) {
// Make request and get responseBytes
request := rpctypes.RPCRequest{
JSONRPC: "2.0",
@ -25,27 +55,79 @@ func CallHTTP(remote string, method string, params []interface{}, dest interface
log.Info(Fmt("RPC request to %v: %v", remote, string(requestBytes)))
httpResponse, err := http.Post(remote, "text/json", requestBuf)
if err != nil {
return dest, err
return nil, err
}
defer httpResponse.Body.Close()
responseBytes, err := ioutil.ReadAll(httpResponse.Body)
if err != nil {
return dest, err
return nil, err
}
log.Info(Fmt("RPC response: %v", string(responseBytes)))
return unmarshalResponseBytes(responseBytes)
}
// Parse response into JSONResponse
response := rpctypes.RPCResponse{}
err = json.Unmarshal(responseBytes, &response)
func CallHTTP_URI(remote string, method string, params map[string]interface{}) (interface{}, error) {
values, err := argsToURLValues(params)
if err != nil {
return dest, err
return nil, err
}
log.Info(Fmt("URI request to %v: %v", remote, values))
resp, err := http.PostForm(remote+method, values)
if err != nil {
return nil, err
}
defer resp.Body.Close()
responseBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return unmarshalResponseBytes(responseBytes)
}
//------------------------------------------------
func unmarshalResponseBytes(responseBytes []byte) (interface{}, error) {
// read response
// if rpc/core/types is imported, the result will unmarshal
// into the correct type
var err error
response := &rpctypes.RPCResponse{}
wire.ReadJSON(response, responseBytes, &err)
if err != nil {
return nil, err
}
// Parse response into dest
resultJSONObject := response.Result
errorStr := response.Error
if errorStr != "" {
return dest, errors.New(errorStr)
return nil, errors.New(errorStr)
}
dest = wire.ReadJSONObject(dest, resultJSONObject, &err)
return dest, err
return response.Result, err
}
func argsToURLValues(args map[string]interface{}) (url.Values, error) {
values := make(url.Values)
if len(args) == 0 {
return values, nil
}
err := argsToJson(args)
if err != nil {
return nil, err
}
for key, val := range args {
values.Set(key, val.(string))
}
return values, nil
}
func argsToJson(args map[string]interface{}) error {
var n int
var err error
for k, v := range args {
buf := new(bytes.Buffer)
wire.WriteJSON(v, buf, &n, &err)
if err != nil {
return err
}
args[k] = buf.String()
}
return nil
}

View File

@ -11,7 +11,7 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscri
wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg types.EventData) {
// NOTE: EventSwitch callbacks must be nonblocking
// NOTE: RPCResponses of subscribed events have id suffix "#event"
wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", ctypes.ResultEvent{event, msg}, ""))
wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &ctypes.ResultEvent{event, msg}, ""))
})
return &ctypes.ResultSubscribe{}, nil
}
@ -21,7 +21,7 @@ func Unsubscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultUnsub
wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg types.EventData) {
// NOTE: EventSwitch callbacks must be nonblocking
// NOTE: RPCResponses of subscribed events have id suffix "#event"
wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", ctypes.ResultEvent{event, msg}, ""))
wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &ctypes.ResultEvent{event, msg}, ""))
})
return &ctypes.ResultUnsubscribe{}, nil
}

View File

@ -153,7 +153,7 @@ func jsonParamsToArgs(rpcFunc *RPCFunc, params []interface{}) ([]reflect.Value,
// Same as above, but with the first param the websocket connection
func jsonParamsToArgsWS(rpcFunc *RPCFunc, params []interface{}, wsCtx WSRPCContext) ([]reflect.Value, error) {
if len(rpcFunc.argNames)-1 != len(params) {
if len(rpcFunc.argNames) != len(params) {
return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)",
len(rpcFunc.argNames)-1, rpcFunc.argNames[1:], len(params), params))
}

143
rpc/test/client_test.go Normal file
View File

@ -0,0 +1,143 @@
package rpctest
import (
"fmt"
"testing"
_ "github.com/tendermint/tendermint/config/tendermint_test"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types"
)
//--------------------------------------------------------------------------------
// Test the HTTP client
func TestURIStatus(t *testing.T) {
result, err := clientURI.Call("status", map[string]interface{}{})
if err != nil {
t.Fatal(err)
}
testStatus(t, result)
}
func TestJSONStatus(t *testing.T) {
result, err := clientJSON.Call("status", []interface{}{})
if err != nil {
t.Fatal(err)
}
testStatus(t, result)
}
func testStatus(t *testing.T, result interface{}) {
status := result.(*ctypes.ResultStatus)
if status.NodeInfo.Network != chainID {
t.Fatal(fmt.Errorf("ChainID mismatch: got %s expected %s",
status.NodeInfo.Network, chainID))
}
}
/*func TestURIBroadcastTx(t *testing.T) {
testBroadcastTx(t, "HTTP")
}*/
/*func TestJSONBroadcastTx(t *testing.T) {
testBroadcastTx(t, "JSONRPC")
}*/
// TODO
/*
func testBroadcastTx(t *testing.T, typ string) {
amt := int64(100)
toAddr := user[1].Address
tx := makeDefaultSendTxSigned(t, typ, toAddr, amt)
receipt := broadcastTx(t, typ, tx)
if receipt.CreatesContract > 0 {
t.Fatal("This tx does not create a contract")
}
if len(receipt.TxHash) == 0 {
t.Fatal("Failed to compute tx hash")
}
pool := node.MempoolReactor().Mempool
txs := pool.GetProposalTxs()
if len(txs) != mempoolCount {
t.Fatalf("The mem pool has %d txs. Expected %d", len(txs), mempoolCount)
}
tx2 := txs[mempoolCount-1].(*types.SendTx)
n, err := new(int64), new(error)
buf1, buf2 := new(bytes.Buffer), new(bytes.Buffer)
tx.WriteSignBytes(chainID, buf1, n, err)
tx2.WriteSignBytes(chainID, buf2, n, err)
if bytes.Compare(buf1.Bytes(), buf2.Bytes()) != 0 {
t.Fatal("inconsistent hashes for mempool tx and sent tx")
}
}*/
//--------------------------------------------------------------------------------
// Test the websocket service
var wsTyp = "JSONRPC"
// make a simple connection to the server
func TestWSConnect(t *testing.T) {
con := newWSCon(t)
con.Close()
}
// receive a new block message
func TestWSNewBlock(t *testing.T) {
con := newWSCon(t)
eid := types.EventStringNewBlock()
subscribe(t, con, eid)
defer func() {
unsubscribe(t, con, eid)
con.Close()
}()
waitForEvent(t, con, eid, true, func() {}, func(eid string, b []byte) error {
fmt.Println("Check:", string(b))
return nil
})
}
// receive a few new block messages in a row, with increasing height
func TestWSBlockchainGrowth(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
con := newWSCon(t)
eid := types.EventStringNewBlock()
subscribe(t, con, eid)
defer func() {
unsubscribe(t, con, eid)
con.Close()
}()
// listen for NewBlock, ensure height increases by 1
unmarshalValidateBlockchain(t, con, eid)
}
/* TODO: this with dummy app..
func TestWSDoubleFire(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
con := newWSCon(t)
eid := types.EventStringAccInput(user[0].Address)
subscribe(t, con, eid)
defer func() {
unsubscribe(t, con, eid)
con.Close()
}()
amt := int64(100)
toAddr := user[1].Address
// broadcast the transaction, wait to hear about it
waitForEvent(t, con, eid, true, func() {
tx := makeDefaultSendTxSigned(t, wsTyp, toAddr, amt)
broadcastTx(t, wsTyp, tx)
}, func(eid string, b []byte) error {
return nil
})
// but make sure we don't hear about it twice
waitForEvent(t, con, eid, false, func() {
}, func(eid string, b []byte) error {
return nil
})
}*/

18
rpc/test/config.go Normal file
View File

@ -0,0 +1,18 @@
package rpctest
import (
cfg "github.com/tendermint/tendermint/config"
tmcfg "github.com/tendermint/tendermint/config/tendermint_test"
)
var config cfg.Config = nil
func initConfig() {
cfg.OnConfig(func(newConfig cfg.Config) {
config = newConfig
})
c := tmcfg.GetConfig("")
cfg.ApplyConfig(c) // Notify modules of new config
}

212
rpc/test/helpers.go Normal file
View File

@ -0,0 +1,212 @@
package rpctest
import (
"fmt"
"net/http"
"testing"
"time"
"github.com/gorilla/websocket"
. "github.com/tendermint/go-common"
"github.com/tendermint/go-p2p"
"github.com/tendermint/go-wire"
_ "github.com/tendermint/tendermint/config/tendermint_test"
nm "github.com/tendermint/tendermint/node"
client "github.com/tendermint/tendermint/rpc/client"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/rpc/types"
"github.com/tendermint/tendermint/types"
)
// global variables for use across all tests
var (
rpcAddr = "127.0.0.1:36657" // Not 46657
requestAddr = "http://" + rpcAddr
websocketAddr = "ws://" + rpcAddr + "/websocket"
node *nm.Node
mempoolCount = 0
chainID string
clientURI = client.NewClientURI(requestAddr)
clientJSON = client.NewClientJSONRPC(requestAddr)
)
// create a new node and sleep forever
func newNode(ready chan struct{}) {
// Create & start node
node = nm.NewNode()
l := p2p.NewDefaultListener("tcp", config.GetString("node_laddr"), true)
node.AddListener(l)
node.Start()
// Run the RPC server.
node.StartRPC()
ready <- struct{}{}
// Sleep forever
ch := make(chan struct{})
<-ch
}
// initialize config and create new node
func init() {
initConfig()
chainID = config.GetString("chain_id")
// TODO: change consensus/state.go timeouts to be shorter
// start a node
ready := make(chan struct{})
go newNode(ready)
<-ready
}
//--------------------------------------------------------------------------------
// Utilities for testing the websocket service
// create a new connection
func newWSCon(t *testing.T) *websocket.Conn {
dialer := websocket.DefaultDialer
rHeader := http.Header{}
con, r, err := dialer.Dial(websocketAddr, rHeader)
fmt.Println("response", r)
if err != nil {
t.Fatal(err)
}
return con
}
// subscribe to an event
func subscribe(t *testing.T, con *websocket.Conn, eventid string) {
err := con.WriteJSON(rpctypes.RPCRequest{
JSONRPC: "2.0",
ID: "",
Method: "subscribe",
Params: []interface{}{eventid},
})
if err != nil {
t.Fatal(err)
}
}
// unsubscribe from an event
func unsubscribe(t *testing.T, con *websocket.Conn, eventid string) {
err := con.WriteJSON(rpctypes.RPCRequest{
JSONRPC: "2.0",
ID: "",
Method: "unsubscribe",
Params: []interface{}{eventid},
})
if err != nil {
t.Fatal(err)
}
}
// wait for an event; do things that might trigger events, and check them when they are received
// the check function takes an event id and the byte slice read off the ws
func waitForEvent(t *testing.T, con *websocket.Conn, eventid string, dieOnTimeout bool, f func(), check func(string, []byte) error) {
// go routine to wait for webscoket msg
goodCh := make(chan []byte)
errCh := make(chan error)
quitCh := make(chan struct{})
defer close(quitCh)
// Read message
go func() {
for {
_, p, err := con.ReadMessage()
if err != nil {
errCh <- err
break
} else {
// if the event id isnt what we're waiting on
// ignore it
var response rpctypes.RPCResponse
var err error
wire.ReadJSON(&response, p, &err)
if err != nil {
errCh <- err
break
}
event, ok := response.Result.(*ctypes.ResultEvent)
if ok && event.Event == eventid {
goodCh <- p
break
}
}
}
}()
// do stuff (transactions)
f()
// wait for an event or timeout
timeout := time.NewTimer(10 * time.Second)
select {
case <-timeout.C:
if dieOnTimeout {
con.Close()
t.Fatalf("%s event was not received in time", eventid)
}
// else that's great, we didn't hear the event
// and we shouldn't have
case p := <-goodCh:
if dieOnTimeout {
// message was received and expected
// run the check
err := check(eventid, p)
if err != nil {
t.Fatal(err)
panic(err) // Show the stack trace.
}
} else {
con.Close()
t.Fatalf("%s event was not expected", eventid)
}
case err := <-errCh:
t.Fatal(err)
panic(err) // Show the stack trace.
}
}
//--------------------------------------------------------------------------------
func unmarshalResponseNewBlock(b []byte) (*types.Block, error) {
// unmarshall and assert somethings
var response rpctypes.RPCResponse
var err error
wire.ReadJSON(&response, b, &err)
if err != nil {
return nil, err
}
if response.Error != "" {
return nil, fmt.Errorf(response.Error)
}
block := response.Result.(*ctypes.ResultEvent).Data.(types.EventDataNewBlock).Block
return block, nil
}
func unmarshalValidateBlockchain(t *testing.T, con *websocket.Conn, eid string) {
var initBlockN int
for i := 0; i < 2; i++ {
waitForEvent(t, con, eid, true, func() {}, func(eid string, b []byte) error {
block, err := unmarshalResponseNewBlock(b)
if err != nil {
return err
}
if i == 0 {
initBlockN = block.Header.Height
} else {
if block.Header.Height != initBlockN+i {
return fmt.Errorf("Expected block %d, got block %d", i, block.Header.Height)
}
}
return nil
})
}
}