Node: ibc watcher (#2566)

* Node: IBC watcher

* Rework event parsing, add tests

* Config changes

* Config changes

* Start converting to tendermint

* More functionality

* Minor test changes

* Formatting changes

* Code review rework

* Rework event parsing

* More rework

* More rework

* Tweak some log levels

* More rework

* More rework

* Don't enable Terra2 over IBC

* Fix issue with error reporting

* More review rework

* Switch from connection IDs to channel IDs

* Remove unused errC from go routines
bruce-riley 2023-04-17 09:54:36 -05:00 committed by GitHub
parent 8837a9af50
commit 53703d8ffc
10 changed files with 1090 additions and 8 deletions


@@ -867,6 +867,8 @@ func (s *nodePrivilegedService) DumpRPCs(ctx context.Context, req *nodev1.DumpRP
rpcMap["celoRPC"] = *celoRPC
rpcMap["ethRPC"] = *ethRPC
rpcMap["fantomRPC"] = *fantomRPC
rpcMap["ibcLCD"] = *ibcLCD
rpcMap["ibcWS"] = *ibcWS
rpcMap["karuraRPC"] = *karuraRPC
rpcMap["klaytnRPC"] = *klaytnRPC
rpcMap["moonbeamRPC"] = *moonbeamRPC


@@ -20,6 +20,7 @@ import (
"github.com/certusone/wormhole/node/pkg/watchers/algorand"
"github.com/certusone/wormhole/node/pkg/watchers/aptos"
"github.com/certusone/wormhole/node/pkg/watchers/evm"
"github.com/certusone/wormhole/node/pkg/watchers/ibc"
"github.com/certusone/wormhole/node/pkg/watchers/near"
"github.com/certusone/wormhole/node/pkg/watchers/solana"
"github.com/certusone/wormhole/node/pkg/watchers/sui"
@@ -147,6 +148,10 @@ var (
wormchainKeyPath *string
wormchainKeyPassPhrase *string
ibcWS *string
ibcLCD *string
ibcContract *string
accountantContract *string
accountantWS *string
accountantCheckEnabled *bool
@@ -298,6 +303,10 @@ func init() {
wormchainKeyPath = NodeCmd.Flags().String("wormchainKeyPath", "", "path to wormhole-chain private key for signing transactions")
wormchainKeyPassPhrase = NodeCmd.Flags().String("wormchainKeyPassPhrase", "", "pass phrase used to unarmor the wormchain key file")
ibcWS = NodeCmd.Flags().String("ibcWS", "", "Websocket used to listen to the IBC receiver smart contract on wormchain")
ibcLCD = NodeCmd.Flags().String("ibcLCD", "", "Path to LCD service root for http calls")
ibcContract = NodeCmd.Flags().String("ibcContract", "", "Address of the IBC smart contract on wormchain")
accountantWS = NodeCmd.Flags().String("accountantWS", "", "Websocket used to listen to the accountant smart contract on wormchain")
accountantContract = NodeCmd.Flags().String("accountantContract", "", "Address of the accountant smart contract on wormchain")
accountantCheckEnabled = NodeCmd.Flags().Bool("accountantCheckEnabled", false, "Should accountant be enforced on transfers")
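
For example (endpoint values here are illustrative, not taken from this commit), the IBC watcher is enabled by starting guardiand with flags along the lines of `--ibcWS ws://wormchain:26657/websocket --ibcLCD http://wormchain:1317 --ibcContract wormhole1...`. If --ibcWS is set, runNode requires the other two flags as well, as shown in the validation below.
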
@@ -1114,7 +1123,10 @@ func runNode(cmd *cobra.Command, args []string) {
rootCtxCancel,
acct,
gov,
nil, nil, components)); err != nil {
nil,
nil,
components,
&ibc.Features)); err != nil {
return err
}
@@ -1310,7 +1322,6 @@ func runNode(cmd *cobra.Command, args []string) {
return err
}
}
if shouldStart(algorandIndexerRPC) {
logger.Info("Starting Algorand watcher")
common.MustRegisterReadinessSyncing(vaa.ChainIDAlgorand)
@@ -1432,6 +1443,49 @@ func runNode(cmd *cobra.Command, args []string) {
}
}
if shouldStart(ibcWS) {
if *ibcLCD == "" {
logger.Fatal("If --ibcWS is specified, then --ibcLCD must be specified")
}
if *ibcContract == "" {
logger.Fatal("If --ibcWS is specified, then --ibcContract must be specified")
}
var chainConfig ibc.ChainConfig
for _, chainID := range ibc.Chains {
// Make sure the chain ID is valid.
if _, exists := chainMsgC[chainID]; !exists {
panic("invalid IBC chain ID")
}
// Make sure this chain isn't already configured.
if _, exists := chainObsvReqC[chainID]; exists {
logger.Info("not monitoring chain with IBC because it is already registered.", zap.Stringer("chainID", chainID))
continue
}
chainObsvReqC[chainID] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
common.MustRegisterReadinessSyncing(chainID)
chainConfig = append(chainConfig, ibc.ChainConfigEntry{
ChainID: chainID,
MsgC: chainMsgC[chainID],
ObsvReqC: chainObsvReqC[chainID],
})
}
if len(chainConfig) > 0 {
logger.Info("Starting IBC watcher")
readiness.RegisterComponent(common.ReadinessIBCSyncing)
if err := supervisor.Run(ctx, "ibcwatch",
ibc.NewWatcher(*ibcWS, *ibcLCD, *ibcContract, chainConfig).Run); err != nil {
return err
}
} else {
logger.Error("Although IBC is enabled, there are no chains for it to monitor")
}
}
go handleReobservationRequests(rootCtx, clock.New(), logger, obsvReqReadC, chainObsvReqC)
if acct != nil {


@@ -551,7 +551,9 @@ func runSpy(cmd *cobra.Command, args []string) {
nil,
nil,
nil,
components)); err != nil {
components,
nil, // ibc feature string
)); err != nil {
return err
}


@@ -9,6 +9,7 @@ import (
const (
ReadinessEthSyncing readiness.Component = "ethSyncing"
ReadinessIBCSyncing readiness.Component = "IBCSyncing"
)
// MustRegisterReadinessSyncing registers the specified chain for readiness syncing. It panics if the chain ID is invalid so it should only be used during initialization.


@@ -154,6 +154,7 @@ func Run(
signedGovCfg chan *gossipv1.SignedChainGovernorConfig,
signedGovSt chan *gossipv1.SignedChainGovernorStatus,
components *Components,
ibcFeatures *string,
) func(ctx context.Context) error {
if components == nil {
components = DefaultComponents()
@@ -306,6 +307,9 @@
if acct != nil {
features = append(features, acct.FeatureString())
}
if ibcFeatures != nil && *ibcFeatures != "" {
features = append(features, *ibcFeatures)
}
heartbeat := &gossipv1.Heartbeat{
NodeName: nodeName,


@@ -180,5 +180,7 @@ func startGuardian(t *testing.T, ctx context.Context, g *G) {
g.gov,
g.signedGovCfg,
g.signedGovSt,
g.components))
g.components,
nil, // ibc feature string
))
}


@@ -29,6 +29,11 @@ import (
"nhooyr.io/websocket/wsjson"
)
// ReadLimitSize can be used to increase the read limit size on the listening connection. The default read limit size is not large enough,
// causing "failed to read: read limited at 32769 bytes" errors during testing. Increasing this limit affects an internal buffer that
// is used as part of the zero alloc/copy design.
const ReadLimitSize = 524288
type (
// Watcher is responsible for looking over a cosmwasm blockchain and reporting new transactions to the contract
Watcher struct {
@@ -150,10 +155,7 @@ func (e *Watcher) Run(ctx context.Context) error {
}
defer c.Close(websocket.StatusNormalClosure, "")
// During testing, I got a message larger then the default
// 32768. Increasing this limit effects an internal buffer that is used
// to as part of the zero alloc/copy design.
c.SetReadLimit(524288)
c.SetReadLimit(ReadLimitSize)
// Subscribe to smart contract transactions
params := [...]string{fmt.Sprintf("tm.event='Tx' AND %s='%s'", e.contractAddressFilterKey, e.contract)}


@@ -0,0 +1,100 @@
package ibc
import (
"encoding/base64"
"fmt"
"strconv"
"github.com/tidwall/gjson"
"go.uber.org/zap"
)
// WasmAttributes is an object to facilitate parsing of wasm event attributes. It provides a method to parse the attribute array in a wasm event,
// plus methods to return attributes as the appropriate type, including doing range checking.
type WasmAttributes struct {
m map[string]string
}
// GetAsString returns the attribute value as a string.
func (wa *WasmAttributes) GetAsString(key string) (string, error) {
value, exists := wa.m[key]
if !exists {
return "", fmt.Errorf("attribute %s does not exist", key)
}
return value, nil
}
// GetAsUint returns the attribute value as an unsigned int. It also performs range checking.
func (wa *WasmAttributes) GetAsUint(key string, bitSize int) (uint64, error) {
valueStr, exists := wa.m[key]
if !exists {
return 0, fmt.Errorf("attribute %s does not exist", key)
}
value, err := strconv.ParseUint(valueStr, 10, bitSize)
if err != nil {
return 0, fmt.Errorf("failed parse attribute %s with value %s as %d bit uint: %w", key, valueStr, bitSize, err)
}
return value, nil
}
// GetAsInt returns the attribute value as a signed int. It also performs range checking.
func (wa *WasmAttributes) GetAsInt(key string, bitSize int) (int64, error) {
valueStr, exists := wa.m[key]
if !exists {
return 0, fmt.Errorf("attribute %s does not exist", key)
}
value, err := strconv.ParseInt(valueStr, 10, bitSize)
if err != nil {
return 0, fmt.Errorf("failed parse attribute %s with value %s as %d bit int: %w", key, valueStr, bitSize, err)
}
return value, nil
}
// Parse parses the attributes in a wasm event.
func (wa *WasmAttributes) Parse(logger *zap.Logger, event gjson.Result) error {
wa.m = make(map[string]string)
attributes := gjson.Get(event.String(), "attributes")
if !attributes.Exists() {
return fmt.Errorf("event does not contain any attributes")
}
for _, attribute := range attributes.Array() {
if !attribute.IsObject() {
return fmt.Errorf("event attribute is invalid: %s", attribute.String())
}
keyBase := gjson.Get(attribute.String(), "key")
if !keyBase.Exists() {
return fmt.Errorf("event attribute does not have a key: %s", attribute.String())
}
valueBase := gjson.Get(attribute.String(), "value")
if !valueBase.Exists() {
return fmt.Errorf("event attribute does not have a value: %s", attribute.String())
}
keyRaw, err := base64.StdEncoding.DecodeString(keyBase.String())
if err != nil {
return fmt.Errorf("event attribute key is invalid base64: %s", attribute.String())
}
valueRaw, err := base64.StdEncoding.DecodeString(valueBase.String())
if err != nil {
return fmt.Errorf("event attribute value is invalid base64: %s", attribute.String())
}
key := string(keyRaw)
value := string(valueRaw)
if _, ok := wa.m[key]; ok {
return fmt.Errorf("duplicate key in event: %s", key)
}
logger.Debug("event attribute", zap.String("key", key), zap.String("value", value))
wa.m[key] = value
}
return nil
}
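
A minimal usage sketch of the attribute parser above (hypothetical helper, not part of this commit; it assumes it lives in the same ibc package and that the event carries the attribute keys emitted by the IBC receiver contract):

func exampleReadAttributes(logger *zap.Logger, event gjson.Result) error {
	var wa WasmAttributes
	if err := wa.Parse(logger, event); err != nil {
		return fmt.Errorf("failed to parse attributes: %w", err)
	}
	// These keys mirror the attributes published by the receive_publish action.
	channelID, err := wa.GetAsString("channel_id")
	if err != nil {
		return err
	}
	chainID, err := wa.GetAsUint("message.chain_id", 16)
	if err != nil {
		return err
	}
	logger.Info("parsed wasm attributes", zap.String("channelID", channelID), zap.Uint64("chainID", chainID))
	return nil
}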


@@ -0,0 +1,700 @@
package ibc
import (
"context"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"sync"
"time"
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/p2p"
"github.com/certusone/wormhole/node/pkg/readiness"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/certusone/wormhole/node/pkg/watchers/cosmwasm"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/tidwall/gjson"
"nhooyr.io/websocket"
"nhooyr.io/websocket/wsjson"
ethCommon "github.com/ethereum/go-ethereum/common"
"go.uber.org/zap"
)
type (
// ChainConfig is the list of chains to be monitored over IBC, along with the channels used to publish messages and receive observation requests for each chain.
ChainConfig []ChainConfigEntry
// ChainConfigEntry defines the entry for a chain being monitored by IBC.
ChainConfigEntry struct {
ChainID vaa.ChainID
MsgC chan<- *common.MessagePublication
ObsvReqC <-chan *gossipv1.ObservationRequest
}
)
var (
// Chains defines the list of chains to be monitored by IBC. Add new chains here as necessary.
Chains = []vaa.ChainID{}
// Features is the feature string to be published in the gossip heartbeat messages. It will include all chains that are actually enabled on IBC.
Features = ""
ibcErrors = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "ibc_errors_by_reason",
Help: "Total number of errors on IBC",
}, []string{"reason"})
messagesConfirmed = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "wormhole_ibc_messages_confirmed_total",
Help: "Total number of verified messages found on an IBC connected chain",
}, []string{"chain_name"})
currentSlotHeight = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "wormhole_ibc_current_height",
Help: "Current slot height on an IBC connected chain (the block height on wormchain)",
}, []string{"chain_name"})
invalidChainIdMismatches = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "wormhole_ibc_chain_id_mismatches",
Help: "Total number of cases where the wormhole chain ID does not match the IBC connection ID",
}, []string{"ibc_channel_id"})
)
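
A hypothetical example of what enabling a chain would look like (not part of this commit, which deliberately leaves the list empty and does not enable Terra2 over IBC):

	Chains = []vaa.ChainID{
		vaa.ChainIDTerra2, // illustrative only
	}

With such an entry, node.go would build a chain config for it and NewWatcher below would advertise a feature string like "ibc:terra2" in gossip heartbeats.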
type (
// Watcher is responsible for monitoring the IBC contract on wormchain and publishing wormhole messages for all chains connected via IBC.
Watcher struct {
wsUrl string
lcdUrl string
contractAddress string
logger *zap.Logger
// chainMap defines the data associated with all connected / enabled chains.
chainMap map[vaa.ChainID]*chainEntry
// channelIdToChainIdMap provides a mapping from IBC channel ID to chain ID. Note that there can be multiple channel IDs for the same chain.
channelIdToChainIdMap map[string]vaa.ChainID
// channelIdToChainIdLock protects channelIdToChainIdMap.
channelIdToChainIdLock sync.Mutex
}
// chainEntry defines the data associated with a chain.
chainEntry struct {
chainID vaa.ChainID
chainName string
readiness readiness.Component
msgC chan<- *common.MessagePublication
obsvReqC <-chan *gossipv1.ObservationRequest
}
)
// NewWatcher creates a new IBC contract watcher
func NewWatcher(
wsUrl string,
lcdUrl string,
contractAddress string,
chainConfig ChainConfig,
) *Watcher {
features := ""
chainMap := make(map[vaa.ChainID]*chainEntry)
for _, chainToMonitor := range chainConfig {
_, exists := chainMap[chainToMonitor.ChainID]
if exists {
panic(fmt.Sprintf("detected duplicate chainID: %v", chainToMonitor.ChainID))
}
ce := &chainEntry{
chainID: chainToMonitor.ChainID,
chainName: chainToMonitor.ChainID.String(),
readiness: common.MustConvertChainIdToReadinessSyncing(chainToMonitor.ChainID),
msgC: chainToMonitor.MsgC,
obsvReqC: chainToMonitor.ObsvReqC,
}
chainMap[ce.chainID] = ce
if features == "" {
features = "ibc:"
} else {
features += "|"
}
features += ce.chainID.String()
}
Features = features
return &Watcher{
wsUrl: wsUrl,
lcdUrl: lcdUrl,
contractAddress: contractAddress,
chainMap: chainMap,
channelIdToChainIdMap: make(map[string]vaa.ChainID),
}
}
// clientRequest is used to subscribe for events from the contract.
type clientRequest struct {
JSONRPC string `json:"jsonrpc"`
// A String containing the name of the method to be invoked.
Method string `json:"method"`
// Object to pass as request parameter to the method.
Params [1]string `json:"params"`
// The request id. This can be of any type. It is used to match the
// response with the request that it is replying to.
ID uint64 `json:"id"`
}
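
For reference, the request that wsjson.Write serializes from this struct in Run below looks roughly like this (contract address abbreviated):

	{"jsonrpc":"2.0","method":"subscribe","params":["tm.event='Tx' AND wasm._contract_address='wormhole1...'"],"id":1}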
// ibcReceivePublishEvent represents the log message received from the IBC receiver contract.
type ibcReceivePublishEvent struct {
ChannelID string
Msg *common.MessagePublication
}
// Run is the runnable for monitoring the IBC contract on wormchain.
func (w *Watcher) Run(ctx context.Context) error {
w.logger = supervisor.Logger(ctx)
errC := make(chan error)
w.logger.Info("creating watcher",
zap.String("wsUrl", w.wsUrl),
zap.String("lcdUrl", w.lcdUrl),
zap.String("contract", w.contractAddress),
zap.String("features", Features),
)
for _, ce := range w.chainMap {
w.logger.Info("will monitor chain over IBC", zap.String("chain", ce.chainName))
p2p.DefaultRegistry.SetNetworkStats(ce.chainID, &gossipv1.Heartbeat_Network{ContractAddress: w.contractAddress})
}
c, _, err := websocket.Dial(ctx, w.wsUrl, nil)
if err != nil {
ibcErrors.WithLabelValues("websocket_dial_error").Inc()
return fmt.Errorf("failed to establish tendermint websocket connection: %w", err)
}
defer c.Close(websocket.StatusNormalClosure, "")
c.SetReadLimit(cosmwasm.ReadLimitSize)
// Subscribe to smart contract transactions.
params := [...]string{fmt.Sprintf("tm.event='Tx' AND wasm._contract_address='%s'", w.contractAddress)}
command := &clientRequest{
JSONRPC: "2.0",
Method: "subscribe",
Params: params,
ID: 1,
}
err = wsjson.Write(ctx, c, command)
if err != nil {
ibcErrors.WithLabelValues("websocket_subscription_error").Inc()
return fmt.Errorf("failed to subscribe to events: %w", err)
}
// Wait for the success response.
_, subResp, err := c.Read(ctx)
if err != nil {
ibcErrors.WithLabelValues("websocket_subscription_error").Inc()
return fmt.Errorf("failed to receive response to subscribe request: %w", err)
}
if strings.Contains(string(subResp), "error") {
ibcErrors.WithLabelValues("websocket_subscription_error").Inc()
return fmt.Errorf("failed to subscribe to events, response: %s", string(subResp))
}
// Start a routine to listen for messages from the contract.
common.RunWithScissors(ctx, errC, "ibc_data_pump", func(ctx context.Context) error {
return w.handleEvents(ctx, c)
})
// Start a routine to periodically query the wormchain block height.
common.RunWithScissors(ctx, errC, "ibc_block_height", func(ctx context.Context) error {
return w.handleQueryBlockHeight(ctx, c)
})
// Start a routine for each chain to listen for observation requests.
for _, ce := range w.chainMap {
common.RunWithScissors(ctx, errC, "ibc_objs_req", func(ctx context.Context) error {
return w.handleObservationRequests(ctx, ce)
})
}
// Signal to the supervisor that this runnable has finished initialization.
supervisor.Signal(ctx, supervisor.SignalHealthy)
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errC:
return err
}
}
// handleEvents reads messages from the IBC receiver contract and processes them.
func (w *Watcher) handleEvents(ctx context.Context, c *websocket.Conn) error {
for {
select {
case <-ctx.Done():
return nil
default:
_, message, err := c.Read(ctx)
if err != nil {
w.logger.Error("failed to read socket", zap.Error(err))
ibcErrors.WithLabelValues("channel_read_error").Inc()
return fmt.Errorf("failed to read socket: %w", err)
}
// Received a message from the blockchain.
json := string(message)
txHashRaw := gjson.Get(json, "result.events.tx\\.hash.0")
if !txHashRaw.Exists() {
w.logger.Warn("message does not have tx hash", zap.String("payload", json))
continue
}
txHash, err := vaa.StringToHash(txHashRaw.String())
if err != nil {
w.logger.Warn("failed to parse txHash", zap.String("txHash", txHashRaw.String()), zap.Error(err))
continue
}
events := gjson.Get(json, "result.data.value.TxResult.result.events")
if !events.Exists() {
w.logger.Warn("message has no events", zap.String("payload", json))
continue
}
for _, event := range events.Array() {
if !event.IsObject() {
w.logger.Warn("event is invalid", zap.Stringer("tx_hash", txHash), zap.String("event", event.String()))
continue
}
eventType := gjson.Get(event.String(), "type").String()
if eventType == "wasm" {
evt, err := parseIbcReceivePublishEvent(w.logger, w.contractAddress, event, txHash)
if err != nil {
w.logger.Error("failed to parse wasm event", zap.Error(err), zap.String("event", event.String()))
continue
}
if evt != nil {
if err := w.processIbcReceivePublishEvent(txHash, evt, "new"); err != nil {
return fmt.Errorf("failed to process new IBC event: %w", err)
}
}
} else {
w.logger.Debug("ignoring uninteresting event", zap.String("eventType", eventType))
}
}
}
}
}
// handleQueryBlockHeight periodically gets the latest block height from wormchain and updates the status of all the connected chains.
func (w *Watcher) handleQueryBlockHeight(ctx context.Context, c *websocket.Conn) error {
const latestBlockURL = "blocks/latest"
t := time.NewTicker(5 * time.Second)
client := &http.Client{
Timeout: time.Second * 5,
}
for {
select {
case <-ctx.Done():
return nil
case <-t.C:
resp, err := client.Get(fmt.Sprintf("%s/%s", w.lcdUrl, latestBlockURL))
if err != nil {
return fmt.Errorf("failed to query latest block: %w", err)
}
blocksBody, err := io.ReadAll(resp.Body)
if err != nil {
resp.Body.Close()
return fmt.Errorf("failed to read latest block body: %w", err)
}
resp.Body.Close()
blockJSON := string(blocksBody)
latestBlockAsInt := gjson.Get(blockJSON, "block.header.height").Int()
latestBlockAsFloat := float64(latestBlockAsInt)
w.logger.Debug("current block height", zap.Int64("height", latestBlockAsInt))
for _, ce := range w.chainMap {
currentSlotHeight.WithLabelValues(ce.chainName).Set(latestBlockAsFloat)
p2p.DefaultRegistry.SetNetworkStats(ce.chainID, &gossipv1.Heartbeat_Network{
Height: latestBlockAsInt,
ContractAddress: w.contractAddress,
})
readiness.SetReady(ce.readiness)
}
readiness.SetReady(common.ReadinessIBCSyncing)
}
}
}
// handleObservationRequests listens for observation requests for a single chain and processes them by reading the requested transaction
// from wormchain and publishing the associated message. This function is instantiated for each connected chain.
func (w *Watcher) handleObservationRequests(ctx context.Context, ce *chainEntry) error {
for {
select {
case <-ctx.Done():
return nil
case r := <-ce.obsvReqC:
if vaa.ChainID(r.ChainId) != ce.chainID {
panic("invalid chain ID")
}
reqTxHashStr := hex.EncodeToString(r.TxHash)
w.logger.Info("received observation request", zap.String("chain", ce.chainName), zap.String("txHash", reqTxHashStr))
client := &http.Client{
Timeout: time.Second * 5,
}
// Query for tx by hash.
resp, err := client.Get(fmt.Sprintf("%s/cosmos/tx/v1beta1/txs/%s", w.lcdUrl, reqTxHashStr))
if err != nil {
w.logger.Error("query tx response error", zap.String("chain", ce.chainName), zap.Error(err))
continue
}
txBody, err := io.ReadAll(resp.Body)
if err != nil {
w.logger.Error("query tx response read error", zap.String("chain", ce.chainName), zap.Error(err))
resp.Body.Close()
continue
}
resp.Body.Close()
txJSON := string(txBody)
txHashRaw := gjson.Get(txJSON, "tx_response.txhash")
if !txHashRaw.Exists() {
w.logger.Error("tx does not have tx hash", zap.String("chain", ce.chainName), zap.String("payload", txJSON))
continue
}
txHashStr := txHashRaw.String()
txHash, err := vaa.StringToHash(txHashStr)
if err != nil {
w.logger.Error("tx does not have tx hash", zap.String("chain", ce.chainName), zap.String("payload", txJSON))
continue
}
events := gjson.Get(txJSON, "tx_response.events")
if !events.Exists() {
w.logger.Error("tx has no events", zap.String("chain", ce.chainName), zap.String("payload", txJSON))
continue
}
for _, event := range events.Array() {
if !event.IsObject() {
w.logger.Error("event is invalid", zap.String("chain", ce.chainName), zap.String("tx_hash", txHashStr), zap.String("event", event.String()))
continue
}
eventType := gjson.Get(event.String(), "type")
if eventType.String() == "wasm" {
w.logger.Debug("found wasm event in reobservation", zap.String("chain", ce.chainName), zap.Stringer("txHash", txHash))
evt, err := parseIbcReceivePublishEvent(w.logger, w.contractAddress, event, txHash)
if err != nil {
w.logger.Error("failed to parse wasm event", zap.String("chain", ce.chainName), zap.Error(err), zap.Any("event", event))
continue
}
if evt != nil {
if err := w.processIbcReceivePublishEvent(txHash, evt, "reobservation"); err != nil {
return fmt.Errorf("failed to process reobserved IBC event: %w", err)
}
}
} else {
w.logger.Debug("ignoring uninteresting event in reobservation", zap.String("chain", ce.chainName), zap.Stringer("txHash", txHash), zap.String("eventType", eventType.String()))
}
}
}
}
}
// parseIbcReceivePublishEvent parses a wasm event into an object. Since the watcher only subscribes to events from a single contract, this function returns an error
// if the contract does not match the desired one. However, since the contract publishes multiple action types, this function returns nil rather than an error
// if the event is not for the desired action.
func parseIbcReceivePublishEvent(logger *zap.Logger, desiredContract string, event gjson.Result, txHash ethCommon.Hash) (*ibcReceivePublishEvent, error) {
var attributes WasmAttributes
err := attributes.Parse(logger, event)
if err != nil {
logger.Error("failed to parse event attributes", zap.Error(err), zap.String("event", event.String()))
return nil, fmt.Errorf("failed to parse attributes: %w", err)
}
str, err := attributes.GetAsString("_contract_address")
if err != nil {
return nil, err
}
if str != desiredContract {
return nil, fmt.Errorf("received an event from an unexpected contract: %s", str)
}
str, err = attributes.GetAsString("action")
if err != nil || str != "receive_publish" {
return nil, nil
}
evt := new(ibcReceivePublishEvent)
evt.Msg = new(common.MessagePublication)
evt.Msg.TxHash = txHash
evt.ChannelID, err = attributes.GetAsString("channel_id")
if err != nil {
return evt, err
}
unumber, err := attributes.GetAsUint("message.chain_id", 16)
if err != nil {
return evt, err
}
evt.Msg.EmitterChain = vaa.ChainID(unumber)
str, err = attributes.GetAsString("message.sender")
if err != nil {
return evt, err
}
evt.Msg.EmitterAddress, err = vaa.StringToAddress(str)
if err != nil {
return evt, fmt.Errorf("failed to parse message.sender attribute %s: %w", str, err)
}
unumber, err = attributes.GetAsUint("message.nonce", 32)
if err != nil {
return evt, err
}
evt.Msg.Nonce = uint32(unumber)
unumber, err = attributes.GetAsUint("message.sequence", 64)
if err != nil {
return evt, err
}
evt.Msg.Sequence = unumber
snumber, err := attributes.GetAsInt("message.block_time", 64)
if err != nil {
return evt, err
}
evt.Msg.Timestamp = time.Unix(snumber, 0)
str, err = attributes.GetAsString("message.message")
if err != nil {
return evt, err
}
evt.Msg.Payload, err = hex.DecodeString(str)
if err != nil {
return evt, fmt.Errorf("failed to parse message.message attribute %s: %w", str, err)
}
return evt, nil
}
// processIbcReceivePublishEvent takes an IBC event, maps it to a message publication and publishes it.
func (w *Watcher) processIbcReceivePublishEvent(txHash ethCommon.Hash, evt *ibcReceivePublishEvent, observationType string) error {
mappedChainID, err := w.getChainIdFromChannelID(evt.ChannelID)
if err != nil {
w.logger.Error("query for IBC channel ID failed",
zap.String("IbcChannelID", evt.ChannelID),
zap.Stringer("TxHash", evt.Msg.TxHash),
zap.Stringer("EmitterChain", evt.Msg.EmitterChain),
zap.Stringer("EmitterAddress", evt.Msg.EmitterAddress),
zap.Uint64("Sequence", evt.Msg.Sequence),
zap.Uint32("Nonce", evt.Msg.Nonce),
zap.Stringer("Timestamp", evt.Msg.Timestamp),
zap.Uint8("ConsistencyLevel", evt.Msg.ConsistencyLevel),
zap.Error(err),
)
ibcErrors.WithLabelValues("query_error").Inc()
return fmt.Errorf("failed to query IBC channel ID mapping: %w", err)
}
if mappedChainID == vaa.ChainIDUnset {
// This can happen if the channel ID to chain ID mapping in the contract hasn't been updated yet (pending governance).
// Therefore we don't want to return an error here. Restarting won't help.
w.logger.Error(fmt.Sprintf("received %s message from unknown IBC channel, dropping observation", observationType),
zap.String("IbcChannelID", evt.ChannelID),
zap.Stringer("TxHash", evt.Msg.TxHash),
zap.Stringer("EmitterChain", evt.Msg.EmitterChain),
zap.Stringer("EmitterAddress", evt.Msg.EmitterAddress),
zap.Uint64("Sequence", evt.Msg.Sequence),
zap.Uint32("Nonce", evt.Msg.Nonce),
zap.Stringer("Timestamp", evt.Msg.Timestamp),
zap.Uint8("ConsistencyLevel", evt.Msg.ConsistencyLevel),
)
ibcErrors.WithLabelValues("unexpected_ibc_channel_error").Inc()
return nil
}
ce, exists := w.chainMap[mappedChainID]
if !exists {
// This is not an error because some guardians may choose to run the full node and not listen to this chain over IBC.
w.logger.Debug(fmt.Sprintf("received %s message from an unconfigured chain, dropping observation", observationType),
zap.String("IbcChannelID", evt.ChannelID),
zap.Stringer("ChainID", mappedChainID),
zap.Stringer("TxHash", evt.Msg.TxHash),
zap.Stringer("EmitterChain", evt.Msg.EmitterChain),
zap.Stringer("EmitterAddress", evt.Msg.EmitterAddress),
zap.Uint64("Sequence", evt.Msg.Sequence),
zap.Uint32("Nonce", evt.Msg.Nonce),
zap.Stringer("Timestamp", evt.Msg.Timestamp),
zap.Uint8("ConsistencyLevel", evt.Msg.ConsistencyLevel),
)
return nil
}
if evt.Msg.EmitterChain != ce.chainID {
w.logger.Error(fmt.Sprintf("chain id mismatch in %s message", observationType),
zap.String("IbcChannelID", evt.ChannelID),
zap.Uint16("MappedChainID", uint16(mappedChainID)),
zap.Uint16("ExpectedChainID", uint16(ce.chainID)),
zap.Stringer("TxHash", evt.Msg.TxHash),
zap.Stringer("EmitterChain", evt.Msg.EmitterChain),
zap.Stringer("EmitterAddress", evt.Msg.EmitterAddress),
zap.Uint64("Sequence", evt.Msg.Sequence),
zap.Uint32("Nonce", evt.Msg.Nonce),
zap.Stringer("Timestamp", evt.Msg.Timestamp),
zap.Uint8("ConsistencyLevel", evt.Msg.ConsistencyLevel),
)
invalidChainIdMismatches.WithLabelValues(evt.ChannelID).Inc()
return nil // Don't return an error here because we don't want an external source to be able to kill the watcher.
}
w.logger.Info(fmt.Sprintf("%s message detected", observationType),
zap.String("IbcChannelID", evt.ChannelID),
zap.String("ChainName", ce.chainName),
zap.Stringer("TxHash", evt.Msg.TxHash),
zap.Stringer("EmitterChain", evt.Msg.EmitterChain),
zap.Stringer("EmitterAddress", evt.Msg.EmitterAddress),
zap.Uint64("Sequence", evt.Msg.Sequence),
zap.Uint32("Nonce", evt.Msg.Nonce),
zap.Stringer("Timestamp", evt.Msg.Timestamp),
zap.Uint8("ConsistencyLevel", evt.Msg.ConsistencyLevel),
)
ce.msgC <- evt.Msg
messagesConfirmed.WithLabelValues(ce.chainName).Inc()
return nil
}
// getChainIdFromChannelID returns the chain ID associated with the specified IBC channel. It uses a cache to avoid constantly querying
// wormchain. This works because once an IBC channel is closed its ID will never be reused. This also means that there could be multiple
// IBC channels for the same chain ID.
// See the IBC spec for details: https://github.com/cosmos/ibc/tree/main/spec/core/ics-004-channel-and-packet-semantics#closing-handshake
func (w *Watcher) getChainIdFromChannelID(channelID string) (vaa.ChainID, error) {
w.channelIdToChainIdLock.Lock()
defer w.channelIdToChainIdLock.Unlock()
chainID, exists := w.channelIdToChainIdMap[channelID]
if exists {
return chainID, nil
}
// We continue to hold the lock here because we don't want two routines (event handler and reobservation) both querying at the same time.
channelIdToChainIdMap, err := w.queryChannelIdToChainIdMapping()
if err != nil {
w.logger.Error("failed to query channelID to chainID mapping", zap.Error(err))
return vaa.ChainIDUnset, err
}
w.channelIdToChainIdMap = channelIdToChainIdMap
chainID, exists = w.channelIdToChainIdMap[channelID]
if exists {
return chainID, nil
}
return vaa.ChainIDUnset, nil
}
/*
This query:
`"all_channel_chains"` base64-encodes to `ImFsbF9jaGFubmVsX2NoYWlucyI=`
so the request URL becomes:
http://localhost:1319/cosmwasm/wasm/v1/contract/wormhole1nc5tatafv6eyq7llkr2gv50ff9e22mnf70qgjlv737ktmt4eswrq0kdhcj/smart/ImFsbF9jaGFubmVsX2NoYWlucyI%3D
Returns something like this:
{
"data": {
"channels_chains": [
[
"Y2hhbm5lbC0w",
18
]
]
}
}
*/
type ibcAllChannelChainsQueryResults struct {
Data struct {
ChannelChains [][]interface{} `json:"channels_chains"`
}
}
var allChannelChainsQuery = url.QueryEscape(base64.StdEncoding.EncodeToString([]byte(`"all_channel_chains"`)))
// queryChannelIdToChainIdMapping queries the contract for the set of IBC channels and their corresponding chain IDs.
func (w *Watcher) queryChannelIdToChainIdMapping() (map[string]vaa.ChainID, error) {
client := &http.Client{
Timeout: time.Second * 5,
}
query := fmt.Sprintf(`%s/cosmwasm/wasm/v1/contract/%s/smart/%s`, w.lcdUrl, w.contractAddress, allChannelChainsQuery)
resp, err := client.Get(query)
if err != nil {
return nil, fmt.Errorf("query failed: %w", err)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("read failed: %w", err)
}
resp.Body.Close()
var result ibcAllChannelChainsQueryResults
err = json.Unmarshal(body, &result)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal response: %s, error: %w", string(body), err)
}
if len(result.Data.ChannelChains) == 0 {
return nil, fmt.Errorf("query did not return any data")
}
w.logger.Info("queried IBC channel ID mapping", zap.Int("numEntriesReturned", len(result.Data.ChannelChains)))
ret := make(map[string]vaa.ChainID)
for idx, entry := range result.Data.ChannelChains {
if len(entry) != 2 {
return nil, fmt.Errorf("channel map entry %d contains %d items when it should contain exactly two, json: %s", idx, len(entry), string(body))
}
channelIdBytes, err := base64.StdEncoding.DecodeString(entry[0].(string))
if err != nil {
return nil, fmt.Errorf("channel ID for entry %d is invalid base64: %s, err: %s", idx, entry[0], err)
}
channelID := string(channelIdBytes)
chainID := vaa.ChainID(entry[1].(float64))
ret[channelID] = chainID
w.logger.Info("IBC channel ID mapping", zap.String("channelID", channelID), zap.Uint16("chainID", uint16(chainID)))
}
return ret, nil
}


@@ -0,0 +1,215 @@
package ibc
import (
"encoding/base64"
"encoding/hex"
"encoding/json"
"reflect"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"
"go.uber.org/zap"
"github.com/certusone/wormhole/node/pkg/common"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
func TestParseIbcReceivePublishEvent(t *testing.T) {
logger := zap.NewNop()
eventJson := `{"type": "wasm","attributes": [` +
`{"key": "X2NvbnRyYWN0X2FkZHJlc3M=","value": "d29ybWhvbGUxbmM1dGF0YWZ2NmV5cTdsbGtyMmd2NTBmZjllMjJtbmY3MHFnamx2NzM3a3RtdDRlc3dycTBrZGhjag==","index": true},` +
`{"key": "YWN0aW9u", "value": "cmVjZWl2ZV9wdWJsaXNo", "index": true},` +
`{"key": "Y2hhbm5lbF9pZA==", "value": "Y2hhbm5lbC0w", "index": true},` +
`{"key": "bWVzc2FnZS5tZXNzYWdl","value": "MDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwNA==","index": true},` +
`{"key": "bWVzc2FnZS5zZW5kZXI=","value": "MDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMzU3NDMwNzQ5NTZjNzEwODAwZTgzMTk4MDExY2NiZDRkZGYxNTU2ZA==","index": true},` +
`{ "key": "bWVzc2FnZS5jaGFpbl9pZA==", "value": "MTg=", "index": true },` +
`{ "key": "bWVzc2FnZS5ub25jZQ==", "value": "MQ==", "index": true },` +
`{ "key": "bWVzc2FnZS5zZXF1ZW5jZQ==", "value": "Mg==", "index": true },` +
`{"key": "bWVzc2FnZS5ibG9ja190aW1l","value": "MTY4MDA5OTgxNA==","index": true},` +
`{"key": "bWVzc2FnZS5ibG9ja19oZWlnaHQ=","value": "MjYxMw==","index": true}` +
`]}`
require.Equal(t, true, gjson.Valid(eventJson))
event := gjson.Parse(eventJson)
contractAddress := "wormhole1nc5tatafv6eyq7llkr2gv50ff9e22mnf70qgjlv737ktmt4eswrq0kdhcj"
txHash, err := vaa.StringToHash("82ea2536c5d1671830cb49120f94479e34b54596a8dd369fbc2666667a765f4b")
require.NoError(t, err)
evt, err := parseIbcReceivePublishEvent(logger, contractAddress, event, txHash)
require.NoError(t, err)
require.NotNil(t, evt)
expectedSender, err := vaa.StringToAddress("00000000000000000000000035743074956c710800e83198011ccbd4ddf1556d")
require.NoError(t, err)
expectedPayload, err := hex.DecodeString("0000000000000000000000000000000000000000000000000000000000000004")
require.NoError(t, err)
expectedResult := ibcReceivePublishEvent{
ChannelID: "channel-0",
Msg: &common.MessagePublication{
TxHash: txHash,
EmitterAddress: expectedSender,
EmitterChain: vaa.ChainIDTerra2,
Nonce: 1,
Sequence: 2,
Timestamp: time.Unix(1680099814, 0),
Payload: expectedPayload,
},
}
// Use DeepEqual() because the response contains pointers.
assert.True(t, reflect.DeepEqual(expectedResult, *evt))
}
func TestParseEventForWrongContract(t *testing.T) {
logger := zap.NewNop()
eventJson := `{"type": "wasm","attributes": [` +
`{"key": "X2NvbnRyYWN0X2FkZHJlc3M=","value": "d29ybWhvbGUxbmM1dGF0YWZ2NmV5cTdsbGtyMmd2NTBmZjllMjJtbmY3MHFnamx2NzM3a3RtdDRlc3dycTBrZGhjag==","index": true},` +
`{"key": "YWN0aW9u", "value": "cmVjZWl2ZV9wdWJsaXNo", "index": true},` +
`{"key": "Y2hhbm5lbF9pZA==", "value": "Y2hhbm5lbC0w", "index": true},` +
`{"key": "bWVzc2FnZS5tZXNzYWdl","value": "MDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwNA==","index": true},` +
`{"key": "bWVzc2FnZS5zZW5kZXI=","value": "MDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMzU3NDMwNzQ5NTZjNzEwODAwZTgzMTk4MDExY2NiZDRkZGYxNTU2ZA==","index": true},` +
`{ "key": "bWVzc2FnZS5jaGFpbl9pZA==", "value": "MTg=", "index": true },` +
`{ "key": "bWVzc2FnZS5ub25jZQ==", "value": "MQ==", "index": true },` +
`{ "key": "bWVzc2FnZS5zZXF1ZW5jZQ==", "value": "Mg==", "index": true },` +
`{"key": "bWVzc2FnZS5ibG9ja190aW1l","value": "MTY4MDA5OTgxNA==","index": true},` +
`{"key": "bWVzc2FnZS5ibG9ja19oZWlnaHQ=","value": "MjYxMw==","index": true}` +
`]}`
require.Equal(t, true, gjson.Valid(eventJson))
event := gjson.Parse(eventJson)
contractAddress := "someOtherContract"
txHash, err := vaa.StringToHash("82ea2536c5d1671830cb49120f94479e34b54596a8dd369fbc2666667a765f4b")
require.NoError(t, err)
_, err = parseIbcReceivePublishEvent(logger, contractAddress, event, txHash)
assert.Error(t, err)
}
func TestParseEventForWrongAction(t *testing.T) {
logger := zap.NewNop()
eventJson := `{"type": "wasm","attributes": [` +
`{"key": "X2NvbnRyYWN0X2FkZHJlc3M=","value": "d29ybWhvbGUxbmM1dGF0YWZ2NmV5cTdsbGtyMmd2NTBmZjllMjJtbmY3MHFnamx2NzM3a3RtdDRlc3dycTBrZGhjag==","index": true},` +
`{"key": "YWN0aW9u", "value": "cmVjZWl2ZV9wa3Q=", "index": true},` + // Changed action value to "receive_pkt"
`{"key": "Y2hhbm5lbF9pZA==", "value": "Y2hhbm5lbC0w", "index": true},` +
`{"key": "bWVzc2FnZS5tZXNzYWdl","value": "MDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwNA==","index": true},` +
`{"key": "bWVzc2FnZS5zZW5kZXI=","value": "MDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMzU3NDMwNzQ5NTZjNzEwODAwZTgzMTk4MDExY2NiZDRkZGYxNTU2ZA==","index": true},` +
`{ "key": "bWVzc2FnZS5jaGFpbl9pZA==", "value": "MTg=", "index": true },` +
`{ "key": "bWVzc2FnZS5ub25jZQ==", "value": "MQ==", "index": true },` +
`{ "key": "bWVzc2FnZS5zZXF1ZW5jZQ==", "value": "Mg==", "index": true },` +
`{"key": "bWVzc2FnZS5ibG9ja190aW1l","value": "MTY4MDA5OTgxNA==","index": true},` +
`{"key": "bWVzc2FnZS5ibG9ja19oZWlnaHQ=","value": "MjYxMw==","index": true}` +
`]}`
require.Equal(t, true, gjson.Valid(eventJson))
event := gjson.Parse(eventJson)
contractAddress := "wormhole1nc5tatafv6eyq7llkr2gv50ff9e22mnf70qgjlv737ktmt4eswrq0kdhcj"
txHash, err := vaa.StringToHash("82ea2536c5d1671830cb49120f94479e34b54596a8dd369fbc2666667a765f4b")
require.NoError(t, err)
evt, err := parseIbcReceivePublishEvent(logger, contractAddress, event, txHash)
require.NoError(t, err)
assert.Nil(t, evt)
}
func TestParseEventForNoContractSpecified(t *testing.T) {
logger := zap.NewNop()
eventJson := `{"type": "wasm","attributes": [` +
// No contract specified
`{"key": "YWN0aW9u", "value": "cmVjZWl2ZV9wdWJsaXNo", "index": true},` +
`{"key": "Y2hhbm5lbF9pZA==", "value": "Y2hhbm5lbC0w", "index": true},` +
`{"key": "bWVzc2FnZS5tZXNzYWdl","value": "MDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwNA==","index": true},` +
`{"key": "bWVzc2FnZS5zZW5kZXI=","value": "MDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMzU3NDMwNzQ5NTZjNzEwODAwZTgzMTk4MDExY2NiZDRkZGYxNTU2ZA==","index": true},` +
`{ "key": "bWVzc2FnZS5jaGFpbl9pZA==", "value": "MTg=", "index": true },` +
`{ "key": "bWVzc2FnZS5ub25jZQ==", "value": "MQ==", "index": true },` +
`{ "key": "bWVzc2FnZS5zZXF1ZW5jZQ==", "value": "Mg==", "index": true },` +
`{"key": "bWVzc2FnZS5ibG9ja190aW1l","value": "MTY4MDA5OTgxNA==","index": true},` +
`{"key": "bWVzc2FnZS5ibG9ja19oZWlnaHQ=","value": "MjYxMw==","index": true}` +
`]}`
require.Equal(t, true, gjson.Valid(eventJson))
event := gjson.Parse(eventJson)
contractAddress := "wormhole1nc5tatafv6eyq7llkr2gv50ff9e22mnf70qgjlv737ktmt4eswrq0kdhcj"
txHash, err := vaa.StringToHash("82ea2536c5d1671830cb49120f94479e34b54596a8dd369fbc2666667a765f4b")
require.NoError(t, err)
_, err = parseIbcReceivePublishEvent(logger, contractAddress, event, txHash)
assert.Error(t, err)
}
func TestParseEventForNoActionSpecified(t *testing.T) {
logger := zap.NewNop()
eventJson := `{"type": "wasm","attributes": [` +
`{"key": "X2NvbnRyYWN0X2FkZHJlc3M=","value": "d29ybWhvbGUxbmM1dGF0YWZ2NmV5cTdsbGtyMmd2NTBmZjllMjJtbmY3MHFnamx2NzM3a3RtdDRlc3dycTBrZGhjag==","index": true},` +
// No action specified
`{"key": "Y2hhbm5lbF9pZA==", "value": "Y2hhbm5lbC0w", "index": true},` +
`{"key": "bWVzc2FnZS5tZXNzYWdl","value": "MDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwNA==","index": true},` +
`{"key": "bWVzc2FnZS5zZW5kZXI=","value": "MDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMzU3NDMwNzQ5NTZjNzEwODAwZTgzMTk4MDExY2NiZDRkZGYxNTU2ZA==","index": true},` +
`{ "key": "bWVzc2FnZS5jaGFpbl9pZA==", "value": "MTg=", "index": true },` +
`{ "key": "bWVzc2FnZS5ub25jZQ==", "value": "MQ==", "index": true },` +
`{ "key": "bWVzc2FnZS5zZXF1ZW5jZQ==", "value": "Mg==", "index": true },` +
`{"key": "bWVzc2FnZS5ibG9ja190aW1l","value": "MTY4MDA5OTgxNA==","index": true},` +
`{"key": "bWVzc2FnZS5ibG9ja19oZWlnaHQ=","value": "MjYxMw==","index": true}` +
`]}`
require.Equal(t, true, gjson.Valid(eventJson))
event := gjson.Parse(eventJson)
contractAddress := "wormhole1nc5tatafv6eyq7llkr2gv50ff9e22mnf70qgjlv737ktmt4eswrq0kdhcj"
txHash, err := vaa.StringToHash("82ea2536c5d1671830cb49120f94479e34b54596a8dd369fbc2666667a765f4b")
require.NoError(t, err)
evt, err := parseIbcReceivePublishEvent(logger, contractAddress, event, txHash)
require.NoError(t, err)
assert.Nil(t, evt)
}
func TestParseIbcAllChannelChainsQueryResults(t *testing.T) {
respJson := []byte(`
{
"data": {
"channels_chains": [
[
"Y2hhbm5lbC0w",
18
],
[
"Y2hhbm5lbC00Mg==",
22
]
]
}
}
`)
var result ibcAllChannelChainsQueryResults
err := json.Unmarshal(respJson, &result)
require.NoError(t, err)
expectedChannStr1 := base64.StdEncoding.EncodeToString([]byte("channel-0"))
expectedChannStr2 := base64.StdEncoding.EncodeToString([]byte("channel-42"))
require.Equal(t, 2, len(result.Data.ChannelChains))
require.Equal(t, 2, len(result.Data.ChannelChains[0]))
assert.Equal(t, expectedChannStr1, result.Data.ChannelChains[0][0].(string))
assert.Equal(t, uint16(18), uint16(result.Data.ChannelChains[0][1].(float64)))
assert.Equal(t, expectedChannStr2, result.Data.ChannelChains[1][0].(string))
assert.Equal(t, uint16(22), uint16(result.Data.ChannelChains[1][1].(float64)))
}