From bd89e4e3ab41957524f2848ffdcb287224e00058 Mon Sep 17 00:00:00 2001 From: bruce-riley <96066700+bruce-riley@users.noreply.github.com> Date: Wed, 19 Jul 2023 10:23:16 -0500 Subject: [PATCH] Node: Processor delay metrics (#3210) --- node/cmd/spy/spy.go | 2 +- node/pkg/common/msg_with_timestamp.go | 29 ++++++++++++++++++++++ node/pkg/node/node.go | 4 +-- node/pkg/p2p/p2p.go | 7 +++--- node/pkg/p2p/watermark_test.go | 4 +-- node/pkg/processor/broadcast.go | 5 ++-- node/pkg/processor/observation.go | 5 +++- node/pkg/processor/processor.go | 35 +++++++++++++++++++++++++-- 8 files changed, 77 insertions(+), 14 deletions(-) create mode 100644 node/pkg/common/msg_with_timestamp.go diff --git a/node/cmd/spy/spy.go b/node/cmd/spy/spy.go index d6b561fb4..0498c49f0 100644 --- a/node/cmd/spy/spy.go +++ b/node/cmd/spy/spy.go @@ -266,7 +266,7 @@ func runSpy(cmd *cobra.Command, args []string) { sendC := make(chan []byte) // Inbound observations - obsvC := make(chan *gossipv1.SignedObservation, 1024) + obsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024) // Inbound observation requests obsvReqC := make(chan *gossipv1.ObservationRequest, 1024) diff --git a/node/pkg/common/msg_with_timestamp.go b/node/pkg/common/msg_with_timestamp.go new file mode 100644 index 000000000..c40807d37 --- /dev/null +++ b/node/pkg/common/msg_with_timestamp.go @@ -0,0 +1,29 @@ +package common + +import ( + "time" +) + +// MsgWithTimeStamp allows us to track the time of receipt of an event. +type MsgWithTimeStamp[T any] struct { + Msg *T + Timestamp time.Time +} + +// CreateMsgWithTimestamp creates a new MsgWithTimeStamp with the current time. +func CreateMsgWithTimestamp[T any](msg *T) *MsgWithTimeStamp[T] { + return &MsgWithTimeStamp[T]{ + Msg: msg, + Timestamp: time.Now(), + } +} + +// PostMsgWithTimestamp stamps the message with the current time and sends it to the specified channel without blocking. Returns ErrChanFull if the channel is full.
+func PostMsgWithTimestamp[T any](msg *T, c chan<- *MsgWithTimeStamp[T]) error { + select { + case c <- CreateMsgWithTimestamp[T](msg): + return nil + default: + return ErrChanFull + } +} diff --git a/node/pkg/node/node.go b/node/pkg/node/node.go index 4bb0fed17..b741b4c4a 100644 --- a/node/pkg/node/node.go +++ b/node/pkg/node/node.go @@ -54,7 +54,7 @@ type G struct { // Outbound gossip message queue (needs to be read/write because p2p needs read/write) gossipSendC chan []byte // Inbound observations. This is read/write because the processor also writes to it as a fast-path when handling locally made observations. - obsvC chan *gossipv1.SignedObservation + obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation] // Finalized guardian observations aggregated across all chains msgC channelPair[*common.MessagePublication] // Ethereum incoming guardian set updates @@ -88,7 +88,7 @@ func (g *G) initializeBasic(logger *zap.Logger, rootCtxCancel context.CancelFunc // Setup various channels... g.gossipSendC = make(chan []byte) - g.obsvC = make(chan *gossipv1.SignedObservation, inboundObservationBufferSize) + g.obsvC = make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], inboundObservationBufferSize) g.msgC = makeChannelPair[*common.MessagePublication](0) g.setC = makeChannelPair[*common.GuardianSet](1) // This needs to be a buffered channel because of a circular dependency between processor and accountant during startup. 
g.signedInC = makeChannelPair[*gossipv1.SignedVAAWithQuorum](inboundSignedVaaBufferSize) diff --git a/node/pkg/p2p/p2p.go b/node/pkg/p2p/p2p.go index ff63fe652..0cf00a2d3 100644 --- a/node/pkg/p2p/p2p.go +++ b/node/pkg/p2p/p2p.go @@ -184,7 +184,7 @@ func connectToPeers(ctx context.Context, logger *zap.Logger, h host.Host, peers } func Run( - obsvC chan<- *gossipv1.SignedObservation, + obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation], obsvReqC chan<- *gossipv1.ObservationRequest, obsvReqSendC <-chan *gossipv1.ObservationRequest, gossipSendC chan []byte, @@ -572,10 +572,9 @@ func Run( }() } case *gossipv1.GossipMessage_SignedObservation: - select { - case obsvC <- m.SignedObservation: + if err := common.PostMsgWithTimestamp[gossipv1.SignedObservation](m.SignedObservation, obsvC); err == nil { p2pMessagesReceived.WithLabelValues("observation").Inc() - default: + } else { if components.WarnChannelOverflow { logger.Warn("Ignoring SignedObservation because obsvC full", zap.String("hash", hex.EncodeToString(m.SignedObservation.Hash))) } diff --git a/node/pkg/p2p/watermark_test.go b/node/pkg/p2p/watermark_test.go index f3a49ea66..687f0e3eb 100644 --- a/node/pkg/p2p/watermark_test.go +++ b/node/pkg/p2p/watermark_test.go @@ -27,7 +27,7 @@ const LOCAL_P2P_PORTRANGE_START = 11000 type G struct { // arguments passed to p2p.New - obsvC chan *gossipv1.SignedObservation + obsvC chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation] obsvReqC chan *gossipv1.ObservationRequest obsvReqSendC chan *gossipv1.ObservationRequest sendC chan []byte @@ -62,7 +62,7 @@ func NewG(t *testing.T, nodeName string) *G { } g := &G{ - obsvC: make(chan *gossipv1.SignedObservation, cs), + obsvC: make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], cs), obsvReqC: make(chan *gossipv1.ObservationRequest, cs), obsvReqSendC: make(chan *gossipv1.ObservationRequest, cs), sendC: make(chan []byte, cs), diff --git a/node/pkg/processor/broadcast.go 
b/node/pkg/processor/broadcast.go index bf097c735..7d5b6a1ea 100644 --- a/node/pkg/processor/broadcast.go +++ b/node/pkg/processor/broadcast.go @@ -11,6 +11,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "google.golang.org/protobuf/proto" + node_common "github.com/certusone/wormhole/node/pkg/common" gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1" "github.com/wormhole-foundation/wormhole/sdk/vaa" ) @@ -64,8 +65,8 @@ func (p *Processor) broadcastSignature( p.state.signatures[hash].source = o.GetEmitterChain().String() p.state.signatures[hash].gs = p.gs // guaranteed to match ourObservation - there's no concurrent access to p.gs - // Fast path for our own signature - go func() { p.obsvC <- &obsv }() + // Fast path for our own signature. Put this in a goroutine so it can block if the channel is full. That's also why we're not using node_common.PostMsgWithTimestamp, which returns an error instead of blocking. + go func() { p.obsvC <- node_common.CreateMsgWithTimestamp[gossipv1.SignedObservation](&obsv) }() observationsBroadcastTotal.Inc() } diff --git a/node/pkg/processor/observation.go b/node/pkg/processor/observation.go index f84322732..9714fd487 100644 --- a/node/pkg/processor/observation.go +++ b/node/pkg/processor/observation.go @@ -46,12 +46,13 @@ var ( // handleObservation processes a remote VAA observation, verifies it, checks whether the VAA has met quorum, // and assembles and submits a valid VAA if possible. -func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObservation) { +func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgWithTimeStamp[gossipv1.SignedObservation]) { // SECURITY: at this point, observations received from the p2p network are fully untrusted (all fields!) // // Note that observations are never tied to the (verified) p2p identity key - the p2p network // identity is completely decoupled from the guardian identity, p2p is just transport. 
+ m := obs.Msg hash := hex.EncodeToString(m.Hash) p.logger.Debug("received observation", @@ -218,6 +219,8 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs zap.Bools("aggregation", agg)) } + + observationTotalDelay.Observe(float64(time.Since(obs.Timestamp).Microseconds())) } func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gossipv1.SignedVAAWithQuorum) { diff --git a/node/pkg/processor/processor.go b/node/pkg/processor/processor.go index 0cf9fb6a4..3bedd0fe0 100644 --- a/node/pkg/processor/processor.go +++ b/node/pkg/processor/processor.go @@ -19,6 +19,10 @@ import ( "github.com/certusone/wormhole/node/pkg/reporter" "github.com/certusone/wormhole/node/pkg/supervisor" "github.com/wormhole-foundation/wormhole/sdk/vaa" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + dto "github.com/prometheus/client_model/go" ) type ( @@ -86,7 +90,7 @@ type Processor struct { // gossipSendC is a channel of outbound messages to broadcast on p2p gossipSendC chan<- []byte // obsvC is a channel of inbound decoded observations from p2p - obsvC chan *gossipv1.SignedObservation + obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation] // obsvReqSendC is a send-only channel of outbound re-observation requests to broadcast on p2p obsvReqSendC chan<- *gossipv1.ObservationRequest @@ -127,13 +131,29 @@ type Processor struct { pythnetVaas map[string]PythNetVaaEntry } +var ( + observationChanDelay = promauto.NewHistogram( + prometheus.HistogramOpts{ + Name: "wormhole_signed_observation_channel_delay_us", + Help: "Latency histogram for delay of signed observations in channel", + Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10000.0}, + }) + + observationTotalDelay = promauto.NewHistogram( + prometheus.HistogramOpts{ + Name: "wormhole_signed_observation_total_delay_us", + Help: "Latency histogram for total time to process signed observations", + 
Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10000.0}, + }) +) + func NewProcessor( ctx context.Context, db *db.Database, msgC <-chan *common.MessagePublication, setC <-chan *common.GuardianSet, gossipSendC chan<- []byte, - obsvC chan *gossipv1.SignedObservation, + obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], obsvReqSendC chan<- *gossipv1.ObservationRequest, injectC <-chan *vaa.VAA, signedInC <-chan *gossipv1.SignedVAAWithQuorum, @@ -181,6 +201,16 @@ func (p *Processor) Run(ctx context.Context) error { if p.acct != nil { p.acct.Close() } + + // Log these as warnings so they show up in the benchmark logs. + metric := &dto.Metric{} + _ = observationChanDelay.Write(metric) + p.logger.Warn("PROCESSOR_METRICS", zap.Any("observationChannelDelay", metric.String())) + + metric = &dto.Metric{} + _ = observationTotalDelay.Write(metric) + p.logger.Warn("PROCESSOR_METRICS", zap.Any("observationProcessingDelay", metric.String())) + return ctx.Err() case p.gs = <-p.setC: p.logger.Info("guardian set updated", @@ -216,6 +246,7 @@ func (p *Processor) Run(ctx context.Context) error { case v := <-p.injectC: p.handleInjection(ctx, v) case m := <-p.obsvC: + observationChanDelay.Observe(float64(time.Since(m.Timestamp).Microseconds())) p.handleObservation(ctx, m) case m := <-p.signedInC: p.handleInboundSignedVAAWithQuorum(ctx, m)