dnsseed,zcash: implement slow-motion recursive crawling

This will do one hop of additional graph crawl every time the update
interval fires. It will blacklist peers who are not consistently
available and over time should converge to the set of stable peers
suitable for DNS bootstrapping.
George Tankersley 2020-05-22 01:02:32 -04:00
parent e06186309c
commit 814f8cb850
5 changed files with 142 additions and 21 deletions
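For orientation, here is a minimal sketch of the crawl cycle this commit introduces. The Crawler interface, the crawlsketch package, and crawlLoop are hypothetical stand-ins invented for illustration; only runCrawl mirrors the helper actually added in the first file of the diff below, and the real plugin calls the concrete *zcash.Seeder directly.

package crawlsketch

import (
	"log"
	"time"
)

// Crawler is a hypothetical stand-in for the subset of zcash.Seeder
// methods the plugin relies on after this commit.
type Crawler interface {
	RefreshAddresses(disconnect bool)
	RequestAddresses() int
	DisconnectAllPeers()
	GetPeerCount() int
}

// runCrawl performs one hop of the slow-motion crawl, mirroring the
// helper added to the plugin in the diff below.
func runCrawl(c Crawler) {
	start := time.Now()

	// 1. Re-verify the peers we already know, keeping connections open;
	//    peers that fail verification are blacklisted by the seeder.
	c.RefreshAddresses(false)

	// 2. Ask every live connection for more addresses; this blocks until
	//    the crawler goroutines stop seeing new addr responses.
	newPeers := c.RequestAddresses()

	// 3. Drop all connections and leave the peers alone until next time.
	c.DisconnectAllPeers()

	log.Printf("crawl hop done: %d new peers (%d known) in %s",
		newPeers, c.GetPeerCount(), time.Since(start).Truncate(time.Second))
}

// crawlLoop fires one crawl hop per update interval, the way the Caddy
// plugin's update timer drives the seeder.
func crawlLoop(c Crawler, interval time.Duration, stop <-chan struct{}) {
	for {
		select {
		case <-time.After(interval):
			runCrawl(c)
		case <-stop:
			return
		}
	}
}

Because the update loop only re-arms its timer after runCrawl returns, crawl hops cannot overlap.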

View File

@@ -65,18 +65,19 @@ func setup(c *caddy.Controller) error {
 	// Send the initial request for more addresses; spawns goroutines to process the responses.
 	// Ready() will flip to true once we've received and confirmed at least 10 peers.
-	seeder.RequestAddresses()
+	go func() {
+		// TODO load from storage if we already know some peers
+		log.Infof("Getting addresses from bootstrap peer %s:%s", address, port)
+		seeder.RequestAddresses()
+		runCrawl(seeder)
+	}()
 
 	// Start the update timer
 	go func() {
 		for {
 			select {
 			case <-time.After(updateInterval):
-				seeder.RequestAddresses()
-				err := seeder.WaitForAddresses(10, 30*time.Second)
-				if err != nil {
-					log.Errorf("Failed to refresh addresses: %v", err)
-				}
-				// XXX: If we wanted to crawl independently, this would be the place.
+				runCrawl(seeder)
 			}
 		}
 	}()
@@ -93,3 +94,25 @@ func setup(c *caddy.Controller) error {
 	// All OK, return a nil error.
 	return nil
 }
+
+func runCrawl(seeder *zcash.Seeder) {
+	log.Info("Beginning crawl")
+	start := time.Now()
+
+	// Slow motion crawl: we'll get them all eventually!
+
+	// 1. Make sure our addresses are still live and leave the
+	// connections open (true would disconnect immediately).
+	seeder.RefreshAddresses(false)
+
+	// 2. Request addresses from everyone we're connected to,
+	// synchronously. This will block a while in an attempt
+	// to catch all of the addr responses it can.
+	newPeerCount := seeder.RequestAddresses()
+
+	// 3. Disconnect from everyone & leave them alone for a while
+	seeder.DisconnectAllPeers()
+
+	elapsed := time.Now().Sub(start).Truncate(time.Second).Seconds()
+	log.Infof("Crawl complete, met %d new peers of %d in %.2f seconds", newPeerCount, seeder.GetPeerCount(), elapsed)
+}

View File

@@ -122,7 +122,7 @@ func (bk *AddressBook) Blacklist(s PeerKey) {
 	}
 }
 
-// Touch updates the last-seen timestamp if the peer is in the address book or does nothing if not.
+// Touch updates the last-seen timestamp if the peer is in the valid address book or does nothing if not.
 func (bk *AddressBook) Touch(s PeerKey) {
 	bk.addrState.Lock()
 	defer bk.addrState.Unlock()
@@ -132,13 +132,21 @@ func (bk *AddressBook) Touch(s PeerKey) {
 	}
 }
 
+func (bk *AddressBook) Count() int {
+	bk.addrState.RLock()
+	defer bk.addrState.RUnlock()
+	return len(bk.peers)
+}
+
 // IsKnown returns true if the peer is already in our address book, false if not.
 func (bk *AddressBook) IsKnown(s PeerKey) bool {
 	bk.addrState.RLock()
 	defer bk.addrState.RUnlock()
 
-	_, known := bk.peers[s]
-	return known
+	_, knownGood := bk.peers[s]
+	_, knownBad := bk.blacklist[s]
+	return knownGood || knownBad
 }
 
 func (bk *AddressBook) IsBlacklisted(s PeerKey) bool {
@@ -149,6 +157,18 @@ func (bk *AddressBook) IsBlacklisted(s PeerKey) bool {
 	return blacklisted
 }
 
+// enqueueAddrs sends all of our known valid peers to a channel for processing
+// and adds the count to a WaitGroup counter to aid processing.
+func (bk *AddressBook) enqueueAddrs(addrQueue chan *wire.NetAddress, count *sync.WaitGroup) {
+	bk.addrState.RLock()
+	defer bk.addrState.RUnlock()
+
+	count.Add(len(bk.peers))
+	for _, v := range bk.peers {
+		addrQueue <- v.netaddr
+	}
+}
+
 // WaitForAddresses waits for n addresses to be received and their initial
 // connection attempts to resolve. There is no escape if that does not happen -
 // this is intended for test runners or goroutines with a timeout.

View File

@@ -7,6 +7,7 @@ import (
 	"runtime"
 	"strconv"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/btcsuite/btcd/addrmgr"
@@ -34,7 +35,7 @@ var defaultPeerConfig = &peer.Config{
 	ProtocolVersion: 170009, // Blossom
 }
 
-const (
+var (
 	// The minimum number of addresses we need to know about to begin serving introductions
 	minimumReadyAddresses = 10
@@ -46,6 +47,9 @@ const (
 	// The amount of time crawler goroutines will wait for incoming addresses after a RequestAddresses()
 	crawlerThreadTimeout = 30 * time.Second
+
+	// The number of goroutines to spawn for a crawl request
+	crawlerGoroutineCount = runtime.NumCPU() * 16
 )
 
 // Seeder contains all of the state and configuration needed to request addresses from Zcash peers and present them to a DNS provider.
@@ -73,6 +77,8 @@ func NewSeeder(network network.Network) (*Seeder, error) {
 		return nil, errors.Wrap(err, "could not construct seeder")
 	}
 
+	// sink, _ := os.OpenFile(os.DevNull, os.O_WRONLY, 0666)
+	// logger := log.New(sink, "zcash_seeder: ", log.Ldate|log.Ltime|log.Lshortfile|log.LUTC)
 	logger := log.New(os.Stdout, "zcash_seeder: ", log.Ldate|log.Ltime|log.Lshortfile|log.LUTC)
 
 	newSeeder := Seeder{
@@ -82,7 +88,7 @@ func NewSeeder(network network.Network) (*Seeder, error) {
 		pendingPeers: NewPeerMap(),
 		livePeers:    NewPeerMap(),
 		addrBook:     NewAddressBook(),
-		addrQueue:    make(chan *wire.NetAddress, 100),
+		addrQueue:    make(chan *wire.NetAddress, 512),
 	}
 
 	newSeeder.config.Listeners.OnVerAck = newSeeder.onVerAck
@@ -111,7 +117,7 @@ func newTestSeeder(network network.Network) (*Seeder, error) {
 		pendingPeers: NewPeerMap(),
 		livePeers:    NewPeerMap(),
 		addrBook:     NewAddressBook(),
-		addrQueue:    make(chan *wire.NetAddress, 100),
+		addrQueue:    make(chan *wire.NetAddress, 512),
 	}
 
 	newSeeder.config.Listeners.OnVerAck = newSeeder.onVerAck
@@ -278,8 +284,12 @@ func (s *Seeder) DisconnectAllPeers() {
 // RequestAddresses sends a request for more addresses to every peer we're connected to,
 // then checks to make sure the addresses that come back are usable before adding them to
-// the address book.
-func (s *Seeder) RequestAddresses() {
+// the address book. The call attempts to block until all addresses have been processed,
+// but since we can't know how many that will be it eventually times out. Therefore,
+// while calling RequestAddresses synchronously is possible, it risks a major delay; most
+// users will be better served by giving this its own goroutine and using WaitForAddresses
+// with a timeout to pause only until a sufficient number of addresses are ready.
+func (s *Seeder) RequestAddresses() int {
 	s.livePeers.Range(func(key PeerKey, p *peer.Peer) bool {
 		s.logger.Printf("Requesting addresses from peer %s", p.Addr())
 		p.QueueMessage(wire.NewMsgGetAddr(), nil)
@@ -290,7 +300,12 @@ func (s *Seeder) RequestAddresses() {
 	// GetAddr messages to briefly live trial connections without meaning to. It's
 	// meant to be run on a timer that takes longer to fire than it takes to check addresses.
-	for i := 0; i < runtime.NumCPU()*4; i++ {
+	var peerCount int32
+	var wg sync.WaitGroup
+	wg.Add(crawlerGoroutineCount)
+
+	for i := 0; i < crawlerGoroutineCount; i++ {
 		go func() {
 			var na *wire.NetAddress
 			for {
@@ -300,7 +315,7 @@ func (s *Seeder) RequestAddresses() {
 					na = next
 				case <-time.After(crawlerThreadTimeout):
 					// Or die if there wasn't one
-					s.logger.Printf("Crawler thread timeout")
+					wg.Done()
 					return
 				}
@@ -345,10 +360,63 @@ func (s *Seeder) RequestAddresses() {
 				s.DisconnectPeer(potentialPeer)
 				s.logger.Printf("Successfully learned about %s:%d.", na.IP, na.Port)
+				atomic.AddInt32(&peerCount, 1)
 				s.addrBook.Add(potentialPeer)
 			}
 		}()
 	}
+
+	wg.Wait()
+	return int(peerCount)
+}
+
+// RefreshAddresses checks to make sure the addresses we think we know are
+// still usable and removes them from the address book if they aren't.
+// The call blocks until all addresses have been processed. If disconnect is
+// true, we immediately disconnect from the peers after verifying them.
+func (s *Seeder) RefreshAddresses(disconnect bool) {
+	refreshQueue := make(chan *wire.NetAddress, 100)
+	var count sync.WaitGroup
+
+	go s.addrBook.enqueueAddrs(refreshQueue, &count)
+
+	for i := 0; i < crawlerGoroutineCount; i++ {
+		go func() {
+			var na *wire.NetAddress
+			for {
+				select {
+				case next := <-refreshQueue:
+					// Pull the next address off the queue
+					na = next
+				case <-time.After(crawlerThreadTimeout):
+					// Or die if there wasn't one
+					return
+				}
+
+				peer := peerKeyFromNA(na)
+				portString := strconv.Itoa(int(na.Port))
+				err := s.Connect(na.IP.String(), portString)
+
+				if err != nil {
+					if err != ErrRepeatConnection {
+						// Blacklist the peer. TODO: We might try to connect again later.
+						s.logger.Printf("Peer %s:%d unusable on refresh. Error: %s", na.IP, na.Port, err)
+						s.addrBook.Blacklist(peer)
+					}
+					count.Done()
+					continue
+				}
+
+				if disconnect {
+					s.DisconnectPeer(peer)
+				}
+
+				count.Done()
+			}
+		}()
+	}
+
+	count.Wait()
 }
 
 // WaitForAddresses waits for n addresses to be confirmed and available in the address book.
@@ -378,6 +446,11 @@ func (s *Seeder) AddressesV6(n int) []net.IP {
 	return s.addrBook.shuffleAddressList(n, true)
 }
 
+// GetPeerCount returns how many valid peers we know about.
+func (s *Seeder) GetPeerCount() int {
+	return s.addrBook.Count()
+}
+
 // testBlacklist adds a peer to the blacklist directly, for testing.
 func (s *Seeder) testBlacklist(pk PeerKey) {
 	s.addrBook.Blacklist(pk)

View File

@@ -49,9 +49,14 @@ func (s *Seeder) onAddr(p *peer.Peer, msg *wire.MsgAddr) {
 	s.logger.Printf("Got %d addrs from peer %s", len(msg.AddrList), p.Addr())
 
+	//queue := make(chan *wire.NetAddress, len(msg.AddrList))
 	for _, na := range msg.AddrList {
+		// By checking if we know them before adding to the queue, we create
+		// the end condition for the crawler thread: it will time out after
+		// not processing any new addresses.
+		if s.addrBook.IsKnown(peerKeyFromNA(na)) {
+			s.logger.Printf("Already knew about %s:%d", na.IP, na.Port)
+			continue
+		}
 		s.addrQueue <- na
 	}
 }

View File

@@ -194,7 +194,7 @@ func TestRequestAddresses(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	regSeeder.RequestAddresses()
+	go regSeeder.RequestAddresses()
 	err = regSeeder.WaitForAddresses(1, 1*time.Second)
 	if err != nil {
@@ -219,7 +219,7 @@ func TestBlacklist(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	regSeeder.RequestAddresses()
+	go regSeeder.RequestAddresses()
 	err = regSeeder.WaitForAddresses(1, 1*time.Second)
 	if err != nil {