node: processor: Rate-limit reobservations
We currently run the cleanup loop every 30 seconds, which means that once 5 minutes have passed for an observation without quorum we will send out re-observation requests to the p2p network every 30 seconds. This is a bit excessive so limit sending these requests out to once every 5 minutes.
This commit is contained in:
parent
f14835f4b4
commit
0552e2fe0c
|
@ -56,6 +56,7 @@ var (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
settlementTime = time.Second * 30
|
settlementTime = time.Second * 30
|
||||||
|
retryTime = time.Minute * 5
|
||||||
)
|
)
|
||||||
|
|
||||||
// handleCleanup handles periodic retransmissions and cleanup of observations
|
// handleCleanup handles periodic retransmissions and cleanup of observations
|
||||||
|
@ -175,7 +176,7 @@ func (p *Processor) handleCleanup(ctx context.Context) {
|
||||||
p.logger.Info("expiring unsubmitted observation after exhausting retries", zap.String("digest", hash), zap.Duration("delta", delta))
|
p.logger.Info("expiring unsubmitted observation after exhausting retries", zap.String("digest", hash), zap.Duration("delta", delta))
|
||||||
delete(p.state.signatures, hash)
|
delete(p.state.signatures, hash)
|
||||||
aggregationStateTimeout.Inc()
|
aggregationStateTimeout.Inc()
|
||||||
case !s.submitted && delta.Minutes() >= 5:
|
case !s.submitted && delta.Minutes() >= 5 && time.Since(s.lastRetry) >= retryTime:
|
||||||
// Poor observation has been unsubmitted for five minutes - clearly, something went wrong.
|
// Poor observation has been unsubmitted for five minutes - clearly, something went wrong.
|
||||||
// If we have previously submitted an observation, we can make another attempt to get it over
|
// If we have previously submitted an observation, we can make another attempt to get it over
|
||||||
// the finish line by sending a re-observation request to the network and rebroadcasting our
|
// the finish line by sending a re-observation request to the network and rebroadcasting our
|
||||||
|
@ -196,6 +197,7 @@ func (p *Processor) handleCleanup(ctx context.Context) {
|
||||||
}
|
}
|
||||||
p.sendC <- s.ourMsg
|
p.sendC <- s.ourMsg
|
||||||
s.retryCount += 1
|
s.retryCount += 1
|
||||||
|
s.lastRetry = time.Now()
|
||||||
aggregationStateRetries.Inc()
|
aggregationStateRetries.Inc()
|
||||||
} else {
|
} else {
|
||||||
// For nil state entries, we log the quorum to determine whether the
|
// For nil state entries, we log the quorum to determine whether the
|
||||||
|
|
|
@ -40,6 +40,8 @@ type (
|
||||||
state struct {
|
state struct {
|
||||||
// First time this digest was seen (possibly even before we observed it ourselves).
|
// First time this digest was seen (possibly even before we observed it ourselves).
|
||||||
firstObserved time.Time
|
firstObserved time.Time
|
||||||
|
// The most recent time that a re-observation request was sent to the guardian network.
|
||||||
|
lastRetry time.Time
|
||||||
// Copy of our observation.
|
// Copy of our observation.
|
||||||
ourObservation Observation
|
ourObservation Observation
|
||||||
// Map of signatures seen by guardian. During guardian set updates, this may contain signatures belonging
|
// Map of signatures seen by guardian. During guardian set updates, this may contain signatures belonging
|
||||||
|
|
Loading…
Reference in New Issue