Refactor to generalize polling
Change-Id: Ie30056486ec86f6dceffed231ac227fa9c3499a7
This commit is contained in:
parent
bc7834448e
commit
cf4722a546
|
@ -70,8 +70,9 @@ var (
|
|||
bscRPC *string
|
||||
bscContract *string
|
||||
|
||||
polygonRPC *string
|
||||
polygonContract *string
|
||||
polygonRPC *string
|
||||
polygonContract *string
|
||||
polygonCheckpoint *string
|
||||
|
||||
ethRopstenRPC *string
|
||||
ethRopstenContract *string
|
||||
|
@ -169,6 +170,7 @@ func init() {
|
|||
|
||||
polygonRPC = NodeCmd.Flags().String("polygonRPC", "", "Polygon RPC URL")
|
||||
polygonContract = NodeCmd.Flags().String("polygonContract", "", "Polygon contract address")
|
||||
polygonCheckpoint = NodeCmd.Flags().String("polygonCheckpoint", "", "Polygon checkpoint query URL")
|
||||
|
||||
ethRopstenRPC = NodeCmd.Flags().String("ethRopstenRPC", "", "Ethereum Ropsten RPC URL")
|
||||
ethRopstenContract = NodeCmd.Flags().String("ethRopstenContract", "", "Ethereum Ropsten contract address")
|
||||
|
@ -790,69 +792,76 @@ func runNode(cmd *cobra.Command, args []string) {
|
|||
}
|
||||
|
||||
if err := supervisor.Run(ctx, "ethwatch",
|
||||
ethereum.NewEthWatcher(*ethRPC, ethContractAddr, "eth", common.ReadinessEthSyncing, vaa.ChainIDEthereum, lockC, setC, 1, chainObsvReqC[vaa.ChainIDEthereum], *unsafeDevMode).Run); err != nil {
|
||||
ethereum.NewEthWatcher(*ethRPC, ethContractAddr, "eth", common.ReadinessEthSyncing, vaa.ChainIDEthereum, lockC, setC, 1, chainObsvReqC[vaa.ChainIDEthereum], *unsafeDevMode, nil).Run); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := supervisor.Run(ctx, "bscwatch",
|
||||
ethereum.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", common.ReadinessBSCSyncing, vaa.ChainIDBSC, lockC, nil, 1, chainObsvReqC[vaa.ChainIDBSC], *unsafeDevMode).Run); err != nil {
|
||||
ethereum.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", common.ReadinessBSCSyncing, vaa.ChainIDBSC, lockC, nil, 1, chainObsvReqC[vaa.ChainIDBSC], *unsafeDevMode, nil).Run); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
polygonMinConfirmations := uint64(512)
|
||||
if *testnetMode {
|
||||
polygonMinConfirmations = 64
|
||||
{
|
||||
var extraParams []string
|
||||
var minConfirmations uint64 = 512
|
||||
if *polygonCheckpoint != "" {
|
||||
extraParams = []string{*polygonCheckpoint}
|
||||
minConfirmations = 1
|
||||
} else if *testnetMode {
|
||||
minConfirmations = 64
|
||||
}
|
||||
if err := supervisor.Run(ctx, "polygonwatch",
|
||||
ethereum.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, lockC, nil, minConfirmations, chainObsvReqC[vaa.ChainIDPolygon],
|
||||
*unsafeDevMode, extraParams).Run); err != nil {
|
||||
// Special case: Polygon can fork like PoW Ethereum, and it's not clear what the safe number of blocks is
|
||||
//
|
||||
// If we are not querying the checkpoint server, we should hardcode the minimum number of confirmations to 512 regardless of what the smart contract specifies to protect
|
||||
// developers from accidentally specifying an unsafe number of confirmations. We can remove this restriction as soon
|
||||
// as specific public guidance exists for Polygon developers.
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := supervisor.Run(ctx, "polygonwatch",
|
||||
ethereum.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, lockC, nil, polygonMinConfirmations, chainObsvReqC[vaa.ChainIDPolygon], *unsafeDevMode).Run); err != nil {
|
||||
// Special case: Polygon can fork like PoW Ethereum, and it's not clear what the safe number of blocks is
|
||||
//
|
||||
// Hardcode the minimum number of confirmations to 512 regardless of what the smart contract specifies to protect
|
||||
// developers from accidentally specifying an unsafe number of confirmations. We can remove this restriction as soon
|
||||
// as specific public guidance exists for Polygon developers.
|
||||
return err
|
||||
}
|
||||
if err := supervisor.Run(ctx, "avalanchewatch",
|
||||
ethereum.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAvalanche], *unsafeDevMode).Run); err != nil {
|
||||
ethereum.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAvalanche], *unsafeDevMode, nil).Run); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := supervisor.Run(ctx, "oasiswatch",
|
||||
ethereum.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, lockC, nil, 1, chainObsvReqC[vaa.ChainIDOasis], *unsafeDevMode).Run); err != nil {
|
||||
ethereum.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, lockC, nil, 1, chainObsvReqC[vaa.ChainIDOasis], *unsafeDevMode, nil).Run); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := supervisor.Run(ctx, "aurorawatch",
|
||||
ethereum.NewEthWatcher(*auroraRPC, auroraContractAddr, "aurora", common.ReadinessAuroraSyncing, vaa.ChainIDAurora, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAurora], *unsafeDevMode).Run); err != nil {
|
||||
ethereum.NewEthWatcher(*auroraRPC, auroraContractAddr, "aurora", common.ReadinessAuroraSyncing, vaa.ChainIDAurora, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAurora], *unsafeDevMode, nil).Run); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := supervisor.Run(ctx, "fantomwatch",
|
||||
ethereum.NewEthWatcher(*fantomRPC, fantomContractAddr, "fantom", common.ReadinessFantomSyncing, vaa.ChainIDFantom, lockC, nil, 1, chainObsvReqC[vaa.ChainIDFantom], *unsafeDevMode).Run); err != nil {
|
||||
ethereum.NewEthWatcher(*fantomRPC, fantomContractAddr, "fantom", common.ReadinessFantomSyncing, vaa.ChainIDFantom, lockC, nil, 1, chainObsvReqC[vaa.ChainIDFantom], *unsafeDevMode, nil).Run); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := supervisor.Run(ctx, "karurawatch",
|
||||
ethereum.NewEthWatcher(*karuraRPC, karuraContractAddr, "karura", common.ReadinessKaruraSyncing, vaa.ChainIDKarura, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKarura], *unsafeDevMode).Run); err != nil {
|
||||
ethereum.NewEthWatcher(*karuraRPC, karuraContractAddr, "karura", common.ReadinessKaruraSyncing, vaa.ChainIDKarura, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKarura], *unsafeDevMode, nil).Run); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := supervisor.Run(ctx, "acalawatch",
|
||||
ethereum.NewEthWatcher(*acalaRPC, acalaContractAddr, "acala", common.ReadinessAcalaSyncing, vaa.ChainIDAcala, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAcala], *unsafeDevMode).Run); err != nil {
|
||||
ethereum.NewEthWatcher(*acalaRPC, acalaContractAddr, "acala", common.ReadinessAcalaSyncing, vaa.ChainIDAcala, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAcala], *unsafeDevMode, nil).Run); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := supervisor.Run(ctx, "klaytnwatch",
|
||||
ethereum.NewEthWatcher(*klaytnRPC, klaytnContractAddr, "klaytn", common.ReadinessKlaytnSyncing, vaa.ChainIDKlaytn, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKlaytn], *unsafeDevMode).Run); err != nil {
|
||||
ethereum.NewEthWatcher(*klaytnRPC, klaytnContractAddr, "klaytn", common.ReadinessKlaytnSyncing, vaa.ChainIDKlaytn, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKlaytn], *unsafeDevMode, nil).Run); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := supervisor.Run(ctx, "celowatch",
|
||||
ethereum.NewEthWatcher(*celoRPC, celoContractAddr, "celo", common.ReadinessCeloSyncing, vaa.ChainIDCelo, lockC, nil, 1, chainObsvReqC[vaa.ChainIDCelo], *unsafeDevMode).Run); err != nil {
|
||||
ethereum.NewEthWatcher(*celoRPC, celoContractAddr, "celo", common.ReadinessCeloSyncing, vaa.ChainIDCelo, lockC, nil, 1, chainObsvReqC[vaa.ChainIDCelo], *unsafeDevMode, nil).Run); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if *testnetMode {
|
||||
if err := supervisor.Run(ctx, "ethropstenwatch",
|
||||
ethereum.NewEthWatcher(*ethRopstenRPC, ethRopstenContractAddr, "ethropsten", common.ReadinessEthRopstenSyncing, vaa.ChainIDEthereumRopsten, lockC, nil, 1, chainObsvReqC[vaa.ChainIDEthereumRopsten], *unsafeDevMode).Run); err != nil {
|
||||
ethereum.NewEthWatcher(*ethRopstenRPC, ethRopstenContractAddr, "ethropsten", common.ReadinessEthRopstenSyncing, vaa.ChainIDEthereumRopsten, lockC, nil, 1, chainObsvReqC[vaa.ChainIDEthereumRopsten], *unsafeDevMode, nil).Run); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := supervisor.Run(ctx, "moonbeamwatch",
|
||||
ethereum.NewEthWatcher(*moonbeamRPC, moonbeamContractAddr, "moonbeam", common.ReadinessMoonbeamSyncing, vaa.ChainIDMoonbeam, lockC, nil, 1, chainObsvReqC[vaa.ChainIDMoonbeam], *unsafeDevMode).Run); err != nil {
|
||||
ethereum.NewEthWatcher(*moonbeamRPC, moonbeamContractAddr, "moonbeam", common.ReadinessMoonbeamSyncing, vaa.ChainIDMoonbeam, lockC, nil, 1, chainObsvReqC[vaa.ChainIDMoonbeam], *unsafeDevMode, nil).Run); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
// This implements the finality check for Moonbeam.
|
||||
//
|
||||
// Moonbeam can publish blocks before they are marked final. This means we need to sit on the block until a special "is finalized"
|
||||
// query returns true. The assumption is that every block number will eventually be published and finalized, it's just that the contents
|
||||
// of the block (and therefore the hash) might change if there is a rollback.
|
||||
|
||||
package ethereum
|
||||
|
||||
import (
|
||||
"context"
|
||||
common "github.com/certusone/wormhole/node/pkg/common"
|
||||
ethRpc "github.com/ethereum/go-ethereum/rpc"
|
||||
"go.uber.org/zap"
|
||||
"time"
|
||||
)
|
||||
|
||||
type MoonbeamFinalizer struct {
|
||||
logger *zap.Logger
|
||||
networkName string
|
||||
client *ethRpc.Client
|
||||
}
|
||||
|
||||
func (f *MoonbeamFinalizer) SetLogger(l *zap.Logger, netName string) {
|
||||
f.logger = l
|
||||
f.networkName = netName
|
||||
f.logger.Info("using Moonbeam specific finality check", zap.String("eth_network", f.networkName))
|
||||
}
|
||||
|
||||
func (f *MoonbeamFinalizer) DialContext(ctx context.Context, rawurl string) (err error) {
|
||||
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
|
||||
defer cancel()
|
||||
f.client, err = ethRpc.DialContext(timeout, rawurl)
|
||||
return err
|
||||
}
|
||||
|
||||
func (f *MoonbeamFinalizer) IsBlockFinalized(ctx context.Context, block *common.NewBlock) (bool, error) {
|
||||
var finalized bool
|
||||
err := f.client.CallContext(ctx, &finalized, "moon_isBlockFinalized", block.Hash.Hex())
|
||||
if err != nil {
|
||||
f.logger.Error("failed to check for finality", zap.String("eth_network", f.networkName), zap.Error(err))
|
||||
return false, err
|
||||
}
|
||||
|
||||
return finalized, nil
|
||||
}
|
|
@ -1,4 +1,6 @@
|
|||
// This implements the interface to the standard go-ethereum library.
|
||||
// This implements polling for the next available block.
|
||||
|
||||
// It can optionally call a chain specific function to verify that the block is finalized.
|
||||
|
||||
package ethereum
|
||||
|
||||
|
@ -23,18 +25,29 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type MoonbeamImpl struct {
|
||||
BaseEth *EthImpl
|
||||
logger *zap.Logger
|
||||
type PollFinalizer interface {
|
||||
SetLogger(l *zap.Logger, netName string)
|
||||
DialContext(ctx context.Context, rawurl string) error
|
||||
IsBlockFinalized(ctx context.Context, block *common.NewBlock) (bool, error)
|
||||
}
|
||||
|
||||
type PollImpl struct {
|
||||
BaseEth EthImpl
|
||||
Finalizer PollFinalizer
|
||||
DelayInMs int
|
||||
logger *zap.Logger
|
||||
rawClient *ethRpc.Client
|
||||
}
|
||||
|
||||
func (e *MoonbeamImpl) SetLogger(l *zap.Logger) {
|
||||
func (e *PollImpl) SetLogger(l *zap.Logger) {
|
||||
e.logger = l
|
||||
e.logger.Info("using Moonbeam specific implementation", zap.String("eth_network", e.BaseEth.NetworkName))
|
||||
e.logger.Info("using polling to check for new blocks", zap.String("eth_network", e.BaseEth.NetworkName), zap.Int("delay_in_ms", e.DelayInMs))
|
||||
if e.Finalizer != nil {
|
||||
e.Finalizer.SetLogger(l, e.BaseEth.NetworkName)
|
||||
}
|
||||
}
|
||||
|
||||
func (e *MoonbeamImpl) DialContext(ctx context.Context, rawurl string) (err error) {
|
||||
func (e *PollImpl) DialContext(ctx context.Context, rawurl string) (err error) {
|
||||
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
|
||||
defer cancel()
|
||||
|
||||
|
@ -44,59 +57,66 @@ func (e *MoonbeamImpl) DialContext(ctx context.Context, rawurl string) (err erro
|
|||
return err
|
||||
}
|
||||
|
||||
if e.Finalizer != nil {
|
||||
err = e.Finalizer.DialContext(ctx, rawurl)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// This is used for doing all other go-ethereum calls.
|
||||
return e.BaseEth.DialContext(ctx, rawurl)
|
||||
}
|
||||
|
||||
func (e *MoonbeamImpl) NewAbiFilterer(address ethCommon.Address) (err error) {
|
||||
func (e *PollImpl) NewAbiFilterer(address ethCommon.Address) (err error) {
|
||||
return e.BaseEth.NewAbiFilterer(address)
|
||||
}
|
||||
|
||||
func (e *MoonbeamImpl) NewAbiCaller(address ethCommon.Address) (err error) {
|
||||
func (e *PollImpl) NewAbiCaller(address ethCommon.Address) (err error) {
|
||||
return e.BaseEth.NewAbiCaller(address)
|
||||
}
|
||||
|
||||
func (e *MoonbeamImpl) GetCurrentGuardianSetIndex(ctx context.Context) (uint32, error) {
|
||||
func (e *PollImpl) GetCurrentGuardianSetIndex(ctx context.Context) (uint32, error) {
|
||||
return e.BaseEth.GetCurrentGuardianSetIndex(ctx)
|
||||
}
|
||||
|
||||
func (e *MoonbeamImpl) GetGuardianSet(ctx context.Context, index uint32) (ethAbi.StructsGuardianSet, error) {
|
||||
func (e *PollImpl) GetGuardianSet(ctx context.Context, index uint32) (ethAbi.StructsGuardianSet, error) {
|
||||
return e.BaseEth.GetGuardianSet(ctx, index)
|
||||
}
|
||||
|
||||
func (e *MoonbeamImpl) WatchLogMessagePublished(ctx, timeout context.Context, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) {
|
||||
func (e *PollImpl) WatchLogMessagePublished(ctx, timeout context.Context, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) {
|
||||
return e.BaseEth.WatchLogMessagePublished(ctx, timeout, sink)
|
||||
}
|
||||
|
||||
func (e *MoonbeamImpl) TransactionReceipt(ctx context.Context, txHash ethCommon.Hash) (*ethTypes.Receipt, error) {
|
||||
func (e *PollImpl) TransactionReceipt(ctx context.Context, txHash ethCommon.Hash) (*ethTypes.Receipt, error) {
|
||||
return e.BaseEth.TransactionReceipt(ctx, txHash)
|
||||
}
|
||||
|
||||
func (e *MoonbeamImpl) TimeOfBlockByHash(ctx context.Context, hash ethCommon.Hash) (uint64, error) {
|
||||
func (e *PollImpl) TimeOfBlockByHash(ctx context.Context, hash ethCommon.Hash) (uint64, error) {
|
||||
return e.BaseEth.TimeOfBlockByHash(ctx, hash)
|
||||
}
|
||||
|
||||
func (e *MoonbeamImpl) ParseLogMessagePublished(log ethTypes.Log) (*ethAbi.AbiLogMessagePublished, error) {
|
||||
func (e *PollImpl) ParseLogMessagePublished(log ethTypes.Log) (*ethAbi.AbiLogMessagePublished, error) {
|
||||
return e.BaseEth.ParseLogMessagePublished(log)
|
||||
}
|
||||
|
||||
type MoonbeamSubscription struct {
|
||||
type PollSubscription struct {
|
||||
errOnce sync.Once
|
||||
err chan error
|
||||
quit chan error
|
||||
unsubDone chan struct{}
|
||||
}
|
||||
|
||||
func (sub *MoonbeamSubscription) Err() <-chan error {
|
||||
var ErrUnsubscribed = errors.New("unsubscribed")
|
||||
|
||||
func (sub *PollSubscription) Err() <-chan error {
|
||||
return sub.err
|
||||
}
|
||||
|
||||
var errUnsubscribed = errors.New("unsubscribed")
|
||||
|
||||
func (sub *MoonbeamSubscription) Unsubscribe() {
|
||||
func (sub *PollSubscription) Unsubscribe() {
|
||||
sub.errOnce.Do(func() {
|
||||
select {
|
||||
case sub.quit <- errUnsubscribed:
|
||||
case sub.quit <- ErrUnsubscribed:
|
||||
<-sub.unsubDone
|
||||
case <-sub.unsubDone:
|
||||
}
|
||||
|
@ -104,12 +124,7 @@ func (sub *MoonbeamSubscription) Unsubscribe() {
|
|||
})
|
||||
}
|
||||
|
||||
// Moonbeam can publish blocks before they are marked final. This means we need to sit on the block until a special "is finalized"
|
||||
// query returns true. The assumption is that every block number will eventually be published and finalized, it's just that the contents
|
||||
// of the block (and therefore the hash) might change if there is a rollback. Therefore rather than subscribing for headers from geth,
|
||||
// we use a polling mechanism to get the next expected block, and keep doing it until it is marked final.
|
||||
|
||||
func (e *MoonbeamImpl) SubscribeForBlocks(ctx context.Context, sink chan<- *common.NewBlock) (ethereum.Subscription, error) {
|
||||
func (e *PollImpl) SubscribeForBlocks(ctx context.Context, sink chan<- *common.NewBlock) (ethereum.Subscription, error) {
|
||||
if e.BaseEth.client == nil {
|
||||
panic("client is not initialized!")
|
||||
}
|
||||
|
@ -117,7 +132,7 @@ func (e *MoonbeamImpl) SubscribeForBlocks(ctx context.Context, sink chan<- *comm
|
|||
panic("rawClient is not initialized!")
|
||||
}
|
||||
|
||||
sub := &MoonbeamSubscription{
|
||||
sub := &PollSubscription{
|
||||
err: make(chan error, 1),
|
||||
}
|
||||
|
||||
|
@ -127,7 +142,6 @@ func (e *MoonbeamImpl) SubscribeForBlocks(ctx context.Context, sink chan<- *comm
|
|||
}
|
||||
currentBlockNumber := *latestBlock.Number
|
||||
|
||||
const DELAY_IN_MS = 250
|
||||
var BIG_ONE = big.NewInt(1)
|
||||
|
||||
timer := time.NewTimer(time.Millisecond) // Start immediately.
|
||||
|
@ -175,12 +189,18 @@ func (e *MoonbeamImpl) SubscribeForBlocks(ctx context.Context, sink chan<- *comm
|
|||
}
|
||||
}
|
||||
|
||||
finalized, err := e.isBlockFinalized(ctx, block.Hash.Hex())
|
||||
if err != nil {
|
||||
errorOccurred = true
|
||||
e.logger.Error("failed to see if block is finalized", zap.String("eth_network", e.BaseEth.NetworkName),
|
||||
zap.Uint64("block", currentBlockNumber.Uint64()), zap.Error(err))
|
||||
break
|
||||
var finalized bool
|
||||
if e.Finalizer != nil {
|
||||
var err error
|
||||
finalized, err = e.Finalizer.IsBlockFinalized(ctx, block)
|
||||
if err != nil {
|
||||
errorOccurred = true
|
||||
e.logger.Error("failed to see if block is finalized", zap.String("eth_network", e.BaseEth.NetworkName),
|
||||
zap.Uint64("block", currentBlockNumber.Uint64()), zap.Error(err))
|
||||
break
|
||||
}
|
||||
} else {
|
||||
finalized = true
|
||||
}
|
||||
if !finalized {
|
||||
break
|
||||
|
@ -199,7 +219,7 @@ func (e *MoonbeamImpl) SubscribeForBlocks(ctx context.Context, sink chan<- *comm
|
|||
errorCount = 0
|
||||
}
|
||||
|
||||
timer = time.NewTimer(DELAY_IN_MS * time.Millisecond)
|
||||
timer = time.NewTimer(time.Duration(e.DelayInMs) * time.Millisecond)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
@ -207,7 +227,7 @@ func (e *MoonbeamImpl) SubscribeForBlocks(ctx context.Context, sink chan<- *comm
|
|||
return sub, err
|
||||
}
|
||||
|
||||
func (e *MoonbeamImpl) getBlock(ctx context.Context, number *big.Int) (*common.NewBlock, error) {
|
||||
func (e *PollImpl) getBlock(ctx context.Context, number *big.Int) (*common.NewBlock, error) {
|
||||
var numStr string
|
||||
if number != nil {
|
||||
numStr = ethHexUtils.EncodeBig(number)
|
||||
|
@ -217,7 +237,7 @@ func (e *MoonbeamImpl) getBlock(ctx context.Context, number *big.Int) (*common.N
|
|||
|
||||
type Marshaller struct {
|
||||
Number *ethHexUtils.Big
|
||||
Hash ethCommon.Hash `json:"hash"`
|
||||
Hash ethCommon.Hash `json:"hash"`
|
||||
}
|
||||
|
||||
var m Marshaller
|
||||
|
@ -234,14 +254,3 @@ func (e *MoonbeamImpl) getBlock(ctx context.Context, number *big.Int) (*common.N
|
|||
Hash: m.Hash,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (e *MoonbeamImpl) isBlockFinalized(ctx context.Context, hash string) (bool, error) {
|
||||
var finalized bool
|
||||
err := e.rawClient.CallContext(ctx, &finalized, "moon_isBlockFinalized", hash)
|
||||
if err != nil {
|
||||
e.logger.Error("failed to check for finality", zap.String("eth_network", e.BaseEth.NetworkName), zap.Error(err))
|
||||
return false, err
|
||||
}
|
||||
|
||||
return finalized, nil
|
||||
}
|
|
@ -0,0 +1,93 @@
|
|||
// This implements the finality check for Polygon.
|
||||
//
|
||||
// It can take up to 512 blocks for polygon blocks to be finalized. Rather than wait that long, we will query the checkpoint to see if they are finalized sooner.
|
||||
//
|
||||
// TestNet query URL: "https://apis.matic.network/api/v1/mumbai/block-included/"
|
||||
// MainNet query URL: "https://apis.matic.network/api/v1/matic/block-included/block-number/"
|
||||
|
||||
package ethereum
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
common "github.com/certusone/wormhole/node/pkg/common"
|
||||
"go.uber.org/zap"
|
||||
"io/ioutil"
|
||||
"math/big"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
type PolygonFinalizer struct {
|
||||
Url string
|
||||
logger *zap.Logger
|
||||
networkName string
|
||||
highestCheckpoint big.Int
|
||||
}
|
||||
|
||||
func UsePolygonFinalizer(extraParams []string) bool {
|
||||
return len(extraParams) != 0 && extraParams[0] != ""
|
||||
}
|
||||
|
||||
func (f *PolygonFinalizer) SetLogger(l *zap.Logger, netName string) {
|
||||
f.logger = l
|
||||
f.networkName = netName
|
||||
f.logger.Info("using Polygon specific finality check", zap.String("eth_network", f.networkName), zap.String("query_url", f.Url))
|
||||
}
|
||||
|
||||
func (f *PolygonFinalizer) DialContext(ctx context.Context, _rawurl string) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *PolygonFinalizer) IsBlockFinalized(ctx context.Context, block *common.NewBlock) (bool, error) {
|
||||
if block.Number.Cmp(&f.highestCheckpoint) <= 0 {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
url := f.Url + block.Number.String()
|
||||
response, err := http.Get(url)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
responseData, err := ioutil.ReadAll(response.Body)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
var result map[string]string
|
||||
json.Unmarshal([]byte(responseData), &result)
|
||||
|
||||
status := result["message"]
|
||||
if status == "" || status == "No block found" {
|
||||
f.logger.Info("DEBUG: not finalized", zap.String("eth_network", f.networkName), zap.String("requested_block", block.Number.String()))
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if status != "success" {
|
||||
f.logger.Error("unexpected checkpoint status", zap.String("eth_network", f.networkName),
|
||||
zap.String("requested_block", block.Number.String()), zap.String("status", status))
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// If we get this far, we know this block is finalized, so we will return true even in the error cases.
|
||||
|
||||
endStr := result["end"]
|
||||
if endStr == "" {
|
||||
f.logger.Error("checkpoint reply is missing end", zap.String("eth_network", f.networkName), zap.String("requested_block", block.Number.String()))
|
||||
return true, nil
|
||||
}
|
||||
|
||||
end, ok := new(big.Int).SetString(endStr, 10)
|
||||
if !ok {
|
||||
f.logger.Error("checkpoint reply contains unexpected end", zap.String("eth_network", f.networkName),
|
||||
zap.String("requested_block", block.Number.String()), zap.String("end_str", endStr))
|
||||
return true, nil
|
||||
}
|
||||
|
||||
f.highestCheckpoint = *end
|
||||
|
||||
f.logger.Info("checkpoint query returned", zap.String("eth_network", f.networkName),
|
||||
zap.String("requested_block", block.Number.String()), zap.String("reply", string(responseData)))
|
||||
|
||||
return (block.Number.Cmp(&f.highestCheckpoint) <= 0), nil
|
||||
}
|
|
@ -126,15 +126,18 @@ func NewEthWatcher(
|
|||
setEvents chan *common.GuardianSet,
|
||||
minConfirmations uint64,
|
||||
obsvReqC chan *gossipv1.ObservationRequest,
|
||||
unsafeDevMode bool) *Watcher {
|
||||
unsafeDevMode bool,
|
||||
extraParams []string) *Watcher {
|
||||
|
||||
var ethIntf common.Ethish
|
||||
if chainID == vaa.ChainIDCelo && !unsafeDevMode {
|
||||
// When we are running in mainnet or testnet, we need to use the Celo ethereum library rather than go-ethereum.
|
||||
// However, in devnet, we currently run the standard ETH node for Celo, so we need to use the standard go-ethereum.
|
||||
ethIntf = &celo.CeloImpl{NetworkName: networkName}
|
||||
} else if chainID == vaa.ChainIDMoonbeam {
|
||||
ethIntf = &MoonbeamImpl{BaseEth: &EthImpl{NetworkName: networkName}}
|
||||
} else if chainID == vaa.ChainIDMoonbeam && !unsafeDevMode {
|
||||
ethIntf = &PollImpl{BaseEth: EthImpl{NetworkName: networkName}, Finalizer: &MoonbeamFinalizer{}, DelayInMs: 250}
|
||||
} else if chainID == vaa.ChainIDPolygon && UsePolygonFinalizer(extraParams) {
|
||||
ethIntf = &PollImpl{BaseEth: EthImpl{NetworkName: networkName}, Finalizer: &PolygonFinalizer{Url: extraParams[0]}, DelayInMs: 5000}
|
||||
} else {
|
||||
ethIntf = &EthImpl{NetworkName: networkName}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue