Node/EVM: Linea poller (#3872)

* Node/EVM: Linea poller

* Explicitly check finality type in watcher
This commit is contained in:
bruce-riley 2024-04-22 10:11:45 -05:00 committed by GitHub
parent 2712dd3f3f
commit 9af1fac9e1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 5584 additions and 19 deletions

View File

@ -182,8 +182,10 @@ var (
xlayerRPC *string
xlayerContract *string
lineaRPC *string
lineaContract *string
lineaRPC *string
lineaContract *string
lineaRollUpUrl *string
lineaRollUpContract *string
berachainRPC *string
berachainContract *string
@ -386,6 +388,8 @@ func init() {
lineaRPC = node.RegisterFlagWithValidationOrFail(NodeCmd, "lineaRPC", "Linea RPC URL", "ws://eth-devnet:8545", []string{"ws", "wss"})
lineaContract = NodeCmd.Flags().String("lineaContract", "", "Linea contract address")
lineaRollUpUrl = NodeCmd.Flags().String("lineaRollUpUrl", "", "Linea roll up URL")
lineaRollUpContract = NodeCmd.Flags().String("lineaRollUpContract", "", "Linea roll up contract address")
berachainRPC = node.RegisterFlagWithValidationOrFail(NodeCmd, "berachainRPC", "Berachain RPC URL", "ws://eth-devnet:8545", []string{"ws", "wss"})
berachainContract = NodeCmd.Flags().String("berachainContract", "", "Berachain contract address")
@ -802,6 +806,9 @@ func runNode(cmd *cobra.Command, args []string) {
if (*lineaRPC == "") != (*lineaContract == "") {
logger.Fatal("Both --lineaContract and --lineaRPC must be set together or both unset")
}
if (*lineaRPC != "") && (*lineaRollUpUrl == "" || *lineaRollUpContract == "") && !*unsafeDevMode {
logger.Fatal("If --lineaRPC is specified, --lineaRollUpUrl and --lineaRollUpContract must also be specified")
}
if *berachainRPC != "" && !*testnetMode && !*unsafeDevMode {
logger.Fatal("berachain is currently only supported in devnet and testnet")
@ -1506,11 +1513,13 @@ func runNode(cmd *cobra.Command, args []string) {
if shouldStart(lineaRPC) {
wc := &evm.WatcherConfig{
NetworkID: "linea",
ChainID: vaa.ChainIDLinea,
Rpc: *lineaRPC,
Contract: *lineaContract,
CcqBackfillCache: *ccqBackfillCache,
NetworkID: "linea",
ChainID: vaa.ChainIDLinea,
Rpc: *lineaRPC,
Contract: *lineaContract,
CcqBackfillCache: *ccqBackfillCache,
LineaRollUpUrl: *lineaRollUpUrl,
LineaRollUpContract: *lineaRollUpContract,
}
watcherConfigs = append(watcherConfigs, wc)

View File

@ -1,6 +1,8 @@
package evm
import (
"errors"
"github.com/certusone/wormhole/node/pkg/common"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/query"
@ -20,6 +22,10 @@ type WatcherConfig struct {
L1FinalizerRequired watchers.NetworkID // (optional)
l1Finalizer interfaces.L1Finalizer
CcqBackfillCache bool
// These parameters are currently only used for Linea and should be set via SetLineaParams()
LineaRollUpUrl string
LineaRollUpContract string
}
func (wc *WatcherConfig) GetNetworkID() watchers.NetworkID {
@ -57,5 +63,12 @@ func (wc *WatcherConfig) Create(
watcher := NewEthWatcher(wc.Rpc, eth_common.HexToAddress(wc.Contract), string(wc.NetworkID), wc.ChainID, msgC, setWriteC, obsvReqC, queryReqC, queryResponseC, devMode, wc.CcqBackfillCache)
watcher.SetL1Finalizer(wc.l1Finalizer)
if wc.ChainID == vaa.ChainIDLinea {
if err := watcher.SetLineaParams(wc.LineaRollUpUrl, wc.LineaRollUpContract); err != nil {
return nil, nil, err
}
} else if wc.LineaRollUpUrl != "" || wc.LineaRollUpContract != "" {
return nil, nil, errors.New("LineaRollUpUrl and LineaRollUpContract may only be specified for Linea")
}
return watcher, watcher.Run, nil
}

View File

@ -0,0 +1,218 @@
// A block is considered finalized on Linea when it is marked finalized by the LineaRollup contract on Ethereum.
//
// For a discussion of finality on Linea, see here:
// https://www.notion.so/wormholefoundation/Testnet-Info-V2-633e4aa64a634d56a7ce07a103789774?pvs=4#03513c2eb3654d33aff2206a562d25b1
//
// The LineaRollup proxy contract on ethereum is available at the following addresses:
// Mainnet: 0xd19d4B5d358258f05D7B411E21A1460D11B0876F
// Testnet: 0xB218f8A4Bc926cF1cA7b3423c154a0D627Bdb7E5
//
// To generate the golang abi for the LineaRollup contract:
// - Grab the ABIs from the LineaRollup contract (not the proxy) (0x934Dd4C63E285551CEceF8459103554D0096c179 on Ethereum mainnet) and put it in /tmp/LineaRollup.abi.
// - mkdir node/pkg/watchers/evm/connectors/lineaabi
// - Install abigen: go install github.com/ethereum/go-ethereum/cmd/abigen@latest
// - abigen --abi /tmp/LineaRollup.abi --pkg lineaabi --out node/pkg/watchers/evm/connectors/lineaabi/LineaRollup.go
package connectors
import (
"context"
"fmt"
"time"
"github.com/certusone/wormhole/node/pkg/common"
rollUpAbi "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors/lineaabi"
ethereum "github.com/ethereum/go-ethereum"
ethBind "github.com/ethereum/go-ethereum/accounts/abi/bind"
ethCommon "github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"
ethClient "github.com/ethereum/go-ethereum/ethclient"
ethRpc "github.com/ethereum/go-ethereum/rpc"
"go.uber.org/zap"
)
// LineaConnector listens for new finalized blocks for Linea by reading the roll up contract on Ethereum.
type LineaConnector struct {
Connector
logger *zap.Logger
// These are used for querying the roll up contract.
rollUpRawClient *ethRpc.Client
rollUpClient *ethClient.Client
// These are used to subscribe for new block finalized events from the roll up contract.
rollUpFilterer *rollUpAbi.LineaabiFilterer
rollUpCaller *rollUpAbi.LineaabiCaller
latestBlockNum uint64
latestFinalizedBlockNum uint64
}
// NewLineaConnector creates a new Linea poll connector using the specified roll up contract.
func NewLineaConnector(
ctx context.Context,
logger *zap.Logger,
baseConnector Connector,
rollUpUrl string,
rollUpAddress string,
) (*LineaConnector, error) {
rollUpRawClient, err := ethRpc.DialContext(ctx, rollUpUrl)
if err != nil {
return nil, fmt.Errorf("failed to create roll up raw client for url %s: %w", rollUpUrl, err)
}
rollUpClient := ethClient.NewClient(rollUpRawClient)
addr := ethCommon.HexToAddress(rollUpAddress)
rollUpFilterer, err := rollUpAbi.NewLineaabiFilterer(addr, rollUpClient)
if err != nil {
return nil, fmt.Errorf("failed to create roll up filter for url %s: %w", rollUpUrl, err)
}
rollUpCaller, err := rollUpAbi.NewLineaabiCaller(addr, rollUpClient)
if err != nil {
return nil, fmt.Errorf("failed to create roll up caller for url %s: %w", rollUpUrl, err)
}
logger.Info("Using roll up for Linea", zap.String("rollUpUrl", rollUpUrl), zap.String("rollUpAddress", rollUpAddress))
connector := &LineaConnector{
Connector: baseConnector,
logger: logger,
rollUpRawClient: rollUpRawClient,
rollUpClient: rollUpClient,
rollUpFilterer: rollUpFilterer,
rollUpCaller: rollUpCaller,
}
return connector, nil
}
// SubscribeForBlocks starts polling. It implements the standard connector interface.
func (c *LineaConnector) SubscribeForBlocks(ctx context.Context, errC chan error, sink chan<- *NewBlock) (ethereum.Subscription, error) {
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
// Use the standard geth head sink to get latest blocks.
headSink := make(chan *ethTypes.Header, 2)
headerSubscription, err := c.Connector.Client().SubscribeNewHead(ctx, headSink)
if err != nil {
return nil, fmt.Errorf("failed to subscribe for latest blocks: %w", err)
}
// Subscribe to data finalized events from the roll up contract.
dataFinalizedChan := make(chan *rollUpAbi.LineaabiDataFinalized, 2)
dataFinalizedSub, err := c.rollUpFilterer.WatchDataFinalized(&ethBind.WatchOpts{Context: timeout}, dataFinalizedChan, nil, nil, nil)
if err != nil {
return nil, fmt.Errorf("failed to subscribe for events from roll up contract: %w", err)
}
// Get the current latest block on Linea.
latestBlock, err := GetBlockByFinality(timeout, c.logger, c.Connector, Latest)
if err != nil {
return nil, fmt.Errorf("failed to get current latest block: %w", err)
}
c.latestBlockNum = latestBlock.Number.Uint64()
// Get and publish the current latest finalized block.
opts := &ethBind.CallOpts{Context: timeout}
initialBlock, err := c.rollUpCaller.CurrentL2BlockNumber(opts)
if err != nil {
return nil, fmt.Errorf("failed to get initial block: %w", err)
}
c.latestFinalizedBlockNum = initialBlock.Uint64()
if c.latestFinalizedBlockNum > c.latestBlockNum {
return nil, fmt.Errorf("latest finalized block reported by L1 (%d) is ahead of latest block reported by L2 (%d), L2 node seems to be stuck",
c.latestFinalizedBlockNum, c.latestBlockNum)
}
c.logger.Info("queried initial finalized block", zap.Uint64("initialBlock", c.latestFinalizedBlockNum), zap.Uint64("latestBlock", c.latestBlockNum))
if err = c.postFinalizedAndSafe(ctx, c.latestFinalizedBlockNum, sink); err != nil {
return nil, fmt.Errorf("failed to post initial block: %w", err)
}
common.RunWithScissors(ctx, errC, "linea_block_poller", func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
dataFinalizedSub.Unsubscribe()
return nil
case err := <-dataFinalizedSub.Err():
errC <- fmt.Errorf("finalized data watcher posted an error: %w", err)
dataFinalizedSub.Unsubscribe()
return nil
case evt := <-dataFinalizedChan:
if err := c.processDataFinalizedEvent(ctx, sink, evt); err != nil {
errC <- fmt.Errorf("failed to process block finalized event: %w", err)
dataFinalizedSub.Unsubscribe()
return nil
}
case ev := <-headSink:
if ev == nil {
c.logger.Error("new latest header event is nil")
continue
}
if ev.Number == nil {
c.logger.Error("new latest header block number is nil")
continue
}
c.latestBlockNum = ev.Number.Uint64()
sink <- &NewBlock{
Number: ev.Number,
Time: ev.Time,
Hash: ev.Hash(),
Finality: Latest,
}
}
}
})
return headerSubscription, nil
}
// processDataFinalizedEvent handles a DataFinalized event published by the roll up contract.
func (c *LineaConnector) processDataFinalizedEvent(ctx context.Context, sink chan<- *NewBlock, evt *rollUpAbi.LineaabiDataFinalized) error {
latestFinalizedBlockNum := evt.LastBlockFinalized.Uint64()
// Leaving this log info in for now because these events come very infrequently.
c.logger.Info("processing data finalized event",
zap.Uint64("latestFinalizedBlockNum", latestFinalizedBlockNum),
zap.Uint64("prevFinalizedBlockNum", c.latestFinalizedBlockNum),
)
if latestFinalizedBlockNum > c.latestBlockNum {
return fmt.Errorf("latest finalized block reported by L1 (%d) is ahead of latest block reported by L2 (%d), L2 node seems to be stuck",
latestFinalizedBlockNum, c.latestBlockNum)
}
for blockNum := c.latestFinalizedBlockNum + 1; blockNum <= latestFinalizedBlockNum; blockNum++ {
if err := c.postFinalizedAndSafe(ctx, blockNum, sink); err != nil {
c.latestFinalizedBlockNum = blockNum - 1
return fmt.Errorf("failed to post block %d: %w", blockNum, err)
}
}
c.latestFinalizedBlockNum = latestFinalizedBlockNum
return nil
}
// postFinalizedAndSafe publishes a block as finalized and safe. It takes a block number and looks it up on chain to publish the current values.
func (c *LineaConnector) postFinalizedAndSafe(ctx context.Context, blockNum uint64, sink chan<- *NewBlock) error {
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
block, err := GetBlockByNumberUint64(timeout, c.logger, c.Connector, blockNum, Finalized)
if err != nil {
return fmt.Errorf("failed to get block %d: %w", blockNum, err)
}
// Publish the finalized block.
sink <- block
// Publish same thing for the safe block.
sink <- block.Copy(Safe)
return nil
}

File diff suppressed because one or more lines are too long

View File

@ -2,6 +2,7 @@ package evm
import (
"context"
"errors"
"fmt"
"math"
"math/big"
@ -136,6 +137,10 @@ type (
ccqBatchSize int64
ccqBackfillCache bool
ccqLogger *zap.Logger
// These parameters are currently only used for Linea and should be set via SetLineaParams()
lineaRollUpUrl string
lineaRollUpContract string
}
pendingKey struct {
@ -242,6 +247,19 @@ func (w *Watcher) Run(parentCtx context.Context) error {
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
return fmt.Errorf("dialing eth client failed: %w", err)
}
} else if w.chainID == vaa.ChainIDLinea {
baseConnector, err := connectors.NewEthereumBaseConnector(timeout, w.networkName, w.url, w.contract, logger)
if err != nil {
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
return fmt.Errorf("dialing eth client failed: %w", err)
}
w.ethConn, err = connectors.NewLineaConnector(ctx, logger, baseConnector, w.lineaRollUpUrl, w.lineaRollUpContract)
if err != nil {
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
return fmt.Errorf("failed to create Linea poller: %w", err)
}
} else {
// Everything else is instant finality.
logger.Info("assuming instant finality")
@ -699,41 +717,61 @@ func fetchCurrentGuardianSet(ctx context.Context, ethConn connectors.Connector)
// getFinality determines if the chain supports "finalized" and "safe". This is hard coded so it requires thought to change something. However, it also reads the RPC
// to make sure the node actually supports the expected values, and returns an error if it doesn't. Note that we do not support using safe mode but not finalized mode.
func (w *Watcher) getFinality(ctx context.Context) (bool, bool, error) {
// TODO: Need to handle finality for Linea before it can be deployed in Mainnet.
finalized := false
safe := false
// Tilt supports polling for both finalized and safe.
if w.unsafeDevMode {
finalized = true
safe = true
// The following chains support polling for both finalized and safe.
} else if w.chainID == vaa.ChainIDAcala ||
w.chainID == vaa.ChainIDArbitrum ||
w.chainID == vaa.ChainIDArbitrumSepolia ||
w.chainID == vaa.ChainIDBase ||
w.chainID == vaa.ChainIDBaseSepolia ||
w.chainID == vaa.ChainIDBlast ||
w.chainID == vaa.ChainIDBSC ||
w.chainID == vaa.ChainIDEthereum ||
w.chainID == vaa.ChainIDHolesky ||
w.chainID == vaa.ChainIDKarura ||
w.chainID == vaa.ChainIDMantle ||
w.chainID == vaa.ChainIDMoonbeam ||
w.chainID == vaa.ChainIDOptimism ||
w.chainID == vaa.ChainIDSepolia ||
w.chainID == vaa.ChainIDHolesky ||
w.chainID == vaa.ChainIDArbitrumSepolia ||
w.chainID == vaa.ChainIDBaseSepolia ||
w.chainID == vaa.ChainIDOptimismSepolia ||
w.chainID == vaa.ChainIDSepolia ||
w.chainID == vaa.ChainIDXLayer {
finalized = true
safe = true
} else if w.chainID == vaa.ChainIDScroll {
// As of 11/10/2023 Scroll supports polling for finalized but not safe.
finalized = true
} else if w.chainID == vaa.ChainIDPolygon ||
w.chainID == vaa.ChainIDPolygonSepolia {
// The following chains have their own specialized finalizers.
} else if w.chainID == vaa.ChainIDCelo ||
w.chainID == vaa.ChainIDLinea {
return false, false, nil
// Polygon now supports polling for finalized but not safe.
// https://forum.polygon.technology/t/optimizing-decentralized-apps-ux-with-milestones-a-significantly-accelerated-finality-solution/13154
} else if w.chainID == vaa.ChainIDPolygon ||
w.chainID == vaa.ChainIDPolygonSepolia {
finalized = true
} else if w.chainID == vaa.ChainIDBerachain {
// Berachain supports instant finality: https://docs.berachain.com/faq/
// As of 11/10/2023 Scroll supports polling for finalized but not safe.
} else if w.chainID == vaa.ChainIDScroll {
finalized = true
// The following chains support instant finality.
} else if w.chainID == vaa.ChainIDAvalanche ||
w.chainID == vaa.ChainIDBerachain || // Berachain supports instant finality: https://docs.berachain.com/faq/
w.chainID == vaa.ChainIDOasis ||
w.chainID == vaa.ChainIDAurora ||
w.chainID == vaa.ChainIDFantom ||
w.chainID == vaa.ChainIDKlaytn {
return false, false, nil
// Anything else is undefined / not supported.
} else {
return false, false, fmt.Errorf("unsupported chain: %s", w.chainID.String())
}
// If finalized / safe should be supported, read the RPC to make sure they actually are.
@ -935,3 +973,22 @@ func (w *Watcher) waitForBlockTime(ctx context.Context, logger *zap.Logger, errC
func msgIdFromLogEvent(chainID vaa.ChainID, ev *ethabi.AbiLogMessagePublished) string {
return fmt.Sprintf("%v/%v/%v", uint16(chainID), PadAddress(ev.Sender), ev.Sequence)
}
// SetLineaParams is used to enable polling on Linea using the roll up contract on Ethereum.
func (w *Watcher) SetLineaParams(lineaRollUpUrl string, lineaRollUpContract string) error {
if w.chainID != vaa.ChainIDLinea {
return errors.New("function only allowed for Linea")
}
if w.unsafeDevMode && lineaRollUpUrl == "" && lineaRollUpContract == "" {
return nil
}
if lineaRollUpUrl == "" {
return fmt.Errorf("lineaRollUpUrl must be set")
}
if lineaRollUpContract == "" {
return fmt.Errorf("lineaRollUpContract must be set")
}
w.lineaRollUpUrl = lineaRollUpUrl
w.lineaRollUpContract = lineaRollUpContract
return nil
}