From 0ddfa34136c3a2777f069e2d6b4461febdf4ae63 Mon Sep 17 00:00:00 2001 From: Hendrik Hofstadt Date: Wed, 29 Jan 2020 11:21:33 +0100 Subject: [PATCH] Drop duplicate requestID jobs Subscription messages are delivered at least once which might confuse the monitor. --- cmd/chainlink_exporter/aggregator.go | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/cmd/chainlink_exporter/aggregator.go b/cmd/chainlink_exporter/aggregator.go index f29cd2d..10ff437 100644 --- a/cmd/chainlink_exporter/aggregator.go +++ b/cmd/chainlink_exporter/aggregator.go @@ -17,6 +17,8 @@ type ( pendingJobs map[string]*abi.OracleOracleRequest + seenRequestIDs map[string]bool + monitor *Monitor lock sync.Mutex } @@ -24,10 +26,11 @@ type ( func NewAggregatorMonitor(agg *abi.Aggregator, addr common.Address, m *Monitor) *AggregatorMonitor { return &AggregatorMonitor{ - aggregator: agg, - pendingJobs: map[string]*abi.OracleOracleRequest{}, - monitor: m, - address: addr, + aggregator: agg, + pendingJobs: map[string]*abi.OracleOracleRequest{}, + seenRequestIDs: map[string]bool{}, + monitor: m, + address: addr, } } @@ -81,8 +84,17 @@ func (a *AggregatorMonitor) HandleNewBlock(height uint64) { func (a *AggregatorMonitor) handleRequest(res *abi.OracleOracleRequest) { a.lock.Lock() defer a.lock.Unlock() + requestIDString := hex.EncodeToString(res.RequestId[:]) + if _, exists := a.seenRequestIDs[requestIDString]; exists { + zap.L().Info("request dropped; already seen same reqID", zap.Uint64("height", res.Raw.BlockNumber), + zap.String("requester", res.Requester.String()), zap.Binary("request_id", res.RequestId[:]), + zap.ByteString("spec_id", res.SpecId[:]), zap.Uint64("request_height", res.Raw.BlockNumber)) + return + } else { + a.seenRequestIDs[requestIDString] = true + } - a.pendingJobs[hex.EncodeToString(res.RequestId[:])] = res + a.pendingJobs[requestIDString] = res } func (a *AggregatorMonitor) handleFulfillment(res *abi.AggregatorChainlinkFulfilled) {