package main import ( "chainlink_exporter/abi" "context" "encoding/hex" "fmt" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/params" "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" "go.uber.org/zap" "math/big" "sync" "time" "unicode/utf8" ) const ( PRECISION = 1000000000 ) type ( Monitor struct { client *ethclient.Client aggregators map[common.Address]*AggregatorMonitor addr common.Address fulfillmentAddr common.Address oracle *abi.Oracle linkContract *abi.ERC lock sync.Mutex lastResTime *atomic.Uint64 lastReqTime *atomic.Uint64 lastResGauge prometheus.Gauge lastReqGauge prometheus.Gauge currentHeightGauge prometheus.Gauge balanceGauge prometheus.Gauge linkBalanceGauge *prometheus.GaugeVec responseTimeHistogram *prometheus.HistogramVec revenueCounter *prometheus.CounterVec fulfillmentCounter *prometheus.CounterVec missCounter *prometheus.CounterVec } ) func NewMonitor(addr common.Address, fulfillmentAddr common.Address, linkAddr common.Address, client *ethclient.Client) (*Monitor, error) { m := &Monitor{ addr: addr, fulfillmentAddr: fulfillmentAddr, client: client, aggregators: map[common.Address]*AggregatorMonitor{}, lock: sync.Mutex{}, lastResTime: atomic.NewUint64(0), lastReqTime: atomic.NewUint64(0), lastResGauge: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "cl", Subsystem: "mon", Name: "last_response", Help: "Height of the last response", }), lastReqGauge: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "cl", Subsystem: "mon", Name: "last_request", Help: "Height of the last oracle request", }), currentHeightGauge: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "cl", Subsystem: "mon", Name: "height", Help: "Last synced height", }), responseTimeHistogram: prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "cl", Subsystem: "mon", Name: "response_time", Help: "Average response time in blocks", Buckets: []float64{1, 2, 3, 4, 5, 10, 15}, }, []string{"spec_id"}), fulfillmentCounter: prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "cl", Subsystem: "mon", Name: "fulfilled", Help: "Number of successfully fulfilled requests", }, []string{"spec_id", "requester"}), missCounter: prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "cl", Subsystem: "mon", Name: "missed", Help: "Number of missed requests", }, []string{"spec_id", "requester"}), revenueCounter: prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "cl", Subsystem: "mon", Name: "revenue", Help: "Number of LINK tokens earned", }, []string{"spec_id", "requester", "status"}), balanceGauge: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "cl", Subsystem: "mon", Name: "eth_balance", Help: "Balance of the oracle account", }), linkBalanceGauge: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "cl", Subsystem: "mon", Name: "link_balance", Help: "Link balance of the oracle", }, []string{"type"}), } prometheus.MustRegister(m.lastResGauge) prometheus.MustRegister(m.lastReqGauge) prometheus.MustRegister(m.currentHeightGauge) prometheus.MustRegister(m.responseTimeHistogram) prometheus.MustRegister(m.fulfillmentCounter) prometheus.MustRegister(m.revenueCounter) prometheus.MustRegister(m.missCounter) prometheus.MustRegister(m.balanceGauge) prometheus.MustRegister(m.linkBalanceGauge) oracle, err := abi.NewOracle(addr, m.client) if err != nil { return nil, err } m.oracle = oracle linkContract, err := abi.NewERC(linkAddr, m.client) if err != nil { return nil, err } m.linkContract = linkContract _, err = oracle.Owner(nil) if err != nil { return nil, fmt.Errorf("address does not belong to an oracle") } return m, nil } func (m *Monitor) Start() { go m.headRoutine() go m.requestRoutine() go m.metricRoutine() } func (m *Monitor) metricRoutine() { zap.L().Info("Starting metric routine") ticker := time.NewTicker(time.Second) defer ticker.Stop() for { select { case <-ticker.C: m.lastResGauge.Set(float64(m.lastResTime.Load())) m.lastReqGauge.Set(float64(m.lastReqTime.Load())) } } } func (m *Monitor) updateBalances() { zap.L().Debug("fetching balances") ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) defer cancel() balance, err := m.client.BalanceAt(ctx, m.fulfillmentAddr, nil) if err != nil { zap.L().Error("failed to fetch oracle balance", zap.Error(err)) return } balance.Div(balance, big.NewInt(params.Ether/PRECISION)) m.balanceGauge.Set(float64(balance.Uint64()) / PRECISION) owner, err := m.oracle.Owner(&bind.CallOpts{ Context: ctx, }) withdrawableLinkBalance, err := m.oracle.Withdrawable(&bind.CallOpts{ From: owner, Context: ctx, }) if err != nil { zap.L().Error("failed to fetch withdrawable LINK balance", zap.Error(err)) return } withdrawableLinkBalance.Div(withdrawableLinkBalance, big.NewInt(params.Ether/PRECISION)) m.linkBalanceGauge.WithLabelValues("withdrawable").Set(float64(withdrawableLinkBalance.Uint64()) / PRECISION) linkBalance, err := m.linkContract.BalanceOf(&bind.CallOpts{Context: ctx}, m.addr) if err != nil { zap.L().Error("failed to fetch LINK balance", zap.Error(err)) return } linkBalance.Div(linkBalance, big.NewInt(params.Ether/PRECISION)) m.linkBalanceGauge.WithLabelValues("balance").Set(float64(linkBalance.Uint64()) / PRECISION) zap.L().Debug("fetched balances") } func (m *Monitor) headRoutine() { for { zap.L().Info("Starting head routine") func() { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() headChan := make(chan *types.Header, 100) sub, err := m.client.SubscribeNewHead(ctx, headChan) if err != nil { zap.L().Error("failed to subscribe to new heads", zap.Error(err)) return } for { select { case err = <-sub.Err(): zap.L().Error("head subscription errored", zap.Error(err)) return case header, has := <-headChan: if !has { zap.L().Error("head subscription closed", zap.Error(err)) return } // Update balances go m.updateBalances() // Update metrics and update aggregator monitors m.currentHeightGauge.Set(float64(header.Number.Uint64())) for _, monitor := range m.aggregators { monitor.HandleNewBlock(header.Number.Uint64()) } } } }() zap.L().Warn("head routine died. restarting in 5sec") time.Sleep(5 * time.Second) } } func (m *Monitor) requestRoutine() { for { zap.L().Info("Starting request routine") func() { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() reqChan := make(chan *abi.OracleOracleRequest, 100) sub, err := m.oracle.WatchOracleRequest(&bind.WatchOpts{ Context: ctx, }, reqChan, nil) if err != nil { zap.L().Error("failed to watch oracle requests", zap.Error(err)) return } for { select { case err = <-sub.Err(): zap.L().Error("oracle requests subscription errored", zap.Error(err)) return case req, has := <-reqChan: if !has { zap.L().Error("request subscription closed", zap.Error(err)) return } err := m.handleRequest(req) if err != nil { zap.L().Warn("failed to handle request", zap.Error(err)) continue } } } }() zap.L().Warn("request routine died. restarting in 5sec") time.Sleep(5 * time.Second) } } func (m *Monitor) handleRequest(req *abi.OracleOracleRequest) error { logger := zap.L().With(zap.Uint64("height", req.Raw.BlockNumber), zap.String("requester", req.Requester.String()), zap.Binary("request_id", req.RequestId[:]), zap.String("spec_id", sanitizeSpecID(req.SpecId))) logger.Info("received request") if old := m.lastReqTime.Load(); old < req.Raw.BlockNumber { m.lastReqTime.CAS(old, req.Raw.BlockNumber) } if agg, contains := m.aggregators[req.Requester]; contains { agg.handleRequest(req) return nil } logger.Debug("requester unknown; creating a new aggregator monitor") m.lock.Lock() defer m.lock.Unlock() agg, err := abi.NewAggregator(req.Requester, m.client) if err != nil { return err } _, err = agg.Owner(nil) if err != nil { logger.Warn("requester is not an aggregator") return nil } am := NewAggregatorMonitor(agg, req.Requester, m) am.handleRequest(req) go am.Monitor() m.aggregators[req.Requester] = am return nil } func (m *Monitor) HandleFulfillment(res *abi.AggregatorChainlinkFulfilled, req *abi.OracleOracleRequest) { if old := m.lastResTime.Load(); old < res.Raw.BlockNumber { m.lastResTime.CAS(old, res.Raw.BlockNumber) } deltaBlocks := res.Raw.BlockNumber - req.Raw.BlockNumber sanitizedSpecID := sanitizeSpecID(req.SpecId) m.responseTimeHistogram.WithLabelValues(sanitizedSpecID).Observe(float64(deltaBlocks)) m.fulfillmentCounter.WithLabelValues(sanitizedSpecID, req.Requester.String()).Inc() m.revenueCounter.WithLabelValues(sanitizedSpecID, req.Requester.String(), "fulfilled").Add(float64(req.Payment.Uint64()) / params.Ether) } func (m *Monitor) HandleMiss(req *abi.OracleOracleRequest) { sanitizedSpecID := sanitizeSpecID(req.SpecId) m.missCounter.WithLabelValues(sanitizedSpecID, req.Requester.String()).Inc() m.revenueCounter.WithLabelValues(sanitizedSpecID, req.Requester.String(), "missed").Add(float64(req.Payment.Uint64()) / params.Ether) } func sanitizeSpecID(specID [32]byte) string { if !utf8.Valid(specID[:]) { return hex.EncodeToString(specID[:]) } else { return string(specID[:]) } }