From 26b2e808f7614569cf0132bb60d69763bd138ee2 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 19 Jun 2018 17:06:48 +0400 Subject: [PATCH 1/4] [rpc/lib/server] wrote a basic test for WebsocketManager --- rpc/lib/server/handlers.go | 17 ++++++----- rpc/lib/server/handlers_test.go | 51 ++++++++++++++++++++++++++++++++- 2 files changed, 58 insertions(+), 10 deletions(-) diff --git a/rpc/lib/server/handlers.go b/rpc/lib/server/handlers.go index b478fd33..0bcf34bb 100644 --- a/rpc/lib/server/handlers.go +++ b/rpc/lib/server/handlers.go @@ -613,11 +613,9 @@ func (wsc *wsConnection) readRoutine() { if err != nil { wsc.WriteRPCResponse(types.RPCInternalError(request.ID, err)) continue - } else { - wsc.WriteRPCResponse(types.NewRPCSuccessResponse(wsc.cdc, request.ID, result)) - continue } + wsc.WriteRPCResponse(types.NewRPCSuccessResponse(wsc.cdc, request.ID, result)) } } } @@ -684,20 +682,20 @@ func (wsc *wsConnection) writeMessageWithDeadline(msgType int, msg []byte) error //---------------------------------------- -// WebsocketManager is the main manager for all websocket connections. -// It holds the event switch and a map of functions for routing. +// WebsocketManager provides a WS handler for incoming connections and passes a +// map of functions along with any additional params to new connections. // NOTE: The websocket path is defined externally, e.g. in node/node.go type WebsocketManager struct { websocket.Upgrader + funcMap map[string]*RPCFunc cdc *amino.Codec logger log.Logger wsConnOptions []func(*wsConnection) } -// NewWebsocketManager returns a new WebsocketManager that routes according to -// the given funcMap and connects to the server with the given connection -// options. +// NewWebsocketManager returns a new WebsocketManager that passes a map of +// functions, connection options and logger to new WS connections. 
func NewWebsocketManager(funcMap map[string]*RPCFunc, cdc *amino.Codec, wsConnOptions ...func(*wsConnection)) *WebsocketManager { return &WebsocketManager{ funcMap: funcMap, @@ -718,7 +716,8 @@ func (wm *WebsocketManager) SetLogger(l log.Logger) { wm.logger = l } -// WebsocketHandler upgrades the request/response (via http.Hijack) and starts the wsConnection. +// WebsocketHandler upgrades the request/response (via http.Hijack) and starts +// the wsConnection. func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) { wsConn, err := wm.Upgrade(w, r, nil) if err != nil { diff --git a/rpc/lib/server/handlers_test.go b/rpc/lib/server/handlers_test.go index 86de0e4c..b1ea4675 100644 --- a/rpc/lib/server/handlers_test.go +++ b/rpc/lib/server/handlers_test.go @@ -9,15 +9,23 @@ import ( "strings" "testing" + "github.com/gorilla/websocket" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/tendermint/go-amino" + amino "github.com/tendermint/go-amino" rs "github.com/tendermint/tendermint/rpc/lib/server" types "github.com/tendermint/tendermint/rpc/lib/types" "github.com/tendermint/tmlibs/log" ) +////////////////////////////////////////////////////////////////////////////// +// HTTP REST API +// TODO + +////////////////////////////////////////////////////////////////////////////// +// JSON-RPC over HTTP + func testMux() *http.ServeMux { funcMap := map[string]*rs.RPCFunc{ "c": rs.NewRPCFunc(func(s string, i int) (string, error) { return "foo", nil }, "s,i"), @@ -108,3 +116,44 @@ func TestUnknownRPCPath(t *testing.T) { // Always expecting back a 404 error require.Equal(t, http.StatusNotFound, res.StatusCode, "should always return 404") } + +////////////////////////////////////////////////////////////////////////////// +// JSON-RPC over WEBSOCKETS + +func TestWebsocketManagerHandler(t *testing.T) { + s := newWSServer() + defer s.Close() + + // check upgrader works + d := websocket.Dialer{} + c, dialResp, err := 
d.Dial("ws://"+s.Listener.Addr().String()+"/websocket", nil) + require.NoError(t, err) + + if got, want := dialResp.StatusCode, http.StatusSwitchingProtocols; got != want { + t.Errorf("dialResp.StatusCode = %q, want %q", got, want) + } + + // check basic functionality works + req, err := types.MapToRequest(amino.NewCodec(), "TestWebsocketManager", "c", map[string]interface{}{"s": "a", "i": 10}) + require.NoError(t, err) + err = c.WriteJSON(req) + require.NoError(t, err) + + var resp types.RPCResponse + err = c.ReadJSON(&resp) + require.NoError(t, err) + require.Nil(t, resp.Error) +} + +func newWSServer() *httptest.Server { + funcMap := map[string]*rs.RPCFunc{ + "c": rs.NewWSRPCFunc(func(wsCtx types.WSRPCContext, s string, i int) (string, error) { return "foo", nil }, "s,i"), + } + wm := rs.NewWebsocketManager(funcMap, amino.NewCodec()) + wm.SetLogger(log.TestingLogger()) + + mux := http.NewServeMux() + mux.HandleFunc("/websocket", wm.WebsocketHandler) + + return httptest.NewServer(mux) +} From aaddf5d32fe9e8bb8979b6f7cb8cd56ffe2972fa Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 19 Jun 2018 17:07:21 +0400 Subject: [PATCH 2/4] set pubsub default capacity to 0 Refs #951 Jae: I don't know a good way to catch these errors in general, but forcing pubsub's internal channel to have a capacity of 0 will reveal bugs sooner, if the subscriber also has a 0 or small capacity ch to pull from. 
--- rpc/lib/server/handlers.go | 2 +- rpc/lib/types/types.go | 4 +++- types/event_bus.go | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/rpc/lib/server/handlers.go b/rpc/lib/server/handlers.go index 0bcf34bb..6cc03012 100644 --- a/rpc/lib/server/handlers.go +++ b/rpc/lib/server/handlers.go @@ -17,7 +17,7 @@ import ( "github.com/gorilla/websocket" "github.com/pkg/errors" - "github.com/tendermint/go-amino" + amino "github.com/tendermint/go-amino" types "github.com/tendermint/tendermint/rpc/lib/types" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" diff --git a/rpc/lib/types/types.go b/rpc/lib/types/types.go index 1eeb19ea..fe9a9253 100644 --- a/rpc/lib/types/types.go +++ b/rpc/lib/types/types.go @@ -7,7 +7,9 @@ import ( "strings" "github.com/pkg/errors" - "github.com/tendermint/go-amino" + + amino "github.com/tendermint/go-amino" + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" ) diff --git a/types/event_bus.go b/types/event_bus.go index 7480c824..cb4b17d5 100644 --- a/types/event_bus.go +++ b/types/event_bus.go @@ -9,7 +9,7 @@ import ( "github.com/tendermint/tmlibs/log" ) -const defaultCapacity = 1000 +const defaultCapacity = 0 type EventBusSubscriber interface { Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error From 4fc06e9d2aeb7e0d9d179aba9b11f556f868bbf4 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 19 Jun 2018 19:59:21 +0400 Subject: [PATCH 3/4] [libs/pubsub] fix memory leak Refs #1755 I started with writing a test for wsConnection (WebsocketManager) where I: - create a WS connection - do a simple echo call - close it No leaking goroutines, nor any leaking memory were detected. For useful shortcuts see my blog post https://blog.cosmos.network/debugging-the-memory-leak-in-tendermint-210186711420 Then I went to the rpc tests to see if calling Subscribe results in memory growth. It did. 
I used a slightly modified version of the TestHeaderEvents function: ``` func TestHeaderEvents(t *testing.T) { // memory heap before f, err := os.Create("/tmp/mem1.mprof") if err != nil { t.Fatal(err) } pprof.WriteHeapProfile(f) f.Close() for i := 0; i < 100; i++ { c := getHTTPClient() err = c.Start() require.Nil(t, err) evtTyp := types.EventNewBlockHeader evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout) require.Nil(t, err) _, ok := evt.(types.EventDataNewBlockHeader) require.True(t, ok) c.Stop() c = nil } runtime.GC() // memory heap after f, err = os.Create("/tmp/mem2.mprof") if err != nil { t.Fatal(err) } pprof.WriteHeapProfile(f) f.Close() // dump all running goroutines time.Sleep(10 * time.Second) pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) } ``` ``` Showing nodes accounting for 35159.16kB, 100% of 35159.16kB total Showing top 10 nodes out of 48 flat flat% sum% cum cum% 32022.23kB 91.08% 91.08% 32022.23kB 91.08% github.com/tendermint/tendermint/libs/pubsub/query.(*QueryParser).Init 1056.33kB 3.00% 94.08% 1056.33kB 3.00% bufio.NewReaderSize 528.17kB 1.50% 95.58% 528.17kB 1.50% bufio.NewWriterSize 528.17kB 1.50% 97.09% 528.17kB 1.50% github.com/tendermint/tendermint/consensus.NewConsensusState 512.19kB 1.46% 98.54% 512.19kB 1.46% runtime.malg 512.08kB 1.46% 100% 512.08kB 1.46% syscall.ByteSliceFromString 0 0% 100% 512.08kB 1.46% github.com/tendermint/tendermint/consensus.(*ConsensusState).(github.com/tendermint/tendermint/consensus.defaultDecideProposal)-fm 0 0% 100% 512.08kB 1.46% github.com/tendermint/tendermint/consensus.(*ConsensusState).addVote 0 0% 100% 512.08kB 1.46% github.com/tendermint/tendermint/consensus.(*ConsensusState).defaultDecideProposal 0 0% 100% 512.08kB 1.46% github.com/tendermint/tendermint/consensus.(*ConsensusState).enterNewRound ``` 100 subscriptions produce 32MB. Again, no additional goroutines are running after the end of the test (wsConnection readRoutine and writeRoutine both finish).
**It means that some existing goroutine or object is holding a reference to the *Query objects, which are leaking.** One of them is pubsub#loop. It's using state.queries to map queries to clients and state.clients to map clients to queries. Before this commit, we were not thoroughly cleaning state.queries, which was the reason for the memory leak. --- libs/pubsub/pubsub.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/libs/pubsub/pubsub.go b/libs/pubsub/pubsub.go index 684ff358..776e0653 100644 --- a/libs/pubsub/pubsub.go +++ b/libs/pubsub/pubsub.go @@ -156,6 +156,8 @@ func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, ou if _, ok = s.subscriptions[clientID]; !ok { s.subscriptions[clientID] = make(map[string]Query) } + // preserve original query + // see Unsubscribe s.subscriptions[clientID][query.String()] = query s.mtx.Unlock() return nil @@ -314,6 +316,9 @@ func (state *state) remove(clientID string, q Query) { } delete(state.queries[q], clientID) + if len(state.queries[q]) == 0 { + delete(state.queries, q) + } } } @@ -328,8 +333,10 @@ func (state *state) removeAll(clientID string) { close(ch) delete(state.queries[q], clientID) + if len(state.queries[q]) == 0 { + delete(state.queries, q) + } } - delete(state.clients, clientID) } From cfff83fa3d85e7c189af6b2df9751be50f358fe0 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 19 Jun 2018 20:20:30 +0400 Subject: [PATCH 4/4] update changelog --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e7f9baf2..5afc0221 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## 0.20.1 + +BUG FIXES: + +- [rpc] fix memory leak in Websocket (when using `/subscribe` method) + ## 0.20.0 *June 6th, 2018*