bridge: split up guardiand/main.go
This commit is contained in:
parent
bdf164a5b5
commit
090d0aca84
|
@ -2,38 +2,17 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/base64"
|
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
eth_common "github.com/ethereum/go-ethereum/common"
|
eth_common "github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/golang/protobuf/proto"
|
|
||||||
"github.com/libp2p/go-libp2p-core/protocol"
|
|
||||||
swarm "github.com/libp2p/go-libp2p-swarm"
|
|
||||||
"github.com/multiformats/go-multiaddr"
|
|
||||||
"github.com/prometheus/common/log"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
|
|
||||||
"github.com/certusone/wormhole/bridge/pkg/common"
|
"github.com/certusone/wormhole/bridge/pkg/common"
|
||||||
"github.com/certusone/wormhole/bridge/pkg/ethereum"
|
"github.com/certusone/wormhole/bridge/pkg/ethereum"
|
||||||
gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
|
|
||||||
"github.com/certusone/wormhole/bridge/pkg/supervisor"
|
"github.com/certusone/wormhole/bridge/pkg/supervisor"
|
||||||
|
|
||||||
ipfslog "github.com/ipfs/go-log/v2"
|
ipfslog "github.com/ipfs/go-log/v2"
|
||||||
"github.com/libp2p/go-libp2p"
|
|
||||||
connmgr "github.com/libp2p/go-libp2p-connmgr"
|
|
||||||
"github.com/libp2p/go-libp2p-core/crypto"
|
|
||||||
"github.com/libp2p/go-libp2p-core/host"
|
|
||||||
"github.com/libp2p/go-libp2p-core/peer"
|
|
||||||
"github.com/libp2p/go-libp2p-core/routing"
|
|
||||||
dht "github.com/libp2p/go-libp2p-kad-dht"
|
|
||||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
|
||||||
libp2pquic "github.com/libp2p/go-libp2p-quic-transport"
|
|
||||||
libp2ptls "github.com/libp2p/go-libp2p-tls"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -118,251 +97,3 @@ func main() {
|
||||||
select {}
|
select {}
|
||||||
}
|
}
|
||||||
|
|
||||||
func getOrCreateNodeKey(logger *zap.Logger, path string) (crypto.PrivKey, error) {
|
|
||||||
b, err := ioutil.ReadFile(path)
|
|
||||||
if err != nil {
|
|
||||||
if os.IsNotExist(err) {
|
|
||||||
logger.Info("No node key found, generating a new one...", zap.String("path", path))
|
|
||||||
|
|
||||||
// TODO(leo): what does -1 mean?
|
|
||||||
priv, _, err := crypto.GenerateKeyPair(crypto.Ed25519, -1)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
s, err := crypto.MarshalPrivateKey(priv)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = ioutil.WriteFile(path, s, 0600)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to write node key: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return priv, nil
|
|
||||||
} else {
|
|
||||||
return nil, fmt.Errorf("failed to read node key: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
priv, err := crypto.UnmarshalPrivateKey(b)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to unmarshal node key: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Info("Found existing node key", zap.String("path", path))
|
|
||||||
|
|
||||||
return priv, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// FIXME: this hardcodes the private key if we're guardian-0.
|
|
||||||
// Proper fix is to add a debug mode and fetch the remote peer ID,
|
|
||||||
// or add a special bootstrap pod.
|
|
||||||
func bootstrapNodePrivateKeyHack() crypto.PrivKey {
|
|
||||||
hostname, err := os.Hostname()
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if hostname == "guardian-0" {
|
|
||||||
// node ID: 12D3KooWQ1sV2kowPY1iJX1hJcVTysZjKv3sfULTGwhdpUGGZ1VF
|
|
||||||
b, err := base64.StdEncoding.DecodeString("CAESQGlv6OJOMXrZZVTCC0cgCv7goXr6QaSVMZIndOIXKNh80vYnG+EutVlZK20Nx9cLkUG5ymKB\n88LXi/vPBwP8zfY=")
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
priv, err := crypto.UnmarshalPrivateKey(b)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return priv
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func p2p(ctx context.Context) error {
|
|
||||||
|
|
||||||
logger := supervisor.Logger(ctx)
|
|
||||||
|
|
||||||
priv := bootstrapNodePrivateKeyHack()
|
|
||||||
|
|
||||||
var err error
|
|
||||||
if priv == nil {
|
|
||||||
priv, err = getOrCreateNodeKey(logger, *nodeKeyPath)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
logger.Info("HACK: loaded hardcoded guardian-0 node key")
|
|
||||||
}
|
|
||||||
|
|
||||||
var idht *dht.IpfsDHT
|
|
||||||
|
|
||||||
h, err := libp2p.New(ctx,
|
|
||||||
// Use the keypair we generated
|
|
||||||
libp2p.Identity(priv),
|
|
||||||
// Multiple listen addresses
|
|
||||||
libp2p.ListenAddrStrings(
|
|
||||||
// Listen on QUIC only.
|
|
||||||
// TODO(leo): listen on ipv6
|
|
||||||
// TODO(leo): is this more or less stable than using both TCP and QUIC transports?
|
|
||||||
// https://github.com/libp2p/go-libp2p/issues/688
|
|
||||||
fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic", *p2pPort),
|
|
||||||
),
|
|
||||||
|
|
||||||
// Enable TLS security only.
|
|
||||||
libp2p.Security(libp2ptls.ID, libp2ptls.New),
|
|
||||||
|
|
||||||
// Enable QUIC transports.
|
|
||||||
libp2p.Transport(libp2pquic.NewTransport),
|
|
||||||
|
|
||||||
// Enable TCP so we can connect to bootstrap nodes.
|
|
||||||
// (can be disabled if we bootstrap our own network)
|
|
||||||
libp2p.DefaultTransports,
|
|
||||||
|
|
||||||
// Let's prevent our peer from having too many
|
|
||||||
// connections by attaching a connection manager.
|
|
||||||
libp2p.ConnectionManager(connmgr.NewConnManager(
|
|
||||||
100, // Lowwater
|
|
||||||
400, // HighWater,
|
|
||||||
time.Minute, // GracePeriod
|
|
||||||
)),
|
|
||||||
|
|
||||||
// Let this host use the DHT to find other hosts
|
|
||||||
libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
|
|
||||||
// TODO(leo): Persistent data store (i.e. address book)
|
|
||||||
idht, err = dht.New(ctx, h, dht.Mode(dht.ModeServer),
|
|
||||||
// TODO(leo): This intentionally makes us incompatible with the global IPFS DHT
|
|
||||||
dht.ProtocolPrefix(protocol.ID("/"+*p2pNetworkID)),
|
|
||||||
)
|
|
||||||
return idht, err
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
defer func() {
|
|
||||||
fmt.Printf("h is %+v", h)
|
|
||||||
// FIXME: why can this be nil? We need to close the host to free the socket because apparently,
|
|
||||||
// closing the context is not enough, but sometimes h is nil when the function runs.
|
|
||||||
if h != nil {
|
|
||||||
h.Close()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Info("Connecting to bootstrap peers")
|
|
||||||
// TODO(leo): use our own bootstrap peers rather than the IPFS ones so we have a dedicated network
|
|
||||||
//for _, addr := range dht.DefaultBootstrapPeers {
|
|
||||||
// pi, _ := peer.AddrInfoFromP2pAddr(addr)
|
|
||||||
// // We ignore errors as some bootstrap peers may be down and that is fine.
|
|
||||||
// _ = h.Connect(ctx, *pi)
|
|
||||||
//}
|
|
||||||
|
|
||||||
// Add our own bootstrap nodes
|
|
||||||
|
|
||||||
// Count number of successful connection attempts. If we fail to connect to every bootstrap peer, kill
|
|
||||||
// the service and have supervisor retry it.
|
|
||||||
successes := 0
|
|
||||||
|
|
||||||
for _, addr := range strings.Split(*p2pBootstrap, ",") {
|
|
||||||
if addr == "" {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
ma, err := multiaddr.NewMultiaddr(addr)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error("Invalid bootstrap address", zap.String("peer", addr), zap.Error(err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
pi, err := peer.AddrInfoFromP2pAddr(ma)
|
|
||||||
if err != nil {
|
|
||||||
logger.Error("Invalid bootstrap address", zap.String("peer", addr), zap.Error(err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = h.Connect(ctx, *pi); err != nil {
|
|
||||||
if err != swarm.ErrDialToSelf {
|
|
||||||
logger.Error("Failed to connect to bootstrap peer", zap.String("peer", addr), zap.Error(err))
|
|
||||||
} else {
|
|
||||||
// Dialing self, carrying on... (we're a bootstrap peer)
|
|
||||||
logger.Info("Tried to connect to ourselves - we're a bootstrap peer")
|
|
||||||
successes += 1
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
successes += 1
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if successes == 0 {
|
|
||||||
h.Close()
|
|
||||||
return fmt.Errorf("Failed to connect to any bootstrap peer")
|
|
||||||
} else {
|
|
||||||
logger.Info("Connected to bootstrap peers", zap.Int("num", successes))
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO(leo): crash if we couldn't connect to any bootstrap peers?
|
|
||||||
// (i.e. can we get stuck here if the other nodes have yet to come up?)
|
|
||||||
|
|
||||||
topic := fmt.Sprintf("%s/%s", *p2pNetworkID, "broadcast")
|
|
||||||
|
|
||||||
logger.Info("Subscribing pubsub topic", zap.String("topic", topic))
|
|
||||||
ps, err := pubsub.NewGossipSub(ctx, h)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
th, err := ps.Join(topic)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to join topic: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
sub, err := th.Subscribe()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to subscribe topic: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Info("Node has been started", zap.String("peer_id", h.ID().String()),
|
|
||||||
zap.String("addrs", fmt.Sprintf("%v", h.Addrs())))
|
|
||||||
|
|
||||||
hostname, err := os.Hostname()
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
ctr := int64(0)
|
|
||||||
|
|
||||||
for {
|
|
||||||
msg := gossipv1.Heartbeat{
|
|
||||||
Hostname: hostname,
|
|
||||||
Index: ctr,
|
|
||||||
}
|
|
||||||
|
|
||||||
b, err := proto.Marshal(&msg)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = th.Publish(ctx, b)
|
|
||||||
if err != nil {
|
|
||||||
log.Warn("failed to publish message", zap.Error(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
time.Sleep(15 * time.Second)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
supervisor.Signal(ctx, supervisor.SignalHealthy)
|
|
||||||
|
|
||||||
for {
|
|
||||||
msg, err := sub.Next(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to receive pubsub message: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Info("received message", zap.String("data", string(msg.Data)), zap.String("from", msg.GetFrom().String()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -0,0 +1,76 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/base64"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/libp2p/go-libp2p-core/crypto"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
func getOrCreateNodeKey(logger *zap.Logger, path string) (crypto.PrivKey, error) {
|
||||||
|
b, err := ioutil.ReadFile(path)
|
||||||
|
if err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
logger.Info("No node key found, generating a new one...", zap.String("path", path))
|
||||||
|
|
||||||
|
// TODO(leo): what does -1 mean?
|
||||||
|
priv, _, err := crypto.GenerateKeyPair(crypto.Ed25519, -1)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s, err := crypto.MarshalPrivateKey(priv)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = ioutil.WriteFile(path, s, 0600)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to write node key: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return priv, nil
|
||||||
|
} else {
|
||||||
|
return nil, fmt.Errorf("failed to read node key: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
priv, err := crypto.UnmarshalPrivateKey(b)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to unmarshal node key: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Info("Found existing node key", zap.String("path", path))
|
||||||
|
|
||||||
|
return priv, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FIXME: this hardcodes the private key if we're guardian-0.
|
||||||
|
// Proper fix is to add a debug mode and fetch the remote peer ID,
|
||||||
|
// or add a special bootstrap pod.
|
||||||
|
func bootstrapNodePrivateKeyHack() crypto.PrivKey {
|
||||||
|
hostname, err := os.Hostname()
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if hostname == "guardian-0" {
|
||||||
|
// node ID: 12D3KooWQ1sV2kowPY1iJX1hJcVTysZjKv3sfULTGwhdpUGGZ1VF
|
||||||
|
b, err := base64.StdEncoding.DecodeString("CAESQGlv6OJOMXrZZVTCC0cgCv7goXr6QaSVMZIndOIXKNh80vYnG+EutVlZK20Nx9cLkUG5ymKB\n88LXi/vPBwP8zfY=")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
priv, err := crypto.UnmarshalPrivateKey(b)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return priv
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,211 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/libp2p/go-libp2p"
|
||||||
|
connmgr "github.com/libp2p/go-libp2p-connmgr"
|
||||||
|
"github.com/libp2p/go-libp2p-core/host"
|
||||||
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
|
"github.com/libp2p/go-libp2p-core/protocol"
|
||||||
|
"github.com/libp2p/go-libp2p-core/routing"
|
||||||
|
dht "github.com/libp2p/go-libp2p-kad-dht"
|
||||||
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||||
|
libp2pquic "github.com/libp2p/go-libp2p-quic-transport"
|
||||||
|
swarm "github.com/libp2p/go-libp2p-swarm"
|
||||||
|
libp2ptls "github.com/libp2p/go-libp2p-tls"
|
||||||
|
"github.com/multiformats/go-multiaddr"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
"google.golang.org/protobuf/proto"
|
||||||
|
|
||||||
|
gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
|
||||||
|
"github.com/certusone/wormhole/bridge/pkg/supervisor"
|
||||||
|
)
|
||||||
|
|
||||||
|
func p2p(ctx context.Context) error {
|
||||||
|
|
||||||
|
logger := supervisor.Logger(ctx)
|
||||||
|
|
||||||
|
priv := bootstrapNodePrivateKeyHack()
|
||||||
|
|
||||||
|
var err error
|
||||||
|
if priv == nil {
|
||||||
|
priv, err = getOrCreateNodeKey(logger, *nodeKeyPath)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logger.Info("HACK: loaded hardcoded guardian-0 node key")
|
||||||
|
}
|
||||||
|
|
||||||
|
var idht *dht.IpfsDHT
|
||||||
|
|
||||||
|
h, err := libp2p.New(ctx,
|
||||||
|
// Use the keypair we generated
|
||||||
|
libp2p.Identity(priv),
|
||||||
|
// Multiple listen addresses
|
||||||
|
libp2p.ListenAddrStrings(
|
||||||
|
// Listen on QUIC only.
|
||||||
|
// TODO(leo): listen on ipv6
|
||||||
|
// TODO(leo): is this more or less stable than using both TCP and QUIC transports?
|
||||||
|
// https://github.com/libp2p/go-libp2p/issues/688
|
||||||
|
fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic", *p2pPort),
|
||||||
|
),
|
||||||
|
|
||||||
|
// Enable TLS security only.
|
||||||
|
libp2p.Security(libp2ptls.ID, libp2ptls.New),
|
||||||
|
|
||||||
|
// Enable QUIC transports.
|
||||||
|
libp2p.Transport(libp2pquic.NewTransport),
|
||||||
|
|
||||||
|
// Enable TCP so we can connect to bootstrap nodes.
|
||||||
|
// (can be disabled if we bootstrap our own network)
|
||||||
|
libp2p.DefaultTransports,
|
||||||
|
|
||||||
|
// Let's prevent our peer from having too many
|
||||||
|
// connections by attaching a connection manager.
|
||||||
|
libp2p.ConnectionManager(connmgr.NewConnManager(
|
||||||
|
100, // Lowwater
|
||||||
|
400, // HighWater,
|
||||||
|
time.Minute, // GracePeriod
|
||||||
|
)),
|
||||||
|
|
||||||
|
// Let this host use the DHT to find other hosts
|
||||||
|
libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
|
||||||
|
// TODO(leo): Persistent data store (i.e. address book)
|
||||||
|
idht, err = dht.New(ctx, h, dht.Mode(dht.ModeServer),
|
||||||
|
// TODO(leo): This intentionally makes us incompatible with the global IPFS DHT
|
||||||
|
dht.ProtocolPrefix(protocol.ID("/"+*p2pNetworkID)),
|
||||||
|
)
|
||||||
|
return idht, err
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
defer func() {
|
||||||
|
fmt.Printf("h is %+v", h)
|
||||||
|
// FIXME: why can this be nil? We need to close the host to free the socket because apparently,
|
||||||
|
// closing the context is not enough, but sometimes h is nil when the function runs.
|
||||||
|
if h != nil {
|
||||||
|
h.Close()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Info("Connecting to bootstrap peers")
|
||||||
|
// TODO(leo): use our own bootstrap peers rather than the IPFS ones so we have a dedicated network
|
||||||
|
//for _, addr := range dht.DefaultBootstrapPeers {
|
||||||
|
// pi, _ := peer.AddrInfoFromP2pAddr(addr)
|
||||||
|
// // We ignore errors as some bootstrap peers may be down and that is fine.
|
||||||
|
// _ = h.Connect(ctx, *pi)
|
||||||
|
//}
|
||||||
|
|
||||||
|
// Add our own bootstrap nodes
|
||||||
|
|
||||||
|
// Count number of successful connection attempts. If we fail to connect to every bootstrap peer, kill
|
||||||
|
// the service and have supervisor retry it.
|
||||||
|
successes := 0
|
||||||
|
|
||||||
|
for _, addr := range strings.Split(*p2pBootstrap, ",") {
|
||||||
|
if addr == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
ma, err := multiaddr.NewMultiaddr(addr)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("Invalid bootstrap address", zap.String("peer", addr), zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
pi, err := peer.AddrInfoFromP2pAddr(ma)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("Invalid bootstrap address", zap.String("peer", addr), zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = h.Connect(ctx, *pi); err != nil {
|
||||||
|
if err != swarm.ErrDialToSelf {
|
||||||
|
logger.Error("Failed to connect to bootstrap peer", zap.String("peer", addr), zap.Error(err))
|
||||||
|
} else {
|
||||||
|
// Dialing self, carrying on... (we're a bootstrap peer)
|
||||||
|
logger.Info("Tried to connect to ourselves - we're a bootstrap peer")
|
||||||
|
successes += 1
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
successes += 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if successes == 0 {
|
||||||
|
h.Close()
|
||||||
|
return fmt.Errorf("Failed to connect to any bootstrap peer")
|
||||||
|
} else {
|
||||||
|
logger.Info("Connected to bootstrap peers", zap.Int("num", successes))
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(leo): crash if we couldn't connect to any bootstrap peers?
|
||||||
|
// (i.e. can we get stuck here if the other nodes have yet to come up?)
|
||||||
|
|
||||||
|
topic := fmt.Sprintf("%s/%s", *p2pNetworkID, "broadcast")
|
||||||
|
|
||||||
|
logger.Info("Subscribing pubsub topic", zap.String("topic", topic))
|
||||||
|
ps, err := pubsub.NewGossipSub(ctx, h)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
th, err := ps.Join(topic)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to join topic: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
sub, err := th.Subscribe()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to subscribe topic: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Info("Node has been started", zap.String("peer_id", h.ID().String()),
|
||||||
|
zap.String("addrs", fmt.Sprintf("%v", h.Addrs())))
|
||||||
|
|
||||||
|
hostname, err := os.Hostname()
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
ctr := int64(0)
|
||||||
|
|
||||||
|
for {
|
||||||
|
msg := gossipv1.Heartbeat{
|
||||||
|
Hostname: hostname,
|
||||||
|
Index: ctr,
|
||||||
|
}
|
||||||
|
|
||||||
|
b, err := proto.Marshal(&msg)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = th.Publish(ctx, b)
|
||||||
|
if err != nil {
|
||||||
|
logger.Warn("failed to publish message", zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(15 * time.Second)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
supervisor.Signal(ctx, supervisor.SignalHealthy)
|
||||||
|
|
||||||
|
for {
|
||||||
|
msg, err := sub.Next(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to receive pubsub message: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Info("received message", zap.String("data", string(msg.Data)), zap.String("from", msg.GetFrom().String()))
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue