Node: Logging cleanup (#3888)

* Node: Logging cleanup

* Code review rework

---------

Co-authored-by: Bruce Riley <bruce@wormhole.labs.xyz>
This commit is contained in:
bruce-riley 2024-04-19 13:22:56 -05:00 committed by GitHub
parent c797acb7e9
commit 04a13542d1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 185 additions and 91 deletions

View File

@ -590,16 +590,19 @@ func Run(
}
if envelope.GetFrom() == h.ID() {
logger.Debug("received message from ourselves, ignoring",
zap.Any("payload", msg.Message))
if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("received message from ourselves, ignoring", zap.Any("payload", msg.Message))
}
p2pMessagesReceived.WithLabelValues("loopback").Inc()
continue
}
logger.Debug("received message",
zap.Any("payload", msg.Message),
zap.Binary("raw", envelope.Data),
zap.String("from", envelope.GetFrom().String()))
if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("received message",
zap.Any("payload", msg.Message),
zap.Binary("raw", envelope.Data),
zap.String("from", envelope.GetFrom().String()))
}
switch m := msg.Message.(type) {
case *gossipv1.GossipMessage_SignedHeartbeat:
@ -659,9 +662,9 @@ func Run(
}
}
} else {
logger.Debug("p2p_node_id_not_in_heartbeat",
zap.Error(err),
zap.Any("payload", heartbeat.NodeName))
if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("p2p_node_id_not_in_heartbeat", zap.Error(err), zap.Any("payload", heartbeat.NodeName))
}
}
}()
}
@ -693,24 +696,26 @@ func Run(
s := m.SignedObservationRequest
gs := gst.Get()
if gs == nil {
logger.Debug("dropping SignedObservationRequest - no guardian set",
zap.Any("value", s),
zap.String("from", envelope.GetFrom().String()))
if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("dropping SignedObservationRequest - no guardian set", zap.Any("value", s), zap.String("from", envelope.GetFrom().String()))
}
break
}
r, err := processSignedObservationRequest(s, gs)
if err != nil {
p2pMessagesReceived.WithLabelValues("invalid_signed_observation_request").Inc()
logger.Debug("invalid signed observation request received",
zap.Error(err),
zap.Any("payload", msg.Message),
zap.Any("value", s),
zap.Binary("raw", envelope.Data),
zap.String("from", envelope.GetFrom().String()))
if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("invalid signed observation request received",
zap.Error(err),
zap.Any("payload", msg.Message),
zap.Any("value", s),
zap.Binary("raw", envelope.Data),
zap.String("from", envelope.GetFrom().String()))
}
} else {
logger.Debug("valid signed observation request received",
zap.Any("value", r),
zap.String("from", envelope.GetFrom().String()))
if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("valid signed observation request received", zap.Any("value", r), zap.String("from", envelope.GetFrom().String()))
}
select {
case obsvReqC <- r:

View File

@ -15,6 +15,7 @@ import (
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
var (
@ -86,7 +87,11 @@ func (p *Processor) handleCleanup(ctx context.Context) {
//
// This is a rare case, and we can safely expire the state, since we
// have a quorum VAA.
p.logger.Info("Expiring late VAA", zap.String("digest", hash), zap.Duration("delta", delta))
p.logger.Info("Expiring late VAA",
zap.String("message_id", ourVaa.VAA.MessageID()),
zap.String("digest", hash),
zap.Duration("delta", delta),
)
aggregationStateLate.Inc()
delete(p.state.signatures, hash)
continue
@ -118,14 +123,17 @@ func (p *Processor) handleCleanup(ctx context.Context) {
chain = s.ourObservation.GetEmitterChain()
}
p.logger.Debug("observation considered settled",
zap.String("digest", hash),
zap.Duration("delta", delta),
zap.Int("have_sigs", hasSigs),
zap.Int("required_sigs", wantSigs),
zap.Bool("quorum", quorum),
zap.Stringer("emitter_chain", chain),
)
if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("observation considered settled",
zap.String("message_id", s.LoggingID()),
zap.String("digest", hash),
zap.Duration("delta", delta),
zap.Int("have_sigs", hasSigs),
zap.Int("required_sigs", wantSigs),
zap.Bool("quorum", quorum),
zap.Stringer("emitter_chain", chain),
)
}
for _, k := range gs.Keys {
if _, ok := s.signatures[k]; ok {
@ -139,12 +147,23 @@ func (p *Processor) handleCleanup(ctx context.Context) {
// observation that come in. Therefore, keep it for a reasonable amount of time.
// If a very late observation arrives after cleanup, a nil aggregation state will be created
// and then expired after a while (as noted in observation.go, this can be abused by a byzantine guardian).
p.logger.Debug("expiring submitted observation", zap.String("digest", hash), zap.Duration("delta", delta))
if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("expiring submitted observation",
zap.String("message_id", s.LoggingID()),
zap.String("digest", hash),
zap.Duration("delta", delta),
)
}
delete(p.state.signatures, hash)
aggregationStateExpiration.Inc()
case !s.submitted && ((s.ourMsg != nil && delta > retryLimitOurs) || (s.ourMsg == nil && delta > retryLimitNotOurs)):
// Clearly, this horse is dead and continued beatings won't bring it closer to quorum.
p.logger.Info("expiring unsubmitted observation after exhausting retries", zap.String("digest", hash), zap.Duration("delta", delta), zap.Bool("weObserved", s.ourMsg != nil))
p.logger.Info("expiring unsubmitted observation after exhausting retries",
zap.String("message_id", s.LoggingID()),
zap.String("digest", hash),
zap.Duration("delta", delta),
zap.Bool("weObserved", s.ourMsg != nil),
)
delete(p.state.signatures, hash)
aggregationStateTimeout.Inc()
case !s.submitted && delta >= FirstRetryMinWait && time.Since(s.nextRetry) >= 0:
@ -157,7 +176,11 @@ func (p *Processor) handleCleanup(ctx context.Context) {
if s.ourMsg != nil {
// Unreliable observations cannot be resubmitted and can be considered failed after 5 minutes
if !s.ourObservation.IsReliable() {
p.logger.Info("expiring unsubmitted unreliable observation", zap.String("digest", hash), zap.Duration("delta", delta))
p.logger.Info("expiring unsubmitted unreliable observation",
zap.String("message_id", s.LoggingID()),
zap.String("digest", hash),
zap.Duration("delta", delta),
)
delete(p.state.signatures, hash)
aggregationStateTimeout.Inc()
break
@ -165,20 +188,35 @@ func (p *Processor) handleCleanup(ctx context.Context) {
// Reobservation requests should not be resubmitted but we will keep waiting for more observations.
if s.ourObservation.IsReobservation() {
p.logger.Debug("not submitting reobservation request for reobservation", zap.String("digest", hash), zap.Duration("delta", delta))
if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("not submitting reobservation request for reobservation",
zap.String("message_id", s.LoggingID()),
zap.String("digest", hash),
zap.Duration("delta", delta),
)
}
break
}
// If we have already stored this VAA, there is no reason for us to request reobservation.
alreadyInDB, err := p.signedVaaAlreadyInDB(hash, s)
if err != nil {
p.logger.Error("failed to check if observation is already in DB, requesting reobservation", zap.String("hash", hash), zap.Error(err))
p.logger.Error("failed to check if observation is already in DB, requesting reobservation",
zap.String("message_id", s.LoggingID()),
zap.String("hash", hash),
zap.Error(err))
}
if alreadyInDB {
p.logger.Debug("observation already in DB, not requesting reobservation", zap.String("digest", hash))
if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("observation already in DB, not requesting reobservation",
zap.String("message_id", s.LoggingID()),
zap.String("digest", hash),
)
}
} else {
p.logger.Info("resubmitting observation",
zap.String("message_id", s.LoggingID()),
zap.String("digest", hash),
zap.Duration("delta", delta),
zap.String("firstObserved", s.firstObserved.String()),
@ -188,7 +226,7 @@ func (p *Processor) handleCleanup(ctx context.Context) {
TxHash: s.txHash,
}
if err := common.PostObservationRequest(p.obsvReqSendC, req); err != nil {
p.logger.Warn("failed to broadcast re-observation request", zap.Error(err))
p.logger.Warn("failed to broadcast re-observation request", zap.String("message_id", s.LoggingID()), zap.Error(err))
}
p.gossipSendC <- s.ourMsg
s.retryCtr++
@ -202,13 +240,16 @@ func (p *Processor) handleCleanup(ctx context.Context) {
hasSigs := len(s.signatures)
wantSigs := vaa.CalculateQuorum(len(p.gs.Keys))
p.logger.Debug("expiring unsubmitted nil observation",
zap.String("digest", hash),
zap.Duration("delta", delta),
zap.Int("have_sigs", hasSigs),
zap.Int("required_sigs", wantSigs),
zap.Bool("quorum", hasSigs >= wantSigs),
)
if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("expiring unsubmitted nil observation",
zap.String("message_id", s.LoggingID()),
zap.String("digest", hash),
zap.Duration("delta", delta),
zap.Int("have_sigs", hasSigs),
zap.Int("required_sigs", wantSigs),
zap.Bool("quorum", hasSigs >= wantSigs),
)
}
delete(p.state.signatures, hash)
aggregationStateUnobserved.Inc()
}
@ -239,7 +280,12 @@ func (p *Processor) signedVaaAlreadyInDB(hash string, s *state) (bool, error) {
vb, err := p.db.GetSignedVAABytes(*vaaID)
if err != nil {
if err == db.ErrVAANotFound {
p.logger.Debug("VAA not in DB", zap.String("digest", hash), zap.String("message_id", s.ourObservation.MessageID()))
if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("VAA not in DB",
zap.String("message_id", s.ourObservation.MessageID()),
zap.String("digest", hash),
)
}
return false, nil
} else {
return false, fmt.Errorf(`failed to look up message id "%s" in db: %w`, s.ourObservation.MessageID(), err)
@ -253,7 +299,12 @@ func (p *Processor) signedVaaAlreadyInDB(hash string, s *state) (bool, error) {
oldHash := hex.EncodeToString(v.SigningDigest().Bytes())
if hash != oldHash {
p.logger.Debug("VAA already in DB but hash is different", zap.String("old_hash", oldHash), zap.String("new_hash", hash))
if p.logger.Core().Enabled(zapcore.DebugLevel) {
p.logger.Debug("VAA already in DB but hash is different",
zap.String("message_id", s.ourObservation.MessageID()),
zap.String("old_hash", oldHash),
zap.String("new_hash", hash))
}
return false, fmt.Errorf("hash mismatch in_db: %s, new: %s", oldHash, hash)
}

View File

@ -10,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"github.com/certusone/wormhole/node/pkg/common"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
@ -39,8 +40,7 @@ var (
func (p *Processor) handleMessage(k *common.MessagePublication) {
if p.gs == nil {
p.logger.Warn("dropping observation since we haven't initialized our guardian set yet",
zap.Stringer("emitter_chain", k.EmitterChain),
zap.Stringer("emitter_address", k.EmitterAddress),
zap.String("message_id", k.MessageIDString()),
zap.Uint32("nonce", k.Nonce),
zap.Stringer("txhash", k.TxHash),
zap.Time("timestamp", k.Timestamp),
@ -48,13 +48,14 @@ func (p *Processor) handleMessage(k *common.MessagePublication) {
return
}
p.logger.Debug("message publication confirmed",
zap.Stringer("emitter_chain", k.EmitterChain),
zap.Stringer("emitter_address", k.EmitterAddress),
zap.Uint32("nonce", k.Nonce),
zap.Stringer("txhash", k.TxHash),
zap.Time("timestamp", k.Timestamp),
)
if p.logger.Core().Enabled(zapcore.DebugLevel) {
p.logger.Debug("message publication confirmed",
zap.String("message_id", k.MessageIDString()),
zap.Uint32("nonce", k.Nonce),
zap.Stringer("txhash", k.TxHash),
zap.Time("timestamp", k.Timestamp),
)
}
messagesObservedTotal.With(prometheus.Labels{
"emitter_chain": k.EmitterChain.String(),
@ -89,21 +90,18 @@ func (p *Processor) handleMessage(k *common.MessagePublication) {
panic(err)
}
p.logger.Debug("observed and signed confirmed message publication",
zap.Stringer("source_chain", k.EmitterChain),
zap.Stringer("txhash", k.TxHash),
zap.String("txhash_b58", base58.Encode(k.TxHash.Bytes())),
zap.String("digest", hex.EncodeToString(digest.Bytes())),
zap.Uint32("nonce", k.Nonce),
zap.Uint64("sequence", k.Sequence),
zap.Stringer("emitter_chain", k.EmitterChain),
zap.Stringer("emitter_address", k.EmitterAddress),
zap.String("emitter_address_b58", base58.Encode(k.EmitterAddress.Bytes())),
zap.Uint8("consistency_level", k.ConsistencyLevel),
zap.String("message_id", v.MessageID()),
zap.String("signature", hex.EncodeToString(s)),
zap.Bool("isReobservation", k.IsReobservation),
)
if p.logger.Core().Enabled(zapcore.DebugLevel) {
p.logger.Debug("observed and signed confirmed message publication",
zap.String("message_id", k.MessageIDString()),
zap.Stringer("txhash", k.TxHash),
zap.String("txhash_b58", base58.Encode(k.TxHash.Bytes())),
zap.String("digest", hex.EncodeToString(digest.Bytes())),
zap.Uint32("nonce", k.Nonce),
zap.Uint8("consistency_level", k.ConsistencyLevel),
zap.String("signature", hex.EncodeToString(s)),
zap.Bool("isReobservation", k.IsReobservation),
)
}
messagesSignedTotal.With(prometheus.Labels{
"emitter_chain": k.EmitterChain.String()}).Add(1)

View File

@ -90,12 +90,12 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW
if p.logger.Core().Enabled(zapcore.DebugLevel) {
p.logger.Debug("received observation",
zap.String("message_id", m.MessageId),
zap.String("digest", hash),
zap.String("signature", hex.EncodeToString(m.Signature)),
zap.String("addr", hex.EncodeToString(m.Addr)),
zap.String("txhash", hex.EncodeToString(m.TxHash)),
zap.String("txhash_b58", base58.Encode(m.TxHash)),
zap.String("message_id", m.MessageId),
)
}
@ -106,6 +106,7 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW
pk, err := crypto.Ecrecover(m.Hash, m.Signature)
if err != nil {
p.logger.Warn("failed to verify signature on observation",
zap.String("messageId", m.MessageId),
zap.String("digest", hash),
zap.String("signature", hex.EncodeToString(m.Signature)),
zap.String("addr", hex.EncodeToString(m.Addr)),
@ -120,6 +121,7 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW
if their_addr != signer_pk {
p.logger.Info("invalid observation - address does not match pubkey",
zap.String("messageId", m.MessageId),
zap.String("digest", hash),
zap.String("signature", hex.EncodeToString(m.Signature)),
zap.String("addr", hex.EncodeToString(m.Addr)),
@ -155,6 +157,7 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW
// May as well not have received it/been offline - drop it and wait for the guardian set.
if gs == nil {
p.logger.Warn("dropping observations since we haven't initialized our guardian set yet",
zap.String("messageId", m.MessageId),
zap.String("digest", hash),
zap.String("their_addr", their_addr.Hex()),
)
@ -166,12 +169,15 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW
// who have the outdated guardian set, we'll just wait for the message to be retransmitted eventually.
_, ok := gs.KeyIndex(their_addr)
if !ok {
p.logger.Debug("received observation by unknown guardian - is our guardian set outdated?",
zap.String("digest", hash),
zap.String("their_addr", their_addr.Hex()),
zap.Uint32("index", gs.Index),
//zap.Any("keys", gs.KeysAsHexStrings()),
)
if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("received observation by unknown guardian - is our guardian set outdated?",
zap.String("messageId", m.MessageId),
zap.String("digest", hash),
zap.String("their_addr", their_addr.Hex()),
zap.Uint32("index", gs.Index),
//zap.Any("keys", gs.KeysAsHexStrings()),
)
}
observationsFailedTotal.WithLabelValues("unknown_guardian").Inc()
return
}
@ -219,10 +225,12 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW
// We will later check for quorum again after assembling the VAA for a particular guardian set.
if len(s.signatures) < quorum {
// no quorum yet, we're done here
p.logger.Debug("quorum not yet met",
zap.String("digest", hash),
zap.String("messageId", m.MessageId),
)
if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("quorum not yet met",
zap.String("messageId", m.MessageId),
zap.String("digest", hash),
)
}
return
}
@ -232,6 +240,7 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW
if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("aggregation state for observation", // 1.3M out of 3M info messages / hour / guardian
zap.String("messageId", m.MessageId),
zap.String("digest", hash),
zap.Any("set", gs.KeysAsHexStrings()),
zap.Uint32("index", gs.Index),
@ -246,12 +255,20 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW
// we have reached quorum *with the active guardian set*
s.ourObservation.HandleQuorum(sigsVaaFormat, hash, p)
} else {
p.logger.Debug("quorum not met or already submitted, doing nothing", // 1.2M out of 3M info messages / hour / guardian
zap.String("digest", hash))
if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("quorum not met or already submitted, doing nothing", // 1.2M out of 3M info messages / hour / guardian
zap.String("messageId", m.MessageId),
zap.String("digest", hash),
)
}
}
} else {
p.logger.Debug("we have not yet seen this observation - temporarily storing signature", // 175K out of 3M info messages / hour / guardian
zap.String("digest", hash))
if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("we have not yet seen this observation - temporarily storing signature", // 175K out of 3M info messages / hour / guardian
zap.String("messageId", m.MessageId),
zap.String("digest", hash),
)
}
}
@ -270,7 +287,7 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos
if p.haveSignedVAA(*db.VaaIDFromVAA(v)) {
if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("ignored SignedVAAWithQuorum message for VAA we already stored",
zap.String("vaaID", string(db.VaaIDFromVAA(v).Bytes())),
zap.String("message_id", v.MessageID()),
)
}
return
@ -278,6 +295,7 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos
if p.gs == nil {
p.logger.Warn("dropping SignedVAAWithQuorum message since we haven't initialized our guardian set yet",
zap.String("message_id", v.MessageID()),
zap.String("digest", hex.EncodeToString(v.SigningDigest().Bytes())),
zap.Any("message", m),
)
@ -287,6 +305,7 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos
// Check if guardianSet doesn't have any keys
if len(p.gs.Keys) == 0 {
p.logger.Warn("dropping SignedVAAWithQuorum message since we have a guardian set without keys",
zap.String("message_id", v.MessageID()),
zap.String("digest", hex.EncodeToString(v.SigningDigest().Bytes())),
zap.Any("message", m),
)
@ -294,7 +313,8 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos
}
if err := v.Verify(p.gs.Keys); err != nil {
p.logger.Warn("dropping SignedVAAWithQuorum message because it failed verification: " + err.Error())
// We format the error as part of the message so the tests can check for it.
p.logger.Warn("dropping SignedVAAWithQuorum message because it failed verification: "+err.Error(), zap.String("message_id", v.MessageID()))
return
}
@ -306,14 +326,18 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos
// Store signed VAA in database.
if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("storing inbound signed VAA with quorum",
zap.String("message_id", v.MessageID()),
zap.String("digest", hex.EncodeToString(v.SigningDigest().Bytes())),
zap.Any("vaa", v),
zap.String("bytes", hex.EncodeToString(m.Vaa)),
zap.String("message_id", v.MessageID()))
)
}
if err := p.storeSignedVAA(v); err != nil {
p.logger.Error("failed to store signed VAA", zap.Error(err))
p.logger.Error("failed to store signed VAA",
zap.String("message_id", v.MessageID()),
zap.Error(err),
)
return
}
}

View File

@ -3,6 +3,7 @@ package processor
import (
"context"
"crypto/ecdsa"
"encoding/hex"
"fmt"
"time"
@ -82,6 +83,16 @@ type (
}
)
// LoggingID can be used to identify a state object in a log message. Note that it should not
// be used to uniquely identify an observation. It is only meant for logging purposes.
func (s *state) LoggingID() string {
if s.ourObservation != nil {
return s.ourObservation.MessageID()
}
return hex.EncodeToString(s.txHash)
}
type PythNetVaaEntry struct {
v *vaa.VAA
updateTime time.Time // Used for determining when to delete entries

View File

@ -28,11 +28,16 @@ func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) {
// Store signed VAA in database.
p.logger.Info("signed VAA with quorum",
zap.String("message_id", signed.MessageID()),
zap.String("digest", hash),
zap.String("message_id", signed.MessageID()))
)
if err := p.storeSignedVAA(signed); err != nil {
p.logger.Error("failed to store signed VAA", zap.Error(err))
p.logger.Error("failed to store signed VAA",
zap.String("message_id", signed.MessageID()),
zap.String("digest", hash),
zap.Error(err),
)
}
p.broadcastSignedVAA(signed)