From 3f965da33b85bdbae1e250958db7b077377fd3cd Mon Sep 17 00:00:00 2001 From: jumpsiegel <83408952+jumpsiegel@users.noreply.github.com> Date: Fri, 5 Aug 2022 12:49:16 -0500 Subject: [PATCH] node: add near support (#1397) * node: add near support * Tweaks suggested in zoom review Co-authored-by: Bruce Riley --- node/cmd/guardiand/node.go | 35 +++- node/pkg/common/readiness.go | 1 + node/pkg/near/watcher.go | 381 +++++++++++++++++++++++++++++++++++ node/pkg/vaa/structs.go | 6 + node/pkg/vaa/structs_test.go | 3 + 5 files changed, 425 insertions(+), 1 deletion(-) create mode 100644 node/pkg/near/watcher.go diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index d80931705..91f941e16 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -45,6 +45,7 @@ import ( cosmwasm "github.com/certusone/wormhole/node/pkg/terra" "github.com/certusone/wormhole/node/pkg/algorand" + "github.com/certusone/wormhole/node/pkg/near" ipfslog "github.com/ipfs/go-log/v2" ) @@ -125,6 +126,9 @@ var ( algorandAlgodToken *string algorandAppID *uint64 + nearRPC *string + nearContract *string + solanaWsRPC *string solanaRPC *string @@ -239,6 +243,9 @@ func init() { algorandAlgodToken = NodeCmd.Flags().String("algorandAlgodToken", "", "Algorand Algod access token") algorandAppID = NodeCmd.Flags().Uint64("algorandAppID", 0, "Algorand app id") + nearRPC = NodeCmd.Flags().String("nearRPC", "", "near RPC URL") + nearContract = NodeCmd.Flags().String("nearContract", "", "near contract") + solanaWsRPC = NodeCmd.Flags().String("solanaWS", "", "Solana Websocket URL (required") solanaRPC = NodeCmd.Flags().String("solanaRPC", "", "Solana RPC URL (required") @@ -378,6 +385,9 @@ func runNode(cmd *cobra.Command, args []string) { if *algorandIndexerRPC != "" { readiness.RegisterComponent(common.ReadinessAlgorandSyncing) } + if *nearRPC != "" { + readiness.RegisterComponent(common.ReadinessNearSyncing) + } readiness.RegisterComponent(common.ReadinessBSCSyncing) readiness.RegisterComponent(common.ReadinessPolygonSyncing) readiness.RegisterComponent(common.ReadinessAvalancheSyncing) @@ -522,6 +532,15 @@ func runNode(cmd *cobra.Command, args []string) { if *celoContract == "" && !*unsafeDevMode { logger.Fatal("Please specify --celoContract") } + if *testnetMode || *unsafeDevMode { + if *nearRPC != "" { + if *nearContract == "" { + logger.Fatal("If --nearRPC is specified, then --nearContract must be specified") + } + } else if *nearContract != "" { + logger.Fatal("If --nearContract is specified, then --nearRPC must be specified") + } + } if *testnetMode { if *ethRopstenRPC == "" { logger.Fatal("Please specify --ethRopstenRPC") @@ -578,6 +597,12 @@ func runNode(cmd *cobra.Command, args []string) { if *injectiveContract != "" && !*unsafeDevMode { logger.Fatal("Please do not specify --injectiveContract") } + if *nearRPC != "" && !*unsafeDevMode { + logger.Fatal("Please do not specify --nearRPC") + } + if *nearContract != "" && !*unsafeDevMode { + logger.Fatal("Please do not specify --nearContract") + } } if *nodeName == "" { logger.Fatal("Please specify --nodeName") @@ -643,7 +668,6 @@ func runNode(cmd *cobra.Command, args []string) { logger.Fatal("Please specify --pythnetUrl") } } - } if *bigTablePersistenceEnabled { @@ -795,6 +819,9 @@ func runNode(cmd *cobra.Command, args []string) { if *testnetMode || *unsafeDevMode { chainObsvReqC[vaa.ChainIDAlgorand] = make(chan *gossipv1.ObservationRequest) } + if *nearRPC != "" { + chainObsvReqC[vaa.ChainIDNear] = make(chan *gossipv1.ObservationRequest) + } chainObsvReqC[vaa.ChainIDAurora] = make(chan *gossipv1.ObservationRequest) chainObsvReqC[vaa.ChainIDFantom] = make(chan *gossipv1.ObservationRequest) chainObsvReqC[vaa.ChainIDKarura] = make(chan *gossipv1.ObservationRequest) @@ -1033,6 +1060,12 @@ func runNode(cmd *cobra.Command, args []string) { return err } } + if *nearRPC != "" { + if err := supervisor.Run(ctx, "nearwatch", + near.NewWatcher(*nearRPC, *nearContract, lockC, chainObsvReqC[vaa.ChainIDNear]).Run); err != nil { + return err + } + } if *solanaWsRPC != "" { if err := supervisor.Run(ctx, "solwatch-confirmed", diff --git a/node/pkg/common/readiness.go b/node/pkg/common/readiness.go index 00d201343..c50c87b1d 100644 --- a/node/pkg/common/readiness.go +++ b/node/pkg/common/readiness.go @@ -7,6 +7,7 @@ const ( ReadinessSolanaSyncing readiness.Component = "solanaSyncing" ReadinessTerraSyncing readiness.Component = "terraSyncing" ReadinessAlgorandSyncing readiness.Component = "algorandSyncing" + ReadinessNearSyncing readiness.Component = "nearSyncing" ReadinessBSCSyncing readiness.Component = "bscSyncing" ReadinessPolygonSyncing readiness.Component = "polygonSyncing" ReadinessEthRopstenSyncing readiness.Component = "ethRopstenSyncing" diff --git a/node/pkg/near/watcher.go b/node/pkg/near/watcher.go new file mode 100644 index 000000000..a8e2a76b5 --- /dev/null +++ b/node/pkg/near/watcher.go @@ -0,0 +1,381 @@ +package near + +import ( + "bytes" + "context" + "encoding/hex" + "fmt" + "io/ioutil" + "net/http" + "strings" + "time" + + "github.com/certusone/wormhole/node/pkg/common" + "github.com/certusone/wormhole/node/pkg/p2p" + gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1" + "github.com/certusone/wormhole/node/pkg/readiness" + "github.com/certusone/wormhole/node/pkg/supervisor" + "github.com/certusone/wormhole/node/pkg/vaa" + eth_common "github.com/ethereum/go-ethereum/common" + "github.com/mr-tron/base58" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/tidwall/gjson" + "go.uber.org/zap" +) + +type ( + // Watcher is responsible for looking over Near blockchain and reporting new transactions to the wormhole contract + Watcher struct { + nearRPC string + wormholeContract string + + msgChan chan *common.MessagePublication + obsvReqC chan *gossipv1.ObservationRequest + + next_round uint64 + } +) + +var ( + nearMessagesConfirmed = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "wormhole_near_observations_confirmed_total", + Help: "Total number of verified Near observations found", + }) + currentNearHeight = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "wormhole_near_current_height", + Help: "Current Near block height", + }) +) + +// NewWatcher creates a new Near appid watcher +func NewWatcher( + nearRPC string, + wormholeContract string, + lockEvents chan *common.MessagePublication, + obsvReqC chan *gossipv1.ObservationRequest, +) *Watcher { + return &Watcher{ + nearRPC: nearRPC, + wormholeContract: wormholeContract, + msgChan: lockEvents, + obsvReqC: obsvReqC, + next_round: 0, + } +} + +func (e *Watcher) getBlock(block uint64) ([]byte, error) { + s := fmt.Sprintf(`{"id": "dontcare", "jsonrpc": "2.0", "method": "block", "params": {"block_id": %d}}`, block) + resp, err := http.Post(e.nearRPC, "application/json", bytes.NewBuffer([]byte(s))) + + if err != nil { + return nil, err + } + + defer resp.Body.Close() + return ioutil.ReadAll(resp.Body) +} + +func (e *Watcher) getFinalBlock() ([]byte, error) { + s := `{"id": "dontcare", "jsonrpc": "2.0", "method": "block", "params": {"finality": "final"}}` + resp, err := http.Post(e.nearRPC, "application/json", bytes.NewBuffer([]byte(s))) + + if err != nil { + return nil, err + } + + defer resp.Body.Close() + return ioutil.ReadAll(resp.Body) +} + +func (e *Watcher) getChunk(chunk string) ([]byte, error) { + s := fmt.Sprintf(`{"id": "dontcare", "jsonrpc": "2.0", "method": "chunk", "params": {"chunk_id": "%s"}}`, chunk) + + resp, err := http.Post(e.nearRPC, "application/json", bytes.NewBuffer([]byte(s))) + + if err != nil { + return nil, err + } + + defer resp.Body.Close() + return ioutil.ReadAll(resp.Body) +} + +func (e *Watcher) getTxStatus(logger *zap.Logger, tx string, src string) ([]byte, error) { + s := fmt.Sprintf(`{"id": "dontcare", "jsonrpc": "2.0", "method": "EXPERIMENTAL_tx_status", "params": ["%s", "%s"]}`, tx, src) + + resp, err := http.Post(e.nearRPC, "application/json", bytes.NewBuffer([]byte(s))) + + if err != nil { + return nil, err + } + + defer resp.Body.Close() + return ioutil.ReadAll(resp.Body) +} + +func (e *Watcher) inspectStatus(logger *zap.Logger, hash string, receiver_id string, ts uint64) error { + t, err := e.getTxStatus(logger, hash, receiver_id) + + if err != nil { + return err + } + + outcomes := gjson.ParseBytes(t).Get("result.receipts_outcome") + + if !outcomes.Exists() { + return nil + } + + for _, o := range outcomes.Array() { + outcome := o.Get("outcome") + if !outcome.Exists() { + continue + } + + executor_id := outcome.Get("executor_id") + if !executor_id.Exists() { + continue + } + + if executor_id.String() == e.wormholeContract { + l := outcome.Get("logs") + if !l.Exists() { + continue + } + for _, log := range l.Array() { + event := log.String() + if !strings.HasPrefix(event, "EVENT_JSON:") { + continue + } + logger.Info("event", zap.String("event", event[11:])) + + event_json := gjson.ParseBytes([]byte(event[11:])) + + standard := event_json.Get("standard") + if !standard.Exists() || standard.String() != "wormhole" { + continue + } + event_type := event_json.Get("event") + if !event_type.Exists() || event_type.String() != "publish" { + continue + } + + em := event_json.Get("emitter") + if !em.Exists() { + continue + } + + emitter, err := hex.DecodeString(em.String()) + if err != nil { + return err + } + + var a vaa.Address + copy(a[:], emitter) + + id, err := base58.Decode(hash) + if err != nil { + return err + } + + var txHash = eth_common.BytesToHash(id) // 32 bytes = d3b136a6a182a40554b2fafbc8d12a7a22737c10c81e33b33d1dcb74c532708b + + v := event_json.Get("data") + if !v.Exists() { + logger.Info("data") + return nil + } + + pl, err := hex.DecodeString(v.String()) + if err != nil { + return err + } + + observation := &common.MessagePublication{ + TxHash: txHash, + Timestamp: time.Unix(int64(ts), 0), + Nonce: uint32(event_json.Get("nonce").Uint()), // uint32 + Sequence: event_json.Get("seq").Uint(), + EmitterChain: vaa.ChainIDNear, + EmitterAddress: a, + Payload: pl, + ConsistencyLevel: 0, + } + + nearMessagesConfirmed.Inc() + + logger.Info("message observed", + zap.Uint64("ts", ts), + zap.Time("timestamp", observation.Timestamp), + zap.Uint32("nonce", observation.Nonce), + zap.Uint64("sequence", observation.Sequence), + zap.Stringer("emitter_chain", observation.EmitterChain), + zap.Stringer("emitter_address", observation.EmitterAddress), + zap.Binary("payload", observation.Payload), + zap.Uint8("consistency_level", observation.ConsistencyLevel), + ) + + e.msgChan <- observation + } + } + } + + return nil +} + +func (e *Watcher) inspectBody(logger *zap.Logger, block uint64, body gjson.Result) error { + logger.Info("inspectBody", zap.Uint64("block", block)) + + result := body.Get("result.chunks.#.chunk_hash") + if !result.Exists() { + return nil + } + + v := body.Get("result.header.timestamp") + if !v.Exists() { + return nil + } + ts := uint64(v.Uint()) / 1000000000 + + for _, name := range result.Array() { + chunk, err := e.getChunk(name.String()) + if err != nil { + return err + } + + txns := gjson.ParseBytes(chunk).Get("result.transactions") + if !txns.Exists() { + continue + } + for _, r := range txns.Array() { + hash := r.Get("hash") + receiver_id := r.Get("receiver_id") + if !hash.Exists() || !receiver_id.Exists() { + continue + } + + err = e.inspectStatus(logger, hash.String(), receiver_id.String(), ts) + if err != nil { + return err + } + } + + } + return nil +} + +func (e *Watcher) Run(ctx context.Context) error { + p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDNear, &gossipv1.Heartbeat_Network{ + ContractAddress: e.wormholeContract, + }) + + logger := supervisor.Logger(ctx) + errC := make(chan error) + + logger.Info("Near watcher connecting to RPC node ", zap.String("url", e.nearRPC)) + + go func() { + if e.next_round == 0 { + finalBody, err := e.getFinalBlock() + if err != nil { + logger.Error("StatusAfterBlock", zap.Error(err)) + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDNear, 1) + errC <- err + return + } + e.next_round = gjson.ParseBytes(finalBody).Get("result.chunks.0.height_created").Uint() + } + + timer := time.NewTicker(time.Second * 1) + defer timer.Stop() + + for { + select { + case <-ctx.Done(): + return + case r := <-e.obsvReqC: + if vaa.ChainID(r.ChainId) != vaa.ChainIDNear { + panic("invalid chain ID") + } + + txHash := base58.Encode(r.TxHash) + + logger.Info("Received obsv request", zap.String("tx_hash", txHash)) + + err := e.inspectStatus(logger, txHash, e.wormholeContract, 0) + if err != nil { + logger.Error(fmt.Sprintf("near obsvReqC: %s", err.Error())) + } + + case <-timer.C: + finalBody, err := e.getFinalBlock() + if err != nil { + logger.Error(fmt.Sprintf("nearClient.Status: %s", err.Error())) + + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDNear, 1) + errC <- err + return + } + parsedFinalBody := gjson.ParseBytes(finalBody) + lastBlock := parsedFinalBody.Get("result.chunks.0.height_created").Uint() + + logger.Info("lastBlock", zap.Uint64("lastBlock", lastBlock), zap.Uint64("next_round", e.next_round)) + + if lastBlock < e.next_round { + logger.Error("Went backwards... ") + e.next_round = lastBlock + } + + for ; e.next_round <= lastBlock; e.next_round = e.next_round + 1 { + if e.next_round == lastBlock { + err := e.inspectBody(logger, e.next_round, parsedFinalBody) + if err != nil { + logger.Error(fmt.Sprintf("inspectBody: %s", err.Error())) + + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDNear, 1) + errC <- err + return + + } + } else { + b, err := e.getBlock(e.next_round) + if err != nil { + logger.Error(fmt.Sprintf("nearClient.Status: %s", err.Error())) + + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDNear, 1) + errC <- err + return + + } + err = e.inspectBody(logger, e.next_round, gjson.ParseBytes(b)) + if err != nil { + logger.Error(fmt.Sprintf("inspectBody: %s", err.Error())) + + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDNear, 1) + errC <- err + return + + } + } + } + + currentNearHeight.Set(float64(e.next_round - 1)) + p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDNear, &gossipv1.Heartbeat_Network{ + Height: int64(e.next_round - 1), + ContractAddress: e.wormholeContract, + }) + readiness.SetReady(common.ReadinessNearSyncing) + } + } + }() + + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-errC: + return err + } +} diff --git a/node/pkg/vaa/structs.go b/node/pkg/vaa/structs.go index 0c577a61b..b2d72cfa3 100644 --- a/node/pkg/vaa/structs.go +++ b/node/pkg/vaa/structs.go @@ -114,6 +114,8 @@ func (c ChainID) String() string { return "fantom" case ChainIDAlgorand: return "algorand" + case ChainIDNear: + return "near" case ChainIDEthereumRopsten: return "ethereum-ropsten" case ChainIDKarura: @@ -163,6 +165,8 @@ func ChainIDFromString(s string) (ChainID, error) { return ChainIDFantom, nil case "algorand": return ChainIDAlgorand, nil + case "near": + return ChainIDNear, nil case "ethereum-ropsten": return ChainIDEthereumRopsten, nil case "karura": @@ -218,6 +222,8 @@ const ( ChainIDKlaytn ChainID = 13 // ChainIDCelo is the ChainID of Celo ChainIDCelo ChainID = 14 + // ChainIDNear is the ChainID of Near + ChainIDNear ChainID = 15 // ChainIDMoonbeam is the ChainID of Moonbeam ChainIDMoonbeam ChainID = 16 // ChainIDNeon is the ChainID of Neon diff --git a/node/pkg/vaa/structs_test.go b/node/pkg/vaa/structs_test.go index 1d73c4c3c..cedc06820 100644 --- a/node/pkg/vaa/structs_test.go +++ b/node/pkg/vaa/structs_test.go @@ -32,6 +32,7 @@ func TestChainIDFromString(t *testing.T) { {input: "avalanche", output: ChainIDAvalanche}, {input: "oasis", output: ChainIDOasis}, {input: "algorand", output: ChainIDAlgorand}, + {input: "near", output: ChainIDNear}, {input: "aurora", output: ChainIDAurora}, {input: "fantom", output: ChainIDFantom}, {input: "karura", output: ChainIDKarura}, @@ -52,6 +53,7 @@ func TestChainIDFromString(t *testing.T) { {input: "Avalanche", output: ChainIDAvalanche}, {input: "Oasis", output: ChainIDOasis}, {input: "Algorand", output: ChainIDAlgorand}, + {input: "Near", output: ChainIDNear}, {input: "Aurora", output: ChainIDAurora}, {input: "Fantom", output: ChainIDFantom}, {input: "Karura", output: ChainIDKarura}, @@ -148,6 +150,7 @@ func TestChainId_String(t *testing.T) { {input: 12, output: "acala"}, {input: 13, output: "klaytn"}, {input: 14, output: "celo"}, + {input: 15, output: "near"}, {input: 16, output: "moonbeam"}, {input: 17, output: "neon"}, {input: 18, output: "terra2"},