node/pkg/ethereum: configurable chain ID and name

Fixes wormhole/issues#247

Change-Id: Ieb792b73970603283e4ffc4a8b9217c85964fb9f
This commit is contained in:
Leo 2021-07-28 14:33:42 +02:00 committed by Leopold Schabel
parent 2c56a916eb
commit 863e0e69ec
2 changed files with 97 additions and 56 deletions

View File

@ -374,7 +374,7 @@ func runBridge(cmd *cobra.Command, args []string) {
}
if err := supervisor.Run(ctx, "ethwatch",
ethereum.NewEthBridgeWatcher(*ethRPC, ethContractAddr, lockC, setC).Run); err != nil {
ethereum.NewEthBridgeWatcher(*ethRPC, ethContractAddr, "eth", vaa.ChainIDEthereum, true, lockC, setC).Run); err != nil {
return err
}

View File

@ -30,45 +30,61 @@ var (
prometheus.CounterOpts{
Name: "wormhole_eth_connection_errors_total",
Help: "Total number of Ethereum connection errors (either during initial connection or while watching)",
}, []string{"reason"})
}, []string{"eth_network", "reason"})
ethMessagesObserved = promauto.NewCounter(
ethMessagesObserved = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "wormhole_eth_messages_observed_total",
Help: "Total number of Eth messages observed (pre-confirmation)",
})
ethMessagesConfirmed = promauto.NewCounter(
}, []string{"eth_network"})
ethMessagesConfirmed = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "wormhole_eth_messages_confirmed_total",
Help: "Total number of Eth messages verified (post-confirmation)",
})
guardianSetChangesConfirmed = promauto.NewCounter(
}, []string{"eth_network"})
guardianSetChangesConfirmed = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "wormhole_eth_guardian_set_changes_confirmed_total",
Help: "Total number of guardian set changes verified (we only see confirmed ones to begin with)",
})
currentEthHeight = promauto.NewGauge(
}, []string{"eth_network"})
currentEthHeight = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "wormhole_eth_current_height",
Help: "Current Ethereum block height",
})
}, []string{"eth_network"})
queryLatency = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "wormhole_eth_query_latency",
Help: "Latency histogram for Ethereum calls (note that most interactions are streaming queries, NOT calls, and we cannot measure latency for those",
}, []string{"operation"})
}, []string{"eth_network", "operation"})
)
type (
EthBridgeWatcher struct {
url string
// Ethereum RPC url
url string
// Address of the Eth bridge contract
bridge eth_common.Address
// Human-readable name of the Eth network, for logging and monitoring.
networkName string
// VAA ChainID of the network we're connecting to.
chainID vaa.ChainID
// Whether to publish a message to setChan whenever the guardian set changes.
// We currently only fetch the guardian set from one primary chain, which should
// have this flag set to true, and false on all others.
//
// The current primary chain is Ethereum (a mostly arbitrary decision because it
// has the best API - we might want to switch the primary chain to Solana once
// the governance mechanism lives there),
emitGuardianSet bool
// Channel to send new messages to.
lockChan chan *common.MessagePublication
// Channel to send guardian set changes to.
setChan chan *common.GuardianSet
pendingLocks map[eth_common.Hash]*pendingLock
pendingLocksGuard sync.Mutex
lockChan chan *common.MessagePublication
setChan chan *common.GuardianSet
}
pendingLock struct {
@ -77,13 +93,30 @@ type (
}
)
func NewEthBridgeWatcher(url string, bridge eth_common.Address, lockEvents chan *common.MessagePublication, setEvents chan *common.GuardianSet) *EthBridgeWatcher {
return &EthBridgeWatcher{url: url, bridge: bridge, lockChan: lockEvents, setChan: setEvents, pendingLocks: map[eth_common.Hash]*pendingLock{}}
func NewEthBridgeWatcher(
url string,
bridge eth_common.Address,
networkName string,
chainID vaa.ChainID,
emitGuardianSet bool,
lockEvents chan *common.MessagePublication,
setEvents chan *common.GuardianSet) *EthBridgeWatcher {
return &EthBridgeWatcher{
url: url,
bridge: bridge,
networkName: networkName,
emitGuardianSet: emitGuardianSet,
chainID: chainID,
lockChan: lockEvents,
setChan: setEvents,
pendingLocks: map[eth_common.Hash]*pendingLock{}}
}
func (e *EthBridgeWatcher) Run(ctx context.Context) error {
logger := supervisor.Logger(ctx)
// Initialize gossip metrics (we want to broadcast the address even if we're not yet syncing)
p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDEthereum, &gossipv1.Heartbeat_Network{
p2p.DefaultRegistry.SetNetworkStats(e.chainID, &gossipv1.Heartbeat_Network{
BridgeAddress: e.bridge.Hex(),
})
@ -91,7 +124,7 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
defer cancel()
c, err := ethclient.DialContext(timeout, e.url)
if err != nil {
ethConnectionErrors.WithLabelValues("dial_error").Inc()
ethConnectionErrors.WithLabelValues(e.networkName, "dial_error").Inc()
return fmt.Errorf("dialing eth client failed: %w", err)
}
@ -113,7 +146,7 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
messageC := make(chan *abi.AbiLogMessagePublished, 2)
messageSub, err := f.WatchLogMessagePublished(&bind.WatchOpts{Context: timeout}, messageC, nil)
if err != nil {
ethConnectionErrors.WithLabelValues("subscribe_error").Inc()
ethConnectionErrors.WithLabelValues(e.networkName, "subscribe_error").Inc()
return fmt.Errorf("failed to subscribe to message publication events: %w", err)
}
@ -121,40 +154,42 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
guardianSetC := make(chan *abi.AbiGuardianSetAdded, 2)
guardianSetEvent, err := f.WatchGuardianSetAdded(&bind.WatchOpts{Context: timeout}, guardianSetC, nil)
if err != nil {
ethConnectionErrors.WithLabelValues("subscribe_error").Inc()
ethConnectionErrors.WithLabelValues(e.networkName, "subscribe_error").Inc()
return fmt.Errorf("failed to subscribe to guardian set events: %w", err)
}
errC := make(chan error)
logger := supervisor.Logger(ctx)
// Get initial validator set from Ethereum. We could also fetch it from Solana,
// because both sets are synchronized, we simply made an arbitrary decision to use Ethereum.
// Get initial validator set.
timeout, cancel = context.WithTimeout(ctx, 15*time.Second)
defer cancel()
idx, gs, err := FetchCurrentGuardianSet(timeout, e.url, e.bridge)
if err != nil {
ethConnectionErrors.WithLabelValues("guardian_set_fetch_error").Inc()
ethConnectionErrors.WithLabelValues(e.networkName, "guardian_set_fetch_error").Inc()
return fmt.Errorf("failed requesting guardian set from Ethereum: %w", err)
}
logger.Info("initial guardian set fetched", zap.Any("value", gs), zap.Uint32("index", idx))
e.setChan <- &common.GuardianSet{
Keys: gs.Keys,
Index: idx,
logger.Info("initial guardian set fetched",
zap.Any("value", gs), zap.Uint32("index", idx),
zap.String("eth_network", e.networkName))
if e.emitGuardianSet {
e.setChan <- &common.GuardianSet{
Keys: gs.Keys,
Index: idx,
}
}
errC := make(chan error)
go func() {
for {
select {
case <-ctx.Done():
return
case e := <-messageSub.Err():
ethConnectionErrors.WithLabelValues("subscription_error").Inc()
errC <- fmt.Errorf("error while processing message publication subscription: %w", e)
case err := <-messageSub.Err():
ethConnectionErrors.WithLabelValues(e.networkName, "subscription_error").Inc()
errC <- fmt.Errorf("error while processing message publication subscription: %w", err)
return
case e := <-guardianSetEvent.Err():
ethConnectionErrors.WithLabelValues("subscription_error").Inc()
errC <- fmt.Errorf("error while processing guardian set subscription: %w", e)
case err := <-guardianSetEvent.Err():
ethConnectionErrors.WithLabelValues(e.networkName, "subscription_error").Inc()
errC <- fmt.Errorf("error while processing guardian set subscription: %w", err)
return
case ev := <-messageC:
// Request timestamp for block
@ -162,10 +197,10 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
timeout, cancel = context.WithTimeout(ctx, 15*time.Second)
b, err := c.BlockByNumber(timeout, big.NewInt(int64(ev.Raw.BlockNumber)))
cancel()
queryLatency.WithLabelValues("block_by_number").Observe(time.Since(msm).Seconds())
queryLatency.WithLabelValues(e.networkName, "block_by_number").Observe(time.Since(msm).Seconds())
if err != nil {
ethConnectionErrors.WithLabelValues("block_by_number_error").Inc()
ethConnectionErrors.WithLabelValues(e.networkName, "block_by_number_error").Inc()
errC <- fmt.Errorf("failed to request timestamp for block %d: %w", ev.Raw.BlockNumber, err)
return
}
@ -175,16 +210,16 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
Timestamp: time.Unix(int64(b.Time()), 0),
Nonce: ev.Nonce,
Sequence: ev.Sequence,
EmitterChain: vaa.ChainIDEthereum,
EmitterChain: e.chainID,
EmitterAddress: PadAddress(ev.Sender),
Payload: ev.Payload,
ConsistencyLevel: ev.ConsistencyLevel,
}
logger.Info("found new message publication transaction", zap.Stringer("tx", ev.Raw.TxHash),
zap.Uint64("block", ev.Raw.BlockNumber))
zap.Uint64("block", ev.Raw.BlockNumber), zap.String("eth_network", e.networkName))
ethMessagesObserved.Inc()
ethMessagesObserved.WithLabelValues(e.networkName).Inc()
e.pendingLocksGuard.Lock()
e.pendingLocks[ev.Raw.TxHash] = &pendingLock{
@ -194,15 +229,15 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
e.pendingLocksGuard.Unlock()
case ev := <-guardianSetC:
logger.Info("guardian set has changed, fetching new value",
zap.Uint32("new_index", ev.Index))
zap.Uint32("new_index", ev.Index), zap.String("eth_network", e.networkName))
guardianSetChangesConfirmed.Inc()
guardianSetChangesConfirmed.WithLabelValues(e.networkName).Inc()
msm := time.Now()
timeout, cancel = context.WithTimeout(ctx, 15*time.Second)
gs, err := caller.GetGuardianSet(&bind.CallOpts{Context: timeout}, ev.Index)
cancel()
queryLatency.WithLabelValues("get_guardian_set").Observe(time.Since(msm).Seconds())
queryLatency.WithLabelValues(e.networkName, "get_guardian_set").Observe(time.Since(msm).Seconds())
if err != nil {
// We failed to process the guardian set update and are now out of sync with the chain.
// Recover by crashing the runnable, which causes the guardian set to be re-fetched.
@ -210,10 +245,15 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
return
}
logger.Info("new guardian set fetched", zap.Any("value", gs), zap.Uint32("index", ev.Index))
e.setChan <- &common.GuardianSet{
Keys: gs.Keys,
Index: ev.Index,
logger.Info("new guardian set fetched",
zap.Any("value", gs), zap.Uint32("index", ev.Index),
zap.String("eth_network", e.networkName))
if e.emitGuardianSet {
e.setChan <- &common.GuardianSet{
Keys: gs.Keys,
Index: ev.Index,
}
}
}
}
@ -236,10 +276,11 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
return
case ev := <-headSink:
start := time.Now()
logger.Info("processing new header", zap.Stringer("block", ev.Number))
currentEthHeight.Set(float64(ev.Number.Int64()))
logger.Info("processing new header", zap.Stringer("block", ev.Number),
zap.String("eth_network", e.networkName))
currentEthHeight.WithLabelValues(e.networkName).Set(float64(ev.Number.Int64()))
readiness.SetReady(common.ReadinessEthSyncing)
p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDEthereum, &gossipv1.Heartbeat_Network{
p2p.DefaultRegistry.SetNetworkStats(e.chainID, &gossipv1.Heartbeat_Network{
Height: ev.Number.Int64(),
BridgeAddress: e.bridge.Hex(),
})
@ -252,7 +293,7 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
// Transaction was dropped and never picked up again
if pLock.height+4*uint64(pLock.lock.ConsistencyLevel) <= blockNumberU {
logger.Debug("observation timed out", zap.Stringer("tx", pLock.lock.TxHash),
zap.Stringer("block", ev.Number))
zap.Stringer("block", ev.Number), zap.String("eth_network", e.networkName))
delete(e.pendingLocks, hash)
continue
}
@ -260,16 +301,16 @@ func (e *EthBridgeWatcher) Run(ctx context.Context) error {
// Transaction is now ready
if pLock.height+uint64(pLock.lock.ConsistencyLevel) <= ev.Number.Uint64() {
logger.Debug("observation confirmed", zap.Stringer("tx", pLock.lock.TxHash),
zap.Stringer("block", ev.Number))
zap.Stringer("block", ev.Number), zap.String("eth_network", e.networkName))
delete(e.pendingLocks, hash)
e.lockChan <- pLock.lock
ethMessagesConfirmed.Inc()
ethMessagesConfirmed.WithLabelValues(e.networkName).Inc()
}
}
e.pendingLocksGuard.Unlock()
logger.Info("processed new header", zap.Stringer("block", ev.Number),
zap.Duration("took", time.Since(start)))
zap.Duration("took", time.Since(start)), zap.String("eth_network", e.networkName))
}
}
}()