Node: Polygon watcher redesign (#1858)
This commit is contained in:
parent
4e70aa0f16
commit
ce1ca0b155
|
@ -75,8 +75,10 @@ var (
|
|||
bscRPC *string
|
||||
bscContract *string
|
||||
|
||||
polygonRPC *string
|
||||
polygonContract *string
|
||||
polygonRPC *string
|
||||
polygonContract *string
|
||||
polygonRootChainRpc *string
|
||||
polygonRootChainContractAddress *string
|
||||
|
||||
auroraRPC *string
|
||||
auroraContract *string
|
||||
|
@ -206,6 +208,8 @@ func init() {
|
|||
|
||||
polygonRPC = NodeCmd.Flags().String("polygonRPC", "", "Polygon RPC URL")
|
||||
polygonContract = NodeCmd.Flags().String("polygonContract", "", "Polygon contract address")
|
||||
polygonRootChainRpc = NodeCmd.Flags().String("polygonRootChainRpc", "", "Polygon root chain RPC")
|
||||
polygonRootChainContractAddress = NodeCmd.Flags().String("polygonRootChainContractAddress", "", "Polygon root chain contract address")
|
||||
|
||||
avalancheRPC = NodeCmd.Flags().String("avalancheRPC", "", "Avalanche RPC URL")
|
||||
avalancheContract = NodeCmd.Flags().String("avalancheContract", "", "Avalanche contract address")
|
||||
|
@ -931,20 +935,24 @@ func runNode(cmd *cobra.Command, args []string) {
|
|||
}
|
||||
|
||||
if shouldStart(polygonRPC) {
|
||||
polygonMinConfirmations := uint64(512)
|
||||
if *testnetMode {
|
||||
var polygonMinConfirmations uint64 = 512
|
||||
if *polygonRootChainRpc != "" {
|
||||
// If we are using checkpointing, we don't need to wait for additional confirmations.
|
||||
polygonMinConfirmations = 1
|
||||
} else if *testnetMode {
|
||||
// Testnet users don't want to have to wait too long.
|
||||
polygonMinConfirmations = 64
|
||||
} else if !*unsafeDevMode {
|
||||
log.Fatal("Polygon checkpointing is required in mainnet")
|
||||
}
|
||||
logger.Info("Starting Polygon watcher")
|
||||
readiness.RegisterComponent(common.ReadinessPolygonSyncing)
|
||||
chainObsvReqC[vaa.ChainIDPolygon] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
|
||||
if err := supervisor.Run(ctx, "polygonwatch",
|
||||
evm.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, lockC, nil, polygonMinConfirmations, chainObsvReqC[vaa.ChainIDPolygon], *unsafeDevMode, nil).Run); err != nil {
|
||||
// Special case: Polygon can fork like PoW Ethereum, and it's not clear what the safe number of blocks is
|
||||
//
|
||||
// Hardcode the minimum number of confirmations to 512 regardless of what the smart contract specifies to protect
|
||||
// developers from accidentally specifying an unsafe number of confirmations. We can remove this restriction as soon
|
||||
// as specific public guidance exists for Polygon developers.
|
||||
polygonWatcher := evm.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, lockC, nil, polygonMinConfirmations, chainObsvReqC[vaa.ChainIDPolygon], *unsafeDevMode, nil)
|
||||
if err := polygonWatcher.SetRootChainParams(*polygonRootChainRpc, *polygonRootChainContractAddress); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := supervisor.Run(ctx, "polygonwatch", polygonWatcher.Run); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
|
@ -165,10 +165,15 @@ func (b *BlockPollConnector) SubscribeForBlocks(ctx context.Context, sink chan<-
|
|||
}
|
||||
|
||||
func (b *BlockPollConnector) getBlock(ctx context.Context, logger *zap.Logger, number *big.Int) (*NewBlock, error) {
|
||||
return getBlock(ctx, logger, b.Connector, number, b.useFinalized)
|
||||
}
|
||||
|
||||
// getBlock is a free function that can be called from other connectors to get a single block.
|
||||
func getBlock(ctx context.Context, logger *zap.Logger, conn Connector, number *big.Int, useFinalized bool) (*NewBlock, error) {
|
||||
var numStr string
|
||||
if number != nil {
|
||||
numStr = ethHexUtils.EncodeBig(number)
|
||||
} else if b.useFinalized {
|
||||
} else if useFinalized {
|
||||
numStr = "finalized"
|
||||
} else {
|
||||
numStr = "latest"
|
||||
|
@ -184,7 +189,7 @@ func (b *BlockPollConnector) getBlock(ctx context.Context, logger *zap.Logger, n
|
|||
}
|
||||
|
||||
var m Marshaller
|
||||
err := b.Connector.RawCallContext(ctx, &m, "eth_getBlockByNumber", numStr, false)
|
||||
err := conn.RawCallContext(ctx, &m, "eth_getBlockByNumber", numStr, false)
|
||||
if err != nil {
|
||||
logger.Error("failed to get block",
|
||||
zap.String("requested_block", numStr), zap.Error(err))
|
||||
|
|
|
@ -0,0 +1,157 @@
|
|||
// On Polygon, a block is considered finalized when it is checkpointed on Ethereum.
|
||||
// This requires listening to the RootChain contract on Ethereum.
|
||||
//
|
||||
// For a discussion on Polygon finality, see here:
|
||||
// https://wiki.polygon.technology/docs/pos/heimdall/modules/checkpoint
|
||||
//
|
||||
// The RootChain proxy contract on Ethereum is available at the following addresses:
|
||||
// Mainnet: 0x86E4Dc95c7FBdBf52e33D563BbDB00823894C287
|
||||
// Testnet: 0x2890ba17efe978480615e330ecb65333b880928e
|
||||
//
|
||||
// The code for the RootChain contract is available here:
|
||||
// https://github.com/maticnetwork/contracts/tree/main/contracts
|
||||
//
|
||||
// To generate the golang abi for the root chain contract:
|
||||
// - Grab the ABIs from the Root Chain contract (not the proxy) (0x17aD93683697CE557Ef7774660394456A7412B00 on Ethereum mainnet) and put it in /tmp/RootChain.abi.
|
||||
// - mkdir node/pkg/watchers/evm/connectors/polygonabi
|
||||
// - third_party/abigen/abigen --abi /tmp/RootChain.abi --pkg polygonabi --out node/pkg/watchers/evm/connectors/polygonabi/RootChain.go
|
||||
|
||||
package connectors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"time"
|
||||
|
||||
"github.com/certusone/wormhole/node/pkg/supervisor"
|
||||
rootAbi "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors/polygonabi"
|
||||
|
||||
ethereum "github.com/ethereum/go-ethereum"
|
||||
ethBind "github.com/ethereum/go-ethereum/accounts/abi/bind"
|
||||
ethCommon "github.com/ethereum/go-ethereum/common"
|
||||
ethClient "github.com/ethereum/go-ethereum/ethclient"
|
||||
ethRpc "github.com/ethereum/go-ethereum/rpc"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type PolygonConnector struct {
|
||||
Connector
|
||||
logger *zap.Logger
|
||||
|
||||
// These are used for querying the root chain contract.
|
||||
rootRawClient *ethRpc.Client
|
||||
rootClient *ethClient.Client
|
||||
|
||||
// These are used to subscribe for new checkpoint events from the root chain contract.
|
||||
rootFilterer *rootAbi.AbiRootChainFilterer
|
||||
rootCaller *rootAbi.AbiRootChainCaller
|
||||
}
|
||||
|
||||
func NewPolygonConnector(
|
||||
ctx context.Context,
|
||||
baseConnector Connector,
|
||||
rootChainUrl string,
|
||||
rootChainAddress string,
|
||||
) (*PolygonConnector, error) {
|
||||
|
||||
rootRawClient, err := ethRpc.DialContext(ctx, rootChainUrl)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create root chain raw client for url %s: %w", rootChainUrl, err)
|
||||
}
|
||||
|
||||
rootClient := ethClient.NewClient(rootRawClient)
|
||||
|
||||
addr := ethCommon.HexToAddress(rootChainAddress)
|
||||
rootFilterer, err := rootAbi.NewAbiRootChainFilterer(addr, rootClient)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create root chain filter for url %s: %w", rootChainUrl, err)
|
||||
}
|
||||
|
||||
rootCaller, err := rootAbi.NewAbiRootChainCaller(addr, rootClient)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create root chain caller for url %s: %w", rootChainUrl, err)
|
||||
}
|
||||
|
||||
logger := supervisor.Logger(ctx).With(zap.String("eth_network", baseConnector.NetworkName()))
|
||||
logger.Info("Using checkpointing for Polygon", zap.String("rootChainUrl", rootChainUrl), zap.String("rootChainAddress", rootChainAddress))
|
||||
|
||||
connector := &PolygonConnector{
|
||||
Connector: baseConnector,
|
||||
logger: logger,
|
||||
rootRawClient: rootRawClient,
|
||||
rootClient: rootClient,
|
||||
rootFilterer: rootFilterer,
|
||||
rootCaller: rootCaller,
|
||||
}
|
||||
|
||||
return connector, nil
|
||||
}
|
||||
|
||||
func (c *PolygonConnector) SubscribeForBlocks(ctx context.Context, sink chan<- *NewBlock) (ethereum.Subscription, error) {
|
||||
sub := NewPollSubscription()
|
||||
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// Subscribe to new checkpoint events from the root chain contract.
|
||||
messageC := make(chan *rootAbi.AbiRootChainNewHeaderBlock, 2)
|
||||
messageSub, err := c.rootFilterer.WatchNewHeaderBlock(ðBind.WatchOpts{Context: timeout}, messageC, nil, nil, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create new checkpoint watcher: %w", err)
|
||||
}
|
||||
|
||||
// Get and publish the current latest block.
|
||||
opts := ðBind.CallOpts{Context: ctx}
|
||||
initialBlock, err := c.rootCaller.GetLastChildBlock(opts)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get initial block: %w", err)
|
||||
}
|
||||
|
||||
if err = c.postBlock(ctx, initialBlock, sink); err != nil {
|
||||
return nil, fmt.Errorf("failed to post initial block: %w", err)
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case err := <-messageSub.Err():
|
||||
sub.err <- err
|
||||
case checkpoint := <-messageC:
|
||||
if err := c.processCheckpoint(ctx, sink, checkpoint); err != nil {
|
||||
sub.err <- fmt.Errorf("failed to process checkpoint: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return sub, nil
|
||||
}
|
||||
|
||||
var bigOne = big.NewInt(1)
|
||||
|
||||
func (c *PolygonConnector) processCheckpoint(ctx context.Context, sink chan<- *NewBlock, checkpoint *rootAbi.AbiRootChainNewHeaderBlock) error {
|
||||
for blockNum := checkpoint.Start; blockNum.Cmp(checkpoint.End) <= 0; blockNum.Add(blockNum, bigOne) {
|
||||
if err := c.postBlock(ctx, blockNum, sink); err != nil {
|
||||
return fmt.Errorf("failed to post block %s: %w", blockNum.String(), err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *PolygonConnector) postBlock(ctx context.Context, blockNum *big.Int, sink chan<- *NewBlock) error {
|
||||
if blockNum == nil {
|
||||
return fmt.Errorf("blockNum is nil")
|
||||
}
|
||||
|
||||
block, err := getBlock(ctx, c.logger, c.Connector, blockNum, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get block %s: %w", blockNum.String(), err)
|
||||
}
|
||||
|
||||
sink <- block
|
||||
return nil
|
||||
}
|
File diff suppressed because one or more lines are too long
|
@ -109,6 +109,10 @@ type (
|
|||
|
||||
latestFinalizedBlockNumber uint64
|
||||
l1Finalizer interfaces.L1Finalizer
|
||||
|
||||
// These parameters are currently only used for Polygon and should be set via SetRootChainParams()
|
||||
rootChainRpc string
|
||||
rootChainContract string
|
||||
}
|
||||
|
||||
pendingKey struct {
|
||||
|
@ -274,6 +278,23 @@ func (w *Watcher) Run(ctx context.Context) error {
|
|||
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
|
||||
return fmt.Errorf("creating block poll connector failed: %w", err)
|
||||
}
|
||||
} else if w.chainID == vaa.ChainIDPolygon && w.usePolygonCheckpointing() {
|
||||
baseConnector, err := connectors.NewEthereumConnector(timeout, w.networkName, w.url, w.contract, logger)
|
||||
if err != nil {
|
||||
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
|
||||
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
|
||||
return fmt.Errorf("failed to connect to polygon: %w", err)
|
||||
}
|
||||
w.ethConn, err = connectors.NewPolygonConnector(ctx,
|
||||
baseConnector,
|
||||
w.rootChainRpc,
|
||||
w.rootChainContract,
|
||||
)
|
||||
if err != nil {
|
||||
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
|
||||
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
|
||||
return fmt.Errorf("failed to create polygon connector: %w", err)
|
||||
}
|
||||
} else {
|
||||
w.ethConn, err = connectors.NewEthereumConnector(timeout, w.networkName, w.url, w.contract, logger)
|
||||
if err != nil {
|
||||
|
@ -776,3 +797,19 @@ func (w *Watcher) getAcalaMode(ctx context.Context) (useFinalizedBlocks bool, er
|
|||
func (w *Watcher) GetLatestFinalizedBlockNumber() uint64 {
|
||||
return atomic.LoadUint64(&w.latestFinalizedBlockNumber)
|
||||
}
|
||||
|
||||
// SetRootChainParams is used to enabled checkpointing (currently only for Polygon). It handles
|
||||
// if the feature is either enabled or disabled, but ensures the configuration is valid.
|
||||
func (w *Watcher) SetRootChainParams(rootChainRpc string, rootChainContract string) error {
|
||||
if (rootChainRpc == "") != (rootChainContract == "") {
|
||||
return fmt.Errorf("if either rootChainRpc or rootChainContract are set, they must both be set")
|
||||
}
|
||||
|
||||
w.rootChainRpc = rootChainRpc
|
||||
w.rootChainContract = rootChainContract
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *Watcher) usePolygonCheckpointing() bool {
|
||||
return w.rootChainRpc != "" && w.rootChainContract != ""
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue