diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index f47cd7c01..ca737d4d0 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -1025,7 +1025,7 @@ func runNode(cmd *cobra.Command, args []string) { } if *nearRPC != "" { if err := supervisor.Run(ctx, "nearwatch", - near.NewWatcher(*nearRPC, *nearContract, lockC, chainObsvReqC[vaa.ChainIDNear]).Run); err != nil { + near.NewWatcher(*nearRPC, *nearContract, lockC, chainObsvReqC[vaa.ChainIDNear], !(*unsafeDevMode || *testnetMode)).Run); err != nil { return err } } diff --git a/node/pkg/near/watcher.go b/node/pkg/near/watcher.go index ab60ee839..55acd02f8 100644 --- a/node/pkg/near/watcher.go +++ b/node/pkg/near/watcher.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/hex" + "errors" "fmt" "io/ioutil" "net/http" @@ -28,6 +29,8 @@ import ( type ( // Watcher is responsible for looking over Near blockchain and reporting new transactions to the wormhole contract Watcher struct { + mainnet bool + nearRPC string wormholeContract string @@ -47,7 +50,6 @@ type ( pendingMessage struct { height uint64 - ts uint64 } ) @@ -70,6 +72,7 @@ func NewWatcher( wormholeContract string, lockEvents chan *common.MessagePublication, obsvReqC chan *gossipv1.ObservationRequest, + mainnet bool, ) *Watcher { return &Watcher{ nearRPC: nearRPC, @@ -79,6 +82,7 @@ func NewWatcher( next_round: 0, final_round: 0, pending: map[pendingKey]*pendingMessage{}, + mainnet: mainnet, } } @@ -94,6 +98,22 @@ func (e *Watcher) getBlock(block uint64) ([]byte, error) { return ioutil.ReadAll(resp.Body) } +func (e *Watcher) getBlockHash(block_id string) ([]byte, error) { + s := fmt.Sprintf(`{"id": "dontcare", "jsonrpc": "2.0", "method": "block", "params": {"block_id": "%s"}}`, block_id) + resp, err := http.Post(e.nearRPC, "application/json", bytes.NewBuffer([]byte(s))) + + if err != nil { + // TODO: We should look at the specifics of the error before we try twice + resp, err = http.Post(e.nearRPC, "application/json", bytes.NewBuffer([]byte(s))) + if err != nil { + return nil, err + } + } + + defer resp.Body.Close() + return ioutil.ReadAll(resp.Body) +} + func (e *Watcher) getFinalBlock() ([]byte, error) { s := `{"id": "dontcare", "jsonrpc": "2.0", "method": "block", "params": {"finality": "final"}}` resp, err := http.Post(e.nearRPC, "application/json", bytes.NewBuffer([]byte(s))) @@ -132,7 +152,7 @@ func (e *Watcher) getTxStatus(logger *zap.Logger, tx string, src string) ([]byte return ioutil.ReadAll(resp.Body) } -func (e *Watcher) parseStatus(logger *zap.Logger, t []byte, hash string, ts uint64) error { +func (e *Watcher) parseStatus(logger *zap.Logger, t []byte, hash string) error { outcomes := gjson.ParseBytes(t).Get("result.receipts_outcome") if !outcomes.Exists() { @@ -155,6 +175,11 @@ func (e *Watcher) parseStatus(logger *zap.Logger, t []byte, hash string, ts uint if !l.Exists() { continue } + block_hash := o.Get("block_hash") + if !block_hash.Exists() { + logger.Error("block_hash key not found") + continue + } for _, log := range l.Array() { event := log.String() if !strings.HasPrefix(event, "EVENT_JSON:") { @@ -204,6 +229,29 @@ func (e *Watcher) parseStatus(logger *zap.Logger, t []byte, hash string, ts uint return err } + block_hash_str := block_hash.String() + + txBlock, err := e.getBlockHash(block_hash_str) + if err != nil { + return err + } + body := gjson.ParseBytes(txBlock) + if !body.Exists() { + return errors.New("block parse error") + } + ts_nanosec := body.Get("result.header.timestamp") + if !ts_nanosec.Exists() { + return errors.New("block parse error, missing timestamp") + } + ts := uint64(ts_nanosec.Uint()) / 1000000000 + + if e.mainnet { + height := body.Get("result.header.height") + if height.Exists() && height.Uint() < 74473147 { + return errors.New("test missing observe") + } + } + observation := &common.MessagePublication{ TxHash: txHash, Timestamp: time.Unix(int64(ts), 0), @@ -236,17 +284,17 @@ func (e *Watcher) parseStatus(logger *zap.Logger, t []byte, hash string, ts uint return nil } -func (e *Watcher) inspectStatus(logger *zap.Logger, hash string, receiver_id string, ts uint64) error { +func (e *Watcher) inspectStatus(logger *zap.Logger, hash string, receiver_id string) error { t, err := e.getTxStatus(logger, hash, receiver_id) if err != nil { return err } - return e.parseStatus(logger, t, hash, ts) + return e.parseStatus(logger, t, hash) } -func (e *Watcher) lastBlock(logger *zap.Logger, hash string, receiver_id string, ts uint64) ([]byte, uint64, error) { +func (e *Watcher) lastBlock(logger *zap.Logger, hash string, receiver_id string) ([]byte, uint64, error) { t, err := e.getTxStatus(logger, hash, receiver_id) if err != nil { @@ -320,12 +368,6 @@ func (e *Watcher) inspectBody(logger *zap.Logger, block uint64, body gjson.Resul return nil } - v := body.Get("result.header.timestamp") - if !v.Exists() { - return nil - } - ts := uint64(v.Uint()) / 1000000000 - for _, name := range result.Array() { chunk, err := e.getChunk(name.String()) if err != nil { @@ -343,7 +385,7 @@ func (e *Watcher) inspectBody(logger *zap.Logger, block uint64, body gjson.Resul continue } - t, round, err := e.lastBlock(logger, hash.String(), receiver_id.String(), ts) + t, round, err := e.lastBlock(logger, hash.String(), receiver_id.String()) if err != nil { return err } @@ -351,7 +393,7 @@ func (e *Watcher) inspectBody(logger *zap.Logger, block uint64, body gjson.Resul if round <= e.final_round { logger.Info("parseStatus direct", zap.Uint64("block.height", round), zap.Uint64("e.final_round", e.final_round)) - err := e.parseStatus(logger, t, hash.String(), ts) + err := e.parseStatus(logger, t, hash.String()) if err != nil { return err } @@ -367,7 +409,6 @@ func (e *Watcher) inspectBody(logger *zap.Logger, block uint64, body gjson.Resul e.pendingMu.Lock() e.pending[key] = &pendingMessage{ height: round, - ts: ts, } e.pendingMu.Unlock() } @@ -402,7 +443,7 @@ func (e *Watcher) Run(ctx context.Context) error { logger.Info("Received obsv request", zap.String("tx_hash", txHash)) - err := e.inspectStatus(logger, txHash, e.wormholeContract, 0) + err := e.inspectStatus(logger, txHash, e.wormholeContract) if err != nil { logger.Error(fmt.Sprintf("near obsvReqC: %s", err.Error())) } @@ -451,7 +492,7 @@ func (e *Watcher) Run(ctx context.Context) error { zap.String("key.hash", key.hash), ) - err := e.inspectStatus(logger, key.hash, e.wormholeContract, bLock.ts) + err := e.inspectStatus(logger, key.hash, e.wormholeContract) delete(e.pending, key) if err != nil {