node/pkg/processor: expire late observations
Fixes https://github.com/certusone/wormhole/issues/685. Example occurrence this fixes: https://i.imgur.com/gZWKf1n.png Possible future optimizations include: - Ignore late messages in the processor (but we can only ignore them post settlement time, so we need the cleanup logic regardless). - Ignoring late observations from other nodes. - Using the stored VAA to calculate misses. - Drop incomplete local observations. However, this is not trivial since we do not know the message ID for those. commit-id:47e1e59f
This commit is contained in:
parent
b8c30314b5
commit
68bdd4b0b6
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"github.com/certusone/wormhole/node/pkg/common"
|
"github.com/certusone/wormhole/node/pkg/common"
|
||||||
|
"github.com/certusone/wormhole/node/pkg/db"
|
||||||
"github.com/certusone/wormhole/node/pkg/vaa"
|
"github.com/certusone/wormhole/node/pkg/vaa"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||||
|
@ -23,6 +24,11 @@ var (
|
||||||
Name: "wormhole_aggregation_state_expirations_total",
|
Name: "wormhole_aggregation_state_expirations_total",
|
||||||
Help: "Total number of expired submitted aggregation states",
|
Help: "Total number of expired submitted aggregation states",
|
||||||
})
|
})
|
||||||
|
aggregationStateLate = promauto.NewCounter(
|
||||||
|
prometheus.CounterOpts{
|
||||||
|
Name: "wormhole_aggregation_state_late_total",
|
||||||
|
Help: "Total number of late aggregation states (cluster achieved consensus without us)",
|
||||||
|
})
|
||||||
aggregationStateTimeout = promauto.NewCounter(
|
aggregationStateTimeout = promauto.NewCounter(
|
||||||
prometheus.CounterOpts{
|
prometheus.CounterOpts{
|
||||||
Name: "wormhole_aggregation_state_timeout_total",
|
Name: "wormhole_aggregation_state_timeout_total",
|
||||||
|
@ -58,6 +64,29 @@ func (p *Processor) handleCleanup(ctx context.Context) {
|
||||||
delta := time.Since(s.firstObserved)
|
delta := time.Since(s.firstObserved)
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
|
case !s.submitted && s.ourVAA != nil && delta > settlementTime:
|
||||||
|
// Expire pending VAAs post settlement time if we have a stored quorum VAA.
|
||||||
|
//
|
||||||
|
// This occurs when we observed a message after the cluster has already reached
|
||||||
|
// consensus on it, causing us to never achieve quorum.
|
||||||
|
|
||||||
|
if _, err := p.db.GetSignedVAABytes(*db.VaaIDFromVAA(s.ourVAA)); err == nil {
|
||||||
|
// If we have a stored quorum VAA, we can safely expire the state.
|
||||||
|
//
|
||||||
|
// This is a rare case, and we can safely expire the state, since we
|
||||||
|
// have a quorum VAA.
|
||||||
|
p.logger.Info("Expiring late VAA", zap.String("digest", hash), zap.Duration("delta", delta))
|
||||||
|
aggregationStateLate.Inc()
|
||||||
|
delete(p.state.vaaSignatures, hash)
|
||||||
|
break
|
||||||
|
} else if err != db.ErrVAANotFound {
|
||||||
|
p.logger.Error("failed to look up VAA in database",
|
||||||
|
zap.String("digest", hash),
|
||||||
|
zap.Error(err),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fallthrough
|
||||||
case !s.settled && delta > settlementTime:
|
case !s.settled && delta > settlementTime:
|
||||||
// After 30 seconds, the VAA is considered settled - it's unlikely that more observations will
|
// After 30 seconds, the VAA is considered settled - it's unlikely that more observations will
|
||||||
// arrive, barring special circumstances. This is a better time to count misses than submission,
|
// arrive, barring special circumstances. This is a better time to count misses than submission,
|
||||||
|
|
Loading…
Reference in New Issue