2020-11-16 04:28:07 -08:00
package terra
import (
"context"
2021-12-23 14:59:32 -08:00
"encoding/base64"
2020-11-16 04:28:07 -08:00
"encoding/hex"
"fmt"
2021-08-26 01:35:09 -07:00
"github.com/certusone/wormhole/node/pkg/p2p"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
2021-07-23 07:06:35 -07:00
"github.com/prometheus/client_golang/prometheus/promauto"
2020-11-16 04:28:07 -08:00
"io/ioutil"
"net/http"
2021-12-23 14:59:32 -08:00
"strconv"
2020-11-16 04:28:07 -08:00
"time"
2021-02-04 10:48:54 -08:00
"github.com/prometheus/client_golang/prometheus"
2020-11-16 04:28:07 -08:00
eth_common "github.com/ethereum/go-ethereum/common"
2021-08-26 01:35:09 -07:00
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/readiness"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/certusone/wormhole/node/pkg/vaa"
2020-11-16 04:28:07 -08:00
"github.com/gorilla/websocket"
"github.com/tidwall/gjson"
"go.uber.org/zap"
)
type (
2021-08-30 07:19:00 -07:00
// Watcher is responsible for looking over Terra blockchain and reporting new transactions to the contract
Watcher struct {
urlWS string
urlLCD string
contract string
2020-11-16 04:28:07 -08:00
2021-06-29 04:55:44 -07:00
msgChan chan * common . MessagePublication
setChan chan * common . GuardianSet
2022-02-09 09:54:33 -08:00
// Incoming re-observation requests from the network. Pre-filtered to only
// include requests for our chainID.
obsvReqC chan * gossipv1 . ObservationRequest
2020-11-16 04:28:07 -08:00
}
)
2021-02-04 05:20:49 -08:00
var (
2021-07-23 07:06:35 -07:00
terraConnectionErrors = promauto . NewCounterVec (
2021-02-04 05:20:49 -08:00
prometheus . CounterOpts {
Name : "wormhole_terra_connection_errors_total" ,
Help : "Total number of Terra connection errors" ,
} , [ ] string { "reason" } )
2021-07-23 07:06:35 -07:00
terraMessagesConfirmed = promauto . NewCounter (
2021-02-04 05:20:49 -08:00
prometheus . CounterOpts {
2021-07-21 10:46:10 -07:00
Name : "wormhole_terra_messages_confirmed_total" ,
Help : "Total number of verified terra messages found" ,
2021-02-04 05:20:49 -08:00
} )
2021-07-23 07:06:35 -07:00
currentTerraHeight = promauto . NewGauge (
2021-02-04 05:20:49 -08:00
prometheus . GaugeOpts {
Name : "wormhole_terra_current_height" ,
2021-07-21 10:46:10 -07:00
Help : "Current terra slot height (at default commitment level, not the level used for observations)" ,
2021-02-04 05:20:49 -08:00
} )
2021-07-23 07:06:35 -07:00
queryLatency = promauto . NewHistogramVec (
2021-02-04 05:20:49 -08:00
prometheus . HistogramOpts {
Name : "wormhole_terra_query_latency" ,
Help : "Latency histogram for terra RPC calls" ,
} , [ ] string { "operation" } )
)
2020-11-16 04:28:07 -08:00
// clientRequest is the JSON-RPC envelope written to the websocket to
// subscribe to events.
type clientRequest struct {
	// JSONRPC is the protocol version string.
	JSONRPC string `json:"jsonrpc"`
	// A String containing the name of the method to be invoked.
	Method string `json:"method"`
	// Object to pass as request parameter to the method. Exactly one
	// parameter is sent.
	Params [1]string `json:"params"`
	// The request id. This can be of any type. It is used to match the
	// response with the request that it is replying to.
	ID uint64 `json:"id"`
}
2021-08-30 07:19:00 -07:00
// NewWatcher creates a new Terra contract watcher
2022-02-09 09:54:33 -08:00
func NewWatcher (
urlWS string ,
urlLCD string ,
contract string ,
lockEvents chan * common . MessagePublication ,
setEvents chan * common . GuardianSet ,
obsvReqC chan * gossipv1 . ObservationRequest ) * Watcher {
return & Watcher { urlWS : urlWS , urlLCD : urlLCD , contract : contract , msgChan : lockEvents , setChan : setEvents , obsvReqC : obsvReqC }
2020-11-16 04:28:07 -08:00
}
2021-08-30 07:19:00 -07:00
func ( e * Watcher ) Run ( ctx context . Context ) error {
2021-02-09 17:00:45 -08:00
p2p . DefaultRegistry . SetNetworkStats ( vaa . ChainIDTerra , & gossipv1 . Heartbeat_Network {
2021-08-30 07:19:00 -07:00
ContractAddress : e . contract ,
2021-02-09 17:00:45 -08:00
} )
2020-11-16 04:28:07 -08:00
errC := make ( chan error )
logger := supervisor . Logger ( ctx )
2020-11-16 07:59:58 -08:00
logger . Info ( "connecting to websocket" , zap . String ( "url" , e . urlWS ) )
2020-11-16 04:28:07 -08:00
2020-11-16 07:59:58 -08:00
c , _ , err := websocket . DefaultDialer . DialContext ( ctx , e . urlWS , nil )
2020-11-16 04:28:07 -08:00
if err != nil {
2021-08-08 08:16:41 -07:00
p2p . DefaultRegistry . AddErrorCount ( vaa . ChainIDTerra , 1 )
2021-02-04 05:20:49 -08:00
terraConnectionErrors . WithLabelValues ( "websocket_dial_error" ) . Inc ( )
2020-11-16 07:59:58 -08:00
return fmt . Errorf ( "websocket dial failed: %w" , err )
2020-11-16 04:28:07 -08:00
}
defer c . Close ( )
// Subscribe to smart contract transactions
2021-08-30 07:19:00 -07:00
params := [ ... ] string { fmt . Sprintf ( "tm.event='Tx' AND execute_contract.contract_address='%s'" , e . contract ) }
2020-11-16 04:28:07 -08:00
command := & clientRequest {
JSONRPC : "2.0" ,
Method : "subscribe" ,
Params : params ,
ID : 1 ,
}
err = c . WriteJSON ( command )
if err != nil {
2021-08-08 08:16:41 -07:00
p2p . DefaultRegistry . AddErrorCount ( vaa . ChainIDTerra , 1 )
2021-02-04 05:20:49 -08:00
terraConnectionErrors . WithLabelValues ( "websocket_subscription_error" ) . Inc ( )
2020-11-16 04:28:07 -08:00
return fmt . Errorf ( "websocket subscription failed: %w" , err )
}
// Wait for the success response
_ , _ , err = c . ReadMessage ( )
if err != nil {
2021-08-08 08:16:41 -07:00
p2p . DefaultRegistry . AddErrorCount ( vaa . ChainIDTerra , 1 )
2021-02-04 05:20:49 -08:00
terraConnectionErrors . WithLabelValues ( "event_subscription_error" ) . Inc ( )
2020-11-16 04:28:07 -08:00
return fmt . Errorf ( "event subscription failed: %w" , err )
}
2020-11-16 04:23:29 -08:00
logger . Info ( "subscribed to new transaction events" )
2020-11-16 04:28:07 -08:00
2020-11-29 04:30:18 -08:00
readiness . SetReady ( common . ReadinessTerraSyncing )
2020-11-27 15:46:37 -08:00
2021-02-05 06:16:31 -08:00
go func ( ) {
t := time . NewTicker ( 5 * time . Second )
client := & http . Client {
Timeout : time . Second * 5 ,
}
for {
<- t . C
// Query and report height and set currentTerraHeight
resp , err := client . Get ( fmt . Sprintf ( "%s/blocks/latest" , e . urlLCD ) )
if err != nil {
logger . Error ( "query latest block response error" , zap . Error ( err ) )
continue
}
blocksBody , err := ioutil . ReadAll ( resp . Body )
if err != nil {
2022-02-09 09:54:33 -08:00
logger . Error ( "query latest block response read error" , zap . Error ( err ) )
2021-02-05 06:16:31 -08:00
errC <- err
resp . Body . Close ( )
continue
}
resp . Body . Close ( )
blockJSON := string ( blocksBody )
latestBlock := gjson . Get ( blockJSON , "block.header.height" )
logger . Info ( "current Terra height" , zap . Int64 ( "block" , latestBlock . Int ( ) ) )
currentTerraHeight . Set ( float64 ( latestBlock . Int ( ) ) )
2021-02-09 17:00:45 -08:00
p2p . DefaultRegistry . SetNetworkStats ( vaa . ChainIDTerra , & gossipv1 . Heartbeat_Network {
2021-12-23 14:59:32 -08:00
Height : latestBlock . Int ( ) ,
2021-08-30 07:19:00 -07:00
ContractAddress : e . contract ,
2021-02-09 17:00:45 -08:00
} )
2021-02-05 06:16:31 -08:00
}
} ( )
2022-02-09 09:54:33 -08:00
go func ( ) {
for {
select {
case <- ctx . Done ( ) :
return
case r := <- e . obsvReqC :
if vaa . ChainID ( r . ChainId ) != vaa . ChainIDTerra {
panic ( "invalid chain ID" )
}
tx := hex . EncodeToString ( r . TxHash )
logger . Info ( "received observation request for terra" ,
zap . String ( "tx_hash" , tx ) )
client := & http . Client {
Timeout : time . Second * 5 ,
}
// Query for tx by hash
resp , err := client . Get ( fmt . Sprintf ( "%s/cosmos/tx/v1beta1/txs/%s" , e . urlLCD , tx ) )
if err != nil {
logger . Error ( "query tx response error" , zap . Error ( err ) )
continue
}
txBody , err := ioutil . ReadAll ( resp . Body )
if err != nil {
logger . Error ( "query tx response read error" , zap . Error ( err ) )
resp . Body . Close ( )
continue
}
resp . Body . Close ( )
txJSON := string ( txBody )
txHashRaw := gjson . Get ( txJSON , "tx_response.txhash" )
if ! txHashRaw . Exists ( ) {
logger . Error ( "terra tx does not have tx hash" , zap . String ( "payload" , txJSON ) )
continue
}
txHash := txHashRaw . String ( )
events := gjson . Get ( txJSON , "tx_response.events" )
if ! events . Exists ( ) {
logger . Error ( "terra tx has no events" , zap . String ( "payload" , txJSON ) )
continue
}
msgs := EventsToMessagePublications ( e . contract , txHash , events . Array ( ) , logger )
for _ , msg := range msgs {
e . msgChan <- msg
terraMessagesConfirmed . Inc ( )
}
}
}
} ( )
2020-11-16 04:28:07 -08:00
go func ( ) {
defer close ( errC )
for {
_ , message , err := c . ReadMessage ( )
if err != nil {
2021-08-08 08:16:41 -07:00
p2p . DefaultRegistry . AddErrorCount ( vaa . ChainIDTerra , 1 )
2021-02-04 05:20:49 -08:00
terraConnectionErrors . WithLabelValues ( "channel_read_error" ) . Inc ( )
2020-11-16 04:28:07 -08:00
logger . Error ( "error reading channel" , zap . Error ( err ) )
errC <- err
return
}
// Received a message from the blockchain
json := string ( message )
2021-12-23 14:59:32 -08:00
txHashRaw := gjson . Get ( json , "result.events.tx\\.hash.0" )
if ! txHashRaw . Exists ( ) {
logger . Warn ( "terra message does not have tx hash" , zap . String ( "payload" , json ) )
continue
}
txHash := txHashRaw . String ( )
events := gjson . Get ( json , "result.data.value.TxResult.result.events" )
if ! events . Exists ( ) {
logger . Warn ( "terra message has no events" , zap . String ( "payload" , json ) )
continue
}
2022-02-09 09:54:33 -08:00
msgs := EventsToMessagePublications ( e . contract , txHash , events . Array ( ) , logger )
for _ , msg := range msgs {
e . msgChan <- msg
2021-07-21 10:46:10 -07:00
terraMessagesConfirmed . Inc ( )
2020-11-16 04:28:07 -08:00
}
2021-02-04 10:48:54 -08:00
client := & http . Client {
Timeout : time . Second * 15 ,
}
2021-02-04 05:20:49 -08:00
2020-11-16 04:28:07 -08:00
// Query and report guardian set status
2021-08-30 07:19:00 -07:00
requestURL := fmt . Sprintf ( "%s/wasm/contracts/%s/store?query_msg={\"guardian_set_info\":{}}" , e . urlLCD , e . contract )
2020-11-16 04:28:07 -08:00
req , err := http . NewRequestWithContext ( ctx , http . MethodGet , requestURL , nil )
if err != nil {
2021-08-08 08:16:41 -07:00
p2p . DefaultRegistry . AddErrorCount ( vaa . ChainIDTerra , 1 )
2021-02-04 05:20:49 -08:00
terraConnectionErrors . WithLabelValues ( "guardian_set_req_error" ) . Inc ( )
2020-11-16 04:28:07 -08:00
logger . Error ( "query guardian set request error" , zap . Error ( err ) )
errC <- err
return
}
2020-11-16 04:23:29 -08:00
2021-02-04 05:20:49 -08:00
msm := time . Now ( )
2020-11-16 04:28:07 -08:00
resp , err := client . Do ( req )
if err != nil {
2021-08-08 08:16:41 -07:00
p2p . DefaultRegistry . AddErrorCount ( vaa . ChainIDTerra , 1 )
2020-11-16 04:28:07 -08:00
logger . Error ( "query guardian set response error" , zap . Error ( err ) )
errC <- err
return
}
body , err := ioutil . ReadAll ( resp . Body )
2021-02-04 05:20:49 -08:00
queryLatency . WithLabelValues ( "guardian_set_info" ) . Observe ( time . Since ( msm ) . Seconds ( ) )
2020-11-16 04:28:07 -08:00
if err != nil {
2021-08-08 08:16:41 -07:00
p2p . DefaultRegistry . AddErrorCount ( vaa . ChainIDTerra , 1 )
2020-11-16 04:28:07 -08:00
logger . Error ( "query guardian set error" , zap . Error ( err ) )
errC <- err
resp . Body . Close ( )
return
}
2020-11-16 04:23:29 -08:00
2020-11-16 04:28:07 -08:00
json = string ( body )
guardianSetIndex := gjson . Get ( json , "result.guardian_set_index" )
addresses := gjson . Get ( json , "result.addresses.#.bytes" )
2020-11-16 04:23:29 -08:00
logger . Debug ( "current guardian set on Terra" ,
2020-11-16 04:28:07 -08:00
zap . Any ( "guardianSetIndex" , guardianSetIndex ) ,
zap . Any ( "addresses" , addresses ) )
resp . Body . Close ( )
2020-11-16 04:23:29 -08:00
// We do not send guardian changes to the processor - ETH guardians are the source of truth.
2020-11-16 04:28:07 -08:00
}
} ( )
select {
case <- ctx . Done ( ) :
err := c . WriteMessage ( websocket . CloseMessage , websocket . FormatCloseMessage ( websocket . CloseNormalClosure , "" ) )
if err != nil {
logger . Error ( "error on closing socket " , zap . Error ( err ) )
}
return ctx . Err ( )
case err := <- errC :
return err
}
}
2022-02-09 09:54:33 -08:00
// EventsToMessagePublications converts the events of a Terra transaction into
// Wormhole message publications.
//
// contract is the address of the watched contract, txHash the hex encoded hash
// of the transaction the events belong to, and events the event objects of
// that transaction. Only events of type "wasm" are considered. Their attribute
// keys and values are base64 decoded, and events whose "contract_address"
// attribute differs from contract are skipped. When an attribute key occurs
// more than once within an event, the first value wins.
//
// An event that is malformed, lacks a required "message.*" attribute or holds
// a value that cannot be decoded is logged and skipped; it never aborts the
// processing of the remaining events. The returned slice holds one entry per
// successfully decoded message and may be empty.
func EventsToMessagePublications(contract string, txHash string, events []gjson.Result, logger *zap.Logger) []*common.MessagePublication {
	msgs := make([]*common.MessagePublication, 0, len(events))

	for _, event := range events {
		if !event.IsObject() {
			logger.Warn("terra event is invalid", zap.String("tx_hash", txHash), zap.String("event", event.String()))
			continue
		}
		if event.Get("type").String() != "wasm" {
			continue
		}

		attributes := event.Get("attributes")
		if !attributes.Exists() {
			logger.Warn("terra message event has no attributes", zap.String("tx_hash", txHash), zap.String("event", event.String()))
			continue
		}

		// Decode the base64 encoded key/value pairs of this event.
		mappedAttributes := map[string]string{}
		for _, attribute := range attributes.Array() {
			if !attribute.IsObject() {
				logger.Warn("terra event attribute is invalid", zap.String("tx_hash", txHash), zap.String("attribute", attribute.String()))
				continue
			}
			keyBase := attribute.Get("key")
			if !keyBase.Exists() {
				logger.Warn("terra event attribute does not have key", zap.String("tx_hash", txHash), zap.String("attribute", attribute.String()))
				continue
			}
			valueBase := attribute.Get("value")
			if !valueBase.Exists() {
				logger.Warn("terra event attribute does not have value", zap.String("tx_hash", txHash), zap.String("attribute", attribute.String()))
				continue
			}

			key, err := base64.StdEncoding.DecodeString(keyBase.String())
			if err != nil {
				logger.Warn("terra event key attribute is invalid", zap.String("tx_hash", txHash), zap.String("key", keyBase.String()))
				continue
			}
			value, err := base64.StdEncoding.DecodeString(valueBase.String())
			if err != nil {
				logger.Warn("terra event value attribute is invalid", zap.String("tx_hash", txHash), zap.String("key", keyBase.String()), zap.String("value", valueBase.String()))
				continue
			}

			// Keep the first occurrence of a key and ignore later ones.
			if _, ok := mappedAttributes[string(key)]; ok {
				logger.Debug("duplicate key in events", zap.String("tx_hash", txHash), zap.String("key", keyBase.String()), zap.String("value", valueBase.String()))
				continue
			}
			mappedAttributes[string(key)] = string(value)
		}

		contractAddress, ok := mappedAttributes["contract_address"]
		if !ok {
			logger.Warn("terra wasm event without contract address field set", zap.String("event", event.String()))
			continue
		}
		// This is not a wormhole message
		if contractAddress != contract {
			continue
		}

		payload, ok := mappedAttributes["message.message"]
		if !ok {
			logger.Error("wormhole event does not have a message field", zap.String("tx_hash", txHash), zap.String("attributes", attributes.String()))
			continue
		}
		sender, ok := mappedAttributes["message.sender"]
		if !ok {
			logger.Error("wormhole event does not have a sender field", zap.String("tx_hash", txHash), zap.String("attributes", attributes.String()))
			continue
		}
		chainID, ok := mappedAttributes["message.chain_id"]
		if !ok {
			logger.Error("wormhole event does not have a chain_id field", zap.String("tx_hash", txHash), zap.String("attributes", attributes.String()))
			continue
		}
		nonce, ok := mappedAttributes["message.nonce"]
		if !ok {
			logger.Error("wormhole event does not have a nonce field", zap.String("tx_hash", txHash), zap.String("attributes", attributes.String()))
			continue
		}
		sequence, ok := mappedAttributes["message.sequence"]
		if !ok {
			logger.Error("wormhole event does not have a sequence field", zap.String("tx_hash", txHash), zap.String("attributes", attributes.String()))
			continue
		}
		blockTime, ok := mappedAttributes["message.block_time"]
		if !ok {
			logger.Error("wormhole event does not have a block_time field", zap.String("tx_hash", txHash), zap.String("attributes", attributes.String()))
			continue
		}

		logger.Info("new message detected on terra",
			zap.String("chainId", chainID),
			zap.String("txHash", txHash),
			zap.String("sender", sender),
			zap.String("nonce", nonce),
			zap.String("sequence", sequence),
			zap.String("blockTime", blockTime),
		)

		senderAddress, err := StringToAddress(sender)
		if err != nil {
			logger.Error("cannot decode emitter hex", zap.String("tx_hash", txHash), zap.String("value", sender))
			continue
		}
		txHashValue, err := StringToHash(txHash)
		if err != nil {
			logger.Error("cannot decode tx hash hex", zap.String("tx_hash", txHash), zap.String("value", txHash))
			continue
		}
		payloadValue, err := hex.DecodeString(payload)
		if err != nil {
			logger.Error("cannot decode payload", zap.String("tx_hash", txHash), zap.String("value", payload))
			continue
		}

		blockTimeInt, err := strconv.ParseInt(blockTime, 10, 64)
		if err != nil {
			logger.Error("blocktime cannot be parsed as int", zap.String("tx_hash", txHash), zap.String("value", blockTime))
			continue
		}
		// Log the value that actually failed to parse; these two branches
		// used to report the block time instead.
		nonceInt, err := strconv.ParseUint(nonce, 10, 32)
		if err != nil {
			logger.Error("nonce cannot be parsed as int", zap.String("tx_hash", txHash), zap.String("value", nonce))
			continue
		}
		sequenceInt, err := strconv.ParseUint(sequence, 10, 64)
		if err != nil {
			logger.Error("sequence cannot be parsed as int", zap.String("tx_hash", txHash), zap.String("value", sequence))
			continue
		}

		msgs = append(msgs, &common.MessagePublication{
			TxHash:           txHashValue,
			Timestamp:        time.Unix(blockTimeInt, 0),
			Nonce:            uint32(nonceInt),
			Sequence:         sequenceInt,
			EmitterChain:     vaa.ChainIDTerra,
			EmitterAddress:   senderAddress,
			Payload:          payloadValue,
			ConsistencyLevel: 0, // Instant finality
		})
	}

	return msgs
}
2020-11-16 04:28:07 -08:00
// StringToAddress converts a hex encoded string into an address.
//
// It returns an error if value is not valid hex or if it decodes to more
// bytes than an address can hold; over-long input used to be truncated
// silently, yielding a different address. Shorter input is copied to the
// start of the address and the remaining bytes stay zero.
func StringToAddress(value string) (vaa.Address, error) {
	var address vaa.Address

	res, err := hex.DecodeString(value)
	if err != nil {
		return address, err
	}
	if len(res) > len(address) {
		return address, fmt.Errorf("address is %d bytes long, at most %d are allowed", len(res), len(address))
	}

	copy(address[:], res)
	return address, nil
}
// StringToHash converts a hex encoded string into a transaction hash.
//
// It returns an error if value is not valid hex or if it decodes to more
// bytes than a hash can hold; over-long input used to be truncated silently,
// yielding a different hash. Shorter input is copied to the start of the hash
// and the remaining bytes stay zero.
func StringToHash(value string) (eth_common.Hash, error) {
	var hash eth_common.Hash

	res, err := hex.DecodeString(value)
	if err != nil {
		return hash, err
	}
	if len(res) > len(hash) {
		return hash, fmt.Errorf("hash is %d bytes long, at most %d are allowed", len(res), len(hash))
	}

	copy(hash[:], res)
	return hash, nil
}