From 40a638de4c830b863577816e6de3b57f39082154 Mon Sep 17 00:00:00 2001 From: Paul Noel Date: Mon, 17 Apr 2023 17:56:52 +0000 Subject: [PATCH] node: add sui watcher for devnet and testnet only --- node/cmd/guardiand/node.go | 34 +- node/pkg/watchers/sui/watcher.go | 532 +++++++++++++++++-------------- 2 files changed, 310 insertions(+), 256 deletions(-) diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index 96389bd60..6ef81d038 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -160,10 +160,9 @@ var ( aptosAccount *string aptosHandle *string - suiRPC *string - suiWS *string - suiAccount *string - suiPackage *string + suiRPC *string + suiWS *string + suiMoveEventType *string solanaRPC *string @@ -317,8 +316,7 @@ func init() { suiRPC = NodeCmd.Flags().String("suiRPC", "", "sui RPC URL") suiWS = NodeCmd.Flags().String("suiWS", "", "sui WS URL") - suiAccount = NodeCmd.Flags().String("suiAccount", "", "sui account") - suiPackage = NodeCmd.Flags().String("suiPackage", "", "sui package") + suiMoveEventType = NodeCmd.Flags().String("suiMoveEventType", "", "sui move event type for publish_message") solanaRPC = NodeCmd.Flags().String("solanaRPC", "", "Solana RPC URL (required)") @@ -638,11 +636,8 @@ func runNode(cmd *cobra.Command, args []string) { if *suiWS == "" { logger.Fatal("If --suiRPC is specified, then --suiWS must be specified") } - if *suiAccount == "" { - logger.Fatal("If --suiRPC is specified, then --suiAccount must be specified") - } - if *suiPackage == "" && !*unsafeDevMode { - logger.Fatal("If --suiRPC is specified, then --suiPackage must be specified") + if *suiMoveEventType == "" { + logger.Fatal("If --suiRPC is specified, then --suiMoveEventType must be specified") } } @@ -1322,6 +1317,7 @@ func runNode(cmd *cobra.Command, args []string) { return err } } + if shouldStart(algorandIndexerRPC) { logger.Info("Starting Algorand watcher") common.MustRegisterReadinessSyncing(vaa.ChainIDAlgorand) @@ -1362,12 +1358,16 @@ func runNode(cmd *cobra.Command, args []string) { } if shouldStart(suiRPC) { - logger.Info("Starting Sui watcher") - common.MustRegisterReadinessSyncing(vaa.ChainIDSui) - chainObsvReqC[vaa.ChainIDSui] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) - if err := supervisor.Run(ctx, "suiwatch", - sui.NewWatcher(*suiRPC, *suiWS, *suiAccount, *suiPackage, *unsafeDevMode, chainMsgC[vaa.ChainIDSui], chainObsvReqC[vaa.ChainIDSui]).Run); err != nil { - return err + if !*unsafeDevMode && !*testnetMode { + logger.Fatal("Can only start Sui watcher in devnet or testnet") + } else { + logger.Info("Starting Sui watcher") + common.MustRegisterReadinessSyncing(vaa.ChainIDSui) + chainObsvReqC[vaa.ChainIDSui] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) + if err := supervisor.Run(ctx, "suiwatch", + sui.NewWatcher(*suiRPC, *suiWS, *suiMoveEventType, *unsafeDevMode, chainMsgC[vaa.ChainIDSui], chainObsvReqC[vaa.ChainIDSui]).Run); err != nil { + return err + } } } diff --git a/node/pkg/watchers/sui/watcher.go b/node/pkg/watchers/sui/watcher.go index 2a4c2e88b..c3c253009 100644 --- a/node/pkg/watchers/sui/watcher.go +++ b/node/pkg/watchers/sui/watcher.go @@ -3,20 +3,19 @@ package sui import ( "context" "crypto/rand" - "encoding/binary" "errors" "fmt" "io" "math/big" "net/http" "net/url" + "strconv" "strings" "time" - "encoding/base64" "encoding/json" - "github.com/gorilla/websocket" + "nhooyr.io/websocket" "github.com/certusone/wormhole/node/pkg/common" "github.com/certusone/wormhole/node/pkg/p2p" @@ -28,6 +27,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/mr-tron/base58" "github.com/wormhole-foundation/wormhole/sdk/vaa" "go.uber.org/zap" ) @@ -35,10 +35,9 @@ import ( type ( // Watcher is responsible for looking over Sui blockchain and reporting new transactions to the wormhole contract Watcher struct { - suiRPC string - suiWS string - suiAccount string - suiPackage string + suiRPC string + suiWS string + suiMoveEventType string unsafeDevMode bool @@ -46,29 +45,30 @@ type ( obsvReqC chan *gossipv1.ObservationRequest readinessSync readiness.Component - subId int64 - subscribed bool + subId int64 + } + + FieldsData struct { + ConsistencyLevel *uint8 `json:"consistency_level"` + Nonce *uint64 `json:"nonce"` + Payload []byte `json:"payload"` + Sender *string `json:"sender"` + Sequence *string `json:"sequence"` + Timestamp *string `json:"timestamp"` } SuiResult struct { - Timestamp *int64 `json:"timestamp"` - TxDigest *string `json:"txDigest"` - Event struct { - MoveEvent *struct { - PackageID *string `json:"packageId"` - TransactionModule *string `json:"transactionModule"` - Sender *string `json:"sender"` - Type *string `json:"type"` - Fields *struct { - ConsistencyLevel *uint8 `json:"consistency_level"` - Nonce *uint64 `json:"nonce"` - Payload *string `json:"payload"` - Sender *uint64 `json:"sender"` - Sequence *uint64 `json:"sequence"` - } `json:"fields"` - Bcs string `json:"bcs"` - } `json:"moveEvent"` - } `json:"event"` + ID struct { + TxDigest *string `json:"txDigest"` + EventSeq *string `json:"eventSeq"` + } `json:"id"` + PackageID *string `json:"packageId"` + TransactionModule *string `json:"transactionModule"` + Sender *string `json:"sender"` + Type *string `json:"type"` + Bcs *string `json:"bcs"` + Timestamp *string `json:"timestampMs"` + Fields *FieldsData `json:"parsedJson"` } SuiEventError struct { @@ -88,25 +88,49 @@ type ( } SuiTxnQuery struct { - Jsonrpc string `json:"jsonrpc"` - Result struct { - Data []SuiResult `json:"data"` - NextCursor interface{} `json:"nextCursor"` - } `json:"result"` - ID int `json:"id"` + Jsonrpc string `json:"jsonrpc"` + Result []SuiResult `json:"result"` + ID int `json:"id"` } + // { + // "jsonrpc": "2.0", + // "result": [ + // { + // "id": { + // "txDigest": "6Yff8smmPZMandj6Psjy6wgZv5Deii78o1Sbghh5sHPA", + // "eventSeq": "0" + // }, + // "packageId": "0x8b04a73ab3cb1e36bee5a86fdbfa481e97d3cc7ce8b594edea1400103ff0134d", + // "transactionModule": "sender", + // "sender": "0xed867315e3f7c83ae82e6d5858b6a6cc57c291fd84f7509646ebc8162169cf96", + // "type": "0x7483d0db53a140eed72bd6cb133daa59c539844f4c053924b9e3f0d2d7ba146d::publish_message::WormholeMessage", + // "parsedJson": { + // "consistency_level": 0, + // "nonce": 0, + // "payload": [104, 101, 108, 108, 111], + // "sender": "0x71c2aa2c549bb7381e88fbeca7eeb791be0afd455c8af9184613ce5db5ddba47", + // "sequence": "0", + // "timestamp": "1681411389" + // }, + // "bcs": "5ZuknLT3Xsicr2D8zyk828thPByMBfR1cPJyEHF67k16AcEotDWhrpCDCTbk6BBbpSSs3bUk3msfADzrs" + // } + // ], + // "id": 1 + // } - SuiCommitteeInfo struct { + SuiCheckpointSN struct { Jsonrpc string `json:"jsonrpc"` - Result struct { - Epoch int `json:"epoch"` - CommitteeInfo [][]interface{} `json:"committee_info"` - } `json:"result"` - ID int `json:"id"` + Result string `json:"result"` + ID int `json:"id"` } ) var ( + suiConnectionErrors = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "wormhole_sui_connection_errors_total", + Help: "Total number of SUI connection errors", + }, []string{"reason"}) suiMessagesConfirmed = promauto.NewCounter( prometheus.CounterOpts{ Name: "wormhole_sui_observations_confirmed_total", @@ -123,91 +147,89 @@ var ( func NewWatcher( suiRPC string, suiWS string, - suiAccount string, - suiPackage string, + suiMoveEventType string, unsafeDevMode bool, messageEvents chan *common.MessagePublication, obsvReqC chan *gossipv1.ObservationRequest, ) *Watcher { return &Watcher{ - suiRPC: suiRPC, - suiWS: suiWS, - suiAccount: suiAccount, - suiPackage: suiPackage, - unsafeDevMode: unsafeDevMode, - msgChan: messageEvents, - obsvReqC: obsvReqC, - readinessSync: common.MustConvertChainIdToReadinessSyncing(vaa.ChainIDSui), - subId: 0, - subscribed: false, + suiRPC: suiRPC, + suiWS: suiWS, + suiMoveEventType: suiMoveEventType, + unsafeDevMode: unsafeDevMode, + msgChan: messageEvents, + obsvReqC: obsvReqC, + readinessSync: common.MustConvertChainIdToReadinessSyncing(vaa.ChainIDSui), + subId: 0, } } func (e *Watcher) inspectBody(logger *zap.Logger, body SuiResult) error { - if (body.Timestamp == nil) || (body.TxDigest == nil) { - return errors.New("Missing event fields") + if body.ID.TxDigest == nil { + return errors.New("Missing TxDigest field") } - if body.Event.MoveEvent == nil { - return nil - } - moveEvent := *body.Event.MoveEvent - if (moveEvent.PackageID == nil) || (moveEvent.Sender == nil) { - return errors.New("Missing event fields") + if body.Type == nil { + return errors.New("Missing Type field") } - if moveEvent.Fields == nil { + // There may be moveEvents caught without these params. + // So, not necessarily an error. + if body.Fields == nil { return nil } - fields := *moveEvent.Fields + + if e.suiMoveEventType != *body.Type { + logger.Info("type mismatch", zap.String("e.suiMoveEventType", e.suiMoveEventType), zap.String("type", *body.Type)) + return errors.New("type mismatch") + } + + fields := *body.Fields if (fields.ConsistencyLevel == nil) || (fields.Nonce == nil) || (fields.Payload == nil) || (fields.Sender == nil) || (fields.Sequence == nil) { + logger.Info("Missing required fields in event.") return nil } - if e.suiAccount != *moveEvent.Sender { - logger.Info("account mismatch", zap.String("e.suiAccount", e.suiAccount), zap.String("account", *moveEvent.Sender)) - return errors.New("account mismatch") - } - - if !e.unsafeDevMode && e.suiPackage != *moveEvent.PackageID { - logger.Info("package mismatch", zap.String("e.suiPackage", e.suiPackage), zap.String("package", *moveEvent.PackageID)) - return errors.New("package mismatch") - } - - emitter := make([]byte, 8) - binary.BigEndian.PutUint64(emitter, *fields.Sender) - - var a vaa.Address - copy(a[24:], emitter) - - id, err := base64.StdEncoding.DecodeString(*body.TxDigest) + emitter, err := vaa.StringToAddress(*fields.Sender) if err != nil { return err } - var txHash = eth_common.BytesToHash(id) // 32 bytes = d3b136a6a182a40554b2fafbc8d12a7a22737c10c81e33b33d1dcb74c532708b - - pl, err := base64.StdEncoding.DecodeString(*fields.Payload) + txHashBytes, err := base58.Decode(*body.ID.TxDigest) if err != nil { return err + } + if len(txHashBytes) != 32 { + logger.Error( + "Transaction hash is not 32 bytes", + zap.String("error_type", "malformed_wormhole_event"), + zap.String("log_msg_type", "tx_processing_error"), + zap.String("txHash", *body.ID.TxDigest), + ) + return errors.New("Transaction hash is not 32 bytes") + } + + txHashEthFormat := eth_common.BytesToHash(txHashBytes) + + seq, err := strconv.ParseUint(*fields.Sequence, 10, 64) + if err != nil { + logger.Info("Sequence decode error", zap.String("Sequence", *fields.Sequence)) + return err + } + ts, err := strconv.ParseInt(*fields.Timestamp, 10, 64) + if err != nil { + logger.Info("Timestamp decode error", zap.String("Timestamp", *fields.Timestamp)) + return err } observation := &common.MessagePublication{ - TxHash: txHash, - // We do NOT have a useful source of timestamp - // information. Every node has its own concept of a - // timestamp and nothing is persisted into the - // blockchain to make re-observation possible. Later - // we could explore putting the epoch or block height - // here but even those are currently not available. - // - // Timestamp: time.Unix(int64(timestamp.Uint()/1000), 0), - Timestamp: time.Unix(0, 0), - Nonce: uint32(*fields.Nonce), // uint32 - Sequence: *fields.Sequence, + TxHash: txHashEthFormat, + Timestamp: time.Unix(ts, 0), + Nonce: uint32(*fields.Nonce), + Sequence: seq, EmitterChain: vaa.ChainIDSui, - EmitterAddress: a, - Payload: pl, + EmitterAddress: emitter, + Payload: fields.Payload, ConsistencyLevel: uint8(*fields.ConsistencyLevel), } @@ -231,7 +253,7 @@ func (e *Watcher) inspectBody(logger *zap.Logger, body SuiResult) error { func (e *Watcher) Run(ctx context.Context) error { p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSui, &gossipv1.Heartbeat_Network{ - ContractAddress: e.suiAccount, + ContractAddress: e.suiMoveEventType, }) logger := supervisor.Logger(ctx) @@ -239,181 +261,213 @@ func (e *Watcher) Run(ctx context.Context) error { u := url.URL{Scheme: "ws", Host: e.suiWS} logger.Info("Sui watcher connecting to WS node ", zap.String("url", u.String())) + logger.Debug("SUI watcher:", zap.String("suiRPC", e.suiRPC), zap.String("suiWS", e.suiWS), zap.String("suiMoveEventType", e.suiMoveEventType)) - ws, _, err := websocket.DefaultDialer.Dial(u.String(), nil) + ws, _, err := websocket.Dial(ctx, u.String(), nil) if err != nil { - logger.Error(fmt.Sprintf("e.suiWS: %s", err.Error())) - return err + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1) + suiConnectionErrors.WithLabelValues("websocket_dial_error").Inc() + return fmt.Errorf("websocket dial failed: %w", err) } - - var s string + defer ws.Close(websocket.StatusNormalClosure, "") nBig, _ := rand.Int(rand.Reader, big.NewInt(27)) e.subId = nBig.Int64() - if e.unsafeDevMode { - // There is no way to have a fixed package id on - // deployment. This means that in devnet, everytime - // we publish the contracts we will get a new package - // id. The solution is to just subscribe to the whole - // deployer account instead of to a specific package - // in that account... - s = fmt.Sprintf(`{"jsonrpc":"2.0", "id": %d, "method": "sui_subscribeEvent", "params": [{"SenderAddress": "%s"}]}`, e.subId, e.suiAccount) - } else { - s = fmt.Sprintf(`{"jsonrpc":"2.0", "id": %d, "method": "sui_subscribeEvent", "params": [{"SenderAddress": "%s", "Package": "%s"}]}`, e.subId, e.suiAccount, e.suiPackage) + subscription := fmt.Sprintf(`{"jsonrpc":"2.0", "id": %d, "method": "suix_subscribeEvent", "params": [{"MoveEventType": "%s"}]}`, e.subId, e.suiMoveEventType) + + logger.Debug("Subscribing using", zap.String("json:", subscription)) + + err = ws.Write(ctx, websocket.MessageText, []byte(subscription)) + if err != nil { + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1) + suiConnectionErrors.WithLabelValues("websocket_subscription_error").Inc() + return fmt.Errorf("websocket subscription failed: %w", err) } - - logger.Info("Subscribing using", zap.String("filter", s)) - - if err := ws.WriteMessage(websocket.TextMessage, []byte(s)); err != nil { - logger.Error(fmt.Sprintf("write: %s", err.Error())) - return err + // Wait for the success response + mType, p, err := ws.Read(ctx) + if err != nil { + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1) + suiConnectionErrors.WithLabelValues("event_subscription_error").Inc() + return fmt.Errorf("event subscription failed: %w", err) } + var subRes map[string]any + err = json.Unmarshal(p, &subRes) + if err != nil { + return fmt.Errorf("failed to Unmarshal the subscription result: %w", err) + } + logger.Debug("Unmarshalled json", zap.Any("subRes", subRes)) + actualResult := subRes["result"] + logger.Debug("actualResult", zap.Any("res", actualResult)) + if actualResult == nil { + return fmt.Errorf("Failed to request filter in subscription request") + } + logger.Debug("subscribed to new transaction events", zap.Int("messageType", int(mType)), zap.String("bytes", string(p))) - timer := time.NewTicker(time.Second * 1) + timer := time.NewTicker(time.Second * 5) defer timer.Stop() - supervisor.Signal(ctx, supervisor.SignalHealthy) - errC := make(chan error) - defer close(errC) pumpData := make(chan []byte) defer close(pumpData) - go func() { + supervisor.Signal(ctx, supervisor.SignalHealthy) + readiness.SetReady(e.readinessSync) + + common.RunWithScissors(ctx, errC, "sui_data_pump", func(ctx context.Context) error { for { - if _, msg, err := ws.ReadMessage(); err != nil { - logger.Error(fmt.Sprintf("ReadMessage: '%s'", err.Error())) - if strings.HasSuffix(err.Error(), "EOF") { - errC <- err - return + select { + case <-ctx.Done(): + logger.Error("sui_data_pump context done") + return ctx.Err() + + default: + _, msg, err := ws.Read(ctx) + if err != nil { + logger.Error(fmt.Sprintf("ReadMessage: '%s'", err.Error())) + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1) + suiConnectionErrors.WithLabelValues("channel_read_error").Inc() + return err + } + + var res SuiEventMsg + err = json.Unmarshal(msg, &res) + if err != nil { + logger.Error("Failed to unmarshal SuiEventMsg", zap.String("body", string(msg)), zap.Error(err)) + return fmt.Errorf("Failed to unmarshal SuiEventMsg, body: %s, error: %w", string(msg), err) + } + if res.Error != nil { + return fmt.Errorf("Bad SuiEventMsg, body: %s, error: %w", string(msg), err) + } + logger.Debug("SUI result message", zap.String("message", string(msg)), zap.Any("event", res)) + if res.ID != nil { + logger.Error("Found an unexpected res.ID") + continue + } + + if res.Params != nil && (*res.Params).Result != nil { + err := e.inspectBody(logger, *(*res.Params).Result) + if err != nil { + logger.Error(fmt.Sprintf("inspectBody: %s", err.Error())) + } + continue } - } else { - pumpData <- msg } } - }() + }) - for { - select { - case err := <-errC: - _ = ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) - logger.Error("Pump died") - return err - case <-ctx.Done(): - _ = ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) - return ctx.Err() - case r := <-e.obsvReqC: - if vaa.ChainID(r.ChainId) != vaa.ChainIDSui { - panic("invalid chain ID") - } + common.RunWithScissors(ctx, errC, "sui_block_height", func(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + logger.Error("sui_block_height context done") + return ctx.Err() - id := base64.StdEncoding.EncodeToString(r.TxHash) - - logger.Info("obsv request", zap.String("TxHash", string(id))) - - buf := fmt.Sprintf(`{"jsonrpc":"2.0", "id": 1, "method": "sui_getEvents", "params": [{"Transaction": "%s"}, null, 10, true]}`, id) - - resp, err := http.Post(e.suiRPC, "application/json", strings.NewReader(buf)) - if err != nil { - logger.Error("getEvents API failed", zap.String("suiRPC", e.suiRPC), zap.Error(err)) - p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1) - continue - } - - body, err := io.ReadAll(resp.Body) - if err != nil { - logger.Error("unexpected truncated body when calling getEvents", zap.Error(err)) - p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1) - continue - - } - resp.Body.Close() - - logger.Info("receive", zap.String("body", string(body))) - - var res SuiTxnQuery - err = json.Unmarshal(body, &res) - if err != nil { - logger.Error("failed to unmarshal event message", zap.Error(err)) - p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1) - continue - - } - - for _, chunk := range res.Result.Data { - err := e.inspectBody(logger, chunk) + case <-timer.C: + resp, err := http.Post(e.suiRPC, "application/json", strings.NewReader(`{"jsonrpc":"2.0", "id": 1, "method": "sui_getLatestCheckpointSequenceNumber", "params": []}`)) if err != nil { - logger.Error("unspected error while parsing chunk data in event", zap.Error(err)) + logger.Error("sui_getLatestCheckpointSequenceNumber failed", zap.Error(err)) + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1) + return fmt.Errorf("sui_getLatestCheckpointSequenceNumber failed to post: %w", err) } - } - case msg := <-pumpData: - logger.Info("receive", zap.String("body", string(msg))) - - var res SuiEventMsg - err = json.Unmarshal(msg, &res) - if err != nil { - logger.Error("Failed to unmarshal SuiEventMsg", zap.String("body", string(msg)), zap.Error(err)) - continue - } - if res.Error != nil { - _ = ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) - return errors.New((*res.Error).Message) - } - if res.ID != nil { - if *res.ID == e.subId { - logger.Debug("Subscribed set to true") - e.subscribed = true - } - continue - } - - if res.Params != nil && (*res.Params).Result != nil { - err := e.inspectBody(logger, *(*res.Params).Result) + body, err := io.ReadAll(resp.Body) if err != nil { - logger.Error(fmt.Sprintf("inspectBody: %s", err.Error())) + logger.Error("sui_getLatestCheckpointSequenceNumber failed", zap.Error(err)) + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1) + return fmt.Errorf("sui_getLatestCheckpointSequenceNumber failed to read: %w", err) } - continue - } + resp.Body.Close() + logger.Debug("Body before unmarshalling", zap.String("body", string(body))) - case <-timer.C: - resp, err := http.Post(e.suiRPC, "application/json", strings.NewReader(`{"jsonrpc":"2.0", "id": 1, "method": "sui_getCommitteeInfo", "params": []}`)) - if err != nil { - logger.Error("sui_getCommitteeInfo failed", zap.Error(err)) - p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1) - break + var res SuiCheckpointSN + err = json.Unmarshal(body, &res) + if err != nil { + logger.Error("unmarshal failed into uint64", zap.String("body", string(body)), zap.Error(err)) + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1) + return fmt.Errorf("sui_getLatestCheckpointSequenceNumber failed to unmarshal body: %s, error: %w", string(body), err) + } - } - body, err := io.ReadAll(resp.Body) - if err != nil { - logger.Error("sui_getCommitteeInfo failed", zap.Error(err)) - p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1) - break - } - resp.Body.Close() + height, pErr := strconv.ParseInt(res.Result, 0, 64) + if pErr != nil { + logger.Error("Failed to ParseInt") + } else { + currentSuiHeight.Set(float64(height)) + logger.Debug("sui_getLatestCheckpointSequenceNumber", zap.String("result", res.Result)) - var res SuiCommitteeInfo - err = json.Unmarshal(body, &res) - if err != nil { - logger.Error("unmarshal failed into SuiCommitteeInfo", zap.String("body", string(body)), zap.Error(err)) - p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1) - continue + p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSui, &gossipv1.Heartbeat_Network{ + Height: int64(height), + ContractAddress: e.suiMoveEventType, + }) + } - } - - // Epoch is currently not ticking in 0.16.0. They also - // might release another API that gives a - // proper block height as we traditionally - // understand it... - currentSuiHeight.Set(float64(res.Result.Epoch)) - p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSui, &gossipv1.Heartbeat_Network{ - Height: int64(res.Result.Epoch), - ContractAddress: e.suiAccount, - }) - - if e.subscribed { readiness.SetReady(e.readinessSync) } } + }) + + common.RunWithScissors(ctx, errC, "sui_fetch_obvs_req", func(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + logger.Error("sui_fetch_obvs_req context done") + return ctx.Err() + case r := <-e.obsvReqC: + if vaa.ChainID(r.ChainId) != vaa.ChainIDSui { + panic("invalid chain ID") + } + + tx58 := base58.Encode(r.TxHash) + + buf := fmt.Sprintf(`{"jsonrpc":"2.0", "id": 1, "method": "sui_getEvents", "params": ["%s"]}`, tx58) + logger.Error(buf) + + resp, err := http.Post(e.suiRPC, "application/json", strings.NewReader(buf)) + if err != nil { + logger.Error("getEvents API failed", zap.String("suiRPC", e.suiRPC), zap.Error(err)) + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1) + continue + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + logger.Error("unexpected truncated body when calling getEvents", zap.Error(err)) + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1) + return fmt.Errorf("sui__fetch_obvs_req failed to post: %w", err) + + } + resp.Body.Close() + + logger.Debug("receive", zap.String("body", string(body))) + + if strings.Contains(string(body), "error") { + logger.Error("Failed to get events for re-observation request", zap.String("Result", string(body))) + continue + } + var res SuiTxnQuery + err = json.Unmarshal(body, &res) + if err != nil { + logger.Error("failed to unmarshal event message", zap.Error(err)) + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1) + return fmt.Errorf("sui__fetch_obvs_req failed to unmarshal: %w", err) + + } + + for i, chunk := range res.Result { + err := e.inspectBody(logger, chunk) + if err != nil { + logger.Info("skipping event data in result", zap.String("txhash", tx58), zap.Int("index", i), zap.Error(err)) + } + } + } + } + }) + + select { + case <-ctx.Done(): + _ = ws.Close(websocket.StatusNormalClosure, "") + return ctx.Err() + case err := <-errC: + _ = ws.Close(websocket.StatusInternalError, err.Error()) + return err } }