bridge: add metrics for ethereum, p2p and solana

This commit is contained in:
Leo 2021-01-27 01:16:37 +01:00
parent bc356a5e51
commit b23f43ed1e
5 changed files with 191 additions and 2 deletions

View File

@ -3,6 +3,7 @@ package ethereum
import (
"context"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"math/big"
"sync"
"time"
@ -20,6 +21,49 @@ import (
"github.com/certusone/wormhole/bridge/pkg/vaa"
)
var (
ethConnectionErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "wormhole_eth_connection_errors_total",
Help: "Total number of Ethereum connection errors (either during initial connection or while watching)",
}, []string{"reason"})
ethLockupsFound = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "wormhole_eth_lockups_found_total",
Help: "Total number of Eth lockups found (pre-confirmation)",
})
ethLockupsConfirmed = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "wormhole_eth_lockups_confirmed_total",
Help: "Total number of Eth lockups verified (post-confirmation)",
})
guardianSetChangesConfirmed = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "wormhole_eth_guardian_set_changes_confirmed_total",
Help: "Total number of guardian set changes verified (we only see confirmed ones to begin with)",
})
currentEthHeight = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "wormhole_eth_current_height",
Help: "Current Ethereum block height",
})
queryLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "wormhole_eth_query_latency",
Help: "Latency histogram for Ethereum calls (note that most interactions are streaming queries, NOT calls, and we cannot measure latency for those",
}, []string{"operation"})
)
func init() {
prometheus.MustRegister(ethConnectionErrors)
prometheus.MustRegister(ethLockupsFound)
prometheus.MustRegister(ethLockupsConfirmed)
prometheus.MustRegister(guardianSetChangesConfirmed)
prometheus.MustRegister(currentEthHeight)
prometheus.MustRegister(queryLatency)
}
type (
EthBridgeWatcher struct {
url string
@ -48,6 +92,7 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
defer cancel()
c, err := ethclient.DialContext(timeout, e.url)
if err != nil {
ethConnectionErrors.WithLabelValues("dial_error").Inc()
return fmt.Errorf("dialing eth client failed: %w", err)
}
@ -69,6 +114,7 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
tokensLockedC := make(chan *abi.AbiLogTokensLocked, 2)
tokensLockedSub, err := f.WatchLogTokensLocked(&bind.WatchOpts{Context: timeout}, tokensLockedC, nil, nil)
if err != nil {
ethConnectionErrors.WithLabelValues("subscribe_error").Inc()
return fmt.Errorf("failed to subscribe to token lockup events: %w", err)
}
@ -76,6 +122,7 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
guardianSetC := make(chan *abi.AbiLogGuardianSetChanged, 2)
guardianSetEvent, err := f.WatchLogGuardianSetChanged(&bind.WatchOpts{Context: timeout}, guardianSetC)
if err != nil {
ethConnectionErrors.WithLabelValues("subscribe_error").Inc()
return fmt.Errorf("failed to subscribe to guardian set events: %w", err)
}
@ -88,6 +135,7 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
defer cancel()
idx, gs, err := FetchCurrentGuardianSet(timeout, e.url, e.bridge)
if err != nil {
ethConnectionErrors.WithLabelValues("guardian_set_fetch_error").Inc()
return fmt.Errorf("failed requesting guardian set from Ethereum: %w", err)
}
logger.Info("initial guardian set fetched", zap.Any("value", gs), zap.Uint32("index", idx))
@ -102,17 +150,23 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
case <-ctx.Done():
return
case e := <-tokensLockedSub.Err():
ethConnectionErrors.WithLabelValues("subscription_error").Inc()
errC <- fmt.Errorf("error while processing token lockup subscription: %w", e)
return
case e := <-guardianSetEvent.Err():
ethConnectionErrors.WithLabelValues("subscription_error").Inc()
errC <- fmt.Errorf("error while processing guardian set subscription: %w", e)
return
case ev := <-tokensLockedC:
// Request timestamp for block
msm := time.Now()
timeout, cancel = context.WithTimeout(ctx, 15*time.Second)
b, err := c.BlockByNumber(timeout, big.NewInt(int64(ev.Raw.BlockNumber)))
cancel()
queryLatency.WithLabelValues("block_by_number").Observe(time.Since(msm).Seconds())
if err != nil {
ethConnectionErrors.WithLabelValues("block_by_number_error").Inc()
errC <- fmt.Errorf("failed to request timestamp for block %d: %w", ev.Raw.BlockNumber, err)
return
}
@ -133,6 +187,9 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
logger.Info("found new lockup transaction", zap.Stringer("tx", ev.Raw.TxHash),
zap.Uint64("block", ev.Raw.BlockNumber))
ethLockupsFound.Inc()
e.pendingLocksGuard.Lock()
e.pendingLocks[ev.Raw.TxHash] = &pendingLock{
lock: lock,
@ -143,7 +200,11 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
logger.Info("guardian set has changed, fetching new value",
zap.Uint32("new_index", ev.NewGuardianIndex))
guardianSetChangesConfirmed.Inc()
msm := time.Now()
gs, err := caller.GetGuardianSet(&bind.CallOpts{Context: timeout}, ev.NewGuardianIndex)
queryLatency.WithLabelValues("get_guardian_set").Observe(time.Since(msm).Seconds())
if err != nil {
// We failed to process the guardian set update and are now out of sync with the chain.
// Recover by crashing the runnable, which causes the guardian set to be re-fetched.
@ -178,7 +239,7 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
case ev := <-headSink:
start := time.Now()
logger.Info("processing new header", zap.Stringer("block", ev.Number))
currentEthHeight.Set(float64(ev.Number.Int64()))
readiness.SetReady(common.ReadinessEthSyncing)
e.pendingLocksGuard.Lock()
@ -200,6 +261,7 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
zap.Stringer("block", ev.Number))
delete(e.pendingLocks, hash)
e.lockChan <- pLock.lock
ethLockupsConfirmed.Inc()
}
}

View File

@ -3,6 +3,7 @@ package p2p
import (
"context"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"strings"
"time"
@ -26,6 +27,30 @@ import (
"github.com/certusone/wormhole/bridge/pkg/supervisor"
)
var (
p2pHeartbeatsSent = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "wormhole_p2p_heartbeats_sent_total",
Help: "Total number of p2p heartbeats sent",
})
p2pMessagesSent = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "wormhole_p2p_broadcast_messages_sent_total",
Help: "Total number of p2p pubsub broadcast messages sent",
})
p2pMessagesReceived = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "wormhole_p2p_broadcast_messages_received_total",
Help: "Total number of p2p pubsub broadcast messages received",
}, []string{"type"})
)
func init() {
prometheus.MustRegister(p2pHeartbeatsSent)
prometheus.MustRegister(p2pMessagesSent)
prometheus.MustRegister(p2pMessagesReceived)
}
func Run(obsvC chan *gossipv1.SignedObservation,
sendC chan []byte,
priv crypto.PrivKey,
@ -178,6 +203,7 @@ func Run(obsvC chan *gossipv1.SignedObservation,
logger.Warn("failed to publish heartbeat message", zap.Error(err))
}
p2pHeartbeatsSent.Inc()
ctr += 1
}
}
@ -190,6 +216,7 @@ func Run(obsvC chan *gossipv1.SignedObservation,
return
case msg := <-sendC:
err := th.Publish(ctx, msg)
p2pMessagesSent.Inc()
if err != nil {
logger.Error("failed to publish message from queue", zap.Error(err))
}
@ -209,12 +236,14 @@ func Run(obsvC chan *gossipv1.SignedObservation,
logger.Info("received invalid message",
zap.String("data", string(envelope.Data)),
zap.String("from", envelope.GetFrom().String()))
p2pMessagesReceived.WithLabelValues("invalid").Inc()
continue
}
if envelope.GetFrom() == h.ID() {
logger.Debug("received message from ourselves, ignoring",
zap.Any("payload", msg.Message))
p2pMessagesReceived.WithLabelValues("loopback").Inc()
continue
}
@ -228,9 +257,12 @@ func Run(obsvC chan *gossipv1.SignedObservation,
logger.Debug("heartbeat received",
zap.Any("value", m.Heartbeat),
zap.String("from", envelope.GetFrom().String()))
p2pMessagesReceived.WithLabelValues("heartbeat").Inc()
case *gossipv1.GossipMessage_SignedObservation:
obsvC <- m.SignedObservation
p2pMessagesReceived.WithLabelValues("observation").Inc()
default:
p2pMessagesReceived.WithLabelValues("unknown").Inc()
logger.Warn("received unknown message type (running outdated software?)",
zap.Any("payload", msg.Message),
zap.Binary("raw", envelope.Data),

View File

@ -53,6 +53,7 @@ func (p *Processor) broadcastSignature(v *vaa.VAA, signature []byte) {
p.state.vaaSignatures[hash] = &vaaState{
firstObserved: time.Now(),
signatures: map[ethcommon.Address][]byte{},
source: "loopback",
}
}

View File

@ -11,6 +11,7 @@ import (
"github.com/dfuse-io/solana-go"
"github.com/dfuse-io/solana-go/rpc"
eth_common "github.com/ethereum/go-ethereum/common"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"math/big"
"time"
@ -23,6 +24,42 @@ type SolanaWatcher struct {
lockEvent chan *common.ChainLock
}
var (
solanaConnectionErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "wormhole_solana_connection_errors_total",
Help: "Total number of Solana connection errors",
}, []string{"reason"})
solanaAccountSkips = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "wormhole_solana_account_updates_skipped_total",
Help: "Total number of account updates skipped due to invalid data",
}, []string{"reason"})
solanaLockupsConfirmed = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "wormhole_solana_lockups_confirmed_total",
Help: "Total number of verified Solana lockups found",
})
currentSolanaHeight = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "wormhole_solana_current_height",
Help: "Current Solana slot height (at default commitment level, not the level used for lockups)",
})
queryLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "wormhole_solana_query_latency",
Help: "Latency histogram for Solana RPC calls",
}, []string{"operation"})
)
func init() {
prometheus.MustRegister(solanaConnectionErrors)
prometheus.MustRegister(solanaAccountSkips)
prometheus.MustRegister(solanaLockupsConfirmed)
prometheus.MustRegister(currentSolanaHeight)
prometheus.MustRegister(queryLatency)
}
func NewSolanaWatcher(wsUrl, rpcUrl string, bridgeAddress solana.PublicKey, lockEvents chan *common.ChainLock) *SolanaWatcher {
return &SolanaWatcher{bridge: bridgeAddress, wsUrl: wsUrl, rpcUrl: rpcUrl, lockEvent: lockEvents}
}
@ -42,12 +79,24 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
return
case <-timer.C:
func() {
// Get current slot height
rCtx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
start := time.Now()
slot, err := rpcClient.GetSlot(rCtx, "")
queryLatency.WithLabelValues("get_slot").Observe(time.Since(start).Seconds())
if err != nil {
solanaConnectionErrors.WithLabelValues("get_slot_error").Inc()
errC <- err
return
}
currentSolanaHeight.Set(float64(slot))
// Find TransferOutProposal accounts without a VAA
rCtx, cancel = context.WithTimeout(ctx, time.Second*5)
defer cancel()
start = time.Now()
accounts, err := rpcClient.GetProgramAccounts(rCtx, s.bridge, &rpc.GetProgramAccountsOpts{
Commitment: rpc.CommitmentMax,
Filters: []rpc.RPCFilter{
@ -62,7 +111,9 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
},
},
})
queryLatency.WithLabelValues("get_program_accounts").Observe(time.Since(start).Seconds())
if err != nil {
solanaConnectionErrors.WithLabelValues("get_program_account_error").Inc()
errC <- err
return
}
@ -75,6 +126,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
for _, acc := range accounts {
proposal, err := ParseTransferOutProposal(acc.Account.Data)
if err != nil {
solanaAccountSkips.WithLabelValues("parse_transfer_out").Inc()
logger.Warn(
"failed to parse transfer proposal",
zap.Stringer("account", acc.Pubkey),
@ -85,6 +137,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
// VAA submitted
if proposal.VaaTime.Unix() != 0 {
solanaAccountSkips.WithLabelValues("is_submitted_vaa").Inc()
continue
}
@ -104,6 +157,8 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
TokenDecimals: proposal.Asset.Decimals,
Amount: proposal.Amount,
}
solanaLockupsConfirmed.Inc()
logger.Info("found lockup without VAA", zap.Stringer("lockup_address", acc.Pubkey))
s.lockEvent <- lock
}

View File

@ -4,6 +4,7 @@ import (
"context"
"encoding/hex"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"strings"
"time"
@ -21,6 +22,24 @@ import (
"github.com/certusone/wormhole/bridge/pkg/vaa"
)
var (
solanaVAASubmitted = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "wormhole_solana_vaa_submitted_total",
Help: "Total number of VAAs successfully submitted to the chain",
})
solanaFeePayerBalance = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "wormhole_solana_fee_account_balance_lamports",
Help: "Current fee payer account balance in lamports",
})
)
func init() {
prometheus.MustRegister(solanaVAASubmitted)
prometheus.MustRegister(solanaFeePayerBalance)
}
type (
SolanaVAASubmitter struct {
url string
@ -54,16 +73,33 @@ func (e *SolanaVAASubmitter) Run(ctx context.Context) error {
// Check whether agent is up by doing a GetBalance call.
balance, err := c.GetBalance(timeout, &agentv1.GetBalanceRequest{})
if err != nil {
solanaConnectionErrors.WithLabelValues("get_balance_error").Inc()
return fmt.Errorf("failed to get balance: %v", err)
}
readiness.SetReady(common.ReadinessSolanaSyncing)
logger.Info("account balance", zap.Uint64("lamports", balance.Balance))
// Periodically request the balance for monitoring
btick := time.NewTicker(1 * time.Minute)
go func() {
for {
select {
case <-ctx.Done():
return
case <-btick.C:
timeout, cancel := context.WithTimeout(ctx, 5*time.Second)
balance, err = c.GetBalance(timeout, &agentv1.GetBalanceRequest{})
if err != nil {
solanaConnectionErrors.WithLabelValues("get_balance_error").Inc()
// With PostVAA, it's hard to tell, but if this one fails we know
// that something went wrong and we should restart the service.
errC <- fmt.Errorf("failed to get balance: %v", err)
cancel()
break
}
cancel()
solanaFeePayerBalance.Set(float64(balance.Balance))
case v := <-e.vaaChan:
vaaBytes, err := v.Marshal()
if err != nil {
@ -96,6 +132,7 @@ func (e *SolanaVAASubmitter) Run(ctx context.Context) error {
codes.Unavailable,
codes.Aborted:
solanaConnectionErrors.WithLabelValues("postvaa_transient_error").Inc()
logger.Error("transient error, requeuing VAA", zap.Error(err), zap.String("digest", h))
// Tombstone goroutine
@ -114,12 +151,14 @@ func (e *SolanaVAASubmitter) Run(ctx context.Context) error {
fallthrough
default:
solanaConnectionErrors.WithLabelValues("postvaa_internal_error").Inc()
logger.Error("error submitting VAA", zap.Error(err), zap.String("digest", h))
}
break
}
solanaVAASubmitted.Inc()
logger.Info("submitted VAA",
zap.String("tx_sig", res.Signature), zap.String("digest", h))
}