Drop duplicate requestID jobs

Subscription messages are delivered at least once which might confuse the monitor.
This commit is contained in:
Hendrik Hofstadt 2020-01-29 11:21:33 +01:00
parent 0b02e46925
commit 0ddfa34136
1 changed files with 17 additions and 5 deletions

View File

@ -17,6 +17,8 @@ type (
pendingJobs map[string]*abi.OracleOracleRequest
seenRequestIDs map[string]bool
monitor *Monitor
lock sync.Mutex
}
@ -24,10 +26,11 @@ type (
func NewAggregatorMonitor(agg *abi.Aggregator, addr common.Address, m *Monitor) *AggregatorMonitor {
return &AggregatorMonitor{
aggregator: agg,
pendingJobs: map[string]*abi.OracleOracleRequest{},
monitor: m,
address: addr,
aggregator: agg,
pendingJobs: map[string]*abi.OracleOracleRequest{},
seenRequestIDs: map[string]bool{},
monitor: m,
address: addr,
}
}
@ -81,8 +84,17 @@ func (a *AggregatorMonitor) HandleNewBlock(height uint64) {
func (a *AggregatorMonitor) handleRequest(res *abi.OracleOracleRequest) {
a.lock.Lock()
defer a.lock.Unlock()
requestIDString := hex.EncodeToString(res.RequestId[:])
if _, exists := a.seenRequestIDs[requestIDString]; exists {
zap.L().Info("request dropped; already seen same reqID", zap.Uint64("height", res.Raw.BlockNumber),
zap.String("requester", res.Requester.String()), zap.Binary("request_id", res.RequestId[:]),
zap.ByteString("spec_id", res.SpecId[:]), zap.Uint64("request_height", res.Raw.BlockNumber))
return
} else {
a.seenRequestIDs[requestIDString] = true
}
a.pendingJobs[hex.EncodeToString(res.RequestId[:])] = res
a.pendingJobs[requestIDString] = res
}
func (a *AggregatorMonitor) handleFulfillment(res *abi.AggregatorChainlinkFulfilled) {