node/pkg/solana: run one Solana watcher per consistency level

certusone/wormhole#248

Change-Id: I98abc6b4e635b8b5679fcda5342c90b0e5c96077
This commit is contained in:
Leo 2021-07-29 14:24:35 +02:00 committed by Leopold Schabel
parent 8b5e82df1b
commit 5bfa3b0055
2 changed files with 40 additions and 20 deletions

View File

@ -3,6 +3,7 @@ package guardiand
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/gagliardetto/solana-go/rpc"
"log" "log"
"net/http" "net/http"
_ "net/http/pprof" _ "net/http/pprof"
@ -404,8 +405,13 @@ func runBridge(cmd *cobra.Command, args []string) {
return err return err
} }
if err := supervisor.Run(ctx, "solwatch", if err := supervisor.Run(ctx, "solwatch-confirmed",
solana.NewSolanaWatcher(*solanaWsRPC, *solanaRPC, solBridgeAddress, lockC).Run); err != nil { solana.NewSolanaWatcher(*solanaWsRPC, *solanaRPC, solBridgeAddress, lockC, rpc.CommitmentConfirmed).Run); err != nil {
return err
}
if err := supervisor.Run(ctx, "solwatch-finalized",
solana.NewSolanaWatcher(*solanaWsRPC, *solanaRPC, solBridgeAddress, lockC, rpc.CommitmentFinalized).Run); err != nil {
return err return err
} }

View File

@ -23,6 +23,7 @@ type SolanaWatcher struct {
bridge solana.PublicKey bridge solana.PublicKey
wsUrl string wsUrl string
rpcUrl string rpcUrl string
commitment rpc.CommitmentType
messageEvent chan *common.MessagePublication messageEvent chan *common.MessagePublication
} }
@ -31,7 +32,7 @@ var (
prometheus.CounterOpts{ prometheus.CounterOpts{
Name: "wormhole_solana_connection_errors_total", Name: "wormhole_solana_connection_errors_total",
Help: "Total number of Solana connection errors", Help: "Total number of Solana connection errors",
}, []string{"reason"}) }, []string{"commitment", "reason"})
solanaAccountSkips = promauto.NewCounterVec( solanaAccountSkips = promauto.NewCounterVec(
prometheus.CounterOpts{ prometheus.CounterOpts{
Name: "wormhole_solana_account_updates_skipped_total", Name: "wormhole_solana_account_updates_skipped_total",
@ -42,11 +43,11 @@ var (
Name: "wormhole_solana_observations_confirmed_total", Name: "wormhole_solana_observations_confirmed_total",
Help: "Total number of verified Solana observations found", Help: "Total number of verified Solana observations found",
}) })
currentSolanaHeight = promauto.NewGauge( currentSolanaHeight = promauto.NewGaugeVec(
prometheus.GaugeOpts{ prometheus.GaugeOpts{
Name: "wormhole_solana_current_height", Name: "wormhole_solana_current_height",
Help: "Current Solana slot height (at default commitment level, not the level used for observations)", Help: "Current Solana slot height",
}) }, []string{"commitment"})
queryLatency = promauto.NewHistogramVec( queryLatency = promauto.NewHistogramVec(
prometheus.HistogramOpts{ prometheus.HistogramOpts{
Name: "wormhole_solana_query_latency", Name: "wormhole_solana_query_latency",
@ -56,8 +57,23 @@ var (
const rpcTimeout = time.Second * 5 const rpcTimeout = time.Second * 5
func NewSolanaWatcher(wsUrl, rpcUrl string, bridgeAddress solana.PublicKey, messageEvents chan *common.MessagePublication) *SolanaWatcher { // Mapping from consistency level constants to commitment levels.
return &SolanaWatcher{bridge: bridgeAddress, wsUrl: wsUrl, rpcUrl: rpcUrl, messageEvent: messageEvents} const (
consistencyLevelConfirmed = 1
consistencyLevelFinalized = 32
)
func NewSolanaWatcher(
wsUrl, rpcUrl string,
bridgeAddress solana.PublicKey,
messageEvents chan *common.MessagePublication,
commitment rpc.CommitmentType) *SolanaWatcher {
return &SolanaWatcher{
bridge: bridgeAddress,
wsUrl: wsUrl, rpcUrl: rpcUrl,
messageEvent: messageEvents,
commitment: commitment,
}
} }
func (s *SolanaWatcher) Run(ctx context.Context) error { func (s *SolanaWatcher) Run(ctx context.Context) error {
@ -81,30 +97,28 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-timer.C: case <-timer.C:
commitment := rpc.CommitmentFinalized
// Get current slot height // Get current slot height
rCtx, cancel := context.WithTimeout(ctx, rpcTimeout) rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
defer cancel() defer cancel()
start := time.Now() start := time.Now()
slot, err := rpcClient.GetSlot(rCtx, commitment) slot, err := rpcClient.GetSlot(rCtx, s.commitment)
queryLatency.WithLabelValues("get_slot", string(commitment)).Observe(time.Since(start).Seconds()) queryLatency.WithLabelValues("get_slot", string(s.commitment)).Observe(time.Since(start).Seconds())
if err != nil { if err != nil {
solanaConnectionErrors.WithLabelValues("get_slot_error").Inc() solanaConnectionErrors.WithLabelValues(string(s.commitment), "get_slot_error").Inc()
errC <- err errC <- err
return return
} }
if lastSlot == 0 { if lastSlot == 0 {
lastSlot = slot - 1 lastSlot = slot - 1
} }
currentSolanaHeight.Set(float64(slot)) currentSolanaHeight.WithLabelValues(string(s.commitment)).Set(float64(slot))
readiness.SetReady(common.ReadinessSolanaSyncing) readiness.SetReady(common.ReadinessSolanaSyncing)
p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSolana, &gossipv1.Heartbeat_Network{ p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSolana, &gossipv1.Heartbeat_Network{
Height: int64(slot), Height: int64(slot),
BridgeAddress: bridgeAddr, BridgeAddress: bridgeAddr,
}) })
logger.Info("fetched current Solana height", logger.Info("fetched current Solana height",
zap.String("commitment", string(commitment)), zap.String("commitment", string(s.commitment)),
zap.Uint64("slot", slot), zap.Uint64("slot", slot),
zap.Uint64("lastSlot", lastSlot), zap.Uint64("lastSlot", lastSlot),
zap.Uint64("pendingSlots", slot-lastSlot), zap.Uint64("pendingSlots", slot-lastSlot),
@ -119,10 +133,10 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
rCtx, cancel = context.WithTimeout(ctx, rpcTimeout) rCtx, cancel = context.WithTimeout(ctx, rpcTimeout)
defer cancel() defer cancel()
start = time.Now() start = time.Now()
slots, err := rpcClient.GetConfirmedBlocks(rCtx, rangeStart, &rangeEnd, commitment) slots, err := rpcClient.GetConfirmedBlocks(rCtx, rangeStart, &rangeEnd, s.commitment)
queryLatency.WithLabelValues("get_confirmed_blocks", string(commitment)).Observe(time.Since(start).Seconds()) queryLatency.WithLabelValues("get_confirmed_blocks", string(s.commitment)).Observe(time.Since(start).Seconds())
if err != nil { if err != nil {
solanaConnectionErrors.WithLabelValues("get_confirmed_blocks_error").Inc() solanaConnectionErrors.WithLabelValues(string(s.commitment), "get_confirmed_blocks_error").Inc()
errC <- err errC <- err
return return
} }
@ -130,7 +144,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
logger.Info("fetched slots in range", logger.Info("fetched slots in range",
zap.Uint64("from", rangeStart), zap.Uint64("to", rangeEnd), zap.Uint64("from", rangeStart), zap.Uint64("to", rangeEnd),
zap.Duration("took", time.Since(start)), zap.Duration("took", time.Since(start)),
zap.String("commitment", string(commitment))) zap.String("commitment", string(s.commitment)))
// Requesting each slot // Requesting each slot
for _, slot := range slots { for _, slot := range slots {
@ -140,7 +154,7 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
continue continue
} }
go s.fetchBlock(ctx, logger, commitment, rpcClient, slot) go s.fetchBlock(ctx, logger, s.commitment, rpcClient, slot)
} }
lastSlot = slot lastSlot = slot