node/p2p: Cleanup bootstrappers generation

This commit is contained in:
tbjump 2023-07-14 16:32:54 +00:00 committed by tbjump
parent c8fca0f5b9
commit 81b0c2a335
1 changed files with 51 additions and 54 deletions

View File

@ -135,6 +135,47 @@ func DefaultConnectionManager() (*connmgr.BasicConnMgr, error) {
)
}
// bootstrapAddrs takes a comma-separated string of multi-address strings and returns an array of []peer.AddrInfo that does not include `self`.
// if `self` is part of `bootstrapPeers`, return isBootstrapNode=true
func bootstrapAddrs(logger *zap.Logger, bootstrapPeers string, self peer.ID) (bootstrappers []peer.AddrInfo, isBootstrapNode bool) {
bootstrappers = make([]peer.AddrInfo, 0)
for _, addr := range strings.Split(bootstrapPeers, ",") {
if addr == "" {
continue
}
ma, err := multiaddr.NewMultiaddr(addr)
if err != nil {
logger.Error("Invalid bootstrap address", zap.String("peer", addr), zap.Error(err))
continue
}
pi, err := peer.AddrInfoFromP2pAddr(ma)
if err != nil {
logger.Error("Invalid bootstrap address", zap.String("peer", addr), zap.Error(err))
continue
}
if pi.ID == self {
logger.Info("We're a bootstrap node")
isBootstrapNode = true
continue
}
bootstrappers = append(bootstrappers, *pi)
}
return
}
// connectToPeers connects `h` to `peers` and returns the number of successful connections.
func connectToPeers(ctx context.Context, logger *zap.Logger, h host.Host, peers []peer.AddrInfo) (successes int) {
successes = 0
for _, p := range peers {
if err := h.Connect(ctx, p); err != nil {
logger.Error("Failed to connect to bootstrap peer", zap.String("peer", p.String()), zap.Error(err))
} else {
successes += 1
}
}
return successes
}
func Run(
obsvC chan<- *gossipv1.SignedObservation,
obsvReqC chan<- *gossipv1.ObservationRequest,
@ -189,27 +230,9 @@ func Run(
// Let this host use the DHT to find other hosts
libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
logger.Info("Connecting to bootstrap peers", zap.String("bootstrap_peers", bootstrapPeers))
bootstrappers := make([]peer.AddrInfo, 0)
for _, addr := range strings.Split(bootstrapPeers, ",") {
if addr == "" {
continue
}
ma, err := multiaddr.NewMultiaddr(addr)
if err != nil {
logger.Error("Invalid bootstrap address", zap.String("peer", addr), zap.Error(err))
continue
}
pi, err := peer.AddrInfoFromP2pAddr(ma)
if err != nil {
logger.Error("Invalid bootstrap address", zap.String("peer", addr), zap.Error(err))
continue
}
if pi.ID == h.ID() {
logger.Info("We're a bootstrap node")
continue
}
bootstrappers = append(bootstrappers, *pi)
}
bootstrappers, _ := bootstrapAddrs(logger, bootstrapPeers, h.ID())
// TODO(leo): Persistent data store (i.e. address book)
idht, err := dht.New(ctx, h, dht.Mode(dht.ModeServer),
// This intentionally makes us incompatible with the global IPFS DHT
@ -258,42 +281,16 @@ func Run(
// Make sure we connect to at least 1 bootstrap node (this is particularly important in a local devnet and CI
// as peer discovery can take a long time).
// Count number of successful connection attempts. If we fail to connect to any bootstrap peer, kill the service
// TODO: Currently, returning from this function will lead to rootCtxCancel() being called in the defer() above.
// The service will then be restarted by Tilt/kubernetes
successes := 0
// Are we a bootstrap node? If so, it's okay to not have any peers.
bootstrapNode := false
bootstrappers, bootstrapNode := bootstrapAddrs(logger, bootstrapPeers, h.ID())
successes := connectToPeers(ctx, logger, h, bootstrappers)
for _, addr := range strings.Split(bootstrapPeers, ",") {
if addr == "" {
continue
}
ma, err := multiaddr.NewMultiaddr(addr)
if err != nil {
logger.Error("Invalid bootstrap address", zap.String("peer", addr), zap.Error(err))
continue
}
pi, err := peer.AddrInfoFromP2pAddr(ma)
if err != nil {
logger.Error("Invalid bootstrap address", zap.String("peer", addr), zap.Error(err))
continue
}
if pi.ID == h.ID() {
logger.Info("We're a bootstrap node")
bootstrapNode = true
continue
}
if err = h.Connect(ctx, *pi); err != nil {
logger.Error("Failed to connect to bootstrap peer", zap.String("peer", addr), zap.Error(err))
} else {
successes += 1
}
if bootstrapNode {
logger.Info("We are a bootstrap node.")
}
if successes == 0 && !bootstrapNode {
if successes == 0 && !bootstrapNode { // If we're a bootstrap node it's okay to not have any peers.
// If we fail to connect to any bootstrap peer, kill the service
// returning from this function will lead to rootCtxCancel() being called in the defer() above. The service will then be restarted by Tilt/kubernetes.
return fmt.Errorf("failed to connect to any bootstrap peer")
}
logger.Info("Connected to bootstrap peers", zap.Int("num", successes))