wormhole/node/pkg/p2p/p2p.go

688 lines
21 KiB
Go
Raw Normal View History

package p2p
2020-08-17 03:29:52 -07:00
import (
"context"
"crypto/ecdsa"
"errors"
2020-08-17 03:29:52 -07:00
"fmt"
"strings"
"sync"
"time"
"github.com/certusone/wormhole/node/pkg/accountant"
node_common "github.com/certusone/wormhole/node/pkg/common"
Chain governor (#1277) * Rebase * Reload from db on start up Change-Id: I1deac9db28ad1157ea7e0c84af41c35b38497f4e * Console commands Change-Id: Ic242038312b7c837443a2df8823f100b3cdffd77 * Query prices from CoinGecko Change-Id: I9a8c282ba374d32ef7045d11979a27ede3c52827 * Move chain config to separate file Change-Id: I6a790eca765bce1f2caf48134e225df5c4daff15 * More code cleanup Change-Id: Id12affa78cdc2d394d6dab0c53bb7ad06df8007e * Few minor tweaks Change-Id: I6cb511599d669e0b3d716d9f314ac0f26935ee92 * Create separate tests for different packages Change-Id: Idb4da6817c9daad2a7420abc11bdaa702ae056dc * Fix lint errors Change-Id: I137c9e7e4574aee9c9fec22e91e19eee0e86a349 * Simplify chainlock message tests * Add more governor db test coverage * Next batch of review rework Change-Id: Ife54852fca6c6990d1ffb3d60a8dd7f49d526f0a * Still more rework Change-Id: I43a8aa7fa4e1a7cea4d7fde68c963123c1ca8d53 * More rework Change-Id: I9382412af4ffeda74967a834a6b0195a9d28b720 * Fix lint errors Change-Id: Idaafce9b0314192557b7207911375d000bac5ae2 * Add rest and prometheus support Change-Id: Ib870ed7eb305ef1ebbf6a7cedabc665c37c19171 * Add separate configs for testnet and devnet Change-Id: I76b11d8940a8dc9935b3f276a008ed20ef60b850 * Update mainnet tokens to fix decimals Change-Id: Iab018827766bc7748038b7be2f51342afb79b87c * Let small enqueued VAAs go out when big ones can't Change-Id: I7d3ef88d4579702d0c6ff4eaf5a8524799610ff6 * Tweak testnet config parameters Change-Id: Id2c54151a7183ab3fb4af8060929198f6021ba4e * Rework / enhancements from testnet testing Change-Id: I1387b1d22667fa6ffe0bb1832dbc0b31196505d3 * Use known emitter maps Change-Id: If330ee9d30ac3c2d1c6dca674f7777dc759de230 * Fix typo and out of date comments Change-Id: I54a19792104ccc6ca023020303a710ef3ba18f74 Co-authored-by: claudijd <jclaudius@jumptrading.com>
2022-07-19 11:08:06 -07:00
"github.com/certusone/wormhole/node/pkg/governor"
"github.com/certusone/wormhole/node/pkg/version"
"github.com/ethereum/go-ethereum/common"
ethcrypto "github.com/ethereum/go-ethereum/crypto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
2020-08-17 03:29:52 -07:00
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
2020-08-17 03:29:52 -07:00
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"
libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic"
2020-08-17 03:29:52 -07:00
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/supervisor"
2020-08-17 03:29:52 -07:00
)
const DefaultPort = 8999
var (
p2pHeartbeatsSent = promauto.NewCounter(
prometheus.CounterOpts{
Name: "wormhole_p2p_heartbeats_sent_total",
Help: "Total number of p2p heartbeats sent",
})
p2pMessagesSent = promauto.NewCounter(
prometheus.CounterOpts{
Name: "wormhole_p2p_broadcast_messages_sent_total",
Help: "Total number of p2p pubsub broadcast messages sent",
})
p2pMessagesReceived = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "wormhole_p2p_broadcast_messages_received_total",
Help: "Total number of p2p pubsub broadcast messages received",
}, []string{"type"})
p2pReceiveChannelOverflow = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "wormhole_p2p_receive_channel_overflow",
Help: "Total number of p2p received messages dropped due to channel overflow",
}, []string{"type"})
)
var heartbeatMessagePrefix = []byte("heartbeat|")
var signedObservationRequestPrefix = []byte("signed_observation_request|")
// heartbeatMaxTimeDifference specifies the maximum time difference between the local clock and the timestamp in incoming heartbeat messages. Heartbeats that are this old or this much into the future will be dropped. This value should encompass clock skew and network delay.
var heartbeatMaxTimeDifference = time.Minute * 15
func heartbeatDigest(b []byte) common.Hash {
return ethcrypto.Keccak256Hash(append(heartbeatMessagePrefix, b...))
}
func signedObservationRequestDigest(b []byte) common.Hash {
return ethcrypto.Keccak256Hash(append(signedObservationRequestPrefix, b...))
}
type Components struct {
// P2PIDInHeartbeat determines if the guardian will put it's libp2p node ID in the authenticated heartbeat payload
P2PIDInHeartbeat bool
ListeningAddressesPatterns []string
// Port on which the Guardian is going to bind
Port uint
// ConnMgr is the ConnectionManager that the Guardian is going to use
ConnMgr *connmgr.BasicConnMgr
// ProtectedHostByGuardianKey is used to ensure that only one p2p peer can be protected by any given known guardian key
ProtectedHostByGuardianKey map[common.Address]peer.ID
// ProtectedHostByGuardianKeyLock is only useful to prevent a race condition in test as ProtectedHostByGuardianKey
// is only accessed by a single routine at any given time in a running Guardian.
ProtectedHostByGuardianKeyLock sync.Mutex
}
func (f *Components) ListeningAddresses() []string {
la := make([]string, 0, len(f.ListeningAddressesPatterns))
for _, pattern := range f.ListeningAddressesPatterns {
la = append(la, fmt.Sprintf(pattern, f.Port))
}
return la
}
func DefaultComponents() *Components {
mgr, err := DefaultConnectionManager()
if err != nil {
panic(err)
}
return &Components{
P2PIDInHeartbeat: true,
ListeningAddressesPatterns: []string{
// Listen on QUIC only.
// https://github.com/libp2p/go-libp2p/issues/688
"/ip4/0.0.0.0/udp/%d/quic",
"/ip6/::/udp/%d/quic",
},
Port: DefaultPort,
ConnMgr: mgr,
ProtectedHostByGuardianKey: make(map[common.Address]peer.ID),
}
}
const LowWaterMarkDefault = 100
const HighWaterMarkDefault = 400
func DefaultConnectionManager() (*connmgr.BasicConnMgr, error) {
return connmgr.NewConnManager(
LowWaterMarkDefault,
HighWaterMarkDefault,
// GracePeriod set to 0 means that new peers are not protected by a grace period
connmgr.WithGracePeriod(0),
)
}
Node: Initial guardiand changes for accounting (#2181) * node: guardiand support for accounting Change-Id: I97fe1f6d6d71a5803881ff4c793e3c30f22b14d8 * Node: Tie accounting into the guardian Change-Id: I31600d18176f516b75b3eb046fd7ac6e54e1b133 * Node: accounting tests and metrics Change-Id: Ieb139772edf464ed1ab202861babeaf0f857ad6b * Node: minor tweak to accounting metrics Change-Id: Iad2b7e34870734f0c5e5d538c0ac86269a9a4728 * Node: load accounting key Change-Id: I228ce23e63b556d751000b97097202eda48650aa * More work in progress Change-Id: I85088d26c05cf02d26043cf6ee8c67efd13f2ea4 * Node: send observations to accounting contract Change-Id: Ib90909c2ee705d5e2a7e6cf3a6ec4ba7519e2eb1 * Node: Fix lint error in accounting tests Change-Id: Id73397cf45107243a9f68ba82bed3ccf2b0299b5 * Node: Need to copy libwasmvm.so Change-Id: I2856c8964ca082f1f4014d6db9fb1b2dc4e64409 * Node: Rename wormchain to wormconn Change-Id: I6782be733ebdd92b908228d3984a906aa4c795f7 * Node: moving accounting check after governor Change-Id: I064c77d30514715c6f8b6b5da50806a5e1adf657 * Node: Add accounting status to heartbeat Change-Id: I0ae3e476386cfaccc5c877ee1351dbe41c0358c7 * Node: start of accounting integration work Change-Id: I8ad206eb7fc07aa9e1a2ebc321f2c490ec36b51e * Node: More broadcast tx stuff Change-Id: Id2cc83df859310c013665eaa9c6ce3033bb1d9c5 * Node: Can actually send a request to accounting Change-Id: I6af5d59c53939f58b2f13ae501914bef260592f2 * Node: More accounting tx broadcast stuff Change-Id: If758e49f8928807e87053320e9330c7208aad490 * Node: config changes for accounting Change-Id: I2803cceb188d04c557a52aa9aa8ba7296da8879f * Node: More accounting changes Change-Id: Id979af0ec6ab8484bc094072f3febf39355351ca * Node/Acct: Use new observation request format * Node/acct: use new contract interface * Node/acct: fix minor copy/paste error * Node: Clean up comments and lint errors * Node: disable accounting in dev by default * Node: Fix test failure * Remove test code * Switch messages to debug, rename Run() * check for "out of gas" * Use worker routine to submit observations * Rename mutex to reflect what it protects * Create handleEvents func * Remove FinalizeObservation * Node/Acct: Trying to use tm library for watcher * Node/acct: switch watcher to use tm library * Node/Acct: Need separate WS parm for accounting * Node/Acct: Fix compile error in tests * Node/Acct: Minor rework * Node: add wormchain as a dep to remove stale code * Node/Acct: GS index is not correct in requests * Node/Acct: Peg connection error metric * Node/Acct: Add wormchain to node docker file * Node/Acct: Fix for double base64 decode * Node/Acct: Change public key to sender address * Node/Acct: Fix lint error * Node/Acct: key pass phrase change * Node/Acct: Pass guardian index in obs req * Node/Acct: No go on submit observation * Node/Acct: Don't double encode tx_hash * Node/Acct: Remove unneeded base64 encoding * Node/Acct: handle submit channel overflow * Node/Acct: Added a TODO to document a review issue * Node/Acct: Fix for checking if channel is full Co-authored-by: Conor Patrick <conorpp94@gmail.com>
2023-01-16 04:33:01 -08:00
func Run(
obsvC chan<- *gossipv1.SignedObservation,
obsvReqC chan<- *gossipv1.ObservationRequest,
obsvReqSendC <-chan *gossipv1.ObservationRequest,
gossipSendC chan []byte,
signedInC chan<- *gossipv1.SignedVAAWithQuorum,
Node: Initial guardiand changes for accounting (#2181) * node: guardiand support for accounting Change-Id: I97fe1f6d6d71a5803881ff4c793e3c30f22b14d8 * Node: Tie accounting into the guardian Change-Id: I31600d18176f516b75b3eb046fd7ac6e54e1b133 * Node: accounting tests and metrics Change-Id: Ieb139772edf464ed1ab202861babeaf0f857ad6b * Node: minor tweak to accounting metrics Change-Id: Iad2b7e34870734f0c5e5d538c0ac86269a9a4728 * Node: load accounting key Change-Id: I228ce23e63b556d751000b97097202eda48650aa * More work in progress Change-Id: I85088d26c05cf02d26043cf6ee8c67efd13f2ea4 * Node: send observations to accounting contract Change-Id: Ib90909c2ee705d5e2a7e6cf3a6ec4ba7519e2eb1 * Node: Fix lint error in accounting tests Change-Id: Id73397cf45107243a9f68ba82bed3ccf2b0299b5 * Node: Need to copy libwasmvm.so Change-Id: I2856c8964ca082f1f4014d6db9fb1b2dc4e64409 * Node: Rename wormchain to wormconn Change-Id: I6782be733ebdd92b908228d3984a906aa4c795f7 * Node: moving accounting check after governor Change-Id: I064c77d30514715c6f8b6b5da50806a5e1adf657 * Node: Add accounting status to heartbeat Change-Id: I0ae3e476386cfaccc5c877ee1351dbe41c0358c7 * Node: start of accounting integration work Change-Id: I8ad206eb7fc07aa9e1a2ebc321f2c490ec36b51e * Node: More broadcast tx stuff Change-Id: Id2cc83df859310c013665eaa9c6ce3033bb1d9c5 * Node: Can actually send a request to accounting Change-Id: I6af5d59c53939f58b2f13ae501914bef260592f2 * Node: More accounting tx broadcast stuff Change-Id: If758e49f8928807e87053320e9330c7208aad490 * Node: config changes for accounting Change-Id: I2803cceb188d04c557a52aa9aa8ba7296da8879f * Node: More accounting changes Change-Id: Id979af0ec6ab8484bc094072f3febf39355351ca * Node/Acct: Use new observation request format * Node/acct: use new contract interface * Node/acct: fix minor copy/paste error * Node: Clean up comments and lint errors * Node: disable accounting in dev by default * Node: Fix test failure * Remove test code * Switch messages to debug, rename Run() * check for "out of gas" * Use worker routine to submit observations * Rename mutex to reflect what it protects * Create handleEvents func * Remove FinalizeObservation * Node/Acct: Trying to use tm library for watcher * Node/acct: switch watcher to use tm library * Node/Acct: Need separate WS parm for accounting * Node/Acct: Fix compile error in tests * Node/Acct: Minor rework * Node: add wormchain as a dep to remove stale code * Node/Acct: GS index is not correct in requests * Node/Acct: Peg connection error metric * Node/Acct: Add wormchain to node docker file * Node/Acct: Fix for double base64 decode * Node/Acct: Change public key to sender address * Node/Acct: Fix lint error * Node/Acct: key pass phrase change * Node/Acct: Pass guardian index in obs req * Node/Acct: No go on submit observation * Node/Acct: Don't double encode tx_hash * Node/Acct: Remove unneeded base64 encoding * Node/Acct: handle submit channel overflow * Node/Acct: Added a TODO to document a review issue * Node/Acct: Fix for checking if channel is full Co-authored-by: Conor Patrick <conorpp94@gmail.com>
2023-01-16 04:33:01 -08:00
priv crypto.PrivKey,
gk *ecdsa.PrivateKey,
gst *node_common.GuardianSetState,
networkID string,
bootstrapPeers string,
nodeName string,
disableHeartbeatVerify bool,
rootCtxCancel context.CancelFunc,
acct *accountant.Accountant,
Node: Initial guardiand changes for accounting (#2181) * node: guardiand support for accounting Change-Id: I97fe1f6d6d71a5803881ff4c793e3c30f22b14d8 * Node: Tie accounting into the guardian Change-Id: I31600d18176f516b75b3eb046fd7ac6e54e1b133 * Node: accounting tests and metrics Change-Id: Ieb139772edf464ed1ab202861babeaf0f857ad6b * Node: minor tweak to accounting metrics Change-Id: Iad2b7e34870734f0c5e5d538c0ac86269a9a4728 * Node: load accounting key Change-Id: I228ce23e63b556d751000b97097202eda48650aa * More work in progress Change-Id: I85088d26c05cf02d26043cf6ee8c67efd13f2ea4 * Node: send observations to accounting contract Change-Id: Ib90909c2ee705d5e2a7e6cf3a6ec4ba7519e2eb1 * Node: Fix lint error in accounting tests Change-Id: Id73397cf45107243a9f68ba82bed3ccf2b0299b5 * Node: Need to copy libwasmvm.so Change-Id: I2856c8964ca082f1f4014d6db9fb1b2dc4e64409 * Node: Rename wormchain to wormconn Change-Id: I6782be733ebdd92b908228d3984a906aa4c795f7 * Node: moving accounting check after governor Change-Id: I064c77d30514715c6f8b6b5da50806a5e1adf657 * Node: Add accounting status to heartbeat Change-Id: I0ae3e476386cfaccc5c877ee1351dbe41c0358c7 * Node: start of accounting integration work Change-Id: I8ad206eb7fc07aa9e1a2ebc321f2c490ec36b51e * Node: More broadcast tx stuff Change-Id: Id2cc83df859310c013665eaa9c6ce3033bb1d9c5 * Node: Can actually send a request to accounting Change-Id: I6af5d59c53939f58b2f13ae501914bef260592f2 * Node: More accounting tx broadcast stuff Change-Id: If758e49f8928807e87053320e9330c7208aad490 * Node: config changes for accounting Change-Id: I2803cceb188d04c557a52aa9aa8ba7296da8879f * Node: More accounting changes Change-Id: Id979af0ec6ab8484bc094072f3febf39355351ca * Node/Acct: Use new observation request format * Node/acct: use new contract interface * Node/acct: fix minor copy/paste error * Node: Clean up comments and lint errors * Node: disable accounting in dev by default * Node: Fix test failure * Remove test code * Switch messages to debug, rename Run() * check for "out of gas" * Use worker routine to submit observations * Rename mutex to reflect what it protects * Create handleEvents func * Remove FinalizeObservation * Node/Acct: Trying to use tm library for watcher * Node/acct: switch watcher to use tm library * Node/Acct: Need separate WS parm for accounting * Node/Acct: Fix compile error in tests * Node/Acct: Minor rework * Node: add wormchain as a dep to remove stale code * Node/Acct: GS index is not correct in requests * Node/Acct: Peg connection error metric * Node/Acct: Add wormchain to node docker file * Node/Acct: Fix for double base64 decode * Node/Acct: Change public key to sender address * Node/Acct: Fix lint error * Node/Acct: key pass phrase change * Node/Acct: Pass guardian index in obs req * Node/Acct: No go on submit observation * Node/Acct: Don't double encode tx_hash * Node/Acct: Remove unneeded base64 encoding * Node/Acct: handle submit channel overflow * Node/Acct: Added a TODO to document a review issue * Node/Acct: Fix for checking if channel is full Co-authored-by: Conor Patrick <conorpp94@gmail.com>
2023-01-16 04:33:01 -08:00
gov *governor.ChainGovernor,
signedGovCfg chan *gossipv1.SignedChainGovernorConfig,
signedGovSt chan *gossipv1.SignedChainGovernorStatus,
components *Components,
ibcFeaturesFunc func() string,
Node: Initial guardiand changes for accounting (#2181) * node: guardiand support for accounting Change-Id: I97fe1f6d6d71a5803881ff4c793e3c30f22b14d8 * Node: Tie accounting into the guardian Change-Id: I31600d18176f516b75b3eb046fd7ac6e54e1b133 * Node: accounting tests and metrics Change-Id: Ieb139772edf464ed1ab202861babeaf0f857ad6b * Node: minor tweak to accounting metrics Change-Id: Iad2b7e34870734f0c5e5d538c0ac86269a9a4728 * Node: load accounting key Change-Id: I228ce23e63b556d751000b97097202eda48650aa * More work in progress Change-Id: I85088d26c05cf02d26043cf6ee8c67efd13f2ea4 * Node: send observations to accounting contract Change-Id: Ib90909c2ee705d5e2a7e6cf3a6ec4ba7519e2eb1 * Node: Fix lint error in accounting tests Change-Id: Id73397cf45107243a9f68ba82bed3ccf2b0299b5 * Node: Need to copy libwasmvm.so Change-Id: I2856c8964ca082f1f4014d6db9fb1b2dc4e64409 * Node: Rename wormchain to wormconn Change-Id: I6782be733ebdd92b908228d3984a906aa4c795f7 * Node: moving accounting check after governor Change-Id: I064c77d30514715c6f8b6b5da50806a5e1adf657 * Node: Add accounting status to heartbeat Change-Id: I0ae3e476386cfaccc5c877ee1351dbe41c0358c7 * Node: start of accounting integration work Change-Id: I8ad206eb7fc07aa9e1a2ebc321f2c490ec36b51e * Node: More broadcast tx stuff Change-Id: Id2cc83df859310c013665eaa9c6ce3033bb1d9c5 * Node: Can actually send a request to accounting Change-Id: I6af5d59c53939f58b2f13ae501914bef260592f2 * Node: More accounting tx broadcast stuff Change-Id: If758e49f8928807e87053320e9330c7208aad490 * Node: config changes for accounting Change-Id: I2803cceb188d04c557a52aa9aa8ba7296da8879f * Node: More accounting changes Change-Id: Id979af0ec6ab8484bc094072f3febf39355351ca * Node/Acct: Use new observation request format * Node/acct: use new contract interface * Node/acct: fix minor copy/paste error * Node: Clean up comments and lint errors * Node: disable accounting in dev by default * Node: Fix test failure * Remove test code * Switch messages to debug, rename Run() * check for "out of gas" * Use worker routine to submit observations * Rename mutex to reflect what it protects * Create handleEvents func * Remove FinalizeObservation * Node/Acct: Trying to use tm library for watcher * Node/acct: switch watcher to use tm library * Node/Acct: Need separate WS parm for accounting * Node/Acct: Fix compile error in tests * Node/Acct: Minor rework * Node: add wormchain as a dep to remove stale code * Node/Acct: GS index is not correct in requests * Node/Acct: Peg connection error metric * Node/Acct: Add wormchain to node docker file * Node/Acct: Fix for double base64 decode * Node/Acct: Change public key to sender address * Node/Acct: Fix lint error * Node/Acct: key pass phrase change * Node/Acct: Pass guardian index in obs req * Node/Acct: No go on submit observation * Node/Acct: Don't double encode tx_hash * Node/Acct: Remove unneeded base64 encoding * Node/Acct: handle submit channel overflow * Node/Acct: Added a TODO to document a review issue * Node/Acct: Fix for checking if channel is full Co-authored-by: Conor Patrick <conorpp94@gmail.com>
2023-01-16 04:33:01 -08:00
) func(ctx context.Context) error {
if components == nil {
components = DefaultComponents()
}
return func(ctx context.Context) (re error) {
p2pReceiveChannelOverflow.WithLabelValues("observation").Add(0)
p2pReceiveChannelOverflow.WithLabelValues("signed_vaa_with_quorum").Add(0)
p2pReceiveChannelOverflow.WithLabelValues("signed_observation_request").Add(0)
logger := supervisor.Logger(ctx)
2020-08-17 09:20:15 -07:00
h, err := libp2p.New(
// Use the keypair we generated
libp2p.Identity(priv),
// Multiple listen addresses
libp2p.ListenAddrStrings(
components.ListeningAddresses()...,
),
// Enable TLS security as the only security protocol.
libp2p.Security(libp2ptls.ID, libp2ptls.New),
// Enable QUIC transport as the only transport.
libp2p.Transport(libp2pquic.NewTransport),
// Let's prevent our peer from having too many
// connections by attaching a connection manager.
libp2p.ConnectionManager(components.ConnMgr),
// Let this host use the DHT to find other hosts
libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
logger.Info("Connecting to bootstrap peers", zap.String("bootstrap_peers", bootstrapPeers))
bootstrappers := make([]peer.AddrInfo, 0)
for _, addr := range strings.Split(bootstrapPeers, ",") {
if addr == "" {
continue
}
ma, err := multiaddr.NewMultiaddr(addr)
if err != nil {
logger.Error("Invalid bootstrap address", zap.String("peer", addr), zap.Error(err))
continue
}
pi, err := peer.AddrInfoFromP2pAddr(ma)
if err != nil {
logger.Error("Invalid bootstrap address", zap.String("peer", addr), zap.Error(err))
continue
}
if pi.ID == h.ID() {
logger.Info("We're a bootstrap node")
continue
}
bootstrappers = append(bootstrappers, *pi)
}
// TODO(leo): Persistent data store (i.e. address book)
idht, err := dht.New(ctx, h, dht.Mode(dht.ModeServer),
// This intentionally makes us incompatible with the global IPFS DHT
dht.ProtocolPrefix(protocol.ID("/"+networkID)),
dht.BootstrapPeers(bootstrappers...),
)
return idht, err
}),
)
if err != nil {
panic(err)
}
2020-08-17 03:29:52 -07:00
defer func() {
// TODO: libp2p cannot be cleanly restarted (https://github.com/libp2p/go-libp2p/issues/992)
logger.Error("p2p routine has exited, cancelling root context...", zap.Error(re))
rootCtxCancel()
}()
2020-08-17 03:29:52 -07:00
nodeIdBytes, err := h.ID().Marshal()
if err != nil {
panic(err)
}
topic := fmt.Sprintf("%s/%s", networkID, "broadcast")
logger.Info("Subscribing pubsub topic", zap.String("topic", topic))
ps, err := pubsub.NewGossipSub(ctx, h)
if err != nil {
panic(err)
}
th, err := ps.Join(topic)
if err != nil {
return fmt.Errorf("failed to join topic: %w", err)
}
// Increase the buffer size to prevent failed delivery
// to slower subscribers
sub, err := th.Subscribe(pubsub.WithBufferSize(1024))
if err != nil {
return fmt.Errorf("failed to subscribe topic: %w", err)
}
logger.Info("Node has been started", zap.String("peer_id", h.ID().String()),
zap.String("addrs", fmt.Sprintf("%v", h.Addrs())))
bootTime := time.Now()
// Periodically run guardian state set cleanup.
go func() {
ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
gst.Cleanup()
case <-ctx.Done():
return
}
}
}()
go func() {
// Disable heartbeat when no node name is provided (spy mode)
if nodeName == "" {
return
}
ourAddr := ethcrypto.PubkeyToAddress(gk.PublicKey)
ctr := int64(0)
// Guardians should send out their first heartbeat immediately to speed up test runs.
// But we also want to wait a little bit such that network connections can be established by then.
timer := time.NewTimer(time.Second * 2)
2023-05-24 14:23:42 -07:00
defer timer.Stop()
for {
select {
case <-ctx.Done():
return
2023-05-24 14:23:42 -07:00
case <-timer.C:
timer.Reset(15 * time.Second)
// create a heartbeat
b := func() []byte {
DefaultRegistry.mu.Lock()
defer DefaultRegistry.mu.Unlock()
networks := make([]*gossipv1.Heartbeat_Network, 0, len(DefaultRegistry.networkStats))
for _, v := range DefaultRegistry.networkStats {
errCtr := DefaultRegistry.GetErrorCount(vaa.ChainID(v.Id))
v.ErrorCount = errCtr
networks = append(networks, v)
}
features := make([]string, 0)
if gov != nil {
features = append(features, "governor")
}
if acct != nil {
features = append(features, acct.FeatureString())
}
if ibcFeaturesFunc != nil {
ibcFlags := ibcFeaturesFunc()
if ibcFlags != "" {
features = append(features, ibcFlags)
}
}
heartbeat := &gossipv1.Heartbeat{
NodeName: nodeName,
Counter: ctr,
Timestamp: time.Now().UnixNano(),
Networks: networks,
Version: version.Version(),
GuardianAddr: ourAddr.String(),
BootTimestamp: bootTime.UnixNano(),
Features: features,
}
if components.P2PIDInHeartbeat {
heartbeat.P2PNodeId = nodeIdBytes
}
if err := gst.SetHeartbeat(ourAddr, h.ID(), heartbeat); err != nil {
panic(err)
}
collectNodeMetrics(ourAddr, h.ID(), heartbeat)
if gov != nil {
gov.CollectMetrics(heartbeat, gossipSendC, gk, ourAddr)
}
msg := gossipv1.GossipMessage{
Message: &gossipv1.GossipMessage_SignedHeartbeat{
SignedHeartbeat: createSignedHeartbeat(gk, heartbeat),
},
}
b, err := proto.Marshal(&msg)
if err != nil {
panic(err)
}
return b
}()
err = th.Publish(ctx, b)
if err != nil {
logger.Warn("failed to publish heartbeat message", zap.Error(err))
}
p2pHeartbeatsSent.Inc()
ctr += 1
}
}
}()
go func() {
for {
select {
case <-ctx.Done():
return
case msg := <-gossipSendC:
err := th.Publish(ctx, msg)
p2pMessagesSent.Inc()
if err != nil {
logger.Error("failed to publish message from queue", zap.Error(err))
}
case msg := <-obsvReqSendC:
b, err := proto.Marshal(msg)
if err != nil {
panic(err)
}
// Sign the observation request using our node's guardian key.
digest := signedObservationRequestDigest(b)
sig, err := ethcrypto.Sign(digest.Bytes(), gk)
if err != nil {
panic(err)
}
sReq := &gossipv1.SignedObservationRequest{
ObservationRequest: b,
Signature: sig,
GuardianAddr: ethcrypto.PubkeyToAddress(gk.PublicKey).Bytes(),
}
envelope := &gossipv1.GossipMessage{
Message: &gossipv1.GossipMessage_SignedObservationRequest{
SignedObservationRequest: sReq}}
b, err = proto.Marshal(envelope)
if err != nil {
panic(err)
}
// Send to local observation request queue (the loopback message is ignored)
obsvReqC <- msg
err = th.Publish(ctx, b)
p2pMessagesSent.Inc()
if err != nil {
logger.Error("failed to publish observation request", zap.Error(err))
} else {
logger.Info("published signed observation request", zap.Any("signed_observation_request", sReq))
}
}
}
}()
2020-08-17 03:29:52 -07:00
for {
envelope, err := sub.Next(ctx)
2020-08-17 03:29:52 -07:00
if err != nil {
return fmt.Errorf("failed to receive pubsub message: %w", err)
2020-08-17 03:29:52 -07:00
}
var msg gossipv1.GossipMessage
err = proto.Unmarshal(envelope.Data, &msg)
2020-08-17 03:29:52 -07:00
if err != nil {
logger.Info("received invalid message",
zap.Binary("data", envelope.Data),
zap.String("from", envelope.GetFrom().String()))
p2pMessagesReceived.WithLabelValues("invalid").Inc()
continue
2020-08-17 03:29:52 -07:00
}
if envelope.GetFrom() == h.ID() {
logger.Debug("received message from ourselves, ignoring",
zap.Any("payload", msg.Message))
p2pMessagesReceived.WithLabelValues("loopback").Inc()
continue
}
2020-08-17 03:29:52 -07:00
logger.Debug("received message",
zap.Any("payload", msg.Message),
zap.Binary("raw", envelope.Data),
zap.String("from", envelope.GetFrom().String()))
switch m := msg.Message.(type) {
case *gossipv1.GossipMessage_SignedHeartbeat:
s := m.SignedHeartbeat
gs := gst.Get()
if gs == nil {
// No valid guardian set yet - dropping heartbeat
logger.Debug("skipping heartbeat - no guardian set",
zap.Any("value", s),
zap.String("from", envelope.GetFrom().String()))
break
}
if heartbeat, err := processSignedHeartbeat(envelope.GetFrom(), s, gs, gst, disableHeartbeatVerify); err != nil {
p2pMessagesReceived.WithLabelValues("invalid_heartbeat").Inc()
logger.Debug("invalid signed heartbeat received",
zap.Error(err),
zap.Any("payload", msg.Message),
zap.Any("value", s),
zap.Binary("raw", envelope.Data),
zap.String("from", envelope.GetFrom().String()))
} else {
p2pMessagesReceived.WithLabelValues("valid_heartbeat").Inc()
logger.Debug("valid signed heartbeat received",
zap.Any("value", heartbeat),
zap.String("from", envelope.GetFrom().String()))
func() {
if len(heartbeat.P2PNodeId) != 0 {
components.ProtectedHostByGuardianKeyLock.Lock()
defer components.ProtectedHostByGuardianKeyLock.Unlock()
var peerId peer.ID
if err = peerId.Unmarshal(heartbeat.P2PNodeId); err != nil {
logger.Error("p2p_node_id_in_heartbeat_invalid",
zap.Any("payload", msg.Message),
zap.Any("value", s),
zap.Binary("raw", envelope.Data),
zap.String("from", envelope.GetFrom().String()))
} else {
guardianAddr := common.BytesToAddress(s.GuardianAddr)
prevPeerId, ok := components.ProtectedHostByGuardianKey[guardianAddr]
if ok {
if prevPeerId != peerId {
logger.Info("p2p_guardian_peer_changed",
zap.String("guardian_addr", guardianAddr.String()),
zap.String("prevPeerId", prevPeerId.String()),
zap.String("newPeerId", peerId.String()),
)
components.ConnMgr.Unprotect(prevPeerId, "heartbeat")
components.ConnMgr.Protect(peerId, "heartbeat")
components.ProtectedHostByGuardianKey[guardianAddr] = peerId
}
} else {
components.ConnMgr.Protect(peerId, "heartbeat")
components.ProtectedHostByGuardianKey[guardianAddr] = peerId
}
}
} else {
logger.Debug("p2p_node_id_not_in_heartbeat",
zap.Error(err),
zap.Any("payload", heartbeat.NodeName))
}
}()
}
case *gossipv1.GossipMessage_SignedObservation:
select {
case obsvC <- m.SignedObservation:
p2pMessagesReceived.WithLabelValues("observation").Inc()
default:
p2pReceiveChannelOverflow.WithLabelValues("observation").Inc()
}
case *gossipv1.GossipMessage_SignedVaaWithQuorum:
select {
case signedInC <- m.SignedVaaWithQuorum:
p2pMessagesReceived.WithLabelValues("signed_vaa_with_quorum").Inc()
default:
p2pReceiveChannelOverflow.WithLabelValues("signed_vaa_with_quorum").Inc()
}
case *gossipv1.GossipMessage_SignedObservationRequest:
s := m.SignedObservationRequest
gs := gst.Get()
if gs == nil {
logger.Debug("dropping SignedObservationRequest - no guardian set",
zap.Any("value", s),
zap.String("from", envelope.GetFrom().String()))
break
}
r, err := processSignedObservationRequest(s, gs)
if err != nil {
p2pMessagesReceived.WithLabelValues("invalid_signed_observation_request").Inc()
logger.Debug("invalid signed observation request received",
zap.Error(err),
zap.Any("payload", msg.Message),
zap.Any("value", s),
zap.Binary("raw", envelope.Data),
zap.String("from", envelope.GetFrom().String()))
} else {
2023-07-06 07:33:12 -07:00
logger.Debug("valid signed observation request received",
zap.Any("value", r),
zap.String("from", envelope.GetFrom().String()))
select {
case obsvReqC <- r:
p2pMessagesReceived.WithLabelValues("signed_observation_request").Inc()
default:
p2pReceiveChannelOverflow.WithLabelValues("signed_observation_request").Inc()
}
}
case *gossipv1.GossipMessage_SignedChainGovernorConfig:
if signedGovCfg != nil {
signedGovCfg <- m.SignedChainGovernorConfig
}
case *gossipv1.GossipMessage_SignedChainGovernorStatus:
if signedGovSt != nil {
signedGovSt <- m.SignedChainGovernorStatus
}
default:
p2pMessagesReceived.WithLabelValues("unknown").Inc()
logger.Warn("received unknown message type (running outdated software?)",
zap.Any("payload", msg.Message),
zap.Binary("raw", envelope.Data),
zap.String("from", envelope.GetFrom().String()))
}
2020-08-17 03:29:52 -07:00
}
}
}
func createSignedHeartbeat(gk *ecdsa.PrivateKey, heartbeat *gossipv1.Heartbeat) *gossipv1.SignedHeartbeat {
ourAddr := ethcrypto.PubkeyToAddress(gk.PublicKey)
b, err := proto.Marshal(heartbeat)
if err != nil {
panic(err)
}
// Sign the heartbeat using our node's guardian key.
digest := heartbeatDigest(b)
sig, err := ethcrypto.Sign(digest.Bytes(), gk)
if err != nil {
panic(err)
}
return &gossipv1.SignedHeartbeat{
Heartbeat: b,
Signature: sig,
GuardianAddr: ourAddr.Bytes(),
}
}
func processSignedHeartbeat(from peer.ID, s *gossipv1.SignedHeartbeat, gs *node_common.GuardianSet, gst *node_common.GuardianSetState, disableVerify bool) (*gossipv1.Heartbeat, error) {
envelopeAddr := common.BytesToAddress(s.GuardianAddr)
idx, ok := gs.KeyIndex(envelopeAddr)
var pk common.Address
if !ok {
if !disableVerify {
return nil, fmt.Errorf("invalid message: %s not in guardian set", envelopeAddr)
}
} else {
pk = gs.Keys[idx]
}
digest := heartbeatDigest(s.Heartbeat)
// SECURITY: see whitepapers/0009_guardian_key.md
if len(heartbeatMessagePrefix)+len(s.Heartbeat) < 34 {
return nil, fmt.Errorf("invalid message: too short")
}
pubKey, err := ethcrypto.Ecrecover(digest.Bytes(), s.Signature)
if err != nil {
return nil, errors.New("failed to recover public key")
}
signerAddr := common.BytesToAddress(ethcrypto.Keccak256(pubKey[1:])[12:])
if pk != signerAddr && !disableVerify {
return nil, fmt.Errorf("invalid signer: %v", signerAddr)
}
var h gossipv1.Heartbeat
err = proto.Unmarshal(s.Heartbeat, &h)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal heartbeat: %w", err)
}
if time.Until(time.Unix(0, h.Timestamp)).Abs() > heartbeatMaxTimeDifference {
return nil, fmt.Errorf("heartbeat is too old or too far into the future")
}
if h.GuardianAddr != signerAddr.String() {
return nil, fmt.Errorf("GuardianAddr in heartbeat does not match signerAddr")
}
// Store verified heartbeat in global guardian set state.
if err := gst.SetHeartbeat(signerAddr, from, &h); err != nil {
return nil, fmt.Errorf("failed to store in guardian set state: %w", err)
}
collectNodeMetrics(signerAddr, from, &h)
return &h, nil
}
func processSignedObservationRequest(s *gossipv1.SignedObservationRequest, gs *node_common.GuardianSet) (*gossipv1.ObservationRequest, error) {
envelopeAddr := common.BytesToAddress(s.GuardianAddr)
idx, ok := gs.KeyIndex(envelopeAddr)
var pk common.Address
if !ok {
return nil, fmt.Errorf("invalid message: %s not in guardian set", envelopeAddr)
} else {
pk = gs.Keys[idx]
}
// SECURITY: see whitepapers/0009_guardian_key.md
if len(signedObservationRequestPrefix)+len(s.ObservationRequest) < 34 {
return nil, fmt.Errorf("invalid observation request: too short")
}
digest := signedObservationRequestDigest(s.ObservationRequest)
pubKey, err := ethcrypto.Ecrecover(digest.Bytes(), s.Signature)
if err != nil {
return nil, errors.New("failed to recover public key")
}
signerAddr := common.BytesToAddress(ethcrypto.Keccak256(pubKey[1:])[12:])
if pk != signerAddr {
return nil, fmt.Errorf("invalid signer: %v", signerAddr)
}
var h gossipv1.ObservationRequest
err = proto.Unmarshal(s.ObservationRequest, &h)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal observation request: %w", err)
}
// TODO: implement per-guardian rate limiting
return &h, nil
}