diff --git a/bridge/pkg/ethereum/watcher.go b/bridge/pkg/ethereum/watcher.go index eaad0a0b9..f5805f297 100644 --- a/bridge/pkg/ethereum/watcher.go +++ b/bridge/pkg/ethereum/watcher.go @@ -42,11 +42,6 @@ var ( Name: "wormhole_eth_messages_confirmed_total", Help: "Total number of Eth messages verified (post-confirmation)", }, []string{"eth_network"}) - guardianSetChangesConfirmed = promauto.NewCounterVec( - prometheus.CounterOpts{ - Name: "wormhole_eth_guardian_set_changes_confirmed_total", - Help: "Total number of guardian set changes verified (we only see confirmed ones to begin with)", - }, []string{"eth_network"}) currentEthHeight = promauto.NewGaugeVec( prometheus.GaugeOpts{ Name: "wormhole_eth_current_height", @@ -155,34 +150,54 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error { return fmt.Errorf("failed to subscribe to message publication events: %w", err) } - // Subscribe to guardian set changes - guardianSetC := make(chan *abi.AbiGuardianSetAdded, 2) - guardianSetEvent, err := f.WatchGuardianSetAdded(&bind.WatchOpts{Context: timeout}, guardianSetC, nil) - if err != nil { - ethConnectionErrors.WithLabelValues(e.networkName, "subscribe_error").Inc() - p2p.DefaultRegistry.AddErrorCount(e.chainID, 1) - return fmt.Errorf("failed to subscribe to guardian set events: %w", err) - } + // 0 is a valid guardian set, so we need a nil value here + var currentGuardianSet *uint32 - // Get initial validator set. - timeout, cancel = context.WithTimeout(ctx, 15*time.Second) - defer cancel() - idx, gs, err := FetchCurrentGuardianSet(timeout, e.url, e.bridge) - if err != nil { - ethConnectionErrors.WithLabelValues(e.networkName, "guardian_set_fetch_error").Inc() - p2p.DefaultRegistry.AddErrorCount(e.chainID, 1) - return fmt.Errorf("failed requesting guardian set from Ethereum: %w", err) - } - logger.Info("initial guardian set fetched", - zap.Any("value", gs), zap.Uint32("index", idx), - zap.String("eth_network", e.networkName)) + // Poll for guardian set. 
+ go func() { /* Polls the current guardian set on a 15s ticker until ctx is done; forwards it on e.setChan only when the fetched index differs from the last one seen. */ + t := time.NewTicker(15 * time.Second) /* NOTE(review): this goroutine performs its first fetch only after the first 15s tick. */ + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + msm := time.Now() + logger.Info("fetching guardian set") + timeout, cancel := context.WithTimeout(ctx, 15*time.Second) /* declared locally with := so this goroutine never assigns to the timeout/cancel variables captured from Run */ + idx, gs, err := fetchCurrentGuardianSet(timeout, caller) + if err != nil { + ethConnectionErrors.WithLabelValues(e.networkName, "guardian_set_fetch_error").Inc() + p2p.DefaultRegistry.AddErrorCount(e.chainID, 1) + logger.Error("failed requesting guardian set", + zap.Error(err), zap.String("eth_network", e.networkName)) + cancel() + continue + } - if e.setChan != nil { - e.setChan <- &common.GuardianSet{ - Keys: gs.Keys, - Index: idx, + queryLatency.WithLabelValues(e.networkName, "get_guardian_set").Observe(time.Since(msm).Seconds()) + + cancel() + + if currentGuardianSet != nil && *currentGuardianSet == idx { + continue + } + + logger.Info("updated guardian set found", + zap.Any("value", gs), zap.Uint32("index", idx), + zap.String("eth_network", e.networkName)) + + currentGuardianSet = &idx + + if e.setChan != nil { + e.setChan <- &common.GuardianSet{ /* NOTE(review): blocking send with no ctx.Done() case; confirm the receiver always drains setChan. */ + Keys: gs.Keys, + Index: idx, + } + } + } } - } + }() errC := make(chan error) go func() { @@ -195,11 +210,6 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error { errC <- fmt.Errorf("error while processing message publication subscription: %w", err) p2p.DefaultRegistry.AddErrorCount(e.chainID, 1) return - case err := <-guardianSetEvent.Err(): - ethConnectionErrors.WithLabelValues(e.networkName, "subscription_error").Inc() - errC <- fmt.Errorf("error while processing guardian set subscription: %w", err) - p2p.DefaultRegistry.AddErrorCount(e.chainID, 1) - return case ev := <-messageC: // Request timestamp for block msm := time.Now() @@ -237,34 +247,6 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error { height: ev.Raw.BlockNumber, } e.pendingMu.Unlock() - case ev := <-guardianSetC: - logger.Info("guardian set has changed, fetching new value", 
zap.Uint32("new_index", ev.Index), zap.String("eth_network", e.networkName)) - - guardianSetChangesConfirmed.WithLabelValues(e.networkName).Inc() - - msm := time.Now() - timeout, cancel = context.WithTimeout(ctx, 15*time.Second) - gs, err := caller.GetGuardianSet(&bind.CallOpts{Context: timeout}, ev.Index) - cancel() - queryLatency.WithLabelValues(e.networkName, "get_guardian_set").Observe(time.Since(msm).Seconds()) - if err != nil { - // We failed to process the guardian set update and are now out of sync with the chain. - // Recover by crashing the runnable, which causes the guardian set to be re-fetched. - errC <- fmt.Errorf("error requesting new guardian set value for %d: %w", ev.Index, err) - return - } - - logger.Info("new guardian set fetched", - zap.Any("value", gs), zap.Uint32("index", ev.Index), - zap.String("eth_network", e.networkName)) - - if e.setChan != nil { - e.setChan <- &common.GuardianSet{ - Keys: gs.Keys, - Index: ev.Index, - } - } } } }() @@ -338,17 +320,7 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error { } // Fetch the current guardian set ID and guardian set from the chain. -func FetchCurrentGuardianSet(ctx context.Context, rpcURL string, bridgeContract eth_common.Address) (uint32, *abi.StructsGuardianSet, error) { - c, err := ethclient.DialContext(ctx, rpcURL) - if err != nil { - return 0, nil, fmt.Errorf("dialing eth client failed: %w", err) - } - - caller, err := abi.NewAbiCaller(bridgeContract, c) - if err != nil { - panic(err) - } - +func fetchCurrentGuardianSet(ctx context.Context, caller *abi.AbiCaller) (uint32, *abi.StructsGuardianSet, error) { opts := &bind.CallOpts{Context: ctx} currentIndex, err := caller.GetCurrentGuardianSetIndex(opts)