node: add sui watcher for devnet and testnet only

This commit is contained in:
Paul Noel 2023-04-17 17:56:52 +00:00 committed by Evan Gray
parent 7640539a44
commit 40a638de4c
2 changed files with 310 additions and 256 deletions

View File

@ -160,10 +160,9 @@ var (
aptosAccount *string
aptosHandle *string
suiRPC *string
suiWS *string
suiAccount *string
suiPackage *string
suiRPC *string
suiWS *string
suiMoveEventType *string
solanaRPC *string
@ -317,8 +316,7 @@ func init() {
suiRPC = NodeCmd.Flags().String("suiRPC", "", "sui RPC URL")
suiWS = NodeCmd.Flags().String("suiWS", "", "sui WS URL")
suiAccount = NodeCmd.Flags().String("suiAccount", "", "sui account")
suiPackage = NodeCmd.Flags().String("suiPackage", "", "sui package")
suiMoveEventType = NodeCmd.Flags().String("suiMoveEventType", "", "sui move event type for publish_message")
solanaRPC = NodeCmd.Flags().String("solanaRPC", "", "Solana RPC URL (required)")
@ -638,11 +636,8 @@ func runNode(cmd *cobra.Command, args []string) {
if *suiWS == "" {
logger.Fatal("If --suiRPC is specified, then --suiWS must be specified")
}
if *suiAccount == "" {
logger.Fatal("If --suiRPC is specified, then --suiAccount must be specified")
}
if *suiPackage == "" && !*unsafeDevMode {
logger.Fatal("If --suiRPC is specified, then --suiPackage must be specified")
if *suiMoveEventType == "" {
logger.Fatal("If --suiRPC is specified, then --suiMoveEventType must be specified")
}
}
@ -1322,6 +1317,7 @@ func runNode(cmd *cobra.Command, args []string) {
return err
}
}
if shouldStart(algorandIndexerRPC) {
logger.Info("Starting Algorand watcher")
common.MustRegisterReadinessSyncing(vaa.ChainIDAlgorand)
@ -1362,12 +1358,16 @@ func runNode(cmd *cobra.Command, args []string) {
}
if shouldStart(suiRPC) {
logger.Info("Starting Sui watcher")
common.MustRegisterReadinessSyncing(vaa.ChainIDSui)
chainObsvReqC[vaa.ChainIDSui] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "suiwatch",
sui.NewWatcher(*suiRPC, *suiWS, *suiAccount, *suiPackage, *unsafeDevMode, chainMsgC[vaa.ChainIDSui], chainObsvReqC[vaa.ChainIDSui]).Run); err != nil {
return err
if !*unsafeDevMode && !*testnetMode {
logger.Fatal("Can only start Sui watcher in devnet or testnet")
} else {
logger.Info("Starting Sui watcher")
common.MustRegisterReadinessSyncing(vaa.ChainIDSui)
chainObsvReqC[vaa.ChainIDSui] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
if err := supervisor.Run(ctx, "suiwatch",
sui.NewWatcher(*suiRPC, *suiWS, *suiMoveEventType, *unsafeDevMode, chainMsgC[vaa.ChainIDSui], chainObsvReqC[vaa.ChainIDSui]).Run); err != nil {
return err
}
}
}

View File

@ -3,20 +3,19 @@ package sui
import (
"context"
"crypto/rand"
"encoding/binary"
"errors"
"fmt"
"io"
"math/big"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"encoding/base64"
"encoding/json"
"github.com/gorilla/websocket"
"nhooyr.io/websocket"
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/p2p"
@ -28,6 +27,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/mr-tron/base58"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
@ -35,10 +35,9 @@ import (
type (
// Watcher is responsible for looking over Sui blockchain and reporting new transactions to the wormhole contract
Watcher struct {
suiRPC string
suiWS string
suiAccount string
suiPackage string
suiRPC string
suiWS string
suiMoveEventType string
unsafeDevMode bool
@ -46,29 +45,30 @@ type (
obsvReqC chan *gossipv1.ObservationRequest
readinessSync readiness.Component
subId int64
subscribed bool
subId int64
}
FieldsData struct {
ConsistencyLevel *uint8 `json:"consistency_level"`
Nonce *uint64 `json:"nonce"`
Payload []byte `json:"payload"`
Sender *string `json:"sender"`
Sequence *string `json:"sequence"`
Timestamp *string `json:"timestamp"`
}
SuiResult struct {
Timestamp *int64 `json:"timestamp"`
TxDigest *string `json:"txDigest"`
Event struct {
MoveEvent *struct {
PackageID *string `json:"packageId"`
TransactionModule *string `json:"transactionModule"`
Sender *string `json:"sender"`
Type *string `json:"type"`
Fields *struct {
ConsistencyLevel *uint8 `json:"consistency_level"`
Nonce *uint64 `json:"nonce"`
Payload *string `json:"payload"`
Sender *uint64 `json:"sender"`
Sequence *uint64 `json:"sequence"`
} `json:"fields"`
Bcs string `json:"bcs"`
} `json:"moveEvent"`
} `json:"event"`
ID struct {
TxDigest *string `json:"txDigest"`
EventSeq *string `json:"eventSeq"`
} `json:"id"`
PackageID *string `json:"packageId"`
TransactionModule *string `json:"transactionModule"`
Sender *string `json:"sender"`
Type *string `json:"type"`
Bcs *string `json:"bcs"`
Timestamp *string `json:"timestampMs"`
Fields *FieldsData `json:"parsedJson"`
}
SuiEventError struct {
@ -88,25 +88,49 @@ type (
}
SuiTxnQuery struct {
Jsonrpc string `json:"jsonrpc"`
Result struct {
Data []SuiResult `json:"data"`
NextCursor interface{} `json:"nextCursor"`
} `json:"result"`
ID int `json:"id"`
Jsonrpc string `json:"jsonrpc"`
Result []SuiResult `json:"result"`
ID int `json:"id"`
}
// {
// "jsonrpc": "2.0",
// "result": [
// {
// "id": {
// "txDigest": "6Yff8smmPZMandj6Psjy6wgZv5Deii78o1Sbghh5sHPA",
// "eventSeq": "0"
// },
// "packageId": "0x8b04a73ab3cb1e36bee5a86fdbfa481e97d3cc7ce8b594edea1400103ff0134d",
// "transactionModule": "sender",
// "sender": "0xed867315e3f7c83ae82e6d5858b6a6cc57c291fd84f7509646ebc8162169cf96",
// "type": "0x7483d0db53a140eed72bd6cb133daa59c539844f4c053924b9e3f0d2d7ba146d::publish_message::WormholeMessage",
// "parsedJson": {
// "consistency_level": 0,
// "nonce": 0,
// "payload": [104, 101, 108, 108, 111],
// "sender": "0x71c2aa2c549bb7381e88fbeca7eeb791be0afd455c8af9184613ce5db5ddba47",
// "sequence": "0",
// "timestamp": "1681411389"
// },
// "bcs": "5ZuknLT3Xsicr2D8zyk828thPByMBfR1cPJyEHF67k16AcEotDWhrpCDCTbk6BBbpSSs3bUk3msfADzrs"
// }
// ],
// "id": 1
// }
SuiCommitteeInfo struct {
SuiCheckpointSN struct {
Jsonrpc string `json:"jsonrpc"`
Result struct {
Epoch int `json:"epoch"`
CommitteeInfo [][]interface{} `json:"committee_info"`
} `json:"result"`
ID int `json:"id"`
Result string `json:"result"`
ID int `json:"id"`
}
)
var (
suiConnectionErrors = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "wormhole_sui_connection_errors_total",
Help: "Total number of SUI connection errors",
}, []string{"reason"})
suiMessagesConfirmed = promauto.NewCounter(
prometheus.CounterOpts{
Name: "wormhole_sui_observations_confirmed_total",
@ -123,91 +147,89 @@ var (
func NewWatcher(
suiRPC string,
suiWS string,
suiAccount string,
suiPackage string,
suiMoveEventType string,
unsafeDevMode bool,
messageEvents chan *common.MessagePublication,
obsvReqC chan *gossipv1.ObservationRequest,
) *Watcher {
return &Watcher{
suiRPC: suiRPC,
suiWS: suiWS,
suiAccount: suiAccount,
suiPackage: suiPackage,
unsafeDevMode: unsafeDevMode,
msgChan: messageEvents,
obsvReqC: obsvReqC,
readinessSync: common.MustConvertChainIdToReadinessSyncing(vaa.ChainIDSui),
subId: 0,
subscribed: false,
suiRPC: suiRPC,
suiWS: suiWS,
suiMoveEventType: suiMoveEventType,
unsafeDevMode: unsafeDevMode,
msgChan: messageEvents,
obsvReqC: obsvReqC,
readinessSync: common.MustConvertChainIdToReadinessSyncing(vaa.ChainIDSui),
subId: 0,
}
}
func (e *Watcher) inspectBody(logger *zap.Logger, body SuiResult) error {
if (body.Timestamp == nil) || (body.TxDigest == nil) {
return errors.New("Missing event fields")
if body.ID.TxDigest == nil {
return errors.New("Missing TxDigest field")
}
if body.Event.MoveEvent == nil {
return nil
}
moveEvent := *body.Event.MoveEvent
if (moveEvent.PackageID == nil) || (moveEvent.Sender == nil) {
return errors.New("Missing event fields")
if body.Type == nil {
return errors.New("Missing Type field")
}
if moveEvent.Fields == nil {
// There may be moveEvents caught without these params.
// So, not necessarily an error.
if body.Fields == nil {
return nil
}
fields := *moveEvent.Fields
if e.suiMoveEventType != *body.Type {
logger.Info("type mismatch", zap.String("e.suiMoveEventType", e.suiMoveEventType), zap.String("type", *body.Type))
return errors.New("type mismatch")
}
fields := *body.Fields
if (fields.ConsistencyLevel == nil) || (fields.Nonce == nil) || (fields.Payload == nil) || (fields.Sender == nil) || (fields.Sequence == nil) {
logger.Info("Missing required fields in event.")
return nil
}
if e.suiAccount != *moveEvent.Sender {
logger.Info("account mismatch", zap.String("e.suiAccount", e.suiAccount), zap.String("account", *moveEvent.Sender))
return errors.New("account mismatch")
}
if !e.unsafeDevMode && e.suiPackage != *moveEvent.PackageID {
logger.Info("package mismatch", zap.String("e.suiPackage", e.suiPackage), zap.String("package", *moveEvent.PackageID))
return errors.New("package mismatch")
}
emitter := make([]byte, 8)
binary.BigEndian.PutUint64(emitter, *fields.Sender)
var a vaa.Address
copy(a[24:], emitter)
id, err := base64.StdEncoding.DecodeString(*body.TxDigest)
emitter, err := vaa.StringToAddress(*fields.Sender)
if err != nil {
return err
}
var txHash = eth_common.BytesToHash(id) // 32 bytes = d3b136a6a182a40554b2fafbc8d12a7a22737c10c81e33b33d1dcb74c532708b
pl, err := base64.StdEncoding.DecodeString(*fields.Payload)
txHashBytes, err := base58.Decode(*body.ID.TxDigest)
if err != nil {
return err
}
if len(txHashBytes) != 32 {
logger.Error(
"Transaction hash is not 32 bytes",
zap.String("error_type", "malformed_wormhole_event"),
zap.String("log_msg_type", "tx_processing_error"),
zap.String("txHash", *body.ID.TxDigest),
)
return errors.New("Transaction hash is not 32 bytes")
}
txHashEthFormat := eth_common.BytesToHash(txHashBytes)
seq, err := strconv.ParseUint(*fields.Sequence, 10, 64)
if err != nil {
logger.Info("Sequence decode error", zap.String("Sequence", *fields.Sequence))
return err
}
ts, err := strconv.ParseInt(*fields.Timestamp, 10, 64)
if err != nil {
logger.Info("Timestamp decode error", zap.String("Timestamp", *fields.Timestamp))
return err
}
observation := &common.MessagePublication{
TxHash: txHash,
// We do NOT have a useful source of timestamp
// information. Every node has its own concept of a
// timestamp and nothing is persisted into the
// blockchain to make re-observation possible. Later
// we could explore putting the epoch or block height
// here but even those are currently not available.
//
// Timestamp: time.Unix(int64(timestamp.Uint()/1000), 0),
Timestamp: time.Unix(0, 0),
Nonce: uint32(*fields.Nonce), // uint32
Sequence: *fields.Sequence,
TxHash: txHashEthFormat,
Timestamp: time.Unix(ts, 0),
Nonce: uint32(*fields.Nonce),
Sequence: seq,
EmitterChain: vaa.ChainIDSui,
EmitterAddress: a,
Payload: pl,
EmitterAddress: emitter,
Payload: fields.Payload,
ConsistencyLevel: uint8(*fields.ConsistencyLevel),
}
@ -231,7 +253,7 @@ func (e *Watcher) inspectBody(logger *zap.Logger, body SuiResult) error {
func (e *Watcher) Run(ctx context.Context) error {
p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSui, &gossipv1.Heartbeat_Network{
ContractAddress: e.suiAccount,
ContractAddress: e.suiMoveEventType,
})
logger := supervisor.Logger(ctx)
@ -239,181 +261,213 @@ func (e *Watcher) Run(ctx context.Context) error {
u := url.URL{Scheme: "ws", Host: e.suiWS}
logger.Info("Sui watcher connecting to WS node ", zap.String("url", u.String()))
logger.Debug("SUI watcher:", zap.String("suiRPC", e.suiRPC), zap.String("suiWS", e.suiWS), zap.String("suiMoveEventType", e.suiMoveEventType))
ws, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
ws, _, err := websocket.Dial(ctx, u.String(), nil)
if err != nil {
logger.Error(fmt.Sprintf("e.suiWS: %s", err.Error()))
return err
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1)
suiConnectionErrors.WithLabelValues("websocket_dial_error").Inc()
return fmt.Errorf("websocket dial failed: %w", err)
}
var s string
defer ws.Close(websocket.StatusNormalClosure, "")
nBig, _ := rand.Int(rand.Reader, big.NewInt(27))
e.subId = nBig.Int64()
if e.unsafeDevMode {
// There is no way to have a fixed package id on
// deployment. This means that in devnet, everytime
// we publish the contracts we will get a new package
// id. The solution is to just subscribe to the whole
// deployer account instead of to a specific package
// in that account...
s = fmt.Sprintf(`{"jsonrpc":"2.0", "id": %d, "method": "sui_subscribeEvent", "params": [{"SenderAddress": "%s"}]}`, e.subId, e.suiAccount)
} else {
s = fmt.Sprintf(`{"jsonrpc":"2.0", "id": %d, "method": "sui_subscribeEvent", "params": [{"SenderAddress": "%s", "Package": "%s"}]}`, e.subId, e.suiAccount, e.suiPackage)
subscription := fmt.Sprintf(`{"jsonrpc":"2.0", "id": %d, "method": "suix_subscribeEvent", "params": [{"MoveEventType": "%s"}]}`, e.subId, e.suiMoveEventType)
logger.Debug("Subscribing using", zap.String("json:", subscription))
err = ws.Write(ctx, websocket.MessageText, []byte(subscription))
if err != nil {
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1)
suiConnectionErrors.WithLabelValues("websocket_subscription_error").Inc()
return fmt.Errorf("websocket subscription failed: %w", err)
}
logger.Info("Subscribing using", zap.String("filter", s))
if err := ws.WriteMessage(websocket.TextMessage, []byte(s)); err != nil {
logger.Error(fmt.Sprintf("write: %s", err.Error()))
return err
// Wait for the success response
mType, p, err := ws.Read(ctx)
if err != nil {
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1)
suiConnectionErrors.WithLabelValues("event_subscription_error").Inc()
return fmt.Errorf("event subscription failed: %w", err)
}
var subRes map[string]any
err = json.Unmarshal(p, &subRes)
if err != nil {
return fmt.Errorf("failed to Unmarshal the subscription result: %w", err)
}
logger.Debug("Unmarshalled json", zap.Any("subRes", subRes))
actualResult := subRes["result"]
logger.Debug("actualResult", zap.Any("res", actualResult))
if actualResult == nil {
return fmt.Errorf("Failed to request filter in subscription request")
}
logger.Debug("subscribed to new transaction events", zap.Int("messageType", int(mType)), zap.String("bytes", string(p)))
timer := time.NewTicker(time.Second * 1)
timer := time.NewTicker(time.Second * 5)
defer timer.Stop()
supervisor.Signal(ctx, supervisor.SignalHealthy)
errC := make(chan error)
defer close(errC)
pumpData := make(chan []byte)
defer close(pumpData)
go func() {
supervisor.Signal(ctx, supervisor.SignalHealthy)
readiness.SetReady(e.readinessSync)
common.RunWithScissors(ctx, errC, "sui_data_pump", func(ctx context.Context) error {
for {
if _, msg, err := ws.ReadMessage(); err != nil {
logger.Error(fmt.Sprintf("ReadMessage: '%s'", err.Error()))
if strings.HasSuffix(err.Error(), "EOF") {
errC <- err
return
select {
case <-ctx.Done():
logger.Error("sui_data_pump context done")
return ctx.Err()
default:
_, msg, err := ws.Read(ctx)
if err != nil {
logger.Error(fmt.Sprintf("ReadMessage: '%s'", err.Error()))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1)
suiConnectionErrors.WithLabelValues("channel_read_error").Inc()
return err
}
var res SuiEventMsg
err = json.Unmarshal(msg, &res)
if err != nil {
logger.Error("Failed to unmarshal SuiEventMsg", zap.String("body", string(msg)), zap.Error(err))
return fmt.Errorf("Failed to unmarshal SuiEventMsg, body: %s, error: %w", string(msg), err)
}
if res.Error != nil {
return fmt.Errorf("Bad SuiEventMsg, body: %s, error: %w", string(msg), err)
}
logger.Debug("SUI result message", zap.String("message", string(msg)), zap.Any("event", res))
if res.ID != nil {
logger.Error("Found an unexpected res.ID")
continue
}
if res.Params != nil && (*res.Params).Result != nil {
err := e.inspectBody(logger, *(*res.Params).Result)
if err != nil {
logger.Error(fmt.Sprintf("inspectBody: %s", err.Error()))
}
continue
}
} else {
pumpData <- msg
}
}
}()
})
for {
select {
case err := <-errC:
_ = ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
logger.Error("Pump died")
return err
case <-ctx.Done():
_ = ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
return ctx.Err()
case r := <-e.obsvReqC:
if vaa.ChainID(r.ChainId) != vaa.ChainIDSui {
panic("invalid chain ID")
}
common.RunWithScissors(ctx, errC, "sui_block_height", func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
logger.Error("sui_block_height context done")
return ctx.Err()
id := base64.StdEncoding.EncodeToString(r.TxHash)
logger.Info("obsv request", zap.String("TxHash", string(id)))
buf := fmt.Sprintf(`{"jsonrpc":"2.0", "id": 1, "method": "sui_getEvents", "params": [{"Transaction": "%s"}, null, 10, true]}`, id)
resp, err := http.Post(e.suiRPC, "application/json", strings.NewReader(buf))
if err != nil {
logger.Error("getEvents API failed", zap.String("suiRPC", e.suiRPC), zap.Error(err))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1)
continue
}
body, err := io.ReadAll(resp.Body)
if err != nil {
logger.Error("unexpected truncated body when calling getEvents", zap.Error(err))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1)
continue
}
resp.Body.Close()
logger.Info("receive", zap.String("body", string(body)))
var res SuiTxnQuery
err = json.Unmarshal(body, &res)
if err != nil {
logger.Error("failed to unmarshal event message", zap.Error(err))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1)
continue
}
for _, chunk := range res.Result.Data {
err := e.inspectBody(logger, chunk)
case <-timer.C:
resp, err := http.Post(e.suiRPC, "application/json", strings.NewReader(`{"jsonrpc":"2.0", "id": 1, "method": "sui_getLatestCheckpointSequenceNumber", "params": []}`))
if err != nil {
logger.Error("unspected error while parsing chunk data in event", zap.Error(err))
logger.Error("sui_getLatestCheckpointSequenceNumber failed", zap.Error(err))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1)
return fmt.Errorf("sui_getLatestCheckpointSequenceNumber failed to post: %w", err)
}
}
case msg := <-pumpData:
logger.Info("receive", zap.String("body", string(msg)))
var res SuiEventMsg
err = json.Unmarshal(msg, &res)
if err != nil {
logger.Error("Failed to unmarshal SuiEventMsg", zap.String("body", string(msg)), zap.Error(err))
continue
}
if res.Error != nil {
_ = ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
return errors.New((*res.Error).Message)
}
if res.ID != nil {
if *res.ID == e.subId {
logger.Debug("Subscribed set to true")
e.subscribed = true
}
continue
}
if res.Params != nil && (*res.Params).Result != nil {
err := e.inspectBody(logger, *(*res.Params).Result)
body, err := io.ReadAll(resp.Body)
if err != nil {
logger.Error(fmt.Sprintf("inspectBody: %s", err.Error()))
logger.Error("sui_getLatestCheckpointSequenceNumber failed", zap.Error(err))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1)
return fmt.Errorf("sui_getLatestCheckpointSequenceNumber failed to read: %w", err)
}
continue
}
resp.Body.Close()
logger.Debug("Body before unmarshalling", zap.String("body", string(body)))
case <-timer.C:
resp, err := http.Post(e.suiRPC, "application/json", strings.NewReader(`{"jsonrpc":"2.0", "id": 1, "method": "sui_getCommitteeInfo", "params": []}`))
if err != nil {
logger.Error("sui_getCommitteeInfo failed", zap.Error(err))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1)
break
var res SuiCheckpointSN
err = json.Unmarshal(body, &res)
if err != nil {
logger.Error("unmarshal failed into uint64", zap.String("body", string(body)), zap.Error(err))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1)
return fmt.Errorf("sui_getLatestCheckpointSequenceNumber failed to unmarshal body: %s, error: %w", string(body), err)
}
}
body, err := io.ReadAll(resp.Body)
if err != nil {
logger.Error("sui_getCommitteeInfo failed", zap.Error(err))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1)
break
}
resp.Body.Close()
height, pErr := strconv.ParseInt(res.Result, 0, 64)
if pErr != nil {
logger.Error("Failed to ParseInt")
} else {
currentSuiHeight.Set(float64(height))
logger.Debug("sui_getLatestCheckpointSequenceNumber", zap.String("result", res.Result))
var res SuiCommitteeInfo
err = json.Unmarshal(body, &res)
if err != nil {
logger.Error("unmarshal failed into SuiCommitteeInfo", zap.String("body", string(body)), zap.Error(err))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1)
continue
p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSui, &gossipv1.Heartbeat_Network{
Height: int64(height),
ContractAddress: e.suiMoveEventType,
})
}
}
// Epoch is currently not ticking in 0.16.0. They also
// might release another API that gives a
// proper block height as we traditionally
// understand it...
currentSuiHeight.Set(float64(res.Result.Epoch))
p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSui, &gossipv1.Heartbeat_Network{
Height: int64(res.Result.Epoch),
ContractAddress: e.suiAccount,
})
if e.subscribed {
readiness.SetReady(e.readinessSync)
}
}
})
common.RunWithScissors(ctx, errC, "sui_fetch_obvs_req", func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
logger.Error("sui_fetch_obvs_req context done")
return ctx.Err()
case r := <-e.obsvReqC:
if vaa.ChainID(r.ChainId) != vaa.ChainIDSui {
panic("invalid chain ID")
}
tx58 := base58.Encode(r.TxHash)
buf := fmt.Sprintf(`{"jsonrpc":"2.0", "id": 1, "method": "sui_getEvents", "params": ["%s"]}`, tx58)
logger.Error(buf)
resp, err := http.Post(e.suiRPC, "application/json", strings.NewReader(buf))
if err != nil {
logger.Error("getEvents API failed", zap.String("suiRPC", e.suiRPC), zap.Error(err))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1)
continue
}
body, err := io.ReadAll(resp.Body)
if err != nil {
logger.Error("unexpected truncated body when calling getEvents", zap.Error(err))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1)
return fmt.Errorf("sui__fetch_obvs_req failed to post: %w", err)
}
resp.Body.Close()
logger.Debug("receive", zap.String("body", string(body)))
if strings.Contains(string(body), "error") {
logger.Error("Failed to get events for re-observation request", zap.String("Result", string(body)))
continue
}
var res SuiTxnQuery
err = json.Unmarshal(body, &res)
if err != nil {
logger.Error("failed to unmarshal event message", zap.Error(err))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1)
return fmt.Errorf("sui__fetch_obvs_req failed to unmarshal: %w", err)
}
for i, chunk := range res.Result {
err := e.inspectBody(logger, chunk)
if err != nil {
logger.Info("skipping event data in result", zap.String("txhash", tx58), zap.Int("index", i), zap.Error(err))
}
}
}
}
})
select {
case <-ctx.Done():
_ = ws.Close(websocket.StatusNormalClosure, "")
return ctx.Err()
case err := <-errC:
_ = ws.Close(websocket.StatusInternalError, err.Error())
return err
}
}