Node: Processor delay metrics (#3210)

This commit is contained in:
bruce-riley 2023-07-19 10:23:16 -05:00 committed by GitHub
parent 590cd562bb
commit bd89e4e3ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 77 additions and 14 deletions

View File

@ -266,7 +266,7 @@ func runSpy(cmd *cobra.Command, args []string) {
sendC := make(chan []byte)
// Inbound observations
obsvC := make(chan *gossipv1.SignedObservation, 1024)
obsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], 1024)
// Inbound observation requests
obsvReqC := make(chan *gossipv1.ObservationRequest, 1024)

View File

@ -0,0 +1,29 @@
package common
import (
"time"
)
// MsgWithTimeStamp allows us to track the time of receipt of an event.
type MsgWithTimeStamp[T any] struct {
Msg *T
Timestamp time.Time
}
// CreateMsgWithTimestamp creates a new MsgWithTimeStamp with the current time.
func CreateMsgWithTimestamp[T any](msg *T) *MsgWithTimeStamp[T] {
return &MsgWithTimeStamp[T]{
Msg: msg,
Timestamp: time.Now(),
}
}
// PostMsgWithTimestamp sends the message to the specified channel using the current timestamp. Returns ErrChanFull on error.
func PostMsgWithTimestamp[T any](msg *T, c chan<- *MsgWithTimeStamp[T]) error {
select {
case c <- CreateMsgWithTimestamp[T](msg):
return nil
default:
return ErrChanFull
}
}

View File

@ -54,7 +54,7 @@ type G struct {
// Outbound gossip message queue (needs to be read/write because p2p needs read/write)
gossipSendC chan []byte
// Inbound observations. This is read/write because the processor also writes to it as a fast-path when handling locally made observations.
obsvC chan *gossipv1.SignedObservation
obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation]
// Finalized guardian observations aggregated across all chains
msgC channelPair[*common.MessagePublication]
// Ethereum incoming guardian set updates
@ -88,7 +88,7 @@ func (g *G) initializeBasic(logger *zap.Logger, rootCtxCancel context.CancelFunc
// Setup various channels...
g.gossipSendC = make(chan []byte)
g.obsvC = make(chan *gossipv1.SignedObservation, inboundObservationBufferSize)
g.obsvC = make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], inboundObservationBufferSize)
g.msgC = makeChannelPair[*common.MessagePublication](0)
g.setC = makeChannelPair[*common.GuardianSet](1) // This needs to be a buffered channel because of a circular dependency between processor and accountant during startup.
g.signedInC = makeChannelPair[*gossipv1.SignedVAAWithQuorum](inboundSignedVaaBufferSize)

View File

@ -184,7 +184,7 @@ func connectToPeers(ctx context.Context, logger *zap.Logger, h host.Host, peers
}
func Run(
obsvC chan<- *gossipv1.SignedObservation,
obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation],
obsvReqC chan<- *gossipv1.ObservationRequest,
obsvReqSendC <-chan *gossipv1.ObservationRequest,
gossipSendC chan []byte,
@ -572,10 +572,9 @@ func Run(
}()
}
case *gossipv1.GossipMessage_SignedObservation:
select {
case obsvC <- m.SignedObservation:
if err := common.PostMsgWithTimestamp[gossipv1.SignedObservation](m.SignedObservation, obsvC); err == nil {
p2pMessagesReceived.WithLabelValues("observation").Inc()
default:
} else {
if components.WarnChannelOverflow {
logger.Warn("Ignoring SignedObservation because obsvC full", zap.String("hash", hex.EncodeToString(m.SignedObservation.Hash)))
}

View File

@ -27,7 +27,7 @@ const LOCAL_P2P_PORTRANGE_START = 11000
type G struct {
// arguments passed to p2p.New
obsvC chan *gossipv1.SignedObservation
obsvC chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation]
obsvReqC chan *gossipv1.ObservationRequest
obsvReqSendC chan *gossipv1.ObservationRequest
sendC chan []byte
@ -62,7 +62,7 @@ func NewG(t *testing.T, nodeName string) *G {
}
g := &G{
obsvC: make(chan *gossipv1.SignedObservation, cs),
obsvC: make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], cs),
obsvReqC: make(chan *gossipv1.ObservationRequest, cs),
obsvReqSendC: make(chan *gossipv1.ObservationRequest, cs),
sendC: make(chan []byte, cs),

View File

@ -11,6 +11,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"google.golang.org/protobuf/proto"
node_common "github.com/certusone/wormhole/node/pkg/common"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
@ -64,8 +65,8 @@ func (p *Processor) broadcastSignature(
p.state.signatures[hash].source = o.GetEmitterChain().String()
p.state.signatures[hash].gs = p.gs // guaranteed to match ourObservation - there's no concurrent access to p.gs
// Fast path for our own signature
go func() { p.obsvC <- &obsv }()
// Fast path for our own signature. Put this in a go routine so it can block if the channel is full. That's also why we're not using node_common.PostMsgWithTimestamp.
go func() { p.obsvC <- node_common.CreateMsgWithTimestamp[gossipv1.SignedObservation](&obsv) }()
observationsBroadcastTotal.Inc()
}

View File

@ -46,12 +46,13 @@ var (
// handleObservation processes a remote VAA observation, verifies it, checks whether the VAA has met quorum,
// and assembles and submits a valid VAA if possible.
func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObservation) {
func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgWithTimeStamp[gossipv1.SignedObservation]) {
// SECURITY: at this point, observations received from the p2p network are fully untrusted (all fields!)
//
// Note that observations are never tied to the (verified) p2p identity key - the p2p network
// identity is completely decoupled from the guardian identity, p2p is just transport.
m := obs.Msg
hash := hex.EncodeToString(m.Hash)
p.logger.Debug("received observation",
@ -218,6 +219,8 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs
zap.Bools("aggregation", agg))
}
observationTotalDelay.Observe(float64(time.Since(obs.Timestamp).Microseconds()))
}
func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gossipv1.SignedVAAWithQuorum) {

View File

@ -19,6 +19,10 @@ import (
"github.com/certusone/wormhole/node/pkg/reporter"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
dto "github.com/prometheus/client_model/go"
)
type (
@ -86,7 +90,7 @@ type Processor struct {
// gossipSendC is a channel of outbound messages to broadcast on p2p
gossipSendC chan<- []byte
// obsvC is a channel of inbound decoded observations from p2p
obsvC chan *gossipv1.SignedObservation
obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation]
// obsvReqSendC is a send-only channel of outbound re-observation requests to broadcast on p2p
obsvReqSendC chan<- *gossipv1.ObservationRequest
@ -127,13 +131,29 @@ type Processor struct {
pythnetVaas map[string]PythNetVaaEntry
}
var (
observationChanDelay = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "wormhole_signed_observation_channel_delay_us",
Help: "Latency histogram for delay of signed observations in channel",
Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10000.0},
})
observationTotalDelay = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "wormhole_signed_observation_total_delay_us",
Help: "Latency histogram for total time to process signed observations",
Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10000.0},
})
)
func NewProcessor(
ctx context.Context,
db *db.Database,
msgC <-chan *common.MessagePublication,
setC <-chan *common.GuardianSet,
gossipSendC chan<- []byte,
obsvC chan *gossipv1.SignedObservation,
obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation],
obsvReqSendC chan<- *gossipv1.ObservationRequest,
injectC <-chan *vaa.VAA,
signedInC <-chan *gossipv1.SignedVAAWithQuorum,
@ -181,6 +201,16 @@ func (p *Processor) Run(ctx context.Context) error {
if p.acct != nil {
p.acct.Close()
}
// Log these as warnings so they show up in the benchmark logs.
metric := &dto.Metric{}
_ = observationChanDelay.Write(metric)
p.logger.Warn("PROCESSOR_METRICS", zap.Any("observationChannelDelay", metric.String()))
metric = &dto.Metric{}
_ = observationTotalDelay.Write(metric)
p.logger.Warn("PROCESSOR_METRICS", zap.Any("observationProcessingDelay", metric.String()))
return ctx.Err()
case p.gs = <-p.setC:
p.logger.Info("guardian set updated",
@ -216,6 +246,7 @@ func (p *Processor) Run(ctx context.Context) error {
case v := <-p.injectC:
p.handleInjection(ctx, v)
case m := <-p.obsvC:
observationChanDelay.Observe(float64(time.Since(m.Timestamp).Microseconds()))
p.handleObservation(ctx, m)
case m := <-p.signedInC:
p.handleInboundSignedVAAWithQuorum(ctx, m)