bridge: implement aggregation timeouts and retransmissions
Fixes #21
ghstack-source-id: a89630d9e3
Pull Request resolved: https://github.com/certusone/wormhole/pull/72
This commit is contained in:
parent
a853317421
commit
9f75d19d11
|
@ -0,0 +1,48 @@
|
|||
package processor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// handleCleanup handles periodic retransmissions and cleanup of VAAs
|
||||
func (p *Processor) handleCleanup(ctx context.Context) {
|
||||
p.logger.Info("aggregation state summary", zap.Int("pending", len(p.state.vaaSignatures)))
|
||||
|
||||
for hash, s := range p.state.vaaSignatures {
|
||||
delta := time.Now().Sub(s.firstObserved)
|
||||
|
||||
switch {
|
||||
case s.submitted && delta.Hours() >= 1:
|
||||
// We could delete submitted VAAs right away, but then we'd lose context about additional (late)
|
||||
// observation that come in. Therefore, keep it for a reasonable amount of time.
|
||||
// If a very late observation arrives after cleanup, a nil aggregation state will be created
|
||||
// and then expired after a while (as noted in observation.go, this can be abused by a byzantine guardian).
|
||||
p.logger.Info("expiring submitted VAA", zap.String("digest", hash), zap.Duration("delta", delta))
|
||||
delete(p.state.vaaSignatures, hash)
|
||||
case !s.submitted && s.retryCount >= 10:
|
||||
// Clearly, this horse is dead and continued beatings won't bring it closer to quorum.
|
||||
p.logger.Info("expiring unsubmitted VAA after exhausting retries", zap.String("digest", hash), zap.Duration("delta", delta))
|
||||
delete(p.state.vaaSignatures, hash)
|
||||
case !s.submitted && delta.Minutes() >= 5:
|
||||
// Poor VAA has been unsubmitted for five minutes - clearly, something went wrong.
|
||||
// If we have previously submitted an observation, we can make another attempt to get it over
|
||||
// the finish line by rebroadcasting our sig. If we do not have a VAA, it means we either never observed it,
|
||||
// or it got revived by a malfunctioning guardian node, in which case, we can't do anything
|
||||
// about it and just delete it to keep our state nice and lean.
|
||||
if s.ourMsg != nil {
|
||||
p.logger.Info("resubmitting VAA observation",
|
||||
zap.String("digest", hash),
|
||||
zap.Duration("delta", delta),
|
||||
zap.Int("retry", 1))
|
||||
p.sendC <- s.ourMsg
|
||||
s.retryCount += 1
|
||||
} else {
|
||||
p.logger.Info("expiring unsubmitted nil VAA", zap.String("digest", hash), zap.Duration("delta", delta))
|
||||
delete(p.state.vaaSignatures, hash)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -108,6 +108,7 @@ func (p *Processor) handleLockup(ctx context.Context, k *common.ChainLock) {
|
|||
}
|
||||
|
||||
p.state.vaaSignatures[hash].ourVAA = v
|
||||
p.state.vaaSignatures[hash].ourMsg = msg
|
||||
|
||||
// Fast path for our own signature
|
||||
go func() { p.obsvC <- &obsv }()
|
||||
|
|
|
@ -164,7 +164,7 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.LockupObs
|
|||
panic(fmt.Sprintf("unknown VAA payload type: %+v", v))
|
||||
}
|
||||
} else {
|
||||
p.logger.Info("quorum not met, doing nothing",
|
||||
p.logger.Info("quorum not met or already submitted, doing nothing",
|
||||
zap.String("digest", hash))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,8 @@ type (
|
|||
ourVAA *vaa.VAA
|
||||
signatures map[ethcommon.Address][]byte
|
||||
submitted bool
|
||||
retryCount uint
|
||||
ourMsg []byte
|
||||
}
|
||||
|
||||
vaaMap map[string]*vaaState
|
||||
|
@ -66,6 +68,8 @@ type Processor struct {
|
|||
state *aggregationState
|
||||
// gk pk as eth address
|
||||
ourAddr ethcommon.Address
|
||||
// cleanup triggers periodic state cleanup
|
||||
cleanup *time.Ticker
|
||||
}
|
||||
|
||||
func NewProcessor(
|
||||
|
@ -98,6 +102,8 @@ func NewProcessor(
|
|||
}
|
||||
|
||||
func (p *Processor) Run(ctx context.Context) error {
|
||||
p.cleanup = time.NewTicker(30 * time.Second)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
@ -116,6 +122,8 @@ func (p *Processor) Run(ctx context.Context) error {
|
|||
p.handleLockup(ctx, k)
|
||||
case m := <-p.obsvC:
|
||||
p.handleObservation(ctx, m)
|
||||
case <-p.cleanup.C:
|
||||
p.handleCleanup(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue