node: minor refactoring of aptos watcher
Change-Id: Ideac6ef7e26d9d7272b4cda32a80cf007d62a8a5
This commit is contained in:
parent
bc6b5af095
commit
a6d6586d39
|
@ -5,7 +5,7 @@ import (
|
|||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
|
@ -64,19 +64,182 @@ func NewWatcher(
|
|||
}
|
||||
}
|
||||
|
||||
func (e *Watcher) Run(ctx context.Context) error {
|
||||
p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDAptos, &gossipv1.Heartbeat_Network{
|
||||
ContractAddress: e.aptosAccount,
|
||||
})
|
||||
|
||||
logger := supervisor.Logger(ctx)
|
||||
|
||||
logger.Info("Aptos watcher connecting to RPC node ", zap.String("url", e.aptosRPC))
|
||||
|
||||
// SECURITY: the API guarantees that we only get the events from the right
|
||||
// contract
|
||||
var eventsEndpoint = fmt.Sprintf(`%s/v1/accounts/%s/events/%s/event`, e.aptosRPC, e.aptosAccount, e.aptosHandle)
|
||||
var aptosHealth = fmt.Sprintf(`%s/v1`, e.aptosRPC)
|
||||
|
||||
// the events have sequence numbers associated with them in the aptos API
|
||||
// (NOTE: this is not the same as the wormhole sequence id). The event
|
||||
// endpoint is paginated, so we use this variable to keep track of which
|
||||
// sequence number to look up next.
|
||||
var nextSequence uint64 = 0
|
||||
|
||||
timer := time.NewTicker(time.Second * 1)
|
||||
defer timer.Stop()
|
||||
|
||||
supervisor.Signal(ctx, supervisor.SignalHealthy)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case r := <-e.obsvReqC:
|
||||
if vaa.ChainID(r.ChainId) != vaa.ChainIDAptos {
|
||||
panic("invalid chain ID")
|
||||
}
|
||||
|
||||
nativeSeq := binary.BigEndian.Uint64(r.TxHash)
|
||||
|
||||
logger.Info("Received obsv request", zap.Uint64("tx_hash", nativeSeq))
|
||||
|
||||
s := fmt.Sprintf(`%s?start=%d&limit=1`, eventsEndpoint, nativeSeq)
|
||||
|
||||
body, err := e.retrievePayload(s)
|
||||
if err != nil {
|
||||
logger.Error("retrievePayload", zap.Error(err))
|
||||
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAptos, 1)
|
||||
return err
|
||||
}
|
||||
|
||||
if !gjson.Valid(string(body)) {
|
||||
logger.Error("InvalidJson: " + string(body))
|
||||
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAptos, 1)
|
||||
break
|
||||
|
||||
}
|
||||
|
||||
outcomes := gjson.ParseBytes(body)
|
||||
|
||||
for _, chunk := range outcomes.Array() {
|
||||
newSeq := chunk.Get("sequence_number")
|
||||
if !newSeq.Exists() {
|
||||
break
|
||||
}
|
||||
|
||||
if newSeq.Uint() != nativeSeq {
|
||||
logger.Error("newSeq != nativeSeq")
|
||||
break
|
||||
|
||||
}
|
||||
|
||||
data := chunk.Get("data")
|
||||
if !data.Exists() {
|
||||
break
|
||||
}
|
||||
e.observeData(logger, data, nativeSeq)
|
||||
}
|
||||
|
||||
case <-timer.C:
|
||||
s := ""
|
||||
|
||||
if nextSequence == 0 {
|
||||
// if nextSequence is 0, we look up the most recent event
|
||||
s = fmt.Sprintf(`%s?limit=1`, eventsEndpoint)
|
||||
} else {
|
||||
// otherwise just look up events starting at nextSequence.
|
||||
// this will potentially return multiple events (whatever
|
||||
// the default limit is per page), so we'll handle all of them.
|
||||
s = fmt.Sprintf(`%s?start=%d`, eventsEndpoint, nextSequence)
|
||||
}
|
||||
|
||||
eventsJson, err := e.retrievePayload(s)
|
||||
if err != nil {
|
||||
logger.Error("retrievePayload", zap.Error(err))
|
||||
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAptos, 1)
|
||||
return err
|
||||
}
|
||||
|
||||
// data doesn't exist yet. skip, and try again later
|
||||
// this happens when the sequence id we're looking up hasn't
|
||||
// been used yet.
|
||||
if string(eventsJson) == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
if !gjson.Valid(string(eventsJson)) {
|
||||
logger.Error("InvalidJson: " + string(eventsJson))
|
||||
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAptos, 1)
|
||||
break
|
||||
|
||||
}
|
||||
|
||||
events := gjson.ParseBytes(eventsJson)
|
||||
|
||||
// the endpoint returns an array of events, ordered by sequence
|
||||
// id (ASC)
|
||||
for _, event := range events.Array() {
|
||||
eventSequence := event.Get("sequence_number")
|
||||
if !eventSequence.Exists() {
|
||||
continue
|
||||
}
|
||||
|
||||
// this is interesting in the last iteration, whereby we
|
||||
// find the next sequence that comes after the array
|
||||
nextSequence = eventSequence.Uint() + 1
|
||||
|
||||
data := event.Get("data")
|
||||
if !data.Exists() {
|
||||
continue
|
||||
}
|
||||
e.observeData(logger, data, eventSequence.Uint())
|
||||
}
|
||||
|
||||
health, err := e.retrievePayload(aptosHealth)
|
||||
if err != nil {
|
||||
logger.Error("health", zap.Error(err))
|
||||
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAptos, 1)
|
||||
return err
|
||||
}
|
||||
|
||||
if !gjson.Valid(string(health)) {
|
||||
logger.Error("Invalid JSON in health response: " + string(health))
|
||||
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAptos, 1)
|
||||
continue
|
||||
|
||||
}
|
||||
|
||||
logger.Info(string(health) + string(eventsJson))
|
||||
|
||||
pHealth := gjson.ParseBytes(health)
|
||||
|
||||
blockHeight := pHealth.Get("block_height")
|
||||
|
||||
if blockHeight.Exists() {
|
||||
currentAptosHeight.Set(float64(blockHeight.Uint()))
|
||||
p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDAptos, &gossipv1.Heartbeat_Network{
|
||||
Height: int64(blockHeight.Uint()),
|
||||
ContractAddress: e.aptosAccount,
|
||||
})
|
||||
|
||||
readiness.SetReady(common.ReadinessAptosSyncing)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Watcher) retrievePayload(s string) ([]byte, error) {
|
||||
res, err := http.Get(s) // nolint
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
body, err := ioutil.ReadAll(res.Body)
|
||||
body, err := io.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return body, err
|
||||
}
|
||||
|
||||
func (e *Watcher) observeData(logger *zap.Logger, data gjson.Result, native_seq uint64) {
|
||||
func (e *Watcher) observeData(logger *zap.Logger, data gjson.Result, nativeSeq uint64) {
|
||||
em := data.Get("sender")
|
||||
if !em.Exists() {
|
||||
logger.Error("sender field missing")
|
||||
|
@ -90,7 +253,7 @@ func (e *Watcher) observeData(logger *zap.Logger, data gjson.Result, native_seq
|
|||
copy(a[24:], emitter)
|
||||
|
||||
id := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(id, native_seq)
|
||||
binary.BigEndian.PutUint64(id, nativeSeq)
|
||||
|
||||
var txHash = eth_common.BytesToHash(id) // 32 bytes = d3b136a6a182a40554b2fafbc8d12a7a22737c10c81e33b33d1dcb74c532708b
|
||||
|
||||
|
@ -124,9 +287,9 @@ func (e *Watcher) observeData(logger *zap.Logger, data gjson.Result, native_seq
|
|||
return
|
||||
}
|
||||
|
||||
consistency_level := data.Get("consistency_level")
|
||||
if !consistency_level.Exists() {
|
||||
logger.Error("consistency_level field missing")
|
||||
consistencyLevel := data.Get("consistency_level")
|
||||
if !consistencyLevel.Exists() {
|
||||
logger.Error("consistencyLevel field missing")
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -138,7 +301,7 @@ func (e *Watcher) observeData(logger *zap.Logger, data gjson.Result, native_seq
|
|||
EmitterChain: vaa.ChainIDAptos,
|
||||
EmitterAddress: a,
|
||||
Payload: pl,
|
||||
ConsistencyLevel: uint8(consistency_level.Uint()),
|
||||
ConsistencyLevel: uint8(consistencyLevel.Uint()),
|
||||
}
|
||||
|
||||
aptosMessagesConfirmed.Inc()
|
||||
|
@ -151,182 +314,8 @@ func (e *Watcher) observeData(logger *zap.Logger, data gjson.Result, native_seq
|
|||
zap.Stringer("emitter_chain", observation.EmitterChain),
|
||||
zap.Stringer("emitter_address", observation.EmitterAddress),
|
||||
zap.Binary("payload", observation.Payload),
|
||||
zap.Uint8("consistency_level", observation.ConsistencyLevel),
|
||||
zap.Uint8("consistencyLevel", observation.ConsistencyLevel),
|
||||
)
|
||||
|
||||
e.msgChan <- observation
|
||||
}
|
||||
|
||||
func (e *Watcher) Run(ctx context.Context) error {
|
||||
p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDAptos, &gossipv1.Heartbeat_Network{
|
||||
ContractAddress: e.aptosAccount,
|
||||
})
|
||||
|
||||
logger := supervisor.Logger(ctx)
|
||||
errC := make(chan error)
|
||||
|
||||
logger.Info("Aptos watcher connecting to RPC node ", zap.String("url", e.aptosRPC))
|
||||
|
||||
// SECURITY: the API guarantees that we only get the events from the right
|
||||
// contract
|
||||
var eventsEndpoint string = fmt.Sprintf(`%s/v1/accounts/%s/events/%s/event`, e.aptosRPC, e.aptosAccount, e.aptosHandle)
|
||||
var aptosHealth string = fmt.Sprintf(`%s/v1`, e.aptosRPC)
|
||||
|
||||
// the events have sequence numbers associated with them in the aptos API
|
||||
// (NOTE: this is not the same as the wormhole sequence id). The event
|
||||
// endpoint is paginated, so we use this variable to keep track of which
|
||||
// sequence number to look up next.
|
||||
var next_sequence uint64 = 0
|
||||
|
||||
go func() {
|
||||
timer := time.NewTicker(time.Second * 1)
|
||||
defer timer.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case r := <-e.obsvReqC:
|
||||
if vaa.ChainID(r.ChainId) != vaa.ChainIDAptos {
|
||||
panic("invalid chain ID")
|
||||
}
|
||||
|
||||
native_seq := binary.BigEndian.Uint64(r.TxHash)
|
||||
|
||||
logger.Info("Received obsv request", zap.Uint64("tx_hash", native_seq))
|
||||
|
||||
s := fmt.Sprintf(`%s?start=%d&limit=1`, eventsEndpoint, native_seq)
|
||||
|
||||
body, err := e.retrievePayload(s)
|
||||
if err != nil {
|
||||
logger.Error("retrievePayload", zap.Error(err))
|
||||
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAptos, 1)
|
||||
errC <- err
|
||||
break
|
||||
}
|
||||
|
||||
if !gjson.Valid(string(body)) {
|
||||
logger.Error("InvalidJson: " + string(body))
|
||||
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAptos, 1)
|
||||
break
|
||||
|
||||
}
|
||||
|
||||
outcomes := gjson.ParseBytes(body)
|
||||
|
||||
for _, chunk := range outcomes.Array() {
|
||||
newSeq := chunk.Get("sequence_number")
|
||||
if !newSeq.Exists() {
|
||||
break
|
||||
}
|
||||
|
||||
if newSeq.Uint() != native_seq {
|
||||
logger.Error("newSeq != native_seq")
|
||||
break
|
||||
|
||||
}
|
||||
|
||||
data := chunk.Get("data")
|
||||
if !data.Exists() {
|
||||
break
|
||||
}
|
||||
e.observeData(logger, data, native_seq)
|
||||
}
|
||||
|
||||
case <-timer.C:
|
||||
s := ""
|
||||
|
||||
if next_sequence == 0 {
|
||||
// if next_sequence is 0, we look up the most recent event
|
||||
s = fmt.Sprintf(`%s?limit=1`, eventsEndpoint)
|
||||
} else {
|
||||
// otherwise just look up events starting at next_sequence.
|
||||
// this will potentially return multiple events (whatever
|
||||
// the default limit is per page), so we'll handle all of them.
|
||||
s = fmt.Sprintf(`%s?start=%d`, eventsEndpoint, next_sequence)
|
||||
}
|
||||
|
||||
events_json, err := e.retrievePayload(s)
|
||||
if err != nil {
|
||||
logger.Error("retrievePayload", zap.Error(err))
|
||||
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAptos, 1)
|
||||
errC <- err
|
||||
break
|
||||
}
|
||||
|
||||
// data doesn't exist yet. skip, and try again later
|
||||
// this happens when the sequence id we're looking up hasn't
|
||||
// been used yet.
|
||||
if string(events_json) == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
if !gjson.Valid(string(events_json)) {
|
||||
logger.Error("InvalidJson: " + string(events_json))
|
||||
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAptos, 1)
|
||||
break
|
||||
|
||||
}
|
||||
|
||||
events := gjson.ParseBytes(events_json)
|
||||
|
||||
// the endpoint returns an array of events, ordered by sequence
|
||||
// id (ASC)
|
||||
for _, event := range events.Array() {
|
||||
event_sequence := event.Get("sequence_number")
|
||||
if !event_sequence.Exists() {
|
||||
continue
|
||||
}
|
||||
|
||||
// this is interesting in the last iteration, whereby we
|
||||
// find the next sequence that comes after the array
|
||||
next_sequence = event_sequence.Uint() + 1
|
||||
|
||||
data := event.Get("data")
|
||||
if !data.Exists() {
|
||||
continue
|
||||
}
|
||||
e.observeData(logger, data, event_sequence.Uint())
|
||||
}
|
||||
|
||||
health, err := e.retrievePayload(aptosHealth)
|
||||
if err != nil {
|
||||
logger.Error("health", zap.Error(err))
|
||||
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAptos, 1)
|
||||
errC <- err
|
||||
break
|
||||
}
|
||||
|
||||
if !gjson.Valid(string(health)) {
|
||||
logger.Error("Invalid JSON in health response: " + string(health))
|
||||
p2p.DefaultRegistry.AddErrorCount(vaa.ChainIDAptos, 1)
|
||||
continue
|
||||
|
||||
}
|
||||
|
||||
logger.Info(string(health) + string(events_json))
|
||||
|
||||
phealth := gjson.ParseBytes(health)
|
||||
|
||||
block_height := phealth.Get("block_height")
|
||||
|
||||
if block_height.Exists() {
|
||||
currentAptosHeight.Set(float64(block_height.Uint()))
|
||||
p2p.DefaultRegistry.SetNetworkStats(vaa.ChainIDAptos, &gossipv1.Heartbeat_Network{
|
||||
Height: int64(block_height.Uint()),
|
||||
ContractAddress: e.aptosAccount,
|
||||
})
|
||||
|
||||
readiness.SetReady(common.ReadinessAptosSyncing)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case err := <-errC:
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue