From 2a24ae90c19677e1b94c6163081a552533de90a9 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 12 Feb 2018 14:31:52 +0400 Subject: [PATCH 1/2] fixes from Jae's review 1. remove pointer 2. add Quit() method to Service interface --- blockchain/pool.go | 8 ++++---- blockchain/reactor.go | 2 +- blockchain/reactor_test.go | 5 ++--- consensus/reactor.go | 2 +- consensus/state.go | 2 +- consensus/ticker.go | 2 +- evidence/reactor.go | 2 +- glide.lock | 8 ++++---- glide.yaml | 4 ++-- mempool/reactor.go | 8 ++++---- node/node_test.go | 2 +- p2p/peer.go | 6 ------ p2p/pex/addrbook.go | 2 +- p2p/pex/pex_reactor.go | 4 ++-- p2p/pex/pex_reactor_test.go | 1 - p2p/trust/metric.go | 4 ++-- p2p/trust/store.go | 2 +- rpc/client/httpclient.go | 2 +- rpc/lib/client/ws_client.go | 6 +++--- rpc/lib/client/ws_client_test.go | 4 ++-- rpc/lib/server/handlers.go | 8 ++++---- 21 files changed, 38 insertions(+), 46 deletions(-) diff --git a/blockchain/pool.go b/blockchain/pool.go index 6b40a8e7..d0f4d297 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -534,10 +534,10 @@ OUTER_LOOP: // Send request and wait. bpr.pool.sendRequest(bpr.height, peer.id) select { - case <-bpr.pool.Quit: + case <-bpr.pool.Quit(): bpr.Stop() return - case <-bpr.Quit: + case <-bpr.Quit(): return case <-bpr.redoCh: bpr.reset() @@ -545,10 +545,10 @@ OUTER_LOOP: case <-bpr.gotBlockCh: // We got the block, now see if it's good. select { - case <-bpr.pool.Quit: + case <-bpr.pool.Quit(): bpr.Stop() return - case <-bpr.Quit: + case <-bpr.Quit(): return case <-bpr.redoCh: bpr.reset() diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 1bb82c23..2ad6770b 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -322,7 +322,7 @@ FOR_LOOP: } } continue FOR_LOOP - case <-bcR.Quit: + case <-bcR.Quit(): break FOR_LOOP } } diff --git a/blockchain/reactor_test.go b/blockchain/reactor_test.go index 9775d38a..263ca0f0 100644 --- a/blockchain/reactor_test.go +++ b/blockchain/reactor_test.go @@ -157,7 +157,7 @@ func makeBlock(height int64, state sm.State) *types.Block { // The Test peer type bcrTestPeer struct { - *cmn.BaseService + cmn.BaseService id p2p.ID ch chan interface{} } @@ -169,7 +169,7 @@ func newbcrTestPeer(id p2p.ID) *bcrTestPeer { id: id, ch: make(chan interface{}, 2), } - bcr.BaseService = cmn.NewBaseService(nil, "bcrTestPeer", bcr) + bcr.BaseService = *cmn.NewBaseService(nil, "bcrTestPeer", bcr) return bcr } @@ -196,4 +196,3 @@ func (tp *bcrTestPeer) IsOutbound() bool { return false } func (tp *bcrTestPeer) IsPersistent() bool { return true } func (tp *bcrTestPeer) Get(s string) interface{} { return s } func (tp *bcrTestPeer) Set(string, interface{}) {} -func (tp *bcrTestPeer) QuitChan() <-chan struct{} { return tp.Quit } diff --git a/consensus/reactor.go b/consensus/reactor.go index 44ff745c..b6379367 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -380,7 +380,7 @@ func (conR *ConsensusReactor) startBroadcastRoutine() error { edph := data.(types.TMEventData).Unwrap().(types.EventDataProposalHeartbeat) conR.broadcastProposalHeartbeatMessage(edph) } - case <-conR.Quit: + case <-conR.Quit(): conR.eventBus.UnsubscribeAll(ctx, subscriber) return } diff --git a/consensus/state.go b/consensus/state.go index adf85d08..aa334fdd 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -541,7 +541,7 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) { // if the timeout is relevant to the rs // go to the next step cs.handleTimeout(ti, rs) - case <-cs.Quit: + case <-cs.Quit(): // NOTE: the internalMsgQueue may have signed messages from our // priv_val that haven't hit the WAL, but its ok because diff --git a/consensus/ticker.go b/consensus/ticker.go index f66856f9..b37b7c49 100644 --- a/consensus/ticker.go +++ b/consensus/ticker.go @@ -127,7 +127,7 @@ func (t *timeoutTicker) timeoutRoutine() { // We can eliminate it by merging the timeoutRoutine into receiveRoutine // and managing the timeouts ourselves with a millisecond ticker go func(toi timeoutInfo) { t.tockChan <- toi }(ti) - case <-t.Quit: + case <-t.Quit(): return } } diff --git a/evidence/reactor.go b/evidence/reactor.go index cb9706a3..169a274d 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -126,7 +126,7 @@ func (evR *EvidenceReactor) broadcastRoutine() { // broadcast all pending evidence msg := &EvidenceListMessage{evR.evpool.PendingEvidence()} evR.Switch.Broadcast(EvidenceChannel, struct{ EvidenceMessage }{msg}) - case <-evR.Quit: + case <-evR.Quit(): return } } diff --git a/glide.lock b/glide.lock index 5d994b0e..2a77da54 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 41f411204b59e893053e59cda43466b3a6634c5fc88698d1f3131ecd5f239de7 -updated: 2018-02-09T09:56:16.586709479Z +hash: 0a994be202cfc9c8a820c5a68321bbbf5592f48790b9bd408b5f95cd344c3be5 +updated: 2018-02-12T08:29:16.126185849Z imports: - name: github.com/btcsuite/btcd version: 50de9da05b50eb15658bb350f6ea24368a111ab7 @@ -97,7 +97,7 @@ imports: - leveldb/table - leveldb/util - name: github.com/tendermint/abci - version: 5a4f56056e23cdfd5f3733db056968e016468508 + version: 5913ae8960c7ae5d748c37aa060bd35c99ff8a05 subpackages: - client - example/code @@ -117,7 +117,7 @@ imports: subpackages: - data - name: github.com/tendermint/tmlibs - version: 52ce4c20f8bc9b6da5fc1274bcce27c0b9dd738a + version: a57340ffb53aefb0fca1fc610d18fcbcc61b126f subpackages: - autofile - cli diff --git a/glide.yaml b/glide.yaml index 0fe66f3b..d93a80d7 100644 --- a/glide.yaml +++ b/glide.yaml @@ -19,7 +19,7 @@ import: - package: github.com/spf13/viper version: v1.0.0 - package: github.com/tendermint/abci - version: develop + version: 5913ae8960c7ae5d748c37aa060bd35c99ff8a05 subpackages: - client - example/dummy @@ -31,7 +31,7 @@ import: subpackages: - data - package: github.com/tendermint/tmlibs - version: develop + version: a57340ffb53aefb0fca1fc610d18fcbcc61b126f subpackages: - autofile - cli diff --git a/mempool/reactor.go b/mempool/reactor.go index 98c83337..58650a19 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -117,9 +117,9 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { if next = memR.Mempool.TxsFront(); next == nil { continue } - case <-peer.QuitChan(): + case <-peer.Quit(): return - case <-memR.Quit: + case <-memR.Quit(): return } } @@ -146,9 +146,9 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { case <-next.NextWaitChan(): // see the start of the for loop for nil check next = next.Next() - case <-peer.QuitChan(): + case <-peer.Quit(): return - case <-memR.Quit: + case <-memR.Quit(): return } } diff --git a/node/node_test.go b/node/node_test.go index eb8d109f..ca539382 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -41,7 +41,7 @@ func TestNodeStartStop(t *testing.T) { }() select { - case <-n.Quit: + case <-n.Quit(): case <-time.After(5 * time.Second): t.Fatal("timed out waiting for shutdown") } diff --git a/p2p/peer.go b/p2p/peer.go index cff99ad1..67ce411c 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -18,7 +18,6 @@ import ( // Peer is an interface representing a peer connected on a reactor. type Peer interface { cmn.Service - QuitChan() <-chan struct{} ID() ID // peer's cryptographic ID IsOutbound() bool // did we dial the peer @@ -332,11 +331,6 @@ func (p *peer) String() string { return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID()) } -// QuitChan returns a channel, which will be closed once peer is stopped. -func (p *peer) QuitChan() <-chan struct{} { - return p.Quit -} - //------------------------------------------------------------------ // helper funcs diff --git a/p2p/pex/addrbook.go b/p2p/pex/addrbook.go index 3a3be6e4..95ad70fe 100644 --- a/p2p/pex/addrbook.go +++ b/p2p/pex/addrbook.go @@ -332,7 +332,7 @@ out: select { case <-saveFileTicker.C: a.saveToFile(a.filePath) - case <-a.Quit: + case <-a.Quit(): break out } } diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 53075a1d..5aeca8f7 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -274,7 +274,7 @@ func (r *PEXReactor) ensurePeersRoutine() { select { case <-ticker.C: r.ensurePeers() - case <-r.Quit: + case <-r.Quit(): ticker.Stop() return } @@ -409,7 +409,7 @@ func (r *PEXReactor) crawlPeersRoutine() { case <-ticker.C: r.attemptDisconnects() r.crawlPeers() - case <-r.Quit: + case <-r.Quit(): return } } diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index 6aeb7a3c..82dafecd 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -368,4 +368,3 @@ func (mp mockPeer) Send(byte, interface{}) bool { return false } func (mp mockPeer) TrySend(byte, interface{}) bool { return false } func (mp mockPeer) Set(string, interface{}) {} func (mp mockPeer) Get(string) interface{} { return nil } -func (mp mockPeer) QuitChan() <-chan struct{} { return mp.Quit } diff --git a/p2p/trust/metric.go b/p2p/trust/metric.go index bf6ddb5e..5770b420 100644 --- a/p2p/trust/metric.go +++ b/p2p/trust/metric.go @@ -118,7 +118,7 @@ func (tm *TrustMetric) OnStart() error { } // OnStop implements Service -// Nothing to do since the goroutine shuts down by itself via BaseService.Quit +// Nothing to do since the goroutine shuts down by itself via BaseService.Quit() func (tm *TrustMetric) OnStop() {} // Returns a snapshot of the trust metric history data @@ -298,7 +298,7 @@ loop: select { case <-tick: tm.NextTimeInterval() - case <-tm.Quit: + case <-tm.Quit(): // Stop all further tracking for this metric break loop } diff --git a/p2p/trust/store.go b/p2p/trust/store.go index 0e61b065..bbb4592a 100644 --- a/p2p/trust/store.go +++ b/p2p/trust/store.go @@ -200,7 +200,7 @@ loop: select { case <-t.C: tms.SaveToDB() - case <-tms.Quit: + case <-tms.Quit(): break loop } } diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index 2b3f5ab2..bc6cf759 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -338,7 +338,7 @@ func (w *WSEvents) eventListener() { ch <- result.Data } w.mtx.RUnlock() - case <-w.Quit: + case <-w.Quit(): return } } diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index 79e3f63f..ca75ad56 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -335,7 +335,7 @@ func (c *WSClient) reconnectRoutine() { c.startReadWriteRoutines() } } - case <-c.Quit: + case <-c.Quit(): return } } @@ -394,7 +394,7 @@ func (c *WSClient) writeRoutine() { c.Logger.Debug("sent ping") case <-c.readRoutineQuit: return - case <-c.Quit: + case <-c.Quit(): if err := c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")); err != nil { c.Logger.Error("failed to write message", "err", err) } @@ -455,7 +455,7 @@ func (c *WSClient) readRoutine() { // c.wg.Wait() in c.Stop(). Note we rely on Quit being closed so that it sends unlimited Quit signals to stop // both readRoutine and writeRoutine select { - case <-c.Quit: + case <-c.Quit(): case c.ResponsesCh <- response: } } diff --git a/rpc/lib/client/ws_client_test.go b/rpc/lib/client/ws_client_test.go index cc789728..73f67160 100644 --- a/rpc/lib/client/ws_client_test.go +++ b/rpc/lib/client/ws_client_test.go @@ -132,7 +132,7 @@ func TestWSClientReconnectFailure(t *testing.T) { for { select { case <-c.ResponsesCh: - case <-c.Quit: + case <-c.Quit(): return } } @@ -217,7 +217,7 @@ func callWgDoneOnResult(t *testing.T, c *WSClient, wg *sync.WaitGroup) { if resp.Result != nil { wg.Done() } - case <-c.Quit: + case <-c.Quit(): return } } diff --git a/rpc/lib/server/handlers.go b/rpc/lib/server/handlers.go index 1e14ea9a..1bac625b 100644 --- a/rpc/lib/server/handlers.go +++ b/rpc/lib/server/handlers.go @@ -484,7 +484,7 @@ func (wsc *wsConnection) GetEventSubscriber() types.EventSubscriber { // It implements WSRPCConnection. It is Goroutine-safe. func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) { select { - case <-wsc.Quit: + case <-wsc.Quit(): return case wsc.writeChan <- resp: } @@ -494,7 +494,7 @@ func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) { // It implements WSRPCConnection. It is Goroutine-safe func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool { select { - case <-wsc.Quit: + case <-wsc.Quit(): return false case wsc.writeChan <- resp: return true @@ -525,7 +525,7 @@ func (wsc *wsConnection) readRoutine() { for { select { - case <-wsc.Quit: + case <-wsc.Quit(): return default: // reset deadline for every type of message (control or data) @@ -643,7 +643,7 @@ func (wsc *wsConnection) writeRoutine() { return } } - case <-wsc.Quit: + case <-wsc.Quit(): return } } From fc585bcdecffc7f0854435c26ec68acc04915b08 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 12 Feb 2018 17:04:07 +0400 Subject: [PATCH 2/2] do not block when writing to pongTimeoutCh Refs #1205 --- p2p/conn/connection.go | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 46e36301..9acaf617 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -201,12 +201,12 @@ func (c *MConnection) OnStart() error { // OnStop implements BaseService func (c *MConnection) OnStop() { c.BaseService.OnStop() - if c.quit != nil { - close(c.quit) - } c.flushTimer.Stop() c.pingTimer.Stop() c.chStatsTimer.Stop() + if c.quit != nil { + close(c.quit) + } c.conn.Close() // nolint: errcheck // We can't close pong safely here because @@ -339,7 +339,10 @@ FOR_LOOP: c.sendMonitor.Update(int(n)) c.Logger.Debug("Starting pong timer", "dur", c.config.PongTimeout) c.pongTimer = time.AfterFunc(c.config.PongTimeout, func() { - c.pongTimeoutCh <- true + select { + case c.pongTimeoutCh <- true: + default: + } }) c.flush() case timeout := <-c.pongTimeoutCh: @@ -548,7 +551,6 @@ func (c *MConnection) stopPongTimer() { if !c.pongTimer.Stop() { <-c.pongTimer.C } - drain(c.pongTimeoutCh) c.pongTimer = nil } } @@ -780,13 +782,3 @@ type msgPacket struct { func (p msgPacket) String() string { return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF) } - -func drain(ch <-chan bool) { - for { - select { - case <-ch: - default: - return - } - } -}