2020-10-28 14:41:37 -07:00
package p2p
2020-08-17 03:29:52 -07:00
import (
"context"
2021-07-31 05:36:57 -07:00
"crypto/ecdsa"
2021-07-31 09:51:38 -07:00
"errors"
2020-08-17 03:29:52 -07:00
"fmt"
2022-06-21 12:18:16 -07:00
"strings"
"time"
2023-01-17 05:30:50 -08:00
"github.com/certusone/wormhole/node/pkg/accountant"
2021-08-30 07:19:00 -07:00
node_common "github.com/certusone/wormhole/node/pkg/common"
2022-07-19 11:08:06 -07:00
"github.com/certusone/wormhole/node/pkg/governor"
2021-08-26 01:35:09 -07:00
"github.com/certusone/wormhole/node/pkg/version"
2021-07-31 05:36:57 -07:00
"github.com/ethereum/go-ethereum/common"
ethcrypto "github.com/ethereum/go-ethereum/crypto"
2021-01-26 16:16:37 -08:00
"github.com/prometheus/client_golang/prometheus"
2021-07-23 07:06:35 -07:00
"github.com/prometheus/client_golang/prometheus/promauto"
2022-08-18 01:52:36 -07:00
"github.com/wormhole-foundation/wormhole/sdk/vaa"
2020-08-17 03:29:52 -07:00
2022-09-05 20:36:58 -07:00
"github.com/libp2p/go-libp2p/core/peer"
2020-10-28 14:41:37 -07:00
"github.com/multiformats/go-multiaddr"
2020-08-17 03:29:52 -07:00
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"
2022-09-05 20:36:58 -07:00
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"
libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic"
2020-08-17 03:29:52 -07:00
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
2021-08-26 01:35:09 -07:00
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/supervisor"
2020-08-17 03:29:52 -07:00
)
2021-01-26 16:16:37 -08:00
var (
2021-07-23 07:06:35 -07:00
p2pHeartbeatsSent = promauto . NewCounter (
2021-01-26 16:16:37 -08:00
prometheus . CounterOpts {
Name : "wormhole_p2p_heartbeats_sent_total" ,
Help : "Total number of p2p heartbeats sent" ,
} )
2021-07-23 07:06:35 -07:00
p2pMessagesSent = promauto . NewCounter (
2021-01-26 16:16:37 -08:00
prometheus . CounterOpts {
Name : "wormhole_p2p_broadcast_messages_sent_total" ,
Help : "Total number of p2p pubsub broadcast messages sent" ,
} )
2021-07-23 07:06:35 -07:00
p2pMessagesReceived = promauto . NewCounterVec (
2021-01-26 16:16:37 -08:00
prometheus . CounterOpts {
Name : "wormhole_p2p_broadcast_messages_received_total" ,
Help : "Total number of p2p pubsub broadcast messages received" ,
} , [ ] string { "type" } )
2023-02-13 07:11:17 -08:00
p2pReceiveChannelOverflow = promauto . NewCounterVec (
prometheus . CounterOpts {
Name : "wormhole_p2p_receive_channel_overflow" ,
Help : "Total number of p2p received messages dropped due to channel overflow" ,
} , [ ] string { "type" } )
2021-01-26 16:16:37 -08:00
)
2021-07-31 05:36:57 -07:00
var heartbeatMessagePrefix = [ ] byte ( "heartbeat|" )
2021-12-20 13:23:45 -08:00
var signedObservationRequestPrefix = [ ] byte ( "signed_observation_request|" )
2023-02-03 17:26:07 -08:00
// heartbeatMaxTimeDifference specifies the maximum time difference between the local clock and the timestamp in incoming heartbeat messages. Heartbeats that are this old or this much into the future will be dropped. This value should encompass clock skew and network delay.
var heartbeatMaxTimeDifference = time . Minute * 15
2021-07-31 05:36:57 -07:00
func heartbeatDigest ( b [ ] byte ) common . Hash {
return ethcrypto . Keccak256Hash ( append ( heartbeatMessagePrefix , b ... ) )
}
2021-12-20 13:23:45 -08:00
func signedObservationRequestDigest ( b [ ] byte ) common . Hash {
return ethcrypto . Keccak256Hash ( append ( signedObservationRequestPrefix , b ... ) )
}
2023-01-16 04:33:01 -08:00
func Run (
2023-01-20 13:15:13 -08:00
obsvC chan <- * gossipv1 . SignedObservation ,
obsvReqC chan <- * gossipv1 . ObservationRequest ,
obsvReqSendC <- chan * gossipv1 . ObservationRequest ,
gossipSendC chan [ ] byte ,
signedInC chan <- * gossipv1 . SignedVAAWithQuorum ,
2023-01-16 04:33:01 -08:00
priv crypto . PrivKey ,
gk * ecdsa . PrivateKey ,
gst * node_common . GuardianSetState ,
port uint ,
networkID string ,
bootstrapPeers string ,
nodeName string ,
disableHeartbeatVerify bool ,
rootCtxCancel context . CancelFunc ,
2023-01-17 05:30:50 -08:00
acct * accountant . Accountant ,
2023-01-16 04:33:01 -08:00
gov * governor . ChainGovernor ,
signedGovCfg chan * gossipv1 . SignedChainGovernorConfig ,
signedGovSt chan * gossipv1 . SignedChainGovernorStatus ,
) func ( ctx context . Context ) error {
2020-08-19 05:23:00 -07:00
return func ( ctx context . Context ) ( re error ) {
2023-02-13 07:11:17 -08:00
p2pReceiveChannelOverflow . WithLabelValues ( "observation" ) . Add ( 0 )
p2pReceiveChannelOverflow . WithLabelValues ( "signed_vaa_with_quorum" ) . Add ( 0 )
p2pReceiveChannelOverflow . WithLabelValues ( "signed_observation_request" ) . Add ( 0 )
2020-08-19 05:23:00 -07:00
logger := supervisor . Logger ( ctx )
2020-08-17 09:20:15 -07:00
2022-09-05 20:36:58 -07:00
mgr , err := connmgr . NewConnManager (
100 , // LowWater
400 , // HighWater,
connmgr . WithGracePeriod ( time . Minute ) ,
)
if err != nil {
return fmt . Errorf ( "failed to create p2p connection manager: %w" , err )
}
h , err := libp2p . New (
2020-08-28 06:10:42 -07:00
// Use the keypair we generated
libp2p . Identity ( priv ) ,
// Multiple listen addresses
libp2p . ListenAddrStrings (
// Listen on QUIC only.
// https://github.com/libp2p/go-libp2p/issues/688
2020-10-28 14:41:37 -07:00
fmt . Sprintf ( "/ip4/0.0.0.0/udp/%d/quic" , port ) ,
fmt . Sprintf ( "/ip6/::/udp/%d/quic" , port ) ,
2020-08-28 06:10:42 -07:00
) ,
// Enable TLS security as the only security protocol.
libp2p . Security ( libp2ptls . ID , libp2ptls . New ) ,
// Enable QUIC transport as the only transport.
libp2p . Transport ( libp2pquic . NewTransport ) ,
// Let's prevent our peer from having too many
// connections by attaching a connection manager.
2022-09-05 20:36:58 -07:00
libp2p . ConnectionManager ( mgr ) ,
2020-08-28 06:10:42 -07:00
// Let this host use the DHT to find other hosts
libp2p . Routing ( func ( h host . Host ) ( routing . PeerRouting , error ) {
2022-08-24 09:27:59 -07:00
logger . Info ( "Connecting to bootstrap peers" , zap . String ( "bootstrap_peers" , bootstrapPeers ) )
bootstrappers := make ( [ ] peer . AddrInfo , 0 )
for _ , addr := range strings . Split ( bootstrapPeers , "," ) {
if addr == "" {
continue
}
ma , err := multiaddr . NewMultiaddr ( addr )
if err != nil {
logger . Error ( "Invalid bootstrap address" , zap . String ( "peer" , addr ) , zap . Error ( err ) )
continue
}
pi , err := peer . AddrInfoFromP2pAddr ( ma )
if err != nil {
logger . Error ( "Invalid bootstrap address" , zap . String ( "peer" , addr ) , zap . Error ( err ) )
continue
}
if pi . ID == h . ID ( ) {
logger . Info ( "We're a bootstrap node" )
continue
}
bootstrappers = append ( bootstrappers , * pi )
}
2020-08-28 06:10:42 -07:00
// TODO(leo): Persistent data store (i.e. address book)
2020-10-28 14:41:37 -07:00
idht , err := dht . New ( ctx , h , dht . Mode ( dht . ModeServer ) ,
2021-07-31 07:29:13 -07:00
// This intentionally makes us incompatible with the global IPFS DHT
2020-10-28 14:41:37 -07:00
dht . ProtocolPrefix ( protocol . ID ( "/" + networkID ) ) ,
2022-08-24 09:27:59 -07:00
dht . BootstrapPeers ( bootstrappers ... ) ,
2020-08-28 06:10:42 -07:00
)
return idht , err
} ) ,
2020-08-19 05:23:00 -07:00
)
2020-08-28 06:10:42 -07:00
if err != nil {
panic ( err )
}
2020-08-17 03:29:52 -07:00
2020-08-28 06:10:42 -07:00
defer func ( ) {
// TODO: libp2p cannot be cleanly restarted (https://github.com/libp2p/go-libp2p/issues/992)
logger . Error ( "p2p routine has exited, cancelling root context..." , zap . Error ( re ) )
rootCtxCancel ( )
} ( )
2020-08-17 03:29:52 -07:00
2020-10-28 14:41:37 -07:00
topic := fmt . Sprintf ( "%s/%s" , networkID , "broadcast" )
2020-08-31 07:11:09 -07:00
logger . Info ( "Subscribing pubsub topic" , zap . String ( "topic" , topic ) )
ps , err := pubsub . NewGossipSub ( ctx , h )
if err != nil {
panic ( err )
}
th , err := ps . Join ( topic )
if err != nil {
return fmt . Errorf ( "failed to join topic: %w" , err )
}
sub , err := th . Subscribe ( )
if err != nil {
return fmt . Errorf ( "failed to subscribe topic: %w" , err )
}
2020-08-19 05:23:00 -07:00
logger . Info ( "Node has been started" , zap . String ( "peer_id" , h . ID ( ) . String ( ) ) ,
zap . String ( "addrs" , fmt . Sprintf ( "%v" , h . Addrs ( ) ) ) )
2021-08-08 03:05:14 -07:00
bootTime := time . Now ( )
2021-10-28 15:17:22 -07:00
// Periodically run guardian state set cleanup.
go func ( ) {
ticker := time . NewTicker ( 15 * time . Second )
defer ticker . Stop ( )
for {
select {
case <- ticker . C :
gst . Cleanup ( )
case <- ctx . Done ( ) :
return
}
}
} ( )
2020-08-19 05:23:00 -07:00
go func ( ) {
2021-12-02 16:02:32 -08:00
// Disable heartbeat when no node name is provided (spy mode)
if nodeName == "" {
return
}
2020-08-19 05:23:00 -07:00
ctr := int64 ( 0 )
tick := time . NewTicker ( 15 * time . Second )
defer tick . Stop ( )
for {
select {
case <- ctx . Done ( ) :
return
case <- tick . C :
2022-08-09 20:22:14 -07:00
2023-02-03 17:26:07 -08:00
// create a heartbeat
b := func ( ) [ ] byte {
DefaultRegistry . mu . Lock ( )
defer DefaultRegistry . mu . Unlock ( )
networks := make ( [ ] * gossipv1 . Heartbeat_Network , 0 , len ( DefaultRegistry . networkStats ) )
for _ , v := range DefaultRegistry . networkStats {
errCtr := DefaultRegistry . GetErrorCount ( vaa . ChainID ( v . Id ) )
v . ErrorCount = errCtr
networks = append ( networks , v )
}
features := make ( [ ] string , 0 )
if gov != nil {
features = append ( features , "governor" )
}
if acct != nil {
features = append ( features , acct . FeatureString ( ) )
}
heartbeat := & gossipv1 . Heartbeat {
NodeName : nodeName ,
Counter : ctr ,
Timestamp : time . Now ( ) . UnixNano ( ) ,
Networks : networks ,
Version : version . Version ( ) ,
GuardianAddr : DefaultRegistry . guardianAddress ,
BootTimestamp : bootTime . UnixNano ( ) ,
Features : features ,
}
ourAddr := ethcrypto . PubkeyToAddress ( gk . PublicKey )
if err := gst . SetHeartbeat ( ourAddr , h . ID ( ) , heartbeat ) ; err != nil {
panic ( err )
}
collectNodeMetrics ( ourAddr , h . ID ( ) , heartbeat )
if gov != nil {
gov . CollectMetrics ( heartbeat , gossipSendC , gk , ourAddr )
}
msg := gossipv1 . GossipMessage {
Message : & gossipv1 . GossipMessage_SignedHeartbeat {
SignedHeartbeat : createSignedHeartbeat ( gk , heartbeat ) ,
} ,
}
b , err := proto . Marshal ( & msg )
if err != nil {
panic ( err )
}
return b
} ( )
2021-07-31 05:36:57 -07:00
2020-08-19 05:23:00 -07:00
err = th . Publish ( ctx , b )
if err != nil {
logger . Warn ( "failed to publish heartbeat message" , zap . Error ( err ) )
}
2020-11-06 03:50:54 -08:00
2021-01-26 16:16:37 -08:00
p2pHeartbeatsSent . Inc ( )
2020-11-06 03:50:54 -08:00
ctr += 1
2020-08-19 05:23:00 -07:00
}
}
} ( )
go func ( ) {
for {
select {
case <- ctx . Done ( ) :
return
2023-01-20 13:15:13 -08:00
case msg := <- gossipSendC :
2020-08-19 05:23:00 -07:00
err := th . Publish ( ctx , msg )
2021-01-26 16:16:37 -08:00
p2pMessagesSent . Inc ( )
2020-08-19 05:23:00 -07:00
if err != nil {
logger . Error ( "failed to publish message from queue" , zap . Error ( err ) )
}
2021-12-20 13:23:45 -08:00
case msg := <- obsvReqSendC :
b , err := proto . Marshal ( msg )
if err != nil {
panic ( err )
}
// Sign the observation request using our node's guardian key.
digest := signedObservationRequestDigest ( b )
sig , err := ethcrypto . Sign ( digest . Bytes ( ) , gk )
if err != nil {
panic ( err )
}
sReq := & gossipv1 . SignedObservationRequest {
ObservationRequest : b ,
Signature : sig ,
GuardianAddr : ethcrypto . PubkeyToAddress ( gk . PublicKey ) . Bytes ( ) ,
}
envelope := & gossipv1 . GossipMessage {
Message : & gossipv1 . GossipMessage_SignedObservationRequest {
SignedObservationRequest : sReq } }
b , err = proto . Marshal ( envelope )
if err != nil {
panic ( err )
}
// Send to local observation request queue (the loopback message is ignored)
obsvReqC <- msg
err = th . Publish ( ctx , b )
p2pMessagesSent . Inc ( )
if err != nil {
logger . Error ( "failed to publish observation request" , zap . Error ( err ) )
} else {
logger . Info ( "published signed observation request" , zap . Any ( "signed_observation_request" , sReq ) )
}
2020-08-19 05:23:00 -07:00
}
}
} ( )
2020-08-17 03:29:52 -07:00
for {
2020-10-28 14:41:37 -07:00
envelope , err := sub . Next ( ctx )
2020-08-17 03:29:52 -07:00
if err != nil {
2020-08-19 05:23:00 -07:00
return fmt . Errorf ( "failed to receive pubsub message: %w" , err )
2020-08-17 03:29:52 -07:00
}
2020-08-19 05:23:00 -07:00
var msg gossipv1 . GossipMessage
2020-10-28 14:41:37 -07:00
err = proto . Unmarshal ( envelope . Data , & msg )
2020-08-17 03:29:52 -07:00
if err != nil {
2020-08-19 05:23:00 -07:00
logger . Info ( "received invalid message" ,
2022-01-25 03:59:18 -08:00
zap . Binary ( "data" , envelope . Data ) ,
2020-10-28 14:41:37 -07:00
zap . String ( "from" , envelope . GetFrom ( ) . String ( ) ) )
2021-01-26 16:16:37 -08:00
p2pMessagesReceived . WithLabelValues ( "invalid" ) . Inc ( )
2020-08-19 05:23:00 -07:00
continue
2020-08-17 03:29:52 -07:00
}
2020-10-28 14:41:37 -07:00
if envelope . GetFrom ( ) == h . ID ( ) {
2020-10-28 14:41:34 -07:00
logger . Debug ( "received message from ourselves, ignoring" ,
zap . Any ( "payload" , msg . Message ) )
2021-01-26 16:16:37 -08:00
p2pMessagesReceived . WithLabelValues ( "loopback" ) . Inc ( )
2020-10-28 14:41:34 -07:00
continue
}
2020-08-17 03:29:52 -07:00
2020-08-19 05:23:00 -07:00
logger . Debug ( "received message" ,
zap . Any ( "payload" , msg . Message ) ,
2020-10-28 14:41:37 -07:00
zap . Binary ( "raw" , envelope . Data ) ,
zap . String ( "from" , envelope . GetFrom ( ) . String ( ) ) )
2020-08-19 05:23:00 -07:00
switch m := msg . Message . ( type ) {
2021-07-31 05:36:57 -07:00
case * gossipv1 . GossipMessage_SignedHeartbeat :
s := m . SignedHeartbeat
2021-07-31 09:51:38 -07:00
gs := gst . Get ( )
if gs == nil {
// No valid guardian set yet - dropping heartbeat
logger . Debug ( "skipping heartbeat - no guardian set" ,
zap . Any ( "value" , s ) ,
zap . String ( "from" , envelope . GetFrom ( ) . String ( ) ) )
break
}
2021-08-08 02:38:21 -07:00
if heartbeat , err := processSignedHeartbeat ( envelope . GetFrom ( ) , s , gs , gst , disableHeartbeatVerify ) ; err != nil {
2021-07-31 05:36:57 -07:00
p2pMessagesReceived . WithLabelValues ( "invalid_heartbeat" ) . Inc ( )
2021-08-12 04:46:44 -07:00
logger . Debug ( "invalid signed heartbeat received" ,
2021-07-31 05:36:57 -07:00
zap . Error ( err ) ,
zap . Any ( "payload" , msg . Message ) ,
zap . Any ( "value" , s ) ,
zap . Binary ( "raw" , envelope . Data ) ,
zap . String ( "from" , envelope . GetFrom ( ) . String ( ) ) )
} else {
p2pMessagesReceived . WithLabelValues ( "valid_heartbeat" ) . Inc ( )
logger . Debug ( "valid signed heartbeat received" ,
zap . Any ( "value" , heartbeat ) ,
zap . String ( "from" , envelope . GetFrom ( ) . String ( ) ) )
}
2020-11-20 13:35:00 -08:00
case * gossipv1 . GossipMessage_SignedObservation :
2023-02-13 07:11:17 -08:00
select {
case obsvC <- m . SignedObservation :
p2pMessagesReceived . WithLabelValues ( "observation" ) . Inc ( )
default :
p2pReceiveChannelOverflow . WithLabelValues ( "observation" ) . Inc ( )
}
2021-09-13 06:03:26 -07:00
case * gossipv1 . GossipMessage_SignedVaaWithQuorum :
2023-02-13 07:11:17 -08:00
select {
case signedInC <- m . SignedVaaWithQuorum :
p2pMessagesReceived . WithLabelValues ( "signed_vaa_with_quorum" ) . Inc ( )
default :
p2pReceiveChannelOverflow . WithLabelValues ( "signed_vaa_with_quorum" ) . Inc ( )
}
2021-12-20 13:23:45 -08:00
case * gossipv1 . GossipMessage_SignedObservationRequest :
s := m . SignedObservationRequest
gs := gst . Get ( )
if gs == nil {
logger . Debug ( "dropping SignedObservationRequest - no guardian set" ,
zap . Any ( "value" , s ) ,
zap . String ( "from" , envelope . GetFrom ( ) . String ( ) ) )
break
}
r , err := processSignedObservationRequest ( s , gs )
if err != nil {
p2pMessagesReceived . WithLabelValues ( "invalid_signed_observation_request" ) . Inc ( )
logger . Debug ( "invalid signed observation request received" ,
zap . Error ( err ) ,
zap . Any ( "payload" , msg . Message ) ,
zap . Any ( "value" , s ) ,
zap . Binary ( "raw" , envelope . Data ) ,
zap . String ( "from" , envelope . GetFrom ( ) . String ( ) ) )
} else {
logger . Info ( "valid signed observation request received" ,
zap . Any ( "value" , r ) ,
zap . String ( "from" , envelope . GetFrom ( ) . String ( ) ) )
2023-02-13 07:11:17 -08:00
select {
case obsvReqC <- r :
p2pMessagesReceived . WithLabelValues ( "signed_observation_request" ) . Inc ( )
default :
p2pReceiveChannelOverflow . WithLabelValues ( "signed_observation_request" ) . Inc ( )
}
2021-12-20 13:23:45 -08:00
}
2022-09-26 09:24:30 -07:00
case * gossipv1 . GossipMessage_SignedChainGovernorConfig :
logger . Debug ( "cgov: received config message" )
2022-10-19 20:32:05 -07:00
if signedGovCfg != nil {
signedGovCfg <- m . SignedChainGovernorConfig
}
2022-09-26 09:24:30 -07:00
case * gossipv1 . GossipMessage_SignedChainGovernorStatus :
logger . Debug ( "cgov: received status message" )
2022-10-19 20:32:05 -07:00
if signedGovSt != nil {
signedGovSt <- m . SignedChainGovernorStatus
}
2020-08-19 05:23:00 -07:00
default :
2021-01-26 16:16:37 -08:00
p2pMessagesReceived . WithLabelValues ( "unknown" ) . Inc ( )
2020-08-19 05:23:00 -07:00
logger . Warn ( "received unknown message type (running outdated software?)" ,
zap . Any ( "payload" , msg . Message ) ,
2020-10-28 14:41:37 -07:00
zap . Binary ( "raw" , envelope . Data ) ,
zap . String ( "from" , envelope . GetFrom ( ) . String ( ) ) )
2020-08-19 05:23:00 -07:00
}
2020-08-17 03:29:52 -07:00
}
}
}
2021-07-31 05:36:57 -07:00
2023-02-03 16:05:42 -08:00
func createSignedHeartbeat ( gk * ecdsa . PrivateKey , heartbeat * gossipv1 . Heartbeat ) * gossipv1 . SignedHeartbeat {
ourAddr := ethcrypto . PubkeyToAddress ( gk . PublicKey )
b , err := proto . Marshal ( heartbeat )
if err != nil {
panic ( err )
}
// Sign the heartbeat using our node's guardian key.
digest := heartbeatDigest ( b )
sig , err := ethcrypto . Sign ( digest . Bytes ( ) , gk )
if err != nil {
panic ( err )
}
return & gossipv1 . SignedHeartbeat {
Heartbeat : b ,
Signature : sig ,
GuardianAddr : ourAddr . Bytes ( ) ,
}
}
2021-08-30 07:19:00 -07:00
func processSignedHeartbeat ( from peer . ID , s * gossipv1 . SignedHeartbeat , gs * node_common . GuardianSet , gst * node_common . GuardianSetState , disableVerify bool ) ( * gossipv1 . Heartbeat , error ) {
2021-07-31 09:51:38 -07:00
envelopeAddr := common . BytesToAddress ( s . GuardianAddr )
idx , ok := gs . KeyIndex ( envelopeAddr )
2021-08-05 08:01:36 -07:00
var pk common . Address
2021-07-31 09:51:38 -07:00
if ! ok {
2021-08-05 08:01:36 -07:00
if ! disableVerify {
return nil , fmt . Errorf ( "invalid message: %s not in guardian set" , envelopeAddr )
}
} else {
pk = gs . Keys [ idx ]
2021-07-31 09:51:38 -07:00
}
digest := heartbeatDigest ( s . Heartbeat )
2022-11-28 08:23:34 -08:00
// SECURITY: see whitepapers/0009_guardian_key.md
if len ( heartbeatMessagePrefix ) + len ( s . Heartbeat ) < 34 {
return nil , fmt . Errorf ( "invalid message: too short" )
}
2021-07-31 09:51:38 -07:00
pubKey , err := ethcrypto . Ecrecover ( digest . Bytes ( ) , s . Signature )
if err != nil {
return nil , errors . New ( "failed to recover public key" )
}
signerAddr := common . BytesToAddress ( ethcrypto . Keccak256 ( pubKey [ 1 : ] ) [ 12 : ] )
2021-08-05 08:01:36 -07:00
if pk != signerAddr && ! disableVerify {
2021-07-31 09:51:38 -07:00
return nil , fmt . Errorf ( "invalid signer: %v" , signerAddr )
}
2021-07-31 05:36:57 -07:00
var h gossipv1 . Heartbeat
2021-07-31 09:51:38 -07:00
err = proto . Unmarshal ( s . Heartbeat , & h )
2021-07-31 05:36:57 -07:00
if err != nil {
return nil , fmt . Errorf ( "failed to unmarshal heartbeat: %w" , err )
}
2023-02-03 17:26:07 -08:00
if time . Until ( time . Unix ( 0 , h . Timestamp ) ) . Abs ( ) > heartbeatMaxTimeDifference {
return nil , fmt . Errorf ( "heartbeat is too old or too far into the future" )
}
if h . GuardianAddr != signerAddr . String ( ) {
return nil , fmt . Errorf ( "GuardianAddr in heartbeat does not match signerAddr" )
}
2021-08-03 11:03:00 -07:00
// Store verified heartbeat in global guardian set state.
2021-08-08 02:38:21 -07:00
if err := gst . SetHeartbeat ( signerAddr , from , & h ) ; err != nil {
return nil , fmt . Errorf ( "failed to store in guardian set state: %w" , err )
}
2021-08-03 11:03:00 -07:00
2021-08-08 03:20:36 -07:00
collectNodeMetrics ( signerAddr , from , & h )
2021-07-31 05:36:57 -07:00
return & h , nil
}
2021-12-20 13:23:45 -08:00
func processSignedObservationRequest ( s * gossipv1 . SignedObservationRequest , gs * node_common . GuardianSet ) ( * gossipv1 . ObservationRequest , error ) {
envelopeAddr := common . BytesToAddress ( s . GuardianAddr )
idx , ok := gs . KeyIndex ( envelopeAddr )
var pk common . Address
if ! ok {
return nil , fmt . Errorf ( "invalid message: %s not in guardian set" , envelopeAddr )
} else {
pk = gs . Keys [ idx ]
}
2022-11-28 08:42:39 -08:00
// SECURITY: see whitepapers/0009_guardian_key.md
if len ( signedObservationRequestPrefix ) + len ( s . ObservationRequest ) < 34 {
return nil , fmt . Errorf ( "invalid observation request: too short" )
}
2021-12-20 13:23:45 -08:00
digest := signedObservationRequestDigest ( s . ObservationRequest )
pubKey , err := ethcrypto . Ecrecover ( digest . Bytes ( ) , s . Signature )
if err != nil {
return nil , errors . New ( "failed to recover public key" )
}
signerAddr := common . BytesToAddress ( ethcrypto . Keccak256 ( pubKey [ 1 : ] ) [ 12 : ] )
if pk != signerAddr {
return nil , fmt . Errorf ( "invalid signer: %v" , signerAddr )
}
var h gossipv1 . ObservationRequest
err = proto . Unmarshal ( s . ObservationRequest , & h )
if err != nil {
return nil , fmt . Errorf ( "failed to unmarshal observation request: %w" , err )
}
// TODO: implement per-guardian rate limiting
return & h , nil
}