Add configurable block indexers

This commit is contained in:
Kirill Fedoseev 2021-06-08 14:13:34 +03:00
parent 4986ed7cdb
commit 620cec8d4e
6 changed files with 44 additions and 26 deletions

View File

@ -4,11 +4,13 @@ chains:
host: https://mainnet.infura.io/v3/${INFURA_PROJECT_KEY}
timeout: 10000
block_time: 15000
block_index_interval: 10000
xdai:
rpc:
host: https://dai.poa.network
timeout: 10000
block_time: 5000
block_index_interval: 5000
bridges:
xdai-amb:
home:

View File

@ -13,8 +13,9 @@ type RPCConfig struct {
}
type ChainConfig struct {
RPC *RPCConfig `yaml:"rpc"`
BlockTime uint64 `yaml:"block_time"`
RPC *RPCConfig `yaml:"rpc"`
BlockTime uint64 `yaml:"block_time"`
BlockIndexInterval uint64 `yaml:"block_index_interval"`
}
type BridgeSideConfig struct {
@ -80,9 +81,9 @@ func processConfig(cfg *Config) {
}
}
func ReadConfig() Config {
func ReadConfig() *Config {
var cfg Config
readYamlConfig(&cfg)
processConfig(&cfg)
return cfg
return &cfg
}

View File

@ -11,6 +11,7 @@ import (
type Client struct {
timeout time.Duration
chainId string
*ethclient.Client
}
@ -19,17 +20,24 @@ func NewClient(url string, timeout int64) (*Client, error) {
if err != nil {
return nil, err
}
return &Client{time.Millisecond * time.Duration(timeout), rawClient}, nil
return &Client{timeout: time.Millisecond * time.Duration(timeout), Client: rawClient}, nil
}
func (c *Client) GetCtx() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), c.timeout)
}
func (c *Client) ChainID() (*big.Int, error) {
ctx, cancel := c.GetCtx()
defer cancel()
return c.Client.ChainID(ctx)
func (c *Client) ChainID() (string, error) {
if len(c.chainId) == 0 {
ctx, cancel := c.GetCtx()
defer cancel()
chainId, err := c.Client.ChainID(ctx)
if err != nil {
return "", err
}
c.chainId = chainId.String()
}
return c.chainId, nil
}
func (c *Client) BlockNumber() (uint64, error) {

10
main.go
View File

@ -7,6 +7,7 @@ import (
"amb-monitor/logging"
"amb-monitor/monitor"
"context"
"fmt"
"os"
"os/signal"
)
@ -34,8 +35,8 @@ func main() {
if err != nil {
logger.Fatal(err)
}
clients[state.Home.ChainId] = state.Home.Client
clients[state.Foreign.ChainId] = state.Foreign.Client
clients[bridge.Home.ChainName] = state.Home.Client
clients[bridge.Foreign.ChainName] = state.Foreign.Client
logger.Printf("starting monitor for bridge %s\n", bridgeName)
go state.Home.StartBlockWatcher(context.Background())
@ -51,7 +52,10 @@ func main() {
go state.StartReindexer(context.Background(), conn)
}
go monitor.StartBlockIndexer(context.Background(), conn, clients)
for chainId, client := range clients {
fmt.Println(chainId, cfg.Chains)
go monitor.StartBlockIndexer(context.Background(), conn, chainId, client, cfg.Chains[chainId])
}
c := make(chan os.Signal)
signal.Notify(c, os.Interrupt)

View File

@ -1,6 +1,7 @@
package monitor
import (
"amb-monitor/config"
"amb-monitor/db"
"amb-monitor/ethclient"
"amb-monitor/logging"
@ -181,10 +182,14 @@ func (state *BridgeMonitor) StartReindexer(ctx context.Context, conn *db.DB) {
}
}
func StartBlockIndexer(ctx context.Context, conn *db.DB, clients map[string]*ethclient.Client) {
logger := logging.GetLogger("block-indexer")
func StartBlockIndexer(ctx context.Context, conn *db.DB, chainName string, client *ethclient.Client, cfg *config.ChainConfig) {
logger := logging.GetLogger("block-indexer-" + chainName)
t := time.NewTicker(5 * time.Second)
t := time.NewTicker(time.Duration(cfg.BlockIndexInterval) * time.Millisecond)
chainId, err := client.ChainID()
if err != nil {
panic(err)
}
for {
select {
@ -193,7 +198,11 @@ func StartBlockIndexer(ctx context.Context, conn *db.DB, clients map[string]*eth
return
case <-t.C:
logger.Println("searching for a batch of blocks without timestamps")
rows, err := conn.Query(ctx, "SELECT id, chain_id::text, block_number FROM block WHERE timestamp IS NULL LIMIT 100")
rows, err := conn.Query(
ctx,
"SELECT id, block_number FROM block WHERE chain_id = $1 AND timestamp IS NULL LIMIT 100",
chainId,
)
if err != nil {
logger.Println(err)
rows.Close()
@ -203,22 +212,16 @@ func StartBlockIndexer(ctx context.Context, conn *db.DB, clients map[string]*eth
var wg sync.WaitGroup
var batch pgx.Batch
var id, blockNumber uint64
var chainId string
for rows.Next() {
err := rows.Scan(&id, &chainId, &blockNumber)
err := rows.Scan(&id, &blockNumber)
if err != nil {
logger.Println(err)
continue
}
client, ok := clients[chainId]
if !ok {
panic("required eth client does not exist")
}
wg.Add(1)
go func(id uint64, blockNumber uint64, chainId string) {
go func(id uint64, blockNumber uint64) {
defer wg.Done()
header, err := client.HeaderByNumber(blockNumber)
@ -227,7 +230,7 @@ func StartBlockIndexer(ctx context.Context, conn *db.DB, clients map[string]*eth
} else {
batch.Queue("UPDATE block SET timestamp=$2 WHERE id=$1", id, time.Unix(int64(header.Time), 0))
}
}(id, blockNumber, chainId)
}(id, blockNumber)
}
if rows.Err() != nil {
logger.Println(rows.Err())

View File

@ -69,7 +69,7 @@ func NewSideState(cfg *config.BridgeSideConfig, conn *db.DB) (*BridgeSideMonitor
}
if n, ok := res[0].(uint64); ok {
state := &BridgeSideMonitor{
ChainId: chainId.String(),
ChainId: chainId,
RequiredBlockConfirmations: n,
StartBlock: cfg.StartBlock,
CurrentBlock: cfg.StartBlock,