node: remove websockets from SUI watcher

This commit is contained in:
Paul Noel 2024-07-03 14:22:47 -05:00 committed by Paul Noel
parent 7c6e4c597c
commit 53997afc65
4 changed files with 210 additions and 212 deletions

View File

@ -1407,7 +1407,6 @@ func runNode(cmd *cobra.Command, args []string) {
NetworkID: "sui",
ChainID: vaa.ChainIDSui,
Rpc: *suiRPC,
Websocket: *suiWS,
SuiMoveEventType: *suiMoveEventType,
}
watcherConfigs = append(watcherConfigs, wc)

View File

@ -14,7 +14,6 @@ type WatcherConfig struct {
NetworkID watchers.NetworkID // human readable name
ChainID vaa.ChainID // ChainID
Rpc string
Websocket string
SuiMoveEventType string
}
@ -44,5 +43,5 @@ func (wc *WatcherConfig) Create(
) (interfaces.L1Finalizer, supervisor.Runnable, error) {
var devMode bool = (env == common.UnsafeDevNet)
return nil, NewWatcher(wc.Rpc, wc.Websocket, wc.SuiMoveEventType, devMode, msgC, obsvReqC).Run, nil
return nil, NewWatcher(wc.Rpc, wc.SuiMoveEventType, devMode, msgC, obsvReqC).Run, nil
}

View File

@ -2,21 +2,16 @@ package sui
import (
"context"
"crypto/rand"
"errors"
"fmt"
"io"
"math/big"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"encoding/json"
"nhooyr.io/websocket"
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/p2p"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
@ -36,7 +31,6 @@ type (
// Watcher is responsible for looking over Sui blockchain and reporting new transactions to the wormhole contract
Watcher struct {
suiRPC string
suiWS string
suiMoveEventType string
unsafeDevMode bool
@ -45,7 +39,25 @@ type (
obsvReqC <-chan *gossipv1.ObservationRequest
readinessSync readiness.Component
subId int64
subId int64
latestProcessedCheckpoint int64
maximumBatchSize int
descendingOrder bool
loopDelayInSecs int
}
SuiEventResponse struct {
Jsonrpc string `json:"jsonrpc"`
Result SuiEventResponseData `json:"result"`
ID int `json:"id"`
}
SuiEventResponseData struct {
Data []SuiResult `json:"data"`
NextCursor struct {
TxDigest string `json:"txDigest"`
EventSeq string `json:"eventSeq"`
} `json:"nextCursor"`
HasNextPage bool `json:"hasNextPage"`
}
FieldsData struct {
@ -101,45 +113,25 @@ type (
Result []SuiResult `json:"result"`
ID int `json:"id"`
}
// {
// "jsonrpc": "2.0",
// "result": [
// {
// "id": {
// "txDigest": "6Yff8smmPZMandj6Psjy6wgZv5Deii78o1Sbghh5sHPA",
// "eventSeq": "0"
// },
// "packageId": "0x8b04a73ab3cb1e36bee5a86fdbfa481e97d3cc7ce8b594edea1400103ff0134d",
// "transactionModule": "sender",
// "sender": "0xed867315e3f7c83ae82e6d5858b6a6cc57c291fd84f7509646ebc8162169cf96",
// "type": "0x7483d0db53a140eed72bd6cb133daa59c539844f4c053924b9e3f0d2d7ba146d::publish_message::WormholeMessage",
// "parsedJson": {
// "consistency_level": 0,
// "nonce": 0,
// "payload": [104, 101, 108, 108, 111],
// "sender": "0x71c2aa2c549bb7381e88fbeca7eeb791be0afd455c8af9184613ce5db5ddba47",
// "sequence": "0",
// "timestamp": "1681411389"
// },
// "bcs": "5ZuknLT3Xsicr2D8zyk828thPByMBfR1cPJyEHF67k16AcEotDWhrpCDCTbk6BBbpSSs3bUk3msfADzrs"
// }
// ],
// "id": 1
// }
SuiCheckpointSN struct {
Jsonrpc string `json:"jsonrpc"`
Result string `json:"result"`
ID int `json:"id"`
}
GetCheckpointResponse struct {
Jsonrpc string `json:"jsonrpc"`
Result struct {
Digest string `json:"digest"`
TimestampMs string `json:"timestampMs"`
Checkpoint string `json:"checkpoint"`
} `json:"result"`
ID int `json:"id"`
}
)
var (
suiConnectionErrors = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "wormhole_sui_connection_errors_total",
Help: "Total number of SUI connection errors",
}, []string{"reason"})
suiMessagesConfirmed = promauto.NewCounter(
prometheus.CounterOpts{
Name: "wormhole_sui_observations_confirmed_total",
@ -155,30 +147,32 @@ var (
// NewWatcher creates a new Sui appid watcher
func NewWatcher(
suiRPC string,
suiWS string,
suiMoveEventType string,
unsafeDevMode bool,
messageEvents chan<- *common.MessagePublication,
obsvReqC <-chan *gossipv1.ObservationRequest,
) *Watcher {
return &Watcher{
suiRPC: suiRPC,
suiWS: suiWS,
suiMoveEventType: suiMoveEventType,
unsafeDevMode: unsafeDevMode,
msgChan: messageEvents,
obsvReqC: obsvReqC,
readinessSync: common.MustConvertChainIdToReadinessSyncing(vaa.ChainIDSui),
subId: 0,
suiRPC: suiRPC,
suiMoveEventType: suiMoveEventType,
unsafeDevMode: unsafeDevMode,
msgChan: messageEvents,
obsvReqC: obsvReqC,
readinessSync: common.MustConvertChainIdToReadinessSyncing(vaa.ChainIDSui),
subId: 0,
latestProcessedCheckpoint: 0,
maximumBatchSize: 10,
descendingOrder: true, // Retrieve newest events first
loopDelayInSecs: 3, // SUI produces a checkpoint every ~3 seconds
}
}
func (e *Watcher) inspectBody(logger *zap.Logger, body SuiResult, isReobservation bool) error {
if body.ID.TxDigest == nil {
return errors.New("Missing TxDigest field")
return errors.New("missing TxDigest field")
}
if body.Type == nil {
return errors.New("Missing Type field")
return errors.New("missing Type field")
}
// There may be moveEvents caught without these params.
@ -224,7 +218,7 @@ func (e *Watcher) inspectBody(logger *zap.Logger, body SuiResult, isReobservatio
zap.String("log_msg_type", "tx_processing_error"),
zap.String("txHash", *body.ID.TxDigest),
)
return errors.New("Transaction hash is not 32 bytes")
return errors.New("transaction hash is not 32 bytes")
}
txHashEthFormat := eth_common.BytesToHash(txHashBytes)
@ -277,62 +271,20 @@ func (e *Watcher) Run(ctx context.Context) error {
logger := supervisor.Logger(ctx)
// guardiand v2.16.0 shipped hardcoding "ws://" for the websocket url. This makes
// the flag value the same as all of the other uses of rpc websocket values.
err := e.fixSuiWsURL(logger)
// Get the latest checkpoint sequence number. This will be the starting point for the watcher.
latest, err := e.getLatestCheckpointSN(logger)
if err != nil {
return err
return fmt.Errorf("failed to get latest checkpoint sequence number: %w", err)
}
e.latestProcessedCheckpoint = latest
logger.Info("Starting watcher",
zap.String("watcher_name", "sui"),
zap.String("suiRPC", e.suiRPC),
zap.String("suiWS", e.suiWS),
zap.String("suiMoveEventType", e.suiMoveEventType),
zap.Bool("unsafeDevMode", e.unsafeDevMode),
)
ws, _, err := websocket.Dial(ctx, e.suiWS, nil)
if err != nil {
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1)
suiConnectionErrors.WithLabelValues("websocket_dial_error").Inc()
return fmt.Errorf("websocket dial failed: %w", err)
}
defer ws.Close(websocket.StatusNormalClosure, "")
nBig, _ := rand.Int(rand.Reader, big.NewInt(27))
e.subId = nBig.Int64()
subscription := fmt.Sprintf(`{"jsonrpc":"2.0", "id": %d, "method": "suix_subscribeEvent", "params": [{"MoveEventType": "%s"}]}`, e.subId, e.suiMoveEventType)
logger.Debug("Subscribing using", zap.String("json:", subscription))
err = ws.Write(ctx, websocket.MessageText, []byte(subscription))
if err != nil {
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1)
suiConnectionErrors.WithLabelValues("websocket_subscription_error").Inc()
return fmt.Errorf("websocket subscription failed: %w", err)
}
// Wait for the success response
mType, p, err := ws.Read(ctx)
if err != nil {
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1)
suiConnectionErrors.WithLabelValues("event_subscription_error").Inc()
return fmt.Errorf("event subscription failed: %w", err)
}
var subRes map[string]any
err = json.Unmarshal(p, &subRes)
if err != nil {
return fmt.Errorf("failed to Unmarshal the subscription result: %w", err)
}
logger.Debug("Unmarshalled json", zap.Any("subRes", subRes))
actualResult := subRes["result"]
logger.Debug("actualResult", zap.Any("res", actualResult))
if actualResult == nil {
return fmt.Errorf("Failed to request filter in subscription request")
}
logger.Debug("subscribed to new transaction events", zap.Int("messageType", int(mType)), zap.String("bytes", string(p)))
timer := time.NewTicker(time.Second * 5)
defer timer.Stop()
@ -351,36 +303,31 @@ func (e *Watcher) Run(ctx context.Context) error {
return ctx.Err()
default:
_, msg, err := ws.Read(ctx)
fmt.Println("Getting events...", time.Now().Format(time.RFC3339))
// This will return an array of events in the correct range and order
event, err := e.getEvents()
if err != nil {
logger.Error(fmt.Sprintf("ReadMessage: '%s'", err.Error()))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1)
suiConnectionErrors.WithLabelValues("channel_read_error").Inc()
return err
}
var res SuiEventMsg
err = json.Unmarshal(msg, &res)
if err != nil {
logger.Error("Failed to unmarshal SuiEventMsg", zap.String("body", string(msg)), zap.Error(err))
return fmt.Errorf("Failed to unmarshal SuiEventMsg, body: %s, error: %w", string(msg), err)
}
if res.Error != nil {
return fmt.Errorf("Bad SuiEventMsg, body: %s, error: %w", string(msg), err)
}
logger.Debug("SUI result message", zap.String("message", string(msg)), zap.Any("event", res))
if res.ID != nil {
logger.Error("Found an unexpected res.ID")
logger.Error(fmt.Sprintf("sui_data_pump Error: %s", err.Error()))
continue
}
if res.Params != nil && (*res.Params).Result != nil {
err := e.inspectBody(logger, *(*res.Params).Result, false)
for _, datum := range event.Data {
err = e.inspectBody(logger, datum, false)
if err != nil {
logger.Error(fmt.Sprintf("inspectBody: %s", err.Error()))
logger.Error(fmt.Sprintf("inspectBody Error: %s", err.Error()))
continue
}
// Get the checkpoint for the event
fmt.Println("Getting checkpoint for returned event")
lph, err := e.getCheckpointForEvent(datum)
if err != nil {
fmt.Println("Error:", err)
continue
}
if lph > e.latestProcessedCheckpoint {
e.latestProcessedCheckpoint = lph
}
continue
}
time.Sleep(time.Duration(e.loopDelayInSecs) * time.Second)
}
}
})
@ -503,25 +450,154 @@ func (e *Watcher) Run(ctx context.Context) error {
select {
case <-ctx.Done():
_ = ws.Close(websocket.StatusNormalClosure, "")
return ctx.Err()
case err := <-errC:
_ = ws.Close(websocket.StatusInternalError, err.Error())
return err
}
}
// fixSuiWsURL ensures the websocket scheme is properly specified
func (e *Watcher) fixSuiWsURL(logger *zap.Logger) error {
u, _ := url.Parse(e.suiWS)
// When the scheme is empty/nil or when the Host is empty but a scheme is set
if u == nil || u.Scheme == "" || (u.Scheme != "" && u.Host == "") {
logger.Warn(fmt.Sprintf("DEPRECATED: Prefix --suiWS address with the url scheme e.g.: ws://%s or wss://%s", e.suiWS, e.suiWS))
u = &url.URL{Scheme: "ws", Host: e.suiWS}
} else if u.Scheme != "ws" && u.Scheme != "wss" {
return fmt.Errorf("invalid url scheme specified for --suiWS, try ws:// or wss://: %s", e.suiWS)
func (w *Watcher) getEvents() (SuiEventResponseData, error) {
// Only get events newer than the last processed height
var retVal SuiEventResponseData
var nextCursor struct {
TxDigest string
EventSeq string
}
e.suiWS = u.String()
return nil
firstTime := true
done := false
for !done {
var reader io.Reader
if firstTime {
reader = strings.NewReader(
fmt.Sprintf(`{"jsonrpc":"2.0", "id": 1, "method": "suix_queryEvents", "params": [{ "MoveEventType": "%s" }, null, %d, %t]}`,
w.suiMoveEventType, w.maximumBatchSize, w.descendingOrder))
} else {
reader = strings.NewReader(
fmt.Sprintf(`{"jsonrpc":"2.0", "id": 1, "method": "suix_queryEvents", "params": [{ "MoveEventType": "%s" }, { "txDigest": "%s", "eventSeq": "%s" }, %d, %t]}`,
w.suiMoveEventType, nextCursor.TxDigest, nextCursor.EventSeq, w.maximumBatchSize, w.descendingOrder))
}
resp, err := http.Post(w.suiRPC, "application/json", reader) //nolint:noctx // TODO FIXME we should propagate context with Deadline here.
if err != nil {
return retVal, fmt.Errorf("suix_queryEvents failed to post: %w", err)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return retVal, fmt.Errorf("suix_queryEvents failed to read: %w", err)
}
resp.Body.Close()
var res SuiEventResponse
err = json.Unmarshal(body, &res)
if err != nil {
return retVal, fmt.Errorf("suix_queryEvents failed to unmarshal body: %s, error: %w", string(body), err)
}
fmt.Println("Number of events:", len(res.Result.Data))
for _, datum := range res.Result.Data {
// Check if the event is newer than the last processed height
height, hErr := w.getCheckpointForEvent(datum)
if hErr != nil {
fmt.Println("Error getting checkpoint for event:", hErr)
return retVal, hErr
}
fmt.Println("Comparing Height:", height, "LastProcessedHeight:", w.latestProcessedCheckpoint)
if height <= w.latestProcessedCheckpoint {
done = true
fmt.Println("Done processing events.")
break
} else {
fmt.Println("Adding event to retVal")
retVal.Data = append(retVal.Data, datum)
}
}
if (!done) && res.Result.HasNextPage {
fmt.Println("Getting next page...")
nextCursor.TxDigest = res.Result.NextCursor.TxDigest
nextCursor.EventSeq = res.Result.NextCursor.EventSeq
} else {
done = true
}
}
retVal.Data = reverseArray(retVal.Data)
return retVal, nil
}
func (e *Watcher) getLatestCheckpointSN(logger *zap.Logger) (int64, error) {
resp, err := http.Post(e.suiRPC, "application/json", strings.NewReader(`{"jsonrpc":"2.0", "id": 1, "method": "sui_getLatestCheckpointSequenceNumber", "params": []}`)) //nolint:noctx // TODO FIXME we should propagate context with Deadline here.
if err != nil {
logger.Error("sui_getLatestCheckpointSequenceNumber failed", zap.Error(err))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1)
return 0, fmt.Errorf("sui_getLatestCheckpointSequenceNumber failed to post: %w", err)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
logger.Error("sui_getLatestCheckpointSequenceNumber failed", zap.Error(err))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1)
return 0, fmt.Errorf("sui_getLatestCheckpointSequenceNumber failed to read: %w", err)
}
resp.Body.Close()
logger.Debug("Body before unmarshalling", zap.String("body", string(body)))
var res SuiCheckpointSN
err = json.Unmarshal(body, &res)
if err != nil {
logger.Error("unmarshal failed into uint64", zap.String("body", string(body)), zap.Error(err))
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDSui, 1)
return 0, fmt.Errorf("sui_getLatestCheckpointSequenceNumber failed to unmarshal body: %s, error: %w", string(body), err)
}
height, pErr := strconv.ParseInt(res.Result, 0, 64)
if pErr != nil {
logger.Error("Failed to ParseInt")
} else {
currentSuiHeight.Set(float64(height))
logger.Debug("sui_getLatestCheckpointSequenceNumber", zap.String("result", res.Result))
p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDSui, &gossipv1.Heartbeat_Network{
Height: height,
ContractAddress: e.suiMoveEventType,
})
}
return height, nil
}
func (e *Watcher) getCheckpointForEvent(event SuiResult) (int64, error) {
retVal := int64(0)
reader := strings.NewReader(fmt.Sprintf(`{"jsonrpc":"2.0", "id": 1, "method": "sui_getTransactionBlock", "params": [ "%s" ]}`,
*event.ID.TxDigest))
resp, err := http.Post(e.suiRPC, "application/json", reader) //nolint:noctx // TODO FIXME we should propagate context with Deadline here.
if err != nil {
return retVal, fmt.Errorf("sui_getTransactionBlock failed to post: %w", err)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return retVal, fmt.Errorf("sui_getTransactionBlock failed to read: %w", err)
}
resp.Body.Close()
var res GetCheckpointResponse
err = json.Unmarshal(body, &res)
if err != nil {
return retVal, fmt.Errorf("sui_getTransactionBlock failed to unmarshal body: %s, error: %w", string(body), err)
}
retVal, err = strconv.ParseInt(res.Result.Checkpoint, 10, 64)
if err != nil {
return retVal, fmt.Errorf("sui_getTransactionBlock failed to ParseInt: %w", err)
}
return retVal, nil
}
// reverseArray reverses the elements of the given slice
func reverseArray[T any](arr []T) []T {
left := 0
right := len(arr) - 1
for left < right {
// Swap the elements
arr[left], arr[right] = arr[right], arr[left]
left++
right--
}
return arr
}

View File

@ -7,84 +7,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"
)
func Test_fixSuiWsURL(t *testing.T) {
tests := []struct {
name string
value string
expected string
logMessage string
errMessage string
}{
{
name: "valid",
value: "ws://1.2.3.4:5678",
expected: "ws://1.2.3.4:5678",
},
{
name: "tilt",
value: "sui:9000",
expected: "ws://sui:9000",
logMessage: "DEPRECATED: Prefix --suiWS address with the url scheme e.g.: ws://sui:9000 or wss://sui:9000",
},
{
name: "valid-no-port",
value: "ws://1.2.3.4",
expected: "ws://1.2.3.4",
},
{
name: "no-scheme",
value: "1.2.3.4:5678",
expected: "ws://1.2.3.4:5678",
logMessage: "DEPRECATED: Prefix --suiWS address with the url scheme e.g.: ws://1.2.3.4:5678 or wss://1.2.3.4:5678",
},
{
name: "no-scheme-no-port",
value: "1.2.3.4",
expected: "ws://1.2.3.4",
logMessage: "DEPRECATED: Prefix --suiWS address with the url scheme e.g.: ws://1.2.3.4 or wss://1.2.3.4",
},
{
name: "wrong-scheme",
value: "http://1.2.3.4",
errMessage: "invalid url scheme specified for --suiWS, try ws:// or wss://: http://1.2.3.4",
},
}
for _, testCase := range tests {
t.Run(testCase.name, func(t *testing.T) {
testCore, logs := observer.New(zap.InfoLevel)
testLogger := zap.New(testCore)
suiWatcher := Watcher{
suiWS: testCase.value,
}
err := suiWatcher.fixSuiWsURL(testLogger)
if testCase.errMessage != "" {
require.EqualError(t, err, testCase.errMessage)
} else {
require.NoError(t, err)
// Only verify the value if no error was returned
assert.Equal(t, testCase.expected, suiWatcher.suiWS)
}
if len(testCase.logMessage) != 0 {
// If the testcase expects a log, then there should only be 1 log
require.Equal(t, 1, logs.Len())
// Ensure the log message is correct
actualLogMessage := logs.All()[0].Message
require.Equal(t, testCase.logMessage, actualLogMessage)
} else {
// If the testcase does not expect a log, none should be emitted
require.Equal(t, 0, logs.Len())
}
})
}
}
func Test_JSONParseOneWHMSg(t *testing.T) {
// JSON with only the first result (contains all of the fields in `FieldsData` - parses successfully)
msg := []byte("{\"jsonrpc\":\"2.0\",\"result\":[{\"id\":{\"txDigest\":\"2Z4A1ND5JL8c5ma9WMzFXUvpVqnwoQdYuaX4RwnLyMXU\",\"eventSeq\":\"0\"},\"packageId\":\"0x826915f8ca6d11597dfe6599b8aa02a4c08bd8d39674855254a06ee83fe7220e\",\"transactionModule\":\"lending_portal_v2\",\"sender\":\"0xccce7bbffaf1b9e9e8ca88a68a08fec11f568a697023f475f99efb7bcee951cf\",\"type\":\"0x5306f64e312b581766351c07af79c72fcb1cd25147157fdc2f8ad76de9a3fb6a::publish_message::WormholeMessage\",\"parsedJson\":{\"consistency_level\":0,\"nonce\":0,\"payload\":[0,1,0,34,0,0,204,206,123,191,250,241,185,233,232,202,136,166,138,8,254,193,31,86,138,105,112,35,244,117,249,158,251,123,206,233,81,207,2,0,133,0,0,0,0,0,0,0,0,10,202,0,0,0,10,122,53,130,0,0,76,0,0,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,50,58,58,115,117,105,58,58,83,85,73,0,34,0,0,204,206,123,191,250,241,185,233,232,202,136,166,138,8,254,193,31,86,138,105,112,35,244,117,249,158,251,123,206,233,81,207,2],\"sender\":\"0xdd1ca0bd0b9e449ff55259e5bcf7e0fc1b8b7ab49aabad218681ccce7b202bd6\",\"sequence\":\"2768\",\"timestamp\":\"1693091880\"},\"bcs\":\"J8cfJrtMWT2kg6uBgWQmd8T9k9cSibQg65ufpgxugVM2ghgC8bb1vvqoXmETiMvfb9DJLEDKy2pnvAYivyWJfz8zKSn5u7EfDbMntpszG7D4gsNNu9cU2rMUi4aF7DXnv6QAp5hoaHvJymehRwXkncHfjZ7zKsZ8cUtSKJh6S6YjHMRZ67s1PPwGEVwUdQt5S3WhQdag3tuySe8FDrUWgJfbBawyUKLdbNcR1aXFtBiPu6jQ51BF7sv13x9hp2nbs5EUMYjnN1ykK4YQaKx55eY7TQcxVCRzPrEARSkMjB8VgqefLNpwiCRdq\"}],\"id\":1}")