diff --git a/contract-watcher/cmd/main.go b/contract-watcher/cmd/main.go index 26c3ef83..7ffef5c2 100644 --- a/contract-watcher/cmd/main.go +++ b/contract-watcher/cmd/main.go @@ -17,6 +17,7 @@ import ( "github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/ankr" "github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/db" "github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/solana" + "github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/terra" "github.com/wormhole-foundation/wormhole-explorer/contract-watcher/processor" "github.com/wormhole-foundation/wormhole-explorer/contract-watcher/storage" "github.com/wormhole-foundation/wormhole-explorer/contract-watcher/watcher" @@ -122,12 +123,14 @@ type watcherBlockchain struct { type watchersConfig struct { evms []watcherBlockchain solana *watcherBlockchain + terra *watcherBlockchain rateLimit rateLimitConfig } type rateLimitConfig struct { evm int solana int + terra int } func newWatchers(config *config.Configuration, repo *storage.Repository, logger *zap.Logger) []watcher.ContractWatcher { @@ -140,6 +143,8 @@ func newWatchers(config *config.Configuration, repo *storage.Repository, logger default: watchers = &watchersConfig{} } + + // add evm watchers result := make([]watcher.ContractWatcher, 0) // add evm watchers @@ -163,6 +168,16 @@ func newWatchers(config *config.Configuration, repo *storage.Repository, logger SizeBlocks: watchers.solana.sizeBlocks, WaitSeconds: watchers.solana.waitSeconds, InitialBlock: watchers.solana.initialBlock} result = append(result, watcher.NewSolanaWatcher(solanaClient, repo, params, logger)) } + + // add terra watcher + if watchers.terra != nil { + terraLimiter := ratelimit.New(watchers.rateLimit.terra, ratelimit.Per(time.Second)) + terraClient := terra.NewTerraSDK(config.TerraUrl, terraLimiter) + params := watcher.TerraParams{ChainID: watchers.terra.chainID, Blockchain: watchers.terra.name, + ContractAddress: watchers.terra.address, WaitSeconds: watchers.terra.waitSeconds, InitialBlock: watchers.terra.initialBlock} + result = append(result, watcher.NewTerraWatcher(terraClient, params, repo, logger)) + } + return result } @@ -175,9 +190,11 @@ func newEVMWatchersForMainnet() *watchersConfig { {vaa.ChainIDFantom, "fantom", "0x7C9Fc5741288cDFdD83CeB07f3ea7e22618D79D2", 100, 10, 57525624}, }, solana: &watcherBlockchain{vaa.ChainIDSolana, "solana", "wormDTUJ6AWPNvk59vGQbDvGJmqbDTdgWgAqcLBCgUb", 100, 10, 183675278}, + terra: &watcherBlockchain{vaa.ChainIDTerra, "terra", "terra10nmmwe8r3g99a9newtqa7a75xfgs2e8z87r2sf", 0, 10, 12005338}, rateLimit: rateLimitConfig{ evm: 1000, solana: 3, + terra: 10, }, } } @@ -194,6 +211,7 @@ func newEVMWatchersForTestnet() *watchersConfig { rateLimit: rateLimitConfig{ evm: 10, solana: 2, + terra: 5, }, } } diff --git a/contract-watcher/config/config.go b/contract-watcher/config/config.go index b8562533..3d101097 100644 --- a/contract-watcher/config/config.go +++ b/contract-watcher/config/config.go @@ -16,6 +16,7 @@ type Configuration struct { MongoDatabase string `env:"MONGODB_DATABASE,required"` AnkrUrl string `env:"ANKR_URL,required"` SolanaUrl string `env:"SOLANA_URL,required"` + TerraUrl string `env:"TERRA_URL,required"` PprofEnabled bool `env:"PPROF_ENABLED,default=false"` P2pNetwork string `env:"P2P_NETWORK,required"` } diff --git a/contract-watcher/internal/ankr/client.go b/contract-watcher/internal/ankr/client.go index a2138fae..e0d4228a 100644 --- a/contract-watcher/internal/ankr/client.go +++ b/contract-watcher/internal/ankr/client.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "encoding/json" - "fmt" "io/ioutil" "math/rand" "net/http" @@ -33,9 +32,7 @@ func (s AnkrSDK) GetTransactionsByAddress(ctx context.Context, request Transacti } req, err := http.NewRequest("POST", s.url, bytes.NewReader(payload)) - if err != nil { - fmt.Println(err) return nil, err } req.Header.Add("Content-Type", "application/json") diff --git a/contract-watcher/internal/terra/client.go b/contract-watcher/internal/terra/client.go new file mode 100644 index 00000000..fc6491a7 --- /dev/null +++ b/contract-watcher/internal/terra/client.go @@ -0,0 +1,180 @@ +package terra + +import ( + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "strconv" + "time" + + "go.uber.org/ratelimit" +) + +// TerraSDK is a client for the Terra blockchain. +type TerraSDK struct { + url string + client *http.Client + rl ratelimit.Limiter +} + +// TerraTrx is a transaction on the Terra blockchain. +type TerraTrx struct { +} + +// NewTerraSDK creates a new TerraSDK. +func NewTerraSDK(url string, rl ratelimit.Limiter) *TerraSDK { + return &TerraSDK{ + url: url, + rl: rl, + client: &http.Client{}, + } +} + +type LastBlockResponse struct { + Block struct { + Header struct { + Height string `json:"height"` + } `json:"header"` + } `json:"block"` +} + +// GetLastBlock returns the last block height. +func (t *TerraSDK) GetLastBlock(ctx context.Context) (int64, error) { + lastBlockURL := fmt.Sprintf("%s/cosmos/base/tendermint/v1beta1/blocks/latest", t.url) + req, err := http.NewRequest(http.MethodGet, lastBlockURL, nil) + if err != nil { + return 0, err + } + req.Header.Add("Content-Type", "application/json") + + t.rl.Take() + + res, err := t.client.Do(req) + if err != nil { + return 0, err + } + defer res.Body.Close() + + body, err := ioutil.ReadAll(res.Body) + if err != nil { + return 0, err + } + + var response LastBlockResponse + err = json.Unmarshal(body, &response) + if err != nil { + return 0, err + } + + lastBlockHeight, err := strconv.ParseInt(response.Block.Header.Height, 10, 64) + if err != nil { + return 0, err + } + + return lastBlockHeight, nil +} + +type TxByBlockResponse struct { + Limit int `json:"limit"` + NextOffset *int `json:"next"` + Txs []Tx `json:"txs"` +} + +type Tx struct { + ID int `json:"id"` + Tx struct { + Type string `json:"type"` + Value struct { + Fee struct { + Gas string `json:"gas"` + Amount []struct { + Denom string `json:"denom"` + Amount string `json:"amount"` + } `json:"amount"` + } `json:"fee"` + Msg []struct { + Type string `json:"type"` + Value struct { + Coins []any `json:"coins"` + Sender string `json:"sender"` + Contract string `json:"contract"` + ExecuteMsg struct { + SubmitVaa struct { + Data []byte `json:"data"` + } `json:"submit_vaa"` + } `json:"execute_msg"` + } `json:"value"` + } `json:"msg"` + Memo string `json:"memo"` + Signatures []struct { + PubKey struct { + Type string `json:"type"` + Value string `json:"value"` + } `json:"pub_key"` + Signature string `json:"signature"` + } `json:"signatures"` + TimeoutHeight string `json:"timeout_height"` + } `json:"value"` + } `json:"tx"` + Logs []struct { + Log struct { + Tax string `json:"tax"` + } `json:"log"` + Events []struct { + Type string `json:"type"` + Attributes []struct { + Key string `json:"key"` + Value string `json:"value"` + } `json:"attributes"` + } `json:"events"` + MsgIndex int `json:"msg_index"` + } `json:"logs"` + Code int `json:"code"` + Height string `json:"height"` + Txhash string `json:"txhash"` + RawLog string `json:"raw_log"` + GasUsed string `json:"gas_used"` + Timestamp time.Time `json:"timestamp"` + GasWanted string `json:"gas_wanted"` +} + +// GetTransactionsByBlockHeight returns the transactions for a given block height. +func (t *TerraSDK) GetTransactionsByBlockHeight(ctx context.Context, height int64, offset *int) (*TxByBlockResponse, error) { + transactionsByBlockURL := fmt.Sprintf("%s/v1/txs", t.url) + req, err := http.NewRequest(http.MethodGet, transactionsByBlockURL, nil) + if err != nil { + return nil, err + } + values := req.URL.Query() + values.Add("block", strconv.FormatInt(height, 10)) + values.Add("limit", "100") + if offset != nil { + values.Add("offset", strconv.Itoa(*offset)) + } + req.URL.RawQuery = values.Encode() + + req.Header.Add("Content-Type", "application/json") + + t.rl.Take() + + res, err := t.client.Do(req) + if err != nil { + return nil, err + } + defer res.Body.Close() + + body, err := ioutil.ReadAll(res.Body) + if err != nil { + return nil, err + } + + var response TxByBlockResponse + err = json.Unmarshal(body, &response) + if err != nil { + return nil, err + } + + return &response, nil +} diff --git a/contract-watcher/watcher/evm_watcher.go b/contract-watcher/watcher/evm_watcher.go index e219ee1f..fc963520 100644 --- a/contract-watcher/watcher/evm_watcher.go +++ b/contract-watcher/watcher/evm_watcher.go @@ -278,7 +278,6 @@ func (w *EVMWatcher) getMethodByInput(input string) string { return MethodUpdateWrapped default: return MethodUnkown - } } diff --git a/contract-watcher/watcher/terra_watcher.go b/contract-watcher/watcher/terra_watcher.go new file mode 100644 index 00000000..c499cdde --- /dev/null +++ b/contract-watcher/watcher/terra_watcher.go @@ -0,0 +1,253 @@ +package watcher + +import ( + "context" + "errors" + "net/http" + "strconv" + "sync" + "time" + + "github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/terra" + "github.com/wormhole-foundation/wormhole-explorer/contract-watcher/storage" + "github.com/wormhole-foundation/wormhole/sdk/vaa" + "go.uber.org/zap" +) + +// Terra action methods. +const ( + MethodDepositTokens = "deposit_tokens" + MethodWithdrawTokens = "withdraw_tokens" + MethodRegisterAsset = "register_asset" + MethodContractUpgrade = "contract_upgrade" + MethodCompleteWrapped = "complete_transfer_wrapped" + MethodCompleteNative = "complete_transfer_native" + MethodCompleteTerra = "complete_transfer_terra_native" + MethodReplyHandler = "reply_handler" +) + +// Terrawatcher is a watcher for the terra chain. +type TerraWatcher struct { + terraSDK *terra.TerraSDK + chainID vaa.ChainID + blockchain string + contractAddress string + waitSeconds uint16 + initialBlock int64 + client *http.Client + repository *storage.Repository + logger *zap.Logger + close chan bool + wg sync.WaitGroup +} + +// TerraParams are the params for the terra watcher. +type TerraParams struct { + ChainID vaa.ChainID + Blockchain string + ContractAddress string + WaitSeconds uint16 + InitialBlock int64 +} + +// NewTerraWatcher creates a new terra watcher. +func NewTerraWatcher(terraSDK *terra.TerraSDK, params TerraParams, repository *storage.Repository, logger *zap.Logger) *TerraWatcher { + return &TerraWatcher{ + terraSDK: terraSDK, + chainID: params.ChainID, + blockchain: params.Blockchain, + contractAddress: params.ContractAddress, + waitSeconds: params.WaitSeconds, + initialBlock: params.InitialBlock, + client: &http.Client{}, + repository: repository, + logger: logger.With(zap.String("blockchain", params.Blockchain), zap.Uint16("chainId", uint16(params.ChainID))), + } +} + +// Start starts the terra watcher. +func (w *TerraWatcher) Start(ctx context.Context) error { + // get the current block for the chain. + currentBlock, err := w.repository.GetCurrentBlock(ctx, w.blockchain, w.initialBlock) + if err != nil { + w.logger.Error("cannot get current block", zap.Error(err)) + return err + } + + w.wg.Add(1) + for { + select { + case <-ctx.Done(): + w.logger.Info("clossing terra watcher by context") + w.wg.Done() + return nil + case <-w.close: + w.logger.Info("clossing terra watcher") + w.wg.Done() + return nil + default: + // get the latest block for the terra chain. + lastBlock, err := w.terraSDK.GetLastBlock(ctx) + if err != nil { + w.logger.Error("cannot get terra lastblock", zap.Error(err)) + } + + // check if there are new blocks to process. + if currentBlock < lastBlock { + w.logger.Info("processing blocks", zap.Int64("from", currentBlock), zap.Int64("to", lastBlock)) + for block := currentBlock; block <= lastBlock; block++ { + w.processBlock(ctx, block) + // update block watcher + watcherBlock := storage.WatcherBlock{ + ID: w.blockchain, + BlockNumber: block, + UpdatedAt: time.Now(), + } + w.repository.UpdateWatcherBlock(ctx, watcherBlock) + } + } else { + w.logger.Info("waiting for new terra blocks") + select { + case <-ctx.Done(): + w.wg.Done() + return nil + case <-time.After(time.Duration(w.waitSeconds) * time.Second): + } + } + currentBlock = lastBlock + } + } +} + +func (w *TerraWatcher) processBlock(ctx context.Context, block int64) { + var offset *int + hasPage := true + for hasPage { + // get transactions for the block. + transactions, err := w.terraSDK.GetTransactionsByBlockHeight(ctx, block, offset) + if err != nil { + w.logger.Error("cannot get transactions by address", zap.Error(err)) + time.Sleep(10 * time.Second) + continue + } + + // process all the transactions in the block + for _, tx := range transactions.Txs { + + // check transaction contract address + isTokenBridgeContract := w.checkTransactionContractAddress(tx) + if !isTokenBridgeContract { + continue + } + + // check transaction method + supportedMethod, method := w.checkTransactionMethod(tx) + if !supportedMethod { + continue + } + + // get from, to and VAA from transaction message. + from, to, vaa, err := w.getTransactionData(tx) + if err != nil { + w.logger.Error("cannot get transaction data", zap.Error(err), + zap.String("txHash", tx.Txhash), zap.Int64("block", block)) + continue + } + + if vaa == nil { + w.logger.Error("cannot get VAA from transaction", zap.Error(err), + zap.String("txHash", tx.Txhash), zap.Int64("block", block)) + } + + // create global transaction. + updatedAt := time.Now() + globalTx := storage.TransactionUpdate{ + ID: vaa.MessageID(), + Destination: storage.DestinationTx{ + ChainID: w.chainID, + Status: getStatus(tx), + Method: method, + TxHash: tx.Txhash, + From: from, + To: to, + BlockNumber: strconv.Itoa(int(block)), + Timestamp: &tx.Timestamp, + UpdatedAt: &updatedAt, + }, + } + + err = w.repository.UpsertGlobalTransaction(ctx, globalTx) + if err != nil { + w.logger.Error("cannot save globalTransaction", zap.Error(err)) + } else { + w.logger.Info("saved redeemed tx", zap.String("vaa", vaa.MessageID())) + } + } + + if transactions.NextOffset == nil { + hasPage = false + } else { + offset = transactions.NextOffset + } + } +} + +func (w *TerraWatcher) checkTransactionContractAddress(tx terra.Tx) bool { + for _, msg := range tx.Tx.Value.Msg { + if msg.Value.Contract == w.contractAddress { + return true + } + } + return false +} + +// checkTransactionMethod checks the method of the transaction. +// iterate over the logs, events and attributes to find the method. +func (w *TerraWatcher) checkTransactionMethod(tx terra.Tx) (bool, string) { + for _, log := range tx.Logs { + for _, event := range log.Events { + for _, attribute := range event.Attributes { + if attribute.Key == "action" && filterTransactionMethod(attribute.Value) { + return true, attribute.Value + } + } + } + } + return false, "" +} + +// getTransactionData +func (w *TerraWatcher) getTransactionData(tx terra.Tx) (string, string, *vaa.VAA, error) { + for _, msg := range tx.Tx.Value.Msg { + if msg.Value.Contract == w.contractAddress { + // unmarshal vaa + vaa, err := vaa.Unmarshal(msg.Value.ExecuteMsg.SubmitVaa.Data) + if err != nil { + return msg.Value.Sender, msg.Value.Contract, nil, err + } + return msg.Value.Sender, msg.Value.Contract, vaa, nil + } + } + return "", "", nil, errors.New("cannot find transaction data") +} + +func filterTransactionMethod(method string) bool { + switch method { + case MethodCompleteWrapped, MethodCompleteNative, MethodCompleteTerra: + return true + default: + return false + } +} + +func getStatus(tx terra.Tx) string { + if tx.Code == 0 { + return TxStatusConfirmed + } + return TxStatusFailedToProcess +} + +func (w *TerraWatcher) Close() { + close(w.close) + w.wg.Wait() +} diff --git a/deploy/contract-watcher/contract-watcher-service.yaml b/deploy/contract-watcher/contract-watcher-service.yaml index 5ce51fae..2eab018a 100644 --- a/deploy/contract-watcher/contract-watcher-service.yaml +++ b/deploy/contract-watcher/contract-watcher-service.yaml @@ -67,6 +67,11 @@ spec: secretKeyRef: name: blockchain key: solana-url + - name: TERRA_URL + valueFrom: + secretKeyRef: + name: blockchain + key: terra-url resources: limits: memory: {{ .RESOURCES_LIMITS_MEMORY }} diff --git a/deploy/contract-watcher/env/production.env b/deploy/contract-watcher/env/production.env index 87395db2..2cea259b 100644 --- a/deploy/contract-watcher/env/production.env +++ b/deploy/contract-watcher/env/production.env @@ -11,3 +11,5 @@ P2P_NETWORK=mainnet PPROF_ENABLED=false ANKR_URL= SOLANA_URL= +TERRA_URL= + diff --git a/deploy/contract-watcher/env/staging.env b/deploy/contract-watcher/env/staging.env index 0c0cd225..7614f3af 100644 --- a/deploy/contract-watcher/env/staging.env +++ b/deploy/contract-watcher/env/staging.env @@ -11,3 +11,4 @@ P2P_NETWORK=mainnet PPROF_ENABLED=true ANKR_URL= SOLANA_URL= +TERRA_URL= diff --git a/deploy/contract-watcher/env/test.env b/deploy/contract-watcher/env/test.env index 59d16ec0..969df610 100644 --- a/deploy/contract-watcher/env/test.env +++ b/deploy/contract-watcher/env/test.env @@ -11,3 +11,4 @@ P2P_NETWORK=testnet PPROF_ENABLED=false ANKR_URL= SOLANA_URL= +TERRA_URL= diff --git a/deploy/contract-watcher/secrets.yaml b/deploy/contract-watcher/secrets.yaml index 57f6fa4b..5c6a410b 100644 --- a/deploy/contract-watcher/secrets.yaml +++ b/deploy/contract-watcher/secrets.yaml @@ -7,4 +7,5 @@ metadata: data: ankr-url: {{ .ANKR_URL | b64enc }} solana-url: {{ .SOLANA_URL | b64enc }} + terra-url: {{ .TERRA_URL | b64enc }} type: Opaque