node/p2p: Warn about channel overflows in GoTest mode

This commit is contained in:
tbjump 2023-06-27 13:36:52 +00:00 committed by tbjump
parent 14a1251c06
commit 9c668072ef
2 changed files with 34 additions and 16 deletions

View File

@ -46,6 +46,10 @@ func GuardianOptionP2P(p2pKey libp2p_crypto.PrivKey, networkId string, bootstrap
components := p2p.DefaultComponents()
components.Port = port
if g.env == common.GoTest {
components.WarnChannelOverflow = true
}
g.runnables["p2p"] = p2p.Run(
g.obsvC,
g.obsvReqC.writeC,

View File

@ -3,6 +3,7 @@ package p2p
import (
"context"
"crypto/ecdsa"
"encoding/hex"
"errors"
"fmt"
"strings"
@ -10,10 +11,10 @@ import (
"time"
"github.com/certusone/wormhole/node/pkg/accountant"
node_common "github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/governor"
"github.com/certusone/wormhole/node/pkg/version"
"github.com/ethereum/go-ethereum/common"
eth_common "github.com/ethereum/go-ethereum/common"
ethcrypto "github.com/ethereum/go-ethereum/crypto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@ -71,11 +72,11 @@ var signedObservationRequestPrefix = []byte("signed_observation_request|")
// heartbeatMaxTimeDifference specifies the maximum time difference between the local clock and the timestamp in incoming heartbeat messages. Heartbeats that are this old or this much into the future will be dropped. This value should encompass clock skew and network delay.
var heartbeatMaxTimeDifference = time.Minute * 15
func heartbeatDigest(b []byte) common.Hash {
func heartbeatDigest(b []byte) eth_common.Hash {
return ethcrypto.Keccak256Hash(append(heartbeatMessagePrefix, b...))
}
func signedObservationRequestDigest(b []byte) common.Hash {
func signedObservationRequestDigest(b []byte) eth_common.Hash {
return ethcrypto.Keccak256Hash(append(signedObservationRequestPrefix, b...))
}
@ -88,10 +89,12 @@ type Components struct {
// ConnMgr is the ConnectionManager that the Guardian is going to use
ConnMgr *connmgr.BasicConnMgr
// ProtectedHostByGuardianKey is used to ensure that only one p2p peer can be protected by any given known guardian key
ProtectedHostByGuardianKey map[common.Address]peer.ID
ProtectedHostByGuardianKey map[eth_common.Address]peer.ID
// ProtectedHostByGuardianKeyLock is only useful to prevent a race condition in test as ProtectedHostByGuardianKey
// is only accessed by a single routine at any given time in a running Guardian.
ProtectedHostByGuardianKeyLock sync.Mutex
// WarnChannelOverflow: If true, errors due to overflowing channels will produce logger.Warn
WarnChannelOverflow bool
}
func (f *Components) ListeningAddresses() []string {
@ -118,7 +121,7 @@ func DefaultComponents() *Components {
},
Port: DefaultPort,
ConnMgr: mgr,
ProtectedHostByGuardianKey: make(map[common.Address]peer.ID),
ProtectedHostByGuardianKey: make(map[eth_common.Address]peer.ID),
}
}
@ -184,7 +187,7 @@ func Run(
signedInC chan<- *gossipv1.SignedVAAWithQuorum,
priv crypto.PrivKey,
gk *ecdsa.PrivateKey,
gst *node_common.GuardianSetState,
gst *common.GuardianSetState,
networkID string,
bootstrapPeers string,
nodeName string,
@ -539,7 +542,7 @@ func Run(
zap.Binary("raw", envelope.Data),
zap.String("from", envelope.GetFrom().String()))
} else {
guardianAddr := common.BytesToAddress(s.GuardianAddr)
guardianAddr := eth_common.BytesToAddress(s.GuardianAddr)
prevPeerId, ok := components.ProtectedHostByGuardianKey[guardianAddr]
if ok {
if prevPeerId != peerId {
@ -569,6 +572,9 @@ func Run(
case obsvC <- m.SignedObservation:
p2pMessagesReceived.WithLabelValues("observation").Inc()
default:
if components.WarnChannelOverflow {
logger.Warn("Ignoring SignedObservation because obsvC full", zap.String("hash", hex.EncodeToString(m.SignedObservation.Hash)))
}
p2pReceiveChannelOverflow.WithLabelValues("observation").Inc()
}
case *gossipv1.GossipMessage_SignedVaaWithQuorum:
@ -576,6 +582,14 @@ func Run(
case signedInC <- m.SignedVaaWithQuorum:
p2pMessagesReceived.WithLabelValues("signed_vaa_with_quorum").Inc()
default:
if components.WarnChannelOverflow {
// TODO do not log this in production
var hexStr string
if vaa, err := vaa.Unmarshal(m.SignedVaaWithQuorum.Vaa); err == nil {
hexStr = vaa.HexDigest()
}
logger.Warn("Ignoring SignedVaaWithQuorum because signedInC full", zap.String("hash", hexStr))
}
p2pReceiveChannelOverflow.WithLabelValues("signed_vaa_with_quorum").Inc()
}
case *gossipv1.GossipMessage_SignedObservationRequest:
@ -649,10 +663,10 @@ func createSignedHeartbeat(gk *ecdsa.PrivateKey, heartbeat *gossipv1.Heartbeat)
}
}
func processSignedHeartbeat(from peer.ID, s *gossipv1.SignedHeartbeat, gs *node_common.GuardianSet, gst *node_common.GuardianSetState, disableVerify bool) (*gossipv1.Heartbeat, error) {
envelopeAddr := common.BytesToAddress(s.GuardianAddr)
func processSignedHeartbeat(from peer.ID, s *gossipv1.SignedHeartbeat, gs *common.GuardianSet, gst *common.GuardianSetState, disableVerify bool) (*gossipv1.Heartbeat, error) {
envelopeAddr := eth_common.BytesToAddress(s.GuardianAddr)
idx, ok := gs.KeyIndex(envelopeAddr)
var pk common.Address
var pk eth_common.Address
if !ok {
if !disableVerify {
return nil, fmt.Errorf("invalid message: %s not in guardian set", envelopeAddr)
@ -673,7 +687,7 @@ func processSignedHeartbeat(from peer.ID, s *gossipv1.SignedHeartbeat, gs *node_
return nil, errors.New("failed to recover public key")
}
signerAddr := common.BytesToAddress(ethcrypto.Keccak256(pubKey[1:])[12:])
signerAddr := eth_common.BytesToAddress(ethcrypto.Keccak256(pubKey[1:])[12:])
if pk != signerAddr && !disableVerify {
return nil, fmt.Errorf("invalid signer: %v", signerAddr)
}
@ -702,10 +716,10 @@ func processSignedHeartbeat(from peer.ID, s *gossipv1.SignedHeartbeat, gs *node_
return &h, nil
}
func processSignedObservationRequest(s *gossipv1.SignedObservationRequest, gs *node_common.GuardianSet) (*gossipv1.ObservationRequest, error) {
envelopeAddr := common.BytesToAddress(s.GuardianAddr)
func processSignedObservationRequest(s *gossipv1.SignedObservationRequest, gs *common.GuardianSet) (*gossipv1.ObservationRequest, error) {
envelopeAddr := eth_common.BytesToAddress(s.GuardianAddr)
idx, ok := gs.KeyIndex(envelopeAddr)
var pk common.Address
var pk eth_common.Address
if !ok {
return nil, fmt.Errorf("invalid message: %s not in guardian set", envelopeAddr)
} else {
@ -724,7 +738,7 @@ func processSignedObservationRequest(s *gossipv1.SignedObservationRequest, gs *n
return nil, errors.New("failed to recover public key")
}
signerAddr := common.BytesToAddress(ethcrypto.Keccak256(pubKey[1:])[12:])
signerAddr := eth_common.BytesToAddress(ethcrypto.Keccak256(pubKey[1:])[12:])
if pk != signerAddr {
return nil, fmt.Errorf("invalid signer: %v", signerAddr)
}