node/pkg/ethereum: poll for guardian set changes

The new set of Eth contracts no longer emits an event.

Change-Id: I6c3654c88960b08b5548ed72cf09e555b079ef3a
This commit is contained in:
Leo 2021-08-10 17:35:18 +02:00
parent 08e70a5aaf
commit 212e04a72d
1 changed files with 46 additions and 74 deletions

View File

@ -42,11 +42,6 @@ var (
Name: "wormhole_eth_messages_confirmed_total", Name: "wormhole_eth_messages_confirmed_total",
Help: "Total number of Eth messages verified (post-confirmation)", Help: "Total number of Eth messages verified (post-confirmation)",
}, []string{"eth_network"}) }, []string{"eth_network"})
guardianSetChangesConfirmed = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "wormhole_eth_guardian_set_changes_confirmed_total",
Help: "Total number of guardian set changes verified (we only see confirmed ones to begin with)",
}, []string{"eth_network"})
currentEthHeight = promauto.NewGaugeVec( currentEthHeight = promauto.NewGaugeVec(
prometheus.GaugeOpts{ prometheus.GaugeOpts{
Name: "wormhole_eth_current_height", Name: "wormhole_eth_current_height",
@ -155,34 +150,54 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
return fmt.Errorf("failed to subscribe to message publication events: %w", err) return fmt.Errorf("failed to subscribe to message publication events: %w", err)
} }
// Subscribe to guardian set changes // 0 is a valid guardian set, so we need a nil value here
guardianSetC := make(chan *abi.AbiGuardianSetAdded, 2) var currentGuardianSet *uint32
guardianSetEvent, err := f.WatchGuardianSetAdded(&bind.WatchOpts{Context: timeout}, guardianSetC, nil)
if err != nil {
ethConnectionErrors.WithLabelValues(e.networkName, "subscribe_error").Inc()
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
return fmt.Errorf("failed to subscribe to guardian set events: %w", err)
}
// Get initial validator set. // Poll for guardian set.
timeout, cancel = context.WithTimeout(ctx, 15*time.Second) go func() {
defer cancel() t := time.NewTicker(15 * time.Second)
idx, gs, err := FetchCurrentGuardianSet(timeout, e.url, e.bridge) defer t.Stop()
if err != nil { for {
ethConnectionErrors.WithLabelValues(e.networkName, "guardian_set_fetch_error").Inc() select {
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1) case <-ctx.Done():
return fmt.Errorf("failed requesting guardian set from Ethereum: %w", err) return
} case <-t.C:
logger.Info("initial guardian set fetched", msm := time.Now()
zap.Any("value", gs), zap.Uint32("index", idx), logger.Info("fetching guardian set")
zap.String("eth_network", e.networkName)) timeout, cancel = context.WithTimeout(ctx, 15*time.Second)
idx, gs, err := fetchCurrentGuardianSet(timeout, caller)
if err != nil {
ethConnectionErrors.WithLabelValues(e.networkName, "guardian_set_fetch_error").Inc()
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
logger.Error("failed requesting guardian set",
zap.Error(err), zap.String("eth_network", e.networkName))
cancel()
continue
}
if e.setChan != nil { queryLatency.WithLabelValues(e.networkName, "get_guardian_set").Observe(time.Since(msm).Seconds())
e.setChan <- &common.GuardianSet{
Keys: gs.Keys, cancel()
Index: idx,
if currentGuardianSet != nil && *currentGuardianSet == idx {
continue
}
logger.Info("updated guardian set found",
zap.Any("value", gs), zap.Uint32("index", idx),
zap.String("eth_network", e.networkName))
currentGuardianSet = &idx
if e.setChan != nil {
e.setChan <- &common.GuardianSet{
Keys: gs.Keys,
Index: idx,
}
}
}
} }
} }()
errC := make(chan error) errC := make(chan error)
go func() { go func() {
@ -195,11 +210,6 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
errC <- fmt.Errorf("error while processing message publication subscription: %w", err) errC <- fmt.Errorf("error while processing message publication subscription: %w", err)
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1) p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
return return
case err := <-guardianSetEvent.Err():
ethConnectionErrors.WithLabelValues(e.networkName, "subscription_error").Inc()
errC <- fmt.Errorf("error while processing guardian set subscription: %w", err)
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
return
case ev := <-messageC: case ev := <-messageC:
// Request timestamp for block // Request timestamp for block
msm := time.Now() msm := time.Now()
@ -237,34 +247,6 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
height: ev.Raw.BlockNumber, height: ev.Raw.BlockNumber,
} }
e.pendingMu.Unlock() e.pendingMu.Unlock()
case ev := <-guardianSetC:
logger.Info("guardian set has changed, fetching new value",
zap.Uint32("new_index", ev.Index), zap.String("eth_network", e.networkName))
guardianSetChangesConfirmed.WithLabelValues(e.networkName).Inc()
msm := time.Now()
timeout, cancel = context.WithTimeout(ctx, 15*time.Second)
gs, err := caller.GetGuardianSet(&bind.CallOpts{Context: timeout}, ev.Index)
cancel()
queryLatency.WithLabelValues(e.networkName, "get_guardian_set").Observe(time.Since(msm).Seconds())
if err != nil {
// We failed to process the guardian set update and are now out of sync with the chain.
// Recover by crashing the runnable, which causes the guardian set to be re-fetched.
errC <- fmt.Errorf("error requesting new guardian set value for %d: %w", ev.Index, err)
return
}
logger.Info("new guardian set fetched",
zap.Any("value", gs), zap.Uint32("index", ev.Index),
zap.String("eth_network", e.networkName))
if e.setChan != nil {
e.setChan <- &common.GuardianSet{
Keys: gs.Keys,
Index: ev.Index,
}
}
} }
} }
}() }()
@ -338,17 +320,7 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
} }
// Fetch the current guardian set ID and guardian set from the chain. // Fetch the current guardian set ID and guardian set from the chain.
func FetchCurrentGuardianSet(ctx context.Context, rpcURL string, bridgeContract eth_common.Address) (uint32, *abi.StructsGuardianSet, error) { func fetchCurrentGuardianSet(ctx context.Context, caller *abi.AbiCaller) (uint32, *abi.StructsGuardianSet, error) {
c, err := ethclient.DialContext(ctx, rpcURL)
if err != nil {
return 0, nil, fmt.Errorf("dialing eth client failed: %w", err)
}
caller, err := abi.NewAbiCaller(bridgeContract, c)
if err != nil {
panic(err)
}
opts := &bind.CallOpts{Context: ctx} opts := &bind.CallOpts{Context: ctx}
currentIndex, err := caller.GetCurrentGuardianSetIndex(opts) currentIndex, err := caller.GetCurrentGuardianSetIndex(opts)