diff --git a/node/cmd/guardiand/adminserver.go b/node/cmd/guardiand/adminserver.go index 3cdaa93ad..266b2ccc1 100644 --- a/node/cmd/guardiand/adminserver.go +++ b/node/cmd/guardiand/adminserver.go @@ -867,6 +867,8 @@ func (s *nodePrivilegedService) DumpRPCs(ctx context.Context, req *nodev1.DumpRP rpcMap["celoRPC"] = *celoRPC rpcMap["ethRPC"] = *ethRPC rpcMap["fantomRPC"] = *fantomRPC + rpcMap["ibcLCD"] = *ibcLCD + rpcMap["ibcWS"] = *ibcWS rpcMap["karuraRPC"] = *karuraRPC rpcMap["klaytnRPC"] = *klaytnRPC rpcMap["moonbeamRPC"] = *moonbeamRPC diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index fc476e5e4..96389bd60 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -20,6 +20,7 @@ import ( "github.com/certusone/wormhole/node/pkg/watchers/algorand" "github.com/certusone/wormhole/node/pkg/watchers/aptos" "github.com/certusone/wormhole/node/pkg/watchers/evm" + "github.com/certusone/wormhole/node/pkg/watchers/ibc" "github.com/certusone/wormhole/node/pkg/watchers/near" "github.com/certusone/wormhole/node/pkg/watchers/solana" "github.com/certusone/wormhole/node/pkg/watchers/sui" @@ -147,6 +148,10 @@ var ( wormchainKeyPath *string wormchainKeyPassPhrase *string + ibcWS *string + ibcLCD *string + ibcContract *string + accountantContract *string accountantWS *string accountantCheckEnabled *bool @@ -298,6 +303,10 @@ func init() { wormchainKeyPath = NodeCmd.Flags().String("wormchainKeyPath", "", "path to wormhole-chain private key for signing transactions") wormchainKeyPassPhrase = NodeCmd.Flags().String("wormchainKeyPassPhrase", "", "pass phrase used to unarmor the wormchain key file") + ibcWS = NodeCmd.Flags().String("ibcWS", "", "Websocket used to listen to the IBC receiver smart contract on wormchain") + ibcLCD = NodeCmd.Flags().String("ibcLCD", "", "Path to LCD service root for http calls") + ibcContract = NodeCmd.Flags().String("ibcContract", "", "Address of the IBC smart contract on wormchain") + accountantWS = NodeCmd.Flags().String("accountantWS", "", "Websocket used to listen to the accountant smart contract on wormchain") accountantContract = NodeCmd.Flags().String("accountantContract", "", "Address of the accountant smart contract on wormchain") accountantCheckEnabled = NodeCmd.Flags().Bool("accountantCheckEnabled", false, "Should accountant be enforced on transfers") @@ -1114,7 +1123,10 @@ func runNode(cmd *cobra.Command, args []string) { rootCtxCancel, acct, gov, - nil, nil, components)); err != nil { + nil, + nil, + components, + &ibc.Features)); err != nil { return err } @@ -1310,7 +1322,6 @@ func runNode(cmd *cobra.Command, args []string) { return err } } - if shouldStart(algorandIndexerRPC) { logger.Info("Starting Algorand watcher") common.MustRegisterReadinessSyncing(vaa.ChainIDAlgorand) @@ -1432,6 +1443,49 @@ func runNode(cmd *cobra.Command, args []string) { } } + if shouldStart(ibcWS) { + if *ibcLCD == "" { + logger.Fatal("If --ibcWS is specified, then --ibcLCD must be specified") + } + if *ibcContract == "" { + logger.Fatal("If --ibcWS is specified, then --ibcContract must be specified") + } + + var chainConfig ibc.ChainConfig + for _, chainID := range ibc.Chains { + // Make sure the chain ID is valid. + if _, exists := chainMsgC[chainID]; !exists { + panic("invalid IBC chain ID") + } + + // Make sure this chain isn't already configured. + if _, exists := chainObsvReqC[chainID]; exists { + logger.Info("not monitoring chain with IBC because it is already registered.", zap.Stringer("chainID", chainID)) + continue + } + + chainObsvReqC[chainID] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) + common.MustRegisterReadinessSyncing(chainID) + + chainConfig = append(chainConfig, ibc.ChainConfigEntry{ + ChainID: chainID, + MsgC: chainMsgC[chainID], + ObsvReqC: chainObsvReqC[chainID], + }) + } + + if len(chainConfig) > 0 { + logger.Info("Starting IBC watcher") + readiness.RegisterComponent(common.ReadinessIBCSyncing) + if err := supervisor.Run(ctx, "ibcwatch", + ibc.NewWatcher(*ibcWS, *ibcLCD, *ibcContract, chainConfig).Run); err != nil { + return err + } + } else { + logger.Error("Although IBC is enabled, there are no chains for it to monitor") + } + } + go handleReobservationRequests(rootCtx, clock.New(), logger, obsvReqReadC, chainObsvReqC) if acct != nil { diff --git a/node/cmd/spy/spy.go b/node/cmd/spy/spy.go index fd1e4a72c..dff020324 100644 --- a/node/cmd/spy/spy.go +++ b/node/cmd/spy/spy.go @@ -551,7 +551,9 @@ func runSpy(cmd *cobra.Command, args []string) { nil, nil, nil, - components)); err != nil { + components, + nil, // ibc feature string + )); err != nil { return err } diff --git a/node/pkg/common/readiness.go b/node/pkg/common/readiness.go index 01d886e7f..88c4d2602 100644 --- a/node/pkg/common/readiness.go +++ b/node/pkg/common/readiness.go @@ -9,6 +9,7 @@ import ( const ( ReadinessEthSyncing readiness.Component = "ethSyncing" + ReadinessIBCSyncing readiness.Component = "IBCSyncing" ) // MustRegisterReadinessSyncing registers the specified chain for readiness syncing. It panics if the chain ID is invalid so it should only be used during initialization. diff --git a/node/pkg/p2p/p2p.go b/node/pkg/p2p/p2p.go index 939aadebd..2dbe60b02 100644 --- a/node/pkg/p2p/p2p.go +++ b/node/pkg/p2p/p2p.go @@ -154,6 +154,7 @@ func Run( signedGovCfg chan *gossipv1.SignedChainGovernorConfig, signedGovSt chan *gossipv1.SignedChainGovernorStatus, components *Components, + ibcFeatures *string, ) func(ctx context.Context) error { if components == nil { components = DefaultComponents() @@ -306,6 +307,9 @@ func Run( if acct != nil { features = append(features, acct.FeatureString()) } + if ibcFeatures != nil && *ibcFeatures != "" { + features = append(features, *ibcFeatures) + } heartbeat := &gossipv1.Heartbeat{ NodeName: nodeName, diff --git a/node/pkg/p2p/watermark_test.go b/node/pkg/p2p/watermark_test.go index f7dc8d2f6..82e1dc72d 100644 --- a/node/pkg/p2p/watermark_test.go +++ b/node/pkg/p2p/watermark_test.go @@ -180,5 +180,7 @@ func startGuardian(t *testing.T, ctx context.Context, g *G) { g.gov, g.signedGovCfg, g.signedGovSt, - g.components)) + g.components, + nil, // ibc feature string + )) } diff --git a/node/pkg/watchers/cosmwasm/watcher.go b/node/pkg/watchers/cosmwasm/watcher.go index fa8862b02..24678b4db 100644 --- a/node/pkg/watchers/cosmwasm/watcher.go +++ b/node/pkg/watchers/cosmwasm/watcher.go @@ -29,6 +29,11 @@ import ( "nhooyr.io/websocket/wsjson" ) +// ReadLimitSize can be used to increase the read limit size on the listening connection. The default read limit size is not large enough, +// causing "failed to read: read limited at 32769 bytes" errors during testing. Increasing this limit effects an internal buffer that +// is used to as part of the zero alloc/copy design. +const ReadLimitSize = 524288 + type ( // Watcher is responsible for looking over a cosmwasm blockchain and reporting new transactions to the contract Watcher struct { @@ -150,10 +155,7 @@ func (e *Watcher) Run(ctx context.Context) error { } defer c.Close(websocket.StatusNormalClosure, "") - // During testing, I got a message larger then the default - // 32768. Increasing this limit effects an internal buffer that is used - // to as part of the zero alloc/copy design. - c.SetReadLimit(524288) + c.SetReadLimit(ReadLimitSize) // Subscribe to smart contract transactions params := [...]string{fmt.Sprintf("tm.event='Tx' AND %s='%s'", e.contractAddressFilterKey, e.contract)} diff --git a/node/pkg/watchers/ibc/wasm_attrs.go b/node/pkg/watchers/ibc/wasm_attrs.go new file mode 100644 index 000000000..41788d733 --- /dev/null +++ b/node/pkg/watchers/ibc/wasm_attrs.go @@ -0,0 +1,100 @@ +package ibc + +import ( + "encoding/base64" + "fmt" + "strconv" + + "github.com/tidwall/gjson" + + "go.uber.org/zap" +) + +// WasmAttributes is an object to facilitate parsing of wasm event attributes. It provides a method to parse the attribute array in a wasm event, +// plus methods to return attributes as the appropriate type, including doing range checking. +type WasmAttributes struct { + m map[string]string +} + +// GetAsString returns the attribute value as a string. +func (wa *WasmAttributes) GetAsString(key string) (string, error) { + value, exists := wa.m[key] + if !exists { + return "", fmt.Errorf("attribute %s does not exist", key) + } + + return value, nil +} + +// GetAsUint returns the attribute value as an unsigned int. It also performs range checking. +func (wa *WasmAttributes) GetAsUint(key string, bitSize int) (uint64, error) { + valueStr, exists := wa.m[key] + if !exists { + return 0, fmt.Errorf("attribute %s does not exist", key) + } + + value, err := strconv.ParseUint(valueStr, 10, bitSize) + if err != nil { + return 0, fmt.Errorf("failed parse attribute %s with value %s as %d bit uint: %w", key, valueStr, bitSize, err) + } + + return value, nil +} + +// GetAsInt returns the attribute value as a signed int. It also performs range checking. +func (wa *WasmAttributes) GetAsInt(key string, bitSize int) (int64, error) { + valueStr, exists := wa.m[key] + if !exists { + return 0, fmt.Errorf("attribute %s does not exist", key) + } + + value, err := strconv.ParseInt(valueStr, 10, bitSize) + if err != nil { + return 0, fmt.Errorf("failed parse attribute %s with value %s as %d bit int: %w", key, valueStr, bitSize, err) + } + + return value, nil +} + +// Parse parses the attributes in a wasm event. +func (wa *WasmAttributes) Parse(logger *zap.Logger, event gjson.Result) error { + wa.m = make(map[string]string) + attributes := gjson.Get(event.String(), "attributes") + if !attributes.Exists() { + return fmt.Errorf("event does not contain any attributes") + } + + for _, attribute := range attributes.Array() { + if !attribute.IsObject() { + return fmt.Errorf("event attribute is invalid: %s", attribute.String()) + } + keyBase := gjson.Get(attribute.String(), "key") + if !keyBase.Exists() { + return fmt.Errorf("event attribute does not have a key: %s", attribute.String()) + } + valueBase := gjson.Get(attribute.String(), "value") + if !valueBase.Exists() { + return fmt.Errorf("event attribute does not have a value: %s", attribute.String()) + } + keyRaw, err := base64.StdEncoding.DecodeString(keyBase.String()) + if err != nil { + return fmt.Errorf("event attribute key is invalid base64: %s", attribute.String()) + } + valueRaw, err := base64.StdEncoding.DecodeString(valueBase.String()) + if err != nil { + return fmt.Errorf("event attribute value is invalid base64: %s", attribute.String()) + } + + key := string(keyRaw) + value := string(valueRaw) + + if _, ok := wa.m[key]; ok { + return fmt.Errorf("duplicate key in event: %s", key) + } + + logger.Debug("event attribute", zap.String("key", key), zap.String("value", value)) + wa.m[key] = value + } + + return nil +} diff --git a/node/pkg/watchers/ibc/watcher.go b/node/pkg/watchers/ibc/watcher.go new file mode 100644 index 000000000..e6f021d0d --- /dev/null +++ b/node/pkg/watchers/ibc/watcher.go @@ -0,0 +1,700 @@ +package ibc + +import ( + "context" + "encoding/base64" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strings" + "sync" + "time" + + "github.com/certusone/wormhole/node/pkg/common" + "github.com/certusone/wormhole/node/pkg/p2p" + "github.com/certusone/wormhole/node/pkg/readiness" + "github.com/certusone/wormhole/node/pkg/supervisor" + "github.com/certusone/wormhole/node/pkg/watchers/cosmwasm" + "github.com/wormhole-foundation/wormhole/sdk/vaa" + + gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/tidwall/gjson" + + "nhooyr.io/websocket" + "nhooyr.io/websocket/wsjson" + + ethCommon "github.com/ethereum/go-ethereum/common" + + "go.uber.org/zap" +) + +type ( + // ChainConfig is the list of chains to be monitored over IBC, along with their channel data. + ChainConfig []ChainConfigEntry + + // ChainConfigEntry defines the entry for a chain being monitored by IBC. + ChainConfigEntry struct { + ChainID vaa.ChainID + MsgC chan<- *common.MessagePublication + ObsvReqC <-chan *gossipv1.ObservationRequest + } +) + +var ( + // Chains defines the list of chains to be monitored by IBC. Add new chains here as necessary. + Chains = []vaa.ChainID{} + + // Features is the feature string to be published in the gossip heartbeat messages. It will include all chains that are actually enabled on IBC. + Features = "" + + ibcErrors = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "ibc_errors_by_reason", + Help: "Total number of errors on IBC", + }, []string{"reason"}) + messagesConfirmed = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "wormhole_ibc_messages_confirmed_total", + Help: "Total number of verified messages found on an IBC connected chain", + }, []string{"chain_name"}) + currentSlotHeight = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "wormhole_ibc_current_height", + Help: "Current slot height on an IBC connected chain (the block height on wormchain)", + }, []string{"chain_name"}) + invalidChainIdMismatches = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "wormhole_ibc_chain_id_mismatches", + Help: "Total number of cases where the wormhole chain ID does not match the IBC connection ID", + }, []string{"ibc_channel_id"}) +) + +type ( + // Watcher is responsible for monitoring the IBC contract on wormchain and publishing wormhole messages for all chains connected via IBC. + Watcher struct { + wsUrl string + lcdUrl string + contractAddress string + logger *zap.Logger + + // chainMap defines the data associated with all connected / enabled chains. + chainMap map[vaa.ChainID]*chainEntry + + // channelIdToChainIdMap provides a mapping from IBC channel ID to chain ID. Note that there can be multiple channels IDs for the same chain. + channelIdToChainIdMap map[string]vaa.ChainID + + // channelIdToChainIdLock protects channelIdToChainIdMap. + channelIdToChainIdLock sync.Mutex + } + + // chainEntry defines the data associated with a chain. + chainEntry struct { + chainID vaa.ChainID + chainName string + readiness readiness.Component + msgC chan<- *common.MessagePublication + obsvReqC <-chan *gossipv1.ObservationRequest + } +) + +// NewWatcher creates a new IBC contract watcher +func NewWatcher( + wsUrl string, + lcdUrl string, + contractAddress string, + chainConfig ChainConfig, +) *Watcher { + features := "" + chainMap := make(map[vaa.ChainID]*chainEntry) + for _, chainToMonitor := range chainConfig { + _, exists := chainMap[chainToMonitor.ChainID] + if exists { + panic(fmt.Sprintf("detected duplicate chainID: %v", chainToMonitor.ChainID)) + } + + ce := &chainEntry{ + chainID: chainToMonitor.ChainID, + chainName: chainToMonitor.ChainID.String(), + readiness: common.MustConvertChainIdToReadinessSyncing(chainToMonitor.ChainID), + msgC: chainToMonitor.MsgC, + obsvReqC: chainToMonitor.ObsvReqC, + } + + chainMap[ce.chainID] = ce + + if features == "" { + features = "ibc:" + } else { + features += "|" + } + features += ce.chainID.String() + } + + Features = features + + return &Watcher{ + wsUrl: wsUrl, + lcdUrl: lcdUrl, + contractAddress: contractAddress, + chainMap: chainMap, + channelIdToChainIdMap: make(map[string]vaa.ChainID), + } +} + +// clientRequest is used to subscribe for events from the contract. +type clientRequest struct { + JSONRPC string `json:"jsonrpc"` + // A String containing the name of the method to be invoked. + Method string `json:"method"` + // Object to pass as request parameter to the method. + Params [1]string `json:"params"` + // The request id. This can be of any type. It is used to match the + // response with the request that it is replying to. + ID uint64 `json:"id"` +} + +// ibcReceivePublishEvent represents the log message received from the IBC receiver contract. +type ibcReceivePublishEvent struct { + ChannelID string + Msg *common.MessagePublication +} + +// Run is the runnable for monitoring the IBC contract on wormchain. +func (w *Watcher) Run(ctx context.Context) error { + w.logger = supervisor.Logger(ctx) + errC := make(chan error) + + w.logger.Info("creating watcher", + zap.String("wsUrl", w.wsUrl), + zap.String("lcdUrl", w.lcdUrl), + zap.String("contract", w.contractAddress), + zap.String("features", Features), + ) + + for _, ce := range w.chainMap { + w.logger.Info("will monitor chain over IBC", zap.String("chain", ce.chainName)) + p2p.DefaultRegistry.SetNetworkStats(ce.chainID, &gossipv1.Heartbeat_Network{ContractAddress: w.contractAddress}) + } + + c, _, err := websocket.Dial(ctx, w.wsUrl, nil) + if err != nil { + ibcErrors.WithLabelValues("websocket_dial_error").Inc() + return fmt.Errorf("failed to establish tendermint websocket connection: %w", err) + } + defer c.Close(websocket.StatusNormalClosure, "") + + c.SetReadLimit(cosmwasm.ReadLimitSize) + + // Subscribe to smart contract transactions. + params := [...]string{fmt.Sprintf("tm.event='Tx' AND wasm._contract_address='%s'", w.contractAddress)} + command := &clientRequest{ + JSONRPC: "2.0", + Method: "subscribe", + Params: params, + ID: 1, + } + err = wsjson.Write(ctx, c, command) + if err != nil { + ibcErrors.WithLabelValues("websocket_subscription_error").Inc() + return fmt.Errorf("failed to subscribe to events: %w", err) + } + + // Wait for the success response. + _, subResp, err := c.Read(ctx) + if err != nil { + ibcErrors.WithLabelValues("websocket_subscription_error").Inc() + return fmt.Errorf("failed to receive response to subscribe request: %w", err) + } + if strings.Contains(string(subResp), "error") { + ibcErrors.WithLabelValues("websocket_subscription_error").Inc() + return fmt.Errorf("failed to subscribe to events, response: %s", string(subResp)) + } + + // Start a routine to listen for messages from the contract. + common.RunWithScissors(ctx, errC, "ibc_data_pump", func(ctx context.Context) error { + return w.handleEvents(ctx, c) + }) + + // Start a routine to periodically query the wormchain block height. + common.RunWithScissors(ctx, errC, "ibc_block_height", func(ctx context.Context) error { + return w.handleQueryBlockHeight(ctx, c) + }) + + // Start a routine for each chain to listen for observation requests. + for _, ce := range w.chainMap { + common.RunWithScissors(ctx, errC, "ibc_objs_req", func(ctx context.Context) error { + return w.handleObservationRequests(ctx, ce) + }) + } + + // Signal to the supervisor that this runnable has finished initialization. + supervisor.Signal(ctx, supervisor.SignalHealthy) + + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-errC: + return err + } +} + +// handleEvents reads messages from the IBC receiver contract and processes them. +func (w *Watcher) handleEvents(ctx context.Context, c *websocket.Conn) error { + for { + select { + case <-ctx.Done(): + return nil + default: + _, message, err := c.Read(ctx) + if err != nil { + w.logger.Error("failed to read socket", zap.Error(err)) + ibcErrors.WithLabelValues("channel_read_error").Inc() + return fmt.Errorf("failed to read socket: %w", err) + } + + // Received a message from the blockchain. + json := string(message) + + txHashRaw := gjson.Get(json, "result.events.tx\\.hash.0") + if !txHashRaw.Exists() { + w.logger.Warn("message does not have tx hash", zap.String("payload", json)) + continue + } + txHash, err := vaa.StringToHash(txHashRaw.String()) + if err != nil { + w.logger.Warn("failed to parse txHash", zap.String("txHash", txHashRaw.String()), zap.Error(err)) + continue + } + + events := gjson.Get(json, "result.data.value.TxResult.result.events") + if !events.Exists() { + w.logger.Warn("message has no events", zap.String("payload", json)) + continue + } + + for _, event := range events.Array() { + if !event.IsObject() { + w.logger.Warn("event is invalid", zap.Stringer("tx_hash", txHash), zap.String("event", event.String())) + continue + } + eventType := gjson.Get(event.String(), "type").String() + if eventType == "wasm" { + evt, err := parseIbcReceivePublishEvent(w.logger, w.contractAddress, event, txHash) + if err != nil { + w.logger.Error("failed to parse wasm event", zap.Error(err), zap.String("event", event.String())) + continue + } + + if evt != nil { + if err := w.processIbcReceivePublishEvent(txHash, evt, "new"); err != nil { + return fmt.Errorf("failed to process new IBC event: %w", err) + } + } + } else { + w.logger.Debug("ignoring uninteresting event", zap.String("eventType", eventType)) + } + } + } + } +} + +// handleQueryBlockHeight gets the latest block height from wormchain each interval and updates the status on all the connected chains. +func (w *Watcher) handleQueryBlockHeight(ctx context.Context, c *websocket.Conn) error { + const latestBlockURL = "blocks/latest" + + t := time.NewTicker(5 * time.Second) + client := &http.Client{ + Timeout: time.Second * 5, + } + + for { + select { + case <-ctx.Done(): + return nil + case <-t.C: + resp, err := client.Get(fmt.Sprintf("%s/%s", w.lcdUrl, latestBlockURL)) + if err != nil { + return fmt.Errorf("failed to query latest block: %w", err) + } + blocksBody, err := io.ReadAll(resp.Body) + if err != nil { + resp.Body.Close() + return fmt.Errorf("failed to read latest block body: %w", err) + } + resp.Body.Close() + + blockJSON := string(blocksBody) + latestBlockAsInt := gjson.Get(blockJSON, "block.header.height").Int() + latestBlockAsFloat := float64(latestBlockAsInt) + w.logger.Debug("current block height", zap.Int64("height", latestBlockAsInt)) + + for _, ce := range w.chainMap { + currentSlotHeight.WithLabelValues(ce.chainName).Set(latestBlockAsFloat) + p2p.DefaultRegistry.SetNetworkStats(ce.chainID, &gossipv1.Heartbeat_Network{ + Height: latestBlockAsInt, + ContractAddress: w.contractAddress, + }) + + readiness.SetReady(ce.readiness) + } + + readiness.SetReady(common.ReadinessIBCSyncing) + } + } +} + +// handleObservationRequests listens for observation requests for a single chain and processes them by reading the requested transaction +// from wormchain and publishing the associated message. This function is instantiated for each connected chain. +func (w *Watcher) handleObservationRequests(ctx context.Context, ce *chainEntry) error { + for { + select { + case <-ctx.Done(): + return nil + case r := <-ce.obsvReqC: + if vaa.ChainID(r.ChainId) != ce.chainID { + panic("invalid chain ID") + } + + reqTxHashStr := hex.EncodeToString(r.TxHash) + w.logger.Info("received observation request", zap.String("chain", ce.chainName), zap.String("txHash", reqTxHashStr)) + + client := &http.Client{ + Timeout: time.Second * 5, + } + + // Query for tx by hash. + resp, err := client.Get(fmt.Sprintf("%s/cosmos/tx/v1beta1/txs/%s", w.lcdUrl, reqTxHashStr)) + if err != nil { + w.logger.Error("query tx response error", zap.String("chain", ce.chainName), zap.Error(err)) + continue + } + txBody, err := io.ReadAll(resp.Body) + if err != nil { + w.logger.Error("query tx response read error", zap.String("chain", ce.chainName), zap.Error(err)) + resp.Body.Close() + continue + } + resp.Body.Close() + + txJSON := string(txBody) + txHashRaw := gjson.Get(txJSON, "tx_response.txhash") + if !txHashRaw.Exists() { + w.logger.Error("tx does not have tx hash", zap.String("chain", ce.chainName), zap.String("payload", txJSON)) + continue + } + txHashStr := txHashRaw.String() + txHash, err := vaa.StringToHash(txHashStr) + if err != nil { + w.logger.Error("tx does not have tx hash", zap.String("chain", ce.chainName), zap.String("payload", txJSON)) + continue + } + + events := gjson.Get(txJSON, "tx_response.events") + if !events.Exists() { + w.logger.Error("tx has no events", zap.String("chain", ce.chainName), zap.String("payload", txJSON)) + continue + } + + for _, event := range events.Array() { + if !event.IsObject() { + w.logger.Error("event is invalid", zap.String("chain", ce.chainName), zap.String("tx_hash", txHashStr), zap.String("event", event.String())) + continue + } + eventType := gjson.Get(event.String(), "type") + if eventType.String() == "wasm" { + w.logger.Debug("found wasm event in reobservation", zap.String("chain", ce.chainName), zap.Stringer("txHash", txHash)) + evt, err := parseIbcReceivePublishEvent(w.logger, w.contractAddress, event, txHash) + if err != nil { + w.logger.Error("failed to parse wasm event", zap.String("chain", ce.chainName), zap.Error(err), zap.Any("event", event)) + continue + } + + if evt != nil { + if err := w.processIbcReceivePublishEvent(txHash, evt, "reobservation"); err != nil { + return fmt.Errorf("failed to process reobserved IBC event: %w", err) + } + } + } else { + w.logger.Debug("ignoring uninteresting event in reobservation", zap.String("chain", ce.chainName), zap.Stringer("txHash", txHash), zap.String("eventType", eventType.String())) + } + } + } + } +} + +// parseIbcReceivePublishEvent parses a wasm event into an object. Since the watcher only subscribes to events from a single contract, this function returns an error +// if the contract does not match the desired one. However, since the contract publishes multiple action types, this function returns nil rather than an error +// if the event is not for the desired action. +func parseIbcReceivePublishEvent(logger *zap.Logger, desiredContract string, event gjson.Result, txHash ethCommon.Hash) (*ibcReceivePublishEvent, error) { + var attributes WasmAttributes + err := attributes.Parse(logger, event) + if err != nil { + logger.Error("failed to parse event attributes", zap.Error(err), zap.String("event", event.String())) + return nil, fmt.Errorf("failed to parse attributes: %w", err) + } + + str, err := attributes.GetAsString("_contract_address") + if err != nil { + return nil, err + } + if str != desiredContract { + return nil, fmt.Errorf("received an event from an unexpected contract: %s", str) + } + + str, err = attributes.GetAsString("action") + if err != nil || str != "receive_publish" { + return nil, nil + } + + evt := new(ibcReceivePublishEvent) + evt.Msg = new(common.MessagePublication) + evt.Msg.TxHash = txHash + + evt.ChannelID, err = attributes.GetAsString("channel_id") + if err != nil { + return evt, err + } + + unumber, err := attributes.GetAsUint("message.chain_id", 16) + if err != nil { + return evt, err + } + evt.Msg.EmitterChain = vaa.ChainID(unumber) + + str, err = attributes.GetAsString("message.sender") + if err != nil { + return evt, err + } + evt.Msg.EmitterAddress, err = vaa.StringToAddress(str) + if err != nil { + return evt, fmt.Errorf("failed to parse message.sender attribute %s: %w", str, err) + } + + unumber, err = attributes.GetAsUint("message.nonce", 32) + if err != nil { + return evt, err + } + evt.Msg.Nonce = uint32(unumber) + + unumber, err = attributes.GetAsUint("message.sequence", 64) + if err != nil { + return evt, err + } + evt.Msg.Sequence = unumber + + snumber, err := attributes.GetAsInt("message.block_time", 64) + if err != nil { + return evt, err + } + evt.Msg.Timestamp = time.Unix(snumber, 0) + + str, err = attributes.GetAsString("message.message") + if err != nil { + return evt, err + } + evt.Msg.Payload, err = hex.DecodeString(str) + if err != nil { + return evt, fmt.Errorf("failed to parse message.message attribute %s: %w", str, err) + } + + return evt, nil +} + +// processIbcReceivePublishEvent takes an IBC event, maps it to a message publication and publishes it. +func (w *Watcher) processIbcReceivePublishEvent(txHash ethCommon.Hash, evt *ibcReceivePublishEvent, observationType string) error { + mappedChainID, err := w.getChainIdFromChannelID(evt.ChannelID) + if err != nil { + w.logger.Error("query for IBC channel ID failed", + zap.String("IbcChannelID", evt.ChannelID), + zap.Stringer("TxHash", evt.Msg.TxHash), + zap.Stringer("EmitterChain", evt.Msg.EmitterChain), + zap.Stringer("EmitterAddress", evt.Msg.EmitterAddress), + zap.Uint64("Sequence", evt.Msg.Sequence), + zap.Uint32("Nonce", evt.Msg.Nonce), + zap.Stringer("Timestamp", evt.Msg.Timestamp), + zap.Uint8("ConsistencyLevel", evt.Msg.ConsistencyLevel), + zap.Error(err), + ) + ibcErrors.WithLabelValues("query_error").Inc() + return fmt.Errorf("failed to query IBC channel ID mapping: %w", err) + } + + if mappedChainID == vaa.ChainIDUnset { + // This can happen if the channel ID to chain ID mapping in the contract hasn't been updated yet (pending governance). + // Therefore we don't want to return an error here. Restarting won't help. + w.logger.Error(fmt.Sprintf("received %s message from unknown IBC channel, dropping observation", observationType), + zap.String("IbcChannelID", evt.ChannelID), + zap.Stringer("TxHash", evt.Msg.TxHash), + zap.Stringer("EmitterChain", evt.Msg.EmitterChain), + zap.Stringer("EmitterAddress", evt.Msg.EmitterAddress), + zap.Uint64("Sequence", evt.Msg.Sequence), + zap.Uint32("Nonce", evt.Msg.Nonce), + zap.Stringer("Timestamp", evt.Msg.Timestamp), + zap.Uint8("ConsistencyLevel", evt.Msg.ConsistencyLevel), + ) + ibcErrors.WithLabelValues("unexpected_ibc_channel_error").Inc() + return nil + } + + ce, exists := w.chainMap[mappedChainID] + if !exists { + // This is not an error because some guardians may choose to run the full node and not listen to this chain over IBC. + w.logger.Debug(fmt.Sprintf("received %s message from an unconfigured chain, dropping observation", observationType), + zap.String("IbcChannelID", evt.ChannelID), + zap.Stringer("ChainID", mappedChainID), + zap.Stringer("TxHash", evt.Msg.TxHash), + zap.Stringer("EmitterChain", evt.Msg.EmitterChain), + zap.Stringer("EmitterAddress", evt.Msg.EmitterAddress), + zap.Uint64("Sequence", evt.Msg.Sequence), + zap.Uint32("Nonce", evt.Msg.Nonce), + zap.Stringer("Timestamp", evt.Msg.Timestamp), + zap.Uint8("ConsistencyLevel", evt.Msg.ConsistencyLevel), + ) + return nil + } + + if evt.Msg.EmitterChain != ce.chainID { + w.logger.Error(fmt.Sprintf("chain id mismatch in %s message", observationType), + zap.String("IbcChannelID", evt.ChannelID), + zap.Uint16("MappedChainID", uint16(mappedChainID)), + zap.Uint16("ExpectedChainID", uint16(ce.chainID)), + zap.Stringer("TxHash", evt.Msg.TxHash), + zap.Stringer("EmitterChain", evt.Msg.EmitterChain), + zap.Stringer("EmitterAddress", evt.Msg.EmitterAddress), + zap.Uint64("Sequence", evt.Msg.Sequence), + zap.Uint32("Nonce", evt.Msg.Nonce), + zap.Stringer("Timestamp", evt.Msg.Timestamp), + zap.Uint8("ConsistencyLevel", evt.Msg.ConsistencyLevel), + ) + invalidChainIdMismatches.WithLabelValues(evt.ChannelID).Inc() + return nil // Don't return an error here because we don't want an external source to be able to kill the watcher. + } + + w.logger.Info(fmt.Sprintf("%s message detected", observationType), + zap.String("IbcChannelID", evt.ChannelID), + zap.String("ChainName", ce.chainName), + zap.Stringer("TxHash", evt.Msg.TxHash), + zap.Stringer("EmitterChain", evt.Msg.EmitterChain), + zap.Stringer("EmitterAddress", evt.Msg.EmitterAddress), + zap.Uint64("Sequence", evt.Msg.Sequence), + zap.Uint32("Nonce", evt.Msg.Nonce), + zap.Stringer("Timestamp", evt.Msg.Timestamp), + zap.Uint8("ConsistencyLevel", evt.Msg.ConsistencyLevel), + ) + + ce.msgC <- evt.Msg + messagesConfirmed.WithLabelValues(ce.chainName).Inc() + return nil +} + +// getChainIdFromChannelID returns the chain ID associated with the specified IBC channel. It uses a cache to avoid constantly querying +// wormchain. This works because once an IBC channel is closed its ID will never be reused. This also means that there could be multiple +// IBC channels for the same chain ID. +// See the IBC spec for details: https://github.com/cosmos/ibc/tree/main/spec/core/ics-004-channel-and-packet-semantics#closing-handshake +func (w *Watcher) getChainIdFromChannelID(channelID string) (vaa.ChainID, error) { + w.channelIdToChainIdLock.Lock() + defer w.channelIdToChainIdLock.Unlock() + chainID, exists := w.channelIdToChainIdMap[channelID] + if exists { + return chainID, nil + } + + // We continue to hold the lock here because we don't want two routines (event handler and reobservation) both querying at the same time. + channelIdToChainIdMap, err := w.queryChannelIdToChainIdMapping() + if err != nil { + w.logger.Error("failed to query channelID to chainID mapping", zap.Error(err)) + return vaa.ChainIDUnset, err + } + + w.channelIdToChainIdMap = channelIdToChainIdMap + + chainID, exists = w.channelIdToChainIdMap[channelID] + if exists { + return chainID, nil + } + + return vaa.ChainIDUnset, nil +} + +/* +This query: +`"all_channel_chains"` is `ImFsbF9jaGFubmVsX2NoYWlucyI=` +which becomes: +http://localhost:1319/cosmwasm/wasm/v1/contract/wormhole1nc5tatafv6eyq7llkr2gv50ff9e22mnf70qgjlv737ktmt4eswrq0kdhcj/smart/ImFsbF9jaGFubmVsX2NoYWlucyI%3D + +Returns something like this: +{ + "data": { + "channels_chains": [ + [ + "Y2hhbm5lbC0w", + 18 + ] + ] + } +} + +*/ + +type ibcAllChannelChainsQueryResults struct { + Data struct { + ChannelChains [][]interface{} `json:"channels_chains"` + } +} + +var allChannelChainsQuery = url.QueryEscape(base64.StdEncoding.EncodeToString([]byte(`"all_channel_chains"`))) + +// queryChannelIdToChainIdMapping queries the contract for the set of IBC channels and their correspond chain IDs. +func (w *Watcher) queryChannelIdToChainIdMapping() (map[string]vaa.ChainID, error) { + client := &http.Client{ + Timeout: time.Second * 5, + } + + query := fmt.Sprintf(`%s/cosmwasm/wasm/v1/contract/%s/smart/%s`, w.lcdUrl, w.contractAddress, allChannelChainsQuery) + resp, err := client.Get(query) + if err != nil { + return nil, fmt.Errorf("query failed: %w", err) + } + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("read failed: %w", err) + } + resp.Body.Close() + + var result ibcAllChannelChainsQueryResults + err = json.Unmarshal(body, &result) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal response: %s, error: %w", string(body), err) + } + + if len(result.Data.ChannelChains) == 0 { + return nil, fmt.Errorf("query did not return any data") + } + + w.logger.Info("queried IBC channel ID mapping", zap.Int("numEntriesReturned", len(result.Data.ChannelChains))) + + ret := make(map[string]vaa.ChainID) + for idx, entry := range result.Data.ChannelChains { + if len(entry) != 2 { + return nil, fmt.Errorf("channel map entry %d contains %d items when it should contain exactly two, json: %s", idx, len(entry), string(body)) + } + + channelIdBytes, err := base64.StdEncoding.DecodeString(entry[0].(string)) + if err != nil { + return nil, fmt.Errorf("channel ID for entry %d is invalid base64: %s, err: %s", idx, entry[0], err) + } + + channelID := string(channelIdBytes) + chainID := vaa.ChainID(entry[1].(float64)) + ret[channelID] = chainID + w.logger.Info("IBC channel ID mapping", zap.String("channelID", channelID), zap.Uint16("chainID", uint16(chainID))) + } + + return ret, nil +} diff --git a/node/pkg/watchers/ibc/watcher_test.go b/node/pkg/watchers/ibc/watcher_test.go new file mode 100644 index 000000000..d27b685db --- /dev/null +++ b/node/pkg/watchers/ibc/watcher_test.go @@ -0,0 +1,215 @@ +package ibc + +import ( + "encoding/base64" + "encoding/hex" + "encoding/json" + "reflect" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tidwall/gjson" + "go.uber.org/zap" + + "github.com/certusone/wormhole/node/pkg/common" + "github.com/wormhole-foundation/wormhole/sdk/vaa" +) + +func TestParseIbcReceivePublishEvent(t *testing.T) { + logger := zap.NewNop() + + eventJson := `{"type": "wasm","attributes": [` + + `{"key": "X2NvbnRyYWN0X2FkZHJlc3M=","value": "d29ybWhvbGUxbmM1dGF0YWZ2NmV5cTdsbGtyMmd2NTBmZjllMjJtbmY3MHFnamx2NzM3a3RtdDRlc3dycTBrZGhjag==","index": true},` + + `{"key": "YWN0aW9u", "value": "cmVjZWl2ZV9wdWJsaXNo", "index": true},` + + `{"key": "Y2hhbm5lbF9pZA==", "value": "Y2hhbm5lbC0w", "index": true},` + + `{"key": "bWVzc2FnZS5tZXNzYWdl","value": "MDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwNA==","index": true},` + + `{"key": "bWVzc2FnZS5zZW5kZXI=","value": "MDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMzU3NDMwNzQ5NTZjNzEwODAwZTgzMTk4MDExY2NiZDRkZGYxNTU2ZA==","index": true},` + + `{ "key": "bWVzc2FnZS5jaGFpbl9pZA==", "value": "MTg=", "index": true },` + + `{ "key": "bWVzc2FnZS5ub25jZQ==", "value": "MQ==", "index": true },` + + `{ "key": "bWVzc2FnZS5zZXF1ZW5jZQ==", "value": "Mg==", "index": true },` + + `{"key": "bWVzc2FnZS5ibG9ja190aW1l","value": "MTY4MDA5OTgxNA==","index": true},` + + `{"key": "bWVzc2FnZS5ibG9ja19oZWlnaHQ=","value": "MjYxMw==","index": true}` + + `]}` + + require.Equal(t, true, gjson.Valid(eventJson)) + event := gjson.Parse(eventJson) + + contractAddress := "wormhole1nc5tatafv6eyq7llkr2gv50ff9e22mnf70qgjlv737ktmt4eswrq0kdhcj" + + txHash, err := vaa.StringToHash("82ea2536c5d1671830cb49120f94479e34b54596a8dd369fbc2666667a765f4b") + require.NoError(t, err) + + evt, err := parseIbcReceivePublishEvent(logger, contractAddress, event, txHash) + require.NoError(t, err) + require.NotNil(t, evt) + + expectedSender, err := vaa.StringToAddress("00000000000000000000000035743074956c710800e83198011ccbd4ddf1556d") + require.NoError(t, err) + + expectedPayload, err := hex.DecodeString("0000000000000000000000000000000000000000000000000000000000000004") + require.NoError(t, err) + + expectedResult := ibcReceivePublishEvent{ + ChannelID: "channel-0", + Msg: &common.MessagePublication{ + TxHash: txHash, + EmitterAddress: expectedSender, + EmitterChain: vaa.ChainIDTerra2, + Nonce: 1, + Sequence: 2, + Timestamp: time.Unix(1680099814, 0), + Payload: expectedPayload, + }, + } + // Use DeepEqual() because the response contains pointers. + assert.True(t, reflect.DeepEqual(expectedResult, *evt)) +} + +func TestParseEventForWrongContract(t *testing.T) { + logger := zap.NewNop() + + eventJson := `{"type": "wasm","attributes": [` + + `{"key": "X2NvbnRyYWN0X2FkZHJlc3M=","value": "d29ybWhvbGUxbmM1dGF0YWZ2NmV5cTdsbGtyMmd2NTBmZjllMjJtbmY3MHFnamx2NzM3a3RtdDRlc3dycTBrZGhjag==","index": true},` + + `{"key": "YWN0aW9u", "value": "cmVjZWl2ZV9wdWJsaXNo", "index": true},` + + `{"key": "Y2hhbm5lbF9pZA==", "value": "Y2hhbm5lbC0w", "index": true},` + + `{"key": "bWVzc2FnZS5tZXNzYWdl","value": "MDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwNA==","index": true},` + + `{"key": "bWVzc2FnZS5zZW5kZXI=","value": "MDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMzU3NDMwNzQ5NTZjNzEwODAwZTgzMTk4MDExY2NiZDRkZGYxNTU2ZA==","index": true},` + + `{ "key": "bWVzc2FnZS5jaGFpbl9pZA==", "value": "MTg=", "index": true },` + + `{ "key": "bWVzc2FnZS5ub25jZQ==", "value": "MQ==", "index": true },` + + `{ "key": "bWVzc2FnZS5zZXF1ZW5jZQ==", "value": "Mg==", "index": true },` + + `{"key": "bWVzc2FnZS5ibG9ja190aW1l","value": "MTY4MDA5OTgxNA==","index": true},` + + `{"key": "bWVzc2FnZS5ibG9ja19oZWlnaHQ=","value": "MjYxMw==","index": true}` + + `]}` + + require.Equal(t, true, gjson.Valid(eventJson)) + event := gjson.Parse(eventJson) + + contractAddress := "someOtherContract" + + txHash, err := vaa.StringToHash("82ea2536c5d1671830cb49120f94479e34b54596a8dd369fbc2666667a765f4b") + require.NoError(t, err) + + _, err = parseIbcReceivePublishEvent(logger, contractAddress, event, txHash) + assert.Error(t, err) +} + +func TestParseEventForWrongAction(t *testing.T) { + logger := zap.NewNop() + + eventJson := `{"type": "wasm","attributes": [` + + `{"key": "X2NvbnRyYWN0X2FkZHJlc3M=","value": "d29ybWhvbGUxbmM1dGF0YWZ2NmV5cTdsbGtyMmd2NTBmZjllMjJtbmY3MHFnamx2NzM3a3RtdDRlc3dycTBrZGhjag==","index": true},` + + `{"key": "YWN0aW9u", "value": "cmVjZWl2ZV9wa3Q=", "index": true},` + // Changed action value to "receive_pkt" + `{"key": "Y2hhbm5lbF9pZA==", "value": "Y2hhbm5lbC0w", "index": true},` + + `{"key": "bWVzc2FnZS5tZXNzYWdl","value": "MDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwNA==","index": true},` + + `{"key": "bWVzc2FnZS5zZW5kZXI=","value": "MDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMzU3NDMwNzQ5NTZjNzEwODAwZTgzMTk4MDExY2NiZDRkZGYxNTU2ZA==","index": true},` + + `{ "key": "bWVzc2FnZS5jaGFpbl9pZA==", "value": "MTg=", "index": true },` + + `{ "key": "bWVzc2FnZS5ub25jZQ==", "value": "MQ==", "index": true },` + + `{ "key": "bWVzc2FnZS5zZXF1ZW5jZQ==", "value": "Mg==", "index": true },` + + `{"key": "bWVzc2FnZS5ibG9ja190aW1l","value": "MTY4MDA5OTgxNA==","index": true},` + + `{"key": "bWVzc2FnZS5ibG9ja19oZWlnaHQ=","value": "MjYxMw==","index": true}` + + `]}` + + require.Equal(t, true, gjson.Valid(eventJson)) + event := gjson.Parse(eventJson) + + contractAddress := "wormhole1nc5tatafv6eyq7llkr2gv50ff9e22mnf70qgjlv737ktmt4eswrq0kdhcj" + + txHash, err := vaa.StringToHash("82ea2536c5d1671830cb49120f94479e34b54596a8dd369fbc2666667a765f4b") + require.NoError(t, err) + + evt, err := parseIbcReceivePublishEvent(logger, contractAddress, event, txHash) + require.NoError(t, err) + assert.Nil(t, evt) +} + +func TestParseEventForNoContractSpecified(t *testing.T) { + logger := zap.NewNop() + + eventJson := `{"type": "wasm","attributes": [` + + // No contract specified + `{"key": "YWN0aW9u", "value": "cmVjZWl2ZV9wdWJsaXNo", "index": true},` + + `{"key": "Y2hhbm5lbF9pZA==", "value": "Y2hhbm5lbC0w", "index": true},` + + `{"key": "bWVzc2FnZS5tZXNzYWdl","value": "MDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwNA==","index": true},` + + `{"key": "bWVzc2FnZS5zZW5kZXI=","value": "MDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMzU3NDMwNzQ5NTZjNzEwODAwZTgzMTk4MDExY2NiZDRkZGYxNTU2ZA==","index": true},` + + `{ "key": "bWVzc2FnZS5jaGFpbl9pZA==", "value": "MTg=", "index": true },` + + `{ "key": "bWVzc2FnZS5ub25jZQ==", "value": "MQ==", "index": true },` + + `{ "key": "bWVzc2FnZS5zZXF1ZW5jZQ==", "value": "Mg==", "index": true },` + + `{"key": "bWVzc2FnZS5ibG9ja190aW1l","value": "MTY4MDA5OTgxNA==","index": true},` + + `{"key": "bWVzc2FnZS5ibG9ja19oZWlnaHQ=","value": "MjYxMw==","index": true}` + + `]}` + + require.Equal(t, true, gjson.Valid(eventJson)) + event := gjson.Parse(eventJson) + + contractAddress := "wormhole1nc5tatafv6eyq7llkr2gv50ff9e22mnf70qgjlv737ktmt4eswrq0kdhcj" + + txHash, err := vaa.StringToHash("82ea2536c5d1671830cb49120f94479e34b54596a8dd369fbc2666667a765f4b") + require.NoError(t, err) + + _, err = parseIbcReceivePublishEvent(logger, contractAddress, event, txHash) + assert.Error(t, err) +} + +func TestParseEventForNoActionSpecified(t *testing.T) { + logger := zap.NewNop() + + eventJson := `{"type": "wasm","attributes": [` + + `{"key": "X2NvbnRyYWN0X2FkZHJlc3M=","value": "d29ybWhvbGUxbmM1dGF0YWZ2NmV5cTdsbGtyMmd2NTBmZjllMjJtbmY3MHFnamx2NzM3a3RtdDRlc3dycTBrZGhjag==","index": true},` + + // No action specified + `{"key": "Y2hhbm5lbF9pZA==", "value": "Y2hhbm5lbC0w", "index": true},` + + `{"key": "bWVzc2FnZS5tZXNzYWdl","value": "MDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwNA==","index": true},` + + `{"key": "bWVzc2FnZS5zZW5kZXI=","value": "MDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMzU3NDMwNzQ5NTZjNzEwODAwZTgzMTk4MDExY2NiZDRkZGYxNTU2ZA==","index": true},` + + `{ "key": "bWVzc2FnZS5jaGFpbl9pZA==", "value": "MTg=", "index": true },` + + `{ "key": "bWVzc2FnZS5ub25jZQ==", "value": "MQ==", "index": true },` + + `{ "key": "bWVzc2FnZS5zZXF1ZW5jZQ==", "value": "Mg==", "index": true },` + + `{"key": "bWVzc2FnZS5ibG9ja190aW1l","value": "MTY4MDA5OTgxNA==","index": true},` + + `{"key": "bWVzc2FnZS5ibG9ja19oZWlnaHQ=","value": "MjYxMw==","index": true}` + + `]}` + + require.Equal(t, true, gjson.Valid(eventJson)) + event := gjson.Parse(eventJson) + + contractAddress := "wormhole1nc5tatafv6eyq7llkr2gv50ff9e22mnf70qgjlv737ktmt4eswrq0kdhcj" + + txHash, err := vaa.StringToHash("82ea2536c5d1671830cb49120f94479e34b54596a8dd369fbc2666667a765f4b") + require.NoError(t, err) + + evt, err := parseIbcReceivePublishEvent(logger, contractAddress, event, txHash) + require.NoError(t, err) + assert.Nil(t, evt) +} + +func TestParseIbcAllChannelChainsQueryResults(t *testing.T) { + respJson := []byte(` + { + "data": { + "channels_chains": [ + [ + "Y2hhbm5lbC0w", + 18 + ], + [ + "Y2hhbm5lbC00Mg==", + 22 + ] + ] + } + } + `) + + var result ibcAllChannelChainsQueryResults + err := json.Unmarshal(respJson, &result) + require.NoError(t, err) + + expectedChannStr1 := base64.StdEncoding.EncodeToString([]byte("channel-0")) + expectedChannStr2 := base64.StdEncoding.EncodeToString([]byte("channel-42")) + + require.Equal(t, 2, len(result.Data.ChannelChains)) + require.Equal(t, 2, len(result.Data.ChannelChains[0])) + assert.Equal(t, expectedChannStr1, result.Data.ChannelChains[0][0].(string)) + assert.Equal(t, uint16(18), uint16(result.Data.ChannelChains[0][1].(float64))) + assert.Equal(t, expectedChannStr2, result.Data.ChannelChains[1][0].(string)) + assert.Equal(t, uint16(22), uint16(result.Data.ChannelChains[1][1].(float64))) +}