From 11c74dd692c176d4400eb12e3a91fd734d91bb8d Mon Sep 17 00:00:00 2001 From: Leo Date: Wed, 28 Oct 2020 22:41:37 +0100 Subject: [PATCH] bridge: refactor p2p logic into pkg/p2p ghstack-source-id: 86417c130111ef8e7ec9f4bb5bb97729adf23cbf Pull Request resolved: https://github.com/certusone/wormhole/pull/65 --- bridge/cmd/guardiand/main.go | 20 +++++- bridge/{cmd/guardiand => pkg/p2p}/p2p.go | 80 +++++++++++------------- 2 files changed, 54 insertions(+), 46 deletions(-) rename bridge/{cmd/guardiand => pkg/p2p}/p2p.go (77%) diff --git a/bridge/cmd/guardiand/main.go b/bridge/cmd/guardiand/main.go index c18a1e313..c9894d981 100644 --- a/bridge/cmd/guardiand/main.go +++ b/bridge/cmd/guardiand/main.go @@ -10,6 +10,7 @@ import ( "syscall" eth_common "github.com/ethereum/go-ethereum/common" + "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/peer" "go.uber.org/zap" "golang.org/x/sys/unix" @@ -17,6 +18,7 @@ import ( "github.com/certusone/wormhole/bridge/pkg/common" "github.com/certusone/wormhole/bridge/pkg/devnet" "github.com/certusone/wormhole/bridge/pkg/ethereum" + "github.com/certusone/wormhole/bridge/pkg/p2p" "github.com/certusone/wormhole/bridge/pkg/processor" gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1" solana "github.com/certusone/wormhole/bridge/pkg/solana" @@ -182,11 +184,27 @@ func main() { // VAAs to submit to Solana solanaVaaC := make(chan *vaa.VAA) + // Load p2p private key + var priv crypto.PrivKey + if *unsafeDevMode { + idx, err := devnet.GetDevnetIndex() + if err != nil { + logger.Fatal("Failed to parse hostname - are we running in devnet?") + } + priv = devnet.DeterministicP2PPrivKeyByIndex(int64(idx)) + } else { + priv, err = getOrCreateNodeKey(logger, *nodeKeyPath) + if err != nil { + logger.Fatal("Failed to load node key", zap.Error(err)) + } + } + // Run supervisor. supervisor.New(rootCtx, logger, func(ctx context.Context) error { // TODO: use a dependency injection framework like wire? - if err := supervisor.Run(ctx, "p2p", p2p(obsvC, sendC)); err != nil { + if err := supervisor.Run(ctx, "p2p", p2p.Run( + obsvC, sendC, priv, *p2pPort, *p2pNetworkID, *p2pBootstrap, *nodeName, rootCtxCancel)); err != nil { return err } diff --git a/bridge/cmd/guardiand/p2p.go b/bridge/pkg/p2p/p2p.go similarity index 77% rename from bridge/cmd/guardiand/p2p.go rename to bridge/pkg/p2p/p2p.go index c5f8c7d1d..854de7ee9 100644 --- a/bridge/cmd/guardiand/p2p.go +++ b/bridge/pkg/p2p/p2p.go @@ -1,13 +1,14 @@ -package main +package p2p import ( "context" "fmt" - "github.com/libp2p/go-libp2p-core/peer" - "github.com/multiformats/go-multiaddr" "strings" "time" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/multiformats/go-multiaddr" + "github.com/libp2p/go-libp2p" connmgr "github.com/libp2p/go-libp2p-connmgr" "github.com/libp2p/go-libp2p-core/crypto" @@ -21,33 +22,22 @@ import ( "go.uber.org/zap" "google.golang.org/protobuf/proto" - "github.com/certusone/wormhole/bridge/pkg/devnet" gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1" "github.com/certusone/wormhole/bridge/pkg/supervisor" ) -func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx context.Context) error { +func Run(obsvC chan *gossipv1.LockupObservation, + sendC chan []byte, + priv crypto.PrivKey, + port uint, + networkID string, + bootstrapPeers string, + nodeName string, + rootCtxCancel context.CancelFunc) func(ctx context.Context) error { + return func(ctx context.Context) (re error) { logger := supervisor.Logger(ctx) - var priv crypto.PrivKey - var err error - - if *unsafeDevMode { - idx, err2 := devnet.GetDevnetIndex() - if err2 != nil { - logger.Fatal("Failed to parse hostname - are we running in devnet?") - } - priv = devnet.DeterministicP2PPrivKeyByIndex(int64(idx)) - } else { - priv, err = getOrCreateNodeKey(logger, *nodeKeyPath) - if err != nil { - return fmt.Errorf("failed to load node key: %w", err) - } - } - - var idht *dht.IpfsDHT - h, err := libp2p.New(ctx, // Use the keypair we generated libp2p.Identity(priv), @@ -57,8 +47,8 @@ func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx con // Listen on QUIC only. // TODO(leo): is this more or less stable than using both TCP and QUIC transports? // https://github.com/libp2p/go-libp2p/issues/688 - fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic", *p2pPort), - fmt.Sprintf("/ip6/::/udp/%d/quic", *p2pPort), + fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic", port), + fmt.Sprintf("/ip6/::/udp/%d/quic", port), ), // Enable TLS security as the only security protocol. @@ -78,9 +68,9 @@ func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx con // Let this host use the DHT to find other hosts libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { // TODO(leo): Persistent data store (i.e. address book) - idht, err = dht.New(ctx, h, dht.Mode(dht.ModeServer), + idht, err := dht.New(ctx, h, dht.Mode(dht.ModeServer), // TODO(leo): This intentionally makes us incompatible with the global IPFS DHT - dht.ProtocolPrefix(protocol.ID("/"+*p2pNetworkID)), + dht.ProtocolPrefix(protocol.ID("/"+networkID)), ) return idht, err }), @@ -96,9 +86,9 @@ func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx con rootCtxCancel() }() - logger.Info("Connecting to bootstrap peers", zap.String("bootstrap_peers", *p2pBootstrap)) + logger.Info("Connecting to bootstrap peers", zap.String("bootstrap_peers", bootstrapPeers)) - topic := fmt.Sprintf("%s/%s", *p2pNetworkID, "broadcast") + topic := fmt.Sprintf("%s/%s", networkID, "broadcast") logger.Info("Subscribing pubsub topic", zap.String("topic", topic)) ps, err := pubsub.NewGossipSub(ctx, h) @@ -122,9 +112,9 @@ func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx con // the service and have supervisor retry it. successes := 0 // Are we a bootstrap node? If so, it's okay to not have any peers. - bootstrap_node := false + bootstrapNode := false - for _, addr := range strings.Split(*p2pBootstrap, ",") { + for _, addr := range strings.Split(bootstrapPeers, ",") { if addr == "" { continue } @@ -141,7 +131,7 @@ func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx con if pi.ID == h.ID() { logger.Info("We're a bootstrap node") - bootstrap_node = true + bootstrapNode = true continue } @@ -153,8 +143,8 @@ func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx con } // TODO: continually reconnect to bootstrap nodes? - if successes == 0 && !bootstrap_node { - return fmt.Errorf("Failed to connect to any bootstrap peer") + if successes == 0 && !bootstrapNode { + return fmt.Errorf("failed to connect to any bootstrap peer") } else { logger.Info("Connected to bootstrap peers", zap.Int("num", successes)) } @@ -174,7 +164,7 @@ func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx con case <-tick.C: msg := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_Heartbeat{ Heartbeat: &gossipv1.Heartbeat{ - NodeName: *nodeName, + NodeName: nodeName, Counter: ctr, Timestamp: time.Now().UnixNano(), }}} @@ -207,21 +197,21 @@ func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx con }() for { - envl, err := sub.Next(ctx) + envelope, err := sub.Next(ctx) if err != nil { return fmt.Errorf("failed to receive pubsub message: %w", err) } var msg gossipv1.GossipMessage - err = proto.Unmarshal(envl.Data, &msg) + err = proto.Unmarshal(envelope.Data, &msg) if err != nil { logger.Info("received invalid message", - zap.String("data", string(envl.Data)), - zap.String("from", envl.GetFrom().String())) + zap.String("data", string(envelope.Data)), + zap.String("from", envelope.GetFrom().String())) continue } - if envl.GetFrom() == h.ID() { + if envelope.GetFrom() == h.ID() { logger.Debug("received message from ourselves, ignoring", zap.Any("payload", msg.Message)) continue @@ -229,21 +219,21 @@ func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx con logger.Debug("received message", zap.Any("payload", msg.Message), - zap.Binary("raw", envl.Data), - zap.String("from", envl.GetFrom().String())) + zap.Binary("raw", envelope.Data), + zap.String("from", envelope.GetFrom().String())) switch m := msg.Message.(type) { case *gossipv1.GossipMessage_Heartbeat: logger.Info("heartbeat received", zap.Any("value", m.Heartbeat), - zap.String("from", envl.GetFrom().String())) + zap.String("from", envelope.GetFrom().String())) case *gossipv1.GossipMessage_LockupObservation: obsvC <- m.LockupObservation default: logger.Warn("received unknown message type (running outdated software?)", zap.Any("payload", msg.Message), - zap.Binary("raw", envl.Data), - zap.String("from", envl.GetFrom().String())) + zap.Binary("raw", envelope.Data), + zap.String("from", envelope.GetFrom().String())) } } }