move dialSeedsIfAddrBookIsEmptyOrPEXFailedToConnect into PEX reactor

This commit is contained in:
Anton Kaliaev 2018-01-09 17:09:09 -06:00
parent ef0493ddf3
commit 705d51aa42
No known key found for this signature in database
GPG Key ID: 7B6881D965918214
4 changed files with 24 additions and 37 deletions

View File

@ -8,7 +8,6 @@ import (
"net" "net"
"net/http" "net/http"
"strings" "strings"
"time"
abci "github.com/tendermint/abci/types" abci "github.com/tendermint/abci/types"
crypto "github.com/tendermint/go-crypto" crypto "github.com/tendermint/go-crypto"
@ -256,7 +255,8 @@ func NewNode(config *cfg.Config,
trustMetricStore = trust.NewTrustMetricStore(trustHistoryDB, trust.DefaultConfig()) trustMetricStore = trust.NewTrustMetricStore(trustHistoryDB, trust.DefaultConfig())
trustMetricStore.SetLogger(p2pLogger) trustMetricStore.SetLogger(p2pLogger)
pexReactor := p2p.NewPEXReactor(addrBook) pexReactor := p2p.NewPEXReactor(addrBook,
&p2p.PEXReactorConfig{Seeds: strings.Split(config.P2P.Seeds, ",")})
pexReactor.SetLogger(p2pLogger) pexReactor.SetLogger(p2pLogger)
sw.AddReactor("PEX", pexReactor) sw.AddReactor("PEX", pexReactor)
} }
@ -388,38 +388,10 @@ func (n *Node) OnStart() error {
} }
} }
if n.config.P2P.Seeds != "" {
err = n.dialSeedsIfAddrBookIsEmptyOrPEXFailedToConnect(strings.Split(n.config.P2P.Seeds, ","))
if err != nil {
return err
}
}
// start tx indexer // start tx indexer
return n.indexerService.Start() return n.indexerService.Start()
} }
func (n *Node) dialSeedsIfAddrBookIsEmptyOrPEXFailedToConnect(seeds []string) error {
// prefer peers from address book
if n.config.P2P.PexReactor && n.addrBook.Size() > 0 {
// give some time for PexReactor to connect us to other peers
const fallbackToSeedsAfter = 30 * time.Second
go func() {
time.Sleep(fallbackToSeedsAfter)
// fallback to dialing seeds if for some reason we can't connect to any
// peers
outbound, inbound, _ := n.sw.NumPeers()
if n.IsRunning() && outbound+inbound == 0 {
// TODO: ignore error?
n.sw.DialPeersAsync(n.addrBook, seeds, false)
}
}()
return nil
}
return n.sw.DialPeersAsync(n.addrBook, seeds, false)
}
// OnStop stops the Node. It implements cmn.Service. // OnStop stops the Node. It implements cmn.Service.
func (n *Node) OnStop() { func (n *Node) OnStop() {
n.BaseService.OnStop() n.BaseService.OnStop()

View File

@ -45,6 +45,7 @@ type PEXReactor struct {
BaseReactor BaseReactor
book *AddrBook book *AddrBook
config *PEXReactorConfig
ensurePeersPeriod time.Duration ensurePeersPeriod time.Duration
// tracks message count by peer, so we can prevent abuse // tracks message count by peer, so we can prevent abuse
@ -52,10 +53,18 @@ type PEXReactor struct {
maxMsgCountByPeer uint16 maxMsgCountByPeer uint16
} }
// PEXReactorConfig holds reactor specific configuration data.
type PEXReactorConfig struct {
// Seeds is a list of addresses reactor may use if it can't connect to peers
// in the addrbook.
Seeds []string
}
// NewPEXReactor creates new PEX reactor. // NewPEXReactor creates new PEX reactor.
func NewPEXReactor(b *AddrBook) *PEXReactor { func NewPEXReactor(b *AddrBook, config *PEXReactorConfig) *PEXReactor {
r := &PEXReactor{ r := &PEXReactor{
book: b, book: b,
config: config,
ensurePeersPeriod: defaultEnsurePeersPeriod, ensurePeersPeriod: defaultEnsurePeersPeriod,
msgCountByPeer: cmn.NewCMap(), msgCountByPeer: cmn.NewCMap(),
maxMsgCountByPeer: defaultMaxMsgCountByPeer, maxMsgCountByPeer: defaultMaxMsgCountByPeer,
@ -238,7 +247,7 @@ func (r *PEXReactor) ensurePeersRoutine() {
// placeholder. It should not be the case that an address becomes old/vetted // placeholder. It should not be the case that an address becomes old/vetted
// upon a single successful connection. // upon a single successful connection.
func (r *PEXReactor) ensurePeers() { func (r *PEXReactor) ensurePeers() {
numOutPeers, _, numDialing := r.Switch.NumPeers() numOutPeers, numInPeers, numDialing := r.Switch.NumPeers()
numToDial := minNumOutboundPeers - (numOutPeers + numDialing) numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
r.Logger.Info("Ensure peers", "numOutPeers", numOutPeers, "numDialing", numDialing, "numToDial", numToDial) r.Logger.Info("Ensure peers", "numOutPeers", numOutPeers, "numDialing", numDialing, "numToDial", numToDial)
if numToDial <= 0 { if numToDial <= 0 {
@ -291,6 +300,12 @@ func (r *PEXReactor) ensurePeers() {
r.RequestPEX(peer) r.RequestPEX(peer)
} }
} }
// If we can't connect to any known address, fallback to dialing seeds
if numOutPeers+numInPeers+numDialing == 0 {
r.Logger.Info("No addresses to dial nor connected peers. Will dial seeds", "seeds", r.config.Seeds)
r.Switch.DialPeersAsync(r.book, r.config.Seeds, false)
}
} }
func (r *PEXReactor) flushMsgCountByPeer() { func (r *PEXReactor) flushMsgCountByPeer() {

View File

@ -24,7 +24,7 @@ func TestPEXReactorBasic(t *testing.T) {
book := NewAddrBook(dir+"addrbook.json", true) book := NewAddrBook(dir+"addrbook.json", true)
book.SetLogger(log.TestingLogger()) book.SetLogger(log.TestingLogger())
r := NewPEXReactor(book) r := NewPEXReactor(book, &PEXReactorConfig{})
r.SetLogger(log.TestingLogger()) r.SetLogger(log.TestingLogger())
assert.NotNil(r) assert.NotNil(r)
@ -40,7 +40,7 @@ func TestPEXReactorAddRemovePeer(t *testing.T) {
book := NewAddrBook(dir+"addrbook.json", true) book := NewAddrBook(dir+"addrbook.json", true)
book.SetLogger(log.TestingLogger()) book.SetLogger(log.TestingLogger())
r := NewPEXReactor(book) r := NewPEXReactor(book, &PEXReactorConfig{})
r.SetLogger(log.TestingLogger()) r.SetLogger(log.TestingLogger())
size := book.Size() size := book.Size()
@ -76,7 +76,7 @@ func TestPEXReactorRunning(t *testing.T) {
switches[i] = makeSwitch(config, i, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch { switches[i] = makeSwitch(config, i, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch {
sw.SetLogger(log.TestingLogger().With("switch", i)) sw.SetLogger(log.TestingLogger().With("switch", i))
r := NewPEXReactor(book) r := NewPEXReactor(book, &PEXReactorConfig{})
r.SetLogger(log.TestingLogger()) r.SetLogger(log.TestingLogger())
r.SetEnsurePeersPeriod(250 * time.Millisecond) r.SetEnsurePeersPeriod(250 * time.Millisecond)
sw.AddReactor("pex", r) sw.AddReactor("pex", r)
@ -141,7 +141,7 @@ func TestPEXReactorReceive(t *testing.T) {
book := NewAddrBook(dir+"addrbook.json", false) book := NewAddrBook(dir+"addrbook.json", false)
book.SetLogger(log.TestingLogger()) book.SetLogger(log.TestingLogger())
r := NewPEXReactor(book) r := NewPEXReactor(book, &PEXReactorConfig{})
r.SetLogger(log.TestingLogger()) r.SetLogger(log.TestingLogger())
peer := createRandomPeer(false) peer := createRandomPeer(false)
@ -166,7 +166,7 @@ func TestPEXReactorAbuseFromPeer(t *testing.T) {
book := NewAddrBook(dir+"addrbook.json", true) book := NewAddrBook(dir+"addrbook.json", true)
book.SetLogger(log.TestingLogger()) book.SetLogger(log.TestingLogger())
r := NewPEXReactor(book) r := NewPEXReactor(book, &PEXReactorConfig{})
r.SetLogger(log.TestingLogger()) r.SetLogger(log.TestingLogger())
r.SetMaxMsgCountByPeer(5) r.SetMaxMsgCountByPeer(5)