diff --git a/node/pkg/node/options.go b/node/pkg/node/options.go index 0ec4a26bd..185919bfd 100644 --- a/node/pkg/node/options.go +++ b/node/pkg/node/options.go @@ -46,6 +46,10 @@ func GuardianOptionP2P(p2pKey libp2p_crypto.PrivKey, networkId string, bootstrap components := p2p.DefaultComponents() components.Port = port + if g.env == common.GoTest { + components.WarnChannelOverflow = true + } + g.runnables["p2p"] = p2p.Run( g.obsvC, g.obsvReqC.writeC, diff --git a/node/pkg/p2p/p2p.go b/node/pkg/p2p/p2p.go index a2816a059..02ceb78a2 100644 --- a/node/pkg/p2p/p2p.go +++ b/node/pkg/p2p/p2p.go @@ -3,6 +3,7 @@ package p2p import ( "context" "crypto/ecdsa" + "encoding/hex" "errors" "fmt" "strings" @@ -10,10 +11,10 @@ import ( "time" "github.com/certusone/wormhole/node/pkg/accountant" - node_common "github.com/certusone/wormhole/node/pkg/common" + "github.com/certusone/wormhole/node/pkg/common" "github.com/certusone/wormhole/node/pkg/governor" "github.com/certusone/wormhole/node/pkg/version" - "github.com/ethereum/go-ethereum/common" + eth_common "github.com/ethereum/go-ethereum/common" ethcrypto "github.com/ethereum/go-ethereum/crypto" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -71,11 +72,11 @@ var signedObservationRequestPrefix = []byte("signed_observation_request|") // heartbeatMaxTimeDifference specifies the maximum time difference between the local clock and the timestamp in incoming heartbeat messages. Heartbeats that are this old or this much into the future will be dropped. This value should encompass clock skew and network delay. var heartbeatMaxTimeDifference = time.Minute * 15 -func heartbeatDigest(b []byte) common.Hash { +func heartbeatDigest(b []byte) eth_common.Hash { return ethcrypto.Keccak256Hash(append(heartbeatMessagePrefix, b...)) } -func signedObservationRequestDigest(b []byte) common.Hash { +func signedObservationRequestDigest(b []byte) eth_common.Hash { return ethcrypto.Keccak256Hash(append(signedObservationRequestPrefix, b...)) } @@ -88,10 +89,12 @@ type Components struct { // ConnMgr is the ConnectionManager that the Guardian is going to use ConnMgr *connmgr.BasicConnMgr // ProtectedHostByGuardianKey is used to ensure that only one p2p peer can be protected by any given known guardian key - ProtectedHostByGuardianKey map[common.Address]peer.ID + ProtectedHostByGuardianKey map[eth_common.Address]peer.ID // ProtectedHostByGuardianKeyLock is only useful to prevent a race condition in test as ProtectedHostByGuardianKey // is only accessed by a single routine at any given time in a running Guardian. ProtectedHostByGuardianKeyLock sync.Mutex + // WarnChannelOverflow: If true, errors due to overflowing channels will produce logger.Warn + WarnChannelOverflow bool } func (f *Components) ListeningAddresses() []string { @@ -118,7 +121,7 @@ func DefaultComponents() *Components { }, Port: DefaultPort, ConnMgr: mgr, - ProtectedHostByGuardianKey: make(map[common.Address]peer.ID), + ProtectedHostByGuardianKey: make(map[eth_common.Address]peer.ID), } } @@ -184,7 +187,7 @@ func Run( signedInC chan<- *gossipv1.SignedVAAWithQuorum, priv crypto.PrivKey, gk *ecdsa.PrivateKey, - gst *node_common.GuardianSetState, + gst *common.GuardianSetState, networkID string, bootstrapPeers string, nodeName string, @@ -539,7 +542,7 @@ func Run( zap.Binary("raw", envelope.Data), zap.String("from", envelope.GetFrom().String())) } else { - guardianAddr := common.BytesToAddress(s.GuardianAddr) + guardianAddr := eth_common.BytesToAddress(s.GuardianAddr) prevPeerId, ok := components.ProtectedHostByGuardianKey[guardianAddr] if ok { if prevPeerId != peerId { @@ -569,6 +572,9 @@ func Run( case obsvC <- m.SignedObservation: p2pMessagesReceived.WithLabelValues("observation").Inc() default: + if components.WarnChannelOverflow { + logger.Warn("Ignoring SignedObservation because obsvC full", zap.String("hash", hex.EncodeToString(m.SignedObservation.Hash))) + } p2pReceiveChannelOverflow.WithLabelValues("observation").Inc() } case *gossipv1.GossipMessage_SignedVaaWithQuorum: @@ -576,6 +582,14 @@ func Run( case signedInC <- m.SignedVaaWithQuorum: p2pMessagesReceived.WithLabelValues("signed_vaa_with_quorum").Inc() default: + if components.WarnChannelOverflow { + // TODO do not log this in production + var hexStr string + if vaa, err := vaa.Unmarshal(m.SignedVaaWithQuorum.Vaa); err == nil { + hexStr = vaa.HexDigest() + } + logger.Warn("Ignoring SignedVaaWithQuorum because signedInC full", zap.String("hash", hexStr)) + } p2pReceiveChannelOverflow.WithLabelValues("signed_vaa_with_quorum").Inc() } case *gossipv1.GossipMessage_SignedObservationRequest: @@ -649,10 +663,10 @@ func createSignedHeartbeat(gk *ecdsa.PrivateKey, heartbeat *gossipv1.Heartbeat) } } -func processSignedHeartbeat(from peer.ID, s *gossipv1.SignedHeartbeat, gs *node_common.GuardianSet, gst *node_common.GuardianSetState, disableVerify bool) (*gossipv1.Heartbeat, error) { - envelopeAddr := common.BytesToAddress(s.GuardianAddr) +func processSignedHeartbeat(from peer.ID, s *gossipv1.SignedHeartbeat, gs *common.GuardianSet, gst *common.GuardianSetState, disableVerify bool) (*gossipv1.Heartbeat, error) { + envelopeAddr := eth_common.BytesToAddress(s.GuardianAddr) idx, ok := gs.KeyIndex(envelopeAddr) - var pk common.Address + var pk eth_common.Address if !ok { if !disableVerify { return nil, fmt.Errorf("invalid message: %s not in guardian set", envelopeAddr) @@ -673,7 +687,7 @@ func processSignedHeartbeat(from peer.ID, s *gossipv1.SignedHeartbeat, gs *node_ return nil, errors.New("failed to recover public key") } - signerAddr := common.BytesToAddress(ethcrypto.Keccak256(pubKey[1:])[12:]) + signerAddr := eth_common.BytesToAddress(ethcrypto.Keccak256(pubKey[1:])[12:]) if pk != signerAddr && !disableVerify { return nil, fmt.Errorf("invalid signer: %v", signerAddr) } @@ -702,10 +716,10 @@ func processSignedHeartbeat(from peer.ID, s *gossipv1.SignedHeartbeat, gs *node_ return &h, nil } -func processSignedObservationRequest(s *gossipv1.SignedObservationRequest, gs *node_common.GuardianSet) (*gossipv1.ObservationRequest, error) { - envelopeAddr := common.BytesToAddress(s.GuardianAddr) +func processSignedObservationRequest(s *gossipv1.SignedObservationRequest, gs *common.GuardianSet) (*gossipv1.ObservationRequest, error) { + envelopeAddr := eth_common.BytesToAddress(s.GuardianAddr) idx, ok := gs.KeyIndex(envelopeAddr) - var pk common.Address + var pk eth_common.Address if !ok { return nil, fmt.Errorf("invalid message: %s not in guardian set", envelopeAddr) } else { @@ -724,7 +738,7 @@ func processSignedObservationRequest(s *gossipv1.SignedObservationRequest, gs *n return nil, errors.New("failed to recover public key") } - signerAddr := common.BytesToAddress(ethcrypto.Keccak256(pubKey[1:])[12:]) + signerAddr := eth_common.BytesToAddress(ethcrypto.Keccak256(pubKey[1:])[12:]) if pk != signerAddr { return nil, fmt.Errorf("invalid signer: %v", signerAddr) }