Merge pull request #955 from tendermint/939-p2p-exponential-backoff-on-reconnect

p2p: exponential backoff on reconnect. closes #939
This commit is contained in:
Ethan Buchman 2017-12-11 14:25:39 -05:00 committed by GitHub
commit 24a9491203
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 78 additions and 33 deletions

View File

@ -2,6 +2,7 @@ package p2p
import (
"fmt"
"math"
"math/rand"
"net"
"time"
@ -14,8 +15,19 @@ import (
)
const (
reconnectAttempts = 30
reconnectInterval = 3 * time.Second
// wait a random amount of time from this interval
// before dialing seeds or reconnecting to help prevent DoS
dialRandomizerIntervalMilliseconds = 3000
// repeatedly try to reconnect for a few minutes
// ie. 5 * 20 = 100s
reconnectAttempts = 20
reconnectInterval = 5 * time.Second
// then move into exponential backoff mode for ~1day
// ie. 3**10 = 16hrs
reconnectBackOffAttempts = 10
reconnectBackOffBaseSeconds = 3
)
type Reactor interface {
@ -74,6 +86,8 @@ type Switch struct {
filterConnByAddr func(net.Addr) error
filterConnByPubKey func(crypto.PubKeyEd25519) error
rng *rand.Rand // seed for randomizing dial times and orders
}
var (
@ -92,6 +106,10 @@ func NewSwitch(config *cfg.P2PConfig) *Switch {
nodeInfo: nil,
}
// Ensure we have a completely undeterministic PRNG. cmd.RandInt64() draws
// from a seed that's initialized with OS entropy on process start.
sw.rng = rand.New(rand.NewSource(cmn.RandInt64()))
// TODO: collapse the peerConfig into the config ?
sw.peerConfig.MConfig.flushThrottle = time.Duration(config.FlushThrottleTimeout) * time.Millisecond
sw.peerConfig.MConfig.SendRate = config.SendRate
@ -317,15 +335,11 @@ func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error {
addrBook.Save()
}
// Ensure we have a completely undeterministic PRNG. cmd.RandInt64() draws
// from a seed that's initialized with OS entropy on process start.
rng := rand.New(rand.NewSource(cmn.RandInt64()))
// permute the list, dial them in random order.
perm := rng.Perm(len(netAddrs))
perm := sw.rng.Perm(len(netAddrs))
for i := 0; i < len(perm); i++ {
go func(i int) {
time.Sleep(time.Duration(rng.Int63n(3000)) * time.Millisecond)
sw.randomSleep(0)
j := perm[i]
sw.dialSeed(netAddrs[j])
}(i)
@ -333,6 +347,12 @@ func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error {
return nil
}
// sleep for interval plus some random amount of ms on [0, dialRandomizerIntervalMilliseconds]
func (sw *Switch) randomSleep(interval time.Duration) {
r := time.Duration(sw.rng.Int63n(dialRandomizerIntervalMilliseconds)) * time.Millisecond
time.Sleep(r + interval)
}
func (sw *Switch) dialSeed(addr *NetAddress) {
peer, err := sw.DialPeerWithAddress(addr, true)
if err != nil {
@ -413,36 +433,61 @@ func (sw *Switch) Peers() IPeerSet {
// If the peer is persistent, it will attempt to reconnect.
// TODO: make record depending on reason.
func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) {
addr, _ := NewNetAddressString(peer.NodeInfo().RemoteAddr)
sw.Logger.Error("Stopping peer for error", "peer", peer, "err", reason)
sw.stopAndRemovePeer(peer, reason)
if peer.IsPersistent() {
go func() {
sw.Logger.Info("Reconnecting to peer", "peer", peer)
for i := 1; i < reconnectAttempts; i++ {
if !sw.IsRunning() {
return
}
peer, err := sw.DialPeerWithAddress(addr, true)
if err != nil {
if i == reconnectAttempts {
sw.Logger.Info("Error reconnecting to peer. Giving up", "tries", i, "err", err)
return
}
sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err)
time.Sleep(reconnectInterval)
continue
}
sw.Logger.Info("Reconnected to peer", "peer", peer)
return
}
}()
go sw.reconnectToPeer(peer)
}
}
// reconnectToPeer tries to reconnect to the peer, first repeatedly
// with a fixed interval, then with exponential backoff.
// If no success after all that, it stops trying, and leaves it
// to the PEX/Addrbook to find the peer again
func (sw *Switch) reconnectToPeer(peer Peer) {
addr, _ := NewNetAddressString(peer.NodeInfo().RemoteAddr)
start := time.Now()
sw.Logger.Info("Reconnecting to peer", "peer", peer)
for i := 0; i < reconnectAttempts; i++ {
if !sw.IsRunning() {
return
}
peer, err := sw.DialPeerWithAddress(addr, true)
if err != nil {
sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "peer", peer)
// sleep a set amount
sw.randomSleep(reconnectInterval)
continue
} else {
sw.Logger.Info("Reconnected to peer", "peer", peer)
return
}
}
sw.Logger.Error("Failed to reconnect to peer. Beginning exponential backoff",
"peer", peer, "elapsed", time.Since(start))
for i := 0; i < reconnectBackOffAttempts; i++ {
if !sw.IsRunning() {
return
}
// sleep an exponentially increasing amount
sleepIntervalSeconds := math.Pow(reconnectBackOffBaseSeconds, float64(i))
sw.randomSleep(time.Duration(sleepIntervalSeconds) * time.Second)
peer, err := sw.DialPeerWithAddress(addr, true)
if err != nil {
sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "peer", peer)
continue
} else {
sw.Logger.Info("Reconnected to peer", "peer", peer)
return
}
}
sw.Logger.Error("Failed to reconnect to peer. Giving up", "peer", peer, "elapsed", time.Since(start))
}
// StopPeerGracefully disconnects from a peer gracefully.
// TODO: handle graceful disconnects.
func (sw *Switch) StopPeerGracefully(peer Peer) {

View File

@ -272,10 +272,10 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
// simulate failure by closing connection
peer.CloseConn()
// TODO: actually detect the disconnection and wait for reconnect
// TODO: remove sleep, detect the disconnection, wait for reconnect
npeers := sw.Peers().Size()
for i := 0; i < 20; i++ {
time.Sleep(100 * time.Millisecond)
time.Sleep(250 * time.Millisecond)
npeers = sw.Peers().Size()
if npeers > 0 {
break