diff --git a/node/pkg/p2p/p2p.go b/node/pkg/p2p/p2p.go index 899d40886..a614d832d 100644 --- a/node/pkg/p2p/p2p.go +++ b/node/pkg/p2p/p2p.go @@ -65,6 +65,9 @@ var heartbeatMessagePrefix = []byte("heartbeat|") var signedObservationRequestPrefix = []byte("signed_observation_request|") +// heartbeatMaxTimeDifference specifies the maximum time difference between the local clock and the timestamp in incoming heartbeat messages. Heartbeats that are this old or this much into the future will be dropped. This value should encompass clock skew and network delay. +var heartbeatMaxTimeDifference = time.Minute * 15 + func heartbeatDigest(b []byte) common.Hash { return ethcrypto.Keccak256Hash(append(heartbeatMessagePrefix, b...)) } @@ -227,54 +230,59 @@ func Run( case <-ctx.Done(): return case <-tick.C: - DefaultRegistry.mu.Lock() - networks := make([]*gossipv1.Heartbeat_Network, 0, len(DefaultRegistry.networkStats)) - for _, v := range DefaultRegistry.networkStats { - errCtr := DefaultRegistry.GetErrorCount(vaa.ChainID(v.Id)) - v.ErrorCount = errCtr - networks = append(networks, v) - } - features := make([]string, 0) - if gov != nil { - features = append(features, "governor") - } - if acct != nil { - features = append(features, acct.FeatureString()) - } + // create a heartbeat + b := func() []byte { + DefaultRegistry.mu.Lock() + defer DefaultRegistry.mu.Unlock() + networks := make([]*gossipv1.Heartbeat_Network, 0, len(DefaultRegistry.networkStats)) + for _, v := range DefaultRegistry.networkStats { + errCtr := DefaultRegistry.GetErrorCount(vaa.ChainID(v.Id)) + v.ErrorCount = errCtr + networks = append(networks, v) + } - heartbeat := &gossipv1.Heartbeat{ - NodeName: nodeName, - Counter: ctr, - Timestamp: time.Now().UnixNano(), - Networks: networks, - Version: version.Version(), - GuardianAddr: DefaultRegistry.guardianAddress, - BootTimestamp: bootTime.UnixNano(), - Features: features, - } + features := make([]string, 0) + if gov != nil { + features = append(features, "governor") + } + if acct != nil { + features = append(features, acct.FeatureString()) + } - ourAddr := ethcrypto.PubkeyToAddress(gk.PublicKey) - if err := gst.SetHeartbeat(ourAddr, h.ID(), heartbeat); err != nil { - panic(err) - } - collectNodeMetrics(ourAddr, h.ID(), heartbeat) + heartbeat := &gossipv1.Heartbeat{ + NodeName: nodeName, + Counter: ctr, + Timestamp: time.Now().UnixNano(), + Networks: networks, + Version: version.Version(), + GuardianAddr: DefaultRegistry.guardianAddress, + BootTimestamp: bootTime.UnixNano(), + Features: features, + } - if gov != nil { - gov.CollectMetrics(heartbeat, gossipSendC, gk, ourAddr) - } - DefaultRegistry.mu.Unlock() + ourAddr := ethcrypto.PubkeyToAddress(gk.PublicKey) + if err := gst.SetHeartbeat(ourAddr, h.ID(), heartbeat); err != nil { + panic(err) + } + collectNodeMetrics(ourAddr, h.ID(), heartbeat) - msg := gossipv1.GossipMessage{ - Message: &gossipv1.GossipMessage_SignedHeartbeat{ - SignedHeartbeat: createSignedHeartbeat(gk, heartbeat), - }, - } + if gov != nil { + gov.CollectMetrics(heartbeat, gossipSendC, gk, ourAddr) + } - b, err := proto.Marshal(&msg) - if err != nil { - panic(err) - } + msg := gossipv1.GossipMessage{ + Message: &gossipv1.GossipMessage_SignedHeartbeat{ + SignedHeartbeat: createSignedHeartbeat(gk, heartbeat), + }, + } + + b, err := proto.Marshal(&msg) + if err != nil { + panic(err) + } + return b + }() err = th.Publish(ctx, b) if err != nil { @@ -515,6 +523,14 @@ func processSignedHeartbeat(from peer.ID, s *gossipv1.SignedHeartbeat, gs *node_ return nil, fmt.Errorf("failed to unmarshal heartbeat: %w", err) } + if time.Until(time.Unix(0, h.Timestamp)).Abs() > heartbeatMaxTimeDifference { + return nil, fmt.Errorf("heartbeat is too old or too far into the future") + } + + if h.GuardianAddr != signerAddr.String() { + return nil, fmt.Errorf("GuardianAddr in heartbeat does not match signerAddr") + } + // Store verified heartbeat in global guardian set state. if err := gst.SetHeartbeat(signerAddr, from, &h); err != nil { return nil, fmt.Errorf("failed to store in guardian set state: %w", err) diff --git a/node/pkg/p2p/p2p_test.go b/node/pkg/p2p/p2p_test.go new file mode 100644 index 000000000..6363ffadb --- /dev/null +++ b/node/pkg/p2p/p2p_test.go @@ -0,0 +1,105 @@ +package p2p + +import ( + "crypto/ecdsa" + "crypto/rand" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + node_common "github.com/certusone/wormhole/node/pkg/common" + gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + ethcrypto "github.com/ethereum/go-ethereum/crypto" +) + +func TestSignedHeartbeat(t *testing.T) { + + type testCase struct { + timestamp int64 + gk *ecdsa.PrivateKey + heartbeatGuardianAddr string + expectSuccess bool + } + + // define the tests + + gk, err := ecdsa.GenerateKey(crypto.S256(), rand.Reader) + assert.NoError(t, err) + gAddr := ethcrypto.PubkeyToAddress(gk.PublicKey) + + gk2, err := ecdsa.GenerateKey(crypto.S256(), rand.Reader) + assert.NoError(t, err) + gAddr2 := ethcrypto.PubkeyToAddress(gk2.PublicKey) + + tests := []testCase{ + // happy case + { + timestamp: time.Now().UnixNano(), + gk: gk, + heartbeatGuardianAddr: gAddr.String(), + expectSuccess: true, + }, + // guardian signed a heartbeat for another guardian + { + timestamp: time.Now().UnixNano(), + gk: gk, + heartbeatGuardianAddr: gAddr2.String(), + expectSuccess: false, + }, + // old heartbeat + { + timestamp: time.Now().Add(-time.Hour).UnixNano(), + gk: gk, + heartbeatGuardianAddr: gAddr2.String(), + expectSuccess: false, + }, + // heartbeat from the distant future + { + timestamp: time.Now().Add(time.Hour).UnixNano(), + gk: gk, + heartbeatGuardianAddr: gAddr2.String(), + expectSuccess: false, + }, + } + // run the tests + + testFunc := func(t *testing.T, tc testCase) { + + addr := ethcrypto.PubkeyToAddress(gk.PublicKey) + + heartbeat := &gossipv1.Heartbeat{ + NodeName: "someNode", + Counter: 1, + Timestamp: tc.timestamp, + Networks: []*gossipv1.Heartbeat_Network{}, + Version: "0.0.1beta", + GuardianAddr: tc.heartbeatGuardianAddr, + BootTimestamp: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC).UnixNano(), + Features: []string{}, + } + + s := createSignedHeartbeat(gk, heartbeat) + gs := &node_common.GuardianSet{ + Keys: []common.Address{addr}, + Index: 1, + } + + gst := node_common.NewGuardianSetState(nil) + + heartbeatResult, err := processSignedHeartbeat("someone", s, gs, gst, false) + + if tc.expectSuccess { + assert.NoError(t, err) + assert.EqualValues(t, heartbeat.GuardianAddr, heartbeatResult.GuardianAddr) + } else { + assert.Error(t, err) + } + } + + for _, tc := range tests { + testFunc(t, tc) + } +}