diff --git a/cmd/chainlink_exporter/aggregator.go b/cmd/chainlink_exporter/aggregator.go index 10ff437..9e1260e 100644 --- a/cmd/chainlink_exporter/aggregator.go +++ b/cmd/chainlink_exporter/aggregator.go @@ -2,6 +2,7 @@ package main import ( "chainlink_exporter/abi" + "context" "encoding/hex" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" @@ -38,8 +39,11 @@ func (a *AggregatorMonitor) Monitor() { for { zap.L().Debug("Starting aggregator routine", zap.String("address", a.address.String())) func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + defer cancel() + resChan := make(chan *abi.AggregatorChainlinkFulfilled) - sub, err := a.aggregator.WatchChainlinkFulfilled(&bind.WatchOpts{}, resChan, nil) + sub, err := a.aggregator.WatchChainlinkFulfilled(&bind.WatchOpts{Context: ctx}, resChan, nil) if err != nil { zap.L().Error("failed to watch aggregator fulfillment", zap.Error(err), zap.String("address", a.address.String())) return @@ -53,6 +57,8 @@ func (a *AggregatorMonitor) Monitor() { return case res, has := <-resChan: if !has { + zap.L().Error("head subscription closed", zap.Error(err), zap.String("address", a.address.String())) + return } a.handleFulfillment(res) } diff --git a/cmd/chainlink_exporter/monitor.go b/cmd/chainlink_exporter/monitor.go index ef393d1..75c9b8d 100644 --- a/cmd/chainlink_exporter/monitor.go +++ b/cmd/chainlink_exporter/monitor.go @@ -211,8 +211,13 @@ func (m *Monitor) pollRoutine() { for { zap.L().Info("Starting poll routine") func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + reqChan := make(chan *abi.OracleOracleRequest, 100) - sub, err := m.oracle.WatchOracleRequest(nil, reqChan, nil) + sub, err := m.oracle.WatchOracleRequest(&bind.WatchOpts{ + Context: ctx, + }, reqChan, nil) if err != nil { zap.L().Error("failed to watch oracle requests", zap.Error(err)) return @@ -220,7 +225,7 @@ func (m *Monitor) pollRoutine() { defer sub.Unsubscribe() headChan := make(chan *types.Header, 100) - sub2, err := m.client.SubscribeNewHead(context.Background(), headChan) + sub2, err := m.client.SubscribeNewHead(ctx, headChan) if err != nil { zap.L().Error("failed to subscribe to new heads", zap.Error(err)) return