diff --git a/node/pkg/p2p/p2p.go b/node/pkg/p2p/p2p.go index dc5b7220f..efa749ec8 100644 --- a/node/pkg/p2p/p2p.go +++ b/node/pkg/p2p/p2p.go @@ -135,6 +135,47 @@ func DefaultConnectionManager() (*connmgr.BasicConnMgr, error) { ) } +// bootstrapAddrs takes a comma-separated string of multi-address strings and returns an array of []peer.AddrInfo that does not include `self`. +// if `self` is part of `bootstrapPeers`, return isBootstrapNode=true +func bootstrapAddrs(logger *zap.Logger, bootstrapPeers string, self peer.ID) (bootstrappers []peer.AddrInfo, isBootstrapNode bool) { + bootstrappers = make([]peer.AddrInfo, 0) + for _, addr := range strings.Split(bootstrapPeers, ",") { + if addr == "" { + continue + } + ma, err := multiaddr.NewMultiaddr(addr) + if err != nil { + logger.Error("Invalid bootstrap address", zap.String("peer", addr), zap.Error(err)) + continue + } + pi, err := peer.AddrInfoFromP2pAddr(ma) + if err != nil { + logger.Error("Invalid bootstrap address", zap.String("peer", addr), zap.Error(err)) + continue + } + if pi.ID == self { + logger.Info("We're a bootstrap node") + isBootstrapNode = true + continue + } + bootstrappers = append(bootstrappers, *pi) + } + return +} + +// connectToPeers connects `h` to `peers` and returns the number of successful connections. +func connectToPeers(ctx context.Context, logger *zap.Logger, h host.Host, peers []peer.AddrInfo) (successes int) { + successes = 0 + for _, p := range peers { + if err := h.Connect(ctx, p); err != nil { + logger.Error("Failed to connect to bootstrap peer", zap.String("peer", p.String()), zap.Error(err)) + } else { + successes += 1 + } + } + return successes +} + func Run( obsvC chan<- *gossipv1.SignedObservation, obsvReqC chan<- *gossipv1.ObservationRequest, @@ -189,27 +230,9 @@ func Run( // Let this host use the DHT to find other hosts libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { logger.Info("Connecting to bootstrap peers", zap.String("bootstrap_peers", bootstrapPeers)) - bootstrappers := make([]peer.AddrInfo, 0) - for _, addr := range strings.Split(bootstrapPeers, ",") { - if addr == "" { - continue - } - ma, err := multiaddr.NewMultiaddr(addr) - if err != nil { - logger.Error("Invalid bootstrap address", zap.String("peer", addr), zap.Error(err)) - continue - } - pi, err := peer.AddrInfoFromP2pAddr(ma) - if err != nil { - logger.Error("Invalid bootstrap address", zap.String("peer", addr), zap.Error(err)) - continue - } - if pi.ID == h.ID() { - logger.Info("We're a bootstrap node") - continue - } - bootstrappers = append(bootstrappers, *pi) - } + + bootstrappers, _ := bootstrapAddrs(logger, bootstrapPeers, h.ID()) + // TODO(leo): Persistent data store (i.e. address book) idht, err := dht.New(ctx, h, dht.Mode(dht.ModeServer), // This intentionally makes us incompatible with the global IPFS DHT @@ -258,42 +281,16 @@ func Run( // Make sure we connect to at least 1 bootstrap node (this is particularly important in a local devnet and CI // as peer discovery can take a long time). - // Count number of successful connection attempts. If we fail to connect to any bootstrap peer, kill the service - // TODO: Currently, returning from this function will lead to rootCtxCancel() being called in the defer() above. - // The service will then be restarted by Tilt/kubernetes - successes := 0 - // Are we a bootstrap node? If so, it's okay to not have any peers. - bootstrapNode := false + bootstrappers, bootstrapNode := bootstrapAddrs(logger, bootstrapPeers, h.ID()) + successes := connectToPeers(ctx, logger, h, bootstrappers) - for _, addr := range strings.Split(bootstrapPeers, ",") { - if addr == "" { - continue - } - ma, err := multiaddr.NewMultiaddr(addr) - if err != nil { - logger.Error("Invalid bootstrap address", zap.String("peer", addr), zap.Error(err)) - continue - } - pi, err := peer.AddrInfoFromP2pAddr(ma) - if err != nil { - logger.Error("Invalid bootstrap address", zap.String("peer", addr), zap.Error(err)) - continue - } - - if pi.ID == h.ID() { - logger.Info("We're a bootstrap node") - bootstrapNode = true - continue - } - - if err = h.Connect(ctx, *pi); err != nil { - logger.Error("Failed to connect to bootstrap peer", zap.String("peer", addr), zap.Error(err)) - } else { - successes += 1 - } + if bootstrapNode { + logger.Info("We are a bootstrap node.") } - if successes == 0 && !bootstrapNode { + if successes == 0 && !bootstrapNode { // If we're a bootstrap node it's okay to not have any peers. + // If we fail to connect to any bootstrap peer, kill the service + // returning from this function will lead to rootCtxCancel() being called in the defer() above. The service will then be restarted by Tilt/kubernetes. return fmt.Errorf("failed to connect to any bootstrap peer") } logger.Info("Connected to bootstrap peers", zap.Int("num", successes))