diff --git a/node/pkg/watchers/aptos/watcher.go b/node/pkg/watchers/aptos/watcher.go index 551fe638e..ea49b295f 100644 --- a/node/pkg/watchers/aptos/watcher.go +++ b/node/pkg/watchers/aptos/watcher.go @@ -5,7 +5,7 @@ import ( "encoding/binary" "encoding/hex" "fmt" - "io/ioutil" + "io" "net/http" "time" @@ -64,19 +64,182 @@ func NewWatcher( } } +func (e *Watcher) Run(ctx context.Context) error { + p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDAptos, &gossipv1.Heartbeat_Network{ + ContractAddress: e.aptosAccount, + }) + + logger := supervisor.Logger(ctx) + + logger.Info("Aptos watcher connecting to RPC node ", zap.String("url", e.aptosRPC)) + + // SECURITY: the API guarantees that we only get the events from the right + // contract + var eventsEndpoint = fmt.Sprintf(`%s/v1/accounts/%s/events/%s/event`, e.aptosRPC, e.aptosAccount, e.aptosHandle) + var aptosHealth = fmt.Sprintf(`%s/v1`, e.aptosRPC) + + // the events have sequence numbers associated with them in the aptos API + // (NOTE: this is not the same as the wormhole sequence id). The event + // endpoint is paginated, so we use this variable to keep track of which + // sequence number to look up next. + var nextSequence uint64 = 0 + + timer := time.NewTicker(time.Second * 1) + defer timer.Stop() + + supervisor.Signal(ctx, supervisor.SignalHealthy) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case r := <-e.obsvReqC: + if vaa.ChainID(r.ChainId) != vaa.ChainIDAptos { + panic("invalid chain ID") + } + + nativeSeq := binary.BigEndian.Uint64(r.TxHash) + + logger.Info("Received obsv request", zap.Uint64("tx_hash", nativeSeq)) + + s := fmt.Sprintf(`%s?start=%d&limit=1`, eventsEndpoint, nativeSeq) + + body, err := e.retrievePayload(s) + if err != nil { + logger.Error("retrievePayload", zap.Error(err)) + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAptos, 1) + return err + } + + if !gjson.Valid(string(body)) { + logger.Error("InvalidJson: " + string(body)) + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAptos, 1) + break + + } + + outcomes := gjson.ParseBytes(body) + + for _, chunk := range outcomes.Array() { + newSeq := chunk.Get("sequence_number") + if !newSeq.Exists() { + break + } + + if newSeq.Uint() != nativeSeq { + logger.Error("newSeq != nativeSeq") + break + + } + + data := chunk.Get("data") + if !data.Exists() { + break + } + e.observeData(logger, data, nativeSeq) + } + + case <-timer.C: + s := "" + + if nextSequence == 0 { + // if nextSequence is 0, we look up the most recent event + s = fmt.Sprintf(`%s?limit=1`, eventsEndpoint) + } else { + // otherwise just look up events starting at nextSequence. + // this will potentially return multiple events (whatever + // the default limit is per page), so we'll handle all of them. + s = fmt.Sprintf(`%s?start=%d`, eventsEndpoint, nextSequence) + } + + eventsJson, err := e.retrievePayload(s) + if err != nil { + logger.Error("retrievePayload", zap.Error(err)) + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAptos, 1) + return err + } + + // data doesn't exist yet. skip, and try again later + // this happens when the sequence id we're looking up hasn't + // been used yet. + if string(eventsJson) == "" { + continue + } + + if !gjson.Valid(string(eventsJson)) { + logger.Error("InvalidJson: " + string(eventsJson)) + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAptos, 1) + break + + } + + events := gjson.ParseBytes(eventsJson) + + // the endpoint returns an array of events, ordered by sequence + // id (ASC) + for _, event := range events.Array() { + eventSequence := event.Get("sequence_number") + if !eventSequence.Exists() { + continue + } + + // this is interesting in the last iteration, whereby we + // find the next sequence that comes after the array + nextSequence = eventSequence.Uint() + 1 + + data := event.Get("data") + if !data.Exists() { + continue + } + e.observeData(logger, data, eventSequence.Uint()) + } + + health, err := e.retrievePayload(aptosHealth) + if err != nil { + logger.Error("health", zap.Error(err)) + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAptos, 1) + return err + } + + if !gjson.Valid(string(health)) { + logger.Error("Invalid JSON in health response: " + string(health)) + p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAptos, 1) + continue + + } + + logger.Info(string(health) + string(eventsJson)) + + pHealth := gjson.ParseBytes(health) + + blockHeight := pHealth.Get("block_height") + + if blockHeight.Exists() { + currentAptosHeight.Set(float64(blockHeight.Uint())) + p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDAptos, &gossipv1.Heartbeat_Network{ + Height: int64(blockHeight.Uint()), + ContractAddress: e.aptosAccount, + }) + + readiness.SetReady(common.ReadinessAptosSyncing) + } + } + } +} + func (e *Watcher) retrievePayload(s string) ([]byte, error) { res, err := http.Get(s) // nolint if err != nil { return nil, err } - body, err := ioutil.ReadAll(res.Body) + body, err := io.ReadAll(res.Body) if err != nil { return nil, err } return body, err } -func (e *Watcher) observeData(logger *zap.Logger, data gjson.Result, native_seq uint64) { +func (e *Watcher) observeData(logger *zap.Logger, data gjson.Result, nativeSeq uint64) { em := data.Get("sender") if !em.Exists() { logger.Error("sender field missing") @@ -90,7 +253,7 @@ func (e *Watcher) observeData(logger *zap.Logger, data gjson.Result, native_seq copy(a[24:], emitter) id := make([]byte, 8) - binary.BigEndian.PutUint64(id, native_seq) + binary.BigEndian.PutUint64(id, nativeSeq) var txHash = eth_common.BytesToHash(id) // 32 bytes = d3b136a6a182a40554b2fafbc8d12a7a22737c10c81e33b33d1dcb74c532708b @@ -124,9 +287,9 @@ func (e *Watcher) observeData(logger *zap.Logger, data gjson.Result, native_seq return } - consistency_level := data.Get("consistency_level") - if !consistency_level.Exists() { - logger.Error("consistency_level field missing") + consistencyLevel := data.Get("consistency_level") + if !consistencyLevel.Exists() { + logger.Error("consistencyLevel field missing") return } @@ -138,7 +301,7 @@ func (e *Watcher) observeData(logger *zap.Logger, data gjson.Result, native_seq EmitterChain: vaa.ChainIDAptos, EmitterAddress: a, Payload: pl, - ConsistencyLevel: uint8(consistency_level.Uint()), + ConsistencyLevel: uint8(consistencyLevel.Uint()), } aptosMessagesConfirmed.Inc() @@ -151,182 +314,8 @@ func (e *Watcher) observeData(logger *zap.Logger, data gjson.Result, native_seq zap.Stringer("emitter_chain", observation.EmitterChain), zap.Stringer("emitter_address", observation.EmitterAddress), zap.Binary("payload", observation.Payload), - zap.Uint8("consistency_level", observation.ConsistencyLevel), + zap.Uint8("consistencyLevel", observation.ConsistencyLevel), ) e.msgChan <- observation } - -func (e *Watcher) Run(ctx context.Context) error { - p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDAptos, &gossipv1.Heartbeat_Network{ - ContractAddress: e.aptosAccount, - }) - - logger := supervisor.Logger(ctx) - errC := make(chan error) - - logger.Info("Aptos watcher connecting to RPC node ", zap.String("url", e.aptosRPC)) - - // SECURITY: the API guarantees that we only get the events from the right - // contract - var eventsEndpoint string = fmt.Sprintf(`%s/v1/accounts/%s/events/%s/event`, e.aptosRPC, e.aptosAccount, e.aptosHandle) - var aptosHealth string = fmt.Sprintf(`%s/v1`, e.aptosRPC) - - // the events have sequence numbers associated with them in the aptos API - // (NOTE: this is not the same as the wormhole sequence id). The event - // endpoint is paginated, so we use this variable to keep track of which - // sequence number to look up next. - var next_sequence uint64 = 0 - - go func() { - timer := time.NewTicker(time.Second * 1) - defer timer.Stop() - - for { - select { - case <-ctx.Done(): - return - case r := <-e.obsvReqC: - if vaa.ChainID(r.ChainId) != vaa.ChainIDAptos { - panic("invalid chain ID") - } - - native_seq := binary.BigEndian.Uint64(r.TxHash) - - logger.Info("Received obsv request", zap.Uint64("tx_hash", native_seq)) - - s := fmt.Sprintf(`%s?start=%d&limit=1`, eventsEndpoint, native_seq) - - body, err := e.retrievePayload(s) - if err != nil { - logger.Error("retrievePayload", zap.Error(err)) - p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAptos, 1) - errC <- err - break - } - - if !gjson.Valid(string(body)) { - logger.Error("InvalidJson: " + string(body)) - p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAptos, 1) - break - - } - - outcomes := gjson.ParseBytes(body) - - for _, chunk := range outcomes.Array() { - newSeq := chunk.Get("sequence_number") - if !newSeq.Exists() { - break - } - - if newSeq.Uint() != native_seq { - logger.Error("newSeq != native_seq") - break - - } - - data := chunk.Get("data") - if !data.Exists() { - break - } - e.observeData(logger, data, native_seq) - } - - case <-timer.C: - s := "" - - if next_sequence == 0 { - // if next_sequence is 0, we look up the most recent event - s = fmt.Sprintf(`%s?limit=1`, eventsEndpoint) - } else { - // otherwise just look up events starting at next_sequence. - // this will potentially return multiple events (whatever - // the default limit is per page), so we'll handle all of them. - s = fmt.Sprintf(`%s?start=%d`, eventsEndpoint, next_sequence) - } - - events_json, err := e.retrievePayload(s) - if err != nil { - logger.Error("retrievePayload", zap.Error(err)) - p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAptos, 1) - errC <- err - break - } - - // data doesn't exist yet. skip, and try again later - // this happens when the sequence id we're looking up hasn't - // been used yet. - if string(events_json) == "" { - continue - } - - if !gjson.Valid(string(events_json)) { - logger.Error("InvalidJson: " + string(events_json)) - p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAptos, 1) - break - - } - - events := gjson.ParseBytes(events_json) - - // the endpoint returns an array of events, ordered by sequence - // id (ASC) - for _, event := range events.Array() { - event_sequence := event.Get("sequence_number") - if !event_sequence.Exists() { - continue - } - - // this is interesting in the last iteration, whereby we - // find the next sequence that comes after the array - next_sequence = event_sequence.Uint() + 1 - - data := event.Get("data") - if !data.Exists() { - continue - } - e.observeData(logger, data, event_sequence.Uint()) - } - - health, err := e.retrievePayload(aptosHealth) - if err != nil { - logger.Error("health", zap.Error(err)) - p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAptos, 1) - errC <- err - break - } - - if !gjson.Valid(string(health)) { - logger.Error("Invalid JSON in health response: " + string(health)) - p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAptos, 1) - continue - - } - - logger.Info(string(health) + string(events_json)) - - phealth := gjson.ParseBytes(health) - - block_height := phealth.Get("block_height") - - if block_height.Exists() { - currentAptosHeight.Set(float64(block_height.Uint())) - p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDAptos, &gossipv1.Heartbeat_Network{ - Height: int64(block_height.Uint()), - ContractAddress: e.aptosAccount, - }) - - readiness.SetReady(common.ReadinessAptosSyncing) - } - } - } - }() - - select { - case <-ctx.Done(): - return ctx.Err() - case err := <-errC: - return err - } -}