bridge: refactor p2p logic into pkg/p2p

ghstack-source-id: 86417c1301
Pull Request resolved: https://github.com/certusone/wormhole/pull/65
This commit is contained in:
Leo 2020-10-28 22:41:37 +01:00
parent fd27570637
commit 11c74dd692
2 changed files with 54 additions and 46 deletions

View File

@ -10,6 +10,7 @@ import (
"syscall" "syscall"
eth_common "github.com/ethereum/go-ethereum/common" eth_common "github.com/ethereum/go-ethereum/common"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
@ -17,6 +18,7 @@ import (
"github.com/certusone/wormhole/bridge/pkg/common" "github.com/certusone/wormhole/bridge/pkg/common"
"github.com/certusone/wormhole/bridge/pkg/devnet" "github.com/certusone/wormhole/bridge/pkg/devnet"
"github.com/certusone/wormhole/bridge/pkg/ethereum" "github.com/certusone/wormhole/bridge/pkg/ethereum"
"github.com/certusone/wormhole/bridge/pkg/p2p"
"github.com/certusone/wormhole/bridge/pkg/processor" "github.com/certusone/wormhole/bridge/pkg/processor"
gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1" gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
solana "github.com/certusone/wormhole/bridge/pkg/solana" solana "github.com/certusone/wormhole/bridge/pkg/solana"
@ -182,11 +184,27 @@ func main() {
// VAAs to submit to Solana // VAAs to submit to Solana
solanaVaaC := make(chan *vaa.VAA) solanaVaaC := make(chan *vaa.VAA)
// Load p2p private key
var priv crypto.PrivKey
if *unsafeDevMode {
idx, err := devnet.GetDevnetIndex()
if err != nil {
logger.Fatal("Failed to parse hostname - are we running in devnet?")
}
priv = devnet.DeterministicP2PPrivKeyByIndex(int64(idx))
} else {
priv, err = getOrCreateNodeKey(logger, *nodeKeyPath)
if err != nil {
logger.Fatal("Failed to load node key", zap.Error(err))
}
}
// Run supervisor. // Run supervisor.
supervisor.New(rootCtx, logger, func(ctx context.Context) error { supervisor.New(rootCtx, logger, func(ctx context.Context) error {
// TODO: use a dependency injection framework like wire? // TODO: use a dependency injection framework like wire?
if err := supervisor.Run(ctx, "p2p", p2p(obsvC, sendC)); err != nil { if err := supervisor.Run(ctx, "p2p", p2p.Run(
obsvC, sendC, priv, *p2pPort, *p2pNetworkID, *p2pBootstrap, *nodeName, rootCtxCancel)); err != nil {
return err return err
} }

View File

@ -1,13 +1,14 @@
package main package p2p
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
"strings" "strings"
"time" "time"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p"
connmgr "github.com/libp2p/go-libp2p-connmgr" connmgr "github.com/libp2p/go-libp2p-connmgr"
"github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/crypto"
@ -21,33 +22,22 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"github.com/certusone/wormhole/bridge/pkg/devnet"
gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1" gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/bridge/pkg/supervisor" "github.com/certusone/wormhole/bridge/pkg/supervisor"
) )
func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx context.Context) error { func Run(obsvC chan *gossipv1.LockupObservation,
sendC chan []byte,
priv crypto.PrivKey,
port uint,
networkID string,
bootstrapPeers string,
nodeName string,
rootCtxCancel context.CancelFunc) func(ctx context.Context) error {
return func(ctx context.Context) (re error) { return func(ctx context.Context) (re error) {
logger := supervisor.Logger(ctx) logger := supervisor.Logger(ctx)
var priv crypto.PrivKey
var err error
if *unsafeDevMode {
idx, err2 := devnet.GetDevnetIndex()
if err2 != nil {
logger.Fatal("Failed to parse hostname - are we running in devnet?")
}
priv = devnet.DeterministicP2PPrivKeyByIndex(int64(idx))
} else {
priv, err = getOrCreateNodeKey(logger, *nodeKeyPath)
if err != nil {
return fmt.Errorf("failed to load node key: %w", err)
}
}
var idht *dht.IpfsDHT
h, err := libp2p.New(ctx, h, err := libp2p.New(ctx,
// Use the keypair we generated // Use the keypair we generated
libp2p.Identity(priv), libp2p.Identity(priv),
@ -57,8 +47,8 @@ func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx con
// Listen on QUIC only. // Listen on QUIC only.
// TODO(leo): is this more or less stable than using both TCP and QUIC transports? // TODO(leo): is this more or less stable than using both TCP and QUIC transports?
// https://github.com/libp2p/go-libp2p/issues/688 // https://github.com/libp2p/go-libp2p/issues/688
fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic", *p2pPort), fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic", port),
fmt.Sprintf("/ip6/::/udp/%d/quic", *p2pPort), fmt.Sprintf("/ip6/::/udp/%d/quic", port),
), ),
// Enable TLS security as the only security protocol. // Enable TLS security as the only security protocol.
@ -78,9 +68,9 @@ func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx con
// Let this host use the DHT to find other hosts // Let this host use the DHT to find other hosts
libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
// TODO(leo): Persistent data store (i.e. address book) // TODO(leo): Persistent data store (i.e. address book)
idht, err = dht.New(ctx, h, dht.Mode(dht.ModeServer), idht, err := dht.New(ctx, h, dht.Mode(dht.ModeServer),
// TODO(leo): This intentionally makes us incompatible with the global IPFS DHT // TODO(leo): This intentionally makes us incompatible with the global IPFS DHT
dht.ProtocolPrefix(protocol.ID("/"+*p2pNetworkID)), dht.ProtocolPrefix(protocol.ID("/"+networkID)),
) )
return idht, err return idht, err
}), }),
@ -96,9 +86,9 @@ func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx con
rootCtxCancel() rootCtxCancel()
}() }()
logger.Info("Connecting to bootstrap peers", zap.String("bootstrap_peers", *p2pBootstrap)) logger.Info("Connecting to bootstrap peers", zap.String("bootstrap_peers", bootstrapPeers))
topic := fmt.Sprintf("%s/%s", *p2pNetworkID, "broadcast") topic := fmt.Sprintf("%s/%s", networkID, "broadcast")
logger.Info("Subscribing pubsub topic", zap.String("topic", topic)) logger.Info("Subscribing pubsub topic", zap.String("topic", topic))
ps, err := pubsub.NewGossipSub(ctx, h) ps, err := pubsub.NewGossipSub(ctx, h)
@ -122,9 +112,9 @@ func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx con
// the service and have supervisor retry it. // the service and have supervisor retry it.
successes := 0 successes := 0
// Are we a bootstrap node? If so, it's okay to not have any peers. // Are we a bootstrap node? If so, it's okay to not have any peers.
bootstrap_node := false bootstrapNode := false
for _, addr := range strings.Split(*p2pBootstrap, ",") { for _, addr := range strings.Split(bootstrapPeers, ",") {
if addr == "" { if addr == "" {
continue continue
} }
@ -141,7 +131,7 @@ func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx con
if pi.ID == h.ID() { if pi.ID == h.ID() {
logger.Info("We're a bootstrap node") logger.Info("We're a bootstrap node")
bootstrap_node = true bootstrapNode = true
continue continue
} }
@ -153,8 +143,8 @@ func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx con
} }
// TODO: continually reconnect to bootstrap nodes? // TODO: continually reconnect to bootstrap nodes?
if successes == 0 && !bootstrap_node { if successes == 0 && !bootstrapNode {
return fmt.Errorf("Failed to connect to any bootstrap peer") return fmt.Errorf("failed to connect to any bootstrap peer")
} else { } else {
logger.Info("Connected to bootstrap peers", zap.Int("num", successes)) logger.Info("Connected to bootstrap peers", zap.Int("num", successes))
} }
@ -174,7 +164,7 @@ func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx con
case <-tick.C: case <-tick.C:
msg := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_Heartbeat{ msg := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_Heartbeat{
Heartbeat: &gossipv1.Heartbeat{ Heartbeat: &gossipv1.Heartbeat{
NodeName: *nodeName, NodeName: nodeName,
Counter: ctr, Counter: ctr,
Timestamp: time.Now().UnixNano(), Timestamp: time.Now().UnixNano(),
}}} }}}
@ -207,21 +197,21 @@ func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx con
}() }()
for { for {
envl, err := sub.Next(ctx) envelope, err := sub.Next(ctx)
if err != nil { if err != nil {
return fmt.Errorf("failed to receive pubsub message: %w", err) return fmt.Errorf("failed to receive pubsub message: %w", err)
} }
var msg gossipv1.GossipMessage var msg gossipv1.GossipMessage
err = proto.Unmarshal(envl.Data, &msg) err = proto.Unmarshal(envelope.Data, &msg)
if err != nil { if err != nil {
logger.Info("received invalid message", logger.Info("received invalid message",
zap.String("data", string(envl.Data)), zap.String("data", string(envelope.Data)),
zap.String("from", envl.GetFrom().String())) zap.String("from", envelope.GetFrom().String()))
continue continue
} }
if envl.GetFrom() == h.ID() { if envelope.GetFrom() == h.ID() {
logger.Debug("received message from ourselves, ignoring", logger.Debug("received message from ourselves, ignoring",
zap.Any("payload", msg.Message)) zap.Any("payload", msg.Message))
continue continue
@ -229,21 +219,21 @@ func p2p(obsvC chan *gossipv1.LockupObservation, sendC chan []byte) func(ctx con
logger.Debug("received message", logger.Debug("received message",
zap.Any("payload", msg.Message), zap.Any("payload", msg.Message),
zap.Binary("raw", envl.Data), zap.Binary("raw", envelope.Data),
zap.String("from", envl.GetFrom().String())) zap.String("from", envelope.GetFrom().String()))
switch m := msg.Message.(type) { switch m := msg.Message.(type) {
case *gossipv1.GossipMessage_Heartbeat: case *gossipv1.GossipMessage_Heartbeat:
logger.Info("heartbeat received", logger.Info("heartbeat received",
zap.Any("value", m.Heartbeat), zap.Any("value", m.Heartbeat),
zap.String("from", envl.GetFrom().String())) zap.String("from", envelope.GetFrom().String()))
case *gossipv1.GossipMessage_LockupObservation: case *gossipv1.GossipMessage_LockupObservation:
obsvC <- m.LockupObservation obsvC <- m.LockupObservation
default: default:
logger.Warn("received unknown message type (running outdated software?)", logger.Warn("received unknown message type (running outdated software?)",
zap.Any("payload", msg.Message), zap.Any("payload", msg.Message),
zap.Binary("raw", envl.Data), zap.Binary("raw", envelope.Data),
zap.String("from", envl.GetFrom().String())) zap.String("from", envelope.GetFrom().String()))
} }
} }
} }