From cf4722a5468fe58d00894d9aa32a1efef55411cb Mon Sep 17 00:00:00 2001 From: Bruce Riley Date: Thu, 26 May 2022 06:53:47 +0000 Subject: [PATCH] Refactor to generalize polling Change-Id: Ie30056486ec86f6dceffed231ac227fa9c3499a7 --- node/cmd/guardiand/node.go | 61 +++++----- node/pkg/ethereum/moonbeamfin.go | 45 ++++++++ .../ethereum/{moonbeamimpl.go => pollimpl.go} | 107 ++++++++++-------- node/pkg/ethereum/polygonfin.go | 93 +++++++++++++++ node/pkg/ethereum/watcher.go | 9 +- 5 files changed, 237 insertions(+), 78 deletions(-) create mode 100644 node/pkg/ethereum/moonbeamfin.go rename node/pkg/ethereum/{moonbeamimpl.go => pollimpl.go} (58%) create mode 100644 node/pkg/ethereum/polygonfin.go diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index be59ddc8a..628f01e8c 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -70,8 +70,9 @@ var ( bscRPC *string bscContract *string - polygonRPC *string - polygonContract *string + polygonRPC *string + polygonContract *string + polygonCheckpoint *string ethRopstenRPC *string ethRopstenContract *string @@ -169,6 +170,7 @@ func init() { polygonRPC = NodeCmd.Flags().String("polygonRPC", "", "Polygon RPC URL") polygonContract = NodeCmd.Flags().String("polygonContract", "", "Polygon contract address") + polygonCheckpoint = NodeCmd.Flags().String("polygonCheckpoint", "", "Polygon checkpoint query URL") ethRopstenRPC = NodeCmd.Flags().String("ethRopstenRPC", "", "Ethereum Ropsten RPC URL") ethRopstenContract = NodeCmd.Flags().String("ethRopstenContract", "", "Ethereum Ropsten contract address") @@ -790,69 +792,76 @@ func runNode(cmd *cobra.Command, args []string) { } if err := supervisor.Run(ctx, "ethwatch", - ethereum.NewEthWatcher(*ethRPC, ethContractAddr, "eth", common.ReadinessEthSyncing, vaa.ChainIDEthereum, lockC, setC, 1, chainObsvReqC[vaa.ChainIDEthereum], *unsafeDevMode).Run); err != nil { + ethereum.NewEthWatcher(*ethRPC, ethContractAddr, "eth", common.ReadinessEthSyncing, vaa.ChainIDEthereum, lockC, setC, 1, chainObsvReqC[vaa.ChainIDEthereum], *unsafeDevMode, nil).Run); err != nil { return err } if err := supervisor.Run(ctx, "bscwatch", - ethereum.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", common.ReadinessBSCSyncing, vaa.ChainIDBSC, lockC, nil, 1, chainObsvReqC[vaa.ChainIDBSC], *unsafeDevMode).Run); err != nil { + ethereum.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", common.ReadinessBSCSyncing, vaa.ChainIDBSC, lockC, nil, 1, chainObsvReqC[vaa.ChainIDBSC], *unsafeDevMode, nil).Run); err != nil { return err } - polygonMinConfirmations := uint64(512) - if *testnetMode { - polygonMinConfirmations = 64 + { + var extraParams []string + var minConfirmations uint64 = 512 + if *polygonCheckpoint != "" { + extraParams = []string{*polygonCheckpoint} + minConfirmations = 1 + } else if *testnetMode { + minConfirmations = 64 + } + if err := supervisor.Run(ctx, "polygonwatch", + ethereum.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, lockC, nil, minConfirmations, chainObsvReqC[vaa.ChainIDPolygon], + *unsafeDevMode, extraParams).Run); err != nil { + // Special case: Polygon can fork like PoW Ethereum, and it's not clear what the safe number of blocks is + // + // If we are not querying the checkpoint server, we should hardcode the minimum number of confirmations to 512 regardless of what the smart contract specifies to protect + // developers from accidentally specifying an unsafe number of confirmations. We can remove this restriction as soon + // as specific public guidance exists for Polygon developers. + return err + } } - if err := supervisor.Run(ctx, "polygonwatch", - ethereum.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, lockC, nil, polygonMinConfirmations, chainObsvReqC[vaa.ChainIDPolygon], *unsafeDevMode).Run); err != nil { - // Special case: Polygon can fork like PoW Ethereum, and it's not clear what the safe number of blocks is - // - // Hardcode the minimum number of confirmations to 512 regardless of what the smart contract specifies to protect - // developers from accidentally specifying an unsafe number of confirmations. We can remove this restriction as soon - // as specific public guidance exists for Polygon developers. - return err - } if err := supervisor.Run(ctx, "avalanchewatch", - ethereum.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAvalanche], *unsafeDevMode).Run); err != nil { + ethereum.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAvalanche], *unsafeDevMode, nil).Run); err != nil { return err } if err := supervisor.Run(ctx, "oasiswatch", - ethereum.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, lockC, nil, 1, chainObsvReqC[vaa.ChainIDOasis], *unsafeDevMode).Run); err != nil { + ethereum.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, lockC, nil, 1, chainObsvReqC[vaa.ChainIDOasis], *unsafeDevMode, nil).Run); err != nil { return err } if err := supervisor.Run(ctx, "aurorawatch", - ethereum.NewEthWatcher(*auroraRPC, auroraContractAddr, "aurora", common.ReadinessAuroraSyncing, vaa.ChainIDAurora, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAurora], *unsafeDevMode).Run); err != nil { + ethereum.NewEthWatcher(*auroraRPC, auroraContractAddr, "aurora", common.ReadinessAuroraSyncing, vaa.ChainIDAurora, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAurora], *unsafeDevMode, nil).Run); err != nil { return err } if err := supervisor.Run(ctx, "fantomwatch", - ethereum.NewEthWatcher(*fantomRPC, fantomContractAddr, "fantom", common.ReadinessFantomSyncing, vaa.ChainIDFantom, lockC, nil, 1, chainObsvReqC[vaa.ChainIDFantom], *unsafeDevMode).Run); err != nil { + ethereum.NewEthWatcher(*fantomRPC, fantomContractAddr, "fantom", common.ReadinessFantomSyncing, vaa.ChainIDFantom, lockC, nil, 1, chainObsvReqC[vaa.ChainIDFantom], *unsafeDevMode, nil).Run); err != nil { return err } if err := supervisor.Run(ctx, "karurawatch", - ethereum.NewEthWatcher(*karuraRPC, karuraContractAddr, "karura", common.ReadinessKaruraSyncing, vaa.ChainIDKarura, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKarura], *unsafeDevMode).Run); err != nil { + ethereum.NewEthWatcher(*karuraRPC, karuraContractAddr, "karura", common.ReadinessKaruraSyncing, vaa.ChainIDKarura, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKarura], *unsafeDevMode, nil).Run); err != nil { return err } if err := supervisor.Run(ctx, "acalawatch", - ethereum.NewEthWatcher(*acalaRPC, acalaContractAddr, "acala", common.ReadinessAcalaSyncing, vaa.ChainIDAcala, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAcala], *unsafeDevMode).Run); err != nil { + ethereum.NewEthWatcher(*acalaRPC, acalaContractAddr, "acala", common.ReadinessAcalaSyncing, vaa.ChainIDAcala, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAcala], *unsafeDevMode, nil).Run); err != nil { return err } if err := supervisor.Run(ctx, "klaytnwatch", - ethereum.NewEthWatcher(*klaytnRPC, klaytnContractAddr, "klaytn", common.ReadinessKlaytnSyncing, vaa.ChainIDKlaytn, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKlaytn], *unsafeDevMode).Run); err != nil { + ethereum.NewEthWatcher(*klaytnRPC, klaytnContractAddr, "klaytn", common.ReadinessKlaytnSyncing, vaa.ChainIDKlaytn, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKlaytn], *unsafeDevMode, nil).Run); err != nil { return err } if err := supervisor.Run(ctx, "celowatch", - ethereum.NewEthWatcher(*celoRPC, celoContractAddr, "celo", common.ReadinessCeloSyncing, vaa.ChainIDCelo, lockC, nil, 1, chainObsvReqC[vaa.ChainIDCelo], *unsafeDevMode).Run); err != nil { + ethereum.NewEthWatcher(*celoRPC, celoContractAddr, "celo", common.ReadinessCeloSyncing, vaa.ChainIDCelo, lockC, nil, 1, chainObsvReqC[vaa.ChainIDCelo], *unsafeDevMode, nil).Run); err != nil { return err } if *testnetMode { if err := supervisor.Run(ctx, "ethropstenwatch", - ethereum.NewEthWatcher(*ethRopstenRPC, ethRopstenContractAddr, "ethropsten", common.ReadinessEthRopstenSyncing, vaa.ChainIDEthereumRopsten, lockC, nil, 1, chainObsvReqC[vaa.ChainIDEthereumRopsten], *unsafeDevMode).Run); err != nil { + ethereum.NewEthWatcher(*ethRopstenRPC, ethRopstenContractAddr, "ethropsten", common.ReadinessEthRopstenSyncing, vaa.ChainIDEthereumRopsten, lockC, nil, 1, chainObsvReqC[vaa.ChainIDEthereumRopsten], *unsafeDevMode, nil).Run); err != nil { return err } if err := supervisor.Run(ctx, "moonbeamwatch", - ethereum.NewEthWatcher(*moonbeamRPC, moonbeamContractAddr, "moonbeam", common.ReadinessMoonbeamSyncing, vaa.ChainIDMoonbeam, lockC, nil, 1, chainObsvReqC[vaa.ChainIDMoonbeam], *unsafeDevMode).Run); err != nil { + ethereum.NewEthWatcher(*moonbeamRPC, moonbeamContractAddr, "moonbeam", common.ReadinessMoonbeamSyncing, vaa.ChainIDMoonbeam, lockC, nil, 1, chainObsvReqC[vaa.ChainIDMoonbeam], *unsafeDevMode, nil).Run); err != nil { return err } } diff --git a/node/pkg/ethereum/moonbeamfin.go b/node/pkg/ethereum/moonbeamfin.go new file mode 100644 index 000000000..2306e3791 --- /dev/null +++ b/node/pkg/ethereum/moonbeamfin.go @@ -0,0 +1,45 @@ +// This implements the finality check for Moonbeam. +// +// Moonbeam can publish blocks before they are marked final. This means we need to sit on the block until a special "is finalized" +// query returns true. The assumption is that every block number will eventually be published and finalized, it's just that the contents +// of the block (and therefore the hash) might change if there is a rollback. + +package ethereum + +import ( + "context" + common "github.com/certusone/wormhole/node/pkg/common" + ethRpc "github.com/ethereum/go-ethereum/rpc" + "go.uber.org/zap" + "time" +) + +type MoonbeamFinalizer struct { + logger *zap.Logger + networkName string + client *ethRpc.Client +} + +func (f *MoonbeamFinalizer) SetLogger(l *zap.Logger, netName string) { + f.logger = l + f.networkName = netName + f.logger.Info("using Moonbeam specific finality check", zap.String("eth_network", f.networkName)) +} + +func (f *MoonbeamFinalizer) DialContext(ctx context.Context, rawurl string) (err error) { + timeout, cancel := context.WithTimeout(ctx, 15*time.Second) + defer cancel() + f.client, err = ethRpc.DialContext(timeout, rawurl) + return err +} + +func (f *MoonbeamFinalizer) IsBlockFinalized(ctx context.Context, block *common.NewBlock) (bool, error) { + var finalized bool + err := f.client.CallContext(ctx, &finalized, "moon_isBlockFinalized", block.Hash.Hex()) + if err != nil { + f.logger.Error("failed to check for finality", zap.String("eth_network", f.networkName), zap.Error(err)) + return false, err + } + + return finalized, nil +} diff --git a/node/pkg/ethereum/moonbeamimpl.go b/node/pkg/ethereum/pollimpl.go similarity index 58% rename from node/pkg/ethereum/moonbeamimpl.go rename to node/pkg/ethereum/pollimpl.go index 8c5ce8d61..a7432400d 100644 --- a/node/pkg/ethereum/moonbeamimpl.go +++ b/node/pkg/ethereum/pollimpl.go @@ -1,4 +1,6 @@ -// This implements the interface to the standard go-ethereum library. +// This implements polling for the next available block. + +// It can optionally call a chain specific function to verify that the block is finalized. package ethereum @@ -23,18 +25,29 @@ import ( "go.uber.org/zap" ) -type MoonbeamImpl struct { - BaseEth *EthImpl - logger *zap.Logger +type PollFinalizer interface { + SetLogger(l *zap.Logger, netName string) + DialContext(ctx context.Context, rawurl string) error + IsBlockFinalized(ctx context.Context, block *common.NewBlock) (bool, error) +} + +type PollImpl struct { + BaseEth EthImpl + Finalizer PollFinalizer + DelayInMs int + logger *zap.Logger rawClient *ethRpc.Client } -func (e *MoonbeamImpl) SetLogger(l *zap.Logger) { +func (e *PollImpl) SetLogger(l *zap.Logger) { e.logger = l - e.logger.Info("using Moonbeam specific implementation", zap.String("eth_network", e.BaseEth.NetworkName)) + e.logger.Info("using polling to check for new blocks", zap.String("eth_network", e.BaseEth.NetworkName), zap.Int("delay_in_ms", e.DelayInMs)) + if e.Finalizer != nil { + e.Finalizer.SetLogger(l, e.BaseEth.NetworkName) + } } -func (e *MoonbeamImpl) DialContext(ctx context.Context, rawurl string) (err error) { +func (e *PollImpl) DialContext(ctx context.Context, rawurl string) (err error) { timeout, cancel := context.WithTimeout(ctx, 15*time.Second) defer cancel() @@ -44,59 +57,66 @@ func (e *MoonbeamImpl) DialContext(ctx context.Context, rawurl string) (err erro return err } + if e.Finalizer != nil { + err = e.Finalizer.DialContext(ctx, rawurl) + if err != nil { + return err + } + } + // This is used for doing all other go-ethereum calls. return e.BaseEth.DialContext(ctx, rawurl) } -func (e *MoonbeamImpl) NewAbiFilterer(address ethCommon.Address) (err error) { +func (e *PollImpl) NewAbiFilterer(address ethCommon.Address) (err error) { return e.BaseEth.NewAbiFilterer(address) } -func (e *MoonbeamImpl) NewAbiCaller(address ethCommon.Address) (err error) { +func (e *PollImpl) NewAbiCaller(address ethCommon.Address) (err error) { return e.BaseEth.NewAbiCaller(address) } -func (e *MoonbeamImpl) GetCurrentGuardianSetIndex(ctx context.Context) (uint32, error) { +func (e *PollImpl) GetCurrentGuardianSetIndex(ctx context.Context) (uint32, error) { return e.BaseEth.GetCurrentGuardianSetIndex(ctx) } -func (e *MoonbeamImpl) GetGuardianSet(ctx context.Context, index uint32) (ethAbi.StructsGuardianSet, error) { +func (e *PollImpl) GetGuardianSet(ctx context.Context, index uint32) (ethAbi.StructsGuardianSet, error) { return e.BaseEth.GetGuardianSet(ctx, index) } -func (e *MoonbeamImpl) WatchLogMessagePublished(ctx, timeout context.Context, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) { +func (e *PollImpl) WatchLogMessagePublished(ctx, timeout context.Context, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) { return e.BaseEth.WatchLogMessagePublished(ctx, timeout, sink) } -func (e *MoonbeamImpl) TransactionReceipt(ctx context.Context, txHash ethCommon.Hash) (*ethTypes.Receipt, error) { +func (e *PollImpl) TransactionReceipt(ctx context.Context, txHash ethCommon.Hash) (*ethTypes.Receipt, error) { return e.BaseEth.TransactionReceipt(ctx, txHash) } -func (e *MoonbeamImpl) TimeOfBlockByHash(ctx context.Context, hash ethCommon.Hash) (uint64, error) { +func (e *PollImpl) TimeOfBlockByHash(ctx context.Context, hash ethCommon.Hash) (uint64, error) { return e.BaseEth.TimeOfBlockByHash(ctx, hash) } -func (e *MoonbeamImpl) ParseLogMessagePublished(log ethTypes.Log) (*ethAbi.AbiLogMessagePublished, error) { +func (e *PollImpl) ParseLogMessagePublished(log ethTypes.Log) (*ethAbi.AbiLogMessagePublished, error) { return e.BaseEth.ParseLogMessagePublished(log) } -type MoonbeamSubscription struct { +type PollSubscription struct { errOnce sync.Once err chan error quit chan error unsubDone chan struct{} } -func (sub *MoonbeamSubscription) Err() <-chan error { +var ErrUnsubscribed = errors.New("unsubscribed") + +func (sub *PollSubscription) Err() <-chan error { return sub.err } -var errUnsubscribed = errors.New("unsubscribed") - -func (sub *MoonbeamSubscription) Unsubscribe() { +func (sub *PollSubscription) Unsubscribe() { sub.errOnce.Do(func() { select { - case sub.quit <- errUnsubscribed: + case sub.quit <- ErrUnsubscribed: <-sub.unsubDone case <-sub.unsubDone: } @@ -104,12 +124,7 @@ func (sub *MoonbeamSubscription) Unsubscribe() { }) } -// Moonbeam can publish blocks before they are marked final. This means we need to sit on the block until a special "is finalized" -// query returns true. The assumption is that every block number will eventually be published and finalized, it's just that the contents -// of the block (and therefore the hash) might change if there is a rollback. Therefore rather than subscribing for headers from geth, -// we use a polling mechanism to get the next expected block, and keep doing it until it is marked final. - -func (e *MoonbeamImpl) SubscribeForBlocks(ctx context.Context, sink chan<- *common.NewBlock) (ethereum.Subscription, error) { +func (e *PollImpl) SubscribeForBlocks(ctx context.Context, sink chan<- *common.NewBlock) (ethereum.Subscription, error) { if e.BaseEth.client == nil { panic("client is not initialized!") } @@ -117,7 +132,7 @@ func (e *MoonbeamImpl) SubscribeForBlocks(ctx context.Context, sink chan<- *comm panic("rawClient is not initialized!") } - sub := &MoonbeamSubscription{ + sub := &PollSubscription{ err: make(chan error, 1), } @@ -127,7 +142,6 @@ func (e *MoonbeamImpl) SubscribeForBlocks(ctx context.Context, sink chan<- *comm } currentBlockNumber := *latestBlock.Number - const DELAY_IN_MS = 250 var BIG_ONE = big.NewInt(1) timer := time.NewTimer(time.Millisecond) // Start immediately. @@ -175,12 +189,18 @@ func (e *MoonbeamImpl) SubscribeForBlocks(ctx context.Context, sink chan<- *comm } } - finalized, err := e.isBlockFinalized(ctx, block.Hash.Hex()) - if err != nil { - errorOccurred = true - e.logger.Error("failed to see if block is finalized", zap.String("eth_network", e.BaseEth.NetworkName), - zap.Uint64("block", currentBlockNumber.Uint64()), zap.Error(err)) - break + var finalized bool + if e.Finalizer != nil { + var err error + finalized, err = e.Finalizer.IsBlockFinalized(ctx, block) + if err != nil { + errorOccurred = true + e.logger.Error("failed to see if block is finalized", zap.String("eth_network", e.BaseEth.NetworkName), + zap.Uint64("block", currentBlockNumber.Uint64()), zap.Error(err)) + break + } + } else { + finalized = true } if !finalized { break @@ -199,7 +219,7 @@ func (e *MoonbeamImpl) SubscribeForBlocks(ctx context.Context, sink chan<- *comm errorCount = 0 } - timer = time.NewTimer(DELAY_IN_MS * time.Millisecond) + timer = time.NewTimer(time.Duration(e.DelayInMs) * time.Millisecond) } } }() @@ -207,7 +227,7 @@ func (e *MoonbeamImpl) SubscribeForBlocks(ctx context.Context, sink chan<- *comm return sub, err } -func (e *MoonbeamImpl) getBlock(ctx context.Context, number *big.Int) (*common.NewBlock, error) { +func (e *PollImpl) getBlock(ctx context.Context, number *big.Int) (*common.NewBlock, error) { var numStr string if number != nil { numStr = ethHexUtils.EncodeBig(number) @@ -217,7 +237,7 @@ func (e *MoonbeamImpl) getBlock(ctx context.Context, number *big.Int) (*common.N type Marshaller struct { Number *ethHexUtils.Big - Hash ethCommon.Hash `json:"hash"` + Hash ethCommon.Hash `json:"hash"` } var m Marshaller @@ -234,14 +254,3 @@ func (e *MoonbeamImpl) getBlock(ctx context.Context, number *big.Int) (*common.N Hash: m.Hash, }, nil } - -func (e *MoonbeamImpl) isBlockFinalized(ctx context.Context, hash string) (bool, error) { - var finalized bool - err := e.rawClient.CallContext(ctx, &finalized, "moon_isBlockFinalized", hash) - if err != nil { - e.logger.Error("failed to check for finality", zap.String("eth_network", e.BaseEth.NetworkName), zap.Error(err)) - return false, err - } - - return finalized, nil -} diff --git a/node/pkg/ethereum/polygonfin.go b/node/pkg/ethereum/polygonfin.go new file mode 100644 index 000000000..001fc1e27 --- /dev/null +++ b/node/pkg/ethereum/polygonfin.go @@ -0,0 +1,93 @@ +// This implements the finality check for Polygon. +// +// It can take up to 512 blocks for polygon blocks to be finalized. Rather than wait that long, we will query the checkpoint to see if they are finalized sooner. +// +// TestNet query URL: "https://apis.matic.network/api/v1/mumbai/block-included/" +// MainNet query URL: "https://apis.matic.network/api/v1/matic/block-included/block-number/" + +package ethereum + +import ( + "context" + "encoding/json" + common "github.com/certusone/wormhole/node/pkg/common" + "go.uber.org/zap" + "io/ioutil" + "math/big" + "net/http" +) + +type PolygonFinalizer struct { + Url string + logger *zap.Logger + networkName string + highestCheckpoint big.Int +} + +func UsePolygonFinalizer(extraParams []string) bool { + return len(extraParams) != 0 && extraParams[0] != "" +} + +func (f *PolygonFinalizer) SetLogger(l *zap.Logger, netName string) { + f.logger = l + f.networkName = netName + f.logger.Info("using Polygon specific finality check", zap.String("eth_network", f.networkName), zap.String("query_url", f.Url)) +} + +func (f *PolygonFinalizer) DialContext(ctx context.Context, _rawurl string) (err error) { + return nil +} + +func (f *PolygonFinalizer) IsBlockFinalized(ctx context.Context, block *common.NewBlock) (bool, error) { + if block.Number.Cmp(&f.highestCheckpoint) <= 0 { + return true, nil + } + + url := f.Url + block.Number.String() + response, err := http.Get(url) + if err != nil { + return false, err + } + + responseData, err := ioutil.ReadAll(response.Body) + if err != nil { + return false, err + } + + var result map[string]string + json.Unmarshal([]byte(responseData), &result) + + status := result["message"] + if status == "" || status == "No block found" { + f.logger.Info("DEBUG: not finalized", zap.String("eth_network", f.networkName), zap.String("requested_block", block.Number.String())) + return false, nil + } + + if status != "success" { + f.logger.Error("unexpected checkpoint status", zap.String("eth_network", f.networkName), + zap.String("requested_block", block.Number.String()), zap.String("status", status)) + return false, nil + } + + // If we get this far, we know this block is finalized, so we will return true even in the error cases. + + endStr := result["end"] + if endStr == "" { + f.logger.Error("checkpoint reply is missing end", zap.String("eth_network", f.networkName), zap.String("requested_block", block.Number.String())) + return true, nil + } + + end, ok := new(big.Int).SetString(endStr, 10) + if !ok { + f.logger.Error("checkpoint reply contains unexpected end", zap.String("eth_network", f.networkName), + zap.String("requested_block", block.Number.String()), zap.String("end_str", endStr)) + return true, nil + } + + f.highestCheckpoint = *end + + f.logger.Info("checkpoint query returned", zap.String("eth_network", f.networkName), + zap.String("requested_block", block.Number.String()), zap.String("reply", string(responseData))) + + return (block.Number.Cmp(&f.highestCheckpoint) <= 0), nil +} diff --git a/node/pkg/ethereum/watcher.go b/node/pkg/ethereum/watcher.go index 6626c6c45..4e1bda380 100644 --- a/node/pkg/ethereum/watcher.go +++ b/node/pkg/ethereum/watcher.go @@ -126,15 +126,18 @@ func NewEthWatcher( setEvents chan *common.GuardianSet, minConfirmations uint64, obsvReqC chan *gossipv1.ObservationRequest, - unsafeDevMode bool) *Watcher { + unsafeDevMode bool, + extraParams []string) *Watcher { var ethIntf common.Ethish if chainID == vaa.ChainIDCelo && !unsafeDevMode { // When we are running in mainnet or testnet, we need to use the Celo ethereum library rather than go-ethereum. // However, in devnet, we currently run the standard ETH node for Celo, so we need to use the standard go-ethereum. ethIntf = &celo.CeloImpl{NetworkName: networkName} - } else if chainID == vaa.ChainIDMoonbeam { - ethIntf = &MoonbeamImpl{BaseEth: &EthImpl{NetworkName: networkName}} + } else if chainID == vaa.ChainIDMoonbeam && !unsafeDevMode { + ethIntf = &PollImpl{BaseEth: EthImpl{NetworkName: networkName}, Finalizer: &MoonbeamFinalizer{}, DelayInMs: 250} + } else if chainID == vaa.ChainIDPolygon && UsePolygonFinalizer(extraParams) { + ethIntf = &PollImpl{BaseEth: EthImpl{NetworkName: networkName}, Finalizer: &PolygonFinalizer{Url: extraParams[0]}, DelayInMs: 5000} } else { ethIntf = &EthImpl{NetworkName: networkName} }