added a test for PEX reactor seed mode
This commit is contained in:
parent
39d8da3536
commit
949211a137
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"math/rand"
|
||||
"reflect"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
@ -334,6 +335,273 @@ func (r *PEXReactor) checkSeeds() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Explores the network searching for more peers. (continuous)
|
||||
// Seed/Crawler Mode causes this node to quickly disconnect
|
||||
// from peers, except other seed nodes.
|
||||
func (r *PEXReactor) seedCrawlerMode() {
|
||||
// Do an initial crawl
|
||||
r.crawlPeers()
|
||||
|
||||
// Fire periodically
|
||||
ticker := time.NewTicker(defaultSeedModePeriod)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
r.attemptDisconnects()
|
||||
r.crawlPeers()
|
||||
case <-r.Quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// crawlStatus handles temporary data needed for the
|
||||
// network crawling performed during seed/crawler mode.
|
||||
type crawlStatus struct {
|
||||
// The remote address of a potential peer we learned about
|
||||
Addr *NetAddress
|
||||
|
||||
// Not empty if we are connected to the address
|
||||
PeerID string
|
||||
|
||||
// The last time we attempt to reach this address
|
||||
LastAttempt time.Time
|
||||
|
||||
// The last time we successfully reached this address
|
||||
LastSuccess time.Time
|
||||
}
|
||||
|
||||
// oldestFirst implements sort.Interface for []crawlStatus
|
||||
// based on the LastAttempt field.
|
||||
type oldestFirst []crawlStatus
|
||||
|
||||
func (of oldestFirst) Len() int { return len(of) }
|
||||
func (of oldestFirst) Swap(i, j int) { of[i], of[j] = of[j], of[i] }
|
||||
func (of oldestFirst) Less(i, j int) bool { return of[i].LastAttempt.Before(of[j].LastAttempt) }
|
||||
|
||||
// getCrawlStatus returns addresses of potential peers that we wish to validate.
|
||||
// NOTE: The status information is ordered as described above.
|
||||
func (r *PEXReactor) getCrawlStatus() []crawlStatus {
|
||||
var of oldestFirst
|
||||
|
||||
addrs := r.book.ListOfKnownAddresses()
|
||||
// Go through all the addresses in the AddressBook
|
||||
for _, addr := range addrs {
|
||||
var peerID string
|
||||
|
||||
// Check if a peer is already connected from this addr
|
||||
if p := r.Switch.peers.GetByRemoteAddr(addr.Addr); p != nil {
|
||||
peerID = p.Key()
|
||||
}
|
||||
|
||||
of = append(of, crawlStatus{
|
||||
Addr: addr.Addr,
|
||||
PeerID: peerID,
|
||||
LastAttempt: addr.LastAttempt,
|
||||
LastSuccess: addr.LastSuccess,
|
||||
})
|
||||
}
|
||||
sort.Sort(of)
|
||||
return of
|
||||
}
|
||||
|
||||
// crawlPeers will crawl the network looking for new peer addresses. (once)
|
||||
//
|
||||
// TODO Basically, we need to work harder on our good-peer/bad-peer marking.
|
||||
// What we're currently doing in terms of marking good/bad peers is just a
|
||||
// placeholder. It should not be the case that an address becomes old/vetted
|
||||
// upon a single successful connection.
|
||||
func (r *PEXReactor) crawlPeers() {
|
||||
crawlerStatus := r.getCrawlStatus()
|
||||
|
||||
now := time.Now()
|
||||
// Use addresses we know of to reach additional peers
|
||||
for _, cs := range crawlerStatus {
|
||||
// Do not dial peers that are already connected
|
||||
if cs.PeerID != "" {
|
||||
continue
|
||||
}
|
||||
// Do not attempt to connect with peers we recently dialed
|
||||
if now.Sub(cs.LastAttempt) < defaultCrawlPeerInterval {
|
||||
continue
|
||||
}
|
||||
// Otherwise, attempt to connect with the known address
|
||||
p, err := r.Switch.DialPeerWithAddress(cs.Addr, false)
|
||||
if err != nil {
|
||||
r.book.MarkAttempt(cs.Addr)
|
||||
continue
|
||||
}
|
||||
// Enter the peer ID into our crawl status information
|
||||
cs.PeerID = p.Key()
|
||||
r.book.MarkGood(cs.Addr)
|
||||
}
|
||||
// Crawl the connected peers asking for more addresses
|
||||
for _, cs := range crawlerStatus {
|
||||
if cs.PeerID == "" {
|
||||
continue
|
||||
}
|
||||
// We will wait a minimum period of time before crawling peers again
|
||||
if now.Sub(cs.LastAttempt) >= defaultCrawlPeerInterval {
|
||||
p := r.Switch.Peers().Get(cs.PeerID)
|
||||
if p != nil {
|
||||
r.RequestPEX(p)
|
||||
r.book.MarkAttempt(cs.Addr)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// attemptDisconnects checks the crawlStatus info for Peers to disconnect from. (once)
|
||||
func (r *PEXReactor) attemptDisconnects() {
|
||||
crawlerStatus := r.getCrawlStatus()
|
||||
|
||||
now := time.Now()
|
||||
// Go through each peer we have connected with
|
||||
// looking for opportunities to disconnect
|
||||
for _, cs := range crawlerStatus {
|
||||
if cs.PeerID == "" {
|
||||
continue
|
||||
}
|
||||
// Remain connected to each peer for a minimum period of time
|
||||
if now.Sub(cs.LastSuccess) < defaultSeedDisconnectWaitPeriod {
|
||||
continue
|
||||
}
|
||||
// Fetch the Peer using the saved ID
|
||||
p := r.Switch.Peers().Get(cs.PeerID)
|
||||
if p == nil {
|
||||
continue
|
||||
}
|
||||
// Do not disconnect from persistent peers.
|
||||
// Specifically, we need to remain connected to other seeds
|
||||
if p.IsPersistent() {
|
||||
continue
|
||||
}
|
||||
// Otherwise, disconnect from the peer
|
||||
r.Switch.StopPeerGracefully(p)
|
||||
}
|
||||
}
|
||||
|
||||
// crawlStatus handles temporary data needed for the
|
||||
// network crawling performed during seed/crawler mode.
|
||||
type crawlStatus struct {
|
||||
// The remote address of a potential peer we learned about
|
||||
Addr *NetAddress
|
||||
|
||||
// Not empty if we are connected to the address
|
||||
PeerID string
|
||||
|
||||
// The last time we attempt to reach this address
|
||||
LastAttempt time.Time
|
||||
|
||||
// The last time we successfully reached this address
|
||||
LastSuccess time.Time
|
||||
}
|
||||
|
||||
// oldestAttempt implements sort.Interface for []crawlStatus
|
||||
// based on the LastAttempt field.
|
||||
type oldestAttempt []crawlStatus
|
||||
|
||||
func (oa oldestAttempt) Len() int { return len(oa) }
|
||||
func (oa oldestAttempt) Swap(i, j int) { oa[i], oa[j] = oa[j], oa[i] }
|
||||
func (oa oldestAttempt) Less(i, j int) bool { return oa[i].LastAttempt.Before(oa[j].LastAttempt) }
|
||||
|
||||
// getCrawlStatus returns addresses of potential peers that we wish to validate.
|
||||
// NOTE: The status information is ordered as described above.
|
||||
func (r *PEXReactor) getCrawlStatus() []crawlStatus {
|
||||
var oa oldestAttempt
|
||||
|
||||
addrs := r.book.ListOfKnownAddresses()
|
||||
// Go through all the addresses in the AddressBook
|
||||
for _, addr := range addrs {
|
||||
p := r.Switch.peers.GetByRemoteAddr(addr.Addr)
|
||||
|
||||
oa = append(oa, crawlStatus{
|
||||
Addr: addr.Addr,
|
||||
PeerID: p.Key(),
|
||||
LastAttempt: addr.LastAttempt,
|
||||
LastSuccess: addr.LastSuccess,
|
||||
})
|
||||
}
|
||||
sort.Sort(oa)
|
||||
return oa
|
||||
}
|
||||
|
||||
// crawlPeers will crawl the network looking for new peer addresses. (once)
|
||||
//
|
||||
// TODO Basically, we need to work harder on our good-peer/bad-peer marking.
|
||||
// What we're currently doing in terms of marking good/bad peers is just a
|
||||
// placeholder. It should not be the case that an address becomes old/vetted
|
||||
// upon a single successful connection.
|
||||
func (r *PEXReactor) crawlPeers() {
|
||||
crawlerStatus := r.getCrawlStatus()
|
||||
|
||||
now := time.Now()
|
||||
// Use addresses we know of to reach additional peers
|
||||
for _, cs := range crawlerStatus {
|
||||
// Do not dial peers that are already connected
|
||||
if cs.PeerID != "" {
|
||||
continue
|
||||
}
|
||||
// Do not attempt to connect with peers we recently dialed
|
||||
if now.Sub(cs.LastAttempt) < defaultCrawlPeerInterval {
|
||||
continue
|
||||
}
|
||||
// Otherwise, attempt to connect with the known address
|
||||
p, err := r.Switch.DialPeerWithAddress(cs.Addr, false)
|
||||
if err != nil {
|
||||
r.book.MarkAttempt(cs.Addr)
|
||||
continue
|
||||
}
|
||||
// Enter the peer ID into our crawl status information
|
||||
cs.PeerID = p.Key()
|
||||
r.book.MarkGood(cs.Addr)
|
||||
}
|
||||
// Crawl the connected peers asking for more addresses
|
||||
for _, cs := range crawlerStatus {
|
||||
if cs.PeerID == "" {
|
||||
continue
|
||||
}
|
||||
// We will wait a minimum period of time before crawling peers again
|
||||
if now.Sub(cs.LastAttempt) >= defaultCrawlPeerInterval {
|
||||
p := r.Switch.peers.Get(cs.PeerID)
|
||||
if p != nil {
|
||||
r.RequestPEX(p)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// attemptDisconnects checks the crawlStatus info for Peers to disconnect from. (once)
|
||||
func (r *PEXReactor) attemptDisconnects() {
|
||||
crawlerStatus := r.getCrawlStatus()
|
||||
|
||||
now := time.Now()
|
||||
// Go through each peer we have connected with
|
||||
// looking for opportunities to disconnect
|
||||
for _, cs := range crawlerStatus {
|
||||
if cs.PeerID == "" {
|
||||
continue
|
||||
}
|
||||
// Remain connected to each peer for a minimum period of time
|
||||
if now.Sub(cs.LastSuccess) < defaultSeedDisconnectWaitPeriod {
|
||||
continue
|
||||
}
|
||||
// Fetch the Peer using the saved ID
|
||||
p := r.Switch.peers.Get(cs.PeerID)
|
||||
if p == nil {
|
||||
continue
|
||||
}
|
||||
// Do not disconnect from persistent peers.
|
||||
// Specifically, we need to remain connected to other seeds
|
||||
if p.IsPersistent() {
|
||||
continue
|
||||
}
|
||||
// Otherwise, disconnect from the peer
|
||||
r.Switch.StopPeerGracefully(p)
|
||||
}
|
||||
}
|
||||
|
||||
// randomly dial seeds until we connect to one or exhaust them
|
||||
func (r *PEXReactor) dialSeed() {
|
||||
lSeeds := len(r.config.Seeds)
|
||||
|
|
|
@ -286,6 +286,51 @@ func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) {
|
|||
assertSomePeersWithTimeout(t, []*Switch{sw}, 10*time.Millisecond, 10*time.Second)
|
||||
}
|
||||
|
||||
func TestPEXReactorCrawlStatus(t *testing.T) {
|
||||
assert, require := assert.New(t), require.New(t)
|
||||
|
||||
dir, err := ioutil.TempDir("", "pex_reactor")
|
||||
require.Nil(err)
|
||||
defer os.RemoveAll(dir) // nolint: errcheck
|
||||
book := NewAddrBook(dir+"addrbook.json", false)
|
||||
book.SetLogger(log.TestingLogger())
|
||||
|
||||
var r *PEXReactor
|
||||
// Seed/Crawler mode uses data from the Switch
|
||||
makeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch {
|
||||
r = NewPEXReactor(book, true)
|
||||
r.SetLogger(log.TestingLogger())
|
||||
sw.SetLogger(log.TestingLogger().With("switch", i))
|
||||
sw.AddReactor("pex", r)
|
||||
return sw
|
||||
})
|
||||
|
||||
// Create a peer, and add it to the peer set
|
||||
peer := createRandomPeer(false)
|
||||
r.Switch.peers.Add(peer)
|
||||
// Add the peer address to the address book
|
||||
addr1, _ := NewNetAddressString(peer.NodeInfo().ListenAddr)
|
||||
r.book.AddAddress(addr1, addr1)
|
||||
// Add an address to the book that does not have a peer
|
||||
_, addr2 := createRoutableAddr()
|
||||
r.book.AddAddress(addr2, addr1)
|
||||
|
||||
// Get the crawl status data
|
||||
status := r.getCrawlStatus()
|
||||
|
||||
// Make sure it has the proper number of elements
|
||||
assert.Equal(2, len(status))
|
||||
|
||||
var num int
|
||||
for _, cs := range status {
|
||||
if cs.PeerID != "" {
|
||||
num++
|
||||
}
|
||||
}
|
||||
// Check that only one has been identified as a connected peer
|
||||
assert.Equal(1, num)
|
||||
}
|
||||
|
||||
func createRoutableAddr() (addr string, netAddr *NetAddress) {
|
||||
for {
|
||||
addr = cmn.Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256)
|
||||
|
|
Loading…
Reference in New Issue