2022-09-28 04:15:57 -07:00
package evm
2020-08-06 06:43:09 -07:00
import (
"context"
"fmt"
2022-06-14 07:22:49 -07:00
"sync"
"sync/atomic"
"time"
2022-09-28 06:27:13 -07:00
"github.com/certusone/wormhole/node/pkg/watchers/evm/connectors"
"github.com/certusone/wormhole/node/pkg/watchers/evm/connectors/ethabi"
"github.com/certusone/wormhole/node/pkg/watchers/evm/finalizers"
2022-11-14 06:07:45 -08:00
"github.com/certusone/wormhole/node/pkg/watchers/interfaces"
2022-09-28 04:15:57 -07:00
2021-08-26 01:35:09 -07:00
"github.com/certusone/wormhole/node/pkg/p2p"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
2022-01-06 03:01:33 -08:00
"github.com/ethereum/go-ethereum/rpc"
2021-07-23 07:06:35 -07:00
"github.com/prometheus/client_golang/prometheus/promauto"
2020-08-17 10:29:25 -07:00
2021-02-01 11:38:13 -08:00
"github.com/prometheus/client_golang/prometheus"
2020-08-06 06:43:09 -07:00
eth_common "github.com/ethereum/go-ethereum/common"
2022-10-25 13:13:36 -07:00
eth_hexutil "github.com/ethereum/go-ethereum/common/hexutil"
2020-08-06 10:00:16 -07:00
"go.uber.org/zap"
2020-08-17 10:29:25 -07:00
2021-08-26 01:35:09 -07:00
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/readiness"
"github.com/certusone/wormhole/node/pkg/supervisor"
2022-08-18 01:52:36 -07:00
"github.com/wormhole-foundation/wormhole/sdk/vaa"
2020-08-06 06:43:09 -07:00
)
2021-01-26 16:16:37 -08:00
var (
2021-07-23 07:06:35 -07:00
ethConnectionErrors = promauto . NewCounterVec (
2021-01-26 16:16:37 -08:00
prometheus . CounterOpts {
Name : "wormhole_eth_connection_errors_total" ,
Help : "Total number of Ethereum connection errors (either during initial connection or while watching)" ,
2021-07-28 05:33:42 -07:00
} , [ ] string { "eth_network" , "reason" } )
2021-01-26 16:16:37 -08:00
2021-07-28 05:33:42 -07:00
ethMessagesObserved = promauto . NewCounterVec (
2021-01-26 16:16:37 -08:00
prometheus . CounterOpts {
2021-07-21 10:46:10 -07:00
Name : "wormhole_eth_messages_observed_total" ,
Help : "Total number of Eth messages observed (pre-confirmation)" ,
2021-07-28 05:33:42 -07:00
} , [ ] string { "eth_network" } )
2022-01-06 03:01:33 -08:00
ethMessagesOrphaned = promauto . NewCounterVec (
prometheus . CounterOpts {
Name : "wormhole_eth_messages_orphaned_total" ,
Help : "Total number of Eth messages dropped (orphaned)" ,
2022-01-08 16:42:57 -08:00
} , [ ] string { "eth_network" , "reason" } )
2021-07-28 05:33:42 -07:00
ethMessagesConfirmed = promauto . NewCounterVec (
2021-01-26 16:16:37 -08:00
prometheus . CounterOpts {
2021-07-21 10:46:10 -07:00
Name : "wormhole_eth_messages_confirmed_total" ,
Help : "Total number of Eth messages verified (post-confirmation)" ,
2021-07-28 05:33:42 -07:00
} , [ ] string { "eth_network" } )
currentEthHeight = promauto . NewGaugeVec (
2021-01-26 16:16:37 -08:00
prometheus . GaugeOpts {
Name : "wormhole_eth_current_height" ,
Help : "Current Ethereum block height" ,
2021-07-28 05:33:42 -07:00
} , [ ] string { "eth_network" } )
2021-07-23 07:06:35 -07:00
queryLatency = promauto . NewHistogramVec (
2021-01-26 16:16:37 -08:00
prometheus . HistogramOpts {
Name : "wormhole_eth_query_latency" ,
Help : "Latency histogram for Ethereum calls (note that most interactions are streaming queries, NOT calls, and we cannot measure latency for those" ,
2021-07-28 05:33:42 -07:00
} , [ ] string { "eth_network" , "operation" } )
2021-01-26 16:16:37 -08:00
)
2020-08-06 06:43:09 -07:00
type (
2021-08-30 07:19:00 -07:00
Watcher struct {
2021-07-28 05:33:42 -07:00
// Ethereum RPC url
url string
2022-09-28 04:15:57 -07:00
// Address of the Eth contract
2021-08-30 07:19:00 -07:00
contract eth_common . Address
2021-07-28 05:33:42 -07:00
// Human-readable name of the Eth network, for logging and monitoring.
networkName string
2021-08-02 05:30:16 -07:00
// Readiness component
readiness readiness . Component
2021-07-28 05:33:42 -07:00
// VAA ChainID of the network we're connecting to.
chainID vaa . ChainID
2021-07-28 06:22:06 -07:00
// Channel to send new messages to.
msgChan chan * common . MessagePublication
// Channel to send guardian set changes to.
// setChan can be set to nil if no guardian set changes are needed.
//
2021-07-28 05:33:42 -07:00
// We currently only fetch the guardian set from one primary chain, which should
// have this flag set to true, and false on all others.
//
// The current primary chain is Ethereum (a mostly arbitrary decision because it
// has the best API - we might want to switch the primary chain to Solana once
// the governance mechanism lives there),
setChan chan * common . GuardianSet
2020-08-06 10:00:16 -07:00
2022-02-08 14:16:43 -08:00
// Incoming re-observation requests from the network. Pre-filtered to only
// include requests for our chainID.
obsvReqC chan * gossipv1 . ObservationRequest
2021-12-23 11:23:06 -08:00
pending map [ pendingKey ] * pendingMessage
2021-07-28 06:08:38 -07:00
pendingMu sync . Mutex
2021-08-21 11:56:43 -07:00
// 0 is a valid guardian set, so we need a nil value here
currentGuardianSet * uint32
2022-01-10 07:24:31 -08:00
2022-11-14 17:38:04 -08:00
// waitForConfirmations indicates if we should wait for the number of confirmations specified by the consistencyLevel in the message.
// On many of the chains, we already wait for finalized blocks so there is no point in waiting any additional blocks after finality.
// Therefore this parameter defaults to false. This feature can / should be enabled on chains where we don't wait for finality.
waitForConfirmations bool
// maxWaitConfirmations is the maximum number of confirmations to wait before declaring a transaction abandoned. If we are honoring
// the consistency level (waitForConfirmations is set to true), then we wait maxWaitConfirmations plus the consistency level. This
// parameter defaults to 60, which should be plenty long enough for most chains. If not, this parameter can be set.
maxWaitConfirmations uint64
2022-04-28 09:20:38 -07:00
// Interface to the chain specific ethereum library.
2022-10-25 13:13:36 -07:00
ethConn connectors . Connector
unsafeDevMode bool
2022-10-26 12:20:13 -07:00
latestFinalizedBlockNumber uint64
l1Finalizer interfaces . L1Finalizer
2022-11-11 06:12:16 -08:00
// These parameters are currently only used for Polygon and should be set via SetRootChainParams()
rootChainRpc string
rootChainContract string
2020-08-06 06:43:09 -07:00
}
2020-08-06 10:00:16 -07:00
2021-12-23 11:23:06 -08:00
pendingKey struct {
2022-01-06 03:01:33 -08:00
TxHash eth_common . Hash
BlockHash eth_common . Hash
2021-12-23 11:23:06 -08:00
EmitterAddress vaa . Address
2022-01-06 03:01:33 -08:00
Sequence uint64
2021-12-23 11:23:06 -08:00
}
2021-07-28 06:08:38 -07:00
pendingMessage struct {
message * common . MessagePublication
height uint64
2020-08-06 10:00:16 -07:00
}
2020-08-06 06:43:09 -07:00
)
2021-08-30 07:19:00 -07:00
func NewEthWatcher (
2021-07-28 05:33:42 -07:00
url string ,
2021-08-30 07:19:00 -07:00
contract eth_common . Address ,
2021-07-28 05:33:42 -07:00
networkName string ,
2021-08-02 05:30:16 -07:00
readiness readiness . Component ,
2021-07-28 05:33:42 -07:00
chainID vaa . ChainID ,
2021-07-28 06:08:38 -07:00
messageEvents chan * common . MessagePublication ,
2022-01-10 07:24:31 -08:00
setEvents chan * common . GuardianSet ,
2022-04-28 09:20:38 -07:00
obsvReqC chan * gossipv1 . ObservationRequest ,
2022-10-26 12:20:13 -07:00
unsafeDevMode bool ,
) * Watcher {
2022-04-28 09:20:38 -07:00
2021-08-30 07:19:00 -07:00
return & Watcher {
2022-11-14 17:38:04 -08:00
url : url ,
contract : contract ,
networkName : networkName ,
readiness : readiness ,
waitForConfirmations : false ,
maxWaitConfirmations : 60 ,
chainID : chainID ,
msgChan : messageEvents ,
setChan : setEvents ,
obsvReqC : obsvReqC ,
pending : map [ pendingKey ] * pendingMessage { } ,
unsafeDevMode : unsafeDevMode ,
2022-09-28 04:15:57 -07:00
}
2020-08-06 06:43:09 -07:00
}
2022-09-28 04:15:57 -07:00
func ( w * Watcher ) Run ( ctx context . Context ) error {
2021-07-28 05:33:42 -07:00
logger := supervisor . Logger ( ctx )
2022-10-25 13:13:36 -07:00
useFinalizedBlocks := ( w . chainID == vaa . ChainIDEthereum && ( ! w . unsafeDevMode ) )
if ( w . chainID == vaa . ChainIDKarura || w . chainID == vaa . ChainIDAcala ) && ( ! w . unsafeDevMode ) {
ufb , err := w . getAcalaMode ( ctx )
if err != nil {
2022-05-02 11:28:17 -07:00
return err
}
2022-10-25 13:13:36 -07:00
if ufb {
useFinalizedBlocks = true
}
2022-04-26 05:51:10 -07:00
}
2021-02-03 04:01:51 -08:00
// Initialize gossip metrics (we want to broadcast the address even if we're not yet syncing)
2022-09-28 04:15:57 -07:00
p2p . DefaultRegistry . SetNetworkStats ( w . chainID , & gossipv1 . Heartbeat_Network {
ContractAddress : w . contract . Hex ( ) ,
2021-02-03 04:01:51 -08:00
} )
2020-08-21 14:47:58 -07:00
timeout , cancel := context . WithTimeout ( ctx , 15 * time . Second )
defer cancel ( )
2020-08-06 06:43:09 -07:00
2022-11-28 05:55:35 -08:00
safeBlocksSupported := false
2022-09-28 04:15:57 -07:00
var err error
if w . chainID == vaa . ChainIDCelo && ! w . unsafeDevMode {
// When we are running in mainnet or testnet, we need to use the Celo ethereum library rather than go-ethereum.
// However, in devnet, we currently run the standard ETH node for Celo, so we need to use the standard go-ethereum.
w . ethConn , err = connectors . NewCeloConnector ( timeout , w . networkName , w . url , w . contract , logger )
if err != nil {
ethConnectionErrors . WithLabelValues ( w . networkName , "dial_error" ) . Inc ( )
p2p . DefaultRegistry . AddErrorCount ( w . chainID , 1 )
return fmt . Errorf ( "dialing eth client failed: %w" , err )
}
2022-10-25 13:13:36 -07:00
} else if useFinalizedBlocks {
2022-11-28 05:55:35 -08:00
if w . chainID == vaa . ChainIDEthereum && ! w . unsafeDevMode {
safeBlocksSupported = true
logger . Info ( "using finalized blocks, will publish safe blocks" )
} else {
logger . Info ( "using finalized blocks" )
}
2022-09-28 04:15:57 -07:00
baseConnector , err := connectors . NewEthereumConnector ( timeout , w . networkName , w . url , w . contract , logger )
if err != nil {
ethConnectionErrors . WithLabelValues ( w . networkName , "dial_error" ) . Inc ( )
p2p . DefaultRegistry . AddErrorCount ( w . chainID , 1 )
return fmt . Errorf ( "dialing eth client failed: %w" , err )
}
2022-11-28 05:55:35 -08:00
w . ethConn , err = connectors . NewBlockPollConnector ( ctx , baseConnector , finalizers . NewDefaultFinalizer ( ) , 250 * time . Millisecond , true , safeBlocksSupported )
2022-09-28 04:15:57 -07:00
if err != nil {
ethConnectionErrors . WithLabelValues ( w . networkName , "dial_error" ) . Inc ( )
p2p . DefaultRegistry . AddErrorCount ( w . chainID , 1 )
return fmt . Errorf ( "creating block poll connector failed: %w" , err )
}
} else if w . chainID == vaa . ChainIDMoonbeam && ! w . unsafeDevMode {
baseConnector , err := connectors . NewEthereumConnector ( timeout , w . networkName , w . url , w . contract , logger )
if err != nil {
ethConnectionErrors . WithLabelValues ( w . networkName , "dial_error" ) . Inc ( )
p2p . DefaultRegistry . AddErrorCount ( w . chainID , 1 )
return fmt . Errorf ( "dialing eth client failed: %w" , err )
}
finalizer := finalizers . NewMoonbeamFinalizer ( logger , baseConnector )
2022-11-28 05:55:35 -08:00
w . ethConn , err = connectors . NewBlockPollConnector ( ctx , baseConnector , finalizer , 250 * time . Millisecond , false , false )
2022-09-28 04:15:57 -07:00
if err != nil {
ethConnectionErrors . WithLabelValues ( w . networkName , "dial_error" ) . Inc ( )
p2p . DefaultRegistry . AddErrorCount ( w . chainID , 1 )
return fmt . Errorf ( "creating block poll connector failed: %w" , err )
}
2022-11-14 06:07:45 -08:00
} else if w . chainID == vaa . ChainIDNeon && ! w . unsafeDevMode {
if w . l1Finalizer == nil {
return fmt . Errorf ( "unable to create neon watcher because the l1 finalizer is not set" )
}
2022-09-28 04:15:57 -07:00
baseConnector , err := connectors . NewEthereumConnector ( timeout , w . networkName , w . url , w . contract , logger )
if err != nil {
ethConnectionErrors . WithLabelValues ( w . networkName , "dial_error" ) . Inc ( )
p2p . DefaultRegistry . AddErrorCount ( w . chainID , 1 )
return fmt . Errorf ( "dialing eth client failed: %w" , err )
}
2022-11-14 06:07:45 -08:00
finalizer := finalizers . NewNeonFinalizer ( logger , baseConnector , baseConnector . Client ( ) , w . l1Finalizer )
2022-11-28 05:55:35 -08:00
pollConnector , err := connectors . NewBlockPollConnector ( ctx , baseConnector , finalizer , 250 * time . Millisecond , false , false )
2022-09-28 04:15:57 -07:00
if err != nil {
ethConnectionErrors . WithLabelValues ( w . networkName , "dial_error" ) . Inc ( )
p2p . DefaultRegistry . AddErrorCount ( w . chainID , 1 )
return fmt . Errorf ( "creating block poll connector failed: %w" , err )
}
w . ethConn , err = connectors . NewLogPollConnector ( ctx , pollConnector , baseConnector . Client ( ) )
if err != nil {
ethConnectionErrors . WithLabelValues ( w . networkName , "dial_error" ) . Inc ( )
p2p . DefaultRegistry . AddErrorCount ( w . chainID , 1 )
return fmt . Errorf ( "creating poll connector failed: %w" , err )
}
2022-09-28 12:19:01 -07:00
} else if w . chainID == vaa . ChainIDArbitrum && ! w . unsafeDevMode {
2022-10-26 12:20:13 -07:00
if w . l1Finalizer == nil {
return fmt . Errorf ( "unable to create arbitrum watcher because the l1 finalizer is not set" )
}
2022-09-28 12:19:01 -07:00
baseConnector , err := connectors . NewEthereumConnector ( timeout , w . networkName , w . url , w . contract , logger )
if err != nil {
ethConnectionErrors . WithLabelValues ( w . networkName , "dial_error" ) . Inc ( )
p2p . DefaultRegistry . AddErrorCount ( w . chainID , 1 )
return fmt . Errorf ( "dialing eth client failed: %w" , err )
}
2022-10-26 12:20:13 -07:00
finalizer := finalizers . NewArbitrumFinalizer ( logger , baseConnector , baseConnector . Client ( ) , w . l1Finalizer )
2022-11-28 05:55:35 -08:00
pollConnector , err := connectors . NewBlockPollConnector ( ctx , baseConnector , finalizer , 250 * time . Millisecond , false , false )
2022-09-28 12:19:01 -07:00
if err != nil {
ethConnectionErrors . WithLabelValues ( w . networkName , "dial_error" ) . Inc ( )
p2p . DefaultRegistry . AddErrorCount ( w . chainID , 1 )
return fmt . Errorf ( "creating block poll connector failed: %w" , err )
}
2022-10-12 08:04:54 -07:00
w . ethConn , err = connectors . NewArbitrumConnector ( ctx , pollConnector )
if err != nil {
ethConnectionErrors . WithLabelValues ( w . networkName , "dial_error" ) . Inc ( )
p2p . DefaultRegistry . AddErrorCount ( w . chainID , 1 )
return fmt . Errorf ( "creating arbitrum connector failed: %w" , err )
}
2022-11-10 05:50:08 -08:00
} else if w . chainID == vaa . ChainIDOptimism && ! w . unsafeDevMode {
if w . l1Finalizer == nil {
return fmt . Errorf ( "unable to create optimism watcher because the l1 finalizer is not set" )
}
baseConnector , err := connectors . NewEthereumConnector ( timeout , w . networkName , w . url , w . contract , logger )
if err != nil {
ethConnectionErrors . WithLabelValues ( w . networkName , "dial_error" ) . Inc ( )
p2p . DefaultRegistry . AddErrorCount ( w . chainID , 1 )
return fmt . Errorf ( "dialing eth client failed: %w" , err )
}
finalizer := finalizers . NewOptimismFinalizer ( timeout , logger , baseConnector , w . l1Finalizer )
2022-11-28 05:55:35 -08:00
w . ethConn , err = connectors . NewBlockPollConnector ( ctx , baseConnector , finalizer , 250 * time . Millisecond , false , false )
2022-11-10 05:50:08 -08:00
if err != nil {
ethConnectionErrors . WithLabelValues ( w . networkName , "dial_error" ) . Inc ( )
p2p . DefaultRegistry . AddErrorCount ( w . chainID , 1 )
return fmt . Errorf ( "creating block poll connector failed: %w" , err )
}
2022-11-11 06:12:16 -08:00
} else if w . chainID == vaa . ChainIDPolygon && w . usePolygonCheckpointing ( ) {
baseConnector , err := connectors . NewEthereumConnector ( timeout , w . networkName , w . url , w . contract , logger )
if err != nil {
ethConnectionErrors . WithLabelValues ( w . networkName , "dial_error" ) . Inc ( )
p2p . DefaultRegistry . AddErrorCount ( w . chainID , 1 )
return fmt . Errorf ( "failed to connect to polygon: %w" , err )
}
w . ethConn , err = connectors . NewPolygonConnector ( ctx ,
baseConnector ,
w . rootChainRpc ,
w . rootChainContract ,
)
if err != nil {
ethConnectionErrors . WithLabelValues ( w . networkName , "dial_error" ) . Inc ( )
p2p . DefaultRegistry . AddErrorCount ( w . chainID , 1 )
return fmt . Errorf ( "failed to create polygon connector: %w" , err )
}
2022-09-28 04:15:57 -07:00
} else {
w . ethConn , err = connectors . NewEthereumConnector ( timeout , w . networkName , w . url , w . contract , logger )
if err != nil {
ethConnectionErrors . WithLabelValues ( w . networkName , "dial_error" ) . Inc ( )
p2p . DefaultRegistry . AddErrorCount ( w . chainID , 1 )
return fmt . Errorf ( "dialing eth client failed: %w" , err )
}
2020-08-06 06:43:09 -07:00
}
2020-08-19 05:23:00 -07:00
2022-10-07 11:45:37 -07:00
// Subscribe to new message publications. We don't use a timeout here because the LogPollConnector
// will keep running. Other connectors will use a timeout internally if appropriate.
2022-09-28 04:15:57 -07:00
messageC := make ( chan * ethabi . AbiLogMessagePublished , 2 )
2022-10-07 11:45:37 -07:00
messageSub , err := w . ethConn . WatchLogMessagePublished ( ctx , messageC )
2020-08-19 05:23:00 -07:00
if err != nil {
2022-09-28 04:15:57 -07:00
ethConnectionErrors . WithLabelValues ( w . networkName , "subscribe_error" ) . Inc ( )
p2p . DefaultRegistry . AddErrorCount ( w . chainID , 1 )
2021-07-21 10:46:10 -07:00
return fmt . Errorf ( "failed to subscribe to message publication events: %w" , err )
2020-08-19 05:23:00 -07:00
}
2022-10-19 08:20:46 -07:00
defer messageSub . Unsubscribe ( )
2020-08-19 05:23:00 -07:00
2021-08-21 11:56:43 -07:00
// Fetch initial guardian set
2022-09-28 04:15:57 -07:00
if err := w . fetchAndUpdateGuardianSet ( logger , ctx , w . ethConn ) ; err != nil {
2021-08-21 11:56:43 -07:00
return fmt . Errorf ( "failed to request guardian set: %v" , err )
}
2020-08-06 06:43:09 -07:00
2022-04-11 12:07:19 -07:00
errC := make ( chan error )
2021-08-10 08:35:18 -07:00
// Poll for guardian set.
go func ( ) {
t := time . NewTicker ( 15 * time . Second )
defer t . Stop ( )
for {
select {
case <- ctx . Done ( ) :
return
case <- t . C :
2022-09-28 04:15:57 -07:00
if err := w . fetchAndUpdateGuardianSet ( logger , ctx , w . ethConn ) ; err != nil {
2022-04-11 12:07:19 -07:00
errC <- fmt . Errorf ( "failed to request guardian set: %v" , err )
return
2021-08-10 08:35:18 -07:00
}
}
2021-07-28 05:33:42 -07:00
}
2021-08-10 08:35:18 -07:00
} ( )
2020-10-22 03:20:12 -07:00
2022-11-28 05:55:35 -08:00
// Track the current block numbers so we can compare them to the block number of
2022-02-08 14:16:43 -08:00
// the message publication for observation requests.
var currentBlockNumber uint64
2022-11-28 05:55:35 -08:00
var currentSafeBlockNumber uint64
2022-02-08 14:16:43 -08:00
go func ( ) {
for {
select {
case <- ctx . Done ( ) :
return
2022-09-28 04:15:57 -07:00
case r := <- w . obsvReqC :
2022-02-08 14:16:43 -08:00
// This can't happen unless there is a programming error - the caller
// is expected to send us only requests for our chainID.
2022-09-28 04:15:57 -07:00
if vaa . ChainID ( r . ChainId ) != w . chainID {
2022-02-08 14:16:43 -08:00
panic ( "invalid chain ID" )
}
tx := eth_common . BytesToHash ( r . TxHash )
logger . Info ( "received observation request" ,
2022-09-28 04:15:57 -07:00
zap . String ( "eth_network" , w . networkName ) ,
2022-02-08 14:16:43 -08:00
zap . String ( "tx_hash" , tx . Hex ( ) ) )
// SECURITY: Load the block number before requesting the transaction to avoid a
// race condition where requesting the tx succeeds and is then dropped due to a fork,
// but blockNumberU had already advanced beyond the required threshold.
//
// In the primary watcher flow, this is of no concern since we assume the node
// always sends the head before it sends the logs (implicit synchronization
// by relying on the same websocket connection).
blockNumberU := atomic . LoadUint64 ( & currentBlockNumber )
2022-11-28 05:55:35 -08:00
safeBlockNumberU := atomic . LoadUint64 ( & currentSafeBlockNumber )
2022-02-08 14:16:43 -08:00
timeout , cancel := context . WithTimeout ( ctx , 5 * time . Second )
2022-09-28 04:15:57 -07:00
blockNumber , msgs , err := MessageEventsForTransaction ( timeout , w . ethConn , w . contract , w . chainID , tx )
2022-02-08 14:16:43 -08:00
cancel ( )
if err != nil {
logger . Error ( "failed to process observation request" ,
2022-09-28 04:15:57 -07:00
zap . Error ( err ) , zap . String ( "eth_network" , w . networkName ) )
2022-02-08 14:16:43 -08:00
continue
}
for _ , msg := range msgs {
2022-09-30 09:03:01 -07:00
if msg . ConsistencyLevel == vaa . ConsistencyLevelPublishImmediately {
logger . Info ( "re-observed message publication transaction, publishing it immediately" ,
zap . Stringer ( "tx" , msg . TxHash ) ,
zap . Stringer ( "emitter_address" , msg . EmitterAddress ) ,
zap . Uint64 ( "sequence" , msg . Sequence ) ,
zap . Uint64 ( "current_block" , blockNumberU ) ,
zap . Uint64 ( "observed_block" , blockNumber ) ,
zap . String ( "eth_network" , w . networkName ) ,
)
w . msgChan <- msg
continue
}
2022-11-28 05:55:35 -08:00
if msg . ConsistencyLevel == vaa . ConsistencyLevelSafe && safeBlocksSupported {
if safeBlockNumberU == 0 {
logger . Error ( "no safe block number available, ignoring observation request" ,
zap . String ( "eth_network" , w . networkName ) )
continue
}
if blockNumber <= safeBlockNumberU {
logger . Info ( "re-observed message publication transaction" ,
zap . Stringer ( "tx" , msg . TxHash ) ,
zap . Stringer ( "emitter_address" , msg . EmitterAddress ) ,
zap . Uint64 ( "sequence" , msg . Sequence ) ,
zap . Uint64 ( "current_safe_block" , safeBlockNumberU ) ,
zap . Uint64 ( "observed_block" , blockNumber ) ,
zap . String ( "eth_network" , w . networkName ) ,
)
w . msgChan <- msg
} else {
logger . Info ( "ignoring re-observed message publication transaction" ,
zap . Stringer ( "tx" , msg . TxHash ) ,
zap . Stringer ( "emitter_address" , msg . EmitterAddress ) ,
zap . Uint64 ( "sequence" , msg . Sequence ) ,
zap . Uint64 ( "current_safe_block" , safeBlockNumberU ) ,
zap . Uint64 ( "observed_block" , blockNumber ) ,
zap . String ( "eth_network" , w . networkName ) ,
)
}
continue
}
if blockNumberU == 0 {
logger . Error ( "no block number available, ignoring observation request" ,
zap . String ( "eth_network" , w . networkName ) )
continue
}
2022-11-14 17:38:04 -08:00
var expectedConfirmations uint64
if w . waitForConfirmations {
expectedConfirmations = uint64 ( msg . ConsistencyLevel )
2022-02-08 14:16:43 -08:00
}
// SECURITY: In the recovery flow, we already know which transaction to
// observe, and we can assume that it has reached the expected finality
// level a long time ago. Therefore, the logic is much simpler than the
// primary watcher, which has to wait for finality.
//
// Instead, we can simply check if the transaction's block number is in
// the past by more than the expected confirmation number.
//
// Ensure that the current block number is at least expectedConfirmations
// larger than the message observation's block number.
if blockNumber + expectedConfirmations <= blockNumberU {
logger . Info ( "re-observed message publication transaction" ,
zap . Stringer ( "tx" , msg . TxHash ) ,
zap . Stringer ( "emitter_address" , msg . EmitterAddress ) ,
zap . Uint64 ( "sequence" , msg . Sequence ) ,
zap . Uint64 ( "current_block" , blockNumberU ) ,
zap . Uint64 ( "observed_block" , blockNumber ) ,
2022-09-28 04:15:57 -07:00
zap . String ( "eth_network" , w . networkName ) ,
2022-02-08 14:16:43 -08:00
)
2022-09-28 04:15:57 -07:00
w . msgChan <- msg
2022-02-08 14:16:43 -08:00
} else {
logger . Info ( "ignoring re-observed message publication transaction" ,
zap . Stringer ( "tx" , msg . TxHash ) ,
zap . Stringer ( "emitter_address" , msg . EmitterAddress ) ,
zap . Uint64 ( "sequence" , msg . Sequence ) ,
zap . Uint64 ( "current_block" , blockNumberU ) ,
zap . Uint64 ( "observed_block" , blockNumber ) ,
zap . Uint64 ( "expected_confirmations" , expectedConfirmations ) ,
2022-09-28 04:15:57 -07:00
zap . String ( "eth_network" , w . networkName ) ,
2022-02-08 14:16:43 -08:00
)
}
}
}
}
} ( )
2020-08-06 06:43:09 -07:00
go func ( ) {
for {
select {
2020-08-17 05:56:22 -07:00
case <- ctx . Done ( ) :
return
2021-07-28 05:33:42 -07:00
case err := <- messageSub . Err ( ) :
2022-09-28 04:15:57 -07:00
ethConnectionErrors . WithLabelValues ( w . networkName , "subscription_error" ) . Inc ( )
2021-07-28 05:33:42 -07:00
errC <- fmt . Errorf ( "error while processing message publication subscription: %w" , err )
2022-09-28 04:15:57 -07:00
p2p . DefaultRegistry . AddErrorCount ( w . chainID , 1 )
2020-08-19 05:23:00 -07:00
return
2021-07-21 10:46:10 -07:00
case ev := <- messageC :
2020-08-20 12:48:58 -07:00
// Request timestamp for block
2021-01-26 16:16:37 -08:00
msm := time . Now ( )
2022-01-10 11:48:33 -08:00
timeout , cancel := context . WithTimeout ( ctx , 15 * time . Second )
2022-09-28 04:15:57 -07:00
blockTime , err := w . ethConn . TimeOfBlockByHash ( timeout , ev . Raw . BlockHash )
2020-08-21 14:47:58 -07:00
cancel ( )
2022-09-28 04:15:57 -07:00
queryLatency . WithLabelValues ( w . networkName , "block_by_number" ) . Observe ( time . Since ( msm ) . Seconds ( ) )
2021-01-26 16:16:37 -08:00
2020-08-20 12:48:58 -07:00
if err != nil {
2022-09-28 04:15:57 -07:00
ethConnectionErrors . WithLabelValues ( w . networkName , "block_by_number_error" ) . Inc ( )
p2p . DefaultRegistry . AddErrorCount ( w . chainID , 1 )
2022-09-19 09:20:49 -07:00
errC <- fmt . Errorf ( "failed to request timestamp for block %d, hash %s: %w" ,
ev . Raw . BlockNumber , ev . Raw . BlockHash . String ( ) , err )
2020-08-20 12:48:58 -07:00
return
}
2021-12-23 11:23:06 -08:00
message := & common . MessagePublication {
2021-07-09 05:56:52 -07:00
TxHash : ev . Raw . TxHash ,
2022-04-28 09:20:38 -07:00
Timestamp : time . Unix ( int64 ( blockTime ) , 0 ) ,
2021-07-09 05:56:52 -07:00
Nonce : ev . Nonce ,
Sequence : ev . Sequence ,
2022-09-28 04:15:57 -07:00
EmitterChain : w . chainID ,
2021-07-09 05:56:52 -07:00
EmitterAddress : PadAddress ( ev . Sender ) ,
Payload : ev . Payload ,
2021-07-20 14:50:38 -07:00
ConsistencyLevel : ev . ConsistencyLevel ,
2020-08-06 06:43:09 -07:00
}
2020-08-06 10:00:16 -07:00
2022-09-30 09:03:01 -07:00
ethMessagesObserved . WithLabelValues ( w . networkName ) . Inc ( )
if message . ConsistencyLevel == vaa . ConsistencyLevelPublishImmediately {
logger . Info ( "found new message publication transaction, publishing it immediately" ,
zap . Stringer ( "tx" , ev . Raw . TxHash ) ,
zap . Uint64 ( "block" , ev . Raw . BlockNumber ) ,
zap . Stringer ( "blockhash" , ev . Raw . BlockHash ) ,
zap . Uint64 ( "Sequence" , ev . Sequence ) ,
zap . Uint32 ( "Nonce" , ev . Nonce ) ,
zap . Uint8 ( "ConsistencyLevel" , ev . ConsistencyLevel ) ,
zap . String ( "eth_network" , w . networkName ) )
w . msgChan <- message
ethMessagesConfirmed . WithLabelValues ( w . networkName ) . Inc ( )
continue
}
2022-03-03 12:31:15 -08:00
logger . Info ( "found new message publication transaction" ,
zap . Stringer ( "tx" , ev . Raw . TxHash ) ,
zap . Uint64 ( "block" , ev . Raw . BlockNumber ) ,
zap . Stringer ( "blockhash" , ev . Raw . BlockHash ) ,
2022-06-14 07:22:49 -07:00
zap . Uint64 ( "Sequence" , ev . Sequence ) ,
zap . Uint32 ( "Nonce" , ev . Nonce ) ,
zap . Uint8 ( "ConsistencyLevel" , ev . ConsistencyLevel ) ,
2022-09-28 04:15:57 -07:00
zap . String ( "eth_network" , w . networkName ) )
2021-01-26 16:16:37 -08:00
2021-12-23 11:23:06 -08:00
key := pendingKey {
TxHash : message . TxHash ,
2022-01-06 03:01:33 -08:00
BlockHash : ev . Raw . BlockHash ,
2021-12-23 11:23:06 -08:00
EmitterAddress : message . EmitterAddress ,
Sequence : message . Sequence ,
}
2022-09-28 04:15:57 -07:00
w . pendingMu . Lock ( )
w . pending [ key ] = & pendingMessage {
2021-12-23 11:23:06 -08:00
message : message ,
2021-07-28 06:08:38 -07:00
height : ev . Raw . BlockNumber ,
2020-08-06 10:00:16 -07:00
}
2022-09-28 04:15:57 -07:00
w . pendingMu . Unlock ( )
2020-08-06 06:43:09 -07:00
}
}
2020-08-06 10:00:16 -07:00
} ( )
// Watch headers
2022-09-28 04:15:57 -07:00
headSink := make ( chan * connectors . NewBlock , 2 )
headerSubscription , err := w . ethConn . SubscribeForBlocks ( ctx , headSink )
2020-08-06 10:00:16 -07:00
if err != nil {
2022-09-28 04:15:57 -07:00
ethConnectionErrors . WithLabelValues ( w . networkName , "header_subscribe_error" ) . Inc ( )
p2p . DefaultRegistry . AddErrorCount ( w . chainID , 1 )
2020-08-06 10:00:16 -07:00
return fmt . Errorf ( "failed to subscribe to header events: %w" , err )
}
2020-08-06 06:43:09 -07:00
2020-08-06 10:00:16 -07:00
go func ( ) {
for {
select {
2020-08-17 05:56:22 -07:00
case <- ctx . Done ( ) :
return
2021-08-08 08:16:41 -07:00
case err := <- headerSubscription . Err ( ) :
2022-09-28 04:15:57 -07:00
ethConnectionErrors . WithLabelValues ( w . networkName , "header_subscription_error" ) . Inc ( )
2021-08-08 08:16:41 -07:00
errC <- fmt . Errorf ( "error while processing header subscription: %w" , err )
2022-09-28 04:15:57 -07:00
p2p . DefaultRegistry . AddErrorCount ( w . chainID , 1 )
2020-08-06 10:00:16 -07:00
return
case ev := <- headSink :
2022-06-23 07:40:04 -07:00
// These two pointers should have been checked before the event was placed on the channel, but just being safe.
2022-03-15 07:41:10 -07:00
if ev == nil {
2022-09-28 04:15:57 -07:00
logger . Error ( "new header event is nil" , zap . String ( "eth_network" , w . networkName ) )
2022-03-15 07:41:10 -07:00
continue
}
2022-06-23 07:40:04 -07:00
if ev . Number == nil {
2022-11-28 05:55:35 -08:00
logger . Error ( "new header block number is nil" , zap . String ( "eth_network" , w . networkName ) , zap . Bool ( "is_safe_block" , ev . Safe ) )
2022-06-23 07:40:04 -07:00
continue
}
2022-03-15 07:41:10 -07:00
2020-08-06 10:00:16 -07:00
start := time . Now ( )
2022-05-18 13:57:06 -07:00
currentHash := ev . Hash
2022-01-08 16:42:57 -08:00
logger . Info ( "processing new header" ,
zap . Stringer ( "current_block" , ev . Number ) ,
zap . Stringer ( "current_blockhash" , currentHash ) ,
2022-11-28 05:55:35 -08:00
zap . Bool ( "is_safe_block" , ev . Safe ) ,
2022-09-28 04:15:57 -07:00
zap . String ( "eth_network" , w . networkName ) )
currentEthHeight . WithLabelValues ( w . networkName ) . Set ( float64 ( ev . Number . Int64 ( ) ) )
readiness . SetReady ( w . readiness )
p2p . DefaultRegistry . SetNetworkStats ( w . chainID , & gossipv1 . Heartbeat_Network {
2022-01-06 03:01:33 -08:00
Height : ev . Number . Int64 ( ) ,
2022-09-28 04:15:57 -07:00
ContractAddress : w . contract . Hex ( ) ,
2021-02-03 04:01:51 -08:00
} )
2020-11-27 15:46:37 -08:00
2022-09-28 04:15:57 -07:00
w . pendingMu . Lock ( )
2020-08-06 10:00:16 -07:00
blockNumberU := ev . Number . Uint64 ( )
2022-11-28 05:55:35 -08:00
if ev . Safe {
atomic . StoreUint64 ( & currentSafeBlockNumber , blockNumberU )
} else {
atomic . StoreUint64 ( & currentBlockNumber , blockNumberU )
atomic . StoreUint64 ( & w . latestFinalizedBlockNumber , blockNumberU )
}
2022-01-10 07:24:31 -08:00
2022-09-28 04:15:57 -07:00
for key , pLock := range w . pending {
2022-11-28 05:55:35 -08:00
// If this block is safe, only process messages wanting safe.
// If it's not safe, only process messages wanting finalized.
if safeBlocksSupported {
if ev . Safe != ( pLock . message . ConsistencyLevel == vaa . ConsistencyLevelSafe ) {
continue
}
}
2022-11-14 17:38:04 -08:00
var expectedConfirmations uint64
2022-11-28 05:55:35 -08:00
if w . waitForConfirmations && ! ev . Safe {
2022-11-14 17:38:04 -08:00
expectedConfirmations = uint64 ( pLock . message . ConsistencyLevel )
2022-01-10 07:24:31 -08:00
}
2020-08-06 10:00:16 -07:00
// Transaction was dropped and never picked up again
2022-11-14 17:38:04 -08:00
if pLock . height + expectedConfirmations + w . maxWaitConfirmations <= blockNumberU {
2022-01-10 16:50:34 -08:00
logger . Info ( "observation timed out" ,
2022-01-08 16:42:57 -08:00
zap . Stringer ( "tx" , pLock . message . TxHash ) ,
zap . Stringer ( "blockhash" , key . BlockHash ) ,
zap . Stringer ( "emitter_address" , key . EmitterAddress ) ,
zap . Uint64 ( "sequence" , key . Sequence ) ,
zap . Stringer ( "current_block" , ev . Number ) ,
2022-11-28 05:55:35 -08:00
zap . Bool ( "is_safe_block" , ev . Safe ) ,
2022-01-08 16:42:57 -08:00
zap . Stringer ( "current_blockhash" , currentHash ) ,
2022-09-28 04:15:57 -07:00
zap . String ( "eth_network" , w . networkName ) ,
2022-11-14 17:38:04 -08:00
zap . Uint64 ( "expectedConfirmations" , expectedConfirmations ) ,
zap . Uint64 ( "maxWaitConfirmations" , w . maxWaitConfirmations ) ,
2022-01-08 16:42:57 -08:00
)
2022-09-28 04:15:57 -07:00
ethMessagesOrphaned . WithLabelValues ( w . networkName , "timeout" ) . Inc ( )
delete ( w . pending , key )
2020-08-06 10:00:16 -07:00
continue
}
// Transaction is now ready
2022-11-14 17:38:04 -08:00
if pLock . height + expectedConfirmations <= blockNumberU {
2022-01-10 11:48:33 -08:00
timeout , cancel := context . WithTimeout ( ctx , 5 * time . Second )
2022-09-28 04:15:57 -07:00
tx , err := w . ethConn . TransactionReceipt ( timeout , pLock . message . TxHash )
2022-01-06 03:01:33 -08:00
cancel ( )
2022-01-10 11:13:33 -08:00
2022-05-04 07:24:47 -07:00
// If the node returns an error after waiting expectedConfirmation blocks,
// it means the chain reorged and the transaction was orphaned. The
// TransactionReceipt call is using the same websocket connection as the
// head notifications, so it's guaranteed to be atomic.
//
// Check multiple possible error cases - the node seems to return a
// "not found" error most of the time, but it could conceivably also
// return a nil tx or rpc.ErrNoResult.
if tx == nil || err == rpc . ErrNoResult || ( err != nil && err . Error ( ) == "not found" ) {
logger . Warn ( "tx was orphaned" ,
2022-03-06 17:54:57 -08:00
zap . Stringer ( "tx" , pLock . message . TxHash ) ,
zap . Stringer ( "blockhash" , key . BlockHash ) ,
zap . Stringer ( "emitter_address" , key . EmitterAddress ) ,
zap . Uint64 ( "sequence" , key . Sequence ) ,
zap . Stringer ( "current_block" , ev . Number ) ,
2022-11-28 05:55:35 -08:00
zap . Bool ( "is_safe_block" , ev . Safe ) ,
2022-03-06 17:54:57 -08:00
zap . Stringer ( "current_blockhash" , currentHash ) ,
2022-09-28 04:15:57 -07:00
zap . String ( "eth_network" , w . networkName ) ,
2022-03-06 17:54:57 -08:00
zap . Error ( err ) )
2022-09-28 04:15:57 -07:00
delete ( w . pending , key )
ethMessagesOrphaned . WithLabelValues ( w . networkName , "not_found" ) . Inc ( )
2022-03-06 17:54:57 -08:00
continue
}
2022-05-04 07:24:47 -07:00
// This should never happen - if we got this far, it means that logs were emitted,
// which is only possible if the transaction succeeded. We check it anyway just
// in case the EVM implementation is buggy.
if tx . Status != 1 {
logger . Error ( "transaction receipt with non-success status" ,
2022-01-08 16:42:57 -08:00
zap . Stringer ( "tx" , pLock . message . TxHash ) ,
zap . Stringer ( "blockhash" , key . BlockHash ) ,
zap . Stringer ( "emitter_address" , key . EmitterAddress ) ,
zap . Uint64 ( "sequence" , key . Sequence ) ,
zap . Stringer ( "current_block" , ev . Number ) ,
2022-11-28 05:55:35 -08:00
zap . Bool ( "is_safe_block" , ev . Safe ) ,
2022-01-08 16:42:57 -08:00
zap . Stringer ( "current_blockhash" , currentHash ) ,
2022-09-28 04:15:57 -07:00
zap . String ( "eth_network" , w . networkName ) ,
2022-01-08 16:42:57 -08:00
zap . Error ( err ) )
2022-09-28 04:15:57 -07:00
delete ( w . pending , key )
ethMessagesOrphaned . WithLabelValues ( w . networkName , "tx_failed" ) . Inc ( )
2022-01-06 03:01:33 -08:00
continue
}
2022-01-10 11:13:33 -08:00
// Any error other than "not found" is likely transient - we retry next block.
if err != nil {
logger . Warn ( "transaction could not be fetched" ,
2022-01-08 16:42:57 -08:00
zap . Stringer ( "tx" , pLock . message . TxHash ) ,
zap . Stringer ( "blockhash" , key . BlockHash ) ,
zap . Stringer ( "emitter_address" , key . EmitterAddress ) ,
zap . Uint64 ( "sequence" , key . Sequence ) ,
zap . Stringer ( "current_block" , ev . Number ) ,
2022-11-28 05:55:35 -08:00
zap . Bool ( "is_safe_block" , ev . Safe ) ,
2022-01-08 16:42:57 -08:00
zap . Stringer ( "current_blockhash" , currentHash ) ,
2022-09-28 04:15:57 -07:00
zap . String ( "eth_network" , w . networkName ) ,
2022-01-10 11:13:33 -08:00
zap . Error ( err ) )
2022-01-06 03:01:33 -08:00
continue
}
2022-01-10 11:13:33 -08:00
// It's possible for a transaction to be orphaned and then included in a different block
// but with the same tx hash. Drop the observation (it will be re-observed and needs to
// wait for the full confirmation time again).
2022-01-06 03:01:33 -08:00
if tx . BlockHash != key . BlockHash {
2022-01-08 16:42:57 -08:00
logger . Info ( "tx got dropped and mined in a different block; the message should have been reobserved" ,
zap . Stringer ( "tx" , pLock . message . TxHash ) ,
zap . Stringer ( "blockhash" , key . BlockHash ) ,
zap . Stringer ( "emitter_address" , key . EmitterAddress ) ,
zap . Uint64 ( "sequence" , key . Sequence ) ,
zap . Stringer ( "current_block" , ev . Number ) ,
2022-11-28 05:55:35 -08:00
zap . Bool ( "is_safe_block" , ev . Safe ) ,
2022-01-08 16:42:57 -08:00
zap . Stringer ( "current_blockhash" , currentHash ) ,
2022-09-28 04:15:57 -07:00
zap . String ( "eth_network" , w . networkName ) )
delete ( w . pending , key )
ethMessagesOrphaned . WithLabelValues ( w . networkName , "blockhash_mismatch" ) . Inc ( )
2022-01-06 03:01:33 -08:00
continue
}
2022-01-10 11:13:33 -08:00
2022-01-08 16:42:57 -08:00
logger . Info ( "observation confirmed" ,
zap . Stringer ( "tx" , pLock . message . TxHash ) ,
zap . Stringer ( "blockhash" , key . BlockHash ) ,
zap . Stringer ( "emitter_address" , key . EmitterAddress ) ,
zap . Uint64 ( "sequence" , key . Sequence ) ,
zap . Stringer ( "current_block" , ev . Number ) ,
2022-11-28 05:55:35 -08:00
zap . Bool ( "is_safe_block" , ev . Safe ) ,
2022-01-08 16:42:57 -08:00
zap . Stringer ( "current_blockhash" , currentHash ) ,
2022-09-28 04:15:57 -07:00
zap . String ( "eth_network" , w . networkName ) )
delete ( w . pending , key )
w . msgChan <- pLock . message
ethMessagesConfirmed . WithLabelValues ( w . networkName ) . Inc ( )
2020-08-06 10:00:16 -07:00
}
}
2022-09-28 04:15:57 -07:00
w . pendingMu . Unlock ( )
2022-01-08 16:42:57 -08:00
logger . Info ( "processed new header" ,
zap . Stringer ( "current_block" , ev . Number ) ,
2022-11-28 05:55:35 -08:00
zap . Bool ( "is_safe_block" , ev . Safe ) ,
2022-01-08 16:42:57 -08:00
zap . Stringer ( "current_blockhash" , currentHash ) ,
zap . Duration ( "took" , time . Since ( start ) ) ,
2022-09-28 04:15:57 -07:00
zap . String ( "eth_network" , w . networkName ) )
2020-08-06 10:00:16 -07:00
}
}
2020-08-06 06:43:09 -07:00
} ( )
2022-10-05 22:19:31 -07:00
// Now that the init is complete, peg readiness. That will also happen when we process a new head, but chains
// that wait for finality may take a while to receive the first block and we don't want to hold up the init.
readiness . SetReady ( w . readiness )
2020-08-21 15:21:41 -07:00
select {
case <- ctx . Done ( ) :
return ctx . Err ( )
case err := <- errC :
return err
}
}
2022-09-28 04:15:57 -07:00
func ( w * Watcher ) fetchAndUpdateGuardianSet (
2021-08-21 11:56:43 -07:00
logger * zap . Logger ,
ctx context . Context ,
2022-09-28 04:15:57 -07:00
ethConn connectors . Connector ,
2021-08-21 11:56:43 -07:00
) error {
msm := time . Now ( )
logger . Info ( "fetching guardian set" )
timeout , cancel := context . WithTimeout ( ctx , 15 * time . Second )
defer cancel ( )
2022-09-28 04:15:57 -07:00
idx , gs , err := fetchCurrentGuardianSet ( timeout , ethConn )
2021-08-21 11:56:43 -07:00
if err != nil {
2022-09-28 04:15:57 -07:00
ethConnectionErrors . WithLabelValues ( w . networkName , "guardian_set_fetch_error" ) . Inc ( )
p2p . DefaultRegistry . AddErrorCount ( w . chainID , 1 )
2021-08-21 11:56:43 -07:00
return err
}
2022-09-28 04:15:57 -07:00
queryLatency . WithLabelValues ( w . networkName , "get_guardian_set" ) . Observe ( time . Since ( msm ) . Seconds ( ) )
2021-08-21 11:56:43 -07:00
2022-09-28 04:15:57 -07:00
if w . currentGuardianSet != nil && * ( w . currentGuardianSet ) == idx {
2021-08-21 11:56:43 -07:00
return nil
}
logger . Info ( "updated guardian set found" ,
zap . Any ( "value" , gs ) , zap . Uint32 ( "index" , idx ) ,
2022-09-28 04:15:57 -07:00
zap . String ( "eth_network" , w . networkName ) )
2021-08-21 11:56:43 -07:00
2022-09-28 04:15:57 -07:00
w . currentGuardianSet = & idx
2021-08-21 11:56:43 -07:00
2022-09-28 04:15:57 -07:00
if w . setChan != nil {
w . setChan <- & common . GuardianSet {
2021-08-21 11:56:43 -07:00
Keys : gs . Keys ,
Index : idx ,
}
}
return nil
}
2020-08-21 15:21:41 -07:00
// Fetch the current guardian set ID and guardian set from the chain.
2022-09-28 04:15:57 -07:00
func fetchCurrentGuardianSet ( ctx context . Context , ethConn connectors . Connector ) ( uint32 , * ethabi . StructsGuardianSet , error ) {
currentIndex , err := ethConn . GetCurrentGuardianSetIndex ( ctx )
2020-08-19 05:23:00 -07:00
if err != nil {
2020-08-21 15:21:41 -07:00
return 0 , nil , fmt . Errorf ( "error requesting current guardian set index: %w" , err )
2020-08-19 05:23:00 -07:00
}
2022-09-28 04:15:57 -07:00
gs , err := ethConn . GetGuardianSet ( ctx , currentIndex )
2020-08-19 05:23:00 -07:00
if err != nil {
2020-08-21 15:21:41 -07:00
return 0 , nil , fmt . Errorf ( "error requesting current guardian set value: %w" , err )
2020-08-19 05:23:00 -07:00
}
2020-08-21 15:21:41 -07:00
return currentIndex , & gs , nil
2020-08-06 06:43:09 -07:00
}
2022-04-26 05:51:10 -07:00
2022-10-25 13:13:36 -07:00
func ( w * Watcher ) getAcalaMode ( ctx context . Context ) ( useFinalizedBlocks bool , errRet error ) {
2022-05-02 11:28:17 -07:00
timeout , cancel := context . WithTimeout ( ctx , 15 * time . Second )
defer cancel ( )
2022-04-26 05:51:10 -07:00
2022-09-28 04:15:57 -07:00
c , err := rpc . DialContext ( timeout , w . url )
2022-05-02 11:28:17 -07:00
if err != nil {
2022-10-25 13:13:36 -07:00
errRet = fmt . Errorf ( "failed to connect to url %s to check acala mode: %w" , w . url , err )
return
}
// First check to see if polling for finalized blocks is suported.
type Marshaller struct {
Number * eth_hexutil . Big
}
var m Marshaller
err = c . CallContext ( ctx , & m , "eth_getBlockByNumber" , "finalized" , false )
if err == nil {
useFinalizedBlocks = true
return
2022-05-02 11:28:17 -07:00
}
2022-04-26 05:51:10 -07:00
2022-10-25 13:13:36 -07:00
// If finalized blocks are not supported, then we had better be in safe mode!
2022-05-02 11:28:17 -07:00
var safe bool
err = c . CallContext ( ctx , & safe , "net_isSafeMode" )
if err != nil {
2022-10-25 13:13:36 -07:00
errRet = fmt . Errorf ( "check for safe mode for url %s failed: %w" , w . url , err )
return
2022-05-02 11:28:17 -07:00
}
if ! safe {
2022-10-25 13:13:36 -07:00
errRet = fmt . Errorf ( "url %s does not support finalized blocks and is not using safe mode" , w . url )
2022-04-26 05:51:10 -07:00
}
2022-10-25 13:13:36 -07:00
return
2022-04-26 05:51:10 -07:00
}
2022-10-26 12:20:13 -07:00
2022-11-14 06:07:45 -08:00
// SetL1Finalizer installs the layer one finalizer on this watcher by storing it in w.l1Finalizer.
func (w *Watcher) SetL1Finalizer(l1Finalizer interfaces.L1Finalizer) {
	w.l1Finalizer = l1Finalizer
}
// GetLatestFinalizedBlockNumber() implements the L1Finalizer interface and allows other watchers to
// get the latest finalized block number from this watcher.
2022-10-26 12:20:13 -07:00
func ( w * Watcher ) GetLatestFinalizedBlockNumber ( ) uint64 {
return atomic . LoadUint64 ( & w . latestFinalizedBlockNumber )
}
2022-11-11 06:12:16 -08:00
// SetRootChainParams configures checkpointing (currently only used for Polygon). Both parameters
// may be empty (feature disabled) or both non-empty (feature enabled); supplying only one of them
// is rejected with an error and leaves the watcher unchanged.
func (w *Watcher) SetRootChainParams(rootChainRpc string, rootChainContract string) error {
	haveRpc := rootChainRpc != ""
	haveContract := rootChainContract != ""
	if haveRpc != haveContract {
		return fmt.Errorf("if either rootChainRpc or rootChainContract are set, they must both be set")
	}

	w.rootChainRpc, w.rootChainContract = rootChainRpc, rootChainContract
	return nil
}
// usePolygonCheckpointing reports whether both the root chain RPC and the root chain contract
// have been configured on this watcher.
func (w *Watcher) usePolygonCheckpointing() bool {
	if w.rootChainRpc == "" {
		return false
	}
	return w.rootChainContract != ""
}
2022-11-14 17:38:04 -08:00
// SetWaitForConfirmations overrides whether the watcher waits for the number of confirmations
// specified by a message's consistencyLevel.
func (w *Watcher) SetWaitForConfirmations(waitForConfirmations bool) {
	w.waitForConfirmations = waitForConfirmations
}
// SetMaxWaitConfirmations overrides the maximum number of confirmations to wait for before a
// transaction is declared abandoned.
func (w *Watcher) SetMaxWaitConfirmations(maxWaitConfirmations uint64) {
	w.maxWaitConfirmations = maxWaitConfirmations
}