node/pkg/p2p: add per-chain error counters

Change-Id: I47700ccb2dc93aefefa8ab6b6f0659f30912e142
This commit is contained in:
Leo 2021-08-08 17:16:41 +02:00 committed by Leopold Schabel
parent 6126cfaf40
commit 67793cd144
7 changed files with 58 additions and 3 deletions

View File

@ -128,6 +128,7 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
c, err := ethclient.DialContext(timeout, e.url)
if err != nil {
ethConnectionErrors.WithLabelValues(e.networkName, "dial_error").Inc()
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
return fmt.Errorf("dialing eth client failed: %w", err)
}
@ -150,6 +151,7 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
messageSub, err := f.WatchLogMessagePublished(&bind.WatchOpts{Context: timeout}, messageC, nil)
if err != nil {
ethConnectionErrors.WithLabelValues(e.networkName, "subscribe_error").Inc()
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
return fmt.Errorf("failed to subscribe to message publication events: %w", err)
}
@ -158,6 +160,7 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
guardianSetEvent, err := f.WatchGuardianSetAdded(&bind.WatchOpts{Context: timeout}, guardianSetC, nil)
if err != nil {
ethConnectionErrors.WithLabelValues(e.networkName, "subscribe_error").Inc()
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
return fmt.Errorf("failed to subscribe to guardian set events: %w", err)
}
@ -167,6 +170,7 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
idx, gs, err := FetchCurrentGuardianSet(timeout, e.url, e.bridge)
if err != nil {
ethConnectionErrors.WithLabelValues(e.networkName, "guardian_set_fetch_error").Inc()
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
return fmt.Errorf("failed requesting guardian set from Ethereum: %w", err)
}
logger.Info("initial guardian set fetched",
@ -189,10 +193,12 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
case err := <-messageSub.Err():
ethConnectionErrors.WithLabelValues(e.networkName, "subscription_error").Inc()
errC <- fmt.Errorf("error while processing message publication subscription: %w", err)
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
return
case err := <-guardianSetEvent.Err():
ethConnectionErrors.WithLabelValues(e.networkName, "subscription_error").Inc()
errC <- fmt.Errorf("error while processing guardian set subscription: %w", err)
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
return
case ev := <-messageC:
// Request timestamp for block
@ -204,6 +210,7 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
if err != nil {
ethConnectionErrors.WithLabelValues(e.networkName, "block_by_number_error").Inc()
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
errC <- fmt.Errorf("failed to request timestamp for block %d: %w", ev.Raw.BlockNumber, err)
return
}
@ -266,6 +273,8 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
headSink := make(chan *types.Header, 2)
headerSubscription, err := c.SubscribeNewHead(ctx, headSink)
if err != nil {
ethConnectionErrors.WithLabelValues(e.networkName, "header_subscribe_error").Inc()
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
return fmt.Errorf("failed to subscribe to header events: %w", err)
}
@ -274,8 +283,10 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
select {
case <-ctx.Done():
return
case e := <-headerSubscription.Err():
errC <- fmt.Errorf("error while processing header subscription: %w", e)
case err := <-headerSubscription.Err():
ethConnectionErrors.WithLabelValues(e.networkName, "header_subscription_error").Inc()
errC <- fmt.Errorf("error while processing header subscription: %w", err)
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
return
case ev := <-headSink:
start := time.Now()

View File

@ -15,6 +15,11 @@ var (
Name: "wormhole_network_node_height",
Help: "Network height of the given guardian node per network",
}, []string{"guardian_addr", "node_id", "node_name", "network"})
// wormholeNetworkNodeErrors exposes, per guardian node and network, the
// ErrorCount value that collectNodeMetrics records for each network entry.
// It is a gauge because the value is overwritten with Set on every update
// rather than being incremented locally.
wormholeNetworkNodeErrors = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "wormhole_network_node_errors_count",
Help: "Number of errors the given guardian node encountered per network",
}, []string{"guardian_addr", "node_id", "node_name", "network"})
)
func collectNodeMetrics(addr common.Address, peerId peer.ID, hb *gossipv1.Heartbeat) {
@ -27,5 +32,8 @@ func collectNodeMetrics(addr common.Address, peerId peer.ID, hb *gossipv1.Heartb
wormholeNetworkNodeHeight.WithLabelValues(
addr.Hex(), peerId.Pretty(), hb.NodeName, chain.String()).Set(float64(n.Height))
wormholeNetworkNodeErrors.WithLabelValues(
addr.Hex(), peerId.Pretty(), hb.NodeName, chain.String()).Set(float64(n.ErrorCount))
}
}

View File

@ -6,6 +6,7 @@ import (
"errors"
"fmt"
bridge_common "github.com/certusone/wormhole/bridge/pkg/common"
"github.com/certusone/wormhole/bridge/pkg/vaa"
"github.com/certusone/wormhole/bridge/pkg/version"
"github.com/ethereum/go-ethereum/common"
ethcrypto "github.com/ethereum/go-ethereum/crypto"
@ -191,6 +192,8 @@ func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, rawHeartbeat
DefaultRegistry.mu.Lock()
networks := make([]*gossipv1.Heartbeat_Network, 0, len(DefaultRegistry.networkStats))
for _, v := range DefaultRegistry.networkStats {
errCtr := DefaultRegistry.GetErrorCount(vaa.ChainID(v.Id))
v.ErrorCount = errCtr
networks = append(networks, v)
}

View File

@ -4,6 +4,7 @@ import (
gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/bridge/pkg/vaa"
"sync"
"sync/atomic"
)
// The p2p package implements a simple global metrics registry singleton for node status values transmitted on-chain.
@ -14,13 +15,17 @@ type registry struct {
// Mapping of chain IDs to network status messages.
networkStats map[vaa.ChainID]*gossipv1.Heartbeat_Network
// Per-chain error counters, keyed by chain ID.
errorCounters map[vaa.ChainID]uint64
// Value of Heartbeat.guardian_addr.
guardianAddress string
}
// NewRegistry returns an empty registry whose per-chain maps are already
// allocated, so network stats and error counters can be stored without
// hitting a nil-map write.
func NewRegistry() *registry {
	return &registry{
		// Each key may appear only once in a composite literal; a repeated
		// networkStats entry is rejected by the compiler.
		networkStats:  map[vaa.ChainID]*gossipv1.Heartbeat_Network{},
		errorCounters: map[vaa.ChainID]uint64{},
	}
}
@ -44,3 +49,13 @@ func (r *registry) SetNetworkStats(chain vaa.ChainID, data *gossipv1.Heartbeat_N
r.networkStats[chain] = data
r.mu.Unlock()
}
// AddErrorCount adds delta to the error counter of the given chain. A chain
// without an entry starts counting from zero.
//
// The counters live in a map, and map elements are not addressable, so they
// cannot be updated through sync/atomic: an atomic add on a local copy of the
// value never reaches the map. The entry is therefore updated in place while
// holding r.mu, which also keeps concurrent callers from racing on the map.
//
// Callers must not already hold r.mu, since it is not reentrant.
func (r *registry) AddErrorCount(chain vaa.ChainID, delta uint64) {
	r.mu.Lock()
	defer r.mu.Unlock()

	r.errorCounters[chain] += delta
}
// GetErrorCount reports the error counter currently stored for chain; a chain
// with no entry yields zero.
//
// NOTE(review): the atomic load operates on a local copy of the map value, so
// it does not synchronize the map read itself. Run calls this method on
// DefaultRegistry while holding its mu, so mu must not be acquired in here.
func (r *registry) GetErrorCount(chain vaa.ChainID) uint64 {
	snapshot := r.errorCounters[chain]
	return atomic.LoadUint64(&snapshot)
}

View File

@ -133,6 +133,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
slot, err := s.rpcClient.GetSlot(rCtx, s.commitment)
queryLatency.WithLabelValues("get_slot", string(s.commitment)).Observe(time.Since(start).Seconds())
if err != nil {
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSolana, 1)
solanaConnectionErrors.WithLabelValues(string(s.commitment), "get_slot_error").Inc()
errC <- err
return
@ -165,6 +166,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
slots, err := s.rpcClient.GetConfirmedBlocks(rCtx, rangeStart, &rangeEnd, s.commitment)
queryLatency.WithLabelValues("get_confirmed_blocks", string(s.commitment)).Observe(time.Since(start).Seconds())
if err != nil {
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSolana, 1)
solanaConnectionErrors.WithLabelValues(string(s.commitment), "get_confirmed_blocks_error").Inc()
errC <- err
return
@ -214,6 +216,7 @@ func (s *SolanaWatcher) fetchBlock(ctx context.Context, slot uint64) {
queryLatency.WithLabelValues("get_confirmed_block", string(s.commitment)).Observe(time.Since(start).Seconds())
if err != nil {
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSolana, 1)
solanaConnectionErrors.WithLabelValues(string(s.commitment), "get_confirmed_block_error").Inc()
s.logger.Error("failed to request block", zap.Error(err), zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
@ -221,8 +224,10 @@ func (s *SolanaWatcher) fetchBlock(ctx context.Context, slot uint64) {
}
if out == nil {
solanaConnectionErrors.WithLabelValues(string(s.commitment), "get_confirmed_block_error").Inc()
s.logger.Error("nil response when requesting block", zap.Error(err), zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSolana, 1)
return
}
@ -277,6 +282,7 @@ OUTER:
})
queryLatency.WithLabelValues("get_confirmed_transaction", string(s.commitment)).Observe(time.Since(start).Seconds())
if err != nil {
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSolana, 1)
solanaConnectionErrors.WithLabelValues(string(s.commitment), "get_confirmed_transaction_error").Inc()
s.logger.Error("failed to request transaction",
zap.Error(err),
@ -356,6 +362,7 @@ func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, acc solana.Publ
})
queryLatency.WithLabelValues("get_account_info", string(s.commitment)).Observe(time.Since(start).Seconds())
if err != nil {
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSolana, 1)
solanaConnectionErrors.WithLabelValues(string(s.commitment), "get_account_info_error").Inc()
s.logger.Error("failed to request account",
zap.Error(err),
@ -366,6 +373,7 @@ func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, acc solana.Publ
}
if !info.Value.Owner.Equals(s.bridge) {
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSolana, 1)
solanaConnectionErrors.WithLabelValues(string(s.commitment), "account_owner_mismatch").Inc()
s.logger.Error("account has invalid owner",
zap.Uint64("slot", slot),
@ -377,6 +385,7 @@ func (s *SolanaWatcher) fetchMessageAccount(ctx context.Context, acc solana.Publ
data := info.Value.Data.GetBinary()
if string(data[:3]) != "msg" {
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSolana, 1)
solanaConnectionErrors.WithLabelValues(string(s.commitment), "bad_account_data").Inc()
s.logger.Error("account is not a message account",
zap.Uint64("slot", slot),

View File

@ -88,6 +88,7 @@ func (e *BridgeWatcher) Run(ctx context.Context) error {
c, _, err := websocket.DefaultDialer.DialContext(ctx, e.urlWS, nil)
if err != nil {
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDTerra, 1)
terraConnectionErrors.WithLabelValues("websocket_dial_error").Inc()
return fmt.Errorf("websocket dial failed: %w", err)
}
@ -103,6 +104,7 @@ func (e *BridgeWatcher) Run(ctx context.Context) error {
}
err = c.WriteJSON(command)
if err != nil {
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDTerra, 1)
terraConnectionErrors.WithLabelValues("websocket_subscription_error").Inc()
return fmt.Errorf("websocket subscription failed: %w", err)
}
@ -110,6 +112,7 @@ func (e *BridgeWatcher) Run(ctx context.Context) error {
// Wait for the success response
_, _, err = c.ReadMessage()
if err != nil {
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDTerra, 1)
terraConnectionErrors.WithLabelValues("event_subscription_error").Inc()
return fmt.Errorf("event subscription failed: %w", err)
}
@ -158,6 +161,7 @@ func (e *BridgeWatcher) Run(ctx context.Context) error {
for {
_, message, err := c.ReadMessage()
if err != nil {
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDTerra, 1)
terraConnectionErrors.WithLabelValues("channel_read_error").Inc()
logger.Error("error reading channel", zap.Error(err))
errC <- err
@ -224,6 +228,7 @@ func (e *BridgeWatcher) Run(ctx context.Context) error {
requestURL := fmt.Sprintf("%s/wasm/contracts/%s/store?query_msg={\"guardian_set_info\":{}}", e.urlLCD, e.bridge)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, requestURL, nil)
if err != nil {
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDTerra, 1)
terraConnectionErrors.WithLabelValues("guardian_set_req_error").Inc()
logger.Error("query guardian set request error", zap.Error(err))
errC <- err
@ -233,6 +238,7 @@ func (e *BridgeWatcher) Run(ctx context.Context) error {
msm := time.Now()
resp, err := client.Do(req)
if err != nil {
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDTerra, 1)
logger.Error("query guardian set response error", zap.Error(err))
errC <- err
return
@ -241,6 +247,7 @@ func (e *BridgeWatcher) Run(ctx context.Context) error {
body, err := ioutil.ReadAll(resp.Body)
queryLatency.WithLabelValues("guardian_set_info").Observe(time.Since(msm).Seconds())
if err != nil {
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDTerra, 1)
logger.Error("query guardian set error", zap.Error(err))
errC <- err
resp.Body.Close()

View File

@ -42,6 +42,8 @@ message Heartbeat {
int64 height = 2;
// Chain-specific human-readable representation of the bridge contract address.
string bridge_address = 3;
// Connection error count
uint64 error_count = 4;
}
repeated Network networks = 4;