Basic Sui watcher support (#1951)
This commit is contained in:
parent
a6156ed1c8
commit
402a2d1a30
16
Tiltfile
16
Tiltfile
|
@ -65,7 +65,7 @@ ci = cfg.get("ci", False)
|
||||||
algorand = cfg.get("algorand", ci)
|
algorand = cfg.get("algorand", ci)
|
||||||
near = cfg.get("near", ci)
|
near = cfg.get("near", ci)
|
||||||
aptos = cfg.get("aptos", ci)
|
aptos = cfg.get("aptos", ci)
|
||||||
sui = cfg.get("sui", ci)
|
sui = cfg.get("sui", False)
|
||||||
evm2 = cfg.get("evm2", ci)
|
evm2 = cfg.get("evm2", ci)
|
||||||
solana = cfg.get("solana", ci)
|
solana = cfg.get("solana", ci)
|
||||||
terra_classic = cfg.get("terra_classic", ci)
|
terra_classic = cfg.get("terra_classic", ci)
|
||||||
|
@ -175,6 +175,20 @@ def build_node_yaml():
|
||||||
"0xde0036a9600559e295d5f6802ef6f3f802f510366e0c23912b0655d972166017::state::WormholeMessageHandle",
|
"0xde0036a9600559e295d5f6802ef6f3f802f510366e0c23912b0655d972166017::state::WormholeMessageHandle",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
if sui:
|
||||||
|
container["command"] += [
|
||||||
|
"--suiRPC",
|
||||||
|
"http://sui:9002",
|
||||||
|
# In testnet and mainnet, you will need to also specify the suiPackage argument. In Devnet, we subscribe to
|
||||||
|
# event traffic purely based on the account since that is the only thing that is deterministic.
|
||||||
|
# "--suiPackage",
|
||||||
|
# "0x.....",
|
||||||
|
"--suiAccount",
|
||||||
|
"0x2acab6bb0e4722e528291bc6ca4f097e18ce9331",
|
||||||
|
"--suiWS",
|
||||||
|
"sui:9001",
|
||||||
|
]
|
||||||
|
|
||||||
if evm2:
|
if evm2:
|
||||||
container["command"] += [
|
container["command"] += [
|
||||||
"--bscRPC",
|
"--bscRPC",
|
||||||
|
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"github.com/certusone/wormhole/node/pkg/watchers/evm"
|
"github.com/certusone/wormhole/node/pkg/watchers/evm"
|
||||||
"github.com/certusone/wormhole/node/pkg/watchers/near"
|
"github.com/certusone/wormhole/node/pkg/watchers/near"
|
||||||
"github.com/certusone/wormhole/node/pkg/watchers/solana"
|
"github.com/certusone/wormhole/node/pkg/watchers/solana"
|
||||||
|
"github.com/certusone/wormhole/node/pkg/watchers/sui"
|
||||||
|
|
||||||
"github.com/benbjohnson/clock"
|
"github.com/benbjohnson/clock"
|
||||||
"github.com/certusone/wormhole/node/pkg/db"
|
"github.com/certusone/wormhole/node/pkg/db"
|
||||||
|
@ -142,6 +143,11 @@ var (
|
||||||
aptosAccount *string
|
aptosAccount *string
|
||||||
aptosHandle *string
|
aptosHandle *string
|
||||||
|
|
||||||
|
suiRPC *string
|
||||||
|
suiWS *string
|
||||||
|
suiAccount *string
|
||||||
|
suiPackage *string
|
||||||
|
|
||||||
solanaRPC *string
|
solanaRPC *string
|
||||||
|
|
||||||
pythnetContract *string
|
pythnetContract *string
|
||||||
|
@ -273,6 +279,11 @@ func init() {
|
||||||
aptosAccount = NodeCmd.Flags().String("aptosAccount", "", "aptos account")
|
aptosAccount = NodeCmd.Flags().String("aptosAccount", "", "aptos account")
|
||||||
aptosHandle = NodeCmd.Flags().String("aptosHandle", "", "aptos handle")
|
aptosHandle = NodeCmd.Flags().String("aptosHandle", "", "aptos handle")
|
||||||
|
|
||||||
|
suiRPC = NodeCmd.Flags().String("suiRPC", "", "sui RPC URL")
|
||||||
|
suiWS = NodeCmd.Flags().String("suiWS", "", "sui WS URL")
|
||||||
|
suiAccount = NodeCmd.Flags().String("suiAccount", "", "sui account")
|
||||||
|
suiPackage = NodeCmd.Flags().String("suiPackage", "", "sui package")
|
||||||
|
|
||||||
solanaRPC = NodeCmd.Flags().String("solanaRPC", "", "Solana RPC URL (required")
|
solanaRPC = NodeCmd.Flags().String("solanaRPC", "", "Solana RPC URL (required")
|
||||||
|
|
||||||
pythnetContract = NodeCmd.Flags().String("pythnetContract", "", "Address of the PythNet program (required)")
|
pythnetContract = NodeCmd.Flags().String("pythnetContract", "", "Address of the PythNet program (required)")
|
||||||
|
@ -572,6 +583,17 @@ func runNode(cmd *cobra.Command, args []string) {
|
||||||
logger.Fatal("If --aptosRPC is specified, then --aptosHandle must be specified")
|
logger.Fatal("If --aptosRPC is specified, then --aptosHandle must be specified")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if *suiRPC != "" {
|
||||||
|
if *suiWS == "" {
|
||||||
|
logger.Fatal("If --suiRPC is specified, then --suiWS must be specified")
|
||||||
|
}
|
||||||
|
if *suiAccount == "" {
|
||||||
|
logger.Fatal("If --suiRPC is specified, then --suiAccount must be specified")
|
||||||
|
}
|
||||||
|
if *suiPackage == "" && !*unsafeDevMode {
|
||||||
|
logger.Fatal("If --suiRPC is specified, then --suiPackage must be specified")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (*optimismRPC == "") != (*optimismContract == "") {
|
if (*optimismRPC == "") != (*optimismContract == "") {
|
||||||
logger.Fatal("Both --optimismContract and --optimismRPC must be set together or both unset")
|
logger.Fatal("Both --optimismContract and --optimismRPC must be set together or both unset")
|
||||||
|
@ -1129,6 +1151,16 @@ func runNode(cmd *cobra.Command, args []string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if shouldStart(suiRPC) {
|
||||||
|
logger.Info("Starting Sui watcher")
|
||||||
|
readiness.RegisterComponent(common.ReadinessSuiSyncing)
|
||||||
|
chainObsvReqC[vaa.ChainIDSui] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize)
|
||||||
|
if err := supervisor.Run(ctx, "suiwatch",
|
||||||
|
sui.NewWatcher(*suiRPC, *suiWS, *suiAccount, *suiPackage, *unsafeDevMode, lockC, chainObsvReqC[vaa.ChainIDSui]).Run); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var solanaFinalizedWatcher *solana.SolanaWatcher
|
var solanaFinalizedWatcher *solana.SolanaWatcher
|
||||||
if shouldStart(solanaRPC) {
|
if shouldStart(solanaRPC) {
|
||||||
logger.Info("Starting Solana watcher")
|
logger.Info("Starting Solana watcher")
|
||||||
|
|
|
@ -9,6 +9,7 @@ const (
|
||||||
ReadinessAlgorandSyncing readiness.Component = "algorandSyncing"
|
ReadinessAlgorandSyncing readiness.Component = "algorandSyncing"
|
||||||
ReadinessNearSyncing readiness.Component = "nearSyncing"
|
ReadinessNearSyncing readiness.Component = "nearSyncing"
|
||||||
ReadinessAptosSyncing readiness.Component = "aptosSyncing"
|
ReadinessAptosSyncing readiness.Component = "aptosSyncing"
|
||||||
|
ReadinessSuiSyncing readiness.Component = "suiSyncing"
|
||||||
ReadinessBSCSyncing readiness.Component = "bscSyncing"
|
ReadinessBSCSyncing readiness.Component = "bscSyncing"
|
||||||
ReadinessPolygonSyncing readiness.Component = "polygonSyncing"
|
ReadinessPolygonSyncing readiness.Component = "polygonSyncing"
|
||||||
ReadinessAvalancheSyncing readiness.Component = "avalancheSyncing"
|
ReadinessAvalancheSyncing readiness.Component = "avalancheSyncing"
|
||||||
|
|
|
@ -0,0 +1,412 @@
|
||||||
|
package sui
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/rand"
|
||||||
|
"encoding/binary"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"math/big"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"encoding/base64"
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
|
||||||
|
"github.com/certusone/wormhole/node/pkg/common"
|
||||||
|
"github.com/certusone/wormhole/node/pkg/p2p"
|
||||||
|
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
|
||||||
|
"github.com/certusone/wormhole/node/pkg/readiness"
|
||||||
|
"github.com/certusone/wormhole/node/pkg/supervisor"
|
||||||
|
|
||||||
|
eth_common "github.com/ethereum/go-ethereum/common"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||||
|
|
||||||
|
"github.com/tidwall/gjson"
|
||||||
|
"github.com/wormhole-foundation/wormhole/sdk/vaa"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
type (
|
||||||
|
// Watcher is responsible for looking over Sui blockchain and reporting new transactions to the wormhole contract
|
||||||
|
Watcher struct {
|
||||||
|
suiRPC string
|
||||||
|
suiWS string
|
||||||
|
suiAccount string
|
||||||
|
suiPackage string
|
||||||
|
|
||||||
|
unsafeDevMode bool
|
||||||
|
|
||||||
|
msgChan chan *common.MessagePublication
|
||||||
|
obsvReqC chan *gossipv1.ObservationRequest
|
||||||
|
|
||||||
|
subId int64
|
||||||
|
subscribed bool
|
||||||
|
}
|
||||||
|
|
||||||
|
SuiResult struct {
|
||||||
|
Timestamp *int64 `json:"timestamp"`
|
||||||
|
TxDigest *string `json:"txDigest"`
|
||||||
|
Event struct {
|
||||||
|
MoveEvent *struct {
|
||||||
|
PackageID *string `json:"packageId"`
|
||||||
|
TransactionModule *string `json:"transactionModule"`
|
||||||
|
Sender *string `json:"sender"`
|
||||||
|
Type *string `json:"type"`
|
||||||
|
Fields *struct {
|
||||||
|
ConsistencyLevel *uint8 `json:"consistency_level"`
|
||||||
|
Nonce *uint64 `json:"nonce"`
|
||||||
|
Payload *string `json:"payload"`
|
||||||
|
Sender *uint64 `json:"sender"`
|
||||||
|
Sequence *uint64 `json:"sequence"`
|
||||||
|
} `json:"fields"`
|
||||||
|
Bcs string `json:"bcs"`
|
||||||
|
} `json:"moveEvent"`
|
||||||
|
} `json:"event"`
|
||||||
|
}
|
||||||
|
|
||||||
|
SuiEventError struct {
|
||||||
|
Code int64 `json:"code"`
|
||||||
|
Message string `json:"message"`
|
||||||
|
}
|
||||||
|
|
||||||
|
SuiEventMsg struct {
|
||||||
|
Jsonrpc string `json:"jsonrpc"`
|
||||||
|
Method *string `json:"method"`
|
||||||
|
ID *int64 `json:"id"`
|
||||||
|
Error *SuiEventError `json:"error"`
|
||||||
|
Params *struct {
|
||||||
|
Subscription int64 `json:"subscription"`
|
||||||
|
Result *SuiResult `json:"result"`
|
||||||
|
} `json:"params"`
|
||||||
|
}
|
||||||
|
|
||||||
|
SuiTxnQuery struct {
|
||||||
|
Jsonrpc string `json:"jsonrpc"`
|
||||||
|
Result struct {
|
||||||
|
Data []SuiResult `json:"data"`
|
||||||
|
NextCursor interface{} `json:"nextCursor"`
|
||||||
|
} `json:"result"`
|
||||||
|
ID int `json:"id"`
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
suiMessagesConfirmed = promauto.NewCounter(
|
||||||
|
prometheus.CounterOpts{
|
||||||
|
Name: "wormhole_sui_observations_confirmed_total",
|
||||||
|
Help: "Total number of verified Sui observations found",
|
||||||
|
})
|
||||||
|
currentSuiHeight = promauto.NewGauge(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Name: "wormhole_sui_current_height",
|
||||||
|
Help: "Current Sui block height",
|
||||||
|
})
|
||||||
|
)
|
||||||
|
|
||||||
|
// NewWatcher creates a new Sui appid watcher
|
||||||
|
func NewWatcher(
|
||||||
|
suiRPC string,
|
||||||
|
suiWS string,
|
||||||
|
suiAccount string,
|
||||||
|
suiPackage string,
|
||||||
|
unsafeDevMode bool,
|
||||||
|
messageEvents chan *common.MessagePublication,
|
||||||
|
obsvReqC chan *gossipv1.ObservationRequest,
|
||||||
|
) *Watcher {
|
||||||
|
return &Watcher{
|
||||||
|
suiRPC: suiRPC,
|
||||||
|
suiWS: suiWS,
|
||||||
|
suiAccount: suiAccount,
|
||||||
|
suiPackage: suiPackage,
|
||||||
|
unsafeDevMode: unsafeDevMode,
|
||||||
|
msgChan: messageEvents,
|
||||||
|
obsvReqC: obsvReqC,
|
||||||
|
subId: 0,
|
||||||
|
subscribed: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Watcher) inspectBody(logger *zap.Logger, body SuiResult) error {
|
||||||
|
if (body.Timestamp == nil) || (body.TxDigest == nil) {
|
||||||
|
return errors.New("Missing event fields")
|
||||||
|
}
|
||||||
|
if body.Event.MoveEvent == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
moveEvent := *body.Event.MoveEvent
|
||||||
|
if (moveEvent.PackageID == nil) || (moveEvent.Sender == nil) {
|
||||||
|
return errors.New("Missing event fields")
|
||||||
|
}
|
||||||
|
|
||||||
|
if moveEvent.Fields == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
fields := *moveEvent.Fields
|
||||||
|
if (fields.ConsistencyLevel == nil) || (fields.Nonce == nil) || (fields.Payload == nil) || (fields.Sender == nil) || (fields.Sequence == nil) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if e.suiAccount != *moveEvent.Sender {
|
||||||
|
logger.Info("account missmatch", zap.String("e.suiAccount", e.suiAccount), zap.String("account", *moveEvent.Sender))
|
||||||
|
return errors.New("account missmatch")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !e.unsafeDevMode && e.suiPackage != *moveEvent.PackageID {
|
||||||
|
logger.Info("package missmatch", zap.String("e.suiPackage", e.suiPackage), zap.String("package", *moveEvent.PackageID))
|
||||||
|
return errors.New("package missmatch")
|
||||||
|
}
|
||||||
|
|
||||||
|
emitter := make([]byte, 8)
|
||||||
|
binary.BigEndian.PutUint64(emitter, *fields.Sender)
|
||||||
|
|
||||||
|
var a vaa.Address
|
||||||
|
copy(a[24:], emitter)
|
||||||
|
|
||||||
|
id, err := base64.StdEncoding.DecodeString(*body.TxDigest)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var txHash = eth_common.BytesToHash(id) // 32 bytes = d3b136a6a182a40554b2fafbc8d12a7a22737c10c81e33b33d1dcb74c532708b
|
||||||
|
|
||||||
|
pl, err := base64.StdEncoding.DecodeString(*fields.Payload)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
observation := &common.MessagePublication{
|
||||||
|
TxHash: txHash,
|
||||||
|
// We do NOT have a useful source of timestamp
|
||||||
|
// information. Every node has its own concept of a
|
||||||
|
// timestamp and nothing is persisted into the
|
||||||
|
// blockchain to make re-observation possible. Later
|
||||||
|
// we could explore putting the epoch or block height
|
||||||
|
// here but even those are currently not available.
|
||||||
|
//
|
||||||
|
// Timestamp: time.Unix(int64(timestamp.Uint()/1000), 0),
|
||||||
|
Timestamp: time.Unix(0, 0),
|
||||||
|
Nonce: uint32(*fields.Nonce), // uint32
|
||||||
|
Sequence: *fields.Sequence,
|
||||||
|
EmitterChain: vaa.ChainIDSui,
|
||||||
|
EmitterAddress: a,
|
||||||
|
Payload: pl,
|
||||||
|
ConsistencyLevel: uint8(*fields.ConsistencyLevel),
|
||||||
|
}
|
||||||
|
|
||||||
|
suiMessagesConfirmed.Inc()
|
||||||
|
|
||||||
|
logger.Info("message observed",
|
||||||
|
zap.Stringer("txHash", observation.TxHash),
|
||||||
|
zap.Time("timestamp", observation.Timestamp),
|
||||||
|
zap.Uint32("nonce", observation.Nonce),
|
||||||
|
zap.Uint64("sequence", observation.Sequence),
|
||||||
|
zap.Stringer("emitter_chain", observation.EmitterChain),
|
||||||
|
zap.Stringer("emitter_address", observation.EmitterAddress),
|
||||||
|
zap.Binary("payload", observation.Payload),
|
||||||
|
zap.Uint8("consistencyLevel", observation.ConsistencyLevel),
|
||||||
|
)
|
||||||
|
|
||||||
|
e.msgChan <- observation
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Watcher) Run(ctx context.Context) error {
|
||||||
|
p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSui, &gossipv1.Heartbeat_Network{
|
||||||
|
ContractAddress: e.suiAccount,
|
||||||
|
})
|
||||||
|
|
||||||
|
logger := supervisor.Logger(ctx)
|
||||||
|
|
||||||
|
u := url.URL{Scheme: "ws", Host: e.suiWS}
|
||||||
|
|
||||||
|
logger.Info("Sui watcher connecting to WS node ", zap.String("url", u.String()))
|
||||||
|
|
||||||
|
ws, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(fmt.Sprintf("e.suiWS: %s", err.Error()))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var s string
|
||||||
|
|
||||||
|
nBig, _ := rand.Int(rand.Reader, big.NewInt(27))
|
||||||
|
e.subId = nBig.Int64()
|
||||||
|
|
||||||
|
if e.unsafeDevMode {
|
||||||
|
// There is no way to have a fixed package id on
|
||||||
|
// deployment. This means that in devnet, everytime
|
||||||
|
// we publish the contracts we will get a new package
|
||||||
|
// id. The solution is to just subscribe to the whole
|
||||||
|
// deployer account instead of to a specific package
|
||||||
|
// in that account...
|
||||||
|
s = fmt.Sprintf(`{"jsonrpc":"2.0", "id": %d, "method": "sui_subscribeEvent", "params": [{"SenderAddress": "%s"}]}`, e.subId, e.suiAccount)
|
||||||
|
} else {
|
||||||
|
s = fmt.Sprintf(`{"jsonrpc":"2.0", "id": %d, "method": "sui_subscribeEvent", "params": [{"SenderAddress": "%s", "Package": "%s"}]}`, e.subId, e.suiAccount, e.suiPackage)
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Info("Subscribing using", zap.String("filter", s))
|
||||||
|
|
||||||
|
if err := ws.WriteMessage(websocket.TextMessage, []byte(s)); err != nil {
|
||||||
|
logger.Error(fmt.Sprintf("write: %s", err.Error()))
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
timer := time.NewTicker(time.Second * 1)
|
||||||
|
defer timer.Stop()
|
||||||
|
|
||||||
|
supervisor.Signal(ctx, supervisor.SignalHealthy)
|
||||||
|
|
||||||
|
errC := make(chan error)
|
||||||
|
defer close(errC)
|
||||||
|
pumpData := make(chan []byte)
|
||||||
|
defer close(pumpData)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
if _, msg, err := ws.ReadMessage(); err != nil {
|
||||||
|
logger.Error(fmt.Sprintf("ReadMessage: '%s'", err.Error()))
|
||||||
|
if strings.HasSuffix(err.Error(), "EOF") {
|
||||||
|
errC <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
pumpData <- msg
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case err := <-errC:
|
||||||
|
_ = ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
|
||||||
|
logger.Error("Pump died")
|
||||||
|
return err
|
||||||
|
case <-ctx.Done():
|
||||||
|
_ = ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
|
||||||
|
return ctx.Err()
|
||||||
|
case r := <-e.obsvReqC:
|
||||||
|
if vaa.ChainID(r.ChainId) != vaa.ChainIDSui {
|
||||||
|
panic("invalid chain ID")
|
||||||
|
}
|
||||||
|
|
||||||
|
id := base64.StdEncoding.EncodeToString(r.TxHash)
|
||||||
|
|
||||||
|
logger.Info("obsv request", zap.String("TxHash", string(id)))
|
||||||
|
|
||||||
|
buf := fmt.Sprintf(`{"jsonrpc":"2.0", "id": 1, "method": "sui_getEvents", "params": [{"Transaction": "%s"}, null, 10, true]}`, id)
|
||||||
|
|
||||||
|
resp, err := http.Post(e.suiRPC, "application/json", strings.NewReader(buf))
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(e.suiRPC, zap.Error(err))
|
||||||
|
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
body, err := io.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(e.suiRPC, zap.Error(err))
|
||||||
|
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1)
|
||||||
|
continue
|
||||||
|
|
||||||
|
}
|
||||||
|
resp.Body.Close()
|
||||||
|
|
||||||
|
logger.Info("receive", zap.String("body", string(body)))
|
||||||
|
|
||||||
|
var res SuiTxnQuery
|
||||||
|
err = json.Unmarshal(body, &res)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(e.suiRPC, zap.Error(err))
|
||||||
|
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1)
|
||||||
|
continue
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, chunk := range res.Result.Data {
|
||||||
|
err := e.inspectBody(logger, chunk)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(e.suiRPC, zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
case msg := <-pumpData:
|
||||||
|
logger.Info("receive", zap.String("body", string(msg)))
|
||||||
|
|
||||||
|
var res SuiEventMsg
|
||||||
|
err = json.Unmarshal(msg, &res)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error("Unmarshal", zap.String("body", string(msg)), zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if res.Error != nil {
|
||||||
|
_ = ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
|
||||||
|
return errors.New((*res.Error).Message)
|
||||||
|
}
|
||||||
|
if res.ID != nil {
|
||||||
|
if *res.ID == e.subId {
|
||||||
|
logger.Info("Subscribed set to true")
|
||||||
|
e.subscribed = true
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if res.Params != nil && (*res.Params).Result != nil {
|
||||||
|
err := e.inspectBody(logger, *(*res.Params).Result)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(fmt.Sprintf("inspectBody: %s", err.Error()))
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
case <-timer.C:
|
||||||
|
resp, err := http.Post(e.suiRPC, "application/json", strings.NewReader(`{"jsonrpc":"2.0", "id": 1, "method": "sui_getCommitteeInfo", "params": []}`))
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(fmt.Sprintf("sui_getCommitteeInfo: %s", err.Error()))
|
||||||
|
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1)
|
||||||
|
break
|
||||||
|
|
||||||
|
}
|
||||||
|
body, err := io.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
logger.Error(fmt.Sprintf("sui_getCommitteeInfo: %s", err.Error()))
|
||||||
|
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1)
|
||||||
|
break
|
||||||
|
|
||||||
|
}
|
||||||
|
resp.Body.Close()
|
||||||
|
if !gjson.Valid(string(body)) {
|
||||||
|
logger.Error("sui_getCommitteeInfo: " + string(body))
|
||||||
|
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
epoch := gjson.ParseBytes(body).Get("result.epoch")
|
||||||
|
if !epoch.Exists() {
|
||||||
|
logger.Error("sui_getCommitteeInfo: " + string(body))
|
||||||
|
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
// Epoch is currently not ticking in 0.15.0. They also
|
||||||
|
// might release another API that gives a
|
||||||
|
// proper block height as we traditionally
|
||||||
|
// understand it...
|
||||||
|
currentSuiHeight.Set(float64(epoch.Uint()))
|
||||||
|
p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSui, &gossipv1.Heartbeat_Network{
|
||||||
|
Height: int64(epoch.Uint()),
|
||||||
|
ContractAddress: e.suiAccount,
|
||||||
|
})
|
||||||
|
|
||||||
|
if e.subscribed {
|
||||||
|
readiness.SetReady(common.ReadinessSuiSyncing)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue