node/pkg/processor: ignore late observations for quorum VAAs
This avoids gossip spam and false positive Discord notifications when a connected node catches up and late observations are made. Change-Id: If9562661487d3d3d5138d27298b005f278f9e9ce
This commit is contained in:
parent
24ee63d9a1
commit
f59f4bbb2e
|
@ -45,6 +45,10 @@ var (
|
||||||
}, []string{"addr", "origin", "status"})
|
}, []string{"addr", "origin", "status"})
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
settlementTime = time.Second * 30
|
||||||
|
)
|
||||||
|
|
||||||
// handleCleanup handles periodic retransmissions and cleanup of VAAs
|
// handleCleanup handles periodic retransmissions and cleanup of VAAs
|
||||||
func (p *Processor) handleCleanup(ctx context.Context) {
|
func (p *Processor) handleCleanup(ctx context.Context) {
|
||||||
p.logger.Info("aggregation state summary", zap.Int("cached", len(p.state.vaaSignatures)))
|
p.logger.Info("aggregation state summary", zap.Int("cached", len(p.state.vaaSignatures)))
|
||||||
|
@ -54,7 +58,7 @@ func (p *Processor) handleCleanup(ctx context.Context) {
|
||||||
delta := time.Since(s.firstObserved)
|
delta := time.Since(s.firstObserved)
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case !s.settled && delta.Seconds() >= 30:
|
case !s.settled && delta > settlementTime:
|
||||||
// After 30 seconds, the VAA is considered settled - it's unlikely that more observations will
|
// After 30 seconds, the VAA is considered settled - it's unlikely that more observations will
|
||||||
// arrive, barring special circumstances. This is a better time to count misses than submission,
|
// arrive, barring special circumstances. This is a better time to count misses than submission,
|
||||||
// because we submit right when we quorum rather than waiting for all observations to arrive.
|
// because we submit right when we quorum rather than waiting for all observations to arrive.
|
||||||
|
|
|
@ -3,6 +3,7 @@ package processor
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
|
"github.com/certusone/wormhole/node/pkg/db"
|
||||||
"github.com/mr-tron/base58"
|
"github.com/mr-tron/base58"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
@ -78,6 +79,44 @@ func (p *Processor) handleMessage(ctx context.Context, k *common.MessagePublicat
|
||||||
ConsistencyLevel: k.ConsistencyLevel,
|
ConsistencyLevel: k.ConsistencyLevel,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ignore incoming observations when our database already has a quorum VAA for it.
|
||||||
|
// This can occur when we're receiving late observations due to node catchup, and
|
||||||
|
// processing those won't do us any good.
|
||||||
|
//
|
||||||
|
// Exception: if an observation is made within the settlement time (30s), we'll
|
||||||
|
// process it so other nodes won't consider it a miss.
|
||||||
|
if vb, err := p.db.GetSignedVAABytes(*db.VaaIDFromVAA(v)); err == nil {
|
||||||
|
// unmarshal vaa
|
||||||
|
var existing *vaa.VAA
|
||||||
|
if existing, err = vaa.Unmarshal(vb); err != nil {
|
||||||
|
panic("failed to unmarshal VAA from db")
|
||||||
|
}
|
||||||
|
|
||||||
|
if k.Timestamp.Sub(existing.Timestamp) > settlementTime {
|
||||||
|
p.logger.Info("ignoring observation since we already have a quorum VAA for it",
|
||||||
|
zap.Stringer("emitter_chain", k.EmitterChain),
|
||||||
|
zap.Stringer("emitter_address", k.EmitterAddress),
|
||||||
|
zap.String("emitter_address_b58", base58.Encode(k.EmitterAddress.Bytes())),
|
||||||
|
zap.Uint32("nonce", k.Nonce),
|
||||||
|
zap.Stringer("txhash", k.TxHash),
|
||||||
|
zap.String("txhash_b58", base58.Encode(k.TxHash.Bytes())),
|
||||||
|
zap.Time("timestamp", k.Timestamp),
|
||||||
|
zap.String("message_id", v.MessageID()),
|
||||||
|
zap.Duration("settlement_time", settlementTime),
|
||||||
|
)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else if err != db.ErrVAANotFound {
|
||||||
|
p.logger.Error("failed to get VAA from db",
|
||||||
|
zap.Stringer("emitter_chain", k.EmitterChain),
|
||||||
|
zap.Stringer("emitter_address", k.EmitterAddress),
|
||||||
|
zap.Uint32("nonce", k.Nonce),
|
||||||
|
zap.Stringer("txhash", k.TxHash),
|
||||||
|
zap.Time("timestamp", k.Timestamp),
|
||||||
|
zap.Error(err),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
// Generate digest of the unsigned VAA.
|
// Generate digest of the unsigned VAA.
|
||||||
digest, err := v.SigningMsg()
|
digest, err := v.SigningMsg()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Reference in New Issue