near/timestamps: timestamp simplifcation

This commit is contained in:
Josh Siegel 2022-09-19 10:54:44 -05:00 committed by jumpsiegel
parent 3fc357ebcd
commit 42779b3a5f
2 changed files with 58 additions and 17 deletions

View File

@ -1025,7 +1025,7 @@ func runNode(cmd *cobra.Command, args []string) {
}
if *nearRPC != "" {
if err := supervisor.Run(ctx, "nearwatch",
near.NewWatcher(*nearRPC, *nearContract, lockC, chainObsvReqC[vaa.ChainIDNear]).Run); err != nil {
near.NewWatcher(*nearRPC, *nearContract, lockC, chainObsvReqC[vaa.ChainIDNear], !(*unsafeDevMode || *testnetMode)).Run); err != nil {
return err
}
}

View File

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"io/ioutil"
"net/http"
@ -28,6 +29,8 @@ import (
type (
// Watcher is responsible for looking over Near blockchain and reporting new transactions to the wormhole contract
Watcher struct {
mainnet bool
nearRPC string
wormholeContract string
@ -47,7 +50,6 @@ type (
pendingMessage struct {
height uint64
ts uint64
}
)
@ -70,6 +72,7 @@ func NewWatcher(
wormholeContract string,
lockEvents chan *common.MessagePublication,
obsvReqC chan *gossipv1.ObservationRequest,
mainnet bool,
) *Watcher {
return &Watcher{
nearRPC: nearRPC,
@ -79,6 +82,7 @@ func NewWatcher(
next_round: 0,
final_round: 0,
pending: map[pendingKey]*pendingMessage{},
mainnet: mainnet,
}
}
@ -94,6 +98,22 @@ func (e *Watcher) getBlock(block uint64) ([]byte, error) {
return ioutil.ReadAll(resp.Body)
}
func (e *Watcher) getBlockHash(block_id string) ([]byte, error) {
s := fmt.Sprintf(`{"id": "dontcare", "jsonrpc": "2.0", "method": "block", "params": {"block_id": "%s"}}`, block_id)
resp, err := http.Post(e.nearRPC, "application/json", bytes.NewBuffer([]byte(s)))
if err != nil {
// TODO: We should look at the specifics of the error before we try twice
resp, err = http.Post(e.nearRPC, "application/json", bytes.NewBuffer([]byte(s)))
if err != nil {
return nil, err
}
}
defer resp.Body.Close()
return ioutil.ReadAll(resp.Body)
}
func (e *Watcher) getFinalBlock() ([]byte, error) {
s := `{"id": "dontcare", "jsonrpc": "2.0", "method": "block", "params": {"finality": "final"}}`
resp, err := http.Post(e.nearRPC, "application/json", bytes.NewBuffer([]byte(s)))
@ -132,7 +152,7 @@ func (e *Watcher) getTxStatus(logger *zap.Logger, tx string, src string) ([]byte
return ioutil.ReadAll(resp.Body)
}
func (e *Watcher) parseStatus(logger *zap.Logger, t []byte, hash string, ts uint64) error {
func (e *Watcher) parseStatus(logger *zap.Logger, t []byte, hash string) error {
outcomes := gjson.ParseBytes(t).Get("result.receipts_outcome")
if !outcomes.Exists() {
@ -155,6 +175,11 @@ func (e *Watcher) parseStatus(logger *zap.Logger, t []byte, hash string, ts uint
if !l.Exists() {
continue
}
block_hash := o.Get("block_hash")
if !block_hash.Exists() {
logger.Error("block_hash key not found")
continue
}
for _, log := range l.Array() {
event := log.String()
if !strings.HasPrefix(event, "EVENT_JSON:") {
@ -204,6 +229,29 @@ func (e *Watcher) parseStatus(logger *zap.Logger, t []byte, hash string, ts uint
return err
}
block_hash_str := block_hash.String()
txBlock, err := e.getBlockHash(block_hash_str)
if err != nil {
return err
}
body := gjson.ParseBytes(txBlock)
if !body.Exists() {
return errors.New("block parse error")
}
ts_nanosec := body.Get("result.header.timestamp")
if !ts_nanosec.Exists() {
return errors.New("block parse error, missing timestamp")
}
ts := uint64(ts_nanosec.Uint()) / 1000000000
if e.mainnet {
height := body.Get("result.header.height")
if height.Exists() && height.Uint() < 74473147 {
return errors.New("test missing observe")
}
}
observation := &common.MessagePublication{
TxHash: txHash,
Timestamp: time.Unix(int64(ts), 0),
@ -236,17 +284,17 @@ func (e *Watcher) parseStatus(logger *zap.Logger, t []byte, hash string, ts uint
return nil
}
func (e *Watcher) inspectStatus(logger *zap.Logger, hash string, receiver_id string, ts uint64) error {
func (e *Watcher) inspectStatus(logger *zap.Logger, hash string, receiver_id string) error {
t, err := e.getTxStatus(logger, hash, receiver_id)
if err != nil {
return err
}
return e.parseStatus(logger, t, hash, ts)
return e.parseStatus(logger, t, hash)
}
func (e *Watcher) lastBlock(logger *zap.Logger, hash string, receiver_id string, ts uint64) ([]byte, uint64, error) {
func (e *Watcher) lastBlock(logger *zap.Logger, hash string, receiver_id string) ([]byte, uint64, error) {
t, err := e.getTxStatus(logger, hash, receiver_id)
if err != nil {
@ -320,12 +368,6 @@ func (e *Watcher) inspectBody(logger *zap.Logger, block uint64, body gjson.Resul
return nil
}
v := body.Get("result.header.timestamp")
if !v.Exists() {
return nil
}
ts := uint64(v.Uint()) / 1000000000
for _, name := range result.Array() {
chunk, err := e.getChunk(name.String())
if err != nil {
@ -343,7 +385,7 @@ func (e *Watcher) inspectBody(logger *zap.Logger, block uint64, body gjson.Resul
continue
}
t, round, err := e.lastBlock(logger, hash.String(), receiver_id.String(), ts)
t, round, err := e.lastBlock(logger, hash.String(), receiver_id.String())
if err != nil {
return err
}
@ -351,7 +393,7 @@ func (e *Watcher) inspectBody(logger *zap.Logger, block uint64, body gjson.Resul
if round <= e.final_round {
logger.Info("parseStatus direct", zap.Uint64("block.height", round), zap.Uint64("e.final_round", e.final_round))
err := e.parseStatus(logger, t, hash.String(), ts)
err := e.parseStatus(logger, t, hash.String())
if err != nil {
return err
}
@ -367,7 +409,6 @@ func (e *Watcher) inspectBody(logger *zap.Logger, block uint64, body gjson.Resul
e.pendingMu.Lock()
e.pending[key] = &pendingMessage{
height: round,
ts: ts,
}
e.pendingMu.Unlock()
}
@ -402,7 +443,7 @@ func (e *Watcher) Run(ctx context.Context) error {
logger.Info("Received obsv request", zap.String("tx_hash", txHash))
err := e.inspectStatus(logger, txHash, e.wormholeContract, 0)
err := e.inspectStatus(logger, txHash, e.wormholeContract)
if err != nil {
logger.Error(fmt.Sprintf("near obsvReqC: %s", err.Error()))
}
@ -451,7 +492,7 @@ func (e *Watcher) Run(ctx context.Context) error {
zap.String("key.hash", key.hash),
)
err := e.inspectStatus(logger, key.hash, e.wormholeContract, bLock.ts)
err := e.inspectStatus(logger, key.hash, e.wormholeContract)
delete(e.pending, key)
if err != nil {