From 202d9a2c0c0168a7c2dfcf60e69c80d16090a78a Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 30 Jan 2018 12:28:05 +0400 Subject: [PATCH 1/4] fix memory leak in mempool reactor Leaking goroutine: ``` 114 @ 0x42f2bc 0x42f3ae 0x440794 0x4403b9 0x468002 0x9fe32d 0x9ff78f 0xa025ed 0x45e571 ``` Explanation: the routine blocks on an empty clist forever, so unless there are txs coming in, this goroutine will just sit there, holding onto the peer too. If we're constantly reconnecting to some peer, old instances are not garbage collected, leading to a memory leak. Fixes https://github.com/cosmos/gaia/issues/108 Previous attempt https://github.com/tendermint/tendermint/pull/1156 --- glide.lock | 2 ++ glide.yaml | 1 + mempool/reactor.go | 49 ++++++++++++++++++++++++++++--------- mempool/reactor_test.go | 53 +++++++++++++++++++++++++++++++++++++++-- 4 files changed, 92 insertions(+), 13 deletions(-) diff --git a/glide.lock b/glide.lock index 801c9f4d..61107006 100644 --- a/glide.lock +++ b/glide.lock @@ -203,3 +203,5 @@ testImports: subpackages: - assert - require +- name: github.com/fortytw2/leaktest + version: 3b724c3d7b8729a35bf4e577f71653aec6e53513 diff --git a/glide.yaml b/glide.yaml index c2726708..238110c2 100644 --- a/glide.yaml +++ b/glide.yaml @@ -61,3 +61,4 @@ testImport: subpackages: - assert - require +- package: github.com/fortytw2/leaktest diff --git a/mempool/reactor.go b/mempool/reactor.go index 4e43bb0c..66f32dd9 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -2,6 +2,7 @@ package mempool import ( "bytes" + "context" "fmt" "reflect" "time" @@ -101,24 +102,39 @@ type PeerState interface { } // Send new mempool txs to peer. -// TODO: Handle mempool or reactor shutdown - as is this routine -// may block forever if no new txs come in. 
func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { if !memR.config.Broadcast { return } + // used to abort waiting until a tx available + // otherwise TxsFrontWait/NextWait could block forever if there are + // no txs + ctx, cancel := context.WithCancel(context.Background()) + go func() { + const healthCheckInterval = 5 * time.Second + for { + if !memR.IsRunning() || !peer.IsRunning() { + cancel() + return + } + time.Sleep(healthCheckInterval) + } + }() + var next *clist.CElement for { - if !memR.IsRunning() || !peer.IsRunning() { - return // Quit! - } + // This happens because the CElement we were looking at got + // garbage collected (removed). That is, .NextWait() returned nil. + // Go ahead and start from the beginning. if next == nil { - // This happens because the CElement we were looking at got - // garbage collected (removed). That is, .NextWait() returned nil. - // Go ahead and start from the beginning. - next = memR.Mempool.TxsFrontWait() // Wait until a tx is available + // Wait until a tx is available + next = waitWithCancel(memR.Mempool.TxsFrontWait, ctx) + if ctx.Err() != nil { + return + } } + memTx := next.Value.(*mempoolTx) // make sure the peer is up to date height := memTx.Height() @@ -136,9 +152,20 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) continue } + next = waitWithCancel(next.NextWait, ctx) + if ctx.Err() != nil { + return + } + } +} - next = next.NextWait() - continue +func waitWithCancel(f func() *clist.CElement, ctx context.Context) *clist.CElement { + el := make(chan *clist.CElement, 1) + select { + case el <- f(): + return <-el + case <-ctx.Done(): + return nil } } diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index 45458a98..3cbc5748 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -6,6 +6,8 @@ import ( "testing" "time" + "github.com/fortytw2/leaktest" + "github.com/pkg/errors" 
"github.com/stretchr/testify/assert" "github.com/go-kit/kit/log/term" @@ -91,18 +93,65 @@ func _waitForTxs(t *testing.T, wg *sync.WaitGroup, txs types.Txs, reactorIdx int wg.Done() } -var ( +const ( NUM_TXS = 1000 TIMEOUT = 120 * time.Second // ridiculously high because CircleCI is slow ) func TestReactorBroadcastTxMessage(t *testing.T) { config := cfg.TestConfig() - N := 4 + const N = 4 reactors := makeAndConnectMempoolReactors(config, N) + defer func() { + for _, r := range reactors { + r.Stop() + } + }() // send a bunch of txs to the first reactor's mempool // and wait for them all to be received in the others txs := checkTxs(t, reactors[0].Mempool, NUM_TXS) waitForTxs(t, txs, reactors) } + +func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + + config := cfg.TestConfig() + const N = 2 + reactors := makeAndConnectMempoolReactors(config, N) + defer func() { + for _, r := range reactors { + r.Stop() + } + }() + + // stop peer + sw := reactors[1].Switch + sw.StopPeerForError(sw.Peers().List()[0], errors.New("some reason")) + + // check that we are not leaking any go-routines + // i.e. broadcastTxRoutine finishes when peer is stopped + leaktest.CheckTimeout(t, 10*time.Second)() +} + +func TestBroadcastTxForPeerStopsWhenReactorStops(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + + config := cfg.TestConfig() + const N = 2 + reactors := makeAndConnectMempoolReactors(config, N) + + // stop reactors + for _, r := range reactors { + r.Stop() + } + + // check that we are not leaking any go-routines + // i.e. 
broadcastTxRoutine finishes when reactor is stopped + leaktest.CheckTimeout(t, 10*time.Second)() +} From 11b68f1934c1cbc038d06c6538a0ed57ab84788c Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 1 Feb 2018 18:33:33 +0400 Subject: [PATCH 2/4] rewrite broadcastTxRoutine to use channels https://play.golang.org/p/gN21yO9IRs3 ``` func waitWithCancel(f func() *clist.CElement, ctx context.Context) *clist.CElement { el := make(chan *clist.CElement, 1) select { case el <- f(): ``` will just run f() blockingly, so this doesn't change much in terms of behavior. --- blockchain/reactor_test.go | 12 +++++---- glide.lock | 15 ++++++----- glide.yaml | 6 ++++- mempool/mempool.go | 15 ++++++++--- mempool/reactor.go | 53 +++++++++++++------------------------ p2p/peer.go | 6 +++++ p2p/pex/pex_reactor_test.go | 1 + 7 files changed, 58 insertions(+), 50 deletions(-) diff --git a/blockchain/reactor_test.go b/blockchain/reactor_test.go index 26747ea6..9775d38a 100644 --- a/blockchain/reactor_test.go +++ b/blockchain/reactor_test.go @@ -157,7 +157,7 @@ func makeBlock(height int64, state sm.State) *types.Block { // The Test peer type bcrTestPeer struct { - cmn.Service + *cmn.BaseService id p2p.ID ch chan interface{} } @@ -165,11 +165,12 @@ type bcrTestPeer struct { var _ p2p.Peer = (*bcrTestPeer)(nil) func newbcrTestPeer(id p2p.ID) *bcrTestPeer { - return &bcrTestPeer{ - Service: cmn.NewBaseService(nil, "bcrTestPeer", nil), - id: id, - ch: make(chan interface{}, 2), + bcr := &bcrTestPeer{ + id: id, + ch: make(chan interface{}, 2), } + bcr.BaseService = cmn.NewBaseService(nil, "bcrTestPeer", bcr) + return bcr } func (tp *bcrTestPeer) lastValue() interface{} { return <-tp.ch } @@ -195,3 +196,4 @@ func (tp *bcrTestPeer) IsOutbound() bool { return false } func (tp *bcrTestPeer) IsPersistent() bool { return true } func (tp *bcrTestPeer) Get(s string) interface{} { return s } func (tp *bcrTestPeer) Set(string, interface{}) {} +func (tp *bcrTestPeer) QuitChan() <-chan struct{} { return 
tp.Quit } diff --git a/glide.lock b/glide.lock index 61107006..333314c1 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 78f23456c3ca7af06fc26e59107de92a7b208776643bda398b0a05348153da1b -updated: 2018-02-03T03:31:46.976175875-05:00 +hash: 94a3f8a3cf531e0cdde8bc160a2f4bab6f269d99a9a9633404e5badb0481f02c +updated: 2018-02-05T10:04:25.7693634Z imports: - name: github.com/btcsuite/btcd version: 50de9da05b50eb15658bb350f6ea24368a111ab7 @@ -116,9 +116,12 @@ imports: version: b6fc872b42d41158a60307db4da051dd6f179415 subpackages: - data - - nowriter/tmlegacy +- name: github.com/tendermint/iavl + version: 1a59ec0c82dc940c25339dd7c834df5cb76a95cb + subpackages: + - iavl - name: github.com/tendermint/tmlibs - version: deaaf014d8b8d1095054380a38b1b00e293f725f + version: 51684dabf79c2079f32cc25d6bccb748ee098386 subpackages: - autofile - cli @@ -194,6 +197,8 @@ testImports: version: 346938d642f2ec3594ed81d874461961cd0faa76 subpackages: - spew +- name: github.com/fortytw2/leaktest + version: 3b724c3d7b8729a35bf4e577f71653aec6e53513 - name: github.com/pmezard/go-difflib version: 792786c7400a136282c1664665ae0a8db921c6c2 subpackages: @@ -203,5 +208,3 @@ testImports: subpackages: - assert - require -- name: github.com/fortytw2/leaktest - version: 3b724c3d7b8729a35bf4e577f71653aec6e53513 diff --git a/glide.yaml b/glide.yaml index 238110c2..da72cf17 100644 --- a/glide.yaml +++ b/glide.yaml @@ -29,9 +29,13 @@ import: version: master subpackages: - data -- package: github.com/tendermint/tmlibs +- package: github.com/tendermint/iavl version: develop subpackages: + - iavl +- package: github.com/tendermint/tmlibs + version: 51684dabf79c2079f32cc25d6bccb748ee098386 + subpackages: - autofile - cli - cli/flags diff --git a/mempool/mempool.go b/mempool/mempool.go index 0cdd1dee..ec4f9847 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -178,10 +178,17 @@ func (mem *Mempool) Flush() { } } -// TxsFrontWait returns the first transaction in the ordered list for peer 
goroutines to call .NextWait() on. -// It blocks until the mempool is not empty (ie. until the internal `mem.txs` has at least one element) -func (mem *Mempool) TxsFrontWait() *clist.CElement { - return mem.txs.FrontWait() +// TxsFront returns the first transaction in the ordered list for peer +// goroutines to call .NextWait() on. +func (mem *Mempool) TxsFront() *clist.CElement { + return mem.txs.Front() +} + +// TxsWaitChan returns a channel to wait on transactions. It will be closed +// once the mempool is not empty (ie. the internal `mem.txs` has at least one +// element) +func (mem *Mempool) TxsWaitChan() <-chan struct{} { + return mem.txs.WaitChan() } // CheckTx executes a new transaction against the application to determine its validity diff --git a/mempool/reactor.go b/mempool/reactor.go index 66f32dd9..98c83337 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -2,7 +2,6 @@ package mempool import ( "bytes" - "context" "fmt" "reflect" "time" @@ -107,30 +106,20 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { return } - // used to abort waiting until a tx available - // otherwise TxsFrontWait/NextWait could block forever if there are - // no txs - ctx, cancel := context.WithCancel(context.Background()) - go func() { - const healthCheckInterval = 5 * time.Second - for { - if !memR.IsRunning() || !peer.IsRunning() { - cancel() - return - } - time.Sleep(healthCheckInterval) - } - }() - var next *clist.CElement for { - // This happens because the CElement we were looking at got - // garbage collected (removed). That is, .NextWait() returned nil. - // Go ahead and start from the beginning. + // This happens because the CElement we were looking at got garbage + // collected (removed). That is, .NextWait() returned nil. Go ahead and + // start from the beginning. 
if next == nil { - // Wait until a tx is available - next = waitWithCancel(memR.Mempool.TxsFrontWait, ctx) - if ctx.Err() != nil { + select { + case <-memR.Mempool.TxsWaitChan(): // Wait until a tx is available + if next = memR.Mempool.TxsFront(); next == nil { + continue + } + case <-peer.QuitChan(): + return + case <-memR.Quit: return } } @@ -152,23 +141,19 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) continue } - next = waitWithCancel(next.NextWait, ctx) - if ctx.Err() != nil { + + select { + case <-next.NextWaitChan(): + // see the start of the for loop for nil check + next = next.Next() + case <-peer.QuitChan(): + return + case <-memR.Quit: return } } } -func waitWithCancel(f func() *clist.CElement, ctx context.Context) *clist.CElement { - el := make(chan *clist.CElement, 1) - select { - case el <- f(): - return <-el - case <-ctx.Done(): - return nil - } -} - //----------------------------------------------------------------------------- // Messages diff --git a/p2p/peer.go b/p2p/peer.go index 67ce411c..cff99ad1 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -18,6 +18,7 @@ import ( // Peer is an interface representing a peer connected on a reactor. type Peer interface { cmn.Service + QuitChan() <-chan struct{} ID() ID // peer's cryptographic ID IsOutbound() bool // did we dial the peer @@ -331,6 +332,11 @@ func (p *peer) String() string { return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID()) } +// QuitChan returns a channel, which will be closed once peer is stopped. 
+func (p *peer) QuitChan() <-chan struct{} { + return p.Quit +} + //------------------------------------------------------------------ // helper funcs diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index 82dafecd..6aeb7a3c 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -368,3 +368,4 @@ func (mp mockPeer) Send(byte, interface{}) bool { return false } func (mp mockPeer) TrySend(byte, interface{}) bool { return false } func (mp mockPeer) Set(string, interface{}) {} func (mp mockPeer) Get(string) interface{} { return nil } +func (mp mockPeer) QuitChan() <-chan struct{} { return mp.Quit } From 84a0a1987cc80fa1fa509f01d86ea113fc1ba1e6 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 5 Feb 2018 22:26:14 +0400 Subject: [PATCH 3/4] comment out tests for now https://github.com/tendermint/tendermint/pull/1173#issuecomment-363173047 --- mempool/reactor_test.go | 70 ++++++++++++++++++++--------------------- 1 file changed, 34 insertions(+), 36 deletions(-) diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index 3cbc5748..9f0b5b48 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -6,8 +6,6 @@ import ( "testing" "time" - "github.com/fortytw2/leaktest" - "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/go-kit/kit/log/term" @@ -114,44 +112,44 @@ func TestReactorBroadcastTxMessage(t *testing.T) { waitForTxs(t, txs, reactors) } -func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) { - if testing.Short() { - t.Skip("skipping test in short mode.") - } +// func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) { +// if testing.Short() { +// t.Skip("skipping test in short mode.") +// } - config := cfg.TestConfig() - const N = 2 - reactors := makeAndConnectMempoolReactors(config, N) - defer func() { - for _, r := range reactors { - r.Stop() - } - }() +// config := cfg.TestConfig() +// const N = 2 +// reactors := makeAndConnectMempoolReactors(config, N) +// 
defer func() { +// for _, r := range reactors { +// r.Stop() +// } +// }() - // stop peer - sw := reactors[1].Switch - sw.StopPeerForError(sw.Peers().List()[0], errors.New("some reason")) +// // stop peer +// sw := reactors[1].Switch +// sw.StopPeerForError(sw.Peers().List()[0], errors.New("some reason")) - // check that we are not leaking any go-routines - // i.e. broadcastTxRoutine finishes when peer is stopped - leaktest.CheckTimeout(t, 10*time.Second)() -} +// // check that we are not leaking any go-routines +// // i.e. broadcastTxRoutine finishes when peer is stopped +// leaktest.CheckTimeout(t, 10*time.Second)() +// } -func TestBroadcastTxForPeerStopsWhenReactorStops(t *testing.T) { - if testing.Short() { - t.Skip("skipping test in short mode.") - } +// func TestBroadcastTxForPeerStopsWhenReactorStops(t *testing.T) { +// if testing.Short() { +// t.Skip("skipping test in short mode.") +// } - config := cfg.TestConfig() - const N = 2 - reactors := makeAndConnectMempoolReactors(config, N) +// config := cfg.TestConfig() +// const N = 2 +// reactors := makeAndConnectMempoolReactors(config, N) - // stop reactors - for _, r := range reactors { - r.Stop() - } +// // stop reactors +// for _, r := range reactors { +// r.Stop() +// } - // check that we are not leaking any go-routines - // i.e. broadcastTxRoutine finishes when reactor is stopped - leaktest.CheckTimeout(t, 10*time.Second)() -} +// // check that we are not leaking any go-routines +// // i.e. 
broadcastTxRoutine finishes when reactor is stopped +// leaktest.CheckTimeout(t, 10*time.Second)() +// } From 945b0e6ecafaa8cf974262e518e21f0d1f19d8ed Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 5 Feb 2018 22:38:31 +0400 Subject: [PATCH 4/4] cleanup glide.yaml --- glide.lock | 8 ++------ glide.yaml | 10 ++-------- 2 files changed, 4 insertions(+), 14 deletions(-) diff --git a/glide.lock b/glide.lock index 333314c1..cfb28d13 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 94a3f8a3cf531e0cdde8bc160a2f4bab6f269d99a9a9633404e5badb0481f02c -updated: 2018-02-05T10:04:25.7693634Z +hash: 8aeec731d864d5d3008b4403c3229800148c9b472969ef6e5181a8c93ac1f4c8 +updated: 2018-02-05T18:46:05.226387951Z imports: - name: github.com/btcsuite/btcd version: 50de9da05b50eb15658bb350f6ea24368a111ab7 @@ -116,10 +116,6 @@ imports: version: b6fc872b42d41158a60307db4da051dd6f179415 subpackages: - data -- name: github.com/tendermint/iavl - version: 1a59ec0c82dc940c25339dd7c834df5cb76a95cb - subpackages: - - iavl - name: github.com/tendermint/tmlibs version: 51684dabf79c2079f32cc25d6bccb748ee098386 subpackages: diff --git a/glide.yaml b/glide.yaml index da72cf17..fe874781 100644 --- a/glide.yaml +++ b/glide.yaml @@ -29,10 +29,6 @@ import: version: master subpackages: - data -- package: github.com/tendermint/iavl - version: develop - subpackages: - - iavl - package: github.com/tendermint/tmlibs version: 51684dabf79c2079f32cc25d6bccb748ee098386 subpackages: @@ -46,17 +42,16 @@ import: - log - merkle - pubsub + - pubsub/query - package: golang.org/x/crypto subpackages: - nacl/box - nacl/secretbox - ripemd160 -- package: golang.org/x/net - subpackages: - - context - package: google.golang.org/grpc version: v1.7.3 testImport: +- package: github.com/fortytw2/leaktest - package: github.com/go-kit/kit version: ^0.6.0 subpackages: @@ -65,4 +60,3 @@ testImport: subpackages: - assert - require -- package: github.com/fortytw2/leaktest