
// Listing metadata: 366 lines, 9.9 KiB

package main
import (
	"context"
	"encoding/base64"
	"flag"
	"fmt"
	"io/ioutil"
	"os"
	"strings"
	"time"

	gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
	eth_common "github.com/ethereum/go-ethereum/common"
	ipfslog "github.com/ipfs/go-log/v2"
	connmgr "github.com/libp2p/go-libp2p-connmgr"
	dht "github.com/libp2p/go-libp2p-kad-dht"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
	libp2pquic "github.com/libp2p/go-libp2p-quic-transport"
	swarm "github.com/libp2p/go-libp2p-swarm"
	libp2ptls "github.com/libp2p/go-libp2p-tls"

	// NOTE(review): the code below also references the packages common, crypto,
	// ethereum, host, libp2p, multiaddr, peer, proto, routing, supervisor and
	// zap. Their import lines are not visible in this listing and must be
	// restored from the original file.
)
// Command-line flags. They are registered at package initialisation and only
// carry their final values once flag.Parse has been called.
var (
	// Peer-to-peer networking.
	p2pNetworkID = flag.String("network", "/wormhole/dev", "P2P network identifier")
	p2pPort      = flag.Uint("port", 8999, "P2P UDP listener port")
	p2pBootstrap = flag.String("bootstrap", "", "P2P bootstrap peers (comma-separated)")
	nodeKeyPath  = flag.String("nodeKey", "", "Path to node key (will be generated if it doesn't exist)")

	// Ethereum bridge watcher.
	ethRPC           = flag.String("ethRPC", "", "Ethereum RPC URL")
	ethContract      = flag.String("ethContract", "", "Ethereum bridge contract address")
	ethConfirmations = flag.Uint64("ethConfirmations", 5, "Ethereum confirmation count requirement")

	// Logging.
	logLevel = flag.String("loglevel", "info", "Logging level (debug, info, warn, error, dpanic, panic, fatal)")
)
func main() {
// Set up logging. The go-log zap wrapper that libp2p uses is compatible with our
// usage of zap in supervisor, which is nice.
lvl, err := ipfslog.LevelFromString(*logLevel)
if err != nil {
fmt.Println("Invalid log level")
// FIXME: add hostname to root logger for cleaner console output in multi-node development.
// The proper way is to change the output format to include the hostname.
hostname, err := os.Hostname()
if err != nil {
// Our root logger.
logger := ipfslog.Logger(fmt.Sprintf("%s-%s", "wormhole", hostname))
// Override the default go-log config, which uses a magic environment variable.
// Mute chatty subsystems.
ipfslog.SetLogLevel("swarm2", "error") // connection errors
// Verify flags
if *nodeKeyPath == "" {
logger.Fatal("Please specify -nodeKey")
if *ethRPC == "" {
logger.Fatal("Please specify -ethRPC")
ethContractAddr := eth_common.HexToAddress(*ethContract)
// Node's main lifecycle context.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Ethereum lock event channel
ec := make(chan *common.ChainLock)
// Run supervisor.
supervisor.New(ctx, logger.Desugar(), func(ctx context.Context) error {
if err := supervisor.Run(ctx, "p2p", p2p); err != nil {
return err
watcher := ethereum.NewEthBridgeWatcher(
*ethRPC, ethContractAddr, *ethConfirmations, ec)
if err := supervisor.Run(ctx, "eth", watcher.Run); err != nil {
return err
supervisor.Signal(ctx, supervisor.SignalHealthy)
logger.Info("Created services")
select {}
}, supervisor.WithPropagatePanic)
// TODO(leo): only propagate panics in debug mode. We currently need this to properly reset p2p
// (it leaks its socket and we need to restart the process to fix it)
select {}
func getOrCreateNodeKey(logger *zap.Logger, path string) (crypto.PrivKey, error) {
b, err := ioutil.ReadFile(path)
if err != nil {
if os.IsNotExist(err) {
logger.Info("No node key found, generating a new one...", zap.String("path", path))
// TODO(leo): what does -1 mean?
priv, _, err := crypto.GenerateKeyPair(crypto.Ed25519, -1)
if err != nil {
s, err := crypto.MarshalPrivateKey(priv)
if err != nil {
err = ioutil.WriteFile(path, s, 0600)
if err != nil {
return nil, fmt.Errorf("failed to write node key: %w", err)
return priv, nil
} else {
return nil, fmt.Errorf("failed to read node key: %w", err)
priv, err := crypto.UnmarshalPrivateKey(b)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal node key: %w", err)
logger.Info("Found existing node key", zap.String("path", path))
return priv, nil
// FIXME: this hardcodes the private key if we're guardian-0.
// Proper fix is to add a debug mode and fetch the remote peer ID,
// or add a special bootstrap pod.
func bootstrapNodePrivateKeyHack() crypto.PrivKey {
hostname, err := os.Hostname()
if err != nil {
if hostname == "guardian-0" {
// node ID: 12D3KooWQ1sV2kowPY1iJX1hJcVTysZjKv3sfULTGwhdpUGGZ1VF
b, err := base64.StdEncoding.DecodeString("CAESQGlv6OJOMXrZZVTCC0cgCv7goXr6QaSVMZIndOIXKNh80vYnG+EutVlZK20Nx9cLkUG5ymKB\n88LXi/vPBwP8zfY=")
if err != nil {
priv, err := crypto.UnmarshalPrivateKey(b)
if err != nil {
return priv
return nil
func p2p(ctx context.Context) error {
logger := supervisor.Logger(ctx)
priv := bootstrapNodePrivateKeyHack()
var err error
if priv == nil {
priv, err = getOrCreateNodeKey(logger, *nodeKeyPath)
if err != nil {
} else {
logger.Info("HACK: loaded hardcoded guardian-0 node key")
var idht *dht.IpfsDHT
h, err := libp2p.New(ctx,
// Use the keypair we generated
// Multiple listen addresses
// Listen on QUIC only.
// TODO(leo): listen on ipv6
// TODO(leo): is this more or less stable than using both TCP and QUIC transports?
// https://github.com/libp2p/go-libp2p/issues/688
fmt.Sprintf("/ip4/", *p2pPort),
// Enable TLS security only.
libp2p.Security(libp2ptls.ID, libp2ptls.New),
// Enable QUIC transports.
// Enable TCP so we can connect to bootstrap nodes.
// (can be disabled if we bootstrap our own network)
// Let's prevent our peer from having too many
// connections by attaching a connection manager.
100, // Lowwater
400, // HighWater,
time.Minute, // GracePeriod
// Let this host use the DHT to find other hosts
libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
// TODO(leo): Persistent data store (i.e. address book)
idht, err = dht.New(ctx, h, dht.Mode(dht.ModeServer),
// TODO(leo): This intentionally makes us incompatible with the global IPFS DHT
return idht, err
defer func() {
fmt.Printf("h is %+v", h)
// FIXME: why can this be nil? We need to close the host to free the socket because apparently,
// closing the context is not enough, but sometimes h is nil when the function runs.
if h != nil {
if err != nil {
logger.Info("Connecting to bootstrap peers")
// TODO(leo): use our own bootstrap peers rather than the IPFS ones so we have a dedicated network
//for _, addr := range dht.DefaultBootstrapPeers {
// pi, _ := peer.AddrInfoFromP2pAddr(addr)
// // We ignore errors as some bootstrap peers may be down and that is fine.
// _ = h.Connect(ctx, *pi)
// Add our own bootstrap nodes
// Count number of successful connection attempts. If we fail to connect to every bootstrap peer, kill
// the service and have supervisor retry it.
successes := 0
for _, addr := range strings.Split(*p2pBootstrap, ",") {
if addr == "" {
ma, err := multiaddr.NewMultiaddr(addr)
if err != nil {
logger.Error("Invalid bootstrap address", zap.String("peer", addr), zap.Error(err))
pi, err := peer.AddrInfoFromP2pAddr(ma)
if err != nil {
logger.Error("Invalid bootstrap address", zap.String("peer", addr), zap.Error(err))
if err = h.Connect(ctx, *pi); err != nil {
if err != swarm.ErrDialToSelf {
logger.Error("Failed to connect to bootstrap peer", zap.String("peer", addr), zap.Error(err))
} else {
// Dialing self, carrying on... (we're a bootstrap peer)
logger.Info("Tried to connect to ourselves - we're a bootstrap peer")
successes += 1
} else {
successes += 1
if successes == 0 {
return fmt.Errorf("Failed to connect to any bootstrap peer")
} else {
logger.Info("Connected to bootstrap peers", zap.Int("num", successes))
// TODO(leo): crash if we couldn't connect to any bootstrap peers?
// (i.e. can we get stuck here if the other nodes have yet to come up?)
topic := fmt.Sprintf("%s/%s", *p2pNetworkID, "broadcast")
logger.Info("Subscribing pubsub topic", zap.String("topic", topic))
ps, err := pubsub.NewGossipSub(ctx, h)
if err != nil {
th, err := ps.Join(topic)
if err != nil {
return fmt.Errorf("failed to join topic: %w", err)
sub, err := th.Subscribe()
if err != nil {
return fmt.Errorf("failed to subscribe topic: %w", err)
logger.Info("Node has been started", zap.String("peer_id", h.ID().String()),
zap.String("addrs", fmt.Sprintf("%v", h.Addrs())))
hostname, err := os.Hostname()
if err != nil {
go func() {
ctr := int64(0)
for {
msg := gossipv1.Heartbeat{
Hostname: hostname,
Index: ctr,
b, err := proto.Marshal(&msg)
if err != nil {
err = th.Publish(ctx, b)
if err != nil {
log.Warn("failed to publish message", zap.Error(err))
time.Sleep(15 * time.Second)
for {
msg, err := sub.Next(ctx)
if err != nil {
return fmt.Errorf("failed to receive pubsub message: %w", err)
logger.Info("received message", zap.String("data", string(msg.Data)), zap.String("from", msg.GetFrom().String()))