diff --git a/cmd/chainlink_exporter/aggregator.go b/cmd/chainlink_exporter/aggregator.go index 9e1260e..f67e875 100644 --- a/cmd/chainlink_exporter/aggregator.go +++ b/cmd/chainlink_exporter/aggregator.go @@ -48,7 +48,6 @@ func (a *AggregatorMonitor) Monitor() { zap.L().Error("failed to watch aggregator fulfillment", zap.Error(err), zap.String("address", a.address.String())) return } - defer sub.Unsubscribe() for { select { diff --git a/cmd/chainlink_exporter/monitor.go b/cmd/chainlink_exporter/monitor.go index 75c9b8d..8be2915 100644 --- a/cmd/chainlink_exporter/monitor.go +++ b/cmd/chainlink_exporter/monitor.go @@ -146,7 +146,8 @@ func NewMonitor(addr common.Address, fulfillmentAddr common.Address, linkAddr co } func (m *Monitor) Start() { - go m.pollRoutine() + go m.headRoutine() + go m.requestRoutine() go m.metricRoutine() } @@ -167,8 +168,9 @@ func (m *Monitor) metricRoutine() { func (m *Monitor) updateBalances() { zap.L().Debug("fetching balances") - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) defer cancel() + balance, err := m.client.BalanceAt(ctx, m.fulfillmentAddr, nil) if err != nil { zap.L().Error("failed to fetch oracle balance", zap.Error(err)) @@ -178,8 +180,6 @@ func (m *Monitor) updateBalances() { balance.Div(balance, big.NewInt(params.Ether/PRECISION)) m.balanceGauge.Set(float64(balance.Uint64()) / PRECISION) - ctx, cancel = context.WithTimeout(context.Background(), time.Second*5) - defer cancel() owner, err := m.oracle.Owner(&bind.CallOpts{ Context: ctx, }) @@ -194,8 +194,6 @@ func (m *Monitor) updateBalances() { withdrawableLinkBalance.Div(withdrawableLinkBalance, big.NewInt(params.Ether/PRECISION)) m.linkBalanceGauge.WithLabelValues("withdrawable").Set(float64(withdrawableLinkBalance.Uint64()) / PRECISION) - ctx, cancel = context.WithTimeout(context.Background(), time.Second*5) - defer cancel() linkBalance, err := m.linkContract.BalanceOf(&bind.CallOpts{Context: ctx}, m.addr) if err != nil { zap.L().Error("failed to fetch LINK balance", zap.Error(err)) @@ -207,49 +205,25 @@ func (m *Monitor) updateBalances() { zap.L().Debug("fetched balances") } -func (m *Monitor) pollRoutine() { +func (m *Monitor) headRoutine() { for { - zap.L().Info("Starting poll routine") + zap.L().Info("Starting head routine") func() { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() - reqChan := make(chan *abi.OracleOracleRequest, 100) - sub, err := m.oracle.WatchOracleRequest(&bind.WatchOpts{ - Context: ctx, - }, reqChan, nil) - if err != nil { - zap.L().Error("failed to watch oracle requests", zap.Error(err)) - return - } - defer sub.Unsubscribe() - headChan := make(chan *types.Header, 100) - sub2, err := m.client.SubscribeNewHead(ctx, headChan) + sub, err := m.client.SubscribeNewHead(ctx, headChan) if err != nil { zap.L().Error("failed to subscribe to new heads", zap.Error(err)) return } - defer sub2.Unsubscribe() for { select { case err = <-sub.Err(): - zap.L().Error("oracle requests subscription errored", zap.Error(err)) - return - case err = <-sub2.Err(): zap.L().Error("head subscription errored", zap.Error(err)) return - case req, has := <-reqChan: - if !has { - zap.L().Error("request subscription closed", zap.Error(err)) - return - } - err := m.handleRequest(req) - if err != nil { - zap.L().Warn("failed to handle request", zap.Error(err)) - continue - } case header, has := <-headChan: if !has { zap.L().Error("head subscription closed", zap.Error(err)) @@ -268,7 +242,48 @@ func (m *Monitor) pollRoutine() { } }() - zap.L().Warn("poll routine died. restarting in 5sec") + zap.L().Warn("head routine died. restarting in 5sec") + time.Sleep(5 * time.Second) + } +} + +func (m *Monitor) requestRoutine() { + for { + zap.L().Info("Starting request routine") + func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + reqChan := make(chan *abi.OracleOracleRequest, 100) + sub, err := m.oracle.WatchOracleRequest(&bind.WatchOpts{ + Context: ctx, + }, reqChan, nil) + if err != nil { + zap.L().Error("failed to watch oracle requests", zap.Error(err)) + return + } + + for { + select { + case err = <-sub.Err(): + zap.L().Error("oracle requests subscription errored", zap.Error(err)) + return + + case req, has := <-reqChan: + if !has { + zap.L().Error("request subscription closed", zap.Error(err)) + return + } + err := m.handleRequest(req) + if err != nil { + zap.L().Warn("failed to handle request", zap.Error(err)) + continue + } + } + } + }() + + zap.L().Warn("request routine died. restarting in 5sec") time.Sleep(5 * time.Second) } }