From 74cdadec9f890ffc559c1bb51061675d034bc6b5 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Sat, 2 Jan 2016 16:23:29 -0800 Subject: [PATCH] Refactor RPC to be more general --- benchmarks/simu/counter.go | 4 +- rpc/client/ws_client.go | 20 +--- rpc/core/events.go | 27 +++++ rpc/core/routes.go | 2 + rpc/core/types/responses.go | 48 +++++---- rpc/server/handlers.go | 210 ++++++++++++++++++++---------------- rpc/server/http_server.go | 6 +- rpc/types/types.go | 50 +++++++-- 8 files changed, 224 insertions(+), 143 deletions(-) create mode 100644 rpc/core/events.go diff --git a/benchmarks/simu/counter.go b/benchmarks/simu/counter.go index 25b7703e..307f29d0 100644 --- a/benchmarks/simu/counter.go +++ b/benchmarks/simu/counter.go @@ -10,7 +10,7 @@ import ( . "github.com/tendermint/go-common" "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/rpc/client" - // ctypes "github.com/tendermint/tendermint/rpc/core/types" + _ "github.com/tendermint/tendermint/rpc/core/types" // Register RPCResponse > Result types "github.com/tendermint/tendermint/rpc/types" ) @@ -49,7 +49,7 @@ func main() { if i%1000 == 0 { fmt.Println(i) } - time.Sleep(time.Microsecond * 250) + time.Sleep(time.Microsecond * 1000) } ws.Stop() diff --git a/rpc/client/ws_client.go b/rpc/client/ws_client.go index 2ca2f44f..8124b63b 100644 --- a/rpc/client/ws_client.go +++ b/rpc/client/ws_client.go @@ -2,18 +2,15 @@ package rpcclient import ( "net/http" - "strings" "time" "github.com/gorilla/websocket" . "github.com/tendermint/go-common" "github.com/tendermint/go-wire" - ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/rpc/types" ) const ( - wsEventsChannelCapacity = 10 wsResultsChannelCapacity = 10 wsWriteTimeoutSeconds = 10 ) @@ -22,8 +19,7 @@ type WSClient struct { QuitService Address string *websocket.Conn - EventsCh chan ctypes.ResultEvent // closes upon WSClient.Stop() - ResultsCh chan ctypes.Result // closes upon WSClient.Stop() + ResultsCh chan rpctypes.Result // closes upon WSClient.Stop() } // create a new connection @@ -31,8 +27,7 @@ func NewWSClient(addr string) *WSClient { wsClient := &WSClient{ Address: addr, Conn: nil, - EventsCh: make(chan ctypes.ResultEvent, wsEventsChannelCapacity), - ResultsCh: make(chan ctypes.Result, wsResultsChannelCapacity), + ResultsCh: make(chan rpctypes.Result, wsResultsChannelCapacity), } wsClient.QuitService = *NewQuitService(log, "WSClient", wsClient) return wsClient @@ -72,7 +67,7 @@ func (wsc *WSClient) dial() error { func (wsc *WSClient) OnStop() { wsc.QuitService.OnStop() - // EventsCh and ResultsCh are closed in receiveEventsRoutine. + // ResultsCh is closed in receiveEventsRoutine. } func (wsc *WSClient) receiveEventsRoutine() { @@ -83,23 +78,18 @@ func (wsc *WSClient) receiveEventsRoutine() { wsc.Stop() break } else { - var response ctypes.Response + var response rpctypes.RPCResponse wire.ReadJSON(&response, data, &err) if err != nil { log.Info("WSClient failed to parse message", "error", err) wsc.Stop() break } - if strings.HasSuffix(response.ID, "#event") { - wsc.EventsCh <- *response.Result.(*ctypes.ResultEvent) - } else { - wsc.ResultsCh <- response.Result - } + wsc.ResultsCh <- response.Result } } // Cleanup - close(wsc.EventsCh) close(wsc.ResultsCh) } diff --git a/rpc/core/events.go b/rpc/core/events.go new file mode 100644 index 00000000..8907e0bb --- /dev/null +++ b/rpc/core/events.go @@ -0,0 +1,27 @@ +package core + +import ( + ctypes "github.com/tendermint/tendermint/rpc/core/types" + "github.com/tendermint/tendermint/rpc/types" + "github.com/tendermint/tendermint/types" +) + +func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscribe, error) { + log.Notice("Subscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event) + wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg types.EventData) { + // NOTE: EventSwitch callbacks must be nonblocking + // NOTE: RPCResponses of subscribed events have id suffix "#event" + wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", ctypes.ResultEvent{event, msg}, "")) + }) + return &ctypes.ResultSubscribe{}, nil +} + +func Unsubscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultUnsubscribe, error) { + log.Notice("Unsubscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event) + wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg types.EventData) { + // NOTE: EventSwitch callbacks must be nonblocking + // NOTE: RPCResponses of subscribed events have id suffix "#event" + wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", ctypes.ResultEvent{event, msg}, "")) + }) + return &ctypes.ResultUnsubscribe{}, nil +} diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 3deb7ec2..4b743336 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -6,6 +6,8 @@ import ( // TODO: eliminate redundancy between here and reading code from core/ var Routes = map[string]*rpc.RPCFunc{ + "subscribe": rpc.NewWSRPCFunc(Subscribe, []string{"event"}), + "unsubscribe": rpc.NewWSRPCFunc(Unsubscribe, []string{"event"}), "status": rpc.NewRPCFunc(Status, []string{}), "net_info": rpc.NewRPCFunc(NetInfo, []string{}), "blockchain": rpc.NewRPCFunc(BlockchainInfo, []string{"minHeight", "maxHeight"}), diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index 2dc15328..95c4aacf 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -4,6 +4,7 @@ import ( "github.com/tendermint/go-crypto" "github.com/tendermint/go-p2p" "github.com/tendermint/go-wire" + "github.com/tendermint/tendermint/rpc/types" "github.com/tendermint/tendermint/types" ) @@ -12,6 +13,10 @@ type ResultBlockchainInfo struct { BlockMetas []*types.BlockMeta `json:"block_metas"` } +type ResultGenesis struct { + Genesis *types.GenesisDoc `json:"genesis"` +} + type ResultGetBlock struct { BlockMeta *types.BlockMeta `json:"block_meta"` Block *types.Block `json:"block"` @@ -55,8 +60,10 @@ type ResultListUnconfirmedTxs struct { Txs []types.Tx `json:"txs"` } -type ResultGenesis struct { - Genesis *types.GenesisDoc `json:"genesis"` +type ResultSubscribe struct { +} + +type ResultUnsubscribe struct { } type ResultEvent struct { @@ -67,31 +74,25 @@ type ResultEvent struct { //---------------------------------------- // response & result types -type Response struct { - JSONRPC string `json:"jsonrpc"` - ID string `json:"id"` - Result Result `json:"result"` - Error string `json:"error"` -} - const ( - ResultTypeBlockchainInfo = byte(0x05) - ResultTypeGetBlock = byte(0x06) - ResultTypeStatus = byte(0x07) - ResultTypeNetInfo = byte(0x08) - ResultTypeListValidators = byte(0x09) - ResultTypeDumpConsensusState = byte(0x0A) - ResultTypeBroadcastTx = byte(0x0E) - ResultTypeListUnconfirmedTxs = byte(0x0F) - ResultTypeGenesis = byte(0x11) - ResultTypeEvent = byte(0x13) // so websockets can respond to rpc functions + ResultTypeGenesis = byte(0x01) + ResultTypeBlockchainInfo = byte(0x02) + ResultTypeGetBlock = byte(0x03) + ResultTypeStatus = byte(0x04) + ResultTypeNetInfo = byte(0x05) + ResultTypeListValidators = byte(0x06) + ResultTypeDumpConsensusState = byte(0x07) + ResultTypeBroadcastTx = byte(0x08) + ResultTypeListUnconfirmedTxs = byte(0x09) + ResultTypeSubscribe = byte(0x0A) + ResultTypeUnsubscribe = byte(0x0B) + ResultTypeEvent = byte(0x0C) ) -type Result interface{} - // for wire.readReflect var _ = wire.RegisterInterface( - struct{ Result }{}, + struct{ rpctypes.Result }{}, + wire.ConcreteType{&ResultGenesis{}, ResultTypeGenesis}, wire.ConcreteType{&ResultBlockchainInfo{}, ResultTypeBlockchainInfo}, wire.ConcreteType{&ResultGetBlock{}, ResultTypeGetBlock}, wire.ConcreteType{&ResultStatus{}, ResultTypeStatus}, @@ -100,6 +101,7 @@ var _ = wire.RegisterInterface( wire.ConcreteType{&ResultDumpConsensusState{}, ResultTypeDumpConsensusState}, wire.ConcreteType{&ResultBroadcastTx{}, ResultTypeBroadcastTx}, wire.ConcreteType{&ResultListUnconfirmedTxs{}, ResultTypeListUnconfirmedTxs}, - wire.ConcreteType{&ResultGenesis{}, ResultTypeGenesis}, + wire.ConcreteType{&ResultSubscribe{}, ResultTypeSubscribe}, + wire.ConcreteType{&ResultUnsubscribe{}, ResultTypeUnsubscribe}, wire.ConcreteType{&ResultEvent{}, ResultTypeEvent}, ) diff --git a/rpc/server/handlers.go b/rpc/server/handlers.go index 40e3abe7..a28277f7 100644 --- a/rpc/server/handlers.go +++ b/rpc/server/handlers.go @@ -15,9 +15,7 @@ import ( . "github.com/tendermint/go-common" "github.com/tendermint/go-wire" "github.com/tendermint/tendermint/events" - ctypes "github.com/tendermint/tendermint/rpc/core/types" . "github.com/tendermint/tendermint/rpc/types" - "github.com/tendermint/tendermint/types" ) func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) { @@ -39,6 +37,7 @@ type RPCFunc struct { args []reflect.Type // type of each function arg returns []reflect.Type // type of each return arg argNames []string // name of each argument + ws bool // websocket only } // wraps a function for quicker introspection @@ -48,6 +47,17 @@ func NewRPCFunc(f interface{}, args []string) *RPCFunc { args: funcArgTypes(f), returns: funcReturnTypes(f), argNames: args, + ws: false, + } +} + +func NewWSRPCFunc(f interface{}, args []string) *RPCFunc { + return &RPCFunc{ + f: reflect.ValueOf(f), + args: funcArgTypes(f), + returns: funcReturnTypes(f), + argNames: args, + ws: true, } } @@ -91,35 +101,39 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc { var request RPCRequest err := json.Unmarshal(b, &request) if err != nil { - WriteRPCResponse(w, NewRPCResponse("", nil, err.Error())) + WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error())) return } if len(r.URL.Path) > 1 { - WriteRPCResponse(w, NewRPCResponse(request.ID, nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path))) + WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path))) return } rpcFunc := funcMap[request.Method] if rpcFunc == nil { - WriteRPCResponse(w, NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method)) + WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method)) + return + } + if rpcFunc.ws { + WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, "RPC method is only for websockets: "+request.Method)) return } args, err := jsonParamsToArgs(rpcFunc, request.Params) if err != nil { - WriteRPCResponse(w, NewRPCResponse(request.ID, nil, err.Error())) + WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, err.Error())) return } returns := rpcFunc.f.Call(args) log.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns) result, err := unreflectResult(returns) if err != nil { - WriteRPCResponse(w, NewRPCResponse(request.ID, nil, err.Error())) + WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, err.Error())) return } - WriteRPCResponse(w, NewRPCResponse(request.ID, result, "")) + WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, result, "")) } } -// covert a list of interfaces to properly typed values +// Convert a list of interfaces to properly typed values func jsonParamsToArgs(rpcFunc *RPCFunc, params []interface{}) ([]reflect.Value, error) { if len(rpcFunc.argNames) != len(params) { return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)", @@ -137,6 +151,25 @@ func jsonParamsToArgs(rpcFunc *RPCFunc, params []interface{}) ([]reflect.Value, return values, nil } +// Same as above, but with the first param the websocket connection +func jsonParamsToArgsWS(rpcFunc *RPCFunc, params []interface{}, wsCtx WSRPCContext) ([]reflect.Value, error) { + if len(rpcFunc.argNames)-1 != len(params) { + return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)", + len(rpcFunc.argNames)-1, rpcFunc.argNames[1:], len(params), params)) + } + values := make([]reflect.Value, len(params)+1) + values[0] = reflect.ValueOf(wsCtx) + for i, p := range params { + ty := rpcFunc.args[i+1] + v, err := _jsonObjectToArg(ty, p) + if err != nil { + return nil, err + } + values[i+1] = v + } + return values, nil +} + func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error) { var err error v := reflect.New(ty) @@ -154,20 +187,27 @@ func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error // convert from a function name to the http handler func makeHTTPHandler(rpcFunc *RPCFunc) func(http.ResponseWriter, *http.Request) { + // Exception for websocket endpoints + if rpcFunc.ws { + return func(w http.ResponseWriter, r *http.Request) { + WriteRPCResponseHTTP(w, NewRPCResponse("", nil, "This RPC method is only for websockets")) + } + } + // All other endpoints return func(w http.ResponseWriter, r *http.Request) { args, err := httpParamsToArgs(rpcFunc, r) if err != nil { - WriteRPCResponse(w, NewRPCResponse("", nil, err.Error())) + WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error())) return } returns := rpcFunc.f.Call(args) log.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns) result, err := unreflectResult(returns) if err != nil { - WriteRPCResponse(w, NewRPCResponse("", nil, err.Error())) + WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error())) return } - WriteRPCResponse(w, NewRPCResponse("", result, "")) + WriteRPCResponseHTTP(w, NewRPCResponse("", result, "")) } } @@ -215,10 +255,10 @@ const ( // a single websocket connection // contains listener id, underlying ws connection, // and the event switch for subscribing to events -type WSConnection struct { +type wsConnection struct { QuitService - id string + remoteAddr string baseConn *websocket.Conn writeChan chan RPCResponse readTimeout *time.Timer @@ -229,20 +269,20 @@ type WSConnection struct { } // new websocket connection wrapper -func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw *events.EventSwitch) *WSConnection { - wsc := &WSConnection{ - id: baseConn.RemoteAddr().String(), - baseConn: baseConn, - writeChan: make(chan RPCResponse, writeChanCapacity), // error when full. - funcMap: funcMap, - evsw: evsw, +func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw *events.EventSwitch) *wsConnection { + wsc := &wsConnection{ + remoteAddr: baseConn.RemoteAddr().String(), + baseConn: baseConn, + writeChan: make(chan RPCResponse, writeChanCapacity), // error when full. + funcMap: funcMap, + evsw: evsw, } - wsc.QuitService = *NewQuitService(log, "WSConnection", wsc) + wsc.QuitService = *NewQuitService(log, "wsConnection", wsc) return wsc } // wsc.Start() blocks until the connection closes. -func (wsc *WSConnection) OnStart() error { +func (wsc *wsConnection) OnStart() error { wsc.QuitService.OnStart() // Read subscriptions/unsubscriptions to events @@ -269,9 +309,9 @@ func (wsc *WSConnection) OnStart() error { return nil } -func (wsc *WSConnection) OnStop() { +func (wsc *wsConnection) OnStop() { wsc.QuitService.OnStop() - wsc.evsw.RemoveListener(wsc.id) + wsc.evsw.RemoveListener(wsc.remoteAddr) wsc.readTimeout.Stop() wsc.pingTicker.Stop() // The write loop closes the websocket connection @@ -279,7 +319,7 @@ func (wsc *WSConnection) OnStop() { // closes the writeChan } -func (wsc *WSConnection) readTimeoutRoutine() { +func (wsc *wsConnection) readTimeoutRoutine() { select { case <-wsc.readTimeout.C: log.Notice("Stopping connection due to read timeout") @@ -289,8 +329,19 @@ func (wsc *WSConnection) readTimeoutRoutine() { } } +// Implements WSRPCConnection +func (wsc *wsConnection) GetRemoteAddr() string { + return wsc.remoteAddr +} + +// Implements WSRPCConnection +func (wsc *wsConnection) GetEventSwitch() *events.EventSwitch { + return wsc.evsw +} + +// Implements WSRPCConnection // Blocking write to writeChan until service stops. -func (wsc *WSConnection) writeRPCResponse(resp RPCResponse) { +func (wsc *wsConnection) WriteRPCResponse(resp RPCResponse) { select { case <-wsc.Quit: return @@ -298,8 +349,9 @@ func (wsc *WSConnection) writeRPCResponse(resp RPCResponse) { } } +// Implements WSRPCConnection // Nonblocking write. -func (wsc *WSConnection) tryWriteRPCResponse(resp RPCResponse) bool { +func (wsc *wsConnection) TryWriteRPCResponse(resp RPCResponse) bool { select { case <-wsc.Quit: return false @@ -311,8 +363,8 @@ func (wsc *WSConnection) tryWriteRPCResponse(resp RPCResponse) bool { } // Read from the socket and subscribe to or unsubscribe from events -func (wsc *WSConnection) readRoutine() { - // Do not close writeChan, to allow writeRPCResponse() to fail. +func (wsc *wsConnection) readRoutine() { + // Do not close writeChan, to allow WriteRPCResponse() to fail. // defer close(wsc.writeChan) for { @@ -327,7 +379,7 @@ func (wsc *WSConnection) readRoutine() { // We use `readTimeout` to handle read timeouts. _, in, err := wsc.baseConn.ReadMessage() if err != nil { - log.Notice("Failed to read from connection", "id", wsc.id) + log.Notice("Failed to read from connection", "remote", wsc.remoteAddr) // an error reading the connection, // kill the connection wsc.Stop() @@ -337,75 +389,45 @@ func (wsc *WSConnection) readRoutine() { err = json.Unmarshal(in, &request) if err != nil { errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error()) - wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, errStr)) + wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, errStr)) continue } - switch request.Method { - case "subscribe": - if len(request.Params) != 1 { - wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, "subscribe takes 1 event parameter string")) - continue - } - if event, ok := request.Params[0].(string); !ok { - wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, "subscribe takes 1 event parameter string")) - continue - } else { - log.Notice("Subscribe to event", "id", wsc.id, "event", event) - wsc.evsw.AddListenerForEvent(wsc.id, event, func(msg types.EventData) { - // NOTE: EventSwitch callbacks must be nonblocking - // NOTE: RPCResponses of subscribed events have id suffix "#event" - wsc.tryWriteRPCResponse(NewRPCResponse(request.ID+"#event", ctypes.ResultEvent{event, msg}, "")) - }) - continue - } - case "unsubscribe": - if len(request.Params) == 0 { - log.Notice("Unsubscribe from all events", "id", wsc.id) - wsc.evsw.RemoveListener(wsc.id) - wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, "")) - continue - } else if len(request.Params) == 1 { - if event, ok := request.Params[0].(string); !ok { - wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, "unsubscribe takes 0 or 1 event parameter strings")) - continue - } else { - log.Notice("Unsubscribe from event", "id", wsc.id, "event", event) - wsc.evsw.RemoveListenerForEvent(event, wsc.id) - wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, "")) - continue - } - } else { - wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, "unsubscribe takes 0 or 1 event parameter strings")) - continue - } - default: - rpcFunc := wsc.funcMap[request.Method] - if rpcFunc == nil { - wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method)) - continue - } - args, err := jsonParamsToArgs(rpcFunc, request.Params) - if err != nil { - wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, err.Error())) - continue - } - returns := rpcFunc.f.Call(args) - log.Info("WSJSONRPC", "method", request.Method, "args", args, "returns", returns) - result, err := unreflectResult(returns) - if err != nil { - wsc.writeRPCResponse(NewRPCResponse(request.ID, nil, err.Error())) - continue - } else { - wsc.writeRPCResponse(NewRPCResponse(request.ID, result, "")) - continue - } + + // Now, fetch the RPCFunc and execute it. + + rpcFunc := wsc.funcMap[request.Method] + if rpcFunc == nil { + wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method)) + continue } + var args []reflect.Value + if rpcFunc.ws { + wsCtx := WSRPCContext{Request: request, WSRPCConnection: wsc} + args, err = jsonParamsToArgsWS(rpcFunc, request.Params, wsCtx) + } else { + args, err = jsonParamsToArgs(rpcFunc, request.Params) + } + if err != nil { + wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, err.Error())) + continue + } + returns := rpcFunc.f.Call(args) + log.Info("WSJSONRPC", "method", request.Method, "args", args, "returns", returns) + result, err := unreflectResult(returns) + if err != nil { + wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, err.Error())) + continue + } else { + wsc.WriteRPCResponse(NewRPCResponse(request.ID, result, "")) + continue + } + } } } // receives on a write channel and writes out on the socket -func (wsc *WSConnection) writeRoutine() { +func (wsc *wsConnection) writeRoutine() { defer wsc.baseConn.Close() var n, err = int(0), error(nil) for { @@ -463,7 +485,7 @@ func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw *events.EventSwitch) } } -// Upgrade the request/response (via http.Hijack) and starts the WSConnection. +// Upgrade the request/response (via http.Hijack) and starts the wsConnection. func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) { wsConn, err := wm.Upgrade(w, r, nil) if err != nil { @@ -474,7 +496,7 @@ func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Requ // register connection con := NewWSConnection(wsConn, wm.funcMap, wm.evsw) - log.Notice("New websocket connection", "origin", con.id) + log.Notice("New websocket connection", "remote", con.remoteAddr) con.Start() // Blocking } diff --git a/rpc/server/http_server.go b/rpc/server/http_server.go index 90669006..9e52a2b2 100644 --- a/rpc/server/http_server.go +++ b/rpc/server/http_server.go @@ -32,7 +32,7 @@ func StartHTTPServer(listenAddr string, handler http.Handler) (net.Listener, err return listener, nil } -func WriteRPCResponse(w http.ResponseWriter, res RPCResponse) { +func WriteRPCResponseHTTP(w http.ResponseWriter, res RPCResponse) { buf, n, err := new(bytes.Buffer), int(0), error(nil) wire.WriteJSON(res, buf, &n, &err) if err != nil { @@ -70,12 +70,12 @@ func RecoverAndLogHandler(handler http.Handler) http.Handler { // If RPCResponse if res, ok := e.(RPCResponse); ok { - WriteRPCResponse(rww, res) + WriteRPCResponseHTTP(rww, res) } else { // For the rest, log.Error("Panic in RPC HTTP handler", "error", e, "stack", string(debug.Stack())) rww.WriteHeader(http.StatusInternalServerError) - WriteRPCResponse(rww, NewRPCResponse("", nil, Fmt("Internal Server Error: %v", e))) + WriteRPCResponseHTTP(rww, NewRPCResponse("", nil, Fmt("Internal Server Error: %v", e))) } } diff --git a/rpc/types/types.go b/rpc/types/types.go index ba6ff0d6..f7bc98d5 100644 --- a/rpc/types/types.go +++ b/rpc/types/types.go @@ -1,5 +1,9 @@ package rpctypes +import ( + "github.com/tendermint/tendermint/events" +) + type RPCRequest struct { JSONRPC string `json:"jsonrpc"` ID string `json:"id"` @@ -16,14 +20,32 @@ func NewRPCRequest(id string, method string, params []interface{}) RPCRequest { } } -type RPCResponse struct { - JSONRPC string `json:"jsonrpc"` - ID string `json:"id"` - Result interface{} `json:"result"` - Error string `json:"error"` +//---------------------------------------- + +/* +Result is a generic interface. +Applications should register type-bytes like so: + +var _ = wire.RegisterInterface( + struct{ Result }{}, + wire.ConcreteType{&ResultGenesis{}, ResultTypeGenesis}, + wire.ConcreteType{&ResultBlockchainInfo{}, ResultTypeBlockchainInfo}, + ... +) +*/ +type Result interface { } -func NewRPCResponse(id string, res interface{}, err string) RPCResponse { +//---------------------------------------- + +type RPCResponse struct { + JSONRPC string `json:"jsonrpc"` + ID string `json:"id"` + Result Result `json:"result"` + Error string `json:"error"` +} + +func NewRPCResponse(id string, res Result, err string) RPCResponse { return RPCResponse{ JSONRPC: "2.0", ID: id, @@ -31,3 +53,19 @@ func NewRPCResponse(id string, res interface{}, err string) RPCResponse { Error: err, } } + +//---------------------------------------- + +// *wsConnection implements this interface. +type WSRPCConnection interface { + GetRemoteAddr() string + GetEventSwitch() *events.EventSwitch + WriteRPCResponse(resp RPCResponse) + TryWriteRPCResponse(resp RPCResponse) bool +} + +// websocket-only RPCFuncs take this as the first parameter. +type WSRPCContext struct { + Request RPCRequest + WSRPCConnection +}