add test, TrySend in broadcast

This commit is contained in:
zbo14 2017-11-01 23:13:18 -04:00 committed by Anton Kaliaev
parent e7bc946760
commit 1d16df6a92
No known key found for this signature in database
GPG Key ID: 7B6881D965918214
4 changed files with 112 additions and 25 deletions

View File

@ -2,6 +2,7 @@ package conn
import (
"bufio"
"errors"
"fmt"
"io"
"math"
@ -22,6 +23,7 @@ const (
minWriteBufferSize = 65536
updateStats = 2 * time.Second
pingTimeout = 40 * time.Second
pongTimeout = 60 * time.Second
// some of these defaults are written in the user config
// flushThrottle, sendRate, recvRate
@ -84,6 +86,7 @@ type MConnection struct {
quit chan struct{}
flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
pingTimer *cmn.RepeatTimer // send pings periodically
pongTimer *cmn.ThrottleTimer // close conn if pong not recv in 1 min
chStatsTimer *cmn.RepeatTimer // update channel stats periodically
created time.Time // time of creation
@ -170,6 +173,7 @@ func (c *MConnection) OnStart() error {
c.quit = make(chan struct{})
c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle)
c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout)
c.pongTimer = cmn.NewThrottleTimer("pong", pongTimeout)
c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateStats)
go c.sendRoutine()
go c.recvRoutine()
@ -181,6 +185,7 @@ func (c *MConnection) OnStop() {
c.BaseService.OnStop()
c.flushTimer.Stop()
c.pingTimer.Stop()
c.pongTimer.Stop()
c.chStatsTimer.Stop()
if c.quit != nil {
close(c.quit)
@ -315,7 +320,12 @@ FOR_LOOP:
c.Logger.Debug("Send Ping")
wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
c.sendMonitor.Update(int(n))
c.flush()
go c.flush()
c.Logger.Debug("Starting pong timer")
c.pongTimer.Set()
case <-c.pongTimer.Ch:
c.Logger.Debug("Pong timeout")
err = errors.New("pong timeout")
case <-c.pong:
c.Logger.Debug("Send Pong")
wire.WriteByte(packetTypePong, c.bufWriter, &n, &err)
@ -454,8 +464,8 @@ FOR_LOOP:
// never block
}
case packetTypePong:
// do nothing
c.Logger.Debug("Receive Pong")
c.pongTimer.Unset()
case packetTypeMsg:
pkt, n, err := msgPacket{}, int(0), error(nil)
wire.ReadBinaryPtr(&pkt, c.bufReader, c.config.maxMsgPacketTotalSize(), &n, &err)

View File

@ -116,6 +116,37 @@ func TestMConnectionStatus(t *testing.T) {
assert.Zero(status.Channels[0].SendQueueSize)
}
func TestPingPongTimeout(t *testing.T) {
assert, require := assert.New(t), require.New(t)
server, client := net.Pipe()
defer server.Close()
defer client.Close()
receivedCh := make(chan []byte)
errorsCh := make(chan interface{})
onReceive := func(chID byte, msgBytes []byte) {
receivedCh <- msgBytes
}
onError := func(r interface{}) {
errorsCh <- r
}
mconn := createMConnectionWithCallbacks(client, onReceive, onError)
_, err := mconn.Start()
require.Nil(err)
defer mconn.Stop()
select {
case receivedBytes := <-receivedCh:
t.Fatalf("Expected error, got %v", receivedBytes)
case err := <-errorsCh:
assert.NotNil(err)
assert.False(mconn.IsRunning())
case <-time.After(500*time.Millisecond + 100*time.Second):
t.Fatal("Did not receive error in ~(pingTimeout + pongTimeout) seconds")
}
}
func TestMConnectionStopsAndReturnsError(t *testing.T) {
assert, require := assert.New(t), require.New(t)

View File

@ -200,9 +200,44 @@ func (sw *Switch) OnStop() {
//---------------------------------------------------------------------
// Peers
// Peers returns the set of peers that are connected to the switch.
func (sw *Switch) Peers() IPeerSet {
return sw.peers
// Broadcast runs a go routine for each attempted send, which will block
// trying to send for defaultSendTimeoutSeconds. Returns a channel
// which receives broadcast result for each attempted send (success=false if times out).
// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
// TODO: Something more intelligent.
type BroadcastResult struct {
PeerKey string
Success bool
}
func (sw *Switch) Broadcast(chID byte, msg interface{}) chan BroadcastResult {
successChan := make(chan BroadcastResult, len(sw.peers.List()))
sw.Logger.Debug("Broadcast", "channel", chID, "msg", msg)
for _, peer := range sw.peers.List() {
go func(peer Peer) {
success := peer.Send(chID, msg)
successChan <- BroadcastResult{peer.Key(), success}
}(peer)
}
return successChan
}
func (sw *Switch) TryBroadcast(chID byte, msg interface{}) chan BroadcastResult {
successChan := make(chan BroadcastResult, len(sw.peers.List()))
sw.Logger.Debug("TryBroadcast", "channel", chID, "msg", msg)
for _, peer := range sw.peers.List() {
success := peer.TrySend(chID, msg)
if success {
successChan <- BroadcastResult{peer.Key(), success}
} else {
go func(peer Peer) {
success := peer.Send(chID, msg)
successChan <- BroadcastResult{peer.Key(), success}
}(peer)
}
}
return successChan
}
// NumPeers returns the count of outbound/inbound and outbound-dialing peers.
@ -219,21 +254,9 @@ func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
return
}
// Broadcast runs a go routine for each attempted send, which will block
// trying to send for defaultSendTimeoutSeconds. Returns a channel
// which receives success values for each attempted send (false if times out).
// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
// TODO: Something more intelligent.
func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool {
successChan := make(chan bool, len(sw.peers.List()))
sw.Logger.Debug("Broadcast", "channel", chID, "msg", msg)
for _, peer := range sw.peers.List() {
go func(peer Peer) {
success := peer.Send(chID, msg)
successChan <- success
}(peer)
}
return successChan
// Peers returns the set of peers that are connected to the switch.
func (sw *Switch) Peers() IPeerSet {
return sw.peers
}
// StopPeerForError disconnects from a peer due to external error.

View File

@ -128,10 +128,12 @@ func TestSwitches(t *testing.T) {
ch0Msg := "channel zero"
ch1Msg := "channel foo"
ch2Msg := "channel bar"
ch3Msg := "channel baz"
s1.Broadcast(byte(0x00), ch0Msg)
s1.Broadcast(byte(0x01), ch1Msg)
s1.Broadcast(byte(0x02), ch2Msg)
s1.TryBroadcast(byte(0x03), ch3Msg)
assertMsgReceivedWithTimeout(t, ch0Msg, byte(0x00), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
assertMsgReceivedWithTimeout(t, ch1Msg, byte(0x01), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
@ -324,12 +326,15 @@ func BenchmarkSwitches(b *testing.B) {
numSuccess, numFailure := 0, 0
// Send random message from foo channel to another
// Send random message from foo channel to another with Broadcast
for i := 0; i < b.N; i++ {
chID := byte(i % 4)
successChan := s1.Broadcast(chID, "test data")
for s := range successChan {
if s {
resultChan := s1.Broadcast(chID, "test data")
for res := range resultChan {
if !s1.peers.Has(res.PeerKey) {
b.Errorf("unexpected peerKey: %s", res.PeerKey)
}
if res.Success {
numSuccess++
} else {
numFailure++
@ -337,7 +342,25 @@ func BenchmarkSwitches(b *testing.B) {
}
}
b.Logf("success: %v, failure: %v", numSuccess, numFailure)
b.Logf("Broadcast: success: %v, failure: %v", numSuccess, numFailure)
// Send random message from foo channel to another with TryBroadcast
for i := 0; i < b.N; i++ {
chID := byte(i % 4)
resultChan := s1.TryBroadcast(chID, "test data")
for res := range resultChan {
if !s1.peers.Has(res.PeerKey) {
b.Errorf("unexpected peerKey: %s", res.PeerKey)
}
if res.Success {
numSuccess++
} else {
numFailure++
}
}
}
b.Logf("TryBroadcast: success: %v, failure: %v", numSuccess, numFailure)
// Allow everything to flush before stopping switches & closing connections.
b.StopTimer()