Safe logs request for BSC

This commit is contained in:
Kirill Fedoseev 2021-12-04 12:21:52 +03:00
parent a9fb03713f
commit e3744b3d43
5 changed files with 101 additions and 16 deletions

View File

@ -9,12 +9,13 @@ chains:
block_index_interval: 60s
bsc:
rpc:
host: https://bsc-dataseed1.defibit.io/
host: https://bsc-dataseed4.ninicoin.io/
timeout: 20s
rps: 10
chain_id: 56
block_time: 3s
block_index_interval: 60s
safe_logs_request: true
kovan:
rpc:
host: https://kovan.poa.network

View File

@ -1,8 +1,8 @@
package config
import (
"bytes"
"fmt"
"io/ioutil"
"os"
"time"
@ -22,6 +22,7 @@ type ChainConfig struct {
ChainID string `yaml:"chain_id"`
BlockTime time.Duration `yaml:"block_time"`
BlockIndexInterval time.Duration `yaml:"block_index_interval"`
SafeLogsRequest bool `yaml:"safe_logs_request"`
}
type BridgeSideConfig struct {
@ -69,13 +70,15 @@ type Config struct {
}
func readYamlConfig(cfg *Config) error {
f, err := ioutil.ReadFile("config.yml")
f, err := os.ReadFile("config.yml")
if err != nil {
return fmt.Errorf("can't access config file: %w", err)
}
f = []byte(os.ExpandEnv(string(f)))
err = yaml.Unmarshal(f, cfg)
dec := yaml.NewDecoder(bytes.NewReader(f))
dec.KnownFields(true)
err = dec.Decode(cfg)
if err != nil {
return fmt.Errorf("can't parse yaml config: %w", err)
}

View File

@ -8,16 +8,18 @@ import (
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
)
type Client struct {
ChainID string
url string
timeout time.Duration
client *ethclient.Client
ChainID string
url string
timeout time.Duration
rawClient *rpc.Client
client *ethclient.Client
}
func NewClient(url string, timeout time.Duration, chainID string) (*Client, error) {
@ -29,10 +31,11 @@ func NewClient(url string, timeout time.Duration, chainID string) (*Client, erro
return nil, fmt.Errorf("can't dial JSON rpc url: %w", err)
}
client := &Client{
ChainID: chainID,
url: url,
timeout: timeout,
client: ethclient.NewClient(rawClient),
ChainID: chainID,
url: url,
timeout: timeout,
rawClient: rawClient,
client: ethclient.NewClient(rawClient),
}
ctx2, cancel2 := context.WithTimeout(context.Background(), timeout)
defer cancel2()
@ -76,6 +79,52 @@ func (c *Client) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]type
return logs, err
}
// FilterLogsSafe is the same as FilterLogs, but makes an additional eth_blockNumber
// request to ensure that the node behind RPC is synced to the needed point.
func (c *Client) FilterLogsSafe(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) {
defer ObserveDuration(c.ChainID, c.url, "eth_getLogsSafe")()
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
var err error
defer func() {
ObserveError(c.ChainID, c.url, "eth_getLogsSafe", err)
}()
var arg interface{}
arg, err = toFilterArg(q)
if err != nil {
return nil, fmt.Errorf("can't encode filter argument: %w", err)
}
var logs []types.Log
var blockNumber hexutil.Uint64
batches := []rpc.BatchElem{
{
Method: "eth_getLogs",
Args: []interface{}{arg},
Result: &logs,
},
{
Method: "eth_blockNumber",
Result: &blockNumber,
},
}
err = c.rawClient.BatchCallContext(ctx, batches)
if err != nil {
return nil, fmt.Errorf("can't make batch request: %w", err)
}
if err = batches[0].Error; err != nil {
return nil, fmt.Errorf("can't request logs: %w", err)
}
if err = batches[1].Error; err != nil {
return nil, fmt.Errorf("can't request block number: %w", err)
}
if uint64(blockNumber) < q.ToBlock.Uint64() {
return nil, fmt.Errorf("node is not synced, current block %d is older than toBlock %d in the query", blockNumber, q.ToBlock.Uint64())
}
return logs, nil
}
func (c *Client) TransactionByHash(ctx context.Context, txHash common.Hash) (*types.Transaction, error) {
defer ObserveDuration(c.ChainID, c.url, "eth_getTransactionByHash")()
ctx, cancel := context.WithTimeout(ctx, c.timeout)
@ -85,3 +134,24 @@ func (c *Client) TransactionByHash(ctx context.Context, txHash common.Hash) (*ty
ObserveError(c.ChainID, c.url, "eth_getTransactionByHash", err)
return tx, err
}
func toFilterArg(q ethereum.FilterQuery) (interface{}, error) {
arg := map[string]interface{}{
"address": q.Addresses,
"topics": q.Topics,
}
if q.BlockHash != nil {
return nil, fmt.Errorf("logs query from BlockHash is not supported")
} else {
if q.FromBlock == nil {
arg["fromBlock"] = "0x0"
} else {
arg["fromBlock"] = hexutil.EncodeBig(q.FromBlock)
}
if q.ToBlock == nil || q.ToBlock.Int64() <= 0 {
return nil, fmt.Errorf("only positive toBlock is supported")
}
arg["toBlock"] = hexutil.EncodeBig(q.ToBlock)
}
return arg, nil
}

View File

@ -66,9 +66,10 @@ func main() {
cfg.Bridges = newBridgeCfg
}
for _, bridgeCfg := range cfg.Bridges {
m, err2 := monitor.NewMonitor(ctx, logger.WithField("bridge_id", bridgeCfg.ID), dbConn, repo, bridgeCfg)
bridgeLogger := logger.WithField("bridge_id", bridgeCfg.ID)
m, err2 := monitor.NewMonitor(ctx, bridgeLogger, dbConn, repo, bridgeCfg)
if err2 != nil {
logger.WithError(err2).Fatal("can't initialize bridge monitor")
bridgeLogger.WithError(err2).Fatal("can't initialize bridge monitor")
}
monitors = append(monitors, m)

View File

@ -274,6 +274,9 @@ func (m *ContractMonitor) StartLogsFetcher(ctx context.Context) {
"from_block": blocksRange.From,
"to_block": blocksRange.To,
}).Error("failed logs fetching, retrying")
if utils.ContextSleep(ctx, 10*time.Second) == nil {
return
}
continue
}
break
@ -283,11 +286,18 @@ func (m *ContractMonitor) StartLogsFetcher(ctx context.Context) {
}
func (m *ContractMonitor) tryToFetchLogs(ctx context.Context, blocksRange *BlocksRange) error {
logs, err := m.client.FilterLogs(ctx, ethereum.FilterQuery{
q := ethereum.FilterQuery{
FromBlock: big.NewInt(int64(blocksRange.From)),
ToBlock: big.NewInt(int64(blocksRange.To)),
Addresses: []common.Address{m.cfg.Address},
})
}
var logs []types.Log
var err error
if m.cfg.Chain.SafeLogsRequest {
logs, err = m.client.FilterLogsSafe(ctx, q)
} else {
logs, err = m.client.FilterLogs(ctx, q)
}
if err != nil {
return err
}