zcash: move goroutine spawn to caller rather than callback

This commit is contained in:
George Tankersley 2020-05-20 19:10:55 -04:00
parent e22a55c194
commit f1e7d75e28
2 changed files with 76 additions and 68 deletions

View File

@ -4,9 +4,12 @@ import (
"log"
"net"
"os"
"runtime"
"strconv"
"sync"
"time"
"github.com/btcsuite/btcd/addrmgr"
"github.com/btcsuite/btcd/peer"
"github.com/btcsuite/btcd/wire"
@ -51,6 +54,9 @@ type Seeder struct {
// The set of known addresses
addrBook *AddressBook
// The queue of incoming potential addresses
addrQueue chan *wire.NetAddress
}
func NewSeeder(network network.Network) (*Seeder, error) {
@ -68,6 +74,7 @@ func NewSeeder(network network.Network) (*Seeder, error) {
pendingPeers: NewPeerMap(),
livePeers: NewPeerMap(),
addrBook: NewAddressBook(),
addrQueue: make(chan *wire.NetAddress),
}
newSeeder.config.Listeners.OnVerAck = newSeeder.onVerAck
@ -96,6 +103,7 @@ func newTestSeeder(network network.Network) (*Seeder, error) {
pendingPeers: NewPeerMap(),
livePeers: NewPeerMap(),
addrBook: NewAddressBook(),
addrQueue: make(chan *wire.NetAddress),
}
newSeeder.config.Listeners.OnVerAck = newSeeder.onVerAck
@ -260,13 +268,78 @@ func (s *Seeder) DisconnectAllPeers() {
})
}
// RequestAddresses sends a request for more addresses to every peer we're connected to.
// RequestAddresses sends a request for more addresses to every peer we're connected to,
// then checks to make sure the addresses that come back are usable before adding them to
// the address book.
func (s *Seeder) RequestAddresses() {
s.livePeers.Range(func(key PeerKey, p *peer.Peer) bool {
s.logger.Printf("Requesting addresses from peer %s", p.Addr())
p.QueueMessage(wire.NewMsgGetAddr(), nil)
return true
})
// There's a sync concern: if this is called repeatedly you could end up broadcasting
// GetAddr messages to briefly live trial connections without meaning to. It's
// meant to be run on a timer that takes longer to fire than it takes to check addresses.
for i := 0; i < runtime.NumCPU()*2; i++ {
go func() {
var na *wire.NetAddress
for {
select {
case next := <-s.addrQueue:
// Pull the next address off the queue
na = next
case <-time.After(1 * time.Second):
// Or die if there wasn't one
return
}
// Note that AllowSelfConns is only exposed in a fork of btcd
// pending https://github.com/btcsuite/btcd/pull/1481, which
// is why the module `replace`s btcd.
if !addrmgr.IsRoutable(na) && !s.config.AllowSelfConns {
s.logger.Printf("Got bad addr %s:%d from peer %s", na.IP, na.Port, "<placeholder>")
// TODO blacklist peers who give us crap addresses
//s.DisconnectAndBlacklist(peerKeyFromPeer(p))
continue
}
potentialPeer := peerKeyFromNA(na)
if s.addrBook.IsKnown(potentialPeer) {
s.logger.Printf("Already knew about %s:%d", na.IP, na.Port)
continue
}
if s.addrBook.IsBlacklisted(potentialPeer) {
s.logger.Printf("Previously blacklisted %s:%d", na.IP, na.Port)
continue
}
portString := strconv.Itoa(int(na.Port))
err := s.Connect(na.IP.String(), portString)
if err != nil {
if err == ErrRepeatConnection {
//s.logger.Printf("Got duplicate peer %s:%d.", na.IP, na.Port)
continue
}
// Blacklist the potential peer. We might try to connect again later,
// since we assume IsRoutable filtered out the truly wrong ones.
s.logger.Printf("Got unusable peer %s:%d. Error: %s", na.IP, na.Port, err)
s.addrBook.Blacklist(potentialPeer)
continue
}
s.DisconnectPeer(potentialPeer)
s.logger.Printf("Successfully learned about %s:%d.", na.IP, na.Port)
s.addrBook.Add(potentialPeer)
}
}()
}
}
// WaitForAddresses waits for n addresses to be confirmed and available in the address book.
@ -285,5 +358,3 @@ func (s *Seeder) WaitForAddresses(n int, timeout time.Duration) error {
func (s *Seeder) Ready() bool {
return s.WaitForAddresses(minimumReadyAddresses, 1*time.Millisecond) == nil
}
//func (s *Seeder) Addresses(count int) []

View File

@ -1,11 +1,6 @@
package zcash
import (
"runtime"
"strconv"
"time"
"github.com/btcsuite/btcd/addrmgr"
"github.com/btcsuite/btcd/peer"
"github.com/btcsuite/btcd/wire"
)
@ -54,67 +49,9 @@ func (s *Seeder) onAddr(p *peer.Peer, msg *wire.MsgAddr) {
s.logger.Printf("Got %d addrs from peer %s", len(msg.AddrList), p.Addr())
queue := make(chan *wire.NetAddress, len(msg.AddrList))
//queue := make(chan *wire.NetAddress, len(msg.AddrList))
for _, na := range msg.AddrList {
queue <- na
}
for i := 0; i < runtime.NumCPU()*2; i++ {
go func() {
var na *wire.NetAddress
for {
select {
case next := <-queue:
// Pull the next address off the queue
na = next
case <-time.After(1 * time.Second):
// Or die if there wasn't one
return
}
// Note that AllowSelfConns is only exposed in a fork of btcd
// pending https://github.com/btcsuite/btcd/pull/1481, which
// is why the module `replace`s btcd.
if !addrmgr.IsRoutable(na) && !s.config.AllowSelfConns {
s.logger.Printf("Got bad addr %s:%d from peer %s", na.IP, na.Port, p.Addr())
s.DisconnectAndBlacklist(peerKeyFromPeer(p))
continue
}
potentialPeer := peerKeyFromNA(na)
if s.addrBook.IsKnown(potentialPeer) {
s.logger.Printf("Already knew about %s:%d", na.IP, na.Port)
continue
}
if s.addrBook.IsBlacklisted(potentialPeer) {
s.logger.Printf("Previously blacklisted %s:%d", na.IP, na.Port)
continue
}
portString := strconv.Itoa(int(na.Port))
err := s.Connect(na.IP.String(), portString)
if err != nil {
if err == ErrRepeatConnection {
s.logger.Printf("Got duplicate peer %s:%d from peer %s. Error: %s", na.IP, na.Port, p.Addr(), err)
continue
}
// Blacklist the potential peer. We might try to connect again later,
// since we assume IsRoutable filtered out the truly wrong ones.
s.logger.Printf("Got unusable peer %s:%d from peer %s. Error: %s", na.IP, na.Port, p.Addr(), err)
s.addrBook.Blacklist(potentialPeer)
continue
}
s.DisconnectPeer(potentialPeer)
s.logger.Printf("Successfully learned about %s:%d from %s.", na.IP, na.Port, p.Addr())
s.addrBook.Add(potentialPeer)
}
}()
s.addrQueue <- na
}
}