From 863e0e69ecae7a039e2afe4897fd10e8d54d2181 Mon Sep 17 00:00:00 2001 From: Leo Date: Wed, 28 Jul 2021 14:33:42 +0200 Subject: [PATCH] node/pkg/ethereum: configurable chain ID and name Fixes wormhole/issues#247 Change-Id: Ieb792b73970603283e4ffc4a8b9217c85964fb9f --- bridge/cmd/guardiand/bridge.go | 2 +- bridge/pkg/ethereum/watcher.go | 151 +++++++++++++++++++++------------ 2 files changed, 97 insertions(+), 56 deletions(-) diff --git a/bridge/cmd/guardiand/bridge.go b/bridge/cmd/guardiand/bridge.go index fda94f7b..6b695908 100644 --- a/bridge/cmd/guardiand/bridge.go +++ b/bridge/cmd/guardiand/bridge.go @@ -374,7 +374,7 @@ func runBridge(cmd *cobra.Command, args []string) { } if err := supervisor.Run(ctx, "ethwatch", - ethereum.NewEthBridgeWatcher(*ethRPC, ethContractAddr, lockC, setC).Run); err != nil { + ethereum.NewEthBridgeWatcher(*ethRPC, ethContractAddr, "eth", vaa.ChainIDEthereum, true, lockC, setC).Run); err != nil { return err } diff --git a/bridge/pkg/ethereum/watcher.go b/bridge/pkg/ethereum/watcher.go index 88f0c2a2..cd73a17a 100644 --- a/bridge/pkg/ethereum/watcher.go +++ b/bridge/pkg/ethereum/watcher.go @@ -30,45 +30,61 @@ var ( prometheus.CounterOpts{ Name: "wormhole_eth_connection_errors_total", Help: "Total number of Ethereum connection errors (either during initial connection or while watching)", - }, []string{"reason"}) + }, []string{"eth_network", "reason"}) - ethMessagesObserved = promauto.NewCounter( + ethMessagesObserved = promauto.NewCounterVec( prometheus.CounterOpts{ Name: "wormhole_eth_messages_observed_total", Help: "Total number of Eth messages observed (pre-confirmation)", - }) - ethMessagesConfirmed = promauto.NewCounter( + }, []string{"eth_network"}) + ethMessagesConfirmed = promauto.NewCounterVec( prometheus.CounterOpts{ Name: "wormhole_eth_messages_confirmed_total", Help: "Total number of Eth messages verified (post-confirmation)", - }) - guardianSetChangesConfirmed = promauto.NewCounter( + }, []string{"eth_network"}) + 
guardianSetChangesConfirmed = promauto.NewCounterVec( prometheus.CounterOpts{ Name: "wormhole_eth_guardian_set_changes_confirmed_total", Help: "Total number of guardian set changes verified (we only see confirmed ones to begin with)", - }) - currentEthHeight = promauto.NewGauge( + }, []string{"eth_network"}) + currentEthHeight = promauto.NewGaugeVec( prometheus.GaugeOpts{ Name: "wormhole_eth_current_height", Help: "Current Ethereum block height", - }) + }, []string{"eth_network"}) queryLatency = promauto.NewHistogramVec( prometheus.HistogramOpts{ Name: "wormhole_eth_query_latency", Help: "Latency histogram for Ethereum calls (note that most interactions are streaming queries, NOT calls, and we cannot measure latency for those", - }, []string{"operation"}) + }, []string{"eth_network", "operation"}) ) type ( EthBridgeWatcher struct { - url string + // Ethereum RPC url + url string + // Address of the Eth bridge contract bridge eth_common.Address + // Human-readable name of the Eth network, for logging and monitoring. + networkName string + // VAA ChainID of the network we're connecting to. + chainID vaa.ChainID + // Whether to publish a message to setChan whenever the guardian set changes. + // We currently only fetch the guardian set from one primary chain, which should + // have this flag set to true, and false on all others. + // + // The current primary chain is Ethereum (a mostly arbitrary decision because it + // has the best API - we might want to switch the primary chain to Solana once + // the governance mechanism lives there). + emitGuardianSet bool + + // Channel to send new messages to. + lockChan chan *common.MessagePublication + // Channel to send guardian set changes to. 
+ setChan chan *common.GuardianSet pendingLocks map[eth_common.Hash]*pendingLock pendingLocksGuard sync.Mutex - - lockChan chan *common.MessagePublication - setChan chan *common.GuardianSet } pendingLock struct { @@ -77,13 +93,30 @@ type ( } ) -func NewEthBridgeWatcher(url string, bridge eth_common.Address, lockEvents chan *common.MessagePublication, setEvents chan *common.GuardianSet) *EthBridgeWatcher { - return &EthBridgeWatcher{url: url, bridge: bridge, lockChan: lockEvents, setChan: setEvents, pendingLocks: map[eth_common.Hash]*pendingLock{}} +func NewEthBridgeWatcher( + url string, + bridge eth_common.Address, + networkName string, + chainID vaa.ChainID, + emitGuardianSet bool, + lockEvents chan *common.MessagePublication, + setEvents chan *common.GuardianSet) *EthBridgeWatcher { + return &EthBridgeWatcher{ + url: url, + bridge: bridge, + networkName: networkName, + emitGuardianSet: emitGuardianSet, + chainID: chainID, + lockChan: lockEvents, + setChan: setEvents, + pendingLocks: map[eth_common.Hash]*pendingLock{}} } func (e *EthBridgeWatcher) Run(ctx context.Context) error { + logger := supervisor.Logger(ctx) + // Initialize gossip metrics (we want to broadcast the address even if we're not yet syncing) - p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDEthereum, &gossipv1.Heartbeat_Network{ + p2p.DefaultRegistry.SetNetworkStats(e.chainID, &gossipv1.Heartbeat_Network{ BridgeAddress: e.bridge.Hex(), }) @@ -91,7 +124,7 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error { defer cancel() c, err := ethclient.DialContext(timeout, e.url) if err != nil { - ethConnectionErrors.WithLabelValues("dial_error").Inc() + ethConnectionErrors.WithLabelValues(e.networkName, "dial_error").Inc() return fmt.Errorf("dialing eth client failed: %w", err) } @@ -113,7 +146,7 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error { messageC := make(chan *abi.AbiLogMessagePublished, 2) messageSub, err := f.WatchLogMessagePublished(&bind.WatchOpts{Context: timeout}, 
messageC, nil) if err != nil { - ethConnectionErrors.WithLabelValues("subscribe_error").Inc() + ethConnectionErrors.WithLabelValues(e.networkName, "subscribe_error").Inc() return fmt.Errorf("failed to subscribe to message publication events: %w", err) } @@ -121,40 +154,42 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error { guardianSetC := make(chan *abi.AbiGuardianSetAdded, 2) guardianSetEvent, err := f.WatchGuardianSetAdded(&bind.WatchOpts{Context: timeout}, guardianSetC, nil) if err != nil { - ethConnectionErrors.WithLabelValues("subscribe_error").Inc() + ethConnectionErrors.WithLabelValues(e.networkName, "subscribe_error").Inc() return fmt.Errorf("failed to subscribe to guardian set events: %w", err) } - errC := make(chan error) - logger := supervisor.Logger(ctx) - - // Get initial validator set from Ethereum. We could also fetch it from Solana, - // because both sets are synchronized, we simply made an arbitrary decision to use Ethereum. + // Get initial validator set. timeout, cancel = context.WithTimeout(ctx, 15*time.Second) defer cancel() idx, gs, err := FetchCurrentGuardianSet(timeout, e.url, e.bridge) if err != nil { - ethConnectionErrors.WithLabelValues("guardian_set_fetch_error").Inc() + ethConnectionErrors.WithLabelValues(e.networkName, "guardian_set_fetch_error").Inc() return fmt.Errorf("failed requesting guardian set from Ethereum: %w", err) } - logger.Info("initial guardian set fetched", zap.Any("value", gs), zap.Uint32("index", idx)) - e.setChan <- &common.GuardianSet{ - Keys: gs.Keys, - Index: idx, + logger.Info("initial guardian set fetched", + zap.Any("value", gs), zap.Uint32("index", idx), + zap.String("eth_network", e.networkName)) + + if e.emitGuardianSet { + e.setChan <- &common.GuardianSet{ + Keys: gs.Keys, + Index: idx, + } } + errC := make(chan error) go func() { for { select { case <-ctx.Done(): return - case e := <-messageSub.Err(): - ethConnectionErrors.WithLabelValues("subscription_error").Inc() - errC <- fmt.Errorf("error 
while processing message publication subscription: %w", e) + case err := <-messageSub.Err(): + ethConnectionErrors.WithLabelValues(e.networkName, "subscription_error").Inc() + errC <- fmt.Errorf("error while processing message publication subscription: %w", err) return - case e := <-guardianSetEvent.Err(): - ethConnectionErrors.WithLabelValues("subscription_error").Inc() - errC <- fmt.Errorf("error while processing guardian set subscription: %w", e) + case err := <-guardianSetEvent.Err(): + ethConnectionErrors.WithLabelValues(e.networkName, "subscription_error").Inc() + errC <- fmt.Errorf("error while processing guardian set subscription: %w", err) return case ev := <-messageC: // Request timestamp for block @@ -162,10 +197,10 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error { timeout, cancel = context.WithTimeout(ctx, 15*time.Second) b, err := c.BlockByNumber(timeout, big.NewInt(int64(ev.Raw.BlockNumber))) cancel() - queryLatency.WithLabelValues("block_by_number").Observe(time.Since(msm).Seconds()) + queryLatency.WithLabelValues(e.networkName, "block_by_number").Observe(time.Since(msm).Seconds()) if err != nil { - ethConnectionErrors.WithLabelValues("block_by_number_error").Inc() + ethConnectionErrors.WithLabelValues(e.networkName, "block_by_number_error").Inc() errC <- fmt.Errorf("failed to request timestamp for block %d: %w", ev.Raw.BlockNumber, err) return } @@ -175,16 +210,16 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error { Timestamp: time.Unix(int64(b.Time()), 0), Nonce: ev.Nonce, Sequence: ev.Sequence, - EmitterChain: vaa.ChainIDEthereum, + EmitterChain: e.chainID, EmitterAddress: PadAddress(ev.Sender), Payload: ev.Payload, ConsistencyLevel: ev.ConsistencyLevel, } logger.Info("found new message publication transaction", zap.Stringer("tx", ev.Raw.TxHash), - zap.Uint64("block", ev.Raw.BlockNumber)) + zap.Uint64("block", ev.Raw.BlockNumber), zap.String("eth_network", e.networkName)) - ethMessagesObserved.Inc() + 
ethMessagesObserved.WithLabelValues(e.networkName).Inc() e.pendingLocksGuard.Lock() e.pendingLocks[ev.Raw.TxHash] = &pendingLock{ @@ -194,15 +229,15 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error { e.pendingLocksGuard.Unlock() case ev := <-guardianSetC: logger.Info("guardian set has changed, fetching new value", - zap.Uint32("new_index", ev.Index)) + zap.Uint32("new_index", ev.Index), zap.String("eth_network", e.networkName)) - guardianSetChangesConfirmed.Inc() + guardianSetChangesConfirmed.WithLabelValues(e.networkName).Inc() msm := time.Now() timeout, cancel = context.WithTimeout(ctx, 15*time.Second) gs, err := caller.GetGuardianSet(&bind.CallOpts{Context: timeout}, ev.Index) cancel() - queryLatency.WithLabelValues("get_guardian_set").Observe(time.Since(msm).Seconds()) + queryLatency.WithLabelValues(e.networkName, "get_guardian_set").Observe(time.Since(msm).Seconds()) if err != nil { // We failed to process the guardian set update and are now out of sync with the chain. // Recover by crashing the runnable, which causes the guardian set to be re-fetched. 
@@ -210,10 +245,15 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error { return } - logger.Info("new guardian set fetched", zap.Any("value", gs), zap.Uint32("index", ev.Index)) - e.setChan <- &common.GuardianSet{ - Keys: gs.Keys, - Index: ev.Index, + logger.Info("new guardian set fetched", + zap.Any("value", gs), zap.Uint32("index", ev.Index), + zap.String("eth_network", e.networkName)) + + if e.emitGuardianSet { + e.setChan <- &common.GuardianSet{ + Keys: gs.Keys, + Index: ev.Index, + } } } } @@ -236,10 +276,11 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error { return case ev := <-headSink: start := time.Now() - logger.Info("processing new header", zap.Stringer("block", ev.Number)) - currentEthHeight.Set(float64(ev.Number.Int64())) + logger.Info("processing new header", zap.Stringer("block", ev.Number), + zap.String("eth_network", e.networkName)) + currentEthHeight.WithLabelValues(e.networkName).Set(float64(ev.Number.Int64())) readiness.SetReady(common.ReadinessEthSyncing) - p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDEthereum, &gossipv1.Heartbeat_Network{ + p2p.DefaultRegistry.SetNetworkStats(e.chainID, &gossipv1.Heartbeat_Network{ Height: ev.Number.Int64(), BridgeAddress: e.bridge.Hex(), }) @@ -252,7 +293,7 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error { // Transaction was dropped and never picked up again if pLock.height+4*uint64(pLock.lock.ConsistencyLevel) <= blockNumberU { logger.Debug("observation timed out", zap.Stringer("tx", pLock.lock.TxHash), - zap.Stringer("block", ev.Number)) + zap.Stringer("block", ev.Number), zap.String("eth_network", e.networkName)) delete(e.pendingLocks, hash) continue } @@ -260,16 +301,16 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error { // Transaction is now ready if pLock.height+uint64(pLock.lock.ConsistencyLevel) <= ev.Number.Uint64() { logger.Debug("observation confirmed", zap.Stringer("tx", pLock.lock.TxHash), - zap.Stringer("block", ev.Number)) + zap.Stringer("block", 
ev.Number), zap.String("eth_network", e.networkName)) delete(e.pendingLocks, hash) e.lockChan <- pLock.lock - ethMessagesConfirmed.Inc() + ethMessagesConfirmed.WithLabelValues(e.networkName).Inc() } } e.pendingLocksGuard.Unlock() logger.Info("processed new header", zap.Stringer("block", ev.Number), - zap.Duration("took", time.Since(start))) + zap.Duration("took", time.Since(start)), zap.String("eth_network", e.networkName)) } } }()