From 9ed71c00380ffdbc5230533e85ba58b60c28c4b3 Mon Sep 17 00:00:00 2001 From: Evan Gray Date: Wed, 9 Feb 2022 17:54:33 +0000 Subject: [PATCH] node: terra reobservation --- node/cmd/guardiand/node.go | 3 +- node/pkg/terra/watcher.go | 368 ++++++++++++++++++++++--------------- 2 files changed, 224 insertions(+), 147 deletions(-) diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index 3a0b8ab96..28ff8d151 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -540,6 +540,7 @@ func runNode(cmd *cobra.Command, args []string) { // Observation request channel for each chain supporting observation requests. chainObsvReqC[vaa.ChainIDSolana] = make(chan *gossipv1.ObservationRequest) chainObsvReqC[vaa.ChainIDEthereum] = make(chan *gossipv1.ObservationRequest) + chainObsvReqC[vaa.ChainIDTerra] = make(chan *gossipv1.ObservationRequest) chainObsvReqC[vaa.ChainIDBSC] = make(chan *gossipv1.ObservationRequest) chainObsvReqC[vaa.ChainIDPolygon] = make(chan *gossipv1.ObservationRequest) chainObsvReqC[vaa.ChainIDAvalanche] = make(chan *gossipv1.ObservationRequest) @@ -694,7 +695,7 @@ func runNode(cmd *cobra.Command, args []string) { // Start Terra watcher only if configured logger.Info("Starting Terra watcher") if err := supervisor.Run(ctx, "terrawatch", - terra.NewWatcher(*terraWS, *terraLCD, *terraContract, lockC, setC).Run); err != nil { + terra.NewWatcher(*terraWS, *terraLCD, *terraContract, lockC, setC, chainObsvReqC[vaa.ChainIDTerra]).Run); err != nil { return err } diff --git a/node/pkg/terra/watcher.go b/node/pkg/terra/watcher.go index 6cee3a081..4ce1be3fd 100644 --- a/node/pkg/terra/watcher.go +++ b/node/pkg/terra/watcher.go @@ -35,6 +35,10 @@ type ( msgChan chan *common.MessagePublication setChan chan *common.GuardianSet + + // Incoming re-observation requests from the network. Pre-filtered to only + // include requests for our chainID. + obsvReqC chan *gossipv1.ObservationRequest } ) @@ -73,8 +77,14 @@ type clientRequest struct { } // NewWatcher creates a new Terra contract watcher -func NewWatcher(urlWS string, urlLCD string, contract string, lockEvents chan *common.MessagePublication, setEvents chan *common.GuardianSet) *Watcher { - return &Watcher{urlWS: urlWS, urlLCD: urlLCD, contract: contract, msgChan: lockEvents, setChan: setEvents} +func NewWatcher( + urlWS string, + urlLCD string, + contract string, + lockEvents chan *common.MessagePublication, + setEvents chan *common.GuardianSet, + obsvReqC chan *gossipv1.ObservationRequest) *Watcher { + return &Watcher{urlWS: urlWS, urlLCD: urlLCD, contract: contract, msgChan: lockEvents, setChan: setEvents, obsvReqC: obsvReqC} } func (e *Watcher) Run(ctx context.Context) error { @@ -138,7 +148,7 @@ func (e *Watcher) Run(ctx context.Context) error { } blocksBody, err := ioutil.ReadAll(resp.Body) if err != nil { - logger.Error("query guardian set error", zap.Error(err)) + logger.Error("query latest block response read error", zap.Error(err)) errC <- err resp.Body.Close() continue @@ -156,6 +166,63 @@ func (e *Watcher) Run(ctx context.Context) error { } }() + go func() { + for { + select { + case <-ctx.Done(): + return + case r := <-e.obsvReqC: + if vaa.ChainID(r.ChainId) != vaa.ChainIDTerra { + panic("invalid chain ID") + } + + tx := hex.EncodeToString(r.TxHash) + + logger.Info("received observation request for terra", + zap.String("tx_hash", tx)) + + client := &http.Client{ + Timeout: time.Second * 5, + } + + // Query for tx by hash + resp, err := client.Get(fmt.Sprintf("%s/cosmos/tx/v1beta1/txs/%s", e.urlLCD, tx)) + if err != nil { + logger.Error("query tx response error", zap.Error(err)) + continue + } + txBody, err := ioutil.ReadAll(resp.Body) + if err != nil { + logger.Error("query tx response read error", zap.Error(err)) + resp.Body.Close() + continue + } + resp.Body.Close() + + txJSON := string(txBody) + + txHashRaw := gjson.Get(txJSON, "tx_response.txhash") + if !txHashRaw.Exists() { + logger.Error("terra tx does not have tx hash", zap.String("payload", txJSON)) + continue + } + txHash := txHashRaw.String() + + events := gjson.Get(txJSON, "tx_response.events") + if !events.Exists() { + logger.Error("terra tx has no events", zap.String("payload", txJSON)) + continue + } + + msgs := EventsToMessagePublications(e.contract, txHash, events.Array(), logger) + for _, msg := range msgs { + e.msgChan <- msg + terraMessagesConfirmed.Inc() + } + } + } + }() + go func() { defer close(errC) @@ -185,149 +252,9 @@ func (e *Watcher) Run(ctx context.Context) error { continue } - for _, event := range events.Array() { - if !event.IsObject() { - logger.Warn("terra event is invalid", zap.String("tx_hash", txHash), zap.String("event", event.String())) - continue - } - eventType := gjson.Get(event.String(), "type") - if eventType.String() != "wasm" { - continue - } - - attributes := gjson.Get(event.String(), "attributes") - if !attributes.Exists() { - logger.Warn("terra message event has no attributes", zap.String("payload", json), zap.String("event", event.String())) - continue - } - mappedAttributes := map[string]string{} - for _, attribute := range attributes.Array() { - if !attribute.IsObject() { - logger.Warn("terra event attribute is invalid", zap.String("tx_hash", txHash), zap.String("attribute", attribute.String())) - continue - } - keyBase := gjson.Get(attribute.String(), "key") - if !keyBase.Exists(){ - logger.Warn("terra event attribute does not have key", zap.String("tx_hash", txHash), zap.String("attribute", attribute.String())) - continue - } - valueBase := gjson.Get(attribute.String(), "value") - if !valueBase.Exists(){ - logger.Warn("terra event attribute does not have value", zap.String("tx_hash", txHash), zap.String("attribute", attribute.String())) - continue - } - - key, err := base64.StdEncoding.DecodeString(keyBase.String()) - if err != nil { - logger.Warn("terra event key attribute is invalid", zap.String("tx_hash", txHash), zap.String("key", keyBase.String())) - continue - } - value, err := base64.StdEncoding.DecodeString(valueBase.String()) - if err != nil { - logger.Warn("terra event value attribute is invalid", zap.String("tx_hash", txHash), zap.String("key", keyBase.String()), zap.String("value", valueBase.String())) - continue - } - - if _, ok := mappedAttributes[string(key)]; ok { - logger.Debug("duplicate key in events", zap.String("tx_hash", txHash), zap.String("key", keyBase.String()), zap.String("value", valueBase.String())) - continue - } - - mappedAttributes[string(key)] = string(value) - } - - contractAddress, ok := mappedAttributes["contract_address"] - if !ok { - logger.Warn("terra wasm event without contract address field set", zap.String("event", event.String())) - continue - } - // This is not a wormhole message - if contractAddress != e.contract { - continue - } - - payload, ok := mappedAttributes["message.message"] - if !ok { - logger.Error("wormhole event does not have a message field", zap.String("tx_hash", txHash), zap.String("attributes", attributes.String())) - continue - } - sender, ok := mappedAttributes["message.sender"] - if !ok { - logger.Error("wormhole event does not have a sender field", zap.String("tx_hash", txHash), zap.String("attributes", attributes.String())) - continue - } - chainId, ok := mappedAttributes["message.chain_id"] - if !ok { - logger.Error("wormhole event does not have a chain_id field", zap.String("tx_hash", txHash), zap.String("attributes", attributes.String())) - continue - } - nonce, ok := mappedAttributes["message.nonce"] - if !ok { - logger.Error("wormhole event does not have a nonce field", zap.String("tx_hash", txHash), zap.String("attributes", attributes.String())) - continue - } - sequence, ok := mappedAttributes["message.sequence"] - if !ok { - logger.Error("wormhole event does not have a sequence field", zap.String("tx_hash", txHash), zap.String("attributes", attributes.String())) - continue - } - blockTime, ok := mappedAttributes["message.block_time"] - if !ok { - logger.Error("wormhole event does not have a block_time field", zap.String("tx_hash", txHash), zap.String("attributes", attributes.String())) - continue - } - - logger.Info("new message detected on terra", - zap.String("chainId", chainId), - zap.String("txHash", txHash), - zap.String("sender", sender), - zap.String("nonce", nonce), - zap.String("sequence", sequence), - zap.String("blockTime", blockTime), - ) - - senderAddress, err := StringToAddress(sender) - if err != nil { - logger.Error("cannot decode emitter hex", zap.String("tx_hash", txHash), zap.String("value", sender)) - continue - } - txHashValue, err := StringToHash(txHash) - if err != nil { - logger.Error("cannot decode tx hash hex", zap.String("tx_hash", txHash), zap.String("value", txHash)) - continue - } - payloadValue, err := hex.DecodeString(payload) - if err != nil { - logger.Error("cannot decode payload", zap.String("tx_hash", txHash), zap.String("value", payload)) - continue - } - - blockTimeInt, err := strconv.ParseInt(blockTime, 10, 64) - if err != nil { - logger.Error("blocktime cannot be parsed as int", zap.String("tx_hash", txHash), zap.String("value", blockTime)) - continue - } - nonceInt, err := strconv.ParseUint(nonce, 10, 32) - if err != nil { - logger.Error("nonce cannot be parsed as int", zap.String("tx_hash", txHash), zap.String("value", blockTime)) - continue - } - sequenceInt, err := strconv.ParseUint(sequence, 10, 64) - if err != nil { - logger.Error("sequence cannot be parsed as int", zap.String("tx_hash", txHash), zap.String("value", blockTime)) - continue - } - messagePublication := &common.MessagePublication{ - TxHash: txHashValue, - Timestamp: time.Unix(blockTimeInt, 0), - Nonce: uint32(nonceInt), - Sequence: sequenceInt, - EmitterChain: vaa.ChainIDTerra, - EmitterAddress: senderAddress, - Payload: payloadValue, - ConsistencyLevel: 0, // Instant finality - } - e.msgChan <- messagePublication + msgs := EventsToMessagePublications(e.contract, txHash, events.Array(), logger) + for _, msg := range msgs { + e.msgChan <- msg terraMessagesConfirmed.Inc() } @@ -391,6 +318,155 @@ func (e *Watcher) Run(ctx context.Context) error { } } +func EventsToMessagePublications(contract string, txHash string, events []gjson.Result, logger *zap.Logger) []*common.MessagePublication { + msgs := make([]*common.MessagePublication, 0, len(events)) + for _, event := range events { + if !event.IsObject() { + logger.Warn("terra event is invalid", zap.String("tx_hash", txHash), zap.String("event", event.String())) + continue + } + eventType := gjson.Get(event.String(), "type") + if eventType.String() != "wasm" { + continue + } + + attributes := gjson.Get(event.String(), "attributes") + if !attributes.Exists() { + logger.Warn("terra message event has no attributes", zap.String("tx_hash", txHash), zap.String("event", event.String())) + continue + } + mappedAttributes := map[string]string{} + for _, attribute := range attributes.Array() { + if !attribute.IsObject() { + logger.Warn("terra event attribute is invalid", zap.String("tx_hash", txHash), zap.String("attribute", attribute.String())) + continue + } + keyBase := gjson.Get(attribute.String(), "key") + if !keyBase.Exists() { + logger.Warn("terra event attribute does not have key", zap.String("tx_hash", txHash), zap.String("attribute", attribute.String())) + continue + } + valueBase := gjson.Get(attribute.String(), "value") + if !valueBase.Exists() { + logger.Warn("terra event attribute does not have value", zap.String("tx_hash", txHash), zap.String("attribute", attribute.String())) + continue + } + + key, err := base64.StdEncoding.DecodeString(keyBase.String()) + if err != nil { + logger.Warn("terra event key attribute is invalid", zap.String("tx_hash", txHash), zap.String("key", keyBase.String())) + continue + } + value, err := base64.StdEncoding.DecodeString(valueBase.String()) + if err != nil { + logger.Warn("terra event value attribute is invalid", zap.String("tx_hash", txHash), zap.String("key", keyBase.String()), zap.String("value", valueBase.String())) + continue + } + + if _, ok := mappedAttributes[string(key)]; ok { + logger.Debug("duplicate key in events", zap.String("tx_hash", txHash), zap.String("key", keyBase.String()), zap.String("value", valueBase.String())) + continue + } + + mappedAttributes[string(key)] = string(value) + } + + contractAddress, ok := mappedAttributes["contract_address"] + if !ok { + logger.Warn("terra wasm event without contract address field set", zap.String("event", event.String())) + continue + } + // This is not a wormhole message + if contractAddress != contract { + continue + } + + payload, ok := mappedAttributes["message.message"] + if !ok { + logger.Error("wormhole event does not have a message field", zap.String("tx_hash", txHash), zap.String("attributes", attributes.String())) + continue + } + sender, ok := mappedAttributes["message.sender"] + if !ok { + logger.Error("wormhole event does not have a sender field", zap.String("tx_hash", txHash), zap.String("attributes", attributes.String())) + continue + } + chainId, ok := mappedAttributes["message.chain_id"] + if !ok { + logger.Error("wormhole event does not have a chain_id field", zap.String("tx_hash", txHash), zap.String("attributes", attributes.String())) + continue + } + nonce, ok := mappedAttributes["message.nonce"] + if !ok { + logger.Error("wormhole event does not have a nonce field", zap.String("tx_hash", txHash), zap.String("attributes", attributes.String())) + continue + } + sequence, ok := mappedAttributes["message.sequence"] + if !ok { + logger.Error("wormhole event does not have a sequence field", zap.String("tx_hash", txHash), zap.String("attributes", attributes.String())) + continue + } + blockTime, ok := mappedAttributes["message.block_time"] + if !ok { + logger.Error("wormhole event does not have a block_time field", zap.String("tx_hash", txHash), zap.String("attributes", attributes.String())) + continue + } + + logger.Info("new message detected on terra", + zap.String("chainId", chainId), + zap.String("txHash", txHash), + zap.String("sender", sender), + zap.String("nonce", nonce), + zap.String("sequence", sequence), + zap.String("blockTime", blockTime), + ) + + senderAddress, err := StringToAddress(sender) + if err != nil { + logger.Error("cannot decode emitter hex", zap.String("tx_hash", txHash), zap.String("value", sender)) + continue + } + txHashValue, err := StringToHash(txHash) + if err != nil { + logger.Error("cannot decode tx hash hex", zap.String("tx_hash", txHash), zap.String("value", txHash)) + continue + } + payloadValue, err := hex.DecodeString(payload) + if err != nil { + logger.Error("cannot decode payload", zap.String("tx_hash", txHash), zap.String("value", payload)) + continue + } + + blockTimeInt, err := strconv.ParseInt(blockTime, 10, 64) + if err != nil { + logger.Error("blocktime cannot be parsed as int", zap.String("tx_hash", txHash), zap.String("value", blockTime)) + continue + } + nonceInt, err := strconv.ParseUint(nonce, 10, 32) + if err != nil { + logger.Error("nonce cannot be parsed as int", zap.String("tx_hash", txHash), zap.String("value", blockTime)) + continue + } + sequenceInt, err := strconv.ParseUint(sequence, 10, 64) + if err != nil { + logger.Error("sequence cannot be parsed as int", zap.String("tx_hash", txHash), zap.String("value", blockTime)) + continue + } + messagePublication := &common.MessagePublication{ + TxHash: txHashValue, + Timestamp: time.Unix(blockTimeInt, 0), + Nonce: uint32(nonceInt), + Sequence: sequenceInt, + EmitterChain: vaa.ChainIDTerra, + EmitterAddress: senderAddress, + Payload: payloadValue, + ConsistencyLevel: 0, // Instant finality + } + msgs = append(msgs, messagePublication) + } + return msgs +} + // StringToAddress convert string into address func StringToAddress(value string) (vaa.Address, error) { var address vaa.Address