From 1d16df6a92222f6438065d912c03cf624dd6cb21 Mon Sep 17 00:00:00 2001 From: zbo14 Date: Wed, 1 Nov 2017 23:13:18 -0400 Subject: [PATCH 01/18] add test, TrySend in broadcast --- p2p/conn/connection.go | 14 +++++++-- p2p/conn/connection_test.go | 31 +++++++++++++++++++ p2p/switch.go | 59 ++++++++++++++++++++++++++----------- p2p/switch_test.go | 33 +++++++++++++++++---- 4 files changed, 112 insertions(+), 25 deletions(-) diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 7727ee32..d44ad075 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -2,6 +2,7 @@ package conn import ( "bufio" + "errors" "fmt" "io" "math" @@ -22,6 +23,7 @@ const ( minWriteBufferSize = 65536 updateStats = 2 * time.Second pingTimeout = 40 * time.Second + pongTimeout = 60 * time.Second // some of these defaults are written in the user config // flushThrottle, sendRate, recvRate @@ -84,6 +86,7 @@ type MConnection struct { quit chan struct{} flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled. pingTimer *cmn.RepeatTimer // send pings periodically + pongTimer *cmn.ThrottleTimer // close conn if pong not recv in 1 min chStatsTimer *cmn.RepeatTimer // update channel stats periodically created time.Time // time of creation @@ -170,6 +173,7 @@ func (c *MConnection) OnStart() error { c.quit = make(chan struct{}) c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle) c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout) + c.pongTimer = cmn.NewThrottleTimer("pong", pongTimeout) c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateStats) go c.sendRoutine() go c.recvRoutine() @@ -181,6 +185,7 @@ func (c *MConnection) OnStop() { c.BaseService.OnStop() c.flushTimer.Stop() c.pingTimer.Stop() + c.pongTimer.Stop() c.chStatsTimer.Stop() if c.quit != nil { close(c.quit) @@ -315,7 +320,12 @@ FOR_LOOP: c.Logger.Debug("Send Ping") wire.WriteByte(packetTypePing, c.bufWriter, &n, &err) c.sendMonitor.Update(int(n)) - c.flush() + go c.flush() + c.Logger.Debug("Starting pong timer") + c.pongTimer.Set() + case <-c.pongTimer.Ch: + c.Logger.Debug("Pong timeout") + err = errors.New("pong timeout") case <-c.pong: c.Logger.Debug("Send Pong") wire.WriteByte(packetTypePong, c.bufWriter, &n, &err) @@ -454,8 +464,8 @@ FOR_LOOP: // never block } case packetTypePong: - // do nothing c.Logger.Debug("Receive Pong") + c.pongTimer.Unset() case packetTypeMsg: pkt, n, err := msgPacket{}, int(0), error(nil) wire.ReadBinaryPtr(&pkt, c.bufReader, c.config.maxMsgPacketTotalSize(), &n, &err) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 9c8eccbe..3bac0bd6 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -116,6 +116,37 @@ func TestMConnectionStatus(t *testing.T) { assert.Zero(status.Channels[0].SendQueueSize) } +func TestPingPongTimeout(t *testing.T) { + assert, require := assert.New(t), require.New(t) + + server, client := net.Pipe() + defer server.Close() + defer client.Close() + + receivedCh := make(chan []byte) + errorsCh := make(chan interface{}) + onReceive := func(chID byte, msgBytes []byte) { + receivedCh <- msgBytes + } + onError := func(r interface{}) { + errorsCh <- r + } + mconn := createMConnectionWithCallbacks(client, onReceive, onError) + _, err := mconn.Start() + require.Nil(err) + defer mconn.Stop() + + select { + case receivedBytes := <-receivedCh: + t.Fatalf("Expected error, got %v", receivedBytes) + case err := <-errorsCh: + assert.NotNil(err) + assert.False(mconn.IsRunning()) + case <-time.After(500*time.Millisecond + 100*time.Second): + t.Fatal("Did not receive error in ~(pingTimeout + pongTimeout) seconds") + } +} + func TestMConnectionStopsAndReturnsError(t *testing.T) { assert, require := assert.New(t), require.New(t) diff --git a/p2p/switch.go b/p2p/switch.go index 9502359d..4a2c3480 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -200,9 +200,44 @@ func (sw *Switch) OnStop() { //--------------------------------------------------------------------- // Peers -// Peers returns the set of peers that are connected to the switch. -func (sw *Switch) Peers() IPeerSet { - return sw.peers +// Broadcast runs a go routine for each attempted send, which will block +// trying to send for defaultSendTimeoutSeconds. Returns a channel +// which receives broadcast result for each attempted send (success=false if times out). +// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved. +// TODO: Something more intelligent. + +type BroadcastResult struct { + PeerKey string + Success bool +} + +func (sw *Switch) Broadcast(chID byte, msg interface{}) chan BroadcastResult { + successChan := make(chan BroadcastResult, len(sw.peers.List())) + sw.Logger.Debug("Broadcast", "channel", chID, "msg", msg) + for _, peer := range sw.peers.List() { + go func(peer Peer) { + success := peer.Send(chID, msg) + successChan <- BroadcastResult{peer.Key(), success} + }(peer) + } + return successChan +} + +func (sw *Switch) TryBroadcast(chID byte, msg interface{}) chan BroadcastResult { + successChan := make(chan BroadcastResult, len(sw.peers.List())) + sw.Logger.Debug("TryBroadcast", "channel", chID, "msg", msg) + for _, peer := range sw.peers.List() { + success := peer.TrySend(chID, msg) + if success { + successChan <- BroadcastResult{peer.Key(), success} + } else { + go func(peer Peer) { + success := peer.Send(chID, msg) + successChan <- BroadcastResult{peer.Key(), success} + }(peer) + } + } + return successChan } // NumPeers returns the count of outbound/inbound and outbound-dialing peers. @@ -219,21 +254,9 @@ func (sw *Switch) NumPeers() (outbound, inbound, dialing int) { return } -// Broadcast runs a go routine for each attempted send, which will block -// trying to send for defaultSendTimeoutSeconds. Returns a channel -// which receives success values for each attempted send (false if times out). -// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved. -// TODO: Something more intelligent. -func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool { - successChan := make(chan bool, len(sw.peers.List())) - sw.Logger.Debug("Broadcast", "channel", chID, "msg", msg) - for _, peer := range sw.peers.List() { - go func(peer Peer) { - success := peer.Send(chID, msg) - successChan <- success - }(peer) - } - return successChan +// Peers returns the set of peers that are connected to the switch. +func (sw *Switch) Peers() IPeerSet { + return sw.peers } // StopPeerForError disconnects from a peer due to external error. diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 75f9640b..3c91e046 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -128,10 +128,12 @@ func TestSwitches(t *testing.T) { ch0Msg := "channel zero" ch1Msg := "channel foo" ch2Msg := "channel bar" + ch3Msg := "channel baz" s1.Broadcast(byte(0x00), ch0Msg) s1.Broadcast(byte(0x01), ch1Msg) s1.Broadcast(byte(0x02), ch2Msg) + s1.TryBroadcast(byte(0x03), ch3Msg) assertMsgReceivedWithTimeout(t, ch0Msg, byte(0x00), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second) assertMsgReceivedWithTimeout(t, ch1Msg, byte(0x01), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second) @@ -324,12 +326,15 @@ func BenchmarkSwitches(b *testing.B) { numSuccess, numFailure := 0, 0 - // Send random message from foo channel to another + // Send random message from foo channel to another with Broadcast for i := 0; i < b.N; i++ { chID := byte(i % 4) - successChan := s1.Broadcast(chID, "test data") - for s := range successChan { - if s { + resultChan := s1.Broadcast(chID, "test data") + for res := range resultChan { + if !s1.peers.Has(res.PeerKey) { + b.Errorf("unexpected peerKey: %s", res.PeerKey) + } + if res.Success { numSuccess++ } else { numFailure++ @@ -337,7 +342,25 @@ func BenchmarkSwitches(b *testing.B) { } } - b.Logf("success: %v, failure: %v", numSuccess, numFailure) + b.Logf("Broadcast: success: %v, failure: %v", numSuccess, numFailure) + + // Send random message from foo channel to another with TryBroadcast + for i := 0; i < b.N; i++ { + chID := byte(i % 4) + resultChan := s1.TryBroadcast(chID, "test data") + for res := range resultChan { + if !s1.peers.Has(res.PeerKey) { + b.Errorf("unexpected peerKey: %s", res.PeerKey) + } + if res.Success { + numSuccess++ + } else { + numFailure++ + } + } + } + + b.Logf("TryBroadcast: success: %v, failure: %v", numSuccess, numFailure) // Allow everything to flush before stopping switches & closing connections. b.StopTimer() From 5af22d6ee63efc507f207a702450afdaec171945 Mon Sep 17 00:00:00 2001 From: zbo14 Date: Fri, 3 Nov 2017 14:22:50 -0400 Subject: [PATCH 02/18] remove SwitchEventNewPeer, SwitchEventDonePeer --- p2p/conn/connection.go | 1 + 1 file changed, 1 insertion(+) diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index d44ad075..0cdb56a3 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -320,6 +320,7 @@ FOR_LOOP: c.Logger.Debug("Send Ping") wire.WriteByte(packetTypePing, c.bufWriter, &n, &err) c.sendMonitor.Update(int(n)) + // c.flush go c.flush() c.Logger.Debug("Starting pong timer") c.pongTimer.Set() From f97ead4f5f189870876bde7495dbdadc72227af9 Mon Sep 17 00:00:00 2001 From: zbo14 Date: Wed, 8 Nov 2017 18:22:51 -0500 Subject: [PATCH 03/18] prep for merge --- p2p/conn/connection.go | 1 + p2p/switch_test.go | 33 +++++---------------------------- 2 files changed, 6 insertions(+), 28 deletions(-) diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 0cdb56a3..c0f7efcd 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -466,6 +466,7 @@ FOR_LOOP: } case packetTypePong: c.Logger.Debug("Receive Pong") + // Should we unset pongTimer if we get other packet? c.pongTimer.Unset() case packetTypeMsg: pkt, n, err := msgPacket{}, int(0), error(nil) diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 3c91e046..75f9640b 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -128,12 +128,10 @@ func TestSwitches(t *testing.T) { ch0Msg := "channel zero" ch1Msg := "channel foo" ch2Msg := "channel bar" - ch3Msg := "channel baz" s1.Broadcast(byte(0x00), ch0Msg) s1.Broadcast(byte(0x01), ch1Msg) s1.Broadcast(byte(0x02), ch2Msg) - s1.TryBroadcast(byte(0x03), ch3Msg) assertMsgReceivedWithTimeout(t, ch0Msg, byte(0x00), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second) assertMsgReceivedWithTimeout(t, ch1Msg, byte(0x01), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second) @@ -326,15 +324,12 @@ func BenchmarkSwitches(b *testing.B) { numSuccess, numFailure := 0, 0 - // Send random message from foo channel to another with Broadcast + // Send random message from foo channel to another for i := 0; i < b.N; i++ { chID := byte(i % 4) - resultChan := s1.Broadcast(chID, "test data") - for res := range resultChan { - if !s1.peers.Has(res.PeerKey) { - b.Errorf("unexpected peerKey: %s", res.PeerKey) - } - if res.Success { + successChan := s1.Broadcast(chID, "test data") + for s := range successChan { + if s { numSuccess++ } else { numFailure++ @@ -342,25 +337,7 @@ func BenchmarkSwitches(b *testing.B) { } } - b.Logf("Broadcast: success: %v, failure: %v", numSuccess, numFailure) - - // Send random message from foo channel to another with TryBroadcast - for i := 0; i < b.N; i++ { - chID := byte(i % 4) - resultChan := s1.TryBroadcast(chID, "test data") - for res := range resultChan { - if !s1.peers.Has(res.PeerKey) { - b.Errorf("unexpected peerKey: %s", res.PeerKey) - } - if res.Success { - numSuccess++ - } else { - numFailure++ - } - } - } - - b.Logf("TryBroadcast: success: %v, failure: %v", numSuccess, numFailure) + b.Logf("success: %v, failure: %v", numSuccess, numFailure) // Allow everything to flush before stopping switches & closing connections. b.StopTimer() From 9b554fb2c43e5e1683658684acb1fafc9bcae0ec Mon Sep 17 00:00:00 2001 From: zbo14 Date: Thu, 9 Nov 2017 01:12:19 -0500 Subject: [PATCH 04/18] switch test modification --- p2p/conn/connection.go | 2 +- p2p/switch_test.go | 10 ++++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index c0f7efcd..04ca4228 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -320,7 +320,7 @@ FOR_LOOP: c.Logger.Debug("Send Ping") wire.WriteByte(packetTypePing, c.bufWriter, &n, &err) c.sendMonitor.Update(int(n)) - // c.flush + // should be c.flush go c.flush() c.Logger.Debug("Starting pong timer") c.pongTimer.Set() diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 75f9640b..9c8b763e 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -128,14 +128,17 @@ func TestSwitches(t *testing.T) { ch0Msg := "channel zero" ch1Msg := "channel foo" ch2Msg := "channel bar" + ch3Msg := "channel baz" s1.Broadcast(byte(0x00), ch0Msg) s1.Broadcast(byte(0x01), ch1Msg) s1.Broadcast(byte(0x02), ch2Msg) + s1.TryBroadcast(byte(0x03), ch3Msg) assertMsgReceivedWithTimeout(t, ch0Msg, byte(0x00), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second) assertMsgReceivedWithTimeout(t, ch1Msg, byte(0x01), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second) assertMsgReceivedWithTimeout(t, ch2Msg, byte(0x02), s2.Reactor("bar").(*TestReactor), 10*time.Millisecond, 5*time.Second) + assertMsgReceivedWithTimeout(t, ch3Msg, byte(0x03), s2.Reactor("bar").(*TestReactor), 10*time.Millisecond, 5*time.Second) } func assertMsgReceivedWithTimeout(t *testing.T, msg string, channel byte, reactor *TestReactor, checkPeriod, timeout time.Duration) { @@ -328,8 +331,11 @@ func BenchmarkSwitches(b *testing.B) { for i := 0; i < b.N; i++ { chID := byte(i % 4) successChan := s1.Broadcast(chID, "test data") - for s := range successChan { - if s { + for res := range successChan { + if !s1.peers.Has(res.PeerKey) { + b.Error("Unexpected peerKey: " + res.PeerKey) + } + if res.Success { numSuccess++ } else { numFailure++ From 91e4f4b7868f5ff7e7ba8449aa9aab8384fff12f Mon Sep 17 00:00:00 2001 From: zbo14 Date: Thu, 9 Nov 2017 14:27:17 -0500 Subject: [PATCH 05/18] ping/pong timeout in config --- p2p/conn/connection.go | 13 +++++++++---- p2p/conn/connection_test.go | 7 +++++-- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 04ca4228..b49a45db 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -22,8 +22,6 @@ const ( minReadBufferSize = 1024 minWriteBufferSize = 65536 updateStats = 2 * time.Second - pingTimeout = 40 * time.Second - pongTimeout = 60 * time.Second // some of these defaults are written in the user config // flushThrottle, sendRate, recvRate @@ -36,6 +34,8 @@ const ( defaultSendRate = int64(512000) // 500KB/s defaultRecvRate = int64(512000) // 500KB/s defaultSendTimeout = 10 * time.Second + defaultPingTimeout = 40 * time.Second + defaultPongTimeout = 60 * time.Second ) type receiveCbFunc func(chID byte, msgBytes []byte) @@ -100,6 +100,9 @@ type MConnConfig struct { MaxMsgPacketPayloadSize int FlushThrottle time.Duration + + pingTimeout time.Duration + pongTimeout time.Duration } func (cfg *MConnConfig) maxMsgPacketTotalSize() int { @@ -113,6 +116,8 @@ func DefaultMConnConfig() *MConnConfig { RecvRate: defaultRecvRate, MaxMsgPacketPayloadSize: defaultMaxMsgPacketPayloadSize, FlushThrottle: defaultFlushThrottle, + pingTimeout: defaultPingTimeout, + pongTimeout: defaultPongTimeout, } } @@ -172,8 +177,8 @@ func (c *MConnection) OnStart() error { } c.quit = make(chan struct{}) c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle) - c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout) - c.pongTimer = cmn.NewThrottleTimer("pong", pongTimeout) + c.pingTimer = cmn.NewRepeatTimer("ping", c.config.pingTimeout) + c.pongTimer = cmn.NewThrottleTimer("pong", c.config.pongTimeout) c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateStats) go c.sendRoutine() go c.recvRoutine() diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 3bac0bd6..5686af6a 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -23,7 +23,10 @@ func createTestMConnection(conn net.Conn) *MConnection { func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *MConnection { chDescs := []*ChannelDescriptor{&ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}} - c := NewMConnection(conn, chDescs, onReceive, onError) + cfg := DefaultMConnConfig() + cfg.pingTimeout = 40 * time.Millisecond + cfg.pongTimeout = 60 * time.Millisecond + c := NewMConnectionWithConfig(conn, chDescs, onReceive, onError, cfg) c.SetLogger(log.TestingLogger()) return c } @@ -142,7 +145,7 @@ func TestPingPongTimeout(t *testing.T) { case err := <-errorsCh: assert.NotNil(err) assert.False(mconn.IsRunning()) - case <-time.After(500*time.Millisecond + 100*time.Second): + case <-time.After(10*time.Millisecond + mconn.config.pingTimeout + mconn.config.pongTimeout): t.Fatal("Did not receive error in ~(pingTimeout + pongTimeout) seconds") } } From b28b76ddf74280c3ebacfcf003d931977478b184 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 10 Jan 2018 20:06:17 -0600 Subject: [PATCH 06/18] rename pingTimeout to pingInterval, pongTimer is now time.Timer --- p2p/conn/connection.go | 38 ++++++++++++++++++++++--------------- p2p/conn/connection_test.go | 24 +++++++++++------------ 2 files changed, 34 insertions(+), 28 deletions(-) diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index b49a45db..23c8edc9 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -34,8 +34,8 @@ const ( defaultSendRate = int64(512000) // 500KB/s defaultRecvRate = int64(512000) // 500KB/s defaultSendTimeout = 10 * time.Second - defaultPingTimeout = 40 * time.Second - defaultPongTimeout = 60 * time.Second + defaultPingInterval = 40 * time.Second + defaultPongTimeout = 35 * time.Second ) type receiveCbFunc func(chID byte, msgBytes []byte) @@ -86,7 +86,7 @@ type MConnection struct { quit chan struct{} flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled. pingTimer *cmn.RepeatTimer // send pings periodically - pongTimer *cmn.ThrottleTimer // close conn if pong not recv in 1 min + pongTimer *time.Timer // close conn if pong is not received in pongTimeout chStatsTimer *cmn.RepeatTimer // update channel stats periodically created time.Time // time of creation @@ -101,8 +101,8 @@ type MConnConfig struct { FlushThrottle time.Duration - pingTimeout time.Duration - pongTimeout time.Duration + pingInterval time.Duration + pongTimeout time.Duration } func (cfg *MConnConfig) maxMsgPacketTotalSize() int { @@ -116,7 +116,7 @@ func DefaultMConnConfig() *MConnConfig { RecvRate: defaultRecvRate, MaxMsgPacketPayloadSize: defaultMaxMsgPacketPayloadSize, FlushThrottle: defaultFlushThrottle, - pingTimeout: defaultPingTimeout, + pingInterval: defaultPingInterval, pongTimeout: defaultPongTimeout, } } @@ -133,6 +133,10 @@ func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive recei // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection { + if config.pongTimeout >= config.pingInterval { + panic("pongTimeout must be less than pingInterval") + } + mconn := &MConnection{ conn: conn, bufReader: bufio.NewReaderSize(conn, minReadBufferSize), @@ -176,9 +180,12 @@ func (c *MConnection) OnStart() error { return err } c.quit = make(chan struct{}) - c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle) - c.pingTimer = cmn.NewRepeatTimer("ping", c.config.pingTimeout) - c.pongTimer = cmn.NewThrottleTimer("pong", c.config.pongTimeout) + c.flushTimer = cmn.NewThrottleTimer("flush", c.config.flushThrottle) + c.pingTimer = cmn.NewRepeatTimer("ping", c.config.pingInterval) + c.pongTimer = time.NewTimer(c.config.pongTimeout) + // we start timer once we've send ping; needed here because we use start + // listening in recvRoutine + _ = c.pongTimer.Stop() c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateStats) go c.sendRoutine() go c.recvRoutine() @@ -190,7 +197,7 @@ func (c *MConnection) OnStop() { c.BaseService.OnStop() c.flushTimer.Stop() c.pingTimer.Stop() - c.pongTimer.Stop() + _ = c.pongTimer.Stop() c.chStatsTimer.Stop() if c.quit != nil { close(c.quit) @@ -325,12 +332,12 @@ FOR_LOOP: c.Logger.Debug("Send Ping") wire.WriteByte(packetTypePing, c.bufWriter, &n, &err) c.sendMonitor.Update(int(n)) - // should be c.flush go c.flush() c.Logger.Debug("Starting pong timer") - c.pongTimer.Set() - case <-c.pongTimer.Ch: + c.pongTimer.Reset(c.config.pongTimeout) + case <-c.pongTimer.C: c.Logger.Debug("Pong timeout") + // XXX: should we decrease peer score instead of closing connection? err = errors.New("pong timeout") case <-c.pong: c.Logger.Debug("Send Pong") @@ -471,8 +478,9 @@ FOR_LOOP: } case packetTypePong: c.Logger.Debug("Receive Pong") - // Should we unset pongTimer if we get other packet? - c.pongTimer.Unset() + if !c.pongTimer.Stop() { + <-c.pongTimer.C + } case packetTypeMsg: pkt, n, err := msgPacket{}, int(0), error(nil) wire.ReadBinaryPtr(&pkt, c.bufReader, c.config.maxMsgPacketTotalSize(), &n, &err) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 5686af6a..5570331e 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -24,8 +24,8 @@ func createTestMConnection(conn net.Conn) *MConnection { func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *MConnection { chDescs := []*ChannelDescriptor{&ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}} cfg := DefaultMConnConfig() - cfg.pingTimeout = 40 * time.Millisecond - cfg.pongTimeout = 60 * time.Millisecond + cfg.pingInterval = 40 * time.Millisecond + cfg.pongTimeout = 35 * time.Millisecond c := NewMConnectionWithConfig(conn, chDescs, onReceive, onError, cfg) c.SetLogger(log.TestingLogger()) return c @@ -119,9 +119,7 @@ func TestMConnectionStatus(t *testing.T) { assert.Zero(status.Channels[0].SendQueueSize) } -func TestPingPongTimeout(t *testing.T) { - assert, require := assert.New(t), require.New(t) - +func TestPongTimeoutResultsInError(t *testing.T) { server, client := net.Pipe() defer server.Close() defer client.Close() @@ -135,18 +133,18 @@ func TestPingPongTimeout(t *testing.T) { errorsCh <- r } mconn := createMConnectionWithCallbacks(client, onReceive, onError) - _, err := mconn.Start() - require.Nil(err) + err := mconn.Start() + require.Nil(t, err) defer mconn.Stop() + expectErrorAfter := 10*time.Millisecond + mconn.config.pingInterval + mconn.config.pongTimeout select { - case receivedBytes := <-receivedCh: - t.Fatalf("Expected error, got %v", receivedBytes) + case msgBytes := <-receivedCh: + t.Fatalf("Expected error, but got %v", msgBytes) case err := <-errorsCh: - assert.NotNil(err) - assert.False(mconn.IsRunning()) - case <-time.After(10*time.Millisecond + mconn.config.pingTimeout + mconn.config.pongTimeout): - t.Fatal("Did not receive error in ~(pingTimeout + pongTimeout) seconds") + assert.NotNil(t, err) + case <-time.After(expectErrorAfter): + t.Fatalf("Expected to receive error after %v", expectErrorAfter) } } From 5834a598160928eefb50530ed408656b6ab98948 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 10 Jan 2018 20:24:31 -0600 Subject: [PATCH 07/18] read ping --- p2p/conn/connection.go | 2 +- p2p/conn/connection_test.go | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 23c8edc9..7c7cee34 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -332,7 +332,7 @@ FOR_LOOP: c.Logger.Debug("Send Ping") wire.WriteByte(packetTypePing, c.bufWriter, &n, &err) c.sendMonitor.Update(int(n)) - go c.flush() + c.flush() c.Logger.Debug("Starting pong timer") c.pongTimer.Reset(c.config.pongTimeout) case <-c.pongTimer.C: diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 5570331e..86364211 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -137,6 +137,11 @@ func TestPongTimeoutResultsInError(t *testing.T) { require.Nil(t, err) defer mconn.Stop() + go func() { + // read ping + server.Read(make([]byte, 1)) + }() + expectErrorAfter := 10*time.Millisecond + mconn.config.pingInterval + mconn.config.pongTimeout select { case msgBytes := <-receivedCh: From 4e2000abfe50b929a6a62f8131e7d509e66d3aa3 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 10 Jan 2018 20:24:42 -0600 Subject: [PATCH 08/18] control order by sending msgs from one goroutine --- p2p/conn/connection_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 86364211..23b7078e 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -339,8 +339,6 @@ func TestMConnectionTrySend(t *testing.T) { go func() { mconn.TrySend(0x01, msg) resultCh <- "TrySend" - }() - go func() { mconn.Send(0x01, msg) resultCh <- "Send" }() From 860da464df1421d608f37b68bef4f182dd34bf6b Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 11 Jan 2018 15:32:25 -0600 Subject: [PATCH 09/18] remove weird concurrency testing --- p2p/conn/connection_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 23b7078e..8ab865bf 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -339,12 +339,8 @@ func TestMConnectionTrySend(t *testing.T) { go func() { mconn.TrySend(0x01, msg) resultCh <- "TrySend" - mconn.Send(0x01, msg) - resultCh <- "Send" }() assert.False(mconn.CanSend(0x01)) assert.False(mconn.TrySend(0x01, msg)) assert.Equal("TrySend", <-resultCh) - server.Read(make([]byte, len(msg))) - assert.Equal("Send", <-resultCh) // Order constrained by parallel blocking above } From d14d4a252767c227bc57381d30bb7824bf18ac6e Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 11 Jan 2018 17:15:04 -0600 Subject: [PATCH 10/18] remove TryBroadcast --- p2p/switch.go | 31 ++++--------------------------- p2p/switch_test.go | 10 ++-------- 2 files changed, 6 insertions(+), 35 deletions(-) diff --git a/p2p/switch.go b/p2p/switch.go index 4a2c3480..7c09212b 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -202,44 +202,21 @@ func (sw *Switch) OnStop() { // Broadcast runs a go routine for each attempted send, which will block // trying to send for defaultSendTimeoutSeconds. Returns a channel -// which receives broadcast result for each attempted send (success=false if times out). +// which receives success values for each attempted send (false if times out). // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved. // TODO: Something more intelligent. - -type BroadcastResult struct { - PeerKey string - Success bool -} - -func (sw *Switch) Broadcast(chID byte, msg interface{}) chan BroadcastResult { - successChan := make(chan BroadcastResult, len(sw.peers.List())) +func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool { + successChan := make(chan bool, len(sw.peers.List())) sw.Logger.Debug("Broadcast", "channel", chID, "msg", msg) for _, peer := range sw.peers.List() { go func(peer Peer) { success := peer.Send(chID, msg) - successChan <- BroadcastResult{peer.Key(), success} + successChan <- success }(peer) } return successChan } -func (sw *Switch) TryBroadcast(chID byte, msg interface{}) chan BroadcastResult { - successChan := make(chan BroadcastResult, len(sw.peers.List())) - sw.Logger.Debug("TryBroadcast", "channel", chID, "msg", msg) - for _, peer := range sw.peers.List() { - success := peer.TrySend(chID, msg) - if success { - successChan <- BroadcastResult{peer.Key(), success} - } else { - go func(peer Peer) { - success := peer.Send(chID, msg) - successChan <- BroadcastResult{peer.Key(), success} - }(peer) - } - } - return successChan -} - // NumPeers returns the count of outbound/inbound and outbound-dialing peers. func (sw *Switch) NumPeers() (outbound, inbound, dialing int) { peers := sw.peers.List() diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 9c8b763e..75f9640b 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -128,17 +128,14 @@ func TestSwitches(t *testing.T) { ch0Msg := "channel zero" ch1Msg := "channel foo" ch2Msg := "channel bar" - ch3Msg := "channel baz" s1.Broadcast(byte(0x00), ch0Msg) s1.Broadcast(byte(0x01), ch1Msg) s1.Broadcast(byte(0x02), ch2Msg) - s1.TryBroadcast(byte(0x03), ch3Msg) assertMsgReceivedWithTimeout(t, ch0Msg, byte(0x00), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second) assertMsgReceivedWithTimeout(t, ch1Msg, byte(0x01), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second) assertMsgReceivedWithTimeout(t, ch2Msg, byte(0x02), s2.Reactor("bar").(*TestReactor), 10*time.Millisecond, 5*time.Second) - assertMsgReceivedWithTimeout(t, ch3Msg, byte(0x03), s2.Reactor("bar").(*TestReactor), 10*time.Millisecond, 5*time.Second) } func assertMsgReceivedWithTimeout(t *testing.T, msg string, channel byte, reactor *TestReactor, checkPeriod, timeout time.Duration) { @@ -331,11 +328,8 @@ func BenchmarkSwitches(b *testing.B) { for i := 0; i < b.N; i++ { chID := byte(i % 4) successChan := s1.Broadcast(chID, "test data") - for res := range successChan { - if !s1.peers.Has(res.PeerKey) { - b.Error("Unexpected peerKey: " + res.PeerKey) - } - if res.Success { + for s := range successChan { + if s { numSuccess++ } else { numFailure++ From 3ae738f4533f5eb541d480fb678a5c556b5908ba Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 11 Jan 2018 18:06:33 -0600 Subject: [PATCH 11/18] increase timeouts --- p2p/conn/connection.go | 6 +++--- p2p/conn/connection_test.go | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 7c7cee34..25aac301 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -34,8 +34,8 @@ const ( defaultSendRate = int64(512000) // 500KB/s defaultRecvRate = int64(512000) // 500KB/s defaultSendTimeout = 10 * time.Second - defaultPingInterval = 40 * time.Second - defaultPongTimeout = 35 * time.Second + defaultPingInterval = 60 * time.Second + defaultPongTimeout = 45 * time.Second ) type receiveCbFunc func(chID byte, msgBytes []byte) @@ -134,7 +134,7 @@ func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive recei // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection { if config.pongTimeout >= config.pingInterval { - panic("pongTimeout must be less than pingInterval") + panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)") } mconn := &MConnection{ diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 8ab865bf..4fb8d341 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -24,8 +24,8 @@ func createTestMConnection(conn net.Conn) *MConnection { func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *MConnection { chDescs := []*ChannelDescriptor{&ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}} cfg := DefaultMConnConfig() - cfg.pingInterval = 40 * time.Millisecond - cfg.pongTimeout = 35 * time.Millisecond + cfg.pingInterval = 60 * time.Millisecond + cfg.pongTimeout = 45 * time.Millisecond c := NewMConnectionWithConfig(conn, chDescs, onReceive, onError, cfg) c.SetLogger(log.TestingLogger()) return c @@ -119,7 +119,7 @@ func TestMConnectionStatus(t *testing.T) { assert.Zero(status.Channels[0].SendQueueSize) } -func TestPongTimeoutResultsInError(t *testing.T) { +func TestMConnectionPongTimeoutResultsInError(t *testing.T) { server, client := net.Pipe() defer server.Close() defer client.Close() @@ -142,7 +142,7 @@ func TestPongTimeoutResultsInError(t *testing.T) { server.Read(make([]byte, 1)) }() - expectErrorAfter := 10*time.Millisecond + mconn.config.pingInterval + mconn.config.pongTimeout + expectErrorAfter := (mconn.config.pingInterval + mconn.config.pongTimeout) * 2 select { case msgBytes := <-receivedCh: t.Fatalf("Expected error, but got %v", msgBytes) From 161e100a24ef54cdbd630a241086400d3f8c7261 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 11 Jan 2018 18:24:28 -0600 Subject: [PATCH 12/18] close return channel when we're done Benchmark results: ``` BenchmarkSwitchBroadcast-2 30000 71275 ns/op --- BENCH: BenchmarkSwitchBroadcast-2 switch_test.go:339: success: 1, failure: 0 switch_test.go:339: success: 100, failure: 0 switch_test.go:339: success: 10000, failure: 0 switch_test.go:339: success: 30000, failure: 0 ``` --- p2p/conn/secret_connection_test.go | 2 +- p2p/switch.go | 17 +++++++++++++---- p2p/switch_test.go | 12 ++++-------- 3 files changed, 18 insertions(+), 13 deletions(-) diff --git a/p2p/conn/secret_connection_test.go b/p2p/conn/secret_connection_test.go index 8af9cdeb..4cf715dd 100644 --- a/p2p/conn/secret_connection_test.go +++ b/p2p/conn/secret_connection_test.go @@ -4,7 +4,7 @@ import ( "io" "testing" - "github.com/tendermint/go-crypto" + crypto "github.com/tendermint/go-crypto" cmn "github.com/tendermint/tmlibs/common" ) diff --git a/p2p/switch.go b/p2p/switch.go index 7c09212b..f1d02dcf 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -5,6 +5,7 @@ import ( "math" "math/rand" "net" + "sync" "time" "github.com/pkg/errors" @@ -200,20 +201,28 @@ func (sw *Switch) OnStop() { //--------------------------------------------------------------------- // Peers -// Broadcast runs a go routine for each attempted send, which will block -// trying to send for defaultSendTimeoutSeconds. Returns a channel -// which receives success values for each attempted send (false if times out). +// Broadcast runs a go routine for each attempted send, which will block trying +// to send for defaultSendTimeoutSeconds. Returns a channel which receives +// success values for each attempted send (false if times out). Channel will be +// closed once msg send to all peers. +// // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved. -// TODO: Something more intelligent. func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool { successChan := make(chan bool, len(sw.peers.List())) sw.Logger.Debug("Broadcast", "channel", chID, "msg", msg) + var wg sync.WaitGroup for _, peer := range sw.peers.List() { + wg.Add(1) go func(peer Peer) { + defer wg.Done() success := peer.Send(chID, msg) successChan <- success }(peer) } + go func() { + wg.Wait() + close(successChan) + }() return successChan } diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 75f9640b..be1d96e9 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -300,10 +300,8 @@ func TestSwitchFullConnectivity(t *testing.T) { } } -func BenchmarkSwitches(b *testing.B) { - b.StopTimer() - - s1, s2 := MakeSwitchPair(b, func(i int, sw *Switch) *Switch { +func BenchmarkSwitchBroadcast(b *testing.B) { + s1, s2 := makeSwitchPair(b, func(i int, sw *Switch) *Switch { // Make bar reactors of bar channels each sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{ {ID: byte(0x00), Priority: 10}, @@ -320,7 +318,8 @@ func BenchmarkSwitches(b *testing.B) { // Allow time for goroutines to boot up time.Sleep(1 * time.Second) - b.StartTimer() + + b.ResetTimer() numSuccess, numFailure := 0, 0 @@ -338,7 +337,4 @@ func BenchmarkSwitches(b *testing.B) { } b.Logf("success: %v, failure: %v", numSuccess, numFailure) - - // Allow everything to flush before stopping switches & closing connections. - b.StopTimer() } From 747b73cb95dab52ee1076ce83dcc92dca86ef93a Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 23 Jan 2018 13:11:44 +0400 Subject: [PATCH 13/18] fix merge conflicts --- p2p/conn/connection.go | 27 ++++++++++++++++----------- p2p/conn/connection_test.go | 6 +++--- p2p/switch_test.go | 2 +- 3 files changed, 20 insertions(+), 15 deletions(-) diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 25aac301..71d8608f 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -97,12 +97,17 @@ type MConnConfig struct { SendRate int64 `mapstructure:"send_rate"` RecvRate int64 `mapstructure:"recv_rate"` - MaxMsgPacketPayloadSize int + // Maximum payload size + MaxMsgPacketPayloadSize int `mapstructure:"max_msg_packet_payload_size"` - FlushThrottle time.Duration + // Interval to flush writes (throttled) + FlushThrottle time.Duration `mapstructure:"flush_throttle"` - pingInterval time.Duration - pongTimeout time.Duration + // Interval to send pings + PingInterval time.Duration `mapstructure:"ping_interval"` + + // Maximum wait time for pongs + PongTimeout time.Duration `mapstructure:"pong_timeout"` } func (cfg *MConnConfig) maxMsgPacketTotalSize() int { @@ -116,8 +121,8 @@ func DefaultMConnConfig() *MConnConfig { RecvRate: defaultRecvRate, MaxMsgPacketPayloadSize: defaultMaxMsgPacketPayloadSize, FlushThrottle: defaultFlushThrottle, - pingInterval: defaultPingInterval, - pongTimeout: defaultPongTimeout, + PingInterval: defaultPingInterval, + PongTimeout: defaultPongTimeout, } } @@ -133,7 +138,7 @@ func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive recei // NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection { - if config.pongTimeout >= config.pingInterval { + if config.PongTimeout >= config.PingInterval { panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)") } @@ -180,9 +185,9 @@ func (c *MConnection) OnStart() error { return err } c.quit = make(chan struct{}) - c.flushTimer = cmn.NewThrottleTimer("flush", c.config.flushThrottle) - c.pingTimer = cmn.NewRepeatTimer("ping", c.config.pingInterval) - c.pongTimer = time.NewTimer(c.config.pongTimeout) + c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle) + c.pingTimer = cmn.NewRepeatTimer("ping", c.config.PingInterval) + c.pongTimer = time.NewTimer(c.config.PongTimeout) // we start timer once we've send ping; needed here because we use start // listening in recvRoutine _ = c.pongTimer.Stop() @@ -334,7 +339,7 @@ FOR_LOOP: c.sendMonitor.Update(int(n)) c.flush() c.Logger.Debug("Starting pong timer") - c.pongTimer.Reset(c.config.pongTimeout) + c.pongTimer.Reset(c.config.PongTimeout) case <-c.pongTimer.C: c.Logger.Debug("Pong timeout") // XXX: should we decrease peer score instead of closing connection? diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 4fb8d341..d505805e 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -24,8 +24,8 @@ func createTestMConnection(conn net.Conn) *MConnection { func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *MConnection { chDescs := []*ChannelDescriptor{&ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}} cfg := DefaultMConnConfig() - cfg.pingInterval = 60 * time.Millisecond - cfg.pongTimeout = 45 * time.Millisecond + cfg.PingInterval = 60 * time.Millisecond + cfg.PongTimeout = 45 * time.Millisecond c := NewMConnectionWithConfig(conn, chDescs, onReceive, onError, cfg) c.SetLogger(log.TestingLogger()) return c @@ -142,7 +142,7 @@ func TestMConnectionPongTimeoutResultsInError(t *testing.T) { server.Read(make([]byte, 1)) }() - expectErrorAfter := (mconn.config.pingInterval + mconn.config.pongTimeout) * 2 + expectErrorAfter := (mconn.config.PingInterval + mconn.config.PongTimeout) * 2 select { case msgBytes := <-receivedCh: t.Fatalf("Expected error, but got %v", msgBytes) diff --git a/p2p/switch_test.go b/p2p/switch_test.go index be1d96e9..745eb44e 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -301,7 +301,7 @@ func TestSwitchFullConnectivity(t *testing.T) { } func BenchmarkSwitchBroadcast(b *testing.B) { - s1, s2 := makeSwitchPair(b, func(i int, sw *Switch) *Switch { + s1, s2 := MakeSwitchPair(b, func(i int, sw *Switch) *Switch { // Make bar reactors of bar channels each sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{ {ID: byte(0x00), Priority: 10}, From f4ff66de30232bb5c3f70d390843bbcab5543e8e Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 24 Jan 2018 14:41:31 +0400 Subject: [PATCH 14/18] rewrite pong timer to use time.AfterFunc --- p2p/conn/connection.go | 42 +++++++------- p2p/conn/connection_test.go | 107 ++++++++++++++++++++++++++++++++++-- 2 files changed, 126 insertions(+), 23 deletions(-) diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 71d8608f..4e461b35 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -83,11 +83,15 @@ type MConnection struct { errored uint32 config *MConnConfig - quit chan struct{} - flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled. - pingTimer *cmn.RepeatTimer // send pings periodically - pongTimer *time.Timer // close conn if pong is not received in pongTimeout - chStatsTimer *cmn.RepeatTimer // update channel stats periodically + quit chan struct{} + flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled. + pingTimer *cmn.RepeatTimer // send pings periodically + + // close conn if pong is not received in pongTimeout + pongTimer *time.Timer + pongTimeoutCh chan struct{} + + chStatsTimer *cmn.RepeatTimer // update channel stats periodically created time.Time // time of creation } @@ -187,10 +191,7 @@ func (c *MConnection) OnStart() error { c.quit = make(chan struct{}) c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle) c.pingTimer = cmn.NewRepeatTimer("ping", c.config.PingInterval) - c.pongTimer = time.NewTimer(c.config.PongTimeout) - // we start timer once we've send ping; needed here because we use start - // listening in recvRoutine - _ = c.pongTimer.Stop() + c.pongTimeoutCh = make(chan struct{}) c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateStats) go c.sendRoutine() go c.recvRoutine() @@ -200,13 +201,12 @@ func (c *MConnection) OnStart() error { // OnStop implements BaseService func (c *MConnection) OnStop() { c.BaseService.OnStop() - c.flushTimer.Stop() - c.pingTimer.Stop() - _ = c.pongTimer.Stop() - c.chStatsTimer.Stop() if c.quit != nil { close(c.quit) } + c.flushTimer.Stop() + c.pingTimer.Stop() + c.chStatsTimer.Stop() c.conn.Close() // nolint: errcheck // We can't close pong safely here because @@ -337,12 +337,13 @@ FOR_LOOP: c.Logger.Debug("Send Ping") wire.WriteByte(packetTypePing, c.bufWriter, &n, &err) c.sendMonitor.Update(int(n)) + c.Logger.Debug("Starting pong timer", "dur", c.config.PongTimeout) + c.pongTimer = time.AfterFunc(c.config.PongTimeout, func() { + c.pongTimeoutCh <- struct{}{} + }) c.flush() - c.Logger.Debug("Starting pong timer") - c.pongTimer.Reset(c.config.PongTimeout) - case <-c.pongTimer.C: + case <-c.pongTimeoutCh: c.Logger.Debug("Pong timeout") - // XXX: should we decrease peer score instead of closing connection? err = errors.New("pong timeout") case <-c.pong: c.Logger.Debug("Send Pong") @@ -350,6 +351,9 @@ FOR_LOOP: c.sendMonitor.Update(int(n)) c.flush() case <-c.quit: + if c.pongTimer != nil { + _ = c.pongTimer.Stop() + } break FOR_LOOP case <-c.send: // Send some msgPackets @@ -483,8 +487,8 @@ FOR_LOOP: } case packetTypePong: c.Logger.Debug("Receive Pong") - if !c.pongTimer.Stop() { - <-c.pongTimer.C + if c.pongTimer != nil { + _ = c.pongTimer.Stop() } case packetTypeMsg: pkt, n, err := msgPacket{}, int(0), error(nil) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index d505805e..acfa8032 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -24,7 +24,7 @@ func createTestMConnection(conn net.Conn) *MConnection { func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *MConnection { chDescs := []*ChannelDescriptor{&ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}} cfg := DefaultMConnConfig() - cfg.PingInterval = 60 * time.Millisecond + cfg.PingInterval = 90 * time.Millisecond cfg.PongTimeout = 45 * time.Millisecond c := NewMConnectionWithConfig(conn, chDescs, onReceive, onError, cfg) c.SetLogger(log.TestingLogger()) @@ -137,19 +137,118 @@ func TestMConnectionPongTimeoutResultsInError(t *testing.T) { require.Nil(t, err) defer mconn.Stop() + serverGotPing := make(chan struct{}) go func() { // read ping server.Read(make([]byte, 1)) + serverGotPing <- struct{}{} }() + <-serverGotPing - expectErrorAfter := (mconn.config.PingInterval + mconn.config.PongTimeout) * 2 + pongTimerExpired := mconn.config.PongTimeout + 10*time.Millisecond select { case msgBytes := <-receivedCh: t.Fatalf("Expected error, but got %v", msgBytes) case err := <-errorsCh: assert.NotNil(t, err) - case <-time.After(expectErrorAfter): - t.Fatalf("Expected to receive error after %v", expectErrorAfter) + case <-time.After(pongTimerExpired): + t.Fatalf("Expected to receive error after %v", pongTimerExpired) + } +} + +func TestMConnectionMultiplePongsInTheBeginning(t *testing.T) { + server, client := net.Pipe() + defer server.Close() + defer client.Close() + + receivedCh := make(chan []byte) + errorsCh := make(chan interface{}) + onReceive := func(chID byte, msgBytes []byte) { + receivedCh <- msgBytes + } + onError := func(r interface{}) { + errorsCh <- r + } + mconn := createMConnectionWithCallbacks(client, onReceive, onError) + err := mconn.Start() + require.Nil(t, err) + defer mconn.Stop() + + // sending 3 pongs in a row + _, err = server.Write([]byte{packetTypePong}) + require.Nil(t, err) + _, err = server.Write([]byte{packetTypePong}) + require.Nil(t, err) + _, err = server.Write([]byte{packetTypePong}) + require.Nil(t, err) + + serverGotPing := make(chan struct{}) + go func() { + // read ping + server.Read(make([]byte, 1)) + serverGotPing <- struct{}{} + // respond with pong + _, err = server.Write([]byte{packetTypePong}) + require.Nil(t, err) + }() + <-serverGotPing + + pongTimerExpired := mconn.config.PongTimeout + 10*time.Millisecond + select { + case msgBytes := <-receivedCh: + t.Fatalf("Expected no data, but got %v", msgBytes) + case err := <-errorsCh: + t.Fatalf("Expected no error, but got %v", err) + case <-time.After(pongTimerExpired): + assert.True(t, mconn.IsRunning()) + } +} + +func TestMConnectionPingPongs(t *testing.T) { + server, client := net.Pipe() + defer server.Close() + defer client.Close() + + receivedCh := make(chan []byte) + errorsCh := make(chan interface{}) + onReceive := func(chID byte, msgBytes []byte) { + receivedCh <- msgBytes + } + onError := func(r interface{}) { + errorsCh <- r + } + mconn := createMConnectionWithCallbacks(client, onReceive, onError) + err := mconn.Start() + require.Nil(t, err) + defer mconn.Stop() + + serverGotPing := make(chan struct{}) + go func() { + // read ping + server.Read(make([]byte, 1)) + serverGotPing <- struct{}{} + // respond with pong + _, err = server.Write([]byte{packetTypePong}) + require.Nil(t, err) + + time.Sleep(mconn.config.PingInterval) + + // read ping + server.Read(make([]byte, 1)) + // respond with pong + _, err = server.Write([]byte{packetTypePong}) + require.Nil(t, err) + }() + <-serverGotPing + + pongTimerExpired := (mconn.config.PongTimeout + 10*time.Millisecond) * 2 + select { + case msgBytes := <-receivedCh: + t.Fatalf("Expected no data, but got %v", msgBytes) + case err := <-errorsCh: + t.Fatalf("Expected no error, but got %v", err) + case <-time.After(2 * pongTimerExpired): + assert.True(t, mconn.IsRunning()) } } From ac0123d249c0220fadc8359d7da8b4f8d0de360f Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Mon, 5 Feb 2018 23:12:09 +0400 Subject: [PATCH 15/18] drain pongTimeoutCh and pongTimer's channel to prevent leaks --- p2p/conn/connection.go | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 4e461b35..b02e1ccb 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -352,7 +352,10 @@ FOR_LOOP: c.flush() case <-c.quit: if c.pongTimer != nil { - _ = c.pongTimer.Stop() + if !c.pongTimer.Stop() { + <-c.pongTimer.C + } + drain(c.pongTimeoutCh) } break FOR_LOOP case <-c.send: @@ -488,7 +491,10 @@ FOR_LOOP: case packetTypePong: c.Logger.Debug("Receive Pong") if c.pongTimer != nil { - _ = c.pongTimer.Stop() + if !c.pongTimer.Stop() { + <-c.pongTimer.C + } + drain(c.pongTimeoutCh) } case packetTypeMsg: pkt, n, err := msgPacket{}, int(0), error(nil) @@ -764,3 +770,13 @@ type msgPacket struct { func (p msgPacket) String() string { return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF) } + +func drain(ch <-chan struct{}) { + for { + select { + case <-ch: + default: + return + } + } +} From 26419fba28fd91bcf5dc90a4ea3135d7b600e4ca Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 6 Feb 2018 12:31:34 +0400 Subject: [PATCH 16/18] refactor code plus add one more test * extract stopPongTimer method * TestMConnectionMultiplePings --- p2p/conn/connection.go | 24 +++++++++---------- p2p/conn/connection_test.go | 48 ++++++++++++++++++++++++++++++++----- 2 files changed, 54 insertions(+), 18 deletions(-) diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index b02e1ccb..6fbb425e 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -351,12 +351,7 @@ FOR_LOOP: c.sendMonitor.Update(int(n)) c.flush() case <-c.quit: - if c.pongTimer != nil { - if !c.pongTimer.Stop() { - <-c.pongTimer.C - } - drain(c.pongTimeoutCh) - } + c.stopPongTimer() break FOR_LOOP case <-c.send: // Send some msgPackets @@ -482,6 +477,7 @@ FOR_LOOP: switch pktType { case packetTypePing: // TODO: prevent abuse, as they cause flush()'s. + // https://github.com/tendermint/tendermint/issues/1190 c.Logger.Debug("Receive Ping") select { case c.pong <- struct{}{}: @@ -490,12 +486,7 @@ FOR_LOOP: } case packetTypePong: c.Logger.Debug("Receive Pong") - if c.pongTimer != nil { - if !c.pongTimer.Stop() { - <-c.pongTimer.C - } - drain(c.pongTimeoutCh) - } + c.stopPongTimer() case packetTypeMsg: pkt, n, err := msgPacket{}, int(0), error(nil) wire.ReadBinaryPtr(&pkt, c.bufReader, c.config.maxMsgPacketTotalSize(), &n, &err) @@ -543,6 +534,15 @@ FOR_LOOP: } } +func (c *MConnection) stopPongTimer() { + if c.pongTimer != nil { + if !c.pongTimer.Stop() { + <-c.pongTimer.C + } + drain(c.pongTimeoutCh) + } +} + type ConnectionStatus struct { Duration time.Duration SendMonitor flow.Status diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index acfa8032..270b4ae9 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -145,7 +145,7 @@ func TestMConnectionPongTimeoutResultsInError(t *testing.T) { }() <-serverGotPing - pongTimerExpired := mconn.config.PongTimeout + 10*time.Millisecond + pongTimerExpired := mconn.config.PongTimeout + 20*time.Millisecond select { case msgBytes := <-receivedCh: t.Fatalf("Expected error, but got %v", msgBytes) @@ -174,7 +174,7 @@ func TestMConnectionMultiplePongsInTheBeginning(t *testing.T) { require.Nil(t, err) defer mconn.Stop() - // sending 3 pongs in a row + // sending 3 pongs in a row (abuse) _, err = server.Write([]byte{packetTypePong}) require.Nil(t, err) _, err = server.Write([]byte{packetTypePong}) @@ -184,8 +184,9 @@ func TestMConnectionMultiplePongsInTheBeginning(t *testing.T) { serverGotPing := make(chan struct{}) go func() { - // read ping - server.Read(make([]byte, 1)) + // read ping (one byte) + _, err = server.Read(make([]byte, 1)) + require.Nil(t, err) serverGotPing <- struct{}{} // respond with pong _, err = server.Write([]byte{packetTypePong}) @@ -193,7 +194,7 @@ func TestMConnectionMultiplePongsInTheBeginning(t *testing.T) { }() <-serverGotPing - pongTimerExpired := mconn.config.PongTimeout + 10*time.Millisecond + pongTimerExpired := mconn.config.PongTimeout + 20*time.Millisecond select { case msgBytes := <-receivedCh: t.Fatalf("Expected no data, but got %v", msgBytes) @@ -204,6 +205,41 @@ func TestMConnectionMultiplePongsInTheBeginning(t *testing.T) { } } +func TestMConnectionMultiplePings(t *testing.T) { + server, client := net.Pipe() + defer server.Close() + defer client.Close() + + receivedCh := make(chan []byte) + errorsCh := make(chan interface{}) + onReceive := func(chID byte, msgBytes []byte) { + receivedCh <- msgBytes + } + onError := func(r interface{}) { + errorsCh <- r + } + mconn := createMConnectionWithCallbacks(client, onReceive, onError) + err := mconn.Start() + require.Nil(t, err) + defer mconn.Stop() + + // sending 3 pings in a row (abuse) + _, err = server.Write([]byte{packetTypePing}) + require.Nil(t, err) + _, err = server.Read(make([]byte, 1)) + require.Nil(t, err) + _, err = server.Write([]byte{packetTypePing}) + require.Nil(t, err) + _, err = server.Read(make([]byte, 1)) + require.Nil(t, err) + _, err = server.Write([]byte{packetTypePing}) + require.Nil(t, err) + _, err = server.Read(make([]byte, 1)) + require.Nil(t, err) + + assert.True(t, mconn.IsRunning()) +} + func TestMConnectionPingPongs(t *testing.T) { server, client := net.Pipe() defer server.Close() @@ -241,7 +277,7 @@ func TestMConnectionPingPongs(t *testing.T) { }() <-serverGotPing - pongTimerExpired := (mconn.config.PongTimeout + 10*time.Millisecond) * 2 + pongTimerExpired := (mconn.config.PongTimeout + 20*time.Millisecond) * 2 select { case msgBytes := <-receivedCh: t.Fatalf("Expected no data, but got %v", msgBytes) From 45750e1b299f1cb230397ca6dd246cc993a32b70 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 9 Feb 2018 15:16:22 +0400 Subject: [PATCH 17/18] fix race by sending signal instead of stopping pongTimer --- p2p/conn/connection.go | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 6fbb425e..938c3eb2 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -89,7 +89,7 @@ type MConnection struct { // close conn if pong is not received in pongTimeout pongTimer *time.Timer - pongTimeoutCh chan struct{} + pongTimeoutCh chan bool // true - timeout, false - peer sent pong chStatsTimer *cmn.RepeatTimer // update channel stats periodically @@ -191,7 +191,7 @@ func (c *MConnection) OnStart() error { c.quit = make(chan struct{}) c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle) c.pingTimer = cmn.NewRepeatTimer("ping", c.config.PingInterval) - c.pongTimeoutCh = make(chan struct{}) + c.pongTimeoutCh = make(chan bool) c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateStats) go c.sendRoutine() go c.recvRoutine() @@ -339,19 +339,22 @@ FOR_LOOP: c.sendMonitor.Update(int(n)) c.Logger.Debug("Starting pong timer", "dur", c.config.PongTimeout) c.pongTimer = time.AfterFunc(c.config.PongTimeout, func() { - c.pongTimeoutCh <- struct{}{} + c.pongTimeoutCh <- true }) c.flush() - case <-c.pongTimeoutCh: - c.Logger.Debug("Pong timeout") - err = errors.New("pong timeout") + case timeout := <-c.pongTimeoutCh: + if timeout { + c.Logger.Debug("Pong timeout") + err = errors.New("pong timeout") + } else { + c.stopPongTimer() + } case <-c.pong: c.Logger.Debug("Send Pong") wire.WriteByte(packetTypePong, c.bufWriter, &n, &err) c.sendMonitor.Update(int(n)) c.flush() case <-c.quit: - c.stopPongTimer() break FOR_LOOP case <-c.send: // Send some msgPackets @@ -376,6 +379,7 @@ FOR_LOOP: } // Cleanup + c.stopPongTimer() } // Returns true if messages from channels were exhausted. @@ -486,7 +490,11 @@ FOR_LOOP: } case packetTypePong: c.Logger.Debug("Receive Pong") - c.stopPongTimer() + select { + case c.pongTimeoutCh <- false: + case <-c.quit: + break FOR_LOOP + } case packetTypeMsg: pkt, n, err := msgPacket{}, int(0), error(nil) wire.ReadBinaryPtr(&pkt, c.bufReader, c.config.maxMsgPacketTotalSize(), &n, &err) @@ -534,12 +542,14 @@ FOR_LOOP: } } +// not goroutine-safe func (c *MConnection) stopPongTimer() { if c.pongTimer != nil { if !c.pongTimer.Stop() { <-c.pongTimer.C } drain(c.pongTimeoutCh) + c.pongTimer = nil } } @@ -771,7 +781,7 @@ func (p msgPacket) String() string { return fmt.Sprintf("MsgPacket{%X:%X T:%X}", p.ChannelID, p.Bytes, p.EOF) } -func drain(ch <-chan struct{}) { +func drain(ch <-chan bool) { for { select { case <-ch: From 22b038810aee12a0e4c647abb7ec1499c409c7b8 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 9 Feb 2018 21:58:02 +0400 Subject: [PATCH 18/18] do not block in recvRoutine --- p2p/conn/connection.go | 8 ++++---- p2p/conn/connection_test.go | 3 ++- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 938c3eb2..46e36301 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -153,7 +153,7 @@ func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onRec sendMonitor: flow.New(0, 0), recvMonitor: flow.New(0, 0), send: make(chan struct{}, 1), - pong: make(chan struct{}), + pong: make(chan struct{}, 1), onReceive: onReceive, onError: onError, config: config, @@ -191,7 +191,7 @@ func (c *MConnection) OnStart() error { c.quit = make(chan struct{}) c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle) c.pingTimer = cmn.NewRepeatTimer("ping", c.config.PingInterval) - c.pongTimeoutCh = make(chan bool) + c.pongTimeoutCh = make(chan bool, 1) c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateStats) go c.sendRoutine() go c.recvRoutine() @@ -492,8 +492,8 @@ FOR_LOOP: c.Logger.Debug("Receive Pong") select { case c.pongTimeoutCh <- false: - case <-c.quit: - break FOR_LOOP + default: + // never block } case packetTypeMsg: pkt, n, err := msgPacket{}, int(0), error(nil) diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 270b4ae9..d308ea61 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -22,10 +22,10 @@ func createTestMConnection(conn net.Conn) *MConnection { } func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *MConnection { - chDescs := []*ChannelDescriptor{&ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}} cfg := DefaultMConnConfig() cfg.PingInterval = 90 * time.Millisecond cfg.PongTimeout = 45 * time.Millisecond + chDescs := []*ChannelDescriptor{&ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}} c := NewMConnectionWithConfig(conn, chDescs, onReceive, onError, cfg) c.SetLogger(log.TestingLogger()) return c @@ -224,6 +224,7 @@ func TestMConnectionMultiplePings(t *testing.T) { defer mconn.Stop() // sending 3 pings in a row (abuse) + // see https://github.com/tendermint/tendermint/issues/1190 _, err = server.Write([]byte{packetTypePing}) require.Nil(t, err) _, err = server.Read(make([]byte, 1))