Proper timeouts for subscriptions

This commit is contained in:
Hendrik Hofstadt 2020-01-31 11:31:49 +01:00
parent 40c218eb92
commit df005eac71
2 changed files with 14 additions and 3 deletions

View File

@ -2,6 +2,7 @@ package main
import ( import (
"chainlink_exporter/abi" "chainlink_exporter/abi"
"context"
"encoding/hex" "encoding/hex"
"github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -38,8 +39,11 @@ func (a *AggregatorMonitor) Monitor() {
for { for {
zap.L().Debug("Starting aggregator routine", zap.String("address", a.address.String())) zap.L().Debug("Starting aggregator routine", zap.String("address", a.address.String()))
func() { func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
resChan := make(chan *abi.AggregatorChainlinkFulfilled) resChan := make(chan *abi.AggregatorChainlinkFulfilled)
sub, err := a.aggregator.WatchChainlinkFulfilled(&bind.WatchOpts{}, resChan, nil) sub, err := a.aggregator.WatchChainlinkFulfilled(&bind.WatchOpts{Context: ctx}, resChan, nil)
if err != nil { if err != nil {
zap.L().Error("failed to watch aggregator fulfillment", zap.Error(err), zap.String("address", a.address.String())) zap.L().Error("failed to watch aggregator fulfillment", zap.Error(err), zap.String("address", a.address.String()))
return return
@ -53,6 +57,8 @@ func (a *AggregatorMonitor) Monitor() {
return return
case res, has := <-resChan: case res, has := <-resChan:
if !has { if !has {
zap.L().Error("head subscription closed", zap.Error(err), zap.String("address", a.address.String()))
return
} }
a.handleFulfillment(res) a.handleFulfillment(res)
} }

View File

@ -211,8 +211,13 @@ func (m *Monitor) pollRoutine() {
for { for {
zap.L().Info("Starting poll routine") zap.L().Info("Starting poll routine")
func() { func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
reqChan := make(chan *abi.OracleOracleRequest, 100) reqChan := make(chan *abi.OracleOracleRequest, 100)
sub, err := m.oracle.WatchOracleRequest(nil, reqChan, nil) sub, err := m.oracle.WatchOracleRequest(&bind.WatchOpts{
Context: ctx,
}, reqChan, nil)
if err != nil { if err != nil {
zap.L().Error("failed to watch oracle requests", zap.Error(err)) zap.L().Error("failed to watch oracle requests", zap.Error(err))
return return
@ -220,7 +225,7 @@ func (m *Monitor) pollRoutine() {
defer sub.Unsubscribe() defer sub.Unsubscribe()
headChan := make(chan *types.Header, 100) headChan := make(chan *types.Header, 100)
sub2, err := m.client.SubscribeNewHead(context.Background(), headChan) sub2, err := m.client.SubscribeNewHead(ctx, headChan)
if err != nil { if err != nil {
zap.L().Error("failed to subscribe to new heads", zap.Error(err)) zap.L().Error("failed to subscribe to new heads", zap.Error(err))
return return