remove aptos from contract-watcher (#1177)

This commit is contained in:
walker-16 2024-03-11 14:21:15 -03:00 committed by GitHub
parent a93dd4a036
commit 2d61225ac2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 0 additions and 466 deletions

View File

@ -6,7 +6,6 @@ import (
solana_go "github.com/gagliardetto/solana-go"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/config"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/ankr"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/aptos"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/evm"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/solana"
@ -46,18 +45,6 @@ func CreateTerraWatcher(rateLimit int, chainURL string, wb config.WatcherBlockch
return watcher.NewTerraWatcher(terraClient, params, repo, metrics, logger)
}
func CreateAptosWatcher(rateLimit int, chainURL string, wb config.WatcherBlockchain, logger *zap.Logger, repo *storage.Repository, metrics metrics.Metrics) watcher.ContractWatcher {
aptosLimiter := ratelimit.New(rateLimit, ratelimit.Per(time.Second))
aptosClient := aptos.NewAptosSDK(chainURL, aptosLimiter, metrics)
params := watcher.AptosParams{
Blockchain: wb.Name,
ContractAddress: wb.Address,
SizeBlocks: wb.SizeBlocks,
WaitSeconds: wb.WaitSeconds,
InitialBlock: wb.InitialBlock}
return watcher.NewAptosWatcher(aptosClient, params, repo, metrics, logger)
}
func CreateEvmWatcher(
rateLimit int,
chainURL string,

View File

@ -81,8 +81,6 @@ func newWatcherForMainnet(cfg *config.BackfillerConfiguration, repo *storage.Rep
watcher = builder.CreateAnkrEvmWatcher(cfg.RateLimitPerSecond, cfg.ChainUrl, config.AVALANCHE_MAINNET, repo, metrics, logger)
case config.TERRA_MAINNET.ChainID.String():
watcher = builder.CreateTerraWatcher(cfg.RateLimitPerSecond, cfg.ChainUrl, config.TERRA_MAINNET, logger, repo, metrics)
case config.APTOS_MAINNET.ChainID.String():
watcher = builder.CreateAptosWatcher(cfg.RateLimitPerSecond, cfg.ChainUrl, config.APTOS_MAINNET, logger, repo, metrics)
case config.OASIS_MAINNET.ChainID.String():
watcher = builder.CreateEvmWatcher(cfg.RateLimitPerSecond, cfg.ChainUrl, config.OASIS_MAINNET, logger, repo, metrics)
case config.MOONBEAM_MAINNET.ChainID.String():
@ -114,8 +112,6 @@ func newWatcherForTestnet(cfg *config.BackfillerConfiguration, repo *storage.Rep
watcher = builder.CreateAnkrEvmWatcher(cfg.RateLimitPerSecond, cfg.ChainUrl, config.FANTOM_TESTNET, repo, metrics, logger)
case config.AVALANCHE_TESTNET.ChainID.String():
watcher = builder.CreateAnkrEvmWatcher(cfg.RateLimitPerSecond, cfg.ChainUrl, config.AVALANCHE_TESTNET, repo, metrics, logger)
case config.APTOS_TESTNET.ChainID.String():
watcher = builder.CreateAptosWatcher(cfg.RateLimitPerSecond, cfg.ChainUrl, config.APTOS_TESTNET, logger, repo, metrics)
case config.OASIS_TESTNET.ChainID.String():
watcher = builder.CreateEvmWatcher(cfg.RateLimitPerSecond, cfg.ChainUrl, config.OASIS_TESTNET, logger, repo, metrics)
case config.MOONBEAM_TESTNET.ChainID.String():

View File

@ -42,7 +42,6 @@ func handleExit() {
type watchersConfig struct {
ankr []config.WatcherBlockchainAddresses
aptos *config.WatcherBlockchain
arbitrum *config.WatcherBlockchainAddresses
avalanche *config.WatcherBlockchainAddresses
base *config.WatcherBlockchainAddresses
@ -60,7 +59,6 @@ type watchersConfig struct {
type rateLimitConfig struct {
ankr int
aptos int
arbitrum int
avalanche int
base int
@ -207,12 +205,6 @@ func newWatchers(config *config.ServiceConfiguration, testnetConfig *config.Test
result = append(result, terraWatcher)
}
// add aptos watcher
if watchers.aptos != nil {
aptosWatcher := builder.CreateAptosWatcher(watchers.rateLimit.aptos, config.AptosUrl, *watchers.aptos, logger, repo, metrics)
result = append(result, aptosWatcher)
}
// add oasis watcher
if watchers.oasis != nil {
oasisWatcher := builder.CreateEvmWatcher(watchers.rateLimit.oasis, config.OasisUrl, *watchers.oasis, logger, repo, metrics)
@ -270,7 +262,6 @@ func newWatchersForMainnet(cfg *config.ServiceConfiguration) *watchersConfig {
config.BSC_MAINNET,
config.FANTOM_MAINNET,
},
aptos: &config.APTOS_MAINNET,
arbitrum: &config.ARBITRUM_MAINNET,
avalanche: &config.AVALANCHE_MAINNET,
base: &config.BASE_MAINNET,
@ -285,7 +276,6 @@ func newWatchersForMainnet(cfg *config.ServiceConfiguration) *watchersConfig {
rateLimit: rateLimitConfig{
ankr: cfg.AnkrRequestsPerSecond,
avalanche: cfg.AvalancheRequestsPerSecond,
aptos: cfg.AptosRequestsPerSecond,
arbitrum: cfg.ArbitrumRequestsPerSecond,
base: cfg.BaseRequestsPerSecond,
celo: cfg.CeloRequestsPerSecond,
@ -305,7 +295,6 @@ func newWatchersForTestnet(cfg *config.ServiceConfiguration, testnetCfg *config.
config.BSC_TESTNET,
config.FANTOM_TESTNET,
},
aptos: &config.APTOS_TESTNET,
arbitrum: &config.ARBITRUM_TESTNET,
avalanche: &config.AVALANCHE_TESTNET,
celo: &config.CELO_TESTNET,
@ -320,7 +309,6 @@ func newWatchersForTestnet(cfg *config.ServiceConfiguration, testnetCfg *config.
rateLimit: rateLimitConfig{
ankr: cfg.AnkrRequestsPerSecond,
avalanche: cfg.AvalancheRequestsPerSecond,
aptos: cfg.AptosRequestsPerSecond,
arbitrum: cfg.ArbitrumRequestsPerSecond,
base: cfg.BaseRequestsPerSecond,
baseSepolia: testnetCfg.BaseSepoliaRequestsPerMinute,

View File

@ -14,8 +14,6 @@ type ServiceConfiguration struct {
AnkrUrl string `env:"ANKR_URL,required"`
AnkrRequestsPerSecond int `env:"ANKR_REQUESTS_PER_SECOND,required"`
AptosUrl string `env:"APTOS_URL,required"`
AptosRequestsPerSecond int `env:"APTOS_REQUESTS_PER_SECOND,required"`
ArbitrumUrl string `env:"ARBITRUM_URL,required"`
ArbitrumRequestsPerSecond int `env:"ARBITRUM_REQUESTS_PER_SECOND,required"`
AvalancheUrl string `env:"AVALANCHE_URL,required"`

View File

@ -191,15 +191,6 @@ var AVALANCHE_MAINNET = WatcherBlockchainAddresses{
},
}
var APTOS_MAINNET = WatcherBlockchain{
ChainID: vaa.ChainIDAptos,
Name: "aptos",
Address: "0x576410486a2da45eee6c949c995670112ddf2fbeedab20350d506328eefc9d4f",
SizeBlocks: 50,
WaitSeconds: 10,
InitialBlock: 1094430,
}
var OASIS_MAINNET = WatcherBlockchainAddresses{
ChainID: vaa.ChainIDOasis,
Name: "oasis",

View File

@ -210,15 +210,6 @@ var AVALANCHE_TESTNET = WatcherBlockchainAddresses{
},
}
var APTOS_TESTNET = WatcherBlockchain{
ChainID: vaa.ChainIDAptos,
Name: "aptos",
Address: "0x576410486a2da45eee6c949c995670112ddf2fbeedab20350d506328eefc9d4f",
SizeBlocks: 50,
WaitSeconds: 10,
InitialBlock: 21522262,
}
var OASIS_TESTNET = WatcherBlockchainAddresses{
ChainID: vaa.ChainIDOasis,
Name: "oasis",

View File

@ -1,151 +0,0 @@
package aptos
import (
"context"
"fmt"
"net/http"
"strconv"
"time"
"github.com/go-resty/resty/v2"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/metrics"
"go.uber.org/ratelimit"
)
var ErrTooManyRequests = fmt.Errorf("too many requests")
const clientName = "aptos"
// AptosSDK is a client for the Aptos API.
type AptosSDK struct {
client *resty.Client
rl ratelimit.Limiter
metrics metrics.Metrics
}
type GetLatestBlock struct {
BlockHeight string `json:"block_height"`
}
type Payload struct {
Function string `json:"function"`
TypeArguments []string `json:"type_arguments"`
Arguments []any `json:"arguments"`
Type string `json:"type"`
}
type Transaction struct {
Version string `json:"version"`
Hash string `json:"hash"`
Payload Payload `json:"payload,omitempty"`
}
type GetBlockResult struct {
BlockHeight string `json:"block_height"`
BlockHash string `json:"block_hash"`
BlockTimestamp string `json:"block_timestamp"`
Transactions []Transaction `json:"transactions"`
}
type GetTransactionResult struct {
Version string `json:"version"`
Hash string `json:"hash"`
StateChangeHash string `json:"state_change_hash"`
EventRootHash string `json:"event_root_hash"`
StateCheckpointHash any `json:"state_checkpoint_hash"`
GasUsed string `json:"gas_used"`
Success bool `json:"success"`
VMStatus string `json:"vm_status"`
}
func (r *GetBlockResult) GetBlockTime() (*time.Time, error) {
t, err := strconv.ParseUint(r.BlockTimestamp, 10, 64)
if err != nil {
return nil, err
}
tm := time.UnixMicro(int64(t))
return &tm, nil
}
// NewAptosSDK creates a new AptosSDK.
func NewAptosSDK(url string, rl ratelimit.Limiter, metrics metrics.Metrics) *AptosSDK {
return &AptosSDK{
rl: rl,
client: resty.New().SetBaseURL(url),
metrics: metrics,
}
}
func (s *AptosSDK) GetLatestBlock(ctx context.Context) (uint64, error) {
s.rl.Take()
resp, err := s.client.R().
SetContext(ctx).
SetResult(&GetLatestBlock{}).
Get("v1")
if err != nil {
return 0, err
}
s.metrics.IncRpcRequest(clientName, "get-latest-block", resp.StatusCode())
if resp.IsError() {
return 0, fmt.Errorf("status code: %s. %s", resp.Status(), string(resp.Body()))
}
result := resp.Result().(*GetLatestBlock)
if result == nil {
return 0, fmt.Errorf("empty response")
}
if result.BlockHeight == "" {
return 0, fmt.Errorf("empty block height")
}
return strconv.ParseUint(result.BlockHeight, 10, 64)
}
func (s *AptosSDK) GetBlock(ctx context.Context, block uint64) (*GetBlockResult, error) {
s.rl.Take()
resp, err := s.client.R().
SetContext(ctx).
SetResult(&GetBlockResult{}).
SetQueryParam("with_transactions", "true").
Get(fmt.Sprintf("v1/blocks/by_height/%d", block))
if err != nil {
return nil, err
}
s.metrics.IncRpcRequest(clientName, "get-block", resp.StatusCode())
if resp.IsError() {
if resp.StatusCode() == http.StatusTooManyRequests {
return nil, ErrTooManyRequests
}
return nil, fmt.Errorf("status code: %s. %s", resp.Status(), string(resp.Body()))
}
return resp.Result().(*GetBlockResult), nil
}
func (s *AptosSDK) GetTransaction(ctx context.Context, version string) (*GetTransactionResult, error) {
s.rl.Take()
resp, err := s.client.R().
SetContext(ctx).
SetResult(&GetTransactionResult{}).
SetQueryParam("with_transactions", "true").
Get(fmt.Sprintf("v1/transactions/by_version/%s", version))
if err != nil {
return nil, err
}
s.metrics.IncRpcRequest(clientName, "get-transaction", resp.StatusCode())
if resp.IsError() {
if resp.StatusCode() == http.StatusTooManyRequests {
return nil, ErrTooManyRequests
}
return nil, fmt.Errorf("status code: %s. %s", resp.Status(), string(resp.Body()))
}
return resp.Result().(*GetTransactionResult), nil
}

View File

@ -1,266 +0,0 @@
package watcher
import (
"context"
"encoding/hex"
"fmt"
"strconv"
"strings"
"sync"
"time"
"github.com/avast/retry-go"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/aptos"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/storage"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
const CompleteTransferMethod = "complete_transfer::submit_vaa_and_register_entry"
const aptosMaxRetries = 10
const aptosRetryDelay = 5 * time.Second
type AptosParams struct {
Blockchain string
ContractAddress string
SizeBlocks uint8
WaitSeconds uint16
InitialBlock int64
}
type AptosWatcher struct {
client *aptos.AptosSDK
chainID vaa.ChainID
blockchain string
contractAddress string
sizeBlocks uint8
waitSeconds uint16
initialBlock int64
repository *storage.Repository
logger *zap.Logger
close chan bool
wg sync.WaitGroup
metrics metrics.Metrics
}
func NewAptosWatcher(client *aptos.AptosSDK, params AptosParams, repo *storage.Repository, metrics metrics.Metrics, logger *zap.Logger) *AptosWatcher {
chainID := vaa.ChainIDAptos
return &AptosWatcher{
client: client,
chainID: chainID,
blockchain: params.Blockchain,
contractAddress: params.ContractAddress,
sizeBlocks: params.SizeBlocks,
waitSeconds: params.WaitSeconds,
initialBlock: params.InitialBlock,
repository: repo,
metrics: metrics,
logger: logger.With(zap.String("blockchain", params.Blockchain), zap.Uint16("chainId", uint16(chainID))),
}
}
func (w *AptosWatcher) GetBlockchain() string {
return w.blockchain
}
func (w *AptosWatcher) Start(ctx context.Context) error {
// get the current block for the chain.
cBlock, err := w.repository.GetCurrentBlock(ctx, w.blockchain, w.initialBlock)
if err != nil {
w.logger.Error("cannot get current block", zap.Error(err))
return err
}
currentBlock := uint64(cBlock)
w.wg.Add(1)
for {
select {
case <-ctx.Done():
w.logger.Info("clossing watcher by context")
w.wg.Done()
return nil
case <-w.close:
w.logger.Info("clossing watcher")
w.wg.Done()
return nil
default:
// get the latest block for the chain.
lastBlock, err := w.client.GetLatestBlock(ctx)
if err != nil {
w.logger.Error("cannot get latest block", zap.Error(err))
}
maxBlocks := uint64(w.sizeBlocks)
w.logger.Debug("current block", zap.Uint64("current", currentBlock), zap.Uint64("last", lastBlock))
if currentBlock < lastBlock {
w.metrics.SetLastBlock(w.chainID, lastBlock)
totalBlocks := (lastBlock-currentBlock)/maxBlocks + 1
for i := 0; i < int(totalBlocks); i++ {
fromBlock := currentBlock + uint64(i)*maxBlocks
toBlock := fromBlock + maxBlocks - 1
if toBlock > lastBlock {
toBlock = lastBlock
}
w.logger.Debug("processing blocks", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
w.processBlock(ctx, fromBlock, toBlock, true)
w.logger.Debug("blocks processed", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
}
// process all the blocks between current and last block.
} else {
w.logger.Debug("waiting for new blocks")
select {
case <-ctx.Done():
w.wg.Done()
return nil
case <-time.After(time.Duration(w.waitSeconds) * time.Second):
}
}
if lastBlock > currentBlock {
currentBlock = lastBlock
}
}
}
}
func (w *AptosWatcher) Close() {
close(w.close)
w.wg.Wait()
}
func (w *AptosWatcher) Backfill(ctx context.Context, fromBlock uint64, toBlock uint64, pageSize uint64, persistBlock bool) {
totalBlocks := getTotalBlocks(toBlock, fromBlock, pageSize)
for i := uint64(0); i < totalBlocks; i++ {
fromBlock, toBlock := getPage(fromBlock, i, pageSize, toBlock)
w.logger.Info("processing blocks", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
w.processBlock(ctx, fromBlock, toBlock, persistBlock)
w.logger.Info("blocks processed", zap.Uint64("from", fromBlock), zap.Uint64("to", toBlock))
}
}
func (w *AptosWatcher) processBlock(ctx context.Context, fromBlock uint64, toBlock uint64, updateWatcherBlock bool) {
for block := fromBlock; block <= toBlock; block++ {
w.logger.Debug("processing block", zap.Uint64("block", block))
retry.Do(
func() error {
// get the transactions for the block.
result, err := w.client.GetBlock(ctx, block)
if err != nil {
w.logger.Error("cannot get block", zap.Uint64("block", block), zap.Error(err))
if err == aptos.ErrTooManyRequests {
return err
}
return nil
}
blockTime, err := result.GetBlockTime()
if err != nil {
w.logger.Warn("cannot get block time", zap.Uint64("block", block), zap.Error(err))
}
for _, tx := range result.Transactions {
w.processTransaction(ctx, tx, block, blockTime)
}
if updateWatcherBlock {
// update the last block number processed in the database.
watcherBlock := storage.WatcherBlock{
ID: w.blockchain,
BlockNumber: int64(block),
UpdatedAt: time.Now(),
}
return w.repository.UpdateWatcherBlock(ctx, w.chainID, watcherBlock)
}
return nil
},
retry.Attempts(aptosMaxRetries),
retry.Delay(aptosRetryDelay),
)
}
}
func (w *AptosWatcher) processTransaction(ctx context.Context, tx aptos.Transaction, block uint64, blockTime *time.Time) {
found, method := w.isTokenBridgeFunction(tx.Payload.Function)
if !found {
return
}
log := w.logger.With(
zap.String("txHash", tx.Hash),
zap.String("txVersion", tx.Version),
zap.String("function", tx.Payload.Function),
zap.Uint64("block", block))
if method != CompleteTransferMethod {
log.Warn("unkown method", zap.String("method", method))
return
}
log.Debug("found Wormhole transaction")
if len(tx.Payload.Arguments) != 1 {
log.Error("invalid number of arguments",
zap.Int("arguments", len(tx.Payload.Arguments)))
return
}
switch tx.Payload.Arguments[0].(type) {
case string:
default:
log.Error("invalid type of argument")
return
}
vaaArg := tx.Payload.Arguments[0].(string)
data, err := hex.DecodeString(strings.TrimPrefix(vaaArg, "0x"))
if err != nil {
log.Error("invalid vaa argument",
zap.String("argument", vaaArg),
zap.Error(err))
return
}
result, err := vaa.Unmarshal(data)
if err != nil {
log.Error("invalid vaa",
zap.Error(err))
return
}
txResult, err := w.client.GetTransaction(ctx, tx.Version)
if err != nil {
log.Error("get transaction error",
zap.String("version", tx.Version),
zap.Error(err))
return
}
status := domain.DstTxStatusFailedToProcess
if txResult.Success {
status = domain.DstTxStatusConfirmed
}
updatedAt := time.Now()
globalTx := storage.TransactionUpdate{
ID: result.MessageID(),
Destination: storage.DestinationTx{
ChainID: w.chainID,
Status: status,
Method: method,
TxHash: tx.Hash,
BlockNumber: strconv.FormatUint(block, 10),
Timestamp: blockTime,
UpdatedAt: &updatedAt,
},
}
// update global transaction and check if it should be updated.
updateGlobalTransaction(ctx, w.chainID, globalTx, w.repository, log)
}
func (w *AptosWatcher) isTokenBridgeFunction(fn string) (bool, string) {
prefixFunction := fmt.Sprintf("%s::", w.contractAddress)
if !strings.HasPrefix(fn, prefixFunction) {
return false, ""
}
return true, strings.TrimPrefix(fn, prefixFunction)
}