diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index 29c496a2c..03cde0298 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -11,6 +11,8 @@ import ( "path" "strings" + "github.com/certusone/wormhole/node/pkg/evm" + "github.com/benbjohnson/clock" "github.com/certusone/wormhole/node/pkg/db" "github.com/certusone/wormhole/node/pkg/notify/discord" @@ -25,7 +27,6 @@ import ( "github.com/certusone/wormhole/node/pkg/common" "github.com/certusone/wormhole/node/pkg/devnet" - "github.com/certusone/wormhole/node/pkg/ethereum" "github.com/certusone/wormhole/node/pkg/governor" "github.com/certusone/wormhole/node/pkg/p2p" "github.com/certusone/wormhole/node/pkg/processor" @@ -915,12 +916,12 @@ func runNode(cmd *cobra.Command, args []string) { } if err := supervisor.Run(ctx, "ethwatch", - ethereum.NewEthWatcher(*ethRPC, ethContractAddr, "eth", common.ReadinessEthSyncing, vaa.ChainIDEthereum, lockC, setC, 1, chainObsvReqC[vaa.ChainIDEthereum], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*ethRPC, ethContractAddr, "eth", common.ReadinessEthSyncing, vaa.ChainIDEthereum, lockC, setC, 1, chainObsvReqC[vaa.ChainIDEthereum], *unsafeDevMode).Run); err != nil { return err } if err := supervisor.Run(ctx, "bscwatch", - ethereum.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", common.ReadinessBSCSyncing, vaa.ChainIDBSC, lockC, nil, 1, chainObsvReqC[vaa.ChainIDBSC], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*bscRPC, bscContractAddr, "bsc", common.ReadinessBSCSyncing, vaa.ChainIDBSC, lockC, nil, 1, chainObsvReqC[vaa.ChainIDBSC], *unsafeDevMode).Run); err != nil { return err } @@ -930,7 +931,7 @@ func runNode(cmd *cobra.Command, args []string) { } if err := supervisor.Run(ctx, "polygonwatch", - ethereum.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, lockC, nil, polygonMinConfirmations, chainObsvReqC[vaa.ChainIDPolygon], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*polygonRPC, polygonContractAddr, "polygon", common.ReadinessPolygonSyncing, vaa.ChainIDPolygon, lockC, nil, polygonMinConfirmations, chainObsvReqC[vaa.ChainIDPolygon], *unsafeDevMode).Run); err != nil { // Special case: Polygon can fork like PoW Ethereum, and it's not clear what the safe number of blocks is // // Hardcode the minimum number of confirmations to 512 regardless of what the smart contract specifies to protect @@ -939,49 +940,49 @@ func runNode(cmd *cobra.Command, args []string) { return err } if err := supervisor.Run(ctx, "avalanchewatch", - ethereum.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAvalanche], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*avalancheRPC, avalancheContractAddr, "avalanche", common.ReadinessAvalancheSyncing, vaa.ChainIDAvalanche, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAvalanche], *unsafeDevMode).Run); err != nil { return err } if err := supervisor.Run(ctx, "oasiswatch", - ethereum.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, lockC, nil, 1, chainObsvReqC[vaa.ChainIDOasis], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*oasisRPC, oasisContractAddr, "oasis", common.ReadinessOasisSyncing, vaa.ChainIDOasis, lockC, nil, 1, chainObsvReqC[vaa.ChainIDOasis], *unsafeDevMode).Run); err != nil { return err } if err := supervisor.Run(ctx, "aurorawatch", - ethereum.NewEthWatcher(*auroraRPC, auroraContractAddr, "aurora", common.ReadinessAuroraSyncing, vaa.ChainIDAurora, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAurora], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*auroraRPC, auroraContractAddr, "aurora", common.ReadinessAuroraSyncing, vaa.ChainIDAurora, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAurora], *unsafeDevMode).Run); err != nil { return err } if err := supervisor.Run(ctx, "fantomwatch", - ethereum.NewEthWatcher(*fantomRPC, fantomContractAddr, "fantom", common.ReadinessFantomSyncing, vaa.ChainIDFantom, lockC, nil, 1, chainObsvReqC[vaa.ChainIDFantom], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*fantomRPC, fantomContractAddr, "fantom", common.ReadinessFantomSyncing, vaa.ChainIDFantom, lockC, nil, 1, chainObsvReqC[vaa.ChainIDFantom], *unsafeDevMode).Run); err != nil { return err } if err := supervisor.Run(ctx, "karurawatch", - ethereum.NewEthWatcher(*karuraRPC, karuraContractAddr, "karura", common.ReadinessKaruraSyncing, vaa.ChainIDKarura, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKarura], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*karuraRPC, karuraContractAddr, "karura", common.ReadinessKaruraSyncing, vaa.ChainIDKarura, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKarura], *unsafeDevMode).Run); err != nil { return err } if err := supervisor.Run(ctx, "acalawatch", - ethereum.NewEthWatcher(*acalaRPC, acalaContractAddr, "acala", common.ReadinessAcalaSyncing, vaa.ChainIDAcala, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAcala], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*acalaRPC, acalaContractAddr, "acala", common.ReadinessAcalaSyncing, vaa.ChainIDAcala, lockC, nil, 1, chainObsvReqC[vaa.ChainIDAcala], *unsafeDevMode).Run); err != nil { return err } if err := supervisor.Run(ctx, "klaytnwatch", - ethereum.NewEthWatcher(*klaytnRPC, klaytnContractAddr, "klaytn", common.ReadinessKlaytnSyncing, vaa.ChainIDKlaytn, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKlaytn], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*klaytnRPC, klaytnContractAddr, "klaytn", common.ReadinessKlaytnSyncing, vaa.ChainIDKlaytn, lockC, nil, 1, chainObsvReqC[vaa.ChainIDKlaytn], *unsafeDevMode).Run); err != nil { return err } if err := supervisor.Run(ctx, "celowatch", - ethereum.NewEthWatcher(*celoRPC, celoContractAddr, "celo", common.ReadinessCeloSyncing, vaa.ChainIDCelo, lockC, nil, 1, chainObsvReqC[vaa.ChainIDCelo], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*celoRPC, celoContractAddr, "celo", common.ReadinessCeloSyncing, vaa.ChainIDCelo, lockC, nil, 1, chainObsvReqC[vaa.ChainIDCelo], *unsafeDevMode).Run); err != nil { return err } if err := supervisor.Run(ctx, "moonbeamwatch", - ethereum.NewEthWatcher(*moonbeamRPC, moonbeamContractAddr, "moonbeam", common.ReadinessMoonbeamSyncing, vaa.ChainIDMoonbeam, lockC, nil, 1, chainObsvReqC[vaa.ChainIDMoonbeam], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*moonbeamRPC, moonbeamContractAddr, "moonbeam", common.ReadinessMoonbeamSyncing, vaa.ChainIDMoonbeam, lockC, nil, 1, chainObsvReqC[vaa.ChainIDMoonbeam], *unsafeDevMode).Run); err != nil { return err } if *testnetMode { if err := supervisor.Run(ctx, "ethropstenwatch", - ethereum.NewEthWatcher(*ethRopstenRPC, ethRopstenContractAddr, "ethropsten", common.ReadinessEthRopstenSyncing, vaa.ChainIDEthereumRopsten, lockC, nil, 1, chainObsvReqC[vaa.ChainIDEthereumRopsten], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*ethRopstenRPC, ethRopstenContractAddr, "ethropsten", common.ReadinessEthRopstenSyncing, vaa.ChainIDEthereumRopsten, lockC, nil, 1, chainObsvReqC[vaa.ChainIDEthereumRopsten], *unsafeDevMode).Run); err != nil { return err } if err := supervisor.Run(ctx, "neonwatch", - ethereum.NewEthWatcher(*neonRPC, neonContractAddr, "neon", common.ReadinessNeonSyncing, vaa.ChainIDNeon, lockC, nil, 32, chainObsvReqC[vaa.ChainIDNeon], *unsafeDevMode).Run); err != nil { + evm.NewEthWatcher(*neonRPC, neonContractAddr, "neon", common.ReadinessNeonSyncing, vaa.ChainIDNeon, lockC, nil, 32, chainObsvReqC[vaa.ChainIDNeon], *unsafeDevMode).Run); err != nil { return err } } diff --git a/node/hack/parse_eth_tx/parse_eth_tx.go b/node/hack/parse_eth_tx/parse_eth_tx.go index 689c8355d..a2145cf79 100644 --- a/node/hack/parse_eth_tx/parse_eth_tx.go +++ b/node/hack/parse_eth_tx/parse_eth_tx.go @@ -10,9 +10,10 @@ import ( "flag" "log" - "github.com/certusone/wormhole/node/pkg/celo" - "github.com/certusone/wormhole/node/pkg/common" - "github.com/certusone/wormhole/node/pkg/ethereum" + "github.com/certusone/wormhole/node/pkg/evm" + "github.com/certusone/wormhole/node/pkg/evm/connectors" + "go.uber.org/zap" + ethCommon "github.com/ethereum/go-ethereum/common" "github.com/wormhole-foundation/wormhole/sdk/vaa" ) @@ -34,27 +35,25 @@ func main() { ctx := context.Background() - var ethIntf common.Ethish - if chainID == vaa.ChainIDCelo { - ethIntf = &celo.CeloImpl{NetworkName: "celo"} - } else { - ethIntf = ðereum.EthImpl{NetworkName: "eth"} - } - - err := ethIntf.DialContext(ctx, *flagEthRPC) - if err != nil { - log.Fatal(err) - } - contractAddr := ethCommon.HexToAddress(*flagContractAddr) + + var ethIntf connectors.Connector + var err error + if chainID == vaa.ChainIDCelo { + ethIntf, err = connectors.NewCeloConnector(ctx, "", *flagEthRPC, contractAddr, zap.L()) + if err != nil { + log.Fatalf("dialing eth client failed: %v", err) + } + } else { + ethIntf, err = connectors.NewEthereumConnector(ctx, "", *flagEthRPC, contractAddr, zap.L()) + if err != nil { + log.Fatalf("dialing eth client failed: %v", err) + } + } + transactionHash := ethCommon.HexToHash(*flagTx) - err = ethIntf.NewAbiFilterer(contractAddr) - if err != nil { - log.Fatal(err) - } - - block, msgs, err := ethereum.MessageEventsForTransaction(ctx, ethIntf, contractAddr, chainID, transactionHash) + block, msgs, err := evm.MessageEventsForTransaction(ctx, ethIntf, contractAddr, chainID, transactionHash) if err != nil { log.Fatal(err) } diff --git a/node/hack/repair_eth/repair_eth.go b/node/hack/repair_eth/repair_eth.go index 54814c61e..7d5fc65a6 100644 --- a/node/hack/repair_eth/repair_eth.go +++ b/node/hack/repair_eth/repair_eth.go @@ -14,8 +14,9 @@ import ( "strings" "time" + "github.com/certusone/wormhole/node/pkg/evm/connectors/ethabi" + "github.com/certusone/wormhole/node/pkg/db" - "github.com/certusone/wormhole/node/pkg/ethereum/abi" gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1" nodev1 "github.com/certusone/wormhole/node/pkg/proto/node/v1" abi2 "github.com/ethereum/go-ethereum/accounts/abi" @@ -362,7 +363,7 @@ func main() { Timeout: 5 * time.Second, } - ethAbi, err := abi2.JSON(strings.NewReader(abi.AbiABI)) + ethAbi, err := abi2.JSON(strings.NewReader(ethabi.AbiABI)) if err != nil { log.Fatalf("failed to parse Eth ABI: %v", err) } diff --git a/node/pkg/common/ethish.go b/node/pkg/common/ethish.go deleted file mode 100644 index 119992b59..000000000 --- a/node/pkg/common/ethish.go +++ /dev/null @@ -1,37 +0,0 @@ -// This specifies the interface to the chain specific Eth / EVM libraries. -// This interface should be implemented for each chain that has a unique go-ethereum or "go-ethereum-ish" library. - -package common - -import ( - "context" - "math/big" - - ethereum "github.com/ethereum/go-ethereum" - ethCommon "github.com/ethereum/go-ethereum/common" - ethTypes "github.com/ethereum/go-ethereum/core/types" - ethEvent "github.com/ethereum/go-ethereum/event" - - ethAbi "github.com/certusone/wormhole/node/pkg/ethereum/abi" - - "go.uber.org/zap" -) - -type NewBlock struct { - Number *big.Int - Hash ethCommon.Hash -} - -type Ethish interface { - SetLogger(l *zap.Logger) - DialContext(ctx context.Context, rawurl string) error - NewAbiFilterer(address ethCommon.Address) error - NewAbiCaller(address ethCommon.Address) error - GetCurrentGuardianSetIndex(ctx context.Context) (uint32, error) - GetGuardianSet(ctx context.Context, index uint32) (ethAbi.StructsGuardianSet, error) - WatchLogMessagePublished(ctx, timeout context.Context, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) - TransactionReceipt(ctx context.Context, txHash ethCommon.Hash) (*ethTypes.Receipt, error) - TimeOfBlockByHash(ctx context.Context, hash ethCommon.Hash) (uint64, error) - ParseLogMessagePublished(log ethTypes.Log) (*ethAbi.AbiLogMessagePublished, error) - SubscribeForBlocks(ctx context.Context, sink chan<- *NewBlock) (ethereum.Subscription, error) -} diff --git a/node/pkg/devnet/guardianset_vaa.go b/node/pkg/devnet/guardianset_vaa.go index 18c3dbce4..e15343c99 100644 --- a/node/pkg/devnet/guardianset_vaa.go +++ b/node/pkg/devnet/guardianset_vaa.go @@ -5,6 +5,8 @@ import ( "fmt" "time" + "github.com/certusone/wormhole/node/pkg/evm/connectors/ethabi" + "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -12,7 +14,6 @@ import ( "github.com/ethereum/go-ethereum/ethclient" "go.uber.org/zap" - "github.com/certusone/wormhole/node/pkg/ethereum/abi" "github.com/certusone/wormhole/node/pkg/supervisor" "github.com/wormhole-foundation/wormhole/sdk/vaa" ) @@ -55,7 +56,7 @@ func SubmitVAA(ctx context.Context, rpcURL string, vaa *vaa.VAA) (*types.Transac } kt := GetKeyedTransactor(ctx) - contract, err := abi.NewAbi(GanacheWormholeContractAddress, c) + contract, err := ethabi.NewAbi(GanacheWormholeContractAddress, c) if err != nil { panic(err) } diff --git a/node/pkg/ethereum/erc20/abi.go b/node/pkg/ethereum/erc20/abi.go deleted file mode 100644 index d8bd1c7ea..000000000 --- a/node/pkg/ethereum/erc20/abi.go +++ /dev/null @@ -1,769 +0,0 @@ -// Code generated - DO NOT EDIT. -// This file is a generated binding and any manual changes will be lost. - -package erc20 - -import ( - "math/big" - "strings" - - ethereum "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/accounts/abi" - "github.com/ethereum/go-ethereum/accounts/abi/bind" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/event" -) - -// Reference imports to suppress errors if they are not otherwise used. -var ( - _ = big.NewInt - _ = strings.NewReader - _ = ethereum.NotFound - _ = bind.Bind - _ = common.Big1 - _ = types.BloomLookup - _ = event.NewSubscription -) - -// Erc20ABI is the input ABI used to generate the binding from. -const Erc20ABI = "[{\"inputs\":[{\"internalType\":\"string\",\"name\":\"name\",\"type\":\"string\"},{\"internalType\":\"string\",\"name\":\"symbol\",\"type\":\"string\"}],\"stateMutability\":\"nonpayable\",\"type\":\"constructor\"},{\"anonymous\":false,\"inputs\":[{\"indexed\":true,\"internalType\":\"address\",\"name\":\"owner\",\"type\":\"address\"},{\"indexed\":true,\"internalType\":\"address\",\"name\":\"spender\",\"type\":\"address\"},{\"indexed\":false,\"internalType\":\"uint256\",\"name\":\"value\",\"type\":\"uint256\"}],\"name\":\"Approval\",\"type\":\"event\"},{\"anonymous\":false,\"inputs\":[{\"indexed\":true,\"internalType\":\"address\",\"name\":\"from\",\"type\":\"address\"},{\"indexed\":true,\"internalType\":\"address\",\"name\":\"to\",\"type\":\"address\"},{\"indexed\":false,\"internalType\":\"uint256\",\"name\":\"value\",\"type\":\"uint256\"}],\"name\":\"Transfer\",\"type\":\"event\"},{\"inputs\":[],\"name\":\"name\",\"outputs\":[{\"internalType\":\"string\",\"name\":\"\",\"type\":\"string\"}],\"stateMutability\":\"view\",\"type\":\"function\"},{\"inputs\":[],\"name\":\"symbol\",\"outputs\":[{\"internalType\":\"string\",\"name\":\"\",\"type\":\"string\"}],\"stateMutability\":\"view\",\"type\":\"function\"},{\"inputs\":[],\"name\":\"decimals\",\"outputs\":[{\"internalType\":\"uint8\",\"name\":\"\",\"type\":\"uint8\"}],\"stateMutability\":\"view\",\"type\":\"function\"},{\"inputs\":[],\"name\":\"totalSupply\",\"outputs\":[{\"internalType\":\"uint256\",\"name\":\"\",\"type\":\"uint256\"}],\"stateMutability\":\"view\",\"type\":\"function\"},{\"inputs\":[{\"internalType\":\"address\",\"name\":\"account\",\"type\":\"address\"}],\"name\":\"balanceOf\",\"outputs\":[{\"internalType\":\"uint256\",\"name\":\"\",\"type\":\"uint256\"}],\"stateMutability\":\"view\",\"type\":\"function\"},{\"inputs\":[{\"internalType\":\"address\",\"name\":\"recipient\",\"type\":\"address\"},{\"internalType\":\"uint256\",\"name\":\"amount\",\"type\":\"uint256\"}],\"name\":\"transfer\",\"outputs\":[{\"internalType\":\"bool\",\"name\":\"\",\"type\":\"bool\"}],\"stateMutability\":\"nonpayable\",\"type\":\"function\"},{\"inputs\":[{\"internalType\":\"address\",\"name\":\"owner\",\"type\":\"address\"},{\"internalType\":\"address\",\"name\":\"spender\",\"type\":\"address\"}],\"name\":\"allowance\",\"outputs\":[{\"internalType\":\"uint256\",\"name\":\"\",\"type\":\"uint256\"}],\"stateMutability\":\"view\",\"type\":\"function\"},{\"inputs\":[{\"internalType\":\"address\",\"name\":\"spender\",\"type\":\"address\"},{\"internalType\":\"uint256\",\"name\":\"amount\",\"type\":\"uint256\"}],\"name\":\"approve\",\"outputs\":[{\"internalType\":\"bool\",\"name\":\"\",\"type\":\"bool\"}],\"stateMutability\":\"nonpayable\",\"type\":\"function\"},{\"inputs\":[{\"internalType\":\"address\",\"name\":\"sender\",\"type\":\"address\"},{\"internalType\":\"address\",\"name\":\"recipient\",\"type\":\"address\"},{\"internalType\":\"uint256\",\"name\":\"amount\",\"type\":\"uint256\"}],\"name\":\"transferFrom\",\"outputs\":[{\"internalType\":\"bool\",\"name\":\"\",\"type\":\"bool\"}],\"stateMutability\":\"nonpayable\",\"type\":\"function\"},{\"inputs\":[{\"internalType\":\"address\",\"name\":\"spender\",\"type\":\"address\"},{\"internalType\":\"uint256\",\"name\":\"addedValue\",\"type\":\"uint256\"}],\"name\":\"increaseAllowance\",\"outputs\":[{\"internalType\":\"bool\",\"name\":\"\",\"type\":\"bool\"}],\"stateMutability\":\"nonpayable\",\"type\":\"function\"},{\"inputs\":[{\"internalType\":\"address\",\"name\":\"spender\",\"type\":\"address\"},{\"internalType\":\"uint256\",\"name\":\"subtractedValue\",\"type\":\"uint256\"}],\"name\":\"decreaseAllowance\",\"outputs\":[{\"internalType\":\"bool\",\"name\":\"\",\"type\":\"bool\"}],\"stateMutability\":\"nonpayable\",\"type\":\"function\"}]" - -// Erc20 is an auto generated Go binding around an Ethereum contract. -type Erc20 struct { - Erc20Caller // Read-only binding to the contract - Erc20Transactor // Write-only binding to the contract - Erc20Filterer // Log filterer for contract events -} - -// Erc20Caller is an auto generated read-only Go binding around an Ethereum contract. -type Erc20Caller struct { - contract *bind.BoundContract // Generic contract wrapper for the low level calls -} - -// Erc20Transactor is an auto generated write-only Go binding around an Ethereum contract. -type Erc20Transactor struct { - contract *bind.BoundContract // Generic contract wrapper for the low level calls -} - -// Erc20Filterer is an auto generated log filtering Go binding around an Ethereum contract events. -type Erc20Filterer struct { - contract *bind.BoundContract // Generic contract wrapper for the low level calls -} - -// Erc20Session is an auto generated Go binding around an Ethereum contract, -// with pre-set call and transact options. -type Erc20Session struct { - Contract *Erc20 // Generic contract binding to set the session for - CallOpts bind.CallOpts // Call options to use throughout this session - TransactOpts bind.TransactOpts // Transaction auth options to use throughout this session -} - -// Erc20CallerSession is an auto generated read-only Go binding around an Ethereum contract, -// with pre-set call options. -type Erc20CallerSession struct { - Contract *Erc20Caller // Generic contract caller binding to set the session for - CallOpts bind.CallOpts // Call options to use throughout this session -} - -// Erc20TransactorSession is an auto generated write-only Go binding around an Ethereum contract, -// with pre-set transact options. -type Erc20TransactorSession struct { - Contract *Erc20Transactor // Generic contract transactor binding to set the session for - TransactOpts bind.TransactOpts // Transaction auth options to use throughout this session -} - -// Erc20Raw is an auto generated low-level Go binding around an Ethereum contract. -type Erc20Raw struct { - Contract *Erc20 // Generic contract binding to access the raw methods on -} - -// Erc20CallerRaw is an auto generated low-level read-only Go binding around an Ethereum contract. -type Erc20CallerRaw struct { - Contract *Erc20Caller // Generic read-only contract binding to access the raw methods on -} - -// Erc20TransactorRaw is an auto generated low-level write-only Go binding around an Ethereum contract. -type Erc20TransactorRaw struct { - Contract *Erc20Transactor // Generic write-only contract binding to access the raw methods on -} - -// NewErc20 creates a new instance of Erc20, bound to a specific deployed contract. -func NewErc20(address common.Address, backend bind.ContractBackend) (*Erc20, error) { - contract, err := bindErc20(address, backend, backend, backend) - if err != nil { - return nil, err - } - return &Erc20{Erc20Caller: Erc20Caller{contract: contract}, Erc20Transactor: Erc20Transactor{contract: contract}, Erc20Filterer: Erc20Filterer{contract: contract}}, nil -} - -// NewErc20Caller creates a new read-only instance of Erc20, bound to a specific deployed contract. -func NewErc20Caller(address common.Address, caller bind.ContractCaller) (*Erc20Caller, error) { - contract, err := bindErc20(address, caller, nil, nil) - if err != nil { - return nil, err - } - return &Erc20Caller{contract: contract}, nil -} - -// NewErc20Transactor creates a new write-only instance of Erc20, bound to a specific deployed contract. -func NewErc20Transactor(address common.Address, transactor bind.ContractTransactor) (*Erc20Transactor, error) { - contract, err := bindErc20(address, nil, transactor, nil) - if err != nil { - return nil, err - } - return &Erc20Transactor{contract: contract}, nil -} - -// NewErc20Filterer creates a new log filterer instance of Erc20, bound to a specific deployed contract. -func NewErc20Filterer(address common.Address, filterer bind.ContractFilterer) (*Erc20Filterer, error) { - contract, err := bindErc20(address, nil, nil, filterer) - if err != nil { - return nil, err - } - return &Erc20Filterer{contract: contract}, nil -} - -// bindErc20 binds a generic wrapper to an already deployed contract. -func bindErc20(address common.Address, caller bind.ContractCaller, transactor bind.ContractTransactor, filterer bind.ContractFilterer) (*bind.BoundContract, error) { - parsed, err := abi.JSON(strings.NewReader(Erc20ABI)) - if err != nil { - return nil, err - } - return bind.NewBoundContract(address, parsed, caller, transactor, filterer), nil -} - -// Call invokes the (constant) contract method with params as input values and -// sets the output to result. The result type might be a single field for simple -// returns, a slice of interfaces for anonymous returns and a struct for named -// returns. -func (_Erc20 *Erc20Raw) Call(opts *bind.CallOpts, result *[]interface{}, method string, params ...interface{}) error { - return _Erc20.Contract.Erc20Caller.contract.Call(opts, result, method, params...) -} - -// Transfer initiates a plain transaction to move funds to the contract, calling -// its default method if one is available. -func (_Erc20 *Erc20Raw) Transfer(opts *bind.TransactOpts) (*types.Transaction, error) { - return _Erc20.Contract.Erc20Transactor.contract.Transfer(opts) -} - -// Transact invokes the (paid) contract method with params as input values. -func (_Erc20 *Erc20Raw) Transact(opts *bind.TransactOpts, method string, params ...interface{}) (*types.Transaction, error) { - return _Erc20.Contract.Erc20Transactor.contract.Transact(opts, method, params...) -} - -// Call invokes the (constant) contract method with params as input values and -// sets the output to result. The result type might be a single field for simple -// returns, a slice of interfaces for anonymous returns and a struct for named -// returns. -func (_Erc20 *Erc20CallerRaw) Call(opts *bind.CallOpts, result *[]interface{}, method string, params ...interface{}) error { - return _Erc20.Contract.contract.Call(opts, result, method, params...) -} - -// Transfer initiates a plain transaction to move funds to the contract, calling -// its default method if one is available. -func (_Erc20 *Erc20TransactorRaw) Transfer(opts *bind.TransactOpts) (*types.Transaction, error) { - return _Erc20.Contract.contract.Transfer(opts) -} - -// Transact invokes the (paid) contract method with params as input values. -func (_Erc20 *Erc20TransactorRaw) Transact(opts *bind.TransactOpts, method string, params ...interface{}) (*types.Transaction, error) { - return _Erc20.Contract.contract.Transact(opts, method, params...) -} - -// Allowance is a free data retrieval call binding the contract method 0xdd62ed3e. -// -// Solidity: function allowance(address owner, address spender) view returns(uint256) -func (_Erc20 *Erc20Caller) Allowance(opts *bind.CallOpts, owner common.Address, spender common.Address) (*big.Int, error) { - var out []interface{} - err := _Erc20.contract.Call(opts, &out, "allowance", owner, spender) - - if err != nil { - return *new(*big.Int), err - } - - out0 := *abi.ConvertType(out[0], new(*big.Int)).(**big.Int) - - return out0, err - -} - -// Allowance is a free data retrieval call binding the contract method 0xdd62ed3e. -// -// Solidity: function allowance(address owner, address spender) view returns(uint256) -func (_Erc20 *Erc20Session) Allowance(owner common.Address, spender common.Address) (*big.Int, error) { - return _Erc20.Contract.Allowance(&_Erc20.CallOpts, owner, spender) -} - -// Allowance is a free data retrieval call binding the contract method 0xdd62ed3e. -// -// Solidity: function allowance(address owner, address spender) view returns(uint256) -func (_Erc20 *Erc20CallerSession) Allowance(owner common.Address, spender common.Address) (*big.Int, error) { - return _Erc20.Contract.Allowance(&_Erc20.CallOpts, owner, spender) -} - -// BalanceOf is a free data retrieval call binding the contract method 0x70a08231. -// -// Solidity: function balanceOf(address account) view returns(uint256) -func (_Erc20 *Erc20Caller) BalanceOf(opts *bind.CallOpts, account common.Address) (*big.Int, error) { - var out []interface{} - err := _Erc20.contract.Call(opts, &out, "balanceOf", account) - - if err != nil { - return *new(*big.Int), err - } - - out0 := *abi.ConvertType(out[0], new(*big.Int)).(**big.Int) - - return out0, err - -} - -// BalanceOf is a free data retrieval call binding the contract method 0x70a08231. -// -// Solidity: function balanceOf(address account) view returns(uint256) -func (_Erc20 *Erc20Session) BalanceOf(account common.Address) (*big.Int, error) { - return _Erc20.Contract.BalanceOf(&_Erc20.CallOpts, account) -} - -// BalanceOf is a free data retrieval call binding the contract method 0x70a08231. -// -// Solidity: function balanceOf(address account) view returns(uint256) -func (_Erc20 *Erc20CallerSession) BalanceOf(account common.Address) (*big.Int, error) { - return _Erc20.Contract.BalanceOf(&_Erc20.CallOpts, account) -} - -// Decimals is a free data retrieval call binding the contract method 0x313ce567. -// -// Solidity: function decimals() view returns(uint8) -func (_Erc20 *Erc20Caller) Decimals(opts *bind.CallOpts) (uint8, error) { - var out []interface{} - err := _Erc20.contract.Call(opts, &out, "decimals") - - if err != nil { - return *new(uint8), err - } - - out0 := *abi.ConvertType(out[0], new(uint8)).(*uint8) - - return out0, err - -} - -// Decimals is a free data retrieval call binding the contract method 0x313ce567. -// -// Solidity: function decimals() view returns(uint8) -func (_Erc20 *Erc20Session) Decimals() (uint8, error) { - return _Erc20.Contract.Decimals(&_Erc20.CallOpts) -} - -// Decimals is a free data retrieval call binding the contract method 0x313ce567. -// -// Solidity: function decimals() view returns(uint8) -func (_Erc20 *Erc20CallerSession) Decimals() (uint8, error) { - return _Erc20.Contract.Decimals(&_Erc20.CallOpts) -} - -// Name is a free data retrieval call binding the contract method 0x06fdde03. -// -// Solidity: function name() view returns(string) -func (_Erc20 *Erc20Caller) Name(opts *bind.CallOpts) (string, error) { - var out []interface{} - err := _Erc20.contract.Call(opts, &out, "name") - - if err != nil { - return *new(string), err - } - - out0 := *abi.ConvertType(out[0], new(string)).(*string) - - return out0, err - -} - -// Name is a free data retrieval call binding the contract method 0x06fdde03. -// -// Solidity: function name() view returns(string) -func (_Erc20 *Erc20Session) Name() (string, error) { - return _Erc20.Contract.Name(&_Erc20.CallOpts) -} - -// Name is a free data retrieval call binding the contract method 0x06fdde03. -// -// Solidity: function name() view returns(string) -func (_Erc20 *Erc20CallerSession) Name() (string, error) { - return _Erc20.Contract.Name(&_Erc20.CallOpts) -} - -// Symbol is a free data retrieval call binding the contract method 0x95d89b41. -// -// Solidity: function symbol() view returns(string) -func (_Erc20 *Erc20Caller) Symbol(opts *bind.CallOpts) (string, error) { - var out []interface{} - err := _Erc20.contract.Call(opts, &out, "symbol") - - if err != nil { - return *new(string), err - } - - out0 := *abi.ConvertType(out[0], new(string)).(*string) - - return out0, err - -} - -// Symbol is a free data retrieval call binding the contract method 0x95d89b41. -// -// Solidity: function symbol() view returns(string) -func (_Erc20 *Erc20Session) Symbol() (string, error) { - return _Erc20.Contract.Symbol(&_Erc20.CallOpts) -} - -// Symbol is a free data retrieval call binding the contract method 0x95d89b41. -// -// Solidity: function symbol() view returns(string) -func (_Erc20 *Erc20CallerSession) Symbol() (string, error) { - return _Erc20.Contract.Symbol(&_Erc20.CallOpts) -} - -// TotalSupply is a free data retrieval call binding the contract method 0x18160ddd. -// -// Solidity: function totalSupply() view returns(uint256) -func (_Erc20 *Erc20Caller) TotalSupply(opts *bind.CallOpts) (*big.Int, error) { - var out []interface{} - err := _Erc20.contract.Call(opts, &out, "totalSupply") - - if err != nil { - return *new(*big.Int), err - } - - out0 := *abi.ConvertType(out[0], new(*big.Int)).(**big.Int) - - return out0, err - -} - -// TotalSupply is a free data retrieval call binding the contract method 0x18160ddd. -// -// Solidity: function totalSupply() view returns(uint256) -func (_Erc20 *Erc20Session) TotalSupply() (*big.Int, error) { - return _Erc20.Contract.TotalSupply(&_Erc20.CallOpts) -} - -// TotalSupply is a free data retrieval call binding the contract method 0x18160ddd. -// -// Solidity: function totalSupply() view returns(uint256) -func (_Erc20 *Erc20CallerSession) TotalSupply() (*big.Int, error) { - return _Erc20.Contract.TotalSupply(&_Erc20.CallOpts) -} - -// Approve is a paid mutator transaction binding the contract method 0x095ea7b3. -// -// Solidity: function approve(address spender, uint256 amount) returns(bool) -func (_Erc20 *Erc20Transactor) Approve(opts *bind.TransactOpts, spender common.Address, amount *big.Int) (*types.Transaction, error) { - return _Erc20.contract.Transact(opts, "approve", spender, amount) -} - -// Approve is a paid mutator transaction binding the contract method 0x095ea7b3. -// -// Solidity: function approve(address spender, uint256 amount) returns(bool) -func (_Erc20 *Erc20Session) Approve(spender common.Address, amount *big.Int) (*types.Transaction, error) { - return _Erc20.Contract.Approve(&_Erc20.TransactOpts, spender, amount) -} - -// Approve is a paid mutator transaction binding the contract method 0x095ea7b3. -// -// Solidity: function approve(address spender, uint256 amount) returns(bool) -func (_Erc20 *Erc20TransactorSession) Approve(spender common.Address, amount *big.Int) (*types.Transaction, error) { - return _Erc20.Contract.Approve(&_Erc20.TransactOpts, spender, amount) -} - -// DecreaseAllowance is a paid mutator transaction binding the contract method 0xa457c2d7. -// -// Solidity: function decreaseAllowance(address spender, uint256 subtractedValue) returns(bool) -func (_Erc20 *Erc20Transactor) DecreaseAllowance(opts *bind.TransactOpts, spender common.Address, subtractedValue *big.Int) (*types.Transaction, error) { - return _Erc20.contract.Transact(opts, "decreaseAllowance", spender, subtractedValue) -} - -// DecreaseAllowance is a paid mutator transaction binding the contract method 0xa457c2d7. -// -// Solidity: function decreaseAllowance(address spender, uint256 subtractedValue) returns(bool) -func (_Erc20 *Erc20Session) DecreaseAllowance(spender common.Address, subtractedValue *big.Int) (*types.Transaction, error) { - return _Erc20.Contract.DecreaseAllowance(&_Erc20.TransactOpts, spender, subtractedValue) -} - -// DecreaseAllowance is a paid mutator transaction binding the contract method 0xa457c2d7. -// -// Solidity: function decreaseAllowance(address spender, uint256 subtractedValue) returns(bool) -func (_Erc20 *Erc20TransactorSession) DecreaseAllowance(spender common.Address, subtractedValue *big.Int) (*types.Transaction, error) { - return _Erc20.Contract.DecreaseAllowance(&_Erc20.TransactOpts, spender, subtractedValue) -} - -// IncreaseAllowance is a paid mutator transaction binding the contract method 0x39509351. -// -// Solidity: function increaseAllowance(address spender, uint256 addedValue) returns(bool) -func (_Erc20 *Erc20Transactor) IncreaseAllowance(opts *bind.TransactOpts, spender common.Address, addedValue *big.Int) (*types.Transaction, error) { - return _Erc20.contract.Transact(opts, "increaseAllowance", spender, addedValue) -} - -// IncreaseAllowance is a paid mutator transaction binding the contract method 0x39509351. -// -// Solidity: function increaseAllowance(address spender, uint256 addedValue) returns(bool) -func (_Erc20 *Erc20Session) IncreaseAllowance(spender common.Address, addedValue *big.Int) (*types.Transaction, error) { - return _Erc20.Contract.IncreaseAllowance(&_Erc20.TransactOpts, spender, addedValue) -} - -// IncreaseAllowance is a paid mutator transaction binding the contract method 0x39509351. -// -// Solidity: function increaseAllowance(address spender, uint256 addedValue) returns(bool) -func (_Erc20 *Erc20TransactorSession) IncreaseAllowance(spender common.Address, addedValue *big.Int) (*types.Transaction, error) { - return _Erc20.Contract.IncreaseAllowance(&_Erc20.TransactOpts, spender, addedValue) -} - -// Transfer is a paid mutator transaction binding the contract method 0xa9059cbb. -// -// Solidity: function transfer(address recipient, uint256 amount) returns(bool) -func (_Erc20 *Erc20Transactor) Transfer(opts *bind.TransactOpts, recipient common.Address, amount *big.Int) (*types.Transaction, error) { - return _Erc20.contract.Transact(opts, "transfer", recipient, amount) -} - -// Transfer is a paid mutator transaction binding the contract method 0xa9059cbb. -// -// Solidity: function transfer(address recipient, uint256 amount) returns(bool) -func (_Erc20 *Erc20Session) Transfer(recipient common.Address, amount *big.Int) (*types.Transaction, error) { - return _Erc20.Contract.Transfer(&_Erc20.TransactOpts, recipient, amount) -} - -// Transfer is a paid mutator transaction binding the contract method 0xa9059cbb. -// -// Solidity: function transfer(address recipient, uint256 amount) returns(bool) -func (_Erc20 *Erc20TransactorSession) Transfer(recipient common.Address, amount *big.Int) (*types.Transaction, error) { - return _Erc20.Contract.Transfer(&_Erc20.TransactOpts, recipient, amount) -} - -// TransferFrom is a paid mutator transaction binding the contract method 0x23b872dd. -// -// Solidity: function transferFrom(address sender, address recipient, uint256 amount) returns(bool) -func (_Erc20 *Erc20Transactor) TransferFrom(opts *bind.TransactOpts, sender common.Address, recipient common.Address, amount *big.Int) (*types.Transaction, error) { - return _Erc20.contract.Transact(opts, "transferFrom", sender, recipient, amount) -} - -// TransferFrom is a paid mutator transaction binding the contract method 0x23b872dd. -// -// Solidity: function transferFrom(address sender, address recipient, uint256 amount) returns(bool) -func (_Erc20 *Erc20Session) TransferFrom(sender common.Address, recipient common.Address, amount *big.Int) (*types.Transaction, error) { - return _Erc20.Contract.TransferFrom(&_Erc20.TransactOpts, sender, recipient, amount) -} - -// TransferFrom is a paid mutator transaction binding the contract method 0x23b872dd. -// -// Solidity: function transferFrom(address sender, address recipient, uint256 amount) returns(bool) -func (_Erc20 *Erc20TransactorSession) TransferFrom(sender common.Address, recipient common.Address, amount *big.Int) (*types.Transaction, error) { - return _Erc20.Contract.TransferFrom(&_Erc20.TransactOpts, sender, recipient, amount) -} - -// Erc20ApprovalIterator is returned from FilterApproval and is used to iterate over the raw logs and unpacked data for Approval events raised by the Erc20 contract. -type Erc20ApprovalIterator struct { - Event *Erc20Approval // Event containing the contract specifics and raw log - - contract *bind.BoundContract // Generic contract to use for unpacking event data - event string // Event name to use for unpacking event data - - logs chan types.Log // Log channel receiving the found contract events - sub ethereum.Subscription // Subscription for errors, completion and termination - done bool // Whether the subscription completed delivering logs - fail error // Occurred error to stop iteration -} - -// Next advances the iterator to the subsequent event, returning whether there -// are any more events found. In case of a retrieval or parsing error, false is -// returned and Error() can be queried for the exact failure. -func (it *Erc20ApprovalIterator) Next() bool { - // If the iterator failed, stop iterating - if it.fail != nil { - return false - } - // If the iterator completed, deliver directly whatever's available - if it.done { - select { - case log := <-it.logs: - it.Event = new(Erc20Approval) - if err := it.contract.UnpackLog(it.Event, it.event, log); err != nil { - it.fail = err - return false - } - it.Event.Raw = log - return true - - default: - return false - } - } - // Iterator still in progress, wait for either a data or an error event - select { - case log := <-it.logs: - it.Event = new(Erc20Approval) - if err := it.contract.UnpackLog(it.Event, it.event, log); err != nil { - it.fail = err - return false - } - it.Event.Raw = log - return true - - case err := <-it.sub.Err(): - it.done = true - it.fail = err - return it.Next() - } -} - -// Error returns any retrieval or parsing error occurred during filtering. -func (it *Erc20ApprovalIterator) Error() error { - return it.fail -} - -// Close terminates the iteration process, releasing any pending underlying -// resources. -func (it *Erc20ApprovalIterator) Close() error { - it.sub.Unsubscribe() - return nil -} - -// Erc20Approval represents a Approval event raised by the Erc20 contract. -type Erc20Approval struct { - Owner common.Address - Spender common.Address - Value *big.Int - Raw types.Log // Blockchain specific contextual infos -} - -// FilterApproval is a free log retrieval operation binding the contract event 0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925. -// -// Solidity: event Approval(address indexed owner, address indexed spender, uint256 value) -func (_Erc20 *Erc20Filterer) FilterApproval(opts *bind.FilterOpts, owner []common.Address, spender []common.Address) (*Erc20ApprovalIterator, error) { - - var ownerRule []interface{} - for _, ownerItem := range owner { - ownerRule = append(ownerRule, ownerItem) - } - var spenderRule []interface{} - for _, spenderItem := range spender { - spenderRule = append(spenderRule, spenderItem) - } - - logs, sub, err := _Erc20.contract.FilterLogs(opts, "Approval", ownerRule, spenderRule) - if err != nil { - return nil, err - } - return &Erc20ApprovalIterator{contract: _Erc20.contract, event: "Approval", logs: logs, sub: sub}, nil -} - -// WatchApproval is a free log subscription operation binding the contract event 0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925. -// -// Solidity: event Approval(address indexed owner, address indexed spender, uint256 value) -func (_Erc20 *Erc20Filterer) WatchApproval(opts *bind.WatchOpts, sink chan<- *Erc20Approval, owner []common.Address, spender []common.Address) (event.Subscription, error) { - - var ownerRule []interface{} - for _, ownerItem := range owner { - ownerRule = append(ownerRule, ownerItem) - } - var spenderRule []interface{} - for _, spenderItem := range spender { - spenderRule = append(spenderRule, spenderItem) - } - - logs, sub, err := _Erc20.contract.WatchLogs(opts, "Approval", ownerRule, spenderRule) - if err != nil { - return nil, err - } - return event.NewSubscription(func(quit <-chan struct{}) error { - defer sub.Unsubscribe() - for { - select { - case log := <-logs: - // New log arrived, parse the event and forward to the user - event := new(Erc20Approval) - if err := _Erc20.contract.UnpackLog(event, "Approval", log); err != nil { - return err - } - event.Raw = log - - select { - case sink <- event: - case err := <-sub.Err(): - return err - case <-quit: - return nil - } - case err := <-sub.Err(): - return err - case <-quit: - return nil - } - } - }), nil -} - -// ParseApproval is a log parse operation binding the contract event 0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925. -// -// Solidity: event Approval(address indexed owner, address indexed spender, uint256 value) -func (_Erc20 *Erc20Filterer) ParseApproval(log types.Log) (*Erc20Approval, error) { - event := new(Erc20Approval) - if err := _Erc20.contract.UnpackLog(event, "Approval", log); err != nil { - return nil, err - } - return event, nil -} - -// Erc20TransferIterator is returned from FilterTransfer and is used to iterate over the raw logs and unpacked data for Transfer events raised by the Erc20 contract. -type Erc20TransferIterator struct { - Event *Erc20Transfer // Event containing the contract specifics and raw log - - contract *bind.BoundContract // Generic contract to use for unpacking event data - event string // Event name to use for unpacking event data - - logs chan types.Log // Log channel receiving the found contract events - sub ethereum.Subscription // Subscription for errors, completion and termination - done bool // Whether the subscription completed delivering logs - fail error // Occurred error to stop iteration -} - -// Next advances the iterator to the subsequent event, returning whether there -// are any more events found. In case of a retrieval or parsing error, false is -// returned and Error() can be queried for the exact failure. -func (it *Erc20TransferIterator) Next() bool { - // If the iterator failed, stop iterating - if it.fail != nil { - return false - } - // If the iterator completed, deliver directly whatever's available - if it.done { - select { - case log := <-it.logs: - it.Event = new(Erc20Transfer) - if err := it.contract.UnpackLog(it.Event, it.event, log); err != nil { - it.fail = err - return false - } - it.Event.Raw = log - return true - - default: - return false - } - } - // Iterator still in progress, wait for either a data or an error event - select { - case log := <-it.logs: - it.Event = new(Erc20Transfer) - if err := it.contract.UnpackLog(it.Event, it.event, log); err != nil { - it.fail = err - return false - } - it.Event.Raw = log - return true - - case err := <-it.sub.Err(): - it.done = true - it.fail = err - return it.Next() - } -} - -// Error returns any retrieval or parsing error occurred during filtering. -func (it *Erc20TransferIterator) Error() error { - return it.fail -} - -// Close terminates the iteration process, releasing any pending underlying -// resources. -func (it *Erc20TransferIterator) Close() error { - it.sub.Unsubscribe() - return nil -} - -// Erc20Transfer represents a Transfer event raised by the Erc20 contract. -type Erc20Transfer struct { - From common.Address - To common.Address - Value *big.Int - Raw types.Log // Blockchain specific contextual infos -} - -// FilterTransfer is a free log retrieval operation binding the contract event 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef. -// -// Solidity: event Transfer(address indexed from, address indexed to, uint256 value) -func (_Erc20 *Erc20Filterer) FilterTransfer(opts *bind.FilterOpts, from []common.Address, to []common.Address) (*Erc20TransferIterator, error) { - - var fromRule []interface{} - for _, fromItem := range from { - fromRule = append(fromRule, fromItem) - } - var toRule []interface{} - for _, toItem := range to { - toRule = append(toRule, toItem) - } - - logs, sub, err := _Erc20.contract.FilterLogs(opts, "Transfer", fromRule, toRule) - if err != nil { - return nil, err - } - return &Erc20TransferIterator{contract: _Erc20.contract, event: "Transfer", logs: logs, sub: sub}, nil -} - -// WatchTransfer is a free log subscription operation binding the contract event 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef. -// -// Solidity: event Transfer(address indexed from, address indexed to, uint256 value) -func (_Erc20 *Erc20Filterer) WatchTransfer(opts *bind.WatchOpts, sink chan<- *Erc20Transfer, from []common.Address, to []common.Address) (event.Subscription, error) { - - var fromRule []interface{} - for _, fromItem := range from { - fromRule = append(fromRule, fromItem) - } - var toRule []interface{} - for _, toItem := range to { - toRule = append(toRule, toItem) - } - - logs, sub, err := _Erc20.contract.WatchLogs(opts, "Transfer", fromRule, toRule) - if err != nil { - return nil, err - } - return event.NewSubscription(func(quit <-chan struct{}) error { - defer sub.Unsubscribe() - for { - select { - case log := <-logs: - // New log arrived, parse the event and forward to the user - event := new(Erc20Transfer) - if err := _Erc20.contract.UnpackLog(event, "Transfer", log); err != nil { - return err - } - event.Raw = log - - select { - case sink <- event: - case err := <-sub.Err(): - return err - case <-quit: - return nil - } - case err := <-sub.Err(): - return err - case <-quit: - return nil - } - } - }), nil -} - -// ParseTransfer is a log parse operation binding the contract event 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef. -// -// Solidity: event Transfer(address indexed from, address indexed to, uint256 value) -func (_Erc20 *Erc20Filterer) ParseTransfer(log types.Log) (*Erc20Transfer, error) { - event := new(Erc20Transfer) - if err := _Erc20.contract.UnpackLog(event, "Transfer", log); err != nil { - return nil, err - } - return event, nil -} diff --git a/node/pkg/ethereum/ethimpl.go b/node/pkg/ethereum/ethimpl.go deleted file mode 100644 index 591321bd9..000000000 --- a/node/pkg/ethereum/ethimpl.go +++ /dev/null @@ -1,138 +0,0 @@ -// This implements the interface to the standard go-ethereum library. - -package ethereum - -import ( - "context" - - ethereum "github.com/ethereum/go-ethereum" - ethBind "github.com/ethereum/go-ethereum/accounts/abi/bind" - ethCommon "github.com/ethereum/go-ethereum/common" - ethTypes "github.com/ethereum/go-ethereum/core/types" - ethClient "github.com/ethereum/go-ethereum/ethclient" - ethEvent "github.com/ethereum/go-ethereum/event" - - common "github.com/certusone/wormhole/node/pkg/common" - ethAbi "github.com/certusone/wormhole/node/pkg/ethereum/abi" - - "go.uber.org/zap" -) - -type EthImpl struct { - NetworkName string - logger *zap.Logger - client *ethClient.Client - filterer *ethAbi.AbiFilterer - caller *ethAbi.AbiCaller -} - -func (e *EthImpl) SetLogger(l *zap.Logger) { - e.logger = l -} - -func (e *EthImpl) DialContext(ctx context.Context, rawurl string) (err error) { - e.client, err = ethClient.DialContext(ctx, rawurl) - return -} - -func (e *EthImpl) NewAbiFilterer(address ethCommon.Address) (err error) { - e.filterer, err = ethAbi.NewAbiFilterer(address, e.client) - return -} - -func (e *EthImpl) NewAbiCaller(address ethCommon.Address) (err error) { - e.caller, err = ethAbi.NewAbiCaller(address, e.client) - return -} - -func (e *EthImpl) GetCurrentGuardianSetIndex(ctx context.Context) (uint32, error) { - if e.caller == nil { - panic("caller is not initialized!") - } - - opts := ðBind.CallOpts{Context: ctx} - return e.caller.GetCurrentGuardianSetIndex(opts) -} - -func (e *EthImpl) GetGuardianSet(ctx context.Context, index uint32) (ethAbi.StructsGuardianSet, error) { - if e.caller == nil { - panic("caller is not initialized!") - } - - opts := ðBind.CallOpts{Context: ctx} - return e.caller.GetGuardianSet(opts, index) -} - -func (e *EthImpl) WatchLogMessagePublished(_ctx, timeout context.Context, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) { - if e.filterer == nil { - panic("filterer is not initialized!") - } - - return e.filterer.WatchLogMessagePublished(ðBind.WatchOpts{Context: timeout}, sink, nil) -} - -func (e *EthImpl) TransactionReceipt(ctx context.Context, txHash ethCommon.Hash) (*ethTypes.Receipt, error) { - if e.client == nil { - panic("client is not initialized!") - } - - return e.client.TransactionReceipt(ctx, txHash) -} - -func (e *EthImpl) TimeOfBlockByHash(ctx context.Context, hash ethCommon.Hash) (uint64, error) { - if e.client == nil { - panic("client is not initialized!") - } - - block, err := e.client.BlockByHash(ctx, hash) - if err != nil { - return 0, err - } - - return block.Time(), err -} - -func (e *EthImpl) ParseLogMessagePublished(log ethTypes.Log) (*ethAbi.AbiLogMessagePublished, error) { - if e.filterer == nil { - panic("filterer is not initialized!") - } - - return e.filterer.ParseLogMessagePublished(log) -} - -func (e *EthImpl) SubscribeForBlocks(ctx context.Context, sink chan<- *common.NewBlock) (ethereum.Subscription, error) { - if e.client == nil { - panic("client is not initialized!") - } - - headSink := make(chan *ethTypes.Header, 2) - headerSubscription, err := e.client.SubscribeNewHead(ctx, headSink) - if err != nil { - return headerSubscription, err - } - - // The purpose of this is to map events from the geth event channel to the new block event channel. - go func() { - for { - select { - case <-ctx.Done(): - return - case ev := <-headSink: - if ev == nil { - e.logger.Error("new header event is nil", zap.String("eth_network", e.NetworkName)) - continue - } - if ev.Number == nil { - e.logger.Error("new header block number is nil", zap.String("eth_network", e.NetworkName)) - continue - } - sink <- &common.NewBlock{ - Number: ev.Number, - Hash: ev.Hash(), - } - } - } - }() - - return headerSubscription, err -} diff --git a/node/pkg/ethereum/getlogsimpl.go b/node/pkg/ethereum/getlogsimpl.go deleted file mode 100644 index 3ff83c9b2..000000000 --- a/node/pkg/ethereum/getlogsimpl.go +++ /dev/null @@ -1,196 +0,0 @@ -// This implements polling for log events. - -// It works by using the finalizer in the polling implementation to check for log events on each new block. - -package ethereum - -import ( - "context" - "errors" - "fmt" - "math/big" - "sync" - "time" - - ethereum "github.com/ethereum/go-ethereum" - ethCommon "github.com/ethereum/go-ethereum/common" - ethTypes "github.com/ethereum/go-ethereum/core/types" - ethClient "github.com/ethereum/go-ethereum/ethclient" - ethEvent "github.com/ethereum/go-ethereum/event" - - common "github.com/certusone/wormhole/node/pkg/common" - ethAbi "github.com/certusone/wormhole/node/pkg/ethereum/abi" - - "go.uber.org/zap" -) - -type GetLogsImpl struct { - BasePoller *PollImpl - Query *GetLogsQuery - logger *zap.Logger -} - -func NewGetLogsImpl(networkName string, contract ethCommon.Address, delayInMs int) *GetLogsImpl { - query := &GetLogsQuery{ContractAddress: contract} - return &GetLogsImpl{BasePoller: &PollImpl{BaseEth: EthImpl{NetworkName: networkName}, Finalizer: query, DelayInMs: delayInMs}, Query: query} -} - -func (e *GetLogsImpl) SetLogger(l *zap.Logger) { - e.logger = l - e.logger.Info("using eth_getLogs api to retreive log events", zap.String("eth_network", e.BasePoller.BaseEth.NetworkName)) - e.BasePoller.SetLogger(l) -} - -func (e *GetLogsImpl) DialContext(ctx context.Context, rawurl string) (err error) { - e.Query.poller = e.BasePoller - return e.BasePoller.DialContext(ctx, rawurl) -} - -func (e *GetLogsImpl) NewAbiFilterer(address ethCommon.Address) (err error) { - return e.BasePoller.NewAbiFilterer(address) -} - -func (e *GetLogsImpl) NewAbiCaller(address ethCommon.Address) (err error) { - return e.BasePoller.NewAbiCaller(address) -} - -func (e *GetLogsImpl) GetCurrentGuardianSetIndex(ctx context.Context) (uint32, error) { - return e.BasePoller.GetCurrentGuardianSetIndex(ctx) -} - -func (e *GetLogsImpl) GetGuardianSet(ctx context.Context, index uint32) (ethAbi.StructsGuardianSet, error) { - return e.BasePoller.GetGuardianSet(ctx, index) -} - -type GetLogsPollSubscription struct { - errOnce sync.Once - err chan error - quit chan error - unsubDone chan struct{} -} - -var ErrUnsubscribedForGetLogs = errors.New("unsubscribed") - -func (sub *GetLogsPollSubscription) Err() <-chan error { - return sub.err -} - -func (sub *GetLogsPollSubscription) Unsubscribe() { - sub.errOnce.Do(func() { - select { - case sub.quit <- ErrUnsubscribedForGetLogs: - <-sub.unsubDone - case <-sub.unsubDone: - } - close(sub.err) - }) -} - -func (e *GetLogsImpl) WatchLogMessagePublished(ctx, timeout context.Context, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) { - e.Query.sink = sink - - e.Query.sub = &GetLogsPollSubscription{ - err: make(chan error, 1), - } - - return e.Query.sub, nil -} - -func (e *GetLogsImpl) TransactionReceipt(ctx context.Context, txHash ethCommon.Hash) (*ethTypes.Receipt, error) { - return e.BasePoller.TransactionReceipt(ctx, txHash) -} - -func (e *GetLogsImpl) TimeOfBlockByHash(ctx context.Context, hash ethCommon.Hash) (uint64, error) { - return e.BasePoller.TimeOfBlockByHash(ctx, hash) -} - -func (e *GetLogsImpl) ParseLogMessagePublished(log ethTypes.Log) (*ethAbi.AbiLogMessagePublished, error) { - return e.BasePoller.ParseLogMessagePublished(log) -} - -func (e *GetLogsImpl) SubscribeForBlocks(ctx context.Context, sink chan<- *common.NewBlock) (ethereum.Subscription, error) { - return e.BasePoller.SubscribeForBlocks(ctx, sink) -} - -type GetLogsQuery struct { - logger *zap.Logger - networkName string - ContractAddress ethCommon.Address - prevBlockNum *big.Int - client *ethClient.Client - poller *PollImpl - sink chan<- *ethAbi.AbiLogMessagePublished - sub *GetLogsPollSubscription -} - -func (f *GetLogsQuery) SetLogger(l *zap.Logger, netName string) { - f.logger = l - f.networkName = netName -} - -func (f *GetLogsQuery) DialContext(ctx context.Context, rawurl string) (err error) { - timeout, cancel := context.WithTimeout(ctx, 15*time.Second) - defer cancel() - f.client, err = ethClient.DialContext(timeout, rawurl) - return err -} - -var ( - getLogsBigOne = big.NewInt(1) - logsLogMessageTopic = ethCommon.HexToHash("0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2") -) - -// This doesn't actually check finality, instead it queries for new log events. -func (f *GetLogsQuery) IsBlockFinalized(ctx context.Context, block *common.NewBlock) (bool, error) { - if f.prevBlockNum == nil { - f.prevBlockNum = new(big.Int).Set(block.Number) - } else { - f.prevBlockNum.Add(f.prevBlockNum, getLogsBigOne) - } - - filter := ethereum.FilterQuery{ - FromBlock: f.prevBlockNum, - ToBlock: block.Number, - Addresses: []ethCommon.Address{f.ContractAddress}, - } - - *f.prevBlockNum = *block.Number - - logs, err := f.client.FilterLogs(ctx, filter) - if err != nil { - f.logger.Error("GetLogsQuery: query of eth_getLogs failed", - zap.String("eth_network", f.networkName), - zap.Stringer("FromBlock", filter.FromBlock), - zap.Stringer("ToBlock", filter.ToBlock), - zap.Error(err), - ) - - f.sub.err <- fmt.Errorf("GetLogsQuery: failed to query for log messages: %w", err) - return true, nil // We still return true here, because we don't want this error flagged against the poller. - } - - if len(logs) == 0 { - return true, nil - } - - for _, log := range logs { - if log.Topics[0] == logsLogMessageTopic { - ev, err := f.poller.ParseLogMessagePublished(log) - if err != nil { - f.logger.Error("GetLogsQuery: failed to parse log entry", - zap.String("eth_network", f.networkName), - zap.Stringer("FromBlock", filter.FromBlock), - zap.Stringer("ToBlock", filter.ToBlock), - zap.Error(err), - ) - - f.sub.err <- fmt.Errorf("failed to parse log message: %w", err) - continue - } - - f.sink <- ev - } - } - - return true, nil -} diff --git a/node/pkg/ethereum/moonbeamfin.go b/node/pkg/ethereum/moonbeamfin.go deleted file mode 100644 index c2f27e1f8..000000000 --- a/node/pkg/ethereum/moonbeamfin.go +++ /dev/null @@ -1,46 +0,0 @@ -// This implements the finality check for Moonbeam. -// -// Moonbeam can publish blocks before they are marked final. This means we need to sit on the block until a special "is finalized" -// query returns true. The assumption is that every block number will eventually be published and finalized, it's just that the contents -// of the block (and therefore the hash) might change if there is a rollback. - -package ethereum - -import ( - "context" - "time" - - common "github.com/certusone/wormhole/node/pkg/common" - ethRpc "github.com/ethereum/go-ethereum/rpc" - "go.uber.org/zap" -) - -type MoonbeamFinalizer struct { - logger *zap.Logger - networkName string - client *ethRpc.Client -} - -func (f *MoonbeamFinalizer) SetLogger(l *zap.Logger, netName string) { - f.logger = l - f.networkName = netName - f.logger.Info("using Moonbeam specific finality check", zap.String("eth_network", f.networkName)) -} - -func (f *MoonbeamFinalizer) DialContext(ctx context.Context, rawurl string) (err error) { - timeout, cancel := context.WithTimeout(ctx, 15*time.Second) - defer cancel() - f.client, err = ethRpc.DialContext(timeout, rawurl) - return err -} - -func (f *MoonbeamFinalizer) IsBlockFinalized(ctx context.Context, block *common.NewBlock) (bool, error) { - var finalized bool - err := f.client.CallContext(ctx, &finalized, "moon_isBlockFinalized", block.Hash.Hex()) - if err != nil { - f.logger.Error("failed to check for finality", zap.String("eth_network", f.networkName), zap.Error(err)) - return false, err - } - - return finalized, nil -} diff --git a/node/pkg/ethereum/pollimpl.go b/node/pkg/ethereum/pollimpl.go deleted file mode 100644 index 1661f0f64..000000000 --- a/node/pkg/ethereum/pollimpl.go +++ /dev/null @@ -1,274 +0,0 @@ -// This implements polling for the next available block. - -// It can optionally call a chain specific function to verify that the block is finalized. - -package ethereum - -import ( - "context" - "errors" - "fmt" - "math/big" - "sync" - "time" - - ethereum "github.com/ethereum/go-ethereum" - ethCommon "github.com/ethereum/go-ethereum/common" - ethHexUtils "github.com/ethereum/go-ethereum/common/hexutil" - ethTypes "github.com/ethereum/go-ethereum/core/types" - ethEvent "github.com/ethereum/go-ethereum/event" - ethRpc "github.com/ethereum/go-ethereum/rpc" - - common "github.com/certusone/wormhole/node/pkg/common" - ethAbi "github.com/certusone/wormhole/node/pkg/ethereum/abi" - - "go.uber.org/zap" -) - -type PollFinalizer interface { - SetLogger(l *zap.Logger, netName string) - DialContext(ctx context.Context, rawurl string) error - IsBlockFinalized(ctx context.Context, block *common.NewBlock) (bool, error) -} - -type PollImpl struct { - BaseEth EthImpl - Finalizer PollFinalizer - DelayInMs int - IsEthPoS bool - hasEthSwitchedToPoS bool - logger *zap.Logger - rawClient *ethRpc.Client -} - -func (e *PollImpl) SetLogger(l *zap.Logger) { - e.logger = l - e.logger.Info("using polling to check for new blocks", zap.String("eth_network", e.BaseEth.NetworkName), zap.Int("delay_in_ms", e.DelayInMs)) - if e.Finalizer != nil { - e.Finalizer.SetLogger(l, e.BaseEth.NetworkName) - } -} - -func (e *PollImpl) SetEthSwitched() { - e.hasEthSwitchedToPoS = true - e.logger.Info("switching from latest to finalized", zap.String("eth_network", e.BaseEth.NetworkName), zap.Int("delay_in_ms", e.DelayInMs)) -} - -func (e *PollImpl) DialContext(ctx context.Context, rawurl string) (err error) { - timeout, cancel := context.WithTimeout(ctx, 15*time.Second) - defer cancel() - - // This is used for doing raw eth_ RPC calls. - e.rawClient, err = ethRpc.DialContext(timeout, rawurl) - if err != nil { - return err - } - - if e.Finalizer != nil { - err = e.Finalizer.DialContext(ctx, rawurl) - if err != nil { - return err - } - } - - // This is used for doing all other go-ethereum calls. - return e.BaseEth.DialContext(ctx, rawurl) -} - -func (e *PollImpl) NewAbiFilterer(address ethCommon.Address) (err error) { - return e.BaseEth.NewAbiFilterer(address) -} - -func (e *PollImpl) NewAbiCaller(address ethCommon.Address) (err error) { - return e.BaseEth.NewAbiCaller(address) -} - -func (e *PollImpl) GetCurrentGuardianSetIndex(ctx context.Context) (uint32, error) { - return e.BaseEth.GetCurrentGuardianSetIndex(ctx) -} - -func (e *PollImpl) GetGuardianSet(ctx context.Context, index uint32) (ethAbi.StructsGuardianSet, error) { - return e.BaseEth.GetGuardianSet(ctx, index) -} - -func (e *PollImpl) WatchLogMessagePublished(ctx, timeout context.Context, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) { - return e.BaseEth.WatchLogMessagePublished(ctx, timeout, sink) -} - -func (e *PollImpl) TransactionReceipt(ctx context.Context, txHash ethCommon.Hash) (*ethTypes.Receipt, error) { - return e.BaseEth.TransactionReceipt(ctx, txHash) -} - -func (e *PollImpl) TimeOfBlockByHash(ctx context.Context, hash ethCommon.Hash) (uint64, error) { - return e.BaseEth.TimeOfBlockByHash(ctx, hash) -} - -func (e *PollImpl) ParseLogMessagePublished(log ethTypes.Log) (*ethAbi.AbiLogMessagePublished, error) { - return e.BaseEth.ParseLogMessagePublished(log) -} - -type PollSubscription struct { - errOnce sync.Once - err chan error - quit chan error - unsubDone chan struct{} -} - -var ErrUnsubscribed = errors.New("unsubscribed") - -func (sub *PollSubscription) Err() <-chan error { - return sub.err -} - -func (sub *PollSubscription) Unsubscribe() { - sub.errOnce.Do(func() { - select { - case sub.quit <- ErrUnsubscribed: - <-sub.unsubDone - case <-sub.unsubDone: - } - close(sub.err) - }) -} - -func (e *PollImpl) SubscribeForBlocks(ctx context.Context, sink chan<- *common.NewBlock) (ethereum.Subscription, error) { - if e.BaseEth.client == nil { - panic("client is not initialized!") - } - if e.rawClient == nil { - panic("rawClient is not initialized!") - } - - sub := &PollSubscription{ - err: make(chan error, 1), - } - - latestBlock, err := e.getBlock(ctx, nil) - if err != nil { - return nil, err - } - currentBlockNumber := *latestBlock.Number - - var BIG_ONE = big.NewInt(1) - - timer := time.NewTimer(time.Millisecond) // Start immediately. - go func() { - var errorCount int - for { - select { - case <-ctx.Done(): - return - case <-timer.C: - var errorOccurred bool - for { - var block *common.NewBlock - var err error - errorOccurred = false - - // See if the next block has been created yet. - if currentBlockNumber.Cmp(latestBlock.Number) > 0 { - tmpLatestBlock, latestBlockErr := e.getBlock(ctx, nil) - if latestBlockErr != nil { - errorOccurred = true - e.logger.Error("failed to look up latest block", zap.String("eth_network", e.BaseEth.NetworkName), - zap.Uint64("block", currentBlockNumber.Uint64()), zap.Error(latestBlockErr)) - break - } - latestBlock = tmpLatestBlock - - if currentBlockNumber.Cmp(latestBlock.Number) > 0 { - // We have to wait for this block to become available. - break - } - - if currentBlockNumber.Cmp(latestBlock.Number) == 0 { - block = latestBlock - } - } - - // Fetch the hash every time, in case it changes due to a rollback. The only exception is if we just got it above. - if block == nil { - block, err = e.getBlock(ctx, ¤tBlockNumber) - if err != nil { - errorOccurred = true - e.logger.Error("failed to get current block", zap.String("eth_network", e.BaseEth.NetworkName), - zap.Uint64("block", currentBlockNumber.Uint64()), zap.Error(err)) - break - } - } - - if e.Finalizer != nil { - finalized, err := e.Finalizer.IsBlockFinalized(ctx, block) - if err != nil { - errorOccurred = true - e.logger.Error("failed to see if block is finalized", zap.String("eth_network", e.BaseEth.NetworkName), - zap.Uint64("block", currentBlockNumber.Uint64()), zap.Error(err)) - break - } - - if !finalized { - break - } - } - - sink <- block - currentBlockNumber.Add(¤tBlockNumber, BIG_ONE) - } - - if errorOccurred { - errorCount++ - if errorCount > 1 { - sub.err <- fmt.Errorf("polling encountered multiple errors") - } - } else { - errorCount = 0 - } - - timer = time.NewTimer(time.Duration(e.DelayInMs) * time.Millisecond) - } - } - }() - - return sub, err -} - -func (e *PollImpl) getBlock(ctx context.Context, number *big.Int) (*common.NewBlock, error) { - var numStr string - if number != nil { - numStr = ethHexUtils.EncodeBig(number) - } else if e.hasEthSwitchedToPoS { - numStr = "finalized" - } else { - numStr = "latest" - } - - type Marshaller struct { - Number *ethHexUtils.Big - Hash ethCommon.Hash `json:"hash"` - Difficulty *ethHexUtils.Big - } - - var m Marshaller - err := e.rawClient.CallContext(ctx, &m, "eth_getBlockByNumber", numStr, false) - if err != nil { - e.logger.Error("failed to get block", zap.String("eth_network", e.BaseEth.NetworkName), - zap.String("requested_block", numStr), zap.Error(err)) - return nil, err - } - if m.Number == nil { - e.logger.Error("failed to unmarshal block", zap.String("eth_network", e.BaseEth.NetworkName), - zap.String("requested_block", numStr), - ) - return nil, fmt.Errorf("failed to unmarshal block: Number is nil") - } - d := big.Int(*m.Difficulty) - if e.IsEthPoS && !e.hasEthSwitchedToPoS && d.Cmp(big.NewInt(0)) == 0 { - e.SetEthSwitched() - return e.getBlock(ctx, number) - } - n := big.Int(*m.Number) - return &common.NewBlock{ - Number: &n, - Hash: m.Hash, - }, nil -} diff --git a/node/pkg/ethereum/by_transaction.go b/node/pkg/evm/by_transaction.go similarity index 90% rename from node/pkg/ethereum/by_transaction.go rename to node/pkg/evm/by_transaction.go index 170e8a919..a0ff5294a 100644 --- a/node/pkg/ethereum/by_transaction.go +++ b/node/pkg/evm/by_transaction.go @@ -1,10 +1,12 @@ -package ethereum +package evm import ( "context" "fmt" "time" + "github.com/certusone/wormhole/node/pkg/evm/connectors" + "github.com/certusone/wormhole/node/pkg/common" eth_common "github.com/ethereum/go-ethereum/common" "github.com/wormhole-foundation/wormhole/sdk/vaa" @@ -21,13 +23,13 @@ var ( // Returns the block number and a list of MessagePublication events. func MessageEventsForTransaction( ctx context.Context, - ethIntf common.Ethish, + ethConn connectors.Connector, contract eth_common.Address, chainId vaa.ChainID, tx eth_common.Hash) (uint64, []*common.MessagePublication, error) { // Get transactions logs from transaction - receipt, err := ethIntf.TransactionReceipt(ctx, tx) + receipt, err := ethConn.TransactionReceipt(ctx, tx) if err != nil { return 0, nil, fmt.Errorf("failed to get transaction receipt: %w", err) } @@ -45,7 +47,7 @@ func MessageEventsForTransaction( } // Get block - blockTime, err := ethIntf.TimeOfBlockByHash(ctx, receipt.BlockHash) + blockTime, err := ethConn.TimeOfBlockByHash(ctx, receipt.BlockHash) if err != nil { return 0, nil, fmt.Errorf("failed to get block time: %w", err) } @@ -67,7 +69,7 @@ func MessageEventsForTransaction( continue } - ev, err := ethIntf.ParseLogMessagePublished(*l) + ev, err := ethConn.ParseLogMessagePublished(*l) if err != nil { return 0, nil, fmt.Errorf("failed to parse log: %w", err) } diff --git a/node/pkg/celo/celoimpl.go b/node/pkg/evm/connectors/celo.go similarity index 52% rename from node/pkg/celo/celoimpl.go rename to node/pkg/evm/connectors/celo.go index 2fac263ec..f1e161fc1 100644 --- a/node/pkg/celo/celoimpl.go +++ b/node/pkg/evm/connectors/celo.go @@ -1,71 +1,80 @@ -// This implements the interface to the "go-ethereum-ish" library used by Celo. - -package celo +package connectors import ( "context" + celoAbi "github.com/certusone/wormhole/node/pkg/evm/connectors/celoabi" + ethAbi "github.com/certusone/wormhole/node/pkg/evm/connectors/ethabi" + celoBind "github.com/celo-org/celo-blockchain/accounts/abi/bind" celoCommon "github.com/celo-org/celo-blockchain/common" celoTypes "github.com/celo-org/celo-blockchain/core/types" celoClient "github.com/celo-org/celo-blockchain/ethclient" + celoRpc "github.com/celo-org/celo-blockchain/rpc" ethereum "github.com/ethereum/go-ethereum" ethCommon "github.com/ethereum/go-ethereum/common" ethTypes "github.com/ethereum/go-ethereum/core/types" ethEvent "github.com/ethereum/go-ethereum/event" - celoAbi "github.com/certusone/wormhole/node/pkg/celo/abi" - common "github.com/certusone/wormhole/node/pkg/common" - ethAbi "github.com/certusone/wormhole/node/pkg/ethereum/abi" - "go.uber.org/zap" ) -type CeloImpl struct { - NetworkName string +// CeloConnector implements EVM network query capabilities for the Celo network. It's almost identical to +// EthereumConnector except it's using the Celo fork and provides shims between their respective types. +type CeloConnector struct { + networkName string + address ethCommon.Address logger *zap.Logger client *celoClient.Client + rawClient *celoRpc.Client filterer *celoAbi.AbiFilterer caller *celoAbi.AbiCaller } -func (e *CeloImpl) SetLogger(l *zap.Logger) { - e.logger = l - e.logger.Info("using celo specific ethereum library", zap.String("eth_network", e.NetworkName)) -} +func NewCeloConnector(ctx context.Context, networkName, rawUrl string, address ethCommon.Address, logger *zap.Logger) (*CeloConnector, error) { + rawClient, err := celoRpc.DialContext(ctx, rawUrl) + if err != nil { + return nil, err + } + client := celoClient.NewClient(rawClient) -func (e *CeloImpl) DialContext(ctx context.Context, rawurl string) (err error) { - e.client, err = celoClient.DialContext(ctx, rawurl) - return -} - -func (e *CeloImpl) NewAbiFilterer(address ethCommon.Address) (err error) { - e.filterer, err = celoAbi.NewAbiFilterer(celoCommon.BytesToAddress(address.Bytes()), e.client) - return -} - -func (e *CeloImpl) NewAbiCaller(address ethCommon.Address) (err error) { - e.caller, err = celoAbi.NewAbiCaller(celoCommon.BytesToAddress(address.Bytes()), e.client) - return -} - -func (e *CeloImpl) GetCurrentGuardianSetIndex(ctx context.Context) (uint32, error) { - if e.caller == nil { - panic("caller is not initialized!") + filterer, err := celoAbi.NewAbiFilterer(celoCommon.BytesToAddress(address.Bytes()), client) + if err != nil { + panic(err) + } + caller, err := celoAbi.NewAbiCaller(celoCommon.BytesToAddress(address.Bytes()), client) + if err != nil { + panic(err) } - opts := &celoBind.CallOpts{Context: ctx} - return e.caller.GetCurrentGuardianSetIndex(opts) + return &CeloConnector{ + networkName: networkName, + address: address, + logger: logger.With(zap.String("eth_network", networkName)), + client: client, + rawClient: rawClient, + filterer: filterer, + caller: caller, + }, nil } -func (e *CeloImpl) GetGuardianSet(ctx context.Context, index uint32) (ethAbi.StructsGuardianSet, error) { - if e.caller == nil { - panic("caller is not initialized!") - } +func (c *CeloConnector) NetworkName() string { + return c.networkName +} +func (c *CeloConnector) ContractAddress() ethCommon.Address { + return c.address +} + +func (c *CeloConnector) GetCurrentGuardianSetIndex(ctx context.Context) (uint32, error) { opts := &celoBind.CallOpts{Context: ctx} - celoGs, err := e.caller.GetGuardianSet(opts, index) + return c.caller.GetCurrentGuardianSetIndex(opts) +} + +func (c *CeloConnector) GetGuardianSet(ctx context.Context, index uint32) (ethAbi.StructsGuardianSet, error) { + opts := &celoBind.CallOpts{Context: ctx} + celoGs, err := c.caller.GetGuardianSet(opts, index) if err != nil { return ethAbi.StructsGuardianSet{}, err } @@ -81,13 +90,9 @@ func (e *CeloImpl) GetGuardianSet(ctx context.Context, index uint32) (ethAbi.Str }, err } -func (e *CeloImpl) WatchLogMessagePublished(ctx, timeout context.Context, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) { - if e.filterer == nil { - panic("filterer is not initialized!") - } - +func (c *CeloConnector) WatchLogMessagePublished(ctx context.Context, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) { messageC := make(chan *celoAbi.AbiLogMessagePublished, 2) - messageSub, err := e.filterer.WatchLogMessagePublished(&celoBind.WatchOpts{Context: timeout}, messageC, nil) + messageSub, err := c.filterer.WatchLogMessagePublished(&celoBind.WatchOpts{Context: ctx}, messageC, nil) if err != nil { return messageSub, err } @@ -96,10 +101,11 @@ func (e *CeloImpl) WatchLogMessagePublished(ctx, timeout context.Context, sink c go func() { for { select { - case <-ctx.Done(): + // This will return when the subscription is unsubscribed as the error channel gets closed + case <-messageSub.Err(): return case celoEvent := <-messageC: - sink <- convertEventToEth(celoEvent) + sink <- convertCeloEventToEth(celoEvent) } } }() @@ -107,25 +113,17 @@ func (e *CeloImpl) WatchLogMessagePublished(ctx, timeout context.Context, sink c return messageSub, err } -func (e *CeloImpl) TransactionReceipt(ctx context.Context, txHash ethCommon.Hash) (*ethTypes.Receipt, error) { - if e.client == nil { - panic("client is not initialized!") - } - - celoReceipt, err := e.client.TransactionReceipt(ctx, celoCommon.BytesToHash(txHash.Bytes())) +func (c *CeloConnector) TransactionReceipt(ctx context.Context, txHash ethCommon.Hash) (*ethTypes.Receipt, error) { + celoReceipt, err := c.client.TransactionReceipt(ctx, celoCommon.BytesToHash(txHash.Bytes())) if err != nil { return nil, err } - return convertReceiptToEth(celoReceipt), err + return convertCeloReceiptToEth(celoReceipt), err } -func (e *CeloImpl) TimeOfBlockByHash(ctx context.Context, hash ethCommon.Hash) (uint64, error) { - if e.client == nil { - panic("client is not initialized!") - } - - block, err := e.client.BlockByHash(ctx, celoCommon.BytesToHash(hash.Bytes())) +func (c *CeloConnector) TimeOfBlockByHash(ctx context.Context, hash ethCommon.Hash) (uint64, error) { + block, err := c.client.BlockByHash(ctx, celoCommon.BytesToHash(hash.Bytes())) if err != nil { return 0, err } @@ -133,26 +131,18 @@ func (e *CeloImpl) TimeOfBlockByHash(ctx context.Context, hash ethCommon.Hash) ( return block.Time(), err } -func (e *CeloImpl) ParseLogMessagePublished(ethLog ethTypes.Log) (*ethAbi.AbiLogMessagePublished, error) { - if e.filterer == nil { - panic("filterer is not initialized!") - } - - celoEvent, err := e.filterer.ParseLogMessagePublished(*convertLogFromEth(ðLog)) +func (c *CeloConnector) ParseLogMessagePublished(ethLog ethTypes.Log) (*ethAbi.AbiLogMessagePublished, error) { + celoEvent, err := c.filterer.ParseLogMessagePublished(*convertCeloLogFromEth(ðLog)) if err != nil { return nil, err } - return convertEventToEth(celoEvent), err + return convertCeloEventToEth(celoEvent), err } -func (e *CeloImpl) SubscribeForBlocks(ctx context.Context, sink chan<- *common.NewBlock) (ethereum.Subscription, error) { - if e.client == nil { - panic("client is not initialized!") - } - +func (c *CeloConnector) SubscribeForBlocks(ctx context.Context, sink chan<- *NewBlock) (ethereum.Subscription, error) { headSink := make(chan *celoTypes.Header, 2) - headerSubscription, err := e.client.SubscribeNewHead(ctx, headSink) + headerSubscription, err := c.client.SubscribeNewHead(ctx, headSink) if err != nil { return headerSubscription, err } @@ -165,14 +155,14 @@ func (e *CeloImpl) SubscribeForBlocks(ctx context.Context, sink chan<- *common.N return case ev := <-headSink: if ev == nil { - e.logger.Error("new header event is nil", zap.String("eth_network", e.NetworkName)) + c.logger.Error("new header event is nil") continue } if ev.Number == nil { - e.logger.Error("new header block number is nil", zap.String("eth_network", e.NetworkName)) + c.logger.Error("new header block number is nil") continue } - sink <- &common.NewBlock{ + sink <- &NewBlock{ Number: ev.Number, Hash: ethCommon.BytesToHash(ev.Hash().Bytes()), } @@ -183,18 +173,22 @@ func (e *CeloImpl) SubscribeForBlocks(ctx context.Context, sink chan<- *common.N return headerSubscription, err } -func convertEventToEth(ev *celoAbi.AbiLogMessagePublished) *ethAbi.AbiLogMessagePublished { +func (c *CeloConnector) RawCallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { + return c.rawClient.CallContext(ctx, result, method, args...) +} + +func convertCeloEventToEth(ev *celoAbi.AbiLogMessagePublished) *ethAbi.AbiLogMessagePublished { return ðAbi.AbiLogMessagePublished{ Sender: ethCommon.BytesToAddress(ev.Sender.Bytes()), Sequence: ev.Sequence, Nonce: ev.Nonce, Payload: ev.Payload, ConsistencyLevel: ev.ConsistencyLevel, - Raw: *convertLogToEth(&ev.Raw), + Raw: *convertCeloLogToEth(&ev.Raw), } } -func convertLogToEth(l *celoTypes.Log) *ethTypes.Log { +func convertCeloLogToEth(l *celoTypes.Log) *ethTypes.Log { topics := make([]ethCommon.Hash, len(l.Topics)) for n, t := range l.Topics { topics[n] = ethCommon.BytesToHash(t.Bytes()) @@ -213,10 +207,10 @@ func convertLogToEth(l *celoTypes.Log) *ethTypes.Log { } } -func convertReceiptToEth(celoReceipt *celoTypes.Receipt) *ethTypes.Receipt { +func convertCeloReceiptToEth(celoReceipt *celoTypes.Receipt) *ethTypes.Receipt { ethLogs := make([]*ethTypes.Log, len(celoReceipt.Logs)) for n, l := range celoReceipt.Logs { - ethLogs[n] = convertLogToEth(l) + ethLogs[n] = convertCeloLogToEth(l) } return ðTypes.Receipt{ @@ -235,7 +229,7 @@ func convertReceiptToEth(celoReceipt *celoTypes.Receipt) *ethTypes.Receipt { } } -func convertLogFromEth(l *ethTypes.Log) *celoTypes.Log { +func convertCeloLogFromEth(l *ethTypes.Log) *celoTypes.Log { topics := make([]celoCommon.Hash, len(l.Topics)) for n, t := range l.Topics { topics[n] = celoCommon.BytesToHash(t.Bytes()) diff --git a/node/pkg/celo/abi/abi.go b/node/pkg/evm/connectors/celoabi/abi.go similarity index 99% rename from node/pkg/celo/abi/abi.go rename to node/pkg/evm/connectors/celoabi/abi.go index 7c5f86996..7e0a186a3 100644 --- a/node/pkg/celo/abi/abi.go +++ b/node/pkg/evm/connectors/celoabi/abi.go @@ -1,7 +1,7 @@ // Code generated - DO NOT EDIT. // This file is a generated binding and any manual changes will be lost. -package abi +package celoabi import ( "errors" diff --git a/node/pkg/evm/connectors/common.go b/node/pkg/evm/connectors/common.go new file mode 100644 index 000000000..a7eb0520e --- /dev/null +++ b/node/pkg/evm/connectors/common.go @@ -0,0 +1,65 @@ +package connectors + +import ( + "context" + "errors" + "math/big" + "sync" + + "github.com/certusone/wormhole/node/pkg/evm/connectors/ethabi" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" +) + +type NewBlock struct { + Number *big.Int + Hash common.Hash +} + +// Connector exposes Wormhole-specific interactions with an EVM-based network +type Connector interface { + NetworkName() string + ContractAddress() common.Address + GetCurrentGuardianSetIndex(ctx context.Context) (uint32, error) + GetGuardianSet(ctx context.Context, index uint32) (ethabi.StructsGuardianSet, error) + WatchLogMessagePublished(ctx context.Context, sink chan<- *ethabi.AbiLogMessagePublished) (event.Subscription, error) + TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) + TimeOfBlockByHash(ctx context.Context, hash common.Hash) (uint64, error) + ParseLogMessagePublished(log types.Log) (*ethabi.AbiLogMessagePublished, error) + SubscribeForBlocks(ctx context.Context, sink chan<- *NewBlock) (ethereum.Subscription, error) + RawCallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error +} + +type PollSubscription struct { + errOnce sync.Once + err chan error + quit chan error + unsubDone chan struct{} +} + +func NewPollSubscription() *PollSubscription { + return &PollSubscription{ + err: make(chan error, 1), + quit: make(chan error, 1), + unsubDone: make(chan struct{}, 1), + } +} + +var ErrUnsubscribed = errors.New("unsubscribed") + +func (sub *PollSubscription) Err() <-chan error { + return sub.err +} + +func (sub *PollSubscription) Unsubscribe() { + sub.errOnce.Do(func() { + select { + case sub.quit <- ErrUnsubscribed: + <-sub.unsubDone + case <-sub.unsubDone: + } + close(sub.err) + }) +} diff --git a/node/pkg/ethereum/abi/abi.go b/node/pkg/evm/connectors/ethabi/abi.go similarity index 99% rename from node/pkg/ethereum/abi/abi.go rename to node/pkg/evm/connectors/ethabi/abi.go index 4c0e49d8a..3a45d5b0c 100644 --- a/node/pkg/ethereum/abi/abi.go +++ b/node/pkg/evm/connectors/ethabi/abi.go @@ -1,7 +1,7 @@ // Code generated - DO NOT EDIT. // This file is a generated binding and any manual changes will be lost. -package abi +package ethabi import ( "math/big" diff --git a/node/pkg/evm/connectors/ethereum.go b/node/pkg/evm/connectors/ethereum.go new file mode 100644 index 000000000..4e4688c8b --- /dev/null +++ b/node/pkg/evm/connectors/ethereum.go @@ -0,0 +1,136 @@ +package connectors + +import ( + "context" + + ethRpc "github.com/ethereum/go-ethereum/rpc" + + ethAbi "github.com/certusone/wormhole/node/pkg/evm/connectors/ethabi" + ethereum "github.com/ethereum/go-ethereum" + ethBind "github.com/ethereum/go-ethereum/accounts/abi/bind" + ethCommon "github.com/ethereum/go-ethereum/common" + ethTypes "github.com/ethereum/go-ethereum/core/types" + ethClient "github.com/ethereum/go-ethereum/ethclient" + ethEvent "github.com/ethereum/go-ethereum/event" + + "go.uber.org/zap" +) + +// EthereumConnector implements EVM network query capabilities for go-ethereum based networks and networks supporting +// the standard web3 rpc. +type EthereumConnector struct { + networkName string + address ethCommon.Address + logger *zap.Logger + client *ethClient.Client + rawClient *ethRpc.Client + filterer *ethAbi.AbiFilterer + caller *ethAbi.AbiCaller +} + +func NewEthereumConnector(ctx context.Context, networkName, rawUrl string, address ethCommon.Address, logger *zap.Logger) (*EthereumConnector, error) { + rawClient, err := ethRpc.DialContext(ctx, rawUrl) + if err != nil { + return nil, err + } + + client := ethClient.NewClient(rawClient) + + filterer, err := ethAbi.NewAbiFilterer(ethCommon.BytesToAddress(address.Bytes()), client) + if err != nil { + panic(err) + } + caller, err := ethAbi.NewAbiCaller(ethCommon.BytesToAddress(address.Bytes()), client) + if err != nil { + panic(err) + } + + return &EthereumConnector{ + networkName: networkName, + address: address, + logger: logger.With(zap.String("eth_network", networkName)), + client: client, + filterer: filterer, + caller: caller, + rawClient: rawClient, + }, nil +} + +func (e *EthereumConnector) NetworkName() string { + return e.networkName +} + +func (e *EthereumConnector) ContractAddress() ethCommon.Address { + return e.address +} + +func (e *EthereumConnector) GetCurrentGuardianSetIndex(ctx context.Context) (uint32, error) { + return e.caller.GetCurrentGuardianSetIndex(ðBind.CallOpts{Context: ctx}) +} + +func (e *EthereumConnector) GetGuardianSet(ctx context.Context, index uint32) (ethAbi.StructsGuardianSet, error) { + return e.caller.GetGuardianSet(ðBind.CallOpts{Context: ctx}, index) +} + +func (e *EthereumConnector) WatchLogMessagePublished(ctx context.Context, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) { + return e.filterer.WatchLogMessagePublished(ðBind.WatchOpts{Context: ctx}, sink, nil) +} + +func (e *EthereumConnector) TransactionReceipt(ctx context.Context, txHash ethCommon.Hash) (*ethTypes.Receipt, error) { + return e.client.TransactionReceipt(ctx, txHash) +} + +func (e *EthereumConnector) TimeOfBlockByHash(ctx context.Context, hash ethCommon.Hash) (uint64, error) { + block, err := e.client.BlockByHash(ctx, hash) + if err != nil { + return 0, err + } + + return block.Time(), err +} + +func (e *EthereumConnector) ParseLogMessagePublished(log ethTypes.Log) (*ethAbi.AbiLogMessagePublished, error) { + return e.filterer.ParseLogMessagePublished(log) +} + +func (e *EthereumConnector) SubscribeForBlocks(ctx context.Context, sink chan<- *NewBlock) (ethereum.Subscription, error) { + headSink := make(chan *ethTypes.Header, 2) + headerSubscription, err := e.client.SubscribeNewHead(ctx, headSink) + if err != nil { + return nil, err + } + + // The purpose of this is to map events from the geth event channel to the new block event channel. + go func() { + for { + select { + case <-ctx.Done(): + return + case ev := <-headSink: + if ev == nil { + e.logger.Error("new header event is nil") + continue + } + if ev.Number == nil { + e.logger.Error("new header block number is nil") + continue + } + sink <- &NewBlock{ + Number: ev.Number, + Hash: ev.Hash(), + } + } + } + }() + + return headerSubscription, err +} + +func (e *EthereumConnector) RawCallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { + return e.rawClient.CallContext(ctx, result, method, args...) + +} + +func (e *EthereumConnector) Client() *ethClient.Client { + return e.client +} diff --git a/node/pkg/evm/connectors/logpoller.go b/node/pkg/evm/connectors/logpoller.go new file mode 100644 index 000000000..80c4d9114 --- /dev/null +++ b/node/pkg/evm/connectors/logpoller.go @@ -0,0 +1,152 @@ +package connectors + +import ( + "context" + "fmt" + "math/big" + "time" + + ethAbi "github.com/certusone/wormhole/node/pkg/evm/connectors/ethabi" + "github.com/certusone/wormhole/node/pkg/supervisor" + ethereum "github.com/ethereum/go-ethereum" + ethCommon "github.com/ethereum/go-ethereum/common" + ethClient "github.com/ethereum/go-ethereum/ethclient" + ethEvent "github.com/ethereum/go-ethereum/event" + + "go.uber.org/zap" +) + +// LogPollConnector pulls logs on each new block event when subscribing using WatchLogMessagePublished instead of using +// a websocket connection. It can be used in conjunction with a BlockPollConnector and Finalizer to only return +// finalized message log events. +type LogPollConnector struct { + Connector + client *ethClient.Client + messageFeed ethEvent.Feed + errFeed ethEvent.Feed + + prevBlockNum *big.Int +} + +func NewLogPollConnector(ctx context.Context, baseConnector Connector, client *ethClient.Client) (*LogPollConnector, error) { + connector := &LogPollConnector{Connector: baseConnector, client: client} + // The supervisor will keep the poller running + err := supervisor.Run(ctx, "logPoller", connector.run) + if err != nil { + return nil, err + } + return connector, nil +} + +func (l *LogPollConnector) run(ctx context.Context) error { + logger := supervisor.Logger(ctx).With(zap.String("eth_network", l.Connector.NetworkName())) + + blockChan := make(chan *NewBlock) + sub, err := l.SubscribeForBlocks(ctx, blockChan) + if err != nil { + return err + } + defer sub.Unsubscribe() + + supervisor.Signal(ctx, supervisor.SignalHealthy) + for { + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-sub.Err(): + return err + case block := <-blockChan: + if err := l.processBlock(ctx, logger, block); err != nil { + l.errFeed.Send(err.Error()) + } + } + } +} + +func (l *LogPollConnector) WatchLogMessagePublished(ctx context.Context, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) { + sub := NewPollSubscription() + messageSub := l.messageFeed.Subscribe(sink) + + // The feed library does not support error forwarding, so we're emulating that using a custom subscription and + // an error feed. + innerErrSink := make(chan string, 10) + innerErrSub := l.errFeed.Subscribe(innerErrSink) + + go func() { + for { + select { + case <-ctx.Done(): + messageSub.Unsubscribe() + innerErrSub.Unsubscribe() + return + case <-sub.quit: + messageSub.Unsubscribe() + innerErrSub.Unsubscribe() + sub.unsubDone <- struct{}{} + return + case v := <-innerErrSink: + sub.err <- fmt.Errorf(v) + } + } + }() + return sub, nil +} + +var ( + getLogsBigOne = big.NewInt(1) + logsLogMessageTopic = ethCommon.HexToHash("0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2") +) + +func (l *LogPollConnector) processBlock(ctx context.Context, logger *zap.Logger, block *NewBlock) error { + if l.prevBlockNum == nil { + l.prevBlockNum = new(big.Int).Set(block.Number) + } else { + l.prevBlockNum.Add(l.prevBlockNum, getLogsBigOne) + } + + filter := ethereum.FilterQuery{ + FromBlock: l.prevBlockNum, + ToBlock: block.Number, + Addresses: []ethCommon.Address{l.ContractAddress()}, + } + + *l.prevBlockNum = *block.Number + + tCtx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + logs, err := l.client.FilterLogs(tCtx, filter) + if err != nil { + logger.Error("GetLogsQuery: query of eth_getLogs failed", + zap.Stringer("FromBlock", filter.FromBlock), + zap.Stringer("ToBlock", filter.ToBlock), + zap.Error(err), + ) + + return fmt.Errorf("GetLogsQuery: failed to query for log messages: %w", err) + } + + if len(logs) == 0 { + return nil + } + + for _, log := range logs { + if log.Topics[0] != logsLogMessageTopic { + continue + } + ev, err := l.ParseLogMessagePublished(log) + if err != nil { + logger.Error("GetLogsQuery: failed to parse log entry", + zap.Stringer("FromBlock", filter.FromBlock), + zap.Stringer("ToBlock", filter.ToBlock), + zap.Error(err), + ) + + l.errFeed.Send(fmt.Errorf("failed to parse log message: %w", err)) + continue + } + + l.messageFeed.Send(ev) + } + + return nil +} diff --git a/node/pkg/evm/connectors/poller.go b/node/pkg/evm/connectors/poller.go new file mode 100644 index 000000000..3531af01d --- /dev/null +++ b/node/pkg/evm/connectors/poller.go @@ -0,0 +1,196 @@ +package connectors + +import ( + "context" + "fmt" + "math/big" + "time" + + "github.com/certusone/wormhole/node/pkg/supervisor" + ethEvent "github.com/ethereum/go-ethereum/event" + + ethereum "github.com/ethereum/go-ethereum" + ethCommon "github.com/ethereum/go-ethereum/common" + ethHexUtils "github.com/ethereum/go-ethereum/common/hexutil" + "go.uber.org/zap" +) + +type PollFinalizer interface { + IsBlockFinalized(ctx context.Context, block *NewBlock) (bool, error) +} + +// BlockPollConnector polls for new blocks instead of subscribing when using SubscribeForBlocks. It allows to specify a +// finalizer which will be used to only return finalized blocks on subscriptions. +type BlockPollConnector struct { + Connector + Delay time.Duration + isEthPoS bool + hasEthSwitchedToPoS bool + finalizer PollFinalizer + + blockFeed ethEvent.Feed + errFeed ethEvent.Feed +} + +func NewBlockPollConnector(ctx context.Context, baseConnector Connector, finalizer PollFinalizer, delay time.Duration, isEthPoS bool) (*BlockPollConnector, error) { + connector := &BlockPollConnector{ + Connector: baseConnector, + Delay: delay, + isEthPoS: isEthPoS, + hasEthSwitchedToPoS: false, + finalizer: finalizer, + } + err := supervisor.Run(ctx, "blockPoller", connector.run) + if err != nil { + return nil, err + } + return connector, nil +} + +func (b *BlockPollConnector) run(ctx context.Context) error { + logger := supervisor.Logger(ctx).With(zap.String("eth_network", b.Connector.NetworkName())) + + lastBlock, err := b.getBlock(ctx, logger, nil) + if err != nil { + return err + } + + timer := time.NewTimer(time.Millisecond) // Start immediately. + supervisor.Signal(ctx, supervisor.SignalHealthy) + + for { + select { + case <-ctx.Done(): + timer.Stop() + return ctx.Err() + case <-timer.C: + lastBlock, err = b.pollBlocks(ctx, logger, lastBlock) + if err != nil { + b.errFeed.Send("polling encountered an error") + } + timer.Reset(b.Delay) + } + } +} + +func (b *BlockPollConnector) pollBlocks(ctx context.Context, logger *zap.Logger, lastBlock *NewBlock) (lastPublishedBlock *NewBlock, retErr error) { + lastPublishedBlock = lastBlock + + // Fetch the latest block on the chain + // We could do this on every iteration such that if a new block is created while this function is being executed, + // it would automatically fetch new blocks but in order to reduce API load this will be done on the next iteration. + latestBlock, err := b.getBlock(ctx, logger, nil) + if err != nil { + logger.Error("failed to look up latest block", + zap.Uint64("lastSeenBlock", lastBlock.Number.Uint64()), zap.Error(err)) + return lastPublishedBlock, fmt.Errorf("failed to look up latest block: %w", err) + } + for { + if lastPublishedBlock.Number.Cmp(latestBlock.Number) >= 0 { + // We have to wait for a new block to become available + return + } + + // Try to fetch the next block between lastBlock and latestBlock + nextBlockNumber := new(big.Int).Add(lastPublishedBlock.Number, big.NewInt(1)) + block, err := b.getBlock(ctx, logger, nextBlockNumber) + if err != nil { + logger.Error("failed to fetch next block", + zap.Uint64("block", nextBlockNumber.Uint64()), zap.Error(err)) + return lastPublishedBlock, fmt.Errorf("failed to fetch next block (%d): %w", nextBlockNumber.Uint64(), err) + } + + if b.finalizer != nil { + finalized, err := b.finalizer.IsBlockFinalized(ctx, block) + if err != nil { + logger.Error("failed to check block finalization", + zap.Uint64("block", block.Number.Uint64()), zap.Error(err)) + return lastPublishedBlock, fmt.Errorf("failed to check block finalization (%d): %w", block.Number.Uint64(), err) + } + + if !finalized { + break + } + } + + b.blockFeed.Send(block) + lastPublishedBlock = block + } + + return +} + +func (b *BlockPollConnector) SetEthSwitched() { + b.hasEthSwitchedToPoS = true +} + +func (b *BlockPollConnector) SubscribeForBlocks(ctx context.Context, sink chan<- *NewBlock) (ethereum.Subscription, error) { + sub := NewPollSubscription() + blockSub := b.blockFeed.Subscribe(sink) + + // The feed library does not support error forwarding, so we're emulating that using a custom subscription and + // an error feed. The feed library can't handle interfaces which is why we post strings. + innerErrSink := make(chan string, 10) + innerErrSub := b.errFeed.Subscribe(innerErrSink) + + go func() { + for { + select { + case <-ctx.Done(): + blockSub.Unsubscribe() + innerErrSub.Unsubscribe() + return + case <-sub.quit: + blockSub.Unsubscribe() + innerErrSub.Unsubscribe() + sub.unsubDone <- struct{}{} + return + case v := <-innerErrSink: + sub.err <- fmt.Errorf(v) + } + } + }() + return sub, nil +} + +func (b *BlockPollConnector) getBlock(ctx context.Context, logger *zap.Logger, number *big.Int) (*NewBlock, error) { + var numStr string + if number != nil { + numStr = ethHexUtils.EncodeBig(number) + } else if b.hasEthSwitchedToPoS { + numStr = "finalized" + } else { + numStr = "latest" + } + + type Marshaller struct { + Number *ethHexUtils.Big + Hash ethCommon.Hash `json:"hash"` + Difficulty *ethHexUtils.Big + } + + var m Marshaller + err := b.Connector.RawCallContext(ctx, &m, "eth_getBlockByNumber", numStr, false) + if err != nil { + logger.Error("failed to get block", + zap.String("requested_block", numStr), zap.Error(err)) + return nil, err + } + if m.Number == nil { + logger.Error("failed to unmarshal block", + zap.String("requested_block", numStr), + ) + return nil, fmt.Errorf("failed to unmarshal block: Number is nil") + } + d := big.Int(*m.Difficulty) + if b.isEthPoS && !b.hasEthSwitchedToPoS && d.Cmp(big.NewInt(0)) == 0 { + logger.Info("switching from latest to finalized", zap.Duration("delay", b.Delay)) + b.SetEthSwitched() + return b.getBlock(ctx, logger, number) + } + n := big.Int(*m.Number) + return &NewBlock{ + Number: &n, + Hash: m.Hash, + }, nil +} diff --git a/node/pkg/evm/finalizers/default.go b/node/pkg/evm/finalizers/default.go new file mode 100644 index 000000000..703e4dafc --- /dev/null +++ b/node/pkg/evm/finalizers/default.go @@ -0,0 +1,19 @@ +package finalizers + +import ( + "context" + + "github.com/certusone/wormhole/node/pkg/evm/connectors" +) + +// DefaultFinalizer assumes all blocks to be finalized. +type DefaultFinalizer struct { +} + +func NewDefaultFinalizer() *DefaultFinalizer { + return &DefaultFinalizer{} +} + +func (d *DefaultFinalizer) IsBlockFinalized(ctx context.Context, block *connectors.NewBlock) (bool, error) { + return true, nil +} diff --git a/node/pkg/evm/finalizers/moonbeam.go b/node/pkg/evm/finalizers/moonbeam.go new file mode 100644 index 000000000..10fa920ae --- /dev/null +++ b/node/pkg/evm/finalizers/moonbeam.go @@ -0,0 +1,35 @@ +package finalizers + +import ( + "context" + + "github.com/certusone/wormhole/node/pkg/evm/connectors" + "go.uber.org/zap" +) + +// MoonbeamFinalizer implements the finality check for Moonbeam. +// Moonbeam can publish blocks before they are marked final. This means we need to sit on the block until a special "is finalized" +// query returns true. The assumption is that every block number will eventually be published and finalized, it's just that the contents +// of the block (and therefore the hash) might change if there is a rollback. +type MoonbeamFinalizer struct { + logger *zap.Logger + connector connectors.Connector +} + +func NewMoonbeamFinalizer(logger *zap.Logger, connector connectors.Connector) *MoonbeamFinalizer { + return &MoonbeamFinalizer{ + logger: logger, + connector: connector, + } +} + +func (m *MoonbeamFinalizer) IsBlockFinalized(ctx context.Context, block *connectors.NewBlock) (bool, error) { + var finalized bool + err := m.connector.RawCallContext(ctx, &finalized, "moon_isBlockFinalized", block.Hash.Hex()) + if err != nil { + m.logger.Error("failed to check for finality", zap.String("eth_network", m.connector.NetworkName()), zap.Error(err)) + return false, err + } + + return finalized, nil +} diff --git a/node/pkg/ethereum/utils.go b/node/pkg/evm/utils.go similarity index 95% rename from node/pkg/ethereum/utils.go rename to node/pkg/evm/utils.go index b04174206..7cb88806c 100644 --- a/node/pkg/ethereum/utils.go +++ b/node/pkg/evm/utils.go @@ -1,4 +1,4 @@ -package ethereum +package evm import ( "github.com/ethereum/go-ethereum/common" diff --git a/node/pkg/ethereum/utils_test.go b/node/pkg/evm/utils_test.go similarity index 96% rename from node/pkg/ethereum/utils_test.go rename to node/pkg/evm/utils_test.go index f38e9ed09..be57b7e24 100644 --- a/node/pkg/ethereum/utils_test.go +++ b/node/pkg/evm/utils_test.go @@ -1,4 +1,4 @@ -package ethereum +package evm import ( "testing" diff --git a/node/pkg/ethereum/watcher.go b/node/pkg/evm/watcher.go similarity index 67% rename from node/pkg/ethereum/watcher.go rename to node/pkg/evm/watcher.go index e4daca3e2..8d23e5194 100644 --- a/node/pkg/ethereum/watcher.go +++ b/node/pkg/evm/watcher.go @@ -1,4 +1,4 @@ -package ethereum +package evm import ( "context" @@ -7,6 +7,10 @@ import ( "sync/atomic" "time" + "github.com/certusone/wormhole/node/pkg/evm/connectors" + "github.com/certusone/wormhole/node/pkg/evm/connectors/ethabi" + "github.com/certusone/wormhole/node/pkg/evm/finalizers" + "github.com/certusone/wormhole/node/pkg/p2p" gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1" "github.com/ethereum/go-ethereum/rpc" @@ -17,9 +21,7 @@ import ( eth_common "github.com/ethereum/go-ethereum/common" "go.uber.org/zap" - "github.com/certusone/wormhole/node/pkg/celo" "github.com/certusone/wormhole/node/pkg/common" - "github.com/certusone/wormhole/node/pkg/ethereum/abi" "github.com/certusone/wormhole/node/pkg/readiness" "github.com/certusone/wormhole/node/pkg/supervisor" "github.com/wormhole-foundation/wormhole/sdk/vaa" @@ -63,7 +65,7 @@ type ( Watcher struct { // Ethereum RPC url url string - // Address of the Eth contract contract + // Address of the Eth contract contract eth_common.Address // Human-readable name of the Eth network, for logging and monitoring. networkName string @@ -100,8 +102,9 @@ type ( minConfirmations uint64 // Interface to the chain specific ethereum library. - ethIntf common.Ethish + ethConn connectors.Connector shouldCheckSafeMode bool + unsafeDevMode bool } pendingKey struct { @@ -129,21 +132,6 @@ func NewEthWatcher( obsvReqC chan *gossipv1.ObservationRequest, unsafeDevMode bool) *Watcher { - var ethIntf common.Ethish - if chainID == vaa.ChainIDCelo && !unsafeDevMode { - // When we are running in mainnet or testnet, we need to use the Celo ethereum library rather than go-ethereum. - // However, in devnet, we currently run the standard ETH node for Celo, so we need to use the standard go-ethereum. - ethIntf = &celo.CeloImpl{NetworkName: networkName} - } else if chainID == vaa.ChainIDEthereum && !unsafeDevMode { - ethIntf = &PollImpl{BaseEth: EthImpl{NetworkName: networkName}, DelayInMs: 250, IsEthPoS: true} - } else if chainID == vaa.ChainIDMoonbeam && !unsafeDevMode { - ethIntf = &PollImpl{BaseEth: EthImpl{NetworkName: networkName}, Finalizer: &MoonbeamFinalizer{}, DelayInMs: 250} - } else if chainID == vaa.ChainIDNeon { - ethIntf = NewGetLogsImpl(networkName, contract, 250) - } else { - ethIntf = &EthImpl{NetworkName: networkName} - } - return &Watcher{ url: url, contract: contract, @@ -155,42 +143,91 @@ func NewEthWatcher( setChan: setEvents, obsvReqC: obsvReqC, pending: map[pendingKey]*pendingMessage{}, - ethIntf: ethIntf, - shouldCheckSafeMode: (chainID == vaa.ChainIDKarura || chainID == vaa.ChainIDAcala) && (!unsafeDevMode)} + shouldCheckSafeMode: (chainID == vaa.ChainIDKarura || chainID == vaa.ChainIDAcala) && (!unsafeDevMode), + unsafeDevMode: unsafeDevMode, + } } -func (e *Watcher) Run(ctx context.Context) error { +func (w *Watcher) Run(ctx context.Context) error { logger := supervisor.Logger(ctx) - e.ethIntf.SetLogger(logger) - if e.shouldCheckSafeMode { - if err := e.checkForSafeMode(ctx); err != nil { + if w.shouldCheckSafeMode { + if err := w.checkForSafeMode(ctx); err != nil { return err } } // Initialize gossip metrics (we want to broadcast the address even if we're not yet syncing) - p2p.DefaultRegistry.SetNetworkStats(e.chainID, &gossipv1.Heartbeat_Network{ - ContractAddress: e.contract.Hex(), + p2p.DefaultRegistry.SetNetworkStats(w.chainID, &gossipv1.Heartbeat_Network{ + ContractAddress: w.contract.Hex(), }) timeout, cancel := context.WithTimeout(ctx, 15*time.Second) defer cancel() - err := e.ethIntf.DialContext(timeout, e.url) - if err != nil { - ethConnectionErrors.WithLabelValues(e.networkName, "dial_error").Inc() - p2p.DefaultRegistry.AddErrorCount(e.chainID, 1) - return fmt.Errorf("dialing eth client failed: %w", err) - } - err = e.ethIntf.NewAbiFilterer(e.contract) - if err != nil { - return fmt.Errorf("could not create wormhole contract filter: %w", err) - } - - err = e.ethIntf.NewAbiCaller(e.contract) - if err != nil { - panic(err) + var err error + if w.chainID == vaa.ChainIDCelo && !w.unsafeDevMode { + // When we are running in mainnet or testnet, we need to use the Celo ethereum library rather than go-ethereum. + // However, in devnet, we currently run the standard ETH node for Celo, so we need to use the standard go-ethereum. + w.ethConn, err = connectors.NewCeloConnector(timeout, w.networkName, w.url, w.contract, logger) + if err != nil { + ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc() + p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) + return fmt.Errorf("dialing eth client failed: %w", err) + } + } else if w.chainID == vaa.ChainIDEthereum && !w.unsafeDevMode { + baseConnector, err := connectors.NewEthereumConnector(timeout, w.networkName, w.url, w.contract, logger) + if err != nil { + ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc() + p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) + return fmt.Errorf("dialing eth client failed: %w", err) + } + w.ethConn, err = connectors.NewBlockPollConnector(ctx, baseConnector, finalizers.NewDefaultFinalizer(), 250*time.Millisecond, true) + if err != nil { + ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc() + p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) + return fmt.Errorf("creating block poll connector failed: %w", err) + } + } else if w.chainID == vaa.ChainIDMoonbeam && !w.unsafeDevMode { + baseConnector, err := connectors.NewEthereumConnector(timeout, w.networkName, w.url, w.contract, logger) + if err != nil { + ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc() + p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) + return fmt.Errorf("dialing eth client failed: %w", err) + } + finalizer := finalizers.NewMoonbeamFinalizer(logger, baseConnector) + w.ethConn, err = connectors.NewBlockPollConnector(ctx, baseConnector, finalizer, 250*time.Millisecond, false) + if err != nil { + ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc() + p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) + return fmt.Errorf("creating block poll connector failed: %w", err) + } + } else if w.chainID == vaa.ChainIDNeon { + baseConnector, err := connectors.NewEthereumConnector(timeout, w.networkName, w.url, w.contract, logger) + if err != nil { + ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc() + p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) + return fmt.Errorf("dialing eth client failed: %w", err) + } + pollConnector, err := connectors.NewBlockPollConnector(ctx, baseConnector, finalizers.NewDefaultFinalizer(), 250*time.Millisecond, false) + if err != nil { + ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc() + p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) + return fmt.Errorf("creating block poll connector failed: %w", err) + } + w.ethConn, err = connectors.NewLogPollConnector(ctx, pollConnector, baseConnector.Client()) + if err != nil { + ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc() + p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) + return fmt.Errorf("creating poll connector failed: %w", err) + } + } else { + w.ethConn, err = connectors.NewEthereumConnector(timeout, w.networkName, w.url, w.contract, logger) + if err != nil { + ethConnectionErrors.WithLabelValues(w.networkName, "dial_error").Inc() + p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) + return fmt.Errorf("dialing eth client failed: %w", err) + } } // Timeout for initializing subscriptions @@ -198,16 +235,17 @@ func (e *Watcher) Run(ctx context.Context) error { defer cancel() // Subscribe to new message publications - messageC := make(chan *abi.AbiLogMessagePublished, 2) - messageSub, err := e.ethIntf.WatchLogMessagePublished(ctx, timeout, messageC) + messageC := make(chan *ethabi.AbiLogMessagePublished, 2) + messageSub, err := w.ethConn.WatchLogMessagePublished(timeout, messageC) + defer messageSub.Unsubscribe() if err != nil { - ethConnectionErrors.WithLabelValues(e.networkName, "subscribe_error").Inc() - p2p.DefaultRegistry.AddErrorCount(e.chainID, 1) + ethConnectionErrors.WithLabelValues(w.networkName, "subscribe_error").Inc() + p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) return fmt.Errorf("failed to subscribe to message publication events: %w", err) } // Fetch initial guardian set - if err := e.fetchAndUpdateGuardianSet(logger, ctx, e.ethIntf); err != nil { + if err := w.fetchAndUpdateGuardianSet(logger, ctx, w.ethConn); err != nil { return fmt.Errorf("failed to request guardian set: %v", err) } @@ -222,7 +260,7 @@ func (e *Watcher) Run(ctx context.Context) error { case <-ctx.Done(): return case <-t.C: - if err := e.fetchAndUpdateGuardianSet(logger, ctx, e.ethIntf); err != nil { + if err := w.fetchAndUpdateGuardianSet(logger, ctx, w.ethConn); err != nil { errC <- fmt.Errorf("failed to request guardian set: %v", err) return } @@ -239,16 +277,16 @@ func (e *Watcher) Run(ctx context.Context) error { select { case <-ctx.Done(): return - case r := <-e.obsvReqC: + case r := <-w.obsvReqC: // This can't happen unless there is a programming error - the caller // is expected to send us only requests for our chainID. - if vaa.ChainID(r.ChainId) != e.chainID { + if vaa.ChainID(r.ChainId) != w.chainID { panic("invalid chain ID") } tx := eth_common.BytesToHash(r.TxHash) logger.Info("received observation request", - zap.String("eth_network", e.networkName), + zap.String("eth_network", w.networkName), zap.String("tx_hash", tx.Hex())) // SECURITY: Load the block number before requesting the transaction to avoid a @@ -261,24 +299,24 @@ func (e *Watcher) Run(ctx context.Context) error { blockNumberU := atomic.LoadUint64(¤tBlockNumber) if blockNumberU == 0 { logger.Error("no block number available, ignoring observation request", - zap.String("eth_network", e.networkName)) + zap.String("eth_network", w.networkName)) continue } timeout, cancel := context.WithTimeout(ctx, 5*time.Second) - blockNumber, msgs, err := MessageEventsForTransaction(timeout, e.ethIntf, e.contract, e.chainID, tx) + blockNumber, msgs, err := MessageEventsForTransaction(timeout, w.ethConn, w.contract, w.chainID, tx) cancel() if err != nil { logger.Error("failed to process observation request", - zap.Error(err), zap.String("eth_network", e.networkName)) + zap.Error(err), zap.String("eth_network", w.networkName)) continue } for _, msg := range msgs { expectedConfirmations := uint64(msg.ConsistencyLevel) - if expectedConfirmations < e.minConfirmations { - expectedConfirmations = e.minConfirmations + if expectedConfirmations < w.minConfirmations { + expectedConfirmations = w.minConfirmations } // SECURITY: In the recovery flow, we already know which transaction to @@ -298,9 +336,9 @@ func (e *Watcher) Run(ctx context.Context) error { zap.Uint64("sequence", msg.Sequence), zap.Uint64("current_block", blockNumberU), zap.Uint64("observed_block", blockNumber), - zap.String("eth_network", e.networkName), + zap.String("eth_network", w.networkName), ) - e.msgChan <- msg + w.msgChan <- msg } else { logger.Info("ignoring re-observed message publication transaction", zap.Stringer("tx", msg.TxHash), @@ -309,7 +347,7 @@ func (e *Watcher) Run(ctx context.Context) error { zap.Uint64("current_block", blockNumberU), zap.Uint64("observed_block", blockNumber), zap.Uint64("expected_confirmations", expectedConfirmations), - zap.String("eth_network", e.networkName), + zap.String("eth_network", w.networkName), ) } } @@ -323,21 +361,21 @@ func (e *Watcher) Run(ctx context.Context) error { case <-ctx.Done(): return case err := <-messageSub.Err(): - ethConnectionErrors.WithLabelValues(e.networkName, "subscription_error").Inc() + ethConnectionErrors.WithLabelValues(w.networkName, "subscription_error").Inc() errC <- fmt.Errorf("error while processing message publication subscription: %w", err) - p2p.DefaultRegistry.AddErrorCount(e.chainID, 1) + p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) return case ev := <-messageC: // Request timestamp for block msm := time.Now() timeout, cancel := context.WithTimeout(ctx, 15*time.Second) - blockTime, err := e.ethIntf.TimeOfBlockByHash(timeout, ev.Raw.BlockHash) + blockTime, err := w.ethConn.TimeOfBlockByHash(timeout, ev.Raw.BlockHash) cancel() - queryLatency.WithLabelValues(e.networkName, "block_by_number").Observe(time.Since(msm).Seconds()) + queryLatency.WithLabelValues(w.networkName, "block_by_number").Observe(time.Since(msm).Seconds()) if err != nil { - ethConnectionErrors.WithLabelValues(e.networkName, "block_by_number_error").Inc() - p2p.DefaultRegistry.AddErrorCount(e.chainID, 1) + ethConnectionErrors.WithLabelValues(w.networkName, "block_by_number_error").Inc() + p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) errC <- fmt.Errorf("failed to request timestamp for block %d, hash %s: %w", ev.Raw.BlockNumber, ev.Raw.BlockHash.String(), err) return @@ -348,7 +386,7 @@ func (e *Watcher) Run(ctx context.Context) error { Timestamp: time.Unix(int64(blockTime), 0), Nonce: ev.Nonce, Sequence: ev.Sequence, - EmitterChain: e.chainID, + EmitterChain: w.chainID, EmitterAddress: PadAddress(ev.Sender), Payload: ev.Payload, ConsistencyLevel: ev.ConsistencyLevel, @@ -361,9 +399,9 @@ func (e *Watcher) Run(ctx context.Context) error { zap.Uint64("Sequence", ev.Sequence), zap.Uint32("Nonce", ev.Nonce), zap.Uint8("ConsistencyLevel", ev.ConsistencyLevel), - zap.String("eth_network", e.networkName)) + zap.String("eth_network", w.networkName)) - ethMessagesObserved.WithLabelValues(e.networkName).Inc() + ethMessagesObserved.WithLabelValues(w.networkName).Inc() key := pendingKey{ TxHash: message.TxHash, @@ -372,22 +410,22 @@ func (e *Watcher) Run(ctx context.Context) error { Sequence: message.Sequence, } - e.pendingMu.Lock() - e.pending[key] = &pendingMessage{ + w.pendingMu.Lock() + w.pending[key] = &pendingMessage{ message: message, height: ev.Raw.BlockNumber, } - e.pendingMu.Unlock() + w.pendingMu.Unlock() } } }() // Watch headers - headSink := make(chan *common.NewBlock, 2) - headerSubscription, err := e.ethIntf.SubscribeForBlocks(ctx, headSink) + headSink := make(chan *connectors.NewBlock, 2) + headerSubscription, err := w.ethConn.SubscribeForBlocks(ctx, headSink) if err != nil { - ethConnectionErrors.WithLabelValues(e.networkName, "header_subscribe_error").Inc() - p2p.DefaultRegistry.AddErrorCount(e.chainID, 1) + ethConnectionErrors.WithLabelValues(w.networkName, "header_subscribe_error").Inc() + p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) return fmt.Errorf("failed to subscribe to header events: %w", err) } @@ -397,18 +435,18 @@ func (e *Watcher) Run(ctx context.Context) error { case <-ctx.Done(): return case err := <-headerSubscription.Err(): - ethConnectionErrors.WithLabelValues(e.networkName, "header_subscription_error").Inc() + ethConnectionErrors.WithLabelValues(w.networkName, "header_subscription_error").Inc() errC <- fmt.Errorf("error while processing header subscription: %w", err) - p2p.DefaultRegistry.AddErrorCount(e.chainID, 1) + p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) return case ev := <-headSink: // These two pointers should have been checked before the event was placed on the channel, but just being safe. if ev == nil { - logger.Error("new header event is nil", zap.String("eth_network", e.networkName)) + logger.Error("new header event is nil", zap.String("eth_network", w.networkName)) continue } if ev.Number == nil { - logger.Error("new header block number is nil", zap.String("eth_network", e.networkName)) + logger.Error("new header block number is nil", zap.String("eth_network", w.networkName)) continue } @@ -417,23 +455,23 @@ func (e *Watcher) Run(ctx context.Context) error { logger.Info("processing new header", zap.Stringer("current_block", ev.Number), zap.Stringer("current_blockhash", currentHash), - zap.String("eth_network", e.networkName)) - currentEthHeight.WithLabelValues(e.networkName).Set(float64(ev.Number.Int64())) - readiness.SetReady(e.readiness) - p2p.DefaultRegistry.SetNetworkStats(e.chainID, &gossipv1.Heartbeat_Network{ + zap.String("eth_network", w.networkName)) + currentEthHeight.WithLabelValues(w.networkName).Set(float64(ev.Number.Int64())) + readiness.SetReady(w.readiness) + p2p.DefaultRegistry.SetNetworkStats(w.chainID, &gossipv1.Heartbeat_Network{ Height: ev.Number.Int64(), - ContractAddress: e.contract.Hex(), + ContractAddress: w.contract.Hex(), }) - e.pendingMu.Lock() + w.pendingMu.Lock() blockNumberU := ev.Number.Uint64() atomic.StoreUint64(¤tBlockNumber, blockNumberU) - for key, pLock := range e.pending { + for key, pLock := range w.pending { expectedConfirmations := uint64(pLock.message.ConsistencyLevel) - if expectedConfirmations < e.minConfirmations { - expectedConfirmations = e.minConfirmations + if expectedConfirmations < w.minConfirmations { + expectedConfirmations = w.minConfirmations } // Transaction was dropped and never picked up again @@ -445,17 +483,17 @@ func (e *Watcher) Run(ctx context.Context) error { zap.Uint64("sequence", key.Sequence), zap.Stringer("current_block", ev.Number), zap.Stringer("current_blockhash", currentHash), - zap.String("eth_network", e.networkName), + zap.String("eth_network", w.networkName), ) - ethMessagesOrphaned.WithLabelValues(e.networkName, "timeout").Inc() - delete(e.pending, key) + ethMessagesOrphaned.WithLabelValues(w.networkName, "timeout").Inc() + delete(w.pending, key) continue } // Transaction is now ready if pLock.height+uint64(expectedConfirmations) <= blockNumberU { timeout, cancel := context.WithTimeout(ctx, 5*time.Second) - tx, err := e.ethIntf.TransactionReceipt(timeout, pLock.message.TxHash) + tx, err := w.ethConn.TransactionReceipt(timeout, pLock.message.TxHash) cancel() // If the node returns an error after waiting expectedConfirmation blocks, @@ -474,10 +512,10 @@ func (e *Watcher) Run(ctx context.Context) error { zap.Uint64("sequence", key.Sequence), zap.Stringer("current_block", ev.Number), zap.Stringer("current_blockhash", currentHash), - zap.String("eth_network", e.networkName), + zap.String("eth_network", w.networkName), zap.Error(err)) - delete(e.pending, key) - ethMessagesOrphaned.WithLabelValues(e.networkName, "not_found").Inc() + delete(w.pending, key) + ethMessagesOrphaned.WithLabelValues(w.networkName, "not_found").Inc() continue } @@ -492,10 +530,10 @@ func (e *Watcher) Run(ctx context.Context) error { zap.Uint64("sequence", key.Sequence), zap.Stringer("current_block", ev.Number), zap.Stringer("current_blockhash", currentHash), - zap.String("eth_network", e.networkName), + zap.String("eth_network", w.networkName), zap.Error(err)) - delete(e.pending, key) - ethMessagesOrphaned.WithLabelValues(e.networkName, "tx_failed").Inc() + delete(w.pending, key) + ethMessagesOrphaned.WithLabelValues(w.networkName, "tx_failed").Inc() continue } @@ -508,7 +546,7 @@ func (e *Watcher) Run(ctx context.Context) error { zap.Uint64("sequence", key.Sequence), zap.Stringer("current_block", ev.Number), zap.Stringer("current_blockhash", currentHash), - zap.String("eth_network", e.networkName), + zap.String("eth_network", w.networkName), zap.Error(err)) continue } @@ -524,9 +562,9 @@ func (e *Watcher) Run(ctx context.Context) error { zap.Uint64("sequence", key.Sequence), zap.Stringer("current_block", ev.Number), zap.Stringer("current_blockhash", currentHash), - zap.String("eth_network", e.networkName)) - delete(e.pending, key) - ethMessagesOrphaned.WithLabelValues(e.networkName, "blockhash_mismatch").Inc() + zap.String("eth_network", w.networkName)) + delete(w.pending, key) + ethMessagesOrphaned.WithLabelValues(w.networkName, "blockhash_mismatch").Inc() continue } @@ -537,19 +575,19 @@ func (e *Watcher) Run(ctx context.Context) error { zap.Uint64("sequence", key.Sequence), zap.Stringer("current_block", ev.Number), zap.Stringer("current_blockhash", currentHash), - zap.String("eth_network", e.networkName)) - delete(e.pending, key) - e.msgChan <- pLock.message - ethMessagesConfirmed.WithLabelValues(e.networkName).Inc() + zap.String("eth_network", w.networkName)) + delete(w.pending, key) + w.msgChan <- pLock.message + ethMessagesConfirmed.WithLabelValues(w.networkName).Inc() } } - e.pendingMu.Unlock() + w.pendingMu.Unlock() logger.Info("processed new header", zap.Stringer("current_block", ev.Number), zap.Stringer("current_blockhash", currentHash), zap.Duration("took", time.Since(start)), - zap.String("eth_network", e.networkName)) + zap.String("eth_network", w.networkName)) } } }() @@ -562,36 +600,36 @@ func (e *Watcher) Run(ctx context.Context) error { } } -func (e *Watcher) fetchAndUpdateGuardianSet( +func (w *Watcher) fetchAndUpdateGuardianSet( logger *zap.Logger, ctx context.Context, - ethIntf common.Ethish, + ethConn connectors.Connector, ) error { msm := time.Now() logger.Info("fetching guardian set") timeout, cancel := context.WithTimeout(ctx, 15*time.Second) defer cancel() - idx, gs, err := fetchCurrentGuardianSet(timeout, ethIntf) + idx, gs, err := fetchCurrentGuardianSet(timeout, ethConn) if err != nil { - ethConnectionErrors.WithLabelValues(e.networkName, "guardian_set_fetch_error").Inc() - p2p.DefaultRegistry.AddErrorCount(e.chainID, 1) + ethConnectionErrors.WithLabelValues(w.networkName, "guardian_set_fetch_error").Inc() + p2p.DefaultRegistry.AddErrorCount(w.chainID, 1) return err } - queryLatency.WithLabelValues(e.networkName, "get_guardian_set").Observe(time.Since(msm).Seconds()) + queryLatency.WithLabelValues(w.networkName, "get_guardian_set").Observe(time.Since(msm).Seconds()) - if e.currentGuardianSet != nil && *(e.currentGuardianSet) == idx { + if w.currentGuardianSet != nil && *(w.currentGuardianSet) == idx { return nil } logger.Info("updated guardian set found", zap.Any("value", gs), zap.Uint32("index", idx), - zap.String("eth_network", e.networkName)) + zap.String("eth_network", w.networkName)) - e.currentGuardianSet = &idx + w.currentGuardianSet = &idx - if e.setChan != nil { - e.setChan <- &common.GuardianSet{ + if w.setChan != nil { + w.setChan <- &common.GuardianSet{ Keys: gs.Keys, Index: idx, } @@ -601,13 +639,13 @@ func (e *Watcher) fetchAndUpdateGuardianSet( } // Fetch the current guardian set ID and guardian set from the chain. -func fetchCurrentGuardianSet(ctx context.Context, ethIntf common.Ethish) (uint32, *abi.StructsGuardianSet, error) { - currentIndex, err := ethIntf.GetCurrentGuardianSetIndex(ctx) +func fetchCurrentGuardianSet(ctx context.Context, ethConn connectors.Connector) (uint32, *ethabi.StructsGuardianSet, error) { + currentIndex, err := ethConn.GetCurrentGuardianSetIndex(ctx) if err != nil { return 0, nil, fmt.Errorf("error requesting current guardian set index: %w", err) } - gs, err := ethIntf.GetGuardianSet(ctx, currentIndex) + gs, err := ethConn.GetGuardianSet(ctx, currentIndex) if err != nil { return 0, nil, fmt.Errorf("error requesting current guardian set value: %w", err) } @@ -615,23 +653,23 @@ func fetchCurrentGuardianSet(ctx context.Context, ethIntf common.Ethish) (uint32 return currentIndex, &gs, nil } -func (e *Watcher) checkForSafeMode(ctx context.Context) error { +func (w *Watcher) checkForSafeMode(ctx context.Context) error { timeout, cancel := context.WithTimeout(ctx, 15*time.Second) defer cancel() - c, err := rpc.DialContext(timeout, e.url) + c, err := rpc.DialContext(timeout, w.url) if err != nil { - return fmt.Errorf("failed to connect to url %s to check for safe mode: %w", e.url, err) + return fmt.Errorf("failed to connect to url %s to check for safe mode: %w", w.url, err) } var safe bool err = c.CallContext(ctx, &safe, "net_isSafeMode") if err != nil { - return fmt.Errorf("check for safe mode for url %s failed: %w", e.url, err) + return fmt.Errorf("check for safe mode for url %s failed: %w", w.url, err) } if !safe { - return fmt.Errorf("url %s is not using safe mode", e.url) + return fmt.Errorf("url %s is not using safe mode", w.url) } return nil