Node metric and performance tweaks (#3967)

* Node: Metric and performance tweaks

* Tweak observation metric

* pubsub StrictNoSign

* bigger observation metric buckets

* Even bigger observation metric buckets

* Node: Add metric for libp2p drops

* Disable StrictNoSign for now

---------

Co-authored-by: Evan Gray <battledingo@gmail.com>
bruce-riley 2024-06-06 09:04:28 -05:00 committed by GitHub
parent d5fd72bd4a
commit ac7794b7aa
9 changed files with 152 additions and 44 deletions

View File

@ -8,6 +8,7 @@ import (
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/ethereum/go-ethereum/common"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@ -49,8 +50,30 @@ const MaxStateAge = 1 * time.Minute
type GuardianSet struct {
// Guardian's public key hashes truncated by the ETH standard hashing mechanism (20 bytes).
Keys []common.Address
// On-chain set index
Index uint32
+ // Quorum value for this set of keys
+ Quorum int
+ // A map from address to index. Testing showed that, on average, a map is almost three times faster than a sequential search of the key slice.
+ // Testing also showed that the map was twice as fast as using a sorted slice and `slices.BinarySearchFunc`. That being said, on a 4GHz CPU,
+ // the sequential search takes an average of 800 nanos and the map lookup takes about 260 nanos. Is this worth doing?
+ keyMap map[common.Address]int
}
+ func NewGuardianSet(keys []common.Address, index uint32) *GuardianSet {
+ keyMap := map[common.Address]int{}
+ for idx, key := range keys {
+ keyMap[key] = idx
+ }
+ return &GuardianSet{
+ Keys: keys,
+ Index: index,
+ Quorum: vaa.CalculateQuorum(len(keys)),
+ keyMap: keyMap,
+ }
+ }
func (g *GuardianSet) KeysAsHexStrings() []string {
@ -66,9 +89,15 @@ func (g *GuardianSet) KeysAsHexStrings() []string {
// KeyIndex returns the index of a given address in the guardian set. Returns (-1, false)
// if the address wasn't found and (index, true) otherwise.
func (g *GuardianSet) KeyIndex(addr common.Address) (int, bool) {
- for n, k := range g.Keys {
- if k == addr {
- return n, true
+ if g.keyMap != nil {
+ if idx, found := g.keyMap[addr]; found {
+ return idx, true
+ }
+ } else {
+ for n, k := range g.Keys {
+ if k == addr {
+ return n, true
+ }
}
}
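The keyMap comment above backs the new map with measured numbers. A hypothetical micro-benchmark along these lines (not part of the commit; makeKeys and the benchmark names are invented) is one way to reproduce the comparison on your own hardware, assuming it sits next to guardianset.go in package common:

package common

import (
	"testing"

	"github.com/ethereum/go-ethereum/common"
)

// makeKeys builds n distinct guardian addresses.
func makeKeys(n int) []common.Address {
	keys := make([]common.Address, n)
	for i := range keys {
		keys[i][0] = byte(i) // vary the first byte so each address is unique
	}
	return keys
}

// Worst case for the sequential scan: the target is the last key.
func BenchmarkKeyIndexLinear(b *testing.B) {
	keys := makeKeys(19)
	target := keys[18]
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		for n := range keys {
			if keys[n] == target {
				break
			}
		}
	}
}

// The same lookup through the new keyMap-backed KeyIndex.
func BenchmarkKeyIndexMap(b *testing.B) {
	gs := NewGuardianSet(makeKeys(19), 0)
	target := gs.Keys[18]
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_, _ = gs.KeyIndex(target)
	}
}

Running it with the standard tooling (go test -bench KeyIndex in that package) should show the same order-of-magnitude gap the comment cites; 19 keys matches the mainnet guardian set size used in the test below.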

View File

@ -1,12 +1,42 @@
package common
import (
"reflect"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
+ func TestNewGuardianSet(t *testing.T) {
+ keys := []common.Address{
+ common.HexToAddress("0xbeFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe"),
+ common.HexToAddress("0x88D7D8B32a9105d228100E72dFFe2Fae0705D31c"),
+ common.HexToAddress("0x58076F561CC62A47087B567C86f986426dFCD000"),
+ common.HexToAddress("0xBd6e9833490F8fA87c733A183CD076a6cBD29074"),
+ common.HexToAddress("0xb853FCF0a5C78C1b56D15fCE7a154e6ebe9ED7a2"),
+ common.HexToAddress("0xAF3503dBD2E37518ab04D7CE78b630F98b15b78a"),
+ common.HexToAddress("0x785632deA5609064803B1c8EA8bB2c77a6004Bd1"),
+ common.HexToAddress("0x09a281a698C0F5BA31f158585B41F4f33659e54D"),
+ common.HexToAddress("0x3178443AB76a60E21690DBfB17f7F59F09Ae3Ea1"),
+ common.HexToAddress("0x647ec26ae49b14060660504f4DA1c2059E1C5Ab6"),
+ common.HexToAddress("0x810AC3D8E1258Bd2F004a94Ca0cd4c68Fc1C0611"),
+ common.HexToAddress("0x80610e96d645b12f47ae5cf4546b18538739e90F"),
+ common.HexToAddress("0x2edb0D8530E31A218E72B9480202AcBaeB06178d"),
+ common.HexToAddress("0xa78858e5e5c4705CdD4B668FFe3Be5bae4867c9D"),
+ common.HexToAddress("0x5Efe3A05Efc62D60e1D19fAeB56A80223CDd3472"),
+ common.HexToAddress("0xD791b7D32C05aBB1cc00b6381FA0c4928f0c56fC"),
+ common.HexToAddress("0x14Bc029B8809069093D712A3fd4DfAb31963597e"),
+ common.HexToAddress("0x246Ab29FC6EBeDf2D392a51ab2Dc5C59d0902A03"),
+ common.HexToAddress("0x132A84dFD920b35a3D0BA5f7A0635dF298F9033e"),
+ }
+ gs := NewGuardianSet(keys, 1)
+ assert.True(t, reflect.DeepEqual(keys, gs.Keys))
+ assert.Equal(t, uint32(1), gs.Index)
+ assert.Equal(t, vaa.CalculateQuorum(len(keys)), gs.Quorum)
+ }
func TestKeyIndex(t *testing.T) {
type test struct {
guardianSet GuardianSet
@ -15,13 +45,13 @@ func TestKeyIndex(t *testing.T) {
keyIndex int
}
- guardianSet := GuardianSet{
- Keys: []common.Address{
+ guardianSet := *NewGuardianSet(
+ []common.Address{
common.HexToAddress("0x5aaeb6053f3e94c9b9a09f33669435e7ef1beaed"),
common.HexToAddress("0x5aaeb6053f3e94c9b9a09f33669435e7ef1beaee"),
},
- Index: 1,
- }
+ 1,
+ )
tests := []test{
{guardianSet: guardianSet, address: "0x5aaeb6053f3e94c9b9a09f33669435e7ef1beaed", result: true, keyIndex: 0},

View File

@ -669,10 +669,10 @@ func runConsensusTests(t *testing.T, testCases []testCase, numGuardians int) {
supervisor.Signal(ctx, supervisor.SignalHealthy)
// Inform them of the Guardian Set
- commonGuardianSet := common.GuardianSet{
- Keys: mockGuardianSetToGuardianAddrList(t, gs),
- Index: guardianSetIndex,
- }
+ commonGuardianSet := *common.NewGuardianSet(
+ mockGuardianSetToGuardianAddrList(t, gs),
+ guardianSetIndex,
+ )
for i, g := range gs {
logger.Info("Sending guardian set update", zap.Int("guardian_index", i))
g.MockSetC <- &commonGuardianSet
@ -1183,10 +1183,10 @@ func runConsensusBenchmark(t *testing.B, name string, numGuardians int, numMessa
supervisor.Signal(ctx, supervisor.SignalHealthy)
// Inform them of the Guardian Set
- commonGuardianSet := common.GuardianSet{
- Keys: mockGuardianSetToGuardianAddrList(t, gs),
- Index: guardianSetIndex,
- }
+ commonGuardianSet := *common.NewGuardianSet(
+ mockGuardianSetToGuardianAddrList(t, gs),
+ guardianSetIndex,
+ )
for i, g := range gs {
logger.Info("Sending guardian set update", zap.Int("guardian_index", i))
g.MockSetC <- &commonGuardianSet

View File

@ -27,6 +27,7 @@ import (
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"
+ libp2ppb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/protocol"
@ -71,6 +72,16 @@ var (
Name: "wormhole_p2p_receive_channel_overflow",
Help: "Total number of p2p received messages dropped due to channel overflow",
}, []string{"type"})
+ p2pDrop = promauto.NewCounter(
+ prometheus.CounterOpts{
+ Name: "wormhole_p2p_drops",
+ Help: "Total number of messages that were dropped by libp2p",
+ })
+ p2pReject = promauto.NewCounter(
+ prometheus.CounterOpts{
+ Name: "wormhole_p2p_rejects",
+ Help: "Total number of messages rejected by libp2p",
+ })
)
var heartbeatMessagePrefix = []byte("heartbeat|")
@ -155,6 +166,21 @@ func DefaultConnectionManager() (*connmgr.BasicConnMgr, error) {
)
}
+ // traceHandler is used to intercept libp2p trace events so we can peg metrics.
+ type traceHandler struct {
+ }
+ // Trace implements the libp2p pubsub EventTracer interface. It pegs metrics as appropriate.
+ func (*traceHandler) Trace(evt *libp2ppb.TraceEvent) {
+ if evt.Type != nil {
+ if *evt.Type == libp2ppb.TraceEvent_DROP_RPC {
+ p2pDrop.Inc()
+ } else if *evt.Type == libp2ppb.TraceEvent_REJECT_MESSAGE {
+ p2pReject.Inc()
+ }
+ }
+ }
// BootstrapAddrs takes a comma-separated string of multi-address strings and returns an array of []peer.AddrInfo that does not include `self`.
// if `self` is part of `bootstrapPeers`, return isBootstrapNode=true
func BootstrapAddrs(logger *zap.Logger, bootstrapPeers string, self peer.ID) (bootstrappers []peer.AddrInfo, isBootstrapNode bool) {
@ -342,9 +368,13 @@ func Run(
}
logger.Info("Subscribing pubsub topic", zap.String("topic", topic))
+ ourTracer := &traceHandler{}
ps, err := pubsub.NewGossipSub(ctx, h,
pubsub.WithValidateQueueSize(P2P_VALIDATE_QUEUE_SIZE),
pubsub.WithGossipSubParams(components.GossipParams),
+ pubsub.WithEventTracer(ourTracer),
+ // TODO: Investigate making this change. May need to use LaxSign until everyone has upgraded to that.
+ // pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
)
if err != nil {
panic(err)
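The tracer above counts just two of the trace events GossipSub emits. If more event types become interesting, the same handler extends naturally to a switch; this is a sketch under that assumption (the package, metric names, and extra counter are invented here; the event constants are real ones from go-libp2p-pubsub's pb package):

package p2psketch

import (
	libp2ppb "github.com/libp2p/go-libp2p-pubsub/pb"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
	p2pDrop      = promauto.NewCounter(prometheus.CounterOpts{Name: "example_p2p_drops", Help: "RPCs dropped by libp2p"})
	p2pReject    = promauto.NewCounter(prometheus.CounterOpts{Name: "example_p2p_rejects", Help: "Messages rejected by libp2p"})
	p2pDuplicate = promauto.NewCounter(prometheus.CounterOpts{Name: "example_p2p_duplicates", Help: "Duplicate messages seen by libp2p"})
)

type traceHandler struct{}

// Trace pegs one counter per interesting event type.
func (*traceHandler) Trace(evt *libp2ppb.TraceEvent) {
	if evt.Type == nil {
		return
	}
	switch *evt.Type {
	case libp2ppb.TraceEvent_DROP_RPC:
		p2pDrop.Inc() // outbound RPC dropped, e.g. a full peer queue
	case libp2ppb.TraceEvent_REJECT_MESSAGE:
		p2pReject.Inc() // message failed validation
	case libp2ppb.TraceEvent_DUPLICATE_MESSAGE:
		p2pDuplicate.Inc() // already seen; usually benign
	}
}

On the commented-out StrictNoSign line: that policy expects messages to carry no signature (or seqno/key) fields and rejects ones that do, so it can only be enabled once every peer has stopped signing, which is why the TODO anticipates running with a laxer policy during the transition.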

View File

@ -17,11 +17,23 @@ import (
)
var (
- observationsBroadcastTotal = promauto.NewCounter(
+ observationsBroadcast = promauto.NewCounter(
prometheus.CounterOpts{
- Name: "wormhole_observations_broadcast_total",
+ Name: "wormhole_observations_queued_for_broadcast",
Help: "Total number of signed observations queued for broadcast",
})
+ observationsPostedInternally = promauto.NewCounter(
+ prometheus.CounterOpts{
+ Name: "wormhole_observations_posted_internally",
+ Help: "Total number of our observations posted internally",
+ })
+ signedVAAsBroadcast = promauto.NewCounter(
+ prometheus.CounterOpts{
+ Name: "wormhole_signed_vaas_queued_for_broadcast",
+ Help: "Total number of signed vaas queued for broadcast",
+ })
)
func (p *Processor) broadcastSignature(
@ -45,9 +57,10 @@ func (p *Processor) broadcastSignature(
panic(err)
}
// Broadcast the observation.
p.gossipSendC <- msg
+ observationsBroadcast.Inc()
// Store our VAA in case we're going to submit it to Solana
hash := hex.EncodeToString(digest.Bytes())
if p.state.signatures[hash] == nil {
@ -75,7 +88,7 @@ func (p *Processor) broadcastSignature(
go func() { p.obsvC <- om }()
}
- observationsBroadcastTotal.Inc()
+ observationsPostedInternally.Inc()
}
func (p *Processor) broadcastSignedVAA(v *vaa.VAA) {
@ -93,7 +106,9 @@ func (p *Processor) broadcastSignedVAA(v *vaa.VAA) {
panic(err)
}
// Broadcast the signed VAA.
p.gossipSendC <- msg
+ signedVAAsBroadcast.Inc()
if p.gatewayRelayer != nil {
p.gatewayRelayer.SubmitVAA(v)
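The rename splits what was one counter into three, each incremented where the event actually happens: observationsBroadcast when a signed observation is handed to the gossip send channel, observationsPostedInternally when our own observation is looped back on obsvC, and signedVAAsBroadcast when a signed VAA is queued. A minimal self-contained sketch of the enqueue-then-count pattern (all names here are stand-ins, not the node's):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	queued := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "example_queued_for_broadcast",
		Help: "Messages handed to the gossip send channel",
	})

	sendC := make(chan []byte, 8) // stand-in for p.gossipSendC

	msg := []byte("signed observation")
	sendC <- msg // hand off to the gossip loop...
	queued.Inc() // ...and count at enqueue time, mirroring broadcastSignature

	fmt.Println(testutil.ToFloat64(queued)) // prints 1
}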

View File

@ -115,8 +115,7 @@ func (p *Processor) handleCleanup(ctx context.Context) {
}
hasSigs := len(s.signatures)
- wantSigs := vaa.CalculateQuorum(len(gs.Keys))
- quorum := hasSigs >= wantSigs
+ quorum := hasSigs >= gs.Quorum
var chain vaa.ChainID
if s.ourObservation != nil {
@ -129,7 +128,7 @@ func (p *Processor) handleCleanup(ctx context.Context) {
zap.String("digest", hash),
zap.Duration("delta", delta),
zap.Int("have_sigs", hasSigs),
zap.Int("required_sigs", wantSigs),
zap.Int("required_sigs", gs.Quorum),
zap.Bool("quorum", quorum),
zap.Stringer("emitter_chain", chain),
)
@ -220,6 +219,7 @@ func (p *Processor) handleCleanup(ctx context.Context) {
zap.String("digest", hash),
zap.Duration("delta", delta),
zap.String("firstObserved", s.firstObserved.String()),
zap.Int("numSignatures", len(s.signatures)),
)
req := &gossipv1.ObservationRequest{
ChainId: uint32(s.ourObservation.GetEmitterChain()),
@ -238,7 +238,6 @@ func (p *Processor) handleCleanup(ctx context.Context) {
// network reached consensus without us. We don't know the correct guardian
// set, so we simply use the most recent one.
hasSigs := len(s.signatures)
- wantSigs := vaa.CalculateQuorum(len(p.gs.Keys))
if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("expiring unsubmitted nil observation",
@ -246,8 +245,8 @@ func (p *Processor) handleCleanup(ctx context.Context) {
zap.String("digest", hash),
zap.Duration("delta", delta),
zap.Int("have_sigs", hasSigs),
zap.Int("required_sigs", wantSigs),
zap.Bool("quorum", hasSigs >= wantSigs),
zap.Int("required_sigs", p.gs.Quorum),
zap.Bool("quorum", hasSigs >= p.gs.Quorum),
)
}
delete(p.state.signatures, hash)
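handleCleanup now reads the quorum that NewGuardianSet precomputed instead of calling vaa.CalculateQuorum on every expiring entry. For reference, the cached value is the usual better-than-two-thirds rule; to our knowledge the SDK's formula is equivalent to this sketch (the helper name is ours, not the SDK's):

package main

import "fmt"

// calculateQuorum mirrors (as an assumption, not a copy) vaa.CalculateQuorum:
// the smallest integer strictly greater than 2n/3.
func calculateQuorum(numGuardians int) int {
	return (numGuardians*2)/3 + 1
}

func main() {
	fmt.Println(calculateQuorum(19)) // 13 signatures needed for a 19-guardian set
}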

View File

@ -47,7 +47,7 @@ var (
// signaturesToVaaFormat converts a map[common.Address][]byte (processor state format) to []*vaa.Signature (VAA format) given a set of keys gsKeys
// It also returns a bool array indicating which key in gsKeys had a signature
- // The processor state format is used for effeciently storing signatures during aggregation while the VAA format is more efficient for on-chain verification.
+ // The processor state format is used for efficiently storing signatures during aggregation while the VAA format is more efficient for on-chain verification.
func signaturesToVaaFormat(signatures map[common.Address][]byte, gsKeys []common.Address) ([]*vaa.Signature, []bool) {
// Aggregate all valid signatures into a list of vaa.Signature and construct signed VAA.
agg := make([]bool, len(gsKeys))
@ -80,6 +80,8 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW
// Note that observations are never tied to the (verified) p2p identity key - the p2p network
// identity is completely decoupled from the guardian identity, p2p is just transport.
+ observationsReceivedTotal.Inc()
m := obs.Msg
hash := hex.EncodeToString(m.Hash)
s := p.state.signatures[hash]
@ -99,8 +101,6 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW
)
}
- observationsReceivedTotal.Inc()
// Verify the Guardian's signature. This verifies that m.Signature matches m.Hash and recovers
// the public key that was used to sign the payload.
pk, err := crypto.Ecrecover(m.Hash, m.Signature)
@ -213,17 +213,22 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW
s.signatures[their_addr] = m.Signature
- if s.ourObservation != nil {
+ if s.submitted {
+ if p.logger.Level().Enabled(zapcore.DebugLevel) {
+ p.logger.Debug("already submitted, doing nothing",
+ zap.String("messageId", m.MessageId),
+ zap.String("digest", hash),
+ )
+ }
+ } else if s.ourObservation != nil {
// We have made this observation on chain!
- quorum := vaa.CalculateQuorum(len(gs.Keys))
// Check if we have more signatures than required for quorum.
// s.signatures may contain signatures from multiple guardian sets during guardian set updates
// Hence, if len(s.signatures) < quorum, then there is definitely no quorum and we can return early to save additional computation,
// but if len(s.signatures) >= quorum, there is not necessarily quorum for the active guardian set.
// We will later check for quorum again after assembling the VAA for a particular guardian set.
- if len(s.signatures) < quorum {
+ if len(s.signatures) < gs.Quorum {
// no quorum yet, we're done here
if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("quorum not yet met",
@ -245,18 +250,18 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW
zap.Any("set", gs.KeysAsHexStrings()),
zap.Uint32("index", gs.Index),
zap.Bools("aggregation", agg),
zap.Int("required_sigs", quorum),
zap.Int("required_sigs", gs.Quorum),
zap.Int("have_sigs", len(sigsVaaFormat)),
zap.Bool("quorum", len(sigsVaaFormat) >= quorum),
zap.Bool("quorum", len(sigsVaaFormat) >= gs.Quorum),
)
}
if len(sigsVaaFormat) >= quorum && !s.submitted {
if len(sigsVaaFormat) >= gs.Quorum {
// we have reached quorum *with the active guardian set*
s.ourObservation.HandleQuorum(sigsVaaFormat, hash, p)
} else {
if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("quorum not met or already submitted, doing nothing", // 1.2M out of 3M info messages / hour / guardian
p.logger.Debug("quorum not met, doing nothing",
zap.String("messageId", m.MessageId),
zap.String("digest", hash),
)
@ -269,7 +274,6 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW
zap.String("digest", hash),
)
}
- }
observationTotalDelay.Observe(float64(time.Since(obs.Timestamp).Microseconds()))
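The restructuring above changes the branch order: the submitted check now comes first and skips all quorum work, the cheap len(s.signatures) comparison short-circuits before any VAA assembly, and the old "&& !s.submitted" condition disappears because it is implied. A toy model of the new decision order (all types and names invented here, not the processor's):

package main

import "fmt"

type obsState struct {
	submitted      bool
	ourObservation bool
	sigCount       int // len(s.signatures), possibly spanning guardian sets
	validSigCount  int // signatures that verify against the active set
}

func decide(s obsState, quorum int) string {
	switch {
	case s.submitted:
		return "already submitted, doing nothing"
	case s.ourObservation:
		if s.sigCount < quorum {
			return "quorum not yet met" // cheap early return, no VAA assembly
		}
		if s.validSigCount >= quorum {
			return "handle quorum" // quorum with the active guardian set
		}
		return "quorum not met, doing nothing"
	default:
		return "we have not yet observed this message ourselves"
	}
}

func main() {
	fmt.Println(decide(obsState{ourObservation: true, sigCount: 13, validSigCount: 13}, 13))
}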

View File

@ -146,14 +146,14 @@ var (
prometheus.HistogramOpts{
Name: "wormhole_signed_observation_channel_delay_us",
Help: "Latency histogram for delay of signed observations in channel",
- Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10000.0},
+ Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10000.0, 100_000.0, 1_000_000.0, 10_000_000.0, 100_000_000.0, 1_000_000_000.0},
})
observationTotalDelay = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "wormhole_signed_observation_total_delay_us",
Help: "Latency histogram for total time to process signed observations",
- Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10000.0},
+ Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10_000.0, 100_000.0, 1_000_000.0, 10_000_000.0, 100_000_000.0, 1_000_000_000.0},
})
)
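The old bucket layout topped out at 10,000us (10ms), so anything slower was lumped into the implicit +Inf bucket; the new layout extends coverage decade by decade up to 1e9us (1,000s). If the hand-picked 20/50/5000 steps were ever dropped, prometheus.ExponentialBuckets could generate a similar decade ladder; a sketch of that alternative, not what the commit uses:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// 9 buckets starting at 10us, each 10x the last: 10us .. 1e9us (1,000s).
	fmt.Println(prometheus.ExponentialBuckets(10, 10, 9))
	// [10 100 1000 10000 100000 1e+06 1e+07 1e+08 1e+09]
}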
@ -222,7 +222,9 @@ func (p *Processor) Run(ctx context.Context) error {
case p.gs = <-p.setC:
p.logger.Info("guardian set updated",
zap.Strings("set", p.gs.KeysAsHexStrings()),
zap.Uint32("index", p.gs.Index))
zap.Uint32("index", p.gs.Index),
zap.Int("quorum", p.gs.Quorum),
)
p.gst.Set(p.gs)
case k := <-p.msgC:
if p.governor != nil {

View File

@ -553,8 +553,10 @@ func (w *Watcher) Run(parentCtx context.Context) error {
// Transaction is now ready
if pLock.height <= blockNumberU {
msm := time.Now()
+ timeout, cancel := context.WithTimeout(ctx, 5*time.Second)
- tx, err := w.ethConn.TransactionReceipt(ctx, pLock.message.TxHash)
+ tx, err := w.ethConn.TransactionReceipt(timeout, pLock.message.TxHash)
queryLatency.WithLabelValues(w.networkName, "transaction_receipt").Observe(time.Since(msm).Seconds())
+ cancel()
// If the node returns an error after waiting expectedConfirmation blocks,
@ -690,10 +692,7 @@ func (w *Watcher) fetchAndUpdateGuardianSet(
w.currentGuardianSet = &idx
if w.setC != nil {
- w.setC <- &common.GuardianSet{
- Keys: gs.Keys,
- Index: idx,
- }
+ w.setC <- common.NewGuardianSet(gs.Keys, idx)
}
return nil
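The watcher change bounds the TransactionReceipt call with a 5-second timeout context and calls cancel() explicitly instead of deferring it: a defer inside Run's long-lived block loop would not fire until Run returns, so each iteration would leak a timer. A self-contained sketch of the same pattern (fetchReceipt and its body are invented; the real code calls w.ethConn.TransactionReceipt); wrapping the call in a helper like this is the other common way to make defer safe:

package main

import (
	"context"
	"fmt"
	"time"
)

// fetchReceipt bounds one RPC-like call to five seconds. Because the helper
// returns after a single call, defer cancel() is safe here; inline code in a
// loop must call cancel() explicitly, as the watcher does.
func fetchReceipt(ctx context.Context, txHash string) (string, error) {
	timeout, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	select {
	case <-time.After(10 * time.Millisecond): // stand-in for the eth RPC
		return "receipt for " + txHash, nil
	case <-timeout.Done():
		return "", timeout.Err()
	}
}

func main() {
	r, err := fetchReceipt(context.Background(), "0xabc")
	fmt.Println(r, err)
}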