node/processor: exponential backoff for reobservation requests (#3207)
* node/processor: exponential backoff for reobservation requests --------- Co-authored-by: tbjump <>
This commit is contained in:
parent
7423be52c1
commit
c37bdca23d
|
@ -0,0 +1,13 @@
|
|||
package processor
|
||||
|
||||
import (
|
||||
mathrand "math/rand"
|
||||
"time"
|
||||
)
|
||||
|
||||
func nextRetryDuration(ctr uint) time.Duration {
|
||||
m := 1 << ctr
|
||||
wait := firstRetryMinWait * time.Duration(m)
|
||||
jitter := time.Duration(mathrand.Int63n(int64(wait))) // nolint:gosec
|
||||
return wait + jitter
|
||||
}
|
|
@ -0,0 +1,27 @@
|
|||
package processor
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestBackoff(t *testing.T) {
|
||||
for i := 0; i < 10; i++ {
|
||||
assert.Greater(t, firstRetryMinWait*1*2+time.Second, nextRetryDuration(0))
|
||||
assert.Less(t, firstRetryMinWait*1-time.Second, nextRetryDuration(0))
|
||||
|
||||
assert.Greater(t, firstRetryMinWait*2*2+time.Second, nextRetryDuration(1))
|
||||
assert.Less(t, firstRetryMinWait*2-time.Second, nextRetryDuration(1))
|
||||
|
||||
assert.Greater(t, firstRetryMinWait*4*2+time.Second, nextRetryDuration(2))
|
||||
assert.Less(t, firstRetryMinWait*4-time.Second, nextRetryDuration(2))
|
||||
|
||||
assert.Greater(t, firstRetryMinWait*8*2+time.Second, nextRetryDuration(3))
|
||||
assert.Less(t, firstRetryMinWait*8-time.Second, nextRetryDuration(3))
|
||||
|
||||
assert.Greater(t, firstRetryMinWait*1024*2+time.Second, nextRetryDuration(10))
|
||||
assert.Less(t, firstRetryMinWait*1024-time.Second, nextRetryDuration(10))
|
||||
}
|
||||
}
|
|
@ -52,6 +52,7 @@ func (p *Processor) broadcastSignature(
|
|||
if p.state.signatures[hash] == nil {
|
||||
p.state.signatures[hash] = &state{
|
||||
firstObserved: time.Now(),
|
||||
nextRetry: time.Now().Add(nextRetryDuration(0)),
|
||||
signatures: map[ethcommon.Address][]byte{},
|
||||
source: "loopback",
|
||||
}
|
||||
|
|
|
@ -60,6 +60,7 @@ const (
|
|||
retryTime = time.Minute * 5
|
||||
retryLimitOurs = time.Hour * 24
|
||||
retryLimitNotOurs = time.Hour
|
||||
firstRetryMinWait = time.Minute * 5
|
||||
)
|
||||
|
||||
// handleCleanup handles periodic retransmissions and cleanup of observations
|
||||
|
@ -147,7 +148,7 @@ func (p *Processor) handleCleanup(ctx context.Context) {
|
|||
p.logger.Info("expiring unsubmitted observation after exhausting retries", zap.String("digest", hash), zap.Duration("delta", delta), zap.Bool("weObserved", s.ourMsg != nil))
|
||||
delete(p.state.signatures, hash)
|
||||
aggregationStateTimeout.Inc()
|
||||
case !s.submitted && delta.Minutes() >= 5 && time.Since(s.lastRetry) >= retryTime:
|
||||
case !s.submitted && delta >= firstRetryMinWait && time.Since(s.nextRetry) >= 0:
|
||||
// Poor observation has been unsubmitted for five minutes - clearly, something went wrong.
|
||||
// If we have previously submitted an observation, and it was reliable, we can make another attempt to get
|
||||
// it over the finish line by sending a re-observation request to the network and rebroadcasting our
|
||||
|
@ -185,7 +186,8 @@ func (p *Processor) handleCleanup(ctx context.Context) {
|
|||
p.logger.Warn("failed to broadcast re-observation request", zap.Error(err))
|
||||
}
|
||||
p.gossipSendC <- s.ourMsg
|
||||
s.lastRetry = time.Now()
|
||||
s.retryCtr++
|
||||
s.nextRetry = time.Now().Add(nextRetryDuration(s.retryCtr))
|
||||
aggregationStateRetries.Inc()
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -161,6 +161,7 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs
|
|||
|
||||
p.state.signatures[hash] = &state{
|
||||
firstObserved: time.Now(),
|
||||
nextRetry: time.Now().Add(nextRetryDuration(0)),
|
||||
signatures: map[common.Address][]byte{},
|
||||
source: "unknown",
|
||||
}
|
||||
|
|
|
@ -42,8 +42,10 @@ type (
|
|||
state struct {
|
||||
// First time this digest was seen (possibly even before we observed it ourselves).
|
||||
firstObserved time.Time
|
||||
// The most recent time that a re-observation request was sent to the guardian network.
|
||||
lastRetry time.Time
|
||||
// A re-observation request shall not be sent before this time.
|
||||
nextRetry time.Time
|
||||
// Number of times we sent a re-observation request
|
||||
retryCtr uint
|
||||
// Copy of our observation.
|
||||
ourObservation Observation
|
||||
// Map of signatures seen by guardian. During guardian set updates, this may contain signatures belonging
|
||||
|
|
Loading…
Reference in New Issue