2022-02-21 14:43:40 -08:00
package main
import (
"context"
"encoding/base64"
"encoding/hex"
"flag"
"fmt"
"io/ioutil"
"log"
"net/http"
"strconv"
"time"
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/db"
"go.uber.org/zap"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
nodev1 "github.com/certusone/wormhole/node/pkg/proto/node/v1"
"github.com/certusone/wormhole/node/pkg/terra"
"github.com/certusone/wormhole/node/pkg/vaa"
"github.com/tidwall/gjson"
2022-05-13 10:08:41 -07:00
"golang.org/x/time/rate"
2022-02-21 14:43:40 -08:00
"google.golang.org/grpc"
)
2022-06-16 12:17:43 -07:00
// Per-chain lookup tables, keyed by Wormhole chain ID. main refuses to run for
// a chain that is missing from any of them; at present only ChainIDTerra has an
// entry in all three.
var (
	// fcdMap gives the base URL of the HTTP endpoint queried for each chain.
	fcdMap = map[vaa.ChainID]string{
		vaa.ChainIDTerra:  "https://fcd.terra.dev",
		vaa.ChainIDTerra2: "https://phoenix-fcd.terra.dev",
	}

	// coreContractMap gives the core contract account whose transactions are
	// listed and whose wasm events are inspected.
	coreContractMap = map[vaa.ChainID]string{
		vaa.ChainIDTerra: "terra1dq03ugtd40zu9hcgdzrsq6z2z4hwhc9tqk2uy5",
	}

	// emitterMap gives the hex-encoded emitter address whose missing messages
	// are requested from the guardian.
	emitterMap = map[vaa.ChainID]string{
		vaa.ChainIDTerra: "0000000000000000000000007cf7b764e38a0a5e967972c1df77d432510564e2",
	}
)
// Emitter pairs a chain ID with the address of a message emitter on that chain.
//
// Field order matters: main builds values with a positional composite literal.
type Emitter struct {
	// ChainID is the chain the emitter belongs to.
	ChainID vaa.ChainID
	// Emitter is the emitter address in the string form accepted by
	// vaa.StringToAddress.
	Emitter string
}
2022-02-21 14:43:40 -08:00
// Command-line flags. They are only valid after flag.Parse has run in main.
var (
	// adminRPC is the path of the guardian's admin unix socket.
	adminRPC = flag.String("adminRPC", "/run/guardiand/admin.socket", "Admin RPC address")
	// chain is the chain name, resolved with vaa.ChainIDFromString.
	chain = flag.String("chain", "terra", "CosmWasm Chain name")
	// dryRun, when true (the default), only logs what would be re-observed.
	dryRun = flag.Bool("dryRun", true, "Dry run")
	// sleepTime is the minimum number of seconds between tx-listing requests.
	sleepTime = flag.Int("sleepTime", 1, "Time to sleep between http requests (seconds)")
)
// getAdminClient dials the guardian's privileged admin gRPC service over the
// unix socket at addr.
//
// It returns the connection (which the caller must close), an error, and a
// client bound to that connection. The error sits in the middle of the result
// list rather than last; that order is kept so existing callers keep compiling.
// On failure the connection and client are nil and the error is returned to the
// caller instead of terminating the process here.
func getAdminClient(ctx context.Context, addr string) (*grpc.ClientConn, error, nodev1.NodePrivilegedServiceClient) {
	conn, err := grpc.DialContext(ctx, fmt.Sprintf("unix:///%s", addr), grpc.WithInsecure())
	if err != nil {
		return nil, fmt.Errorf("failed to connect to %s: %w", addr, err), nil
	}

	return conn, nil, nodev1.NewNodePrivilegedServiceClient(conn)
}
2022-06-16 12:17:43 -07:00
func getSequencesForTxhash ( txhash string , fcd string , contractAddressLogKey string , coreContract string , emitter Emitter ) ( [ ] uint64 , error ) {
2022-05-12 17:00:16 -07:00
client := & http . Client {
Timeout : time . Second * 5 ,
2022-02-21 14:43:40 -08:00
}
2022-06-16 12:17:43 -07:00
url := fmt . Sprintf ( "%s/cosmos/tx/v1beta1/txs/%s" , fcd , txhash )
2022-02-21 14:43:40 -08:00
resp , err := client . Get ( url )
if err != nil {
2022-05-12 17:00:16 -07:00
return [ ] uint64 { } , fmt . Errorf ( "failed to get message: %w" , err )
2022-02-21 14:43:40 -08:00
}
defer resp . Body . Close ( )
txBody , err := ioutil . ReadAll ( resp . Body )
if err != nil {
2022-05-12 17:00:16 -07:00
return [ ] uint64 { } , fmt . Errorf ( "failed to read message: %w" , err )
2022-02-21 14:43:40 -08:00
}
txJSON := string ( txBody )
if ! gjson . Valid ( txJSON ) {
2022-05-12 17:00:16 -07:00
return [ ] uint64 { } , fmt . Errorf ( "invalid JSON response" )
2022-02-21 14:43:40 -08:00
}
txHashRaw := gjson . Get ( txJSON , "tx_response.txhash" )
if ! txHashRaw . Exists ( ) {
2022-05-12 17:00:16 -07:00
return [ ] uint64 { } , fmt . Errorf ( "terra tx does not have tx hash" )
2022-02-21 14:43:40 -08:00
}
txHash := txHashRaw . String ( )
events := gjson . Get ( txJSON , "tx_response.events" )
if ! events . Exists ( ) {
2022-05-12 17:00:16 -07:00
return [ ] uint64 { } , fmt . Errorf ( "terra tx has no events" )
2022-02-21 14:43:40 -08:00
}
2022-06-16 12:17:43 -07:00
msgs := EventsToMessagePublications ( coreContract , txHash , events . Array ( ) , contractAddressLogKey )
2022-02-21 14:43:40 -08:00
// Should only ever be 1 message. Stole the above function from watcher.go
2022-05-12 17:00:16 -07:00
var sequences = [ ] uint64 { }
for _ , msg := range msgs {
2022-06-16 12:17:43 -07:00
tokenBridgeEmitter , err := vaa . StringToAddress ( emitter . Emitter )
2022-05-13 10:09:20 -07:00
if err != nil {
2022-06-16 12:17:43 -07:00
log . Fatalf ( "Terra emitter address is not valid: %s" , emitter . Emitter )
2022-05-13 10:09:20 -07:00
}
if msg . EmitterAddress == tokenBridgeEmitter {
sequences = append ( sequences , msg . Sequence )
}
2022-02-21 14:43:40 -08:00
}
2022-05-12 17:00:16 -07:00
return sequences , nil
2022-02-21 14:43:40 -08:00
}
// This was stolen from pkg/terra/watcher.go
2022-06-16 12:17:43 -07:00
func EventsToMessagePublications ( contract string , txHash string , events [ ] gjson . Result , contractAddressLogKey string ) [ ] * common . MessagePublication {
2022-02-21 14:43:40 -08:00
msgs := make ( [ ] * common . MessagePublication , 0 , len ( events ) )
for _ , event := range events {
if ! event . IsObject ( ) {
log . Println ( "terra event is invalid" , zap . String ( "tx_hash" , txHash ) , zap . String ( "event" , event . String ( ) ) )
continue
}
eventType := gjson . Get ( event . String ( ) , "type" )
if eventType . String ( ) != "wasm" {
continue
}
attributes := gjson . Get ( event . String ( ) , "attributes" )
if ! attributes . Exists ( ) {
log . Println ( "terra message event has no attributes" , zap . String ( "tx_hash" , txHash ) , zap . String ( "event" , event . String ( ) ) )
continue
}
mappedAttributes := map [ string ] string { }
for _ , attribute := range attributes . Array ( ) {
if ! attribute . IsObject ( ) {
log . Println ( "terra event attribute is invalid" , zap . String ( "tx_hash" , txHash ) , zap . String ( "attribute" , attribute . String ( ) ) )
continue
}
keyBase := gjson . Get ( attribute . String ( ) , "key" )
if ! keyBase . Exists ( ) {
log . Println ( "terra event attribute does not have key" , zap . String ( "tx_hash" , txHash ) , zap . String ( "attribute" , attribute . String ( ) ) )
continue
}
valueBase := gjson . Get ( attribute . String ( ) , "value" )
if ! valueBase . Exists ( ) {
log . Println ( "terra event attribute does not have value" , zap . String ( "tx_hash" , txHash ) , zap . String ( "attribute" , attribute . String ( ) ) )
continue
}
key , err := base64 . StdEncoding . DecodeString ( keyBase . String ( ) )
if err != nil {
log . Println ( "terra event key attribute is invalid" , zap . String ( "tx_hash" , txHash ) , zap . String ( "key" , keyBase . String ( ) ) )
continue
}
value , err := base64 . StdEncoding . DecodeString ( valueBase . String ( ) )
if err != nil {
log . Println ( "terra event value attribute is invalid" , zap . String ( "tx_hash" , txHash ) , zap . String ( "key" , keyBase . String ( ) ) , zap . String ( "value" , valueBase . String ( ) ) )
continue
}
if _ , ok := mappedAttributes [ string ( key ) ] ; ok {
log . Println ( "duplicate key in events" , zap . String ( "tx_hash" , txHash ) , zap . String ( "key" , keyBase . String ( ) ) , zap . String ( "value" , valueBase . String ( ) ) )
continue
}
mappedAttributes [ string ( key ) ] = string ( value )
}
2022-06-16 12:17:43 -07:00
contractAddress , ok := mappedAttributes [ contractAddressLogKey ]
2022-02-21 14:43:40 -08:00
if ! ok {
log . Println ( "terra wasm event without contract address field set" , zap . String ( "event" , event . String ( ) ) )
continue
}
// This is not a wormhole message
if contractAddress != contract {
continue
}
payload , ok := mappedAttributes [ "message.message" ]
if ! ok {
log . Println ( "wormhole event does not have a message field" , zap . String ( "tx_hash" , txHash ) , zap . String ( "attributes" , attributes . String ( ) ) )
continue
}
sender , ok := mappedAttributes [ "message.sender" ]
if ! ok {
log . Println ( "wormhole event does not have a sender field" , zap . String ( "tx_hash" , txHash ) , zap . String ( "attributes" , attributes . String ( ) ) )
continue
}
nonce , ok := mappedAttributes [ "message.nonce" ]
if ! ok {
log . Println ( "wormhole event does not have a nonce field" , zap . String ( "tx_hash" , txHash ) , zap . String ( "attributes" , attributes . String ( ) ) )
continue
}
sequence , ok := mappedAttributes [ "message.sequence" ]
if ! ok {
log . Println ( "wormhole event does not have a sequence field" , zap . String ( "tx_hash" , txHash ) , zap . String ( "attributes" , attributes . String ( ) ) )
continue
}
blockTime , ok := mappedAttributes [ "message.block_time" ]
if ! ok {
log . Println ( "wormhole event does not have a block_time field" , zap . String ( "tx_hash" , txHash ) , zap . String ( "attributes" , attributes . String ( ) ) )
continue
}
senderAddress , err := terra . StringToAddress ( sender )
if err != nil {
log . Println ( "cannot decode emitter hex" , zap . String ( "tx_hash" , txHash ) , zap . String ( "value" , sender ) )
continue
}
txHashValue , err := terra . StringToHash ( txHash )
if err != nil {
log . Println ( "cannot decode tx hash hex" , zap . String ( "tx_hash" , txHash ) , zap . String ( "value" , txHash ) )
continue
}
payloadValue , err := hex . DecodeString ( payload )
if err != nil {
log . Println ( "cannot decode payload" , zap . String ( "tx_hash" , txHash ) , zap . String ( "value" , payload ) )
continue
}
blockTimeInt , err := strconv . ParseInt ( blockTime , 10 , 64 )
if err != nil {
log . Println ( "blocktime cannot be parsed as int" , zap . String ( "tx_hash" , txHash ) , zap . String ( "value" , blockTime ) )
continue
}
nonceInt , err := strconv . ParseUint ( nonce , 10 , 32 )
if err != nil {
log . Println ( "nonce cannot be parsed as int" , zap . String ( "tx_hash" , txHash ) , zap . String ( "value" , blockTime ) )
continue
}
sequenceInt , err := strconv . ParseUint ( sequence , 10 , 64 )
if err != nil {
log . Println ( "sequence cannot be parsed as int" , zap . String ( "tx_hash" , txHash ) , zap . String ( "value" , blockTime ) )
continue
}
messagePublication := & common . MessagePublication {
TxHash : txHashValue ,
Timestamp : time . Unix ( blockTimeInt , 0 ) ,
Nonce : uint32 ( nonceInt ) ,
Sequence : sequenceInt ,
EmitterChain : vaa . ChainIDTerra ,
EmitterAddress : senderAddress ,
Payload : payloadValue ,
ConsistencyLevel : 0 , // Instant finality
}
msgs = append ( msgs , messagePublication )
}
return msgs
}
func main ( ) {
flag . Parse ( )
2022-06-16 12:17:43 -07:00
chainID , err := vaa . ChainIDFromString ( * chain )
if err != nil {
log . Fatalf ( "Invalid chain: %v" , err )
}
fcd , ok := fcdMap [ chainID ]
if ! ok {
log . Fatal ( "Unsupported chain: no FCD defined" )
}
coreContract , ok := coreContractMap [ chainID ]
if ! ok {
log . Fatal ( "Unsupported chain: no core contract defined" )
}
emitterAddress , ok := emitterMap [ chainID ]
if ! ok {
log . Fatal ( "Unsupported chain: no emitter defined" )
}
emitter := Emitter { chainID , emitterAddress }
// CosmWasm 1.0.0
contractAddressLogKey := "_contract_address"
if chainID == vaa . ChainIDTerra {
// CosmWasm <1.0.0
contractAddressLogKey = "contract_address"
}
2022-02-21 14:43:40 -08:00
ctx := context . Background ( )
missingMessages := make ( map [ uint64 ] bool )
conn , err , admin := getAdminClient ( ctx , * adminRPC )
defer conn . Close ( )
if err != nil {
log . Fatalf ( "failed to get admin client: %v" , err )
}
2022-06-16 12:17:43 -07:00
log . Printf ( "Requesting missing messages for %s" , emitter . Emitter )
2022-02-21 14:43:40 -08:00
msg := nodev1 . FindMissingMessagesRequest {
2022-06-16 12:17:43 -07:00
EmitterChain : uint32 ( chainID ) ,
EmitterAddress : emitter . Emitter ,
2022-02-21 14:43:40 -08:00
RpcBackfill : true ,
BackfillNodes : common . PublicRPCEndpoints ,
}
resp , err := admin . FindMissingMessages ( ctx , & msg )
if err != nil {
log . Fatalf ( "failed to run find FindMissingMessages RPC: %v" , err )
}
msgs := make ( [ ] * db . VAAID , len ( resp . MissingMessages ) )
for i , id := range resp . MissingMessages {
vId , err := db . VaaIDFromString ( id )
if err != nil {
log . Fatalf ( "failed to parse VAAID: %v" , err )
}
msgs [ i ] = vId
}
if len ( msgs ) == 0 {
2022-06-16 12:17:43 -07:00
log . Printf ( "No missing messages found for %s" , emitter )
2022-02-21 14:43:40 -08:00
return
}
lowest := msgs [ 0 ] . Sequence
highest := msgs [ len ( msgs ) - 1 ] . Sequence
2022-06-16 12:17:43 -07:00
log . Printf ( "Found %d missing messages for %s: %d - %d" , len ( msgs ) , emitter , lowest , highest )
2022-02-21 14:43:40 -08:00
for _ , msg := range msgs {
missingMessages [ msg . Sequence ] = true
}
2022-05-13 10:08:41 -07:00
limiter := rate . NewLimiter ( rate . Every ( time . Duration ( * sleepTime ) * time . Second ) , 1 )
log . Printf ( "Starting search for missing sequence numbers (sleeping %ds between requests)..." , * sleepTime )
2022-02-21 14:43:40 -08:00
offset := 0
2022-05-13 10:08:41 -07:00
2022-02-21 14:43:40 -08:00
var firstTime bool = true
for ( offset > 0 ) || firstTime {
2022-05-13 10:08:41 -07:00
if err := limiter . Wait ( ctx ) ; err != nil {
log . Fatalf ( "failed to wait: %v" , err )
}
2022-02-21 14:43:40 -08:00
firstTime = false
2022-05-13 10:08:41 -07:00
client := & http . Client {
Timeout : time . Second * 5 ,
2022-02-21 14:43:40 -08:00
}
2022-06-16 12:17:43 -07:00
resp , err := client . Get ( fmt . Sprintf ( "%s/v1/txs?offset=%d&limit=100&account=%s" , fcd , offset , coreContract ) )
2022-02-21 14:43:40 -08:00
if err != nil {
log . Fatalf ( "failed to get log: %v" , err )
continue
}
defer resp . Body . Close ( )
blocksBody , err := ioutil . ReadAll ( resp . Body )
if err != nil {
log . Fatalf ( "failed to read log: %v" , err )
continue
}
blockJSON := string ( blocksBody )
if ! gjson . Valid ( blockJSON ) {
log . Println ( "invalid JSON response" )
continue
}
next := gjson . Get ( blockJSON , "next" )
log . Println ( "next block" , next . Int ( ) )
offset = int ( next . Uint ( ) )
// Get the transactions. Should be 100 of them
txs := gjson . Get ( blockJSON , "txs" )
for _ , tx := range txs . Array ( ) {
if ! tx . IsObject ( ) {
log . Fatalln ( "Bad Object" )
continue
}
txhash := gjson . Get ( tx . String ( ) , "txhash" )
// Get sequence number for tx
2022-06-16 12:17:43 -07:00
seqs , err := getSequencesForTxhash ( txhash . String ( ) , fcd , contractAddressLogKey , coreContract , emitter )
2022-02-21 14:43:40 -08:00
if err != nil {
log . Fatalln ( "Failed getting sequence number" , err )
continue
}
2022-05-12 17:00:16 -07:00
for _ , seq := range seqs {
// Check to see if this is a missing sequence number
if ! missingMessages [ seq ] {
continue
}
log . Println ( "txhash" , txhash . String ( ) , "sequence number" , seq )
// send observation request to guardian
if * dryRun {
log . Println ( "Would have sent txhash" , txhash , "to the guardian to re-observe" )
2022-02-21 14:43:40 -08:00
} else {
2022-05-12 17:00:16 -07:00
txHashAsByteArray , err := terra . StringToHash ( txhash . String ( ) )
2022-02-21 14:43:40 -08:00
if err != nil {
2022-05-12 17:00:16 -07:00
log . Fatalln ( "Couldn't decode the txhash" , txhash )
} else {
_ , err = admin . SendObservationRequest ( ctx , & nodev1 . SendObservationRequestRequest {
ObservationRequest : & gossipv1 . ObservationRequest {
2022-06-16 12:17:43 -07:00
ChainId : uint32 ( chainID ) ,
2022-05-12 17:00:16 -07:00
TxHash : txHashAsByteArray . Bytes ( ) ,
} } )
if err != nil {
log . Fatalf ( "SendObservationRequest: %v" , err )
}
2022-02-21 14:43:40 -08:00
}
}
2022-05-12 17:00:16 -07:00
if seq <= uint64 ( lowest ) {
// We are done
log . Println ( "Finished!" )
return
}
2022-02-21 14:43:40 -08:00
}
}
}
}