2022-09-28 04:15:57 -07:00
|
|
|
package evm
|
2020-08-06 06:43:09 -07:00
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"fmt"
|
2022-06-14 07:22:49 -07:00
|
|
|
"sync"
|
|
|
|
"sync/atomic"
|
|
|
|
"time"
|
|
|
|
|
2022-09-28 06:27:13 -07:00
|
|
|
"github.com/certusone/wormhole/node/pkg/watchers/evm/connectors"
|
|
|
|
"github.com/certusone/wormhole/node/pkg/watchers/evm/connectors/ethabi"
|
|
|
|
"github.com/certusone/wormhole/node/pkg/watchers/evm/finalizers"
|
2022-10-26 12:20:13 -07:00
|
|
|
"github.com/certusone/wormhole/node/pkg/watchers/evm/interfaces"
|
2022-09-28 04:15:57 -07:00
|
|
|
|
2021-08-26 01:35:09 -07:00
|
|
|
"github.com/certusone/wormhole/node/pkg/p2p"
|
|
|
|
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
|
2022-01-06 03:01:33 -08:00
|
|
|
"github.com/ethereum/go-ethereum/rpc"
|
2021-07-23 07:06:35 -07:00
|
|
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
2020-08-17 10:29:25 -07:00
|
|
|
|
2021-02-01 11:38:13 -08:00
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
|
|
|
2020-08-06 06:43:09 -07:00
|
|
|
eth_common "github.com/ethereum/go-ethereum/common"
|
2022-10-25 13:13:36 -07:00
|
|
|
eth_hexutil "github.com/ethereum/go-ethereum/common/hexutil"
|
2020-08-06 10:00:16 -07:00
|
|
|
"go.uber.org/zap"
|
2020-08-17 10:29:25 -07:00
|
|
|
|
2021-08-26 01:35:09 -07:00
|
|
|
"github.com/certusone/wormhole/node/pkg/common"
|
|
|
|
"github.com/certusone/wormhole/node/pkg/readiness"
|
|
|
|
"github.com/certusone/wormhole/node/pkg/supervisor"
|
2022-08-18 01:52:36 -07:00
|
|
|
"github.com/wormhole-foundation/wormhole/sdk/vaa"
|
2020-08-06 06:43:09 -07:00
|
|
|
)
|
|
|
|
|
2021-01-26 16:16:37 -08:00
|
|
|
var (
|
2021-07-23 07:06:35 -07:00
|
|
|
ethConnectionErrors = promauto.NewCounterVec(
|
2021-01-26 16:16:37 -08:00
|
|
|
prometheus.CounterOpts{
|
|
|
|
Name: "wormhole_eth_connection_errors_total",
|
|
|
|
Help: "Total number of Ethereum connection errors (either during initial connection or while watching)",
|
2021-07-28 05:33:42 -07:00
|
|
|
}, []string{"eth_network", "reason"})
|
2021-01-26 16:16:37 -08:00
|
|
|
|
2021-07-28 05:33:42 -07:00
|
|
|
ethMessagesObserved = promauto.NewCounterVec(
|
2021-01-26 16:16:37 -08:00
|
|
|
prometheus.CounterOpts{
|
2021-07-21 10:46:10 -07:00
|
|
|
Name: "wormhole_eth_messages_observed_total",
|
|
|
|
Help: "Total number of Eth messages observed (pre-confirmation)",
|
2021-07-28 05:33:42 -07:00
|
|
|
}, []string{"eth_network"})
|
2022-01-06 03:01:33 -08:00
|
|
|
ethMessagesOrphaned = promauto.NewCounterVec(
|
|
|
|
prometheus.CounterOpts{
|
|
|
|
Name: "wormhole_eth_messages_orphaned_total",
|
|
|
|
Help: "Total number of Eth messages dropped (orphaned)",
|
2022-01-08 16:42:57 -08:00
|
|
|
}, []string{"eth_network", "reason"})
|
2021-07-28 05:33:42 -07:00
|
|
|
ethMessagesConfirmed = promauto.NewCounterVec(
|
2021-01-26 16:16:37 -08:00
|
|
|
prometheus.CounterOpts{
|
2021-07-21 10:46:10 -07:00
|
|
|
Name: "wormhole_eth_messages_confirmed_total",
|
|
|
|
Help: "Total number of Eth messages verified (post-confirmation)",
|
2021-07-28 05:33:42 -07:00
|
|
|
}, []string{"eth_network"})
|
|
|
|
currentEthHeight = promauto.NewGaugeVec(
|
2021-01-26 16:16:37 -08:00
|
|
|
prometheus.GaugeOpts{
|
|
|
|
Name: "wormhole_eth_current_height",
|
|
|
|
Help: "Current Ethereum block height",
|
2021-07-28 05:33:42 -07:00
|
|
|
}, []string{"eth_network"})
|
2021-07-23 07:06:35 -07:00
|
|
|
queryLatency = promauto.NewHistogramVec(
|
2021-01-26 16:16:37 -08:00
|
|
|
prometheus.HistogramOpts{
|
|
|
|
Name: "wormhole_eth_query_latency",
|
|
|
|
Help: "Latency histogram for Ethereum calls (note that most interactions are streaming queries, NOT calls, and we cannot measure latency for those",
|
2021-07-28 05:33:42 -07:00
|
|
|
}, []string{"eth_network", "operation"})
|
2021-01-26 16:16:37 -08:00
|
|
|
)
|
|
|
|
|
2020-08-06 06:43:09 -07:00
|
|
|
type (
|
2021-08-30 07:19:00 -07:00
|
|
|
Watcher struct {
|
2021-07-28 05:33:42 -07:00
|
|
|
// Ethereum RPC url
|
|
|
|
url string
|
2022-09-28 04:15:57 -07:00
|
|
|
// Address of the Eth contract
|
2021-08-30 07:19:00 -07:00
|
|
|
contract eth_common.Address
|
2021-07-28 05:33:42 -07:00
|
|
|
// Human-readable name of the Eth network, for logging and monitoring.
|
|
|
|
networkName string
|
2021-08-02 05:30:16 -07:00
|
|
|
// Readiness component
|
|
|
|
readiness readiness.Component
|
2021-07-28 05:33:42 -07:00
|
|
|
// VAA ChainID of the network we're connecting to.
|
|
|
|
chainID vaa.ChainID
|
2021-07-28 06:22:06 -07:00
|
|
|
|
|
|
|
// Channel to send new messages to.
|
|
|
|
msgChan chan *common.MessagePublication
|
|
|
|
|
|
|
|
// Channel to send guardian set changes to.
|
|
|
|
// setChan can be set to nil if no guardian set changes are needed.
|
|
|
|
//
|
2021-07-28 05:33:42 -07:00
|
|
|
// We currently only fetch the guardian set from one primary chain, which should
|
|
|
|
// have this flag set to true, and false on all others.
|
|
|
|
//
|
|
|
|
// The current primary chain is Ethereum (a mostly arbitrary decision because it
|
|
|
|
// has the best API - we might want to switch the primary chain to Solana once
|
|
|
|
// the governance mechanism lives there),
|
|
|
|
setChan chan *common.GuardianSet
|
2020-08-06 10:00:16 -07:00
|
|
|
|
2022-02-08 14:16:43 -08:00
|
|
|
// Incoming re-observation requests from the network. Pre-filtered to only
|
|
|
|
// include requests for our chainID.
|
|
|
|
obsvReqC chan *gossipv1.ObservationRequest
|
|
|
|
|
2021-12-23 11:23:06 -08:00
|
|
|
pending map[pendingKey]*pendingMessage
|
2021-07-28 06:08:38 -07:00
|
|
|
pendingMu sync.Mutex
|
2021-08-21 11:56:43 -07:00
|
|
|
|
|
|
|
// 0 is a valid guardian set, so we need a nil value here
|
|
|
|
currentGuardianSet *uint32
|
2022-01-10 07:24:31 -08:00
|
|
|
|
|
|
|
// Minimum number of confirmations to accept, regardless of what the contract specifies.
|
|
|
|
minConfirmations uint64
|
2022-04-28 09:20:38 -07:00
|
|
|
|
|
|
|
// Interface to the chain specific ethereum library.
|
2022-10-25 13:13:36 -07:00
|
|
|
ethConn connectors.Connector
|
|
|
|
unsafeDevMode bool
|
2022-10-26 12:20:13 -07:00
|
|
|
|
|
|
|
latestFinalizedBlockNumber uint64
|
|
|
|
l1Finalizer interfaces.L1Finalizer
|
2020-08-06 06:43:09 -07:00
|
|
|
}
|
2020-08-06 10:00:16 -07:00
|
|
|
|
2021-12-23 11:23:06 -08:00
|
|
|
pendingKey struct {
|
2022-01-06 03:01:33 -08:00
|
|
|
TxHash eth_common.Hash
|
|
|
|
BlockHash eth_common.Hash
|
2021-12-23 11:23:06 -08:00
|
|
|
EmitterAddress vaa.Address
|
2022-01-06 03:01:33 -08:00
|
|
|
Sequence uint64
|
2021-12-23 11:23:06 -08:00
|
|
|
}
|
|
|
|
|
2021-07-28 06:08:38 -07:00
|
|
|
pendingMessage struct {
|
|
|
|
message *common.MessagePublication
|
|
|
|
height uint64
|
2020-08-06 10:00:16 -07:00
|
|
|
}
|
2020-08-06 06:43:09 -07:00
|
|
|
)
|
|
|
|
|
2021-08-30 07:19:00 -07:00
|
|
|
func NewEthWatcher(
|
2021-07-28 05:33:42 -07:00
|
|
|
url string,
|
2021-08-30 07:19:00 -07:00
|
|
|
contract eth_common.Address,
|
2021-07-28 05:33:42 -07:00
|
|
|
networkName string,
|
2021-08-02 05:30:16 -07:00
|
|
|
readiness readiness.Component,
|
2021-07-28 05:33:42 -07:00
|
|
|
chainID vaa.ChainID,
|
2021-07-28 06:08:38 -07:00
|
|
|
messageEvents chan *common.MessagePublication,
|
2022-01-10 07:24:31 -08:00
|
|
|
setEvents chan *common.GuardianSet,
|
2022-02-08 14:16:43 -08:00
|
|
|
minConfirmations uint64,
|
2022-04-28 09:20:38 -07:00
|
|
|
obsvReqC chan *gossipv1.ObservationRequest,
|
2022-10-26 12:20:13 -07:00
|
|
|
unsafeDevMode bool,
|
|
|
|
l1Finalizer interfaces.L1Finalizer,
|
|
|
|
) *Watcher {
|
2022-04-28 09:20:38 -07:00
|
|
|
|
2021-08-30 07:19:00 -07:00
|
|
|
return &Watcher{
|
2022-10-25 13:13:36 -07:00
|
|
|
url: url,
|
|
|
|
contract: contract,
|
|
|
|
networkName: networkName,
|
|
|
|
readiness: readiness,
|
|
|
|
minConfirmations: minConfirmations,
|
|
|
|
chainID: chainID,
|
|
|
|
msgChan: messageEvents,
|
|
|
|
setChan: setEvents,
|
|
|
|
obsvReqC: obsvReqC,
|
|
|
|
pending: map[pendingKey]*pendingMessage{},
|
|
|
|
unsafeDevMode: unsafeDevMode,
|
2022-10-26 12:20:13 -07:00
|
|
|
l1Finalizer: l1Finalizer,
|
2022-09-28 04:15:57 -07:00
|
|
|
}
|
2020-08-06 06:43:09 -07:00
|
|
|
}
|
|
|
|
|
2022-09-28 04:15:57 -07:00
|
|
|
func (w *Watcher) Run(ctx context.Context) error {
|
2021-07-28 05:33:42 -07:00
|
|
|
logger := supervisor.Logger(ctx)
|
|
|
|
|
2022-10-25 13:13:36 -07:00
|
|
|
useFinalizedBlocks := (w.chainID == vaa.ChainIDEthereum && (!w.unsafeDevMode))
|
|
|
|
if (w.chainID == vaa.ChainIDKarura || w.chainID == vaa.ChainIDAcala) && (!w.unsafeDevMode) {
|
|
|
|
ufb, err := w.getAcalaMode(ctx)
|
|
|
|
if err != nil {
|
2022-05-02 11:28:17 -07:00
|
|
|
return err
|
|
|
|
}
|
2022-10-25 13:13:36 -07:00
|
|
|
|
|
|
|
if ufb {
|
|
|
|
useFinalizedBlocks = true
|
|
|
|
}
|
2022-04-26 05:51:10 -07:00
|
|
|
}
|
|
|
|
|
2021-02-03 04:01:51 -08:00
|
|
|
// Initialize gossip metrics (we want to broadcast the address even if we're not yet syncing)
|
2022-09-28 04:15:57 -07:00
|
|
|
p2p.DefaultRegistry.SetNetworkStats(w.chainID, &gossipv1.Heartbeat_Network{
|
|
|
|
ContractAddress: w.contract.Hex(),
|
2021-02-03 04:01:51 -08:00
|
|
|
})
|
|
|
|
|
2020-08-21 14:47:58 -07:00
|
|
|
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
|
|
|
|
defer cancel()
|
2020-08-06 06:43:09 -07:00
|
|
|
|
2022-09-28 04:15:57 -07:00
|
|
|
var err error
|
|
|
|
if w.chainID == vaa.ChainIDCelo && !w.unsafeDevMode {
|
|
|
|
// When we are running in mainnet or testnet, we need to use the Celo ethereum library rather than go-ethereum.
|
|
|
|
// However, in devnet, we currently run the standard ETH node for Celo, so we need to use the standard go-ethereum.
|
|
|
|
w.ethConn, err = connectors.NewCeloConnector(timeout, w.networkName, w.url, w.contract, logger)
|
|
|
|
if err != nil {
|
|
|
|
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
|
|
|
|
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
|
|
|
|
return fmt.Errorf("dialing eth client failed: %w", err)
|
|
|
|
}
|
2022-10-25 13:13:36 -07:00
|
|
|
} else if useFinalizedBlocks {
|
|
|
|
logger.Info("using finalized blocks")
|
2022-09-28 04:15:57 -07:00
|
|
|
baseConnector, err := connectors.NewEthereumConnector(timeout, w.networkName, w.url, w.contract, logger)
|
|
|
|
if err != nil {
|
|
|
|
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
|
|
|
|
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
|
|
|
|
return fmt.Errorf("dialing eth client failed: %w", err)
|
|
|
|
}
|
|
|
|
w.ethConn, err = connectors.NewBlockPollConnector(ctx, baseConnector, finalizers.NewDefaultFinalizer(), 250*time.Millisecond, true)
|
|
|
|
if err != nil {
|
|
|
|
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
|
|
|
|
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
|
|
|
|
return fmt.Errorf("creating block poll connector failed: %w", err)
|
|
|
|
}
|
|
|
|
} else if w.chainID == vaa.ChainIDMoonbeam && !w.unsafeDevMode {
|
|
|
|
baseConnector, err := connectors.NewEthereumConnector(timeout, w.networkName, w.url, w.contract, logger)
|
|
|
|
if err != nil {
|
|
|
|
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
|
|
|
|
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
|
|
|
|
return fmt.Errorf("dialing eth client failed: %w", err)
|
|
|
|
}
|
|
|
|
finalizer := finalizers.NewMoonbeamFinalizer(logger, baseConnector)
|
|
|
|
w.ethConn, err = connectors.NewBlockPollConnector(ctx, baseConnector, finalizer, 250*time.Millisecond, false)
|
|
|
|
if err != nil {
|
|
|
|
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
|
|
|
|
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
|
|
|
|
return fmt.Errorf("creating block poll connector failed: %w", err)
|
|
|
|
}
|
|
|
|
} else if w.chainID == vaa.ChainIDNeon {
|
|
|
|
baseConnector, err := connectors.NewEthereumConnector(timeout, w.networkName, w.url, w.contract, logger)
|
|
|
|
if err != nil {
|
|
|
|
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
|
|
|
|
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
|
|
|
|
return fmt.Errorf("dialing eth client failed: %w", err)
|
|
|
|
}
|
|
|
|
pollConnector, err := connectors.NewBlockPollConnector(ctx, baseConnector, finalizers.NewDefaultFinalizer(), 250*time.Millisecond, false)
|
|
|
|
if err != nil {
|
|
|
|
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
|
|
|
|
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
|
|
|
|
return fmt.Errorf("creating block poll connector failed: %w", err)
|
|
|
|
}
|
|
|
|
w.ethConn, err = connectors.NewLogPollConnector(ctx, pollConnector, baseConnector.Client())
|
|
|
|
if err != nil {
|
|
|
|
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
|
|
|
|
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
|
|
|
|
return fmt.Errorf("creating poll connector failed: %w", err)
|
|
|
|
}
|
2022-09-28 12:19:01 -07:00
|
|
|
} else if w.chainID == vaa.ChainIDArbitrum && !w.unsafeDevMode {
|
2022-10-26 12:20:13 -07:00
|
|
|
if w.l1Finalizer == nil {
|
|
|
|
return fmt.Errorf("unable to create arbitrum watcher because the l1 finalizer is not set")
|
|
|
|
}
|
2022-09-28 12:19:01 -07:00
|
|
|
baseConnector, err := connectors.NewEthereumConnector(timeout, w.networkName, w.url, w.contract, logger)
|
|
|
|
if err != nil {
|
|
|
|
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
|
|
|
|
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
|
|
|
|
return fmt.Errorf("dialing eth client failed: %w", err)
|
|
|
|
}
|
2022-10-26 12:20:13 -07:00
|
|
|
finalizer := finalizers.NewArbitrumFinalizer(logger, baseConnector, baseConnector.Client(), w.l1Finalizer)
|
2022-10-12 08:04:54 -07:00
|
|
|
pollConnector, err := connectors.NewBlockPollConnector(ctx, baseConnector, finalizer, 250*time.Millisecond, false)
|
2022-09-28 12:19:01 -07:00
|
|
|
if err != nil {
|
|
|
|
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
|
|
|
|
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
|
|
|
|
return fmt.Errorf("creating block poll connector failed: %w", err)
|
|
|
|
}
|
2022-10-12 08:04:54 -07:00
|
|
|
w.ethConn, err = connectors.NewArbitrumConnector(ctx, pollConnector)
|
|
|
|
if err != nil {
|
|
|
|
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
|
|
|
|
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
|
|
|
|
return fmt.Errorf("creating arbitrum connector failed: %w", err)
|
|
|
|
}
|
2022-09-28 04:15:57 -07:00
|
|
|
} else {
|
|
|
|
w.ethConn, err = connectors.NewEthereumConnector(timeout, w.networkName, w.url, w.contract, logger)
|
|
|
|
if err != nil {
|
|
|
|
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
|
|
|
|
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
|
|
|
|
return fmt.Errorf("dialing eth client failed: %w", err)
|
|
|
|
}
|
2020-08-06 06:43:09 -07:00
|
|
|
}
|
2020-08-19 05:23:00 -07:00
|
|
|
|
2022-10-07 11:45:37 -07:00
|
|
|
// Subscribe to new message publications. We don't use a timeout here because the LogPollConnector
|
|
|
|
// will keep running. Other connectors will use a timeout internally if appropriate.
|
2022-09-28 04:15:57 -07:00
|
|
|
messageC := make(chan *ethabi.AbiLogMessagePublished, 2)
|
2022-10-07 11:45:37 -07:00
|
|
|
messageSub, err := w.ethConn.WatchLogMessagePublished(ctx, messageC)
|
2020-08-19 05:23:00 -07:00
|
|
|
if err != nil {
|
2022-09-28 04:15:57 -07:00
|
|
|
ethConnectionErrors.WithLabelValues(w.networkName, "subscribe_error").Inc()
|
|
|
|
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
|
2021-07-21 10:46:10 -07:00
|
|
|
return fmt.Errorf("failed to subscribe to message publication events: %w", err)
|
2020-08-19 05:23:00 -07:00
|
|
|
}
|
2022-10-19 08:20:46 -07:00
|
|
|
defer messageSub.Unsubscribe()
|
2020-08-19 05:23:00 -07:00
|
|
|
|
2021-08-21 11:56:43 -07:00
|
|
|
// Fetch initial guardian set
|
2022-09-28 04:15:57 -07:00
|
|
|
if err := w.fetchAndUpdateGuardianSet(logger, ctx, w.ethConn); err != nil {
|
2021-08-21 11:56:43 -07:00
|
|
|
return fmt.Errorf("failed to request guardian set: %v", err)
|
|
|
|
}
|
2020-08-06 06:43:09 -07:00
|
|
|
|
2022-04-11 12:07:19 -07:00
|
|
|
errC := make(chan error)
|
|
|
|
|
2021-08-10 08:35:18 -07:00
|
|
|
// Poll for guardian set.
|
|
|
|
go func() {
|
|
|
|
t := time.NewTicker(15 * time.Second)
|
|
|
|
defer t.Stop()
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
|
|
|
case <-t.C:
|
2022-09-28 04:15:57 -07:00
|
|
|
if err := w.fetchAndUpdateGuardianSet(logger, ctx, w.ethConn); err != nil {
|
2022-04-11 12:07:19 -07:00
|
|
|
errC <- fmt.Errorf("failed to request guardian set: %v", err)
|
|
|
|
return
|
2021-08-10 08:35:18 -07:00
|
|
|
}
|
|
|
|
}
|
2021-07-28 05:33:42 -07:00
|
|
|
}
|
2021-08-10 08:35:18 -07:00
|
|
|
}()
|
2020-10-22 03:20:12 -07:00
|
|
|
|
2022-02-08 14:16:43 -08:00
|
|
|
// Track the current block number so we can compare it to the block number of
|
|
|
|
// the message publication for observation requests.
|
|
|
|
var currentBlockNumber uint64
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
2022-09-28 04:15:57 -07:00
|
|
|
case r := <-w.obsvReqC:
|
2022-02-08 14:16:43 -08:00
|
|
|
// This can't happen unless there is a programming error - the caller
|
|
|
|
// is expected to send us only requests for our chainID.
|
2022-09-28 04:15:57 -07:00
|
|
|
if vaa.ChainID(r.ChainId) != w.chainID {
|
2022-02-08 14:16:43 -08:00
|
|
|
panic("invalid chain ID")
|
|
|
|
}
|
|
|
|
|
|
|
|
tx := eth_common.BytesToHash(r.TxHash)
|
|
|
|
logger.Info("received observation request",
|
2022-09-28 04:15:57 -07:00
|
|
|
zap.String("eth_network", w.networkName),
|
2022-02-08 14:16:43 -08:00
|
|
|
zap.String("tx_hash", tx.Hex()))
|
|
|
|
|
|
|
|
// SECURITY: Load the block number before requesting the transaction to avoid a
|
|
|
|
// race condition where requesting the tx succeeds and is then dropped due to a fork,
|
|
|
|
// but blockNumberU had already advanced beyond the required threshold.
|
|
|
|
//
|
|
|
|
// In the primary watcher flow, this is of no concern since we assume the node
|
|
|
|
// always sends the head before it sends the logs (implicit synchronization
|
|
|
|
// by relying on the same websocket connection).
|
|
|
|
blockNumberU := atomic.LoadUint64(¤tBlockNumber)
|
|
|
|
if blockNumberU == 0 {
|
|
|
|
logger.Error("no block number available, ignoring observation request",
|
2022-09-28 04:15:57 -07:00
|
|
|
zap.String("eth_network", w.networkName))
|
2022-02-08 14:16:43 -08:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
timeout, cancel := context.WithTimeout(ctx, 5*time.Second)
|
2022-09-28 04:15:57 -07:00
|
|
|
blockNumber, msgs, err := MessageEventsForTransaction(timeout, w.ethConn, w.contract, w.chainID, tx)
|
2022-02-08 14:16:43 -08:00
|
|
|
cancel()
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
logger.Error("failed to process observation request",
|
2022-09-28 04:15:57 -07:00
|
|
|
zap.Error(err), zap.String("eth_network", w.networkName))
|
2022-02-08 14:16:43 -08:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, msg := range msgs {
|
2022-09-30 09:03:01 -07:00
|
|
|
if msg.ConsistencyLevel == vaa.ConsistencyLevelPublishImmediately {
|
|
|
|
logger.Info("re-observed message publication transaction, publishing it immediately",
|
|
|
|
zap.Stringer("tx", msg.TxHash),
|
|
|
|
zap.Stringer("emitter_address", msg.EmitterAddress),
|
|
|
|
zap.Uint64("sequence", msg.Sequence),
|
|
|
|
zap.Uint64("current_block", blockNumberU),
|
|
|
|
zap.Uint64("observed_block", blockNumber),
|
|
|
|
zap.String("eth_network", w.networkName),
|
|
|
|
)
|
|
|
|
w.msgChan <- msg
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2022-02-08 14:16:43 -08:00
|
|
|
expectedConfirmations := uint64(msg.ConsistencyLevel)
|
2022-09-28 04:15:57 -07:00
|
|
|
if expectedConfirmations < w.minConfirmations {
|
|
|
|
expectedConfirmations = w.minConfirmations
|
2022-02-08 14:16:43 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
// SECURITY: In the recovery flow, we already know which transaction to
|
|
|
|
// observe, and we can assume that it has reached the expected finality
|
|
|
|
// level a long time ago. Therefore, the logic is much simpler than the
|
|
|
|
// primary watcher, which has to wait for finality.
|
|
|
|
//
|
|
|
|
// Instead, we can simply check if the transaction's block number is in
|
|
|
|
// the past by more than the expected confirmation number.
|
|
|
|
//
|
|
|
|
// Ensure that the current block number is at least expectedConfirmations
|
|
|
|
// larger than the message observation's block number.
|
|
|
|
if blockNumber+expectedConfirmations <= blockNumberU {
|
|
|
|
logger.Info("re-observed message publication transaction",
|
|
|
|
zap.Stringer("tx", msg.TxHash),
|
|
|
|
zap.Stringer("emitter_address", msg.EmitterAddress),
|
|
|
|
zap.Uint64("sequence", msg.Sequence),
|
|
|
|
zap.Uint64("current_block", blockNumberU),
|
|
|
|
zap.Uint64("observed_block", blockNumber),
|
2022-09-28 04:15:57 -07:00
|
|
|
zap.String("eth_network", w.networkName),
|
2022-02-08 14:16:43 -08:00
|
|
|
)
|
2022-09-28 04:15:57 -07:00
|
|
|
w.msgChan <- msg
|
2022-02-08 14:16:43 -08:00
|
|
|
} else {
|
|
|
|
logger.Info("ignoring re-observed message publication transaction",
|
|
|
|
zap.Stringer("tx", msg.TxHash),
|
|
|
|
zap.Stringer("emitter_address", msg.EmitterAddress),
|
|
|
|
zap.Uint64("sequence", msg.Sequence),
|
|
|
|
zap.Uint64("current_block", blockNumberU),
|
|
|
|
zap.Uint64("observed_block", blockNumber),
|
|
|
|
zap.Uint64("expected_confirmations", expectedConfirmations),
|
2022-09-28 04:15:57 -07:00
|
|
|
zap.String("eth_network", w.networkName),
|
2022-02-08 14:16:43 -08:00
|
|
|
)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
2020-08-06 06:43:09 -07:00
|
|
|
go func() {
|
|
|
|
for {
|
|
|
|
select {
|
2020-08-17 05:56:22 -07:00
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
2021-07-28 05:33:42 -07:00
|
|
|
case err := <-messageSub.Err():
|
2022-09-28 04:15:57 -07:00
|
|
|
ethConnectionErrors.WithLabelValues(w.networkName, "subscription_error").Inc()
|
2021-07-28 05:33:42 -07:00
|
|
|
errC <- fmt.Errorf("error while processing message publication subscription: %w", err)
|
2022-09-28 04:15:57 -07:00
|
|
|
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
|
2020-08-19 05:23:00 -07:00
|
|
|
return
|
2021-07-21 10:46:10 -07:00
|
|
|
case ev := <-messageC:
|
2020-08-20 12:48:58 -07:00
|
|
|
// Request timestamp for block
|
2021-01-26 16:16:37 -08:00
|
|
|
msm := time.Now()
|
2022-01-10 11:48:33 -08:00
|
|
|
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
|
2022-09-28 04:15:57 -07:00
|
|
|
blockTime, err := w.ethConn.TimeOfBlockByHash(timeout, ev.Raw.BlockHash)
|
2020-08-21 14:47:58 -07:00
|
|
|
cancel()
|
2022-09-28 04:15:57 -07:00
|
|
|
queryLatency.WithLabelValues(w.networkName, "block_by_number").Observe(time.Since(msm).Seconds())
|
2021-01-26 16:16:37 -08:00
|
|
|
|
2020-08-20 12:48:58 -07:00
|
|
|
if err != nil {
|
2022-09-28 04:15:57 -07:00
|
|
|
ethConnectionErrors.WithLabelValues(w.networkName, "block_by_number_error").Inc()
|
|
|
|
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
|
2022-09-19 09:20:49 -07:00
|
|
|
errC <- fmt.Errorf("failed to request timestamp for block %d, hash %s: %w",
|
|
|
|
ev.Raw.BlockNumber, ev.Raw.BlockHash.String(), err)
|
2020-08-20 12:48:58 -07:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2021-12-23 11:23:06 -08:00
|
|
|
message := &common.MessagePublication{
|
2021-07-09 05:56:52 -07:00
|
|
|
TxHash: ev.Raw.TxHash,
|
2022-04-28 09:20:38 -07:00
|
|
|
Timestamp: time.Unix(int64(blockTime), 0),
|
2021-07-09 05:56:52 -07:00
|
|
|
Nonce: ev.Nonce,
|
|
|
|
Sequence: ev.Sequence,
|
2022-09-28 04:15:57 -07:00
|
|
|
EmitterChain: w.chainID,
|
2021-07-09 05:56:52 -07:00
|
|
|
EmitterAddress: PadAddress(ev.Sender),
|
|
|
|
Payload: ev.Payload,
|
2021-07-20 14:50:38 -07:00
|
|
|
ConsistencyLevel: ev.ConsistencyLevel,
|
2020-08-06 06:43:09 -07:00
|
|
|
}
|
2020-08-06 10:00:16 -07:00
|
|
|
|
2022-09-30 09:03:01 -07:00
|
|
|
ethMessagesObserved.WithLabelValues(w.networkName).Inc()
|
|
|
|
|
|
|
|
if message.ConsistencyLevel == vaa.ConsistencyLevelPublishImmediately {
|
|
|
|
logger.Info("found new message publication transaction, publishing it immediately",
|
|
|
|
zap.Stringer("tx", ev.Raw.TxHash),
|
|
|
|
zap.Uint64("block", ev.Raw.BlockNumber),
|
|
|
|
zap.Stringer("blockhash", ev.Raw.BlockHash),
|
|
|
|
zap.Uint64("Sequence", ev.Sequence),
|
|
|
|
zap.Uint32("Nonce", ev.Nonce),
|
|
|
|
zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel),
|
|
|
|
zap.String("eth_network", w.networkName))
|
|
|
|
|
|
|
|
w.msgChan <- message
|
|
|
|
ethMessagesConfirmed.WithLabelValues(w.networkName).Inc()
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2022-03-03 12:31:15 -08:00
|
|
|
logger.Info("found new message publication transaction",
|
|
|
|
zap.Stringer("tx", ev.Raw.TxHash),
|
|
|
|
zap.Uint64("block", ev.Raw.BlockNumber),
|
|
|
|
zap.Stringer("blockhash", ev.Raw.BlockHash),
|
2022-06-14 07:22:49 -07:00
|
|
|
zap.Uint64("Sequence", ev.Sequence),
|
|
|
|
zap.Uint32("Nonce", ev.Nonce),
|
|
|
|
zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel),
|
2022-09-28 04:15:57 -07:00
|
|
|
zap.String("eth_network", w.networkName))
|
2021-01-26 16:16:37 -08:00
|
|
|
|
2021-12-23 11:23:06 -08:00
|
|
|
key := pendingKey{
|
|
|
|
TxHash: message.TxHash,
|
2022-01-06 03:01:33 -08:00
|
|
|
BlockHash: ev.Raw.BlockHash,
|
2021-12-23 11:23:06 -08:00
|
|
|
EmitterAddress: message.EmitterAddress,
|
|
|
|
Sequence: message.Sequence,
|
|
|
|
}
|
|
|
|
|
2022-09-28 04:15:57 -07:00
|
|
|
w.pendingMu.Lock()
|
|
|
|
w.pending[key] = &pendingMessage{
|
2021-12-23 11:23:06 -08:00
|
|
|
message: message,
|
2021-07-28 06:08:38 -07:00
|
|
|
height: ev.Raw.BlockNumber,
|
2020-08-06 10:00:16 -07:00
|
|
|
}
|
2022-09-28 04:15:57 -07:00
|
|
|
w.pendingMu.Unlock()
|
2020-08-06 06:43:09 -07:00
|
|
|
}
|
|
|
|
}
|
2020-08-06 10:00:16 -07:00
|
|
|
}()
|
|
|
|
|
|
|
|
// Watch headers
|
2022-09-28 04:15:57 -07:00
|
|
|
headSink := make(chan *connectors.NewBlock, 2)
|
|
|
|
headerSubscription, err := w.ethConn.SubscribeForBlocks(ctx, headSink)
|
2020-08-06 10:00:16 -07:00
|
|
|
if err != nil {
|
2022-09-28 04:15:57 -07:00
|
|
|
ethConnectionErrors.WithLabelValues(w.networkName, "header_subscribe_error").Inc()
|
|
|
|
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
|
2020-08-06 10:00:16 -07:00
|
|
|
return fmt.Errorf("failed to subscribe to header events: %w", err)
|
|
|
|
}
|
2020-08-06 06:43:09 -07:00
|
|
|
|
2020-08-06 10:00:16 -07:00
|
|
|
go func() {
|
|
|
|
for {
|
|
|
|
select {
|
2020-08-17 05:56:22 -07:00
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
2021-08-08 08:16:41 -07:00
|
|
|
case err := <-headerSubscription.Err():
|
2022-09-28 04:15:57 -07:00
|
|
|
ethConnectionErrors.WithLabelValues(w.networkName, "header_subscription_error").Inc()
|
2021-08-08 08:16:41 -07:00
|
|
|
errC <- fmt.Errorf("error while processing header subscription: %w", err)
|
2022-09-28 04:15:57 -07:00
|
|
|
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
|
2020-08-06 10:00:16 -07:00
|
|
|
return
|
|
|
|
case ev := <-headSink:
|
2022-06-23 07:40:04 -07:00
|
|
|
// These two pointers should have been checked before the event was placed on the channel, but just being safe.
|
2022-03-15 07:41:10 -07:00
|
|
|
if ev == nil {
|
2022-09-28 04:15:57 -07:00
|
|
|
logger.Error("new header event is nil", zap.String("eth_network", w.networkName))
|
2022-03-15 07:41:10 -07:00
|
|
|
continue
|
|
|
|
}
|
2022-06-23 07:40:04 -07:00
|
|
|
if ev.Number == nil {
|
2022-09-28 04:15:57 -07:00
|
|
|
logger.Error("new header block number is nil", zap.String("eth_network", w.networkName))
|
2022-06-23 07:40:04 -07:00
|
|
|
continue
|
|
|
|
}
|
2022-03-15 07:41:10 -07:00
|
|
|
|
2020-08-06 10:00:16 -07:00
|
|
|
start := time.Now()
|
2022-05-18 13:57:06 -07:00
|
|
|
currentHash := ev.Hash
|
2022-01-08 16:42:57 -08:00
|
|
|
logger.Info("processing new header",
|
|
|
|
zap.Stringer("current_block", ev.Number),
|
|
|
|
zap.Stringer("current_blockhash", currentHash),
|
2022-09-28 04:15:57 -07:00
|
|
|
zap.String("eth_network", w.networkName))
|
|
|
|
currentEthHeight.WithLabelValues(w.networkName).Set(float64(ev.Number.Int64()))
|
|
|
|
readiness.SetReady(w.readiness)
|
|
|
|
p2p.DefaultRegistry.SetNetworkStats(w.chainID, &gossipv1.Heartbeat_Network{
|
2022-01-06 03:01:33 -08:00
|
|
|
Height: ev.Number.Int64(),
|
2022-09-28 04:15:57 -07:00
|
|
|
ContractAddress: w.contract.Hex(),
|
2021-02-03 04:01:51 -08:00
|
|
|
})
|
2020-11-27 15:46:37 -08:00
|
|
|
|
2022-09-28 04:15:57 -07:00
|
|
|
w.pendingMu.Lock()
|
2020-08-06 10:00:16 -07:00
|
|
|
|
|
|
|
blockNumberU := ev.Number.Uint64()
|
2022-02-08 14:16:43 -08:00
|
|
|
atomic.StoreUint64(¤tBlockNumber, blockNumberU)
|
2022-10-26 12:20:13 -07:00
|
|
|
atomic.StoreUint64(&w.latestFinalizedBlockNumber, blockNumberU)
|
2022-01-10 07:24:31 -08:00
|
|
|
|
2022-09-28 04:15:57 -07:00
|
|
|
for key, pLock := range w.pending {
|
2022-01-10 07:24:31 -08:00
|
|
|
expectedConfirmations := uint64(pLock.message.ConsistencyLevel)
|
2022-09-28 04:15:57 -07:00
|
|
|
if expectedConfirmations < w.minConfirmations {
|
|
|
|
expectedConfirmations = w.minConfirmations
|
2022-01-10 07:24:31 -08:00
|
|
|
}
|
2020-08-06 10:00:16 -07:00
|
|
|
|
|
|
|
// Transaction was dropped and never picked up again
|
2022-01-10 07:24:31 -08:00
|
|
|
if pLock.height+4*uint64(expectedConfirmations) <= blockNumberU {
|
2022-01-10 16:50:34 -08:00
|
|
|
logger.Info("observation timed out",
|
2022-01-08 16:42:57 -08:00
|
|
|
zap.Stringer("tx", pLock.message.TxHash),
|
|
|
|
zap.Stringer("blockhash", key.BlockHash),
|
|
|
|
zap.Stringer("emitter_address", key.EmitterAddress),
|
|
|
|
zap.Uint64("sequence", key.Sequence),
|
|
|
|
zap.Stringer("current_block", ev.Number),
|
|
|
|
zap.Stringer("current_blockhash", currentHash),
|
2022-09-28 04:15:57 -07:00
|
|
|
zap.String("eth_network", w.networkName),
|
2022-01-08 16:42:57 -08:00
|
|
|
)
|
2022-09-28 04:15:57 -07:00
|
|
|
ethMessagesOrphaned.WithLabelValues(w.networkName, "timeout").Inc()
|
|
|
|
delete(w.pending, key)
|
2020-08-06 10:00:16 -07:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
// Transaction is now ready
|
2022-01-10 07:24:31 -08:00
|
|
|
if pLock.height+uint64(expectedConfirmations) <= blockNumberU {
|
2022-01-10 11:48:33 -08:00
|
|
|
timeout, cancel := context.WithTimeout(ctx, 5*time.Second)
|
2022-09-28 04:15:57 -07:00
|
|
|
tx, err := w.ethConn.TransactionReceipt(timeout, pLock.message.TxHash)
|
2022-01-06 03:01:33 -08:00
|
|
|
cancel()
|
2022-01-10 11:13:33 -08:00
|
|
|
|
2022-05-04 07:24:47 -07:00
|
|
|
// If the node returns an error after waiting expectedConfirmation blocks,
|
|
|
|
// it means the chain reorged and the transaction was orphaned. The
|
|
|
|
// TransactionReceipt call is using the same websocket connection than the
|
|
|
|
// head notifications, so it's guaranteed to be atomic.
|
|
|
|
//
|
|
|
|
// Check multiple possible error cases - the node seems to return a
|
|
|
|
// "not found" error most of the time, but it could conceivably also
|
|
|
|
// return a nil tx or rpc.ErrNoResult.
|
|
|
|
if tx == nil || err == rpc.ErrNoResult || (err != nil && err.Error() == "not found") {
|
|
|
|
logger.Warn("tx was orphaned",
|
2022-03-06 17:54:57 -08:00
|
|
|
zap.Stringer("tx", pLock.message.TxHash),
|
|
|
|
zap.Stringer("blockhash", key.BlockHash),
|
|
|
|
zap.Stringer("emitter_address", key.EmitterAddress),
|
|
|
|
zap.Uint64("sequence", key.Sequence),
|
|
|
|
zap.Stringer("current_block", ev.Number),
|
|
|
|
zap.Stringer("current_blockhash", currentHash),
|
2022-09-28 04:15:57 -07:00
|
|
|
zap.String("eth_network", w.networkName),
|
2022-03-06 17:54:57 -08:00
|
|
|
zap.Error(err))
|
2022-09-28 04:15:57 -07:00
|
|
|
delete(w.pending, key)
|
|
|
|
ethMessagesOrphaned.WithLabelValues(w.networkName, "not_found").Inc()
|
2022-03-06 17:54:57 -08:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2022-05-04 07:24:47 -07:00
|
|
|
// This should never happen - if we got this far, it means that logs were emitted,
|
|
|
|
// which is only possible if the transaction succeeded. We check it anyway just
|
|
|
|
// in case the EVM implementation is buggy.
|
|
|
|
if tx.Status != 1 {
|
|
|
|
logger.Error("transaction receipt with non-success status",
|
2022-01-08 16:42:57 -08:00
|
|
|
zap.Stringer("tx", pLock.message.TxHash),
|
|
|
|
zap.Stringer("blockhash", key.BlockHash),
|
|
|
|
zap.Stringer("emitter_address", key.EmitterAddress),
|
|
|
|
zap.Uint64("sequence", key.Sequence),
|
|
|
|
zap.Stringer("current_block", ev.Number),
|
|
|
|
zap.Stringer("current_blockhash", currentHash),
|
2022-09-28 04:15:57 -07:00
|
|
|
zap.String("eth_network", w.networkName),
|
2022-01-08 16:42:57 -08:00
|
|
|
zap.Error(err))
|
2022-09-28 04:15:57 -07:00
|
|
|
delete(w.pending, key)
|
|
|
|
ethMessagesOrphaned.WithLabelValues(w.networkName, "tx_failed").Inc()
|
2022-01-06 03:01:33 -08:00
|
|
|
continue
|
|
|
|
}
|
2022-01-10 11:13:33 -08:00
|
|
|
|
|
|
|
// Any error other than "not found" is likely transient - we retry next block.
|
|
|
|
if err != nil {
|
|
|
|
logger.Warn("transaction could not be fetched",
|
2022-01-08 16:42:57 -08:00
|
|
|
zap.Stringer("tx", pLock.message.TxHash),
|
|
|
|
zap.Stringer("blockhash", key.BlockHash),
|
|
|
|
zap.Stringer("emitter_address", key.EmitterAddress),
|
|
|
|
zap.Uint64("sequence", key.Sequence),
|
|
|
|
zap.Stringer("current_block", ev.Number),
|
|
|
|
zap.Stringer("current_blockhash", currentHash),
|
2022-09-28 04:15:57 -07:00
|
|
|
zap.String("eth_network", w.networkName),
|
2022-01-10 11:13:33 -08:00
|
|
|
zap.Error(err))
|
2022-01-06 03:01:33 -08:00
|
|
|
continue
|
|
|
|
}
|
2022-01-10 11:13:33 -08:00
|
|
|
|
|
|
|
// It's possible for a transaction to be orphaned and then included in a different block
|
|
|
|
// but with the same tx hash. Drop the observation (it will be re-observed and needs to
|
|
|
|
// wait for the full confirmation time again).
|
2022-01-06 03:01:33 -08:00
|
|
|
if tx.BlockHash != key.BlockHash {
|
2022-01-08 16:42:57 -08:00
|
|
|
logger.Info("tx got dropped and mined in a different block; the message should have been reobserved",
|
|
|
|
zap.Stringer("tx", pLock.message.TxHash),
|
|
|
|
zap.Stringer("blockhash", key.BlockHash),
|
|
|
|
zap.Stringer("emitter_address", key.EmitterAddress),
|
|
|
|
zap.Uint64("sequence", key.Sequence),
|
|
|
|
zap.Stringer("current_block", ev.Number),
|
|
|
|
zap.Stringer("current_blockhash", currentHash),
|
2022-09-28 04:15:57 -07:00
|
|
|
zap.String("eth_network", w.networkName))
|
|
|
|
delete(w.pending, key)
|
|
|
|
ethMessagesOrphaned.WithLabelValues(w.networkName, "blockhash_mismatch").Inc()
|
2022-01-06 03:01:33 -08:00
|
|
|
continue
|
|
|
|
}
|
2022-01-10 11:13:33 -08:00
|
|
|
|
2022-01-08 16:42:57 -08:00
|
|
|
logger.Info("observation confirmed",
|
|
|
|
zap.Stringer("tx", pLock.message.TxHash),
|
|
|
|
zap.Stringer("blockhash", key.BlockHash),
|
|
|
|
zap.Stringer("emitter_address", key.EmitterAddress),
|
|
|
|
zap.Uint64("sequence", key.Sequence),
|
|
|
|
zap.Stringer("current_block", ev.Number),
|
|
|
|
zap.Stringer("current_blockhash", currentHash),
|
2022-09-28 04:15:57 -07:00
|
|
|
zap.String("eth_network", w.networkName))
|
|
|
|
delete(w.pending, key)
|
|
|
|
w.msgChan <- pLock.message
|
|
|
|
ethMessagesConfirmed.WithLabelValues(w.networkName).Inc()
|
2020-08-06 10:00:16 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-09-28 04:15:57 -07:00
|
|
|
w.pendingMu.Unlock()
|
2022-01-08 16:42:57 -08:00
|
|
|
logger.Info("processed new header",
|
|
|
|
zap.Stringer("current_block", ev.Number),
|
|
|
|
zap.Stringer("current_blockhash", currentHash),
|
|
|
|
zap.Duration("took", time.Since(start)),
|
2022-09-28 04:15:57 -07:00
|
|
|
zap.String("eth_network", w.networkName))
|
2020-08-06 10:00:16 -07:00
|
|
|
}
|
|
|
|
}
|
2020-08-06 06:43:09 -07:00
|
|
|
}()
|
|
|
|
|
2022-10-05 22:19:31 -07:00
|
|
|
// Now that the init is complete, peg readiness. That will also happen when we process a new head, but chains
|
|
|
|
// that wait for finality may take a while to receive the first block and we don't want to hold up the init.
|
|
|
|
readiness.SetReady(w.readiness)
|
|
|
|
|
2020-08-21 15:21:41 -07:00
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return ctx.Err()
|
|
|
|
case err := <-errC:
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-09-28 04:15:57 -07:00
|
|
|
func (w *Watcher) fetchAndUpdateGuardianSet(
|
2021-08-21 11:56:43 -07:00
|
|
|
logger *zap.Logger,
|
|
|
|
ctx context.Context,
|
2022-09-28 04:15:57 -07:00
|
|
|
ethConn connectors.Connector,
|
2021-08-21 11:56:43 -07:00
|
|
|
) error {
|
|
|
|
msm := time.Now()
|
|
|
|
logger.Info("fetching guardian set")
|
|
|
|
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
|
|
|
|
defer cancel()
|
2022-09-28 04:15:57 -07:00
|
|
|
idx, gs, err := fetchCurrentGuardianSet(timeout, ethConn)
|
2021-08-21 11:56:43 -07:00
|
|
|
if err != nil {
|
2022-09-28 04:15:57 -07:00
|
|
|
ethConnectionErrors.WithLabelValues(w.networkName, "guardian_set_fetch_error").Inc()
|
|
|
|
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
|
2021-08-21 11:56:43 -07:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2022-09-28 04:15:57 -07:00
|
|
|
queryLatency.WithLabelValues(w.networkName, "get_guardian_set").Observe(time.Since(msm).Seconds())
|
2021-08-21 11:56:43 -07:00
|
|
|
|
2022-09-28 04:15:57 -07:00
|
|
|
if w.currentGuardianSet != nil && *(w.currentGuardianSet) == idx {
|
2021-08-21 11:56:43 -07:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
logger.Info("updated guardian set found",
|
|
|
|
zap.Any("value", gs), zap.Uint32("index", idx),
|
2022-09-28 04:15:57 -07:00
|
|
|
zap.String("eth_network", w.networkName))
|
2021-08-21 11:56:43 -07:00
|
|
|
|
2022-09-28 04:15:57 -07:00
|
|
|
w.currentGuardianSet = &idx
|
2021-08-21 11:56:43 -07:00
|
|
|
|
2022-09-28 04:15:57 -07:00
|
|
|
if w.setChan != nil {
|
|
|
|
w.setChan <- &common.GuardianSet{
|
2021-08-21 11:56:43 -07:00
|
|
|
Keys: gs.Keys,
|
|
|
|
Index: idx,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2020-08-21 15:21:41 -07:00
|
|
|
// Fetch the current guardian set ID and guardian set from the chain.
|
2022-09-28 04:15:57 -07:00
|
|
|
func fetchCurrentGuardianSet(ctx context.Context, ethConn connectors.Connector) (uint32, *ethabi.StructsGuardianSet, error) {
|
|
|
|
currentIndex, err := ethConn.GetCurrentGuardianSetIndex(ctx)
|
2020-08-19 05:23:00 -07:00
|
|
|
if err != nil {
|
2020-08-21 15:21:41 -07:00
|
|
|
return 0, nil, fmt.Errorf("error requesting current guardian set index: %w", err)
|
2020-08-19 05:23:00 -07:00
|
|
|
}
|
|
|
|
|
2022-09-28 04:15:57 -07:00
|
|
|
gs, err := ethConn.GetGuardianSet(ctx, currentIndex)
|
2020-08-19 05:23:00 -07:00
|
|
|
if err != nil {
|
2020-08-21 15:21:41 -07:00
|
|
|
return 0, nil, fmt.Errorf("error requesting current guardian set value: %w", err)
|
2020-08-19 05:23:00 -07:00
|
|
|
}
|
|
|
|
|
2020-08-21 15:21:41 -07:00
|
|
|
return currentIndex, &gs, nil
|
2020-08-06 06:43:09 -07:00
|
|
|
}
|
2022-04-26 05:51:10 -07:00
|
|
|
|
2022-10-25 13:13:36 -07:00
|
|
|
func (w *Watcher) getAcalaMode(ctx context.Context) (useFinalizedBlocks bool, errRet error) {
|
2022-05-02 11:28:17 -07:00
|
|
|
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
|
|
|
|
defer cancel()
|
2022-04-26 05:51:10 -07:00
|
|
|
|
2022-09-28 04:15:57 -07:00
|
|
|
c, err := rpc.DialContext(timeout, w.url)
|
2022-05-02 11:28:17 -07:00
|
|
|
if err != nil {
|
2022-10-25 13:13:36 -07:00
|
|
|
errRet = fmt.Errorf("failed to connect to url %s to check acala mode: %w", w.url, err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// First check to see if polling for finalized blocks is suported.
|
|
|
|
type Marshaller struct {
|
|
|
|
Number *eth_hexutil.Big
|
|
|
|
}
|
|
|
|
|
|
|
|
var m Marshaller
|
|
|
|
err = c.CallContext(ctx, &m, "eth_getBlockByNumber", "finalized", false)
|
|
|
|
if err == nil {
|
|
|
|
useFinalizedBlocks = true
|
|
|
|
return
|
2022-05-02 11:28:17 -07:00
|
|
|
}
|
2022-04-26 05:51:10 -07:00
|
|
|
|
2022-10-25 13:13:36 -07:00
|
|
|
// If finalized blocks are not supported, then we had better be in safe mode!
|
2022-05-02 11:28:17 -07:00
|
|
|
var safe bool
|
|
|
|
err = c.CallContext(ctx, &safe, "net_isSafeMode")
|
|
|
|
if err != nil {
|
2022-10-25 13:13:36 -07:00
|
|
|
errRet = fmt.Errorf("check for safe mode for url %s failed: %w", w.url, err)
|
|
|
|
return
|
2022-05-02 11:28:17 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
if !safe {
|
2022-10-25 13:13:36 -07:00
|
|
|
errRet = fmt.Errorf("url %s does not support finalized blocks and is not using safe mode", w.url)
|
2022-04-26 05:51:10 -07:00
|
|
|
}
|
|
|
|
|
2022-10-25 13:13:36 -07:00
|
|
|
return
|
2022-04-26 05:51:10 -07:00
|
|
|
}
|
2022-10-26 12:20:13 -07:00
|
|
|
|
|
|
|
func (w *Watcher) GetLatestFinalizedBlockNumber() uint64 {
|
|
|
|
return atomic.LoadUint64(&w.latestFinalizedBlockNumber)
|
|
|
|
}
|