wormhole/node/pkg/ethereum/getlogsimpl.go

197 lines
5.6 KiB
Go

// This implements polling for log events.
// It works by using the finalizer in the polling implementation to check for log events on each new block.
package ethereum
import (
"context"
"errors"
"fmt"
"math/big"
"sync"
"time"
ethereum "github.com/ethereum/go-ethereum"
ethCommon "github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"
ethClient "github.com/ethereum/go-ethereum/ethclient"
ethEvent "github.com/ethereum/go-ethereum/event"
common "github.com/certusone/wormhole/node/pkg/common"
ethAbi "github.com/certusone/wormhole/node/pkg/ethereum/abi"
"go.uber.org/zap"
)
type GetLogsImpl struct {
BasePoller *PollImpl
Query *GetLogsQuery
logger *zap.Logger
}
func NewGetLogsImpl(networkName string, contract ethCommon.Address, delayInMs int) *GetLogsImpl {
query := &GetLogsQuery{ContractAddress: contract}
return &GetLogsImpl{BasePoller: &PollImpl{BaseEth: EthImpl{NetworkName: networkName}, Finalizer: query, DelayInMs: delayInMs}, Query: query}
}
func (e *GetLogsImpl) SetLogger(l *zap.Logger) {
e.logger = l
e.logger.Info("using eth_getLogs api to retreive log events", zap.String("eth_network", e.BasePoller.BaseEth.NetworkName))
e.BasePoller.SetLogger(l)
}
func (e *GetLogsImpl) DialContext(ctx context.Context, rawurl string) (err error) {
e.Query.poller = e.BasePoller
return e.BasePoller.DialContext(ctx, rawurl)
}
func (e *GetLogsImpl) NewAbiFilterer(address ethCommon.Address) (err error) {
return e.BasePoller.NewAbiFilterer(address)
}
func (e *GetLogsImpl) NewAbiCaller(address ethCommon.Address) (err error) {
return e.BasePoller.NewAbiCaller(address)
}
func (e *GetLogsImpl) GetCurrentGuardianSetIndex(ctx context.Context) (uint32, error) {
return e.BasePoller.GetCurrentGuardianSetIndex(ctx)
}
func (e *GetLogsImpl) GetGuardianSet(ctx context.Context, index uint32) (ethAbi.StructsGuardianSet, error) {
return e.BasePoller.GetGuardianSet(ctx, index)
}
type GetLogsPollSubscription struct {
errOnce sync.Once
err chan error
quit chan error
unsubDone chan struct{}
}
var ErrUnsubscribedForGetLogs = errors.New("unsubscribed")
func (sub *GetLogsPollSubscription) Err() <-chan error {
return sub.err
}
func (sub *GetLogsPollSubscription) Unsubscribe() {
sub.errOnce.Do(func() {
select {
case sub.quit <- ErrUnsubscribedForGetLogs:
<-sub.unsubDone
case <-sub.unsubDone:
}
close(sub.err)
})
}
func (e *GetLogsImpl) WatchLogMessagePublished(ctx, timeout context.Context, sink chan<- *ethAbi.AbiLogMessagePublished) (ethEvent.Subscription, error) {
e.Query.sink = sink
e.Query.sub = &GetLogsPollSubscription{
err: make(chan error, 1),
}
return e.Query.sub, nil
}
func (e *GetLogsImpl) TransactionReceipt(ctx context.Context, txHash ethCommon.Hash) (*ethTypes.Receipt, error) {
return e.BasePoller.TransactionReceipt(ctx, txHash)
}
func (e *GetLogsImpl) TimeOfBlockByHash(ctx context.Context, hash ethCommon.Hash) (uint64, error) {
return e.BasePoller.TimeOfBlockByHash(ctx, hash)
}
func (e *GetLogsImpl) ParseLogMessagePublished(log ethTypes.Log) (*ethAbi.AbiLogMessagePublished, error) {
return e.BasePoller.ParseLogMessagePublished(log)
}
func (e *GetLogsImpl) SubscribeForBlocks(ctx context.Context, sink chan<- *common.NewBlock) (ethereum.Subscription, error) {
return e.BasePoller.SubscribeForBlocks(ctx, sink)
}
type GetLogsQuery struct {
logger *zap.Logger
networkName string
ContractAddress ethCommon.Address
prevBlockNum *big.Int
client *ethClient.Client
poller *PollImpl
sink chan<- *ethAbi.AbiLogMessagePublished
sub *GetLogsPollSubscription
}
func (f *GetLogsQuery) SetLogger(l *zap.Logger, netName string) {
f.logger = l
f.networkName = netName
}
func (f *GetLogsQuery) DialContext(ctx context.Context, rawurl string) (err error) {
timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
f.client, err = ethClient.DialContext(timeout, rawurl)
return err
}
var (
getLogsBigOne = big.NewInt(1)
logsLogMessageTopic = ethCommon.HexToHash("0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2")
)
// This doesn't actually check finality, instead it queries for new log events.
func (f *GetLogsQuery) IsBlockFinalized(ctx context.Context, block *common.NewBlock) (bool, error) {
if f.prevBlockNum == nil {
f.prevBlockNum = new(big.Int).Set(block.Number)
} else {
f.prevBlockNum.Add(f.prevBlockNum, getLogsBigOne)
}
filter := ethereum.FilterQuery{
FromBlock: f.prevBlockNum,
ToBlock: block.Number,
Addresses: []ethCommon.Address{f.ContractAddress},
}
*f.prevBlockNum = *block.Number
logs, err := f.client.FilterLogs(ctx, filter)
if err != nil {
f.logger.Error("GetLogsQuery: query of eth_getLogs failed",
zap.String("eth_network", f.networkName),
zap.Stringer("FromBlock", filter.FromBlock),
zap.Stringer("ToBlock", filter.ToBlock),
zap.Error(err),
)
f.sub.err <- fmt.Errorf("GetLogsQuery: failed to query for log messages: %w", err)
return true, nil // We still return true here, because we don't want this error flagged against the poller.
}
if len(logs) == 0 {
return true, nil
}
for _, log := range logs {
if log.Topics[0] == logsLogMessageTopic {
ev, err := f.poller.ParseLogMessagePublished(log)
if err != nil {
f.logger.Error("GetLogsQuery: failed to parse log entry",
zap.String("eth_network", f.networkName),
zap.Stringer("FromBlock", filter.FromBlock),
zap.Stringer("ToBlock", filter.ToBlock),
zap.Error(err),
)
f.sub.err <- fmt.Errorf("failed to parse log message: %w", err)
continue
}
f.sink <- ev
}
}
return true, nil
}