Refactor ethwatcher (#1618)

* reduce code duplication in ethereum watcher

Change-Id: I1d78e9cb86cb126e42959ef5805c0ee1b83e23a6

* reorganize finalizers

Change-Id: I64c61ebd8a03d4ba9f27ce82fbc4cae8b35f19d2

* rename ethereum to evm

Change-Id: I6d1e56df65c74edb3643b3506adcff8588e0a87b

* reorganize connectors

Change-Id: I21b31fb00f34dd6501e957b8f528723d5f548a52

* restructure into modular connectors

Change-Id: I111403a29464acfccad18b261658f59176a8795d
This commit is contained in:
Hendrik Hofstadt 2022-09-28 13:15:57 +02:00 committed by GitHub
parent 69962f96f5
commit ddd4f26f20
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 896 additions and 1717 deletions

View File

@ -11,6 +11,8 @@ import (
"path"
"strings"
"github.com/certusone/wormhole/node/pkg/evm"
"github.com/benbjohnson/clock"
"github.com/certusone/wormhole/node/pkg/db"
"github.com/certusone/wormhole/node/pkg/notify/discord"
@ -25,7 +27,6 @@ import (
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/devnet"
"github.com/certusone/wormhole/node/pkg/ethereum"
"github.com/certusone/wormhole/node/pkg/governor"
"github.com/certusone/wormhole/node/pkg/p2p"
"github.com/certusone/wormhole/node/pkg/processor"
@ -915,12 +916,12 @@ func runNode(cmd *cobra.Command, args []string) {
}
if err := supervisor.Run(ctx, "ethwatch",
ethereum.NewEthWatcher(*ethRPC, ethContractAddr, "eth", common.ReadinessEthSyncing, vaa.ChainIDEthereum, lockC, setC, 1, chainObsvReqC[vaa.ChainIDEthereum], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*ethRPC, ethContractAddr, "eth", common.ReadinessEthSyncing, vaa.ChainIDEthereum, lockC, setC, 1, chainObsvReqC[vaa.ChainIDEthereum], *unsafeDevMode).Run); err != nil {
return err
}
if err := supervisor.Run(ctx, "bscwatch",
ethereum.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", common.ReadinessBSCSyncing, vaa.ChainIDBSC, lockC, nil, 1, chainObsvReqC[vaa.ChainIDBSC], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", common.ReadinessBSCSyncing, vaa.ChainIDBSC, lockC, nil, 1, chainObsvReqC[vaa.ChainIDBSC], *unsafeDevMode).Run); err != nil {
return err
}
@ -930,7 +931,7 @@ func runNode(cmd *cobra.Command, args []string) {
}
if err := supervisor.Run(ctx, "polygonwatch",
ethereum.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, lockC, nil, polygonMinConfirmations, chainObsvReqC[vaa.ChainIDPolygon], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, lockC, nil, polygonMinConfirmations, chainObsvReqC[vaa.ChainIDPolygon], *unsafeDevMode).Run); err != nil {
// Special case: Polygon can fork like PoW Ethereum, and it's not clear what the safe number of blocks is
//
// Hardcode the minimum number of confirmations to 512 regardless of what the smart contract specifies to protect
@ -939,49 +940,49 @@ func runNode(cmd *cobra.Command, args []string) {
return err
}
if err := supervisor.Run(ctx, "avalanchewatch",
ethereum.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAvalanche], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAvalanche], *unsafeDevMode).Run); err != nil {
return err
}
if err := supervisor.Run(ctx, "oasiswatch",
ethereum.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, lockC, nil, 1, chainObsvReqC[vaa.ChainIDOasis], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, lockC, nil, 1, chainObsvReqC[vaa.ChainIDOasis], *unsafeDevMode).Run); err != nil {
return err
}
if err := supervisor.Run(ctx, "aurorawatch",
ethereum.NewEthWatcher(*auroraRPC, auroraContractAddr, "aurora", common.ReadinessAuroraSyncing, vaa.ChainIDAurora, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAurora], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*auroraRPC, auroraContractAddr, "aurora", common.ReadinessAuroraSyncing, vaa.ChainIDAurora, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAurora], *unsafeDevMode).Run); err != nil {
return err
}
if err := supervisor.Run(ctx, "fantomwatch",
ethereum.NewEthWatcher(*fantomRPC, fantomContractAddr, "fantom", common.ReadinessFantomSyncing, vaa.ChainIDFantom, lockC, nil, 1, chainObsvReqC[vaa.ChainIDFantom], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*fantomRPC, fantomContractAddr, "fantom", common.ReadinessFantomSyncing, vaa.ChainIDFantom, lockC, nil, 1, chainObsvReqC[vaa.ChainIDFantom], *unsafeDevMode).Run); err != nil {
return err
}
if err := supervisor.Run(ctx, "karurawatch",
ethereum.NewEthWatcher(*karuraRPC, karuraContractAddr, "karura", common.ReadinessKaruraSyncing, vaa.ChainIDKarura, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKarura], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*karuraRPC, karuraContractAddr, "karura", common.ReadinessKaruraSyncing, vaa.ChainIDKarura, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKarura], *unsafeDevMode).Run); err != nil {
return err
}
if err := supervisor.Run(ctx, "acalawatch",
ethereum.NewEthWatcher(*acalaRPC, acalaContractAddr, "acala", common.ReadinessAcalaSyncing, vaa.ChainIDAcala, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAcala], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*acalaRPC, acalaContractAddr, "acala", common.ReadinessAcalaSyncing, vaa.ChainIDAcala, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAcala], *unsafeDevMode).Run); err != nil {
return err
}
if err := supervisor.Run(ctx, "klaytnwatch",
ethereum.NewEthWatcher(*klaytnRPC, klaytnContractAddr, "klaytn", common.ReadinessKlaytnSyncing, vaa.ChainIDKlaytn, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKlaytn], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*klaytnRPC, klaytnContractAddr, "klaytn", common.ReadinessKlaytnSyncing, vaa.ChainIDKlaytn, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKlaytn], *unsafeDevMode).Run); err != nil {
return err
}
if err := supervisor.Run(ctx, "celowatch",
ethereum.NewEthWatcher(*celoRPC, celoContractAddr, "celo", common.ReadinessCeloSyncing, vaa.ChainIDCelo, lockC, nil, 1, chainObsvReqC[vaa.ChainIDCelo], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*celoRPC, celoContractAddr, "celo", common.ReadinessCeloSyncing, vaa.ChainIDCelo, lockC, nil, 1, chainObsvReqC[vaa.ChainIDCelo], *unsafeDevMode).Run); err != nil {
return err
}
if err := supervisor.Run(ctx, "moonbeamwatch",
ethereum.NewEthWatcher(*moonbeamRPC, moonbeamContractAddr, "moonbeam", common.ReadinessMoonbeamSyncing, vaa.ChainIDMoonbeam, lockC, nil, 1, chainObsvReqC[vaa.ChainIDMoonbeam], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*moonbeamRPC, moonbeamContractAddr, "moonbeam", common.ReadinessMoonbeamSyncing, vaa.ChainIDMoonbeam, lockC, nil, 1, chainObsvReqC[vaa.ChainIDMoonbeam], *unsafeDevMode).Run); err != nil {
return err
}
if *testnetMode {
if err := supervisor.Run(ctx, "ethropstenwatch",
ethereum.NewEthWatcher(*ethRopstenRPC, ethRopstenContractAddr, "ethropsten", common.ReadinessEthRopstenSyncing, vaa.ChainIDEthereumRopsten, lockC, nil, 1, chainObsvReqC[vaa.ChainIDEthereumRopsten], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*ethRopstenRPC, ethRopstenContractAddr, "ethropsten", common.ReadinessEthRopstenSyncing, vaa.ChainIDEthereumRopsten, lockC, nil, 1, chainObsvReqC[vaa.ChainIDEthereumRopsten], *unsafeDevMode).Run); err != nil {
return err
}
if err := supervisor.Run(ctx, "neonwatch",
ethereum.NewEthWatcher(*neonRPC, neonContractAddr, "neon", common.ReadinessNeonSyncing, vaa.ChainIDNeon, lockC, nil, 32, chainObsvReqC[vaa.ChainIDNeon], *unsafeDevMode).Run); err != nil {
evm.NewEthWatcher(*neonRPC, neonContractAddr, "neon", common.ReadinessNeonSyncing, vaa.ChainIDNeon, lockC, nil, 32, chainObsvReqC[vaa.ChainIDNeon], *unsafeDevMode).Run); err != nil {
return err
}
}

View File

@ -10,9 +10,10 @@ import (
"flag"
"log"
"github.com/certusone/wormhole/node/pkg/celo"
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/ethereum"
"github.com/certusone/wormhole/node/pkg/evm"
"github.com/certusone/wormhole/node/pkg/evm/connectors"
"go.uber.org/zap"
ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
@ -34,27 +35,25 @@ func main() {
ctx := context.Background()
var ethIntf common.Ethish
if chainID == vaa.ChainIDCelo {
ethIntf = &celo.CeloImpl{NetworkName: "celo"}
} else {
ethIntf = &ethereum.EthImpl{NetworkName: "eth"}
}
err := ethIntf.DialContext(ctx, *flagEthRPC)
if err != nil {
log.Fatal(err)
}
contractAddr := ethCommon.HexToAddress(*flagContractAddr)
var ethIntf connectors.Connector
var err error
if chainID == vaa.ChainIDCelo {
ethIntf, err = connectors.NewCeloConnector(ctx, "", *flagEthRPC, contractAddr, zap.L())
if err != nil {
log.Fatalf("dialing eth client failed: %v", err)
}
} else {
ethIntf, err = connectors.NewEthereumConnector(ctx, "", *flagEthRPC, contractAddr, zap.L())
if err != nil {
log.Fatalf("dialing eth client failed: %v", err)
}
}
transactionHash := ethCommon.HexToHash(*flagTx)
err = ethIntf.NewAbiFilterer(contractAddr)
if err != nil {
log.Fatal(err)
}
block, msgs, err := ethereum.MessageEventsForTransaction(ctx, ethIntf, contractAddr, chainID, transactionHash)
block, msgs, err := evm.MessageEventsForTransaction(ctx, ethIntf, contractAddr, chainID, transactionHash)
if err != nil {
log.Fatal(err)
}

View File

@ -14,8 +14,9 @@ import (
"strings"
"time"
"github.com/certusone/wormhole/node/pkg/evm/connectors/ethabi"
"github.com/certusone/wormhole/node/pkg/db"
"github.com/certusone/wormhole/node/pkg/ethereum/abi"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
nodev1 "github.com/certusone/wormhole/node/pkg/proto/node/v1"
abi2 "github.com/ethereum/go-ethereum/accounts/abi"
@ -362,7 +363,7 @@ func main() {
Timeout: 5 * time.Second,
}
ethAbi, err := abi2.JSON(strings.NewReader(abi.AbiABI))
ethAbi, err := abi2.JSON(strings.NewReader(ethabi.AbiABI))
if err != nil {
log.Fatalf("failed to parse Eth ABI: %v", err)
}

View File

@ -1,37 +0,0 @@
// This specifies the interface to the chain specific Eth / EVM libraries.
// This interface should be implemented for each chain that has a unique go-ethereum or "go-ethereum-ish" library.
package common
import (
"context"
"math/big"
ethereum "github.com/ethereum/go-ethereum"
ethCommon "github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"
ethEvent "github.com/ethereum/go-ethereum/event"
ethAbi "github.com/certusone/wormhole/node/pkg/ethereum/abi"
"go.uber.org/zap"
)
type NewBlock struct {
Number *big.Int
Hash ethCommon.Hash
}
type Ethish interface {
SetLogger(l *zap.Logger)
DialContext(ctx context.Context, rawurl string) error
NewAbiFilterer(address ethCommon.Address) error
NewAbiCaller(address ethCommon.Address) error
GetCurrentGuardianSetIndex(ctx context.Context) (uint32, error)
GetGuardianSet(ctx context.Context, index uint32) (ethAbi.StructsGuardianSet, error)
WatchLogMessagePublished(ctx, timeout context.Context, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error)
TransactionReceipt(ctx context.Context, txHash ethCommon.Hash) (*ethTypes.Receipt, error)
TimeOfBlockByHash(ctx context.Context, hash ethCommon.Hash) (uint64, error)
ParseLogMessagePublished(log ethTypes.Log) (*ethAbi.AbiLogMessagePublished, error)
SubscribeForBlocks(ctx context.Context, sink chan<- *NewBlock) (ethereum.Subscription, error)
}

View File

@ -5,6 +5,8 @@ import (
"fmt"
"time"
"github.com/certusone/wormhole/node/pkg/evm/connectors/ethabi"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
@ -12,7 +14,6 @@ import (
"github.com/ethereum/go-ethereum/ethclient"
"go.uber.org/zap"
"github.com/certusone/wormhole/node/pkg/ethereum/abi"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
@ -55,7 +56,7 @@ func SubmitVAA(ctx context.Context, rpcURL string, vaa *vaa.VAA) (*types.Transac
}
kt := GetKeyedTransactor(ctx)
contract, err := abi.NewAbi(GanacheWormholeContractAddress, c)
contract, err := ethabi.NewAbi(GanacheWormholeContractAddress, c)
if err != nil {
panic(err)
}

View File

@ -1,769 +0,0 @@
// Code generated - DO NOT EDIT.
// This file is a generated binding and any manual changes will be lost.
package erc20
import (
"math/big"
"strings"
ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
)
// Reference imports to suppress errors if they are not otherwise used.
var (
_ = big.NewInt
_ = strings.NewReader
_ = ethereum.NotFound
_ = bind.Bind
_ = common.Big1
_ = types.BloomLookup
_ = event.NewSubscription
)
// Erc20ABI is the input ABI used to generate the binding from.
const Erc20ABI = "[{\"inputs\":[{\"internalType\":\"string\",\"name\":\"name\",\"type\":\"string\"},{\"internalType\":\"string\",\"name\":\"symbol\",\"type\":\"string\"}],\"stateMutability\":\"nonpayable\",\"type\":\"constructor\"},{\"anonymous\":false,\"inputs\":[{\"indexed\":true,\"internalType\":\"address\",\"name\":\"owner\",\"type\":\"address\"},{\"indexed\":true,\"internalType\":\"address\",\"name\":\"spender\",\"type\":\"address\"},{\"indexed\":false,\"internalType\":\"uint256\",\"name\":\"value\",\"type\":\"uint256\"}],\"name\":\"Approval\",\"type\":\"event\"},{\"anonymous\":false,\"inputs\":[{\"indexed\":true,\"internalType\":\"address\",\"name\":\"from\",\"type\":\"address\"},{\"indexed\":true,\"internalType\":\"address\",\"name\":\"to\",\"type\":\"address\"},{\"indexed\":false,\"internalType\":\"uint256\",\"name\":\"value\",\"type\":\"uint256\"}],\"name\":\"Transfer\",\"type\":\"event\"},{\"inputs\":[],\"name\":\"name\",\"outputs\":[{\"internalType\":\"string\",\"name\":\"\",\"type\":\"string\"}],\"stateMutability\":\"view\",\"type\":\"function\"},{\"inputs\":[],\"name\":\"symbol\",\"outputs\":[{\"internalType\":\"string\",\"name\":\"\",\"type\":\"string\"}],\"stateMutability\":\"view\",\"type\":\"function\"},{\"inputs\":[],\"name\":\"decimals\",\"outputs\":[{\"internalType\":\"uint8\",\"name\":\"\",\"type\":\"uint8\"}],\"stateMutability\":\"view\",\"type\":\"function\"},{\"inputs\":[],\"name\":\"totalSupply\",\"outputs\":[{\"internalType\":\"uint256\",\"name\":\"\",\"type\":\"uint256\"}],\"stateMutability\":\"view\",\"type\":\"function\"},{\"inputs\":[{\"internalType\":\"address\",\"name\":\"account\",\"type\":\"address\"}],\"name\":\"balanceOf\",\"outputs\":[{\"internalType\":\"uint256\",\"name\":\"\",\"type\":\"uint256\"}],\"stateMutability\":\"view\",\"type\":\"function\"},{\"inputs\":[{\"internalType\":\"address\",\"name\":\"recipient\",\"type\":\"address\"},{\"internalType\":\"uint256\",\"name\":\"amount\",\"type\":\"uint256\"}],\"name\":\"transfer\",\"outputs\":[{\"internalType\":\"bool\",\"name\":\"\",\"type\":\"bool\"}],\"stateMutability\":\"nonpayable\",\"type\":\"function\"},{\"inputs\":[{\"internalType\":\"address\",\"name\":\"owner\",\"type\":\"address\"},{\"internalType\":\"address\",\"name\":\"spender\",\"type\":\"address\"}],\"name\":\"allowance\",\"outputs\":[{\"internalType\":\"uint256\",\"name\":\"\",\"type\":\"uint256\"}],\"stateMutability\":\"view\",\"type\":\"function\"},{\"inputs\":[{\"internalType\":\"address\",\"name\":\"spender\",\"type\":\"address\"},{\"internalType\":\"uint256\",\"name\":\"amount\",\"type\":\"uint256\"}],\"name\":\"approve\",\"outputs\":[{\"internalType\":\"bool\",\"name\":\"\",\"type\":\"bool\"}],\"stateMutability\":\"nonpayable\",\"type\":\"function\"},{\"inputs\":[{\"internalType\":\"address\",\"name\":\"sender\",\"type\":\"address\"},{\"internalType\":\"address\",\"name\":\"recipient\",\"type\":\"address\"},{\"internalType\":\"uint256\",\"name\":\"amount\",\"type\":\"uint256\"}],\"name\":\"transferFrom\",\"outputs\":[{\"internalType\":\"bool\",\"name\":\"\",\"type\":\"bool\"}],\"stateMutability\":\"nonpayable\",\"type\":\"function\"},{\"inputs\":[{\"internalType\":\"address\",\"name\":\"spender\",\"type\":\"address\"},{\"internalType\":\"uint256\",\"name\":\"addedValue\",\"type\":\"uint256\"}],\"name\":\"increaseAllowance\",\"outputs\":[{\"internalType\":\"bool\",\"name\":\"\",\"type\":\"bool\"}],\"stateMutability\":\"nonpayable\",\"type\":\"function\"},{\"inputs\":[{\"internalType\":\"address\",\"name\":\"spender\",\"type\":\"address\"},{\"internalType\":\"uint256\",\"name\":\"subtractedValue\",\"type\":\"uint256\"}],\"name\":\"decreaseAllowance\",\"outputs\":[{\"internalType\":\"bool\",\"name\":\"\",\"type\":\"bool\"}],\"stateMutability\":\"nonpayable\",\"type\":\"function\"}]"
// Erc20 is an auto generated Go binding around an Ethereum contract.
type Erc20 struct {
Erc20Caller // Read-only binding to the contract
Erc20Transactor // Write-only binding to the contract
Erc20Filterer // Log filterer for contract events
}
// Erc20Caller is an auto generated read-only Go binding around an Ethereum contract.
type Erc20Caller struct {
contract *bind.BoundContract // Generic contract wrapper for the low level calls
}
// Erc20Transactor is an auto generated write-only Go binding around an Ethereum contract.
type Erc20Transactor struct {
contract *bind.BoundContract // Generic contract wrapper for the low level calls
}
// Erc20Filterer is an auto generated log filtering Go binding around an Ethereum contract events.
type Erc20Filterer struct {
contract *bind.BoundContract // Generic contract wrapper for the low level calls
}
// Erc20Session is an auto generated Go binding around an Ethereum contract,
// with pre-set call and transact options.
type Erc20Session struct {
Contract *Erc20 // Generic contract binding to set the session for
CallOpts bind.CallOpts // Call options to use throughout this session
TransactOpts bind.TransactOpts // Transaction auth options to use throughout this session
}
// Erc20CallerSession is an auto generated read-only Go binding around an Ethereum contract,
// with pre-set call options.
type Erc20CallerSession struct {
Contract *Erc20Caller // Generic contract caller binding to set the session for
CallOpts bind.CallOpts // Call options to use throughout this session
}
// Erc20TransactorSession is an auto generated write-only Go binding around an Ethereum contract,
// with pre-set transact options.
type Erc20TransactorSession struct {
Contract *Erc20Transactor // Generic contract transactor binding to set the session for
TransactOpts bind.TransactOpts // Transaction auth options to use throughout this session
}
// Erc20Raw is an auto generated low-level Go binding around an Ethereum contract.
type Erc20Raw struct {
Contract *Erc20 // Generic contract binding to access the raw methods on
}
// Erc20CallerRaw is an auto generated low-level read-only Go binding around an Ethereum contract.
type Erc20CallerRaw struct {
Contract *Erc20Caller // Generic read-only contract binding to access the raw methods on
}
// Erc20TransactorRaw is an auto generated low-level write-only Go binding around an Ethereum contract.
type Erc20TransactorRaw struct {
Contract *Erc20Transactor // Generic write-only contract binding to access the raw methods on
}
// NewErc20 creates a new instance of Erc20, bound to a specific deployed contract.
func NewErc20(address common.Address, backend bind.ContractBackend) (*Erc20, error) {
contract, err := bindErc20(address, backend, backend, backend)
if err != nil {
return nil, err
}
return &Erc20{Erc20Caller: Erc20Caller{contract: contract}, Erc20Transactor: Erc20Transactor{contract: contract}, Erc20Filterer: Erc20Filterer{contract: contract}}, nil
}
// NewErc20Caller creates a new read-only instance of Erc20, bound to a specific deployed contract.
func NewErc20Caller(address common.Address, caller bind.ContractCaller) (*Erc20Caller, error) {
contract, err := bindErc20(address, caller, nil, nil)
if err != nil {
return nil, err
}
return &Erc20Caller{contract: contract}, nil
}
// NewErc20Transactor creates a new write-only instance of Erc20, bound to a specific deployed contract.
func NewErc20Transactor(address common.Address, transactor bind.ContractTransactor) (*Erc20Transactor, error) {
contract, err := bindErc20(address, nil, transactor, nil)
if err != nil {
return nil, err
}
return &Erc20Transactor{contract: contract}, nil
}
// NewErc20Filterer creates a new log filterer instance of Erc20, bound to a specific deployed contract.
func NewErc20Filterer(address common.Address, filterer bind.ContractFilterer) (*Erc20Filterer, error) {
contract, err := bindErc20(address, nil, nil, filterer)
if err != nil {
return nil, err
}
return &Erc20Filterer{contract: contract}, nil
}
// bindErc20 binds a generic wrapper to an already deployed contract.
func bindErc20(address common.Address, caller bind.ContractCaller, transactor bind.ContractTransactor, filterer bind.ContractFilterer) (*bind.BoundContract, error) {
parsed, err := abi.JSON(strings.NewReader(Erc20ABI))
if err != nil {
return nil, err
}
return bind.NewBoundContract(address, parsed, caller, transactor, filterer), nil
}
// Call invokes the (constant) contract method with params as input values and
// sets the output to result. The result type might be a single field for simple
// returns, a slice of interfaces for anonymous returns and a struct for named
// returns.
func (_Erc20 *Erc20Raw) Call(opts *bind.CallOpts, result *[]interface{}, method string, params ...interface{}) error {
return _Erc20.Contract.Erc20Caller.contract.Call(opts, result, method, params...)
}
// Transfer initiates a plain transaction to move funds to the contract, calling
// its default method if one is available.
func (_Erc20 *Erc20Raw) Transfer(opts *bind.TransactOpts) (*types.Transaction, error) {
return _Erc20.Contract.Erc20Transactor.contract.Transfer(opts)
}
// Transact invokes the (paid) contract method with params as input values.
func (_Erc20 *Erc20Raw) Transact(opts *bind.TransactOpts, method string, params ...interface{}) (*types.Transaction, error) {
return _Erc20.Contract.Erc20Transactor.contract.Transact(opts, method, params...)
}
// Call invokes the (constant) contract method with params as input values and
// sets the output to result. The result type might be a single field for simple
// returns, a slice of interfaces for anonymous returns and a struct for named
// returns.
func (_Erc20 *Erc20CallerRaw) Call(opts *bind.CallOpts, result *[]interface{}, method string, params ...interface{}) error {
return _Erc20.Contract.contract.Call(opts, result, method, params...)
}
// Transfer initiates a plain transaction to move funds to the contract, calling
// its default method if one is available.
func (_Erc20 *Erc20TransactorRaw) Transfer(opts *bind.TransactOpts) (*types.Transaction, error) {
return _Erc20.Contract.contract.Transfer(opts)
}
// Transact invokes the (paid) contract method with params as input values.
func (_Erc20 *Erc20TransactorRaw) Transact(opts *bind.TransactOpts, method string, params ...interface{}) (*types.Transaction, error) {
return _Erc20.Contract.contract.Transact(opts, method, params...)
}
// Allowance is a free data retrieval call binding the contract method 0xdd62ed3e.
//
// Solidity: function allowance(address owner, address spender) view returns(uint256)
func (_Erc20 *Erc20Caller) Allowance(opts *bind.CallOpts, owner common.Address, spender common.Address) (*big.Int, error) {
var out []interface{}
err := _Erc20.contract.Call(opts, &out, "allowance", owner, spender)
if err != nil {
return *new(*big.Int), err
}
out0 := *abi.ConvertType(out[0], new(*big.Int)).(**big.Int)
return out0, err
}
// Allowance is a free data retrieval call binding the contract method 0xdd62ed3e.
//
// Solidity: function allowance(address owner, address spender) view returns(uint256)
func (_Erc20 *Erc20Session) Allowance(owner common.Address, spender common.Address) (*big.Int, error) {
return _Erc20.Contract.Allowance(&_Erc20.CallOpts, owner, spender)
}
// Allowance is a free data retrieval call binding the contract method 0xdd62ed3e.
//
// Solidity: function allowance(address owner, address spender) view returns(uint256)
func (_Erc20 *Erc20CallerSession) Allowance(owner common.Address, spender common.Address) (*big.Int, error) {
return _Erc20.Contract.Allowance(&_Erc20.CallOpts, owner, spender)
}
// BalanceOf is a free data retrieval call binding the contract method 0x70a08231.
//
// Solidity: function balanceOf(address account) view returns(uint256)
func (_Erc20 *Erc20Caller) BalanceOf(opts *bind.CallOpts, account common.Address) (*big.Int, error) {
var out []interface{}
err := _Erc20.contract.Call(opts, &out, "balanceOf", account)
if err != nil {
return *new(*big.Int), err
}
out0 := *abi.ConvertType(out[0], new(*big.Int)).(**big.Int)
return out0, err
}
// BalanceOf is a free data retrieval call binding the contract method 0x70a08231.
//
// Solidity: function balanceOf(address account) view returns(uint256)
func (_Erc20 *Erc20Session) BalanceOf(account common.Address) (*big.Int, error) {
return _Erc20.Contract.BalanceOf(&_Erc20.CallOpts, account)
}
// BalanceOf is a free data retrieval call binding the contract method 0x70a08231.
//
// Solidity: function balanceOf(address account) view returns(uint256)
func (_Erc20 *Erc20CallerSession) BalanceOf(account common.Address) (*big.Int, error) {
return _Erc20.Contract.BalanceOf(&_Erc20.CallOpts, account)
}
// Decimals is a free data retrieval call binding the contract method 0x313ce567.
//
// Solidity: function decimals() view returns(uint8)
func (_Erc20 *Erc20Caller) Decimals(opts *bind.CallOpts) (uint8, error) {
var out []interface{}
err := _Erc20.contract.Call(opts, &out, "decimals")
if err != nil {
return *new(uint8), err
}
out0 := *abi.ConvertType(out[0], new(uint8)).(*uint8)
return out0, err
}
// Decimals is a free data retrieval call binding the contract method 0x313ce567.
//
// Solidity: function decimals() view returns(uint8)
func (_Erc20 *Erc20Session) Decimals() (uint8, error) {
return _Erc20.Contract.Decimals(&_Erc20.CallOpts)
}
// Decimals is a free data retrieval call binding the contract method 0x313ce567.
//
// Solidity: function decimals() view returns(uint8)
func (_Erc20 *Erc20CallerSession) Decimals() (uint8, error) {
return _Erc20.Contract.Decimals(&_Erc20.CallOpts)
}
// Name is a free data retrieval call binding the contract method 0x06fdde03.
//
// Solidity: function name() view returns(string)
func (_Erc20 *Erc20Caller) Name(opts *bind.CallOpts) (string, error) {
var out []interface{}
err := _Erc20.contract.Call(opts, &out, "name")
if err != nil {
return *new(string), err
}
out0 := *abi.ConvertType(out[0], new(string)).(*string)
return out0, err
}
// Name is a free data retrieval call binding the contract method 0x06fdde03.
//
// Solidity: function name() view returns(string)
func (_Erc20 *Erc20Session) Name() (string, error) {
return _Erc20.Contract.Name(&_Erc20.CallOpts)
}
// Name is a free data retrieval call binding the contract method 0x06fdde03.
//
// Solidity: function name() view returns(string)
func (_Erc20 *Erc20CallerSession) Name() (string, error) {
return _Erc20.Contract.Name(&_Erc20.CallOpts)
}
// Symbol is a free data retrieval call binding the contract method 0x95d89b41.
//
// Solidity: function symbol() view returns(string)
func (_Erc20 *Erc20Caller) Symbol(opts *bind.CallOpts) (string, error) {
var out []interface{}
err := _Erc20.contract.Call(opts, &out, "symbol")
if err != nil {
return *new(string), err
}
out0 := *abi.ConvertType(out[0], new(string)).(*string)
return out0, err
}
// Symbol is a free data retrieval call binding the contract method 0x95d89b41.
//
// Solidity: function symbol() view returns(string)
func (_Erc20 *Erc20Session) Symbol() (string, error) {
return _Erc20.Contract.Symbol(&_Erc20.CallOpts)
}
// Symbol is a free data retrieval call binding the contract method 0x95d89b41.
//
// Solidity: function symbol() view returns(string)
func (_Erc20 *Erc20CallerSession) Symbol() (string, error) {
return _Erc20.Contract.Symbol(&_Erc20.CallOpts)
}
// TotalSupply is a free data retrieval call binding the contract method 0x18160ddd.
//
// Solidity: function totalSupply() view returns(uint256)
func (_Erc20 *Erc20Caller) TotalSupply(opts *bind.CallOpts) (*big.Int, error) {
var out []interface{}
err := _Erc20.contract.Call(opts, &out, "totalSupply")
if err != nil {
return *new(*big.Int), err
}
out0 := *abi.ConvertType(out[0], new(*big.Int)).(**big.Int)
return out0, err
}
// TotalSupply is a free data retrieval call binding the contract method 0x18160ddd.
//
// Solidity: function totalSupply() view returns(uint256)
func (_Erc20 *Erc20Session) TotalSupply() (*big.Int, error) {
return _Erc20.Contract.TotalSupply(&_Erc20.CallOpts)
}
// TotalSupply is a free data retrieval call binding the contract method 0x18160ddd.
//
// Solidity: function totalSupply() view returns(uint256)
func (_Erc20 *Erc20CallerSession) TotalSupply() (*big.Int, error) {
return _Erc20.Contract.TotalSupply(&_Erc20.CallOpts)
}
// Approve is a paid mutator transaction binding the contract method 0x095ea7b3.
//
// Solidity: function approve(address spender, uint256 amount) returns(bool)
func (_Erc20 *Erc20Transactor) Approve(opts *bind.TransactOpts, spender common.Address, amount *big.Int) (*types.Transaction, error) {
return _Erc20.contract.Transact(opts, "approve", spender, amount)
}
// Approve is a paid mutator transaction binding the contract method 0x095ea7b3.
//
// Solidity: function approve(address spender, uint256 amount) returns(bool)
func (_Erc20 *Erc20Session) Approve(spender common.Address, amount *big.Int) (*types.Transaction, error) {
return _Erc20.Contract.Approve(&_Erc20.TransactOpts, spender, amount)
}
// Approve is a paid mutator transaction binding the contract method 0x095ea7b3.
//
// Solidity: function approve(address spender, uint256 amount) returns(bool)
func (_Erc20 *Erc20TransactorSession) Approve(spender common.Address, amount *big.Int) (*types.Transaction, error) {
return _Erc20.Contract.Approve(&_Erc20.TransactOpts, spender, amount)
}
// DecreaseAllowance is a paid mutator transaction binding the contract method 0xa457c2d7.
//
// Solidity: function decreaseAllowance(address spender, uint256 subtractedValue) returns(bool)
func (_Erc20 *Erc20Transactor) DecreaseAllowance(opts *bind.TransactOpts, spender common.Address, subtractedValue *big.Int) (*types.Transaction, error) {
return _Erc20.contract.Transact(opts, "decreaseAllowance", spender, subtractedValue)
}
// DecreaseAllowance is a paid mutator transaction binding the contract method 0xa457c2d7.
//
// Solidity: function decreaseAllowance(address spender, uint256 subtractedValue) returns(bool)
func (_Erc20 *Erc20Session) DecreaseAllowance(spender common.Address, subtractedValue *big.Int) (*types.Transaction, error) {
return _Erc20.Contract.DecreaseAllowance(&_Erc20.TransactOpts, spender, subtractedValue)
}
// DecreaseAllowance is a paid mutator transaction binding the contract method 0xa457c2d7.
//
// Solidity: function decreaseAllowance(address spender, uint256 subtractedValue) returns(bool)
func (_Erc20 *Erc20TransactorSession) DecreaseAllowance(spender common.Address, subtractedValue *big.Int) (*types.Transaction, error) {
return _Erc20.Contract.DecreaseAllowance(&_Erc20.TransactOpts, spender, subtractedValue)
}
// IncreaseAllowance is a paid mutator transaction binding the contract method 0x39509351.
//
// Solidity: function increaseAllowance(address spender, uint256 addedValue) returns(bool)
func (_Erc20 *Erc20Transactor) IncreaseAllowance(opts *bind.TransactOpts, spender common.Address, addedValue *big.Int) (*types.Transaction, error) {
return _Erc20.contract.Transact(opts, "increaseAllowance", spender, addedValue)
}
// IncreaseAllowance is a paid mutator transaction binding the contract method 0x39509351.
//
// Solidity: function increaseAllowance(address spender, uint256 addedValue) returns(bool)
func (_Erc20 *Erc20Session) IncreaseAllowance(spender common.Address, addedValue *big.Int) (*types.Transaction, error) {
return _Erc20.Contract.IncreaseAllowance(&_Erc20.TransactOpts, spender, addedValue)
}
// IncreaseAllowance is a paid mutator transaction binding the contract method 0x39509351.
//
// Solidity: function increaseAllowance(address spender, uint256 addedValue) returns(bool)
func (_Erc20 *Erc20TransactorSession) IncreaseAllowance(spender common.Address, addedValue *big.Int) (*types.Transaction, error) {
return _Erc20.Contract.IncreaseAllowance(&_Erc20.TransactOpts, spender, addedValue)
}
// Transfer is a paid mutator transaction binding the contract method 0xa9059cbb.
//
// Solidity: function transfer(address recipient, uint256 amount) returns(bool)
func (_Erc20 *Erc20Transactor) Transfer(opts *bind.TransactOpts, recipient common.Address, amount *big.Int) (*types.Transaction, error) {
return _Erc20.contract.Transact(opts, "transfer", recipient, amount)
}
// Transfer is a paid mutator transaction binding the contract method 0xa9059cbb.
//
// Solidity: function transfer(address recipient, uint256 amount) returns(bool)
func (_Erc20 *Erc20Session) Transfer(recipient common.Address, amount *big.Int) (*types.Transaction, error) {
return _Erc20.Contract.Transfer(&_Erc20.TransactOpts, recipient, amount)
}
// Transfer is a paid mutator transaction binding the contract method 0xa9059cbb.
//
// Solidity: function transfer(address recipient, uint256 amount) returns(bool)
func (_Erc20 *Erc20TransactorSession) Transfer(recipient common.Address, amount *big.Int) (*types.Transaction, error) {
return _Erc20.Contract.Transfer(&_Erc20.TransactOpts, recipient, amount)
}
// TransferFrom is a paid mutator transaction binding the contract method 0x23b872dd.
//
// Solidity: function transferFrom(address sender, address recipient, uint256 amount) returns(bool)
func (_Erc20 *Erc20Transactor) TransferFrom(opts *bind.TransactOpts, sender common.Address, recipient common.Address, amount *big.Int) (*types.Transaction, error) {
return _Erc20.contract.Transact(opts, "transferFrom", sender, recipient, amount)
}
// TransferFrom is a paid mutator transaction binding the contract method 0x23b872dd.
//
// Solidity: function transferFrom(address sender, address recipient, uint256 amount) returns(bool)
func (_Erc20 *Erc20Session) TransferFrom(sender common.Address, recipient common.Address, amount *big.Int) (*types.Transaction, error) {
return _Erc20.Contract.TransferFrom(&_Erc20.TransactOpts, sender, recipient, amount)
}
// TransferFrom is a paid mutator transaction binding the contract method 0x23b872dd.
//
// Solidity: function transferFrom(address sender, address recipient, uint256 amount) returns(bool)
func (_Erc20 *Erc20TransactorSession) TransferFrom(sender common.Address, recipient common.Address, amount *big.Int) (*types.Transaction, error) {
return _Erc20.Contract.TransferFrom(&_Erc20.TransactOpts, sender, recipient, amount)
}
// Erc20ApprovalIterator is returned from FilterApproval and is used to iterate over the raw logs and unpacked data for Approval events raised by the Erc20 contract.
type Erc20ApprovalIterator struct {
Event *Erc20Approval // Event containing the contract specifics and raw log
contract *bind.BoundContract // Generic contract to use for unpacking event data
event string // Event name to use for unpacking event data
logs chan types.Log // Log channel receiving the found contract events
sub ethereum.Subscription // Subscription for errors, completion and termination
done bool // Whether the subscription completed delivering logs
fail error // Occurred error to stop iteration
}
// Next advances the iterator to the subsequent event, returning whether there
// are any more events found. In case of a retrieval or parsing error, false is
// returned and Error() can be queried for the exact failure.
func (it *Erc20ApprovalIterator) Next() bool {
// If the iterator failed, stop iterating
if it.fail != nil {
return false
}
// If the iterator completed, deliver directly whatever's available
if it.done {
select {
case log := <-it.logs:
it.Event = new(Erc20Approval)
if err := it.contract.UnpackLog(it.Event, it.event, log); err != nil {
it.fail = err
return false
}
it.Event.Raw = log
return true
default:
return false
}
}
// Iterator still in progress, wait for either a data or an error event
select {
case log := <-it.logs:
it.Event = new(Erc20Approval)
if err := it.contract.UnpackLog(it.Event, it.event, log); err != nil {
it.fail = err
return false
}
it.Event.Raw = log
return true
case err := <-it.sub.Err():
it.done = true
it.fail = err
return it.Next()
}
}
// Error returns any retrieval or parsing error occurred during filtering.
func (it *Erc20ApprovalIterator) Error() error {
return it.fail
}
// Close terminates the iteration process, releasing any pending underlying
// resources.
func (it *Erc20ApprovalIterator) Close() error {
it.sub.Unsubscribe()
return nil
}
// Erc20Approval represents a Approval event raised by the Erc20 contract.
type Erc20Approval struct {
Owner common.Address
Spender common.Address
Value *big.Int
Raw types.Log // Blockchain specific contextual infos
}
// FilterApproval is a free log retrieval operation binding the contract event 0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925.
//
// Solidity: event Approval(address indexed owner, address indexed spender, uint256 value)
func (_Erc20 *Erc20Filterer) FilterApproval(opts *bind.FilterOpts, owner []common.Address, spender []common.Address) (*Erc20ApprovalIterator, error) {
var ownerRule []interface{}
for _, ownerItem := range owner {
ownerRule = append(ownerRule, ownerItem)
}
var spenderRule []interface{}
for _, spenderItem := range spender {
spenderRule = append(spenderRule, spenderItem)
}
logs, sub, err := _Erc20.contract.FilterLogs(opts, "Approval", ownerRule, spenderRule)
if err != nil {
return nil, err
}
return &Erc20ApprovalIterator{contract: _Erc20.contract, event: "Approval", logs: logs, sub: sub}, nil
}
// WatchApproval is a free log subscription operation binding the contract event 0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925.
//
// Solidity: event Approval(address indexed owner, address indexed spender, uint256 value)
func (_Erc20 *Erc20Filterer) WatchApproval(opts *bind.WatchOpts, sink chan<- *Erc20Approval, owner []common.Address, spender []common.Address) (event.Subscription, error) {
var ownerRule []interface{}
for _, ownerItem := range owner {
ownerRule = append(ownerRule, ownerItem)
}
var spenderRule []interface{}
for _, spenderItem := range spender {
spenderRule = append(spenderRule, spenderItem)
}
logs, sub, err := _Erc20.contract.WatchLogs(opts, "Approval", ownerRule, spenderRule)
if err != nil {
return nil, err
}
return event.NewSubscription(func(quit <-chan struct{}) error {
defer sub.Unsubscribe()
for {
select {
case log := <-logs:
// New log arrived, parse the event and forward to the user
event := new(Erc20Approval)
if err := _Erc20.contract.UnpackLog(event, "Approval", log); err != nil {
return err
}
event.Raw = log
select {
case sink <- event:
case err := <-sub.Err():
return err
case <-quit:
return nil
}
case err := <-sub.Err():
return err
case <-quit:
return nil
}
}
}), nil
}
// ParseApproval is a log parse operation binding the contract event 0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925.
//
// Solidity: event Approval(address indexed owner, address indexed spender, uint256 value)
func (_Erc20 *Erc20Filterer) ParseApproval(log types.Log) (*Erc20Approval, error) {
event := new(Erc20Approval)
if err := _Erc20.contract.UnpackLog(event, "Approval", log); err != nil {
return nil, err
}
return event, nil
}
// Erc20TransferIterator is returned from FilterTransfer and is used to iterate over the raw logs and unpacked data for Transfer events raised by the Erc20 contract.
type Erc20TransferIterator struct {
Event *Erc20Transfer // Event containing the contract specifics and raw log
contract *bind.BoundContract // Generic contract to use for unpacking event data
event string // Event name to use for unpacking event data
logs chan types.Log // Log channel receiving the found contract events
sub ethereum.Subscription // Subscription for errors, completion and termination
done bool // Whether the subscription completed delivering logs
fail error // Occurred error to stop iteration
}
// Next advances the iterator to the subsequent event, returning whether there
// are any more events found. In case of a retrieval or parsing error, false is
// returned and Error() can be queried for the exact failure.
func (it *Erc20TransferIterator) Next() bool {
// If the iterator failed, stop iterating
if it.fail != nil {
return false
}
// If the iterator completed, deliver directly whatever's available
if it.done {
select {
case log := <-it.logs:
it.Event = new(Erc20Transfer)
if err := it.contract.UnpackLog(it.Event, it.event, log); err != nil {
it.fail = err
return false
}
it.Event.Raw = log
return true
default:
return false
}
}
// Iterator still in progress, wait for either a data or an error event
select {
case log := <-it.logs:
it.Event = new(Erc20Transfer)
if err := it.contract.UnpackLog(it.Event, it.event, log); err != nil {
it.fail = err
return false
}
it.Event.Raw = log
return true
case err := <-it.sub.Err():
it.done = true
it.fail = err
return it.Next()
}
}
// Error returns any retrieval or parsing error occurred during filtering.
func (it *Erc20TransferIterator) Error() error {
return it.fail
}
// Close terminates the iteration process, releasing any pending underlying
// resources.
func (it *Erc20TransferIterator) Close() error {
it.sub.Unsubscribe()
return nil
}
// Erc20Transfer represents a Transfer event raised by the Erc20 contract.
type Erc20Transfer struct {
From common.Address
To common.Address
Value *big.Int
Raw types.Log // Blockchain specific contextual infos
}
// FilterTransfer is a free log retrieval operation binding the contract event 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef.
//
// Solidity: event Transfer(address indexed from, address indexed to, uint256 value)
func (_Erc20 *Erc20Filterer) FilterTransfer(opts *bind.FilterOpts, from []common.Address, to []common.Address) (*Erc20TransferIterator, error) {
var fromRule []interface{}
for _, fromItem := range from {
fromRule = append(fromRule, fromItem)
}
var toRule []interface{}
for _, toItem := range to {
toRule = append(toRule, toItem)
}
logs, sub, err := _Erc20.contract.FilterLogs(opts, "Transfer", fromRule, toRule)
if err != nil {
return nil, err
}
return &Erc20TransferIterator{contract: _Erc20.contract, event: "Transfer", logs: logs, sub: sub}, nil
}
// WatchTransfer is a free log subscription operation binding the contract event 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef.
//
// Solidity: event Transfer(address indexed from, address indexed to, uint256 value)
func (_Erc20 *Erc20Filterer) WatchTransfer(opts *bind.WatchOpts, sink chan<- *Erc20Transfer, from []common.Address, to []common.Address) (event.Subscription, error) {
var fromRule []interface{}
for _, fromItem := range from {
fromRule = append(fromRule, fromItem)
}
var toRule []interface{}
for _, toItem := range to {
toRule = append(toRule, toItem)
}
logs, sub, err := _Erc20.contract.WatchLogs(opts, "Transfer", fromRule, toRule)
if err != nil {
return nil, err
}
return event.NewSubscription(func(quit <-chan struct{}) error {
defer sub.Unsubscribe()
for {
select {
case log := <-logs:
// New log arrived, parse the event and forward to the user
event := new(Erc20Transfer)
if err := _Erc20.contract.UnpackLog(event, "Transfer", log); err != nil {
return err
}
event.Raw = log
select {
case sink <- event:
case err := <-sub.Err():
return err
case <-quit:
return nil
}
case err := <-sub.Err():
return err
case <-quit:
return nil
}
}
}), nil
}
// ParseTransfer is a log parse operation binding the contract event 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef.
//
// Solidity: event Transfer(address indexed from, address indexed to, uint256 value)
func (_Erc20 *Erc20Filterer) ParseTransfer(log types.Log) (*Erc20Transfer, error) {
event := new(Erc20Transfer)
if err := _Erc20.contract.UnpackLog(event, "Transfer", log); err != nil {
return nil, err
}
return event, nil
}

View File

@ -1,138 +0,0 @@
// This implements the interface to the standard go-ethereum library.
package ethereum
import (
"context"
ethereum "github.com/ethereum/go-ethereum"
ethBind "github.com/ethereum/go-ethereum/accounts/abi/bind"
ethCommon "github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"
ethClient "github.com/ethereum/go-ethereum/ethclient"
ethEvent "github.com/ethereum/go-ethereum/event"
common "github.com/certusone/wormhole/node/pkg/common"
ethAbi "github.com/certusone/wormhole/node/pkg/ethereum/abi"
"go.uber.org/zap"
)
type EthImpl struct {
NetworkName string
logger *zap.Logger
client *ethClient.Client
filterer *ethAbi.AbiFilterer
caller *ethAbi.AbiCaller
}
func (e *EthImpl) SetLogger(l *zap.Logger) {
e.logger = l
}
func (e *EthImpl) DialContext(ctx context.Context, rawurl string) (err error) {
e.client, err = ethClient.DialContext(ctx, rawurl)
return
}
func (e *EthImpl) NewAbiFilterer(address ethCommon.Address) (err error) {
e.filterer, err = ethAbi.NewAbiFilterer(address, e.client)
return
}
func (e *EthImpl) NewAbiCaller(address ethCommon.Address) (err error) {
e.caller, err = ethAbi.NewAbiCaller(address, e.client)
return
}
func (e *EthImpl) GetCurrentGuardianSetIndex(ctx context.Context) (uint32, error) {
if e.caller == nil {
panic("caller is not initialized!")
}
opts := &ethBind.CallOpts{Context: ctx}
return e.caller.GetCurrentGuardianSetIndex(opts)
}
func (e *EthImpl) GetGuardianSet(ctx context.Context, index uint32) (ethAbi.StructsGuardianSet, error) {
if e.caller == nil {
panic("caller is not initialized!")
}
opts := &ethBind.CallOpts{Context: ctx}
return e.caller.GetGuardianSet(opts, index)
}
func (e *EthImpl) WatchLogMessagePublished(_ctx, timeout context.Context, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) {
if e.filterer == nil {
panic("filterer is not initialized!")
}
return e.filterer.WatchLogMessagePublished(&ethBind.WatchOpts{Context: timeout}, sink, nil)
}
func (e *EthImpl) TransactionReceipt(ctx context.Context, txHash ethCommon.Hash) (*ethTypes.Receipt, error) {
if e.client == nil {
panic("client is not initialized!")
}
return e.client.TransactionReceipt(ctx, txHash)
}
func (e *EthImpl) TimeOfBlockByHash(ctx context.Context, hash ethCommon.Hash) (uint64, error) {
if e.client == nil {
panic("client is not initialized!")
}
block, err := e.client.BlockByHash(ctx, hash)
if err != nil {
return 0, err
}
return block.Time(), err
}
func (e *EthImpl) ParseLogMessagePublished(log ethTypes.Log) (*ethAbi.AbiLogMessagePublished, error) {
if e.filterer == nil {
panic("filterer is not initialized!")
}
return e.filterer.ParseLogMessagePublished(log)
}
func (e *EthImpl) SubscribeForBlocks(ctx context.Context, sink chan<- *common.NewBlock) (ethereum.Subscription, error) {
if e.client == nil {
panic("client is not initialized!")
}
headSink := make(chan *ethTypes.Header, 2)
headerSubscription, err := e.client.SubscribeNewHead(ctx, headSink)
if err != nil {
return headerSubscription, err
}
// The purpose of this is to map events from the geth event channel to the new block event channel.
go func() {
for {
select {
case <-ctx.Done():
return
case ev := <-headSink:
if ev == nil {
e.logger.Error("new header event is nil", zap.String("eth_network", e.NetworkName))
continue
}
if ev.Number == nil {
e.logger.Error("new header block number is nil", zap.String("eth_network", e.NetworkName))
continue
}
sink <- &common.NewBlock{
Number: ev.Number,
Hash: ev.Hash(),
}
}
}
}()
return headerSubscription, err
}

View File

@ -1,196 +0,0 @@
// This implements polling for log events.
// It works by using the finalizer in the polling implementation to check for log events on each new block.
package ethereum
import (
"context"
"errors"
"fmt"
"math/big"
"sync"
"time"
ethereum "github.com/ethereum/go-ethereum"
ethCommon "github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"
ethClient "github.com/ethereum/go-ethereum/ethclient"
ethEvent "github.com/ethereum/go-ethereum/event"
common "github.com/certusone/wormhole/node/pkg/common"
ethAbi "github.com/certusone/wormhole/node/pkg/ethereum/abi"
"go.uber.org/zap"
)
type GetLogsImpl struct {
BasePoller *PollImpl
Query *GetLogsQuery
logger *zap.Logger
}
func NewGetLogsImpl(networkName string, contract ethCommon.Address, delayInMs int) *GetLogsImpl {
query := &GetLogsQuery{ContractAddress: contract}
return &GetLogsImpl{BasePoller: &PollImpl{BaseEth: EthImpl{NetworkName: networkName}, Finalizer: query, DelayInMs: delayInMs}, Query: query}
}
func (e *GetLogsImpl) SetLogger(l *zap.Logger) {
e.logger = l
e.logger.Info("using eth_getLogs api to retreive log events", zap.String("eth_network", e.BasePoller.BaseEth.NetworkName))
e.BasePoller.SetLogger(l)
}
func (e *GetLogsImpl) DialContext(ctx context.Context, rawurl string) (err error) {
e.Query.poller = e.BasePoller
return e.BasePoller.DialContext(ctx, rawurl)
}
func (e *GetLogsImpl) NewAbiFilterer(address ethCommon.Address) (err error) {
return e.BasePoller.NewAbiFilterer(address)
}
func (e *GetLogsImpl) NewAbiCaller(address ethCommon.Address) (err error) {
return e.BasePoller.NewAbiCaller(address)
}
func (e *GetLogsImpl) GetCurrentGuardianSetIndex(ctx context.Context) (uint32, error) {
return e.BasePoller.GetCurrentGuardianSetIndex(ctx)
}
func (e *GetLogsImpl) GetGuardianSet(ctx context.Context, index uint32) (ethAbi.StructsGuardianSet, error) {
return e.BasePoller.GetGuardianSet(ctx, index)
}
type GetLogsPollSubscription struct {
errOnce sync.Once
err chan error
quit chan error
unsubDone chan struct{}
}
var ErrUnsubscribedForGetLogs = errors.New("unsubscribed")
func (sub *GetLogsPollSubscription) Err() <-chan error {
return sub.err
}
func (sub *GetLogsPollSubscription) Unsubscribe() {
sub.errOnce.Do(func() {
select {
case sub.quit <- ErrUnsubscribedForGetLogs:
<-sub.unsubDone
case <-sub.unsubDone:
}
close(sub.err)
})
}
func (e *GetLogsImpl) WatchLogMessagePublished(ctx, timeout context.Context, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) {
e.Query.sink = sink
e.Query.sub = &GetLogsPollSubscription{
err: make(chan error, 1),
}
return e.Query.sub, nil
}
func (e *GetLogsImpl) TransactionReceipt(ctx context.Context, txHash ethCommon.Hash) (*ethTypes.Receipt, error) {
return e.BasePoller.TransactionReceipt(ctx, txHash)
}
func (e *GetLogsImpl) TimeOfBlockByHash(ctx context.Context, hash ethCommon.Hash) (uint64, error) {
return e.BasePoller.TimeOfBlockByHash(ctx, hash)
}
func (e *GetLogsImpl) ParseLogMessagePublished(log ethTypes.Log) (*ethAbi.AbiLogMessagePublished, error) {
return e.BasePoller.ParseLogMessagePublished(log)
}
func (e *GetLogsImpl) SubscribeForBlocks(ctx context.Context, sink chan<- *common.NewBlock) (ethereum.Subscription, error) {
return e.BasePoller.SubscribeForBlocks(ctx, sink)
}
type GetLogsQuery struct {
logger *zap.Logger
networkName string
ContractAddress ethCommon.Address
prevBlockNum *big.Int
client *ethClient.Client
poller *PollImpl
sink chan<- *ethAbi.AbiLogMessagePublished
sub *GetLogsPollSubscription
}
func (f *GetLogsQuery) SetLogger(l *zap.Logger, netName string) {
f.logger = l
f.networkName = netName
}
func (f *GetLogsQuery) DialContext(ctx context.Context, rawurl string) (err error) {
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
f.client, err = ethClient.DialContext(timeout, rawurl)
return err
}
var (
getLogsBigOne = big.NewInt(1)
logsLogMessageTopic = ethCommon.HexToHash("0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2")
)
// This doesn't actually check finality, instead it queries for new log events.
func (f *GetLogsQuery) IsBlockFinalized(ctx context.Context, block *common.NewBlock) (bool, error) {
if f.prevBlockNum == nil {
f.prevBlockNum = new(big.Int).Set(block.Number)
} else {
f.prevBlockNum.Add(f.prevBlockNum, getLogsBigOne)
}
filter := ethereum.FilterQuery{
FromBlock: f.prevBlockNum,
ToBlock: block.Number,
Addresses: []ethCommon.Address{f.ContractAddress},
}
*f.prevBlockNum = *block.Number
logs, err := f.client.FilterLogs(ctx, filter)
if err != nil {
f.logger.Error("GetLogsQuery: query of eth_getLogs failed",
zap.String("eth_network", f.networkName),
zap.Stringer("FromBlock", filter.FromBlock),
zap.Stringer("ToBlock", filter.ToBlock),
zap.Error(err),
)
f.sub.err <- fmt.Errorf("GetLogsQuery: failed to query for log messages: %w", err)
return true, nil // We still return true here, because we don't want this error flagged against the poller.
}
if len(logs) == 0 {
return true, nil
}
for _, log := range logs {
if log.Topics[0] == logsLogMessageTopic {
ev, err := f.poller.ParseLogMessagePublished(log)
if err != nil {
f.logger.Error("GetLogsQuery: failed to parse log entry",
zap.String("eth_network", f.networkName),
zap.Stringer("FromBlock", filter.FromBlock),
zap.Stringer("ToBlock", filter.ToBlock),
zap.Error(err),
)
f.sub.err <- fmt.Errorf("failed to parse log message: %w", err)
continue
}
f.sink <- ev
}
}
return true, nil
}

View File

@ -1,46 +0,0 @@
// This implements the finality check for Moonbeam.
//
// Moonbeam can publish blocks before they are marked final. This means we need to sit on the block until a special "is finalized"
// query returns true. The assumption is that every block number will eventually be published and finalized, it's just that the contents
// of the block (and therefore the hash) might change if there is a rollback.
package ethereum
import (
"context"
"time"
common "github.com/certusone/wormhole/node/pkg/common"
ethRpc "github.com/ethereum/go-ethereum/rpc"
"go.uber.org/zap"
)
type MoonbeamFinalizer struct {
logger *zap.Logger
networkName string
client *ethRpc.Client
}
func (f *MoonbeamFinalizer) SetLogger(l *zap.Logger, netName string) {
f.logger = l
f.networkName = netName
f.logger.Info("using Moonbeam specific finality check", zap.String("eth_network", f.networkName))
}
func (f *MoonbeamFinalizer) DialContext(ctx context.Context, rawurl string) (err error) {
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
f.client, err = ethRpc.DialContext(timeout, rawurl)
return err
}
func (f *MoonbeamFinalizer) IsBlockFinalized(ctx context.Context, block *common.NewBlock) (bool, error) {
var finalized bool
err := f.client.CallContext(ctx, &finalized, "moon_isBlockFinalized", block.Hash.Hex())
if err != nil {
f.logger.Error("failed to check for finality", zap.String("eth_network", f.networkName), zap.Error(err))
return false, err
}
return finalized, nil
}

View File

@ -1,274 +0,0 @@
// This implements polling for the next available block.
// It can optionally call a chain specific function to verify that the block is finalized.
package ethereum
import (
"context"
"errors"
"fmt"
"math/big"
"sync"
"time"
ethereum "github.com/ethereum/go-ethereum"
ethCommon "github.com/ethereum/go-ethereum/common"
ethHexUtils "github.com/ethereum/go-ethereum/common/hexutil"
ethTypes "github.com/ethereum/go-ethereum/core/types"
ethEvent "github.com/ethereum/go-ethereum/event"
ethRpc "github.com/ethereum/go-ethereum/rpc"
common "github.com/certusone/wormhole/node/pkg/common"
ethAbi "github.com/certusone/wormhole/node/pkg/ethereum/abi"
"go.uber.org/zap"
)
type PollFinalizer interface {
SetLogger(l *zap.Logger, netName string)
DialContext(ctx context.Context, rawurl string) error
IsBlockFinalized(ctx context.Context, block *common.NewBlock) (bool, error)
}
type PollImpl struct {
BaseEth EthImpl
Finalizer PollFinalizer
DelayInMs int
IsEthPoS bool
hasEthSwitchedToPoS bool
logger *zap.Logger
rawClient *ethRpc.Client
}
func (e *PollImpl) SetLogger(l *zap.Logger) {
e.logger = l
e.logger.Info("using polling to check for new blocks", zap.String("eth_network", e.BaseEth.NetworkName), zap.Int("delay_in_ms", e.DelayInMs))
if e.Finalizer != nil {
e.Finalizer.SetLogger(l, e.BaseEth.NetworkName)
}
}
func (e *PollImpl) SetEthSwitched() {
e.hasEthSwitchedToPoS = true
e.logger.Info("switching from latest to finalized", zap.String("eth_network", e.BaseEth.NetworkName), zap.Int("delay_in_ms", e.DelayInMs))
}
func (e *PollImpl) DialContext(ctx context.Context, rawurl string) (err error) {
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
// This is used for doing raw eth_ RPC calls.
e.rawClient, err = ethRpc.DialContext(timeout, rawurl)
if err != nil {
return err
}
if e.Finalizer != nil {
err = e.Finalizer.DialContext(ctx, rawurl)
if err != nil {
return err
}
}
// This is used for doing all other go-ethereum calls.
return e.BaseEth.DialContext(ctx, rawurl)
}
func (e *PollImpl) NewAbiFilterer(address ethCommon.Address) (err error) {
return e.BaseEth.NewAbiFilterer(address)
}
func (e *PollImpl) NewAbiCaller(address ethCommon.Address) (err error) {
return e.BaseEth.NewAbiCaller(address)
}
func (e *PollImpl) GetCurrentGuardianSetIndex(ctx context.Context) (uint32, error) {
return e.BaseEth.GetCurrentGuardianSetIndex(ctx)
}
func (e *PollImpl) GetGuardianSet(ctx context.Context, index uint32) (ethAbi.StructsGuardianSet, error) {
return e.BaseEth.GetGuardianSet(ctx, index)
}
func (e *PollImpl) WatchLogMessagePublished(ctx, timeout context.Context, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) {
return e.BaseEth.WatchLogMessagePublished(ctx, timeout, sink)
}
func (e *PollImpl) TransactionReceipt(ctx context.Context, txHash ethCommon.Hash) (*ethTypes.Receipt, error) {
return e.BaseEth.TransactionReceipt(ctx, txHash)
}
func (e *PollImpl) TimeOfBlockByHash(ctx context.Context, hash ethCommon.Hash) (uint64, error) {
return e.BaseEth.TimeOfBlockByHash(ctx, hash)
}
func (e *PollImpl) ParseLogMessagePublished(log ethTypes.Log) (*ethAbi.AbiLogMessagePublished, error) {
return e.BaseEth.ParseLogMessagePublished(log)
}
type PollSubscription struct {
errOnce sync.Once
err chan error
quit chan error
unsubDone chan struct{}
}
var ErrUnsubscribed = errors.New("unsubscribed")
func (sub *PollSubscription) Err() <-chan error {
return sub.err
}
func (sub *PollSubscription) Unsubscribe() {
sub.errOnce.Do(func() {
select {
case sub.quit <- ErrUnsubscribed:
<-sub.unsubDone
case <-sub.unsubDone:
}
close(sub.err)
})
}
func (e *PollImpl) SubscribeForBlocks(ctx context.Context, sink chan<- *common.NewBlock) (ethereum.Subscription, error) {
if e.BaseEth.client == nil {
panic("client is not initialized!")
}
if e.rawClient == nil {
panic("rawClient is not initialized!")
}
sub := &PollSubscription{
err: make(chan error, 1),
}
latestBlock, err := e.getBlock(ctx, nil)
if err != nil {
return nil, err
}
currentBlockNumber := *latestBlock.Number
var BIG_ONE = big.NewInt(1)
timer := time.NewTimer(time.Millisecond) // Start immediately.
go func() {
var errorCount int
for {
select {
case <-ctx.Done():
return
case <-timer.C:
var errorOccurred bool
for {
var block *common.NewBlock
var err error
errorOccurred = false
// See if the next block has been created yet.
if currentBlockNumber.Cmp(latestBlock.Number) > 0 {
tmpLatestBlock, latestBlockErr := e.getBlock(ctx, nil)
if latestBlockErr != nil {
errorOccurred = true
e.logger.Error("failed to look up latest block", zap.String("eth_network", e.BaseEth.NetworkName),
zap.Uint64("block", currentBlockNumber.Uint64()), zap.Error(latestBlockErr))
break
}
latestBlock = tmpLatestBlock
if currentBlockNumber.Cmp(latestBlock.Number) > 0 {
// We have to wait for this block to become available.
break
}
if currentBlockNumber.Cmp(latestBlock.Number) == 0 {
block = latestBlock
}
}
// Fetch the hash every time, in case it changes due to a rollback. The only exception is if we just got it above.
if block == nil {
block, err = e.getBlock(ctx, &currentBlockNumber)
if err != nil {
errorOccurred = true
e.logger.Error("failed to get current block", zap.String("eth_network", e.BaseEth.NetworkName),
zap.Uint64("block", currentBlockNumber.Uint64()), zap.Error(err))
break
}
}
if e.Finalizer != nil {
finalized, err := e.Finalizer.IsBlockFinalized(ctx, block)
if err != nil {
errorOccurred = true
e.logger.Error("failed to see if block is finalized", zap.String("eth_network", e.BaseEth.NetworkName),
zap.Uint64("block", currentBlockNumber.Uint64()), zap.Error(err))
break
}
if !finalized {
break
}
}
sink <- block
currentBlockNumber.Add(&currentBlockNumber, BIG_ONE)
}
if errorOccurred {
errorCount++
if errorCount > 1 {
sub.err <- fmt.Errorf("polling encountered multiple errors")
}
} else {
errorCount = 0
}
timer = time.NewTimer(time.Duration(e.DelayInMs) * time.Millisecond)
}
}
}()
return sub, err
}
func (e *PollImpl) getBlock(ctx context.Context, number *big.Int) (*common.NewBlock, error) {
var numStr string
if number != nil {
numStr = ethHexUtils.EncodeBig(number)
} else if e.hasEthSwitchedToPoS {
numStr = "finalized"
} else {
numStr = "latest"
}
type Marshaller struct {
Number *ethHexUtils.Big
Hash ethCommon.Hash `json:"hash"`
Difficulty *ethHexUtils.Big
}
var m Marshaller
err := e.rawClient.CallContext(ctx, &m, "eth_getBlockByNumber", numStr, false)
if err != nil {
e.logger.Error("failed to get block", zap.String("eth_network", e.BaseEth.NetworkName),
zap.String("requested_block", numStr), zap.Error(err))
return nil, err
}
if m.Number == nil {
e.logger.Error("failed to unmarshal block", zap.String("eth_network", e.BaseEth.NetworkName),
zap.String("requested_block", numStr),
)
return nil, fmt.Errorf("failed to unmarshal block: Number is nil")
}
d := big.Int(*m.Difficulty)
if e.IsEthPoS && !e.hasEthSwitchedToPoS && d.Cmp(big.NewInt(0)) == 0 {
e.SetEthSwitched()
return e.getBlock(ctx, number)
}
n := big.Int(*m.Number)
return &common.NewBlock{
Number: &n,
Hash: m.Hash,
}, nil
}

View File

@ -1,10 +1,12 @@
package ethereum
package evm
import (
"context"
"fmt"
"time"
"github.com/certusone/wormhole/node/pkg/evm/connectors"
"github.com/certusone/wormhole/node/pkg/common"
eth_common "github.com/ethereum/go-ethereum/common"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
@ -21,13 +23,13 @@ var (
// Returns the block number and a list of MessagePublication events.
func MessageEventsForTransaction(
ctx context.Context,
ethIntf common.Ethish,
ethConn connectors.Connector,
contract eth_common.Address,
chainId vaa.ChainID,
tx eth_common.Hash) (uint64, []*common.MessagePublication, error) {
// Get transactions logs from transaction
receipt, err := ethIntf.TransactionReceipt(ctx, tx)
receipt, err := ethConn.TransactionReceipt(ctx, tx)
if err != nil {
return 0, nil, fmt.Errorf("failed to get transaction receipt: %w", err)
}
@ -45,7 +47,7 @@ func MessageEventsForTransaction(
}
// Get block
blockTime, err := ethIntf.TimeOfBlockByHash(ctx, receipt.BlockHash)
blockTime, err := ethConn.TimeOfBlockByHash(ctx, receipt.BlockHash)
if err != nil {
return 0, nil, fmt.Errorf("failed to get block time: %w", err)
}
@ -67,7 +69,7 @@ func MessageEventsForTransaction(
continue
}
ev, err := ethIntf.ParseLogMessagePublished(*l)
ev, err := ethConn.ParseLogMessagePublished(*l)
if err != nil {
return 0, nil, fmt.Errorf("failed to parse log: %w", err)
}

View File

@ -1,71 +1,80 @@
// This implements the interface to the "go-ethereum-ish" library used by Celo.
package celo
package connectors
import (
"context"
celoAbi "github.com/certusone/wormhole/node/pkg/evm/connectors/celoabi"
ethAbi "github.com/certusone/wormhole/node/pkg/evm/connectors/ethabi"
celoBind "github.com/celo-org/celo-blockchain/accounts/abi/bind"
celoCommon "github.com/celo-org/celo-blockchain/common"
celoTypes "github.com/celo-org/celo-blockchain/core/types"
celoClient "github.com/celo-org/celo-blockchain/ethclient"
celoRpc "github.com/celo-org/celo-blockchain/rpc"
ethereum "github.com/ethereum/go-ethereum"
ethCommon "github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"
ethEvent "github.com/ethereum/go-ethereum/event"
celoAbi "github.com/certusone/wormhole/node/pkg/celo/abi"
common "github.com/certusone/wormhole/node/pkg/common"
ethAbi "github.com/certusone/wormhole/node/pkg/ethereum/abi"
"go.uber.org/zap"
)
type CeloImpl struct {
NetworkName string
// CeloConnector implements EVM network query capabilities for the Celo network. It's almost identical to
// EthereumConnector except it's using the Celo fork and provides shims between their respective types.
type CeloConnector struct {
networkName string
address ethCommon.Address
logger *zap.Logger
client *celoClient.Client
rawClient *celoRpc.Client
filterer *celoAbi.AbiFilterer
caller *celoAbi.AbiCaller
}
func (e *CeloImpl) SetLogger(l *zap.Logger) {
e.logger = l
e.logger.Info("using celo specific ethereum library", zap.String("eth_network", e.NetworkName))
func NewCeloConnector(ctx context.Context, networkName, rawUrl string, address ethCommon.Address, logger *zap.Logger) (*CeloConnector, error) {
rawClient, err := celoRpc.DialContext(ctx, rawUrl)
if err != nil {
return nil, err
}
client := celoClient.NewClient(rawClient)
filterer, err := celoAbi.NewAbiFilterer(celoCommon.BytesToAddress(address.Bytes()), client)
if err != nil {
panic(err)
}
caller, err := celoAbi.NewAbiCaller(celoCommon.BytesToAddress(address.Bytes()), client)
if err != nil {
panic(err)
}
func (e *CeloImpl) DialContext(ctx context.Context, rawurl string) (err error) {
e.client, err = celoClient.DialContext(ctx, rawurl)
return
return &CeloConnector{
networkName: networkName,
address: address,
logger: logger.With(zap.String("eth_network", networkName)),
client: client,
rawClient: rawClient,
filterer: filterer,
caller: caller,
}, nil
}
func (e *CeloImpl) NewAbiFilterer(address ethCommon.Address) (err error) {
e.filterer, err = celoAbi.NewAbiFilterer(celoCommon.BytesToAddress(address.Bytes()), e.client)
return
func (c *CeloConnector) NetworkName() string {
return c.networkName
}
func (e *CeloImpl) NewAbiCaller(address ethCommon.Address) (err error) {
e.caller, err = celoAbi.NewAbiCaller(celoCommon.BytesToAddress(address.Bytes()), e.client)
return
}
func (e *CeloImpl) GetCurrentGuardianSetIndex(ctx context.Context) (uint32, error) {
if e.caller == nil {
panic("caller is not initialized!")
func (c *CeloConnector) ContractAddress() ethCommon.Address {
return c.address
}
func (c *CeloConnector) GetCurrentGuardianSetIndex(ctx context.Context) (uint32, error) {
opts := &celoBind.CallOpts{Context: ctx}
return e.caller.GetCurrentGuardianSetIndex(opts)
}
func (e *CeloImpl) GetGuardianSet(ctx context.Context, index uint32) (ethAbi.StructsGuardianSet, error) {
if e.caller == nil {
panic("caller is not initialized!")
return c.caller.GetCurrentGuardianSetIndex(opts)
}
func (c *CeloConnector) GetGuardianSet(ctx context.Context, index uint32) (ethAbi.StructsGuardianSet, error) {
opts := &celoBind.CallOpts{Context: ctx}
celoGs, err := e.caller.GetGuardianSet(opts, index)
celoGs, err := c.caller.GetGuardianSet(opts, index)
if err != nil {
return ethAbi.StructsGuardianSet{}, err
}
@ -81,13 +90,9 @@ func (e *CeloImpl) GetGuardianSet(ctx context.Context, index uint32) (ethAbi.Str
}, err
}
func (e *CeloImpl) WatchLogMessagePublished(ctx, timeout context.Context, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) {
if e.filterer == nil {
panic("filterer is not initialized!")
}
func (c *CeloConnector) WatchLogMessagePublished(ctx context.Context, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) {
messageC := make(chan *celoAbi.AbiLogMessagePublished, 2)
messageSub, err := e.filterer.WatchLogMessagePublished(&celoBind.WatchOpts{Context: timeout}, messageC, nil)
messageSub, err := c.filterer.WatchLogMessagePublished(&celoBind.WatchOpts{Context: ctx}, messageC, nil)
if err != nil {
return messageSub, err
}
@ -96,10 +101,11 @@ func (e *CeloImpl) WatchLogMessagePublished(ctx, timeout context.Context, sink c
go func() {
for {
select {
case <-ctx.Done():
// This will return when the subscription is unsubscribed as the error channel gets closed
case <-messageSub.Err():
return
case celoEvent := <-messageC:
sink <- convertEventToEth(celoEvent)
sink <- convertCeloEventToEth(celoEvent)
}
}
}()
@ -107,25 +113,17 @@ func (e *CeloImpl) WatchLogMessagePublished(ctx, timeout context.Context, sink c
return messageSub, err
}
func (e *CeloImpl) TransactionReceipt(ctx context.Context, txHash ethCommon.Hash) (*ethTypes.Receipt, error) {
if e.client == nil {
panic("client is not initialized!")
}
celoReceipt, err := e.client.TransactionReceipt(ctx, celoCommon.BytesToHash(txHash.Bytes()))
func (c *CeloConnector) TransactionReceipt(ctx context.Context, txHash ethCommon.Hash) (*ethTypes.Receipt, error) {
celoReceipt, err := c.client.TransactionReceipt(ctx, celoCommon.BytesToHash(txHash.Bytes()))
if err != nil {
return nil, err
}
return convertReceiptToEth(celoReceipt), err
return convertCeloReceiptToEth(celoReceipt), err
}
func (e *CeloImpl) TimeOfBlockByHash(ctx context.Context, hash ethCommon.Hash) (uint64, error) {
if e.client == nil {
panic("client is not initialized!")
}
block, err := e.client.BlockByHash(ctx, celoCommon.BytesToHash(hash.Bytes()))
func (c *CeloConnector) TimeOfBlockByHash(ctx context.Context, hash ethCommon.Hash) (uint64, error) {
block, err := c.client.BlockByHash(ctx, celoCommon.BytesToHash(hash.Bytes()))
if err != nil {
return 0, err
}
@ -133,26 +131,18 @@ func (e *CeloImpl) TimeOfBlockByHash(ctx context.Context, hash ethCommon.Hash) (
return block.Time(), err
}
func (e *CeloImpl) ParseLogMessagePublished(ethLog ethTypes.Log) (*ethAbi.AbiLogMessagePublished, error) {
if e.filterer == nil {
panic("filterer is not initialized!")
}
celoEvent, err := e.filterer.ParseLogMessagePublished(*convertLogFromEth(&ethLog))
func (c *CeloConnector) ParseLogMessagePublished(ethLog ethTypes.Log) (*ethAbi.AbiLogMessagePublished, error) {
celoEvent, err := c.filterer.ParseLogMessagePublished(*convertCeloLogFromEth(&ethLog))
if err != nil {
return nil, err
}
return convertEventToEth(celoEvent), err
}
func (e *CeloImpl) SubscribeForBlocks(ctx context.Context, sink chan<- *common.NewBlock) (ethereum.Subscription, error) {
if e.client == nil {
panic("client is not initialized!")
return convertCeloEventToEth(celoEvent), err
}
func (c *CeloConnector) SubscribeForBlocks(ctx context.Context, sink chan<- *NewBlock) (ethereum.Subscription, error) {
headSink := make(chan *celoTypes.Header, 2)
headerSubscription, err := e.client.SubscribeNewHead(ctx, headSink)
headerSubscription, err := c.client.SubscribeNewHead(ctx, headSink)
if err != nil {
return headerSubscription, err
}
@ -165,14 +155,14 @@ func (e *CeloImpl) SubscribeForBlocks(ctx context.Context, sink chan<- *common.N
return
case ev := <-headSink:
if ev == nil {
e.logger.Error("new header event is nil", zap.String("eth_network", e.NetworkName))
c.logger.Error("new header event is nil")
continue
}
if ev.Number == nil {
e.logger.Error("new header block number is nil", zap.String("eth_network", e.NetworkName))
c.logger.Error("new header block number is nil")
continue
}
sink <- &common.NewBlock{
sink <- &NewBlock{
Number: ev.Number,
Hash: ethCommon.BytesToHash(ev.Hash().Bytes()),
}
@ -183,18 +173,22 @@ func (e *CeloImpl) SubscribeForBlocks(ctx context.Context, sink chan<- *common.N
return headerSubscription, err
}
func convertEventToEth(ev *celoAbi.AbiLogMessagePublished) *ethAbi.AbiLogMessagePublished {
func (c *CeloConnector) RawCallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
return c.rawClient.CallContext(ctx, result, method, args...)
}
func convertCeloEventToEth(ev *celoAbi.AbiLogMessagePublished) *ethAbi.AbiLogMessagePublished {
return &ethAbi.AbiLogMessagePublished{
Sender: ethCommon.BytesToAddress(ev.Sender.Bytes()),
Sequence: ev.Sequence,
Nonce: ev.Nonce,
Payload: ev.Payload,
ConsistencyLevel: ev.ConsistencyLevel,
Raw: *convertLogToEth(&ev.Raw),
Raw: *convertCeloLogToEth(&ev.Raw),
}
}
func convertLogToEth(l *celoTypes.Log) *ethTypes.Log {
func convertCeloLogToEth(l *celoTypes.Log) *ethTypes.Log {
topics := make([]ethCommon.Hash, len(l.Topics))
for n, t := range l.Topics {
topics[n] = ethCommon.BytesToHash(t.Bytes())
@ -213,10 +207,10 @@ func convertLogToEth(l *celoTypes.Log) *ethTypes.Log {
}
}
func convertReceiptToEth(celoReceipt *celoTypes.Receipt) *ethTypes.Receipt {
func convertCeloReceiptToEth(celoReceipt *celoTypes.Receipt) *ethTypes.Receipt {
ethLogs := make([]*ethTypes.Log, len(celoReceipt.Logs))
for n, l := range celoReceipt.Logs {
ethLogs[n] = convertLogToEth(l)
ethLogs[n] = convertCeloLogToEth(l)
}
return &ethTypes.Receipt{
@ -235,7 +229,7 @@ func convertReceiptToEth(celoReceipt *celoTypes.Receipt) *ethTypes.Receipt {
}
}
func convertLogFromEth(l *ethTypes.Log) *celoTypes.Log {
func convertCeloLogFromEth(l *ethTypes.Log) *celoTypes.Log {
topics := make([]celoCommon.Hash, len(l.Topics))
for n, t := range l.Topics {
topics[n] = celoCommon.BytesToHash(t.Bytes())

View File

@ -1,7 +1,7 @@
// Code generated - DO NOT EDIT.
// This file is a generated binding and any manual changes will be lost.
package abi
package celoabi
import (
"errors"

View File

@ -0,0 +1,65 @@
package connectors
import (
"context"
"errors"
"math/big"
"sync"
"github.com/certusone/wormhole/node/pkg/evm/connectors/ethabi"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
)
type NewBlock struct {
Number *big.Int
Hash common.Hash
}
// Connector exposes Wormhole-specific interactions with an EVM-based network
type Connector interface {
NetworkName() string
ContractAddress() common.Address
GetCurrentGuardianSetIndex(ctx context.Context) (uint32, error)
GetGuardianSet(ctx context.Context, index uint32) (ethabi.StructsGuardianSet, error)
WatchLogMessagePublished(ctx context.Context, sink chan<- *ethabi.AbiLogMessagePublished) (event.Subscription, error)
TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error)
TimeOfBlockByHash(ctx context.Context, hash common.Hash) (uint64, error)
ParseLogMessagePublished(log types.Log) (*ethabi.AbiLogMessagePublished, error)
SubscribeForBlocks(ctx context.Context, sink chan<- *NewBlock) (ethereum.Subscription, error)
RawCallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error
}
type PollSubscription struct {
errOnce sync.Once
err chan error
quit chan error
unsubDone chan struct{}
}
func NewPollSubscription() *PollSubscription {
return &PollSubscription{
err: make(chan error, 1),
quit: make(chan error, 1),
unsubDone: make(chan struct{}, 1),
}
}
var ErrUnsubscribed = errors.New("unsubscribed")
func (sub *PollSubscription) Err() <-chan error {
return sub.err
}
func (sub *PollSubscription) Unsubscribe() {
sub.errOnce.Do(func() {
select {
case sub.quit <- ErrUnsubscribed:
<-sub.unsubDone
case <-sub.unsubDone:
}
close(sub.err)
})
}

View File

@ -1,7 +1,7 @@
// Code generated - DO NOT EDIT.
// This file is a generated binding and any manual changes will be lost.
package abi
package ethabi
import (
"math/big"

View File

@ -0,0 +1,136 @@
package connectors
import (
"context"
ethRpc "github.com/ethereum/go-ethereum/rpc"
ethAbi "github.com/certusone/wormhole/node/pkg/evm/connectors/ethabi"
ethereum "github.com/ethereum/go-ethereum"
ethBind "github.com/ethereum/go-ethereum/accounts/abi/bind"
ethCommon "github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"
ethClient "github.com/ethereum/go-ethereum/ethclient"
ethEvent "github.com/ethereum/go-ethereum/event"
"go.uber.org/zap"
)
// EthereumConnector implements EVM network query capabilities for go-ethereum based networks and networks supporting
// the standard web3 rpc.
type EthereumConnector struct {
networkName string
address ethCommon.Address
logger *zap.Logger
client *ethClient.Client
rawClient *ethRpc.Client
filterer *ethAbi.AbiFilterer
caller *ethAbi.AbiCaller
}
func NewEthereumConnector(ctx context.Context, networkName, rawUrl string, address ethCommon.Address, logger *zap.Logger) (*EthereumConnector, error) {
rawClient, err := ethRpc.DialContext(ctx, rawUrl)
if err != nil {
return nil, err
}
client := ethClient.NewClient(rawClient)
filterer, err := ethAbi.NewAbiFilterer(ethCommon.BytesToAddress(address.Bytes()), client)
if err != nil {
panic(err)
}
caller, err := ethAbi.NewAbiCaller(ethCommon.BytesToAddress(address.Bytes()), client)
if err != nil {
panic(err)
}
return &EthereumConnector{
networkName: networkName,
address: address,
logger: logger.With(zap.String("eth_network", networkName)),
client: client,
filterer: filterer,
caller: caller,
rawClient: rawClient,
}, nil
}
func (e *EthereumConnector) NetworkName() string {
return e.networkName
}
func (e *EthereumConnector) ContractAddress() ethCommon.Address {
return e.address
}
func (e *EthereumConnector) GetCurrentGuardianSetIndex(ctx context.Context) (uint32, error) {
return e.caller.GetCurrentGuardianSetIndex(&ethBind.CallOpts{Context: ctx})
}
func (e *EthereumConnector) GetGuardianSet(ctx context.Context, index uint32) (ethAbi.StructsGuardianSet, error) {
return e.caller.GetGuardianSet(&ethBind.CallOpts{Context: ctx}, index)
}
func (e *EthereumConnector) WatchLogMessagePublished(ctx context.Context, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) {
return e.filterer.WatchLogMessagePublished(&ethBind.WatchOpts{Context: ctx}, sink, nil)
}
func (e *EthereumConnector) TransactionReceipt(ctx context.Context, txHash ethCommon.Hash) (*ethTypes.Receipt, error) {
return e.client.TransactionReceipt(ctx, txHash)
}
func (e *EthereumConnector) TimeOfBlockByHash(ctx context.Context, hash ethCommon.Hash) (uint64, error) {
block, err := e.client.BlockByHash(ctx, hash)
if err != nil {
return 0, err
}
return block.Time(), err
}
func (e *EthereumConnector) ParseLogMessagePublished(log ethTypes.Log) (*ethAbi.AbiLogMessagePublished, error) {
return e.filterer.ParseLogMessagePublished(log)
}
func (e *EthereumConnector) SubscribeForBlocks(ctx context.Context, sink chan<- *NewBlock) (ethereum.Subscription, error) {
headSink := make(chan *ethTypes.Header, 2)
headerSubscription, err := e.client.SubscribeNewHead(ctx, headSink)
if err != nil {
return nil, err
}
// The purpose of this is to map events from the geth event channel to the new block event channel.
go func() {
for {
select {
case <-ctx.Done():
return
case ev := <-headSink:
if ev == nil {
e.logger.Error("new header event is nil")
continue
}
if ev.Number == nil {
e.logger.Error("new header block number is nil")
continue
}
sink <- &NewBlock{
Number: ev.Number,
Hash: ev.Hash(),
}
}
}
}()
return headerSubscription, err
}
func (e *EthereumConnector) RawCallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
return e.rawClient.CallContext(ctx, result, method, args...)
}
func (e *EthereumConnector) Client() *ethClient.Client {
return e.client
}

View File

@ -0,0 +1,152 @@
package connectors
import (
"context"
"fmt"
"math/big"
"time"
ethAbi "github.com/certusone/wormhole/node/pkg/evm/connectors/ethabi"
"github.com/certusone/wormhole/node/pkg/supervisor"
ethereum "github.com/ethereum/go-ethereum"
ethCommon "github.com/ethereum/go-ethereum/common"
ethClient "github.com/ethereum/go-ethereum/ethclient"
ethEvent "github.com/ethereum/go-ethereum/event"
"go.uber.org/zap"
)
// LogPollConnector pulls logs on each new block event when subscribing using WatchLogMessagePublished instead of using
// a websocket connection. It can be used in conjunction with a BlockPollConnector and Finalizer to only return
// finalized message log events.
type LogPollConnector struct {
Connector
client *ethClient.Client
messageFeed ethEvent.Feed
errFeed ethEvent.Feed
prevBlockNum *big.Int
}
func NewLogPollConnector(ctx context.Context, baseConnector Connector, client *ethClient.Client) (*LogPollConnector, error) {
connector := &LogPollConnector{Connector: baseConnector, client: client}
// The supervisor will keep the poller running
err := supervisor.Run(ctx, "logPoller", connector.run)
if err != nil {
return nil, err
}
return connector, nil
}
func (l *LogPollConnector) run(ctx context.Context) error {
logger := supervisor.Logger(ctx).With(zap.String("eth_network", l.Connector.NetworkName()))
blockChan := make(chan *NewBlock)
sub, err := l.SubscribeForBlocks(ctx, blockChan)
if err != nil {
return err
}
defer sub.Unsubscribe()
supervisor.Signal(ctx, supervisor.SignalHealthy)
for {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-sub.Err():
return err
case block := <-blockChan:
if err := l.processBlock(ctx, logger, block); err != nil {
l.errFeed.Send(err.Error())
}
}
}
}
func (l *LogPollConnector) WatchLogMessagePublished(ctx context.Context, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) {
sub := NewPollSubscription()
messageSub := l.messageFeed.Subscribe(sink)
// The feed library does not support error forwarding, so we're emulating that using a custom subscription and
// an error feed.
innerErrSink := make(chan string, 10)
innerErrSub := l.errFeed.Subscribe(innerErrSink)
go func() {
for {
select {
case <-ctx.Done():
messageSub.Unsubscribe()
innerErrSub.Unsubscribe()
return
case <-sub.quit:
messageSub.Unsubscribe()
innerErrSub.Unsubscribe()
sub.unsubDone <- struct{}{}
return
case v := <-innerErrSink:
sub.err <- fmt.Errorf(v)
}
}
}()
return sub, nil
}
var (
getLogsBigOne = big.NewInt(1)
logsLogMessageTopic = ethCommon.HexToHash("0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2")
)
func (l *LogPollConnector) processBlock(ctx context.Context, logger *zap.Logger, block *NewBlock) error {
if l.prevBlockNum == nil {
l.prevBlockNum = new(big.Int).Set(block.Number)
} else {
l.prevBlockNum.Add(l.prevBlockNum, getLogsBigOne)
}
filter := ethereum.FilterQuery{
FromBlock: l.prevBlockNum,
ToBlock: block.Number,
Addresses: []ethCommon.Address{l.ContractAddress()},
}
*l.prevBlockNum = *block.Number
tCtx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
logs, err := l.client.FilterLogs(tCtx, filter)
if err != nil {
logger.Error("GetLogsQuery: query of eth_getLogs failed",
zap.Stringer("FromBlock", filter.FromBlock),
zap.Stringer("ToBlock", filter.ToBlock),
zap.Error(err),
)
return fmt.Errorf("GetLogsQuery: failed to query for log messages: %w", err)
}
if len(logs) == 0 {
return nil
}
for _, log := range logs {
if log.Topics[0] != logsLogMessageTopic {
continue
}
ev, err := l.ParseLogMessagePublished(log)
if err != nil {
logger.Error("GetLogsQuery: failed to parse log entry",
zap.Stringer("FromBlock", filter.FromBlock),
zap.Stringer("ToBlock", filter.ToBlock),
zap.Error(err),
)
l.errFeed.Send(fmt.Errorf("failed to parse log message: %w", err))
continue
}
l.messageFeed.Send(ev)
}
return nil
}

View File

@ -0,0 +1,196 @@
package connectors
import (
"context"
"fmt"
"math/big"
"time"
"github.com/certusone/wormhole/node/pkg/supervisor"
ethEvent "github.com/ethereum/go-ethereum/event"
ethereum "github.com/ethereum/go-ethereum"
ethCommon "github.com/ethereum/go-ethereum/common"
ethHexUtils "github.com/ethereum/go-ethereum/common/hexutil"
"go.uber.org/zap"
)
type PollFinalizer interface {
IsBlockFinalized(ctx context.Context, block *NewBlock) (bool, error)
}
// BlockPollConnector polls for new blocks instead of subscribing when using SubscribeForBlocks. It allows to specify a
// finalizer which will be used to only return finalized blocks on subscriptions.
type BlockPollConnector struct {
Connector
Delay time.Duration
isEthPoS bool
hasEthSwitchedToPoS bool
finalizer PollFinalizer
blockFeed ethEvent.Feed
errFeed ethEvent.Feed
}
func NewBlockPollConnector(ctx context.Context, baseConnector Connector, finalizer PollFinalizer, delay time.Duration, isEthPoS bool) (*BlockPollConnector, error) {
connector := &BlockPollConnector{
Connector: baseConnector,
Delay: delay,
isEthPoS: isEthPoS,
hasEthSwitchedToPoS: false,
finalizer: finalizer,
}
err := supervisor.Run(ctx, "blockPoller", connector.run)
if err != nil {
return nil, err
}
return connector, nil
}
func (b *BlockPollConnector) run(ctx context.Context) error {
logger := supervisor.Logger(ctx).With(zap.String("eth_network", b.Connector.NetworkName()))
lastBlock, err := b.getBlock(ctx, logger, nil)
if err != nil {
return err
}
timer := time.NewTimer(time.Millisecond) // Start immediately.
supervisor.Signal(ctx, supervisor.SignalHealthy)
for {
select {
case <-ctx.Done():
timer.Stop()
return ctx.Err()
case <-timer.C:
lastBlock, err = b.pollBlocks(ctx, logger, lastBlock)
if err != nil {
b.errFeed.Send("polling encountered an error")
}
timer.Reset(b.Delay)
}
}
}
func (b *BlockPollConnector) pollBlocks(ctx context.Context, logger *zap.Logger, lastBlock *NewBlock) (lastPublishedBlock *NewBlock, retErr error) {
lastPublishedBlock = lastBlock
// Fetch the latest block on the chain
// We could do this on every iteration such that if a new block is created while this function is being executed,
// it would automatically fetch new blocks but in order to reduce API load this will be done on the next iteration.
latestBlock, err := b.getBlock(ctx, logger, nil)
if err != nil {
logger.Error("failed to look up latest block",
zap.Uint64("lastSeenBlock", lastBlock.Number.Uint64()), zap.Error(err))
return lastPublishedBlock, fmt.Errorf("failed to look up latest block: %w", err)
}
for {
if lastPublishedBlock.Number.Cmp(latestBlock.Number) >= 0 {
// We have to wait for a new block to become available
return
}
// Try to fetch the next block between lastBlock and latestBlock
nextBlockNumber := new(big.Int).Add(lastPublishedBlock.Number, big.NewInt(1))
block, err := b.getBlock(ctx, logger, nextBlockNumber)
if err != nil {
logger.Error("failed to fetch next block",
zap.Uint64("block", nextBlockNumber.Uint64()), zap.Error(err))
return lastPublishedBlock, fmt.Errorf("failed to fetch next block (%d): %w", nextBlockNumber.Uint64(), err)
}
if b.finalizer != nil {
finalized, err := b.finalizer.IsBlockFinalized(ctx, block)
if err != nil {
logger.Error("failed to check block finalization",
zap.Uint64("block", block.Number.Uint64()), zap.Error(err))
return lastPublishedBlock, fmt.Errorf("failed to check block finalization (%d): %w", block.Number.Uint64(), err)
}
if !finalized {
break
}
}
b.blockFeed.Send(block)
lastPublishedBlock = block
}
return
}
func (b *BlockPollConnector) SetEthSwitched() {
b.hasEthSwitchedToPoS = true
}
func (b *BlockPollConnector) SubscribeForBlocks(ctx context.Context, sink chan<- *NewBlock) (ethereum.Subscription, error) {
sub := NewPollSubscription()
blockSub := b.blockFeed.Subscribe(sink)
// The feed library does not support error forwarding, so we're emulating that using a custom subscription and
// an error feed. The feed library can't handle interfaces which is why we post strings.
innerErrSink := make(chan string, 10)
innerErrSub := b.errFeed.Subscribe(innerErrSink)
go func() {
for {
select {
case <-ctx.Done():
blockSub.Unsubscribe()
innerErrSub.Unsubscribe()
return
case <-sub.quit:
blockSub.Unsubscribe()
innerErrSub.Unsubscribe()
sub.unsubDone <- struct{}{}
return
case v := <-innerErrSink:
sub.err <- fmt.Errorf(v)
}
}
}()
return sub, nil
}
func (b *BlockPollConnector) getBlock(ctx context.Context, logger *zap.Logger, number *big.Int) (*NewBlock, error) {
var numStr string
if number != nil {
numStr = ethHexUtils.EncodeBig(number)
} else if b.hasEthSwitchedToPoS {
numStr = "finalized"
} else {
numStr = "latest"
}
type Marshaller struct {
Number *ethHexUtils.Big
Hash ethCommon.Hash `json:"hash"`
Difficulty *ethHexUtils.Big
}
var m Marshaller
err := b.Connector.RawCallContext(ctx, &m, "eth_getBlockByNumber", numStr, false)
if err != nil {
logger.Error("failed to get block",
zap.String("requested_block", numStr), zap.Error(err))
return nil, err
}
if m.Number == nil {
logger.Error("failed to unmarshal block",
zap.String("requested_block", numStr),
)
return nil, fmt.Errorf("failed to unmarshal block: Number is nil")
}
d := big.Int(*m.Difficulty)
if b.isEthPoS && !b.hasEthSwitchedToPoS && d.Cmp(big.NewInt(0)) == 0 {
logger.Info("switching from latest to finalized", zap.Duration("delay", b.Delay))
b.SetEthSwitched()
return b.getBlock(ctx, logger, number)
}
n := big.Int(*m.Number)
return &NewBlock{
Number: &n,
Hash: m.Hash,
}, nil
}

View File

@ -0,0 +1,19 @@
package finalizers
import (
"context"
"github.com/certusone/wormhole/node/pkg/evm/connectors"
)
// DefaultFinalizer assumes all blocks to be finalized.
type DefaultFinalizer struct {
}
func NewDefaultFinalizer() *DefaultFinalizer {
return &DefaultFinalizer{}
}
func (d *DefaultFinalizer) IsBlockFinalized(ctx context.Context, block *connectors.NewBlock) (bool, error) {
return true, nil
}

View File

@ -0,0 +1,35 @@
package finalizers
import (
"context"
"github.com/certusone/wormhole/node/pkg/evm/connectors"
"go.uber.org/zap"
)
// MoonbeamFinalizer implements the finality check for Moonbeam.
// Moonbeam can publish blocks before they are marked final. This means we need to sit on the block until a special "is finalized"
// query returns true. The assumption is that every block number will eventually be published and finalized, it's just that the contents
// of the block (and therefore the hash) might change if there is a rollback.
type MoonbeamFinalizer struct {
logger *zap.Logger
connector connectors.Connector
}
func NewMoonbeamFinalizer(logger *zap.Logger, connector connectors.Connector) *MoonbeamFinalizer {
return &MoonbeamFinalizer{
logger: logger,
connector: connector,
}
}
func (m *MoonbeamFinalizer) IsBlockFinalized(ctx context.Context, block *connectors.NewBlock) (bool, error) {
var finalized bool
err := m.connector.RawCallContext(ctx, &finalized, "moon_isBlockFinalized", block.Hash.Hex())
if err != nil {
m.logger.Error("failed to check for finality", zap.String("eth_network", m.connector.NetworkName()), zap.Error(err))
return false, err
}
return finalized, nil
}

View File

@ -1,4 +1,4 @@
package ethereum
package evm
import (
"github.com/ethereum/go-ethereum/common"

View File

@ -1,4 +1,4 @@
package ethereum
package evm
import (
"testing"

View File

@ -1,4 +1,4 @@
package ethereum
package evm
import (
"context"
@ -7,6 +7,10 @@ import (
"sync/atomic"
"time"
"github.com/certusone/wormhole/node/pkg/evm/connectors"
"github.com/certusone/wormhole/node/pkg/evm/connectors/ethabi"
"github.com/certusone/wormhole/node/pkg/evm/finalizers"
"github.com/certusone/wormhole/node/pkg/p2p"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/ethereum/go-ethereum/rpc"
@ -17,9 +21,7 @@ import (
eth_common "github.com/ethereum/go-ethereum/common"
"go.uber.org/zap"
"github.com/certusone/wormhole/node/pkg/celo"
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/ethereum/abi"
"github.com/certusone/wormhole/node/pkg/readiness"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
@ -63,7 +65,7 @@ type (
Watcher struct {
// Ethereum RPC url
url string
// Address of the Eth contract contract
// Address of the Eth contract
contract eth_common.Address
// Human-readable name of the Eth network, for logging and monitoring.
networkName string
@ -100,8 +102,9 @@ type (
minConfirmations uint64
// Interface to the chain specific ethereum library.
ethIntf common.Ethish
ethConn connectors.Connector
shouldCheckSafeMode bool
unsafeDevMode bool
}
pendingKey struct {
@ -129,21 +132,6 @@ func NewEthWatcher(
obsvReqC chan *gossipv1.ObservationRequest,
unsafeDevMode bool) *Watcher {
var ethIntf common.Ethish
if chainID == vaa.ChainIDCelo && !unsafeDevMode {
// When we are running in mainnet or testnet, we need to use the Celo ethereum library rather than go-ethereum.
// However, in devnet, we currently run the standard ETH node for Celo, so we need to use the standard go-ethereum.
ethIntf = &celo.CeloImpl{NetworkName: networkName}
} else if chainID == vaa.ChainIDEthereum && !unsafeDevMode {
ethIntf = &PollImpl{BaseEth: EthImpl{NetworkName: networkName}, DelayInMs: 250, IsEthPoS: true}
} else if chainID == vaa.ChainIDMoonbeam && !unsafeDevMode {
ethIntf = &PollImpl{BaseEth: EthImpl{NetworkName: networkName}, Finalizer: &MoonbeamFinalizer{}, DelayInMs: 250}
} else if chainID == vaa.ChainIDNeon {
ethIntf = NewGetLogsImpl(networkName, contract, 250)
} else {
ethIntf = &EthImpl{NetworkName: networkName}
}
return &Watcher{
url: url,
contract: contract,
@ -155,42 +143,91 @@ func NewEthWatcher(
setChan: setEvents,
obsvReqC: obsvReqC,
pending: map[pendingKey]*pendingMessage{},
ethIntf: ethIntf,
shouldCheckSafeMode: (chainID == vaa.ChainIDKarura || chainID == vaa.ChainIDAcala) && (!unsafeDevMode)}
shouldCheckSafeMode: (chainID == vaa.ChainIDKarura || chainID == vaa.ChainIDAcala) && (!unsafeDevMode),
unsafeDevMode: unsafeDevMode,
}
}
func (e *Watcher) Run(ctx context.Context) error {
func (w *Watcher) Run(ctx context.Context) error {
logger := supervisor.Logger(ctx)
e.ethIntf.SetLogger(logger)
if e.shouldCheckSafeMode {
if err := e.checkForSafeMode(ctx); err != nil {
if w.shouldCheckSafeMode {
if err := w.checkForSafeMode(ctx); err != nil {
return err
}
}
// Initialize gossip metrics (we want to broadcast the address even if we're not yet syncing)
p2p.DefaultRegistry.SetNetworkStats(e.chainID, &gossipv1.Heartbeat_Network{
ContractAddress: e.contract.Hex(),
p2p.DefaultRegistry.SetNetworkStats(w.chainID, &gossipv1.Heartbeat_Network{
ContractAddress: w.contract.Hex(),
})
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
err := e.ethIntf.DialContext(timeout, e.url)
var err error
if w.chainID == vaa.ChainIDCelo && !w.unsafeDevMode {
// When we are running in mainnet or testnet, we need to use the Celo ethereum library rather than go-ethereum.
// However, in devnet, we currently run the standard ETH node for Celo, so we need to use the standard go-ethereum.
w.ethConn, err = connectors.NewCeloConnector(timeout, w.networkName, w.url, w.contract, logger)
if err != nil {
ethConnectionErrors.WithLabelValues(e.networkName, "dial_error").Inc()
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
return fmt.Errorf("dialing eth client failed: %w", err)
}
err = e.ethIntf.NewAbiFilterer(e.contract)
} else if w.chainID == vaa.ChainIDEthereum && !w.unsafeDevMode {
baseConnector, err := connectors.NewEthereumConnector(timeout, w.networkName, w.url, w.contract, logger)
if err != nil {
return fmt.Errorf("could not create wormhole contract filter: %w", err)
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
return fmt.Errorf("dialing eth client failed: %w", err)
}
err = e.ethIntf.NewAbiCaller(e.contract)
w.ethConn, err = connectors.NewBlockPollConnector(ctx, baseConnector, finalizers.NewDefaultFinalizer(), 250*time.Millisecond, true)
if err != nil {
panic(err)
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
return fmt.Errorf("creating block poll connector failed: %w", err)
}
} else if w.chainID == vaa.ChainIDMoonbeam && !w.unsafeDevMode {
baseConnector, err := connectors.NewEthereumConnector(timeout, w.networkName, w.url, w.contract, logger)
if err != nil {
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
return fmt.Errorf("dialing eth client failed: %w", err)
}
finalizer := finalizers.NewMoonbeamFinalizer(logger, baseConnector)
w.ethConn, err = connectors.NewBlockPollConnector(ctx, baseConnector, finalizer, 250*time.Millisecond, false)
if err != nil {
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
return fmt.Errorf("creating block poll connector failed: %w", err)
}
} else if w.chainID == vaa.ChainIDNeon {
baseConnector, err := connectors.NewEthereumConnector(timeout, w.networkName, w.url, w.contract, logger)
if err != nil {
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
return fmt.Errorf("dialing eth client failed: %w", err)
}
pollConnector, err := connectors.NewBlockPollConnector(ctx, baseConnector, finalizers.NewDefaultFinalizer(), 250*time.Millisecond, false)
if err != nil {
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
return fmt.Errorf("creating block poll connector failed: %w", err)
}
w.ethConn, err = connectors.NewLogPollConnector(ctx, pollConnector, baseConnector.Client())
if err != nil {
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
return fmt.Errorf("creating poll connector failed: %w", err)
}
} else {
w.ethConn, err = connectors.NewEthereumConnector(timeout, w.networkName, w.url, w.contract, logger)
if err != nil {
ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc()
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
return fmt.Errorf("dialing eth client failed: %w", err)
}
}
// Timeout for initializing subscriptions
@ -198,16 +235,17 @@ func (e *Watcher) Run(ctx context.Context) error {
defer cancel()
// Subscribe to new message publications
messageC := make(chan *abi.AbiLogMessagePublished, 2)
messageSub, err := e.ethIntf.WatchLogMessagePublished(ctx, timeout, messageC)
messageC := make(chan *ethabi.AbiLogMessagePublished, 2)
messageSub, err := w.ethConn.WatchLogMessagePublished(timeout, messageC)
defer messageSub.Unsubscribe()
if err != nil {
ethConnectionErrors.WithLabelValues(e.networkName, "subscribe_error").Inc()
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
ethConnectionErrors.WithLabelValues(w.networkName, "subscribe_error").Inc()
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
return fmt.Errorf("failed to subscribe to message publication events: %w", err)
}
// Fetch initial guardian set
if err := e.fetchAndUpdateGuardianSet(logger, ctx, e.ethIntf); err != nil {
if err := w.fetchAndUpdateGuardianSet(logger, ctx, w.ethConn); err != nil {
return fmt.Errorf("failed to request guardian set: %v", err)
}
@ -222,7 +260,7 @@ func (e *Watcher) Run(ctx context.Context) error {
case <-ctx.Done():
return
case <-t.C:
if err := e.fetchAndUpdateGuardianSet(logger, ctx, e.ethIntf); err != nil {
if err := w.fetchAndUpdateGuardianSet(logger, ctx, w.ethConn); err != nil {
errC <- fmt.Errorf("failed to request guardian set: %v", err)
return
}
@ -239,16 +277,16 @@ func (e *Watcher) Run(ctx context.Context) error {
select {
case <-ctx.Done():
return
case r := <-e.obsvReqC:
case r := <-w.obsvReqC:
// This can't happen unless there is a programming error - the caller
// is expected to send us only requests for our chainID.
if vaa.ChainID(r.ChainId) != e.chainID {
if vaa.ChainID(r.ChainId) != w.chainID {
panic("invalid chain ID")
}
tx := eth_common.BytesToHash(r.TxHash)
logger.Info("received observation request",
zap.String("eth_network", e.networkName),
zap.String("eth_network", w.networkName),
zap.String("tx_hash", tx.Hex()))
// SECURITY: Load the block number before requesting the transaction to avoid a
@ -261,24 +299,24 @@ func (e *Watcher) Run(ctx context.Context) error {
blockNumberU := atomic.LoadUint64(&currentBlockNumber)
if blockNumberU == 0 {
logger.Error("no block number available, ignoring observation request",
zap.String("eth_network", e.networkName))
zap.String("eth_network", w.networkName))
continue
}
timeout, cancel := context.WithTimeout(ctx, 5*time.Second)
blockNumber, msgs, err := MessageEventsForTransaction(timeout, e.ethIntf, e.contract, e.chainID, tx)
blockNumber, msgs, err := MessageEventsForTransaction(timeout, w.ethConn, w.contract, w.chainID, tx)
cancel()
if err != nil {
logger.Error("failed to process observation request",
zap.Error(err), zap.String("eth_network", e.networkName))
zap.Error(err), zap.String("eth_network", w.networkName))
continue
}
for _, msg := range msgs {
expectedConfirmations := uint64(msg.ConsistencyLevel)
if expectedConfirmations < e.minConfirmations {
expectedConfirmations = e.minConfirmations
if expectedConfirmations < w.minConfirmations {
expectedConfirmations = w.minConfirmations
}
// SECURITY: In the recovery flow, we already know which transaction to
@ -298,9 +336,9 @@ func (e *Watcher) Run(ctx context.Context) error {
zap.Uint64("sequence", msg.Sequence),
zap.Uint64("current_block", blockNumberU),
zap.Uint64("observed_block", blockNumber),
zap.String("eth_network", e.networkName),
zap.String("eth_network", w.networkName),
)
e.msgChan <- msg
w.msgChan <- msg
} else {
logger.Info("ignoring re-observed message publication transaction",
zap.Stringer("tx", msg.TxHash),
@ -309,7 +347,7 @@ func (e *Watcher) Run(ctx context.Context) error {
zap.Uint64("current_block", blockNumberU),
zap.Uint64("observed_block", blockNumber),
zap.Uint64("expected_confirmations", expectedConfirmations),
zap.String("eth_network", e.networkName),
zap.String("eth_network", w.networkName),
)
}
}
@ -323,21 +361,21 @@ func (e *Watcher) Run(ctx context.Context) error {
case <-ctx.Done():
return
case err := <-messageSub.Err():
ethConnectionErrors.WithLabelValues(e.networkName, "subscription_error").Inc()
ethConnectionErrors.WithLabelValues(w.networkName, "subscription_error").Inc()
errC <- fmt.Errorf("error while processing message publication subscription: %w", err)
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
return
case ev := <-messageC:
// Request timestamp for block
msm := time.Now()
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
blockTime, err := e.ethIntf.TimeOfBlockByHash(timeout, ev.Raw.BlockHash)
blockTime, err := w.ethConn.TimeOfBlockByHash(timeout, ev.Raw.BlockHash)
cancel()
queryLatency.WithLabelValues(e.networkName, "block_by_number").Observe(time.Since(msm).Seconds())
queryLatency.WithLabelValues(w.networkName, "block_by_number").Observe(time.Since(msm).Seconds())
if err != nil {
ethConnectionErrors.WithLabelValues(e.networkName, "block_by_number_error").Inc()
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
ethConnectionErrors.WithLabelValues(w.networkName, "block_by_number_error").Inc()
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
errC <- fmt.Errorf("failed to request timestamp for block %d, hash %s: %w",
ev.Raw.BlockNumber, ev.Raw.BlockHash.String(), err)
return
@ -348,7 +386,7 @@ func (e *Watcher) Run(ctx context.Context) error {
Timestamp: time.Unix(int64(blockTime), 0),
Nonce: ev.Nonce,
Sequence: ev.Sequence,
EmitterChain: e.chainID,
EmitterChain: w.chainID,
EmitterAddress: PadAddress(ev.Sender),
Payload: ev.Payload,
ConsistencyLevel: ev.ConsistencyLevel,
@ -361,9 +399,9 @@ func (e *Watcher) Run(ctx context.Context) error {
zap.Uint64("Sequence", ev.Sequence),
zap.Uint32("Nonce", ev.Nonce),
zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel),
zap.String("eth_network", e.networkName))
zap.String("eth_network", w.networkName))
ethMessagesObserved.WithLabelValues(e.networkName).Inc()
ethMessagesObserved.WithLabelValues(w.networkName).Inc()
key := pendingKey{
TxHash: message.TxHash,
@ -372,22 +410,22 @@ func (e *Watcher) Run(ctx context.Context) error {
Sequence: message.Sequence,
}
e.pendingMu.Lock()
e.pending[key] = &pendingMessage{
w.pendingMu.Lock()
w.pending[key] = &pendingMessage{
message: message,
height: ev.Raw.BlockNumber,
}
e.pendingMu.Unlock()
w.pendingMu.Unlock()
}
}
}()
// Watch headers
headSink := make(chan *common.NewBlock, 2)
headerSubscription, err := e.ethIntf.SubscribeForBlocks(ctx, headSink)
headSink := make(chan *connectors.NewBlock, 2)
headerSubscription, err := w.ethConn.SubscribeForBlocks(ctx, headSink)
if err != nil {
ethConnectionErrors.WithLabelValues(e.networkName, "header_subscribe_error").Inc()
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
ethConnectionErrors.WithLabelValues(w.networkName, "header_subscribe_error").Inc()
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
return fmt.Errorf("failed to subscribe to header events: %w", err)
}
@ -397,18 +435,18 @@ func (e *Watcher) Run(ctx context.Context) error {
case <-ctx.Done():
return
case err := <-headerSubscription.Err():
ethConnectionErrors.WithLabelValues(e.networkName, "header_subscription_error").Inc()
ethConnectionErrors.WithLabelValues(w.networkName, "header_subscription_error").Inc()
errC <- fmt.Errorf("error while processing header subscription: %w", err)
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
return
case ev := <-headSink:
// These two pointers should have been checked before the event was placed on the channel, but just being safe.
if ev == nil {
logger.Error("new header event is nil", zap.String("eth_network", e.networkName))
logger.Error("new header event is nil", zap.String("eth_network", w.networkName))
continue
}
if ev.Number == nil {
logger.Error("new header block number is nil", zap.String("eth_network", e.networkName))
logger.Error("new header block number is nil", zap.String("eth_network", w.networkName))
continue
}
@ -417,23 +455,23 @@ func (e *Watcher) Run(ctx context.Context) error {
logger.Info("processing new header",
zap.Stringer("current_block", ev.Number),
zap.Stringer("current_blockhash", currentHash),
zap.String("eth_network", e.networkName))
currentEthHeight.WithLabelValues(e.networkName).Set(float64(ev.Number.Int64()))
readiness.SetReady(e.readiness)
p2p.DefaultRegistry.SetNetworkStats(e.chainID, &gossipv1.Heartbeat_Network{
zap.String("eth_network", w.networkName))
currentEthHeight.WithLabelValues(w.networkName).Set(float64(ev.Number.Int64()))
readiness.SetReady(w.readiness)
p2p.DefaultRegistry.SetNetworkStats(w.chainID, &gossipv1.Heartbeat_Network{
Height: ev.Number.Int64(),
ContractAddress: e.contract.Hex(),
ContractAddress: w.contract.Hex(),
})
e.pendingMu.Lock()
w.pendingMu.Lock()
blockNumberU := ev.Number.Uint64()
atomic.StoreUint64(&currentBlockNumber, blockNumberU)
for key, pLock := range e.pending {
for key, pLock := range w.pending {
expectedConfirmations := uint64(pLock.message.ConsistencyLevel)
if expectedConfirmations < e.minConfirmations {
expectedConfirmations = e.minConfirmations
if expectedConfirmations < w.minConfirmations {
expectedConfirmations = w.minConfirmations
}
// Transaction was dropped and never picked up again
@ -445,17 +483,17 @@ func (e *Watcher) Run(ctx context.Context) error {
zap.Uint64("sequence", key.Sequence),
zap.Stringer("current_block", ev.Number),
zap.Stringer("current_blockhash", currentHash),
zap.String("eth_network", e.networkName),
zap.String("eth_network", w.networkName),
)
ethMessagesOrphaned.WithLabelValues(e.networkName, "timeout").Inc()
delete(e.pending, key)
ethMessagesOrphaned.WithLabelValues(w.networkName, "timeout").Inc()
delete(w.pending, key)
continue
}
// Transaction is now ready
if pLock.height+uint64(expectedConfirmations) <= blockNumberU {
timeout, cancel := context.WithTimeout(ctx, 5*time.Second)
tx, err := e.ethIntf.TransactionReceipt(timeout, pLock.message.TxHash)
tx, err := w.ethConn.TransactionReceipt(timeout, pLock.message.TxHash)
cancel()
// If the node returns an error after waiting expectedConfirmation blocks,
@ -474,10 +512,10 @@ func (e *Watcher) Run(ctx context.Context) error {
zap.Uint64("sequence", key.Sequence),
zap.Stringer("current_block", ev.Number),
zap.Stringer("current_blockhash", currentHash),
zap.String("eth_network", e.networkName),
zap.String("eth_network", w.networkName),
zap.Error(err))
delete(e.pending, key)
ethMessagesOrphaned.WithLabelValues(e.networkName, "not_found").Inc()
delete(w.pending, key)
ethMessagesOrphaned.WithLabelValues(w.networkName, "not_found").Inc()
continue
}
@ -492,10 +530,10 @@ func (e *Watcher) Run(ctx context.Context) error {
zap.Uint64("sequence", key.Sequence),
zap.Stringer("current_block", ev.Number),
zap.Stringer("current_blockhash", currentHash),
zap.String("eth_network", e.networkName),
zap.String("eth_network", w.networkName),
zap.Error(err))
delete(e.pending, key)
ethMessagesOrphaned.WithLabelValues(e.networkName, "tx_failed").Inc()
delete(w.pending, key)
ethMessagesOrphaned.WithLabelValues(w.networkName, "tx_failed").Inc()
continue
}
@ -508,7 +546,7 @@ func (e *Watcher) Run(ctx context.Context) error {
zap.Uint64("sequence", key.Sequence),
zap.Stringer("current_block", ev.Number),
zap.Stringer("current_blockhash", currentHash),
zap.String("eth_network", e.networkName),
zap.String("eth_network", w.networkName),
zap.Error(err))
continue
}
@ -524,9 +562,9 @@ func (e *Watcher) Run(ctx context.Context) error {
zap.Uint64("sequence", key.Sequence),
zap.Stringer("current_block", ev.Number),
zap.Stringer("current_blockhash", currentHash),
zap.String("eth_network", e.networkName))
delete(e.pending, key)
ethMessagesOrphaned.WithLabelValues(e.networkName, "blockhash_mismatch").Inc()
zap.String("eth_network", w.networkName))
delete(w.pending, key)
ethMessagesOrphaned.WithLabelValues(w.networkName, "blockhash_mismatch").Inc()
continue
}
@ -537,19 +575,19 @@ func (e *Watcher) Run(ctx context.Context) error {
zap.Uint64("sequence", key.Sequence),
zap.Stringer("current_block", ev.Number),
zap.Stringer("current_blockhash", currentHash),
zap.String("eth_network", e.networkName))
delete(e.pending, key)
e.msgChan <- pLock.message
ethMessagesConfirmed.WithLabelValues(e.networkName).Inc()
zap.String("eth_network", w.networkName))
delete(w.pending, key)
w.msgChan <- pLock.message
ethMessagesConfirmed.WithLabelValues(w.networkName).Inc()
}
}
e.pendingMu.Unlock()
w.pendingMu.Unlock()
logger.Info("processed new header",
zap.Stringer("current_block", ev.Number),
zap.Stringer("current_blockhash", currentHash),
zap.Duration("took", time.Since(start)),
zap.String("eth_network", e.networkName))
zap.String("eth_network", w.networkName))
}
}
}()
@ -562,36 +600,36 @@ func (e *Watcher) Run(ctx context.Context) error {
}
}
func (e *Watcher) fetchAndUpdateGuardianSet(
func (w *Watcher) fetchAndUpdateGuardianSet(
logger *zap.Logger,
ctx context.Context,
ethIntf common.Ethish,
ethConn connectors.Connector,
) error {
msm := time.Now()
logger.Info("fetching guardian set")
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
idx, gs, err := fetchCurrentGuardianSet(timeout, ethIntf)
idx, gs, err := fetchCurrentGuardianSet(timeout, ethConn)
if err != nil {
ethConnectionErrors.WithLabelValues(e.networkName, "guardian_set_fetch_error").Inc()
p2p.DefaultRegistry.AddErrorCount(e.chainID, 1)
ethConnectionErrors.WithLabelValues(w.networkName, "guardian_set_fetch_error").Inc()
p2p.DefaultRegistry.AddErrorCount(w.chainID, 1)
return err
}
queryLatency.WithLabelValues(e.networkName, "get_guardian_set").Observe(time.Since(msm).Seconds())
queryLatency.WithLabelValues(w.networkName, "get_guardian_set").Observe(time.Since(msm).Seconds())
if e.currentGuardianSet != nil && *(e.currentGuardianSet) == idx {
if w.currentGuardianSet != nil && *(w.currentGuardianSet) == idx {
return nil
}
logger.Info("updated guardian set found",
zap.Any("value", gs), zap.Uint32("index", idx),
zap.String("eth_network", e.networkName))
zap.String("eth_network", w.networkName))
e.currentGuardianSet = &idx
w.currentGuardianSet = &idx
if e.setChan != nil {
e.setChan <- &common.GuardianSet{
if w.setChan != nil {
w.setChan <- &common.GuardianSet{
Keys: gs.Keys,
Index: idx,
}
@ -601,13 +639,13 @@ func (e *Watcher) fetchAndUpdateGuardianSet(
}
// Fetch the current guardian set ID and guardian set from the chain.
func fetchCurrentGuardianSet(ctx context.Context, ethIntf common.Ethish) (uint32, *abi.StructsGuardianSet, error) {
currentIndex, err := ethIntf.GetCurrentGuardianSetIndex(ctx)
func fetchCurrentGuardianSet(ctx context.Context, ethConn connectors.Connector) (uint32, *ethabi.StructsGuardianSet, error) {
currentIndex, err := ethConn.GetCurrentGuardianSetIndex(ctx)
if err != nil {
return 0, nil, fmt.Errorf("error requesting current guardian set index: %w", err)
}
gs, err := ethIntf.GetGuardianSet(ctx, currentIndex)
gs, err := ethConn.GetGuardianSet(ctx, currentIndex)
if err != nil {
return 0, nil, fmt.Errorf("error requesting current guardian set value: %w", err)
}
@ -615,23 +653,23 @@ func fetchCurrentGuardianSet(ctx context.Context, ethIntf common.Ethish) (uint32
return currentIndex, &gs, nil
}
func (e *Watcher) checkForSafeMode(ctx context.Context) error {
func (w *Watcher) checkForSafeMode(ctx context.Context) error {
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
c, err := rpc.DialContext(timeout, e.url)
c, err := rpc.DialContext(timeout, w.url)
if err != nil {
return fmt.Errorf("failed to connect to url %s to check for safe mode: %w", e.url, err)
return fmt.Errorf("failed to connect to url %s to check for safe mode: %w", w.url, err)
}
var safe bool
err = c.CallContext(ctx, &safe, "net_isSafeMode")
if err != nil {
return fmt.Errorf("check for safe mode for url %s failed: %w", e.url, err)
return fmt.Errorf("check for safe mode for url %s failed: %w", w.url, err)
}
if !safe {
return fmt.Errorf("url %s is not using safe mode", e.url)
return fmt.Errorf("url %s is not using safe mode", w.url)
}
return nil