Add backfiller command to process a chain (#364)

* Add backfiller command to process a chain

* Add deployment for solana backfiller
This commit is contained in:
ftocal 2023-05-31 10:29:47 -03:00 committed by GitHub
parent c510df7948
commit 06d6950ad3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 897 additions and 475 deletions

View File

@ -0,0 +1,29 @@
# contract-watcher
This component is in charge of obtaining the redeemed vaa's to obtain information from the target chain. .
## Usage
### Service
```bash
contract-watcher service
```
### Backfiller
```bash
contract-watcher backfiller [flags]
```
#### Command-line arguments
- **--chain-name** *string* chain name
- **--chain-url** *string* chain URL
- **--from** *uint* first block to be processed
- **--log-level** *string* log level (default "INFO")
- **--mongo-database** *string* mongo database
- **--mongo-uri** *string* mongo connection
- **--network** *string* network (mainnet or testnet)
- **--page-size** *int* maximum number to process at one time (default 100)
- **--persist-blocks** persist processed blocks in storage
- **--rate-limit** *int* rate limit per second (default 3)
- **--to** *uint* last block to be processed (included)

View File

@ -0,0 +1,84 @@
package builder
import (
"time"
solana_go "github.com/gagliardetto/solana-go"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/config"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/ankr"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/aptos"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/evm"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/solana"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/terra"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/storage"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/watcher"
"go.uber.org/ratelimit"
"go.uber.org/zap"
)
func CreateEVMWatcher(rateLimit int, chainURL string, wb config.WatcherBlockchain, repo *storage.Repository,
logger *zap.Logger) watcher.ContractWatcher {
evmLimiter := ratelimit.New(rateLimit, ratelimit.Per(time.Second))
ankrClient := ankr.NewAnkrSDK(chainURL, evmLimiter)
params := watcher.EVMParams{ChainID: wb.ChainID, Blockchain: wb.Name, ContractAddress: wb.Address,
SizeBlocks: wb.SizeBlocks, WaitSeconds: wb.WaitSeconds, InitialBlock: wb.InitialBlock}
return watcher.NewEVMWatcher(ankrClient, repo, params, logger)
}
func CreateSolanaWatcher(rateLimit int, chainURL string, wb config.WatcherBlockchain, logger *zap.Logger, repo *storage.Repository) watcher.ContractWatcher {
contractAddress, err := solana_go.PublicKeyFromBase58(wb.Address)
if err != nil {
logger.Fatal("failed to parse solana contract address", zap.Error(err))
}
solanaLimiter := ratelimit.New(rateLimit, ratelimit.Per(time.Second))
solanaClient := solana.NewSolanaSDK(chainURL, solanaLimiter, solana.WithRetries(3, 10*time.Second))
params := watcher.SolanaParams{Blockchain: wb.Name, ContractAddress: contractAddress,
SizeBlocks: wb.SizeBlocks, WaitSeconds: wb.WaitSeconds, InitialBlock: wb.InitialBlock}
return watcher.NewSolanaWatcher(solanaClient, repo, params, logger)
}
func CreateTerraWatcher(rateLimit int, chainURL string, wb config.WatcherBlockchain, logger *zap.Logger, repo *storage.Repository) watcher.ContractWatcher {
terraLimiter := ratelimit.New(rateLimit, ratelimit.Per(time.Second))
terraClient := terra.NewTerraSDK(chainURL, terraLimiter)
params := watcher.TerraParams{ChainID: wb.ChainID, Blockchain: wb.Name,
ContractAddress: wb.Address, WaitSeconds: wb.WaitSeconds, InitialBlock: wb.InitialBlock}
return watcher.NewTerraWatcher(terraClient, params, repo, logger)
}
func CreateAptosWatcher(rateLimit int, chainURL string, wb config.WatcherBlockchain, logger *zap.Logger, repo *storage.Repository) watcher.ContractWatcher {
aptosLimiter := ratelimit.New(rateLimit, ratelimit.Per(time.Second))
aptosClient := aptos.NewAptosSDK(chainURL, aptosLimiter)
params := watcher.AptosParams{
Blockchain: wb.Name,
ContractAddress: wb.Address,
SizeBlocks: wb.SizeBlocks,
WaitSeconds: wb.WaitSeconds,
InitialBlock: wb.InitialBlock}
return watcher.NewAptosWatcher(aptosClient, params, repo, logger)
}
func CreateOasisWatcher(rateLimit int, chainURL string, wb config.WatcherBlockchain, logger *zap.Logger, repo *storage.Repository) watcher.ContractWatcher {
oasisLimiter := ratelimit.New(rateLimit, ratelimit.Per(time.Second))
oasisClient := evm.NewEvmSDK(chainURL, oasisLimiter)
params := watcher.EVMParams{
ChainID: wb.ChainID,
Blockchain: wb.Name,
ContractAddress: wb.Address,
SizeBlocks: wb.SizeBlocks,
WaitSeconds: wb.WaitSeconds,
InitialBlock: wb.InitialBlock}
return watcher.NewEvmStandarWatcher(oasisClient, params, repo, logger)
}
func CreateMoonbeamWatcher(rateLimit int, chainURL string, wb config.WatcherBlockchain, logger *zap.Logger, repo *storage.Repository) watcher.ContractWatcher {
moonbeamLimiter := ratelimit.New(rateLimit, ratelimit.Per(time.Second))
moonbeamClient := evm.NewEvmSDK(chainURL, moonbeamLimiter)
params := watcher.EVMParams{
ChainID: wb.ChainID,
Blockchain: wb.Name,
ContractAddress: wb.Address,
SizeBlocks: wb.SizeBlocks,
WaitSeconds: wb.WaitSeconds,
InitialBlock: wb.InitialBlock}
return watcher.NewEvmStandarWatcher(moonbeamClient, params, repo, logger)
}

View File

@ -0,0 +1,114 @@
package backfiller
import (
"context"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/builder"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/config"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/db"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/storage"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/watcher"
"go.uber.org/zap"
)
func Run(config *config.BackfillerConfiguration) {
rootCtx := context.Background()
logger := logger.New("wormhole-explorer-contract-watcher", logger.WithLevel(config.LogLevel))
logger.Info("Starting wormhole-explorer-contract-watcher as backfiller ...")
//setup DB connection
db, err := db.New(rootCtx, logger, config.MongoURI, config.MongoDatabase)
if err != nil {
logger.Fatal("failed to connect MongoDB", zap.Error(err))
}
// create repositories
repo := storage.NewRepository(db.Database, logger)
var watcher watcher.ContractWatcher
switch config.Network {
case domain.P2pMainNet:
watcher = newWatcherForMainnet(config, repo, logger)
case domain.P2pTestNet:
watcher = newWatcherForTestnet(config, repo, logger)
default:
logger.Fatal("P2P network not supported")
}
logger.Info("Processing backfill ...",
zap.String("network", config.Network),
zap.String("chain", config.ChainName),
zap.Bool("persistBlock", config.PersistBlock),
zap.Uint64("from", config.FromBlock),
zap.Uint64("to", config.ToBlock))
watcher.Backfill(rootCtx, config.FromBlock, config.ToBlock, config.PageSize, config.PersistBlock)
logger.Info("Closing database connections ...")
db.Close()
logger.Info("Finish wormhole-explorer-contract-watcher as backfiller")
}
func newWatcherForMainnet(cfg *config.BackfillerConfiguration, repo *storage.Repository, logger *zap.Logger) watcher.ContractWatcher {
var watcher watcher.ContractWatcher
switch cfg.ChainName {
case config.ETHEREUM_MAINNET.ChainID.String():
watcher = builder.CreateEVMWatcher(cfg.RateLimitPerSecond, cfg.ChainUrl, config.ETHEREUM_MAINNET, repo, logger)
case config.POLYGON_MAINNET.ChainID.String():
watcher = builder.CreateEVMWatcher(cfg.RateLimitPerSecond, cfg.ChainUrl, config.POLYGON_MAINNET, repo, logger)
case config.BSC_MAINNET.ChainID.String():
watcher = builder.CreateEVMWatcher(cfg.RateLimitPerSecond, cfg.ChainUrl, config.BSC_MAINNET, repo, logger)
case config.FANTOM_MAINNET.ChainID.String():
watcher = builder.CreateEVMWatcher(cfg.RateLimitPerSecond, cfg.ChainUrl, config.FANTOM_MAINNET, repo, logger)
case config.AVALANCHE_MAINNET.ChainID.String():
watcher = builder.CreateEVMWatcher(cfg.RateLimitPerSecond, cfg.ChainUrl, config.AVALANCHE_MAINNET, repo, logger)
case config.SOLANA_MAINNET.ChainID.String():
watcher = builder.CreateSolanaWatcher(cfg.RateLimitPerSecond, cfg.ChainUrl, config.SOLANA_MAINNET, logger, repo)
case config.TERRA_MAINNET.ChainID.String():
watcher = builder.CreateTerraWatcher(cfg.RateLimitPerSecond, cfg.ChainUrl, config.TERRA_MAINNET, logger, repo)
case config.APTOS_MAINNET.ChainID.String():
watcher = builder.CreateAptosWatcher(cfg.RateLimitPerSecond, cfg.ChainUrl, config.APTOS_MAINNET, logger, repo)
case config.OASIS_MAINNET.ChainID.String():
watcher = builder.CreateOasisWatcher(cfg.RateLimitPerSecond, cfg.ChainUrl, config.OASIS_MAINNET, logger, repo)
case config.MOONBEAM_MAINNET.ChainID.String():
watcher = builder.CreateMoonbeamWatcher(cfg.RateLimitPerSecond, cfg.ChainUrl, config.MOONBEAM_MAINNET, logger, repo)
default:
logger.Fatal("chain not supported")
}
return watcher
}
func newWatcherForTestnet(cfg *config.BackfillerConfiguration, repo *storage.Repository, logger *zap.Logger) watcher.ContractWatcher {
var watcher watcher.ContractWatcher
switch cfg.ChainName {
case config.ETHEREUM_TESTNET.ChainID.String():
watcher = builder.CreateEVMWatcher(cfg.RateLimitPerSecond, cfg.ChainUrl, config.ETHEREUM_TESTNET, repo, logger)
case config.POLYGON_TESTNET.ChainID.String():
watcher = builder.CreateEVMWatcher(cfg.RateLimitPerSecond, cfg.ChainUrl, config.POLYGON_TESTNET, repo, logger)
case config.BSC_TESTNET.ChainID.String():
watcher = builder.CreateEVMWatcher(cfg.RateLimitPerSecond, cfg.ChainUrl, config.BSC_TESTNET, repo, logger)
case config.FANTOM_TESTNET.ChainID.String():
watcher = builder.CreateEVMWatcher(cfg.RateLimitPerSecond, cfg.ChainUrl, config.FANTOM_TESTNET, repo, logger)
case config.AVALANCHE_TESTNET.ChainID.String():
watcher = builder.CreateEVMWatcher(cfg.RateLimitPerSecond, cfg.ChainUrl, config.AVALANCHE_TESTNET, repo, logger)
case config.SOLANA_TESTNET.ChainID.String():
watcher = builder.CreateSolanaWatcher(cfg.RateLimitPerSecond, cfg.ChainUrl, config.SOLANA_TESTNET, logger, repo)
case config.APTOS_TESTNET.ChainID.String():
watcher = builder.CreateAptosWatcher(cfg.RateLimitPerSecond, cfg.ChainUrl, config.APTOS_TESTNET, logger, repo)
case config.OASIS_TESTNET.ChainID.String():
watcher = builder.CreateOasisWatcher(cfg.RateLimitPerSecond, cfg.ChainUrl, config.OASIS_TESTNET, logger, repo)
case config.MOONBEAM_TESTNET.ChainID.String():
watcher = builder.CreateMoonbeamWatcher(cfg.RateLimitPerSecond, cfg.ChainUrl, config.MOONBEAM_TESTNET, logger, repo)
default:
logger.Fatal("chain not supported")
}
return watcher
}

View File

@ -1,273 +1,88 @@
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"
solana_go "github.com/gagliardetto/solana-go"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/common/health"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"github.com/spf13/cobra"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/cmd/backfiller"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/cmd/service"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/config"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/http/infrastructure"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/ankr"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/aptos"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/db"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/evm"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/solana"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/terra"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/processor"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/storage"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/watcher"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.mongodb.org/mongo-driver/mongo"
"go.uber.org/ratelimit"
"go.uber.org/zap"
)
type exitCode int
func handleExit() {
if r := recover(); r != nil {
if e, ok := r.(exitCode); ok {
os.Exit(int(e))
}
panic(r) // not an Exit, bubble up
}
}
func main() {
defer handleExit()
rootCtx, rootCtxCancel := context.WithCancel(context.Background())
config, err := config.New(rootCtx)
if err != nil {
log.Fatal("Error creating config", err)
}
logger := logger.New("wormhole-explorer-contract-watcher", logger.WithLevel(config.LogLevel))
logger.Info("Starting wormhole-explorer-contract-watcher ...")
//setup DB connection
db, err := db.New(rootCtx, logger, config.MongoURI, config.MongoDatabase)
if err != nil {
logger.Fatal("failed to connect MongoDB", zap.Error(err))
}
// get health check functions.
healthChecks, err := newHealthChecks(rootCtx, db.Database)
if err != nil {
logger.Fatal("failed to create health checks", zap.Error(err))
}
// create repositories
repo := storage.NewRepository(db.Database, logger)
// create watchers
watchers := newWatchers(config, repo, logger)
//create processor
processor := processor.NewProcessor(watchers, logger)
processor.Start(rootCtx)
// create and start server.
server := infrastructure.NewServer(logger, config.Port, config.PprofEnabled, healthChecks...)
server.Start()
logger.Info("Started wormhole-explorer-contract-watcher")
// Waiting for signal
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-rootCtx.Done():
logger.Warn("Terminating with root context cancelled.")
case signal := <-sigterm:
logger.Info("Terminating with signal.", zap.String("signal", signal.String()))
}
logger.Info("root context cancelled, exiting...")
rootCtxCancel()
logger.Info("Closing processor ...")
processor.Close()
logger.Info("Closing database connections ...")
db.Close()
logger.Info("Closing Http server ...")
server.Stop()
logger.Info("Finished wormhole-explorer-contract-watcher")
execute()
}
func newHealthChecks(ctx context.Context, db *mongo.Database) ([]health.Check, error) {
return []health.Check{health.Mongo(db)}, nil
}
type watcherBlockchain struct {
chainID vaa.ChainID
name string
address string
sizeBlocks uint8
waitSeconds uint16
initialBlock int64
}
type watchersConfig struct {
evms []watcherBlockchain
solana *watcherBlockchain
terra *watcherBlockchain
aptos *watcherBlockchain
oasis *watcherBlockchain
moonbeam *watcherBlockchain
rateLimit rateLimitConfig
}
type rateLimitConfig struct {
evm int
solana int
terra int
aptos int
oasis int
moonbeam int
}
func newWatchers(config *config.Configuration, repo *storage.Repository, logger *zap.Logger) []watcher.ContractWatcher {
var watchers *watchersConfig
switch config.P2pNetwork {
case domain.P2pMainNet:
watchers = newEVMWatchersForMainnet()
case domain.P2pTestNet:
watchers = newEVMWatchersForTestnet()
default:
watchers = &watchersConfig{}
}
result := make([]watcher.ContractWatcher, 0)
// add evm watchers
evmLimiter := ratelimit.New(watchers.rateLimit.evm, ratelimit.Per(time.Second))
ankrClient := ankr.NewAnkrSDK(config.AnkrUrl, evmLimiter)
for _, w := range watchers.evms {
params := watcher.EVMParams{ChainID: w.chainID, Blockchain: w.name, ContractAddress: w.address,
SizeBlocks: w.sizeBlocks, WaitSeconds: w.waitSeconds, InitialBlock: w.initialBlock}
result = append(result, watcher.NewEVMWatcher(ankrClient, repo, params, logger))
}
// add solana watcher
if watchers.solana != nil {
contractAddress, err := solana_go.PublicKeyFromBase58(watchers.solana.address)
if err != nil {
logger.Fatal("failed to parse solana contract address", zap.Error(err))
}
solanaLimiter := ratelimit.New(watchers.rateLimit.solana, ratelimit.Per(time.Second))
solanaClient := solana.NewSolanaSDK(config.SolanaUrl, solanaLimiter, solana.WithRetries(3, 10*time.Second))
params := watcher.SolanaParams{Blockchain: watchers.solana.name, ContractAddress: contractAddress,
SizeBlocks: watchers.solana.sizeBlocks, WaitSeconds: watchers.solana.waitSeconds, InitialBlock: watchers.solana.initialBlock}
result = append(result, watcher.NewSolanaWatcher(solanaClient, repo, params, logger))
}
// add terra watcher
if watchers.terra != nil {
terraLimiter := ratelimit.New(watchers.rateLimit.terra, ratelimit.Per(time.Second))
terraClient := terra.NewTerraSDK(config.TerraUrl, terraLimiter)
params := watcher.TerraParams{ChainID: watchers.terra.chainID, Blockchain: watchers.terra.name,
ContractAddress: watchers.terra.address, WaitSeconds: watchers.terra.waitSeconds, InitialBlock: watchers.terra.initialBlock}
result = append(result, watcher.NewTerraWatcher(terraClient, params, repo, logger))
}
// add aptos watcher
if watchers.aptos != nil {
aptosLimiter := ratelimit.New(watchers.rateLimit.aptos, ratelimit.Per(time.Second))
aptosClient := aptos.NewAptosSDK(config.AptosUrl, aptosLimiter)
params := watcher.AptosParams{
Blockchain: watchers.aptos.name,
ContractAddress: watchers.aptos.address,
SizeBlocks: watchers.aptos.sizeBlocks,
WaitSeconds: watchers.aptos.waitSeconds,
InitialBlock: watchers.aptos.initialBlock}
result = append(result, watcher.NewAptosWatcher(aptosClient, params, repo, logger))
}
// add oasis watcher
if watchers.oasis != nil {
oasisLimiter := ratelimit.New(watchers.rateLimit.oasis, ratelimit.Per(time.Second))
oasisClient := evm.NewEvmSDK(config.OasisUrl, oasisLimiter)
params := watcher.EVMParams{
ChainID: watchers.oasis.chainID,
Blockchain: watchers.oasis.name,
ContractAddress: watchers.oasis.address,
SizeBlocks: watchers.oasis.sizeBlocks,
WaitSeconds: watchers.oasis.waitSeconds,
InitialBlock: watchers.oasis.initialBlock}
result = append(result, watcher.NewEvmStandarWatcher(oasisClient, params, repo, logger))
}
// add moonbeam watcher
if watchers.moonbeam != nil {
moonbeamLimiter := ratelimit.New(watchers.rateLimit.moonbeam, ratelimit.Per(time.Second))
moonbeamClient := evm.NewEvmSDK(config.MoonbeamUrl, moonbeamLimiter)
params := watcher.EVMParams{
ChainID: watchers.moonbeam.chainID,
Blockchain: watchers.moonbeam.name,
ContractAddress: watchers.moonbeam.address,
SizeBlocks: watchers.moonbeam.sizeBlocks,
WaitSeconds: watchers.moonbeam.waitSeconds,
InitialBlock: watchers.moonbeam.initialBlock}
result = append(result, watcher.NewEvmStandarWatcher(moonbeamClient, params, repo, logger))
}
return result
}
func newEVMWatchersForMainnet() *watchersConfig {
return &watchersConfig{
evms: []watcherBlockchain{
ETHEREUM_MAINNET,
POLYGON_MAINNET,
BSC_MAINNET,
FANTOM_MAINNET,
AVALANCHE_MAINNET,
},
solana: &SOLANA_MAINNET,
terra: &TERRA_MAINNET,
aptos: &APTOS_MAINNET,
oasis: &OASIS_MAINNET,
moonbeam: &MOONBEAM_MAINNET,
rateLimit: rateLimitConfig{
evm: 1000,
solana: 3,
terra: 10,
aptos: 3,
oasis: 3,
moonbeam: 5,
func execute() error {
root := &cobra.Command{
Use: "contract-watcher",
Run: func(cmd *cobra.Command, args []string) {
if len(args) == 0 {
service.Run()
}
},
}
addServiceCommand(root)
addBackfillerCommand(root)
return root.Execute()
}
func newEVMWatchersForTestnet() *watchersConfig {
return &watchersConfig{
evms: []watcherBlockchain{
ETHEREUM_TESTNET,
POLYGON_TESTNET,
BSC_TESTNET,
FANTOM_TESTNET,
AVALANCHE_TESTNET,
},
solana: &SOLANA_TESTNET,
aptos: &APTOS_TESTNET,
oasis: &OASIS_TESTNET,
moonbeam: &MOONBEAM_TESTNET,
rateLimit: rateLimitConfig{
evm: 10,
solana: 2,
terra: 5,
aptos: 1,
oasis: 1,
moonbeam: 2,
func addServiceCommand(root *cobra.Command) {
serviceCommand := &cobra.Command{
Use: "service",
Short: "Run contract-watcher as service",
Run: func(_ *cobra.Command, _ []string) {
service.Run()
},
}
root.AddCommand(serviceCommand)
}
func addBackfillerCommand(parent *cobra.Command) {
var network, mongoUri, mongoDb, chainName, chainURL, logLevel string
var fromBlock, toBlock, pageSize uint64
var rateLimit int
var persistBlock bool
backfillerCommand := &cobra.Command{
Use: "backfiller",
Short: "Run backfiller to backfill data",
Run: func(c *cobra.Command, _ []string) {
cfg := &config.BackfillerConfiguration{
LogLevel: logLevel,
Network: network,
MongoURI: mongoUri,
MongoDatabase: mongoDb,
ChainName: chainName,
ChainUrl: chainURL,
FromBlock: fromBlock,
ToBlock: toBlock,
RateLimitPerSecond: rateLimit,
PageSize: pageSize,
PersistBlock: persistBlock,
}
backfiller.Run(cfg)
},
}
backfillerCommand.Flags().StringVar(&logLevel, "log-level", "INFO", "log level")
backfillerCommand.Flags().StringVar(&network, "network", "", "network (mainnet or testnet)")
backfillerCommand.Flags().StringVar(&mongoUri, "mongo-uri", "", "Mongo connection")
backfillerCommand.Flags().StringVar(&mongoDb, "mongo-database", "", "Mongo database")
backfillerCommand.Flags().StringVar(&chainName, "chain-name", "", "chain name")
backfillerCommand.Flags().StringVar(&chainURL, "chain-url", "", "chain URL")
backfillerCommand.Flags().Uint64Var(&fromBlock, "from", 0, "first block to be processed")
backfillerCommand.Flags().Uint64Var(&toBlock, "to", 0, "last block to be processed (included)")
backfillerCommand.Flags().IntVar(&rateLimit, "rate-limit", 3, "rate limit per second")
backfillerCommand.Flags().Uint64Var(&pageSize, "page-size", 100, "maximum number to process at one time")
backfillerCommand.Flags().BoolVar(&persistBlock, "persist-blocks", false, "persist processed blocks in storage")
backfillerCommand.MarkFlagRequired("network")
backfillerCommand.MarkFlagRequired("mongo-uri")
backfillerCommand.MarkFlagRequired("mongo-database")
backfillerCommand.MarkFlagRequired("chain-name")
backfillerCommand.MarkFlagRequired("chain-url")
backfillerCommand.MarkFlagRequired("from")
backfillerCommand.MarkFlagRequired("to")
parent.AddCommand(backfillerCommand)
}

View File

@ -1,93 +0,0 @@
package main
import "github.com/wormhole-foundation/wormhole/sdk/vaa"
var ETHEREUM_MAINNET = watcherBlockchain{
chainID: vaa.ChainIDEthereum,
name: "eth",
address: "0x3ee18B2214AFF97000D974cf647E7C347E8fa585",
sizeBlocks: 100,
waitSeconds: 10,
initialBlock: 16820790,
}
var POLYGON_MAINNET = watcherBlockchain{
chainID: vaa.ChainIDPolygon,
name: "polygon",
address: "0x5a58505a96D1dbf8dF91cB21B54419FC36e93fdE",
sizeBlocks: 100,
waitSeconds: 10,
initialBlock: 40307020,
}
var BSC_MAINNET = watcherBlockchain{
chainID: vaa.ChainIDBSC,
name: "bsc",
address: "0xB6F6D86a8f9879A9c87f643768d9efc38c1Da6E7",
sizeBlocks: 100,
waitSeconds: 10,
initialBlock: 26436320,
}
var FANTOM_MAINNET = watcherBlockchain{
chainID: vaa.ChainIDFantom,
name: "fantom",
address: "0x7C9Fc5741288cDFdD83CeB07f3ea7e22618D79D2",
sizeBlocks: 100,
waitSeconds: 10,
initialBlock: 57525624,
}
var SOLANA_MAINNET = watcherBlockchain{
chainID: vaa.ChainIDSolana,
name: "solana",
address: "wormDTUJ6AWPNvk59vGQbDvGJmqbDTdgWgAqcLBCgUb",
sizeBlocks: 50,
waitSeconds: 10,
initialBlock: 183675278,
}
var TERRA_MAINNET = watcherBlockchain{
chainID: vaa.ChainIDTerra,
name: "terra",
address: "terra10nmmwe8r3g99a9newtqa7a75xfgs2e8z87r2sf",
sizeBlocks: 0,
waitSeconds: 10,
initialBlock: 3911168,
}
var AVALANCHE_MAINNET = watcherBlockchain{
chainID: vaa.ChainIDAvalanche,
name: "avalanche",
address: "0x0e082F06FF657D94310cB8cE8B0D9a04541d8052",
sizeBlocks: 100,
waitSeconds: 10,
initialBlock: 8237181,
}
var APTOS_MAINNET = watcherBlockchain{
chainID: vaa.ChainIDAptos,
name: "aptos",
address: "0x576410486a2da45eee6c949c995670112ddf2fbeedab20350d506328eefc9d4f",
sizeBlocks: 50,
waitSeconds: 10,
initialBlock: 1094430,
}
var OASIS_MAINNET = watcherBlockchain{
chainID: vaa.ChainIDOasis,
name: "oasis",
address: "0x5848C791e09901b40A9Ef749f2a6735b418d7564",
sizeBlocks: 50,
waitSeconds: 10,
initialBlock: 1762,
}
var MOONBEAM_MAINNET = watcherBlockchain{
chainID: vaa.ChainIDMoonbeam,
name: "moonbeam",
address: "0xb1731c586ca89a23809861c6103f0b96b3f57d92",
sizeBlocks: 50,
waitSeconds: 10,
initialBlock: 1853330,
}

View File

@ -0,0 +1,227 @@
package service
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/common/health"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/builder"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/config"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/http/infrastructure"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/ankr"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/db"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/processor"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/storage"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/watcher"
"go.mongodb.org/mongo-driver/mongo"
"go.uber.org/ratelimit"
"go.uber.org/zap"
)
type exitCode int
func handleExit() {
if r := recover(); r != nil {
if e, ok := r.(exitCode); ok {
os.Exit(int(e))
}
panic(r) // not an Exit, bubble up
}
}
type watchersConfig struct {
evms []config.WatcherBlockchain
solana *config.WatcherBlockchain
terra *config.WatcherBlockchain
aptos *config.WatcherBlockchain
oasis *config.WatcherBlockchain
moonbeam *config.WatcherBlockchain
rateLimit rateLimitConfig
}
type rateLimitConfig struct {
evm int
solana int
terra int
aptos int
oasis int
moonbeam int
}
func Run() {
defer handleExit()
rootCtx, rootCtxCancel := context.WithCancel(context.Background())
config, err := config.New(rootCtx)
if err != nil {
log.Fatal("Error creating config", err)
}
logger := logger.New("wormhole-explorer-contract-watcher", logger.WithLevel(config.LogLevel))
logger.Info("Starting wormhole-explorer-contract-watcher ...")
//setup DB connection
db, err := db.New(rootCtx, logger, config.MongoURI, config.MongoDatabase)
if err != nil {
logger.Fatal("failed to connect MongoDB", zap.Error(err))
}
// get health check functions.
healthChecks, err := newHealthChecks(rootCtx, db.Database)
if err != nil {
logger.Fatal("failed to create health checks", zap.Error(err))
}
// create repositories
repo := storage.NewRepository(db.Database, logger)
// create watchers
watchers := newWatchers(config, repo, logger)
//create processor
processor := processor.NewProcessor(watchers, logger)
processor.Start(rootCtx)
// create and start server.
server := infrastructure.NewServer(logger, config.Port, config.PprofEnabled, healthChecks...)
server.Start()
logger.Info("Started wormhole-explorer-contract-watcher")
// Waiting for signal
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-rootCtx.Done():
logger.Warn("Terminating with root context cancelled.")
case signal := <-sigterm:
logger.Info("Terminating with signal.", zap.String("signal", signal.String()))
}
logger.Info("root context cancelled, exiting...")
rootCtxCancel()
logger.Info("Closing processor ...")
processor.Close()
logger.Info("Closing database connections ...")
db.Close()
logger.Info("Closing Http server ...")
server.Stop()
logger.Info("Finished wormhole-explorer-contract-watcher")
}
func newHealthChecks(ctx context.Context, db *mongo.Database) ([]health.Check, error) {
return []health.Check{health.Mongo(db)}, nil
}
func newWatchers(config *config.ServiceConfiguration, repo *storage.Repository, logger *zap.Logger) []watcher.ContractWatcher {
var watchers *watchersConfig
switch config.P2pNetwork {
case domain.P2pMainNet:
watchers = newEVMWatchersForMainnet()
case domain.P2pTestNet:
watchers = newEVMWatchersForTestnet()
default:
watchers = &watchersConfig{}
}
result := make([]watcher.ContractWatcher, 0)
// add evm watchers
evmLimiter := ratelimit.New(watchers.rateLimit.evm, ratelimit.Per(time.Second))
ankrClient := ankr.NewAnkrSDK(config.AnkrUrl, evmLimiter)
for _, w := range watchers.evms {
params := watcher.EVMParams{ChainID: w.ChainID, Blockchain: w.Name, ContractAddress: w.Address,
SizeBlocks: w.SizeBlocks, WaitSeconds: w.WaitSeconds, InitialBlock: w.InitialBlock}
result = append(result, watcher.NewEVMWatcher(ankrClient, repo, params, logger))
}
// add solana watcher
if watchers.solana != nil {
solanWatcher := builder.CreateSolanaWatcher(watchers.rateLimit.solana, config.SolanaUrl, *watchers.solana, logger, repo)
result = append(result, solanWatcher)
}
// add terra watcher
if watchers.terra != nil {
terraWatcher := builder.CreateTerraWatcher(watchers.rateLimit.terra, config.TerraUrl, *watchers.terra, logger, repo)
result = append(result, terraWatcher)
}
// add aptos watcher
if watchers.aptos != nil {
aptosWatcher := builder.CreateAptosWatcher(watchers.rateLimit.aptos, config.AptosUrl, *watchers.aptos, logger, repo)
result = append(result, aptosWatcher)
}
// add oasis watcher
if watchers.oasis != nil {
oasisWatcher := builder.CreateOasisWatcher(watchers.rateLimit.oasis, config.OasisUrl, *watchers.oasis, logger, repo)
result = append(result, oasisWatcher)
}
// add moonbeam watcher
if watchers.moonbeam != nil {
moonbeamWatcher := builder.CreateMoonbeamWatcher(watchers.rateLimit.moonbeam, config.MoonbeamUrl, *watchers.moonbeam, logger, repo)
result = append(result, moonbeamWatcher)
}
return result
}
func newEVMWatchersForMainnet() *watchersConfig {
return &watchersConfig{
evms: []config.WatcherBlockchain{
config.ETHEREUM_MAINNET,
config.POLYGON_MAINNET,
config.BSC_MAINNET,
config.FANTOM_MAINNET,
config.AVALANCHE_MAINNET,
},
solana: &config.SOLANA_MAINNET,
terra: &config.TERRA_MAINNET,
aptos: &config.APTOS_MAINNET,
oasis: &config.OASIS_MAINNET,
moonbeam: &config.MOONBEAM_MAINNET,
rateLimit: rateLimitConfig{
evm: 1000,
solana: 3,
terra: 10,
aptos: 3,
oasis: 3,
moonbeam: 5,
},
}
}
func newEVMWatchersForTestnet() *watchersConfig {
return &watchersConfig{
evms: []config.WatcherBlockchain{
config.ETHEREUM_TESTNET,
config.POLYGON_TESTNET,
config.BSC_TESTNET,
config.FANTOM_TESTNET,
config.AVALANCHE_TESTNET,
},
solana: &config.SOLANA_TESTNET,
aptos: &config.APTOS_TESTNET,
oasis: &config.OASIS_TESTNET,
moonbeam: &config.MOONBEAM_TESTNET,
rateLimit: rateLimitConfig{
evm: 10,
solana: 2,
terra: 5,
aptos: 1,
oasis: 1,
moonbeam: 2,
},
}
}

View File

@ -1,84 +0,0 @@
package main
import "github.com/wormhole-foundation/wormhole/sdk/vaa"
var ETHEREUM_TESTNET = watcherBlockchain{
chainID: vaa.ChainIDEthereum,
name: "eth_goerli",
address: "0xF890982f9310df57d00f659cf4fd87e65adEd8d7",
sizeBlocks: 100,
waitSeconds: 10,
initialBlock: 8660321,
}
var POLYGON_TESTNET = watcherBlockchain{
chainID: vaa.ChainIDPolygon,
name: "polygon_mumbai",
address: "0x377D55a7928c046E18eEbb61977e714d2a76472a",
sizeBlocks: 100,
waitSeconds: 10,
initialBlock: 33151522,
}
var BSC_TESTNET = watcherBlockchain{
chainID: vaa.ChainIDBSC,
name: "bsc_testnet_chapel",
address: "0x9dcF9D205C9De35334D646BeE44b2D2859712A09",
sizeBlocks: 100,
waitSeconds: 10,
initialBlock: 28071327,
}
var FANTOM_TESTNET = watcherBlockchain{
chainID: vaa.ChainIDFantom,
name: "fantom_testnet",
address: "0x599CEa2204B4FaECd584Ab1F2b6aCA137a0afbE8",
sizeBlocks: 100,
waitSeconds: 10,
initialBlock: 14524466,
}
var SOLANA_TESTNET = watcherBlockchain{
chainID: vaa.ChainIDSolana,
name: "solana",
address: "DZnkkTmCiFWfYTfT41X3Rd1kDgozqzxWaHqsw6W4x2oe",
sizeBlocks: 10,
waitSeconds: 10,
initialBlock: 16820790,
}
var AVALANCHE_TESTNET = watcherBlockchain{
chainID: vaa.ChainIDAvalanche,
name: "avalanche_fuji",
address: "0x61E44E506Ca5659E6c0bba9b678586fA2d729756",
sizeBlocks: 100,
waitSeconds: 10,
initialBlock: 11014526,
}
var APTOS_TESTNET = watcherBlockchain{
chainID: vaa.ChainIDAptos,
name: "aptos",
address: "0x576410486a2da45eee6c949c995670112ddf2fbeedab20350d506328eefc9d4f",
sizeBlocks: 50,
waitSeconds: 10,
initialBlock: 21522262,
}
var OASIS_TESTNET = watcherBlockchain{
chainID: vaa.ChainIDOasis,
name: "oasis",
address: "0x88d8004A9BdbfD9D28090A02010C19897a29605c",
sizeBlocks: 50,
waitSeconds: 10,
initialBlock: 130400,
}
var MOONBEAM_TESTNET = watcherBlockchain{
chainID: vaa.ChainIDMoonbeam,
name: "moonbeam",
address: "0xbc976D4b9D57E57c3cA52e1Fd136C45FF7955A96",
sizeBlocks: 50,
waitSeconds: 10,
initialBlock: 2097310,
}

View File

@ -7,8 +7,8 @@ import (
"github.com/sethvargo/go-envconfig"
)
// Configuration represents the application configuration with the default values.
type Configuration struct {
// ServiceConfiguration represents the application configuration when running as service with the default values.
type ServiceConfiguration struct {
Env string `env:"ENV,default=development"`
LogLevel string `env:"LOG_LEVEL,default=INFO"`
Port string `env:"PORT,default=8000"`
@ -24,11 +24,26 @@ type Configuration struct {
P2pNetwork string `env:"P2P_NETWORK,required"`
}
// BackfillerConfiguration represents the application configuration when running as backfiller.
type BackfillerConfiguration struct {
LogLevel string `env:"LOG_LEVEL,default=INFO"`
MongoURI string `env:"MONGODB_URI,required"`
MongoDatabase string `env:"MONGODB_DATABASE,required"`
ChainName string `env:"CHAIN_NAME,required"`
ChainUrl string `env:"CHAIN_URL,required"`
FromBlock uint64 `env:"FROM_BLOCK,required"`
ToBlock uint64 `env:"TO_BLOCK,required"`
Network string `env:"NETWORK,required"`
RateLimitPerSecond int `env:"RATE_LIMIT_PER_SECOND,default=10"`
PageSize uint64 `env:"PAGE_SIZE,default=100"`
PersistBlock bool `env:"PERSIST_BLOCK,default=false"`
}
// New creates a configuration with the values from .env file and environment variables.
func New(ctx context.Context) (*Configuration, error) {
func New(ctx context.Context) (*ServiceConfiguration, error) {
_ = godotenv.Load(".env", "../.env")
var configuration Configuration
var configuration ServiceConfiguration
if err := envconfig.Process(ctx, &configuration); err != nil {
return nil, err
}

View File

@ -0,0 +1,93 @@
package config
import "github.com/wormhole-foundation/wormhole/sdk/vaa"
var ETHEREUM_MAINNET = WatcherBlockchain{
ChainID: vaa.ChainIDEthereum,
Name: "eth",
Address: "0x3ee18B2214AFF97000D974cf647E7C347E8fa585",
SizeBlocks: 100,
WaitSeconds: 10,
InitialBlock: 16820790,
}
var POLYGON_MAINNET = WatcherBlockchain{
ChainID: vaa.ChainIDPolygon,
Name: "polygon",
Address: "0x5a58505a96D1dbf8dF91cB21B54419FC36e93fdE",
SizeBlocks: 100,
WaitSeconds: 10,
InitialBlock: 40307020,
}
var BSC_MAINNET = WatcherBlockchain{
ChainID: vaa.ChainIDBSC,
Name: "bsc",
Address: "0xB6F6D86a8f9879A9c87f643768d9efc38c1Da6E7",
SizeBlocks: 100,
WaitSeconds: 10,
InitialBlock: 26436320,
}
var FANTOM_MAINNET = WatcherBlockchain{
ChainID: vaa.ChainIDFantom,
Name: "fantom",
Address: "0x7C9Fc5741288cDFdD83CeB07f3ea7e22618D79D2",
SizeBlocks: 100,
WaitSeconds: 10,
InitialBlock: 57525624,
}
var SOLANA_MAINNET = WatcherBlockchain{
ChainID: vaa.ChainIDSolana,
Name: "solana",
Address: "wormDTUJ6AWPNvk59vGQbDvGJmqbDTdgWgAqcLBCgUb",
SizeBlocks: 50,
WaitSeconds: 10,
InitialBlock: 183675278,
}
var TERRA_MAINNET = WatcherBlockchain{
ChainID: vaa.ChainIDTerra,
Name: "terra",
Address: "terra10nmmwe8r3g99a9newtqa7a75xfgs2e8z87r2sf",
SizeBlocks: 0,
WaitSeconds: 10,
InitialBlock: 3911168,
}
var AVALANCHE_MAINNET = WatcherBlockchain{
ChainID: vaa.ChainIDAvalanche,
Name: "avalanche",
Address: "0x0e082F06FF657D94310cB8cE8B0D9a04541d8052",
SizeBlocks: 100,
WaitSeconds: 10,
InitialBlock: 8237181,
}
var APTOS_MAINNET = WatcherBlockchain{
ChainID: vaa.ChainIDAptos,
Name: "aptos",
Address: "0x576410486a2da45eee6c949c995670112ddf2fbeedab20350d506328eefc9d4f",
SizeBlocks: 50,
WaitSeconds: 10,
InitialBlock: 1094430,
}
var OASIS_MAINNET = WatcherBlockchain{
ChainID: vaa.ChainIDOasis,
Name: "oasis",
Address: "0x5848C791e09901b40A9Ef749f2a6735b418d7564",
SizeBlocks: 50,
WaitSeconds: 10,
InitialBlock: 1762,
}
var MOONBEAM_MAINNET = WatcherBlockchain{
ChainID: vaa.ChainIDMoonbeam,
Name: "moonbeam",
Address: "0xb1731c586ca89a23809861c6103f0b96b3f57d92",
SizeBlocks: 50,
WaitSeconds: 10,
InitialBlock: 1853330,
}

View File

@ -0,0 +1,84 @@
package config
import "github.com/wormhole-foundation/wormhole/sdk/vaa"
var ETHEREUM_TESTNET = WatcherBlockchain{
ChainID: vaa.ChainIDEthereum,
Name: "eth_goerli",
Address: "0xF890982f9310df57d00f659cf4fd87e65adEd8d7",
SizeBlocks: 100,
WaitSeconds: 10,
InitialBlock: 8660321,
}
var POLYGON_TESTNET = WatcherBlockchain{
ChainID: vaa.ChainIDPolygon,
Name: "polygon_mumbai",
Address: "0x377D55a7928c046E18eEbb61977e714d2a76472a",
SizeBlocks: 100,
WaitSeconds: 10,
InitialBlock: 33151522,
}
var BSC_TESTNET = WatcherBlockchain{
ChainID: vaa.ChainIDBSC,
Name: "bsc_testnet_chapel",
Address: "0x9dcF9D205C9De35334D646BeE44b2D2859712A09",
SizeBlocks: 100,
WaitSeconds: 10,
InitialBlock: 28071327,
}
var FANTOM_TESTNET = WatcherBlockchain{
ChainID: vaa.ChainIDFantom,
Name: "fantom_testnet",
Address: "0x599CEa2204B4FaECd584Ab1F2b6aCA137a0afbE8",
SizeBlocks: 100,
WaitSeconds: 10,
InitialBlock: 14524466,
}
var SOLANA_TESTNET = WatcherBlockchain{
ChainID: vaa.ChainIDSolana,
Name: "solana",
Address: "DZnkkTmCiFWfYTfT41X3Rd1kDgozqzxWaHqsw6W4x2oe",
SizeBlocks: 10,
WaitSeconds: 10,
InitialBlock: 16820790,
}
var AVALANCHE_TESTNET = WatcherBlockchain{
ChainID: vaa.ChainIDAvalanche,
Name: "avalanche_fuji",
Address: "0x61E44E506Ca5659E6c0bba9b678586fA2d729756",
SizeBlocks: 100,
WaitSeconds: 10,
InitialBlock: 11014526,
}
var APTOS_TESTNET = WatcherBlockchain{
ChainID: vaa.ChainIDAptos,
Name: "aptos",
Address: "0x576410486a2da45eee6c949c995670112ddf2fbeedab20350d506328eefc9d4f",
SizeBlocks: 50,
WaitSeconds: 10,
InitialBlock: 21522262,
}
var OASIS_TESTNET = WatcherBlockchain{
ChainID: vaa.ChainIDOasis,
Name: "oasis",
Address: "0x88d8004A9BdbfD9D28090A02010C19897a29605c",
SizeBlocks: 50,
WaitSeconds: 10,
InitialBlock: 130400,
}
var MOONBEAM_TESTNET = WatcherBlockchain{
ChainID: vaa.ChainIDMoonbeam,
Name: "moonbeam",
Address: "0xbc976D4b9D57E57c3cA52e1Fd136C45FF7955A96",
SizeBlocks: 50,
WaitSeconds: 10,
InitialBlock: 2097310,
}

View File

@ -0,0 +1,12 @@
package config
import "github.com/wormhole-foundation/wormhole/sdk/vaa"
type WatcherBlockchain struct {
ChainID vaa.ChainID
Name string
Address string
SizeBlocks uint8
WaitSeconds uint16
InitialBlock int64
}

View File

@ -94,7 +94,7 @@ func (w *AptosWatcher) Start(ctx context.Context) error {
toBlock = lastBlock
}
w.logger.Info("processing blocks", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
w.processBlock(ctx, fromBlock, toBlock)
w.processBlock(ctx, fromBlock, toBlock, true)
w.logger.Info("blocks processed", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
}
// process all the blocks between current and last block.
@ -117,7 +117,17 @@ func (w *AptosWatcher) Close() {
w.wg.Wait()
}
func (w *AptosWatcher) processBlock(ctx context.Context, fromBlock uint64, toBlock uint64) {
func (w *AptosWatcher) Backfill(ctx context.Context, fromBlock uint64, toBlock uint64, pageSize uint64, persistBlock bool) {
totalBlocks := getTotalBlocks(fromBlock, toBlock, pageSize)
for i := uint64(0); i < totalBlocks; i++ {
fromBlock, toBlock := getPage(fromBlock, i, pageSize, toBlock)
w.logger.Info("processing blocks", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
w.processBlock(ctx, fromBlock, toBlock, persistBlock)
w.logger.Info("blocks processed", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
}
}
func (w *AptosWatcher) processBlock(ctx context.Context, fromBlock uint64, toBlock uint64, updateWatcherBlock bool) {
for block := fromBlock; block <= toBlock; block++ {
w.logger.Debug("processing block", zap.Uint64("block", block))
@ -140,13 +150,16 @@ func (w *AptosWatcher) processBlock(ctx context.Context, fromBlock uint64, toBlo
for _, tx := range result.Transactions {
w.processTransaction(ctx, tx, block, blockTime)
}
// update the last block number processed in the database.
watcherBlock := storage.WatcherBlock{
ID: w.blockchain,
BlockNumber: int64(block),
UpdatedAt: time.Now(),
if updateWatcherBlock {
// update the last block number processed in the database.
watcherBlock := storage.WatcherBlock{
ID: w.blockchain,
BlockNumber: int64(block),
UpdatedAt: time.Now(),
}
return w.repository.UpdateWatcherBlock(ctx, watcherBlock)
}
return w.repository.UpdateWatcherBlock(ctx, watcherBlock)
return nil
},
retry.Attempts(aptosMaxRetries),
retry.Delay(aptosRetryDelay),

View File

@ -4,8 +4,9 @@ import "context"
// ContractTracker is an interface for tracking contracts
// It Tracks contract operations and persist the tx data
// BackfillContract is used to backfill the contract data from the past
// Backfill is used to backfill the contract data from the past
type ContractWatcher interface {
Start(ctx context.Context) error
Close()
Backfill(ctx context.Context, fromBlock uint64, toBlock uint64, pageSize uint64, persistBlock bool)
}

View File

@ -84,7 +84,7 @@ func (w *EvmStandarWatcher) Start(ctx context.Context) error {
for i := uint64(0); i < totalBlocks; i++ {
fromBlock, toBlock := getPage(currentBlock, i, w.maxBlocks, lastBlock)
w.logger.Info("processing blocks", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
w.processBlock(ctx, fromBlock, toBlock)
w.processBlock(ctx, fromBlock, toBlock, true)
w.logger.Info("blocks processed", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
}
// process all the blocks between current and last block.
@ -103,7 +103,17 @@ func (w *EvmStandarWatcher) Start(ctx context.Context) error {
}
func (w *EvmStandarWatcher) processBlock(ctx context.Context, fromBlock uint64, toBlock uint64) {
func (w *EvmStandarWatcher) Backfill(ctx context.Context, fromBlock uint64, toBlock uint64, pageSize uint64, persistBlock bool) {
totalBlocks := getTotalBlocks(fromBlock, toBlock, pageSize)
for i := uint64(0); i < totalBlocks; i++ {
fromBlock, toBlock := getPage(fromBlock, i, pageSize, toBlock)
w.logger.Info("processing blocks", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
w.processBlock(ctx, fromBlock, toBlock, persistBlock)
w.logger.Info("blocks processed", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
}
}
func (w *EvmStandarWatcher) processBlock(ctx context.Context, fromBlock uint64, toBlock uint64, updateWatcherBlock bool) {
for block := fromBlock; block <= toBlock; block++ {
w.logger.Debug("processing block", zap.Uint64("block", block))
retry.Do(
@ -160,13 +170,17 @@ func (w *EvmStandarWatcher) processBlock(ctx context.Context, fromBlock uint64,
}
processTransaction(ctx, w.chainID, evmTx, w.repository, w.logger)
}
// update the last block number processed in the database.
watcherBlock := storage.WatcherBlock{
ID: w.blockchain,
BlockNumber: int64(block),
UpdatedAt: time.Now(),
if updateWatcherBlock {
// update the last block number processed in the database.
watcherBlock := storage.WatcherBlock{
ID: w.blockchain,
BlockNumber: int64(block),
UpdatedAt: time.Now(),
}
return w.repository.UpdateWatcherBlock(ctx, watcherBlock)
}
return w.repository.UpdateWatcherBlock(ctx, watcherBlock)
return nil
},
retry.Attempts(evmMaxRetries),
retry.Delay(evmRetryDelay),

View File

@ -80,7 +80,7 @@ func (w *EVMWatcher) Start(ctx context.Context) error {
toBlock = lastBlock
}
w.logger.Info("processing blocks", zap.Int64("from", fromBlock), zap.Int64("to", toBlock))
w.processBlock(ctx, fromBlock, toBlock)
w.processBlock(ctx, fromBlock, toBlock, true)
w.logger.Info("blocks processed", zap.Int64("from", fromBlock), zap.Int64("to", toBlock))
}
// process all the blocks between current and last block.
@ -99,7 +99,17 @@ func (w *EVMWatcher) Start(ctx context.Context) error {
}
func (w *EVMWatcher) processBlock(ctx context.Context, currentBlock int64, lastBlock int64) {
func (w *EVMWatcher) Backfill(ctx context.Context, fromBlock uint64, toBlock uint64, pageSize uint64, persistBlock bool) {
totalBlocks := getTotalBlocks(fromBlock, toBlock, pageSize)
for i := uint64(0); i < totalBlocks; i++ {
fromBlock, toBlock := getPage(fromBlock, i, pageSize, toBlock)
w.logger.Info("processing blocks", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
w.processBlock(ctx, int64(fromBlock), int64(toBlock), persistBlock)
w.logger.Info("blocks processed", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
}
}
func (w *EVMWatcher) processBlock(ctx context.Context, currentBlock int64, lastBlock int64, updateWatcherBlock bool) {
pageToken := ""
hasPage := true
@ -153,13 +163,15 @@ func (w *EVMWatcher) processBlock(ctx context.Context, currentBlock int64, lastB
zap.Int64("newBlockNumber", newBlockNumber),
zap.String("lastBlockNumberHex", lastBlockNumberHex))
if newBlockNumber != -1 {
watcherBlock := storage.WatcherBlock{
ID: w.blockchain,
BlockNumber: newBlockNumber,
UpdatedAt: time.Now(),
if updateWatcherBlock {
if newBlockNumber != -1 {
watcherBlock := storage.WatcherBlock{
ID: w.blockchain,
BlockNumber: newBlockNumber,
UpdatedAt: time.Now(),
}
w.repository.UpdateWatcherBlock(ctx, watcherBlock)
}
w.repository.UpdateWatcherBlock(ctx, watcherBlock)
}
pageToken = r.Result.NextPageToken

View File

@ -1,12 +1,12 @@
package watcher
func getTotalBlocks(lastBlock, currentBlock, maxBlocks uint64) uint64 {
return (lastBlock-currentBlock)/maxBlocks + 1
func getTotalBlocks(lastBlock, currentBlock, pageSize uint64) uint64 {
return (lastBlock-currentBlock)/pageSize + 1
}
func getPage(currentBlock, index, maxBlocks, lastBlock uint64) (uint64, uint64) {
fromBlock := currentBlock + index*maxBlocks
toBlock := fromBlock + maxBlocks - 1
func getPage(currentBlock, index, pageSize, lastBlock uint64) (uint64, uint64) {
fromBlock := currentBlock + index*pageSize
toBlock := fromBlock + pageSize - 1
if toBlock > lastBlock {
toBlock = lastBlock
}

View File

@ -146,7 +146,7 @@ func (w *SolanaWatcher) Start(ctx context.Context) error {
toBlock = lastBlock
}
w.logger.Info("processing blocks", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
w.processBlock(ctx, fromBlock, toBlock)
w.processBlock(ctx, fromBlock, toBlock, true)
w.logger.Info("blocks processed", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
}
// process all the blocks between current and last block.
@ -169,7 +169,17 @@ func (w *SolanaWatcher) Close() {
w.wg.Wait()
}
func (w *SolanaWatcher) processBlock(ctx context.Context, fromBlock uint64, toBlock uint64) {
func (w *SolanaWatcher) Backfill(ctx context.Context, fromBlock uint64, toBlock uint64, pageSize uint64, persistBlock bool) {
totalBlocks := getTotalBlocks(fromBlock, toBlock, pageSize)
for i := uint64(0); i < totalBlocks; i++ {
fromBlock, toBlock := getPage(fromBlock, i, pageSize, toBlock)
w.logger.Info("processing blocks", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
w.processBlock(ctx, fromBlock, toBlock, persistBlock)
w.logger.Info("blocks processed", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
}
}
func (w *SolanaWatcher) processBlock(ctx context.Context, fromBlock uint64, toBlock uint64, updateWatcherBlock bool) {
for block := fromBlock; block <= toBlock; block++ {
logger := w.logger.With(zap.Uint64("block", block))
@ -192,13 +202,17 @@ func (w *SolanaWatcher) processBlock(ctx context.Context, fromBlock uint64, toBl
for txNum, txRpc := range result.Transactions {
w.processTransaction(ctx, &txRpc, block, txNum, result.BlockTime)
}
// update the last block number processed in the database.
watcherBlock := storage.WatcherBlock{
ID: w.blockchain,
BlockNumber: int64(block),
UpdatedAt: time.Now(),
if updateWatcherBlock {
watcherBlock := storage.WatcherBlock{
ID: w.blockchain,
BlockNumber: int64(block),
UpdatedAt: time.Now(),
}
return w.repository.UpdateWatcherBlock(ctx, watcherBlock)
}
return w.repository.UpdateWatcherBlock(ctx, watcherBlock)
return nil
},
retry.Attempts(maxRetries),
retry.Delay(retryDelay),

View File

@ -120,6 +120,27 @@ func (w *TerraWatcher) Start(ctx context.Context) error {
}
}
func (w *TerraWatcher) Backfill(ctx context.Context, fromBlock uint64, toBlock uint64, pageSize uint64, persistBlock bool) {
totalBlocks := getTotalBlocks(fromBlock, toBlock, pageSize)
for i := uint64(0); i < totalBlocks; i++ {
fromBlock, toBlock := getPage(fromBlock, i, pageSize, toBlock)
w.logger.Info("processing blocks", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
for block := fromBlock; block <= toBlock; block++ {
w.processBlock(ctx, int64(block))
if persistBlock {
// update block watcher
watcherBlock := storage.WatcherBlock{
ID: w.blockchain,
BlockNumber: int64(block),
UpdatedAt: time.Now(),
}
w.repository.UpdateWatcherBlock(ctx, watcherBlock)
}
}
w.logger.Info("blocks processed", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
}
}
func (w *TerraWatcher) processBlock(ctx context.Context, block int64) {
var offset *int
hasPage := true

View File

@ -0,0 +1,51 @@
---
apiVersion: batch/v1
kind: Job
metadata:
name: {{ .NAME }}-solana-backfiller
namespace: {{ .NAMESPACE }}
spec:
template:
metadata:
labels:
app: {{ .NAME }}-solana-backfiller
spec:
restartPolicy: Never
terminationGracePeriodSeconds: 40
containers:
- name: {{ .NAME }}-solana-backfiller
image: {{ .IMAGE_NAME }}
imagePullPolicy: Always
env:
- name: MONGODB_URI
valueFrom:
secretKeyRef:
name: mongodb
key: mongo-uri
- name: MONGODB_DATABASE
valueFrom:
configMapKeyRef:
name: config
key: mongo-database
- name: SOLANA_URL
valueFrom:
secretKeyRef:
name: blockchain
key: solana-url
command: ["/contract-watcher"]
args:
- backfiller
- --mongo-uri
- "$(MONGODB_URI)"
- --mongo-database
- "$(MONGODB_DATABASE)"
- --network
- "{{ .P2P_NETWORK }}"
- --chain-name
- "solana"
- --chain-url
- "$(SOLANA_URL)"
- --from
- "183675278"
- --to
- "196868281"