diff --git a/CHANGELOG.md b/CHANGELOG.md index e7f9baf2..5afc0221 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## 0.20.1 + +BUG FIXES: + +- [rpc] fix memory leak in Websocket (when using `/subscribe` method) + ## 0.20.0 *June 6th, 2018* diff --git a/libs/pubsub/pubsub.go b/libs/pubsub/pubsub.go index 684ff358..776e0653 100644 --- a/libs/pubsub/pubsub.go +++ b/libs/pubsub/pubsub.go @@ -156,6 +156,8 @@ func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, ou if _, ok = s.subscriptions[clientID]; !ok { s.subscriptions[clientID] = make(map[string]Query) } + // preserve original query + // see Unsubscribe s.subscriptions[clientID][query.String()] = query s.mtx.Unlock() return nil @@ -314,6 +316,9 @@ func (state *state) remove(clientID string, q Query) { } delete(state.queries[q], clientID) + if len(state.queries[q]) == 0 { + delete(state.queries, q) + } } } @@ -328,8 +333,10 @@ func (state *state) removeAll(clientID string) { close(ch) delete(state.queries[q], clientID) + if len(state.queries[q]) == 0 { + delete(state.queries, q) + } } - delete(state.clients, clientID) } diff --git a/rpc/lib/server/handlers.go b/rpc/lib/server/handlers.go index b478fd33..6cc03012 100644 --- a/rpc/lib/server/handlers.go +++ b/rpc/lib/server/handlers.go @@ -17,7 +17,7 @@ import ( "github.com/gorilla/websocket" "github.com/pkg/errors" - "github.com/tendermint/go-amino" + amino "github.com/tendermint/go-amino" types "github.com/tendermint/tendermint/rpc/lib/types" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" @@ -613,11 +613,9 @@ func (wsc *wsConnection) readRoutine() { if err != nil { wsc.WriteRPCResponse(types.RPCInternalError(request.ID, err)) continue - } else { - wsc.WriteRPCResponse(types.NewRPCSuccessResponse(wsc.cdc, request.ID, result)) - continue } + wsc.WriteRPCResponse(types.NewRPCSuccessResponse(wsc.cdc, request.ID, result)) } } } @@ -684,20 +682,20 @@ func (wsc *wsConnection) writeMessageWithDeadline(msgType int, msg []byte) error //---------------------------------------- -// WebsocketManager is the main manager for all websocket connections. -// It holds the event switch and a map of functions for routing. +// WebsocketManager provides a WS handler for incoming connections and passes a +// map of functions along with any additional params to new connections. // NOTE: The websocket path is defined externally, e.g. in node/node.go type WebsocketManager struct { websocket.Upgrader + funcMap map[string]*RPCFunc cdc *amino.Codec logger log.Logger wsConnOptions []func(*wsConnection) } -// NewWebsocketManager returns a new WebsocketManager that routes according to -// the given funcMap and connects to the server with the given connection -// options. +// NewWebsocketManager returns a new WebsocketManager that passes a map of +// functions, connection options and logger to new WS connections. func NewWebsocketManager(funcMap map[string]*RPCFunc, cdc *amino.Codec, wsConnOptions ...func(*wsConnection)) *WebsocketManager { return &WebsocketManager{ funcMap: funcMap, @@ -718,7 +716,8 @@ func (wm *WebsocketManager) SetLogger(l log.Logger) { wm.logger = l } -// WebsocketHandler upgrades the request/response (via http.Hijack) and starts the wsConnection. +// WebsocketHandler upgrades the request/response (via http.Hijack) and starts +// the wsConnection. func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) { wsConn, err := wm.Upgrade(w, r, nil) if err != nil { diff --git a/rpc/lib/server/handlers_test.go b/rpc/lib/server/handlers_test.go index 86de0e4c..b1ea4675 100644 --- a/rpc/lib/server/handlers_test.go +++ b/rpc/lib/server/handlers_test.go @@ -9,15 +9,23 @@ import ( "strings" "testing" + "github.com/gorilla/websocket" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/tendermint/go-amino" + amino "github.com/tendermint/go-amino" rs "github.com/tendermint/tendermint/rpc/lib/server" types "github.com/tendermint/tendermint/rpc/lib/types" "github.com/tendermint/tmlibs/log" ) +////////////////////////////////////////////////////////////////////////////// +// HTTP REST API +// TODO + +////////////////////////////////////////////////////////////////////////////// +// JSON-RPC over HTTP + func testMux() *http.ServeMux { funcMap := map[string]*rs.RPCFunc{ "c": rs.NewRPCFunc(func(s string, i int) (string, error) { return "foo", nil }, "s,i"), @@ -108,3 +116,44 @@ func TestUnknownRPCPath(t *testing.T) { // Always expecting back a 404 error require.Equal(t, http.StatusNotFound, res.StatusCode, "should always return 404") } + +////////////////////////////////////////////////////////////////////////////// +// JSON-RPC over WEBSOCKETS + +func TestWebsocketManagerHandler(t *testing.T) { + s := newWSServer() + defer s.Close() + + // check upgrader works + d := websocket.Dialer{} + c, dialResp, err := d.Dial("ws://"+s.Listener.Addr().String()+"/websocket", nil) + require.NoError(t, err) + + if got, want := dialResp.StatusCode, http.StatusSwitchingProtocols; got != want { + t.Errorf("dialResp.StatusCode = %q, want %q", got, want) + } + + // check basic functionality works + req, err := types.MapToRequest(amino.NewCodec(), "TestWebsocketManager", "c", map[string]interface{}{"s": "a", "i": 10}) + require.NoError(t, err) + err = c.WriteJSON(req) + require.NoError(t, err) + + var resp types.RPCResponse + err = c.ReadJSON(&resp) + require.NoError(t, err) + require.Nil(t, resp.Error) +} + +func newWSServer() *httptest.Server { + funcMap := map[string]*rs.RPCFunc{ + "c": rs.NewWSRPCFunc(func(wsCtx types.WSRPCContext, s string, i int) (string, error) { return "foo", nil }, "s,i"), + } + wm := rs.NewWebsocketManager(funcMap, amino.NewCodec()) + wm.SetLogger(log.TestingLogger()) + + mux := http.NewServeMux() + mux.HandleFunc("/websocket", wm.WebsocketHandler) + + return httptest.NewServer(mux) +} diff --git a/rpc/lib/types/types.go b/rpc/lib/types/types.go index 1eeb19ea..fe9a9253 100644 --- a/rpc/lib/types/types.go +++ b/rpc/lib/types/types.go @@ -7,7 +7,9 @@ import ( "strings" "github.com/pkg/errors" - "github.com/tendermint/go-amino" + + amino "github.com/tendermint/go-amino" + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" ) diff --git a/types/event_bus.go b/types/event_bus.go index 7480c824..cb4b17d5 100644 --- a/types/event_bus.go +++ b/types/event_bus.go @@ -9,7 +9,7 @@ import ( "github.com/tendermint/tmlibs/log" ) -const defaultCapacity = 1000 +const defaultCapacity = 0 type EventBusSubscriber interface { Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error